io_uring的异步IO机制
io_uring 原理
io_uring 是 Linux 内核 5.1 版本引入的全新异步 I/O 接口,由 Jens Axboe 开发。它通过两个共享环形缓冲区(ring buffers)在内核和用户空间之间高效传递 I/O 请求和完成事件,避免了传统 AIO 的各种限制。
我们之前了解过,在网络高并发的场景下,epoll 在这方面的性能是独树一帜,通过 epoll 的工作模式,实现了 reactor 这种事件触发的机制,在 reactor 中异步的实现我们可以通过多线程,也可以通过协程去进行实现。但是本质上最后还是调用到 read/write/recv/send
这样的接口,来完成数据的收发工作。
理解read/write/recv/send本质
他们作为系统调用函数,其实本质上还是一种拷贝函数,如果底层缓冲区中有数据,就会调用这些接口,如果没有数据,就会等待数据就绪,或者设置为非阻塞的状态,不等就直接返回一个错误码,表示此时数据还未就绪。
IO 操作本质上就是 “等 + 拷贝” 的操作,单纯的去看待 read/write/recv/send
这些函数,其实他返回请求,接收数据和返回数据这些操作都是一起的。 在 epoll 的处理中我们可以看见,他可以一次性处理多个文件描述符,将这种等的操作重叠了起来,透过他的底层实现我们可以发现,他其实是将 “等 + 拷贝” 的操作实现了一种分离,被监听的 fd 与就绪的 fd 实则是两种不同的数据结构进行处理,构成了一种生产者消费者模型,也就是说 “等” 其实在 epoll 的接口中就已经完成了,而我们调用read/write/recv/send
函数的时候就不用再等了,数据已经就绪,由于这种可以对多个 fd 进行处理的机制,加上使用回调函数进行处理,使得epoll 的功能就很强大。
如何理解io_uring
对于 io_uring 来说,其实更倾向于将这种 IO 操作实现成异步的,在 io_uring 的实现当中,提供了两个队列结构,一个是提交队列(SQ)
,另一个是完成队列(CQ)
,注意,这两个队列都是环形队列结构。
提交队列的作用就是提交 IO 请求,完成队列的作用就是内核通知完成的 I/O 操作,假设当前有 100 个客户端发起读的请求,在 io_uring 的工作方式中,会将这 100 个 IO 请求先 push 到提交队列当中,然后进行处理,然后处理完成的在 push 到完成队列当中,返回结果,他也是由不同线程去完成的。这两个队列干的事两件不同的事情,从而产生了异步的效果。
由于 io_uring 的内部使用 mmap 去进行实现,这种方式是他在整个过程中也只会进行一次的数据拷贝,无异于也是对效率的一个提升,而且通过这种无锁的环形队列接口,减少了频繁进行加锁解锁的消耗,这对于高并发的场景无异于是一个巨大的提升,其实这两个流程也是一个典型的生产者消费者模型。
io_uring接口应用
io_uring 在这儿主要提供了 3 个系统调用接口:
int io_uring_setup(unsigned entries, struct io_uring_params *params);
io_uring_setup 是 Linux 内核提供的系统调用,用于初始化一个异步 I/O 上下文(io_uring 实例),参数如下:
entries
:指定提交队列(SQ)和完成队列(CQ)的初始大小(条目数)。通常为 2 的幂次方,内核可能会调整实际大小。params
:指向 io_uring_params 结构的指针,用于传递配置参数并返回队列信息。结构定义如下:
struct io_uring_params {
__u32 sq_entries; // 内核实际分配的 SQ 大小
__u32 cq_entries; // 内核实际分配的 CQ 大小
__u32 flags; // 配置标志(如 IORING_SETUP_IOPOLL)
__u32 sq_thread_cpu; // 绑定 SQ 线程的 CPU
__u32 sq_thread_idle; // SQ 线程空闲超时(毫秒)
__u32 features; // 内核返回的支持特性
__u32 resv[4];
struct io_sqring_offsets sq_off; // SQ 环的偏移信息
struct io_cqring_offsets cq_off; // CQ 环的偏移信息
};
返回值:
- 成功时返回一个文件描述符(fd),代表创建的 io_uring 实例;失败时返回 -1 并设置 errno。
int io_uring_enter(unsigned int fd, unsigned int to_submit,
unsigned int min_complete, unsigned int flags,
sigset_t *sig);
io_uring_enter 用于提交 I/O 操作请求或等待已完成事件,参数如下:
fd
: 关联的 io_uring 实例的文件描述符。to_submit
: 准备提交的 I/O 操作数量。min_complete
: 要求内核等待至少完成的事件数(若 flags 包含 IORING_ENTER_GETEVENTS)。flags
: 控制行为的标志位(如 IORING_ENTER_GETEVENTS)。sig
: 等待时临时屏蔽的信号集(可为 NULL)。
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);
io_uring_register 是 Linux 内核提供的系统调用(syscall),用于为 io_uring 实例注册资源(如文件描述符、缓冲区等),以优化异步 I/O 操作的性能,参数如下:
fd
: io_uring 实例的文件描述符,由 io_uring_setup 创建。opcode
: 注册操作的类型,如 IORING_REGISTER_BUFFERS(注册缓冲区)或 IORING_REGISTER_FILES(注册文件描述符)。arg
: 指向用户空间数据的指针,具体内容取决于 opcode。nr_args
: arg 指向的数组中的条目数。
其中,opcode 的类型有如下几种:
IORING_REGISTER_BUFFERS
:注册固定缓冲区,用于减少 read/write 操作中的内核-用户空间数据拷贝。IORING_REGISTER_FILES
:注册文件描述符,避免每次 I/O 操作重复传递文件描述符。IORING_REGISTER_EVENTFD
:注册事件文件描述符(eventfd),用于异步通知 I/O 完成事件。IORING_REGISTER_PROBE
:检查内核支持的 io_uring 功能(需配合 struct io_uring_probe)。
io_uring 的库其实对这三个函数进行了封装,然后提供给我们一个使用的库。
接下来我们实际写一段代码来看一下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2
struct conn_info
{
int fd;
int event;
};
int init_server(int port)
{
// 创建socket连接
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
serveraddr.sin_port = htons(port);
// 绑定套接字
int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));
if (ret == -1)
{
perror("bind");
return -1;
}
// 监听套接字
listen(sockfd, 10);
return sockfd;
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = ACCEPT_EVENT};
// 用于准备一个异步接受连接(accept)的请求
io_uring_prep_accept(sqe, sockfd, addr, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
return 1;
}
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = READ_EVENT};
// 准备一个接收数据的操作请求
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
return 1;
}
int main()
{
unsigned short port = 9999;
int sockfd = init_server(port);
struct io_uring_params params;
memset(¶ms, 0, sizeof(params));
struct io_uring ring;
// 先构建两个队列出来
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
#endif
char buffer[BUFFER_LENGTH] = {0};
while (1)
{
// 内部实现就是 io_uring_enter,用于提交 IO 请求
io_uring_submit(&ring);
// 创建一个完成队列事件结构,通过 io_uring_wait_cqe
// 获取完成 IO 操作的事件
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
// 批量获取完成 IO 操作的事件
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
int i = 0;
for (i = 0; i < nready; i++)
{
// 获取到 accept 事件的入口
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == ACCEPT_EVENT)
{
printf("set_event_accept\n");
}
}
}
return 0;
}
接口介绍
其中,io_uring_prep_accept 接口作用就是准备一个异步接受连接(accept)的请求。
void io_uring_prep_accept(struct io_uring_sqe *sqe,
int sockfd,
struct sockaddr *addr,
socklen_t *addrlen,
int flags);
参数介绍:
sqe
: 指向 io_uring 提交队列条目(Submission Queue Entry)的指针。sockfd
: 监听套接字的文件描述符。addr
: 用于存储客户端地址信息的结构体指针(可选,可为 NULL)。addrlen
: 输入时为 addr 的缓冲区大小,输出时为实际地址长度。flags
: 额外的标志位(如 SOCK_NONBLOCK 或 SOCK_CLOEXEC)。
返回值处理:
- 通过 io_uring_wait_cqe 等待完成事件后,cqe->res 为返回的客户端文件描述符。
- 若返回值 < 0,表示错误(如 -EINVAL 或 -EBADF)
io_uring_sqe 结构体,用于描述一个待提交的 I/O 操作。每个 io_uring_sqe 对应一个异步 I/O 请求(如读写、网络操作等),通过填充该结构并提交到提交队列(Submission Queue, SQ)。
struct io_uring_sqe {
__u8 opcode; // 操作类型(如 IORING_OP_READ, IORING_OP_WRITE)
__u8 flags; // 请求标志(如 IOSQE_FIXED_FILE, IOSQE_IO_LINK)
__u16 ioprio; // I/O 优先级
__s32 fd; // 文件描述符
__u64 off; // 文件偏移量
__u64 addr; // 用户态缓冲区地址(读写操作)
__u32 len; // 操作长度
union {
__kernel_rwf_t rw_flags; // 读写标志(如 RWF_NOWAIT)
__u32 fsync_flags;
__u16 poll_events;
};
__u64 user_data; // 用户自定义数据,用于回调识别
union {
__u16 buf_index; // 固定缓冲区的索引(IORING_OP_READ_FIXED)
__u64 __pad2[3];
};
};
关键字段说明:
opcode
:指定操作类型,常见值包括:
IORING_OP_READ/IORING_OP_WRITE:文件读写。
IORING_OP_SEND/IORING_OP_RECV:网络通信。
IORING_OP_POLL_ADD:事件监听。flags
:控制请求行为,例如:
IOSQE_FIXED_FILE:使用固定文件描述符(预先注册的文件表)。
IOSQE_IO_LINK:链接多个请求,形成依赖链。user_data
:用于在完成事件(CQE)中标识请求的唯一值。
使用方法:
- 获取空闲 SQE:通过 io_uring_get_sqe 从提交队列中获取一个空闲条目。
- 设置操作参数:填充 opcode、fd、addr、len 等字段。
- 提交请求:调用 io_uring_submit 将请求提交到内核。
运行程序,我们就会发现一个有意思的现象,此时一直在打印 set_event_accept 这条信息。
原因就在于在当前这个循环中,我们并没有将已完成的队列中特定的条目给回收掉,当循环回去以后,此时又继续通知处理该条目,就会一直打印,此时就需要用到 io_uring_cq_advance 接口:
void io_uring_cq_advance(struct io_uring *ring, unsigned nr);
io_uring_cq_advance 接口用于通知内核用户空间已处理完成队列(Completion Queue, CQ)中的特定条目,允许内核回收相关资源。
参数解析:
ring
: 指向 io_uring 实例的指针。nr
: 需要推进的完成队列条目数量,通常为已处理的条目数。
它的作用就在于:
- 推进完成队列:每次从 CQ 中取出并处理一个事件后,需调用此函数更新队列头指针,避免重复处理同一事件。
- 资源管理:内核会回收已标记为 “完成” 的条目,释放相关资源(如内存)。
添加这个接口以后就正常了,但是此时就存在一个问题,我们将这个事件标记为完成以后,后续就不会再发送 accept 请求了,所以在这儿就需要我们每一次都发送一个 accept 请求,所以这儿也是需要进行修改的,修改后代码如下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2
struct conn_info
{
int fd;
int event;
};
int init_server(int port)
{
// 创建socket连接
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
serveraddr.sin_port = htons(port);
// 绑定套接字
int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));
if (ret == -1)
{
perror("bind");
return -1;
}
// 监听套接字
listen(sockfd, 10);
return sockfd;
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = ACCEPT_EVENT};
// 用于准备一个异步接受连接(accept)的请求
io_uring_prep_accept(sqe, sockfd, addr, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
return 1;
}
int main()
{
unsigned short port = 9999;
int sockfd = init_server(port);
struct io_uring_params params;
memset(¶ms, 0, sizeof(params));
struct io_uring ring;
// 先构建两个队列出来
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
#endif
char buffer[BUFFER_LENGTH] = {0};
while (1)
{
// 内部实现就是 io_uring_enter,用于提交 IO 请求
io_uring_submit(&ring);
// 创建一个完成队列事件结构,通过 io_uring_wait_cqe
// 获取完成 IO 操作的事件
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
// 批量获取完成 IO 操作的事件
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
int i = 0;
for (i = 0; i < nready; i++)
{
// 获取到 accept 事件的入口
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == ACCEPT_EVENT)
{
// 保证每一次都会有 accept 请求
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
printf("set_event_accept\n");
}
}
// 避免重复处理同一事件
io_uring_cq_advance(&ring, nready);
}
return 0;
}
运行代码,正常连接断开,再次进行连接也不会存在问题:
接下来就需要接收到这个信息,我们在这儿使用的也是 io_uring 库里面提供的函数 io_uring_prep_recv ,io_uring_prep_recv 用于准备一个接收数据的操作请求:
void io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd,
void *buf, size_t len, int flags);
参数如下:
sqe
: 指向 io_uring_sqe 结构的指针,表示提交队列条目(Submission Queue Entry)。sockfd
: 文件描述符,通常是套接字。buf
: 缓冲区指针,用于存储接收到的数据。len
: 缓冲区的长度。flags
: 接收操作的标志,与 recv(2) 系统调用中的 flags 参数相同。
代码改写如下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2
struct conn_info
{
int fd;
int event;
};
int init_server(int port)
{
// 创建socket连接
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
serveraddr.sin_port = htons(port);
// 绑定套接字
int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));
if (ret == -1)
{
perror("bind");
return -1;
}
// 监听套接字
listen(sockfd, 10);
return sockfd;
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = ACCEPT_EVENT
};
// 用于准备一个异步接受连接(accept)的请求
io_uring_prep_accept(sqe, sockfd, addr, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
return 1;
}
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = READ_EVENT
};
// 准备一个接收数据的操作请求
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
return 1;
}
int main()
{
unsigned short port = 9999;
int sockfd = init_server(port);
struct io_uring_params params;
memset(¶ms, 0, sizeof(params));
struct io_uring ring;
// 先构建两个队列出来
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
#endif
char buffer[BUFFER_LENGTH] = {0};
while (1)
{
// 内部实现就是 io_uring_enter,用于提交 IO 请求
io_uring_submit(&ring);
// 创建一个完成队列事件结构,通过 io_uring_wait_cqe
// 获取完成 IO 操作的事件
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
// 批量获取完成 IO 操作的事件
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
int i = 0;
for (i = 0; i < nready; i++)
{
// 获取到 accept 事件的入口
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == ACCEPT_EVENT)
{
// 保证每一次都会有 accept 请求
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
// printf("set_event_accept\n");
int connfd = entries->res;
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
}
else if (result.event == READ_EVENT)
{
int ret = entries->res;
printf("set_event_recv ret: %d, %s\n", ret, buffer);
}
}
// 避免重复处理同一事件
io_uring_cq_advance(&ring, nready);
}
return 0;
}
运行程序,此时就可以看见我们的服务端是可以正常的接收到消息的,但是此时只能接受一次,后续客户端继续发送我们又接受不到了,而且我们也不支持回发消息:
我们需要解决上面的问题就需要调用回发数据的接口,回发数据以后,更新状态,当前又需要接收到客户端所发的数据,就像前面 accept 的时候一样,这儿我们每一次都是需要发起一个 recv 的请求的,否则就会被标记为已完成的事件。
完整代码如下:
#include <stdio.h>
#include <sys/socket.h>
#include <liburing.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
#define ACCEPT_EVENT 0
#define READ_EVENT 1
#define WRITE_EVENT 2
struct conn_info
{
int fd;
int event;
};
int init_server(unsigned short port)
{
// 创建socket连接
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
serveraddr.sin_port = htons(port);
// 绑定套接字
int ret = bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));
if (ret == -1)
{
perror("bind");
return -1;
}
// 监听套接字
listen(sockfd, 10);
return sockfd;
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *len, int flags)
{
// 接受队列事件结构, 调用 io_uring_get_sqe 接口提交的 IO 事件
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = ACCEPT_EVENT,
};
// 用于准备一个异步接受连接(accept)的请求
io_uring_prep_accept(sqe, sockfd, addr, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
return 1;
}
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = READ_EVENT,
};
// 准备一个接收数据的操作请求
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
return 1;
}
int set_event_send(struct io_uring *ring, int sockfd, const void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info = {
.fd = sockfd,
.event = WRITE_EVENT,
};
io_uring_prep_send(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int main()
{
unsigned short port = 9999;
int sockfd = init_server(port);
struct io_uring_params params;
memset(¶ms, 0, sizeof(params));
struct io_uring ring;
// 先构建两个队列出来
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
#if 0
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
// 建立与客户端的连接
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
#endif
char buffer[BUFFER_LENGTH] = {0};
while (1)
{
// 内部实现就是 io_uring_enter,用于提交 IO 请求
io_uring_submit(&ring);
// 创建一个完成队列事件结构,通过 io_uring_wait_cqe
// 获取完成 IO 操作的事件
// struct io_uring_cqe *cqe;
// io_uring_wait_cqe(&ring, &cqe);
// 批量获取完成 IO 操作的事件
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
int i = 0;
for (i = 0; i < nready; i++)
{
// 获取到已完成 IO 事件的入口
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if (result.event == ACCEPT_EVENT)
{
// 保证每一次都会有 accept 请求
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
printf("set_event_accept\n");
int connfd = entries->res;
printf("connfd: %d\n", connfd);
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
}
else if (result.event == READ_EVENT)
{
int ret = entries->res;
printf("set_event_recv ret: %d, %s\n", ret, buffer);
if (ret == 0)
{
close(result.fd);
}
else if (ret > 0)
{
set_event_send(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
}
}
else if (result.event == WRITE_EVENT)
{
int ret = entries->res;
printf("set_event_send ret: %d, %s\n", ret, buffer);
set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
}
}
// 避免重复处理同一事件
io_uring_cq_advance(&ring, nready);
}
return 0;
}
运行代码,创建 3 个客户端,此时每个客户端都可以连接上,对于每次发送的消息,客户端也可以接收到:
性能测试
接下来我们编写一段客户端代码,对 io_uring 的性能进行一下测试
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define TEST_MESSAGE "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048
#define WBUFFER_LENGTH 2048
typedef struct test_context_s {
char serverip[16];
int port;
int threadnum;
int connection;
int requestion;
#if 1
int failed;
#endif
} test_context_t;
int connect_tcpserver(const char *ip, unsigned short port) {
int connfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in tcpserver_addr;
memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));
tcpserver_addr.sin_family = AF_INET;
tcpserver_addr.sin_addr.s_addr = inet_addr(ip);
tcpserver_addr.sin_port = htons(port);
int ret = connect(connfd, (struct sockaddr*)&tcpserver_addr, sizeof(struct sockaddr_in));
if (ret) {
perror("connect");
return -1;
}
return connfd;
}
int send_recv_tcppkt(int fd) {
char wbuffer[WBUFFER_LENGTH] = {0};
int i = 0;
for (i = 0;i < 16;i ++) {
strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);
}
int res = send(fd, wbuffer, strlen(wbuffer), 0);
if (res < 0) {
exit(1);
}
char rbuffer[RBUFFER_LENGTH] = {0};
res = recv(fd, rbuffer, RBUFFER_LENGTH, 0);
if (res <= 0) {
exit(1);
}
if (strcmp(rbuffer, wbuffer) != 0) {
printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);
return -1;
}
return 0;
}
static void *test_qps_entry(void *arg) {
test_context_t *pctx = (test_context_t*)arg;
int connfd = connect_tcpserver(pctx->serverip, pctx->port);
if (connfd < 0) {
printf("connect_tcpserver failed\n");
return NULL;
}
int count = pctx->requestion / pctx->threadnum;
int i = 0;
int res;
while (i++ < count) {
res = send_recv_tcppkt(connfd);
if (res != 0) {
printf("send_recv_tcppkt failed\n");
pctx->failed ++; //
continue;
}
}
return NULL;
}
// ./test_qps_tcpclient -s 127.0.0.1 -p 2048 -t 50 -c 100 -n 10000
int main(int argc, char *argv[]) {
int ret = 0;
test_context_t ctx = {0};
int opt;
while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1) {
switch (opt) {
case 's':
printf("-s: %s\n", optarg);
strcpy(ctx.serverip, optarg);
break;
case 'p':
printf("-p: %s\n", optarg);
ctx.port = atoi(optarg);
break;
case 't':
printf("-t: %s\n", optarg);
ctx.threadnum = atoi(optarg);
break;
case 'c':
printf("-c: %s\n", optarg);
ctx.connection = atoi(optarg);
break;
case 'n':
printf("-n: %s\n", optarg);
ctx.requestion = atoi(optarg);
break;
default:
return -1;
}
}
pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));
int i = 0;
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
for (i = 0;i < ctx.threadnum;i ++) {
pthread_create(&ptid[i], NULL, test_qps_entry, &ctx);
}
for (i = 0;i < ctx.threadnum;i ++) {
pthread_join(ptid[i], NULL);
}
struct timeval tv_end;
gettimeofday(&tv_end, NULL);
int time_used = TIME_SUB_MS(tv_end, tv_begin);
printf("success: %d, failed: %d, time_used: %d, qps: %d\n", ctx.requestion-ctx.failed,
ctx.failed, time_used, ctx.requestion * 1000 / time_used);
free(ptid);
return ret;
}
简单介绍一下客户端的代码,就是将一段数据写入到缓冲区当中,然后发送给服务端,我们在这儿的逻辑就是通过启动多个客户端发送请求,然后对应的服务端进行处理,看其处理时间,在这儿跟之前的 epoll 进行对比,查看两者之间的性能差距。
之前 epoll 的代码如下:
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>
#define BUFFER_LENGTH 1024
#define CONNECTION_SIZE 1024 * 1024
#define MAX_PORTS 1
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
typedef int (*CALLBACK)(int fd);
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
struct timeval begin;
int epfd = 3;
struct conn
{
// 负责IO的文件描述符
int fd;
// 接收缓冲区的buffer
char rbuffer[BUFFER_LENGTH];
int rlength;
// 发送缓冲区的buffer
char wbuffer[BUFFER_LENGTH];
int wlength;
// 三个对应的回调函数
CALLBACK send_callback;
union
{
CALLBACK accept_callback;
CALLBACK recv_callback;
} r_action;
};
struct conn con_list[CONNECTION_SIZE] = {0};
void send_event(int fd, int event, int flag)
{
if (flag)
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else
{
struct epoll_event ev;
ev.data.fd = fd;
ev.events = event;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event)
{
if (fd < 0)
{
return -1;
}
con_list[fd].fd = fd;
con_list[fd].r_action.recv_callback = recv_cb;
con_list[fd].send_callback = send_cb;
memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);
con_list[fd].rlength = 0;
memset(con_list[fd].wbuffer, 0, BUFFER_LENGTH);
con_list[fd].wlength = 0;
send_event(fd, event, 1);
}
int accept_cb(int fd)
{
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(fd, (struct sockaddr *)&clientaddr, &len);
if (clientfd < 0)
{
printf("accept failed !!!\n");
}
else
{
// printf("accept finished: %d\n", clientfd);
}
event_register(clientfd, EPOLLIN);
if ((clientfd % 1000) == 0)
{
struct timeval current;
gettimeofday(¤t, NULL);
int time_used = TIME_SUB_MS(current, begin);
memcpy(&begin, ¤t, sizeof(struct timeval));
printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);
}
return 0;
}
int recv_cb(int fd)
{
// memset(con_list[fd].rbuffer, 0, BUFFER_LENGTH);
int count = recv(fd, con_list[fd].rbuffer, BUFFER_LENGTH, 0);
if (count == 0)
{
printf("client disconnect: %d\n", fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return 0;
}
else if (count < 0)
{
printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
return 0;
}
con_list[fd].rlength = count;
// printf("recv succ: %s\n", con_list[fd].rbuffer);
#if 1
con_list[fd].wlength = con_list[fd].rlength;
memcpy(con_list[fd].wbuffer, con_list[fd].rbuffer, con_list[fd].rlength);
#endif
send_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd)
{
int count = send(fd, con_list[fd].wbuffer, BUFFER_LENGTH, 0);
send_event(fd, EPOLLIN, 0);
return count;
}
int init_server(unsigned short port)
{
// 创建套接字
int socketfd = socket(AF_INET, SOCK_STREAM, 0);
printf("socketfd: %d\n", socketfd);
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
serveraddr.sin_port = htons(port); // 0 ~ 1023
// 绑定套接字
int ret = bind(socketfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr));
if (ret == -1)
{
printf("bind failed: %s\n", strerror(errno));
}
// 监听套接字
listen(socketfd, 10);
return socketfd;
}
int main()
{
unsigned short port = 2000;
int epfd = epoll_create(1);
printf("epfd: %d\n", epfd);
int i = 0;
for (i = 0; i < MAX_PORTS; i++)
{
int sockfd = init_server(port + i);
// printf("socket fd: %d\n", sockfd);
con_list[sockfd].fd = sockfd;
con_list[sockfd].r_action.recv_callback = accept_cb;
send_event(sockfd, EPOLLIN, 1);
}
gettimeofday(&begin, NULL);
while (1)
{
struct epoll_event events[1024] = {0};
// 将就绪事件放入到就绪队里当中
int nready = epoll_wait(epfd, events, 1024, -1);
for (int i = 0; i < nready; i++)
{
int connfd = events[i].data.fd;
#if 0
if((events[i].events & EPOLLIN))
{
con_list[i].r_action.recv_callback(connfd);
}
else if ((events[i].events & EPOLLOUT))
{
con_list[i].send_callback(connfd);
}
#else
if ((events[i].events & EPOLLIN))
{
con_list[connfd].r_action.recv_callback(connfd);
}
if ((events[i].events & EPOLLOUT))
{
con_list[connfd].send_callback(connfd);
}
#endif
}
}
return 0;
}
测试的两者均创建 100 个线程,发起一百万个请求,查看其处理时间,首先来看 io_uring 的测试结果,花费时间为 16 ms左右:
接下来再来看 epoll 的处理性能:
两者的差距在 2ms 左右,其实对比下来 io_uring 还是有一个提升的,对于更多的连接的时候,io_uring 的效率还是会优于 epoll 的。