基于libhv实现的TCP Client & Server支持同步,异步传输 (C++11)

发布于:2025-07-28 ⋅ 阅读:(16) ⋅ 点赞:(0)

基于libhv开源库实现的TCP Client & Server sample, TCP Client支持同步和异步数据传输, 同步使用C++11的future特性实现.

libhv介绍

Like libevent, libev, and libuv, libhv provides event-loop with non-blocking IO and timer, but simpler api and richer protocols.

✨ Features

  • Cross-platform (Linux, Windows, macOS, Android, iOS, BSD, Solaris)
  • High-performance EventLoop (IO, timer, idle, custom, signal)
  • TCP/UDP client/server/proxy
  • TCP supports heartbeat, reconnect, upstream, MultiThread-safe write and close, etc.
  • Built-in common unpacking modes (FixedLength, Delimiter, LengthField)
  • RUDP support: WITH_KCP
  • SSL/TLS support: (via WITH_OPENSSL or WITH_GNUTLS or WITH_MBEDTLS)
  • HTTP client/server (support https http1/x http2 grpc)
  • HTTP supports static service, indexof service, forward/reverse proxy service, sync/async API handler
  • HTTP supports RESTful, router, middleware, keep-alive, chunked, SSE, etc.
  • WebSocket client/server
  • MQTT client

异步数据传输实现机制

RpcMessage HTcpClient::SendSyncMessage(RpcMessage& rpc_msg,
                                       const int64_t timeout) {
  if (connect_state_ != kConnected) {
    LOG(ERROR) << "HTcpClient::SendMessage() - not connected!";
    return RpcMessage();
  }
  std::shared_ptr<MessageFuture> message_future =
      std::make_shared<MessageFuture>();
  message_future->timeout_ = timeout;
  message_future->request_ = rpc_msg;
  rpc_msg.request_id_ = counter_.GetAndIncrement();
  {
    std::lock_guard<std::mutex> lock(mutex_);
    map_.insert(std::make_pair(rpc_msg.request_id_, message_future));
  }
  SendMessage(rpc_msg);
  try {
    return message_future->Get(timeout);
  } catch (TimeoutException& e) {
    LOG(ERROR) << "HTcpClient::SendSyncMessage() - timeout exception, timeout: "
               << timeout;
    // remove the request from the map
    std::lock_guard<std::mutex> lock(mutex_);
    size_t erased_count = map_.erase(rpc_msg.request_id_);
    LOG(INFO) << "HTcpClient::SendSyncMessage() - erased_count: "
              << erased_count;
    return RpcMessage();
  }
}

HTcpClient 类的成员函数 SendSyncMessage,用于同步发送 RPC 消息并等待响应。
首先,函数检查客户端的连接状态 connect_state_ 是否为已连接状态。如果未连接,则记录错误日志并返回一个空的 RpcMessage 对象。
然后,创建一个 MessageFuture 对象,并设置其超时时间和请求消息。接着,使用 counter_.GetAndIncrement() 为请求消息生成一个唯一的请求 ID。
在一个受互斥锁 mutex_ 保护的代码块中,将请求 ID 和 MessageFuture 对象插入到映射 map_ 中。
然后,调用 SendMessage 方法发送请求消息。
在尝试获取响应时,调用 message_future->Get(timeout) 方法。如果在指定的超时时间内未收到响应,则捕获 TimeoutException 并记录错误日志。然后,再次锁住互斥量并从映射中移除对应的请求 ID,记录被移除的条目数量。
最终,函数返回一个空的 RpcMessage 对象,表示请求失败或超时。

MessageFuture Get实现

RpcMessage MessageFuture::Get(int64_t timeout) {
  std::future<RpcMessage> future = promise_.get_future();
  std::future_status status =
      future.wait_for(std::chrono::milliseconds(timeout));
  if (status == std::future_status::ready) {
    return future.get();
  } else if (status == std::future_status::timeout) {
    // throw timeout exception
    throw TimeoutException("timeout exception");
  }
}

MessageFuture 类的成员函数 Get,用于在指定的超时时间内获取 RpcMessage 对象。
首先,函数通过 promise_.get_future() 获取一个 std::future 对象 future。std::promise 和 std::future 是 C++ 标准库中的同步机制,用于在线程间传递结果。promise_ 是一个 std::promise 对象,它的 get_future 方法返回一个与之关联的 std::future 对象。
接下来,函数调用 future.wait_for(std::chrono::milliseconds(timeout)),等待指定的超时时间(以毫秒为单位)。wait_for 方法返回一个 std::future_status 枚举值,表示 future 的状态。可能的状态包括 std::future_status::ready(表示结果已准备好)和 std::future_status::timeout(表示等待超时)。
如果 status 等于 std::future_status::ready,则调用 future.get() 获取 RpcMessage 对象并返回。get 方法会阻塞当前线程,直到结果可用,并返回存储在 future 中的值。
如果 status 等于 std::future_status::timeout,则抛出一个 TimeoutException 异常,表示等待超时。异常消息为 “timeout exception”。
通过这种方式,Get 函数能够在指定的超时时间内等待并获取 RpcMessage 对象,如果超时则抛出异常。

使用事项

static void onConnection(const TcpClient::TSocketChannelPtr& channel) {
  if (channel->isConnected()) {
    LOG(INFO) << "connected";
    RpcMessage msg;
    msg.command_ = 100;
    msg.request_id_ = counter.GetAndIncrement();
    msg.payload_ = "hello";
    msg.length_ = msg.payload_.size();
    // client.SendMessage(msg);
    std::thread([channel, &msg] {
      LOG(INFO) << "SendSyncMessage Start";
      RpcMessage ret = client.SendSyncMessage(msg, 10000);
      LOG(INFO) << "SendSyncMessage END" << " request_id: " << ret.request_id_
                << " payload: " << ret.payload_;

    }).detach();
  } else {
    LOG(INFO) << "disconnected";
  }
}

同步传输

为什么放在另外的线程中处理?

TCPClient自身会有EventLoopThread, 是在此线程中运行, 而SendAsyncMessage会阻塞此线程, 因此具体数据的处理都放在其它线程中处理; 类似于Linux驱动开发中的中断处理, 分为上半部和下半部.

Reference

基于libhv实现的TCP client和TCP server,支持同步和异步, 帮我点点Star

TCP Client & Server Sample


网站公告

今日签到

点亮在社区的每一天
去签到