为什么需要监控数据库变化?
当 Flowable 表中的数据发生变化(例如插入新任务、更新状态或删除记录),我们可能需要触发其他操作,比如通知用户、更新仪表盘或启动新流程。Maxwell 可以读取 MySQL 的二进制日志(binlog),将变化事件以 JSON 格式发送到 RabbitMQ 消息队列,供其他系统消费。
准备工作
你需要以下组件:
MySQL:运行 Flowable 的数据库,需启用二进制日志(binlog)。
RabbitMQ:消息队列,用于接收 Maxwell 发送的事件。
Maxwell:读取 MySQL binlog 并发送事件到 RabbitMQ。
Flowable:已部署,数据库中有需要监控的表(例如 flowable.ACT_RU_TASK)。
步骤 1:配置 MySQL
确保 MySQL 启用了二进制日志。以下是一个简单的 Docker Compose 配置:
mysql:
image: mysql:8.0
container_name: mysql
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
command:
- "--log_bin=mysql-bin"
- "--server_id=1"
- "--binlog_format=ROW"
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: flowable
MYSQL_USER: maxwell
MYSQL_PASSWORD: maxwell_password
log_bin=mysql-bin:启用二进制日志。
server_id=1:设置唯一服务器 ID。
binlog_format=ROW:使用 ROW 格式,适合 Maxwell 解析。
为 Maxwell 创建用户并授予权限:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell_password';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
步骤 2:启动 RabbitMQ
使用 Docker 运行 RabbitMQ:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
访问 http://localhost:15672(默认用户/密码:guest/guest)检查 RabbitMQ 是否正常运行。
步骤 3:配置 Maxwell
Maxwell 读取 MySQL 的 binlog 并发送事件到 RabbitMQ。创建一个配置文件 maxwell.properties:
log_level=INFO
host=mysql
port=3306
user=maxwell
password=maxwell_password
producer=rabbitmq
rabbitmq_host=rabbitmq
rabbitmq_port=5672
rabbitmq_user=guest
rabbitmq_pass=guest
rabbitmq_exchange=maxwell
rabbitmq_exchange_type=topic
rabbitmq_routing_key_template=%database%.%table%
filter=exclude: *.*, include: flowable.*
replica_server_id=64
filter:只监控 flowable 数据库的表。
rabbitmq_exchange:事件发送到 maxwell 交换机。
运行 Maxwell:
docker run -d --name maxwell \
-v $(pwd)/maxwell.properties:/config.properties \
--network=host \
zendesk/maxwell:latest \
bin/maxwell --config=/config.properties
注意:确保 Maxwell、MySQL 和 RabbitMQ 在同一网络(例如使用 --network=host 或自定义 Docker 网络)。
步骤 4:测试监控
在 Flowable 数据库中插入一条任务记录:
INSERT INTO flowable.ACT_RU_TASK (ID_, NAME_, PRIORITY_, CREATE_TIME_) VALUES ('task123', 'Test Task', 50, NOW());
Maxwell 将捕获变更并发送 JSON 事件到 RabbitMQ,例如:
{ "database": "flowable", "table": "ACT_RU_TASK", "type": "insert", "data": { "id_": "task123", "name_": "Test Task", "priority_": 50, "create_time_": "2025-08-06 17:03:00" } }
在 RabbitMQ 管理界面(http://localhost:15672):
创建一个队列(例如 flowable_queue)。
绑定到 maxwell 交换机,路由键为 flowable.*。
检查队列中的消息。
步骤 5:消费事件
开发一个简单的消费者来处理 RabbitMQ 的事件。以下是一个 Python 示例:
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='flowable_queue')
channel.queue_bind(exchange='maxwell', queue='flowable_queue', routing_key='flowable.*')
def callback(ch, method, properties, body):
event = json.loads(body)
print(f"Received event: {event}")
if event['table'] == 'ACT_RU_TASK' and event['type'] == 'insert':
print(f"New task created: ID={event['data']['id_']}, Name={event['data']['name_']}")
channel.basic_consume(queue='flowable_queue', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
保存为 consumer.py 并运行:
pip install pika
python consumer.py
当插入新任务时,消费者会打印类似以下输出:
Received event: {'database': 'flowable', 'table': 'ACT_RU_TASK', 'type': 'insert', ...}
New task created: ID=task123, Name=Test Task
总结
通过 Maxwell 和 RabbitMQ,我们可以轻松监控 Flowable 表的变更,并将事件发送到消息队列供其他系统使用。这个方案简单高效,适合实时数据处理场景。下一步,你可以:
优化 Maxwell 过滤规则,只监控特定表(如 ACT_RU_TASK)。
将事件集成到 Flowable 流程,触发自动化任务。
使用 Prometheus 监控 RabbitMQ 队列性能。