Spark 核心编程之 RDD 介绍

发布于:2024-06-03 ⋅ 阅读:(86) ⋅ 点赞:(0)

一、Spark 分布式计算模拟

Driver 端将数据拆分成 n 个 Task 发送给 Executor,n 为 Executor 个数,Task 包含数据和计算逻辑,Executor 接收到 Task 后进行计算并将计算后的结果返回给 Driver

在这里插入图片描述

  • 定义封装整体数据和逻辑的资源类

    class Resource extends Serializable {
        // 数据集
        val datas: List[Int] = List(1, 2, 3, 4)
        
        // 计算逻辑
        val logic: Int => Int = _ * 2
    }
    
  • 定义 Task 类

    class Task extends Serializable {
        // 持有的数据
        var data: List[Int] = _
        
        // 持有的逻辑
        var logic: Int => Int = _
        
        // 计算方法
        def compute() = {
            data.map(logic)
        }
    }
    
  • 定义 Driver 类

    /*
    	负责与 Executor 通信并将准备好的 Task 发送给 Executor
    */
    object Driver {
        def main(args: Array[String]): Unit = {
            // 1.建立与 Executor 的连接
            val client1 = new Socket("localhost", 8888)
            val client2 = new Socket("localhost", 9999)
            
            // 2.封装 Task
            val resource = new Resource()
            val task1 = new Task()
            task1.data = resource.datas.take(2)
            task1.logic = resource.logic
            
            val task2 = new Task()
            task2.data = resource.datas.takeRight(2)
            task2.logic = resource.logic
            
            // 3.发送 Task
            val objOut1 = new ObjectOutputStream(client1.getOutputStream)
            objOut1.writeObject(task1)
            objOut1.close()
            objOut1.flush()
            client1.close()
            
            val objOut2 = new ObjectOutputStream(client2.getOutputStream)
            objOut2.writeObject(task2)
            objOut2.close()
            objOut2.flush()
            client2.close()
            
            println("客户端数据发送完毕")
            
        }
    }
    
  • 定义两个 Executor 类

    /*
    	负责接收 Driver 发送过来的 Task 并计算出结果
    */
    object Executor1 {
        def main(args: Array[String]): Unit = {
            // 1.启动服务并等待客户端连接
            val server = new ServerSocket(8888)
            println("服务端[8888]启动,等待客户端连接...")
            val client = server.accept()
            
            // 2.接收 Task
            val objIn = new ObjectInputStream(client.getInputStream)
            val task = objIn.readObject()
            
            // 3.执行计算并输出结果
            val result = task.compute()
            println("Executor[8888]计算结果为:" + result)
            
            objIn.close()
            client.close()
            server.close()
            
        }
    }
    
    /*
    	负责接收 Driver 发送过来的 Task 并计算出结果
    */
    object Executor2 {
        def main(args: Array[String]): Unit = {
            // 1.启动服务并等待客户端连接
            val server = new ServerSocket(9999)
            println("服务端[9999]启动,等待客户端连接...")
            val client = server.accept()
            
            // 2.接收 Task
            val objIn = new ObjectInputStream(client.getInputStream)
            val task = objIn.readObject()
            
            // 3.执行计算并输出结果
            val result = task.compute()
            println("Executor[9999]计算结果为:" + result)
            
            objIn.close()
            client.close()
            server.close()
            
        }
    }
    

二、RDD 介绍

Resilient Distributed Dataset,简称 RDD,弹性分布式数据集

1. 概念

  • RDD 是 Spark 中最基本的数据处理模型,在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
  • 弹性的特点:
    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重新分片
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD 封装了计算逻辑,并不保存数据
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现
  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

2. 实现原理

2.1 IO 的基本实现原理

IO 的实现体现了装饰者设计模式思想,实现了对类的功能的增强

  • 字节流

    InputStream in = new FileInputStream("filePath");
    int i = -1;
    while((i = in.read()) != -1) { // 每读取一个字节就打印输出一个字节
        System.out.println(i);
    }
    

    在这里插入图片描述

  • 缓冲字节流

    InputStream in = new BufferedInputStream(new FileInputStream("filePath"));
    int i = -1;
    while((i = in.read()) != -1) { // 每读取一个字节就放进缓存中,当超过缓存阈值就全部输出
        System.out.println(i);
    }
    

    在这里插入图片描述

  • 字符流

    Reader in = new BufferedReader(new InputStreamReader(new FileInputStream("filePath"), "UTF-8"));
    
    String s = null;
    while((s = in.readLine()) != null) { // 每读取一个字节就放入转换区,满足大小就转换成一个字符放入缓存区,超过缓存区阈值就将全部字符输出
        System.out.println(s);
    }
    

    在这里插入图片描述

2.2 RDD 与 IO 的关系
// 以 wordCount 案例
val lines: RDD[String] = sc.textFile("datas")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map((_, 1))
val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
val wordArray: Array[(String, Int)] = wordToCount.collect()
wordArray.foreach(println)

在这里插入图片描述

  • RDD 的数据处理方式类似于 IO 流,也包含了装饰者设计模式思想
  • RDD 的数据只有在调用 collect 方法时才会真正地执行业务逻辑操作,之前的操作都是对 RDD 功能的扩展
  • RDD 是不保存数据的,但是 IO 流的缓存区可以临时保存一部分数据

3. 核心属性

  • 分区列表:用于执行任务时并行计算,是实现分布式计算的重要属性

    protected def getPartitions: Array[Partition]
    
  • 分区计算函数:Spark 在计算时,是使用分区函数对每一个分区进行计算

    def compute(split: Partition, context: TaskContext): Iterator[T]
    
  • RDD 之间的依赖关系:RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

    protected def getDependencies: Seq[Dependency[_]] = deps
    
  • 分区器:可选,当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

    @transient val partitioner: Option[Partitioner] = None
    
  • 首选位置:可选,计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算,可以判断发送到哪个节点计算效率最优

    protected def getPreferredLocations(split: Partition): Seq[String] = Nil
    
    

4. 执行原理

在这里插入图片描述

  • 启动 Yarn 集群环境(ResourceManager 和 NodeManager)
  • Spark 通过申请资源创建调度节点(Driver)和计算节点(Executor)
  • Spark 框架根据需求将计算逻辑根据分区划分成不同的任务(task)
  • 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
  • 总结:RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算

5. RDD 创建

object TestRDDCreate {
    def main(args: Array[String]): Unit = {
        // 1.创建 spark 连接对象
        val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)
        
        // 2.创建 RDD
        // 2.1 从集合(内存)中创建 RDD:parallelize() 和 makeRDD()
        val seq: Seq[Int] = Seq[Int](1,2,3,4)
        val rdd1: RDD[Int] = sc.parallelize(seq)
        val rdd2: RDD[Int] = sc.makeRDD(seq) // 推荐,makeRDD底层实现是调用parallelize方法
        
        rdd1.collect.foreach(println)
        rdd2.collect.foreach(println)
        
        println("=================")
        
        // 2.2 从外部存储(文件)创建 RDD:本地的文件系统、Hadoop支持的数据集(如HDFS、HBase等)
        // 2.2.1 使用本地文件的绝对路径或相对路径(相对于项目的根目录)创建
        val rdd3: RDD[String] = sc.textFile("D:\\data\\1.txt")
        // val rdd3: RDD[String] = sc.textFile("input/1.txt")
        
        // 2.2.2 使用目录创建(读取目录下所有文件)
        val rdd4 = sc.textFile("D:\\data") // 以行为单位读取,结果是内容字符串
        val rdd41 = sc.wholeTextFiles("D:\\data") // 以文件为单位读取,结果是二元组,第一个元素为文件路径,第二个元素为文件内容
        
        // 2.2.3 使用路径通配符创建
        val rdd5 = sc.textFile("input/1*.txt")
        
        // 2.2.4 使用分布式文件系统路径
        val rdd6 = sc.textFile("hdfs://hadoop102:8020/data/word.txt")
        
        println("=================")
        
        // 2.3 从其他 RDD 创建:通过一个 RDD 运算完后,再产生新的 RDD
        // 2.4 使用 new 直接创建 RDD:一般由 Spark 框架自身使用
        
        // 3.关闭连接
        sc.stop()
    }
}

6. RDD 并行度与分区

6.1 分区规则
  • RDD 有分区列表属性,可以将一个作业切分成多个任务后,发送给不同的 Executor 节点并行计算,而能够同时计算的任务数量称之为并行度

  • RDD 在创建时可以指定分区个数

    // makeRDD 方法的第二个参数表示分区的数量,其默认值为 defaultParallelism 方法的返回值;
    // defaultParallelism 方法底层最终执行返回的是 scheduler.conf.getInt("spark.default.parallelism", totalCores) 的返回值;
    // 首先会在 SparkConf 中获取 key="spark.default.parallelism" 的值,获取到则返回;如果获取不到则返回 totalCores 的值,即当前运行环境(机器)的最大可用 CPU 核数
    
    val rdd1 = sc.makeRDD(List(1,2,3,4), 2)
    
    // 将数据保存成对应个数的分区文件
    rdd1.saveAsTextFile("output1") // 目录下有 2 个分区文件
    
    
    // textFile 方法的第二个参数表示最小的分区数量;其默认值为 defaultMinPartitions 方法的返回值;
    // defaultMinPartitions 方法实现为:math.min(defaultParallelism, 2),如果没指定 "spark.default.parallelism" 的值,则最小分区数为 2
    // 可以通过第二个参数指定最小分区数
    // 由于 spark 读取文件底层使用的是 Hadoop 的 TextInputFormat,所以其分区计算使用的是 TextInputFormat 的 getSplits 方法:
    // 首先获取文件的总字节数:totalSize (如 7 字节);再根据指定的最小分区数计算出每个分区存储的字节大小:goalSize = totalSize/(numSplits == 0 ? 1 : numSplits) = 7/2 = 3;再根据 1.1 倍原则判断剩余的大小是否需要创建新分区:7 - 7/3 = 1,由于 1/3 + 1 > 1.1,所以需要创建新分区,即 7/3 + 1 = 3 个分区 
    val rdd2 = sc.textFile("input/1.txt", 2)
    
    rdd2.saveAsTextFile("output2") // 目录下有 3 个分区文件
    
  • 建立 Spark 连接时可以指定并行度配置

    // local 表示单核运行;local[n] 表示指定核数运行;local[*] 表示最大核数运行
    val sparkConf = new SparkConf.setMaster("local[*]").setAppName("spark")
    sparkConf.set("spark.default.parallelism", "5")
    
    val sc = new SparkContext(sparkConf)
    
6.2 分区数据分配规则
  • makeRDD 创建:集合(内存)数据

    // 以 5 个元素的集合和 3 个分区创建 RDD
    val list = List(1,2,3,4,5)
    val rdd = sc.makeRDD(list, 3)
    
    // makeRDD 底层调用 parallelize
    parallelize(list, 3)
    
    // parallelize 中创建 ParallelCollectionRDD
    new ParallelCollectionRDD(.., list, 3, ..)
    
    // ParallelCollectionRDD 中有核心属性分区列表
    def getPartitions: Array[Partition] = {
        // 调用 slice 方法
        slice(list, 3)
    }
    
    // slice 方法中有分配的核心方法 positions
    def positions(length: Int, numSlice: Int): Iterator[(Int,Int)] = { // 5, 3
        (0 until numSlices).iterator.map { // (0, 1, 2)
            i => { // 0 -- 1 -- 2
                val start = ((i * length) / numSlices).toInt // 0 -- 1 -- 3
                val end = (((i + 1) * length) / numSlices).toInt // 1 -- 3 -- 5
                (start, end) // (0,1) -- (1,3) -- (3,5)
            }
        }
    }
    
    // slice 中会对数据集进行类型模式匹配判断
    case _ => { 
        // 调用 position 方法再映射
        positions(list.toArray.length, 3).map { // 5, 3
            case (start,end) => list.toArray.slice(start, end) // slice(from,until)
            // (0,1) -> (1,2,3,4,5) -> 1
            // (1,3) -> (1,2,3,4,5) -> 2,3
            // (3,5) -> (1,2,3,4,5) -> 4,5
        }
    }
    
    // 结论:3 个分区文件中的数据分配:【1】,【2,3】,【4,5】
    
  • textFile 创建:文件数据

    /**
    	读取的文件内容:1.txt,@表示换行符的位置
    	1@@
    	2@@
    	3
    */
    // 创建文件读取的 RDD
    val rdd = sc.textFile("input/1.txt", 2) // 由于文件为 7 字节,所以分区数为 3,每个分区存储 3 字节
    
    // 1.Spark文件读取是使用 hadoop 的方式以行为单位读取,与字节数无关
    // 2.Spark是以偏移量的形式来读取一行数据,且偏移量不会被重复读取
    /*
    	1@@ -> 偏移量:012
    	2@@ -> 偏移量:345
    	3 -> 偏移量:6
    */
    // 3.每个分区所包含的偏移量范围:(起始偏移量 ~ 起始偏移量 + 分区字节数)
    /*
    	分区0:[0 - 0 + 3 = 3]  -> 1@@2
    	分区1:[3 - 3 + 3 = 6]    -> 3
    	分区2:[6 - 6 + 3 = 9]    -> 
    */
    
    // 结论:3 个分区文件中的数据分配:【1, 2】,【3】,【】
    

网站公告

今日签到

点亮在社区的每一天
去签到