2507C++,C++协程的发送者

发布于:2025-07-26 ⋅ 阅读:(12) ⋅ 点赞:(0)

原文

为什么C++协程时需要发送者?
这一切都太复杂了!

所以把它具体化.这里展示如何把一个生硬的旧C风格的异步API引入发送者的世界,及你可能想要这样的原因.

C(经典)异步API

过去,我做了很多Win32编程.Win32多种异步模型待选,但最简单的是好的回调.异步IOAPI(如ReadFileEx)的形状如下:

    //带回调的旧式异步`CAPI/`(如`Win32`的`ReadFileEx)`
struct overlapped {
    //...操作系统`内部结构`在此...
};
using overlapped_callback =
  void(int status, int bytes, overlapped* user);
int read_file(FILE*, char* buffer, int bytes,
              overlapped* user, overlapped_callback* cb);

协议非常简单:传入一般的参数加上两个额外的参数来调用read_file:"覆盖"结构的指针和回调.操作系统将使用覆盖的结构,但用户也可在其中填充数据,这样回调可稍后使用它.像这样:

struct my_overlapped : overlapped {
    //...额外`数据`在此......
};
void my_callback(int status, int bytes, overlapped* data) {
  auto* my_data = static_cast<my_overlapped*>(data);
    //...使用放入`"my_overlapped"`对象中的额外内容......
  delete my_data;
    //清理
}
void enqueue_read(FILE* pfile) {
    //分配和初化`my_data...`
  auto* my_data =
    new my_overlapped{{}, /*`数据`在此*/ };
  int status =
    read_file(pfile, buff, bytes, my_data, my_callback);
    //...
}

原理:read_file导致操作系统排队IO操作,并保存覆盖(overlapped)overlapped_callback指针.IO完成后,OS传入覆盖结构指针并调用回调.

我已写过数百次此代码了.你可能也有.

很简单.它有效.为什么要让它更复杂,对吧?

C(混乱)异步API

回调API没有任何问题.问题是,每个公开异步的库都使用略有不同的回调API.如果想链接两个不同库的两个异步操作,需要编写一堆胶水代码来按另一个异步抽象映射该异步抽象.
这是巴别塔问题.

解决方法是让C++标准认可一个异步抽象.然后,所有公开异步的库都可按标准抽象映射它们的抽象,这样都可相互通信.

这就是C++标准化委员会对它感兴趣的原因.实际上,只能通过标准来解决的问题.

C(组合)异步API

因此引入发送者.它们很高效,结构化,且是组合的!,这里展示一些代码,让代码说话.
如果查看上面的read_file,API,可识别出一些不同部分:
1,分配异步操作期望的任何资源.
2,操作时,数据必须在稳定的地址(即覆盖结构),
3,初化将异步IO排入队列的异步操作,及处理异常,
4,异步操作完成后执行用户提供的延续(即回调).
5,回收第1步中分配的任何资源.

发送者也有所有这些部件,但形状统一,因此可通用地使用它们.事实上,发送者和C风格API之间唯一区别是,发送者中不是一个回调,而是三个:成功,失败和取消各一个.

第1步:分配

重新构想的read_fileAPI将如下:

read_file_sender
  async_read_file(FILE* file, char* buffer, int size)
  {
    return {file, buffer, size};
  }

工作是将参数放入发送器形状对象中,该对象如下:

namespace stdex = std::execution;
struct read_file_sender
{
  using sender_concept = stdex::sender_t;
    //`(1)`
  using completion_signatures =
    //`(2)`
    stdex::completion_signatures<
      stdex::set_value_t( int, char* ),
      stdex::set_error_t( int ) >;
  auto connect( stdex::receiver auto rcvr )
    //`(3)`
  {
    return read_file_operation{{}, {}, pfile, buffer, size, std::move(rcvr)};
  }
  FILE* pfile;
  char* buffer;
  int size;
};

发送者的工作是描述异步操作.(它也是操作状态的工厂,但这是第2步.在标有"(1)"的行上,按发送者声明此类型.在标有"(2)"的行上,声明了完成此异步操作的方式.
使用函数类型列表来完成:

stdex::set_value_t( int, char* )

…声明此异步操作可通过将char*传递给值回调来成功完成.记住,有三个回调.还有该:

stdex::set_error_t( int )

…声明此异步操作可能会通过向错误回调传递来在错误中完成.如果此异步操作可取消的,它将使用stdex::set_stopped_t()声明.

第2步:数据

在上面标有"(3)"的行上,连接,成员函数接受"接收者"并返回"操作状态".接收者三个回调的合并:值,错误和停止(大致是已取消).

连接发送者和接收者的结果是操作状态.操作状态,与CAPI中的覆盖结构一样,是异步操作数据.它必须保存在持续时间内的稳定的地址.

连接函数返回一个read_file_operation对象.连接调用者负责确保保活此对象,且在执行其中一个回调前不会移动.
read_file_operation类型如下:

struct immovable {
  immovable() = default;
  immovable(immovable&&) = delete;
};
template <class Receiver>
struct read_file_operation : overlapped, immovable
    //`(1)`
{
  static void _callback(int status, int bytes,
    //`(2)`
                overlapped* data)
  {
    auto* op =
      static_cast<read_file_operation*>(data);
    //`(3)`
    if (status == OK)
      stdex::set_value(std::move(op->rcvr),
    //`(4)`
                       bytes, op->buffer);
    else
      stdex::set_error(std::move(op->rcvr), status);
  }
  void start() noexcept
    //`(5)`
  {
    int status =
      read_file(pfile, buffer, size, this, &_callback);
    if (status != OK)
      stdex::set_error(std::move(rcvr), status);
  }
  FILE* pfile;
  char* buffer;
  int size;
  Receiver rcvr;
};

操作状态存储初化异步操作期望参数接收者(三个回调).按行分析一下.

1,"(1)":操作状态覆盖继承,因此可把它的指针传递进read_file.它还从不可移动(immovable)的结构继承.虽然不是绝对必要的,但这可确保不会意外移动操作状态.

2,"(2)":按类静态函数定义传递给read_fileoverlapped_callback.

3,"(3)":在回调中,将覆盖的指针下转至指向read_file_operation对象的指针.

4,"(4)":在回调中,检查状态,看看是否成功完成操作,并适当调用接收者set_valueset_error.
5,"(5)":在必须要有所有操作状态start()函数中,实际初化读操作.如果初化失败,因为不会执行回调,会立即将错误传递给接收者.

第3步:初化

注意,当调用发送者版本的async_read_file函数时,只是在构造一个发送者.并未开始实际工作.
然后用接收者调用连接并返回操作状态,但仍没有开始任何工作.

只是刚刚排好队,确保一切都在稳定的地址,这样可开始工作.在操作状态调用.start()之前,不会初化工作.只有这样,才会调用C风格的read_fileAPI,从而排队IO操作.

一旦开始构建发送者管道和任务图,所有这些就很重要.把初化工作与构建操作状态分开,可按一个包含整个任务图期望的所有数据的状态,聚集大量操作状态,从而在开始任何工作前将所有内容旋转到位.

即可仅使用单个动态分配或有时不分配,就启动大量有复杂依赖的异步工作.

必须说明,当我说连接的调用者直到执行其中一个回调,要在稳定地址保持操作状态时,我有点胡言乱语.

只有在调用.start()后,这才会变成现实.只要尚未调用.start(),将发送者连接到接收者,然后在地板上删除操作状态是完全可接受的.
但是调用.start()后,你就承诺了..start()发射火箭.没有退路了.

好的,已构造了操作状态,并在其上调用了.start().

第4步:延续

操作系统发挥其IO神奇.时间流逝.完成IO后,它将用状态码,覆盖结构指针(read_file_operation),调用_callback函数,如果成功,读取了字节数.

_callback将完成信息传递给连接到发送者的接收者,圆圈完成.

但是等等,"第5步:释放"呢从一开始就没有真正分配过任何东西!连接函数按值返回操作状态.由连接的调用者,无论是谁,来保活它.

可通过在堆上放置它来完成,此时,他们负责清理它.或,如果此异步操作任务图的一部分,他们可通过在更大的状态中聚集操作状态来完成.

第6步:获利!

此时,你会问,这一切的意义何在.发送者和接收者,麻烦的生命期期望操作状态,连接,初化,三个不同回调,谁想要管理所有这些?CAPI简单得多.是真!
则,为什么我对这一切如此激动?
async_read_file的调用者不需要关心这些.

终端用户,即async_read_file的调用者,不关心接收者操作状态.他们只是在协程等待发送者.如下代码使用stdexec库中的协程任务类型.

exec::task< std::string > process_file( FILE* pfile )
{
  std::string str;
  str.resize( MAX_BUFFER );
 
  auto [bytes, buff] =
    co_await async_read_file(pfile, str.data(), str.size());
 
  str.resize( bytes );
  co_return str;
}

这有什么神奇?写了一个发送者,而不是一个可等待,对吧,但这是工作代码!

这是可从标准异步模型编程中受益的地方.通用代码,无论是来自标准库还是来自第三方库,都将与发送者一起使用.

上例,stdexec库中的task<>类型知道如何等待任何像发送者的东西.如果你有发送者,则无需执行任何额外工作,即可co_await它.

针对常见异步模式,P2300还附带了一小部分通用异步算法,如链接(then),动态选择下个任务(let_value),按(when_all)分组发送者,及阻止直到发送者完成(sync_wait).

可以肯定的是,这是一个很简单的集合,但它会随着未来的标准而增长.随着第三方库开始带该模型,越来越多的异步代码将协同工作.

为什么要使用它?

你想用发送者,因为可用其他库通用算法异步操作与其他库中的其他操作拼接在一起.因此,你可无需编写额外的代码行,在协程co_await异步操作.

为什么C++协程时需要发送者?

我承认实现发送者比使用普通C风格回调更复杂.但是使用发送者与输入co_await一样简单,或像传递参数给sync_wait()此类算法一样简单.

选入发送者就是选入一个不断增长可重用代码的生态系统.
因此发送者很有趣.
毕竟,在发送者中包装read_file不难.


网站公告

今日签到

点亮在社区的每一天
去签到