参考博客:https://blog.csdn.net/enlaihe/article/details/112474295
一、异步I/O的概念
1. 异步 IO vs 同步 IO
- 同步 IO:应用程序发起 IO 请求后会阻塞,直到 IO 操作完成(如
read()
函数等待数据返回)。 - 异步 IO:应用程序发起 IO 请求后立即返回,继续执行其他任务;IO 操作由系统后台处理,完成后通过回调、信号或事件通知应用程序。
2. 异步 IO 的核心优势
高并发处理:无需为每个 IO 请求创建线程,减少线程开销(对比多线程同步 IO)。
资源利用率:主线程不阻塞,可同时处理多个 IO 任务,适合 IO 密集型场景(如服务器、文件处理)。
write、read 如果没有设置 O_NONBLOCK 标识则为同步阻塞式 IO,一旦 IO 发起,则进程一直等待直到操作完成
设置了 O_NONBLOCK 标识后,write、read 成为非阻塞 IO,调用后如果资源可用则进行操作,并立即返回,如果资源不可用则直接返回出错,这样的情况下,程序通常需要进入忙等待状态,反复调用 IO 操作,直到正常返回
以上两种 IO 模型虽然可以很好地完成单机的 IO 操作,但是对于并发的请求则无法实现
对于并发的多个请求,可以使用 IO 复用模型,如 select、poll、epoll 等,但是进程必须阻塞直到操作完成
如果需要进行并发、非阻塞的 IO 操作,比如 CPU 密集型应用及较慢的 IO 操作应用场景下,使用异步 IO 是一个很好地选择
3. 操作系统中的异步 IO 实现
- Linux 系统:
- POSIX AIO:遵循 POSIX 标准的异步 IO 接口,由 glibc 封装(如
aio_read()
)。 - libaio:Linux 内核原生异步 IO 库,提供更底层的接口(如
io_submit()
)。 - io_uring:Linux 5.1 + 引入的高性能异步 IO 机制,性能优于传统方案。
- POSIX AIO:遵循 POSIX 标准的异步 IO 接口,由 glibc 封装(如
- 其他系统:
- Windows:
ReadFileAsync()
等 API。 - macOS/BSD:
kqueue
配合异步 IO 接口。
- Windows:
我们这里主要讲解POSIX AIO
,即glibc
二、glibc 介绍
1. glibc 是什么?
GNU C Library(glibc)是 Linux 系统中最常用的 C 标准库,提供用户空间 API,封装内核系统调用,包括 IO、内存管理、线程等功能。
2. glibc对异步I/O的实现
glibc 实现的 POSIX AIO 是一套跨平台的异步 IO 标准接口,主要函数包括:
函数 | 功能 | 原型 |
---|---|---|
aio_read | 请求异步读操作 | int aio_read(struct aiocb *aiocbp); |
aio_write | 请求异步写操作 | int aio_write(struct aiocb *aiocbp); |
aio_error | 检查异步请求的状态 | int aio_error(const struct aiocb *aiocbp); |
aio_return | 获得完成的异步请求的返回状态 | ssize_t aio_return(struct aiocb *aiocbp); |
aio_suspend | 挂起调用进程,直到一个或多个异步请求已经完成(或失败) | int aio_suspend(const struct aiocb * const list[], int nent, const struct timespec *timeout); |
aio_cancel | 取消异步 I/O 请求 | int aio_cancel(int fildes, struct aiocb *aiocbp); |
lio_listio | 同时发起多个异步IO传输 | int lio_listio( int mode, struct aiocb *list[], int nent, struct sigevent *sig ); |
- glibc主要是通过线程池在用户端来模拟异步的,并不是真正的异步I/O:glibc 创建线程池,用阻塞 IO 模拟异步(如线程执行
read()
,主线程非阻塞)。 - 本质上是多线程同步I/O,性能较低
aiocb
上述函数用到了一个 struct aiocb 结构,主要包含以下字段:
struct aiocb {
int aio_fildes; // 文件描述符
off_t aio_offset; // 文件偏移量
volatile void *aio_buf; // 数据缓冲区
size_t aio_nbytes; // 传输字节数
int aio_reqprio; // 请求优先级(通常忽略)
struct sigevent aio_sigevent; // 完成通知方式
int aio_lio_opcode; // 操作类型(用于lio_listio)
};
参数含义
基本 I/O 参数
字段 | 类型 | 描述 |
---|---|---|
aio_fildes |
int |
文件描述符,指定要操作的文件(通过 open() 返回)。 |
aio_offset |
off_t |
文件偏移量,指定 I/O 操作的起始位置(如读取或写入从文件的哪个位置开始)。 |
aio_buf |
volatile void* |
数据缓冲区指针,用于存储读取的数据或写入的数据。必须在操作完成前保持有效。 |
aio_nbytes |
size_t |
要传输的字节数(如读取 / 写入的字节长度)。 |
优先级与操作类型
字段 | 类型 | 描述 |
---|---|---|
aio_reqprio |
int |
请求优先级(POSIX 标准保留字段)。通常被内核忽略,实际优先级由系统调度决定。 |
aio_lio_opcode |
int |
操作类型,仅用于 lio_listio() 批量操作: - LIO_READ :读操作 - LIO_WRITE :写操作 - LIO_NOP :空操作(占位符) |
sigevent
上述结构中有一个 aio_sigevent 域,用于定义异步操作完成时通知信号或回调函数
struct sigevent
{
int sigev_notify; // 响应类型
int sigev_signo; // 信号
union sigval sigev_value; // 信号传递的参数
void (*sigev_notify_function)(union sigval); // 回调函数
pthread_attr_t *sigev_notify_attributes; // 线程回调
}
通知方式:
SIGEV_NONE
:不发送通知SIGEV_SIGNAL
:发送信号SIGEV_THREAD
:创建新线程调用回调函数
sigval
上述结构中用到了一个联合体 sigval:
typedef union sigval
{
int sival_int;
void *sival_ptr;
} sigval_t;
sival_int
:当需要传递简单的整数值(如状态码、计数器)时使用。sival_ptr
:- 用于传递复杂数据结构的地址(如结构体指针、缓冲区指针)。
- 在异步 I/O 中,常用来指向对应的
aiocb
结构体,以便在回调函数中获取操作上下文:
主要函数
aio_read - 发起异步读操作
- 提交异步读请求
- 成功返回0,失败返回-1并设置errno
int aio_read(struct aiocb *aiocbp);
aio_write - 发起异步写操作
- 提交异步写请求
int aio_write(struct aiocb *aiocbp);
aio_error - 检查异步操作状态
- 返回值:
0
:操作已完成EINPROGRESS
:操作仍在进行中- 其他错误码:操作失败
int aio_error(const struct aiocb *aiocbp);
aio_return - 获取操作结果
- 必须在操作完成后调用
- 返回实际读写的字节数(成功时)或-1(失败时)
ssize_t aio_return(struct aiocb *aiocbp);
aio_suspend - 等待异步操作完成
- 挂起调用线程,直到列表中至少一个操作完成
list
:aiocb指针数组nent
:数组元素数量timeout
:超时时间(NULL表示无限等待)
int aio_suspend(const struct aiocb *const list[], int nent,
const struct timespec *timeout);
aio_cancel - 取消异步操作
- 取消特定或所有操作
fd
:文件描述符aiocbp
:- NULL:取消所有针对该fd的操作
- 非NULL:取消特定操作
int aio_cancel(int fd, struct aiocb *aiocbp);
lio_listio - 批量提交多个操作
- 提交多个异步I/O操作
mode
:LIO_WAIT
:阻塞直到所有操作完成LIO_NOWAIT
:非阻塞提交
int lio_listio(int mode, struct aiocb *const list[], int nent,
struct sigevent *sig);
三、glibc 实现异步读写文件
3.1 异步写入文件+轮询
这里异步写入后,我们通过轮训的方式来查看I/O是否完成
#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>
#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024
int main(){
//准备文件
int fd, ret;
char* write_buffer,* read_buffer;
struct aiocb read_cb,write_cb;
struct sigaction sa;
fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
if(fd < 0){
perror("open");
return 1;
}
const char* init_text = "Initial file content\n";
write(fd,init_text,strlen(init_text));
close(fd);
//重新打开文件进行异步IO
fd = open(FILENAME, O_RDWR);
if(fd < 0){
perror("open");
return 1;
}
//分配缓冲区
write_buffer = (char*)malloc(BUF_SIZE);
read_buffer = (char*)malloc(BUF_SIZE);
snprintf(write_buffer,BUF_SIZE,"Hello, POSIX AIO! Current time:%ld\n",time(NULL));
size_t write_size = strlen(write_buffer);
//异步写入 + 轮询检查
memset(&write_cb,0,sizeof(write_cb));
write_cb.aio_fildes = fd;
write_cb.aio_offset = 0; //开头写入
write_cb.aio_buf = write_buffer;
write_cb.aio_nbytes = write_size;
if(aio_write(&write_cb) < 0){
perror("aio_write");
close(fd);
return 1;
}
//轮询等待写入
while(aio_error(&write_cb) == EINPROGRESS){
std::cout << "Write in progress ... "<< std::endl;
usleep(1000);
}
//检查写入结果
if(aio_error(&write_cb) != 0){
std::cerr << "Write failed :" << strerror(aio_error(&write_cb)) << std::endl;
}
else{
ssize_t bytes_written = aio_return(&write_cb);
std::cout << "Write completed:" << bytes_written << " bytes" << std::endl;
}
close(fd);
return 0;
}
运行结果
3.2 异步读取文件+信号通知
当SA_SIGINFO
标志启用并且设置触发的信号为SIGRTMIN
时,异步I/O完成后就会发送SIGRTMIN
信号,此时我们就可以注册自定义的信号处理函数,来处理对应的结果
- 设置异步I/O通知方式
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGRTMIN;
read_cb.aio_sigevent = sigev;
- 注册信号回调函数,类型为
void aio_completion_handler(int signo, siginfo_t *info,void* context)
sa.sa_sigaction = aio_completion_handler;
sa.sa_flags = SA_SIGINFO;
sigemptyset(&sa.sa_mask);
sigaction(SIGRTMIN,&sa,NULL);
- 在回调函数内部要处理异步
I/O
的结果 - SI_ASYNCIO:表示信号由异步 I/O 完成触发
void aio_completion_handler(int signo, siginfo_t *info,void* context){
if(info->si_code == SI_ASYNCIO){
std::cout << "I/O operation completed via signal !" << std::endl;
read_completed = 1;
}
}
完整代码
#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>
#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024
volatile sig_atomic_t read_completed; //原子变量
void aio_completion_handler(int signo, siginfo_t *info,void* context){
if(info->si_code == SI_ASYNCIO){
std::cout << "I/O operation completed via signal !" << std::endl;
read_completed = 1;
}
}
int main(){
//准备文件
int fd, ret;
char* write_buffer,* read_buffer;
struct aiocb read_cb,write_cb;
struct sigaction sa;
struct sigevent sigev;
fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
if(fd < 0){
perror("open");
return 1;
}
const char* init_text = "Initial file content\n";
write(fd,init_text,strlen(init_text));
close(fd);
//重新打开文件进行异步IO
fd = open(FILENAME, O_RDWR);
if(fd < 0){
perror("open");
return 1;
}
//分配缓冲区
write_buffer = (char*)malloc(BUF_SIZE);
read_buffer = (char*)malloc(BUF_SIZE);
//设置信号处理
memset(&sa,0,sizeof(sa));
sa.sa_sigaction = aio_completion_handler;
sa.sa_flags = SA_SIGINFO;
sigemptyset(&sa.sa_mask);
sigaction(SIGRTMIN,&sa,NULL);
//初始化read_cb
memset(&read_cb,0,sizeof(read_cb));
read_cb.aio_fildes = fd;
read_cb.aio_offset = 0;
read_cb.aio_buf = read_buffer;
read_cb.aio_nbytes = BUF_SIZE;
// 设置信号通知
memset(&sigev, 0, sizeof(sigev));
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGRTMIN;
read_cb.aio_sigevent = sigev;
if(aio_read(&read_cb) < 0){
perror("aio_read");
close(fd);
}
std::cout << "Waiting for read completion signal..." << std::endl;
while(!read_completed){
pause();
}
if(aio_error(&read_cb) != 0){
std::cerr << "Read failed:" << strerror(aio_error(&read_cb)) << std::endl;
}
else{
ssize_t bytes_read = aio_return(&read_cb);
read_buffer[bytes_read] = '\0';
std::cout << "Read completed : " << bytes_read << " bytes" << std::endl;
std::cout << "Content: " << read_buffer << std::endl;
}
close(fd);
return 0;
}
运行结果
3.3 异步写入+线程通知
- 使用线程通知的方式,当
I/O
完成的时候会通过线程的方式调用回调函数
#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>
#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024
void aio_thread_callback(union sigval sv){
struct aiocb* req = (struct aiocb*)sv.sival_ptr;
std::cout << "I/O completed in thread:" << (char*)req->aio_buf << std::endl;
}
int main(){
//准备文件
int fd, ret;
char* write_buffer,* read_buffer;
struct aiocb read_cb,write_cb;
struct sigaction sa;
struct sigevent sigev;
fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
if(fd < 0){
perror("open");
return 1;
}
const char* init_text = "Initial file content\n";
write(fd,init_text,strlen(init_text));
close(fd);
//重新打开文件进行异步IO
fd = open(FILENAME, O_RDWR);
if(fd < 0){
perror("open");
return 1;
}
//分配缓冲区
write_buffer = (char*)malloc(BUF_SIZE);
read_buffer = (char*)malloc(BUF_SIZE);
snprintf(write_buffer,BUF_SIZE,"Write via thread callback. Time:%ld\n",time(NULL));
write_size = strlen(write_buffer);
memset(&write_cb,0,sizeof(write_cb));
write_cb.aio_fildes = fd;
write_cb.aio_offset = 0;
write_cb.aio_buf = write_buffer;
write_cb.aio_nbytes = write_size;
//设置线程通知
memset(&sigev,0,sizeof(sigev));
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_notify_function = aio_thread_callback;
sigev.sigev_value.sival_ptr = &write_cb;
write_cb.aio_sigevent = sigev;
//提交异步结果
if(aio_write(&write_cb) < 0){
perror("aio_write");
close(fd);
return 1;
}
std::cout << "Main thread continuing while write operation in progress ... " << std::endl;
sleep(1);
close(fd);
free(write_buffer);
free(read_buffer);
return 0;
}
运行结果
3.4 批量异步操作
lio_listio
函数可以批量加入异步操作,需要传入aiocb
数组,并且设置对应的I/O
类型- 异步顺序由内核决定,无法保证执行的顺序,因此需要而外的同步
#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>
#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024
volatile sig_atomic_t read_completed; //原子变量
int main(){
//准备文件
int fd, ret;
char* write_buffer,* read_buffer;
struct aiocb read_cb,write_cb;
struct sigaction sa;
struct sigevent sigev;
fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
if(fd < 0){
perror("open");
return 1;
}
const char* init_text = "Initial file content\n";
write(fd,init_text,strlen(init_text));
close(fd);
//重新打开文件进行异步IO
fd = open(FILENAME, O_RDWR);
if(fd < 0){
perror("open");
return 1;
}
//分配缓冲区
write_buffer = (char*)malloc(BUF_SIZE);
read_buffer = (char*)malloc(BUF_SIZE);
struct aiocb*op_list[2];
memset(&write_cb,0,sizeof(write_cb));
write_cb.aio_fildes = fd;
write_cb.aio_offset = 0;
write_cb.aio_buf = write_buffer;
write_cb.aio_nbytes = write_size;
write_cb.aio_lio_opcode = LIO_WRITE; //设置I/O类型
op_list[0] = &write_cb;
memset(&read_cb,0,sizeof(read_cb));
read_cb.aio_fildes = fd;
read_cb.aio_offset = 0;
read_cb.aio_buf = read_buffer;
read_cb.aio_nbytes = BUF_SIZE;
read_cb.aio_lio_opcode = LIO_READ; //设置I/O类型
op_list[1] = &read_cb;
snprintf(write_buffer,BUF_SIZE,"Final write via lio_listio. Time: %ld\n",time(NULL));
write_size = strlen(write_buffer);
//提交批量操作
if(lio_listio(LIO_WAIT,op_list,2,NULL) < 0){
perror("lio_listio");
}
else{
std::cout << "Batch operations completed !" << std::endl;
//检查读取结果
if(aio_error(&read_cb) == 0){
ssize_t bytes_read = aio_return(&read_cb);
read_buffer[bytes_read] = '\0';
std::cout << "Read content :" << read_buffer <<std::endl;
}
}
close(fd);
free(write_buffer);
free(read_buffer);
remove(FILENAME);
return 0;
}
运行结果
四、总结
通知机制选择:
- 轮询:简单但CPU效率低
- 信号:高效但需要处理信号竞争
- 线程回调:灵活但增加线程管理开销
错误处理:
- 始终检查
aio_error()
返回值 - 操作完成后才调用
aio_return()
- 注意
aio_return()
只能调用一次
- 始终检查
文件定位:
- 使用
aio_offset
指定操作位置 - 注意并发操作时的定位问题
- 使用
线程安全性:
- POSIX AIO函数是线程安全的
- 但单个
aiocb
结构不应被多个线程同时操作
实现差异:
- Linux的glibc实现使用线程池模拟异步I/O
- 某些系统(如FreeBSD)有真正的内核支持
性能考虑:
- 对于小文件操作,同步I/O可能更高效
- 大量小I/O操作时线程池可能成为瓶颈
与Linux AIO(libaio)的比较
特性 | POSIX AIO | Linux AIO (libaio) |
---|---|---|
标准 | POSIX标准 | Linux特有 |
实现 | 用户空间线程池(glibc) | 内核支持 |
性能 | 中等(线程切换开销) | 高(真正的异步) |
复杂度 | 较低 | 较高 |
可移植性 | 高(跨Unix系统) | 仅Linux |
通知机制 | 信号/线程回调/轮询 | 事件完成队列 |
文件类型 | 支持常规文件 | 支持常规文件和块设备 |
glibc 与异步 IO 的核心关系
- glibc 是桥梁:为用户提供跨平台的异步 IO 接口(POSIX AIO),封装内核能力或模拟异步行为。
- 异步 IO 的本质:通过解耦 IO 操作与主线程,提升系统并发能力,glibc 的实现兼顾兼容性和性能。
- 进阶方向:若追求极致性能,可直接使用 Linux 内核的
libaio
或io_uring
,或结合异步编程框架(如 libuv、libevent)。