写作目的
现在人工智能快速发展,机器学习对我们的生活产生很多影响,对于想从事数据分析的友友们而言,学习机器学习似乎是一项必备技能。自己也是一个应用数学专业的小白,刚刚完成一个机器学习的项目有点过于“简陋”。但是我希望通过分享一些自己的机器学习的一下比亚迪股票预测,帮助一些刚刚入门的友友有一个好的开始,废话少说,开始今天的学习之旅吧
预测背景
新能源汽车产业在中国经济发展中扮演着越来越重要的角色,它不仅代表了中国在技术创新和产业升级方面的成就,也是推动经济转型的关键力量。新能源汽车产业的发展得益于国家政策的大力支持、市场需求的快速增长以及技术创新的不断突破。
比亚迪作为新能源汽车行业的领军企业,其股票表现尤为引人注目。根据东方财富网的数据,比亚迪在多项关键财务指标上均位于行业前列。2022年,比亚迪的营业总收入达到了4240.61亿元,净利润高,净资产大,这反映出公司的经营效益好,信用风险低,具备较强的行业地位和市场影响力。
在新能源汽车产业上市公司全方位对比中,比亚迪以其强劲的产销规模、较高的毛利水平以及显著的研发投入强度,确立了其在行业中的领先地位。同时,比亚迪的市值和股价表现也显示出市场对其长期投资价值的认可。所以此项目以比亚迪的股票进行预测。
模型选择
使用卷积神经网络(CNN)进行股票价格预测可以有效捕捉到比亚迪股票收盘价格与其他几家汽车制造企业股票价格之间的潜在联系。CNN在处理时间序列数据方面表现出色,能够识别出复杂的非线性模式和长期依赖关系,这对于理解不同股票之间的相互作用至关重要。通过分析这些数据,我们可以揭示不同公司股票价格的共同趋势和潜在的市场影响力。
而XGBoost作为一种强大的梯度提升框架,它在处理具有大量特征的回归问题上表现出色。XGBoost通过构建多个决策树并优化它们以最小化预测误差,从而提供准确的预测。在股票预测中,XGBoost可以处理各种影响股票价格的因素,包括宏观经济指标、公司业绩、市场情绪等,通过这种方式,在此项目,我们使用不同公司的收盘价格,作为训练样本。从而,XGBoost能够捕捉到影响比亚迪股票价格的细微变化。
因此,结合CNN和XGBoost的优势,我们可以构建一个更为全面和精确的股票预测模型。CNN可以用于特征提取和模式识别,而XGBoost则可以用于最终的预测和模型优化。通过这种方式,我们可以更深入地理解不同汽车制造商股票价格之间的相互关系,并预测它们在未来市场的表现。这种综合方法不仅提高了预测的准确性,还增强了模型对市场变化的适应性和鲁棒性。
代码详解
1. 导入必要的包
本文使用Pytorch架构进行构建模型
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.metrics import mean_squared_error
2. 数据预处理
# 计算加权移动平均线
def weighted_moving_average(data, window_size):
weights = np.arange(1, window_size + 1)
wma = np.convolve(data, weights[::-1], mode='valid') / weights.sum()
return np.concatenate((data[:window_size - 1], wma))
# 读取数据
file_path = 'fin_sample.csv' # 更新为本地文件路径
data = pd.read_csv(file_path)
# 提取比亚迪(BYD)和其他公司的关闭数据
byd_data = data[data['Name'] == '比亚迪']['Close'].values
other_companies_data = data[data['Name'] != '比亚迪'][['Name', 'Close']]
# 使用加权移动平均线平滑数据
window_size = 5 # 可以调整窗口大小
byd_data_wma = weighted_moving_average(byd_data, window_size)
# 为每个其他公司平滑数据
company_dict_wma = {}
for name, group in other_companies_data.groupby('Name'):
company_data = group['Close'].values
company_data_wma = weighted_moving_average(company_data, window_size)
company_dict_wma[name] = company_data_wma
# 标准化数据
scaler = MinMaxScaler()
byd_data_wma = scaler.fit_transform(byd_data_wma.reshape(-1, 1)).flatten()
# 为每个其他公司标准化数据
for name in company_dict_wma:
company_dict_wma[name] = scaler.fit_transform(company_dict_wma[name].reshape(-1, 1)).flatten()
# 创建输入输出数据
def create_sequences(data, other_data, seq_length):
xs = []
ys = []
for i in range(len(data) - seq_length):
x = np.column_stack([data[i:i + seq_length]] + [other_data[name][i:i + seq_length] for name in other_data])
y = data[i + seq_length]
xs.append(x)
ys.append(y)
return np.array(xs), np.array(ys)
seq_length = 10 # 可以调整序列长度
X, y = create_sequences(byd_data_wma, company_dict_wma, seq_length)
# 分割训练集、验证集和测试集
train_size = 0.7
val_size = 0.1
test_size = 1 - train_size - val_size
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=1-train_size, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=test_size/(val_size+test_size), random_state=42)
# 将数据转换为PyTorch的Tensor格式
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1) # 改变目标张量的形状
y_val_tensor = torch.tensor(y_val, dtype=torch.float32).unsqueeze(1) # 改变目标张量的形状
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1) # 改变目标张量的形状
# 创建数据加载器
batch_size = 16 # 可以调整批量大小
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
# 定义EarlyStopping类
class EarlyStopping:
def __init__(self, patience=5, min_delta=0):
self.patience = patience
self.min_delta = min_delta
self.best_loss = None
self.counter = 0
self.early_stop = False
def __call__(self, val_loss):
if self.best_loss is None:
self.best_loss = val_loss
elif val_loss < self.best_loss - self.min_delta:
self.best_loss = val_loss
self.counter = 0
else:
self.counter += 1
if self.counter >= self.patience:
self.early_stop = True
-
加权移动平均线 (
weighted_moving_average
函数): 这个函数用于计算股票价格数据的加权移动平均值,这是一种平滑技术,可以帮助减少数据中的噪声。 -
数据读取: 脚本从CSV文件中读取数据,然后提取特定公司(例如比亚迪)的收盘价数据和其他公司的收盘价数据。
-
数据平滑: 使用加权移动平均线对比亚迪和其他公司的数据进行平滑处理。
-
数据标准化: 使用
MinMaxScaler
对数据进行标准化,使其范围在0到1之间,这有助于模型训练。 -
创建序列数据 (
create_sequences
函数): 这个函数用于创建模型训练所需的输入输出序列数据。 -
数据集分割: 将数据集分割为训练集、验证集和测试集。
-
数据转换: 将数据转换为PyTorch的Tensor格式,这是PyTorch模型训练所需的数据格式。
-
创建数据加载器: 使用
DataLoader
创建数据加载器,用于在模型训练过程中批量加载数据。 -
定义EarlyStopping类: 这是一个自定义的早停法(Early Stopping)实现,用于在模型训练过程中监控验证损失,如果连续多个周期内损失没有显著改善,则停止训练以避免过拟合。
3. 卷积神经网络构造
# 定义模型、损失函数和优化器
class CNNResNet(nn.Module):
def __init__(self, seq_length, num_features):
super(CNNResNet, self).__init__()
self.conv1 = nn.Conv1d(in_channels=num_features, out_channels=16, kernel_size=2, stride=1)
self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
self.resblock1 = ResidualBlock(16, 32)
self.resblock2 = ResidualBlock(32, 64)
self.flatten = nn.Flatten()
self._calculate_fc1_input_size(seq_length)
def _calculate_fc1_input_size(self, seq_length):
dummy_input = torch.zeros(1, 10, seq_length) # 10 是 num_features
with torch.no_grad():
dummy_output = self._forward_conv_layers(dummy_input)
fc1_input_size = dummy_output.shape[1] * dummy_output.shape[2]
self.fc1 = nn.Linear(fc1_input_size, 50)
self.fc2 = nn.Linear(50, 1)
def _forward_conv_layers(self, x):
x = self.pool(torch.relu(self.conv1(x)))
x = self.resblock1(x)
x = self.resblock2(x)
return x
def forward(self, x):
x = self._forward_conv_layers(x)
x = self.flatten(x)
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
def feature_extract(self, x):
x = self._forward_conv_layers(x)
x = self.flatten(x)
return x
class ResidualBlock(nn.Module):
def __init__(self, in_channels, out_channels):
super(ResidualBlock, self).__init__()
self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
self.bn1 = nn.BatchNorm1d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
self.bn2 = nn.BatchNorm1d(out_channels)
if in_channels != out_channels:
self.downsample = nn.Sequential(
nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1),
nn.BatchNorm1d(out_channels)
)
else:
self.downsample = None
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(identity)
out += identity
out = self.relu(out)
return out
1. 定义卷积神经网络模型:
1. 构建1D的卷积层,1D的卷积层有助于提取时间序列中的局部时序特征。
2. 加入池化层来降低数据的时间分辨率和提高模型的抽象能力
3.使用批归一化层和激活层增加网络的非线性
2. 早停机制:
# 初始化模型、损失函数和优化器
seq_length = X_train_tensor.shape[2]
num_features = X_train_tensor.shape[1]
model = CNNResNet(seq_length, num_features)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 初始化EarlyStopping
early_stopping = EarlyStopping(patience=20, min_delta=0.001)
# 训练模型
num_epochs = 130
for epoch in range(num_epochs):
model.train()
train_losses = []
for inputs, targets in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
train_losses.append(loss.item())
# 验证模型
model.eval()
val_losses = []
with torch.no_grad():
for val_inputs, val_targets in val_loader:
val_outputs = model(val_inputs)
val_loss = criterion(val_outputs, val_targets)
val_losses.append(val_loss.item())
# 计算平均损失
train_loss = np.mean(train_losses)
val_loss = np.mean(val_losses)
print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
# 检查是否需要早停
early_stopping(val_loss)
if early_stopping.early_stop:
print("Early stopping")
break
1.将CNN的输出作为特征向量,这些特征向量可能会捕捉到与比亚迪股价相关性的关键特征
-
初始化模型 (
model = CNNResNet(...)
): 这里创建了一个自定义的卷积神经网络模型,CNNResNet
,它结合了卷积层和残差连接。seq_length
和num_features
分别代表输入数据的时间序列长度和特征数量。 -
初始化损失函数 (
criterion = nn.MSELoss()
): 使用均方误差(MSE)作为损失函数,这是回归问题常用的损失函数。 -
初始化优化器 (
optimizer = optim.Adam(...)
): 使用Adam优化器来更新模型的权重。学习率设置为0.001。 -
初始化EarlyStopping: 使用之前定义的
EarlyStopping
类实例化一个早停对象,设置(patience)为20个epoch,最小损失改进(min_delta)为0.001。 -
训练模型: 循环进行多个epoch的训练。每个epoch包括以下步骤:
- 将模型设置为训练模式。
- 循环遍历训练数据加载器中的批次数据。
- 清零梯度,进行前向传播,计算损失,执行反向传播,然后更新模型的权重。
- 记录每个批次的损失值。
-
验证模型: 在每个epoch结束后,将模型设置为评估模式,循环遍历验证数据加载器中的批次数据,计算验证损失,但不执行反向传播。
-
计算平均损失: 对训练和验证过程中收集的损失值取平均,以评估模型在训练集和验证集上的性能。
-
打印训练和验证损失: 在控制台打印每个epoch的训练损失和验证损失。
-
检查早停条件: 使用
early_stopping
对象检查是否满足早停条件。如果满足,则打印“Early stopping”信息并中断训练循环。
3. 特征工程:
# 提取特征
model.eval()
with torch.no_grad():
train_features = model.feature_extract(X_train_tensor).numpy()
val_features = model.feature_extract(X_val_tensor).numpy()
test_features = model.feature_extract(X_test_tensor).numpy()
# 转换标签为numpy数组
y_train = y_train_tensor.numpy()
y_val = y_val_tensor.numpy()
y_test = y_test_tensor.numpy()
1. CNN的输出作为特征向量,这些特征向量可能会捕捉到与比亚迪股价相关性的关键特征。
4.训练模型:
# 创建XGBoost模型
xgb_model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100, learning_rate=0.1)
# 训练XGBoost模型
xgb_model.fit(train_features, y_train, eval_set=[(val_features, y_val)], early_stopping_rounds=10, verbose=True)
# 使用XGBoost模型进行预测
y_pred = xgb_model.predict(test_features)
# 将标准化后的预测结果和实际值反标准化
y_test_inverse = scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()
y_pred_inverse = scaler.inverse_transform(y_pred.reshape(-1, 1)).flatten()
# 计算反标准化后的MSE
mse_inverse = mean_squared_error(y_test_inverse, y_pred_inverse)
print(f'Mean Squared Error after inverse transformation: {mse_inverse:.4f}')
-
创建XGBoost模型 (
xgb.XGBRegressor
): 这里创建了一个XGBoost回归模型实例,objective
设置为'reg:squarederror'
表示使用均方误差作为目标函数,n_estimators
表示模型中树的数量,learning_rate
是学习率。 -
训练XGBoost模型 (
xgb_model.fit
): 使用训练特征集train_features
和目标变量y_train
来训练模型。eval_set
参数用于指定验证集,early_stopping_rounds
参数用于指定在验证集上如果连续多少轮没有改善则停止训练,verbose
参数设置为True
表示在控制台输出训练进度。 -
使用XGBoost模型进行预测 (
xgb_model.predict
): 使用训练好的模型对测试特征集test_features
进行预测,得到预测结果y_pred
。 -
反标准化: 由于原始数据经过了标准化处理,这里使用之前拟合的
scaler
对实际值y_test
和预测值y_pred
进行反标准化,以便得到原始尺度上的值。 -
计算反标准化后的MSE (
mean_squared_error
): 计算反标准化后的实际值和预测值之间的均方误差,以评估模型性能。
5. 画图和计算MSE
import matplotlib.pyplot as plt
# 创建XGBoost模型
xgb_model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100, learning_rate=0.1)
# 训练XGBoost模型
xgb_model.fit(train_features, y_train)
# 使用XGBoost模型进行预测
y_pred = xgb_model.predict(test_features)
scaler_byd = MinMaxScaler()
scaler_byd.fit(byd_data.reshape(-1, 1))
# 反标准化 y_test 和 y_pred
y_test = y_test_tensor.numpy() # 标准化后的实际值
y_pred = np.array(y_pred) # 标准化后的预测结果
# 将它们转换回原始尺度
y_test_inverse = scaler_byd.inverse_transform(y_test.reshape(-1, 1)).flatten()
y_pred_inverse = scaler_byd.inverse_transform(y_pred.reshape(-1, 1)).flatten()
# 计算反标准化后的MSE
mse_inverse = mean_squared_error(y_test_inverse, y_pred_inverse)
print(f'Mean Squared Error after inverse transformation: {mse_inverse:.4f}')
# 绘制预测结果和实际值的对比图
plt.figure(figsize=(10, 6))
plt.plot(y_test_inverse, label='Actual')
plt.plot(y_pred_inverse, label='Predicted')
plt.legend()
plt.show()
注意:细心的友友可能会发现,我们使用两次反标准化,这是因为,我们训练CNN时,已经进行一次标准化,在训练XGBOOST,有进行一次标准化,如果我们希望获得原来数值尺度的股票价格,就需要两次反标准化
结论:
-
模型表现: 我们的XGBoost模型在本次练习中表现出了良好的预测能力。MSE值为27.0434,这表明预测值与实际值之间的差异在可接受的范围内,模型能够捕捉到股票价格的基本波动趋势。
-
项目效果: 尽管MSE值不是最低,但考虑到这是一个练习项目,我们认为模型的效果是不错的。它为股票价格预测提供了一个有效的参考,并为进一步的模型优化和应用奠定了基础。
-
学习价值: 本次项目不仅帮助我们理解了XGBoost模型在时间序列预测中的应用,还加深了我们对股票市场波动性的认识。通过实践,我们学习了如何准备数据、选择特征、调整模型参数以及评估模型性能。
-
未来改进: 我们认识到,通过进一步的特征工程、模型调参或采用更先进的算法,模型的预测精度还有提升的空间。此外,结合其他类型的模型,进行超参数优化,可能会进一步提高预测的准确性。
-
实际应用: 虽然这是一个模拟练习,但我们相信,通过本次项目所获得的知识和经验,可以为实际的股票市场分析和投资决策提供一定的帮助。
-
项目意义: 最后,我们希望本次项目能够对大家有所启发,无论是在学术研究还是在金融行业的实际工作中,都能够成为一个小小的帮助和参考。
总结来说,本次练习项目是一个成功的实践,它不仅提升了我们对机器学习在金融领域应用的理解,也为未来的股票市场预测提供了有价值的见解和方法。我们期待将这些知识应用到更广泛的领域,并在未来的工作中取得更好的成果。
标签:val,train,self,XGBOOST,test,从零开始,CNN,模型,size From: https://blog.csdn.net/weixin_74241638/article/details/139844310