“鱼书”深度学习进阶笔记(3)第四章

发布于:2025-08-12 ⋅ 阅读:(19) ⋅ 点赞:(0)

最近在看斋藤康毅的《深度学习进阶:自然语言处理》,以下按章节做一点笔记。
这本书是《深度学习入门:基于Python的理论与实现》的续作,针对自然语言处理和时序数据处理。如对“鱼书”第一本的笔记感兴趣,可看我之前做的笔记。

01 第四章:word2vec的高速化

本章将重点放在word2vec的加速上,来改善word2vec。
具体而言,将对上一章中简单的word2vec进行两点改进:引入名为Embedding层的新层,以及引入名为Negative Sampling的新损失函数。
完成优化后,将在PTB数据集(一个大小比较实用的语料库)上进行学习,并实际评估所获得的单词的分布式表示的优劣。

1.1 word2vec的改进(1)

在上一章提到的结构中,如果我们要处理的词汇量有100万及以上(说明输入层和输出层存在100万个神经元),中间层的神经元有100个。中间计算会花费很长时间。
具体来说,以下两个地方的计算会出现瓶颈。

  • 输入层的one-hot表示和权重矩阵Win的乘积(第1节解决)
  • 中间层和权重矩阵Wout的乘积以及Softmax层的计算(第2节解决)

第1个问题与输入层的one-hot表示有关:
这是因为用one-hot表示来处理单词,随着词汇量的增加,one-hot表示的向量大小也会增加。
此外,还需要计算one-hot表示和权重矩阵Win的乘积,这也要花费大量的计算资源。
使用embedding层来解决。

第2个问题是中间层之后的计算。
首先,中间层和权重矩阵Wout的乘积需要大量的计算。
其次,随着词汇量的增加,Softmax层的计算量也会增加。
引入Negative Sampling(负采样)这一新的损失函数来解决。

1.1.1 embedding层

在上一章的逻辑中,我们将单词转化成了one-hot表示,将其输入到了MatMul层,在MatMul层中计算了该one-hot表示和权重矩阵的乘积。
这里,假设词汇量是100万,中间层的神经元个数是100,则MatMul层中的矩阵乘积如下图所示:
解释
我们需要计算这个巨大向量和权重矩阵的乘积。
如上图的运算,本质上是把矩阵特定的行取出来。因此,直觉上将单词转化为one-hot向量的处理和MatMul层中的矩阵乘法似乎没有必要。

因此,我们创建一个从权重参数中抽取“单词ID对应行(向量)​”的层,称为Embedding层
**Embedding来自“词嵌入”​(word embedding)**这一术语。即,在这个Embedding层存放词嵌入(分布式表示)​。
一点科普

1.1.2 embedding层的实现

实现代码如下:

class Embedding:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros_like(W)]
        self.idx = None  # 用来保存前向传播时输入的索引(单词ID),初始化为 None

    def forward(self, idx):
        W, = self.params
        self.idx = idx  # 保存索引,方便后向传播用。
        out = W[idx]  # 根据索引选出对应的词向量。例如 idx 是 [3, 7],就选 W 中第3行和第7行,形成对应的词向量输出。
        return out

    def backward(self, dout):
        dW, = self.grads  # 用逗号解包 dW, = ...,拿到的是这个梯度数组的引用(不是拷贝).所以 dW 和 self.grads[0] 指向同一个内存地址
        dW[...] = 0  # 数组所有元素替换为 0,但保留原数组的内存地址
        np.add.at(dW, self.idx, dout)  # 把 dout 对应的梯度,累加到 dW 中对应索引行的位置
        return None

注意,这里直接用书里的代码,因为表达方式现在不用,会有如下报错,进行如下替代即可:
报错

反向传播会稍微难以理解一点。
Embedding层的正向传播只是从权重矩阵W中提取特定的行,并将该特定行的神经元原样传给下一层。
在反向传播时,从上一层(输出侧的层)传过来的梯度,将原样传给下一层(输入侧的层)​。不过,从上一层传来的梯度会被应用到权重梯度dW的特定行(idx)​。
如下图所示:
正向与反向传播的比较
关于反向传播dW的处理,有一点很有趣的设置,讨论情况如下:
讨论1
方式2

1.2 word2vec的改进(2)

1.2.1 中间层之后的计算问题

目前,在以下两个地方还需要较多时间:

  • 中间层的神经元和权重矩阵(Wout)的乘积
  • Softmax层的计算

1.2.2 从多分类到二分类

负采样的关键思想是二分类(binary classification),更准确地说,是用二分类来拟合多分类(multiclass classification)​。

如何思考将多分类问题转化为二分类问题呢?
首先,我们先考察一个可以用“Yes/No”来回答的问题。
比如,让神经网络来回答“当上下文是you和goodbye时,目标词是say吗?​”这个问题,这时输出层只需要一个神经元即可。可以认为输出层的神经元输出的是say的得分。
逻辑图
如上图,输出层的神经元仅有一个。
因此,要计算中间层和输出侧的权重矩阵的乘积,只需要提取say对应的列(单词向量)​,并用它与中间层的神经元计算内积即可。

1.2.3 多分类到二分类的实现

之前,我们清楚二分类神经网络的老套路:使用sigmoid函数将得分转化为概率,使用交叉熵误差作为损失函数。 在多分类里面的套路:输出层使用Softmax函数将得分转化为概率,损失函数使用交叉熵误差

转化为二分类的基本结构如下:
基本结构
这里,将中间层的神经元记为h,并计算它与输出侧权重Wout中的单词say对应的单词向量的内积。然后,将其输出输入Sigmoid with Loss层,得到最终的损失。

在上图中,向Sigmoid with Loss层输入正确解标签1,这意味着现在正在处理的问题的答案是“Yes”​。当答案是“No”时,向Sigmoid with Loss层输入0。

为了进一步简化,引入Embedding Dot层,该层将上图中的Embedding层和dot运算(内积)合并起来处理。则流程图转变为:
流程图
实现代码:

class EmbeddingDot:
    def __init__(self, W):
        self.embed = Embedding(W)  # embed保存Embedding层
        self.params = self.embed.params
        self.grads = self.embed.grads
        self.cache = None  # 占位,用来在 forward 保存正向计算中需要在 backward 使用的中间值(cache)

    def forward(self, h, idx):
        target_W = self.embed.forward(idx)
        out = np.sum(target_W * h, axis=1)

        self.cache = (h, target_W)
        return out

    def backward(self, dout):
        h, target_W = self.cache  # 取出 forward 时保存的中间量
        dout = dout.reshape(dout.shape[0], 1)

        dtarget_W = dout * h
        self.embed.backward(dtarget_W)
        dh = dout * target_W
        return dh

对于前向传播,下面的例子会更有利于理解,如下图:
理解
关于后向传播的理解,可参考下面:
解释

1.2.4 负采样

前面的代码只学习了正例(正确答案),那负例(错误答案)会有怎样的结果呢?
我们想要实现的,(基于前面的例子)对于正例(say)​,使Sigmoid层的输出接近1;对于负例(say以外的单词)​,使Sigmoid层的输出接近0。
就像下图:
图示
提问,我们需要让所有负例都进行学习吗?
采用一种近似方法,选择若干个(5个或者10个)负例(如何选择将在下文介绍)​。也就是说,只使用少数负例。这就是负采样方法的含义。
将这些数据(正例和采样出来的负例)的损失加起来,将其结果作为最终的损失

再次提问,如何抽取负例呢?
关于这一点,基于语料库的统计数据进行采样的方法比随机抽样要好
即,就是让语料库中经常出现的单词容易被抽到,让语料库中不经常出现的单词难以被抽到。

基于语料库中单词使用频率的采样方法,会先计算语料库中各个单词的出现次数,并将其表示为“概率分布”​,然后使用这个概率分布对单词进行采样。

一些注意:
注意事项
word2vec中提出的负采样对刚才的概率分布增加了一个步骤。对原来的概率分布取0.75次方:
概率函数
提问,为什么进行如此操作呢?
这是为了防止低频单词被忽略。更准确地说,通过取0.75次方,低频单词的概率将稍微变高。(书中提到,0.75这个值并没有什么理论依据,也可以设置成0.75以外的值)

上面的这种实现方法被称为UnigramSampler类
unigram是“1个(连续)单词”的意思。,bigram是“2个连续单词”的意思,trigram是“3个连续单词”的意思。
这里使用UnigramSampler这个名字,是因为我们以1个单词为对象创建概率分布。
如果是bigram,则以‘( you’, ‘say’)​、‘(you’,‘goodbye’)……这样的2个单词的组合为对象创建概率分布。

实现代码为:

import collections
class UnigramSampler:
    def __init__(self, corpus, power, sample_size):  # 单词ID列表格式的corpus、对概率分布取的次方值power(默认值是0.75)和负例的采样个数sample_size
        self.sample_size = sample_size  # 将sample_size保存为实例变量
        self.vocab_size = None
        self.word_p = None

        counts = collections.Counter()  # 创建一个Counter对象用于计数
        for word_id in corpus:
            counts[word_id] += 1  # 遍历语料库中的每个单词id,同级每个单词id出现的次数+1

        vocab_size = len(counts)  # 词汇表大小等于不同单词ID的数量
        self.vocab_size = vocab_size  # 将词汇表大小保存为实例变量

        self.word_p = np.zeros(vocab_size)
        for i in range(vocab_size):
            self.word_p[i] = counts[i]  # 遍历每个单词,将每个单词的出现次数存入数组

        self.word_p = np.power(self.word_p, power)  # 对每个单词的计数取power次方
        self.word_p /= np.sum(self.word_p)  # 归一化处理,得到概率分布

    def get_negative_sample(self, target):
        batch_size = target.shape[0]
        # 使用 NumPy 的 random.choice 从 [0, 1, ..., vocab_size-1] 中按概率 self.word_p 抽样
        negative_sample = np.random.choice(self.vocab_size, size=(batch_size, self.sample_size),
                                               replace=True, p=self.word_p)
        # size=(batch_size, self.sample_size)表示要生成一个二维数组,每行是一个样本的负例。
        # replace=True表示允许重复选择(即同一个单词可以被多次选作负例)
        # p = self.word_p表示按照之前计算的概率分布进行采样

        return negative_sample

接着实现负采样:


class SigmoidWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.loss = None
        self.y = None  # sigmoid的输出
        self.t = None  # 监督标签

    def forward(self, x, t):
        self.t = t
        self.y = 1 / (1 + np.exp(-x))

        self.loss = cross_entropy_error(np.c_[1 - self.y, self.y], self.t)

        return self.loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]

        dx = (self.y - self.t) * dout / batch_size
        return dx


class NegativeSamplingLoss:
    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size  # 保存负例样本数量
        self.sampler = UnigramSampler(corpus, power, sample_size)  # 创建负采样器实例
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]  # 创建损失层列表,数量为sample_size + 1(1 个正例 + sample_size 个负例),每个都是SigmoidWithLoss层(结合了 Sigmoid 激活和交叉熵损失)
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)]  # 创建嵌入点积层列表,数量同样为sample_size + 1,每个都是EmbeddingDot层(用于计算嵌入向量的点积)

        self.params, self.grads = [], []
        for layer in self.embed_dot_layers:  # 遍历所有嵌入点积层
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, h, target):
        batch_size = target.shape[0]  # 获取批次大小
        negative_sample = self.sampler.get_negative_sample(target)  # 使用之前创建的采样器生成负例样本,形状为(batch_size, sample_size)

        # 正例的正向传播
        score = self.embed_dot_layers[0].forward(h, target)  # 使用第一个EmbeddingDot层计算正例的得分(中心词嵌入与目标词嵌入的点积)
        correct_label = np.ones(batch_size, dtype=np.int32)  # 创建正例的标签数组,全部为 1(表示 "相关")
        loss = self.loss_layers[0].forward(score, correct_label)  # 使用第一个损失层计算正例的损失(Sigmoid 交叉熵损失)

        # 负例的正向传播
        negative_label = np.zeros(batch_size, dtype=np.int32)  # 创建负例的标签数组,全部为 0(表示 "不相关")
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i]  # 获取第i个负例
            score = self.embed_dot_layers[1 + i].forward(h, negative_target)  # 使用对应的EmbeddingDot层计算该负例的得分
            loss += self.loss_layers[1 + i].forward(score, negative_label)  # 计算该负例的损失并累加到总损失中

        return loss

    def backward(self, dout=1):
        dh = 0

        # zip() 是一个内置函数,用于将多个可迭代对象(如列表、元组等)“打包” 成一个迭代器,每次迭代会同时从每个可迭代对象中取出一个元素,组成一个元组
        for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):  # 同时遍历损失层和嵌入点积层
            dscore = l0.backward(dout)  # 给出损失对score的梯度
            dh += l1.backward(dscore)  # 计算并把对 W 的梯度累加到 embedding 参数(通过 Embedding.backward 的 scatter_add)
            # 所以这里把每个正/负例对 h 的贡献加起来,累积到 dh。最终 dh 是来自正例和所有负例对 h 的总梯度

        return dh # 返回隐藏层的梯度

1.3 改进版word2vec的学习

1.3.1 CBOW模型的实现

改进上一章的简单的SimpleCBOW类:

  1. 使用Embedding层和Negative Sampling Loss层;
  2. 将上下文部分扩展为可以处理任意的窗口大小。
class CBOW:
    def __init__(self, vocab_size, hidden_size, window_size, corpus):
        V, H = vocab_size, hidden_size

        # 初始化权重
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(V, H).astype('f')

        # 生成层
        self.in_layers = []
        for i in range(2 * window_size):
            layer = Embedding(W_in)  # 使用Embedding层
            self.in_layers.append(layer)
        self.ns_loss = NegativeSamplingLoss(W_out, corpus, power=0.75, sample_size=5)

        # 将所有的权重和梯度整理到列表中
        layers = self.in_layers + [self.ns_loss]
        self.params, self.grads = [], []
        for layer in layers:  # 将神经网络中使用的参数和梯度放入成员变量params和grads中
            self.params += layer.params
            self.grads += layer.grads

        # 将单词的分布式表示设置为成员变量
        self.word_vecs = W_in

    def forward(self, contexts, target):
        h = 0
        for i, layer in enumerate(self.in_layers):
            h += layer.forward(contexts[:, i])
        h *= 1 / len(self.in_layers)
        loss = self.ns_loss.forward(h, target)
        return loss

    def backward(self, dout=1):
        dout = self.ns_loss.backward(dout)
        dout *= 1 / len(self.in_layers)
        for layer in self.in_layers:
            layer.backward(dout)
        return None

实现和训练方法:

# 设定超参数
window_size = 5
hidden_size = 100
batch_size = 100
max_epoch = 10

import pickle
import sys,os
import urllib.request

sys.path.append('..')
url_base = 'https://raw.githubusercontent.com/tomsercu/lstm/master/data/'
key_file = {  # 原始文本文件名映射(训练/测试/验证)
    'train':'ptb.train.txt',
    'test':'ptb.test.txt',
    'valid':'ptb.valid.txt'
}
save_file = {  # 将处理后的 numpy 数组保存成 .npy 的文件名映射
    'train':'ptb.train.npy',
    'test':'ptb.test.npy',
    'valid':'ptb.valid.npy'
}
vocab_file = 'ptb.vocab.pkl'  # 保存词表映射(word_to_id, id_to_word)的文件名

dataset_dir = os.path.dirname(os.path.abspath(__file__))  # 当前脚本所在目录


def _download(file_name):
    file_path = dataset_dir + '/' + file_name  # 构建本地目标文件路径
    if os.path.exists(file_path):  # 如果已经存在就直接返回,不重复下载
        return

    print('Downloading ' + file_name + ' ... ')  # 提示用户开始下载

    try:
        urllib.request.urlretrieve(url_base + file_name, file_path)  # 尝试直接从 GitHub 下载
    except urllib.error.URLError:
        import ssl
        ssl._create_default_https_context = ssl._create_unverified_context  # 忽略 SSL 验证(必要时)
        urllib.request.urlretrieve(url_base + file_name, file_path)  # 再次下载(有时企业/环境的证书会阻止)
    print('Done')


def load_vocab():
    vocab_path = dataset_dir + '/' + vocab_file

    if os.path.exists(vocab_path):
        with open(vocab_path, 'rb') as f:
            word_to_id, id_to_word = pickle.load(f)
        return word_to_id, id_to_word

    word_to_id = {}  # 新建空字典:词->id
    id_to_word = {}  # 新建空字典:id->词
    data_type = 'train'  # 用训练集文本来构建词表(通常是最大的语料)
    file_name = key_file[data_type]  # 对应的文件名
    file_path = dataset_dir + '/' + file_name

    _download(file_name)  # 确保原始文本在本地(会跳过已存在的情况)

    words = open(file_path).read().replace('\n', '<eos>').strip().split()
    # 打开训练文本,读取为字符串:
    # 1) 把换行符替换成特殊标记 '<eos>'(end-of-sentence)
    # 2) strip() 去掉首尾空白
    # 3) split() 把字符串按空白分割成词列表

    for i, word in enumerate(words):
        if word not in word_to_id:
            tmp_id = len(word_to_id)
            word_to_id[word] = tmp_id
            id_to_word[tmp_id] = word

    with open(vocab_path, 'wb') as f:
        pickle.dump((word_to_id, id_to_word), f)  # 把构建好的词表保存到本地,方便下次直接加载

    return word_to_id, id_to_word


def load_data(data_type='train'):
    '''
        :param data_type: 数据的种类:'train' or 'test' or 'valid (val)'
        :return:
    '''
    if data_type == 'val': data_type = 'valid'
    save_path = dataset_dir + '/' + save_file[data_type]

    word_to_id, id_to_word = load_vocab()

    if os.path.exists(save_path):
        corpus = np.load(save_path)
        return corpus, word_to_id, id_to_word

    file_name = key_file[data_type]
    file_path = dataset_dir + '/' + file_name
    _download(file_name)

    words = open(file_path).read().replace('\n', '<eos>').strip().split()
    corpus = np.array([word_to_id[w] for w in words])

    np.save(save_path, corpus)
    return corpus, word_to_id, id_to_word


# 读入数据
corpus, word_to_id, id_to_word = load_data('train')  # 加载训练语料(id 序列)和词表
vocab_size = len(word_to_id)                         # 词汇表大小(V)

contexts, target = create_contexts_target(corpus, window_size)
# create_contexts_target: (外部函数)把连续的 corpus 转成 CBOW/skip-gram 所需的上下文矩阵 contexts
# 和中心词向量 target。通常:
# - contexts 形状为 (N, 2*window_size)
# - target 形状为 (N,)

# 生成模型等
model = CBOW(vocab_size, hidden_size, window_size, corpus)
# model = SkipGram(vocab_size, hidden_size, window_size, corpus)
optimizer = Adam()
trainer = Trainer(model, optimizer)

# 开始学习
trainer.fit(contexts, target, max_epoch, batch_size)
trainer.plot()

# 保存必要数据,以便后续使用
word_vecs = model.word_vecs
params = {}
params['word_vecs'] = word_vecs.astype(np.float16)
params['word_to_id'] = word_to_id
params['id_to_word'] = id_to_word
pkl_file = 'cbow_params.pkl'  # or 'skipgram_params.pkl'
with open(pkl_file, 'wb') as f:
    pickle.dump(params, f, -1)   # -1 指用最高协议保存(兼容性与压缩)

会跑10个epoch,如果想快点看到结果,可自己修改小一点。

一般而言,当窗口大小为2~10、中间层的神经元个数(单词的分布式表示的维数)为50~500时,结果会比较好。
在学习结束后,取出权重(输入侧的权重)​,并保存在文件中以备后用(用于单词和单词ID之间的转化的字典也一起保存)​。
上面代码中,使用Python的pickle功能进行文件保存。pickle可以将Python代码中的对象保存到文件中(或者从文件中读取对象)​。

提醒
得到的迭代图为:
迭代图

1.3.2 CBOW模型的评价

利用前面章节实现的most_similar(​)函数,显示几个单词的最接近的单词:


def cos_similarity(x, y, eps=1e-8):
    '''计算余弦相似度

    :param x: 向量
    :param y: 向量
    :param eps: 用于防止“除数为0”的微小值
    :return:
    '''
    nx = x / (np.sqrt(np.sum(x ** 2)) + eps)
    ny = y / (np.sqrt(np.sum(y ** 2)) + eps)
    return np.dot(nx, ny)


def most_similar(query, word_to_id, id_to_word, word_matrix, top=5):
    '''相似单词的查找

    :param query: 查询词
    :param word_to_id: 从单词到单词ID的字典
    :param id_to_word: 从单词ID到单词的字典
    :param word_matrix: 汇总了单词向量的矩阵,假定保存了与各行对应的单词向量
    :param top: 显示到前几位
    '''
    if query not in word_to_id:
        print('%s is not found' % query)
        return

    print('\n[query] ' + query)
    query_id = word_to_id[query]
    query_vec = word_matrix[query_id]

    vocab_size = len(id_to_word)

    similarity = np.zeros(vocab_size)
    for i in range(vocab_size):
        similarity[i] = cos_similarity(word_matrix[i], query_vec)

    count = 0
    for i in (-1 * similarity).argsort():
        if id_to_word[i] == query:
            continue
        print(' %s: %s' % (id_to_word[i], similarity[i]))

        count += 1
        if count >= top:
            return

pkl_file = 'cbow_params.pkl'
# pkl_file = 'skipgram_params.pkl'

with open(pkl_file, 'rb') as f:
    params = pickle.load(f)
    word_vecs = params['word_vecs']
    word_to_id = params['word_to_id']
    id_to_word = params['id_to_word']

# most similar task
querys = ['you', 'year', 'car', 'toyota']
for query in querys:
    most_similar(query, word_to_id, id_to_word, word_vecs, top=5)

由word2vec获得的单词的分布式表示不仅可以将近似单词聚拢在一起,还可以捕获更复杂的模式,其中一个具有代表性的例子是因“king-man+woman=queen”而出名的类推问题(类比问题)​。
即,使用word2vec的单词的分布式表示,可以通过向量的加减法来解决类推问题
向量关系
同样,也可以思考,这样的逻辑是否也可以推理出单数与复数、原型和比较级、现在时和过去时等等的词汇呢。
上面代码的结果展示:
结果1
结果2

1.4 word2vec相关的其他话题

使用word2vec获得的单词的分布式表示可以用来查找近似单词。
单词的分布式表示还可以用在迁移学习上。
关于这点如何理解呢,gpt老师是这样解释的:
理解1
理解2
理解3
在解决自然语言处理任务时,一般不会使用word2vec从零开始学习单词的分布式表示,而是先在大规模语料库(Wikipedia、Google News等文本数据)上学习,然后将学习好的分布式表示应用于某个单独的任务。

常用逻辑:
常用逻辑