2.生成Transformation

发布于:2024-12-18 ⋅ 阅读:(149) ⋅ 点赞:(0)

目录

前言

Source

FlatMap

KeyBy

sum

总结


前言

以下面的WordCount为例

package com.wlh.p1;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        //
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 7777)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] split = s.split(" ");
                        for (String word : split) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                .sum(1)
                .print();

        //
        env.execute();

    }
}

上面是一个简单的WordCount程序,理解Flink之前,需要理解好Flink中的一些核心抽象概念

如下图所示,主要为3个:

(1)Transformation

(2)StreamOperator

(3)User-Defined Function

Transformation指的是在DateStream之间转换的操作,比如上面WordCount例子中的flatMap,它其实就对应着一个Transformation,表示从某个DataStream转换为另一个DataStream对应的Transformation。

以WordCount为例,先看一下对应的transformations,上述任务对应的transformations是一个list,list包含3个元素,但是元素对应的transformation的id是2/4/5。具体这些transformation是如何产生的是本文的重点。

Source

我们根据代码一步步跟进看一下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

当获取到流式执行环境后(如果在本地获取的是Local的执行环境),StreamExecutionEnvironment中会存在一个成员变量transformations,初始化为空集合。

env.socketTextStream("localhost", 7777)

跟进代码后,

可以看到new SocketTextStreamFunction

这就是上面说的User-Defined Function,跟进该方法,可以看到确实是继承了Function接口

跟进addSource(

        new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream");

这里有一个比较关键的参数Boundedness.CONTINUOUS_UNBOUNDED

表示该source是一个无界流,事实也是如此,socket流当然是无界的。

跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);

注意TypeInformation<OUT> resolvedTypeInfo =

        getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

TypeInformation是Flink中类型系统的核心类,该方法的内部逻辑是通过java提供的Type系统来提取的。debug跟进一下类型的提取

baseClass是SourceFunction;clazz是具体的实现类SocketTextStreamFunction。通过new TypeExtractor()对象提取该function具体的输出类型信息。

跟进new TypeExtractor()

        .privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);查看具体的类型提取代码,通过反射获取,关于反射获取具体的类型,自行学习了解。

获取到SocketTextStreamFunction的输出类型后,继续跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);

刚刚已经获取输出类型之后,继续跟进后续代码

boolean isParallel = function instanceof ParallelSourceFunction; // 判断该source是不是可以并行的source,很明显这里的isParallel是false

clean(function); // 该行在做闭包清理/检查。如果不通过会报类似异常"Object " + obj + " is not serializable"

final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); 

通过StreamSource的继承关系可以看出,StreamSource其实是开头提到的二号人物StreamOperator,至此,User-Defined Function和StreamOperator都已经出现了,并且它们的关系是StreamOperator中包含User-Defined Function,和开头图示一致。

return new DataStreamSource<>(

        this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);

最终,构建了Transformation,并且将operator传入,至此Transformation、StreamOperator、User-Defined Function都已经出现在视野中。

注意:在看DataStreamSource的继承关系时,会看到它继承自SingleOutputStreamOperator,这里是我觉得Flink命名不太好的地方,SingleOutputStreamOperator会被误认为是StreamOperator,但其实不是,SingleOutputStreamOperator是继承自DataStream的,并且在注释中明确说明SingleOutputStreamOperator是transformation。

跟进DataStream,会看到DataStream中封装了environment环境和通过env.socketTextStream("localhost", 7777)定义的第一个transformation。后面的api操作,如flatMap等,都是基于该DataStream进行操作了。

FlatMap

在上面DataStream的基础上将后续的api都介绍一下,跟进.flatMap

形参flatMapper,即为用户在编程时自定义的function,代码逻辑很清晰,依然是先获取输出的类型信息

跟进return flatMap(flatMapper, outType);

StreamFlatMap是StreamOperator的子类

跟进return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));

跟进return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));

首先new OneInputTransformation,创建一个新的Transformation,在构建的时候,传入了this.transformation,就是上面的LegacySourceTransformation。

new SingleOutputStreamOperator(environment, resultTransform); 创建一个新的DataStream,作为这一步API操作的返回值。

跟进getExecutionEnvironment().addOperator(resultTransform);

将transformation添加进env环境的transformations集合中,这个集合在未来会遍历生成StreamGraph。

KeyBy

在上面DataStream的基础上将后续的api都介绍一下,跟进.keyBy

直接new KeyedStream,跟进

获取key的类型信息,继续跟进构造方法

new PartitionTransformation创建了一个Transformation,将当前Datastream的Transformation作为PartitionTransformation的输入,并且将用户自定义的keySelector封装进KeyGroupStreamPartitioner。

继续跟进后,由于KeyedStream继承自DataStream,同样的,将env和当前的transformation封装进去。

至此KeyedStream构建完成,它的内容如下,

sum

在上面KeyedStream的基础上,继续跟进代码

 SumAggregator<T> extends AggregationFunction<T> 

SumAggerator就是User-Defined Function

跟进return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));

方法的签名为AggeragationFunction

跟进return reduce(aggregate).name("Keyed Aggregation");

这里出现了transformation

getExecutionEnvironment().addOperator(reduce); 这一行代码在之前说过,将该transformation加入到env的transformations中。

这里重点来看一下ReduceTransformation

构造方法如下:包含了聚合算子必须的一些信息,reducer是聚合的函数,input是之前的transformation,keySelector是分组key的提取方式。

值得注意的一行代码updateManagedMemoryStateBackendUseCase(true);

这里是在设置状态后端,这里第一次提到了状态的概念,状态是Flink得以流行的重要原因之一,有状态的流式计算。

print

这里是WordCount例子中的最后一步了,和前面的算子都非常类似,看到这里应该是可以举一反三了。

熟悉的味道,创建Function

创建StreamOperator

在创建DateStream时,创建了transformation

总结

本文介绍了Flink是如何将用户的api转换为Transformation,这是Flink的核心抽象,DataStream是面向用户的,Transformation并不面向用户,在Flink触发执行时,transformation会被Flink转换为更贴近底层执行的各种有向无环图,即常说的DAG。


网站公告

今日签到

点亮在社区的每一天
去签到