作者:老余捞鱼
原创不易,转载请标明出处及原作者。
写在前面的话:在金融科技的浪潮中,实时数据处理和智能决策的重要性日益凸显。在本文中,我将分享如何利用Kafka和LlamaIndex构建一套基于GPT-4o的高效人工智能实时交易系统。从下载和分析欧元/美元对的日线数据,到设置Kafka数据流实时跟踪市场动态,再到GPT-4o整合了所有信息后做出买入、卖出或持有的决策,以及每步选择背后的原因分析。此次探索不仅是一个技术实践的过程,更是为您打造更高级的 AI 交易系统提供了坚实的基础,也是对未来金融交易模式的一次深度探索。
一、基础知识
本次我们使用Kafka(用于欧元/美元对的数据流)、LlamaIndex 工作流(用于无缝逻辑处理)和OpenAI的 GPT-4o 构建一个实时交易机器人,它除了图像识别外,还会整合信息做出买入、卖出或持有决策的所有信息。
1.1 Kafka
Kafka是一个开源流处理平台,由Apache软件基金会开发。它主要用于构建实时的数据流和消息队列系统。以下是Kafka的一些核心特点:
Kafka通常用于以下场景:
在本次项目中,我们将用 Kafka建立一个数据流,以每5秒接收一次新定价数据的方式跟踪欧元兑美元的买入价和卖出价。
1.2 LlamaIndex
LlamaIndex是一个开源的框架,用于构建基于自然语言处理的索引和查询系统。它允许用户通过自然语言查询非结构化数据,如文档、电子邮件或数据库中的信息。以下是LlamaIndex的一些特点:
LlamaIndex通常用于以下场景:
LlamaIndex 工作流是我们机器人逻辑的核心,它根据技术分析和 GPT-4o 图像分析的见解处理实时数据并做出交易决策。
1.3 GPT-4O
GPT-4O是OpenAI开发的一种先进的自然语言处理模型,旨在提升机器理解和生成文本的能力。通过对大量数据的训练,GPT-4O能够生成更加流畅和自然的语言,目前在数字金融领域的应用也开始崭露头角。其特点如下:
在本文中, GPT-4o 将成为一个实时交易机器人,除了图像识别外,还要整合信息做出买入、卖出或持有决策的所有信息。
1.4 工作流程
在金融领域,Kafka可以用于实时处理交易数据流,而LlamaIndex可以帮助分析和查询这些数据,从而为交易决策提供支持。两者的结合可以构建一个强大的实时交易分析系统。
本次要构建的工作流程如上图所示。
二、数据准备
我们首先使用 Selenium(一种常用于网页抓取的工具)下载每日 EUR/USD 图像图表。GPT-4 将处理该图像以获得初步数据,然后指导我们的交易机器人。代码如下:
!pip install selenium
!apt-get install chromium-driver
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
def initialize_web_driver():
"""Sets up and returns a configured Chrome WebDriver instance."""
options = webdriver.ChromeOptions()
options.add_argument('--verbose')
options.add_argument('--no-sandbox')
options.add_argument('--headless')
options.add_argument('--disable-gpu')
options.add_argument('--window-size=1920,1200')
options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=options)
return driver
driver = initialize_web_driver()
try:
# Navigate to the page
driver.get("https://www.tradingview.com/symbols/EURUSD/")
# Wait a few seconds for the page to load fully
time.sleep(4) # Adjust sleep duration if necessary
driver.refresh()
time.sleep(4)
# Locate the chart using an appropriate selector
chart_element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.CLASS_NAME, "container-nORFfEfo"))
)
# Capture a screenshot of the chart element
time.sleep(4)
chart_element.screenshot("eurusd_chart.png")
print("Chart screenshot saved as 'eurusd_chart.png'.")
except Exception as e:
print("An error occurred:", e)
finally:
# Close the browser
driver.quit()
- 函数
initialize_web_driver
:设置用于下载欧元/美元日线图的 Chrome WebDriver。 - 图像捕捉 Image Capture:捕捉并保存欧元/美元图表截图,GPT-4o 将对其进行分析。
如上图所示,该脚本将为我们提供欧元/美元日线图的快照。
三、Kafka 实时流设置
在本节中,我们将建立一个基于 Kafka 的数据流设置,以接近实时的方式跟踪欧元兑美元的买入价和卖出价。欧元兑美元价格的数据源将从数据提供商 Investing.com 获得。为使该系统正常运行,我们将创建一个 Kafka 主题 eurusd_bidask,该主题将每 5 秒接收一次新的定价数据。这里的核心组成部分是:
- Kafka Producer:负责每 5 秒获取并向
eurusd_bidask
主题发布竞价任务数据。 - Kafka Consumer:实时接收和处理来自 Kafka 主题的数据,并将其存储起来以备进一步分析。
这种设置通过模拟实时数据馈送和处理,为构建动态交易机器人提供了一个起点。虽然由于网络请求,数据会稍有延迟,但它为更复杂的交易策略奠定了坚实的基础。
#setup kafka configuration
!wget https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz
!tar -xzf kafka_2.13-3.8.1.tgz
!./kafka_2.13-3.8.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.8.1/config/zookeeper.properties
!./kafka_2.13-3.8.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.8.1/config/server.properties
!sleep 10
#remove topic
!./kafka_2.13-3.8.1/bin/kafka-topics.sh --delete --topic eurusd_bidask --bootstrap-server localhost:9092
!./kafka_2.13-3.8.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic eurusd_bidask
在集成整个工作流程之前,我们先测试eurusd
scraper,确保它能使用 BeautifulSoup
库每 5 秒获取一次数据。此外,我们还要检查 Kafka 队列是否正常工作,以确保数据流的无缝衔接。
!pip install kafka-python nest_asyncio
import json
import requests
import time
import pandas as pd
from kafka import KafkaProducer, KafkaConsumer
from bs4 import BeautifulSoup
import asyncio
import nest_asyncio
import threading
# Apply nest_asyncio for environments with an already running event loop (e.g., Jupyter/Colab)
nest_asyncio.apply()
# Control variable to stop the loop
stop_flag = False
# Kafka Producer Configuration
async def kafka_producer():
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
async def fetch_and_send_bid_ask():
url = 'https://www.investing.com/currencies/eur-usd-spreads'
headers = {"User-Agent": "Mozilla/5.0"}
response = requests.get(url, headers=headers)
# Check if the response is successful
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
bid_element = soup.find("span", class_="inlineblock pid-1-bid")
ask_element = soup.find("span", class_="inlineblock pid-1-ask")
# Check if bid and ask elements were found
if bid_element and ask_element:
bid_value = float(bid_element.text.replace(',', ''))
ask_value = float(ask_element.text.replace(',', ''))
message = {'bid': bid_value, 'ask': ask_value}
# Send message to Kafka
producer.send('eurusd_bidask', value=message)
producer.flush()
print(f"Producer sent bid: {bid_value}, ask: {ask_value}")
else:
print("Error: Could not find bid/ask elements on the page.")
else:
print(f"Error fetching data: Status code {response.status_code}")
# Infinite loop to capture and send data every 5 seconds
while not stop_flag:
await fetch_and_send_bid_ask()
await asyncio.sleep(5)
# Kafka Consumer Configuration
def kafka_consumer_bot():
consumer = KafkaConsumer(
'eurusd_bidask',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
auto_offset_reset='latest',
enable_auto_commit=False,
group_id='my-group'
)
df = pd.DataFrame(columns=['Bid', 'Ask', 'Mid_Price'], dtype=float)
print("Starting Kafka consumer... Listening for messages on the 'eurusd_bidask' topic.")
for msg in consumer:
if stop_flag:
break
# Debug message to show that a message has been received
print(f"Consumer received raw message: {msg.value}")
# Extract bid and ask values
bid = float(msg.value['bid'])
ask = float(msg.value['ask'])
mid_price = (bid + ask) / 2
# Add new row to DataFrame and print the updated DataFrame
new_row = pd.DataFrame({'Bid': [bid], 'Ask': [ask], 'Mid_Price': [mid_price]})
df = pd.concat([df, new_row], ignore_index=True)
print(f"Consumer processed bid: {bid:.4f}, ask: {ask:.4f}, mid_price: {mid_price:.4f}")
print("Updated DataFrame:")
print(df)
# Keep only the last 50 rows
if len(df) > 50:
df = df.iloc[-50:].reset_index(drop=True)
# Small sleep to avoid rapid polling in this example
time.sleep(5)
# Main function to run both producer and consumer
async def main():
# Start the producer in the event loop
producer_task = asyncio.create_task(kafka_producer())
# Start the consumer in a separate thread
consumer_thread = threading.Thread(target=kafka_consumer_bot, daemon=True)
consumer_thread.start()
try:
await producer_task
except asyncio.CancelledError:
pass
finally:
consumer_thread.join()
# Run the main function
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Program terminated by the user.")
Kafka 测试输出结果如下:
提示:在测试过程中尝试停止 Kafka 队列时,如果你使用 Colab 笔记本可能会遇到问题。我的建议是重启 Colab,重新安装并重新导入库。如果想测试完整的工作流程,请跳过 Kafka 测试,直接进入完整的工作流程,确保在第一步安装所有依赖项并获取映像。
四、LlamaIndex 交易决策工作流程
先安装必要的库,包括用于工作流的 llama-index
、用于 Kafka 交互的 kafka-python
,以及用于技术分析的 pandas_ta
。
# Install necessary packages
!pip install llama-index-core llama-index-llms-openai llama-index-multi-modal-llms-openai
!pip install llama_index.readers.file
!pip install pandas_ta
!pip install kafka-python
LlamaIndex 工作流是我们机器人逻辑的核心,它根据技术分析和 GPT-4o 图像分析的见解处理实时数据并做出交易决策。这些是代码的主要部分:
TradingDecisionResult
Class(交易决策结果类):定义用于存储交易决策和推理的数据结构。- 图像分析步骤:使用 GPT-4 分析捕获的欧元/美元图表。分析结果将指导机器人的初始行为。
- 数据分析步骤:该步骤利用 EMA、RSI 和布林线等技术指标分析实时买卖数据。根据这些指标,GPT会提示您做出买入、卖出或持有的决定,并说明选择的理由。
- 停止命令监听器:监听 "stop "命令,在必要时停止程序。
上图是交易机器人输出(图像分析)。
上图是交易机器人输出(交易决策和推理)。
在编程时,管理库版本是一件非常重要的事情,因为随着时间的推移,可能会出现兼容性问题和功能过时的问题。为了解决这个问题,我们使用 pip freeze
和pip show
来显示本教程中使用的所有库,确保版本和依赖关系的透明度。
!pip freeze
!pip show selenium
!pip show llama-index-core
!pip show llama-index-llms-openai
!pip show llama-index-multi-modal-llms-openai
!pip show llama_index.readers.file
!pip show pandas_ta
!pip show kafka-python
五、未来展望
在未来的教程中,我还会添加一项功能,根据机器人生成的交易建议动态调整投资金额。这一改进将使机器人对市场趋势的适应能力和反应能力更强,从而做出更具战略性和更有效率的决策。
不过值得注意的是,在当前环境下运行该机器人有一些限制,尤其是在 Colab 笔记本设置中。例如,由于网络延迟、处理滞后或数据源的限制,捕捉买入价和卖出价的网络请求可能会出现延迟,甚至时间上的轻微误差。因此,最初的机器人并不能像在与金融交易所直接相连的生产环境中那样完全实时运行。
六、观点总结
本文阐述了如何捕捉和处理实时市场数据、运用机器学习模型提炼出实用的洞察,并探讨了构建潜在高级AI驱动交易系统的核心原理。随着未来版本的迭代和教程的深入,我们将针对现有局限进行改进,比如优化数据源的连接或升级至生产级服务器以提升实时性能,从而推动该交易机器人向更高级的交易策略迈进。
- 实时交易系统的重要性:实时处理市场数据的非常重要,这能使您在金融市场中做出快速而准确的交易决策。
- 技术栈的选择:本文选择了 Kafka 用于数据流处理,LlamaIndex 用于逻辑处理和工作流程管理,GPT-4o 用于分析和决策,展示了这些技术在金融领域的应用潜力。
- 代码示例的实用性:提供了详细的代码示例,帮助读者理解如何实现实时交易系统的各个组件,包括数据采集、处理和决策。
- 环境兼容性和版本管理:指出了在 Colab 环境中运行交易机器人可能遇到的问题,并强调了管理库版本的重要性。
- 未来发展方向:预见了交易系统的未来扩展,如投资再平衡功能,以及如何通过改进数据源连接和使用生产级服务器来提高系统的实时性能。
谢谢您阅读到最后,希望本文能给您带来新的收获。祝您投资顺利!如果对文中的内容有任何疑问,请给我留言,必复。
本文内容仅限技术探讨和学习,不构成任何投资建议。
标签:AI,bid,实时,kafka,pip,ask,源代码,Open,Kafka From: https://blog.csdn.net/weixin_70955880/article/details/144206472