sparkSQL的UDF,最常用的regeister方式自定义函数和udf注册方式定义UDF函数 (详细讲解)

发布于:2024-11-11 ⋅ 阅读:(119) ⋅ 点赞:(0)

- UDF:一对一的函数【User Defined Functions】
  - substr、split、concat、instr、length、from_unixtime
- UDAF:多对一的函数【User Defined Aggregation Functions】 聚合函数
  - count、sum、max、min、avg、collect_set/list
- UDTF:一对多的函数【User Defined Tabular Functions】
  - explode、json_tuple【解析JSON格式】、parse_url_tuple【解析URL函数】

Spark中支持UDF和UDAF两种,支持直接使用Hive中的UDF、UDAF、UDTF.

pyspark中自定义函数的三种写法:

使用最常用的regeister方式自定义函数 

 最常用的方式,这种方式编写的函数,既能用于SQL中,也能用于DSL中

语法:

UDF变量名 = spark.udf.register(UDF函数名, 函数的处理逻辑)

定义:spark.udf.register()
UDF变量名:DSL中调用UDF使用的
UDF函数名:SQL中调用UDF使用

案例:

查看以下数据

id    name    msg
01    周杰伦    150/175
02    周杰    130/185
03    周华健    148/178
04    周星驰    130/175
05    闫妮    110/180

将以上数据,通过自定义函数,变为如下数据:
01    周杰伦     150斤/175cm
02    周杰      130斤/185cm
03    周华健  148斤/178cm

 

第一步 :自定义函数 

# 编写一个普通的函数,用于写逻辑
def get_data(str1):
    list1 = str1.split("/")
    return list1[0] + "斤/" + list1[1] + "cm" 

 第二步:注册函数

# 定义一个UDF:变量名-dsl = spark.udf.register(函数名-sql, 处理逻辑, 返回值)
    # get_new_info 用于 sql 中
    # get_info 用于DSL
    get_info = spark.udf.register(name="get_new_info", f=lambda oldinfo: get_data(oldinfo))

第三步:使用函数

#使用sql的方式调用
    spark.sql("select id,name,get_new_info(msg) from star").show()

    # 使用dsl的方式调用
    # DSL:用变量名
    import pyspark.sql.functions as F

    new_df.select(F.col("id"), F.col("name"), F.col("msg"), get_info(F.col("msg")).alias("newinfo")).show()

代码演示 以及解释

import os

from pyspark.sql import SparkSession

import pyspark.sql.functions as F
if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'D:/java/jdk'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()
    # 第一种方案
    # format("csv") 是读取文件格式,后面是文件路径  toDF("id","name","msg")是把文件里面的数据源变成指定的字段
    df = spark.read.format("csv").option("sep", "\t").load("../../datas/function/udf.txt").toDF("id", "name","msg") 
    df.createOrReplaceTempView("t") #给数据源起个名字
    # 编写sql
    spark.sql("""
        select id,name,concat(split(msg,"/")[0],'斤/',split(msg,"/")[1],'cm')msg from t   
    """).show()
    # 第二种方案 自定义函数
    #第一步 定义函数
    def my_function(msg):
        return msg.split("/")[0] + "斤/" + msg.split("/")[1] + "cm"
    # 第二步注册函数
    my_function2 = spark.udf.register("my_function",my_function)
    # 第三步调用函数
    spark.sql("""
        select id,name,my_function(msg) msg from t
    """).show()
    # 自定义函数DSL使用 register
    df.select(F.col("id"),F.col("name"),my_function2(F.col("msg"))).show()
    spark.stop()

 

 


网站公告

今日签到

点亮在社区的每一天
去签到