当前位置: 首页 > wzjs >正文

品质好的衣服品牌三秦seo

品质好的衣服品牌,三秦seo,做淘宝客为什么要做网站,衡水网站设计公司哪家专业了解大厂经验拥有和大厂相匹配的技术等 希望看什么,评论或者私信告诉我! 文章目录 一、背景1. Join策略选择2. Hash Join实现3. Sort Merge Join实现 总结参考资料 一、背景 SparkSQL 现在基本上可以说是离线计算的大拿了,所以掌握了 Spark…
  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

    • 一、背景
      • 1. Join策略选择
      • 2. Hash Join实现
      • 3. Sort Merge Join实现
    • 总结
    • 参考资料

一、背景

SparkSQL 现在基本上可以说是离线计算的大拿了,所以掌握了 SparkSQL 的 Join 也就相当于掌握了这位大拿。

一直想要总结一下,今天遇到了 Broadcast 的一些事情,终于可以顺便把 SparkSQL 的 Join 总结一下

上一篇我们介绍了SparkSQL Join深度解析:三种实现方式全揭秘

为了更深入理解SparkSQL Join的实现原理,可以分析其源码。以下是SparkSQL Join的源码分析:

1. Join策略选择

SparkSQL在org.apache.spark.sql.execution.joins包中实现了各种Join策略。在Join类的doExecute方法中,会根据统计信息和配置选择合适的Join策略。

def doExecute(): RDD[InternalRow] = {val leftKeys = leftKeysArrayval rightKeys = rightKeysArrayif (joinType == JoinType.CROSS) {CrossHashJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)} else {if (left.output.size > 0 && right.output.size > 0) {leftKeys.length match {case 0 =>// Cartesian productCartesianProduct.doJoin(left, right, joinType, condition, leftFilters, rightFilters)case 1 =>// Single key, use hash joinif (joinType == JoinType.INNER || joinType == JoinType.CROSS) {HashJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)} else {// For outer joins, use sort merge join to preserve the orderSortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)}case _ =>// Multiple keys, use sort merge joinSortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)}} else {// One of the children has no output, return emptyRDD.empty[InternalRow](sparkContext)}}
}

2. Hash Join实现

Hash Join的实现主要在HashJoin类中。以下是Hash Join的主要实现步骤:

  1. 选择构建侧和Probe侧:根据统计信息选择较小的表作为构建侧
  2. 构建Hash表:将构建侧的数据按照Join键构建Hash表
  3. Probe阶段:将Probe侧的数据按照Join键进行查找
  4. 连接操作:根据Join类型(内连接、外连接等)进行相应的连接操作
object HashJoin {def doJoin(left: RDD[InternalRow],right: RDD[InternalRow],leftKeys: Array[Expression],rightKeys: Array[Expression],joinType: JoinType,condition: Option[Expression],leftFilters: Option[Expression],rightFilters: Option[Expression]): RDD[InternalRow] = {// 选择构建侧和Probe侧val (buildSide, probeSide) = chooseSides(left, right)val (buildKeys, probeKeys) = if (buildSide == BuildSide.LEFT) {(leftKeys, rightKeys)} else {(rightKeys, leftKeys)}// 构建Hash表val buildRDD = buildSide match {case BuildSide.LEFT =>left.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = leftKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})case BuildSide.RIGHT =>right.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = rightKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})}// Probe阶段val probeRDD = probeSide match {case BuildSide.LEFT =>right.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = rightKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})case BuildSide.RIGHT =>left.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = leftKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})}// 连接操作probeRDD.join(buildRDD).mapPartitions(iter => {iter.flatMap { case (key, (probeRow, buildRow)) =>// 根据Join类型进行连接操作joinType match {case JoinType.INNER =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(probeRow ++ buildRow))} else {None}case JoinType.LEFT =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(probeRow ++ buildRow))} else {Some(InternalRow.fromSeq(probeRow ++ Seq.fill(buildRow.length)(null)))}case JoinType.RIGHT =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(Seq.fill(probeRow.length)(null) ++ buildRow))} else {Some(InternalRow.fromSeq(Seq.fill(probeRow.length)(null) ++ buildRow))}case JoinType.FULL =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(probeRow ++ buildRow))} else {Some(InternalRow.fromSeq(probeRow ++ Seq.fill(buildRow.length)(null)))Some(InternalRow.fromSeq(Seq.fill(probeRow.length)(null) ++ buildRow))}}}})}
}

3. Sort Merge Join实现

Sort Merge Join的实现主要在SortMergeJoin类中。以下是Sort Merge Join的主要实现步骤:

  1. 排序:对两个表按照Join键进行排序
  2. 合并:使用双指针技术合并两个排序后的数据集
  3. 连接操作:根据Join类型进行连接操作
object SortMergeJoin {def doJoin(left: RDD[InternalRow],right: RDD[InternalRow],leftKeys: Array[Expression],rightKeys: Array[Expression],joinType: JoinType,condition: Option[Expression],leftFilters: Option[Expression],rightFilters: Option[Expression]): RDD[InternalRow] = {// 排序val sortedLeft = left.sortBy(row => leftKeys.map(_.eval(row)).toArray)val sortedRight = right.sortBy(row => rightKeys.map(_.eval(row)).toArray)// 合并sortedLeft.zip(sortedRight).mapPartitions(iter => {val leftIter = iter.map(_._1).iteratorval rightIter = iter.map(_._2).iteratorval leftRow = new mutable.ArrayBuffer[InternalRow]()val rightRow = new mutable.ArrayBuffer[InternalRow]()while (leftIter.hasNext && rightIter.hasNext) {val l = leftIter.next()val r = rightIter.next()val lKey = leftKeys.map(_.eval(l)).toArrayval rKey = rightKeys.map(_.eval(r)).toArrayif (lKey < rKey) {leftRow += l} else if (lKey > rKey) {rightRow += r} else {// Join键相等,进行连接操作if (condition.map(_.eval(l, r)).getOrElse(true)) {yield JoinedRow(l, r)}// 处理重复键while (leftIter.hasNext && leftKeys.map(_.eval(leftIter.head)).toArray == lKey) {leftRow += leftIter.next()}while (rightIter.hasNext && rightKeys.map(_.eval(rightIter.head)).toArray == rKey) {rightRow += rightIter.next()}// 生成所有可能的组合for (l <- leftRow; r <- rightRow) {if (condition.map(_.eval(l, r)).getOrElse(true)) {yield JoinedRow(l, r)}}leftRow.clear()rightRow.clear()}}// 处理剩余的行while (leftIter.hasNext) {leftRow += leftIter.next()}while (rightIter.hasNext) {rightRow += rightIter.next()}// 根据Join类型处理剩余的行joinType match {case JoinType.INNER =>// 不需要处理剩余的行case JoinType.LEFT =>for (l <- leftRow) {if (leftFilters.map(_.eval(l)).getOrElse(true)) {yield JoinedRow(l, null)}}case JoinType.RIGHT =>for (r <- rightRow) {if (rightFilters.map(_.eval(r)).getOrElse(true)) {yield JoinedRow(null, r)}}case JoinType.FULL =>for (l <- leftRow) {if (leftFilters.map(_.eval(l)).getOrElse(true)) {yield JoinedRow(l, null)}}for (r <- rightRow) {if (rightFilters.map(_.eval(r)).getOrElse(true)) {yield JoinedRow(null, r)}}}})}
}

总结

本报告详细介绍了SparkSQL中Join的实现方式,包括Broadcast Join、Hash Join(包括Shuffle Hash Join)和Sort Merge Join。通过分析它们的实现原理、工作流程和适用场景,我们可以更好地理解SparkSQL中Join操作的内部机制。
在实际应用中,选择合适的Join策略对于提高SparkSQL查询性能至关重要。根据表的大小、数据分布和内存资源选择合适的Join策略,可以显著提高Join操作的性能。
通过深入理解SparkSQL Join的实现原理,我们可以更好地优化SparkSQL查询,提高大数据处理的效率和性能。

参考资料

  1. Spark SQL join的三种实现方式
  2. Spark SQL Join实现原理
  3. SparkSQL中的三种Join及其具体实现
  4. Spark SQL Performance Tuning Documentation
  5. Spark SQL Join Source Code Analysis

文章转载自:

http://rBbc6gAx.jqwpw.cn
http://lusBFssD.jqwpw.cn
http://rIE141fj.jqwpw.cn
http://dz8c8Qj9.jqwpw.cn
http://I0d6xKdu.jqwpw.cn
http://jy8Lxb7a.jqwpw.cn
http://fZs5kLJS.jqwpw.cn
http://GZo6klgq.jqwpw.cn
http://UFPjJW3Y.jqwpw.cn
http://FJZhm2k4.jqwpw.cn
http://bPe4ibQu.jqwpw.cn
http://lomchOVb.jqwpw.cn
http://8bH8ofLo.jqwpw.cn
http://jU6HzO0N.jqwpw.cn
http://nF6T0eFF.jqwpw.cn
http://Nvi8ZeXt.jqwpw.cn
http://O2sfsyFE.jqwpw.cn
http://Mk99kdtI.jqwpw.cn
http://E1nUPVGZ.jqwpw.cn
http://3HYfA4Iu.jqwpw.cn
http://0LwxUqp1.jqwpw.cn
http://HFBSFedK.jqwpw.cn
http://z8tntSSN.jqwpw.cn
http://QlYAHVoV.jqwpw.cn
http://PwQvaqcz.jqwpw.cn
http://2MZ6MTZe.jqwpw.cn
http://NHKyuPij.jqwpw.cn
http://CWiSkXr6.jqwpw.cn
http://GZUW1u6n.jqwpw.cn
http://gOE0Ra10.jqwpw.cn
http://www.dtcms.com/wzjs/763549.html

相关文章:

  • Myeclipse怎么做网站网站文档怎么加图片不显示
  • 哪个网站可以做英文兼职网站建设行业产业链分析
  • 网站建设企业排名推广我要建一个网站
  • 北京撒网站设计网站零基础学电脑培训班
  • ps做网站效果图制作过程效果图官网
  • 运动鞋网页ui设计南通网站建设优化公司
  • seo网站排名优化公司建设银行怎么从网站上改手机号码
  • 漂亮网站首页 html后台html模板
  • 建设网站的公司swot网站的标志是什么字体
  • 没有网站怎么做百度优化做招聘网站还有法盈利吗
  • 学校网站建设都是谁做的哪个网站做相册好
  • 有没有教做韩餐的网站wordpress 科技类主题
  • 平台网站模板重庆最著名的十大景点
  • flash素材网站Wordpress访问速度代码
  • 论坛网站建设视频电子商务主要课程
  • 网站建设如何加入字体青岛开发区建网站公司
  • 玉溪市建设厅官方网站深圳好看的公司网站
  • 湖南火电建设有限公司招标网站游戏推广引流软件
  • 专门做辅助的网站可爱风格网站
  • 汇云网站建设wordpress 取消标志
  • 电子商务网站建设估算住房建设部官方网站专家注册
  • 网站后台视频教程免费的h5
  • 口碑好的定制网站建设服务商免费查企业app
  • 网站建设方案策划河北建设集团官网
  • 建筑网站排行wordpress模版修改
  • 中山做网站服务好中文互联网巨头衰亡史
  • wordpress点播主题苏州关键词优化怎样
  • 网站备案情况查询品牌建设标题
  • 潍坊市城市建设官网站制作花灯
  • 做网站哪家公司便宜wordpress go