Generating Descriptive Statistics for PyTorch Forward and Backward Tensors

When debugging precision issues in Megatron-DeepSpeed, we want to compare the input/output errors of every layer's forward and backward passes. The volume of data is far too large to simply dump everything, so instead we generate descriptive statistics for each input/output tensor and sample N evenly spaced data points, then compare the relative error at those points to locate where the precision diverges. To pinpoint the exact layer, we derive a unique object name from the class name and object ID (in the form [ClassName-nth instance created]) and also record the forward and backward step counts. With this information saved, we can later trace back to the actual inputs and outputs at the point where the anomaly occurred.
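
The comparison itself is not part of the script below; as a rough illustration of the idea, the sampled values from a baseline run and a test run could be compared like this (a minimal sketch, where baseline and test are hypothetical arrays holding the sampled points parsed from the two logs):

import numpy as np

def relative_error(baseline, test, eps=1e-8):
    # element-wise relative error between the sampled points of two runs
    baseline = np.asarray(baseline, dtype=np.float64)
    test = np.asarray(test, dtype=np.float64)
    return np.abs(test - baseline) / (np.abs(baseline) + eps)

# flag a layer whose sampled points drift beyond a chosen threshold, e.g.
# if relative_error(baseline, test).max() > 1e-2: print("precision anomaly")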

Code

cat > linear_test.py <<-'EOF'
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from datetime import datetime

# Select the device (falls back to CPU if CUDA is unavailable)
device = "cpu"

if torch.cuda.is_available():
    device = "cuda:4"

def is_tensor(val):
    # Check whether val is a torch.Tensor or nn.Parameter
    return isinstance(val, (torch.Tensor, nn.Parameter))

def describe_tensor(tensor):
    # Return a description of the tensor: shape, summary statistics (max/min/mean/std), and up to 16 evenly spaced samples
    shape = list(tensor.shape)
    tensor_data = tensor.cpu().float().detach().numpy().ravel()
    num_points = min(16, len(tensor_data))
    indices = np.linspace(0, len(tensor_data) - 1, num_points, dtype=int)
    stats = [np.max(tensor_data), np.min(tensor_data), np.mean(tensor_data), np.std(tensor_data)]
    sample_data = tensor_data[indices]
    stats_str = ",".join(f"{x:.5f}" for x in stats)
    sample_str = ",".join(f"{x:.5f}" for x in sample_data)
    return f"{shape}-{stats_str},{sample_str}"

def generate_random_data(shape):
    # Generate random data of the given shape and rescale it to [min_val, max_val]
    max_val, min_val, mean, std = 0.04025, -0.04651, 0.0, 0.00134
    data = np.random.normal(mean, std, shape)
    data = (data - data.min()) / (data.max() - data.min()) * (max_val - min_val) + min_val
    return data

index_counter = 0

def log_tensor_data(name, tensor):
    # Log a tensor (or each tensor in a tuple/list) as one comma-separated line
    global index_counter
    index_counter += 1
    timestamp = datetime.now().strftime("%H%M%S%f")
    if is_tensor(tensor):
        print(f"{timestamp},{index_counter},{name},0,{describe_tensor(tensor)}")
    elif isinstance(tensor, (tuple, list)):
        for idx, t in enumerate(tensor):
            if is_tensor(t):
                print(f"{timestamp},{index_counter},{name},{idx},{describe_tensor(t)}")

def log_gradient(model):
    # Log the gradient of every model parameter that has one
    for name, param in model.named_parameters():
        if param.grad is not None:
            log_tensor_data(f"grad-{name}", param.grad)

# Caches: one entry per object, plus a per-class instance counter
object_cache = {}
class_name_count = {}

def get_unique_name(class_name, obj_id):
    # Build a unique object name of the form ClassName-<n-th instance created>
    if class_name not in class_name_count:
        class_name_count[class_name] = 0
    uid = f"{class_name}_{obj_id}"
    if uid not in object_cache:
        class_name_count[class_name] += 1
        object_cache[uid] = {"idx": class_name_count[class_name]}
    return f'{class_name}-{object_cache[uid]["idx"]}'

def initialize_module_attributes(module):
    # Attach bookkeeping attributes to the module: unique name and forward/backward step counters
    if not hasattr(module, 'uuid'):
        module.uuid = get_unique_name(module.__class__.__name__, id(module))
    if not hasattr(module, 'backward_step'):
        module.backward_step = 0
    if not hasattr(module, 'forward_step'):
        module.forward_step = 0

def forward_decorator():
    # Decorator that logs a module's forward inputs and outputs
    def decorator(func):
        def wrapped(*args, **kwargs):
            module = args[0]
            initialize_module_attributes(module)
            module.forward_step += 1
            log_tensor_data(f"forward-{module.uuid}-{module.forward_step}-input", args)
            output = func(*args, **kwargs)
            log_tensor_data(f"forward-{module.uuid}-{module.forward_step}-output", output)
            return output
        return wrapped
    return decorator

def pre_backward_hook(module, grad_output):
    # Runs before the module's backward pass; grad_output is the gradient
    # w.r.t. the module's outputs, i.e. the gradient flowing into backward
    initialize_module_attributes(module)
    module.backward_step += 1
    log_tensor_data(f"backward-{module.uuid}-{module.backward_step}-input", grad_output)

def post_backward_hook(module, grad_input, grad_output):
    # Runs after the module's backward pass; log grad_input, the gradient
    # w.r.t. the module's inputs, i.e. the result of the backward computation
    initialize_module_attributes(module)
    log_tensor_data(f"backward-{module.uuid}-{module.backward_step}-output", grad_input)

def register_backward_hooks(module):
    # Register backward pre- and post-hooks on the module
    module.register_full_backward_pre_hook(pre_backward_hook)
    module.register_full_backward_hook(post_backward_hook)

class CustomLinear(nn.Module):
    def __init__(self, shape):
        super(CustomLinear, self).__init__()
        weight_data = torch.from_numpy(generate_random_data(shape)).half().to(device)
        self.weight = nn.Parameter(weight_data)
        self.register_parameter('bias', None)
        register_backward_hooks(self)

    @forward_decorator()
    def forward(self, input_):
        return F.linear(input_, self.weight, self.bias)

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.layer1 = CustomLinear((5504, 4096))
        self.layer2 = CustomLinear((4096, 5504))

    @forward_decorator()
    def forward(self, input_):
        out = self.layer1(input_)
        out = self.layer2(out)
        return out

# Set random seeds for reproducibility
np.random.seed(1)
torch.manual_seed(2)

# Create the model and put it in training mode
model = MyModel().half().to(device)
model.train()

input_data = torch.from_numpy(generate_random_data((1024, 12, 4096))).half().to(device)
target_data = torch.from_numpy(generate_random_data((1024, 12, 4096))).half().to(device)

for _ in range(2):
    outputs = model(input_data)
    outputs.backward(target_data)  # backpropagate using target_data as the upstream gradient
    log_gradient(model)
EOF
python3 linear_test.py
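
Each line the script prints has the form timestamp,index,name,tensor_idx,[shape]-max,min,mean,std,sample_1,...,sample_16. To compare two runs, each run's stdout can be redirected to a file and the numeric payload parsed back out; the helper below is a minimal sketch of such a parser (its name and structure are our own, not part of the script):

def parse_log_line(line):
    # split off the prefix fields, then the shape, then the numeric payload
    prefix, rest = line.rstrip().split(",[", 1)   # prefix: timestamp,index,name,tensor_idx
    shape_str, payload = rest.split("]-", 1)      # shape_str: e.g. "1024, 12, 4096"
    timestamp, index, name, tensor_idx = prefix.split(",")
    values = [float(x) for x in payload.split(",")]
    stats, samples = values[:4], values[4:]       # max, min, mean, std, then up to 16 samples
    return name, int(tensor_idx), stats, samples

Matching lines from the two runs by name and tensor_idx then yields pairs of sample vectors that can be fed to a relative-error comparison like the one sketched earlier.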