flink 入门(一)
简介
阅读目标:
本文为入门级别文章,即阅读完下文你需要简单的知道 flink 是做什么用的,他的主要特点是什么。工欲善其事必先利其器更深入的了解,待熟练后再回头看看。
简而言之flink就是一个框架,你在框架里面编写代码(接收从某处来的数据->数据处理/转换->将处理好的数据输出到某地),将编写好的代码交给flink集群,由集群取调度任务去处理
阅读并实践本文可能会存在某些问题,你还需要阅读其他文章/博客加深对flink的理解(如下文中提到的某些概念:有界、无界等等
实际是因为我懒得写了。。。
Flink 起源于一个叫作 Stratosphere 的项目,它是由 3 所地处柏林的大学和欧洲其他一些大 学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl) 领衔开发。2014 年 4 月,Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,Flink 就 是在此基础上被重新设计出来的。 在德语中,“flink”一词表示“快速、灵巧”。项目的 logo 是一只彩色的松鼠,当然了, 这不仅是因为 Apache 大数据项目对动物的喜好(是否联想到了 Hadoop、Hive?),更是因为 松鼠这种小动物完美地体现了“快速、灵巧”的特点。关于 logo 的颜色,还一个有趣的缘由: 柏林当地的松鼠非常漂亮,颜色是迷人的红棕色;而 Apache 软件基金会的 logo,刚好也是一 根以红棕色为主的渐变色羽毛。于是,Flink 的松鼠 Logo 就设计成了红棕色,而且拥有一个漂 亮的渐变色尾巴,尾巴的配色与 Apache 软件基金会的 logo 一致。这只松鼠色彩炫目,既呼应 了 Apache 的风格,似乎也预示着 Flink 未来将要大放异彩。
Flink 的官网主页地址:https://flink.apache.org/ 在 Flink 官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。
很多专业词汇,我们从中至少可以提炼出一些容易理解的信息:Flink 是一个“框 架”,是一个数据处理的“引擎”;既然是“分布式”,当然是为了应付大规模数据的应用场景 了;另外,Flink 处理的是数据流。所以,Flink 是一个流式大数据处理引擎。 而“内存执行速度”和“任意规模”,突出了 Flink 的两个特点:速度快、可扩展性强— —这说的自然就是小松鼠的“快速”和“灵巧”了。
java 开发案例
以下案例为环境jdk1.8,且以下案例均为展示使用,目的是为了明白这两种方式的区别以及基本使用
- jdk 1.8
- maven
- win10
- flink 1.15.2
以下示例代码仅做入门级别使用,非生产可用。
pom文件
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <flink.version>1.15.2</flink.version> <target.java.version>1.8</target.java.version> <scala.binary.version>2.12</scala.binary.version> <log4j.version>2.17.1</log4j.version> </properties> <dependencies> <!-- Apache Flink dependencies --> <!-- These dependencies are provided, because they should not be packaged into the JAR file. --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- connector kafka--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>${flink.version}</version> </dependency> <!-- 在此处 我添加了分词 --> <dependency> <groupId>org.ansj</groupId> <artifactId>ansj_seg</artifactId> <version>5.1.6</version> </dependency> <!-- Add logging framework, to produce console output when running in the IDE. --> <!-- These dependencies are excluded from the application JAR by default. --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> </dependencies>
Streaming(无界)
这里可以简单的理解为源源不断的数据,需要不断监听某个消息队列(kafka)或者其他来源。
public static final String HOST = "192.168.20.127";
public static final Integer PORT = 8888;
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream(HOST, PORT);
SingleOutputStreamOperator<Tuple2<String, Long>> wordsCollector = source.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(new Tuple2<String, Long>(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordsCollector.keyBy(0).sum(1);
sum.print();
environment.execute();
}
Batch(有界)
这里可以简单的理解为批量数据处理。
kafka
运行类
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //new 一个实例! Properties properties = new Properties(); //告诉程序我们要接收那台机器上生产的数据 properties.setProperty("bootstrap.servers", "master:9092"); //告诉程序开启分区,已经分区名称 properties.setProperty("group.id", "temp-1"); //属性key.serializer和value.serializer就是key和value指定的序列化方式。 properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //读取kafka数据的时候需要指定消费策略,如果不指定会使用auto.offset.reset设置 //earliest当各分区下有已提交的offset时,从提交的offset开始消费; //无提交的offset时,从头开始消费; //latest,当各分区下有已提交的offset时,从提交的offset开始消费; //无提交的offset时,消费新产生的该分区下的数据; //none,topic各分区都存在已提交的offset时,从offset后开始消费; //只要有一个分区不存在已提交的offset,则抛出异常 properties.setProperty("auto.offset.reset", "earliest"); //enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。 properties.setProperty("enable.auto.commit", "false"); //如果FlinkKafkaConsumer没有开启checkpoint功能,为了不重复读取 //这种方式无法实现Exactly-Once(只执行一次) FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer("test_topic", new SimpleStringSchema(), properties); DataStreamSource<String> lines = environment.addSource(flinkKafkaConsumer); SingleOutputStreamOperator<Tuple2<String, Long>> sum = lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> { List<Term> terms = ToAnalysis.parse(line).getTerms(); terms.forEach(item -> { collector.collect(new Tuple2<>(item.getName(), 1L)); }); }).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(0).sum(1); sum.print(); environment.execute("word-coun-kafka"); }
任务提交
提交有两种方式
web-ui界面
访问部署服务器 ip:8081
点击 Submit new Job
点击Add new
编辑Entry class与Parallelism等
- Entry class 为入口类 即为上文中的运行
main()
函数的类的全限定名
- Entry class 为入口类 即为上文中的运行
点击Submit
点击Jobs -> Running Jobs 查看
命令行
如果要把job提交到jobmanager,应该在jobmanager服务器上提交
flink 安装与部署
Flink的安装和部署主要分为本地模式和集群模式,其中本地模式只需直接解压就可以使用,不以修改任何参数,一般在做一些简单测试的时候使用。
集群模式包含Standalone、Flink on Yarn等模式,适合在生产环境下面使用,且需要修改对应的配置 参数。
flink 下载
## 官方版本(可能下载速度慢)
curl -O https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
## 腾讯云镜像(推荐,国内速度快)
curl -O http://mirrors.cloud.tencent.com/apache/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
下载完成解压,解压后目录如下
CentOS/Kernel环境
系统环境
以下均基于 Kernel
- CentOS Linux release 7.9.2009 (Core)
- Linux version 3.10.0-1160.el7.x86_64
- gcc version 4.8.5 20150623 (Red Hat 4.8.5-44) (GCC)
- open-jdk 11
- 大部分过程中使用root用户。请在生产环境或特殊环境注意用户切换。本文不在linux用户做过多赘述。
本地模式
自己是jobmanager也是taskmanager(会话模式)
配置文件详解
- 修改
conf/flink-conf.yaml
cd conf vim flink-conf.yaml
# 此处修改集群时需要修改 jobmanager.rpc.address: localhost # 默认1623 jobmanager.rpc.port: 6123 # 任务管理默认 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 1728m # 任务槽 资源(并行执行 相当于 组) taskmanager.numberOfTaskSlots: 1 # 默认并行度 parallelism.default: 1 # web界面默认端口 需要修改时 解开注释 #rest.port: 8081
master
当前jobmanager(默认localhost)以及webui端口(默认8081)
works
单节点启动默认这里面没有东西
- 修改
启动脚本
# 进入flink bin目录 cd bin # 单节点集群启动 ./start-cluster.sh
访问服务器ip加8081(默认)
停止服务
# 进入flink bin目录 cd bin # 单节点集群启动 ./stop-cluster.sh
集群
至少需要三台服务器。一台jobmanager
,两台taskmanager
,三台服务器之间需要配置免密登录,这里为了方便,我修改了hosts文件,三台服务器分别为
master
、slave0
、slave1
。(会话模式)
修改hosts(ip地址 主机名/域名 (主机别名))
自己的服务器IP-1 master 自己的服务器IP-2 slave0 自己的服务器IP-3 slave1
使配置文件生效请参考 CentOS修改hosts
服务器之间免密登录
请自行百度/google(master 最好也将自身产生的秘钥导入自身,不导也可以会导致每次启动flink需要输入本机密码)
修改配置文件
master 服务器
flink-conf.yaml
# 用于节点间通信 jobmanager.rpc.address: 0.0.0.0
master
master:8081
works
# 另外两台机器 slave0 slave1
slave0 服务器
flink-conf.yaml
jobmanager.rpc.address: master # 不改此处 集群运行后 solt为0 jobmanager.bind-host: 0.0.0.0
master
master:8081
works
slave0 slave1
slave1 服务器
flink-conf.yaml
jobmanager.rpc.address: master # 不改此处 集群运行后 solt为0 jobmanager.bind-host: 0.0.0.0
master
master:8081
works
slave0 slave1
修改环境变量
master/slave0/slave1 分别执行以下操作(因文件都是由master分发,所以目录位置应都一致,当然可自行修改)
## 修改环境变量 vim /etc/profile ## 新增以下内容 export FLINK_HOME=/software/flink-cluster/flink/ export PATH=$PATH:$FLINK_HOME/bin ## 使环境变量生效 source /etc/profile
运行集群
在
master
bin目录下执行,看到以下几截图后集群启动成功,即可访问webUI界面./start-cluster.sh
且执行
jps
命令后且
slave0
与slave1
执行jps
后web ui 界面
jdk安装(多版本切换)
## 下载openjdkjdk
curl -O https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz
## 解压
tar zxf openjdk-11+28_linux-x64_bin.tar.gz
## 添加jdk11 /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/java java /home/flink/opt/jdk-11/bin/java 1
## 添加jdk11 /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/javac javac /home/flink/opt/jdk-11/bin/javac 1
## 切换
sudo update-alternatives --config java
sudo update-alternatives --config javac
docker for windows
windows 10 专业版 21H2 WSL2
运行Docker Desktop Installer.exe
参考链接
用户提权
su
chmod -v u+w /etc/sudoers
vim /etc/sudoers
root ALL=(ALL) ALL
chmod -v u-w /etc/sudoers
exit