RabbitMQ
workqueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,这样显然是有问题的。
能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
交换机
在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机
Fanout 广播模式 消息交给所有绑定到交换机的队列 它有以下几个特点
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
Topic交换机
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词- :匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
用Java代码声明交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
基本api
SpringAMQP提供了一个Queue类 用来创建队列
public class Queue extends AbstractQueue inplements Clonable { }
springAMQP还提供了exchange接口 表示不同的交换机 我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程 而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象
1. Queue
类
在 RabbitMQ 里,队列是最基础的消息存储和转发容器。
Spring AMQP 提供了 org.springframework.amqp.core.Queue
类,直接用来在 Spring 环境中声明和管理队列。
Queue queue = new Queue("simple.queue", true);
这个类本质上就是对 RabbitMQ 队列的一个封装对象,它的构造参数里可以控制:
- 队列名(
simple.queue
) - 是否持久化(
true
持久化到磁盘,false
内存队列) - 是否自动删除
- 是否排他
Spring Boot 项目启动时,如果你注册了这个 Bean,Spring AMQP 会自动帮你在 RabbitMQ 里声明出这个队列。
2. Exchange
接口
RabbitMQ 的交换机有好几种类型:DirectExchange
、TopicExchange
、FanoutExchange
、HeadersExchange
。
Spring AMQP 为它们都定义了一个统一的接口:org.springframework.amqp.core.Exchange
。
比如:
Exchange directExchange = new DirectExchange("hmall.direct");
Exchange topicExchange = new TopicExchange("hmall.topic");
3. ExchangeBuilder
为了简化交换机的创建,Spring AMQP 提供了 构建者模式:
Exchange fanoutExchange = ExchangeBuilder.fanoutExchange("hmall.fanout")
.durable(true)
.build();
这样比自己 new FanoutExchange(...)
灵活很多,可以链式设置属性(是否持久化、是否自动删除等)。
4. BindingBuilder
队列要跟交换机绑定起来才有用,这就是 Binding 的作用。
Spring 提供了 BindingBuilder
来简化绑定:
Queue queue = new Queue("simple.queue");
Exchange exchange = new DirectExchange("hmall.direct");
Binding binding = BindingBuilder
.bind(queue) // 绑定队列
.to(exchange) // 绑定到哪个交换机
.with("hmall.key") // 指定路由键(Direct/Topic 才用)
.noargs(); // 不需要额外参数
这样就得到一个 Binding 对象,Spring Boot 会自动帮你声明到 RabbitMQ。
或者也可以基于注解声明交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
配置消息转换器
RabbitMQ 自己 只能处理字节数组(byte[]
)。
所以你无论发 String
、对象
还是 Map
,都必须先转成字节,再放进 MQ。
- 发送时:Java 对象 → 序列化成字节 → 发到 MQ
- 接收时:从 MQ 里取出字节 → 反序列化成 Java 对象
这个过程,就靠 消息转换器 MessageConverter。
Spring 默认用 SimpleMessageConverter
,它的策略是:
- 如果你传的是对象,会用 JDK 自带的序列化机制(
ObjectOutputStream
)。 - 结果就是 MQ 收到一个 二进制字节流。
为什么 JDK 序列化有问题?
- 体积大:JDK 序列化带有类元信息(类名、字段类型等),比 JSON 多很多冗余。
- 有安全漏洞:反序列化时如果黑客伪造了恶意类,可能触发任意代码执行。
- 可读性差:放到 MQ 管理界面看,看到的就是一堆乱码字节,人类根本读不懂。
比如发送一个 User{id=1, name="家宇"}
,存到 MQ 里就是一堆二进制,看不到内容。
我们一般用 Jackson2JsonMessageConverter
。
这样 Spring 会把对象转成 JSON 字符串,再转字节。
效果:
- MQ 里能看到 JSON(可读性强)。
- 体积比 JDK 序列化小很多。
- 语言无关(其他语言也能直接消费 JSON)。
配置消息转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher
和consumer
两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断