本文使用上篇文章中生成的稀疏向量进行建模。
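由于集群上既未安装 pyspark_lightgbm(`from pyspark_lightgbm import LGBMClassifier`),也未安装 SynapseML(`from synapse.ml.lightgbm import LightGBMClassifier`),本文改用原生 LightGBM(`import lightgbm as lgb`)进行建模。理论上前两者效率更优,可以在集群上并行处理数据;而原生 LightGBM 只能单机处理,因此需要先把数据收集到 Driver 节点再训练。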
# 网格寻参
import functools
import os
import sys
import time

import joblib
import lightgbm as lgb
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from scipy.sparse import csr_matrix
from sklearn.metrics import (
    accuracy_score,
    average_precision_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import GridSearchCV, train_test_split
# Point both PySpark interpreter variables at the interpreter running this
# script (PYSPARK_PYTHON is set first, then PYSPARK_DRIVER_PYTHON).
os.environ['PYSPARK_PYTHON'] = os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Build (or reuse) the Hive-enabled session. The driver memory and result-size
# limits are set high because the dataset is later pulled to the driver with
# toPandas().
spark = (
    SparkSession.builder
    .config(
        "spark.metrics.conf",
        "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties",
    )
    .config("spark.driver.memory", "48g")
    .config("spark.driver.maxResultSize", "16g")
    .appName("test_djj")
    .enableHiveSupport()
    .getOrCreate()
)
# Timing decorator
def timeit(func):
    """Wrap *func* so every call prints a start line and its elapsed time.

    The wrapper forwards all positional and keyword arguments and returns
    whatever *func* returns. ``functools.wraps`` keeps the wrapped function's
    ``__name__`` and ``__doc__`` visible on the wrapper. If *func* raises, the
    exception propagates and no completion line is printed.

    Args:
        func: The callable to time.

    Returns:
        The wrapping callable.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic clock, so the measured duration is not
        # affected by system clock adjustments during a long run.
        start_time = time.perf_counter()
        print(f"开始: {func.__name__}...")
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(f"完成: {func.__name__} | 耗时: {elapsed:.2f}秒")
        return result

    return wrapper
# 1. Load the data and convert it to a CSR matrix
@timeit
def load_and_prepare_data():
    """Load the labelled samples through Spark and build a sparse matrix.

    The query result is collected to the driver with ``toPandas()``. Each
    entry of the feature column is read through the keys ``'size'``,
    ``'indices'`` and ``'values'``.

    Returns:
        tuple: ``(X, y)`` where ``X`` is a ``scipy.sparse.csr_matrix`` with
        one row per sample and ``y`` is a NumPy array of the labels.

    Raises:
        ValueError: If the query returns no rows.
    """
    print("加载数据并转换为CSR矩阵...")
    # Read the already-labelled samples (label=1 positive, label=0 negative).
    df = spark.sql(
        "SELECT id, aggregated_vector, label FROM database.table WHERE label IN (0, 1)")

    # Collect the data onto the driver node.
    print("收集数据到Driver节点...")
    pdf = df.select(
        F.col("aggregated_vector").alias("features"),
        F.col("label").alias("label")
    ).toPandas()

    print("提取特征和标签...")
    num_samples = len(pdf)
    if num_samples == 0:
        # Without any row there is no vector to read the feature dimension from.
        raise ValueError("query returned no rows; cannot build the feature matrix")

    features = pdf["features"]
    # Feature dimension is taken from the first vector.
    # NOTE(review): assumes every vector reports the same 'size' - confirm.
    num_features = features.iloc[0]['size']

    # Coordinate-format triplets, filled one sample at a time.
    row_indices = []
    col_indices = []
    data_values = []
    # enumerate() yields the positional row number, which is what the matrix
    # needs regardless of the DataFrame's index labels.
    for row_pos, vec in enumerate(features):
        indices = vec['indices']
        row_indices.extend([row_pos] * len(indices))
        col_indices.extend(indices)
        data_values.extend(vec['values'])

    # Build the CSR matrix from the collected triplets.
    X = csr_matrix((data_values, (row_indices, col_indices)),
                   shape=(num_samples, num_features))
    y = np.array(pdf["label"].tolist())
    print(f"数据矩阵形状: {X.shape}")
    return X, y
# 2. Prepare the training data (uses the already-labelled samples directly)
@timeit
def prepare_training_data(X, y):
    """Print the class counts and compute balancing sample weights.

    Each sample of a class receives ``total / (2 * class_count)``. A class
    with no samples is skipped, so no division by zero occurs.

    Args:
        X: Feature matrix; returned unchanged.
        y: NumPy array of labels, compared against 1 and 0.

    Returns:
        tuple: ``(X, y, sample_weight)`` where ``sample_weight`` is a float
        array with one entry per element of ``y``.
    """
    print("准备训练数据...")
    # Count in NumPy rather than with the builtin sum(), which would iterate
    # the boolean mask element by element in Python.
    positive_count = int(np.count_nonzero(y == 1))
    negative_count = int(np.count_nonzero(y == 0))
    total_count = len(y)
    print(f"正样本数量: {positive_count:,}")
    print(f"负样本数量: {negative_count:,}")
    print(f"总量: {total_count:,}")

    # Samples whose label is neither 0 nor 1 keep a weight of 0.
    sample_weight = np.zeros(total_count)
    if positive_count:
        sample_weight[y == 1] = total_count / (2.0 * positive_count)
    if negative_count:
        sample_weight[y == 0] = total_count / (2.0 * negative_count)
    return X, y, sample_weight
# 3. Train the LightGBM model (with a compact grid search)
@timeit
def train_lightgbm_model(X_train, y_train, sample_weight):
    """Grid-search a binary LightGBM classifier and return the best estimator.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.
        sample_weight: Per-sample weights, forwarded to ``fit``.

    Returns:
        tuple: ``(best_model, training_time)`` where ``training_time`` is the
        wall-clock duration of the grid-search ``fit`` call in seconds.
    """
    print("训练LightGBM模型(带精简网格寻参)...")

    # Compact grid restricted to a handful of parameters.
    search_space = dict(
        num_leaves=[31, 63],          # model complexity
        min_child_samples=[20, 50],   # guards against overfitting
        reg_alpha=[0, 0.1],           # L1 regularisation
        reg_lambda=[0, 0.1],          # L2 regularisation
    )

    # Base estimator whose parameters the search overrides.
    classifier = lgb.LGBMClassifier(objective='binary', random_state=42, n_jobs=-1)

    # 3-fold cross-validated search scored by ROC AUC.
    # NOTE(review): n_jobs=-1 is set on both the estimator and the search;
    # confirm this does not oversubscribe the CPU in this environment.
    searcher = GridSearchCV(
        estimator=classifier,
        param_grid=search_space,
        scoring='roc_auc',
        cv=3,
        verbose=2,
        n_jobs=-1,
    )

    # Time only the search itself.
    fit_started = time.time()
    searcher.fit(X_train, y_train, sample_weight=sample_weight)
    training_time = time.time() - fit_started

    # Report the winning combination and its cross-validated score.
    print("最佳参数组合:")
    chosen = searcher.best_params_
    for name in chosen:
        print(f"{name}: {chosen[name]}")
    print(f"最佳AUC分数: {searcher.best_score_:.4f}")

    return searcher.best_estimator_, training_time
# 4. Model evaluation
@timeit
def evaluate_model(model, X_test, y_test):
    """Score *model* on the test set and format a text report.

    Args:
        model: Fitted classifier exposing ``predict_proba``.
        X_test: Test feature matrix.
        y_test: True test labels.

    Returns:
        tuple: ``(timings, report)``. ``timings`` is a dict with
        ``"training_time"`` set to ``None`` (to be filled in by the caller)
        and ``"prediction_time"`` in seconds; ``report`` is the formatted
        multi-line report string.
    """
    def _ratio(numerator, denominator):
        # Plain division that falls back to 0 for a non-positive denominator.
        return numerator / denominator if denominator > 0 else 0

    def _f1_from(p, r):
        # Harmonic mean of precision and recall, 0 when both are 0.
        return 2 * (p * r) / (p + r) if (p + r) > 0 else 0

    print("模型评估...")

    # Timed section: positive-class probability, thresholded at 0.5.
    tick = time.time()
    y_pred_proba = model.predict_proba(X_test)[:, 1]
    y_pred = (y_pred_proba > 0.5).astype(int)
    prediction_time = time.time() - tick

    # Overall metrics from scikit-learn.
    roc_auc = roc_auc_score(y_test, y_pred_proba)
    pr_auc = average_precision_score(y_test, y_pred_proba)
    overall_f1 = f1_score(y_test, y_pred)
    overall_precision = precision_score(y_test, y_pred)
    overall_recall = recall_score(y_test, y_pred)
    acc = accuracy_score(y_test, y_pred)

    # Confusion-matrix cells, unpacked in flattened order.
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()

    # Per-class metrics derived from the confusion-matrix cells.
    pos_precision = _ratio(tp, tp + fp)
    pos_recall = _ratio(tp, tp + fn)
    pos_f1 = _f1_from(pos_precision, pos_recall)
    neg_precision = _ratio(tn, tn + fn)
    neg_recall = _ratio(tn, tn + fp)
    neg_f1 = _f1_from(neg_precision, neg_recall)

    # Classification report containing every metric above.
    report = f"""
================= 分类报告 =================
混淆矩阵:
[[{tn} {fp}]
[{fn} {tp}]]
正样本 (1):
Precision: {pos_precision:.4f}
Recall: {pos_recall:.4f}
F1 Score: {pos_f1:.4f}
负样本 (0):
Precision: {neg_precision:.4f}
Recall: {neg_recall:.4f}
F1 Score: {neg_f1:.4f}
整体评估指标:
Accuracy: {acc:.4f}
AUC: {roc_auc:.4f}
AUPRC: {pr_auc:.4f}
F1 Score: {overall_f1:.4f}
Precision: {overall_precision:.4f}
Recall: {overall_recall:.4f}
"""

    # The training time is not known here; the caller fills it in.
    timings = {
        "training_time": None,
        "prediction_time": prediction_time,
    }
    return timings, report
# Entry point
def main():
    """Run the pipeline: load, weight, split, grid-search, evaluate, report.

    The Spark session is stopped in a ``finally`` clause so it is released
    even when one of the steps raises.
    """
    try:
        # 1. Load the data and convert it to a CSR matrix.
        X, y = load_and_prepare_data()

        # 2. Prepare the training data (class-balancing sample weights).
        X, y, sample_weight = prepare_training_data(X, y)

        # Negative-to-positive ratio; it is only printed here and is not
        # passed to the model.
        positive_count = int(np.count_nonzero(y == 1))
        negative_count = int(np.count_nonzero(y == 0))
        # Avoid dividing by zero when there are no positive samples.
        ratio = negative_count / positive_count if positive_count else float("inf")
        print(f"正负样本比例: {ratio:.2f}:1")

        # 3. Train/test split. The test-set weights are discarded.
        # NOTE(review): the split is not stratified by label.
        print("数据分割...")
        X_train, X_test, y_train, y_test, sample_weight_train, _ = train_test_split(
            X, y, sample_weight, test_size=0.2, random_state=42
        )
        print(f"训练集大小: {X_train.shape[0]}")
        print(f"测试集大小: {X_test.shape[0]}")

        # 4. Train the LightGBM model.
        print(f"\n{'=' * 50}")
        print("开始训练 LightGBM 模型(带精简网格寻参)")
        print(f"{'=' * 50}")
        model, training_time = train_lightgbm_model(X_train, y_train, sample_weight_train)

        # 5. Evaluate the model and record the training time in the results.
        results, report = evaluate_model(model, X_test, y_test)
        results["training_time"] = training_time

        # Print the report with all evaluation metrics.
        print(report)

        # Print the timing summary.
        print("\n===== 时间统计 =====")
        print(f"训练时间: {results['training_time']:.2f}秒")
        print(f"预测时间: {results['prediction_time']:.2f}秒")
        print("建模流程完成!")
    finally:
        # Always release the Spark session, including on failure.
        spark.stop()
# [Info] 最佳参数组合:
# [Info] min_child_samples: 50
# [Info] num_leaves: 63
# [Info] reg_alpha: 0.1
# [Info] reg_lambda: 0
# [Info] 最佳AUC分数: 0.6631
# [Info] 完成: train_lightgbm_model | 耗时: 15272.61秒
# [Info] 模型保存成功!
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()