Spark应用开发--WordCount实战

发布于:2025-06-25 ⋅ 阅读:(21) ⋅ 点赞:(0)

前言

RDD

转换算子

行动算子

1.RDD创建

1.1用sparkContext.parallelize(...)方法创建RDD

1.2用sparkContext.textFile(...)创建RDD

2.行动算子之collect

3.行动算子之take

4.转换算子之map

5.转换算子之flatMap

6.转换算子之sortBy


前言

RDD

RDD是弹性分布式数据集(Resilient Distributed Dataset, RDD)英文缩写,它是Spark编程中最核心的数据对象

RDD具有不可变可容错可伸缩分布式易用(Spark的计算过程可以简单抽象为对RDD的创建、转换操作和行动操作的过程)等特点。

转换算子

对已有的RDD中的数据执行转换,并产生新的RDD的方法,即为转换(Transformation)算子。

转换算子采用延迟计算机制。如果RDD的行动算子没有被调用,那么转换算子就不会被执行

常见的转换算子有map、flatMap、filter、groupByKey、cache等方法。

行动算子

对已有的RDD中的数据执行计算并产生结果,将结果返回Driver程序或者写入到外部物理存储(如HDFS)的方法,即为行动(Action)算子

常见的行动算子有reduce、collect、take、saveAsTextFile等方法。

1.RDD创建

1.1用sparkContext.parallelize(...)方法创建RDD

parallelize方法的第二个参数是指定分区数,也就是将RDD对应的数据分成多少份来处理。

一个RDD在物理上被切分成多个分区,这些分区可以被分布在不同的节点

每一个分区,都会被分配一个task任务来处理,有多少分区,就会有多少task任务,因此分区数决定了并行度(通俗来说,分区越多,并发相对就越高)。

1.2用sparkContext.textFile(...)创建RDD

说明:(1)textFile(...)的第二个参数可以指定最小分区数。默认情况,Spark为HDFS的每个数据块创建一个分区。

  1. textFile(...)也可以读取本地文件系统:例如:sc.textFile(“D:\\hello.txt”)
  2. textFile(...)的第一个参数,可以是具体的文件名,也支持通配符,如:sc.textFile(“D:\\*.txt”),*就是通配符,表示读取D盘下的所有文件名后缀为.txt的文件。

2.行动算子之collect

在测试代码时经常使用该算子。collect算子的作用是从Executor侧将RDD的所有数据取回给 Driver程序(即我们编写的Spark应用程序即为Driver程序)。从代码上看,就是将RDD的所有数据赋值给main方法的某个List<T>类型的局部变量

Executor侧:将spark任务提交到YARN集群运行时(如:spark-submit --master=yarn spark-demo.jar yarn),RDD底层分区是由集群中不同节点上的Executor程序分别处理的,Executor程序所在的节点即可理解为Executor侧。

注意:如果RDD数据量很大,比如几百G,那么采用collect算子取回数据就可能带来内存溢出(OOM,out of memory)的风险(一般运行Driver进程被分配的内存也就一两个G)。

3.行动算子之take

take算子与collect方法类似,collect是返回RDD的所有元素,而take返回指定的前N个数据

以下为SparkDemoAPP代码示例

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
 
import java.util.List;
 
public class SparkDemoApp {
    public static void main(String[] args) {
 
        /**
         * 第一个Spark应用程序
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster(args[0])
                .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/hellospark1.txt");
        rdd.foreach(new VoidFunction<String>() {
                        @Override
                        public void call(String s) throws Exception {
                            System.out.println(s);
                            System.out.println("hhhhhhhhhhhhhhhhhhhhh");
                        }
                    }
        );
        sc.stop();
         **/
 
        /**
         * 4.2.1 parallelize
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster(args[0])
                .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
 
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        List<String> data = Arrays.asList("hello word", "hello hadoop", "hello spark");
        JavaRDD<String> rdd3 = sc.parallelize(data , 3); //[[hello word], [hello hadoop], [hello spark]]
        System.out.println(rdd3.glom().collect());
 
        JavaRDD<String> rdd2 = sc.parallelize(data , 2); //[[hello word], [hello hadoop, hello spark]]
        System.out.println(rdd2.glom().collect());
 
        JavaRDD<String> rdd1 = sc.parallelize(data , 1); //[[hello word, hello hadoop, hello spark]]
        System.out.println(rdd1.glom().collect());
         **/
 
        /**
         * 4.2.2 textFile
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster(args[0])
                .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = sc.textFile("E:\\hsn(勿删,2024-2025下学期仍需使用)\\spark\\hellospark1.txt");
        System.out.println(rdd.glom().collect());
         **/
 
        /**
         * 4.2.3 collect
         SparkConf sparkConf = new SparkConf()
         .setAppName("spark_demo2")
         .setMaster(args[0])
         .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
 
         JavaSparkContext sc = new JavaSparkContext(sparkConf);
         JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/hellospark1.txt");
 
         // collect 容易内存溢出,生产环境不要用 1024GB=1TB,1024TB=1PB
         List<String> res = rdd.collect();// ["a a a a" , "bb bb b b b" , "CCCC C cc cc cc" , "123 123 1123 123 1123" , "hsn hsn"]
         for (String re : res) {
         System.out.println(re);
         }
 
         // foreach 可以避免内存溢出
         rdd.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
        System.out.println(s);
        }
        });
         */
 
        /**
         * 4.2.4 take
         */
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster(args[0])
                .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
 
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/hellospark1.txt");
        List<String> res = rdd.take(2);
        for (String re : res) {
            System.out.println(re);
        }
 
    }
}

4.转换算子之map

map操作是最常用的转换算子,其将RDD的每个元素传入自定义函数后得到新的元素,并用新的元素组成新的RDD。

示例:将一串#号加在文件每一行内容后面,并将修改后的内容输出到控制台

	import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
 
import java.util.List;
 
public class MapTest {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster("local")
                .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
 
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
        // 原本的rdd算子
        JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/data/num.txt");
 
        // 新的rdd算子
        // 写到JavaRDD<String> newrdd = rdd.map()的时候,在map()里写入new,自带出的Function<String, String>()方法
        JavaRDD<String> newrdd = rdd.map(new Function<String, String>() {
            /** 课本map案例
             *
             * @param s
             * @return s
             * @throws Exception 课本map案例
 
            @Override
            public String call(String s) throws Exception { //s => a a a a
 
                 // 自定义的函数
                 System.out.println("+++++++++");
                 s = s + "######";
 
                 //第一次循环
                 //+++++++++
                 //a a a a######
 
                 // 返回新的算子
                 return s;
 
            }*/
 
 
            /** spark的map方法转化成原MapReduce的map方法
             *
             * @param s
             * @return
             * @throws Exception spark的map方法转化成原MapReduce的map方法
 
            @Override
            public String call(String s) throws Exception { //s => a a a a
                String news="";
                String[] words = s.split(" "); //a a a a => ["a","a","a","a"]
                for (String word : words) {
                    // word循环结果
                    // a#####
                    // a#####
                    // a#####
                    // a#####
                    word += "#####";
 
                    // news拼接结果 a##### a##### a##### a#####
                    news = news + word+" ";
                }
                //news = news.substring(0,news.length()-1);
                return news;
            }*/
 
 
            /** 4.5课堂练习,每个文本的每行的每个数字+10
             *
             * @param s
             * @return
             * @throws Exception
             **/
            @Override
            public String call(String s) throws Exception { //s => 1 2 3 4
                String news="";
                String[] words = s.split(" "); //1 2 3 4 => ["1","2","3","4"]
                for (String word : words) {
                    Integer num = 0;
                    // num(word)循环结果
                    // 11
                    // 12
                    // 13
                    // 14
                    num = Integer.parseInt(word) + 10;
 
                    // news拼接结果 11 12 13 14
                    news = news + num + " ";
                }
                //news = news.substring(0,news.length()-1);
                return news;
            }
        });
 
        List<String> res = newrdd.collect();
        for (String re : res) {
            System.out.println(re);
        }
    }
}

5.转换算子之flatMap

将RDD的所有元素传入自定义的函数后,返回一个迭代器对象,然后将所有返回的迭代器的所有的元素构造成一个新的RDD

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
 
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
 
public class FlatMapTest {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster("local")
                .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
 
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
        // 原本的rdd算子
        JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/data/num.txt");
 
        JavaRDD<String> newrdd = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                /**
                 * 第三步(其实第二步就可以了)
                 * 再写详细点
                 * 1、把输入的s,使用split方法完成切割
                 * 2、切割完成后,转化成迭代器类型Iterator<String>
                 *      2.1、写入Word.能找到转换结果的提示,找到Iterator<String>类型的一行
                 *      2.2、点击选择Iterator<String>类型的一行,自动包装成Arrays.stream(words).iterator()
                 *      2.3、写完整初始化(Iterator<String>)+自定义变量(Word)+赋值的过程(= Arrays.stream(words).iterator();)
                 *      2。4、完整的结果:Iterator<String> Word = Arrays.stream(words).iterator();
                 * 3、完成return的填入
                 */
                String[] words = s.split(" ");
                Iterator<String> Word = Arrays.stream(words).iterator();
                return Word;
 
                /**
                 * 第二步
                 * 分析了课本的内容,本质上还是需要对s进行split使用空格完成分割
                 * 所以,做一个数组类型,String[] words = s.split(" ");
                 * 第一次把words写入return里,return words;查看到需要的是Iterator<String>类型,但是我们提供的是String[]类型
                 * 必然需要让words转化格式,尝试在words后面输入一个点.      查看提示,   return words.;
                 * 出现了一个提示列表,并且看到一个stream().iterator()的提示词,并且它是一个Iterator<String>类型
                 * 最终点击它选择,IDEA自动完成了数据类型的转化
                 * 由写入的return words.  变成了  return Arrays.stream(words).iterator();
                 */
                //String[] words = s.split(" ");
                //return words;
                //return Arrays.stream(words).iterator();
 
                /**
                 * 第一步,分析原课本返回的内容,就是return Arrays.asList(s.split(" ")).iterator();
                 */
                //return Arrays.asList(s.split(" ")).iterator();
            }
        });
 
        List<String> res = newrdd.collect();
        for (String re : res) {
            System.out.println(re);
        }
    }
}

6.转换算子之sortBy

sortBy参数:

Function<T, S> f  :  sortBy根据Function<T,S>对象的call方法的返回值来对RDD元素进行排序

boolean ascending:  true表示升序,false表示降序

int numPartitions:  指定分区数,尽量和父RDD的分区数保持一致。RDD对象的getNumPartitions()可以获得分区数。

举例:自定义一个List<User>类型对象(User含有一个年龄属性和一个课程成绩属性),然后使用sortBy对这些User按照课程成绩由大到小排序,成绩相同的则按照年龄由小到大排序,并将排序后的结果输出到控制台。

List<User> =>(复合类型)

{

User3(25, 85),
User3(20, 90),
User3(22, 90),
User3(18, 95),
User3(30, 85)

}

第一个,单字段排序案例代码:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.sources.In;
 
import java.util.Arrays;
import java.util.List;
 
public class SortByTest {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster("local")
                .set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/data/num.txt");
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(9,10,1,2,3,4,5,6,7),2);
 
        /*JavaRDD<Integer> rdd1 = rdd.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return Integer.parseInt(s);
            }
        });*/
 
        // sortBy算子 有三个参数
        // 第一个参数,call(),返回结果,填入new F有提示,后续需要导入具体实现函数
        // 第二个参数,写入true或者false,控制升序或者降序
        // 第三个参数,写入一个整数,控制分区数
        JavaRDD<Integer> newrdd = rdd.sortBy(new Function<Integer, Object>() {
            @Override
            public Object call(Integer s) throws Exception {
                return s;
            }
        }, false, 2);
 
        System.out.println("带glom()方法:"+newrdd.glom().collect());
 
        // 行动算子
        List<Integer> res = newrdd.collect();
        for (Integer re : res) {
            System.out.println(re);
        }
    }
}

第二个,双字段排序(类):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
 
// 1、user用户类
class User3 implements Serializable {
    private int age;
    private int score;
 
    //构造函数,实现函数
    public User3(int age, int score) {
        this.age = age;
        this.score = score;
    }
 
    public User3(){
 
    }
 
 
    public int getAge() { return age; }
    public int getScore() { return score; }
 
    @Override
    public String toString() {
        return "User3 = {" +
                "age=" + age +
                ", score=" + score +
                '}';
    }
}
 
public class SortByTest3 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("spark_demo2")
                .setMaster("local")
                .set("spark.driver.bindAddress", "0.0.0.0");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
        // 初始化users类数据集,Arrays.asList方法快速简洁地新增数据
        List<User3> users = Arrays.asList(
                new User3(25, 85),
                new User3(22, 90),
                new User3(20, 90),
                new User3(18, 95),
                new User3(30, 85)
        );
        JavaRDD<User3> userRDD = sc.parallelize(users, 2);
 
        // 转化算子
        JavaRDD<User3> sortedRDD = userRDD.sortBy(
                new Function<User3, String>() {
                    @Override
                    public String call(User3 user) throws Exception {
                        // 如果是多个排序,返回值必须包含多个变量,并按顺序拼接填入
                        // 如,先按成绩排序,再按年龄排序的话,返回值应该是按照成绩字段在前、年龄字段在后的方式返回
                        // 当前案例是返回一个String类型,就应该是return user.getScore()+"--"+user.getAge()+"";
                        return user.getScore()+user.getAge()+"";//因为前面是user类的字段属性是int类型,返回string类型的话加一个空字符串“”能自动转换
                        /**return user.getAge()+"--"+user.getScore()+"";
                         * 排序结果:
                         * User3 = {age=18, score=95}
                         * User3 = {age=20, score=90}
                         * User3 = {age=22, score=90}
                         * User3 = {age=25, score=85}
                         * User3 = {age=30, score=85}
                         */
 
                        /**跟前一个,返回的值有前后顺序更改
                         * return user.getScore()+"--"+user.getAge()+"";
                         * 排序结果:
                         * User3 = {age=25, score=85}
                         * User3 = {age=30, score=85}
                         * User3 = {age=20, score=90}
                         * User3 = {age=22, score=90}
                         * User3 = {age=18, score=95}
                         */
 
                        /**return user.getScore()+"";
                         * 排序结果:
                         * User3 = {age=25, score=85}
                         * User3 = {age=30, score=85}
                         * User3 = {age=22, score=90}
                         * User3 = {age=20, score=90}
                         * User3 = {age=18, score=95}
                         */
                    }
                },
                true, // 保持升序(实际排序由CompositeKey的compareTo控制)
                2
        );
 
        System.out.println("glom()结果:"+sortedRDD.glom().collect());
 
        // 行动算子
        List<User3> results = sortedRDD.collect();
        System.out.println("排序结果:");
        for (User3 result : results) {
            System.out.println(result.getAge()+"-??-"+result.getScore()+"!!"); // 直接打印对象(已重写toString)
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到