Python Day 37 Check-in

Published: 2025-05-31

Knowledge points review:

- Detecting overfitting: print metrics for the training set and the test set side by side
- Saving and loading models:
  - saving the weights only
  - saving the weights together with the model architecture
  - saving a full checkpoint, which also includes the training state (optimizer state, epoch, etc.)
- Early stopping

Homework: train on the credit dataset and save the weights; then load the weights, continue training for 50 more epochs, and apply early stopping. (A sketch of the three saving styles follows this list; early-stopping and resume sketches appear alongside the code below.)
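
Before the full script, here is a minimal sketch of the three saving styles, using a stand-in model and optimizer (the file names and values are illustrative, not from the original post):

import torch
import torch.nn as nn
import torch.optim as optim

model = nn.Linear(10, 1)               # stand-in model for illustration
optimizer = optim.Adam(model.parameters())

# 1) Weights only: save the state_dict; loading requires rebuilding the
#    architecture first. Smallest file, most robust to code refactors.
torch.save(model.state_dict(), "weights_only.pth")
model.load_state_dict(torch.load("weights_only.pth"))

# 2) Weights + model: pickle the whole module. Convenient, but loading
#    depends on the defining class still being importable (and recent
#    PyTorch versions require weights_only=False in torch.load).
torch.save(model, "full_model.pth")
model = torch.load("full_model.pth", weights_only=False)

# 3) Full checkpoint: also capture the training state so training can resume.
checkpoint = {
    "epoch": 10,                       # illustrative value
    "model_state_dict": model.state_dict(),
    "optimizer_state_dict": optimizer.state_dict(),
}
torch.save(checkpoint, "checkpoint.pth")

ckpt = torch.load("checkpoint.pth")
model.load_state_dict(ckpt["model_state_dict"])
optimizer.load_state_dict(ckpt["optimizer_state_dict"])
start_epoch = ckpt["epoch"] + 1        # resume from the next epoch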

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, 
                            recall_score, f1_score, roc_auc_score,
                            confusion_matrix)
import matplotlib.pyplot as plt
import seaborn as sns
import os
from pathlib import Path
from typing import Tuple, Dict, List, Optional

# --- 1. Configuration constants ---
# Use Path objects so paths stay cross-platform
BASE_DIR = Path(__file__).parent.resolve()
DATA_PATH = BASE_DIR / "data" / "credit_risk_data.csv"
MODEL_SAVE_DIR = BASE_DIR / "saved_models"
MODEL_SAVE_PATH = MODEL_SAVE_DIR / "credit_risk_model.pth"

# Make sure the save directory exists
MODEL_SAVE_DIR.mkdir(parents=True, exist_ok=True)

# Training hyperparameters
RANDOM_SEED = 42
BATCH_SIZE = 64
LEARNING_RATE = 0.001
NUM_EPOCHS = 50
HIDDEN_LAYER_SIZES = [128, 64, 32]  # hidden layer configuration
DROPOUT_RATE = 0.3  # dropout to help prevent overfitting

# Device configuration
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {DEVICE}")

# Set random seeds for reproducibility
def set_seed(seed):
    torch.manual_seed(seed)
    np.random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

set_seed(RANDOM_SEED)

# --- 2. Data loading and preprocessing ---
class CreditRiskDataset(Dataset):
    """信贷风险数据集类"""
    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)
    
    def __len__(self) -> int:
        return len(self.labels)
    
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.features[idx], self.labels[idx]

def load_and_preprocess_data(file_path: Path, target_col: str = 'default') -> Tuple[DataLoader, DataLoader, DataLoader, StandardScaler]:
    """加载并预处理数据"""
    print(f"加载数据: {file_path}")
    
    # Check that the data file exists
    if not file_path.exists():
        raise FileNotFoundError(f"Data file not found: {file_path}")
    
    # Read the data
    df = pd.read_csv(file_path)
    print(f"Data shape: {df.shape}")
    
    # Handle missing values
    if df.isnull().sum().sum() > 0:
        print("Handling missing values...")
        for col in df.select_dtypes(include=np.number).columns:
            df[col] = df[col].fillna(df[col].median())  # avoid chained inplace fillna (deprecated in pandas 2.x)
    
    # Separate features and target
    X = df.drop(target_col, axis=1).values
    y = df[target_col].values
    
    # Standardize the features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # Build the dataset
    full_dataset = CreditRiskDataset(X_scaled, y)
    
    # Split the dataset (70% train, 15% validation, 15% test)
    train_size = int(0.7 * len(full_dataset))
    val_size = int(0.15 * len(full_dataset))
    test_size = len(full_dataset) - train_size - val_size
    
    train_dataset, val_dataset, test_dataset = random_split(
        full_dataset, [train_size, val_size, test_size], 
        generator=torch.Generator().manual_seed(RANDOM_SEED)
    )
    
    # Build the data loaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)
    
    print(f"数据集划分: 训练集 {len(train_dataset)} | 验证集 {len(val_dataset)} | 测试集 {len(test_dataset)}")
    
    return train_loader, val_loader, test_loader, scaler
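
# For real inference the fitted StandardScaler must be persisted along with the
# model weights, otherwise new inputs cannot be scaled consistently. A minimal
# sketch using joblib (the file name is an assumption, not from the original post):
def save_scaler(scaler: StandardScaler, path: Path = MODEL_SAVE_DIR / "scaler.joblib") -> None:
    """Persist the fitted scaler next to the model weights (load it back with joblib.load)."""
    import joblib  # joblib ships with scikit-learn installs
    joblib.dump(scaler, path)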

# --- 3. Model architecture ---
class CreditRiskPredictor(nn.Module):
    """信贷风险预测神经网络"""
    def __init__(self, input_size: int, hidden_sizes: List[int], output_size: int = 1):
        super().__init__()
        
        layers = []
        prev_size = input_size
        
        # Build the hidden layers
        for i, h_size in enumerate(hidden_sizes):
            layers.append(nn.Linear(prev_size, h_size))
            layers.append(nn.BatchNorm1d(h_size))  # batch normalization
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(DROPOUT_RATE))  # dropout for regularization
            prev_size = h_size
        
        # Output layer
        layers.append(nn.Linear(prev_size, output_size))
        
        self.model = nn.Sequential(*layers)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.model(x)

# --- 4. Training function ---
def train_model(model, train_loader, val_loader, optimizer, criterion, epochs, device):
    """训练模型并返回训练历史"""
    history = {'train_loss': [], 'val_loss': [], 'val_auc': []}
    best_val_loss = float('inf')
    
    model.to(device)
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item() * inputs.size(0)
        
        train_loss = train_loss / len(train_loader.dataset)
        history['train_loss'].append(train_loss)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        all_targets = []
        all_probs = []
        
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item() * inputs.size(0)
                
                probs = torch.sigmoid(outputs)
                all_targets.extend(targets.cpu().numpy().ravel())  # flatten (batch, 1) -> 1-D for roc_auc_score
                all_probs.extend(probs.cpu().numpy().ravel())
        
        val_loss = val_loss / len(val_loader.dataset)
        val_auc = roc_auc_score(all_targets, all_probs)
        
        history['val_loss'].append(val_loss)
        history['val_auc'].append(val_auc)
        
        # Save the best weights whenever the validation loss improves
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), MODEL_SAVE_PATH)
            print(f"Saved best model @ epoch {epoch+1}, val loss: {val_loss:.4f}, AUC: {val_auc:.4f}")
        
        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {train_loss:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"AUC: {val_auc:.4f}")
    
    # Plot the training history
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    
    plt.subplot(1, 2, 2)
    plt.plot(history['val_auc'], 'g-', label='Validation AUC')
    plt.title('Validation AUC')
    plt.xlabel('Epoch')
    plt.ylabel('AUC')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.show()
    
    return history
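
# The knowledge points and the homework call for early stopping, which
# train_model above does not implement. A minimal patience-based helper is
# sketched here (the names `patience` and `min_delta` are illustrative, not
# from the original post); wire it into the validation loop by calling
# early_stopper.step(val_loss) each epoch and breaking out when it returns True.
class EarlyStopper:
    """Stop training when the validation loss stops improving."""
    def __init__(self, patience: int = 5, min_delta: float = 0.0):
        self.patience = patience      # epochs to wait after the last improvement
        self.min_delta = min_delta    # minimum decrease that counts as improvement
        self.best_loss = float('inf')
        self.counter = 0

    def step(self, val_loss: float) -> bool:
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
        return self.counter >= self.patience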

# --- 5. Evaluation function ---
def evaluate_model(model, test_loader, device):
    """评估模型性能"""
    model.eval()
    model.to(device)
    
    all_targets = []
    all_preds = []
    all_probs = []
    
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            
            outputs = model(inputs)
            probs = torch.sigmoid(outputs)
            preds = (probs > 0.5).float()
            
            all_targets.extend(targets.cpu().numpy().ravel())  # flatten (batch, 1) -> 1-D for sklearn metrics
            all_preds.extend(preds.cpu().numpy().ravel())
            all_probs.extend(probs.cpu().numpy().ravel())
    
    # Compute metrics
    metrics = {
        'accuracy': accuracy_score(all_targets, all_preds),
        'precision': precision_score(all_targets, all_preds),
        'recall': recall_score(all_targets, all_preds),
        'f1': f1_score(all_targets, all_preds),
        'roc_auc': roc_auc_score(all_targets, all_probs)
    }
    
    # Print metrics
    print("\nModel evaluation results:")
    for metric, value in metrics.items():
        print(f"{metric.capitalize()}: {value:.4f}")
    
    # Plot the confusion matrix
    cm = confusion_matrix(all_targets, all_preds)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=['No Default', 'Default'],
                yticklabels=['No Default', 'Default'])
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.savefig('confusion_matrix.png')
    plt.show()
    
    return metrics

# --- 6. Main function ---
def main():
    # Load the data
    try:
        train_loader, val_loader, test_loader, scaler = load_and_preprocess_data(DATA_PATH)
    except Exception as e:
        print(f"数据加载失败: {e}")
        return
    
    # Initialize the model
    sample_features, _ = next(iter(train_loader))
    input_size = sample_features.shape[1]
    
    model = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES)
    print(f"模型架构:\n{model}")
    
    # Loss function and optimizer
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)
    
    # Train the model
    history = train_model(
        model, train_loader, val_loader, 
        optimizer, criterion, NUM_EPOCHS, DEVICE
    )
    
    # Load the best model for evaluation
    best_model = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES)
    best_model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=DEVICE))
    best_model.to(DEVICE)
    
    # Evaluate on the test set
    test_metrics = evaluate_model(best_model, test_loader, DEVICE)
    
    # Example prediction
    sample_idx = np.random.randint(0, len(test_loader.dataset))
    sample_data, true_label = test_loader.dataset[sample_idx]
    
    best_model.eval()
    with torch.no_grad():
        sample_data = sample_data.unsqueeze(0).to(DEVICE)
        logit = best_model(sample_data)
        prob = torch.sigmoid(logit).item()
        pred = 1 if prob > 0.5 else 0
    
    print(f"\n示例预测:")
    print(f"原始特征: {sample_data.cpu().numpy().squeeze()}")
    print(f"真实标签: {true_label.item()}")
    print(f"预测概率: {prob:.4f}")
    print(f"预测结果: {pred}")

if __name__ == "__main__":
    main()
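
The homework proper (load the saved weights, continue training for 50 more epochs, and stop early if the validation loss plateaus) is not covered by main() above. Below is a minimal sketch under the same setup, reusing the EarlyStopper defined earlier; the patience value is illustrative:

def continue_training(train_loader, val_loader, input_size: int, extra_epochs: int = 50):
    """Resume from the saved weights and train up to extra_epochs more, with early stopping."""
    model = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES)
    model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=DEVICE))
    model.to(DEVICE)

    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)
    early_stopper = EarlyStopper(patience=5)  # illustrative patience

    for epoch in range(extra_epochs):
        # Training phase
        model.train()
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
            optimizer.zero_grad()
            loss = criterion(model(inputs), targets)
            loss.backward()
            optimizer.step()

        # Validation loss drives both checkpointing and early stopping
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
                val_loss += criterion(model(inputs), targets).item() * inputs.size(0)
        val_loss /= len(val_loader.dataset)
        print(f"Resume epoch {epoch+1}/{extra_epochs} | Val Loss: {val_loss:.4f}")

        if val_loss < early_stopper.best_loss:
            torch.save(model.state_dict(), MODEL_SAVE_PATH)  # keep the best weights
        if early_stopper.step(val_loss):
            print(f"Early stopping triggered at resume epoch {epoch+1}")
            break
    return model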

@浙大疏锦行