第四章 source数据源应用

发布于:2022-10-16 ⋅ 阅读:(487) ⋅ 点赞:(0)

第四章 source数据源应用

一、什么能被转化成流?

::: tip 如果不懂序列化?
java序列化定义
:::

1、flink数据类型

在这里插入图片描述

2、Flink 的序列化过程

在 Flink 序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何而来?
每一个具体的数据类型都对应一个 TypeInformation 的具体实现,每一个 TypeInformation 都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink 的序列化过程图可以看到 TypeInformation 会提供一个 createSerializer() 方法,通过这个方法就可以得到该类型进行数据序列化操作与反序列化操作的对象 TypeSerializer

对于大多数数据类型 Flink 可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和非序列化,比如,BasicTypeInfo、WritableTypeInfo 等,但针对 GenericTypeInfo 类型,Flink 会使用 Kryo 进行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,他们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。
在这里插入图片描述

3、序列化规则

Flink的java和Scala DataStream API可以将任何可序列化的对象转化为流。Flink自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。

Java tuples 和 POJOs

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs
Tuples
对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型。

Tuple2<String, Integer> person = Tuple2.of("Fred", 35);

// zero based index!  
String name = person.f0;
Integer age = person.f1;

POJOs
如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。
    示例:
public class Person {
    public String name;  
    public Integer age;  
    public Person() {}
    public Person(String name, Integer age) {  
        . . .
    }
}  
Person person = new Person("Fred Flintstone", 35);

4、序列化过程案例

下面就以内嵌型的 Tuple3 这个对象为例,简述一下它的序列化过程。
Tuple3 包含三个层面,一是 int 类型,二是 double 类型,三是 Person。Person 包含两个字段,一个是 int 类型的 ID,另一个是 String 类型的 name,它在序列化操作时,会委托相应具体的序列化器进行相应的序列化操作。从上图中可以看出 Tuple3 会把 int 类型通过 IntSerializer 进行序列化操作,此时 int 只需要占有 4 个字节就可以了。根据 int 占有 4 个字节,这个能够体现出 Flink 可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。相反,如果采用 Java 的序列化,虽然能够存储更多的属性信息,但一次占据的存储空间会受到一定的损耗。

Person 类会被当成一个 Pojo 对象来进行处理,PojoSerializer 序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由 MemorySegment 去支持。MemorySegment 具有什么作用呢?

MemorySegment 在 Flink 中会将对象序列化到预分配的内存块上,它代表 1 个固定长度的内存,默认大小为 32 kb。MemorySegment 代表 Flink 中的一个最小的内存分配单元,相当于是 Java 的一个 byte 数组。每条记录都会以序列化的形式存储在一个或多个 MemorySegment 中。

二、自定义数据源

Source 是你的程序从中读取其输入的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions。
addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(…)) 来从 Apache Kafka 获取数据。更多详细信息见连接器

非并行

  • 实现SourceFunction接口

并行

  • 实现ParallelSourceFunction接口
  • 继承RichParallelSourceFunction类

三、预定义数据源

1、基于文件

  • readTextFile(path) - 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。
    实现:
    在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。
    ::: tip 重要提示:
    1.如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。

2.如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
:::

2、基于套接字

socketTextStream - 从套接字读取。元素可以由分隔符分隔。

3、基于集合

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。

  • fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

  • fromElements(T …) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。

  • generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。

四、详细代码

::: tip
项目4 全部数据源应用代码
:::

本文含有隐藏内容,请 开通VIP 后查看