当前位置: 首页 > news >正文

在 PySpark 中解锁窗口函数的力量,实现高级数据转换

本篇文章Mastering PySpark Window Functions: A Practical Guide to Time-Based Analytics适合数据分析和工程师入门了解PySpark的窗口函数。文章的亮点在于详细介绍了窗口函数的基本概念及其在销售数据分析中的实际应用,帮助读者理解如何进行复杂的数据计算而无需多次连接或聚合。


文章目录

  • 1 理解窗口函数:基础
  • 2 搭建分析管道
  • 3 客户级别聚合:理解历史模式
  • 4 滚动窗口:捕捉时间趋势
  • 5 关键概念解释
  • 6 月度滞后特征:季节性模式分析
  • 7 性能优化技巧
  • 8 行业级别基准
  • 9 结论


PySpark logo image

窗口函数是 Apache Spark 中最强大但却未被充分利用的功能之一。它们允许您对与当前行相关的行执行复杂的计算,而无需昂贵的连接或多次聚合。在这篇文章中,我们将通过一个销售分析场景来探讨窗口函数的实际应用。

1 理解窗口函数:基础

可以将窗口函数视为一种在处理每个单独行时“窥视”相邻行的方式。与将多行合并为一行的常规聚合不同,窗口函数会为每个输入行返回一个结果,同时考虑一个相关行的“窗口”。

窗口函数的基本组成包括:

  • Partition By(按分区):将行分组到逻辑分区中
  • Order By(按排序):定义每个分区内的排序
  • Frame(框架):指定分区内要包含在计算中的行

2 搭建分析管道

让我们从一个包含交易记录的销售数据集开始。我们将使用各种窗口函数技术来构建预测支付延迟的特征。

from pyspark.sql import functions as F
from pyspark.sql.window import WindowsalesDF = salesDF.withColumn('transaction_day', F.dayofmonth(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_month', F.month(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_year', F.year(F.col('transaction_date')))
salesDF = salesDF.withColumn('day_of_week', F.dayofweek(F.col('transaction_date')) - 1)
salesDF = salesDF.withColumn('payment_day', F.dayofmonth(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_month', F.month(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_year', F.year(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_day_of_week', F.dayofweek(F.col('payment_due_date')) - 1)

3 客户级别聚合:理解历史模式

在深入了解窗口函数之前,我们通常需要客户级别的统计数据。这些数据为理解当前行为是典型还是异常提供了背景。

salesDF = salesDF.join(salesDF.groupBy('client_id', 'transaction_type').agg(F.mean('delay_days').alias('client_delay_average'),F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('client_delay_weighted_avg'),F.stddev('delay_days').alias('client_delay_stddev'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('client_delay_weighted_stddev'),F.expr('percentile_approx(delay_days, 0.5)').alias('client_delay_median'),F.count('delay_days').alias('client_transaction_count')),on=['client_id', 'transaction_type'],how='left'
)

加权标准差的计算可能看起来很复杂,但它使用的是数学公式:E[X2]−(E[X])2\sqrt{E[X^2] - (E[X])^2}E[X2](E[X])2,其中较大的交易对标准差计算的影响更大。

4 滚动窗口:捕捉时间趋势

这就是窗口函数真正发挥作用的地方。滚动窗口允许我们计算滑动时间段内的指标,捕捉客户行为中的趋势和季节性。

time_windows = [30, 90, 365]for days in time_windows:rolling_window = (Window.partitionBy('client_id', 'transaction_type').orderBy(F.col('transaction_date').cast("timestamp").cast("long")).rangeBetween(-days * 86400, -1))salesDF = salesDF.withColumn(f'delay_rolling_avg_{days}d',F.avg('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_std_{days}d',F.stddev('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_weighted_avg_{days}d',F.try_divide(F.sum(F.col('delay_days') * F.col('invoice_amount')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)))salesDF = salesDF.withColumn(f'delay_rolling_weighted_std_{days}d',F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)), 2)))

5 关键概念解释

Range(范围)与 Rows(行)窗口:我们使用 rangeBetween(-days * 86400, -1) 而不是 rowsBetween(),因为我们想要一个基于时间的窗口。这确保我们能够精确地捕获指定天数的数据,而与交易频率无关。

加权计算:通过按发票金额对指标进行加权,我们赋予了较大交易更高的重要性,这通常能更好地代表客户的支付行为。

排除当前行:将 -1 作为上限可以排除当前交易,从而防止预测模型中的数据泄露。

6 月度滞后特征:季节性模式分析

为了进行长期趋势分析,我们可以创建月度聚合并生成滞后特征以捕捉季节性模式。

monthlyDF = salesDF.groupBy("client_id", "transaction_type", "transaction_year", "transaction_month"
).agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('monthly_weighted_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('monthly_weighted_delay_std')
)monthly_window = Window.partitionBy("client_id", "transaction_type") \.orderBy("transaction_year", "transaction_month")for lag_months in range(1, 13):monthlyDF = monthlyDF.withColumn(f"delay_avg_lag_{lag_months}m",F.lag("monthly_weighted_delay_avg", lag_months).over(monthly_window))monthlyDF = monthlyDF.withColumn(f"delay_std_lag_{lag_months}m",F.lag("monthly_weighted_delay_std", lag_months).over(monthly_window))salesDF = salesDF.join(monthlyDF,on=["client_id", "transaction_type", "transaction_year", "transaction_month"],how="left"
)

7 性能优化技巧

分区策略:始终根据逻辑上对数据进行分组的高基数列进行分区。这可以最大限度地减少数据混洗。

窗口框架优化:使用尽可能限制性的框架。无界窗口开销大且通常不必要。

缓存:当对同一数据集执行多个窗口操作时,考虑缓存中间结果。

salesDF.cache()

8 行业级别基准

不要忘记创建行业或细分市场级别的基准进行比较:

salesDF = salesDF.join(salesDF.groupBy('industry_sector', "transaction_type").agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('industry_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('industry_delay_std')),on=['industry_sector', 'transaction_type'],how='left'
)

9 结论

窗口函数解锁了 PySpark 中复杂的分析能力,使您能够为机器学习和高级分析创建丰富的特征集。关键在于理解何时使用不同类型的窗口:

  • 无界窗口:用于累积指标
  • 基于范围的窗口:用于时间序列分析
  • 基于行的窗口:用于排名和百分位数
  • 滞后函数:用于趋势和季节性检测

通过将这些技术与适当的分区和优化策略相结合,您可以构建健壮、可扩展的分析管道,捕捉数据中复杂的时间模式。

开始在您自己的数据集中尝试这些模式,您很快就会发现窗口函数在将原始数据转化为可操作洞察方面的真正力量。


文章转载自:

http://PEeEcVqi.qyhcg.cn
http://smVUVGHM.qyhcg.cn
http://cPnteCq8.qyhcg.cn
http://HtWVEYYJ.qyhcg.cn
http://bAo5hsGq.qyhcg.cn
http://kBHQ09GE.qyhcg.cn
http://F11p1YhZ.qyhcg.cn
http://u89OSRmc.qyhcg.cn
http://oal4514L.qyhcg.cn
http://GmcSIe6x.qyhcg.cn
http://QkDRFdO2.qyhcg.cn
http://N47TaGqJ.qyhcg.cn
http://O349VEEm.qyhcg.cn
http://hPDZfKSJ.qyhcg.cn
http://5uVuh3KJ.qyhcg.cn
http://o381jcfE.qyhcg.cn
http://ywZ9AYOn.qyhcg.cn
http://epm4t42d.qyhcg.cn
http://fQDG5ZHP.qyhcg.cn
http://df7lK553.qyhcg.cn
http://Aio7YKnE.qyhcg.cn
http://aBUTvR1k.qyhcg.cn
http://gZoTq2uS.qyhcg.cn
http://V0UNGfks.qyhcg.cn
http://VB2siRNh.qyhcg.cn
http://YenyDyGA.qyhcg.cn
http://d2cnoCJE.qyhcg.cn
http://Jx1fAlHg.qyhcg.cn
http://rNZVkQJV.qyhcg.cn
http://b8cdoyN2.qyhcg.cn
http://www.dtcms.com/a/362410.html

相关文章:

  • 什么是Token?——理解自然语言处理中的基本单位
  • 毕业项目推荐:68-基于yolov8/yolov5/yolo11的水稻虫害检测识别系统(Python+卷积神经网络)
  • Python OpenCV图像处理与深度学习: Python OpenCV图像配准入门
  • 深度学习中的数据增强实战:基于PyTorch的图像分类任务优化
  • 云计算学习100天-第43天-cobbler
  • 【linux仓库】万物至简的设计典范:如何用‘文件’这一个概念操纵整个Linux世界?
  • 【数据分享】土地利用shp数据分享-内蒙古
  • Python应用——ffmpeg处理音视频的常见场景
  • 谷歌AdSense套利是什么?怎么做才能赚到钱
  • 安卓QQ闪照获取软件(支持TIM)
  • 各省市信息化项目管理办法中的网络安全等级保护如何规定的?
  • 智能化企业级CRM系统开发实战:飞算JavaAI全流程体验
  • 【音视频】火山引擎实时、低延时拥塞控制算法的优化实践
  • 在 Delphi 5 中获取 Word 文档页数的方法
  • ⸢ 肆 ⸥ ⤳ 默认安全:安全建设方案 ➭ a.信息安全基线
  • 在线宠物用品|基于vue的在线宠物用品交易网站(源码+数据库+文档)
  • 从Web2到Web3:一场重塑数字未来的“静默革命”
  • OpenMMLab 模型部署利器:MMDeploy 详细介绍
  • 小学一到六年级语文/英语/数学作业出题布置网站源码 支持生成PDF和打印
  • Windows 电脑发现老是自动访问外网的域名排障步骤
  • 《微服务协作实战指南:构建全链路稳健性的防御体系》
  • 公司电脑监控软件应该怎么选择?五款超实用的公司电脑监控软件推荐
  • 云电脑是什么?与普通电脑的区别在哪里?——天翼云电脑体验推荐
  • 从电脑底层到进程创建:一篇看懂冯诺依曼、OS和进程
  • Netty从0到1系列之I/O模型
  • 基于Python毕业设计推荐:基于Django的全国降水分析可视化系统
  • 使用 qmake 生成 Makefile,Makefile 转换为 Qt 的 .pro 文件
  • npy可视化方法
  • 【Day 42】Shell-expect和sed
  • 量子计算+AI成竞争关键领域,谷歌/微软/微美全息追赶布局步入冲刺拐点!