一、Flink与其他组件的协同
Flink 是一个分布式、高性能、始终可用、准确一次(Exactly-Once)语义的流处理引擎,广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成,形成完整的大数据处理链路。下面我们从 Flink 的 核心架构 出发,结合与 Hadoop 组件协同方式,详细剖析 Flink 的作用。
1. Flink 核心架构详解
1)架构组件图概览
+-------------------------+
| Client |
+-------------------------+
|
v
+-------------------------+
| JobManager (JM) | <-- Master 负责调度
+-------------------------+
|
v
+-------------------------+
| TaskManagers (TM) | <-- Worker 执行算子任务
+-------------------------+
|
v
+-------------------------+
| Slot | <-- 执行资源单位
+-------------------------+
2)核心组件职责
组件 | 描述 |
---|---|
Client | 提交作业到 Flink 集群,触发作业执行。 |
JobManager (JM) | 管理作业生命周期,负责调度任务、故障恢复、协调检查点(Checkpoint)等。 |
TaskManager (TM) | 具体执行作业的物理任务(算子),负责数据交换、状态管理等。 |
Slot | TaskManager 内部的资源单位,用于任务部署。每个 TaskManager 有多个 Slot。 |
3)状态管理与容错
Checkpoint/Savepoint:可恢复一致性状态(Exactly Once)
State Backend:保存状态(如 RocksDB、FsStateBackend)
Recovery:通过重放 Checkpoint 恢复任务
2. Flink 与 Hadoop 各组件的协同关系
Flink 虽然是独立系统,但能与 Hadoop 生态的多个关键组件协同工作,构建完整的大数据平台。
1)与 HDFS(Hadoop Distributed File System)
协同方式 | 描述 |
---|---|
输入源 | Flink 可直接读取 HDFS 中的批量数据(如 ORC、Parquet、Text 等格式) |
状态后端 | Flink Checkpoint/Savepoint 可存储到 HDFS 上,保证高可用与容灾 |
输出目标 | Flink 作业可以将计算结果输出到 HDFS,作为后续离线处理的数据 |
fs.defaultFS: hdfs://namenode:8020
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints/
2)与 Hive
协同方式 | 描述 |
---|---|
读取表数据 | Flink 可通过 Hive Catalog 与 Hive 元数据打通,直接读取 Hive 表 |
写入表 | Flink SQL 可将流式数据写入 Hive(使用 INSERT INTO) |
统一元数据 | Flink + Hive Catalog 支持表结构共享,便于湖仓一体实践 |
CREATE CATALOG my_hive WITH (
'type' = 'hive',
'hive-conf-dir' = '/etc/hive/conf'
);
3)与 Kafka(实时采集)
协同方式 | 描述 |
---|---|
实时数据源 | Flink 通过 Kafka Source 接收实时数据流(如日志、订单等) |
下游结果写入 | Flink 可将流式计算结果写入 Kafka(供下游消费) |
Exactly Once 语义 | Flink + Kafka + Checkpoint 可实现端到端的精确一次语义 |
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"topic", new SimpleStringSchema(), properties);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
4)与 HBase(实时查询)
协同方式 | 描述 |
---|---|
维表关联 | Flink 可使用 HBase 作为维表进行流批 Join,实时补充维度数据 |
实时写入 | 计算结果可实时写入 HBase,支持下游查询系统使用(如用户画像等) |
tableEnv.executeSql("CREATE TABLE hbase_dim (...) WITH ('connector' = 'hbase-2.2', ...)");
5)与 YARN
协同方式 | 描述 |
---|---|
资源调度 | Flink 可部署在 YARN 上,利用 Hadoop 的资源调度管理能力 |
Session / Per-Job 模式 | 支持多租户资源隔离或每个作业独立资源隔离部署 |
flink run -m yarn-cluster -ynm my-flink-job myjob.jar
6)与 Zookeeper
协同方式 | 描述 |
---|---|
高可用 JobManager | 使用 Zookeeper 实现 JobManager 的 leader election |
Checkpoint HA 元数据存储 | 配合 HDFS 存储 Checkpoint 元数据路径 |
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs://namenode:8020/flink/ha/
3. Flink 的作用总结
模块 | Flink 的角色 |
---|---|
实时数据处理 | 核心组件,进行低延迟、高吞吐流处理计算 |
数据清洗与 ETL | 提供强大 SQL / DataStream API 进行多源数据处理与聚合 |
实时指标计算 | 支持实时 KPI、UV/PV、订单流等分析 |
数据湖构建 | 可作为流式数据入湖的计算引擎(结合 Hudi/Iceberg) |
实时监控预警 | 搭配 Kafka + Prometheus,构建告警与监控系统 |
实时数仓建设 | 联合 Kafka + Hive + HDFS + HBase 构建流批一体数仓体系 |
4. Flink 架构在 Hadoop 平台的实际部署图
+-------------+
| Flume/Nginx|
+------+------+
|
Kafka集群
|
+-------------------+--------------------+
| |
+---v---+ +----v----+
| Flink |--> 清洗 → 维表 Join → 计算 | Spark |
+---+---+ +----+----+
| |
+-------v---------+ +--------v--------+
| HBase/Redis | | HDFS / Hive |
+-----------------+ +-----------------+
二、Flink DataStream API的使用
现在以 Flink DataStream API 为核心,深入剖析一个真实生产场景的 从 Kafka 到 Kafka 的流式处理全流程,包括:
项目结构与依赖
数据模型与清洗
水位线与乱序处理
异步维表查询(HBase/MySQL/Redis)
窗口聚合逻辑
数据下发(Kafka Sink)
容错机制与 Checkpoint 配置
1. 项目结构与依赖
1)Maven 依赖(pom.xml
)
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.1-1.17</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2</artifactId>
<version>1.17.1</version>
</dependency>
</dependencies>
2. 数据模型定义
1)订单数据结构(OrderEvent)
public class OrderEvent {
public String orderId;
public String userId;
public String productId;
public double price;
public int quantity;
public long orderTime; // epoch millis
}
2) 商品维度(ProductInfo)
public class ProductInfo {
public String productId;
public String categoryId;
public String productName;
}
3)聚合结果结构(OrderStat)
public class OrderStat {
public String categoryId;
public long windowStart;
public long windowEnd;
public double totalAmount;
}
3. Kafka Source + JSON 反序列化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("order_events")
.setGroupId("flink-consumer")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<OrderEvent> orderStream = env
.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource")
.map(json -> new ObjectMapper().readValue(json, OrderEvent.class))
.returns(OrderEvent.class);
4. 水位线处理(乱序数据支持)
WatermarkStrategy<OrderEvent> watermarkStrategy = WatermarkStrategy
.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.orderTime);
DataStream<OrderEvent> orderStreamWithWM = orderStream
.assignTimestampsAndWatermarks(watermarkStrategy);
5. 异步维表关联(以 HBase 为例)
使用 AsyncFunction
实现异步查询(支持 Redis/HBase/MySQL)
示例实现:AsyncProductEnrichmentFunction
public class AsyncProductEnrichmentFunction extends RichAsyncFunction<OrderEvent, Tuple2<OrderEvent, ProductInfo>> {
private transient HBaseClient hBaseClient;
@Override
public void open(Configuration parameters) throws Exception {
hBaseClient = new HBaseClient("hbase.zookeeper.quorum");
}
@Override
public void asyncInvoke(OrderEvent input, ResultFuture<Tuple2<OrderEvent, ProductInfo>> resultFuture) {
CompletableFuture
.supplyAsync(() -> hBaseClient.queryProductInfo(input.productId))
.thenAccept(productInfo -> resultFuture.complete(Collections.singletonList(Tuple2.of(input, productInfo))));
}
@Override
public void close() throws Exception {
hBaseClient.close();
}
}
应用异步函数
DataStream<Tuple2<OrderEvent, ProductInfo>> enrichedStream = AsyncDataStream.unorderedWait(
orderStreamWithWM,
new AsyncProductEnrichmentFunction(),
5, TimeUnit.SECONDS, 100
);
6. 按类目 ID 滚动窗口聚合
DataStream<OrderStat> resultStream = enrichedStream
.map(tuple -> new Tuple3<>(tuple.f1.categoryId, tuple.f0.orderTime, tuple.f0.price * tuple.f0.quantity))
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((t, ts) -> t.f1)
)
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateFunction<Tuple3<String, Long, Double>, Double, OrderStat>() {
private long windowStart, windowEnd;
private String categoryId;
public Double createAccumulator() { return 0.0; }
public Double add(Tuple3<String, Long, Double> value, Double acc) {
categoryId = value.f0;
return acc + value.f2;
}
public OrderStat getResult(Double acc) {
return new OrderStat(categoryId, windowStart, windowEnd, acc);
}
public Double merge(Double acc1, Double acc2) {
return acc1 + acc2;
}
}, new ProcessWindowFunction<OrderStat, OrderStat, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<OrderStat> elements, Collector<OrderStat> out) {
OrderStat stat = elements.iterator().next();
stat.windowStart = context.window().getStart();
stat.windowEnd = context.window().getEnd();
out.collect(stat);
}
});
7. 写入 Kafka Sink
KafkaSink<OrderStat> kafkaSink = KafkaSink.<OrderStat>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("order_stats")
.setValueSerializationSchema(stat -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsBytes(stat);
})
.build())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
resultStream.sinkTo(kafkaSink);
8. 容错与 HA 配置(关键)
1)Checkpoint 配置
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode/flink/checkpoints"));
2)高可用配置(flink-conf.yaml)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
state.checkpoints.dir: hdfs://namenode/flink/checkpoints
state.savepoints.dir: hdfs://namenode/flink/savepoints
9. 运行命令(on YARN)
flink run -m yarn-cluster -c com.company.OrderRealtimeJob your-job.jar
10. 监控与排障建议
工具 | 功能 |
---|---|
Flink Web UI | 监控 Task、Checkpoint、Watermark |
Prometheus | 指标采集 |
Grafana | 可视化 |
AlertManager | 告警配置 |
Savepoint | 容错恢复点 |
三、FlinkCDC实时采集数据入湖
解析Flink CDC(Change Data Capture)在大数据体系中的使用方法,并结合 Kafka、Hudi、Iceberg、Hive、HDFS 等大数据组件,提供一套 可落地、可执行、可扩展的完整集成方案。
1. Flink CDC 简介
Flink CDC 是 Apache Flink + Debezium 的组合,用于实时采集 MySQL/PostgreSQL 等数据库的变更数据(INSERT/UPDATE/DELETE),并以 流式方式传递到下游系统(Kafka、Hudi、Iceberg、HBase 等)。
2. 典型架构场景:Flink CDC + Hudi + Hive 实时数据湖方案
+-------------+ +---------------------+
| MySQL/Postgres | |
| Source DB +--------> | Flink CDC Connector |
+-------------+ | |
+----------+----------+
|
| Row-level ChangeLog
v
+----------+----------+
| Flink Job |
| (数据清洗/处理) |
+----------+----------+
|
v
+----------+----------+
| Hudi Sink (Flink) |
+----------+----------+
|
v
+-------------+-------------+
| Hive / Presto / Trino |
| 实时查询(支持 ACID) |
+---------------------------+
3. 方案目标
实时采集 MySQL 数据(基于 Binlog)
支持变更(Insert/Update/Delete)语义
数据存入 Hudi 表(支持 MOR/COW 格式)
Hive/Presto 端可直接查询
4. 组件版本建议
组件 | 版本建议 |
---|---|
Flink | 1.17.x 或 1.18.x |
Flink CDC | 2.4.1 |
Debezium | 内嵌于 Flink CDC |
Hudi | 0.13.1+ |
Hive | 2.3.x / 3.1.x |
Hadoop/HDFS | 3.x |
5. 部署准备
1)安装 Kafka(可选)
用于做 CDC 中转(可选,支持 Flink 直接接 Hudi)
2)安装 Hive Metastore + Hadoop HDFS
用于管理 Hudi 表元数据和 HDFS 存储
3)准备 MySQL 源数据库
配置 binlog,设置 binlog_format = ROW
,并开启 server_id
、binlog_row_image = full
6. 关键配置代码与步骤
1)添加 Maven 依赖
<dependencies>
<!-- Flink CDC -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.1</version>
</dependency>
<!-- Hudi Sink -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_2.12</artifactId>
<version>0.13.1</version>
</dependency>
</dependencies>
2)Flink SQL 示例(CDC → Hudi)
-- 1. 源表:MySQL CDC 表
CREATE TABLE ods_orders (
id STRING,
user_id STRING,
amount DOUBLE,
ts TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'username' = 'flink',
'password' = 'flink123',
'database-name' = 'srm',
'table-name' = 'orders',
'scan.startup.mode' = 'initial'
);
-- 2. 目标表:Hudi 表(MOR 模式)
CREATE TABLE dwd_orders (
id STRING PRIMARY KEY NOT ENFORCED,
user_id STRING,
amount DOUBLE,
ts TIMESTAMP(3)
) PARTITIONED BY (`user_id`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://namenode/data/hudi/dwd_orders',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'id',
'write.tasks' = '4',
'compaction.async.enabled' = 'true',
'hive_sync.enabled' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083',
'hive_sync.db' = 'ods',
'hive_sync.table' = 'dwd_orders'
);
-- 3. 实时写入
INSERT INTO dwd_orders
SELECT * FROM ods_orders;
7. 关键功能说明
功能 | 配置字段 | 说明 |
---|---|---|
主键变更支持 | PRIMARY KEY ... NOT ENFORCED |
支持 upsert |
增量采集模式 | scan.startup.mode = initial |
首次全量 + 后续增量 |
实时 compaction | compaction.async.enabled = true |
MOR 表性能保障 |
Hive 数据同步 | hive_sync.enabled = true |
Hudi 自动注册 Hive 元数据 |
8. 整合优化建议
1)多表 CDC 同步统一处理
使用 Flink CDC 的 schema-name.table-name
通配符:
'database-name' = 'srm',
'table-name' = '.*',
配合 Flink SQL Catalog + Dynamic Table Factory,可实现一拖 N 的多表处理逻辑。
2)增加清洗逻辑(如空值过滤、转换)
SELECT
id,
user_id,
amount * 1.13 AS amount_tax,
ts
FROM ods_orders
WHERE amount IS NOT NULL;
3)写入 Kafka(替代 Hudi) → 用于事件总线或下游消费
CREATE TABLE kafka_sink (
id STRING,
user_id STRING,
amount DOUBLE,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'ods.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
9. Flink CDC 整合场景汇总
场景 | 描述 | 推荐组件 |
---|---|---|
实时数据入湖 | MySQL → Hudi | Flink CDC + Hudi |
数据仓库加速 | Oracle → Iceberg | Flink CDC + Iceberg |
数据中台构建 | MySQL → Kafka → 多下游 | Flink CDC + Kafka |
数据回流校验 | Kafka → Flink → MySQL | Flink SQL + JDBC Sink |
DWD建模 | ODS → DWD/DWM → ADS | Flink SQL + 维表 JOIN |
10. 可视化监控
工具 | 功能 |
---|---|
Flink UI | Checkpoint、Watermark、吞吐 |
Prometheus | 指标采集 |
Grafana | 监控仪表盘 |
Hive | SQL 查询验证 |
四、自定义 Flink CDC Job 的完整实现
自定义 Flink CDC Job 的完整实现,采用 Java DataStream API 编写,支持:
多表接入(MySQL 为例)
自定义清洗、转换逻辑
支持写入 Kafka、Hudi、Iceberg 等下游系统
可部署为标准 Flink 应用(
flink run
执行)
1. 自定义 Flink CDC Job 场景说明
目标:
从 MySQL 采集订单表
srm.orders
做清洗(如金额换算、字段过滤)
输出到 Hudi 表(或 Kafka/Console)
2. 依赖配置(Maven)
<dependencies>
<!-- Flink CDC -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.1</version>
</dependency>
<!-- Flink 通用 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.2</version>
</dependency>
<!-- 可选:Sink 依赖,如 Kafka、Hudi、Iceberg -->
</dependencies>
3. 完整代码示例:CustomCdcJob.java
public class CustomCdcJob {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 配置 CDC 源:MySQL
MySqlSource<Order> mysqlSource = MySqlSource.<Order>builder()
.hostname("mysql-host")
.port(3306)
.databaseList("srm")
.tableList("srm.orders")
.username("flink")
.password("flink123")
.deserializer(new OrderDeserializationSchema()) // 自定义反序列化
.build();
// 3. 接入 Source
DataStreamSource<Order> orderStream = env.fromSource(
mysqlSource,
WatermarkStrategy.noWatermarks(),
"MySQL CDC Source"
);
// 4. 数据清洗/转换
SingleOutputStreamOperator<Order> cleaned = orderStream
.filter(order -> order.amount > 0)
.map(order -> {
order.amount = order.amount * 1.13; // 加税
return order;
});
// 5. Sink:控制台 / Kafka / Hudi
cleaned.print();
env.execute("Custom Flink CDC Job");
}
}
4. 自定义反序列化器:OrderDeserializationSchema
public class OrderDeserializationSchema implements DebeziumDeserializationSchema<Order> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<Order> collector) {
Struct value = (Struct) sourceRecord.value();
if (value == null) return;
Struct after = value.getStruct("after");
if (after != null) {
Order order = new Order();
order.id = after.getString("id");
order.userId = after.getString("user_id");
order.amount = after.getFloat64("amount");
order.ts = Instant.ofEpochMilli(after.getInt64("ts")).atZone(ZoneId.of("UTC")).toLocalDateTime();
collector.collect(order);
}
}
@Override
public TypeInformation<Order> getProducedType() {
return TypeInformation.of(Order.class);
}
}
5. 定义 POJO 类:Order.java
public class Order implements Serializable {
public String id;
public String userId;
public Double amount;
public LocalDateTime ts;
@Override
public String toString() {
return String.format("[Order] id=%s, user=%s, amt=%.2f, ts=%s",
id, userId, amount, ts.toString());
}
}
6. Sink 可选方案
1)控制台输出(开发调试)
cleaned.print();
2)Kafka Sink(事件总线)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("srm.orders.cdc")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.build();
cleaned.map(order -> JSON.toJSONString(order)).sinkTo(kafkaSink);
3)写入 Hudi 表(通过 Flink Hudi Sink)
cleaned.addSink(HudiSinkUtil.getSink());
自定义 Hudi Sink 工具类可基于 HoodieSink
封装。
七、打包部署方式
1)使用 maven-shade-plugin
打 fat-jar:
mvn clean package -DskipTests
输出:custom-cdc-job-1.0-SNAPSHOT.jar
2)提交到 Flink 集群
flink run -m yarn-cluster -c com.example.CustomCdcJob custom-cdc-job.jar
8. 扩展功能(可选)
功能 | 实现方式 |
---|---|
多表同步 | .tableList("srm.orders,srm.invoice") |
动态 schema 推导 | 使用 JsonDebeziumDeserializationSchema |
维表 join | Flink SQL / Broadcast Join |
自定义状态存储 | Flink KeyedState |
exactly-once 写入 Kafka/Hudi | 使用 checkpoint 支持 |