在 Apache Flink 中,并行度(Parallelism) 是控制任务并发执行的核心参数之一。Flink 提供了 多个层级设置并行度的方式,优先级从高到低如下:
🧩 一、Flink 并行度的四个设置层级
层级 |
描述 |
设置方式 |
Operator Level |
为某个具体的算子设置并行度 |
operator.setParallelism(n) |
Execution Environment Level |
为整个流处理环境设置默认并行度 |
env.setParallelism(n) |
Client Level(提交作业时) |
通过命令行指定全局并行度 |
flink run -p n |
System Level(系统配置) |
在 flink-conf.yaml 中定义全局默认值 |
parallelism.default: n |
✅ 二、各层级设置详解与示例
1. Operator Level(算子级别)
- 优先级最高
- 可以为特定算子设置不同并行度,适用于数据倾斜或资源敏感操作
🔧 示例:
DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction())
.setParallelism(4)
.print();
✅ 适用场景:
- 某个算子计算密集,需要更多资源
- 数据源分区数较少,但后续算子可并行化处理
2. Execution Environment Level(执行环境级别)
- 设置整个 Job 的默认并行度
- 如果未对某些算子单独设置,并使用此值
🔧 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction()).print();
✅ 适用场景:
3. Client Level(客户端提交作业时)
- 使用命令行参数动态设置并行度
- 不修改代码即可适配不同运行环境(如测试/生产)
🔧 示例:
flink run -p 4 -c com.example.MyJob ./myjob.jar
✅ 适用场景:
4. System Level(系统级别)
- 在
flink-conf.yaml
中设置全局默认并行度
- 对所有提交的作业生效(除非被更高级别覆盖)
🔧 示例(flink-conf.yaml
):
parallelism.default: 4
✅ 适用场景:
📊 三、并行度优先级对比表
设置方式 |
是否推荐 |
场景 |
覆盖关系 |
Operator Level |
✅✅✅ |
特定算子优化 |
最高优先级 |
Execution Environment Level |
✅✅ |
整体统一配置 |
被 Operator 覆盖 |
Client Level (-p) |
✅ |
动态部署 |
被前两者覆盖 |
System Level (flink-conf.yaml) |
⚠️ |
兜底默认值 |
最低优先级 |
💡 四、并行度设置建议
✅ 推荐做法:
- 开发/测试环境:使用
.setParallelism()
或 -p
命令行设置较小值(如1~4)
- 生产环境:
- 使用
flink-conf.yaml
设置基础并行度
- 使用
env.setParallelism()
明确控制默认值
- 为关键算子单独设置更高并行度(如窗口聚合、复杂逻辑)
⚙️ 示例组合:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
.setParallelism(8)
.map(new MyMapFunction())
.keyBy(keySelector)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new MyProcessWindowFunction())
.print();
🧠 五、并行度与资源的关系
并行度 |
TaskManager 数量 |
Slot 数量 |
资源要求 |
≤ TM × slot |
✅ 正常运行 |
✅ 正常运行 |
资源充足 |
> TM × slot |
❌ 无法启动 |
❌ 无法启动 |
资源不足 |
✅ 建议:确保总并行度 ≤ 总 slot 数量
📈 六、实际调优建议
场景 |
建议设置 |
Kafka Source |
并行度 = Kafka Topic 分区数 |
Map / FlatMap |
根据 CPU 利用率设置 |
Keyed Window Aggregation |
可适当提高并行度提升吞吐 |
Join / CoGroup |
视数据分布决定是否提高并行度 |
Sink |
若写入慢可适当增加并行度 |
✅ 七、完整示例(Java + Shell)
Java 设置(Env + Operator):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.fromElements("a", "b", "c")
.map(x -> x)
.setParallelism(2)
.print();
env.execute("Parallelism Example");
Shell 设置(Client Level):
flink run -p 8 -c com.example.MyJob ./myjob.jar
✅ 八、总结
层级 |
用途 |
是否推荐使用 |
Operator Level |
控制单个算子并行度 |
✅✅✅ 强烈推荐用于关键路径优化 |
Execution Environment Level |
设置默认并行度 |
✅✅ 推荐作为基础配置 |
Client Level |
动态设置并行度 |
✅ 适合多环境部署 |
System Level |
全局兜底配置 |
⚠️ 推荐配合其他方式使用 |