作业:
对于心脏病数据集,对于病人这个不平衡的样本用GAN来学习并生成病人样本,观察不用GAN和用GAN的F1分数差异。
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, classification_report
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import seaborn as sns
# 设置随机种子保证可重复性
torch.manual_seed(42)
np.random.seed(42)
# 设备配置
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ----------------------------
# 1. 数据准备与不平衡分析
# ----------------------------
print("1. 加载数据并分析不平衡...")
data = pd.read_csv('heart.csv')
print("原始数据分布:\n", data['target'].value_counts())
# 提取少数类样本(患病样本)
minority_data = data[data['target'] == 0].drop('target', axis=1)
majority_data = data[data['target'] == 1]
# 数据标准化到[-1, 1]
scaler = MinMaxScaler(feature_range=(-1, 1))
scaled_minority = scaler.fit_transform(minority_data)
# 转换为PyTorch张量
tensor_data = torch.FloatTensor(scaled_minority).to(device)
dataset = TensorDataset(tensor_data)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# ----------------------------
# 2. GAN模型定义
# ----------------------------
LATENT_DIM = 10
FEATURE_DIM = minority_data.shape[1]
class Generator(nn.Module):
def __init__(self):
super().__init__()
self.model = nn.Sequential(
nn.Linear(LATENT_DIM, 32),
nn.LeakyReLU(0.2),
nn.Linear(32, 64),
nn.BatchNorm1d(64),
nn.LeakyReLU(0.2),
nn.Linear(64, FEATURE_DIM),
nn.Tanh()
)
def forward(self, z):
return self.model(z)
class Discriminator(nn.Module):
def __init__(self):
super().__init__()
self.model = nn.Sequential(
nn.Linear(FEATURE_DIM, 64),
nn.LeakyReLU(0.2),
nn.Linear(64, 32),
nn.LeakyReLU(0.2),
nn.Linear(32, 1),
nn.Sigmoid()
)
def forward(self, x):
return self.model(x)
# 初始化模型
generator = Generator().to(device)
discriminator = Discriminator().to(device)
# 损失函数和优化器
criterion = nn.BCELoss()
g_optimizer = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
d_optimizer = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))
# ----------------------------
# 3. GAN训练
# ----------------------------
print("\n2. 开始训练GAN...")
EPOCHS = 10000
for epoch in range(EPOCHS):
for real_samples in dataloader:
real_samples = real_samples[0]
batch_size = real_samples.size(0)
# 训练判别器
d_optimizer.zero_grad()
# 真实样本
real_labels = torch.ones(batch_size, 1).to(device)
real_output = discriminator(real_samples)
d_loss_real = criterion(real_output, real_labels)
# 生成样本
noise = torch.randn(batch_size, LATENT_DIM).to(device)
fake_samples = generator(noise).detach()
fake_labels = torch.zeros(batch_size, 1).to(device)
fake_output = discriminator(fake_samples)
d_loss_fake = criterion(fake_output, fake_labels)
d_loss = d_loss_real + d_loss_fake
d_loss.backward()
d_optimizer.step()
# 训练生成器
g_optimizer.zero_grad()
noise = torch.randn(batch_size, LATENT_DIM).to(device)
fake_samples = generator(noise)
g_output = discriminator(fake_samples)
g_loss = criterion(g_output, real_labels)
g_loss.backward()
g_optimizer.step()
if (epoch+1) % 1000 == 0:
print(f"Epoch [{epoch+1}/{EPOCHS}] | D_loss: {d_loss.item():.4f} | G_loss: {g_loss.item():.4f}")
# ----------------------------
# 4. 生成新样本
# ----------------------------
print("\n3. 生成合成样本...")
num_samples = len(minority_data) # 生成与原始少数类相同数量的样本
noise = torch.randn(num_samples, LATENT_DIM).to(device)
with torch.no_grad():
generated_samples = generator(noise).cpu().numpy()
# 反标准化到原始范围
generated_data = scaler.inverse_transform(generated_samples)
generated_df = pd.DataFrame(generated_data, columns=minority_data.columns)
generated_df['target'] = 0 # 标记为患病类
# 合并增强数据集
augmented_data = pd.concat([data, generated_df], ignore_index=True)
print("增强后数据分布:\n", augmented_data['target'].value_counts())
# ----------------------------
# 5. 模型评估
# ----------------------------
print("\n4. 评估模型性能...")
def evaluate(X, y):
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, stratify=y, random_state=42
)
clf = RandomForestClassifier(random_state=42)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))
return f1_score(y_test, y_pred, pos_label=0)
# 原始数据评估
print("原始数据性能:")
original_f1 = evaluate(
data.drop('target', axis=1),
data['target']
)
# 增强数据评估
print("\nGAN增强后性能:")
augmented_f1 = evaluate(
augmented_data.drop('target', axis=1),
augmented_data['target']
)
print(f"\nF1分数提升: {augmented_f1 - original_f1:.2f}")
1. 加载数据并分析不平衡...
原始数据分布:
target
1 165
0 138
Name: count, dtype: int64
2. 开始训练GAN...
Epoch [1000/10000] | D_loss: 1.1630 | G_loss: 1.0916
Epoch [2000/10000] | D_loss: 0.7163 | G_loss: 1.0824
Epoch [3000/10000] | D_loss: 0.8886 | G_loss: 1.2037
Epoch [4000/10000] | D_loss: 0.7883 | G_loss: 1.1231
Epoch [5000/10000] | D_loss: 0.7736 | G_loss: 1.5894
Epoch [6000/10000] | D_loss: 0.4905 | G_loss: 1.3961
Epoch [7000/10000] | D_loss: 0.8555 | G_loss: 1.8729
Epoch [8000/10000] | D_loss: 0.5514 | G_loss: 1.6928
Epoch [9000/10000] | D_loss: 0.7020 | G_loss: 1.4983
Epoch [10000/10000] | D_loss: 0.6022 | G_loss: 1.4150
3. 生成合成样本...
增强后数据分布:
target
0 276
1 165
Name: count, dtype: int64
4. 评估模型性能...
原始数据性能:
precision recall f1-score support
0 0.79 0.73 0.76 41
1 0.79 0.84 0.82 50
accuracy 0.79 91
macro avg 0.79 0.79 0.79 91
weighted avg 0.79 0.79 0.79 91
GAN增强后性能:
precision recall f1-score support
0 0.99 0.88 0.93 83
1 0.83 0.98 0.90 50
accuracy 0.92 133
macro avg 0.91 0.93 0.91 133
weighted avg 0.93 0.92 0.92 133
F1分数提升: 0.17