这个是新开的一个系列用来手把手复现一些模型工程,之所以开这个系列是因为有人留言说看到一个工程不知道从哪里读起,出于对自身能力的提升与兴趣,故新开了这个系列。由于主要动机是顺一遍代码并提供注释。
该系列第一篇博客是 nanoGPT
,这个仓库是前OpenAI大佬 karpathy Andrej 的工程,基本是GPT的mini版,虽然最近一次更新是在2年前,部分组件和理念已经有些过时了,但作为入门工程而言是再适合不过的。
- 项目链接:https://github.com/karpathy/nanoGPT
- 代码分支:master
- commit:93a43d9a5c22450bbf06e78da2cb6eeef084b717
- 代码仓库:https://github.com/GaohaoZhou-ops/Model-Reproduction/tree/main/nanoGPT
写在最前面
- 在没有特殊说明的情况下不会对训练和微调进行实际操作,但会包含相关代码;
- 项目中的文件实现顺序以大标题的1,2,3 … 为依据,按照该顺序即可构建出完成工程;
- 文章会尽可能保留原始工程的目录结构,但对于一些git、assets等文件夹就不进行赘述;
- 文章的主要目的是过一遍代码,特别是模型工程的构建流程,如果你想直接使用工程建议还是去拉取原始仓库;
- 每篇文章都会在我的github仓库中对应一个文件夹,建议在阅读时配合代码仓库一起看;
- 期间中会根据情况删除、添加、翻译一些注释;
Github 仓库:https://github.com/GaohaoZhou-ops/Model-Reproduction
1. 模型构建 model.py
这个工程相对比较简单,在模型构建方面只用到了pytorch的一些基本组件,通过以下命令可以查看原始工程中 model.py
文件的库依赖关系:
【Note】:这一步需要在原始工程文件夹中执行,你也可以不执行只看我的推演过程。
(model) $ pydeps model.py --max-bacon=1
导入必要的包:
import math
import inspect
from dataclasses import dataclass
import torch
import torch.nn as nn
from torch.nn import functional as F
定义归一化层类:
因为torch自带的 layer_norm
对象没有设置 bias 为None 的选项,所以这里需要覆写一下类:
class LayerNorm(nn.Module):
def __init__(self, ndim, bias):
super().__init__()
self.weight = nn.Parameter(torch.ones(ndim))
self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None
def forward(self, input):
return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)
定义CausalSelfAttention类:
CausalSelfAttention
是一种自注意力机制,主要用于序列到序列的任务中,例如机器翻译、文本摘要等。主要特点如下:
- 仅允许注意力模型 访问当前和之前的输入,不能访问之后的输入,例如在机器翻译任务中,翻译某个词时不能访问该词之后的内容;
- 通过掩盖(masking)未来的输入来实现因果性。具体方法是在计算注意力权重时,对角线以上的权重置零,从而避免模型看到未来的信息;
- 可用于 Transformer 和其变种模型中捕捉序列的时间依赖关系。Transformer 原本是并行计算的,使用
CausalSelfAttention
赋予其顺序处理能力; CausalSelfAttention
学习到的表示更符合时间顺序,更有利于序列生成任务;- 一般用于编码器端,而解码器端则使用 MaskedSelfAttention,它允许解码器访问编码器输出的所有时刻,以使用整个输入序列的上下文信息;
class CausalSelfAttention(nn.Module):
def __init__(self, config) -> None:
super().__init__()
assert config.n_embd % config.n_head == 0 # 确保嵌入维度(n_embd)能被注意力头数(n_head)整除,每个头将分配到相同大小的子空间
self.c_attn = nn.Linear(config.n_embd, 3*config.n_head, bias=config.bias) # 线性变换层,用于生成查询(Q)、键(K)和值(V),将嵌入维度扩展为三倍,以便后续拆分为 Q、K、V。
self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias) # 将注意力输出重新投影回原始维度
self.attn_dropout = nn.Dropout(config.dropout) # 用于注意力矩阵(防止过拟合)
self.resid_dropout = nn.Dropout(config.dropout) # 用于残差连接的输出
self.n_head = config.n_head
self.n_embd = config.n_embd
self.dropout = config.dropput
self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
if not self.flash: # 判断是否支持GPU 加速的Flash Attention
print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
# 使用下三角矩阵构建因果掩码,确保只关注左侧信息(即历史信息)
self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size)).view(1,1, config.block_size, config.block_size))
def forward(self, x):
B,T,C = x.size() # batch_size, token_length, embedding_dim
q,k,v = self.c_attn(x).split(self.n_embd, dim=2) # 计算 Q、K、V
k = k.view(B, T, self.n_head, C // self.n_head).transpose(1,2) # shape=(B,nh,T,hs)
q = q.view(B, T, self.n_head, C // self.n_head).transpose(1,2) # shape=(B,nh,T,hs)
v = v.view(B, T, self.n_head, C // self.n_head).transpose(1,2) # shape=(B,nh,T,hs)
if self.flash: # 如果支持GPU加速,则使用flash attention
y = torch.nn.functional.scaled_dot_product_attention(q,k,v, attn_mask=None, dropout_p=self.dropout if self.training else 0, is_causal=True)
else: # 否则手动计算
att = (q @ k.transpose(-2,-1)) * (1.0 / math.sqrt(k.size(-1)))
att = att.masked_fill(self.bias[:,:,:T,:T] == 0, float('-inf'))
att = F.softmax(att, dim=-1)
att = self.attn_dropout(att)
y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
y = y.transpose(1,2).contiguous().view(B,T,C) # 还原为原始形状,便于后续处理
y = self.resid_dropout(self.c_proj(y))
return y
定义 MLP 类
该类用于线性映射,总体结构非常简单
class MLP(nn.Module):
def __init__(self, config) -> None:
super().__init__()
self.c_fc = nn.Linear(config.n_embd, 4*config.n_embd, bias=config.bias)
self.gelu = nn.GELU()
self.c_proj = nn.Linear(4*config.n_embd, config.n_embd, bias=config.bias)
self.dropout = nn.Dropout(config.dropout)
def forward(self, x):
x = self.c_fc(x)
x = self.gelu(x)
x = self.c_proj(x)
x = self.dropout(x)
return x
定义单个计算单元 Block
class Block(nn.Module):
def __init__(self, config) -> None:
super().__init__()
self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
self.attn = CausalSelfAttention(config)
self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
self.mlp = MLP(config)
def forward(self, x):
x = x + self.attn(self.ln_1(x))
x = x + self.mlp(self.ln_2(x))
return x
定义GPT类需要的配置对象
@dataclass
class GPTConfig:
block_size: int=1024
vocab_size: int=50304
n_layer: int=12
n_head: int=12
n_embd: int=768
dropout: float=0.0
bias: bool=True
定义完整GPT模型
在模型中使用了 AdamW
优化器,相对与Adam
的改动其实十分简单,其将权重衰减项从梯度的计算中拿出来直接加在了最后的权重更新步骤上。其提出的动机在于:原先Adam
的实现中如果采用了L2
权重衰减,则相应的权重衰减项会被直接加在loss里,从而导致动量的一阶与二阶滑动平均均考虑了该权重衰减项,而这影响了Adam
的优化效果,而将权重衰减与梯度的计算进行解耦能够显著提升Adam
的效果。
目前,AdamW现在已经成为transformer训练中的默认优化器。
class GPT(nn.Module):
def __init__(self, config:GPTConfig) -> None:
super().__init__()
assert config.vocab_size is not None # 检查配置中 词汇表大小(vocab_size)和块大小(block_size)是否有效。
assert config.block_size is not None
self.config = config
self.transformer = nn.ModuleDict(dict( # Transformer结构
wte=nn.Embedding(config.vocab_size, config.n_embd), # 词嵌入(wte):将词索引映射为向量表示,shape=(vocab_size, n_embd)
wpe=nn.Embedding(config.block_size, config.n_embd), # 位置嵌入(wpe):将位置索引映射为向量,shape=(block_size, n_embd)
drop=nn.Dropout(config.dropout),
h=nn.ModuleList([Block(config) for _ in range(config.n_layer)]), # 多个并列的注意力块,每个块由自定义 Block(config) 生成
ln_f = LayerNorm(config.n_embd, bias=config.bias), # 最终输出的层归一化
))
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False) # 语言模型头:将最后的特征映射回词汇表空间,用于词预测
self.transformer.wte.weight = self.lm_head.weight # 权重绑定:词嵌入层和输出投影层共享权重,减少参数量,提高性能
self.apply(self._init_weights) # 权重初始化
for pn, p in self.named_parameters(): # 对残差连接权重(c_proj)进行特殊初始化,符合 GPT-2 论文规范
if pn.endswith('c_proj.weight'):
torch.nn.init.normal_(p, mean=0.0, std=0.02/math.sqrt(2*config.n_layer))
print("number of parameters: %.2fM" % (self.get_num_params()/1e6,))
# 获得参数总量
def get_num_params(self, non_embedding=True):
n_params = sum(p.numel() for p in self.parameters())
if non_embedding:
n_params -= self.transformer.wpe.weight.numel()
return n_params
# 初始化整体权重
def _init_weights(self, module):
if isinstance(module, nn.Linear):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
def forward(self, idx, targets=None):
device = idx.device
b,t = idx.size()
assert t <= self.config.block_size, f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}" # 确保输入序列长度不超过配置的块大小
pos = torch.arange(0, t, dtype=torch.long, device=device) # 位置编码,shape=(t)
# 嵌入层前向计算:词嵌入 & 位置嵌入
tok_emb = self.transformer.wte(idx) # shape=(b, t, n_embd)
pos_emb = self.transformer.wpe(pos) # shape=(t, n_embd)
x = self.transformer.drop(tok_emb+pos_emb)
# 逐层通过多层注意力块(Block),最后进行层归一化
for block in self.transformer.h:
x = block(x)
x = self.transformer.ln_f(x)
if targets is not None: # 训练阶段
logits = self.lm_head(x) # 计算预测分布(logits):使用语言模型头得到词概率分布
loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1) # 交叉熵损失:比较预测值和目标值,忽略填充索引 -1
else: # 推理阶段
logits = self.lm_head(x[:, [-1], :]) # 仅计算最后一个时间步的预测,减少推理时的计算量
loss = None
return logits, loss
def crop_block_size(self, block_size):
assert block_size <= self.config.block_size
self.config.block_size = block_size
self.transformer.wpe.weight = nn.Parameter(self.transformer.wpe.weight[:block_size])
for block in self.transformer.h:
if hasattr(block.attn, 'bias'):
block.attn.bias = block.attn.bias[:, :, :block_size, :block_size]
@classmethod
def from_pretrained(cls, model_type, override_args=None):
assert model_type in {'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'}
override_args = override_args or {}
assert all(k=='dropout' for k in override_args)
from transformers import GPT2LMHeadModel
print("loading weights from pretrained gpt: %s" % model_type)
# 模型配置参数
config_args = {
'gpt2': dict(n_layer=12, n_head=12, n_embd=768), # 124M params
'gpt2-medium': dict(n_layer=24, n_head=16, n_embd=1024), # 350M params
'gpt2-large': dict(n_layer=36, n_head=20, n_embd=1280), # 774M params
'gpt2-xl': dict(n_layer=48, n_head=25, n_embd=1600), # 1558M params
}[model_type]
print("forcing vocab_size=50257, block_size=1024, bias=True")
config_args['vocab_size'] = 50257
config_args['block_size'] = 1024
config_args['bias'] = True
# 如果 dropout 在参数列表中则对齐进行覆写
if 'dropout' in override_args:
print(f"overriding dropout rate to {override_args['dropout']}")
config_args['dropout'] = override_args['dropout']
config = GPTConfig(**config_args)
model = GPT(config)
sd = model.state_dict()
sd_keys = sd.keys()
sd_keys = [k for k in sd_keys if not k.endswith('.attn.bias')]
# 加载预训练权重
model_hf = GPT2LMHeadModel.from_pretrained(model_type)
sd_hf = model_hf.state_dict()
# 因为这里是nanoGPT,所以只需要从完整GPT模型中拷贝一部分权重即可
sd_keys_hf = sd_hf.keys()
sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.masked_bias')]
sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.bias')]
transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']
assert len(sd_keys_hf) == len(sd_keys), f"mismatched keys: {len(sd_keys_hf)} != {len(sd_keys)}"
for k in sd_keys_hf:
if any(k.endswith(w) for w in transposed):
assert sd_hf[k].shape[::-1] == sd[k].shape
with torch.no_grad():
sd[k].copy_(sd_hf[k].t())
else:
assert sd_hf[k].shape == sd[k].shape
with torch.no_grad():
sd[k].copy_(sd_hf[k])
return model
# 配置 AdamW 优化器
def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
param_dict = {pn: p for pn,p in self.named_parameters()}
param_dict = {pn: p for pn,p in param_dict.items() if p.requires_grad}
decay_params = [p for n,p in param_dict.items() if p.dim() >= 2]
nodecay_params = [p for n,p in param_dict.items() if p.dim() < 2]
optim_groups = [
{'params': decay_params, 'weight_decay': weight_decay},
{'params': nodecay_params, 'weight_decay': 0.0}
]
num_decay_params = sum(p.numel() for p in decay_params)
num_nodecay_params = sum(p.numel() for p in nodecay_params)
print(f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters")
print(f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters")
fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
use_fused = fused_available and device_type == 'cuda'
extra_args = dict(fused=True) if use_fused else dict()
optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_args)
print(f"using fused AdamW: {use_fused}")
return optimizer
# 估算模型的GPU利用率
def estimate_mfu(self, fwdbwd_per_iter, dt):
N = self.get_num_params()
cfg = self.config
L,H,Q,T = cfg.n_layer, cfg.n_head, cfg.n_embd//cfg.n_head, cfg.block_size
flops_per_token = 6*N + 12*L*H*Q*T
flops_per_fwdbwd = flops_per_token * T
flops_per_iter = flops_per_fwdbwd * fwdbwd_per_iter
flops_achieved = flops_per_iter * (1.0/dt)
flops_promised = 312e12
mfu = flops_achieved / flops_promised
return mfu
@torch.no_grad()
def generate(self, idx, max_new_tokens, temperature=0.1, top_k=None):
for _ in range(max_new_tokens):
idx_cond = idx if idx.size(1) <= self.config.block_size else idx [:, -self.config.block_size:]
logits, _ = self(idx_cond)
logits = logits[:, -1, :] / temperature
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = -float('Inf')
probs = F.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1)
idx = torch.cat((idx, idx_next), dim=1)
return idx
2. 训练用的快捷配置导入 configuration.py
因为在模型训练时需要导入很多公共配置信息,这里额外给了一个文件用来从 config
文件夹中加载模型配置,这部分就是python简单的文件与sys读取。
具体用法如下:
(model) $ python train.py config/override_file.py --batch_size=32
config
文件夹结构如下:
(model) $ tree -L 2
├── config
│ ├── eval_gpt2.py
│ ├── eval_gpt2_large.py
│ ├── eval_gpt2_medium.py
│ ├── eval_gpt2_xl.py
│ ├── finetune_shakespeare.py
│ ├── train_gpt2.py
│ └── train_shakespeare_char.py
代码如下:
import sys
from ast import literal_eval
for arg in sys.argv[1:]:
if '=' not in arg:
assert not arg.startswith('--')
config_file = arg
print(f"Overriding config with {config_file}:")
with open(config_file) as f:
print(f.read())
exec(open(config_file).read())
else:
assert arg.startswith('--')
key, val = arg.split('=')
key = key[2:]
if key in globals():
try:
attempt = literal_eval(val)
except (SyntaxError, ValueError):
attempt = val
assert type(attempt) == type(globals()[key])
print(f"Overriding: {key} = {attempt}")
globals()[key] = attempt
else:
raise ValueError(f"Unknown config key: {key}")
3. 训练模型 train.py
定义好模型后通常是进行训练,这里的训练是基于huggingface上的预训练权重进行的。
导入必要的包
import os, time, math, pickle
from contextlib import nullcontext
import numpy as np
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
from model import GPTConfig, GPT
配置模型超参数
# I/O 相关
out_dir = 'out'
eval_interval = 2000
log_interval = 1
eval_iters = 200
eval_only = False
always_save_checkpoint = True
init_from = 'scratch' # 'scratch' or 'resume' or 'gpt2*'
# wandb logging 日志
wandb_log = False
wandb_project = 'owt'
wandb_run_name = 'gpt2'
# data
dataset = 'openwebtext'
gradient_accumulation_steps = 5 * 8
batch_size = 12
block_size = 1024
# model
n_layer = 12
n_head = 12
n_embd = 768
dropout = 0.0
bias = False
# adamw optimizer
learning_rate = 6e-4
max_iters = 600000
weight_decay = 1e-1
beta1 = 0.9
beta2 = 0.95
grad_clip = 1.0
# learning rate 衰减
decay_lr = True
warmup_iters = 2000
lr_decay_iters = 600000
min_lr = 6e-5
# DDP settings
backend = 'nccl' # 'nccl', 'gloo', etc.
# system
device = 'cuda' # 'cpu', 'cuda', 'cuda:0', 'cuda:1', 'mps'
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16' # 'float32', 'bfloat16', 'float16'
compile = True # 使用 PyTorch 2.0 编译得到更快的模型
config_keys = [k for k,v in globals().items() if not k.startswith('_') and isinstance(v, (int, float, bool, str))]
exec(open('configurator.py').read())
config = {k: globals()[k] for k in config_keys}
配置多GPU并行计算
ddp = int(os.environ.get('RANK', -1)) != -1
if ddp:
init_process_group(backend=backend)
ddp_rank = int(os.environ['RANK'])
ddp_local_rank = int(os.environ['LOCAL_RANK'])
ddp_world_size = int(os.environ['WORLD_SIZE'])
device = f'cuda:{ddp_local_rank}'
torch.cuda.set_device(device)
master_process = ddp_rank == 0
seed_offset = ddp_rank
assert gradient_accumulation_steps % ddp_world_size == 0
gradient_accumulation_steps //= ddp_world_size
else:
master_process = True
seed_offset = 0
ddp_world_size = 1
tokens_per_iter = gradient_accumulation_steps * ddp_world_size * batch_size * block_size
print(f"tokens per iteration will be: {tokens_per_iter:,}")
初始化torch的一些训练用配置
if master_process:
os.makedirs(out_dir, exist_ok=True)
torch.manual_seed(1337 + seed_offset)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
device_type = 'cuda' if 'cuda' in device else 'cpu'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type=device_type, dtype=ptdtype)
data_dir = os.path.join('data', dataset)
def get_batch(split):
if split == 'train':
data = np.memmap(os.path.join(data_dir, 'train.bin'), dtype=np.uint16, mode='r')
else:
data = np.memmap(os.path.join(data_dir, 'val.bin'), dtype=np.uint16, mode='r')
ix = torch.randint(len(data) - block_size, (batch_size,))
x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in ix])
if device_type == 'cuda':
x, y = x.pin_memory().to(device, non_blocking=True), y.pin_memory().to(device, non_blocking=True)
else:
x, y = x.to(device), y.to(device)
return x, y
iter_num = 0
best_val_loss = 1e9
meta_path = os.path.join(data_dir, 'meta.pkl')
meta_vocab_size = None
if os.path.exists(meta_path):
with open(meta_path, 'rb') as f:
meta = pickle.load(f)
meta_vocab_size = meta['vocab_size']
print(f"found vocab_size = {meta_vocab_size} (inside {meta_path})")
初始化模型
model_args = dict(n_layer=n_layer, n_head=n_head, n_embd=n_embd, block_size=block_size,
bias=bias, vocab_size=None, dropout=dropout)
# 根据配置信息初始化模型
if init_from == 'scratch':
print("Initializing a new model from scratch")
if meta_vocab_size is None:
print("defaulting to vocab_size of GPT-2 to 50304 (50257 rounded up for efficiency)")
model_args['vocab_size'] = meta_vocab_size if meta_vocab_size is not None else 50304
gptconf = GPTConfig(**model_args)
model = GPT(gptconf)
elif init_from == 'resume':
print(f"Resuming training from {out_dir}")
ckpt_path = os.path.join(out_dir, 'ckpt.pt')
checkpoint = torch.load(ckpt_path, map_location=device)
checkpoint_model_args = checkpoint['model_args']
for k in ['n_layer', 'n_head', 'n_embd', 'block_size', 'bias', 'vocab_size']:
model_args[k] = checkpoint_model_args[k]
gptconf = GPTConfig(**model_args)
model = GPT(gptconf)
state_dict = checkpoint['model']
unwanted_prefix = '_orig_mod.'
for k,v in list(state_dict.items()):
if k.startswith(unwanted_prefix):
state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)
model.load_state_dict(state_dict)
iter_num = checkpoint['iter_num']
best_val_loss = checkpoint['best_val_loss']
elif init_from.startswith('gpt2'):
print(f"Initializing from OpenAI GPT-2 weights: {init_from}")
override_args = dict(dropout=dropout)
model = GPT.from_pretrained(init_from, override_args)
for k in ['n_layer', 'n_head', 'n_embd', 'block_size', 'bias', 'vocab_size']:
model_args[k] = getattr(model.config, k)
if block_size < model.config.block_size:
model.crop_block_size(block_size)
model_args['block_size'] = block_size
model.to(device)
# 初始化 GradScaler
scaler = torch.cuda.amp.GradScaler(enabled=(dtype == 'float16'))
配置优化器
optimizer = model.configure_optimizers(weight_decay, learning_rate, (beta1, beta2), device_type)
if init_from == 'resume':
optimizer.load_state_dict(checkpoint['optimizer'])
checkpoint = None
Pytorch 2.0 编译模型(可选)
if compile:
print("compiling the model... (takes a ~minute)")
unoptimized_model = model
model = torch.compile(model)
# 配置分布式训练
if ddp:
model = DDP(model, device_ids=[ddp_local_rank])
定义评估损失函数
@torch.no_grad()
def estimate_loss():
out = {}
model.eval()
for split in ['train', 'val']:
losses = torch.zeros(eval_iters)
for k in range(eval_iters):
X,Y = get_batch(split)
with ctx:
logits, loss = model(X,Y)
losses[k] = loss.item()
out[split] = losses.mean()
model.train()
return out
定义学习率衰减计划
def get_lr(it):
if it < warmup_iters:
return learning_rate * (it + 1) / (warmup_iters + 1)
if it > warmup_iters:
return min_lr
decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters)
assert 0 <= decay_ratio <= 1
coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))
return min_lr + coeff * (learning_rate - min_lr)
初始化日志
if wandb_log and master_process:
import wandb
wandb.init(project=wandb_project, name=wandb_run_name, config=config)
启动训练
X, Y = get_batch('train')
t0 = time.time()
local_iter_num = 0
raw_model = model.module if ddp else model
running_mfu = -1.0
while True:
lr = get_lr(iter_num) if decay_lr else learning_rate
for param_group in optimizer.param_groups:
param_group['lr'] = lr
# 根据当前迭代的次数进行日志与eval
if iter_num % eval_interval == 0 and master_process:
losses = estimate_loss()
print(f"step {iter_num}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
if wandb_log:
wandb.log({
"iter": iter_num,
"train/loss": losses['train'],
"val/loss": losses['val'],
"lr": lr,
"mfu": running_mfu*100, # convert to percentage
})
if losses['val'] < best_val_loss or always_save_checkpoint:
best_val_loss = losses['val']
if iter_num > 0:
checkpoint = {
'model': raw_model.state_dict(),
'optimizer': optimizer.state_dict(),
'model_args': model_args,
'iter_num': iter_num,
'best_val_loss': best_val_loss,
'config': config,
}
print(f"saving checkpoint to {out_dir}")
torch.save(checkpoint, os.path.join(out_dir, 'ckpt.pt'))
if iter_num == 0 and eval_only:
break
# 前向传播与训练
for micro_step in range(gradient_accumulation_steps):
if ddp:
model.require_backward_grad_sync = (micro_step == gradient_accumulation_steps - 1)
with ctx:
logits, loss = model(X, Y)
loss = loss / gradient_accumulation_steps
X, Y = get_batch('train')
# 反向传播更新梯度
scaler.scale(loss).backward()
# 优化器根据梯度对参数进行更新
if grad_clip != 0.0:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad(set_to_none=True)
# 训练期间的日志与计时
t1 = time.time()
dt = t1 - t0
t0 = t1
if iter_num % log_interval == 0 and master_process:
lossf = loss.item() * gradient_accumulation_steps
if local_iter_num >= 5:
mfu = raw_model.estimate_mfu(batch_size * gradient_accumulation_steps, dt)
running_mfu = mfu if running_mfu == -1.0 else 0.9*running_mfu + 0.1*mfu
print(f"iter {iter_num}: loss {lossf:.4f}, time {dt*1000:.2f}ms, mfu {running_mfu*100:.2f}%")
iter_num += 1
local_iter_num += 1
if iter_num > max_iters:
break
# 终止分布式配置
if ddp:
destroy_process_group()
4. 使用示例 sample.py
在定义好模型结构、训练流程后就可以调用使用示例了,这里的使用示例就是一个纯推理。
导入必要的包
import os, pickle, torch, tiktoken
from contextlib import nullcontext
from model import GPTConfig, GPT
配置超参数
init_from = 'resume' # 'resume' or 'gpt2-xl'
out_dir = 'out'
start = "\n"
num_samples = 10
max_new_tokens = 500
temperature = 0.8
top_k = 200
seed = 1337
device = 'cuda' # 'cpu', 'cuda', 'cuda:0', 'cuda:1'
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16' # 'float32' or 'bfloat16' or 'float16'
compile = False # PyTorch 2.0 加速编译
exec(open('configurator.py').read())
配置torch
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
device_type = 'cuda' if 'cuda' in device else 'cpu'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type=device_type, dtype=ptdtype)
初始化模型
if init_from == 'resume':
ckpt_path = os.path.join(out_dir, 'ckpt.pt')
checkpoint = torch.load(ckpt_path, map_location=device)
gptconf = GPTConfig(**checkpoint['model_args'])
model = GPT(gptconf)
state_dict = checkpoint['model']
unwanted_prefix = '_orig_mod.'
for k,v in list(state_dict.items()):
if k.startswith(unwanted_prefix):
state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)
model.load_state_dict(state_dict)
elif init_from.startswith('gpt2'):
model = GPT.from_pretrained(init_from, dict(dropout=0.0))
model.eval()
model.to(device)
if compile:
model = torch.compile(model)
初始化环境
load_meta = False
if init_from == 'resume' and 'config' in checkpoint and 'dataset' in checkpoint['config']:
meta_path = os.path.join('data', checkpoint['config']['dataset'], 'meta.pkl')
load_meta = os.path.exists(meta_path)
if load_meta:
print(f"Loading meta from {meta_path}...")
with open(meta_path, 'rb') as f:
meta = pickle.load(f)
stoi, itos = meta['stoi'], meta['itos']
encode = lambda s: [stoi[c] for c in s]
decode = lambda l: ''.join([itos[i] for i in l])
else:
print("No meta.pkl found, assuming GPT-2 encodings...")
enc = tiktoken.get_encoding("gpt2")
encode = lambda s: enc.encode(s, allowed_special={"<|endoftext|>"})
decode = lambda l: enc.decode(l)
对提示词进行编码
if start.startswith('FILE:'):
with open(start[5:], 'r', encoding='utf-8') as f:
start = f.read()
start_ids = encode(start)
x = (torch.tensor(start_ids, dtype=torch.long, device=device)[None, ...])
进行测试
with torch.no_grad():
with ctx:
for k in range(num_samples):
y = model.generate(x, max_new_tokens, temperature=temperature, top_k=top_k)
print(decode(y[0].tolist()))
print('---------------')
5. benchmark评估 bench.py
在训练和测试完模型后通常需要对其进行一次bench评估,如果要发论文的话这步是必不可少的环节。
导入必要的包
import os
from contextlib import nullcontext
import numpy as np
import time
import torch
from model import GPTConfig, GPT
配置超参数
batch_size = 12
block_size = 1024
bias = False
real_data = True
seed = 1337
device = 'cuda' # 'cpu', 'cuda', 'cuda:0', 'cuda:1'
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16' # 'float32' or 'bfloat16' or 'float16'
compile = True # PyTorch 2.0 加速编译
profile = False # 是否使用 pytorch profiler 或只进行简单评估
exec(open('configurator.py').read())
配置torch
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
device_type = 'cuda' if 'cuda' in device else 'cpu'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type=device_type, dtype=ptdtype)
加载数据并初始化 - openwebtext
if real_data:
dataset = 'openwebtext'
data_dir = os.path.join('data', dataset)
train_data = np.memmap(os.path.join(data_dir, 'train.bin'), dtype=np.uint16, mode='r')
def get_batch(split):
data = train_data
ix = torch.randint(len(data) - block_size, (batch_size,))
x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in ix])
x, y = x.pin_memory().to(device, non_blocking=True), y.pin_memory().to(device, non_blocking=True)
return x, y
else: # 直接给一堆噪声
x = torch.randint(50304, (batch_size, block_size), device=device)
y = torch.randint(50304, (batch_size, block_size), device=device)
get_batch = lambda split: (x, y)
模型初始化
gptconf = GPTConfig(
block_size = block_size,
n_layer = 12, n_head = 12, n_embd = 768,
dropout = 0,
bias = bias,
)
model = GPT(gptconf)
model.to(device)
optimizer = model.configure_optimizers(weight_decay=1e-2, learning_rate=1e-4, betas=(0.9, 0.95), device_type=device_type)
if compile:
print("Compiling model...")
model = torch.compile(model) # pytorch 2.0
benchmark评估
if profile:
wait, warmup, active = 5, 5, 5
num_steps = wait + warmup + active
with torch.profiler.profile(
activities=[torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.CUDA],
schedule=torch.profiler.schedule(wait=wait, warmup=warmup, active=active, repeat=1),
on_trace_ready=torch.profiler.tensorboard_trace_handler('./bench_log'),
record_shapes=False,
profile_memory=False,
with_stack=False,
with_flops=True,
with_modules=False,
) as prof:
X, Y = get_batch('train')
for k in range(num_steps):
with ctx:
logits, loss = model(X, Y)
X, Y = get_batch('train')
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
lossf = loss.item()
print(f"{k}/{num_steps} loss: {lossf:.4f}")
prof.step()
else: # simple benchmarking
torch.cuda.synchronize()
for stage, num_steps in enumerate([10, 20]):
t0 = time.time()
X, Y = get_batch('train')
for k in range(num_steps):
with ctx:
logits, loss = model(X, Y)
X, Y = get_batch('train')
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
lossf = loss.item()
print(f"{k}/{num_steps} loss: {lossf:.4f}")
torch.cuda.synchronize()
t1 = time.time()
dt = t1-t0
mfu = model.estimate_mfu(batch_size * 1 * num_steps, dt)
if stage == 1:
print(f"time per iteration: {dt/num_steps*1000:.4f}ms, MFU: {mfu*100:.2f}%")