C++ - 仿 RabbitMQ 实现消息队列--需求分析与模块设计

发布于:2025-07-20 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

需求分析

核心概念

核心 API

交换机类型

 持久化

网络通信

消息应答

模块划分

服务端模块

持久化数据管理中心模块

虚拟机管理模块

交换路由模块

消费者管理模块

 信道管理模块

连接管理模块

 Broker 服务器模块

客户端模块

消费者管理

 信道请求模块

通信连接模块


需求分析

核心概念

  • 生产者 (Producer)
  • 消费者 (Consumer)
  • 中间人 (Broker)
  • 发布 (Publish)
  • 订阅 (Subscribe)
  • 一个生产者, 一个消费者

  •  N 个生产者, N 个消费者

其中, Broker Server 是最核心的部分, 负责消息的存储和转发。

        而在 AMQP(Advanced Message Queuing Protocol-高级消息队列协议,一个提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计,使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能)模型中,也就是消息中间件服务器 Broker 中,又存在以下概念:

  • 虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是一个逻辑上的集合。一个BrokerServer 上可以存在多个 VirtualHost。
  • 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转发给不同的 Queue。
  • 队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个 Queue 上读取消息
  • 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多" 关系,使用一个关联表就可以把这两个概念联系起来。
  • 消息 (Message): 传递的内容。

所谓的 Exchange 和 Queue 可以理解成 "多对多" 关系, 和数据库中的 "多对多" 一样. 意思是:
一个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)
一个 Queue 也可以被多个 Exchange 绑定 (一个 Queue 中的消息可以来自于多个Exchange)

 

  上述数据结构, 既需要在内存中存储, 也需要在硬盘中存储

  • 内存存储: 方便使用
  • 硬盘存储: 重启数据不丢失

核心 API

对于 Broker 来说, 要实现以下核心 API,通过这些 API 来实现消息队列的基本功能:

  1. 创建交换机 (exchangeDeclare)
  2. 销毁交换机 (exchangeDelete)
  3. 创建队列 (queueDeclare)
  4. 销毁队列 (queueDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)
  10. 取消订阅 (basicCancel)

另一方面, Producer 和 Consumer 则通过网络的方式, 远程调用这些 API, 实现 生产者
消费者模型。

交换机类型

对于 RabbitMQ 来说, 主要支持四种交换机类型:

  • Direct
  • Fanout
  • Topic
  • Header

其中 Header 这种方式比较复杂, 比较少见。常用的是前三种交换机类型,项目中也主
要实现这三种

  1. Direct: 生产者发送消息时, 直接指定被该交换机绑定的队列名。
  2. Fanout: 生产者发送的消息会被复制到该交换机的所有队列中。
  3. Topic: 绑定队列到交换机上时, 指定一个字符串为 bindingKey。发送消息指定一个字符串为 routingKey。当 routingKey 和 bindingKey 满足一定的匹配条件的时候, 则把消息投递到指定队列。

 持久化

        Exchange, Queue, Binding, Message 等数据都有持久化需求,当程序重启 / 主机重启, 保证上述内容不丢失。

网络通信

        生产者和消费者都是客户端程序, Broker 则是作为服务器,通过网络进行通信。在网络通信的过程中, 客户端部分要提供对应的 api, 来实现对服务器的操作。

  1. 创建 Connection 
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)13. 确认消息 (basicAck)
  13. 取消订阅(basicCancel)

        可以看到, 在 Broker 的基础上, 客户端还要增加 Connection 操作和 Channel 操作

  • Connection 对应一个 TCP 连接
  • Channel 则是 Connection 中的逻辑通道

        一个 Connection 中可以包含多个 Channel。Channel 和 Channel 之间的数据是独立的,不会相互干扰。这样做主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。

消息应答

被消费的消息, 需要进行应答。应答模式分成两种:

  • 自动应答: 消费者只要消费了消息, 就算应答完毕了,Broker 直接删除这个消息。
  • 手动应答: 消费者手动调用应答接口, Broker 收到应答请求之后, 才真正删除这个消息。

模块划分

服务端模块

持久化数据管理中心模块

在数据管理模块中管理交换机,队列,队列绑定,消息等部分数据数据。

  • 交换机管理:
    • a. 管理信息:名称,类型,是否持久化标志,是否(无人使用时)自动删除标志,其他参数,....
    • b. 管理操作:恢复历史信息,声明,删除,获取,判断是否存在
  • 队列管理:
    • 管理信息:名称,是否持久化标志,是否独有标志,是否(无人使用时)自动删除标志,其他参数,....
    •  管理操作:恢复历史信息,声明,删除,获取,判断是否存在
  • 绑定管理:
    • 管理信息:交换机名称,队列名称,绑定主题
    • 管理操作:恢复历史信息,绑定,解绑,解除交换机关联绑定信息,解除队列关联绑定信息,获取交换机关联绑定信息
  • 消息管理:
    • 管理信息
      •  属性:消息 ID, 路由主题,持久化模式标志
      • 消息内容
      • 有效标志(持久化需要)
      •  持久化位置(内存中)
      • 持久化消息长度(内存中)
    • 管理操作:恢复历史信息,向指定队列新增消息,获取指定队列队首消息,确认移除消息

        这几个核心概念数据都需要在内存和硬盘中存储的。

  • 以内存存储为主,主要是保证快速查找信息进行处理
  • 以硬盘存储为辅,主要是保证服务器重启之后,之前的信息都可以正常保持

虚拟机管理模块

        因为交换机/队列/绑定都是基于虚拟机为单元整体进行操作的,因此虚拟机是对以上数
据管理模块的整合模块。

  • 虚拟机管理信息:
    • 交换机数据管理模块句柄
    • 队列数据管理模块句柄
    • 绑定数据管理模块句柄
    • 消息数据管理模块句柄
  • 虚拟机对外操作:
    • 提供虚拟机内交换机声明,交换机删除操作。
    • 提供虚拟机内队列声明,队列删除操作。
    • 提供虚拟机内交换机-队列绑定,解除绑定操作。
    • 获取交换机相关绑定信息
  • 虚拟机管理操作:
    • 创建虚拟机
    • 查询虚拟机
    • 删除虚拟机

交换路由模块

        当客户端发布一条消息到交换机后,这条消息,应该被入队到该交换机绑定的哪些队列中?交换路由模块就是决定这件事情的。
        在绑定信息中有一个 binding_key,而每条发布的消息中有一个 routing_key,能否入队取决于两个要素:交换机类型和 key

  1. 广播:将消息入队到该交换机的所有绑定队列中
  2. 直接:将消息入队到绑定信息中 binding_key 与消息 routing_key 一致的队列中
  3. 主题:将消息入队到绑定信息中 binding_key 与 routing_key 是匹配成功的队列中

binding_key
是由数字字母下划线构成的, 并且使用 . 分成若干部分。
例如:news.music.#,这用于表示交换机绑定的当前队列是一个用于发布音乐新闻的队列。

  • 支持 * 和 # 两种通配符, 但是 * # 只能作为 . 切分出来的独立部分, 不能和其他数字字母混用, 
    • 比如 a.*.b 是合法的, a.*a.b 是不合法的
    • * 可以匹配任意一个单词(注意是单词不是字母)
    • # 可以匹配任意零个或者多个单词(注意是单词不是字母)

routing_key
是由数据、字母和下划线构成, 并且可以使用 . 划分成若干部分。
例如:news.music.pop,这用于表示当前发布的消息是一个流行音乐的新闻.

消费者管理模块

        消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅一个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。
        因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出一个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的用法)。

  • 消费者信息:
    • 标识
    • 订阅队列名称
    • 自动应答标志(决定了一条消息推送给消费者后,是否需要等待收到确认后再删除消息)
    • 消息处理回调函数指针(一个消息发布后调用回调,选择消费者进行推送....)
    • void(const std::string& tag, const BasicProperties& p, const std::string& body)
  • 消费者管理:添加,删除,轮询获取指定队列的消费者,移除队列所有消费者等操作。

 信道管理模块

        本质上,在 AMQP 模型中,除了通信连接 Connection 概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。
        而信道模块就是再次将上述模块进行整合提供服务的模块

  • 管理信息:
    • 信道 ID
    • 信道关联的消费者
    • 信道关联的连接
    • 信道关联的虚拟机
    • 工作线程池(一条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
  • 管理操作:
    • 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
    • 提供绑定&解绑队列操作
    • 提供订阅&取消订阅队列消息操作
    • 提供发布&确认消息操作

连接管理模块

         本质上,咱们仿照实现的服务器是通过 muduo 库来实现底层通信的,而这里的连接管理,更多的是对 muduo 库中的 Connection 进行二次封装管理,并额外提供项目所需操作。

  • 管理信息:
    • 连接关联的信道
    • 连接关联的 muduo 库 Connection
  • 管理操作:新增连接,删除连接,获取连接,打开信道,关闭信道。

 Broker 服务器模块

        整合以上所有模块,并搭建网络通信服务器,实现与客户端网络通信,能够识别客户端请求,并提供客户端请求的处理服务。
        管理信息:

  • 虚拟机管理模块句柄
  • 消费者管理模块句柄
  • 连接管理模块句柄
  • 工作线程池句柄
  • muduo 库通信所需元素...

客户端模块

消费者管理

        消费者在客户端的存在感比较低,因为在用户的使用角度中,只要创建一个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传入了一个消费者标识,且当前的简单实现也仅仅是一个信道只能创建订阅一个队列,也就是只能创建一个消费者,它们一一对应,因此更是弱化了消费者的存在。

  • 消费者信息:
    • 标识
    • 订阅队列名称
    • 自动应答标志(决定了一条消息推送给消费者后,是否需要等待收到确认后再删除消息)
    • 消息处理回调函数指针(一个消息发布后调用回调,选择消费者进行推送....)
  • 消费者管理:添加,删除,轮询获取指定队列的消费者,移除队列所有消费者等操作        

 信道请求模块

        与服务端的信道类似,客户端这边在 AMQP 模型中,也是除了通信连接 Connection概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。

  • 信道管理信息:
    • 信道 ID
    • 信道关联的通信连接
    • 信道关联的消费者
    • 请求对应的响应信息队列(这里队列使用 hash 表,以便于查找指定的响应)e. 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是 muduo 库的通信是异步的,因此需要我们自己在收到响应后,通过判断是否是等待的指定响应来进行同步)
  • 信道管理操作:
    • 提供创建信道操作
    • 提供删除信道操作
    • 提供声明交换机操作(强断言-有则 OK,没有则创建)
    • 提供删除交换机
    • 提供创建队列操作(强断言-有则 OK,没有则创建)f. 提供删除队列操作
    • 提供交换机-队列绑定操作
    • 提供交换机-队列解除绑定操作
    • 提供添加订阅操作
    • 提供取消订阅操作
    • 提供发布消息操作

通信连接模块

        向用户提供一个用于实现网络通信的 Connection 对象,从其内部可创建出粒度更轻的Channel 对象,用于与服务端进行网络通信。

  • 管理信息:
    • 连接关联的实际用于通信的 muduo::net::Connection 连接
    • 连接关联的信管理句柄(实现信道的增删查)
    • 连接关联的 EventLoop 异步循环工作线程
    • 异步工作线程池(用于对收到服务器推送过来的消息进行处理的线程池)2. 管理操作:
    • 提供创建 Channel 信道的操作
    • 提供删除 Channel 信道的操作

网站公告

今日签到

点亮在社区的每一天
去签到