发送者可靠性
发送者重连
其实就是写个配置,让发送者连接失败了就去尝试重新连接MQ。
发送者确认
临时消息就是不需要写到磁盘的数据,也就是不需要持久化的数据。持久消息则反之。ACK是Publisher Confirm来返回的。
correlated就是发送者发送消息后,直接往下走,MQ判断完是否发送成功后调用回调函数。
@Test public void testConfirmCallback() { //0.创建CorrelationData CorrelationData cd = new CorrelationData(UUID.randomUUID().toString()); cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { log.error("spring amqp 处理确认结果异常", ex); } @Override public void onSuccess(CorrelationData.Confirm result) { //判断是否成功 if (result.isAck()) { log.debug("收到ConfirmCallback ack,消息发送成功!"); } else { log.error("收到ConfirmCallback nack,消息发送失败! reason: {}",result.getReason()); } } }); //1.交换机名 String exchangeName = "hmall.fanout"; //2.消息 String message = "hello world"; //3.发送消息 rabbitTemplate.convertAndSend(exchangeName, null, message, cd); }
ConfirmCallback是针对消息的,所以每次发送都得写一个CorrelationData。这里在new CorrelationData的时候,同时也给消息指定了唯一id。这里的Future是多并发编程的知识。注意这里的onFailure不是返回nack,而是Spring处理Future异常了,nack在下面的onSuccess里。如果返回的是nack,还要重发消息,就是再用一次convertAndSend,但这里就只是记录了一下日志。不过这一块白雪呜呜呜,不用太在意。
MQ可靠性
当队列满了的时候,MQ会去给早发的那些消息做持久化,持久化到磁盘,给队列空出一点空间,持久化期间发送者是不能成功发送消息到队列里的。
数据持久化
其实就是把数据持久化到磁盘,但不是被动的等满了再去做,而是提前做,这样就不会导致消息堆积了。
交换机和队列的持久化都是默认的,所以不用管,但是消息的持久化,你得在发消息时选择第二个Persistent。但其实Spring AMQP默认就是发持久化的消息,所以我们啥都不用管。
Lazy Queue
消息队列会动态监测消费者处理消息的速度,如果速度慢,就直接从磁盘读取数据,如果发现速度很快,超过了我读取磁盘的速度,那就会提前缓存一批数据到内存。这个我们不用管,用高版本的RabbitMQ就行。
消费者可靠性
消费者确认机制
其实应该是MQ投递消息,如果返回ack或者reject,消息直接丢弃。如果返回nack,消息重新入队(requeue),然后再重新投递给消费者。
推荐使用auto。比如MessageConversionException,就属于消息处理异常,会返回reject。
消费者失败重试
意思就是不会MQ和消费者不会来回踢皮球了。
第三种就是重试耗尽后,将失败消息(还包括报错信息)投递给指定的交换机(error.direct),再给这个交换机绑定队列和消费者,就可以处理这种投递失败的消息,通知开发者介入。
也就是重试耗尽后,用你给的rabbitTemplate去重新投递,投递到error.direct交换机,routingKey为"error"。
@Configuration public class ErrorMessageConfiguration { @Bean public DirectExchange errorExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue"); } @Bean public Binding errorQueueBinding(Queue errorQueue,DirectExchange errorExchange){ return BindingBuilder.bind(errorQueue).to(errorExchange).with("error"); } @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error"); } }
业务幂等性
为了避免消费者重复处理同一条消息影响业务的情况,需要保证业务幂等性。
注意这里的id和之前发送者确认的那个id不一样。这个配置是在Publisher里的。
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(Message message) { log.info("监听到simple.queue的消息:{}", new String(message.getBody())); log.info("监听到simple.queue的消息: ID: {}", message.getMessageProperties().getMessageId()); }
然后用消费者用Message类来接受消息,因为消息转换器就是把Java对象转换为Message类的嘛。在message中就能拿到消息体和消息ID了。不过这种方法太影响性能了,不推荐......白雪。
这一种就是添加业务逻辑了。比如这个支付业务,MQ发一个消息是修改订单为已支付,我去修改订单为已支付的时候,我先去查看当前订单状态,如果当前订单状态是未支付,那就说明这个消息不是重复消息;如果当前订单状态是已支付或者退款中,那就说明这个消息是重复消息,这样就保证了业务的幂等性。