1. UDF (User Defined Function)
Hive ships with a large set of built-in UDFs for working with dates, strings, and other data, and it also lets users write their own UDFs for operations the built-ins cannot cover. See the official API reference.
1.1 Extending the UDF class
Extending the `UDF` class is the simple route: you only override an `evaluate` method that reads and returns basic types (the version here returns `String`). As of Hive 3.0, however, this class is deprecated and `GenericUDF` is the recommended replacement. A minimal sketch of the contract follows.
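Before the full example, here is a minimal sketch of the legacy contract (`ToUpperUDF` is a hypothetical name for illustration, not part of this post):

```java
package com.renxiaozhao.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

// Minimal sketch of the legacy UDF contract (hypothetical example).
@SuppressWarnings("deprecation")
public class ToUpperUDF extends UDF {
    // Hive resolves this method by reflection; it must be named "evaluate".
    public String evaluate(String input) {
        return input == null ? null : input.toUpperCase();
    }
}
```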
1.1.1 Add the hive-exec dependency to pom.xml
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.0</version>
</dependency>
```
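Note: since the cluster already provides hive-exec at runtime, it is common to declare this dependency with `<scope>provided</scope>` so it is not bundled into the UDF jar; the post keeps the default scope.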
1.1.2 MaskUDF: a masking function
The base class has been deprecated since Hive 3.0, which is why the code below carries `@SuppressWarnings("deprecation")`. Source:
```java
package com.renxiaozhao.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.metadata.HiveException;

/**
 * Masking function.
 * @author rxz 20220808
 */
@SuppressWarnings("deprecation")
public class MaskUDF extends UDF {
    private static final int MASK_LEFT = 0;   // mask from the left
    private static final int MASK_RIGHT = -1; // mask from the right

    /**
     * Apply the mask.
     * @param input      the string to mask
     * @param startIndex 0 = mask from the left; -1 = mask from the right;
     *                   any other value is a start offset, e.g. 2 = start at the third character
     * @param subIndex   number of characters to mask (in the positional case, the end offset of the masked range)
     * @return the masked string
     * @throws HiveException
     */
    public String evaluate(String input, Integer startIndex, Integer subIndex) throws HiveException {
        // build the run of '*' characters
        String append = "";
        if (startIndex == MASK_LEFT || startIndex == MASK_RIGHT) {
            for (int i = 0; i < subIndex; i++) {
                append += "*";
            }
        } else {
            for (int i = 0; i < (subIndex - startIndex); i++) {
                append += "*";
            }
        }
        if (startIndex == MASK_LEFT) {
            return append + input.substring(subIndex);
        } else if (startIndex == MASK_RIGHT) {
            return input.substring(0, input.length() - subIndex) + append;
        } else {
            return input.substring(0, startIndex) + append + input.substring(subIndex);
        }
    }

    // quick local test
    public static void main(String[] args) {
        String input = "18866866888";
        String append = "";
        for (int i = 0; i < (6 - 4); i++) {
            append += "*";
        }
        System.out.println(input.substring(0, 4) + append + input.substring(6));
    }
}
```
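As a quick local sanity check of the three modes (a hypothetical test class, not in the original post; the expected values follow directly from the code above):

```java
package com.renxiaozhao.udf;

// Local sanity check for MaskUDF (hypothetical test class).
public class MaskUDFTest {
    public static void main(String[] args) throws Exception {
        MaskUDF udf = new MaskUDF();
        System.out.println(udf.evaluate("18866866888", 0, 4));  // ****6866888 (first 4 masked)
        System.out.println(udf.evaluate("18866866888", -1, 4)); // 1886686**** (last 4 masked)
        System.out.println(udf.evaluate("18866866888", 2, 5));  // 18***866888 (characters 3-5 masked)
    }
}
```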
1.1.3 Package the jar and deploy it to the Hive environment
Configure the jar path in hive-env.sh: drop the jar into the directory pointed to by `HIVE_AUX_JARS_PATH` (no separate `add jar …` is needed), then restart Hive.
```sh
export HIVE_AUX_JARS_PATH=/root/collect
```
1.1.4 Create a temporary function for testing
```sql
create temporary function mask_udf as "com.renxiaozhao.udf.MaskUDF";
```
1.1.5 Verify
```sql
select mask_udf('18792010988',3,6);
```
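Given the implementation above, this call takes the positional branch: characters 4 through 6 are replaced, so the query should return `187***10988`.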
1.2 Extending GenericUDF
GenericUDF is suited to complex data (lists, maps, and so on) and is more flexible, but it is harder to implement; type-conversion errors between classes are the most common failure. Three methods must be overridden:

- `initialize`: the initialization hook, where you can validate the argument count, set up conversions, and initialize any state such as a predefined List
- `evaluate`: the concrete row-level logic, as in UDF
- `getDisplayString`: returns a fixed descriptive string, typically showing a sample call and its result
1.2.1 CutOffUDF: a cut-off function (the remaining steps are the same as for UDF above)
```java
package com.renxiaozhao.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Cut-off function.
 * @author rxz 20220810
 */
public class CutOffUDF extends GenericUDF {
    private static final int CUT_LEFT = 0;   // cut from the left
    private static final int CUT_RIGHT = -1; // cut from the right

    ObjectInspectorConverters.Converter[] converters = new ObjectInspectorConverters.Converter[3];

    /**
     * Runs once when the function is initialized, never again afterwards:
     * validate the argument count and types and set up the converters.
     */
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 3) {
            throw new UDFArgumentException("Wrong number of arguments: exactly three are required");
        }
        // validate argument types
        if (((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory()
                != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("The first argument must be a string");
        }
        if (((PrimitiveObjectInspector) arguments[1]).getPrimitiveCategory()
                != PrimitiveObjectInspector.PrimitiveCategory.INT) {
            throw new UDFArgumentException("The second argument must be an int");
        }
        if (((PrimitiveObjectInspector) arguments[2]).getPrimitiveCategory()
                != PrimitiveObjectInspector.PrimitiveCategory.INT) {
            throw new UDFArgumentException("The third argument must be an int");
        }
        // one converter per argument
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0],
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        converters[1] = ObjectInspectorConverters.getConverter(arguments[1],
                PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        converters[2] = ObjectInspectorConverters.getConverter(arguments[2],
                PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    /**
     * The actual row-level logic.
     */
    public String evaluate(DeferredObject[] arguments) throws HiveException {
        // input string
        String input = (String) converters[0].convert(arguments[0].get());
        // 0 = cut from the left; -1 = cut from the right; otherwise positional
        int startIndex = (Integer) converters[1].convert(arguments[1].get());
        // number of characters to cut (end offset in the positional case)
        int subIndex = (Integer) converters[2].convert(arguments[2].get());
        if (startIndex == CUT_LEFT) {
            // drop the first subIndex characters
            return input.substring(subIndex);
        } else if (startIndex == CUT_RIGHT) {
            // drop the last subIndex characters
            return input.substring(0, input.length() - subIndex);
        } else {
            // drop characters in the range [startIndex, subIndex)
            return input.substring(0, startIndex) + input.substring(subIndex);
        }
    }

    /**
     * Shown in EXPLAIN output and error messages.
     */
    public String getDisplayString(String[] children) {
        return "cutoffudf('18866866888',0,3) = 66866888";
    }

    // quick local test of a positional cut: drop characters [4,6)
    public static void main(String[] args) {
        String input = "18866866888";
        System.out.println(input.substring(0, 4) + input.substring(6));
    }
}
```
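The lifecycle of the three methods can be exercised locally without a Hive session. The following driver is a sketch (the class name `CutOffUDFDriver` is made up here): it wires up the ObjectInspectors the way Hive would and feeds a "row" through Hive's built-in `GenericUDF.DeferredJavaObject`:

```java
package com.renxiaozhao.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Local driver for CutOffUDF: initialize once, then evaluate per "row".
public class CutOffUDFDriver {
    public static void main(String[] args) throws Exception {
        CutOffUDF udf = new CutOffUDF();
        ObjectInspector[] argOIs = {
            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
            PrimitiveObjectInspectorFactory.javaIntObjectInspector,
            PrimitiveObjectInspectorFactory.javaIntObjectInspector
        };
        udf.initialize(argOIs); // validates types and builds the converters
        DeferredObject[] row = {
            new DeferredJavaObject("18866866888"),
            new DeferredJavaObject(0),
            new DeferredJavaObject(3)
        };
        System.out.println(udf.evaluate(row)); // 66866888 (first 3 characters cut)
    }
}
```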
1.2.2 Test
```sql
create temporary function cutoff_udf as "com.renxiaozhao.udf.CutOffUDF";
select cutoff_udf('18792010988',3,6);
```
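This call takes the positional branch and removes characters 4 through 6, so it should return `187` + `10988` = `18710988`.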
2. JDBC Connection Test
2.1 Create a table and data
SQL:
```sql
CREATE TABLE `rxz_udf_test`(`phone` varchar(100));
insert into rxz_udf_test(phone) values ('18792010980');
select * from rxz_udf_test;
```
The insert is painfully slow…
2.2 Create permanent functions
A temporary function is tied to the current session and valid only within it, so create permanent ones:
```sql
create function rxz_mask as "com.renxiaozhao.udf.MaskUDF";
create function rxz_cutoff as "com.renxiaozhao.udf.CutOffUDF";
```
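Side note (not used in this post): since Hive 0.13, `CREATE FUNCTION` also accepts a `USING JAR 'hdfs://…'` clause, which registers the jar together with the function and removes the dependency on the `HIVE_AUX_JARS_PATH` step.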
2.3 JDBC utility class and driver dependency
2.3.1 HiveJDBCUtil
```java
package com.renxiaozhao.udf.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JDBC connection utility.
 * @author https://renxiaozhao.blog.csdn.net/
 */
public class HiveJDBCUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(HiveJDBCUtil.class);

    public static Connection getConn() throws ClassNotFoundException {
        Connection conn = null;
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            // connect to HiveServer2
            conn = DriverManager.getConnection("jdbc:hive2://192.168.38.10:10000/default", "root", "");
        } catch (SQLException e) {
            LOGGER.error("HiveJDBCUtil.getConn() failed ---->", e);
        }
        return conn;
    }

    public static void closeConn(Connection conn) {
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            LOGGER.error("HiveJDBCUtil.closeConn() failed ---->", e);
        }
    }

    public static PreparedStatement getPStmt(Connection conn, String sql) {
        PreparedStatement pstmt = null;
        try {
            pstmt = conn.prepareStatement(sql);
        } catch (SQLException e) {
            LOGGER.error("HiveJDBCUtil.getPStmt() failed ---->", e);
        }
        return pstmt;
    }

    public static void closePStmt(PreparedStatement stmt) {
        try {
            if (stmt != null) {
                stmt.close();
            }
        } catch (SQLException e) {
            LOGGER.error("HiveJDBCUtil.closePStmt() failed ---->", e);
        }
    }

    public static void closeRs(ResultSet rs) {
        try {
            if (rs != null) {
                rs.close();
            }
        } catch (SQLException e) {
            LOGGER.error("HiveJDBCUtil.closeRs() failed ---->", e);
        }
    }

    public static ResultSet executeQuery(Connection conn, String sql) {
        ResultSet rs = null;
        try {
            rs = conn.createStatement().executeQuery(sql);
        } catch (SQLException e) {
            LOGGER.error("HiveJDBCUtil.executeQuery() failed ---->", e);
        }
        return rs;
    }
}
```
2.3.2 Add the hive-jdbc driver to pom.xml
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.0</version>
</dependency>
```
2.4 UDF test class
2.4.1 UdfUseDemo
```java
package com.renxiaozhao.udf;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.renxiaozhao.udf.util.HiveJDBCUtil;

/**
 * UdfUseDemo.
 * @author https://renxiaozhao.blog.csdn.net/
 */
public class UdfUseDemo {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Connection conn = HiveJDBCUtil.getConn();
        // masking function rxz_mask
        String maskSql1 = "select rxz_mask(phone,0,4) as maskPhone from rxz_udf_test";
        String maskSql2 = "select rxz_mask(phone,-1,4) as maskPhone from rxz_udf_test";
        String maskSql3 = "select rxz_mask(phone,3,4) as maskPhone from rxz_udf_test";
        PreparedStatement maskPstmt1 = HiveJDBCUtil.getPStmt(conn, maskSql1);
        PreparedStatement maskPstmt2 = HiveJDBCUtil.getPStmt(conn, maskSql2);
        PreparedStatement maskPstmt3 = HiveJDBCUtil.getPStmt(conn, maskSql3);
        ResultSet maskRes1 = maskPstmt1.executeQuery();
        ResultSet maskRes2 = maskPstmt2.executeQuery();
        ResultSet maskRes3 = maskPstmt3.executeQuery();
        while (maskRes1.next()) {
            System.out.println("mask rxz_mask(phone,0,4):" + maskRes1.getString("maskPhone"));
        }
        while (maskRes2.next()) {
            System.out.println("mask rxz_mask(phone,-1,4):" + maskRes2.getString("maskPhone"));
        }
        while (maskRes3.next()) {
            System.out.println("mask rxz_mask(phone,3,4):" + maskRes3.getString("maskPhone"));
        }
        HiveJDBCUtil.closeRs(maskRes1);
        HiveJDBCUtil.closeRs(maskRes2);
        HiveJDBCUtil.closeRs(maskRes3);
        HiveJDBCUtil.closePStmt(maskPstmt1);
        HiveJDBCUtil.closePStmt(maskPstmt2);
        HiveJDBCUtil.closePStmt(maskPstmt3);

        // cut-off function rxz_cutoff (reuse the same connection rather than leaking a second one)
        String cutoffSql1 = "select rxz_cutoff(phone,0,4) as cutoffPhone from rxz_udf_test";
        String cutoffSql2 = "select rxz_cutoff(phone,-1,4) as cutoffPhone from rxz_udf_test";
        String cutoffSql3 = "select rxz_cutoff(phone,3,4) as cutoffPhone from rxz_udf_test";
        PreparedStatement cutoffPstmt1 = HiveJDBCUtil.getPStmt(conn, cutoffSql1);
        PreparedStatement cutoffPstmt2 = HiveJDBCUtil.getPStmt(conn, cutoffSql2);
        PreparedStatement cutoffPstmt3 = HiveJDBCUtil.getPStmt(conn, cutoffSql3);
        ResultSet cutoffRes1 = cutoffPstmt1.executeQuery();
        ResultSet cutoffRes2 = cutoffPstmt2.executeQuery();
        ResultSet cutoffRes3 = cutoffPstmt3.executeQuery();
        while (cutoffRes1.next()) {
            System.out.println("cutoff rxz_cutoff(phone,0,4):" + cutoffRes1.getString("cutoffPhone"));
        }
        while (cutoffRes2.next()) {
            System.out.println("cutoff rxz_cutoff(phone,-1,4):" + cutoffRes2.getString("cutoffPhone"));
        }
        while (cutoffRes3.next()) {
            System.out.println("cutoff rxz_cutoff(phone,3,4):" + cutoffRes3.getString("cutoffPhone"));
        }
        HiveJDBCUtil.closeRs(cutoffRes1);
        HiveJDBCUtil.closeRs(cutoffRes2);
        HiveJDBCUtil.closeRs(cutoffRes3);
        HiveJDBCUtil.closePStmt(cutoffPstmt1);
        HiveJDBCUtil.closePStmt(cutoffPstmt2);
        HiveJDBCUtil.closePStmt(cutoffPstmt3);
        HiveJDBCUtil.closeConn(conn);
    }
}
```
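For reference, with the single row inserted earlier ('18792010980') and both functions resolving correctly (see 2.4.2 below), the demo should print:

- mask rxz_mask(phone,0,4): ****2010980
- mask rxz_mask(phone,-1,4): 1879201****
- mask rxz_mask(phone,3,4): 187*2010980
- cutoff rxz_cutoff(phone,0,4): 2010980
- cutoff rxz_cutoff(phone,-1,4): 1879201
- cutoff rxz_cutoff(phone,3,4): 1872010980

The mask values follow directly from MaskUDF; the cutoff values assume the branch semantics documented in CutOffUDF above.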
2.4.2 Test results
2.4.2.1 The cut-off function throws an error
2.4.2.2 Cause: type mismatch
The table column is declared as `varchar`, while the code's `initialize` check only accepts `string`. Rather than repackaging the jar, just change the column type and verify again:
2.4.2.2.1 Change the column type
```sql
alter table rxz_udf_test change column phone phone string;
```
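Altering the table works, but a more robust fix (a sketch under my own naming, not what this post shipped) is to accept the whole string family, string/varchar/char, in `initialize`:

```java
package com.renxiaozhao.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;

// Hypothetical helper: validates that an argument belongs to the string family
// (string, varchar, char), so a varchar column no longer trips the check.
public final class StringGroupCheck {
    private StringGroupCheck() {
    }

    public static void requireStringFamily(ObjectInspector arg, int pos) throws UDFArgumentException {
        PrimitiveObjectInspector poi = (PrimitiveObjectInspector) arg;
        if (PrimitiveObjectInspectorUtils.getPrimitiveGrouping(poi.getPrimitiveCategory())
                != PrimitiveGrouping.STRING_GROUP) {
            throw new UDFArgumentException("Argument " + pos + " must be string/varchar/char");
        }
    }
}
```

Because the existing converter already normalizes argument 0 to `javaStringObjectInspector`, no change to `evaluate` would be needed.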
2.4.2.2.2 Verify again
JDBC verification:
3. Miscellaneous
- If a test reports `Invalid function xxx`, sync the UDF by logging into Hive and running `reload function;`.
- For `Concurrency mode is disabled, not creating a lock manager`: uncomment `export HADOOP_HEAPSIZE=1024` in hive-env.sh (the default is 256). The message still appears after a restart, but so far it has had no observable impact.
Source code download: it also includes the earlier Hive metadata collection code; take it if you need it.