hive调优系列-3.HQL语法和运行参数层面
HQL语法和运行参数层面,主要跟大家讲讲如果写出高效的HQL,以及如果利用一些控制参数来调优HQL的执行。这是HQL调优的一个大头。
3.1. 查看Hive执行计划
Hive的SQL语句在执行之前需要将SQL语句转换成MapReduce任务,因此需要了解具体的转换过程,可以在SQL语句中输入如下命令查看具体的执行计划。
--查看执行计划,添加extended关键字可以查看更加详细的执行计划
explain [extended] query
explain select department, count(*) as total from student where age >=18 group by department order by total desc limit 3;
3.2. 列裁剪
列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。
Hive 在读数据的时候,可以只读取查询中需要用到的列,而忽略其他的列。这样做可以节省读取开销:中间表存储开销和数据整合开销。
set hive.optimize.cp = true; ## 列裁剪,取数只取查询中需要用到的列,默认是true
3.3. 谓词下推
将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻辑优化器是 PredicatePushDown。
set hive.optimize.ppd=true; ## 默认是true
示例程序:
select a.*, b.* from a join b on a.id = b.id where b.age > 20;select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;
3.4. 分区裁剪
列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。当列很多或者数据量很大时,如果 select * 或者不指定分区,全列扫描和全表扫描效率都很低。
在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。
Hive 中与分区裁剪优化相关的则是:
set hive.optimize.pruner=true; ## 默认是true
在 HiveQL 解析阶段对应的则是 ColumnPruner 逻辑优化器。
select * from student where department = "AAAA";
3.5. 合并小文件
如果一个mapreduce job碰到一对小文件作为输入,一个小文件启动一个Task
3.5.1 Map 输入合并
在执行 MapReduce 程序的时候,一般情况是一个文件的一个数据分块需要一个 mapTask 来处理。但是如果数据源是大量的小文件,这样就会启动大量的 mapTask 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少 mapTask 任务数量。
--Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.io.CombineHiveInputFormat;--Map端输入,不合并
set hive.input.format=org.apache.hadoop.hive.io.HiveInputFormat;
3.5.2 Map/Reduce 输出合并
大量的小文件会给HDFS带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。
## 是否合并Map输出文件,默认值为true
set hive.merge.mapfiles=true;## 是否合并Reduce端输出文件,默认值为false
set hive.merge.mapredfiles=true;## 合并文件的大小,默认值为256000000
set hive.merge.size.per.task=256000000;## 每个Map最大分割大小
set mapred.max.split.size=256000000;## 一个节点上split的最少值
set mapred.min.split.size.per.node=1; // 服务器节点## 一个机架上split的最少值
set mapred.min.split.size.per.rack=1; // 服务器机架
3.6. 合理设置MapTask并行度
3.6.1 MapReduce中的MapTask的并行度机制
Map数过大:当输入文件特别大,MapTask特别多,每个计算节点分配执行的MapTask都很多,这个时候可以考虑减少MapTask的数量。增大每个MapTask处理的数据量。而且MapTask过多,最终生成的结果文件数也太多。
- Map阶段输出文件太小,产生大量小文件
- 初始化和创建Map的开销很大
Map数太小:当输入文件都很大,任务逻辑复杂,MapTask执行非常慢的时候,可以考虑增加MapTask数,来使得每个MapTask处理的数据量减少,从而提高任务的执行效率。
- 文件处理或查询并行度小,Job执行时间过长
- 大量作业时,容易堵塞集群
在 MapReduce 的编程案例中,我们得知,一个 MapReduce Job 的 MapTask 数量是由输入分片决定的。而输入分片是由 FileInputFormat.getSplit() 决定的。一个输入分片对应一个 MapTask,而输入分片是由三个参数决定的:
参数 | 默认值 | 意义 |
---|---|---|
dfs.blocksize | 128M | HDFS 默认数据块大小 |
mapreduce.input.fileinputformat.split.minsize | 1 | 最小分片大小 (MR) |
mapreduce.input.fileinputformat.split.maxsize | 256M | 最大分片大小 (MR) |
输入分片大小的计算公式是:
## 取三者中间值
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率。
两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数
- 减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
- 增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数
重点注意:不推荐把这个值进行随意设置! 推荐的方式:使用默认的切块大小即可。如果非要调整,最好是切块的 N 倍数
Task = (N × 0.95) × M
N
:NodeManager 节点数量(集群工作节点总数)0.95
:资源保留系数- 保留 5% 的资源给系统守护进程(如 DataNode、NodeManager 自身)
- 避免节点资源耗尽导致进程崩溃
M
:单个 NodeManager 节点可运行的 最大容器数(Container)- 由 YARN 配置参数决定:
M = min( yarn.nodemanager.resource.cpu-vcores, yarn.nodemanager.resource.memory-mb / 容器内存大小 )
- 由 YARN 配置参数决定:
Task
:集群可同时运行的 最大任务数(每个任务对应一个 YARN Container)
3.6.2 合理控制 MapTask 数量
1、减少 MapTask 数可以通过合并小文件来实现
2、增加 MapTask 数可以通过控制上一个 ReduceTask 默认的 MapTask 个数
计算方式
输入文件总大小:total_size HDFS
设置的数据块大小:dfs_block_size
default_mapper_num = total_size / dfs_block_size
MapReduce 中提供了如下参数来控制 map 任务个数,从字面上看,貌似是可以直接设置 MapTask 个数的样子,但是很遗憾不行,这个参数设置只有在大于 default_mapper_num 的时候,才会生效。
set mapred.map.tasks=10; -- 默认值是2
那如果我们需要减少 MapTask 数量,但是文件大小是固定的,那该怎么办呢?
可以通过 mapred.min.split.size 设置每个任务处理的文件的大小,这个大小只有在大于 dfs_block_size 的时候才会生效
split_size = max(mapred.min.split.size, dfs_block_size)
split_num = total_size / split_size
compute_map_num = Math.min(split_num, Math.max(default_mapper_num, mapred.map.tasks))
这样就可以减少 MapTask 数量了。
总结一下控制 mapper 个数的方法:
- 如果想增加 MapTask 个数,可以设置 mapred.map.tasks 为一个较大的值
- 如果想减少 MapTask 个数,可以设置 mapred.min.split.size 为一个较大的值
- 如果输入是大量小文件,想减少 mapper 个数,可以通过设置 hive.input.format 合并小文件
如果想要调整 mapper 个数,在调整之前,需要确定处理的文件大概大小以及文件的存在形式(是大量小文件,还是单个大文件),然后再设置合适的参数。不能盲目进行暴力设置,不然适得其反。
MapTask 数量与输入文件的 split 数息息相关,在 Hadoop 源码 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到 split 划分的具体逻辑。可以直接通过参数 mapred.map.tasks(默认值 2)来设定 MapTask 数的期望值,但它不一定会生效。
3.7. 合理设置ReduceTask并行度
如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化 ReduceTask 需要耗费资源。
如果 ReduceTask 数量过少,这样一个 ReduceTask 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。默认情况下,Hive 分配的 reducer 个数由下列参数决定:
Hadoop MapReduce 程序中,ReducerTask 个数的设定极大影响执行效率,ReducerTask 数量与输出文件的数量相关。如果 ReducerTask 数太多,会产生大量小文件,对 HDFS 造成压力。如果 ReducerTask 数太少,每个 ReducerTask 要处理很多数据,容易拖慢运行时间或者造成 OOM。这使得 Hive 怎样决定 ReducerTask 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 ReducerTask 个数的情况下,Hive 会猜测确定一个 ReducerTask 个数,基于以下两个设定:
参数1: hive.exec.reducers.bytes.per.reducer (默认为 256M)
参数2: hive.exec.reducers.max (默认为 1009)
参数3: mapreduce.job.reduces (默认值为 -1,表示没有设置,那么就按照以上两个参数进行设置)
ReducerTask 的计算公式为:
N = Math.min(参数2, 总输入数据大小 / 参数1)
可以通过改变上述两个参数的值来控制 ReducerTask 的数量。
也可以通过
set mapred.map.tasks=10;set mapreduce.job.reduces=10;
通常情况下,有必要手动指定 ReduceTask 个数。考虑到 Mapper 阶段的输出数据量通常会比输入有大幅减少,因此即使不设定 ReduceTask 个数,重设参数2 还是必要的。 依据经验,可以将参数2 设定为 M * (0.95 * N)(N为集群中 NodeManager 个数)。一般来说,NodeManager 和 DataNode 的个数是一样的。
3.8. join 优化
join 优化整体原则:
- 优先过滤后再进行Join操作,最大限度的减少参与join的数据量
- 小表join大表,最好启动mapjoin,hive自动启用mapjoin,小表不能超过25M,可以更改
- Join on的条件相同的话,最好放入同一个job,并且join表的排列顺序从小到大:select a., b., c.* from a join b on a.id = b.id join c on a.id = c.id
- 如果多张表做join,如果多个链接条件都相同,会转换成一个job
优先过滤数据
尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大限度的减少参与Join的数据量。
小表join大表原则
- Hive 在 MapReduce 引擎下,多个连续 JOIN 会合并为一个 MR Job(除非发生数据倾斜强制拆分)
- 执行顺序由优化器决定:
即使 SQL 中写为A JOIN B JOIN C
,实际执行顺序可能调整为B JOIN (A JOIN C)
,Hive 优化器会根据表大小统计数据重排顺序(需开启hive.auto.convert.join=true
)
使用相同的连接键
在hive中,当对3个或更多张表进行join时,如果on条件使用相同字段,那么它们会合并为一个MapReduce Job,利用这种特性,可以将相同的join on放入一个job来节省执行时间
尽量原子操作
尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。
大表Join大表
-
空key过滤:有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。
-
空key转换:有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上
Join 类型 | 触发条件 | 内存风险 | 优化建议 |
---|---|---|---|
MapJoin (Broadcast) | 小表 < hive.mapjoin.smalltable.filesize (默认25MB) | 低 | 自动生效,无须关心左右顺序 |
Bucket-MapJoin | JOIN 列是分桶列,且桶数成倍数关系 | 低 | 分桶表设计优先 |
Common Join | 不符合以上条件的大表 JOIN | 高 | 必须确保左表为小表 |
Skew Join | 存在严重数据倾斜时 | 中 | 开启 hive.optimize.skewjoin=true |
3.8.1 启用 MapJoin
这个优化措施,但凡能用就用!大表 join 小表 小表满足需求:小表数据小于控制条件时
MapJoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 MapJoin。
--是否根据输入小表的大小,自动将 reduce 端的 common join 转化为 map join,将小表刷入内存中。
对应逻辑优化器是 MapJoinProcessor
set hive.auto.convert.join = true;--刷入内存表的大小(字节)
set hive.mapjoin.smalltable.filesize = 25000000;--hive 会基于表的 size 自动的将普通 join 转换成 mapjoin
set hive.auto.convert.join.noconditionaltask=true;--多大的表可以自动触发放到内层 LocalTask 中,默认大小 10M
set hive.auto.convert.join.noconditionaltask.size=10000000;
Hive 可以进行多表 Join。Join 操作尤其是 Join 大表的时候代价是非常大的。MapJoin 特别适合大小表 join 的情况。在 Hive join 场景中,一般总有一张相对小的表和一张相对大的表,小表叫 build table,大表叫 probe table。在维度建模数据仓库中,事实表就是 probe table,维度表就是 build table。这种 Join 方式在 map 端直接完成 join 过程,消灭了 reduce,效率很高。而且 MapJoin 还支持非等值连接。
当 Hive 执行 Join 时,需要选择哪个表被流式传输(stream),哪个表被缓存(cache)。Hive 将 JOIN 语句中的最后一个表用于流式传输,因此我们需要确保这个流表在两者之间是最大的。如果要在不同的 key 上 join 更多的表,那么对于每个 join 集,只需在 ON 条件右侧指定较大的表。
也可以手动开启 mapjoin:
--SQL方式,在SQL语句中添加MapJOIN标记(mapjoin hint)
--将小表放到内存中,省去shuffle操作
//在没有开启mapjoin的情况下,执行的是reduceJoin
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM smallTable JOIN bigTable ON smallTable.key = bigTable.key;
3.8.2 Sort-Merge-Bucket(SMB) Map Join
它是另一种Hive Join优化技术,使用这个技术的前提是所有的表都必须是分桶表(bucket)和分桶排序的(sort)。分桶表的优化!
具体实现:
- 针对参与join的这两张做相同的hash散列,每个桶里面的数据还要排序
- 这两张表的分桶个数要成倍数。
- 开启 SMB join 的开关!
一些常见参数设置:
## 当用户执行bucket map join的时候,发现不能执行时,禁止查询
set hive.enforce.sortmergebucketmapjoin=false;## 如果join的表通过sort merge join的条件,join是否会自动转换为sort merge join
set hive.auto.convert.sortmerge.join=true;## 当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以利用 mapjoin 来提高效率。
# bucket map join优化,默认值是 false
set hive.optimize.bucketmapjoin=false;## bucket map join 优化,默认值是 false
set hive.optimize.bucketmapjoin.sortedmerge=false;
3.8.3 Join数据倾斜优化(Skew Join)
在编写Join查询语句时,如果确定是由于join出现的数据倾斜,那么请做如下设置:
set hive.skewjoin.key=100000;
set hive.optimize.skewjoin=false;
如果开启了,在join过程中Hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。
通过hive.skewjoin.mapjoin.map.tasks参数还可以控制第二个job的mapper数量,默认10000。
set hive.skewjoin.mapjoin.map.tasks=10000;
3.9. CBO优化
join的时候表的顺序的关系:前面的表都会被加载到内存中,后面的表进行磁盘扫描
select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;
Hive自0.14.0开始,加入了一项“Cost based Optimizer”来对HQL执行计划进行优化,这个功能通过“hive.cbo.enable”来开启。在Hive 1.1.0之后,这个feature是默认开启的,它可以自动优化HQL中多个Join的顺序,并选择合适的Join算法。
CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优化器做出最优化的执行计划是依据统计信息来计算的。Hive的成本优化器也一样。
Hive在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
要使用基于成本的优化(也称为CBO),请在查询开始设置以下参数:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
3.10. 怎样做笛卡尔积
当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积,这实际说明了Hive对笛卡尔积支持较弱。因为找不到Join key,Hive只能使用1个reducer来完成笛卡尔积。
当然也可以使用limit的办法来减少某个表参与join的数据量,但对于需要笛卡尔积语义的需求来说,经常是一个大表和一个小表的Join操作,结果仍然很大(以至于无法用单机处理),这时MapJoin才是最好的解决办法。MapJoin,顾名思义,会在Map端完成Join操作。这需要将Join操作的一个或多个表完全读入内存。
PS:MapJoin在子查询中可能出现未知BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的方法是,给Join添加一个Join key,原理很简单:将小表扩充一列join key,并将小表的条目复制数倍,join key各不相同;将大表扩充一列join key为随机数。
精髓就在于复制几倍,最后就有几个reduce来做,而且大表的数据是前面小表扩张key值范围里面随机出来的,所以复制了几倍n,就相当于这个随机范围就有多大n,那么相应的,大表的数据就被随机的分为了n份。并且最后处理所用的reduce数量也是n,而且也不会出现数据倾斜。