消息确认机制
简单介绍
RabbitMQ Broker 发送消息给消费者后,消费者处理该消息时可能会发生异常,导致消费失败。
如果 Broker 在发送消息后就直接删了,就会导致消息的丢失。
为了保证消息可靠到达消费者并且成功处理了该消息,RabbitMQ 提供了消息确认机制。
消费者在订阅队列时,可以指定 autoAck
参数,该参数指定是否自动确认消息。
autoAck=true
:消费者接收到消息后,自动确认消息,RabbitMQ Broker 立即删除该消息。autoAck=false
:消费者接收到消息后,不自动确认消息,需要消费者调用channel.basicAck()
方法确认消息。如果消费者处理消息时发生异常,则可以调用channel.basicNack()
方法,表示不确认该消息的接收。
Spring AMQP 提供了三种模式的消息确认
AcknowledgeMode.NONE
:消息一经发送,就不管它了,不管消费者是否处理成功,都直接确认消息。AcknowledgeMode.AUTO
(默认):自动确认,消息接收后,消费者处理成功时自动确认该消息,如果处理时发送异常,则不会确认消息。AcknowledgeMode.MANUAL
:手动确认,消息接收后,消费者处理成功时,需要调用channel.basicAck()
方法确认消息,如果处理时发送异常,则需要调用channel.basicNack()
方法,表示不确认该消息的接收。
代码示例
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://admin:admin@47.94.9.33:5672/extension
listener:
simple:
acknowledge-mode: manual
package com.ljh.extensions.rabbit.config;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constants.ACK_QUEUE)
.build();
}
@Bean("ackExchange")
public DirectExchange ackExchange() {
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE)
.durable(true)
.build();
}
// @Bean("binding")
// public Binding binding(Exchange exchange, Queue queue) {
// return BindingBuilder.bind(queue)
// .to(exchange)
// .with("ack")
// .noargs();
// }
@Bean("binding1")
public Binding binding1(@Qualifier("ackExchange") DirectExchange exchange, @Qualifier("ackQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("ack");
}
}
package com.ljh.extensions.rabbit.controller;
import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消费者消息确认喵~");
return "发送成功";
}
}
package com.ljh.extensions.rabbit.listener;
import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void process(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息:%s,deliveryTag:%d\n",
new String(message.getBody(), "UTF-8"),
deliveryTag);
try {
System.out.println("模拟处理业务逻辑");
int a = 3 / 0;
System.out.println("模拟处理业务完成");
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
}
持久性机制
简单介绍
前面讲了消费端处理消息时,消息如何不丢失,但是如何保证 RabbitMQ
服务停掉以后,生产者发送的消息不丢失呢。默认情况下, RabbitMQ
退出或者由于某种原因崩溃时,会忽视队列和消息。
为了保证消息持久化,RabbitMQ 提供了持久化机制,分别是:交换机持久化、队列持久化和消息持久化。
- 交换机持久化:使用
ExchangeBuilder.durable(true)
方法创建的交换机,RabbitMQ 会将交换机信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。 - 队列持久化:使用
QueueBuilder.durable(true)
方法创建的队列,RabbitMQ 会将队列信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。 - 消息持久化:消息持久化可以保证消息不丢失,即使 RabbitMQ 重启或者崩溃,消息也不会丢失。
将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,这是因为写入磁盘的速度相比于写入内存的速度还是很慢的,对于可靠性不是那么高的消息,可以不采用持久化处理以提高整体的吞吐量。
在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。
尽管设置了持久化,也不能保证就一定可以持久化。这是因为在将这些持久化信息写入磁盘时也是需要时间的,如果 RabbitMQ 在这段时间内崩溃,那么这些信息也会丢失。
代码示例
package com.ljh.extensions.rabbit.config;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("persistQueue")
public Queue persistQueue() {
return QueueBuilder.nonDurable(Constants.PERSIST_QUEUE)
.build();
}
@Bean("persistExchange")
public DirectExchange persistExchange() {
return ExchangeBuilder.directExchange(Constants.PERSIST_EXCHANGE)
.durable(false)
.build();
}
@Bean("binding2")
public Binding binding2(@Qualifier("persistExchange") Exchange exchange, @Qualifier("persistQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("persist")
.noargs();
}
}
package com.ljh.extensions.rabbit.controller;
import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
RabbitTemplate rabbitTemplate;
@RequestMapping("/persist")
public String persist() {
Message message = new Message("消费者消息确认喵~".getBytes(StandardCharsets.UTF_8), new MessageProperties());
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
rabbitTemplate.convertAndSend(Constants.PERSIST_EXCHANGE, "persist", message);
return "发送成功";
}
}
发送方确认机制
简单介绍
在发送方将消息发送至 RabbitMQ Broker 时,也有可能出现消息丢失的情况。
为了保证消息可靠到达 Broker,RabbitMQ 提供了发送方确认机制。
发送方确认机制是指,在消息发送到 Broker
后,发送方会等待 Broker
回应,如果发送方收到消息,则发送方认为消息发送成功,如果发送方未收到消息,则发送方认为消息发送失败,可以重新发送。
RabbitMQ 提供了两种方式保证发送方发送的消息的可靠传输
confirm 确认模式
:发送方在发送消息后,对发送方设置一个ConfirmCallback
的监听,无论消息是否抵达Exchange
,这个监听都会被执行,如果消息抵达了Exchange
,则ACK
为true
,如果消息没有抵达Exchange
,则ACK
为false
。returns 退回模式
:尽管确认消息发送至Exchange
后,也依然不能完全保证消息的可靠传输。在Exchange
和Queue
会有一个Routing Key(Binding Key)
的绑定关系,如果消息没有匹配到任何一个Queue
,则通过returns
模式则会退回到发送方。
代码示例
confirm 确认模式
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://admin:admin@47.94.9.33:5672/extension
listener:
simple:
acknowledge-mode: auto
# 消息发送确认机制
publisher-confirm-type: correlated
package com.ljh.extensions.rabbit.config;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(Constants.CONFIRM_QUEUE)
.build();
}
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE)
.durable(true)
.build();
}
@Bean("binding3")
public Binding binding3(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(directExchange)
.with("confirm");
}
}
package com.ljh.extensions.rabbit.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Themberfue
* @date: 2025/4/30 21:08
* @description:
*/
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// ? 设置确认消息机制
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行了confirm方法");
if (ack) {
System.out.printf("接收到消息,消息ID:%s\n",
correlationData == null ? null : correlationData.getId());
} else {
System.out.printf("未接收到消息,消息ID:%s;原因:%s\n",
correlationData == null ? null : correlationData.getId(), cause);
}
}
});
return rabbitTemplate;
}
}
package com.ljh.extensions.rabbit.controller;
import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "confirmRabbitTemplate")
RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() {
// ! 直接使用 setConfirmCallback 会影响其他接口的调用
// ! 且只能设置一个确认回调,多次发起请求会报错
// rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
// @Override
// public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// System.out.println("执行了confirm方法");
// if (ack) {
// System.out.printf("接收到消息,消息ID:%s\n",
// correlationData == null ? null : correlationData.getId());
// } else {
// System.out.printf("未接收到消息,消息ID:%s\n;原因:%s",
// correlationData == null ? null : correlationData.getId(), cause);
// }
// }
// });
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);
return "消息发送成功";
}
}
returns 退回模式
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://admin:admin@47.94.9.33:5672/extension
listener:
simple:
acknowledge-mode: auto
# 消息发送退回机制
publisher-returns: true
package com.ljh.extensions.rabbit.config;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Themberfue
* @date: 2025/4/30 21:08
* @description:
*/
@Configuration
public class RabbitTemplateConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// ? 设置消息退回机制
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息被退回:" + returned);
}
});
return rabbitTemplate;
}
}
总结:如何确保消息的可靠性传输
- 发送方 => 服务端:通过发送方确认机制,confirm 确认模式 和 returns 退回模式,确保消息可靠到达。
- 服务端:通过持久化机制,保证消息不丢失。
- 服务端 => 接收方:通过消息确认机制,确保消息被消费者正确消费。
重试机制
简单介绍
消息在处理失败后,重新发送,重新处理,这便是消息重试机制。
RabbitMQ 提供了消息重试机制,可以设置消息最大重试次数,超过最大重试次数还未成功消费,则消息会被丢弃。
代码示例
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://admin:admin@47.94.9.33:5672/extension
listener:
simple:
# 消息接收确认机制
# acknowledge-mode: manual # 手动确认时,重发机制无效
acknowledge-mode: auto
retry:
enabled: true # 开启重试机制
initial-interval: 5000ms # 重发时间间隔
max-attempts: 5 # 最大重试次数
package com.ljh.extensions.rabbit.config;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("retryQueue")
public Queue retryQueue() {
return QueueBuilder.durable(Constants.RETRY_QUEUE)
.build();
}
@Bean("retryExchange")
public DirectExchange retryExchange() {
return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE)
.durable(true)
.build();
}
@Bean("binding4")
public Binding binding4(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("retry");
}
}
package com.ljh.extensions.rabbit.controller;
import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
RabbitTemplate rabbitTemplate;
@RequestMapping("/retry")
public String retry() {
rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");
return "消息发送成功";
}
}
package com.ljh.extensions.rabbit.listener;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@Component
public class RetryListener {
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void process(Message message) throws UnsupportedEncodingException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",
Constants.RETRY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
int num = 3 / 0;
System.out.println("业务处理完成");
}
}
TTL 机制
简单介绍
TTL(Time To Live)机制,可以设置消息的存活时间,超过存活时间还未消费,则消息会被丢弃。
RabbitMQ 提供了 TTL 机制,可以设置队列和消息的 TTL 值,超过 TTL 值还未消费,则消息会被丢弃。
两者区别:
- 设置队列 TTL 值,一旦消息过期,就会从队列中删除。设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期扫描对头的消息是否过期即可。
- 设置消息 TTL 值,即使消息过期,也不会马上删除,只有在发送至消费者时才会检测其是否已经过期,如果过期才会删除。设置消息过期时间,每个消息的过期时间都可能不尽相同,所以需要扫描整个队列的消息才可确定是否过期,为了确保性能,所以采取类似于
懒汉模式
的方式。
将队列 TTL 设置为 30s,第一个消息的 TTL 设置为 30s,第二个消息的 TTL 设置为 10s。
理论上说,在 10s 后,第二个消息应该被丢弃。但由于设置了队列 TTL 值的机制,只会扫描队头的消息是否过期,所以在第一个消息过期之前,第二个消息不会被删除。
代码示例
package com.ljh.extensions.rabbit.config;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constants.TTL_QUEUE)
.ttl(20_000) // ? 设置队列的 TTL 值
.build();
}
@Bean("ttlExchange")
public DirectExchange ttlExchange() {
return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE)
.durable(true)
.build();
}
@Bean("binding5")
public Binding binding5(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("ttl");
}
}
package com.ljh.extensions.rabbit.controller;
import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
RabbitTemplate rabbitTemplate;
@RequestMapping("/ttl")
public String ttl() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// ? 设置消息的 TTL 值
message.getMessageProperties().setExpiration("10000");
return message;
}
};
rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",
messagePostProcessor);
return "消息发送成功";
}
}
死信队列
简单介绍
死信(Dead Letter),就是因为某种原因,导致消费者消费失败的消息,称之为死信。
死信队列,当消息在一个队列中变成死信时,它就被重新被发送到另一个交换机,该交换机就是死信交换机(Dead Letter Exchange)。
该死信交换机绑定死信队列,当消息被重新发送到死信交换机时,它就被重新投递到死信队列。
消息变成死信会有如下几种原因:
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue 参数设置为 false。
- 消息过期。
- 队列达到最大长度。
代码示例
package com.ljh.extensions.rabbit.config;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DlConfig {
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE) // ? 配置该队列的死信交换机
.deadLetterRoutingKey("dl") // ? 死信交换机绑定死信队列的 Routing Key
.ttl(10_000)
.maxLength(10L) // ? 设置队列最大长度
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE)
.durable(true)
.build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange, @Qualifier("normalQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("normal");
}
@Bean("dlQueue")
public Queue dlQueue() {
return QueueBuilder.durable(Constants.DL_QUEUE)
.build();
}
@Bean("dlExchange")
public DirectExchange dlExchange() {
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE)
.durable(true)
.build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlExchange") DirectExchange exchange, @Qualifier("dlQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("dl");
}
}
package com.ljh.extensions.rabbit.listener;
import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DlListener {
@RabbitListener(queues = Constants.NORMAL_QUEUE)
public void processNormal(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",
Constants.NORMAL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
try {
int num = 3 / 0;
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
@RabbitListener(queues = Constants.DL_QUEUE)
public void processDl(Message message) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",
new Date(), Constants.DL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
}
}
package com.ljh.extensions.rabbit.controller;
import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.Date;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
RabbitTemplate rabbitTemplate;
@RequestMapping("/dl")
public String dl() {
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "normal test...");
return "消息发送成功:" + new Date();
}
}
延迟队列
简单介绍
延迟队列(Delay Queue),即消息被发送以后,并不想让消费者立刻消费该消息,而是等待一段时间后再消费。
延迟队列的使用场景有很多,比如:
- 智能家居:智能设备产生的事件,如开关、温度变化等,可以先存放在延迟队列中,等待一段时间后再消费。
- 日常管理:预定会议,需要在会议开始前 15 分钟通知参会人员。
- 订单处理:订单创建后,需要 30 分钟后才会发货。
RabbitMQ 本身没有提供延迟队列的功能,但是基于消息过期后会变成死信的特性,可以通过设置 TTL 和死信队列来实现延迟队列的功能。
代码示例
@RequestMapping("/delay")
public String delay() {
//发送带ttl的消息
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+ new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("10000");
//10s过期
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+ new Date(), messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setExpiration("20000");
//20s过期
return messagePostProcessor;
});
return "发送成功!";
}
由于 RabbitMQ 检查消息是否过期的机制,如果 20s 的消息先到队列,那么 10s 的消息只会在 20s 后才会被检查到过期。
延迟队列插件
RabbitMQ 官方提供了延迟队列插件,可以实现延迟队列的功能。
下载插件后,需要将插件放到 RabbitMQ 的插件目录(/usr/lib/rabbitmq/plugins
)下,然后重启 RabbitMQ 服务。
代码示例
package com.ljh.extensions.rabbit.config;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayConfig {
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(Constants.DELAY_QUEUE)
.build();
}
@Bean("delayExchange")
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE)
.durable(true)
.delayed() // ? 设置队列为延迟队列
.build();
}
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange, @Qualifier("delayQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("delay");
}
}
package com.ljh.extensions.rabbit.listener;
import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DelayListener {
@RabbitListener(queues = Constants.DELAY_QUEUE)
public void processDelay(Message message) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",
new Date(), Constants.DELAY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);
}
}
package com.ljh.extensions.rabbit.controller;
import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.Date;
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Resource(name = "rabbitTemplate")
RabbitTemplate rabbitTemplate;
@RequestMapping("/delay")
public String delay() {
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {
message.getMessageProperties()
.setDelayLong(20_000L); // ? 设置消息的延迟发送时间
return message;
});
rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {
message.getMessageProperties()
.setDelayLong(10_000L); // ? 设置消息的延迟发送时间
return message;
});
return "消息发送成功:" + new Date();
}
}
事务机制
简单介绍
RabbitMQ
是基于 AMQP
协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制事务。
Spring AMQP
也提供了对事务相关的操作。RabbitMQ
事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。
代码示例
配置事务管理器:
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager (ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable(Constants.TRANS_QUEUE)
.build();
}
@Transactional
@RequestMapping("/trans")
public String trans() {
String msg = "trans test...";
System.out.println("发送第一条消息:" + msg + 1);
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);
int a = 3 / 0;
System.out.println("发送第二条消息:" + msg + 2);
transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);
return "消息发送成功:";
}
消息分发
简单介绍
一个队列可以给多个消费者消费,默认情况下,RabbitMQ 是以轮询的方式将消息分发给这些消费者,不管该消费是否已经消费并且确认。
这种情况是不太合理的,如果每个消费者消费的能力都不同,有的消费者消费快,有的慢,这会极大降低整体系统的吞吐量和处理速度。
我们可以使用 channel.basicQos(int prefetchCount)
来限制当前信
道上的消费者所能保持的最大未确认消息的数量。
当该消费者达到最大的 prefetchCount
限制时,RabbitMQ 会停止向该消费者分发消息,直到该消费者的未确认消息数量小于 prefetchCount
时。
代码示例
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 5 # 队列最大接收五条消息
@Bean("qosQueue")
public Queue qosQueue() {
return QueueBuilder.durable(Constants.QOS_QUEUE)
.build();
}
@Bean("qosExchange")
public DirectExchange qosExchange() {
return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE)
.durable(true)
.build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") DirectExchange exchange, @Qualifier("qosQueue") Queue queue) {
return BindingBuilder.bind(queue)
.to(exchange)
.with("qos");
}
@RequestMapping("/qos")
public String qos() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test...");
}
return "消息发送成功:" + new Date();
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void process(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息:%s,deliveryTag:%d\n",
new String(message.getBody(), "UTF-8"),
deliveryTag);
try {
System.out.println("模拟处理业务逻辑");
System.out.println("模拟处理业务完成");
// channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
应用场景
- 限流:可以根据消费者的处理能力,设置
prefetchCount
限制每个消费者所能接收的消息数量,从而达到限流的目的。 - 负载均衡:通过将
prefetchCount
设置为1
,通过设 prefetch = 1 的方式,告诉 RabbitMQ 一次只给一个消费者一条消息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者。