目录
2. 任务执行命令类:FlinkTaskExecuteCommand
4. 任务执行:FlinkExecution.execute()
本文基于SeaTunnel 2.3.x源码分析Flink引擎执行流程,以seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java为入口,完整解析Flink引擎的执行流程。
1. 任务启动入口
启动类核心代码:
// 1. 初始化Flink启动命令参数 FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs(); // 2. 执行SeaTunnel.run()回调Flink执行命令 SeaTunnel.run(flinkCommandArgs.buildCommand());
buildCommand()返回FlinkTaskExecuteCommand实例SeaTunnel.run()最终调用FlinkTaskExecuteCommand.execute()
2. 任务执行命令类:FlinkTaskExecuteCommand
核心执行流程:
public void execute() {
// 1. 解析配置文件生成Config对象
Config config = ConfigBuilder.of(configFile);
// 2. 创建FlinkExecution实例
FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
// 3. 执行任务
seaTunnelTaskExecution.execute();
}
3. FlinkExecution的创建与初始化
3.1 核心组件初始化
public FlinkExecution(Config config) {
// 创建三大处理器
this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(
jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
this.transformPluginExecuteProcessor = new TransformExecuteProcessor(
jarPaths,
TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()),
jobContext);
this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(
jarPaths, config.getConfigList(Constants.SINK), jobContext);
// 初始化Flink执行环境
this.flinkRuntimeEnvironment = FlinkRuntimeEnvironment.getInstance(
this.registerPlugin(config, jarPaths));
// 为处理器注入运行时环境
this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
}
3.2 关键对象说明
| 组件 | 类型 | 功能 |
|---|---|---|
sourcePluginExecuteProcessor |
SourceExecuteProcessor |
处理数据源接入 |
transformPluginExecuteProcessor |
TransformExecuteProcessor |
处理数据转换逻辑 |
sinkPluginExecuteProcessor |
SinkExecuteProcessor |
处理数据输出 |
flinkRuntimeEnvironment |
FlinkRuntimeEnvironment |
封装Flink StreamExecutionEnvironment |
4. 任务执行:FlinkExecution.execute()
DAG构建流程:
public void execute() {
// 初始