springboot rabbitmq 消息队列入门与实战

发布于:2025-09-13 ⋅ 阅读:(19) ⋅ 点赞:(0)

Spring Boot3 RabbitMq 项目地址

https://gitee.com/supervol/loong-springboot-study

(记得给个start,感谢)

RabbitMq 概述

        RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)的开源消息中间件,核心优势在于解耦、削峰、异步通信;而 Spring Boot 3 作为主流的 Java 开发框架,通过 spring-boot-starter-amqp starter 简化了 RabbitMQ 的整合流程。本文将从基础概念、环境搭建、核心功能、高级特性到最佳实践,全面讲解 Spring Boot 3 与 RabbitMQ 的整合方案。

RabbitMq 核心

1. RabbitMQ 组件

组件 作用说明
Broker RabbitMQ 服务器实例,负责接收、存储、转发消息
Exchange 交换机,接收生产者发送的消息,根据路由规则将消息路由到绑定的队列
Queue 消息队列,存储待消费的消息,支持持久化、限流、死信等特性
Binding 交换机与队列的绑定关系,包含 “路由键(Routing Key)” 用于匹配消息
Routing Key 消息的 “地址标识”,交换机通过 Routing Key 决定消息路由到哪个队列
Virtual Host 虚拟主机,实现多租户隔离(不同应用使用不同 Virtual Host,避免资源冲突)
Connection 客户端与 Broker 的 TCP 连接,重量级资源,一般复用
Channel 基于 Connection 的轻量级通信通道, RabbitMQ 推荐通过 Channel 操作消息(减少 TCP 连接开销)

        交换机(Exchange)的 4 种核心类型,交换机是 RabbitMQ 消息路由的核心,不同类型对应不同的路由策略:

  • Direct Exchange(直连交换机):精确匹配 Routing Key(消息的 Routing Key 与 Binding 的 Routing Key 完全一致才路由),适用于点对点通信(如订单支付通知)。
  • Topic Exchange(主题交换机):模糊匹配 Routing Key(支持 * 匹配单个单词、# 匹配多个单词,单词间用 . 分隔),适用于发布订阅 + 多条件过滤(如日志按 “服务名。级别” 路由)。
  • Fanout Exchange(扇出交换机):忽略 Routing Key,将消息广播到所有绑定的队列,适用于广播通信(如系统通知、缓存清理)。
  • Headers Exchange(头交换机):不依赖 Routing Key,通过匹配消息头(Headers)的键值对路由,适用于复杂属性匹配(较少用,灵活但性能略低)。

2. Spring AMQP 核心组件

        Spring Boot 3 整合 RabbitMQ 依赖 Spring AMQP(版本与 Spring Boot 3 强绑定,如 Spring Boot 3.2 对应 Spring AMQP 3.2+),核心组件如下:

  • RabbitTemplate:封装了 RabbitMQ 的消息发送逻辑,支持同步 / 异步发送、消息回调、消息转换器等。
  • AmqpAdmin:用于声明交换机、队列、绑定关系(支持编程式声明,也可通过注解声明)。
  • @RabbitListener:注解式消费者,标注在方法上即可监听指定队列,支持批量消费、手动确认等。
  • MessageListenerContainer:消费者容器,负责管理消费者生命周期(如并发消费、消息重试、异常处理),Spring Boot 会自动配置默认容器。

RabbitMq 示例

1. 前提条件

        Spring Boot 3 对依赖版本有严格要求,避免版本冲突:

组件 最低版本要求 推荐版本
JDK JDK 17+ JDK 17/21
RabbitMQ 3.9+ 3.12+
Spring Boot 3.0+ 3.2.x(稳定版)
Spring AMQP 3.0+(随 Spring Boot 自动引入) 3.2.x

2. 代码位置

        请参考项目地址中 springboot-mq/springboot-rabbitmq 模块代码。

RabbitMq 高级

        基础整合仅满足简单通信,实际项目需解决消息丢失、重复消费、延迟消息等问题,本节讲解核心高级特性。

1. 消息可靠性保障

        RabbitMQ 消息丢失可能发生在三个环节:生产者→BrokerBroker 存储Broker→消费者,需针对性防护。

环节 防护措施
生产者→Broker 开启生产者确认(publisher-confirm-type: correlated)+ 回调重试
Broker 存储 交换机 / 队列持久化(durable=true)+ 消息持久化(deliveryMode=PERSISTENT
Broker→消费者 手动确认(acknowledge-mode: manual)+ 消费失败转发死信队列
(1)消息持久化配置

        在声明交换机和队列时,需设置 durable=true;发送消息时,需设置 deliveryMode=PERSISTENT

// 1. 声明持久化交换机
DirectExchange durableExchange = new DirectExchange("durable-exchange", true, false);

// 2. 声明持久化队列
Queue durableQueue = new Queue("durable-queue", true, false, false);

// 3. 发送持久化消息(通过 RabbitTemplate 设置消息属性)
rabbitTemplate.convertAndSend(
        "durable-exchange",
        "durable-routing-key",
        "持久化消息",
        message -> {
            // 设置消息持久化(DeliveryMode.PERSISTENT)
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        },
        new CorrelationData(UUID.randomUUID().toString())
);

2. 死信队列

        死信是指无法被正常消费的消息(如消费失败、消息过期、队列满),死信队列用于存储这些消息,避免丢失或阻塞正常队列。

(1)死信产生条件
  • 消息被消费者拒绝(basicReject/basicNack,且 requeue=false)。
  • 消息过期(队列设置 x-message-ttl 或消息单独设置 expiration)。
  • 队列达到最大长度(x-max-length),无法存储新消息。
(2)死信队列配置示例
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterQueueConfig {
    // 1. 死信交换机(普通 Direct 交换机)
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dlx-exchange", true, false);
    }

    // 2. 死信队列(存储死信消息)
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dlx-queue", true, false, false);
    }

    // 3. 绑定死信交换机与死信队列
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("dlx-routing-key"); // 死信路由键
    }

    // 4. 普通队列(设置死信属性,将死信转发到死信交换机)
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal-queue")
                .withArgument("x-dead-letter-exchange", "dlx-exchange") // 死信交换机
                .withArgument("x-dead-letter-routing-key", "dlx-routing-key") // 死信路由键
                .withArgument("x-message-ttl", 10000) // 消息过期时间(10秒)
                .build();
    }

    // 5. 绑定普通队列与普通交换机
    @Bean
    public Binding normalBinding(DirectExchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue)
                .to(normalExchange)
                .with("normal-routing-key");
    }

    // 6. 普通交换机
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal-exchange", true, false);
    }
}

        测试:发送消息到 normal-queue,若 10 秒内未被消费,消息会自动转为死信,进入 dlx-queue

3. 延迟队列

        延迟队列用于 “消息延迟指定时间后再消费”(如订单超时未支付自动取消、定时任务),RabbitMQ 无原生延迟队列,需通过以下两种方式实现:

(1)基于死信队列 + TTL

        利用 “消息过期后转为死信” 的特性,设置队列的 x-message-ttl,死信队列即为延迟队列。
缺陷:队列中所有消息的延迟时间固定,无法灵活设置不同延迟时间。

(2)基于 RabbitMQ 延迟插件

        RabbitMQ 提供 rabbitmq_delayed_message_exchange 插件,支持自定义消息延迟时间,灵活性更高。

步骤 1:安装延迟插件

  1. 下载rabbitmq_delayed_message_exchange插件,并放到指定位置
  2. 安装插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  3. 验证:访问管理界面,在 Exchanges 的 Type 下拉框中可看到 x-delayed-message
  4. 注意,本文不讨论和涉及rabbitmq及其插件安装和配置,请自行搜索。

步骤 2:配置延迟交换机与队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayedQueueConfig {
    // 1. 声明延迟交换机(类型为 x-delayed-message)
    @Bean
    public CustomExchange delayedExchange() {
        // 参数:名称、类型、持久化、自动删除、附加参数(指定延迟交换机的路由类型)
        return new CustomExchange(
                "delayed-exchange",
                "x-delayed-message",
                true,
                false,
                Map.of("x-delayed-type", "direct") // 延迟交换机的底层路由类型(如 direct)
        );
    }

    // 2. 声明延迟队列
    @Bean
    public Queue delayedQueue() {
        return new Queue("delayed-queue", true, false, false);
    }

    // 3. 绑定延迟交换机与队列
    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue())
                .to(delayedExchange())
                .with("delayed-routing-key")
                .noargs();
    }
}

步骤 3:发送延迟消息

// 发送延迟消息(设置延迟时间,单位:毫秒)
public void sendDelayedMessage(Object message, long delayMs) {
    rabbitTemplate.convertAndSend(
            "delayed-exchange",
            "delayed-routing-key",
            message,
            msg -> {
                // 设置延迟时间
                msg.getMessageProperties().setDelay((int) delayMs);
                // 消息持久化
                msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return msg;
            },
            new CorrelationData(UUID.randomUUID().toString())
    );
}

// 调用:延迟 5 秒后消费
sendDelayedMessage("延迟 5 秒的消息", 5000);

4. 消息幂等性

        重复消费:同一消息被消费者多次处理(如消费者确认前宕机,Broker 重新投递)。需保证 “重复消费不影响业务正确性”(即幂等)。

(1) 解决方案:唯一 ID + 去重存储

  1. 生成唯一消息 ID:生产者发送消息时,设置 messageId(如 UUID)。
  2. 消费前检查去重:消费者接收消息后,先查询存储(Redis / 数据库)中是否存在该 messageId,若存在则跳过,若不存在则处理业务并记录 messageId
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Service
public class IdempotentConsumerService {
    @Resource
    private RedisTemplate<String, String> redisTemplate;

    // 幂等消费逻辑
    public void consumeIdempotentMessage(Object message, Message amqpMessage, Channel channel) throws IOException {
        String messageId = amqpMessage.getMessageProperties().getMessageId();
        String redisKey = "rabbitmq:message:id:" + messageId;

        try {
            // 1. Redis 分布式锁:避免并发重复处理(setIfAbsent 原子操作)
            Boolean isFirstConsume = redisTemplate.opsForValue().setIfAbsent(
                    redisKey,
                    "CONSUMED",
                    24, // 过期时间(根据业务调整,避免 Redis 堆积)
                    TimeUnit.HOURS
            );

            if (Boolean.FALSE.equals(isFirstConsume)) {
                // 2. 非首次消费:直接确认消息
                System.out.printf("消息已重复消费,ID=%s%n", messageId);
                channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
                return;
            }

            // 3. 首次消费:处理业务逻辑
            System.out.printf("幂等消费消息:ID=%s,内容=%s%n", messageId, message);

            // 4. 处理完成:确认消息
            channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 5. 消费失败:拒绝消息(不重回队列)
            channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, false);
            System.err.printf("幂等消费失败:ID=%s,原因=%s%n", messageId, e.getMessage());
        }
    }
}

5. 监控与运维

1. RabbitMQ Management UI

        RabbitMQ 管理界面是最基础的监控工具,关键监控指标:

  • Exchanges:交换机是否正常,绑定数、消息入站 / 出站速率。
  • Queues:队列长度(Ready 数,若持续增长需扩容消费者)、消息消费速率(Consumers 数、Acknowledged 数)。
  • Connections/Channels:连接数、信道数是否超出阈值(避免资源耗尽)。
  • Admin:用户权限、虚拟主机配置是否正确。

2. Spring Boot Actuator 监控

        通过 Spring Boot Actuator 暴露 RabbitMQ metrics,结合 Prometheus + Grafana 可实现可视化监控。

<!-- Spring Boot Actuator -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!-- 可选:Prometheus 适配(用于对接 Grafana) -->
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

配置暴露监控端点

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus # 暴露的端点
  metrics:
    export:
      prometheus:
        enabled: true # 启用 Prometheus 导出
  endpoint:
    health:
      show-details: always # 显示健康详情

查看监控数据

  • 访问 http://localhost:8080/actuator/health:查看 RabbitMQ 连接健康状态(rabbitmq 节点为 UP 表示正常)。
  • 访问 http://localhost:8080/actuator/metrics/rabbitmq.messages.sent:查看消息发送总数。
  • 访问 http://localhost:8080/actuator/prometheus:获取 Prometheus 格式的 metrics,用于 Grafana 可视化。

RabbitMq 总结

  1. 组件设计规范

    • 交换机 / 队列命名:按 “业务模块 - 类型 - 用途” 命名(如 order-direct-exchangeorder-pay-queue)。
    • 虚拟主机隔离:不同环境(开发 / 测试 / 生产)或不同应用使用独立 Virtual Host。
  2. 性能优化

    • 连接池配置:使用 CachingConnectionFactory 缓存信道(默认缓存 25 个),避免频繁创建信道。
    • 消息体大小:单个消息不超过 1MB(大消息建议存储到 MinIO/OSS,消息中携带文件地址)。
    • 并发控制:消费者并发数(concurrency)根据 CPU 核心数调整(如 2-4 倍核心数),避免过度并发导致资源竞争。
  3. 可靠性优先

    • 必开特性:生产者确认、手动确认、消息持久化、死信队列。
    • 避免滥用自动确认:仅在 “消费逻辑无副作用” 场景使用 acknowledge-mode: auto
  4. 问题排查

    • 日志配置:开启 RabbitMQ DEBUG 日志(logging.level.org.springframework.amqp=DEBUG),便于追踪消息流转。
    • 死信监控:定期检查死信队列,分析死信原因(如消费异常、消息过期)。