Flink快速上手
对 Flink 有了基本的了解后,接下来就要理论联系实际,真正上手写代码了。Flink 底层是以 Java 编写的,并为开发人员同时提供了完整的 Java 和 Scala API。在本书中,代码示例将全部用 Java 实现;而在具体项目应用中,可以根据需要选择合适语言的API 进行开发。在这一章,我们将会以大家最熟悉的 IntelliJ IDEA 作为开发工具,用实际项目中最常见的Maven 作为包管理工具,在开发环境中编写一个简单的 Flink 项目,实现零基础快速上手。
环境准备
工欲善其事,必先利其器。在进行代码的编写之前,先将我们使用的开发环境和工具介绍一下:
1.系统环境为 Windows 10。
2.提前安装 Java 8。
3.集成开发环境(IDE)使用 IntelliJ IDEA,具体的安装流程参见 IntelliJ 官网
4. 安装 IntelliJ IDEA 之后,还需要安装一些插件——Maven 和Git。Maven 用来管理项目依赖;通过 Git 可以轻松获取我们的示例代码,并进行本地代码的版本控制。
创建工程
添加项目依赖
<dependencies>
<!-- 引入 Flink 相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
配置日志管理
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
批处理
1.工程根目录下新建一个 input 文件夹,并在下面创建文本文件 words.txt
在 words.txt 中输入一些文字,例如:
hello world
hello flink
hello java
2.代码实现
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.lang.reflect.Type;
public class BatchWorkCount {
public static void main(String[] args) throws Exception {
//执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//按行读取数据
DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] worsd = line.split(" ");
for (String word : worsd
) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
//按照word分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
//分组内进行聚合计算
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
sum.print();
}
}
这边注意的是所有导的包都得是org.apache.flink.api.*下面的
输出结果:
(flink,1)
(world,1)
(hello,3)
(java,1)
流处理
我们已经知道,用DataSet API 可以很容易地实现批处理;与之对应,流处理当然可以用DataStream API 来实现。对于 Flink 而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API 更加强大,可以直接处理批处理和流处理的所有场景。
DataStream API 作为“数据流”的处理接口,又怎样处理批数据呢?
回忆一下上一章中我们讲到的 Flink 世界观。在 Flink 的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流。所以批处理,其实就可以看作有界流的处理。
对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的
——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
读取文件
1.准备文件(上同)
2.代码实现
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class BounderStreamWorkCount {
public static void main(String[] args) throws Exception {
//创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取文件
DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");
SingleOutputStreamOperator<Tuple2<String, Long>> workAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组
KeyedStream<Tuple2<String, Long>, String> WorkAndOneKeyedStream = workAndOneTuple.keyBy(data -> data.f0);
//求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = WorkAndOneKeyedStream.sum(1);
//打印
sum.print();
//启动执行
env.execute();
}
}
输出实现:
5> (hello,1)
5> (hello,2)
3> (java,1)
9> (world,1)
5> (hello,3)
13> (flink,1)
读取文件流
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWorkCount {
public static void main(String[] args) throws Exception {
//创建流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//从参数中提取主机名和端口号
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// String host = parameterTool.get("host");
// Integer port = parameterTool.getInt("port");
//读取文本流
DataStreamSource<String> lineDataStream = env.socketTextStream("hadoop102", 7777);
SingleOutputStreamOperator<Tuple2<String, Long>> workAndOneTuple = lineDataStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG));
//分组
KeyedStream<Tuple2<String, Long>, String> WorkAndOneKeyedStream = workAndOneTuple.keyBy(data -> data.f0);
//求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = WorkAndOneKeyedStream.sum(1);
//打印
sum.print();
//启动执行
env.execute();
}
}
Flink部署
1.上传解压
在 hadoop102 节点服务器上创建安装目录/opt/module,将 flink 安装包放在该目录下,并执行解压命令,解压至当前目录。
tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/
2.启动
bin/start-cluster.sh Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.
3.关闭
bin/stop-cluster.sh
4.修改配置文件
1)进入 conf 目录下,修改 flink-conf.yaml 文件,修改 jobmanager.rpc.address 参数hadoop102
cd conf/
vim flink-conf.yaml # JobManager 节点地址.
jobmanager.rpc.address: hadoop102
2)修改works配置
vim workers
hadoop103
hadoop104
3)分发文件
xsync flink/
bin/start-cluster.sh Starting cluster.
启动集群
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop103.
Starting taskexecutor daemon on host hadoop104.