Apache SeaTunnel Flink引擎执行流程源码分析

发布于:2025-06-25 ⋅ 阅读:(16) ⋅ 点赞:(0)

目录

1. 任务启动入口

2. 任务执行命令类:FlinkTaskExecuteCommand

3. FlinkExecution的创建与初始化

3.1 核心组件初始化

3.2 关键对象说明

4. 任务执行:FlinkExecution.execute()

5. Source处理流程

5.1 插件初始化

5.2 数据流生成

6. Transform处理流程

6.1 插件初始化

6.2 转换执行

7. Sink处理流程

7.1 插件初始化

7.2 数据输出

执行流程全景图

关键设计总结


        本文基于SeaTunnel 2.3.x源码分析Flink引擎执行流程,以seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java为入口,完整解析Flink引擎的执行流程。


1. 任务启动入口

启动类核心代码:

// 1. 初始化Flink启动命令参数
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
​
// 2. 执行SeaTunnel.run()回调Flink执行命令
SeaTunnel.run(flinkCommandArgs.buildCommand());
  • buildCommand()返回FlinkTaskExecuteCommand实例

  • SeaTunnel.run()最终调用FlinkTaskExecuteCommand.execute()


2. 任务执行命令类:FlinkTaskExecuteCommand

核心执行流程:

public void execute() {
    // 1. 解析配置文件生成Config对象
    Config config = ConfigBuilder.of(configFile);
    
    // 2. 创建FlinkExecution实例
    FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
    
    // 3. 执行任务
    seaTunnelTaskExecution.execute();
}

3. FlinkExecution的创建与初始化
3.1 核心组件初始化
public FlinkExecution(Config config) {
    // 创建三大处理器
    this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(
        jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
    
    this.transformPluginExecuteProcessor = new TransformExecuteProcessor(
        jarPaths, 
        TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()),
        jobContext);
    
    this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(
        jarPaths, config.getConfigList(Constants.SINK), jobContext);
    
    // 初始化Flink执行环境
    this.flinkRuntimeEnvironment = FlinkRuntimeEnvironment.getInstance(
        this.registerPlugin(config, jarPaths));
    
    // 为处理器注入运行时环境
    this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
    this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
    this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
}
3.2 关键对象说明
组件 类型 功能
sourcePluginExecuteProcessor SourceExecuteProcessor 处理数据源接入
transformPluginExecuteProcessor TransformExecuteProcessor 处理数据转换逻辑
sinkPluginExecuteProcessor SinkExecuteProcessor 处理数据输出
flinkRuntimeEnvironment FlinkRuntimeEnvironment 封装Flink StreamExecutionEnvironment

4. 任务执行:FlinkExecution.execute()

DAG构建流程:

public void execute() {
    // 初始

网站公告

今日签到

点亮在社区的每一天
去签到