在 Apache Spark 中,RDD(弹性分布式数据集)是核心的数据抽象,RDD 算子可分为转换算子(Transformation)和行动算子(Action)。下面使用 Scala 语言为你详细介绍这两类算子。
转换算子(Transformation)
转换算子会基于现有的 RDD 创建一个新的 RDD,这些操作是惰性的,即只有在遇到行动算子时才会真正触发计算。
常见转换算子示例
- map(func):对 RDD 中的每个元素应用指定的函数,生成一个新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}
object MapExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MapExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3))
val newRDD = rdd.map(x => x * 2)
newRDD.collect().foreach(println)
sc.stop()
}
}
- filter(func):筛选出满足指定条件的元素,生成新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}
object FilterExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FilterExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3, 4))
val newRDD = rdd.filter(x => x % 2 == 0)
newRDD.collect().foreach(println)
sc.stop()
}
}
- flatMap(func):对 RDD 中的每个元素应用函数,然后将结果扁平化,生成一个新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}
object FlatMapExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FlatMapExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq("hello world", "spark rdd"))
val newRDD = rdd.flatMap(x => x.split(" "))
newRDD.collect().foreach(println)
sc.stop()
}
}
- union(other):将两个 RDD 合并成一个新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}
object UnionExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("UnionExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(Seq(1, 2))
val rdd2 = sc.parallelize(Seq(3, 4))
val newRDD = rdd1.union(rdd2)
newRDD.collect().foreach(println)
sc.stop()
}
}
- groupByKey():对键值对 RDD 按照键进行分组。
scala
import org.apache.spark.{SparkConf, SparkContext}
object GroupByKeyExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("GroupByKeyExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val newRDD = rdd.groupByKey()
newRDD.collect().foreach(println)
sc.stop()
}
}
- reduceByKey(func):对键值对 RDD 按照键进行聚合操作。
scala
import org.apache.spark.{SparkConf, SparkContext}
object ReduceByKeyExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val newRDD = rdd.reduceByKey(_ + _)
newRDD.collect().foreach(println)
sc.stop()
}
}
行动算子(Action)
行动算子会触发对 RDD 的计算,并返回一个具体的结果或者将结果保存到外部存储。
常见行动算子示例
- collect():将 RDD 中的所有元素收集到驱动程序中。
scala
import org.apache.spark.{SparkConf, SparkContext}
object CollectExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CollectExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3))
val result = rdd.collect()
result.foreach(println)
sc.stop()
}
}
- count():返回 RDD 中元素的数量。
scala
import org.apache.spark.{SparkConf, SparkContext}
object CountExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CountExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3, 4))
val count = rdd.count()
println(count)
sc.stop()
}
}
- first():返回 RDD 中的第一个元素。
scala
import org.apache.spark.{SparkConf, SparkContext}
object FirstExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FirstExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3))
val firstElement = rdd.first()
println(firstElement)
sc.stop()
}
}
- take(n):返回 RDD 中的前
n
个元素。
scala
import org.apache.spark.{SparkConf, SparkContext}
object TakeExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TakeExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))
val result = rdd.take(3)
result.foreach(println)
sc.stop()
}
}
- reduce(func):使用指定的函数对 RDD 中的元素进行聚合操作。
scala
import org.apache.spark.{SparkConf, SparkContext}
object ReduceExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ReduceExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3, 4))
val result = rdd.reduce(_ + _)
println(result)
sc.stop()
}
}
- saveAsTextFile(path):将 RDD 中的元素以文本文件的形式保存到指定的路径。
scala
import org.apache.spark.{SparkConf, SparkContext}
object SaveAsTextFileExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SaveAsTextFileExample").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Seq(1, 2, 3))
rdd.saveAsTextFile("output_path")
sc.stop()
}
}