import pandas as pd
import numpy as np
from scipy import stats
from scipy.stats import chi2_contingency, normaltest
from scipy.signal import savgol_filter
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import mutual_info_classif
import warnings
warnings.filterwarnings('ignore')
def create_sample_data(n_customers=10000, random_state=42):
    """Create a synthetic customer dataset with a simulated churn label.

    Args:
        n_customers: Number of rows to generate (defaults to 10000).
        random_state: Seed passed to ``np.random.seed``; note that this
            reseeds NumPy's *global* legacy random generator.

    Returns:
        A ``pandas.DataFrame`` with one row per customer: an id column,
        demographic / billing / service columns, and a binary ``churn``
        column drawn from a probability built out of tenure, charges,
        support calls, late payments and contract type.
    """
    np.random.seed(random_state)

    data = {
        'customer_id': range(1, n_customers + 1),
        # A raw normal(45, 15) draw can be zero, negative or above 100;
        # clip to an adult range so age binning and age ratios stay valid.
        'age': np.clip(np.random.normal(45, 15, n_customers), 18, 100).astype(int),
        'gender': np.random.choice(['M', 'F'], n_customers),
        'income': np.random.lognormal(10, 0.5, n_customers),
        'education': np.random.choice(
            ['High School', 'Bachelor', 'Master', 'PhD'],
            n_customers, p=[0.4, 0.35, 0.2, 0.05]),
        'tenure_months': np.random.exponential(24, n_customers),
        # A raw normal(70, 20) draw can be negative; a charge cannot.
        'monthly_charges': np.clip(np.random.normal(70, 20, n_customers), 0, None),
        'total_charges': np.random.exponential(1000, n_customers),
        'payment_method': np.random.choice(
            ['Credit Card', 'Bank Transfer', 'Electronic Check', 'Mailed Check'],
            n_customers, p=[0.3, 0.25, 0.25, 0.2]),
        'phone_service': np.random.choice([0, 1], n_customers, p=[0.1, 0.9]),
        'internet_service': np.random.choice(
            ['No', 'DSL', 'Fiber'], n_customers, p=[0.2, 0.4, 0.4]),
        'online_security': np.random.choice([0, 1], n_customers, p=[0.5, 0.5]),
        'tech_support': np.random.choice([0, 1], n_customers, p=[0.6, 0.4]),
        'support_calls': np.random.poisson(2, n_customers),
        'late_payments': np.random.poisson(1, n_customers),
        'contract_type': np.random.choice(
            ['Month-to-month', 'One year', 'Two year'],
            n_customers, p=[0.5, 0.3, 0.2]),
    }
    df = pd.DataFrame(data)

    # Additive churn probability: a 0.1 base rate plus a fixed bump per
    # risk indicator. The bumps can sum to 1.05, hence the clip below.
    churn_prob = (
        0.1 +
        0.3 * (df['tenure_months'] < 12) +
        0.2 * (df['monthly_charges'] > 80) +
        0.1 * (df['support_calls'] > 3) +
        0.15 * (df['late_payments'] > 2) +
        0.2 * (df['contract_type'] == 'Month-to-month')
    )
    df['churn'] = np.random.binomial(1, np.clip(churn_prob, 0, 1), n_customers)
    return df
def basic_feature_engineering(df):
    """Add basic binned, ratio and score features to ``df``.

    The frame is modified in place and also returned.

    Args:
        df: DataFrame containing ``age``, ``income``, ``total_charges``,
            ``tenure_months``, ``monthly_charges``, ``phone_service``,
            ``online_security``, ``tech_support``, ``support_calls`` and
            ``late_payments``.

    Returns:
        The same DataFrame with the new columns ``age_group``,
        ``income_quartile``, ``avg_monthly_spend``,
        ``charge_per_month_ratio``, ``customer_value_score``,
        ``service_usage_score`` and ``problematic_customer``.
    """
    print("=== 基础特征工程 ===")

    # Open-ended outer bins: ages at or below 0, or above 100, land in the
    # first / last group instead of becoming NaN.
    df['age_group'] = pd.cut(
        df['age'],
        bins=[-np.inf, 25, 35, 45, 55, np.inf],
        labels=['Young', 'Adult', 'Middle', 'Senior', 'Elder'],
    )
    df['income_quartile'] = pd.qcut(
        df['income'], 4,
        labels=['Low', 'Medium-Low', 'Medium-High', 'High'],
    )

    # +1 keeps the denominator non-zero for customers with zero tenure.
    df['avg_monthly_spend'] = df['total_charges'] / (df['tenure_months'] + 1)
    # A zero average spend would give +/-inf; mark the ratio missing instead.
    df['charge_per_month_ratio'] = (
        df['monthly_charges'] / df['avg_monthly_spend'].replace(0, np.nan)
    )

    df['customer_value_score'] = (
        df['total_charges'] * 0.4 +
        df['tenure_months'] * 10 * 0.3 +
        (df['monthly_charges'] * df['tenure_months']) * 0.3
    )

    service_cols = ['phone_service', 'online_security', 'tech_support']
    df['service_usage_score'] = df[service_cols].sum(axis=1)

    # Flag customers above the 80th percentile on either trouble metric.
    df['problematic_customer'] = (
        (df['support_calls'] > df['support_calls'].quantile(0.8)) |
        (df['late_payments'] > df['late_payments'].quantile(0.8))
    ).astype(int)
    return df
def advanced_numerical_features(df):
    """Add numeric transforms, scaled copies, interactions and outlier flags.

    The frame is modified in place and also returned.

    Args:
        df: DataFrame containing ``age``, ``income``, ``tenure_months``,
            ``monthly_charges`` and ``total_charges``.

    Returns:
        The same DataFrame with log / square-root / power transforms,
        ``<col>_scaled`` standardized copies, interaction and ratio
        columns, and 0/1 outlier flags based on ``|z-score| > 3``.
    """
    print("=== 高级数值特征构造 ===")

    df['log_income'] = np.log1p(df['income'])
    df['log_total_charges'] = np.log1p(df['total_charges'])
    # Clip at zero first: the square root of a negative value would be NaN.
    df['sqrt_tenure'] = np.sqrt(df['tenure_months'].clip(lower=0))
    df['sqrt_monthly_charges'] = np.sqrt(df['monthly_charges'].clip(lower=0))
    # Despite the column name this is a fixed signed power transform
    # (exponent 0.3), not a fitted Box-Cox transform.
    df['boxcox_income'] = np.sign(df['income']) * np.power(np.abs(df['income']), 0.3)

    numerical_cols = ['age', 'income', 'tenure_months', 'monthly_charges', 'total_charges']
    scaled_features = StandardScaler().fit_transform(df[numerical_cols])
    for i, col in enumerate(numerical_cols):
        df[f'{col}_scaled'] = scaled_features[:, i]

    df['age_income_interaction'] = df['age'] * df['income'] / 1000000
    df['tenure_charges_interaction'] = df['tenure_months'] * df['monthly_charges']
    # +1 keeps the denominator strictly positive for non-negative income.
    df['charges_to_income_ratio'] = df['monthly_charges'] / (df['income'] / 12 + 1)
    # A non-positive age would give inf or a negative ratio; treat those
    # rows as missing instead.
    age_in_months = (df['age'] * 12).where(df['age'] > 0)
    df['tenure_to_age_ratio'] = df['tenure_months'] / age_in_months

    df['income_outlier'] = (np.abs(stats.zscore(df['income'])) > 3).astype(int)
    df['charges_outlier'] = (np.abs(stats.zscore(df['monthly_charges'])) > 3).astype(int)
    return df
def statistical_features(df):
    """Add group-level statistics, ranks, rolling stats and shape statistics.

    Unlike an in-place transform, the merges here build a new frame: the
    input ``df`` is left untouched and the result is sorted by
    ``customer_id``.

    Args:
        df: DataFrame containing ``customer_id``, ``age_group``,
            ``income_quartile``, ``monthly_charges``, ``total_charges``,
            ``tenure_months``, ``support_calls``, ``income`` and ``churn``.

    Returns:
        A new DataFrame with per-age-group and per-income-quartile
        aggregates joined onto every row, deviations from the age-group
        mean, percentile ranks, 100-row rolling mean/std, and whole-column
        skewness/kurtosis broadcast to every row.

    Note:
        ``income_quartile_churn`` is the mean of the target per quartile
        computed on the same rows it is attached to, so it leaks the label
        into the features.
    """
    print("=== 统计特征构造 ===")

    # observed=False is spelled out so categorical group keys behave the
    # same across pandas versions.
    age_group_stats = df.groupby('age_group', observed=False).agg({
        'monthly_charges': ['mean', 'std', 'median'],
        'total_charges': ['mean', 'std'],
        'support_calls': 'mean'
    }).round(2)
    # Flatten the (column, statistic) MultiIndex to "column_statistic".
    age_group_stats.columns = ['_'.join(col) for col in age_group_stats.columns]
    age_group_stats = age_group_stats.add_prefix('age_group_')
    df = df.merge(age_group_stats, left_on='age_group', right_index=True, how='left')

    income_quartile_stats = df.groupby('income_quartile', observed=False).agg({
        'monthly_charges': 'mean',
        'tenure_months': 'mean',
        'churn': 'mean'
    }).round(3)
    income_quartile_stats.columns = [
        f'income_quartile_{col}' for col in income_quartile_stats.columns
    ]
    df = df.merge(income_quartile_stats, left_on='income_quartile', right_index=True, how='left')

    # Rolling windows follow row order, so sort once up front rather than
    # on every loop iteration.
    df = df.sort_values('customer_id')

    numerical_features = ['monthly_charges', 'total_charges', 'tenure_months', 'support_calls']
    for col in numerical_features:
        # Only columns that were aggregated per age group get a deviation.
        group_mean_col = f'age_group_{col}_mean'
        if group_mean_col in df.columns:
            df[f'{col}_deviation_from_age_group'] = df[col] - df[group_mean_col]
        df[f'{col}_percentile'] = df[col].rank(pct=True)

        window = df[col].rolling(window=100, min_periods=1)
        df[f'{col}_rolling_mean'] = window.mean()
        # The sample std of a single observation is undefined (NaN);
        # report zero spread instead of leaving a missing value.
        df[f'{col}_rolling_std'] = window.std().fillna(0.0)

    # Whole-column shape statistics: the same scalar on every row.
    for col in ['income', 'monthly_charges', 'total_charges']:
        df[f'{col}_skewness'] = stats.skew(df[col])
        df[f'{col}_kurtosis'] = stats.kurtosis(df[col])
    return df
def _calculate_woe(df, col, target):
    """Return a ``{category: WOE}`` dict for ``col`` against a 0/1 ``target``.

    WOE is computed here as ``log(bad_dist / good_dist)``, where "bad" is
    target == 1 and "good" is target == 0.
    """
    # Guarantee both class columns exist even if one class never occurs.
    counts = pd.crosstab(df[col], df[target]).reindex(columns=[0, 1], fill_value=0)
    total_bad = df[target].sum()
    total_good = len(df) - total_bad
    # With an empty class every share is zero; avoid dividing by zero.
    bad_dist = counts[1] / total_bad if total_bad else counts[1] * 0.0
    good_dist = counts[0] / total_good if total_good else counts[0] * 0.0
    # A zero share would make the log ratio infinite; substitute a floor.
    bad_dist = bad_dist.replace(0, 0.0001)
    good_dist = good_dist.replace(0, 0.0001)
    return np.log(bad_dist / good_dist).to_dict()


def advanced_feature_engineering(df):
    """Add target/frequency/WOE encodings, cluster features and crosses.

    The frame is modified in place and also returned.

    Args:
        df: DataFrame containing ``gender``, ``education``,
            ``payment_method``, ``internet_service``, ``contract_type``,
            ``age``, ``income``, ``tenure_months``, ``monthly_charges``,
            ``total_charges`` and the binary target ``churn``.

    Returns:
        The same DataFrame with ``<col>_target_encoded``,
        ``<col>_frequency``, ``<col>_woe``, ``customer_cluster``,
        ``cluster_distance`` and several cross-feature columns.

    Note:
        Target encoding and WOE are fitted on the same rows they are
        applied to, so these columns leak the label.
    """
    from sklearn.cluster import KMeans

    print("=== 高级特征工程 ===")

    categorical_cols = ['gender', 'education', 'payment_method', 'internet_service', 'contract_type']
    for col in categorical_cols:
        # Target encoding: mean churn rate of the row's category.
        target_mean = df.groupby(col)['churn'].mean()
        df[f'{col}_target_encoded'] = df[col].map(target_mean)
        # Frequency encoding: relative frequency of the row's category.
        freq_map = df[col].value_counts(normalize=True)
        df[f'{col}_frequency'] = df[col].map(freq_map)

    for col in ['education', 'contract_type']:
        df[f'{col}_woe'] = df[col].map(_calculate_woe(df, col, 'churn'))

    cluster_features = ['age', 'income', 'tenure_months', 'monthly_charges', 'total_charges']
    scaled_data = StandardScaler().fit_transform(df[cluster_features])
    # n_init is pinned so results do not depend on the installed
    # scikit-learn version's default.
    kmeans = KMeans(n_clusters=5, n_init=10, random_state=42)
    labels = kmeans.fit_predict(scaled_data)
    df['customer_cluster'] = labels
    # Euclidean distance (in scaled space) to the assigned cluster centre.
    df['cluster_distance'] = np.linalg.norm(
        scaled_data - kmeans.cluster_centers_[labels], axis=1
    )

    df['age_tenure_cross'] = df['age'] * df['tenure_months']
    df['income_charges_cross'] = df['income'] * df['monthly_charges']
    df['gender_education'] = df['gender'] + '_' + df['education']
    df['contract_payment'] = df['contract_type'] + '_' + df['payment_method']
    return df
def feature_selection_analysis(df):
    """Rank features by mutual information and correlation with ``churn``.

    Args:
        df: DataFrame containing the target column ``churn`` plus feature
            columns. ``customer_id``, ``churn``, ``age_group`` and
            ``income_quartile`` are excluded from the feature set.

    Returns:
        A tuple ``(feature_importance, df_encoded)``:

        * ``feature_importance`` has the columns ``feature``,
          ``mutual_info``, ``correlation`` (absolute value) and
          ``combined_score`` (0.6 * mutual_info + 0.4 * correlation),
          sorted by ``combined_score`` descending.
        * ``df_encoded`` is a copy of ``df`` whose feature columns are
          label-encoded (non-numeric ones) and imputed. Excluded columns
          are left as they were.
    """
    print("=== 特征选择和重要性分析 ===")

    excluded = {'customer_id', 'churn', 'age_group', 'income_quartile'}
    feature_cols = [col for col in df.columns if col not in excluded]

    df_encoded = df.copy()
    # Every non-numeric feature column is label-encoded.
    categorical_cols = set(
        df_encoded[feature_cols].select_dtypes(exclude=['number', 'bool']).columns
    )
    for col in categorical_cols:
        df_encoded[col] = LabelEncoder().fit_transform(df_encoded[col].astype(str))

    # Impute only the feature columns: filling the excluded categorical
    # columns with 0 is invalid because 0 is not one of their categories.
    # +/-inf is treated as missing so it does not reach the estimators.
    features = df_encoded[feature_cols].replace([np.inf, -np.inf], np.nan)
    features = features.fillna(features.median(numeric_only=True)).fillna(0)
    df_encoded[feature_cols] = features

    X = df_encoded[feature_cols]
    y = df_encoded['churn']

    # Label-encoded columns are discrete codes, not continuous magnitudes.
    discrete_mask = np.array([col in categorical_cols for col in feature_cols])
    mi_scores = mutual_info_classif(
        X, y, discrete_features=discrete_mask, random_state=42
    )
    mi_df = pd.DataFrame({
        'feature': feature_cols,
        'mutual_info_score': mi_scores
    }).sort_values('mutual_info_score', ascending=False)
    print("Top 20 features by mutual information:")
    print(mi_df.head(20))

    corr_matrix = df_encoded[feature_cols + ['churn']].corr()
    # Constant columns have undefined (NaN) correlation; score them as 0.
    # The target's own self-correlation is dropped from the ranking.
    abs_corr = corr_matrix['churn'].drop('churn').abs().fillna(0.0)
    target_corr = abs_corr.sort_values(ascending=False)
    print("\nTop 15 features by correlation with target:")
    print(target_corr.head(15))

    feature_importance = pd.DataFrame({
        'feature': feature_cols,
        'mutual_info': mi_scores,
        'correlation': abs_corr.reindex(feature_cols).to_numpy()
    })
    feature_importance['combined_score'] = (
        feature_importance['mutual_info'] * 0.6 +
        feature_importance['correlation'] * 0.4
    )
    feature_importance = feature_importance.sort_values('combined_score', ascending=False)
    return feature_importance, df_encoded
def main():
    """Run the full feature-engineering demo and print a summary.

    Generates the sample data, applies each feature-engineering stage in
    order, ranks the resulting features, and prints column counts plus a
    missing-value / duplicate-row check.

    Returns:
        A tuple ``(df, feature_importance)``: the fully engineered
        DataFrame and the feature ranking table.
    """
    print("开始数据挖掘特征工程实战案例\n")
    print("1. 创建样本数据...")
    df = create_sample_data()
    # Every column except the id and the target counts as a raw feature;
    # derive the number from the data instead of hard-coding it.
    n_original_features = df.shape[1] - 2
    print(f"数据集大小: {df.shape}")
    print(f"流失率: {df['churn'].mean():.3f}")
    print(f"数据预览:\n{df.head()}\n")

    df = basic_feature_engineering(df)
    print(f"基础特征工程后列数: {df.shape[1]}\n")
    df = advanced_numerical_features(df)
    print(f"数值特征工程后列数: {df.shape[1]}\n")
    df = statistical_features(df)
    print(f"统计特征工程后列数: {df.shape[1]}\n")
    df = advanced_feature_engineering(df)
    print(f"高级特征工程后列数: {df.shape[1]}\n")

    feature_importance, df_encoded = feature_selection_analysis(df)
    print(f"\n最终特征数量: {len(feature_importance)}")

    n_final_features = df.shape[1] - 2
    print("\n=== 特征工程总结 ===")
    print(f"原始特征数: {n_original_features}")
    print(f"最终特征数: {n_final_features}")
    print(f"特征工程增加特征数: {n_final_features - n_original_features}")

    print("\n特征工程技术应用:")
    print("✓ pandas: 数据处理、分组统计、分箱、编码")
    print("✓ numpy: 数学变换、多项式特征、异常值检测")
    print("✓ scipy: 统计检验、分布分析、正态性检验")
    print("✓ sklearn: 聚类、标准化、特征选择")

    print("\n数据质量检查:")
    print(f"缺失值: {df.isnull().sum().sum()}")
    print(f"重复行: {df.duplicated().sum()}")
    return df, feature_importance
if __name__ == "__main__":
    # Script entry point: run the pipeline, then print the mean, standard
    # deviation and correlation with churn for a few engineered features.
    df_final, importance_df = main()
    print("\n=== 关键特征统计 ===")
    key_features = [
        'customer_value_score',
        'charges_to_income_ratio',
        'problematic_customer',
        'cluster_distance',
    ]
    for feature in key_features:
        # Skip any feature that an earlier stage did not produce.
        if feature not in df_final.columns:
            continue
        column = df_final[feature]
        print(f"\n{feature}:")
        print(f" 均值: {column.mean():.3f}")
        print(f" 标准差: {column.std():.3f}")
        print(f" 与流失的相关性: {column.corr(df_final['churn']):.3f}")