Text Classification with the torchtext Library
1. The dataset
import torch
from torchtext.datasets import AG_NEWS
train_iter = AG_NEWS(split='train')
AG_NEWS is one of the datasets provided in torchtext.datasets. It is a news classification dataset with four classes (World, Sports, Business, Sci/Tech), labeled 1 through 4; see the AG_NEWS documentation for details.
Let's look at what train_iter contains. It is an iterator-style object, so we can pull a single example out of it with next().
Each element of train_iter is a (label, text) pair.
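A quick check of this (the exact article you get back depends on the dataset version, so the output shown here is only illustrative):
>>> label, text = next(iter(train_iter))
>>> label        # an integer class label between 1 and 4
3
>>> text         # a news article as one string (shortened here)
"Wall St. Bears Claw Back Into the Black (Reuters) ..."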
2. Processing pipeline
Build the vocabulary
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
tokenizer = get_tokenizer('basic_english')
train_iter = AG_NEWS(split='train')
def yield_tokens(data_iter):
    for _, text in data_iter:
        yield tokenizer(text)
vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])
The code above:
- obtains a tokenizer that can handle English text;
- obtains train_iter, the training split of the AG_NEWS dataset, which is an iterable;
- obtains an instance of the Vocab class and sets its default index, i.e. the index of the unknown token "<unk>".
Notes:
- get_tokenizer: passing the argument 'basic_english' returns the corresponding tokenizer.
- How to understand the tokenizer? As the name suggests, it turns text into tokens; see the example below.
- build_vocab_from_iterator: builds a Vocab object from the given iterator yield_tokens(train_iter). See the documentation of the Vocab class for details.
>>> # optional
>>> sentence = "i have a apple"
>>> sentence_token = tokenizer(sentence)
>>> sentence_token
['i', 'have', 'a', 'apple']
>>> # this uses the forward method of the Vocab class:
>>> # forward(tokens: List[str]) → List[int]
>>> vocab(sentence_token)
[282, 39, 5, 295]
Define the pipelines
# define the text and label pipelines
text_pipeline = lambda x : vocab(tokenizer(x))
label_pipeline = lambda x : int(x) - 1
>>> # optional
>>> text_pipeline('here is the an example')
[475, 21, 2, 30, 5297]
>>> label_pipeline('10')
9
3. Set up the DataLoader
from torch.utils.data import DataLoader
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
def collate_batch(batch):
    label_list, text_list, offsets = [], [], [0]
    for (_label, _text) in batch:
        label_list.append(label_pipeline(_label))
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
        text_list.append(processed_text)
        offsets.append(processed_text.size(0))
    label_list = torch.tensor(label_list, dtype=torch.int64)
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0)
    text_list = torch.cat(text_list)
    return text_list.to(device), label_list.to(device), offsets.to(device)
train_iter = AG_NEWS(split='train')
dataloader = DataLoader(train_iter, batch_size=8, shuffle=False, collate_fn=collate_batch)
The steps above:
1. Define the collate_batch function, which processes one batch of data and finally returns three tensors: text_list, label_list and offsets.
2. Build the dataloader which, as its name suggests, loads the data. In other words, one iteration over it yields one batch, collate_batch is called on that batch, and the three tensors are returned.
Notes:
3. One iteration over train_iter returns a single (label, text) pair. Once it is wrapped in a DataLoader with batch_size=8, one iteration over dataloader returns 8 (label, text) pairs, which we call a batch.
4. collate_fn is what post-processes that batch; the collate_fn defined here returns the text, label and offsets tensors (a quick check of this is sketched right below).
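As a quick sanity check (a sketch added here, not from the original post), we can pull a single batch and confirm the shapes: labels and offsets both have one entry per sample, while all token indices of the batch are concatenated into a single 1-D tensor.
>>> texts, labels, offsets = next(iter(dataloader))
>>> labels.shape       # one label per sample
torch.Size([8])
>>> offsets.shape      # one start index per sample
torch.Size([8])
>>> texts.dim()        # all token ids of the batch, concatenated
1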
4. Define the model
from torch import nn
class TextClassificationModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, num_label):
        super(TextClassificationModel, self).__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, emb_dim, sparse=True)
        self.FC = nn.Linear(emb_dim, num_label)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.FC.weight.data.uniform_(-initrange, initrange)
        self.FC.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        return self.FC(embedded)
train_iter = AG_NEWS(split='train')
num_class = len(set([label for (label, text) in train_iter]))
vocab_size = len(vocab)
emsize = 64
model = TextClassificationModel(vocab_size, emsize, num_class).to(device)
Model description:
- The model has only two layers. The first is the embedding layer, which maps the input text to emb_dim-dimensional vectors.
- nn.EmbeddingBag is used here; by default it averages the embeddings of the words in each bag.
- The second layer is a linear layer that maps the embedded representation onto the num_label classes.
Note:
In forward we need both text and offsets. text is easy to understand: it is the input text, with all token indices of the batch concatenated into one tensor. offsets gives the index at which each (label, text) sample starts inside that tensor, so there is no need to pad every sentence to the same length.
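For instance (the lengths below are made-up numbers, only to illustrate how collate_batch builds offsets): if a batch holds three texts of 4, 3 and 5 tokens, they are concatenated into a single tensor of 12 indices, and the offsets [0, 4, 7] mark where each text starts.
>>> lengths = [4, 3, 5]                        # hypothetical token counts of 3 samples
>>> offsets = [0] + lengths                    # same construction as in collate_batch
>>> torch.tensor(offsets[:-1]).cumsum(dim=0)   # start index of each sample
tensor([0, 4, 7])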
>>> # optional
>>> # an EmbeddingBag module containing 10 tensors of size 3
>>> embedding_sum = nn.EmbeddingBag(10, 3, mode='sum')
>>> # a batch of 2 samples of 4 indices each
>>> input = torch.tensor([1,2,4,5,4,3,2,9], dtype=torch.long)
>>> offsets = torch.tensor([0,4], dtype=torch.long)
>>> embedding_sum(input, offsets)
tensor([[-0.8861, -5.4350, -0.0523],
[ 1.1306, -2.5798, -1.0044]])
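Note that the example above uses mode='sum', while the model relies on the default mode of nn.EmbeddingBag, which is 'mean'. With random weights the exact numbers will differ, but the output still contains one vector per bag:
>>> embedding_mean = nn.EmbeddingBag(10, 3)    # default mode is 'mean'
>>> embedding_mean(input, offsets).shape       # one averaged 3-dim vector per bag
torch.Size([2, 3])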
5. Define the training procedure
import time
def train(model, dataloader):
    model.train()
    total_acc = 0
    start_time = time.time()
    log_interval = 500
    total_count = 0
    for idx, (texts, labels, offsets) in enumerate(dataloader):
        optimizer.zero_grad()
        pred_labels = model(texts, offsets)
        loss = criterion(pred_labels, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (pred_labels.argmax(1) == labels).sum().item()
        total_count += labels.size(0)
        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches '
                  '| accuracy {:8.3f}'.format(epoch, idx, len(dataloader),
                                              total_acc / total_count))
            total_acc, total_count = 0, 0
            start_time = time.time()
def evaluate(model, dataloader):
    model.eval()
    total_acc, total_count = 0, 0
    with torch.no_grad():
        for idx, (texts, labels, offsets) in enumerate(dataloader):
            pred_label = model(texts, offsets)
            loss = criterion(pred_label, labels)
            total_acc += (pred_label.argmax(1) == labels).sum().item()
            total_count += labels.size(0)
    return total_acc / total_count
Training procedure:
1. Track the running statistics: total_acc counts correct predictions and total_count counts samples, so the accuracy is total_acc / total_count.
2. log_interval is the printing interval, and start_time is used to measure elapsed time.
3. Loop over the batches from the dataloader and, for each batch:
   - optimizer.zero_grad() clears the gradients;
   - pred_labels = model(texts, offsets) computes the model output;
   - loss = criterion(pred_labels, labels) computes the loss;
   - loss.backward() computes the gradients (followed here by gradient clipping with clip_grad_norm_);
   - optimizer.step() updates the model weights.
Evaluation procedure:
Same as the training loop, except that the model weights are not updated; it returns the model's accuracy.
6. Run the training
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset
# Hyperparameters
EPOCHS = 10 # epoch
LR = 5 # learning rate
BATCH_SIZE = 64 # batch size for training
# define the optimizer and the loss function
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)
train_iter, test_iter = AG_NEWS()
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = \
    random_split(train_dataset, [num_train, len(train_dataset) - num_train])
train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE,
                             shuffle=True, collate_fn=collate_batch)
total_accu = None
for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    train(model, train_dataloader)
    accu_val = evaluate(model, valid_dataloader)
    if total_accu is not None and total_accu > accu_val:
        scheduler.step()
    else:
        total_accu = accu_val
    print('-' * 59)
    print('| end of epoch {:3d} | time: {:5.2f}s | '
          'valid accuracy {:8.3f} '.format(epoch,
                                           time.time() - epoch_start_time,
                                           accu_val))
    print('-' * 59)
The steps above:
1. Set the hyperparameters: number of epochs EPOCHS, learning rate LR and batch size BATCH_SIZE.
2. Define the loss function criterion, the optimizer optimizer and the learning-rate scheduler scheduler; whenever the validation accuracy stops improving, scheduler.step() multiplies the learning rate by gamma=0.1.
3. Get the training set train_dataset and the test set test_dataset, and split the training set into a training part and a validation part (95% / 5%).
4. Train on the data for EPOCHS epochs.
print('Checking the results of test dataset.')
accu_test = evaluate(model, test_dataloader)
print('test accuracy {:8.3f}'.format(accu_test))
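With training finished, the same pipelines can be reused to classify a new piece of text. A minimal sketch (the ag_news_label mapping and the example sentence are assumptions added for illustration, not part of the original post):
# hypothetical mapping from the model's 0-3 outputs back to the AG_NEWS class names
ag_news_label = {0: "World", 1: "Sports", 2: "Business", 3: "Sci/Tech"}
def predict(text):
    model.eval()
    with torch.no_grad():
        # run the raw string through the same text pipeline used for training
        token_ids = torch.tensor(text_pipeline(text), dtype=torch.int64).to(device)
        offsets = torch.tensor([0]).to(device)   # a single sample starts at index 0
        output = model(token_ids, offsets)
        return ag_news_label[output.argmax(1).item()]
print(predict("The match ended with a dramatic last-minute goal."))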