文章目录
工作队列模式
引入依赖
我们在创建 SpringBoot
项目的时候,选上这两个依赖即可
或者在依赖中加入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
将配置文件后缀改成 yml
之后,进行配置
#配置 RabbitMQ 的基本信息
spring:
rabbitmq:
host: 127.0.0.1 #RabbitMQ 服务器的地址
port: 15673 #RabbitMQ的TCP协议的端口号,而不是管理平台的端口号。默认为5672
username: guest
password: guest
virtual-host: coding #默认为 /
或者这样写
spring:
rabbitmq:
addresses: amqp://guest:guest@127.0.0.1:5672/coding
- 格式为:
amqp://username:password@ip:port/virtual-host
声明
注意引入的是这个包
package org.example.rabbitmq.config;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 声明一个队列,来自第三方包,就是一个对象
@Bean("workQueue")
public Queue workQueue(){
return QueueBuilder.durable(Constants.WORK_QUEUE).build();
}
}
生产者代码
package org.example.rabbitmq.controller;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work() {
// 使用内置交换机的话,RoutingKey 和队列名称一致
rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work...");
return "发送成功";
}
}
- 在运行程序之后,队列不会被立马创建出来
- 需要发送消息之后才会被创建
消费者代码
消费者是通过实现一个监听类,来监听有没有消息
- 采用一个注解——
@RabbitListener
@RabbitListener
是Spring
框架中用于监听RabbitMQ
队列的注解,通过使用这个注解,可以定义一个方法,以便从RabbitMQ
队列中接收消息。
- 该注解支持多种参数类型,这些参数类型代表了从
RabbitMQ
接收到的消息和相关信息- 以下是一些常用的参数类型:
String
:返回消息的内容Message
(org.spring.framework.ampq.core.Message
):Spring AMPQ
的Message
类,返回原始的消息体以及消息的属性,如消息ID
,内容,队列信息等Channel
(com.rabbitmq.client.Channel
):RabbitMQ
的通道对象,可以用于进行高级的操作,如手动确认消息
package org.example.rabbitmq.listener;
import org.apache.logging.log4j.message.Message;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkListener {
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener1(Message message) {
System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);
}
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(String message) {
System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);
}
}
发布/订阅模式
在发布/订阅模式中,多了一个 Exchange
角色。Exchange
常见有三种类型,分别代表不同的路由规则
Fanout
: 广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe
模式)Direct
: 定向,把消息交给符合指定Routing Key
的队列(Routing
模式)Topic
: 通配符,把消息交给符合Routing pattern
(路由模式) 的队列(Topics
模式)
引入依赖
我们在创建 SpringBoot
项目的时候,选上这两个依赖即可
或者在依赖中加入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
声明
package org.example.rabbitmq.config;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/**
* 二、发布/订阅模式
* 声明队列、声明交换机、声明队列和交换机的绑定
* @return
*/
@Bean("fanoutQueue1")
// @Bean注解:交给Spring进行管理, 括号里面是指定名称
public Queue fanoutQueue1() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
@Bean("fanoutExchange")
// 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
}
@Bean("fanoutQueueBinding1")
public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean("fanoutQueueBinding2")
public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
生产者代码
- 声明队列
- 声明交换机
- 声明交换机和队列的绑定
- 发送消息
发送消息
package org.example.rabbitmq.controller;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/fanout")
public String fanout() {
rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout...");
return "发送成功";
}
}
消费者代码
package org.example.rabbitmq.listener;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutListener {
@RabbitListener(queues = Constants.FANOUT_QUEUE1)
public void queueListener1(String message) {
System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message);
}
@RabbitListener(queues = Constants.FANOUT_QUEUE2)
public void queueListener2(String message) {
System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息:" + message);
}
}
运行程序
- 运行项目,调用接口发送消息
- http://127.0.0.1:8080/producer/fanout
- 监听类收到消息,并打印
路由模式
交换机类型为 Direct
时,会把消息交给符合指定 Routing Key
的队列
- 队列和交换机的绑定,不是任意的绑定了,而是要制定一个
RoutingKey
(路由key
) - 消息的发送方在向
Exchange
发送消息时,也需要指定消息的RoutingKey
Exchange
也不再把消息交给每一个绑定的key
,而是根据消息的RoutingKey
进行判断,只有队列的RoutingKey
和消息的RoutingKey
完全一致,才会接收消息
声明
按照这个图片,进行绑定
/**
* 三、 路由模式
* 声明队列、声明交换机、声明队列和交换机的绑定
* @return
*/
@Bean("directQueue1")
public Queue directQueue1(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue directQueue2(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
@Bean("directExchange")
// 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
}
@Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("a");
}
@Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("a");
}
@Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("b");
}
@Bean("directQueueBinding4")
public Binding directQueueBinding4(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("c");
}
生产者代码
package org.example.rabbitmq.controller;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 三、路由模式
* @param routingKey
* @return
*/
@RequestMapping("/direct/{routingKey}")
//从路径中拿到这个routingKey
public String direct(@PathVariable("routingKey") String routingKey) {
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey,"hello spring amqp:direct, my routing key is" + routingKey);
return "发送成功";
}
}
消费者代码
package org.example.rabbitmq.listener;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectListener {
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void queueListener1(String message) {
System.out.println("队列[" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message);
}
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void queueListener2(String message) {
System.out.println("队列[" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message);
}
}
运行程序
运行项目
调用接口发送
routingKey
为a
的消息- http://127.0.0.1:8080/producer/direct/a
- 观察后端日志,队列 1 和 2 都收到消息
调用接口发送
routingKey
为b
的消息- http://127.0.0.1:8080/producer/direct/b
- 观察后端日志,队列 2 收到消息
调用接口发送
routingKey
为c
的消息- http://127.0.0.1:8080/producer/direct/c
- 观察后端日志,队列 2 收到消息
通配符模式
Topics
和 Routing
模式的区别是:
topics
模式使用的交换机类型为topic
(Routing
模式使用的是direct
)topic
类型的交换机在匹配规则上进行了扩展,Binding Key
支持通配符匹配
*
表示一个单词#
表示多个单词
声明
/**
* 四、通配符模式
* 声明队列、声明交换机、声明队列和交换机的绑定
* @return
*/
@Bean("topicQueue1")
public Queue topicQueue1(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
@Bean("topicExchange")
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
}
@Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange()).with("*.a.*");
}
@Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange()).with("*.*.b");
}
@Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(topicExchange()).with("c.#");
}
生产者代码
package org.example.rabbitmq.controller;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 四、通配符模式
* @param routingKey
* @return
*/
@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable("routingKey") String routingKey) {
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is " + routingKey);
return "发送成功";
}
}
消费者代码
运行程序
运行程序
调用接口发送
routingKey
为qqq.a.b
的消息- http://127.0.0.1:8080/producer/topic/qqq.a.b
- 观察后端日志,队列 1 和队列 2 均收到消息
调用接口发送
routingKey
为c.abc.fff
的消息- http://127.0.0.1:8080/producer/topic/c.abc.fff
- 观察后端日志,队列 2 收到信息
调用接口发送
routingKey
为g.h.j
的消息- http://127.0.0.1:8080/producer/topic/g.h.j
- 观察后端日志,没有队列收到消息