在大数据实时处理领域,Flink凭借其强大的流处理能力占据重要地位。在日常开发中,我们常常面临这样的需求:以原始流数据为基础,关联第三方存储引擎来补充数据属性。在传统OLAP或OLTP系统中,多表关联的原理易于理解,例如Apache Doris可将一张表加载到内存中完成数据关联 。然而,在流表上实现与原始流数据的关联操作,其原理却并不容易掌握。多数时候,开发者仅停留在使用层面,面对任务优化往往无从下手。本文将以JDBC维表关联为例,深入剖析Flink维表的实现原理,从执行计划到运行时层面,揭开其神秘面纱。
一、核心概念:理解Flink维表的基石
在深入探究Flink维表实现原理之前,我们需要先明晰几个核心概念,为后续的知识理解奠定基础。
1.1 维表
维表是数据仓库中的重要概念,其维度属性为数据观察提供了不同视角。在离线数仓建设中,通常将维表与事实表关联构建星型模型。在实时数仓领域,同样存在维表与事实表。其中,事实表常存储于Kafka等消息队列,而维表一般存储在MySQL、HBase等外部存储设备 。对于每条流式数据,可关联外部维表数据源,实现实时计算中的数据关联查询。值得注意的是,维表数据可能动态变化,在进行维表JOIN操作时,需明确记录关联维表快照的时刻。目前,Flink SQL的维表JOIN仅支持处理时间语义下当前时刻的维表快照关联,暂不支持基于事件时间语义的关联。
1.2 流表
传统SQL和关系代数主要针对有界数据设计,若直接应用于流计算会面临诸多问题。为解决这一难题,Flink引入动态流表概念,将无界数据流表示为随时间持续写入数据的表。在Flink体系中,“流”对应DataStream API概念,“动态流表”属于Flink SQL范畴,本质上二者都是无界数据集的不同表示形式。
1.3 异步I/O
在流处理应用中,与外部系统交互(如使用数据库数据扩充流数据)时,通信延迟会对整体性能产生显著影响。常规的同步交互方式,例如使用MapFunction访问外部数据库,函数会在发送请求后一直等待响应,大量时间消耗在等待过程中。而异步交互允许并行函数实例并发处理多个请求与接收响应,将等待时间分摊到多个请求,从而大幅提升流处理的吞吐量。
二、JDBC维表实现:从选择到实践
2.1 选择JDBC维表的原因
Flink官方提供了多种支持维表功能的connector,如jdbc、hive、hbase等。由于本地环境仅安装了MySQL,未部署hive、hbase等第三方存储引擎,因此选择JDBC实现的维表功能作为分析切入点。尽管不同connector的具体实现存在差异,但核心原理具有相似性,官方提供的connector地址可查阅更多相关信息。
2.2 前期准备:搭建测试环境
为了更直观地理解JDBC维表的实现原理,通过搭建一个测试任务来调试运行流程。该测试任务以Kafka作为数据源,MySQL作为维表和结果表存储,具体步骤如下:
- 引入依赖
在项目中引入相关依赖,若为普通项目运行测试,需添加以下依赖及打包插件:
<!-- flink-connector-jdbc_2.12 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.13.6</version>
<scope>test</scope>
</dependency>
<!-- mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- ... -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
若在源码中进行调试,则在flink-examples-table
模块的pom
文件中引入以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.14-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.14-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.14-SNAPSHOT</version>
</dependency>
- 测试任务代码实现
public class LookupFunctionExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // source only supports parallelism of 1
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String kafkaSource = "CREATE TABLE kafka_source (\n"
+ " -- declare the schema of the table\n"
+ " `id` INT,\n"
+ " `name` STRING,\n"
+ " `sex` STRING,\n"
+ " `age` INT, \n"
+ " `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n"
+ " `proctime` AS PROCTIME(), -- use a computed column to define a proctime attribute\n"
+ " WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND -- use a WATERMARK statement to define a ts attribute\n"
+ ") WITH (\n"
+ " -- declare the external system to connect to\n"
+ " 'connector' = 'kafka',\n"
+ " 'topic' = 'lookuptest',\n"
+ " 'scan.startup.mode' = 'latest-offset',\n"
+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ " 'format' = 'json' -- declare a format for this system\n"
+ ")";
String lookupSql = "CREATE TABLE lookup_tbl (\n"
+ " id BIGINT,\n"
+ " name STRING,\n"
+ " PRIMARY KEY (id) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'connector' = 'jdbc',\n"
+ " 'url' = 'jdbc:mysql://localhost:3306/flink_test',\n"
+ " 'username'='root',\n"
+ " 'password'='123456789',\n"
+ " 'table-name' = 'flink_lookup'\n"
+ ")";
String sinkSql = "CREATE TABLE sink_tbl (\n"
+ " `id` INT,\n"
+ " `name` STRING,\n"
+ " `sex` STRING,\n"
+ " `age` INT, \n"
+ " PRIMARY KEY (id) NOT ENFORCED\n"
+ ") WITH (\n"
+ " 'connector' = 'jdbc',\n"
+ " 'url' = 'jdbc:mysql://localhost:3306/flink_test',\n"
+ " 'username'='root',\n"
+ " 'password'='123456789',\n"
+ " 'table-name' = 'flink_test'\n"
+ ")";
String insertSql = "INSERT INTO sink_tbl SELECT a.id, b.name, a.age, a.sex FROM kafka_source a\n"
+ "LEFT JOIN lookup_tbl FOR SYSTEM_TIME AS OF a.proctime AS b \n"
+ "ON a.id = b.id";
tEnv.executeSql(kafkaSource);
tEnv.executeSql(lookupSql);
tEnv.executeSql(sinkSql);
tEnv.executeSql(insertSql);
env.execute("LookupFunction Test");
}
}
该代码通过Flink的Table API定义了Kafka数据源表、JDBC维表和JDBC结果表,并执行了从Kafka源表与JDBC维表的LEFT JOIN操作,将结果写入JDBC结果表。
三、JDBC维表源码剖析:核心功能的实现细节
在Flink的SQL Connector中,Source Function
不仅负责源表数据读取,还承担维表功能。其入口为org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
,该类中定义了源表和目标表的创建方法,分别为createDynamicTableSource
和createDynamicTableSink
。其中,JdbcDynamicTableSource
实现了ScanTableSource
接口的源表功能和LookupTableSource
接口的维表功能,在运行阶段,这两个接口会被转换为TableFunction
执行,维表在运行时通过CommonExecLookupJoin
进行算子转换。
JdbcDynamicTableSource
的getLookupRuntimeProvider
方法是实现维表功能的关键,部分代码如下:
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// JDBC只支持非嵌套查找键
String[] keyNames = new String[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
int[] innerKeyArr = context.getKeys()[i];
Preconditions.checkArgument(innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
}
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
return TableFunctionProvider.of(
new JdbcRowDataLookupFunction(
options,
lookupOptions,
physicalSchema.getFieldNames(),
physicalSchema.getFieldDataTypes(),
keyNames,
rowType));
}
从上述代码可知,维表的数据读取逻辑主要在JdbcRowDataLookupFunction
中实现,其核心方法为open
和eval
。open
方法主要用于确保维表连接正常,并根据配置初始化缓存对象:
public void open(FunctionContext context) throws Exception {
try {
// 确保维表能正常连接
establishConnectionAndStatement();
// 初始化缓存,设置缓存最大值、过期时间
this.cache =
cacheMaxSize == -1 || cacheExpireMs == -1
? null
: CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}
而eval
方法负责实际的数据读取操作,由于代码较长,核心逻辑如下:
public void eval(Object... keys) {
RowData keyRow = GenericRowData.of(keys);
if (cache != null) {
List<RowData> cachedRows = cache.getIfPresent(keyRow);
if (cachedRows != null) {
for (RowData cachedRow : cachedRows) {
collect(cachedRow);
}
return;
}
}
for (int retry = 0; retry <= maxRetryTimes; retry++) {
statement.clearParameters();
statement = lookupKeyRowConverter.toExternal(keyRow, statement);
try (ResultSet resultSet = statement.executeQuery()) {
if (cache == null) {
while (resultSet.next()) {
collect(jdbcRowConverter.toInternal(resultSet));
}
} else {
ArrayList<RowData> rows = new ArrayList<>();
while (resultSet.next()) {
RowData row = jdbcRowConverter.toInternal(resultSet);
rows.add(row);
collect(row);
}
rows.trimToSize();
cache.put(keyRow, rows);
}
}
break;
}
}
eval
方法首先检查缓存,若缓存中有数据则直接发送到下游算子;若无缓存数据,则根据查询条件从数据库获取数据,并在缓存存在时将数据存入缓存。
四、维表运行时剖析:从代码到执行的全流程追踪
通过对JDBC维表源码的分析,我们知道数据读取在eval
方法中完成。为进一步了解维表在运行时的调用过程,我们从Flink SQL的运行流程入手。Flink SQL的执行需经过词法解析、语法解析、抽象语法转换、优化器、物理执行计划等环节,我们在TableEnvironmentImpl#executeInternal
方法处设置断点进行调试。
调试发现,运行时的核心类为LookupJoinRunner
,其processElement
方法实现如下:
public void processElement(RowData in, Context ctx, Collector<RowData> out) throws Exception {
collector.setCollector(out);
collector.setInput(in);
collector.reset();
// 当对象重用被启用时,Fetcher已经复制了输入字段
fetcher.flatMap(in, getFetcherCollector());
if (isLeftOuterJoin && !collector.isCollected()) {
outRow.replace(in, nullRow);
outRow.setRowKind(in.getRowKind());
out.collect(outRow);
}
}
数据拉取操作主要在fetcher.flatMap
方法中,fetcher
对象在open
阶段实例化。进一步深入调试,获取userFunction
中generateFetcher
和generatedCollector
的核心代码:
4.1 generateFetcher#flatMap
public class LookupFunction$10
extends org.apache.flink.api.common.functions.RichFlatMapFunction {
@Override
public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) _in1;
int field$6;
boolean isNull$6;
isNull$6 = in1.isNullAt(0);
field$6 = -1;
if (!isNull$6) {
field$6 = in1.getInt(0);
}
resultConverterCollector$9.setCollector(c);
if (isNull$6) {
// skip
} else {
// 去调用维表中的eval
function_org$apache$flink$connector$jdbc$table$JdbcRowDataLookupFunction$096de1ff849635b06dca993bab61661b.eval(isNull$6 ?
// 因为关联条件是主键id 整数类型,在此处进行强转
null : ((java.lang.Integer) field$6));
}
}
}
4.2 generatedCollector#collect
public class JoinTableFuncCollector$14 extends org.apache.flink.table.runtime.collector.TableFunctionCollector {
@Override
public void collect(Object record) throws Exception {
org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) getInput();
org.apache.flink.table.data.RowData in2 = (org.apache.flink.table.data.RowData) record;
int field$11;
boolean isNull$11;
org.apache.flink.table.data.binary.BinaryStringData field$12;
boolean isNull$12;
isNull$12 = in2.isNullAt(1);
field$12 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
if (!isNull$12) {
field$12 = ((org.apache.flink.table.data.binary.BinaryStringData) in2.getString(1));
}
isNull$11 = in2.isNullAt(0);
field$11 = -1;
if (!isNull$11) {
field$11 = in2.getInt(0);
}
if (isNull$11) {
out.setField(0, null);
} else {
out.setField(0, field$11);
}
if (isNull$12) {
out.setField(1, null);
} else {
out.setField(1, field$12);
}
// 处理两个流的数据进行join,并将数据结果,继续发送到下游
joinedRow$13.replace(in1, out);
joinedRow$13.setRowKind(in1.getRowKind());
outputResult(joinedRow$13);
}
}
以上为整个维表的实现流程剖析