Flink架构概览,Flink DataStream API 的使用,FlinkCDC的使用

发布于:2025-05-22 ⋅ 阅读:(18) ⋅ 点赞:(0)

一、Flink与其他组件的协同

Flink 是一个分布式、高性能、始终可用、准确一次(Exactly-Once)语义的流处理引擎,广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成,形成完整的大数据处理链路。下面我们从 Flink 的 核心架构 出发,结合与 Hadoop 组件协同方式,详细剖析 Flink 的作用。


1. Flink 核心架构详解

1)架构组件图概览

+-------------------------+
|       Client            |
+-------------------------+
             |
             v
+-------------------------+
|    JobManager (JM)      |  <-- Master 负责调度
+-------------------------+
             |
             v
+-------------------------+
|    TaskManagers (TM)    |  <-- Worker 执行算子任务
+-------------------------+
             |
             v
+-------------------------+
|         Slot            |  <-- 执行资源单位
+-------------------------+

2)核心组件职责

组件 描述
Client 提交作业到 Flink 集群,触发作业执行。
JobManager (JM) 管理作业生命周期,负责调度任务、故障恢复、协调检查点(Checkpoint)等。
TaskManager (TM) 具体执行作业的物理任务(算子),负责数据交换、状态管理等。
Slot TaskManager 内部的资源单位,用于任务部署。每个 TaskManager 有多个 Slot。

3)状态管理与容错

  • Checkpoint/Savepoint:可恢复一致性状态(Exactly Once)

  • State Backend:保存状态(如 RocksDB、FsStateBackend)

  • Recovery:通过重放 Checkpoint 恢复任务


2. Flink 与 Hadoop 各组件的协同关系

Flink 虽然是独立系统,但能与 Hadoop 生态的多个关键组件协同工作,构建完整的大数据平台。

1)与 HDFS(Hadoop Distributed File System)

协同方式 描述
输入源 Flink 可直接读取 HDFS 中的批量数据(如 ORC、Parquet、Text 等格式)
状态后端 Flink Checkpoint/Savepoint 可存储到 HDFS 上,保证高可用与容灾
输出目标 Flink 作业可以将计算结果输出到 HDFS,作为后续离线处理的数据
fs.defaultFS: hdfs://namenode:8020
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints/

2)与 Hive

协同方式 描述
读取表数据 Flink 可通过 Hive Catalog 与 Hive 元数据打通,直接读取 Hive 表
写入表 Flink SQL 可将流式数据写入 Hive(使用 INSERT INTO)
统一元数据 Flink + Hive Catalog 支持表结构共享,便于湖仓一体实践
CREATE CATALOG my_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/etc/hive/conf'
);

3)与 Kafka(实时采集)

协同方式 描述
实时数据源 Flink 通过 Kafka Source 接收实时数据流(如日志、订单等)
下游结果写入 Flink 可将流式计算结果写入 Kafka(供下游消费)
Exactly Once 语义 Flink + Kafka + Checkpoint 可实现端到端的精确一次语义
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
  "topic", new SimpleStringSchema(), properties);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);

4)与 HBase(实时查询)

协同方式 描述
维表关联 Flink 可使用 HBase 作为维表进行流批 Join,实时补充维度数据
实时写入 计算结果可实时写入 HBase,支持下游查询系统使用(如用户画像等)
tableEnv.executeSql("CREATE TABLE hbase_dim (...) WITH ('connector' = 'hbase-2.2', ...)");

5)与 YARN

协同方式 描述
资源调度 Flink 可部署在 YARN 上,利用 Hadoop 的资源调度管理能力
Session / Per-Job 模式 支持多租户资源隔离或每个作业独立资源隔离部署
flink run -m yarn-cluster -ynm my-flink-job myjob.jar

6)与 Zookeeper

协同方式 描述
高可用 JobManager 使用 Zookeeper 实现 JobManager 的 leader election
Checkpoint HA 元数据存储 配合 HDFS 存储 Checkpoint 元数据路径
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs://namenode:8020/flink/ha/

3. Flink 的作用总结

模块 Flink 的角色
实时数据处理 核心组件,进行低延迟、高吞吐流处理计算
数据清洗与 ETL 提供强大 SQL / DataStream API 进行多源数据处理与聚合
实时指标计算 支持实时 KPI、UV/PV、订单流等分析
数据湖构建 可作为流式数据入湖的计算引擎(结合 Hudi/Iceberg)
实时监控预警 搭配 Kafka + Prometheus,构建告警与监控系统
实时数仓建设 联合 Kafka + Hive + HDFS + HBase 构建流批一体数仓体系

4. Flink 架构在 Hadoop 平台的实际部署图

                     +-------------+
                     |  Flume/Nginx|
                     +------+------+
                            |
                         Kafka集群
                            |
        +-------------------+--------------------+
        |                                        |
    +---v---+                               +----v----+
    | Flink |--> 清洗 → 维表 Join → 计算    |  Spark  |
    +---+---+                               +----+----+
        |                                        |
+-------v---------+                     +--------v--------+
| HBase/Redis     |                     |  HDFS / Hive    |
+-----------------+                     +-----------------+

二、Flink DataStream API的使用

现在以 Flink DataStream API 为核心,深入剖析一个真实生产场景的 从 Kafka 到 Kafka 的流式处理全流程,包括:

  1. 项目结构与依赖

  2. 数据模型与清洗

  3. 水位线与乱序处理

  4. 异步维表查询(HBase/MySQL/Redis)

  5. 窗口聚合逻辑

  6. 数据下发(Kafka Sink)

  7. 容错机制与 Checkpoint 配置


1. 项目结构与依赖

1)Maven 依赖(pom.xml

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.17.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.0.1-1.17</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.14.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hbase-2.2</artifactId>
    <version>1.17.1</version>
  </dependency>
</dependencies>

2. 数据模型定义

1)订单数据结构(OrderEvent)

public class OrderEvent {
    public String orderId;
    public String userId;
    public String productId;
    public double price;
    public int quantity;
    public long orderTime; // epoch millis
}

2) 商品维度(ProductInfo)

public class ProductInfo {
    public String productId;
    public String categoryId;
    public String productName;
}

3)聚合结果结构(OrderStat)

public class OrderStat {
    public String categoryId;
    public long windowStart;
    public long windowEnd;
    public double totalAmount;
}

3. Kafka Source + JSON 反序列化

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("order_events")
        .setGroupId("flink-consumer")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<OrderEvent> orderStream = env
        .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource")
        .map(json -> new ObjectMapper().readValue(json, OrderEvent.class))
        .returns(OrderEvent.class);

4. 水位线处理(乱序数据支持)

WatermarkStrategy<OrderEvent> watermarkStrategy = WatermarkStrategy
        .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.orderTime);

DataStream<OrderEvent> orderStreamWithWM = orderStream
        .assignTimestampsAndWatermarks(watermarkStrategy);

5. 异步维表关联(以 HBase 为例)

使用 AsyncFunction 实现异步查询(支持 Redis/HBase/MySQL)

示例实现:AsyncProductEnrichmentFunction

public class AsyncProductEnrichmentFunction extends RichAsyncFunction<OrderEvent, Tuple2<OrderEvent, ProductInfo>> {

    private transient HBaseClient hBaseClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        hBaseClient = new HBaseClient("hbase.zookeeper.quorum");
    }

    @Override
    public void asyncInvoke(OrderEvent input, ResultFuture<Tuple2<OrderEvent, ProductInfo>> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> hBaseClient.queryProductInfo(input.productId))
            .thenAccept(productInfo -> resultFuture.complete(Collections.singletonList(Tuple2.of(input, productInfo))));
    }

    @Override
    public void close() throws Exception {
        hBaseClient.close();
    }
}

应用异步函数

DataStream<Tuple2<OrderEvent, ProductInfo>> enrichedStream = AsyncDataStream.unorderedWait(
        orderStreamWithWM,
        new AsyncProductEnrichmentFunction(),
        5, TimeUnit.SECONDS, 100
);

6. 按类目 ID 滚动窗口聚合

DataStream<OrderStat> resultStream = enrichedStream
    .map(tuple -> new Tuple3<>(tuple.f1.categoryId, tuple.f0.orderTime, tuple.f0.price * tuple.f0.quantity))
    .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((t, ts) -> t.f1)
    )
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AggregateFunction<Tuple3<String, Long, Double>, Double, OrderStat>() {
        private long windowStart, windowEnd;
        private String categoryId;

        public Double createAccumulator() { return 0.0; }
        public Double add(Tuple3<String, Long, Double> value, Double acc) {
            categoryId = value.f0;
            return acc + value.f2;
        }
        public OrderStat getResult(Double acc) {
            return new OrderStat(categoryId, windowStart, windowEnd, acc);
        }
        public Double merge(Double acc1, Double acc2) {
            return acc1 + acc2;
        }
    }, new ProcessWindowFunction<OrderStat, OrderStat, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, Iterable<OrderStat> elements, Collector<OrderStat> out) {
            OrderStat stat = elements.iterator().next();
            stat.windowStart = context.window().getStart();
            stat.windowEnd = context.window().getEnd();
            out.collect(stat);
        }
    });

7. 写入 Kafka Sink

KafkaSink<OrderStat> kafkaSink = KafkaSink.<OrderStat>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("order_stats")
        .setValueSerializationSchema(stat -> {
            ObjectMapper mapper = new ObjectMapper();
            return mapper.writeValueAsBytes(stat);
        })
        .build())
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .build();

resultStream.sinkTo(kafkaSink);

8.  容错与 HA 配置(关键)

1)Checkpoint 配置

env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode/flink/checkpoints"));

2)高可用配置(flink-conf.yaml)

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
state.checkpoints.dir: hdfs://namenode/flink/checkpoints
state.savepoints.dir: hdfs://namenode/flink/savepoints

9. 运行命令(on YARN)

flink run -m yarn-cluster -c com.company.OrderRealtimeJob your-job.jar

10. 监控与排障建议

工具 功能
Flink Web UI 监控 Task、Checkpoint、Watermark
Prometheus 指标采集
Grafana 可视化
AlertManager 告警配置
Savepoint 容错恢复点

三、FlinkCDC实时采集数据入湖

解析Flink CDC(Change Data Capture)在大数据体系中的使用方法,并结合 Kafka、Hudi、Iceberg、Hive、HDFS 等大数据组件,提供一套 可落地、可执行、可扩展的完整集成方案


1. Flink CDC 简介

Flink CDC 是 Apache Flink + Debezium 的组合,用于实时采集 MySQL/PostgreSQL 等数据库的变更数据(INSERT/UPDATE/DELETE),并以 流式方式传递到下游系统(Kafka、Hudi、Iceberg、HBase 等)。


2. 典型架构场景:Flink CDC + Hudi + Hive 实时数据湖方案

           +-------------+               +---------------------+
           | MySQL/Postgres            |                     |
           |    Source DB   +-------->  | Flink CDC Connector |
           +-------------+             |                     |
                                       +----------+----------+
                                                  |
                                                  | Row-level ChangeLog
                                                  v
                                       +----------+----------+
                                       |     Flink Job       |
                                       |   (数据清洗/处理)   |
                                       +----------+----------+
                                                  |
                                                  v
                                       +----------+----------+
                                       | Hudi Sink (Flink)   |
                                       +----------+----------+
                                                  |
                                                  v
                                    +-------------+-------------+
                                    | Hive / Presto / Trino     |
                                    | 实时查询(支持 ACID)      |
                                    +---------------------------+

3. 方案目标

  • 实时采集 MySQL 数据(基于 Binlog)

  • 支持变更(Insert/Update/Delete)语义

  • 数据存入 Hudi 表(支持 MOR/COW 格式)

  • Hive/Presto 端可直接查询


4. 组件版本建议

组件 版本建议
Flink 1.17.x 或 1.18.x
Flink CDC 2.4.1
Debezium 内嵌于 Flink CDC
Hudi 0.13.1+
Hive 2.3.x / 3.1.x
Hadoop/HDFS 3.x

5. 部署准备

1)安装 Kafka(可选)

用于做 CDC 中转(可选,支持 Flink 直接接 Hudi)

2)安装 Hive Metastore + Hadoop HDFS

用于管理 Hudi 表元数据和 HDFS 存储

3)准备 MySQL 源数据库

配置 binlog,设置 binlog_format = ROW,并开启 server_idbinlog_row_image = full


6. 关键配置代码与步骤

1)添加 Maven 依赖

<dependencies>
  <!-- Flink CDC -->
  <dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.1</version>
  </dependency>

  <!-- Hudi Sink -->
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink-bundle_2.12</artifactId>
    <version>0.13.1</version>
  </dependency>
</dependencies>

2)Flink SQL 示例(CDC → Hudi)

-- 1. 源表:MySQL CDC 表
CREATE TABLE ods_orders (
  id STRING,
  user_id STRING,
  amount DOUBLE,
  ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flink123',
  'database-name' = 'srm',
  'table-name' = 'orders',
  'scan.startup.mode' = 'initial'
);

-- 2. 目标表:Hudi 表(MOR 模式)
CREATE TABLE dwd_orders (
  id STRING PRIMARY KEY NOT ENFORCED,
  user_id STRING,
  amount DOUBLE,
  ts TIMESTAMP(3)
) PARTITIONED BY (`user_id`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://namenode/data/hudi/dwd_orders',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.tasks' = '4',
  'compaction.async.enabled' = 'true',
  'hive_sync.enabled' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083',
  'hive_sync.db' = 'ods',
  'hive_sync.table' = 'dwd_orders'
);

-- 3. 实时写入
INSERT INTO dwd_orders
SELECT * FROM ods_orders;

7. 关键功能说明

功能 配置字段 说明
主键变更支持 PRIMARY KEY ... NOT ENFORCED 支持 upsert
增量采集模式 scan.startup.mode = initial 首次全量 + 后续增量
实时 compaction compaction.async.enabled = true MOR 表性能保障
Hive 数据同步 hive_sync.enabled = true Hudi 自动注册 Hive 元数据

8. 整合优化建议

1)多表 CDC 同步统一处理

使用 Flink CDC 的 schema-name.table-name通配符

'database-name' = 'srm',
'table-name' = '.*',

配合 Flink SQL Catalog + Dynamic Table Factory,可实现一拖 N 的多表处理逻辑。


2)增加清洗逻辑(如空值过滤、转换)

SELECT
  id,
  user_id,
  amount * 1.13 AS amount_tax,
  ts
FROM ods_orders
WHERE amount IS NOT NULL;

3)写入 Kafka(替代 Hudi) → 用于事件总线或下游消费

CREATE TABLE kafka_sink (
  id STRING,
  user_id STRING,
  amount DOUBLE,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods.orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

9. Flink CDC 整合场景汇总

场景 描述 推荐组件
实时数据入湖 MySQL → Hudi Flink CDC + Hudi
数据仓库加速 Oracle → Iceberg Flink CDC + Iceberg
数据中台构建 MySQL → Kafka → 多下游 Flink CDC + Kafka
数据回流校验 Kafka → Flink → MySQL Flink SQL + JDBC Sink
DWD建模 ODS → DWD/DWM → ADS Flink SQL + 维表 JOIN

10. 可视化监控

工具 功能
Flink UI Checkpoint、Watermark、吞吐
Prometheus 指标采集
Grafana 监控仪表盘
Hive SQL 查询验证

四、自定义 Flink CDC Job 的完整实现

自定义 Flink CDC Job 的完整实现,采用 Java DataStream API 编写,支持:

  • 多表接入(MySQL 为例)

  • 自定义清洗、转换逻辑

  • 支持写入 Kafka、Hudi、Iceberg 等下游系统

  • 可部署为标准 Flink 应用(flink run 执行)


1. 自定义 Flink CDC Job 场景说明

目标:

  • 从 MySQL 采集订单表 srm.orders

  • 做清洗(如金额换算、字段过滤)

  • 输出到 Hudi 表(或 Kafka/Console)


2. 依赖配置(Maven)

<dependencies>
  <!-- Flink CDC -->
  <dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.1</version>
  </dependency>

  <!-- Flink 通用 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.17.2</version>
  </dependency>

  <!-- 可选:Sink 依赖,如 Kafka、Hudi、Iceberg -->
</dependencies>

3. 完整代码示例:CustomCdcJob.java

public class CustomCdcJob {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. 配置 CDC 源:MySQL
        MySqlSource<Order> mysqlSource = MySqlSource.<Order>builder()
                .hostname("mysql-host")
                .port(3306)
                .databaseList("srm")
                .tableList("srm.orders")
                .username("flink")
                .password("flink123")
                .deserializer(new OrderDeserializationSchema()) // 自定义反序列化
                .build();

        // 3. 接入 Source
        DataStreamSource<Order> orderStream = env.fromSource(
                mysqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL CDC Source"
        );

        // 4. 数据清洗/转换
        SingleOutputStreamOperator<Order> cleaned = orderStream
                .filter(order -> order.amount > 0)
                .map(order -> {
                    order.amount = order.amount * 1.13; // 加税
                    return order;
                });

        // 5. Sink:控制台 / Kafka / Hudi
        cleaned.print();

        env.execute("Custom Flink CDC Job");
    }
}

4. 自定义反序列化器:OrderDeserializationSchema

public class OrderDeserializationSchema implements DebeziumDeserializationSchema<Order> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<Order> collector) {
        Struct value = (Struct) sourceRecord.value();
        if (value == null) return;

        Struct after = value.getStruct("after");
        if (after != null) {
            Order order = new Order();
            order.id = after.getString("id");
            order.userId = after.getString("user_id");
            order.amount = after.getFloat64("amount");
            order.ts = Instant.ofEpochMilli(after.getInt64("ts")).atZone(ZoneId.of("UTC")).toLocalDateTime();
            collector.collect(order);
        }
    }

    @Override
    public TypeInformation<Order> getProducedType() {
        return TypeInformation.of(Order.class);
    }
}

5. 定义 POJO 类:Order.java

public class Order implements Serializable {
    public String id;
    public String userId;
    public Double amount;
    public LocalDateTime ts;

    @Override
    public String toString() {
        return String.format("[Order] id=%s, user=%s, amt=%.2f, ts=%s",
                id, userId, amount, ts.toString());
    }
}

6. Sink 可选方案

1)控制台输出(开发调试)

cleaned.print();

2)Kafka Sink(事件总线)

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("srm.orders.cdc")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();

cleaned.map(order -> JSON.toJSONString(order)).sinkTo(kafkaSink);

3)写入 Hudi 表(通过 Flink Hudi Sink)

cleaned.addSink(HudiSinkUtil.getSink());

自定义 Hudi Sink 工具类可基于 HoodieSink 封装。


七、打包部署方式

1)使用 maven-shade-plugin 打 fat-jar:

mvn clean package -DskipTests

输出:custom-cdc-job-1.0-SNAPSHOT.jar


2)提交到 Flink 集群

flink run -m yarn-cluster -c com.example.CustomCdcJob custom-cdc-job.jar

8. 扩展功能(可选)

功能 实现方式
多表同步 .tableList("srm.orders,srm.invoice")
动态 schema 推导 使用 JsonDebeziumDeserializationSchema
维表 join Flink SQL / Broadcast Join
自定义状态存储 Flink KeyedState
exactly-once 写入 Kafka/Hudi 使用 checkpoint 支持


网站公告

今日签到

点亮在社区的每一天
去签到