Spark-SQL(一)

发布于:2025-04-16 ⋅ 阅读:(30) ⋅ 点赞:(0)

Spark SQL

概述

Spark SQL是Apache Spark用于处理结构化数据的模块

特点

1 易整合。无缝的整合了 SQL 查询和 Spark 编程

2 统一的数据访问。使用相同的方式连接不同的数据源

3 兼容 Hive。在已有的仓库上直接运行 SQL 或者 HQL

4 标准数据连接。通过 JDBC 或者 ODBC 来连接

代码

创建 DataFrame

1. 首先,在spark的bin目录下,创建data目录,在里面创建一个json文件,名为 user.json 文件中的数据为:

{"username":"zhangsan","age":20}

{"username":"lisi","age":17}

2. 读取 json 文件创建 DataFrame--------df

 val df = spark.read.json("D:/spark/bin/data/user.json")

3. 展示数据:

 df.show

SQL语法

1. 读取数据-----df1

 val df1 = spark.read.json("D:/spark/bin/data/user.json")

2. 对 DataFrame 创建一个临时表

 df1.createOrReplaceTempView("people")

3. 通过 SQL 语句实现查询全表

 val sqlDF = spark.sql("select * from people")

4. 结果展示

sqlDF.show

DSL语法

1. 创建一个 DataFrame-----df2

val df2 = spark.read.json("D:/spark/bin/data/user.json")

2. 查看 DataFrame 的 Schema 信息

df2.printSchema

3. 只查看"username"列数据

 df2.select("username").show()

4. 查看"username"列数据以及"age+1"数据

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

df2.select($"username",$"age" + 1).show

df2.select('username ,'age + 1).show()

5. 查看"age"大于"18"的数据

df2.filter($"age" > 18).show

6. 按照"age"分组,查看数据条数

 df2.groupBy("age").count.show

RDD 转换为 DataFrame

DataFrame 转换为 RDD

val df3 = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF

val rdd = df.rdd

val array = rdd.collect

注意:此时得到的 RDD 存储类型为 Row

array(0)

array(0)(0)

array(0).getAs[String]("name")

DataSet

1. 使用样例类序列创建 DataSet

case class Person(name: String, age: Long)

val caseClassDS = Seq(Person("zhangsan",2)).toDS()

caseClassDS.show

2. 使用基本类型的序列创建 DataSet

val ds = Seq(1,2,3,4,5).toDS

ds.show

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

RDD 转换为 DataSet

case class User(name:String, age:Int)

sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

DataSet 转换为 RDD

case class User(name:String, age:Int)

val aaa = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

val rdd = aaa.rdd

rdd.collect

DataFrame 和 DataSet 转换

DataFrame 转换为 DataSet

case class User(name:String, age:Int)

val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")

val ds = df.as[User]

DataSet 转换为 DataFrame

val ds = df.as[User]

val df = ds.toDF

RDD、DataFrame、DataSet 三者的关系

三者的共性

RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数

据提供便利;

三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算;

三者有许多共同的函数,如 filter,排序等;

在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:

import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)

三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

三者都有分区(partition)的概念

DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

三者的区别

1. RDD

RDD 一般和 spark mllib 同时使用

RDD 不支持 sparksql 操作

2. DataFrame

与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值

DataFrame 与 DataSet 一般不与 spark mllib 同时使用

DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作

DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然

3. DataSet

Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。

DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]

DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性里提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息。

三者可以通过上图的方式进行相互转换。


网站公告

今日签到

点亮在社区的每一天
去签到