SparkSQL函数

发布于:2025-02-10 ⋅ 阅读:(34) ⋅ 点赞:(0)

1. SparkSQL函数概述

  • Spark SQL 提供了丰富的内置函数,用于数据处理和分析。这些函数包括字符串操作、日期时间处理、数学计算、聚合函数以及窗口函数等。通过使用这些函数,用户可以高效地进行数据转换、过滤和聚合操作。Spark SQL 还支持用户自定义函数(UDF),以满足特定需求。这些函数使得在分布式环境中处理大规模数据变得更加便捷和灵活。

2. SparkSQL内置函数

2.1 常用内置函数分类

在这里插入图片描述

2.2 常用数组函数

2.2.1 array()函数

1. 定义
  • 在 Spark SQL 中,array()函数用于创建一个包含指定元素的数组。该函数接受任意数量的参数,并将它们组合成一个数组。数组中的元素可以是任意类型,包括基本类型和复杂类型。
2. 语法
  • 语法:array(expr1, expr2, ..., exprN)
  • 参数:expr1, expr2, …, exprN:要包含在数组中的元素。这些表达式可以是列名、常量或其他表达式。
  • 返回值:返回一个包含指定元素的数组。
3. 示例
  • 创建包含常量的数组,执行命令:spark.sql("SELECT array(1, 5, 2, 6, 9) AS arr").show()
    在这里插入图片描述
  • 使用列创建数组,执行命令:spark.sql("SELECT array(name, age) AS name_age_arr FROM student").show()
    在这里插入图片描述
  • 创建嵌套数组,执行命令:spark.sql("SELECT array(array(1, 2), array(3, 4)) AS nested_array").show()
    在这里插入图片描述

2.3 常用日期与时间戳函数

2.4 常见聚合函数

2.5 常见窗口函数

3. SparkSQL自定义函数

3.1 自定义函数分类

UDF 函数 UDAF 函数 UDTF 函数
一进一出(使用广泛) 多进一出(使用一般) 一进多出(很少使用)

在这里插入图片描述

3.2 自定义函数案例演示

  • 在 Spark SQL 中,你可以使用 UDF(用户定义函数)来扩展 SQL 查询的功能。以下是一个简单的示例,展示如何使用 SparkSQL 自定义函数(UDF)。

3.2.1 定义自定义函数

  • 创建一个自定义函数,用于计算字符串的反转
import org.apache.spark.sql.functions.udf

// 定义 UDF 函数:反转字符串
val reverseString = udf((s: String) => s.reverse)

// 注册 UDF 函数
spark.udf.register("reverseString", reverseString)
  • 依次执行上述代码
    在这里插入图片描述

3.2.2 使用自定义函数

  • 注册完成后,你可以在 Spark SQL 中使用这个函数。例如,有一个 DataFrame,名为 df,包含一个 name 字段,你希望对每个名字应用反转操作。
val df = Seq(
  ("John"),
  ("Alice"),
  ("Bob")
).toDF("name")

// 使用 SQL 语句来调用 UDF 函数
df.createOrReplaceTempView("people")
val result = spark.sql("SELECT name, reverseString(name) AS reversed_name FROM people")

// 显示处理结果
result.show()
  • 执行上述代码,查看结果
    在这里插入图片描述

3.3 自定义函数课堂练习

3.3.1 提出任务

  • 要使用SparkSQL自定义函数(UDF)将文本文件中的所有小写英文字母转换为大写字母并打印到控制台

3.3.2 完成任务

1. 创建Maven项目
  • 创建SparkSQLFunctions项目
    在这里插入图片描述
  • pom.xml里添加相关依赖
    在这里插入图片描述
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.huawei.sql</groupId>
    <artifactId>SparkSQLFunctions</artifactId>
    <version>1.0-SNAPSHOT</version>   

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
    </build>

</project>
  • 刷新项目依赖
    在这里插入图片描述
  • 创建日志属性文件log4j2.properties
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
2. 准备数据文件
  • 在项目根目录创建data目录
    在这里插入图片描述
  • data里创建words.txt文件
    在这里插入图片描述
hello world hello hadoop
hello scala hello spark
now we are in big data era
we learn hadoop and spark

在这里插入图片描述

3. 创建Scala对象
  • net.huawei.sql里创建UDFToUpperCase对象
    在这里插入图片描述
package net.huawei.sql

import org.apache.spark.sql.SparkSession

/**
 * 功能:转成大写字母的UDF
 * 作者:华卫
 * 日期:2025年01月20日
 */
object UDFToUpperCase {
  def main(args: Array[String]): Unit = {
    // 获取或创建Spark会话对象
    val spark = SparkSession.builder() // 创建Builder对象
      .appName("UDFToUpperCase") // 设置应用程序名称
      .master("local[*]") // 运行模式:本地运行
      .getOrCreate() // 获取或创建Spark会话对象

    // 读取文本文件
    val fileDS = spark.read.textFile("data/words.txt")
    // 注册UDF函数
    spark.udf.register("udfToUpperCase", (str: String) => str.toUpperCase)
    // 创建视图
    fileDS.createOrReplaceTempView("words")
    // 执行SQL查询,使用UDF函数
    spark.sql(
      """
      |SELECT
      | value AS word, udfToUpperCase(value) as upper_word
      |FROM
      | words
      |""".stripMargin).show(false)

    // 关闭会话对象
    spark.stop()
  }
}
  • 代码说明:该代码使用SparkSQL实现了一个UDF(用户自定义函数),将文本文件中的小写字母转换为大写字母。首先,创建Spark会话并读取文本文件。然后,注册UDF函数udfToUpperCase,将字符串转换为大写。接着,创建临时视图并执行SQL查询,使用UDF函数转换数据并显示结果。最后,关闭Spark会话。
4. 运行程序,查看结果
  • 运行UDFToUpperCase对象
    在这里插入图片描述

4. 实战小结

  • 在本次实战中,我们深入探讨了SparkSQL的内置函数和自定义函数(UDF)的使用。首先,我们了解了SparkSQL提供的丰富内置函数,包括字符串操作、日期时间处理、数学计算、聚合函数以及窗口函数等。这些内置函数使得数据处理和分析变得更加高效和便捷。例如,我们学习了array()函数的用法,它可以将多个元素组合成一个数组,并支持嵌套数组的创建。

  • 接着,我们重点介绍了SparkSQL的自定义函数(UDF)。通过定义一个将小写字母转换为大写字母的UDF,我们展示了如何在SparkSQL中扩展SQL查询的功能。我们首先创建了一个Spark会话并读取了文本文件,然后注册了UDF函数udfToUpperCase,并使用SQL查询将文本文件中的小写字母转换为大写字母。最后,我们将转换后的结果打印到控制台并关闭了Spark会话。

  • 通过这个实战,我们不仅掌握了如何在SparkSQL中使用UDF,还加深了对Spark数据处理流程的理解。这个案例展示了SparkSQL在数据转换和处理中的强大功能,为处理更复杂的数据任务奠定了基础。此外,我们还学习了如何通过SQL语句与DataFrame API结合使用,进一步提高了数据处理的灵活性和效率。这次实战为我们今后在分布式环境中处理大规模数据提供了宝贵的经验。


网站公告

今日签到

点亮在社区的每一天
去签到