RabbitMQ实践学习笔记

发布于:2025-07-22 ⋅ 阅读:(14) ⋅ 点赞:(0)

RabbitMQ实践

以下是关于RabbitMQ实践的整理,涵盖常见场景和示例代码(基于Markdown格式)。内容按模块分类,避免步骤词汇,直接提供可操作的方法:

基础连接与队列声明

使用Python的pika库建立连接并声明队列:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')  # 声明持久化队列可添加参数 durable=True

消息发布示例:

channel.basic_publish(exchange='',
                     routing_key='hello',
                     body='Hello World!',
                     properties=pika.BasicProperties(delivery_mode=2))  # 消息持久化


工作队列模式

消费者端的公平分发(prefetch)设置:

channel.basic_qos(prefetch_count=1)  # 每次只处理一条消息
channel.basic_consume(queue='task_queue', on_message_callback=callback)

消息确认机制:

def callback(ch, method, properties, body):
    print("Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认


发布/订阅模式

声明扇形交换机:

channel.exchange_declare(exchange='logs', exchange_type='fanout')

临时队列绑定:

result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(exchange='logs', queue=result.method.queue)

路由与主题模式

直连交换机实现路由键过滤:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key='error')

主题交换机匹配模式:

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.critical')

RPC远程调用

客户端发送请求并监听回调队列:

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,
        correlation_id=corr_id,
    ),
    body=request_body)

服务端响应处理:

def on_request(ch, method, props, body):
    response = process_request(body)
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


消息TTL与死信队列

设置消息过期时间:

channel.basic_publish(
    exchange='',
    routing_key='queue_with_ttl',
    body='expiring message',
    properties=pika.BasicProperties(expiration='60000'))  # 60秒后过期

配置死信交换机:

args = {"x-dead-letter-exchange": "dlx_exchange"}
channel.queue_declare(queue='work_queue', arguments=args)


插件与扩展

延迟消息通过插件实现:

headers = {"x-delay": 5000}  # 延迟5秒
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='delayed_queue',
    body='delayed msg',
    properties=pika.BasicProperties(headers=headers))

集群与高可用

配置镜像队列策略:

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'  # 同步所有节点

客户端连接多个节点:

hosts = [{'host': 'node1'}, {'host': 'node2'}]
connection = pika.BlockingConnection(pika.ConnectionParameters(hosts))

监控与管理

通过API获取队列状态:

import requests
response = requests.get('http://localhost:15672/api/queues', auth=('guest', 'guest'))

以上示例覆盖了RabbitMQ的核心功能场景,包括消息模式、可靠性保证和扩展功能。实际应用时需根据业务需求调整参数和错误处理机制。更多细节可参考RabbitMQ官方文档和社区最佳实践。

RabbitMQ 源码分析(C++ 客户端)

RabbitMQ 是一个开源的消息代理软件,采用 Erlang 语言编写。C++ 客户端是 RabbitMQ 官方提供的客户端库之一,用于在 C++ 项目中与 RabbitMQ 服务器进行交互。以下是对 RabbitMQ C++ 客户端源码的分析框架和关键点。

源码结构与核心模块

RabbitMQ C++ 客户端的源码主要包含以下几个核心模块:

AMQP 协议实现
C++ 客户端基于 AMQP 0-9-1 协议实现,核心代码位于 amqp.hamqp.c 中。这部分负责协议的编解码、帧的构造与解析。

Socket 通信层
使用系统套接字(Socket)实现与 RabbitMQ 服务器的 TCP 通信。代码位于 amqp_socket.hamqp_socket.c,支持普通 Socket 和 SSL/TLS 加密通信。

连接管理
amqp_connection.h 中定义了连接的生命周期管理,包括连接建立、心跳检测、连接关闭等逻辑。

通道管理
通过 amqp_channel.h 实现多路复用机制,单个 TCP 连接可以支持多个逻辑通道(Channel),每个通道独立处理消息。

消息发布与消费
amqp_queue.hamqp_exchange.h 实现了队列和交换器的声明、绑定操作。消息的发布(basic_publish)和消费(basic_consume)逻辑在 amqp_basic.h 中定义。


关键流程分析

连接建立流程

  1. 调用 amqp_new_connection 创建连接对象。
  2. 通过 amqp_socket_open 建立 Socket 连接。
  3. 发送协议头并协商参数(amqp_login)。

消息发布流程

  1. 声明交换器(amqp_exchange_declare)。
  2. 构造消息属性(amqp_basic_properties_t)。
  3. 调用 amqp_basic_publish 发送消息帧。

消息消费流程

  1. 声明队列并绑定到交换器(amqp_queue_bind)。
  2. 调用 amqp_basic_consume 订阅队列。
  3. 通过 amqp_consume_message 循环接收消息。

核心数据结构

帧(Frame)结构
RabbitMQ 使用帧作为通信的基本单位,帧类型包括:

  • METHOD:传递 AMQP 方法(如 queue.declare)。
  • HEADER:消息属性。
  • BODY:消息内容。

连接状态机
通过 amqp_connection_state_t 管理连接状态,包括协议版本、通道列表、心跳超时等。


示例代码片段

以下是使用 C++ 客户端发布消息的简化代码:

#include <amqp.h>
#include <amqp_tcp_socket.h>

void publish_message() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t *socket = amqp_tcp_socket_new(conn);
    amqp_socket_open(socket, "localhost", 5672);

    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);

    amqp_basic_publish(conn, 1, amqp_cstring_bytes("exchange_name"),
                       amqp_cstring_bytes("routing_key"), 0, 0, NULL,
                       amqp_cstring_bytes("Hello, RabbitMQ!"));

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
}


性能优化与调试

连接复用
通过通道复用 TCP 连接,减少频繁建立连接的开销。

异步模式
使用 amqp_consume_message 的非阻塞模式,结合事件循环实现高吞吐量。

调试工具

  • 启用 AMQP_DEBUG 宏打印协议帧日志。
  • 使用 Wireshark 抓包分析 AMQP 流量。

参考资源

  1. RabbitMQ 官方文档
  2. AMQP 0-9-1 协议规范
  3. rabbitmq-c 源码仓库

通过分析源码,可以更深入地理解 RabbitMQ 的通信机制和设计思想,便于定制化开发或性能调优。

基于Python和RabbitMQ的基础连接与队列

以下是基于Python和RabbitMQ的基础连接与队列声明的实例代码示例,涵盖不同场景和参数配置:


基础连接与简单队列声明

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
connection.close()

声明持久化队列

channel.queue_declare(queue='durable_queue', durable=True)


声明带自定义参数的队列

channel.queue_declare(queue='custom_queue', arguments={'x-message-ttl': 60000})


声明排他队列

channel.queue_declare(queue='exclusive_queue', exclusive=True)


声明自动删除队列

channel.queue_declare(queue='auto_delete_queue', auto_delete=True)


声明队列并绑定死信交换器

args = {'x-dead-letter-exchange': 'dlx_exchange'}
channel.queue_declare(queue='with_dlx', arguments=args)


声明优先级队列

channel.queue_declare(queue='priority_queue', arguments={'x-max-priority': 10})


声明延迟队列(通过插件)

channel.queue_declare(queue='delayed_queue', arguments={'x-delayed-type': 'direct'})


声明长度限制队列

channel.queue_declare(queue='length_limited', arguments={'x-max-length': 1000})


声明多消费者队列

channel.queue_declare(queue='multi_consumer', durable=True)
channel.basic_qos(prefetch_count=1)  # 公平分发

使用SSL加密连接

ssl_params = pika.SSLOptions(ssl.SSLContext(), 'localhost')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(port=5671, ssl_options=ssl_params))

连接集群节点

params = pika.ConnectionParameters(host='node1.cluster')
connection = pika.BlockingConnection(params)

使用连接池

from rabbitmq import ConnectionPool
pool = ConnectionPool(pika.ConnectionParameters, max_size=10)

声明队列并返回队列信息

result = channel.queue_declare(queue='passive_queue', passive=True)
print(f"Queue has {result.method.message_count} messages")

声明无参数默认队列

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

使用异步连接适配器

import pika
connection = pika.SelectConnection(pika.ConnectionParameters('localhost'))

连接失败重试机制

parameters = pika.ConnectionParameters(
    host='localhost',
    connection_attempts=5,
    retry_delay=3
)

声明队列并绑定多个路由键

channel.queue_declare(queue='multi_binding')
channel.queue_bind(exchange='logs', queue='multi_binding', routing_key='info')
channel.queue_bind(exchange='logs', queue='multi_binding', routing_key='error')

使用URL参数连接

url = 'amqp://user:pass@host:port/vhost'
parameters = pika.URLParameters(url)
connection = pika.BlockingConnection(parameters)

声明队列时设置TTL

args = {'x-expires': 3600000}  # 1小时后自动删除
channel.queue_declare(queue='temp_queue', arguments=args)

注意事项

  • 所有示例需预先安装pika库:pip install pika
  • 生产环境建议添加异常处理逻辑
  • 连接参数应根据实际RabbitMQ服务器配置调整
  • 队列属性(如durable)需与已存在队列的属性一致,否则会报错

每个示例均可独立运行,建议根据实际需求组合使用不同参数。

RabbitMQ 工作队列模式与 prefetch 设置

RabbitMQ 的工作队列模式(Work Queue)用于在多个消费者之间分发任务。通过 prefetch 参数可以控制消费者未确认消息的最大数量,从而优化任务分配效率。

Python 实现工作队列模式

安装 RabbitMQ 的 Python 客户端库:

pip install pika

生产者代码示例(发送任务):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for i in range(10):
    message = f"Task {i}"
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化
    )
    print(f" [x] Sent {message}")

connection.close()

消费者代码示例(处理任务):

import pika
import time

def callback(ch, method, properties, body):
    print(f" [x] Received {body.decode()}")
    time.sleep(body.count(b'.'))  # 模拟耗时任务
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)  # 关键 prefetch 设置
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

prefetch 参数详解

prefetch_count=1 表示每个消费者最多只能有一个未确认的消息。该设置能实现:

  • 公平调度:避免某个消费者积压大量消息,而其他消费者空闲
  • 负载均衡:新任务会自动分配给空闲的消费者
  • 流量控制:防止消费者过载

对于需要更高吞吐量的场景,可以适当增大 prefetch_count 值,但需注意:


网站公告

今日签到

点亮在社区的每一天
去签到