1. 为什么是流:有界 vs 无界
- 批处理(有界):数据“先到齐,再处理”。你能做全量排序、全局统计和“最终报告”。
- 流处理(无界):数据“永不停止”,必须边到边算。
- 现实世界的数据(点击、交易、传感器)天生是流。能统一跑实时与回放(重处理历史)且产出一致结果,是现代数据系统的关键诉求。
在 Flink 中,一切皆是流式数据流(streaming dataflow):从一个或多个 Source 流入,经一系列 Operator 转换,流向一个或多个 Sink。
2. Flink 程序模型与数据流图
一个 Flink 作业可视为有向图:节点是算子(map/filter/keyBy/window/process),边是数据流。常见来源/去向:
- Source:Kafka、Kinesis、文件、数据库 CDC、对象存储等
- Sink:Kafka、Elastic、Iceberg/Hudi、JDBC、Data Warehouse、OLAP 引擎等
示意(Mermaid):
实践中,程序中的“一个转换”可能会在运行时被优化为“多个算子”。
3. 并行与数据重分发:one-to-one vs redistributing
并行度(parallelism):同一算子可有 N 个子任务(subtask),各自独立、不同线程/机器运行。
one-to-one:保序、分区不变(如
Source -> map()
)。redistributing:改变分区/路由(如
keyBy()
按 Key 哈希分发,rebalance()
随机打散,broadcast()
广播)。- 注意:重分发后,仅发送子任务与接收子任务这一对之间的相对顺序被保留;跨 Key 的全局顺序不可保证。
4. 事件时间与水位线:让“时间”说了算
处理时间(Processing Time):算子机器的时钟。简单但不稳定。
事件时间(Event Time):事件本身携带的时间戳。可按真实发生顺序计算,适合乱序、重放场景。
水位线(Watermark):系统对“截至某时刻,应该已经看到所有早于它的事件”的一种进度宣告。
- 常见策略:有界乱序(允许最大延迟,如 5s),或基于观测的 自适应 策略。
要点:用事件时间 + 合理水位线,才能在“实时 + 历史重放”两种模式下得到一致、可复算的结果。
5. 有状态流处理:本地状态、KeyBy 与算子并行实例
- Flink 的很多算子是有状态的:当前事件的处理依赖于之前所有事件的累积影响。
keyBy()
将同一 Key 的事件路由到同一并行子任务,从而让每个子任务维护“自己那份 Key 的本地状态”。- 本地状态(JVM 堆或托管到 RocksDB 等持久化结构)带来高吞吐、低延迟。
- 从系统视角看,有状态算子的并行实例集合,像一个分片的 KV 存储。
6. 容错与恰好一次:检查点与回放
检查点(Checkpoint):周期性、异步地捕获作业全局状态(包含 Source 的偏移和各算子本地状态)。
失败恢复:从最近检查点恢复状态,同时让 Source 回退到检查点时的偏移,继续处理。
Exactly-Once 语义:对状态更新与Source 偏移的一致快照 + 对 Sink 的幂等/两阶段提交支持,可实现“计算端恰好一次”。
- 端到端 Exactly-Once 还取决于外部系统的事务/幂等等支持(如 Kafka 事务写、支持两阶段提交的 JDBC/文件写入等)。
示意(Mermaid):
7. 实战一:实时订单统计(Java DataStream)
场景:从 Kafka 消费订单事件,按用户
userId
的事件时间做滚动窗口汇总(1 分钟订单数与金额),同时打开检查点实现容错与“恰好一次”。
依赖(Maven 核心):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
示例代码(精简版):
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Duration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
public class OrdersJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1) 开启检查点(每 10s,一次性语义)
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ckpt"); // 示例:本地存储
// 2) Kafka Source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("orders")
.setGroupId("orders-consumer")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> raw = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-orders");
// 3) 解析 JSON 并抽取事件时间(假设字段 ts 为毫秒)
DataStream<Order> orders = raw
.map(Order::fromJson) // 自行实现:String -> Order(userId, amount, ts)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, ts) -> e.ts)
);
// 4) 按 userId 做 1 分钟滚动窗口统计
DataStream<UserAgg> result = orders
.keyBy(o -> o.userId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(
(a, b) -> new Order(a.userId, a.amount + b.amount, Math.max(a.ts, b.ts)),
(key, window, it, out) -> {
Order agg = it.iterator().next();
out.collect(new UserAgg(key.getKey(), window.getStart(), window.getEnd(), 1, agg.amount));
}
);
// 5) Sink(示例打印;生产环境请用支持事务/幂等的 Sink)
result.print();
env.execute("orders-1min-aggregation");
}
static class Order {
public String userId;
public double amount;
public long ts;
static Order fromJson(String s) { /* TODO: 解析实现 */ return null; }
public Order() {}
public Order(String userId, double amount, long ts) {
this.userId = userId; this.amount = amount; this.ts = ts;
}
}
static class UserAgg {
public String userId;
public long windowStart;
public long windowEnd;
public long cnt;
public double amount;
public UserAgg(String userId, long ws, long we, long cnt, double amount) {
this.userId = userId; this.windowStart = ws; this.windowEnd = we; this.cnt = cnt; this.amount = amount;
}
@Override public String toString() {
return String.format("user=%s, win=[%d,%d), cnt=%d, amount=%.2f",
userId, windowStart, windowEnd, cnt, amount);
}
}
}
要点回顾
- 事件时间 + 有界乱序 5s 水位线。
keyBy(userId)
将同一用户的事件路由到同一子任务,窗口聚合依赖本地状态。- 打开 EXACTLY_ONCE 检查点;生产环境搭配事务性 Sink获得端到端“恰好一次”。
8. 实战二:Flink SQL 统计 10 分钟 UV
场景:统计近 10 分钟的去重访客数(UV),按事件时间滑动。
-- 1) 定义 Kafka 源(简化示意)
CREATE TABLE page_view (
user_id STRING,
url STRING,
ts BIGINT,
WATERMARK FOR ts_rowtime AS ts_rowtime - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'pv',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- 2) 计算 10 分钟滑动 UV,每 1 分钟出一次结果
SELECT
window_start,
window_end,
COUNT(DISTINCT user_id) AS uv
FROM TABLE(
TUMBLE(TABLE page_view, DESCRIPTOR(ts_rowtime), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
说明
- 通过
WATERMARK
声明事件时间列,设定乱序容忍度。 - 使用窗口 TVF 进行聚合,语义清晰、易于维护。
9. 常见坑与最佳实践清单
端到端 Exactly-Once ≠ 仅开启检查点
- Source 偏移 + 算子状态 + Sink 写入三者需要原子一致。选择支持两阶段提交/事务/幂等的 Sink(如 Kafka 事务、支持 XA/2PC 的 JDBC Sink、文件写入的临时+原子 rename 策略等)。
水位线过于保守或激进
- 过小:容忍不了真实乱序,导致早关窗、丢失迟到数据;过大:延迟增大、状态胀大。基于数据分布观测合理设置。
Key 倏地倾斜
- 热 Key 导致单个子任务背压。方案:拆分热 Key、基于“二级 Key + 后聚合”的 Key 拆分(salting),或使用分层聚合。
状态体量不可控
- 定期 TTL、合理的窗口策略、布隆过滤/近似去重结构、外部维表缓存策略;必要时选择 RocksDB 状态后端并调优。
反压与资源
- 监控背压链路、合理设置并行度与重分发策略;Source 拉取速率、Sink 刷写批次、网络缓冲都影响端到端吞吐。
历史重放一致性
- 必须使用事件时间语义 + 稳定的水位线策略;避免处理时间驱动的逻辑影响可复算性。
10. 进一步学习与参考路线
优先打牢四件事:事件时间、水位线、有状态算子、检查点。
实践优先:先跑通一个 Kafka→Flink→Sink 的端到端流水线,再逐步引入窗口、状态与容错。
升级路径:
- 单流 ETL/聚合 →
- 多流 Join/会话窗口 →
- 事务性 Sink 与端到端恰好一次 →
- 复杂事件驱动(ProcessFunction/CEP)与侧输出流 →
- 生产级部署(资源隔离、HA、监控告警、滚动升级、Savepoint 运维)。