Flink Table/SQL 执行流程解析

发布于:2025-08-01 ⋅ 阅读:(11) ⋅ 点赞:(0)

1、完整执行流程概览

在这里插入图片描述

2. SQL到逻辑执行计划(Logical RelNode)

2.1 解析流程

  1. SQL解析:使用Apache Calcite解析SQL字符串复制下载

    // TableEnvironmentImpl.executeSql()
    public TableResult executeSql(String statement) {
            List<Operation> operations = getParser().parse(statement);
            Operation operation = operations.get(0);
            return executeInternal(operation);
    }
    

2.2 核心组件

  • CatalogManager:元数据管理
  • SqlNodeToOperationConversion:SQL到Operation转换

e.g.

protected Operation parse(String sql) {
   FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
   final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
   SqlNode node = parser.parse(sql);
   return SqlNodeToOperationConversion.convert(planner, catalogManager, node).get();
}
  • QueryOperationConverter:Operation到RelNode转换

e.g.

public RelNode visit(ProjectQueryOperation projection) {
   List<RexNode> rexNodes = convertToRexNodes(projection.getProjectList());
	 return relBuilder
                    .project(rexNodes, projection.getResolvedSchema().getColumnNames(), true)
                    .build();
}

3. 逻辑优化阶段(Optimizer)

3.1 优化流程

Logical RelNode → Rule-Based optimization → Cost-based optimization → Optimized logic plan

1、SQL 解析 → 生成 逻辑关系代数树(Logical RelNode)

2、基于规则的优化(RBO) → 重写逻辑计划

  • HepPlanner(RBO 优化器):按顺序应用规则集。
  • RelOptRule:优化规则基类,Flink 扩展了流处理专属规则。
FlinkStreamProgram
// project rewrite
    chainedProgram.addLast(
      PROJECT_REWRITE,
      FlinkHepRuleSetProgramBuilder.newBuilder
        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
        .add(FlinkStreamRuleSets.PROJECT_RULES)
        .build()
    )

3、基于代价的优化(CBO) → 选择最优物理计划

  • VolcanoPlanner:采用动态规划搜索最优计划树。
  • FlinkCost:定义 Flink 的代价计算逻辑(CPU/内存/网络权重)。
  • RelMetadataProvider:提供统计信息(需集成 Catalog 元数据,FlinkDefaultRelMetadataProvider
FlinkStreamProgram
chainedProgram.addLast(
      LOGICAL,
      FlinkVolcanoProgramBuilder.newBuilder
        .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES)
        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
        .build()
    )

4、生成优化逻辑计划 → 转换为 Flink 物理算子

将优化后的 RelNode 转换为 Flink 的物理执行计划(Transformation)。

  • FlinkPhysicalRel:物理计划节点基类。
  • PlannerBase:物理计划生成入口,调用 translateToPlan()
  • Transformation:Flink 算子底层表示。

4、执行入口

Pipeline pipeline =
                execEnv.createPipeline(
                        transformations,
                        tableConfig.getConfiguration(),
                        defaultJobName,
                        jobStatusHookList);

JobClient jobClient = execEnv.executeAsync(pipeline);

网站公告

今日签到

点亮在社区的每一天
去签到