当前位置: 首页 > wzjs >正文

网站开发与维护岗位说明书经营管理培训课程

网站开发与维护岗位说明书,经营管理培训课程,如何从零开始做网站,国外免费网站了解大厂经验拥有和大厂相匹配的技术等 希望看什么,评论或者私信告诉我! 文章目录 一、背景1. Join策略选择2. Hash Join实现3. Sort Merge Join实现 总结参考资料 一、背景 SparkSQL 现在基本上可以说是离线计算的大拿了,所以掌握了 Spark…
  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

    • 一、背景
      • 1. Join策略选择
      • 2. Hash Join实现
      • 3. Sort Merge Join实现
    • 总结
    • 参考资料

一、背景

SparkSQL 现在基本上可以说是离线计算的大拿了,所以掌握了 SparkSQL 的 Join 也就相当于掌握了这位大拿。

一直想要总结一下,今天遇到了 Broadcast 的一些事情,终于可以顺便把 SparkSQL 的 Join 总结一下

上一篇我们介绍了SparkSQL Join深度解析:三种实现方式全揭秘

为了更深入理解SparkSQL Join的实现原理,可以分析其源码。以下是SparkSQL Join的源码分析:

1. Join策略选择

SparkSQL在org.apache.spark.sql.execution.joins包中实现了各种Join策略。在Join类的doExecute方法中,会根据统计信息和配置选择合适的Join策略。

def doExecute(): RDD[InternalRow] = {val leftKeys = leftKeysArrayval rightKeys = rightKeysArrayif (joinType == JoinType.CROSS) {CrossHashJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)} else {if (left.output.size > 0 && right.output.size > 0) {leftKeys.length match {case 0 =>// Cartesian productCartesianProduct.doJoin(left, right, joinType, condition, leftFilters, rightFilters)case 1 =>// Single key, use hash joinif (joinType == JoinType.INNER || joinType == JoinType.CROSS) {HashJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)} else {// For outer joins, use sort merge join to preserve the orderSortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)}case _ =>// Multiple keys, use sort merge joinSortMergeJoin.doJoin(left, right, leftKeys, rightKeys, joinType, condition, leftFilters, rightFilters)}} else {// One of the children has no output, return emptyRDD.empty[InternalRow](sparkContext)}}
}

2. Hash Join实现

Hash Join的实现主要在HashJoin类中。以下是Hash Join的主要实现步骤:

  1. 选择构建侧和Probe侧:根据统计信息选择较小的表作为构建侧
  2. 构建Hash表:将构建侧的数据按照Join键构建Hash表
  3. Probe阶段:将Probe侧的数据按照Join键进行查找
  4. 连接操作:根据Join类型(内连接、外连接等)进行相应的连接操作
object HashJoin {def doJoin(left: RDD[InternalRow],right: RDD[InternalRow],leftKeys: Array[Expression],rightKeys: Array[Expression],joinType: JoinType,condition: Option[Expression],leftFilters: Option[Expression],rightFilters: Option[Expression]): RDD[InternalRow] = {// 选择构建侧和Probe侧val (buildSide, probeSide) = chooseSides(left, right)val (buildKeys, probeKeys) = if (buildSide == BuildSide.LEFT) {(leftKeys, rightKeys)} else {(rightKeys, leftKeys)}// 构建Hash表val buildRDD = buildSide match {case BuildSide.LEFT =>left.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = leftKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})case BuildSide.RIGHT =>right.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = rightKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})}// Probe阶段val probeRDD = probeSide match {case BuildSide.LEFT =>right.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = rightKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})case BuildSide.RIGHT =>left.mapPartitions(iter => {val keyToRows = new mutable.HashMap[Any, mutable.Buffer[InternalRow]]()iter.foreach(row => {val key = leftKeys.map(_.eval(row)).toArraykeyToRows.getOrElseUpdate(key, new mutable.ArrayBuffer[InternalRow]()) += row})iter ++ keyToRows.values.flatten})}// 连接操作probeRDD.join(buildRDD).mapPartitions(iter => {iter.flatMap { case (key, (probeRow, buildRow)) =>// 根据Join类型进行连接操作joinType match {case JoinType.INNER =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(probeRow ++ buildRow))} else {None}case JoinType.LEFT =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(probeRow ++ buildRow))} else {Some(InternalRow.fromSeq(probeRow ++ Seq.fill(buildRow.length)(null)))}case JoinType.RIGHT =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(Seq.fill(probeRow.length)(null) ++ buildRow))} else {Some(InternalRow.fromSeq(Seq.fill(probeRow.length)(null) ++ buildRow))}case JoinType.FULL =>if (condition.map(_.eval(probeRow, buildRow)).getOrElse(true)) {Some(InternalRow.fromSeq(probeRow ++ buildRow))} else {Some(InternalRow.fromSeq(probeRow ++ Seq.fill(buildRow.length)(null)))Some(InternalRow.fromSeq(Seq.fill(probeRow.length)(null) ++ buildRow))}}}})}
}

3. Sort Merge Join实现

Sort Merge Join的实现主要在SortMergeJoin类中。以下是Sort Merge Join的主要实现步骤:

  1. 排序:对两个表按照Join键进行排序
  2. 合并:使用双指针技术合并两个排序后的数据集
  3. 连接操作:根据Join类型进行连接操作
object SortMergeJoin {def doJoin(left: RDD[InternalRow],right: RDD[InternalRow],leftKeys: Array[Expression],rightKeys: Array[Expression],joinType: JoinType,condition: Option[Expression],leftFilters: Option[Expression],rightFilters: Option[Expression]): RDD[InternalRow] = {// 排序val sortedLeft = left.sortBy(row => leftKeys.map(_.eval(row)).toArray)val sortedRight = right.sortBy(row => rightKeys.map(_.eval(row)).toArray)// 合并sortedLeft.zip(sortedRight).mapPartitions(iter => {val leftIter = iter.map(_._1).iteratorval rightIter = iter.map(_._2).iteratorval leftRow = new mutable.ArrayBuffer[InternalRow]()val rightRow = new mutable.ArrayBuffer[InternalRow]()while (leftIter.hasNext && rightIter.hasNext) {val l = leftIter.next()val r = rightIter.next()val lKey = leftKeys.map(_.eval(l)).toArrayval rKey = rightKeys.map(_.eval(r)).toArrayif (lKey < rKey) {leftRow += l} else if (lKey > rKey) {rightRow += r} else {// Join键相等,进行连接操作if (condition.map(_.eval(l, r)).getOrElse(true)) {yield JoinedRow(l, r)}// 处理重复键while (leftIter.hasNext && leftKeys.map(_.eval(leftIter.head)).toArray == lKey) {leftRow += leftIter.next()}while (rightIter.hasNext && rightKeys.map(_.eval(rightIter.head)).toArray == rKey) {rightRow += rightIter.next()}// 生成所有可能的组合for (l <- leftRow; r <- rightRow) {if (condition.map(_.eval(l, r)).getOrElse(true)) {yield JoinedRow(l, r)}}leftRow.clear()rightRow.clear()}}// 处理剩余的行while (leftIter.hasNext) {leftRow += leftIter.next()}while (rightIter.hasNext) {rightRow += rightIter.next()}// 根据Join类型处理剩余的行joinType match {case JoinType.INNER =>// 不需要处理剩余的行case JoinType.LEFT =>for (l <- leftRow) {if (leftFilters.map(_.eval(l)).getOrElse(true)) {yield JoinedRow(l, null)}}case JoinType.RIGHT =>for (r <- rightRow) {if (rightFilters.map(_.eval(r)).getOrElse(true)) {yield JoinedRow(null, r)}}case JoinType.FULL =>for (l <- leftRow) {if (leftFilters.map(_.eval(l)).getOrElse(true)) {yield JoinedRow(l, null)}}for (r <- rightRow) {if (rightFilters.map(_.eval(r)).getOrElse(true)) {yield JoinedRow(null, r)}}}})}
}

总结

本报告详细介绍了SparkSQL中Join的实现方式,包括Broadcast Join、Hash Join(包括Shuffle Hash Join)和Sort Merge Join。通过分析它们的实现原理、工作流程和适用场景,我们可以更好地理解SparkSQL中Join操作的内部机制。
在实际应用中,选择合适的Join策略对于提高SparkSQL查询性能至关重要。根据表的大小、数据分布和内存资源选择合适的Join策略,可以显著提高Join操作的性能。
通过深入理解SparkSQL Join的实现原理,我们可以更好地优化SparkSQL查询,提高大数据处理的效率和性能。

参考资料

  1. Spark SQL join的三种实现方式
  2. Spark SQL Join实现原理
  3. SparkSQL中的三种Join及其具体实现
  4. Spark SQL Performance Tuning Documentation
  5. Spark SQL Join Source Code Analysis
http://www.dtcms.com/wzjs/188925.html

相关文章:

  • 企业网站建设报价怎么在百度打广告
  • 网站中flash banner图片可以删吗北京网站推广助理
  • 贵州网站建设lonwone推广seo是什么意思
  • 专业电容层析成像代做网站友情链接购买网站
  • 现在网络推广有哪些平台关键词seo排名优化软件
  • 汕头微网站网络营销模式
  • 宁波网站建设友情链接
  • 设计网站需要什么条件免费友链平台
  • 会展网站模板济南最新消息
  • 小米wifi设置网址入口网站2023年7月疫情爆发
  • adobe illustrator做网站电商平台开发
  • 南宁专门建网站的公司哪里能买精准客户电话
  • 广州工程建设网站汕头自动seo
  • 网站建设规划大纲站长工具seo优化
  • HTTPS网站做CDN加速网络营销和传统营销的区别
  • 网站未建设完善是什么意思成都网站推广哪家专业
  • 厦门网站建设培训学校竞价代运营外包公司
  • 视频制作软件app手机版免费版东莞seo优化
  • 网站建设岗位所需技能常见的营销策略有哪些
  • 网站流量分布牛排seo系统
  • php可以做网站布局吗朝阳seo排名
  • logo设计思路南昌百度搜索排名优化
  • 网站建设商业计划书郑州优化公司有哪些
  • 伊宁市做网站淘宝付费推广有几种方式
  • jquery qq客服 添加到网站中数据分析师一般一个月多少钱
  • h5开发工程师是做什么的长沙百度快速优化
  • 儿童网站开发制造业中小微企业
  • b站倒过来的网站谁做的google官网注册
  • 没有版权可以做视频网站吗淘宝指数查询官网
  • 淘宝客导购网站网络营销推广8种方法