目录

1.seq2seq训练代码

2.预测代码 

 3.评估代码

 4.知识点个人理解


 

1.seq2seq训练代码

seq2seq的训练代码:pytorch中训练代码一般都相同类似

#将无效的序列数据都变成0(屏蔽无效内容的部分)
def sequence_mask(X, valid_len, value=0):
    """
    valid_len:有效序列的长度
    """
    #找到最大序列长度
    maxlen = X.size(1)
    #判断掩码区域
    mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None] < valid_len[:, None]
    #[~mask]表示取相反的数据(取原本为False的数据)
    X[~mask] = value
    return X

#重写交叉熵损失, 添加屏蔽无效内容的部分

class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    #重写forward
    #预测值pred的形状:(batch_size, num_steps, vocab_size)
    #真实值label的形状:(batch_size, num_steps)
    #valid_len的形状:(batch_size)
    def forward(self, pred, label, valid_len):
        #创建一个像label形状的全是1的tensor,赋值给初始权重
        weights = torch.ones_like(label)
        #使用掩码,将无效的序列内容屏蔽(其权重变为0),重新赋值
        weights = sequence_mask(weights, valid_len)
        #设置不聚合维度
        self.reduction = 'none'
        #调用原始的forward()来计算未屏蔽无效内容前的交叉熵损失
        #pred的shape使用permute()转换,将num_steps换到最后
        unweighted_loss = super().forward(pred.permute(0, 2 ,1), label)
        #用unweighted_loss * 屏蔽后的weights 求平均:每一批数据的交叉熵损失
        weighted_loss = (unweighted_loss * weights).mean(dim=1)
        return weighted_loss



#测试重写的交叉熵损失
loss = MaskedSoftmaxCELoss()
loss(torch.ones(3, 4, 10), torch.ones((3, 4), dtype=torch.long), torch.tensor([4, 2, 0]))
tensor([2.3026, 1.1513, 0.0000])
#训练代码


def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
    #初始化  RNN网络的xavier初始化的代码都一样
    def xavier_init_weights(m):
        #判断模型是线性模型时
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        #若模型为GRU(2层循环层)模型时
        if type(m) == nn.GRU:
            #遍历每一层的权重参数名称
            for param in m._flat_weights_names:
                #若权重在参数中
                if 'weight' in param:
                    nn.init.xavier_uniform_(m._parameters[param])
    #网络应用xavier初始化的权重
    net.apply(xavier_init_weights)
    #网络转到device上
    net.to(device)
    #用网络初始化的参数与学习率设置优化器
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    #创建损失函数的实例对象
    loss = MaskedSoftmaxCELoss()
    #设置实时更新的画图可视化dltools.Animator()
    animator = dltools.Animator(xlabel='epoch', ylabel='loss', xlim=[10, num_epochs])
    
    for epoch in range(num_epochs):
        timer = dltools.Timer()   #训练数据的计时
        metric = dltools.Accumulator(2) #累加统计两种数值:训练的总损失, 词元数量
        
        for batch in data_iter:  #遍历数据迭代器的批次
            #梯度清零(只要在反向传播之前就行)
            optimizer.zero_grad()
            #取数据
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0], device=device).reshape(-1,1)
            # 开头加上了bos, 那么Y就要去掉最后一列, 保证序列的长度不变. 
            dec_input = torch.cat([bos, Y[:, :-1]], 1)  #给每一行都加上bos
            #获取预测值,state不接收
            Y_pred, _ = net(X, dec_input, X_valid_len)
            #计算损失
            l = loss(Y_pred, Y, Y_valid_len)  #Y_valid_len属于*args其他位置参数传入的
            #反向传播
            l.sum().backward()
            #梯度裁剪
            dltools.grad_clipping(net, theta=1)
            num_tokens = Y_valid_len.sum()
            #更新梯度
            optimizer.step()
            with torch.no_grad():  #不求导
                metric.add(l.sum(), num_tokens)
            
        if (epoch+1) % 10 ==0:  #若每训练循环10次
            animator.add(epoch+1, (metric[0]/ metric[1]))
    print(f'loss {metric[0]/ metric[1]:.3f}, {metric[1] / timer.stop():.1f}', f'tokens/sec on {str(device)}')
#验证封装的训练代码
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 500, dltools.try_gpu()

train_iter, src_vocab, tgt_vocab = dltools.load_data_nmt(batch_size, num_steps)

encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers, dropout)

decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)

net = EncoderDecoder(encoder, decoder)

train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)

 

2.预测代码 

def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps, device):
    """
    src_sentence:传入的需要翻译的句子
    src_vocab:需要翻译的词汇表
    tgt_vocab:目标真实值词汇表
    num_steps:子序列长度
    device:GPU或CPU设备
    """
    #预测的时候需要把net设置为评估模式
    net.eval()
    #获取处理后的文本词元索引(输出是一个索引列表),在文本的结尾加上'<eos>'
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [src_vocab['<eos>']]
    #获取编码器输入内容src_tokens的有效长度,转化为tensor(用列表创建tensor)
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    #处理src_tokens太长/太短的问题:截断或者补充pad ,  num_steps表示隔多长截断一次, 覆盖赋值
    src_tokens = dltools.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
    
    #给src_tokens增加一个维度来表示批次,  获取enc_X 编码器的输入数据
    enc_X = torch.unsqueeze(torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
    #向网络的编码器中传入enc_X, enc_valid_len,获取编码器的输出结果
    enc_outputs = net.encoder(enc_X, enc_valid_len)
    #将编码器的输出结果enc_output和有效长度enc_valid_len传入解码器中,获取解码器的输出结果初始化状态dec_state
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
    
    #给预测结果也提前添加一个维度,   tgt_vocab预测词汇表的第一个词应该是文本开头的bos
    dec_X = torch.unsqueeze(torch.tensor([tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
    
    output_seq = []
    for _ in range(num_steps):  #循环子序列长度次数
        #将dec_state, dec_X输入网络的解码器中
        Y, dec_state = net.decoder(dec_X, dec_state)
        #将Y重新赋值给dec_X,实现循环输入
        dec_X = Y.argmax(dim=2)  #将Y的vocab_size对应的索引2维度聚合找最大值(预测的值)
        
        #获取预测值:将dec_X去掉一个batc_size维度(此时batc_size=1,就一批数据,可以不要这个维度)
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        #判断结束的条件
        if pred == tgt_vocab['<eos>']:
            break
        output_seq.append(pred)
    #返回值:按照索引返回对应词表中的词
    return ' '.join(tgt_vocab.to_tokens(output_seq))

 3.评估代码

seq2seq的评估指标: BLEU: bilingual evaluation understudy 双语互译质量评估辅助工具

def bleu(pred_seq, label_seq, k):
    """
    pred_seq:预测序列
    label_seq:真实序列
    k: 设定几元连续
    """
    #pred_seq, label_seq预测与目标序列的空格分隔处理(分词)
    pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
    #获取预测词与目标词的长度
    len_pred, len_label = len(pred_tokens), len(label_tokens)
    #计算bleu的左边部分_比较最小值
    score = math.exp(min(0, 1 - (len_label / len_pred)))
    for n in range(1, k + 1):  #range左闭右开   #分几元连续的情况
        #赋值 ,  #num_matches:预测值与目标值匹配的数量, 
        #collections.defaultdict(int)创建了一个默认值为int的字典  label_subs
        num_matches, label_subs = 0, collections.defaultdict(int)
        #循环连续词元的数量
        for i in range(len_label - n + 1):
            #若预测的词能与目标值匹配上
            label_subs[' '.join(label_tokens[i: i + n])] += 1
        
        for i in range(len_pred - n + 1):
            #若能匹配上
            if label_subs[' '.join(pred_tokens[i: i + n])] > 0:
                num_matches += 1  #匹配数+1
                label_subs[' '.join(pred_tokens[i: i + n])] -= 1
        score *=  math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n)) 
    return score
   
# 开始预测
engs = ['go .', 'i lost .', 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
    translation = predict_seq2seq(net, eng, src_vocab, tgt_vocab, num_steps, device)
    print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')
go . => va !, bleu 1.000
i lost . => j'ai perdu perdu ., bleu 0.783
he's calm . => il <unk> gagné suis perdu ., bleu 0.000
i'm home . => je suis chez nous <unk> !, bleu 0.562

 4.知识点个人理解

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐