当前位置: 首页 > news >正文

Spark执行计划与UI分析

文章目录

  • 1.Spark任务阶段划分
    • 1.1 job,stage与task
    • 1.2 job划分
    • 1.3 stage和task划分
  • 2.任务执行时机
  • 3.task内部数据存储与流动
  • 4.根据sparkUI了解Spark执行计划
    • 4.1查看job和stage
    • 4.2 查看DAG图
    • 4.3查看task

1.Spark任务阶段划分

1.1 job,stage与task

  • 首先根据action()操作顺序将应用划分为作业job。
  • 根据每个job的逻辑处理流程中的ShuffleDependency依赖关系,将job划分为执行阶段stage。
  • 在每个stage中,根据最后生成的RDD的分区个数生成多个计算任务task。

1.2 job划分

举一个简单的例子,在下面这段代码中:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, col# 初始化SparkSession
spark = SparkSession.builder.appName("MultiJobStageTaskExample").getOrCreate()# 读取数据(Transformation,不触发Job)
orders = spark.read.csv("orders.csv",header=True,inferSchema=True
).select("用户ID", "订单金额", "支付方式")users = spark.read.csv("users.csv",header=True,inferSchema=True
).select("用户ID", "所在城市")# 缓存重复使用的数据集(优化性能)
orders.cache()
users.cache()# --------------------------
# Job 1:计算不同支付方式的订单数和总金额
# --------------------------
payment_analysis = orders.groupBy("支付方式") \.agg(count("用户ID").alias("订单数"),  # 聚合操作(宽依赖,触发Shuffle)sum("订单金额").alias("总金额"))# Action操作:触发Job 1
payment_result = payment_analysis.collect()  # Job 1
print("支付方式分析结果:", payment_result)# --------------------------
# Job 2:计算每个城市的平均订单金额
# --------------------------
city_analysis = orders.join(users, on="用户ID", how="inner") \  #  join是宽依赖(Shuffle).groupBy("所在城市") \  # 再次宽依赖(Shuffle).agg(sum("订单金额").alias("城市总金额"),count("用户ID").alias("城市订单数")) \.withColumn("平均订单金额", col("城市总金额") / col("城市订单数"))# Action操作:触发Job 2
city_analysis.write.csv("city_avg_order")  # Job 2# --------------------------
# Job 3:统计高消费用户(订单总金额>10000)的分布
# --------------------------
high_value_users = orders.groupBy("用户ID") \  # 宽依赖(Shuffle).agg(sum("订单金额").alias("用户总消费")) \.filter(col("用户总消费") > 10000) \  # 过滤(窄依赖).join(users, on="用户ID", how="inner")  # 宽依赖(Shuffle)# Action操作:触发Job 3
high_value_count = high_value_users.count()  # Job 3
print("高消费用户数量:", high_value_count)spark.stop()

根据payment_analysis.collect(),city_analysis.write.csv(“city_avg_order”)和high_value_count = high_value_users.count(),这段代码被划分成了三个job。

1.3 stage和task划分

如下图所示,在一个job中,出现了shuffle操作,就会划分一个stage。再根据每个stage中的分区数量划分task数量。
在这里插入图片描述

2.任务执行时机

  • job的提交时间与action()被调用的时间有关,当应用程序执行到rdd.action()时,就会立即将rdd.action()形成的job提交给Spark。这其实也就是为什么有的时候写完代码没有运行的原因,因为没写action()操作,job不会被提交到Spark
  • 仅当上游的stage都执行完成后,再执行下游的stage。如果stage之间没有依赖,则并行执行,例如stage1和stage0是并行执行,当且仅当两者执行后,stage2才开始执行。
  • stage中每个task因为是独立而且同构的,可以并行运行没有先后之分。

3.task内部数据存储与流动

task是根据分区来划分的,而一个分区中有很多个record,根据不同record之间的关系,存储的方式也不同:
在这里插入图片描述
这是一个task的执行流程的几种不同的情况:

  • 第一个流程:record之间并没有相互依赖,因此可以进行流式处理,即record1处理成record1’之后就可以将record1从内存中删掉,而不用关心record2和record3处理到哪里了。
  • 第二个流程:f()流程无相互依赖,但是g()流程有相互依赖,也就是说record1在处理成record1’‘后,record1’‘会被保存到内存中,直到record2’‘和record3’'被处理完成。
  • 第三个流程:同理,在record1,record2和record3都被算出之后,才能执行f(),而在执行g()时,record1’,record2’和record3’才不会相互依赖。
  • 第四个流程:无法进行流水线处理,每处理完一个操作,才能回收该操作的输入结果。

4.根据sparkUI了解Spark执行计划

4.1查看job和stage

在spark的首界面可以看到当前正在执行的job:
在这里插入图片描述
点击job的链接,可以看到当前job中的stage数量:
在这里插入图片描述
其中stage 0包含3个task,共Shuffle Write了376.0B,stage 1包含4个task,共Shuffle Write了988.0B,而stage 2包含3个task,一共Shuffle Read了1364.0B=376.0B+988.0B。

4.2 查看DAG图

将Job链接中界面上的DAG Visualization展开,可以看到正在执行的DAG图:
在这里插入图片描述
每个黑色实心圆圈代表一个RDD,但这个图稍显混乱,stage 0中parallelize操作生成的RDD应该是被stage 2中的partitionBy处理的,与stage 1中的parallelize无关,也就是stage 0到stage 2的横箭头并没有在stage1中作停留生成一个RDD
如果想进一步了解黑色实心圆圈代表哪些RDD,则可以进入stage的UI界面:
在这里插入图片描述
这张图展示了每个操作会生成哪些RDD(如join()操作生成了CoGroupedRDD及两个MapPartitionsRDD),但没有展示stage之间的连接关系。但是没有展示Stage的连接关系。

4.3查看task

在某个stage界面,可以看到该stage的task信息:
在这里插入图片描述
stage 0包含3个task,每个task都进行了Shuffle Write,写入了2~3个record,也就是说Spark UI中也会统计Shuffle Write/Read的record数目。
在这里插入图片描述
stage 1包含4个task,每个task都进行了ShuffleWrite,写入了2个record。
在这里插入图片描述
stage 2包含3个task,每个task从上游的stage 0/1那里Shuffle Read了5~6个record。

http://www.dtcms.com/a/324227.html

相关文章:

  • 【软考中级网络工程师】知识点之 DCC 深度剖析
  • 系统架构设计师备考之架构设计高级知识
  • 企业高性能web服务器——Nginx
  • App Trace 功能详解 (开发者视角)
  • IDEA 如何导入系统设置
  • 从0到1学LangChain之Agent代理:解锁大模型应用新姿势
  • 【机器学习深度学习】Embedding 模型详解:从基础原理到实际应用场景
  • Xstream反序列化,fastjson,jcakson靶场复现
  • 刑法视野下的虚拟财产属性争议:法律风险与市场潜力解析
  • ThinkPHP8学习篇(二):路由
  • Day39--动态规划--198. 打家劫舍,213. 打家劫舍 II,337. 打家劫舍 III
  • Code Exercising Day 10 of “Code Ideas Record“:StackQueue part02
  • MVCC和日志
  • 国内外主流大模型深度体验与横向评测:技术、场景与未来展望
  • 后置定语:for + 宾语 + 被动不定式
  • CentOS 10在文本控制台模式下修改字体大小
  • 2020/12 JLPT听力原文 问题一
  • LLM多模态模型应用探索调研
  • 【0基础3ds Max】主工具栏介绍(下)
  • 故障诊断 | VMD-CNN-LSTM西储大学轴承故障诊断附MATLAB代码
  • 智慧社区--4
  • 【C++详解】红黑树规则讲解与模拟实现(内附红黑树插入操作思维导图)
  • 本地代码上传Github步骤
  • 《设计模式》UML类图
  • 通过trae开发你的第一个Chrome扩展插件
  • A4.0:继C5.2的BJT理论引申的开关作用的应用示例
  • DAY36打卡
  • 计算机网络:求地址块128.14.35.7/20中的相关信息
  • 枚举-dfs深度优先搜索
  • 女子试穿4条裤子留下血渍赔50元引争议:消费责任边界在哪?