Spark基础【RDD持久化、分区器、文件保存读取】

发布于:2023-01-13 ⋅ 阅读:(508) ⋅ 点赞:(0)

一 RDD持久化

1 RDD Cache缓存

(1)为什么使用Cache

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用

如现有两个需求:

  • File – textFile – flatMap – map – reduceByKey
  • File – textFile – flatMap – map – group

因为前四个阶段相同,能否直接在第一个需求的map后面进行更改?

def main(args: Array[String]): Unit = {

  val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
  val sc = new SparkContext(conf)

  val lines: RDD[String] = sc.textFile("data/word.txt")
  val words: RDD[String] = lines.flatMap(_.split(" "))
  val wordToOne: RDD[(String, Int)] = words.map(
    t => {
      println("**************")
      (t,1)
    }
  )
  val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
  wordCount.collect().foreach(println)

  println("----------------------")
  val rdd2: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(_._1)
  rdd2.collect().foreach(println)

  sc.stop()
}

执行结果

**************
**************
**************
**************
**************
**************
(scala,1)
(hello,3)
(world,1)
(spark,1)
----------------------
**************
**************
**************
**************
**************
**************
(scala,CompactBuffer((scala,1)))
(hello,CompactBuffer((hello,1), (hello,1), (hello,1)))
(world,CompactBuffer((world,1)))
(spark,CompactBuffer((spark,1)))

第一个需求:共六个单词,打印六个*,正确

但是会发现第二个需求并没有起到简化的作用,因为当处理完reduceByKey之后,map中不保留数据,所以不能直接去处理groupBy,需要返回找到上一个RDD(groupBy – map – flatMap – textFile – File),重新执行此过程

如果让map中保留数据,就可以完成以上需求,将map中的结果保留下来的过程就称为持久化操作

使用cache方法设定数据的持久化,如下代码所示

wordToOne.cache()

再次执行以上代码,发现执行结果为

**************
**************
**************
**************
**************
**************
(scala,1)
(hello,3)
(world,1)
(spark,1)
----------------------
(scala,CompactBuffer((scala,1)))
(hello,CompactBuffer((hello,1), (hello,1), (hello,1)))
(world,CompactBuffer((world,1)))
(spark,CompactBuffer((spark,1)))

(2)修改血缘关系

cache操作可以将血缘关系进行修改,会增加一个和缓存相关的血缘关系,但不改变原有的血缘关系

wordToOne.cache()
val wordCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println(wordCount.toDebugString)
wordCount.collect()//.foreach(println)


//    println("----------------------")
//    val rdd2: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(_._1)
//    rdd2.collect().foreach(println)
println(wordCount.toDebugString)

输出结果

(2) ShuffledRDD[4] at reduceByKey at Spark01_WordCount.scala:21 []
 +-(2) MapPartitionsRDD[3] at map at Spark01_WordCount.scala:14 []
    |      CachedPartitions: 2; MemorySize: 568.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  MapPartitionsRDD[2] at flatMap at Spark01_WordCount.scala:13 []
    |  data/word.txt MapPartitionsRDD[1] at textFile at Spark01_WordCount.scala:12 []
    |  data/word.txt HadoopRDD[0] at textFile at Spark01_WordCount.scala:12 []

因为cache将数据放在缓存当中,所以cache操作不安全

cache源码

def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

MEMORY_ONLY意味着将数据保存时,只保存到内存中,如果当内存不够时,数据不能溢写磁盘,会将数据丢弃(LRU算法:最近最少访问的数据会优先抛弃;FIFO等)

(3)存储级别

使用persist可以更改存储级别,存储级别如下

wordToOne.persist(StorageLevel.OFF_HEAP)

堆内内存存在一个问题:因为GC线程是为用户线程服务的线程,它不能影响用户线程的执行,如果现用户线程正在运行,且想执行GC操作System.gc()之后进行Load(内存),GC可能不会立即释放垃圾内存,造成程序申请空间失败

堆外内存是不受JVM管理的内存,是主动向OS操作系统申请的内存,JVM默认向OS申请1 64 的内存,最大1 /4 ,剩余的3/4 是堆外内存

2 代表副本,SER代表序列化,OFF_HEAP代表堆外内存

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}

在这里插入图片描述

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache

2 RDD CheckPoint检查点

如果存在持久化,持久化的文件只能自己使用,而且使用完毕后会自动删除

如果想让别人使用需要使用CheckPoint方法

Spark可以将中间的计算结果保存到检查点中,让其他的应用使用,所谓的检查点其实就是通过将RDD中间结果写入磁盘

wordToOne.checkpoint()

没有设置目录会报如下错误:Checkpoint directory has not been set in the SparkContext

sc.setCheckpointDir("cp")

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发

检查点可以切断血缘关系,将保存下来的数据当做新的数据源

检查点为了数据的安全,会重新再执行一遍作业,所以会执行两次

为了解决这个问题,可以将检查点和缓存联合使用

wordToOne.cache()
wordToOne.checkpoint()

3 缓存和检查点的区别

  • Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖
  • Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高,将其当做新的血缘,作为新的数据源
  • 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否需要再从头计算一次RDD

二 RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数

  • 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
  • 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的

1 Hash分区

对于给定的key,计算其hashCode,并除以分区个数取余

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

2 Range分区

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
  ...
  }

  def numPartitions: Int = rangeBounds.length + 1

  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
  ...
  }

  override def hashCode(): Int = {
  ...
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
  ...
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
  ...
  }
}

3 自定义分区

自定义分区器

  • 继承Partitioner
  • 重写方法(2 + 2)

numPartitions:准备分几个区

getPartition:根据数据的K返回所在的分区编号,从0开始

class MyPartitioner extends Partitioner{
  override def numPartitions: Int = {
    3
  }
  override def getPartition(key: Any): Int = {
    key match {
      case "NBA" => 0
      case "CBA" => 1
      case "WNBA" => 2
    }
  }
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(conf)

    val lines: RDD[(String, String)] = sc.makeRDD(
      List(
        ("NBA", "aaaaa"),
        ("CBA", "bbbbb"),
        ("NBA", "ccccc"),
        ("WNBA", "dddd")
      ), 2
    )

    val rdd: RDD[(String, String)] = lines.partitionBy(new MyPartitioner())

    rdd.saveAsTextFile("output")
  
    sc.stop()
}

如果有两个连续的相同分区操作,会发生什么

第二个partitionBy会不会有shuffle操作

val rdd: RDD[(String, String)] = lines.partitionBy(new MyPartitioner())
val rdd1: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner())

partitionBy 源码:

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  // key的类型是数组类型,且分区器是Hash,抛异常
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  // 当前的分区器和传进来的分区器判断是否相等,相等返回自身,不会产生新的rdd,就用当前的rdd
  // ==:非空equals
  if (self.partitioner == Some(partitioner)) {
    self
  // 不相等,就会做shuffle操作
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}

......
class HashPartitioner(partitions: Int) extends Partitioner {
  override def equals(other: Any): Boolean = other match {
    // 如果类型是HashPartitioner,判断两分区器分区是否分区相等,相等是同一个,反之不是同一个
    // 如果类型不是HashPartitioner,也认为不相等
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

所以重写的方法应该是2 + 2,前2个是硬性要求,后两个以便应对相同重复操作的问题

class MyPartitioner extends Partitioner{

  override def numPartitions: Int = {
    3
  }

  override def getPartition(key: Any): Int = {
    key match {
      case "NBA" => 0
      case "CBA" => 1
      case "WNBA" => 2
    }
  }

  override def equals(other: Any): Boolean = other match {
    case h: MyPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

三 RDD文件读取与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统

文件格式分为:text文件、csv文件、sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS、HBASE以及数据库

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(conf)

    val lines: RDD[(String, String)] = sc.makeRDD(
      List(
        ("NBA", "aaaaa"),
        ("CBA", "bbbbb"),
        ("NBA", "ccccc"),
        ("WNBA", "dddd")
      ), 2
    )

    //lines.saveAsTextFile("output")
    //lines.saveAsObjectFile("output1")
    //lines.saveAsSequenceFile("output2")

    val rdd: RDD[String] = sc.textFile("output")
    //val rdd1: RDD[(String, String)] = sc.objectFile[(String,String)]("output1")
    //val rdd2: RDD[(String, String)] = sc.sequenceFile[String,String]("output2")

    rdd.collect().foreach(println)

    sc.stop()
  }

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFilekeyClass, valueClass

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFileT: ClassTag函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到