Flink Learning (7): Word Count

Published: 2024-04-25

Preface

Flink is a framework that unifies stream and batch processing, so it can process data either as a continuous stream or in batches.

1. Basic Code Structure

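// 1. Set up the execution environment
XxxEnvironment env = XxxEnvironment.getExecutionEnvironment();

// 2. Create a source
XxxSource<String> xxxDS = env.xxx(...);

// 3. Apply transformations
XxxOperator<...> transformation = xxxDS.xxx(...);

// 4. Add a sink
transformation.print();

// 5. Trigger execution (required in the DataStream API; in the DataSet API, print() already runs the job)
env.execute();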
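The five steps reflect how Flink works: the transformations only describe a dataflow, and nothing runs until a sink plus env.execute() (or, in the DataSet API, print()) triggers the job. As a rough analogy that needs no Flink dependency, Java's own java.util.stream is lazy in the same way: intermediate operations only describe the pipeline and the terminal operation runs it. The sketch below is plain JDK code, not the Flink API, and the class name PipelineShape is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: plain java.util.stream, not the Flink API.
final class PipelineShape {

    static Map<String, Long> wordCount(List<String> lines) {
        // Steps 1-2: the "source", here an in-memory list instead of readTextFile/socketTextStream
        Stream<String> source = lines.stream();
        // Step 3: declare the transformation; no element is processed yet (streams are lazy)
        Stream<String> words = source.flatMap(line -> Arrays.stream(line.split(" ")));
        // Steps 4-5: the terminal operation plays the role of sink + execute() and actually runs the pipeline
        return words.collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = wordCount(List.of("hello flink", "hello world"));
        // TreeMap keeps the words sorted, so the output order is deterministic
        counts.forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

Running main prints (flink,1), (hello,2) and (world,1) on separate lines. Unlike Flink, this runs in a single thread on one machine; it only illustrates the describe-then-execute shape of the code.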

2. Demo1: Batch Processing

  • Source code

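import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        // 1. Create the batch execution environment (DataSet API)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the input file line by line
        DataSource<String> lineDS = env.readTextFile("input/word.txt");
        // 3. Turn each line into (word, 1) pairs
        FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 3.1 Split the line on spaces
                String[] values = value.split(" ");
                // 3.2 Emit one (word, 1) tuple per word
                for (String word : values) {
                    Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);
                    collector.collect(wordTuple);
                }
            }
        });
        // 4. Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);
        // 5. Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);

        // 6. Print the result; in the DataSet API, print() also triggers execution, so env.execute() is not called
        sum.print();
    }
}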
  • Output: [screenshot not included]
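The flatMap in Demo1 (and in the later demos) tokenizes with value.split(" "), which splits on exactly one space character. Java's String.split keeps leading and interior empty strings, so double spaces, leading spaces, or blank lines in input/word.txt would be counted as an empty "word". The plain-Java sketch below shows the difference; splitOnWhitespace and the class name Tokenize are hypothetical, and the original code does not use them. If you want that behaviour, swapping it into the flatMap body is one possible fix.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: compares the demo's tokenization with a more forgiving alternative.
final class Tokenize {

    // What the demos do: split on exactly one space
    static List<String> splitOnSpace(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Hypothetical alternative: trim, split on runs of whitespace, drop empty tokens (e.g. from blank lines)
    static List<String> splitOnWhitespace(String line) {
        return Arrays.stream(line.trim().split("\\s+"))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String line = "  hello  world";
        System.out.println("single-space split: " + splitOnSpace(line));
        System.out.println("whitespace split:   " + splitOnWhitespace(line));
    }
}
```

For "  hello  world", the single-space split yields five tokens, three of them empty, while the whitespace split yields only hello and world.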

3. Demo2: Stream Processing (Bounded File Input)

  • Source code

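import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file as a bounded stream of lines
        DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");

        // 3. Turn each line into (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> temp = Tuple2.of(word, 1);
                    collector.collect(temp);
                }
            }
        });

        // 4. Key by the word; a key selector replaces the deprecated index-based keyBy(0)
        KeyedStream<Tuple2<String, Integer>, String> wordCountKeyBy = wordDS.keyBy(value -> value.f0);
        // 5. Keep a running sum of field 1 for each word
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);
        sum.print();
        // 6. A DataStream job only runs when execute() is called
        env.execute();
    }
}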
  • Output: [screenshot not included]
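Unlike Demo1, which prints one final count per word, keyBy(...).sum(1) on a stream is a rolling aggregation: each incoming record updates the per-key state and emits the new total, so the same word shows up several times with increasing counts. With parallelism above 1, print() also prefixes each line with a number identifying the printing subtask, and records for different keys may interleave in any order. The plain-Java simulation below reproduces the rolling behaviour for a single parallel instance; it does not use Flink, and the class name RunningCount is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only: mimics a keyed rolling sum like keyBy(...).sum(1), without Flink.
final class RunningCount {

    /** Emits one updated (word,count) record for every incoming word. */
    static List<String> streamingCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // per-key running totals, analogous to keyed state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                int updated = state.merge(word, 1, Integer::sum); // add 1 to this word's total
                emitted.add("(" + word + "," + updated + ")");     // emit the new total immediately
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        streamingCounts(List.of("hello flink", "hello world")).forEach(System.out::println);
    }
}
```

Running main prints (hello,1), (flink,1), (hello,2), (world,1), one per line: hello is emitted twice, whereas a batch job would print only (hello,2).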

4. Demo3: Unbounded Stream Processing

  • Source code

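import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read an unbounded stream of text lines from a socket server
        DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                // A lambda's generic output type is erased, so declare it explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        sum.print();

        // The job keeps running for as long as the socket connection stays open
        env.execute();
    }
}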
  • Output

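Start a text server on port 9999 of 192.168.3.11 before launching the job (for example, with nc -lk 9999), because socketTextStream connects to that port as a client. As you keep sending lines to the server, the program prints updated word counts for every line it receives. [screenshot not included]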
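Demo3 needs .returns(Types.TUPLE(Types.STRING, Types.INT)) while Demo2 does not, because of Java type erasure. An anonymous FlatMapFunction class records its generic type arguments in the class file, where Flink can read them through reflection. With the usual javac compiler, the class generated for a lambda implements only the raw interface. The plain-Java sketch below shows this using java.util.function.BiConsumer as a stand-in for FlatMapFunction. The class name ErasureDemo is hypothetical, and the code only inspects reflection metadata; it is not Flink's actual type-extraction logic.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.function.BiConsumer;

// Illustration only: BiConsumer<String, List<Integer>> stands in for a FlatMapFunction with a Collector.
final class ErasureDemo {

    /** True if the object's class records type arguments for an interface it implements. */
    static boolean hasTypeArguments(Object function) {
        for (Type t : function.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: javac writes BiConsumer<String, List<Integer>> into the generated class
    static final BiConsumer<String, List<Integer>> ANONYMOUS = new BiConsumer<String, List<Integer>>() {
        @Override
        public void accept(String value, List<Integer> out) {
            out.add(value.length());
        }
    };

    // Lambda: the runtime-generated class implements the raw BiConsumer interface only
    static final BiConsumer<String, List<Integer>> LAMBDA = (value, out) -> out.add(value.length());

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: " + hasTypeArguments(ANONYMOUS));
        System.out.println("lambda keeps type arguments: " + hasTypeArguments(LAMBDA));
    }
}
```

Running main prints true for the anonymous class and false for the lambda. That is why the lambda-based flatMap in Demo3 needs an explicit type hint through returns(...).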