从预训练到有监督微调再到基于人类反馈的强化学习
你可能已经看过很多关于使用 Transformer 架构创建 LLM 的文章了,但最近像 OpenAI 的 o3、Gemini 2.5 Pro 这样的 LLM 不仅能生成文本,还能“思考”。那么,我们如何创建一个在回答问题之前能够“思考”的推理型 LLM 呢?
创建一个最简单的推理型 LLM 大致需要经过三个步骤:
推理型 LLM 流程
- 使用 Transformer 架构预训练 LLM。
- 在预训练的 LLM 上进行有监督微调(SFT)。
- 在 SFT 训练的模型上进行基于人类反馈的强化学习(RLHF),使其具备推理能力。
在这篇博客中,我将尝试让模型依次经过这三个阶段,并检查最终的输出结果。源码在文中,注意查看哦
最终输出
这是我们在样本数据和完整训练数据上得到的输出对比:
三个阶段的输出
在更大数据集上训练的模型表现更好,而在样本数据集上训练的模型虽然效果不佳,但仍然具有学习价值,因为它可以快速调试代码。
完整的代码和注释可以在 GitHub 仓库中找到:
你还可以在 Hugging Face Space 上测试由 jingyaogong 使用相同方法(预训练 → SFT → RLHF)创建的更强大的推理模型(0.1 亿参数),并下载其权重。
在开始之前,你需要具备一些基本知识,比如面向对象编程(OOP)和神经网络(NN)的基础知识。熟悉 PyTorch 也会对编码有所帮助。
准备工作
在本项目中,我们将使用一系列 Python 库,现在先导入它们:
# 标准库
import os
import json
import math
import time
import random
import warnings
from typing import Optional, Tuple, List, Union, Iterator# 数值计算
import numpy as np# PyTorch 深度学习库
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader# 自动混合精度和上下文管理
from contextlib import nullcontext# Hugging Face 库,用于分词和模型处理
from transformers import AutoTokenizer, PretrainedConfig, PreTrainedModel
from transformers.modeling_outputs import (
CausalLMOutputWithPast, # 因果语言建模的输出类型
BaseModelOutputWithPast # 带缓存键/值的基本模型输出类型
)
from transformers.activations import ACT2FN # Transformer 模型中使用的激活函数# Hugging Face 的 `tokenizers` 库中的分词器(快速分词器库)
from tokenizers import Tokenizer as HFTokenizer # 重命名以避免与 transformers 的分词器冲突
from tokenizers import models as hf_models
from tokenizers import trainers as hf_trainers
from tokenizers import pre_tokenizers as hf_pre_tokenizers
from tokenizers import decoders as hf_decoders
现在我们已经导入了后续训练和预处理所需的模块,接下来我们开始训练分词器。
三个阶段的数据集
由于我们涉及三个阶段的训练,每个阶段都需要不同格式的数据集。
第一个数据集用于预训练,我们使用 Transformer 架构从零开始创建 LLM,使其能够生成语法正确的文本。
让我们定义这个样本训练数据集:
# --- 预训练数据 ---
sample_pretrain_data = [
{"text": "The sun shines brightly in the clear blue sky."},
{"text": "Cats love to chase mice and play with yarn balls."},
{"text": "Reading books expands your knowledge and vocabulary."},
{"text": "Artificial intelligence is a rapidly evolving field of study."},
{"text": "To bake a cake, you need flour, sugar, eggs, and butter."},
{"text": "Large language models are trained on vast amounts of text data."},
{"text": "The quick brown fox jumps over the lazy dog."}
]
在我们用 Transformer 架构训练完这个数据之后,我们会在其基础上使用 SFT 训练数据集,让我们也定义一下这个数据集:
# --- SFT 数据 ---
sample_sft_data = [
{"conversations": [
{"role": "user", "content": "Hello, how are you?"},
{"role": "assistant", "content": "I am doing well, thank you! How can I help you today?"}
]},
{"conversations": [
{"role": "user", "content": "What is the capital of France?"},
{"role": "assistant", "content": "The capital of France is Paris."}
]},
{"conversations": [
{"role": "user", "content": "Explain gravity in simple terms."},
{"role": "assistant", "content": "Gravity is the force that pulls objects towards each other. It's why things fall down to the ground!"}
]}
]
SFT 训练通过提供问题及其正确答案来提升 LLM 的知识水平。
最后,在预训练和 SFT 训练之后,我们创建 RLHF 数据集,赋予我们的 LLM 推理能力。
# --- 推理数据 ---
sample_reasoning_data = [ {"conversations": [ {"role":
"user", "content": "If I have 3 apples and eat 1, how many are left?"}, {"role":
"assistant", "content": "<think
>The user starts with 3 apples. The user eats 1 apple. This means 1 apple is
subtracted from the initial amount. So, 3 - 1 = 2.</think
><answer>You have 2 apples left.</answer>"} ]}, {"conversations": [ {"role":
"user", "content": "What are the primary colors?"}, {"role": "assistant",
"content": "<think
>The user is asking about primary colors. These are colors that cannot be made
by mixing other colors. The standard set of primary colors in additive color
models (like light) are Red, Green, and Blue (RGB). For subtractive models
(like paint), they are often considered Red, Yellow, Blue (RYB) or Cyan,
Magenta, Yellow (CMY).</think
><answer
>The primary colors are typically considered to be red, yellow, and blue.
These are colors that can be mixed to create a range of other colors but
cannot be created by mixing other colors themselves.</answer
>"} ]} ]
在我们的 RLHF 数据集中,当用户提出问题时,回答中包含 <think>
标签,这些标签为 LLM 提供了推理的“思考”过程。
现在我们已经定义了每个阶段的训练数据集,是时候开始训练分词器了,让我们开始吧。
训练一个 BPE 分词器
有许多类型的分词器可供选择,例如在大型语料库上预训练的分词器(如 tiktoken
库中的分词器),或者在我们自己的数据集上自定义训练的分词器。我们使用的英语文本语料库越大,其词汇表的大小也就越大。
让我们使用一个样本数据集来训练,看看代码中发生了什么。
# 分词器训练数据
tokenizer_corpus = [
"Hello world, this is a demonstration of building a thinking LLM.",
"Language models learn from text data.",
"Tokenization is a crucial first step.",
"We will train a BPE tokenizer.",
"Think before you answer.",
"The answer is forty-two.",
"<think>Let's consider the options.</think><answer>Option A seems best.</answer>",
"<|im_start|>user\nWhat's up?<|im_end|>\n<|im_start|>assistant\nNot much!<|im_end|>"
]# 将数据保存到临时文件供分词器训练器使用
tokenizer_corpus_file = os.path.join("YOUR_DIR_PATH", "tokenizer_corpus.txt")
with open(tokenizer_corpus_file, 'w', encoding='utf-8') as f:
for line in tokenizer_corpus:
f.write(line + "\n")
我这里定义了一些随机文本数据,你也可以使用 莎士比亚数据集 来进行训练。
让我们创建一个简单的 BPE 函数,用它来训练分词器。
def train_demo_tokenizer( corpus_files: List[str], vocab_size: int, save_path:
str, special_tokens: List[str] ): """ 使用 Hugging Face Tokenizers 训练一个 Byte-Pair Encoding (BPE)
分词器。 Args: corpus_files (List[str]): 包含训练语料库的文件路径列表。 vocab_size (int): 期望的词汇表大小。 save_path (str): 保存训练好的分词器的路径。 special_tokens
(List[str]): 要包含在分词器中的特殊标记列表。 """ #
初始化 BPE 分词器,并设置未知标记为 <unk
> tokenizer_bpe =
HFTokenizer(hf_models.BPE(unk_token="<unk
>")) # 设置预分词器为 ByteLevel,以实现稳健的字节级处理
tokenizer_bpe.pre_tokenizer =
hf_pre_tokenizers.ByteLevel(add_prefix_space=False, use_regex=True) # 设置
解码器为 ByteLevel,以正确解码字节级标记
tokenizer_bpe.decoder = hf_decoders.ByteLevel() # 创建 BPE 训练器,设置
词汇表大小、特殊标记以及完整的字节字母表 trainer =
hf_trainers.BpeTrainer( vocab_size=vocab_size, special_tokens=special_tokens,
show_progress=True, initial_alphabet=hf_pre_tokenizers.ByteLevel.alphabet() )
# 确保 corpus_files 是一个列表 if isinstance(corpus_files, str): corpus_files
= [corpus_files] # 训练分词器 tokenizer_bpe.train(corpus_files,
trainer=trainer) print(f"分词器训练完成。词汇表大小:
{tokenizer_bpe.get_vocab_size()}") # 将分词器保存为 JSON 文件
tokenizer_bpe.save(save_path) print(f"分词器已保存至 {save_path}") return
tokenizer_bpe</unk
>
我们定义了 <unk>
来处理未知标记。我们还需要创建一些特殊标记,用于指示文本的开始和结束,以及一个填充标记,用于添加额外的空格。让我们把这些标记定义在一个列表中。
# 特殊标记 SPECIAL_TOKENS_LIST = ["<|endoftext|>", "<|im_start|>",
"<|im_end|>", "<pad>"]</pad
>
现在我们来调用这个函数,训练我们的分词器。
# 训练并保存演示 BPE 分词器
# 使用指定的语料库、词汇表大小、保存路径和特殊标记
trained_hf_tokenizer = train_demo_tokenizer(
corpus_files=[tokenizer_corpus_file],
vocab_size=1000,
save_path="NOTEBOOK_TOKENIZER_PATH",
special_tokens=SPECIAL_TOKENS_LIST
)
现在我们已经训练好了分词器,让我们在样本数据集上测试一下。
# 使用训练好的分词器测试编码和解码 test_sentence = "Hello
<|im_start|> world <think>思考中</think><answer>答案</answer> <|im_end|>"#
对句子进行编码 encoded = trained_hf_tokenizer.encode(test_sentence)#
显示原始句子及其编码形式 print(f"Original:
{test_sentence}") print(f"Encoded IDs: {encoded.ids}") print(f"Encoded Tokens:
{encoded.tokens}")# 将编码后的数据解码回文本 decoded =
trained_hf_tokenizer.decode(encoded.ids) print(f"Decoded: {decoded}")### 输出
### 训练好的分词器词汇表(前 10 个和特殊标记): 'ain': 328 '}': 96
'D': 39 'Ġfirst': 428 'Wh': 324 ...
你可以看到我们的训练好的 BPE 分词器是如何分解测试句子的。
接下来,我们来创建 Transformer 模型,它将用于执行第一阶段的训练。让我们开始吧。
Transformer 概述
让我们快速了解一下 Transformer 架构是如何处理和理解文本的。它通过将文本分解为更小的部分(称为标记)并预测序列中的下一个标记来工作。Transformer 有许多层,称为 Transformer 块,这些块一层一层地堆叠在一起,最后有一层用于进行预测。
每个 Transformer 块包含两个主要组件:
- 自注意力头(Self-Attention Heads):这些组件会找出输入中对模型来说最重要的部分。例如,在处理句子时,注意力头可以突出显示单词之间的关系,比如代词与其所指代的名词之间的关系。
- 多层感知机(MLP):这是一个简单的前馈神经网络。它接收自注意力头强调的信息,并进一步处理这些信息。MLP 包含一个输入层,用于接收来自注意力头的数据;一个隐藏层,用于增加处理的复杂性;以及一个输出层,用于将结果传递给下一个 Transformer 块。
结合起来,注意力头充当了“思考什么”的部分,而 MLP 则是“如何思考”的部分。堆叠多个 Transformer 块可以让模型理解文本中的复杂模式和关系,但这并不是绝对的。
让我们来可视化一个更简单、更易于编码的架构图。
让我们阅读一下我们将要编码的架构流程:
- 输入标记被转换为嵌入,并与位置信息相结合。
- 模型包含 64 个相同的 Transformer 块,按顺序处理数据。
- 每个块首先运行多头注意力,以查看标记之间的关系。
- 每个块然后通过 MLP 处理数据,该 MLP 会扩展数据,然后将其压缩回去。
- 每个步骤都使用残差连接(快捷方式),以帮助信息流动。
- 在整个过程中使用层归一化,以稳定训练。
- 注意力机制计算哪些标记应该相互关注。
- MLP 将数据扩展到 4 倍大小,应用 ReLU 激活函数,然后将其压缩回去。
- 模型使用 16 个注意力头来捕获不同类型的 关系。
- 最后一层将处理后的数据转换为词汇表大小的预测结果。
- 模型通过反复预测下一个最有可能的标记来生成文本。
多层感知机(MLP)
MLP 是 Transformer 中前馈网络的一个基本构建块。它的作用是引入非线性,并学习嵌入表示中的复杂关系。定义 MLP 模块时,一个重要的参数是 n_embed
,它定义了输入嵌入的维度。
MLP 通常由一个隐藏的线性层组成,该线性层将输入维度扩展一个因子(通常是 4,我们将使用这个因子),后面跟着一个非线性激活函数,通常是 ReLU。这种结构允许我们的网络学习更复杂的特征。最后,一个投影线性层将扩展后的表示映射回原始嵌入维度。这一系列变换使 MLP 能够细化由注意力机制学到的表示。
MLP
# --- 多层感知机(MLP)类 ---
class MLP(nn.Module):
"""
一个简单的多层感知机,包含一个隐藏层。
这个模块用于 Transformer 块中的前馈处理。
它将输入嵌入维度扩展,应用 ReLU 激活函数,然后将其投影回原始嵌入维度。
"""
def __init__(self, n_embed):
super().__init__()
self.hidden = nn.Linear(n_embed, 4 * n_embed) # 扩展嵌入维度的线性层
self.relu = nn.ReLU() # ReLU 激活函数
self.proj = nn.Linear(4 * n_embed, n_embed) # 投影回原始维度的线性层
def forward(self, x):
"""
MLP 的前向传播。
参数:
x (torch.Tensor):输入张量,形状为 (B, T, C),其中 B 是批量大小,
T 是序列长度,C 是嵌入维度。
返回:
torch.Tensor:与输入形状相同的输出张量。
"""
x = self.forward_embedding(x)
x = self.project_embedding(x)
return x
def forward_embedding(self, x):
"""
应用隐藏线性层和 ReLU 激活函数。
参数:
x (torch.Tensor):输入张量。
返回:
torch.Tensor:经过隐藏层和 ReLU 处理后的输出。
"""
x = self.relu(self.hidden(x))
return x
def project_embedding(self, x):
"""
应用投影线性层。
参数:
x (torch.Tensor):输入张量。
返回:
torch.Tensor:经过投影层处理后的输出。
"""
x = self.proj(x)
return x
我们刚刚编写了 MLP 部分的代码,其中 __init__
方法初始化了一个隐藏线性层,用于扩展输入嵌入维度(n_embed
),以及一个投影层,用于将其压缩回去。ReLU 激活函数在隐藏层之后应用。
forward
方法定义了数据通过这些层的流动过程,通过 forward_embedding
应用隐藏层和 ReLU,通过 project_embedding
应用投影层。
单头注意力
注意力头是模型的核心部分。它的作用是专注于输入序列的相关部分。
定义一个 Head 模块时,一些重要的参数是 head_size
、n_embed
和 context_length
。head_size
参数决定了键、查询和值投影的维度,影响注意力机制的表示能力。
输入嵌入维度 n_embed
定义了这些投影层的输入大小。context_length
用于创建因果掩码,确保模型只能关注前面的标记。
在 Head 中,初始化了无偏置的键、查询和值的线性层(nn.Linear
)。基于 context_length
的下三角矩阵被注册为缓冲区,用于实现因果掩码,防止注意力机制关注未来的标记。
# --- 注意力头类 ---
class Head(nn.Module):
"""
单个注意力头。
这个模块计算注意力分数,并将其应用于值。
它包括键、查询和值投影,并使用因果掩码
防止关注未来的标记。
"""
def __init__(self, head_size, n_embed, context_length):
super().__init__()
self.key = nn.Linear(n_embed, head_size, bias=False) # 键投影
self.query = nn.Linear(n_embed, head_size, bias=False) # 查询投影
self.value = nn.Linear(n_embed, head_size, bias=False) # 值投影
# 用于因果掩码的下三角矩阵
self.register_buffer('tril', torch.tril(torch.ones(context_length, context_length)))
def forward(self, x):
"""
注意力头的前向传播。
参数:
x (torch.Tensor):输入张量,形状为 (B, T, C)。
返回:
torch.Tensor:应用注意力后的输出张量。
"""
B, T, C = x.shape
k = self.key(x) # (B, T, head_size)
q = self.query(x) # (B, T, head_size)
scale_factor = 1 / math.sqrt(C)
# 计算注意力权重:(B, T, head_size) @ (B, head_size, T) -> (B, T, T)
attn_weights = q @ k.transpose(-2, -1) * scale_factor
# 应用因果掩码
attn_weights = attn_weights.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
attn_weights = F.softmax(attn_weights, dim=-1)
v = self.value(x) # (B, T, head_size)
# 应用注意力权重到值
out = attn_weights @ v # (B, T, T) @ (B, T, head_size) -> (B, T, head_size)
return out
我们的注意力头类的 __init__
方法初始化了键、查询和值的线性层,每个层都将输入嵌入(n_embed
)投影到 head_size
。
基于 context_length
的下三角矩阵用于因果掩码。
forward
方法通过缩放查询和键的点积来计算注意力权重,应用因果掩码,使用 softmax 对权重进行归一化,并计算值的加权和以产生注意力输出。
多头注意力
为了捕获输入序列中的多样化关系,我们将使用多头注意力的概念。MultiHeadAttention
模块管理多个并行运行的独立注意力头。
这里的关键参数是 n_head
,它决定了并行注意力头的数量。输入嵌入维度(n_embed
)和 context_length
也是实例化各个注意力头所必需的。
每个头独立处理输入,将其投影到一个更低维度的子空间中,大小为 n_embed // n_head
。通过拥有多个头,模型可以同时关注输入的不同方面。
# --- 多头注意力类 ---
class MultiHeadAttention(nn.Module):
"""
多头注意力模块。
这个模块将多个注意力头并行组合。每个头的输出被拼接起来形成最终输出。
"""
def __init__(self, n_head, n_embed, context_length):
super().__init__()
self.heads = nn.ModuleList([Head(n_embed // n_head, n_embed, context_length) for _ in range(n_head)])
def forward(self, x):
"""
多头注意力的前向传播。
参数:
x (torch.Tensor):输入张量,形状为 (B, T, C)。
返回:
torch.Tensor:拼接所有头的输出后的输出张量。
"""
# 沿最后一个维度(C)拼接每个头的输出
x = torch.cat([h(x) for h in self.heads], dim=-1)
return x
现在我们已经定义了 MultiHeadAttention
类,它将多个注意力头组合在一起。__init__
方法初始化了一个包含 n_head
个 Head
实例的列表,每个头的 head_size
为 n_embed // n_head
。
forward
方法将每个注意力头应用于输入 x
,并将它们的输出沿最后一个维度拼接起来,合并每个头学到的信息。
Transformer 块
为了创建一个拥有数十亿参数的模型,我们肯定需要一个深度架构。为此,我们需要编写 Transformer 块并将其堆叠起来。
Transformer 块的关键参数是 n_head
、n_embed
和 context_length
。每个块包含一个多头注意力层和一个前馈网络(MLP),在每个层之前应用层归一化,并在每个层之后使用残差连接。
层归一化由嵌入维度 n_embed
参数化,有助于稳定训练。多头注意力机制如前所述,需要 n_head
、n_embed
和 context_length
。
MLP 也使用嵌入维度 n_embed
。这些组件协同工作,处理输入并学习复杂的模式。
Transformer 块
# --- Transformer 块类 ---
class Block(nn.Module):
"""
单个 Transformer 块。
这个块包含一个多头注意力层,后面跟着一个 MLP,
并在每个层之前应用层归一化,以及在每个层之后使用残差连接。
"""
def __init__(self, n_head, n_embed, context_length):
super().__init__()
self.ln1 = nn.LayerNorm(n_embed)
self.attn = MultiHeadAttention(n_head, n_embed, context_length)
self.ln2 = nn.LayerNorm(n_embed)
self.mlp = MLP(n_embed)
def forward(self, x):
"""
Transformer 块的前向传播。
参数:
x (torch.Tensor):输入张量。
返回:
torch.Tensor:经过块处理后的输出张量。
"""
# 应用多头注意力并使用残差连接
x = x + self.attn(self.ln1(x))
# 应用 MLP 并使用残差连接
x = x + self.mlp(self.ln2(x))
return x
def forward_embedding(self, x):
"""
关注嵌入和注意力部分的前向传播。
参数:
x (torch.Tensor):输入张量。
返回:
tuple:一个元组,包含经过 MLP 嵌入后的输出和残差。
"""
res = x + self.attn(self.ln1(x))
x = self.mlp.forward_embedding(self.ln2(res))
return x, res
我们的 Block
类代表一个单独的 Transformer 块。__init__
方法初始化了层归一化层(ln1
、ln2
)、一个多头注意力模块和一个 MLP 模块,所有这些都由 n_head
、n_embed
和 context_length
参数化。
forward
方法实现了块的前向传播,应用层归一化和多头注意力,并使用残差连接,然后再次应用层归一化和 MLP,同样使用残差连接。
forward_embedding
方法提供了一个替代的前向传播,专注于注意力和初始 MLP 嵌入阶段。
最终模型
到目前为止,我们已经编写了 Transformer 模型的小型组件。接下来,我们将集成标记嵌入和位置嵌入,与一系列 Transformer 块结合,以执行序列到序列的任务。为此,我们需要编写几个关键参数:n_head
、n_embed
、context_length
、vocab_size
和 N_BLOCKS
。
vocab_size
决定了标记嵌入层的大小,将每个标记映射到一个大小为 n_embed
的密集向量。context_length
参数对于位置嵌入层很重要,它也以维度 n_embed
编码输入序列中每个标记的位置。注意力头的数量(n_head
)和块的数量(N_BLOCKS
)决定了网络的深度和复杂性。
这些参数共同定义了 Transformer 模型的架构和容量,所以让我们来编写代码。
Transformer 类
# --- Transformer 模型类 ---
class Transformer(nn.Module):
"""
主要的 Transformer 模型。
这个类将标记嵌入和位置嵌入与一系列 Transformer 块结合,
并在最后使用一个线性层进行语言建模。
"""
def __init__(self, n_head, n_embed, context_length, vocab_size, N_BLOCKS):
super().__init__()
self.context_length = context_length
self.N_BLOCKS = N_BLOCKS
self.token_embed = nn.Embedding(vocab_size, n_embed)
self.position_embed = nn.Embedding(context_length, n_embed)
self.attn_blocks = nn.ModuleList([Block(n_head, n_embed, context_length) for _ in range(N_BLOCKS)])
self.layer_norm = nn.LayerNorm(n_embed)
self.lm_head = nn.Linear(n_embed, vocab_size)
self.register_buffer('pos_idxs', torch.arange(context_length))
def _pre_attn_pass(self, idx):
"""
结合标记嵌入和位置嵌入。
参数:
idx (torch.Tensor):输入标记索引。
返回:
torch.Tensor:标记嵌入和位置嵌入的和。
"""
B, T = idx.shape
tok_embedding = self.token_embed(idx)
pos_embedding = self.position_embed(self.pos_idxs[:T])
return tok_embedding + pos_embedding
def forward(self, idx, targets=None):
"""
Transformer 的前向传播。
参数:
idx (torch.Tensor):输入标记索引。
targets (torch.Tensor, 可选):用于损失计算的目标标记索引。默认为 None。
返回:
tuple:输出的 logits 和损失(如果提供了目标)。
"""
x = self._pre_attn_pass(idx)
for block in self.attn_blocks:
x = block(x)
x = self.layer_norm(x)
logits = self.lm_head(x)
loss = None
if targets is not None:
B, T, C = logits.shape
flat_logits = logits.view(B * T, C)
targets = targets.view(B * T).long()
loss = F.cross_entropy(flat_logits, targets)
return logits, loss
def forward_embedding(self, idx):
"""
关注嵌入和注意力块的前向传播。
参数:
idx (torch.Tensor):输入标记索引。
返回:
tuple:注意力块后的输出和残差。
"""
x = self._pre_attn_pass(idx)
residual = x
for block in self.attn_blocks:
x, residual = block.forward_embedding(x)
return x, residual
def generate(self, idx, max_new_tokens):
"""
给定起始序列,生成新的标记。
参数:
idx (torch.Tensor):初始标记索引序列。
max_new_tokens (int):要生成的标记数量。
返回:
torch.Tensor:扩展后的标记序列。
"""
for _ in range(max_new_tokens):
idx_cond = idx[:, -self.context_length:]
logits, _ = self(idx_cond)
logits = logits[:, -1, :]
probs = F.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1)
idx = torch.cat((idx, idx_next), dim=1)
return idx
我们的 Transformer
类的 __init__
方法初始化了标记嵌入层和位置嵌入层(token_embed
、position_embed
)、一系列 Block
模块(attn_blocks
)、一个最终的层归一化层(layer_norm
)以及一个用于语言建模的线性层(lm_head
)。
_pre_attn_pass
方法结合了标记嵌入和位置嵌入。forward
方法将输入序列通过嵌入层和一系列 Transformer 块,应用最终的层归一化,并生成 logits。
如果提供了目标,它还会计算损失。forward_embedding
方法提供了一个中间的前向传播,直到注意力块的输出,而 generate
方法实现了标记生成。
预训练 Transformer
在开始训练之前,让我们为 Transformer 设置一些超参数。为了演示,我们将保持它们相对较小,以便在标准机器上运行。
# --- 定义模型超参数 ---
CONTEXT_LENGTH = 64 # 模型一次查看的最大标记数
N_EMBED = 256 # 嵌入维度(标记向量的大小)
N_HEAD = 8 # 注意力头的数量
N_BLOCKS = 6 # 要堆叠的 Transformer 块数量
VOCAB_SIZE = trained_hf_tokenizer.get_vocab_size() # 从我们训练好的分词器获取词汇表大小
LEARNING_RATE = 3e-4 # 优化器的学习率
BATCH_SIZE = 32 # 一次处理的序列数量
N_EPOCHS = 100 # 我们将完整遍历整个数据集的次数(用于演示)# --- 设备配置 ---
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'# --- 实例化 Transformer 模型 ---
model = Transformer(
n_head=N_HEAD,
n_embed=N_EMBED,
context_length=CONTEXT_LENGTH,
vocab_size=VOCAB_SIZE,
N_BLOCKS=N_BLOCKS
).to(DEVICE)# 打印模型中的参数数量
num_params = sum(p.numel() for p in model.parameters())
print(f"我们的 Transformer 模型有 {num_params/1e6:.2f} 百万参数。")### 输出
使用设备:cuda(如果没有 GPU,则为 cpu)
我们的 Transformer 模型有 2.15 百万参数。
所以,我们得到了一个大约有 200 万参数的模型。与 GPT-3 这样的庞然大物相比,这个模型简直小得可怜,但它是一个很好的起点,有助于我们理解其工作机制。
我们将把数据输入模型,计算其预测的错误程度(“损失”),并使用优化器调整其内部参数(权重),使其随着时间的推移变得更好。
# --- 优化器 ---
# AdamW 是训练 Transformers 的流行选择
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)# --- 混合精度(如果使用 GPU,推荐使用) ---
# Autocast 启用自动混合精度,对某些操作使用 float16
# 以加速训练并减少内存占用,同时对其他操作保持精度。
PTDTYPE = torch.float32 # 博客默认值
autocast_ctx = nullcontext() if DEVICE == 'cpu' else torch.amp.autocast(device_type=DEVICE, dtype=PTDTYPE)
# GradScaler 帮助防止使用 float16 时梯度变得过小(下溢)
scaler = torch.cuda.amp.GradScaler(enabled=(PTDTYPE != torch.float32 and DEVICE == 'cuda'))# --- 预训练循环 ---
print(f"开始预训练,共 {N_EPOCHS} 个周期...")
model.train() # 将模型设置为训练模式for epoch in range(N_EPOCHS):
total_loss = 0
batch_count = 0 for xb, yb in pretrain_dataloader: # 将批次移动到配置的设备
xb, yb = xb.to(DEVICE), yb.to(DEVICE) # 清零上一次迭代的梯度
optimizer.zero_grad(set_to_none=True) with autocast_ctx: # 前向传播:获取模型的预测(logits)和损失 # 我们的 Transformer 的 forward 方法返回 (logits, loss)
logits, loss = model(xb, targets=yb) if loss is not None: # 确保计算了损失 # 反向传播:计算损失相对于模型参数的梯度 # 如果使用 scaler,则在反向传播之前对损失进行缩放
scaler.scale(loss).backward() # 优化器步骤:更新模型参数
scaler.step(optimizer) # 更新下一次迭代的 scaler
scaler.update() total_loss += loss.item()
batch_count += 1 # 记录进度(例如,对于小数据集,每 10 步记录一次)
if batch_count % max(1, len(pretrain_dataloader)//2) == 0 : # 每个 epoch 记录两次
print(f"Epoch {epoch+1}/{N_EPOCHS}, Batch {batch_count}/{len(pretrain_dataloader)}, Loss: {loss.item():.4f}") avg_epoch_loss = total_loss / batch_count if batch_count > 0 else float('nan')
print(f"--- 第 {epoch+1} 个周期结束,平均预训练损失: {avg_epoch_loss:.4f} ---")print("预训练完成。")# --- 保存预训练模型 ---
PRETRAINED_MODEL_PATH = os.path.join(NOTEBOOK_OUT_DIR, "thinking_llm_pretrained.pth")
torch.save(model.state_dict(), PRETRAINED_MODEL_PATH)
print(f"预训练模型权重已保存到: {PRETRAINED_MODEL_PATH}")
当我们开始训练循环时,它将为每个周期打印损失。
### 输出
开始预训练,共 100 个周期...
第 1 个周期,第 3 批次,损失:5.8732
第 1 个周期,第 6 批次,损失:5.1234
--- 第 1 个周期结束,平均预训练损失:5.4321 ---
... (很多周期后) ...
第 100 个周期,第 3 批次,损失:1.5678
第 100 个周期,第 6 批次,损失:1.4990--- 第 100 个周期结束,平均预训练损失:1.5220 ---
预训练完成。
预训练模型权重已保存到:./out_notebook_scratch_blog/thinking_llm_pretrained.pth
我们的模型现在已经从预训练数据中学习到了一些基本的语言结构。如果我们现在测试它的 generate
方法,它可能可以生成一些连贯的(但可能重复或毫无意义的)文本。
预训练模型推理
让我们快速测试一下,看看我们的预训练模型是否学到了一些东西,哪怕是最小的内容。
pt_model.eval() # 将模型设置为评估模式
test_prompt_str_pt = "Language models learn"
# 如果分词器没有自动添加 BOS,则在生成时添加 BOS 以保持一致性
pt_test_input_ids = tokenizer(tokenizer.bos_token + test_prompt_str_pt, return_tensors="pt").input_ids.to(DEVICE)with torch.no_grad(), autocast_ctx:
generated_output_pt = pt_model.generate(
pt_test_input_ids,
max_new_tokens=15,
do_sample=False, # 此测试使用贪婪策略
eos_token_id=tokenizer.eos_token_id,
pad_token_id=tokenizer.pad_token_id
)
decoded_generated_pt = tokenizer.decode(generated_output_pt[0], skip_special_tokens=True)
print(f"提示:'{test_prompt_str_pt}' -> 生成:'{decoded_generated_pt}'")### 输出 ###
Language models learn learn learn learn learn
输出中出现了明显的幻觉,这是预料之中的,因为我们只在少量数据上训练了几个周期。
我在 Pile 数据集(约 5 GB 片段) 上训练了相同的架构,并且训练了相同数量的周期,我们得到了这样的输出。
# 200 万参数 LLM 输出(在样本数据上)
Language models learn learn learn learn learn# 200 万参数 LLM 输出(在 Pile 数据上)
Language models were directly linked to
但目前,我们已经得到了一个预训练的 LLM。在进入下一步 SFT 之前,通常需要进行大量的工作,例如进行幻觉检查、移除脏话,以及进行多次试错迭代。
然而,由于这篇博客是一个简单的推理型 LLM 工作原理的实现,我们可以直接进入 SFT 阶段。让我们开始吧。
SFT 数据集加载器
预训练使模型通过暴露于大量通用文本,获得了对语言的广泛理解。然而,为了使模型更适合特定任务,例如回答问题或遵循指令,我们使用 有监督微调(SFT)。
在 SFT 中,模型在像我们的 sample_sft_data
这样的精心策划的数据集上进行训练,这些数据集包括用户提示及其理想的助手回答。
SFT 的一个关键部分是 损失掩码。在训练期间,用户的输入和助手的回答都被包含在模型的输入序列中。
然而,我们只想让模型从助手的回答中学习,而不是从用户的提示中学习(因为在实际交互中,模型会收到用户的提示)。
损失掩码确保只有与助手回答对应的标记才计入损失计算。这使模型的学习专注于生成适当且有帮助的回答,同时在优化过程中忽略提示本身。
首先,我们需要创建一个 SFT 数据集加载器类,它将处理我们样本数据集的 SFT 训练。
class SFTDataset(Dataset):
"""
简化的有监督微调数据集,使用 ChatML 风格的格式。
只有助手的回答用于损失计算。
"""
def __init__(self, file_path, tokenizer, context_length):
self.tokenizer = tokenizer
self.context_length = context_length with open(file_path, 'r', encoding='utf-8') as f:
self.conversations = [json.loads(line)['conversations'] for line in f] print(f"加载了 {len(self.conversations)} 次对话。") def __len__(self):
return len(self.conversations) def __getitem__(self, idx):
conversation = self.conversations[idx] tokens = [self.tokenizer.bos_token_id]
labels = [-100] for turn in conversation:
role, content = turn['role'], turn['content'] # 编码轮次
prefix = self.tokenizer.encode(f"<|im_start|>{role}\n", add_special_tokens=False).ids
content_ids = self.tokenizer.encode(content, add_special_tokens=False).ids
suffix = self.tokenizer.encode("<|im_end|>\n", add_special_tokens=False).ids # 添加标记
tokens.extend(prefix + content_ids + suffix) # 创建标签
if role == 'assistant':
labels.extend([-100] * len(prefix))
labels.extend(content_ids)
labels.extend(suffix)
else:
labels.extend([-100] * (len(prefix) + len(content_ids) + len(suffix))) # 确保长度为 context_length + 1,以便进行移位
pad_id = self.tokenizer.pad_token_id
tokens = tokens[:self.context_length + 1] + [pad_id] * max(0, self.context_length + 1 - len(tokens))
labels = labels[:self.context_length + 1] + [-100] * max(0, self.context_length + 1 - len(labels)) # 准备输入和目标
input_ids = torch.tensor(tokens[:-1], dtype=torch.long)
target_ids = torch.tensor(labels[1:], dtype=torch.long) return input_ids, target_ids
SFTDataset
准备输入和标签序列,以便在训练期间仅使用助手的回答计算损失。
这意味着用户的标记包含在输入中,但在损失计算中被忽略,专注于学习助手的部分。
在更复杂的情况下,可能需要更仔细地对齐标记,但这涵盖了基本的方法。
SFT 训练循环
让我们编写 SFT 训练循环,它将使用我们之前使用 Transformer 架构训练的相同预训练模型。
# --- SFT 训练循环 ---
# 使用更小的学习率进行微调
SFT_LEARNING_RATE = 1e-4
SFT_N_EPOCHS = 50 # SFT 往往需要在较小的数据集上进行更多周期的训练optimizer_sft = torch.optim.AdamW(model.parameters(), lr=SFT_LEARNING_RATE)
# 如果 GPU 设置相同,可以重用 Scaler 和 autocast_ctxprint(f"开始 SFT,共 {SFT_N_EPOCHS} 个周期...")
model.train() # 将模型设置为训练模式for epoch in range(SFT_N_EPOCHS):
total_sft_loss = 0
sft_batch_count = 0 for xb_sft, yb_sft_labels in sft_dataloader:
xb_sft, yb_sft_labels = xb_sft.to(DEVICE), yb_sft_labels.to(DEVICE) optimizer_sft.zero_grad(set_to_none=True) with autocast_ctx: # 前向传播:我们的模型的 forward 方法接受 `idx` 和 `targets`。 # `targets` 在这里应该是 `yb_sft_labels`,它已经在非助手部分标记为 -100。 # 模型内部使用这些标签计算交叉熵损失。
logits_sft, loss_sft = model(xb_sft, targets=yb_sft_labels) if loss_sft is not None:
scaler.scale(loss_sft).backward()
scaler.step(optimizer_sft)
scaler.update()
total_sft_loss += loss_sft.item()
sft_batch_count += 1 if sft_batch_count % max(1, len(sft_dataloader)//1) == 0: # 对于小 SFT 数据,每个周期记录一次
print(f"SFT 第 {epoch+1}/{SFT_N_EPOCHS} 个周期,第 {sft_batch_count}/{len(sft_dataloader)} 批次,损失: {loss_sft.item():.4f}") avg_epoch_sft_loss = total_sft_loss / sft_batch_count if sft_batch_count > 0 else float('nan')
print(f"--- SFT 第 {epoch+1} 个周期结束,平均 SFT 损失: {avg_epoch_sft_loss:.4f} ---")print("SFT 完成。")
这个循环非常简单,唯一的区别在于 SFT 损失。当我们开始训练时,它将为每个周期打印损失,看看输出。
### 输出
开始 SFT,共 50 个周期...
SFT 第 1 个周期,第 1 批次,损失:2.1241
--- SFT 第 1 个周期结束,平均 SFT 损失:2.1241 ---
...
SFT 第 50 个周期,第 1 批次,损失:0.8721
--- SFT 第 50 个周期结束,平均 SFT 损失:0.8721 ---
SFT 完成。
我们之前仅学习生成单词的预训练模型,现在由于我们在 SFT 训练数据上的训练,已经获得了基本的语言问答能力。它很可能会产生一些连贯的回答,例如当我们说“Hi”时,它会回答“Hi, how can I help…”。
SFT 模型推理
尽管我们的 SFT 训练数据集非常小,我们的预训练模型也很有限,但这是我们在 OpenOrca 数据集上进行了 50 个周期的 SFT 训练的 200 万参数 LLM 的输出。
# 准备一个简单的对话
chat_history = [{"role": "user", "content": "What is the capital of France?"}]
prompt = tokenizer.apply_chat_template(chat_history, tokenize=False, add_generation_prompt=True)
inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)# 生成回答
with torch.no_grad(), autocast_ctx_sft:
outputs = sft_model_demo.generate(
inputs.input_ids,
max_new_tokens=200,
do_sample=True,
eos_token_id=tokenizer.eos_token_id,
pad_token_id=tokenizer.pad_token_id
)# 解码并记录回答
response = tokenizer.decode(outputs[0]inputs.input_ids.shape[1]:], skip_special_tokens=True)
logger(f"SFT Prompt: '{chat_history[0]['content']}' -> Generated: '{response}'")
我们问了一个非常简单的问题,看看我们更好的模型的输出。
# 200 万参数 SFT LLM 输出(在样本 SFT 数据上)
unbiased asf gg link# 200 万参数 SFT LLM 输出(在 ORCA SFT 数据上)
paris is the bigger
在更大数据集上训练的模型开始提供更准确的回答,例如对“法国的首都是什么?”这个问题的回答。
但目前,我们假设我们的 SFT 模型表现良好。接下来是推理训练,强化学习,赋予 LLM 推理能力。
RLHF 风格的数据集加载器
我们可以大量重用 SFTDataset
的结构,因为对话的核心格式是相似的。sample_reasoning_data
已经在内容中嵌入了助手的“思考”和“回答”部分。
SFTDataset
将确保只有这些助手部分(包括特殊标签)用于计算损失。
# --- 将推理数据保存到文件 ---
reasoning_file_path = os.path.join(NOTEBOOK_DATA_DIR, "reasoning_data.jsonl")
with open(reasoning_file_path, 'w', encoding='utf-8') as f:
for item in sample_reasoning_data: # sample_reasoning_data 在文档开头已定义
f.write(json.dumps(item) + '\n')
print(f"推理数据已保存到: {reasoning_file_path}")# --- 实例化推理数据集和数据加载器 ---
# 我们重用 SFTDataset 类,因为它正确处理对话格式
# 并且只对助手的轮次计算损失。
# <think> 和 <answer> 标签在助手的内容中。
reasoning_dataset = SFTDataset(
file_path=reasoning_file_path,
tokenizer=trained_hf_tokenizer, # 我们训练好的分词器
context_length=CONTEXT_LENGTH # 之前定义的超参数
)
# 如果内存受限,可以使用更小的批量大小,特别是对于更大的模型或上下文长度
REASONING_BATCH_SIZE = max(1, BATCH_SIZE // 2) # 示例:预训练/SFT 批量大小的一半
reasoning_dataloader = DataLoader(reasoning_dataset, batch_size=REASONING_BATCH_SIZE, shuffle=True)
数据集加载器正确地准备了序列,使模型学习生成整个助手回答,包括关键的 <think>
和 <answer>
标签。
带权重损失的训练循环
为了真正让模型优先学习 <think>
和 <answer>
结构,我们将稍微修改损失计算。
我们将识别这些特定的标签标记,并在损失中为它们分配更高的权重,向模型发出信号,表明正确生成这些结构标记非常重要。
# SFT_MODEL_PATH = os.path.join(NOTEBOOK_OUT_DIR, "thinking_llm_sft.pth") #
示例路径 if os.path.exists(SFT_MODEL_PATH):
model.load_state_dict(torch.load(SFT_MODEL_PATH, map_location=DEVICE))
print(f"从 {SFT_MODEL_PATH} 加载了 SFT 训练的权重到模型中。") else:
print(f"警告:SFT 模型路径 {SFT_MODEL_PATH} 未找到。推理训练
将使用当前模型状态。")# --- 推理训练超参数 ---
REASONING_LEARNING_RATE = 5e-5 # 这个最终微调阶段通常更小
REASONING_N_EPOCHS = 75 # 可能需要更多周期来学习新的
结构optimizer_rsn = torch.optim.AdamW(model.parameters(),
lr=REASONING_LEARNING_RATE)# --- 定义特殊推理标签及其 ID ---
# 这些标签应在分词器
训练期间添加到 SPECIAL_TOKENS_LIST 中 # 以确保它们被
`trained_hf_tokenizer` 作为单个标记处理。 think_token_id =
trained_hf_tokenizer.token_to_id("<think
>") end_think_token_id = trained_hf_tokenizer.token_to_id("</think
>") answer_token_id = trained_hf_tokenizer.token_to_id("<answer
>") end_answer_token_id = trained_hf_tokenizer.token_to_id("</answer
>")
现在我们已经定义了训练参数,让我们定义 RLHF 学习的训练循环。
# --- 推理训练循环 ---
print(f"开始推理训练,共 {REASONING_N_EPOCHS} 个周期...")
model.train() # 将模型设置为训练模式for epoch in range(REASONING_N_EPOCHS):
total_reasoning_loss = 0
rsn_batch_count = 0 for xb_rsn, yb_rsn_labels in reasoning_dataloader:
xb_rsn, yb_rsn_labels = xb_rsn.to(DEVICE), yb_rsn_labels.to(DEVICE) optimizer_rsn.zero_grad(set_to_none=True) with autocast_ctx: # 获取模型的 logits。我们的模型.forward() 带 `targets` 计算标准 CE 损失。 # 为了加权损失,我们需要手动获取原始 logits 并计算损失。
logits_rsn, * = model(xb_rsn) # 仅传递 idx 以获取 logits # 展平 logits 和标签以进行逐标记损失计算 # logits_rsn 形状:(B, T, VocabSize),yb_rsn_labels 形状:(B, T)
flat_logits_rsn = logits_rsn.view(-1, VOCAB_SIZE) # (B*T, VocabSize)
flat_labels_rsn = yb_rsn_labels.view(-1) # (B*T) # 计算每个标记的原始损失(尊重 ignore_index=-100)
raw_loss_per_token = loss_fct_no_reduction(flat_logits_rsn, flat_labels_rsn) # 为每个标记创建一个权重张量。 # 初始化权重:1.0 用于目标标记(标签不是 -100),0.0 否则。
token_weights = (flat_labels_rsn != -100).float() # 识别特殊推理标签在目标标签中的位置 # is_special_tag_mask 在 flat_labels_rsn 是我们的特殊推理标签之一时为 True
is_special_tag_mask = torch.isin(flat_labels_rsn, special_reasoning_tag_ids) # 为特殊标签(同时也是有效目标标记)应用更高的权重 # (即,其标签不是 -100 且是我们的特殊标签之一)
target_special_tags_mask = is_special_tag_mask & (flat_labels_rsn != -100)
token_weights[target_special_tags_mask] _= REASONING_TAG_LOSS_WEIGHT # 计算最终加权损失 # (loss_per_token _ weight_per_token) 的总和 # 通过有效目标标记的数量(标签 != -100)归一化
# 这样可以保持损失量级与未加权的 SFT 大致相当,
# 同时仍然强调标签。
num_actual_target_tokens = (flat_labels_rsn != -100).float().sum().clamp(min=1)
weighted_loss_rsn = (raw_loss_per_token \* token_weights).sum() / num_actual_target_tokens
# 反向传播和优化器步骤,检查损失是否有效
if not torch.isnan(weighted_loss_rsn) and not torch.isinf(weighted_loss_rsn):
scaler.scale(weighted_loss_rsn).backward()
scaler.step(optimizer_rsn)
scaler.update()
total_reasoning_loss += weighted_loss_rsn.item()
else:
# 如果某个批次的 num_actual_target_tokens 为零(例如,所有标签都是 -100)
# 可能会出现这种情况,跳过该批次的更新。
print(f"警告:在推理周期 {epoch+1},批次 {rsn_batch_count+1} 中遇到 NaN 或 Inf 损失。跳过此批次的更新。")
rsn_batch_count += 1
# 记录进度
if rsn_batch_count % max(1, len(reasoning_dataloader)//1) == 0: # 如果批次较少,则每个周期记录一次
current_loss_item = weighted_loss_rsn.item() if not (torch.isnan(weighted_loss_rsn) or torch.isinf(weighted_loss_rsn)) else float('nan')
print(f"推理周期 {epoch+1}/{REASONING_N_EPOCHS},批次 {rsn_batch_count}/{len(reasoning_dataloader)},加权损失:{current_loss_item:.4f}")
avg_epoch_rsn_loss = total_reasoning_loss / rsn_batch_count if rsn_batch_count > 0 else float('nan')
print(f"--- 推理周期 {epoch+1} 结束,平均推理损失:{avg_epoch_rsn_loss:.4f} ---")
print("推理训练完成。")
在这个循环中,模型专门针对推理任务进行训练,经过多个周期。对于每个数据批次,它计算预测(logits),并与真实标签进行比较。
关键在于,损失(衡量模型预测错误程度的指标)被调整为给某些特殊推理标签赋予更高的权重,从而让模型更加专注于学习这些关键部分。
计算加权损失后,模型通过反向传播和优化器更新内部参数。我们还加入了安全检查,以跳过那些损失计算可能出错的批次(例如,损失为 NaN 或 Inf)。
在整个训练过程中,它会记录进度,并在每个周期结束时报告平均损失。
### 输出
加载了 SFT 训练的权重到模型中,路径为 ./blog_output_dir/thinking_llm_sft.pth
特殊推理标签 ID 用于加权:[ID_think, ID_endthink, ID_answer, ID_endanswer](实际标记 ID)
开始推理训练,共 75 个周期...
推理周期 1/75,批次 1/1,加权损失:1.9503
--- 推理周期 1 结束,平均推理损失:1.9503 ---
... (很多周期后) ...
推理周期 75/75,批次 1/1,加权损失:0.3881
--- 推理周期 75 结束,平均推理损失:0.3881 ---
它开始训练并打印损失。训练完成后,我们训练好的 RLHF 模型会被保存。
我们期望经过训练的 RLHF 模型在生成回答时会先生成 <think>
标签,也就是说,它会先进行“思考”,然后再基于这个思考过程生成回答。
推理我们的思考型 LLM
正如之前观察到的,SFT 或预训练只有在使用更大的数据集、更多的周期和参数时才有效。在 RLHF 中,我也发现了同样的情况。在更大的 RLHF 数据集上训练的模型生成的回答要好得多。虽然并不完美,但更接近我们的最低期望。
推理 RLHF 模型的代码与之前略有不同,因为回答中包含了标签,而之前的 SFT 和预训练 LLM 的回答只是原始输出。
# 加载最终推理模型配置,匹配保存的模型
final_model_config = DemoLLMConfig(
vocab_size=DEMO_VOCAB_SIZE_FINAL,
hidden_size=DEMO_HIDDEN_SIZE,
intermediate_size=DEMO_INTERMEDIATE_SIZE,
num_hidden_layers=DEMO_NUM_LAYERS,
num_attention_heads=DEMO_NUM_ATTENTION_HEADS,
num_key_value_heads=DEMO_NUM_KV_HEADS,
max_position_embeddings=DEMO_MAX_SEQ_LEN,
bos_token_id=tokenizer.bos_token_id,
eos_token_id=tokenizer.eos_token_id,
pad_token_id=tokenizer.pad_token_id,
)# 初始化并加载训练好的模型权重
final_thinking_llm = DemoLLMForCausalLM(final_model_config).to(DEVICE)
final_thinking_llm.load_state_dict(torch.load(final_reasoning_model_path, map_location=DEVICE))
final_thinking_llm.eval()# 使用分词器的聊天模板准备提示
user_query = "If I have 3 apples and eat 1, how many are left?"
chat_history = [{"role": "user", "content": user_query}]
prompt_text = tokenizer.apply_chat_template(chat_history, tokenize=False, add_generation_prompt=True)# 对提示进行分词,并将输入 ID 移至设备
input_ids = tokenizer(prompt_text, return_tensors="pt").input_ids.to(DEVICE)# 使用采样参数生成模型回答
with torch.no_grad(), autocast_ctx_rsn:
generated_ids = final_thinking_llm.generate(
input_ids,
max_new_tokens=DEMO_MAX_SEQ_LEN - 10,
do_sample=True,
temperature=0.7,
top_k=10,
eos_token_id=tokenizer.eos_token_id,
pad_token_id=tokenizer.pad_token_id,
)# 解码生成的标记,跳过特殊标记
assistant_response = tokenizer.decode(generated_ids[0][input_ids.shape[1]:], skip_special_tokens=True)# 辅助函数,用于提取标签内的内容
def extract_tag(text, tag):
start_tag = f"<{tag}>"
end_tag = f"</{tag}>"
start_idx = text.find(start_tag)
end_idx = text.find(end_tag)
if start_idx != -1 and end_idx != -1:
return text[start_idx + len(start_tag):end_idx].strip()
return None# 提取 <think> 和 <answer> 部分(如果存在)
think = extract_tag(assistant_response, "think") or "未找到"
answer = extract_tag(assistant_response, "answer") or assistant_response# 打印解析后的思考和回答部分
print(f"<think>: {think}")
print(f"<answer>: {answer}")
我们已经看到,在非常小的样本数据集上训练的模型生成的结果毫无意义。
然而,我也在更好的数据集(例如 HFHub RL 数据)上对 RLHF 模型进行了微调,结果比我们在小数据集上训练的模型要好得多。看看下面的内容。
# 用户查询 If I have 3 apples and eat 1, how many are left?# 200 万参数 RLHF LLM 输出(在样本 RLHD 数据上)
<think>
ugh far igu when # 200 万参数 SFT LLM 输出(在 HFHub RLHF 数据上)
<think> 用户询问苹果问题,还剩下多少个?</think>2</think
>
我们在样本数据集上训练的模型生成了乱码,但在更大数据集上训练的模型(参数类似)开始识别思考标签,尽管有一些错误。
最重要的是,它预测了正确的答案,尽管思考标签有一些小问题,但这些问题肯定可以通过在更广泛的数据上进行更大规模的训练来改进。
建议
代码:https://github.com/xrzlizheng/PyLLMFromScratch/blob/main/llm_reasoning_pipeline_sft_rlhf.py
我建议从开发一个拥有超过 1300 万参数的模型开始,然后逐步增加参数,每次增加 100 万,专注于提升其在较短上下文中的表现。
你可以根据具体目标决定要训练多少额外的参数。当模型接近 10 亿参数时,可以考虑在特定领域的数据集上进行微调,例如邮件或论文写作,以评估它在这些领域的文本生成能力。