【Linux】Linux异步I/O- POSIX glibc

发布于:2025-06-18 ⋅ 阅读:(18) ⋅ 点赞:(0)

参考博客:https://blog.csdn.net/enlaihe/article/details/112474295

一、异步I/O的概念

1. 异步 IO vs 同步 IO

  • 同步 IO:应用程序发起 IO 请求后会阻塞,直到 IO 操作完成(如read()函数等待数据返回)。
  • 异步 IO:应用程序发起 IO 请求后立即返回,继续执行其他任务;IO 操作由系统后台处理,完成后通过回调、信号或事件通知应用程序。

在这里插入图片描述

2. 异步 IO 的核心优势

  • 高并发处理:无需为每个 IO 请求创建线程,减少线程开销(对比多线程同步 IO)。

  • 资源利用率:主线程不阻塞,可同时处理多个 IO 任务,适合 IO 密集型场景(如服务器、文件处理)。

  • write、read 如果没有设置 O_NONBLOCK 标识则为同步阻塞式 IO,一旦 IO 发起,则进程一直等待直到操作完成

  • 设置了 O_NONBLOCK 标识后,write、read 成为非阻塞 IO,调用后如果资源可用则进行操作,并立即返回,如果资源不可用则直接返回出错,这样的情况下,程序通常需要进入忙等待状态,反复调用 IO 操作,直到正常返回

以上两种 IO 模型虽然可以很好地完成单机的 IO 操作,但是对于并发的请求则无法实现

  • 对于并发的多个请求,可以使用 IO 复用模型,如 select、poll、epoll 等,但是进程必须阻塞直到操作完成

  • 如果需要进行并发、非阻塞的 IO 操作,比如 CPU 密集型应用及较慢的 IO 操作应用场景下,使用异步 IO 是一个很好地选择

3. 操作系统中的异步 IO 实现

  • Linux 系统
    • POSIX AIO:遵循 POSIX 标准的异步 IO 接口,由 glibc 封装(如aio_read())。
    • libaio:Linux 内核原生异步 IO 库,提供更底层的接口(如io_submit())。
    • io_uring:Linux 5.1 + 引入的高性能异步 IO 机制,性能优于传统方案。
  • 其他系统
    • Windows:ReadFileAsync()等 API。
    • macOS/BSD:kqueue配合异步 IO 接口。

我们这里主要讲解POSIX AIO,即glibc

二、glibc 介绍

1. glibc 是什么?

GNU C Library(glibc)是 Linux 系统中最常用的 C 标准库,提供用户空间 API,封装内核系统调用,包括 IO、内存管理、线程等功能。

2. glibc对异步I/O的实现

glibc 实现的 POSIX AIO 是一套跨平台的异步 IO 标准接口,主要函数包括:

函数 功能 原型
aio_read 请求异步读操作 int aio_read(struct aiocb *aiocbp);
aio_write 请求异步写操作 int aio_write(struct aiocb *aiocbp);
aio_error 检查异步请求的状态 int aio_error(const struct aiocb *aiocbp);
aio_return 获得完成的异步请求的返回状态 ssize_t aio_return(struct aiocb *aiocbp);
aio_suspend 挂起调用进程,直到一个或多个异步请求已经完成(或失败) int aio_suspend(const struct aiocb * const list[], int nent, const struct timespec *timeout);
aio_cancel 取消异步 I/O 请求 int aio_cancel(int fildes, struct aiocb *aiocbp);
lio_listio 同时发起多个异步IO传输 int lio_listio( int mode, struct aiocb *list[], int nent, struct sigevent *sig );
  • glibc主要是通过线程池在用户端来模拟异步的,并不是真正的异步I/O:glibc 创建线程池,用阻塞 IO 模拟异步(如线程执行read(),主线程非阻塞)。
  • 本质上是多线程同步I/O,性能较低
aiocb

上述函数用到了一个 struct aiocb 结构,主要包含以下字段:

struct aiocb {
    int             aio_fildes;     // 文件描述符
    off_t           aio_offset;     // 文件偏移量
    volatile void  *aio_buf;        // 数据缓冲区
    size_t          aio_nbytes;     // 传输字节数
    int             aio_reqprio;    // 请求优先级(通常忽略)
    struct sigevent aio_sigevent;   // 完成通知方式
    int             aio_lio_opcode; // 操作类型(用于lio_listio)
};
参数含义

基本 I/O 参数

字段 类型 描述
aio_fildes int 文件描述符,指定要操作的文件(通过 open() 返回)。
aio_offset off_t 文件偏移量,指定 I/O 操作的起始位置(如读取或写入从文件的哪个位置开始)。
aio_buf volatile void* 数据缓冲区指针,用于存储读取的数据或写入的数据。必须在操作完成前保持有效。
aio_nbytes size_t 要传输的字节数(如读取 / 写入的字节长度)。

优先级与操作类型

字段 类型 描述
aio_reqprio int 请求优先级(POSIX 标准保留字段)。通常被内核忽略,实际优先级由系统调度决定。
aio_lio_opcode int 操作类型,仅用于 lio_listio() 批量操作:
- LIO_READ:读操作
- LIO_WRITE:写操作
- LIO_NOP:空操作(占位符)
sigevent

上述结构中有一个 aio_sigevent 域,用于定义异步操作完成时通知信号或回调函数

struct sigevent  
{  
    int                sigev_notify;                    // 响应类型  
    int                sigev_signo;                    // 信号  
    union sigval    sigev_value;                    // 信号传递的参数  
    void (*sigev_notify_function)(union sigval);    // 回调函数  
    pthread_attr_t    *sigev_notify_attributes;        // 线程回调  
}

通知方式

  • SIGEV_NONE:不发送通知
  • SIGEV_SIGNAL:发送信号
  • SIGEV_THREAD:创建新线程调用回调函数
sigval

上述结构中用到了一个联合体 sigval:

typedef union sigval  
{  
    int        sival_int;  
    void    *sival_ptr;  
} sigval_t;
  • sival_int:当需要传递简单的整数值(如状态码、计数器)时使用。
  • sival_ptr
    • 用于传递复杂数据结构的地址(如结构体指针、缓冲区指针)。
    • 在异步 I/O 中,常用来指向对应的 aiocb 结构体,以便在回调函数中获取操作上下文:
主要函数
aio_read - 发起异步读操作
  • 提交异步读请求
  • 成功返回0,失败返回-1并设置errno
int aio_read(struct aiocb *aiocbp);
aio_write - 发起异步写操作
  • 提交异步写请求
int aio_write(struct aiocb *aiocbp);
aio_error - 检查异步操作状态
  • 返回值:
    • 0:操作已完成
    • EINPROGRESS:操作仍在进行中
    • 其他错误码:操作失败
int aio_error(const struct aiocb *aiocbp);
aio_return - 获取操作结果
  • 必须在操作完成后调用
  • 返回实际读写的字节数(成功时)或-1(失败时)
ssize_t aio_return(struct aiocb *aiocbp);
aio_suspend - 等待异步操作完成
  • 挂起调用线程,直到列表中至少一个操作完成
  • list:aiocb指针数组
  • nent:数组元素数量
  • timeout:超时时间(NULL表示无限等待)
int aio_suspend(const struct aiocb *const list[], int nent,
               const struct timespec *timeout);
aio_cancel - 取消异步操作
  • 取消特定或所有操作
  • fd:文件描述符
  • aiocbp
    • NULL:取消所有针对该fd的操作
    • 非NULL:取消特定操作
int aio_cancel(int fd, struct aiocb *aiocbp);
lio_listio - 批量提交多个操作
  • 提交多个异步I/O操作
  • mode
    • LIO_WAIT:阻塞直到所有操作完成
    • LIO_NOWAIT:非阻塞提交
int lio_listio(int mode, struct aiocb *const list[], int nent,
              struct sigevent *sig);

三、glibc 实现异步读写文件

3.1 异步写入文件+轮询

这里异步写入后,我们通过轮训的方式来查看I/O是否完成

#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>

#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024

int main(){
    //准备文件
    int fd, ret;
    char* write_buffer,* read_buffer;

    struct aiocb read_cb,write_cb;
    struct sigaction sa;

    fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
    if(fd < 0){
        perror("open");
        return 1;
    }

    const char* init_text = "Initial file content\n";

    write(fd,init_text,strlen(init_text));
    close(fd);

    //重新打开文件进行异步IO
    fd = open(FILENAME, O_RDWR);
    if(fd < 0){
        perror("open");
        return 1;
    }

    //分配缓冲区
    write_buffer = (char*)malloc(BUF_SIZE);
    read_buffer = (char*)malloc(BUF_SIZE);

    snprintf(write_buffer,BUF_SIZE,"Hello, POSIX AIO! Current time:%ld\n",time(NULL));
    size_t write_size = strlen(write_buffer);

    //异步写入 + 轮询检查
    memset(&write_cb,0,sizeof(write_cb));
    write_cb.aio_fildes = fd;
    write_cb.aio_offset = 0; //开头写入
    write_cb.aio_buf = write_buffer;
    write_cb.aio_nbytes = write_size;

    if(aio_write(&write_cb) < 0){
        perror("aio_write");
        close(fd);
        return 1;
    }

    //轮询等待写入
    while(aio_error(&write_cb) == EINPROGRESS){
        std::cout << "Write in progress ... "<< std::endl;
        usleep(1000);
    }

    //检查写入结果
    if(aio_error(&write_cb) != 0){
        std::cerr << "Write failed :" << strerror(aio_error(&write_cb)) << std::endl;
    }
    else{
        ssize_t bytes_written = aio_return(&write_cb);
        std::cout << "Write completed:" << bytes_written << " bytes" << std::endl;
    }

    close(fd);

    return 0;
}

运行结果

在这里插入图片描述

在这里插入图片描述

3.2 异步读取文件+信号通知

SA_SIGINFO标志启用并且设置触发的信号为SIGRTMIN时,异步I/O完成后就会发送SIGRTMIN信号,此时我们就可以注册自定义的信号处理函数,来处理对应的结果

  • 设置异步I/O通知方式
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGRTMIN;
read_cb.aio_sigevent = sigev;
  • 注册信号回调函数,类型为void aio_completion_handler(int signo, siginfo_t *info,void* context)
sa.sa_sigaction = aio_completion_handler;
sa.sa_flags = SA_SIGINFO;

sigemptyset(&sa.sa_mask);
sigaction(SIGRTMIN,&sa,NULL);
  • 在回调函数内部要处理异步I/O的结果
  • SI_ASYNCIO:表示信号由异步 I/O 完成触发
void aio_completion_handler(int signo, siginfo_t *info,void* context){
    if(info->si_code == SI_ASYNCIO){
        std::cout << "I/O operation completed via signal !" << std::endl;
        read_completed = 1;
    }
}

完整代码

#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>

#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024

volatile sig_atomic_t read_completed; //原子变量

void aio_completion_handler(int signo, siginfo_t *info,void* context){
    if(info->si_code == SI_ASYNCIO){
        std::cout << "I/O operation completed via signal !" << std::endl;
        read_completed = 1;
    }
}


int main(){
    //准备文件
    int fd, ret;
    char* write_buffer,* read_buffer;

    struct aiocb read_cb,write_cb;
    struct sigaction sa;

    struct sigevent sigev;

    fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
    if(fd < 0){
        perror("open");
        return 1;
    }

    const char* init_text = "Initial file content\n";

    write(fd,init_text,strlen(init_text));
    close(fd);

    //重新打开文件进行异步IO
    fd = open(FILENAME, O_RDWR);
    if(fd < 0){
        perror("open");
        return 1;
    }

    //分配缓冲区
    write_buffer = (char*)malloc(BUF_SIZE);
    read_buffer = (char*)malloc(BUF_SIZE);

    //设置信号处理
    memset(&sa,0,sizeof(sa));
    sa.sa_sigaction = aio_completion_handler;
    sa.sa_flags = SA_SIGINFO;

    sigemptyset(&sa.sa_mask);
    sigaction(SIGRTMIN,&sa,NULL);

    //初始化read_cb
    memset(&read_cb,0,sizeof(read_cb));
    read_cb.aio_fildes = fd;
    read_cb.aio_offset = 0;
    read_cb.aio_buf = read_buffer;
    read_cb.aio_nbytes = BUF_SIZE;

    // 设置信号通知
    memset(&sigev, 0, sizeof(sigev));
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGRTMIN;
    read_cb.aio_sigevent = sigev;

    if(aio_read(&read_cb) < 0){
        perror("aio_read");
        close(fd);
    }

    std::cout << "Waiting for read completion signal..." << std::endl;
    while(!read_completed){
        pause();
    }

    if(aio_error(&read_cb) != 0){
        std::cerr << "Read failed:" << strerror(aio_error(&read_cb)) << std::endl;
    }
    else{
        ssize_t bytes_read = aio_return(&read_cb);
        read_buffer[bytes_read] = '\0';
        std::cout << "Read completed : " << bytes_read << " bytes" << std::endl;
        std::cout << "Content: " << read_buffer << std::endl;
    }

    close(fd);

    return 0;
}

运行结果

在这里插入图片描述

3.3 异步写入+线程通知

  • 使用线程通知的方式,当I/O完成的时候会通过线程的方式调用回调函数
#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>

#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024

void aio_thread_callback(union sigval sv){
    struct aiocb* req = (struct aiocb*)sv.sival_ptr;
    std::cout << "I/O completed in thread:" << (char*)req->aio_buf << std::endl; 
}


int main(){
    //准备文件
    int fd, ret;
    char* write_buffer,* read_buffer;

    struct aiocb read_cb,write_cb;
    struct sigaction sa;

    struct sigevent sigev;

    fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
    if(fd < 0){
        perror("open");
        return 1;
    }

    const char* init_text = "Initial file content\n";

    write(fd,init_text,strlen(init_text));
    close(fd);

    //重新打开文件进行异步IO
    fd = open(FILENAME, O_RDWR);
    if(fd < 0){
        perror("open");
        return 1;
    }

    //分配缓冲区
    write_buffer = (char*)malloc(BUF_SIZE);
    read_buffer = (char*)malloc(BUF_SIZE);

	snprintf(write_buffer,BUF_SIZE,"Write via thread callback. Time:%ld\n",time(NULL));
    write_size = strlen(write_buffer);

    memset(&write_cb,0,sizeof(write_cb));
    write_cb.aio_fildes = fd;
    write_cb.aio_offset = 0;
    write_cb.aio_buf = write_buffer;
    write_cb.aio_nbytes = write_size;

    //设置线程通知
    memset(&sigev,0,sizeof(sigev));
    sigev.sigev_notify = SIGEV_THREAD;
    sigev.sigev_notify_function = aio_thread_callback;
    sigev.sigev_value.sival_ptr = &write_cb;
    write_cb.aio_sigevent = sigev;

    //提交异步结果
    if(aio_write(&write_cb) < 0){
        perror("aio_write");
        close(fd);
        return 1;
    }

    std::cout << "Main thread continuing while write operation in progress ... " << std::endl;

    sleep(1);


    close(fd);
    free(write_buffer);
    free(read_buffer);
    return 0;
}

运行结果

在这里插入图片描述

3.4 批量异步操作

  • lio_listio函数可以批量加入异步操作,需要传入aiocb数组,并且设置对应的I/O类型
  • 异步顺序由内核决定,无法保证执行的顺序,因此需要而外的同步
#include<iostream>
#include<cstdio>
#include<cstring>
#include<error.h>
#include<signal.h>
#include<fcntl.h>
#include<unistd.h>
#include<aio.h>

#define FILENAME "posix_aio_demo.txt"
#define BUF_SIZE 1024

volatile sig_atomic_t read_completed; //原子变量

int main(){
    //准备文件
    int fd, ret;
    char* write_buffer,* read_buffer;

    struct aiocb read_cb,write_cb;
    struct sigaction sa;

    struct sigevent sigev;

    fd = open(FILENAME,O_CREAT | O_RDWR | O_TRUNC ,0644);
    if(fd < 0){
        perror("open");
        return 1;
    }

    const char* init_text = "Initial file content\n";

    write(fd,init_text,strlen(init_text));
    close(fd);

    //重新打开文件进行异步IO
    fd = open(FILENAME, O_RDWR);
    if(fd < 0){
        perror("open");
        return 1;
    }

    //分配缓冲区
    write_buffer = (char*)malloc(BUF_SIZE);
    read_buffer = (char*)malloc(BUF_SIZE);

    struct aiocb*op_list[2];

    memset(&write_cb,0,sizeof(write_cb));
    write_cb.aio_fildes = fd;
    write_cb.aio_offset = 0;
    write_cb.aio_buf = write_buffer;
    write_cb.aio_nbytes = write_size;
    write_cb.aio_lio_opcode = LIO_WRITE; //设置I/O类型
    op_list[0] = &write_cb;

    memset(&read_cb,0,sizeof(read_cb));
    read_cb.aio_fildes = fd;
    read_cb.aio_offset = 0;
    read_cb.aio_buf = read_buffer;
    read_cb.aio_nbytes = BUF_SIZE;
    read_cb.aio_lio_opcode = LIO_READ; //设置I/O类型
    op_list[1] = &read_cb;
    
    snprintf(write_buffer,BUF_SIZE,"Final write via lio_listio. Time: %ld\n",time(NULL));
    write_size = strlen(write_buffer);
    
    
    //提交批量操作
    if(lio_listio(LIO_WAIT,op_list,2,NULL) < 0){
        perror("lio_listio");
    }
    else{
        std::cout << "Batch operations completed !" << std::endl;

        //检查读取结果
        if(aio_error(&read_cb) == 0){
            ssize_t bytes_read = aio_return(&read_cb);
            read_buffer[bytes_read] = '\0';
            std::cout << "Read content :" << read_buffer <<std::endl;
        }


    }

    close(fd);
    free(write_buffer);
    free(read_buffer);
    remove(FILENAME);


    return 0;
}

运行结果

在这里插入图片描述

四、总结

  1. 通知机制选择

    • 轮询:简单但CPU效率低
    • 信号:高效但需要处理信号竞争
    • 线程回调:灵活但增加线程管理开销
  2. 错误处理

    • 始终检查aio_error()返回值
    • 操作完成后才调用aio_return()
    • 注意aio_return()只能调用一次
  3. 文件定位

    • 使用aio_offset指定操作位置
    • 注意并发操作时的定位问题
  4. 线程安全性

    • POSIX AIO函数是线程安全的
    • 但单个aiocb结构不应被多个线程同时操作
  5. 实现差异

    • Linux的glibc实现使用线程池模拟异步I/O
    • 某些系统(如FreeBSD)有真正的内核支持
  6. 性能考虑

    • 对于小文件操作,同步I/O可能更高效
    • 大量小I/O操作时线程池可能成为瓶颈

与Linux AIO(libaio)的比较

特性 POSIX AIO Linux AIO (libaio)
标准 POSIX标准 Linux特有
实现 用户空间线程池(glibc) 内核支持
性能 中等(线程切换开销) 高(真正的异步)
复杂度 较低 较高
可移植性 高(跨Unix系统) 仅Linux
通知机制 信号/线程回调/轮询 事件完成队列
文件类型 支持常规文件 支持常规文件和块设备
glibc 与异步 IO 的核心关系
  • glibc 是桥梁:为用户提供跨平台的异步 IO 接口(POSIX AIO),封装内核能力或模拟异步行为。
  • 异步 IO 的本质:通过解耦 IO 操作与主线程,提升系统并发能力,glibc 的实现兼顾兼容性和性能。
  • 进阶方向:若追求极致性能,可直接使用 Linux 内核的libaioio_uring,或结合异步编程框架(如 libuv、libevent)。

网站公告

今日签到

点亮在社区的每一天
去签到