ASIO 避坑指南:高效、安全与稳健的异步网络编程
引言
ASIO是很强大的一个异步io库,但服务器除了高效以外,稳定也是关键,这篇文章主要总结了ASIO使用遇到的典型问题和坑:
- 如何榨干
io_context
的性能,让CPU和网卡持续饱和工作? - 如何安全地关闭一个连接,避免资源泄漏或程序崩溃?(特别是当异步操作还在进行时)
- 如何正确地实现一个异步写操作,确保数据完整发送且内存安全?
- 如何管理跨越多个异步操作的对象生命周期?
- 如何设计缓冲区(Buffer) 才能避免悬垂指针或数据竞争?
- 如何在多线程环境下安全地操作共享资源?
- 如何处理错误码,哪些错误码要特殊处理,例如
operation_aborted
和eof
?
上面这些问题处理不好,轻则导致性能低下、资源泄漏,重则引发程序崩溃、数据错误,是基于ASIO开发服务器必须要了解清楚的点。
一、最大化利用 I/O (榨干 io_context
的性能)
很多人包括很多博客对asio的多线程操作仅仅照搬例子进行介绍,没有真正的线上实战,asio多线程有两种方法:
单一 I/O 上下文 + 多工作线程
- 单
io_context
实例 - 多个线程调用
io_context.run()
- 事件分发机制:操作系统将就绪事件分配给不同工作线程执行
- 单
多 I/O 上下文 (io_context per Thread)
- 每个线程独占一个
io_context
实例 - 每个线程调用自己
io_context.run()
- 资源隔离:Socket/Timer 绑定到特定线程的
io_context
- 每个线程独占一个
io_context
的核心是事件循环。一个线程调用 run()
通常足以高效处理数千连接,如果多个线程都执行 run()
,那么事件会分配到多个线程中执行,这样你首先要考虑的是线程安全,每个回调如果调用了共享资源都需要枷锁,这会降低运行效率。
单一 I/O 上下文 + 多工作线程
// 创建线程池执行同一个io_context
asio::io_context io;
asio::thread_pool pool(4); // 4个线程
// 多个线程执行同一个io_context的run()
for(int i=0; i<4; ++i){
asio::post(pool, [&]{ io.run(); });
}
// 注意:所有Handler可能在不同线程执行!
socket.async_read_some(..., [](...){
// 需要线程同步!可能被任意线程执行
});
优势:
- 最佳负载均衡:内核自动分配事件到空闲线程
- 简化资源管理:所有操作共享单一I/O上下文
劣势:
- 锁竞争开销:共享资源访问需要同步,抵消多线程收益
ASIO针对这种情况提供了 strand
进行序列化访问
例如:
// 创建strand绑定到io_context
asio::strand<asio::io_context::executor_type> my_strand =
asio::make_strand(io.get_executor());
// 通过strand分发处理程序
socket.async_read_some(asio::bind_executor(my_strand,
[](...){
// 保证同一strand上的handler不会并发执行
connections.erase(id); // 无需锁!
}
));
每线程独立 I/O 上下文 (io_context per Thread)
这是推荐做法,经过本人验证,能极大提高并发处理能力
// 每个线程拥有独立io_context
std::vector<std::unique_ptr<asio::io_context>> io_contexts;
std::vector<std::thread> threads;
for(int i=0; i<4; ++i){
io_contexts.emplace_back(std::make_unique<asio::io_context>());
threads.emplace_back([ioc=io_contexts.back().get()]{
ioc->run(); // 每个线程运行自己的io_context
});
}
// 连接绑定到特定io_context
auto& io = *io_contexts[connection_id % 4];
tcp::socket socket(io);
优势:
- 处理程序始终在同一线程执行,避免线程切换开销
- 能更大程度发挥单个io的性能
保证异步操作链的持续
一个异步操作完成时,在其完成处理函数 (Completion Handler) 中发起下一个异步操作(如 async_read
后发起 async_write
,或继续 async_read
),这样可以保持 I/O 通道持续忙碌,避免轮询
要注意的是,一定要避免在 回调 中做耗时同步操作阻塞事件循环。
高效 Buffer 管理
asio::buffer
是视图: 它不拥有数据,只是指向现有内存块的引用,处理不当会导致野指针、数据损坏或程序崩溃。底层数据必须在异步操作期间保持有效!
绝对要避免使用栈分配的内容做作为 Buffer,例如下面这个就是典型的错误:
//错误示范
void do_async_write(tcp::socket& socket) {
char buffer[1024]; // 错误:栈分配缓冲区
generate_data(buffer, 1024); // 填充数据
// 异步写操作 - 缓冲区可能在函数返回后失效!
socket.async_write_some(asio::buffer(buffer, 1024),
[](const asio::error_code& ec, size_t bytes) {
// 此时原buffer栈帧已销毁 - 野指针访问!
});
} // 函数退出,栈缓冲区被销毁!
除非你用的是协程模式,否则不要用栈分配内存做 Buffer,因为异步操作结束后,栈内存会被回收,Buffer 就会变成无效的,过一段时间在执行回调你的buffer里面就是野指针,因此要严格保证 async_read
/async_write
使用的 buffer 底层内存在其整个操作期间(从调用开始到 Handler 执行结束)有效且不被修改
你应该使用智能指针来分配缓冲,并让这个智能指针跟随回调函数,直至回调函数结束,典型的就是让lambda把这个智能指针捕获,让它跟着回调函数的生命周期。
void send_large_data(tcp::socket& socket) {
// 使用shared_ptr管理堆缓冲区
auto buf = std::make_shared<std::vector<char>>(generate_large_data());
asio::async_write(socket, asio::buffer(*buf),
// 捕获智能指针延长生命周期
[buf](const asio::error_code& ec, size_t) {
// 缓冲区在lambda销毁前保持有效
});
}
或者作为session的成员变量
class Connection : public std::enable_shared_from_this<Connection> {
std::array<char, 8192> buffer_; // 成员缓冲区
void start_read() {
auto self(shared_from_this());
socket_.async_read_some(asio::buffer(buffer_),
[self](const asio::error_code& ec, size_t length) {
if (!ec) self->process_data(length);
});
}
};
如果是linux系统,还可以用零拷贝缓冲区注册方法,让io和回调都操作这个缓冲区,从而避免了数据拷贝。
// 注册持久内存到io_context
auto buf = std::make_shared<std::array<char, 4096>>();
asio::io_context& ioc = socket.get_executor().context();
// 显式注册缓冲区(Linux专属优化)
const bool registered = asio::register_buffer(ioc,
asio::buffer(*buf), asio::buffer_registration::permanent);
socket.async_read_some(asio::buffer(*buf),
[buf](const asio::error_code& ec, size_t bytes) {
// 缓冲区保持注册状态
});
这种尤其适合高频小包数据的处理
安全关闭 Socket 和连接
关闭是异步编程中最容易出资源泄漏或崩溃的地方。关闭做的不好,会出现如下问题:
- 资源泄漏(文件描述符、内存)
- 大量CLOSE_WAIT状态连接
- 程序崩溃(访问已销毁对象)
- 数据丢失(未发送完的数据)
Socket的关闭有shutdown()
和 close()
两个行数
socket.shutdown
shutdown
可以理解为是关闭通知,有三种模式(shutdown_receive
, shutdown_send
, shutdown_both
),通知对端“我不会再发数据了”(shutdown_send
)或“我不想再收数据了”(shutdown_receive
)。
shutdown
执行后,后续的 async_read
会立即完成并返回 asio::error::shut_down
(如果接收端关闭),后续的 async_write
会立即完成并返回 asio::error::shut_down
(如果发送端关闭)。
需要注意的是,shutdown()
后,Socket 描述符依然有效。
socket.close
socket.close
会释放系统资源(Socket 描述符),它会隐式地执行 shutdown(both)
。任何挂起(Pending)的异步操作(async_read
, async_write
, async_connect
等)会立即取消,它们的回调函数会被调用,并传入 asio::error::operation_aborted
错误码。
因此,在读写回调中,遇到asio::error::operation_aborted
错误码要特殊处理,避免重复关闭
回调函数设计时,应检查错误码,如果是 operation_aborted
,通常意味着 Socket 正在被关闭/销毁,回调函数应该:
- 忽略这个操作的结果。
- 清理相关的资源(如释放为这次操作分配的 Buffer)。
- 避免再访问Socket
当调用 socket.close()
取消操作时,包含 socket
的对象(如 connection
)可能正在被销毁
安全关闭方法
关闭分服务器主动关闭,以及对方客户端主动关闭,两种不同方式的关闭处理方式不太一样
- 服务器主动关闭
- 标记关闭开始,执行
shutdown(socket, asio::ip::tcp::socket::shutdown_receive);
// 告诉对方我不再接收数据了 - 检查是否有待发送数据,无数据 → 立即关闭,有数据 → 等待当前写操作完成
// 在管理类中触发关闭
void ConnectionManager::stop_all() {
for (auto& conn : connections_) {
conn->safe_shutdown();
}
}
// Connection::safe_shutdown实现:
void safe_shutdown() {
if (shutdown_initiated_.exchange(true)) return;
// 1. 停止接收新数据
socket_.shutdown(shutdown_receive);
// 2. 检查写状态
if (!writing_) {
final_close(); // 无待发数据直接关闭
}
// 否则等待进行中的写操作完成
}
- 客户端主动关闭
- 在
async_read
Handler 中检测到error_code == asio::error::eof
(对方正常关闭发送端) - (可选)如果还有数据要发送,可以尝试发送(但对方可能已关闭接收端,会出错)。
- 调用
socket.close()
。
下面是一个客户端的安全关闭示例:
void Connection::handle_read_error(asio::error_code ec) {
if (ec == asio::error::eof) {
// 客户端发送了FIN包
safe_shutdown();
}
else if (ec == asio::error::operation_aborted) {
// 正常关闭过程中的取消
// 不进行任何操作,连接即将销毁
return;
}
}
因此,一个安全的关闭不仅仅是close,还要针对不同的错误码来执行不同的关闭策略,在接收和发送的错误码处理不一样,建议一个回话应该对错误码处理单独提取一个函数,如下:
class Connection : public std::enable_shared_from_this<Connection> {
private:
asio::ip::tcp::socket socket_;
std::queue<std::vector<char>> write_queue_;
bool writing_;
std::atomic<bool> shutdown_initiated_;
std::array<char, 1024> read_buffer_;
public:
Connection(asio::ip::tcp::socket socket)
: socket_(std::move(socket)),
writing_(false),
shutdown_initiated_(false) {}
void start() {
read_header(); // 开始读循环
}
void safe_shutdown() {
if (shutdown_initiated_.exchange(true)) return;
// 1. 停止接受新数据
asio::error_code ec;
socket_.shutdown(asio::ip::tcp::socket::shutdown_receive, ec);
// 忽略错误:可能已关闭
// 2. 检查写队列
if (!writing_) {
// 无待发送数据,直接关闭
final_close();
} else {
// 等待进行中的写操作完成
// final_close将在写回调中调用
}
}
private:
void read_header() {
auto self(shared_from_this());
socket_.async_read_some(asio::buffer(read_buffer_),
[this, self](asio::error_code ec, size_t length) {
if (ec) {
handle_read_error(ec);
return;
}
process_data(length);
read_header(); // 继续读
});
}
void handle_read_error(asio::error_code ec) {
if (ec == asio::error::eof) {
// 客户端正常关闭
safe_shutdown();
} else if (ec == asio::error::operation_aborted) {
// 关闭过程中的正常取消
} else {
// 其他错误
final_close();
}
}
void async_write_data(std::vector<char> data) {
// 将数据加入队列
bool write_in_progress = !write_queue_.empty();
write_queue_.push(std::move(data));
if (!write_in_progress && !writing_) {
start_write_chain();
}
}
void start_write_chain() {
writing_ = true;
auto self(shared_from_this());
auto& buf = write_queue_.front();
asio::async_write(socket_, asio::buffer(buf),
[this, self](asio::error_code ec, size_t /*bytes*/) {
writing_ = false;
if (ec) {
handle_write_error(ec);
return;
}
write_queue_.pop();
if (!write_queue_.empty()) {
start_write_chain();
} else if (shutdown_initiated_) {
// 所有数据已发送,安全关闭
final_close();
}
});
}
void handle_write_error(asio::error_code ec) {
if (ec == asio::error::operation_aborted) {
// 正常取消,忽略
} else if (ec == asio::error::broken_pipe ||
ec == asio::error::connection_reset) {
// 连接已断开
final_close();
} else {
// 其他错误处理
safe_shutdown();
}
}
void final_close() {
asio::error_code ignore_ec;
// 取消所有异步操作
socket_.cancel(ignore_ec);
// 关闭socket
socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignore_ec);
socket_.close(ignore_ec);
// 清理资源
decltype(write_queue_) empty;
std::swap(write_queue_, empty);
}
};
上面的例子不仅展示了安全关闭,还包含了安全发送,关闭和发生接收关联紧密,因此很难用一句函数就能涵盖,上面的例子的发送用来一个队列,这引出下一节,如何用asio进行高并发的安全的异步写操作
安全的异步写操作
除了上面提到的buffer有效性外,异步写操作有其特定要点:
- 同一 Socket 上的多个并发
async_write
操作是未定义行为! TCP 是流协议,数据顺序必须保证。 - 必须使用队列(见上面的例子)。同一时间只允许一个
async_write
操作在进行,等回调完了之后,再写入下一个
写回调应该作如下工作:
- 检查
error_code
(包括operation_aborted
,具体见上节安全关闭)。 - 处理
bytes_transferred
(通常成功时等于请求量)。 - 释放或标记该次写操作使用的 Buffer 可重用/释放。
- **检查写队列:如果队列非空,取出下一批数据发起新的
async_write
,如果队列为空,设置writing_ = false
。
如上面的例子所示,安全大并发的异步写操作,应该如下:
class Connection : public std::enable_shared_from_this<Connection> {
private:
asio::ip::tcp::socket socket_;
std::queue<std::vector<char>> write_queue_;
bool writing_;
std::atomic<bool> shutdown_initiated_;
std::array<char, 1024> read_buffer_;
public:
//这里省略其他函数,在安全关闭里已经展示完整代码
void async_write_data(std::vector<char> data) {
// 将数据加入队列
bool write_in_progress = !write_queue_.empty();
write_queue_.push(std::move(data));
if (!write_in_progress && !writing_) {
start_write_chain();
}
}
void start_write_chain() {
writing_ = true;
auto self(shared_from_this());
auto& buf = write_queue_.front();
asio::async_write(socket_, asio::buffer(buf),
[this, self](asio::error_code ec, size_t /*bytes*/) {
writing_ = false;
if (ec) {
handle_write_error(ec);
return;
}
write_queue_.pop();
if (!write_queue_.empty()) {
start_write_chain();
} else if (shutdown_initiated_) {
// 所有数据已发送,安全关闭
final_close();
}
});
}
void handle_write_error(asio::error_code ec) {
if (ec == asio::error::operation_aborted) {
// 正常取消,忽略
} else if (ec == asio::error::broken_pipe ||
ec == asio::error::connection_reset) {
// 连接已断开
final_close();
} else {
// 其他错误处理
safe_shutdown();
}
}
};
- 要有个写队列,示例中的
std::queue<std::shared_ptr<std::string>> write_queue_;
- 写状态标记
writing_
,主要作用是在关闭时,检查是否还没写完,没写完就等写完再关闭socket - 关闭标记
shutdown_initiated_
,这个主要作用是关闭标记,如果写完发现已经关闭,直接调用close,通过shutdown_initiated_
和writing_
可以实现安全的关闭,同时保证写数据不会丢失