canal安装部署,tcp模式下的数据消费

发布于:2022-12-11 ⋅ 阅读:(759) ⋅ 点赞:(0)

第一步:开启mysql的binlog日志

1、修改mysql配置文件my.cnf
        //数据库在集群中的唯一 id
        server-id=1
        //binlog日志存放的地址 默认在数据库datadir目录下,mysql-bin是binlog文件的前缀名
        log-bin=mysql-bin
        //binlog日志格式 binlog_format= statement|mixed|row。三种格式的区别
        binlog-format=row
说明
1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空
    间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志
    进行恢复,由于执行时间不同可能产生的数据就不同。
    优点:节省空间。
    缺点:有可能造成数据不一致。

2)row:行级, binlog 会记录每次操作后每行记录的变化。
    优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录
    执行后的效果。
    缺点:占用较大空间。
3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement
    模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含
    AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照
    ROW 的方式进行处理
   //开启binlog的数据库名称,不写默认所有库,databases_name 写自己的数据库名称
   binlog-do-db=database_name

第二步:创建canal同步数据的用户

1 创建用户

  账号:canal  密码: canal

create user 'canal'@'%' identified by 'canal';

2 给用户授权
 5.x数据库执行

set global validate_password_length=4;
set global validate_password_policy=0;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO  'canal'@'%' IDENTIFIED BY 'canal';
flush privileges;


8.x版本版本数据库执行

 set global validate_password.length=4;
 set global validate_password.policy=0;
 grant all privileges on *.* to 'canal'@'%' with grant option;
 flush privileges;

使用命令查看是否打开binlog模式:

SHOW VARIABLES like 'log_bin%';

 

log_bin = ON说明日志已经开启
查看binlog日志文件列表:
show binary logs;

 

查看当前正在写入的binlog文件:
show master status;

 第三步:配置canal server 服务端

canal的配置可以参考github canal 的文档
这里采用canal 1.1.5的版本 

canal.properties配置文件我们采用默认配置

修改conf目录下的canal.properties 配参数canal.serverMode = tcp  默认就是tcp
修改conf/example目录下的instance.properties文件中的数据库连接信息
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

//这里canal伪装成mysql slave服务,所以在集群里服务的id不能有重复的,这里slaveId和master中的id是不能重复的
canal.instance.mysql.slaveId=20

//这里配置需要拉取的表的binlog日志,可以采用正则表达式我们正比例采用数据库名称加表明多个表用,隔开
canal.instance.filter.regex=database_name.table_name

在bin目录下启动canal

第四步:消费canal数据

Java客户端消费canal数据的代码gihub 上有可以参考了。 地址

 maven依赖

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.5</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>1.1.6</version>
</dependency>

Java代码如下:

package com.wanqiao.service.impl;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.wanqiao.service.ReceiveDataService;
import com.wanqiao.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * canal消费客户端发送过来的数据
 */
@Slf4j
@Component
public class CannalConsume implements InitializingBean {

    @Autowired
    private ReceiveDataService receiveDataService;

    @Value("${canal.tablename}")
    private String tableName;

    @Value("${canal.ip}")
    private String canalIp;

    @Value("${canal.port}")
    private Integer canalPort;

    //每次获取十条数据
    @Value("${canal.batchsize}")
    private Integer batchSize = 10;

    private final static Long no_data_sleep_time = 5 * 1000l;

    @Override
    public void afterPropertiesSet() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), "example", "", "");
        try {
            //打开连接
            connector.connect();
            log.info("------------ canal服务连接成功!---------------------");
            //订阅数据库表,全部表
            connector.subscribe(tableName);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                
                Message message = connector.getWithoutAck(batchSize);
                //获取批量ID 此id非数据id
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                      
                        Thread.sleep(no_data_sleep_time);
                    } catch (InterruptedException e) {
                        log.error("没有数据 线程进入休眠状态 报错 e= {}", e);
                    }
                } else {
                    //如果有数据,处理数据
                    analyticalBinlogData(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            log.error("canal获取数据报错 e = {}", e);
        } finally {
            if(connector != null) {
                connector.disconnect();
            }
        }
    }

    /**
     * 解析binlog数据
     */
    private  void analyticalBinlogData(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                log.error("解析binlog数据报错 e = {}, data = {}", e, entry.toString());
            }
            //获取操作类型:insert/update/delete类型
            if(rowChage == null) {
                continue;
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            //打印Header信息
            log.info("答应binlog日志文件铭传给你 name = {}, 数据偏移量 offset = {}, " +
                    "表名称 tableName = {}, 表操作类型 eventType = {}",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName(),
                    eventType);
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                continue;
            }
            if(eventType != CanalEntry.EventType.INSERT) {
                continue;
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                saveDateRedis(rowData.getAfterColumnsList());
            }
        }
    }

    private  void saveDateRedis(List<CanalEntry.Column> columns) {
        StringBuffer sb = new StringBuffer(" | ");
        for (CanalEntry.Column column : columns) {
            if("content".equals(column.getName()) && StringUtils.isNoneEmpty(column.getValue())) {
                try{
                    //处理数据逻辑在这里
                } catch (Exception e) {
                    log.error("推送数据失败 e = {}", e);
                }

            } else {
                sb.append(column.getName()).append("=").append(column.getValue()).append(" ;");
            }
        }
        log.info("从binlog读取到数据data = {}", sb.toString());
    }
}