Apache Flink 从流处理基础到恰好一次语义

发布于:2025-09-13 ⋅ 阅读:(16) ⋅ 点赞:(0)

1. 为什么是流:有界 vs 无界

  • 批处理(有界):数据“先到齐,再处理”。你能做全量排序、全局统计和“最终报告”。
  • 流处理(无界):数据“永不停止”,必须边到边算
  • 现实世界的数据(点击、交易、传感器)天生是。能统一跑实时与回放(重处理历史)且产出一致结果,是现代数据系统的关键诉求。

在 Flink 中,一切皆是流式数据流(streaming dataflow):从一个或多个 Source 流入,经一系列 Operator 转换,流向一个或多个 Sink

在这里插入图片描述

2. Flink 程序模型与数据流图

一个 Flink 作业可视为有向图:节点是算子(map/filter/keyBy/window/process),边是数据流。常见来源/去向:

  • Source:Kafka、Kinesis、文件、数据库 CDC、对象存储等
  • Sink:Kafka、Elastic、Iceberg/Hudi、JDBC、Data Warehouse、OLAP 引擎等

示意(Mermaid):

keyBy
Kafka / Kinesis / Files
Source
Map/Filter
Window/Aggregation
Process/Enrich
Sink: DW/ES/Kafka/DB

实践中,程序中的“一个转换”可能会在运行时被优化为“多个算子”。

3. 并行与数据重分发:one-to-one vs redistributing

  • 并行度(parallelism):同一算子可有 N 个子任务(subtask),各自独立、不同线程/机器运行。

  • one-to-one:保序、分区不变(如 Source -> map())。

  • redistributing:改变分区/路由(如 keyBy() 按 Key 哈希分发,rebalance() 随机打散,broadcast() 广播)。

    • 注意:重分发后,仅发送子任务与接收子任务这一对之间的相对顺序被保留;跨 Key 的全局顺序不可保证。

4. 事件时间与水位线:让“时间”说了算

  • 处理时间(Processing Time):算子机器的时钟。简单但不稳定。

  • 事件时间(Event Time):事件本身携带的时间戳。可按真实发生顺序计算,适合乱序、重放场景。

  • 水位线(Watermark):系统对“截至某时刻,应该已经看到所有早于它的事件”的一种进度宣告

    • 常见策略:有界乱序(允许最大延迟,如 5s),或基于观测的 自适应 策略。

要点:用事件时间 + 合理水位线,才能在“实时 + 历史重放”两种模式下得到一致可复算的结果。

5. 有状态流处理:本地状态、KeyBy 与算子并行实例

  • Flink 的很多算子是有状态的:当前事件的处理依赖于之前所有事件的累积影响
  • keyBy() 将同一 Key 的事件路由到同一并行子任务,从而让每个子任务维护“自己那份 Key 的本地状态”。
  • 本地状态(JVM 堆或托管到 RocksDB 等持久化结构)带来高吞吐、低延迟
  • 从系统视角看,有状态算子的并行实例集合,像一个分片的 KV 存储

6. 容错与恰好一次:检查点与回放

  • 检查点(Checkpoint):周期性、异步地捕获作业全局状态(包含 Source 的偏移和各算子本地状态)。

  • 失败恢复:从最近检查点恢复状态,同时让 Source 回退到检查点时的偏移,继续处理。

  • Exactly-Once 语义:对状态更新Source 偏移的一致快照 + 对 Sink 的幂等/两阶段提交支持,可实现“计算端恰好一次”。

    • 端到端 Exactly-Once 还取决于外部系统的事务/幂等等支持(如 Kafka 事务写、支持两阶段提交的 JDBC/文件写入等)。

示意(Mermaid):

Source (Kafka Offset) Stateful Operator (Keyed State) Checkpoint Storage Sink Async snapshot(Keyed State) Save offsets loop [every N seconds] Failure happens Fail Restore last snapshot Restore offsets Replay from restored offset Resume processing (exactly-once) Source (Kafka Offset) Stateful Operator (Keyed State) Checkpoint Storage Sink

7. 实战一:实时订单统计(Java DataStream)

场景:从 Kafka 消费订单事件,按用户 userId 的事件时间做滚动窗口汇总(1 分钟订单数与金额),同时打开检查点实现容错与“恰好一次”。

依赖(Maven 核心)

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>${flink.version}</version>
</dependency>

示例代码(精简版)

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Duration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class OrdersJob {
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 1) 开启检查点(每 10s,一次性语义)
    env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);
    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ckpt"); // 示例:本地存储

    // 2) Kafka Source
    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("orders")
        .setGroupId("orders-consumer")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    DataStream<String> raw = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-orders");

    // 3) 解析 JSON 并抽取事件时间(假设字段 ts 为毫秒)
    DataStream<Order> orders = raw
        .map(Order::fromJson) // 自行实现:String -> Order(userId, amount, ts)
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
              .withTimestampAssigner((e, ts) -> e.ts)
        );

    // 4) 按 userId 做 1 分钟滚动窗口统计
    DataStream<UserAgg> result = orders
        .keyBy(o -> o.userId)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .reduce(
          (a, b) -> new Order(a.userId, a.amount + b.amount, Math.max(a.ts, b.ts)),
          (key, window, it, out) -> {
            Order agg = it.iterator().next();
            out.collect(new UserAgg(key.getKey(), window.getStart(), window.getEnd(), 1, agg.amount));
          }
        );

    // 5) Sink(示例打印;生产环境请用支持事务/幂等的 Sink)
    result.print();

    env.execute("orders-1min-aggregation");
  }

  static class Order {
    public String userId;
    public double amount;
    public long ts;

    static Order fromJson(String s) { /* TODO: 解析实现 */ return null; }

    public Order() {}
    public Order(String userId, double amount, long ts) {
      this.userId = userId; this.amount = amount; this.ts = ts;
    }
  }

  static class UserAgg {
    public String userId;
    public long windowStart;
    public long windowEnd;
    public long cnt;
    public double amount;

    public UserAgg(String userId, long ws, long we, long cnt, double amount) {
      this.userId = userId; this.windowStart = ws; this.windowEnd = we; this.cnt = cnt; this.amount = amount;
    }

    @Override public String toString() {
      return String.format("user=%s, win=[%d,%d), cnt=%d, amount=%.2f",
        userId, windowStart, windowEnd, cnt, amount);
    }
  }
}

要点回顾

  • 事件时间 + 有界乱序 5s 水位线。
  • keyBy(userId) 将同一用户的事件路由到同一子任务,窗口聚合依赖本地状态。
  • 打开 EXACTLY_ONCE 检查点;生产环境搭配事务性 Sink获得端到端“恰好一次”。

8. 实战二:Flink SQL 统计 10 分钟 UV

场景:统计近 10 分钟的去重访客数(UV),按事件时间滑动。

-- 1) 定义 Kafka 源(简化示意)
CREATE TABLE page_view (
  user_id STRING,
  url STRING,
  ts BIGINT,
  WATERMARK FOR ts_rowtime AS ts_rowtime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pv',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

-- 2) 计算 10 分钟滑动 UV,每 1 分钟出一次结果
SELECT
  window_start,
  window_end,
  COUNT(DISTINCT user_id) AS uv
FROM TABLE(
  TUMBLE(TABLE page_view, DESCRIPTOR(ts_rowtime), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;

说明

  • 通过 WATERMARK 声明事件时间列,设定乱序容忍度。
  • 使用窗口 TVF 进行聚合,语义清晰、易于维护。

9. 常见坑与最佳实践清单

  1. 端到端 Exactly-Once ≠ 仅开启检查点

    • Source 偏移 + 算子状态 + Sink 写入三者需要原子一致。选择支持两阶段提交/事务/幂等的 Sink(如 Kafka 事务、支持 XA/2PC 的 JDBC Sink、文件写入的临时+原子 rename 策略等)。
  2. 水位线过于保守或激进

    • 过小:容忍不了真实乱序,导致早关窗、丢失迟到数据;过大:延迟增大、状态胀大。基于数据分布观测合理设置。
  3. Key 倏地倾斜

    • 热 Key 导致单个子任务背压。方案:拆分热 Key、基于“二级 Key + 后聚合”的 Key 拆分(salting),或使用分层聚合。
  4. 状态体量不可控

    • 定期 TTL、合理的窗口策略、布隆过滤/近似去重结构、外部维表缓存策略;必要时选择 RocksDB 状态后端并调优。
  5. 反压与资源

    • 监控背压链路、合理设置并行度与重分发策略;Source 拉取速率、Sink 刷写批次、网络缓冲都影响端到端吞吐。
  6. 历史重放一致性

    • 必须使用事件时间语义 + 稳定的水位线策略;避免处理时间驱动的逻辑影响可复算性。

10. 进一步学习与参考路线

  • 优先打牢四件事:事件时间、水位线、有状态算子、检查点。

  • 实践优先:先跑通一个 Kafka→Flink→Sink 的端到端流水线,再逐步引入窗口、状态与容错。

  • 升级路径

    1. 单流 ETL/聚合 →
    2. 多流 Join/会话窗口 →
    3. 事务性 Sink 与端到端恰好一次 →
    4. 复杂事件驱动(ProcessFunction/CEP)与侧输出流 →
    5. 生产级部署(资源隔离、HA、监控告警、滚动升级、Savepoint 运维)。

网站公告

今日签到

点亮在社区的每一天
去签到