Training a Language Model in PyTorch

Published: 2025-09-04

🏗️ The Complete Language Model Training Pipeline

1. Data Preparation and Preprocessing

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re

class TextDataset(Dataset):
    def __init__(self, text, seq_length=50):
        self.seq_length = seq_length
        self.text = text
        
        # Build the vocabulary and encode the text
        self._build_vocab()
        self._encode_text()
    
    def _build_vocab(self):
        # Simple text cleaning and tokenization
        tokens = re.findall(r'\w+|[^\w\s]', self.text.lower())
        token_counts = Counter(tokens)
        
        # Build the vocabulary (keep the 10,000 most common tokens)
        self.vocab = {
            '<PAD>': 0,  # padding
            '<UNK>': 1,  # unknown token
            '<SOS>': 2,  # start of sequence
            '<EOS>': 3   # end of sequence
        }
        
        # Add the most common tokens
        for i, (token, _) in enumerate(token_counts.most_common(10000), start=4):
            self.vocab[token] = i
        
        self.idx2word = {idx: word for word, idx in self.vocab.items()}
        self.vocab_size = len(self.vocab)
    
    def _encode_text(self):
        tokens = re.findall(r'\w+|[^\w\s]', self.text.lower())
        self.encoded = [self.vocab.get(token, 1) for token in tokens]  # 1 is <UNK>
    
    def __len__(self):
        return len(self.encoded) - self.seq_length
    
    def __getitem__(self, idx):
        # Input sequence
        input_seq = self.encoded[idx:idx + self.seq_length]
        # Target sequence (the same window shifted by one token)
        target_seq = self.encoded[idx + 1:idx + self.seq_length + 1]
        
        return torch.tensor(input_seq), torch.tensor(target_seq)

# Sample text data
sample_text = """
Natural language processing is an important field of artificial intelligence.
Deep learning models such as the Transformer perform well on NLP tasks.
A language model predicts the probability distribution of the next word.
PyTorch provides a flexible deep learning framework.
"""

dataset = TextDataset(sample_text, seq_length=20)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
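
A quick sanity check, as a minimal sketch assuming the `dataset` and `dataloader` defined above, confirms the vocabulary size and the shapes each batch yields:

# Inspect the vocabulary and one batch to verify shapes before training
print(f"Vocabulary size: {dataset.vocab_size}")
print(f"Number of training windows: {len(dataset)}")

inputs, targets = next(iter(dataloader))
print(inputs.shape, targets.shape)  # both (batch_size, seq_length); the last batch may be smaller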

2. Defining the Language Model Architecture

class LanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, num_layers=2, dropout=0.2):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        
        # Token embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # LSTM layers (a Transformer could be used instead)
        self.lstm = nn.LSTM(
            embedding_dim, 
            hidden_dim, 
            num_layers, 
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        
        # Output projection to the vocabulary
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, hidden=None):
        # Token embedding
        embedded = self.embedding(x)  # (batch_size, seq_len, embedding_dim)
        
        # LSTM forward pass
        lstm_out, hidden = self.lstm(embedded, hidden)
        
        # Project to vocabulary logits
        output = self.fc(self.dropout(lstm_out))  # (batch_size, seq_len, vocab_size)
        
        return output, hidden
    
    def init_hidden(self, batch_size):
        """Initialize the hidden and cell states."""
        weight = next(self.parameters())
        return (
            weight.new_zeros(self.lstm.num_layers, batch_size, self.hidden_dim),
            weight.new_zeros(self.lstm.num_layers, batch_size, self.hidden_dim)
        )

# Create a model instance
model = LanguageModel(vocab_size=dataset.vocab_size)
print(f"Number of model parameters: {sum(p.numel() for p in model.parameters()):,}")

3. The Training Loop

def train_language_model(model, dataloader, num_epochs=10, learning_rate=0.001):
    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # ignore the padding token
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
    
    model.train()
    
    for epoch in range(num_epochs):
        total_loss = 0
        hidden = None
        
        for batch_idx, (inputs, targets) in enumerate(dataloader):
            # Zero the gradients
            optimizer.zero_grad()
            
            # Detach the carried hidden state from the previous graph;
            # reinitialize it when the batch size changes (e.g. the last, smaller batch)
            if hidden is not None and hidden[0].size(1) == inputs.size(0):
                hidden = (hidden[0].detach(), hidden[1].detach())
            else:
                hidden = model.init_hidden(inputs.size(0))
            
            # Forward pass
            outputs, hidden = model(inputs, hidden)
            
            # Compute the loss:
            # reshape the outputs to (batch_size * seq_len, vocab_size)
            # and the targets to (batch_size * seq_len)
            loss = criterion(
                outputs.reshape(-1, outputs.size(-1)), 
                targets.reshape(-1)
            )
            
            # Backward pass
            loss.backward()
            
            # Gradient clipping (prevents exploding gradients)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            
            # Update the parameters
            optimizer.step()
            
            total_loss += loss.item()
            
            if batch_idx % 10 == 0:
                print(f'Epoch {epoch+1}/{num_epochs}, '
                      f'Batch {batch_idx}/{len(dataloader)}, '
                      f'Loss: {loss.item():.4f}')
        
        # Step the learning rate scheduler once per epoch
        scheduler.step()
        
        avg_loss = total_loss / len(dataloader)
        print(f'Epoch {epoch+1} completed, Average Loss: {avg_loss:.4f}')
    
    return model

# Start training
trained_model = train_language_model(model, dataloader, num_epochs=20)

4. Text Generation (Inference)

def generate_text(model, start_text, dataset, max_length=50, temperature=0.8):
    model.eval()
    
    # Encode the starting text
    tokens = re.findall(r'\w+|[^\w\s]', start_text.lower())
    input_ids = [dataset.vocab.get(token, 1) for token in tokens]
    
    generated = input_ids.copy()
    hidden = None
    
    with torch.no_grad():
        for _ in range(max_length):
            # Prepare the input
            input_tensor = torch.tensor([input_ids]).long()
            
            # Forward pass
            output, hidden = model(input_tensor, hidden)
            
            # Take the output at the last time step and apply the temperature
            last_output = output[0, -1, :] / temperature
            
            # Softmax to get a probability distribution
            probabilities = torch.softmax(last_output, dim=-1)
            
            # Sample the next token from the distribution
            next_token_id = torch.multinomial(probabilities, 1).item()
            
            # Stop at the end-of-sequence token
            if next_token_id == 3:  # <EOS>
                break
                
            generated.append(next_token_id)
            input_ids = [next_token_id]  # feed the generated token as the next input
    
    # Convert the IDs back to text
    generated_text = ' '.join([dataset.idx2word.get(idx, '<UNK>') for idx in generated])
    return generated_text

# Text generation example
start_text = "natural language processing"
generated = generate_text(trained_model, start_text, dataset, max_length=20)
print(f"Generated text: {generated}")

5. Using a Transformer Architecture (the Modern Choice)

class TransformerLanguageModel(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        
        # Token embedding
        self.embedding = nn.Embedding(vocab_size, d_model)
        
        # Positional encoding (a simple learned position embedding, up to 1000 positions)
        self.pos_encoder = nn.Embedding(1000, d_model)
        
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, 
            nhead=nhead, 
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Output layer
        self.fc = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        batch_size, seq_len = x.size()
        
        # Token embedding (scaled by sqrt(d_model))
        x = self.embedding(x) * (self.d_model ** 0.5)
        
        # Positional encoding
        positions = torch.arange(seq_len, device=x.device).expand(batch_size, seq_len)
        x = x + self.pos_encoder(positions)
        
        # Causal mask so each position attends only to earlier positions
        # (required for next-token language modeling)
        causal_mask = nn.Transformer.generate_square_subsequent_mask(seq_len).to(x.device)
        
        # Transformer
        x = self.dropout(x)
        transformer_out = self.transformer(x, mask=causal_mask)
        
        # Output
        output = self.fc(transformer_out)
        return output

# Instantiate the Transformer model
transformer_model = TransformerLanguageModel(vocab_size=dataset.vocab_size)
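
The Transformer variant returns only logits (there is no hidden state to carry), so it does not plug directly into train_language_model above. A minimal sketch of the corresponding training step, assuming the same dataloader and loss setup:

# One training step for the Transformer language model
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer_model.parameters(), lr=1e-3)

transformer_model.train()
for inputs, targets in dataloader:
    optimizer.zero_grad()
    logits = transformer_model(inputs)  # (batch_size, seq_len, vocab_size)
    loss = criterion(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
    loss.backward()
    torch.nn.utils.clip_grad_norm_(transformer_model.parameters(), max_norm=1.0)
    optimizer.step()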

6. Saving and Loading the Model

def save_model(model, vocab, filepath):
    """保存模型和词汇表"""
    torch.save({
        'model_state_dict': model.state_dict(),
        'vocab': vocab,
        'model_config': {
            'vocab_size': len(vocab),
            'embedding_dim': model.embedding_dim,
            'hidden_dim': model.hidden_dim,
            'num_layers': model.lstm.num_layers
        }
    }, filepath)

def load_model(filepath):
    """加载模型"""
    checkpoint = torch.load(filepath)
    
    model = LanguageModel(**checkpoint['model_config'])
    model.load_state_dict(checkpoint['model_state_dict'])
    
    return model, checkpoint['vocab']

# Save the model
save_model(trained_model, dataset.vocab, 'language_model.pth')
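
A short usage sketch for load_model, assuming the checkpoint saved above; the returned vocabulary is the word-to-ID dict, so the reverse mapping has to be rebuilt if you want to decode generated IDs:

# Reload the checkpoint and switch to inference mode
loaded_model, loaded_vocab = load_model('language_model.pth')
loaded_model.eval()

# Rebuild the reverse mapping for decoding token IDs back to words
idx2word = {idx: word for word, idx in loaded_vocab.items()}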

🎯 Key Tricks and Best Practices

1. Data Loading Optimization

# More efficient data loading
from torch.utils.data import DataLoader

dataloader = DataLoader(
    dataset, 
    batch_size=64, 
    shuffle=True,
    num_workers=4,  # multi-process loading
    pin_memory=True  # speeds up host-to-GPU transfers when training on GPU
)
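
pin_memory=True only pays off when batches are actually moved to the GPU. A minimal sketch of the device handling inside the training loop, assuming a CUDA device is available:

# Move the model once, then move each batch inside the training loop
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

for inputs, targets in dataloader:
    # non_blocking=True lets the host-to-device copy overlap with compute
    # when the source tensors live in pinned memory
    inputs = inputs.to(device, non_blocking=True)
    targets = targets.to(device, non_blocking=True)
    # ... forward / backward / step as in the training loop above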

2. Gradient Accumulation (to Simulate Larger Batches)

# criterion, optimizer, and model are defined as in the training loop above
accumulation_steps = 4
hidden = None
optimizer.zero_grad()

for i, (inputs, targets) in enumerate(dataloader):
    # Detach the carried hidden state, or reinitialize it if the batch size changes
    if hidden is not None and hidden[0].size(1) == inputs.size(0):
        hidden = (hidden[0].detach(), hidden[1].detach())
    else:
        hidden = model.init_hidden(inputs.size(0))
    
    outputs, hidden = model(inputs, hidden)
    loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
    loss = loss / accumulation_steps  # normalize the loss across accumulation steps
    loss.backward()
    
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
        hidden = None  # reset the hidden state

3. Learning Rate Scheduling

# A smoother learning rate schedule (call scheduler.step() once per epoch, as above)
from torch.optim.lr_scheduler import CosineAnnealingLR

scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-6)

💡 Summary

The key steps in training a language model:

  1. Data preprocessing: build the vocabulary and encode the text
  2. Model architecture: choose an LSTM or a Transformer
  3. Training loop: forward pass, loss computation, backward pass
  4. Text generation: use the trained model to generate text
  5. Model saving: persist the model weights and the vocabulary

Recommendations

  • Start experimenting with a small dataset
  • Use a GPU to accelerate training (model.cuda())
  • Monitor the training loss and the validation perplexity; see the sketch after this list
  • Try different hyperparameter combinations
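
A minimal sketch of perplexity evaluation, assuming a held-out val_dataloader built the same way as dataloader; perplexity is the exponential of the average cross-entropy per non-padding token:

import math

def evaluate_perplexity(model, val_dataloader):
    """Exponential of the average per-token cross-entropy on held-out data."""
    criterion = nn.CrossEntropyLoss(ignore_index=0, reduction='sum')
    model.eval()
    total_loss, total_tokens = 0.0, 0
    with torch.no_grad():
        for inputs, targets in val_dataloader:
            hidden = model.init_hidden(inputs.size(0))
            outputs, _ = model(inputs, hidden)
            total_loss += criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1)).item()
            total_tokens += (targets != 0).sum().item()
    return math.exp(total_loss / total_tokens)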

This framework scales to training much larger language models; the main changes are more data, a bigger model, and longer training.

