Purging historical partition data with a UDF on an older Hive version (1.2.1)

Goal: purge a table's historical partition data via a UDF.

Inputs: table name, number of days to retain (N).
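
The core of the job is a plain date cutoff: compute the date N days ago and treat any partition whose dt value parses to an earlier date as deletable. A minimal standalone sketch of that comparison (class name and sample dt value are illustrative; the yyyyMMdd format matches the UDF below):

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class CutoffSketch {
    public static void main(String[] args) throws Exception {
        int days = 7;                                            // N: days of partitions to retain
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); // dt partition value format

        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.DAY_OF_YEAR, -days);                    // date N days ago
        Date cutoff = cal.getTime();

        // Any dt value strictly before the cutoff would be selected for deletion.
        String dt = "20240101";                                  // example partition value
        boolean deletable = sdf.parse(dt).before(cutoff);
        System.out.println(dt + " deletable: " + deletable);
    }
}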

1. The pom file

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>hive-udf-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>hive-udf-example</name>
    <description>Hive UDF for deleting partitions by date</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Hive Exec (Hive 1.2.1) -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- Hive Metastore (Hive 1.2.1) -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- Hadoop Client (Hive 1.2.1 builds against Hadoop 2.7.3 by default) -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <!-- SLF4J logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- Testing (JUnit) -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. Java code

package org.udf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

@Description(
        name = "del_dt",
        value = "Drops a table's partitions older than N days by deleting the HDFS data and syncing the metastore - args: table name, N (days)"
)
public class del_dt extends UDF {
    /* To reuse this UDF elsewhere, adjust the dbName (warehouse schema) and sdf (partition date format) settings below. */
    private static final Logger LOG = LoggerFactory.getLogger(del_dt.class);

    public String evaluate(String tableName, int days) {
        if (tableName == null || days < 0) {
            return "Error: table name must not be null and days must not be negative";
        }
        Configuration conf = new Configuration();
        FileSystem fs = null;
        HiveMetaStoreClient client = null;
        int deletedCount = 0;
        try {
            // HDFS file system
            fs = FileSystem.get(conf);
            // Hive metastore client
            HiveConf hiveConf = new HiveConf(conf, this.getClass());
            client = new HiveMetaStoreClient(hiveConf);

            // Parse the table name (handles db.table format); the default dbName must be adjusted per deployment
            String dbName = "bjsythzczcpt";
            String tableOnlyName = tableName;
            if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");
                dbName = parts[0];
                tableOnlyName = parts[1];
            }

            // Check that the table exists
            if (!client.tableExists(dbName, tableOnlyName)) {
                return "Error: table " + tableName + " does not exist";
            }

            // Fetch table metadata
            Table table = client.getTable(dbName, tableOnlyName);

            // Check that the table is partitioned
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            if (partitionKeys == null || partitionKeys.isEmpty()) {
                return "Error: table " + tableName + " is not a partitioned table";
            }

            // Check for a date partition column (assumed to be named dt)
            boolean hasDatePartition = false;
            for (FieldSchema key : partitionKeys) {
                if (key.getName().equalsIgnoreCase("dt")) {
                    hasDatePartition = true;
                    break;
                }
            }
            if (!hasDatePartition) {
                return "Error: table " + tableName + " has no date partition column (dt)";
            }

            // Compute the date N days ago
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.DAY_OF_YEAR, -days);
            Date cutoffDate = cal.getTime();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");   // partition date format
            String cutoffDateStr = sdf.format(cutoffDate);

            // List all partitions of the table
            List<Partition> partitions = client.listPartitions(dbName, tableOnlyName, (short) -1);

            // Table location (used to build partition paths)
            String tableLocation = table.getSd().getLocation();

            // Walk the partitions and delete those older than N days
            for (Partition partition : partitions) {
                Map<String, String> partitionValues = getPartitionValues(client, partition);
                String dtValue = partitionValues.get("dt");
                if (dtValue != null) {
                    try {
                        Date partitionDate = sdf.parse(dtValue);
                        if (partitionDate.before(cutoffDate)) {
                            // Build the partition path
                            String partitionPath = buildPartitionPath(tableLocation, partition.getValues(), partitionKeys);
                            Path hdfsPath = new Path(partitionPath);
                            // Delete the partition data on HDFS
                            if (fs.exists(hdfsPath)) {
                                fs.delete(hdfsPath, true);
                                LOG.info("Deleted HDFS partition path: {}", partitionPath);
                                // Drop the partition from the metastore
                                client.dropPartition(dbName, tableOnlyName, partition.getValues(), true);
                                deletedCount++;
                                LOG.info("Dropped partition: {}", partition.getValues());
                            }
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to process partition ({}): {}", partition.getValues(), e.getMessage());
                    }
                }
            }
            return "Operation complete: " + deletedCount + " partition(s) deleted";
        } catch (IOException | TException e) {
            LOG.error("Execution failed: {}", e.getMessage());
            return "Error: execution failed - " + e.getMessage();
        } finally {
            // Release resources
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    LOG.error("Failed to close HDFS connection: {}", e.getMessage());
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    // Map partition values to their partition key names (uses the shared client rather than a static lookup)
    private Map<String, String> getPartitionValues(HiveMetaStoreClient client, Partition partition) {
        Map<String, String> values = new HashMap<>();
        List<String> partitionVals = partition.getValues();
        try {
            // Reuse the already-created client to fetch the table's partition keys
            Table table = client.getTable(partition.getDbName(), partition.getTableName());
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            for (int i = 0; i < Math.min(partitionKeys.size(), partitionVals.size()); i++) {
                values.put(partitionKeys.get(i).getName(), partitionVals.get(i));
            }
        } catch (TException e) {
            LOG.error("Failed to fetch partition keys: {}", e.getMessage());
        }
        return values;
    }

    // Build the HDFS path of a partition from the table location and the partition key/value pairs
    private String buildPartitionPath(String tableLocation, List<String> partitionValues, List<FieldSchema> partitionKeys) {
        StringBuilder pathBuilder = new StringBuilder(tableLocation);
        for (int i = 0; i < partitionValues.size(); i++) {
            if (i < partitionKeys.size()) {
                pathBuilder.append("/").append(partitionKeys.get(i).getName())
                        .append("=").append(partitionValues.get(i));
            }
        }
        return pathBuilder.toString();
    }
}
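
Because evaluate() talks to the metastore and HDFS directly, it can also be exercised from a plain main() on a gateway node whose classpath contains the Hive/Hadoop client configs (hive-site.xml, core-site.xml), which is handy for debugging before registering the function. A minimal sketch under those assumptions (the driver class and table name are illustrative, not part of the original project):

package org.udf;

// Local debugging driver; not used by Hive itself.
// Assumes hive-site.xml and core-site.xml are on the classpath so that
// HiveMetaStoreClient and FileSystem.get() can reach the cluster.
public class DelDtDriver {
    public static void main(String[] args) {
        del_dt udf = new del_dt();
        // Table name and retention window are illustrative; try a throwaway table first.
        String result = udf.evaluate("bjsythzczcpt.dwd_abc_df", 7);
        System.out.println(result);
    }
}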

3. Creating and updating the function

-- Create the function
ADD JAR hdfs:///<hdfs-path>/<jar-name>.jar;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';

-- Update the function (after replacing the jar)
DELETE JAR hdfs:///<hdfs-path>/<jar-name>.jar;
ADD JAR hdfs:///<hdfs-path>/<jar-name>.jar;
DROP FUNCTION del_dt;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';

4. Invocation example

-- Delete historical partition data from the dwd_abc_df table, keeping the most recent 7 days of partitions
hive> SELECT del_dt('dwd_abc_df', 7);
-- Output
OK
Operation complete: 0 partition(s) deleted
Time taken: 0.192 seconds