深度学习 循环神经网络 笔记

本文最后更新于:2023年7月23日 晚上

序列模型

数据

  • 现实世界中很多数据是有时序结构的
  • 音乐、语言、文本,很多数据都是连续的

统计工具

  • 不独立的随机变量
    • 这是时序序列和其他序列的主要区别
模型
模型2

序列模型

  • 对条件概率建模,自回归模型
  • 给一定数据,预测另外一组数据时用前面给的数据
  • 核心:计算\(P\)\(f(x_1,...x_{t-1})\)
自回归模型

马尔科夫假设

  • 假设当前数据只跟\(\tau\)个过去数据点相关

\[p(x_t|x_1,...x_{t-1})=p(x_t|x_{t-\tau},...x_{t-1})=p(x_t|f(x_{t-\tau},...x_{t-1}))\]

潜变量模型

  • 引入潜变量\(h_t\)来表示过去信息\(h_t=f(x_1,...x_{t-1})\)
    • \(x_t = p(x_t|h_t)\)
    • 新的\(h'\)和前面的\(h\)\(x\)相关(模型1)
    • 给定前面的\(x\)和新的\(h'\)计算新的\(x'\)(模型2)

 潜变量模型

潜变量自回归模型

潜变量自回归模型
  • 使用潜变量\(h_t\)总结过去信息

总结

  • 时序模型中,当前数据只跟前面观察到的数据相关
  • 自回归模型使用自身过去数据来预测未来
  • 马尔可夫模型假设当前只跟最近少数数据相关,从而简化模型
  • 潜变量模型使用潜变量来概括历史信息

代码

1
2
3
4
5
6
7
8
9
%matplotlib inline
import torch
from torch import nn
from d2l import torch as d2l

T = 1000 # 总共产生1000个点
time = torch.arange(1, T + 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))
1
2
3
4
5
6
7
8
9
10
11
tau = 4
# 996 4
features = torch.zeros((T - tau, tau))
for i in range(tau):
features[:, i] = x[i: T - tau + i]
labels = x[tau:].reshape((-1, 1))

batch_size, n_train = 16, 600
# 只有前n_train个样本用于训练
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),
batch_size, is_train=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 初始化网络权重的函数
def init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)

# 一个简单的多层感知机
def get_net():
net = nn.Sequential(nn.Linear(4, 10),
nn.ReLU(),
nn.Linear(10, 1))
net.apply(init_weights)
return net

# 平方损失。注意:MSELoss计算平方误差时不带系数1/2
loss = nn.MSELoss(reduction='none')
1
2
3
4
5
6
7
8
9
10
11
12
13
def train(net, train_iter, loss, epochs, lr):
trainer = torch.optim.Adam(net.parameters(), lr)
for epoch in range(epochs):
for X, y in train_iter:
trainer.zero_grad()
l = loss(net(X), y)
l.sum().backward()
trainer.step()
print(f'epoch {epoch + 1}, '
f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')

net = get_net()
train(net, train_iter, loss, 5, 0.01)
1
2
3
4
5
onestep_preds = net(features)
d2l.plot([time, time[tau:]],
[x.detach().numpy(), onestep_preds.detach().numpy()], 'time',
'x', legend=['data', '1-step preds'], xlim=[1, 1000],
figsize=(6, 3))
1
2
3
4
5
6
7
8
9
10
11
multistep_preds = torch.zeros(T)
multistep_preds[: n_train + tau] = x[: n_train + tau]
for i in range(n_train + tau, T):
multistep_preds[i] = net(
multistep_preds[i - tau:i].reshape((1, -1)))

d2l.plot([time, time[tau:], time[n_train + tau:]],
[x.detach().numpy(), onestep_preds.detach().numpy(),
multistep_preds[n_train + tau:].detach().numpy()], 'time',
'x', legend=['data', '1-step preds', 'multistep preds'],
xlim=[1, 1000], figsize=(6, 3))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
max_steps = 64

features = torch.zeros((T - tau - max_steps + 1, tau + max_steps))
# 列i(i<tau)是来自x的观测,其时间步从(i)到(i+T-tau-max_steps+1)
for i in range(tau):
features[:, i] = x[i: i + T - tau - max_steps + 1]

# 列i(i>=tau)是来自(i-tau+1)步的预测,其时间步从(i)到(i+T-tau-max_steps+1)
for i in range(tau, tau + max_steps):
features[:, i] = net(features[:, i - tau:i]).reshape(-1)

steps = (1, 4, 16, 64)
d2l.plot([time[tau + i - 1: T - max_steps + i] for i in steps],
[features[:, tau + i - 1].detach().numpy() for i in steps], 'time', 'x',
legend=[f'{i}-step preds' for i in steps], xlim=[5, 1000],
figsize=(6, 3))

文本预处理

代码

1
2
3
import collections
import re
from d2l import torch as d2l
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 读取数据集
#@save
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine(): #@save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 根据token='word'或'char'
# 将句子拆分为一个个单词或者字符
def tokenize(lines, token='word'): #@save
"""将文本行拆分为单词或字符词元"""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)

tokens = tokenize(lines)
for i in range(11):
print(tokens[i])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 将tokens创建Vocabulary 词表
# 建立单词与索引的关系
class Vocab: #@save
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
# min_freq 出现频率小于该参数的单词忽略不计
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
reverse=True)
# 未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1

def __len__(self):
return len(self.idx_to_token)

def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]

def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]

@property
def unk(self): # 未知词元的索引为0
return 0

@property
def token_freqs(self):
return self._token_freqs

def count_corpus(tokens): #@save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def load_corpus_time_machine(max_tokens=-1):  #@save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab

corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)

语言模型数据集

目的

对一个文档,甚至是一个词元序列进行建模。

\[P(x_1, x_2, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_1, \ldots, x_{t-1}).\]

\[P(\text{deep}, \text{learning}, \text{is}, \text{fun}) = P(\text{deep}) P(\text{learning} \mid \text{deep}) P(\text{is} \mid \text{deep}, \text{learning}) P(\text{fun} \mid \text{deep}, \text{learning}, \text{is}).\]

马尔可夫模型

  • n元语法

涉及一个、两个和三个变量的概率公式分别被称为 一元语法(unigram)、二元语法(bigram)和三元语法(trigram)模型。

\[\begin{split}\begin{aligned}P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2) P(x_3) P(x_4),\\ P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2 \mid x_1) P(x_3 \mid x_2) P(x_4 \mid x_3),\\ P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2 \mid x_1) P(x_3 \mid x_1, x_2) P(x_4 \mid x_2, x_3). \end{aligned}\end{split}\]

读取长时间序列数据

  • 随机分区
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def seq_data_iter_random(corpus, batch_size, num_steps):  #@save
"""使用随机抽样生成一个小批量子序列"""
# 从随机偏移量开始对序列进行分区,随机范围包括num_steps-1
# [4:34] len = 31
corpus = corpus[random.randint(0, num_steps - 1):]
# 减去1,是因为我们需要考虑标签
# 6个sebseq 每个包含5个词元
num_subseqs = (len(corpus) - 1) // num_steps
# 长度为num_steps的子序列的起始索引
# [0,5,10,15,20,25]
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# 在随机抽样的迭代过程中,
# 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
# [5,15,0,25,10,20]
random.shuffle(initial_indices)

def data(pos):
# 返回从pos位置开始的长度为num_steps的序列
return corpus[pos: pos + num_steps]

num_batches = num_subseqs // batch_size
for i in range(0, batch_size * num_batches, batch_size):
# 在这里,initial_indices包含子序列的随机起始索引
# [5,15] [0,25] [10,20]
initial_indices_per_batch = initial_indices[i: i + batch_size]
# [5:5+5],[15:15+5] ...
X = [data(j) for j in initial_indices_per_batch]
# [6:6+5],[16:16+5] ...
Y = [data(j + 1) for j in initial_indices_per_batch]
yield torch.tensor(X), torch.tensor(Y)

seg_data_iter_random(range(0,35),bathch_size=2,num_steps=5)
  • 顺序分区

​ 没有random.shuffle

1
2
3
4
5
6
7
8
9
10
11
12
13
def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save
"""使用顺序分区生成一个小批量子序列"""
# 从随机偏移量开始划分序列
offset = random.randint(0, num_steps)
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
Xs = torch.tensor(corpus[offset: offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
num_batches = Xs.shape[1] // num_steps
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i: i + num_steps]
Y = Ys[:, i: i + num_steps]
yield X, Y
  • 封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class SeqDataLoader:  #@save
"""加载序列数据的迭代器"""
def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = d2l.seq_data_iter_random
else:
self.data_iter_fn = d2l.seq_data_iter_sequential
self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps

def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)

def load_data_time_machine(batch_size, num_steps, #@save
use_random_iter=False, max_tokens=10000):
"""返回时光机器数据集的迭代器和词表"""
data_iter = SeqDataLoader(
batch_size, num_steps, use_random_iter, max_tokens)
return data_iter, data_iter.vocab

循环神经网络

独热编码

概念
  • 将词元的数字索引变为更具表现力的特征向量
方法
  • 假设词表中不同词元的数目为\(N\)(len(vocab)),词元索引的范围为\(0\)\(N-1\)

  • 如果词元的索引是整数\(i\), 那么我们将创建一个长度为\(N\)的全向量, 并将第处的元素设置为\(i\)

注意
1
2
3
4
5
6
7
8
# (2,5) batch_size = 2 num_step = 5 (一次五个字符(词元),时间维度、长度T )
X = torch.arange(10).reshape((2, 5))
# X.T 把时间维度提到第一维
# 因为后面是对时间维度进行更新
# X.T更方便
F.one_hot(X.T, 28).shape

# [5,2,28]

梯度裁剪

原因

对于长度为的序列,我们在迭代中计算这个时间步上的梯度, 将会在反向传播过程中产生长度为的矩阵乘法链。 当\(\mathcal{O}(T)\)较大时,它可能导致数值不稳定, 例如可能导致梯度爆炸或梯度消失。 因此,循环神经网络模型往往需要额外的方式来支持稳定训练。

概念
  • 如果梯度长度超过\(\theta\),那么拖影回长度$ $

\[\mathbf{g} \leftarrow \min\left(1, \frac{\theta}{\|\mathbf{g}\|}\right) \mathbf{g}.\]

困惑度

概念
  • 下一个词元的实际选择数的调和平均数
  • 用于衡量语言模型的质量
  • 我们在引入softmax回归 ( 3.4.7节)时定义了熵、惊异和交叉熵。 如果想要压缩文本,我们可以根据当前词元集预测的下一个词元。 一个更好的语言模型应该能让我们更准确地预测下一个词元。 因此,它应该允许我们在压缩序列时花费更少的比特。 所以我们可以通过一个序列中所有的个词元的交叉熵损失的平均值来衡量:

\[\frac{1}{n} \sum_{t=1}^n -\log P(x_t \mid x_{t-1}, \ldots, x_1),\]

  • 其中由语言模型给出, 是在时间步从该序列中观察到的实际词元。 这使得不同长度的文档的性能具有了可比性。 由于历史原因,自然语言处理的科学家更喜欢使用一个叫做困惑度(perplexity)的量。 简而言之,它是 交叉熵损失的平均值的指数:

\[\exp\left(-\frac{1}{n} \sum_{t=1}^n \log P(x_t \mid x_{t-1}, \ldots, x_1)\right).\]

  • 预测情况
    • 在最好的情况下,模型总是完美地估计标签词元的概率为1。 在这种情况下,模型的困惑度为1。
    • 在最坏的情况下,模型总是预测标签词元的概率为0。 在这种情况下,困惑度是正无穷大。
    • 在基线上,该模型的预测是词表的所有可用词元上的均匀分布。 在这种情况下,困惑度等于词表中唯一词元的数量。 事实上,如果我们在没有任何压缩的情况下存储序列, 这将是我们能做的最好的编码方式。 因此,这种方式提供了一个重要的上限, 而任何实际模型都必须超越这个上限。

实现部分

一次迭代过程

  • data_iter部分 train_iter,test_iter

    • 加载数据load_data(batch_size,num_steps,use_random_sample)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # batch_size = 2
    # 每次训练两个batch
    # num_setp = 5
    # 往后预测5个 train模式下,算5个的误差 重复两次(batch)
    # X->y
    # 3->5 5->13 13->2 2->1 1->13
    tensor([[ 3, 5, 13, 2, 1],
    [14, 11, 2, 1, 17]])
    tensor([[ 5, 13, 2, 1, 13],
    [11, 2, 1, 17, 4]])
  • 网络部分初始化

    • \(n\)\(batch\_size\)\(d\)\(input\_dim\)\(h\)\(hidden\_num\)
      • 其中\(d=h=vobal\_size=len(vobal)\)
      • \(W\_xh\)\(d\cdot h\), \(W\_hh\)\(h\cdot h\)\(b_h\)\(1\cdot h\),(隐藏层参数)
      • $ W_hq\(:\)hq\(,\) b_q$:\(1\cdot q\)(输出层参数)
    • begin_state:->init_rnn_state:隐藏层的隐藏单元的初始化 \(n\cdot h\)
    • get_params:->self.params:隐藏层与输出层参数初始化\([W_xh, W_hh, b_h, W_hq, b_q]\)
  • 网络部分计算

    • \(n · d · d · h\) + $· h · q + 1 · q = n · q $ (\(x_{t-1}\)计算\(x_t\)) + $· h · h + 1 · h = n · h $(更新隐藏层)

    • 具体过程

      • onehot编码: input 5 · 2 --> 5 · 2 · 32
        • num_step · batch_size --> num_step · batch_size · vocal_size

      1
      2
      3
      F.one_hot(torch.tensor([0, 2]), 5) 
      tensor([[1, 0, 0, 0, 0],
      [0, 0, 1, 0, 0]])

      • forward计算
      1
      2
      3
      4
      5
      6
      7
      8
      9
      y = Y.T.reshape(-1)
      X, y = X.to(device), y.to(device)
      y_hat, state = net(X, state)
      # net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,init_rnn_state, rnn)
      # rnn
      # for X in inputs:
      # H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
      # Y = torch.mm(H, W_hq) + b_q
      # outputs.append(Y)
      • loss计算与反向传播
      1
      2
      3
      4
      5
      l = loss(y_hat, y.long()).mean()
      updater.zero_grad()
      l.backward()
      grad_clipping(net, 1)
      updater.step()

手动实现

1
2
3
4
5
6
7
8
9
10

import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

# 加载数据 批量大小 32 时间步长 35
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 初始化隐藏层参数
# 28 512
def get_params(vocab_size, num_hiddens, device):
num_inputs = num_outputs = vocab_size

def normal(shape):
return torch.randn(size=shape, device=device) * 0.01

# 隐藏层参数
# 28 512 d x h
W_xh = normal((num_inputs, num_hiddens))
# 512 512 h x h
W_hh = normal((num_hiddens, num_hiddens))
# 512 1 x h
b_h = torch.zeros(num_hiddens, device=device)
# 输出层参数
# 512 28 h x q(d)
W_hq = normal((num_hiddens, num_outputs))
# 28 1 x q(d)
b_q = torch.zeros(num_outputs, device=device)
# 附加梯度
params = [W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params

# 初始化隐藏层状态· ht n x h
# 32 512
def init_rnn_state(batch_size, num_hiddens, device):
return (torch.zeros((batch_size, num_hiddens)), device=device),)
隐藏层输入输出与参数
1
2
3
4
5
6
7
8
9
10
11
12
# 定义RNN层
def rnn(inputs, state, params):
# inputs的形状:(时间步数量,批量大小,词表大小)
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
# X的形状:(批量大小,词表大小)
for X in inputs:
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H,)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 封装
class RNNModelScratch: #@save
"""从零开始实现的循环神经网络模型"""
def __init__(self, vocab_size, num_hiddens, device,
get_params, init_state, forward_fn):
self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
self.params = get_params(vocab_size, num_hiddens, device)
self.init_state, self.forward_fn = init_state, forward_fn

def __call__(self, X, state):
# X 32 x 35 x 1 batch_size num_step seq
X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
# X 35 x 32 x 28 num_step batch_size OnehotSeq
return self.forward_fn(X, state, self.params)

def begin_state(self, batch_size, device):
return self.init_state(batch_size, self.num_hiddens, device)
1
2
3
num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
init_rnn_state, rnn)
1
2
3
4
5
6
7
8
9
10
11
12
13
# 预测
def predict_ch8(prefix, num_preds, net, vocab, device): #@save
"""在prefix后面生成新字符"""
state = net.begin_state(batch_size=1, device=device)
outputs = [vocab[prefix[0]]]
get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
for y in prefix[1:]: # 预热期
_, state = net(get_input(), state)
outputs.append(vocab[y])
for _ in range(num_preds): # 预测num_preds步
y, state = net(get_input(), state)
outputs.append(int(y.argmax(dim=1).reshape(1)))
return ''.join([vocab.idx_to_token[i] for i in outputs])
1
2
3
4
5
6
7
8
9
10
def grad_clipping(net, theta):  #@save
"""裁剪梯度"""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#@save
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
"""训练网络一个迭代周期(定义见第8章)"""
state, timer = None, d2l.Timer()
metric = d2l.Accumulator(2) # 训练损失之和,词元数量
for X, Y in train_iter:
if state is None or use_random_iter:
# 在第一次迭代或使用随机抽样时初始化state
state = net.begin_state(batch_size=X.shape[0], device=device)
else:
if isinstance(net, nn.Module) and not isinstance(state, tuple):
# state对于nn.GRU是个张量
state.detach_()
else:
# state对于nn.LSTM或对于我们从零开始实现的模型是个张量
for s in state:
s.detach_()
y = Y.T.reshape(-1)
X, y = X.to(device), y.to(device)
y_hat, state = net(X, state)
# state 最后一步的状态,包含了前面所有信息的更新
# y: [1020] num_step x batch_size
# y_hat: [1020,28] num_step x batch_size x vocal_size
# loss 为CrossEntropy
l = loss(y_hat, y.long()).mean()
if isinstance(updater, torch.optim.Optimizer):
updater.zero_grad()
l.backward()
grad_clipping(net, 1)
updater.step()
else:
l.backward()
grad_clipping(net, 1)
# 因为已经调用了mean函数
updater(batch_size=1)
metric.add(l * y.numel(), y.numel())
return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#@save
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
use_random_iter=False):
"""训练模型(定义见第8章)"""
loss = nn.CrossEntropyLoss()
animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
legend=['train'], xlim=[10, num_epochs])
# 初始化
if isinstance(net, nn.Module):
updater = torch.optim.SGD(net.parameters(), lr)
else:
updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
# 训练和预测
for epoch in range(num_epochs):
ppl, speed = train_epoch_ch8(
net, train_iter, loss, updater, device, use_random_iter)
if (epoch + 1) % 10 == 0:
print(predict('time traveller'))
animator.add(epoch + 1, [ppl])
print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
print(predict('time traveller'))
print(predict('traveller'))
1
2
3
4
5
6
# 训练
num_epochs, lr = 500, 1
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())
# 调用网络预测
predict_ch8('time traveller ', 10, net, vocab, d2l.try_gpu())

简洁实现

1
2
3
4
5
6
7
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
1
2
3
4
5
6
7
8
# 初始化隐藏单元数量
# 初始化隐藏层
num_hiddens = 256
# 利用nn.RNN()定义RNN层
# 没有输出层,需要在封装的网络类中加入nn.Linear()作为输出
rnn_layer = nn.RNN(len(vocab), num_hiddens)
# 初始化隐藏层状态 (隐藏层数、批量大小、隐藏单元数)
state = torch.zeros((1, batch_size, num_hiddens))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#@save
class RNNModel(nn.Module):
"""循环神经网络模型"""
def __init__(self, rnn_layer, vocab_size, **kwargs):
super(RNNModel, self).__init__(**kwargs)
self.rnn = rnn_layer
self.vocab_size = vocab_size
self.num_hiddens = self.rnn.hidden_size
# 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1
if not self.rnn.bidirectional:
self.num_directions = 1
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
else:
self.num_directions = 2
self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

def forward(self, inputs, state):
X = F.one_hot(inputs.T.long(), self.vocab_size)
# X 35 x 32 x 28 num_step batch_size OnehotSeq
X = X.to(torch.float32)
# Y 35 x 32 x 256 num_step x batch_size x hidden_num
# state 1 x 32 x 256 最后一次训练的state state = Y[-1]
Y, state = self.rnn(X, state)
# 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
# 它的输出形状是(时间步数*批量大小,词表大小)。
# 每次的状态计算每次的输出计算loss
output = self.linear(Y.reshape((-1, Y.shape[-1])))
return output, state

def begin_state(self, device, batch_size=1):
if not isinstance(self.rnn, nn.LSTM):
# nn.GRU以张量作为隐状态
return torch.zeros((self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens),
device=device)
else:
# nn.LSTM以元组作为隐状态
return (torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device),
torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device))
1
2
3
4
5
6
7
8
device = d2l.try_gpu()
# 定义网络
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
num_epochs, lr = 500, 1
# 训练、预测
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)
d2l.predict_ch8('time traveller', 10, net, vocab, device)

深度学习 循环神经网络 笔记
https://anonymouslosty.ink/2023/07/05/深度学习 循环神经网络 笔记/
作者
Ling yi
发布于
2023年7月5日
更新于
2023年7月23日
许可协议