RabbitMQ:基于SpringAMQP声明队列与交换机并配置消息转换器(三)

发布于:2025-08-04 ⋅ 阅读:(12) ⋅ 点赞:(0)

一、声明队列和交换机

1.1介绍

        在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。​

        因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

1.2基本API 

SpringAMQP提供了一个Queue类,用来创建队列:​

SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:​

 

​我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

 

 而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

 

1.3 fanout示例 

在consumer中创建一个config配置类,声明队列和交换机: 

 

package com.itheima.consumer.config;​
​
import org.springframework.amqp.core.Binding;​
import org.springframework.amqp.core.BindingBuilder;​
import org.springframework.amqp.core.FanoutExchange;​
import org.springframework.amqp.core.Queue;​
import org.springframework.context.annotation.Bean;​
import org.springframework.context.annotation.Configuration;​
​
@Configuration​
public class FanoutConfig {​
    /**​
     * 声明交换机​
     * @return Fanout类型交换机​
     */​
    @Bean​
    public FanoutExchange fanoutExchange(){​
        return new FanoutExchange("hmall.fanout");​
    }​
​
    /**​
     * 第1个队列​
     */​
    @Bean​
    }​
}

1.4 direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

package com.itheima.consumer.config;​
​
import org.springframework.amqp.core.*;​
import org.springframework.context.annotation.Bean;​
import org.springframework.context.annotation.Configuration;​
​
@Configuration​
public class DirectConfig {​
​
    /**​
     * 声明交换机​
     * @return Direct类型交换机​
     */​
    @Bean​
    public DirectExchange directExchange(){​
        return ExchangeBuilder.directExchange("hmall.direct").build();​
    }​
​
    /**​
     * 第1个队列​
     */​
    @Bean​
    public Queue directQueue1(){​
        return new Queue("direct.queue1");​
    }​
    }​
}

 1.5 基于注解声明

        1.5.1 基本用法

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。 

例如,我们同样声明Direct模式的交换机和队列: 

 

@RabbitListener(bindings = @QueueBinding(​
    value = @Queue(name = "direct.queue1"),​
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),​
    key = {"red", "blue"}​
))​
public void listenDirectQueue1(String msg){​
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");​
}​
​
@RabbitListener(bindings = @QueueBinding(​
    value = @Queue(name = "direct.queue2"),​
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),​
    key = {"red", "yellow"}​
))​
public void listenDirectQueue2(String msg){​
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");​
}

是不是简单多了。​

再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(​
    value = @Queue(name = "topic.queue1"),​
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),​
    key = "china.#"​
))​
public void listenTopicQueue1(String msg){​
    System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");​
}​
​
@RabbitListener(bindings = @QueueBinding(​
    value = @Queue(name = "topic.queue2"),​
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),​
    key = "#.news"​
))​
public void listenTopicQueue2(String msg){​
    System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");​
}
        1.5.2 注意⚠️

如果交换机或队列不存在,Spring AMQP会自动创建它们。

如果交换机(Exchange)不存在,Spring AMQP会根据注解中的配置(如名称和类型)自动创建交换机。

如果队列(Queue)不存在,Spring AMQP会根据注解中的配置(如名称)自动创建队列。

如果绑定关系(Binding)不存在,Spring AMQP会根据注解中的配置(如路由键)自动建立绑定关系。

如果交换机或队列已经存在,不会更改已存在的交换机或队列。

如果交换机或队列已经存在,并且它们的配置(如名称、类型等)与注解中的配置一致,Spring AMQP不会对它们进行任何更改。

如果交换机或队列的配置与注解中的配置不一致(例如,交换机类型不同或队列的持久性设置不同),RabbitMQ会抛出错误,导致声明失败。

 二、消息转换器

2.1简介 

Spring的消息发送代码接收的消息体是一个Object: 

 

        而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 

        只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题: 

  • 数据体积过大​
  • 有安全漏洞​
  • 可读性差​

​2.2 测试默认转换器

1)创建测试队列​

首先,我们在consumer服务中声明一个新的配置类:

 

 

利用@Bean的方式创建一个队列,​

具体代码:

 

package com.itheima.consumer.config;​
​
import org.springframework.amqp.core.Queue;​
import org.springframework.context.annotation.Bean;​
import org.springframework.context.annotation.Configuration;​
​
@Configuration​
public class MessageConfig {​
​
    @Bean​
    public Queue objectQueue() {​
        return new Queue("object.queue");​
    }​
}

 

注意,这里我们先不要给这个队列添加消费者,我们要查看消息体的格式。​

重启consumer服务以后,该队列就会被自动创建出来了:

 

2)发送消息​

我们在publisher模块的SpringAmqpTest中新增一个消息发送的代码,发送一个Map对象:

@Test​
public void testSendMap() throws InterruptedException {​
    // 准备消息​
    Map<String,Object> msg = new HashMap<>();​
    msg.put("name", "柳岩");​
    msg.put("age", 21);​
    // 发送消息​
    rabbitTemplate.convertAndSend("object.queue", msg);​
}

 发送消息后查看控制台:

 

可以看到消息格式非常不友好。​

 2.3 配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。 

在publisher和consumer两个服务中都引入依赖: 

<dependency>​
    <groupId>com.fasterxml.jackson.dataformat</groupId>​
    <artifactId>jackson-dataformat-xml</artifactId>​
    <version>2.9.10</version>​
</dependency>

 注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可: 

 

@Bean​
public MessageConverter messageConverter(){​
    // 1.定义消息转换器​
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();​
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息​
    jackson2JsonMessageConverter.setCreateMessageIds(true);​
    return jackson2JsonMessageConverter;​
}

​消息转换器中添加的messageId可以便于我们将来做幂等性判断。
 

此时,我们到MQ控制台删除object.queue中的旧的消息(在Purge选项卡中清除消息)。然后再次执行刚才的消息发送的代码,到MQ的控制台查看消息结构: 

 

 与之前相比,可以发现,消息编码为utf-8 可以显示,并且存储大小只有26bytes


网站公告

今日签到

点亮在社区的每一天
去签到