RabbitMQ快速入门
1. docker安装RabbitMQ
rabbitMQ docker镜像官网:https://hub.docker.com/_/rabbitmq/
SpringBoot整合RabbitMQ参考网站:https://docs.spring.io/spring-boot/3.3/reference/messaging/amqp.html
docker安装rabbitMQ
docker run -d --name rabbitmq --restart=always -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5672:5672 -p 15672:15672 rabbitmq:3.13.7-management
RabbitMQ结构和概念
RabbitMQ中的几个概念:
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源进行逻辑分组
基本消息队列消息发送流程:
- 建立connection;
- 创建channel;
- 利用channel声明队列;
- 利用channel向队列发送消息
基本消息队列消息接收流程:
- 建立connection;
- 创建channel;
- 利用channel声明队列;
- 定义consumer的消费行为handleDelivery();
- 利用channel将消费者与队列绑定
2. 简单队列模型
简单队列模型角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接收并缓存消息
- consumer:订阅队列,处理队列中的消息
SpringBoot整合RabbitMQ(生产者)
- 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置application.yml
rabbitmq:
host: 192.168.xx.xx
port: 5672
username: admin
password: admin
virtual-host: /
- 创建RabbitMQ配置类
@Configuration
public class RabbitMQConfig {
@Bean()
public Queue simpleQueue(){
return new Queue("simple_queue",true);
}
}
对于生产者,当队列不存在时或消息发送时,均会报错。未防止误判,需提前创建Queue的Bean,若该队列不存在,则会自动创建,若存在,则不创建。
- 编写消息发送类
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Service
public class SimpleQueueService {
@Resource
private RabbitTemplate rabbitTemplate;
public Boolean send(String msg){
rabbitTemplate.convertAndSend("simple_queue",msg);
return true;
}
}
消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleConsumerService {
@RabbitListener(queues = "simple_queue")
public void receive(String msg){
System.out.println("处理消息:"+msg);
}
}
添加@RabbitListener注解的方法即为消费者处理函数,queues参数可以为数组(多个队列时),函数的参数类型与发送者的类型一致
3. 工作队列模型
consumer1和consumer2为合作关系,一起处理queue中的消息。作用:提高消息的处理速度,避免消息的堆积
生产者
public Boolean send(String msg){
for (int i=0;i<50;i++){
rabbitTemplate.convertAndSend("simple_queue","message "+i);
}
return true;
}
消费者
@Component
public class SimpleConsumerService {
@RabbitListener(queues = "simple_queue")
public void receiver1(String msg) throws InterruptedException {
System.out.println("队列1处理消息:"+msg);
Thread.sleep(10);
}
@RabbitListener(queues = "simple_queue")
public void receiver2(String msg) throws InterruptedException {
System.out.println("队列2处理消息:"+msg);
Thread.sleep(20);
}
}
默认情况下,队列会以轮询的方式将消息均分给消费者,如果想实现消费者按需处理,可设置prefetch的值
spring:
application:
name: hello-world
rabbitmq:
host: 192.168.5.3
port: 5672
username: admin
password: admin
virtual-host: /
listener:
simple:
prefetch: 1 #工作队列的消费者每次可取1条消息,处理完成后再取下一条
4. 发布订阅模型
简单队列模型和工作队列模型只有一个队列,消息只能提交给一个消费者处理。发布订阅模型通过交换机(exchange)可实现将消息推送到多个队列,交给多个消费者处理。
常见的exchange类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
exchange只负责消息的转发,而不是存储,路由失败则消息丢失
4.1 Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个与其绑定的queue
案例
- 在Consumer服务中添加配置类,声明FanoutExchange和队列Queue,同时绑定FanoutExchange和Queue
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 声明exchange交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout");
}
// 声明队列1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1",true);
}
// 声明队列2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2",true);
}
// 绑定队列1和交换机
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 绑定队列2和交换机
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
- 编写消费者
public class SimpleConsumerService {
@RabbitListener(queues = "fanout.queue1")
public void receiver1(String msg) throws InterruptedException {
System.out.println("队列1处理消息:"+msg);
Thread.sleep(10);
}
@RabbitListener(queues = "fanout.queue2")
public void receiver2(String msg) throws InterruptedException {
System.out.println("队列2处理消息:"+msg);
Thread.sleep(20);
}
}
3.编写生产者
@Service
public class SimpleQueueService {
@Resource
private RabbitTemplate rabbitTemplate;
public Boolean send(String msg){
// rabbitTemplate.convertAndSend("simple_queue",msg);
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend("fanout","","message "+i);
}
return true;
}
}
4.2 DirectExchange
DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模型。
- 每一个Queue都与Exchange设置一个BindingKey(Exchange和Queue默认BindingKey为交换机或Queue名称)
- 生产者发布消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例
- 在consumer服务中声明两个消费者,分别监听queue1和queue2,并利用@RabbitListener声明Exchange,Queue、RoutingKey
@Component
public class SimpleConsumerService {
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"})) //key为数组,为Queue设置的RoutingKey
public void receiver1(String msg) throws InterruptedException {
System.out.println("队列1处理消息:"+msg);
Thread.sleep(10);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"})) //key为数组,为Queue设置的RoutingKey
public void receiver2(String msg) throws InterruptedException {
System.out.println("队列2处理消息:"+msg);
Thread.sleep(20);
}
}
- 编写生产者
@Service
public class SimpleQueueService {
@Resource
private RabbitTemplate rabbitTemplate;
public Boolean send(String msg){
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend("direct","blue","message "+i);
}
return true;
}
}
4.3 TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以==.==分割
Queue与Exchange指定BingKey时可以使用通配符:
#:代指0个或者多个单词
*:代指一个单词
案例
- 消费者
@Component
public class SimpleConsumerService {
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
key = "hunan.#"))
public void receiver1(String msg) throws InterruptedException {
System.out.println("队列1处理消息:"+msg);
Thread.sleep(10);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),
key = "#.news"))
public void receiver2(String msg) throws InterruptedException {
System.out.println("队列2处理消息:"+msg);
Thread.sleep(20);
}
}
- 生产者
public Boolean send(String msg){
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend("topic","hunan.flowers","message "+i);
}
return true;
}
5. SpringAMQP消息转化器
Spring的消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
- 在publisher服务中引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.11.3</version>
</dependency>
- 在publisher服务中声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}