21.6 数据预处理
数据预处理是训练高质量机器学习模型的关键步骤,在这一步需要检查缺失数据并进行特征工程,以将数据转换为适合模型训练的状态。本项目的数据预处理江湾城以下工作:
- 添加技术指标:在实际交易中,需要考虑各种信息,例如历史股价、当前持仓股票、技术指标等。本文演示了两个趋势跟踪技术指标:MACD和RSI。
- 添加紧急指数:风险厌恶反映了投资者是否选择保留资本,它还在面对不同市场波动水平时影响交易策略。为了在最坏的情况下控制风险,比如2007-2008年的金融危机,FinRL使用了金融紧急指数来衡量极端资产价格波动。
注意:风险厌恶是指个体或投资者对于面临潜在风险时的心理和行为倾向。在金融领域,风险厌恶通常表现为投资者更倾向于选择相对较低的风险投资,即使这可能意味着较低的收益。
1. 技术指标
(1)使用类FeatureEngineer对金融数据进行预处理和特征工程。通过设置参数use_technical_indicator=True,启用了技术指标(如MACD和RSI),为模型提供更多市场趋势和力量的信息。通过user_defined_feature = False禁用了风险紧急指数,表示不考虑极端波动对模型的影响。最后,通过preprocess_data方法对数据进行标准化和处理缺失值等操作,为后续的强化学习模型训练提供准备。这些步骤旨在提高模型的性能和对金融市场行为的理解。
fe = FeatureEngineer(
use_technical_indicator=True,
use_turbulence=False,
user_defined_feature = False)
df = fe.preprocess_data(df)
(2)获取数据框 df 的形状,返回一个表示数据框维度的元组(行数,列数)。
df.shape
上述代码的目的是查看数据经过预处理和特征工程后的规模,即数据框中的行数和列数。通过df.shape,可以确认处理后的数据的规模,确保数据准备步骤没有导致数据维度的意外变化。执行后会输出:
(97524, 17)
(3)通过如下代码显示 DataFrame df 的前几行数据,目的是展示经过预处理和特征工程后的数据的头部,以便查看数据的结构和内容。
df.head()
执行后会输出:
# 将协方差矩阵作为状态添加
df = df.sort_values(['date', 'tic'], ignore_index=True)
df.index = df.date.factorize()[0]
cov_list = [] # 存储协方差矩阵
return_list = [] # 存储收益率
# 回看窗口为一年
lookback = 252
for i in range(lookback, len(df.index.unique())):
# 提取过去一年的数据
data_lookback = df.loc[i - lookback:i, :]
price_lookback = data_lookback.pivot_table(index='date', columns='tic', values='close')
return_lookback = price_lookback.pct_change().dropna()
return_list.append(return_lookback)
# 计算协方差矩阵
covs = return_lookback.cov().values
cov_list.append(covs)
# 创建包含协方差矩阵和收益率的数据框
df_cov = pd.DataFrame({'date': df.date.unique()[lookback:], 'cov_list': cov_list, 'return_list': return_list})
df = df.merge(df_cov, on='date')
df = df.sort_values(['date', 'tic']).reset_index(drop=True)
2. 将协方差矩阵添加为状态
在金融建模的背景下,特别是在投资组合优化和风险管理中,协方差矩阵是一个关键的度量标准。它捕捉了投资组合中不同资产之间运动关系的程度,为整体风险和分散化提供了洞察。通过将协方差矩阵添加为状态,模型能能够更全面地理解不同资产之间的相互关系和依赖性。这有助于提高模型对整体市场风险和资产关联性的认识。
(1)如下代码将协方差矩阵作为状态加入数据集,实现了对金融数据的处理。通过对过去一年的股票收益率数据计算协方差矩阵,提取了相应的协方差和收益率信息,并将其添加到数据框中。这有助于模型更全面地理解股票之间的关联性和风险特征,为后续强化学习模型的训练提供更丰富的状态信息。
# 将协方差矩阵作为状态添加
df = df.sort_values(['date', 'tic'], ignore_index=True)
df.index = df.date.factorize()[0]
cov_list = [] # 存储协方差矩阵
return_list = [] # 存储收益率
# 回看窗口为一年
lookback = 252
for i in range(lookback, len(df.index.unique())):
# 提取过去一年的数据
data_lookback = df.loc[i - lookback:i, :]
price_lookback = data_lookback.pivot_table(index='date', columns='tic', values='close')
return_lookback = price_lookback.pct_change().dropna()
return_list.append(return_lookback)
# 计算协方差矩阵
covs = return_lookback.cov().values
cov_list.append(covs)
# 创建包含协方差矩阵和收益率的数据框
df_cov = pd.DataFrame({'date': df.date.unique()[lookback:], 'cov_list': cov_list, 'return_list': return_list})
df = df.merge(df_cov, on='date')
df = df.sort_values(['date', 'tic']).reset_index(drop=True)
(2)通过df.shape获取 DataFrame df 的形状,返回一个表示数据框维度的元组(行数,列数)。
df.shape
上述代码的目的是查看数据框 df 经过协方差矩阵添加后的规模,即数据框中的行数和列数。通过df.shape,你可以确认处理后的数据的规模,确保数据维度的正确性。执行后会输出:
(90468, 19)
(3)通过如下代码显示 DataFrame df 的前几行数据。这行代码的目的是展示经过协方差矩阵添加后的数据的头部,以便查看数据的结构和内容。通过这种方式,你可以快速了解处理后数据的格式,包括日期、股票代码、技术指标、协方差矩阵等信息。这有助于确保数据准备过程的正确性,为后续建模和分析提供良好的基础。
df.head()
执行后会输出:
date open high low close adjcp volume tic day macd boll_ub boll_lb rsi_30 cci_30 dx_30 close_30_sma close_60_sma cov_list return_list
0 2008-12-31 3.070357 3.133571 3.047857 3.048214 2.602662 607541200 AAPL 2 -0.097446 3.649552 2.895305 42.254771 -80.847207 16.129793 3.243631 3.375887 [[0.001348968986171653, 0.00042841264280825875... tic AAPL AMGN AXP ...
1 2008-12-31 57.110001 58.220001 57.060001 57.750000 43.587837 6287200 AMGN 2 0.216368 58.947401 56.388599 51.060614 51.895357 10.432018 56.671334 56.044333 [[0.001348968986171653, 0.00042841264280825875... tic AAPL AMGN AXP ...
2 2008-12-31 17.969999 18.750000 17.910000 18.549999 14.852879 9625600 AXP 2 -1.191668 23.723023 16.106977 42.521170 -74.811722 25.776759 20.030000 22.412000 [[0.001348968986171653, 0.00042841264280825875... tic AAPL AMGN AXP ...
3 2008-12-31 41.590000 43.049999 41.500000 42.669998 32.005894 5443100 BA 2 -0.391219 42.894634 38.486366 47.290375 157.922391 5.366299 40.432000 43.304500 [[0.001348968986171653, 0.00042841264280825875... tic AAPL AMGN AXP ...
4 2008-12-31 43.700001 45.099998 43.700001 44.669998 30.416977 6277400 CAT 2 0.979845 45.785565 38.404435 51.073052 98.904653 26.331746 40.266000 39.918333 [[0.001348968986171653, 0.00042841264280825875... tic AAPL AMGN AXP ...
21.7 构建交易环境
考虑到自动股票交易任务的随机性和互动性,在本项目中将金融任务建模为马尔可夫决策过程(Markov Decision Process,MDP)问题。在训练过程观察股价的变化、执行操作以及奖励计算,使代理根据奖励调整其策略。通过与环境互动,交易代理将制定随着时间推移而最大化奖励的交易策略。
本项目的交易环境基于OpenAI Gym框架实现,根据时间驱动模拟的原则模拟实时股票市场,使用真实的市场数据。
1. 训练数据拆分
(1)使用data_split函数将数据集df划分为训练集和交易集,这样的数据集划分是为了在模型训练阶段使用train集,而在模型训练后的回测或实际应用中使用trade集。
train = data_split(df, '2009-01-01','2020-07-01')
#trade = data_split(df, '2020-01-01', config.END_DATE)
对上述代码的具体说明如下所:
- train = data_split(df, '2009-01-01','2020-07-01'):将数据集在时间范围从2009年1月1日到2020年7月1日进行划分,生成训练集。
- #trade = data_split(df, '2020-01-01', config.END_DATE):被注释掉的这行代码,是指将数据集在时间范围从2020年1月1日到config.END_DATE进行划分,生成交易集。由于被注释掉,该行代码当前并未执行。
(2)通过如下代码,显示训练集train的前几行数据,以便能够查看数据的结构和内容。
train.head()
执行后会输出:
date open high low close adjcp volume tic day macd boll_ub boll_lb rsi_30 cci_30 dx_30 close_30_sma close_60_sma cov_list return_list
0 2009-01-02 3.067143 3.251429 3.041429 3.241071 2.767330 746015200 AAPL 4 -0.082758 3.633600 2.892864 45.440193 -30.508777 2.140064 3.244631 3.376833 [[0.001366150662406761, 0.000433938195725591, ... tic AAPL AMGN AXP ...
0 2009-01-02 58.590000 59.080002 57.750000 58.990002 44.523743 6547900 AMGN 4 0.320448 59.148360 56.339640 52.756859 94.549630 0.814217 56.759667 56.166000 [[0.001366150662406761, 0.000433938195725591, ... tic AAPL AMGN AXP ...
0 2009-01-02 18.570000 19.520000 18.400000 19.330000 15.477422 10955700 AXP 4 -1.059847 23.489423 16.086577 43.923322 -42.018825 16.335101 20.028333 22.263333 [[0.001366150662406761, 0.000433938195725591, ... tic AAPL AMGN AXP ...
0 2009-01-02 42.799999 45.560001 42.779999 45.250000 33.941101 7010200 BA 4 -0.019566 43.926849 37.932151 50.664690 275.696308 20.494464 40.621667 43.237334 [[0.001366150662406761, 0.000433938195725591, ... tic AAPL AMGN AXP ...
0 2009-01-02 44.910000 46.980000 44.709999 46.910000 31.942251 7117200 CAT 4 1.248426 46.543072 38.372928 53.534743 131.675975 34.637448 40.623333 39.911333 [[0.001366150662406761, 0.000433938195725591, ... tic AAPL AMGN AXP ...
2. 投资组合配置(Portfolio Allocation)环境
"Portfolio Allocation"环境是针对投资组合分配任务设计的强化学习环境。在这个环境中,模型的目标是通过动态调整投资组合中各个资产的权重,以最大化投资组合的价值或收益。这个环境需要考虑股票市场的波动性、交易成本、风险管理等因素,以制定有效的投资策略。这种环境的设计旨在模拟真实的投资情境,帮助强化学习模型学习并优化投资组合配置。
(1)定义类StockPortfolioEnv,这是一个基于 OpenAI Gym 实现的股票交易环境
class StockPortfolioEnv(gym.Env):
"""用于OpenAI Gym的单一股票交易环境
属性
----------
df: DataFrame
输入数据
stock_dim : int
唯一股票的数量
hmax : int
最大交易股数
initial_amount : int
初始资金
transaction_cost_pct: float
每笔交易的交易成本百分比
reward_scaling: float
奖励的缩放因子,有助于训练
state_space: int
输入特征的维度
action_space: int
等于股票的维度
tech_indicator_list: list
技术指标名称的列表
turbulence_threshold: int
控制风险厌恶的阈值
day: int
控制日期的增量数
方法
-------
step()
在每个步骤中,代理将返回动作,然后
我们将计算奖励,并返回下一个观察。
reset()
重置环境
render()
使用render返回其他功能
save_asset_memory()
返回每个时间步的账户价值
save_action_memory()
返回每个时间步的动作/仓位
"""
metadata = {'render.modes': ['human']}
(2)__init__ 方法:是Python类的构造函数,在创建类实例时自动调用。在StockPortfolioEnv类中,__init__ 方法用于初始化环境的各种属性和参数。
def __init__(self,
df,
stock_dim,
hmax,
initial_amount,
transaction_cost_pct,
reward_scaling,
state_space,
action_space,
tech_indicator_list,
turbulence_threshold=None,
lookback=252,
day = 0):
#super(StockEnv, self).__init__()
#money = 10 , scope = 1
self.day = day
self.lookback=lookback
self.df = df
self.stock_dim = stock_dim
self.hmax = hmax
self.initial_amount = initial_amount
self.transaction_cost_pct =transaction_cost_pct
self.reward_scaling = reward_scaling
self.state_space = state_space
self.action_space = action_space
self.tech_indicator_list = tech_indicator_list
# 动作空间标准化,形状为 self.stock_dim
self.action_space = spaces.Box(low=0, high=1, shape=(self.action_space,))
# 形状为 (34, 30) 的协方差矩阵 + 技术指标
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.state_space + len(self.tech_indicator_list), self.state_space))
# 从 pandas 数据框加载数据
self.data = self.df.loc[self.day, :]
self.covs = self.data['cov_list'].values[0]
self.state = np.append(np.array(self.covs), [self.data[tech].values.tolist() for tech in self.tech_indicator_list], axis=0)
self.terminal = False
self.turbulence_threshold = turbulence_threshold
# 初始化状态:初始投资组合回报 + 单个股票回报 + 单个权重
self.portfolio_value = self.initial_amount
# 每步记住投资组合价值
self.asset_memory = [self.initial_amount]
# 每步记住投资组合回报
self.portfolio_return_memory = [0]
self.actions_memory = [[1 / self.stock_dim] * self.stock_dim]
self.date_memory = [self.data.date.unique()[0]]
(3)step:是强化学习环境中的一个关键方法,定义了在每个时间步执行的操作。在StockPortfolioEnv类中,step 方法用于执行一个时间步的交易操作,并返回相关的环境信息。方法step的具体说明如下所示。
- actions:接收代理程序选择的动作,即投资组合中每个股票的权重。这些权重用于确定在该时间步内如何分配资金。
- 检查环境是否已经到达时间序列的末尾(self.terminal),如果到达末尾,将生成并保存有关投资组合表现的一些图表,并输出总资产、夏普比率等信息。如果时间序列尚未结束,方法将执行代理程序选择的动作,计算并返回奖励、新的观察状态以及是否已经到达时间序列末尾的信息。
- 方法step还用于更新环境的内部状态,包括日期、股票价格等,并将投资组合的相关信息保存在内存中供后续分析使用。
def step(self, actions):
# print(self.day)
self.terminal = self.day >= len(self.df.index.unique()) - 1
# print(actions)
if self.terminal:
df = pd.DataFrame(self.portfolio_return_memory)
df.columns = ['daily_return']
plt.plot(df.daily_return.cumsum(), 'r')
plt.savefig('results/cumulative_reward.png')
plt.close()
plt.plot(self.portfolio_return_memory, 'r')
plt.savefig('results/rewards.png')
plt.close()
print("=================================")
print("begin_total_asset:{}".format(self.asset_memory[0]))
print("end_total_asset:{}".format(self.portfolio_value))
df_daily_return = pd.DataFrame(self.portfolio_return_memory)
df_daily_return.columns = ['daily_return']
if df_daily_return['daily_return'].std() != 0:
sharpe = (252 ** 0.5) * df_daily_return['daily_return'].mean() / df_daily_return[
'daily_return'].std()
print("Sharpe: ", sharpe)
print("=================================")
return self.state, self.reward, self.terminal, {}
else:
# print("Model actions: ",actions)
# 动作是投资组合权重
# 标准化为和为1
# if (np.array(actions) - np.array(actions).min()).sum() != 0:
# norm_actions = (np.array(actions) - np.array(actions).min()) / (np.array(actions) - np.array(actions).min()).sum()
# else:
# norm_actions = actions
weights = self.softmax_normalization(actions)
# print("Normalized actions: ", weights)
self.actions_memory.append(weights)
last_day_memory = self.data
# 加载下一个状态
self.day += 1
self.data = self.df.loc[self.day, :]
self.covs = self.data['cov_list'].values[0]
self.state = np.append(np.array(self.covs),
[self.data[tech].values.tolist() for tech in self.tech_indicator_list], axis=0)
# print(self.state)
# 计算投资组合回报
# 单个股票的回报 * 权重
portfolio_return = sum(((self.data.close.values / last_day_memory.close.values) - 1) * weights)
# 更新投资组合价值
new_portfolio_value = self.portfolio_value * (1 + portfolio_return)
self.portfolio_value = new_portfolio_value
# 保存到记忆中
self.portfolio_return_memory.append(portfolio_return)
self.date_memory.append(self.data.date.unique()[0])
self.asset_memory.append(new_portfolio_value)
# 奖励是新的投资组合价值或结束投资组合价值
self.reward = new_portfolio_value
# print("Step reward: ", self.reward)
# self.reward = self.reward*self.reward_scaling
return self.state, self.reward, self.terminal, {}
(4)reset():用于重置交易环境的状态。在训练或评估过程中,当需要重新开始时,可以调用此方法。
def reset(self):
self.asset_memory = [self.initial_amount]
self.day = 0
self.data = self.df.loc[self.day, :]
# 加载状态
self.covs = self.data['cov_list'].values[0]
self.state = np.append(np.array(self.covs),
[self.data[tech].values.tolist() for tech in self.tech_indicator_list], axis=0)
self.portfolio_value = self.initial_amount
# self.cost = 0
# self.trades = 0
self.terminal = False
self.portfolio_return_memory = [0]
self.actions_memory = [[1 / self.stock_dim] * self.stock_dim]
self.date_memory = [self.data.date.unique()[0]]
return self.state
(5)render():用于返回当前环境状态的表示,用于可视化或记录。在人类可读的模式下返回当前状态。
def render(self, mode='human'):
return self.state
(6)softmax_normalization():对动作进行 softmax 标准化操作,以确保动作概率的有效性。
def softmax_normalization(self, actions):
numerator = np.exp(actions)
denominator = np.sum(np.exp(actions))
softmax_output = numerator / denominator
return softmax_output
(7)save_asset_memory():用于返回每个时间步的账户价值,用于记录和分析模型的性能。
def save_asset_memory(self):
date_list = self.date_memory
portfolio_return = self.portfolio_return_memory
# print(len(date_list))
# print(len(asset_list))
df_account_value = pd.DataFrame({'date': date_list, 'daily_return': portfolio_return})
return df_account_value
(8)save_action_memory():用于返回每个时间步的动作/持仓,记录和分析模型的交易决策。
def save_action_memory(self):
# 日期和收盘价的长度必须与动作的长度匹配
date_list = self.date_memory
df_date = pd.DataFrame(date_list)
df_date.columns = ['date']
action_list = self.actions_memory
df_actions = pd.DataFrame(action_list)
df_actions.columns = self.data.tic.values
df_actions.index = df_date.date
# df_actions = pd.DataFrame({'date':date_list,'actions':action_list})
return df_actions
(9)_seed():这是一个种子生成器函数,用于确保环境在不同运行中具有相同的初始状态。
def _seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
(10)get_sb_env():用于获取 Stable-Baselines3 环境,将当前环境包装为适用于 Stable-Baselines3 库的向量化环境。
def get_sb_env(self):
e = DummyVecEnv([lambda: self])
obs = e.reset()
return e, obs
在强化学习中,通常会使用向量化环境(Vectorized Environment)来提高训练效率。Stable-Baselines3(SB3)库中的向量化环境是指一次执行多个并发环境的能力。与传统的环境相比,向量化环境允许同时在多个环境实例上执行模型的前进和训练步骤。
在SB3中使用的向量化环境,主要是通过类DummyVecEnv实现。类DummyVecEnv将多个环境实例包装在一起,使其表现得像单个环境。这种环境的向量化可以带来训练速度的显著提升,特别是在使用深度学习模型进行训练时,因为模型的计算可以在多个环境之间并行进行。在使用SB3进行强化学习时,通过使用向量化环境,可以更有效地利用硬件资源,加速训练过程。
(11)下面代码首先计算了训练数据集中不同股票(tic是股票代码)的数量,然后将该数量分配给变量stock_dimension。接着,将stock_dimension的值赋给state_space变量。最后,打印输出了股票维度(即不同股票的数量)和状态空间的维度。
stock_dimension = len(train.tic.unique())
state_space = stock_dimension
print(f"Stock Dimension: {stock_dimension}, State Space: {state_space}")
上述代码的目的可能是在股票交易环境中设置环境参数,其中stock_dimension表示股票的数量,而state_space表示状态空间的维度。执行后会输出:
Stock Dimension: 28, State Space: 28
(12)下面这段代码创建了一个股票交易环境e_train_gym,并使用之前定义的参数字典env_kwargs来设置环境的各种参数。具体来说,包括如下所示的参数。
- "hmax":最大交易数量,限制每次交易的最大股票数量。
- "initial_amount":初始资金,代表交易的起始资金。
- "transaction_cost_pct":交易成本百分比,表示每次交易的手续费百分比。
- "state_space":状态空间的维度,这里使用之前计算的state_space。
- "stock_dim":股票的数量,这里使用之前计算的stock_dimension。
- "tech_indicator_list":技术指标列表,包含在训练环境中使用的技术指标。
- "action_space":动作空间的维度,这里设置为股票的数量。
- "reward_scaling":奖励缩放因子,用于调整奖励的规模。
这样,通过传递这些参数,我们创建的股票交易环境就具备了相应的特性和限制。
env_kwargs = {
"hmax": 100,
"initial_amount": 1000000,
"transaction_cost_pct": 0.001,
"state_space": state_space,
"stock_dim": stock_dimension,
"tech_indicator_list": config.INDICATORS,
"action_space": stock_dimension,
"reward_scaling": 1e-4
}
e_train_gym = StockPortfolioEnv(df = train, **env_kwargs)
(13)通过get_sb_env()方法获取库Stable-Baselines3的向量化环境env_train,然后打印其类型。通过使用get_sb_env()方法,原始的股票交易环境被包装成了Stable-Baselines3库中的向量化环境,以便与该库中的强化学习算法进行集成。
env_train, _ = e_train_gym.get_sb_env()
print(type(env_train))
在上述代码中,print(type(env_train))语句用于打印输出env_train的类型,以确认其为Stable-Baselines3中的环境类型。这通常是DummyVecEnv类型,这是Stable-Baselines3库中用于包装环境的向量化环境。执行后会输出:
<class 'stable_baselines3.common.vec_env.dummy_vec_env.DummyVecEnv'>
标签:PyPortfolioOpt,FinRL,21,df,self,list,actions,date,return
From: https://blog.csdn.net/asd343442/article/details/143647464