首页 > 其他分享 >Pyhthon实时行情接口WebSocket接入

Pyhthon实时行情接口WebSocket接入

时间:2024-11-10 20:07:46浏览次数:5  
标签:__ .__ WebSocket self cmd 接口 Pyhthon ws def

Python做量化,如果是日内策略,需要更实时的行情数据,不然策略滑点太大,容易跑偏结果。
之前用行情网站提供的level1行情接口,实测平均更新延迟达到了6秒,超过10只股票并发请求频率过快很容易封IP。后面又尝试了买代理IP来请求,成本太高而且不稳定。
在Github上看到一个可转债的Golang高频T+0策略,对接的是WebSocket协议,拿来改了改,封装了一个Python版本的包,记录一下:

#!python3
# -*- coding:utf-8 -*-
import time
import datetime
import websocket
import zlib
import requests
import threading

# 行情订阅推送封装
class Construct:
    __token = ""
    __server_req_url = "http://jvquant.com/query/server?market=ab&type=websocket&token="
    __ws_ser_addr = ""
    __ws_conn = ""
    __lv1_field = ["time", "name", "price", "ratio", "volume", "amount", "b1", "b1p", "b2", "b2p", "b3", "b3p", "b4",
                   "b4p", "b5", "b5p", "s1", "s1p", "s2", "s2p", "s3", "s3p", "s4", "s4p", "s5", "s5p"]
    __lv2_field = ["time", "oid", "price", "vol"]

    def __init__(self, logHandle, token, onRevLv1, onRevLv2):
        if logHandle == "" or token == "" or onRevLv1 == "" or onRevLv2 == "":
            msg = "行情初始化失败:logHandle或token或onRevLv1或onRevLv2必要参数缺失。"
            print(msg)
            exit(-1)
        self.__log = logHandle
        self.__token = token
        self.__deal_lv1 = onRevLv1
        self.__deal_lv2 = onRevLv2
        self.__getSerAddr()
        self.__conn_event = threading.Event()
        self.th_handle = threading.Thread(target=self.__conn)
        self.th_handle.start()
        self.__conn_event.wait()

    def __getSerAddr(self):
        url = self.__server_req_url + self.__token
        try:
            res = requests.get(url=url)
        except Exception as e:
            self.__log(e)
            return
        if (res.json()["code"] == "0"):
            self.__ws_ser_addr = res.json()["server"]
            print("获取行情服务器地址成功:" + self.__ws_ser_addr)
        else:
            msg = "获取行情服务器地址失败:" + res.text
            self.__log(msg)
            exit(-1)

    def __conn(self):
        wsUrl = self.__ws_ser_addr + "?token=" + self.__token
        self.__ws_conn = websocket.WebSocketApp(wsUrl,
                                                on_open=self.__on_open,
                                                on_data=self.__on_message,
                                                on_error=self.__on_error,
                                                on_close=self.__on_close)
        self.__ws_conn.run_forever()
        self.__conn_event.set()
        self.__log("ws thread exited")

    def addLv1(self, codeArr):
        cmd = "add="
        lv1Codes = []
        for code in codeArr:
            lv1Codes.append("lv1_" + code)

        cmd = cmd + ",".join(lv1Codes)
        self.__log("cmd:" + cmd)
        self.__ws_conn.send(cmd)

    def addLv2(self, codeArr):
        cmd = "add="
        lv1Codes = []
        for code in codeArr:
            lv1Codes.append("lv2_" + code)

        cmd = cmd + ",".join(lv1Codes)
        self.__log("cmd:" + cmd)
        self.__ws_conn.send(cmd)

    def dealLv1(self, data):
        self.__deal_lv1(data)

    def dealLv2(self, data):
        self.__deal_lv1(data)

    def __on_open(self, ws):
        self.__conn_event.set()
        self.__log("行情连接已创建")

    def __on_error(self, ws, error):
        self.__log("行情处理error:", error)

    def __on_close(self, ws, code, msg):
        self.__log("行情服务未连接")

    def close(self):
        self.__ws_conn.close()

    def __on_message(self, ws, message, type, flag):
        # 命令返回文本消息
        if type == websocket.ABNF.OPCODE_TEXT:
            self.__log("Text响应:" + message)
        # 行情推送压缩二进制消息,在此解压缩
        if type == websocket.ABNF.OPCODE_BINARY:
            now = datetime.datetime.now()
            nStamp = time.mktime(now.timetuple())
            date = now.strftime('%Y-%m-%d')

            rb = zlib.decompress(message, -zlib.MAX_WBITS)
            text = rb.decode("utf-8")
            # self.__log("Binary响应:" + text)
            ex1 = text.split("\n")
            for e1 in ex1:
                ex2 = e1.split("=")
                if len(ex2) != 2:
                    continue
                code = ex2[0]
                hqs = ex2[1]
                if code.startswith("lv1_"):
                    code = code.replace("lv1_", "")
                    hq = hqs.split(",")
                    if len(hq) == len(self.__lv1_field):
                        hqMap = dict(zip(self.__lv1_field, hq))
                        timeStr = hqMap['time']
                        date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')
                        tStamp = int(time.mktime(date_obj.timetuple()))
                        if abs(tStamp - nStamp) <= 2:
                            self.__deal_lv1(code, hqMap)

                if code.startswith("lv2_"):
                    code = code.replace("lv2_", "")
                    hqArr = hqs.split("|")
                    for hq in hqArr:
                        hqEx = hq.split(",")
                        if len(hqEx) == len(self.__lv2_field):
                            hqMap = dict(zip(self.__lv2_field, hqEx))
                            timeEx = hqMap['time'].split('.')
                            if len(timeEx) == 2:
                                timeStr = timeEx[0]
                                date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')
                                tStamp = int(time.mktime(date_obj.timetuple()))
                                if abs(tStamp - nStamp) <= 2:
                                    self.__deal_lv2(code, hqMap)

引用地址:https://github.com/freevolunteer/bondTrader/blob/main/pyscript/jvUtil/HanqQing.py
订阅指令参考:https://jvquant.com/wiki.html#--9

原文地址:https://zhuanlan.zhihu.com/p/6059899873

标签:__,.__,WebSocket,self,cmd,接口,Pyhthon,ws,def
From: https://www.cnblogs.com/itHub/p/18538404

相关文章

  • stm32以太网接口:MII和RMII
    前言使用stm32和lwip进行网络通信开发时,实现结构如下:而MII和RMII就是stm32与PHY芯片之间的通信接口,类似于I2C、UART等。stm32以太网模块有专用的DMA控制器,通过AHB接口将以太网内核和存储器相连。数据发送时,先将数据从存储器以DMA传输到TXFIFO中进行缓冲,然后由MAC内核......
  • 在vue项目中如何实现权限控制,菜单权限,按钮权限,接口权限,路由权限,操作权限,数据权限如何
    在实际项目开发中,权限管理是一个关键功能,用于控制不同用户对系统资源的访问。权限是对特定资源的访问许可,权限控制的目的是确保用户只能访问到被分配的资源。例如,网站管理员可以对网站数据进行增删改查,而普通用户只能浏览。权限管理的分类根据功能的不同,权限控制可以分为......
  • C++代码优化(二): 区分接口继承和实现继承
    目录1.引言2.接口继承3.实现继承4.如何选择接口继承与实现继承5.完整实例6.总结1.引言        在C++中,区分接口继承和实现继承是一种良好的编程实践,有助于提高代码的可维护性、可读性和可扩展性。接口继承通常指的是从基类继承纯虚函数(purevirtualfunctions......
  • vue通过ollama接口调用开源模型
    先展示下最终效果: 第一步:先安装ollama,并配置对应的开源大模型。安装步骤可以查看上一篇博客:ollama搭建本地ai大模型并应用调用 第二步:需要注意两个配置,页面才可以调用1)OLLAMA_HOST="0.0.0.0:11434"2)若应用部署服务器后想调用,需要配置:OLLAMA_ORIGINS=* 第三步:js流式调......
  • List接口相关问题
    目录1.迭代器Iterator是什么2.Iterator怎么使用?有什么特点?3.如何边遍历边移除Collection中的元素?4.Iterator和ListIterator有什么区别?5.遍历一个List有哪些不同的方式?每种方法的实现原理是什么?Java中List遍历的最佳实践是什么?6.RandomAccess6.1什么是Random......
  • 浅谈 PHP 与手机 APP 开发(API 接口开发)
    一、先简单回答两个问题:1、PHP可以开发客户端?答:不可以,因为PHP是脚本语言,是负责完成B/S架构或C/S架构的S部分,即:服务端的开发。(别去纠结GTK、WinBinder)2、为什么选择PHP作为开发服务端的首选?答:跨平台(可以运行在UNIX、LINUX、WINDOWS、MacOS下)、低消耗(PHP消耗相当少的系统......
  • D61【python 接口自动化学习】- python基础之数据库
    day61数据库定义学习日期:20241107学习目标:MySQL数据库--130:MySQL入门使用学习笔记:在命令提示符内先试用MySQL使用图形化工具操作MySQLDBeaver安装DBeaver连接MySQL总结MySQL安装成功后,可以使用命令提示符查看数据库安装使用图形化工具DBeaver操作MySQL......
  • 使用 Python 流式 Websocket 传输 Binance 订单更新 附代码
    对于从事加密货币行业的任何人来说,使用RESTapi从交易所查询实时数据并不总是最佳做法,原因有很多效率低下:每个查询都需要时间,并且会显着影响性能,尤其是对于高频策略。交易所施加的限制很容易被打破,例如Binance的硬限制为每分钟1200个请求权重您只能检索有限数量的历史数......
  • 电商数据api1688接口获取商品实时数据价格比价api代码演示案例
    1688商品详情接口(接口入口)它的主要功能是允许卖家从自己的系统中快速获取商品详细信息。通过这个接口,卖家可以提取到商品的各类数据,包括但不限于商品标题、价格、优惠价、收藏数、下单人数、月销售量等。此外,还可以获取到商品的SKU图、详情页图片等信息。这些信息可以帮助卖家......
  • 如何利用 1688 API 接口获取商品信息?
    以下是利用1688API接口获取商品信息的一般步骤:一、注册成为开发者并创建应用:注册账号:访问阿里巴巴开放平台,进行开发者账号注册,点击注册账号获取key和secret,填写真实有效的基本信息,如联系方式等。创建应用:使用注册的账号登录到阿里巴巴开放平台后,创建一个新的应用。为应用......