Goal: purge a table's historical data through a UDF.
Input parameters: table name, number of days to retain (N).
1. pom file
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>hive-udf-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>hive-udf-example</name>
    <description>Hive UDF for deleting partitions by date</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2. Java code
package org.udf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

@Description(
        name = "del_dt",
        value = "通过删除HDFS文件并同步元数据的方式删除表N天前的分区 - 入参:表名, N(天数)"
)
public class del_dt extends UDF {

    private static final Logger LOG = LoggerFactory.getLogger(del_dt.class);

    public String evaluate(String tableName, int days) {
        // Argument validation; failures are reported through the return string
        if (tableName == null || days < 0) {
            return "错误:表名不能为空且天数不能为负数";
        }

        Configuration conf = new Configuration();
        FileSystem fs = null;
        HiveMetaStoreClient client = null;
        int deletedCount = 0;

        try {
            fs = FileSystem.get(conf);
            HiveConf hiveConf = new HiveConf(conf, this.getClass());
            client = new HiveMetaStoreClient(hiveConf);

            // Default database; a "db.table" argument overrides it
            String dbName = "bjsythzczcpt";
            String tableOnlyName = tableName;
            if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");
                dbName = parts[0];
                tableOnlyName = parts[1];
            }

            if (!client.tableExists(dbName, tableOnlyName)) {
                return "错误:表 " + tableName + " 不存在";
            }

            Table table = client.getTable(dbName, tableOnlyName);
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            if (partitionKeys == null || partitionKeys.isEmpty()) {
                return "错误:表 " + tableName + " 不是分区表";
            }

            // The table must have a partition column named dt
            boolean hasDatePartition = false;
            for (FieldSchema key : partitionKeys) {
                if (key.getName().equalsIgnoreCase("dt")) {
                    hasDatePartition = true;
                    break;
                }
            }
            if (!hasDatePartition) {
                return "错误:表 " + tableName + " 不包含日期分区列(dt)";
            }

            // Cutoff = now minus N days; dt values are expected in yyyyMMdd format
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.DAY_OF_YEAR, -days);
            Date cutoffDate = cal.getTime();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            String cutoffDateStr = sdf.format(cutoffDate);

            List<Partition> partitions = client.listPartitions(dbName, tableOnlyName, (short) -1);
            String tableLocation = table.getSd().getLocation();

            for (Partition partition : partitions) {
                Map<String, String> partitionValues = getPartitionValues(client, partition);
                String dtValue = partitionValues.get("dt");
                if (dtValue != null) {
                    try {
                        Date partitionDate = sdf.parse(dtValue);
                        if (partitionDate.before(cutoffDate)) {
                            // Delete the HDFS directory first, then drop the partition metadata
                            String partitionPath = buildPartitionPath(tableLocation, partition.getValues(), partitionKeys);
                            Path hdfsPath = new Path(partitionPath);
                            if (fs.exists(hdfsPath)) {
                                fs.delete(hdfsPath, true);
                                LOG.info("成功删除HDFS分区路径: {}", partitionPath);
                                client.dropPartition(dbName, tableOnlyName, partition.getValues(), true);
                                deletedCount++;
                                LOG.info("成功删除分区: {}", partition.getValues());
                            }
                        }
                    } catch (Exception e) {
                        LOG.error("处理分区失败 ({}): {}", partition.getValues(), e.getMessage());
                    }
                }
            }
            return "操作完成:成功删除 " + deletedCount + " 个分区";
        } catch (IOException | TException e) {
            LOG.error("执行失败: {}", e.getMessage());
            return "错误:执行失败 - " + e.getMessage();
        } finally {
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    LOG.error("关闭HDFS连接失败: {}", e.getMessage());
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    // Maps the partition's value list to {partition column name -> value}
    private Map<String, String> getPartitionValues(HiveMetaStoreClient client, Partition partition) {
        Map<String, String> values = new HashMap<>();
        List<String> partitionVals = partition.getValues();
        try {
            Table table = client.getTable(partition.getDbName(), partition.getTableName());
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            for (int i = 0; i < Math.min(partitionKeys.size(), partitionVals.size()); i++) {
                values.put(partitionKeys.get(i).getName(), partitionVals.get(i));
            }
        } catch (TException e) {
            LOG.error("获取分区键失败: {}", e.getMessage());
        }
        return values;
    }

    // Builds the partition's HDFS path as tableLocation/key1=val1/key2=val2/...
    private String buildPartitionPath(String tableLocation, List<String> partitionValues, List<FieldSchema> partitionKeys) {
        StringBuilder pathBuilder = new StringBuilder(tableLocation);
        for (int i = 0; i < partitionValues.size(); i++) {
            if (i < partitionKeys.size()) {
                pathBuilder.append("/").append(partitionKeys.get(i).getName())
                        .append("=").append(partitionValues.get(i));
            }
        }
        return pathBuilder.toString();
    }
}
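Because JUnit 4 is already declared in the pom (test scope), the argument-validation branch of evaluate() can be exercised without a cluster: a null table name or a negative day count returns an error string before any HDFS or metastore connection is opened. A minimal test sketch under that assumption (the class name UdfArgumentTest is mine, not part of the original project):

package org.udf;

import org.junit.Test;
import static org.junit.Assert.assertTrue;

// Hypothetical test class; only covers the early-return validation path of del_dt.evaluate()
public class UdfArgumentTest {

    @Test
    public void rejectsNullTableName() {
        // The UDF reports errors via its return string rather than throwing
        String result = new del_dt().evaluate(null, 7);
        assertTrue(result.startsWith("错误"));
    }

    @Test
    public void rejectsNegativeDays() {
        String result = new del_dt().evaluate("dwd_abc_df", -1);
        assertTrue(result.startsWith("错误"));
    }
}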
3. Creating and modifying the function
-- Initial creation: register the jar with the session, then create the function
ADD JAR hdfs:
CREATE FUNCTION del_dt AS 'org.udf.del_dt';

-- Modification: remove the old jar, add the rebuilt jar, then drop and recreate the function
DELETE JAR hdfs:
ADD JAR hdfs:
DROP FUNCTION del_dt;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';
4. Invocation example
hive> SELECT del_dt('dwd_abc_df', 7);
OK
操作完成:成功删除 0 个分区
Time taken: 0.192 seconds
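For reference on what N means in the call above: with N = 7, a partition is a deletion candidate only when its dt value, parsed as yyyyMMdd, falls strictly before the current time minus 7 days, so a result of 0 deleted partitions means no dt partition was older than that cutoff (or its HDFS directory no longer existed). A standalone sketch of the comparison the UDF performs (the class name CutoffCheck and the dt value 20200101 are illustrative only):

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class CutoffCheck {
    public static void main(String[] args) throws Exception {
        int days = 7;                               // retention window N
        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.DAY_OF_YEAR, -days);       // same computation as the UDF
        Date cutoff = cal.getTime();

        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
        Date partitionDate = sdf.parse("20200101"); // hypothetical dt partition value

        // Mirrors partitionDate.before(cutoffDate) in evaluate(): true means the
        // partition would be deleted. Note the cutoff carries the current clock time
        // while dt parses to midnight, so dt = today - N normally also falls before it.
        System.out.println(partitionDate.before(cutoff));
    }
}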