🏗️ A Complete Language Model Training Pipeline
1. Data Preparation and Preprocessing
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re
class TextDataset(Dataset):
    def __init__(self, text, seq_length=50):
        self.seq_length = seq_length
        self.text = text
        # Build the vocabulary and encode the text
        self._build_vocab()
        self._encode_text()

    def _build_vocab(self):
        # Simple text cleaning and tokenization
        tokens = re.findall(r'\w+|[^\w\s]', self.text.lower())
        token_counts = Counter(tokens)
        # Create the vocabulary (keep the 10,000 most frequent tokens)
        self.vocab = {
            '<PAD>': 0,  # padding
            '<UNK>': 1,  # unknown token
            '<SOS>': 2,  # start of sequence
            '<EOS>': 3   # end of sequence
        }
        # Add the most frequent tokens
        for i, (token, _) in enumerate(token_counts.most_common(10000), start=4):
            self.vocab[token] = i
        self.idx2word = {idx: word for word, idx in self.vocab.items()}
        self.vocab_size = len(self.vocab)

    def _encode_text(self):
        tokens = re.findall(r'\w+|[^\w\s]', self.text.lower())
        self.encoded = [self.vocab.get(token, 1) for token in tokens]  # 1 is <UNK>

    def __len__(self):
        return len(self.encoded) - self.seq_length

    def __getitem__(self, idx):
        # Input sequence
        input_seq = self.encoded[idx:idx + self.seq_length]
        # Target sequence (shifted by one token: predict the next token)
        target_seq = self.encoded[idx + 1:idx + self.seq_length + 1]
        return torch.tensor(input_seq), torch.tensor(target_seq)
# Example text data (a tiny toy corpus; real training needs far more text)
sample_text = """
Natural language processing is an important field of artificial intelligence.
Deep learning models such as the Transformer perform very well on NLP tasks.
A language model predicts a probability distribution over the next token.
PyTorch provides a flexible framework for deep learning.
"""
dataset = TextDataset(sample_text, seq_length=20)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
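A quick sanity check of the dataset is worthwhile before training; this sketch decodes one (input, target) pair back into tokens to confirm that the target really is the input shifted by one position:

```python
# Inspect one training pair: the target is the input shifted by one token
x, y = dataset[0]
print("input :", [dataset.idx2word[i.item()] for i in x])
print("target:", [dataset.idx2word[i.item()] for i in y])
print("vocab size:", dataset.vocab_size)
```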
2. Defining the Language Model Architecture
class LanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, num_layers=2, dropout=0.2):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        # Token embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # LSTM layers (a Transformer works here as well; see section 5)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        # Output projection to vocabulary logits
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden=None):
        # Token embeddings
        embedded = self.embedding(x)  # (batch_size, seq_len, embedding_dim)
        # LSTM forward pass
        lstm_out, hidden = self.lstm(embedded, hidden)
        # Project to vocabulary logits
        output = self.fc(self.dropout(lstm_out))  # (batch_size, seq_len, vocab_size)
        return output, hidden

    def init_hidden(self, batch_size):
        """Initialize the hidden state (h, c) with zeros."""
        weight = next(self.parameters())
        return (
            weight.new_zeros(self.lstm.num_layers, batch_size, self.hidden_dim),
            weight.new_zeros(self.lstm.num_layers, batch_size, self.hidden_dim)
        )

# Create a model instance
model = LanguageModel(vocab_size=dataset.vocab_size)
print(f"Number of parameters: {sum(p.numel() for p in model.parameters()):,}")
3. The Training Loop
def train_language_model(model, dataloader, num_epochs=10, learning_rate=0.001):
    # Loss and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # ignore the <PAD> token
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        hidden = None
        for batch_idx, (inputs, targets) in enumerate(dataloader):
            # Reset gradients
            optimizer.zero_grad()
            # (Re)initialize the hidden state when needed (first batch, or when the
            # batch size changes); otherwise detach it so backpropagation is
            # truncated at the batch boundary
            if hidden is None or hidden[0].size(1) != inputs.size(0):
                hidden = model.init_hidden(inputs.size(0))
            else:
                hidden = (hidden[0].detach(), hidden[1].detach())
            # Forward pass
            outputs, hidden = model(inputs, hidden)
            # Compute the loss:
            # reshape outputs to (batch_size * seq_len, vocab_size)
            # and targets to (batch_size * seq_len)
            loss = criterion(
                outputs.reshape(-1, outputs.size(-1)),
                targets.reshape(-1)
            )
            # Backward pass
            loss.backward()
            # Gradient clipping (guards against exploding gradients)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            # Update parameters
            optimizer.step()

            total_loss += loss.item()
            if batch_idx % 10 == 0:
                print(f'Epoch {epoch+1}/{num_epochs}, '
                      f'Batch {batch_idx}/{len(dataloader)}, '
                      f'Loss: {loss.item():.4f}')
        # Update the learning rate
        scheduler.step()
        avg_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch+1} completed, Average Loss: {avg_loss:.4f}')
    return model

# Start training
trained_model = train_language_model(model, dataloader, num_epochs=20)
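Perplexity, the exponential of the average cross-entropy, is the standard intrinsic metric for language models (it is also mentioned in the summary below). A minimal evaluation sketch, assuming a held-out `val_dataloader` built the same way as `dataloader`:

```python
import math

def evaluate_perplexity(model, val_dataloader):
    """Compute average cross-entropy and perplexity on a held-out set."""
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    model.eval()
    total_loss, num_batches = 0.0, 0
    with torch.no_grad():
        for inputs, targets in val_dataloader:
            outputs, _ = model(inputs)  # fresh hidden state per batch
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
            total_loss += loss.item()
            num_batches += 1
    avg_loss = total_loss / max(num_batches, 1)
    return avg_loss, math.exp(avg_loss)

# Example: avg_loss, ppl = evaluate_perplexity(trained_model, val_dataloader)
```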
4. Text Generation (Inference)
def generate_text(model, start_text, dataset, max_length=50, temperature=0.8):
    model.eval()
    # Encode the prompt
    tokens = re.findall(r'\w+|[^\w\s]', start_text.lower())
    input_ids = [dataset.vocab.get(token, 1) for token in tokens]
    generated = input_ids.copy()
    hidden = None

    with torch.no_grad():
        for _ in range(max_length):
            # Prepare the input tensor
            input_tensor = torch.tensor([input_ids]).long()
            # Forward pass (the hidden state carries the context forward)
            output, hidden = model(input_tensor, hidden)
            # Take the logits of the last time step and apply temperature
            last_output = output[0, -1, :] / temperature
            # Softmax to get a probability distribution
            probabilities = torch.softmax(last_output, dim=-1)
            # Sample the next token from the distribution
            next_token_id = torch.multinomial(probabilities, 1).item()
            # Stop at the end-of-sequence token
            if next_token_id == 3:  # <EOS>
                break
            generated.append(next_token_id)
            input_ids = [next_token_id]  # feed the sampled token back in
    # Convert IDs back to text
    generated_text = ' '.join(dataset.idx2word.get(idx, '<UNK>') for idx in generated)
    return generated_text

# Text generation example
start_text = "natural language processing"
generated = generate_text(trained_model, start_text, dataset, max_length=20)
print(f"Generated text: {generated}")
5. Using a Transformer Architecture (the Modern Choice)
class TransformerLanguageModel(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        # Token embedding
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Positional encoding (simple learned position embeddings, max length 1000)
        self.pos_encoder = nn.Embedding(1000, d_model)
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # Output layer
        self.fc = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        batch_size, seq_len = x.size()
        # Token embeddings, scaled as in the original Transformer
        x = self.embedding(x) * (self.d_model ** 0.5)
        # Add positional embeddings
        positions = torch.arange(seq_len, device=x.device).expand(batch_size, seq_len)
        x = x + self.pos_encoder(positions)
        # Causal mask so each position attends only to earlier positions
        causal_mask = torch.triu(
            torch.full((seq_len, seq_len), float('-inf'), device=x.device), diagonal=1
        )
        # Transformer encoder
        x = self.dropout(x)
        transformer_out = self.transformer(x, mask=causal_mask)
        # Project to vocabulary logits
        output = self.fc(transformer_out)
        return output

# Instantiate the Transformer model
transformer_model = TransformerLanguageModel(vocab_size=dataset.vocab_size)
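The training loop in section 3 assumes the LSTM interface (logits plus a hidden state). A minimal sketch of how the inner step changes for the Transformer variant, which takes only the token IDs and returns logits:

```python
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer_model.parameters(), lr=1e-3)

transformer_model.train()
for epoch in range(5):  # small number of epochs for the toy corpus
    for inputs, targets in dataloader:
        optimizer.zero_grad()
        logits = transformer_model(inputs)  # (batch_size, seq_len, vocab_size)
        loss = criterion(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(transformer_model.parameters(), max_norm=1.0)
        optimizer.step()
    print(f"epoch {epoch+1}: loss {loss.item():.4f}")
```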
6. Saving and Loading the Model
def save_model(model, vocab, filepath):
    """Save model weights, vocabulary, and configuration."""
    torch.save({
        'model_state_dict': model.state_dict(),
        'vocab': vocab,
        'model_config': {
            'vocab_size': len(vocab),
            'embedding_dim': model.embedding_dim,
            'hidden_dim': model.hidden_dim,
            'num_layers': model.lstm.num_layers
        }
    }, filepath)

def load_model(filepath):
    """Rebuild the model from a checkpoint."""
    checkpoint = torch.load(filepath)
    model = LanguageModel(**checkpoint['model_config'])
    model.load_state_dict(checkpoint['model_state_dict'])
    return model, checkpoint['vocab']

# Save the trained model
save_model(trained_model, dataset.vocab, 'language_model.pth')
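For inference in a fresh process, the checkpoint is enough to rebuild both the model and the decoding table; a short usage sketch:

```python
# Reload the checkpoint and rebuild the reverse vocabulary for decoding
loaded_model, loaded_vocab = load_model('language_model.pth')
idx2word = {idx: word for word, idx in loaded_vocab.items()}
loaded_model.eval()  # switch to inference mode before generating
```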
🎯 Key Techniques and Best Practices
1. Data Loading Optimization
# More efficient data loading
from torch.utils.data import DataLoader

dataloader = DataLoader(
    dataset,
    batch_size=64,
    shuffle=True,
    num_workers=4,    # load batches in multiple worker processes
    pin_memory=True   # speeds up host-to-GPU transfers
)
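pin_memory only pays off when batches are actually copied to the GPU, ideally asynchronously. A minimal device-handling sketch, which also covers the `model.cuda()` advice in the summary below:

```python
# Pick the device once and move both the model and each batch to it
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

for inputs, targets in dataloader:
    inputs = inputs.to(device, non_blocking=True)    # async copy from pinned memory
    targets = targets.to(device, non_blocking=True)
    # ... forward / backward as in the training loop above
```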
2. 梯度累积(处理大batch)
accumulation_steps = 4
optimizer.zero_grad()
for i, (inputs, targets) in enumerate(dataloader):
outputs, hidden = model(inputs, hidden)
loss = criterion(outputs.view(-1, outputs.size(-1)), targets.view(-1))
loss = loss / accumulation_steps # 归一化损失
loss.backward()
if (i + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
hidden = None # 重置隐藏状态
3. Learning-Rate Scheduling
# A more advanced learning-rate schedule: cosine annealing over the training run
from torch.optim.lr_scheduler import CosineAnnealingLR

scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-6)
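Transformer-style models usually also benefit from a warmup phase before decay. A minimal sketch using LambdaLR, where `warmup_steps` is an assumed hyperparameter and `scheduler.step()` is called once per batch:

```python
from torch.optim.lr_scheduler import LambdaLR

warmup_steps = 500  # assumed value; tune for your dataset

def lr_lambda(step):
    # Linear warmup followed by inverse-square-root decay
    if step < warmup_steps:
        return (step + 1) / warmup_steps
    return (warmup_steps / (step + 1)) ** 0.5

scheduler = LambdaLR(optimizer, lr_lambda=lr_lambda)
```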
💡 Summary
The key steps in training a language model:
- Data preprocessing: build the vocabulary and encode the text
- Model architecture: choose an LSTM or a Transformer
- Training loop: forward pass, loss computation, backpropagation
- Text generation: sample from the trained model
- Model saving: persist the weights and the vocabulary
Suggestions:
- Start experimenting with a small dataset
- Use a GPU to speed up training (model.cuda() or model.to(device), as in the data-loading sketch above)
- Monitor the training loss and the validation perplexity
- Try different hyperparameter combinations
This framework scales up to larger language models; what changes is mainly the amount of data, the model size, and the training time.