C++ Json-Rpc框架-4项目回顾总结

发布于:2025-04-20 ⋅ 阅读:(54) ⋅ 点赞:(0)

一.公共抽象模块(接口与工具)

abstract.hpp抽象类 detail.hpp其它函数 fields.hpp类型定义

主要职责:

  • 提供统一接口抽象、基础工具类,支持模块解耦

代表类:

  • BaseBuffer / BaseConnection / BaseMessage / BaseProtocol

  • MType, ELOG, DLOG 等日志与类型定义

1.abstract.hpp抽象类

1. BaseBuffer 缓冲区

class BaseBuffer {
public:
    using ptr = std::shared_ptr<BaseBuffer>;
    virtual ~BaseBuffer() {}
    
    virtual size_t readableSize() = 0;                    // 返回当前缓冲区中可读字节数
    virtual int32_t peekInt32() = 0;                      // 读4字节int(不取出)
    virtual void retrieveInt32() = 0;                     // 取出4字节int
    virtual int32_t readInt32() = 0;                      // 读取并取出4字节int
    virtual std::string retrieveAsString(size_t len) = 0; // 读取指定长度数据并返回
};

抽象统一缓冲区读写接口

不依赖具体实现(如 MuduoBuffer、其他网络库)

对缓存数据进行操作,读取4字节/任意字节长度,仅读4int/仅取4int,返回缓冲区可读取的字节长度。

2. BaseConnection 连接

class BaseConnection {
public:
    using ptr = std::shared_ptr<BaseConnection>;
    virtual ~BaseConnection() {}

    virtual void send(const BaseMessage::ptr& msg) = 0; // 发送一条消息
    virtual void shutdown() = 0;                        // 主动关闭连接
    virtual bool connected() = 0;                       // 查询连接是否存活
};

抽象连接行为,不绑定具体实现(如 MuduoConnection)

发消息 断开连接 判断连接是否存在

3. BaseProtocol 协议

class BaseProtocol {
public:
    using ptr = std::shared_ptr<BaseProtocol>;
    virtual ~BaseProtocol() {}

    virtual bool canProcessed(const BaseBuffer::ptr& buf) = 0;      // 判断是否能解析出完整包
    virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) = 0; // 解析消息
    virtual std::string serialize(const BaseMessage::ptr& msg) = 0; // 序列化消息
};

为协议格式(如 LVProtocol)定义统一接口

支持多种协议实现(如 JSON-RPC、二进制、压缩格式等)

1.判断报文是否完整 2.解析去掉报头 3.序列化正文

4. BaseMessage 消息

class BaseMessage
    {
    public:
        using ptr=std::shared_ptr<BaseMessage>;
        virtual ~BaseMessage(){};
        virtual void setId(const std::string &id)
        {_rid=id;}
        virtual std::string rid(){return _rid;}
        virtual void setMType(MType mtype)
        {_mtype=mtype;}
        virtual MType mtype(){return _mtype;}
        virtual std::string serialize()=0;//返回josn序列化字符串
        virtual bool unserialize(const std::string &msg)=0;//从josn恢复对象 对msg进行反序列化 并将对应字段设置到成员变量中
        virtual bool check()=0;//检测字段是否完整
    private:
        std::string _rid;
        MType _mtype;
    };

所有消息对象的基类(如 RequestMessage / ResponseMessage)

提供序列化/反序列化功能

统一 ID / 类型字段访问方式

5. BaseClient 客户端

    using ConnectionCallback=std::function<void(const BaseConnection::ptr&)>;
    using CloseCallback=std::function<void(const BaseConnection::ptr&)>;
    using MessageCallback=std::function<void(constBaseConnection::ptr&,BaseMessage::ptr&)>;
    class BaseClient
    {
    public:
        using ptr =std::shared_ptr<BaseClient>;
        virtual void setConnectionCallback(const ConnectionCallback&cb){_cb_connection=cb;}//设置连接建立时的回调
        virtual void setCloseCallback(const CloseCallback&cb){_cb_close=cb;}//设置连接关闭时的回调
        virtual void setMessageCallback(const MessageCallback&cb){_cb_message=cb;}//设置信息发送时的回调
        virtual void connect()=0;//连接服务端
        virtual void shutdown()=0;//断开连接
        virtual bool send(const BaseMessage::ptr&)=0;//发送数据
        virtual BaseConnection::ptr connection()=0; //获取当前连接
        virtual bool connected()=0; //判断连接是否断开
    protected:
        ConnectionCallback _cb_connection;
        CloseCallback _cb_close;
        MessageCallback _cb_message;
    };

提供统一的客户端接口,不绑定 Muduo 或任何实现

支持未来接入其他传输协议(如 WebSocket、Redis 通道等)

RpcClient/DiscoveryClient 等都基于该接口封装

6. BaseServer 服务端

    using ConnectionCallback=std::function<void(const BaseConnection::ptr&)>;
    using CloseCallback=std::function<void(const BaseConnection::ptr&)>;
    using MessageCallback=std::function<void(const BaseConnection::ptr&,BaseMessage::ptr&)>;
    class BaseServer
    {
    public:
        using ptr =std::shared_ptr<BaseServer>;
        virtual void setConnectionCallback(const ConnectionCallback&cb){_cb_connection=cb;}//设置连接建立时的回调
        virtual void setCloseCallback(const CloseCallback&cb){_cb_close=cb;}//设置连接关闭时的回调
        virtual void setMessageCallback(const MessageCallback&cb){_cb_message=cb;}//设置信息发送时的回调
        virtual void start()=0;//运行服务端
    protected:
        ConnectionCallback _cb_connection;
        CloseCallback _cb_close;
        MessageCallback _cb_message;
    };

所有服务端组件(如 MuduoServer, RegistryServer, TopicServer)统一实现

简化启动流程,适配工厂模式创建(如 ServerFactory::create())

2.detail.hpp其它函数

1.日志宏定义

输出标准日志,标明日志级别 + 时间戳 + 文件名 + 行号

2.JSON 工具类

✅ 方法解析:

static bool serialize(const Json::Value&, std::string&)

  • Json::Value 对象序列化成 JSON 字符串

  • 用于发送前格式化消息体

static bool unserialize(const std::string&, Json::Value&)

  • 将 JSON 字符串反序列化为 Json::Value

  • 用于接收到消息后提取字段

✅ 用途:

  • RequestMessage / ResponseMessage 调用

  • 底层通过 jsoncpp 实现,接口封装简单易用

3.UUID 工具类

✅ 方法解析:

static std::string uuid()

  • 返回一个形式为:xxxx-xxxx-xxxx-xxxx-xxxx 的唯一 ID 字符串

  • 前半部分为随机生成,后半部分为线程安全递增序列号

✅ 特点:

  • 使用 std::random_device + std::mt19937 生成高质量伪随机数

  • std::atomic<size_t> 保证并发下的唯一性

  • 不依赖操作系统 UUID 库,适合跨平台场景

✅ 用途:

  • 生成 RPC 请求的 id

  • 可用于日志追踪、消息标识符等场景

3.fields.hpp类型定义

名称 分类 作用说明
字段宏定义 协议字段常量 定义标准化的 JSON 字段名,避免硬编码,提升代码一致性与可维护性。常用于消息序列化与解析过程中,例如 methodparametersrcode 等字段。
MType 消息类型识别枚举 表示当前消息的类型(如 RPC 请求、响应、服务注册、主题发布等),用于协议解析、消息创建、Dispatcher 分发等逻辑中进行类型判别。
RCode + errReason() 错误码与错误信息解释 用于表示消息处理过程中的各种错误类型,例如解析失败、服务未找到等,errReason() 提供错误码到用户可读信息的映射,便于日志输出与客户端提示。
RType RPC 调用类型枚举 表示发起的 RPC 请求属于哪种类型,如异步请求或带回调的请求,便于 RpcCallerRequestor 在调用时采用对应的处理策略。
TopicOptype Topic 操作类型枚举 表示对主题的操作类型,如创建、删除、订阅、取消订阅、发布,用于 Topic 模块判断请求行为和分发处理逻辑。
ServiceOptype 服务操作类型枚举 用于标识服务注册中心请求的具体操作,如注册服务、发现服务、服务上线下线等,服务端据此分类处理不同的服务控制请求。

4.总结:

为了让框架具备高可扩展性、低耦合性和良好的抽象能力,我们设计了多个抽象接口模块,比如:

  • BaseConnection 抽象了连接行为

  • BaseBuffer 抽象了缓冲区读取行为

  • BaseMessage 抽象了协议消息对象


BaseConnection 统一连接行为接口 解耦底层实现,提高可替换性

  • 我们的网络层底层使用了 Muduo,但我们不希望业务逻辑与具体网络库绑定

  • 所以用 BaseConnection 抽象连接操作,上层只关注 send()/shutdown() 等接口,而不关心底层是 TCP 还是 WebSocket、是 Muduo 还是 Boost.Asio。

如果未来我们想从 Muduo 切换到其他网络库,只需要重新实现 BaseConnection 而不用动上层业务代码Requestor、Dispatcher 等逻辑。


BaseProtocol 统一协议处理流程,支持多协议扩展

  • BaseProtocol 抽象了解码、序列化、判断是否可处理完整包的逻辑。

  • 我们可以在不影响上层逻辑的前提下,支持多种协议(比如自定义二进制协议、JSON、ProtoBuf 等)。

如果后期要扩展成 HTTP-RPC 或加密 RPC,仅需新增协议实现类,实现 BaseProtocol 接口即可。


BaseMessage统一消息结构,支持多种消息类型并行演进

  • 所有消息(如 RequestMessageResponseMessageTopicRequest 等)都继承自 BaseMessage,并实现统一的 serialize()unserialize()setId()mtype() 等接口。

  • 上层只需要持有 BaseMessage::ptr,无需关心消息类型,也不会因类型扩展而修改原逻辑。

Dispatcher 在收到消息时,只需调用 msg->mtype()msg->serialize()不需要知道这是哪种消息类型,从而实现真正的开闭原则。


✅ 总结一句话:

我们设计这些统一接口,是为了降低模块间耦合、提升可维护性与扩展性,从而构建一个高内聚、低耦合、模块化的可插拔式 RPC 框架。

二.消息协议模块

JsonMessage 以及子类

message.hpp

  • 主要类:BaseMessageRequestMessageResponseMessage

  • 实现 JSON-RPC 协议格式的消息封装、解析、序列化与反序列化

  • 作用:对外提供符合 JSON-RPC 规范的请求/响应数据结构,方便调用方使用

JsonMessage 继承自 BaseMessage,统一实现了所有消息的通用接口,包括序列化与反序列化逻辑

在此基础上,JsonRequest 作为请求类的基类继承自 JsonMessage,用于表示所有请求类型的父类

JsonResponse 作为响应类的基类,同样继承自 JsonMessage,因为响应中一定有状态码rode,所以额外提供了基础的 check() 方法校验响应是否成功,子类也可根据自身需求对其进行重写。以及提供设置和获取 rcode 的接口。

此后,基于 JsonRequest 和 JsonResponse,我们分别实现了对应功能的子类消息结构,包括:

RpcRequest / RpcResponse:远程函数调用

TopicRequest / TopicResponse:发布订阅消息

ServiceRequest / ServiceResponse:服务注册与发现

每个子类根据自身需要定义的字段(如 method、parameters、optype 等),实现对应的 set、get 方法,并在必要时重写 check(),确保消息完整性和处理合法性。

为什么采用这种继承结构实现 RpcRequest/Response、TopicRequest/Response、ServiceRequest/Response 等子类?

1. 职责清晰,逻辑分离

不同类型的消息负责的事情不一样像,RPC 是调用函数的,Topic 是发消息的,Service 是注册服务的。每个子类只负责处理一种业务,结构更清楚。

2. 复用基础能力,避免重复

这些子类有一些公共字段(如 id、mtype)和通用功能(如序列化、反序列化)在基类中统一实现,这样在子类就不需要重新编写。

3. 扩展灵活,遵循开闭原则

新增消息类型时,只需添加新的子类即可,不需要修改现有代码结构,原来的那些 RpcRequest、TopicRequest 完全不用动。

三. 网络通信模块

net.hpp

  • 基于 Muduo 网络库进行封装

  • 实现客户端 MuduoClient、服务端 MuduoServer、连接类 MuduoConnection、缓冲区 MuduoBuffer

  • 自定义通信协议 LVProtocol(Length-Value 协议)

  • 作用:处理低层网络事件与数据收发,为上层屏蔽底层细节

1、MuduoBuffer 缓冲区封装

MuduoBuffer 类直接套用 Muduo 库中的 muduo::net::Buffer 来实现。

目的 说明
✅ 解耦通信库 上层只操作 BaseBuffer::ptr,不知道也不关心用的是 Muduo
✅ 易于替换 如果将来换成 Boost.Asiolibuv,只需重新实现一个 AsioBuffer,不动协议/调度层
✅ 复用稳定实现 Muduo 的 Buffer 是成熟实现,具备性能优化和安全处理,直接复用即可

2、LVProtocol 协议处理类(自定义协议)

实现粘包拆包,负责从 Buffer 中解码消息、将消息编码成数据。

自定义的报文结构:

Length:该字段固定4字节⻓度,⽤于表⽰后续的本条消息数据⻓度(不包含本身)。
MType:该字段为Value中的固定字段,固定4字节⻓度,⽤于表⽰该条消息的类型。
IDLength:为消息中的固定字段,该字段固定4字节⻓度,⽤于描述后续ID字段的实际⻓度。
MID:在每条消息中都会有⼀个固定字段为ID字段,⽤于唯⼀标识消息,ID字段⻓度不固定。
Body:消息主题正⽂数据字段,为请求或响应的实际内容字段。

类成员:

1.canProcessed() 判断buffer缓冲区中是否至少有一个完整的报文

1.判断读的数据是否大于4字节 能就读4字节但不取出 因为可能后面报文并不完整

2.再判断缓冲区buffer中的可读数据是否>一个报文的长度

2.onMessage() 从缓冲区中解码一条完整消息,填充为对应的 BaseMessage 子类对象;

1.先获取这个报文的各个字段

2.再根据mtype创建对应的消息类

3.反序列化 body 并填入msg

4.最后设置msg中的 id 和类型。

3.serialize()将子类Message消息封装成协议格式的字节流。

1.获取各个字段 body字段把msg消息进行序列化

2.转化为字节流时,total_len mtye idlen 这些整型字段要填入网络字节序,事先htonl转为网络字节序。而id body字段是字符不需要字节序转换。

3、MuduoConnection 网络连接

MuduoConnection 底层用的muduo::net::TcpConnectionPtr,并借助_protocol完成序列化并添加报文字段_conn->send()进行发送。

4、MuduoServer 服务端封装类

类内成员:

1.构造函数

  • 创建一个 TcpServer 对象监听指定端口;

  • 使用 "0.0.0.0" 表示绑定本机所有 IP;

  • 使用 ProtocolFactory 创建协议解析器_protocol;

  • 内部绑定事件循环 _baseloop

    创建一个 TcpServer,并让它运行在 _baseloop 这个事件循环上,也就是说:

    所有的连接监听、新连接到达、消息接收等事件,都会由 _baseloop 来统一调度和处理。

2.start() 启动服务器

  • 设置连接与消息回调函数;

  • 启动 TCP 监听;

  • 启动事件循环,阻塞当前线程。

3.onConnection() 连接建立/断开的回调函数

管理_conns连接,从map根据TcpConnectionPtr删除或者建立BaseConntion连接。并调用删除/建立的回调函数

  • 若连接建立:

    • 创建 MuduoConnection,存入 _conns 映射表;

    • 调用连接建立回调 _cb_connection()

  • 若连接断开:

    • 查找映射并触发 _cb_close(),然后清理。

我们整个框架上层(比如 Dispatcher、Router、业务逻辑层)都用的是你封装好的抽象类 BaseConnection,所以必须建立从 TcpConnectionPtr 到 BaseConnection::ptr 的映射。

断开连接时也可以通过TcpConnectionPtr找到对应的BaseConnection,完成删除

4.onMessage() — 处理消息连接的回调函数

服务端收到消息后,解包、解析、找到连接,再交给业务处理。

  1. 把 Muduo 的 Buffer 转成统一的 BaseBuffer

  2. 循环判断是否有完整的消息包

  3. 每个完整包:

    • onMessage() 反序列化成 BaseMessage

    • 通过 _conns 找到对应的连接对象

    • 调用 _cb_message(base_conn, msg) 通知业务层处理

        // 接收到消息的回调函数
        void onMessage(const muduo::net::TcpConnectionPtr &conn,
                       muduo::net::Buffer *buf, muduo::Timestamp)
        {
            DLOG("连接有数据到来,开始处理!");
            auto base_buf = BufferFactory::create(buf);
            while (1)
            {

                if (_protocol->canProcessed(base_buf) == false)
                {
                    // 如果缓冲区的数据比2^16还大,但仍不是完整的报文
                    // 说明报文的len字段错误
                    if (base_buf->readableSize() > maxDataSize)
                    {
                        conn->shutdown(); // 断开连接
                        ELOG("缓冲区数据过大");
                        return;
                    }
                    DLOG("数据量不足");
                    break;
                }
                // 有完整的报文
                BaseMessage::ptr msg;
                // 从缓冲区的报文解 析出body字段 再 反序列化 并填入msg中
                bool ret = _protocol->onMessage(base_buf, msg);
                if (ret == false)
                {
                    conn->shutdown();
                    ELOG("缓冲区中数据错误");
                    return;
                }
                // 此时反序列化成功body字段信息放入msg中
                BaseConnection::ptr base_conn; // 获取TcpConn对应的连接 进行回调函数的调用
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _conns.find(conn);
                    if (it == _conns.end())
                    {
                        conn->shutdown();
                        return;
                    }
                    base_conn = it->second;
                }

                // 缓冲区数据解析完毕 调用回调函数
                if (_cb_message)
                    _cb_message(base_conn, msg);
            }
        }
  • 将原始 Muduo 的 Buffer* 转为统一的 BaseBuffer::ptr

  • 用协议对象 _protocol 判断是否存在完整消息(拆包逻辑)。如果 buffer 太大或内容非法,自动 shutdown()

  • 如果完整,则使用 onMessage() 解包并生成 BaseMessage::ptr;失败就shutdown()

  • 根据连接找到对应 BaseConnection 对象;如果找不到对应连接,也会断开,避免悬空指针或崩溃。

  • 调用消息处理回调 _cb_message() 将消息交给业务层。

内部调用的回调函数谁设置的?

_cb_message、_cb_connection、_cb_close 这些回调,都是上层业务在启动时通过 setXXXCallback() 主动设置进来的,用来告诉 MuduoServer:收到消息/连接变化时应该怎么通知上层

回调函数 设置者
_cb_message 通常由 Dispatcher 设置(收到消息后怎么处理)
_cb_connection 通常由 Requestor / RpcServer 设置(连接建立时要干啥)
_cb_close 通常由连接池 / Dispatcher 设置(连接断开时如何清理

5、MuduoClient客户端封装类

类内成员:

1.构造函数

初始化一个 TCP 客户端 _client,告诉它要连哪个地址(IP+端口)并指定它的事件循环在 _baseloop 中进行处理。

  • 创建了协议解析器 _protocol

  • 创建一个线程(EventLoopThread)并取出其 loop,赋给 _baseloop (EventLoopThread 是一个辅助类,它专门开一个线程,在里面跑一个 EventLoop)

  • 初始化 Muduo 的 TcpClient,绑定事件循环和目标地址

2、connect() — 连接服务端

  • 设置连接建立/断开的回调函数 onConnection()

  • 设置收到消息的回调函数 onMessage()

  • 调用 connect() 发起非阻塞连接

  • 通过 CountDownLatch 实现阻塞等待连接完成 

3、onConnection() — 连接建立回调

  • 创建并封装连接对象 _conn(是 BaseConnection 类型)

  • 将连接建立通知 CountDownLatch,使 connect() 不再阻塞连接服务器

  • 如果断开,只打印日志。

4.、onMessage() — 接收消息回调

客户端的oMessage和服务端的 只有一处不同。那就是客户端不需要去找对应的连接,解析完直接调用回调

因为服务端 用map管理着多个连接(来自多个客户端)

  • 每当某一个客户端发来数据,Muduo 就会调用这个连接对应的 onMessage()

  • 服务端需要管理连接池、区分不同客户端,从而找到消息是“谁发的”。

而客户端一般只维护一条连接(就是连向服务端的)

  • 所以 onMessage() 不需要处理多个连接;

  • 它只需要关心:服务端发回来了什么消息,我应该怎么处理。

        // 接收到消息的回调函数
        void onMessage(const muduo::net::TcpConnectionPtr &conn,
                       muduo::net::Buffer *buf, muduo::Timestamp)
        {
            DLOG("连接有数据到来,开始处理!");
            auto base_buf = BufferFactory::create(buf);
            while (1)
            {
                if (_protocol->canProcessed(base_buf) == false)
                {
                    // 如果缓冲区的数据比2^16还大,但仍不是完整的报文
                    // 说明报文的len字段错误
                    if (base_buf->readableSize() > maxDataSize)
                    {
                        conn->shutdown(); // 断开连接
                        ELOG("缓冲区数据过大");
                        return;
                    }
                    DLOG("数据量不足");
                    break;
                }
                // 有完整的报文
                BaseMessage::ptr msg;
                // 从缓冲区的报文解析出body字段再内容反序列化并填入msg中
                bool ret = _protocol->onMessage(base_buf, msg);
                if (ret == false)
                {
                    conn->shutdown();
                    ELOG("缓冲区中数据错误");
                    return;
                }
                // 缓冲区数据解析完毕 调用回调函数
                if (_cb_message)
                    _cb_message(_conn, msg);
            }
        }

服务端的 onMessage() 重点在于“识别是哪一个连接发来的消息,并进行映射分发”;而客户端的 onMessage() 则更聚焦于“接收响应并直接处理”,不涉及连接管理,逻辑更加单一直接。

5.send() — 发送消息

  • 检查连接是否正常

  • 通过 _conn->send() 发送消息(底层调用的moude库中的send 由 MuduoConnection 完成序列化 + 发送)

总结:

我的网络通信模块基于 Muduo 网络库 实现,采用
了面向接口的设计方式。上层模块不直接依赖 Muduo,而是通过我们自定义的 BaseConnection、BaseClient、BaseServer 等抽象接口进行操作。还写了一个自定义协议类 LVProtocol,专门负责消息的序列化和拆包。

四.Dispatcher 消息分发模块

dispatcher.hpp

Callback 类(抽象基类)

作用:

  • 所有消息类型的回调都继承它;

  • 定义统一的回调接口 onMessage()

  • 实现了 Dispatcher 对所有消息类型“统一操作”的能力。

CallbackT<T> 模板类(子类)

作用:

  • 针对不同的消息类型(如 RpcRequestTopicRequest)封装对应的回调;

  • 内部持有具体类型的处理函数 _handler

  • 在触发时,先用 dynamic_pointer_cast<T> 安全转换成真正的消息类型,再执行 handler。

③ Dispatcher 类本体

函数 作用
registerHandler() 注册某个消息类型对应的处理函数
onMessage() 接收到消息时,根据 mtype 分发给对应回调执行
_handlers 保存不同消息类型对应的 Callback 实例,实现多态调用
总结一下dispatcher模块运行的逻辑,要在dispatcher中注册某个类型消息的回调函数,把1消息类型2回调函数传给registerHandler,然后在插入_handlers中消息类型->对应的回调函数。当Dispatcher 收到了某个消息,根据这个消息的类型去 _handlers中找对应的回调函数,并执行。

这个过程中会出现一个问题。

在我们向dispatcher中注册某个类型消息的回调函数,传入的第二个参数 函数类型是不一样的。(具体在于每个回调函数的第二个参数 消息的子类是不同类型)

这就导致了map _handlers无法存储统一的回调函数。

那如果把回调函数的第二个参数 不同消息的子类改为用BaseMessgae统一的基类接收,再在回调函数中std::dynamic_pointer_cast将父类->子类 怎么样?

可以,但有缺点。主要是我必须去 猜 判断这个msg是不是对应的子类,如果不是要怎么处理。 如果说后面我们新增了一个类型,是不是还要去修改原代码来判断怎么处理,这违反开闭原则

缺点 说明
✅ 类型不安全 如果写错类型,比如试图将 TopicRequest 转成 RpcRequestdynamic_pointer_cast 会失败,返回空指针;
✅ 可读性差 每次都要手动写类型转换,代码冗长、容易错;
✅ 高耦合 调用者必须“猜出”对象真实类型,违反开闭原则
✅ 性能成本 动态类型检查在运行时有额外性能开销(尽管不大,但大量调用下也会累积);

所以具体讲一下:在传入回调函数类型不一样的情况下,怎么用map统一管理?

Dispatcher 如何实现类型安全的回调派发?

解决方案:采用模板 + 继承 + 多态(+ dynamic_pointer_cast)

定义一个父类Callback 里面的回调函数设置为虚函数,这些根据消息类型实现的不同的模板类CallbackT作为子类继承父类并实现各自的回调函数map中的handler存的不再是回调函数,而是一个父类指针,通过父类指针调用子类的回调函数。

具体过程: 使用模板类 CallbackT<T> 根据消息类型 T 生成不同的子类,它们内部包装了各自类型的回调函数。(因为子类传入的是一个函数类型,保存的一个函数类型,而T是一个消息类型。还需要MessageCallback将T消息类型和回调函数类型绑定在一起)

这些模板类继承自统一的基类 Callback,并重写了其 onMessage() 虚函数。 当我们在调用registerHandler注册回调函数时,会创建一个CallbackT<T>的对象并获取其指针,最后设置到map中。

在 Dispatcher 中,map<MType, Callback::ptr> 存储的是基类指针,但实际指向的是不同子类 CallbackT<T> 的对象

调用 onMessage() 时,先通过   std::unordered_map<MType,Callback::ptr> _handlers 找到对应消息类型的基类指针 基类指针会通过虚函数机制调用对应子类的实现再在各子类重写的oMessgae通过 dynamic_pointer_cast 将消息转换为正确的类型,最终调用具体的业务回调函数。

当你用基类指针调用一个虚函数时,程序会在运行时根据对象真实的类型,自动跳转到对应子类重写的函数上

这就是虚函数机制,也叫做动态绑定

简单流程:

🔹 第一步:注册阶段

我们在服务端调用 registerHandler<T>() 注册某个消息类型的回调函数。

  •  模板:根据传入的消息类型 T(如 RpcRequest),生成一个 CallbackT<T> 子类。

  •  继承:这个模板子类继承自统一的基类 Callback

  •  然后把这个子类对象作为 Callback::ptr 存入 _handlers 映射表中。


🔹 第二步:消息到达 → 分发阶段

当 Dispatcher 收到一条消息时,调用 onMessage(conn, msg)

  • 根据 msg->mtype()_handlers 中查出对应的基类指针;

  •  多态:通过虚函数 onMessage(),自动调用到具体的 CallbackT<T> 子类实现;

  • 子类中使用 dynamic_pointer_cast<T>(msg) 转为强类型;

  • 最终调用你注册的回调函数,完成业务处理。

简单总结:

首先先定义一个父类里面有虚函数 oMessage() ,再来一个模板类,根据消息类型T生成子类继承父类,在里面重写oMessage() 。插入中_handler中保存的父类指针,后面找到父类指针通过虚函数机制调用对应子类的oMessage() ,完成类型安全的回调处理。

五.RPC 调用模块

支持同步调用、callback 异步、future 异步三种方式。

client/requestor.hpp  请求发送 & 响应处理器

client/rpc_caller.hpp  统一封装三种调用方式

client/rpc_client.hpp  整体客户端封装,组合了网络、注册发现与调用流程

requestor.hpp

1.Requestor 类(请求发送与响应处理器)

Requestor 类负责将请求发送到服务器,接收响应并处理,完成请求/响应的通信。

类内成员

类内函数

1. onResponse() — 处理响应

 作用:

  • 接收响应:该函数在接收到响应消息后,根据rid查找并找到该请求的描述RequestDescribe)对象,然后处理响应。

  • 根据请求类型处理响应

    • 异步请求(REQ_ASYNC:将响应设置到 std::promise 对象中,触发关联的 std::future 就绪。

    • 回调请求(REQ_CALLBACK:如果存在回调函数,就调用回调函数。

处理步骤:

  1. 查找请求描述:根据响应的 rid(请求 ID),从 _request_desc 中查找对应的请求描述。

  2. 根据请求类型处理响应

    • 异步请求:通过 response.set_value(msg) 设置响应结果,使 future 对象变为可获取的状态。

    • 回调请求:如果存在回调函数 callback,则调用 callback(msg)

  3. 删除请求描述:处理完响应后,移除该请求描述对象。

2. send() — 发送异步请求

作用:

  • 发送异步请求:该函数用于向服务端发送请求,并获取一个 std::future 对象,用于获取异步响应。

处理步骤:

  1. 创建请求描述:调用 newDescribe() 函数生成一个新的请求描述对象,并将请求类型设置为 REQ_ASYNC

  2. 发送请求:通过 conn->send(req) 向服务端发送请求。

  3. 获取 future:从请求描述中获取与 std::promise 关联的 std::future 对象,并赋值给 async_rsp,客户端可以通过 async_rsp.get() 获取响应结果。

3. send() — 发送同步请求

作用:

  • 发送同步请求:该函数是同步调用的封装,发送请求并等待服务端响应。本质就是       异步send()+函数内get()阻塞等待。

处理步骤:

  1. 调用异步 send():通过调用上面定义的异步 send() 函数,并获取到 std::future 对象。

  2. 等待响应:调用 rsp_future.get() 阻塞等待异步响应,直到响应可用并返回。

4. send() — 发送异步回调请求

作用:

  • 发送回调请求:该函数用于发送异步请求,并在响应到达时通过回调函数 cb 进行处理。

处理步骤:

  1. 创建请求描述:通过 newDescribe() 创建请求描述对象,设置请求类型为 REQ_CALLBACK,并保存用户的回调函数 cb

  2. 发送请求:调用 conn->send(req) 向服务端发送请求。 newDescribe时设置回调函数

  3. 回调函数:服务端响应后,onResponse() 函数会触发回调函数 cb(msg),执行用户定义的处理逻辑。

2.RpcCaller 类(统一封装三种调用方式)

rpc_caller.hpp

RpcCaller 封装了对 Requestor 的使用,屏蔽了发送请求、处理响应的底层逻辑,对外提供统一的 RPC 接口,支持:

  • 同步调用(阻塞直到返回结果);

  • 异步 Future 调用(返回 future,可等待);

  • 异步回调调用(收到响应后触发回调函数);

为什么 Requestor::send() 不直接处理 RpcRequest,而是让 RpcCaller 来封装具体业务?(如 RpcRequest 和 RpcResponse)

Requestor要处理多种消息类型(不仅仅是 RpcRequest

Requestor::send() 接收 通用的 BaseMessage,这样它可以支持不同类型的消息,不仅仅局限于 RpcRequest。

模块 职责
Requestor 发送消息、注册回调函数、处理响应回调;只关心消息的传递和回调,不涉及业务逻辑。
RpcCaller 构造 RpcRequest、解析 RpcResponse,并组织具体的业务逻辑。它封装了 RPC 的调用和响应处理,涉及到协议的解析和具体的消息内容。
优点:可维护 Requestor 专注于网络通信和回调处理,不涉及具体的业务协议和消息格式, 这样修改或新增协议时,只需要改动 RpcCaller 或其他业务相关的类,不需要修改 Requestor。 
Requestor 只负责消息的发送和响应处理,而不关心具体的业务协议和消息类型。通过让 RpcCaller 封装具体的协议(如 RpcRequest 和 RpcResponse)。

 构造函数 RpcCaller(const Requestor::ptr&)

1. 同步调用接口

作用:

  • 封装一个 阻塞式的 RPC 调用

  • 用户传入方法名和参数,等待服务器响应后,返回调用结果。

内部流程:

  1. 构造 RpcRequest 请求消息(设置 id、mtype、method、params);

  2. 使用 _requestor->send() 发送请求,并同步等待响应;(因为send()是重载函数 参数的类型必须保持一致(req_msg子类RpcRequest->基类BaseMessage))

  3. 对响应 rsp_msg 进行类型转换(转为 RpcResponse);

  4. 判断返回码 rcode 是否成功;

  5. 返回其中的 result() 给调用者。

2. 异步 Future 调用接口

作用:

  • 向服务器发送请求,立即返回一个 future 对象,调用方可稍后使用 future.get() 获取结果;

  • 非阻塞,适合异步场景。

内部流程:

  1. 构造 RpcRequest

  2. 创建一个 std::shared_ptr<std::promise<Json::Value>>,从中派生 future

  3. 将回调函数绑定为 RpcCaller::Callback,并捕获 promise

  4. 调用 _requestor->send() 发送请求并注册回调;

  5. 回调触发时,内部调用 promise->set_value(...),将结果设置到 future。1.收到消息调用回调cb 2. cb bind绑定的Callback函数 在里面进行set_value

局部对象auto json_promise=std::make_shared<std::promise<Json::Value>>();创建promise对象并用shared指针来管理它,出了作用域call函数 引用计数-- 为0 promise会析构。这就会导致与它关联的future result在外部get()获取结果时,会异常。

有没有什么方法让 json_promise 会存活到 callback 被调用?

shared_ptr+bind(值捕获)+外部拷贝保存

为什么 bind 捕获 shared_ptr 能延长 promise 的生命周期?

本质就是:

只要某个 shared_ptr 指向的对象,引用计数 不为 0,那它就不会被销毁

所以我们通过创建另一个生命周期更长的 shared_ptr(比如通过 bind() 值捕获并在外部保存),来延长这段资源的生命周期。

1. bind 捕获的作用:

  • std::bind 值捕获 shared_ptrshared_ptr 拷贝到回调函数 cb 内部,bind 本身并不会延长 json_promise 的生命周期。它只是创建了一个闭包,其中包含了 shared_ptr 的拷贝。

  • 这意味着,shared_ptr 的生命周期没有被直接延长,它仍然会在局部作用域结束后被销毁。

2. 真正延长生命周期的机制:

  • 关键在于:回调函数 cb 被外部保存。在 Requestor::send() 中,cb 被保存到了 RequestDescribe 中,这就是生命周期延长的真正关键

  • 通过将回调函数 cb 存储在 RequestDescribe 中,json_promise 的生命周期得以延长,直到 RequestDescribe 被销毁

3. 生命周期延长的过程:

  • 当调用 std::bind 捕获 json_promise 时,shared_ptr 被拷贝到 cb 中。

  • 然后,cb 被存储在 RequestDescribe 中,并通过 _request_desc 保持对 RequestDescribe 对象的引用。

  • 由于 RequestDescribe 的引用保持了对回调函数 cb 的持有,json_promise 的生命周期被延长,直到服务端响应到达并触发回调,最后 RequestDescribe 被删除。

4. 总结:

bind 捕获 json_promise 并不会延长其生命周期,真正延长生命周期的关键是回调函数 cb 被外部保存到 RequestDescribe 中,直到请求描述对象被删除,json_promise 才会被析构。

3. 异步回调调用接口

作用:

  • 向服务器发送请求,当响应到来时,自动触发你提供的回调函数 cb

  • 非阻塞,适合事件驱动。

内部流程:

  1. 构造 RpcRequest

  2. 使用 std::bind 把回调函数包装成 Callback1

  3. 调用 _requestor->send() 注册该回调;

  4. Requestor::onResponse() 收到响应后,调用你的回调。即Callback1 () 

六.服务注册与发现模块

实现服务注册、发现、上线、下线、轮询负载均衡。

  • client/rpc_registry.hpp  包含 RegistryClientDiscoveryClient

  • server/rpc_registry.hpp  包含 RegistryServerPDManager

一.client/rpc_registry.hpp客户端

客户端要么是服务提供者,要么是调用者,因此必须 注册自己的服务,也要 发现别人的服务

1. Provider 类(服务提供者)

Provider 类负责将服务注册到服务注册中心。它是服务提供者端的客户端,通过与 Requestor 类合作,向服务注册中心注册自己的服务信息。

registryMethod() — 注册服务方法

作用:

  • 向服务注册中心发送服务注册请求。

  • 通过 MessageFactory::create<ServiceRequest>() 创建 ServiceRequest 消息,并设置为 SERVICE_REGISTRY 操作类型(注册服务)。

  • 调用 Requestor::send() 发送请求,并同步等待响应。

流程:

  1. 创建请求消息:构造一个 ServiceRequest 消息,其中包括服务地址、方法名等信息。

  2. 发送注册请求:通过 Requestor 发送请求,等待注册中心的响应。

  3. 响应处理:如果注册失败,输出错误信息;如果成功,则返回 true

2. MethodHost 类(方法与服务提供者管理)

MethodHost 类用于本地存储 每个服务方法对应的服务提供者地址。它支持服务上线时的添加服务下线时的删除,并通过轮询负载均衡来选择服务提供者。

1.appendHost() — 添加服务提供者

当新的服务上线时,向 MethodHost 中的地址列表 _hosts 中添加新的服务提供者地址。

2.removeHost() — 移除服务提供者

3.chooseHost() — 选择服务提供者

使用轮询机制来选择一个服务提供者。_idx 是当前轮询的索引,确保每次请求都能按顺序选择不同的提供者。

4.empty() — 检查是否有服务提供者

3. Discoverer 类(服务发现者)

Discoverer 类用于处理服务发现的过程,包括从服务注册中心查询服务提供者的地址,并且管理服务下线时的清理工作

1.serviceDiscovery() — 查询服务提供者

作用:

  • 通过服务发现,查找指定方法(method)的服务提供者地址。

  • 如果本地已有缓存的服务提供者,则直接返回;

  • 如果没有,构造服务发现请求并发送到注册中心,接收并更新服务提供者信息。

流程:

  1. 查询本地缓存:首先检查 _method_hosts 中是否已有该方法的服务提供者地址。

  2. 发送服务发现请求:如果没有,创建 ServiceRequest 消息并发送给服务注册中心,请求该服务的提供者地址。

  3. 处理响应:如果服务提供者地址返回成功,将其缓存到 _method_hosts 中,并选择一个服务提供者(使用 MethodHost::chooseHost())。

2.onServiceRequest() — 服务上线/下线通知处理

作用:

  • 处理服务上线或下线的通知。

  • 如果服务上线,添加新的服务提供者地址;如果服务下线,移除地址并调用下线回调。

流程:

  1. 判断操作类型:根据消息的 optype 字段判断是上线(SERVICE_ONLINE)还是下线(SERVICE_OFFLINE)。

  2. 服务上线:如果是上线操作,调用 MethodHost::appendHost() 将新服务提供者地址添加到本地。

  3. 服务下线:如果是下线操作,调用 MethodHost::removeHost() 从本地移除服务提供者地址。

模块职责:

类名 职责
Provider 向服务注册中心注册服务并处理服务提供者相关的操作。
MethodHost 管理每个方法的服务提供者地址,提供服务的负载均衡和服务状态管理。
Discoverer 实现服务发现功能,查询并缓存服务提供者地址,处理服务上线/下线通知。

二.server/rpc_registry.hpp服务端

注册中心负责 接收服务注册/发现请求,并保存和回应服务信息,让客户端之间能互相找到。

1.ProviderManager 服务提供者管理类

管理所有服务提供者的信息;

维护:连接 → 提供者(vector<string> methods)、方法名 → 提供者集合 的双向映射;

支持服务注册、服务断开清理、获取某方法的提供者地址列表。

1.addProvider(conn, host, method)

  • 注册新的服务提供者;

  • 建立连接→提供者对象的映射,并将其加入 _providers[method] 中。

2.getProvider(conn)

  • 获取某个连接所绑定的服务提供者对象,用于判断连接断开是否是服务提供者。

3.delProvider(conn)

  • 当服务提供者下线或断开连接时:

    1. 找出其支持的所有方法;

    2. 将其从 _providers[method] 中移除;

    3. _conns 中移除此连接。

4.methodHosts(method)

  • 获取某个方法的所有服务提供者地址列表,供服务发现应答使用。

2.DiscovererManager 服务消费者管理类

管理所有调用过服务发现的客户端;

维护:连接 → 服务发现者对象、方法名 → 已订阅发现者 的映射;

当服务上线或下线时,对发现过该服务的所有客户端发送通知。

1.addDiscoverer(conn, method)

  • 在服务发现请求中被调用;

  • 查找 不存在则建立发现者对象并加入_conns[c]

  • 找到提供该method方法的发现者列表 _discoverers[method]并加入discoverer该发现者

  • 再从该发现者中 增加已经发现的方法discoverer->appendMethod(method)

2.delDiscoverer(conn)

  • 客户端断开连接时:

    1. 找到该连接conn对应的发现者 没有直接返回

    2. 查找它发现过的所有方法,从一个一个方法中 _discoverers[method] 中移除此客户端;(it->second discoverers)

    3. _conns 中移除。

3.onlineNotify(method, host)

  • 通知发现过该服务方法的所有客户端:新的服务提供者上线;

  • 调用内部的 notify(...) 构造并发送 SERVICE_ONLINE 消息。

    • 先判断该服务有没有被人发现过,没有就不需要进行通知。

    • 有就构建服务请求 操作类型上线还是下线由传入的optype决定

    • 对所有发现该方法的客户端conn进行发现请求

4.offlineNotify(method, host)

  • 类似 onlineNotify(),但操作类型为 SERVICE_OFFLINE,用于服务下线通知。

3.PDManager 注册与发现总调度

将请求分发给 ProviderManager 和 DiscovererManager;

处理服务注册、发现、以及连接断开事件;

构造并发送注册/发现/错误响应消息

onServiceRequest(conn, msg)

  • 中央调度方法,根据消息操作类型:

    • SERVICE_REGISTRY:添加提供者,通知上线,发送注册响应;

    • SERVICE_DISCOVERY:添加发现者,发送服务地址列表;

    • 其他类型:返回错误响应。

onConnShutdown(conn)

  • 连接断开时被调用;

    • 若是服务提供者:调用 offlineNotify() 通知下线,删除其所有方法;

    • 无论是否提供者,都调用 delDiscoverer() 删除发现者信息。

registryResponse() / discoverResponse() / errorResponse()

  • 构造并发送对应的应答消息,使用统一消息协议进行交互。

类名 职责
ProviderManager 管理服务提供者,处理注册与断开,维护方法与提供者的映射
DiscovererManager 管理服务调用方,处理服务发现与上线/下线通知,维护方法订阅关系
PDManager 服务注册发现统一调度器,处理注册、发现、连接断开等事件

总结:

客户端难点:

我们客户端 Discoverer 有一个 method → address 的本地缓存,服务发现时先从本地查找,用来降低频繁访问注册中心的开销。但如果有方法上线/下线,缓存就需要进行更新。
为了解决这个问题,我在服务端实现了订阅推送机制。
服务端通过 ProviderManager 记录所有注册的服务提供者,每当有服务注册或上线时,会由 PDManager::onServiceRequest 作为统一调度入口,调用 ProviderManager::addProvider 完成服务注册。
随后,PDManager 会通知 DiscovererManager::onProviderAdded,将该服务的上线信息推送给所有已订阅该服务的客户端。
客户端这边,Discoverer 通过注册订阅时绑定的 onServiceRequest 回调函数来接收服务端的推送消息,并自动更新本地缓存中的服务地址信息。


这个机制实现了从服务注册、发现、订阅到推送和缓存更新的完整闭环,保证了客户端发现服务的实时性和正确性,极大提高了框架的稳定性与扩展能力。

简单来说就是:当有服务上线时,注册中心服务端就会给客户端发服务上线/下线的消息,客户端再进行更新本地缓存的method->address的映射关系。

服务端难点:

要维护“服务名 ↔ 服务提供者 ↔ 发现者”三者的双向映射关系。

注册中心必须要知道

  • 谁提供了什么服务(服务名 → 提供者)

  • 谁想发现什么服务(服务名 → 发现者)

  • 断开后如何准确清除(连接 → 服务名)

🔹 ProviderManager:服务提供者管理器

✅ 管理哪些服务方法是由哪些连接(Provider)提供的。

🔸 关键结构:

std::unordered_map<std::string, std::set<Provider::ptr>> _providers; // 方法 → 提供者集合
std::unordered_map<BaseConnection::ptr, Provider::ptr> _conns;       // 连接 → 提供者

🔸 添加流程:addProvider(conn, host, method)

  • 若连接已存在 → 复用旧 Provider;否则创建新 Provider。

  • 更新 _conns_providers[method] 的双向映射。

  • 更新 Provider::methods 中记录它能提供的所有方法。

🔸 删除流程:delProvider(conn)

  • 拿到 Provider::methods 中每个 method

  • 逐个从 _providers[method] 中删除该 Provider

  • 最后清除 _conns[conn]


🔹 DiscovererManager:服务发现者管理器

✅ 管理谁订阅了哪些服务,服务上线时推送给哪些人。

🔸 关键结构:

std::unordered_map<std::string, std::set<Discoverer::ptr>> _discoverers; // 方法 → 发现者集合
std::unordered_map<BaseConnection::ptr, Discoverer::ptr> _conns;         // 连接 → 发现者

🔸 添加流程:addDiscoverer(conn, method)

  • 若连接已存在 → 复用旧 Discoverer;否则创建新 Discoverer。

  • _discoverers[method] 加入该 Discoverer

  • _subscribers[conn] 加入该 method(方法存储在 Discoverer::methods 中)

🔸 删除流程:delDiscoverer(conn)

  • 从其记录的所有方法里逐个删除

  • 最后清除 _conns[conn]


🔹 PDManager:统一调度器

onServiceRequest() 是所有服务请求的统一调度入口。

🚦 onServiceRequest(conn, msg)

  • 若是服务注册:

    1. _providers->addProvider() → 建立注册映射

    2. _discoverers->onlineNotify() → 对该方法的所有发现者进行服务上线通知

    3. 回应 registryResponse() 对进行服务注册的服务提供者返回应答

  • 若是服务发现:

    1. _discoverers->addDiscoverer() → 记录发现关系

    2. 回应 discoverResponse() 对进行服务发现的发现者返回应答

🔥 onConnShutdown(conn)

  • 若是提供者 → 先推送服务下线(offlineNotify()),再 delProvider(conn)

  • 再尝试删除该连接对应的发现者 delDiscoverer(conn)

简单流程:

服务注册,1._providers->addProvider() 进行保存连接conn->提供者 方法->提供者+1 更新提供者中提供的method方法(便于后面提供者下线时,能反向查找它注册过的所有方法,从而一一删除 method → provider 的映射,并向订阅者正确推送服务下线通知)

2._discoverers->onlineNotify() 对服务发现者进行服务上线通知 3.registryResponse() 对进行注册的客户端返回应答。

连接断开,1.先判断是否为提供者下线。是1.遍历提供者的每个方法进行服务下线通知。

  2.删除对该提供者的管理,遍历提供者的每个方法进行删除method->提供者列表中的该提供者。3.最后删除连接与服务提供者的关系

2.不管是不是发现者 都直接进行_discoverers->delDiscoverer(conn) 同理从_discoverers根据发现的方法名获取对应的发现者列表,再进行删除。最后删除_conns里面的连接。

delDiscoverer(_conn) 传入_conn → _conns[conn] → Discoverer::methods每个 method → _discoverers[method] → 删除该 discoverer 最后 → _conns.erase(conn) 完成清理

七. 发布-订阅模块(Topic 模块)

client/rpc_topic.hpp server/rpc_topic.hpp

  • 客户端模块:TopicClient, TopicManager

  • 服务端模块:TopicManager, Dispatcher 分发消息

  • 支持创建主题、订阅、取消订阅、消息推送等操作

  • 作用:支持广播式/订阅式消息分发,拓展场景适用性

一.server/rpc_topic.hpp服务端

Topic 服务端的作用是维护主题与订阅者之间的映射关系,并实现基于主题的高效、解耦、实时的消息推送机制。

功能

服务端要完成的功能包括以下几个部分:

功能点 代码接口 作用描述
创建主题 topicCreate 客户端请求创建一个新的主题用于后续订阅和消息发布。
删除主题 topicRemove 删除一个主题并清理其订阅者数据。
订阅主题 topicSubscribe 客户端订阅指定主题,服务端记录该订阅关系。
取消订阅 topicCancel 客户端取消订阅指定主题。
发布消息到主题 topicPublish 将消息推送给所有订阅该主题的客户端。
连接关闭时清理订阅 onShutdown 当客户端断开连接,移除它在所有主题中的订阅。
请求分发 onTopicRequest 根据请求类型调用上述具体操作。

类内成员

1. Topic 结构(一个主题对象)

  • 字段:

    • topic_name: 主题名

    • subscribers: 订阅该主题的所有客户端连接封装对象(Subscriber)

  • 主要方法:

    • appendSubscriber():添加订阅者

    • removeSubscriber():删除订阅者

    • pushMessage():向所有订阅者发送消息


2. Subscriber 结构(客户端连接的订阅者视图)

  • 字段:

    • conn: 当前客户端连接

    • topics: 此客户端当前订阅的主题集合

  • 方法:

    • appendTopic():添加订阅记录

    • removeSTopic():取消订阅


3. 订阅与主题管理容器:

  • _topics: map<string, Topic::ptr>
    存储所有主题名及其对应的 Topic 对象。

  • _subscribers: map<BaseConnection::ptr, Subscriber::ptr>
    记录每个连接所对应的订阅者对象,用于断开连接时自动清理。

各功能实现流程

1. 创建主题(TOPIC_CREATE

  1. 客户端发送主题创建请求(MType 为 REQ_TOPIC,操作类型为 TOPIC_CREATE,附带主题名)。

  2. Dispatcher 模块接收到消息并调用 TopicManager::onTopicRequest()

  3. 判断操作类型为 TOPIC_CREATE,调用 topicCreate()

  4. 不管存不存在 直接创建并插入:

    • 创建 Topic 对象;

    • 插入 _topics[topic_name] = topic

  5. 返回成功响应 topicResponse() 给客户端。

2. 删除主题(TOPIC_REMOVE

  1. 客户端发送删除主题请求(TOPIC_REMOVE)。

  2. onTopicRequest() 判断操作类型,调用 topicRemove()

  3. 查找主题名是否存在于 _topics

    • 若不存在,返回错误响应 errorResponse()

  4. 获取当前主题的所有订阅者集合。

  5. _topics 删除该主题对象。

  6. 遍历每个订阅者,调用 subscriber->removeSTopic(topic_name),删除订阅的该主题。

  7. 返回 topicResponse()


3. 订阅主题(TOPIC_SUBSCRIBE)

  1. 客户端发送订阅请求,指定订阅的主题名。

  2. Dispatcher 调用 onTopicRequest(),判断为 TOPIC_SUBSCRIBE,转向 topicSubscribe()

  3. 加锁 _topics

    • 查找是否存在指定的 topic,若无返回 false -> errorResponse()

  4. 查找当前连接是否已经存在订阅者对象:

    • 有就获取 无则创建 Subscriber(conn) 并插入 _subscribers[conn] = subscriber

  5. 互相注册关系:

    • topic->appendSubscriber(subscriber)

    • subscriber->appendTopic(topic_name)

  6. 返回订阅成功响应 topicResponse()


 4. 取消订阅主题(TOPIC_CANCEL

  1. 客户端发送取消订阅请求。

  2. Dispatcher 调用 onTopicRequest(),判断为 TOPIC_CANCEL

  3. 调用 topicCancel()

    • 查找对应的主题和订阅者对象(可能为空)。

    • 如果找到了订阅者调用removeSTopic删除主题,对于主题再存在再调用removeSubscriber删除里面的订阅者的管理

      • subscriber->removeSTopic(topic_name)

      • topic->removeSubscriber(subscriber)

  4. 返回取消订阅响应 topicResponse()


5. 发布消息(TOPIC_PUBLISH

  1. 客户端发送主题发布请求(含发布的主题名 + 消息内容)。

  2. Dispatcher 调用 onTopicRequest(),判断为 TOPIC_PUBLISH

  3. 执行 topicPublish()

    • 查找 _topics[topic_name],如果没有主题则返回 false -> errorResponse()

  4. 调用 topic->pushMessage(msg)给每个订阅该主题的订阅者发消息

    • 遍历该主题所有 subscribers,逐一 subscriber->conn->send(msg)

  5. 返回发布成功响应 topicResponse()


 6. 客户端断开连接时自动清理(onShutdown()

  1. 某客户端连接断开后,Dispatcher 通知调用 onShutdown(conn)

  2. _subscribers 中查找该连接对应的 subscriber

    • 若找不到说明不是订阅者,直接 return。

  3. 获取该订阅者订阅的所有主题名 vector<Topic::ptr> topics。

  4.  erase_subscribers[conn]

  5. 遍历每个主题名:

    • _topics 中查找对应 Topic

    • 执行 topic->removeSubscriber(subscriber)

二.client/rpc_topic.hpp客户端

Topic 客户端是发布-订阅通信模型中的参与者,既可以订阅某个主题接收消息,也可以向指定主题发布消息,实现松耦合、高并发、低延迟的消息分发。

功能

功能 方法名 说明
创建主题 create(conn, key) 向服务端发起创建某个主题
删除主题 remove(conn, key) 请求服务端删除指定主题
订阅主题 subscribe(conn, key, cb) 注册回调,并订阅主题,接收消息推送
取消订阅 cancel(conn, key) 停止接收某主题消息,清理回调
发布消息 publish(conn, key, msg) 发布一条消息 msg 到指定主题 key,请求服务端转发给所有订阅者
消息处理 onPublish(conn, msg) 收到服务端推送的消息时触发回调处理

类内成员

 std::mutex _mutex

  • 用途:保护 _topic_callbacks 的线程安全,防止并发访问导致竞态条件


std::unordered_map<std::string, SubCallback> _topic_callbacks

  • 用途:以主题名为 key,存储每个主题对应的回调函数

  • 说明:这个 map 让客户端可以根据推送过来的 topic_key 找到并执行注册时的回调函数


Requestor::ptr _requestor

  • 用途:底层通信模块,负责消息的构造、发送、接收与响应解析

  • 说明:Requestor 是核心的通信组件,它封装了 send + wait 的同步调用过程

各功能实现流程

关于主题的操作都会调用一个共同的函数commonRequest,负责发送请求消息并同步等待响应。

构建请求并发送commonRequest

统一封装所有请求的发送逻辑,是 create/remove/subscribe/cancel/publish 的公共底层实现。

  • 它负责构造 TopicRequest,设置 topic_keyoptypemsg

  • 调用 _requestor->send() 发送请求并接收响应。

  • 判断请求是否处理成功,响应失败会被记录日志并返回 false。

1. 创建主题( TOPIC_CREATE

  1. 客户端调用 TopicManager::create(conn, key),意图创建主题。

  2. 封装消息类型 MType::REQ_TOPIC,操作类型为 TOPIC_CREATE,设置主题名 key

  3. 调用 _requestor->send(conn, msg_req, msg_rsp) 发送请求,等待服务端响应。

  4. 解析响应 msg_rsp,转换为 TopicResponse 类型。

  5. 判断 rcode() 是否为 RCODE_OK,表示服务端创建主题成功。


2. 删除主题(TOPIC_REMOVE

  1. 客户端调用 TopicManager::remove(conn, key),意图删除某主题。

  2. 构造 TopicRequest,设置 optype = TOPIC_REMOVE

  3. 通过 _requestor->send() 发起请求。

  4. 服务端处理后返回响应。

  5. 客户端根据返回的 rcode() 判断删除是否成功。


3. 订阅主题( TOPIC_SUBSCRIBE

  1. 客户端调用 TopicManager::subscribe(conn, key, cb),准备订阅某主题并注册回调。

  2. 首先调用 addSubscribe(key, cb),将回调函数存入 _topic_callbacks设置该主题的回调函数

  3. 构造 TopicRequest,设置 optype = TOPIC_SUBSCRIBE 并发送。

  4. 如果 _requestor->send() 返回失败,则调用 delSubscribe(key) 删除回调

  5. 若响应成功,表示客户端已成为该主题的订阅者,等待推送。


4. 取消订阅主题( TOPIC_CANCEL

  1. 客户端调用 TopicManager::cancel(conn, key),取消订阅某主题。

  2. 调用 delSubscribe(key),从 _topic_callbacks移除对应回调

  3. 构造取消请求 TopicRequest,设置 optype = TOPIC_CANCEL

  4. 使用 _requestor->send() 向服务端发送请求。

  5. 服务端移除该连接的订阅记录,客户端不再收到该主题的消息。


5. 发布消息( TOPIC_PUBLISH

  1. 客户端调用 TopicManager::publish(conn, key, msg),准备发布一条消息。

  2. 构造 TopicRequest,设置 optype = TOPIC_PUBLISH,填入 topic_key = keytopic_msg = msg

  3. 通过 _requestor->send() 发送到服务端。

  4. 服务端查找该主题对应的所有订阅者,逐一推送消息。

  5. 客户端订阅者通过 onPublish() 接收并处理消息。


6. 接收服务端推送(onPublish()

  1. 服务端推送一条 TOPIC_PUBLISH 类型的消息。

  2. 客户端 Dispatcher 触发 TopicManager::onPublish(conn, msg)

  3. 先判断消息的操作类型是否为 发布消息。不是直接返回

  4. msg 中获取 topic_keytopic_msg

  5. 使用 getSubscribe(topic_key)_topic_callbacks 中查找回调函数。

  6. 若回调存在,则执行 callback(topic_key, topic_msg) 处理消息。

  7. 若回调为空,则打印日志说明没有注册处理函数。

总结

我在客户端的 TopicManager 模块里,做了一套发布订阅功能。所有创建、订阅、取消、发布这些操作,底层都通过一个统一的函数 commonRequest 发送请求给服务端

客户端本地维护了一个回调函数的 map,叫 _topic_callbacks,每次订阅主题的时候我就先把回调函数注册进去。服务端如果有消息推过来,onPublish() 就会根据主题名查这个 map,找到回调函数再执行。这样就做到了服务端主动推送,客户端自动执行回调,即异步推送 + 本地响应。

服务端这边也有一个 TopicManager,它是用两个 map 来管理的:一个是 _topics,记录每个主题有哪些订阅者;另一个是 _subscribers,记录每个连接订阅了哪些主题

客户端发来订阅请求,服务端就把这个连接加入对应的主题对象topic中;如果有客户端断开连接onShutdown() 会自动把它从所有主题里清掉,防止内存泄漏。

整体结构比较清晰,客户端负责发请求和处理回调,服务端负责维护主题和转发主题消息。在关键地方也加了 mutex 做线程保护,确保并发安全。

客户端难点:
1.subscribe()回调注册的时机问题

为什么我们要先注册该主题的回调方法再取发送消息订阅主题呢?

因为如果先订阅主题,可能主题立马有新消息,服务端把消息推送过来 客户端onPublish找该主题对应的回调函数就找不到,消息就会丢失。

服务端的难点:

1.断开连接的清理问题:
如果某个客户端断开连接了,我们要从所有主题里把它移除,不然就会占内存还推不到消息。这个是在 onShutdown() 里做的。

2.主题和订阅者之间的双向关系管理:

服务端的难点在于“主题和订阅者”的所有增删操作都必须双向更新。我们既要知道该主题被哪些订阅者订阅了,还要知道订阅者订阅了哪些主题。

所以我在服务端维护了两张表:

一张是 _topics,通过主题名可以找到对应的 Topic 对象,里面记录了所有订阅这个主题的订阅者;

另一张是 _subscribers,通过连接对象可以找到对应的 Subscriber,它记录了这个连接订阅了哪些主题。

这两张表都是通过 key 就能快速定位,不走遍历,查找效率高。

但难点就在于——所有增删操作都必须双向更新。

如果只改一边,不但会导致资源泄漏,还有可能造成消息推送失败、状态不同步等问题。

比如:

  • 客户端订阅或取消某个主题时,我必须同时更新:

    • _topics 里该主题的订阅者集合

    • _subscribers 里该连接的订阅主题列表

  • 删除主题时,要先遍历所有订阅它的订阅者,在每个 Subscriber 里删掉这个主题,再从 _topics 把该主题移除

  • 客户端 连接断开时,也要先拿到它订阅的所有主题,在每个 Topic 中删除该订阅者,最后从 _subscribers 把对应的订阅者删除。

八.RPC 执行与协调模块

client/rpc_router  server/rpc_server client/rpc_client 

rpc_client 发起调用请求 → 由 rpc_server 接收并通过 Dispatcher 分发 → 交由 rpc_router 根据方法名路由到本地函数执行 → 执行结果原路返回给 rpc_client。

一.client/rpc_router 路由分发器

client/rpc_router 

rpc_router 是服务端本地的服务注册管理器 + 路由分发器,负责把客户端发来的调用请求路由给对应的本地业务函数并将其执行结果返回。

1.ServiceDescribe服务描述类

封装一个本地服务的全部信息,包括:

  • 方法名

  • 参数名与类型

  • 返回值类型

  • 实际处理逻辑(callback)

用于注册进路由器,供后续调用。

1.构造函数

2. const std::string & method()

  • 返回方法名称(供注册与查询用)。

3.bool paramCheck(const Json::Value& params)

  • 校验参数完整性和类型

    1. 参数字段是否都存在;

    2. 字段类型是否匹配 VType

  • 错误时打印日志并返回 false

4.bool call(const Json::Value& params, Json::Value& result)

  • 调用注册的业务处理函数(callback)。

  • 调用完后校验返回值类型(rtypeCheck())。

2.SDescribeFactory 构建服务描述对象

 

  • setMethodName():设置方法名;

  • setReturnType():设置返回值类型;

  • setParamsDesc():追加一个参数描述;

  • setCallback():设置业务处理函数;

  • build():构建一个完整的 ServiceDescribe 并返回其指针。

优点

通过DescribeFactory 工厂模式,造完后ServiceDescribe 的成员就不再修改,仅读取 天然线程安全。

如果在ServiceDescribe 设置set(),若多个线程同时调用 setXxx() 方法 会出现线程安全的问题 需要在每个set函数中加锁。

3.ServiceManager服务管理类

线程安全地管理所有已注册的服务。method_name → ServiceDescribe::ptr 进行插入、查找和移除服务。

  • insert()

    • ServiceDescribe 对象插入 _service 哈希表中(key 是方法名)。

  • select()

    • 查询某个服务是否存在;

    • 找不到则返回空指针。

  • remove()

    • 根据方法名从 _service 中移除该服务。

🔒 使用 mutex 实现线程安全,适合并发场景下的服务动态注册/注销。

4. RpcRouter 本地服务路由器

  • 注册所有本地服务;

  • 接收客户端 RPC 请求;

  • 自动调度调用对应的服务函数;

  • 返回结果(成功或失败)给客户端。

1.void onRpcRequest(...)

  • 核心调度函数:服务端接收到一个 RPC 请求后做什么?

  1. 根据方法名查找本地服务;

  2. 校验请求参数是否正确;

  3. 调用本地业务处理函数;

  4. 返回结果给客户端。

2. void registerMethod(...)

  • 注册服务到本地 _service_manager

3.void response(...)

  • 构造一个 RpcResponse 消息对象,设置 id、类型、返回码、结果字段;

  • 通过 conn 发送给客户端。

二.server/rpc_server 服务端

server/rpc_server

它是 客户端发起 RPC 调用的核心组件,负责构造远程请求、发送消息、等待响应并解析结果。

1.RegistryServer 类

这是 服务注册中心的服务端类,处理“注册 / 发现 / 上下线”请求。

1.构造函数

📌 作用:初始化注册中心服务端

  • 创建 _pd_manager:统一管理 Provider + Discoverer

  • 创建 _dispatcher:消息调度器

  • 注册回调:将 ServiceRequest 类型消息交给 _pd_manager->onServiceRequest

  • 创建 TCP Server,并绑定:

    • onMessage() → 消息处理

    • onConnShutdown() → 断连清理

📥 谁会用它?
你的主程序在启动注册中心进程时会创建这个对象。

2.void start()

📌 作用:启动 TCP 服务监听
调用 _server->start() 开始监听传入连接与消息。

2.RpcServer

对外提供 JSON-RPC 服务,负责:

  • 创建 TCP 服务监听客户端请求;

  • 将 RPC 请求分发给 RpcRouter

  • 如果启用了服务注册功能,还会将服务注册到注册中心。

1.构造函数:

1.是否启用服务注册功能如果启用,将创建一个连接注册中心的客户端。

2.绑定 Dispatcher → RpcRouter 的回调函数:

当收到 RPC 请求(MType::REQ_RPC)时,转交 RpcRouter 处理。

3.创建 TCP 服务器并注册消息分发回调:

创建服务端 socket,监听来自客户端的连接和请求。

2.registerMethod() 函数

  • 如果启用了注册中心,将当前方法注册过去;

  • 无论是否注册中心,一定要注册到本地的 RpcRouter,供后续本地调用使用。

_reg_client->registryMethod(method, address)

 远程注册

  • 通知注册中心:“我当前机器可以提供某个方法”

  • 注册中心会记录:method -> host:port

  • 用于服务发现(客户端调用时通过 DiscoveryClient 发现地址)

_router->registerMethod(service)

 本地注册

  • ServiceDescribe 注册进本地服务路由表 _methods_map

  • 当有客户端调用你时,你能路由请求到对应回调函数执行

3.start() 函数

启动 TCP 服务,正式对外提供服务。

3.TopicServer 类 

提供发布订阅服务的支持,负责:

  • 创建 TCP 服务监听客户端连接;

  • 响应 Topic 创建、订阅、发布、取消等操作;

  • 客户端断开时,进行资源清理(取消订阅等)

构造:

  • 绑定 Dispatcher → TopicManager 的回调

    
    
    • 当收到 MType::REQ_TOPIC 消息类型,交给 TopicManager 处理。

  • 创建 TCP 服务端 + 消息处理绑定

    
    
  • 连接断开回调处理(清理订阅信息)

    
    
    • onConnShutdown() 中调用 _topic_manager->onShutdown(conn) 清理对应订阅信息。

三.client/rpc_client.hpp客户端

client/rpc_client 

让用户像调用本地函数一样,方便地调用远程 RPC 服务,同时支持服务发现与主题订阅发布。

1.RegistryClient 类

将当前主机提供的 RPC 方法注册到注册中心。

1.构造函数

  • 创建 _requestor 用于发送请求;

  • 创建 _provider,负责构造并发送“服务注册消息”;

  • 创建 _dispatcher 注册消息响应回调;

  • 通过 ClientFactory::create 创建 TCP 客户端并连接注册中心。

2.服务注册

调用 _provider->registryMethod() 实际向注册中心发送 ServiceRequest

一次服务注册 + 推送通知 + 本地缓存更新全流程调用:

1.服务端启动 注册服务

  • RpcServer::registerMethod()
    👉 向本地 RpcRouter 注册方法,同时调用注册中心:

RegistryClient::registryMethod()
👉 发起服务注册请求。

  • Provider::registryMethod()
    👉 构造 ServiceRequest 消息,通过:

  • Requestor::send()
    👉 将消息发送到注册中心。

2.注册中心处理注册

  • PDManager::onServiceRequest()
    → 调用 ProviderManager::addProvider() 完成服务注册;
    → 调用 DiscovererManager::onProviderAdded() 触发上线通知。

3.客户端接收推送并更新缓存

  • Dispatcher::onMessage()
    👉 收到 SERVICE_ONLINE 推送,交给:

  • Discoverer::onServiceRequest()
    👉 根据推送内容更新本地缓存 _method_hosts[method]

服务端在启动时发起服务注册,调用客户端模块中的 RegistryClient::registryMethod(),再由客户端内部调用 server/rpc_registry 中的 Provider::registryMethod() 构造注册消息并发送给注册中心。

注册中心接收到注册请求后完成服务信息登记,并发送服务上线通知。客户端的 Dispatcher 收到上线消息后,触发已注册的回调函数 client/rpc_registry.hpp 中的 Discoverer::onServiceRequest(),进而更新本地服务地址缓存,实现服务端与客户端的自动同步。

2.DiscoveryClient 类

DiscoveryClient 是客户端用来访问注册中心的“服务发现入口”,可主动查找服务地址,也可被动接收服务上线/下线通知,并自动更新本地缓存。

1.构造函数

  • _requestor:用于消息的发送和响应接收(负责同步 send());

  • _discoverer:服务发现核心逻辑,管理 method → address 缓存;

  • _dispatcher:处理注册中心发来的各种消息,调用已绑定的函数;

  • _client:连接注册中心的 TCP 客户端。

2.服务发现

服务发现流程:

1.进入 RpcClient::call()

客户端在调用远程服务前,用户会先调用 RpcClient::call(),调用链会进入 DiscoveryClient::serviceDiscovery() 来获取目标服务的方法地址。

2.进入 DiscoveryClient::serviceDiscovery()

该函数会继续调用 client/rpc_registry.hpp 中的 Discoverer::serviceDiscovery(),本地没有对应函数,就进行服务发现,构造 ServiceRequest 消息,请求类型设置为 SERVICE_DISCOVERY,然后通过 Requestor::send() 发送消息,同时使用 std::promise 阻塞当前线程,等待服务端响应。

3.进入 PDManager::onServiceRequest()(注册中心)

注册中心在处理服务发现请求时,PDManager::onServiceRequest() 识别出 SERVICE_DISCOVERY 类型的请求,调用 DiscovererManager::addDiscoverer() 新增服务发现者,随后调用 discoverResponse()。

在 discoverResponse() 中,通过 ProviderManager::methodHosts() 获取当前可提供该方法的服务主机地址列表,并封装为 ServiceResponse 应答发送回客户端。

4.在 Requestor::onResponse() 中

Requestor::onResponse() 根据消息 ID 找到之前注册的请求上下文 RequestData,并调用其 promise.set_value(msg) 将响应结果写入,从而唤醒 Discoverer::serviceDiscovery() 中阻塞的 future.get() 调用。

5.再次回到 Discoverer::serviceDiscovery()

线程恢复执行后,继续向下处理:将响应转换为 ServiceResponse 类型,提取服务地址列表,构造 MethodHost,并将其缓存至 _method_hosts[method],最终返回一个可用的地址。

简单流程:

1.客户端在调用远程服务前,会先向注册中心发起服务发现请求。
2.如果本地没有对应的服务地址缓存,客户端会构造请求消息发送到注册中心,并阻塞等待响应。

3.注册中心收到请求后,将客户端登记为该服务的订阅者,然后查询当前有哪些主机提供该服务,并将这些地址打包后发送回客户端。

4.客户端收到响应后,解除等待,提取出服务地址列表,保存到本地缓存中。之后客户端就可以直接从缓存中获取地址,避免重复访问注册中心。

5.如果服务端有新的主机上线或下线,注册中心还会主动推送通知给客户端,客户端自动更新本地缓存,始终保持服务地址的最新状态。

3.RpcClient 类

RpcClient 是客户端发起 RPC 调用的核心类。它支持:

  • 启用/禁用服务发现(通过 enableDiscovery 参数)

  • 提供三种调用方式(同步、异步 future、异步回调);

  • 维护与服务提供者的连接池

  • 自动处理服务下线的连接清理。

1.构造函数

  • 初始化核心模块:RequestorRpcCallerDispatcher

  • 注册 Dispatcher 回调函数,处理 RPC 响应消息。

  • 启用服务发现时,创建 DiscoveryClient

  • 否则直接连接指定服务端,保存为 _rpc_client

2.call() 方法(3 种)

  • 获取连接:通过 getClient(method) 找到或创建连接;

  • 发送 RPC 请求,通过 _caller->call()

  • 不同重载方式对应同步等待、std::future 或回调处理。

3.newClient(const Address &host)

  • 创建新的 BaseClient 连接;

  • 设置消息分发回调;

  • 连接远程主机;

  • 放入 _rpc_clients 连接池。

4.getClient(const std::string &method) 方法

根据是否启用服务发现进行分流:

  1. 启用服务发现:

    • 通过 _discovery_client->serviceDiscovery() 请求注册中心返回服务提供者地址;

    • 尝试从 _rpc_clients 池中取连接;

    • 如果不存在就调用 newClient() 创建;

  2. 未启用服务发现:

    • 直接使用 _rpc_client(单一固定连接)。

难点:
1.异步响应管理(请求发出去后怎么收回来)

异步请求发出后,响应是延迟回来的,要确保:发出请求后,保证最后能收到结果并处理它

1.使用 Requestor 管理一个 rid → promise/callback 的映射;

(rpc_call中RpcCall::send()设置好rid并调用 Requestor::send()调用newDescribe保存请求并send发送消息)

2.在收到响应时,通过 Requestor::onResponse() 根据 rid 找到对应回调;

3.并通过 promise.set_value() 唤醒 future,或直接调用 callback;

核心:requestor管理的每个请求描述对象都有唯一 rid,通过 map<rid, callback> 实现精准回调;

2.为什么构建 RPC 客户端时,我们采用了长连接而非短连接?

主要还是短连接异步处理时管理麻烦,处理不好收不到响应。

短链接,客户端进行完rpc调用就会关闭,后面服务提供者返回结果给客户端,客户端没了收到收到应答的回调函数onResponse   也就不能把结果设置道promise中,上层futrue就不能就绪。

短连接机制(未采用):

  • 每次调用新建一个 rpc client,调用结束即关闭。

  • 优点:

    • 实现简单,按需连接按需释放;

    • 没有资源长期占用问题。

  • 缺点:

    • 每次调用都建立销毁 TCP,性能差

    • 无法支撑高频 RPC 场景;

    • 异步请求不安全:调用结束连接关闭,响应无法回传,导致回调/future卡死。


 长连接机制(当前采用):

  • 调用后不关闭连接,而是将客户端连接放入连接池中复用。

  • 优点:

    • 高性能:连接复用,无需频繁建立关闭;

    • 适用于高并发、低延迟的 RPC 场景;

    • 支持异步调用:连接存在,响应回调稳定可靠。

  • 缺点:

    • 连接池需要维护,如服务下线要自动剔除;

    • 涉及线程安全、失效策略等额外逻辑。


❗ 为什么不使用短连接?

异步调用时短连接存在以下致命问题:

  • 响应收不到:请求发出连接关闭,服务端响应回来找不到 client,对应的 onResponse 回调无法执行;

  • future 无法就绪promise.set_value() 无法触发,业务逻辑卡死;

  • 回调悬空导致崩溃:连接关闭后资源被释放,回调中引用了被销毁对象;

  • 响应丢失难排查:调用失败没有报错,连接断开无日志,bug 难以定位。

长链接流程:

  • 客户端调用 call() 发起调用;

  • 内部通过 DiscoveryClient 服务发现获取提供者地址;

  • 若该地址已在连接池,复用;否则新建连接并添加到连接池;

  • 调用远程方法,服务端响应处理;

  • 客户端通过回调或 future 获取响应;

  • 若服务下线,注册中心推送通知,自动从连接池删除对应连接。

长连接详细流程:


RpcClient::call(...)

客户端发起远程调用请求(同步 / 异步 / 回调均从此入口)。


RpcClient::getClient(const std::string& method)

  • 如果启用服务发现(_enableDiscovery == true):

    • 调用:
      DiscoveryClient::serviceDiscovery(method, host)
      ⬇ 获取该方法对应的服务提供者地址(host)

  • 查找连接池是否已有该地址的连接:
    RpcClient::getClient(const Address& host)

  • 若不存在,则创建新连接:
    RpcClient::newClient(host)

    • 内部调用 ClientFactory::create(ip, port)

    • 调用 BaseClient::connect() 建立连接

    • 注册消息回调,调用 RpcClient::putClient(host, client) 将其添加进 _rpc_clients 连接池。


RpcCaller::call(conn, method, params, ...)

通过连接发送 RPC 请求,底层使用 _requestor->send(),并注册 rid → promise 或 callback 映射。


④ 服务端处理后返回响应

响应到达后,客户端 Dispatcher::onMessage() 分发至:
Requestor::onResponse(conn, msg)
➡ 根据 msg->id() 找到原始请求,设置 promise.set_value() 或调用回调。


⑤ 注册中心推送服务下线通知

当服务提供者下线时:

  • 服务端推送 ServiceRequest 消息,类型为 SERVICE_OFFLINE

  • 客户端 Dispatcher 分发至:
    Discoverer::onServiceRequest(conn, msg)

  • 检测到是下线通知后:

    • _method_hosts[method] 中移除该 host;

    • 调用注册时绑定的回调函数:
      RpcClient::delClient(const Address& host)

  • 在回调中:
    _rpc_clients.erase(host)
    从连接池中删除该地址对应的 BaseClient 对象

4.TopicClient 类

TopicClient 是客户端用于执行发布订阅操作的接口封装类,支持主题的创建、订阅、取消订阅、消息发布和连接管理等功能,负责与发布订阅服务端进行通信并处理主题消息的发送与接收。

1.构造函数

rsp_cb注册响应回调(用于同步请求响应)

msg_cb注册推送回调(用于订阅收到消息)

创建 TCP 客户端;

设置所有消息的统一入口为 Dispatcher::onMessage;

启动连接。

  • Requestor:用于发送请求、接收响应(支持 promise / future 模式);

  • Dispatcher:分发接收到的消息;

  • TopicManager:封装了所有主题相关的客户端操作逻辑。

2.主题创建/删除 订阅主题/取消订阅  发布主题消息 关闭连接

函数名 作用 请求类型 内部处理逻辑简介
create() 创建主题 TOPIC_CREATE 构造请求,发送给服务端,等待创建结果响应
remove() 删除主题 TOPIC_REMOVE 构造请求,发送给服务端,等待删除结果响应
subscribe() 订阅主题并绑定回调 TOPIC_SUBSCRIBE 记录回调函数,构造请求,服务端推送后触发回调
cancel() 取消主题订阅 TOPIC_CANCEL 移除本地回调关系,通知服务端取消订阅
publish() 向主题发布消息 TOPIC_PUBLISH 构造请求,将消息发送到服务端
shutdown() 关闭连接 调用 _rpc_client->shutdown() 释放连接资源

主题创建流程:

1. 客户端发起请求

进入 client::TopicManager::create()

2. 客户端构造请求并发送

此时客户端:

  • 构造了类型为 TOPIC_CREATE 的请求;

  • 使用 Requestor::send() 向服务端发送;

  • 阻塞等待服务端返回应答(future 模式)。

3. 服务端 Dispatcher 分发请求

服务端接收到客户端发来的 REQ_TOPIC 消息,Dispatcher 触发:

TopicManager::onTopicRequest(conn, msg)

4. 服务端执行主题创建

  • 服务端创建了一个 Topic 对象;

  • 加入 _topics 容器;

  • 支持多个客户端共享该主题。

5. 服务端返回响应

6. 客户端接收响应并判断结果

服务端响应返回后,客户端 Dispatcher 分发 

Dispatcher::onMessage() → Requestor::onResponse()

Requestor::onResponse() 中:

  • 根据请求 ID 找到原始请求的 promise

  • 调用 set_value(msg),唤醒等待的 future.get()

返回 commonRequest() 继续执行:

主题创建

客户端发起主题创建请求后,由 TopicClient::create() 调用 client::TopicManager::create(),在其中构造 TopicRequest 请求并通过 Requestor::send() 发送给服务端,消息类型为 MType::REQ_TOPIC,操作类型为 TOPIC_CREATE。

服务端接收到请求后,由 Dispatcher 分发至 server::TopicManager::onTopicRequest()。该函数识别请求类型为 TOPIC_CREATE,随后调用 topicCreate() 创建对应的主题对象,并将其加入 _topics 管理容器中。

主题创建完成后,服务端调用 topicResponse() 构造 TopicResponse 应答消息,设置响应类型为 MType::RSP_TOPIC,返回码为 RCODE_OK,通过连接发送给客户端。

客户端收到应答后,Dispatcher 将响应交由 Requestor::onResponse() 处理,唤醒之前阻塞等待的 future。随后客户端从响应中确认操作成功,完成整个主题创建流程。

主题发布:

客户端希望向某个主题发布一条消息时,会调用 TopicClient::publish() 接口,传入主题名称和消息内容。该接口会进一步调用 client::TopicManager::publish(),构造一个 TopicRequest 请求,类型为 TOPIC_PUBLISH,填入主题名与消息内容。

构造完成后,客户端通过 Requestor::send() 向服务端同步发送该发布请求,并阻塞等待服务端的确认应答。

服务端接收到该发布请求后,由 Dispatcher 将其分发至 server::TopicManager::onTopicRequest(),识别请求类型为 TOPIC_PUBLISH,随后执行消息推送逻辑。

服务端首先根据请求中的主题名称在 _topics 映射中查找对应的 Topic 对象,如果未找到则返回错误响应。如果找到了对应主题,服务端将该消息对象广播给所有订阅了该主题的客户端订阅者。广播操作通过调用该 Topic 的 pushMessage() 方法实现,该方法会遍历订阅者列表,逐一调用它们对应连接的 conn->send(),将这条主题消息主动推送到客户端。

推送完成后,服务端构造 TopicResponse 应答消息,标记发布成功,并通过连接返回给发布者客户端。

客户端接收到响应后,由 Dispatcher 分发至 Requestor::onResponse(),唤醒等待流程,确认服务端已成功接收并处理发布请求,流程结束。

项目总结

模块编号 模块名称 主要文件 功能说明
公共抽象模块 abstract.hpp, fields.hpp, detail.hpp 定义接口、字段常量、基础结构体等
消息协议模块 message.hpp 定义 Json-RPC 消息格式(请求/响应)
网络通信模块 net.hpp 封装 TCP 通信接口、连接、缓冲区
Dispatcher 分发模块 dispatcher.hpp 将收到的消息派发到对应回调处理器
RPC 调用工具模块 requestor.hpp, rpc_caller.hpp 管理请求-响应关系,支持异步处理
注册与发现模块 rpc_registry.hpp(Client/Server) 服务注册、发现、上线/下线通知
发布订阅模块 rpc_topic.hpp(Client/Server) 支持创建主题、订阅/取消订阅、推送消息
RPC 执行主控模块 rpc_client.hpp, rpc_server.hpp, rpc_router.hpp 核心入口:封装客户端/服务端启动逻辑

核心功能与机制

🔷 服务注册与发现(注册中心 RegistryServer)

  • 服务提供者启动时向注册中心注册服务名与地址;

  • 客户端调用前,向注册中心查询服务提供者地址;

  • 注册中心支持服务上线/下线通知(推送机制);

  • 客户端维护 method → host 的本地缓存;

  • 实现 RpcClient 连接池,服务下线时可自动移除失效连接。


🔷 RPC 调用支持同步与异步(支持未来与回调)

  • 每个请求分配全局唯一 rid

  • 使用 Requestor 管理 rid → promise/callback

  • 支持同步(阻塞)与异步(future/callback)多种调用方式;

  • 响应回来由 Dispatcher 派发给 Requestor::onResponse()

  • 可复用连接,避免频繁建立 TCP。


🔷 发布订阅机制(Topic)

  • 支持主题创建、删除、订阅、取消订阅;

  • 服务端 TopicManager 维护 TopicSubscriber 映射;

  • 客户端收到服务端主动推送消息后,执行本地回调;

  • 支持断开连接时清理订阅者信息。

难点:

难点 问题简述 解决方式 涉及类
类型安全回调派发 消息是基类指针,需安全转为具体类型再调用回调 模板注册回调 + dynamic_pointer_cast Dispatcher
异步响应生命周期管理 回调对象提前释放,future 无法就绪 shared_ptr 捕获 RequestData 延长生命周期 Requestor
异步响应匹配 响应找不到对应请求,callback 无法触发 使用 rid → 请求描述对象 映射精准匹配响应 Requestor
为什么选用长连接 短连接断开后响应收不到,future 卡死、回调失效 长连接 + 连接池复用 + 服务下线自动清理连接 RpcClient

网站公告

今日签到

点亮在社区的每一天
去签到