大家好,我是工藤学编程 🦉 | 一个正在努力学习的小博主,期待你的关注 |
---|---|
实战代码系列最新文章😉 | C++实现图书管理系统(Qt C++ GUI界面版) |
SpringBoot实战系列🐷 | 【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案 |
分库分表 | 分库分表之实战-sharding-JDBC分库分表执行流程原理剖析 |
消息队列 | 深入浅出 RabbitMQ - 主题模式(Topic) |
前情摘要:
1、深入浅出 RabbitMQ-核心概念介绍与容器化部署
2、深入浅出 RabbitMQ-简单队列实战
3、深入浅出 RabbitMQ-工作队列实战(轮训策略VS公平策略)
4、深入浅出 RabbitMQ-交换机详解与发布订阅模型实战
4、深入浅出 RabbitMQ-路由模式详解
5、深入浅出 RabbitMQ - 主题模式(Topic)
本文章目录
深入浅出 RabbitMQ - SpringBoot2.X整合RabbitMQ实战
一、环境准备
安装RabbitMQ
首先确保已安装RabbitMQ服务并启动,可通过官方镜像或包管理工具安装。可参见深入浅出 RabbitMQ-核心概念介绍与容器化部署。项目依赖
在SpringBoot项目的pom.xml
中添加RabbitMQ Starter依赖:<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、配置RabbitMQ连接
SpringBoot通过yml配置文件自动注入RabbitMQ连接信息,无需手动创建连接工厂。
核心配置(application.yml)
spring:
rabbitmq:
host: 192.168.229.128 # RabbitMQ服务器地址
port: 5672 # 通信端口(默认5672)
virtual-host: /dev # 虚拟主机(隔离不同环境的消息资源)
username: admin # 登录用户名
password: password # 登录密码
配置说明:
virtual-host
用于隔离不同项目的消息,避免命名冲突;- 若使用默认配置,可省略
port
(默认5672)和virtual-host
(默认/
)。
三、定义消息队列核心组件
通过配置类定义交换机(Exchange)、队列(Queue)及绑定关系(Binding),这是RabbitMQ消息路由的核心。
RabbitMQConfig配置类
@Configuration
public class RabbitMQConfig {
// 交换机名称
public static final String EXCHANGE_NAME = "order_exchange";
// 队列名称
public static final String QUEUE_NAME = "order_queue";
/**
* 声明主题交换机(Topic Exchange)
* 特点:按路由键模式匹配消息,支持通配符(*匹配一个词,#匹配多个词)
*/
@Bean
public Exchange orderExchange() {
// durable(true):交换机持久化(服务重启后不丢失)
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 声明持久化队列
*/
@Bean
public Queue orderQueue() {
// durable(true):队列持久化(服务重启后队列仍存在)
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 绑定交换机与队列
* 路由键规则:order.#(匹配以order.开头的所有路由键,如order.new、order.pay等)
*/
@Bean
public Binding orderBinding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
核心概念解析:
- 交换机:接收生产者发送的消息,根据规则路由到队列;这里使用Topic类型,灵活性更高;
- 队列:存储消息的容器,必须与交换机绑定才能接收消息;
- 绑定关系:通过路由键(Routing Key)定义交换机到队列的映射规则,决定消息流向。
四、实现消息生产者
使用RabbitTemplate
发送消息到指定交换机,SpringBoot已自动配置该组件,可直接注入使用。
消息生产者(测试类)
@SpringBootTest
class DemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息测试
*/
@Test
void sendMessage() {
// 参数1:交换机名称(从配置类引用,避免硬编码)
// 参数2:路由键(需符合绑定规则order.#)
// 参数3:消息内容(支持String、对象等类型)
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE_NAME,
"order.new",
"新订单来啦1"
);
System.out.println("消息发送成功");
}
}
发送逻辑说明:
- 路由键
order.new
符合绑定规则order.#
,消息会被路由到order_queue
; - 若路由键不符合规则(如
user.new
),消息将被丢弃(除非配置了死信交换机); - 消息内容支持序列化对象,需确保对象实现
Serializable
接口。
五、实现消息消费者
通过@RabbitListener
注解监听指定队列,自动接收并处理消息。
消息消费者
@Component
public class OrderMQListener {
/**
* 监听order_queue队列的消息
* @param msg 消息内容(自动转换为String类型)
* @param message 消息完整对象(包含属性、路由键等元数据)
*/
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) // 绑定监听的队列
@RabbitHandler // 处理消息的具体方法
public void handleOrderMessage(String msg, Message message) {
// 获取消息投递标签(用于手动确认消息)
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 打印消息详情
System.out.println("===== 接收到消息 =====");
System.out.println("消息标签:" + deliveryTag);
System.out.println("消息内容:" + msg);
System.out.println("消息元数据:" + message.getMessageProperties());
}
}
消费逻辑说明:
@RabbitListener
:指定监听的队列,支持同时监听多个队列;@RabbitHandler
:标记消息处理方法,支持根据消息类型重载;- 消息确认:默认开启自动确认(消息接收后自动从队列删除),如需手动确认,可配置
acknowledge-mode: manual
。
六、测试验证
- 启动服务:启动SpringBoot应用,消费者会自动注册到RabbitMQ;
- 发送消息:运行测试类的
sendMessage
方法,发送测试消息; - 查看结果:控制台输出如下,说明消息发送和接收成功:
七、注意事项
- 持久化配置:交换机和队列需设置
durable(true)
,避免服务重启后组件丢失; - 路由键匹配:Topic交换机的路由键需符合
*
和#
的通配规则,否则消息无法路由; - 消息序列化:默认使用JDK序列化,建议配置JSON序列化器(如
Jackson2JsonMessageConverter
); - 异常处理:消费者需添加异常处理逻辑,避免消息处理失败导致无限重试。
总结
本文通过实战案例演示了SpringBoot2.X整合RabbitMQ的核心步骤,从配置连接、定义组件到实现消息的生产与消费。关键在于理解交换机、队列、路由键的协作关系,以及SpringBoot自动配置带来的简化。后续可进一步学习消息确认机制、死信队列、延迟消息等高级特性,提升消息队列的可靠性。
觉得有用请点赞收藏!
如果有相关问题,欢迎评论区留言讨论~