基于transformer的机器翻译实战
文本序列中的单词是有顺序的,一个单词在序列中所处的位置对我们理解其词义、上下文关系都十分重要,但是传统的词向量嵌入(word embedding)并不包含位置信息,所以专门引入位置向量。两种方式的比较:有论文实验显示,绝对位置嵌入和可学习位置嵌入最终的效果是类似的,但是可学习位置嵌入会引入额外的参数,增加训练开销,所以本项目使用。作用是避免过拟合,如果不使用mask,会导致模型在训练时就能看到整个
一、基本模块
1、位置嵌入(position embedding)
(1)为什么要引入位置嵌入?
文本序列中的单词是有顺序的,一个单词在序列中所处的位置对我们理解其词义、上下文关系都十分重要,但是传统的词向量嵌入(word embedding)并不包含位置信息,所以专门引入位置向量。
(2)如何实现位置嵌入?
主要有两种方式:
- 可学习位置嵌入:为每一个位置初始化一个位置嵌入向量,并且将位置嵌入向量作为模型参数,之后会训练过程中不断更新该向量。
- 绝对位置嵌入:位置嵌入向量初始化之后就不再改变。一般基于三角函数式,又称Sinusoidal Position Encoding,公式如下:
分别通过sin和cos计算位置 k 的编码向量的第 2 i 和 2 i + 1个分量,是位置向量的维度。
两种方式的比较:有论文实验显示,绝对位置嵌入和可学习位置嵌入最终的效果是类似的,但是可学习位置嵌入会引入额外的参数,增加训练开销,所以本项目使用基于三角函数式的绝对位置嵌入。
2、掩码机制(mask)
(1)mask的作用是什么?什么情况下需要使用mask?
作用是避免过拟合,如果不使用mask,会导致模型在训练时就能看到整个句子,从而导致训练准确度上升很快,但是验证准确度会先升后降。
第一种情况是输入序列长度不一致,需要使用“pad”字符补全短序列,保证序列长度一致性。在计算注意力时,就会需要将“pad”字符掩去。
第二种情况是为了保证训练效果,在训练时不能直接看到整个句子,而是只能看到当前所处位置及其之前位置的单词,所以可以使用三角型的mask矩阵。
(2)mask实现方式
对于第一种情况,需要先确定在词表中“pad”的序号,不妨假设pad = 1,序列向量,辅助矩阵
,这里的1是因为pad=1,然后比较seq和p,相等的位置置1,不相等的位置置0,得到mask矩阵:
本项目使用self-attention,会出现上述第二种情况。由于是self-attention,因此Q = K = V,假设:
根据注意力计算公式,需要先计算:
当我们遍历到第2个位置时,应该只能知道和
,而无法看到
和
,所以理论上无法计算出
和
,因此要把这两个位置掩去,同理可以推出mask矩阵形式为:
二、代码
1、目录架构
Machine_translation
--data #存放数据集
--eng-fra.txt #英语-法语数据集
--save #保存模型参数
--data_process.py #数据预处理
--decoder.py #定义transformer解码器
--encoder.py #定义transformer编码器
--layer.py #定义transformer网络层
--modules.py #实现位置嵌入、mask、词/索引转换等模块
--optimizer.py #动态学习率
--train.py #配置以及训练
--transformer.py #搭建transformer模型
2、data_process.py:数据预处理
数据集下载:见文章顶部
数据标准化流程:转小写 ——> 转码 ——> 在标点符号前插入空格 ——> 剔除数字等非法字符 ——> 剔除多余空格
import unicodedata
import re
import pandas as pd
import torchtext
import torch
from tqdm import tqdm
from sklearn.model_selection import train_test_split
class DataLoader:
def __init__(self, data_iter):
self.data_iter = data_iter
self.length = len(data_iter) # 一共有多少个batch?
def __len__(self):
return self.length
def __iter__(self):
# 注意,在此处调整text的shape为batch first
for batch in self.data_iter:
yield (torch.transpose(batch.src, 0, 1), torch.transpose(batch.targ, 0, 1))
# 将unicode字符串转化为ASCII码
def unicodeToAscii(s):
return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')
# 标准化句子序列
def normalizeString(s):
s = s.lower().strip() # 全部转小写
s = unicodeToAscii(s)
s = re.sub(r"([.!?])", r" \1", s) # \1表示group(1)即第一个匹配到的 即匹配到'.'或者'!'或者'?'后,一律替换成'空格.'或者'空格!'或者'空格?'
s = re.sub(r"[^a-zA-Z.!?]+", r" ", s) # 非字母以及非.!?的其他任何字符 一律被替换成空格
s = re.sub(r'[\s]+', " ", s) # 将出现的多个空格,都使用一个空格代替。例如:w='abc aa bb' 处理后:w='abc aa bb'
return s
# 文件是英译法,我们实现的是法译英,所以进行了reverse,所以pair[1]是英语
def exchangepairs(pairs):
# 过滤,并交换句子顺序,得到法英句子对(之前是英法句子对)
return [[pair[1], pair[0]] for pair in pairs]
def get_dataset(pairs, src, targ):
fields = [('src', src), ('targ', targ)] # filed信息 fields dict[str, Field])
examples = [] # list(Example)
for fra, eng in tqdm(pairs): # 进度条
# 创建Example时会调用field.preprocess方法
examples.append(torchtext.legacy.data.Example.fromlist([fra, eng], fields))
return examples, fields
def get_datapipe(opt, src, tar):
data_df = pd.read_csv(opt.data_dir + 'eng-fra.txt', # 数据格式:英语\t法语,注意我们的任务源语言是法语,目标语言是英语
encoding='UTF-8', sep='\t', header=None,
names=['eng', 'fra'], index_col=False)
pairs = [[normalizeString(s) for s in line] for line in data_df.values]
pairs = exchangepairs(pairs)
train_pairs, val_pairs = train_test_split(pairs, test_size=0.2, random_state=1234)
ds_train = torchtext.legacy.data.Dataset(*get_dataset(train_pairs, src, tar))
ds_val = torchtext.legacy.data.Dataset(*get_dataset(val_pairs, src, tar))
train_iter, val_iter = torchtext.legacy.data.Iterator.splits(
(ds_train, ds_val),
sort_within_batch=True,
sort_key=lambda x: len(x.src),
batch_sizes=(opt.batch_size, opt.batch_size)
)
train_dataloader = DataLoader(train_iter)
val_dataloader = DataLoader(val_iter)
return train_dataloader, val_dataloader, ds_train
3、modules.py:实现位置嵌入、mask、词/索引转换模块
import torch
from torch import nn
import numpy as np
from data_process import normalizeString
"""
位置编码:有可学习位置编码与相对位置编码,实验表明两者的效果差不多,但是绝对位置编码可以减少参数量
本项目使用三角函数式的绝对位置编码,原理是计算sin和cos来表示位置k的编码向量的2i,2i+1个分量
"""
def positional_encoding(max_seq_len, d_word_vec):
"""
max_seq_len:序列长度,即单词数
d_word_vec:位置编码向量维度
"""
# 计算位置向量
pos_enc = np.array(
[[pos / np.power(10000, 2.0 * (j // 2) / d_word_vec) for j in range(d_word_vec)]
for pos in range(max_seq_len)])
pos_enc[:, 0::2] = np.sin(pos_enc[:, 0::2])
pos_enc[:, 1::2] = np.cos(pos_enc[:, 1::2])
# 维度扩展
pos_enc = pos_enc[np.newaxis, :] # (max_seq_len, d_word_vec) -> (1, max_seq_len, d_word_vec)
return torch.tensor(pos_enc, dtype=torch.float32)
"""
掩码机制
在encoder中使用padding_mask
在decoder中使用look_ahead_mask与padding_mask
"""
pad = 0 # 重要参数,必须与字符‘pad’在词表中的索引保持一致,在train.py中可以查看
def create_look_ahead_mask(seq_len):
look_ahead_mask = torch.tril(torch.ones(seq_len, seq_len), diagonal = 0)
return look_ahead_mask
def create_padding_mask(pad, seq):
seq = torch.eq(seq, torch.tensor(pad)).float()
return seq[:, np.newaxis, np.newaxis, :]
# 计算带有mask的损失
def mask_loss_func(real, pred):
loss_object = torch.nn.CrossEntropyLoss(reduction='none')
_loss = loss_object(pred.transpose(-1,-2), real)
# logical_not 取非
# mask 每个元素为bool值,如果real中有pad,则mask相应位置就为False
# mask = torch.logical_not(real.eq(0)).type(_loss.dtype) # [b, targ_seq_len] pad=0的情况
mask = torch.logical_not(real.eq(pad)).type(_loss.dtype) # [b, targ_seq_len] pad!=0的情况
# 对应位置相乘,token上的损失被保留了下来,pad的loss被置为0或False 去掉,不计算在内
_loss *= mask
return _loss.sum() / mask.sum().item()
# 计算带有mask的准确度
# real [b, targ_seq_len]
# pred [b, targ_seq_len, target_vocab_size]
def mask_accuracy_func(real, pred):
_pred = pred.argmax(dim=-1) # [b, targ_seq_len, target_vocab_size]=>[b, targ_seq_len]
corrects = _pred.eq(real) # [b, targ_seq_len] bool值
# logical_not 取非
# mask 每个元素为bool值,如果real中有pad,则mask相应位置就为False
# mask = torch.logical_not(real.eq(0)) # [b, targ_seq_len] bool值 pad=0的情况
mask = torch.logical_not(real.eq(pad)) # [b, targ_seq_len] bool值 pad!=0的情况
# 对应位置相乘,token上的值被保留了下来,pad上的值被置为0或False 去掉,不计算在内
corrects *= mask
return corrects.sum().float() / mask.sum().item()
# inp [b, inp_seq_len] 序列已经加入pad填充
# targ [b, targ_seq_len] 序列已经加入pad填充
def create_mask(inp, targ):
# encoder padding mask
enc_padding_mask = create_padding_mask(pad, inp) # =>[b,1,1,inp_seq_len] mask=1的位置为pad
# decoder's first attention block(self-attention)
# 使用的padding create_mask & look-ahead create_mask
look_ahead_mask = create_look_ahead_mask(targ.shape[-1]) # =>[targ_seq_len,targ_seq_len] ##################
dec_targ_padding_mask = create_padding_mask(pad, targ) # =>[b,1,1,targ_seq_len]
combined_mask = torch.max(look_ahead_mask, dec_targ_padding_mask) # 结合了2种mask =>[b,1,targ_seq_len,targ_seq_len]
# decoder's second attention block(encoder-decoder attention) 使用的padding create_mask
# 【注意】:这里的mask是用于遮挡encoder output的填充pad,而encoder的输出与其输入shape都是[b,inp_seq_len,d_model]
# 所以这里mask的长度是inp_seq_len而不是targ_mask_len
dec_padding_mask = create_padding_mask(pad, inp) # =>[b,1,1,inp_seq_len] mask=1的位置为pad
return enc_padding_mask, combined_mask, dec_padding_mask
# [b,1,1,inp_seq_len], [b,1,targ_seq_len,targ_seq_len], [b,1,1,inp_seq_len]
"""
token与索引的编码与解码
"""
# tokenizer = lambda x:x.split()
# 单词 -> 索引
def tokenzier_encode(tokenize, sentence, vocab):
sentence = normalizeString(sentence) # 句子标准化
sentence = tokenize(sentence) # 分词,str -> list
sentence = ['<start>'] + sentence + ['<end>']
sentence_ids = [vocab.stoi[token] for token in sentence] # vocab.stoi可以快速查询到token在词表中对应的索引
return sentence_ids
# 索引 -> 单词
def tokenzier_decode(sentence_ids, vocab):
sentence = [vocab.itos[id] for id in sentence_ids if id<len(vocab)]
return " ".join(sentence) # 将sentence中的单词以空格为分隔的方式连接起来
4、layer.py:定义transformer网络层
前馈网络层Feed Forward:包含两个线性层,中间夹着一层ReLU激活。作用是更好的提取特征。
import torch
from torch import nn
# 多头注意力层
class MultiHeadAttention(torch.nn.Module):
def __init__(self, d_word_vec, num_heads, dropout):
"""
d_word_vec: 词向量维度
num_heads: 注意力头的数目
dropout: 取值0~1, 表示随机置为0的神经元的比例
"""
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.d_word_vec = d_word_vec
assert d_word_vec%self.num_heads == 0
self.wq = nn.Linear(d_word_vec, d_word_vec)
self.wk = nn.Linear(d_word_vec, d_word_vec)
self.wv = nn.Linear(d_word_vec, d_word_vec)
self.final_linear = nn.Linear(d_word_vec, d_word_vec)
self.dropout = nn.Dropout(dropout)
# 缩放:注意力计算时用到的根号d_k
self.scale = torch.sqrt(torch.FloatTensor([d_word_vec // self.num_heads])).cuda()
def split_heads(self, x, batch_size):
x = x.view(batch_size, -1, self.num_heads, self.d_word_vec // self.num_heads) # (batch_size, seq_len, d_word_vec) -> (batch_size, seq_len, num_heads, depth)
x = x.permute(0, 2, 1, 3) # (batch_size, seq_len, num_heads, depth) -> (batch_size, num_heads, seq_len, depth)
return x
def forward(self, q, k, v, mask):
batch_size = q.shape[0]
# 计算Q,K,V
Q = self.wq(q) # (batch_size, seq_len, d_word_vec)
K = self.wk(k) # (batch_size, seq_len, d_word_vec)
V = self.wv(v) # (batch_size, seq_len, d_word_vec)
# 将Q,K,V在d_word_vec维度上划分到多个注意力头中
Q = self.split_heads(Q, batch_size) # (batch_size, num_heads, seq_len, depth)
K = self.split_heads(K, batch_size) # (batch_size, num_heads, seq_len, depth)
V = self.split_heads(V, batch_size) # (batch_size, num_heads, seq_len, depth)
# 计算注意力
attention = torch.matmul(Q, K.permute(0, 1, 3, 2))/self.scale # (batch_size, num_heads, seq_len, seq_len)
# 掩码机制:如果mask不为空,就将mask中取值为0的位置的注意力设定为 -1e10
if mask is not None:
attention = attention.masked_fill(mask==1, -1e10)
attention = self.dropout(torch.softmax(attention, dim=-1))
# 将注意力分数与权重矩阵V相乘
x = torch.matmul(attention, V) # (batch_size, num_heads, seq_len, depth)
# 将多头计算结果拼接: (batch_size, num_heads, seq_len, depth) -> (batch_size, seq_len, num_heads, depth) -> (batch_size, seq_len, d_word_vec)
x = x.permute(0, 2, 1, 3).reshape(batch_size, -1, self.d_word_vec)
return self.final_linear(x)
# 点式前馈网络
def point_wise_feed_forward_network(d_word_vec, d_hidden):
feed_forward_net = nn.Sequential(
nn.Linear(d_word_vec, d_hidden),
nn.ReLU(),
nn.Linear(d_hidden, d_word_vec)
)
return feed_forward_net
class EncoderLayer(nn.Module):
def __init__(self, d_word_vec, num_heads, d_hidden, dropout=0.1):
super(EncoderLayer, self).__init__()
self.multiheadlayer = MultiHeadAttention(d_word_vec, num_heads, dropout)
self.ffn = point_wise_feed_forward_network(d_word_vec, d_hidden)
self.layernorm1 = nn.LayerNorm(normalized_shape=d_word_vec, eps=1e-6)
self.layernorm2 = nn.LayerNorm(normalized_shape=d_word_vec, eps=1e-6)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
def forward(self, x, mask):
attn_output = self.multiheadlayer(x, x, x, mask) # (batch_size, seq_len, d_word_vec)
attn_output = self.dropout1(attn_output)
out1 = self.layernorm1(x + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output)
out2 = self.layernorm2(out1 + ffn_output)
return out2 # (batch_size, seq_len, d_word_vec)
class DecoderLayer(nn.Module):
def __init__(self, d_word_vec, num_heads, d_hidden, dropout=0.1):
super(DecoderLayer, self).__init__()
self.multiheadlayer1 = MultiHeadAttention(d_word_vec, num_heads, dropout)
self.multiheadlayer2 = MultiHeadAttention(d_word_vec, num_heads, dropout)
self.ffn = point_wise_feed_forward_network(d_word_vec, d_hidden)
self.layernorm1 = nn.LayerNorm(normalized_shape=d_word_vec, eps=1e-6)
self.layernorm2 = nn.LayerNorm(normalized_shape=d_word_vec, eps=1e-6)
self.layernorm3 = nn.LayerNorm(normalized_shape=d_word_vec, eps=1e-6)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.dropout3 = nn.Dropout(dropout)
def forward(self, x, enc_output, look_ahead_mask, padding_mask):
attn1 = self.multiheadlayer1(x, x, x, look_ahead_mask)
attn1 = self.dropout1(attn1)
out1 = self.layernorm1(x + attn1)
attn2 = self.multiheadlayer2(out1, enc_output, enc_output, padding_mask)
attn2 = self.dropout2(attn2)
out2 = self.layernorm2(out1 + attn2)
ffn_output = self.ffn(out2)
ffn_output = self.dropout3(ffn_output)
out3 = self.layernorm3(out2 + ffn_output)
return out3
5、encoder.py:定义编码器
import torch
import pdb
from torch import nn
from modules import positional_encoding
from layer import EncoderLayer
class Encoder(nn.Module):
def __init__(self,
num_layers,
d_word_vec,
num_heads,
d_hidden,
vocab_size,
max_seq_len,
dropout = 0.1):
"""
num_layers: encoder网络层数
vocab_size: 源词表大小(待翻译语言词表)
"""
super(Encoder, self).__init__()
self.num_layers = num_layers
self.d_word_vec = d_word_vec
self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_word_vec)
self.pos_encoding = positional_encoding(max_seq_len, d_word_vec) # (1, max_seq_len, d_word_vec)
self.enc_layers = nn.ModuleList([EncoderLayer(d_word_vec, num_heads, d_hidden, dropout) for _ in range(num_layers)])
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
input_seq_len = x.shape[-1]
# 词嵌入与位置嵌入
x = self.embedding(x) # (batch_size, input_seq_len) -> (batch_size, input_seq_len, d_word_vec)
x *= torch.sqrt(torch.tensor(self.d_word_vec, dtype=torch.float32))
pos_enc = self.pos_encoding[:, :input_seq_len, :] # (1, input_seq_len, d_word_vec)
pos_enc = pos_enc.cuda()
x += pos_enc
x = self.dropout(x) # (batch_size, input_seq_len, d_word_vec)
# 经过num_layers层encoder
for i in range(self.num_layers):
x = self.enc_layers[i](x, mask)
return x # (batch_size, input_seq_len, d_word_vec)
6、decoder.py:定义解码器
import torch
from torch import nn
from modules import positional_encoding
from layer import DecoderLayer
class Decoder(nn.Module):
def __init__(
self,
num_layers,
d_word_vec,
num_heads,
d_hidden,
vocab_size,
max_seq_len,
dropout = 0.1):
"""
vocab_size: 目标词表大小
"""
super(Decoder, self).__init__()
self.num_layers = num_layers
self.d_word_vec = d_word_vec
self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_word_vec)
self.pos_encoding = positional_encoding(max_seq_len, d_word_vec) # (1, max_seq_len, d_word_vec)
self.dec_layers = nn.ModuleList([DecoderLayer(d_word_vec, num_heads, d_hidden, dropout) for _ in range(num_layers)])
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, look_ahead_mask, padding_mask):
target_seq_len = x.shape[-1]
# 词嵌入与位置嵌入
x = self.embedding(x) # (batch_size, target_seq_len) -> (batch_size, target_seq_len, d_word_vec)
x *= torch.sqrt(torch.tensor(self.d_word_vec, dtype=torch.float32)) # (batch_size, target_seq_len, d_word_vec)
pos_enc = self.pos_encoding[:, :target_seq_len, :] # (1, target_seq_len, d_word_vec)
pos_enc = pos_enc.cuda()
x += pos_enc # (batch_size, target_seq_len, d_word_vec)
x = self.dropout(x)
for i in range(self.num_layers):
x = self.dec_layers[i](x, enc_output, look_ahead_mask, padding_mask)
return x # (batch_size, target_seq_len, d_word_vec)
7、transformer.py:搭建transformer模型
from torch import nn
import pdb
from encoder import Encoder
from decoder import Decoder
class Transformer(nn.Module):
def __init__(self,
num_layers,
d_word_vec,
num_heads,
d_hidden,
input_vocab_size,
target_vocab_size,
input_seq_len,
target_seq_len,
dropout = 0.1
):
"""
input_vocab_size: 源语言词表大小
target_vocab_size: 目标语言词表大小
input_seq_len: 源语言序列的最大序列长度
target_seq_len: 目标语言序列的最大序列长度
"""
super(Transformer, self).__init__()
self.encoder = Encoder(num_layers, d_word_vec, num_heads, d_hidden, input_vocab_size, input_seq_len, dropout)
self.decoder = Decoder(num_layers, d_word_vec, num_heads, d_hidden, target_vocab_size, target_seq_len, dropout)
self.final_layer = nn.Linear(d_word_vec, target_vocab_size)
def forward(self, input, target, enc_padding_mask, look_ahead_mask, dec_papdding_mask):
enc_output = self.encoder(input, enc_padding_mask)
dec_output = self.decoder(target, enc_output, look_ahead_mask, dec_papdding_mask)
final_output = self.final_layer(dec_output)
return final_output
8、optimizer.py:实现动态学习率
import torch
class CustomSchedule(torch.optim.lr_scheduler._LRScheduler):
def __init__(self, optimizer, d_word_vec, warm_steps = 4):
"""
warm_steps: 热身步数,即学习率达到最大值所需的步数
"""
self.optimizer = optimizer
self.d_word_vec = d_word_vec
self.warmup_steps = warm_steps
super(CustomSchedule, self).__init__(optimizer)
# 使用动态学习率
def get_lr(self):
arg1 = self._step_count ** (-0.5)
arg2 = self._step_count * (self.warmup_steps ** -1.5)
dynamic_lr = (self.d_word_vec ** (-0.5)) * min(arg1, arg2)
return [dynamic_lr for group in self.optimizer.param_groups]
9、train.py:训练配置
要启动训练,只需要设置‘mode’为‘train’,并将‘model_path’设置为' '。
import torch
import torchtext
import argparse
import pandas as pd
from matplotlib import pyplot as plt
import datetime
import time
import copy
import os
from transformer import Transformer
from optimizer import CustomSchedule
from data_process import get_datapipe
from modules import create_mask, mask_loss_func, mask_accuracy_func, tokenzier_encode, tokenzier_decode
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")
# 打印分隔线
def printbar():
nowtime = datetime.datetime.now().strftime('%Y-%m_%d %H:%M:%S')
print('\n' + "=========="*8 + '%s'%nowtime)
def train_step(model, optimizer, inp, targ):
# 目标(target)被分成了 tar_inp 和 tar_real
# tar_inp 作为输入传递到解码器。
# tar_real 是位移了 1 的同一个输入:在 tar_inp 中的每个位置,tar_real 包含了应该被预测到的下一个标记(token)。
targ_inp = targ[:, :-1]
targ_real = targ[:, 1:]
enc_padding_mask, combined_mask, dec_padding_mask = create_mask(inp, targ_inp)
inp = inp.to(device)
targ_inp = targ_inp.to(device)
targ_real = targ_real.to(device)
enc_padding_mask = enc_padding_mask.to(device)
combined_mask = combined_mask.to(device)
dec_padding_mask = dec_padding_mask.to(device)
model.train() # 设置train mode
optimizer.zero_grad() # 梯度清零
# forward
prediction = model(inp, targ_inp, enc_padding_mask, combined_mask, dec_padding_mask)
# [b, targ_seq_len, target_vocab_size]
loss = mask_loss_func(targ_real, prediction)
metric = mask_accuracy_func(targ_real, prediction)
# backward
loss.backward() # 反向传播计算梯度
optimizer.step() # 更新参数
return loss.item(), metric.item()
df_history = pd.DataFrame(columns=['epoch', 'loss', 'acc', 'val_loss', 'val_' + 'acc'])
tokenizer = lambda x:x.split() # 分词器规则
def validate_step(model, inp, targ):
targ_inp = targ[:, :-1]
targ_real = targ[:, 1:]
enc_padding_mask, combined_mask, dec_padding_mask = create_mask(inp, targ_inp)
inp = inp.to(device)
targ_inp = targ_inp.to(device)
targ_real = targ_real.to(device)
enc_padding_mask = enc_padding_mask.to(device)
combined_mask = combined_mask.to(device)
dec_padding_mask = dec_padding_mask.to(device)
model.eval() # 设置eval mode
with torch.no_grad():
# forward
prediction = model(inp, targ_inp, enc_padding_mask, combined_mask, dec_padding_mask)
val_loss = mask_loss_func(targ_real, prediction)
val_metric = mask_accuracy_func(targ_real, prediction)
return val_loss.item(), val_metric.item()
def train_model(model, optimizer, train_dataloader, val_dataloader, model_state):
opt = model_state['opt']
starttime = time.time()
print('*' * 27, 'start training...')
printbar()
best_acc = 0.
for epoch in range(1, opt.max_epochs + 1):
loss_sum = 0.
metric_sum = 0.
for step, (inp, targ) in enumerate(train_dataloader, start=1):
# inp [64, 10] , targ [64, 10]
loss, metric = train_step(model, optimizer, inp, targ)
loss_sum += loss
metric_sum += metric
# 打印batch级别日志
if step % opt.print_trainstep_every == 0:
print('*' * 8, f'[step = {step}] loss: {loss_sum / step:.3f}, {opt.metric_name}: {metric_sum / step:.3f}')
opt.lr_scheduler.step() # 更新学习率
# 一个epoch的train结束,做一次验证
# test(model, train_dataloader)
val_loss_sum = 0.
val_metric_sum = 0.
for val_step, (inp, targ) in enumerate(val_dataloader, start=1):
# inp [64, 10] , targ [64, 10]
loss, metric = validate_step(model, inp, targ)
val_loss_sum += loss
val_metric_sum += metric
# 记录和收集1个epoch的训练(和验证)信息
# record = (epoch, loss_sum/step, metric_sum/step)
record = (epoch, loss_sum/step, metric_sum/step, val_loss_sum/val_step, val_metric_sum/val_step)
df_history.loc[epoch - 1] = record
# 打印epoch级别的日志
print('EPOCH = {} loss: {:.3f}, {}: {:.3f}, val_loss: {:.3f}, val_{}: {:.3f}'.format(
record[0], record[1], 'acc', record[2], record[3], 'acc', record[4]))
printbar()
# 保存模型
current_acc_avg = val_metric_sum / val_step # 看验证集指标
if current_acc_avg > best_acc: # 保存更好的模型
best_acc = current_acc_avg
checkpoint = './save/' + '{:03d}_{:.2f}_ckpt.tar'.format(epoch, current_acc_avg)
if device.type == 'cuda' and opt.ngpu > 1:
model_sd = copy.deepcopy(model.module.state_dict())
else:
model_sd = copy.deepcopy(model.state_dict()) ##################
torch.save({
'loss': loss_sum / step,
'epoch': epoch,
'net': model_sd,
'opt': optimizer.state_dict(),
# 'lr_scheduler': lr_scheduler.state_dict()
}, checkpoint)
print('finishing training...')
endtime = time.time()
time_elapsed = endtime - starttime
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
return df_history
def test(model, dataloader):
model.eval() # 设置为eval mode
test_loss_sum = 0.
test_metric_sum = 0.
for test_step, (inp, targ) in enumerate(dataloader, start=1):
# inp [64, 10] , targ [64, 10]
loss, metric = validate_step(model, inp, targ)
test_loss_sum += loss
test_metric_sum += metric
# 打印
print('*' * 8,
'Test: loss: {:.3f}, {}: {:.3f}'.format(test_loss_sum / test_step, 'test_acc', test_metric_sum / test_step))
def evaluate(model, inp_sentence, src_vocab, targ_vocab, model_state):
model.eval()
opt = model_state['opt']
inp_sentence_ids: list = tokenzier_encode(tokenizer, inp_sentence, src_vocab) # 单词 -> 索引
encoder_input = torch.tensor(inp_sentence_ids).unsqueeze(dim=0) # (inp_seq_len) -> (1, inp_seq_len)
decoder_input = [targ_vocab.stoi['<start>']] # 初始化,以start字符开头
decoder_input = torch.tensor(decoder_input).unsqueeze(0) # (1, 1)
with torch.no_grad():
for i in range(opt.max_length+2):
enc_padding_mask, combined_mask, dec_padding_mask = create_mask(encoder_input.cpu(), decoder_input.cpu())
# (b, 1, 1, inp_seq_len), (b, 1, targ_seq_len, inp_seq_len), (b, 1, 1, inp_seq_len)
encoder_input = encoder_input.to(device)
decoder_input = decoder_input.to(device)
enc_padding_mask = enc_padding_mask.to(device)
combined_mask = combined_mask.to(device)
dec_padding_mask = dec_padding_mask.to(device)
predictions = model(encoder_input,
decoder_input,
enc_padding_mask,
combined_mask,
dec_padding_mask) # (b, targ_seq_len, target_vocab_size)
prediction = predictions[:, -1:, :] # (b, 1, target_vocab_size)
# torch.argmax()返回张量沿着指定维度最大值的索引,此处是返回预测到的最后一个词在词表中最可能的索引。
prediction_id = torch.argmax(prediction, dim=-1) # (b, 1)
if prediction_id.squeeze().item() == targ_vocab.stoi['<end>']:
return decoder_input.squeeze(dim=0)
# 将预测到的单词添加至decoder_input
decoder_input = torch.cat([decoder_input, prediction_id], dim=-1)
return decoder_input.squeeze(dim=0)
def create_model(opt):
SRC_TEXT = torchtext.legacy.data.Field(sequential=True,
tokenize=tokenizer,
# lower=True,
fix_length=opt.max_length + 2,
preprocessing=lambda x: ['<start>'] + x + ['<end>'],
# after tokenizing but before numericalizing
# postprocessing # after numericalizing but before the numbers are turned into a Tensor
)
TARG_TEXT = torchtext.legacy.data.Field(sequential=True,
tokenize=tokenizer,
# lower=True,
fix_length=opt.max_length + 2,
preprocessing=lambda x: ['<start>'] + x + ['<end>'],
)
# 获取训练集和测试集
train_dataloader, val_dataloader, ds_train = get_datapipe(opt, SRC_TEXT, TARG_TEXT)
# 构建词表
SRC_TEXT.build_vocab(ds_train)
TARG_TEXT.build_vocab(ds_train)
opt.input_vocab_size = len(SRC_TEXT.vocab) # 3901
opt.target_vocab_size = len(TARG_TEXT.vocab) # 2591
model = Transformer(opt.num_layers,
opt.d_word_vec,
opt.num_heads,
opt.d_hidden,
opt.input_vocab_size,
opt.target_vocab_size,
input_seq_len=opt.input_vocab_size,
target_seq_len=opt.target_vocab_size,
dropout=opt.dropout).to(device)
if opt.ngpu > 1:
model = torch.nn.DataParallel(model, device_ids = list(range(opt.ngpu)))
if os.path.exists(opt.model_path):
ckpt = torch.load(opt.model_path)
model.load_state_dict(ckpt['net'])
model_state = {'opt': opt, 'curr_epochs': 0, 'train_steps': 0}
return model, model_state, train_dataloader, val_dataloader, SRC_TEXT.vocab, TARG_TEXT.vocab
def main(opt):
model, model_state, train_dataloader, val_dataloader, src_vocab, targ_vocab = create_model(opt)
print(src_vocab.stoi['pad']) # 查看pad在词表对应的索引
"""
定义Adam优化器
"""
if opt.dynamic_lr: # 使用动态学习率
optimizer = torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9)
opt.lr_scheduler = CustomSchedule(optimizer, opt.d_word_vec, warm_steps=opt.warm_steps)
else:
optimizer = torch.optim.Adam(model.parameters(), lr=opt.lr, betas=(0.9, 0.98), eps=1e-9)
if opt.mode == 'train':
# 开始训练
df_history = pd.DataFrame(columns=['epoch', 'loss', 'acc', 'val_loss', 'val_' + 'acc'])
df_history = train_model(model, optimizer, train_dataloader, val_dataloader, model_state)
print(df_history)
elif opt.mode == 'test':
# 在测试集上测试指标,这里使用val_dataloader模拟测试集
print('*' * 8, 'final test...')
test(model, val_dataloader)
elif opt.mode == 'eval':
# 批量翻译
sentence_pairs = [
['je pars en vacances pour quelques jours .', 'i m taking a couple of days off .'],
['je ne me panique pas .', 'i m not panicking .'],
['je recherche un assistant .', 'i am looking for an assistant .'],
['je suis loin de chez moi .', 'i m a long way from home .'],
['vous etes en retard .', 'you re very late .'],
['j ai soif .', 'i am thirsty .'],
['je suis fou de vous .', 'i m crazy about you .'],
['vous etes vilain .', 'you are naughty .'],
['il est vieux et laid .', 'he s old and ugly .'],
['je suis terrifiee .', 'i m terrified .'],
]
for pair in sentence_pairs:
print('input:', pair[0])
print('target:', pair[1])
pred_result = evaluate(model, pair[0], src_vocab, targ_vocab, model_state)
pred_sentence = tokenzier_decode(pred_result, targ_vocab)
print('pred:', pred_sentence)
print('')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Training Hyperparams')
# data loading params
parser.add_argument('-data_path', help='Path to the preprocessed data',default='./data/')
# network params
parser.add_argument('-d_word_vec', type=int, default=128)
parser.add_argument('-num_layers', type=int, default=4)
parser.add_argument('-num_heads', type=int, default=8)
parser.add_argument('-d_hidden', type=int, default=512)
parser.add_argument('-dropout', type=float, default=0.1)
parser.add_argument('-model_path', default='./save/039_0.85_ckpt.tar', help='如果有训练好的模型参数,可以加载')
# training params
parser.add_argument('-mode', default='eval')
parser.add_argument('-ngpu', type=int, default=1)
parser.add_argument('-dynamic_lr', type=bool, default=True, help='是否使用动态学习率')
parser.add_argument('-warm_steps', type=int, default=4000, help='动态学习率达到最大值所需步数')
parser.add_argument('-lr', type=float, default=0.00001, help='设置学习率,如果使用动态学习率则设置无效')
parser.add_argument('-batch_size', type=int, default=64)
parser.add_argument('-max_epochs', type=int, default=40)
parser.add_argument('-max_length', type=int, default=10, help='为了快速训练而设置的参数,将长度大于max_length的句子筛除')
parser.add_argument('-input_vocab_size', type=int)
parser.add_argument('-target_vocab_size', type=int)
parser.add_argument('-print_trainstep_every', type=int, default=50, help='每50个step做一次打印')
parser.add_argument('-metric_name', default='acc')
opt = parser.parse_args()
main(opt)
更多推荐
所有评论(0)