RabbitMQ

发布于:2025-06-11 ⋅ 阅读:(29) ⋅ 点赞:(0)


官方文档:https://www.rabbitmq.com/docs

什么是RabbitMQ?

RabbitMQ 是一个消息代理。它接收来自发布者的消息,路由它们,并且(如果有可路由到的队列)存储它们以供使用,或者立即发送给消费者(如果有)。

发布者根据协议的不同将信息发布到不同的目的地。

协议

RabbitMQ 支持的每个协议中,发布消息的过程都非常相似。所有四种协议都允许用户发布包含有效负载(消息体)和一个或多个消息属性(消息头)的消息。

所有四种协议还支持发布者的确认机制,该机制允许发布应用程序跟踪代理已成功或未成功接受的消息,并继续发布下一批消息或重试发布当前消息。

  • AMQP协议(高级消息队列协议):RabbitMQ的核心协议
  • STOMP(简单文本导向的消息协议)
  • MQTT(消息队列遥测传输)
  • HTTP API(REST API)

AMQP协议

是 RabbitMQ 的核心协议,提供可靠、异步的消息传递。

  • 支持事务、确认机制(publisher confirms)和持久化。
  • 提供丰富的消息路由模型(交换机类型)。
  • 广泛支持各种编程语言的客户端库。

主要组件:

Broker(消息代理服务器):RabbitMQ 服务器实例,负责接收客户端连接、管理消息路由和队列

Exchange(交换机):消息路由的核心组件,根据规则将消息分发到对应队列。

  • Direct:按消息路由键(Routing Key)精确匹配队列。
  • Topic:按通配符模式(如*.order.*)匹配队列。
  • Fanout:广播消息到所有绑定的队列。
  • Headers:按消息头(Headers)属性匹配队列。

Queue(队列):消息的存储容器,保存等待消费的消息,支持持久化(保存在磁盘)、消息过期(TTL)、死信队列(DLQ)等机制。

Binding(绑定):建立交换机与队列的关联关系,定义消息路由规则。

  • 对 Direct 交换机:绑定规则为具体的 Routing Key。
  • 对 Topic 交换机:绑定规则为通配符模式。
  • 对 Fanout 交换机:绑定规则无意义(广播所有消息)。

Connection(连接):客户端与 Broker 之间的 TCP 长连接,维护通信通道。

Channel(信道):基于 Connection 的逻辑会话通道,用于执行具体的消息操作(发布、消费、确认等)

Virtual Host(虚拟主机):逻辑隔离的资源分组,类似数据库中的 “Schema”。

Message(消息):由消息体(Payload)和元数据(路由键、消息头、属性等)组成

  • Routing Key:用于 Direct/Topic 交换机的路由。
  • Headers:用于 Headers 交换机的匹配条件。
  • Delivery Mode:标记消息是否持久化(1 = 非持久,2 = 持久)。

队列(Queue)

  • 标准队列(Standard Queue)
  • 持久化队列(Durable Queue)
  • 惰性队列(Lazy Queue)
  • 镜像队列(Mirrored Queue)
  • 死信队列(Dead Letter Queue, DLQ)
  • 延迟队列(Delayed Queue)
  • 临时队列(Temporary Queue)
  • 优先级队列(Priority Queue)
1. 标准队列(Standard Queue)
  • 特点:
    • 最基础的队列类型,遵循 “生产者 - 消费者” 模型,消息被消费后从队列中移除
    • 支持基本的队列属性配置(如持久化、排他性、自动删除等)。
  • 关键属性:
    • durable:是否持久化(消息和队列元数据保存到磁盘,重启后恢复)。
    • exclusive:是否排他(仅创建它的连接可访问,连接关闭后队列自动删除)。
    • auto-delete:是否自动删除(最后一个消费者取消订阅后自动删除)。
  • 适用场景:
    • 常规的异步任务处理、消息通信。
2. 持久化队列(Durable Queue)
  • 特点:
    • 通过设置durable=true实现,队列元数据和消息会持久化到磁盘
    • 即使 RabbitMQ 服务重启,队列和未消费的消息仍可恢复。
  • 技术实现:
    • 队列声明时指定durable参数为true
    • 消息需配合deliveryMode=2(持久化标记)才能同时持久化消息体。
  • 适用场景:
    • 对消息可靠性要求高的场景(如订单处理、金融交易)。
3. 惰性队列(Lazy Queue)
  • 特点:
    • 基于lazy参数(RabbitMQ 3.6.0 + 引入),将消息优先存储在磁盘而非内存
    • 减少内存占用,适合处理海量消息(百万级以上)。
  • 技术实现:
    • 声明队列时设置"x-queue-type": "lazy"(或通过插件配置)。
    • 消费时按需将消息加载到内存,降低峰值内存压力。
  • 适用场景:
    • 日志收集、大数据流处理、消息积压缓冲。
4. 镜像队列(Mirrored Queue)
  • 特点:
    • 通过集群镜像策略,将队列数据同步到多个节点,实现高可用性。
    • 主节点故障时,从节点自动接管,避免单点故障。
  • 配置方式:
    • 通过 RabbitMQ 管理界面或策略(Policy)设置ha-mode参数(如allexactly-n)。
    • 同步方式:异步复制,可能存在短暂数据不一致。
  • 适用场景:
    • 对服务可用性要求极高的核心业务(如实时通信、交易系统)。
5. 死信队列(Dead Letter Queue, DLQ)
  • 特点:
    • 用于存储无法被正常消费的 “死信” 消息,通常因以下原因进入死信队列:
      • 消息被消费者拒绝(basic.reject/basic.nack且未重新入队)。
      • 消息超时未被消费(设置x-message-ttl)。
      • 队列达到最大长度(设置x-max-lengthx-max-length-bytes)。
  • 技术实现:
    • 为队列设置x-dead-letter-exchange参数,指定死信交换机。
    • 死信交换机再将消息路由到死信队列。
  • 适用场景:
    • 消息重试机制、异常消息监控和处理。
6. 延迟队列(Delayed Queue)
  • 特点:
    • 消息在指定延迟时间后才允许被消费,非 RabbitMQ 原生功能,需插件或间接实现。
  • 实现方式:
    • 插件方式:使用rabbitmq-delayed-message-exchange插件,通过交换机实现延迟。
    • TTL + 死信队列方式:给消息设置x-message-ttl,超时后通过死信交换机路由到目标队列。
  • 适用场景:
    • 订单超时取消、任务定时执行、消息重试延迟。
7. 临时队列(Temporary Queue)
  • 特点:
    • 通常设置为exclusive=trueauto-delete=true连接关闭后自动删除
    • 无需显式删除,适用于临时任务或一次性消费场景。
  • 典型用法:
    • 客户端创建临时队列用于响应式通信(如 RPC 模式中的回调队列)。
8. 优先级队列(Priority Queue)
  • 特点:
    • 通过x-max-priority参数设置队列优先级,高优先级消息优先被消费。
  • 技术实现:
    • 声明队列时指定x-max-priority(如 10),消息发送时设置priority属性。
  • 适用场景:
    • 任务调度(如紧急订单优先处理)、资源抢占场景。

交换机(Exchange)

  • 直连交换机(Direct Exchange)
  • 主题交换机(Topic Exchange)
  • 扇形交换机(Fanout Exchange)
  • 头交换机(Headers Exchange)
1. 直连交换机(Direct Exchange)
  • 路由规则:根据消息的路由键(Routing Key)和绑定键(Binding Key)的精确匹配进行路由。
  • 绑定方式:
    • 队列通过指定 Binding Key 绑定到直连交换机。
    • 当消息的 Routing Key 与某个队列的 Binding Key 完全匹配时,消息被路由到该队列。
  • 适用场景:
    • 基于类型的任务分发:例如,将用户注册、订单支付等不同类型的消息路由到不同队列。
    • 优先级路由:将高优先级订单路由到专门的队列处理。
2. 主题交换机(Topic Exchange)
  • 路由规则:

    根据消息的路由键和绑定键的模式匹配进行路由。绑定键支持两种通配符:

    • *(星号):匹配一个单词(例如order.*匹配order.create但不匹配order.create.paid)。
    • #(井号):匹配零个或多个单词(例如order.#匹配orderorder.createorder.create.paid)。
  • 绑定方式:

    • 队列通过带通配符的 Binding Key 绑定到主题交换机。
    • 当消息的 Routing Key 满足某个队列的 Binding Key 模式时,消息被路由到该队列。
  • 适用场景:

    • 系统事件广播:例如,user.createdorder.paid等事件按主题分类。
    • 日志分类:将不同级别的日志(如system.errorapplication.warning)路由到不同队列。
3. 扇形交换机(Fanout Exchange)
  • 路由规则:将接收到的所有消息广播到所有与之绑定的队列,完全忽略消息的路由键。
  • 绑定方式:
    • 队列直接绑定到扇形交换机,无需指定 Binding Key(或 Binding Key 为空)。
    • 交换机接收到的所有消息都会被复制到所有绑定的队列。
  • 适用场景:
    • 配置更新:当系统配置变更时,同时通知所有服务。
    • 事件广播:如系统通知、用户登录事件,需要多个服务同时处理。
4. 头交换机(Headers Exchange)
  • 路由规则:

    根据消息的头部信息(Headers)而非路由键进行匹配。匹配方式有两种:

    • x-match=all:所有指定的头键值对必须匹配。
    • x-match=any:至少一个头键值对匹配。

    绑定方式:

    • 队列通过指定 Headers 键值对和匹配方式(x-match)绑定到头交换机。
    • 当消息的 Headers 满足某个队列的绑定条件时,消息被路由到该队列。
  • 适用场景:

    • 多条件过滤:当路由规则复杂且不适合用路由键表达时。
    • 元数据路由:根据消息的元数据(如 content-type、version)路由。

Spring AMQP

官方文档:https://springdoc.cn/spring-amqp/

介绍

Spring AMQP是一个基于AMQP协议的消息中间件框架,它提供了一个简单的API来发送和接收异步、可靠的消息。

来源:https://springdoc.cn/spring-amqp/

AMQP的关键组件

Queue

在 Spring AMQP 中,Queue对象代表消息队列,是消息的存储和处理单元

参数有:

参数 类型 描述
name String 队列名称(必须唯一)。
durable boolean 是否持久化(默认true)。持久化队列会在 Broker 重启后保留。
exclusive boolean 是否排他(默认false)。排他队列仅对首次声明它的连接可见,并在连接断开时自动删除。
autoDelete boolean 是否自动删除(默认false)。自动删除队列在最后一个消费者断开连接后自动删除。
arguments Map<String, Object> 自定义参数(如死信交换机、消息 TTL、优先级等)。
Binding

将消息从 Exchange 路由到 Queue,通过 绑定键(Binding Key)消息头(Headers)** 实现匹配。

Spring AMQP 提供了Binding类及其构建器BindingBuilder,用于声明不同类型的绑定:

import org.springframework.amqp.core.*;

// 声明Binding的示例(以Direct类型为例)
Binding directBinding = BindingBuilder.bind(queue())     // 目标队列
                                     .to(directExchange()) // 源交换机
                                     .with("order.create"); // 绑定键
Exchange

在 Spring AMQP 中,Exchange对象是消息路由的核心组件,负责接收生产者发送的消息,并根据规则将消息路由到一个或多个队列

**Exchange**的实现类:

DirectExchange 实现 Direct 类型交换机,路由键精确匹配。
TopicExchange 实现 Topic 类型交换机,支持通配符匹配。
FanoutExchange 实现 Fanout 类型交换机,广播消息到所有绑定队列。
HeadersExchange 实现 Headers 类型交换机,根据消息头匹配。
CustomExchange 自定义交换机类型(如 RabbitMQ 的延迟交换机x-delayed-message)。
Message

在 Spring AMQP 框架中,Message对象是消息传递的核心载体,它封装了从 AMQP broker(如 RabbitMQ)接收或发送的消息内容和元数据

主要部分

public class Message {
    private final byte[] body;              // 消息内容(字节数组)
    private final MessageProperties messageProperties; // 消息属性
}
  1. 消息体(Body):实际传输的数据(字节数组)
  2. 消息属性(MessageProperties):包含路由信息、优先级、过期时间等元数据

Spring AMQP案例学习

jdk版本:1.8

springboot版本: 2.1.6.RELEASE

rabbitmq版本: rabbitmq:3.12-management(docker部署)

在这里插入图片描述

声明队列
@Bean
public Queue queue() {
    return new Queue("queue", true);
}
声明交换机
/**
 * 直连交换机
 * @return
 */
@Bean
public DirectExchange directExchange() {
    // 持久化,非自动删除
    return new DirectExchange("directExchange", true, false);
}

/**
 * 扇形交换机(广播)
 * @return
 */
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange", true, false);
}

/**
 * 主题交换机
 * @return
 */
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange", true, false);
}

/**
 * 头交换机
 * @return
 */
@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headersExchange", true, false);
}

声明绑定
直连交换机:
@Bean
public Binding binding() {
    return BindingBuilder
            .bind(queue())
            .to(directExchange())
            .with("red");
}
扇形交换机
@Bean
public Binding binding() {
    return BindingBuilder
            .bind(queue())
            .to(fanoutExchange());
}
主题交换机
@Bean
public Binding binding() {
    return BindingBuilder
            .bind(queue())
            .to(topicExchange())
            .with("topic.#");
}
头交换机
@Bean
public Binding binding() {
    HashMap<String, Object> headers = new HashMap<>();
    headers.put("key1", "value1");
    headers.put("key2", "value2");
    headers.put("key3", "value3");
    return BindingBuilder
            .bind(queue())
            .to(headersExchange())
            .whereAny(headers)
            .match();
}
声明消息转换器及确认机制
/**
 * 消息转换器
 * 转换成json格式
 * @return
 */
@Bean
public Jackson2JsonMessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(messageConverter()); // 设置 JSON 转换器
    // 启用发布确认(确保消息到达Exchange)
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            System.out.println("消息成功发送到Exchange: " + correlationData);
        } else {
            System.err.println("消息发送失败: " + cause);
            // 可在此处实现重试逻辑
        }
    });

    // 启用失败退回(当Exchange无法路由到队列时触发)
    template.setMandatory(true);
    template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        System.err.println("消息被退回: " + new String(message.getBody()) +
                ", replyCode: " + replyCode +
                ", replyText: " + replyText +
                ", exchange: " + exchange +
                ", routingKey: " + routingKey);
    });

    return template;
}
确认机制容器工厂
@Autowired
private ConnectionFactory connectionFactory;
// 手动确认模式的容器工厂
@Bean
public SimpleRabbitListenerContainerFactory manualAckContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);  // 手动确认
    factory.setPrefetchCount(1);  // 每次只处理一条消息
    factory.setMessageConverter(jsonMessageConverter()); // 必须设置
    return factory;
}

// 自动确认模式的容器工厂
@Bean
public SimpleRabbitListenerContainerFactory autoAckContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.AUTO);  // 自动确认
    factory.setMessageConverter(jsonMessageConverter()); // 必须设置
    factory.setPrefetchCount(250);  // 默认预取数量
    return factory;
}

在消费者监听时指定工厂:

@RabbitListener(queues = "queue10", containerFactory = "autoAckContainerFactory") // 自动确认
@RabbitListener(queues = "queue9", containerFactory = "manualAckContainerFactory") // 手动确认

手动确认:

/**
 * 测试话题交换机
 * 手动确认消息已消费
 * @param message 消息内容
 * @param channel RabbitMQ Channel
 * @param tag deliveryTag
 */
@RabbitListener(queues = "queue7", containerFactory = "manualAckContainerFactory")
public void queue7(String message, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long tag,
                           @Header(AmqpHeaders.CONSUMER_QUEUE) String queueName,
                           Message amqpMessage) {
    try {
        System.out.println("处理消息: " + message + ", 来自队列:" + queueName);
        // 手动确认
        channel.basicAck(tag, false); // 正确调用 basicAck
    } catch (IOException e) {
        try {
            // 拒绝消息并重新入队
            channel.basicNack(tag, false, true);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

测试:
在这里插入图片描述
在这里插入图片描述

如有错误,欢迎指正!