2505C++,py和go调用雅兰亭库的协程工具

发布于:2025-05-17 ⋅ 阅读:(22) ⋅ 点赞:(0)

原文

神算调用C++

一般调用pybind11封装的C++库实现神算调用C++库,pybind11封装c++的接口很简单.

创建一个py_example.cpp文件

#include <pybind11/pybind11.h>
#include <string>
namespace py = pybind11;
PYBIND11_MODULE(py_example, m) {
  m.def("hello", [] {
    return std::string("hello");
  });
}

通过cmake去编译成动态库:

pybind11_add_module(py_example py_example/py_example.cpp)

神算调用pybind11封装的你好函数:

import py_example
print(py_example.hello())

通过pybind11,几行代码就可实现神算调用c++了.
如果要同步调用coro_http_client还是很简单的,封装一个函数就完了.
但是如果要异步调用,事情就不是则简单了,coro_httpcoro_rpc都是协程异步的,如何把C++``协程神算协程结合起来却不容易,当前通过回调函数神算asynio未来来实现的.

每次调用C++协程函数之前创建一个未来和设置一个回调函数,当协程返回时设置未来,神算侧则等待未来.
神算调用coro_http_client为例:

#include <pybind11/pybind11.h>
#include <string>
#include <ylt/coro_http/coro_http_client.hpp>
#include <ylt/coro_io/client_pool.hpp>
namespace py = pybind11;
class caller {
 public:
  caller(py::function callback) : callback_(std::move(callback)) {}
  void async_get(std::string url) {
    static auto pool =
        coro_io::client_pool<coro_http::coro_http_client>::create(url);
    pool->send_request([this, url](coro_http::coro_http_client &client) -> async_simple::coro::Lazy<void> {
          auto result = co_await client.async_get(url);
          py::gil_scoped_acquire acquire;
          callback_(result.resp_body);
        })
        .start([](auto &&) {
        });
  }
 private:
  py::function callback_;
};

PYBIND11_MODULE(py_example, m) {
  py::class_<caller>(m, "caller")
      .def(py::init<py::function>())
      .def("async_get", &caller::async_get,
           py::call_guard<py::gil_scoped_release>());
}

pybind11不支持封装一个协程函数,因此需要通过一个普通函数去调用协程,即使用回调方式调用协程,在协程回调函数中设置神算未来.
通过一个调用者类来实现调用C++``协程.

创建调用者类时传入一个神算回调函数,在async_get方法中通过回调方式调用client_pool中的send_request``协程,并在其回调函数co_await客户的请求,请求结束后设置回调函数.

注意的是神算GIL锁,调用回调函数时需要加锁.

为何需要一个调用者对象来保存回调函数,而不是按函数的参数传入呢回调函数?试过,但因为回调函数的生命期的问题,不好解决,在内部的λ移动或拷贝它,总会出现pybind11对象释放的错误.
调用者里放置,由神算去管理调用者回调生命期就没问题了.

神算侧,通过协程去调用coro_http_client有点麻烦了,需要创建一个未来并设置调用者回调函数,并在回调函数中设置未来的值:

import asyncio
import py_example
async def async_get(url):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    def cpp_callback(message):
        def set_result():
            future.set_result(message)
        loop.call_soon_threadsafe(set_result)
    caller = py_example.caller(cpp_callback)
    caller.async_get(url)
    result = await future
    print(result)

async def main():
    await async_get("http://taobao.com")

if __name__ == "__main__":
    asyncio.run(main())

虽然有点复杂,但工作了,如果大家有更好的神算``协程C++20``协程结合的方法也请留言告诉我.

go调用C++

相比较而言,因为go协程用起来比较方便,而且go更期望调用的是同步接口,因为go总可在go协程中执行同步方法,因此用go来封装coro_rpc``简单一些.

先按c接口封装coro_rpc,然后通过cgo去调用这些c接口,等待异步函数结束时,放进go协程中即可.

封装coro_http_server的c接口

创建和结束coro_http_server

extern void *start_rpc_server(char *addr, int parallel);
extern void stop_rpc_server(void *server);

注册rpc函数和回调

coro_rpc_server需要注册一个,需要根据用户自己的需求来实现的rpc函数,来提供rpc服务,比如有一个加载rpc服务:

inline void ylt_load_service(coro_rpc::context<void> context, uint64_t req_id) {
    //做`CPP`业务
}

在创建rpcserver后,注册该rpc函数:

void *start_rpc_server(char *addr, int parallel) {
  auto server = std::make_unique<coro_rpc::coro_rpc_server>( parallel, addr, std::chrono::seconds(600));
  server->register_handler<ylt_load_service>();
    //注册`rpc`函数
  auto res = server->async_start();
  if (res.hasResult()) {
    ELOG_ERROR << "start server failed";
    return nullptr;
  }
  return server.release();
}

这里注意,该rpc函数需要转到go那里去执行go业务逻辑,而不是在c++这一侧去写业务逻辑,因此需要在go中回调.

extern void load_service(void *ctx, uint64_t req_id);
inline void ylt_load_service(coro_rpc::context<void> context, uint64_t req_id) {
  auto ctx_ptr = std::make_unique<coro_rpc::context<void>>(std::move(context));
  load_service(ctx_ptr.release(), req_id);
}

注意该只有声明的load_service,就是通过它回调到go的,具体实现go那里,所以这里只有个声明,cgo支持这样.

另外一个细节是,为何使用coro_rpc::context呢?因为是rpc函数是回调到go那里处理的,go那里何时处理完业务,c++这边是不知道的,所以需要该环境go那里处理完业务逻辑之后发送rpc响应.

go实现的load_service:

    //导出`load_service`
func load_service(ctx unsafe.Pointer, req_id C.uint64_t) {
    //fmt.Println("load req_id", req_id);
    //go代码
  if(req_id == 2) {
    var err_msg = C.CString("error from server")
    C.response_error(ctx, 1001, err_msg)
    C.free(unsafe.Pointer(err_msg))
    return;
  }
    //响应`RPC`结果
  var promise = C.response_msg(ctx, ((*C.char)(unsafe.Pointer(&g_resp_buf[0]))), C.uint64_t(len(g_resp_buf)));
  go func(p unsafe.Pointer, buf []byte) {
    result := C.wait_response_finish(p);
    if(result.ec > 0) {
      fmt.Println(result.ec, C.GoString(result.err_msg))
      C.free(unsafe.Pointer(result.err_msg))
    }
    //现在释放`缓冲`
  }(promise, g_resp_buf)
}

因为response_msg异步发送消息的,go这边需要稍微等待一下,等发送完成之后再清理发送数据等资源.这里是通过一个go协程来等待发送完成.

封装coro_http_client的c接口

extern void *create_client_pool(char *addr, int req_timeout_sec);
extern void free_client_pool(void *pool);
extern rpc_result load(void *pool, uint64_t req_id, char *dest, uint64_t dest_len);

客户端的封装简单一些,创建clientpool,释放clientpool,并调用rpcload的接口.看一下调用的实现:

rpc_result load(void *pool, uint64_t req_id, char *dest, uint64_t dest_len) {
  using namespace async_simple::coro;
  rpc_result ret{};
  auto ld = (coro_io::load_balancer<coro_rpc::coro_rpc_client> *)pool;
  auto lazy = [&]() -> Lazy<void> {
    auto result =
        co_await ld->send_request([&](coro_rpc::coro_rpc_client &client, std::string_view hostname) -> Lazy<void> {
          client.set_resp_attachment_buf(std::span<char>(dest, dest_len));
          auto result = co_await client.call<ylt_load_service>(req_id);
          if (!result) {
            set_rpc_result(ret, result.error());
            co_return;
          }
          if (!client.is_resp_attachment_in_external_buf()) {
            set_rpc_result(
                ret, coro_rpc::rpc_error(coro_rpc::errc::message_too_large));
            co_return;
          }
          ret.len = client.get_resp_attachment().size();
        });
  };
  syncAwait(lazy());
  return ret;
}

通过clientpool发送请求,内部同步等待rpc调用结束,go那里则可在一个go协程中放置该加载调用.

这里展示一个如何实现rpc请求零拷贝的场景:

client.set_resp_attachment_buf(std::span<char>(dest, dest_len));

设置附件是为了零拷贝,附件的内存可能是来自于go那里的一块很大的内存,并在附件中放置,coro_rpc_client发送请求时不会去序化它的,而是直接发送到服务器.

go那里调用加载比较简单了:

func test_client(host string, len int) {
  peer := C.CString(host)
  pool := C.create_client_pool(peer, C.int(30))
  outbuf := make([]byte, len)
  for i := 0; i < 3; i++ {
    req_id := uint64(i)
    result := C.load(pool, C.uint64_t(req_id), ((*C.char)(unsafe.Pointer(&outbuf[0]))), C.uint64_t(len))
    if(result.ec > 0) {
      fmt.Println("error: ", result.ec, C.GoString(result.err_msg))
      C.free(unsafe.Pointer(result.err_msg))
    }else {
      fmt.Println("result: ", string(outbuf[0:result.len]), "len", result.len)
    }
  }
  C.free_client_pool(pool)
  C.free(unsafe.Pointer(peer))
}
test_client("0.0.0.0:8806", 1024)

网站公告

今日签到

点亮在社区的每一天
去签到