数据库操作的准备 1、开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下: vi /etc/my.cnf [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 重启 MySQL [root@localhost etc]# systemctl restart mysql 查看下binlog是否开启,如果值为on 代表开启。 SHOW VARIABLES LIKE 'log_bin'; 查看binlog的格式 show variables like '%binlog_format%'; https://dev.mysql.com/doc/refman/8.4/en/ https://dev.mysql.com/doc/refman/8.4/en/show-master-status.html 8.4中已经没有SHOW MASTER STATUS;这条命令,需要用SHOW BINARY LOG STATUS SHOW MASTER STATUS; 是一条 MySQL 查询命令,用于获取当前 MySQL 服务器作为主服务器时的二进制日志状态。这条命令对于设置从服务器的复制非常有用,因为它提供了从服务器需要的信息,以便知道从哪里开始复制数据。 SHOW BINARY LOG STATUS binlog.000034 158 2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES; MySQL的slave需要的权限说明如下: REPLICATION SLAVE权限,这是Slave服务器必须拥有的最重要的权限。它允许Slave服务器连接到Master服务器并获取复制数据。如果Slave服务器没有REPLICATION SLAVE权限,它将无法连接到Master服务器并执行复制操作。 REPLICATION CLIENT权限,这个权限允许Slave服务器向Master服务器发送查询语句以获取复制数据。如果Slave服务器没有REPLICATION CLIENT权限,它将无法向Master服务器发送查询语句,从而无法获取复制数据。 SELECT权限,Slave服务器需要SELECT权限来读取Master服务器上的数据。如果Slave服务器没有SELECT权限,它将无法获取Master服务器上的数据,从而无法保持与Master服务器的同步。 查看权限 SHOW grants for 'canal' 显示:GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%` 1、下载用1.1.7这个版本比较稳定 1.1 https://github.com/alibaba/canal/releases/tag/canal-1.1.7 1.2 解压缩和修改配置 mkdir canal mkdir canal-admin tar -zxvf canal.deployer-1.1.7.tar.gz -C canal/ tar -zxvf canal.admin-1.1.7.tar.gz -C canal-admin/ 修改conf下的canal.properties,主要改绑定的IP,注意:最好改成IP,别用127.0.0.1,因为要是程序和代码没在一台机器的话连不上。 [root@localhost conf]# vi canal.properties # tcp bind ip canal.ip = 192.168.150.50 修改/conf/example下的 instance.properties canal.instance.master.address=192.168.3.70:3306 # username/password这个是要同步的数据库的账号和密码这个里用的canal,前提是得提前在数据库里创建canal的账号和授权。 canal.instance.dbUsername=canal canal.instance.dbPassword=canal 1.3启动 /usr/local/software/canal/conf /usr/local/software/canal/bin 看日志 /usr/local/software/canal/logs/canal /usr/local/software/canal/bin ./startup.sh ./stop.sh
使用注意事项: 1)修改canal.admin-1.1.7\conf和/canal/conf下的application.yml 启动的驱动driver-class-name: com.mysql.cj.jdbc.Driver 2) java21去掉admin和canal bin目录下startup.sh里的java8的参数。删除掉:-XX:+AggressiveOpts -XX:-UseBiasedLocking 3)启动提示druid找不到 将druid的jar包放在lib目录就可以了。druid-1.2.22.jar测试通过 https://repo1.maven.org/maven2/com/alibaba/druid/1.2.22/ 2、加入依赖,Canal TCP模式需要的依赖 <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.7</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.7</version> </dependency> 3、canal 1.1.7 只能同步mysql8.0下的版本,mysql8.4版本的收不到消息。版本的观念要建立起来,或者我哪弄的不对。 4、RocketMQ 监听 canal的变化。 1)启动 RocketMQ 2)修改canal.properties # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode = rocketMQ 3)写消费的监听者 参考RocketMQTool类 import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.demo.demo.entity.CanalBinlog; import org.apache.rocketmq.shaded.com.google.gson.Gson; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * canal.properties 设置生产者组rocketmq.producer.group = test 用的默认组 * example/instance.properties 修改主题 canal.mq.topic=rocketmq_toppic_01 */ @Component @RocketMQMessageListener(topic = "example", consumerGroup = "test", selectorExpression="*") public class RocketMQTool implements RocketMQListener<String> { private Gson gson = new Gson(); @Override public void onMessage(String mqJson){ CanalBinlog canalBinlog = gson.fromJson(mqJson, CanalBinlog.class); System.out.println(mqJson); System.out.println(canalBinlog); CanalBinlog canalBinlog2 = JSON.parseObject(mqJson, CanalBinlog.class); System.out.println("fastJson: "+JSON.toJSONString(canalBinlog2)); } }
import java.util.List; import java.util.Map; @Data public class CanalBinlog { // 数据 private List<Map<String,Object>> data; // 数据库名称 private String database; private long es; // 递增,从1开始 private int id; // 是否是DDL语句 private boolean isDdl; // 表结构的字段类型 private Map<String,Object> mysqlType; private List<Map<String,Object>> old; // 主键名称 private List<String> pkNames; // sql语句 private String sql; private Map<String,Object> sqlType; // 表名 private String table; private long ts; // INSERT、UPDATE、DELETE、ERASE(删除表) private String type; }