探秘Flink维表:从源码到运行时的深度解析

发布于:2025-06-25 ⋅ 阅读:(19) ⋅ 点赞:(0)

在大数据实时处理领域,Flink凭借其强大的流处理能力占据重要地位。在日常开发中,我们常常面临这样的需求:以原始流数据为基础,关联第三方存储引擎来补充数据属性。在传统OLAP或OLTP系统中,多表关联的原理易于理解,例如Apache Doris可将一张表加载到内存中完成数据关联 。然而,在流表上实现与原始流数据的关联操作,其原理却并不容易掌握。多数时候,开发者仅停留在使用层面,面对任务优化往往无从下手。本文将以JDBC维表关联为例,深入剖析Flink维表的实现原理,从执行计划到运行时层面,揭开其神秘面纱。

一、核心概念:理解Flink维表的基石

在深入探究Flink维表实现原理之前,我们需要先明晰几个核心概念,为后续的知识理解奠定基础。

1.1 维表

维表是数据仓库中的重要概念,其维度属性为数据观察提供了不同视角。在离线数仓建设中,通常将维表与事实表关联构建星型模型。在实时数仓领域,同样存在维表与事实表。其中,事实表常存储于Kafka等消息队列,而维表一般存储在MySQL、HBase等外部存储设备 。对于每条流式数据,可关联外部维表数据源,实现实时计算中的数据关联查询。值得注意的是,维表数据可能动态变化,在进行维表JOIN操作时,需明确记录关联维表快照的时刻。目前,Flink SQL的维表JOIN仅支持处理时间语义下当前时刻的维表快照关联,暂不支持基于事件时间语义的关联。

1.2 流表

传统SQL和关系代数主要针对有界数据设计,若直接应用于流计算会面临诸多问题。为解决这一难题,Flink引入动态流表概念,将无界数据流表示为随时间持续写入数据的表。在Flink体系中,“流”对应DataStream API概念,“动态流表”属于Flink SQL范畴,本质上二者都是无界数据集的不同表示形式。

1.3 异步I/O

在流处理应用中,与外部系统交互(如使用数据库数据扩充流数据)时,通信延迟会对整体性能产生显著影响。常规的同步交互方式,例如使用MapFunction访问外部数据库,函数会在发送请求后一直等待响应,大量时间消耗在等待过程中。而异步交互允许并行函数实例并发处理多个请求与接收响应,将等待时间分摊到多个请求,从而大幅提升流处理的吞吐量。

二、JDBC维表实现:从选择到实践

2.1 选择JDBC维表的原因

Flink官方提供了多种支持维表功能的connector,如jdbc、hive、hbase等。由于本地环境仅安装了MySQL,未部署hive、hbase等第三方存储引擎,因此选择JDBC实现的维表功能作为分析切入点。尽管不同connector的具体实现存在差异,但核心原理具有相似性,官方提供的connector地址可查阅更多相关信息。

2.2 前期准备:搭建测试环境

为了更直观地理解JDBC维表的实现原理,通过搭建一个测试任务来调试运行流程。该测试任务以Kafka作为数据源,MySQL作为维表和结果表存储,具体步骤如下:

  1. 引入依赖
    在项目中引入相关依赖,若为普通项目运行测试,需添加以下依赖及打包插件:
<!-- flink-connector-jdbc_2.12 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.6</version>
    <scope>test</scope>
</dependency>
<!-- mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
            <execution>
                <id>shade</id>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <transformers combine.children="append">
                        <!-- The service transformer is needed to merge META-INF/services files -->
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        <!-- ... -->
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>

若在源码中进行调试,则在flink-examples-table模块的pom文件中引入以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.14-SNAPSHOT</version>
</dependency>
  1. 测试任务代码实现
public class LookupFunctionExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // source only supports parallelism of 1
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        String kafkaSource = "CREATE TABLE kafka_source (\n"
                + "  -- declare the schema of the table\n"
                + "  `id` INT,\n"
                + "  `name` STRING,\n"
                + "  `sex` STRING,\n"
                + "  `age` INT,   \n"
                + "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n"
                + "  `proctime` AS PROCTIME(),    -- use a computed column to define a proctime attribute\n"
                + "  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND    -- use a WATERMARK statement to define a ts attribute\n"
                + ") WITH (\n"
                + "  -- declare the external system to connect to\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'lookuptest',\n"
                + "  'scan.startup.mode' = 'latest-offset',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'format' = 'json'   -- declare a format for this system\n"
                + ")";
        String lookupSql = "CREATE TABLE lookup_tbl (\n"
                + "  id BIGINT,\n"
                + "  name STRING,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "   'connector' = 'jdbc',\n"
                + "   'url' = 'jdbc:mysql://localhost:3306/flink_test',\n"
                + "   'username'='root',\n"
                + "   'password'='123456789',\n"
                + "   'table-name' = 'flink_lookup'\n"
                + ")";
        String sinkSql = "CREATE TABLE sink_tbl (\n"
                + "  `id` INT,\n"
                + "  `name` STRING,\n"
                + "  `sex` STRING,\n"
                + "  `age` INT,   \n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "   'connector' = 'jdbc',\n"
                + "   'url' = 'jdbc:mysql://localhost:3306/flink_test',\n"
                + "   'username'='root',\n"
                + "   'password'='123456789',\n"
                + "   'table-name' = 'flink_test'\n"
                + ")";
        String insertSql = "INSERT INTO sink_tbl SELECT a.id, b.name, a.age, a.sex FROM kafka_source a\n"
                + "LEFT JOIN lookup_tbl FOR SYSTEM_TIME AS OF a.proctime AS  b \n"
                + "ON a.id = b.id";
        tEnv.executeSql(kafkaSource);
        tEnv.executeSql(lookupSql);
        tEnv.executeSql(sinkSql);
        tEnv.executeSql(insertSql);
        env.execute("LookupFunction Test");
    }
}

该代码通过Flink的Table API定义了Kafka数据源表、JDBC维表和JDBC结果表,并执行了从Kafka源表与JDBC维表的LEFT JOIN操作,将结果写入JDBC结果表。

三、JDBC维表源码剖析:核心功能的实现细节

在Flink的SQL Connector中,Source Function不仅负责源表数据读取,还承担维表功能。其入口为org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory,该类中定义了源表和目标表的创建方法,分别为createDynamicTableSourcecreateDynamicTableSink。其中,JdbcDynamicTableSource实现了ScanTableSource接口的源表功能和LookupTableSource接口的维表功能,在运行阶段,这两个接口会被转换为TableFunction执行,维表在运行时通过CommonExecLookupJoin进行算子转换。

在这里插入图片描述

JdbcDynamicTableSourcegetLookupRuntimeProvider方法是实现维表功能的关键,部分代码如下:

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // JDBC只支持非嵌套查找键
    String[] keyNames = new String[context.getKeys().length];
    for (int i = 0; i < keyNames.length; i++) {
        int[] innerKeyArr = context.getKeys()[i];
        Preconditions.checkArgument(innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
        keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
    }
    final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
    return TableFunctionProvider.of(
            new JdbcRowDataLookupFunction(
                    options,
                    lookupOptions,
                    physicalSchema.getFieldNames(),
                    physicalSchema.getFieldDataTypes(),
                    keyNames,
                    rowType));
}

从上述代码可知,维表的数据读取逻辑主要在JdbcRowDataLookupFunction中实现,其核心方法为openevalopen方法主要用于确保维表连接正常,并根据配置初始化缓存对象:

public void open(FunctionContext context) throws Exception {
    try {
        // 确保维表能正常连接
        establishConnectionAndStatement();
        // 初始化缓存,设置缓存最大值、过期时间
        this.cache =
                cacheMaxSize == -1 || cacheExpireMs == -1
                        ? null
                        : CacheBuilder.newBuilder()
                                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                                .maximumSize(cacheMaxSize)
                                .build();
    } catch (SQLException sqe) {
        throw new IllegalArgumentException("open() failed.", sqe);
    } catch (ClassNotFoundException cnfe) {
        throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
    }
}

eval方法负责实际的数据读取操作,由于代码较长,核心逻辑如下:

public void eval(Object... keys) {
    RowData keyRow = GenericRowData.of(keys);
    if (cache != null) {
        List<RowData> cachedRows = cache.getIfPresent(keyRow);
        if (cachedRows != null) {
            for (RowData cachedRow : cachedRows) {
                collect(cachedRow);
            }
            return;
        }
    }

    for (int retry = 0; retry <= maxRetryTimes; retry++) {
        statement.clearParameters();
        statement = lookupKeyRowConverter.toExternal(keyRow, statement);
        try (ResultSet resultSet = statement.executeQuery()) {
            if (cache == null) {
                while (resultSet.next()) {
                    collect(jdbcRowConverter.toInternal(resultSet));
                }
            } else {
                ArrayList<RowData> rows = new ArrayList<>();
                while (resultSet.next()) {
                    RowData row = jdbcRowConverter.toInternal(resultSet);
                    rows.add(row);
                    collect(row);
                }
                rows.trimToSize();
                cache.put(keyRow, rows);
            }
        }
        break;
    }
}

eval方法首先检查缓存,若缓存中有数据则直接发送到下游算子;若无缓存数据,则根据查询条件从数据库获取数据,并在缓存存在时将数据存入缓存。

四、维表运行时剖析:从代码到执行的全流程追踪

通过对JDBC维表源码的分析,我们知道数据读取在eval方法中完成。为进一步了解维表在运行时的调用过程,我们从Flink SQL的运行流程入手。Flink SQL的执行需经过词法解析、语法解析、抽象语法转换、优化器、物理执行计划等环节,我们在TableEnvironmentImpl#executeInternal方法处设置断点进行调试。
在这里插入图片描述

调试发现,运行时的核心类为LookupJoinRunner,其processElement方法实现如下:

public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception {
    collector.setCollector(out);
    collector.setInput(in);
    collector.reset();

    // 当对象重用被启用时,Fetcher已经复制了输入字段
    fetcher.flatMap(in, getFetcherCollector());

    if (isLeftOuterJoin && !collector.isCollected()) {
        outRow.replace(in, nullRow);
        outRow.setRowKind(in.getRowKind());
        out.collect(outRow);
    }
}

数据拉取操作主要在fetcher.flatMap方法中,fetcher对象在open阶段实例化。进一步深入调试,获取userFunctiongenerateFetchergeneratedCollector的核心代码:
在这里插入图片描述

4.1 generateFetcher#flatMap

public class LookupFunction$10
        extends org.apache.flink.api.common.functions.RichFlatMapFunction {
    @Override
    public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
        int field$6;
        boolean isNull$6;
        isNull$6 = in1.isNullAt(0);
        field$6 = -1;
        if (!isNull$6) {
            field$6 = in1.getInt(0);
        }
        resultConverterCollector$9.setCollector(c);
        if (isNull$6) {
            // skip
        } else {
            // 去调用维表中的eval
            function_org$apache$flink$connector$jdbc$table$JdbcRowDataLookupFunction$096de1ff849635b06dca993bab61661b.eval(isNull$6 ?
            // 因为关联条件是主键id 整数类型,在此处进行强转
            null : ((java.lang.Integer) field$6));
        }
    }
}

4.2 generatedCollector#collect

public class JoinTableFuncCollector$14 extends org.apache.flink.table.runtime.collector.TableFunctionCollector {
    @Override
    public void collect(Object record) throws Exception {
        org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) getInput();
        org.apache.flink.table.data.RowData in2 = (org.apache.flink.table.data.RowData) record;
        int field$11;
        boolean isNull$11;
        org.apache.flink.table.data.binary.BinaryStringData field$12;
        boolean isNull$12;
        isNull$12 = in2.isNullAt(1);
        field$12 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
        if (!isNull$12) {
            field$12 = ((org.apache.flink.table.data.binary.BinaryStringData) in2.getString(1));
        }
        isNull$11 = in2.isNullAt(0);
        field$11 = -1;
        if (!isNull$11) {
            field$11 = in2.getInt(0);
        }
        if (isNull$11) {
            out.setField(0, null);
        } else {
            out.setField(0, field$11);
        }
        if (isNull$12) {
            out.setField(1, null);
        } else {
            out.setField(1, field$12);
        }
        // 处理两个流的数据进行join,并将数据结果,继续发送到下游
        joinedRow$13.replace(in1, out);
        joinedRow$13.setRowKind(in1.getRowKind());
        outputResult(joinedRow$13);
    }
}

以上为整个维表的实现流程剖析


网站公告

今日签到

点亮在社区的每一天
去签到