目录
spark基础&rdd
docs
docs: https://archive.apache.org/dist/spark/docs/3.1.1/
RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。(不可变、自动容错、位置感知性调度和可伸缩性)
RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel
resilient 英[rɪˈzɪliənt]
adj. 有弹性(或弹力)的;有适应力的;能复原的;可迅速恢复的;
- 一组分片(Partition),即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。
- 一个计算每个分区的函数。Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
- RDD 之间的依赖关系。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。
- 一个 Partitioner,即 RDD 的分片函数。当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于 key-value 的 RDD,才会有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决定了 parent RDD Shuffle 输出时的分片数量。
- 一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
spark架构
- Driver(驱动程序): 作为应用程序的主控进程,负责解析用户代码、生成DAG(有向无环图)并划分Stage。
- Cluster Manager(集群管理器):管理集群资源,支持Standalone、YARN、Mesos和Kubernetes等模式
- Worker(工作节点):在集群节点上运行,负责启动Executor进程并监控其状态。
- Executor(执行器):在Worker节点上执行具体任务,管理内存缓存和磁盘I/O,定期向Driver发送心跳。
- Task(任务):最小执行单元,由Executor并行处理,每个Stage被拆分为多个Task。
特点
- 内存计算:中间结果存储在内存中,相比Hadoop MapReduce减少磁盘I/O,部分速度能提升10-100倍。
- 弹性分布式数据集(RDD):基础数据结构,支持容错和并行计算,通过Lineage(血统、依赖链、依赖关系)机制恢复数据。
- 多模式支持:兼容批处理、流处理(Spark Streaming)、SQL查询(Spark SQL)及机器学习(MLlib)。
Spark 对比 hadoop MapReduce
spark例子参考:https://xie.infoq.cn/article/71e6677d03b59ce7aa5eec22a
hadoop MapReduce参考:https://doctording.blog.csdn.net/article/details/78467216
对比hadoop MapReduce:
Map:
Reduce:
spark maven依赖
<!-- Spark core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.1</version>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.1</version>
</dependency>
<!-- 如果需要使用 MLlib -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>3.1.1</version>
</dependency>
Spark的checkpoint
spark 中 Checkpoint 的主要作用是斩断 RDD 的依赖链。
Checkpoint 的两种方式
- 可靠的将数据存储在可靠的存储引擎中,例如 HDFS。要将 RDD 缓存在本地 block manager 中,在 exquator中。如果 work 崩溃,数据消失。而 RDD 也能存储在 HDFS 中,即一种 checkpoint 可靠的方式,即将 RDD 的数据缓存到 HDFS 中。
- 本地的将数据存储在本地,称为 Local checkpoint,较少使用,和 RDD 的区别比较小。本地存储方式不太符合checkpoint 的思想。
https://developer.aliyun.com/article/1086764
https://developer.aliyun.com/article/1329824
transformations、shuffle、actions
transformation 英[ˌtrænsfəˈmeɪʃn]
n. (彻底的)变化,改观,转变,改革;转型;(用于南非)民主改革;
- map
- filter
- flatMap
- mapPartitions
- mapPartitionsWithIndex
- sample
- union
- intersection
- distinct
- groupByKey
- reduceByKey
- aggregateByKey
- sortByKey
- join
- cogroup
- cartesian
- pipe
- coalesce
- repartition
- repartitionAndSortWithinPartitions
shuffle 英[ˈʃʌfl]
v. 洗牌;洗(牌);拖着脚走;坐立不安;把(纸张等)变换位置,打乱次序;(笨拙或尴尬地)把脚动来动去;
n.洗牌;拖着脚走;曳步舞;
Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
actions
- collect()
- count()
- first()
- take(n)
- takeSample(withReplacement, num, [seed])
- takeOrdered(n, [ordering])
- saveAsTextFile(path)
- saveAsSequenceFile(path)
- saveAsObjectFile(path)
- countByKey()
- foreach(func)
reduceByKey的用法
值可合并的聚合操作
- scala
val data = List(("a", 1), ("b", 2), ("a", 3), ("b", 4))
val rdd = sc.parallelize(data)
val result = rdd.reduceByKey(_ + _) // 对相同key的值求和
result.collect().foreach(println)
- 输出
(a,4)
(b,6)
核心特点
- 本地聚合优化:先在分区内合并数据(类似MapReduce的combiner),减少shuffle数据量
- 函数要求:聚合函数需满足结合律(如加法、最大值等)
- 性能对比:比groupByKey更高效,因后者不进行预聚合
典型场景
- 统计词频(单词计数)
- 分组求和/求平均值
- 数据去重后的聚合计算
groupByKey的用法
Spark中的groupByKey是一个针对键值对RDD的核心转换算子
核心机制
- 分组逻辑:将相同key的所有value合并为迭代器(Iterable),输出格式为(K, Iterable)。与reduceByKey不同,它仅进行分组而不执行聚合操作
- 执行过程:直接对全量数据进行shuffle,不进行本地预聚合,导致网络传输量较大。例如处理(“a”,1), (“a”,2)会直接传输所有键值对,而reduceByKey会先在分区内合并为(“a”,3)再shuffle。
典型应用场景
- 需保留所有值的场景:如统计每个用户的完整行为序列
- 非聚合操作:例如对分组后的值进行复杂处理(排序、去重等)
缺点
- 数据倾斜风险:单个key对应的value过多会导致OOM,如某key关联百万级记录:使用reduceByKey或aggregateByKey替代(若允许预聚合);对倾斜key单独处理(如加盐分片)
- 内存消耗:未压缩的迭代器对象会占用更多内存,建议配合mapValues转换为紧凑数据结构
例子
val data = sc.parallelize(Seq(("a",1), ("b",2), ("a",3)))
val grouped = data.groupByKey() // 输出: (a, [1,3]), (b, [2])
grouped.foreach(println)
输出
(a,CompactBuffer(1, 3))
(b,CompactBuffer(2))
count / count distinct
count:
- 触发计算:count是行动算子,会立即触发作业执行
- 全量扫描:默认会扫描全表数据,大数据集可能耗时较长。
- 优化建议:
- 对过滤后的数据统计(先filter再count)
- 使用缓存(cache())避免重复计算
scala> val rdd = sc.parallelize(Seq(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val total = rdd.count() // 输出:3
total: Long = 3
scala>
- count distinct例子
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
/**
* @Author mubi
* @Date 2025/8/14 23:39
*/
public class Main {
public static void main(String[] args) throws Exception {
// 初始化SparkSession
SparkSession spark = SparkSession.builder()
.appName("countDistinct")
.master("local[*]")
.getOrCreate();
// 定义Schema
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("fruit", DataTypes.StringType, false)
});
// 创建Row数据
List<Row> data = Arrays.asList(
RowFactory.create(1, "apple"),
RowFactory.create(2, "banana"),
RowFactory.create(3, "apple"),
RowFactory.create(4, "orange"),
RowFactory.create(5, "banana")
);
// 构建DataFrame
Dataset<Row> df = spark.createDataFrame(data ,schema);
// 计算不同水果数量
long distinctFruitsCount = df.select("fruit")
.distinct()
.count();
System.out.println("Number of distinct fruits: " + distinctFruitsCount);
spark.stop();
}
}
例子:单词计数
- java程序
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
* @Author mubi
* @Date 2025/8/14 23:39
*/
public class Main {
public static void main(String[] args) throws Exception {
// 创建Spark配置和上下文
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取文本文件,这里使用Spark自带的示例文本文件
JavaRDD<String> input = sc.textFile("src/main/resources/words.txt");
// 将每行文本拆分成单词,并对单词进行计数
JavaRDD<String> words = input
.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> wordCounts = words
.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((a, b) -> a + b);
// 收集结果并打印
List<Tuple2<String, Integer>> output = wordCounts.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
// 关闭Spark上下文
sc.close();
}
}
- 输入
hello how are you
fine
hello you
- 输出
are: 1
fine: 1
you: 2
how: 1
hello: 2
例子:一批人员年龄数据求平均(rdd)
- java程序
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;
/**
* @Author mubi
* @Date 2025/8/14 23:39
*/
public class Main {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("AvgAge");
JavaSparkContext sc = new JavaSparkContext(conf);
//刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair
JavaRDD<String> fileRDD = sc.textFile("src/main/resources/peopleAges.txt");
final long peopleCnt = fileRDD.count();
// map: <年龄,数量1>
JavaPairRDD<Integer, Integer> ageOneRdd = fileRDD.mapToPair(new PairFunction<String, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(String s) throws Exception {
Integer age = Integer.parseInt(s.split("\\s+")[1]);
return new Tuple2(age, 1);
}
});
// reduce: <年龄,总数量>
JavaPairRDD<Integer, Integer> ageCountRDD = ageOneRdd
.reduceByKey((v1, v2) -> v1 + v2)
.sortByKey();
ageCountRDD.saveAsTextFile("src/main/resources/ageAnalysis");
//求总年龄和
long totalAgeSum = ageCountRDD
.map(tuple -> (long)tuple._1() * (long)tuple._2())
.reduce((a, b) -> a + b);
System.out.println("totalAgeSum:" + totalAgeSum + ",peopleCnt:" + peopleCnt);
//求平均年龄
System.out.println("all people avg age is:" + totalAgeSum * 1.0d / peopleCnt);
}
//生成年龄数据,格式"序号 年龄"
public static void makeAgeData() throws IOException {
File newFile = new File("src/main/resources/peopleAges.txt");
if (newFile.exists()){
newFile.delete();
}
newFile.createNewFile();
FileWriter fw = new FileWriter(newFile,true);
Random rand = new Random();
for (int i = 1; i <= 1000000; i++) {
fw.append(i + " " + (rand.nextInt(100) + 1) + "\n");
fw.flush();
}
fw.close();
System.out.println("makeAgeData finish");
}
}
例子文件:
1 20
2 30
3 40
4 50
5 60
6 60
7 50
8 40
例子输出
totalAgeSum:350, peopleCnt:8
all people avg age is:43.75
例子:求不同性别的最大/最小身高(Sql)
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;
/**
* @Author mubi
* @Date 2025/8/14 23:39
*/
public class Main {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession.builder()
.appName("HeightAnalysis")
.master("local[*]")
.getOrCreate();
Dataset<Row> df = spark.read()
.option("delimiter", " ")
.option("header", "false")
.csv("src/main/resources/peopleSexHeight.txt")
.toDF("id", "gender", "height")
.withColumn("height", functions.col("height").cast("int"));
System.out.println("=== 数据 ===");
df.show();
// 男性身高统计
Dataset<Row> maleStats = df.filter("gender = 'M'")
.agg(functions.max("height").alias("max_height"),
functions.min("height").alias("min_height"));
System.out.println("=== 男性身高统计 ===");
maleStats.show();
// 女性身高统计
Dataset<Row> femaleStats = df.filter("gender = 'F'")
.agg(functions.max("height").alias("max_height"),
functions.min("height").alias("min_height"));
System.out.println("=== 女性身高统计 ===");
femaleStats.show();
spark.stop();
}
public static void makeData() throws IOException {
File newFile = new File("src/main/resources/peopleSexHeight.txt");
if (newFile.exists()){
newFile.delete();
}
newFile.createNewFile();
FileWriter fw = new FileWriter(newFile,true);
Random rand = new Random();
final int N = 1_000_000;
for (int i = 1; i <= N; i++) {
String gender = getRandomGender();
int height = rand.nextInt(220);
if (height < 100 && gender.equals("M")) {
height = height + 100;
}
if (height < 100 && gender == "F") {
height = height + 50;
}
fw.append(i + " " + gender + " " + height + "\n");
fw.flush();
}
fw.close();
System.out.println("makeAgeData finish");
}
static String getRandomGender() {
Random rand = new Random();
int randNum = rand.nextInt(2) + 1;
if (randNum % 2 == 0) {
return "M";
} else {
return "F";
}
}
}
- 数据
1 M 153
2 F 64
3 M 107
4 F 83
5 F 131
6 M 174
7 F 86
8 M 115
9 M 191
10 F 208
- 输出
=== 男性身高统计 ===
+----------+----------+
|max_height|min_height|
+----------+----------+
| 191| 107|
+----------+----------+
=== 女性身高统计 ===
+----------+----------+
|max_height|min_height|
+----------+----------+
| 208| 64|
+----------+----------+
附rdd:
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("maxMinHeight");
JavaSparkContext sc = new JavaSparkContext(conf);
//刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair
JavaRDD<String> dataRdd = sc.textFile("src/main/resources/peopleSexHeight.txt");
// 处理男性数据
JavaRDD<Integer> maleHeights = dataRdd
.filter(line -> line.split(" ")[1].equalsIgnoreCase("M"))
.map(line -> Integer.parseInt(line.split(" ")[2]));
// 处理女性数据
JavaRDD<Integer> femaleHeights = dataRdd
.filter(line -> line.split(" ")[1].equalsIgnoreCase("F"))
.map(line -> Integer.parseInt(line.split(" ")[2]));
// 计算并打印结果
System.out.println("男性最高身高: " + maleHeights.max(Comparator.naturalOrder()));
System.out.println("男性最低身高: " + maleHeights.min(Comparator.naturalOrder()));
System.out.println("女性最高身高: " + femaleHeights.max(Comparator.naturalOrder()));
System.out.println("女性最低身高: " + femaleHeights.min(Comparator.naturalOrder()));
}
map/reduce求平均身高(rdd)
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("maxMinHeight");
JavaSparkContext sc = new JavaSparkContext(conf);
//刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair
JavaRDD<String> dataRdd = sc.textFile("src/main/resources/peopleSexHeight.txt");
// 处理男性数据
JavaRDD<Integer> maleHeights = dataRdd
.filter(line -> line.split(" ")[1].equalsIgnoreCase("M"))
.map(line -> Integer.parseInt(line.split(" ")[2]));
long count = maleHeights.count();
// 计算总和和计数
int totalHeight = maleHeights.reduce((a, b) -> a + b);
// 计算并打印平均身高
double average = (double) totalHeight / count;
System.out.printf("男性平均身高: %.2f cm\n", average);
}