flink学习(9)——time+water mark

发布于:2024-11-29 ⋅ 阅读:(37) ⋅ 点赞:(0)

Time的分类 (时间语义)

EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间

IngestionTime:摄入时间,是事件/数据到达流处理系统的时间

ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间

EventTime 

一个订单数据,支付数据的事件时间是11点59分(发送支付请求的时间),而支付数据的处理时间是12点01分(完成订单,扣钱的时间)

问题:
如果要统计12之前的订单金额,那么这笔交易是否应被统计?
答案:
应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分,
事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准。
——要想使用eventTime进行统计,那么必须添加waterMark(水印)

Watermark——水印,水位线

为什么会有WaterMark?

当flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:
————只能解决短期时间的问题

Watermark的核心本质可以理解成一个延迟触发机制。

假如没有这个水位线
1、错过了就是错过了,然后坐下一班车
2、这几辆车均留下,一直等着,那么什么时候开车呢——waterMark
3、当此时出现的最大事件事件 - 最大允许的延迟时间(或最大允许的乱序度时间) = waterMark

4、车走的时机,也就是窗口触发的时机
    有了Watermark就可以根据Watermark来决定窗口的触发时机,满足下面的条件才触发:
    1.窗口有数据
    2.Watermark >= 窗口的结束时间

可以认为水印时间就是一个标志时间戳,用于判断这个窗口等待多长时间

举例:
窗口5秒,延迟(水印)3秒,按照事件时间计算
数据事件时间3, 落入窗口0-5.水印时间0
来一条数据事件时间7, 落入窗口6-10,水印时间4
来一条数据事件时间4,落入窗口0-5,水印时间4
来一条数据事件时间8,落入窗口6-10,水印时间5
这一条数据水印时间大于等于窗口0-5的窗口结束时间。
满足了对窗口0-5的提交,这个窗口关闭,并触发数据计算
可以看出,第三条数据,其是延迟数据,它的事件时间是4,却来的比事件时间为7的数据还要晚。

但是因为水印的机制,这个数据未错过它的窗口,依旧成功进入属于它的窗口并且被计算
这就是水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据。

 并行度的水印触发

在多并行度下,每个并行有一个水印
比如并行度是6,那么程序中就有6个watermark
分别属于这6个并行度(线程)

那么,触发条件以6个水印中最小的那个为准

比如, 有个窗口是0-5
其中5个并行度的水印都超过了5
但有一个并行度的水印是3
那么,不管另外5个并行度中的水印达到了多大,都不会触发
因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件

Watermark代码演示

实体类 OrderInfo

package com.bigdata.day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderInfo {

        private String orderId;
        private int uId;
        private double money;
        private long timeStamp;

}

自定义source

// 为了防止并行度的影响,因此设置为一个并行度
package com.bigdata.day05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

public class MySource implements SourceFunction<OrderInfo>{
        private boolean flag = true;


        @Override
        public void run(SourceContext<OrderInfo> ctx) throws Exception {
            Random random = new Random();
            while (flag){

                String orderId = UUID.randomUUID().toString();

                int uId = random.nextInt(10);
                double money =Math.round(random.nextDouble() * 100);
                long timeStamp = System.currentTimeMillis() - random.nextInt(3000);
                OrderInfo orderInfo = new OrderInfo(orderId,uId,money,timeStamp);
                ctx.collect(orderInfo);
                Thread.sleep(20);

            }



        }
        @Override
        public void cancel() {
            flag = false;
        }

}

CartInfo案例_waterMark

// 滑动窗口 
每隔5秒收集5秒的数据
计算每个用户总共的订单金额
package com.bigdata.day05;

import java.time.Duration;


public class _02_CartInfo案例_waterMark {
    public static void main(String[] args) throws Exception {
        // 获取环境 
        StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
        // 添加数据
        DataStreamSource<OrderInfo> orderInfoDataStreamSource = env.addSource(new MySource());
        
        // 设置水印
        SingleOutputStreamOperator<OrderInfo> watermarkSource = orderInfoDataStreamSource
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.
                        // 最大延迟时间为3秒
  <OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
            @Override
            public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                // 注意:时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。
                return orderInfo.getTimeStamp();
            }
        }));

        watermarkSource.keyBy(new KeySelector<OrderInfo, Integer>() {

            @Override
            public Integer getKey(OrderInfo value) throws Exception {
                return value.getUId();
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
            @Override
            public void apply(Integer uId, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
                double sumMoney = 0;

                for (OrderInfo orderInfo : input) {
                    sumMoney += orderInfo.getMoney();
                }
                String startStr = DateFormatUtils.format(window.getStart(), "yyyy-MM-dd HH:mm:ss");
                String endStr = DateFormatUtils.format(window.getEnd(), "yyyy-MM-dd HH:mm:ss");
                out.collect("uid: "+uId+","+"sumMoney: "+sumMoney+","+"startTime: "+startStr+","+"endTime: "+endStr);

            }
        }).print().setParallelism(1);

        env.execute();
    }
}
1、时间戳的分配与 watermark 的生成是齐头并进的,
2、可以告诉 Flink 应用程序事件时间的进度。通过指定 WatermarkGenerator 来配置 watermark 的生成方式。
3、Watermark定义方式,首先assignTimestampsAndWatermarks
    然后
    1、实现WatermarkStrategy 接口
    2、通过WatermarkStrategy 工具类
    
    第二种方式的具体实现:
    orderInfoDataStreamSource
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
    @Override
    public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
        // 注意:时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。 若不是则需要转为毫秒值
        return orderInfo.getTimeStamp();
    }
}));


4、WatermarkStrategy 可以在 Flink 应用程序中的两处使用,
第一种是直接在数据源上使用
第二种是直接在非数据源的操作之后使用