select,poll,epoll

发布于:2024-05-07 ⋅ 阅读:(22) ⋅ 点赞:(0)

在 Linux Socket 服务器短编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和复用,select,poll 和 epoll 是 Linux API 提供的I/O复用方式。

\ select poll epoll
操作方式 遍历 遍历 回调
底层实现 数组 链表 哈希表
IO效率 每次调用都进行线性遍历,时间复杂度为O(n) 每次调用都进行线性遍历,时间复杂度为O(n) 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪 fd 放到 rdllist 里面。时间复杂度O(1)
最大连接数 1024(x86)或 2048(x64) 无上限 无上限
fd拷贝 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态 每次调用 poll,都需要把 fd 集合从用户态拷贝到内核态 调用 epoll_ctl 时拷贝进内核并保存,之后每次 epoll_wait 不拷贝

select

在一段指定的时间内,监听用户感兴趣的文件描述符上可读、可写和异常等事件。

select 实现多路复用的方式是,将已连接的 Socket 都放到一个文件描述符集合,然后调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。

所以,对于 select 这种方式,需要进行 2 次「遍历」文件描述符集合,一次是在内核态里,一个次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。

select 使用固定长度的 BitsMap,表示文件描述符集合,而且所支持的文件描述符的个数是有限制的,在 Linux 系统中,由内核中的 FD_SETSIZE 限制, 默认最大值为 1024,只能监听 0~1023 的文件描述符。

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

参数:

nfds : 最大的文件描述符加1。
readfds: 用于检查可读的。
writefds:用于检查可写性。
exceptfds:用于检查异常的数据。
timeout:一个指向 timeval 结构的指针,用于决定 select 等待 I/O 的最长时间。如果为空将一直等待。
timeval 结构的定义:

struct timeval{
	long tv_sec; // seconds
	long tv_usec; // microseconds
}

返回值>0 是已就绪的文件句柄的总数, =0 超时,<0表示出错,错误: errno

void FD_CLR(int fd, fd_set *set);	// 把文件描述符集合里fd清0
int  FD_ISSET(int fd, fd_set *set); // 测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set);   // 把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set);		   //把文件描述符集合里所有位清0

在这里插入图片描述
传统 select/pol l的另一个致命弱点就是当你拥有一个很大的 socket 集合,由于网络得延时,使得任一时间只有部分的 socket 是"活跃" 的,而 select/poll 每次调用都会线性扫描全部的集合,导致效率呈现线性下降

但是 epoll 不存在这个问题,它只会对"活跃"的 socket 进行操作—这是因为在内核实现中 epoll 是根据每个 fd 上面的 callback 函数实现的。于是,只有"活跃"的 socket 才会主动去调用 callback函数,其他idle 状态的 socket 则不会,在这点上,epoll实现了一个 "伪"AIO,因为这时候推动力在os内核。

示例

select_server.c

#include <sys/types.h> 
#include <sys/socket.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <sys/time.h> 
#include <sys/ioctl.h> 
#include <unistd.h> 
#include <stdlib.h>

int main()
{
    int server_sockfd, client_sockfd;
    int server_len, client_len;
    struct sockaddr_in server_address;
    struct sockaddr_in client_address;
    int result;
    fd_set readfds, testfds;
    server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立服务器端socket 
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9000);
    server_len = sizeof(server_address);
    bind(server_sockfd, (struct sockaddr*)&server_address, server_len);
    listen(server_sockfd, 5); //监听队列最多容纳5个 
    FD_ZERO(&readfds);
    FD_SET(server_sockfd, &readfds);//将服务器端socket加入到集合中
    while (1)
    {
        char ch;
        int fd;
        int nread;
        testfds = readfds;//将需要监视的描述符集copy到select查询队列中,select会对其修改,所以一定要分开使用变量 
        printf("server waiting\n");

        /*无限期阻塞,并测试文件描述符变动 */
        result = select(FD_SETSIZE, &testfds, (fd_set*)0, (fd_set*)0, (struct timeval*)0); //FD_SETSIZE:系统默认的最大文件描述符
        if (result < 1)
        {
            perror("server5");
            exit(1);
        }

        /*扫描所有的文件描述符*/
        for (fd = 0; fd < FD_SETSIZE; fd++)
        {
            /*找到相关文件描述符*/
            if (FD_ISSET(fd, &testfds))
            {
                /*判断是否为服务器套接字,是则表示为客户请求连接。*/
                if (fd == server_sockfd)
                {
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd,
                        (struct sockaddr*)&client_address, &client_len);
                    FD_SET(client_sockfd, &readfds);//将客户端socket加入到集合中
                    printf("adding client on fd %d\n", client_sockfd);
                }
                /*客户端socket中有数据请求时*/
                else
                {
                    ioctl(fd, FIONREAD, &nread);//取得数据量交给nread

                    /*客户数据请求完毕,关闭套接字,从集合中清除相应描述符 */
                    if (nread == 0)
                    {
                        close(fd);
                        FD_CLR(fd, &readfds); //去掉关闭的fd
                        printf("removing client on fd %d\n", fd);
                    }
                    /*处理客户数据请求*/
                    else
                    {
                        read(fd, &ch, 1);
                        sleep(5);
                        printf("serving client on fd %d\n", fd);
                        ch++;
                        write(fd, &ch, 1);
                    }
                }
            }
        }
    }

    return 0;
}

select_client.c

#include <sys/types.h> 
#include <sys/socket.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <stdlib.h>
#include <sys/time.h>

int main()
{
    int client_sockfd;
    int len;
    struct sockaddr_in address;//服务器端网络地址结构体 
    int result;
    char ch = 'A';
    client_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立客户端socket 
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = inet_addr("127.0.0.1");
    address.sin_port = htons(9000);
    len = sizeof(address);
    result = connect(client_sockfd, (struct sockaddr*)&address, len);
    if (result == -1)
    {
        perror("oops: client2");
        exit(1);
    }
    //第一次读写
    write(client_sockfd, &ch, 1);
    read(client_sockfd, &ch, 1);
    printf("the first time: char from server = %c\n", ch);
    sleep(5);

    //第二次读写
    write(client_sockfd, &ch, 1);
    read(client_sockfd, &ch, 1);
    printf("the second time: char from server = %c\n", ch);

    close(client_sockfd);

    return 0;
}

poll

poll 不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限制,当然还会受到系统文件描述符限制。

但是 poll 和 select 并没有太大的本质区别,都是使用 「线性结构」 存储进程关注的 Socket 集合,因此都需要遍历文件描述符集合来找到可读或可写的 Socket,时间复杂度为 O(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能的损耗会呈指数级增长。

poll 和select 区别: select 有文件句柄上线设置,值为 FD_SETSIZE,而poll 理论上没有限制!

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

输入参数:
fds:可以传递多个结构体,也就是说可以监测多个驱动设备所产生的事件,只要有一个产生了请求事件,就能立即返回。

struct pollfd {
      int fd;           /*文件描述符   open打开的那个*/
      short events;     /*请求的事件类型,监视驱动文件的事件掩码*/  POLLIN | POLLOUT
      short revents;    /*驱动文件实际返回的事件*/
}

nfds:监测驱动文件的个数。
timeout:超时时间,单位是ms。

事件类型 events 可以为下列值:
POLLIN 有数据可读
POLLRDNORM 有普通数据可读,等效与POLLIN
POLLPRI 有紧迫数据可读
POLLOUT 写数据不会导致阻塞
POLLER 指定的文件描述符发生错误
POLLHUP 指定的文件描述符挂起事件
POLLNVAL 无效的请求,打不开指定的文件描述符

返回值:
有事件发生 返回revents域不为0的文件描述符个数
超时:return 0
失败:return -1 错误:errno

示例

poll_server.c

#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>

#define MAX_FD  8192
struct pollfd  fds[MAX_FD];
int cur_max_fd = 0;


int main()
{
    int server_sockfd, client_sockfd;
    int server_len, client_len;
    struct sockaddr_in server_address;
    struct sockaddr_in client_address;
    int result;
    //fd_set readfds, testfds;
    server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立服务器端socket
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9000);
    server_len = sizeof(server_address);
    bind(server_sockfd, (struct sockaddr*)&server_address, server_len);
    listen(server_sockfd, 5); //监听队列最多容纳5个
    //FD_ZERO(&readfds);
    //FD_SET(server_sockfd, &readfds);//将服务器端socket加入到集合中
    fds[server_sockfd].fd = server_sockfd;
    fds[server_sockfd].events = POLLIN;
    fds[server_sockfd].revents = 0;
    if(cur_max_fd <= server_sockfd)
    {
        cur_max_fd = server_sockfd + 1;
    }


    while (1)
    {
        char ch;
        int i, fd;
        int nread;
        //testfds = readfds;//将需要监视的描述符集copy到select查询队列中,select会对其修改,所以一定要分开使用变量
        printf("server waiting\n");

        /*无限期阻塞,并测试文件描述符变动 */
        result = poll(fds, cur_max_fd, 1000);
        //result = select(FD_SETSIZE, &testfds, (fd_set*)0, (fd_set*)0, (struct timeval*)0); //FD_SETSIZE:系统默认的最大文件描述符
        if (result < 0)
        {
            perror("server5");
            exit(1);

        }
        /*扫描所有的文件描述符*/
        for (i = 0; i < cur_max_fd; i++)
        {

            /*找到相关文件描述符*/
            if (fds[i].revents)
            {
                fd = fds[i].fd;
                /*判断是否为服务器套接字,是则表示为客户请求连接。*/
                if (fd == server_sockfd)
                {
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd,
                        (struct sockaddr*)&client_address, &client_len);
                    fds[client_sockfd].fd = client_sockfd;//将客户端socket加入到集合中
                    fds[client_sockfd].events = POLLIN;
                    fds[client_sockfd].revents = 0;


                    if(cur_max_fd <= client_sockfd)
                    {
                        cur_max_fd = client_sockfd + 1;
                    }

                    printf("adding client on fd %d\n", client_sockfd);
                    //fds[server_sockfd].events = POLLIN;
                }
                /*客户端socket中有数据请求时*/
                else
                {
                    //ioctl(fd, FIONREAD, &nread);//取得数据量交给nread
                    nread = read(fd, &ch, 1);
                    /*客户数据请求完毕,关闭套接字,从集合中清除相应描述符 */
                    if (nread == 0)
                    {
                        close(fd);
                        memset(&fds[i], 0, sizeof(struct pollfd)); //去掉关闭的fd
                        printf("removing client on fd %d\n", fd);
                    }
                    /*处理客户数据请求*/
                    else
                    {
                        //read(fds[fd].fd, &ch, 1);
                        sleep(5);
                        printf("serving client on fd %d, read: %c\n", fd, ch);
                        ch++;
                        write(fd, &ch, 1);
                        //fds[fd].events = POLLIN;
                    }
                }
            }
        }
    }

    return 0;
}

epoll

epoll 可以理解为 event poll (基于事件的轮询)

使用场合:

  • 当客户处理多个描述符时(一般是交互式输入和网络套接口),必须使用I/O复用。
  • 当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
  • 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
  • 如果一个服务器既要处理TCP,又要处理UDP,一般要使用I/O复用。
  • 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

Reactor 设计模式

在这里插入图片描述
reactor 是一种事件驱动的反应堆模式,高效的事件处理模型

reactor 反应堆:事件来了,执行,事件类型可能不尽相同,所以我们需要提前注册好不同的事件处理函数;事件到来就由 epoll_wait 获取同时到来的多个事件,并且根据数据的不同类型将事件分发给事件处理机制 (事件处理器), 也就是我们提前注册的那些接口函数。

reactor 模型的设计思想和思维方式: 它需要的是事件驱动,相应的事件发生,我们需要根据事件自动的调用相应的函数,所以我们需要提前注册好处理函数的接口到 reactor 中, 函数是由 reactor 去调用的,而不是再主函数中直接进行调用的,所以需要使用回调函数

Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O
中心思想将所有要处理的 I/O 事件注册到一个 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。

流程
1.注册事件和对应的事件处理器;
2.多路复用器等待事件到来;
3.事件到来,激发事件分发器分发事件到对应的处理器;
4.事件处理器处理事件,然后注册新的事件, (比如处理读事件,处理完成之后需要将其设置为写事件再注册,因为读取之后我们需要针对业务需求进行数据处理,之后将其send 回去响应客户端结果,所以自然需要改成写事件,也就需要重新注册)

多路复用器 :由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
事件分发器 :将多路复用器中返回的就绪事件分到对应的处理函数中,分发给事件处理器。
事件处理器 :处理对应的IO事件。

Reactor优点:
1)响应快,不必为单个同步事件所阻塞,虽然 Reactor 本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
4)可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;

I/O 多路复用

1.创建 EPOLL 句柄

int epoll_create(int size); 

2.向 EPOLL 对象中添加、修改或者删除感兴趣的事件

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 

参数:
op 取值:
EPOLL_CTL_ADD 添加新的事件到epoll中
EPOLL_CTL_MOD 修改EPOLL中的事件
EPOLL_CTL_DEL 删除epoll中的事件
events 取值:
EPOLLIN 表示有数据可以读出(接受连接、关闭连接)
EPOLLOUT 表示连接可以写入数据发送(向服务器发起连接,连接成功事件)
EPOLLERR 表示对应的连接发生错误
EPOLLHUP 表示对应的连接被挂起

3.收集在epoll监控的事件中已经发生的事件

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout); 

参数:
epfd: epoll 的描述符。
events:分配好的 epoll_event 结构体数组,epoll 将会把发生的事件复制到 events 数组中(events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。

struct epoll_event{
	__uint32_t  events;
	epoll_data_t data;
}
typedef union epoll_data{
	void *ptr;
	int fd;
	uint32_t u32;
	uint64_t u64;
}epoll_data_t

maxevents: 本次可以返回的最大事件数目,通常 maxevents 参数与预分配的 events 数组的大小是相等的。
timeout: 表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout0,立刻返回,不会等待。-1表示无限期阻塞。

epoll 支持两种事件触发模式:边缘触发(edge-triggered,ET)和水平触发(level-triggered,LT)。
边缘触发:当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!
这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!
设置方式: stat->_ev.events = EPOLLIN | EPOLLET
水平触发:当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!
如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。

两者的区别:水平触发的意思是只要满足事件的条件,比如内核中有数据需要读,就一直不断地把这个事件传递给用户;而边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。

epoll为什么高效?

1、红黑树提高 epoll 事件增删查改效率;
2、回调通知机制;
当epoll监听套接字有数据读或者写时,会通过注册到socket的回调函数通知epoll,epoll检测到事件后,将事件存储在就绪队列(rdllist)。

就绪队列
epoll_wait 返回成功后,会将所有就绪事件存储在事件数组,用户不需要进行无效的轮询,从而提高了效率。

示例1:使用 epoll 实现简单的 web 服务器

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>

//    int fd;
typedef struct _ConnectStat  ConnectStat;

typedef void(*response_handler) (ConnectStat * stat);

struct _ConnectStat {
	int fd;
	char name[64];
	char  age[64];
	struct epoll_event _ev;
	int  status;//0 -未登录   1 - 已登陆
	response_handler handler;//不同页面的处理函数
};

//http协议相关代码
ConnectStat * stat_init(int fd);
void connect_handle(int new_fd);
void do_http_respone(ConnectStat * stat);
void do_http_request(ConnectStat * stat);
void welcome_response_handler(ConnectStat * stat);
void commit_respone_handler(ConnectStat * stat);


const char *main_header = "HTTP/1.0 200 OK\r\nServer: Martin Server\r\nContent-Type: text/html\r\nConnection: Close\r\n";

static int epfd = 0;

void usage(const char* argv)
{
	printf("%s:[ip][port]\n", argv);
}

void set_nonblock(int fd)
{
	int fl = fcntl(fd, F_GETFL);
	fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

int startup(char* _ip, int _port)  //创建一个套接字,绑定,检测服务器
{
	//sock
	//1.创建套接字
	int sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0)
	{
		perror("sock");
		exit(2);
	}

	int opt = 1;
	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

	//2.填充本地 sockaddr_in 结构体(设置本地的IP地址和端口)
	struct sockaddr_in local;
	local.sin_port = htons(_port);
	local.sin_family = AF_INET;
	local.sin_addr.s_addr = inet_addr(_ip);

	//3.bind()绑定
	if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0)
	{
		perror("bind");
		exit(3);
	}
	//4.listen()监听 检测服务器
	if (listen(sock, 5) < 0)
	{
		perror("listen");
		exit(4);
	}
	//sleep(1000);
	return sock;    //这样的套接字返回
}

int main(int argc, char *argv[])
{
	if (argc != 3)     //检测参数个数是否正确
	{
		usage(argv[0]);
		exit(1);
	}

	int listen_sock = startup(argv[1], atoi(argv[2]));      //创建一个绑定了本地 ip 和端口号的套接字描述符


	//1.创建epoll    
	epfd = epoll_create(256);    //可处理的最大句柄数256个
	if (epfd < 0)
	{
		perror("epoll_create");
		exit(5);
	}

	struct epoll_event _ev;       //epoll结构填充 
	ConnectStat * stat = stat_init(listen_sock);
	_ev.events = EPOLLIN;         //初始关心事件为读
	_ev.data.ptr = stat;
	//_ev.data.fd = listen_sock;    //  

	//2.托管
	epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &_ev);  //将listen sock添加到epfd中,关心读事件

	struct epoll_event revs[64];

	int timeout = -1;
	int num = 0;
	int done = 0;

	while (!done)
	{
		//epoll_wait()相当于在检测事件
		switch ((num = epoll_wait(epfd, revs, 64, timeout)))  //返回需要处理的事件数目  64表示 事件有多大
		{
		case 0:                  //返回0 ,表示监听超时
			printf("timeout\n");
			break;
		case -1:                 //出错
			perror("epoll_wait");
			break;
		default:                 //大于零 即就是返回了需要处理事件的数目
		{
			struct sockaddr_in peer;
			socklen_t len = sizeof(peer);

			int i;
			for (i = 0; i < num; i++)
			{
				ConnectStat * stat = (ConnectStat *)revs[i].data.ptr;

				int rsock = stat->fd; //准确获取哪个事件的描述符
				if (rsock == listen_sock && (revs[i].events) && EPOLLIN) //如果是初始的 就接受,建立链接
				{
					int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);

					if (new_fd > 0)
					{
						printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
						//sleep(1000);
						connect_handle(new_fd);
					}
				}
				else // 接下来对num - 1 个事件处理
				{
					if (revs[i].events & EPOLLIN)
					{
						do_http_request((ConnectStat *)revs[i].data.ptr);
					}
					else if (revs[i].events & EPOLLOUT)
					{
						do_http_respone((ConnectStat *)revs[i].data.ptr);
					}
					else
					{
					}
				}
			}
		}
		break;
		}//end switch
	}//end while
	return 0;
}


ConnectStat * stat_init(int fd) {
	ConnectStat * temp = NULL;
	temp = (ConnectStat *)malloc(sizeof(ConnectStat));

	if (!temp) {
		fprintf(stderr, "malloc failed. reason: %m\n");
		return NULL;
	}

	memset(temp, '\0', sizeof(ConnectStat));
	temp->fd = fd;
	temp->status = 0;
	//temp->handler = welcome_response_handler;

}

//初始化连接,然后等待浏览器发送请求
void connect_handle(int new_fd) {
	ConnectStat *stat = stat_init(new_fd);
	set_nonblock(new_fd);

	stat->_ev.events = EPOLLIN;
	stat->_ev.data.ptr = stat;

	epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &stat->_ev);    //二次托管

}

void do_http_respone(ConnectStat * stat) {
	stat->handler(stat);
}

void do_http_request(ConnectStat * stat) {

	//读取和解析http 请求
	char buf[4096];
	char * pos = NULL;
	//while  header \r\n\r\ndata
	ssize_t _s = read(stat->fd, buf, sizeof(buf) - 1);
	if (_s > 0)
	{
		buf[_s] = '\0';
		printf("receive from client:%s\n", buf);

		pos = buf;

		//Demo 仅仅演示效果,不做详细的协议解析
		if (!strncasecmp(pos, "GET", 3)) {
			stat->handler = welcome_response_handler;
		}
		else if (!strncasecmp(pos, "Post", 4)) {
			//获取 uri
			printf("---Post----\n");
			pos += strlen("Post");
			while (*pos == ' ' || *pos == '/') ++pos;

			if (!strncasecmp(pos, "commit", 6)) {//获取名字和年龄
				int len = 0;

				printf("post commit --------\n");
				pos = strstr(buf, "\r\n\r\n");
				char *end = NULL;
				if (end = strstr(pos, "name=")) {
					pos = end + strlen("name=");
					end = pos;
					while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9'))	end++;
					len = end - pos;
					if (len > 0) {
						memcpy(stat->name, pos, end - pos);
						stat->name[len] = '\0';
					}
				}

				if (end = strstr(pos, "age=")) {
					pos = end + strlen("age=");
					end = pos;
					while ('0' <= *end && *end <= '9')	end++;
					len = end - pos;
					if (len > 0) {
						memcpy(stat->age, pos, end - pos);
						stat->age[len] = '\0';
					}
				}
				stat->handler = commit_respone_handler;

			}
			else {
				stat->handler = welcome_response_handler;
			}

		}
		else {
			stat->handler = welcome_response_handler;
		}

		//生成处理结果 html ,write

		stat->_ev.events = EPOLLOUT;
		//stat->_ev.data.ptr = stat;
		epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);    //二次托管
	}
	else if (_s == 0)  //client:close
	{
		printf("client: %d close\n", stat->fd);
		epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);
		close(stat->fd);
		free(stat);
	}
	else
	{
		perror("read");
	}

}


void welcome_response_handler(ConnectStat * stat) {
	const char * welcome_content = "\
    <html lang=\"zh-CN\">\n\
    <head>\n\
    <meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
    <title>This is a test</title>\n\
    </head>\n\
    <body>\n\
    <div align=center height=\"500px\" >\n\
    <br/><br/><br/>\n\
    <h2>大家好,欢迎来到奇牛学院VIP 课!</h2><br/><br/>\n\
    <form action=\"commit\" method=\"post\">\n\
    尊姓大名: <input type=\"text\" name=\"name\" />\n\
    <br/>芳龄几何: <input type=\"password\" name=\"age\" />\n\
    <br/><br/><br/><input type=\"submit\" value=\"提交\" />\n\
    <input type=\"reset\" value=\"重置\" />\n\
    </form>\n\
    </div>\n\
    </body>\n\
    </html>";

	char sendbuffer[4096];
	char content_len[64];

	strcpy(sendbuffer, main_header);
	snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", (int)strlen(welcome_content));
	strcat(sendbuffer, content_len);
	strcat(sendbuffer, welcome_content);
	printf("send reply to client \n%s", sendbuffer);

	write(stat->fd, sendbuffer, strlen(sendbuffer));

	stat->_ev.events = EPOLLIN;
	//stat->_ev.data.ptr = stat;
	epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);


}


void commit_respone_handler(ConnectStat * stat) {
	const char * commit_content = "\
    <html lang=\"zh-CN\">\n\
    <head>\n\
    <meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
    <title>This is a test</title>\n\
    </head>\n\
    <body>\n\
    <div align=center height=\"500px\" >\n\
    <br/><br/><br/>\n\
    <h2>欢迎学霸同学&nbsp;%s &nbsp;,你的芳龄是&nbsp;%s!</h2><br/><br/>\n\
    </div>\n\
    </body>\n\
    </html>\n";

	char sendbuffer[4096];
	char content[4096];
	char content_len[64];
	int len = 0;

	len = snprintf(content, 4096, commit_content, stat->name, stat->age);
	strcpy(sendbuffer, main_header);
	snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", len);
	strcat(sendbuffer, content_len);
	strcat(sendbuffer, content);
	printf("send reply to client \n%s", sendbuffer);

	write(stat->fd, sendbuffer, strlen(sendbuffer));

	stat->_ev.events = EPOLLIN;
	//stat->_ev.data.ptr = stat;
	epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}

示例2:使用 epoll 封装的库的使用

globals.h

#ifndef GLOBALS_H
#define GLOBALS_H

#include <sys/time.h>
#include <sys/resource.h>	
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <errno.h>
#include <fcntl.h>
#include <assert.h>

#define FD_DESC_SZ		64

#define COMM_OK		  (0)
#define COMM_ERROR	 (-1)
#define COMM_NOMESSAGE	 (-3)
#define COMM_TIMEOUT	 (-4)
#define COMM_SHUTDOWN	 (-5)
#define COMM_INPROGRESS  (-6)
#define COMM_ERR_CONNECT (-7)
#define COMM_ERR_DNS     (-8)
#define COMM_ERR_CLOSING (-9)


//调试相关
#define DEBUG_LEVEL  0
#define DEBUG_ONLY   8
#define debug(m, n)    if( m >= DEBUG_LEVEL && n <= DEBUG_ONLY  ) printf
	
#define safe_free(x)	if (x) { free(x); x = NULL; }

typedef void PF(int, void *);

struct _fde {
    unsigned int type;
    __u_short local_port;
    __u_short remote_port;
    struct in_addr local_addr;
 
    char ipaddr[16];		/* dotted decimal address of peer */
   
   
    PF *read_handler;
    void *read_data;
    PF *write_handler;
    void *write_data;
    PF *timeout_handler;
    time_t timeout;
    void *timeout_data;
};

typedef struct _fde fde;

extern fde *fd_table;		
extern int Biggest_FD;		

/*系统时间相关,设置成全局变量,供所有模块使用*/
extern struct timeval current_time;
extern double current_dtime;
extern time_t sys_curtime;


/* epoll 相关接口实现 */
extern void do_epoll_init(int max_fd);
extern void do_epoll_shutdown();
extern void epollSetEvents(int fd, int need_read, int need_write);
extern int do_epoll_select(int msec);

/*框架外围接口*/
void comm_init(int max_fd);
extern int comm_select(int msec);
extern inline void comm_call_handlers(int fd, int read_event, int write_event);
void  commUpdateReadHandler(int fd, PF * handler, void *data);
void commUpdateWriteHandler(int fd, PF * handler, void *data);



extern const char *xstrerror(void);
int ignoreErrno(int ierrno);

#endif /* GLOBALS_H */

comm_epoll.c

#include "globals.h"
#include <sys/epoll.h>

#define MAX_EVENTS	256	/* 一次处理的最大的事件 */

/* epoll structs */
static int kdpfd;
static struct epoll_event events[MAX_EVENTS];
static int epoll_fds = 0;
static unsigned *epoll_state;	/* 保存每个epoll 的事件状态 */

static const char *
epolltype_atoi(int x)
{
    switch (x) {

    case EPOLL_CTL_ADD:
	return "EPOLL_CTL_ADD";

    case EPOLL_CTL_DEL:
	return "EPOLL_CTL_DEL";

    case EPOLL_CTL_MOD:
	return "EPOLL_CTL_MOD";

    default:
	return "UNKNOWN_EPOLLCTL_OP";
    }
}

void do_epoll_init(int max_fd)
{

    
    kdpfd = epoll_create(max_fd);
    if (kdpfd < 0)
	  fprintf(stderr,"do_epoll_init: epoll_create(): %s\n", xstrerror());
    //fd_open(kdpfd, FD_UNKNOWN, "epoll ctl");
    //commSetCloseOnExec(kdpfd);

    epoll_state = calloc(max_fd, sizeof(*epoll_state));
}


void do_epoll_shutdown()
{
    
    close(kdpfd);
    kdpfd = -1;
    safe_free(epoll_state);
}



void epollSetEvents(int fd, int need_read, int need_write)
{
    int epoll_ctl_type = 0;
    struct epoll_event ev;

    assert(fd >= 0);
    debug(5, 8) ("commSetEvents(fd=%d)\n", fd);

	memset(&ev, 0, sizeof(ev));
    
    ev.events = 0;
    ev.data.fd = fd;

    if (need_read)
	ev.events |= EPOLLIN;

    if (need_write)
	ev.events |= EPOLLOUT;

    if (ev.events)
	ev.events |= EPOLLHUP | EPOLLERR;

    if (ev.events != epoll_state[fd]) {
	/* If the struct is already in epoll MOD or DEL, else ADD */
	if (!ev.events) {
	    epoll_ctl_type = EPOLL_CTL_DEL;
	} else if (epoll_state[fd]) {
	    epoll_ctl_type = EPOLL_CTL_MOD;
	} else {
	    epoll_ctl_type = EPOLL_CTL_ADD;
	}

	/* Update the state */
	epoll_state[fd] = ev.events;

	if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0) {
	    debug(5, 1) ("commSetEvents: epoll_ctl(%s): failed on fd=%d: %s\n",
		epolltype_atoi(epoll_ctl_type), fd, xstrerror());
	}
	switch (epoll_ctl_type) {
	case EPOLL_CTL_ADD:
	    epoll_fds++;
	    break;
	case EPOLL_CTL_DEL:
	    epoll_fds--;
	    break;
	default:
	    break;
	}
    }
}

int do_epoll_select(int msec)
{
    int i;
    int num;
    int fd;
    struct epoll_event *cevents;

    /*if (epoll_fds == 0) {
	assert(shutting_down);
	return COMM_SHUTDOWN;
    }
    statCounter.syscalls.polls++;
    */
    num = epoll_wait(kdpfd, events, MAX_EVENTS, msec);
    if (num < 0) {
	getCurrentTime();
	if (ignoreErrno(errno))
	    return COMM_OK;

	debug(5, 1) ("comm_select: epoll failure: %s\n", xstrerror());
	return COMM_ERROR;
    }
    //statHistCount(&statCounter.select_fds_hist, num);

    if (num == 0)
	return COMM_TIMEOUT;

    for (i = 0, cevents = events; i < num; i++, cevents++) {
		fd = cevents->data.fd;
		comm_call_handlers(fd, cevents->events & ~EPOLLOUT, cevents->events & ~EPOLLIN);
    }

    return COMM_OK;
}

comm.c

#include "globals.h"

double current_dtime;
time_t sys_curtime;
struct timeval current_time;


int Biggest_FD = 1024;  /*默认的最大文件描述符数量 1024*/
static int MAX_POLL_TIME = 1000;	/* see also comm_quick_poll_required() */
fde *fd_table = NULL;	

time_t getCurrentTime(void)
{
    gettimeofday(&current_time, NULL);
    current_dtime = (double) current_time.tv_sec +
	(double) current_time.tv_usec / 1000000.0;
    return sys_curtime = current_time.tv_sec;
}


void
comm_close(int fd)
{
	assert(fd>0);
	fde *F = &fd_table[fd];
	if(F) memset((void *)F,'\0',sizeof(fde));
	epollSetEvents(fd, 0, 0);
	close(fd);
}

void
comm_init(int max_fd)
{
	if(max_fd > 0 ) Biggest_FD = max_fd;
	fd_table = calloc(Biggest_FD, sizeof(fde));
    do_epoll_init(Biggest_FD);
}



void
comm_select_shutdown(void)
{
    do_epoll_shutdown();
	if(fd_table) free(fd_table);
}


//static int comm_select_handled;


inline void
comm_call_handlers(int fd, int read_event, int write_event)
{
    fde *F = &fd_table[fd];
    
    debug(5, 8) ("comm_call_handlers(): got fd=%d read_event=%x write_event=%x F->read_handler=%p F->write_handler=%p\n"
	,fd, read_event, write_event, F->read_handler, F->write_handler);
    if (F->read_handler && read_event) {
	    PF *hdl = F->read_handler;
	    void *hdl_data = F->read_data;
	    /* If the descriptor is meant to be deferred, don't handle */

		debug(5, 8) ("comm_call_handlers(): Calling read handler on fd=%d\n", fd);
		//commUpdateReadHandler(fd, NULL, NULL);
		hdl(fd, hdl_data);
    }
	
    if (F->write_handler && write_event) {
	
	    PF *hdl = F->write_handler;
	    void *hdl_data = F->write_data;
	
	    //commUpdateWriteHandler(fd, NULL, NULL);
	    hdl(fd, hdl_data);
    }
}


int
commSetTimeout(int fd, int timeout, PF * handler, void *data)
{
    fde *F;
    debug(5, 3) ("commSetTimeout: FD %d timeout %d\n", fd, timeout);
    assert(fd >= 0);
    assert(fd < Biggest_FD);
    F = &fd_table[fd];

	
    if (timeout < 0) {
	F->timeout_handler = NULL;
	F->timeout_data = NULL;
	return F->timeout = 0;
    }
    assert(handler || F->timeout_handler);
    if (handler || data) {
	F->timeout_handler = handler;
	F->timeout_data = data;
    }
    return F->timeout = sys_curtime + (time_t) timeout;
}

void
commUpdateReadHandler(int fd, PF * handler, void *data)
{
    fd_table[fd].read_handler = handler;
    fd_table[fd].read_data = data;
    
    epollSetEvents(fd,1,0); 
}

void
commUpdateWriteHandler(int fd, PF * handler, void *data)
{
    fd_table[fd].write_handler = handler;
    fd_table[fd].write_data = data;
	
    epollSetEvents(fd,0,1); 
}



static void
checkTimeouts(void)
{
    int fd;
    fde *F = NULL;
    PF *callback;

    for (fd = 0; fd <= Biggest_FD; fd++) {
	F = &fd_table[fd];
	/*if (!F->flags.open)
	    continue;
	*/
	
	if (F->timeout == 0)
	    continue;
	if (F->timeout > sys_curtime)
	    continue;
	debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd);
	
	if (F->timeout_handler) {
	    debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd);
	    callback = F->timeout_handler;
	    F->timeout_handler = NULL;
	    callback(fd, F->timeout_data);
	} else {
	    debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd);
	    comm_close(fd);
	}
    }
}


int
comm_select(int msec)
{
    static double last_timeout = 0.0;
    int rc;
    double start = current_dtime;

    debug(5, 3) ("comm_select: timeout %d\n", msec);

    if (msec > MAX_POLL_TIME)
	msec = MAX_POLL_TIME;


    //statCounter.select_loops++;
    /* Check timeouts once per second */
    if (last_timeout + 0.999 < current_dtime) {
	last_timeout = current_dtime;
	checkTimeouts();
    } else {
	int max_timeout = (last_timeout + 1.0 - current_dtime) * 1000;
	if (max_timeout < msec)
	    msec = max_timeout;
    }
    //comm_select_handled = 0;

    rc = do_epoll_select(msec);


    getCurrentTime();
    //statCounter.select_time += (current_dtime - start);

    if (rc == COMM_TIMEOUT)
	debug(5, 8) ("comm_select: time out\n");

    return rc;
}


const char *
xstrerror(void)
{
    static char xstrerror_buf[BUFSIZ];
    const char *errmsg;

    errmsg = strerror(errno);

    if (!errmsg || !*errmsg)
	errmsg = "Unknown error";

    snprintf(xstrerror_buf, BUFSIZ, "(%d) %s", errno, errmsg);
    return xstrerror_buf;
}


int
ignoreErrno(int ierrno)
{
    switch (ierrno) {
    case EINPROGRESS:
    case EWOULDBLOCK:
#if EAGAIN != EWOULDBLOCK
    case EAGAIN:
#endif
    case EALREADY:
    case EINTR:
#ifdef ERESTART
    case ERESTART:
#endif
	return 1;
    default:
	return 0;
    }
    /* NOTREACHED */
}

main.c

#include "globals.h"
typedef struct _ConnectStat  ConnectStat;

#define BUFLEN  1024

struct _ConnectStat {
	int fd;
	char send_buf[BUFLEN];
	PF *handler;//不同页面的处理函数
};

//echo 服务实现相关代码
ConnectStat * stat_init(int fd);
void accept_connection(int fd, void *data);
void do_echo_handler(int fd, void  *data);
void do_echo_response(int fd,void *data);
void do_echo_timeout(int fd, void *data);



void usage(const char* argv)
{
	printf("%s:[ip][port]\n", argv);
}

void set_nonblock(int fd)
{
	int fl = fcntl(fd, F_GETFL);
	fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

int startup(char* _ip, int _port)  //创建一个套接字,绑定,检测服务器
{
	//sock
	//1.创建套接字
	int sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0)
	{
		perror("sock");
		exit(2);
	}

	int opt = 1;
	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

	//2.填充本地 sockaddr_in 结构体(设置本地的IP地址和端口)
	struct sockaddr_in local;
	local.sin_port = htons(_port);
	local.sin_family = AF_INET;
	local.sin_addr.s_addr = inet_addr(_ip);

	//3.bind()绑定
	if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0)
	{
		perror("bind");
		exit(3);
	}
	//4.listen()监听 检测服务器
	if (listen(sock, 5) < 0)
	{
		perror("listen");
		exit(4);
	}
	
	return sock;    //这样的套接字返回
}


ConnectStat * stat_init(int fd) {
	ConnectStat * temp = NULL;
	temp = (ConnectStat *)malloc(sizeof(ConnectStat));

	if (!temp) {
		fprintf(stderr, "malloc failed. reason: %m\n");
		return NULL;
	}

	memset(temp, '\0', sizeof(ConnectStat));
	temp->fd = fd;
	//temp->status = 0;
}

void do_welcome_handler(int fd, void  *data) {
	const char * WELCOME= "Welcome.\n";
	int wlen = strlen(WELCOME);
	int n ;
	ConnectStat * stat = (ConnectStat *)(data);
	
	if( (n = write(fd, "Welcome.\n",wlen)) != wlen ){
		
		if(n<=0){
		    fprintf(stderr, "write failed[len:%d], reason: %s\n",n,strerror(errno));
		}else fprintf(stderr, "send %d bytes only ,need to send %d bytes.\n",n,wlen);
		
	}else {
		commUpdateReadHandler(fd, do_echo_handler,(void *)stat);
		commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);
	}
}


void do_echo_handler(int fd, void  *data) {
	ConnectStat * stat = (ConnectStat *)(data);
	char * p = NULL;
	
	assert(stat!=NULL);
	
	
	p = stat->send_buf;
	*p++ = '-';
	*p++ = '>';
	ssize_t _s = read(fd, p, BUFLEN-(p-stat->send_buf)-1); //2字节"->" +字符结束符.
    if (_s > 0)
    {
		
		*(p+_s) = '\0';
		printf("receive from client: %s\n", p);
		//_s--;
		//while( _s>=0 && ( stat->send_buf[_s]=='\r' || stat->send_buf[_s]=='\n' ) ) stat->send_buf[_s]='\0';
		
		if(!strncasecmp(p, "quit", 4)){//退出.
			comm_close(fd);
            free(stat);
			return ;
		}
		//write(fd,
		commUpdateWriteHandler(fd, do_echo_response, (void *)stat);
		commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);
	}else if (_s == 0)  //client:close
    {
        fprintf(stderr,"Remote connection[fd: %d] has been closed\n", fd);
        comm_close(fd);
        free(stat);
    }
    else //err occurred.
    {
        fprintf(stderr,"read faield[fd: %d], reason:%s [%d]\n",fd , strerror(errno), _s);
    }
}

void do_echo_response(int fd, void  *data) {
	ConnectStat * stat = (ConnectStat *)(data);
	int len = strlen(stat->send_buf);
	int _s = write(fd, stat->send_buf, len);
	
	if(_s>0){
		commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);
		commUpdateReadHandler(fd, do_echo_handler, (void *)stat);
		
	}else if(_s==0){
		fprintf(stderr,"Remote connection[fd: %d] has been closed\n", fd);
                comm_close(fd);
                free(stat);
	}else {
		fprintf(stderr,"read faield[fd: %d], reason:%s [%d]\n",fd ,_s ,strerror(errno));
	}
}

//read()
//注册写事件
  
//写事件就绪
//write()

void accept_connection(int fd, void *data){
	struct sockaddr_in peer;
	socklen_t len = sizeof(peer);

	ConnectStat * stat = (ConnectStat *)data;
	
	int new_fd = accept(fd, (struct sockaddr*)&peer, &len);

	if (new_fd > 0)
	{
		ConnectStat *stat = stat_init(new_fd);
		set_nonblock(new_fd);

		printf("new client: %s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
		commUpdateWriteHandler(new_fd, do_welcome_handler, (void *)stat);
		commSetTimeout(new_fd, 30,do_echo_timeout, (void *)stat);
	}
}

void do_echo_timeout(int fd, void *data){
	fprintf(stdout,"---------timeout[fd:%d]----------\n",fd);
	comm_close(fd);
    free(data);
}

int main(int argc,char **argv){

	if (argc != 3)     //检测参数个数是否正确
	{
		usage(argv[0]);
		exit(1);
	}

	int listen_sock = startup(argv[1], atoi(argv[2]));      //创建一个绑定了本地 ip 和端口号的套接字描述符
	//初始化异步事件处理框架epoll
	
	comm_init(102400);
	
	ConnectStat * stat = stat_init(listen_sock);
	commUpdateReadHandler(listen_sock,accept_connection,(void *)stat);

	do{
		//不断循环处理事件
		comm_select(1000);
		
	}while(1==1);

	comm_select_shutdown();
}

参考:
Select、Poll、Epoll详解
小林coding-I/O 多路复用
epoll高度封装reactor,几乎所有可见服务器的底层框架