Springboot仿抖音app开发之RabbitMQ 异步解耦(进阶)

发布于:2025-06-23 ⋅ 阅读:(13) ⋅ 点赞:(0)

Springboot仿抖音app开发之评论业务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之粉丝业务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之用短视频务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之用户业务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之消息业务模块后端复盘及相关业务知识总结 

为什么需要接口解耦

1. 数据重要性分级处理

在实际业务系统中,数据通常被分为不同重要级别:

重要数据(关键业务数据)

  • 用户账户信息、交易记录、订单数据
  • 需要强一致性和ACID特性
  • 通常存储在关系型数据库(MySQL、PostgreSQL等)

非重要数据(辅助业务数据)

  • 用户行为日志、消息通知、统计数据
  • 可以容忍最终一致性
  • 适合存储在NoSQL数据库(MongoDB、Redis等)

2. 接口解耦的核心优势

故障隔离

  • 重要数据操作失败不影响非重要数据的处理
  • MongoDB服务异常不会阻塞核心业务流程
  • 提高系统整体可用性

为什么MongoDB服务异常会阻塞核心业务流程 

问题场景分析

1. 未解耦的情况(会阻塞核心业务)

问题

  • 如果MongoDB服务异常,整个订单处理流程都会失败
  • 核心的订单数据无法保存,影响业务连续性
  • 一个非关键功能的故障导致关键业务无法进行

2. 故障隔离的好处

业务连续性保障

  • 核心业务(订单创建)不受MongoDB故障影响
  • 用户可以正常下单,不会感知到系统部分组件的异常

系统健壮性提升

  • 不同重要级别的数据采用不同的处理策略
  • 非关键功能的故障不会造成系统雪崩

运维友好

  • 可以独立维护和升级MongoDB服务
  • MongoDB的性能调优不会影响核心业务

3. 实际案例

假设一个电商系统:

核心流程:用户下单 → 扣减库存 → 生成订单 → 扣款 辅助功能:记录用户行为 → 发送消息通知 → 更新推荐算法数据

如果没有解耦,MongoDB异常会导致:

  • 用户无法下单
  • 订单系统完全瘫痪
  • 收入损失

解耦后的效果:

  • 用户正常下单和支付
  • 部分通知功能暂时不可用(用户基本无感知)
  • 系统稳定运行,收入不受影响

工厂与批发商的故事

如果没有中间件(微信群)生产者给消费者发消息需要逐个去发送对应的消息,有了中间件之后只需要 统一发送就行,消费者去找自己对应的消息

 RabbitMQ

 

1. 异步任务处理

  • 场景:耗时操作(如发送邮件、生成报表、图片处理)不适合阻塞主流程。
  • 实现:将任务放入消息队列,由消费者异步处理。
  • 优势
    • 主程序快速响应(如用户注册后立即返回,邮件发送由队列处理)。
    • 避免因任务失败导致主流程崩溃。

2. 系统提速

  • 场景:高延迟操作(如数据库写入、第三方API调用)拖慢整体性能。
  • 实现:主程序发布消息后立即返回,消费者逐步处理。
  • 示例
    • 电商下单后,库存扣减和日志记录通过队列异步执行。
    • 吞吐量提升:队列充当缓冲区,允许系统以最大承受速度处理任务。

3. 接口解耦

  • 场景:系统间直接调用导致强依赖(如支付系统与物流系统)。
  • 实现:通过消息队列间接通信,生产者无需知道消费者细节。
  • 优势
    • 系统可独立扩展或升级(如新增一个消费者不会影响生产者)。
    • 协议灵活性:不同语言/框架的系统可通过标准协议(如AMQP)交互。

4. 流量削峰(Peak Shaving)

  • 场景:突发流量(如秒杀活动)可能压垮后端服务。
  • 实现:将请求放入队列,消费者按固定速率处理。
  • 关键点
    • 队列缓冲超载请求,避免服务崩溃。
    • 配合限流策略(如设置队列最大长度),保证系统稳定。

核心组件及其关系

1. 生产者 (Producer)

  • 作用: 消息的发送方,类似于写信的人
  • 职责: 创建消息并发送到交换机
  • 特点: 生产者不直接将消息发送给队列,而是发送给Exchange

2. 交换机 (Exchange)

  • 作用: 消息的邮局/分拣中心
  • 职责: 接收生产者的消息,根据路由规则决定消息发送到哪个队列
  • 类型:
    • Direct: 精确匹配路由键
    • Topic: 支持通配符匹配 (* 和 #)
    • Fanout: 广播到所有绑定的队列
    • Headers: 根据消息头属性路由

3. 队列 (Queue)

  • 作用: 消息的邮箱
  • 职责: 存储消息,等待消费者来获取
  • 特点: 先进先出(FIFO)的数据结构

4. 消费者 (Consumer)

  • 作用: 消息的接收方,类似于收信的人
  • 职责: 从队列中获取并处理消息

工作流程

生产者 → Exchange → Queue → 消费者
   ↓        ↓        ↓        ↓
  写信   →  邮局   → 邮箱  →  收信人

绑定关系 (Binding)

绑定是连接Exchange和Queue的路由规则

// 您代码中的绑定示例
.bind(queue)           // 绑定队列
.to(exchange)          // 到交换机  
.with("sys.msg.*")     // 路由键规则

这意味着:

  • 当生产者发送路由键为 sys.msg.login 的消息时 → 会路由到 queue_sys_msg
  • 当发送路由键为 user.info.update 的消息时 → 不会路由到此队列

实际应用场景举例

场景: 系统消息通知

  1. 生产者: 用户服务

    // 发送登录消息
    rabbitTemplate.convertAndSend("exchange_msg", "sys.msg.login", "用户张三登录");
    
  2. Exchange: exchange_msg (Topic类型)

    • 接收到路由键 sys.msg.login 的消息
  3. 路由判断:

    • sys.msg.login 匹配 sys.msg.* 规则 ✅
    • 消息被路由到 queue_sys_msg
  4. 队列: queue_sys_msg

    • 存储消息等待处理
  5. 消费者: 系统通知服务

    @RabbitListener(queues = "queue_sys_msg")
    public void handleSysMessage(String message) {
        // 处理系统消息
        logger.info("收到系统消息: " + message);
    }
    

关键理解点

  1. 解耦性: 生产者不需要知道具体哪个消费者会处理消息
  2. 灵活性: 通过不同的Exchange类型和绑定规则,可以实现各种消息路由策略
  3. 可靠性: 消息持久化、队列持久化确保消息不会丢失
  4. 扩展性: 可以轻松添加新的队列和消费者

集成Rabbitmq - 引入配置和依赖

        <!-- 引入 RabbitMQ 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: imooc-red-book

集成Rabbitmq - 创建交换机和队列

 我们来看完整代码

package com.imooc;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    /**
     * 根据模型编写代码:
     * 1. 定义交换机
     * 2. 定义队列
     * 3. 创建交换机
     * 4. 创建队列
     * 5. 队列和交换机的绑定
     */

    public static final String EXCHANGE_MSG = "exchange_msg";

    public static final String QUEUE_SYS_MSG = "queue_sys_msg";

    @Bean(EXCHANGE_MSG)
    public Exchange exchange() {
        return ExchangeBuilder                      // 构建交换机
                .topicExchange(EXCHANGE_MSG)        // 使用topic类型,参考:https://www.rabbitmq.com/getstarted.html
                .durable(true)                      // 设置持久化,重启mq后依然存在
                .build();
    }

    @Bean(QUEUE_SYS_MSG)
    public Queue queue() {
        return new Queue(QUEUE_SYS_MSG);
    }

    @Bean
    public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,
                           @Qualifier(QUEUE_SYS_MSG) Queue queue) {

        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("sys.msg.*")          // 定义路由规则(requestMapping)
                .noargs();

        // FIXME: * 和 # 分别代表什么意思?
    }


}

Spring Boot自动创建机制

当Spring容器启动时,会自动扫描所有标注了@Bean的方法,并将返回值注册到Spring容器中。对于RabbitMQ组件,Spring AMQP会自动检测这些Bean并在RabbitMQ服务器上创建对应的实体。

代码执行流程

1. 创建交换机

@Bean(EXCHANGE_MSG)
public Exchange exchange() {
    return ExchangeBuilder
        .topicExchange(EXCHANGE_MSG)    // 创建名为"exchange_msg"的Topic交换机
        .durable(true)                  // 持久化,服务器重启后不会丢失
        .build();
}

2. 创建队列

@Bean(QUEUE_SYS_MSG)
public Queue queue() {
    return new Queue(QUEUE_SYS_MSG);    // 创建名为"queue_sys_msg"的队列
}

3. 建立绑定关系

@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,
                      @Qualifier(QUEUE_SYS_MSG) Queue queue) {
    return BindingBuilder
        .bind(queue)                    // 绑定队列
        .to(exchange)                   // 到交换机
        .with("sys.msg.*")              // 使用路由键模式
        .noargs();
}

Topic交换机的路由规则

关于您代码中的FIXME注释,Topic类型的通配符含义:

  • *(星号): 匹配一个单词

    • sys.msg.* 可以匹配:sys.msg.loginsys.msg.logoutsys.msg.error
    • 不能匹配:sys.msg.user.login(因为有两个单词)
  • #(井号): 匹配零个或多个单词

    • sys.msg.# 可以匹配:sys.msgsys.msg.loginsys.msg.user.login.success

集成Rabbitmq - 创建生产者,配置路由规则

1. 添加依赖 (pom.xml)

<dependency>


    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>


</dependency>

2. 配置文件 (application.yml)

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

3. RabbitMQ配置类 (已有)

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_MSG = "exchange_msg";
    public static final String QUEUE_SYS_MSG = "queue_sys_msg";
    
    // 创建Topic交换机
    @Bean(EXCHANGE_MSG)
    public Exchange exchange() {
        return ExchangeBuilder
            .topicExchange(EXCHANGE_MSG)
            .durable(true)
            .build();
    }
    
    // 创建队列
    @Bean(QUEUE_SYS_MSG)
    public Queue queue() {
        return new Queue(QUEUE_SYS_MSG);
    }
    
    // 绑定关系:队列绑定到交换机,使用路由规则
    @Bean
    public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,
                          @Qualifier(QUEUE_SYS_MSG) Queue queue) {
        return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with("sys.msg.*")  // 关键:路由键规则
            .noargs();
    }
}
1. 核心常量定义
public static final String EXCHANGE_MSG = "exchange_msg";
public static final String QUEUE_SYS_MSG = "queue_sys_msg";
  • 作用: 定义交换机和队列的名称常量
  • 好处: 避免硬编码,便于维护和引用
2. Topic交换机配置
@Bean(EXCHANGE_MSG)
public Exchange exchange() {
    return ExchangeBuilder
        .topicExchange(EXCHANGE_MSG)    // 创建Topic类型交换机
        .durable(true)                  // 持久化配置
        .build();
}

Topic交换机特点

  • 类型: Topic Exchange(主题交换机)
  • 路由规则: 支持通配符匹配
    • * :匹配一个单词
    • # :匹配零个或多个单词
  • 持久化durable(true) 确保服务器重启后交换机不丢失
3. 队列配置
@Bean(QUEUE_SYS_MSG)
public Queue queue() {
    return new Queue(QUEUE_SYS_MSG);
}
  • 队列名queue_sys_msg
  • 用途: 存储系统消息
  • 默认配置: 非持久化队列(可以改为持久化)
4. 绑定关系配置
@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,
                      @Qualifier(QUEUE_SYS_MSG) Queue queue) {
    return BindingBuilder
        .bind(queue)           // 绑定队列
        .to(exchange)          // 到交换机
        .with("sys.msg.*")     // 使用路由规则
        .noargs();
}

 

4. 生产者控制器

@RestController
public class ProducerController {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @GetMapping("produce")
    public Object produce() throws Exception {
        // 发送消息到指定交换机,使用特定路由键
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_MSG,    // 交换机名称
            "sys.msg.send",                 // 路由键
            "我发了一个消息\~\~"                // 消息内容
        );
        
        return GraceJSONResult.ok();
    }
}

集成Rabbitmq - 消费者接受消息处理业务 

package com.imooc;

import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitMQConsumer {

    @Autowired
    private MsgService msgService;

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
    public void watchQueue(String payload, Message message) {
        log.info(payload);



        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info(routingKey);


    }


}

1. 消费者组件定义

@Slf4j
@Component
public class RabbitMQConsumer {
  • @Component: 将此类注册为Spring Bean
  • @Slf4j: 自动生成日志对象,用于记录日志

2. 队列监听

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {
  • @RabbitListener: 监听指定队列的注解
  • queues = {RabbitMQConfig.QUEUE_SYS_MSG}: 监听名为 queue_sys_msg 的队列
  • String payload: 接收消息的具体内容
  • Message message: 完整的消息对象,包含元数据

3. 消息处理逻辑

log.info(payload);  // 打印消息内容
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey);  // 打印路由键

完整工作机制

监听机制

  1. 当应用启动时,Spring会自动扫描带有 @RabbitListener 的方法
  2. 为该方法创建一个消息监听容器
  3. 容器会持续监听 queue_sys_msg 队列

消息处理流程

  1. 接收消息: 当队列中有新消息时,自动触发 watchQueue 方法
  2. 解析内容: 获取消息的文本内容 (payload)
  3. 提取路由键: 从消息属性中获取路由键信息
  4. 记录日志: 将消息内容和路由键打印到控制台

rabbitTemplate.convertAndSend 方法详解 

这行代码是RabbitMQ异步消息发送的核心部分,让我逐个参数详细解析:

rabbitTemplate.convertAndSend(
    RabbitMQConfig.EXCHANGE_MSG,
    "sys.msg." + MessageEnum.FOLLOW_YOU.enValue,
    JsonUtils.objectToJson(messageMO)
);

参数解析

参数1: 交换机名称

RabbitMQConfig.EXCHANGE_MSG
  • 作用: 指定消息要发送到哪个交换机
  • 实际值: 通常是 "exchange_msg" (常量值)
  • 工作方式: 交换机接收所有消息,并根据路由规则分发

参数2: 路由键

"sys.msg." + MessageEnum.FOLLOW_YOU.enValue
  • 拼接结果"sys.msg.follow"
  • 作用: 决定消息如何被路由到目标队列
  • 匹配规则: 与交换机绑定时定义的模式进行匹配
    • 如: sys.msg.* 会匹配此路由键

参数3: 消息内容

JsonUtils.objectToJson(messageMO)
  • 输入messageMO (消息对象)
  • 转换过程: 对象 → JSON字符串
  • 输出示例:
{
  "fromUserId": "用户123",
  "toUserId": "博主456",
  "msgContent": null
}
  • 传输形式: 字节数组形式在网络上传输

 异步解耦 - 系统消息入库保存

阶段一:关注操作(生产者)

@Transactional
@Override
public void doFollow(String myId, String vlogerId) {
    // 1. 核心业务逻辑(同步)
    String fid = sid.nextShort();
    Fans fans = new Fans();
    // ... 设置粉丝关系
    fansMapper.insert(fans);  // 插入粉丝关系到数据库
    
    // 2. 构建消息对象
    MessageMO messageMO = new MessageMO();
    messageMO.setFromUserId(myId);     // 关注者
    messageMO.setToUserId(vlogerId);   // 被关注者
    
    // 3. 异步发送消息(关键点)
    rabbitTemplate.convertAndSend(
        RabbitMQConfig.EXCHANGE_MSG,
        "sys.msg." + MessageEnum.FOLLOW_YOU.enValue,  // 路由键:sys.msg.follow
        JsonUtils.objectToJson(messageMO)
    );
    
    // 事务提交,关注操作立即完成!
}

阶段二:消息消费(消费者)

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {
    // 1. 解析JSON消息
    MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);
    String routingKey = message.getMessageProperties().getReceivedRoutingKey();
    
    // 2. 根据路由键判断消息类型并处理
    if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {
        // 异步执行系统消息入库
        msgService.createMsg(
            messageMO.getFromUserId(),    // 关注者ID
            messageMO.getToUserId(),      // 被关注者ID  
            MessageEnum.FOLLOW_YOU.type,  // 消息类型:关注(1)
            null                         // 无额外内容
        );
    }
    // ... 处理其他消息类型
}

异步解耦的核心优势

1. 事务分离

// 主事务:关注业务
@Transactional  
public void doFollow() {
    fansMapper.insert(fans);        // 核心业务
    rabbitTemplate.convertAndSend(); // 发送MQ(非阻塞)
}  // 事务立即提交

// 独立处理:消息入库
@RabbitListener
public void watchQueue() {
    msgService.createMsg();  // 在独立的事务中处理
}

2. 时序对比

传统同步方式

用户点击关注
    ↓
[开始事务]
    ├── 插入粉丝关系     (50ms)
    ├── 更新互关状态     (30ms)  
    └── 创建系统消息     (100ms)  ← 可能慢
[提交事务]             (180ms总耗时)
    ↓
返回成功给用户

MQ异步方式

用户点击关注
    ↓
[开始事务]
    ├── 插入粉丝关系     (50ms)
    ├── 更新互关状态     (30ms)
    └── 发送MQ消息       (5ms)   ← 超快
[提交事务]             (85ms总耗时)
    ↓
返回成功给用户          ← 用户立即看到结果

[后台异步]
    └── 创建系统消息     (100ms)  ← 后台处理

3. 系统消息入库的异步处理

消息流转过程

// 发送的JSON消息
{
  "fromUserId": "user123",
  "toUserId": "vlogger456"
}

// 路由键
"sys.msg.follow"

// 最终入库的系统消息
INSERT INTO sys_msg (
  from_user_id = 'user123',
  to_user_id = 'vlogger456', 
  msg_type = 1,              -- FOLLOW_YOU类型
  msg_content = null,
  create_time = now()
);

容错和可靠性保障

1. 消息持久化

// RabbitMQ配置
@Bean
public Queue queue() {
    return QueueBuilder
        .durable("queue_sys_msg")  // 持久化队列
        .build();
}

2. 失败重试机制

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {
    try {
        // 处理消息
        msgService.createMsg(...);
    } catch (Exception e) {
        log.error("处理消息失败: {}", e.getMessage());
        // 消息会自动重新入队重试
        throw e;  // 抛出异常触发重试
    }
}

其他相关的操作也同样进行异步解耦即可,我们已经在消费者模型中做了if判断处理

package com.imooc;

import com.imooc.base.RabbitMQConfig;
import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitMQConsumer {

    @Autowired
    private MsgService msgService;

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
    public void watchQueue(String payload, Message message) {
        log.info(payload);

        MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);

        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info(routingKey);


        if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {
            msgService.createMsg(messageMO.getFromUserId(),
                    messageMO.getToUserId(),
                    MessageEnum.FOLLOW_YOU.type,
                    null);
        } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_VLOG.enValue)) {
            msgService.createMsg(messageMO.getFromUserId(),
                    messageMO.getToUserId(),
                    MessageEnum.FOLLOW_YOU.type,
                    messageMO.getMsgContent());
        } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.COMMENT_VLOG.enValue)) {
            msgService.createMsg(messageMO.getFromUserId(),
                    messageMO.getToUserId(),
                    MessageEnum.COMMENT_VLOG.type,
                    messageMO.getMsgContent());
        } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.REPLY_YOU.enValue)) {
            msgService.createMsg(messageMO.getFromUserId(),
                    messageMO.getToUserId(),
                    MessageEnum.REPLY_YOU.type,
                    messageMO.getMsgContent());
        } else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_COMMENT.enValue)) {
            msgService.createMsg(messageMO.getFromUserId(),
                    messageMO.getToUserId(),
                    MessageEnum.LIKE_COMMENT.type,
                    messageMO.getMsgContent());
        } else {
            GraceException.display(ResponseStatusEnum.SYSTEM_OPERATION_ERROR);
        }

    }


}

消息流转的完整过程

1. 发送端(生产者)

// 关注操作中发送消息
rabbitTemplate.convertAndSend(
    RabbitMQConfig.EXCHANGE_MSG,           // 交换机:exchange_msg
    "sys.msg." + MessageEnum.FOLLOW_YOU.enValue,  // 路由键:sys.msg.follow
    JsonUtils.objectToJson(messageMO)      // JSON消息
);

2. RabbitMQ路由过程

消息发送到交换机 "exchange_msg"
        ↓
交换机根据路由键 "sys.msg.follow" 进行路由判断
        ↓  
匹配绑定规则:queue_sys_msg 绑定了 "sys.msg.*"
        ↓
"sys.msg.follow" 匹配 "sys.msg.*" ✅
        ↓
消息被路由到队列 "queue_sys_msg"
        ↓
消息在队列中等待被消费

3. 消费端(监听器)

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {
    // 自动监听队列,有消息就触发此方法
}

监听机制的工作原理

@RabbitListener 注解的作用

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})

Spring Boot启动时

  1. 扫描注解: Spring扫描到 @RabbitListener 注解
  2. 创建监听容器: 为该方法创建一个 MessageListenerContainer
  3. 建立连接: 连接到RabbitMQ服务器
  4. 监听队列: 持续监听 queue_sys_msg 队列
  5. 等待消息: 进入阻塞状态,等待队列中有新消息

消息到达时

[队列中有新消息]
        ↓
[监听容器检测到消息]
        ↓
[自动调用 watchQueue 方法]
        ↓
[传入消息内容和元数据]

消息处理的具体流程

参数接收

public void watchQueue(String payload, Message message) {
    // payload: JSON字符串内容
    // message: 完整的AMQP消息对象(包含属性、路由键等)
}

消息解析

// 1. 打印消息内容
log.info(payload);  // 输出:{"fromUserId":"user123","toUserId":"vlogger456"}

// 2. 解析JSON为对象
MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);

// 3. 获取路由键
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey);  // 输出:sys.msg.follow

业务逻辑分发

// 根据路由键判断消息类型
if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {
    // 处理关注消息:sys.msg.follow
    msgService.createMsg(
        messageMO.getFromUserId(),    // 关注者
        messageMO.getToUserId(),      // 被关注者
        MessageEnum.FOLLOW_YOU.type,  // 消息类型:1
        null                         // 无额外内容
    );
}

网站公告

今日签到

点亮在社区的每一天
去签到