当前位置: 首页 > news >正文

Spark的Broadcast Join以及其它的Join策略

一、什么是Broadcast Join?

当有一张表较小时,我们通常选择 Broadcast Hash Join,这样可以避免 Shuffle 带来的开销,从而提高性能。比如事实表与维表进行JOIN时,由于维表的数据通常会很小,所以可以使用 BHJ 将维表进行 Broadcast。这样可以避免数据Shuffle(在 Spark 中 Shuffle 操作是很耗时的),从而提高 JOIN 的效率。

在进行Broadcast Join 之前,Spark 需要把处于 Executor 端的数据先发送到 Driver 端,然后Driver 端再把数据广播到 Executor 端。如果我们需要广播的数据比较多,会造成Driver 端出现OOM。具体如下图示:
在这里插入图片描述
一句话简单理解一下其原理:

  1. 小表广播:首先,Spark会将小表发送给driver,然后再广到集群中的每个工作节点。
  2. 大表分区:大表会被分区并分配到不同的工作节点。
  3. 本地Join操作:每个大表的工作节点接收到广播的小表后,都会在本地执行Join操作,这样大大减少了数据的网络传输。

条件与特点:

  1. 仅支持等值连接,join key 不需要排序
  2. 支持除了全外连接(full outer)之外的所有 join 类型
  3. 在 Driver 端缓存数据,所以当小表的数据量较大时,会出现 OOM 的情况
  4. 被广播的小表的数据量要小于 spark.sql.autoBroadcastJoinThreshold 值,默认是 10MB,但是被广播表的大小阈值不能超过 8GB,否则会报错。
  5. 基表不能被 broadcast,比如左连接时,只能将右表进行广播。

二、优化引擎如何判断需要走Broadcast Join?

在使用Hive SQL时,如果底层执行引擎是Spark,优化引擎会根据统计信息和查询计划来决定是否使用broadcast join。以下是优化引擎(特别是spark的catalyst优化器)判断是否使用broadcast join的一般步骤和考虑因素:

  1. 收集统计信息
    为了进行优化,Hive会依赖表和列的统计信息,这些统计信息包括表的大小、行数、列的基数等,Hive通过以下几种方式收集统计信息:
    ANALYZE TABLE语句:显示运行ANALYZE TABLE命令来收集表的统计信息
    自动统计,某些i情况下适用

  2. Hive逻辑计划的生成
    Hive解析Hive SQL查询并生成初始的逻辑执行计划,这个执行计划包含了所有需要执行的操作步骤,但还没有进行任何优化。

  3. 基于规则的优化(RBO)
    初始的逻辑计划会经过基于规则的优化,进行一些基本的转换和简化。例如,常见的优化包括:谓词下推、列裁剪等等。

  4. 基于代价的优化(CBO)
    在代价优化阶段,hive的优化器会使用统计信息来评估不同执行计划的成本。
    hive通过以下步骤来决定是否使用broadcast join:

    • 评估表的大小:优化器会检查连接操作中表的大小,如果表足够小,可以考虑将其广播到所有节点。
    • 计算Join成本:优化器会比较不同Join策略的代价(广播、Shuffle Hash Join、Sort Merge Join)
    • 选择最优策略:如果broadcast join的代价低于其他join策略并且满足内存限制,优化器会选择broadcast join
  5. 当使用spark作为执行引擎时,hive的逻辑计划会被转换成spark的物理计划,catalyst优化器将进一步优化执行计划,将小表标记为广播表。

三、还有哪些Join策略?

3.1 Shuffle Hash Join

首先,对于两张参与join的表,分别按照join key进行重分区,该过程会涉及shuffle,其目的是将相同Join Key的数据发送到同一个分区,方便分区内进行Join。

其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个hash table,然后根据join key与大表的分区数据进行匹配。

在这里插入图片描述

shuffle hash join主要包括两个阶段:

  • Shuffle 阶段:两张大表根据Join Key进行Shuffle重分区
  • merge 阶段:对来自不同表的分区进行Join,通过遍历元素,连接具有相同Join Key的值的行来合并数据集。

条件与特点:

  1. 仅支持等值连接,join key 不需要排序
  2. 支持所有的 join 类型(3.1.0之前不支持full outer join)
  3. 需要对小表构建 Hash table,属于内存密集型的操作,如果构建侧数据比较大,可能会造成 OOM
  4. 将参数 spark.sql.join.prefersortmergeJoin (default true)置为false

3.2 Sort Merge Join

Spark默认的Join方式,非常常见。

一般在两个大表进行Join时。使用该方式。Sort merge join可以减少集群中的数据传输,该方式不会先加载所有数据到内存,然后进行hash join,但是在join之前需要对join key进行排序

在这里插入图片描述
Sort merge join主要包括三个阶段:

  • shuffle 阶段:两张大表根据join key进行shuffle重分区
  • sort 阶段:每个分区内的数据进行排序(原因就是,这种情况下的右表比较大,无法完全的放入到内存中(放不下的数据需要spill到磁盘中),所以进行排序
  • merge阶段:对来自不同表的排序好的分区进行join,通过遍历元素,连接具有相同的key值的行来合并数据集(因为是遍历的,所以不需要在内存中构建哈希表)

条件与特点:

  1. 仅支持等值连接
  2. 支持所有 join 类型
  3. Join Keys 是排序的
  4. 参数 spark.sql.join.prefersortmergeJoin (默认 true)设定为true

解释一下为什么不需要构建哈希表

因为两个序列都是有序的,从头遍历,碰到key相同的就输出;如果不同,左边小就继续取左边,反之取右边。可以看出,无论分区有多大,Sort Merge Join都不用把某一侧的数据全部加载到内存中,而是即用即取即丢,从而大大提升了大数据量下SQL Join的稳定性(如果使用广播或者Shuffle Hash Join,万一数据量膨胀,非常容易出现OOM问题,Sort Merge Join对内存的要求小很多,所以基本不会出现OOM问题,Join稳定性得到保证)。

在这里插入图片描述

3.3 Cartesian Join

在日志里看到这种join的时候,大概率是写出bug了

因为这种join,类似于参与join的表没有指定join key,这个join得到的结果其实就是两张表的笛卡尔积。

条件:

  1. 仅支持内连接
  2. 支持等值和不等值连接
  3. 开启参数 spark.sql.crossJoin.enabled=true

3.4 Broadcast Nested Loop Join(BNLJ)

兜底策略

该方式是在没有合适的 JOIN 机制可供选择时,最终会选择该种join 策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join >Broadcast Nested Loop Join

在 Cartesian 与 Broadcast Nested Loop Join 之间,如果是内连接,或者非等值连接,则优先选择 Cartesian Join 策略;当是非等值连接并且一张表可以被广播时,会选择Broadcast Nested Loop。

条件与特点:

  1. 支持等值和非等值连接
  2. 支持所有的 JOIN 类型,主要优化点如下:
    • 当右外连接时要广播左表
    • 当左外连接时要广播右表
    • 当内连接时,要广播左右两张表

四、一些额外补充

4.1 Sort Merge Join VS Shuffle Hash Join

在这里插入图片描述

4.2 Sort Merge Join VS Broadcast Join

某些场景下,”广播”的任务,效率不一定高哦。

比如:

Join两边的数据大小都是1G,spark.sql.autoBroadcastJoinThreshold的值设置为2g,程序走了广播,这个时候广播的操作就会比较耗时,网络传输成本会非常高。

这种场景更加适合Sort merge Join。

4.3 在等值数据关联中,谁的效率最高呢?

在等值数据关联中,Spark 会尝试按照 BHJ > SMJ > SHJ 的顺序,依次选择 Join 策略。在这三种策略中,执行效率最高的是 BHJ,其次是 SHJ,再次是 SMJ。BHJ 尽管效率最高,但是有两个前提条件:

  1. 连接类型不能是全连接(Full Outer Join)
  2. 基表要足够小,可以放到广播变量里面去。

那为什么 SHJ 比 SMJ 执行效率高,排名却不如 SMJ 靠前呢?
这是个非常好的问题。我们先来说结论,相比 SHJ,Spark 优先选择 SMJ 的原因在于,SMJ 的实现方式更加稳定,更不容易 OOM

回顾 HJ 的实现机制,在 Build 阶段,算法根据内表创建哈希表。在 Probe 阶段,为了让外表能够成功“探测”(Probe)到每一个 Hash Key,哈希表要全部放进内存才行。坦白说,这个前提还是蛮苛刻的,仅这一点要求就足以让 Spark 对其望而却步。 要知道,在不同的计算场景中,数据分布的多样性很难保证内表一定能全部放进内存

而且在 Spark 中,SHJ 策略要想被选中必须要满足两个先决条件,这两个条件都是对数据尺寸的要求:

  1. 首先,外表大小至少是内表的 3 倍。
  2. 其次,内表数据分片的平均大小要小于广播变量阈值。

第一个条件的动机很好理解,只有当内外表的尺寸悬殊到一定程度时,HJ 的优势才会比 SMJ 更显著。第二个限制的目的是,确保内表的每一个数据分片都能全部放进内存。 和 SHJ 相比,SMJ 没有这么多的附加条件,无论是单表排序,还是两表做归并关联,都可以借助磁盘来完成,内存中放不下的数据,可以临时溢出到磁盘。单表排序的过程,我们可以参考 Shuffle Map 阶段生成中间文件的过程。在做归并关联的时候,算法可以把磁盘中的有序数据用合理的粒度,依次加载进内存完成计算。这个粒度可大可小,大到以数据分片为单位,小到逐条扫描。 正是考虑到这些因素,相比 SHJ,Spark SQL 会优先选择 SMJ。

事实上,在配置项 spark.sql.join.preferSortMergeJoin 默认为 True 的情况下,Spark SQL 会用 SMJ 策略来兜底,确保作业执行的稳定性,压根就不会打算去尝试 SHJ。开发者如果想通过配置项来调整 Join 策略,需要把这个参数改为 False,这样 Spark SQL 才有可能去尝试 SHJ。

http://www.dtcms.com/a/439301.html

相关文章:

  • 宝安做网站的公司网站快速排名的方法
  • 重庆网站建设公司的网站西安做商铺的网站
  • 嵌入式开发学习日志33——stm32之PWM舵机简单项目
  • 桂林旅游网站建设品牌营销的四大策略
  • 为什么Java线程栈容易溢出?
  • 怎么做福彩网站营销系统
  • Java 后端与 AI 融合:技术路径、实战案例与未来趋势
  • 一键建站公司wordpress 404 插件
  • 大连网站设计培训班网站建设公司推荐互赢网络
  • 网站一般建什么百度公司官网招聘
  • 如何使用unity制作游戏
  • Mosquitto 安全架构深度解析:security.c 与 security_default.c 的作用与协同机制
  • 国外打开网站会不会乱码龙岗做商城网站建设
  • css选择器继承性
  • 做投资的网站高端网站建设成都
  • 丹阳网站怎么做seo主机屋 WordPress 问题 多
  • 中文名字英文名字日本名字txt合集
  • 搜狗seo快速排名公司山东东营网络seo
  • 如何做百度的网站手工制作花朵
  • 【2025最新】ArcGIS for JS 实现地图卷帘效果
  • 网站域名备案密码新网站 被百度收录
  • 做网站开发没有人带贵阳市住房城乡建设局八大员网站
  • Vue3+TypeScript开发:从ProTable封装到Echarts联动
  • (二分、思维)洛谷 P4090 USACO17DEC Greedy Gift Takers P 题解
  • 业务层的抽取和业务层方法的实现详解
  • 【开题答辩全过程】以 “人和小区”便民快递平台为例,包含答辩的问题和答案
  • 找网络公司建网站每年收维护费手机网站会员中心模板
  • 网站建设公司谁管网络营销的发展趋势和前景
  • 网站建设公司包括哪些溧阳建设集团网站
  • wordpress访客统计插件网络优化怎么自己做网站