Spark处理过程-案例数据清洗

发布于:2025-05-14 ⋅ 阅读:(20) ⋅ 点赞:(0)

一、缺失值处理

缺失值是数据中最常见的问题之一,处理方法包括删除、填充或预测。

1. 检测缺失值

python

运行

import pandas as pd
import numpy as np

# 创建示例DataFrame
data = {
    'A': [1, np.nan, 3, 4],
    'B': [5, 6, np.nan, 8],
    'C': [9, 10, 11, 12]
}
df = pd.DataFrame(data)

# 检测缺失值
print("缺失值统计:")
print(df.isnull().sum())  # 统计每列的缺失值数量
2. 删除缺失值

python

运行

# 删除包含缺失值的行
df_dropna = df.dropna()

# 删除全为缺失值的列
df_dropna_col = df.dropna(axis=1, how='all')
3. 填充缺失值

python

运行

# 使用固定值填充
df_filled_constant = df.fillna(0)

# 使用均值/中位数/众数填充
df_filled_mean = df.fillna(df.mean())  # 均值填充
df_filled_median = df.fillna(df.median())  # 中位数填充

# 向前/向后填充
df_filled_ffill = df.fillna(method='ffill')  # 向前填充
df_filled_bfill = df.fillna(method='bfill')  # 向后填充

# 使用插值法填充
df_interpolated = df.interpolate()  # 线性插值

二、异常值处理

异常值可能由数据录入错误或真实异常事件导致,处理方法包括删除、修正或离散化。

1. 检测异常值

python

运行

# 使用Z-score检测异常值
from scipy import stats

z_scores = np.abs(stats.zscore(df['A']))
threshold = 3
outliers = df[z_scores > threshold]

# 使用箱线图检测异常值
import matplotlib.pyplot as plt

plt.boxplot(df['A'])
plt.show()
2. 处理异常值

python

运行

# 删除异常值
df_clean = df[(z_scores < threshold)]

# 替换异常值(例如,将超过3倍标准差的值替换为阈值)
df_replaced = df.copy()
df_replaced['A'] = np.where(z_scores > threshold, df['A'].median(), df['A'])

三、重复数据处理

python

运行

# 创建包含重复数据的DataFrame
df_duplicates = pd.DataFrame({
    'A': [1, 2, 2, 3],
    'B': ['x', 'y', 'y', 'z']
})

# 检测重复行
print("重复行检测:")
print(df_duplicates.duplicated())

# 删除重复行
df_cleaned = df_duplicates.drop_duplicates()

四、数据类型转换

确保数据类型与业务需求一致,避免计算错误。

python

运行

# 转换数据类型
df['A'] = df['A'].astype(int)  # 转为整数类型

# 字符串转日期
df['date'] = pd.to_datetime(df['date_string'])

# 处理格式错误(例如,将"$1,000"转为数值1000)
df['price'] = df['price'].str.replace('$', '').str.replace(',', '').astype(float)

五、不一致数据处理

处理大小写、单位、拼写等不一致问题。

python

运行

# 统一大小写
df['name'] = df['name'].str.upper()  # 全部大写

# 统一单位(例如,将英寸转为厘米)
df['height_cm'] = df['height_in'] * 2.54

# 修正拼写错误
df['country'] = df['country'].replace('USA', 'United States')

六、数据标准化 / 归一化

用于将数据缩放到统一范围,常见方法有 Min-Max 缩放和 Z-score 标准化。

python

运行

from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Min-Max缩放(将数据缩放到[0,1]范围)
scaler = MinMaxScaler()
df['A_scaled'] = scaler.fit_transform(df[['A']])

# Z-score标准化(均值为0,标准差为1)
scaler = StandardScaler()
df['A_standardized'] = scaler.fit_transform(df[['A']])

七、文本数据清洗

文本数据常包含噪声,需进行特殊处理。

python

运行

# 去除空格和特殊字符
df['text'] = df['text'].str.strip()  # 去除首尾空格
df['text'] = df['text'].str.replace('[^\w\s]', '')  # 去除标点符号

# 分词和词干提取
import nltk
from nltk.stem import PorterStemmer

nltk.download('punkt')
stemmer = PorterStemmer()

df['tokens'] = df['text'].apply(lambda x: nltk.word_tokenize(x))
df['stemmed'] = df['tokens'].apply(lambda tokens: [stemmer.stem(token) for token in tokens])

八、高级数据清洗(使用 Spark)

对于大规模数据,可使用 Spark 进行分布式清洗。

python

运行

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

# 读取数据
df_spark = spark.read.csv("data.csv", header=True, inferSchema=True)

# 处理缺失值
df_spark = df_spark.na.fill(0, subset=["numeric_col"])  # 填充数值列
df_spark = df_spark.na.fill("unknown", subset=["string_col"])  # 填充字符串列

# 过滤异常值
df_spark = df_spark.filter(col("age") > 0)  # 年龄必须大于0

# 去重
df_spark = df_spark.dropDuplicates()

九、数据清洗流程建议

  1. 探索性分析:先了解数据结构、分布和缺失情况。
  2. 制定清洗策略:根据业务需求和数据特点选择合适的清洗方法。
  3. 逐步清洗:按列或数据类型分步处理,避免错误传播。
  4. 验证结果:清洗后检查数据质量,确保问题已解决。
  5. 记录日志:记录清洗过程和处理结果,便于追溯和审计。

网站公告

今日签到

点亮在社区的每一天
去签到