使用ASIO的协程实现高并发服务器

发布于:2025-06-27 ⋅ 阅读:(22) ⋅ 点赞:(0)

使用ASIO的协程实现高并发服务器

在 C++ 网络编程领域,Asio 库提供了两种主要的异步编程范式:传统的回调模式和基于协程的现代模式,传统的回调模式大家都很清楚,这里不多做介绍,本文主要介绍基于协程的模式,Asio 协程基于 C++20 标准协程,提供了更简洁的异步编程模型。

ASIO协程和回调对比

下面先列举一下ASIO使用回调和使用协程的区别,等看完区别后再讲讲ASIO使用协程的核心关键函数有哪些

让我用一个更生动的定时器示例来展示协程如何优雅地解决回调地狱问题。我们将实现一个简单但常见需求:

每隔3秒打印一次消息,共打印5次。

要实现上面这个功能,用传统的回调实现是很麻烦的,下面是用回调实现的代码(为了更清晰,这里不用lambda表达式):

#include <boost/asio.hpp>
#include <iostream>

namespace asio = boost::asio;

// 前向声明
void timer_callback1(asio::steady_timer* timer, int count);
void timer_callback2(asio::steady_timer* timer, int count);
void timer_callback3(asio::steady_timer* timer, int count);
void timer_callback4(asio::steady_timer* timer, int count);
void timer_callback5(asio::steady_timer* timer, int count);
void final_callback(asio::steady_timer* timer, int count);

// 处理第一次定时器到期
void timer_callback1(asio::steady_timer* timer, int count) {
    std::cout << "回调模式: 第1次打印 - 3秒已过\n";
    timer->expires_after(std::chrono::seconds(3));
    timer->async_wait(
        std::bind(timer_callback2, timer, count + 1)
    );
}

// 处理第二次定时器到期
void timer_callback2(asio::steady_timer* timer, int count) {
    std::cout << "回调模式: 第2次打印 - 3秒已过\n";
    timer->expires_after(std::chrono::seconds(3));
    timer->async_wait(
        std::bind(timer_callback3, timer, count + 1)
    );
}

// 处理第三次定时器到期
void timer_callback3(asio::steady_timer* timer, int count) {
    std::cout << "回调模式: 第3次打印 - 3秒已过\n";
    timer->expires_after(std::chrono::seconds(3));
    timer->async_wait(
        std::bind(timer_callback4, timer, count + 1)
    );
}

// 处理第四次定时器到期
void timer_callback4(asio::steady_timer* timer, int count) {
    std::cout << "回调模式: 第4次打印 - 3秒已过\n";
    timer->expires_after(std::chrono::seconds(3));
    timer->async_wait(
        std::bind(timer_callback5, timer, count + 1)
    );
}

// 处理第五次定时器到期
void timer_callback5(asio::steady_timer* timer, int count) {
    std::cout << "回调模式: 第5次打印 - 3秒已过\n";
    timer->expires_after(std::chrono::seconds(3));
    timer->async_wait(
        std::bind(final_callback, timer, count + 1)
    );
}

// 最终回调
void final_callback(asio::steady_timer* timer, int count) {
    std::cout << "回调模式: 完成" << count << "次打印\n";
    delete timer; // 清理资源
}

// 启动定时器序列
void start_timer_sequence(asio::io_context& io) {
    // 在堆上创建定时器(需要在整个序列中保持存活)
    asio::steady_timer* timer = new asio::steady_timer(io);
    
    // 设置第一次等待
    timer->expires_after(std::chrono::seconds(3));
    timer->async_wait(
        std::bind(timer_callback1, timer, 0)
    );
}

int main() {
    asio::io_context io;
    
    // 启动回调地狱
    start_timer_sequence(io);
    
    io.run();
    return 0;
}

正如上面的例子,每个异步操作需要独立的处理函数,5次操作需要6个函数(5个回调+1个启动函数),逻辑上深层嵌套:start → callback1 → callback2 → ... → final,回调函数形成链式调用,如果业务很复杂,你自己都不知道哪一个回调调用了哪一个,这是非常不友好的。c++11后有了lambda表达式这个情况有所好转,但lambda嵌套lambda看的还是很费劲

上面代码用协程来实现就比较简单了:

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>

namespace asio = boost::asio;

// 协程版 - 单一函数顺序执行
asio::awaitable<void> sequential_operations() {
    asio::steady_timer timer(co_await asio::this_coro::executor);
    
    // 操作1
    std::cout << "协程: 启动操作1\n";
    timer.expires_after(std::chrono::seconds(3));
    co_await timer.async_wait(asio::use_awaitable);
    std::cout << "协程: 操作1完成 - 3秒已过\n";
    
    // 操作2
    std::cout << "协程: 启动操作2\n";
    timer.expires_after(std::chrono::seconds(3));
    co_await timer.async_wait(asio::use_awaitable);
    std::cout << "协程: 操作2完成 - 3秒已过\n";
    
    // 操作3
    std::cout << "协程: 启动操作3\n";
    timer.expires_after(std::chrono::seconds(3));
    co_await timer.async_wait(asio::use_awaitable);
    std::cout << "协程: 操作3完成 - 3秒已过\n";
    
    // 操作4
    std::cout << "协程: 启动操作4\n";
    timer.expires_after(std::chrono::seconds(3));
    co_await timer.async_wait(asio::use_awaitable);
    std::cout << "协程: 操作4完成 - 3秒已过\n";
    
    // 操作5
    std::cout << "协程: 启动操作5\n";
    timer.expires_after(std::chrono::seconds(3));
    co_await timer.async_wait(asio::use_awaitable);
    std::cout << "协程: 操作5完成 - 3秒已过\n";
    
    std::cout << "协程: 所有操作完成\n";
    co_return;
}

int main() {
    asio::io_context io;
    asio::co_spawn(io, sequential_operations(), asio::detached);
    io.run();
    return 0;
}

使用协程可以让整个序列在一个函数中实现,执行流程一目了然,从上到下顺序执行,无嵌套,无跳转,调整操作顺序只需调整代码顺序,另外,错误处理上,一个 try/catch 块覆盖所有操作

协程与传统回调对比

特性 协程 回调
代码结构 顺序执行,类似同步代码 嵌套回调,容易形成"回调地狱"
可读性 高,逻辑清晰 低,跳转复杂
错误处理 使用 try/catch 错误码参数
局部变量 保持状态 需要手动保持状态
控制流 标准控制结构 手动状态管理
资源管理 RAII 自然适用 需要额外注意生命周期

Asio 协程通过 C++20 标准协程提供了更简洁、更易读的异步编程模型,同时保持了高性能特性。掌握 co_awaitco_spawn 等关键机制,可以显著提高网络应用的开发效率和可维护性。

ASIO协程的核心概念和关键字

ASIO协程的关键概念和关键字如下:

1. co_await

  • 作用:挂起当前协程,等待异步操作完成

  • 用法:以asio的读取数据的例子举例,异步读取等到读取到数据后继续执行数据处理,传统的回调是这样实现的:

// 读取完成处理函数
void handle_read(boost::system::error_code ec, size_t n, 
                 std::shared_ptr<std::array<char, 1024>> buffer) {
    if (!ec) {
        std::cout << "读取到 " << n << " 字节数据\n";
    } else {
        std::cerr << "读取错误: " << ec.message() << "\n";
    }
}

// 启动读取操作
void start_read(tcp::socket& socket) {
    // 使用 shared_ptr 管理缓冲区
    auto buffer = std::make_shared<std::array<char, 1024>>();
    
    socket.async_read_some(
        asio::buffer(*buffer),
        std::bind(handle_read, std::placeholders::_1, std::placeholders::_2, buffer)
    );
}

使用协程是这样实现的

asio::awaitable<void> read_data(tcp::socket& socket) {
    char data[1024];
    // 使用 co_await 等待异步读取
    size_t n = co_await socket.async_read_some(
        asio::buffer(data), 
        asio::use_awaitable
    );
    std::cout << "读取到 " << n << " 字节数据\n";
}

协程直接"等待"操作完成,代码线性执行,从这个例子也看到协程的另外一个好处,就是可以使用局部变量来管理缓冲区,因为从语法上看,协程它是在一个函数里执行,缓冲区生命周期不会释放,而回调你只能用成员变量或者堆分配空间了,当然,你用lambda表达式也可以实现类型效果

2. co_return和asio::awaitable

  • 作用co_return类似return结束协程执行并返回值,这个返回值实际是asio::awaitable
//这里T是result对应类型
asio::awaitable<T> calculate_value() {
    co_return result; // 对于有返回值的协程
}

asio::awaitable<void>  do_something() {
    co_return;        // 对于 void 协程
}

下面举例一个具体的例子

//calculate_value函数将延时1秒后返回42
asio::awaitable<int> calculate_value() {
    asio::steady_timer timer(co_await asio::this_coro::executor);
    timer.expires_after(std::chrono::seconds(1));
    co_await timer.async_wait(asio::use_awaitable);
    
    // 使用 co_return 返回值
    co_return 42;
}

协程直接返回结果,类似同步函数,co_return搭配asio::awaitable使用

3. asio::use_awaitable

  • 作用:将异步操作转换为可等待对象,这个作用是告诉asio的函数这是一个协程函数,是为了和异步函数能进行重载区分用的,这个标志能让asio的协程函数和异步回调函数都一样,仅仅是传入的参数不同
//协程版本
asio::awaitable<void> wait_for_timer() {
asio::steady_timer timer(co_await asio::this_coro::executor);

// 使用 use_awaitable 使异步操作可等待
timer.expires_after(std::chrono::seconds(1));
co_await timer.async_wait(asio::use_awaitable);

std::cout << "定时器完成\n";
}

//回调版本
void wait_for_timer(asio::io_context& io) {
    auto timer = std::make_shared<asio::steady_timer>(io, std::chrono::seconds(1));
    
    timer->async_wait(
        [](const asio::error_code& ec) {
            if (!ec) {
                std::cout << "定时器完成\n";
            } else {
                std::cerr << "定时器错误: " << ec.message() << "\n";
            }
        }
    );
}

4. 启动协程 (co_spawn)

  • 作用co_spawn的作用是启动一个协程
asio::co_spawn(
    executor,           // 执行上下文 (io_context/strand)
    coroutine_function, // 协程函数
    completion_handler   // 完成回调
);

completion_handler是完成处理程序选项有两种:

  • asio::detached: 不关心协程结束(最常用)
asio::co_spawn(executor, session(std::move(socket)), asio::detached);
  • 自定义处理:
asio::co_spawn(executor, session(), 
    [](std::exception_ptr e, int result) {
        if (e) std::rethrow_exception(e);
        std::cout << "Result: " << result << "\n";
    });

这里用ASIO回调方式写一个最简单的tcp服务器是这样的:

// 会话状态管理类
class Session : public std::enable_shared_from_this<Session> {
public:
    Session(tcp::socket socket) : socket_(std::move(socket)) {}
    
    void start() {
        start_read();
    }
    
    void start_read() {
        auto self = shared_from_this();
        auto buffer = std::make_shared<std::array<char, 1024>>();
        
        socket_.async_read_some(
            asio::buffer(*buffer),
            std::bind(&Session::handle_read, this, std::placeholders::_1, std::placeholders::_2, buffer)
        );
    }
    
    void handle_read(boost::system::error_code ec, size_t n, 
                    std::shared_ptr<std::array<char, 1024>> buffer) {
        if (!ec) {
            // 处理数据...
            start_read(); // 继续读取
        }
    }
    
private:
    tcp::socket socket_;
};

int main() {
    asio::io_context io;
    tcp::socket socket(io);
    
    // 创建会话并启动
    auto session = std::make_shared<Session>(std::move(socket));
    session->start();
    
    io.run();
}

用协程可以变为这样:

int main() {
    asio::io_context io;
    tcp::socket socket(io);
    
    // 使用 co_spawn 启动协程
    asio::co_spawn(io, session(std::move(socket)), asio::detached);
    
    io.run();
}

// 协程函数
asio::awaitable<void> session(tcp::socket socket) {
    // ...协程逻辑...
    co_return;
}

asio::co_spawn启用一个协程进入session函数,然后等待其完成。

上面例子中asio::detached说明启动这个协程后就不管他的返回了,如果你要获取返回,可以传入一个回调(协程最终还是不能完全避免回调),例如:

// 完成处理函数
void handle_completion(std::exception_ptr e, int result) {
    if (e) {
        try {
            std::rethrow_exception(e);
        } catch (const std::exception& ex) {
            std::cerr << "计算错误: " << ex.what() << "\n";
        }
    } else {
        std::cout << "计算结果: " << result << "\n";
    }
}

// 可能失败的协程
asio::awaitable<int> compute_value() {
    // ...可能抛出异常的计算...
    co_return 42;
}

int main() {
    asio::io_context io;
    
    // 启动协程并指定完成处理函数
    asio::co_spawn(io, compute_value(), handle_completion);
    
    io.run();
}

ASIO协程的其它操作

1. 切换执行上下文

asio::dispatch:协程执行上下文切换,它允许开发者精确控制协程在哪个线程或执行器上运行

// 切换到指定线程的 io_context
co_await asio::dispatch(target_io_context, asio::use_awaitable);

当协程执行到co_await asio::dispatch(...)时:

  • 挂起当前协程:暂停当前执行流程
  • 调度到目标执行器:将协程续体(continuation)提交到目标执行器的队列
  • 在目标上下文恢复:当目标执行器调度该任务时,协程在目标线程恢复执行

例如:网络请求后,有些复杂计算切换到别的线程进行操作:

asio::awaitable<void> process_request() {
    // 在网络线程池处理请求
    Request req = co_await receive_request();
    
    // 切换到计算线程池处理CPU密集型任务
    co_await asio::dispatch(compute_pool, asio::use_awaitable);
    Result res = co_await heavy_computation(req);
    
    // 切回网络线程池发送响应
    co_await asio::dispatch(network_pool, asio::use_awaitable);
    co_await send_response(res);
}

还有就是线程和GUI的切换,GUI线程一般不会和网络请求在一个线程中

asio::awaitable<void> update_ui() {
    Data data = co_await fetch_data();
    
    // 切换到GUI线程更新界面
    co_await asio::dispatch(gui_executor, asio::use_awaitable);
    ui_label.set_text(data.message);
    ui_chart.update(data.values);
}

2. 协程取消

asio::cancellation_signal cancel_signal;

// 启动可取消协程
asio::co_spawn(executor, 
    [](asio::cancellation_signal sig) -> asio::awaitable<void> {
        asio::steady_timer timer(co_await asio::this_coro::executor);
        timer.expires_after(10s);
        
        // 绑定取消信号
        co_await timer.async_wait(
            asio::bind_cancellation_slot(sig.slot(), asio::use_awaitable));
    }(cancel_signal),
    asio::detached
);

// 在需要时取消
cancel_signal.emit(asio::cancellation_type::all);

协程操作符

协程操作符主要是||&&,允许多个协程同时运行,并等待所有(&&)或者某个(||)协程完成。

1.||操作符

下面举个网络编程中最常见的场景之一"带超时的异步读取"来展示协程的操作符,网络编程经常有一个需求,就是你给对方写一个数据,要n秒内等待对方回复,如果对方n秒内不回复,你要重发,最多重发m次,这个逻辑梳理为:

  • 向对方发送请求数据
  • 等待回复,设置超时时间(如3秒)
  • 如果超时未收到回复,重发请求
  • 最多重发M次(如3次)

用协程这样实现的(注意,这里协程的||操作符要include asio/experimental/awaitable_operators.hpp):

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <system_error>
#include <chrono>

namespace asio = boost::asio;
using asio::ip::tcp;
using namespace asio::experimental::awaitable_operators;
namespace chrono = std::chrono;

// 带超时重发的可靠请求
asio::awaitable<bool> reliable_request(
    tcp::socket& socket, 
    const std::string& request,
    int max_retries = 3,
    chrono::seconds timeout = 3s) 
{
    int attempt = 0;
    
    while (attempt <= max_retries) {
        attempt++;
        std::cout << "尝试 #" << attempt << "/" << max_retries << "\n";
        
        try {
            // 发送请求
            co_await asio::async_write(
                socket, asio::buffer(request), asio::use_awaitable);
            
            // 准备接收响应
            std::string response;
            response.resize(1024);
            
            // 设置定时器
            asio::steady_timer timer(co_await asio::this_coro::executor);
            timer.expires_after(timeout);
            
            // 同时等待读取和超时,这里演示了||操作符,这里面有两个协程函数
            auto result = co_await (
                socket.async_read_some(asio::buffer(response), asio::use_awaitable) ||
                timer.async_wait(asio::use_awaitable)
            );
            //只要上面有一个返回结果就会往下执行
            // 处理结果
            if (result.index() == 0) {  // 收到响应
                size_t bytes_read = std::get<0>(result).first;
                response.resize(bytes_read);
                std::cout << "收到响应: " << response << "\n";
                co_return true;
            }
            else {  // 超时
                std::cout << "请求超时, ";
                if (attempt < max_retries) {
                    std::cout << "准备重试...\n";
                } else {
                    std::cout << "已达最大重试次数\n";
                }
            }
        }
        catch (const std::exception& e) {
            std::cerr << "错误: " << e.what() << "\n";
            co_return false;
        }
    }
    
    co_return false;  // 所有尝试失败
}

// 示例使用
asio::awaitable<void> client_session(tcp::socket socket) {
    try {
        // 构造请求数据
        std::string request = "QUERY_DATA";
        
        // 发送可靠请求(最多重试3次,超时3秒)
        bool success = co_await reliable_request(socket, request, 3, 3s);
        
        if (success) {
            std::cout << "请求成功完成\n";
        } else {
            std::cout << "请求失败\n";
        }
    }
    catch (const std::exception& e) {
        std::cerr << "会话错误: " << e.what() << "\n";
    }
}

int main() {
    asio::io_context io;
    
    // 创建并连接socket(示例)
    tcp::socket socket(io);
    // 实际中应连接服务器: socket.connect(endpoint);
    
    // 启动客户端会话
    asio::co_spawn(io, client_session(std::move(socket)), asio::detached);
    
    io.run();
    return 0;
}

上面这个协程实现还是比较清晰的,如果用回调实现,则非常麻烦:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <functional>

namespace asio = boost::asio;
using asio::ip::tcp;
namespace chrono = std::chrono;

// 前向声明
class ReliableRequest;
void handle_write(boost::system::error_code ec, size_t, 
                  std::shared_ptr<ReliableRequest> request);
void handle_read(boost::system::error_code ec, size_t bytes_read,
                 std::shared_ptr<ReliableRequest> request);
void handle_timeout(boost::system::error_code ec,
                    std::shared_ptr<ReliableRequest> request);
void start_request(std::shared_ptr<ReliableRequest> request);

// 请求状态管理类
class ReliableRequest : public std::enable_shared_from_this<ReliableRequest> {
public:
    ReliableRequest(tcp::socket& socket, std::string request, 
                   int max_retries, chrono::seconds timeout)
        : socket_(std::move(socket))
        , request_(std::move(request))
        , max_retries_(max_retries)
        , timeout_(timeout)
        , timer_(socket_.get_executor())
        , attempt_(0)
    {}
    
    void start() {
        attempt_ = 0;
        start_attempt();
    }
    
private:
    friend void handle_write(boost::system::error_code, size_t, 
                            std::shared_ptr<ReliableRequest>);
    friend void handle_read(boost::system::error_code, size_t,
                           std::shared_ptr<ReliableRequest>);
    friend void handle_timeout(boost::system::error_code,
                              std::shared_ptr<ReliableRequest>);
    
    void start_attempt() {
        attempt_++;
        std::cout << "尝试 #" << attempt_ << "/" << max_retries_ << "\n";
        
        auto self = shared_from_this();
        
        // 启动异步写入
        asio::async_write(
            socket_, asio::buffer(request_),
            std::bind(handle_write, std::placeholders::_1, std::placeholders::_2, self)
        );
    }
    
    void start_wait() {
        auto self = shared_from_this();
        response_buffer_ = std::make_shared<std::array<char, 1024>>();
        
        // 启动异步读取
        socket_.async_read_some(
            asio::buffer(*response_buffer_),
            std::bind(handle_read, std::placeholders::_1, std::placeholders::_2, self)
        );
        
        // 启动超时定时器
        timer_.expires_after(timeout_);
        timer_.async_wait(
            std::bind(handle_timeout, std::placeholders::_1, self)
        );
    }
    
    void handle_success(size_t bytes_read) {
        std::string response(response_buffer_->data(), bytes_read);
        std::cout << "收到响应: " << response << "\n";
        if (completion_) completion_(true);
    }
    
    void handle_failure() {
        std::cout << "请求失败\n";
        if (completion_) completion_(false);
    }
    
    void handle_retry() {
        if (attempt_ < max_retries_) {
            std::cout << "准备重试...\n";
            start_attempt();
        } else {
            std::cout << "已达最大重试次数\n";
            handle_failure();
        }
    }

public:
    // 完成回调
    using CompletionHandler = std::function<void(bool)>;
    CompletionHandler completion_;
    
    tcp::socket socket_;
    std::string request_;
    int max_retries_;
    chrono::seconds timeout_;
    asio::steady_timer timer_;
    int attempt_;
    std::shared_ptr<std::array<char, 1024>> response_buffer_;
};

// 写入完成处理
void handle_write(boost::system::error_code ec, size_t, 
                 std::shared_ptr<ReliableRequest> request) {
    if (ec) {
        std::cerr << "写入错误: " << ec.message() << "\n";
        request->handle_failure();
        return;
    }
    
    request->start_wait();
}

// 读取完成处理
void handle_read(boost::system::error_code ec, size_t bytes_read,
                std::shared_ptr<ReliableRequest> request) {
    // 取消定时器
    request->timer_.cancel();
    
    if (ec == asio::error::operation_aborted) {
        return; // 超时已处理
    }
    
    if (ec) {
        std::cerr << "读取错误: " << ec.message() << "\n";
        request->handle_retry();
        return;
    }
    
    request->handle_success(bytes_read);
}

// 超时处理
void handle_timeout(boost::system::error_code ec,
                   std::shared_ptr<ReliableRequest> request) {
    if (ec == asio::error::operation_aborted) {
        return; // 读取已完成
    }
    
    if (ec) {
        std::cerr << "定时器错误: " << ec.message() << "\n";
        request->handle_retry();
        return;
    }
    
    // 取消读取操作
    request->socket_.cancel();
    
    std::cout << "请求超时, ";
    request->handle_retry();
}

// 启动可靠请求
void start_reliable_request(tcp::socket socket, std::string request,
                           ReliableRequest::CompletionHandler completion,
                           int max_retries = 3, chrono::seconds timeout = 3s) {
    auto request_obj = std::make_shared<ReliableRequest>(
        std::move(socket), std::move(request), max_retries, timeout
    );
    request_obj->completion_ = completion;
    request_obj->start();
}

// 示例使用
void handle_request_complete(bool success) {
    if (success) {
        std::cout << "请求成功完成\n";
    } else {
        std::cout << "请求失败\n";
    }
}

int main() {
    asio::io_context io;
    
    // 创建socket(示例)
    tcp::socket socket(io);
    // 实际中应连接服务器
    
    // 启动可靠请求
    start_reliable_request(
        std::move(socket), 
        "QUERY_DATA", 
        handle_request_complete,
        3, 
        chrono::seconds(3)
    );
    
    io.run();
    return 0;
}

2. &&操作符

&&操作符允许你同时启动多个异步操作,等待所有操作完成,以元组形式获取所有结果

auto [result1, result2] = co_await (
    async_op1(use_awaitable) &&
    async_op2(use_awaitable)
);

例如要多个接口结果执行完成后才返回,那么久可以用&&操作符

asio::awaitable<SearchResults> search_products(string_view query) {
    // 并行搜索多个分类
    auto [electronics, clothing, books] = co_await (
        product_service.search("electronics", query) &&
        product_service.search("clothing", query) &&
        product_service.search("books", query)
    );
    ...
    //合并到results
    co_return results;
}

总结

Asio协程方便了C++高并发服务器的开发,在保持异步高性能的同时,提供同步代码的简洁性,同步执行流简化调试和性能分析,随着C++20标准的广泛采用,协程已成为构建下一代高性能服务器的首选范式。

但要注意的是,协程并不会提高程序运行的效率,从原理上讲,协程有可能还没有回调高效,但这个性能并不会有太大的差距


网站公告

今日签到

点亮在社区的每一天
去签到