当前位置: 首页 > news >正文

Spark内核调度

一、DAG

DAG的方向其实就是代码的实现流程

演示代码

# coding:utf8
# 导入Spark的相关包
from pyspark import SparkConf,SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba,filter_words,append_words,extract_user_and_word
from operator import add
import time
if __name__ == "__main__":# 0.初始化执行环境,构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1.读取数据文件file_rdd = sc.textFile("hdfs://node1:8020/input/SogouQ.txt")# 2.对数据进行切分\tsplit_rdd = file_rdd.map(lambda x:x.split("\t"))# 3.因为要做多个需求,split_rdd作为基础的rdd,会被多次使用split_rdd.persist(StorageLevel.DISK_ONLY)# TODO:需求1:用户搜索的关键’词‘分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd = split_rdd.map(lambda x:x[2])# 对搜索的内容进行分词分析words_rdd = context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 院校 帮 -> 院校帮# 博学 谷 -> 博学谷# 传智播 客 -> 传智播客filtered_rdd =words_rdd.filter(filter_words)# 将关键词转换:传智播 -> 传智播客final_words_rdd = filtered_rdd.map(append_words)# 对单词进行分组、聚合、排序求出前5名result1 = final_words_rdd.reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(5)print("需求1结果:",result1)# TODO:需求2:用户和关键词组合分析# 1,我喜欢传智播客# 1+我 1+喜欢 1+传智播客user_context_rdd = split_rdd.map(lambda x:(x[1],x[2]))# 对用户的搜索内容进行分词,分词后和用户ID再次组合user_word_with_one_rdd = user_context_rdd.flatMap(extract_user_and_word)# 对内容进行 分组、聚合、排序 ,求前5result2 = user_word_with_one_rdd.reduceByKey(lambda a,b:a+b).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\take(5)print("需求2结果:",result2)# TODO:需求3:热门搜索时间段分析# 取出来所有的时间time_rdd = split_rdd.map(lambda x:x[0])# 对时间进行处理,只保留小时精度即可hour_with_one_rdd = time_rdd.map(lambda x:(x.split(":")[0],1))# 分组 聚合 排序result3 = hour_with_one_rdd.reduceByKey(add).\sortBy(lambda x:x[1],ascending=False,numPartitions=1).\collect()print("需求3结果:",result3)

针对代码的DAG图

一个action会产生1个job,每一个job有各自自己的DAG图

执行以上代码,可以在4040端口查看,有三个job

带有分区关系的DAG图

二、DAG的宽窄依赖和阶段划分

当我们遇到一个宽依赖就划分,这样就可以保证每一个阶段内都是窄依赖

三、内存迭代计算

面试题

四、spark并行度

先有并行度才有分区,比如先有三个并行度,对应才有了三个分区

rdd的一个分区只会被一个并行处理,也就是一个task处理

一个task可以处理多个rdd,但是只能处理rdd的一个分区

也就是在横向纵向上,都是一对一的关系

设置多个并行度的目的就是为了不让cpu闲下来,最大程度利用集群的资源

五、spark的任务调度

注意: executors  的数量和并行度无关

先规划出task,在根据executors进行分配

为了保证并行能力,将六个task合理分配到executors

        如果一台服务器上运行两个executor,相当于两个进程,之间也是走网络,只不过是走的本地回环网络,比跨机器的网络快一点,因为进程之间是无法通过内存传输的

        假设一台服务器上有四个task,一个executor,task之间就是通过内存传输,两个executor,不同executor中的task就是通过网络传输

        建议多少台服务器,设置多少个executor

六、扩展 - spark运行中的概念名词大全

七、总结

http://www.dtcms.com/a/315727.html

相关文章:

  • RTC实时时钟RX8900SA国产替代FRTC8900S
  • 使用maven-shade-plugin解决es跨版本冲突
  • 微信小程序功能实现:页面导航与跳转
  • jenkins插件Active Choices的使用通过参数动态控制多选参数的选项
  • LHA6958D是一款代替AD7606的芯片
  • 【前端】网站favicon图标制作
  • MyBatisPlus查询数据库中所有表的数据(AI)
  • 使标签垂直水平居中的多种方法
  • 自动驾驶控制算法——MPC控制算法
  • 数据结构 实现单链表
  • Vue3核心语法进阶(Props)
  • C语言:选择排序算法深度剖析!
  • nodejs 编码初体验
  • JAVA无人共享球杆柜系统球杆柜租赁系统源码支持微信小程序
  • 嵌入式硬件中运放的基本控制原理
  • 基于k8s环境下的pulsar常用命令(上)
  • 达梦分布式集群DPC_分布式任务执行拆分流程_yxy
  • 安全测绘之敏感网络资产排查指南
  • 在Linux上部署RabbitMQ、Redis、ElasticSearch
  • Taro Hooks 完整分类详解
  • 深度解析随机森林 API:参数奥秘与调优指南
  • 在AI时代,如何制定有效的职业规划?AI时代职业规划+AI产品经理角色
  • 【学习笔记】NTP时间同步验证
  • Kali Linux 2025.2基于MITRE ATTCK框架
  • DPU(数据处理单元)架构中,SoC(系统级芯片)与FPGA(现场可编程门阵列)之间的数据交互
  • 山东移动e企组网技术分析:底层架构与实现方式
  • 第12届蓝桥杯Scratch_选拔赛_初级组_真题2020年11月21日
  • SpringBoot3.x入门到精通系列:4.2 整合 Kafka 详解
  • Linux第十二讲:线程概念与控制
  • 前端保持和服务器时间同步的方法【使用vue3举例】