本篇文章Mastering PySpark Window Functions: A Practical Guide to Time-Based Analytics适合数据分析和工程师入门了解PySpark的窗口函数。文章的亮点在于详细介绍了窗口函数的基本概念及其在销售数据分析中的实际应用,帮助读者理解如何进行复杂的数据计算而无需多次连接或聚合。
文章目录
窗口函数是 Apache Spark 中最强大但却未被充分利用的功能之一。它们允许您对与当前行相关的行执行复杂的计算,而无需昂贵的连接或多次聚合。在这篇文章中,我们将通过一个销售分析场景来探讨窗口函数的实际应用。
1 理解窗口函数:基础
可以将窗口函数视为一种在处理每个单独行时“窥视”相邻行的方式。与将多行合并为一行的常规聚合不同,窗口函数会为每个输入行返回一个结果,同时考虑一个相关行的“窗口”。
窗口函数的基本组成包括:
- Partition By(按分区):将行分组到逻辑分区中
- Order By(按排序):定义每个分区内的排序
- Frame(框架):指定分区内要包含在计算中的行
2 搭建分析管道
让我们从一个包含交易记录的销售数据集开始。我们将使用各种窗口函数技术来构建预测支付延迟的特征。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
salesDF = salesDF.withColumn('transaction_day', F.dayofmonth(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_month', F.month(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_year', F.year(F.col('transaction_date')))
salesDF = salesDF.withColumn('day_of_week', F.dayofweek(F.col('transaction_date')) - 1)
salesDF = salesDF.withColumn('payment_day', F.dayofmonth(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_month', F.month(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_year', F.year(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_day_of_week', F.dayofweek(F.col('payment_due_date')) - 1)
3 客户级别聚合:理解历史模式
在深入了解窗口函数之前,我们通常需要客户级别的统计数据。这些数据为理解当前行为是典型还是异常提供了背景。
salesDF = salesDF.join(
salesDF.groupBy('client_id', 'transaction_type')
.agg(
F.mean('delay_days').alias('client_delay_average'),
F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('client_delay_weighted_avg'),
F.stddev('delay_days').alias('client_delay_stddev'),
F.sqrt(
F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),
F.sum('invoice_amount')) -
F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),
F.sum('invoice_amount')), 2)
).alias('client_delay_weighted_stddev'),
F.expr('percentile_approx(delay_days, 0.5)').alias('client_delay_median'),
F.count('delay_days').alias('client_transaction_count')
),
on=['client_id', 'transaction_type'],
how='left'
)
加权标准差的计算可能看起来很复杂,但它使用的是数学公式: E [ X 2 ] − ( E [ X ] ) 2 \sqrt{E[X^2] - (E[X])^2} E[X2]−(E[X])2,其中较大的交易对标准差计算的影响更大。
4 滚动窗口:捕捉时间趋势
这就是窗口函数真正发挥作用的地方。滚动窗口允许我们计算滑动时间段内的指标,捕捉客户行为中的趋势和季节性。
time_windows = [30, 90, 365]
for days in time_windows:
rolling_window = (
Window.partitionBy('client_id', 'transaction_type')
.orderBy(F.col('transaction_date').cast("timestamp").cast("long"))
.rangeBetween(-days * 86400, -1)
)
salesDF = salesDF.withColumn(
f'delay_rolling_avg_{days}d',
F.avg('delay_days').over(rolling_window)
)
salesDF = salesDF.withColumn(
f'delay_rolling_std_{days}d',
F.stddev('delay_days').over(rolling_window)
)
salesDF = salesDF.withColumn(
f'delay_rolling_weighted_avg_{days}d',
F.try_divide(
F.sum(F.col('delay_days') * F.col('invoice_amount')).over(rolling_window),
F.sum(F.col('invoice_amount')).over(rolling_window)
)
)
salesDF = salesDF.withColumn(
f'delay_rolling_weighted_std_{days}d',
F.sqrt(
F.try_divide(
F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)).over(rolling_window),
F.sum(F.col('invoice_amount')).over(rolling_window)
) -
F.pow(
F.try_divide(
F.sum(F.col('invoice_amount') * F.col('delay_days')).over(rolling_window),
F.sum(F.col('invoice_amount')).over(rolling_window)
), 2
)
)
)
5 关键概念解释
Range(范围)与 Rows(行)窗口:我们使用 rangeBetween(-days * 86400, -1)
而不是 rowsBetween()
,因为我们想要一个基于时间的窗口。这确保我们能够精确地捕获指定天数的数据,而与交易频率无关。
加权计算:通过按发票金额对指标进行加权,我们赋予了较大交易更高的重要性,这通常能更好地代表客户的支付行为。
排除当前行:将 -1
作为上限可以排除当前交易,从而防止预测模型中的数据泄露。
6 月度滞后特征:季节性模式分析
为了进行长期趋势分析,我们可以创建月度聚合并生成滞后特征以捕捉季节性模式。
monthlyDF = salesDF.groupBy(
"client_id", "transaction_type", "transaction_year", "transaction_month"
).agg(
F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('monthly_weighted_delay_avg'),
F.sqrt(
F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),
F.sum('invoice_amount')) -
F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),
F.sum('invoice_amount')), 2)
).alias('monthly_weighted_delay_std')
)
monthly_window = Window.partitionBy("client_id", "transaction_type") \
.orderBy("transaction_year", "transaction_month")
for lag_months in range(1, 13):
monthlyDF = monthlyDF.withColumn(
f"delay_avg_lag_{lag_months}m",
F.lag("monthly_weighted_delay_avg", lag_months).over(monthly_window)
)
monthlyDF = monthlyDF.withColumn(
f"delay_std_lag_{lag_months}m",
F.lag("monthly_weighted_delay_std", lag_months).over(monthly_window)
)
salesDF = salesDF.join(
monthlyDF,
on=["client_id", "transaction_type", "transaction_year", "transaction_month"],
how="left"
)
7 性能优化技巧
分区策略:始终根据逻辑上对数据进行分组的高基数列进行分区。这可以最大限度地减少数据混洗。
窗口框架优化:使用尽可能限制性的框架。无界窗口开销大且通常不必要。
缓存:当对同一数据集执行多个窗口操作时,考虑缓存中间结果。
salesDF.cache()
8 行业级别基准
不要忘记创建行业或细分市场级别的基准进行比较:
salesDF = salesDF.join(
salesDF.groupBy('industry_sector', "transaction_type")
.agg(
F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('industry_delay_avg'),
F.sqrt(
F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),
F.sum('invoice_amount')) -
F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),
F.sum('invoice_amount')), 2)
).alias('industry_delay_std')
),
on=['industry_sector', 'transaction_type'],
how='left'
)
9 结论
窗口函数解锁了 PySpark 中复杂的分析能力,使您能够为机器学习和高级分析创建丰富的特征集。关键在于理解何时使用不同类型的窗口:
- 无界窗口:用于累积指标
- 基于范围的窗口:用于时间序列分析
- 基于行的窗口:用于排名和百分位数
- 滞后函数:用于趋势和季节性检测
通过将这些技术与适当的分区和优化策略相结合,您可以构建健壮、可扩展的分析管道,捕捉数据中复杂的时间模式。
开始在您自己的数据集中尝试这些模式,您很快就会发现窗口函数在将原始数据转化为可操作洞察方面的真正力量。