近期(20240301)加密货币市场涨幅喜人,不断有新资金流入。笔者之前过对趋势交易有过一段时间研究,各种策略都略有了解,但在仓位管理和风险控制上都不太精通,比较幸运的是有一公开的趋势跟踪策略不仅可以跟踪趋势 还有基于波动率的风险控制方法,被称为海龟交易系统,在上个世纪曾经风靡一时。曾经用python写过一个海龟交易(原封不动的复刻的经典策略),但是没有对接实盘,今天就利用okx平台的接口接入实盘吧。
注意 这个海龟策略,有相当长的历史了 ,很多内容已经过时了 ,用于短期套利 完全没问题,毕竟是正宗的趋势交易法 盈亏比极大, 但是要长期投资的话 必然大亏! 投资上记住,收益的90%都来自正确的资产配置,这个道理已经被经济学家验证很多次了。
点击查看代码
import asyncio
import datetime
import json
import time
from okx import MarketData, Trade, Account
from okx.websocket.WsPublicAsync import WsPublicAsync
id = "PEPE-USDT"
bar = "1m"
class Kline:
def __init__(self, max_line=55, before='', after=''):
# API 初始化
apikey = "*******"
secretkey = "******"
passphrase = "****"
flag = "1" # 实盘: 0, 模拟盘: 1
self.accountAPI = Account.AccountAPI(apikey, secretkey, passphrase, False, flag)
self.tradeAPI = Trade.TradeAPI(apikey, secretkey, passphrase, False, flag)
self.n = None
self.max_units = 6
self.max_line = max_line
md = MarketData.MarketAPI(flag='1', debug=False)
ret = md.get_mark_price_candlesticks_chai(instId=id, limit=max_line, bar=bar, before=before, after=after)
self.ks = ret['data'][0:max_line]
print(f"初始化k线:{self.ks}")
# 由rest请求构建 由websocket维护
# atr(20) 10k高低线 20k高低线 55k高低线 平均持仓成本 前一订单的持仓成本 止损是atr的n倍数 浮盈前订单+0.5atr加仓
self.atr = None # atr
self.pyramid_input = 1 # 加仓的atr倍数
self.stop_input = 4 # 止损的atr倍数
self.l10 = self.h10 = self.l20 = self.h20 = self.l55 = self.h55 = 0
self.high = self.low = self.close = self.open = 0.0
self.position_avg_price = None # 市场的平均入场价
self.netprofit = 0.0 # 已完成交易的利润
self.position_size = 0.0 # 头寸的大小
self.position_price = 0.0 # 头寸的总成本 (持仓的市值)
# 查看账户余额
result2 = self.accountAPI.get_account_balance()
money=0.0
for i in result2["data"][0]["details"]:
if i["ccy"]=="USDT":
money=float(i["cashBal"])
print(money)
if money==0.0:
exit('money error')
self.capitalLeft = money # 总资金
self.initial_capital = money # 策略初始资金
self.riskPercent = 0.002 # 单笔交易承受的风险 越高持仓越多 加仓越少
self.exit_long = None
self.enter_long = None
self.risk = 0.01
self.win = False # 上次交易是否获利
self.buyPrice = 0.0 # 上次做多的价位
self.nextBuyPrice = 0.0 # 下一次做多的价位
self.stopPrice = 0.0 # 止损价
self.totalBuys = 0 # 金字塔加仓的总数
self.inBuy = False # 是否在一个做多的头寸
self.inBuy_p = False # 是否在一个做多的头寸
self.longLevel = None
self.mode = 'L1'
self.mode_p = 'L1'
self.fake = False
self.fake_p = False
self.fakeBuyPrice = 0.0 # 假交易的价位
self.shares = 0.0
self.sxf = 0.0 # 手续费
# o h l c
# ['1709080200000', '57031.1', '57107.4', '57031', '57102.7', '63367', '63.367', '3616608.9098', '0']
def buy(self, size):
msg = self.tradeAPI.place_order(instId=id,
tdMode="cash",
clOrdId="b15",
side="buy",
ordType="market",
sz=f"{size}")
return msg
def sell(self):
# 市价全平
result = self.accountAPI.get_max_order_size(
instId=id,
tdMode="isolated"
)
sell = float(result["data"][0]['maxSell'])
msg = self.tradeAPI.place_order(instId=id,
tdMode="cash",
clOrdId="b15",
side="sell",
ordType="market",
sz=f"{sell}")
print(msg)
self.netprofit = 0.0 # 已完成交易的利润
self.position_size = 0.0 # 头寸的大小
self.position_price = 0.0 # 头寸的总成本 (持仓的市值)
# 查看账户余额
result2 = self.accountAPI.get_account_balance()
money=self.initial_capital
for i in result2["data"][0]["details"]:
if i["ccy"]=="USDT":
money=float(i["cashBal"])
print(money)
self.capitalLeft = money # 总资金
self.initial_capital = money # 策略初始资金
def close(self):
pass
def run_s(self):
self.high, self.low, self.close, self.open = float(self.ks[0][2]), float(self.ks[0][3]), float(
self.ks[0][4]), float(self.ks[0][1])
self.inBuy_p = self.inBuy
self.mode_p = self.mode
self.fake_p = self.fake
# 判断是否进入做多头寸
if not self.inBuy and (self.high > self.h20 or self.high > self.h55):
self.inBuy = True
else:
# 判断是否需要退出做多头寸
if self.inBuy:
if self.mode == 'L1' and self.low < self.l10:
self.inBuy = False
elif self.mode == 'L2' and self.low < self.l20:
self.inBuy = False
elif self.low < self.stopPrice:
self.inBuy = False
# 如果没有头寸 且突破 上次盈利 就标记为虚假头寸
if not self.inBuy_p and self.high > self.h20 and self.win:
self.fake = True
self.fakeBuyPrice = self.close
# 当上个为假订单,在此时平仓,关闭假订单 并根据盈利情况赋值给win
if self.fake_p and self.inBuy_p and not self.inBuy:
self.fake = False
self.win = self.close >= self.fakeBuyPrice
# 突破h55时始终为真订单
self.fake = False if self.high > self.h55 else self.fake
# 当突破l1 2时记录多头水平
self.longLevel = 'L1' if not self.inBuy_p and self.high > self.h20 else None
if (not self.inBuy_p or (self.inBuy_p and self.fake)) and self.high > self.h55:
self.longLevel = 'L2'
if self.longLevel is not None:
self.mode = self.longLevel
if self.longLevel in ['L1', 'L2']:
self.buyPrice = self.close
self.totalBuys = 1
self.stopPrice = self.close - (self.stop_input * self.n)
self.nextBuyPrice = self.close + (self.pyramid_input * self.n)
# 当加仓时
if self.longLevel is None and self.inBuy_p and self.high > self.nextBuyPrice and self.totalBuys < self.max_units:
self.longLevel = 'P'
self.buyPrice = self.close
self.totalBuys += 1
self.stopPrice = self.close - (self.stop_input * self.n)
self.nextBuyPrice = self.close + (self.pyramid_input * self.n)
# Tracks stops and exits, marking them with SG or SR
if self.position_avg_price is not None:
if self.longLevel is None and self.inBuy_p and self.low < self.stopPrice and self.close >= self.position_avg_price:
self.longLevel = 'SG'
elif self.longLevel is None and self.inBuy_p and self.low < self.stopPrice and self.close < self.position_avg_price:
self.longLevel = 'SR'
elif self.longLevel is None and self.mode_p == 'L1' and self.inBuy_p and (
self.low < self.l10) and self.close >= self.position_avg_price:
self.longLevel = 'SG'
elif self.longLevel is None and self.mode_p == 'L2' and self.inBuy_p and (
self.low < self.l20) and self.close >= self.position_avg_price:
self.longLevel = 'SG'
elif self.longLevel is None and self.mode_p == 'L1' and self.inBuy_p and (
self.low < self.l10) and self.close < self.position_avg_price:
self.longLevel = 'SR'
elif self.longLevel is None and self.mode_p == 'L2' and self.inBuy_p and (
self.low < self.l20) and self.close < self.position_avg_price:
self.longLevel = 'SR'
# Tracks if the trade was a win or loss.
if self.longLevel == 'SG':
self.win = True
if self.longLevel == 'SR':
self.win = False
# Variables used to tell strategy when to enter/exit trade.
self.enter_long = (self.longLevel == 'L1' or self.longLevel == 'L2' or self.longLevel == 'P') and not self.fake
self.exit_long = (self.longLevel == 'SG' or self.longLevel == 'SR') and (not self.fake)
self.risk = (self.initial_capital + self.netprofit) * self.riskPercent
self.shares = float(self.risk / (self.stop_input * self.n))
if self.position_avg_price is not None:
self.capitalLeft = self.initial_capital + self.netprofit - (self.position_size * self.position_avg_price)
else:
self.capitalLeft = self.initial_capital + self.netprofit
if self.shares * self.close > self.capitalLeft:
self.shares = max(0.0, float(self.capitalLeft / self.close))
self.shares = max(0.0, self.shares)
# 做多时更新入场价
if self.enter_long:
self.position_size += self.shares # 头寸的大小
self.position_price += self.close * self.shares # 头寸成本增加
self.position_avg_price = self.position_price / self.position_size # 市场的平均入场价
print(
f"多{self.shares}张,price:{self.close},占:{self.shares * self.close / self.initial_capital * 100}%,{datetime.datetime.fromtimestamp(int(self.ks[0][0]) / 1000.0)}")
self.sxf += (self.shares * self.close) * 0.0008
if self.shares != 0.0:
self.buy(int(self.shares * self.close))
pass
if self.exit_long:
print(
f"平{self.position_size}张,price:{self.close},{datetime.datetime.fromtimestamp(int(self.ks[0][0]) / 1000.0)}")
self.netprofit += (self.close - self.position_avg_price) * self.position_size # 已完成交易的利润
self.position_size = 0.0 # 头寸的大小
self.position_avg_price = None # 市场的平均入场价
self.position_price = 0.0 # 头寸sum
print(f"交易盈利:{self.netprofit},手续费{self.sxf}")
self.sell()
# 指标数值更新
def reload(self, k):
if self.ks[0][0] == k[0][0]:
self.ks[0] = k[0]
else:
self.ks.insert(0, k[0])
del self.ks[self.max_line - 1]
# 更新atr值
self.atr = self.reload_atr(self.ks)
self.n = self.atr
# 更新各周期最大最小值
self.l10, self.h10 = kline.hl_find(self.ks, 10)
self.l20, self.h20 = kline.hl_find(self.ks, 20)
self.l55, self.h55 = kline.hl_find(self.ks, 55)
self.run_s()
@staticmethod
def hl_find(data, window):
d = data[0:window]
# o h l c
# ['1709080200000', '57031.1', '57107.4', '57031', '57102.7', '63367', '63.367', '3616608.9098', '0']
# 初始化最大值和最小值为第一个子列表的第2个元素 本次k线不算
min_value = float(data[1][3])
max_value = float(data[1][2])
# 遍历剩余子列表,更新最大值和最小值
for sublist in d[1:]:
if float(sublist[3]) < min_value:
min_value = float(sublist[3])
if float(sublist[2]) > max_value:
max_value = float(sublist[2])
return min_value, max_value
@staticmethod
def calculate_sma(data, window):
return sum(data[:window]) / window
@staticmethod
def reload_atr(ks):
# TR = max(high - low, abs(high - close[1]), abs(low - close[1]))
count = len(ks)
tr = [0.0] * count
# o h l c
# ['1709080200000', '57031.1', '57107.4', '57031', '57102.7', '63367', '63.367', '3616608.9098', '0']
for i in range(count - 1, -1, -1):
if i == count - 1:
tr[i] = float(ks[i][2]) - float(ks[i][3])
else:
tr[i] = max(float(ks[i][2]) - float(ks[i][3]), abs(float(ks[i][2]) - float(ks[i + 1][4])),
abs(float(ks[i][3]) - float(ks[i + 1][4])))
# ATR = SMA(TR, n)
# 计算SMA,时间段为3
atr = kline.calculate_sma(tr, 20)
return atr
def publicCallback(message):
# 解析字符串为Python对象
global kline
message = json.loads(message)
if 'data' in message:
k = message['data']
kline.reload(k)
pass
async def main():
# url = "wss://wspap.okex.com:8443/ws/v5/public?brokerId=9999"
url = "wss://wspap.okx.com:8443/ws/v5/business?brokerId=9999"
ws = WsPublicAsync(url=url)
await ws.start()
args = []
arg1 = {"channel": "candle1m", "instId": f"{id}"}
args.append(arg1)
await ws.subscribe(args, publicCallback)
await asyncio.sleep(9999999999999)
print("-----------------------------------------unsubscribe all--------------------------------------------")
await ws.unsubscribe(args, None)
if __name__ == '__main__':
kline = Kline(55)
asyncio.run(main())
"""
md2 = MarketData.MarketAPI(flag='1', debug=False)
before_s = 1708120700000
# 从2023开始, before="1708099200000", after="1708120800000"
kline = Kline(55, before="1707926400000", after="1708120800000")
for i in range(100000):
time.sleep(0.2)
ret3 = \
md2.get_mark_price_candlesticks_chai(instId=id, bar=bar, before=str(before_s),
after=str(before_s + 3600000))[
'data']
before_s += 3600000
kline.reload(ret3)
"""
点击查看代码
def buy(self, size):
msg = self.tradeAPI.place_order(instId=id,
tdMode="cash",
clOrdId="b15",
side="buy",
ordType="market",
sz=f"{size}")
money = self.capitalLeft
# 查看账户余额
result2 = self.accountAPI.get_account_balance()
for i in result2["data"][0]["details"]:
if i["ccy"]=="USDT":
money=float(i["cashBal"])
print(money)
self.capitalLeft = money # 总资金
return msg
买完之后更新下持仓
还有 如果程序发错了错误的买卖指令或者明显不合理的指令 要有函数能拦截 不让指令发出 这个功能我是这样实现的
点击查看代码
# size 是真实的买单金额
def check(self,size,warning_value=0.5):
if size>self.capitalLeft*warning_value:
self.sell()
exit("order size>warning_value !")
在买入指令下达前直接检测金额是否超过警戒值 如果超过 全部头寸卖出后退出程序
其实对接还是比较简单的 几乎没有难对 就是对着api文档编程 难点全在买卖系统的建模 传统海龟交易模型 有很大的漏洞 当交易成本(手续费,流动性)太大,会在大量的止损单中损失大部分本金,这些买卖模型的建模需要非常复杂的逻辑实现 现代的交易系统大多使用强化学习,统计分析等手段实现,这里不做过多解释,最后给出程序的流程图希望对大家的投机或者赌博有所帮助。
这个流图没画完 原版还有虚假头寸和自保险下单的逻辑还有仓位控制 逻辑太复杂 懒得画了,感兴趣参考最开始我写的代码吧