探究C++20协程(5)——基于挂起实现无阻塞的定时器

发布于:2024-04-25 ⋅ 阅读:(28) ⋅ 点赞:(0)

实现目标

当用传统的线程 sleep 函数来让程序等待时,实际上是在阻塞当前线程。阻塞意味着这个线程在指定的时间(例如100毫秒)内无法执行任何其他任务。这种方式虽然简单,但效率低下,因为它导致CPU资源在等待期间未被充分利用。

协程提供了一种更加高效的方式来处理这种等待情况。它们在单个线程内部执行,并且能够在不阻塞线程的情况下挂起和恢复。当一个协程遇到需要等待的操作(如 sleep)时,它会挂起自身,而不会阻塞所在的线程。这使得线程可以转而去执行其他的协程。

这种挂起和恢复的过程是由协程调度器管理的。在协程 sleep 的这100毫秒期间,调度器可以安排其他协程运行,从而充分利用CPU资源。等到等待时间结束后,原来挂起的协程会被自动恢复执行。这样可以在不牺牲响应性的前提下,更有效地管理和利用系统资源。

实现效果如下,直接使用co_wait让协程无阻塞等待1s

Task<int, AsyncExecutor> simple_task2() {
  debug("task 2 start ...");
  using namespace std::chrono_literals;
  // 之前的写法,用 sleep_for 让当前线程睡眠 1 秒
  // std::this_thread::sleep_for(1s);
  // 等待 1 秒,注意 1s 是 chrono_literals 的字面值写法
  co_await 1s;
  debug("task 2 returns after 1s.");
  co_return 2;
}

这里的1s 实际上这是 C++ 11 对字面值的一种支持,本质上就是一个运算符重载,类型是 duration< long long >。除了秒以外,时间的单位也可以是毫秒、纳秒、分钟、小时等等,这些 C++ 11 的 duration 都已经提供了完善的支持,因此只要对 duration 做支持即可。

为 duration 实现 await_transform

template<typename ResultType, typename Executor>
struct TaskPromise {
  ...
  template<typename _Rep, typename _Period>
  SleepAwaiter await_transform(std::chrono::duration<_Rep, _Period> &&duration) {
    return SleepAwaiter(&executor, std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
  }
  ...
}

需要在TaskPromise里引入了一个新的类型 SleepAwaiter,它的任务有两个:

  • 确保当前协程在若干毫秒之后恢复执行。
  • 确保当前协程恢复执行时要调度到对应的调度器上。
struct SleepAwaiter {

  explicit SleepAwaiter(AbstractExecutor *executor, long long duration) noexcept
      : _executor(executor), _duration(duration) {}

  bool await_ready() const { return false; }

  void await_suspend(std::coroutine_handle<> handle) const {
    // 自定义的延时执行工具类,全局只需要一个实例
    static Scheduler scheduler;

    scheduler.execute([this, handle]() {
      // _duration 毫秒之后执行下面的代码
      _executor->execute([handle]() {
        handle.resume();
      });
    }, _duration);
  }

  void await_resume() {}

 private:
  AbstractExecutor *_executor;
  long long _duration;
}

最重要的是await_suspend:这个函数会被调用来暂停协程。它使用一个静态的Scheduler对象,调度一个延迟任务。这个任务在指定的_duration毫秒后由执行器_executor恢复协程。

这当中最为关键的就是 Scheduler 的实现了,这个类实际上本身就是一个独立的定时任务调度器。

定时任务调度器 Scheduler

Scheduler是一个管理和执行定时任务的单例类。它可以安排任务在指定时间后执行。为了有效管理这些任务,使用优先级队列来存储待执行的任务,这些任务按照执行时间的优先级进行排序。

定时任务的描述类型

为了方便管理定时任务,我们需要定义一个类型 DelayedExecutable。这是一个用于描述定时任务的类。它包含一个将要执行的函数和该函数的计划执行时间(绝对时间)。该类通过以下方式计算任务的计划执行时间:

  • 获取当前时间。
  • 根据提供的延迟(以毫秒为单位)计算出未来的绝对执行时间。
class DelayedExecutable {
 public:
  DelayedExecutable(std::function<void()> &&func, long long delay) : func(std::move(func)) {
    using namespace std;
    using namespace std::chrono;
    auto now = system_clock::now();
    // 当前的时间戳,单位毫秒
    auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();
    // 计算出任务的计划执行时间
    scheduled_time = current + delay;
  }

  // 调用时,返回从当前时间还需要多少毫秒到任务执行时间
  long long delay() const {
    using namespace std;
    using namespace std::chrono;
    auto now = system_clock::now();
    auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();
    return scheduled_time - current;
  }

  long long get_scheduled_time() const {
    return scheduled_time;
  }

  void operator()() {
    func();
  }

 private:
  long long scheduled_time;
  std::function<void()> func;
};

为了将 DelayedExecutable 存入优先级队列当中,我们还需要给它提给一个比较大小的类:

class DelayedExecutableCompare {
 public:
  bool operator()(DelayedExecutable &left, DelayedExecutable &right) {
    return left.get_scheduled_time() > right.get_scheduled_time();
  }
};

这个类就很简单了,直接将对 DelayedExecutable 的比较转换成对它们的执行时间的比较。使用这个类对 DelayedExecutable 进行排序时,会使得时间靠前的对象排到前面。

定时任务调度器

Scheduler类使用了多线程和条件变量来管理和执行任务。它的工作原理如下:

  • 维护一个优先级队列,队列中的任务按执行时间排序。
  • 使用一个工作线程来检查队列并执行任务。
  • 如果队列为空,工作线程会等待新任务的到来。
  • 如果当前时间未到队列头部任务的执行时间,工作线程将等待直到那个时间。
  • 当任务的执行时间到达,从队列中取出任务(恢复协程)并执行。
class Scheduler {
 private:
  std::condition_variable queue_condition;
  std::mutex queue_lock;
  // 注意这里改用优先级队列
  std::priority_queue<DelayedExecutable, std::vector<DelayedExecutable>, DelayedExecutableCompare> executable_queue;

  std::atomic<bool> is_active;
  std::thread work_thread;

  void run_loop() {
    while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {
      std::unique_lock lock(queue_lock);
      if (executable_queue.empty()) {
        queue_condition.wait(lock);
        if (executable_queue.empty()) {
          continue;
        }
      }

      // 从这里开始于 LooperExecutor 不同,这里需要判断优先级队头的任务,也就是最先要执行的任务是否需要立即执行
      auto executable = executable_queue.top();
      long long delay = executable.delay();
      if (delay > 0) {
        // 队头的任务还没到执行时间,等待 delay 毫秒
        auto status = queue_condition.wait_for(lock, std::chrono::milliseconds(delay));
        // 如果等待期间没有延时比 delay 更小的任务加入,这里就会返回 timeout
        if (status != std::cv_status::timeout) {
          // 不是 timeout,需要重新计算队头的延时
          continue;
        }
      }
      executable_queue.pop();
      lock.unlock();
      executable();
    }
  }
 public:

  Scheduler() {
    ... // 与 LooperExecutor 完全相同
  }

  ~Scheduler() {
    ... // 与 LooperExecutor 完全相同
  }

  void execute(std::function<void()> &&func, long long delay) {
    delay = delay < 0 ? 0 : delay;
    std::unique_lock lock(queue_lock);
    if (is_active.load(std::memory_order_relaxed)) {
      // 只有队列为空或者比当前队头任务的延时更小时,需要调用 notify_one
      // 其他情况只需要按顺序依次执行即可
      bool need_notify = executable_queue.empty() || executable_queue.top().delay() > delay;
      executable_queue.push(DelayedExecutable(std::move(func), delay));
      lock.unlock();
      if (need_notify) {
        queue_condition.notify_one();
      }
    }
  }

  void shutdown(bool wait_for_complete = true) {
    ... // 与 LooperExecutor 完全相同
  }

  void join() {
    if (work_thread.joinable()) {
      work_thread.join();
    }
  }
};

关于阻塞的说明

虽然Scheduler的实现在等待任务执行时会阻塞一个线程,这种方式比阻塞多个线程要高效得多。在实际应用中,这可以显著减少系统资源的占用,提高线程的利用率。通过这种方式,即使有多个协程或任务需要延时执行,只需要阻塞一个专门的调度线程而不是每个协程对应的线程。

结果展示

在这里插入图片描述

任务开始执行

  • 51:59.226:任务(ID:7900)开始执行。
  • 51:59.329:100毫秒后,任务(ID:7900)记录了一个时间点,显示从开始到现在大约过去了103毫秒。
    第二任务开始
  • 51:59.331:另一个任务(ID:38916)开始执行。
  • 52:00.339:大约一秒后,任务(ID:38916)完成。时间点从开始(51:59.331)到结束(52:00.339)约为1.008秒,符合预期的一秒内。
    任务返回结果
  • 52:00.340:任务(ID:7900)从任务2获得结果(返回值:2)。
    第三个时间点和第三任务开始
  • 52:00.854:500毫秒后,任务(ID:7900)记录另一个时间点,从51:59.329到52:00.854约为1.525秒。
  • 52:00.855:接着,第三个任务(ID:38248)开始。
    第三任务结束
  • 52:02.869:第三个任务(ID:23916)完成,历时约2秒,从52:00.855到52:02.869。
    任务返回结果并结束
  • 52:02.872:任务(ID:7900)从任务3获得结果(返回值:3)。
  • 52:02.873:任务(ID:7900)完成,总结果是6。
  • 52:02.874:几乎同时,从另一个获取点(ID:8524)也报告任务完成,结果同样是6。
    结束运行循环
  • 52:02.874:任务(ID:7900)退出运行循环。
  • 52:02.876:另一个任务(ID:11984)也退出运行循环。

通过上述分析,我们可以看到每个任务的开始和结束时间记录非常明确,并且与预期执行时间相匹配:

100毫秒的任务实际耗时约103毫秒。
1秒的任务实际耗时约1.008秒。
500毫秒后记录的时间与预期相符。
2秒的任务实际耗时约2.014秒。

完整代码

#define __cpp_lib_coroutine
#define  _CRT_SECURE_NO_WARNINGS
#include <coroutine>
#include <exception>
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <list>
#include <optional>
#include <cassert>
#include <queue>
#include <future>
#include <chrono>
#include <ctime>
using namespace std;
void print_time() {
    // 获取当前时间点
    auto now = std::chrono::system_clock::now();
    // 转换为time_t类型
    auto now_c = std::chrono::system_clock::to_time_t(now);
    // 获取本地时间
    auto local_time = std::localtime(&now_c);
    // 输出当前的分钟和秒(不输出年、月、日和小时)
    std::cout << "Current time: ";
    std::cout << std::setfill('0') << std::setw(2) << local_time->tm_min << ":";  // 分钟
    std::cout << std::setfill('0') << std::setw(2) << local_time->tm_sec;        // 秒

    // 处理毫秒部分
    auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());
    auto sec_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::seconds(now_c));
    std::cout << "." << std::setfill('0') << std::setw(3) << (now_ms.count() % 1000) << std::endl;  // 毫秒
}
void debug(const std::string& s) {
    print_time();
    printf(" %d %s\n",std::this_thread::get_id(), s.c_str());
}

void debug(const std::string& s, int x) {
    print_time();
    printf("%d %s %d\n", std::this_thread::get_id(), s.c_str(), x);
}
// 定时器
class DelayedExecutable {
public:
    DelayedExecutable(std::function<void()>&& func, long long delay) : func(std::move(func)) {
        using namespace std;
        using namespace std::chrono;
        auto now = system_clock::now();
        auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();

        scheduled_time = current + delay;
    }

    long long delay() const {
        using namespace std;
        using namespace std::chrono;

        auto now = system_clock::now();
        auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();
        return scheduled_time - current;
    }

    long long get_scheduled_time() const {
        return scheduled_time;
    }

    void operator()() {
        func();
    }

private:
    long long scheduled_time;
    std::function<void()> func;
};

class DelayedExecutableCompare {
public:
    bool operator()(DelayedExecutable& left, DelayedExecutable& right) {
        return left.get_scheduled_time() > right.get_scheduled_time();
    }
};

class Scheduler {
private:
    std::condition_variable queue_condition;
    std::mutex queue_lock;
    std::priority_queue<DelayedExecutable, std::vector<DelayedExecutable>, DelayedExecutableCompare> executable_queue;

    std::atomic<bool> is_active;
    std::thread work_thread;

    void run_loop() {
        while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {
            std::unique_lock lock(queue_lock);
            if (executable_queue.empty()) {
                queue_condition.wait(lock);
                if (executable_queue.empty()) {
                    continue;
                }
            }
            auto executable = executable_queue.top();
            long long delay = executable.delay();
            if (delay > 0) {
                auto status = queue_condition.wait_for(lock, std::chrono::milliseconds(delay));
                if (status != std::cv_status::timeout) {
                    // a new executable should be executed before.
                    continue;
                }
            }
            executable_queue.pop();
            lock.unlock();
            executable();
        }
        debug("run_loop exit.");
    }
public:

    Scheduler() {
        is_active.store(true, std::memory_order_relaxed);
        work_thread = std::thread(&Scheduler::run_loop, this);
    }

    ~Scheduler() {
        shutdown(false);
        join();
    }

    void execute(std::function<void()>&& func, long long delay) {
        delay = delay < 0 ? 0 : delay;
        std::unique_lock lock(queue_lock);
        if (is_active.load(std::memory_order_relaxed)) {
            bool need_notify = executable_queue.empty() || executable_queue.top().delay() > delay;
            executable_queue.push(DelayedExecutable(std::move(func), delay));
            lock.unlock();
            if (need_notify) {
                queue_condition.notify_one();
            }
        }
    }

    void shutdown(bool wait_for_complete = true) {
        is_active.store(false, std::memory_order_relaxed);
        if (!wait_for_complete) {
            // clear queue.
            std::unique_lock lock(queue_lock);
            decltype(executable_queue) empty_queue;
            std::swap(executable_queue, empty_queue);
            lock.unlock();
        }

        queue_condition.notify_all();
    }

    void join() {
        if (work_thread.joinable()) {
            work_thread.join();
        }
    }
};

// 调度器
class AbstractExecutor {
public:
    virtual void execute(std::function<void()>&& func) = 0;
};

class NoopExecutor : public AbstractExecutor {
public:
    void execute(std::function<void()>&& func) override {
        func();
    }
};

class NewThreadExecutor : public AbstractExecutor {
public:
    void execute(std::function<void()>&& func) override {
        std::thread(func).detach();
    }
};

class AsyncExecutor : public AbstractExecutor {
public:
    void execute(std::function<void()>&& func) override {
        auto future = std::async(func);
    }
};

class LooperExecutor : public AbstractExecutor {
private:
    std::condition_variable queue_condition;
    std::mutex queue_lock;
    std::queue<std::function<void()>> executable_queue;

    std::atomic<bool> is_active;
    std::thread work_thread;

    void run_loop() {
        while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {
            std::unique_lock lock(queue_lock);
            if (executable_queue.empty()) {
                queue_condition.wait(lock);
                if (executable_queue.empty()) {
                    continue;
                }
            }
            auto func = executable_queue.front();
            executable_queue.pop();
            lock.unlock();

            func();
        }
        debug("run_loop exit.");
    }

public:

    LooperExecutor() {
        is_active.store(true, std::memory_order_relaxed);
        work_thread = std::thread(&LooperExecutor::run_loop, this);
    }

    ~LooperExecutor() {
        shutdown(false);
        if (work_thread.joinable()) {
            work_thread.join();
        }
    }

    void execute(std::function<void()>&& func) override {
        std::unique_lock lock(queue_lock);
        if (is_active.load(std::memory_order_relaxed)) {
            executable_queue.push(func);
            lock.unlock();
            queue_condition.notify_one();
        }
    }

    void shutdown(bool wait_for_complete = true) {
        is_active.store(false, std::memory_order_relaxed);
        if (!wait_for_complete) {
            // clear queue.
            std::unique_lock lock(queue_lock);
            decltype(executable_queue) empty_queue;
            std::swap(executable_queue, empty_queue);
            lock.unlock();
        }

        queue_condition.notify_all();
    }
};

template<typename T>
struct Result
{
    explicit Result() = default;
    explicit Result(T&& value) : _value(value) {}
    explicit Result(std::exception_ptr&& exception_ptr) : _exception_ptr(exception_ptr) {}
    T get_or_throw() {
        if (_exception_ptr) {
            std::rethrow_exception(_exception_ptr);
        }
        return _value;
    }
private:
    T _value;
    std::exception_ptr _exception_ptr;
};
// 用于协程initial_suspend()时直接将运行逻辑切入调度器的等待体
struct DispatchAwaiter {

    explicit DispatchAwaiter(AbstractExecutor* executor) noexcept
        : _executor(executor) {}

    bool await_ready() const { return false; }

    void await_suspend(std::coroutine_handle<> handle) const {
        _executor->execute([handle]() {
            handle.resume();
            });
    }

    void await_resume() {}

private:
    AbstractExecutor* _executor;
};

//对于时间的等待体
struct SleepAwaiter {

    explicit SleepAwaiter(AbstractExecutor* executor, long long duration) noexcept
        : _executor(executor), _duration(duration) {}

    bool await_ready() const { return false; }

    void await_suspend(std::coroutine_handle<> handle) const {
        static Scheduler scheduler;

        scheduler.execute([this, handle]() {
            _executor->execute([handle]() {
                handle.resume();
                });
            }, _duration);
    }

    void await_resume() {}

private:
    AbstractExecutor* _executor;
    long long _duration;
};

// 前向声明
template<typename ResultType, typename Executor>
struct Task;

template<typename Result, typename Executor>
struct TaskAwaiter {
    explicit TaskAwaiter(AbstractExecutor* executor, Task<Result, Executor>&& task) noexcept
        : _executor(executor), task(std::move(task)) {}

    TaskAwaiter(TaskAwaiter&& completion) noexcept
        : _executor(completion._executor), task(std::exchange(completion.task, {})) {}

    TaskAwaiter(TaskAwaiter&) = delete;

    TaskAwaiter& operator=(TaskAwaiter&) = delete;

    constexpr bool await_ready() const noexcept {
        return false;
    }
    // 在这里增加了调度器的运行
    void await_suspend(std::coroutine_handle<> handle) noexcept {
        task.finally([handle, this]() {
            _executor->execute([handle]() {
                handle.resume();
                });
            });
    }

    Result await_resume() noexcept {
        return task.get_result();
    }

private:
    Task<Result, Executor> task;
    AbstractExecutor* _executor;
};

// 对应修改增加调度器的传入
template<typename ResultType,typename Executor>
struct TaskPromise {
    //此时调度器将开始调度,执行的逻辑
    DispatchAwaiter initial_suspend() { return DispatchAwaiter(&executor); }

    std::suspend_always final_suspend() noexcept { return {}; }

    Task<ResultType, Executor> get_return_object() {
        return Task{ std::coroutine_handle<TaskPromise>::from_promise(*this) };
    }
    //在这里返回等待器对象时需要将调度器的指针带上
    template<typename _ResultType, typename _Executor>
    TaskAwaiter<_ResultType, _Executor> await_transform(Task<_ResultType, _Executor>&& task) {
        return TaskAwaiter<_ResultType, _Executor>(&executor, std::move(task));
    }

    template<typename _Rep, typename _Period>
    SleepAwaiter await_transform(std::chrono::duration<_Rep, _Period>&& duration) {
        return SleepAwaiter(&executor, std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
    }

    void unhandled_exception() {
        std::lock_guard lock(completion_lock);
        result = Result<ResultType>(std::current_exception());
        completion.notify_all();
        notify_callbacks();
    }

    void return_value(ResultType value) {
        std::lock_guard lock(completion_lock);
        result = Result<ResultType>(std::move(value));
        completion.notify_all();
        notify_callbacks();
    }

    ResultType get_result() {
        std::unique_lock lock(completion_lock);
        if (!result.has_value()) {
            completion.wait(lock);
        }
        return result->get_or_throw();
    }

    void on_completed(std::function<void(Result<ResultType>)>&& func) {
        std::unique_lock lock(completion_lock);
        if (result.has_value()) {
            auto value = result.value();
            lock.unlock();
            func(value);
        }
        else {
            completion_callbacks.push_back(func);
        }
    }

private:
    std::optional<Result<ResultType>> result;
    Executor executor;
    std::mutex completion_lock;
    std::condition_variable completion;

    std::list<std::function<void(Result<ResultType>)>> completion_callbacks;

    void notify_callbacks() {
        auto value = result.value();
        for (auto& callback : completion_callbacks) {
            callback(value);
        }
        completion_callbacks.clear();
    }

};

template<typename ResultType,typename Executor = NewThreadExecutor>
struct Task {

    using promise_type = TaskPromise<ResultType, Executor>;

    ResultType get_result() {
        return handle.promise().get_result();
    }

    Task& then(std::function<void(ResultType)>&& func) {
        handle.promise().on_completed([func](auto result) {
            try {
                func(result.get_or_throw());
            }
            catch (std::exception& e) {
                // ignore.
            }
            });
        return *this;
    }

    Task& catching(std::function<void(std::exception&)>&& func) {
        handle.promise().on_completed([func](auto result) {
            try {
                result.get_or_throw();
            }
            catch (std::exception& e) {
                func(e);
            }
            });
        return *this;
    }

    Task& finally(std::function<void()>&& func) {
        handle.promise().on_completed([func](auto result) { func(); });
        return *this;
    }

    explicit Task(std::coroutine_handle<promise_type> handle) noexcept : handle(handle) {}

    Task(Task&& task) noexcept : handle(std::exchange(task.handle, {})) {}

    Task(Task&) = delete;

    Task& operator=(Task&) = delete;

    ~Task() {
        if (handle) handle.destroy();
    }

private:
    std::coroutine_handle<promise_type> handle;
};

Task<int, AsyncExecutor> simple_task2() {
    debug("task 2 start ...");
    using namespace std::chrono_literals;
    co_await 1s;
    debug("task 2 returns after 1s.");
    co_return 2;
}

Task<int, NewThreadExecutor> simple_task3() {
    debug("in task 3 start ...");
    using namespace std::chrono_literals;
    co_await 2s;
    debug("task 3 returns after 2s.");
    co_return 3;
}

Task<int, LooperExecutor> simple_task() {
    debug("task start ...");
    using namespace std::chrono_literals;
    co_await 100ms;
    debug("after 100ms ...");
    auto result2 = co_await simple_task2();
    debug("returns from task2: ", result2);

    co_await 500ms;
    debug("after 500ms ...");
    auto result3 = co_await simple_task3();
    debug("returns from task3: ", result3);
    co_return 1 + result2 + result3;
}


int main() {
    auto simpleTask = simple_task();
    simpleTask.then([](int i) {
        debug("simple task end: ", i);
        }).catching([](std::exception& e) {
            //debug("error occurred", e.what());
        });
    try {
        auto i = simpleTask.get_result();
        debug("simple task end from get: ", i);
    }
    catch (std::exception& e) {
        //debug("error: ", e.what());
    }
    return 0;
}