1.1用sparkContext.parallelize(...)方法创建RDD
1.2用sparkContext.textFile(...)创建RDD
前言
RDD
RDD是弹性分布式数据集(Resilient Distributed Dataset, RDD)英文缩写,它是Spark编程中最核心的数据对象。
RDD具有不可变、可容错、可伸缩、分布式、易用(Spark的计算过程可以简单抽象为对RDD的创建、转换操作和行动操作的过程)等特点。
转换算子
对已有的RDD中的数据执行转换,并产生新的RDD的方法,即为转换(Transformation)算子。
转换算子采用延迟计算机制。如果RDD的行动算子没有被调用,那么转换算子就不会被执行。
常见的转换算子有map、flatMap、filter、groupByKey、cache等方法。
行动算子
对已有的RDD中的数据执行计算并产生结果,将结果返回Driver程序或者写入到外部物理存储(如HDFS)的方法,即为行动(Action)算子
常见的行动算子有reduce、collect、take、saveAsTextFile等方法。
1.RDD创建
1.1用sparkContext.parallelize(...)方法创建RDD
parallelize方法的第二个参数是指定分区数,也就是将RDD对应的数据分成多少份来处理。
一个RDD在物理上被切分成多个分区,这些分区可以被分布在不同的节点。
每一个分区,都会被分配一个task任务来处理,有多少分区,就会有多少task任务,因此分区数决定了并行度(通俗来说,分区越多,并发相对就越高)。
1.2用sparkContext.textFile(...)创建RDD
说明:(1)textFile(...)的第二个参数可以指定最小分区数。默认情况,Spark为HDFS的每个数据块创建一个分区。
- textFile(...)也可以读取本地文件系统:例如:sc.textFile(“D:\\hello.txt”)
- textFile(...)的第一个参数,可以是具体的文件名,也支持通配符,如:sc.textFile(“D:\\*.txt”),*就是通配符,表示读取D盘下的所有文件名后缀为.txt的文件。
2.行动算子之collect
在测试代码时经常使用该算子。collect算子的作用是从Executor侧将RDD的所有数据取回给 Driver程序(即我们编写的Spark应用程序即为Driver程序)。从代码上看,就是将RDD的所有数据赋值给main方法的某个List<T>类型的局部变量。
Executor侧:将spark任务提交到YARN集群运行时(如:spark-submit --master=yarn spark-demo.jar yarn),RDD底层分区是由集群中不同节点上的Executor程序分别处理的,Executor程序所在的节点即可理解为Executor侧。
注意:如果RDD数据量很大,比如几百G,那么采用collect算子取回数据就可能带来内存溢出(OOM,out of memory)的风险(一般运行Driver进程被分配的内存也就一两个G)。
3.行动算子之take
take算子与collect方法类似,collect是返回RDD的所有元素,而take返回指定的前N个数据
以下为SparkDemoAPP代码示例
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
public class SparkDemoApp {
public static void main(String[] args) {
/**
* 第一个Spark应用程序
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster(args[0])
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/hellospark1.txt");
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
System.out.println("hhhhhhhhhhhhhhhhhhhhh");
}
}
);
sc.stop();
**/
/**
* 4.2.1 parallelize
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster(args[0])
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
List<String> data = Arrays.asList("hello word", "hello hadoop", "hello spark");
JavaRDD<String> rdd3 = sc.parallelize(data , 3); //[[hello word], [hello hadoop], [hello spark]]
System.out.println(rdd3.glom().collect());
JavaRDD<String> rdd2 = sc.parallelize(data , 2); //[[hello word], [hello hadoop, hello spark]]
System.out.println(rdd2.glom().collect());
JavaRDD<String> rdd1 = sc.parallelize(data , 1); //[[hello word, hello hadoop, hello spark]]
System.out.println(rdd1.glom().collect());
**/
/**
* 4.2.2 textFile
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster(args[0])
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = sc.textFile("E:\\hsn(勿删,2024-2025下学期仍需使用)\\spark\\hellospark1.txt");
System.out.println(rdd.glom().collect());
**/
/**
* 4.2.3 collect
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster(args[0])
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/hellospark1.txt");
// collect 容易内存溢出,生产环境不要用 1024GB=1TB,1024TB=1PB
List<String> res = rdd.collect();// ["a a a a" , "bb bb b b b" , "CCCC C cc cc cc" , "123 123 1123 123 1123" , "hsn hsn"]
for (String re : res) {
System.out.println(re);
}
// foreach 可以避免内存溢出
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
*/
/**
* 4.2.4 take
*/
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster(args[0])
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/hellospark1.txt");
List<String> res = rdd.take(2);
for (String re : res) {
System.out.println(re);
}
}
}
4.转换算子之map
map操作是最常用的转换算子,其将RDD的每个元素传入自定义函数后得到新的元素,并用新的元素组成新的RDD。
示例:将一串#号加在文件每一行内容后面,并将修改后的内容输出到控制台
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.List;
public class MapTest {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster("local")
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// 原本的rdd算子
JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/data/num.txt");
// 新的rdd算子
// 写到JavaRDD<String> newrdd = rdd.map()的时候,在map()里写入new,自带出的Function<String, String>()方法
JavaRDD<String> newrdd = rdd.map(new Function<String, String>() {
/** 课本map案例
*
* @param s
* @return s
* @throws Exception 课本map案例
@Override
public String call(String s) throws Exception { //s => a a a a
// 自定义的函数
System.out.println("+++++++++");
s = s + "######";
//第一次循环
//+++++++++
//a a a a######
// 返回新的算子
return s;
}*/
/** spark的map方法转化成原MapReduce的map方法
*
* @param s
* @return
* @throws Exception spark的map方法转化成原MapReduce的map方法
@Override
public String call(String s) throws Exception { //s => a a a a
String news="";
String[] words = s.split(" "); //a a a a => ["a","a","a","a"]
for (String word : words) {
// word循环结果
// a#####
// a#####
// a#####
// a#####
word += "#####";
// news拼接结果 a##### a##### a##### a#####
news = news + word+" ";
}
//news = news.substring(0,news.length()-1);
return news;
}*/
/** 4.5课堂练习,每个文本的每行的每个数字+10
*
* @param s
* @return
* @throws Exception
**/
@Override
public String call(String s) throws Exception { //s => 1 2 3 4
String news="";
String[] words = s.split(" "); //1 2 3 4 => ["1","2","3","4"]
for (String word : words) {
Integer num = 0;
// num(word)循环结果
// 11
// 12
// 13
// 14
num = Integer.parseInt(word) + 10;
// news拼接结果 11 12 13 14
news = news + num + " ";
}
//news = news.substring(0,news.length()-1);
return news;
}
});
List<String> res = newrdd.collect();
for (String re : res) {
System.out.println(re);
}
}
}
5.转换算子之flatMap
将RDD的所有元素传入自定义的函数后,返回一个迭代器对象,然后将所有返回的迭代器的所有的元素构造成一个新的RDD
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class FlatMapTest {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster("local")
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// 原本的rdd算子
JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/data/num.txt");
JavaRDD<String> newrdd = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
/**
* 第三步(其实第二步就可以了)
* 再写详细点
* 1、把输入的s,使用split方法完成切割
* 2、切割完成后,转化成迭代器类型Iterator<String>
* 2.1、写入Word.能找到转换结果的提示,找到Iterator<String>类型的一行
* 2.2、点击选择Iterator<String>类型的一行,自动包装成Arrays.stream(words).iterator()
* 2.3、写完整初始化(Iterator<String>)+自定义变量(Word)+赋值的过程(= Arrays.stream(words).iterator();)
* 2。4、完整的结果:Iterator<String> Word = Arrays.stream(words).iterator();
* 3、完成return的填入
*/
String[] words = s.split(" ");
Iterator<String> Word = Arrays.stream(words).iterator();
return Word;
/**
* 第二步
* 分析了课本的内容,本质上还是需要对s进行split使用空格完成分割
* 所以,做一个数组类型,String[] words = s.split(" ");
* 第一次把words写入return里,return words;查看到需要的是Iterator<String>类型,但是我们提供的是String[]类型
* 必然需要让words转化格式,尝试在words后面输入一个点. 查看提示, return words.;
* 出现了一个提示列表,并且看到一个stream().iterator()的提示词,并且它是一个Iterator<String>类型
* 最终点击它选择,IDEA自动完成了数据类型的转化
* 由写入的return words. 变成了 return Arrays.stream(words).iterator();
*/
//String[] words = s.split(" ");
//return words;
//return Arrays.stream(words).iterator();
/**
* 第一步,分析原课本返回的内容,就是return Arrays.asList(s.split(" ")).iterator();
*/
//return Arrays.asList(s.split(" ")).iterator();
}
});
List<String> res = newrdd.collect();
for (String re : res) {
System.out.println(re);
}
}
}
6.转换算子之sortBy
sortBy参数:
Function<T, S> f : sortBy根据Function<T,S>对象的call方法的返回值来对RDD元素进行排序
boolean ascending: true表示升序,false表示降序
int numPartitions: 指定分区数,尽量和父RDD的分区数保持一致。RDD对象的getNumPartitions()可以获得分区数。
举例:自定义一个List<User>类型对象(User含有一个年龄属性和一个课程成绩属性),然后使用sortBy对这些User按照课程成绩由大到小排序,成绩相同的则按照年龄由小到大排序,并将排序后的结果输出到控制台。
List<User> =>(复合类型)
{
User3(25, 85),
User3(20, 90),
User3(22, 90),
User3(18, 95),
User3(30, 85)
}
第一个,单字段排序案例代码:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.sources.In;
import java.util.Arrays;
import java.util.List;
public class SortByTest {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster("local")
.set("spark.driver.bindAddress", "0.0.0.0"); // 指定网络地址
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//JavaRDD<String> rdd = sc.textFile("hdfs://192.168.121.3:8020/data/num.txt");
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(9,10,1,2,3,4,5,6,7),2);
/*JavaRDD<Integer> rdd1 = rdd.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return Integer.parseInt(s);
}
});*/
// sortBy算子 有三个参数
// 第一个参数,call(),返回结果,填入new F有提示,后续需要导入具体实现函数
// 第二个参数,写入true或者false,控制升序或者降序
// 第三个参数,写入一个整数,控制分区数
JavaRDD<Integer> newrdd = rdd.sortBy(new Function<Integer, Object>() {
@Override
public Object call(Integer s) throws Exception {
return s;
}
}, false, 2);
System.out.println("带glom()方法:"+newrdd.glom().collect());
// 行动算子
List<Integer> res = newrdd.collect();
for (Integer re : res) {
System.out.println(re);
}
}
}
第二个,双字段排序(类):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
// 1、user用户类
class User3 implements Serializable {
private int age;
private int score;
//构造函数,实现函数
public User3(int age, int score) {
this.age = age;
this.score = score;
}
public User3(){
}
public int getAge() { return age; }
public int getScore() { return score; }
@Override
public String toString() {
return "User3 = {" +
"age=" + age +
", score=" + score +
'}';
}
}
public class SortByTest3 {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setAppName("spark_demo2")
.setMaster("local")
.set("spark.driver.bindAddress", "0.0.0.0");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// 初始化users类数据集,Arrays.asList方法快速简洁地新增数据
List<User3> users = Arrays.asList(
new User3(25, 85),
new User3(22, 90),
new User3(20, 90),
new User3(18, 95),
new User3(30, 85)
);
JavaRDD<User3> userRDD = sc.parallelize(users, 2);
// 转化算子
JavaRDD<User3> sortedRDD = userRDD.sortBy(
new Function<User3, String>() {
@Override
public String call(User3 user) throws Exception {
// 如果是多个排序,返回值必须包含多个变量,并按顺序拼接填入
// 如,先按成绩排序,再按年龄排序的话,返回值应该是按照成绩字段在前、年龄字段在后的方式返回
// 当前案例是返回一个String类型,就应该是return user.getScore()+"--"+user.getAge()+"";
return user.getScore()+user.getAge()+"";//因为前面是user类的字段属性是int类型,返回string类型的话加一个空字符串“”能自动转换
/**return user.getAge()+"--"+user.getScore()+"";
* 排序结果:
* User3 = {age=18, score=95}
* User3 = {age=20, score=90}
* User3 = {age=22, score=90}
* User3 = {age=25, score=85}
* User3 = {age=30, score=85}
*/
/**跟前一个,返回的值有前后顺序更改
* return user.getScore()+"--"+user.getAge()+"";
* 排序结果:
* User3 = {age=25, score=85}
* User3 = {age=30, score=85}
* User3 = {age=20, score=90}
* User3 = {age=22, score=90}
* User3 = {age=18, score=95}
*/
/**return user.getScore()+"";
* 排序结果:
* User3 = {age=25, score=85}
* User3 = {age=30, score=85}
* User3 = {age=22, score=90}
* User3 = {age=20, score=90}
* User3 = {age=18, score=95}
*/
}
},
true, // 保持升序(实际排序由CompositeKey的compareTo控制)
2
);
System.out.println("glom()结果:"+sortedRDD.glom().collect());
// 行动算子
List<User3> results = sortedRDD.collect();
System.out.println("排序结果:");
for (User3 result : results) {
System.out.println(result.getAge()+"-??-"+result.getScore()+"!!"); // 直接打印对象(已重写toString)
}
}
}