RabbitMQ

发布于:2025-08-04 ⋅ 阅读:(10) ⋅ 点赞:(0)

同步的缺点:

1.拓展性差,每次来新的需求都需要更改代码,违反开闭原则

2.性能下降,每一步都要等待,一旦有一步时间很长就一直没法继续进行

3.级联失败问题:有一步失败了就整个都失败了

但有的时候必须用同步调用:需要返回结果的

异步的缺点:拿不到结果,只是进行一个通知操作,也不知道后续是成功还是失败,什么时候执行的

 

不同MQ的对比 

 

日志收集:kafka,因为日志量很大,需要吞吐量高的,但是它的可靠性较低

RocketMQ:阿里用java语言开发的,和java结合较好,可靠性和可用性很强,吞吐量可达十万/s,比rabbitMQ高,但是不能跨平台

什么时候用MQ?

1.数据驱动的任务依赖:B的输入需要A的输出,C的输入需要B的输出,那么就让A任务结束的时候向Mq发送一条消息,B订阅这条消息,然后C再订阅B发送的消息,这样可以保证A完成的时候B可见,能及时开始,提高效率

2.上游不关心执行结果

3.上游关注执行结果,但执行时间很长

上游实时关心结果的不适合使用MQ

一、概念

1.作用

(1)削峰填谷:应对高并发

(2)应用解耦:

  • 假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

(3)异步提速:调用链太长了,改成异步:发送短信,发送邮件等等 

(4)分布式事务

(5)数据分发

2.劣势

(1)引入更多依赖,稳定性变差

(2)系统复杂度提高

(3)数据一致性容易出现问题

2.1和kafka相比有什么区别:

        kafka:效率更快,因为内核态用户态的切换少,用的sendfile函数,适用于大数据场景

        RocketMQ:可以读取到消息的具体内容,便于死信的处理,但相应的效率没有kafka那么快

二、工作模式

1.简单模式:一对一,队列,默认交换机

        1.1 编写生产者

        发送消息的方法为channel.basicPublish

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建队列,如果队列已存在,则使用该队列
    /**
     * 参数1:队列名
     * 参数2:是否持久化,true表示MQ重启后队列还在。
     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
     * 参数5:其他额外参数
     */
    channel.queueDeclare("simple_queue",false,false,false,null);
    // 5.发送消息
    String message = "hello!rabbitmq!";
    /**
     * 参数1:交换机名,""表示默认交换机
     * 参数2:路由键,简单模式就是队列名
     * 参数3:其他额外参数
     * 参数4:要传递的消息字节数组
     */
    channel.basicPublish("","simple_queue",null,message.getBytes());
    // 6.关闭信道和连接
    channel.close();
    connection.close();
    System.out.println("===发送成功===");
   }
}

        1.2 编写消费者

        要一直进行监听,一直连接着RabbitMQ,通过channel信道监听,监听的方法为basicConsume

// 消费者
public class Consumer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列
    /**
     * 参数1:监听的队列名
     * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
     * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
     */
    channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("接受消息,消息为:"+message);
       }
     });
   }
}

2.工作队列模式:使用默认交换机,一个生产者对多个消费者,轮询消息平均发送

  • 多个消费者绑定到一个队列
  • 同一条消息只会被一个消费者消费

worker模式怎么避免消息堆积? 

可设置perfetch来控制消费者预取的数量,能者多劳

        2.1 编写生产者:与简单模式相同,只需要发送消息的时候多循环几次即可并设置成持久化队列

        2.2 编写消费者:与简单模式不同的是,要加入监听队列,以编写三个消费者监听同一个队列为例,消费者不能写在一起,每个消费者都需要一个新的类

// 消费者1
public class Consumer1 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者1消费消息,消息为:"+message);
       }
     });
   }
}


// 消费者2
public class Consumer2 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者2消费消息,消息为:"+message);
       }
     });
   }
}


// 消费者3
public class Consumer3 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者3消费消息,消息为:"+message);
       }
     });
   }
}

3.fanout:发布订阅模式(广播模式)

当消息需要不同消费者进行不同处理的时候用该模式,比如一条消息需要短信发送, 邮件发送,站内发送

特点:能将消息发送给多个队列,使用fanout(扇形)交换机(需要自己创建)

        3.1 编写生产者:要手写fanout交换机并手动绑定,创建一个交换机,多个队列

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    /**
     * 参数1:交换机名
     * 参数2:交换机类型
     * 参数3:交换机持久化
     */
    channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
    channel.queueDeclare("SEND_STATION",true,false,false,null);
    // 6.交换机绑定队列
    /**
     * 参数1:队列名
     * 参数2:交换机名
     * 参数3:路由关键字,发布订阅模式写""即可
     */
    channel.queueBind("SEND_MAIL","exchange_fanout","");
    channel.queueBind("SEND_MESSAGE","exchange_fanout","");
    channel.queueBind("SEND_STATION","exchange_fanout","");
    // 7.发送消息
    for (int i = 1; i <= 10 ; i++) {
      channel.basicPublish("exchange_fanout","",null,
           ("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));
     }
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

        3.2 编写消费者,消费者不能写在一起,每个消费者都要写一个新的类

// 站内信消费者
public class CustomerStation {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

4.路由模式:与发布订阅模式相比,不会将所有消息都发送到所有队列中,想要有筛选的将信息发送到对应队列中,则用该模式

        特点:每个队列绑定路由关键字(RoutingKey),生产者将带有路由关键字的消息发送给交换机,再转发到相应队列,使用默认(direct)交换机,但最好自己创建交换机

        4.1 编写生产者:比起发布订阅者模式,多了在Bingings时的路由关键字绑定

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL2",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
    channel.queueDeclare("SEND_STATION2",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL2","exchange_routing","import");
    channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","normal");
    // 7.发送消息
    channel.basicPublish("exchange_routing","import",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_routing","normal",null,
        "小心促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

        4.2 编写消费者:只需要改动一下监听队列

// 站内信消费者
public class Customer_Station {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}


// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}


// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

5.通配符模式(topic模式):就是带通配符的路由模式,比路由模式更灵活,使用topic交换机

  #可以匹配任意多个单词,*可以匹配任意一个单词。

        路由关键字由多个单词构成时,中间以.分割

        5.1 编写生产者:比起路由模式,改变了交换机名称和类型、队列名称

        改动queueBind的交换机名称和队列名称要对应,路由关键字里加入通配符

 channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");

        改动发送消息,mail.message.station可以和三个队列都匹配上

channel.basicPublish("exchange_topic","mail.message.station",null,
        "双十一大促活动".getBytes());

// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL3",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
    channel.queueDeclare("SEND_STATION3",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
    channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
    channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
    // 7.发送消息
    channel.basicPublish("exchange_topic","mail.message.station",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_topic","station",null,
        "小型促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

        5.2 编写消费者,和之前的一样,只是改变监听队列名

6.远程调用模式

三、死信队列

1.死信消息:死信队列用于存储“死信消息”,那么什么是死信消息呢

        (1)被拒绝的消息

        (2)过期的消息(设置了TTL)

        (3)队列已满

        (4)消费失败,消费者在处理消息时抛出异常

2.应用场景

        错误处理

        重试机制:将失败的消息放入死信队列,稍后重新尝试处理

        日志记录:保存死信消息用于分析

        流量控制:系统负载过高时,将部分消息转移到死信队列

四、整合springboot实际运用

        1.配置:引入RabbitMQ起步依赖

<!-- rabbitmq起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

        编写配置文件

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /


#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

        2.配置类创建队列和交换机

        配置类在

        

@Configuration
public class RabbitConfig {
  private final String EXCHANGE_NAME = "boot_topic_exchange";
  private final String QUEUE_NAME = "boot_queue";


  // 创建交换机
  @Bean("bootExchange")
  public Exchange getExchange() {
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME) // 交换机类型
         .durable(true) // 是否持久化
         .build();
   }


  // 创建队列
  @Bean("bootQueue")
  public Queue getMessageQueue() {
    return new Queue(QUEUE_NAME); // 队列名
   }


  // 交换机绑定队列
  @Bean
  public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
    return BindingBuilder
         .bind(queue)
         .to(exchange)
         .with("#.message.#")
         .noargs();
   }
}

        3.编写生产者:只需要发送消息:注入RabbitTemplate即可发送消息

@SpringBootTest
public class TestProducer {
  // 注入RabbitTemplate工具类
  @Autowired
  private RabbitTemplate rabbitTemplate;


  @Test
  public void testSendMessage(){
    /**
     * 发送消息
     * 参数1:交换机
     * 参数2:路由key
     * 参数3:要发送的消息
     */
    rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
   }
}

        4.编写消费者:生产者和消费者一般不在一起,所以在另一个项目里编写消费者        

@Component
public class Consumer {
  // 监听队列
  @RabbitListener(queues = "boot_queue")
  public void listen_message(String message){
    System.out.println("发送短信:"+message);
   }
}

五、消息的可靠性传递

每一步都可能发生消息的丢失,为了确保消息传递的可靠性,有三种模式监听消息是否成功投递

 1.生产者重试机制(生产者没发到MQ则重试)

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

2.确认模式:要在生产者配置文件中开启确认模式(MQ给生产者反馈,MQ是否接收到了)

spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  # 开启确认模式
   publisher-confirm-type: correlated

这个确认模式有三种可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

注意: 开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

在生产者中定义确认模式的回调方法

@SpringBootTest
public class ProducerTest {
  @Autowired
  private RabbitTemplate rabbitTemplate;


  @Test
  public void testConfirm(){
    // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
      /**
       * 被调用的回调方法
       * @param correlationData 相关配置信息
       * @param ack 交换机是否成功收到了消息
       * @param cause 失败原因
       */
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
          System.out.println("confirm接受成功!");
         }else{
          System.out.println("confirm接受失败,原因为:"+cause);
          // 做一些处理。
         }
       }
     });
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
   }
}

3.消费者下线:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理

消息应答机制:自动应答(默认)、手动应答

自动应答:即消息发送后立即被认为已经传送成功,吞吐量高但是可靠性低,容易丢失消息

手动应答:采用手动应答后的消息自动重新入队可以避免自动应答中消息丢失的情况。如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

3.2.失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消息达到最大重试次数后会被删除,对于消息可靠性要求较高的业务场景不适合,怎么办呢? 

有三种处理方法

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

4.broker持久化:将队列和消息都标记为持久化

MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

交换机持久化

配置交换机的Durability参数:

队列持久化:在声明队列的时候把 durable 参数设置为 true

消息持久化:要想让消息实现持久化需要在消息生产者修改代码,添加MessageProperties.PERSISTENT_TEXT_PLAIN 属性。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。

兜底方案

MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。

黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

 交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。 那么问题来了,我们到底该在什么时间主动查询支付状态呢?

这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。

  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性

  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

SpringTask只适用于单机,没有完备的管理机制,没有可视化的交互页面

六、消息队列协议:

1.为什么不直接用TCP/IP:因为TCP/IP协议太过于简单,并不能承载消息的内容和载体,因此在此之上增加一些内容,给消息的传递分发高可用提供基础。

2.为什么消息中间件不直接使用http协议呢?

        1.因为http请求报文头和响应报文头是比较复杂的,包含了cookie、数据的加密解密、状态码、晌应码等附加的功能,但是对于个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就够,要追求的是高性能。尽量简洁,快速。
        2.大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

3.常用MQ协议:

AMQP协议

        特性:支持分布式事务

                   支持消息持久化
                   高性能和高可靠的消息处理优势

Kafka协议

        特性:结构简单

                   解析速度快

                   无事务支持

                   支持持久化

七、消息的分发策略

不公平分发:有的消费者处理速度快有的处理速度慢,轮询就不合适,可以设置参数 channel.basicQos(1),意思就是每个消费者只能处理完当前消息才能接受新的消息。

八、高可用

1.主从共享数据

master和slave共享一块内存,主节点负责写入,从节点负责读取,当主节点挂掉时,从节点代替进行服务

2.主从同步

消息写入主节点,然后同步到从节点,可以实现负载均衡,消费者多个就可以到不同的节点进行消费

3.多主集群同步部署模式

和主从同步类似,但是所有节点都可以写入

4.多主集群转发部署模式

如果插入的数据是Broker1,元数据信息会存储数据的相关描述和记录存放的位置(队列),它会对描述信息,也就是元数据进行同步;

如果消费者在Broker2中进行消费,发现自己没有对应的消息,就会在自己的元数据信息中去查询,如果查询到了直接返回。如果没有查询到就会将该消息的信息携带在请求中转发到其他节点去询问,直到找到所需的信息为止。

场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛没有顾客说要买的演唱会门票,但是他会去联系其他的黄牛询问,如果有就返回

5.Master-slave与Breoker-cluster组合的方案

集群+负载均衡

这些集群模式最终目的都是为保证:消息服务器不会挂掉,出现了故障依然可以抱着消息服务继续使用。反正终归三句话:

  1. 要么消息共享
  2. 要么消息同步
  3. 要么元数据兴享

九、预取值

相当于未确认的消息缓冲区存在一个滑动窗口,以避免缓冲区里面无限制的未确认消息问题

使用 basic.qos 方法设置“预取计数”值来完成。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。

十、发布确认机制

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

发布确认机制有三种策略:单个确认发布批量确认发布异步确认发布。其中前两者是同步确认的方式,也就是发布一个/一批消息之后只有被确认发布,后续的消息才能继续发布,后者是异步确认的方式,我们只管发布消息即可,消息是否被确认可以通过回调函数来接收到。

批量确认发布比单个确认发布要更有效率但是一旦发生消息丢失不知道具体丢失的是哪一条,必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。

异步确认是怎么实现的?

把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

十一、死信队列

死信就是无法被消费的消息

  • 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
  • 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的原因:

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

最大的特点:分流,可以将消息按某些条件分开处理

TTL:表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒,可以设置消息的TTL也可以设置队列的TTL

队列TTL:一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)

消息TTL:即使消息过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;

十二、延迟队列

TTL和死信队列一组装就变成了延迟队列:TTL让消息延迟多久后成为死信,投递到死信队列中,消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息(即延迟队列)

但是只设置消息TTL不设置队列TTL的话可能会出现TTL短的等待TTL长的,导致没法时间一到就被消费,可以通过 Rabbitmq 自带的延时队列插件解决这个问题

该插件等作用等价于在交换机处延迟,实现方式更为简单,只需要一个交换机,一个队列即可。

通过死信实现消息延迟的情况:

基于插件实现延迟队列: 

延迟队列实现超时未支付订单取消的功能,如果每个订单都存入延迟队列TTL为30min,并发量很高的情况下MQ压力很大,由于大多数订单都在弹出支付的前10s支付,所以设置一开始的TTL为10s,30s内90%的订单都支付了,后续的延迟时间倍增

十三、消息堆积

一旦出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞。

惰性队列就是为了解决该问题的

惰性队列:

  • 接收到消息后直接存入磁盘而非内存

  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)

  • 支持数百万条的消息存储

在3.12版本之后,LazyQueue已经成为所有队列的默认格式

十四、幂等性

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。 举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。

  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。

  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。

  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

  • 唯一消息ID

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。

  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库

  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

  • 业务状态判断

基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

 @Override
    public void markOrderPaySuccess(Long orderId) {
        // 1.查询订单
        Order old = getById(orderId);
        // 2.判断订单状态
        if (old == null || old.getStatus() != 1) {
            // 订单不存在或者订单状态不是1,放弃处理
            return;
        }
        // 3.尝试更新订单
        Order order = new Order();
        order.setId(orderId);
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        updateById(order);
    }

相当于CAS但不保证原子性,可以合并上述操作为这样:  

@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}

等同于这样的SQL语句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

 实现了原子性

十四、各步骤细节

生产者 -> broker

无法路由的消息怎么处理?

1.丢弃

2.备份交换机

3.消息回退:将消息回退给生产者,这样生产者可以自行处理未被路由的消息

4.死信队列

borker -> 消费者

推模式:

缺点:推送速率难以适应接受速率,需要broker来维护,增加了borker自身的复杂度
适用于:消息量不大,消费能力强,实时性要求高

RabbitMQ


拉模式

缺点:不知道什么时候有消息,只能不断拉取,但又不能很频繁的请求,可能出现消息延迟

优点:不至于消费赶不上生产

RocketMQ、Kafka

推还是拉?

个人认为拉更加合适,因为broker本来就可以持久化,也就是可以进行消息存储,保存好消息等待消费者消费即可,为了减轻拉模式的缺点,采用以下方法

长轮询

保持broker和消费者的链接,避免频繁拉取

十五、实战 

producer需要设置:exchange名称,路由键、消息体

config需要设置:声明exchange、队列,绑定交换机和队列

consumer需要设置:监听哪个队列

mq集群

RabbitMQ 集群要求底层erlang所使用cookie值相同,确保各个节点的 cookie 文件使用的是同一个值

将节点2节点3添加到节点1中组成集群

rabbitmqctl stop_app	# 关闭RabbitMQ服务
rabbitmqctl reset	# 重置
rabbitmqctl join_cluster rabbit@node1	# 加入到node1几点
rabbitmqctl start_app	# 启动RabbitMQ服务

注意命令区别:rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqctl stop_app只关闭RabbitMQ服务

在 node2 和 node3 两台机器分别执行以下命令即可解除集群关系:

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status

然后在 node1 机器上执行以下命令忘记 node2 和 node3 机器:

rabbitmqctl forget_cluster_node rabbit@node2
rabbitmqctl forget_cluster_node rabbit@node3


网站公告

今日签到

点亮在社区的每一天
去签到