(高阶)Redis 7 第13讲 数据双写一致性 canal篇

发布于:2023-09-22 ⋅ 阅读:(105) ⋅ 点赞:(0)

 面试题

问题:如何保证 MySQL 改动后,立即同步到 Redis?
答案:canal

 

简介

https://github.com/alibaba/canal/wiki

 基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 

 业务

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

下载

官网 https://github.com/alibaba/canal/releases/tag/canal-1.1.6
百度网盘 链接:https://pan.baidu.com/s/1Hs7JieAZA_q4lmvIdJZgFw?pwd=aqi2 
提取码:aqi2 

 Mysql 主从复制原理

Canal原理

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3. canal 解析 binary log 对象(原始为 byte 流)

 案例

mysql 环境 

# Show the MySQL server version
SELECT VERSION();

# Show the current binary log file and position of this server
SHOW MASTER status;

# Check whether the binlog is enabled (log_bin should be ON)
SHOW VARIABLES LIKE 'log_bin';

 

 my.ini 配置

# Add the following settings to the [mysqld] section
[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # use the ROW binlog format
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId

 

 

 重启Mysql

 

创建canal用户并授权

# Recreate the dedicated account that canal uses to connect to MySQL.
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

# Least privilege: canal only reads data and acts as a replication client,
# so grant exactly these privileges instead of ALL PRIVILEGES.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

# Verify that the account exists.
SELECT * FROM mysql.`user`;

canal安装配置

上传安装包

解压安装包

tar -zxvf canal.deployer-1.1.6.tar.gz

 配置文件地址

修改配置

启动canal

 

启动成功

 如果出现如下错误

Caused by: java.io.IOException: caching_sha2_password Auth failed
com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)

 

# List the authentication plugin configured for each account.
select host,user,plugin from mysql.user ;
# Switch the canal account to the mysql_native_password plugin (password stays 'canal'),
# in response to the "caching_sha2_password Auth failed" error shown above.
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

创建测试表

# Test table for the demo: an auto-increment integer primary key plus a required user name.
CREATE TABLE `t_user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `userName` varchar(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

 创建Redis工具类

package com.mco.utils;

import cn.hutool.core.util.RandomUtil;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.args.GeoUnit;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Holder for one shared {@link JedisPool} used by the demo code.
 *
 * <p>The pool is built once, when this class is initialized, against
 * {@link #REDIS_IP_ADDR} on port 6379 with password {@link #REDIS_PWD} and a
 * 10000 ms connection/socket timeout.
 *
 * @author :liao.wei
 * @date :2023/9/18 21:15
 * @package : com.mco.utils
 */
public class RedisUtils {
	// Bound to this class; the original referenced JedisPoolUtil, which is not defined here.
	private static final Logger logger = LoggerFactory.getLogger(RedisUtils.class);
	
	/** Host of the Redis server the pool connects to. */
	public static final String REDIS_IP_ADDR = "120.77.64.190";
	/** Password used to authenticate against Redis. */
	public static final String REDIS_PWD = "111111";
	
	/** Shared connection pool, created in the static initializer below. */
	public static JedisPool jedisPool;
	
	static {
		JedisPoolConfig poolConfig = new JedisPoolConfig();
		poolConfig.setMaxIdle(8);
		poolConfig.setMinIdle(2);
		// Wait at most 30000 ms for a free connection. The original passed
		// Duration.ofSeconds(30000), i.e. more than eight hours.
		poolConfig.setMaxWait(Duration.ofMillis(30000));
		jedisPool = new JedisPool(poolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_PWD);
	}
	
	/**
	 * Borrows a connection from the shared pool.
	 *
	 * <p>The returned {@link Jedis} should be closed by the caller (for example with
	 * try-with-resources) so that it goes back to the pool.
	 *
	 * @return a pooled Jedis connection
	 * @throws IllegalStateException if the pool has not been created
	 * @throws Exception declared for backward compatibility with existing callers
	 */
	public static Jedis getJedis() throws Exception {
		if (null != jedisPool) {
			return jedisPool.getResource();
		}
		throw new IllegalStateException("Jedispool is not ok");
	}
}

Canal 业务类

/**
 * Demo canal client that mirrors row changes of a MySQL table into Redis.
 *
 * <p>{@link #main(String[])} connects to the canal server at {@link #CANAL_IP_ADDR}:11111
 * (destination "example"), subscribes to {@code test.t_user} and polls for batches of binlog
 * entries. Every changed row is stored in (or removed from) Redis under a key equal to the
 * value of the row's first column; the stored value is the whole row as a JSON object.
 */
public class RedisCanalClient {
	public static final Integer _60SECONDS = 60;
	public static final String  CANAL_IP_ADDR = "192.168.1.11";
	
	/**
	 * Serializes a row to JSON and stores it in Redis, keyed by the first column's value.
	 *
	 * <p>Shared by insert and update handling, which differ only in whether the stored
	 * value is read back and printed. A Redis failure is reported on stderr and otherwise
	 * ignored (best effort). NOTE(review): {@code main} still acks the batch afterwards, so
	 * a failed write is not retried.
	 *
	 * @param columns         columns of the row after the change; nothing happens when empty
	 * @param echoStoredValue whether to read the key back and print it after writing
	 */
	private static void redisUpsert(List<Column> columns, boolean echoStoredValue)
	{
		if (columns.isEmpty())
		{
			return;
		}
		JSONObject jsonObject = new JSONObject();
		for (Column column : columns)
		{
			System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
			jsonObject.put(column.getName(),column.getValue());
		}
		// presumably the first column is the primary key (`id` in t_user) — verify for other tables
		String key = columns.get(0).getValue();
		try(Jedis jedis = RedisUtils.getJedis())
		{
			jedis.set(key,jsonObject.toJSONString());
			if (echoStoredValue)
			{
				System.out.println("---------update after: "+jedis.get(key));
			}
		}catch (Exception e){
			System.err.println("Redis write failed for key " + key);
			e.printStackTrace();
		}
	}
	
	/**
	 * Stores a newly inserted row in Redis.
	 *
	 * @param columns columns of the inserted row
	 */
	private static void redisInsert(List<Column> columns)
	{
		redisUpsert(columns, false);
	}
	
	
	/**
	 * Removes the Redis key that belongs to a deleted row.
	 *
	 * @param columns columns of the row before deletion; nothing happens when empty
	 */
	private static void redisDelete(List<Column> columns)
	{
		if (columns.isEmpty())
		{
			return;
		}
		String key = columns.get(0).getValue();
		try(Jedis jedis = RedisUtils.getJedis())
		{
			jedis.del(key);
		}catch (Exception e){
			System.err.println("Redis delete failed for key " + key);
			e.printStackTrace();
		}
	}
	
	/**
	 * Overwrites the Redis value of an updated row and prints the value read back.
	 *
	 * @param columns columns of the row after the update
	 */
	private static void redisUpdate(List<Column> columns)
	{
		redisUpsert(columns, true);
	}
	
	/**
	 * Applies a batch of canal entries to Redis.
	 *
	 * <p>Transaction begin/end markers are skipped. For every other entry the row change is
	 * parsed, a summary line is printed, and each row is dispatched by event type:
	 * INSERT and UPDATE write the after-image, DELETE removes the key of the before-image.
	 * Rows of any other event type are ignored.
	 *
	 * @param entrys entries of one canal message
	 * @throws RuntimeException if an entry's store value cannot be parsed as a row change
	 */
	public static void printEntry(List<Entry> entrys)
	{
		for (Entry entry : entrys) {
			if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
				continue;
			}
			
			RowChange rowChange;
			try {
				// Decode the changed rows carried by this entry.
				rowChange = RowChange.parseFrom(entry.getStoreValue());
			} catch (Exception e) {
				throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
			}
			EventType eventType = rowChange.getEventType();
			// The arrow was an HTML-escaped "&gt;" in the original string; emit a real '>'.
			System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
					entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
					entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
			
			for (RowData rowData : rowChange.getRowDatasList()) {
				if (eventType == EventType.INSERT) {
					redisInsert(rowData.getAfterColumnsList());
				} else if (eventType == EventType.DELETE) {
					redisDelete(rowData.getBeforeColumnsList());
				} else if (eventType == EventType.UPDATE) {
					redisUpdate(rowData.getAfterColumnsList());
				}
				// Other event types are deliberately not mirrored as updates.
			}
		}
	}
	
	
	/**
	 * Connects to canal and polls for changes until 600 consecutive polls return nothing.
	 *
	 * <p>Each empty poll is followed by a one-second sleep; any non-empty batch resets the
	 * idle counter. Every fetched batch id is acknowledged. The connector is always
	 * disconnected on exit. If the thread is interrupted while sleeping, the interrupt flag
	 * is restored and polling stops.
	 *
	 * @param args unused
	 */
	public static void main(String[] args)
	{
		System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
		
		//=================================
		// Create the connection to the canal server.
		// Original author's note: a user name/password given here overrides the account in the
		// canal configuration file; when left empty they are read from that file.
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_IP_ADDR,
				11111), "example", "", "");
		int batchSize = 1000;
		// Number of consecutive polls that returned no data.
		int emptyCount = 0;
		System.out.println("---------------------canal init OK,开始监听mysql变化------");
		try {
			connector.connect();
			//connector.subscribe(".*\\..*");
			connector.subscribe("test.t_user");   // table to listen to
			connector.rollback();
			int totalEmptyCount = 10 * _60SECONDS;
			while (emptyCount < totalEmptyCount) {
				System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
				Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
				long batchId = message.getId();
				int size = message.getEntries().size();
				if (batchId == -1 || size == 0) {
					emptyCount++;
					try {
						TimeUnit.SECONDS.sleep(1);
					} catch (InterruptedException e) {
						// Restore the flag and stop polling; the finally block disconnects.
						Thread.currentThread().interrupt();
						return;
					}
				} else {
					// Data arrived: reset the idle counter.
					emptyCount = 0;
					printEntry(message.getEntries());
				}
				connector.ack(batchId); // confirm the batch
				// connector.rollback(batchId); // on processing failure, roll the batch back
			}
			System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
		} finally {
			connector.disconnect();
		}
	}
}

 说明:

        

CANAL_IP_ADDR:canal 服务部署ip

InetSocketAddress: 端口可从canal.log 中查看

启动main方法

在数据库中新增一条数据

 查看Canal客户端监听

 查看Redis数据

POM引入

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.18</lombok.version>
        <mysql.version>5.1.47</mysql.version>
        <druid.version>1.1.16</druid.version>
        <mapper.version>4.1.5</mapper.version>
        <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>4.3.1</version>
        </dependency>
        <!--canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!--SpringBoot通用依赖模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--SpringBoot与Redis整合依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- commons-pool2 is declared once, further down, with an explicit version (2.11.1);
             the duplicate version-less declaration that stood here was removed. -->
        <!--SpringBoot与AOP-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <!--Mysql数据库驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!--SpringBoot集成druid连接池-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <!--mybatis和springboot整合-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.spring.boot.version}</version>
        </dependency>
        <!--hutool-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>
        <!--persistence-->
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>persistence-api</artifactId>
            <version>1.0.2</version>
        </dependency>
        <!--通用Mapper-->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.1</version>
        </dependency>
    </dependencies>

connector.subscribe过滤规则

源码地址 https://gitee.com/UniQue006/redis_example.git 

🌹 以上分享 Redis 数据一致性 canal应用,如有问题请指教。
 
🌹🌹 如你对技术也感兴趣,欢迎交流。
 
🌹🌹🌹  如有需要,请👍点赞💖收藏🐱‍🏍分享