Linux io_uring

发布于:2024-07-11 ⋅ 阅读:(7) ⋅ 点赞:(0)

io_uring是Linux 内核提供的用于处理大量并发 I/O 事件的机制,在性能上优于epoll。

主要优势有:

(1)无锁设计:

io_uring:提供了无锁的用户空间和内核空间通信机制,这意味着在高并发场景下,io_uring 可以减少锁争用,提高性能。
(2)提交和完成队列:

io_uring:使用两个队列,一个是提交队列(submission queue),用于提交 I/O 请求;另一个是完成队列(completion queue),用于接收完成的 I/O 事件。这种分离可以减少内核和用户空间的交互次数。
epoll:使用一个事件列表来通知用户空间哪些 I/O 事件已经准备好。虽然 epoll 也提供了高效的 I/O 事件通知机制,但它通常需要更多的上下文切换和内核调用。
可扩展性:

(3)异步IO支持

io_uring:原生支持异步 I/O 操作,可以直接在用户空间管理 I/O 事件,减少了系统调用的开销。
epoll:主要用于通知 I/O 事件的就绪状态,但实际的 I/O 操作(如 read 和 write)仍然需要通过系统调用来完成。
 

下面实现一个简单的基于io_uring的TCP服务器

#include <stdio.h> // 包含标准输入输出库
#include <liburing.h> // 包含io_uring库,用于异步I/O操作
#include <netinet/in.h> // 包含网络接口的函数和常量
#include <string.h> // 包含字符串操作函数
#include <unistd.h> // 包含UNIX标准函数的定义

#define EVENT_ACCEPT   	0 // 定义事件类型:接受连接
#define EVENT_READ		1 // 定义事件类型:读取数据
#define EVENT_WRITE		2 // 定义事件类型:写入数据

struct conn_info { // 定义一个结构体用来存储连接信息
	int fd; // 文件描述符
	int event; // 事件类型
};

// 初始化服务器并监听指定端口
int init_server(unsigned short port) {	
	int sockfd = socket(AF_INET, SOCK_STREAM, 0); // 创建套接字
	// 填充服务器地址结构体
	struct sockaddr_in serveraddr;	
	memset(&serveraddr, 0, sizeof(struct sockaddr_in));	
	serveraddr.sin_family = AF_INET; // 地址族
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); // 服务器IP地址
	serveraddr.sin_port = htons(port); // 服务器端口

	// 绑定套接字到指定的地址和端口
	if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {		
		perror("bind"); // 如果绑定失败,打印错误信息
		return -1; // 返回错误代码
	}	
	listen(sockfd, 10); // 开始监听套接字,最大连接数为10
	return sockfd; // 返回套接字描述符
}

// 定义常量,设置io_uring的条目数量和缓冲区长度
#define ENTRIES_LENGTH		1024
#define BUFFER_LENGTH		1024

// 设置接收事件
int set_event_recv(struct io_uring *ring, int sockfd,
				      void *buf, size_t len, int flags) {
	// 获取一个SQE(提交队列条目)
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	// 准备接收数据的请求
	io_uring_prep_recv(sqe, sockfd, buf, len, flags);
	// 将conn_info结构体的值复制到SQE的用户数据区域
	memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

// 设置发送事件
int set_event_send(struct io_uring *ring, int sockfd,
				      void *buf, size_t len, int flags) {
	// 获取一个SQE
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	// 准备发送数据的请求
	io_uring_prep_send(sqe, sockfd, buf, len, flags);
	// 复制conn_info到SQE的用户数据区域
	memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

// 设置接受连接事件
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
					socklen_t *addrlen, int flags) {
	// 获取一个SQE
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	// 准备接受连接的请求
	io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
	// 复制conn_info到SQE的用户数据区域
	memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

// 主函数
int main(int argc, char *argv[]) {
	// 设置服务器监听的端口
	unsigned short port = 9999;
	// 初始化服务器并获取套接字描述符
	int sockfd = init_server(port);

	// 初始化io_uring参数
	struct io_uring_params params;
	// 清零参数结构体
	memset(&params, 0, sizeof(params));
	// 初始化io_uring
	struct io_uring ring;
	io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);

	// 以下代码块被注释掉,说明原始代码可能考虑过使用传统的accept方法
#if 0
	// 省略的代码块
#else
	// 使用io_uring设置接受连接的事件
	set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
#endif

	// 定义缓冲区用于数据交换
	char buffer[BUFFER_LENGTH] = {0};

	// 无限循环,处理io_uring事件
	while (1) {
		// 提交所有SQE到io_uring
		io_uring_submit(&ring);

		// 获取一个完成的CQE(完成队列条目)
		struct io_uring_cqe *cqe;
		io_uring_wait_cqe(&ring, &cqe);

		// 批量获取完成的CQE
		struct io_uring_cqe *cqes[128];
		int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);

		// 遍历所有完成的CQE
		int i = 0;
		for (i = 0; i < nready; i++) {
			// 获取CQE
			struct io_uring_cqe *entries = cqes[i];
			// 创建conn_info结构体用于存储事件信息
			struct conn_info result;
			// 从CQE的用户数据区域复制conn_info
			memcpy(&result, &entries->user_data, sizeof(struct conn_info));

			// 根据事件类型处理不同的I/O操作
			if (result.event == EVENT_ACCEPT) {
				// 处理接受连接事件
				set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
				int connfd = entries->res; // 获取新的连接描述符
				set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0); // 设置读取事件
			} else if (result.event == EVENT_READ) {
				// 处理读取事件
				int ret = entries->res;
				if (ret == 0) {
					close(result.fd); // 如果读取结束,则关闭连接
				} else if (ret > 0) {
					set_event_send(&ring, result.fd, buffer, ret, 0); // 设置发送事件
				}
			} else if (result.event == EVENT_WRITE) {
				// 处理发送事件
				int ret = entries->res;
				set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0); // 发送完成后设置读取事件
			}
		}

		// 推进CQ(完成队列)的偏移量
		io_uring_cq_advance(&ring, nready);
	}
}