SparkKV转换算子实战解析

发布于:2025-08-06 ⋅ 阅读:(15) ⋅ 点赞:(0)


目录

KV算子

parallelizePairs

mapToPair

mapValues

groupByKey

reduceByKey

sortByKey

算子应用理解

reduceByKey和groupByKey的区别

groupByKey+mapValues实现KV数据的V的操作

改进用reduceByKey

groupby通过K和通过V分组的模板代码

问题集锦

宝贵的经验


这里会讲到之前还未讲到过的KV算子。我们之前的操作都是单值操作,这一篇我们会着重讲到KV操作、行动算子和持久化等知识。

KV算子

作用:操作KV流数据,能够分别操作K和V

出现JavaPairRDD就表示出现了成对KV数据流

parallelizePairs

作用:封装Tuple2集合形成RDD

细节源码如下


 

mapToPair


作用:配合parallelizePairs方法
1.单值数据转化成KV对数据
2.Tuple元组整体转化成KV键值对形式

 


两者一起的代码

        JavaPairRDD<String, Integer> JRD = sc.parallelizePairs(Arrays.asList(a, a1, a2, a3));
        JRD.mapToPair( tuple -> new Tuple2<>(tuple._1, tuple._2*2))
                .collect().forEach(System.out::println);

mapValues

作用:K不变,操作KV流中的V,并且只要类型是JavaPairRDD就可以用此方法

        

示意图

        
代码实现

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        sc.parallelize(list)
                .mapToPair(integer -> new Tuple2<Integer,Integer>(integer, integer * 2))
                .mapValues(int1 -> int1 * 2)
                .collect()
                .forEach(System.out::println);

这里配合一个wordcount案例加深一下理解


思考链条:
读取文件textFile --> flatmap扁平化流数据(String[] -> String)->groupby分组 ->mapValues按照V来计算


代码

//TODO 写一个wordcount
        JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));

        JavaRDD<String> rdd = javaSparkContext.textFile("E:\\ideaProjects\\spark_project\\data\\wordcount");

        rdd.flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String s) throws Exception {
                        return Arrays.asList(s.split(" ")).iterator();
                    }
                })
                .groupBy(n -> n)
                .mapValues(
                        iter -> {
                            int sum = 0;
                            for (String word : iter) {
                                sum++;
                            }
                            return sum;
                        }
                ).collect().forEach(System.out::println);
        javaSparkContext.close();

 所以,整个转换过程是:
   - 输入:一行字符串(`String`)
   - 用`split`方法:将该行字符串分割成字符串数组(`String[]`)
   - 用`Arrays.asList`:将字符串数组转换为字符串列表(`List<String>`)
   - 调用列表的`iterator`方法:得到字符串的迭代器(`Iterator<String>`)
   - 在`flatMap`中,Spark会遍历这个迭代器,将每个字符串(单词)作为新元素放入结果RDD。

flatmap本质:都是将数组转换成一个可以逐个访问其元素的迭代器

groupByKey

作用:将KV对按照K对V进行分组


代码实现
 

        JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));
        Tuple2<String, Integer> a = new Tuple2<>("a", 1);
        Tuple2<String, Integer> b = new Tuple2<>("b", 2);
        Tuple2<String, Integer> c = new Tuple2<>("a", 3);
        Tuple2<String, Integer> d = new Tuple2<>("b", 4);

        javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
                .collect().forEach(System.out::println);
        System.out.println();
        javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
                        .groupByKey(3).collect().forEach(System.out::println);

reduceByKey

作用:在KV对中,按照K对V进行聚合操作,(底层会在分区内进行预聚合优化)


代码实现
对二元组进行按照K对V相加的聚合操作

        

                javaSparkContext.parallelizePairs(tuple2s)
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                })
                .collect().forEach(System.out::println);

sortByKey

        
作用:按照K进行XXX的升序/降序排列

代码实现

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));
        Tuple2<String, Integer> aa0 = new Tuple2<>("a", 4);
        Tuple2<String, Integer> aa1 = new Tuple2<>("a", 1);
        Tuple2<String, Integer> aa2 = new Tuple2<>("a", 2);
        Tuple2<String, Integer> bb1 = new Tuple2<>("b", 2);
        Tuple2<String, Integer> aa3 = new Tuple2<>("a", 3);
        Tuple2<String, Integer> bb2 = new Tuple2<>("b", 1);
        ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>(Arrays.asList(aa0,aa1, aa2, aa3, bb1, bb2));
        javaSparkContext.parallelizePairs(tuple2s)
                        .sortByKey().collect().forEach(System.out::println);

        javaSparkContext.close();

传入参数为false时



Comparable接口的使用

利用自定义类型进行排序操作

        JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));
        Artist at1 = new Artist("小王", 100);
        Artist at2 = new Artist("小李", 1000);
        Artist at3 = new Artist("小赵", 10000);
        Artist at4 = new Artist("小宇", 100000);
        Tuple2<Artist, Integer> artistIntegerTuple2 = new Tuple2<>(at1, 1);
        Tuple2<Artist, Integer> artistIntegerTuple3 = new Tuple2<>(at2, 2);
        Tuple2<Artist, Integer> artistIntegerTuple4 = new Tuple2<>(at3, 3);
        Tuple2<Artist, Integer> artistIntegerTuple5 = new Tuple2<>(at4, 4);

        JavaPairRDD<Artist, Integer> artistIntegerJavaPairRDD = javaSparkContext.parallelize(Arrays.asList(artistIntegerTuple2, artistIntegerTuple3, artistIntegerTuple4, artistIntegerTuple5))
                .mapToPair(t -> t);

        artistIntegerJavaPairRDD.sortByKey().collect().forEach(System.out::println);

        javaSparkContext.close();

class Artist implements Serializable, Comparable<Artist> {
    String name;
    int salary;

    public Artist(String name, int salary) {
        this.name = name;
        this.salary = salary;
    }

    @Override
    public int compareTo(Artist o) {
        return o.salary - this.salary;
    }

    @Override
    public String toString() {
        return "Artist{" +
                "name='" + name + '\'' +
                ", salary=" + salary +
                '}';
    }
}

coalesce

        
作用:缩减分区,不会自动进行shuffle


示意图


代码实现

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        javaSparkContext.parallelize(tuple2s)
                .coalesce(2).collect().forEach(System.out::println);
        javaSparkContext.close();


repartition

        
作用:调整分区数,等价于coalesce的shuffle=true时

示意图


代码实现
 

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        javaSparkContext.parallelize(tuple2s)
                .repartition(2).saveAsTextFile("out2");
        javaSparkContext.close();

算子应用理解

reduceByKey和groupByKey的区别

 

性能更高:在shuffle之前有一个预聚合的功能Combine,可以将分区中的小文件合并,减少shuffle落盘的数据量
因此在实际开发中





groupByKey+mapValues实现KV数据的V的操作

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));
        Tuple2<String, Integer> a = new Tuple2<>("a", 1);
        Tuple2<String, Integer> b = new Tuple2<>("b", 2);
        Tuple2<String, Integer> c = new Tuple2<>("a", 3);
        Tuple2<String, Integer> d = new Tuple2<>("b", 4);
        
        System.out.println();
        javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
                        .groupByKey(3).mapValues(new Function<Iterable<Integer>, Integer>() {
                    @Override
                    public Integer call(Iterable<Integer> v1) throws Exception {
                        int sum = 0;
                        for (Integer v2 : v1) {
                            sum += v2;
                        }
                        return sum;
                    }
                }).collect().forEach(System.out::println);

        javaSparkContext.close();

改进用reduceByKey

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));
        Tuple2<String, Integer> a = new Tuple2<>("a", 1);
        Tuple2<String, Integer> b = new Tuple2<>("b", 1);
        Tuple2<String, Integer> c = new Tuple2<>("a", 2);
        Tuple2<String, Integer> d = new Tuple2<>("b", 2);
        ArrayList<Tuple2<String, Integer>> tuple2s = new ArrayList<>(Arrays.asList(a, b, c, d));
javaSparkContext.parallelizePairs(tuple2s)
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                })
                .collect().forEach(System.out::println);

        javaSparkContext.close();

groupby通过K和通过V分组的模板代码
 

JavaSparkContext javaSparkContext = new JavaSparkContext(new SparkConf().setMaster("local[*]").setAppName("artical4"));
        Tuple2<String, Integer> a = new Tuple2<>("a", 1);
        Tuple2<String, Integer> b = new Tuple2<>("b", 1);
        Tuple2<String, Integer> c = new Tuple2<>("a", 2);
        Tuple2<String, Integer> d = new Tuple2<>("b", 2);

        System.out.println();
        javaSparkContext.parallelizePairs(Arrays.asList(a, b, c, d))
                .groupBy(new Function<Tuple2<String, Integer>, Integer>() {
                    @Override
                    public Integer call(Tuple2<String, Integer> v1) throws Exception {
                        return v1._2(); //通过Values分组 将2改为1就是通过K分组
                    }
                })
                        .collect().forEach(System.out::println);

        javaSparkContext.close();

数据转换图

问题集锦


1.iterator迭代器怎么迭代,它在mapValues方法中的传出类型是iterator类型,并且在将Lambda和匿名内部类互转的时候注意传出泛型即可。(其中封装了两种迭代方法)

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = sc.parallelize(list)
                .groupBy(n -> n % 2, 2);
        groupByRDD.mapValues(
                new Function<Iterable<Integer>, Integer>() {
                    public Integer call(Iterable<Integer> integers) {
                        int sum = 0;
                        Iterator<Integer> iterator = integers.iterator();
                        while (iterator.hasNext()) {
                            sum += iterator.next();
                        }
                        return sum;
//                        int sum = 0;
//                        for (Integer i : integers) {
//                            sum += i;
//                        }
//                        return sum;
                    }
                }
        ).collect().forEach(System.out::println);

宝贵的经验

1.function函数传入泛型不能修改,但是传出泛型可以修改



2.正则表达式可以通过中括号将多次分割的逻辑封装到一行代码中



3.RDD采用了和javaIO一样的设计模式-装饰者设计模式,将对象嵌套实现功能



网站公告

今日签到

点亮在社区的每一天
去签到