Goal: purge a table's historical data through a Hive UDF.
Inputs: table name, number of days to retain (N).
一、pom file
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>hive-udf-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>hive-udf-example</name>
    <description>Hive UDF for deleting partitions by date</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
二、Java code
package org.udf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

@Description(
        name = "del_dt",
        value = "Drops a table's partitions older than N days by deleting the HDFS files and syncing the metastore - args: table name, N (days)"
)
public class del_dt extends UDF {

    private static final Logger LOG = LoggerFactory.getLogger(del_dt.class);

    public String evaluate(String tableName, int days) {
        if (tableName == null || days < 0) {
            return "Error: table name must not be null and days must not be negative";
        }

        Configuration conf = new Configuration();
        FileSystem fs = null;
        HiveMetaStoreClient client = null;
        int deletedCount = 0;

        try {
            fs = FileSystem.get(conf);
            HiveConf hiveConf = new HiveConf(conf, this.getClass());
            client = new HiveMetaStoreClient(hiveConf);

            // Default database; a "db.table" argument overrides it.
            String dbName = "bjsythzczcpt";
            String tableOnlyName = tableName;
            if (tableName.contains(".")) {
                String[] parts = tableName.split("\\.");
                dbName = parts[0];
                tableOnlyName = parts[1];
            }

            if (!client.tableExists(dbName, tableOnlyName)) {
                return "Error: table " + tableName + " does not exist";
            }

            Table table = client.getTable(dbName, tableOnlyName);
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            if (partitionKeys == null || partitionKeys.isEmpty()) {
                return "Error: table " + tableName + " is not a partitioned table";
            }

            // The table must have a date partition column named "dt".
            boolean hasDatePartition = false;
            for (FieldSchema key : partitionKeys) {
                if (key.getName().equalsIgnoreCase("dt")) {
                    hasDatePartition = true;
                    break;
                }
            }
            if (!hasDatePartition) {
                return "Error: table " + tableName + " has no date partition column (dt)";
            }

            // Cut-off date: partitions strictly older than N days ago are dropped.
            Calendar cal = Calendar.getInstance();
            cal.add(Calendar.DAY_OF_YEAR, -days);
            Date cutoffDate = cal.getTime();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            String cutoffDateStr = sdf.format(cutoffDate);

            List<Partition> partitions = client.listPartitions(dbName, tableOnlyName, (short) -1);
            String tableLocation = table.getSd().getLocation();

            for (Partition partition : partitions) {
                Map<String, String> partitionValues = getPartitionValues(client, partition);
                String dtValue = partitionValues.get("dt");
                if (dtValue != null) {
                    try {
                        Date partitionDate = sdf.parse(dtValue);
                        if (partitionDate.before(cutoffDate)) {
                            // Delete the partition directory on HDFS, then drop the partition from the metastore.
                            String partitionPath = buildPartitionPath(tableLocation, partition.getValues(), partitionKeys);
                            Path hdfsPath = new Path(partitionPath);
                            if (fs.exists(hdfsPath)) {
                                fs.delete(hdfsPath, true);
                                LOG.info("Deleted HDFS partition path: {}", partitionPath);
                                client.dropPartition(dbName, tableOnlyName, partition.getValues(), true);
                                deletedCount++;
                                LOG.info("Dropped partition: {}", partition.getValues());
                            }
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to process partition ({}): {}", partition.getValues(), e.getMessage());
                    }
                }
            }
            return "Done: dropped " + deletedCount + " partition(s)";
        } catch (IOException | TException e) {
            LOG.error("Execution failed: {}", e.getMessage());
            return "Error: execution failed - " + e.getMessage();
        } finally {
            if (fs != null) {
                try {
                    fs.close();
                } catch (IOException e) {
                    LOG.error("Failed to close HDFS connection: {}", e.getMessage());
                }
            }
            if (client != null) {
                client.close();
            }
        }
    }

    // Maps partition key names to this partition's values, e.g. {dt=20240101}.
    private Map<String, String> getPartitionValues(HiveMetaStoreClient client, Partition partition) {
        Map<String, String> values = new HashMap<>();
        List<String> partitionVals = partition.getValues();
        try {
            Table table = client.getTable(partition.getDbName(), partition.getTableName());
            List<FieldSchema> partitionKeys = table.getPartitionKeys();
            for (int i = 0; i < Math.min(partitionKeys.size(), partitionVals.size()); i++) {
                values.put(partitionKeys.get(i).getName(), partitionVals.get(i));
            }
        } catch (TException e) {
            LOG.error("Failed to get partition keys: {}", e.getMessage());
        }
        return values;
    }

    // Builds the partition directory path, e.g. <tableLocation>/dt=20240101.
    private String buildPartitionPath(String tableLocation, List<String> partitionValues, List<FieldSchema> partitionKeys) {
        StringBuilder pathBuilder = new StringBuilder(tableLocation);
        for (int i = 0; i < partitionValues.size(); i++) {
            if (i < partitionKeys.size()) {
                pathBuilder.append("/")
                        .append(partitionKeys.get(i).getName())
                        .append("=")
                        .append(partitionValues.get(i));
            }
        }
        return pathBuilder.toString();
    }
}
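
The argument checks at the top of evaluate() run before any HDFS or metastore connection is opened, so they can be unit-tested locally with the JUnit dependency already declared in the pom. Below is a minimal test sketch covering only those guard clauses; the class name del_dtTest is an illustrative choice of mine, and anything that actually drops partitions would still need a cluster or an embedded metastore to test against.

package org.udf;

import static org.junit.Assert.assertTrue;

import org.junit.Test;

// Minimal sketch: exercises only the input validation in evaluate(),
// which returns an error string before any HDFS/metastore access happens.
public class del_dtTest {

    @Test
    public void rejectsNullTableName() {
        del_dt udf = new del_dt();
        String result = udf.evaluate(null, 7);
        assertTrue(result.startsWith("Error"));
    }

    @Test
    public void rejectsNegativeRetentionDays() {
        del_dt udf = new del_dt();
        String result = udf.evaluate("dwd_abc_df", -1);
        assertTrue(result.startsWith("Error"));
    }
}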
三、Creating and updating the function
-- Initial registration: add the UDF jar and create the function
ADD JAR hdfs:
CREATE FUNCTION del_dt AS 'org.udf.del_dt';

-- After rebuilding the jar: swap the jar and re-create the function
DELETE JAR hdfs:
ADD JAR hdfs:
DROP FUNCTION del_dt;
CREATE FUNCTION del_dt AS 'org.udf.del_dt';
四、Invocation example
hive> SELECT del_dt('dwd_abc_df', 7);
OK
Done: dropped 0 partition(s)
Time taken: 0.192 seconds