Backpropagation Derivation
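As a brief sketch of the derivation, matching the forward pass implemented below (with ReLU as the activation \phi), the per-step computation is:

H_t = \phi(X_t W_{xh} + H_{t-1} W_{hh} + b_h), \qquad O_t = H_t W_{ho} + b_o

With the loss L = \frac{1}{T} \sum_{t=1}^{T} l(O_t, y_t), backpropagation through time pushes gradients backwards along the recurrence:

\frac{\partial L}{\partial H_t} = \frac{\partial L}{\partial O_t} W_{ho}^{\top} + \Big( \frac{\partial L}{\partial H_{t+1}} \odot \phi'(Z_{t+1}) \Big) W_{hh}^{\top}, \qquad Z_{t+1} = X_{t+1} W_{xh} + H_t W_{hh} + b_h

Each step backwards multiplies by another W_{hh}^{\top} factor, so gradients through long sequences tend to explode or vanish. This is exactly what motivates the two tricks in the code below: gradient clipping (grad_clipping) and detaching the state between minibatches (state.detach_(), i.e. truncated BPTT).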

Implementation from Scratch

import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

def get_params(vocab_size, num_hiddens, device):
    input_size = output_size = vocab_size
    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    w_xh = normal((input_size, num_hiddens))
    w_hh = normal((num_hiddens, num_hiddens))
    b_h = normal((num_hiddens,))
    w_ho = normal((num_hiddens, output_size))
    b_o = normal((output_size,))

    # all parameters need gradients
    params = [w_xh, w_hh, b_h, w_ho, b_o]
    for param in params:
        param.requires_grad_(True)
    return params

def init_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device),)

# the hidden state H only needs to be initialized once, at the start
def rnn(inputs: torch.Tensor, params, state):
    w_xh, w_hh, b_h, w_ho, b_o = params
    # inputs shape: (num_steps, batch_size, vocab_size)
    # unpack the hidden state H
    H, = state

    outputs = []
    for X in inputs:
        # one time step per iteration
        # H = ReLU(X @ w_xh + H @ w_hh + b_h)
        H = F.relu(torch.matmul(X, w_xh) + torch.matmul(H, w_hh) + b_h)
        Y = torch.matmul(H, w_ho) + b_o
        outputs.append(Y)

    # stack along a new time-step dimension, which keeps the shapes easy to read
    return torch.stack(outputs, dim=0), H

class RNN(object):
    def __init__(self, vocab_size, num_hiddens, device, init_state, get_params, forward_fn):
        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
        # note: relies on the global batch_size defined with the data iterator below
        self.state = init_state(batch_size, num_hiddens, device)
        self.params = get_params(vocab_size, num_hiddens, device)
        self.forward_fn = forward_fn

    def __call__(self, X: torch.Tensor, state=None):
        # the state must be detached before every call (truncated BPTT)
        if state is not None:
            self.state = state
        for s in self.state:
            s.detach_()
        inputs = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        outputs, H = self.forward_fn(inputs, self.params, self.state)
        self.state = (H,)
        return outputs, self.state
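Before training, a quick shape check helps confirm the forward pass works as intended; this is a small smoke test of my own (the 28-token vocabulary, 5 time steps, and batch of 2 are arbitrary choices):

# hypothetical smoke test: verify the forward-pass shapes on dummy data
device = torch.device('cpu')
params = get_params(vocab_size=28, num_hiddens=256, device=device)
state = init_state(batch_size=2, num_hiddens=256, device=device)
X = torch.zeros((5, 2, 28))  # (num_steps, batch_size, vocab_size)
Y, H = rnn(X, params, state)
print(Y.shape, H.shape)  # torch.Size([5, 2, 28]) torch.Size([2, 256])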
batch_size = 32
num_steps = 35  # number of time steps per minibatch
# vocab is a character-level vocabulary
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

num_epochs, lr = 1000, 1
device = torch.device('cuda')
net = RNN(len(vocab), 256, device, init_state, get_params, rnn)
loss = nn.CrossEntropyLoss()
# updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
updater = torch.optim.SGD(net.params, lr=lr)

def grad_clipping(net, theta):  #@save
    """Clip the gradient."""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

metric = d2l.Accumulator(2)  # accumulates (total loss, total tokens) over all epochs
timer = d2l.Timer()
for epoch in range(num_epochs):
    for x, y in train_iter:
        x, y = x.to(device), y.to(device)
        y_hat, _ = net(x)
        # flatten labels and logits so CrossEntropyLoss can consume them
        y = y.T.reshape(-1)
        y_hat = y_hat.reshape(-1, len(vocab))

        l = loss(y_hat, y.long()).mean()
        updater.zero_grad()
        l.backward()
        grad_clipping(net, 1)
        updater.step()
        # updater(batch_size=1)
        metric.add(l * y.numel(), y.numel())
    if epoch % 100 == 0:
        print('perplexity:', math.exp(metric[0] / metric[1]))

# final perplexity and tokens processed per second (notebook cell output)
math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
torch.save(net, './model/rnn.pth')

# Output
perplexity: 24.934630997035686
perplexity: 8.3297534126987
perplexity: 4.68385855178817
perplexity: 3.194267959177166
perplexity: 2.5469194289914774
perplexity: 2.194944532541399
perplexity: 1.9762917313365411
perplexity: 1.832809809645257
perplexity: 1.7284918412347725
perplexity: 1.6507796068733742
(1.587854868672201, 100896.02240493629)
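Perplexity is just exp of the average per-token cross-entropy, which makes these numbers easy to interpret; a quick check of my own (assuming the usual 28-token character vocabulary of the time machine dataset):

import math
# a uniform guess over 28 characters has cross-entropy log(28) per token,
# i.e. perplexity 28 -- the naive baseline the model starts from
print(math.exp(math.log(28)))        # 28.0
# the final perplexity of ~1.588 corresponds to an average cross-entropy
# of log(1.588) nats per character
print(math.log(1.587854868672201))   # ~0.462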

predict

net = torch.load('./model/rnn.pth')

def predict(prefix, num_preds, net, vocab, device):
    state = init_state(batch_size=1, num_hiddens=256, device=device)
    outputs = [vocab[prefix[0]]]
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    for y in prefix[1:]:
        # warm-up: feed the prefix through the network to build up the state
        _, state = net(get_input(), state)
        outputs.append(vocab[y])
    for _ in range(num_preds):  # predict num_preds steps
        y, _ = net(get_input())
        # greedy decoding: keep the most likely token at each step
        outputs.append(int(y.argmax(dim=2).reshape(1)))
    return ''.join([vocab.idx_to_token[i] for i in outputs])

device = torch.device('cuda')
predict('time traveller ', 100, net, vocab, device)

# Output
'time traveller for so it will be convenient to speak of himwas expounding a recondite matter to us his grey cyeing '
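One caveat of my own about the checkpoint: torch.save(net, ...) pickles the entire RNN object, so torch.load only works where the class definition is importable, and GPU tensors need remapping on a CPU-only machine:

# assumption: loading the pickled checkpoint without a GPU; map_location
# remaps CUDA tensors to the CPU during unpickling (recent PyTorch versions
# may also require weights_only=False for full-object checkpoints)
net = torch.load('./model/rnn.pth', map_location='cpu')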