Flink MySQL CDC 环境配置与验证

发布于:2025-07-05 ⋅ 阅读:(16) ⋅ 点赞:(0)
一、MySQL 服务器配置详解
1. 启用二进制日志(Binlog)

MySQL CDC 依赖二进制日志获取增量数据,需在 MySQL 配置文件(my.cnfmy.ini)中添加以下配置:

# 启用二进制日志
log-bin=mysql-bin
# 二进制日志格式(推荐ROW模式,记录行级变更)
binlog-format=ROW
# 启用GTID(高可用必备)
gtid-mode=ON
enforce-gtid-consistency=ON
# 从库同步时记录binlog(主从架构需要)
log-slave-updates=ON
# 避免长连接超时(大表快照时需要)
interactive_timeout=3600
wait_timeout=3600

配置说明

  • log-bin:指定二进制日志文件名前缀,MySQL 会自动生成如 mysql-bin.000001 的文件
  • binlog-format=ROW:相比 STATEMENT 模式,ROW 模式能精确记录每行数据的变更
  • gtid-mode:全局事务标识符,用于主从切换时保证数据一致性
  • log-slave-updates:若使用从库同步,需开启此配置让从库也记录 binlog
2. 创建专用用户并授权
-- 创建用户(替换为实际用户名和密码)
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'flink123';

-- 授予必要权限(重要:REPLICATION SLAVE 用于读取binlog)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';

-- 刷新权限
FLUSH PRIVILEGES;

权限说明

  • SELECT:读取表数据(快照阶段需要)
  • SHOW DATABASES:获取数据库列表(用于正则匹配监控库)
  • REPLICATION SLAVE:读取 binlog 必备权限
  • REPLICATION CLIENT:获取服务器状态(如binlog位置)
3. 配置唯一 Server ID

每个 Flink 作业需配置不同的 Server ID(避免 binlog 位置冲突):

# 在my.cnf中添加
server-id=1001  # 任意唯一整数,建议范围5400-6400

说明:若 Flink 作业并行度为 N,则 Server ID 可设为范围(如 5400-5400+N),例如:

-- Flink SQL 中通过Hints设置Server ID范围
SELECT * FROM mysql_table /*+ OPTIONS('server-id'='5401-5404') */;
二、Flink 环境配置步骤
1. 添加依赖(Maven 项目)

pom.xml 中添加 MySQL CDC 连接器依赖:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>3.0.1</version>
    <!-- 若使用Flink 1.14+,无需添加scope -->
    <scope>provided</scope>
</dependency>
2. SQL Client 部署(非Maven环境)
  1. 下载连接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
  2. 将 JAR 包放入 $FLINK_HOME/lib/ 目录
  3. 重启 Flink 集群使依赖生效
三、Flink MySQL CDC 表定义与参数详解
1. 完整建表示例(Flink SQL)
-- 设置checkpoint间隔(可选)
SET 'execution.checkpointing.interval' = '3s';

-- 创建MySQL CDC表
CREATE TABLE mysql_orders (
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    -- 可选:添加元数据列
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.1.100',
    'port' = '3306',
    'username' = 'flink_cdc',
    'password' = 'flink123',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    -- 可选参数详解
    'server-id' = '5401',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.incremental.snapshot.chunk.size' = '8096',
    'scan.startup.mode' = 'initial',
    'heartbeat.interval' = '30s',
    'debezium.binary.handling.mode' = 'base64'
);
2. 核心参数详解
参数名 必选 默认值 类型 说明
connector String 固定为 mysql-cdc
hostname String MySQL 服务器IP或域名
username String 连接MySQL的用户名
password String 连接MySQL的密码
database-name String 监控的数据库名,支持正则表达式(如 ^(test).* 匹配以test开头的库)
table-name String 监控的表名,支持正则表达式(如 `orders
server-id 5400-6400随机 String Flink作业的唯一标识,需与其他MySQL客户端(如主从复制)不同,并行作业建议设为范围(如 5401-5404
scan.incremental.snapshot.enabled true Boolean 启用增量快照(并行读取大表,无需全局锁),建议保持默认
scan.startup.mode initial String 启动模式:initial(快照+binlog)、earliest-offset(从最早binlog开始)、latest-offset(从最新binlog开始)
heartbeat.interval 30s Duration 心跳间隔,用于更新binlog位置,避免长时间无变更时binlog被清理
debezium.binary.handling.mode none String 二进制数据处理模式:base64(转Base64字符串)、hex(转十六进制),适用于BLOB/VARBINARY类型
四、环境验证与测试
1. 准备测试数据(MySQL)
-- 创建测试数据库和表
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    order_date TIMESTAMP,
    customer_name VARCHAR(100),
    price DECIMAL(10, 2),
    order_status BOOLEAN
);

-- 插入测试数据
INSERT INTO orders VALUES 
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
2. 使用Flink SQL验证
-- 查询MySQL CDC表数据
SELECT * FROM mysql_orders;

-- 观察输出:应显示插入的两条记录
-- 后续在MySQL中更新数据,Flink会实时捕获变更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 验证示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlCdcExample {
    public static void main(String[] args) throws Exception {
        // 创建MySQL Source
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("192.168.1.100")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("flink_cdc")
                .password("flink123")
                .deserializer(new JsonDebeziumDeserializationSchema()) // 转为JSON格式
                .startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog)
                .build();

        // 配置Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // 5秒checkpoint
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
            .print(); // 打印到控制台
        
        env.execute("MySQL CDC Test");
    }
}
4. 验证关键点
  1. 日志检查

    • Flink 日志应包含 Binlog offset on checkpoint 字样,表明成功获取 binlog 位置
    • Access deniedPermission denied 错误,确认MySQL权限正确
  2. 数据变更测试

    • 在MySQL中执行 INSERT/UPDATE/DELETE 操作,Flink 应实时输出变更数据
    • 查看输出中的 row_kind 字段:+I(插入)、-D(删除)、+U(更新后)、-U(更新前)
  3. 增量快照验证

    • 若表数据量大,查看Flink Web UI的并行度,增量快照模式下多个任务应并行读取
    • 日志中无 FLUSH TABLES WITH READ LOCK 相关记录,确认未获取全局锁
五、常见问题与解决方案
  1. 权限不足错误

    ERROR: Access denied for user 'flink_cdc'@'localhost' (using password: YES)
    
    • 解决方案:确认MySQL用户密码正确,重新执行授权语句,确保包含 REPLICATION SLAVE 权限
  2. Server ID冲突

    ERROR: Another MySQL binlog client is using the same server id
    
    • 解决方案:修改 server-id 为唯一值,或在Flink SQL中通过 'server-id'='5401-5404' 设置范围
  3. 增量快照失败

    ERROR: Table has no primary key, cannot split snapshot chunks
    
    • 解决方案:为表添加主键,或设置 scan.incremental.snapshot.chunk.key-column 为非空列(如 'scan.incremental.snapshot.chunk.key-column'='unique_id'
  4. binlog未启用

    ERROR: Binary logging is not enabled
    
    • 解决方案:检查MySQL配置文件,确认 log-bin 已启用,重启MySQL服务

通过以上步骤,可完成Flink MySQL CDC的环境配置与验证。生产环境中建议结合实际需求调整并行度、checkpoint策略和GTID配置,以确保数据一致性和系统稳定性。


网站公告

今日签到

点亮在社区的每一天
去签到