import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
import time
# Load the data
df = pd.read_csv('data.csv')
# Data preprocessing
def preprocess_data(df):
    # Drop the Id column
    df = df.drop('Id', axis=1)
    # Handle missing values
    num_features = ['Annual Income', 'Years in current job', 'Tax Liens',
                    'Number of Open Accounts', 'Years of Credit History',
                    'Maximum Open Credit', 'Number of Credit Problems',
                    'Months since last delinquent', 'Bankruptcies',
                    'Current Loan Amount', 'Current Credit Balance',
                    'Monthly Debt', 'Credit Score']
    cat_features = ['Home Ownership', 'Purpose', 'Term']
    # Numeric features: impute missing values with the median
    num_imputer = SimpleImputer(strategy='median')
    df[num_features] = num_imputer.fit_transform(df[num_features])
    # Categorical features: impute missing values with the mode
    cat_imputer = SimpleImputer(strategy='most_frequent')
    df[cat_features] = cat_imputer.fit_transform(df[cat_features])
    # Build the feature transformer: scale numeric columns, one-hot encode
    # categorical ones (unknown categories at transform time are ignored)
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), num_features),
            ('cat', OneHotEncoder(handle_unknown='ignore'), cat_features)
        ])
    # Separate features and target
    X = df.drop('Credit Default', axis=1)
    y = df['Credit Default']
    # Apply the preprocessing
    X_processed = preprocessor.fit_transform(X)
    # Recover feature names (useful for interpreting the model later)
    num_feature_names = num_features
    cat_feature_names = preprocessor.named_transformers_['cat'].get_feature_names_out(cat_features)
    all_feature_names = np.concatenate([num_feature_names, cat_feature_names])
    return X_processed, y, all_feature_names
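# Caveat (hedged): the imputers and the preprocessor above are fit on the full
# dataset before the train/test split, so test-set statistics leak into the
# scaler and imputers. A stricter variant would fit on the training rows only,
# roughly (train_idx/test_idx are hypothetical names for split indices):
#   preprocessor.fit(X.iloc[train_idx])
#   X_train = preprocessor.transform(X.iloc[train_idx])
#   X_test = preprocessor.transform(X.iloc[test_idx])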
# Preprocess the data
X, y, feature_names = preprocess_data(df)
# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
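# Optional sanity check (a sketch; assumes y holds 0/1 labels as a pandas
# Series): credit-default data is usually imbalanced, which is why stratify=y
# above keeps the class ratio identical across both splits.
print("Class balance (train):", y_train.value_counts(normalize=True).round(3).to_dict())
print("Class balance (test): ", y_test.value_counts(normalize=True).round(3).to_dict())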
# Convert to PyTorch tensors (the ColumnTransformer may return a scipy sparse
# matrix because of the one-hot encoder, hence the toarray() fallback below)
X_train_tensor = torch.tensor(X_train.toarray() if hasattr(X_train, 'toarray') else X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).view(-1, 1)
X_test_tensor = torch.tensor(X_test.toarray() if hasattr(X_test, 'toarray') else X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).view(-1, 1)
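# Optional alternative (sketch, not used below): torch.utils.data.DataLoader
# yields the same shuffled mini-batches as the manual torch.randperm loop in
# the training section, with less bookkeeping:
#   from torch.utils.data import TensorDataset, DataLoader
#   train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor),
#                             batch_size=64, shuffle=True)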
# Select the GPU if one is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Define the neural network model
class CreditRiskModel(nn.Module):
    def __init__(self, input_size):
        super().__init__()
        # A simple MLP: three hidden layers with dropout for regularization,
        # and a sigmoid output for binary classification (paired with BCELoss)
        self.model = nn.Sequential(
            nn.Linear(input_size, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)
# Infer the input size from the processed features
input_size = X_train_tensor.shape[1]
model = CreditRiskModel(input_size).to(device)
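# Quick parameter count (optional aside): total trainable weights in the MLP.
n_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {n_params:,}")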
# Define the loss function and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
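# If defaults are a small minority class, a common alternative (sketch, not
# used here) is to drop the final Sigmoid layer and weight positive examples
# via BCEWithLogitsLoss instead:
#   pos_weight = torch.tensor([(y_train == 0).sum() / (y_train == 1).sum()])
#   criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight.to(device))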
# Train the model
num_epochs = 500
batch_size = 64
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []
start_time = time.time()
for epoch in range(num_epochs):
    # Training mode
    model.train()
    epoch_train_loss = 0.0
    correct_train = 0
    total_train = 0
    # Mini-batch training over a fresh random permutation each epoch
    permutation = torch.randperm(X_train_tensor.size()[0])
    for i in range(0, X_train_tensor.size()[0], batch_size):
        indices = permutation[i:i+batch_size]
        batch_x, batch_y = X_train_tensor[indices].to(device), y_train_tensor[indices].to(device)
        # Forward pass
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        # Backward pass and optimization step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_train_loss += loss.item() * batch_x.size(0)
        # Track batch accuracy at a 0.5 decision threshold
        predicted = (outputs > 0.5).float()
        correct_train += (predicted == batch_y).sum().item()
        total_train += batch_y.size(0)
    # Epoch-level training loss and accuracy
    train_loss = epoch_train_loss / total_train
    train_accuracy = correct_train / total_train
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    # Evaluation mode
    model.eval()
    with torch.no_grad():
        # Evaluate on the full test set
        test_outputs = model(X_test_tensor.to(device))
        test_loss = criterion(test_outputs, y_test_tensor.to(device))
        test_losses.append(test_loss.item())
        # Test accuracy
        predicted_test = (test_outputs > 0.5).float()
        correct_test = (predicted_test == y_test_tensor.to(device)).sum().item()
        test_accuracy = correct_test / y_test_tensor.size(0)
        test_accuracies.append(test_accuracy)