1. Dataset

class myDatasets(Dataset):
    def __init__(self, split):
        super().__init__()
        self.dataset = []
        # Load the dataset from the local CSV file
        self.original_data = datasets.load_dataset("csv", data_dir="./", data_files="data.csv")[split]
        # Label scheme: O: 0, S-part: 1, M-part: 2, S-tool: 3, M-tool: 4, PAD/[CLS]/[SEP]: 5
        self.label_index_dict = {'part': 1, 'tool': 3}
        for data in self.original_data:
            new_label = self.format_label(data['text'], data['label'])
            self.dataset.append((list(data['text']), new_label))

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, item):
        return self.dataset[item]

    # Expand the span labels to per-character labels
    # The imported labels only record where each part/tool span starts and ends,
    # so the characters inside each span also need to be tagged
    def format_label(self, text, label):
        label = json.loads(label)
        new_label = [0] * len(text)
        # Iterate over every span label of the current text
        for single_label in label:
            # Get the label name
            label_name = single_label['labels'][0]
            # Get the start index assigned to this label name
            label_index = self.label_index_dict[label_name]
            # Mark the start position with the start index
            new_label[single_label['start']] = label_index
            # Mark the middle positions with start index + 1
            for count in range(single_label['start'] + 1, single_label['end']):
                new_label[count] = label_index + 1
        return new_label
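
    # A worked example of format_label (hypothetical sample, not taken from the real data.csv):
    #   text  = "用扳手拧紧螺丝"
    #   label = '[{"start": 1, "end": 3, "labels": ["tool"]}]'
    #   -> new_label = [0, 3, 4, 0, 0, 0, 0]
    #   i.e. position 1 gets the start index 3 (S-tool), position 2 gets 4 (M-tool), the rest stay 0 (O).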


def collate_fn(data):
    tokens = [i[0] for i in data]
    labels = [i[1] for i in data]
    # Encode the tokens (loading the tokenizer here works, but caching it outside
    # collate_fn would avoid reloading it on every batch)
    tokenizer = AutoTokenizer.from_pretrained('hfl/rbt6')
    inputs = tokenizer.batch_encode_plus(tokens, truncation=True, padding=True, return_tensors='pt',
                                         is_split_into_words=True)
    # Padded length of this batch (longest sequence, including [CLS] and [SEP])
    lens = len(inputs['input_ids'][0])
    for num in range(len(labels)):
        # Prepend a PAD label (5) for [CLS], then pad with 5 and cut back to the padded length
        labels[num] = [5] + labels[num]
        labels[num] += [5] * lens
        labels[num] = labels[num][:lens]
    return inputs, torch.LongTensor(labels)
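
# A worked example of the label padding above (hypothetical batch with one sample;
# the BERT-style tokenizer adds [CLS] (id 101) at the front and [SEP] (id 102) at the end):
#   tokens = [['用', '扳', '手']], labels = [[0, 3, 4]]
#   input_ids -> [[101, id('用'), id('扳'), id('手'), 102]], so lens = 5
#   labels[0]: [0, 3, 4] -> [5, 0, 3, 4]          (prepend 5 for [CLS])
#                        -> [5, 0, 3, 4, 5, ...]  (pad with 5)
#                        -> [5, 0, 3, 4, 5]       (cut back to lens)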

2. Transformer

  • Since only a (token-level) classification task needs to be implemented, only the Encoder part of the Transformer is used
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import datasets
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModel
import math
import json


# Computes (self-)attention and returns the updated word vectors together with the attention scores
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        # Q_shape: (batch_size, n_heads, src_len, d_k)
        # K_shape: (batch_size, n_heads, src_len, d_k)
        # V_shape: (batch_size, n_heads, src_len, d_v)
        # Compute the raw attention scores
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)

        # Fill the masked (PAD) positions with a very large negative value
        scores.masked_fill_(attn_mask, -1e9)
        # Softmax over the last dimension gives the attention weights
        attn = nn.Softmax(dim=-1)(scores)
        # context_shape: (batch_size, n_heads, src_len, d_v)
        context = torch.matmul(attn, V)
        return context, attn
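
# For reference, the block above is the standard scaled dot-product attention:
#   Attention(Q, K, V) = softmax(Q @ K^T / sqrt(d_k)) @ V
# Masked (PAD) positions are pushed to -1e9 before the softmax, so their attention weights end up close to 0.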


# Multi-head attention
class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        # The incoming Q, K, V are all equal here: the input word vectors (batch_size, src_len, d_model)
        # W_Q maps them to (batch_size, src_len, d_q * n_heads), where d_q equals d_k
        # W_K maps them to (batch_size, src_len, d_k * n_heads)
        # W_V maps them to (batch_size, src_len, d_v * n_heads)
        self.W_Q = nn.Linear(d_model, d_k * n_heads)
        self.W_K = nn.Linear(d_model, d_k * n_heads)
        self.W_V = nn.Linear(d_model, d_v * n_heads)
        self.linear = nn.Linear(n_heads * d_v, d_model)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, Q, K, V, attn_mask):
        # Keep Q for the residual connection
        residual, batch_size = Q, Q.size(0)
        # Project into per-head q, k, v
        # shape: (batch_size, n_heads, src_len, d_k)
        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        # shape: (batch_size, n_heads, src_len, d_k)
        k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
        # shape: (batch_size, n_heads, src_len, d_v)
        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)
        # shape: (batch_size, src_len, src_len) => (batch_size, n_heads, src_len, src_len)
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

        # attn is the attention score matrix
        # context is the attention-weighted result
        # Compute the (self-)attention
        context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
        # Rearrange context so the linear layer can map it straight back to the word-vector dimension
        # (batch_size, n_heads, src_len, d_v) => (batch_size, src_len, n_heads * d_v)
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)
        # Linear projection back to the model dimension
        # (batch_size, src_len, n_heads * d_v) => (batch_size, src_len, d_model)
        output = self.linear(context)
        # Add the residual and apply layer normalization before returning
        return self.layer_norm(output + residual), attn
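
# Shape walk-through with the hyperparameters used below (d_model=512, n_heads=8, d_k=d_v=64):
#   Q, K, V:          (batch_size, src_len, 512)
#   q_s / k_s / v_s:  (batch_size, 8, src_len, 64)   after the linear projections and the head split
#   context:          (batch_size, src_len, 8 * 64) = (batch_size, src_len, 512)   after merging the heads
#   output:           (batch_size, src_len, 512)   after self.linear, the residual add and LayerNorm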


# Position-wise feed-forward network
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
        self.layer_norm = nn.LayerNorm(d_model)

    def forward(self, inputs):
        # Residual
        residual = inputs
        # inputs.transpose(1, 2) moves d_model to the channel dimension so Conv1d can be applied
        #   shape: (batch_size, src_len, d_model) => (batch_size, d_model, src_len)
        # shape: (batch_size, d_model, src_len) => (batch_size, d_ff, src_len)
        output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))
        # self.conv2(output)
        #   shape: (batch_size, d_ff, src_len) => (batch_size, d_model, src_len)
        # self.conv2(output).transpose(1, 2)
        #   shape: (batch_size, d_model, src_len) => (batch_size, src_len, d_model)
        output = self.conv2(output).transpose(1, 2)
        # Add the residual and apply layer normalization before returning
        return self.layer_norm(output + residual)
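
# Note: a Conv1d with kernel_size=1 over the length dimension acts on each position independently,
# so the block above is equivalent to a position-wise pair of Linear layers. A sketch (not the
# author's code) of the same computation without the transposes:
#   nn.Sequential(nn.Linear(d_model, d_ff), nn.ReLU(), nn.Linear(d_ff, d_model))
# applied directly to tensors of shape (batch_size, src_len, d_model).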


# Masks the PAD positions in a sentence so they are ignored when computing attention.
# It works both for self-attention and for encoder-decoder (cross) attention.
# For self-attention, both inputs are the same token-id tensor (before embedding), shape: (batch_size, src_len)
#   Positions whose token id is 0 ([PAD]) are set to True; their scores are later filled with a very large
#   negative value, so their softmax weight is close to 0
#   Final shape: (batch_size, src_len, src_len)
# For cross attention, the inputs are the decoder-side token ids and the encoder-side token ids
#   Note: the decoder side acts as the query Q and the encoder side as the key K. We care about how much
#         attention each decoder query pays to each encoder token, so only the encoder-side K needs masking
#   Final shape: (batch_size, dec_src_len, enc_src_len)
def get_attn_pad_mask(seq_q, seq_k):
    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # eq(0) marks the positions in seq_k equal to 0 ([PAD]) and returns a bool tensor
    # Self-attention:
    #   shape: (batch_size, src_len) => (batch_size, 1, src_len)
    # Cross attention:
    #   shape: (batch_size, enc_src_len) => (batch_size, 1, enc_src_len)
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
    # Self-attention:
    #   shape: (batch_size, 1, src_len) => (batch_size, src_len, src_len)
    # Cross attention:
    #   shape: (batch_size, 1, enc_src_len) => (batch_size, dec_src_len, enc_src_len)
    return pad_attn_mask.expand(batch_size, len_q, len_k)
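
# A toy example (hypothetical token ids, 0 = [PAD]):
#   seq_q = seq_k = tensor([[101, 2769, 102, 0, 0]])        # shape (1, 5)
#   seq_k.data.eq(0)  -> [[False, False, False, True, True]]
#   .unsqueeze(1)     -> shape (1, 1, 5)
#   .expand(1, 5, 5)  -> every one of the 5 query rows masks the same two PAD keys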


# Positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        # Register as a buffer so it is saved with the model but never updated as a parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (src_len, batch_size, d_model)
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
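
# The buffer built above follows the sinusoidal encoding from "Attention Is All You Need":
#   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
#   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))
# It is fixed (never trained) and simply added to the word embeddings in forward().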


class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        # Multi-head self-attention
        self.enc_self_attn = MultiHeadAttention()
        # Position-wise feed-forward network (with residual); further refines the extracted features
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        # Self-attention
        # Q, K and V are all the embedded word vectors (or the previous Encoder layer's output);
        # the last argument is the PAD mask
        # Output shape: (batch_size, src_len, d_model)
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,
                                               enc_self_attn_mask)
        # Output shape: (batch_size, src_len, d_model)
        enc_outputs = self.pos_ffn(enc_outputs)
        return enc_outputs, attn


class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        # Embed token ids into word vectors
        #   The first argument is the vocabulary size; it must cover the largest token id
        #   (the hfl/rbt6 tokenizer has a vocabulary of 21128 tokens)
        #   The second argument is the dimension of the word vectors, typically 512
        self.src_emb = nn.Embedding(21128, d_model)
        # Positional encoding
        self.pos_emb = PositionalEncoding(d_model)
        # Stack of Encoder layers
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    def forward(self, enc_inputs):
        # Embed the tokens: each token id is mapped to a d_model-dimensional word vector
        # shape: (batch_size, src_len) => (batch_size, src_len, d_model)
        enc_outputs = self.src_emb(enc_inputs)
        # Add the positional encoding (it is simply added to the word vectors; the transposes adapt
        # to the (src_len, batch_size, d_model) layout expected by PositionalEncoding)
        # shape: (batch_size, src_len, d_model)
        enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose(0, 1)
        # get_attn_pad_mask marks the PAD positions so they are ignored when computing attention
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)
        enc_self_attns = []
        for layer in self.layers:
            # Pass through the stacked Encoder layers
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
            # Collect the self-attention scores of each layer
            enc_self_attns.append(enc_self_attn)
        return enc_outputs, enc_self_attns


class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        # Encoder stack
        self.encoder = Encoder()
        # Fully connected layer mapping to the classification classes
        #   d_model is the dimension of the embedding word vectors, typically 512
        #   classes_num is the number of classes we ultimately want to predict
        self.projection = nn.Linear(d_model, classes_num, bias=False)

    def forward(self, enc_inputs):
        # Input:
        #   enc_inputs shape: (batch_size, src_len)
        # Output:
        #   enc_outputs: word vectors refined by the stacked Encoder layers, shape: (batch_size, src_len, d_model)
        #   enc_self_attns: per-layer self-attention scores describing how related the tokens are,
        #                   each of shape (batch_size, n_heads, src_len, src_len)
        enc_outputs, enc_self_attns = self.encoder(enc_inputs)

        # Project enc_outputs to the number of classes
        # shape: (batch_size, src_len, classes_num)
        enc_logits = self.projection(enc_outputs)
        # Reshape the logits to (batch_size * src_len, classes_num) before returning
        return enc_logits.view(-1, enc_logits.size(-1)), enc_self_attns
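
# The flattened logits (batch_size * src_len, classes_num) returned here line up with the flattened
# labels (batch_size * src_len,) built in process(), so nn.CrossEntropyLoss scores every token
# position independently.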

3. Helper Functions

# Visualize the test results (compare predictions with the labels)
def format_output_validation(tokenizer, inputs, outputs, labels):
    for cnt, output in enumerate(outputs):
        original_text = ""
        label_text = ""
        output_text = ""
        # Remove the PAD tokens from inputs
        new_inputs = inputs['input_ids'][cnt][inputs['input_ids'][cnt] != 0]
        for id, out_val in enumerate(new_inputs):
            original_text += tokenizer.decode(new_inputs[id])
            # Positions predicted as 0 (O) are shown as *
            if not output[id]:
                output_text += '*'
            # Non-zero predictions are decoded and shown
            else:
                output_text += tokenizer.decode(new_inputs[id])
            # Show the original labels in the same way
            if not labels[cnt][id]:
                label_text += '*'
            else:
                label_text += tokenizer.decode(new_inputs[id])
        print("原句子:         ", original_text)
        print("标签:           ", label_text)
        print("Transformer输出:", output_text)


def process(model, type):
    if type == 'train':
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        min_loss = 1e9
        for epoch in range(200000):
            for idx, (inputs, labels) in enumerate(dl):
                optimizer.zero_grad()
                outputs, attns = model(inputs['input_ids'])
                # Logits are (batch_size * src_len, classes_num); flatten the labels to match
                loss = criterion(outputs, labels.contiguous().view(-1))
                print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
                # Save the model whenever the loss reaches a new minimum
                if loss < min_loss:
                    torch.save(model, 'my_best_model.model')
                    min_loss = loss
                loss.backward()
                optimizer.step()
    else:
        # Evaluation: load the best saved model and visualize its predictions
        model = torch.load(r"./my_best_model.model")
        tokenizer = AutoTokenizer.from_pretrained('hfl/rbt6')
        for idx, (inputs, labels) in enumerate(dl):
            outputs, _ = model(inputs['input_ids'])
            # argmax over the class dimension; unsqueeze(0) restores the batch dimension (batch_size is 1 here)
            outputs = outputs.argmax(dim=1).unsqueeze(0)
            format_output_validation(tokenizer, inputs, outputs, labels)

4. Main Function

if __name__ == '__main__':
    # Total number of classes
    classes_num = 6
    dt = myDatasets('train')
    dl = DataLoader(dt, batch_size=1, shuffle=True, collate_fn=collate_fn, drop_last=True)

    # Model hyperparameters
    d_model = 512  # Embedding Size
    d_ff = 2048  # FeedForward dimension
    d_k = d_v = 64  # dimension of K(=Q), V
    n_layers = 6  # number of Encoder of Layer
    n_heads = 8  # number of heads in Multi-Head Attention

    model = Transformer()

    # Run with 'train' first to train and save my_best_model.model, then with 'test' to visualize predictions
    process(model, 'test')