2.1 Spark中WordCount的执行流程
Spark的WordCount示例
package com.chh.spark.core.wc
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_WordCount {
def main(args: Array[String]): Unit = {
// 创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster( "local[*]").setAppName("WordCount")
// 创建Spark的上下文对象
val sc = new SparkContext(sparkConf)
// 读取文件
val source = sc.textFile("input/word.txt")
// 将文件中的数据进行分词
val word = source.flatMap(_.split(","))
// 将单词转为(x,1)
val wordToOne = word.map((_, 1))
// 将转换后的数据进行分组求和
val sumByKey = wordToOne.reduceByKey(_ + _)
//将数据收集到Drive端
val result = sumByKey.collect()
//对结果进行打印
result.foreach(println)
//关闭Spark连接
sc.stop()
}
}
WordCount中各个对象和方法的主要作用
SparkConf
创建SparkConf对象,设置Spark应用的配置信息。setAppName() 设置Spark应用程序在运行中的名字;如果是集群运行,就可以在监控页面直观看到我们运行的job任务。setMaster() 设置运行模式、是本地运行,设置为local即可;如果是集群运行,就可以设置程序要连接的Spark集群的master节点的url(localhost:7077)。
SparkContext
创建SparkContext对象, 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写,都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器(DAGSchedule、TaskScheduler),还会去Spark Master节点上进行注册等。所以SparkContext在Spark应用中是很重要的一个对象。
textFile
源码注释:
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. The text files must be encoded as UTF-8.
可以从本地文件系统或者HDFS上读取文件,将其转换为String类型的RDD数据集。注意:文件编码格式必须为UTF-8。
flatMap
A list of partitions
A function for computing each split
A list of dependencies on other RDD
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
它是RDD数据集对象的函数,作用于每个分区上,作用是将一条数据展开,也就是我们常说的一进多出。
map
学过python或者Java的人应该不陌生这个函数,典型的一进一出函数
reduceByKey
Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
通过对key进行分组,然后求出每个key的value和,类似于Mapreduce中的combiner,所以也会产生shuffle。
collect
Return an array that contains all of the elements in this RDD. Note: This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
说白了,就是将executor中的计算结果,发送到driver端的内存中,所以使用这个方法时,一定注意数据量的大小。
stop
关闭spark的连接
执行流程
2.2对比flink中WordCount的批处理
package com.zhisheng.data.sources;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class Main {
public static void main(String[] args) throws Exception{
// 创建flink流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置flink应用的并行度
env.setParallelism(1);
// 设置flink运行时的处理模式为批处理
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<String> dataStreamSource = env.readTextFile("books/word.txt");
dataStreamSource
.flatMap(
(String s, Collector<String> collector) -> {for (String string : s.split(",")) collector.collect(string);}
).returns(String.class)
.map(s -> Tuple2.of(s, 1)).returns(new TypeHint<Tuple2<String, Integer>>() {
})
.keyBy(t-> t.f0).sum(1)
.print();
env.execute("Flink add data source");
}
}
两者一对比就发现,Java语言处理数据时语法过于繁琐了,而Scala就显得简洁很多。当然,flink也是支持Scala的,而且flink1.16版本中优化了flink批处理的相关模块,flinkSQL未来或许可以如同sparkSQL一样无缝衔接hiveSQL。最最重要的,flink的流批一体已经很成熟了,一套架构,两种模式自动探查,牛逼~~