Java Spark例子程序

发布于:2025-08-17 ⋅ 阅读:(16) ⋅ 点赞:(0)

spark基础&rdd

docs

docs: https://archive.apache.org/dist/spark/docs/3.1.1/

RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。(不可变、自动容错、位置感知性调度和可伸缩性)

RDD is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel

resilient 英[rɪˈzɪliənt]
adj. 有弹性(或弹力)的;有适应力的;能复原的;可迅速恢复的;
  1. 一组分片(Partition),即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。
  2. 一个计算每个分区的函数。Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
  3. RDD 之间的依赖关系。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。
  4. 一个 Partitioner,即 RDD 的分片函数。当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于 key-value 的 RDD,才会有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决定了 parent RDD Shuffle 输出时的分片数量。
  5. 一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

spark架构

在这里插入图片描述

  1. Driver(驱动程序)‌: 作为应用程序的主控进程,负责解析用户代码、生成DAG(有向无环图)并划分Stage。
  2. Cluster Manager(集群管理器)‌:管理集群资源,支持Standalone、YARN、Mesos和Kubernetes等模式
  3. Worker(工作节点)‌:在集群节点上运行,负责启动Executor进程并监控其状态。‌‌
  4. ‌Executor(执行器):在Worker节点上执行具体任务,管理内存缓存和磁盘I/O,定期向Driver发送心跳。‌‌
  5. ‌Task(任务)‌:最小执行单元,由Executor并行处理,每个Stage被拆分为多个Task。‌‌

特点

  • 内存计算‌:中间结果存储在内存中,相比Hadoop MapReduce减少磁盘I/O,部分速度能提升10-100倍。‌‌
  • 弹性分布式数据集(RDD)‌:基础数据结构,支持容错和并行计算,通过Lineage(血统、依赖链、依赖关系)机制恢复数据。‌‌
  • 多模式支持‌:兼容批处理、流处理(Spark Streaming)、SQL查询(Spark SQL)及机器学习(MLlib)。‌‌

Spark 对比 hadoop MapReduce

spark例子参考:https://xie.infoq.cn/article/71e6677d03b59ce7aa5eec22a

在这里插入图片描述
hadoop MapReduce参考:https://doctording.blog.csdn.net/article/details/78467216

对比hadoop MapReduce:
在这里插入图片描述
Map:
在这里插入图片描述
Reduce:
在这里插入图片描述

spark maven依赖

<!-- Spark core -->
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.12</artifactId>
     <version>3.1.1</version>
 </dependency>
 <!-- Spark SQL -->
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.12</artifactId>
     <version>3.1.1</version>
 </dependency>
 <!-- 如果需要使用 MLlib -->
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-mllib_2.12</artifactId>
     <version>3.1.1</version>
 </dependency>

Spark的checkpoint

spark 中 Checkpoint 的主要作用是斩断 RDD 的依赖链。
Checkpoint 的两种方式

  1. 可靠的将数据存储在可靠的存储引擎中,例如 HDFS。要将 RDD 缓存在本地 block manager 中,在 exquator中。如果 work 崩溃,数据消失。而 RDD 也能存储在 HDFS 中,即一种 checkpoint 可靠的方式,即将 RDD 的数据缓存到 HDFS 中。
  2. 本地的将数据存储在本地,称为 Local checkpoint,较少使用,和 RDD 的区别比较小。本地存储方式不太符合checkpoint 的思想。

https://developer.aliyun.com/article/1086764

https://developer.aliyun.com/article/1329824

transformations、shuffle、actions

transformation 英[ˌtrænsfəˈmeɪʃn]
n. (彻底的)变化,改观,转变,改革;转型;(用于南非)民主改革;

  • map
  • filter
  • flatMap
  • mapPartitions
  • mapPartitionsWithIndex
  • sample
  • union
  • intersection
  • distinct
  • groupByKey
  • reduceByKey
  • aggregateByKey
  • sortByKey
  • join
  • cogroup
  • cartesian
  • pipe
  • coalesce
  • repartition
  • repartitionAndSortWithinPartitions

shuffle 英[ˈʃʌfl]
v. 洗牌;洗(牌);拖着脚走;坐立不安;把(纸张等)变换位置,打乱次序;(笨拙或尴尬地)把脚动来动去;
n.洗牌;拖着脚走;曳步舞;

Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.


actions

  • collect()
  • count()
  • first()
  • take(n)
  • takeSample(withReplacement, num, [seed])
  • takeOrdered(n, [ordering])
  • saveAsTextFile(path)
  • saveAsSequenceFile(path)
  • saveAsObjectFile(path)
  • countByKey()
  • foreach(func)

reduceByKey的用法

值可合并的聚合操作

  • scala
val data = List(("a", 1), ("b", 2), ("a", 3), ("b", 4))
val rdd = sc.parallelize(data)
val result = rdd.reduceByKey(_ + _)  // 对相同key的值求和
result.collect().foreach(println)
  • 输出
(a,4)
(b,6)

核心特点

  1. 本地聚合优化‌:先在分区内合并数据(类似MapReduce的combiner),减少shuffle数据量‌
  2. 函数要求‌:聚合函数需满足结合律(如加法、最大值等)
  3. 性能对比‌:比groupByKey更高效,因后者不进行预聚合

典型场景

  1. 统计词频(单词计数)
  2. 分组求和/求平均值
  3. 数据去重后的聚合计算

groupByKey的用法

Spark中的groupByKey是一个针对键值对RDD的核心转换算子

核心机制

  1. 分组逻辑‌:将相同key的所有value合并为迭代器(Iterable),输出格式为(K, Iterable)。与reduceByKey不同,它仅进行分组而不执行聚合操作‌
  2. 执行过程‌:直接对全量数据进行shuffle,不进行本地预聚合,导致网络传输量较大。例如处理(“a”,1), (“a”,2)会直接传输所有键值对,而reduceByKey会先在分区内合并为(“a”,3)再shuffle。

典型应用场景

  1. 需保留所有值的场景‌:如统计每个用户的完整行为序列
  2. 非聚合操作‌:例如对分组后的值进行复杂处理(排序、去重等)

缺点

  1. 数据倾斜风险:单个key对应的value过多会导致OOM,如某key关联百万级记录:使用reduceByKey或aggregateByKey替代(若允许预聚合);对倾斜key单独处理(如加盐分片)
  2. 内存消耗:未压缩的迭代器对象会占用更多内存,建议配合mapValues转换为紧凑数据结构

例子

val data = sc.parallelize(Seq(("a",1), ("b",2), ("a",3)))
val grouped = data.groupByKey()  // 输出: (a, [1,3]), (b, [2])
grouped.foreach(println)

输出

(a,CompactBuffer(1, 3))
(b,CompactBuffer(2))

count / count distinct

count:

  1. 触发计算‌:count是行动算子,会立即触发作业执行
  2. 全量扫描‌:默认会扫描全表数据,大数据集可能耗时较长。
  3. 优化建议‌:
    • 对过滤后的数据统计(先filter再count)
    • 使用缓存(cache())避免重复计算
scala> val rdd = sc.parallelize(Seq(1, 2, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val total = rdd.count()  // 输出:3
total: Long = 3

scala>

  • count distinct例子
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

/**
 * @Author mubi
 * @Date 2025/8/14 23:39
 */
public class Main {
    public static void main(String[] args) throws Exception {
        // 初始化SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("countDistinct")
                .master("local[*]")
                .getOrCreate();


        // 定义Schema
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("fruit", DataTypes.StringType, false)
        });

        // 创建Row数据
        List<Row> data = Arrays.asList(
                RowFactory.create(1, "apple"),
                RowFactory.create(2, "banana"),
                RowFactory.create(3, "apple"),
                RowFactory.create(4, "orange"),
                RowFactory.create(5, "banana")
        );

        // 构建DataFrame
        Dataset<Row> df = spark.createDataFrame(data ,schema);

        // 计算不同水果数量
        long distinctFruitsCount = df.select("fruit")
                .distinct()
                .count();

        System.out.println("Number of distinct fruits: " + distinctFruitsCount);

        spark.stop();
    }
}

例子:单词计数

在这里插入图片描述

  • java程序
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * @Author mubi
 * @Date 2025/8/14 23:39
 */
public class Main {
    public static void main(String[] args) throws Exception {
        // 创建Spark配置和上下文
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 读取文本文件,这里使用Spark自带的示例文本文件
        JavaRDD<String> input = sc.textFile("src/main/resources/words.txt");

        // 将每行文本拆分成单词,并对单词进行计数
        JavaRDD<String> words = input
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator());
        JavaPairRDD<String, Integer> wordCounts = words
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((a, b) -> a + b);

        // 收集结果并打印
        List<Tuple2<String, Integer>> output = wordCounts.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        // 关闭Spark上下文
        sc.close();
    }


}
  • 输入
hello how are you
fine
hello you
  • 输出
are: 1
fine: 1
you: 2
how: 1
hello: 2

例子:一批人员年龄数据求平均(rdd)

  • java程序
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

/**
 * @Author mubi
 * @Date 2025/8/14 23:39
 */
public class Main {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("AvgAge");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair
        JavaRDD<String> fileRDD = sc.textFile("src/main/resources/peopleAges.txt");
        final long peopleCnt = fileRDD.count();

        // map: <年龄,数量1>
        JavaPairRDD<Integer, Integer> ageOneRdd = fileRDD.mapToPair(new PairFunction<String, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(String s) throws Exception {
                Integer age = Integer.parseInt(s.split("\\s+")[1]);
                return new Tuple2(age, 1);
            }
        });

        // reduce: <年龄,总数量>
        JavaPairRDD<Integer, Integer> ageCountRDD = ageOneRdd
                .reduceByKey((v1, v2) -> v1 + v2)
                .sortByKey();
        ageCountRDD.saveAsTextFile("src/main/resources/ageAnalysis");

        //求总年龄和
        long totalAgeSum = ageCountRDD
                .map(tuple -> (long)tuple._1() * (long)tuple._2())
                .reduce((a, b) -> a + b);
        System.out.println("totalAgeSum:" + totalAgeSum + ",peopleCnt:" + peopleCnt);


        //求平均年龄
        System.out.println("all people avg age is:" + totalAgeSum * 1.0d / peopleCnt);
    }

    //生成年龄数据,格式"序号 年龄"
    public static void makeAgeData() throws IOException {
        File newFile = new File("src/main/resources/peopleAges.txt");
        if (newFile.exists()){
            newFile.delete();
        }
        newFile.createNewFile();
        FileWriter fw = new FileWriter(newFile,true);
        Random rand = new Random();
        for (int i = 1; i <= 1000000; i++) {
            fw.append(i + " " + (rand.nextInt(100) + 1) + "\n");
            fw.flush();
        }
        fw.close();
        System.out.println("makeAgeData finish");
    }
}

例子文件:

1 20
2 30
3 40
4 50
5 60
6 60
7 50
8 40

例子输出

totalAgeSum:350, peopleCnt:8
all people avg age is:43.75

例子:求不同性别的最大/最小身高(Sql)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

/**
 * @Author mubi
 * @Date 2025/8/14 23:39
 */
public class Main {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("HeightAnalysis")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read()
                .option("delimiter", " ")
                .option("header", "false")
                .csv("src/main/resources/peopleSexHeight.txt")
                .toDF("id", "gender", "height")
                .withColumn("height", functions.col("height").cast("int"));
        System.out.println("=== 数据 ===");
        df.show();

        // 男性身高统计
        Dataset<Row> maleStats = df.filter("gender = 'M'")
                .agg(functions.max("height").alias("max_height"),
                        functions.min("height").alias("min_height"));
        System.out.println("=== 男性身高统计 ===");
        maleStats.show();

        // 女性身高统计
        Dataset<Row> femaleStats = df.filter("gender = 'F'")
                .agg(functions.max("height").alias("max_height"),
                        functions.min("height").alias("min_height"));
        System.out.println("=== 女性身高统计 ===");
        femaleStats.show();

        spark.stop();
    }

    public static void makeData() throws IOException {
        File newFile = new File("src/main/resources/peopleSexHeight.txt");
        if (newFile.exists()){
            newFile.delete();
        }
        newFile.createNewFile();
        FileWriter fw = new FileWriter(newFile,true);
        Random rand = new Random();
        final int N = 1_000_000;
        for (int i = 1; i <= N; i++) {
            String gender = getRandomGender();
            int height = rand.nextInt(220);
            if (height < 100 && gender.equals("M")) {
                height = height + 100;
            }
            if (height < 100 && gender == "F") {
                height = height + 50;
            }
            fw.append(i + " " + gender + " " + height + "\n");
            fw.flush();
        }
        fw.close();
        System.out.println("makeAgeData finish");
    }

    static String getRandomGender() {
        Random rand = new Random();
        int randNum = rand.nextInt(2) + 1;
        if (randNum % 2 == 0) {
            return "M";
        } else {
            return "F";
        }
    }
}
  • 数据
1 M 153
2 F 64
3 M 107
4 F 83
5 F 131
6 M 174
7 F 86
8 M 115
9 M 191
10 F 208
  • 输出
=== 男性身高统计 ===
+----------+----------+
|max_height|min_height|
+----------+----------+
|       191|       107|
+----------+----------+

=== 女性身高统计 ===
+----------+----------+
|max_height|min_height|
+----------+----------+
|       208|        64|
+----------+----------+

附rdd:

 public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("maxMinHeight");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair
        JavaRDD<String> dataRdd = sc.textFile("src/main/resources/peopleSexHeight.txt");

        // 处理男性数据
        JavaRDD<Integer> maleHeights = dataRdd
                .filter(line -> line.split(" ")[1].equalsIgnoreCase("M"))
                .map(line -> Integer.parseInt(line.split(" ")[2]));


        // 处理女性数据
        JavaRDD<Integer> femaleHeights = dataRdd
                .filter(line -> line.split(" ")[1].equalsIgnoreCase("F"))
                .map(line -> Integer.parseInt(line.split(" ")[2]));

        // 计算并打印结果
        System.out.println("男性最高身高: " + maleHeights.max(Comparator.naturalOrder()));
        System.out.println("男性最低身高: " + maleHeights.min(Comparator.naturalOrder()));
        System.out.println("女性最高身高: " + femaleHeights.max(Comparator.naturalOrder()));
        System.out.println("女性最低身高: " + femaleHeights.min(Comparator.naturalOrder()));
    }

map/reduce求平均身高(rdd)

public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf();
    conf.setMaster("local");
    conf.setAppName("maxMinHeight");
    JavaSparkContext sc = new JavaSparkContext(conf);
    //刚从文件读出来的RDD已经是一行一行的字符串,所以可以直接进行mapToPair
    JavaRDD<String> dataRdd = sc.textFile("src/main/resources/peopleSexHeight.txt");

    // 处理男性数据
    JavaRDD<Integer> maleHeights = dataRdd
            .filter(line -> line.split(" ")[1].equalsIgnoreCase("M"))
            .map(line -> Integer.parseInt(line.split(" ")[2]));

    long count = maleHeights.count();
    // 计算总和和计数
    int totalHeight = maleHeights.reduce((a, b) -> a + b);

    // 计算并打印平均身高
    double average = (double) totalHeight / count;
    System.out.printf("男性平均身高: %.2f cm\n", average);
}

网站公告

今日签到

点亮在社区的每一天
去签到