序列模型
训练
生成数据序列
import matplotlib_inline
import torch
import torch.nn as nn
import d2l.torch as d2l
import matplotlib.pyplot as plt
import numpy as np
T = 1000
time = torch.arange(1, T+1, 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))
plt.figure(figsize=(4,2), dpi=100, layout='constrained')
plt.plot(time, x, label = 'linear')
plt.xlabel('time');plt.ylabel('x')
plt.xticks(np.arange(0, 1001, step=200))
plt.yticks(np.arange(-1.5, 2.0, step=0.5))
plt.title('data')
plt.show()
x.shape
接下来,我们将这个序列转换为模型的特征-标签(feature-label)对。**
tau = 4
features = torch.zeros(T-tau, tau)
for i in range(tau):
features[:,i] = x[i:T - tau + i]
labels = x[tau:].reshape((-1, 1))
batch_size, n_train = 16, 600
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),batch_size, is_train=True)
使⽤⼀个相当简单的架构训练模型:⼀个拥有两个全连接层的多层感知机, ReLU激活函数和平
⽅损失。
# 模型训练
# 1.初始化权重
def init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
# 2.定义模型
net = nn.Sequential(nn.Linear(4, 10),
nn.ReLU(),
nn.Linear(10, 1))
net.apply(init_weights)
# 注意: MSELoss计算平⽅误差时不带系数1/2
loss = nn.MSELoss(reduction='none')
# 3.训练
def train(net, train_iter, loss, epochs, lr):
optimier = torch.optim.Adam(net.parameters(), lr)
for epoch in range(epochs):
for x, y in train_iter:
optimier.zero_grad()
y_pre = net(x)
l = loss(y_pre, y)
l.sum().backward()
optimier.step()
print(f'epoch{epoch + 1},'
f'loss:{d2l.evaluate_loss(net, train_iter, loss):f}')
train(net, train_iter, loss, 5, 0.01)
预测
由于训练损失很⼩,因此我们期望模型能有很好的⼯作效果。让我们看看这在实践中意味着什么。⾸先是检
查模型预测下⼀个时间步的能⼒,也就是单步预测(one-step-ahead prediction)。
onestep_preds = net(features)
plt.figure(figsize=(4,2), dpi=100, layout='constrained')
plt.plot(time, x.detach().numpy(), linestyle = '--', color='b')
plt.plot(time[tau:], onestep_preds.detach().numpy(), color='g')
plt.xlabel('time');plt.ylabel('x')
plt.xticks(np.arange(0, 1001, step=200))
plt.yticks(np.arange(-1.5, 2.0, step=0.5))
plt.title('data')
plt.legend(['x', 'x_pred'])
plt.show()
通常,对于直到xt的观测序列,其在时间步t + k处的预测输出x^t+k 称为k步预测(k-step-ahead-prediction)。由于我们的观察已经到了x604,它的k步预测是x^604+k。换句话说,我们必须使⽤我们⾃⼰的预测(⽽不是原始数据)来进⾏多步预测。让我们看看效果如何。
multistep_preds = torch.zeros(T)
multistep_preds[: n_train + tau] = x[: n_train + tau]
for i in range(n_train + tau, T):
multistep_preds[i] = net(multistep_preds[i - tau:i].reshape((1, -1)))
plt.figure(figsize=(4,2), dpi=100, layout='constrained')
plt.plot(time, x.detach().numpy(), linestyle = '--', color='b')
plt.plot(time[tau:], onestep_preds.detach().numpy(), color='g')
plt.plot(time[n_train + tau:], multistep_preds[n_train + tau:].detach().numpy(),linestyle = '-.', color='r')
plt.xlabel('time');plt.ylabel('x')
plt.xticks(np.arange(0, 1001, step=200))
plt.yticks(np.arange(-1.5, 2.0, step=0.5))
plt.title('data')
plt.legend(['x', 'x_pred','multistep preds'])
plt.show()
在步骤1之后,我们积累了⼀些错误ϵ1 = ¯ ϵ。 , 于是,步骤2的输⼊被扰动了ϵ1,结果积累的误差是依照次序的ϵ2 = ¯ ϵ + cϵ1,其中c为某个常数,后⾯的预测误差依此类推。因此误差可能会相当快地偏离真实的观测结果 。
文本预处理
我们将解析⽂本的常⻅预处理步骤。这些步骤通常包括:
- 将⽂本作为字符串加载到内存中。
- 将字符串拆分为词元(如单词和字符)。
- 建⽴⼀个词表,将拆分的词元映射到数字索引。
- 将⽂本转换为数字索引序列,⽅便模型操作。
读取数据集
# 1. 导入所需的库
import torch
import collections
import re
import d2l.torch as d2l
从H.G.Well的时光机器99中加载⽂本。
# 2. 读取数据集
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')
def read_time_machine():
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]
lines = read_time_machine()
print(lines)
print(f'# ⽂本总⾏数: {len(lines)}')
print(lines[0])
print(lines[10])
词元化
下⾯的tokenize函数将⽂本⾏列表(lines)作为输⼊,列表中的每个元素是⼀个⽂本序列(如⼀条⽂本⾏)。每个⽂本序列⼜被拆分成⼀个词元列表,词元(token)是⽂本的基本单位。最后,返回⼀个由词元列表组成的列表,其中的每个词元都是⼀个字符串(string)。
# 3. 词元化
def tokenize(lines, token = 'word'):
"""将⽂本⾏拆分为单词或字符词元"""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines]
else:
print('错误:未知类型:'+ token)
tokens = tokenize(lines)
print(len(tokens))
for i in range(11):
print(tokens[i])
词表
词元的类型是字符串,⽽模型需要的输⼊是数字,因此这种类型不⽅便模型使⽤。现在,让我们构建⼀个字
典,通常也叫做词表(vocabulary),⽤来将字符串类型的词元映射到从0开始的数字索引中。
先将训练集中的所有⽂档合并在⼀起,对它们的唯⼀词元进⾏统计,得到的统计结果称之为语料(corpus)。然后根据每个唯⼀词元的出现频率,为其分配⼀个数字索引。很少出现的词元通常被移除,这可以降低复杂性。另
外,语料库中不存在或已删除的任何词元都将映射到⼀个特定的未知词元“unk”。
# 4.构建词表
def count_corpus(tokens):
# 统计词元频率
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展开成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
class Vocab:
def __init__(self, tokens = None, min_freq = 0, reserved_tokens = None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x:x[1], reverse=True)
# 未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1
def __len__(self):
return len(self.idx_to_token)
def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]
def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]
@property
def unk(self): # 未知词元的索引为0
return 0
@property
def token_freqs(self):
return self._token_freqs
整合功能
def load_corpus_time_machine(max_tokens=-1): #@save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个⽂本⾏不⼀定是⼀个句⼦或⼀个段落,
# 所以将所有⽂本⾏展平到⼀个列表中
corpus = [vocab[token] for line in tokens for token in line]
print(len(corpus)) # 词元数
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab
corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)
循环神经网络
隐藏层和隐状态指的是两个截然不同的概念。如上所述,
隐藏层是在从输⼊到输出的路径上(以观测⻆度来理解)的隐藏的层,
隐状态则是在给定步骤所做的任何事情(以技术⻆度来定义)的输⼊,并且这些状态只能通过先前时间步的数据来计算。**循环神经⽹络(recurrent neural networks, RNNs)是具有隐状态的神经⽹络。**** **
无隐状态的神经网络
单隐藏层的多层感知机
有隐状态的神经网络
与多层感知机不同的是,我们在这⾥保存了前⼀个时间步的隐藏变量Ht-1,并引⼊了⼀个新的权重
参数Whh 2 Rh×h,来描述如何在当前时间步中使⽤前⼀个时间步的隐藏变量。
从相邻时间步的隐藏变量Ht和 Ht-1之间的关系可知,这些变量捕获并保留了序列直到其当前时间步的历史信息,就如当前时间步下神经⽹络的状态或记忆,因此这样的隐藏变量被称为隐状态(hidden state)。由于在当前时间步中,隐状态使⽤的定义与前⼀个时间步中使⽤的定义相同,因此 (8.4.5)的计算是循环的(recurrent)。于是基于循环计算的隐状态神经⽹络被命名为 循环神经⽹络(recurrent neural network)。在循环神经⽹络中执⾏ (8.4.5)计算的层称为循环层(recurrent layer)。
图8.4.1展⽰了循环神经⽹络在三个相邻时间步的计算逻辑。在任意时间步t,隐状态的计算可以被视为:
- 拼接当前时间步t的输⼊Xt和前⼀时间步t - 1的隐状态Ht-1;
- 将拼接的结果送⼊带有激活函数ϕ的全连接层。全连接层的输出是当前时间步t的隐状态Ht。
import torch
import d2l.torch as d2l
X, W_xh = torch.normal(0, 1, (3, 1)), torch.normal(0, 1, (1, 4))
H, W_hh = torch.normal(0, 1, (3, 4)), torch.normal(0, 1, (4, 4))
# 乘法实现
# torch.matmul(X, W_xh) + torch.matmul(H, W_hh)
# 拼接实现
torch.matmul(torch.cat((X, H), 1), torch.cat((W_xh, W_hh), 0))
困惑度(Perplexity)
最后,让我们讨论如何度量语⾔模型的质量,这将在后续部分中⽤于评估基于循环神经⽹络的模型。
循环神经网络实现
初始化模型参数
我们初始化循环神经⽹络模型的模型参数。隐藏单元数num_hiddens是⼀个可调的超参数。当训练语
⾔模型时,输⼊和输出来⾃相同的词表
# 1.导入需要的库
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import d2l.torch as d2l
# 2.加载数据
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 3.独热编码
F.one_hot(torch.tensor([0, 2]), len(vocab))
X = torch.arange(10).reshape((2, 5))
F.one_hot(X.T, 28).shape
# 4.初始化模型参数
'''
初始化循环神经⽹络模型的模型参数。隐藏单元数num_hiddens是⼀个可调的超参数。当训练语
⾔模型时,输⼊和输出来⾃相同的词表。因此,它们具有相同的维度,即词表的⼤⼩
'''
def get_params(vocab_size, num_hiddens, device):
num_inputs = num_outputs = vocab_size
def normal(shape):
return torch.randn(size=shape, device=device) * 0.01
# 隐藏层参数
W_xh = normal((num_inputs, num_hiddens))
W_hh = normal((num_hiddens, num_outputs))
b_h = torch.zeros(num_hiddens, device)
# 输出层参数
W_hq = normal((num_hiddens, num_outputs))
b_q = torch.zeros(num_outputs, device)
# 附加梯度
params = [W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params
循环神经网络模型
为了定义循环神经⽹络模型,我们⾸先需要⼀个init_rnn_state函数在初始化时返回隐状态。这个函数的返
回是⼀个张量,张量全⽤0填充,形状为(批量⼤⼩,隐藏单元数)。
# 5.循环神经网络模型
def init_rnn_state(batch_size, num_hiddens, device):
return (torch.zeros(batch_size, num_hiddens), device)
def rnn(inputs, state, params):
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
for X in inputs:
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H, )
# 1.导入需要的库
import torch
import torch.nn as nn
import torch.nn.functional as F
import d2l.torch as d2l
# 2.加载数据
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
# 3.定义模型
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
state = torch.zeros((1, batch_size, num_hiddens))
print(state.shape)
X = torch.rand(size=(num_steps, batch_size, len(vocab)))
Y, state_new = rnn_layer(X, state)
Y.shape, state_new.shape
class RNNModel(nn.Module):
"""The RNN model.
Defined in :numref:`sec_rnn-concise`"""
def __init__(self, rnn_layer, vocab_size, **kwargs):
super(RNNModel, self).__init__(**kwargs)
self.rnn = rnn_layer
self.vocab_size = vocab_size
self.num_hiddens = self.rnn.hidden_size
# If the RNN is bidirectional (to be introduced later),
# `num_directions` should be 2, else it should be 1.
if not self.rnn.bidirectional:
self.num_directions = 1
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
else:
self.num_directions = 2
self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)
def forward(self, inputs, state):
X = F.one_hot(inputs.T.long(), self.vocab_size)
X = X.to(torch.float32)
Y, state = self.rnn(X, state)
# The fully connected layer will first change the shape of `Y` to
# (`num_steps` * `batch_size`, `num_hiddens`). Its output shape is
# (`num_steps` * `batch_size`, `vocab_size`).
output = self.linear(Y.reshape((-1, Y.shape[-1])))
return output, state
def begin_state(self, device, batch_size=1):
if not isinstance(self.rnn, nn.LSTM):
# `nn.GRU` takes a tensor as hidden state
return torch.zeros((self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens),
device=device)
else:
# `nn.LSTM` takes a tuple of hidden states
return (torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device),
torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device))
device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
d2l.predict_ch8('time traveller', 10, net, vocab, device)
num_epochs, lr = 500, 1
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)
标签:plt,num,self,torch,token,神经网络,循环,深度,size
From: https://blog.csdn.net/qq_52952281/article/details/144485680