Hive Window Functions and User-Defined Functions
1. Window Syntax
Suppose we have the following data, where the first column is a date, the second a category, and the third an amount:
#data.txt
19700109 A 200
19700525 A 100
19700813 A 80
19700329 A 60
19700516 B 100
19700305 B 90
19700201 B 80
19700723 B 70
#Create the table in Hive and load the data into it
CREATE TABLE window_tbl(
  dt string,
  catagory string,
  amount int
) row format delimited fields terminated by '\t';
#Load the data
load data inpath '/data.txt' into table window_tbl;
Now we want to rank the rows within each category by amount in ascending order, producing the following result:
19700329 A 60 1
19700813 A 80 2
19700525 A 100 3
19700109 A 200 4
19700723 B 70 1
19700201 B 80 2
19700305 B 90 3
19700516 B 100 4
This kind of requirement can be implemented with the window analysis functions that Hive provides.
Hive's window analysis functions group and/or sort the data by given columns, then apply an aggregation, value-extraction, or ranking operation over each group. The window function syntax is as follows:
SELECT [column_list],
window_analytics_function OVER ([PARTITION BY col1[,col2...]] [ORDER BY col3 [asc|desc]] [range_definition]
) AS colAliasName
FROM tbl
The parts of this syntax are explained below:
window_analytics_function: the window analysis function to apply, which may be an aggregate function, a ranking function, or one of the other window functions.
PARTITION BY: optional; divides the result set into groups, similar to GROUP BY. If an OVER window is used without PARTITION BY, all rows are processed as a single group.
ORDER BY: sorts the rows within each group by the given column.
range_definition: defines the range of rows the window aggregates over. The range is given by a BETWEEN clause: BETWEEN <lower bound> AND <upper bound>; the boundary rows are included in the aggregation. The syntax is as follows:
(ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN [num] FOLLOWING AND (UNBOUNDED | [num]) FOLLOWING
The terms in the range_definition syntax are explained below:
- unbounded: no boundary.
- preceding: before the current row.
- following: after the current row.
- unbounded preceding: all rows before the current row, i.e. starting from the first row.
- n preceding: n rows before the current row.
- unbounded following: all rows after the current row, i.e. through the last row.
- n following: n rows after the current row.
- current row: the current row.
- rows between ... and ...: ROWS defines the window by row position, i.e. by physical rows. For example, sum(score) over (PARTITION by id order by score ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) groups by id, orders by score, and sums over a window made of the current row plus one row before it and one row after it.
- range between ... and ...: RANGE defines the window, under the current ordering, by taking the current row's value as the base and adding to/subtracting from it to obtain a value range. For example, sum(score) over (PARTITION by id order by score RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) groups by id, orders by score, and sums over all rows whose score lies between score-1 and score+1 of the current row; this may cover more than the two adjacent rows, so the window is made of logical rows, which can be more or fewer than with ROWS. Which column serves as the base value? The column named in ORDER BY, which must be of an integer numeric type. (See the sketch right after this list.)
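To make the ROWS/RANGE difference concrete, here is a minimal sketch against the window_tbl table created above (the aliases rows_sum and range_sum are illustrative):
#Physical frame: the current row plus one row before and one row after it
select dt,catagory,amount,
       sum(amount) over (partition by catagory order by amount rows between 1 preceding and 1 following) as rows_sum
from window_tbl;
#Logical frame: all rows whose amount lies between amount-1 and amount+1 of the current row
select dt,catagory,amount,
       sum(amount) over (partition by catagory order by amount range between 1 preceding and 1 following) as range_sum
from window_tbl;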
Note:
- range_definition cannot be used with the ROW_NUMBER()/RANK/DENSE_RANK window functions.
- If a window function specifies ORDER BY but no range_definition, the window spans from the first row to the current row (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
- If a window function specifies neither ORDER BY nor a range_definition, the window spans from the first row to the last row (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), i.e. all rows.
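As a sketch of these two defaults, each pair of columns below should come back identical (again using the window_tbl table above; s1 through s4 are illustrative aliases):
#ORDER BY without a range_definition defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so s1 = s2
select dt,catagory,amount,
       sum(amount) over (partition by catagory order by amount) as s1,
       sum(amount) over (partition by catagory order by amount range between unbounded preceding and current row) as s2
from window_tbl;
#With neither ORDER BY nor a range_definition, the window covers the whole partition, so s3 = s4
select dt,catagory,amount,
       sum(amount) over (partition by catagory) as s3,
       sum(amount) over (partition by catagory order by amount rows between unbounded preceding and unbounded following) as s4
from window_tbl;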
With the window syntax covered, the requirement above can be implemented with the row_number() window function as follows:
#SQL statement
select dt,catagory,amount,row_number() over(partition by catagory order by amount) as rk from window_tbl;
#Result
+-----------+-----------+---------+-----+
|    dt     | catagory  | amount  | rk  |
+-----------+-----------+---------+-----+
| 19700329  | A         | 60      | 1   |
| 19700813  | A         | 80      | 2   |
| 19700525  | A         | 100     | 3   |
| 19700109  | A         | 200     | 4   |
| 19700723  | B         | 70      | 1   |
| 19700201  | B         | 80      | 2   |
| 19700305  | B         | 90      | 3   |
| 19700516  | B         | 100     | 4   |
+-----------+-----------+---------+-----+
2. Window Functions
Commonly used window functions include the aggregate functions (COUNT/SUM/MIN/MAX/AVG), the ranking functions (ROW_NUMBER()/RANK/DENSE_RANK), and the other functions (LEAD/LAG/FIRST_VALUE/LAST_VALUE). Each group is introduced below.
2.1 Aggregate Functions
The commonly used window aggregate functions are COUNT/SUM/MIN/MAX/AVG; SUM is used as the example here. Suppose we have the following per-user time-on-site data, where the three columns are user ID, date, and time spent on the site:
#data.txt
111 1970-06-20 1
111 1970-06-21 2
111 1970-06-22 3
222 1970-06-20 4
222 1970-06-21 8
222 1970-06-22 13
333 1970-06-20 7
333 1970-06-21 9
333 1970-06-22 20
444 1970-06-23 10
#Create the table in Hive:
CREATE TABLE user_acc_tbl(
  uid string,
  dt string,
  duration int
) row format delimited fields terminated by '\t';
#Load the data into the table
load data inpath '/data.txt' into table user_acc_tbl;
#Query the table
select * from user_acc_tbl;
+-------------------+------------------+------------------------+
| user_acc_tbl.uid | user_acc_tbl.dt | user_acc_tbl.duration |
+-------------------+------------------+------------------------+
| 111 | 1970-06-20 | 1 |
| 111 | 1970-06-21 | 2 |
| 111 | 1970-06-22 | 3 |
| 222 | 1970-06-20 | 4 |
| 222 | 1970-06-21 | 8 |
| 222 | 1970-06-22 | 13 |
| 333 | 1970-06-20 | 7 |
| 333 | 1970-06-21 | 9 |
| 333 | 1970-06-22 | 20 |
| 444 | 1970-06-23 | 10 |
+-------------------+------------------+------------------------+
1) Use a window function to compute each user's total time on site.
#SQL statement
select uid,dt,duration,sum(duration) over (partition by uid) from user_acc_tbl;
#Result
+------+-------------+-----------+---------------+
| uid | dt | duration | sum_window_0 |
+------+-------------+-----------+---------------+
| 111 | 1970-06-22 | 3 | 6 |
| 111 | 1970-06-21 | 2 | 6 |
| 111 | 1970-06-20 | 1 | 6 |
| 222 | 1970-06-22 | 13 | 25 |
| 222 | 1970-06-21 | 8 | 25 |
| 222 | 1970-06-20 | 4 | 25 |
| 333 | 1970-06-22 | 20 | 36 |
| 333 | 1970-06-21 | 9 | 36 |
| 333 | 1970-06-20 | 7 | 36 |
| 444 | 1970-06-23 | 10 | 10 |
+------+-------------+-----------+---------------+
2) Compute each user's running total of time on site: for a given user, each day accumulates all previous days' values up to and including the current day.
#SQL statement
select uid,dt,duration,sum(duration) over (partition by uid order by dt) from user_acc_tbl;
#Result
+------+-------------+-----------+---------------+
| uid | dt | duration | sum_window_0 |
+------+-------------+-----------+---------------+
| 111 | 1970-06-20 | 1 | 1 |
| 111 | 1970-06-21 | 2 | 3 |
| 111 | 1970-06-22 | 3 | 6 |
| 222 | 1970-06-20 | 4 | 4 |
| 222 | 1970-06-21 | 8 | 12 |
| 222 | 1970-06-22 | 13 | 25 |
| 333 | 1970-06-20 | 7 | 7 |
| 333 | 1970-06-21 | 9 | 16 |
| 333 | 1970-06-22 | 20 | 36 |
| 444 | 1970-06-23 | 10 | 10 |
+------+-------------+-----------+---------------+
3) Compute, for each user and day, the current day's duration plus the previous day's duration.
#SQL statement
select uid,dt,duration,sum(duration) over (partition by uid order by dt rows between 1 preceding and current row) from user_acc_tbl;
#Result
+------+-------------+-----------+---------------+
| uid | dt | duration | sum_window_0 |
+------+-------------+-----------+---------------+
| 111 | 1970-06-20 | 1 | 1 |
| 111 | 1970-06-21 | 2 | 3 |
| 111 | 1970-06-22 | 3 | 5 |
| 222 | 1970-06-20 | 4 | 4 |
| 222 | 1970-06-21 | 8 | 12 |
| 222 | 1970-06-22 | 13 | 21 |
| 333 | 1970-06-20 | 7 | 7 |
| 333 | 1970-06-21 | 9 | 16 |
| 333 | 1970-06-22 | 20 | 29 |
| 444 | 1970-06-23 | 10 | 10 |
+------+-------------+-----------+---------------+
4) Compute, for each user and day, the current day's duration plus the previous day's and the next day's durations.
#SQL statement
select uid,dt,duration,sum(duration) over (partition by uid order by dt rows between 1 preceding and 1 following) from user_acc_tbl;
#Result
+------+-------------+-----------+---------------+
| uid | dt | duration | sum_window_0 |
+------+-------------+-----------+---------------+
| 111 | 1970-06-20 | 1 | 3 |
| 111 | 1970-06-21 | 2 | 6 |
| 111 | 1970-06-22 | 3 | 5 |
| 222 | 1970-06-20 | 4 | 12 |
| 222 | 1970-06-21 | 8 | 25 |
| 222 | 1970-06-22 | 13 | 21 |
| 333 | 1970-06-20 | 7 | 16 |
| 333 | 1970-06-21 | 9 | 36 |
| 333 | 1970-06-22 | 20 | 29 |
| 444 | 1970-06-23 | 10 | 10 |
+------+-------------+-----------+---------------+
5) Compute, for each user and day, the total duration over all of that user's days whose duration falls between the current day's duration minus 3 and the current day's duration plus 5 (a RANGE window over duration).
#SQL statement
select uid,dt,duration,sum(duration) over (partition by uid order by duration range between 3 preceding and 5 following) from user_acc_tbl;
#Result
+------+-------------+-----------+---------------+
| uid | dt | duration | sum_window_0 |
+------+-------------+-----------+---------------+
| 111 | 1970-06-20 | 1 | 6 |
| 111 | 1970-06-21 | 2 | 6 |
| 111 | 1970-06-22 | 3 | 6 |
| 222 | 1970-06-20 | 4 | 12 |
| 222 | 1970-06-21 | 8 | 21 |
| 222 | 1970-06-22 | 13 | 13 |
| 333 | 1970-06-20 | 7 | 16 |
| 333 | 1970-06-21 | 9 | 16 |
| 333 | 1970-06-22 | 20 | 20 |
| 444 | 1970-06-23 | 10 | 10 |
+------+-------------+-----------+---------------+
2.2 Ranking Functions
The commonly used ranking functions are ROW_NUMBER()/RANK/DENSE_RANK. They differ as follows:
- row_number() over (partition by XXX order by XXX) generates consecutive sequence numbers within each group, starting from 1; rows that sort equal still receive distinct numbers.
- rank() over (partition by XXX order by XXX) generates sequence numbers within each group starting from 1 that may not be consecutive; rows with equal values within a group receive the same number, and the numbering then skips ahead.
- dense_rank() over (partition by XXX order by XXX) generates consecutive sequence numbers within each group starting from 1; rows with equal values receive the same number, and the numbering afterwards remains consecutive.
Example: given the following student score data, create a Hive table and load the data into it.
#data.txt
1 name1 cls1 100
2 name2 cls1 100
3 name3 cls1 90
4 name4 cls1 80
5 name5 cls1 80
6 name6 cls2 90
7 name7 cls2 85
8 name8 cls2 85
9 name9 cls2 70
10 name10 cls2 60
#Create the student_scores table in Hive
CREATE TABLE student_scores(
  id string,
  name string,
  cls string,
  score int
) row format delimited fields terminated by '\t';
#Load the data into student_scores
load data inpath '/data.txt' into table student_scores;
#Query the table
select * from student_scores;
+--------------------+----------------------+---------------------+-----------------------+
| student_scores.id | student_scores.name | student_scores.cls | student_scores.score |
+--------------------+----------------------+---------------------+-----------------------+
| 1 | name1 | cls1 | 100 |
| 2 | name2 | cls1 | 100 |
| 3 | name3 | cls1 | 90 |
| 4 | name4 | cls1 | 80 |
| 5 | name5 | cls1 | 80 |
| 6 | name6 | cls2 | 90 |
| 7 | name7 | cls2 | 85 |
| 8 | name8 | cls2 | 85 |
| 9 | name9 | cls2 | 70 |
| 10 | name10 | cls2 | 60 |
+--------------------+----------------------+---------------------+-----------------------+
1) Use row_number() to rank students' scores within each class
#SQL statement
select id,name,cls,score,row_number() over(partition by cls order by score desc) as rk from student_scores;
#Result
+-----+---------+-------+--------+-----+
| id | name | cls | score | rk |
+-----+---------+-------+--------+-----+
| 2 | name2 | cls1 | 100 | 1 |
| 1 | name1 | cls1 | 100 | 2 |
| 3 | name3 | cls1 | 90 | 3 |
| 5 | name5 | cls1 | 80 | 4 |
| 4 | name4 | cls1 | 80 | 5 |
| 6 | name6 | cls2 | 90 | 1 |
| 8 | name8 | cls2 | 85 | 2 |
| 7 | name7 | cls2 | 85 | 3 |
| 9 | name9 | cls2 | 70 | 4 |
| 10 | name10 | cls2 | 60 | 5 |
+-----+---------+-------+--------+-----+
2) Use rank() to rank students' scores within each class
#SQL statement
select id,name,cls,score,rank() over(partition by cls order by score desc) as rk from student_scores;
#Result
+-----+---------+-------+--------+-----+
| id | name | cls | score | rk |
+-----+---------+-------+--------+-----+
| 2 | name2 | cls1 | 100 | 1 |
| 1 | name1 | cls1 | 100 | 1 |
| 3 | name3 | cls1 | 90 | 3 |
| 5 | name5 | cls1 | 80 | 4 |
| 4 | name4 | cls1 | 80 | 4 |
| 6 | name6 | cls2 | 90 | 1 |
| 8 | name8 | cls2 | 85 | 2 |
| 7 | name7 | cls2 | 85 | 2 |
| 9 | name9 | cls2 | 70 | 4 |
| 10 | name10 | cls2 | 60 | 5 |
+-----+---------+-------+--------+-----+
3) Use dense_rank() to rank students' scores within each class
#SQL statement
select id,name,cls,score,dense_rank() over(partition by cls order by score desc) as rk from student_scores;
#Result
+-----+---------+-------+--------+-----+
| id | name | cls | score | rk |
+-----+---------+-------+--------+-----+
| 2 | name2 | cls1 | 100 | 1 |
| 1 | name1 | cls1 | 100 | 1 |
| 3 | name3 | cls1 | 90 | 2 |
| 5 | name5 | cls1 | 80 | 3 |
| 4 | name4 | cls1 | 80 | 3 |
| 6 | name6 | cls2 | 90 | 1 |
| 8 | name8 | cls2 | 85 | 2 |
| 7 | name7 | cls2 | 85 | 2 |
| 9 | name9 | cls2 | 70 | 3 |
| 10 | name10 | cls2 | 60 | 4 |
+-----+---------+-------+--------+-----+
Note: the ranking functions above can also be combined into a single HQL query, as the sketch below shows.
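For example, a sketch combining all three ranking functions in one query against student_scores (the aliases rn, rk, and drk are illustrative):
#SQL statement
select id,name,cls,score,
       row_number() over(partition by cls order by score desc) as rn,
       rank() over(partition by cls order by score desc) as rk,
       dense_rank() over(partition by cls order by score desc) as drk
from student_scores;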
2.3 Other Functions
Other commonly used window functions are LEAD/LAG/FIRST_VALUE/LAST_VALUE, explained below:
- lag: lag(col,n,DEFAULT) returns the value from a row before the current row within the group (the offset can be specified); if no such row exists, NULL is returned. The first argument is the column name; the second is the offset n before the current row (optional, default 1); the third is the default value to use when the row n positions before does not exist (optional; NULL if not specified).
- lead: lead(col,n,DEFAULT) returns the value from a row after the current row within the group (the offset can be specified); if no such row exists, NULL is returned. The first argument is the column name; the second is the offset n after the current row (optional, default 1); the third is the default value to use when the row n positions after does not exist (optional; NULL if not specified).
- first_value: first_value(col) returns the value of the given column from the first row of the window.
- last_value: last_value(col) returns the value of the given column from the last row of the window.
Example: compute the following metrics against the user_acc_tbl table used earlier.
1) Use lag to compute, for each user, the difference between the current day's duration and the previous day's duration.
#SQL
select uid,dt,duration,lag(duration,1) over(partition by uid order by dt),(duration - lag(duration,1,0) over(partition by uid order by dt)) as diff from user_acc_tbl;
#Result
+------+-------------+-----------+---------------+-------+
| uid | dt | duration | lag_window_0 | diff |
+------+-------------+-----------+---------------+-------+
| 111 | 1970-06-20 | 1 | NULL | 1 |
| 111 | 1970-06-21 | 2 | 1 | 1 |
| 111 | 1970-06-22 | 3 | 2 | 1 |
| 222 | 1970-06-20 | 4 | NULL | 4 |
| 222 | 1970-06-21 | 8 | 4 | 4 |
| 222 | 1970-06-22 | 13 | 8 | 5 |
| 333 | 1970-06-20 | 7 | NULL | 7 |
| 333 | 1970-06-21 | 9 | 7 | 2 |
| 333 | 1970-06-22 | 20 | 9 | 11 |
| 444 | 1970-06-23 | 10 | NULL | 10 |
+------+-------------+-----------+---------------+-------+
2) Use lead to compute, for each user, the difference between the current day's duration and the next day's duration.
#SQL
select uid,dt,duration,lead(duration,1) over(partition by uid order by dt),(duration - lead(duration,1,0) over(partition by uid order by dt)) as diff from user_acc_tbl;
#Result
+------+-------------+-----------+----------------+-------+
| uid | dt | duration | lead_window_0 | diff |
+------+-------------+-----------+----------------+-------+
| 111 | 1970-06-20 | 1 | 2 | -1 |
| 111 | 1970-06-21 | 2 | 3 | -1 |
| 111 | 1970-06-22 | 3 | NULL | 3 |
| 222 | 1970-06-20 | 4 | 8 | -4 |
| 222 | 1970-06-21 | 8 | 13 | -5 |
| 222 | 1970-06-22 | 13 | NULL | 13 |
| 333 | 1970-06-20 | 7 | 9 | -2 |
| 333 | 1970-06-21 | 9 | 20 | -11 |
| 333 | 1970-06-22 | 20 | NULL | 20 |
| 444 | 1970-06-23 | 10 | NULL | 10 |
+------+-------------+-----------+----------------+-------+
3) Use first_value and last_value to fetch the first and last rows of the window.
#SQL
select uid,dt,duration,first_value(duration) over(partition by uid order by dt),last_value(duration) over(partition by uid order by dt) from user_acc_tbl;
#Result
+------+-------------+-----------+-----------------------+----------------------+
| uid | dt | duration | first_value_window_0 | last_value_window_1 |
+------+-------------+-----------+-----------------------+----------------------+
| 111 | 1970-06-20 | 1 | 1 | 1 |
| 111 | 1970-06-21 | 2 | 1 | 2 |
| 111 | 1970-06-22 | 3 | 1 | 3 |
| 222 | 1970-06-20 | 4 | 4 | 4 |
| 222 | 1970-06-21 | 8 | 4 | 8 |
| 222 | 1970-06-22 | 13 | 4 | 13 |
| 333 | 1970-06-20 | 7 | 7 | 7 |
| 333 | 1970-06-21 | 9 | 7 | 9 |
| 333 | 1970-06-22 | 20 | 7 | 20 |
| 444 | 1970-06-23 | 10 | 10 | 10 |
+------+-------------+-----------+-----------------------+----------------------+
By default, the last row that last_value sees above is the current row, because the default window ends at the current row. You can specify the frame "rows between current row and unbounded following" to extend the range through the last row and obtain each group's true last value.
#SQL
select uid,dt,duration,first_value(duration) over(partition by uid order by dt),last_value(duration) over(partition by uid order by dt rows between current row and unbounded following) from user_acc_tbl;
#Result
+------+-------------+-----------+-----------------------+----------------------+
| uid | dt | duration | first_value_window_0 | last_value_window_1 |
+------+-------------+-----------+-----------------------+----------------------+
| 111 | 1970-06-20 | 1 | 1 | 3 |
| 111 | 1970-06-21 | 2 | 1 | 3 |
| 111 | 1970-06-22 | 3 | 1 | 3 |
| 222 | 1970-06-20 | 4 | 4 | 13 |
| 222 | 1970-06-21 | 8 | 4 | 13 |
| 222 | 1970-06-22 | 13 | 4 | 13 |
| 333 | 1970-06-20 | 7 | 7 | 20 |
| 333 | 1970-06-21 | 9 | 7 | 20 |
| 333 | 1970-06-22 | 20 | 7 | 20 |
| 444 | 1970-06-23 | 10 | 10 | 10 |
+------+-------------+-----------+-----------------------+----------------------+
3. Hive WITH...AS Expressions
Hive supports the WITH...AS common table expression (CTE), which lets a query be treated as a logical table for later use. CTEs can greatly simplify SQL, making it more concise and more readable.
The WITH...AS syntax is as follows:
WITH cte_name AS (cte_query)
[,cte_name2 AS (cte_query2),...]
SELECT | INSERT | CREATE TABLE AS SELECT ... FROM cte_name;
Note: a WITH...AS clause in Hive merely treats a SQL fragment as a logical table, similar to a view; it simplifies the SQL but does not create a physical table. Since Hive 4.x, WITH...AS results can be materialized (written to disk), controlled by the "hive.optimize.cte.materialize.threshold" parameter. Its default value of "-1" disables materialization; when set to a value greater than 0, for example 2, a WITH...AS clause referenced 2 or more times will have its result materialized as a table, improving efficiency (see the sketch below).
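A minimal sketch of enabling materialization, assuming a Hive 4.x session; the threshold value and the CTE itself are illustrative:
#Materialize a CTE once it is referenced 2 or more times
set hive.optimize.cte.materialize.threshold=2;
#total_tbl is referenced twice below, so with the setting above its result
#would be computed once, written out, and then reused
with total_tbl as(
  select uid,sum(duration) as total from user_acc_tbl group by uid
)
select t1.uid,t1.total from total_tbl t1 join total_tbl t2 on t1.uid = t2.uid;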
Recall the example from the other window functions section, where lag computed for each user the difference between the current day's duration and the previous day's:
select uid,dt,duration,lag(duration,1) over(partition by uid order by dt),(duration - lag(duration,1,0) over(partition by uid order by dt)) as diff from user_acc_tbl;
Rewritten with a WITH...AS expression:
#SQL statement:
with temp as(
  select uid,dt,duration,lag(duration,1,0) over(partition by uid order by dt) as ld from user_acc_tbl
)
select uid,dt,duration,(duration-ld) as diff from temp;
#Result:
+------+-------------+-----------+-------+
| uid | dt | duration | diff |
+------+-------------+-----------+-------+
| 111 | 1970-06-20 | 1 | 1 |
| 111 | 1970-06-21 | 2 | 1 |
| 111 | 1970-06-22 | 3 | 1 |
| 222 | 1970-06-20 | 4 | 4 |
| 222 | 1970-06-21 | 8 | 4 |
| 222 | 1970-06-22 | 13 | 5 |
| 333 | 1970-06-20 | 7 | 7 |
| 333 | 1970-06-21 | 9 | 2 |
| 333 | 1970-06-22 | 20 | 11 |
| 444 | 1970-06-23 | 10 | 10 |
+------+-------------+-----------+-------+
4. User-Defined Functions
The functions Hive provides cover the vast majority of data analysis scenarios. For complex analyses that the built-in functions cannot handle, you can implement your own functions. The most commonly used kind of user-defined function is the UDF.
4.1 Implementing a UDF
Below we implement a user-defined function that concatenates two strings.
1) Create a Maven project and add the following to pom.xml
... ...
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>4.0.0</version>
</dependency>
... ...
<build>
    <plugins>
        <!-- Maven Assembly plugin: packages the UDF jar together with its dependencies -->
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <!-- Plugin version in use -->
            <version>2.6</version>
            <configuration>
                <!-- Setting this to false removes the "-jar-with-dependencies" suffix from the generated jar name -->
                <!--<appendAssemblyId>false</appendAssemblyId>-->
                <descriptorRefs>
                    <!-- The built-in descriptor that bundles all dependencies -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <!-- Archive configuration -->
                <archive>
                    <!-- MANIFEST.MF configuration -->
                    <manifest>
                        <!-- Main class (placeholder) -->
                        <mainClass>xx.xx.xx.xx</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <!-- Run during the package phase of the Maven build -->
                    <phase>package</phase>
                    <goals>
                        <!-- Execute the assembly goal -->
                        <goal>assembly</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
... ...
2) Write the Hive UDF code
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

/**
 * User-defined UDF that concatenates two strings.
 */
public class MyUDF extends GenericUDF {

    /**
     * Called when the UDF is initialized; validates the type and number of
     * the input arguments and returns the ObjectInspector of the result.
     * @param arguments : the arguments passed in the UDF call
     * @return : the return type of the UDF
     * @throws UDFArgumentException
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Throw an exception if the number of arguments is not 2
        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("user defined udf only takes 2 arguments: str1, str2");
        }
        // Walk the arguments and throw an exception for any non-string argument
        for (int i = 0; i < arguments.length; i++) {
            // Reject anything that is not a primitive string type
            if (!arguments[i].getCategory().equals(ObjectInspector.Category.PRIMITIVE) ||
                    !arguments[i].getTypeName().equalsIgnoreCase("string")) {
                throw new UDFArgumentTypeException(i, "Only string type arguments are accepted");
            }
        }
        // The UDF returns a string
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    /**
     * Called for the actual computation; receives the input arguments,
     * performs the computation, and returns the result.
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Check for NULL inputs before dereferencing them
        Object arg1 = arguments[0].get();
        Object arg2 = arguments[1].get();
        if (arg1 == null || arg2 == null) {
            return null;
        }
        String str1 = arg1.toString();
        String str2 = arg2.toString();
        // A string result must be returned as a Text object
        return new Text(str1 + str2);
    }

    /**
     * Returns the display string of the UDF, used for logging; it may
     * simply return an empty string.
     */
    @Override
    public String getDisplayString(String[] strings) {
        return "";
    }
}
To implement a Hive UDF, extend the GenericUDF class and implement its three methods:
- initialize: called when the UDF is initialized; it validates the type and number of the input arguments and returns the ObjectInspector that specifies the UDF's return type.
- evaluate: called for the actual computation; it receives the input arguments, performs the computation, and returns the result.
- getDisplayString: returns a description of the UDF used for logging; it can effectively be left unimplemented.
Once the UDF is written, it can be registered either as a temporary function or as a permanent function. A temporary function is only valid in the current session; a permanent function can be used from any session.
4.2 Temporary Functions
1) Package the UDF
After writing the code, build the project into a jar and upload it to a path on the HiveServer2 node; here it is uploaded to the /root directory.
[root@hadoop101 ~]# ls |grep MyHive-1.0-SNAPSHOT-jar-with-dependencies.jar
MyHive-1.0-SNAPSHOT-jar-with-dependencies.jar
2) Use the UDF in the Hive Beeline client
#Add the jar to the Hive classpath
add jar /root/MyHive-1.0-SNAPSHOT-jar-with-dependencies.jar;
#Create a temporary function bound to the UDF class
create temporary function myconcat as "com.myhive.MyUDF";
#Use the UDF
select myconcat("Hello","World");
+-------------+
| _c0 |
+-------------+
| HelloWorld |
+-------------+
#Passing non-string arguments throws an exception
select myconcat(1,2);
Error: Error while compiling statement: FAILED: SemanticException [Error 10016]: Line 1:16 Argument type mismatch '1': Only string type arguments are accepted (state=42000,code=10016)
3) Drop the temporary UDF
#Drop the UDF
drop temporary function myconcat;
4.3 Permanent Functions
A permanent UDF cannot be registered with ADD JAR, since ADD JAR itself only lasts for the current session. Instead, upload the jar to a path on HDFS and reference that jar when creating the function; that way, after the session ends and a new session starts, the jar backing the UDF can still be found.
1) Package the UDF
After writing the code, build the project into a jar and upload it to a path on HDFS; here it is uploaded to the /hiveudf directory.
#Create the directory on HDFS
[root@hadoop101 ~]# hdfs dfs -mkdir /hiveudf
#Upload the jar
[root@hadoop101 ~]# hdfs dfs -put ./MyHive-1.0-SNAPSHOT-jar-with-dependencies.jar /hiveudf
2) Use the UDF in the Hive Beeline client
#Create a permanent function; the UDF belongs to whichever database it is created in
create function myconcat as "com.myhive.MyUDF" using jar "hdfs://mycluster/hiveudf/MyHive-1.0-SNAPSHOT-jar-with-dependencies.jar";
#Use the UDF
select myconcat("Hello","World");
select myconcat("Hello","World");
+-------------+
| _c0 |
+-------------+
| HelloWorld |
+-------------+
#Passing non-string arguments throws an exception
select myconcat(1,2);
Error: Error while compiling statement: FAILED: SemanticException [Error 10016]: Line 1:16 Argument type mismatch '1': Only string type arguments are accepted (state=42000,code=10016)
3) Drop the permanent UDF
#Drop the UDF
drop function myconcat;