第一步:开启mysql的binlog日志
1、修改mysql配置文件my.cnf
//数据库在集群中的唯一 id
server-id=1
//binlog日志存放的地址 默认在数据库datadir目录下,mysql-bin是binlog文件的前缀名
log-bin=mysql-bin
//binlog日志格式 binlog_format= statement|mixed|row。三种格式的区别
binlog-format=row
说明
1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空
间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志
进行恢复,由于执行时间不同可能产生的数据就不同。
优点:节省空间。
缺点:有可能造成数据不一致。
2)row:行级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录
执行后的效果。
缺点:占用较大空间。
3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement
模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含
AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照
ROW 的方式进行处理
//开启binlog的数据库名称,不写默认所有库,databases_name 写自己的数据库名称
binlog-do-db=database_name
第二步:创建canal同步数据的用户
1 创建用户
账号:canal 密码: canal
create user 'canal'@'%' identified by 'canal';
2 给用户授权
5.x数据库执行
set global validate_password_length=4;
set global validate_password_policy=0;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
flush privileges;
8.x版本版本数据库执行
set global validate_password.length=4;
set global validate_password.policy=0;
grant all privileges on *.* to 'canal'@'%' with grant option;
flush privileges;
使用命令查看是否打开binlog模式:
SHOW VARIABLES like 'log_bin%';
log_bin = ON说明日志已经开启
查看binlog日志文件列表:
show binary logs;
查看当前正在写入的binlog文件:
show master status;
第三步:配置canal server 服务端
canal的配置可以参考github canal 的文档
这里采用canal 1.1.5的版本
canal.properties配置文件我们采用默认配置
修改conf目录下的canal.properties 配参数canal.serverMode = tcp 默认就是tcp
修改conf/example目录下的instance.properties文件中的数据库连接信息
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
//这里canal伪装成mysql slave服务,所以在集群里服务的id不能有重复的,这里slaveId和master中的id是不能重复的
canal.instance.mysql.slaveId=20
//这里配置需要拉取的表的binlog日志,可以采用正则表达式我们正比例采用数据库名称加表明多个表用,隔开
canal.instance.filter.regex=database_name.table_name
在bin目录下启动canal
第四步:消费canal数据
Java客户端消费canal数据的代码gihub 上有可以参考了。 地址
maven依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.6</version> </dependency>
Java代码如下:
package com.wanqiao.service.impl;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.wanqiao.service.ReceiveDataService;
import com.wanqiao.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;
/**
* canal消费客户端发送过来的数据
*/
@Slf4j
@Component
public class CannalConsume implements InitializingBean {
@Autowired
private ReceiveDataService receiveDataService;
@Value("${canal.tablename}")
private String tableName;
@Value("${canal.ip}")
private String canalIp;
@Value("${canal.port}")
private Integer canalPort;
//每次获取十条数据
@Value("${canal.batchsize}")
private Integer batchSize = 10;
private final static Long no_data_sleep_time = 5 * 1000l;
@Override
public void afterPropertiesSet() {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(canalIp, canalPort), "example", "", "");
try {
//打开连接
connector.connect();
log.info("------------ canal服务连接成功!---------------------");
//订阅数据库表,全部表
connector.subscribe(tableName);
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize);
//获取批量ID 此id非数据id
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
Thread.sleep(no_data_sleep_time);
} catch (InterruptedException e) {
log.error("没有数据 线程进入休眠状态 报错 e= {}", e);
}
} else {
//如果有数据,处理数据
analyticalBinlogData(message.getEntries());
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
log.error("canal获取数据报错 e = {}", e);
} finally {
if(connector != null) {
connector.disconnect();
}
}
}
/**
* 解析binlog数据
*/
private void analyticalBinlogData(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
log.error("解析binlog数据报错 e = {}, data = {}", e, entry.toString());
}
//获取操作类型:insert/update/delete类型
if(rowChage == null) {
continue;
}
CanalEntry.EventType eventType = rowChage.getEventType();
//打印Header信息
log.info("答应binlog日志文件铭传给你 name = {}, 数据偏移量 offset = {}, " +
"表名称 tableName = {}, 表操作类型 eventType = {}",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName(),
eventType);
//判断是否是DDL语句
if (rowChage.getIsDdl()) {
continue;
}
if(eventType != CanalEntry.EventType.INSERT) {
continue;
}
//获取RowChange对象里的每一行数据,打印出来
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
saveDateRedis(rowData.getAfterColumnsList());
}
}
}
private void saveDateRedis(List<CanalEntry.Column> columns) {
StringBuffer sb = new StringBuffer(" | ");
for (CanalEntry.Column column : columns) {
if("content".equals(column.getName()) && StringUtils.isNoneEmpty(column.getValue())) {
try{
//处理数据逻辑在这里
} catch (Exception e) {
log.error("推送数据失败 e = {}", e);
}
} else {
sb.append(column.getName()).append("=").append(column.getValue()).append(" ;");
}
}
log.info("从binlog读取到数据data = {}", sb.toString());
}
}