当前位置: 首页 > news >正文

Spark 中的窗口函数

在 Spark 中,窗口函数(Window Functions) 是一种强大的工具,用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。窗口函数允许你在数据的某个“窗口”内进行计算,例如计算排名、累积和、移动平均等。

窗口函数的核心思想是定义一个“窗口”(通过 Window 类),然后在这个窗口上应用聚合函数(如 row_numberranksumavg 等)。


1. 窗口函数的基本概念

(1)窗口的定义

窗口函数通过 Window 类定义,主要包括以下两个部分:

  • 分区(Partitioning):将数据分为多个组(类似于 GROUP BY)。

  • 排序(Ordering):在每个分区内对数据进行排序。

  • 窗口范围(Frame):定义窗口的大小(如当前行及其前后若干行)。

(2)常见的窗口函数
  • 排名函数row_numberrankdense_rankpercent_rank

  • 聚合函数sumavgminmaxcount

  • 分析函数leadlagfirst_valuelast_value


2. 窗口函数的语法

(1)定义窗口
import org.apache.spark.sql.expressions.Window

val windowSpec = Window
  .partitionBy("column1", "column2") // 按列分区
  .orderBy("column3")                // 按列排序
  .rowsBetween(start, end)           // 定义窗口范围(可选)
  • partitionBy:指定分区的列。

  • orderBy:指定排序的列。

  • rowsBetween:定义窗口的范围(如 Window.unboundedPreceding 表示从分区的第一行开始)。

(2)应用窗口函数
import org.apache.spark.sql.functions._

val resultDF = df.withColumn("new_column", F.row_number().over(windowSpec))
  • withColumn:添加新列。

  • row_number().over(windowSpec):在定义的窗口上应用 row_number 函数。


3. 窗口函数的示例

示例 1:计算每个部门的工资排名

假设有一个 DataFrame,包含用户的姓名、部门和工资:

import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder()
  .appName("Window Function Example")
  .master("local[*]")
  .getOrCreate()

// 示例数据
val data = Seq(
  ("Alice", "HR", 3000),
  ("Bob", "IT", 4000),
  ("Charlie", "HR", 3500),
  ("David", "IT", 4500),
  ("Eva", "Finance", 5000)
)

// 创建 DataFrame
val df = spark.createDataFrame(data).toDF("name", "department", "salary")

// 定义窗口:按部门分区,按工资降序排序
val windowSpec = Window
  .partitionBy("department")
  .orderBy(F.desc("salary"))

// 计算每个部门的工资排名
val rankedDF = df.withColumn("rank", F.row_number().over(windowSpec))

// 显示结果
rankedDF.show()

输出:

+-------+----------+------+----+
|   name|department|salary|rank|
+-------+----------+------+----+
|    Eva|   Finance|  5000|   1|
|  Alice|        HR|  3000|   2|
|Charlie|        HR|  3500|   1|
|   David|        IT|  4500|   1|
|    Bob|        IT|  4000|   2|
+-------+----------+------+----+
  • partitionBy("department"):按部门分区。

  • orderBy(F.desc("salary")):按工资降序排序。

  • row_number():计算每行的排名。


示例 2:计算每个部门的累积工资

使用 sum 函数计算每个部门的累积工资:

// 定义窗口:按部门分区,按工资升序排序
val windowSpec = Window
  .partitionBy("department")
  .orderBy("salary")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// 计算累积工资
val cumulativeDF = df.withColumn("cumulative_salary", F.sum("salary").over(windowSpec))

// 显示结果
cumulativeDF.show()

输出:

+-------+----------+------+----------------+
|   name|department|salary|cumulative_salary|
+-------+----------+------+----------------+
|    Eva|   Finance|  5000|            5000|
|  Alice|        HR|  3000|            3000|
|Charlie|        HR|  3500|            6500|
|    Bob|        IT|  4000|            4000|
|   David|        IT|  4500|            8500|
+-------+----------+------+----------------+
  • rowsBetween(Window.unboundedPreceding, Window.currentRow):定义窗口范围为从分区的第一行到当前行。

  • sum("salary").over(windowSpec):计算累积工资。


示例 3:计算每个部门的工资移动平均
使用 avg 函数计算每个部门的工资移动平均(当前行及其前一行):

// 定义窗口:按部门分区,按工资升序排序,窗口范围为当前行及其前一行
val windowSpec = Window
  .partitionBy("department")
  .orderBy("salary")
  .rowsBetween(-1, Window.currentRow)

// 计算移动平均
val movingAvgDF = df.withColumn("moving_avg", F.avg("salary").over(windowSpec))

// 显示结果
movingAvgDF.show()

输出:

+-------+----------+------+----------+
|   name|department|salary| moving_avg|
+-------+----------+------+----------+
|    Eva|   Finance|  5000|    5000.0|
|  Alice|        HR|  3000|    3000.0|
|Charlie|        HR|  3500|    3250.0|
|    Bob|        IT|  4000|    4000.0|
|   David|        IT|  4500|    4250.0|
+-------+----------+------+----------+
  • rowsBetween(-1, Window.currentRow):定义窗口范围为当前行及其前一行。

  • avg("salary").over(windowSpec):计算移动平均。


4. 常见的窗口函数

(1)排名函数
  • row_number():为每行分配一个唯一的序号(从 1 开始)。

  • rank():计算排名,相同值会有相同的排名,后续排名会跳过。

  • dense_rank():计算排名,相同值会有相同的排名,后续排名不会跳过。

  • percent_rank():计算百分比排名。

(2)聚合函数
  • sum():计算窗口内的总和。

  • avg():计算窗口内的平均值。

  • min():计算窗口内的最小值。

  • max():计算窗口内的最大值。

  • count():计算窗口内的行数。

(3)分析函数
  • lead():获取当前行之后的某一行。

  • lag():获取当前行之前的某一行。

  • first_value():获取窗口内的第一个值。

  • last_value():获取窗口内的最后一个值。


5. 窗口范围的定义

窗口范围通过 rowsBetween 或 rangeBetween 定义:

  • rowsBetween(start, end):基于行的偏移量定义窗口范围。

    • Window.unboundedPreceding:从分区的第一行开始。

    • Window.unboundedFollowing:到分区的最后一行结束。

    • Window.currentRow:当前行。

  • rangeBetween(start, end):基于值的范围定义窗口范围(适用于数值或日期类型)。


6. 总结

  • 窗口函数 用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。

  • 通过 Window 类定义窗口,包括分区、排序和窗口范围。

  • 常见的窗口函数包括排名函数、聚合函数和分析函数。

  • 窗口范围可以通过 rowsBetween 或 rangeBetween 定义。

相关文章:

  • c#知识点补充
  • TensorFlow 与 TensorFlow Lite:核心解析与层应用
  • [数据结构]排序之 直接选择排序
  • 【RTSP】客户端(五)H264 265处理逻辑
  • AI绘画笔记--基础知识
  • LeetCode 每日一题 2025/3/10-2025/3/16
  • 招聘信息|基于SprinBoot+vue的招聘信息管理系统(源码+数据库+文档)
  • 【Linux网络】HTTPS
  • 社交网络分析实战(NetworkX分析Twitter关系图)
  • 第十次CCF-CSP认证(含C++源码)
  • SpringBoot MCP 入门使用
  • Axios 请求取消:从原理到实践
  • XSS漏洞学习(1)
  • 无需 Docker 也能下载镜像!轻松获取 Docker 镜像文件!
  • uniapp scroll组件下拉刷新异步更新数据列表
  • spring-aop笔记
  • 【生日蛋糕——DFS剪枝优化】
  • Vue Date 今天的开始时间与结束时间
  • 【小沐学Web3D】three.js 加载三维模型(vue3)
  • HCIA-AI人工智能笔记1:大模型技术演进与发展历程
  • 二维码网站建设源码/seo管理
  • 手机网站设计建设服务/谷歌广告推广怎么做
  • 郴州网站建设/网站排名优化培训
  • 西安政府网站建设/怎样在百度上发帖子
  • 品牌网站建设费/营销app
  • 动漫做暧昧视频网站/关键词seo培训