C++ 搭建一个双向多线程的GRPC通信服务框架

发布于:2025-02-10 ⋅ 阅读:(42) ⋅ 点赞:(0)

功能点

  • 双向通信:即指程序既有客服端又有服务端,以处理复杂的需求
  • 客户端信息线程处理:程序客户端发出某个请求后,应开辟其他线程处理,防止等待消息的时候主程序卡死
  • 心跳机制:若是下位的客服端一直向服务端推送消息(没有就单独设置一个心跳函数),那么可以监听这个函数,若是多少秒内没有收到消息即为断链
  • protobufgrpccmakelist编译与链接

服务端

  • 在继承grpc服务的同时,我们也继承qt的qobject这样可以使用信号与槽的机制让其他类的到信息
  • 若是不是使用qt用不了信号机制,那么可以使用观察与订阅的设计模式去监听信息变化
class GrpcServer final : public QObject, public Scheduling::HMISend::Service
{
    Q_OBJECT
public:
    GrpcServer();
    /// 系统信息
    grpc::Status SystemStatus(grpc::ServerContext *context, const SystemStatusReq *request,
                              SystemStatusResp *response);
signals:
    void systemStatusReceived(const QString &str);
};
grpc::Status GrpcServer::SystemStatus(grpc::ServerContext *context, const SystemStatusReq *request, SystemStatusResp *response)
{
    QString json_info = QString::fromStdString(request->json_info());
    emit systemStatusReceived(json_info);
    return Status::OK;
}


客户端

  • 客户端比较简单,实例一个类,定义好你在proto中的函数并调用对应的函数就行,同时客服端也是可以处理服务端返回的消息的
    std::pair<bool, std::string> GrpcClient::ScheduleTask(int32_t task, const std::string &cardata)
    {
        SchedulingRequest request;
        request.set_task(task);
        request.set_carbon_data(cardata);
    
        SchedulingResponse response;
        ClientContext context;
    
        Status status = stub_->Scheduling(&context, request, &response);
    
        std::pair<bool, std::string> result;
        if (status.ok())
        {
            if (response.abnormal() == "abnormal")
            {
                result.first = false;
            }
            else if (response.abnormal() == "normal")
            {
                result.first = true;
            }
            result.second = response.json_info();
        }
        else
        {
            result.first = false;
            result.second = "";
        }
        return result;
    }
    

服务端线程

  • 可继承QThread,将服务端加入其中,一直监听下位机的信息。注意程序终止后对线程的释放,和当grpc为正常链接时的异常处理
    ServerThread::ServerThread(QSharedPointer<GrpcServer> service, QObject *parent)
        : QThread(parent), m_service(service)
    {
    }
    
    void ServerThread::run()
    {
        std::string server_address("0.0.0.0:50055");
        ServerBuilder builder;
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(m_service.data());
        std::unique_ptr<Server> server;
        try
        {
            server = builder.BuildAndStart();
            if (!server)
            {
                qWarning() << "GRPC服务未启动!请检查IP或者端口是否正确!";
                return;
            }
        }
        catch (const std::exception &e)
        {
            qWarning() << "异常: " << e.what();
            return;
        }
        exec();
        if (server)
        {
            server->Shutdown();
            server->Wait();
        }
    }
    
    ServerThread::~ServerThread()
    {
        if (isRunning())
        {
            quit();
            wait();
        }
    }
    
    

客户端线程

  • 将客户端将其加入其中进行管理,每当有需求,比如点击一个按钮时,就创建一个线程去处理这个需求,再利用信号机制,来及时处理返回信息
    void GrpcTaskHandler::executeTask(int32_t task, const std::string &cardata)
    {
        QThread *thread = QThread::create([=]()
                                          {
            auto result = m_client->ScheduleTask(task,cardata);
            emit taskFinished(task, result); });
        thread->start();
        connect(thread, &QThread::finished, thread, &QThread::deleteLater);
    }
    

心跳机制

  • 创建一个计时器,定义一个时间,每次服务端调用某个函数时更新这个时间,达到心跳检测机制
    void Widget::initUI()
    {
            // 心跳检测
            QTimer *timer2 = new QTimer(this);
            connect(timer2, &QTimer::timeout, this, &Widget::checkPlcStatusDataCallback);
            timer2->start(2000);
            // 初始化最后一次调用时间
            lastCallbackTime = QDateTime::currentDateTime();
    }
    
    void Widget::checkPlcStatusDataCallback()
    {
            QDateTime currentTime = QDateTime::currentDateTime();
            int elapsed = lastCallbackTime.msecsTo(currentTime);
            // 检查是否在过去的一段时间内回调函数被调用
            bool g = elapsed > 1800; // 不能为2000的原因是计时器有点误差存在
            if (!g)
            {
                    ///
            }
            else
            {
                    ///
            }
    }
    

服务创建

  • 建议客户端服务端都使用智能指针管理,客服端可加入一个保活。

    void Widget::initServerClint()
    {
            auto portIP = "127.0.0.1:50052";
            grpc::ChannelArguments channel_args;
            channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);          // 10秒,发起 KeepAlive ping 的间隔
            channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);        // 5秒,等待 KeepAlive 响应的超时时间
            channel_args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); // 即使没有活跃的 RPC,也允许发送 KeepAlive ping
            auto channel = grpc::CreateCustomChannel(portIP, grpc::InsecureChannelCredentials(), channel_args);
            m_grpcTaskHandler = new GrpcTaskHandler(std::shared_ptr<GrpcClient>(new GrpcClient(channel)), this);
            m_services = QSharedPointer<GrpcServer>(new GrpcServer());
            m_thread = QSharedPointer<ServerThread>(new ServerThread(m_services, this));
    
            // 连接信号和槽
            connect(m_services.data(), &GrpcServer::systemStatusReceived,
                    [=]()
                    { lastCallbackTime = QDateTime::currentDateTime(); });
            connect(m_services.data(), &GrpcServer::logInfoReceived,
                    [=]() {});
            connect(m_services.data(), &GrpcServer::logInfoReceived,
                    [=]() {});
            connect(m_services.data(), &GrpcServer::autoCalibReceived,
                    [=]() {});
            connect(m_grpcTaskHandler, &GrpcTaskHandler::taskFinished, this, &Widget::handleToolMaintenanceResult);
            // 启动服务
            m_thread->start();
    }
    
    

    总结

    • 知识理应共享,源码在此
    • 这个proto文件的cmakelist编译还是挺难写的需要用到官方的grpc.cmake
      然后在cmakelist中还需要在实现一个function函数,有兴趣的可以自己琢磨
    • 上述通信服务的框架应该是比较完善的了,够应付一般的场景了。
    • protogrpc的安装,可以看我的这篇文章.

网站公告

今日签到

点亮在社区的每一天
去签到