首页 > 其他分享 >智能升级:构建由Open AI驱动的实时交易系统,倍增收益潜力(附源代码)

智能升级:构建由Open AI驱动的实时交易系统,倍增收益潜力(附源代码)

时间:2024-12-29 12:28:36浏览次数:7  
标签:AI bid 实时 kafka pip ask 源代码 Open Kafka

作者:老余捞鱼

原创不易,转载请标明出处及原作者。

写在前面的话:在金融科技的浪潮中,实时数据处理和智能决策的重要性日益凸显。在本文中,我将分享如何利用Kafka和LlamaIndex构建一套基于GPT-4o的高效人工智能实时交易系统。从下载和分析欧元/美元对的日线数据,到设置Kafka数据流实时跟踪市场动态,再到GPT-4o整合了所有信息后做出买入、卖出或持有的决策,以及每步选择背后的原因分析。此次探索不仅是一个技术实践的过程,更是为您打造更高级的 AI 交易系统提供了坚实的基础,也是对未来金融交易模式的一次深度探索。

一、基础知识

本次我们使用Kafka(用于欧元/美元对的数据流)、LlamaIndex 工作流(用于无缝逻辑处理)和OpenAI的 GPT-4o 构建一个实时交易机器人,它除了图像识别外,还会整合信息做出买入、卖出或持有决策的所有信息。

1.1 Kafka

Kafka是一个开源流处理平台,由Apache软件基金会开发。它主要用于构建实时的数据流和消息队列系统。以下是Kafka的一些核心特点:

Kafka通常用于以下场景:

在本次项目中,我们将用 Kafka建立一个数据流,以每5秒接收一次新定价数据的方式跟踪欧元兑美元的买入价和卖出价。

1.2 LlamaIndex

LlamaIndex是一个开源的框架,用于构建基于自然语言处理的索引和查询系统。它允许用户通过自然语言查询非结构化数据,如文档、电子邮件或数据库中的信息。以下是LlamaIndex的一些特点:

LlamaIndex通常用于以下场景:

LlamaIndex 工作流是我们机器人逻辑的核心,它根据技术分析和 GPT-4o 图像分析的见解处理实时数据并做出交易决策。

1.3 GPT-4O

GPT-4O是OpenAI开发的一种先进的自然语言处理模型,旨在提升机器理解和生成文本的能力。通过对大量数据的训练,GPT-4O能够生成更加流畅和自然的语言,目前在数字金融领域的应用也开始崭露头角。其特点如下:

在本文中, GPT-4o 将成为一个实时交易机器人,除了图像识别外,还要整合信息做出买入、卖出或持有决策的所有信息。

1.4 工作流程

在金融领域,Kafka可以用于实时处理交易数据流,而LlamaIndex可以帮助分析和查询这些数据,从而为交易决策提供支持。两者的结合可以构建一个强大的实时交易分析系统。

本次要构建的工作流程如上图所示。

二、数据准备

我们首先使用 Selenium(一种常用于网页抓取的工具)下载每日 EUR/USD 图像图表。GPT-4 将处理该图像以获得初步数据,然后指导我们的交易机器人。代码如下:

!pip install selenium
!apt-get install chromium-driver
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

def initialize_web_driver():
    """Sets up and returns a configured Chrome WebDriver instance."""
    options = webdriver.ChromeOptions()
    options.add_argument('--verbose')
    options.add_argument('--no-sandbox')
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument('--window-size=1920,1200')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    return driver

driver = initialize_web_driver()

try:
    # Navigate to the page
    driver.get("https://www.tradingview.com/symbols/EURUSD/")

    # Wait a few seconds for the page to load fully
    time.sleep(4)  # Adjust sleep duration if necessary
    driver.refresh()
    time.sleep(4)

    # Locate the chart using an appropriate selector
    chart_element = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CLASS_NAME, "container-nORFfEfo"))
    )

    # Capture a screenshot of the chart element
    time.sleep(4)
    chart_element.screenshot("eurusd_chart.png")
    print("Chart screenshot saved as 'eurusd_chart.png'.")

except Exception as e:
    print("An error occurred:", e)

finally:
    # Close the browser
    driver.quit()

  • 函数 initialize_web_driver:设置用于下载欧元/美元日线图的 Chrome WebDriver。
  • 图像捕捉 Image Capture捕捉并保存欧元/美元图表截图,GPT-4o 将对其进行分析。

如上图所示,该脚本将为我们提供欧元/美元日线图的快照。

三、Kafka 实时流设置

在本节中,我们将建立一个基于 Kafka 的数据流设置,以接近实时的方式跟踪欧元兑美元的买入价和卖出价。欧元兑美元价格的数据源将从数据提供商 Investing.com 获得。为使该系统正常运行,我们将创建一个 Kafka 主题 eurusd_bidask,该主题将每 5 秒接收一次新的定价数据。这里的核心组成部分是:

  • Kafka Producer:负责每 5 秒获取并向 eurusd_bidask主题发布竞价任务数据。
  • Kafka Consumer:实时接收和处理来自 Kafka 主题的数据,并将其存储起来以备进一步分析。

这种设置通过模拟实时数据馈送和处理,为构建动态交易机器人提供了一个起点。虽然由于网络请求,数据会稍有延迟,但它为更复杂的交易策略奠定了坚实的基础。

#setup kafka configuration
!wget https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz
!tar -xzf kafka_2.13-3.8.1.tgz
!./kafka_2.13-3.8.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.8.1/config/zookeeper.properties
!./kafka_2.13-3.8.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.8.1/config/server.properties
!sleep 10
#remove topic
!./kafka_2.13-3.8.1/bin/kafka-topics.sh --delete --topic eurusd_bidask --bootstrap-server localhost:9092
!./kafka_2.13-3.8.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic eurusd_bidask

在集成整个工作流程之前,我们先测试eurusdscraper,确保它能使用 BeautifulSoup库每 5 秒获取一次数据。此外,我们还要检查 Kafka 队列是否正常工作,以确保数据流的无缝衔接。

!pip install kafka-python nest_asyncio

import json
import requests
import time
import pandas as pd
from kafka import KafkaProducer, KafkaConsumer
from bs4 import BeautifulSoup
import asyncio
import nest_asyncio
import threading

# Apply nest_asyncio for environments with an already running event loop (e.g., Jupyter/Colab)
nest_asyncio.apply()

# Control variable to stop the loop
stop_flag = False

# Kafka Producer Configuration
async def kafka_producer():
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    async def fetch_and_send_bid_ask():
        url = 'https://www.investing.com/currencies/eur-usd-spreads'
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers)
        
        # Check if the response is successful
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            bid_element = soup.find("span", class_="inlineblock pid-1-bid")
            ask_element = soup.find("span", class_="inlineblock pid-1-ask")
            
            # Check if bid and ask elements were found
            if bid_element and ask_element:
                bid_value = float(bid_element.text.replace(',', ''))
                ask_value = float(ask_element.text.replace(',', ''))
                message = {'bid': bid_value, 'ask': ask_value}
                
                # Send message to Kafka
                producer.send('eurusd_bidask', value=message)
                producer.flush()
                print(f"Producer sent bid: {bid_value}, ask: {ask_value}")
            else:
                print("Error: Could not find bid/ask elements on the page.")
        else:
            print(f"Error fetching data: Status code {response.status_code}")

    # Infinite loop to capture and send data every 5 seconds
    while not stop_flag:
        await fetch_and_send_bid_ask()
        await asyncio.sleep(5)

# Kafka Consumer Configuration
def kafka_consumer_bot():
    consumer = KafkaConsumer(
        'eurusd_bidask',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        auto_offset_reset='latest',
        enable_auto_commit=False,
        group_id='my-group'
    )

    df = pd.DataFrame(columns=['Bid', 'Ask', 'Mid_Price'], dtype=float)

    print("Starting Kafka consumer... Listening for messages on the 'eurusd_bidask' topic.")
    
    for msg in consumer:
        if stop_flag:
            break
        
        # Debug message to show that a message has been received
        print(f"Consumer received raw message: {msg.value}")

        # Extract bid and ask values
        bid = float(msg.value['bid'])
        ask = float(msg.value['ask'])
        mid_price = (bid + ask) / 2

        # Add new row to DataFrame and print the updated DataFrame
        new_row = pd.DataFrame({'Bid': [bid], 'Ask': [ask], 'Mid_Price': [mid_price]})
        df = pd.concat([df, new_row], ignore_index=True)
        
        print(f"Consumer processed bid: {bid:.4f}, ask: {ask:.4f}, mid_price: {mid_price:.4f}")
        print("Updated DataFrame:")
        print(df)

        # Keep only the last 50 rows
        if len(df) > 50:
            df = df.iloc[-50:].reset_index(drop=True)
        
        # Small sleep to avoid rapid polling in this example
        time.sleep(5)

# Main function to run both producer and consumer
async def main():
    # Start the producer in the event loop
    producer_task = asyncio.create_task(kafka_producer())

    # Start the consumer in a separate thread
    consumer_thread = threading.Thread(target=kafka_consumer_bot, daemon=True)
    consumer_thread.start()

    try:
        await producer_task
    except asyncio.CancelledError:
        pass
    finally:
        consumer_thread.join()

# Run the main function
try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("Program terminated by the user.")

Kafka 测试输出结果如下:

提示:在测试过程中尝试停止 Kafka 队列时,如果你使用 Colab 笔记本可能会遇到问题。我的建议是重启 Colab,重新安装并重新导入库。如果想测试完整的工作流程,请跳过 Kafka 测试,直接进入完整的工作流程,确保在第一步安装所有依赖项并获取映像。

四、LlamaIndex 交易决策工作流程

先安装必要的库,包括用于工作流的 llama-index 、用于 Kafka 交互的 kafka-python,以及用于技术分析的 pandas_ta

# Install necessary packages
!pip install llama-index-core llama-index-llms-openai llama-index-multi-modal-llms-openai
!pip install llama_index.readers.file
!pip install pandas_ta
!pip install kafka-python

LlamaIndex 工作流是我们机器人逻辑的核心,它根据技术分析和 GPT-4o 图像分析的见解处理实时数据并做出交易决策。这些是代码的主要部分:

  • TradingDecisionResult Class(交易决策结果类):定义用于存储交易决策和推理的数据结构。
  • 图像分析步骤:使用 GPT-4 分析捕获的欧元/美元图表。分析结果将指导机器人的初始行为。
  • 数据分析步骤:该步骤利用 EMA、RSI 和布林线等技术指标分析实时买卖数据。根据这些指标,GPT会提示您做出买入、卖出或持有的决定,并说明选择的理由。
  • 停止命令监听器:监听 "stop "命令,在必要时停止程序。

上图是交易机器人输出(图像分析)。

上图是交易机器人输出(交易决策和推理)。

在编程时,管理库版本是一件非常重要的事情,因为随着时间的推移,可能会出现兼容性问题和功能过时的问题。为了解决这个问题,我们使用 pip freezepip show 来显示本教程中使用的所有库,确保版本和依赖关系的透明度。

!pip freeze
!pip show selenium
!pip show llama-index-core
!pip show llama-index-llms-openai
!pip show llama-index-multi-modal-llms-openai
!pip show llama_index.readers.file
!pip show pandas_ta
!pip show kafka-python

五、未来展望

在未来的教程中,我还会添加一项功能,根据机器人生成的交易建议动态调整投资金额。这一改进将使机器人对市场趋势的适应能力和反应能力更强,从而做出更具战略性和更有效率的决策。

不过值得注意的是,在当前环境下运行该机器人有一些限制,尤其是在 Colab 笔记本设置中。例如,由于网络延迟、处理滞后或数据源的限制,捕捉买入价和卖出价的网络请求可能会出现延迟,甚至时间上的轻微误差。因此,最初的机器人并不能像在与金融交易所直接相连的生产环境中那样完全实时运行。

代码下载地址:MediumArticles/Medium_EffortlessPressArticleGeneration.ipynb at main · alexyu2013/MediumArticles · GitHub

六、观点总结

本文阐述了如何捕捉和处理实时市场数据、运用机器学习模型提炼出实用的洞察,并探讨了构建潜在高级AI驱动交易系统的核心原理。随着未来版本的迭代和教程的深入,我们将针对现有局限进行改进,比如优化数据源的连接或升级至生产级服务器以提升实时性能,从而推动该交易机器人向更高级的交易策略迈进。

  • 实时交易系统的重要性:实时处理市场数据的非常重要,这能使您在金融市场中做出快速而准确的交易决策。
  • 技术栈的选择:本文选择了 Kafka 用于数据流处理,LlamaIndex 用于逻辑处理和工作流程管理,GPT-4o 用于分析和决策,展示了这些技术在金融领域的应用潜力。
  • 代码示例的实用性:提供了详细的代码示例,帮助读者理解如何实现实时交易系统的各个组件,包括数据采集、处理和决策。
  • 环境兼容性和版本管理:指出了在 Colab 环境中运行交易机器人可能遇到的问题,并强调了管理库版本的重要性。
  • 未来发展方向:预见了交易系统的未来扩展,如投资再平衡功能,以及如何通过改进数据源连接和使用生产级服务器来提高系统的实时性能。

谢谢您阅读到最后,希望本文能给您带来新的收获。祝您投资顺利!如果对文中的内容有任何疑问,请给我留言,必复。


本文内容仅限技术探讨和学习,不构成任何投资建议。

标签:AI,bid,实时,kafka,pip,ask,源代码,Open,Kafka
From: https://blog.csdn.net/weixin_70955880/article/details/144206472

相关文章

  • opencv只在bin目录下编译dll,在lib目录下编译lib,在bin目录下不编译测试程序的各种exe
    penCV是一个开源的计算机视觉库,它提供了多种编程语言的接口。如果你只想编译出DLL和Lib库文件,而不编译EXE(可执行文件),这通常是因为你想要进行某种形式的动态链接或者库的分发,而不关心EXE文件本身。要实现这一点,你可以在CMake配置OpenCV时,设置BUILD_opencv_world标志为ON,这样会生成......
  • Go 并发之WaitGroup,并发锁,Context
    目录1Go并发1.1WaitGroup1.2并发锁1.2.1互斥锁1.2.2读写互斥锁1.2.3sync.Once1.2.4sync.Map1.3Context1.3.1简介1.3.2主要功能1.3.3使用示例1.3.3.1取消信号1.3.3.2设置超时1.3.3.3传递值1Go并发1.1WaitGroupsync.WaitGroup是Go标准库提供的一种同步原语,常......
  • 主动式AI(代理式)与生成式AI的关键差异与影响
    大型语言模型(LLMs)如GPT可以生成文本、回答问题并协助完成许多任务。然而,它们是被动的,这意味着它们仅根据已学到的模式对接收到的输入作出响应。LLMs无法自行决策;除此之外,它们无法规划或适应变化的环境。主动式AI(代理式)的出现正是为了解决这一问题。与生成式AILLMs不同,主动式AI(......
  • 这是最新的方法获取微信头像和昵称,其它所有的方法都已经失效,所有AI给的答案也都是旧的
    文章目录1、my.wxml2、my.js3、wx.getStorageInfo4、微信小程序的本地存储能存储多久?wx.getUserProfile:https://developers.weixin.qq.com/miniprogram/dev/api/open-api/user-info/wx.getUserProfile.htmlwx.getUserInfo:https://developers.weixin.qq.com/mi......
  • [转]关于opencv4.0中“未定义标识符cvNamedWindow”的解决方法
    这个问题困扰了很久,在网上找了很多方法,但是都没用,比如:“在代码开头加入头文件#include<opencv2/highgui/highgui_c.h>”之类的方法,就完全没解决问题。 经过探索,终于找到了解决之道。原来这是由于opencv4.0和之前的版本中有一些命令发生了变化,比如在之前的版本中cvNamedWindo......
  • [开源]用QT+OPENCV做了一个图片处理软件
    yusongmin1/QT_OPENCV界面如下基本功能,基本上没有基于opencv的库函数,手搓关于常见的传统图片处理的算法的开发,包括了内置图片,图片加载与保存,图片变换GRB2GRAY,RGB2HSV镜像,水平镜像,垂直镜像,负90度到正90度之间的旋转,阈值分割,反向腐蚀膨胀,开运算闭运算直方图......
  • 【影刀AI Power搭建黑神话小游戏】
    初次接触影刀AIPower,分享一下自己的使用心得,这里搭建了一个简单的黑神话小游戏,首先进入首页,我们选择创建一个智能体创建后,拖动指令进行搭建,这里是连线方式让流程组合起来,可以根据自己的逻辑搭建,可以选择发送卡片文本选项等消息,可以获取用户输入,根据反馈来实现后续流程,还可......
  • RAID 是什么?
    在Linux中,RAID(RedundantArraysofIndependentDisks,独立磁盘冗余阵列)是一种磁盘存储技术,它通过组合多个独立磁盘(物理磁盘)来提供更高的数据可靠性、性能和数据冗余。RAID技术最初是由加州大学伯克利分校在1987年提出的,目的是通过组合小的廉价磁盘来代替大的昂贵磁盘,同时提供数据......
  • 纳德拉的远见与中小企业的AI破局之道:先有组织进化,才有技术突破
    最近,微软CEO萨提亚·纳德拉的一次访谈引发了广泛关注。他抛出了一个在AI时代至关重要的观点:先有组织进化,才有技术突破。这句话看似简单,却蕴含着深刻的洞察,尤其对于想要在AI浪潮中实现弯道超车的中小企业而言,更是指明了一条清晰的路径。一、别只盯着AI光环,先看看你的“地基”稳不......
  • 纳德拉的远见与中小企业的AI破局之道:先有组织进化,才有技术突破
    最近,微软CEO萨提亚·纳德拉的一次访谈引发了广泛关注。他抛出了一个在AI时代至关重要的观点:先有组织进化,才有技术突破。这句话看似简单,却蕴含着深刻的洞察,尤其对于想要在AI浪潮中实现弯道超车的中小企业而言,更是指明了一条清晰的路径。一、别只盯着AI光环,先看看你的“地基”稳不......