欢迎来到啾啾的博客🐱。
记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。
有很多很多不足的地方,欢迎评论交流,感谢您的阅读和评论😄。
引言
AI使用声明:在内容整理、结构优化和语言表达的过程中,我使用了人工智能(AI)工具作为辅助。
如果以LLM应用工程师为目标,其实我们并不需要熟练掌握PyTorch,熟练掌握Transformer,但是我们必须对这两者与其背后的信息有基本的了解诶,进而更好的团队协作,以及微调模型。
本篇是一个完整的从0开始构建Transformer的Demo。
代码由QWen3-Coder生成,可以运行调试。
1 一个完整的Transformer模型
![[从零构建TransformerP2-新闻分类Demo.png]]
2 需要准备的“工具包”
工具 | 作用 |
---|---|
nn.Embedding |
词嵌入 |
nn.Linear |
投影层 |
F.softmax , F.relu |
激活函数 |
torch.matmul |
矩阵乘法(注意力核心) |
mask (triu, masked_fill) |
实现因果注意力 |
LayerNorm , Dropout |
稳定训练 |
nn.ModuleList |
堆叠多层 |
DataLoader |
批量加载数据 |
3 Demo
"""
基于Transformer的新闻分类模型
严格按照设计流程实现,每个组件都有明确设计依据
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from torch.utils.data import Dataset, DataLoader
from typing import Dict, List, Optional, Tuple
# ==============================================
# 第一部分:基础组件设计(根据设计决策选择)
# ==============================================
class TokenEmbedding(nn.Module):
"""
词嵌入层:将输入的词ID映射为密集向量表示
设计依据:
- 文本任务需要词嵌入表示语义
- 乘以sqrt(d_model)稳定初始化方差(原论文做法)
"""
def __init__(self, vocab_size: int, d_model: int):
super().__init__()
self.embedding = nn.Embedding(vocab_size, d_model)
self.d_model = d_model
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播
参数:
x: 输入词ID张量,形状为(batch_size, seq_len)
返回:
嵌入后的张量,形状为(batch_size, seq_len, d_model)
""" # 原论文建议乘以sqrt(d_model)来稳定方差
return self.embedding(x) * math.sqrt(self.d_model)
class PositionalEncoding(nn.Module):
"""
位置编码:为输入序列添加位置信息
设计依据:
- Transformer没有顺序感知能力,必须添加位置信息
- 选择可学习位置编码(更灵活,适合变长序列)
"""
def __init__(self, d_model: int, max_len: int = 512):
super().__init__()
self.pos_embedding = nn.Embedding(max_len, d_model)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播
参数:
x: 输入张量,形状为(batch_size, seq_len, d_model)
返回:
添加位置编码后的张量
""" batch_size, seq_len = x.size(0), x.size(1)
# 生成位置ID: [0, 1, 2, ..., seq_len-1]
positions = torch.arange(seq_len, device=x.device).unsqueeze(0).expand(batch_size, -1)
return x + self.pos_embedding(positions)
class MultiHeadAttention(nn.Module):
"""
多头注意力机制
设计依据:
- 需要建模词与词之间的关系(自注意力)
- 多头机制允许模型在不同子空间关注不同关系
"""
def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
super().__init__()
assert d_model % num_heads == 0, "d_model必须能被num_heads整除"
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
# 线性变换层
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
def scaled_dot_product_attention(
self,
q: torch.Tensor,
k: torch.Tensor,
v: torch.Tensor,
mask: Optional[torch.Tensor] = None
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
缩放点积注意力
参数:
q: 查询张量,形状为(batch_size, num_heads, seq_len, d_k)
k: 键张量,形状为(batch_size, num_heads, seq_len, d_k)
v: 值张量,形状为(batch_size, num_heads, seq_len, d_k)
mask: 注意力掩码,用于屏蔽padding或未来位置
返回:
attention_output: 注意力输出
attention_weights: 注意力权重(可用于可视化)
""" attn_scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
if mask is not None:
# 将mask为0的位置设为极小值,使softmax后为0
attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
attn_probs = F.softmax(attn_scores, dim=-1)
attn_probs = self.dropout(attn_probs)
output = torch.matmul(attn_probs, v)
return output, attn_probs
def split_heads(self, x: torch.Tensor) -> torch.Tensor:
"""将输入拆分为多个头"""
batch_size = x.size(0)
x = x.view(batch_size, -1, self.num_heads, self.d_k)
return x.transpose(1, 2) # (batch_size, num_heads, seq_len, d_k)
def combine_heads(self, x: torch.Tensor) -> torch.Tensor:
"""将多个头合并回原始形状"""
batch_size = x.size(0)
x = x.transpose(1, 2).contiguous()
return x.view(batch_size, -1, self.d_model)
def forward(
self,
q: torch.Tensor,
k: torch.Tensor,
v: torch.Tensor,
mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
"""
前向传播
参数:
q, k, v: 查询、键、值张量,形状为(batch_size, seq_len, d_model)
mask: 注意力掩码
返回:
多头注意力输出,形状为(batch_size, seq_len, d_model)
""" q = self.split_heads(self.W_q(q))
k = self.split_heads(self.W_k(k))
v = self.split_heads(self.W_v(v))
attn_output, _ = self.scaled_dot_product_attention(q, k, v, mask)
output = self.W_o(self.combine_heads(attn_output))
return output
class FeedForward(nn.Module):
"""
前馈神经网络
设计依据:
- 每个位置独立处理,增强模型表示能力
- 通常d_ff = 4 * d_model(原论文比例)
"""
def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
super().__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = F.gelu(self.fc1(x))
x = self.dropout(x)
x = self.fc2(x)
return x
class EncoderLayer(nn.Module):
"""
编码器层
设计依据:
- 新闻分类需要双向上下文理解
- 残差连接和层归一化提升训练稳定性
"""
def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
super().__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
self.ffn = FeedForward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> torch.Tensor:
# 自注意力 + 残差连接 + 层归一化
attn_output = self.self_attn(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_output))
# 前馈网络 + 残差连接 + 层归一化
ffn_output = self.ffn(x)
x = self.norm2(x + self.dropout(ffn_output))
return x
# ==============================================
# 第二部分:完整模型组装(根据设计决策)
# ==============================================
class NewsClassifier(nn.Module):
"""
新闻分类Transformer模型
设计决策回顾:
- 任务类型:文本分类(Encoder-only)
- 输入:新闻文本序列
- 输出:新闻类别(体育、科技、娱乐等)
- 架构选择:Encoder-only(无需生成能力)
- 输入表示:Token Embedding + 可学习位置编码
- 输出头:[CLS] token + 分类层
""" def __init__(
self,
vocab_size: int,
d_model: int = 768,
num_heads: int = 12,
num_layers: int = 6,
d_ff: int = 3072,
num_classes: int = 10,
max_len: int = 512,
dropout: float = 0.1
):
"""
参数:
vocab_size: 词汇表大小
d_model: 模型维度(默认768,与BERT-base一致)
num_heads: 注意力头数(默认12,与BERT-base一致)
num_layers: 编码器层数(默认6,平衡性能与计算成本)
d_ff: FFN隐藏层维度(默认3072=4*d_model)
num_classes: 分类类别数
max_len: 最大序列长度
dropout: dropout概率
""" super().__init__()
self.d_model = d_model
# 1. 特殊token(设计依据:BERT-style分类需要[CLS])
self.cls_token = nn.Parameter(torch.randn(1, 1, d_model))
# 2. 词嵌入层
self.token_embedding = TokenEmbedding(vocab_size, d_model)
# 3. 位置编码(设计依据:选择可学习位置编码)
self.pos_encoding = PositionalEncoding(d_model, max_len)
# 4. 编码器层堆叠
self.encoder_layers = nn.ModuleList([
EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
])
# 5. 分类头(设计依据:使用[CLS] token进行分类)
self.classifier = nn.Sequential(
nn.Linear(d_model, d_model),
nn.GELU(),
nn.Linear(d_model, num_classes)
)
self.dropout = nn.Dropout(dropout)
# 权重初始化(设计依据:稳定训练)
self._init_weights()
def _init_weights(self):
"""初始化模型权重"""
for module in self.modules():
if isinstance(module, nn.Linear):
nn.init.xavier_uniform_(module.weight)
if module.bias is not None:
nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
nn.init.normal_(module.weight, mean=0.0, std=0.02)
elif isinstance(module, nn.LayerNorm):
nn.init.ones_(module.weight)
nn.init.zeros_(module.bias)
def add_cls_token(self, x: torch.Tensor) -> torch.Tensor:
"""
在序列开头添加[CLS] token
设计依据:BERT-style分类使用[CLS]聚合全局信息
参数:
x: 输入张量,形状为(batch_size, seq_len, d_model)
返回:
添加[CLS]后的张量,形状为(batch_size, seq_len+1, d_model)
""" batch_size = x.size(0)
cls_tokens = self.cls_token.expand(batch_size, -1, -1)
return torch.cat((cls_tokens, x), dim=1)
def create_padding_mask(self, input_ids: torch.Tensor, pad_idx: int = 0) -> torch.Tensor:
"""
创建padding掩码
设计依据:处理变长序列,忽略padding位置
参数:
input_ids: 输入ID张量,形状为(batch_size, seq_len)
pad_idx: padding token的ID
返回:
掩码张量,形状为(batch_size, 1, 1, seq_len)
True表示有效位置,False表示padding位置 (BoolTensor) """ # 创建布尔掩码,非pad为True
mask = (input_ids != pad_idx).unsqueeze(1).unsqueeze(2) # (batch_size, 1, 1, seq_len)
return mask.bool() # 确保返回的是布尔类型
def forward(self, input_ids: torch.Tensor, attention_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
"""
前向传播
参数:
input_ids: 输入词ID,形状为(batch_size, original_seq_len)
attention_mask: 可选的注意力掩码,形状为(batch_size, original_seq_len)。
1.0 表示有效位置,0.0 表示padding位置。
如果提供,应为浮点类型 (如 torch.float) 或布尔类型。
如果为 None,则根据 input_ids 自动创建。
返回:
分类logits,形状为(batch_size, num_classes)
""" batch_size, original_seq_len = input_ids.size()
# 1. 词嵌入
x = self.token_embedding(input_ids) # (batch_size, original_seq_len, d_model)
# 2. 添加[CLS] token
x = self.add_cls_token(x) # (batch_size, original_seq_len + 1, d_model)
new_seq_len = x.size(1) # 获取添加[CLS]后的序列长度
# 3. 位置编码
x = self.pos_encoding(x)
x = self.dropout(x)
# 4. 准备注意力掩码 (用于屏蔽padding)
if attention_mask is not None:
# 如果提供了 attention_mask,确保其为四维且为布尔类型
# 预期输入形状: (batch_size, original_seq_len)
# 目标形状: (batch_size, 1, 1, original_seq_len)
if attention_mask.dim() == 2:
# 假设非零值为有效位置
attention_mask_for_padding = (attention_mask != 0).unsqueeze(1).unsqueeze(2)
elif attention_mask.dim() == 4:
attention_mask_for_padding = (attention_mask.squeeze(1).squeeze(1) != 0).unsqueeze(1).unsqueeze(2)
else:
raise ValueError(f"attention_mask must be 2D or 4D, but got {attention_mask.dim()}D")
else:
# 如果没有提供,根据 input_ids 自动创建
# 形状: (batch_size, 1, 1, original_seq_len)
attention_mask_for_padding = self.create_padding_mask(input_ids)
# --- 关键修复:正确扩展 mask 以适应添加了 [CLS] token 后的新序列长度 --- # 创建一个针对新序列长度 (new_seq_len = original_seq_len + 1) 的掩码
# [CLS] token (索引 0) 应该总是被 attend 到,所以我们需要扩展 mask # 1. 初始化一个全为 True 的新掩码,形状 (batch_size, 1, 1, new_seq_len) expanded_mask = torch.ones((batch_size, 1, 1, new_seq_len), dtype=torch.bool, device=x.device)
# 2. 将原始 padding mask 复制到新 mask 的 [1:] 位置 (跳过 [CLS]) # 原始 mask 形状: (batch_size, 1, 1, original_seq_len)
# 新 mask 的 [1:] 部分形状: (batch_size, 1, 1, original_seq_len)
expanded_mask[:, :, :, 1:] = attention_mask_for_padding
# 最终用于注意力的掩码,形状 (batch_size, 1, 1, new_seq_len) # 在 MultiHeadAttention 中,这个掩码会被广播用于屏蔽 key (src_seq) 的 padding 位置
final_attention_mask = expanded_mask
# 5. 通过编码器层
# 将扩展后的 mask 传递给每一层,以屏蔽 padding for layer in self.encoder_layers:
x = layer(x, final_attention_mask) # 传递匹配新序列长度的 mask
# 6. 取[CLS] token作为句子表示
cls_output = x[:, 0, :] # (batch_size, d_model)
# 7. 分类
logits = self.classifier(cls_output)
return logits
# ==============================================
# 第三部分:训练流程(根据设计决策)
# ==============================================
def train_news_classifier():
"""新闻分类模型训练流程"""
# 1. 超参数设置(根据设计决策)
config = {
"vocab_size": 30000, # 词汇表大小(设计依据:新闻领域常用词)
"d_model": 768, # 模型维度(设计依据:平衡性能与计算成本)
"num_heads": 12, # 注意力头数(设计依据:与d_model匹配)
"num_layers": 6, # 编码器层数(设计依据:足够捕捉复杂关系)
"d_ff": 3072, # FFN维度(设计依据:4*d_model)
"num_classes": 10, # 分类类别数(设计依据:新闻类别数量)
"max_len": 512, # 最大序列长度(设计依据:覆盖大多数新闻)
"dropout": 0.1, # dropout概率(设计依据:防止过拟合)
"batch_size": 32, # 批量大小(设计依据:GPU内存限制)
"learning_rate": 2e-5, # 学习率(设计依据:微调预训练模型常用值)
"epochs": 3, # 训练轮数(设计依据:避免过拟合)
"warmup_steps": 500, # warmup步数(设计依据:稳定训练初期)
"weight_decay": 0.01 # 权重衰减(设计依据:正则化)
}
# 2. 创建模型
print("✅ 创建新闻分类模型...")
model = NewsClassifier(
vocab_size=config["vocab_size"],
d_model=config["d_model"],
num_heads=config["num_heads"],
num_layers=config["num_layers"],
d_ff=config["d_ff"],
num_classes=config["num_classes"],
max_len=config["max_len"],
dropout=config["dropout"]
)
# 3. 设备选择
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f" 模型将运行在: {device}")
# 4. 伪造数据集(实际应用中替换为真实数据)
class NewsDataset(Dataset):
def __init__(self, num_samples: int = 1000, max_len: int = 512):
self.num_samples = num_samples
self.max_len = max_len
def __len__(self):
return self.num_samples
def __getitem__(self, idx):
# 伪造新闻文本(词ID)
seq_len = min(500, 100 + idx % 400) # 变长序列
input_ids = torch.randint(1, 30000, (seq_len,))
# 伪造类别标签(0-9)
label = torch.tensor(idx % 10, dtype=torch.long)
return input_ids, label
# 5. 数据加载器(处理变长序列的关键)
def collate_fn(batch):
"""处理变长序列的collate函数"""
input_ids, labels = zip(*batch)
# 找出最大长度
max_len = max(len(ids) for ids in input_ids)
# padding
padded_ids = []
for ids in input_ids:
padding = torch.zeros(max_len - len(ids), dtype=torch.long)
padded_ids.append(torch.cat([ids, padding]))
input_ids = torch.stack(padded_ids)
labels = torch.stack(labels)
return input_ids, labels
print("✅ 创建数据集和数据加载器...")
train_dataset = NewsDataset(num_samples=1000)
train_loader = DataLoader(
train_dataset,
batch_size=config["batch_size"],
shuffle=True,
collate_fn=collate_fn
)
# 6. 损失函数和优化器
print("✅ 配置训练组件...")
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
model.parameters(),
lr=config["learning_rate"],
weight_decay=config["weight_decay"]
)
# 7. 学习率调度器(设计依据:warmup + linear decay)
total_steps = len(train_loader) * config["epochs"]
warmup_steps = config["warmup_steps"]
def lr_lambda(current_step: int):
if current_step < warmup_steps:
return float(current_step) / float(max(1, warmup_steps))
return max(
0.0, float(total_steps - current_step) / float(max(1, total_steps - warmup_steps))
)
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
# 8. 训练循环
print("🚀 开始训练...")
for epoch in range(config["epochs"]):
model.train()
total_loss = 0
for batch_idx, (input_ids, labels) in enumerate(train_loader):
input_ids = input_ids.to(device)
labels = labels.to(device)
# 前向传播
optimizer.zero_grad()
logits = model(input_ids)
loss = loss_fn(logits, labels)
# 反向传播
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # 梯度裁剪
optimizer.step()
scheduler.step()
total_loss += loss.item()
# 打印进度
if batch_idx % 50 == 0:
avg_loss = total_loss / (batch_idx + 1)
current_lr = optimizer.param_groups[0]['lr']
print(f"Epoch [{epoch + 1}/{config['epochs']}] | "
f"Batch [{batch_idx}/{len(train_loader)}] | "
f"Loss: {avg_loss:.4f} | " f"LR: {current_lr:.2e}")
print(f"✅ Epoch {epoch + 1} 完成 | Average Loss: {total_loss / len(train_loader):.4f}")
# 9. 保存模型
torch.save(model.state_dict(), "news_classifier.pth")
print("💾 模型已保存至 news_classifier.pth")
# ==============================================
# 第四部分:推理示例
# ==============================================
def predict_news_category(text: str, model: NewsClassifier, tokenizer, device: torch.device):
"""
新闻分类推理
设计依据:
- 使用与训练相同的预处理流程
- 取[CLS] token进行分类
参数:
text: 新闻文本
model: 训练好的模型
tokenizer: 文本分词器
device: 设备
返回:
预测类别和概率
""" model.eval()
# 1. 文本预处理
input_ids = tokenizer.encode(text, max_length=512, truncation=True, padding="max_length")
input_ids = torch.tensor(input_ids).unsqueeze(0).to(device)
# 2. 前向传播
with torch.no_grad():
logits = model(input_ids)
probs = F.softmax(logits, dim=-1)
# 3. 获取结果
predicted_class = torch.argmax(probs, dim=-1).item()
confidence = probs[0, predicted_class].item()
return predicted_class, confidence
if __name__ == "__main__":
# 这里只是演示结构,实际运行需要完整实现
print("=" * 50)
print("Transformer新闻分类模型设计与实现")
print("=" * 50)
print("\n本示例演示了如何根据任务需求设计并实现一个Transformer模型")
print("设计流程严格遵循:问题分析 → 架构选择 → 组件设计 → 训练实现")
print("\n关键设计决策:")
print("- 选择Encoder-only架构(分类任务无需生成能力)")
print("- 使用[CLS] token进行分类(BERT-style)")
print("- 可学习位置编码(更适合变长新闻文本)")
print("- 6层编码器(平衡性能与计算成本)")
print("\n要运行完整训练,请取消注释train_news_classifier()调用")
train_news_classifier()