监控 MySQL Binlog 使用 Flink 将数据实时写入 Kafka
一、前言
在实时数据处理场景中,很多企业需要将 MySQL 数据库中的变化(新增、修改、删除)实时同步到下游系统,例如 Kafka、Elasticsearch、HBase 等,以实现数据分析、监控预警或搜索查询。
本文将介绍如何通过 Flink CDC(Change Data Capture)监控 MySQL Binlog,并将变更数据实时写入 Kafka,实现准实时数据同步。
二、整体架构
实现流程如下:
MySQL(Binlog) --> Flink CDC --> Kafka(Topic) --> 下游消费者
- MySQL
需要开启binlog
,并设置为ROW
格式。 - Flink CDC
使用ververica/flink-cdc-connectors
直接解析 MySQL Binlog。 - Kafka
用作消息中间件,方便数据被下游系统订阅消费。
三、环境准备
1. MySQL 配置
修改 my.cnf
(Linux 可能是 /etc/my.cnf
或 /etc/mysql/mysql.conf.d/mysqld.cnf
):
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
执行以下命令,给 Flink 连接用户授权:
CREATE USER 'flink'@'%' IDENTIFIED BY 'flink123';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
FLUSH PRIVILEGES;
2. Kafka 启动
如果本地是单机测试,可以直接用
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
创建 Topic:
bin/kafka-topics.sh --create --topic mysql_binlog_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
3. Maven 依赖
在 Flink 项目中引入必要依赖(以 Flink 1.15 为例):
<dependencies>
<!-- Flink 核心依赖 -->
<dependency>
<groupId>org.apache.flink</groupId