Flink CDC介绍和简单实用

发布于:2023-01-24 ⋅ 阅读:(12) ⋅ 点赞:(0) ⋅ 评论:(0)

简介

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

种类

基于查询和基于binlog
在这里插入图片描述

基于日志的 CDC 方案介绍

从 ETL 的角度进行分析,一般采集的都是业务库数据,这里使用 MySQL 作为需要采集的数据库,通过 Debezium 把 MySQL Binlog 进行采集后发送至 Kafka 消息队列,然后对接一些实时计算引擎或者 APP 进行消费后把数据传输入 OLAP 系统或者其他存储介质。
Flink 希望打通更多数据源,发挥完整的计算能力。我们生产中主要来源于业务日志和数据库日志,Flink 在业务日志的支持上已经非常完善,但是在数据库日志支持方面在 Flink 1.11 前还属于一片空白,这就是为什么要集成 CDC 的原因之一。
Flink SQL 内部支持了完整的 changelog 机制,所以 Flink 对接 CDC 数据只需要把CDC 数据转换成 Flink 认识的数据,所以在 Flink 1.11 里面重构了 TableSource 接口,以便更好支持和集成 CDC。
在这里插入图片描述
在这里插入图片描述
重构后的 TableSource 输出的都是 RowData 数据结构,代表了一行的数据。在RowData 上面会有一个元数据的信息,我们称为 RowKind 。RowKind 里面包括了插入、更新前、更新后、删除,这样和数据库里面的 binlog 概念十分类似。通过 Debezium 采集的 JSON 格式,包含了旧数据和新数据行以及原数据信息,op 的 u表示是 update 更新操作标识符,ts_ms 表示同步的时间戳。因此,对接 Debezium JSON 的数据,其实就是将这种原始的 JSON 数据转换成 Flink 认识的 RowData。

flink作为etl工具

原工作原理
在这里插入图片描述
优化后
在这里插入图片描述
Flink SQL 采集+计算+传输(ETL)一体化优点:
 开箱即用,简单易上手
 减少维护的组件,简化实时链路,减轻部署成本
 减小端到端延迟
 Flink 自身支持 Exactly Once 的读取和计算
 数据不落地,减少存储成本
 支持全量和增量流式读取
 binlog 采集位点可回溯

应用场景

 实时数据同步,数据备份,数据迁移,数仓构建
优势:丰富的上下游(E & L),强大的计算(T),易用的 API(SQL),流式计算低延迟
 数据库之上的实时物化视图、流式数据分析
 索引构建和实时维护
 业务 cache 刷新
 审计跟踪
 微服务的解耦,读写分离
 基于 CDC 的维表关联

开源地址

https://github.com/ververica/flink-cdc-connectors

最新flink cdc官方文档分享

https://flink-learning.org.cn/article/detail/eed4549f80e80cc30c69c406cb08b59a

流程图

个人理解作图
在这里插入图片描述

1.X痛点

在这里插入图片描述所以设计目标
在这里插入图片描述
设计实现上
在对于有主键的表做初始化模式,整体的流程主要分为5个阶段:
1.Chunk切分;2.Chunk分配;(实现并行读取数据&CheckPoint)
3.Chunk读取;(实现无锁读取)
4.Chunk汇报;
5.Chunk分配。
对于并发线程
会对比各个读取切分的最高和最低的位置区间,超过区间进行更新

目前支持开发方式

个人理解作图
在这里插入图片描述

开发测试大致流程

个人理解作图
在这里插入图片描述

使用

mysql开启binlog

vi /etc/my.cnf 底部追加

server_id=2
log_bin=mysql-bin
binlog_format=ROW
# 下面这行可写可不写  监控对应的数据库
binlog_do_db=elebap_bak

重启mysqld服务, 并启动mysql

systemctl restart mysqld
或者
bin/mysqld --initialize --user=root --basedir=/usr/local/mysql --datadir=/data/mysql
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      154 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+

mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+
6 rows in set (0.01 sec)

log_bin显示ON开启状态。
mysql的建表以及插入数据:

CREATE TABLE study(
	ID INT NOT NULL PRIMARY KEY AUTO_INCREMENT ,
	NAME VARCHAR(20) NOT NULL,
	AGE INT(10)
);

INSERT INTO study VALUES(1 , 'a' , 10);
INSERT INTO study VALUES(2 , 'b' , 11);
INSERT INTO study VALUES(3 , 'c' , 12);
INSERT INTO study VALUES(4 , 'd' , 13);
INSERT INTO study VALUES(5 , 'e' , 14);
INSERT INTO study VALUES(6 , 'f' , 15);

代码

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>


import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * @author wyi
 * @date 2022/8/18 11:06
 * @description
 */
public class flinkcdcTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties pops = new Properties();
        pops.setProperty("debezium.snapshot.locking.mode", "none");

        DebeziumSourceFunction<JSONObject> mysqlSource = MySQLSource.<JSONObject>builder()
                .hostname("192.168.80.161")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList(BussinessConstant.DATABASE_LIST)
                .tableList(BussinessConstant.ABLE_LIST_ALARM_CONFIG_CAP_UNBALANCE)
                .deserializer(new TestRuleDeserialization())
                .build();
        SingleOutputStreamOperator<Object> map = env.addSource(mysqlSource).map(new MapFunction<JSONObject, Object>() {
            @Override
            public Object map(JSONObject jsonObject) throws Exception {
                return jsonObject.toString();
            }
        });
        map.print();
        env.execute("CdcMysqlSource");
    }
}

自定义的序列化类

package com.cosmosource.da.cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

/**
 * @author wyi
 * @date 2022/8/18 10:32
 * @description 这是一个demo,测试flink-cdc连接mysql的反序列化类
 */
public class TestRuleDeserialization implements DebeziumDeserializationSchema<JSONObject> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) throws Exception {
        //获取主题
        String topic = sourceRecord.topic();
        String[] arr = topic.split("\\.");
        String db = arr[1];
        String tableName = arr[2];

        System.out.println(arr[1]);
        System.out.println(arr[2]);

        //获取操作类型 READ DELETE UPDATE CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        //获取值信息并转换为Struct类型
        Struct value = (Struct) sourceRecord.value();

        System.out.println("value:"+value);
        //获取变化后的数据
        Struct after = value.getStruct("after");

        //创建JSON对象用于存储数据信息
        JSONObject data = new JSONObject();
        for (Field field : after.schema().fields()) {
            Object o = after.get(field);
            data.put(field.name(), o);
        }

        //创建JSON对象用于封装最终返回值数据信息
        JSONObject result = new JSONObject();
        result.put("operation", operation.toString().toLowerCase());
        result.put("data", data);
        result.put("database", db);
        result.put("table", tableName);

        //发送数据至下游
        collector.collect(result);


    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {

        return TypeInformation.of(JSONObject.class);
    }
}

结果:

……………………………………
……………………………………
……………………………………

wy
study
value:Struct{after=Struct{ID=9,NAME=1,AGE=15},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=wy,table=study,server_id=0,file=mysql-bin.000001,pos=3128,row=0},op=c,ts_ms=1660793058775}
八月 18, 2022 11:24:19 上午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.80.161:3306 at mysql-bin.000001/3128 (sid:5501, cid:24)
5> {"database":"wy","data":{"ID":4,"NAME":"d","AGE":13},"operation":"create","table":"study"}
1> {"database":"wy","data":{"ID":8,"NAME":"1","AGE":15},"operation":"create","table":"study"}
8> {"database":"wy","data":{"ID":7,"NAME":"f","AGE":15},"operation":"create","table":"study"}
4> {"database":"wy","data":{"ID":3,"NAME":"c","AGE":12},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":1,"NAME":"a","AGE":10},"operation":"create","table":"study"}
2> {"database":"wy","data":{"ID":9,"NAME":"1","AGE":15},"operation":"create","table":"study"}
3> {"database":"wy","data":{"ID":2,"NAME":"b","AGE":11},"operation":"create","table":"study"}
6> {"database":"wy","data":{"ID":5,"NAME":"e","AGE":14},"operation":"create","table":"study"}
7> {"database":"wy","data":{"ID":6,"NAME":"f","AGE":15},"operation":"create","table":"study"}