RabbitMQ 声明队列和交换机详解

发布于:2025-08-12 ⋅ 阅读:(14) ⋅ 点赞:(0)

RabbitMQ 声明队列和交换机详解

一、为什么需要声明队列和交换机?

RabbitMQ先声明再使用的机制:

  • 队列负责存储消息
  • 交换机负责路由消息
  • 绑定关系决定消息从交换机到队列的路径

如果没有事先声明:

  • 队列/交换机不存在时,发送消息会失败
  • 队列没有绑定到交换机时,消息会丢失(除非设置了备用交换机)

二、声明交换机(Exchange)

2.1 交换机参数说明

RabbitMQ 提供四种类型(direct、fanout、topic、headers),声明时需要指定以下参数:

参数 类型 说明
name String 交换机名称(不为空字符串)
type String 类型:directfanouttopicheaders
durable boolean 是否持久化(重启 RabbitMQ 后仍存在)
autoDelete boolean 是否自动删除(最后一个队列解绑后删除)
arguments Map 额外参数(如 TTL、死信交换机配置)

2.2 Java 原生声明交换机

channel.exchangeDeclare(
    "my.direct.exchange", // 交换机名称
    BuiltinExchangeType.DIRECT, // 类型
    true,  // durable
    false, // autoDelete
    null   // arguments
);

2.3 Spring AMQP 声明交换机

@Bean
public DirectExchange directExchange() {
    return new DirectExchange("my.direct.exchange", true, false);
}

Spring 会自动在应用启动时向 RabbitMQ 发送声明请求。

三、声明队列(Queue)

3.1 队列参数说明

参数 类型 说明
name String 队列名称(匿名队列可由 RabbitMQ 自动生成)
durable boolean 是否持久化(消息是否持久化取决于发送时的 deliveryMode
exclusive boolean 是否排他队列(仅连接可见,断开即删除)
autoDelete boolean 是否自动删除(最后一个消费者断开时删除)
arguments Map 额外参数(TTL、死信队列、最大长度等)

3.2 Java 原生声明队列

channel.queueDeclare(
    "my.queue", // 队列名称
    true,       // durable
    false,      // exclusive
    false,      // autoDelete
    null        // arguments
);

3.3 Spring AMQP 声明队列

@Bean
public Queue myQueue() {
    return new Queue("my.queue", true, false, false);
}

四、绑定交换机和队列

4.1 Java 原生绑定

channel.queueBind(
    "my.queue",           // 队列名称
    "my.direct.exchange", // 交换机名称
    "order.create"        // routingKey
);

4.2 Spring AMQP 绑定

@Bean
public Binding binding() {
    return BindingBuilder
        .bind(myQueue())
        .to(directExchange())
        .with("order.create");
}

五、实战示例

5.2 fanout示例

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

5.3 direct示例

package com.itheima.consumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    /**
     * 声明交换机
     * @return Direct类型交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

六、基于注解声明

  • Direct模式
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
  • Topic模式
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}