一、MySQL 服务器配置详解
1. 启用二进制日志(Binlog)
MySQL CDC 依赖二进制日志获取增量数据,需在 MySQL 配置文件(my.cnf
或 my.ini
)中添加以下配置:
# 启用二进制日志
log-bin=mysql-bin
# 二进制日志格式(推荐ROW模式,记录行级变更)
binlog-format=ROW
# 启用GTID(高可用必备)
gtid-mode=ON
enforce-gtid-consistency=ON
# 从库同步时记录binlog(主从架构需要)
log-slave-updates=ON
# 避免长连接超时(大表快照时需要)
interactive_timeout=3600
wait_timeout=3600
配置说明:
log-bin
:指定二进制日志文件名前缀,MySQL 会自动生成如mysql-bin.000001
的文件binlog-format=ROW
:相比 STATEMENT 模式,ROW 模式能精确记录每行数据的变更gtid-mode
:全局事务标识符,用于主从切换时保证数据一致性log-slave-updates
:若使用从库同步,需开启此配置让从库也记录 binlog
2. 创建专用用户并授权
-- 创建用户(替换为实际用户名和密码)
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'flink123';
-- 授予必要权限(重要:REPLICATION SLAVE 用于读取binlog)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';
-- 刷新权限
FLUSH PRIVILEGES;
权限说明:
SELECT
:读取表数据(快照阶段需要)SHOW DATABASES
:获取数据库列表(用于正则匹配监控库)REPLICATION SLAVE
:读取 binlog 必备权限REPLICATION CLIENT
:获取服务器状态(如binlog位置)
3. 配置唯一 Server ID
每个 Flink 作业需配置不同的 Server ID(避免 binlog 位置冲突):
# 在my.cnf中添加
server-id=1001 # 任意唯一整数,建议范围5400-6400
说明:若 Flink 作业并行度为 N,则 Server ID 可设为范围(如 5400-5400+N
),例如:
-- Flink SQL 中通过Hints设置Server ID范围
SELECT * FROM mysql_table /*+ OPTIONS('server-id'='5401-5404') */;
二、Flink 环境配置步骤
1. 添加依赖(Maven 项目)
在 pom.xml
中添加 MySQL CDC 连接器依赖:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>3.0.1</version>
<!-- 若使用Flink 1.14+,无需添加scope -->
<scope>provided</scope>
</dependency>
2. SQL Client 部署(非Maven环境)
- 下载连接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
- 将 JAR 包放入
$FLINK_HOME/lib/
目录 - 重启 Flink 集群使依赖生效
三、Flink MySQL CDC 表定义与参数详解
1. 完整建表示例(Flink SQL)
-- 设置checkpoint间隔(可选)
SET 'execution.checkpointing.interval' = '3s';
-- 创建MySQL CDC表
CREATE TABLE mysql_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
-- 可选:添加元数据列
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.100',
'port' = '3306',
'username' = 'flink_cdc',
'password' = 'flink123',
'database-name' = 'mydb',
'table-name' = 'orders',
-- 可选参数详解
'server-id' = '5401',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096',
'scan.startup.mode' = 'initial',
'heartbeat.interval' = '30s',
'debezium.binary.handling.mode' = 'base64'
);
2. 核心参数详解
参数名 | 必选 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
connector |
是 | 无 | String | 固定为 mysql-cdc |
hostname |
是 | 无 | String | MySQL 服务器IP或域名 |
username |
是 | 无 | String | 连接MySQL的用户名 |
password |
是 | 无 | String | 连接MySQL的密码 |
database-name |
是 | 无 | String | 监控的数据库名,支持正则表达式(如 ^(test).* 匹配以test开头的库) |
table-name |
是 | 无 | String | 监控的表名,支持正则表达式(如 `orders |
server-id |
否 | 5400-6400随机 | String | Flink作业的唯一标识,需与其他MySQL客户端(如主从复制)不同,并行作业建议设为范围(如 5401-5404 ) |
scan.incremental.snapshot.enabled |
否 | true | Boolean | 启用增量快照(并行读取大表,无需全局锁),建议保持默认 |
scan.startup.mode |
否 | initial | String | 启动模式:initial (快照+binlog)、earliest-offset (从最早binlog开始)、latest-offset (从最新binlog开始) |
heartbeat.interval |
否 | 30s | Duration | 心跳间隔,用于更新binlog位置,避免长时间无变更时binlog被清理 |
debezium.binary.handling.mode |
否 | none | String | 二进制数据处理模式:base64 (转Base64字符串)、hex (转十六进制),适用于BLOB/VARBINARY类型 |
四、环境验证与测试
1. 准备测试数据(MySQL)
-- 创建测试数据库和表
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE orders (
order_id INT PRIMARY KEY,
order_date TIMESTAMP,
customer_name VARCHAR(100),
price DECIMAL(10, 2),
order_status BOOLEAN
);
-- 插入测试数据
INSERT INTO orders VALUES
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
2. 使用Flink SQL验证
-- 查询MySQL CDC表数据
SELECT * FROM mysql_orders;
-- 观察输出:应显示插入的两条记录
-- 后续在MySQL中更新数据,Flink会实时捕获变更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 验证示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
public class MySqlCdcExample {
public static void main(String[] args) throws Exception {
// 创建MySQL Source
MySqlSource<String> source = MySqlSource.<String>builder()
.hostname("192.168.1.100")
.port(3306)
.databaseList("mydb")
.tableList("mydb.orders")
.username("flink_cdc")
.password("flink123")
.deserializer(new JsonDebeziumDeserializationSchema()) // 转为JSON格式
.startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog)
.build();
// 配置Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 5秒checkpoint
env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.print(); // 打印到控制台
env.execute("MySQL CDC Test");
}
}
4. 验证关键点
日志检查:
- Flink 日志应包含
Binlog offset on checkpoint
字样,表明成功获取 binlog 位置 - 无
Access denied
或Permission denied
错误,确认MySQL权限正确
- Flink 日志应包含
数据变更测试:
- 在MySQL中执行
INSERT/UPDATE/DELETE
操作,Flink 应实时输出变更数据 - 查看输出中的
row_kind
字段:+I
(插入)、-D
(删除)、+U
(更新后)、-U
(更新前)
- 在MySQL中执行
增量快照验证:
- 若表数据量大,查看Flink Web UI的并行度,增量快照模式下多个任务应并行读取
- 日志中无
FLUSH TABLES WITH READ LOCK
相关记录,确认未获取全局锁
五、常见问题与解决方案
权限不足错误:
ERROR: Access denied for user 'flink_cdc'@'localhost' (using password: YES)
- 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含
REPLICATION SLAVE
权限
- 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含
Server ID冲突:
ERROR: Another MySQL binlog client is using the same server id
- 解决方案:修改
server-id
为唯一值,或在Flink SQL中通过'server-id'='5401-5404'
设置范围
- 解决方案:修改
增量快照失败:
ERROR: Table has no primary key, cannot split snapshot chunks
- 解决方案:为表添加主键,或设置
scan.incremental.snapshot.chunk.key-column
为非空列(如'scan.incremental.snapshot.chunk.key-column'='unique_id'
)
- 解决方案:为表添加主键,或设置
binlog未启用:
ERROR: Binary logging is not enabled
- 解决方案:检查MySQL配置文件,确认
log-bin
已启用,重启MySQL服务
- 解决方案:检查MySQL配置文件,确认
通过以上步骤,可完成Flink MySQL CDC的环境配置与验证。生产环境中建议结合实际需求调整并行度、checkpoint策略和GTID配置,以确保数据一致性和系统稳定性。