Temporal Join,一探究竟

发布于:2025-06-25 ⋅ 阅读:(17) ⋅ 点赞:(0)

flink sql的join分为Regular Joins、Interval Joins、Temporal Joins和Lookup Join。

尽管官方文档中对上述几种join类型的概念和原理有很好的解释,但针对Temporal Joins仍有以下一些疑惑。

  • 1.官方文档中给出的tempoal join中语法如下,但紧随着分别说明只有基于事件时间方式使用可以FOR SYSTEM_TIME AS OF leftTable.rowtime语法,而基于处理时间方式却只能使用temporal table function join语法?
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
  • 2.基于事件时间和处理时间方式join的实现细节是什么?如何理解不同时间语义下的“时态”?
  • 3.join结果是何时、如何下发的?
  • 4.表中的记录在状态中如何存储?状态如何清理?
  • 5.如何判断不需要(过期)记录的版本?
  • 6.基于处理时间的temporal join和lookup join的区别是什么?

带着上述问题,开始尝试从源码中(1.16)获取答案。

temporal join两种时间属性的Operator实现类在StreamExecTemporalJoin#createJoinOperator方法中初始化。

ps: 一件事找到正确入口便已成功了30%,那么这个入口是如何找到的?在源码中通过关键字硬硬硬搜出来的……

private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator(...) {
    // ...
    // 根据table.exec.state.ttl配置项计算最小、大留存时间,temporal join中将会根据此来计算状态的留存时间
    long minRetentionTime = config.getStateRetentionTime();
    long maxRetentionTime = TableConfigUtils.getMaxIdleStateRetentionTime(config);
    if (rightTimeAttributeIndex >= 0) {
        // 基于事件时间的temporal join
        return new TemporalRowTimeJoinOperator(...);
    } else {
        // 基于处理时间的temporal join
        if (isTemporalFunctionJoin) {
            // 使用temporal table function join时,初始化TemporalProcessTimeJoinOperator
            return new TemporalProcessTimeJoinOperator(...);
        } else {
            // 否则抛出异常,官方原因如下
            // The exsiting TemporalProcessTimeJoinOperator has already supported temporal table join. 
            // However, the semantic of this implementation is problematic, because the join processing for left stream doesn't wait for the complete snapshot of temporal table,
            // this may mislead users in production environment. See FLINK-19830 for more details.
            throw new TableException("Processing-time temporal join is not supported yet.");
        }
    }
}

因此Temporal Join基于事件时间和处理时间的operator实现类分别为TemporalRowTimeJoinOperatorTemporalProcessTimeJoinOperator,二者继承共同父类BaseTwoInputStreamOperatorWithStateRetention

temporal继承关系

基于处理时间的temporal join只有使用temporal table function join方式时才会初始化TemporalProcessTimeJoinOperator,否则抛出异常。FLINK-19830中说明了这样做的原因:当前TemporalProcessTimeJoinOperator实现中,由于左表记录并不会等待右表记录的完整快照,存在可能会误导用户的语义问题。之所以支持temporal table function join方式,是由于这种方式已经存在很长时间的历史原因,出于兼容性烤考虑,所以继续支持它。

1. BaseTwoInputStreamOperatorWithStateRetention

BaseTwoInputStreamOperatorWithStateRetention,一个允许子类基于TTL清理它们状态的抽象TwoInputStreamOperator

该抽象类的主要作用是根据table.exec.state.ttl,默认值0配置项,决定是否开启处理时间timer(table.exec.state.ttl>1时开启)。开启后,对表中每条记录的key最多维护一个处理时间timer。子类通过实现其唯一的抽象方法cleanupState(long time)来决定timer触发时的具体动作。

  • 关键成员属性
// 取值table.exec.state.ttl
private final long minRetentionTime;
// 取值table.exec.state.ttl * 3 / 2
private final long maxRetentionTime;
// 保存已注册timer的触发时间
private transient ValueState<Long> latestRegisteredCleanupTimer;

ValueState<Long> latestRegisteredCleanupTimer状态中保存的是已注册的处理时间timer的触发时间,通过该状态值判断处理时间timer是否注册、更新等操作。除此之外minRetentionTimemaxRetentionTime表示状态最小、最大留存时间,用于计算处理时间timer的时间。

  • 关键方法
// 注册、更新处理时间timer,由子类主动调用
protected void registerProcessingCleanupTimer()
// timer触发时自动执行
public final void onProcessingTime
// 唯一的抽象方法,子类实现具体的timer触发时的执行逻辑
public abstract void cleanupState(long time)

registerProcessingCleanupTimer()方法负责新增、更新处理时间timer,由子类主动调用,核心逻辑如下

if(table.exec.state.ttl>1){
    if(timer未注册){
        注册处理时间timer,timer时间=curProcessingTime+maxRetentionTime;
    }else if(curProcessingTime+minRetentionTime > 已注册timer的时间){
        删除旧的timer;
        更新处理时间timer,timer时间=curProcessingTime+maxRetentionTime;
    }
}

onProcessingTime()方法在timer触发时自动执行,主要逻辑为执行cleanupState(long time)抽象方法,核心逻辑如下

public final void onProcessingTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
    if(table.exec.state.ttl>1){
        if(timer已注册 && 保存在状态中的time==timer){
            执行cleanupState(long time),该方法由子类具体实现;
            清空latestRegisteredCleanupTimer状态;
        }
    }
}

画个图来理解下处理时间timer

处理时间timer

假设table.exec.state.ttl配置为2h,则minRetentionTime=2h,maxRetentionTime=3h

1.在processTime=10:00时刻首次调用registerProcessingCleanupTimer()方法注册timer,该timer将会在curProcesstime+maxRetentionTime(13:00) 时触发执行。
2.在(10:00,11:00)的时间范围内,继续调用registerProcessingCleanupTimer()方法,将不会导致timer发生变更。
3.在[11:00,-)之后的时间内,继续调用registerProcessingCleanupTimer()方法,由于processTime+minRetentionTime > 13:00,将导致timer不断被重置。
如果之后始终不断调用registerProcessingCleanupTimer()方法,则timer将一直不会触发,只有在timer触发之前未调用registerProcessingCleanupTimer()方法时,timer才会被触发执行。

2. 基于事件时间的 Temporal join Operator

TemporalRowTimeJoinOperatorprocessElement1()processElement2()方法分别负责处理左右表中的每条记录。两个方法的处理逻辑类似。主要为以下步骤

  • 1.获取(左/右表)记录中事件时间
  • 2.将记录存储到对应状态中
  • 3.使用最小事件时间注册事件时间timer
  • 4.调用父类中的registerProcessingCleanupTimer()方法,注册处理时间timer

有以下几个关键点

  • 当左/右表中记录到达后并没有执行join操作并输出结果,仅仅是将数据放入到各自的状态中,并使用最小事件时间注册事件时间timer
  • 为什么是使用最小事件事件注册事件时间timer?

用于存储左右表记录状态的成员属性定义如下

private transient MapState<Long, RowData> leftState;
private transient MapState<Long, RowData> rightState;

二者的区别主要体现在key实际存储的内容上,leftState中key保存的内容是一个从1开始的自增数字,当左表中第一条数据到达后其状态中key=1,第二条数据其状态中key=2,依次递增。而rightState中key保存的是右表中每条记录的事件时间,这样设计的原因将从emitResultAndCleanUpState(long currentWatermark)方法中得到解释。

如何理解使用最小事件时间注册事件时间timer?

基于事件时间的temporal join的工作原理是先保存左右表中记录到状态中并注册timer,当watermark超过timer时间,在timer触发执行时来处理状态中的数据并输出结果。

如果为每个key的全部数据都注册事件时间timer,就会导致注册的timer数量巨大。例如左表记录中key=A的事件时间在流中的体现形式为{9,8,5,2,1},如果为这5个事件时间都注册timer,当收到watermark(10)时,这5个timer都将会触发,造成冗余。为了避免这种情况,对同一个数据记录key仅使用最小事件时间(事件时间1)注册的事件时间timer。这样不仅大大减少了timer数量,当收到watermark(10)时,只有一个timer被触发执行。

最小事件时间timer将会保存在ValueState<Long> registeredTimer,由左右表中的记录共同更新,当左/右表的数据记录到达processElement1()/processElement2()方法后,根据当前记录的事件时间和registeredTimer状态中已注册timer时间进行比较,使用最小事件时间重新注册或更新timer(更新操作为先删除旧timer,再注册新timer)。

上述过程在registerSmallestTimer(long timestamp)方法中完成,核心逻辑如下

// 入参timestamp为左右表中当前记录的事件时间
private void registerSmallestTimer(long timestamp) throws IOException {
    Long currentRegisteredTimer = registeredTimer.value();
    if(currentRegisteredTimer == null){
        registeredTimer.update(timestamp);
        注册timer;
    }else if(currentRegisteredTimer > timestamp){
        删除旧timer;
        registeredTimer.update(timestamp);
        注册新timer;
    }
}

processElement1()方法相比processElement2()稍有不同的地方在于,processElement1()方法中需要为左表中每条记录计算存储在leftState中的自增序号。使用ValueState<Long> nextLeftIndex状态保存自增序号。计算当前记录自增序号也很简单:状态值+1。

已注册的事件时间timer触发时onEventTime()方法将会执行,在该方法进行状态清理和结果输出。核心逻辑如下

  • 1.registeredTimer.clear();
  • 2.调用emitResultAndCleanUpState(long currentWatermark)方法来清理状态和下发结果,该方法会返回leftState中未处理数据中最小的事件时间(lastUnprocessedTime);
  • 3.lastUnprocessedTime有效的情况下,使用lastUnprocessedTime重新注册事件时间timer;
  • 4.调用父类方法,注册或清理处理时间timer,该步骤具体逻辑如下
if (stateCleaningEnabled) {
    if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
        registerProcessingCleanupTimer();
    } else {
        cleanupLastTimer();
        //  清空保存左表记录自增序号的状态
        nextLeftIndex.clear();
    }
}

最最最核心的join逻辑将在emitResultAndCleanUpState(long currentWatermark)方法中揭晓。该方法主要处理逻辑如下

1.从右表记录状态(rightState)中获取全部数据到本地list中(rightRowsSorted),该list已按照事件时间升序排序;
2.对左表记录状态(leftState)中全部数据,按照<=currentWatermark条件分成两部分,将满足条件的数据从状态中取出到本地map(orderedLeftRecords),并从状态中删除,该map使用TreeMap结构默认根据key(自增序号)升序排序,对不满足条件的数据部分计算其最小事件时间(lastUnprocessedTime);
因此之所以使用自增序号作为leftState状态中key的目是为了在处理左表记录时与记录到达顺序保持一致。
3.挨个处理orderedLeftRecords中数据,对每一条记录根据其事件时间使用二分查找从rightRowsSorted中获取右表中相同事件时间的的记录,如果右表中无相同事件时间的记录,则返回右表中<左表记录事件时间且最新事件时间的右表记录(rightRow);
4.根据第3步rightRow结果、join条件、是否是left join输出不同结果,该步骤具体逻辑如下

if (rightRow存在 && rightRow in(INSERT,UPDATE_AFTER)) {
    if (join条件成立) {
        输出左表 join 右表结果
    } else {
        if (isLeftOuterJoin) {
            输出左表 JOIN null结果
        }
    }
} else {
    if (isLeftOuterJoin) {
        输出左表 JOIN null结果
    }
}

5.orderedLeftRecords.clear();
6.删除右表状态(rightState)中记录事件时间<=currentWatermark的全部记录,但是在<=currentWatermark的记录中会保留中下事件时间最大的那条记录,这意味着如果右表记录已经全部过期,rightState中也会保留一条最新事件时间的记录。
通过记录事件时间<=currentWatermark的条件来判断右表中记录是否已失效。
7.返回lastUnprocessedTime时间。

最后父类cleanupState(long time)的方法实现如下。负责清空全部状态数据。

@Override
public void cleanupState(long time) {
    leftState.clear();
    rightState.clear();
    nextLeftIndex.clear();
    registeredTimer.clear();
}

以上即从TemporalRowTimeJoinOperator的关键逻辑。可以得到该join仅支持inner join或left join,join过程是以左表数据记录为主的,右表数据记录作为维表被动等待左表记录进行匹配。这也解释了官方文档中描述中“基于事件时间的temporal join允许对版本表进行join”,为什么仅将右表称为版本表的原因。

3. 基于处理时间的 Temporal join Operator

相比于TemporalRowTimeJoinOperator实现类,TemporalProcessTimeJoinOperator实现类要简单很多。仅仅定义了一个ValueState<RowData> rightState来保存右表记录中相同key的一条数据。左表记录不会通过状态进行存储,这也意味这在左表记录到达时将会立即输出join结果。

processElement1()方法中处理左表记录的流程如下

  • 1.从右表状态(rightState)中获取右表记录(rightSideRow);
  • 2.根据右表记录、join条件、是否left join来处理输出结果,该过程具体逻辑如下
if (rightSideRow == null) {
    // 右表记录为空
    if (left join) {
        输出左表 join null结果
    } else {
        return;
    }
} else {
    if (join条件成立) {
        输出左表 join 右表结果
    } else {
        if (left join) {
            输出左表 join null结果
        }
    }
    // 调用父类方法注册处理时间timer,进行状态清理
    registerProcessingCleanupTimer();
}

processElement2()方法中对右表记录的处理仅存储到状态或从状态中删除,核心逻辑如下

if (记录类型 in(INSERT,UPDATE_AFTER)) {
    //将记录保存到状态中,调用父类方法注册处理时间timer,进行状态清理
    rightState.update(element.getValue());
    registerProcessingCleanupTimer();
} else {
    // 清空右表记录状态,然后删除父类中的处理时间timer
    rightState.clear();
    cleanupLastTimer();
}

最后父类最后父类cleanupState(long time)的方法实现如下,清空保存右表记录的状态.

@Override
public void cleanupState(long time) {
    rightState.clear();
}

在基于处理时间的temporal join同样仅支持inner join或left join。以左表记录为主,右表记录被动等待左表记录进行匹配。

同基于事件时间的temporal join实现的最大区别在于左表记录并不会等待右表记录。当左表中key=A的记录到达后,尽管右表中同样存在key=A的数据,但是在左表key=A的记录到达时右表记录还未保存到状态中,左表记录仍然是无法正确关联到右表数据的。这就是上文提到的可能会引发用户误解的语义问题。

单纯从TemporalProcessTimeJoinOperator的实现来看,无论是(LATERAL TemporalTableFunction(o.proctime))形式的 temporal table function join还是 (FOR SYSTEM_TIME AS OF leftTable.processTime)形式的 temporal table join都是支持的。实际之所以必须使用temporal table function join,如上文提到的出于历史兼容和避免语义歧义考虑。


网站公告

今日签到

点亮在社区的每一天
去签到