Flink nc -l -p 监听端口测试

发布于:2024-06-22 ⋅ 阅读:(146) ⋅ 点赞:(0)

1、确认 9999 端口未被占用(下面的命令没有输出即表示端口空闲)

netstat -apn|grep 9999

2、消息发送端(先启动 nc 监听,再逐行输入下面的 JSON 测试数据,每行一条)

nc -l -k -p 9999
{"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1267L, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":4200L, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":5500L, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":5500L, "score":1000}

{"user":"ming","url":"www.baidu1.com", "timestamp":1717171200000, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1717171202000, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":1717171260000, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":1717264860000, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":1718780790000, "score":1000}

3、运行

周期性水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Timestamp;
import java.util.ArrayList;

/**
 * Demo job: a hand-written <em>periodic</em> watermark generator for an out-of-order stream.
 *
 * <p>The job reads one JSON object per line from a socket, converts each line to an
 * {@link Event}, assigns event-time timestamps and watermarks, and prints the events.
 *
 * <p>Relation to the built-in strategies (author's notes):
 * <ul>
 *   <li>{@code forMonotonousTimestamps} -> {@code AscendingTimestampsWatermarks}: ordered
 *       streams; effectively the bounded-out-of-orderness generator with a delay of 0 ms.</li>
 *   <li>{@code forBoundedOutOfOrderness} -> {@code BoundedOutOfOrdernessWatermarks}:
 *       out-of-order streams; the custom periodic generator below follows the same idea.</li>
 * </ul>
 */
public class FlinkPeriodicWatermarkGeneratorTestJob {
    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set before building the pipeline: operators pick up the environment's default
        // parallelism when they are created, so setting it afterwards would not affect them.
        env.setParallelism(1);

        // For an offline test, a bounded source such as env.fromCollection(...) with a few
        // hand-built Event instances can be used instead of the socket source.
        DataStreamSource<String> dss = env.socketTextStream("test002", 9999);
        SingleOutputStreamOperator<Event> ds = dss
                // Skip blank lines (e.g. an accidental empty line typed into nc) instead of failing the job.
                .filter(line -> line != null && !line.trim().isEmpty())
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        Event event = new Event();
                        event.toEvent(value);
                        return event;
                    }
                });

        SingleOutputStreamOperator<Event> withWatermarks = ds
                // Built-in alternative for ordered streams:
                //   WatermarkStrategy.<Event>forMonotonousTimestamps()
                // The custom strategy below tolerates a fixed 5000 ms of out-of-orderness.
                .assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
                    @Override
                    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                Long ts = element.getTimestamp();
                                if (ts == null) {
                                    // Fail with a clear message rather than an unboxing NullPointerException.
                                    throw new IllegalStateException("Event has no timestamp: " + element);
                                }
                                return ts;
                            }
                        };
                    }

                    @Override
                    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Event>() {
                            // Maximum tolerated out-of-orderness, in milliseconds.
                            private final long delayTime = 5000L;

                            // Largest event timestamp seen so far. The initial value is chosen so that
                            // "maxTs - delayTime - 1" in onPeriodicEmit cannot underflow.
                            private long maxTs = Long.MIN_VALUE + delayTime + 1L;

                            @Override
                            public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
                                // Called once per record: only track the maximum timestamp here.
                                // eventTimestamp is the value produced by the timestamp assigner above.
                                maxTs = Math.max(eventTimestamp, maxTs);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // Called periodically (200 ms by default; tune with
                                // env.getConfig().setAutoWatermarkInterval(...)).
                                // The extra "- 1" mirrors the built-in bounded-out-of-orderness generator:
                                // a watermark t asserts that no event with timestamp <= t is still expected,
                                // and Flink time windows are [start, end) with max timestamp end - 1.
                                output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
                            }
                        };
                    }
                });

        // Print the stream that actually carries timestamps and watermarks.
        withWatermarks.print();

        env.execute();
    }

    /**
     * A click event with a user name, a URL and an event-time timestamp in epoch milliseconds.
     */
    public static class Event{
        String user;
        String url;
        Long timestamp;

        /** Creates an empty event; populate it with {@link #toEvent(String)}. */
        public Event(){
        }

        /**
         * @param user      user name
         * @param url       visited URL
         * @param timestamp event time in epoch milliseconds
         */
        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        public String getUser() {
            return user;
        }

        public String getUrl() {
            return url;
        }

        public Long getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            // Null-safe: an event created with the no-arg constructor has no timestamp yet.
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + (timestamp == null ? null : new Timestamp(timestamp)) +
                    '}';
        }

        /**
         * Populates this event from a JSON object string with the keys
         * {@code user}, {@code url} and {@code timestamp}.
         *
         * @param val one JSON object, e.g. {@code {"user":"ming","url":"www.baidu1.com","timestamp":1200}}
         * @throws IllegalArgumentException if {@code val} does not yield a JSON object
         */
        public void toEvent(String val){
            JSONObject js = JSONObject.parseObject(val);
            if (js == null) {
                throw new IllegalArgumentException("Cannot build an Event from empty input");
            }
            this.user = js.getString("user");
            this.url = js.getString("url");
            this.timestamp = js.getLong("timestamp");
        }
    }
}

断点式水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Timestamp;
import java.util.ArrayList;

/**
 * Demo job: a hand-written <em>punctuated</em> watermark generator.
 *
 * <p>The job reads one JSON object per line from a socket, converts each line to an
 * {@link Event}, and emits a watermark only when a marker event (user {@code "Biu"})
 * arrives. No watermark is emitted periodically.
 *
 * <p>For comparison (author's notes): {@code forMonotonousTimestamps} and
 * {@code forBoundedOutOfOrderness} are the built-in periodic strategies for ordered and
 * out-of-order streams respectively.
 */
public class FlinkPunctuatedWatermarkGeneratorTestJob {
    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set before building the pipeline: operators pick up the environment's default
        // parallelism when they are created, so setting it afterwards would not affect them.
        env.setParallelism(1);

        DataStreamSource<String> dss = env.socketTextStream("test002", 9999);
        SingleOutputStreamOperator<Event> ds = dss
                // Skip blank lines (e.g. an accidental empty line typed into nc) instead of failing the job.
                .filter(line -> line != null && !line.trim().isEmpty())
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        Event event = new Event();
                        event.toEvent(value);
                        return event;
                    }
                });

        SingleOutputStreamOperator<Event> withWatermarks = ds
                // Built-in alternative for ordered streams:
                //   WatermarkStrategy.<Event>forMonotonousTimestamps()
                .assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
                    @Override
                    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                Long ts = element.getTimestamp();
                                if (ts == null) {
                                    // Fail with a clear message rather than an unboxing NullPointerException.
                                    throw new IllegalStateException("Event has no timestamp: " + element);
                                }
                                return ts;
                            }
                        };
                    }

                    @Override
                    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Event>() {
                            @Override
                            public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
                                // Emit a watermark only for the marker user. Constant-first equals
                                // keeps this safe when the "user" field is missing from the input.
                                if ("Biu".equals(event.getUser())) {
                                    // eventTimestamp is the value produced by the timestamp assigner above.
                                    output.emitWatermark(new Watermark(eventTimestamp - 1));
                                }
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // Intentionally empty: watermarks are emitted from onEvent.
                            }
                        };
                    }
                });

        // Print the stream that actually carries timestamps and watermarks.
        withWatermarks.print();

        env.execute();
    }

    /**
     * A click event with a user name, a URL and an event-time timestamp in epoch milliseconds.
     */
    public static class Event{
        String user;
        String url;
        Long timestamp;

        /** Creates an empty event; populate it with {@link #toEvent(String)}. */
        public Event(){
        }

        /**
         * @param user      user name
         * @param url       visited URL
         * @param timestamp event time in epoch milliseconds
         */
        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        public String getUser() {
            return user;
        }

        public String getUrl() {
            return url;
        }

        public Long getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            // Null-safe: an event created with the no-arg constructor has no timestamp yet.
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + (timestamp == null ? null : new Timestamp(timestamp)) +
                    '}';
        }

        /**
         * Populates this event from a JSON object string with the keys
         * {@code user}, {@code url} and {@code timestamp}.
         *
         * @param val one JSON object, e.g. {@code {"user":"Biu","url":"www.baidu8.com","timestamp":5500}}
         * @throws IllegalArgumentException if {@code val} does not yield a JSON object
         */
        public void toEvent(String val){
            JSONObject js = JSONObject.parseObject(val);
            if (js == null) {
                throw new IllegalArgumentException("Cannot build an Event from empty input");
            }
            this.user = js.getString("user");
            this.url = js.getString("url");
            this.timestamp = js.getLong("timestamp");
        }
    }
}


4、打印

3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:01.2}
4> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:01.267}
5> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:04.2}
6> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:05.5}

参考:

【Flink】Flink 中的时间和窗口之水位线(Watermark)-CSDN博客

Flink watermark_nc -lp 9999-CSDN博客

NoteWarehouse/05_BigData/09_Flink(1).md at main · FGL12321/NoteWarehouse · GitHub


网站公告

今日签到

点亮在社区的每一天
去签到