flink mysql数据表同步SQL CDC

发布于:2024-04-19 ⋅ 阅读:(34) ⋅ 点赞:(0)

1、CDC简介  Change Data Capture

FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。通过CDC获取源数据表的更新内容,将更新内容作为数据流下发到下游系统,可以做到mysql数据表数据的实时同步操作。

基于Flink CDC的MySQL表数据同步流程大致如下:

  1. 数据源(MySQL):首先,一个MySQL数据库作为数据源,其中包含了想要同步的表。

  2. Flink CDC Connector:Flink CDC Connector是一个用于捕获MySQL表数据变更的组件。它连接到MySQL数据库,并持续监听数据变更(如插入、更新、删除操作)。

  3. 数据捕获:当MySQL表中的数据发生变化时,Flink CDC Connector会捕获这些变更事件,并将它们作为数据流进行处理。

  4. Flink流处理:捕获到的数据流会进入Flink流处理引擎。在Flink中,你可以定义一系列的操作来处理这些数据,比如过滤、聚合、转换等。

  5. 目标存储:处理后的数据会被写入到目标存储系统。这可以是一个数据库、数据仓库、消息队列或其他任何数据存储系统。

  6. 监控与告警:同步过程中,你可以设置监控和告警机制,以便在出现问题时能够及时得到通知并进行处理。

  7. 错误处理与重试:在同步过程中,可能会遇到各种错误,如网络问题、目标存储故障等。你需要设计合适的错误处理机制,比如重试策略,以确保数据的可靠性和一致性。

2、CDC配置

(1)开启MySql的binlog

1,修改 mysql 的配置文件 my.cnf
追加内容:
log-bin=mysql-bin     #binlog
binlog_format=ROW     #选择row
server_id=1           #mysql实例id
2,重启 mysql:
service mysql restart    
3,登录 mysql 客户端,查看 log_bin 开启状态
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON|
+---------------+-------+
————————————————
状态为ON表示该功能已开启

(2)创建Mysql数据库表

两个库表源表和目标表:

源表:

CREATE TABLE zentao.`zt_group` (
    `id` MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
    `project` MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
    `vision` VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
    `name` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
    `role` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
    `desc` CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
    `acl` TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
    `developer` ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
    PRIMARY KEY (`id`) USING BTREE
);

INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (1, 0, 'rnd', '管理员', 'admin', '系统管理员', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (2, 0, 'rnd', '研发', 'dev', '研发人员', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (3, 0, 'rnd', '测试', 'qa', '测试人员', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (4, 0, 'rnd', '项目经理', 'pm', '项目经理', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (5, 0, 'rnd', '产品经理', 'po', '产品经理', NULL, '1');
INSERT INTO `zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) VALUES (6, 0, 'rnd', '研发主管', 'td', '研发主管', NULL, '1');
 

目标表:

CREATE TABLE wfg.`zentao_zt_group` (
    `id` MEDIUMINT(7) UNSIGNED NOT NULL AUTO_INCREMENT,
    `project` MEDIUMINT(7) UNSIGNED NOT NULL DEFAULT '0',
    `vision` VARCHAR(10) NOT NULL DEFAULT 'rnd' COLLATE 'utf8_general_ci',
    `name` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
    `role` CHAR(30) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
    `desc` CHAR(255) NOT NULL DEFAULT '' COLLATE 'utf8_general_ci',
    `acl` TEXT NULL DEFAULT NULL COLLATE 'utf8_general_ci',
    `developer` ENUM('0','1') NOT NULL DEFAULT '1' COLLATE 'utf8_general_ci',
    PRIMARY KEY (`id`) USING BTREE
);
 

3、SQL CDC同步数据代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TenTao2wfgUserSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(5000);

        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS source_test");
        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS target_test");

        // 动态表,此为source表
        tEnv.executeSql("CREATE TABLE source_test.zentao (\n" +
                "    id                           INT,\n" +
                "    project                      INT,\n" +
                "    vision                       STRING,\n" +
                "    name                         STRING,\n" +
                "    role                         STRING,\n" +
                "    desc                         STRING,\n" +
                "    acl                          STRING,\n" +
                "    developer                    INT,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '127.0.0.1',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'database-name' = 'zentao',\n" +
                "  'table-name' = 'zt_group',\n" +
                "  'scan.incremental.snapshot.enabled' = 'false'\n" +
                ")");
    // 动态表,此为sink表。sink表和source表的connector不一样
        tEnv.executeSql("CREATE TABLE target_test.zentao (\n" +
                "    id                           INT,\n" +
                "    project                      INT,\n" +
                "    vision                       STRING,\n" +
                "    name                         STRING,\n" +
                "    role                         STRING,\n" +
                "    desc                         STRING,\n" +
                "    acl                          STRING,\n" +
                "    developer                    INT,\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://127.0.0.1:3306/wfg',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '123456',\n" +
                "  'table-name' = 'zentao_zt_group'\n" +
                ")");

        tEnv.executeSql("INSERT INTO target_test.zentao (id, project, vision, name, role, desc,acl,developer) \n" +
                "select f.id,\n" +
                "       f.project,\n" +
                "       f.vision,\n" +
                "       f.name,\n" +
                "       f.role,\n" +
                "       f.desc,\n" +
                "       f.acl,\n" +
                "       f.developer \n" +
                "       from source_test.zentao f ");
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到