针对序列(时间、文本)数据的网络结构 续
运行系统:macOS Sequoia 15.0
Python编译器:PyCharm 2024.1.4 (Community Edition)
Python版本:3.12
TensorFlow版本:2.17.0
Pytorch版本:2.4.1
往期链接:
1-5 | 6-10 | 11-20 | 21-30 | 31-40 | 41-50 |
---|
51-60:函数 | 61-70:类 | 71-80:编程范式及设计模式 |
---|
81-90:Python编码规范 | 91-100:Python自带常用模块-1 |
---|
101-105:Python自带模块-2 | 106-110:Python自带模块-3 |
---|
111-115:Python常用第三方包-频繁使用 | 116-120:Python常用第三方包-深度学习 |
---|
121-125:Python常用第三方包-爬取数据 | 126-130:Python常用第三方包-为了乐趣 |
---|
131-135:Python常用第三方包-拓展工具1 | 136-140:Python常用第三方包-拓展工具2 |
---|
Python项目实战
141-145 | 146-150 | 151-155 | 156-160 | 161-165 | 166-170 | 171-175 |
---|
176-180:卷积结构 | 181-182:卷积结构(续) | 183-185:时间、序列数据 |
---|
P186-- 双向LSTM(Bidirectional Long Short-Term Memory 2005)
(1)模型结构说明
2005年,Alex Graves和Jürgen Schmidhuber将双向RNN的概念应用于LSTM,正式提出了双向LSTM架构。这一创新使得LSTM能够同时利用序列的过去和未来上下文信息,从而提升了在许多序列建模任务中的表现。双向LSTM(Bidirectional Long Short-Term Memory)是对传统LSTM的一种扩展,旨在同时考虑序列的过去和未来信息。其工作原理包括:
结构:由两个独立的LSTM层组成,一个按正向(从左到右)处理输入序列,另一个按反向(从右到左)处理同一序列。
信息流:每个时间步的输出结合了来自两个方向的信息,这样模型能更好地捕捉上下文。
输出:两个LSTM的输出通常被连接或合并,以供后续处理。
(2)创新性说明
双向信息流
正向与反向处理:双向LSTM通过两个独立的LSTM层,一个从序列的开始到结束(正向),另一个从结束到开始(反向)处理信息。这种双向处理方式使得模型能够同时利用过去和未来的上下文信息。
上下文理解
全面的上下文捕捉:传统LSTM只能利用过去的信息,而双向LSTM能够捕捉到完整的上下文信息。这对许多自然语言处理任务(如命名实体识别、情感分析等)至关重要,因为这些任务常常需要理解语句的前后关系。
更丰富的特征表示
融合信息:双向LSTM结合了正向和反向的输出,提供了更丰富的特征表示。这种综合特征增强了模型对复杂模式的捕捉能力,有助于提高预测的准确性。
改进的性能
在多种任务中的表现:双向LSTM在许多序列建模任务中表现优于单向LSTM,特别是在需要理解完整上下文的场景,如文本分类、机器翻译等。
适应性强
广泛应用:双向LSTM适用于多种序列数据的任务,不仅限于文本,还可以应用于语音识别、时间序列预测等领域,显示了其广泛的适应性。
解决长距离依赖问题
缓解梯度消失:通过结合双向信息流,双向LSTM在一定程度上缓解了长距离依赖问题,使得模型能够更好地学习长期依赖关系。
(3)示例代码:IMDB电影评论情感分析
import tensorflow as tf
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, Dense
# 设置参数
max_features = 20000
maxlen = 100
batch_size = 32
embedding_dims = 128
epochs = 5
# 加载IMDB数据集
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
# 填充序列
x_train = pad_sequences(x_train, maxlen=maxlen)
x_test = pad_sequences(x_test, maxlen=maxlen)
# 构建模型
model = Sequential()
model.add(Embedding(max_features, embedding_dims, input_length=maxlen))
model.add(Bidirectional(LSTM(64)))
model.add(Dense(1, activation='sigmoid'))
# 编译模型
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# 训练模型
model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0.2)
# 评估模型
score, acc = model.evaluate(x_test, y_test, batch_size=batch_size)
print(f'Test score: {score}, Test accuracy: {acc}')
# 预测情感
def predict_sentiment(review):
sequence = imdb.get_word_index()
sequence = [sequence.get(word, 0) for word in review.lower().split()]
sequence = pad_sequences([sequence], maxlen=maxlen)
prediction = model.predict(sequence)
return "Positive" if prediction[0][0] > 0.5 else "Negative"
# 测试预测
sample_review = "This movie was fantastic!"
print(f"Sample review: {sample_review}")
print(f"Predicted sentiment: {predict_sentiment(sample_review)}")
P187–变换器结构(Transformer 2017)
(1)模型结构说明
Transformer模型是由Vaswani等人在2017年的论文"Attention Is All You Need"中提出的。这个模型在自然语言处理领域产生了革命性的影响,并且后来被扩展到其他领域如计算机视觉。Transformer的核心原理是完全基于注意力机制(Attention Mechanism)的序列到序列(Sequence-to-Sequence)模型。原理如下:
自注意力机制(Self-Attention):允许模型在处理序列中的每个位置时,都能关注到序列中的其他所有位置。
多头注意力(Multi-Head Attention):允许模型同时关注不同的表示子空间。
位置编码(Positional Encoding):由于模型不含递归或卷积,使用位置编码来为模型提供序列中的位置信息。
编码器-解码器结构 :模型包含多层编码器和解码器,每层都包含自注意力机制和前馈神经网络。
(2)创新性说明
全注意力架构:首次提出完全基于注意力机制的模型,摒弃了之前广泛使用的RNN和CNN结构。
多头注意力:创新性地提出了多头注意力机制,增强了模型的表达能力。
位置编码:巧妙地解决了序列顺序信息的问题,而不需要引入递归结构。
残差连接和层归一化:在每个子层后使用残差连接和层归一化,有助于训练更深的网络。
缩放点积注意力:通过缩放因子改进了注意力计算的稳定性。
并行训练:设计了可以高度并行化的结构,大大提高了训练效率。
Transformer模型的创新性在于它完全重新思考了序列处理的方式。它证明了仅仅依靠注意力机制就能达到甚至超越之前最先进的基于RNN的模型的性能。
(3)示例代码:模拟气象数据预测(多输出多输出)
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
# 1. 数据生成
def generate_weather_data(n_samples, n_steps):
time = np.linspace(0, 1, n_steps)
# 温度:基础温度 + 季节变化 + 日间变化 + 随机噪声
temp_seasonal = 10 * np.sin(2 * np.pi * time) # 季节变化
temp_daily = 5 * np.sin(2 * np.pi * time * n_steps) # 日间变化
temp = 15 + temp_seasonal + temp_daily + np.random.normal(0, 2, (n_samples, n_steps))
# 湿度:与温度负相关 + 随机噪声
humidity = 100 - temp / 40 * 100 + np.random.normal(0, 5, (n_samples, n_steps))
humidity = np.clip(humidity, 0, 100)
# 风速:使用 Gamma 分布随机生成
wind_speed = np.random.gamma(2, 2, (n_samples, n_steps))
# 合并数据
data = np.stack([temp, humidity, wind_speed], axis=-1) # 形状:(n_samples, n_steps, 3)
return data.astype(np.float32)
# 2. 数据准备
n_samples = 10000
n_steps = 100 # 输入时间步数
n_future = 24 # 预测未来的时间步数
data = generate_weather_data(n_samples, n_steps + n_future)
X = data[:, :n_steps, :] # 输入特征:温度、湿度、风速
y = data[:, n_steps:, :2] # 输出目标:未来的温度和湿度
# 划分训练集、验证集和测试集
train_size = int(n_samples * 0.7)
val_size = int(n_samples * 0.9)
X_train = X[:train_size]
y_train = y[:train_size]
X_val = X[train_size:val_size]
y_val = y[train_size:val_size]
X_test = X[val_size:]
y_test = y[val_size:]
n_features = X_train.shape[2] # 输入特征数:3(温度、湿度、风速)
# 3. 定义 Positional Encoding
class PositionalEncoding(layers.Layer):
def __init__(self, sequence_length, embed_dim):
super(PositionalEncoding, self).__init__()
self.pos_encoding = self.positional_encoding(sequence_length, embed_dim)
def get_angles(self, pos, i, embed_dim):
angles = pos / np.power(10000, (2 * (i // 2)) / np.float32(embed_dim))
return angles
def positional_encoding(self, sequence_length, embed_dim):
angle_rads = self.get_angles(
np.arange(sequence_length)[:, np.newaxis],
np.arange(embed_dim)[np.newaxis, :],
embed_dim
)
# 将 sin 应用于偶数索引(2i)
sines = np.sin(angle_rads[:, 0::2])
# 将 cos 应用于奇数索引(2i+1)
cosines = np.cos(angle_rads[:, 1::2])
pos_encoding = np.zeros((sequence_length, embed_dim))
pos_encoding[:, 0::2] = sines
pos_encoding[:, 1::2] = cosines
pos_encoding = pos_encoding[np.newaxis, ...]
return tf.cast(pos_encoding, dtype=tf.float32)
def call(self, inputs):
return inputs + self.pos_encoding
# 4. 定义 TransformerBlock
class TransformerBlock(layers.Layer):
def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
super(TransformerBlock, self).__init__()
self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
self.ffn = keras.Sequential([
layers.Dense(ff_dim, activation='relu'),
layers.Dense(embed_dim)
])
self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = layers.Dropout(rate)
self.dropout2 = layers.Dropout(rate)
def call(self, inputs, training=None):
attn_output = self.att(inputs, inputs, training=training)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(inputs + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
return self.layernorm2(out1 + ffn_output)
# 5. 构建 Transformer 模型
def build_transformer_model(input_shape, embed_dim, num_heads, ff_dim, num_layers, dropout, n_future,
n_output_features):
inputs = keras.Input(shape=input_shape) # 输入形状:(n_steps, n_features)
# 投影到嵌入维度
x = layers.Dense(embed_dim)(inputs)
# 添加位置编码
x = PositionalEncoding(input_shape[0], embed_dim)(x)
# 堆叠 Transformer 块
for _ in range(num_layers):
x = TransformerBlock(embed_dim, num_heads, ff_dim, rate=dropout)(x)
# 全局平均池化
x = layers.GlobalAveragePooling1D()(x)
# 全连接层
x = layers.Dense(64, activation='relu')(x)
x = layers.Dropout(dropout)(x)
x = layers.Dense(n_future * n_output_features)(x)
outputs = layers.Reshape((n_future, n_output_features))(x)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
# 6. 设置模型参数并构建模型
input_shape = (n_steps, n_features)
embed_dim = 64
num_heads = 4
ff_dim = 128
num_layers = 2
dropout = 0.1
n_output_features = y_train.shape[2] # 输出特征数:2(温度和湿度)
model = build_transformer_model(
input_shape, embed_dim, num_heads, ff_dim, num_layers, dropout, n_future, n_output_features
)
# 7. 编译和训练模型
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
history = model.fit(
X_train, y_train,
validation_data=(X_val, y_val),
epochs=20,
batch_size=64
)
# 8. 评估模型
loss, mae = model.evaluate(X_test, y_test)
print(f'Test MAE: {mae}')
# 9. 可视化训练过程
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('MSE Loss')
plt.legend()
plt.title('Training and Validation Loss')
plt.show()
# 10. 进行预测并可视化结果
# 生成新数据进行预测
X_new = generate_weather_data(1, n_steps)
y_pred = model.predict(X_new)
# 绘制预测结果
plt.figure(figsize=(12, 8))
# 温度预测
plt.subplot(2, 1, 1)
plt.plot(range(n_steps), X_new[0, :, 0], label='Historical Temperature')
plt.plot(range(n_steps, n_steps + n_future), y_pred[0, :, 0], label='Predicted Temperature')
plt.xlabel('Time Step')
plt.ylabel('Temperature')
plt.legend()
plt.title('Temperature Prediction')
# 湿度预测
plt.subplot(2, 1, 2)
plt.plot(range(n_steps), X_new[0, :, 1], label='Historical Humidity')
plt.plot(range(n_steps, n_steps + n_future), y_pred[0, :, 1], label='Predicted Humidity')
plt.xlabel('Time Step')
plt.ylabel('Humidity')
plt.legend()
plt.title('Humidity Prediction')
plt.tight_layout()
plt.show()
P188-- 时间卷积网络(TCN,Temporal Convolutional Network 2018)
(1)模型结构说明
Bai、Kolter和Koltun发表了题为"An Empirical Evaluation of Generic Convolutional and Recurrent Networks for Sequence Modeling"的论文。这篇论文正式提出了本节介绍的TCN架构。时间卷积网络(Temporal Convolutional Network,TCN)是一种专门用于处理时间序列数据的深度学习模型。下面我将详细介绍其原理、特点和实际应用例子。
原理
因果卷积:TCN使用因果卷积,确保t时刻的输出只依赖于t时刻及之前的输入,避免信息泄露。
膨胀卷积:通过在卷积核中引入间隔,增大感受野,捕捉长期依赖关系。
残差连接:使用残差块,有助于训练更深的网络并缓解梯度消失问题。
层级结构:通过堆叠多层卷积,逐步提取更高层次的时间特征。
(2)创新性说明
并行处理:相比RNN,TCN可以并行处理输入序列,提高计算效率。
固定感受野:每层的膨胀卷积可以精确控制网络的感受野大小。
灵活的序列长度:可以处理任意长度的输入序列。
稳定梯度:避免了RNN中的梯度消失/爆炸问题。
内存效率:相比LSTM,TCN的内存占用随序列长度的增加而增加得较慢。
(3)示例代码:模拟气象数据预测(多输出多输出)
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, Dense, Dropout, LayerNormalization, Activation, \
GlobalAveragePooling1D
from tensorflow.keras.models import Model
# TCN残差块
def residual_block(x, dilation_rate, nb_filters, kernel_size):
padding = (kernel_size - 1) * dilation_rate
r = Conv1D(filters=nb_filters, kernel_size=kernel_size,
dilation_rate=dilation_rate, padding='causal')(x)
r = LayerNormalization()(r)
r = Activation('relu')(r)
r = Dropout(0.1)(r)
r = Conv1D(filters=nb_filters, kernel_size=kernel_size,
dilation_rate=dilation_rate, padding='causal')(r)
r = LayerNormalization()(r)
r = Activation('relu')(r)
r = Dropout(0.1)(r)
if x.shape[-1] != nb_filters:
x = Conv1D(filters=nb_filters, kernel_size=1, padding='same')(x)
return tf.keras.layers.add([x, r])
# TCN模型
def build_tcn_model(input_shape, nb_filters, kernel_size, nb_stacks, dilations, output_dim):
input_layer = Input(shape=input_shape)
x = input_layer
for _ in range(nb_stacks):
for d in dilations:
x = residual_block(x, d, nb_filters, kernel_size)
# 使用GlobalAveragePooling1D来将时间维度压缩
x = GlobalAveragePooling1D()(x)
x = Dense(64, activation='relu')(x)
output = Dense(output_dim)(x)
model = Model(inputs=input_layer, outputs=output)
return model
# 生成示例数据
def generate_data(n_samples, n_timesteps, n_features_in, n_features_out):
X = np.random.randn(n_samples, n_timesteps, n_features_in)
y = np.random.randn(n_samples, n_features_out)
return X, y
# 设置参数
n_samples = 1000
n_timesteps = 10
n_features_in = 3 # 温度、湿度、风速
n_features_out = 2 # 预测未来的温度和湿度
# 生成数据
X, y = generate_data(n_samples, n_timesteps, n_features_in, n_features_out)
# 构建模型
input_shape = (n_timesteps, n_features_in)
model = build_tcn_model(input_shape, nb_filters=64, kernel_size=3, nb_stacks=1,
dilations=[1, 2, 4, 8], output_dim=n_features_out)
# 编译模型
model.compile(optimizer='adam', loss='mse')
# 打印模型摘要
model.summary()
# 训练模型
history = model.fit(X, y, epochs=50, batch_size=32, validation_split=0.2, verbose=1)
# 生成测试数据
X_test, y_test = generate_data(10, n_timesteps, n_features_in, n_features_out)
# 预测
predictions = model.predict(X_test)
# 打印预测结果
for i in range(10):
print(f"Sample {i + 1}:")
print(f"Predicted Temperature: {predictions[i][0]:.2f}, Actual: {y_test[i][0]:.2f}")
print(f"Predicted Humidity: {predictions[i][1]:.2f}, Actual: {y_test[i][1]:.2f}")
print()
P189-- CNN+LSTM混合模型
(1)模型结构说明
原理
CNN部分:
CNN通过卷积层提取输入数据中的局部特征,适用于处理图像、时序数据等。
主要用于捕捉空间特征,如气象数据中的空间相关性。
LSTM部分:
LSTM通过其特殊的门控机制(输入门、遗忘门、输出门)来控制信息的流动,能够记忆长时间的依赖关系。
适合处理时间序列数据,保留过去信息以进行未来预测。
结合机制:
CNN首先对输入数据进行特征提取,然后将提取的特征输入到LSTM中进行时间序列建模。
通过这种方式,CNN负责捕捉数据中的空间模式,LSTM则负责处理时间序列的动态变化。
(2)创新性说明
多模态特征提取:通过结合CNN的空间特征提取能力和LSTM的时间建模能力,使得模型能够处理复杂的时空数据。
提高预测精度:在气象预测等任务中,CNN+LSTM的组合能够显著提高预测精度,尤其是在需要考虑长期依赖关系的情况下。
应用广泛:这种结合的模型在视频分析、语音识别、气象预测等多个领域表现出色,成为深度学习中的一种重要架构。
(3)示例代码:模拟气象数据预测(多输出多输出)
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
# macos系统显示中文
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
# 生成模拟的气象数据
def generate_weather_data(samples=1000, timesteps=24, features=3):
data = np.zeros((samples, timesteps, features))
for i in range(samples):
base_temp = 15 + np.random.rand() * 10 # 基础温度在 15 到 25 度之间
base_humidity = 50 + np.random.rand() * 50 # 基础湿度在 50% 到 100% 之间
base_pressure = 1000 + np.random.rand() * 50 # 基础气压在 1000 到 1050 hPa 之间
for t in range(timesteps):
data[i, t, 0] = base_temp + np.sin(t / timesteps * 2 * np.pi) * 10 + np.random.randn() # 温度随时间变化
data[i, t, 1] = base_humidity + np.cos(t / timesteps * 2 * np.pi) * 20 + np.random.randn() # 湿度随时间变化
data[i, t, 2] = base_pressure + np.sin(t / timesteps * 4 * np.pi) * 5 + np.random.randn() # 气压随时间变化
return data
# 创建数据集
def create_dataset(data, seq_length, pred_length):
X, y = [], []
for sample in data:
for i in range(len(sample) - seq_length - pred_length + 1):
X.append(sample[i:i + seq_length])
y.append(sample[i + seq_length:i + seq_length + pred_length])
return np.array(X), np.array(y)
# 准备数据
# 设置参数
samples = 1000
timesteps = 24
features = 3 # 温度、湿度、气压
seq_length = 12 # 输入序列长度
pred_length = 6 # 预测序列长度
# 生成数据
data = generate_weather_data(samples, timesteps, features)
# 创建数据集
X, y = create_dataset(data, seq_length, pred_length)
# 划分训练集和测试集
train_size = int(len(X) * 0.8)
X_train = X[:train_size]
y_train = y[:train_size]
X_test = X[train_size:]
y_test = y[train_size:]
# 转换为 PyTorch 张量
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
# 定义 CNN+LSTM 模型
class CNNLSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, seq_length, pred_length, num_features):
super(CNNLSTM, self).__init__()
self.num_features = num_features
self.pred_length = pred_length
# 定义卷积层
self.conv1 = nn.Conv1d(in_channels=num_features, out_channels=64, kernel_size=3, padding=1)
self.relu1 = nn.ReLU()
self.pool1 = nn.MaxPool1d(kernel_size=2)
# 计算卷积和池化后的序列长度
conv_seq_length = seq_length // 2 # 池化层会将序列长度减半
# 定义 LSTM 层
self.lstm = nn.LSTM(input_size=64, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
# 定义全连接层
self.fc = nn.Linear(hidden_size, num_features * pred_length)
def forward(self, x):
# x: (batch_size, seq_length, num_features)
x = x.permute(0, 2, 1) # 转换为 (batch_size, num_features, seq_length)
x = self.conv1(x)
x = self.relu1(x)
x = self.pool1(x) # (batch_size, out_channels, seq_length // 2)
x = x.permute(0, 2, 1) # 转换为 (batch_size, seq_length // 2, out_channels)
# LSTM 层
x, _ = self.lstm(x) # x: (batch_size, seq_length // 2, hidden_size)
x = x[:, -1, :] # 取最后一个时间步的输出 (batch_size, hidden_size)
# 全连接层
x = self.fc(x) # (batch_size, num_features * pred_length)
x = x.view(-1, self.pred_length, self.num_features) # (batch_size, pred_length, num_features)
return x
# 实例化模型并定义损失函数与优化器
input_size = features
hidden_size = 128
num_layers = 2
model = CNNLSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, seq_length=seq_length, pred_length=pred_length, num_features=features)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 训练模型
# 训练参数
epochs = 20
batch_size = 64
# 创建数据加载器
train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# 训练循环
for epoch in range(epochs):
model.train()
total_loss = 0
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
print(f'Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}')
# 测试模型
model.eval()
with torch.no_grad():
test_outputs = model(X_test_tensor)
test_loss = criterion(test_outputs, y_test_tensor)
print(f'Test Loss: {test_loss.item():.4f}')
# 选择一个样本进行可视化
index = 0
input_sample = X_test_tensor[index].numpy()
target_sample = y_test_tensor[index].numpy()
prediction = test_outputs[index].numpy()
# 绘制结果
time_input = np.arange(seq_length)
time_pred = np.arange(seq_length, seq_length + pred_length)
plt.figure(figsize=(12, 8))
feature_names = ['温度', '湿度', '气压']
for i in range(features):
plt.subplot(features, 1, i+1)
plt.plot(time_input, input_sample[:, i], label='历史数据')
plt.plot(time_pred, target_sample[:, i], label='真实值')
plt.plot(time_pred, prediction[:, i], label='预测值', linestyle='--')
plt.title(f'特征 {feature_names[i]} 的预测')
plt.xlabel('时间步')
plt.ylabel(f'{feature_names[i]}')
plt.legend()
plt.tight_layout()
plt.show()
P190-- Informer结构 2020
(1)模型结构说明
Informer由清华大学和华为诺亚方舟实验室的研究团队在2020年提出,旨在解决传统Transformer在处理长序列时的计算效率问题。该模型在AAAI 2021会议上发表,成为时序预测领域的重要进展。详细原理如下:
架构基础
Informer基于标准Transformer架构,但针对长序列进行了优化。
ProbSparse自注意力机制
通过引入ProbSparse机制,Informer在计算自注意力时只关注最重要的部分。这种策略显著降低了计算复杂度。
自注意力蒸馏
在模型的不同层中逐渐减少序列长度,使得后续层的计算更加高效。
生成式解码器
Informer使用生成式解码器,一次性预测长序列,避免了自回归模型的累积误差。
数据嵌入
使用专门的数据嵌入层处理不同特征(如时间特征),增强模型对时序数据的理解能力。
(2)创新性说明
效率提升:通过ProbSparse自注意力机制,Informer能够在保持高精度的同时,显著减少计算资源的消耗。
长序列处理:优化后的自注意力机制使得模型能够有效处理长序列数据,克服了传统Transformer的局限性。
灵活性:生成式解码器的引入使得模型在处理复杂预测任务时表现更加出色。
(3)示例代码:模拟气象数据预测(多输出多输出)
# 导入必要的库
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
# macos系统显示中文
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']
# 1. 生成模拟的气象数据
def generate_weather_data(num_samples=1000, seq_len=100, num_features=3):
# 生成随机数据
data = np.random.randn(num_samples, seq_len, num_features)
# 添加趋势和季节性因素
for i in range(num_features):
trend = np.linspace(0, 1, seq_len)
seasonality = np.sin(np.linspace(0, 2 * np.pi, seq_len))
data[:, :, i] += trend + seasonality + np.random.randn(num_samples, seq_len) * 0.1
return data
# 2. 准备数据集
def create_dataset(data, input_length, output_length):
X, y = [], []
num_samples, seq_len, num_features = data.shape
for i in range(num_samples):
if seq_len < input_length + output_length:
continue
for j in range(seq_len - input_length - output_length + 1):
X.append(data[i, j:j + input_length, :])
y.append(data[i, j + input_length:j + input_length + output_length, :])
return np.array(X), np.array(y)
# 3. 生成数据
num_samples = 1000
seq_len = 100
num_features = 3 # 例如,温度、湿度、风速
data = generate_weather_data(num_samples, seq_len, num_features)
input_length = 60 # 输入序列长度
output_length = 10 # 预测序列长度
# 4. 创建数据集
X, y = create_dataset(data, input_length, output_length)
print("Input shape:", X.shape) # (样本数, 输入长度, 特征数)
print("Output shape:", y.shape) # (样本数, 输出长度, 特征数)
# 5. 划分训练集和测试集
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 转换为 PyTorch 的张量
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
# 6. 定义简化的 Informer 模型
class Informer(nn.Module):
def __init__(self, input_dim, d_model, n_heads, e_layers, d_ff, dropout, output_dim, seq_len, pred_len):
super(Informer, self).__init__()
self.seq_len = seq_len
self.pred_len = pred_len
# 输入嵌入层
self.enc_embedding = nn.Linear(input_dim, d_model)
# 位置编码(可选,简单起见使用参数)
self.positional_encoding = nn.Parameter(torch.zeros(1, seq_len, d_model))
# Transformer 编码器层
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=n_heads, dim_feedforward=d_ff, dropout=dropout
)
self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=e_layers)
# 全连接输出层
self.projection = nn.Linear(d_model, output_dim)
def forward(self, x_enc):
"""
x_enc: (batch_size, seq_len, input_dim)
"""
# 输入嵌入
x = self.enc_embedding(x_enc) # (batch_size, seq_len, d_model)
x += self.positional_encoding # 添加位置编码
x = x.permute(1, 0, 2) # (seq_len, batch_size, d_model)
# Transformer 编码器
enc_output = self.encoder(x) # (seq_len, batch_size, d_model)
enc_output = enc_output[-self.pred_len:, :, :] # 取最后 pred_len 个时间步
enc_output = enc_output.permute(1, 0, 2) # (batch_size, pred_len, d_model)
# 全连接输出
output = self.projection(enc_output) # (batch_size, pred_len, output_dim)
return output
# 7. 实例化模型并定义损失函数和优化器
input_dim = num_features
output_dim = num_features
d_model = 64
n_heads = 4
e_layers = 2
d_ff = 128
dropout = 0.1
seq_len = input_length
pred_len = output_length
model = Informer(
input_dim, d_model, n_heads, e_layers, d_ff, dropout,
output_dim, seq_len, pred_len
)
# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 8. 训练模型
import time
epochs = 10
batch_size = 64
train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=batch_size, shuffle=True
)
for epoch in range(epochs):
start_time = time.time()
model.train()
total_loss = 0
for batch_x, batch_y in train_loader:
optimizer.zero_grad()
output = model(batch_x)
loss = criterion(output, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_loader)
elapsed = time.time() - start_time
print(f'Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}, Time: {elapsed:.2f}s')
# 9. 测试模型并可视化结果
model.eval()
with torch.no_grad():
test_output = model(X_test_tensor)
test_loss = criterion(test_output, y_test_tensor)
print(f'Test Loss: {test_loss.item():.4f}')
# 可视化预测结果(以第一个测试样本为例)
sample_input = X_test_tensor[0].unsqueeze(0) # (1, input_length, num_features)
sample_output = y_test_tensor[0].unsqueeze(0) # (1, output_length, num_features)
with torch.no_grad():
sample_pred = model(sample_input) # (1, output_length, num_features)
sample_input = sample_input.numpy().squeeze()
sample_output = sample_output.numpy().squeeze()
sample_pred = sample_pred.numpy().squeeze()
time_input = np.arange(input_length)
time_output = np.arange(input_length, input_length + output_length)
plt.figure(figsize=(12, 8))
for i in range(num_features):
plt.subplot(num_features, 1, i + 1)
plt.plot(time_input, sample_input[:, i], label=f'输入特征 {i + 1}')
plt.plot(time_output, sample_output[:, i], label=f'真实未来特征 {i + 1}')
plt.plot(time_output, sample_pred[:, i], label=f'预测未来特征 {i + 1}')
plt.legend()
plt.xlabel('时间步')
plt.ylabel('特征值')
plt.title(f'特征 {i + 1} 的预测结果')
plt.tight_layout()
plt.show()
标签:plt,features,Python,self,length,200Tips,186,np,size
From: https://blog.csdn.net/qq_32882309/article/details/142752354