8、Flink 在 source 处生成水位线和在 source 之后生成水位线的案例

发布于:2024-04-30 ⋅ 阅读:(18) ⋅ 点赞:(0)

1、在 source 处生成水位线(AtSourceGenerateWatermark)
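注意:从 Flink 1.17 开始,FLIP-27 源框架支持拆分(split)级别的水印对齐。在 source 处生成水位线时,直接把 WatermarkStrategy 作为参数传给 env.fromSource(...)。对于 Kafka 这类 FLIP-27 source,水位线在 source 内部按 split(即 Kafka 分区)分别生成,再像 shuffle 时那样合并。如果没有指定 TimestampAssigner,默认使用 Kafka 记录自带的时间戳作为事件时间。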

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Generates watermarks at the source: the {@link WatermarkStrategy} is handed to
 * {@code env.fromSource(...)} instead of being applied to the stream afterwards.
 *
 * <p>No timestamp assigner is configured, so each record keeps the timestamp the source attaches
 * to it (for {@code KafkaSource}, the Kafka record timestamp). The strategy tolerates records
 * arriving up to 3 seconds out of order.
 *
 * <p>The broker address, topic and group id below are placeholders.
 */
public class _02_AtSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // bootstrap.servers entries must be host:port; a bare host name is rejected by
                // the Kafka client's address validation. "my-broker" is a placeholder host.
                .setBootstrapServers("my-broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                // Start reading from the earliest available offset of each partition.
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Only the record value is deserialized, as a plain String.
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> source = env.fromSource(
                kafkaSource,
                // Passing the strategy here makes the source itself generate watermarks
                // (per split, i.e. per Kafka partition) rather than a downstream operator.
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(3)),
                "kafka_source",
                TypeInformation.of(new TypeHint<String>() {
                }));

        source.print();

        env.execute();
    }
}

2、在 source 之后生成水位线(AfterSourceGenerateWatermark):先读取并解析数据,再在解析后的流上调用 assignTimestampsAndWatermarks 指定时间戳和水位线

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Generates watermarks after the source: raw socket lines are first parsed into
 * {@code _01_MyEvent} objects, and timestamps/watermarks are then assigned on that parsed stream
 * via {@code assignTimestampsAndWatermarks}.
 *
 * <p>Each input line is expected to look like {@code id,name,eventTime}. Whitespace around the
 * fields is ignored. The event time is taken from {@code _01_MyEvent#getEventTime()}; presumably
 * epoch milliseconds, since Flink timestamps are milliseconds (TODO confirm against _01_MyEvent).
 * The strategy tolerates events arriving up to 5 seconds out of order.
 */
public class _03_AfterSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Text lines from a socket, e.g. fed by `nc -lk 8888`.
        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<_01_MyEvent> eventMap = source.map(new MapFunction<String, _01_MyEvent>() {
            @Override
            public _01_MyEvent map(String value) throws Exception {
                String[] fields = value.split(",");
                // Fail with a message naming the bad line instead of a bare
                // ArrayIndexOutOfBoundsException.
                if (fields.length < 3) {
                    throw new IllegalArgumentException(
                            "Expected 'id,name,eventTime' but got: '" + value + "'");
                }
                // Trim so hand-typed input such as "1, alice, 1000" parses instead of
                // throwing NumberFormatException.
                return new _01_MyEvent(Integer.parseInt(fields[0].trim()),
                        fields[1].trim(),
                        Long.parseLong(fields[2].trim()));
            }
        });

        SingleOutputStreamOperator<_01_MyEvent> timestampsAndWatermarks = eventMap.assignTimestampsAndWatermarks(
                WatermarkStrategy.<_01_MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Use the event's own time field, not any timestamp the socket source
                        // may have attached (recordTimestamp is ignored).
                        .withTimestampAssigner(new SerializableTimestampAssigner<_01_MyEvent>() {
                            @Override
                            public long extractTimestamp(_01_MyEvent element, long recordTimestamp) {
                                return element.getEventTime();
                            }
                        }));

        timestampsAndWatermarks.print();

        env.execute();
    }
}