NLP(16)--生成式任务

发布于:2024-05-19 ⋅ 阅读:(189) ⋅ 点赞:(0)

前言

仅记录学习过程,有问题欢迎讨论

  输入输出均为不定长序列(seq2seq)

自回归语言模型:

  • x 为 str[start : end ]; y为 [start+1 : end +1] 同时训练多个字,逐字计算交叉熵

encode-decode结构:

  • Encoder将输入转化为向量或矩阵,其中包含了输入中的信息
  • Decoder将Encoder的输出转化为输出

attention机制

  • 输入和输出应该和重点句子强相关,给输入加权(所以维度应该和输入的size一致)
  • 在这里插入图片描述

Teacher forcing

  • 使用真实标签作为下一个输入(自回归语言模型就是使用的teacher forcing)

Transform结构

  • Query来自Decode ,KV来自Encode
  • 在这里插入图片描述

使用Mask Attation 来避免对output做计算时,获取了所有的信息。只使用当前的位置对应的output信息。(自回归模型,先mask,然后在softmax)
在这里插入图片描述

评价指标:

  • BLEU:按照输出的字符计算一系列的数学(惩罚机制,Ngrim)计算来评价相似性

采样:

  • Beam size:
    保留概率最大的n条路径

  • Temperature Sampling
    根据概率分布生成下一个词,通过参数T,T越大,结果越随机,分布更均匀

  • TOP-P/K
    采样先按概率从大到小排序,累加概率不超过P的范围中选
    采样从TOP-K中采样下一个词

代码

使用bert实现自回归训练模型,
添加mask attention 来实现

# coding:utf8

import torch
import torch.nn as nn
import numpy as np
import math
import random
import os
import re

from transformers import BertModel, BertTokenizer

"""
基于pytorch的LSTM语言模型
"""


class LanguageModel(nn.Module):
    def __init__(self, input_dim, vocab_size):
        super(LanguageModel, self).__init__()
        # self.embedding = nn.Embedding(len(vocab), input_dim)
        # self.layer = nn.LSTM(input_dim, input_dim, num_layers=1, batch_first=True)
        self.bert = BertModel.from_pretrained(r"D:\NLP\video\第六周\bert-base-chinese", return_dict=False)
        self.classify = nn.Linear(input_dim, vocab_size)
        # self.dropout = nn.Dropout(0.1)
        self.loss = nn.functional.cross_entropy

    # 当输入真实标签,返回loss值;无真实标签,返回预测值
    def forward(self, x, y=None):
        # x = self.embedding(x)  # output shape:(batch_size, sen_len, input_dim)
        # 使用mask来防止提前预知结果
        if y is not None:
            # 构建一个下三角的mask
            # bert的mask attention 为(batch_size, vocab_size, vocab_size) L*L
            mask = torch.tril(torch.ones(x.shape[0], x.shape[1], x.shape[1]))
            print(mask)
            x, _ = self.bert(x, attention_mask=mask)
            y_pred = self.classify(x)
            return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            x = self.bert(x)[0]
            y_pred = self.classify(x)
            return torch.softmax(y_pred, dim=-1)


# 加载字表
def build_vocab(vocab_path):
    vocab = {"<pad>": 0}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            char = line[:-1]  # 去掉结尾换行符
            vocab[char] = index + 1  # 留出0位给pad token
    return vocab


# 加载语料
def load_corpus(path):
    corpus = ""
    with open(path, encoding="utf8") as f:
        for line in f:
            corpus += line.strip()
    return corpus


# 随机生成一个样本
# 从文本中截取随机窗口,前n个字作为输入,最后一个字作为输出
def build_sample(tokenizer, window_size, corpus):
    start = random.randint(0, len(corpus) - 1 - window_size)
    end = start + window_size
    window = corpus[start:end]
    target = corpus[start + 1:end + 1]  # 输入输出错开一位
    # print(window, target)
    # 中文的文本转化为tokenizer的id
    input_ids_x = tokenizer.encode(window, add_special_tokens=False, padding='max_length', truncation=True,
                                   max_length=10)
    input_ids_y = tokenizer.encode(target, add_special_tokens=False, padding='max_length', truncation=True,
                                   max_length=10)
    return input_ids_x, input_ids_y


# 建立数据集
# sample_length 输入需要的样本数量。需要多少生成多少
# vocab 词表
# window_size 样本长度
# corpus 语料字符串
def build_dataset(sample_length, tokenizer, window_size, corpus):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(tokenizer, window_size, corpus)
        dataset_x.append(x)
        dataset_y.append(y)
    return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)


# 建立模型
def build_model(vocab_size, char_dim):
    model = LanguageModel(char_dim, vocab_size)
    return model


# 文本生成测试代码
def generate_sentence(openings, model, tokenizer, window_size):
    # reverse_vocab = dict((y, x) for x, y in vocab.items())
    model.eval()
    with torch.no_grad():
        pred_char = ""
        # 生成文本超过30字终止
        while len(openings) <= 30:
            openings += pred_char
            x = tokenizer.encode(openings, add_special_tokens=False, padding='max_length', truncation=True,
                                 max_length=10)
            x = torch.LongTensor([x])
            if torch.cuda.is_available():
                x = x.cuda()
            # batch_size = 1 最后一个字符的概率
            y = model(x)[0][-1]
            index = sampling_strategy(y)
            # 转化为中文 只有一个字符
            pred_char = tokenizer.decode(index)
    return openings


# 采样方式
def sampling_strategy(prob_distribution):
    if random.random() > 0.1:
        strategy = "greedy"
    else:
        strategy = "sampling"
    if strategy == "greedy":
        return int(torch.argmax(prob_distribution))
    elif strategy == "sampling":
        prob_distribution = prob_distribution.cpu().numpy()
        return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution)


# 计算文本ppl
def calc_perplexity(sentence, model, vocab, window_size):
    prob = 0
    model.eval()
    with torch.no_grad():
        for i in range(1, len(sentence)):
            start = max(0, i - window_size)
            window = sentence[start:i]
            x = [vocab.get(char, vocab["<UNK>"]) for char in window]
            x = torch.LongTensor([x])
            target = sentence[i]
            target_index = vocab.get(target, vocab["<UNK>"])
            if torch.cuda.is_available():
                x = x.cuda()
            pred_prob_distribute = model(x)[0][-1]
            target_prob = pred_prob_distribute[target_index]
            prob += math.log(target_prob, 10)
    return 2 ** (prob * (-1 / len(sentence)))


def train(corpus_path, save_weight=True):
    epoch_num = 15  # 训练轮数
    batch_size = 64  # 每次训练样本个数
    train_sample = 10000  # 每轮训练总共训练的样本总数
    char_dim = 768  # 每个字的维度
    window_size = 10  # 样本文本长度
    # vocab = build_vocab(r"vocab.txt")  # 建立字表
    tokenizer = BertTokenizer.from_pretrained(r"D:\NLP\video\第六周\bert-base-chinese")
    vocab_size = 21128
    corpus = load_corpus(corpus_path)  # 加载语料
    model = build_model(vocab_size, char_dim)  # 建立模型
    if torch.cuda.is_available():
        model = model.cuda()
    optim = torch.optim.Adam(model.parameters(), lr=0.001)  # 建立优化器
    print("文本词表模型加载完毕,开始训练")
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, tokenizer, window_size, corpus)  # 构建一组训练样本
            if torch.cuda.is_available():
                x, y = x.cuda(), y.cuda()
            optim.zero_grad()  # 梯度归零
            loss = model(x, y)  # 计算loss
            loss.backward()  # 计算梯度
            optim.step()  # 更新权重
            watch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
        print(generate_sentence("忽然一阵狂风吹过,他直接", model, tokenizer, window_size))
        print(generate_sentence("天青色等烟雨,而我在", model, tokenizer, window_size))
    if not save_weight:
        return
    else:
        base_name = os.path.basename(corpus_path).replace("txt", "pth")
        model_path = os.path.join("model", base_name)
        torch.save(model.state_dict(), model_path)
        return


if __name__ == "__main__":
    train("corpus.txt", False)

    # mask = torch.tril(torch.ones(4, 4)).unsqueeze(0).unsqueeze(0)
    # print(mask)


网站公告

今日签到

点亮在社区的每一天
去签到