std::experimental::future和延续链

发布于:2025-07-06 ⋅ 阅读:(16) ⋅ 点赞:(0)

std::experimental::future 中的延续链(Continuation Chain)详解

延续链是 C++ 异步编程中处理多阶段异步操作的强大机制,它允许将多个异步任务串联起来,前一个任务的结果自动作为后一个任务的输入。这种链式编程模型避免了回调地狱,使异步代码更清晰、更易维护。

一、延续链的核心概念

  1. 延续(Continuation)
    一个函数或可调用对象,在异步任务完成后自动执行。通过 then() 方法将延续添加到 future 上:

    future.then(continuation);
    
  2. 延续链
    多个延续的串联调用,形成异步操作链。每个 then() 调用返回一个新的 future,代表当前延续的执行结果:

    future.then(cont1).then(cont2).then(cont3);
    

二、延续链的工作原理

  1. 依赖关系
    后续延续依赖于前序延续的完成,形成线性执行流:

    异步任务1 → 延续1 → 延续2 → 延续3 → ...
    
  2. 结果传递
    前序延续的结果(或异常)自动传递给后续延续:

    • 若前序延续返回值 T,后续延续的参数类型为 future<T>
    • 若前序延续抛出异常,后续延续会收到携带该异常的 future
  3. 线程调度
    延续的执行线程由实现决定(通常是线程池),用户无需手动管理线程。

三、延续链代码示例

#include <experimental/future>
#include <iostream>
#include <thread>
#include <string>
#include <vector>

// 模拟异步获取用户ID
std::experimental::future<int> get_user_id() {
    std::experimental::promise<int> p;
    auto fut = p.get_future();
    
    std::thread([p = std::move(p)]() mutable {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "获取用户ID: 1234" << std::endl;
        p.set_value(1234);
    }).detach();
    
    return fut;
}

// 模拟异步获取用户信息(依赖用户ID)
std::experimental::future<std::string> get_user_info(int user_id) {
    std::experimental::promise<std::string> p;
    auto fut = p.get_future();
    
    std::thread([p = std::move(p), user_id]() mutable {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "获取用户ID " << user_id << " 的信息" << std::endl;
        p.set_value("User: " + std::to_string(user_id) + ", Name: Alice");
    }).detach();
    
    return fut;
}

// 模拟异步处理用户信息(依赖用户信息)
std::experimental::future<void> process_user_info(std::string info) {
    std::experimental::promise<void> p;
    auto fut = p.get_future();
    
    std::thread([p = std::move(p), info]() mutable {
        std::cout << "处理用户信息: " << info << std::endl;
        p.set_value();
    }).detach();
    
    return fut;
}

int main() {
    std::cout << "开始异步操作链...\n" << std::endl;
    
    // 构建延续链
    auto chain = get_user_id()
        .then([](std::experimental::future<int> id_fut) {
            try {
                int user_id = id_fut.get();
                std::cout << "延续1: 用户ID已获取,继续获取用户信息" << std::endl;
                return get_user_info(user_id);
            } catch (const std::exception& e) {
                std::cout << "错误: " << e.what() << std::endl;
                throw; // 重新抛出异常,传递给后续延续
            }
        })
        .then([](std::experimental::future<std::string> info_fut) {
            try {
                std::string user_info = info_fut.get();
                std::cout << "延续2: 用户信息已获取,继续处理信息" << std::endl;
                return process_user_info(user_info);
            } catch (const std::exception& e) {
                std::cout << "错误: " << e.what() << std::endl;
                throw;
            }
        })
        .then([]() {
            std::cout << "\n延续3: 所有操作完成" << std::endl;
        });
    
    // 等待链完成(实际应用中可使用其他等待方式)
    chain.wait();
    
    std::cout << "\n主程序结束" << std::endl;
    return 0;
}

四、延续链的执行流程

  1. 初始异步任务get_user_id() 返回 future<int>
  2. 延续1:获取用户ID后,调用 get_user_info()
  3. 延续2:获取用户信息后,调用 process_user_info()
  4. 延续3:处理完成后,执行最终回调

输出结果

开始异步操作链...

获取用户ID: 1234
延续1: 用户ID已获取,继续获取用户信息
获取用户ID 1234 的信息
延续2: 用户信息已获取,继续处理信息
处理用户信息: User: 1234, Name: Alice

延续3: 所有操作完成

主程序结束

五、异常处理在延续链中的表现

  1. 异常传递
    若某一延续抛出异常,后续延续会收到携带该异常的 future

    auto faulty_chain = std::experimental::make_ready_future(42)
        .then([](auto fut) {
            throw std::runtime_error("模拟错误");
            return fut.get() * 2;
        })
        .then([](auto fut) {
            try {
                int value = fut.get(); // 这里会抛出前序延续的异常
                return value + 10;
            } catch (const std::exception& e) {
                std::cout << "捕获异常: " << e.what() << std::endl;
                return -1; // 异常处理后返回默认值
            }
        });
        
    faulty_chain.wait(); // 输出:捕获异常: 模拟错误
    
  2. 异常处理最佳实践

    • 在每个延续中使用 try-catch 处理可能的异常
    • 若不处理异常,它会传播到链的末端
    • 可通过 then 专门处理异常(C++20 引入 catch_exception 等专用接口)

六、延续链的优势

  1. 代码可读性
    避免回调嵌套(回调地狱),异步操作按顺序书写,类似同步代码。

  2. 结果自动传递
    无需手动管理 future 的依赖关系,前序结果自动作为后续输入。

  3. 非阻塞编程
    主线程无需等待异步操作完成,延续在后台线程执行。

  4. 异常透明
    异常沿链传播,处理逻辑集中,便于调试。

七、C++20 中的延续增强

C++20 对异步编程模型进行了标准化,引入 std::future 的延续支持(此前为实验性),并提供更强大的接口:

  1. then 重载
    支持直接接收值而非 future,简化常见场景:

    std::future<int> fut = ...;
    auto new_fut = fut.then([](int value) { return value * 2; });
    
  2. 并行延续
    通过 when_all/when_any 处理多个 future 的组合结果:

    auto fut1 = std::async(do_task1);
    auto fut2 = std::async(do_task2);
    
    auto combined = std::when_all(fut1, fut2)
        .then([](auto results) {
            int val1 = results.get()[0];
            int val2 = results.get()[1];
            return val1 + val2;
        });
    
  3. 异常专用处理
    使用 catch_exception 专门处理异常:

    auto chain = get_data()
        .then(process_data)
        .catch_exception([](std::exception_ptr ep) {
            try {
                std::rethrow_exception(ep);
            } catch (const std::exception& e) {
                std::cout << "全局异常处理: " << e.what() << std::endl;
            }
        });
    

总结

延续链是 C++ 异步编程的核心机制,它通过链式调用将多个异步任务串联起来,实现了非阻塞、高可读性的异步代码。从实验性的 std::experimental::future 到 C++20 标准化的 std::future,延续机制不断完善,成为处理异步操作依赖关系的首选方案。在网络请求、数据处理、并行计算等场景中,延续链能显著提升代码的可维护性和执行效率。

大规模数据并行处理与std::experimental::when_all的优化应用

这段内容讨论了在处理大量可独立处理的数据时,如何利用异步任务并行计算,并通过std::experimental::when_all优化结果收集过程。以下是关键要点的解读:

一、传统并行处理的问题与实现

1. 场景描述
  • 处理大量可独立处理的数据项
  • 每个数据块由异步任务处理,通过future返回结果
  • 需要等待所有任务完成并收集结果进行最终处理
2. 传统实现方式
std::future<FinalResult> process_data(std::vector<MyData>& vec) {
    size_t const chunk_size = whatever;
    std::vector<std::future<ChunkResult>> results;
    
    // 1. 分割数据并启动异步任务处理每个块
    for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
        size_t const remaining_size = end - begin;
        size_t const this_chunk_size = std::min(remaining_size, chunk_size);
        results.push_back(
            std::async(process_chunk, begin, begin + this_chunk_size));
        begin += this_chunk_size;
    }
    
    // 2. 启动新的异步任务等待并收集所有结果
    return std::async([all_results = std::move(results)]() {
        std::vector<ChunkResult> v;
        v.reserve(all_results.size());
        for (auto& f : all_results) {
            v.push_back(f.get()); // 逐个等待结果
        }
        return gather_results(v);
    });
}
3. 传统实现的缺陷
  1. 线程资源浪费:收集结果的任务需逐个等待future,导致线程长时间阻塞
  2. 上下文切换开销:每个future就绪时触发线程唤醒和再次睡眠
  3. 调度效率低:线程在等待过程中反复被调度,增加系统开销

二、std::experimental::when_all的优化方案

1. 核心优势
  • 一次性等待所有future就绪,避免逐个等待的开销
  • 返回一个新future,仅在所有输入future就绪时触发
  • 可结合延续链(continuation)实现非阻塞结果收集
2. 优化后的实现
#include <experimental/future>
#include <vector>

std::experimental::future<FinalResult> process_data_optimized(std::vector<MyData>& vec) {
    size_t const chunk_size = whatever;
    std::vector<std::experimental::future<ChunkResult>> results;
    
    // 1. 分割数据并启动异步任务(与传统方式相同)
    for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
        size_t const remaining_size = end - begin;
        size_t const this_chunk_size = std::min(remaining_size, chunk_size);
        results.push_back(
            std::experimental::async(process_chunk, begin, begin + this_chunk_size));
        begin += this_chunk_size;
    }
    
    // 2. 使用 when_all 等待所有 future 就绪
    return std::experimental::when_all(results.begin(), results.end())
        .then([](std::experimental::future<std::vector<std::experimental::future<ChunkResult>>> all_futures) {
            try {
                // 获取所有已就绪的 future 结果
                auto chunk_results = all_futures.get();
                std::vector<ChunkResult> v;
                v.reserve(chunk_results.size());
                
                // 提取每个块的处理结果
                for (auto& f : chunk_results) {
                    v.push_back(f.get());
                }
                
                // 执行最终结果收集
                return gather_results(v);
            } catch (const std::exception& e) {
                // 处理可能的异常(如某个任务失败)
                throw std::runtime_error("数据处理失败: " + std::string(e.what()));
            }
        });
}
3. 执行流程对比
传统方式 when_all 方式
启动N个任务处理数据块,再启动1个任务收集结果 启动N个任务,通过when_all等待并自动触发收集
收集任务逐个等待future,多次阻塞/唤醒 仅在所有future就绪时触发一次回调
线程在等待过程中持续占用资源 等待过程不占用线程,结果就绪时调度

三、std::experimental::when_all的工作原理

  1. 输入与输出

    • 输入:一组future对象(可通过迭代器范围或初始化列表传递)
    • 输出:一个新的future,其值为std::vector<InputFuture::value_type>
  2. 就绪条件
    当且仅当所有输入future都就绪时,when_all返回的future才会就绪。

  3. 异常处理

    • 若任意输入future包含异常,when_allfuture也会包含该异常
    • 异常传播至后续延续函数,可集中处理
  4. 非阻塞特性
    when_all的等待过程不阻塞线程,仅在所有future就绪时通过延续机制触发回调。

四、优化方案的优势

  1. 资源利用率提升

    • 避免专门的线程用于等待,释放线程资源处理其他任务
    • 减少上下文切换次数,降低系统开销
  2. 代码简洁性

    • 无需手动管理任务等待逻辑,通过延续链实现异步结果收集
    • 异常处理逻辑集中,代码结构更清晰
  3. 调度效率优化

    • 仅在所有任务完成时触发一次回调,避免重复调度
    • 充分利用线程池资源,提升整体吞吐量

五、扩展:处理大量任务的最佳实践

1. 结合线程池控制并发度
std::experimental::future<FinalResult> process_large_data(std::vector<MyData>& vec, size_t max_concurrent) {
    std::vector<std::experimental::future<ChunkResult>> results;
    std::vector<std::experimental::future<ChunkResult>> current_tasks;
    
    auto process_chunk_range = [&](auto begin, auto end) {
        for (; begin != end && current_tasks.size() < max_concurrent; ++begin) {
            current_tasks.push_back(
                std::experimental::async(process_chunk, begin, begin + 1));
        }
        
        // 等待部分任务完成以释放并发槽位
        if (!current_tasks.empty()) {
            auto done = std::experimental::when_all(current_tasks.begin(), current_tasks.end());
            auto completed = done.then([](auto fut) {
                return fut.get();
            });
            
            // 将完成的任务结果转移到最终结果集
            results.insert(results.end(), 
                completed.get().begin(), completed.get().end());
            current_tasks.clear();
        }
    };
    
    // 分批次处理数据,控制并发任务数量
    process_chunk_range(vec.begin(), vec.end());
    
    // 等待所有剩余任务完成
    if (!current_tasks.empty()) {
        auto done = std::experimental::when_all(current_tasks.begin(), current_tasks.end());
        results.insert(results.end(), 
            done.then([](auto fut) { return fut.get(); }).get().begin(),
            done.then([](auto fut) { return fut.get(); }).get().end());
    }
    
    // 执行最终结果收集
    return std::experimental::when_all(results.begin(), results.end())
        .then([](auto fut) {
            auto all_results = fut.get();
            return gather_results(all_results);
        });
}
2. 处理异常与取消操作
// 带异常处理和取消标记的版本
std::experimental::future<FinalResult> process_data_with_cancellation(
    std::vector<MyData>& vec, std::experimental::cancel_token token) {
    
    std::vector<std::experimental::future<ChunkResult>> results;
    
    for (auto begin = vec.begin(), end = vec.end(); begin != end && !token.cancelled();) {
        // 启动任务时传递取消标记
        results.push_back(
            std::experimental::async(
                [](auto b, auto e, std::experimental::cancel_token tok) {
                    return process_chunk(b, e, tok);
                },
                begin, begin + chunk_size, token));
        begin += chunk_size;
    }
    
    if (token.cancelled()) {
        // 若已取消,返回已取消的 future
        std::experimental::promise<FinalResult> p;
        p.set_cancelled();
        return p.get_future();
    }
    
    return std::experimental::when_all(results.begin(), results.end())
        .then([](auto fut) {
            try {
                auto chunk_results = fut.get();
                return gather_results(chunk_results);
            } catch (const std::experimental::future_errc& e) {
                if (e == std::experimental::future_errc::cancelled) {
                    throw std::runtime_error("数据处理已取消");
                }
                throw;
            }
        });
}

六、总结

std::experimental::when_all是处理多异步任务结果收集的关键工具,它通过以下方式优化传统方案:

  1. 一次性等待:避免逐个等待future的开销
  2. 非阻塞调度:仅在所有任务完成时触发回调,释放线程资源
  3. 异常统一处理:集中处理所有任务的异常情况

在处理大规模可并行数据时,结合when_all和延续链能显著提升代码的执行效率和可维护性,是现代C++异步编程的重要技术手段。

std::experimental::when_any 机制详解

1. 基本概念与作用

std::experimental::when_any 是 C++ 并发编程中用于处理异步任务的重要工具,其核心功能是:

  • 接收一组 future 对象作为输入
  • 返回一个新的 future,当任意一个输入的 future 完成时,该新 future 即变为就绪状态
  • future 中存储的是一个 std::tuple,包含:
    • 已完成 future 的索引位置
    • 所有输入 future 的移动后副本(未完成的 future 仍保持可等待状态)

when_all(等待所有任务完成)不同,when_any 适用于以下场景:

  • 只需要第一个完成的任务结果
  • 希望通过某个任务的完成触发后续操作
  • 需要处理任务超时或失败的优先级问题
2. 核心特性与实现逻辑
  • 非阻塞特性when_any 不会阻塞当前线程,而是通过延续(continuation)机制异步处理结果
  • 资源管理:返回的 future 会自动管理输入 future 的生命周期,避免资源泄漏
  • 异常处理:若任意输入 future 抛出异常,when_any 返回的 future 会将异常封装其中
  • 线程调度:由实现决定线程池的调度策略,通常会复用异步任务的线程资源
3. 代码示例:使用 when_any 处理异步任务

下面通过一个网络请求场景演示 when_any 的实际应用:

#include <experimental/future>
#include <experimental/promise>
#include <vector>
#include <string>
#include <iostream>
#include <thread>
#include <chrono>

// 模拟网络请求函数,返回future<string>
std::experimental::future<std::string> make_network_request(int id, int delay_ms) {
    // 创建promise用于设置future结果
    auto promise = std::experimental::make_ready_promise<std::string>();
    
    // 在新线程中模拟网络请求延迟
    std::thread([promise = std::move(promise), id, delay_ms]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
        try {
            if (delay_ms > 1000) {
                // 模拟请求失败
                throw std::runtime_error("Request timeout");
            }
            promise.set_value("Response from request " + std::to_string(id));
        } catch (...) {
            promise.set_exception(std::current_exception());
        }
    }).detach();
    
    return promise.get_future();
}

int main() {
    // 创建多个异步网络请求
    std::vector<std::experimental::future<std::string>> requests;
    requests.push_back(make_network_request(1, 500));   // 500ms后完成
    requests.push_back(make_network_request(2, 1500));  // 1500ms后完成
    requests.push_back(make_network_request(3, 800));   // 800ms后完成
    
    std::cout << "All requests started, waiting for the first response...\n";
    
    // 使用when_any等待任意一个请求完成
    auto when_any_fut = std::experimental::when_any(requests.begin(), requests.end());
    
    // 添加延续函数处理第一个完成的请求
    when_any_fut.then([](std::experimental::future<std::tuple<std::ptrdiff_t, 
                             std::vector<std::experimental::future<std::string>>>> fut) {
        try {
            // 获取when_any的结果
            auto [index, remaining_futures] = fut.get();
            
            // 处理第一个完成的请求结果
            std::string result = remaining_futures[index].get();
            std::cout << "First response received (index " << index << "):\n";
            std::cout << result << std::endl;
            
            // 处理剩余的请求(可选)
            std::cout << "\nRemaining requests (" << remaining_futures.size() - 1 << " left):\n";
            for (size_t i = 0; i < remaining_futures.size(); ++i) {
                if (i != static_cast<size_t>(index)) {
                    try {
                        std::string res = remaining_futures[i].get();
                        std::cout << "  Request " << i << " result: " << res << std::endl;
                    } catch (const std::exception& e) {
                        std::cout << "  Request " << i << " failed: " << e.what() << std::endl;
                    }
                }
            }
        } catch (const std::exception& e) {
            std::cout << "Error in when_any continuation: " << e.what() << std::endl;
        }
    });
    
    // 模拟主线程继续执行其他任务
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Main thread finished other tasks.\n";
    
    return 0;
}
4. 执行流程解析
  1. 任务创建:发起3个网络请求,延迟分别为500ms、1500ms、800ms
  2. when_any调用:将3个 future 传入 when_any,返回新的 future
  3. 延续链注册:通过 then() 注册回调函数,当任意请求完成时触发
  4. 结果处理
    • 首先获取完成任务的索引和剩余 future 列表
    • 处理第一个完成的任务结果
    • 可选:处理剩余任务(等待或取消)
  5. 异常处理:若任务抛出异常,会被封装在 future 中并在 get() 时重新抛出
5. 应用场景
  • 超时控制:设置一个超时任务与实际任务并行,通过 when_any 优先处理完成的任务
  • 优先级任务:多个任务中优先处理高优先级任务的结果
  • 资源优化:当只需要第一个结果时,避免等待所有任务完成造成的资源浪费
  • 事件驱动:某个事件(如用户输入)触发后,取消其他正在等待的异步任务
6. 与when_all的对比
特性 when_any when_all
等待条件 任意一个任务完成 所有任务完成
返回future就绪时机 第一个任务完成时 所有任务完成时
结果类型 tuple<索引, 剩余future列表> tuple<所有任务结果>
适用场景 优先处理首个结果、超时控制 汇总所有结果、批量处理
资源消耗 较低(无需等待所有任务) 较高(需等待所有任务完成)
7. 注意事项
  • 线程安全when_any 本身是线程安全的,但输入的 future 需确保在多线程环境中的正确使用
  • 移动语义:传入 when_anyfuture 会被移动,原 future 会变为无效
  • 异常传播:若所有输入 future 都抛出异常,when_any 返回的 future 会封装第一个异常
  • C++标准状态std::experimental 属于实验性接口,可能在未来标准中调整

通过 when_any 机制,开发者可以更灵活地处理异步任务,尤其是在需要响应最快完成任务或处理任务优先级的场景中,相比传统的轮询或阻塞等待方式,能显著提升程序的响应性和资源利用率。


网站公告

今日签到

点亮在社区的每一天
去签到