当前位置: 首页 > news >正文

Spark专题-第一部分:Spark 核心概述(2)-Spark 应用核心组件剖析

这一篇依然是偏理论向的内容,用两篇理论搭建起Spark的框架,让读者有个基础的认知,下一篇就可以开始sql的内容了

第一部分:Spark 核心概述(2)

Spark 应用核心组件剖析

1. Job, Stage, Task 的三层架构

理解 Spark 的执行模型是掌握其性能调优的关键。Spark 采用了三层执行模型:

Spark Application
Job 1
Job 2
Job N
Stage 1-1
Stage 1-2
Task 1-1-1
Task 1-1-2
Task 1-1-N
Task 1-2-1
Task 1-2-2
Task 1-2-N

定义与关系

  • Job(作业)

    • 由 Action 操作触发的一个完整计算任务
    • 一个应用可能包含多个 Job
    • 例如:count(), collect(), saveAsTextFile() 等操作会触发 Job
  • Stage(阶段)

    • Job 被划分为多个 Stage,划分依据是 Shuffle 操作
    • 每个 Stage 包含一系列可以在单个节点上并行执行的任务
    • Stage 分为两种类型:
      • Shuffle Map Stage:为后续 Stage 准备数据
      • Result Stage:执行最终计算并输出结果
  • Task(任务)

    • Stage 的基本执行单元,每个 Task 处理一个数据分区
    • Task 的数量由数据分区数决定
    • 在 Executor 上执行的实际计算代码

2. DAG(有向无环图):Spark 的调度大脑

DAG(Directed Acyclic Graph)是 Spark 的核心调度模型,它表示数据处理的逻辑流程。

Stage2
Stage1
宽依赖
Shuffle 操作
聚合操作
结果输出
读取数据
过滤操作
映射转换

DAG 的关键概念

  • 窄依赖(Narrow Dependency)

    • 每个父分区最多被一个子分区使用
    • 允许在单个 Stage 内进行流水线执行
    • 例如:map, filter 等操作
  • 宽依赖(Wide Dependency)/ Shuffle 依赖

    • 每个父分区可能被多个子分区使用
    • 需要跨节点数据混洗(Shuffle)
    • 导致 Stage 的划分
    • 例如:groupByKey, reduceByKey 等操作

查看 DAG 的方法

// 在代码中获取 RDD 的 lineage(血统)
rdd.toDebugString// 通过 Spark UI 查看可视化 DAG
// 访问 http://driver-node:4040

3. 分区(Partitioning):并行度的基础

分区是 Spark 实现并行计算的基础,合理设置分区数对性能至关重要。

原始数据
分区 1
分区 2
分区 3
分区 N
Executor 1
Executor 2
Executor 3
Executor N
Task 1
Task 2
Task 3
Task N

分区的关键点

  1. 分区数量

    • 决定了任务的并行度
    • 默认值:spark.default.parallelism 或父 RDD 的分区数
    • 对于读取 HDFS 文件,通常等于 HDFS 块数
  2. 分区策略

    • Hash 分区:根据键的哈希值分配分区
    • Range 分区:根据键的范围分配分区
    • 自定义分区:实现 Partitioner 接口
  3. 分区操作

    • repartition(numPartitions):增加或减少分区数,触发 Shuffle
    • coalesce(numPartitions):只能减少分区数,避免 Shuffle

代码示例:分区操作

# 创建初始 DataFrame
df = spark.range(0, 1000, 1, 10)  # 10个分区
print("初始分区数:", df.rdd.getNumPartitions())# 减少分区数(避免 Shuffle)
df_coalesced = df.coalesce(5)
print("Coalesce 后分区数:", df_coalesced.rdd.getNumPartitions())# 增加分区数(触发 Shuffle)
df_repartitioned = df.repartition(20)
print("Repartition 后分区数:", df_repartitioned.rdd.getNumPartitions())# 按列重新分区
df_repartitioned_by_col = df.repartition(10, "id")
print("按列 Repartition 后分区数:", df_repartitioned_by_col.rdd.getNumPartitions())

4. 执行流程全景图

UserDriverClusterManagerExecutor提交应用申请资源分配资源构建 DAG将 DAG 划分为 Stage将 Stage 分解为 Task分发 Task执行 Task返回任务状态和结果返回最终结果UserDriverClusterManagerExecutor

5. 举例:观察 Job/Stage/Task

通过一个简单例子观察 Spark 的执行层次:

from pyspark.sql import SparkSession
from pyspark.sql.functions import colspark = SparkSession.builder.appName("JobStageTaskDemo").getOrCreate()# 创建示例数据
data = [("Alice", 28, "Engineer"), ("Bob", 35, "Manager"),("Charlie", 42, "Director"),("Diana", 29, "Engineer")]
df = spark.createDataFrame(data, ["name", "age", "job"])# 执行一系列操作
print("=== 开始执行操作 ===")# 第一个 Job:count 操作
print("员工总数:", df.count())# 第二个 Job:filter + groupBy + count
result = df.filter(col("age") > 30).groupBy("job").count()
result.show()# 第三个 Job:写入操作
result.write.mode("overwrite").csv("/tmp/job_demo_output")print("=== 操作完成 ===")
spark.stop()

6. 核心概念总结表

概念描述触发条件/决定因素
Job由 Action 操作触发的完整计算任务count(), save(), collect()
StageJob 的子集,由 Shuffle 操作划分宽依赖(Shuffle 操作)
TaskStage 的基本执行单元,处理一个分区数据分区数量
Partition数据的基本划分单位,决定并行度数据源特性、配置参数

写到最后突然意识到漏了shuffle这个专业词汇,就在这里单独补充吧,不去调整上面的结构,也不新开一篇文章了

深入理解 Shuffle:数据重分布的核心机制

1. Shuffle 的本质:为什么要"洗牌"?

想象一下扑克牌游戏中的洗牌过程:将牌打乱重新分配,确保每个玩家获得随机分布的牌。Spark 中的 Shuffle 也是类似的概念,但目的不同:为了将相同键的数据重新分组到同一台机器上,以便进行聚合、排序等操作。

Shuffle 的简单定义

Shuffle 是 Spark 在不同 Executor 甚至不同节点之间重新分配数据的过程,使得所有相同键的数据能够聚集到同一个分区中。

2. 什么操作会触发 Shuffle?

以下操作通常会导致 Shuffle:

  • 分组操作groupByKey(), reduceByKey(), combineByKey()
  • 连接操作join(), cogroup()
  • 排序操作sortByKey(), sortBy()
  • 重分区repartition(), coalesce()(当增加分区时)

3. Shuffle 的物理实现:两阶段过程

Shuffle 过程分为两个主要阶段:Map 阶段Reduce 阶段

Reduce 阶段-Shuffle Read
Map 阶段-Shuffle Write
网络传输
通过网络获取
相关Map输出
Executor 2
请求数据
合并和排序
不同Map的数据
为Reduce任务
准备输入数据
计算分区映射
Executor 1
分区数据
按Reduce分区排序
并写入磁盘
生成Map输出文件
和索引文件
Map 阶段(Shuffle Write):
  1. 数据处理:每个 Map 任务处理输入分区的数据
  2. 分区划分:根据目标分区数(由Partitioner决定)对输出数据进行划分
  3. 排序和溢出:对每个分区的数据进行排序,可能溢出到磁盘
  4. 文件生成:将最终结果写入磁盘文件,并生成索引文件
Reduce 阶段(Shuffle Read):
  1. 获取元数据:了解需要从哪些Map任务获取数据
  2. 获取数据:通过网络从各个Map任务的输出中获取属于自己分区的数据
  3. 合并数据:将来自不同Map任务的相同键的数据合并在一起
  4. 排序数据:对合并后的数据进行排序(如果需要)

4. Shuffle 的物理存储与网络传输

Reduce节点
节点 3
节点 2
节点 1
写入Shuffle文件
写入Shuffle文件
写入Shuffle文件
从各节点获取数据
网络传输
网络传输
网络传输
Executor 4
网络传输
Executor 3
本地磁盘
Executor 2
本地磁盘
Executor 1
本地磁盘

关键物理细节

  1. 磁盘 I/O

    • Map 任务将中间结果写入本地磁盘
    • Reduce 任务从多个Map任务的磁盘读取数据
    • 大量磁盘读写是Shuffle的主要开销来源
  2. 网络传输

    • 数据在不同节点的Executor之间传输
    • 网络带宽可能成为瓶颈
    • 机架感知(rack-aware)调度可以减少跨机架传输
  3. 内存使用

    • 使用内存缓冲区减少磁盘I/O
    • 可能发生内存溢出,导致额外磁盘写入

5. Shuffle 的性能影响与优化

为什么 Shuffle 代价高昂

  1. 磁盘 I/O:大量中间数据写入和读取
  2. 网络 I/O:数据在节点间传输
  3. 序列化/反序列化:数据在网络传输前需要序列化
  4. 内存压力:排序和合并操作消耗大量内存

优化策略

# 1. 使用reduceByKey而不是groupByKey(减少Shuffle数据量)
# groupByKey: 将所有值发送到Reducer
rdd.groupByKey().mapValues(sum)# reduceByKey: 先在Map端进行局部聚合,减少Shuffle数据量
rdd.reduceByKey(lambda a, b: a + b)# 2. 使用合适的并行度
spark.conf.set("spark.sql.shuffle.partitions", "200")  # 默认200,根据数据量调整# 3. 使用广播连接避免大表Shuffle
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")# 4. 使用合适的序列化格式
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

6. Shuffle 相关配置参数

配置项默认值说明
spark.sql.shuffle.partitions200设置Shuffle操作的分区数
spark.shuffle.compresstrue是否压缩Shuffle输出
spark.shuffle.spill.compresstrue是否压缩溢出的Shuffle数据
spark.shuffle.memoryFraction0.2Executor内存中用于Shuffle的比例
spark.reducer.maxSizeInFlight48mReduce任务一次获取数据的最大大小

7. 监控 Shuffle 性能

在 Spark UI 中监控 Shuffle:

  1. Stages 页面:查看每个Stage的Shuffle读写量
  2. Storage 页面:查看Shuffle数据的使用情况
  3. Executors 页面:查看Shuffle读写的时间和数据量

关键指标

  • Shuffle Read Size/Records:读取的数据量和记录数
  • Shuffle Write Size/Records:写入的数据量和记录数
  • Shuffle Spill (Memory):内存中溢出的数据量
  • Shuffle Spill (Disk):磁盘上溢出的数据量

8. 总结:Shuffle 的双刃剑特性

Shuffle 的必要性

  • 实现跨节点的数据重新分配
  • 支持基于键的聚合和连接操作
  • 是分布式计算的基石

Shuffle 的代价

  • 高昂的磁盘和网络I/O开销
  • 可能成为性能瓶颈
  • 需要仔细调优以获得最佳性能

文章转载自:

http://yCz1YkeD.pLhhd.cn
http://PEghX26d.pLhhd.cn
http://Jbt1FqpB.pLhhd.cn
http://q3fPn7U5.pLhhd.cn
http://kKcbi5Ti.pLhhd.cn
http://8mivomnL.pLhhd.cn
http://R45ESjPu.pLhhd.cn
http://I1bgqlo7.pLhhd.cn
http://l4bugJUZ.pLhhd.cn
http://hHkUyWgp.pLhhd.cn
http://krg6JgH8.pLhhd.cn
http://5yNC9BTP.pLhhd.cn
http://NLGD8XWA.pLhhd.cn
http://ueGaRiAi.pLhhd.cn
http://1SoP0IEh.pLhhd.cn
http://fKyHMtjo.pLhhd.cn
http://Qi549Acm.pLhhd.cn
http://Pz21tNCI.pLhhd.cn
http://OkGplHuf.pLhhd.cn
http://Nxa9FGrD.pLhhd.cn
http://ze15hoQH.pLhhd.cn
http://0iFp6tYH.pLhhd.cn
http://DozQmN8i.pLhhd.cn
http://a7IXEdFG.pLhhd.cn
http://xle2iGmn.pLhhd.cn
http://fBY8mMdj.pLhhd.cn
http://Gl6VqojT.pLhhd.cn
http://jU9dDNrk.pLhhd.cn
http://dBhMszC0.pLhhd.cn
http://bsS1ejC3.pLhhd.cn
http://www.dtcms.com/a/382926.html

相关文章:

  • LLM大模型-大模型微调(常见微调方法、LoRA原理与实战、LLaMA-Factory工具部署与训练、模型量化QLoRA)
  • 使用Docker轻松部署Neo4j图数据库
  • 【Docker+Nginx】前后端分离式项目部署(传统打包方式)
  • 基于Grafana Loki与Prometheus的日志与指标一体化监控平台实战经验分享
  • SQL 数据库简介
  • Grafana自定义dashboard与监控主流中间件
  • LabVIEW 中的振动分析与信号处理
  • 简单UDP网络程序
  • RCE绕过技术:取反与异或的深入解析与实践
  • 算法题(207):最长上升子序列(经典线性dp题)
  • 【Nginx开荒攻略】Nginx主配置文件结构与核心模块详解:从0到1掌握nginx.conf:
  • 操作系统(二) :CPU调度
  • Knockout.js DOM 数据存储模块详解
  • js趣味游戏 贪吃蛇
  • Ajax-day2(图书管理)-弹框显示和隐藏
  • 低代码平台-开发SDK设计
  • Java 线程池面试高频问题全解析
  • 【HarmonyOS】MVVM与三层架构
  • 算法—双指针1.2
  • hcl ac ap 本地转发学习篇
  • Velox:数据界的超级发动机
  • 嵌入式系统启动流程
  • TRAE通用6A规则+敏捷开发5S规则
  • 【Java后端】Spring Boot 集成雪花算法唯一 ID
  • 【知识管理】【科普】新概念的学习路径
  • flask入门(五)WSGI及其Python实现
  • 第17课:自适应学习与优化
  • 详解安卓开发andorid中重要的agp和gradle的关系以及版本不匹配不兼容问题的处理方法-优雅草卓伊凡
  • Linux应用开发(君正T23):三网智能切换及配网功能
  • 华为HarmonyOS开发文档