基于SpringBoot利用死信队列解决RabbitMQ业务队列故障重试无效场景问题
解决方案
1、场景说明
利用RabbitMQ的死信队列,解决RabbitMQ业务队列故障重试无效场景问题,在MQ业务队列消费失败时,将消息进行重试,重试若干次数(可自定义),如果还是失败则发送到死信队列,消费者监听死信队列,然后对死信队列的消息再进行相关处理。
项目实战
1、生产者服务
1.1、RabbitConfig定义相关交换机及死信队列等配置数据
@Slf4j
@Configuration
public class RabbitConfig {
public static final String BUSINESS_EXCHANGE_NAME = "business-exchange";
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange";
public static final String BUSINESS_QUEUE_NAME = "business-queue";
public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";
public static final String ROUTING_KEY = "routing-key";
// 声明业务交换机
@Bean
public DirectExchange businessExchange(){
return new DirectExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信交换机
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
// 声明业务队列
@Bean
public Queue businessQueue(){
Map<String, Object> args = new HashMap<>(2);
// 设置业务队列的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();
}
// 声明死信队列
@Bean
public Queue deadLetterQueue(){
return new Queue(DEAD_LETTER_QUEUE_NAME);
}
// 将业务队列绑定到业务交换机
@Bean
public Binding bindBusinessQueue(){
return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY);
}
// 将死信队列绑定到死信交换机
@Bean
public Binding bindDeadLetterQueue(){
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);
}
}
1.2、TestController测试接口Controller
@RestController
public class TestController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send(@RequestParam String msg){
// 发送消息,模拟失败到死信队列
rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);
}
}
2、消费者服务
2.1 BusinessQueueConsumer业务队列监听器
@Service
@Slf4j
@RabbitListener(queues = "business-queue")
public class BusinessQueueConsumer {
/**
* 指定消费的队列
*/
@RabbitHandler
public void consume(String msg, Message message, Channel channel) {
boolean success = false;
int retryCount = 3;
while (!success && retryCount-- > 0) {
try {
// 处理消息
log.info("收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag());
if (msg.equals("netowrk-error")) {
throw new RuntimeException("模拟调用接口网络故障!");
}
// 正常处理完毕,手动确认
success = true;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("程序异常:{}", e.getMessage());
}
}
// 达到最大重试次数后仍然消费失败
if (!success) {
// 手动删除,移至死信队列
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException e) {
log.error("IO异常:移至死信队列失败", e);
}
}
}
}
2.2 DeadLetterConsumer死信队列监听器
@Service
@Slf4j
@RabbitListener(queues = "dead-letter-queue")
public class DeadLetterConsumer {
/**
* 指定消费的队列
*/
@RabbitHandler
public void consume(String msg, Message message, Channel channel) throws IOException {
log.info("死信队列收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag());
log.info("死信队列收到了消息: {}" , msg);
log.info("这里模拟向手动处理数据表添加数据完成!");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
项目测试
1、启动RabbitMQ、Producer、调用send接口生成业务交换机及队列
2、发送netowrk-error消息,模拟网络故障
可以看到,消息已经正确被消费失败,自动重试了3次后被发送到了死信队列,另外在死信队列里面进行了消息处理。