Flink Stream API 源码走读 - map 和 flatMap

发布于:2025-08-16 ⋅ 阅读:(18) ⋅ 点赞:(0)

概述

本文深入分析了 Flink 中 map()flatMap() 方法的源码实现,展示了从 Function 到 Operator 再到 Transformation 的完整转换流程。

前置知识回顾

DataStream 的核心结构

public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;  // 执行环境
    protected final Transformation<T> transformation;        // 转换操作
}

重要理解:

  • 每个 DataStream 都包含一个 Transformation
  • Transformation 持有上一个 Transformation 的引用,形成链条
  • 执行环境 environment 在整个调用链中保持不变

map() 方法源码分析

1. map 方法的重载链

// 用户调用入口
dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
})
map(MapFunction<T,R> mapper)
抽取返回类型
TypeExtractor.getMapReturnTypes()
map(mapper, outType)
transform('Map', outType, new StreamMap(mapper))

2. 类型信息抽取

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    // 1. 抽取输出类型信息
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(
        clean(mapper), getType(), Utils.getCallLocationName(), true);
    
    // 2. 调用重载方法
    return map(mapper, outType);
}

关键点:

  • clean(mapper) - 对用户函数进行序列化检查
  • TypeExtractor.getMapReturnTypes() - 抽取 MapFunction 的返回类型
  • 解决 Java 泛型擦除问题

3. Function → Operator 转换

public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // 将 MapFunction 封装成 StreamMap Operator
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

转换过程:

  • MapFunction (用户逻辑) → StreamMap (算子)
  • StreamMap 继承自 AbstractUdfStreamOperator
  • 这是第二个核心概念:Operator(算子)

4. transform 方法分析

public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo, 
        OneInputStreamOperator<T, R> operator) {
    
    // 将 Operator 包装成 OperatorFactory
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

重要概念:

  • SimpleOperatorFactory - 简单算子工厂
  • 算子工厂是对算子的进一步包装
  • 提供 getOperator()createStreamOperator() 方法

5. doTransform 核心逻辑

protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {
    
    // 1. 创建 OneInputTransformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
        this.transformation,    // 上一个 transformation
        operatorName,          // 算子名称 "Map"
        operatorFactory,       // 算子工厂
        outTypeInfo,          // 输出类型
        environment.getParallelism(),  // 并行度
        false                 // 是否并行配置
    );
    
    // 2. 创建新的 DataStream
    SingleOutputStreamOperator<R> returnStream = 
        new SingleOutputStreamOperator(environment, resultTransform);
    
    // 3. 将 transformation 添加到执行环境
    getExecutionEnvironment().addOperator(resultTransform);
    
    return returnStream;
}

核心概念转换流程

MapFunction
(用户逻辑)
StreamMap
(算子)
SimpleOperatorFactory
(算子工厂)
OneInputTransformation
(单输入转换)
SingleOutputStreamOperator
(新的DataStream)

转换详解

  1. Function → Operator

    MapFunction<String, Integer> mapper = ...;
    StreamMap<String, Integer> operator = new StreamMap<>(mapper);
    
  2. Operator → OperatorFactory

    SimpleOperatorFactory<Integer> factory = SimpleOperatorFactory.of(operator);
    
  3. OperatorFactory → Transformation

    OneInputTransformation<String, Integer> transformation = 
        new OneInputTransformation<>(this.transformation, "Map", factory, outType, parallelism, false);
    
  4. Transformation → DataStream

    SingleOutputStreamOperator<Integer> result = 
        new SingleOutputStreamOperator<>(environment, transformation);
    

OneInputTransformation 详解

为什么叫 OneInput?

public class OneInputTransformation<IN, OUT> extends Transformation<OUT> {
    private final Transformation<IN> input;  // 只有一个输入
    // ...
}

命名含义:

  • OneInput - 表示只有一个输入流
  • 对应的还有 TwoInputTransformation (如 join、union 操作)
  • 体现了不同算子的输入特性

Transformation 链条

SourceTransformation
(socketTextStream)
OneInputTransformation
(map)
OneInputTransformation
(flatMap)
OneInputTransformation
(filter)
SinkTransformation
(print)

flatMap() 方法源码分析

1. flatMap 与 map 的相似性

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    // 1. 抽取输出类型
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(
        clean(flatMapper), getType(), Utils.getCallLocationName(), true);
    
    // 2. 调用重载方法
    return flatMap(flatMapper, outType);
}

public <R> SingleOutputStreamOperator<R> flatMap(
        FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
    // 将 FlatMapFunction 封装成 StreamFlatMap Operator
    return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}

2. 与 map 的差异

特性 map flatMap
函数接口 MapFunction<T, R> FlatMapFunction<T, R>
算子实现 StreamMap StreamFlatMap
输出特性 1对1映射 1对多映射
算子名称 “Map” “Flat Map”

3. 相同的处理流程

FlatMapFunction
StreamFlatMap
SimpleOperatorFactory
OneInputTransformation
SingleOutputStreamOperator

流程一致性:

  • 都经过相同的 doTransform 方法
  • 都创建 OneInputTransformation
  • 都返回 SingleOutputStreamOperator

执行环境的 Transformation 管理

addOperator 方法

// 在 doTransform 中调用
getExecutionEnvironment().addOperator(resultTransform);
// StreamExecutionEnvironment 中的实现
public class StreamExecutionEnvironment {
    private final List<Transformation<?>> transformations = new ArrayList<>();
    
    public void addOperator(Transformation<?> transformation) {
        transformations.add(transformation);
    }
}

重要作用:

  • 将每个 Transformation 添加到环境的列表中
  • 为后续生成 JobGraph 做准备
  • 形成完整的 Transformation 树

命名问题的吐槽

容易混淆的命名

  1. SingleOutputStreamOperator

    // 实际上是个 DataStream,不是 Operator!
    public class SingleOutputStreamOperator<T> extends DataStream<T>
    
  2. addOperator vs addTransformation

    // 方法名叫 addOperator,实际添加的是 Transformation
    environment.addOperator(transformation);  // 应该叫 addTransformation
    

建议:

  • 忽略这些命名问题,理解本质
  • SingleOutputStreamOperator 就是 DataStream
  • 重点关注概念转换流程

链式调用的实现

返回值分析

DataStream<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Integer> mapped = source.map(...);      // 返回 DataStream
SingleOutputStreamOperator<String> flatMapped = mapped.flatMap(...); // 返回 DataStream
KeyedStream<String, String> keyed = flatMapped.keyBy(...);          // 返回 KeyedStream

链式调用原理:

  • 每个操作都返回某种形式的 DataStream
  • SingleOutputStreamOperator 继承自 DataStream
  • 所有 API 方法都定义在 DataStream

为什么叫 SingleOutput?

public class SingleOutputStreamOperator<T> extends DataStream<T>

命名含义:

  • SingleOutput - 表示只有一个输出流
  • 区别于可能有多个输出的算子
  • 体现了算子的输出特性

总结

核心流程回顾

  1. 用户调用 dataStream.map(mapFunction)
  2. 类型抽取 通过 TypeExtractor 获取返回类型
  3. Function→OperatorMapFunction 封装成 StreamMap
  4. Operator→Factory 将算子包装成 SimpleOperatorFactory
  5. Factory→Transformation 创建 OneInputTransformation
  6. Transformation→DataStream 创建新的 SingleOutputStreamOperator
  7. 环境管理Transformation 添加到执行环境

设计模式体现

  • 装饰器模式: Function → Operator → Factory → Transformation → DataStream
  • 工厂模式: SimpleOperatorFactory 封装算子创建逻辑
  • 建造者模式: 逐步构建复杂的 Transformation 对象

关键技术点

  • 类型安全: 通过 TypeInformation 解决泛型擦除
  • 链式调用: 每个操作返回新的 DataStream
  • 延迟执行: 只构建 Transformation 树,不立即执行
  • 统一抽象: map 和 flatMap 使用相同的处理框架

下节预告

Flink Stream API 源码走读 - keyby


重要提醒:

  • 忽略混淆的命名,关注核心概念
  • SingleOutputStreamOperator 本质就是 DataStream
  • 重点理解 Function → Operator → Transformation → DataStream 的转换流程

网站公告

今日签到

点亮在社区的每一天
去签到