【分布式】SpirngBoot 整合RabbitMQ、Exchagne模式、确认消费

发布于:2022-12-14 ⋅ 阅读:(514) ⋅ 点赞:(0)

分布式


SpringBoot 整合 RabbitMQ,消息模型,确认消费


RabbitMQ在模块解耦,接口限流、异步通信等方面发挥重要作用,在成熟的RabbitMQ之前,项目一般采用Spring的事件驱动模型来进行异步通信

缓存穿透 除了缓存null之外 还可以在之前加上布隆过滤器,也就是hash到一个大bitmap,查询先过滤看是否存在key,存在再查询缓存

并且之前的Cache只是表面的知识,虽然加上分布式🔒可以抗住高并发,但是一旦缓存失效,需要准备高可用的解决方案, 并且必须使用mq限流,因为应用层容器Tomcagt的吞吐量有限,必须保证同时到达的流量不查过Tomcat集群的上限

SpringBoot 整合RabbitMQ

RabbitMQ是一款用于接收、存储和转发消息的开源中间件,其核心在于消息、消息模型、生产和消费者,在整合之前,先介绍生产者、消费者、消息、队列、交换机、路由等基本组件

在这里插入图片描述

RabbitMQ可以用邮局来类比【或者STOMP的聊天】, 邮局的核心包括邮件、邮寄箱、寄邮件用户、收邮件用户、邮递员

投递用户A --投递---> 邮件 ---> 邮件箱
							 |_取---> 邮递员M --> 邮件 ---> 收件人B
							 
STOMP聊天服务(私聊频道)

用户A ---发送---> 消息 ----> chat消息频道
						  |__ broker代理--> 用户频道 --> 用户B

投递用户A就是RabbitMQ产生消息的生产者,邮件就相当于消息,接收邮件用户B相当于RabbitMQ接收消息的消费者,邮件箱 就是RabbitMQ的交换机, 邮递员就是 模型中的队列

  • 生产者 Producer
    • 用于产生、发送消息的程序, 向消息队列发布消息
  • 消费者 Consumer
    • 用于监听、接收、消费和处理消息的程序, 从消息队列获取消息
  • 消息 Message
    • 消息由消息头和消息体组成。消息体是不透明的,消息头由一系列可选属性【routing-key,priority优先权、delivery-mode 是否持久性存储】
    • 可以是实际的数据,比如文字、图片等,在RabbitMQ的底层架构中,消息通过二进制的数据流进行传输
  • Channel 信道(频道)
    • 多路复用连接中的一条独立的双向数据流通道,信道建立在真实的TCP内的虚拟连接,复用TCP连接的通道
  • 队列 Queue
    • 存储消息的一种数据结构,保存消息,是消息的容器,也是消息的终点
    • 消息的暂存区或存储区,可以看作中转站,消息经过队列传输到消费者手里
    • 一个消息可以投入一个或者多个队列,一直在Queue中,等待Consumer连接Queue将消息取走
    • 多个消费者同时订阅一个Queue,Queue中的Message平均分摊给多个消费者,不是每个消费者都收到所有的消息并处理
    • 每一个消息只能被一个订阅者接收
  • 交换机 Exchange 路由器
    • 消息的中转站、路由器,提供Producer到Queue之间的路由匹配,接收Procuder的消息,将消息按照路由规则转发到消息队列,路由器只是转发消息,不会存储消息,如果没有Queue半岛到Exchange,会丢弃Producer的Message,用于接收和分发消息
    • 其有四种消息调度策略: Headers,Fanout(广播)、Direct、Topic
  • 路由键 Routing Key
    • 消息头的一个属性,标记消息的路由规则,,一般不单独使用,而是和交换机绑定在一起,决定消息路由到指定的队列,最大长度255字节
  • Binding 绑定
    • 用于建立Exchange和Queue之间的关联, 一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则, 交换器理解就是一个由Binding构成的路由表
  • Binding Key 绑定键
    • Exchange和Queue的绑定关系,匹配Routing Key, 最大长度255字节

Broker就是RabbitMQ的部署的服务器,实现消息的通信代理

RabbitMQ的消息模型主要有队列、交换机、路由组成

						——-->交换机-|
生产者 --产生---> 消息 ---|			---消息-->队列 ---监听接收---> 消费者
						——-> 路由 --|

整合过程

  1. 要在项目中使用RabbitMQ,首先就是要引入相关的Starter,在server模块加入, 因为RabbitMQ是基于AMQP协议的,所以springboot是直接抽象的amqp起步依赖
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. 配置RabbitMQ 【yml】,分布式架构下,RabbitMQ也是单独部署的,可以搭建集群,所以需要在项目中像redis一样指定RabbitMQ服务器所在的Host、端口号、用户名、密码
#rabbit配置
  rabbitmq:
    virtual-host: /
    host: 192.168.204.100
    port: 5672
    username: cfeng
    password: a123456sgssdhhhs7890bfagds
    publisher-confirm-type: correlated  #发送消息之后进行确认  发送到交换机触发回调 publisher-confirm 废弃
    publisher-returns: true  #发送消息后返回确认信息

因为SpirngBoot的自动配置【可以参见之前的blog和封装minio-starter】,快速使用rabbitMQ【 springBoot、Docker】让使用一个服务变得非常简单

  1. 自定义配置Bean , 配置文件

上面的简易配置只能够使用默认的简易功能,为了自定义使用Bean,创建RabbitmqConfig配置文件

除了@Value(${})可以调取yml的配置项之外,还可以直接使用Environment对象的getProperty获取yml中的所有的配置项 非常方便,注入Environment对象即可

 * rabbitmq自定义配置文件,需要配置单一消费者工厂,多消费者工厂、消息发送Template对象
 */

@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableRabbit  //开启rabbitmq的注解方式
public class RabbitmqConfig {

    //自动装配的连接工厂,连接到rabbitMQ服务器
    private final CachingConnectionFactory connectionFactory;

    //自动装配的简单Rabbit消息监听器所在容器工厂的配置对象
    private final SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    
    //方便快捷代替配置文件,Environment可以方便获取yml文件中的配置项
    private final Environment environment;

    /**
     * 单一消费者实例监听器容器工厂
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        //消费者监听器所在容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置连接的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息的阐述格式,JSON
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例最大数量(Security的并发最大)
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例每个实例拉取的消息数量 1
        factory.setPrefetchCount(1);

        return factory;
    }

    /**
     * 多消费者实例工厂,针对高并发的业务场景
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置连接工厂,可以使用configure对象配置
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //消息的确认消费模式,先设置为NONE,表示不需要确认消费
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);

        //高并发并发消费者实例最大数量  初始状态
        factory.setConcurrentConsumers(10);
        //并发的最大数量
        factory.setMaxConcurrentConsumers(15);
        //并发消费者实例每个实例拉取的消息数量
        factory.setPrefetchCount(10);

        return factory;
    }

    /**
     * 配置RabbitMQ发送消息组件RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        //设置发送消息后进行确认, connectionFactory的配置都直接在配置文件配置
//        connectionFactory.setPublisherConfirms(true);
//        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        //生产者通过调用channel.addReturnListener()方法来添加ReturnListener监听器,实现获取没有被正确路由到合适队列的消息
        rabbitTemplate.setMandatory(true);

        //发消息如果成功,给出反馈信息
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s);
            }
        });

        //发送消息失败,给出反馈
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message({})",returnedMessage.getExchange(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage());
            }
        });

        return rabbitTemplate;
    }

当rabbitMQ处理高并发业务场景,使用多消费者实例,正常情况即不需要并发监听消费处理时,只需要配置单一消费者实例的容器工厂

配置完成就可以开始使用,使用rabbitMQ完成生产者发送一串简单的字符串信息到基本的消息模型,由消费者进行监听消费处理

继续在RabbitmqConfig中配置创建队列、交换机、路由及其绑定,也就是创建Queue、基础的DirectExchange、Binding对象

	//方便快捷代替配置文件,Environment可以方便获取yml文件中的配置项
    private final Environment environment;

/**
     * 配置简单的消息模型 :队列、路由、交换机
     */
    @Bean(name = "basicQueue")
    public Queue basicQueue() {
        return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-name")),true);
    }

    @Bean
    public DirectExchange basicExchange() {
        return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-name"),true,false);
    }

    @Bean
    public Binding basicBinding() {
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(environment.getProperty("spring.rabbitmq.route-name"));
    }

之后编写消息的生产者BasicPulisher,生产者发送消息

 * 基础模型的生产者
 */


@Slf4j
@Component
@RequiredArgsConstructor
public class BasicPublisher {

    private final ObjectMapper objectMapper;

    private final  RabbitTemplate rabbitTemplate;

    //配置项获取
    private final Environment environment;

    /**
     * 发送消息
     */
    public void sendMsg(String message) {
        //首先要判断消息是否为空
        if(!Strings.isNullOrEmpty(message)) {
            try {
                //传输数据格式JSON
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //指定消息模型的交换机
                rabbitTemplate.setExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-name")));
                //指定消息模型的路由
                rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-name")));
                //消息以二进制字符流传输 , 可以使用writeValueAsBytes,不然转换失败
                Message msg = MessageBuilder.withBody(message.getBytes("utf-8")).build();
                //转化发送消息
                rabbitTemplate.convertAndSend(msg);
                log.info("基本模型: 生产者发送消息: {}",message);
            } catch (Exception e) {
                log.error("基本消息模型 生产者 发送消息发生异常: {}", e.fillInStackTrace());
            }
        }
    }
}

这里的消息发送依赖的就是RabbitTemplate, 之前STOMP服务消息发送依赖的是SimpleMessageOpreations, 也就是一个messageTemplate

之后再简单创建一个基础的消费者实例BasicConsumer

 * rabbitMQ基础模型 消费者,监听接收消息
 */

@Component
@Slf4j
@RequiredArgsConstructor
public class BasicConsumer {

    private final ObjectMapper objectMapper;


    /**
     *  监听接收消费队列中的消息, 这里采用单一容器工厂
     * @param msg  传输的二进制消息,这里的@PayLoad和之前的STOMP的是类似的就是将消息转化为给定的类型
     *  这里使用@RabbitListener 类似之前STOMP的@MessageMapping类似
     */
    @RabbitListener(queues = "${spring.rabbitmq.queue-name}",containerFactory = "singleListenerContainer")
    public void consumeMsg(@Payload byte[] msg) {
        try {
            String message = new String(msg,"utf-8");
            log.info("基本消费模型 消费者:监听到消息: {}", message);
        } catch (Exception e) {
            log.error("基本消息模型-消费者 发生异常 : {}",e.fillInStackTrace());
        }
    }
}

这样也就创建好了基本的生产消费模型,和之前的SpringBoot的事件驱动模型类似,这里编写测试类调用生产者发送消息【主要依靠的是RabbitmqTemplate的convertAndSend】,消费者位置监听器需要指定队列和监听器的容器工厂

@Test
    public  void testProduceMessage() {
        basicPublisher.sendMsg("kiss");
    }

之后测试发送,容易出现的问题就是convert失败,所以这里需要注意生产者写入消息的方式,可以直接采用ObjectMapper的writeValueAsBytes

2022-09-16 09:53:04.392  INFO 4428 --- [           main] c.s.rabbitmq.producer.BasicPublisher     : 基本模型: 生产者发送消息: kiss
2022-09-16 09:53:04.397  INFO 4428 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)

2022-09-16 09:53:04.421  INFO 4428 --- [ntContainer#0-1] c.s.rabbitmq.consumer.BasicConsumer      : 基本消费模型 消费者:监听到消息: �+,
2022-09-16 09:53:04.427  INFO 4428 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2022-09-16 09:53:05.431  INFO 4428 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

可以生产者生产消息之后,消费者成功监听,监听线程也是单独的【main线程发送消息】

 BasicProducer ---Msg-->  交换机Exchange
 							|__ Binding -- 队列BasicQueue <---listen--BasicConsumer

RabbitMQ除了可以简单发送字节型(通过getBytes方法或者序列化方法)的消息和采用@RabbitListener监听字节数组类型消息之外,还可以通过发送、接收 对象类型的方式实现消息的发送和接收,下面也简单演示一下

建立一个测试的对象类型用于传输,因为要网络传输,所以需要序列化

 * 测试rabbitMQ以对象作为消息类型,不简单为String
 */

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Student {

    private Integer id;

    private String realName;

    private String username;
}

再单独配置对象类型的队列、交换机和路由

##自定义配置对象类型队列、交换机、路由名称
    queue-object-name:  middleware.mq.object.info.queue
    exchange-object-name: middleware.mq.object.info.exchange
    route-object-name: middleware.mq.object.info.route
    
    
   /**
     * 配置简单类型的消息模型: 对象类型   : 队列、路由、交换机
     */
    @Bean(name = "objectQueue")
    public Queue objectQueue() {
        return new Queue(environment.getProperty("spring.rabbitmq.queue-object-name"));
    }

    @Bean
    public  DirectExchange objectExchange() {
        return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"),true,false);
    }

    @Bean
    public Binding objectBinding() {
        return BindingBuilder.bind(objectQueue()).to(objectExchange()).with(environment.getProperty("route-object-name"));
    }

之后就是再同样建立对象之间的消息传播的基本模型: 消费者、生产者、消息

public void sendObjectMsg(Student student) {
        //首先选哟判断是否不为null
        if(!Objects.isNull(student)) {
            try {
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                //消息的交换机和binding
                rabbitTemplate.setExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"));
                rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-object-name")));
                //发送消息
                rabbitTemplate.convertAndSend(student, message -> {
                    //获取消息属性
                    MessageProperties properties = message.getMessageProperties();
                    //持久化模式,PERSISTENT
                    properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //设置消息的类型,消息头设置AbstractJavaTypeMapper的默认的类型
                    properties.setHeader(AbstractJavaTypeMapper.DEFAULT_KEY_CLASSID_FIELD_NAME,Student.class);
                    //返回配置后的消息
                    return message;
                });
                log.info("基本消息模型-生产者-发送对象类型消息: {}",student.toString());
            } catch (Exception e) {
                log.error("基本消费模型-生产者, 发送消息失败: {}", student.toString(),e.fillInStackTrace());
            }
        }
    }

对比简单的消息发送,这里主要就是再发送消息时,通过Process指定消息的属性: 消息的持久化模式、消息头【类型】

@RabbitListener(queues = "${spring.rabbitmq.queue-object-name}",containerFactory = "singleListenerContainer")
    public void consumeObjMsg(@Payload Student student) {
        try {
            log.info("基本消费模型 --- 消费者: 监听到消息: {}",student);
        } catch (Exception e) {
            log.error("基本消费模型 --- 消费者: 监听消息异常 {}", e.fillInStackTrace());
        }
    }

之后简单调用Producer的API发送消息即可

2022-09-16 19:46:48.860  INFO 17068 --- [           main] c.s.rabbitmq.producer.BasicPublisher     : 基本消息模型-生产者-发送对象类型消息: Student(id=1, realName=zs, username=cfeng)
2022-09-16 19:46:48.872  INFO 17068 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)

2022-09-16 19:46:48.890  INFO 17068 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2022-09-16 19:46:48.892  INFO 17068 --- [ntContainer#1-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2022-09-16 19:46:48.905  INFO 17068 --- [ntContainer#0-1] c.s.rabbitmq.consumer.BasicConsumer      : 基本消费模型 --- 消费者: 监听到消息: Student(id=1, realName=zs, username=cfeng)

这里演示采用的都是Direct交换机模型,一共四种交换机模型

多种消息模型 — Exchange调度策略

调度策略就是Exchange交换机收到生产者发送的消息后按照什么规则转发消息,调度策略的三个因素:Exchange Type、 Binding Key 和 消息的标记信息(Routing Key 和headers, Exchagne根据消息的Routing Key和Exchange 绑定Queue 的Binding Key分配消息, 生产者发送消息时,一般会指定Routing Key,指定路由规则, Exchange Type和 binding key确定时,

Producer指定routing key即可决定消息的流向, Exhange路由有多种不同的模式, 也就决定了几种不同的消息模型

Fanout 订阅 、广播模式 ---- 适用于 业务数据需要广播场景: 用户操作写日志

Fanout Exhange,Fanout交换机具有广播消息的作用,当消息进入Exchange之后,交换机会检查 binding了哪些Queue,找到之后,将消息发送到相关的Queue中【每一个Queue】, 由队列对应的消费者监听使用

在这里插入图片描述

订阅模式 与 Binding Key和Routing Key无关, 同时,Fanout交换机转发消息最快,交换器会将消息分发给所有有绑定关系的消息Queue中, 就像子网广播一样,都获得一份复制的消息; 就算绑定了路由也是无用的; 队列数N >= 1

其实该订阅模式就类似STOMP协议下的服务,订阅相关的频道,就可以介绍到发送到该频道的消息,所以进入的用户都订阅public频道就可以接收消息

这里简单的演示消息的发送,封装消息实体ChatMessage,建立多个队列

###自定义配置广播、订阅模型的多个Queue、交换机,不需要路由,因为绑定了也无用,会子网官博,不是独享
    queue-one-name: middleware.mq.fanout.one.queue
    queue-two-name: middleware.mq.fanout.two.queue
    exchange-fanout-name: middleware.mq.fanout.info.route
    
/**
     * 广播模式交换机、 不需要配置路由信息,因为路由无用,子网广播
     */
    @Bean(name = "fanoutQueueOne")
    public Queue fanoutQueueOne() {
        return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-one-name")));
    }

    @Bean(name = "fanoutQueueTwo")
    public Queue fanoutQueueTwo() {
        return new Queue(Objects.requireNonNull(environment.getProperty("queue-two-name")));
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-fanout-name")),true,false);
    }

    /**
     * 只需要将交换机和队列绑定再一起即可,不需要指定Bindinging Key
     */
    @Bean
    public Binding fanoutBindingOne() {
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBindingTwo() {
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }

之后简单封装消息

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class MessageEntity implements Serializable {

    private static final long serialVersionUID = -6995434700637844150L;

    private Integer id;

    private String message;

    private Student sender;
}

消息生产者ModelPubilsher

@Component
@RequiredArgsConstructor
@Slf4j
public class FanoutPublisher {

    private final RabbitTemplate rabbitTemplate;

    private final ObjectMapper objectMapper;

    private final Environment environment;

    public void sendMsg(MessageEntity messageEntity) throws Exception {
        if(!Objects.isNull(messageEntity)) {
            try {
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

                //这里只需要指定交换机,不需要给出Routing key
                rabbitTemplate.setExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-fanout-name")));
                //创建消息Message,转换
                Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(messageEntity)).build();
                log.info("消息模型Fanout 生产者发送消息: {}",messageEntity);
            } catch (Exception e) {
                log.error("消息模型Fanout 生产者发送消息异常: {}", messageEntity,e.fillInStackTrace());
            }
        }
    }
}

监听队列的消费者

@Component
@Slf4j
@RequiredArgsConstructor
public class ModelConsumer {

    private final ObjectMapper objectMapper;

    @RabbitListener(queues = "${spring.rabbitmq.queue-one-name}",containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgOne(@Payload byte[] msg) {
        try {
            MessageEntity messageEntity = objectMapper.readValue(msg,MessageEntity.class);
            log.info("消息模型fanoutOne 监听到消息:{}", messageEntity);
        } catch (Exception e) {
            log.info("消息模型——消费者one 发生异常: {}",e.fillInStackTrace());
        }
    }

    @RabbitListener(queues = "${spring.rabbitmq.queue-two-name}",containerFactory = "singleListenerContainer")
    public void consumeFanoutMsgTwo(@Payload byte[] msg) {
        try {
            MessageEntity messageEntity = objectMapper.readValue(msg,MessageEntity.class);
            log.info("消息模型fanoutTWO 监听到消息:{}", messageEntity);
        } catch (Exception e) {
            log.info("消息模型——消费者two 发生异常: {}",e.fillInStackTrace());
        }
    }
}

之后调用生产者发送消息即可

2022-09-16 21:15:27.775  INFO 15684 --- [           main] c.s.rabbitmq.producer.FanoutPublisher    : 消息模型Fanout 生产者发送消息: MessageEntity(id=1, message=你好,春风, sender=Student(id=1, realName=zs, username=cfeng))
2022-09-16 21:15:27.783  INFO 15684 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)
2022-09-16 21:15:27.795  INFO 15684 --- [ntContainer#2-1] c.s.rabbitmq.consumer.FanoutConsumer     : 消息模型fanoutTWO 监听到消息:(Body:'[B@2855a650(byte[87])' 
2022-09-16 21:15:27.795  INFO 15684 --- [ntContainer#3-1] c.s.rabbitmq.consumer.FanoutConsumer     : 消息模型fanoutOne 监听到消息:

使用Fanout交换机可以让一个交换机绑定多个队列,从而对应多个消费者, 不需要指定binding Key, 所以发送消息也不需要Routing Key

广播式、订阅式消息模型适用于 业务数据需要广播式传播, 说白了,也就是多个地方需要使用该数据,比如用户操作日志 — 需要将操作信息发送给数据库,同时也需要将其发送给专门的日志系统进行存储, 将其操作日志封装为消息实体,进行传播即可

Direct 路由模式 — 业务数据直接传输消费

Direct交换器也就是直接交换器,就是最直接的意思,需要Routing key和Binding key, 识别消息头中的Routing Key,查找Exchange和Queue之间的Binding key,如果匹配,将消息分发到该队列,Direct 式Exchange的默认模式

默认提供了一个Exchange,名称为空, 类型为Direct,绑定到所有的Queue, 每一个Queue和该Exchange的Binding Key为Queue的名称, 所以不交换器的情况下是可以借助默认交换器发送消息

在这里插入图片描述

该模式可以参见上面的整合过程的Basic Queue

此模型需要严格意义的绑定,也就是在Binding配置时,在配置Exchange和Queue之间的关系时,同时需要指定Binding Key; 这样Producer发送数据时,会传递一个Routing Key检验

rabbitTemplate.setExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"));
                rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-object-name")));


这里设置的Routing-key和配置conifg的Binding Key相同
@Bean
    public Binding basicBinding() {
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(environment.getProperty("spring.rabbitmq.route-name"));
    }

这个时候就不会像Fanout一样广播了,消息只会投递到绑定的且满足路由Routing Key的Queue上,不会投递到所有的绑定Queue上

该模式适合业务数据直接传输消费,比如业务服务模块之间的信息交互,一般业务服务模块之间的消息通信时直接、实时的可以借助DirectExchange, 也是最常见的消息通信模式 == 路由模式

Topic 通配符模式

TopicExchange 是一种 发布-主题–订阅 的消息模型,Topic模式也要求交换机、路由、队列严格绑定构成, 和之前的Direct不同,其支持通配方式的路由,可以通过为路由的名称指定特定的 通配符 * 或者 #, 从而绑定到不同的队列中; Routing Key 是一个以 句号. 为分隔的字符串

在这里插入图片描述

通配符 * :表示一个特定的单词

通配符 #: 表示任意的单词,可以0到多个 # > *

如果direct是正规军,那么Topic就是王牌军,其可以统治direct, 当路由名称包含* 时,*代表一个单词,所以就会降级为基于DirectExchange的消息模型

而路由名称包含# 时,由于匹配多个单词所以绑定的路由不再起作用,相当于绑定基于FanoutExchange的消息模型, 哪怕有Routing Key,也会匹配到所有的,不再起作用

在这里插入图片描述

可以看出Topic和Direct的最主要的不同就是绑定规则Binding Key 是通配的

这里Cfeng 就只是大概阐述一下代码了,提一下和Direct不同的部分

###首先就是定义的route-key 是包含通配符的
route-one-name: middleware.mq.object.*.route 【*匹配一个单词】
route-two-name: middleware.mq.#.info.route  【这里#可以匹配任意长度route字符串】

之后就是config中对上面两个路由的绑定,这里创建的是Topic交换机实例
    @Bean
    public  TopicExchange objectExchange() {
        return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-topic-name"),true,false);
    }
    
@Bean
    public Binding fanoutBindingOne() {
        return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(Objects.requireNonNull(environment.getProperties("spirng.rabbitmq.route-one-name")));
    }

    @Bean
    public Binding fanoutBindingTwo() {
        return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(Objects.requireNonNull(environment.getProperties("spirng.rabbitmq.route-two-name")));
    }

建立交换机和Queue的Binding关系之后,就直接使用Producer向Exchagne发送数据,这里可以测试多个Routing Key验证通配

 ......
     public void sendTopicMsg(MessageEntity messageEntity, String route) throws Exception {
     try {
         rabbitTemplate.setMessageConverter(XXX);
         rabbitTemplatesetExchange(environment.getProperties("XXX"));
         //路由绑定
         rabbitTemplate.setRoutingKey(route);
         //创建消息
         Message msg = ....
         rabbitTemplate.sendAndConvert(msg);
     } catch(Exception e) {
         ....
     }
 }

之后消费者就是@RabbitListener中指定queues即可,消费容器工厂还是单个消费者即可

测试的时候,可以测试输入不同的Routing Key,可以发现只要符合匹配规则,对应的消费者都能够消费消息

确认消费机制 RPC

RabbitMQ 是高性能、高可用的消息分布式中间件,不只是因为消息异步通信和业务模块解耦、 接口限流、消息延迟处理等功能, 重要的是其在消息发送过程中,可以保证消息成功发送、不会丢失、确认消费

  • 保证消息成功发送: 生产者的生产确认机制( Publisher的Confirm机制)
  • 消息不丢失、确认消费: 面向消费者确认消费而言

消息高可用 – 确认消费

分布式消息中间件RabbitMq本身是基于异步的消息处理, 前面的实例就可以知道: 生产者P将消息发送到RabbitMQ后不会知道消费者处理成功或者失败, 甚至都不知道是否有消费者来处理消息

  • 消息是否真正发送成功,P自认为发送成功,可是采用template发送时,绑定模型如果不存在,实际上是发送失败的 【之前的一个demo Cfeng甚至没有bind,但是P端仍然认为投递成功】
  • 由于特殊原因,RabbitMQ服务挂掉了,导致需要重启,但是这时队列中还有大量消息没有消费,可能重启过程中丢失
  • 消费者监听消息时,可能出现监听失败的问题,导致消息所在队列找不到消费者而不断重新入队,重复消费

在这里插入图片描述

但是在实际场景中,可能需要同步处理,同步等待服务端将消息处理完成之后再进行下一步的操作,相当于RPC(remote procedure Call 远程过程调用)

RabbitMQ实现RPC机制:

  • P 发送消息,在消息头属性MessageProperties中设置属性 ReplyTo — Queue名称 【消费者处理完成后的消息发送到该Queue】,和correlationId: 此次请求的标识,消费者处理完成之后将此属性返还,P由此确认消息是否成功执行
  • 消费者收到消息进行处理
  • 处理完成生成应答消息replyTo指定的Queue,同时带上correlationId
  • P 订阅了replyTo指定的Queue,从中收到应答消息,根据correlationId确定执行情况
消息生产确认

为了确认消息是否真的发送成功,RabbitMQ要求生产者在发送消息后进行发送确认,代表消息成功发出,配置的规则很easy,

就是配置Template时设置PublisherConfirms,和其反馈信息Returns都为True,同时给出ConfirmCallBack作为发送成功的处理

publisher-confirm-type: correlated  #发送消息之后进行确认  发送到交换机触发回调 publisher-confirm 废弃
    publisher-returns: true  #发送消息后返回确认信息

rabbitTemplate.setConnectionFactory(connectionFactory);
        //发消息如果成功,给出反馈信息
        rabbitTemplate.setConfirmCallback((correlationData, b, s) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s));

        //发送消息失败,给出反馈
        rabbitTemplate.setReturnsCallback(returnedMessage -> log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message({})",returnedMessage.getExchange(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage()));

confirm为成功,returns为失败,这样P就可以知道消息是成功发送成功

rabbitMQ宕机、消息积压 — 持久化

rabbitMQ可能出现消息积压的情况 — Producer生产能力强,Consumer宕机、Consumer消费能力弱

消息积压最简单的方法就是增加rabbitMQ机器的数量,如果不行,那就手工增加消费能力 — 并发消费【多个消费者消费,同时设置消费最大数量】 ,还可以再开一个消费者,将MQ的消息全部录入数据库,后续处理

而RabbitMQ本身为保证消息不丢失,建议创建队列、交换机时设置持久化参数durable为true, 也就是durable参数取值为true

@Bean(name = "basicQueue")
    public Queue basicQueue() {
        return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-name")),true);
    }

    @Bean
    public DirectExchange basicExchange() {
        return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-name"),true,false);
    }


properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

持久化之后,RabbitMQ服务器崩溃重启操作,队列、交换机依然存在并且消息不会丢失

但是可能发生RabbitMQ收到消息还未持久化,就宕机了【和数据库一样】,那么这里也可以采用相似的方式 : 事务

AMQP的重要特点 – 支持事务, 如果生产者将持久化消息发送给服务器,consume命令本身无Response返回,为了应对发出消息服务器崩溃,可以预先通过

  • txSelect()开启事务
  • txCommit() 提交事务
  • txRollBack() 回滚事务
消息不重复消费 — ACK模式

为了保证消息不重复消费、能够准备被消费,提供了消息确认机制,ACK模式

Ack — acknowledge key确认字符,接收方在收到报文后,若未发现错误,就给方法放确认回答ACK,表明信息正确接收

TCP报文的控制位就有一个ACK,当然AMQP协议中基于的信道Channel,不是TCP

为了提高信息的可用性、防止消息重复消费, 一般都会使用消息确认机制 — 当消息确认消费后,消息才会从队列中移除

RabbitMQ提供的取人模式: AcnowledgeMode : NONE、MANUAL手动,AUTO自动,电商平台支付的支付金额、游戏充值的消息提示是必须进行确认消费的,不然就不够严谨可靠

NONE – 无需确认

生产者发送消息到队列,消费者消费之后不给出任何反馈消息, 一股脑发送即可,不需要考虑丢失等异常,就像UDP的感觉, 这样处理流程更短,更快一点,但是实际生产中很少使用

类似用户禁用APP更新提醒,P发送消息提示更新,用户消费之后并不会提示已消费

AUTO — 自动确认

生产者发送消息到队列,消费者监听消息之后,需要发送一个AUTO ACK的反馈给服务器, 之后消息出队

RabbitMQ自动触发, 依靠的是RabbitMQ的内置组件

Producer ---msg--> Exchange---bind-->Queue <--listen--- Consumer
	|___RabbitMQ服务器 --- AUTO ACK <---- rabbitMQ 内置组件---|

而确认消费的模式,是在容器工厂配置中配置setAcckownledgeMode即可, 之前为了流程简单,模式设置的为NONE

@Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置连接工厂,可以使用configure对象配置
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());

        //消息的确认消费模式,先设置为NONE,表示不需要确认消费
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);

而这里的交换机模式选择Direct,也就是最初是演示使用的,路由模式

生产者还是BasicProducer, 确认消费模式,主要的变化是在消费者端, 但是这里是自动消费, 消费者的@RabbitListener中的container就会指定容器工厂,设置了Auto

当然,也可以直接在配置文件中指定:

spring:
	rabbitmq:
		listener:
      		direct:
       		 acknowledge-mode: auto

和之前的不同就是增加了ACK确认,重新执行代码

[nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)

可以看到ack为true,也就是确认消费

MANUAL 手动确认

人工手动确认消息消费,生产者发送消息到队列后,消费者监听到该消息时需要 手动以代码方式发送一个ACK 给服务器,之后消息出队,告知P 成功消费

Producer ---msg--> Exchange---bind-->Queue <--listen--- Consumer
	|___RabbitMQ服务器 --- ACK   信息 <---- 人为手动ACK  ---|

手动确认 需要在消费者 消费 消息逻辑 之后 编写确认消费的逻辑, 消息出队,避免消息重复投入队列而重复消费, 还是使用Direct模式交换机

和AUTO不同, MANUAL需要指定容器Container,并且当容器监听的队列和监听的Consumer

/**
     * 配置监听容器
     * 消费者监听实例
     */
    @Resource
    private BasicConsumer basicConsumer;

    @Bean(name = "simpleContainerManual")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(@Qualifier("basicQueue") Queue basicQueue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //消费实例配置
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setPrefetchCount(1);

        //消息确认模式
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        container.setQueues(basicQueue);

        //手动确认消费 消费者需要实现ChannelAwareMessageListener接口
        container.setMessageListener(basicConsumer);

        return container;
    }

这里的消费者需要实现ChannelAwareMessageListener接口

@Component
@Slf4j
@RequiredArgsConstructor
public class BasicConsumer implements ChannelAwareMessageListener {

    private final ObjectMapper objectMapper;

    /**
     * @param message
     * @param channel 通道实例
     * @throws Exception
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //这里就是rabbit的一个Message监听器,可以监听到消息
        MessageProperties properties = message.getMessageProperties();
        //获取消息分发的全局标识
        long deliveryTag = properties.getDeliveryTag();
        try {
            byte[] msg = message.getBody();
            //解析消息
            String entity = objectMapper.readValue(msg,String.class);
            //log
            log.info("确认消费模式 - 人为手动确认消息: {}", entity);
            //执行业务逻辑后,手动ACK, deliverTag 为全局分发标识(唯一)  是否允许批量消费: 这里设置为true
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            log.info("人为手动确认消费- 监听到消息: {}", e.fillInStackTrace());
            //发生异常,拒绝ACK
            channel.basicReject(deliveryTag,false);
        }
    }

基于MANUAL确认消费模式 需要在执行完实际的业务逻辑业务之后,手动调用相关方法比如BasicAck,Reject,当处理过程发生异常,需要执行确认消费, 避免消息一直留在队列中,导致重复消费

INFO 2268 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.204.100:5672]
2022-09-17 16:56:38.394  INFO 2268 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#28fc1132:0/SimpleConnection@3c82bac3 [delegate=amqp://cfeng@192.168.204.100:5672/, localPort= 54514]
2022-09-17 16:56:38.477  INFO 2268 --- [           main] c.server.RabbitmqBasicMessageTest        : Started RabbitmqBasicMessageTest in 5.256 seconds (JVM running for 6.762)

2022-09-17 16:56:39.102  INFO 2268 --- [           main] c.s.rabbitmq.producer.BasicPublisher     : 基本模型: 生产者发送消息: kiss
2022-09-17 16:56:39.116  INFO 2268 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)
2022-09-17 16:56:39.121  INFO 2268 --- [ntainerManual-1] c.s.rabbitmq.consumer.BasicConsumer      : 确认消费模式 - 人为手动确认消息: kiss

用户登录成功 写入日志

用户登录操作,作为常规系统的基础操作,一些应用需要跟踪用户的登录、操作轨迹,需要记录用户的登录的日志,将相关的记录录入数据库

用户登录是主流程, 记录用户登录日志是辅助流程, 所以实际过程中,记录用户登录日志操作不应该影响用户登录,也就是不应该是同步的,因为再次数据库操作耗时; 这个时候就可以RabbitMQ进行解耦

在这里插入图片描述

常规操作也可以采用之前抢红包操作录入数据库的@Async; 整体业历程:包括 登录模块 和 日志记录模块, 这两个模块不应该相互影响,异步操作

这里为了模拟整个流程,就大概先分析一下思路,首先网站的用户登录成功进入后台的判断逻辑,判断之后就发送一个消息,借助RabbitTemplate即可,发送之后,日志监听消费者监听到消息获取相关的登录信息录入数据库, 这里采用Direct交换机, 手动ACK

这里也就不完整写代码了,没什么特别的地方,这里Cfeng就简单写一些代码说明整个流程【 结合之前的Cfeng.net 使用的Security】

//首先就是用户信息, 就是之前的ChatUser -- 网站用户
//之后封装一个登录日志实体类  SysLog   包括id、用户id、用户操作所属模块、操作的数据、备注、操作时间等

之前Security登录成功后设置的SuccessfulURL, 现在因为需要登录成功后进行后处理操作,所以这里需要自定义SuccessfulHandler, 在其中调用  LogPublisher 将当前用户的登录信息发布到Exchange  ; 之后再跳转进入成功的欢迎页面

LogPbulisher的操作就是封装对象类型的消息, MessageProcessor就可以完成,指定消息头的java type为SysLog

在rabbitMQ中配置Direct交换机, 配置日志系统的消费的队列LogQueue, 并且指定日志系统的消费者LogConsumer,配置相关的监听容器container,指定队列和消费者, LogConsumer实现Listener接口 【确认消费】, 在message方法中进行数据库的写入操纵, 当然需要try catch, 如果出现异常手动Direct, 正常就ACK,消息出队避免重复消费,导致数据库消息异常

如果有相关friend好奇,我也可以详细说一下具体的代码🌳

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到