0. Abstract

最近在看一个用 RNNs 网络做 Translation 任务的程序, 关于数据处理部分, 主要用到工具包 sentencepiecefairseq, 前者主要是对文本进行分词处理, 后者则是对已分词的文本进行二进制化快速加载. 包越方便使用, 就说明包装得越狠, 也就越令人一头雾水, 本文简要记录学习过程.

1. Installation

pip install sentencepiece
pip install fairseq

注意, Windows 下安装 fairseq 需要 Visual C++ 构建工具, 否则会报错:

error: Microsoft Visual C++ 14.0 or greater is required. Get it with "Microsoft C++ Build Tools": https://visualstudio.microsoft.com/visual-cpp-build-tools/"

去官网下载 Microsoft C++ Build Tools 并安装就好了, 安装时须选择以下组件:

[注]: 不建议在 Windows 下跑这些程序, 很多包在 Windows 下问题太多, 甚至不支持. Microsoft C++ Build Tools 会占用 C 盘好几个 G (即使安装在其他盘).

小结: 建议在 Linux 系统下玩这些程序.

2. 用 sentencepiece 分词

翻译任务的数据集一般是这样的:


以中英文翻译任务为例, 有 {train/test}.{en/zh} 共 4 个文件, 文件中每一行代表一段文字, 中英文件内相同的行是彼此对应的.

2.1 字符全角转半角

这些数据内容是原始的 (raw), 首先要做的是进行数据清理: 字符编码统一化, 去除过长或过短的文本. “字符编码统一化” 值得记录一下:

def _str_q2b(ustring: str):
	"""
	把字串全形轉半形, 參考來源: https://ithelp.ithome.com.tw/articles/10233122;
	全形(全角): 中文标点符号, 有时中文输入法输入英文字母也是全形, 类似 word 中输入英文, 字体是宋体
	东亚文字一般是全形, 而西文字母和数字是半形(且对应有全形值). 
	那么为了使相同的字符在后端表示统一, 则把 65281 <= unicode <= 65374 范围内的全形字符转换到半形. 
	"""
	rchars = []
	for uchar in ustring:
		unicode = ord(uchar)  # unicode 码
		if unicode == 12288:  # 全形空格直接轉換
			unicode = 32
		elif 65281 <= unicode <= 65374:  # 全形字元(除空格)根據關係轉化
			unicode -= 65248
		rchars.append(chr(unicode))
	return ''.join(rchars)

关于全角和半角:

print('A', 'a', '0')
print(
	chr(ord('A') + 65248),
	chr(ord('a') + 65248),
	chr(ord('0') + 65248)
)

会输出:

小结: Unicode 字符有全半角之分, 数据预处理时需先进行全角转半角, 统一化.

2.2 训练分词模型
spm.SentencePieceTrainer.train(
	input=','.join([
		f'{self._path_clean}/train.{self._lang_src}',
		f'{self._path_clean}/train.{self._lang_tgt}'
	]),
	# will get 2 files: model_prefix.model, model_prefix.vocab
	model_prefix=self._path_tokens / f'spm{vocab_size}',
	vocab_size=vocab_size,
	character_coverage=1,
	model_type='unigram',  # 'bpe' 也可
	input_sentence_size=1e6,
	shuffle_input_sentence=True,
	normalization_rule_name='nmt_nfkc_cf'
)

这段代码展示了如何使用 sentencepiece 库中的 SentencePieceTrainer.train() 方法来训练一个 SentencePiece 分词模型, 特别适用于机器翻译任务, 其中涉及到两种不同的语言: [来自通义千问]

  • input: 指定了用于训练模型的文本文件. 在这个例子中, 使用的是源语言和目标语言的训练数据, 它们被拼接成一个字符串并用逗号分隔;
  • model_prefix: 定义输出模型文件的前缀. 模型训练后会生成两个文件: 一个是 .model 模型文件, 另一个是 .vocab词汇表文件;
  • vocab_size: 指定了词汇表的大小, 即模型将学习的词汇单元数量. 较大的词汇表可能会提供更好的表达能力, 但也可能导致过拟合或增加计算成本;
  • character_coverage: 设置为 1 表示模型应该覆盖输入文本中的所有字符. 较小的值(如 0.995)意味着模型将忽略最不常见的 0.5% 的字符, 这可能有助于减少词汇表的大小;
  • model_type: 设置为 ‘unigram’ 意味着模型将使用 Unigram 算法进行训练. 另一种选择是 ‘bpe’, 即 Byte-Pair Encoding, 也是一个常用的子词分割算法;
  • input_sentence_size: 控制着用于训练的句子数量. 1e6 表示一百万句. 如果数据集非常大, 你可能想要抽样一定数量的句子以加快训练速度;
  • shuffle_input_sentence: 如果设置为 True, 则在训练过程中输入句子会被随机打乱, 有助于提高模型的泛化能力;
  • normalization_rule_name: 这个参数确定了如何规范化输入文本. 'nmt_nfkc_cf' 是一个预定义的规则, 通常用于机器翻译任务, 它会执行一些文本清洗操作, 如转换为半角字符等. 默认值是 'identity': 不会应用任何文本规范化规则.

呃!!! 前面还特意搞了一下字符转换, 可能多虑了. 那就看一下在 normalization_rule_name='nmt_nfkc_cf' 的情况下不进行半角转换, 模型会不会自动转换字符吧:

input=','.join([
	f'{self._path_raw}/train.{self._lang_src}',  # 使用 raw 中的数据进行分词
	f'{self._path_raw}/train.{self._lang_tgt}'
]),

直接使用 raw 文件夹下的文本文件进行分词, 果然发现 spm.SentencePieceTrainer.train() 进行了半角转换:

没骗我们, 逗点变成英文的了, 但引号没有, 因为中文引号和英文引号不是全角和半角的关系.

小结: 使用 sentencepiece 库中的 SentencePieceTrainer.train() 方法可训练一个 SentencePiece 分词模型, 输入train.{en/zh} 文件, 输出为两个文件: model_prefix.model, model_prefix.vocab.

2.3 加载分词模型进行分词
spm_model = spm.SentencePieceProcessor()
spm_model.load(str(self._path_tokens / f'spm{vocab_size}.model'))
for file_name, lang in itertools.product(['train', 'valid', 'test'], [self._lang_src, self._lang_tgt]):
	out_path = self._path_tokens / f'{file_name}.{lang}'
	if not re_tokenize and out_path.exists():
		print(f'{out_path} exists. skipping spm_encode.')
	else:
		with (
			open(self._path_split / f'{file_name}.{lang}', 'r') as in_file,
			open(out_path, 'w') as out_file
		):
			for line in in_file:
				tokens = spm_model.encode(line.strip(), out_type=str)  # 编码结果是 str
				print(' '.join(tokens), file=out_file)

创建一个 SentencePieceProcessor() 实例, 并加载刚才训练所的的分词模型, 然后用 spm_model.encode(...) 对文本文件进行编码, 也就是分词.

3. fairseq 的使用

fairseq 的使用包括很多内容, 包括: 数据的二进制化、数据的加载、模型的定义与训练等.

3.1 数据的二进制化

经过 sentencepiece 的分词处理, 得到了分词后的文本. 下一步要对文本执行二进制化, 以便于快速加载到内存中. 注意不是写 python 代码, 而是执行命令行:

python -m fairseq_cli.preprocess \
	--source-lang {src_lang}\
	--target-lang {tgt_lang}\
	--trainpref {prefix/'train'}\
	--validpref {prefix/'valid'}\
	--testpref {prefix/'test'}\
	--destdir {binpath}\
	--joined-dictionary\
	--workers 2

此命令执行几个关键任务: [来自通义千问]

  1. 数据准备: 从指定的文件('train', 'valid', 'test' 前缀)读取源语言和目标语言数据, 并将二进制格式的结果写入目的地目录(binpath);
  2. 字典创建: 通过指定 --joined-dictionary, 它为源语言和目标语言创建一个统一的字典. 这对于低资源场景或者当希望模型学习两种语言之间相似单词的映射时是有益的;

参数解析:

  • --source-lang {src_lang}: 源语言;
  • --target-lang {tgt_lang}: 目标语言;
  • --trainpref {prefix/'train'}: 训练数据文件的前缀; 实际的文件名将是 {prefix}/train.{src_lang}{prefix}/train.{tgt_lang}, 分别对应源语言和目标语言的文件;
  • --validpref {prefix/'valid'}: 与上述相同, 但针对验证数据;
  • --testpref {prefix/'test'}: 与上述相同, 但针对测试数据;
  • --destdir {binpath}: 数据以二进制格式保存的目录;
  • --joined-dictionary: 是否为源语言和目标语言创建共享词汇表.

运行此命令后, 应该能在 {binpath} 目录中找到多个二进制文件, 包括词典已标记好的数据, 这些数据已经准备好用于训练神经机器翻译模型.

=== dict.{en/zh}.txt ===
两个词典文件是特殊的, 可能是词典内容往往不会很多, 所以没必要用二进制格式存储. 注意, 词典的根据仅仅是 train.{en/zh} 中的文本内容, 而不包括 {valid/test}.{en/zh}. 其内容是这样的:

统计了每个 token 的词频. 由于使用了 --joined-dictionary, 两个词典的内容是一样的. 倘若不使用这个参数, 则会将中英文的词典分开.

=== preprocess.log ===
preprocess.log 文件是数据预处理过程的日志文件, 里面会记录配置参数, 以及处理结果的数据统计:

可以看到有少量词被 <unk> 替代了, 但我们不知道替代的机制是什么, 也不打算追究了.

== 二进制文件 ==
二进制文件命名的前缀都是 {train/valid/test}.en-zh.{en/zh}, 后缀是 {bin/idx}, 以下解释来自通义千问:
bin 目录下的 .bin.idx 文件是预处理阶段生成的二进制数据文件, 它们用于加速模型的训练过程. 这些文件由 fairseq-preprocess 工具生成, 当处理大规模文本数据时, 使用二进制格式可以显著减少数据加载和处理的时间.

=== .bin 文件 ===
.bin 文件包含了经过 Tokenization[?]NumericalizationPadding[?] 处理后的序列数据. 每个 .bin 文件通常对应于一个特定的数据集(如训练集、验证集或测试集), 并且分为源语言和目标语言两部分. 例如, 对于英语到德语的翻译任务, 你可能会看到 train.en-de.en.bintrain.en-de.de.bin 这样的文件名.

.bin 文件中, 通常包含了以下信息:

  • 每个样本的 Token ID 序列;
  • 对应的长度信息;
  • 可能还包括特殊标记, 如开始标记(BOS)和结束标记(EOS).

=== .idx 文件 ===
.idx 文件是一个索引文件, 它为对应的 .bin 文件提供快速随机访问的能力. 这是因为直接从二进制文件中随机读取数据通常是低效的, 尤其是当数据量很大时, .idx 文件存储了每个样本在 .bin 文件中的起始位置和长度信息, 使得模型在训练时可以直接跳转到所需的样本位置, 无需从头开始顺序读取整个文件.

还不知道有没有可以直接写在程序中的 python 代码 API.
上面说 “经过 Tokenization[?]、… 和 … 处理后”, 也不知道 fairseq_cli.preprocess是否真的执行了分词操作.
将在附录中解答这两个问题.

小结: 使用 fairseq_cli.preprocess 命令可对数据进行二进制化, 可节省存储空间以及加快数据的加载速度, 同时提供数据的随机读取; 在过程中, 会对文本进行分词, 创建词典.

3.2 数据的加载

这种二进制文件让人讨厌的地方就在于你没法打开文件看一看.

任务配置及 task 对象的创建 → 用 task 对象 load 数据 → 遍历数据.

3.2.1 任务配置及 task 对象的创建

经过 fairseq_cli.preprocess 的处理, bin 目录中的各种文件就可以被 fairseq 中的数据加载工具加载了. 首先需要创建一个 task:

task_cfg = translation.TranslationConfig(
	data=str(self._path_bin),
	source_lang=self._lang_src,
	target_lang=self._lang_tgt,
	train_subset='train',
	required_seq_len_multiple=8,
	dataset_impl='mmap',
	upsample_primary=1
)
task = translation.TranslationTask.setup_task(task_cfg)

TranslationConfig 是一个用于封装翻译任务相关配置的类, 它允许你设定训练数据处理的多个方面. 下面详细解释一下这段代码中的参数:

  • data=str(self._path_bin): 数据目录的路径. 这个目录应该包含预处理后的数据, 如 .bin.idx 文件;
  • source_lang=self._lang_src: 源语言代码, 这将决定模型将哪种语言的文本作为输入. 例如, 如果源语言是英语, 则设置为 "en";
  • target_lang=self._lang_tgt: 目标语言代码, 这将决定模型将哪种语言的文本作为输出. 例如, 如果目标语言是汉语, 则设置为 "zh";
  • train_subset='train': 用于训练的子集名称. 在预处理阶段, 数据通常会被划分为训练、验证和测试三个子集, 这里选择的是 "train" 子集;
  • required_seq_len_multiple=8: 设定序列长度需要是某个正整数的倍数. 在某些情况下, 特别是涉及到卷积神经网络(CNN)或注意力机制时, 要求输入序列长度是特定数值的倍数以优化计算效率. 这里设置为 8, 意味着序列长度必须会被填充为 8 的倍数;
  • dataset_impl='mmap': 数据集的实现方式. "mmap" 表示使用内存映射文件的方式加载数据, 这种方式可以提高数据读取的速度, 尤其是在处理大量数据时;
  • upsample_primary=1: 如果你使用的是多个训练数据集, upsample_primary 参数用于控制主数据集(primary dataset)的上采样倍数. 设置为 1 意味着不进行上采样, 即主数据集的样本数量保持不变;

上采样(Upsampling)主要用于增加数据集中的样本数量. 上采样通常有以下两种主要的应用场景:

  • 处理不平衡数据集: 当数据集中某一类别的样本数量远少于其他类别时, 少数类别的模型训练效果可能会受到影响, 因为模型容易偏向于多数类别. 上采样可以通过复制少数类别的样本或合成新的样本(如通过 SMOTE 算法)来平衡数据集, 从而改进模型在少数类别上的预测能力.
  • 图像和序列数据的生成: 在生成对抗网络(GANs)或自编码器(Autoencoders)等模型中, 上采样层用于从较低分辨率或较短序列恢复高分辨率图像或长序列. 例如, 超分辨率重建就是一种典型的上采样应用, 旨在从低分辨率图像生成高分辨率图像.

上采样可以通过多种方式进行, 包括但不限于:

  • 重复样本: 直接复制数据集中的少数类别样本, 增加其在数据集中的比例.
    数据增强: 通过对现有样本进行变换(如旋转、翻转、缩放等)来合成新的样本, 这种方法常用于图像数据.
  • 插值: 在信号处理中, 上采样可以通过插入零值并在相邻点之间进行插值来实现, 从而提高信号的分辨率.
  • 基于模型的方法: 如 GANs 和 Autoencoders 可以生成新的样本, 这种方法通常用于图像和序列数据的上采样.

这些配置通常会在训练脚本中被传入到 fairseq-train相关的训练函数中, 以启动模型的训练过程.

点开 TranslationConfig, 查看源码, 可以看到里面还有很多其他的设置, 包括:

left_pad_source: bool = field(
	default=True, metadata={"help": "pad the source on the left"}
)  # 默认情况下是在左侧 pad source 句子
left_pad_target: bool = field(  # 而 target 句子不 pad
	default=False, metadata={"help": "pad the target on the left"}
)
max_source_positions: int = field(
	default=1024, metadata={"help": "max number of tokens in the source sequence"}
)
max_target_positions: int = field(
	default=1024, metadata={"help": "max number of tokens in the target sequence"}
)

task = translation.TranslationTask.setup_task(task_cfg) 是根据 task_cfg 创建了一个 TranslationTask 实例对象, 其中 setup_task(...) 是工厂方法:

@classmethod
def setup_task(cls, cfg: TranslationConfig, **kwargs):
	"""
	Setup the task (e.g., load dictionaries).
	Args:
		args (argparse.Namespace): parsed command-line arguments
	"""
	paths = utils.split_paths(cfg.data)
	...
	# load dictionaries
	src_dict = cls.load_dictionary(
		os.path.join(paths[0], "dict.{}.txt".format(cfg.source_lang))
	)
	tgt_dict = cls.load_dictionary(
		os.path.join(paths[0], "dict.{}.txt".format(cfg.target_lang))
	)
	...
	return cls(cfg, src_dict, tgt_dict)

首先加载了 dictionary, 然后连同 cfg 一起作为参数, 创建了一个 TranslationTask 实例对象, 其属性包括(继承自父类FairseqTask的属性和两个 dictionary):


小结: 根据 TranslationConfig 创建 TranslationTask 实例对象是一切的开始, 里面封装了很多操作, 如 数据集的加载与管理, 模型输入输出格式的定义等.

3.2.2 用 task 对象 load 数据

值得注意的是, path 不是直接来源于 cfg.data, 而是经过处理的:

paths = utils.split_paths(cfg.data)
>>>
def split_paths(paths: str, separator=os.pathsep) -> List[str]:
	return (
		paths.split(separator) if "://" not in paths else paths.split(MANIFOLD_PATH_SEP)
	)

os.pathsep:, 这让我想起来 Ubuntu 中一些环境变量配置时的 :, 我估摸着这就是多个路径的分隔符. 所以, cfg.data 有可能是多个路径. 这也呼应了前面的:

def load_dataset(self, split, epoch=1, combine=False, **kwargs):
	"""
	Load a given dataset split.
	Args:
		split (str): name of the split (e.g., train, valid, test)
	"""
	paths = utils.split_paths(self.cfg.data)
	assert len(paths) > 0
	if split != self.cfg.train_subset:
		# if not training data set, use the first shard for valid and test
		paths = paths[:1]
	data_path = paths[(epoch - 1) % len(paths)]

	# infer langcode
	src, tgt = self.cfg.source_lang, self.cfg.target_lang

	self.datasets[split] = load_langpair_dataset(
		data_path,
		split,
		src, self.src_dict,
		tgt, self.tgt_dict,
		combine=combine,
		dataset_impl=self.cfg.dataset_impl,
		...
	)

先解除一个疑惑: 训练文件的命名已经是 train.xxx, 且二进制预处理时也使用了参数 --trainpref, 文件名也是 fairseq 包生成的, 为何还要在 TranslationConfig 中使用 train_subset='train' 设置训练文件名?
load_dataset(...) 中也许有了答案:

if split != self.cfg.train_subset:
	# if not training data set, use the first shard for valid and test
	paths = paths[:1]

这里用于判断 split 是否是 train_subset, 如果不是, 则去掉其他数据路径, 只保留第一个路径. 而后面的一句:

data_path = paths[(epoch - 1) % len(paths)]  # path[(epoch-1) % 1] = path[0]

data_path 就只取第一个路径. 所以猜测:

  • cfg.data 是由 : 分隔的多个数据集路径;
  • 训练数据集可能存在于多个路径中, 而 valid/test 数据则只存在于第一个路径中, 即 primary 数据集;
  • split == self.cfg.train_subset', 即读取训练集, 则读取数据的路径epoch 滚动.

那为什么不直接 if split != 'train' 呢? 呃! 咱也不知道, 可能还能设置 train_subset='valid'?

好! 接下来使用 load_langpair_dataset(...) 函数加载数据集:

def load_langpair_dataset(...):
	...
	src_datasets = []
	tgt_datasets = []
	# 迭代 k, 意思是文件夹内可能存在 {train/valid/test}{k} 多个文件, 需要 append 到 [] 中
	for k in itertools.count():  # 无限迭代器
		split_k = split + (str(k) if k > 0 else "")  # train, train1, train2, ...
		# prefix = ./bin/train.en-zh.
		if split_exists(split_k, src, tgt, src, data_path):
			prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, src, tgt))
		elif split_exists(split_k, tgt, src, src, data_path):
			prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, tgt, src))
		else:
			if k > 0:  # 如果 ./bin/train1.en-zh.en.{bin/idx} 和 ./bin/train1.zh-en.en.{bin/idx} 都没找到
				break
			else:
				raise FileNotFoundError(
					"Dataset not found: {} ({})".format(split, data_path)
				)
		src_dataset = data_utils.load_indexed_dataset(
			prefix + src, src_dict, dataset_impl
		)
		...
		src_datasets.append(src_dataset)

		tgt_dataset = data_utils.load_indexed_dataset(
			prefix + tgt, tgt_dict, dataset_impl
		)
		if tgt_dataset is not None:
			tgt_datasets.append(tgt_dataset)
		...
		if not combine:
			break
	...
	# >>> 合并多个文件中的文本 >>>
	if len(src_datasets) == 1:
		src_dataset = src_datasets[0]
		tgt_dataset = tgt_datasets[0] if len(tgt_datasets) > 0 else None
	else:
		sample_ratios = [1] * len(src_datasets)
		sample_ratios[0] = upsample_primary  # 增加主文件的采样权重
		src_dataset = ConcatDataset(src_datasets, sample_ratios)
		if len(tgt_datasets) > 0:
			tgt_dataset = ConcatDataset(tgt_datasets, sample_ratios)
		else:
			tgt_dataset = None
	...
	eos = None
	...
	align_dataset = None
	...
	tgt_dataset_sizes = tgt_dataset.sizes if tgt_dataset is not None else None
	return LanguagePairDataset(
		src_dataset,
		src_dataset.sizes,
		src_dict,
		tgt_dataset,
		tgt_dataset_sizes,
		tgt_dict,
		...cfg.xxx...
	)

这段代码假设 bin 文件夹内存在 {train/valid/test}{k} 多个文件, 读到多个 {src/tgt}_datasets = [] 中, 并通过 ConcatDataset(src_datasets, sample_ratios) 合并到一个 dataset 中, 最后通过 LanguagePairDataset(...) 创建语言对数据集. 所以这里我们要继续看三个东西:

  • data_utils.load_indexed_dataset(...);
  • ConcatDataset(xxx_datasets, sample_ratios);
  • LanguagePairDataset(src_dataset, src_dataset.sizes, src_dict, tgt_dataset, tgt_dataset_sizes, tgt_dict, ...).
3.2.2.1 data_utils.load_indexed_dataset(...)
src_dataset = data_utils.load_indexed_dataset(
	prefix + src, src_dict, dataset_impl
)

这里的 prefix+src = './bin/train.en-zh.en', 连同 src_dict, dataset_impl 传入 data_utils.load_indexed_dataset(...), 进行数据加载:

def load_indexed_dataset(
	path, dictionary=None, dataset_impl=None, combine=False, default="cached"
):
	import fairseq.data.indexed_dataset as indexed_dataset
	from fairseq.data.concat_dataset import ConcatDataset

	datasets = []
	for k in itertools.count():
		path_k = path + (str(k) if k > 0 else "")
		try:  # 有后缀 '.bin/.idx' 的 path 去掉后缀, 这里的 path = 
			# prefix+src = './bin/train.en-zh.en' 无, 故保持不变
			path_k = indexed_dataset.get_indexed_dataset_to_local(path_k)
		except Exception as e:
			...
		...
		dataset = indexed_dataset.make_dataset(
			path_k,
			impl=dataset_impl_k or default,
			fix_lua_indexing=True,
			dictionary=dictionary
		)
		if dataset is None:
			break  # 加载完毕
		datasets.append(dataset)
		if not combine:
			break  # not combine, 则仅使用一个数据文件

	if len(datasets) == 0:
		return None
	elif len(datasets) == 1:
		return datasets[0]
	else:
		return ConcatDataset(datasets)

这里依然假设有多个文件要加载, 由 combine 参数决定加载多个, 还是只加载头一个:

combine (bool, optional): automatically load and combine multiple datasets.
	For example, if *path* is 'data-bin/train', then we will combine
	'data-bin/train', 'data-bin/train1', ... and
	return a single ConcatDataset instance.

只可惜, 这里的调用使用了默认值 False, 也就是只有一个文件. 我们只需要看:

dataset = indexed_dataset.make_dataset(
	path_k,
	impl=dataset_impl_k or default,
	fix_lua_indexing=True,
	dictionary=dictionary
)
def make_dataset(path, impl, fix_lua_indexing=False, dictionary=None):
	if ...:
	...
	elif impl == "mmap" and MMapIndexedDataset.exists(path):
		return MMapIndexedDataset(path)  // 我们使用的, 也是默认的
	...
	return None

它是根据 impl 决定加载方式的, 对于我们使用的 'mmap', 它不需要 fix_lua_indexing, dictionary 这两个参数, 仅仅有加载路径就行了. 下面是真正的重点, 展现了如何加载二进制文件:

class MMapIndexedDataset(torch.utils.data.Dataset):
	def __init__(self, path):
		super().__init__()
		self._path = None
		self._index = None
		self._bin_buffer = None
		self._do_init(path)

	def _do_init(self, path):
		self._path = path
		self._index = self.Index(index_file_path(self._path))  # idx 文件加载
		# >>> .bin 文件加载 >>>
		_warmup_mmap_file(data_file_path(self._path))  # 加载测试, 啥都没干
		self._bin_buffer_mmap = np.memmap(
			data_file_path(self._path), mode="r", order="C"
		)
		self._bin_buffer = memoryview(self._bin_buffer_mmap)

	@lru_cache(maxsize=8)
	def __getitem__(self, i):
		ptr, size = self._index[i]
		np_array = np.frombuffer(
			self._bin_buffer, dtype=self._index.dtype, count=size, offset=ptr
		)
		if self._index.dtype != np.int64:
			np_array = np_array.astype(np.int64)
		return torch.from_numpy(np_array)

可以看到, 构建这个 MMapIndexedDataset 主要干了两件事: (1) 加载 .idx 文件, (2) np.memmap(...) 构建了 .idx 文件的内存映射. 然后 __getitem__(self, i) 使用 np.frombuffer(...) 读取 .bin 文件, 将数据转化为 torch.long 返回. 至于 self.Index, 这个对象里有数据的一些元信息以及每条数据在 .bin 文件中的位置与长度, 详情在附录中(包含以内存映射方式读取二进制文件).

3.2.2.2 ConcatDataset(xxx_datasets, sample_ratios)

虽然此例中, 我们只有一个数据集, 但还是要了解一下多数据集是如何拼接的:

class ConcatDataset(FairseqDataset):
	def __init__(self, datasets, sample_ratios=1):
		super(ConcatDataset, self).__init__()
		self.datasets = list(datasets)
		if isinstance(sample_ratios, int):
			sample_ratios = [sample_ratios] * len(self.datasets)
		self.sample_ratios = sample_ratios
		# 根据每个数据集的采样权重, 设置整个数据集的累计下标, 也即把数据集挨个摆放, 样本下标是累计的;
		# 如果有 3 个数据集, 大小分别为: 100, 300, 200; sample_ratios = [2, 4, 3];
		# 那么, cumulative_sizes = [200, 1400, 2000]
		self.cumulative_sizes = self.cumsum(self.datasets, sample_ratios)
		self.real_sizes = [len(d) for d in self.datasets]  # 各数据集的实际大小: [100, 300, 200]

	def __len__(self):
		return self.cumulative_sizes[-1]

	def __getitem__(self, idx):
		# 根据 real_sizes, cumulative_sizes 等信息, 计算出该取哪个数据集的哪个样本
		dataset_idx, sample_idx = self._get_dataset_and_sample_index(idx)
		return self.datasets[dataset_idx][sample_idx]
3.2.2.3 LanguagePairDataset(src_dataset, src_dataset.sizes, src_dict, tgt_dataset, tgt_dataset_sizes, tgt_dict, ...)

其实这个 LanguagePairDataset 就没什么好说的了, 它只是将源和目标数据集放在一块, 一起加载而已. 如果非要说点什么, 无非就是它除了拼装之外, 还进行了一些额外的操作:

class LanguagePairDataset(FairseqDataset):
	"""
	A pair of torch.utils.data.Datasets.

	Args:
		left_pad_source (bool, optional): pad source tensors on the left side
			(default: True).
		left_pad_target (bool, optional): pad target tensors on the left side
			(default: False).
		input_feeding (bool, optional): create a shifted version of the targets
			to be passed into the model for teacher forcing (default: True).
		remove_eos_from_source (bool, optional): if set, removes eos from end
			of source if it's present (default: False).
		append_eos_to_target (bool, optional): if set, appends eos to end of
			target if it's absent (default: False).
		append_bos (bool, optional): if set, appends bos to the beginning of
			source/target sentence.
		num_buckets (int, optional): if set to a value greater than 0, then
			batches will be bucketed into the given number of batch shapes.
	"""

其中我们能通过 TranslationConfig 设置的参数有:

left_pad_source=self.cfg.left_pad_source,  # default=True
left_pad_target=self.cfg.left_pad_target,  # default=False
num_buckets=self.cfg.num_batch_buckets,    # default=0
pad_to_multiple=self.cfg.required_seq_len_multiple

起码在使用 task 对象加载数据的过程中, LanguagePairDataset其他参数都是默认固定好的.

其样本的样式是这样的:

def __getitem__(self, index):
	tgt_item = self.tgt[index] if self.tgt is not None else None
	src_item = self.src[index]
	
	if self.append_eos_to_target:
		...
	if self.append_bos:
		...
	if self.remove_eos_from_source:
		...
	example = {
		"id": index,
		"source": src_item,
		"target": tgt_item,
	}
	...
	return example

小结: 至此, 漫长的数据加载之路就结束了, 太过漫长, 以至于我们"不识大体". 画一张图来"识一下大体"吧:

3.2.3 数据集的遍历

加载完数据之后, 就可以遍历数据了, 我们把 valid 数据拿出来看看:

sample = task.dataset("valid")[0]  # valid 数据集的第 1 个样本
pprint.pprint(sample)
pprint.pprint(  # 将下标张量转化为 str 的句子
	"Source: " + task.source_dictionary.string(
		sample['source'],
		"sentencepiece",  # 说明预处理时的分词工具, 以便还原句子
	)
)
pprint.pprint(
	"Target: " + task.target_dictionary.string(
		sample['target'],
		"sentencepiece",
	)
)

其本质是一个含 __getitem__(idx) 的迭代器, 所以 task.dataset("valid")[0] 返回了第一个样本. 输出:

{
	'id': 0,
	'source': tensor([22, 59, 12, 2601, 13, 452, 16, 5, 326, 13, 21, 6, 7, 2]),
	'target': tensor([51, 760, 1488, 792, 442, 1591, 1564, 10, 2])
}
'Source: we have to buy a lot of those ads .'
'Target: 我們必須買很多那些廣告 。'

可见结果如 LanguagePairDataset 中写的那样, 返回了一个形如:

example = {
	"id": index,
	"source": src_item,
	"target": tgt_item,
}

的样本, 包含着样本的 id, 源语言的句子 src_item, 目标语言的句子 tgt_item. 句子是由单词对应的下标张量所表示, 还没有经过 padding 等操作, 还是比较原始的, 仅仅在末尾添加了一个 2, 从

class Dictionary:
	def __init__(
			self,
			*,  # begin keyword-only arguments
			bos="<s>",
			pad="<pad>",
			eos="</s>",
			unk="<unk>",
			extra_special_symbols=None,
	):
		self.bos_word, self.unk_word, self.pad_word, self.eos_word = bos, unk, pad, eos
		self.symbols = []  # 单词列表
		self.count = []  # # 对应词频
		self.indices = {}  # 单词在 symbol 和 count 中的下标
		self.bos_index = self.add_symbol(bos)  # 首先添加四个特殊标记
		self.pad_index = self.add_symbol(pad)  # 并返回其下标
		self.eos_index = self.add_symbol(eos)
		self.unk_index = self.add_symbol(unk)

来看, 2eos="</s>", 表示句子的结束. 0 是句子的开始 <s>, 1<pad>, 3<unk>.

上面只是拿出来一个样本看看, 如何迭代呢? 还得组成 batch 呢! 然而并不是放到 DataLoader 里面, fairseq 里有专门的迭代器:

batch_iterator = task.get_batch_iterator(
	dataset=task.dataset(split),
	max_tokens=config.max_tokens,
	max_sentences=None,
	max_positions=utils.resolve_max_positions(task.max_positions(), config.max_tokens),
	ignore_invalid_inputs=True,
	seed=config.seed,
	num_workers=config.num_workers,
	epoch=epoch,
	disable_iterator_cache=not cached
)

似乎 batch_size 并不是由一个叫 batch_size 的参数决定, 而是由:

max_tokens=config.max_tokens,
max_sentences=None,
max_positions=utils.resolve_max_positions(task.max_positions(), config.max_tokens)

等参数决定. 输出两个batch 看看:

{
	'id': tensor([1960, 3713, 1096,  441, 2897, 2340,  493, 2309, 1383, 2517]),
	'net_input': {
		'prev_output_tokens': tensor(
			[[2, 138, ...,   10,  1, 1, 1, 1],
			 [2,   5, ...,  869,  1, 1, 1, 1],
			 ...,
			 [2, 115, ..., 2178, 10, 1, 1, 1]]
		),
		'src_lengths': tensor([30, 30, 30, 30, 30, 30, 30, 30, 30, 30]),
		'src_tokens': tensor(
			[[1, ..., 1, 17, ..., 6,    7,    2],
			 ...,
			 [1, ..., 1, 33, ..., 6,    7,    2]]
		)
	},
	'nsentences': 10,
	'ntokens': 207,
	'target': tensor(
		[[138, ...,  10, 2, 1, 1, 1, 1],
		 [  5, ..., 869, 2, 1, 1, 1, 1],
		 ...,
		 [115, ...,  10, 2, 1, 1, 1, 1]]
	)
}
{
	'id': tensor([2488, 3873, 1464,  387]),
	'net_input': {
		'prev_output_tokens': tensor(
			[[  2, 148, ..., 1170,   4, 1, ..., 1],
			 [  2,  51, ...,    8,  10, 1, ..., 1],
			 [  2,   5, ..., 4293,  10, 1, ..., 1],
			 [  2, 627, ..., 3082, 231, 1,   1, 1]]
		),
		'src_lengths': tensor([57, 57, 57, 57]),
		'src_tokens': tensor(
			[[1, 1, 1, 32, ..., 2405, 7, 2],
			 [1, 1, 1, 11, ...,    6, 7, 2],
			 [1, 1, 1, 12, ...,  643, 7, 2],
			 [1, 1, 1, 32, ..., 1456, 7, 2]]
		)
	},
	'nsentences': 4,
	'ntokens': 231,
	'target': tensor(
		[[148, ..., 1170,   4, 2, 1, ..., 1],
		 [ 51, ...,    8,  10, 2, 1, ..., 1],
		 [  5, ..., 4293,  10, 2, 1, ..., 1],
		 [627, ..., 3082, 231, 2, 1,   1, 1]]
	)
}

=== 关于 batch_size ===
可以看到, 两个 batchntokensnsentences 都不一样, 说明 batch 中 sample 的数量不是由这两者决定. 来看看通义千问的解释:

max_tokens=None,            # 每个批次的最大令牌数
max_sentences=None,         # 每个批次的最大句子数
max_positions=None,         # 输入序列的最大长度

在函数 task.get_batch_iterator(...) 内确实能找到:

# filter examples that are too large
if max_positions is not None:
	indices = self.filter_indices_by_size(
		indices, dataset, max_positions, ignore_invalid_inputs
	)
# create mini-batches with given size constraints
batch_sampler = dataset.batch_by_size(
	indices,
	max_tokens=max_tokens,
	max_sentences=max_sentences,
	required_batch_size_multiple=required_batch_size_multiple  # 这里可能还要求 batch_size 是某个整数的倍数
)   # less than N or a multiple of N

所以, max_position 是用来过滤掉过长的句子的. 而 ignore_invalid_inputs 决定是否忽略过长的句子, 而不是抛出异常. 实际上模型也有这个最大句长限制:

# 解决最大位置限制
max_positions = utils.resolve_max_positions(
	model.max_positions(),
	task.max_positions(),
	default_max_positions=(1024, 1024)
)

模型数据用户设置三者决定, 默认都是 None, 可能就是没有限制的意思. 具体细节就不追究了.

max_tokensmax_sentences 共同决定了 batch 的大小. 每个batch 所含的 tokens 数量不会超过 max_tokens, 句子数量也不会超过 max_sentences. (注意, 计数不包括填充.) 那 batch_size 到底是什么? 通过设置 required_batch_size_multiple=5 后发现, batch['ntokens'] 都是 5 的倍数, 而 batch['nsentences'] 没有规律. 故, batch_size 是指 batch['ntokens'], 它不是固定的值, 只是被限制 < cfg.max_tokens.

=== batch_iteratorcache ===
现在只剩下两个参数的意义不清楚了: epochdisable_iterator_cache. 打开 get_batch_iterator(...) 看一看:

def get_batch_iterator(..., epoch=1, disable_iterator_cache=False, update_epoch_batch_itr=False, ...):
	"""
	Get an iterator that yields batches of data from the given dataset.
	Args:
		epoch (int, optional): the epoch to start the iterator from (default: 1).
		disable_iterator_cache (bool, optional): don't cache the EpochBatchIterator (default: False).
		update_epoch_batch_itr (bool optional): if true then do not use the cached batch iterator for the epoch
	Returns:
		~fairseq.iterators.EpochBatchIterator: a batched iterator over the given dataset split
	"""
	can_reuse_epoch_itr = (  # 三个条件: 启动了 cache; 用户要使用 cache; 数据集支持 cache
		not disable_iterator_cache  # 启动了 EpochBatchIterator 的 cache
		and not update_epoch_batch_itr  # use the cached batch iterator for the epoch
		and self.can_reuse_epoch_itr(dataset)  # 数据集支持, 默认由 EpochListening 决定
		# can_reuse_epoch_itr_across_epochs 也没有重写, 说明是可以 reuse 的
	)
	if can_reuse_epoch_itr and dataset in self.dataset_to_epoch_iter:  # 已被缓存, 直接返回
		logger.debug("reusing EpochBatchIterator for epoch {}".format(epoch))
		return self.dataset_to_epoch_iter[dataset]

	# initialize the dataset with the correct starting epoch
	# 设置数据集的 epoch, 可能影响数据集内容?
	dataset.set_epoch(epoch)  # 经查明, LanguagePairDataset 没有重写 set_epoch, 相当于啥都没做
	# get indices ordered by example size
	with data_utils.numpy_seed(seed):  # 有些 dataset 的排序是依赖于 epoch 的, 故而先 set_epoch
		# LanguagePairDataset 仅仅是按句子长度排序, 不依赖于 epoch, 所以暂时不用关注 epoch 对 batch iterator 的影响
		indices = dataset.ordered_indices()
	# filter examples that are too large
	...
	# create mini-batches with given size constraints
	...
	# TranslationConfig 没有可不代表其他 cfg 没有啊
	reuse_dataloader = getattr(self.cfg, "reuse_dataloader", True)  # 没有就是 True

	# return a reusable, sharded iterator; 可 reuse
	# 这东西本质不是一个迭代器, 只是一个包含了迭代信息的类
	epoch_iter = iterators.EpochBatchIterator(
		dataset=dataset,
		collate_fn=dataset.collater,
		batch_sampler=batch_sampler,  # List[np.ndarray], 句子的分组情况, 每一个 np.ndarray 都包含着一批样本的 id
		...
		epoch=epoch,
		...
		reuse_dataloader=reuse_dataloader,
	)
	if can_reuse_epoch_itr:  # 缓存吗?
		self.dataset_to_epoch_iter[dataset] = epoch_iter
	return epoch_iter

这个函数返回一个"迭代器"(iterators.EpochBatchIterator), 本质不是一个迭代器, 只是一个包含了迭代信息的类. 首先做的是检查是否返回迭代器的缓存, 那么需要三个条件: (1) 启动了 cache; (2) 用户要使用 cache; (3) 数据集支持 cache. 那么参数 disable_iterator_cache 的意义就清楚了: 是否开启迭代器的缓存.

缓存的前两个条件是参数直接决定的, 后一个则由数据集决定, 为了了解数据集是否支持缓存, 得看 EpochListening:
fairseq 包内的数据集类基本都是继承 FairseqDataset, 其标签为:

# 多了一个 EpochListening
class FairseqDataset(torch.utils.data.Dataset, EpochListening):
	"""A dataset that provides helpers for batching."""

首先映入眼帘的是注释, 说明了其作用是: 为数据的 batching 提供帮助. 其除了继承 torch.utils.data.Dataset 之外, 还继承一个叫 EpochListening 的类:

class EpochListening:  # 顾名思义是使数据集监听训练过程的 epoch
	"""Mixin for receiving updates whenever the epoch increments."""

	@property
	def can_reuse_epoch_itr_across_epochs(self):
		"""
		Whether we can reuse the :class:`fairseq.data.EpochBatchIterator` for
		this dataset across epochs.

		This needs to return ``False`` if the sample sizes can change across
		epochs, in which case we may need to regenerate batches at each epoch.  需要重新生成 batches
		If your dataset relies in ``set_epoch`` then you should consider setting
		this to ``False``.  依赖于 set_epoch, 则为 False
		"""
		return True  # 看来默认是 True; 如果 sample sizes 在不同 epoch 可能改变, 则应为 False

	def set_epoch(self, epoch):
		"""Will receive the updated epoch number at the beginning of the epoch."""
		pass

值得注意的是, 这两个方法 LanguagePairDataset 都没有重写, 继承路径中也没有, 也就是说, LanguagePairDataset epoch 无感, 可以 reuse.

好! 缓存的逻辑也很简单: return self.dataset_to_epoch_iter[dataset], 数据集对应的迭代器在一个字典里存着, 需要的时候返回就行了.

=== 为什么 task.get_batch_iterator(...epoch...) 需要 epoch? ===

// get indices ordered by example size
with data_utils.numpy_seed(seed):  // 有些 dataset 的排序是依赖于 epoch 的, 故而先 set_epoch
	// LanguagePairDataset 仅仅是按句子长度排序, 不依赖于 epoch, 所以暂时不用关注 epoch 对 batch iterator 的影响
	indices = dataset.ordered_indices()

task.get_batch_iterator(...) 函数中确实发现了这么一句排序的代码, 也许就是 “句子已经按长度排好序了” 的原因. 注意, 只是 LanguagePairDatasetordered_indices() 排序是按句子长度:

def ordered_indices(self):
	...
	# sort by target length, then source length
	if self.tgt_sizes is not None:
		indices = indices[np.argsort(self.tgt_sizes[indices], kind="mergesort")]
	return indices[np.argsort(self.src_sizes[indices], kind="mergesort")]

其他数据集未必是这么排的, 有些 dataset 的排序是依赖于 epoch, 故而先 set_epoch(epoch), 如 MultiCorpusDataset.

==== reuse 参数 ====

# TranslationConfig 没有可不代表其他 cfg 没有啊
reuse_dataloader = getattr(self.cfg, "reuse_dataloader", True)  # 没有就是 True

这一句判断是否 reuse_dataloader, 对于翻译任务的 TranslationConfig, 没有这个配置, 返回默认的 True. 也即 reuse_dataloader = True.

=== iterators.EpochBatchIterator ===
下面就可以看一看 iterators.EpochBatchIterator 是怎么回事了:

# return a reusable, sharded iterator; 可 reuse
# 这东西本质不是一个迭代器, 只是一个包含了迭代信息的类
epoch_iter = iterators.EpochBatchIterator(
	dataset=dataset,
	collate_fn=dataset.collater,
	batch_sampler=batch_sampler,  # List[np.ndarray], 句子的分组情况, 每一个 np.ndarray 都包含着一批样本的 id
	...
	epoch=epoch,
	...
	reuse_dataloader=reuse_dataloader
)

由于 iterators.EpochBatchIterator源码太长, 这里只列出类头和其基类:

class EpochBatchIterator(EpochBatchIterating):
	"""
	A multi-epoch iterator over a :class:`torch.utils.data.Dataset`.

	Compared to :class:`torch.utils.data.DataLoader`, this iterator:
		- can be reused across multiple epochs with the :func:`next_epoch_itr`
	 		method (optionally shuffled between epochs)
		- can be serialized/deserialized with the :func:`state_dict` and
	 		:func:`load_state_dict` methods
		- supports sharding with the *num_shards* and *shard_id* arguments
	"""

class EpochBatchIterating(object):
	def __len__(self) -> int:
		raise NotImplementedError

	@property
	def next_epoch_idx(self):
		raise NotImplementedError

	def next_epoch_itr(
			self, shuffle=True, fix_batches_to_gpus=False, set_dataset_epoch=True
	):
		"""
		Return a new iterator over the dataset.

		Args:
			shuffle (bool, optional): shuffle batches before returning the
				iterator (default: True).
			fix_batches_to_gpus (bool, optional): ensure that batches are always
				allocated to the same shards across epochs. Requires
				that :attr:`dataset` supports prefetching (default: False).
			set_dataset_epoch (bool, optional): update the wrapped Dataset with
				the new epoch number (default: True).
		"""
		raise NotImplementedError

	def end_of_epoch(self) -> bool:
		"""Returns whether the most recent epoch iterator has been exhausted"""
		raise NotImplementedError

	@property
	def iterations_in_epoch(self) -> int:
		"""The number of consumed batches in the current epoch."""
		raise NotImplementedError

	def state_dict(self):
		"""Returns a dictionary containing a whole state of the iterator."""
		raise NotImplementedError

	def load_state_dict(self, state_dict):
		"""Copies the state of the iterator from the given *state_dict*."""
		raise NotImplementedError

	@property
	def first_batch(self):
		return "DUMMY"

这是一个 ‘multi-epoch iterator’, 相比于 torch.utils.data.Dataset, 有三点改进功能:

  • 跨越多个 epoch 时, 可以 reuse;
  • state_dictload_state_dict 保存与加载 迭代器;
  • 这个 sharding 咱就不说了.

其实我对这些并不感兴趣, 只是因为看到了程序的一段奇怪的代码:

epoch_itr = load_data_iterator(task, "train", config.start_epoch, config.max_tokens, config.num_workers)
try_load_checkpoint(model, optimizer, name=config.resume)
while epoch_itr.next_epoch_idx <= config.max_epoch:
	# train for one epoch
	train_one_epoch(epoch_itr, model, criterion, optimizer, config.accum_steps)
	stats = validate_and_save(model, task, criterion, optimizer, epoch=epoch_itr.epoch)
	logger.info("end of epoch {}".format(epoch_itr.epoch))
	epoch_itr = load_data_iterator(task, "train", epoch_itr.next_epoch_idx, config.max_tokens, config.num_workers)

[注意: 这里代码写成 epoch_itr = load_data_iterator(...) 是不恰当的, 实际上写 batch_iterator 比较好, 因为 batch_iterator.next_epoch_itr(...) 返回的才是一个 epoch 的迭代器. 下面继续使用原代码的写法 epoch_itr, 注意别被误导了.]

为什么每个 epoch 都要再重新一次 epoch_itr = load_data_iterator(...)? 既然在 train_one_epoch 中执行了 itr = epoch_itr.next_epoch_itr(shuffle=True), 那么 epoch_itr 应该还能执行很多次 next_epoch_itr(shuffle=True). 果不其然, EpochBatchIterator 的介绍就是:

A multi-epoch iterator over a :class:`torch.utils.data.Dataset`.

根据上面的分析, 你这样每个 epoch 都重新 load_data_iterator(...), 无非就是重新搞了一个一模一样的 epoch_itr, 仅仅不一样的是 epoch 数值.

经试验, 上面奇怪的代码连续两次 epoch_itr.next_epoch_idx 并没有连续增加 epoch_itr.epoch 而导致 epoch+2, 为什么? 来看一看:

@property
def next_epoch_idx(self):
	"""Return the epoch index after *next_epoch_itr* is called."""
	if self._next_epoch_itr is not None:
		return self.epoch
	elif self._cur_epoch_itr is not None and self.end_of_epoch():  # not self._cur_epoch_itr.has_next()
		return self.epoch + 1  # 一个 epoch 结束了? 才会 +1
	else:
		return self.epoch

只有一个 epoch 结束了(self.end_of_epoch())才会 self.epoch + 1, 况且, 在这个属性函数里 self.epoch 并未真的增加.

下面来看一下 itr = epoch_itr.next_epoch_itr(shuffle=True) 到底得到了什么:

def next_epoch_itr(
	self, shuffle=True, fix_batches_to_gpus=False, set_dataset_epoch=True
):
	...
	self.epoch = self.next_epoch_idx  # 刚开始, prev_epoch == self.epoch, 即 epoch 没往上加
	if set_dataset_epoch and hasattr(self.dataset, "set_epoch"):
		self.dataset.set_epoch(self.epoch)  # 可能对数据集的 order 产生影响. 而 LanguagePairDataset 对此无感

	if self._next_epoch_itr is not None:
		...
	else:  # 对于 LanguagePairDataset, batch_sampler 是 List[ndarray(batch)]
		...
		self._cur_epoch_itr = self._get_iterator_for_epoch(
			self.epoch,
			shuffle,
			fix_batches_to_gpus=fix_batches_to_gpus,
		)
	return self._cur_epoch_itr

def _get_iterator_for_epoch(
	self, epoch, shuffle, fix_batches_to_gpus=False, offset=0
):
	if self.reuse_dataloader and self.dataloader is not None:
		self.batch_sampler.make_batches_for_epoch(epoch, offset)
		itr = self.dataloader
	else:
		self.batch_sampler = FrozenBatchSampler(  # 就是把 batch 根据随机种子 seed+epoch 打乱了一下
			...
			epoch,  # 用于随机打乱的 seed
			...
		)
		...
		# Create data loader
		itr = torch.utils.data.DataLoader(
			self.dataset,
			collate_fn=self.collate_fn,
			batch_sampler=self.batch_sampler,
			...
		)
		if self.reuse_dataloader:
			self.dataloader = itr
	...
	return itr

首先就是更新 epoch, 所以, 上面奇怪的代码中每个 epoch 加载一次 EpochBatchIterator 是不合适的, 甚至是错误的, 幸亏 disable_iterator_cache=False, 不然会报错:

print('batch_sampler: ', batch_sampler[0][:3])
batch_sampler:  [ 674 2116 3917]
batch_sampler:  [ 674 2116 3917]
2
batch_sampler:  [ 674 2116 3917]
3
Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7fa029f0a050>
Traceback (most recent call last):
  File "/root/Miniconda/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1479, in __del__
    self._shutdown_workers()
  File "/root/Miniconda/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1462, in _shutdown_workers
    if w.is_alive():
  File "/root/Miniconda/lib/python3.10/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
AssertionError: can only test a child process

缓存使得每次得到的 epoch_itr 都是一样的, 连 batch_sampler 都一样.

如果没缓存, 则调用 self._get_iterator_for_epoch(...) 以获取迭代器. 然后呢, 这个 dataloader 也是有缓存重用的, 套路都是一样的. 然后我们只需要注意它是构建了一个 torch.utils.data.DataLoader 并返回, CountingIterator 只是增加了一个迭代器的计数功能.

综上所述, epoch 的作用仅仅是参与计算打乱 batch 时的随机种子, 并无太大作用, 起码对于翻译任务是这样. 至于数据的遍历访问方式, 是明朗的: 调用一次 batch_iterator = task.get_batch_iterator(...) 就够了, 然后调用 epoch_itr = batch_iterator.next_epoch_itr(...) 得到一个 epoch 的遍历, 用 forwhile 没有影响:

batch_iterator = task.get_batch_iterator(...)
for epoch in range(max_epoch):
	epoch_itr = batch_iterator.next_epoch_itr(...)
	for batch_samples in epoch_itr:
		...

while batch_iterator.epoch < max_epoch:
	epoch_itr = batch_iterator.next_epoch_itr(...)
	for batch_samples in epoch_itr:
		...

但是又隐隐约约感觉哪里不对:

def state_dict(self):
	"""Returns a dictionary containing a whole state of the iterator."""
	...
	return {
		"version": 2,
		"epoch": epoch,
		"iterations_in_epoch": iter_in_epoch,
		"shuffle": self.shuffle,
	}

def load_state_dict(self, state_dict):
	"""Copies the state of the iterator from the given *state_dict*."""
	self.epoch = state_dict["epoch"]
	itr_pos = state_dict.get("iterations_in_epoch", 0)
	version = state_dict.get("version", 1)
	...

EpochBatchIterating 是保持了一个状态的, 包含 epoch, 倘若需要 load_state_dict(state_dict), 岂不是要改变 epoch, 从而导致 for 样式的遍历出错. while 样式的遍历无此顾虑. 故, 推荐 while 样式.

注意!!! 一旦 not disable_iterator_cache # 启动了 EpochBatchIterator 的 cache, 便无法更新设置, 故在需要新的设置(如max_tokens=config.max_tokens) 时, 不要使用缓存.

最后, 再来看一下 batch 的内部, 字典内有 'id', 'net_input', 'nsentences', 'ntokens', 'target' 等五部分组成, 真正输入到网络里的是 'net_input'(后面再说). 值得注意是:

  • 数据都是经过填充的, 一个 batch 内的 src_tokens, 'target''prev_output_tokens' 都是各自在内部对其的, 这样才能在表示在一个二维张量里. 默认情况下, src 是在左侧填充, tgt 是在右侧填充;
  • 真正输入到解码器的不是 'target' 而是 batch['net_input']['prev_output_tokens'], 其是把 'target'2 移到开端;
  • 一个 batch 内的句子长度都是接近的, 这似乎是由于句子已经按长度排好序了. 设置 shuffle=False, 会发现 batch[nsentences] 越来越小, 说明句子是越来越长的;
  • 填充句子时, 会将句子张量的长度 batch['src_tokens'].shape[-1] 等填充到 cfg.required_seq_len_multiple 的倍数, 据说是为了内存的高效利用.
3.3 Seq2Seq 模型
class Seq2Seq(FairseqEncoderDecoderModel):
	def __init__(self, args, encoder, decoder):
		super().__init__(encoder, decoder)
		self.args = args

	def forward(
			self,
			src_tokens,
			src_lengths,
			prev_output_tokens,
			return_all_hiddens: bool = True
	):
		"""
		Run the forward pass for an encoder-decoder model.
		"""
		encoder_out = self.encoder(
			src_tokens, src_lengths=src_lengths, return_all_hiddens=return_all_hiddens
		)
		logits, extra = self.decoder(
			prev_output_tokens,
			encoder_out=encoder_out,
			src_lengths=src_lengths,
			return_all_hiddens=return_all_hiddens
		)
		return logits, extra

这是翻译任务的模型架构, 由一个 encoder 和一个 decoder 组成, 继承 fairseq 中的 FairseqEncoderDecoderModel, 并实现其抽象方法 forward 方法. 可以看到, 其所需参数恰好跟 batch_iterator = task.get_batch_iterator(...) 迭代器所返回的样本中 batch['net_input'] 内容是一致的. 然后, 数据经过 encoderdecoder 的处理, 得到了 logits, 我们知道, 它就是输入到损失函数的东西.

下面, 我们分别详细地看一下 encoderdecoder 是什么样的.

3.3.1 FairseqEncoder
class RNNEncoder(FairseqEncoder):
	def __init__(self, args, dictionary, embeddings):
		super().__init__(dictionary)
		self.embeddings = embeddings  # nn.Embedding, 包含了词向量列表

		self.embed_dim = args.encoder_embed_dim
		self.hidden_dim = args.encoder_ffn_embed_dim
		self.num_layers = args.encoder_layers

		self.dropout_in_module = nn.Dropout(args.dropout)
		self.rnn = nn.GRU(
			self.embed_dim,
			self.hidden_dim,
			self.num_layers,
			dropout=args.dropout,
			batch_first=True,  # True 时, 输入和输出的形状应为 (batch, seq, feature), 而不是默认的 (seq, batch, feature)
			# 同时也会影响输出数据的形状. 如果 batch_first=True, 则输出也将按照 (batch, seq, hidden) 的形状返回.
			bidirectional=True
		)
		self.dropout_out_module = nn.Dropout(args.dropout)
		self.padding_idx = dictionary.pad()

	def forward(self, src_tokens, src_lengths=None, **kwargs):
		bsz, seqlen = src_tokens.size()  # batch_size 在前

		# get embeddings
		x = self.embeddings(src_tokens)  # (batch_size, seq_len, embd_dim)
		x = self.dropout_in_module(x)  # 输入前就进行 dropout

		# 過雙向 RNN
		h0 = x.new_zeros(2 * self.num_layers, bsz, self.hidden_dim)
		x, final_hiddens = self.rnn(x, h0)
		outputs = self.dropout_out_module(x)  # 输出 dropout
		# outputs = [batch_size, seq_len, hid_dim * directions] 是最上層 RNN 的輸出
		#  hidden = [num_layers * directions, batch_size, hid_dim]

		# 因為 Encoder 是雙向的 RNN,所以需要將同一層兩個方向的 hidden_state 接在一起
		final_hiddens = self.combine_bidir(final_hiddens, bsz)
		# hidden = [num_layers x batch x num_directions*hidden]

		encoder_padding_mask = src_tokens.eq(self.padding_idx)
		return (
			outputs,  # batch x seq_len x 2*hidden
			final_hiddens,  # num_layers x batch x 2*hidden
			encoder_padding_mask  # batch x seq_len
		)

它的前向传播输入形式是:

class FairseqEncoder(nn.Module):
	def forward(self, src_tokens, src_lengths=None, **kwargs):

继承了 FairseqEncoderencoder 都要是这种形式.

RNNEncoder 是基于双向 RNN 的, 初始隐状态需要双倍的:

h0 = x.new_zeros(2 * self.num_layers, bsz, self.hidden_dim)

但它的双倍是在 2 * self.num_layers, 而不是 hidden_dim 处, RNN 输出也是这样子:

x, final_hiddens = self.rnn(x, h0)
outputs = self.dropout_out_module(x)  # 输出 dropout
# outputs = [batch_size, seq_len, hid_dim * directions] 是最上層 RNN 的輸出
#  hidden = [num_layers * directions, batch_size, hid_dim]

输出 outputembedding 处, 而隐状态的 依然在 num_layers * directions 处. 在返回结果前, 对隐状态维度信息进行了变换:

# 因為 Encoder 是雙向的 RNN,所以需要將同一層兩個方向的 hidden_state 接在一起
final_hiddens = self.combine_bidir(final_hiddens, bsz)
# hidden = [num_layers x batch x num_directions*hidden]

还有一个陌生的 reorder_encoder_out(self, encoder_out, new_order) 说是 beam search 时会用到, 到时候再说吧.

3.3.2 FairseqDecoder
class RNNDecoder(FairseqIncrementalDecoder):
	def __init__(self, args, dictionary, embeddings):
		super().__init__(dictionary)
		self.embeddings = embeddings

		assert args.decoder_layers == args.encoder_layers, f"""seq2seq rnn requires that encoder 
		and decoder have same layers of rnn. got: {args.encoder_layers, args.decoder_layers}"""
		assert args.decoder_ffn_embed_dim == args.encoder_ffn_embed_dim * 2, f"""seq2seq-rnn requires
		that decoder hidden to be 2*encoder hidden dim. got: {args.decoder_ffn_embed_dim, args.encoder_ffn_embed_dim * 2}"""
		...
		self.dropout_in_module = nn.Dropout(args.dropout)
		self.rnn = nn.GRU(
			...
			batch_first=True,
			bidirectional=False  # decoder 是单向的
		)
		self.attention = AttentionLayer(
			self.embed_dim, self.hidden_dim, self.embed_dim, bias=False
		)
		# self.attention = None
		self.dropout_out_module = nn.Dropout(args.dropout)

		if self.hidden_dim != self.embed_dim:
			self.project_out_dim = nn.Linear(self.hidden_dim, self.embed_dim)
		else:
			self.project_out_dim = None

		if args.share_decoder_input_output_embed:
			self.output_projection = nn.Linear(
				self.embeddings.weight.shape[1],
				self.embeddings.weight.shape[0],
				bias=False
			)
			self.output_projection.weight = self.embeddings.weight
		else:
			self.output_projection = nn.Linear(
				self.output_embed_dim, len(dictionary), bias=False
			)
			nn.init.normal_(
				self.output_projection.weight, mean=0, std=self.output_embed_dim ** -0.5
			)

	def forward(self, prev_output_tokens, encoder_out=None, incremental_state=None, **kwargs):
		encoder_outputs, encoder_hiddens, encoder_padding_mask = encoder_out  # 取出 encoder 的輸出
		if incremental_state is not None and len(incremental_state) > 0:
			# 有上個 timestep 留下的資訊,讀進來就可以繼續 decode,不用從 bos 重來
			prev_output_tokens = prev_output_tokens[:, -1:]  # 只取最后一个? 感觉是 [:, 1:] 吧?
			cache_state = self.get_incremental_state(incremental_state, 'cached_state')
			prev_hiddens = cache_state['prev_hiddens']
		else:
			# 沒有 incremental state 代表這是 training 或者是 test time 時的第一步
			# 準備 seq2seq: 把 encoder_hiddens pass 進去 decoder 的 hidden states
			prev_hiddens = encoder_hiddens
		# embed tokens
		x = self.embeddings(prev_output_tokens)
		x = self.dropout_in_module(x)
		# 做 decoder-to-encoder attention
		if self.attention is not None:
			x, attn = self.attention(x, encoder_outputs, encoder_padding_mask)
		# 過單向RNN
		x, final_hiddens = self.rnn(x, prev_hiddens)
		# outputs = [batch, seq, hid]
		# hidden =  [num_layers * directions, batch, hid]
		x = self.dropout_out_module(x)
		# 投影到 embedding size (如果 hidden 和 embed size 不一樣,然後 share_embedding 又設成 True,需要額外 project 一次)
		if self.project_out_dim is not None:
			x = self.project_out_dim(x)
		# 投影到 vocab size 的分佈
		x = self.output_projection(x)
		# 如果是 Incremental, 記錄這個 timestep 的 hidden states, 下個 timestep 讀回來
		cache_state = {'prev_hiddens': final_hiddens}
		self.set_incremental_state(incremental_state, 'cached_state', cache_state)  # 应该是 FairseqIncrementalDecoder 中的吧

		return x, None

	def reorder_incremental_state(self, incremental_state, new_order):
		# 這個 beam search 時會用到,意義並不是很重要
		cache_state = self.get_incremental_state(incremental_state, 'cached_state')
		prev_hiddens = cache_state['prev_hiddens']
		prev_hiddens = [p.index_select(0, new_order) for p in prev_hiddens]
		cache_state = {'prev_hiddens': torch.stack(prev_hiddens)}
		self.set_incremental_state(incremental_state, 'cached_state', cache_state)
		return

首先要注意的是, seq2seq rnn 要求 encoderdecoder 具有相同的层数, 且 decoder hidden_dim 应该是 encoder hidden_dim 的两倍, 大概是因为 decoder 是单向的.

incremental_state 暂时不用管, 因为本程序中是按 incremental_state=None 处理的, 如果想了解, 请看附录中的 4. 关于 attention, 也可到附录 5 查询.

下面就是正常的 RNN 网络了, 输入是 prev_output_tokensprev_hiddens = encoder_hiddens. 不过还是想注意一下在 incremental_state is not Noneprev_hiddens = cache_state['prev_hiddens'] 是什么. 结合末尾的:

# 如果是 Incremental, 記錄這個 timestep 的 hidden states, 下個 timestep 讀回來
cache_state = {'prev_hiddens': final_hiddens}
self.set_incremental_state(incremental_state, 'cached_state', cache_state)

它是上一次运行解码器时输出的 final_hiddens. 为什么不是 encoder_hiddens 了? 猜测是因为 output_tokens(也就是解码器的输出) 太长, 被分段了, 然后分几次计算的话, 就需要解码器的上一次隐层输出.

附录

1. fairseq_cli.preprocess 对应的 Python API

对于 fairseq_cli.preprocess 命令行, 想要把该功能融合进程序中, 得把命令行内容写进一个 shell 脚本, 这里假设脚本文件名为 'tobin.sh', 然后调用 python 的 subprocess.run(...) 函数以执行 shell 脚本:

#!/bin/bash
python -m fairseq_cli.preprocess \
	--source-lang $1\
	--target-lang $2\
	--trainpref $3\
	--validpref $4\
	--testpref $5\
	--destdir $6\
	--joined-dictionary\
	--workers 4
result = subprocess.run(
	[
		'bash', './tobin.sh',
		f'{self._lang_src}',
		f'{self._lang_tgt}',
		f'{self._path_tokens}/train',
		f'{self._path_tokens}/valid',
		f'{self._path_tokens}/test',
		f'{self._path_bin}'
	],
	capture_output=True, text=True
)
if result.returncode == 0:
	print(result.stdout)
else:
	print(result.stderr)

那有没有对应的 python API 可调用呢? 答案是肯定的, 我们找到了 fairseq_cli.preprocess.py 文件, 里面是数据处理代码:

def cli_main():
	parser = options.get_preprocessing_parser()
	args = parser.parse_args()
	main(args)

操作主要在 main(args) 函数中, 其中 args 中充满了各种配置, 来自于命令行, 那我们自然可以自己构建一个 args, 然后调用 main(args). 只可惜, 我们不知道里面到底需要哪些配置, 因为 args = parser.parse_args() 中可是有很多默认配置的, 要是盲目地这么干:

args = {  # 设置参数
	'source_lang': 'en',
	'target_lang': 'zh',
	'trainpref': './data/tokens/train',
	'validpref': './data/tokens/valid',
	'testpref': './data/tokens/test',
	'destdir': './data/bin',
	'joined_dictionary': True,
	'workers': 2
}
args = argparse.Namespace(**args)  # 转换命名空间
os.makedirs(args.destdir, exist_ok=True)  # 创建输出目录
preprocess.main(args)  # 调用预处理函数

则:

AttributeError: 'Namespace' object has no attribute 'dataset_impl'

那么我们可以这么干:

import argparse
import os

from fairseq import options
from fairseq_cli import preprocess

parser = options.get_preprocessing_parser()
args = vars(parser.parse_args()).update({  # 设置参数
	'source_lang': 'en',
	'target_lang': 'zh',
	'trainpref': './data/tokens/train',
	'validpref': './data/tokens/valid',
	'testpref': './data/tokens/test',
	'destdir': './data/bin',
	'joined_dictionary': True,
	'workers': 2
})
args = argparse.Namespace(**args)  # 转换命名空间
os.makedirs(args.destdir, exist_ok=True)  # 创建输出目录

preprocess.main(args)  # 调用预处理函数

拿到默认配置, 然后用设置我们的参数.

2. fairseq_cli.preprocess 预处理时 ‘Tokenization’ 了吗?

为了验证这个问题, 我们直接将未经 sentencepiece 处理过的数据fairseq_cli.preprocess 进行二进制化:

args.update({  # 设置参数
	...
	'trainpref': f'{self._path_split}/train',
	'validpref': f'{self._path_split}/valid',
	'testpref': f'{self._path_split}/test',
	...
})

会得到:

[en] Dictionary: 846872 types
[en] ./data/split/train.en: 389051 sents, 7972723 tokens, 0.0% replaced (by <unk>)
[en] Dictionary: 846872 types
[en] ./data/split/valid.en: 3929 sents, 80476 tokens, 0.588% replaced (by <unk>)
[en] Dictionary: 846872 types
[en] ./data/split/test.en: 1000 sents, 20242 tokens, 0.558% replaced (by <unk>)
[zh] Dictionary: 846872 types
[zh] ./data/split/train.zh: 389051 sents, 2011056 tokens, 0.0% replaced (by <unk>)
[zh] Dictionary: 846872 types
[zh] ./data/split/valid.zh: 3929 sents, 20342 tokens, 36.4% replaced (by <unk>)
[zh] Dictionary: 846872 types
[zh] ./data/split/test.zh: 1000 sents, 5903 tokens, 35.7% replaced (by <unk>)

天呐, 得到了 846904 个词单元? 点击 dict.en.txt 文件, 提示文件太大无法打开. 更令人震惊的是, {valid/test}.zh 文件中 35% 以上的 tokens 被 replaced (by <unk>).

怎么办? 问了 Kimi: fairseq_cli.preprocess 分词时如何设置词单元数量?

答:
--nwordssrc 10000 \
--nwordstgt 10000 \

好吧, 试试:

args.update({  # 设置参数
	...
	'nwordssrc': 10000,
	'nwordstgt': 10000,
	...
})

输出:

[en] Dictionary: 10000 types
[en] ./data/split/train.en: 389051 sents, 7972723 tokens, 4.82% replaced (by <unk>)
[en] Dictionary: 10000 types
[en] ./data/split/valid.en: 3929 sents, 80476 tokens, 5.01% replaced (by <unk>)
[en] Dictionary: 10000 types
[en] ./data/split/test.en: 1000 sents, 20242 tokens, 4.54% replaced (by <unk>)
[zh] Dictionary: 10000 types
[zh] ./data/split/train.zh: 389051 sents, 2011056 tokens, 39.4% replaced (by <unk>)
[zh] Dictionary: 10000 types
[zh] ./data/split/valid.zh: 3929 sents, 20342 tokens, 39.5% replaced (by <unk>)
[zh] Dictionary: 10000 types
[zh] ./data/split/test.zh: 1000 sents, 5903 tokens, 38.4% replaced (by <unk>)

(⊙o⊙)?更多的 tokens 被 replaced (by <unk>) 了. 打开 dict.en.txt 文件, 发现里面是比较符合逻辑的整个单词或汉语词语. 那应该是分词算法的问题, 样本量太小, 以至于学习到的 tokens 太少, 导致 {valid/test}.zh 文件中大量 tokens 被 replaced (by <unk>).[英语还好]. 于是想起来 sentencepiece 的分词适合东亚语言, 也适合英语, 它分词的结果是字母片段, 汉字短语.

问 Kimi: 能设置设置分词算法吗?

 --tokenizer moses  # 指定分词算法, 例如 moses, nltk, space 等

好吧, 试试:

args.update({  # 设置参数
	...
	'nwordssrc': 10000,
	'nwordstgt': 10000,
	'tokenizer': 'moses'
	...
})

输出:

[en] Dictionary: 10000 types
[en] ./data/split/train.en: 389051 sents, 7972723 tokens, 4.82% replaced (by <unk>)
[en] Dictionary: 10000 types
[en] ./data/split/valid.en: 3929 sents, 80476 tokens, 5.01% replaced (by <unk>)
[en] Dictionary: 10000 types
[en] ./data/split/test.en: 1000 sents, 20242 tokens, 4.54% replaced (by <unk>)
[zh] Dictionary: 10000 types
[zh] ./data/split/train.zh: 389051 sents, 2011056 tokens, 39.4% replaced (by <unk>)
[zh] Dictionary: 10000 types
[zh] ./data/split/valid.zh: 3929 sents, 20342 tokens, 39.5% replaced (by <unk>)
[zh] Dictionary: 10000 types
[zh] ./data/split/test.zh: 1000 sents, 5903 tokens, 38.4% replaced (by <unk>)

很遗憾, 情况没有好转, 试试其他算法 nltk, space 也都没好转. 当使用 'tokenizer': 'sentencepiece' 时, 告诉我只支持 moses, nltk, space 三种分词工具. 汉语的话, 还是老老实实用 sentencepiece.

3. 二进制文件的读取过程

首先看 .idx 文件的读取:

class Index:
	def __init__(self, path):
		with open(path, "rb") as stream:
			magic_test = stream.read(9)
			...
			version = struct.unpack("<Q", stream.read(8))
			...
			(dtype_code,) = struct.unpack("<B", stream.read(1))
			...
			self._len = struct.unpack("<Q", stream.read(8))[0]
			offset = stream.tell()
		...
		self._bin_buffer_mmap = np.memmap(path, mode="r", order="C")
		self._bin_buffer = memoryview(self._bin_buffer_mmap)
		self._sizes = np.frombuffer(
			self._bin_buffer, dtype=np.int32, count=self._len, offset=offset
		)
		self._pointers = np.frombuffer(self._bin_buffer, dtype=np.int64, count=self._len, offset=offset + self._sizes.nbytes)

	@lru_cache(maxsize=8)
	def __getitem__(self, i):
		return self._pointers[i], self._sizes[i]

首先以 'rb' 模式打开了 '.idx' 文件, 依次读取了 9, 8, 1, 8 个字节的内容, 分别是 magic_test 用于检查存储方式是否符合本类的读取方式; version 记录存储文件版本, dtype_code 是文件中数据的类型; self._len 是文件的数据长度. offset = stream.tell() 是数据开始的位置.

好! 在读取了文件的元信息之后, 后面的四行代码我们求助通义千问:

这段代码展示了如何使用 NumPymemmap 功能结合 memoryviewNumPy 的数组操作, 高效地从磁盘上的二进制文件中读取和处理数据. 下面是对每一步的详细解释:

=== 创建内存映射 ===

self._bin_buffer_mmap = np.memmap(path, mode="r", order="C")

创建了一个内存映射文件 self._bin_buffer_mmap, 将磁盘上的二进制文件 path 映射到内存中, 允许像访问内存中的数组那样访问文件内容. mode="r" 表示只读模式, order="C" 指定数据的布局为 C 风格的行优先顺序.

=== 创建 memoryview ===

self._bin_buffer = memoryview(self._bin_buffer_mmap)

这一行创建了一个 memoryview 对象 self._bin_buffer, 它提供了对 self._bin_buffer_mmap低级内存视图, 允许对内存映射文件进行高效、低开销的切片操作, 且不会复制数据, 这对于处理大型数据集特别有用.

=== buffer 中读取数据 ===
接下来的两行代码使用 np.frombuffer 方法self._bin_buffer 中读取数据:

self._sizes = np.frombuffer(
	self._bin_buffer, dtype=np.int32, count=self._len, offset=offset
)
self._pointers = np.frombuffer(self._bin_buffer, dtype=np.int64, count=self._len, offset=offset + self._sizes.nbytes)

self._sizes 读取了数据集中每个样本的大小信息, 存储为 np.int32 类型的数组. count=self._len 表示读取 self._len 个这样的元素, offset=offset 指定从 self._bin_buffer 的哪个位置开始读取.

self._pointers 读取了指向每个样本起始位置的指针, 存储为 np.int64 类型的数组. 读取的起点是从 self._sizes 数组的末尾开始, 即 offset + self._sizes.nbytes.

根据上面通义千问的解释, 我们可以认为, self._pointers, self._sizes 分别是存储了数据位置和大小的两个 numpy 数组, 只不过它们是由内存映射实现的. 然后就可以 return self._pointers[i], self._sizes[i] 作为 __getitem__ 的结果了.

4. Python 类的继承与装饰器

在上面的 class RNNDecoder(FairseqIncrementalDecoder) 中, 发现有两个方法:

self.get_incremental_state(incremental_state, 'cached_state')
self.set_incremental_state(incremental_state, 'cached_state', cache_state)

按住 ctrl 再点击它们, 是找不到它们的源头的, 为何呀? RNNDecoder 中未定义它们, 找到 FairseqIncrementalDecoder, 发现其中也未定义它们. 仔细观察, 才发现:

@with_incremental_state
class FairseqIncrementalDecoder(FairseqDecoder):

FairseqIncrementalDecoder 上有一个叫 with_incremental_state 的装饰器, 点进去看一看:

def with_incremental_state(cls):
	cls.__bases__ = (FairseqIncrementalState,) + tuple(
		b for b in cls.__bases__ if b != FairseqIncrementalState
	)
	return cls

一个以 cls 为参数的函数, 返回的也是 cls, 作用就是扩展 cls 的基类 cls.__bases__, 它是一个元组, 记录着 cls 的基类列表, FairseqIncrementalState 被加入到了其中. 回想 decorator & register, 当 import 其所在模块时, 装饰器 with_incremental_state 会被执行, 传入实参 FairseqIncrementalDecoder, 使之继承基类 FairseqIncrementalState, 相当于:

class FairseqIncrementalDecoder(FairseqDecoder, FairseqIncrementalState):

那么 RNNDecoder 也成了 FairseqIncrementalState 的子类. 再看一看 FairseqIncrementalState:

class FairseqIncrementalState(object):
	def __init__(self, *args, **kwargs):
		super().__init__(*args, **kwargs)
		self.init_incremental_state()

	def init_incremental_state(self):
		self._incremental_state_id = str(uuid.uuid4())

	def _get_full_incremental_state_key(self, key: str) -> str:
		return "{}.{}".format(self._incremental_state_id, key)

	def get_incremental_state(
		self,
		incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]],
		key: str,
	) -> Optional[Dict[str, Optional[Tensor]]]:
		"""Helper for getting incremental state for an nn.Module."""
		full_key = self._get_full_incremental_state_key(key)
		if incremental_state is None or full_key not in incremental_state:
			return None
		return incremental_state[full_key]

	def set_incremental_state(
		self,
		incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]],
		key: str,
		value: Dict[str, Optional[Tensor]],
	) -> Optional[Dict[str, Dict[str, Optional[Tensor]]]]:
		"""Helper for setting incremental state for an nn.Module."""
		if incremental_state is not None:
			full_key = self._get_full_incremental_state_key(key)
			incremental_state[full_key] = value
		return incremental_state

简单地说一下什么是 incremental_state, 从 get_incremental_state 的参数来看, 它是:

incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]]

一个双层的字典, 双层关键字都是 str, incremental_state[full_key]: Dict[str, Optional[Tensor]] 是返回的结果, 而 full_key 是:

def _get_full_incremental_state_key(self, key: str) -> str:
	return "{}.{}".format(self._incremental_state_id, key)  # self._incremental_state_id = str(uuid.uuid4())

可见每个 Decoder 维持着自己的多个 incremental_state[full_key] 状态.

5. Attention

注意力机制很常见, 这里解析上面出现的:

self.attention = AttentionLayer(
	self.embed_dim, self.hidden_dim, self.embed_dim, bias=False
)

x, attn = self.attention(x, encoder_outputs, encoder_padding_mask)

Attention 的代码是这样的:

class AttentionLayer(nn.Module):
	def __init__(self, input_embed_dim, source_embed_dim, output_embed_dim, bias=False):
		super().__init__()

		self.input_proj = nn.Linear(input_embed_dim, source_embed_dim, bias=bias)
		self.output_proj = nn.Linear(
			input_embed_dim + source_embed_dim, output_embed_dim, bias=bias
		)

	def forward(self, inputs, encoder_outputs, encoder_padding_mask):
		"""
		:param inputs: B x T x dim
		:param encoder_outputs: B x S x dim
		:param encoder_padding_mask: B x S
		:return: 相当于重构 inputs, 每个 t 相当于 S 个 encoder_outputs 的加权平均, 权重为 t 和 encoder_outputs 注意力分数
		"""
		x = self.input_proj(inputs)  # 投影到 encoder_outputs 的維度
		# >>> 計算 attention >>>
		attn_scores = torch.bmm(x, encoder_outputs.transpose(1, 2))  # (B,T,dim) x (B,dim,S) = (B,T,S)
		if encoder_padding_mask is not None:  # 擋住 padding 位置的 attention
			encoder_padding_mask = encoder_padding_mask.unsqueeze(1)  # (B,S) -> (B,1,S)
			# FP16 support: cast to float and back
			attn_scores = attn_scores.float().masked_fill_(encoder_padding_mask, -torch.inf).type_as(attn_scores)
		attn_scores = func.softmax(attn_scores, dim=-1)  # exp(-inf) = 0
		x = torch.bmm(attn_scores, encoder_outputs)  # 形狀 (B,T,S) x (B,S,dim) = (B,T,dim) 加權平均
		# (B,T,dim)
		x = torch.cat([x, inputs], dim=-1)
		x = torch.tanh(self.output_proj(x))  # concat + linear + tanh
		# <<< 計算 attention <<<
		return x, attn_scores

首先, 通过 self.input_proj(inputs)input 投影到 encoder_outputs 的空间(就是维度的变化), 然后计算 inputs[i]encoder_outputs[j] 两两之间的内积(相似度?), 再通过 softmax 计算权重, 这个权重是每个 encoder_outputs[j] 用于线性重组 input[i] 的权重. 示意图如下:
在这里插入图片描述

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐