IO多路复用

发布于:2025-06-10 ⋅ 阅读:(21) ⋅ 点赞:(0)

一、IO多路复用概念

I/O多路复用(IO multiplexing)是一种并发处理多个I/O操作的机制。它允许一个进程或线程同时监听多个文件描述符(如套接字、管道、标准输入等)的I/O事件,并在有事件发生时进行处理。

传统的I/O模型中,通常使用阻塞I/O非阻塞I/O来处理单个I/O操作。如果需要同时处理多个I/O操作,那么需要使用多个线程或多个进程来管理和执行这些I/O操作。这种方式会导致系统资源的浪费,且编程复杂度较高。

而I/O多路复用通过提供一个统一的接口,如selectpollepoll等,来同时监听多个文件描述符的I/O事件。它们会在任意一个文件描述符上有I/O事件发生时立即返回,并告知应用程序哪些文件描述符有事件发生。应用程序可以根据返回的结果来针对有事件发生的文件描述符进行读取、写入或其他操作。

I/O多路复用的优点包括:

  • 单个进程或线程可以同时处理多个I/O操作,提高了系统的并发性。
  • 避免了大量的进程或线程切换,节约了系统资源
  • 使用较少的线程或进程,简化了编程模型和维护工作。

二、IO多路复用的方式简介

主要的 I/O 多路复用方式有以下几种:

  • selectselect 是最早的一种 I/O 多路复用方式,可以同时监听多个文件描述符的可读、可写和异常事件。通过在调用 select 时传递关注的文件描述符集合,及时返回有事件发生的文件描述符,然后应用程序可以对这些文件描述符进行读写操作。
  • pollpoll 是 select 的一种改进版,也能够同时监听多个文件描述符的可读、可写和异常事件。通过调用 poll 时传递关注的文件描述符数组,返回有事件发生的文件描述符,应用程序执行对应的读写操作。
  • epoll:epoll 是 Linux 特有的一种 I/O 多路复用机制,相较于 select 和 poll 具有更高的性能,适用于高并发环境。epoll 使用了回调机制来通知应用程序文件描述符上的事件发生,并且支持水平触发(LT,level triggered)和边缘触发(ET,edge triggered)两种模式。

三、select方式

select 是一种 I/O 多路复用的机制,用于同时监听多个文件描述符的可读、可写和异常事件。它是最早的一种实现,适用于多平台。select几乎在所有的操作系统上都可用,并且拥有相似的接口和语义。这使得应用程序在多个平台上能够以相似的方式使用 select

1.select运行原理

select 函数在阻塞过程中,主要依赖于一个名为 fd_set 的数据结构来表示文件描述符集合。通过向 select 函数传递待检测的 fd_set 集合,可以指定需要检测哪些文件描述符。fd_set 结构一般是通过使用宏函数以及相关操作进行初始化和处理。

fd_set 结构可以用于传递三种不同类型的文件描述符集合,包括读缓冲区、写缓冲区和异常状态。通过将文件描述符放入相应的集合中,程序员可以选择性地检查特定类型的事件或操作。通过使用传出变量,程序员可以获取与就绪状态对应的文件描述符集合,并相应地处理与就绪内容相关的操作。

下面两张图展示了select函数在运行时的逻辑(读缓冲区为例)

 

select函数使用方法

select函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);
  • nfds:需要监视的最大文件描述符加1,即待监视的文件描述符的最大值加1。
  • readfds:可读性检查的文件描述符集合。
  • writefds:可写性检查的文件描述符集合。
  • exceptfds:异常条件的文件描述符集合。
  • timeout:最长等待时间,也可以设置为 NULL,表示一直阻塞直到有事件发生。

 函数返回值如下:

  • 大于 0:返回值为有事件发生的文件描述符的总数。
  • 0:表示超时,没有事件发生。
  • -1:出错,可以通过查看全局变量 errno 来获取错误码。

 一些值得注意的小细节

  • nfds 的值必须是所有待监视文件描述符中最大的值加1。
  • 在某些平台上,select 的文件描述符集大小有可能有限制。
  • 调用 select 会阻塞等待,直到有事件发生,这会导致效率问题。
  • 在多个线程中使用 select 可能需要使用互斥锁来保护传递的文件描述符集。

 操作fd_set的API:

       void FD_CLR(int fd, fd_set *set);
       int  FD_ISSET(int fd, fd_set *set);
       void FD_SET(int fd, fd_set *set);
       void FD_ZERO(fd_set *set);

1. FD_ZERO(fd_set *set):清空指定的文件描述符集合 set,将其所有位都置为0。

2. FD_SET(int fd, fd_set *set):将指定的文件描述符 fd 添加到文件描述符集合 set 中,相应的位将被置为1。

3. FD_CLR(int fd, fd_set *set):将指定的文件描述符 fd 从文件描述符集合 set 中移除,相应的位将被清零(置为0)。

4. FD_ISSET(int fd, fd_set *set):检查指定的文件描述符 fd 是否在文件描述符集合 set 中,如果存在,则返回非零值(true);否则,返回零值(false)。

2.实例

下面是一个利用select实现的客户端与服务器端相互传输的简单示例:

服务器端:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <time.h>
#include <unistd.h>
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>

/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

typedef struct sockaddr*(SA);
int main(int argc, char** argv)
{
  //监听套接字
  int listfd = socket(AF_INET, SOCK_STREAM, 0);
  if (-1 == listfd)
    {
      perror("socket");
      return 1;
    }

  struct sockaddr_in ser, cli;
  bzero(&ser, sizeof(ser));
  bzero(&cli, sizeof(cli));
  ser.sin_family = AF_INET;
  ser.sin_port = htons(50000);
  ser.sin_addr.s_addr = inet_addr("127.0.0.1");
  int ret = bind(listfd, (SA)&ser, sizeof(ser));
  if (-1 == ret)
    {
      perror("bind");
      return 1;
    }
  // 三次握手的排队数  ,
  listen(listfd, 3);

  fd_set rd_set, tmp_set;
  FD_ZERO(&rd_set);
  FD_ZERO(&tmp_set);

  FD_SET(listfd, &tmp_set);
  int max_fd = listfd;
  socklen_t len = sizeof(cli);
  while (1)
    {
      rd_set = tmp_set;  // clean flag
      select(max_fd + 1, &rd_set, NULL, NULL, NULL);
      int i = 0;
      for (i = listfd; i < max_fd + 1; i++)
        {
          if (FD_ISSET(i, &rd_set) && i == listfd)  // 来三次握手 监听套接字
            {
              //通信套接字
              int conn = accept(listfd, (SA)&cli, &len);
              if (-1 == conn)
                {
                  perror("accept");
                  continue;
                }
              FD_SET(conn, &tmp_set);
              if (conn > max_fd)
                {
                  max_fd = conn;
                }
            }
          else if (FD_ISSET(i, &rd_set) && i != listfd)  //进行通信 通讯套接字
            {
              int conn = i;
              char buf[256] = {0};
              // ret >0 实际收到的字节数
              //==0  表示对方断开
              // -1 出错。
              ret = recv(conn, buf, sizeof(buf), 0);
              if (ret <= 0)
                {
                  FD_CLR(conn, &tmp_set);
                  close(conn);
                  printf("cli offline\n");
                  // break;
                  continue;
                }
              getpeername(conn, (SA)&cli, &len);
              printf("cli ip:%s port :%d  %s\n", inet_ntoa(cli.sin_addr),
                     ntohs(cli.sin_port), buf);
              time_t tm;
              time(&tm);
              struct tm* info = localtime(&tm);
              sprintf(buf, "%s %d:%d:%d\n", buf, info->tm_hour, info->tm_min,
                      info->tm_sec);
              send(conn, buf, strlen(buf), 0);
            }
        }
    }
  close(listfd);
  return 0;
}

客户端:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <time.h>
#include <unistd.h>
typedef struct sockaddr*(SA);
int main(int argc, char** argv)
{
  int conn = socket(AF_INET, SOCK_STREAM, 0);
  if (-1 == conn)
    {
      perror("socket");
      return 1;
    }
  // man 7 ip
  struct sockaddr_in ser, cli;
  bzero(&ser, sizeof(ser));
  bzero(&cli, sizeof(cli));
  ser.sin_family = AF_INET;
  ser.sin_port = htons(50000);
  ser.sin_addr.s_addr = inet_addr("127.0.0.1");
  int ret = connect(conn,(SA)&ser,sizeof(ser));
  if(-1 == ret)
  {
    perror("connect");
    return 1;
  }
  int i=5;
  while (1)
    {
      char buf[256] = {0};
      strcpy(buf,"this is tcp test");
      send(conn,buf,strlen(buf),0);
      ret = recv(conn, buf, sizeof(buf), 0);
      if(ret<=0)
      {
        break;
      }
      printf("ser:%s",buf);
      fflush(stdout);
      sleep(1);    
    }
  close(conn);

  return 0;
}

3.注意

在服务器端中,调用select函数时,因为select函数会将检测的结果写回fd_set,所以如果不做其他操作的话,写回的数据会覆盖掉最初的fd_set,造成错误。所以我们在调用select函数之前可以将fd_set暂时先赋给一个临时变量,如下:

fd_set rdset;
fd_set rdtemp;
 
 
rdtemp = rdset;
int num = select(maxfd + 1, &rdtemp, NULL, NULL, NULL);

四、epoll方式

1.epoll运行原理

epoll是Linux下的一种I/O 多路复用机制,可以高效地处理大量的并发连接。

epoll模型使用一个文件描述符(epoll fd)来管理多个其他文件描述符(event fd)。在epoll fd上注册了感兴趣的事件,当有感兴趣的事件发生时,epoll会通知应用程序。相比于传统的select和poll模型,epoll模型有以下几个优势:

  1. 高效:在大规模并发连接的场景下,epoll模型可以显著提高效率。使用一个文件描述符来管理多个连接,避免了遍历所有连接的开销。并且epoll使用了“事件通知”的方式,只有在有事件发生时才会通知应用程序,避免了无效轮询。
  2. 更快的响应速度:由于epoll是基于事件驱动的模型,在有事件发生时立即通知应用程序,可以更快地响应客户端的请求。
  3. 可扩展性好:epoll模型采用了无锁设计,将连接集合的管理交给内核处理,并利用回调函数机制处理连接的读写事件,减少了锁竞争,提高了系统的可扩展性。

2.epoll函数使用方法

在Linux下,epoll函数主要包括以下几个:

#include <sys/epoll.h>  //头文件
 
int epoll_create(int size);   //创建一个epoll实例
 
int epoll_ctl(int epfd, int op, 
              int fd, struct epoll_event *event);  //控制epoll上的事件
 
int epoll_wait(int epfd, struct epoll_event *events,
               int maxevents, int timeout); //阻塞等待事件发生

同时在这些参数中,有一个重要的数据结构epoll_event。epoll_event结构体用于描述事件,包括文件描述符、事件类型和事件数据。其中的定义如下:

typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;
 
struct epoll_event
{
  uint32_t events;	/* Epoll events */
  epoll_data_t data;	/* User data variable */
} __EPOLL_PACKED;

 其中,events是事件类型,包括以下几种:

  • EPOLLIN:可读事件,表示连接上有数据可读。
  • EPOLLOUT:可写事件,表示连接上可以写入数据。
  • EPOLLPRI:紧急事件,表示连接上有紧急数据可读。
  • EPOLLRDHUP:连接关闭事件,表示连接已关闭。
  • EPOLLERR:错误事件,表示连接上发生错误。
  • EPOLLHUP:挂起事件,表示连接被挂起。

结构体中的epoll_data是一个联合体,用于在epoll_event结构体中传递事件数据。它有四个成员变量,可以根据具体的需求选择使用其中的一个。通常可以选择int类型的fd,用于存储发生对应事件的文件描述符

epoll_create函数:创建一个epoll fd,返回一个新的epoll文件描述符。参数size用于指定监听的文件描述符个数,但是在Linux 2.6.8之后的版本,该参数已经没有实际意义。传入一个大于0的值即可。

int epfd=epoll_create(1);

 epoll_ctl函数:用于控制epoll事件的函数之一。它用于向epoll实例中添加、修改或删除关注的文件描述符和对应事件。函数原型如下:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

 函数参数:

epfd:epoll文件描述符,通过epoll_create函数创建获得。

op:操作类型,可以是以下三种取值之一:

  • EPOLL_CTL_ADD:将文件描述符添加到epoll实例中。
  • EPOLL_CTL_MOD:修改已添加到epoll实例中的文件描述符的关注事件。
  • EPOLL_CTL_DEL:从epoll实例中删除文件描述符。

fd:要控制的文件描述符。 

event:指向epoll_event结构体的指针,用于指定要添加、修改或删除的事件。

 函数返回值:

  • 成功时返回0,表示操作成功。
  • 失败时返回-1,并设置errno错误码来指示具体错误原因。

 epoll_wait函数:用于等待事件的发生。它会一直阻塞直到有事件发生或超时。函数原型如下:

#include <sys/epoll.h>
 
int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);

 函数参数:

epfd:epoll文件描述符,通过epoll_create函数创建获得。

events:用于接收事件的epoll_event结构体数组。

maxeventsevents数组的大小,表示最多可以接收多少个事件。

timeout:超时时间,单位为毫秒,表示epoll_wait函数阻塞的最长时间。常用的取值有以下三种:

  • -1:表示一直阻塞,直到有事件发生。
  • 0:表示立即返回,不管有没有事件发生。
  • > 0:表示等待指定的时间(以毫秒为单位),如果在指定时间内没有事件发生,则返回。

 函数返回值:

  • 成功时返回接收到的事件的数量。如果超时时间为0并且没有事件发生,则返回0。
  • 失败时返回-1,并设置errno错误码来指示具体错误原因。

注意:

在epoll_wait函数中用于接收事件的epoll_event结构体数组是一个传出参数,需要定义一个epoll_event的数组,比如: 

    struct epoll_event evens[100];//用于接取传出的内容
    int len=sizeof(evens)/sizeof(struct epoll_event);

工作模式

epoll的工作模式可以分为两种:边缘触发(Edge Triggered, ET)模式和水平触发(Level Triggered, LT)模式。一般epoll运行的模式默认是水平触发模式。

3.实例

服务器端:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <time.h>
#include <unistd.h>
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>

/* According to earlier standards */
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
typedef struct sockaddr*(SA);
int add_fd(int epfd, int fd)
{
  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = fd;
  int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
  if (-1 == ret)
    {
      perror("add fd ");
      return 1;
    }
  return 0;
}
int del_fd(int epfd, int fd)
{
  struct epoll_event ev;
  ev.events = EPOLLIN;
  ev.data.fd = fd;
  int ret = epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);
  if (-1 == ret)
    {
      perror("add fd ");
      return 1;
    }
  return 0;
}
int main(int argc, char** argv)
{
  //监听套接字
  int listfd = socket(AF_INET, SOCK_STREAM, 0);
  if (-1 == listfd)
    {
      perror("socket");
      return 1;
    }

  struct sockaddr_in ser, cli;
  bzero(&ser, sizeof(ser));
  bzero(&cli, sizeof(cli));
  ser.sin_family = AF_INET;
  ser.sin_port = htons(50000);
  ser.sin_addr.s_addr = inet_addr("127.0.0.1");
  int ret = bind(listfd, (SA)&ser, sizeof(ser));
  if (-1 == ret)
    {
      perror("bind");
      return 1;
    }
  // 三次握手的排队数  ,
  listen(listfd, 3);
  // 1 create set
  int epfd = epoll_create(10);
  if (-1 == epfd)
    {
      perror("epoll_create");
      return 1;
    }
  struct epoll_event rev[10];
  // 2 add fd
  add_fd(epfd, listfd);

  socklen_t len = sizeof(cli);
  while (1)
    {
      int ep_ret = epoll_wait(epfd, rev, 10, -1);
      int i = 0;
      for (i = 0; i < ep_ret; i++)
        {
          if (listfd == rev[i].data.fd)  // 来三次握手 监听套接字
            {
              //通信套接字
              int conn = accept(listfd, (SA)&cli, &len);
              if (-1 == conn)
                {
                  perror("accept");
                  continue;
                }
              add_fd(epfd, conn);
            }
          else  //进行通信 通讯套接字
            {
              int conn = rev[i].data.fd;
              char buf[256] = {0};
              // ret >0 实际收到的字节数
              //==0  表示对方断开
              // -1 出错。
              getpeername(conn, (SA)&cli, &len);
              ret = recv(conn, buf, sizeof(buf), 0);
              if (ret <= 0)
                {
                  // FD_CLR(conn, &tmp_set);
                  del_fd(epfd, conn);
                  close(conn);
                  printf("cli ip:%s port:%d offline\n",
                            inet_ntoa(cli.sin_addr),ntohs(cli.sin_port));             
                  // break;
                  continue;
                }

              printf("cli ip:%s port :%d  %s\n", inet_ntoa(cli.sin_addr),
                     ntohs(cli.sin_port), buf);
              time_t tm;
              time(&tm);
              struct tm* info = localtime(&tm);
              sprintf(buf, "%s %d:%d:%d\n", buf, info->tm_hour, info->tm_min,
                      info->tm_sec);
              send(conn, buf, strlen(buf), 0);
            }
        }
    }
  close(listfd);
  return 0;
}

客户端:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h> /* See NOTES */
#include <time.h>
#include <unistd.h>
typedef struct sockaddr*(SA);
int main(int argc, char** argv)
{
  
  int conn = socket(AF_INET, SOCK_STREAM, 0);
  if (-1 == conn)
    {
      perror("socket");
      return 1;
    }

  struct sockaddr_in ser, cli;
  bzero(&ser, sizeof(ser));
  bzero(&cli, sizeof(cli));
  ser.sin_family = AF_INET;
  ser.sin_port = htons(50000);
  ser.sin_addr.s_addr = inet_addr("127.0.0.1");
  int ret = connect(conn,(SA)&ser,sizeof(ser));
  if(-1 == ret)
  {
    perror("connect");
    return 1;
  }
  int i=5;
  while (1)
    {
      char buf[256] = {0};
      strcpy(buf,"this is tcp test");
      send(conn,buf,strlen(buf),0);
      ret = recv(conn, buf, sizeof(buf), 0);
      if(ret<=0)
      {
        break;
      }
      printf("ser:%s",buf);
      fflush(stdout);
      sleep(1);
      
    }
  close(conn);

  return 0;
}