【网络编程】事件驱动 reactor 式的服务器(EPOLL机制)

发布于:2025-07-03 ⋅ 阅读:(23) ⋅ 点赞:(0)

推荐一个零声教育学习教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,点击立即学习: https://github.com/0voice 链接

业务拆解

在上一篇 文章 里,我们使用 EPOLL 机制去搭建服务器,能够低成本高效率的执行 “多路复用网络 I/O 高并发”。在本篇文章里,我们将要把上一篇问文章的代码切分开来,使之模块化,更好地满足业务特色化需求。比如,最基础的网络 I 任务,就是读取信息,如果业务有特殊需求,我们要特殊处理所读取的信息,我们就要专门在源代码的基础上额外写多几个业务读函数;再比如,最基础的网络 O 任务,是发送特定资源信息,如果业务有特殊需求,我们要特殊处理将要发送的信息,比如以 HTTP 报文形式发送给用户,使得用户可以在浏览器上看到我们所发的内容。

总之,我们要实现一个服务器底座,一个可根据业务内容做简单扩展的服务器代码,降低开发难度。我们称之为 “事件驱动的 reactor”。

事件驱动的 reactor

reactor 顾名思义就是反应器的原理,不同的信号就会有不同的反应,回顾上一篇 文章 的 EPOLL 服务器,主要有三类网络 I/O 任务

  • 输入任务1:监听套接字 sockfd 监听到来访 IP,读取连接信息,分配套接字 clientfd,以负责对应网络 I/O。
  • 输入任务2:客户端发来信息,epoll 调用操作系统内核通知进程处理读事件。
  • 输出任务:沿着对应套接字 clientfd 向客户端发送信息。

也就是说我们要把代码分成三个模块,实现三种因事件信号而异的反应,综合起来就是一个反应器 reactor。我们在每一个模块中预留一个地方给各自的业务代码函数。这三个模块我们分别记成

  1. accept_cb 模块,
  2. read_cb 模块,
  3. send_cb 模块

总流程图

为了简化表达,避免像上一篇 文章 那样写的那么复杂。我们也分模块来写流程图,各个模块再给出它自己的流程图。我们先给总体的流程图。

Created with Raphaël 2.3.0 开始 init_server 占用若干个端口建立服务器套接字 sockfds,使这些 sockfds 处于监听状态 创建 epoll 实例 epfd(被一个套接字所表示),作为总集 创建长度固定的 struct epoll_event 事件数组,作为 epfd 内核中就绪链表的誊写白纸 把若干个监听套接字 sockfds 的关注事件类型设置成 EPOLLIN,以 epoll_event 形式加入实例 epfd 调用 epoll_wait 函数, epoll 实例同时监视服务器的监听套接字 sockfd 以及所注册所有的来访 IP 连接的套接字 clientfd,并且统计响应数量 nready 对这 nready 个内核响应的套接字-事件,我们逐个来处理 这 nready 个套接字-事件是否还没有处理完? 当前所检查的套接字是否正常(没出现错误 EPOLLERR )? 当前所检查的套接字不是监控套接字吗? 当前所检查的 套接字-事件 不是 EPOLLIN 事件吗? 当前所检查的 套接字-事件 不是 EPOLLOUT 事件吗? 关闭套接字,释放资源,快进到下一个套接字-事件 已处理的 套接字-事件 计数加 1 send_cb 模块,发送内容给客户端 recv_cb 模块,ET 边沿模式需循环读取 I/O 文件 accept_cb 模块,注册新的套接字-事件入 EPOLL 实例 关闭套接字,释放资源,快进到下一个套接字-事件 yes no yes no yes no yes no yes no

C 代码实现

准备工作

编写头文件 reactor.h

此文件内定义了特殊的结构体,记录了若干个套接字每次 I/O 的内容

#ifndef __SERVER_H__
#define __SERVER_H__

#define BUFFER_LENGTH		1024


typedef int (*RCALLBACK)(int fd);


struct conn {
	int fd;

	//	申请缓冲区的大小 1024 --> 2048 ...... 逐步递增 KB,在 accept_cb 函数中用 malloc 建立,在 recv_cb 和 send_cb 函数中用 realloc 函数调整其大小
	char *rbuffer;	//	边沿模式下,我们是不能够假设我们要读取总量多少的内容,只能是全部读取
	ssize_t rlength;
	ssize_t rcap;  // 读缓冲区容量

	char *wbuffer;	//	边沿模式下,我们是不能够假设我们要读取总量多少的内容,只能是全部写入
	ssize_t wlength;
	ssize_t wcap;  // 写缓冲区容量

	RCALLBACK send_callback;

	union {
		RCALLBACK recv_callback;
		RCALLBACK accept_callback;
	} r_action;

	

};

#endif

其中以下代码结构是头文件为了保护定义而专门设置的,避免同一个头文件重复定义

#ifndef __SERVER_H__
#define __SERVER_H__

// 定义类型

#endif

所定义的 struct conn 类型中有一个成员值得注意,那就是 r_action,它是一个联合体而非结构体,它有一词多义性,我们知道 RCALLBACK 是一个回调函数,

typedef int (*RCALLBACK)(int fd);

回调函数可以是任何 同 (返回类型、传入参数类型)的函数。而在类型 struct conn 中的成员 r_action 就好比 “面向对象编程中的多态”,成员都叫同一个名字,但是具体的定义是不一样的。下文的主函数(服务器代码)中会出现两行类似的代码,就是来自于回调函数的使用,它们分别是(读者可以通篇阅读完这篇文章后再回来)

1、conn_list[sockfd].r_action.accept_callback = accept_cb;
2、conn_list[fd].r_action.recv_callback = recv_cb;
3、conn_list[fd].send_callback = send_cb;
4for (int j = 0; j < MAX_PORTS; j++) {
		if (connfd == listen_fds[j]) {
			conn_list[connfd].r_action.accept_callback(connfd);
			is_listener = 1;
			break;
		}
	}
5if (conn_list[connfd].r_action.recv_callback(connfd) >= 0) { // 返回0表示成功
		printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);
	} else {
		//	连接在 recv 函数处早已释放
		continue; // 跳过后续处理
	}
6、conn_list[connfd].send_callback(connfd);

准备头文件

#include <errno.h>				// 这是全局变量 errno,用于健壮的读取功能
#include <stdio.h>	
#include <stdlib.h>				// 动态内存分配
#include <sys/socket.h>			// 创建和管理套接字。绑定地址、监听连接和接受连接。发送和接收数据。设置和获取套接字选项。 socket()、connect()、sendto()、recvfrom()、accept()
#include <netinet/in.h>			// 提供了结构体 sockaddr_in
#include <string.h>				// strerror 函数
#include <fcntl.h>				// 用于更改套接字的模式,比如非阻塞模式
#include <unistd.h>				// close 函数,关闭套接字
#include <sys/types.h>			// ssize_t 是一个有符号的整数类型
#include <sys/epoll.h>			// EPOLL 高并发机制
#include <sys/time.h>			// timeval 类型,用于表述等待时间


#include "reactor.h"				// 底层数据结构,回调函数、业务端函数的统一声明

此处注意到我们是导入了刚刚所写的头文件 “reactor.h”。

准备宏定义

这个代码可建立一百多万个连接,下一篇文章里我将介绍百万连接的方法。为了释放这百万连接的代码潜力,我们要定义这三个宏。

#define CONNECTION_SIZE			1048576 	// 	1024 * 1024,即我们要测试 1 M 的连接数

#define MAX_PORTS				20			//	该服务器占用本地 20 个端口,用以建立网络 I/O ,更好地实现百万并发

//	这是一个宏操作,是两个时间戳的相减,表示 “计时”
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)	

MAX_PORTS 是用来支持百万连接的,CONNECTION_SIZE 是指定最大的连接数。TIME_SUB_MS(tv1, tv2) 宏操作是用来测试服务器的性能,用来计算运行的时长

声明三大模块函数和基础的内存变量长度

// 声明函数,先写主函数的代码,适用于底层链接的
int accept_cb(int fd);
int recv_cb(int fd );
int send_cb(int fd);

#define BUFFER_LENGTH		1024

无论是网络输入还是输出,每次操作的字节数上限都是 BUFFER_LENGTH 个,慢慢的逐步地有条不紊地接收发送信息,而非一次过把所有内容都吞下。

定义全局变量

当我们定义了某个全局变量,我们所定义所有函数都可以在不传入该变量的前提下对其进行改变,无通过效仿一般函数的传入参数前外加取址符,以求在内存层次改变变量本身。

全局变量并不占用线程栈的内存空间,而是存储在 “静态数据区” 之中。

int epfd = 0;			//  epoll 事件文件套接字,它申请为一个全局变量
struct timeval begin;	//	声明一个全局时间戳变量

//	本 EPOLL 实例一次最多建立 1 M = 1024*1024 个网络 I/O 事件(百万并发);这又被称之为总体事件集
struct conn conn_list[CONNECTION_SIZE] = {0};	
// 	这是一个全局变量
//	在函数中对全局变量赋值后,函数执行完毕后全局变量的值会保持修改后的值。全局变量的特性决定了它的值在程序运行期间会持久存在,不会因为函数执行结束而重置。

如果读者不知道静态数据区的大小是多少,可以通过以下方法查看(我是用 Linux 系统编程的)

qiming@qiming:~/share/CTASK/TCP_test$ ulimit -a
real-time non-blocking time  (microseconds, -R) unlimited
core file size              (blocks, -c) 0
data seg size               (kbytes, -d) unlimited
scheduling priority                 (-e) 0
file size                   (blocks, -f) unlimited
pending signals                     (-i) 15051
max locked memory           (kbytes, -l) 496096
max memory size             (kbytes, -m) unlimited
open files                          (-n) 1024
pipe size                (512 bytes, -p) 8
POSIX message queues         (bytes, -q) 819200
real-time priority                  (-r) 0
stack size                  (kbytes, -s) 8192
cpu time                   (seconds, -t) unlimited
max user processes                  (-u) 15051
virtual memory              (kbytes, -v) unlimited
file locks                          (-x) unlimited

我们注意到

data seg size               (kbytes, -d) unlimited
  • data seg size:表示程序数据段的最大大小,单位为 KB。数据段包括全局变量和静态变量,这些变量存储在全局数据区中。

这说明静态数据区没有特别的限制,仅由操作系统的内存所限制。故而我们不必担心 conn_list 这个一百万多元的超大数组会超规模占用空间。

另外,epfd 将会是下文的 “EPOLL 实例”,begin 是全局的开端时间戳。

定义 EPOLL 实例事件处理的函数与释放资源的函数

EPOLL 实例事件处理的函数如下。

//	该函数可以实现对文件描述符的 EPOLL 事件注册、修改或删除,flag 参数是用来区分情况的
int set_event(int fd, int event, int flag) {
	//	fd 是目标文件描述符或套接字;
	//	event 是事件类型,比如读事件 EPOLLIN;
	//	flag 是用来区分是注册还是修改的;1 表示是注册;0 表示修改;2 表示删除
	
	if (flag == 1) {  // add 1

		struct epoll_event ev;
		ev.events = event;
		ev.data.fd = fd;
		if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
			// 老板感兴趣的事件是,前台小姐姐的工作情况;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注册入 EPOLL 之中
			// (即一个套接字 fd 对应一个就绪状态表 0/1)
			perror("epoll_ctl failed");
			close(fd);
			return -1;
    	}
	} else if (flag == 2) {	// delete 2

		struct epoll_event ev;
		ev.events = event;
		ev.data.fd = fd;
		if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev) < 0) {
			// 老板感兴趣的事件是,前台小姐姐的工作情况;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注册入 EPOLL 之中
			// (即一个套接字 fd 对应一个就绪状态表 0/1)
			perror("epoll_ctl failed");
			
			return -1;
    	}
		
	} else if (flag == 0) {  // modify 0

		struct epoll_event ev;
		ev.events = event;
		ev.data.fd = fd;
		//	EPOLL_CTL_MOD:修改已经注册到 epoll 实例中的文件描述符 fd 的监视事件。
		if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
			// 老板感兴趣的事件是,前台小姐姐的工作情况;EPOLL_CTL_ADD 是把套接字 sockfd 和事件 event 注册入 EPOLL 之中
			// (即一个套接字 fd 对应一个就绪状态表 0/1)
			perror("epoll_ctl failed");
			close(fd);
			return -1;
		}
	} else {
		printf("Param Error: flag =0, 1, 2\n");
	}
	

}

我们注意到,该函数有一个 int flag 参数,这其实是一个状态机,用来标明选择处理方式。用来区分是注册还是修改的,抑或删除;1 表示是注册;0 表示修改;2 表示删除。

释放资源的函数如下。

//	只在连接关闭时释放资源:事件 event 代号 0 通常表示没有事件发生
void close_connection(int fd) {
    set_event(fd, 0, 2);
    close(fd);
    
    if (conn_list[fd].rbuffer) {
        free(conn_list[fd].rbuffer);
        conn_list[fd].rbuffer = NULL; // 防止重复释放
    }
    
    if (conn_list[fd].wbuffer) {
        free(conn_list[fd].wbuffer);
        conn_list[fd].wbuffer = NULL;
    }
    
    memset(&conn_list[fd], 0, sizeof(struct conn));
}

缓冲区生命周期必须与连接生命周期一致,在连接关闭前不要释放缓冲区,在连接关闭后确保完全释放。

注册服务器监听套接字的函数

占用设备的端口,设置监听套接字。

//	创建监听套接字 sockfd,并且绑定本机 IP 和端口 port
// (远程IP, 远程PORT, 本地IP, 本地PORT, 协议) 与 网络 I/O 一对一成型一个 sockfd 
int init_server(unsigned short port) {

	int sockfd = socket(AF_INET, SOCK_STREAM, 0);

	struct sockaddr_in servaddr;
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 0.0.0.0
	servaddr.sin_port = htons(port); // 0-1023, 

	// 设置端口复用
    // 当服务器主动关闭 TCP 连接时,会进入 TIME_WAIT 状态(通常持续 2MSL,约 1-4 分钟)。在此期间,操作系统会保留该端口绑定记录,防止延迟到达的数据包干扰新连接。
    // 问题:服务器崩溃或重启后尝试重新绑定端口时,会因 TIME_WAIT 状态导致 bind() 失败(错误:Address already in use) 
    // 以下处理措施:能避免再次启用服务器程序时,系统的宕机
    int reuse = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        //  如果未设置 SO_REUSEADDR,导致端口被占用后无法立即重用(TIME_WAIT 状态)
        printf("setsockopt failed: %s\n", strerror(errno));
        return -1;
    }
    // setsockopt 是一个用于设置套接字选项的系统调用函数。
    // 第二项参数 level:指定选项所在的协议级别。常见的值包括:SOL_SOCKET:表示套接字级别的选项。IPPROTO_TCP:表示 TCP 协议级别的选项。IPPROTO_IP:表示 IP 协议级别的选项。IPPROTO_IPV6:表示 IPv6 协议级别的选项。
    // 第三项参数 optname:指定要设置的选项名称。不同的协议级别有不同的选项名称。例如:在 SOL_SOCKET 级别,常见的选项包括 SO_REUSEADDR、SO_KEEPALIVE、SO_LINGER 等。在 IPPROTO_TCP 级别,常见的选项包括 TCP_NODELAY 等。
    // 第四项参数 optval:指向包含选项值的内存区域。选项值的类型和大小取决于 optname
    // 第五项参数 optlen:指定 optval 的长度(以字节为单位)。


	if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
		printf("bind failed: %s\n", strerror(errno));	//	strerror 函数定义在 <string.h> 中,而 errno 是 <errno.h> 的全局变量
		return -1;
	}

	//	一次监听 10 个来访 IP:PORT
	//	printf("listen finshed: %d\n", sockfd); // 3 
	if (listen(sockfd, 10) < 0) {
        // 将套接字设置为被动模式:套接字从主动连接模式(用于客户端)转换为被动监听模式(用于服务器)。我们可以把这个 socket 想象成公司的前台小姐。
        // 5:是监听队列的最大长度,表示系统可以为该套接字排队的最大未完成连接数。当新的连接请求到达时,如果队列已满,新的连接请求将被拒绝。
        // 返回值:成功,返回 0。失败,返回 -1,并设置 errno 以指示错误原因。
        printf("listen finshed: %d\n", sockfd); 
        return -1;
    }

	return sockfd;

}

accept_cb 模块

首先给出的是,针对已经在 EPOLL 实例中注册的套接字,初始化其在全局变量 conn_list 对应位置上的内存配置。

//	针对已经注册的 clientfd	(当然也有可能注册不成功,要注意处理失败) 在事件总集的对应 fd 上的综合情况进行初始化
int event_register(int fd, int event) {

	if (fd < 0) return -1;								//	这里是用来应对 accept 函数调用失败的错误

	conn_list[fd].fd = fd;
	conn_list[fd].r_action.recv_callback = recv_cb;
	conn_list[fd].send_callback = send_cb;

	conn_list[fd].rbuffer = NULL;		//	重置读缓冲区
	conn_list[fd].rlength = 0;
	conn_list[fd].rcap = 0;
	
	conn_list[fd].wbuffer = NULL; // 确保初始化为NULL
    conn_list[fd].wcap = 0;
    conn_list[fd].wlength = 0;

	set_event(fd, event, 1);							//  标志 1 表示当前是对 fd 的事件注册而非修改
}

紧接着,当服务器的监听套接字监听到了来访 IP 时,分配出新的套接字以对接接下来对该 IP 的 I/O 任务。

// listenfd(sockfd) --> EPOLLIN --> accept_cb	根据情况使用回调函数————监控套接字 sockfd 的读事件是 accept 注册 clientfd
int accept_cb(int fd) {

	struct sockaddr_in  clientaddr;
	socklen_t len = sizeof(clientaddr);
	//	这两个变量是专门用来注册 clientfd 的

	int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
	// accept 会因为无输入而阻塞(因为 sockfd 是默认的阻塞模式),本设计就是防止其阻塞(利用条件判断绕开阻塞)
	if (clientfd < 0) {
		printf("accept errno: %d --> %s\n", errno, strerror(errno));
		return -1;
	}
	
	// 动态分配读入内存的空间
    conn_list[clientfd].rbuffer = malloc(BUFFER_LENGTH); // 先行分配1024字节的内存,它是读取内容最终归宿
	if (conn_list[clientfd].rbuffer) {
		memset(conn_list[clientfd].rbuffer, 0, BUFFER_LENGTH); // 初始化为0
	}
	conn_list[clientfd].rlength = 0;
	conn_list[clientfd].rcap = BUFFER_LENGTH;  //  记录容量

    if (conn_list[clientfd].rbuffer == NULL) {
        perror("Malloc ReadBuffer Error");
        return -1;
    }
	
	//  使用 `EPOLLET` 必须配合非阻塞套接字,否则可能阻塞线程
	int flags = fcntl(clientfd, F_GETFL, 0);        // F_GETFL 是获取标志的命令
	fcntl(clientfd, F_SETFL, flags | O_NONBLOCK);   // 要注意 “|” 是按位或操作,能进行掩码叠加;F_SETFL 是设置文件状态标志,在原来的基础上增加非阻塞功能
	//  函数 fcntl() 作用:读取 clientfd 当前的所有文件状态标志
	//  返回值:包含位掩码的整数,表示当前所有设置的标志
	//          O_RDONLY:只读模式 (0);O_WRONLY:只写模式 (1);O_RDWR:读写模式 (2);
	//          O_NONBLOCK:非阻塞模式 (04000);O_APPEND:追加模式 (02000)
	
	event_register(clientfd, EPOLLIN | EPOLLET);  // | EPOLLET 是 clientfd 使用边沿触发模式

	//	这是用于说明情况的
	if ((clientfd % 1000) == 0) {		//	为了使得打印不过分密集,每一千个 I/O 就打印一次

		struct timeval current;
		gettimeofday(&current, NULL);	//	获取当前时间戳,定义在 <sys/time.h> 头文件中

		int time_used = TIME_SUB_MS(current, begin);	//	获取时间差
		memcpy(&begin, &current, sizeof(struct timeval));
		

		printf("accept finshed: %d, time_used: %d\n", clientfd, time_used);

	}
	
	return 0;
}

read_cb 模块

这个函数是有相当多细节的。首先,我们用于计数的变量 count 所用的数字类型是 ssize_t 类型的(在头文件 <sys/types.h> 中定义的),是一个ssize_t 是一个有符号的整数类型,在 64 位操作系统中,范围是 [-263,263-1],是一个相当大的数。这说明这个服务器是有被用户上传视频这种大型数据的能力的。

//	根据情况使用回调函数————普通套接字 clientfd 的读事件是 recv_cb  读取内存
//	在边沿模式下,该函数要循环执行
int recv_cb(int fd) {

	ssize_t count=0;
	char buffer[BUFFER_LENGTH] = {0}; 		
	//struct conn* c = &conn_list[fd];  // 使用局部变量简化代码

	if (conn_list[fd].rbuffer != NULL) {	//	连接复用,二次收信息的时候
		free(conn_list[fd].rbuffer);
		conn_list[fd].rbuffer = NULL; // 防止重复释放
	}
	conn_list[fd].rlength =0;
	conn_list[fd].rcap = BUFFER_LENGTH; 
	
	while (1) {
		
		count = recv(fd, buffer, BUFFER_LENGTH, 0);	//	读取固定长度的内容,0 代表以阻塞模式读取数据
		 
		
		// 因为 clientfd 不是阻塞模式, recv 不会因无输入而阻断,而是会立即返回 -1
		// 函数 recv: 读取固定字节的内容。
		// 返回值 > 0:表示成功接收了数据,返回值表示实际接收到的字节数。
		// 返回值 == 0:表示对端已经关闭了连接(TCP连接的正常关闭)。这是TCP协议的对端关闭连接的标志。
		// 返回值 == -1:表示无信息可读。并设置错误码为 EAGAIN 或 EWOULDBLOCK。这表示当前没有数据可读,但连接仍然有效。
		// 当调用 recv 时,会触发用户态到内核态的切换,内核负责从套接字接收缓冲区复制数据到用户提供的缓冲区,内核会更新接收缓冲区的状态(如移除已读取数据)
		if (count == 0) { 	// disconnect
			
			close_connection(fd); // 连接断开必须要清理套接字资源,前面已经清理过一次了,避免在次释放

			printf("client disconnect: %d\n", fd);
			//	当客户端主动发来断开连接的请求时,
			
			return -1;
		} else if (count < 0) { 
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 无数据可读,退出循环
				
                return 0;
            }
            else {
                printf("recv error: %s\n", strerror(errno));
                close_connection(fd);
                return -2;
            }

		} else {
            // 使用结构体中的长度和容量字段
            ssize_t new_length = conn_list[fd].rlength + count;

            // 确保缓冲区存在,这是下文能成功使用 memcpy 的原因
            if (!conn_list[fd].rbuffer) {
                conn_list[fd].rbuffer = malloc(BUFFER_LENGTH);
                if (!conn_list[fd].rbuffer) {
                    perror("malloc failed");
                    close_connection(fd);
                    return -3;
                }
                conn_list[fd].rcap = BUFFER_LENGTH;
                conn_list[fd].rlength = 0;
            }

            // 动态扩容
            if (new_length > conn_list[fd].rcap) {
                ssize_t new_cap = conn_list[fd].rcap * 2;
                char *new_buf = realloc(conn_list[fd].rbuffer, new_cap);
                if (!new_buf) {
                    perror("realloc failed");
                    close_connection(fd);
                    return -3;
                }
                conn_list[fd].rbuffer = new_buf;
                conn_list[fd].rcap = new_cap;
            }
            
            // 复制数据
            memcpy(conn_list[fd].rbuffer + conn_list[fd].rlength, buffer, count);	//	我在这个地方多次出错,memcpy 用的不好,我在此处爆栈了,
			
            conn_list[fd].rlength = new_length;
            
            // 安全打印(指定长度)
            // printf("[%zd]RECV: %.*s\n", count, (int)count, buffer);	//	这是特殊的占位符用法
        }
		
		memset(buffer,0,BUFFER_LENGTH); 	// 重置读取的缓冲区,他是固定长度的,用于接收 fd 里面的字节,每次只读取一点点
	}

	return 0;
}

这个函数先是对初始化 conn_list 的读缓冲区和读缓冲区的长度和容量进行初始化,而后采用边沿读取的方法,无限循化地读取,直至读完。边沿触发模式下,事件只在状态发生变化时通知一次,不会因为缓冲区中持续有数据而反复触发。这大大减少了epoll_wait的调用次数,降低了内核与用户态之间的上下文切换开销,从而显著提高了性能。边沿触发模式通常与非阻塞I/O搭配使用。在非阻塞模式下,程序会尽可能多地读取或写入数据,直到遇到EAGAINEWOULDBLOCK错误为止。这种模式下,边沿触发能够更好地发挥其优势。

  • 当用户远程断开连接时,会自动给服务器发送信息,触发 EPOLL 的读事件 event,并且使用 recv 函数后,返回值是 0
  • 当文件读取完毕后,由于 clientfd 被设置成非阻塞模式,recv 不会因无输入而阻断,而是会立即返回 -1,对应的错误是 EAGAINEWOULDBLOCK 。但这是正常现象。换句话说,在阻塞模式下,是不存在这个错误的。
  • 我们在读取 I/O 文件内容时,是使用了动态内存扩充的方法,这使得我们的服务器可以接受超大型的数据。

send_cb 模块

在处理网络输出(即 EPOLLOUT 事件)时,套接字 clientfd 的模式被函数set_event 调整为阻塞模式,可保证发送到内核缓冲区(即套接字对应的 I/O 文件上),但不保证完整传输到对端(受网络状况影响),因而还需要循环发送。我们还需设置超时处理,保证可以全部发送。

//	挂起等待,写事件就绪,处理 send 函数的 Bug
void wait_for_socket_writable(int sockfd) {
    // 需提前创建epoll实例并注册事件
	int epoll_fd_1 = epoll_create1(0);
	struct epoll_event ev;
	ev.events = EPOLLOUT;  // 监听可写
	ev.data.fd = sockfd;
	epoll_ctl(epoll_fd_1, EPOLL_CTL_ADD, sockfd, &ev);

	// 等待事件触发,无限阻塞
	epoll_wait(epoll_fd_1, &ev, 1, -1);
	close(epoll_fd_1); // 添加关闭
}

//	确保数据全部发送
//	即使使用阻塞 sockfd 也必须循环发送!因为阻塞模式只保证发送到内核缓冲区,不保证完整传输到对端(受网络状况影响)。
ssize_t send_all(int sockfd, const void *buf, size_t len, int flags) {
    ssize_t total_sent = 0;      // 已发送字节数
    const char *ptr = (const char *)buf;  // 移动指针指向未发送数据

	//	即使使用阻塞 sockfd 也必须循环发送!因为阻塞模式只保证发送到内核缓冲区,不保证完整传输到对端(受网络状况影响)。
    while (total_sent < len) {
        // 尝试发送剩余数据
        ssize_t n = send(sockfd, ptr, len - total_sent, flags);
        
        if (n < 0) {
            // 错误处理(重点!)
            if (errno == EINTR) continue;   // 信号中断:重试
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 阻塞模式:等待可写(需配合 select/poll/epoll)
                wait_for_socket_writable(sockfd);
                continue;
            }
            return -1;  // 其他错误(如连接断开)
        } 
        else if (n == 0) {
			
			printf("client disconnect: %d\n", sockfd);
			close_connection(sockfd);
            return total_sent; // 连接关闭(部分发送)
        }

        // 更新状态
        total_sent += n;
        ptr += n;  // 移动指针到未发送数据位置
    }
    return total_sent;  // 返回实际发送的字节数(应等于 len)
}

综合以上两个函数,我们还指示了业务处理位置。

//	根据情况使用回调函数————普通套接字 clientfd 的输出事件是 send_cb  发送内容
//	在边沿模式下,该函数要循环执行
int send_cb(int fd) {
	
	if (conn_list[fd].wbuffer != NULL) {	//	连接复用,二次发送信息的时候,写缓冲区的初始化
		free(conn_list[fd].wbuffer);	
		conn_list[fd].wbuffer = NULL; // 防止重复释放
	}
	conn_list[fd].wlength =0;

	///		响应操作的业务端(开始)	 /

	///		响应操作的业务端(结束)	 /


	ssize_t count = 0;
	if (conn_list[fd].wlength != 0) {
		count = send_all(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
	}
	
	printf("SEND: %zd\n", count);
	
	return count;
}

服务器代码

结合前面的流程图,我们可以给出这个服务器的代码

int main() {

	unsigned short port = 2000;	// 	端口 port

	epfd = epoll_create(1);		//	epfd 已经被声明为一个全局变量
	int listen_fds[MAX_PORTS] = {0}; // 存储所有监听socket

	int i = 0;

	for (i = 0; i < MAX_PORTS; i++) {
		int sockfd = init_server(port + i);
		listen_fds[i] = sockfd;
		conn_list[sockfd].fd = sockfd;  //  使用fd作为索引
		conn_list[sockfd].r_action.accept_callback = accept_cb;
		set_event(sockfd, EPOLLIN, 1);
	}


	gettimeofday(&begin, NULL);		//	获取最初的时间戳,begin 是全局变量

	while (1) { // mainloop	服务器的根本
		
		struct epoll_event events[1024] = {0};
		int nready = epoll_wait(epfd, events, 1024, 5);	//	5 指代等待 5 毫秒,如果 I/O 响应满员的话,立即返回
		//	该通知函数是需要调用系统内核的,是需要花费成本的,如果只是通知某些内容没有读完而调用,是极其得不偿失的,这是我们需要用到边沿触发的原因,只是编程难度更大了
		//	我们要重视操作系统内核的调动,系统 I/O 并不全体现在代码之上,而代码要考虑操作系统内核可能出现的情况;编写代码要看到代码之外的东西

		int i = 0;
		for (i = 0;i < nready;i ++) {

			int connfd = events[i].data.fd;
			int is_listener = 0;	//	状态机-重置

			// 当连接异常断开时未清理资源,添加错误处理:
			if (events[i].events & EPOLLERR || events[i].events & EPOLLHUP) {	// 要注意 “|” 是按位或操作,能进行掩码叠加;“&” 是按位与操作
				//  EPOLLHUP 表示对应的文件描述符被挂断。EPOLLERR 表示对应的文件描述符发生错误。
				
				close_connection(connfd);
				continue;
			}
			
			// 检查是否为监听套接字
			for (int j = 0; j < MAX_PORTS; j++) {
				if (connfd == listen_fds[j]) {
					conn_list[connfd].r_action.accept_callback(connfd);
					is_listener = 1;
					break;
				}
			}
			if (is_listener) continue;	//	状态机-进入下一个环节

			

			
			// 处理读事件
			if (events[i].events & EPOLLIN) {
				// 这是监控到了除 sockfd 以外的套接字
				// ET 边沿模式需循环读取,原因是要保证网络 I/O 所有内容都被读取!
				if (conn_list[connfd].recv_callback(connfd) >= 0) { // 返回0表示成功
					printf("[%ld] RECV: %s\n",conn_list[connfd].rlength, conn_list[connfd].rbuffer);
				} else {
					//	连接在 recv 函数处早已释放
					continue; // 跳过后续处理
				}
				///		读操作的业务端(start)     
				//	至此,客户端的请求报文全部写完了,写入了 rbuffer 之中。我们要利用这个内存去执行业务操作
				//	实现 HTTP 请求,在代码里面,该函数只是形式上存在,并不是重点
				//	http_request(&conn_list[fd]);


				//	WebSocket 协议的请求
				// 	ws_request(&conn_list[fd]);
				///		读操作的业务端(start)     

				set_event(connfd, EPOLLOUT, 0);
				

			} else if (events[i].events & EPOLLOUT) {

				//	这里必须注意 EPOLLOUT 不是边沿事件触发模式,我们通常只把响应报文的 header 写入 wbuffer 中,长度是固定的,因此水平出发即可。
				//	至于那大段大段的文件资源传输,则是通过文件描述符之间操作完成,跳过缓冲区读写
				
				// send 会因无输输出而阻断
				// 函数 send: 发送固定字节的内容。
				// 返回值 > 0:表示成功发送了数据,返回值表示实际发送的字节数。
				// 返回值 == -1:表示发送操作失败。错误原因可以通过 errno 获取
				
				
				conn_list[connfd].send_callback(connfd);

				//	如果是为了测试百万并发,则用这个,不要把 fd 关闭
				set_event(connfd, EPOLLIN | EPOLLET, 0);	// 这次输出发送事件结束了;改为 “边沿读写模式”,执行 epoll_ctl 函数,系统内核会直接把 fd 加载到 epoll 的就绪集之中



			}	else {
				printf("Unknown event on clientfd: %d, errno:%d\n", connfd, errno);
            	close_connection(connfd);
			}
			
		}

	}
	

}

我们注意到,对于模块函数 accept_cb 的使用,我们借助了 “状态机” 的思路,即代码中的 is_listener,让情况得以分类。

代码运行效果

代码编译

qiming@qiming:~/share/CTASK/TCP_test$ gcc -o reactor reactor.c

程序执行,一开始没链接的时候,程序并没有挂起,而是作无意义的 while 死循环,原因是代码中的 epoll_wait(epfd, events, 1024, 5) 并非阻塞运行,而是等待 5 毫秒后运行下一行代码。

qiming@qiming:~/share/CTASK/TCP_test$ ./reactor

NetAssist 远程连接
在这里插入图片描述
发送信息(我们并没发送什么东西,只是象征性的设置,读者可自行设置,输出)

qiming@qiming:~/share/CTASK/TCP_test$ ./reactor
[13244] RECV: The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.
The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows no intention of putting them off. In contrast, the man on the right exhibits a desire for delaying his task and holds that he doesn't go cracking until the deadline.
As the thought-provoking pictures intend to mirror, the idle boy as mentioned above is not likely to finish his assignments in the end, any more than a man can attain great achievement if he is accustomed to putting tasks off. For one thing, the habits shape the turns of our attitudes and behaviors of how to deal with our plans and tasks. In the course of pursuing our goals, we may face a great deal of temp-
tations and adversities, and a good habit of self-discipline can direct us to make right decisions and refrain from indulging us with entertainment. For another, if we fail to be self-disciplined, we will be deprived of the opportunities for forging the brilliant traits, like mental maturity, fortitude, creativity and so on, which will render us considerable autonomy and lead us to be successful.
Personally, we can't underscore the significance of the good habits too much. It is advisable that we should concentrate on our daily tasks through strict time management and enjoy the course of striving.The drawings convey the significance of self-discipline and hard-work. The youngster on the left concentrates on her assignments and shows n

我发送了若干段考研英语一的大作文(同一篇文章多次复制粘贴),模拟客户端发送的内容非常多,多到超过了一开始设置的读缓冲区容量 BUFFER_LENGTH,即 1 KB 的内容。我们注意到

[13244] RECV: The drawings ... 略

共发送了 1 万多个字节的内容,说明我们的动态内存扩充的代码是有效的。

而且接受完信息后,还可以发送信息,说明事件转化的设计也是有效的(在输出的最末端,我们可在命令行处注意到)。

SEND: 0

这样一来,我们这个服务器是可以完成一个准网络 I/O 事务的。完整的网络 I/O 事务是

客户端 服务器 第一步,网络请求 第二步,网络响应 第三步,客户端处理响应 客户端 服务器

总结

本篇文章,对上一篇 文章 的 EPOLL 服务器作出模块化的升级,让代码更有组织度的同时,还有以下的特性

  1. 定义了 conn 类型,它能记录对应网络 I/O 文件的输入输出内容,而且还记录了对应套接字的输入和输出行为。类似于 “多态” 的概念,同类型的两个变量内同一成员名字有不同含义。让命名变得简单了许多。
  2. 对任何套接字(无论是否为监听套接字)设置了根据事件信号而触发的网络 I/O 行为。这就是 “Reactor” 这个名字的由来。
  3. 对于 recv_cb 函数设计了可动态扩充读缓冲区的功能,能够接收超大型数据。
  4. 函数 set_event 能够有效的进行事件转化,使之同一个套接字的事件能够在 EPOLLIN | EPOLLETEPOLLOUT 之间来回转换,刚好适应了网络 I/O 的事务模式。

在下一篇文章里,我们针对这个事件驱动 reactor 式的服务器(还是这份代码)测试其百万并发的能力。


网站公告

今日签到

点亮在社区的每一天
去签到