Spring Boot with RabbitMQ:四大核心模式指南

发布于:2025-07-30 ⋅ 阅读:(20) ⋅ 点赞:(0)

在现代分布式系统和微服务架构中,消息队列(Message Queue)是不可或缺的组件。它能实现服务间的异步通信、应用解耦和流量削峰。RabbitMQ 作为最受欢迎的消息队列中间件之一,以其稳定性、可靠性和灵活的路由模式而著称。

本文将带领你使用 Spring Boot (spring-boot-starter-amqp),通过清晰的代码实例和详尽的解释,深入理解并通过代码demo实践 RabbitMQ 的四种核心工作模式:

  • Work Queue (工作队列模式)
  • Direct Exchange (直连交换机模式)
  • Fanout Exchange (扇出/广播交换机模式)
  • Topic Exchange (主题交换机模式)
    java原生的操作方式请看这边

核心概念速览

在深入代码之前,我们先快速了解几个 RabbitMQ 的核心概念:

  • Producer (生产者):发送消息的一方。
  • Consumer (消费者):接收并处理消息的一方。
  • Queue (队列):存储消息的缓冲区,位于内存或磁盘。
  • Exchange (交换机):接收来自生产者的消息,并根据特定规则(类型和路由键)将消息路由到一个或多个队列。
  • Binding (绑定):建立 Exchange 和 Queue 之间的关联关系。
  • Routing Key (路由键):生产者在发送消息给 Exchange 时指定的“地址”或“标签”,Exchange 根据它来决定消息的去向。

模式一:Work Queue (工作队列)

这是最简单的模式,用于将一个耗时任务分发给多个消费者并行处理,从而提高整体处理效率。

特点:一个生产者将消息发送到一个特定队列,多个消费者共同监听这同一个队列。消息会以轮询(Round-Robin)的方式被分发给消费者,即一条消息只会被一个消费者处理。

1. 生产者配置

在 Work 模式下,我们只需要定义一个队列即可。消息将直接发送到这个队列。

// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
    // 为了方便管理,我们将队列、交换机、路由键的名称统一定义在常量类中
    public static final String SPRING_WORK_QUEUE = "spring.work.queue";

    @Bean
    public Queue workQueue() {
        // 创建一个持久化的队列
        return QueueBuilder.durable(SPRING_WORK_QUEUE).build();
    }
}

2. 生产者接口

我们创建一个接口,循环发送 10 条消息到工作队列。

// ProducerController.java
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/work")
    public String sendWorkMessages() {
        for (int i = 1; i <= 10; i++) {
            String message = "Hello, Spring Work Queue message " + i;
            // 第一个参数是交换机名,这里为空字符串表示使用默认交换机
            // 第二个参数是路由键,对于工作队列模式,通常就是队列名
            rabbitTemplate.convertAndSend("", RabbitMQConfig.SPRING_WORK_QUEUE, message);
        }
        return "10 work messages sent successfully.";
    }
}

3. 消费者代码

我们创建两个消费者,它们监听同一个 SPRING_WORK_QUEUE 队列,以模拟任务竞争。

// WorkListener.java
@Component
public class WorkListener {

    @RabbitListener(queues = RabbitMQConfig.SPRING_WORK_QUEUE)
    public void listen1(String message) throws InterruptedException {
        System.out.println("[Work Consumer 1] received: " + message);
        // 模拟耗时任务
        Thread.sleep(100); 
    }

    @RabbitListener(queues = RabbitMQConfig.SPRING_WORK_QUEUE)
    public void listen2(String message) throws InterruptedException {
        System.out.println("[Work Consumer 2] received: " + message);
        Thread.sleep(150);
    }
}

4. 测试与结果

访问 http://localhost:8080/producer/work。你会看到控制台交替打印输出来自两个消费者的日志,这表明 10 条消息被两个消费者“瓜分”了。

[Work Consumer 1] received: Hello, Spring Work Queue message 1
[Work Consumer 2] received: Hello, Spring Work Queue message 2
[Work Consumer 1] received: Hello, Spring Work Queue message 3
[Work Consumer 2] received: Hello, Spring Work Queue message 4
[Work Consumer 1] received: Hello, Spring Work Queue message 5
... (交替输出)

注意:默认情况下,分发策略是公平轮询。可以配置 prefetch 等参数实现更复杂的负载均衡。


模式二:Direct Exchange (直连交换机)

Direct Exchange 会将消息路由到 Routing Key 与 Binding Key 完全匹配的队列。这是一种精确的、点对点的路由方式。

特点:一对一路由。你可以将多个队列用不同的 Binding Key 绑定到同一个 Direct Exchange 上,实现消息的精确投递。

Direct Exchange 模型图

1. 生产者配置

我们定义一个 Direct Exchange,两个队列,以及三个绑定关系:

  • queue1 绑定 orange
  • queue2 绑定 black
  • queue2 也绑定 green
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
    // ... 其他常量
    public static final String SPRING_DIRECT_EXCHANGE = "spring.direct.exchange";
    public static final String SPRING_DIRECT_QUEUE_1 = "spring.direct.queue1";
    public static final String SPRING_DIRECT_QUEUE_2 = "spring.direct.queue2";

    // 声明 Direct Exchange
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(SPRING_DIRECT_EXCHANGE);
    }

    // 声明 Queue 1
    @Bean
    public Queue directQueue1() {
        return new Queue(SPRING_DIRECT_QUEUE_1);
    }

    // 声明 Queue 2
    @Bean
    public Queue directQueue2() {
        return new Queue(SPRING_DIRECT_QUEUE_2);
    }

    // 绑定关系1: queue1 -> exchange, with routingKey "orange"
    @Bean
    public Binding directBinding1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("orange");
    }
    
    // 绑定关系2: queue2 -> exchange, with routingKey "black"
    @Bean
    public Binding directBinding2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("black");
    }

    // 绑定关系3: queue2 -> exchange, with routingKey "green"
    @Bean
    public Binding directBinding3(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("green");
    }
}

2. 生产者接口

// ProducerController.java
// ...
@GetMapping("/direct")
public String sendDirectMessage(String routingKey) {
    String message = "Hello, Spring Direct Exchange with routingKey: " + routingKey;
    rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_DIRECT_EXCHANGE, routingKey, message);
    return "Direct message sent with routingKey: " + routingKey;
}

3. 消费者代码

// DirectListener.java
@Component
public class DirectListener {
    @RabbitListener(queues = RabbitMQConfig.SPRING_DIRECT_QUEUE_1)
    public void listenQueue1(String message) {
        System.out.println("[" + RabbitMQConfig.SPRING_DIRECT_QUEUE_1 + "] received: " + message);
    }

    @RabbitListener(queues = RabbitMQConfig.SPRING_DIRECT_QUEUE_2)
    public void listenQueue2(String message) {
        System.out.println("[" + RabbitMQConfig.SPRING_DIRECT_QUEUE_2 + "] received: " + message);
    }
}

4. 测试与结果

  • 发送 orange: http://localhost:8080/producer/direct?routingKey=orange

    • 输出: [spring.direct.queue1] received: Hello, Spring Direct Exchange with routingKey: orange
    • 分析: orange 精确匹配 directBinding1,消息进入 queue1
  • 发送 black: http://localhost:8080/producer/direct?routingKey=black

    • 输出: [spring.direct.queue2] received: Hello, Spring Direct Exchange with routingKey: black
    • 分析: black 精确匹配 directBinding2,消息进入 queue2
  • 发送 green: http://localhost:8080/producer/direct?routingKey=green

    • 输出: [spring.direct.queue2] received: Hello, Spring Direct Exchange with routingKey: green
    • 分析: green 精确匹配 directBinding3,消息同样进入 queue2
  • 发送 blue: http://localhost:8080/producer/direct?routingKey=blue

    • 输出: (无任何输出)
    • 分析: blue 路由键在所有绑定关系中都找不到匹配项,消息被交换机丢弃。

模式三:Fanout Exchange (扇出/广播)

Fanout Exchange 是最简单的交换机类型。它会忽略 Routing Key,将收到的所有消息广播给所有绑定到该交换机上的队列。

特点:一对多广播。适用于需要将同一消息通知给所有订阅者的场景,如系统通知、配置更新等。

1. 生产者配置

我们定义一个 Fanout Exchange 和两个队列,并将这两个队列都绑定到交换机上。
在这里插入图片描述

// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
    // ... 其他常量
    public static final String SPRING_FANOUT_EXCHANGE = "spring.fanout.exchange";
    public static final String SPRING_FANOUT_QUEUE_1 = "spring.fanout.queue1";
    public static final String SPRING_FANOUT_QUEUE_2 = "spring.fanout.queue2";

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(SPRING_FANOUT_EXCHANGE);
    }

    @Bean
    public Queue fanoutQueue1() {
        return new Queue(SPRING_FANOUT_QUEUE_1);
    }

    @Bean
    public Queue fanoutQueue2() {
        return new Queue(SPRING_FANOUT_QUEUE_2);
    }

    // 绑定 Queue1 到 Fanout Exchange
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    
    // 绑定 Queue2 到 Fanout Exchange
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

注意:Fanout 类型的绑定不需要 .with(routingKey)

2. 生产者接口

// ProducerController.java
// ...
@GetMapping("/fanout")
public String sendFanoutMessage() {
    String message = "Hello, this is a broadcast message!";
    // Fanout Exchange 忽略路由键,所以第二个参数可以为空字符串
    rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_FANOUT_EXCHANGE, "", message);
    return "Broadcast message sent successfully.";
}

3. 消费者代码

// FanoutListener.java
@Component
public class FanoutListener {
    @RabbitListener(queues = RabbitMQConfig.SPRING_FANOUT_QUEUE_1)
    public void listenQueue1(String message) {
        System.out.println("[" + RabbitMQConfig.SPRING_FANOUT_QUEUE_1 + "] received: " + message);
    }

    @RabbitListener(queues = RabbitMQConfig.SPRING_FANOUT_QUEUE_2)
    public void listenQueue2(String message) {
        System.out.println("[" + RabbitMQConfig.SPRING_FANOUT_QUEUE_2 + "] received: " + message);
    }
}

4. 测试与结果

访问 http://localhost:8080/producer/fanout

  • 输出:

    [spring.fanout.queue1] received: Hello, this is a broadcast message!
    [spring.fanout.queue2] received: Hello, this is a broadcast message!
    
  • 分析: 一条消息被成功广播到了所有绑定的队列,两个消费者都收到了同样的消息。


模式四:Topic Exchange (主题交换机)

Topic Exchange 是最灵活的交换机。它通过模式匹配来路由消息,Routing Key 是一个由点(.)分隔的单词列表,而 Binding Key 可以使用通配符。

  • * (星号): 匹配一个单词。
  • # (井号): 匹配零个或多个单词。

特点:灵活的、多对多的路由。非常适合用于实现基于内容的多维度订阅/发布系统。

![[Pasted image 20250705160511.png]]

1. 生产者配置

我们定义一个 Topic Exchange,两个队列,以及三个绑定关系,来演示通配符的用法:

  • queue1 绑定 *.orange.* (匹配中间是 orange 的三个单词的 key)
  • queue2 绑定 *.*.rabbit (匹配结尾是 rabbit 的三个单词的 key)
  • queue2 也绑定 lazy.# (匹配以 lazy. 开头的所有 key)
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
    // ... 其他常量
    public static final String SPRING_TOPIC_EXCHANGE = "spring.topic.exchange";
    public static final String SPRING_TOPIC_QUEUE_1 = "spring.topic.queue1";
    public static final String SPRING_TOPIC_QUEUE_2 = "spring.topic.queue2";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(SPRING_TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        return new Queue(SPRING_TOPIC_QUEUE_1);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue(SPRING_TOPIC_QUEUE_2);
    }

    // 绑定1: *.orange.*
    @Bean
    public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");
    }

    // 绑定2: *.*.rabbit
    @Bean
    public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");
    }

    // 绑定3: lazy.#
    @Bean
    public Binding topicBinding3(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("lazy.#");
    }
}

2. 生产者接口

// ProducerController.java
// ...
@GetMapping("/topic")
public String sendTopicMessage(String routingKey) {
    String message = "Hello, Spring Topic Exchange with routingKey: " + routingKey;
    rabbitTemplate.convertAndSend(RabbitMQConfig.SPRING_TOPIC_EXCHANGE, routingKey, message);
    return "Topic message sent with routingKey: " + routingKey;
}

3. 消费者代码

// TopicListener.java
@Component
public class TopicListener {
    @RabbitListener(queues = RabbitMQConfig.SPRING_TOPIC_QUEUE_1)
    public void listenQueue1(String message) {
        System.out.println("[" + RabbitMQConfig.SPRING_TOPIC_QUEUE_1 + "] received: " + message);
    }

    @RabbitListener(queues = RabbitMQConfig.SPRING_TOPIC_QUEUE_2)
    public void listenQueue2(String message) {
        System.out.println("[" + RabbitMQConfig.SPRING_TOPIC_QUEUE_2 + "] received: " + message);
    }
}

4. 测试与结果

  • 发送 quick.orange.rabbit: http://localhost:8080/producer/topic?routingKey=quick.orange.rabbit

    • 输出:
[spring.topic.queue1] received: Hello, Spring Topic Exchange with routingKey: quick.orange.rabbit
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: quick.orange.rabbit
  • 分析: quick.orange.rabbit 同时匹配 *.orange.* (queue1) 和 *.*.rabbit (queue2),所以两个队列都收到了消息。

  • 发送 lazy.orange.elephant: http://localhost:8080/producer/topic?routingKey=lazy.orange.elephant

    • 输出:
[spring.topic.queue1] received: Hello, Spring Topic Exchange with routingKey: lazy.orange.elephant
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: lazy.orange.elephant
  • 分析: lazy.orange.elephant 同时匹配 *.orange.* (queue1) 和 lazy.# (queue2)。

  • 发送 quick.brown.fox: http://localhost:8080/producer/topic?routingKey=quick.brown.fox

    • 输出: (无任何输出)
    • 分析: 该 routing key 不匹配任何绑定规则,消息被丢弃。
  • 发送 lazy.fox: http://localhost:8080/producer/topic?routingKey=lazy.fox

    • 输出:
[spring.topic.queue2] received: Hello, Spring Topic Exchange with routingKey: lazy.fox
  • 分析: 该 routing key 仅匹配 lazy.# (queue2)。

总结:如何选择合适的模式?

模式 交换机类型 Routing Key 核心特点 适用场景
Work Queue 默认 (空字符串) 必须是队列名 任务分发,竞争消费 耗时任务处理、资源密集型操作,如视频转码、日志分析。
Direct Direct 精确匹配 点对点精确路由 需要将消息准确发送到特定处理者的场景,如按地区、按类型分发任务。
Fanout Fanout 忽略 广播 向所有订阅者发送相同消息,如系统通知、配置更新、实时聊天室。
Topic Topic 通配符模式匹配 灵活的、多维度的订阅/发布 基于内容的多条件订阅,如新闻系统(*.sports.basketball)、日志系统(error.critical.#)。

网站公告

今日签到

点亮在社区的每一天
去签到