快速入门
这里省去了交换机,让入门案例变得更简单。
编写好监听类之后启动springboot项目就能监听消息队列了。值得注意的是,这里的方法形参中的String msg的String要和第三步的message的类型一致。第三步中的message可以是User类型,但第四步方法形参的msg也要是User类型,Spring会自动完成对象和消息之间的转换。
Work Queues
@Component @Slf4j public class SpringRabbitListener { // @RabbitListener(queues = "simple.queue") // public void listenSimpleQueue(String message) { // log.info("监听到simple.queue的消息:{}", message); // } @RabbitListener(queues = "work.queue") public void listenWorkQueue1(String message) { System.out.println("消费者1接收到消息:" + message + "," + LocalTime.now()); } @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String message) { System.err.println("消费者2.....接收到消息:" + message + "," + LocalTime.now()); } }
这是添加两个消费者的代码,可以看到是一个方法就是一个消费者。System.err.println打印出来是红色的,可以和sout区分开。然后编写发送50条数据(带编号)给消息队列的代码,比较简单就不贴出来了。运行之后发现,两个消费者各得到一半的消息,而且消费者2处理的是奇数数据,消费者1处理的是偶数数据。所以,队列中同一个消息只能被一个消费者处理。其次,如果有多个消费者,会通过轮询的方式平均交给消费者处理。
这个是consumer的配置,添加这个设置之后呢,就避免了之前”我管你处理没处理完,我就是给你分配消息“的问题。因为只有这个消费者处理完一条消息了,消息队列才能给这个消费者分配下一条数据。这样的话,处理慢的消费者就会被分配较少的消息,从而实现“能者多劳”。
Fanout交换机
Fanout Exchange会将接收到的消息路由到每一个与其绑定的queue上,所以也叫广播模式。通过Fanout Exchange,就实现了发送者发送一个消息,多个消费者都能处理同一个消息的功能。
指定交换机的名称和类型。
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String message) { log.info("消费者1监听到fanout.queue1的消息:{}", message); } @RabbitListener(queues = "fanout.queue2") public void listenSimpleQueue2(String message) { log.info("消费者2监听到fanout.queue2的消息:{}", message); }
这是两个消费者监听不同队列。
注意形参和之前不一样了。
Direct交换机
这里的BindingKey和RoutingKey就像谍战片里的对暗号一样。不同的消费者的BindingKey是可以相同的,交换机都会路由到,类似于Fanout的效果。
这是交换机绑定队列,Routing Key就是在指定BindingKey,不能两个一起指定,只能一个一个指定。
这是发送者发送消息的API。
Topic交换机
基于Bean声明队列交换机
.to()后面还可以跟.with()来指定BindingKey。是在Consumer模块里编写配置类来声明的。
基于注解声明队列和交换机
消息转换器
可以看到,同样的消息,修改消息转换器之后的可读性强了很多。一样是把对象序列化成字符串,默认的那个就序列化出来的很丑,可读性差。