ASIO 避坑指南:高效、安全与稳健的异步网络编程

发布于:2025-06-27 ⋅ 阅读:(17) ⋅ 点赞:(0)

ASIO 避坑指南:高效、安全与稳健的异步网络编程

引言

ASIO是很强大的一个异步io库,但服务器除了高效以外,稳定也是关键,这篇文章主要总结了ASIO使用遇到的典型问题和坑:

  • 如何榨干io_context的性能,让CPU和网卡持续饱和工作?
  • 如何安全地关闭一个连接,避免资源泄漏或程序崩溃?(特别是当异步操作还在进行时)
  • 如何正确地实现一个异步写操作,确保数据完整发送且内存安全?
  • 如何管理跨越多个异步操作的对象生命周期
  • 如何设计缓冲区(Buffer) 才能避免悬垂指针或数据竞争?
  • 如何在多线程环境下安全地操作共享资源?
  • 如何处理错误码,哪些错误码要特殊处理,例如operation_abortedeof

上面这些问题处理不好,轻则导致性能低下、资源泄漏,重则引发程序崩溃、数据错误,是基于ASIO开发服务器必须要了解清楚的点。


一、最大化利用 I/O (榨干 io_context 的性能)

很多人包括很多博客对asio的多线程操作仅仅照搬例子进行介绍,没有真正的线上实战,asio多线程有两种方法:

  1. 单一 I/O 上下文 + 多工作线程

    • io_context 实例
    • 多个线程调用 io_context.run()
    • 事件分发机制:操作系统将就绪事件分配给不同工作线程执行
  2. 多 I/O 上下文 (io_context per Thread)

    • 每个线程独占一个 io_context 实例
    • 每个线程调用自己 io_context.run()
    • 资源隔离:Socket/Timer 绑定到特定线程的 io_context

io_context 的核心是事件循环。一个线程调用 run() 通常足以高效处理数千连接,如果多个线程都执行 run() ,那么事件会分配到多个线程中执行,这样你首先要考虑的是线程安全,每个回调如果调用了共享资源都需要枷锁,这会降低运行效率。

单一 I/O 上下文 + 多工作线程

// 创建线程池执行同一个io_context
asio::io_context io;
asio::thread_pool pool(4); // 4个线程

// 多个线程执行同一个io_context的run()
for(int i=0; i<4; ++i){
    asio::post(pool, [&]{ io.run(); });
}

// 注意:所有Handler可能在不同线程执行!
socket.async_read_some(..., [](...){
    // 需要线程同步!可能被任意线程执行
});

优势

  • 最佳负载均衡:内核自动分配事件到空闲线程
  • 简化资源管理:所有操作共享单一I/O上下文

劣势

  • 锁竞争开销:共享资源访问需要同步,抵消多线程收益

ASIO针对这种情况提供了 strand 进行序列化访问

例如:

// 创建strand绑定到io_context
asio::strand<asio::io_context::executor_type> my_strand = 
    asio::make_strand(io.get_executor());

// 通过strand分发处理程序
socket.async_read_some(asio::bind_executor(my_strand, 
    [](...){
        // 保证同一strand上的handler不会并发执行
        connections.erase(id); // 无需锁!
    }
));

每线程独立 I/O 上下文 (io_context per Thread)

这是推荐做法,经过本人验证,能极大提高并发处理能力

// 每个线程拥有独立io_context
std::vector<std::unique_ptr<asio::io_context>> io_contexts;
std::vector<std::thread> threads;

for(int i=0; i<4; ++i){
    io_contexts.emplace_back(std::make_unique<asio::io_context>());
    threads.emplace_back([ioc=io_contexts.back().get()]{
        ioc->run(); // 每个线程运行自己的io_context
    });
}

// 连接绑定到特定io_context
auto& io = *io_contexts[connection_id % 4];
tcp::socket socket(io);

优势

  • 处理程序始终在同一线程执行,避免线程切换开销
  • 能更大程度发挥单个io的性能

保证异步操作链的持续

一个异步操作完成时,在其完成处理函数 (Completion Handler) 中发起下一个异步操作(如 async_read 后发起 async_write,或继续 async_read),这样可以保持 I/O 通道持续忙碌,避免轮询

要注意的是,一定要避免在 回调 中做耗时同步操作阻塞事件循环。

高效 Buffer 管理

asio::buffer 是视图: 它不拥有数据,只是指向现有内存块的引用,处理不当会导致野指针、数据损坏或程序崩溃。底层数据必须在异步操作期间保持有效!

绝对要避免使用栈分配的内容做作为 Buffer,例如下面这个就是典型的错误:

//错误示范
void do_async_write(tcp::socket& socket) {
    char buffer[1024]; // 错误:栈分配缓冲区
    generate_data(buffer, 1024); // 填充数据
    
    // 异步写操作 - 缓冲区可能在函数返回后失效!
    socket.async_write_some(asio::buffer(buffer, 1024),
        [](const asio::error_code& ec, size_t bytes) {
            // 此时原buffer栈帧已销毁 - 野指针访问!
        });
} // 函数退出,栈缓冲区被销毁!

除非你用的是协程模式,否则不要用栈分配内存做 Buffer,因为异步操作结束后,栈内存会被回收,Buffer 就会变成无效的,过一段时间在执行回调你的buffer里面就是野指针,因此要严格保证 async_read/async_write 使用的 buffer 底层内存在其整个操作期间(从调用开始到 Handler 执行结束)有效且不被修改

你应该使用智能指针来分配缓冲,并让这个智能指针跟随回调函数,直至回调函数结束,典型的就是让lambda把这个智能指针捕获,让它跟着回调函数的生命周期。

void send_large_data(tcp::socket& socket) {
    // 使用shared_ptr管理堆缓冲区
    auto buf = std::make_shared<std::vector<char>>(generate_large_data());
    
    asio::async_write(socket, asio::buffer(*buf),
        // 捕获智能指针延长生命周期
        [buf](const asio::error_code& ec, size_t) {
            // 缓冲区在lambda销毁前保持有效
        });
}

或者作为session的成员变量

class Connection : public std::enable_shared_from_this<Connection> {
    std::array<char, 8192> buffer_; // 成员缓冲区
    
    void start_read() {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(buffer_),
            [self](const asio::error_code& ec, size_t length) {
                if (!ec) self->process_data(length);
            });
    }
};

如果是linux系统,还可以用零拷贝缓冲区注册方法,让io和回调都操作这个缓冲区,从而避免了数据拷贝。

// 注册持久内存到io_context
auto buf = std::make_shared<std::array<char, 4096>>();
asio::io_context& ioc = socket.get_executor().context();

// 显式注册缓冲区(Linux专属优化)
const bool registered = asio::register_buffer(ioc, 
    asio::buffer(*buf), asio::buffer_registration::permanent);

socket.async_read_some(asio::buffer(*buf),
    [buf](const asio::error_code& ec, size_t bytes) {
        // 缓冲区保持注册状态
    });

这种尤其适合高频小包数据的处理

安全关闭 Socket 和连接

关闭是异步编程中最容易出资源泄漏或崩溃的地方。关闭做的不好,会出现如下问题:

  • 资源泄漏(文件描述符、内存)
  • 大量CLOSE_WAIT状态连接
  • 程序崩溃(访问已销毁对象)
  • 数据丢失(未发送完的数据)

Socket的关闭有shutdown()close()两个行数

socket.shutdown

shutdown可以理解为是关闭通知,有三种模式(shutdown_receive, shutdown_send, shutdown_both),通知对端“我不会再发数据了”(shutdown_send)或“我不想再收数据了”(shutdown_receive)。

shutdown执行后,后续的 async_read 会立即完成并返回 asio::error::shut_down (如果接收端关闭),后续的 async_write 会立即完成并返回 asio::error::shut_down (如果发送端关闭)。

需要注意的是,shutdown()后,Socket 描述符依然有效。

socket.close

socket.close会释放系统资源(Socket 描述符),它会隐式地执行 shutdown(both)。任何挂起(Pending)的异步操作(async_read, async_write, async_connect 等)会立即取消,它们的回调函数会被调用,并传入 asio::error::operation_aborted 错误码。

因此,在读写回调中,遇到asio::error::operation_aborted 错误码要特殊处理,避免重复关闭

回调函数设计时,应检查错误码,如果是 operation_aborted,通常意味着 Socket 正在被关闭/销毁,回调函数应该:

  • 忽略这个操作的结果。
  • 清理相关的资源(如释放为这次操作分配的 Buffer)。
  • 避免再访问Socket

当调用 socket.close() 取消操作时,包含 socket 的对象(如 connection)可能正在被销毁

安全关闭方法

关闭分服务器主动关闭,以及对方客户端主动关闭,两种不同方式的关闭处理方式不太一样

  • 服务器主动关闭
  1. 标记关闭开始,执行shutdown(socket, asio::ip::tcp::socket::shutdown_receive); // 告诉对方我不再接收数据了
  2. 检查是否有待发送数据,无数据 → 立即关闭,有数据 → 等待当前写操作完成
// 在管理类中触发关闭
void ConnectionManager::stop_all() {
    for (auto& conn : connections_) {
        conn->safe_shutdown();
    }
}

// Connection::safe_shutdown实现:
void safe_shutdown() {
    if (shutdown_initiated_.exchange(true)) return;
    
    // 1. 停止接收新数据
    socket_.shutdown(shutdown_receive);
    
    // 2. 检查写状态
    if (!writing_) {
        final_close();  // 无待发数据直接关闭
    }
    // 否则等待进行中的写操作完成
}
  • 客户端主动关闭
  1. async_read Handler 中检测到 error_code == asio::error::eof (对方正常关闭发送端)
  2. (可选)如果还有数据要发送,可以尝试发送(但对方可能已关闭接收端,会出错)。
  3. 调用 socket.close()

下面是一个客户端的安全关闭示例:

void Connection::handle_read_error(asio::error_code ec) {
    if (ec == asio::error::eof) {
        // 客户端发送了FIN包
        safe_shutdown();
    }
    else if (ec == asio::error::operation_aborted) {
        // 正常关闭过程中的取消
        // 不进行任何操作,连接即将销毁
        return;
    }
}

因此,一个安全的关闭不仅仅是close,还要针对不同的错误码来执行不同的关闭策略,在接收和发送的错误码处理不一样,建议一个回话应该对错误码处理单独提取一个函数,如下:

class Connection : public std::enable_shared_from_this<Connection> {
private:
    asio::ip::tcp::socket socket_;
    std::queue<std::vector<char>> write_queue_;
    bool writing_;
    std::atomic<bool> shutdown_initiated_;
    std::array<char, 1024> read_buffer_;
public:
    Connection(asio::ip::tcp::socket socket)
        : socket_(std::move(socket)), 
          writing_(false),
          shutdown_initiated_(false) {}
    
    void start() {
        read_header();  // 开始读循环
    }

    void safe_shutdown() {
        if (shutdown_initiated_.exchange(true)) return;
        
        // 1. 停止接受新数据
        asio::error_code ec;
        socket_.shutdown(asio::ip::tcp::socket::shutdown_receive, ec);
        // 忽略错误:可能已关闭
        
        // 2. 检查写队列
        if (!writing_) {
            // 无待发送数据,直接关闭
            final_close();
        } else {
            // 等待进行中的写操作完成
            // final_close将在写回调中调用
        }
    }

private:

    
    void read_header() {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(read_buffer_),
            [this, self](asio::error_code ec, size_t length) {
                if (ec) {
                    handle_read_error(ec);
                    return;
                }
                process_data(length);
                read_header();  // 继续读
            });
    }
    
    void handle_read_error(asio::error_code ec) {
        if (ec == asio::error::eof) {
            // 客户端正常关闭
            safe_shutdown();
        } else if (ec == asio::error::operation_aborted) {
            // 关闭过程中的正常取消
        } else {
            // 其他错误
            final_close();
        }
    }
    
    void async_write_data(std::vector<char> data) {
        // 将数据加入队列
        bool write_in_progress = !write_queue_.empty();
        write_queue_.push(std::move(data));
        
        if (!write_in_progress && !writing_) {
            start_write_chain();
        }
    }
    
    void start_write_chain() {
        writing_ = true;
        auto self(shared_from_this());
        auto& buf = write_queue_.front();
        
        asio::async_write(socket_, asio::buffer(buf),
            [this, self](asio::error_code ec, size_t /*bytes*/) {
                writing_ = false;
                
                if (ec) {
                    handle_write_error(ec);
                    return;
                }
                
                write_queue_.pop();
                
                if (!write_queue_.empty()) {
                    start_write_chain();
                } else if (shutdown_initiated_) {
                    // 所有数据已发送,安全关闭
                    final_close();
                }
            });
    }
    
    void handle_write_error(asio::error_code ec) {
        if (ec == asio::error::operation_aborted) {
            // 正常取消,忽略
        } else if (ec == asio::error::broken_pipe || 
                   ec == asio::error::connection_reset) {
            // 连接已断开
            final_close();
        } else {
            // 其他错误处理
            safe_shutdown();
        }
    }
    
    void final_close() {
        asio::error_code ignore_ec;
        
        // 取消所有异步操作
        socket_.cancel(ignore_ec);
        
        // 关闭socket
        socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignore_ec);
        socket_.close(ignore_ec);
        
        // 清理资源
        decltype(write_queue_) empty;
        std::swap(write_queue_, empty);
    }
};

上面的例子不仅展示了安全关闭,还包含了安全发送,关闭和发生接收关联紧密,因此很难用一句函数就能涵盖,上面的例子的发送用来一个队列,这引出下一节,如何用asio进行高并发的安全的异步写操作

安全的异步写操作

除了上面提到的buffer有效性外,异步写操作有其特定要点:

  • 同一 Socket 上的多个并发 async_write 操作是未定义行为! TCP 是流协议,数据顺序必须保证。
  • 必须使用队列(见上面的例子)。同一时间只允许一个 async_write 操作在进行,等回调完了之后,再写入下一个

写回调应该作如下工作:

  • 检查 error_code (包括 operation_aborted,具体见上节安全关闭)。
  • 处理 bytes_transferred (通常成功时等于请求量)。
  • 释放或标记该次写操作使用的 Buffer 可重用/释放。
  • **检查写队列:如果队列非空,取出下一批数据发起新的async_write,如果队列为空,设置 writing_ = false

如上面的例子所示,安全大并发的异步写操作,应该如下:

class Connection : public std::enable_shared_from_this<Connection> {
private:
    asio::ip::tcp::socket socket_;
    std::queue<std::vector<char>> write_queue_;
    bool writing_;
    std::atomic<bool> shutdown_initiated_;
    std::array<char, 1024> read_buffer_;
public:
    //这里省略其他函数,在安全关闭里已经展示完整代码
    void async_write_data(std::vector<char> data) {
        // 将数据加入队列
        bool write_in_progress = !write_queue_.empty();
        write_queue_.push(std::move(data));
        
        if (!write_in_progress && !writing_) {
            start_write_chain();
        }
    }
    
    void start_write_chain() {
        writing_ = true;
        auto self(shared_from_this());
        auto& buf = write_queue_.front();
        
        asio::async_write(socket_, asio::buffer(buf),
            [this, self](asio::error_code ec, size_t /*bytes*/) {
                writing_ = false;
                
                if (ec) {
                    handle_write_error(ec);
                    return;
                }
                
                write_queue_.pop();
                
                if (!write_queue_.empty()) {
                    start_write_chain();
                } else if (shutdown_initiated_) {
                    // 所有数据已发送,安全关闭
                    final_close();
                }
            });
    }
    
    void handle_write_error(asio::error_code ec) {
        if (ec == asio::error::operation_aborted) {
            // 正常取消,忽略
        } else if (ec == asio::error::broken_pipe || 
                   ec == asio::error::connection_reset) {
            // 连接已断开
            final_close();
        } else {
            // 其他错误处理
            safe_shutdown();
        }
    }
};
  • 要有个写队列,示例中的std::queue<std::shared_ptr<std::string>> write_queue_;
  • 写状态标记writing_,主要作用是在关闭时,检查是否还没写完,没写完就等写完再关闭socket
  • 关闭标记shutdown_initiated_,这个主要作用是关闭标记,如果写完发现已经关闭,直接调用close,通过shutdown_initiated_writing_可以实现安全的关闭,同时保证写数据不会丢失

网站公告

今日签到

点亮在社区的每一天
去签到