Hive 知识点梳理
一、HSQL优化
以下为HSQL优化的思路,分三部分
第一部分:减少IO
- 列裁剪、分区裁剪(避免 select * 写法),读一张表,有很多列,但我实际上只用到其中一列,那么其他列是不用读取的。
- 谓词下推,就是把一些过滤条件直接推到读取数据这一部分,这样可以尽量少的读取数据
- 使用分区表,在 Hive 中,分区表是一种将数据按照某个或某些列进行分区的存储结构。通过使用分区表,可以将数据按照不同的查询需求存储在不同的物理设备上,从而提高查询效率。
- 尽量原子化操作:
尽量避免一个 SQL 包含复杂逻辑,可以使用中间表(或 with as)来完成复杂的逻辑
第二部分:关联优化
- 大小表关联,小表采用 mapjoin 写法(/+ mapjoin(小表名)/) ,mapjoin 是将小表格先读取到内存中,然后用大表去匹配内存中小表的数据。
- 使用分桶表,进行表格连接的查询加速;加快表格的抽样查询。
- 使用相同的连接键
当对 3 个或更多表进行 join 连接时,若每个 on 子句都使用相同的连接键的话,那么只会产生一个 MapReduce job。
第三部分:执行优化
-并行执行优化
Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit阶段。或者 Hive 执行过程中可能需要的其他阶段.默认情况下,Hive 一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个 job 的执行时间缩短。若有更多的阶段可以并行执行,则 job 可能就越快完成。
通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行。在共享集群中,需要注意下,若 job 中并行阶段增多,则集群利用率就会增加。
set hive.exec.parallel = true;--打开任务并行执行set hive.exec.parallel.thread.number = 16;--同一个 sql 允许最大并行度,默认为 8
当然得是在系统资源比较空闲的时候才有优势,否则没资源,并行也起不来。
- 选择合适的文件格式
Hive 支持多种文件格式,如 ORC、Parquet 等。不同的文件格式适用于不同的场景,需要根据具体的查询需求进行选择。例如:若需要高读取速度,可以选择 ORC(列存储);若需要节省存储空间,可以选择 Parquet(行存储)。 - 调整 MapReduce 任务数量
set mapreduce.job.maps = 10;--设置 Map 任务的个数为 10。
set mapreduce.job.reduces = 5;--设置 Reduce 任务的个数为 5。
set hive.exec.reducers.bytes.per.reducer = 100000000;--设置每个 Reduce 任务处理的数据量为 100 MB。
在 Hive 中,查询语句的执行是通过 MapReduce 任务进行的。可以通过调整 MapReduce 任务的数量来优化查询性能。一般来说,任务数量越多,查询速度越快,但同时也会消耗更多的计算资源。
- JVM 重用
set mapred.job.reuse.jvm.num.tasks = 10;--10 为重用个数
二、HIVE 数据倾斜
简单来说两部分:
- 产生数据倾斜的原因
- 解决数据倾斜的方法
数据侵袭产生的原因
- key 值分布不均匀
-数据中存在大量相同 key 值
-数据中的 key 值存在大量异常值和空值 - 业务数据本身特性
-例如某个分公司订单量大幅提升几十倍甚至几百倍,对该分公司的订单统计聚合时,容易发生数据倾斜。 - 某些 SQL 语句本身导致数据倾斜
-group by 维度过少,某个值数量较多
-count distinct 特殊值较多
-join 一个 key 值集中的小表
-两个表中关联字段存在大量空值或关联字段数据类型不一致
解决数据倾斜的方法
- group by 导致数据倾斜
-两阶段聚合,放粗粒度:需要聚合的 key 加一个随机数的前后缀,按照加上前后缀的 hey 分组聚合一次,之后再按照原始的 key 分组聚合一次。
详细过程:生成的查询计划有两个 MapReduce 任务。
在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中,每个 reduce 做部分聚合操作,并输出结果。相同的 Group By Key 有可能分发到不同的 reduce 中,从而达到负载均衡的目的;
在第二个 MapReduce 任务再根据预处理的数据结果按照 Group By Key 分不到 reduce 中(这个过程可以保证相同的 Group By Key 分不到同一个 reduce 中),最后完成最终的聚合操作。
--假设 key = 水果
select count(substr(a.key,1,2)) as key
from(selectconcat(key,'_',cast(round(10*rand())+1 as string)) tmpfrom tablegroup by tmp
)a
group by key;
- count(distinct) 特殊值较多
-采用 count() group by 的方式来替换 count(distinct)
--count(distinct)写法
select count(distinct a) from test;
--count()group by的方式来替换 count(distinct)写法
select count(x.a)
from ( select a from test group by a ) x ;
- join 操作
-小表和大表 join
reduce join 改为 map join:将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外 RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,若连接 key 相同的话,则将两个 RDD 的数据用你需要的方式连接起来。
设置自动选择Mapjoin
set hive.auto.convert.join = true;--默认为 true
reduce join:先讲所有相同的 key 对应的 values,汇聚到一个 task 中,然后再进行 join。
map reduce:broadcast 出去那个小表的数据以后,就会在每个 executor 的 block manager 中都驻留一份 +map 算子来实现与 join 同样的效果。不会发生 shuffer,从根本上杜绝了 join 操作可能导致的数据倾斜的问题;
-大表 join 大表
将有大表中倾斜 key 对应的数据集单独抽取出来加上随机前缀,另外一个 RDD 每条数据分别与随机前缀结合形成新的 RDD(笛卡尔积,相当于将其数据增到原来的 N 倍,N 即为随机前缀的总个数)然后将二者 join 后去掉前缀,然后将不包含倾斜 key 的剩余数据进行 join。最后将两次 join 的结果集通过 union 合并,即可得到全部 join 结果。
空值或数据类型不一致所致
- 解决空值导致数据倾斜
--1.在查询的时候,过滤掉所有为 NULL 的数据,比如:
SELECT *
FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL
AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;
--2.查询出空值并给其赋上随机数,避免了 key 值为空(数据倾斜中常用的一种技巧)
SELECT *
FROM log a
LEFT JOIN bwm_users b
ON (CASE WHEN a.user_id IS NULL THEN concat('dp_hive',rand()) ELSE a.user_id END) = b.user_id;
- 关联数据类型不一致产生数据倾斜
一张表 s8_log,每个商品一条记录,要和商品表关联。s8_log 中有字符串商品 id,也有数字的商品 id。字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。问题的原因是把 s8_log 的商品 id 做 Hash (数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。
--把数字类型转换成字符串类型
SELECT *
FROM s8_log a
LEFT JOIN R_AUCTION_AUTIONS b ON a.auction_id = CAST(b.auction_id AS string);
- 优化 in/exists
hive1.2.1 版本也支持 in/exists 操作,但还是推荐使用 hive 的一个高效替代方案:left semi join。
LEFT SEMI JOIN (左半连接) 是 IN/EXISTS 子查询的一种高效的实现。
--in/exists
SELECT a.key,b.value
FROM a
WHERE a.key IN (SELECT b.key FROM b);--left semi join
SELECT a.key,b.value
FROM a
LEFT SEMI JOIN b ON a.key = b.key;
-
- left semi join 的限制是: join 子句中右边的表只能在 on 子句中设置过滤条件,在 where 子句、select 子句或其他地方过滤都不行。
-left semi join 是只传递表的 join key 给 map 阶段,因此 left semi join 中最后 select 的结果只许出现在左表。
-因为 left semi join 是 in(keySet) 的关系,遇到游标重复记录,左表会跳过,而 join 则会一直遍历。这就导致游标有重复值的情况下 left semi join 只产生一条,join 会产生多条,也会导致 left semi join 的性能更高。
- left semi join 的限制是: join 子句中右边的表只能在 on 子句中设置过滤条件,在 where 子句、select 子句或其他地方过滤都不行。
- 排序选择
cluster by:对同一个字段分桶并排序,不能和 sort by 连用;
distribute by + sort by:分桶,保证同一字段值只存在一个结果文件中,结合 sort by 保证每个 reduceTask 结果有序;
sort by:单机排序,单个 reduce 结果有序;
order by:全局排序,缺陷是只能使用一个 reduce;
三、Hive 内部表和外部表的区别
- 存储:外部表数据由 HDFS 管理;内部表数据由 hive 自身管理
- 存储:外部表数据存储位置由自己指定(没有指定 location 则默认地址下新建)内部表数据存储在 hive.metastore.warehouse.dir(默认在/uer/hive/warehouse)
- 创建:被 external 修饰的就是外部表;没有被修饰是内部表
- 删除:删除外部表仅仅删除元数据;删除内部表会删除元数据和存储数据
四、Hive 与传统数据库的区别
- 存储位置:Hive 数据存储在 HDFS 上;数据库保存在块设备或本地文件系统。
- 数据更新:Hive 不建议对数据改写;数据库通常需要经常修改。
- 执行引擎:Hive 通过 MapReduce 来实现;数据库用自己的执行引擎。
- 执行速度:Hive 执行延迟高,但它数据规模远超数据库处理能力时,Hive 的并行计算能力就体现优势了;数据库执行延迟较低。
- 数据规模:Hive 大规模的数据计算;数据库能支持的数据规模较小。
- 扩展性:Hive 建立在 Hadoop 上,随 Hadoop 的扩展性;数据库由于 ACID 语义的严格限制,扩展有限。
五、row_number(),rank() 和 dense_rank() 的区别
- row_number():根据查询结果的顺序计算排序,多用于分页查询。(如:1,2,3,4…)
- rank():排序相同时序号重复,总序数不变。(如:1,2,2,4…)
- dense_rank():排序相同时序号重复,总序数减少。(如:1,2,2,3…)
tips:order 排序时默认为asc,asc-升序/desc-降序
六、UDF、UDAF、UDTF 的区别
- UDF:User-Defined-Function,用户自定义函数,数据是一进一出,功能类似于大多数属性函数或者字符串处理函数;
- UDAF:User-Defined Aggregation Function,用户自定义聚合函数,数据是多进一出,功能类似于 count/max/min;
- UDTF:User-Defined Table-Generating Functions,用户自定义表生成函数,数据是一进多出,功能类似于 lateral view explore()
七、怎么自定义 UDF、UDAF、UDTF 函数
此处涉及到 java 或 python 编程语言
- 自定义 UDF 函数
-继承 org.apache.hadoop.hive.ql.UDF 函数;
-重写 evalueate 方法,evaluate 方法支持重载。 - 自定义 UDAF 函数
-必须继承 org.apache.hadoop.hive.ql.exec.UDAF(函数类继承)和 org.apache.hadoop.hive.ql.exec.UDAFEvaluator(内部类 Evaluator 实现 UDAFEvaluator 接口);
-重写 Evaluator 方法时需要实现 init、iterate、terminatePartial、merge、terminate 这几个函数:
init():类似于构造函数,用于 UDAF 的初始化。
iterate():接收传入的参数,并进行内部的轮转,返回 boolean。
terminatePartial():无参数,其为 iterate 函数轮转结束后,返回轮转数据,类似于 hadoop 的 Combiner。
merge():接收 terminatePartial 的返回结果,进行数据 merge 操作,其返回类型为 boolean。
terminate():返回最终的聚集函数结果。 - 自定义 UDTF 函数
-继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF 函数;
-重写实现 initialize,process,close 三个方法.
–1 UDTF 首先会调用 initialize 方法,此方法返回 UDTF 的返回行的信息(返回个数,类型).
–2 初始化完成后,会调用 process 方法,真正的处理过程在 process 函数中,在 process 中,每一次 forward() 调用产生一行;若产生多列可以将多个列的值放在一个数组中,然后将该数组传入到 froward() 函数。
–3 最后 close() 方法调用,对需要清理的方法进行清理。
八、Hive 的窗口函数扩展
over():指定分析函数工作的数据窗口大小随行变化(跟在聚合函数 后面,只对聚合函数有效)
current row 当前行
n preceding 往前 n 行数据
n following 往后 n 行数据
unbounded:unbounded preceding 从前面开始 |--->unbounded following 知道重点 --->|
lag(col,n) 往前第 n 行数据
lead(col,n) 往后第 n 行数据
ntile(n) 把有序分区中的行分化到 n 个 数据组中,各组的编号从 1 开始,ntile 会返回每行所属的组编号(n 为 int 类型)