Spark中非键值对中常用的行动算子

发布于:2023-01-04 ⋅ 阅读:(297) ⋅ 点赞:(0)
/**
 * 行动算子:返回的是一个数据值、集合或者没有返回
 */
object RDDDemo01 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("action").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    reduceOper(sc)
    aggregateOper(sc)
    collectOper(sc)
    countOper(sc)
    sc.stop()
  }

  /**
   * reduce——聚合算子
   * 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
   */
  def reduceOper(sc:SparkContext):Unit = {
    val rdd:RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 0),1)
    val sum = rdd.reduce((a: Int, b: Int) => {
      println(s"a=$a,b=$b")
      a + b
    })
    val max = rdd.reduce((a: Int, b: Int) => {
      println(s"a=$a,b=$b")
      var c = 0
      if (a>b) {
        a
      } else {
        b
      }
    })

    println("最大值为:"+max)
  }

  /**
   * aggregate——聚合算子,根据初始值和每一个分区进行运算
   * first——获取RDD数据集中第一条数据
   */
  def aggregateOper(sc:SparkContext):Unit = {
    val rdd:RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9),4)
    val res:Int = rdd.aggregate(0)(
      (a: Int, b: Int) => {a + b},
      (a: Int, b: Int) => {a + b}
    )
    println("aggregateOper聚合算子求和:"+res)

    val max:Int = rdd.aggregate(rdd.first())(
      (a: Int, b: Int) => {
        if (a>b) {
          a
        } else {
          b
        }
      },
      (a: Int, b: Int) => {
        if (a>b) {
          a
        } else {
          b
        }
      }
    )
    println("aggregateOper聚合算子求最大值:"+max)
  }

  /**
   * collect——(慎用)将RDD数据集中的全部数据拉取到Driver所在节点形成一个数组,数组包含RDD数据集中的所有数据,数组在内存中的概念,会导致内存溢出
   * take(n)——拉取前n条数据,顺序
   * takeSample——随机抽样
   * takeOrdered——将RDD数据集中数据排序之后获取前n条数据
   */
  def collectOper(sc:SparkContext):Unit = {
    val rdd:RDD[Int] = sc.makeRDD(Array(24,54,67,23,65,32,87,23,21,11,99))
    val array:Array[Int] = rdd.collect()
    println(array.mkString(","))

    // take算子
    val array1:Array[Int] = rdd.take(5)
    println(array1.mkString("="))

    // takeSample算子
    val array2:Array[Int] = rdd.takeSample(false, 10)
    println(array2.mkString("::"))

    // takeOrdered算子
    val array3:Array[Int] = rdd.takeOrdered(5)
    println(array2.mkString("->"))
  }

  /**
   * count算子
   */
  def countOper(sc:SparkContext):Unit = {
    val rdd:RDD[Int] = sc.makeRDD(1 to 100)
    val rdd1:RDD[Int] = rdd.filter((a: Int) => {
      if (a % 2 == 1) {
        true
      } else {
        false
      }
    })
    val count:Long = rdd1.count()
    println("count="+count)

    /**
     * foreach——无返回值,累加器计算或者RDD数据集遍历
     */
    rdd1.foreach((a:Int)=>{println(a)})

  }

  /**
   * saveAsSequenceFile(path):K-V键值对RDD
   * saveAsTextFile(path):数值型RDD
   */
  def saveAsXxxFileOper(sc:SparkContext):Unit = {
    val rdd:RDD[Int] = sc.makeRDD(1 to 100)
    rdd.saveAsTextFile("hdfs://node1:9000/action")
  }
}
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到