Spring Boot | Spring Boot 整合 “RabbitMQ“ ( 消息中间件 ) 实现

发布于:2024-05-09 ⋅ 阅读:(26) ⋅ 点赞:(0)

目录:

Spring Boot 整合 “RabbitMQ” ( 消息中间件 )实现 :

在这里插入图片描述

作者简介 :一只大皮卡丘,计算机专业学生,正在努力学习、努力敲代码中! 让我们一起继续努力学习!

该文章参考学习教材为:
《Spring Boot企业级开发教程》 黑马程序员 / 编著
文章以课本知识点 + 代码为主线,结合自己看书学习过程中的理解和感悟 ,最终成就了该文章

文章用于本人学习使用 , 同时希望能帮助大家。
欢迎大家点赞👍 收藏⭐ 关注💖哦!!!

(侵权可联系我,进行删除,如果雷同,纯属巧合)


  • Spring Boot 可对 RabbitMQ工作模式 ( 6种工作模式 )进行了 整合,并支持 多种整合方式,包括 : 基于API 的方式 基于配置类的方式 基于注解的方式
  • 下面将 选取常用 Publish/Subscribe ( 发布订阅 )工作模式 Routing ( 路由 )工作模式Topics ( 通配符模式 ) 工作模式 3种工作模式完成在 Spring Boot 项目 中的 消息服务整合实现 ( 整合实现"消息中间件" )。

一、Spring Boot 整合 整合实现 : Publish/Subscribe ( 发布订阅 ) 工作模式 ( "3种"整合实现方式 )

  • Spring Boot 整合 RabbitMQ 中间件实现消息服务,主要围绕3个部分的工作进行展开 :
    定制中间件消息发送者发送消息消息消费者接收消息。其中,定制中间件比较麻烦的工作,且必须预先定制
  • 下面我们以 用户注册成功同时 “发送邮件通知”发送短信通知 这一场景为例,分别使用 : 基于API 的方式 基于配置类的方式 基于注解的方式3种 方式实现 Publish/Subscribe ( 发布订阅 )工作模式 整合

1.1 基于"API"的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 偶然使用

  • 基于API 的方式主要讲的是使用 Spring 框架提供的 API管理类 : AmqpAdmin 定制 消息发送组件,并进行消息发送
  • 这种 定制消息发送组件方式与在 RabbitMQ 可视化界面上通过 对应面板 进行 组件操作实现基本一样,都是通过管理员的身份预先手动声明交换器队列路由键等,然后 “组装消息队列” 供应用程序调用,从而 实现消息服务。下面我们就对这种 基于 API的方式进行 讲解和演示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>3.2.5</version>
           <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.myh</groupId>
        <artifactId>chapter_22</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>chapter_22</name>
        <description>chapter_22</description>
        <properties>
           <java.version>17</java.version>
        </properties>
        <dependencies>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
           </dependency>
    
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
           </dependency>
           <dependency>
              <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit-test</artifactId>
              <scope>test</scope>
           </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    <!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>-->
    
    </project>
    
(2) 使用 AmqpAdmin “定制消息发送组件”
  • 在项目的测试类 : Chapter22ApplicationTests , 在该测试类先引入 AmqpAdmin 管理类 定制 Publish/Subscribe 工作模式 所需的 消息组件代码如下所示 :

    Chapter22ApplicationTests.Java ( 测试类 ) :

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        //自动注入 RabbitMQ消息中间件的API操作类: AmqpAdmin
        @Autowired
        private AmqpAdmin amqpAdmin;
    
        @Test
        public void amqpAdmin() {
            //1.定义"fanout"类型的交换器
            amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));
            //2.定义两个"默认持久化队列" , 分别处理email 和 sms
            amqpAdmin.declareQueue(new Queue("fanout_queue_email")); //该队列处理"邮件信息"
            amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));//该队列处理"短信信息"
            //3.将两个队列分别与"交换器"进行绑定
            amqpAdmin.declareBinding(new Binding("fanout_queue_email",
                    Binding.DestinationType.QUEUE,"fanout_exchange","",null));
            amqpAdmin.declareBinding(new Binding("fanout_queue_sms",
                    Binding.DestinationType.QUEUE,"fanout_exchange","",null));
        }
    
    }
    

    上面的代码中,使用 Spring 框架提供的消息管理组件 AmqpAdmin定制了消息组件。其中定义了一个 fanout 类型交换器 : fanout_exchange ;
    还定义两个消息队列 : fanout_queue_emailfanout_queue_sms,分别用来处理邮件信息短信信息 ;最后 将定义的两个队列分别交换器绑定

(3) 查看 “RabbitMQ消息组件” 的定制效果
  • 执行上述单元测试方法 : amgpAdmin()验证 RabbitMQ 消息组件的定制效果单元测试方法执行成功,可通过 RabbitMQ 可视化管理页面具体操作为: 打开RabbitMQ安装目录中的 sbin目录,双击 rabbitmq-server.bat ,后在浏览器访问访问 : http://localhost:15672 ,输入密码和密码 : guest , 此时 可在可视化界面查询 “RabbitMQ消息组件定制效果 ,如下图所示

    在这里插入图片描述


    在这里插入图片描述

    通过上面图片看出,在 Queues 队列面板页面中,展示有定制消息队列信息与程序中 "定制"消息队列一致。可以单击消息队列名称查看每个队列详情

    通过上述操作可以发现,在管理页面中提供了 消息组件交换器队列的定制功能。在程序中使用 Spring 框架提供的管理员 API组件 AmqpAdmin 定制消息组件和在管理页面上手动定制消息组件本质是一样 的。

(4) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
       /**
        * "消息发送者" 发送信息
        */
        @Test
        public void psubPublisher() {
            User user = new User(); //消息发送者 要发送的"消息内容"
            user.setId(1);
            user.setUsername("石头");
            /*
              使用 RabbitTemplate中的
              convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为
                 String exchange : 表示发送信息的"交换器"
                 String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"
                 Object object : 表示要"发送的信息内容"
             */
            //执行哪个"交换器" 和 指定给该"交换器"的"消息内容"
            rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"
        }
     }
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
  • 再次执行 psubPublisher( ) 方法,该方法 执行成功后,查看 RabbitMQ可视化管理页面 中的 Queues面板信息,具体如下图所示

    在这里插入图片描述

  • 进入 队列详情页面查看消息 , 如下图所示

    在这里插入图片描述

    上图看出,在 消息队列存储有指定发送消息详情其他参数信息,这与程序指定发送信息完全一致

(5) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        /**
         * Publish/Subscribe 工作模式接收、处理 "邮件业务"
         *
         * @RabbitListener(queues = "fanout_queue_email") :
         *  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。
         *  
         */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
        public void psubConsumerEmail(Message message) {
            byte[] body = message.getBody();
            //转化为字符串
            String str = new String(body);
            System.out.println("邮件业务接收到的信息为: "+str);
        }
    
       /**
       * Publish/Subscribe 工作模式接收、处理 "短信业务"
       */
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
      public void psubConsumerSms(Message message) {
          byte[] body = message.getBody();
          //转化为字符串
          String str = new String(body);
          System.out.println("短信业务接收到的信息为: "+str);
         }
      }
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听 的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法立即接收并消费队列消息 ( 即 注解对应方法会被调用 )。另外,在接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果使用与消息类型对应参数接收消息的话,只能够得到 具体的消息体信息如果使用 Object或者 Message 类型参数接收消息的话,还可以获得除了消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.2 基于 “配置类” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) - 常用

  • 基于 配置类方式 主要讲的是使用 Spring Boot 框架提供的 @Configuration 注解 配置类 定制消息发送组件并进行消息发送例子代码如下所示
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>3.2.5</version>
           <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.myh</groupId>
        <artifactId>chapter_22</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>chapter_22</name>
        <description>chapter_22</description>
        <properties>
           <java.version>17</java.version>
        </properties>
        <dependencies>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
           </dependency>
    
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
           </dependency>
           <dependency>
              <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit-test</artifactId>
              <scope>test</scope>
           </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    <!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>-->
    
    </project>
    
(2) 使用 "配置类"的方式 “定制消息发送组件”
  • 使用 "配置类"的方式 “定制消息发送组件(Publish/Subscribe 工作模式) 代码如下所示 :

    RabbitMQConfig.Java ( 配置类 ) :

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        /**
         * 使用基于"配置类"的方式定义"消息发送组件"
         */
        //1.定义定义fanout类型的交换器
        @Bean
        public Exchange fanout_exchange() {
            //创建一个名为"fanout_exchange" 的 "交换器"
            return ExchangeBuilder.fanoutExchange("fanout_exchange").build();
        }
    
        //2.定义两个不同名称的消息队列
        @Bean
        public Queue fanout_queue_email() { //创建消息队列
            //创建一个名为 "fanout_queue_email" 的 "消息队列"
            return new Queue("fanout_queue_email");
        }
        @Bean
        public Queue fanout_queue_sms() { //创建消息队列
            //创建一个名为 "fanout_queue_sms" 的 "消息队列"
            return new Queue("fanout_queue_sms");
        }
    
        //3.将两个不同名称的"消息队列"
        @Bean
        public Binding bindingEmail() { //将"fanout_queue_email"消息队列 和 "fanout_exchange"交换器 进行"绑定"
            return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs();
        }
        @Bean
        public Binding bindingSms() { //将"fanout_queue_sms"消息队列 和 "fanout_exchange"交换器 进行"绑定"
            return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs();
        }
    
    }
    
(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架提供的 RabbitTemplate 模板类实现消息发送示例代码如下 ( 在原测试类中加入以下"主要代码") :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
       /**
        * "消息发送者" 发送信息
        */
        @Test
        public void psubPublisher() {
            User user = new User(); //消息发送者 要发送的"消息内容"
            user.setId(1);
            user.setUsername("石头");
            /*
              使用 RabbitTemplate中的
              convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为
                 String exchange : 表示发送信息的"交换器"
                 String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"
                 Object object : 表示要"发送的信息内容"
             */
            //执行哪个"交换器" 和 指定给该"交换器"的"消息内容"
            rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"
        }
     }
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决 上述 消息中间件发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) “消息消费者” 接收消息
  • 项目中创建 : service 包,并在该包下创建一个 针对RabbitMQ 消息中间件进行 "消息接收" 和 “处理” 的业务类 :RabbitMQService
    代码如下所示

    RabbitMQService.java

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        /**
         * Publish/Subscribe 工作模式接收、处理 "邮件业务"
         *
         * @RabbitListener(queues = "fanout_queue_email") :
         *  使用该注解监听"队列信息"后,一旦服务 "启动且监听到" 指定的队列有 "消息" 存在,该注解对应的方法就会 "接收并消费" 队列中的消息。
         *  
         */
    @RabbitListener(queues = "fanout_queue_email") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
        public void psubConsumerEmail(Message message) {
            byte[] body = message.getBody();
            //转化为字符串
            String str = new String(body);
            System.out.println("邮件业务接收到的信息为: "+str);
        }
    
       /**
       * Publish/Subscribe 工作模式接收、处理 "短信业务"
       */
    @RabbitListener(queues = "fanout_queue_sms") //监听"队列信息",指定队列 "有信息" 时会 "调用" 该方法进行操作
      public void psubConsumerSms(Message message) {
          byte[] body = message.getBody();
          //转化为字符串
          String str = new String(body);
          System.out.println("短信业务接收到的信息为: "+str);
         }
      }
    

    上面的代码中创建了一个 接收处理 RabbitMQ消息业务处理类 : RabbitMQService,在该类中使用 Spring 框架提供的 @Rabbitistener 注解监听队列名称为 fanout_queue_emailfanout_queue_sms消息监听的这 两个 队列是 前面 “指定发送并存储” 消息消息队列

    需要说明的是,使用 @RabbitListener 注解 监听队列消息后,一旦服务启动且监听指定的队列消息存在 ( 目前两个队列都存在消息 ),对应注解方法会**立即接收并消费队列消息 ( 即注解对应方法会被调用 )。另外,在 接收消息的方法中,参数类型 可以与 发送的消息类型保持一致,或者使用 Object 类型Message 类型如果 使用与消息类型对应参数**接收消息的话,只能够得到 具体的消息体信息如果 使用 Object 或者 Message 类型参数 接收消息 的话,还可以获得 除了 消息体外的消息参数信息 MessageProperties


    此时 成功启动项目后控制台显示的消息消费效果下图所示

    在这里插入图片描述

    从上图可以看出项目启动成功后消息消费者监听” 到 “消息队列” 中存在两条消息,并进行了 各自的消费 ( 执行了 @Rabbitistener 注解 对应的"方法" )。与此同时,通过 RabbitMQ 可视化管理页面Queues 面板查看队列消息情况会发现两个队列中存储的消息 已经 被消费如下图所示

    在这里插入图片描述

    至此,一条完整消息发送、消息中间件存储消息消费Publish/Subscribe 工作模式的业务案例 已经实现


    ps

    使用的是开发中常用@RabbitListener注解 监听指定名称队列消息情况这种方式会在监听指定队列存在消息后立即进行消费处理。除此之外,还可以使用 RabbitTemplate 模板类receiveAndConvert ( String queueName )方法 手动消费指定队列中的消息

1.3 基于 “注解” 的方式 ( 实现 Publish/Subscribe "发布订阅"工作模式 ) -常用

  • 基于注解的方式指的是使用 Spring框架@RabbitListener注解 定制消息发送组件 "消息消费者" 接收信息 , 当然还要结合之前的代码,才能完整实现个功能需求
(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.5</version>
       <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.myh</groupId>
    <artifactId>chapter_22</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>chapter_22</name>
    <description>chapter_22</description>
    <properties>
       <java.version>17</java.version>
    </properties>
    <dependencies>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
       </dependency>

       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
       </dependency>
       <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit-test</artifactId>
          <scope>test</scope>
       </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

<!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>-->

</project>
(2) 使用 “注解” 的方式定制 “消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import com.myh.chapter_24.domain.User;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        /**
         * 使用基于注解的方式实现消息服务 ( 使用"注解"的方式 ①定制消息组件 ②接收信息)
         * 1.1 Publish/Subscribe工作模式接收、处理"邮件业务"
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("fanout_queue_email"),exchange =
                                   @Exchange(value = "fanout_exchange",type = "fanout")))
        public void psubConsumerEmailAno(User user) {
            System.out.println("邮件业务接收到消息: "+user);
        }
    
         /**
       *
       * 1.2 Publish/Subscribe工作模式接收、处理"短信业务"
       */
      @RabbitListener(bindings = @QueueBinding(value =
                                 @Queue("fanout_queue_sms"),exchange =
                                 @Exchange(value = "fanout_exchange",type = "fanout")))
      public void psubConsumerSmsAno(User user) {
          System.out.println("短信业务接收到消息: "+user);
        }
     }
    

    上面代码中,使用 @RabbitListener 注解及 其相关属性定制了两个消息组件消费者,这两个消费者都接收实体类 User 并消费。在 @RabbitListener 注解中,bindings 属性用于创建并绑定交换器消息队列组件,需要注意的是,为了能使两个消息组件的消费者接受实体类User,需要我们在 定制交换器将交换器类型 type 设置为 fanout。另外,bindings 属性@QueueBinding 注解除了有 valuetype 属性外,还有key 属性用于定制路由键 : routingKey (当前发布订阅模式不需要)。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter22ApplicationTests.java :

    import com.myh.chapter_22.domain.User;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter22ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
       /**
        * "消息发送者" 发送信息
        */
        @Test
        public void psubPublisher() {
            User user = new User(); //消息发送者 要发送的"消息内容"
            user.setId(1);
            user.setUsername("石头");
            /*
              使用 RabbitTemplate中的
              convertAndSend(String exchange, String routingKey, Object object)方法完成 "消息发送者" "发送信息" 这一行为
                 String exchange : 表示发送信息的"交换器"
                 String routingKey : 表示路由键,因为此处实现的Publish/Subscrible工作模式,所以"不需要指定"
                 Object object : 表示要"发送的信息内容"
             */
            //执行哪个"交换器" 和 指定给该"交换器"的"消息内容"
            rabbitTemplate.convertAndSend("fanout_exchange","",user); //user 表示要发送的"信息内容"
        }
     }
    

    上述代码中,先使用@Autowired 注解引入了进行 消息中间件管理RabbitTemplate 组件对象,然后使用该模板工具类
    convertAndSend ( String exchange , String routingKey , Object object )方法进行消息发布。其中,该方法中的
    第 1个参数表示发送消息交换器
    ,这个参数值与之前定制交换器名称一致第2个参数表示路由键,因为实现的是Publish/Subscribe 工作模式所以不需要指定第3 个参数发送的消息内容接收 Object 类型

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) 控制台查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行psubPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述

  • 至此,在 Spring Boot 中完成了 使用基于 API基于配置类基于注解3 种方式来实现 Publish/Subscribe 工作模式整合讲解 在这 3 种实现消息服务的方式中,

  • 上面讲述过三种配置方式区别
    基于 API 的方式
    相对简单、直观
    ,但容易与业务代码产生 耦合

    基于 配置类方式相对隔离 容易统一管理、符合Spring Boot 框架思想;

    基于 注解的方式 清晰明了方便各自管理,但是也容易与业务代码产生耦合。在实际开发中,使用 基于配置类的方式 和 基于注解的方式定制组件 实现消息服务较为常见。使用 基于 API的方式偶尔使用,具体还需要根据实际情况进行选择

二、Spring Boot 整合 整合实现 : Routing ( 路由模式 ) 工作模式

2.1 基于 “注解” 的方式 ( 实现 Routing "路由模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

    application.properties ( 全局配置文件 ):

    #配置RabbitMQ消息中间件的"连接配置"
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #配置RabbitMQ虚拟主机路径/ , 默认可省略
    spring.rabbitmq.virtual-host=/
    

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-parent</artifactId>
           <version>3.2.5</version>
           <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.myh</groupId>
        <artifactId>chapter_22</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>chapter_22</name>
        <description>chapter_22</description>
        <properties>
           <java.version>17</java.version>
        </properties>
        <dependencies>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-amqp</artifactId>
           </dependency>
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
           </dependency>
    
           <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
           </dependency>
           <dependency>
              <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit-test</artifactId>
              <scope>test</scope>
           </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
    <!--	<build>-->
    <!--		<plugins>-->
    <!--			<plugin>-->
    <!--				<groupId>org.springframework.boot</groupId>-->
    <!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
    <!--			</plugin>-->
    <!--		</plugins>-->
    <!--	</build>-->
    
    </project>
    
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        //使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"
    
        /**
         * 2.1路由模式消息接收、处理error级别日志信息
         * (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("routing_queue_error"), exchange =
                                   @Exchange(value = "routing_exchange", type = "direct"),
                       key = "error_routing_key"))
        public void routingConsumerError(String message) {
            System.out.println("接收到error级别日志信息: "+message);
        }
    
    
        /**
         * 2.2路由模式消息接收、处理info、error、warning级别日志信息
         * (使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息")
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("routing_queue_all"), exchange =
                                   @Exchange(value = "routing_exchange", type = "direct"),
                        key = {"error_routing_key","info_routing_key","warning_routing_key"}))
        public void routingConsumerAll(String message) {
            System.out.println("接收到info、error、warning级别日志信息: "+message);
        }
    
    }
    
    

    上述代码中,在消息业务处理类 : RabbitMQService 中编写了两个用来 处理 Routing 路由模式消息消费者方法,在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制路由模式消息服务组件

    Routing路由模式下的交换器类型 : type属性direct,而且还必须 指定 key 属性
    ( 每个消息队列可以
    映射多个路由键
    但在 Spring Boot 1.X版本中@QueueBinding 中的 key 属性只接收 Spring 类型不接收 Spring [ ] 类型 )。

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    
② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
        //Routing工作模式消息发送端 ( "消息发布者" "发送信息" )
        @Test
        public void routingPublisher() {
            //发送信息
            //给指定的"交换器" "发送信息"
            rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send 错误信息...");
        }
    
    }
    

    上述代码中,通过调用 RabbitTemplateconverAndSend ( String exchange , String routingKey , Object object )方法来 "发送消息"。在 Routing 工作模式下发送消息时,必须指定路由键参数该参数要与消息队列映射路由键保持一致,否则发送的消息将会丢失

    本次示例中使用的是error_routing_key 路由键,根据定制规则,编写的两个消息消费者方法应该都可以正常接收并消费发送端发送消息

③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 此时 成功启动项目后执行routingPublisher( )方法控制台显示的消息消费效果下图所示

    在这里插入图片描述


    此时,修改 routingPublisher( )方法中的消息发送参数调整发送info 级别日志信息( 注意同修改 info_routing_key 路由键 ),再次启动 routingPublisher( )方法控制台效果如下图所示

    在这里插入图片描述

    上图所示,控制台打印出使用 info_routing_key 路由键发送 info 级别日志信息,说明只有配置映射 info_routing_key 路由键消息消费者 的方法 消费了消息

三、Spring Boot 整合 整合实现 : Topics ( 通配符模式 ) 工作模式

3.1 基于 “注解” 的方式 ( 实现 Topics"通配符模式"工作模式 )

(1) 创建项目,“全局配置文件” 中配置信息
  • 创建项目,"全局配置文件"中 配置信息 :

    在这里插入图片描述

application.properties ( 全局配置文件 ):

#配置RabbitMQ消息中间件的"连接配置"
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#配置RabbitMQ虚拟主机路径/ , 默认可省略
spring.rabbitmq.virtual-host=/

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.5</version>
       <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.myh</groupId>
    <artifactId>chapter_22</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>chapter_22</name>
    <description>chapter_22</description>
    <properties>
       <java.version>17</java.version>
    </properties>
    <dependencies>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
       </dependency>

       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
       </dependency>
       <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit-test</artifactId>
          <scope>test</scope>
       </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

<!--	<build>-->
<!--		<plugins>-->
<!--			<plugin>-->
<!--				<groupId>org.springframework.boot</groupId>-->
<!--				<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--			</plugin>-->
<!--		</plugins>-->
<!--	</build>-->

</project>
(2) 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息”
  • 使用 “注解” 的方式 “定制消息发送组件” 和 “消息消费者” “接收消息” , 实例代码如下

    RabbitMQService.java

    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    @Service //加入到IOC容器中
    public class RabbitMQService { //针对RabbitMQ消息中间件的 进行"消息接收和处理" 的 业务类
    
        //使用注解的方式定制"消息组件" 和 定制"消息消费者"且 "接收信息"
    
        /**
         * 3.1使用"通配符模式"消息接收、进行"邮件业务"订阅处理
         *   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )
         */
        @RabbitListener(bindings = @QueueBinding(value =
                                   @Queue("topic_queue_email"),exchange =
                                   @Exchange(value = "topic_exchange",type = "topic"),
                                   key = "info.#.email.#"))
        public void topicConsumerEmail(String message) {
            System.out.println("接收到邮件订阅需求处理信息: "+message);
        }
    
          /**
       * 3.2使用"通配符模式"消息接收、进行"短信业务"订阅处理
       *   ---( 定制"消息组件" 和 定制"消息消费者"且 "接收信息" )
       */
      @RabbitListener(bindings = @QueueBinding(value =
                                 @Queue("topic_queue_sms"),exchange =
                                 @Exchange(value = "topic_exchange",type = "topic"),
                                 key = "info.#.sms.#"))
      public void topicConsumerSms(String message) {
          System.out.println("接收到短信订阅需求处理信息: "+message);
        }
     }
    

    上述代码中,在消息业务处理类RabbitMQService 中编写了两个处理 Topics 通配符模式消息消费者方法 ( 这两个方法即创建了"消息组件",又创建了"消息消费者接收且消费信息 "),在两个消费者方法上使用 @RabbitListener 注解及其相关属性定制了通配符模式下的 消息组件

    从上述示例可以看出Topics 通配符模式下注解使用方式与 Routing 路由模式使用基本一样,主要是将交换器类型 type 修改为了 topic,还分别使用通配符的样式指定路由键 routingKey

(3) “消息发送者” 发送消息 :
① 创建 “实体类对象” ( 存储发送的"消息内容" )
  • 完成消息组件定制工作后,创建消息发送者/消息发布者 "发送消息"到 "消息队列"中。发送消息时,借助一个实体类传递消息,需要 预先创建一个实体类对象 ,在项目中创建 domain,在其中创建实体类

    User.java

    public class User { //消息发送者 "发送信息" 时借助的 "实体类对象" , 该对象中存储发送的"消息内容"
        private Integer id;
        private String username;
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id=" + id +
                    ", username='" + username + '\'' +
                    '}';
        }
    }
    

    针对不同的用户订阅需求,使用 RabbitTemplate 模板工具类的 convertAndSend ( Stringexchange,String routingKey,Object object )方法发送不同的消息发送消息时,必须 根据具体需求已经定制路由键通配符 设置 具体的路由键

② 使用 “RabbitTemplate模板类” 实现 “消息发送”
  • 项目测试类 中使用 Spring 框架 提供的 RabbitTemplate 模板类实现消息发送示例代码如下 :

    Chapter24ApplicationTests.java : ( 项目测试类 )

    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    class Chapter24ApplicationTests {
    
        @Test
        void contextLoads() {
        }
    
        @Autowired
        //引入进行"消息中间件"管理的 RabbitTemplate组件对象
        private RabbitTemplate rabbitTemplate;
    
        //3.Topcis工作模式下的"消息发布者" "发送消息"
        @Test
        public void topicPublisher() {
               //(1)只发送"邮件"订阅用户信息 ---(给指定的"交换器" "发送信息")
            rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message...");
    
              //(2)只发送"短信"订阅用户信息
            rabbitTemplate.convertAndSend("topic_exchange","info.sms","topics send  sms message...");
    
              //(3)"邮件"订阅用户 和 "短信"订阅用户 都发送消息
            rabbitTemplate.convertAndSend("topic_exchange","info.email.sms","topics send email and sms  message...");
        }
    
      }
    
③ 自定义 “消息转换器” ( 对"消息"进行转换,保持较好的"可视化效果")
  • 执行上述 消息发送测试方法 : psubPublisher( )控制台执行效果下图所示 :

    在这里插入图片描述

    如果要 解决上述 消息中间件 发送 实体类消息出现异常,通常可以采用两种解决方案 :

    第一种 是执行 JDK 自带serializable 序列化接口 ;

    第二种定制其他类型消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后可视化效果较差转换后的消息无法辨识,所以一般 推荐使用第二种方式配置代码如下所示 :

  • 自定义 “消息转换器”配置代码如下所示

    RabbitMQConfig.java

    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration //标记该类为"配置类"
    public class RabbitMQConfig { //关于RabbitMQ消息中间件的"配置类"
    
        @Bean //将该类的返回值对象加入到IOC容器中
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter(); //创建一个Jackson2JsonMessageConverter 类型的 "消息转换器组件" 
        }
    }
    
(4) 控制台 查看 “消费者” 消费信息情况
  • 执行topicPublisher( ) 方法 中的 步骤(1)邮件订阅用户的消息发送控制台效果下图所示 :

    在这里插入图片描述


    在这里插入图片描述

  • topicPublisher( )方法步骤(1) 进行 注释打开步骤(2) 中只进行 短信订阅用户消息发送方法,并 再次启动该测试方法控制台效果如下图所示 : 在这里插入图片描述


在这里插入图片描述

  • 为了查看 topicPublisher( )方法步骤(3)的效果,我们需要把 步骤(2) 的代码注释步骤(3) 的代码主要进行 邮件短信订阅用户消息发送方法项目重新启动 后的效果如下图所示 :

    在这里插入图片描述


在这里插入图片描述