Boost.Asio 异步写:为什么多次 async_write_some 会导致乱序,以及如何解决
问题背景
在网络编程中,我们经常需要向对端发送多条消息,例如:
WriteToSocketErr("Hello ");
WriteToSocketErr("World");
这两个函数内部调用:
_socket->async_write_some(...);
很多人会以为:既然是在同一个线程里顺序调用两次 async_write_some
,发送顺序也应该是固定的,对端就会先收到 "Hello " 再收到 “World”。
但事实并非如此 —— 对端有可能先收到 “World”,再收到 "Hello "。
为什么会出现乱序?
1. async_write_some 是异步的
当我们调用:
async_write_some(buffer, handler);
这个函数会立即返回,并不是立刻执行底层 write()
,而是:
- 把写操作和回调注册到事件循环(io_context)
- 等到 epoll(或 kqueue 等)通知 fd 可写时,再调用回调执行实际的写操作
2. 多个异步写操作的竞争
如果我们连续快速发起两个 async_write_some
:
async_write_some("Hello ", handler1);
async_write_some("World", handler2);
这两个操作都会注册在 io_context 的任务队列里:
- 当 epoll 通知 fd 可写时,io_context.run() 会调度回调执行
- 关键问题:哪个回调先被调度执行是不确定的,取决于事件循环内部的调度顺序
- 可能先执行 handler2,再执行 handler1
结果:
- 内核先执行
write(fd, "World", 5)
,再执行write(fd, "Hello ", 6)
- 对端先收到 “World”,后收到 "Hello "
3. TCP 协议层面的保证
需要澄清的是:
- TCP 保证同一次
write()
调用的内容不会被拆散;“World” 永远是连续的 - TCP 保证数据包的顺序性:先写入内核缓冲区的数据先发送
- 但是,如果多个
async_write_some
的回调执行顺序不确定,就会导致写入内核缓冲区的顺序不确定
总结:乱序的根本原因
async_write_some
只是注册回调,不是立即写入- 同时注册多个
async_write_some
时,Boost.Asio 无法保证它们的执行顺序 - 执行顺序的不确定性导致了最终发送顺序的不确定性
解决方案:使用发送队列
要让对端收到严格有序的数据,关键原则是:
同一时刻只发起一次异步写操作,上一次写完成后再发起下一次写。
实现方案
- 定义消息队列和状态标志:
class Session {
private:
std::queue<std::shared_ptr<MsgNode>> _send_queue;
bool _writing = false; // 标记是否有正在进行的写操作
std::shared_ptr<asio::ip::tcp::socket> _socket;
};
- 封装发送函数:
void Session::Send(const std::string& data) {
auto node = std::make_shared<MsgNode>(data.c_str(), data.size());
_send_queue.push(node);
// 如果当前没有正在进行的写操作,启动写操作
if (!_writing) {
DoWrite();
}
}
- 串行写操作的核心函数:
void Session::DoWrite() {
if (_send_queue.empty()) {
_writing = false;
return;
}
_writing = true;
auto node = _send_queue.front();
_socket->async_write_some(
asio::buffer(node->_msg + node->_cur_len,
node->_total_len - node->_cur_len),
[this, node](const boost::system::error_code& ec,
std::size_t bytes_transferred) {
if (!ec) {
node->_cur_len += bytes_transferred;
if (node->_cur_len < node->_total_len) {
// 当前消息还没写完,继续写剩余部分
DoWrite();
} else {
// 当前消息写完了,处理下一个消息
_send_queue.pop();
DoWrite();
}
} else {
// 错误处理
_writing = false;
HandleError(ec);
}
}
);
}
方案优势
- 保证顺序:写操作是串行的,对端收到顺序与
Send()
调用顺序一致 - 处理部分写入:
async_write_some
可能只写入部分数据,代码能正确处理 - 高效:避免了阻塞等待,保持了异步编程的优势
线程安全性考虑
单线程情况(推荐)
如果你的程序采用典型的单线程 Boost.Asio 模型:
- 只在一个线程内调用
io_context.run()
- 所有对消息队列的操作都在这个线程里完成
→ 不需要加锁
原因:
- Boost.Asio 在单线程模型下,所有回调都在
io_context.run()
中顺序执行 - 不会出现并发访问
_send_queue
的情况 - 天然线程安全
多线程情况
如果出现以下情况:
- 多个线程同时调用
io_context.run()
- 或者其他线程直接访问
_send_queue
→ 需要加锁保护队列
class Session {
private:
std::queue<std::shared_ptr<MsgNode>> _send_queue;
std::mutex _queue_mutex; // 保护队列的互斥锁
bool _writing = false;
};
void Session::Send(const std::string& data) {
auto node = std::make_shared<MsgNode>(data.c_str(), data.size());
{
std::lock_guard<std::mutex> lock(_queue_mutex);
_send_queue.push(node);
if (!_writing) {
DoWrite();
}
}
}
实践建议
- 优先使用单线程模型:简单、高效、不需要考虑锁的问题
- 避免直接使用多个 async_write_some:始终通过队列机制来保证顺序
- 错误处理:在写操作失败时,要正确重置
_writing
状态 - 使用 async_write 而非 async_write_some:如果不需要处理部分写入,可以使用
async_write
来简化代码
进阶话题
1. 为什么不推荐用 async_write?
你可能会想:既然 async_write_some
有乱序问题,为什么不直接用 async_write
?
// 看起来更简单的方式
asio::async_write(*_socket, asio::buffer("Hello "), handler1);
asio::async_write(*_socket, asio::buffer("World"), handler2);
问题依然存在:
async_write
内部也是异步的,多次调用仍然会有乱序问题- 它只是保证单次写操作的完整性,不解决多次写操作的顺序问题
正确做法:仍然需要队列机制。
2. 性能优化:批量写入
如果你的应用需要频繁发送小消息,可以考虑批量写入:
void Session::DoWrite() {
if (_send_queue.empty()) {
_writing = false;
return;
}
_writing = true;
// 批量处理多个小消息
std::vector<asio::const_buffer> buffers;
std::vector<std::shared_ptr<MsgNode>> current_batch;
// 收集一批消息(最多N个或总大小不超过M字节)
while (!_send_queue.empty() && current_batch.size() < 10) {
auto node = _send_queue.front();
_send_queue.pop();
buffers.push_back(asio::buffer(node->_msg, node->_total_len));
current_batch.push_back(node);
}
// 一次性写入多个buffer
asio::async_write(*_socket, buffers,
[this, current_batch](const boost::system::error_code& ec,
std::size_t bytes_transferred) {
if (!ec) {
DoWrite(); // 继续处理下一批
} else {
_writing = false;
HandleError(ec);
}
}
);
}
3. 常见陷阱与调试技巧
陷阱1:忘记处理部分写入
// 错误示例:假设async_write_some总是写完所有数据
_socket->async_write_some(buffer, [](auto ec, auto bytes) {
// 危险!可能只写了部分数据
});
陷阱2:在回调中直接递归调用
// 可能导致栈溢出
void DoWrite() {
_socket->async_write_some(buffer, [this](auto ec, auto bytes) {
DoWrite(); // 危险的递归调用
});
}
调试技巧:
- 使用 Wireshark 抓包验证发送顺序
- 添加日志记录每次写操作的时间戳和内容
- 在测试环境中故意增加网络延迟来暴露问题
4. 完整的消息节点实现
class MsgNode {
public:
MsgNode(const char* data, size_t len)
: _total_len(len), _cur_len(0) {
_msg = new char[len];
memcpy(_msg, data, len);
}
~MsgNode() {
delete[] _msg;
}
char* _msg;
size_t _total_len;
size_t _cur_len;
// 禁止拷贝
MsgNode(const MsgNode&) = delete;
MsgNode& operator=(const MsgNode&) = delete;
};
相关技术对比
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
多个async_write_some | 简单直接 | 乱序问题 | ❌ 不推荐 |
发送队列 | 保证顺序,高效 | 代码复杂度稍高 | ✅ 推荐 |
同步写入 | 简单,天然有序 | 阻塞,性能差 | 低并发场景 |
批量写入 | 高性能 | 实现复杂 | 高频小消息 |
总结
Boost.Asio 的 async_write_some
是异步的,多次调用会导致执行顺序不确定,从而造成发送数据乱序。
解决方案:
- 使用发送队列,确保同一时刻只有一个活跃的写操作
- 在单线程模型下不需要加锁
- 在多线程模型下需要用互斥锁保护队列
核心原则:串行化写操作,保证数据发送的顺序性。
进阶优化:
- 对于高频小消息,考虑批量写入
- 注意处理部分写入的情况
- 避免递归调用导致的栈溢出
这个问题在实际项目中很常见,理解其根本原因和解决方案对于编写可靠的网络程序至关重要。