8:双均线交叉
查看代码
# -*- coding:utf-8 -*-
import backtrader as bt
#####################
import pandas as pd
import os
import datetime
import matplotlib.pyplot as plt
class AceStrategy(bt.Strategy):
params = (
('周期1',5),
('周期2', 20),
)
def __init__(self):
self.dataclose = self.datas[0].close
self.sma_5 = bt.indicators.SimpleMovingAverage(
self.dataclose, period=self.params.周期1)
self.sma_60 = bt.indicators.SimpleMovingAverage(self.dataclose, period=self.params.周期2)
self.买入信号 = bt.indicators.CrossOver(self.sma_5,self.sma_60)
self.卖出信号 = bt.indicators.CrossDown(self.sma_5, self.sma_60)
self.买入信号.plotinfo.plot = False
self.卖出信号.plotinfo.plot = False
def start(self):
pass
# print(f"start!___{self.datas[0].datetime.date(0)}")
def prenext(self):
pass
# print(f"prenext___{self.datas[0].datetime.date(0)}")
def nextstart(self):
pass
# print(f'nextstart___{self.datas[0].datetime.date(0)}')
def next(self):
if not self.position:
if self.买入信号[0] > 0:
self.order = self.buy()
print(f"{self.data0.datetime.date(0)},买入!价格为{self.data0.close[0]}")
else:
if self.卖出信号[0] > 0:
self.order = self.sell()
print(f"{self.data0.datetime.date(0)},卖出!价格为{self.data0.close[0]}")
def stop(self):
print(f"stop___{self.datas[0].datetime.date(0)}")
if __name__ == '__main__':
cerebro = bt.Cerebro()
cerebro.broker.set_cash(100000.00) # 设置初始资金金额
cerebro.broker.setcommission(commission=0.0003)
cerebro = bt.Cerebro(stdstats=False)
# cerebro.addobserver(bt.observers.Broker)
cerebro.addobserver(bt.observers.Trades)
cerebro.addobserver(bt.observers.BuySell)
# cerebro.addobserver(bt.observers.DrawDown)
cerebro.addobserver(bt.observers.Value)
# cerebro.addobserver(bt.observers.TimeReturn)
初始资金 = cerebro.broker.getvalue()
print(f'初始资金:{初始资金}')
数据地址= os.path.join(os.path.join(os.getcwd(),"数据地址"),"002342.csv") #本次是单个,未来可以用循环遍历,列表表达式用if 过滤CSV
# print(数据地址)
data =pd.read_csv(数据地址,index_col = "date",parse_dates = True)
日线 = bt.feeds.PandasData( dataname=data,
fromdate=datetime.datetime(2018, 9, 10),
todate=datetime.datetime(2020, 10, 12)
)
cerebro.adddata(日线)
cerebro.addstrategy(AceStrategy)
cerebro.run()
期末资金 = cerebro.broker.getvalue()
print(f'期末资金:{期末资金}')
cerebro.plot()
9:talib指标调用
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import talib
import os
数据地址 = os.path.join(os.path.join(os.getcwd(), "数据地址"), "002342.csv") # 本次是单个,未来可以用循环遍历,列表表达式用if 过滤CSV
# print(数据地址)
data = pd.read_csv(数据地址, index_col="date", parse_dates=True)
data = data.drop(columns="ma")
# data["MA_10"] = pd.rolling_mean(data["close"],10)
# data["MA_10"] = data["close"].rolling(10).mean()
# data["ma_10_talib"] = talib.MA(np.array(data["close"]),timeperiod = 10)
MA周期= [5,10,20,30,60,120,250]
for i in MA周期:
name = "ma_" + str(i)
data[name] = talib.MA(np.array(data["close"]),timeperiod = i)
data = data.fillna(0.00)
data = data.applymap(lambda x : round(x,2))
print(data)
data['MACD'],data['MACD信号'],data['MACD柱子'] = talib.MACD(np.array(data["close"]),
fastperiod=6, slowperiod=12, signalperiod=9)
data = data.fillna(0.00)
data = data.applymap(lambda x : round(x,2))
print(data)
path = os.path.join(os.path.join(os.getcwd(), "数据地址"), "扩充002342.csv")
data.to_csv( path)
10:扩展数据
查看代码
import backtrader as bt
#####################
import pandas as pd
import os
import datetime
import matplotlib.pyplot as plt
class AcePandasData(bt.feeds.PandasData):
lines = ("ma_5","ma_10","ma_20")
params = (
("ma_5",6),
("ma_10",7),
("ma_20", 8),
)
class AceStrategy(bt.Strategy):
def __init__(self):
self.ma_5 = self.datas[0].ma_5
self.ma_10 = self.data0.ma_10
self.ma_20 = self.data0.ma_20
# self.买入信号 = bt.indicators.CrossOver(self.ma_5,self.ma_10)
# self.卖出信号 = bt.indicators.CrossDown(self.ma_5, self.ma_10)
def start(self):
pass
# print(f"start!___{self.datas[0].datetime.date(0)}")
def prenext(self):
pass
# print(f"prenext___{self.datas[0].datetime.date(0)}")
def nextstart(self):
pass
# print(f'nextstart___{self.datas[0].datetime.date(0)}')
def next(self):
print(self.data0.datetime.date(0),self.data0.ma_5[0])
print(f"日期:{self.data0.datetime.date(0)}, 5日线:{self.data0.ma_5[0]} , 10日线:{self.data0.ma_10[0]} , 20日线:{self.data0.ma_20[0]} ")
# print(self.data0.datetime.date(0),self.data0.ma_5[0])
# print(self.data0.datetime.date(0), self.ma_5[0])
# print(self.data0.datetime.date(0),self.ma_20[0])
# if not self.position:
# if self.买入信号[0] > 0:
# self.order = self.buy()
# print(f"{self.data0.datetime.date(0)},买入!价格为{self.data0.close[0]}")
# else:
# if self.卖出信号[0] > 0:
# self.order = self.sell()
# print(f"{self.data0.datetime.date(0)},卖出!价格为{self.data0.close[0]}")
def stop(self):
print(f"stop___{self.datas[0].datetime.date(0)}")
if __name__ == '__main__':
cerebro = bt.Cerebro(stdstats=False)
cerebro.broker.set_cash(100000.00) # 设置初始资金金额
cerebro.broker.setcommission(commission=0.0003)
cerebro.addobserver(bt.observers.Trades)
cerebro.addobserver(bt.observers.BuySell)
cerebro.addobserver(bt.observers.Value)
初始资金 = cerebro.broker.getvalue()
print(f'初始资金:{初始资金}')
数据地址= os.path.join(os.path.join(os.getcwd(),"数据地址"),"002342_.csv") #本次是单个,未来可以用循环遍历,列表表达式用if 过滤CSV
# print(数据地址)
data =pd.read_csv(数据地址,index_col = "date",parse_dates = True)
# 日线 = bt.feeds.PandasData( dataname=data,
# fromdate=datetime.datetime(2018, 9, 10),
# todate=datetime.datetime(2020, 10, 11)
# )
日线 = AcePandasData( dataname=data,
fromdate=datetime.datetime(2018, 9, 10),
todate=datetime.datetime(2020, 10, 19)
)
cerebro.adddata(日线)
cerebro.addstrategy(AceStrategy)
cerebro.run()
期末资金 = cerebro.broker.getvalue()
print(f'期末资金:{期末资金}')
cerebro.plot()
11:Analyzer
# -*- coding:utf-8 -*-
import backtrader as bt
#####################
import pandas as pd
import os
import datetime
class stampDutyCommissionScheme(bt.CommInfoBase):
params = (
("印花税",0.001),
("commission",0.001)
)
def _getcommission(self, size, price, pseudoexec):
if size>0:
return size*price*self.p.commission
elif size<0:
return abs(size) * (self.p.commission +self.p.印花税)
class AceStrategy(bt.Strategy):
params = (
('三日线周期', 3),
('maperiod_5',5)
)
def __init__(self):
# print(f'init___{self.datas[0].datetime.date(0)}')
self.dataclose = self.data0.close
self.sma_3 = bt.indicators.SimpleMovingAverage(
self.dataclose, period=self.params.三日线周期)
self.sma_5 = bt.indicators.SimpleMovingAverage(
self.data0.close, period=self.params.maperiod_5)
self.order = None
self.买入条件 = bt.indicators.CrossOver(self.sma_3,self.sma_5)
self.卖出条件 = bt.indicators.CrossDown(self.sma_3,self.sma_5)
self.买入条件.plotinfo.plot = False
self.卖出条件.plotinfo.plot = False
# self.sma_3.plotinfo.plot = False
# self.sma_5.plotinfo.plot = False
def start(self):
pass
def prenext(self):
pass
def nextstart(self):
pass
# def notify_order(self, order):
# if order.status in [order.Submitted, order.Accepted]:
# return
# if order.status in [order.Completed]:
# pass
# self.bar_executed = len(self)
# elif order.status in [order.Canceled, order.Margin, order.Rejected]:
# pass
# self.order = None # 无挂起
def next(self):
# if self.order:
# return
if not self.position:
if self.买入条件>0:
self.order = self.buy(size=1000)
# if self.sma_3[0]>self.sma_5[0]:
# self.order = self.buy(size=1000)
print(f"{self.datas[0].datetime.date(0)},买入!价格为{self.dataclose[0]}")
else:
if self.卖出条件 >0:
self.order = self.sell(size=1000)
# if self.sma_3[0]<self.sma_5[0]:
# self.order = self.sell(size=1000)
print(f"{self.datas[0].datetime.date(0)},卖出!价格为{self.dataclose[0]}")
def stop(self):
pass
if __name__ == '__main__':
# cerebro = bt.Cerebro()
cerebro = bt.Cerebro(stdstats=False)
cerebro.broker.set_cash(100000.00) # 设置初始资金金额
cerebro.broker.setcommission(0.01)
# cerebro.addobserver(bt.observers.Broker)
cerebro.addobserver(bt.observers.Trades)
cerebro.addobserver(bt.observers.BuySell)
# cerebro.addobserver(bt.observers.DrawDown)
cerebro.addobserver(bt.observers.Value)
# cerebro.addobserver(bt.observers.TimeReturn)
初始资金 = cerebro.broker.getvalue()
print(f'初始资金:{初始资金}')
数据地址= os.path.join(os.path.join(os.getcwd(),"数据地址"),"002342.csv")
# print(数据地址)
data =pd.read_csv(数据地址)
data.index=pd.to_datetime(data.date)
data.drop(columns=["date"],inplace=True)
# print(data)
日线 = bt.feeds.PandasData( dataname=data,
fromdate=datetime.datetime(2000, 1, 1),
todate=datetime.datetime(2020, 10, 30)
)
cerebro.adddata(日线)
cerebro.addstrategy(AceStrategy)
cerebro.addanalyzer(bt.analyzers.SharpeRatio,riskfreerate=0.02,annualize = True ,_name="夏普比率")
cerebro.addanalyzer(bt.analyzers.DrawDown, _name="回撤")
cerebro.addanalyzer(bt.analyzers.TradeAnalyzer)
策略分析汇总 = cerebro.run()
策略_1 = 策略分析汇总[0]
# print(f"夏普比率:{策略_1.analyzers.夏普比率.get_analysis()['sharperatio']}")
# print(f"最大回撤:{策略_1.analyzers.回撤.get_analysis()}")
# for 信息 in 策略_1.analyzers:
# 信息.print()
cerebro.addwriter(bt.WriterFile,rounding = 2)
期末资金 = cerebro.broker.getvalue()
print(f'期末资金:{期末资金}')
cerebro.plot(style = "candle")
12:order
# -*- coding:utf-8 -*-
import backtrader as bt
#####################
import pandas as pd
import os
import datetime
class Acecommission(bt.CommInfoBase):
params = (
("印花税",0.001),
("commission",0.00025),
)
def _getcommission(self, size, price, pseudoexec):
if size > 0:
return max(size*price*self.params.commission*100,5)
elif size<0:
return abs(size)*price*self.params.印花税 + max(size*price*self.params.commission*100,5)
class AceStrategy(bt.Strategy):
params = (
('三日线周期', 3),
('五日线周期',5)
)
def __init__(self):
self.dataclose = self.data0.close
self.sma_3 = bt.indicators.SimpleMovingAverage(
self.dataclose, period=self.params.三日线周期)
self.sma_5 = bt.indicators.SimpleMovingAverage(
self.data0.close, period=self.params.五日线周期)
self.order = None
self.买入条件 = bt.indicators.CrossOver(self.sma_3,self.sma_5)
self.卖出条件 = bt.indicators.CrossDown(self.sma_3,self.sma_5)
self.买入条件.plotinfo.plot = False
self.卖出条件.plotinfo.plot = False
self.order = None
self.买入价 =None
self.佣金 = None
self.盈利_list=[]
def start(self):
pass
def prenext(self):
print("prenext")
self.next() #如果不写 会比最小周期多1个bar 或者直接覆盖也没关系
def nextstart(self):
pass
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
return
if order.status in [order.Completed]:
if order.isbuy():
print(f"已经买入.价格为{order.executed.price}\n市值:{order.executed.value}\n佣金:{order.executed.comm}")
self.买入价 = order.executed.price
self.佣金 = order.executed.comm
elif order.issell():
print(f"已经卖出.价格为{order.executed.price}\n费用:{order.executed.value}\n佣金:{order.executed.comm}\n")
self.bar_executed = len(self)
# elif order.status in [order.Canceled, order.Margin, order.Rejected]
elif order.status in [order.Canceled]:
print("订单取消\n")
elif order.status in [order.Margin]:
print("订单超时\n")
elif order.status in [order.Rejected]:
print("订单拒绝\n")
self.order = None
def notify_trade(self, trade):
if trade.isclosed:
print(f"毛收益:{round(trade.pnl,2)}......佣金:{round(trade.commission,2)}......收益:{round(trade.pnlcomm,2)}\n\n\n")
self.盈利_list.append(round(trade.pnlcomm,2))
def next(self):
# if self.order:
# return
if not self.position:
if self.买入条件>0:
self.order = self.buy(exectype=bt.Order.StopLimit,price=self.data0.close[-1],plimit=self.data0.close[-1]*0.98,size=100)
# if self.sma_3[0]>self.sma_5[0]:
# self.order = self.buy(size=1000)
print(f"{self.datas[0].datetime.date(0)},触发买入!收盘价格为{self.dataclose[0]}")
else:
if self.卖出条件 >0:
self.order = self.sell(size=100)
# if self.sma_3[0]<self.sma_5[0]:
# self.order = self.sell(size=1000)
print(f"{self.datas[0].datetime.date(0)},触发卖出!收盘价格为{self.dataclose[0]}")
def stop(self):
print(f"总共交易了{len(self.盈利_list)}笔 ,收益损失详情如下:{self.盈利_list}\n")
# for i in self._trades[self.data0][0]:
# print(f"{i.close_datetime()}毛利率:{i.pnl}")
if __name__ == '__main__':
# cerebro = bt.Cerebro()
cerebro = bt.Cerebro(stdstats=False)
cerebro.broker.set_cash(100000.00) # 设置初始资金金额
cerebro.broker.addcommissioninfo(Acecommission(印花税=0.01,commission = 0.00025))
# cerebro.broker.setcommission(0.00025)
# cerebro.addobserver(bt.observers.Broker)
cerebro.addobserver(bt.observers.Trades)
cerebro.addobserver(bt.observers.BuySell)
# cerebro.addobserver(bt.observers.DrawDown)
cerebro.addobserver(bt.observers.Value)
# cerebro.addobserver(bt.observers.TimeReturn)
初始资金 = cerebro.broker.getvalue()
print(f'初始资金:{初始资金}')
数据地址= os.path.join(os.path.join(os.getcwd(),"数据地址"),"300579.csv")
# print(数据地址)
data =pd.read_csv(数据地址,index_col = "trade_date",parse_dates = True)
data.rename(columns={"vol":"volume"},inplace=True)
# data["date"] = data["date"].map(lambda x: pd.to_datetime(str(x)).strftime("%Y-%m-%d"))
# data["date"] = data["date"].map(lambda x: pd.to_datetime(str(x)).strftime("%Y-%m-%d"))
# data.index=pd.to_datetime(data.date)
# data.drop(columns=["date"],inplace=True)
# print(data)
日线 = bt.feeds.PandasData( dataname=data,
fromdate=datetime.datetime(2019, 11, 1),
todate=datetime.datetime(2020, 10, 30)
)
cerebro.adddata(日线)
cerebro.addstrategy(AceStrategy)
cerebro.run()
期末资金 = cerebro.broker.getvalue()
print(f'期末资金:{期末资金}')
# cerebro.plot(style = "candle")
cerebro.plot()
13:ACE_SMA
# -*- coding: utf-8 -*-
"""
Spyder Editor
This is a temporary script file.
"""
import os
import pandas as pd
import numpy as np
import talib as tb
import datetime
os.chdir(r"E:\量化\ACE_backtrader\ACE量化回测学习\数据地址")
#SMA(X,N,M),求X的N日移动平均,M为权重。算法:若Y=SMA(X,N,M) 则 Y=(M*X+(N-M)*Y')/N,其中Y'表示上一周期Y值,N必须大于M
def ace_sma(df,lb,n,m,倍数=1):
df_1 = df.copy()
value_list=[]
close_list = df_1[str(lb)].to_list()
for i in range(len(df_1)):
if i < n:
Y=sum(close_list[:i+1])/(i+1)
value_list.append(Y)
else:
Y = (m*close_list[i] +(n-m)*value_list[i-1])/n
value_list.append(Y)
value_list = [i * 倍数 for i in value_list]
name = "SMA_"+str(lb)+"_" +str(n)
df_1[name] = value_list
df_1 = df_1[[name]]
df_2 = pd.merge(df,df_1,left_index = True,right_index = True,how="left")
return df_2
if __name__ == '__main__':
data = pd.read_csv("002342.csv",index_col = "date",parse_dates = True)
周期 = [5,10,20]
for i in 周期:
data= ace_sma(data,"close",i,1,1)
14:取消订单
# -*- coding:utf-8 -*-
import backtrader as bt
#####################
import pandas as pd
import os
import datetime
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei']
class Acecommission(bt.CommInfoBase):
params = (
("印花税",0.001),
("commission",0.001)
)
def _getcommission(self, size, price, pseudoexec):
if size>0:
return max(size*price*self.p.commission*100,5)
# return size * price * self.p.commission
elif size<0:
return abs(size) * price*self.p.印花税 + max(abs(size)*price*self.p.commission*100,5)
# return abs(size) * price * self.p.印花税
class AceDataframe(bt.feeds.PandasData):
lines = ("三十分钟买点日线支撑",)
params = (
("三十分钟买点日线支撑",10),
)
class AceStrategy(bt.Strategy):
def __init__(self):
self.dic = dict() #用于辨别code
for i ,d in enumerate(self.datas):
self.dic[d] = dict()
self.dic[d]["三十分钟买点日线支撑"] = d.三十分钟买点日线支撑
self.dic[d]["买入价"]=0
self.dic[d]["卖出价"]=0
self.dic[d]["order"] = None
# self.dic[d]["orderstatus"]=[]
# self.dic[d].order = None
# self.dic[d].买点买价 = 0.0
# self.dic[d].买点触发 = 0.0
def start(self):
pass
def prenext(self):
self.next()
def nextstart(self):
pass
def notify_order(self, order):
if order.status in [order.Submitted, order.Accepted]:
return
if order.status in [order.Completed]:
if order.isbuy():
print(f"已经买入.价格为{order.executed.price}\n市值:{order.executed.value}\n佣金:{order.executed.comm}\n{self.datetime.datetime()}\n")
self.买入价 = order.executed.price
self.佣金 = order.executed.comm
elif order.issell():
print(f"已经卖出.价格为{order.executed.price}\n费用:{order.executed.value}\n佣金:{order.executed.comm}\n{self.datetime.datetime()}\n")
self.bar_executed = len(self)
# elif order.status in [order.Canceled, order.Margin, order.Rejected]
elif order.status in [order.Canceled]:
print("订单取消\n")
elif order.status in [order.Margin]:
print("订单超时\n")
elif order.status in [order.Rejected]:
print("订单拒绝\n")
self.order = None
def notify_trade(self, trade):
if trade.isclosed:
print(f"毛收益:{round(trade.pnl, 2)}......佣金:{round(trade.commission, 2)}......收益:{round(trade.pnlcomm,2)}\n\n\n")
def next(self):
for i, d in enumerate(self.datas):
dt, dn = self.datetime.datetime(), d._name # 获取时间及股票代码
pos = self.getposition(d).size
if not pos:
if self.dic[d]['三十分钟买点日线支撑'][0]>0:
if d.close[0] >= self.dic[d]['三十分钟买点日线支撑'][0]:
self.dic[d]["买入价"] = self.dic[d]['三十分钟买点日线支撑'][0]
validday = dt+datetime.timedelta(days=3)
self.dic[d]["order"]=self.cancel(self.dic[d]["order"])
self.dic[d]["order"] = self.buy(exectype=bt.Order.Limit,data=d, size=1000,price=self.dic[d]["买入价"],valid=validday)
self.dic[d]["卖出价"]=self.dic[d]["买入价"]
print(dt, dn)
#
elif pos >0:
if self.dic[d]["卖出价"]*1.05<=d.close[0] or self.dic[d]["卖出价"]*0.98>=d.close[0]:
self.order = self.close()
self.dic[d]["买入价"] = 0
#
# elif pos<0:
# pass
# self.order = self.sell(data = d,size=1000)
def stop(self):
pass
# for i, d in enumerate(self.datas):
# dt, dn = self.datetime.datetime(), d._name # 获取时间及股票代码
# pos = self.getposition(d).size
# if pos>0:
# self.order = self.close()
# print(f"{self.datas[0].datetime.date(0)},卖出!价格为{self.data.close[0]}")
if __name__ == '__main__':
cerebro = bt.Cerebro(stdstats = False)
cerebro.addobserver(bt.observers.Trades)
cerebro.addobserver(bt.observers.BuySell)
cerebro.addobserver(bt.observers.Value)
cerebro.broker.set_cash(100000.00) # 设置初始资金金额
cerebro.broker.addcommissioninfo(Acecommission(印花税=0.001,commission = 0.00025))
初始资金 = cerebro.broker.getvalue()
print(f'初始资金:{初始资金}')
股票池 = [ os.path.join(r"D:\股票数据\三十支撑",i) for i in os.listdir(r"D:\股票数据\三十支撑") ]
for i in range(len(股票池)):
stk_code = 股票池[i].split("\\")[-1].split("_")[0]
data = pd.read_csv(股票池[i], index_col="date", parse_dates=True)
三十分钟线 = AceDataframe(dataname=data,
fromdate=datetime.datetime(2019, 1, 1),
todate=datetime.datetime(2020, 11, 30),
timeframe = bt.TimeFrame.Minutes,
)
cerebro.adddata(三十分钟线, name=stk_code)
cerebro.addstrategy(AceStrategy)
cerebro.run()
期末资金 = cerebro.broker.getvalue()
print(f'期末资金:{期末资金}')
# cerebro.plot(style = "candle")
标签:交叉,均线,self,cerebro,datetime,bt,import,data From: https://www.cnblogs.com/lianshanspeak/p/18108147