核心配置类
@Configuration
public class RabbitMQConfig{
//通知队列
private static final String NOTIFICATION_QUEUE = "notification_queue";
//直接交换机
private static final String DIRECT_EXCHANGE = "direct_exchange";
//路由键
private static final String ROUTING_KEY = "notify.routing.key";
// 创建队列 (控制台Durable参数表示持久化)
@Bean
public Queue notificationQueue() {
// 持久化队列(消息可靠性基础)
return new Queue(NOTIFICATION_QUEUE, true);
}
// 创建交换器 (控制台Durability表示持久化)
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
// 绑定队列与交换器
@Bean
public Binding queueBinding() {
return BindingBuilder.bind(notificationQueue())
.to(directExchange())
.with(ROUTING_KEY);
}
//创建Topic交换器
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topic_exchange",true,false);
}
//绑定队列
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("notify.sms.*");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("notify.email.#");
}
//创建惰性队列
@Bean
public Queue lazyQueue(){
return QueueBuilder.durable("lazy.queue")
.lazy() //开启lazy模式
.build()
}
//开启生产者确认机制
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate remplate = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData,ack,cause)->{
if(ack){
System.out.println("消息发送成功:" + correlationData.getId());
}else{
System.out.println("消息发送失败:" + cause)
//这里可实现重试或日志记录
}
});
return template;
}
//配置死信队列
@Bean
public Queue deadLetterQueue(){
return new Queue("dead_letter_queue",true);
}
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange("dead_letter_exchange",true,false);
}
//主队列绑定死信交换器
@Bean
public Queue notificationQueue(){
Map<String,Object> args = new HashMap<>();
//设置死信交换器和路由键
args.put("x-dead-letter-exchange", "dead_letter_exchange");
args.put("x-dead-letter-routing-key", "dead.routing.key");
return new Queue(NOTIFICATION_QUEUE, true, false, false, args);
}
/**
配置失败消息的交换机和队列
*/
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.dierect");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue",true);
}
@Bean
public Binding errorBinding(Queue errorQueue,DirectExchange errorMessageExchange){
return BindingBuilder.build(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
returnnew RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
消息生产者
- 生产者发送消息时连接MQ失败
- 生产者发送消息到达MQ后未找到Exchange
- 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
- 消息到达MQ后,处理消息的进程发生异常
配置文件中开启重试机制
spring:
rabbitmq:
publisher-confirm-type: correlated #开启publisher confirm机制,并设置MQ异步回调返回回执 simple表示同步阻塞等待MQ的回执
publisher-returns: true #开启publisher return 机制
connection-timeout: 1s# 设置MQ的连接超时时间
template:
retry:
enabled: true# 开启超时重试机制(阻塞式,虽然提高了消息发送成功率,但消耗性能)
initial-interval: 1000ms# 失败后的初始等待时间
multiplier: 1# 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3# 最大重试次数
生产者消息确认机制(默认是关闭的,需要通过配置文件开启)
- Publisher Confirm
- Publisher Return
MQ会根据消息处理的情况返回不同的回执:
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
/**
定义ReturnCallback
*/
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
@Component
public class NotificationProducer {
private final RabbitTemplate rabbitTemplate;
private static final String EXCHANGE_NAME = "direct_exchange";
private static final String ROUTING_KEY = "notify.routing.key";
public NotificationProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// 发送通知消息(支持JSON格式)
public void sendNotification(String message) {
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
System.out.println("发送通知消息:" + message);
}
//发送短信通知(路由键:notify.sms.10086)
public void sendNotification(String message,String routKey){
}
/**
定义ConfirmCallback
*/
CorrelationData cd = new CorrelationData();
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){
@Override
public void onFailure(Throwable ex){
//Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail",ex);
}
@Override
public void onsuccess(CorrelationData.Confirm result){
//Future 接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){
log.debug("发送消息成功,收到ack!");
}else{
log.error("发送消息失败,收到nack,reson:{}",result.getReason());
}
}
});
rabbitTemplate.convertAndSend("hmall.direct","q","hello",cd);
}
消息积压问题:
- 消费者宕机或出现网络故障
- 消息发送量激增,超过了消费者处理速度
- 消费者处理业务发生阻塞
出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。
RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。
- 添加x-queue-mod=lazy参数即可设置队列为Lazy模式
- 配置文件中可以通过@Bean创建设置lazy方法设置Lazy模式
消息消费者
- 消息接收后尚未处理突然宕机
- 消息接收后处理过程中抛出异常
消费者确认机制三种回执:
- ack 成功处理消息,RabbitMQ从队列中删除消息
- nack 消息处理失败,RabbitMQ需要再次投递消息
- reject 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
# 配置多线程并发数
spring:
rabbitmq:
listener:
simple:
concurrency: 10 # 最小并发数
max-concurrency: 20 # 最大并发数
acknowledge-mode: none # 不做处理 manual 手动模式,需要在业务中调用api发送ack或reject auto 自动模式业务异常会自动返回nack,正常返回ack
retry:
enable: true #开启消费者失败重试
initial-interval: 1000s #初始的失败等待时长为1秒
multiplier: 1 # 败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true# true无状态;false有状态。如果业务中包含事务,这里改为false
@Component
public class NotificationConsumer {
@RabbitListener(queues = "notification_queue")
public void processNotification(Channel channel,String message) throw Exception{
try{
String content = new String(message.getBody(),"UTF-8");
// 这里添加具体通知逻辑(异步执行,不阻塞队列)
}catch(Exception e){
//处理失败,拒绝消息并放入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
//监听lazy队列
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到lazy.queue的消息:{}",msg);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg)throws InterruptedException{
log.info("spring 消费者收到的消息:" + msg);
if(true){
throw new MessageConversionException("故意的")
}
log.info("消息处理完成");
}
}
失败处理策略
业务Controller接口
@RestController
@RequestMapping("/notify")
public class NotificationController {
private final NotificationProducer producer;
public NotificationController(NotificationProducer producer) {
this.producer = producer;
}
// 接收通知请求,异步发送消息
@PostMapping
public String triggerNotification(@RequestBody String content) {
producer.sendNotification(content);
return "通知已提交(异步处理中)";
}
}
延迟消息
订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。
实现延迟消息也有两种方案:
- 死信交换机+TTL
- 延迟消息插件
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
插件下载地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
基于Docker安装
docker volume inspect mq-plugins #先查看RabbitMQ的插件目录对应的数据卷
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
运行结果如下:
//基于注解的方式声明延迟交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}",msg);
}
/**
基于配置文件的方式
*/
@Slf4j
@Configuration
publicclass DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
returnnew Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
发送延迟消息,必须通过x-delay属性设定延迟时间
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
①、common模块中创建一个记录消息延迟时间的消息体
@Data
publicclass MultiDelayMessage<T> {
/**
* 消息体
*/
private T data;
/**
* 记录延迟时间的集合
*/
private List<Long> delayMillis;
public MultiDelayMessage(T data, List<Long> delayMillis) {
this.data = data;
this.delayMillis = delayMillis;
}
publicstatic <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
returnnew MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
}
/**
* 获取并移除下一个延迟时间
* @return 队列中的第一个延迟时间
*/
public Long removeNextDelay(){
return delayMillis.remove(0);
}
/**
* 是否还有下一个延迟时间
*/
public boolean hasNextDelay(){
return !delayMillis.isEmpty();
}
}
②、trade-service中定义一个常量类
public interface MqConstants {
String DELAY_EXCHANGE = "trade.delay.topic";
String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
String DELAY_ORDER_ROUTING_KEY = "order.query";
}
③、nacos中抽取一个共享配置shared-mq.xml
spring:
rabbitmq:
host: ${hm.mq.host:192.168.150.101}# 主机名
port: ${hm.mq.port:5672}# 端口
virtual-host: ${hm.mq.vhost:/hmall}# 虚拟主机
username: ${hm.mq.un:hmall}# 用户名
password: ${hm.mq.pw:123}# 密码
listener:
simple:
prefetch: 1# 每次只能获取一条消息,处理完成才能获取下一个消息
trade-service模块添加共享配置
④、trade-service下单任务
⑤、消息监听
@Slf4j
@Component
@RequiredArgsConstructor
publicclass OrderStatusListener {
privatefinal IOrderService orderService;
privatefinal PayClient payClient;
privatefinal RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),
key = MqConstants.DELAY_ORDER_ROUTING_KEY
))
public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {
// 1.获取消息中的订单id
Long orderId = msg.getData();
// 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭
Order order = orderService.getById(orderId);
if (order == null || order.getStatus() > 1) {
// 订单不存在或交易已经结束,放弃处理
return;
}
// 3.可能是未支付,查询支付服务
PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);
if (payOrder != null && payOrder.getStatus() == 3) {
// 支付成功,更新订单状态
orderService.markOrderPaySuccess(orderId);
return;
}
// 4.确定未支付,判断是否还有剩余延迟时间
if (msg.hasNextDelay()) {
// 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值
int delayVal = msg.removeNextDelay().intValue();
// 4.2.发送延迟消息
rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
message -> {
message.getMessageProperties().setDelay(delayVal);
return message;
});
return;
}
// 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单
orderService.cancelOrder(orderId);
}
}