当前位置: 首页 > news >正文

hive调优系列-3.HQL语法和运行参数层面

HQL语法和运行参数层面,主要跟大家讲讲如果写出高效的HQL,以及如果利用一些控制参数来调优HQL的执行。这是HQL调优的一个大头。

3.1. 查看Hive执行计划

Hive的SQL语句在执行之前需要将SQL语句转换成MapReduce任务,因此需要了解具体的转换过程,可以在SQL语句中输入如下命令查看具体的执行计划。

--查看执行计划,添加extended关键字可以查看更加详细的执行计划
explain [extended] query 
explain select department, count(*) as total from student where age >=18 group by department order by total desc limit 3;

3.2. 列裁剪

列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。

Hive 在读数据的时候,可以只读取查询中需要用到的列,而忽略其他的列。这样做可以节省读取开销:中间表存储开销和数据整合开销。

set hive.optimize.cp = true;  ## 列裁剪,取数只取查询中需要用到的列,默认是true

3.3. 谓词下推

将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是 PredicatePushDown。

set hive.optimize.ppd=true;  ## 默认是true

示例程序:

select a.*, b.* from a join b on a.id = b.id where b.age > 20;select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;

3.4. 分区裁剪

列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。

在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。

Hive 中与分区裁剪优化相关的则是:

set hive.optimize.pruner=true; ## 默认是true

在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器。

select * from student where department = "AAAA";

3.5. 合并小文件

如果一个mapreduce job碰到一对小文件作为输入,一个小文件启动一个Task

3.5.1 Map 输入合并

在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量。

--Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.io.CombineHiveInputFormat;--Map端输入,不合并
set hive.input.format=org.apache.hadoop.hive.io.HiveInputFormat;

3.5.2 Map/Reduce 输出合并

大量的小文件会给HDFS带来压力,影响处理效率。可以通过合并  Map 和 Reduce 的结果文件来消除影响。

## 是否合并Map输出文件,默认值为true
set hive.merge.mapfiles=true;## 是否合并Reduce端输出文件,默认值为false
set hive.merge.mapredfiles=true;## 合并文件的大小,默认值为256000000
set hive.merge.size.per.task=256000000;## 每个Map最大分割大小
set mapred.max.split.size=256000000;## 一个节点上split的最少值
set mapred.min.split.size.per.node=1; // 服务器节点## 一个机架上split的最少值
set mapred.min.split.size.per.rack=1; // 服务器机架

3.6. 合理设置MapTask并行度

3.6.1 MapReduce中的MapTask的并行度机制

Map数过大:当输入文件特别大,MapTask特别多,每个计算节点分配执行的MapTask都很多,这个时候可以考虑减少MapTask的数量。增大每个MapTask处理的数据量。而且MapTask过多,最终生成的结果文件数也太多。

  1. Map阶段输出文件太小,产生大量小文件
  2. 初始化和创建Map的开销很大

Map数太小:当输入文件都很大,任务逻辑复杂,MapTask执行非常慢的时候,可以考虑增加MapTask数,来使得每个MapTask处理的数据量减少,从而提高任务的执行效率。

  1. 文件处理或查询并行度小,Job执行时间过长
  2. 大量作业时,容易堵塞集群

在 MapReduce 的编程案例中,我们得知,一个 MapReduce Job 的 MapTask 数量是由输入分片决定的。而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个 MapTask,而输入分片是由三个参数决定的:

参数默认值意义
dfs.blocksize128MHDFS 默认数据块大小
mapreduce.input.fileinputformat.split.minsize1最小分片大小 (MR)
mapreduce.input.fileinputformat.split.maxsize256M最大分片大小 (MR)

输入分片大小的计算公式是:

## 取三者中间值
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率。

两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数

  1. 减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
  2. 增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数

重点注意:不推荐把这个值进行随意设置! 推荐的方式:使用默认的切块大小即可。如果非要调整,最好是切块的 N 倍数

Task = (N × 0.95) × M

  • N:NodeManager 节点数量(集群工作节点总数)
  • 0.95资源保留系数
    • 保留 5% 的资源给系统守护进程(如 DataNode、NodeManager 自身)
    • 避免节点资源耗尽导致进程崩溃
  • M:单个 NodeManager 节点可运行的 最大容器数(Container)
    • 由 YARN 配置参数决定:

      M = min( yarn.nodemanager.resource.cpu-vcores, yarn.nodemanager.resource.memory-mb / 容器内存大小 )

  • Task:集群可同时运行的 最大任务数(每个任务对应一个 YARN Container)

3.6.2 合理控制 MapTask 数量

1、减少 MapTask 数可以通过合并小文件来实现

2、增加 MapTask 数可以通过控制上一个 ReduceTask 默认的 MapTask 个数

计算方式

输入文件总大小:total_size HDFS

设置的数据块大小:dfs_block_size

default_mapper_num = total_size / dfs_block_size

MapReduce 中提供了如下参数来控制 map 任务个数,从字面上看,貌似是可以直接设置 MapTask 个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num 的时候,才会生效。

set mapred.map.tasks=10; -- 默认值是2

那如果我们需要减少 MapTask 数量,但是文件大小是固定的,那该怎么办呢?

可以通过 mapred.min.split.size 设置每个任务处理的文件的大小,这个大小只有在大于 dfs_block_size 的时候才会生效

split_size = max(mapred.min.split.size, dfs_block_size)

split_num = total_size / split_size

compute_map_num = Math.min(split_num, Math.max(default_mapper_num, mapred.map.tasks))

这样就可以减少 MapTask 数量了。

总结一下控制 mapper 个数的方法:

  1. 如果想增加 MapTask 个数,可以设置 mapred.map.tasks 为一个较大的值
  2. 如果想减少 MapTask 个数,可以设置 mapred.min.split.size 为一个较大的值
  3. 如果输入是大量小文件,想减少 mapper 个数,可以通过设置 hive.input.format 合并小文件

如果想要调整 mapper 个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。不能盲目进行暴力设置,不然适得其反。

MapTask 数量与输入文件的 split 数息息相关,在 Hadoop 源码 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻辑。可以直接通过参数 mapred.map.tasks(默认值 2)来设定 MapTask 数的期望值,但它不一定会生效。

3.7. 合理设置ReduceTask并行度

如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化 ReduceTask 需要耗费资源。

如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。默认情况下,Hive 分配的 reducer 个数由下列参数决定:

Hadoop MapReduce 程序中,ReducerTask 个数的设定极大影响执行效率,ReducerTask 数量与输出文件的数量相关。如果 ReducerTask 数太多,会产生大量小文件,对 HDFS 造成压力。如果 ReducerTask 数太少,每个 ReducerTask 要处理很多数据,容易拖慢运行时间或者造成 OOM。这使得 Hive 怎样决定 ReducerTask 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 ReducerTask 个数的情况下,Hive 会猜测确定一个 ReducerTask 个数,基于以下两个设定:

参数1: hive.exec.reducers.bytes.per.reducer (默认为 256M)

参数2: hive.exec.reducers.max (默认为 1009)

参数3: mapreduce.job.reduces (默认值为 -1,表示没有设置,那么就按照以上两个参数进行设置)

ReducerTask 的计算公式为:

N = Math.min(参数2, 总输入数据大小 / 参数1)

可以通过改变上述两个参数的值来控制 ReducerTask 的数量。

也可以通过

set mapred.map.tasks=10;set mapreduce.job.reduces=10;

通常情况下,有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 ReduceTask 个数,重设参数2 还是必要的。 依据经验,可以将参数2 设定为 M * (0.95 * N)(N为集群中 NodeManager 个数)。一般来说,NodeManager 和 DataNode 的个数是一样的。

3.8. join 优化

join 优化整体原则:

  1. 优先过滤后再进行Join操作,最大限度的减少参与join的数据量
  2. 小表join大表,最好启动mapjoin,hive自动启用mapjoin,小表不能超过25M,可以更改
  3. Join on的条件相同的话,最好放入同一个job,并且join表的排列顺序从小到大:select a., b., c.* from a join b on a.id = b.id join c on a.id = c.id
  4. 如果多张表做join,如果多个链接条件都相同,会转换成一个job

优先过滤数据

尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大限度的减少参与Join的数据量。

小表join大表原则

  • Hive 在 MapReduce 引擎下,多个连续 JOIN 会合并为一个 MR Job(除非发生数据倾斜强制拆分)
  • 执行顺序由优化器决定
    即使 SQL 中写为 A JOIN B JOIN C,实际执行顺序可能调整为 B JOIN (A JOIN C),Hive 优化器会根据表大小统计数据重排顺序(需开启 hive.auto.convert.join=true

使用相同的连接键

在hive中,当对3个或更多张表进行join时,如果on条件使用相同字段,那么它们会合并为一个MapReduce Job,利用这种特性,可以将相同的join on放入一个job来节省执行时间

尽量原子操作

尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。

大表Join大表

  1. 空key过滤:有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。

  2. 空key转换:有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上

Join 类型触发条件内存风险优化建议
MapJoin (Broadcast)小表 < hive.mapjoin.smalltable.filesize (默认25MB)自动生效,无须关心左右顺序
Bucket-MapJoinJOIN 列是分桶列,且桶数成倍数关系分桶表设计优先
Common Join不符合以上条件的大表 JOIN必须确保左表为小表
Skew Join存在严重数据倾斜时开启 hive.optimize.skewjoin=true

3.8.1 启用 MapJoin

这个优化措施,但凡能用就用!大表 join 小表 小表满足需求:小表数据小于控制条件时

MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 MapJoin。

--是否根据输入小表的大小,自动将 reduce 端的 common join 转化为 map join,将小表刷入内存中。
对应逻辑优化器是 MapJoinProcessor
set hive.auto.convert.join = true;--刷入内存表的大小(字节)
set hive.mapjoin.smalltable.filesize = 25000000;--hive 会基于表的 size 自动的将普通 join 转换成 mapjoin
set hive.auto.convert.join.noconditionaltask=true;--多大的表可以自动触发放到内层 LocalTask 中,默认大小 10M
set hive.auto.convert.join.noconditionaltask.size=10000000;

Hive 可以进行多表 Join。Join 操作尤其是 Join 大表的时候代价是非常大的。MapJoin 特别适合大小表 join 的情况。在 Hive join 场景中,一般总有一张相对小的表和一张相对大的表,小表叫 build table,大表叫 probe table。在维度建模数据仓库中,事实表就是 probe table,维度表就是 build table。这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接。

当 Hive 执行 Join 时,需要选择哪个表被流式传输(stream),哪个表被缓存(cache)。Hive 将 JOIN 语句中的最后一个表用于流式传输,因此我们需要确保这个流表在两者之间是最大的。如果要在不同的 key 上 join 更多的表,那么对于每个 join 集,只需在 ON 条件右侧指定较大的表。

也可以手动开启 mapjoin:

--SQL方式,在SQL语句中添加MapJOIN标记(mapjoin hint)
--将小表放到内存中,省去shuffle操作
//在没有开启mapjoin的情况下,执行的是reduceJoin
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM smallTable JOIN bigTable ON smallTable.key = bigTable.key;

3.8.2 Sort-Merge-Bucket(SMB) Map Join

它是另一种Hive Join优化技术,使用这个技术的前提是所有的表都必须是分桶表(bucket)和分桶排序的(sort)。分桶表的优化!

具体实现:

  1. 针对参与join的这两张做相同的hash散列,每个桶里面的数据还要排序
  2. 这两张表的分桶个数要成倍数。
  3. 开启 SMB join 的开关!

一些常见参数设置:

## 当用户执行bucket map join的时候,发现不能执行时,禁止查询
set hive.enforce.sortmergebucketmapjoin=false;## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join
set hive.auto.convert.sortmerge.join=true;## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以利用 mapjoin 来提高效率。
# bucket map join优化,默认值是 false
set hive.optimize.bucketmapjoin=false;## bucket map join 优化,默认值是 false
set hive.optimize.bucketmapjoin.sortedmerge=false;

3.8.3 Join数据倾斜优化(Skew Join)

在编写Join查询语句时,如果确定是由于join出现的数据倾斜,那么请做如下设置:

set hive.skewjoin.key=100000;
set hive.optimize.skewjoin=false;

如果开启了,在join过程中Hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。

通过hive.skewjoin.mapjoin.map.tasks参数还可以控制第二个job的mapper数量,默认10000。

set hive.skewjoin.mapjoin.map.tasks=10000;

3.9. CBO优化

join的时候表的顺序的关系:前面的表都会被加载到内存中,后面的表进行磁盘扫描

select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;

Hive自0.14.0开始,加入了一项“Cost based Optimizer”来对HQL执行计划进行优化,这个功能通过“hive.cbo.enable”来开启。在Hive 1.1.0之后,这个feature是默认开启的,它可以自动优化HQL中多个Join的顺序,并选择合适的Join算法。

CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Hive的成本优化器也一样。

Hive在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。

要使用基于成本的优化(也称为CBO),请在查询开始设置以下参数:

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

3.10. 怎样做笛卡尔积

当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积,这实际说明了Hive对笛卡尔积支持较弱。因为找不到Join key,Hive只能使用1个reducer来完成笛卡尔积。

当然也可以使用limit的办法来减少某个表参与join的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的Join操作,结果仍然很大(以至于无法用单机处理),这时MapJoin才是最好的解决办法。MapJoin,顾名思义,会在Map端完成Join操作。这需要将Join操作的一个或多个表完全读入内存。

PS:MapJoin在子查询中可能出现未知BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给Join添加一个Join key,原理很简单:将小表扩充一列join key,并将小表的条目复制数倍,join key各不相同;将大表扩充一列join key为随机数。

精髓就在于复制几倍,最后就有几个reduce来做,而且大表的数据是前面小表扩张key值范围里面随机出来的,所以复制了几倍n,就相当于这个随机范围就有多大n,那么相应的,大表的数据就被随机的分为了n份。并且最后处理所用的reduce数量也是n,而且也不会出现数据倾斜。

http://www.dtcms.com/a/393802.html

相关文章:

  • 计算机网络学习(三、数据链路层)
  • Refresh keys changed: [] 2023.0.3.3 问题排查
  • 高并发内存池(二):三层缓存的整体框架设计
  • Android音视频编解码全流程之Extractor
  • 基于 @antv/x6 实现流程图
  • markdown 绘制流程图
  • Spark专题-第二部分:Spark SQL 入门(5)-算子介绍-Join
  • 平替Jira,推荐一款国产开源免费的项目管理工具-Kanass
  • ssh不用版本管理器为多个服务器添加密钥
  • Windows Docker Desktop 实战:大模型存入 docker-desktop 实例 home 目录并与 Dify 联动运行指南
  • linux驱动开发笔记
  • 阿里云与腾讯云产品操作与体验:云平台运维实战技术解析
  • 深入了解linux网络—— 网络基础
  • leetcode3 哈希
  • Spring AI 整合OpenAI 聊天、做图
  • 阿里Motionshop人物角色替换成3D虚拟形象
  • C语言自学--字符函数和字符串函数
  • spring-boot--邮箱验证码发送--spring-boot-starter-mail
  • 3ds Max 2026安装教程(附安装包)3ds Max 2026下载详细安装图文教程
  • Genie 2:Google DeepMind 推出的基础世界模型,单张图生成 1分钟可玩 3D 世界
  • LeetCode 104. 二叉树的最大深度
  • 欧拉角描述相机的运动
  • Unity2D-Spriteshape
  • 打工人日报#20250921
  • Coolmuster Android Assistant:Windows系统下的Android设备管理专家
  • Android 的多进程机制 (Android Multi-Process Model)
  • 2025研究生数学建模通用神经网络处理器下的核内调度问题草案
  • Spring Boot 4 新特性详解:5大核心更新助力企业级开发
  • 计算机网络经典问题透视:网络利用率和网络时延之间,究竟存在着怎样一种“爱恨交织”的关系?我们梦寐以求的100%网络利用率,在现实世界中真的能够实现吗
  • requests 和 lxml 库的xpath实现