Python training day 43: review day

Published: 2025-06-28
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import numpy as np
 
# Font configuration for matplotlib (SimHei; also render minus signs correctly)
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams['axes.unicode_minus'] = False
 
# Check whether a GPU is available and prefer it for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
 
# 1. Data preprocessing (inputs are resized to 128x128 to match the augmentation pipeline)
# Training-set augmentation: randomly crop a region of the original image, then resize it to 128x128
# (this assumes the source images are large enough to crop from; otherwise adjust the strategy)
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(128, scale=(0.6, 1.0)),  # random crop covering 60-100% of the image, resized to 128x128
    transforms.RandomHorizontalFlip(),  # random horizontal flip
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),  # color jitter
    transforms.RandomRotation(15),  # random rotation of up to 15 degrees
    transforms.ToTensor(),  # convert to tensor
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))  # normalize (commonly used CIFAR-10 statistics; ideally recompute from this dataset, see the sketch after the test transform)
])
 
# Test-set preprocessing: resize directly to 128x128 so all inputs share the same size
test_transform = transforms.Compose([
    transforms.Resize((128, 128)),  # resize to 128x128
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
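 
# The Normalize constants above are the commonly used CIFAR-10 statistics. A minimal,
# optional sketch (an assumption, not part of the original script) for computing this
# dataset's own per-channel mean/std once and plugging the numbers into Normalize:
def compute_channel_stats(root, size=128, batch_size=64):
    """Return the per-channel mean and std of all images under an ImageFolder root."""
    ds = datasets.ImageFolder(
        root=root,
        transform=transforms.Compose([transforms.Resize((size, size)), transforms.ToTensor()])
    )
    loader = DataLoader(ds, batch_size=batch_size, shuffle=False)
    channel_sum = torch.zeros(3)
    channel_sq_sum = torch.zeros(3)
    n_pixels = 0
    for imgs, _ in loader:
        channel_sum += imgs.sum(dim=(0, 2, 3))
        channel_sq_sum += (imgs ** 2).sum(dim=(0, 2, 3))
        n_pixels += imgs.shape[0] * imgs.shape[2] * imgs.shape[3]
    mean = channel_sum / n_pixels
    std = (channel_sq_sum / n_pixels - mean ** 2).sqrt()
    return mean, std
# Example (uncomment to run once on the dataset root used below):
# print(compute_channel_stats(r"D:\python_learning\cyl_python\day43CNN_kaggle\BengaliFishImages\fish_images"))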
 
 
 
# 2. Build the full dataset and split it 80/20 into train and test sets
#    (note: random_split keeps the parent dataset's transform, so the test subset below also
#    receives the training augmentation; an optional fix is sketched after the split)
full_dataset = datasets.ImageFolder(
    root=r"D:\python_learning\cyl_python\day43CNN_kaggle\BengaliFishImages\fish_images",  
    transform=train_transform
)
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])
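 
# Optional sketch (an assumption, not the original approach): to keep augmentation off the
# test split, build two ImageFolder views of the same folder, one per transform, and split
# them by a shared index permutation instead of sharing one transformed dataset:
# from torch.utils.data import Subset
# train_view = datasets.ImageFolder(root=full_dataset.root, transform=train_transform)
# test_view = datasets.ImageFolder(root=full_dataset.root, transform=test_transform)
# indices = torch.randperm(len(full_dataset)).tolist()
# train_dataset = Subset(train_view, indices[:train_size])
# test_dataset = Subset(test_view, indices[train_size:])
# (downstream code that reads test_dataset.dataset.classes keeps working with this pattern)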
 
# 3. Create the data loaders that feed batches to the model
batch_size = 32  # larger images use more GPU memory, so keep the batch size modest
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
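 
# Optional (an assumption, not in the original script): on machines with spare CPU cores,
# worker processes and pinned memory usually speed up feeding the GPU, e.g.:
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
#                           num_workers=4, pin_memory=True)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,
#                          num_workers=4, pin_memory=True)
# (on Windows, using num_workers > 0 requires running the script under an
#  if __name__ == "__main__": guard)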
 
# 4. Define a CNN adapted to 128x128 inputs (the fully connected input size must be recomputed accordingly)
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()  
        
        # Conv block 1: 3 input channels -> 32 output channels
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2, 2)  # 128 -> 64
        
        # Conv block 2: 32 -> 64 channels
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(2)  # 64 -> 32
        
        # Conv block 3: 64 -> 128 channels
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(2)  # 32 -> 16
        
        # Conv block 4 (optional extra depth): 128 -> 256 channels
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.relu4 = nn.ReLU()
        self.pool4 = nn.MaxPool2d(2)  # 16 -> 8
        
        # Fully connected input size, derived from the feature-map size after the conv stack:
        # four 2x2 poolings shrink 128 down to 8, so the flattened size is 256 * 8 * 8.
        # With only three poolings it would be 128 / 2**3 = 16, i.e. 128 * 16 * 16.
        # (A sanity-check sketch follows the model instantiation below.)
        self.fc1 = nn.Linear(256 * 8 * 8, 512)  # assumes four poolings down to 8x8
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(512, 20)  # 20 output classes, matching the dataset
 
    def forward(self, x):
        # conv block 1
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        
        # conv block 2
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        
        # conv block 3
        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu3(x)
        x = self.pool3(x)
        
        # conv block 4
        x = self.conv4(x)
        x = self.bn4(x)
        x = self.relu4(x)
        x = self.pool4(x)
        
        # flatten the feature map and run the classifier head
        x = x.view(x.size(0), -1)  # 256 * 8 * 8 features per sample, matching fc1 above
        x = self.fc1(x)
        x = torch.relu(x)  # ReLU for the fully connected layer (stateless, so no module instance is needed)
        x = self.dropout(x)
        x = self.fc2(x)
        
        return x
 
# Instantiate the model and move it to the GPU if available
model = CNN().to(device)
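 
# Optional sanity check (an assumption, not part of the original script): run a dummy batch
# through the convolutional blocks to confirm the 256 * 8 * 8 flatten size used by fc1.
model.eval()  # eval mode so the dummy pass does not touch BatchNorm running statistics
with torch.no_grad():
    _x = torch.zeros(2, 3, 128, 128, device=device)
    _x = model.pool1(model.relu1(model.bn1(model.conv1(_x))))
    _x = model.pool2(model.relu2(model.bn2(model.conv2(_x))))
    _x = model.pool3(model.relu3(model.bn3(model.conv3(_x))))
    _x = model.pool4(model.relu4(model.bn4(model.conv4(_x))))
    print(f"Feature map shape before flatten: {tuple(_x.shape[1:])}")  # expected (256, 8, 8)
model.train()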
 
# Define the loss function, optimizer, and learning-rate scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', patience=3, factor=0.5
)
 
# 5. Training and evaluation loop (same logic as in earlier days, reused as-is)
def train(model, train_loader, test_loader, criterion, optimizer, scheduler, device, epochs):
    all_iter_losses = []    # loss of every batch (iteration)
    iter_indices = []       # global iteration index
    train_acc_history = []  # per-epoch training accuracy
    test_acc_history = []   # per-epoch test accuracy
    train_loss_history = [] # per-epoch training loss
    test_loss_history = []  # per-epoch test loss
    
    for epoch in range(epochs):
        model.train()  # back to training mode at the start of every epoch (evaluation below switches to eval mode)
        running_loss = 0.0
        correct = 0
        total = 0
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)  # move the batch to the GPU
            
            optimizer.zero_grad()   # clear accumulated gradients
            output = model(data)    # forward pass
            loss = criterion(output, target)  # compute the loss
            loss.backward()         # backward pass
            optimizer.step()        # update the parameters
            
            # record the loss of this iteration
            iter_loss = loss.item()
            all_iter_losses.append(iter_loss)
            iter_indices.append(epoch * len(train_loader) + batch_idx + 1)
            
            # accumulate training-accuracy statistics
            running_loss += iter_loss
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()
            
            # print progress every 100 batches
            if (batch_idx + 1) % 100 == 0:
                print(f'Epoch: {epoch+1}/{epochs} | Batch: {batch_idx+1}/{len(train_loader)} '
                      f'| Batch loss: {iter_loss:.4f} | Running avg loss: {running_loss/(batch_idx+1):.4f}')
        
        # training metrics for this epoch
        epoch_train_loss = running_loss / len(train_loader)
        epoch_train_acc = 100. * correct / total
        train_acc_history.append(epoch_train_acc)
        train_loss_history.append(epoch_train_loss)
        
        # evaluation phase
        model.eval()  # switch to evaluation mode
        test_loss = 0
        correct_test = 0
        total_test = 0
        
        with torch.no_grad():  # no gradients needed during evaluation
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                test_loss += criterion(output, target).item()
                _, predicted = output.max(1)
                total_test += target.size(0)
                correct_test += predicted.eq(target).sum().item()
        
        # test metrics for this epoch
        epoch_test_loss = test_loss / len(test_loader)
        epoch_test_acc = 100. * correct_test / total_test
        test_acc_history.append(epoch_test_acc)
        test_loss_history.append(epoch_test_loss)
        
        # step the learning-rate scheduler on the test loss
        scheduler.step(epoch_test_loss)
        
        print(f'Epoch {epoch+1}/{epochs} done | Train acc: {epoch_train_acc:.2f}% | Test acc: {epoch_test_acc:.2f}%')
    
    # plot the per-iteration loss curve
    plot_iter_losses(all_iter_losses, iter_indices)
    # plot the per-epoch accuracy and loss curves
    plot_epoch_metrics(train_acc_history, test_acc_history, train_loss_history, test_loss_history)
    
    return epoch_test_acc
 
# 6. Plotting helpers (visualize the training process)
def plot_iter_losses(losses, indices):
    plt.figure(figsize=(10, 4))
    plt.plot(indices, losses, 'b-', alpha=0.7, label='Iteration Loss')
    plt.xlabel('Iteration (batch index)')
    plt.ylabel('Loss')
    plt.title('Training loss per iteration')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()
 
def plot_epoch_metrics(train_acc, test_acc, train_loss, test_loss):
    epochs = range(1, len(train_acc) + 1)
    
    plt.figure(figsize=(12, 4))
    
    # accuracy curves
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_acc, 'b-', label='Train accuracy')
    plt.plot(epochs, test_acc, 'r-', label='Test accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Training and test accuracy')
    plt.legend()
    plt.grid(True)
    
    # loss curves
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_loss, 'b-', label='Train loss')
    plt.plot(epochs, test_loss, 'r-', label='Test loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and test loss')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.show()
 
# 7. Run the training (adjust epochs and batch_size to fit the available GPU memory)
epochs = 80  # larger images train slowly; consider a short trial run first
print("Starting CNN training on the custom fish dataset (128x128 inputs)...")
final_accuracy = train(model, train_loader, test_loader, criterion, optimizer, scheduler, device, epochs)
print(f"Training finished! Final test accuracy: {final_accuracy:.2f}%")
 
# Save the model (optional, depending on whether the weights are needed later)
# torch.save(model.state_dict(), 'bengali_fish_cnn_128.pth')
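 
# A minimal sketch (an assumption, not in the original script) of restoring the
# commented-out checkpoint above for later inference:
# model = CNN().to(device)
# model.load_state_dict(torch.load('bengali_fish_cnn_128.pth', map_location=device))
# model.eval()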
# Part 2: Grad-CAM visualization of the trained model
import warnings
warnings.filterwarnings("ignore")  # keep the output free of library warnings
import matplotlib.pyplot as plt
import numpy as np
import torch
from torchvision import transforms
from PIL import Image
 
# Font configuration for matplotlib (SimHei; also render minus signs correctly)
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams['axes.unicode_minus'] = False  # fix minus-sign rendering
 
# Grad-CAM implementation
class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.feature_maps = None
        self.gradient = None
        
        # register hooks on the target layer
        self.hook_handles = []
        
        # forward hook: save the feature maps of the target layer
        def forward_hook(module, input, output):
            self.feature_maps = output.detach()
            
        # backward hook: save the gradients flowing out of the target layer
        def backward_hook(module, grad_in, grad_out):
            self.gradient = grad_out[0].detach()
            
        self.hook_handles.append(target_layer.register_forward_hook(forward_hook))
        # full backward hook (register_backward_hook is deprecated in recent PyTorch)
        self.hook_handles.append(target_layer.register_full_backward_hook(backward_hook))
        
        # switch the model to evaluation mode
        self.model.eval()
        
    def __call__(self, x):
        # forward pass
        x = x.to(device)
        self.model.zero_grad()
        output = self.model(x)
        
        # get the predicted class
        pred_class = torch.argmax(output, dim=1).item()
        
        # backward pass for the predicted class
        one_hot = torch.zeros_like(output)
        one_hot[0, pred_class] = 1
        output.backward(gradient=one_hot, retain_graph=True)
        
        # compute the weights (global average pooling of the gradients)
        weights = torch.mean(self.gradient, dim=(2, 3), keepdim=True)
        
        # weighted combination of the feature maps
        cam = torch.sum(weights * self.feature_maps, dim=1).squeeze()
        
        # ReLU: keep only regions that contribute positively to the prediction
        cam = torch.relu(cam)
        
        # normalize to [0, 1]
        if torch.max(cam) > 0:
            cam = cam / torch.max(cam)
            
        # upsample the CAM to the input resolution
        cam = torch.nn.functional.interpolate(
            cam.unsqueeze(0).unsqueeze(0),
            size=(128, 128),  # match the 128x128 input size
            mode='bilinear',
            align_corners=False
        ).squeeze()
        
        return cam.cpu().numpy(), pred_class
    
    def remove_hooks(self):
        for handle in self.hook_handles:
            handle.remove()
 
# Convert a normalized tensor back to a displayable numpy image
def tensor_to_np(tensor):
    img = tensor.cpu().numpy().transpose(1, 2, 0)
    # use the same mean/std as during training
    mean = np.array([0.4914, 0.4822, 0.4465])
    std = np.array([0.2023, 0.1994, 0.2010])
    img = std * img + mean  # undo the normalization
    img = np.clip(img, 0, 1)  # clamp pixel values to [0, 1]
    return img
 
# Pick an image from the test set
# idx = np.random.randint(len(test_dataset))
idx = 110  # fixed index into the test set (0-based)
image, label = test_dataset[idx]
 
# class names (from ImageFolder's automatic class-to-index mapping)
classes = test_dataset.dataset.classes  # note: after random_split, the class names live on the underlying dataset
print(f"Selected image class: {classes[label]}")
 
# add a batch dimension and move the image to the device
input_tensor = image.unsqueeze(0).to(device)
 
# initialize Grad-CAM on the last convolutional layer (model.conv4);
# an earlier layer such as model.conv3 can be used instead for comparison
# grad_cam = GradCAM(model, model.conv3)
grad_cam = GradCAM(model, model.conv4)
 
# generate the heatmap (the GradCAM object is called directly; there is no separate generate_cam method)
heatmap, pred_class = grad_cam(input_tensor)
 
# Visualization
plt.figure(figsize=(15, 5))
 
# original image
plt.subplot(1, 3, 1)
plt.imshow(tensor_to_np(image))
plt.title(f"Original image: {classes[label]}")
plt.axis('off')
 
# heatmap
plt.subplot(1, 3, 2)
plt.imshow(heatmap, cmap='jet')
plt.title(f"Grad-CAM heatmap: {classes[pred_class]}")
plt.axis('off')
 
# heatmap overlaid on the original image
plt.subplot(1, 3, 3)
img = tensor_to_np(image)
heatmap_resized = np.uint8(255 * heatmap)
heatmap_colored = plt.cm.jet(heatmap_resized)[:, :, :3]
superimposed_img = heatmap_colored * 0.4 + img * 0.6
plt.imshow(superimposed_img)
plt.title("Heatmap overlay")
plt.axis('off')
 
plt.tight_layout()
plt.savefig('grad_cam_result_128_crop_100.png')
plt.show()
 
print("Grad-CAM可视化完成。已保存为grad_cam_result_128_crop_100.png")

@浙大疏锦行

