spark知识点

发布于:2024-05-16 ⋅ 阅读:(60) ⋅ 点赞:(0)

目录

第二章Scala基础

一.Scala常用数据类型

二.定义与使用数组

三.定义与使用函数

四.定义与使用列表

五.定义与使用集合

六.定义与使用映射

七.定义与使用元组

第三章Spark编程基础

一.从内存中读取创建RDD

二.从外部存储系统中读取数据创建RDD

 三.RDD方法归纳

1.使用map()方法转换数据

2.使用 sortBy()方法进行排序

3.使用collect()方法查询数据

4.使用flatMap()方法转换数据

5.使用take()方法查询某几个值

6.使用union()方法合并多个RDD

7.使用filter()方法进行过滤

8.使用distinct()方法进行去重

第四章Spark SQL

一.Spark SQL的简介

二.DataFrame概述


第二章Scala基础

一.Scala常用数据类型

二.定义与使用数组

数组是Scala中常用的一种数据结构,数组是一种存储了相同类型元素的固定大小的顺序集合,

Scala定义一个数组的语法格式如下:

# 第1种方式

var arr: Array[String] = new Array[String](num)

# 第2种方式

var arr:Array[String] = Array(元素1,元素2,…)

三.定义与使用函数

函数是Scala的重要组成部分,Scala作为支持函数式编程的语言,可以将函数作为对象.

定义函数的语法格式如下:

# def functionName(参数列表):[return type]={}

Scala提供了多种不同的函数调用方式,以下是调用函数的标准格式。

# functionName(参数列表)

如果函数定义在一个类中,那么可以通过“类名.方法名(参数列表)”的方式调用

四.定义与使用列表

列表操作常用方法

Scala中常用的查看列表元素的方法有head、init、last、tail和take()。

1.head:查看列表的第一个元素。

2.tail:查看第一个元素之后的其余元素。

3.last:查看列表的最后一个元素。

4.Init:查看除最后一个元素外的所有元素。

5.take():查看列表前n个元素。

五.定义与使用集合

Scala Set(集合)是没有重复的对象集合,所有的元素都是唯一的

集合操作常用方法

Scala合并两个列表时使用的是:::()或concat()方法,而合并两个集合使用的是++()方法。

六.定义与使用映射

1. 映射( Map ) 是一种可迭代的键值对结构 。

2. 所有的值都可以通过键来获取 ,并且映射中的键都是唯一的 。

3. 集合操作常用方法同样也适合映射。

4. 另外映射还可以通过 keys 方法获取所有的键,通过 values 方法获取所有值,也可以通过 isEmpty 方法判断映射的数据是否为空

七.定义与使用元组

1.元组(Tuple)是一种类似于列表的结构,但与列表不同的是,元组可以包含不同类型的元素。

2.元组的值是通过将单个的值包含在圆括号中构成的

3.目前,Scala支持的元组最大长度为22,即Scala元组最多只能包含22个元素

4.访问元组元素可以通过“元组名称._元素索引”进行,索引从1开始

第三章Spark编程基础

一.从内存中读取创建RDD

1.parallelize()

parallelizeO方法有两个输人参数,说明如下:

(1)要转化的集合:必须是 Seq集合。Seq 表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。

(2)分区数。若不设分区数,则RDD 的分区数默认为该程序分配到的资源的 CPU核心数。

通过 parallelizeO方法用一个数组的数据创建RDD,并设置分区数为4,如代码3-1所示,创建后查看该 RDD 的分区数,结果如图所示。

2.makeRDD()

makeRDD0方法有两种使用方式,第一种使用方式与 parallelize0方法一致;第二种方式是通过接收一个 Seq[(T,Seq[String])]参数类型创建 RDD。第二种方式生成的RDD中保存的是T的值,Seq[String]部分的数据会按照 Seqf(T,Seq[String])的顺序存放到各个分区中,一个 Seq[Stringl对应存放至一个分区,并为数据提供位置信息,通过preferredLocations0方法可以根据位置信息查看每一个分区的值。调用 makeRDD0时不可以直接指定 RDD 的分区个数,分区的个数与 Seq[String]参数的个数是保持一致的。

使用 makeRDD0方法创建 RDD,并根据位置信息查看每一个分区的值,结果如图所示。

二.从外部存储系统中读取数据创建RDD

1.从外部存储系统中读取数据创建RDD是指直接读取存放在文件系统中的数据文件创建RDD。从内存中读取数据创建 RDD 的方法常用于测试,从外部存储系统中读取数据创建 RDD 才是用于实践操作的常用方法。

2.从外部存储系统中读取数据创建 RDD 的方法可以有很多种数据来源,可通过SparkContext对象的 textFile0方法读取数据集。textFileO方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数,分别读取 HDFS文件和Linux本地文件的数据并创建 RDD,具体操作如下。

(1)通过HDFS文件创建 RDD

这种方式较为简单和常用,直接通过 textFile()方法读取 HDFS文件的位置即可。

在HDFS 的/user/toot 目录下有一个文件test.txt,读取该文件创建一个 RDD,代码如下

val test = sc. textile ("/user/root/test.txt")

(2)通过 Linux 本地文件创建 RDD

本地文件的读取也是通过 sc.textFile("路径")的方法实现的,在路径前面加上“file://”表示从Linux 本地文件系统读取。在 IntelliJIDEA 开发环境中可以直接读取本地文件;但在 spark-shell 中,要求在所有节点的相同位置保存该文件才可以读取它,例如,在Linux的/opt 目录下创建一个文件 test.txt,任意输入4行数据并保存,将 test.txt 文件远程传输至所有节点的/opt 目录下,才可以读取文件 test.txt。读取 test.txt 文件,并且统计文件的数据行数,代码如下

 三.RDD方法归纳

1.使用map()方法转换数据

             map()方法是一种基础的RDD转换操作,可以对 RDD 中的每一个数据元素通过某种函数进行转换并返回新的RDD。mapO方法是懒操作,不会立即进行计算。

                转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对 RDD 数据进行了某种转换,那么会生成一个新的 RDD。

例如,通过一个存放了5个 Int类型的数据元素的列表创建一个 RDD,可通过 map0方法对每一个元素进行平方运算,结果会生成一个新的RDD,代码如下:

2.使用 sortBy()方法进行排序

sortBy0方法用于对标准RDD 进行排序,有3个可输人参数,说明如下。

        (1)第1个参数是一个函数f:(T)=>K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。

        (2)第2个参数是 ascending,决定排序后 RDD 中的元素是升序的还是降序的,默认是 true,即升序排序,如果需要降序排序则需要将参数的值设置为 false。

        (3)第3个参数是numPartitions,决定排序后的RDD 的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即 this.partitions.size。

第一个参数是必须输人的,而后面的两个参数可以不输人。例如,通过一个存放了 3个二元组的列表创建一个 RDD,对元组的第二个值进行降序排序,分区个数设置为1,代码如下

3.使用collect()方法查询数据

          collectO方法是一种行动操作,可以将 RDD 中所有元素转换成数组并返回到 Driver 端,适用于返回处理后的少量数据。因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到 Driver 内存中,所以如果数据量比较大,会给网络传输造成很大的压力。因此,数据量较大时,尽量不使用collectO方法,否则可能导致Driver 端出现内存溢出间题。collectO方法有以下两种操作方式。

(1) collect:直接调用 collect 返回该 RDD 中的所有元素,返回类型是一个 Array[T数组,这是较为常用的一种方式。

(2)collect[U: ClassTag](f: PartialFunction[T, U]):RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”。

4.使用flatMap()方法转换数据

        flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。

        使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。这个转换操作通常用来切分单词。

        例如,分别用 maPO方法和 AatapO方法分制字符串。用 mapO方法分削后,每个元素对应返回一个迷代器,即数组。fatNapO方法在进行同 mapO方法一样的操作后,将3个选代器的元素扁平化(压成同一级别),保存在新 RDD 中,代码如下

5.使用take()方法查询某几个值

        take(N)方法用于获取RDD的前N个元素,返回数据为数组。take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。获取RDD的前5个元素,代码如下。

6.使用union()方法合并多个RDD

        union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。代码如下

7.使用filter()方法进行过滤

filter()方法是一种转换操作,用于过滤RDD中的元素。

filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。

filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD。

创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。代码如下

8.使用distinct()方法进行去重

        distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。创建一个带有重复数据的RDD,并使用distinct()方法去重。代码如下

三.使用简单的集合操作

(1)intersection()方法

intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集

(2)subtract()方法

subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。创建两个RDD,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。

(3)cartesian()方法

cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。

任务实现

第四章Spark SQL

一.Spark SQL的简介

Spark SQL主要提供了以下三个功能:

1.Spark SQL可从各种结构化数据源中读取数据,进行数据分析。

2.Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于在Spark程序内使用SQL语句进行查询。

3.Spark SQL可以无缝地将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询。

二.DataFrame概述

1.DataFrame简介

 2.DataFrame的创建

我们通过Spark读取数据源的方式进行创建DataFrame

3.RDD直接转换为DataFrame

在Scala语言中使用Spark SQL进行操作时,通常会用到DataFrame API。以下是一个简单的例子,展示如何使用Scala语言结合Spark SQL创建DataFrame并执行SQL查询:

首先,确保你已经有了一个SparkSession实例,这是与Spark交互的主要入口点。

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.types._

// 初始化SparkSession

val spark = SparkSession.builder()

  .appName("Spark SQL example")

  .getOrCreate()

import spark.implicits._

然后,定义一个schema来描述你的数据。例如,如果你有一个包含姓名、年龄和薪资的表,你可以这样定义schema:

val schema = StructType(Array(

  StructField("name", StringType),

  StructField("age", IntegerType),

  StructField("salary", DoubleType)

))

接下来,创建一个DataFrame。你可以手动创建Row实例,也可以从CSV文件或其他数据源加载数据:

// 手动创建Row实例

val rows = Array(

  Row("Alice", 30, 80000.0),

  Row("Bob", 25, 70000.0),

  Row("Charlie", 35, 95000.0)

)

// 创建DataFrame

val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

现在你可以使用SQL来查询DataFrame了:

// 注册DataFrame为临时视图

df.createOrReplaceTempView("employees")

// 执行SQL查询

val results = spark.sql("SELECT * FROM employees WHERE age > 30")

// 显示查询结果

results.show()

//最后,不要忘记在结束时停止SparkSession:



spark.stop()