php使用Canal监听msyql

发布于:2024-05-06 ⋅ 阅读:(24) ⋅ 点赞:(0)
  • canal需要java8
    去官网下载java8
    在这里插入图片描述
安装JAVA
#创建目录
mkdir -p /usr/local/java/
#解压到目录
tar zxvf jdk-8u411-linux-x64.tar.gz -C /usr/local/java/

配置环境变量在 /etc/profile 最后加入

export JAVA_HOME=/usr/local/java/jdk1.8.0_411
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export  PATH=$PATH:$JAVA_HOME/bin

使之生效

source /etc/profile

查看是否安装成功

java -version

在这里插入图片描述

设置mysql用户权限
#创建用户名和密码都为 canal 的用户
create user 'canal'@'%' identified by 'canal';
#授予该用户对所有数据库和表的查询、复制主节点数据的操作权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES; #重新加载权限
修改mysql配置
 vim /etc/my.cnf 

修改部分内容如下

 # 开启 binlog
log-bin=mysql-bin
#master端的ID号,不能和 canal 的 slaveId 重复;
server-id=1
#行级,记录每次操作后每行记录的变化。
binlog-format=row
#指定库,缩小监控的范围。
binlog-do-db=test 

查看是否开启主从

show master status;

在这里插入图片描述

安装canal

下载最新canal
在这里插入图片描述

下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
#创建canal目录
mkdir -p /usr/local/canal/
#解压到canal目录
tar -zxvf canal.deployer-1.1.7.tar.gz  -C /usr/local/canal/

查看canal主配置文件

cat /usr/local/canal/conf/canal.properties

查看会看到暴露了三个端口和指定了一个实例

canal.admin.port = 11110
canal.port = 11111
canal.metrics.pull.port = 11112
#多个实例使用逗号分隔: canal.destinations = example1,example2,
#这里对应/usr/local/canal/conf/example/
canal.destinations = example

canal.destinations:canal能可以收集多个MySQL数据库数据,每个MySQL数据库都有独立的配置文件控制。具体配置规则: conf/目录下,使用文件夹放置,文件夹名代表一个MySQL实例。canal.destinations用于配置需要监控数据的数据库。如果是多个,使用,隔开。
修改实例配置文件

修改实例配置
vim /usr/local/canal/conf/example/instance.properties

在文件最后加入

#配置 slaveId ,不能等于 mysql 配置里的 server Id 即可
canal.instance.mysql.slaveId=10 
#数据库连接
canal.instance.master.address=127.0.0.1:3306 
#数据库账号密码
canal.instance.dbUsername=canal 
canal.instance.dbPassword=canal
#代表数据库的编码方式
canal.instance.connectionCharset = UTF-8
#设置白名单,这里是监控test库下所有表
canal.instance.filter.regex=test\\..*
#设置黑名单,这里设置排除test库下以_noc结尾的表
canal.instance.filter.black.regex=test\\..*_noc

  • 这个正则表达式 test\.*_noc 的含义是:
test:精确匹配字符串 "test"。
\\.:匹配一个点(.),因为点在正则表达式中有特殊含义,所以需要使用 \\ 进行转义。
*:匹配零个或多个任意字符。
_noc:精确匹配字符串 "_noc"。
因此,这个正则表达式可以用来匹配所有以 test. 开头且以 _noc 结尾的字符串。在 Canal 的配置中使用这个正则表达式,就可以实现排除以 test. 开头且以 _noc 结尾的数据库和表,而监控其他数据库和表。

比如我们要查询 test库下users表id为3的用户信息,sql语句可以这么写
SELECT * FROM test.users WHERE id = 3;
这里的test.users就是正则要匹配的地方

如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
启动和停止

#启动

 /usr/local/canal/bin/startup.sh

#停止

 /usr/local/canal/bin/stop.sh

查看是否启动成功

 ps -ef | grep canal

在这里插入图片描述

php测试

使用 canal-php

composer require xingwenge/canal_php

新建index.php文件

<?php
require __DIR__.'/vendor/autoload.php';
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;
try {
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
    $client->connect("127.0.0.1", 11111);
    $client->checkValid();
    //设置过滤tes库t下的所有表
    $client->subscribe("1001", "example", "test.*"); 
    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                Fmt::println($entry);
            }
        }
        sleep(1);
    }
    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}

运行php文件

php index.php

在这里插入图片描述

  • eventType:1、是新增行,2、修改行,3、删除行、4、新增表

也可以直接将数据写入rabbitmq
修改conf/canal.properties

vim conf/canal.properties

修改RabbitMQ配置如下

rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
#交换机名称
rabbitmq.exchange =exchange.canal
#队列名称
rabbitmq.queue = canal_queue
#路由键名
rabbitmq.routingKey = canal-routing-key
#账号密码
rabbitmq.username =admin
rabbitmq.password =admin
#路由模式,这里是严格模式
rabbitmq.deliveryMode =direct

修改实例配置conf/example/instance.properties

vim conf/example/instance.properties

修改部分内容如下

#rabbitmq的路由键配置
#这里与canal.properties里的rabbitmq.routingKey一样
canal.mq.topic=canal-routing-key
canal.mq.partition=0
canal.instance.multi.stream.on=false

#rabbitmq 的 routing key
#dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
#hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*