C++ 并发编程:异步任务

发布于:2025-09-09 ⋅ 阅读:(22) ⋅ 点赞:(0)

在这里插入图片描述


如果觉得本文对您有所帮助,请点个赞和关注吧,谢谢!!!你的支持就是我持续更新的最大动力


一、引言

在现代高性能计算领域,异步编程模型是充分利用多核处理器、避免 I/O 阻塞、提升应用程序响应能力的关键。C++11 及后续标准为我们提供了一套优雅且功能强大的异步任务工具集,其核心在于 std::futurestd::promisestd::packaged_taskstd::async。本文将对这一体系进行一次全面而深入的解构。

二、核心机制:共享状态 (Shared State)

在深入了解各个组件之前,我们必须理解它们背后的通信基石——共享状态 (Shared State)。这是一个概念上的、由标准库在内部管理的对象,它充当了异步生产者和消费者之间的通信信道。

  • 作用:共享状态负责存储异步任务的结果(一个值或一个异常),并维护一个状态标志(例如:就绪、延迟)。
  • 访问句柄:我们无法直接操作共享状态。标准库提供了两种“句柄”来与之交互:
    1. 生产者端 (Writer Handle):如 std::promisestd::packaged_task,用于向共享状态中写入值或异常。
    2. 消费者端 (Reader Handle)std::futurestd::shared_future,用于从共享状态中读取值或异常。

一个共享状态只能被一个生产者写入一次,但可以被一个或多个消费者读取。这套机制保证了线程安全的数据传递,避免了显式的锁和条件变量。


三、std::future: 异步结果的只读凭证

std::future<T> 是一个模板类,它代表了一个可能在未来某个时间点才会就绪的值。它提供了对共享状态的只读访问权限,是异步结果的“消费者”。

1、核心职责

  • 结果获取:从共享状态中检索计算结果。如果结果尚未就绪,调用线程将被阻塞。
  • 异常传播:如果异步操作抛出异常,该异常被存储在共享状态中。当 future 调用 get() 时,此异常会在调用者线程中被重新抛出。
  • 状态查询与同步:查询异步操作是否完成,并提供阻塞等待的同步原语。

2、关键成员函数详解

函数原型 详细解释
T get() 获取并消费结果
- 行为: 阻塞当前线程,直到共享状态变为 ready。然后,它会检索共享状态中的值或异常。
- 返回值: T 类型的结果。如果函数返回 void,则返回 void。对于 future<T&>,返回 T&
- 后置条件: 调用 get() 后,future 对象自身变为无效 (valid() 返回 false),共享状态中的值被“消费”。这是一个一次性操作。再次调用 get() 会抛出 std::future_error(错误码为 std::future_errc::future_already_retrieved)。
- 异常: 如果共享状态中存储的是异常,get() 会重新抛出该异常。
bool valid() const noexcept 检查有效性
- 行为: 检查 future 对象是否与一个共享状态关联。
- 返回值: true 表示 future 关联了一个有效的共享状态;false 则表示它是一个空的 future(通过默认构造、移动操作或 get() 调用后产生)。对无效的 future 调用除 valid() 和析构函数外的任何成员函数都是未定义行为。
void wait() const 阻塞等待
- 行为: 阻塞当前线程直到共享状态变为 ready。它获取结果,future 在调用后依然保持 valid 状态,可以稍后调用 get()。可多次调用。
template<class Rep, class Period> std::future_status wait_for(...) const 带超时的阻塞等待
- 行为: 最多阻塞 timeout_duration 时间,等待共享状态变为 ready
- 返回值: 一个 std::future_status 枚举:
- ready: 结果已就绪。
- timeout: 超时,结果仍未就绪。
- deferred: 任务被延迟执行(由 std::asyncdeferred 策略引起),等待 wait()get() 触发。
std::shared_future<T> share() 创建 std::shared_future
- 行为: 将 future 对象的共享状态所有权转移到一个新的 std::shared_future 对象中。调用后,原 future 对象变为无效。详见后文 shared_future 部分。

四、std::promise: 结果的承诺与实现

std::promise<T> 是一个模板类,它提供了向共享状态写入一次结果的能力。它是异步结果的“生产者”,做出了一个在未来提供值的“承诺”。

1、核心职责

  • 创建一个共享状态。
  • 提供一个关联的 std::future 对象给消费者。
  • 在适当的时候,通过 set_value()set_exception() 来履行承诺,使共享状态变为 ready

2、使用格式与成员函数详解

#include <future>
#include <thread>
#include <iostream>
#include <stdexcept>

// 生产者函数
void compute_and_fulfill(std::promise<int> p, bool should_throw) {
    try {
        if (should_throw) {
            throw std::runtime_error("Simulating an error in producer.");
        }
        std::this_thread::sleep_for(std::chrono::seconds(1));
        p.set_value(100); // 1. 履行承诺,设置值
    } catch (...) {
        p.set_exception(std::current_exception()); // 2. 或通过设置异常来履行
    }
}

int main() {
    std::promise<int> my_promise;
    std::future<int> my_future = my_promise.get_future(); // 获取关联的 future

    // 将 promise 的所有权移动到新线程
    std::thread t(compute_and_fulfill, std::move(my_promise), false);

    std::cout << "Main thread waiting..." << std::endl;
    int result = my_future.get(); // 阻塞并等待结果
    std::cout << "Result received: " << result << std::endl;

    t.join();
}
函数原型 详细解释
std::future<T> get_future() 获取关联的 future
- 行为: 返回一个与此 promise 的共享状态相关联的 std::future 对象。
- 前置条件: 必须在 set_valueset_exception 之前调用。
- 限制: 每个 promise 对象只能调用一次此函数。再次调用会抛出 std::future_error(错误码 std::future_errc::future_already_retrieved)。
void set_value(const T& value)
void set_value(T&& value)
设置正常结果
- 行为: 将 value 原子地存储到共享状态中,并将状态标记为 ready。这会唤醒任何在关联的 future 上等待的线程。
- 限制: promise 的生命周期内,set_valueset_exception 系列函数总共只能调用一次。再次调用会抛出 std::future_error(错误码 std::future_errc::promise_already_satisfied)。
void set_exception(std::exception_ptr p) 设置异常结果
- 行为: 将一个异常指针原子地存储到共享状态中,并将状态标记为 ready
- 参数: p 通常通过 std::current_exception()catch 块中获取。
void set_value_at_thread_exit(...)
void set_exception_at_thread_exit(...)
在线程退出时履行承诺
- 行为: 这两个函数与 set_value/set_exception 类似,但它们不会立即将共享状态设为 ready。相反,它们会等到当前线程完全退出时才这样做。这对于需要确保所有线程局部存储(thread-local storage)都被销毁后再通知结果的场景非常有用。

析构行为:如果一个 promise 在未被履行(既未 set_value 也未 set_exception)的情况下被析构,它会自动用一个 std::future_error(错误码 std::future_errc::broken_promise)来“履行”承诺,以通知消费者该承诺已被打破。


五、std::packaged_task: 任务与 future 的封装体

std::packaged_task<Signature> 是一个连接可调用对象与 future 的强大中间件。它是一个模板类,其模板参数是一个函数签名(如 int(int, int))。

1、核心职责

  • 封装: 它包装一个可调用对象(函数、lambda等)。
  • 连接: 它内部管理一个 promise,并对外暴露其关联的 future
  • 执行: 当 packaged_task 对象自身被调用时(通过其 operator()),它会执行被包装的可调用对象,并将返回值或异常自动存入其内部的 promise 中。

这使得任务的定义任务的执行可以完全解耦。你可以创建一个任务,获取它的 future,然后将任务本身传递给任何执行上下文(如线程、线程池)。

2、使用格式与成员函数详解

#include <future>
#include <thread>
#include <iostream>
#include <vector>

int task_func(int id) {
    std::cout << "Task " << id << " running on thread " << std::this_thread::get_id() << std::endl;
    return id * id;
}

int main() {
    // 1. 定义一个 packaged_task,包装函数 task_func
    std::packaged_task<int(int)> my_task(task_func);

    // 2. 获取与该任务关联的 future
    std::future<int> my_future = my_task.get_future();

    // 3. 将任务移动到线程中执行
    // 注意:packaged_task 不可拷贝,只能移动
    std::thread t(std::move(my_task), 42);

    // 4. 从 future 获取结果
    std::cout << "Main thread waiting for packaged_task result..." << std::endl;
    std::cout << "Result: " << my_future.get() << std::endl;

    t.join();
}

packaged_task 的主要接口包括:

  • 构造函数: 接受一个可调用对象。
  • get_future(): 与 promise::get_future 行为一致。
  • operator(): 调用此对象会执行其内部的函数。
  • valid(): 检查 packaged_task 是否拥有一个可调用对象。
  • reset(): 重置 packaged_task,使其可以被再次调用(会创建一个新的 promisefuture)。

六、std::async: 最高级的异步调用接口

std::async 是一个函数模板,它提供了一个极其简洁的方式来异步运行一个函数并获取其结果,是迄今为止最高级的抽象。

1、核心职责

  • 以异步方式启动一个可调用对象。
  • 自动处理线程的创建与管理(可能使用线程池)。
  • 返回一个 std::future,该 future 将在任务完成时持有其结果。

2、启动策略 (Launch Policy) - 行为的关键

std::async 的行为由其第一个(可选)参数——启动策略——精确控制。

std::launch Policy 行为描述
std::launch::async 强制异步: 函数 f 必须在一个新的线程上立即开始执行(或者由实现选择线程池中的一个线程)。这是真正的并发执行。
std::launch::deferred 延迟执行: 函数 f 不会立即执行。它将在其返回的 future首次调用 get()wait(),在调用 get()wait() 的那个线程上同步执行。这是一种惰性求值(Lazy Evaluation)。
std::launch::async | std::launch::deferred (默认) 实现定义: 这是默认策略。标准库的实现可以根据当前系统负载等因素,自由选择 asyncdeferred 策略。这带来了灵活性,但也引入了不确定性,通常建议显式指定策略。
示例:策略对比
#include <future>
#include <iostream>
#include <thread>

void print_thread_id(const std::string& policy_name) {
    std::cout << "[" << policy_name << "] Executing on thread: " << std::this_thread::get_id() << std::endl;
}

int main() {
    std::cout << "[Main] Main thread ID: " << std::this_thread::get_id() << std::endl;

    auto fut_async = std::async(std::launch::async, print_thread_id, "async");
    auto fut_deferred = std::async(std::launch::deferred, print_thread_id, "deferred");

    std::cout << "[Main] Waiting for deferred task to be called..." << std::endl;
    // 只有在调用 get() 时,deferred 任务才会在主线程上执行
    fut_deferred.get();
    
    // async 任务已在另一个线程上执行或执行完毕
    fut_async.get();

    return 0;
}

3、关键陷阱:std::async 的析构函数

这是一个至关重要且容易被忽略的特性:如果一个由 std::async 返回的 std::future 在其关联的异步任务尚未完成时被析构,那么这个析构函数将会阻塞,直到任务完成

为什么? 这是为了保证程序不会在 main 函数或其他作用域结束后,留下一个仍在后台运行的“僵尸”线程。它确保了资源的正确回收和异常的传播。

后果是什么? 如果你无意中创建了一个临时的 future,你的异步调用会退化成同步调用。

// 错误示例:异步调用退化为同步
void run_tasks() {
    std::cout << "Starting task..." << std::endl;
    // fut 是一个临时对象,在分号处即被析构
    // 析构函数会阻塞,直到 lambda 执行完毕
    std::async(std::launch::async, [] {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << "Task finished." << std::endl;
    });
    std::cout << "run_tasks function is about to exit." << std::endl; // 这句话会等待 2 秒后才打印
}

// 正确示例:保存 future,延迟阻塞点
void run_tasks_correctly() {
    std::cout << "Starting task..." << std::endl;
    auto fut = std::async(std::launch::async, [] { // ... });
    std::cout << "run_tasks function is about to exit." << std::endl; // 这句话会立即打印
    // ... 在未来的某个点,当你需要结果时 ...
    // fut.get(); // 阻塞发生在这里
}

七、std::shared_future: 一对多的结果广播

std::future 的所有权是独占的,其 get() 只能调用一次。如果需要多个线程等待同一个事件并获取相同的结果,就需要 std::shared_future

  • 创建: 通过 std::future::share() 或从另一个 std::shared_future 拷贝构造。
  • 行为: 它的 get() 方法返回的是 const T& (或 const T* for void),可以被多个线程安全地、多次地调用。
#include <future>
#include <thread>
#include <iostream>
#include <vector>

void waiter(int id, std::shared_future<int> sf) {
    std::cout << "Waiter " << id << " is waiting..." << std::endl;
    int result = sf.get(); // 所有 waiter 都会在这里阻塞
    std::cout << "Waiter " << id << " got result: " << result << std::endl;
}

int main() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::shared_future<int> sf = f.share(); // 创建 shared_future

    std::vector<std::thread> waiters;
    for (int i = 0; i < 5; ++i) {
        waiters.emplace_back(waiter, i, sf); // 传递 shared_future 的副本
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Main thread fulfilling the promise." << std::endl;
    p.set_value(2024); // 一旦设置,所有等待的线程都会被唤醒

    for (auto& t : waiters) {
        t.join();
    }
}

八、选择指南

特性 std::promise + std::thread std::packaged_task std::async
核心用途 手动、精细地控制线程间的值/异常传递 将任务(callable)与 future 绑定,解耦定义与执行 简单、高级的“即发即忘”式异步函数调用
控制粒度 最高。完全控制线程生命周期和值设置时机 中等。控制任务的执行时机和线程 最低。线程管理由库实现,行为由策略决定
复杂度 高。需要手动管理 threadpromise 对象 中等。比 promise 简单,但仍需手动管理执行 低。单行代码即可完成异步调用
推荐场景 复杂的事件驱动模型;一个线程的结果需要由另一个完全不相关的线程设置时 任务队列、线程池实现;需要将任务作为对象传递 大多数常规的异步计算场景;希望将耗时操作移出主线程

九、总结

C++ 的异步任务库提供了一个从低级到高级、层次分明的工具集。std::promisestd::future 构成了底层的通信原语;std::packaged_task 在此之上封装了可调用对象,实现了任务与执行的分离;而 std::async 则提供了最简洁、最易用的顶层接口。深刻理解它们各自的职责、生命周期和微妙的行为差异,特别是 std::async 的析构函数阻塞行为和启动策略,是编写出健壮、高效、可维护的现代 C++ 并发程序的基石。

如果觉得本文对您有所帮助,请点个赞和关注吧,谢谢!!!你的支持就是我持续更新的最大动力


网站公告

今日签到

点亮在社区的每一天
去签到