【RabbitMQ】整合 SpringBoot,实现工作队列、发布/订阅、路由和通配符模式

发布于:2025-05-18 ⋅ 阅读:(26) ⋅ 点赞:(0)

工作队列模式

引入依赖

我们在创建 SpringBoot 项目的时候,选上这两个依赖即可 |380

或者在依赖中加入

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

配置

将配置文件后缀改成 yml 之后,进行配置image.png|372

#配置 RabbitMQ 的基本信息
spring:
  rabbitmq:  
    host: 127.0.0.1 #RabbitMQ 服务器的地址  
    port: 15673  #RabbitMQ的TCP协议的端口号,而不是管理平台的端口号。默认为5672  
    username: guest  
    password: guest  
    virtual-host: coding #默认为 /

或者这样写

spring:
  rabbitmq:
  	addresses: amqp://guest:guest@127.0.0.1:5672/coding
  • 格式为: amqp://username:password@ip:port/virtual-host

声明

注意引入的是这个包
image.png

package org.example.rabbitmq.config;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.core.QueueBuilder;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
@Configuration  
public class RabbitMQConfig {  
    // 声明一个队列,来自第三方包,就是一个对象  
    @Bean("workQueue")  
    public Queue workQueue(){  
        return QueueBuilder.durable(Constants.WORK_QUEUE).build();  
    }  
}

生产者代码

package org.example.rabbitmq.controller;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
@RequestMapping("/producer")  
public class ProducerController {  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    @RequestMapping("/work")  
    public String work() {  
        // 使用内置交换机的话,RoutingKey 和队列名称一致  
        rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work...");  
        return "发送成功";  
    }  
}
  • 在运行程序之后,队列不会被立马创建出来
  • 需要发送消息之后才会被创建image.png|278

消费者代码

消费者是通过实现一个监听类,来监听有没有消息

  • 采用一个注解—— @RabbitListener

@RabbitListenerSpring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息。

  • 该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息
  • 以下是一些常用的参数类型:
    • String:返回消息的内容
    • Message (org.spring.framework.ampq.core.Message):Spring AMPQMessage 类,返回原始的消息体以及消息的属性,如消息 ID,内容,队列信息等
    • Channel (com.rabbitmq.client.Channel):RabbitMQ 的通道对象,可以用于进行高级的操作,如手动确认消息
package org.example.rabbitmq.listener;  
  
import org.apache.logging.log4j.message.Message;  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
@Component  
public class WorkListener {  
    @RabbitListener(queues = Constants.WORK_QUEUE)  
    public void queueListener1(Message message) {  
        System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);  
    }  
  
    @RabbitListener(queues = Constants.WORK_QUEUE)  
    public void queueListener2(String message) {  
        System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);  
    }  
}

发布/订阅模式

在发布/订阅模式中,多了一个 Exchange 角色。Exchange 常见有三种类型,分别代表不同的路由规则

  • Fanout: 广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe 模式)
  • Direct: 定向,把消息交给符合指定 Routing Key 的队列(Routing 模式)
  • Topic: 通配符,把消息交给符合 Routing pattern (路由模式) 的队列(Topics 模式)

引入依赖

我们在创建 SpringBoot 项目的时候,选上这两个依赖即可 |380

或者在依赖中加入

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

声明

package org.example.rabbitmq.config;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.core.*;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
@Configuration  
public class RabbitMQConfig {  
    /**  
     * 二、发布/订阅模式  
     * 声明队列、声明交换机、声明队列和交换机的绑定  
     * @return  
     */  
    @Bean("fanoutQueue1")  
    // @Bean注解:交给Spring进行管理, 括号里面是指定名称  
    public Queue fanoutQueue1() {  
        return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();  
    }  
  
    @Bean("fanoutQueue2")  
    public Queue fanoutQueue2() {  
        return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();  
    }  
  
    @Bean("fanoutExchange")  
    // 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange  
    public FanoutExchange fanoutExchange() {  
        return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();  
    }  
  
    @Bean("fanoutQueueBinding1")  
    public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {  
        return BindingBuilder.bind(queue).to(fanoutExchange);  
    }  
  
    @Bean("fanoutQueueBinding2")  
    public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) {  
        return BindingBuilder.bind(queue).to(fanoutExchange);  
    }  
}

生产者代码

image.png|276

  1. 声明队列
  2. 声明交换机
  3. 声明交换机和队列的绑定
  4. 发送消息

发送消息

package org.example.rabbitmq.controller;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
@RequestMapping("/producer")  
public class ProducerController {  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    @RequestMapping("/fanout")  
    public String fanout() {  
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout...");  
        return "发送成功";  
    }  
}

消费者代码

package org.example.rabbitmq.listener;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
@Component  
public class FanoutListener {  
    @RabbitListener(queues = Constants.FANOUT_QUEUE1)  
    public void queueListener1(String message) {  
        System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message);  
    }  
  
    @RabbitListener(queues = Constants.FANOUT_QUEUE2)  
    public void queueListener2(String message) {  
        System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息:" + message);  
    }  
}

运行程序

  1. 运行项目,调用接口发送消息
    • http://127.0.0.1:8080/producer/fanout
    • image.png

image.png

  1. 监听类收到消息,并打印
    image.png

路由模式

交换机类型为 Direct 时,会把消息交给符合指定 Routing Key 的队列

  • 队列和交换机的绑定,不是任意的绑定了,而是要制定一个 RoutingKey(路由 key
  • 消息的发送方在向 Exchange 发送消息时,也需要指定消息的 RoutingKey
  • Exchange 也不再把消息交给每一个绑定的 key,而是根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 和消息的 RoutingKey 完全一致,才会接收消息

image.png|315

声明

按照这个图片,进行绑定image.png|385

/**  
 * 三、 路由模式  
 * 声明队列、声明交换机、声明队列和交换机的绑定  
 * @return  
 */  
@Bean("directQueue1")  
public Queue directQueue1(){  
    return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();  
}  
  
@Bean("directQueue2")  
public Queue directQueue2(){  
    return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();  
}  
  
@Bean("directExchange")  
// 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange  
public DirectExchange directExchange() {  
    return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();  
}  
  
@Bean("directQueueBinding1")  
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {  
    return BindingBuilder.bind(queue).to(directExchange).with("a");  
}  
  
@Bean("directQueueBinding2")  
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  
    return BindingBuilder.bind(queue).to(directExchange).with("a");  
}  
  
@Bean("directQueueBinding3")  
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  
    return BindingBuilder.bind(queue).to(directExchange).with("b");  
}  
  
@Bean("directQueueBinding4")  
public Binding directQueueBinding4(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  
    return BindingBuilder.bind(queue).to(directExchange).with("c");  
}

生产者代码

package org.example.rabbitmq.controller;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
@RequestMapping("/producer")  
public class ProducerController {  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
    /**  
     * 三、路由模式  
     * @param routingKey  
     * @return  
     */  
    @RequestMapping("/direct/{routingKey}")  
    //从路径中拿到这个routingKey  
    public String direct(@PathVariable("routingKey") String routingKey) {  
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey,"hello spring amqp:direct, my routing key is" + routingKey);  
        return "发送成功";  
    }  
}

消费者代码

package org.example.rabbitmq.listener;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
@Component  
public class DirectListener {  
    @RabbitListener(queues = Constants.DIRECT_QUEUE1)  
    public void queueListener1(String message) {  
        System.out.println("队列[" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message);  
    }  
  
    @RabbitListener(queues = Constants.DIRECT_QUEUE2)  
    public void queueListener2(String message) {  
        System.out.println("队列[" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message);  
    }  
}

运行程序

  1. 运行项目

  2. 调用接口发送 routingKeya 的消息

    • http://127.0.0.1:8080/producer/direct/a
    • 观察后端日志,队列 1 和 2 都收到消息 image.png
  3. 调用接口发送 routingKeyb 的消息

    • http://127.0.0.1:8080/producer/direct/b
    • 观察后端日志,队列 2 收到消息image.png|347
  4. 调用接口发送 routingKeyc 的消息

    • http://127.0.0.1:8080/producer/direct/c
    • 观察后端日志,队列 2 收到消息|372

通配符模式

TopicsRouting 模式的区别是:

  1. topics 模式使用的交换机类型为 topicRouting 模式使用的是 direct
  2. topic 类型的交换机在匹配规则上进行了扩展,Binding Key 支持通配符匹配

image.png|419

  • * 表示一个单词
  • # 表示多个单词

声明

/**  
 * 四、通配符模式  
 * 声明队列、声明交换机、声明队列和交换机的绑定  
 * @return  
 */  
@Bean("topicQueue1")  
public Queue topicQueue1(){  
    return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();  
}  
  
@Bean("topicQueue2")  
public Queue topicQueue2(){  
    return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();  
}  
  
@Bean("topicExchange")  
public TopicExchange topicExchange() {  
    return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();  
}  
  
@Bean("topicQueueBinding1")  
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {  
    return BindingBuilder.bind(queue).to(topicExchange()).with("*.a.*");  
}  
  
@Bean("topicQueueBinding2")  
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {  
    return BindingBuilder.bind(queue).to(topicExchange()).with("*.*.b");  
}  
  
@Bean("topicQueueBinding3")  
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {  
    return BindingBuilder.bind(queue).to(topicExchange()).with("c.#");  
}

生产者代码

package org.example.rabbitmq.controller;  
  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
@RequestMapping("/producer")  
public class ProducerController {  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    
    /**  
     * 四、通配符模式  
     * @param routingKey  
     * @return  
     */  
    @RequestMapping("/topic/{routingKey}")  
    public String topic(@PathVariable("routingKey") String routingKey) {  
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is " + routingKey);  
        return "发送成功";  
    }  
}

消费者代码

运行程序

  1. 运行程序

  2. 调用接口发送 routingKeyqqq.a.b 的消息

    • http://127.0.0.1:8080/producer/topic/qqq.a.b
    • 观察后端日志,队列 1 和队列 2 均收到消息image.png|435
  3. 调用接口发送 routingKeyc.abc.fff 的消息

    • http://127.0.0.1:8080/producer/topic/c.abc.fff
    • 观察后端日志,队列 2 收到信息image.png
  4. 调用接口发送 routingKeyg.h.j 的消息

    • http://127.0.0.1:8080/producer/topic/g.h.j
    • 观察后端日志,没有队列收到消息

网站公告

今日签到

点亮在社区的每一天
去签到