1.Flink数据类型

发布于:2024-04-26 ⋅ 阅读:(26) ⋅ 点赞:(0)

概述

  在Flink内部处理数据时,涉及到数据的网络传输、数据的序列化及反序列化,Flink需要知道操作的数据类型,为了能够在分布式计算过程中对数据的类型进行管理和判断,Flink中定义了TypeInformation来对数据类型进行描述,通过TypeInfomation能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样可以有效避免用户在编写Flink应用的过程出现数据类型问题。
常用的 TypeInformation 有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo
类等,针对这些常用TypeInfomation介绍如下:

  • Flink通过实现BasicTypeInfo数据类型,能够支持任意Java原生基本(或装箱)类型和String类型,例如:Integer,String,Double等,除了BasicTypeInfo外,类似的还BasicArrayTypeInfo,支持Java中数组和集合类型;
  • 通过定义TupleTypeInfo来支持Tuple类型的数据;
  • 通过CaseClassTypeInfo支持Scala Case Class ;
  • PojoTypeInfo可以识别任意的POJOs类,包括Java和Scala类,POJOs可以完成复杂数据架构的定义,但是在Flink中使用POJOs数据类型需要满足以下要求:
    • POJOs类必须是Public修饰且独立定义,不能是内部类;
    • POJOs 类中必须含有默认空构造器;
    • POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和Setter方法;

注意

  在使用Java API开发Flink应用时,通常情况下Flink都能正常进行数据类型推断进而选择合适的serializers以及comparators,但是在定义函数时如果使用到了泛型(lambda),JVM就会出现类型擦除的问题,Flink就获取不到对应的类型信息,这就需要借助类型提示(Type Hints)来告诉系统函数中传入的参数类型信息和输出类型,进而对数据类型进行推断处理。

实践

非lambda写法

非lambda写法,Flink是能正常识别的

public class WcTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source:hello flink
        DataStreamSource<String> ds = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Ds = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = tuple2Ds.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });

        keyedStream.sum(1).print();

        env.execute();

    }

}
2024-04-25 11:26:16,255 [flink-akka.actor.default-dispatcher-8] [org.apache.flink.runtime.executiongraph.ExecutionGraph] [INFO] - Keyed Aggregation -> Sink: Print to Std. Out (6/8) (5fc86f952e35ab7079f08070c6694019_e70bbd798b564e0a50e10e343f1ac56b_5_0) switched from INITIALIZING to RUNNING.
3> (hello,1)
3> (hello,2)
3> (hello,3)
5> (world,1)
3> (hello,4)

在这里插入图片描述

在这里插入图片描述

lambda写法

public class WcTestL {

  public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // source:hello flink
      DataStreamSource<String> ds = env.socketTextStream("localhost", 9999);

      SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Ds =
              ds.flatMap((s, collector) -> {
                  for (String word : s.split(" ")) {
                      collector.collect(Tuple2.of(word, 1));
                  }
              });

      KeyedStream<Tuple2<String, Integer>, String> keyedStream = tuple2Ds.keyBy(stringIntegerTuple2 -> stringIntegerTuple2.f0);

      keyedStream.sum(1).print();

      env.execute();
}

问题如下,如注意章节所说
在这里插入图片描述

lambda改进

package com.fun.luxshare.test;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 单词统计
 * 数据类型
 */
public class WcTestL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source:hello flink
        DataStreamSource<String> ds = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Long>> tuple2Ds =
                ds.flatMap((String s, Collector<Tuple2<String, Long>> collector) -> {
                    for (String world : s.split(" ")) {
                        collector.collect(Tuple2.of(world, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        KeyedStream<Tuple2<String, Long>, String> keyedStream = tuple2Ds.keyBy(stringTuple2 -> stringTuple2.f0);

        keyedStream.sum(1).print();
        env.execute();
    }
}

在这里插入图片描述

结束

Flink数据类型