Python 训练营打卡 Day 45

发布于:2025-06-07 ⋅ 阅读:(14) ⋅ 点赞:(0)

TensorBoard

简单来说,TensorBoard 是 TensorFlow 自带的一个「可视化工具」,就像给机器学习模型训练过程装了一个「监控屏幕」。你可以用它直观看到训练过程中的数据变化(比如损失值、准确率)、模型结构、数据分布等,不用盯着一堆枯燥的数字看,对新手非常友好,目前这个工具还在不断发展,比如一些额外功能在tensorboardX上存在,但是我们目前只需要要用到最经典的几个功能即可

  1. 保存模型结构图
  2. 保存训练集和验证集的loss变化曲线,不需要手动打印了
  3. 保存每一个层结构权重分布
  4. 保存预测图片的预测信息

原理:TensorBoard的核心原理就是在训练过程中,把训练过程中的数据(比如损失、准确率、图片等)先记录到日志文件里,再通过工具把这些日志文件可视化成图表,这样就不用自己手动打印数据或者用其他工具画图,所以核心就是2个步骤:

1. 数据怎么存?—— 先写日志文件

训练模型时,TensorBoard 会让程序把训练数据(比如损失值、准确率)和模型结构等信息,写入一个特殊的日志文件(.tfevents 文件)

2. 数据怎么看?—— 用网页展示日志

写完日志后,TensorBoard会启动一个本地网页服务,自动读取日志文件里的数据,用图表、图像、文本等形式展示出来。如果只用 print(损失值) 或者自己用 matplotlib 画图,不仅麻烦,还得手动保存数据、写代码,尤其训练几天几夜时,根本没法实时盯着看。而 TensorBoard 能自动把这些数据 “存下来 + 画出来”,还能生成网页版的可视化界面,随时刷新查看!

常用到的核心代码解析:

# 1.日志目录自动管理:自动创建新目录,避免重复覆盖
log_dir = 'runs/cifar10_mlp_experiment' #设置存储目录
if os.path.exists(log_dir):
# 检查目录是否存在
    i = 1
    while os.path.exists(f"{log_dir}_{i}"):
        i += 1
    log_dir = f"{log_dir}_{i}"
writer = SummaryWriter(log_dir) #关键入口,用于写入数据到日志目录

# 2.记录标量数据:跟踪训练过程中的损失、准确率等
# 记录每个 Batch 的损失和准确率
writer.add_scalar('Train/Batch_Loss', batch_loss, global_step)
writer.add_scalar('Train/Batch_Accuracy', batch_acc, global_step)

# 记录每个 Epoch 的训练指标
writer.add_scalar('Train/Epoch_Loss', epoch_train_loss, epoch)
writer.add_scalar('Train/Epoch_Accuracy', epoch_train_acc, epoch)

# 3.可视化结构模型:记录模型的计算图
dataiter = iter(train_loader)  # 从训练数据集中获取一个批次的图像和标签
images, labels = next(dataiter)  # 获取第一个批次的数据
images = images.to(device) 
writer.add_graph(model, images)  # 通过真实输入样本生成模型计算图

# 4.可视化图像:记录训练过程中的原始图像和预测结果
# 可视化原始训练图像
img_grid = torchvision.utils.make_grid(images[:8].cpu()) # 将多张图像拼接成网格状(方便可视化),将前8张图像拼接成一个网格
writer.add_image('原始训练图像', img_grid)  # 记录原始图像

# 可视化错误预测样本(训练结束后)
wrong_img_grid = torchvision.utils.make_grid(wrong_images[:display_count])
writer.add_image('错误预测样本', wrong_img_grid)  # 记录错误预测的图像

# 5.记录权重和梯度直方图
if (batch_idx + 1) % 500 == 0:
    for name, param in model.named_parameters():
        writer.add_histogram(f'weights/{name}', param, global_step)  # 权重分布
        if param.grad is not None:
            writer.add_histogram(f'grads/{name}', param.grad, global_step)  # 梯度分布

CIFAR-10实例(MLP模型训练)

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import matplotlib.pyplot as plt
import os

# 设置随机种子以确保结果可复现
torch.manual_seed(42)
np.random.seed(42)

# 1. 数据预处理
transform = transforms.Compose([
    transforms.ToTensor(),                # 转换为张量
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 标准化处理
])

# 2. 加载CIFAR-10数据集
train_dataset = datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform
)

test_dataset = datasets.CIFAR10(
    root='./data',
    train=False,
    transform=transform
)

# 3. 创建数据加载器,用于批量处理数据
batch_size = 64  # 每个批次的样本数
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) # 训练集打乱
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)  # 测试集不打乱

# CIFAR-10的类别名称
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# 4. 定义MLP模型
class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        self.flatten = nn.Flatten()  # 将3x32x32的图像展平为3072维向量
        self.layer1 = nn.Linear(3072, 512)  # 第一层:3072个输入,512个神经元
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.2)  # 添加Dropout防止过拟合
        self.layer2 = nn.Linear(512, 256)  # 第二层:512个输入,256个神经元
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.2)
        self.layer3 = nn.Linear(256, 10)  # 输出层:10个类别
        
    def forward(self, x):
        # 第一步:将输入图像展平为一维向量
        x = self.flatten(x)  # 输入尺寸: [batch_size, 3, 32, 32] → [batch_size, 3072]
        
        # 第一层全连接 + 激活 + Dropout
        x = self.layer1(x)   # 线性变换: [batch_size, 3072] → [batch_size, 512]
        x = self.relu1(x)    # 应用ReLU激活函数
        x = self.dropout1(x) # 训练时随机丢弃部分神经元输出
        
        # 第二层全连接 + 激活 + Dropout
        x = self.layer2(x)   # 线性变换: [batch_size, 512] → [batch_size, 256]
        x = self.relu2(x)    # 应用ReLU激活函数
        x = self.dropout2(x) # 训练时随机丢弃部分神经元输出
        
        # 第三层(输出层)全连接
        x = self.layer3(x)   # 线性变换: [batch_size, 256] → [batch_size, 10]
        
        return x  # 返回未经过Softmax的logits

# 检查GPU是否可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 初始化模型
model = MLP()
model = model.to(device)  # 将模型移至GPU

criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adam优化器,学习率为0.001

# 创建TensorBoard的SummaryWriter,指定日志保存目录
log_dir = 'runs/cifar10_mlp_experiment'
# 如果目录已存在,添加后缀避免覆盖
if os.path.exists(log_dir):
    i = 1
    while os.path.exists(f"{log_dir}_{i}"):
        i += 1
    log_dir = f"{log_dir}_{i}"
writer = SummaryWriter(log_dir)

# 5. 训练模型(使用TensorBoard记录各种信息)
def train(model, train_loader, test_loader, criterion, optimizer, device, epochs, writer):
    model.train()  # 设置为训练模式
    
    # 记录训练开始时间,用于计算训练速度
    global_step = 0
    
    # 可视化模型结构
    dataiter = iter(train_loader)  # 获取一个批次的样本
    images, labels = next(dataiter)  # 获取图像和标签
    images = images.to(device) 
    writer.add_graph(model, images)  # 添加模型图
    
    # 可视化原始图像样本
    img_grid = torchvision.utils.make_grid(images[:8].cpu())  # 取前8张图像
    writer.add_image('原始训练图像', img_grid) 
    
    for epoch in range(epochs):
        running_loss = 0.0 #累计损失
        correct = 0  #正确预测数
        total = 0 #总样本数
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)  # 移至GPU
            
            optimizer.zero_grad()  # 梯度清零
            output = model(data)  # 前向传播
            loss = criterion(output, target)  # 计算损失
            loss.backward()  # 反向传播
            optimizer.step()  # 更新参数
            
            # 统计准确率和损失
            running_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()
            
            # 每100个批次记录一次信息到TensorBoard
            if (batch_idx + 1) % 100 == 0:
                batch_loss = loss.item()
                batch_acc = 100. * correct / total
                
                # 记录标量数据(损失、准确率)
                writer.add_scalar('Train/Batch_Loss', batch_loss, global_step)
                writer.add_scalar('Train/Batch_Accuracy', batch_acc, global_step)
                
                # 记录学习率
                writer.add_scalar('Train/Learning_Rate', optimizer.param_groups[0]['lr'], global_step)
                
                # 每500个批次记录一次直方图(权重和梯度)
                if (batch_idx + 1) % 500 == 0:
                    for name, param in model.named_parameters():
                        writer.add_histogram(f'weights/{name}', param, global_step)
                        if param.grad is not None:
                            writer.add_histogram(f'grads/{name}', param.grad, global_step)
                
                print(f'Epoch: {epoch+1}/{epochs} | Batch: {batch_idx+1}/{len(train_loader)} '
                      f'| 单Batch损失: {batch_loss:.4f} | 累计平均损失: {running_loss/(batch_idx+1):.4f}')
            
            global_step += 1
        
        # 计算当前epoch的平均训练损失和准确率
        epoch_train_loss = running_loss / len(train_loader)
        epoch_train_acc = 100. * correct / total
        
        # 记录每个epoch的训练损失和准确率
        writer.add_scalar('Train/Epoch_Loss', epoch_train_loss, epoch)
        writer.add_scalar('Train/Epoch_Accuracy', epoch_train_acc, epoch)
        
        # 测试阶段
        model.eval()  # 设置为评估模式
        test_loss = 0
        correct_test = 0
        total_test = 0
        
        # 用于存储预测错误的样本
        wrong_images = []
        wrong_labels = []
        wrong_preds = []
        
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                test_loss += criterion(output, target).item()
                _, predicted = output.max(1)
                total_test += target.size(0)
                correct_test += predicted.eq(target).sum().item()
                
                # 收集预测错误的样本
                wrong_mask = (predicted != target).cpu()
                if wrong_mask.sum() > 0:
                    wrong_batch_images = data[wrong_mask].cpu()
                    wrong_batch_labels = target[wrong_mask].cpu()
                    wrong_batch_preds = predicted[wrong_mask].cpu()
                    
                    wrong_images.extend(wrong_batch_images)
                    wrong_labels.extend(wrong_batch_labels)
                    wrong_preds.extend(wrong_batch_preds)
        
        epoch_test_loss = test_loss / len(test_loader)
        epoch_test_acc = 100. * correct_test / total_test
        
        # 记录每个epoch的测试损失和准确率
        writer.add_scalar('Test/Loss', epoch_test_loss, epoch)
        writer.add_scalar('Test/Accuracy', epoch_test_acc, epoch)
        
        # 计算并记录训练速度(每秒处理的样本数)
        # 这里简化处理,假设每个epoch的时间相同
        samples_per_epoch = len(train_loader.dataset)
        # 实际应用中应该使用time.time()来计算真实时间
        
        print(f'Epoch {epoch+1}/{epochs} 完成 | 训练准确率: {epoch_train_acc:.2f}% | 测试准确率: {epoch_test_acc:.2f}%')
        
        # 可视化预测错误的样本(只在最后一个epoch进行)
        if epoch == epochs - 1 and len(wrong_images) > 0:
            # 最多显示8个错误样本
            display_count = min(8, len(wrong_images))
            wrong_img_grid = torchvision.utils.make_grid(wrong_images[:display_count])
            
            # 创建错误预测的标签文本
            wrong_text = []
            for i in range(display_count):
                true_label = classes[wrong_labels[i]]
                pred_label = classes[wrong_preds[i]]
                wrong_text.append(f'True: {true_label}, Pred: {pred_label}')
            
            writer.add_image('错误预测样本', wrong_img_grid)
            writer.add_text('错误预测标签', '\n'.join(wrong_text), epoch)
    
    # 关闭TensorBoard写入器
    writer.close()
    
    return epoch_test_acc  # 返回最终测试准确率

# 6. 执行训练和测试
epochs = 20  # 训练轮次
print("开始训练模型...")
print(f"TensorBoard日志保存在: {log_dir}")
print("训练完成后,使用命令 `tensorboard --logdir=runs` 启动TensorBoard查看可视化结果")

final_accuracy = train(model, train_loader, test_loader, criterion, optimizer, device, epochs, writer)
print(f"训练完成!最终测试准确率: {final_accuracy:.2f}%")