【Flink】迟到数据的处理

发布于:2023-01-22 ⋅ 阅读:(18) ⋅ 点赞:(0) ⋅ 评论:(0)

窗口

        在流上的工作方式与批处理不同,因为流通常是无限的,所以不可能计算流中的所有元素,流上的聚合事件则由窗口限定,例如“过去 5 分钟的计数”或“最后 100 个元素的总和”。划定的一段范围,称为窗口;在这个范围内的数据进行处理,就是窗口计算。

        窗口按照驱动类型可分为:时间窗口、基数窗口。

 

        按照窗口分配数据的规则可分为:滚动窗口、滑动窗口、会话窗口、全局窗口

时间语义

        当在流式程序中,可以引用不同的时间概念:处理时间、事件时间、摄入时间

        处理时间:执行处理操作的机器的系统时间;

        事件时间:每个事件在对应设备上发生的时间,即数据产生的时间;

        摄入时间:数据进入flink数据流的时间,即数据源算子读入数据的时间,相当于事件时间和处理时间的中和,把数据源任务的处理时间当作数据的产生时间添加到数据里。

水位线 

        在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。

        在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。在 Flink中,这种用来衡量事件时间进展的标记,就被称作“水位线 Watermark)。

        为了提高效率,一般会每隔一段时间生成一个 水位线,这个水位线的时间戳,就是当前最新数据的时间戳。

注意:对于水位线的周期性生成,周期时间是指处理时间,而不是事件时间。

有序流

        对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,就是直接拿当前最大的时间戳作为水位线就可以了。

stream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Event>forMonotonousTimestamps()
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>()
            @Override
                public long extractTimestamp(Event event, long recordTimestamp)
                    return event.timestamp;
                }
        })
}

 乱序流

        由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。

stream.assignTimestampAndWatermarks(
    // 针对乱序流插入水位线,延迟时间设置为 5s
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner (new SerializableTimestampAssigner<Event>() {
            // 抽取时间戳的逻辑
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        })
}

注意:乱序流中生成的水位线真正的时间戳,其实是当前最大时间戳—延迟时间—1。在BoundedOutOfOrdernessWatermarks的源码onPeriodicEmit()可以看到。

迟到数据

        迟到数据是指某个水位线之后到来的数据,其自身的时间戳应该在水位线之前,并且只有在事件时间语义下,迟到数据的处理才有意义。具体处理办法如下:

1、设置水位线延迟时间

        水位是所有事件时间定时器触发的判断标准。那么水位线的延迟,就是全局时钟的滞后。当我们设置了水位线延迟时间后,所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳,小于当前的水位线,那么它就是迟到数据。

stream.assignTimestampAndWatermarks(
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
        .withTimestampAssigner (new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        })
}

2、允许窗口处理迟到数据

        因为流处理的实时性至关重要,所以一般情况下不会把水位线的延迟设置的太大。当水位线延迟时间设置较短时,可考虑使用Flink的窗口,Flink的窗口也是可以设置延迟时间,允许继续处理迟到数据的。

.allowedLateness(Time.minutes(1))

3、将迟到数据放入窗口侧输出流

        经过上两种方式,已经能够处理大部分迟到数据,但窗口不能一直不关闭。为了保证处理结果的正确性,一旦窗口关闭,需要对剩余小部分的迟到数据进行处理。

        最后一种方式是:用窗口的侧输出流来收集关窗以后的迟到数据,但只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。

.sideOutputLateData(xxxxx)

总结

        Flink处理迟到数据,对于结果的正确性有三重保障:水位线的延迟,窗口允许迟到数据,将迟到数据放入窗口侧输出流。