std::experimental::future 中的延续链(Continuation Chain)详解
延续链是 C++ 异步编程中处理多阶段异步操作的强大机制,它允许将多个异步任务串联起来,前一个任务的结果自动作为后一个任务的输入。这种链式编程模型避免了回调地狱,使异步代码更清晰、更易维护。
一、延续链的核心概念
延续(Continuation)
一个函数或可调用对象,在异步任务完成后自动执行。通过then()
方法将延续添加到future
上:future.then(continuation);
延续链
多个延续的串联调用,形成异步操作链。每个then()
调用返回一个新的future
,代表当前延续的执行结果:future.then(cont1).then(cont2).then(cont3);
二、延续链的工作原理
依赖关系
后续延续依赖于前序延续的完成,形成线性执行流:异步任务1 → 延续1 → 延续2 → 延续3 → ...
结果传递
前序延续的结果(或异常)自动传递给后续延续:- 若前序延续返回值
T
,后续延续的参数类型为future<T>
- 若前序延续抛出异常,后续延续会收到携带该异常的
future
- 若前序延续返回值
线程调度
延续的执行线程由实现决定(通常是线程池),用户无需手动管理线程。
三、延续链代码示例
#include <experimental/future>
#include <iostream>
#include <thread>
#include <string>
#include <vector>
// 模拟异步获取用户ID
std::experimental::future<int> get_user_id() {
std::experimental::promise<int> p;
auto fut = p.get_future();
std::thread([p = std::move(p)]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "获取用户ID: 1234" << std::endl;
p.set_value(1234);
}).detach();
return fut;
}
// 模拟异步获取用户信息(依赖用户ID)
std::experimental::future<std::string> get_user_info(int user_id) {
std::experimental::promise<std::string> p;
auto fut = p.get_future();
std::thread([p = std::move(p), user_id]() mutable {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "获取用户ID " << user_id << " 的信息" << std::endl;
p.set_value("User: " + std::to_string(user_id) + ", Name: Alice");
}).detach();
return fut;
}
// 模拟异步处理用户信息(依赖用户信息)
std::experimental::future<void> process_user_info(std::string info) {
std::experimental::promise<void> p;
auto fut = p.get_future();
std::thread([p = std::move(p), info]() mutable {
std::cout << "处理用户信息: " << info << std::endl;
p.set_value();
}).detach();
return fut;
}
int main() {
std::cout << "开始异步操作链...\n" << std::endl;
// 构建延续链
auto chain = get_user_id()
.then([](std::experimental::future<int> id_fut) {
try {
int user_id = id_fut.get();
std::cout << "延续1: 用户ID已获取,继续获取用户信息" << std::endl;
return get_user_info(user_id);
} catch (const std::exception& e) {
std::cout << "错误: " << e.what() << std::endl;
throw; // 重新抛出异常,传递给后续延续
}
})
.then([](std::experimental::future<std::string> info_fut) {
try {
std::string user_info = info_fut.get();
std::cout << "延续2: 用户信息已获取,继续处理信息" << std::endl;
return process_user_info(user_info);
} catch (const std::exception& e) {
std::cout << "错误: " << e.what() << std::endl;
throw;
}
})
.then([]() {
std::cout << "\n延续3: 所有操作完成" << std::endl;
});
// 等待链完成(实际应用中可使用其他等待方式)
chain.wait();
std::cout << "\n主程序结束" << std::endl;
return 0;
}
四、延续链的执行流程
- 初始异步任务:
get_user_id()
返回future<int>
- 延续1:获取用户ID后,调用
get_user_info()
- 延续2:获取用户信息后,调用
process_user_info()
- 延续3:处理完成后,执行最终回调
输出结果:
开始异步操作链...
获取用户ID: 1234
延续1: 用户ID已获取,继续获取用户信息
获取用户ID 1234 的信息
延续2: 用户信息已获取,继续处理信息
处理用户信息: User: 1234, Name: Alice
延续3: 所有操作完成
主程序结束
五、异常处理在延续链中的表现
异常传递
若某一延续抛出异常,后续延续会收到携带该异常的future
:auto faulty_chain = std::experimental::make_ready_future(42) .then([](auto fut) { throw std::runtime_error("模拟错误"); return fut.get() * 2; }) .then([](auto fut) { try { int value = fut.get(); // 这里会抛出前序延续的异常 return value + 10; } catch (const std::exception& e) { std::cout << "捕获异常: " << e.what() << std::endl; return -1; // 异常处理后返回默认值 } }); faulty_chain.wait(); // 输出:捕获异常: 模拟错误
异常处理最佳实践
- 在每个延续中使用
try-catch
处理可能的异常 - 若不处理异常,它会传播到链的末端
- 可通过
then
专门处理异常(C++20 引入catch_exception
等专用接口)
- 在每个延续中使用
六、延续链的优势
代码可读性
避免回调嵌套(回调地狱),异步操作按顺序书写,类似同步代码。结果自动传递
无需手动管理future
的依赖关系,前序结果自动作为后续输入。非阻塞编程
主线程无需等待异步操作完成,延续在后台线程执行。异常透明
异常沿链传播,处理逻辑集中,便于调试。
七、C++20 中的延续增强
C++20 对异步编程模型进行了标准化,引入 std::future
的延续支持(此前为实验性),并提供更强大的接口:
then
重载
支持直接接收值而非future
,简化常见场景:std::future<int> fut = ...; auto new_fut = fut.then([](int value) { return value * 2; });
并行延续
通过when_all
/when_any
处理多个future
的组合结果:auto fut1 = std::async(do_task1); auto fut2 = std::async(do_task2); auto combined = std::when_all(fut1, fut2) .then([](auto results) { int val1 = results.get()[0]; int val2 = results.get()[1]; return val1 + val2; });
异常专用处理
使用catch_exception
专门处理异常:auto chain = get_data() .then(process_data) .catch_exception([](std::exception_ptr ep) { try { std::rethrow_exception(ep); } catch (const std::exception& e) { std::cout << "全局异常处理: " << e.what() << std::endl; } });
总结
延续链是 C++ 异步编程的核心机制,它通过链式调用将多个异步任务串联起来,实现了非阻塞、高可读性的异步代码。从实验性的 std::experimental::future
到 C++20 标准化的 std::future
,延续机制不断完善,成为处理异步操作依赖关系的首选方案。在网络请求、数据处理、并行计算等场景中,延续链能显著提升代码的可维护性和执行效率。
大规模数据并行处理与std::experimental::when_all
的优化应用
这段内容讨论了在处理大量可独立处理的数据时,如何利用异步任务并行计算,并通过std::experimental::when_all
优化结果收集过程。以下是关键要点的解读:
一、传统并行处理的问题与实现
1. 场景描述
- 处理大量可独立处理的数据项
- 每个数据块由异步任务处理,通过
future
返回结果 - 需要等待所有任务完成并收集结果进行最终处理
2. 传统实现方式
std::future<FinalResult> process_data(std::vector<MyData>& vec) {
size_t const chunk_size = whatever;
std::vector<std::future<ChunkResult>> results;
// 1. 分割数据并启动异步任务处理每个块
for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
std::async(process_chunk, begin, begin + this_chunk_size));
begin += this_chunk_size;
}
// 2. 启动新的异步任务等待并收集所有结果
return std::async([all_results = std::move(results)]() {
std::vector<ChunkResult> v;
v.reserve(all_results.size());
for (auto& f : all_results) {
v.push_back(f.get()); // 逐个等待结果
}
return gather_results(v);
});
}
3. 传统实现的缺陷
- 线程资源浪费:收集结果的任务需逐个等待
future
,导致线程长时间阻塞 - 上下文切换开销:每个
future
就绪时触发线程唤醒和再次睡眠 - 调度效率低:线程在等待过程中反复被调度,增加系统开销
二、std::experimental::when_all
的优化方案
1. 核心优势
- 一次性等待所有
future
就绪,避免逐个等待的开销 - 返回一个新
future
,仅在所有输入future
就绪时触发 - 可结合延续链(continuation)实现非阻塞结果收集
2. 优化后的实现
#include <experimental/future>
#include <vector>
std::experimental::future<FinalResult> process_data_optimized(std::vector<MyData>& vec) {
size_t const chunk_size = whatever;
std::vector<std::experimental::future<ChunkResult>> results;
// 1. 分割数据并启动异步任务(与传统方式相同)
for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
std::experimental::async(process_chunk, begin, begin + this_chunk_size));
begin += this_chunk_size;
}
// 2. 使用 when_all 等待所有 future 就绪
return std::experimental::when_all(results.begin(), results.end())
.then([](std::experimental::future<std::vector<std::experimental::future<ChunkResult>>> all_futures) {
try {
// 获取所有已就绪的 future 结果
auto chunk_results = all_futures.get();
std::vector<ChunkResult> v;
v.reserve(chunk_results.size());
// 提取每个块的处理结果
for (auto& f : chunk_results) {
v.push_back(f.get());
}
// 执行最终结果收集
return gather_results(v);
} catch (const std::exception& e) {
// 处理可能的异常(如某个任务失败)
throw std::runtime_error("数据处理失败: " + std::string(e.what()));
}
});
}
3. 执行流程对比
传统方式 | when_all 方式 |
---|---|
启动N个任务处理数据块,再启动1个任务收集结果 | 启动N个任务,通过when_all等待并自动触发收集 |
收集任务逐个等待future,多次阻塞/唤醒 | 仅在所有future就绪时触发一次回调 |
线程在等待过程中持续占用资源 | 等待过程不占用线程,结果就绪时调度 |
三、std::experimental::when_all
的工作原理
输入与输出
- 输入:一组
future
对象(可通过迭代器范围或初始化列表传递) - 输出:一个新的
future
,其值为std::vector<InputFuture::value_type>
- 输入:一组
就绪条件
当且仅当所有输入future
都就绪时,when_all
返回的future
才会就绪。异常处理
- 若任意输入
future
包含异常,when_all
的future
也会包含该异常 - 异常传播至后续延续函数,可集中处理
- 若任意输入
非阻塞特性
when_all
的等待过程不阻塞线程,仅在所有future
就绪时通过延续机制触发回调。
四、优化方案的优势
资源利用率提升
- 避免专门的线程用于等待,释放线程资源处理其他任务
- 减少上下文切换次数,降低系统开销
代码简洁性
- 无需手动管理任务等待逻辑,通过延续链实现异步结果收集
- 异常处理逻辑集中,代码结构更清晰
调度效率优化
- 仅在所有任务完成时触发一次回调,避免重复调度
- 充分利用线程池资源,提升整体吞吐量
五、扩展:处理大量任务的最佳实践
1. 结合线程池控制并发度
std::experimental::future<FinalResult> process_large_data(std::vector<MyData>& vec, size_t max_concurrent) {
std::vector<std::experimental::future<ChunkResult>> results;
std::vector<std::experimental::future<ChunkResult>> current_tasks;
auto process_chunk_range = [&](auto begin, auto end) {
for (; begin != end && current_tasks.size() < max_concurrent; ++begin) {
current_tasks.push_back(
std::experimental::async(process_chunk, begin, begin + 1));
}
// 等待部分任务完成以释放并发槽位
if (!current_tasks.empty()) {
auto done = std::experimental::when_all(current_tasks.begin(), current_tasks.end());
auto completed = done.then([](auto fut) {
return fut.get();
});
// 将完成的任务结果转移到最终结果集
results.insert(results.end(),
completed.get().begin(), completed.get().end());
current_tasks.clear();
}
};
// 分批次处理数据,控制并发任务数量
process_chunk_range(vec.begin(), vec.end());
// 等待所有剩余任务完成
if (!current_tasks.empty()) {
auto done = std::experimental::when_all(current_tasks.begin(), current_tasks.end());
results.insert(results.end(),
done.then([](auto fut) { return fut.get(); }).get().begin(),
done.then([](auto fut) { return fut.get(); }).get().end());
}
// 执行最终结果收集
return std::experimental::when_all(results.begin(), results.end())
.then([](auto fut) {
auto all_results = fut.get();
return gather_results(all_results);
});
}
2. 处理异常与取消操作
// 带异常处理和取消标记的版本
std::experimental::future<FinalResult> process_data_with_cancellation(
std::vector<MyData>& vec, std::experimental::cancel_token token) {
std::vector<std::experimental::future<ChunkResult>> results;
for (auto begin = vec.begin(), end = vec.end(); begin != end && !token.cancelled();) {
// 启动任务时传递取消标记
results.push_back(
std::experimental::async(
[](auto b, auto e, std::experimental::cancel_token tok) {
return process_chunk(b, e, tok);
},
begin, begin + chunk_size, token));
begin += chunk_size;
}
if (token.cancelled()) {
// 若已取消,返回已取消的 future
std::experimental::promise<FinalResult> p;
p.set_cancelled();
return p.get_future();
}
return std::experimental::when_all(results.begin(), results.end())
.then([](auto fut) {
try {
auto chunk_results = fut.get();
return gather_results(chunk_results);
} catch (const std::experimental::future_errc& e) {
if (e == std::experimental::future_errc::cancelled) {
throw std::runtime_error("数据处理已取消");
}
throw;
}
});
}
六、总结
std::experimental::when_all
是处理多异步任务结果收集的关键工具,它通过以下方式优化传统方案:
- 一次性等待:避免逐个等待
future
的开销 - 非阻塞调度:仅在所有任务完成时触发回调,释放线程资源
- 异常统一处理:集中处理所有任务的异常情况
在处理大规模可并行数据时,结合when_all
和延续链能显著提升代码的执行效率和可维护性,是现代C++异步编程的重要技术手段。
std::experimental::when_any
机制详解
1. 基本概念与作用
std::experimental::when_any
是 C++ 并发编程中用于处理异步任务的重要工具,其核心功能是:
- 接收一组
future
对象作为输入 - 返回一个新的
future
,当任意一个输入的future
完成时,该新future
即变为就绪状态 - 新
future
中存储的是一个std::tuple
,包含:- 已完成
future
的索引位置 - 所有输入
future
的移动后副本(未完成的future
仍保持可等待状态)
- 已完成
与 when_all
(等待所有任务完成)不同,when_any
适用于以下场景:
- 只需要第一个完成的任务结果
- 希望通过某个任务的完成触发后续操作
- 需要处理任务超时或失败的优先级问题
2. 核心特性与实现逻辑
- 非阻塞特性:
when_any
不会阻塞当前线程,而是通过延续(continuation)机制异步处理结果 - 资源管理:返回的
future
会自动管理输入future
的生命周期,避免资源泄漏 - 异常处理:若任意输入
future
抛出异常,when_any
返回的future
会将异常封装其中 - 线程调度:由实现决定线程池的调度策略,通常会复用异步任务的线程资源
3. 代码示例:使用 when_any
处理异步任务
下面通过一个网络请求场景演示 when_any
的实际应用:
#include <experimental/future>
#include <experimental/promise>
#include <vector>
#include <string>
#include <iostream>
#include <thread>
#include <chrono>
// 模拟网络请求函数,返回future<string>
std::experimental::future<std::string> make_network_request(int id, int delay_ms) {
// 创建promise用于设置future结果
auto promise = std::experimental::make_ready_promise<std::string>();
// 在新线程中模拟网络请求延迟
std::thread([promise = std::move(promise), id, delay_ms]() {
std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
try {
if (delay_ms > 1000) {
// 模拟请求失败
throw std::runtime_error("Request timeout");
}
promise.set_value("Response from request " + std::to_string(id));
} catch (...) {
promise.set_exception(std::current_exception());
}
}).detach();
return promise.get_future();
}
int main() {
// 创建多个异步网络请求
std::vector<std::experimental::future<std::string>> requests;
requests.push_back(make_network_request(1, 500)); // 500ms后完成
requests.push_back(make_network_request(2, 1500)); // 1500ms后完成
requests.push_back(make_network_request(3, 800)); // 800ms后完成
std::cout << "All requests started, waiting for the first response...\n";
// 使用when_any等待任意一个请求完成
auto when_any_fut = std::experimental::when_any(requests.begin(), requests.end());
// 添加延续函数处理第一个完成的请求
when_any_fut.then([](std::experimental::future<std::tuple<std::ptrdiff_t,
std::vector<std::experimental::future<std::string>>>> fut) {
try {
// 获取when_any的结果
auto [index, remaining_futures] = fut.get();
// 处理第一个完成的请求结果
std::string result = remaining_futures[index].get();
std::cout << "First response received (index " << index << "):\n";
std::cout << result << std::endl;
// 处理剩余的请求(可选)
std::cout << "\nRemaining requests (" << remaining_futures.size() - 1 << " left):\n";
for (size_t i = 0; i < remaining_futures.size(); ++i) {
if (i != static_cast<size_t>(index)) {
try {
std::string res = remaining_futures[i].get();
std::cout << " Request " << i << " result: " << res << std::endl;
} catch (const std::exception& e) {
std::cout << " Request " << i << " failed: " << e.what() << std::endl;
}
}
}
} catch (const std::exception& e) {
std::cout << "Error in when_any continuation: " << e.what() << std::endl;
}
});
// 模拟主线程继续执行其他任务
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "Main thread finished other tasks.\n";
return 0;
}
4. 执行流程解析
- 任务创建:发起3个网络请求,延迟分别为500ms、1500ms、800ms
- when_any调用:将3个
future
传入when_any
,返回新的future
- 延续链注册:通过
then()
注册回调函数,当任意请求完成时触发 - 结果处理:
- 首先获取完成任务的索引和剩余
future
列表 - 处理第一个完成的任务结果
- 可选:处理剩余任务(等待或取消)
- 首先获取完成任务的索引和剩余
- 异常处理:若任务抛出异常,会被封装在
future
中并在get()
时重新抛出
5. 应用场景
- 超时控制:设置一个超时任务与实际任务并行,通过
when_any
优先处理完成的任务 - 优先级任务:多个任务中优先处理高优先级任务的结果
- 资源优化:当只需要第一个结果时,避免等待所有任务完成造成的资源浪费
- 事件驱动:某个事件(如用户输入)触发后,取消其他正在等待的异步任务
6. 与when_all的对比
特性 | when_any | when_all |
---|---|---|
等待条件 | 任意一个任务完成 | 所有任务完成 |
返回future就绪时机 | 第一个任务完成时 | 所有任务完成时 |
结果类型 | tuple<索引, 剩余future列表> | tuple<所有任务结果> |
适用场景 | 优先处理首个结果、超时控制 | 汇总所有结果、批量处理 |
资源消耗 | 较低(无需等待所有任务) | 较高(需等待所有任务完成) |
7. 注意事项
- 线程安全:
when_any
本身是线程安全的,但输入的future
需确保在多线程环境中的正确使用 - 移动语义:传入
when_any
的future
会被移动,原future
会变为无效 - 异常传播:若所有输入
future
都抛出异常,when_any
返回的future
会封装第一个异常 - C++标准状态:
std::experimental
属于实验性接口,可能在未来标准中调整
通过 when_any
机制,开发者可以更灵活地处理异步任务,尤其是在需要响应最快完成任务或处理任务优先级的场景中,相比传统的轮询或阻塞等待方式,能显著提升程序的响应性和资源利用率。