RDD算子的基本介绍

发布于:2025-05-12 ⋅ 阅读:(14) ⋅ 点赞:(0)

在 Apache Spark 中,RDD(弹性分布式数据集)是核心的数据抽象,RDD 算子可分为转换算子(Transformation)和行动算子(Action)。下面使用 Scala 语言为你详细介绍这两类算子。

转换算子(Transformation)

转换算子会基于现有的 RDD 创建一个新的 RDD,这些操作是惰性的,即只有在遇到行动算子时才会真正触发计算。

常见转换算子示例
  1. map(func):对 RDD 中的每个元素应用指定的函数,生成一个新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}

object MapExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MapExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3))
    val newRDD = rdd.map(x => x * 2)
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

  1. filter(func):筛选出满足指定条件的元素,生成新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}

object FilterExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FilterExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3, 4))
    val newRDD = rdd.filter(x => x % 2 == 0)
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

  1. flatMap(func):对 RDD 中的每个元素应用函数,然后将结果扁平化,生成一个新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}

object FlatMapExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlatMapExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq("hello world", "spark rdd"))
    val newRDD = rdd.flatMap(x => x.split(" "))
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

  1. union(other):将两个 RDD 合并成一个新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}

object UnionExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UnionExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq(1, 2))
    val rdd2 = sc.parallelize(Seq(3, 4))
    val newRDD = rdd1.union(rdd2)
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

  1. groupByKey():对键值对 RDD 按照键进行分组。

scala

import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupByKeyExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val newRDD = rdd.groupByKey()
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

  1. reduceByKey(func):对键值对 RDD 按照键进行聚合操作。

scala

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val newRDD = rdd.reduceByKey(_ + _)
    newRDD.collect().foreach(println)
    sc.stop()
  }
}

行动算子(Action)

行动算子会触发对 RDD 的计算,并返回一个具体的结果或者将结果保存到外部存储。

常见行动算子示例
  1. collect():将 RDD 中的所有元素收集到驱动程序中。

scala

import org.apache.spark.{SparkConf, SparkContext}

object CollectExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CollectExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3))
    val result = rdd.collect()
    result.foreach(println)
    sc.stop()
  }
}

  1. count():返回 RDD 中元素的数量。

scala

import org.apache.spark.{SparkConf, SparkContext}

object CountExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CountExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3, 4))
    val count = rdd.count()
    println(count)
    sc.stop()
  }
}

  1. first():返回 RDD 中的第一个元素。

scala

import org.apache.spark.{SparkConf, SparkContext}

object FirstExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FirstExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3))
    val firstElement = rdd.first()
    println(firstElement)
    sc.stop()
  }
}

  1. take(n):返回 RDD 中的前 n 个元素。

scala

import org.apache.spark.{SparkConf, SparkContext}

object TakeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TakeExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val result = rdd.take(3)
    result.foreach(println)
    sc.stop()
  }
}

  1. reduce(func):使用指定的函数对 RDD 中的元素进行聚合操作。

scala

import org.apache.spark.{SparkConf, SparkContext}

object ReduceExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReduceExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3, 4))
    val result = rdd.reduce(_ + _)
    println(result)
    sc.stop()
  }
}

  1. saveAsTextFile(path):将 RDD 中的元素以文本文件的形式保存到指定的路径。

scala

import org.apache.spark.{SparkConf, SparkContext}

object SaveAsTextFileExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SaveAsTextFileExample").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq(1, 2, 3))
    rdd.saveAsTextFile("output_path")
    sc.stop()
  }
}


网站公告

今日签到

点亮在社区的每一天
去签到