问题描述
生产者方代码:
private void rollbackOrder(long orderId, CorrelationData correlationData) {
rabbitTemplate.convertAndSend("order-rollback-exchange",
"rollback.order",
new QuotaRollbackTO(orderId,null,null),
correlationData);
}
消费者方代码:
@RabbitListener(queues = "rollback.order.queue")
public void handleRollback(QuotaRollbackTO quotaRollbackTO, Message message, Channel channel) throws Exception {
try {
//回滚订单的逻辑
orderMainMapper.deleteById(quotaRollbackTO.getOrderId());
orderProductMapper.deleteById(quotaRollbackTO.getOrderId());
// 手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("消息处理失败:" + e.getMessage());
// 拒绝消息并转入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
Rabbit MQ发送消息报错:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void org.ragdollcat.order.mqlistener.OrderMQListener.handleRollback(org.ragdollcat.order.to.QuotaRollbackTO,org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception]
原因分析:
由于未配置 RabbitMQ 使用 JSON 消息转换器,Spring Boot 默认采用了 SimpleMessageConverter
,导致无法将 JSON 数据反序列化为 QuotaRollbackTO 对象,从而造成监听方法调用失败。在错误信息中,可能会发现以下提示:
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.ragdollcat.order.to.QuotaRollbackTO] for GenericMessage [payload=byte[234], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=rollback.order, spring_listener_return_correlation=4e708ed8-bfde-4f43-9282-4701c1b5ef19, amqp_receivedExchange=order-rollback-exchange, spring_returned_message_correlation=4591a034-2ee5-4d4c-8a3c-4c95a3b43572, amqp_deliveryTag=1, amqp_consumerQueue=rollback.order.queue, amqp_redelivered=false, id=58a03a74-a8d4-329c-7d26-c229977f4c15, amqp_consumerTag=amq.ctag-0deI5Ts_m6_K32RdEYyDZA, contentType=application/x-java-serialized-object, timestamp=1747560067493}]
证实了发送端把对象以JDK默认方式(ObjectOutputStream)序列化成 byte[],接收端尝试用 JSON 的方式将这个 byte[] 解析成 QuotaRollbackTO,失败。
解决方案:
在配置类中,显式使用 JSON 消息转换器,为 RabbitTemplate 设置 Jackson2JsonMessageConverter
(或放在Spring Boot的引导类中也可以,引导类本身也是作为一个配置类),这样后续注入的就是自定义的RabbitTemplate
:
@Configuration
public class RabbitTemplateConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,MessageConverter jacksonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启强制投递,必须配置这个 ReturnCallback 才会生效!
rabbitTemplate.setMandatory(true);
// 设置 ConfirmCallback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println(" 交换机已收到消息:" + correlationData);
} else {
System.err.println(" 交换机未收到消息:" + cause + ",相关数据:" + correlationData);
}
});
// 设置 ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.err.println(" 消息未成功路由到队列:");
System.err.println(" 原始消息:" + new String(message.getBody()));
System.err.println(" 应答码:" + replyCode);
System.err.println(" 原因:" + replyText);
System.err.println(" 交换机:" + exchange);
System.err.println(" 路由键:" + routingKey);
});
//为 RabbitTemplate 设置 `Jackson2JsonMessageConverter`
rabbitTemplate.setMessageConverter(jacksonMessageConverter);
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
MessageConverter jacksonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jacksonMessageConverter);
return factory;
}
}
同样地在发送消息时,不应采用对象转换为JSON字符串的写法:
rabbitTemplate.convertAndSend("rollback.order", JSON.toJSONString(rollbackTO));
验证方式:
临时打印发送内容 headers:
rabbitTemplate.setReturnsCallback(returned -> {
Message returnedMessage = returned.getMessage();
System.out.println("发送内容类型: " + returnedMessage.getMessageProperties().getContentType());
});
看到的 contentType 应该是:
“application/json”