## 部署模式和运行模式
### 部署模式
- 本地local
- 单机无需分布式资源管理
- 集群
- 独立集群standalone
- 需要flink自身的任务管理工具
- jobmanager接收和调度任务
- taskmanager执行
- on其他资源管理工具yarn/k8s
- yarn
- 注意区分flink的和yarn的taskmanager
### 运行模式
- session
- 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。 集群启动时所有资源就都已经确定,所有提交的作业会竞争集群中的资源。
- 适合任务规模小,执行时间短的大量作业。
- 作业执行环境会一直保留在集群上,直到会话被人为终止。
- per-job
- 每次提交的job都会创建一个独立的作业执行环境,该作业执行环境仅用于通过客户端提交上来的特定的那个作业。
- 适合执行时间长的少量作业
- 当作业完成后,作业执行环境会被自动释放,集群关闭,资源释放
- application-mode
- 为解决客户端执行代码,客户端提交任务给jobmanager占用带宽而生
### 不需要外部资源管理
- per job-一个任务(作业)起一个集群 (提交作业后建立集群)
- standalone-一个集群跑多个任务,手动分配集群整个的资源,启动起来就已经固定了。任务从集群中取得的资源是否能调整?
### 需要外部资源管理
- application-一个任务(作业/应用(同时提交、有依赖关系的多个作业))起一个集群 (提交作业后建立集群)
- session-一个集群跑多个任务,yarn根据配置生成集群?并且将集群里各个任务对资源的使用情况来分配资源?
## 分层API
- sql 与tableAPI类似,以sql查询表达式的形式表现程序
- table API 以表为中心(处理结构化数据)
- 遵循关系模型(像关系型数据库中的表)
- 提供可比较操作(select\project\join\group by\aggregate)
- 通常以方法链的形式调用(提供了一个声明式的接口来处理批处理和流处理任务),语法上更接近于编程语言
- sparkstreaming API 封装处理函数,提供通用模块:
- 转换 transformations(map\flatmap)
- 连接 joins
- 聚合 aggregations
- 窗口 windows
- 有状态的流处理 处理函数
## 运行架构(standalone会话模式)
- jobManager 作业管理器控制执行应用(管理应用里的任务执行和调度)
- jobmaster 负责处理单独的作业(job),等同于早期flink版本中的jobmanager
- 接收要执行的应用
- jobGraph->excutionGraph(包含了所有可并发执行的任务
- 向resourceManager发送请求,申请执行所需资源
- 获取足够资源后,分发执行图到运行它们的taskmanager上
- 运行过程中,负责所有需要中央协调的操作(checkpoints的协调)
- resourceManager 资源管理器
- 资源(taskManager的任务槽task slots)的分配和管理
- 任务槽-资源调配的最小单元,包含了机器用来执行计算的一组CPU和内存
- 每个任务都要分配到一个slot上执行
- 注意区分flink和yarn的resourceManager
- dispatcher 分发器
- 提供一个reset接口,用来提交应用
- 为每一个新提交的作业启动一个新的jobMaster组件
- 启动Web UI
- 非必需架构,有些部署模式下会被省掉(本地模式,一个JVM,不涉及集群部署,所以不需要分发器;kurbernets有自己的controller控制器管理Pod的生命周期,所以替代分发器的部分功能)。
- taskManager任务管理器
- 是flink的工作进程
- 对数据流做具体的计算
- 集群中至少有一个
- 每个包含了一定量的任务槽
- 任务槽的数量限制taskManager处理任务的并行数
- 启动后,先向资源管理器注册它的slots
- 收到资源管理器的反馈指令后,将至少一个槽位提供给jobMaster调用
- jobMaster来分配任务
- taskManager可以缓冲数据,并和运行同一应用的taskManager交换数据
## 并行度和任务槽
- 基本概念
- 算子 是Flink数据处理的基本单元,每个算子负责执行特定的任务。通过组合不同的算子,可以构建复杂的数据处理逻辑。常见的算子包括 Map、Filter、FlatMap、KeyBy、Reduce、Window、Join、Union 和 Sink 、source等。
- source \ sink 类似于datax从哪个地址获取数据输送给哪个地址
- map 一对一 类型转换、四则运算 对数据流中的每个元素应用一个函数,生成一个新的元素
- flatMap 一对多
- keyby 分组
- reduce 聚合
- 并行子任务
- 一个算子任务被拆分成多个并行子任务,再分发到不同节点,实现并行计算。
- 如果处理的数据量大,把一个算子操作复制到多个节点,数据来了后可以到任意一个节点执行。
- 并行是将一个大任务分给两个人同时做
- 比如一个平台上的不同大屏,两个人同时做。这两个人都做过这个项目下的大屏,再来了新的数据新的大屏任务,两个人都能做。(两个人间可能有信息不同步问题,但是算子操作是复制粘贴的,一定同步,所以给哪个节点做都行)
- 并发是一个人同时做好几件事
- 并行度
- 一个特定算子的子任务的个数
- 一段流处理程序的并行度== 所有算子中最大的并行度
- 设置
- #算子层面并行度设置 考虑到动态扩容,通常使用单个算子设置并行度stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
- 全局层面设置 env.setParallelism(2);
- 配置文件设置
- 集群 flink-conf.yaml 默认1
- 开发环境无配置文件 默认为当前机器CPU核数
- 并行数据流
- 包含并行子任务的数据流
- 需要多个分区来分配并行任务
## 算子链
- 算子间的数据传输
- 一对一
- 充分区 类似于shuffle
- 合并算子链
- 一对一+算子并行度相同
## 任务槽
- 每个takManager是一个JVM进程,可以启动多个独立线程,并行执行多个子任务
- 计算资源有限,并行任务越多,每个线程资源越少。
```
public class RealTimeWordCountFromPGToMySQL {
public static void main(String[] args) throws Exception {
// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(2); // 设置全局并行度为2
// 定义 PostgreSQL CDC 源
Properties props = new Properties();
props.setProperty("plugin.name", "pgoutput"); // PostgreSQL 的逻辑解码插件
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "5432");
props.setProperty("database.username", "yourusername");
props.setProperty("database.password", "yourpassword");
props.setProperty("database.server.id", "1888");
props.setProperty("database.server.name", "dbserver1");
props.setProperty("table.whitelist", "public.your_table");
DataStream<String> source = PostgreSQLSource.<String>builder()
.hostname(props.getProperty("database.hostname"))
.port(Integer.parseInt(props.getProperty("database.port")))
.database(props.getProperty("database.server.name"))
.tableList(props.getProperty("table.whitelist"))
.username(props.getProperty("database.username"))
.password(props.getProperty("database.password"))
.deserializer(new DebeziumDeserializationSchema<String>() {
@Override
public String deserialize(ChangeRecord changeRecord) {
if (changeRecord instanceof DataChangeRecord) {
DataChangeRecord dataChangeRecord = (DataChangeRecord) changeRecord;
return dataChangeRecord.after().getField(0).toString();
}
return null;
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING();
}
})
.build()
.addSource();
// 使用 flatMap 算子将文本拆分为单词
DataStream<Tuple2<String, Integer>> words = source
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : line.split(" ")) {
out.collect(new Tuple2<>(word, 1));
}
}
});
// 使用 keyBy 算子按单词分组
DataStream<Tuple2<String, Integer>> wordCounts = words
.keyBy(value -> value.f0)
.sum(1);
// 定义 MySQL 数据目标
JdbcStatementBuilder<Tuple2<String, Integer>> statementBuilder = (ps, t) -> {
ps.setString(1, t.f0);
ps.setInt(2, t.f1);
};
JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/yourdb")
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("yourusername")
.withPassword("yourpassword")
.build();
wordCounts.addSink(JdbcSink.sink(
"INSERT INTO word_count (word, count) VALUES (?, ?)",
statementBuilder,
jdbcOptions
));
// 执行任务
env.execute("Real-Time Word Count from PG to MySQL Example");
}
}
```