为pytorch前向和反向的Tensor生成描述性统计
在调试Megatron-DeepSpeed的精度时,我们希望对比每一层前向和反向传播的输入输出误差。然而,由于数据量过大,直接保存所有数据不太现实。因此,我们生成了输入输出tensor的描述性统计信息,并等间隔抽样N个数据点,以比较这些点的相对误差,从而查找精度异常的位置。为了准确定位,我们通过类名和对象ID生成唯一的对象名称(形式为[类名-创建的第几个])以及前向和反向传播的次数。通过保存上述信息,我们可以详细记录并回溯当时的实际输入输出数据。
代码
cat > linear_test.py <<-'EOF'
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from datetime import datetime
# 设置设备
device = "cpu"
if torch.cuda.is_available():
device = "cuda:4"
def is_tensor(val):
# 判断是否为tensor或Parameter
return isinstance(val, (torch.Tensor, nn.Parameter))
def describe_tensor(tensor):
# 返回tensor的描述,包括形状和部分数据统计信息
shape = list(tensor.shape)
tensor_data = tensor.cpu().float().detach().numpy().ravel()
num_points = min(16, len(tensor_data))
indices = np.linspace(0, len(tensor_data) - 1, num_points, dtype=int)
stats = [np.max(tensor_data), np.min(tensor_data), np.mean(tensor_data), np.std(tensor_data)]
sample_data = tensor_data[indices]
stats_str = ",".join(f"{x:.5f}" for x in stats)
sample_str = ",".join(f"{x:.5f}" for x in sample_data)
return f"{shape}-{stats_str},{sample_str}"
def generate_random_data(shape):
# 生成符合指定形状的随机数据
max_val, min_val, mean, std = 0.04025, -0.04651, 0.0, 0.00134
data = np.random.normal(mean, std, shape)
data = (data - data.min()) / (data.max() - data.min()) * (max_val - min_val) + min_val
return data
index_counter = 0
def log_tensor_data(name, tensor):
# 打印tensor的日志数据
global index_counter
index_counter += 1
timestamp = datetime.now().strftime("%H%M%S%f")
if is_tensor(tensor):
print(f"{timestamp},{index_counter},{name},0,{describe_tensor(tensor)}")
elif isinstance(tensor, (tuple, list)):
for idx, t in enumerate(tensor):
if is_tensor(t):
print(f"{timestamp},{index_counter},{name},{idx},{describe_tensor(t)}")
def log_gradient(model):
# 打印模型参数梯度信息
for name, param in model.named_parameters():
if param.grad is not None:
log_tensor_data(f"grad-{name}", param.grad)
# 对象和类名缓存
object_cache = {}
class_name_count = {}
def get_unique_name(class_name, obj_id):
# 生成唯一的对象名称
if class_name not in class_name_count:
class_name_count[class_name] = 0
uid = f"{class_name}_{obj_id}"
if uid not in object_cache:
class_name_count[class_name] += 1
object_cache[uid] = {"idx": class_name_count[class_name]}
return f'{class_name}-{object_cache[uid]["idx"]}'
def initialize_module_attributes(module):
# 初始化模块属性
if not hasattr(module, 'uuid'):
module.uuid = get_unique_name(module.__class__.__name__, id(module))
if not hasattr(module, 'backward_step'):
module.backward_step = 0
if not hasattr(module, 'forward_step'):
module.forward_step = 0
def forward_decorator():
# 包装forward函数的修饰器
def decorator(func):
def wrapped(*args, **kwargs):
module = args[0]
initialize_module_attributes(module)
module.forward_step += 1
log_tensor_data(f"forward-{module.uuid}-{module.forward_step}-input", args)
output = func(*args, **kwargs)
log_tensor_data(f"forward-{module.uuid}-{module.forward_step}-output", output)
return output
return wrapped
return decorator
def pre_backward_hook(module, grad_input):
# 反向传播前的钩子函数
initialize_module_attributes(module)
module.backward_step += 1
log_tensor_data(f"backward-{module.uuid}-{module.backward_step}-input", grad_input)
def post_backward_hook(module, grad_input, grad_output):
# 反向传播后的钩子函数
initialize_module_attributes(module)
log_tensor_data(f"backward-{module.uuid}-{module.backward_step}-output", grad_output)
def register_backward_hooks(module):
# 注册反向传播钩子
module.register_full_backward_pre_hook(pre_backward_hook)
module.register_full_backward_hook(post_backward_hook)
class CustomLinear(nn.Module):
def __init__(self, shape):
super(CustomLinear, self).__init__()
weight_data = torch.from_numpy(generate_random_data(shape)).half().to(device)
self.weight = nn.Parameter(weight_data)
self.register_parameter('bias', None)
register_backward_hooks(self)
@forward_decorator()
def forward(self, input_):
return F.linear(input_, self.weight, self.bias)
class MyModel(nn.Module):
def __init__(self):
super(MyModel, self).__init__()
self.layer1 = CustomLinear((5504, 4096))
self.layer2 = CustomLinear((4096, 5504))
@forward_decorator()
def forward(self, input_):
out = self.layer1(input_)
out = self.layer2(out)
return out
# 设置随机种子
np.random.seed(1)
torch.manual_seed(2)
# 创建和训练模型
model = MyModel().half().to(device)
model.train()
input_data = torch.from_numpy(generate_random_data((1024, 12, 4096))).half().to(device)
target_data = torch.from_numpy(generate_random_data((1024, 12, 4096))).half().to(device)
for _ in range(2):
outputs = model(input_data)
outputs.backward(target_data) # 使用全一的梯度来反向传播
log_gradient(model)
EOF
python3 linear_test.py