Flink最佳实践 - Watermark原理及实践问题解析

发布于:2025-06-25 ⋅ 阅读:(18) ⋅ 点赞:(0)

原文链接

Watermark概念

关于Watermark的概念, Flink文档中的描述比较容易理解, 以下是文档中的原话.

A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark).

简单来说, 一个Watermark就是一个标识, 一个时间戳为t的Watermark表示Event Time小于或等于t的事件都已经到达. 有了这个前提, 基于Event Time的窗口计算能产生准确的结果, 例如, 如果一个时间窗口的结束时间为t0, 当前已经产生的最大Watermark为t1, 并且t1>t0
, 那么现在触发该窗口的计算可以得到准确的结果, 因为属于该窗口的数据都已经到达.

Watermark可以用来平衡计算结果的准确性和延迟.

如果从Watermark的角度来审视一下批处理和流处理的关系我们可以发现: 在批处理中我们实际上是使用了非常宽松的Watermark策略, 比如我们通常会在当天处理前一天的数据, 这样的Watermark策略可以理解为将当前观察到的数据的最大时间戳减去数个小时作为Watermark. 在这样宽松的Watermark策略下, 总能保证在批处理程序启动时所有数据已经全部到达, 因此产生准确的结果. 而在流处理中我们通常会使用紧迫的Watermark策略, 以更快得到处理结果, 这在降低延迟的情况下可能牺牲一定的准确性. 从中也可以看出, Watermark是统一流处理与批处理的重要理论依据.

Flink中的Watermark

Watermark的生成

https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners

Watermark的传播

Watermark可在Source或之后的操作中生成, 一旦生成必须不断往下游传播. 要理解Watermark在Flink中的传播流程, 我们先来看下Flink中Watermark的实体是什么.Flink中有一个Watermark类, 它和StreamRecord(DataStream中的每条数据在运行时都作为StreamRecord的实例在算子间流转)一样都继承自StreamElement. 也就是说在Flink中, Watermark同数据流中的数据一样是以StreamElement实例在算子间流转的.

对于无需存数据的算子(如map算子), 在接收到Watermark之后可直接发送到下游. 而对于需要缓存数据的算子, 如窗口算子, 则需要分两种情况: 若是当前Watermark没有触发计算, 那么直接将其往下游发送即可; 若是当前Watermark触发了计算, 那么需要等计算结果发送之后, 再发送Watermark.

另外需要注意的是在并行流中, 算子会向下游的所有算子广播Watermark. map算子会向window算子广播Watermark, 而window算子会将来自上游各个算子的Watermark的最小值作为当前Watermark. 这一机制产生的现象就是所有window算子的Watermark都是一致的.

并行流下的Watermark

非Source算子生成Watermark的问题
Flink DataStream API支持在非Source算子中生成Watermark, 然而在并行流下这种生成方式很可能出现问题.
假设有如下Flink作业图, 数据源有三个Paritition, Flink作业的并行度为2, 在map算子之后调用了assignTimestampsAndWatermarks生成Watermark, 策略是当前最大时间戳减5秒. keyby之后是两个Tumbling window算子, 窗口大小为10秒.

输入数据的格式为(id, value, time). 并且id为0的数据会进入partition(0), 依次类推. 由于Flink作业的并行度为2, 因此其中一个Source算子必然会对应两个Partition. 现在如果有如下数据按顺序达到, 那么由于在第5条数据达到的时候, 窗口[2022-04-25 10:00:00, 2022-04-25 10:00:10]被触发, 之后达到的第6条数据就会变为迟到数据.

0,A,2022-04-25 10:00:00
1,B,2022-04-25 10:00:00
2,C,2022-04-25 10:00:00
1,B,2022-04-25 10:00:16
2,C,2022-04-25 10:00:16
0,A,2022-04-25 10:00:08

上述案例中, 其中一个Source算子读取了两个Partition的数据, 且并没有将各个Partition的Watermark进行对齐, 在之后的map算子中, 来自多个Partition的数据进行混合, 因此生成的Watermark将无法表达各个Partition内数据的情况. 而如果将Watermark的生成放到Source算子中, 那么即使Source算子的并行度与Partition数量不同, Source算子也会为每个Partition生成单独的Watermark并进行对齐, 这样就不会出现上述问题了.

因此, 在并行流下对于像Kafka这样的多Partition数据源, 应该把Watermark的生成放在Source算子中. 在原来的SourceFunction API和最新的Source API中都提供了对应的方法.

空闲Source的问题

上文也说到了, 在并行流下Flink会实行Watermark对齐, 即上游算子向下游广播Watermark, 下游算子将来自上游各个算子的Watermark的最小值作为当前的Watermark. 在多Partition情况下, 如果存在长时间不更新数据的空闲Partition, 那么由于Watermark对齐, 会使其他window算子长时间得不到触发.

这里还是以一个详细的案例说明上述情况. 假设有如下Flink作业图, Watermark在Source算子中生成, 生成策略, window类型及大小, 输入数据的格式和对应Partition同上一小节.

0,A,2022-04-25 10:00:00
1,B,2022-04-25 10:00:00
1,B,2022-04-25 10:00:01
1,B,2022-04-25 10:00:08
1,B,2022-04-25 10:00:13
...一直是id为1的数据
1,B,2022-04-25 10:30:00
0,A,2022-04-25 10:30:00

现在我们有如下数据按顺序到达, 可以看到, 在2022-04-25 10:00:00之后的半个小时内都没有id为0数据, 也就是说partition(0)中有很长一段时间没有数据写入, 而partition(1)中不断有数据写入. 但是由于Watermark对齐机制, window算子的Low Watermark一直无法推进, 需要一直等到直到partition(0)重新有数据写入.

上述这种情况会导致两个问题:

  • 其一是window算子中需要缓存大量id为1的数据, 使得作业的State不断增大, 给Checkpoint带来压力, 失败后的重启时间也变大.

  • 其二是计算结果的延迟变大, 可以看到id为1在2022-04-25 10:00:00后的数据最大需要延迟30分钟后才能输出计算结果.

对于空闲Source的问题, Flink中已经提供了解决方案. 可以在WatermarkStrategy中通过withIdleness指定判断Source为空闲的最大时间间隔. 比如以下代码表示, 如果一个Source超过1分钟没有数据更新, 那么将该Source标识为空闲Source, 其他Source的Watermark不需要再与该Source对齐.

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withIdleness(Duration.ofMinutes(1));

Event Time倾斜的问题

在并行流下多Partition数据源中可能产生的另一个问题是Event Time倾斜. Event Time倾斜即各个Partition中数据的Event Time推进不一致, 部分Partition中的Event Time与其他Partition中的Event Time存在较大差距. 在这种情况下, 由于Watermark对齐机制, 就导致了下游Low Watermark不能推进, 而Event Time推进较快的Partition的数据又被不断读入, 对于需要Watermark触发的window算子就会缓存大量数据.

举例来说, 如果我们从某个Kafka Topic的开头读取历史数据, 各个分区的Event Time很可能并不同步, 如果一个Partition的Event Time明显比其他Partition慢, 那么由于Watermark对齐, window算子的Low Watermark会被拖慢, 而其他分区的数据又在不断读入, 这就造成了大量的数据缓存.

从表象上来看, 空闲Source和Event Time倾斜都会造成大量的数据缓存, 不过这两个问题是存在本质区别的:

  • 空闲Source是某一Partition在一段时间内没有数据写入, 经过一段时间后又有数据写入, 在这个过程中数据的Event Time存在跳跃式推进, 也就是说这段时间内确实没有数据, 而不是数据迟到. 在这种情况下我们可以将这个Partition标识为空闲从而直接忽略.

  • 然而在Event Time倾斜问题中, 各个Partition中并不存在Event Time的跳跃式推进, 也就是说并不存在某个Partition在某段时间内没有数据, 而是各个Partition的Event Time推进不一致. 这也就无法通过将某个Partition标识为空闲解决.

对于Event Time倾斜问题, 在Flink 1.15中提供了解决方案. 可通过如下方式使用, 即通过在WatermarkStrategy中添加withWatermarkAlignment来实现各个Source的Watermark同步推进. 它实现了这样的语义, 如果两个Source间Watermark的差值超过了一个给定值maxAllowedWatermarkDrift, 那么停止读取Watermark推进较快的Source, 等到两个Source间的Watermark小于maxAllowedWatermarkDrift时再重新开始读取该Source. 需要注意的是Source间的Watermark同步只支持新的Source API, 且只能通过以下方式使用, 在DataStream.assignTimestampsAndWatermarks中使用是无效的.

DataStream<Tuple2<Long, String>> streamSource = env.fromSource(
        source,
        WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1)),
        "source");

Watermark生成实践

总结

Watermark是流处理中的重要抽象, 它是平衡流处理准确性和延迟的机制. Watermark需要根据数据的具体情况和已经观察到的数据来生成. Flink在DataStream API和SQL API中都已经增加了对Watermark的支持, 不过对于复杂的Watermark策略, 目前只能用DataStream API实现. 在并行流下使用Watermark可能会出现两类问题: 一类是由于在非Source中生成Watermark导致数据乱序, 这一问题是可以避免的; 另一类问题是由于数据特征而导致的空闲Source和Event Time倾斜, Flink现在也引入了对这两种问题的解决方案.

补充参考材料

深入理解流计算中的 Watermark

Flink Event Time 倾斜