Knowledge points review:
Detecting overfitting: print metrics for the training and test sets in sync
Saving and loading models
Saving weights only
Saving the weights together with the model
Saving a full checkpoint that also includes the training state
Early stopping
Assignment: train on the credit dataset and save the weights; then load the weights, continue training for 50 epochs, and apply an early-stopping strategy
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score,
recall_score, f1_score, roc_auc_score,
confusion_matrix)
import matplotlib.pyplot as plt
import seaborn as sns
import os
from pathlib import Path
from typing import Tuple, Dict, List, Optional
# --- 1. Configuration constants ---
# Use Path objects for paths to ensure cross-platform compatibility
BASE_DIR = Path(__file__).parent.resolve()
DATA_PATH = BASE_DIR / "data" / "credit_risk_data.csv"
MODEL_SAVE_DIR = BASE_DIR / "saved_models"
MODEL_SAVE_PATH = MODEL_SAVE_DIR / "credit_risk_model.pth"
# Make sure the save directory exists
MODEL_SAVE_DIR.mkdir(parents=True, exist_ok=True)
# Training hyperparameters
RANDOM_SEED = 42
BATCH_SIZE = 64
LEARNING_RATE = 0.001
NUM_EPOCHS = 50
HIDDEN_LAYER_SIZES = [128, 64, 32]  # Hidden layer configuration
DROPOUT_RATE = 0.3  # Dropout to mitigate overfitting
# Device configuration
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")
# Set random seeds for reproducibility
def set_seed(seed: int) -> None:
torch.manual_seed(seed)
np.random.seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(RANDOM_SEED)
# --- 2. Data loading and preprocessing ---
class CreditRiskDataset(Dataset):
"""信贷风险数据集类"""
def __init__(self, features: np.ndarray, labels: np.ndarray):
self.features = torch.tensor(features, dtype=torch.float32)
self.labels = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)
def __len__(self) -> int:
return len(self.labels)
def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
return self.features[idx], self.labels[idx]
def load_and_preprocess_data(file_path: Path, target_col: str = 'default') -> Tuple[DataLoader, DataLoader, DataLoader, StandardScaler]:
"""加载并预处理数据"""
print(f"加载数据: {file_path}")
    # Check that the data file exists
if not file_path.exists():
        raise FileNotFoundError(f"Data file not found: {file_path}")
    # Read the data
df = pd.read_csv(file_path)
print(f"数据形状: {df.shape}")
    # Handle missing values
    if df.isnull().sum().sum() > 0:
        print("Imputing missing values with column medians...")
        for col in df.select_dtypes(include=np.number).columns:
            df[col] = df[col].fillna(df[col].median())  # avoids deprecated chained inplace fillna
    # Separate features and target
X = df.drop(target_col, axis=1).values
y = df[target_col].values
    # Standardize features (note: fitting the scaler on the full dataset before splitting leaks val/test statistics; for a stricter protocol, fit on the training split only)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
    # Build the dataset
full_dataset = CreditRiskDataset(X_scaled, y)
    # Split the dataset (70% train, 15% validation, 15% test)
train_size = int(0.7 * len(full_dataset))
val_size = int(0.15 * len(full_dataset))
test_size = len(full_dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(
full_dataset, [train_size, val_size, test_size],
generator=torch.Generator().manual_seed(RANDOM_SEED)
)
    # Create data loaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)
print(f"数据集划分: 训练集 {len(train_dataset)} | 验证集 {len(val_dataset)} | 测试集 {len(test_dataset)}")
return train_loader, val_loader, test_loader, scaler
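# The fitted scaler above is returned but never persisted, yet inference code must
# apply the exact same transformation. A minimal persistence sketch (assumes joblib,
# which ships with scikit-learn installs; SCALER_SAVE_PATH is an illustrative name,
# not part of the original script):
import joblib

SCALER_SAVE_PATH = MODEL_SAVE_DIR / "scaler.joblib"

def save_scaler(scaler: StandardScaler, path: Path = SCALER_SAVE_PATH) -> None:
    """Persist the fitted StandardScaler for reuse at inference time."""
    joblib.dump(scaler, path)

def load_scaler(path: Path = SCALER_SAVE_PATH) -> StandardScaler:
    """Reload a previously fitted StandardScaler."""
    return joblib.load(path)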
# --- 3. Model architecture ---
class CreditRiskPredictor(nn.Module):
"""信贷风险预测神经网络"""
def __init__(self, input_size: int, hidden_sizes: List[int], output_size: int = 1):
super().__init__()
layers = []
prev_size = input_size
        # Build the hidden layers
        for h_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, h_size))
            layers.append(nn.BatchNorm1d(h_size))  # Batch normalization
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(DROPOUT_RATE))  # Dropout against overfitting
prev_size = h_size
        # Output layer (emits raw logits; BCEWithLogitsLoss applies the sigmoid)
layers.append(nn.Linear(prev_size, output_size))
self.model = nn.Sequential(*layers)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.model(x)
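# Optional smoke test for the architecture above (a minimal sketch; input_size=10
# is an arbitrary illustrative value, not tied to the credit dataset). Note that
# BatchNorm1d requires batch_size > 1 in training mode, hence .eval() for tiny probes.
def smoke_test_model(input_size: int = 10, batch_size: int = 4) -> None:
    """Check that the network maps (batch, features) to one logit per sample."""
    probe = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES).eval()
    out = probe(torch.randn(batch_size, input_size))
    assert out.shape == (batch_size, 1), f"unexpected output shape {out.shape}"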
# --- 4. Training ---
def train_model(model, train_loader, val_loader, optimizer, criterion, epochs, device):
"""训练模型并返回训练历史"""
history = {'train_loss': [], 'val_loss': [], 'val_auc': []}
best_val_loss = float('inf')
model.to(device)
for epoch in range(epochs):
        # Training phase
model.train()
train_loss = 0.0
for inputs, targets in train_loader:
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
train_loss += loss.item() * inputs.size(0)
train_loss = train_loss / len(train_loader.dataset)
history['train_loss'].append(train_loss)
        # Validation phase
model.eval()
val_loss = 0.0
all_targets = []
all_probs = []
with torch.no_grad():
for inputs, targets in val_loader:
inputs, targets = inputs.to(device), targets.to(device)
outputs = model(inputs)
loss = criterion(outputs, targets)
val_loss += loss.item() * inputs.size(0)
probs = torch.sigmoid(outputs)
all_targets.extend(targets.cpu().numpy())
all_probs.extend(probs.cpu().numpy())
val_loss = val_loss / len(val_loader.dataset)
val_auc = roc_auc_score(all_targets, all_probs)
history['val_loss'].append(val_loss)
history['val_auc'].append(val_auc)
        # Save the best model (weights only, via state_dict)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), MODEL_SAVE_PATH)
            print(f"Saved best model @ epoch {epoch+1}, Val Loss: {val_loss:.4f}, AUC: {val_auc:.4f}")
print(f"Epoch {epoch+1}/{epochs} | "
f"Train Loss: {train_loss:.4f} | "
f"Val Loss: {val_loss:.4f} | "
f"AUC: {val_auc:.4f}")
    # Plot the training history
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['val_loss'], label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.subplot(1, 2, 2)
plt.plot(history['val_auc'], 'g-', label='Validation AUC')
plt.title('Validation AUC')
plt.xlabel('Epoch')
plt.ylabel('AUC')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig('training_history.png')
plt.show()
return history
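# The knowledge points distinguish three save modes: weights only (the state_dict
# form used in train_model above), the full model object (torch.save(model, path),
# convenient but brittle across code changes), and a checkpoint that also carries
# training state. A minimal checkpoint sketch follows; the file name and dict keys
# are illustrative conventions, not fixed PyTorch requirements.
CHECKPOINT_PATH = MODEL_SAVE_DIR / "credit_risk_checkpoint.pth"

def save_checkpoint(model, optimizer, epoch: int, best_val_loss: float,
                    path: Path = CHECKPOINT_PATH) -> None:
    """Save weights plus optimizer state and progress so training can resume."""
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_val_loss': best_val_loss,
    }, path)

def load_checkpoint(model, optimizer, path: Path = CHECKPOINT_PATH) -> Tuple[int, float]:
    """Restore model and optimizer state; returns (last epoch, best val loss)."""
    checkpoint = torch.load(path, map_location=DEVICE)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    return checkpoint['epoch'], checkpoint['best_val_loss']

# Patience-based early stopping, which train_model above does not implement.
# A minimal sketch; the patience and min_delta defaults are arbitrary choices.
class EarlyStopper:
    """Signal a stop when validation loss hasn't improved for `patience` epochs."""
    def __init__(self, patience: int = 5, min_delta: float = 0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float('inf')
        self.counter = 0

    def step(self, val_loss: float) -> bool:
        """Record one epoch's validation loss; return True if training should stop."""
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
        return self.counter >= self.patience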
# --- 5. Evaluation ---
def evaluate_model(model, test_loader, device):
"""评估模型性能"""
model.eval()
model.to(device)
all_targets = []
all_preds = []
all_probs = []
with torch.no_grad():
for inputs, targets in test_loader:
inputs, targets = inputs.to(device), targets.to(device)
outputs = model(inputs)
probs = torch.sigmoid(outputs)
preds = (probs > 0.5).float()
all_targets.extend(targets.cpu().numpy())
all_preds.extend(preds.cpu().numpy())
all_probs.extend(probs.cpu().numpy())
    # Compute metrics
metrics = {
'accuracy': accuracy_score(all_targets, all_preds),
'precision': precision_score(all_targets, all_preds),
'recall': recall_score(all_targets, all_preds),
'f1': f1_score(all_targets, all_preds),
'roc_auc': roc_auc_score(all_targets, all_probs)
}
    # Print metrics
    print("\nModel evaluation results:")
for metric, value in metrics.items():
print(f"{metric.capitalize()}: {value:.4f}")
    # Plot the confusion matrix
cm = confusion_matrix(all_targets, all_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['No Default', 'Default'],
yticklabels=['No Default', 'Default'])
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.savefig('confusion_matrix.png')
plt.show()
return metrics
# --- 6. Main ---
def main():
    # Load the data
try:
train_loader, val_loader, test_loader, scaler = load_and_preprocess_data(DATA_PATH)
except Exception as e:
print(f"数据加载失败: {e}")
return
    # Initialize the model
sample_features, _ = next(iter(train_loader))
input_size = sample_features.shape[1]
model = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES)
print(f"模型架构:\n{model}")
    # Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)
    # Train the model
history = train_model(
model, train_loader, val_loader,
optimizer, criterion, NUM_EPOCHS, DEVICE
)
    # Load the best weights for evaluation (map_location makes CPU-only loading safe)
    best_model = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES)
    best_model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=DEVICE))
best_model.to(DEVICE)
    # Evaluate on the test set
test_metrics = evaluate_model(best_model, test_loader, DEVICE)
    # Example prediction
sample_idx = np.random.randint(0, len(test_loader.dataset))
sample_data, true_label = test_loader.dataset[sample_idx]
best_model.eval()
with torch.no_grad():
sample_data = sample_data.unsqueeze(0).to(DEVICE)
logit = best_model(sample_data)
prob = torch.sigmoid(logit).item()
pred = 1 if prob > 0.5 else 0
print(f"\n示例预测:")
print(f"原始特征: {sample_data.cpu().numpy().squeeze()}")
print(f"真实标签: {true_label.item()}")
print(f"预测概率: {prob:.4f}")
print(f"预测结果: {pred}")
if __name__ == "__main__":
main()