服务器并发实现的五种方法

发布于:2025-05-22 ⋅ 阅读:(16) ⋅ 点赞:(0)

前言

关于网络编程相关知识可看我之前写过的文章:

一、单线程 / 进程

在TCP通信过程中,服务器端启动之后可以同时和多个客户端建立连接,并进行网络通信,在代码中经常会用到三个引起程序阻塞的函数,分别是:

  • accept():如果服务器端没有新客户端连接,阻塞当前进程/线程,如果检测到新连接解除阻塞,建立连接
  • read():如果通信的套接字对应的读缓冲区没有数据,阻塞当前进程/线程,检测到数据解除阻塞,接收数据
  • write():如果通信的套接字写缓冲区被写满了,阻塞当前进程/线程(这种情况比较少见)

如果需要和发起新的连接请求的客户端建立连接,那么就必须在服务器端通过一个循环调用 accept() 函数,另外已经和服务器建立连接的客户端需要和服务器通信,发送数据时的阻塞可以忽略,当接收不到数据时程序也会被阻塞,这时候就会非常矛盾,被accept()阻塞就无法通信,被 read() 阻塞就无法和客户端建立新连接。因此得出一个结论,基于上述处理方式,在单线程/单进程场景下,服务器是无法处理多连接的,解决方案也有很多,常用的有三种:

  • 使用多线程实现
  • 使用多进程实现
  • 使用IO多路转接(复用)实现
  • 使用IO多路转接 + 多线程实现

二、多进程并发

如果要编写多进程版的并发服务器程序,首先要考虑,创建出的多个进程都是什么角色,这样就可以在程序中对号入座了。在Tcp服务器端一共有两个角色,分别是:监听和通信,监听是一个持续的动作,如果有新连接就建立连接,如果没有新连接就阻塞。关于通信是需要和多个客户端同时进行的,因此需要多个进程,这样才能达到互不影响的效果。进程也有两大类:父进程和子进程,通过分析我们可以这样分配进程:

  • 父进程:

    • 负责监听,处理客户端的连接请求,也就是在父进程中循环调用accept()函数
    • 创建子进程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
    • 回收子进程资源:子进程退出回收其内核PCB资源,防止出现僵尸进程
  • 子进程:负责通信,基于父进程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送。

    • 发送数据:send() / write()
    • 接收数据:recv() / read()

在多进程版的服务器端程序中,多个进程是有血缘关系,对应有血缘关系的进程来说,还需要想明白他们有哪些资源是可以被继承的,哪些资源是独占的,以及一些其他细节:

  • 子进程是父进程的拷贝,在子进程的内核区PCB中,文件描述符也是可以被拷贝的,因此在父进程可以使用的文件描述符在子进程中也有一份,并且可以使用它们做和父进程一样的事情。
  • 父子进程有用各自的独立的虚拟地址空间,因此所有的资源都是独占的
  • 为了节省系统资源,对于只有在父进程才能用到的资源,可以在子进程中将其释放掉,父进程亦如此。
  • 由于需要在父进程中做accept()操作,并且要释放子进程资源,如果想要更高效一下可以使用信号的方式处理

服务器代码:

#include <sys/types.h>			/* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>


#define SERVER_PORT 8888
#define BACKLOG     10

// 信号处理函数
void callback(int num)
{
    while(1)
    {
        pid_t pid = waitpid(-1, NULL, WNOHANG);
        if(pid <= 0)
        {
            printf("子进程正在运行, 或者子进程被回收完毕了\n");
            break;
        }
        printf("child die, pid = %d\n", pid);
    }
}

int childWork(int cfd);


int main(int argc, char **argv)
{
	int iSocketServer;
	int iSocketClient;
	struct sockaddr_in tSocketServerAddr;
	struct sockaddr_in tSocketClientAddr;

	int iRet;
	int iAddrlen;
	int iClientNum = 0;
	int cnt = 0;

	int iRcvLen;
	int iSendLen;
	unsigned char ucSendBuf[1000];
	unsigned char ucRcvBuf[1000];
	
	/* 1. socket */
	iSocketServer = socket(AF_INET, SOCK_STREAM, 0); 
	
	tSocketServerAddr.sin_addr.s_addr = INADDR_ANY;
	tSocketServerAddr.sin_family      = AF_INET;
	tSocketServerAddr.sin_port  	  = htons(SERVER_PORT);
	memset(tSocketServerAddr.sin_zero, 0, 8);

	/* 2. bind   */
	iRet = bind(iSocketServer, (const struct sockaddr *)&tSocketServerAddr, sizeof(struct sockaddr_in));

	/* 3. listen */
	iRet = listen(iSocketServer, BACKLOG);

	// 注册信号的捕捉
	// 用于父进程管理子进程的退出状态,避免僵尸进程
	struct sigaction act;
	act.sa_flags = 0;
	act.sa_handler = callback;
	sigemptyset(&act.sa_mask);
	sigaction(SIGCHLD, &act, NULL);
	

	while (1)
	{	
		/* 4. accept */
		iAddrlen = sizeof(struct sockaddr_in);
		iSocketClient = accept(iSocketServer, (struct sockaddr *)&tSocketClientAddr, &iAddrlen);

		if (iSocketClient == -1)
		{
			if (errno == EINTR)
			{
				continue;
			}
			perror("accept");
			exit(0);
		}

		iClientNum++;
		printf("get connect from client %d:%s\n", iClientNum, inet_ntoa(tSocketClientAddr.sin_addr));

		/* 创建一个子进程 */
		pid_t pid = fork();
		if (pid == 0)
		{
	
			// 子进程 -> 和客户端通信
			// 通信的文件描述符cfd被拷贝到子进程中
			// 子进程不负责监听
			close(iSocketServer);
			while(1)
			{
				int ret = childWork(iSocketClient);
				if(ret <= 0)
				{
					break;
				}
			}
			// 退出子进程
			close(iSocketClient);
			exit(0);
		}	
		else if (pid > 0)
		{
            // 父进程不和客户端通信
            close(iSocketClient);
		}
	}
	
	return 0;
}


// 5. 和客户端通信
int childWork(int cfd)
{

    // 接收数据
    char buf[1024];
    memset(buf, 0, sizeof(buf));
    int len = read(cfd, buf, sizeof(buf));
    if(len > 0)
    {
        printf("客户端say: %s\n", buf);
        write(cfd, buf, len);
    }
    else if(len  == 0)
    {
        printf("客户端断开了连接...\n");
    }
    else
    {
        perror("read");
    }

    return len;
}

客户端代码:

#include <sys/types.h>			/* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

#define SERVER_PORT 8888

int main(int argc, char **argv)
{
	int iSocketClient;
	struct sockaddr_in tSocketClientAddr;

	int iRet;
	int iClientNum = 0;
	int iSendLen;
	int iRcvLen;
	
	unsigned char ucSendBuf[1000];
	unsigned char ucRcvBuf[1000];

	if (argc != 2)
	{
		printf("Usage:%s IP\n", argv[0]);
		return -1;
	}
	
	/* 1. socket  */
	iSocketClient = socket(AF_INET, SOCK_STREAM, 0); 

	if (-1 == iSocketClient)
	{	
		printf("socket error!\n");
		return -1;
	}
	
	tSocketClientAddr.sin_family = AF_INET;
	tSocketClientAddr.sin_port   = htons(SERVER_PORT);

	/* 2. inet_aton */
	iRet = inet_aton(argv[1], &tSocketClientAddr.sin_addr);
	if (0 == iRet)
	{
		printf("inet_aton error!\n");
		return -1;
	}
	memset(tSocketClientAddr.sin_zero, 0, 8);
	
	/* 3. connect  */
	iRet = connect(iSocketClient, (struct sockaddr *)&tSocketClientAddr, sizeof(struct sockaddr_in));
	if (-1 == iRet)
	{	
		printf("connect error!\n");
		return -1;
	}

	while (1)
	{	
		/* 用来读取终端输入的一行数据 */
		if (fgets(ucSendBuf, 999, stdin))
		{
			/* 发送该行数据给服务器 */
			iSendLen = send(iSocketClient, ucSendBuf, strlen(ucSendBuf), 0);
			if (iSendLen <= 0)
			{
				close(iSocketClient);
				return -1;
			}
			
			/* 接收服务器发过来的数据 */
			iRcvLen = recv(iSocketClient, ucRcvBuf, 999, 0); 
			if (iRcvLen > 0)
			{
				ucRcvBuf[iRcvLen] = '\0';
				printf("get msg from server:%s\n", ucRcvBuf);
			}
		}
	}
	
	close(iSocketClient);

	return 0;
}

在上面的示例代码中,父子进程中分别关掉了用不到的文件描述符(父进程不需要通信,子进程也不需要监听)。如果客户端主动断开连接,那么服务器端负责和客户端通信的子进程也就退出了,子进程退出之后会给父进程发送一个叫做 SIGCHLD的信号,在父进程中通过sigaction()函数捕捉了该信号,通过回调函数callback()中的waitpid()对退出的子进程进行了资源回收。

另外还有一个细节要说明一下,这是父进程的处理代码:

int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
while(1)
{
        int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
        if(cfd == -1)
        {
            if(errno == EINTR)
            {
                // accept调用被信号中断了, 解除阻塞, 返回了-1
                // 重新调用一次accept
                continue;
            }
            perror("accept");
            exit(0);
 
        }
 }

如果父进程调用accept() 函数没有检测到新的客户端连接,父进程就阻塞在这儿了,这时候有子进程退出了,发送信号给父进程,父进程就捕捉到了这个信号SIGCHLD, 由于信号的优先级很高,会打断代码正常的执行流程,因此父进程的阻塞被中断,转而去处理这个信号对应的函数callback(),处理完毕,再次回到accept()位置,但是这是已经无法阻塞了,函数直接返回-1,此时函数调用失败,错误描述为accept: Interrupted system call,对应的错误号为EINTR,由于代码是被信号中断导致的错误,所以可以在程序中对这个错误号进行判断,让父进程重新调用accept(),继续阻塞或者接受客户端的新连接。

三、多线程并发

编写多线程版的并发服务器程序和多进程思路差不多,考虑明白了对号入座即可。多线程中的线程有两大类:主线程(父线程)和子线程,他们分别要在服务器端处理监听和通信流程。根据多进程的处理思路,就可以这样设计了:

  • 主线程:

    • 负责监听,处理客户端的连接请求,也就是在父进程中循环调用accept()函数
    • 创建子线程:建立一个新的连接,就创建一个新的子进程,让这个子进程和对应的客户端通信
    • 回收子线程资源:由于回收需要调用阻塞函数,这样就会影响accept(),直接做线程分离即可。
  • 子线程:负责通信,基于主线程建立新连接之后得到的文件描述符,和对应的客户端完成数据的接收和发送。

    • 发送数据:send() / write()
    • 接收数据:recv() / read()

在多线程版的服务器端程序中,多个线程共用同一个地址空间,有些数据是共享的,有些数据的独占的,下面来分析一些其中的一些细节:

  • 同一地址空间中的多个线程的栈空间是独占的
  • 多个线程共享全局数据区,堆区,以及内核区的文件描述符等资源,因此需要注意数据覆盖问题,并且在多个线程访问共享资源的时候,还需要进行线程同步。

服务器代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>


#define SERVER_PORT 8888
#define BACKLOG     10

struct SockInfo
{
    int fd;                      // 通信
    pthread_t tid;               // 线程ID
    struct sockaddr_in addr;     // 地址信息
};

struct SockInfo infos[128];


void* working(void* arg)
{
    while(1)
    {
        struct SockInfo* info = (struct SockInfo*)arg;
        // 接收数据
        char buf[1024];
        int ret = read(info->fd, buf, sizeof(buf));
        if(ret == 0)
        {
            printf("客户端已经关闭连接...\n");
            info->fd = -1;
            break;
        }
        else if(ret == -1)
        {
            printf("接收数据失败...\n");
            info->fd = -1;
            break;
        }
        else
        {        
        	printf("客户端say: %s\n", buf);
            write(info->fd, buf, strlen(buf) + 1);
        }
    }
    return NULL;
}


int main(int argc, char **argv)
{
	int iSocketServer;
	int iSocketClient;
	struct sockaddr_in tSocketServerAddr;

	int iRet;
	
	/* 1. socket */
	iSocketServer = socket(AF_INET, SOCK_STREAM, 0); 
	
	tSocketServerAddr.sin_addr.s_addr = INADDR_ANY;
	tSocketServerAddr.sin_family      = AF_INET;
	tSocketServerAddr.sin_port  	  = htons(SERVER_PORT);
	memset(tSocketServerAddr.sin_zero, 0, 8);

	/* 2. bind   */
	iRet = bind(iSocketServer, (const struct sockaddr *)&tSocketServerAddr, sizeof(struct sockaddr_in));

	/* 3. listen */
	iRet = listen(iSocketServer, BACKLOG);

	int len = sizeof(struct sockaddr);
	
	// 数据初始化
    int max = sizeof(infos) / sizeof(infos[0]);
    for(int i = 0; i < max; i++)
    {
        memset(&infos[i], 0, sizeof(infos[i]));
        infos[i].fd = -1;
        infos[i].tid = -1;
    }


	while (1)
	{	
		// 创建子线程
		struct SockInfo* pinfo;
		for(int i = 0; i < max; i++)
		{
			if(infos[i].fd == -1)
			{
				pinfo = &infos[i];
				break;
			}
			if(i == max-1)
			{
				sleep(1);
				i--;
			}
		}
		
		/* 4. accept */
		int iSocketClient = accept(iSocketServer, (struct sockaddr*)&pinfo->addr, &len);
		printf("parent thread, iSocketClient: %d\n", iSocketClient);
		
		if(iSocketClient == -1)
		{
			perror("accept");
			exit(0);
		}
		pinfo->fd = iSocketClient;
		pthread_create(&pinfo->tid, NULL, working, pinfo);
		pthread_detach(pinfo->tid);
	}

	close(iSocketServer);
	
	return 0;
}

客户端代码:

#include <sys/types.h>			/* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

#define SERVER_PORT 8888

int main(int argc, char **argv)
{
	int iSocketClient;
	struct sockaddr_in tSocketClientAddr;

	int iRet;
	int iClientNum = 0;
	int iSendLen;
	int iRcvLen;
	
	unsigned char ucSendBuf[1000];
	unsigned char ucRcvBuf[1000];

	if (argc != 2)
	{
		printf("Usage:%s IP\n", argv[0]);
		return -1;
	}
	
	/* 1. socket  */
	iSocketClient = socket(AF_INET, SOCK_STREAM, 0); 

	if (-1 == iSocketClient)
	{	
		printf("socket error!\n");
		return -1;
	}
	
	tSocketClientAddr.sin_family = AF_INET;
	tSocketClientAddr.sin_port   = htons(SERVER_PORT);

	/* 2. inet_aton */
	iRet = inet_aton(argv[1], &tSocketClientAddr.sin_addr);
	if (0 == iRet)
	{
		printf("inet_aton error!\n");
		return -1;
	}
	memset(tSocketClientAddr.sin_zero, 0, 8);
	
	/* 3. connect  */
	iRet = connect(iSocketClient, (struct sockaddr *)&tSocketClientAddr, sizeof(struct sockaddr_in));
	if (-1 == iRet)
	{	
		printf("connect error!\n");
		return -1;
	}

	while (1)
	{	
		/* 用来读取终端输入的一行数据 */
		if (fgets(ucSendBuf, 999, stdin))
		{
			/* 发送该行数据给服务器 */
			iSendLen = send(iSocketClient, ucSendBuf, strlen(ucSendBuf), 0);
			if (iSendLen <= 0)
			{
				close(iSocketClient);
				return -1;
			}
			
			/* 接收服务器发过来的数据 */
			iRcvLen = recv(iSocketClient, ucRcvBuf, 999, 0); 
			if (iRcvLen > 0)
			{
				ucRcvBuf[iRcvLen] = '\0';
				printf("get msg from server:%s\n", ucRcvBuf);
			}
		}
	}
	
	close(iSocketClient);

	return 0;
}

编译运行结果:
在这里插入图片描述

四、IO多路转接(复用)select

服务器代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

#define SERVER_PORT 8888
#define BACKLOG     10


int main(int argc, char **argv)
{
	int iSocketServer;
	int iSocketClient;
	struct sockaddr_in tSocketServerAddr;
	struct sockaddr_in tSocketClientAddr;

	int iRet;

	/* 1. socket */
	iSocketServer = socket(AF_INET, SOCK_STREAM, 0); 
	
	tSocketServerAddr.sin_addr.s_addr = INADDR_ANY;
	tSocketServerAddr.sin_family      = AF_INET;
	tSocketServerAddr.sin_port  	  = htons(SERVER_PORT);
	memset(tSocketServerAddr.sin_zero, 0, 8);

	/* 2. bind   */
	iRet = bind(iSocketServer, (const struct sockaddr *)&tSocketServerAddr, sizeof(struct sockaddr_in));

	/* 3. listen */
	iRet = listen(iSocketServer, BACKLOG);
	

	// 将监听的fd的状态检测委托给内核检测
	int maxfd = iSocketServer;
	// 初始化检测的读集合
	fd_set rdset;
	fd_set rdtemp;
	// 清零
	FD_ZERO(&rdset);
	// 将监听的iSocketServer设置到检测的读集合中
	FD_SET(iSocketServer, &rdset);
	// 通过select委托内核检测读集合中的文件描述符状态, 检测read缓冲区有没有数据
	// 如果有数据, select解除阻塞返回
	// 应该让内核持续检测

	while (1)
	{	
		// 默认阻塞
		// rdset 中是委托内核检测的所有的文件描述符
		rdtemp = rdset;
		int num = select(maxfd + 1, &rdtemp, NULL, NULL, NULL);
		// rdset中的数据被内核改写了, 只保留了发生变化的文件描述的标志位上的1, 没变化的改为0
		// 只要rdset中的fd对应的标志位为1 -> 缓冲区有数据了
		// 判断
		// 有没有新连接
		if(FD_ISSET(iSocketServer, &rdtemp))
		{
			// 接受连接请求, 这个调用不阻塞
			int cliLen = sizeof(tSocketClientAddr);
			int iSocketClient = accept(iSocketServer, (struct sockaddr*)&tSocketClientAddr, &cliLen);

			// 得到了有效的文件描述符
			// 通信的文件描述符添加到读集合
			// 在下一轮select检测的时候, 就能得到缓冲区的状态
			FD_SET(iSocketClient, &rdset);
			// 重置最大的文件描述符
			maxfd = iSocketClient > maxfd ? iSocketClient : maxfd;
		}

		// 没有新连接, 通信
		for(int i = 0; i < maxfd + 1; i++)
		{
			// 排除监听服务器套接字,仅处理客户端套接字
			if(i != iSocketServer && FD_ISSET(i, &rdtemp))
			{
				// 接收数据
				char buf[10] = {0};
				// 一次只能接收10个字节, 客户端一次发送100个字节
				// 一次是接收不完的, 文件描述符对应的读缓冲区中还有数据
				// 下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次
				//	循环会一直持续, 知道缓冲区数据被读完位置
				int len = read(i, buf, sizeof(buf));
				if(len == 0)
				{
					printf("客户端关闭了连接...\n");
					// 将检测的文件描述符从读集合中删除
					FD_CLR(i, &rdset);
					close(i);
				}
				else if(len > 0)
				{
					// 收到了数据
		        	printf("客户端say: %s\n", buf);
					// 发送数据
					write(i, buf, strlen(buf)+1);
				}
				else
				{
					// 异常
					perror("read");
				}
			}
		}
	}

	close(iSocketServer);
	
	return 0;
}

客户端代码与之前一样

五、IO多路转接(复用)poll

poll的机制与select类似,与select在本质上没有多大差别,使用方法也类似:

  • 内核对应文件描述符的检测也是以线性的方式进行轮询,根据描述符的状态进行处理;
  • poll和select检测的文件描述符集合会在检测过程中频繁的进行用户区和内核区的拷贝,它的开销随着文件描述符数量的增加而线性增大,从而效率也会越来越低;

不同点:

  • select检测的文件描述符个数上限是1024,poll没有最大文件描述符数量的限制;
  • select可以跨平台使用,poll只能在Linux平台使用;

poll函数的函数原型如下:

#include <poll.h>
// 每个委托poll检测的fd都对应这样一个结构体
struct pollfd {
    int   fd;         /* 委托内核检测的文件描述符 */
    short events;     /* 委托内核检测文件描述符的什么事件 */
    short revents;    /* 文件描述符实际发生的事件 -> 传出 */
};

struct pollfd myfd[100];
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

函数参数:

  • fds: 这是一个 struct pollfd类型的数组, 里边存储了待检测的文件描述符的信息,这个数组中有三个成员:

    • fd:委托内核检测的文件描述符
    • events:委托内核检测的fd事件(输入、输出、错误),每一个事件有多个取值
    • revents:这是一个传出参数,数据由内核写入,存储内核检测之后的结果
      在这里插入图片描述
  • nfds: 这是第一个参数数组中最后一个有效元素的下标 + 1(也可以指定参数1数组的元素总个数)

  • timeout: 指定poll函数的阻塞时长

    • -1:一直阻塞,直到检测的集合中有就绪的文件描述符(有事件产生)解除阻塞
    • 0:不阻塞,不管检测集合中有没有已就绪的文件描述符,函数马上返回
    • 大于0:阻塞指定的毫秒(ms)数之后,解除阻塞
  • 函数返回值:

    • 失败: 返回-1
    • 成功:返回一个大于0的整数,表示检测的集合中已就绪的文件描述符的总个数

服务器代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <poll.h>

#define SERVER_PORT 8080
#define BACKLOG     10

int main(int argc, char **argv)
{
	int iSocketServer;
	int iSocketClient;
	struct sockaddr_in tSocketServerAddr;
	struct sockaddr_in tSocketClientAddr;


	int iRet;
	
	/* 1. socket */
	iSocketServer = socket(AF_INET, SOCK_STREAM, 0); 
	
	tSocketServerAddr.sin_addr.s_addr = INADDR_ANY;
	tSocketServerAddr.sin_family      = AF_INET;
	tSocketServerAddr.sin_port  	  = htons(SERVER_PORT);
	memset(tSocketServerAddr.sin_zero, 0, 8);

	/* 2. bind   */
	iRet = bind(iSocketServer, (const struct sockaddr *)&tSocketServerAddr, sizeof(struct sockaddr_in));

	/* 3. listen */
	iRet = listen(iSocketServer, BACKLOG);

	// 检测 -> 读缓冲区, 委托内核去处理
	// 数据初始化, 创建自定义的文件描述符集
	struct pollfd fds[1024];
	// 初始化
	for(int i = 0; i < 1024; i++)
	{
		fds[i].fd = -1;
		fds[i].events = POLLIN;
	}
	fds[0].fd = iSocketServer;

	int maxfd = 0;
	
	while (1)
	{	
		// 委托内核检测
		iRet = poll(fds, maxfd + 1, -1);
		if(iRet == -1)
		{
			perror("poll");
			exit(0);
		}

		// 内核检测之后的结果为真
		if(fds[0].revents & POLLIN)
		{
			// 接收连接请求
			int len = sizeof(tSocketClientAddr);
			// 这个accept是不会阻塞的
			int iSocketClient = accept(iSocketServer, (struct sockaddr*)&tSocketClientAddr, &len);
			// 委托内核检测iSocketClient的读缓冲区
			int i;
			for(i = 0; i < 1024; i++)
			{
				if(fds[i].fd == -1)
				{
					fds[i].fd = iSocketClient;
					break;
				}
			}
			maxfd = i > maxfd ? i : maxfd;
		}
		
		// 通信, 有客户端发送数据过来
		for(int i = 1; i <= maxfd; i++)
		{
			// 如果在集合中, 说明读缓冲区有数据
			if(fds[i].revents & POLLIN)
			{
				char buf[128];
				int ret = read(fds[i].fd, buf, sizeof(buf));
				if(ret == -1)
				{
					perror("read");
					exit(0);
				}
				else if(ret == 0)
				{
					printf("对方已经关闭了连接...\n");
					close(fds[i].fd);
					fds[i].fd = -1;
				}
				else
				{
					printf("客户端say: %s\n", buf);
					write(fds[i].fd, buf, strlen(buf)+1);
				}
			}
		}

	}

	close(iSocketServer);
	
	return 0;
}

客户端代码与之前一样

六、IO多路转接(复用)epoll

epoll 全称 eventpoll,是 linux 内核实现IO多路转接/复用(IO multiplexing)的一个实现。IO多路转接的意思是在一个操作里同时监听多个输入输出源,在其中一个或多个输入输出源可用的时候返回,然后对其的进行读写操作。epoll是select和poll的升级版,相较于这两个前辈,epoll改进了工作方式,因此它更加高效。

  • 对于待检测集合select和poll是基于线性方式处理的,epoll是基于红黑树来管理待检测集合的。
  • select和poll每次都会线性扫描整个待检测集合,集合越大速度越慢,epoll使用的是回调机制,效率高,处理效率也不会随着检测集合的变大而下降
  • select和poll工作过程中存在内核/用户空间数据的频繁拷贝问题,在epoll中内核和用户区使用的是共享内存(基于mmap内存映射区实现),省去了不必要的内存拷贝。
  • 程序猿需要对select和poll返回的集合进行判断才能知道哪些文件描述符是就绪的,通过epoll可以直接得到已就绪的文件描述符集合,无需再次检测
  • 使用epoll没有最大文件描述符的限制,仅受系统中进程能打开的最大文件数目限制

当多路复用的文件数量庞大、IO流量频繁的时候,一般不太适合使用select()poll(),这种情况下select()poll()表现较差,推荐使用epoll()

在epoll中一共提供是三个API函数,分别处理不同的操作,函数原型如下:

#include <sys/epoll.h>
// 创建epoll实例,通过一棵红黑树管理待检测集合
int epoll_create(int size);
// 管理红黑树上的文件描述符(添加、修改、删除)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 检测epoll树中是否有就绪的文件描述符
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

服务器代码:

#include <stdio.h>
#include <ctype.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>


#define SERVER_PORT 8888
#define BACKLOG     10

int main(int argc, char **argv)
{
	int iSocketServer;
	int iSocketClient;
	struct sockaddr_in tSocketServerAddr;
	struct sockaddr_in tSocketClientAddr;

	int iRet;
	int iAddrlen;
	int iClientNum = 0;
	int cnt = 0;

	int iRcvLen;
	int iSendLen;
	unsigned char ucSendBuf[1000];
	unsigned char ucRcvBuf[1000];
	
	/* 1. socket */
	iSocketServer = socket(AF_INET, SOCK_STREAM, 0); 
	
	tSocketServerAddr.sin_addr.s_addr = INADDR_ANY;
	tSocketServerAddr.sin_family      = AF_INET;
	tSocketServerAddr.sin_port  	  = htons(SERVER_PORT);
	memset(tSocketServerAddr.sin_zero, 0, 8);

	// 设置端口复用
	int opt = 1;
	setsockopt(iSocketServer, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
	if(iRet == -1)
	{
		perror("bind error");
		exit(1);
	}


	/* 2. bind   */
	iRet = bind(iSocketServer, (const struct sockaddr *)&tSocketServerAddr, sizeof(struct sockaddr_in));
	if(iRet == -1)
	{
		perror("listen error");
		exit(1);
	}
	


	/* 3. listen */
	iRet = listen(iSocketServer, BACKLOG);

	// 现在只有监听的文件描述符
	// 所有的文件描述符对应读写缓冲区状态都是委托内核进行检测的epoll
	// 创建一个epoll模型
	int epfd = epoll_create(100);
	if(epfd == -1)
	{
		perror("epoll_create");
		exit(0);
	}

	// 往epoll实例中添加需要检测的节点, 现在只有监听的文件描述符
	struct epoll_event ev;
	ev.events = EPOLLIN;	// 检测 iSocketServer 的读缓冲区是否有数据
	ev.data.fd = iSocketServer;
	iRet = epoll_ctl(epfd, EPOLL_CTL_ADD, iSocketServer, &ev);
	if(iRet == -1)
	{
		perror("epoll_ctl");
		exit(0);
	}

	struct epoll_event evs[1024];
	int size = sizeof(evs) / sizeof(struct epoll_event);
	
	// 持续检测
	while (1)
	{	
		// 调用一次, 检测一次
		int num = epoll_wait(epfd, evs, size, -1);
		for(int i = 0; i < num; i++)
		{
			// 取出当前的文件描述符
			int curfd = evs[i].data.fd;
			// 判断这个文件描述符是不是用于监听的
			if(curfd == iSocketServer)
			{
				/* 4. accept */
				iSocketClient = accept(curfd, NULL, NULL);
				// 新得到的文件描述符添加到epoll模型中, 下一轮循环的时候就可以被检测了
				ev.events = EPOLLIN;	// 读缓冲区是否有数据
				ev.data.fd = iSocketClient;
				iRet = epoll_ctl(epfd, EPOLL_CTL_ADD, iSocketClient, &ev);
				if(iRet == -1)
				{
					perror("epoll_ctl-accept");
					exit(0);
				}
			}
			else
			{
				// 处理通信的文件描述符
				// 接收数据
				char buf[1024];
				memset(buf, 0, sizeof(buf));
				int len = recv(curfd, buf, sizeof(buf), 0);
				if(len == 0)
				{
					printf("客户端已经断开了连接\n");
					// 将这个文件描述符从epoll模型中删除
					epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
					close(curfd);
				}
				else if(len > 0)
				{
					printf("客户端say: %s\n", buf);
					send(curfd, buf, len, 0);
				}
				else
				{
					perror("recv");
					exit(0);
				} 
			}
		}
	}
	
	return 0;
}

客户端代码与之前一样

当在服务器端循环调用epoll_wait()的时候,会得到一个就绪列表,并通过该函数的第二个参数传出:

struct epoll_event evs[1024];
int num = epoll_wait(epfd, evs, size, -1);

每当 epoll_wait() 函数返回一次,在 evs 中最多可以存储 size 个已就绪的文件描述符信息,但是在这个数组中实际存储的有效元素个数为num个,如果在这个 epoll 实例的红黑树中已就绪的文件描述符很多,并且 evs 数组无法将这些信息全部传出,那么这些信息会在下一次epoll_wait() 函数返回的时候被传出。

通过 evs数组被传递出的每一个有效元素里边都包含了已就绪的文件描述符的相关信息,这些信息并不是凭空得来的,这取决于我们在往epoll 实例中添加节点的时候,往节点中初始化了哪些数据:

struct epoll_event ev;
ev.events = EPOLLIN;	// 检测 iSocketServer 的读缓冲区是否有数据
ev.data.fd = iSocketServer;
iRet = epoll_ctl(epfd, EPOLL_CTL_ADD, iSocketServer, &ev);