RabbitMQ

发布于:2025-09-09 ⋅ 阅读:(21) ⋅ 点赞:(0)

RabbitMQ

workqueues模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,这样显然是有问题的。

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

交换机

在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

Fanout交换机

Fanout 广播模式 消息交给所有绑定到交换机的队列 它有以下几个特点

  • 可以有多个队列
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机
  • 交换机把消息发送给绑定过的所有队列
  • 订阅队列的消费者都能拿到消息

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • :匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

用Java代码声明交换机

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

基本api

SpringAMQP提供了一个Queue类 用来创建队列


public class Queue extends AbstractQueue inplements Clonable { }

springAMQP还提供了exchange接口 表示不同的交换机 我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程 而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象

1. Queue

在 RabbitMQ 里,队列是最基础的消息存储和转发容器。

Spring AMQP 提供了 org.springframework.amqp.core.Queue 类,直接用来在 Spring 环境中声明和管理队列。

Queue queue = new Queue("simple.queue", true);

这个类本质上就是对 RabbitMQ 队列的一个封装对象,它的构造参数里可以控制:

  • 队列名simple.queue
  • 是否持久化true 持久化到磁盘,false 内存队列)
  • 是否自动删除
  • 是否排他

Spring Boot 项目启动时,如果你注册了这个 Bean,Spring AMQP 会自动帮你在 RabbitMQ 里声明出这个队列。


2. Exchange 接口

RabbitMQ 的交换机有好几种类型:DirectExchangeTopicExchangeFanoutExchangeHeadersExchange

Spring AMQP 为它们都定义了一个统一的接口:org.springframework.amqp.core.Exchange

比如:

Exchange directExchange = new DirectExchange("hmall.direct");
Exchange topicExchange = new TopicExchange("hmall.topic");


3. ExchangeBuilder

为了简化交换机的创建,Spring AMQP 提供了 构建者模式

Exchange fanoutExchange = ExchangeBuilder.fanoutExchange("hmall.fanout")
        .durable(true)
        .build();

这样比自己 new FanoutExchange(...) 灵活很多,可以链式设置属性(是否持久化、是否自动删除等)。


4. BindingBuilder

队列要跟交换机绑定起来才有用,这就是 Binding 的作用。

Spring 提供了 BindingBuilder 来简化绑定:

Queue queue = new Queue("simple.queue");
Exchange exchange = new DirectExchange("hmall.direct");

Binding binding = BindingBuilder
        .bind(queue)            // 绑定队列
        .to(exchange)           // 绑定到哪个交换机
        .with("hmall.key")      // 指定路由键(Direct/Topic 才用)
        .noargs();              // 不需要额外参数

这样就得到一个 Binding 对象,Spring Boot 会自动帮你声明到 RabbitMQ。

或者也可以基于注解声明交换机和队列

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

配置消息转换器

RabbitMQ 自己 只能处理字节数组byte[])。

所以你无论发 String对象 还是 Map,都必须先转成字节,再放进 MQ。

  • 发送时:Java 对象 → 序列化成字节 → 发到 MQ
  • 接收时:从 MQ 里取出字节 → 反序列化成 Java 对象

这个过程,就靠 消息转换器 MessageConverter

Spring 默认用 SimpleMessageConverter,它的策略是:

  • 如果你传的是对象,会用 JDK 自带的序列化机制ObjectOutputStream)。
  • 结果就是 MQ 收到一个 二进制字节流

为什么 JDK 序列化有问题?

  • 体积大:JDK 序列化带有类元信息(类名、字段类型等),比 JSON 多很多冗余。
  • 有安全漏洞:反序列化时如果黑客伪造了恶意类,可能触发任意代码执行。
  • 可读性差:放到 MQ 管理界面看,看到的就是一堆乱码字节,人类根本读不懂。

比如发送一个 User{id=1, name="家宇"},存到 MQ 里就是一堆二进制,看不到内容。

我们一般用 Jackson2JsonMessageConverter

这样 Spring 会把对象转成 JSON 字符串,再转字节。

效果:

  • MQ 里能看到 JSON(可读性强)。
  • 体积比 JDK 序列化小很多。
  • 语言无关(其他语言也能直接消费 JSON)。

配置消息转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断


网站公告

今日签到

点亮在社区的每一天
去签到