linux高性能服务器编程笔记

发布于:2023-01-17 ⋅ 阅读:(275) ⋅ 点赞:(0)

第1章 TCP/IP协议族

 

数据链路层

数据链路层实现了网卡接口的网络驱动程序,以处理数据在物理媒介(比如以太网、令牌环等)上的传输。不同的物理网络具有不同的电气特性,网络驱动程序隐藏了这些细节,为上层协议提供一个统一的接口。 封装了物理网络的电气细节

ARP 、RARP(无盘工作站:缺乏存储设备,无盘工作站无法记住自己的IP地址 )

数据链路层使用物理地址寻址一台机器 ,实现IP地址和机器物理地址之间的相互转换

网络层

实现数据包的选路和转发 。封装了网络连接的细节

IP协议

IP协议根据数据包的目的IP地址来决定如何投递它 ,寻找一个合适的下一跳路由器,并将数据包交付给该路由器来转发

ICMP协议(因特网控制报文协议)

用于检测网络连接

8位类型 :差错报文-----用来回应网络错误,比如 目标不可到达 和 重定向

查询报文-----查询网络信息 , 比如ping

8位代码 :进一步细分不同的条件,比如重定向报文使用代码值0表示对网络重定向,代码值1表示对主机重定向

16位校验和 :对整个报文(包括头部和内容部分)进行循环冗余校验 ,以检验报文在传输过程中是否损坏

传输层

传输层只关心通信的起始端和目的端,而不在乎数据包的中转过程 。封装了一条端到端的逻辑通信链路,它负责数据的收发、链路的超时重连等。

TCP协议:为应用层提供可靠的、面向连接的和基于流的服务

使用超时重传、数据确认等;通信的双方必须先建立TCP连接 ; 数据没有边界限制,发送端可以逐个字节地向数据流中写入数据,接收端也可以逐个字节地将它们读出

UDP协议:提供不可靠、无连接和基于数据报的服务

无法保证数据从发送端正确地传送到目的端,使用UDP协议的应用程序通常要自己处理数据确认、超时重传等逻辑; 每次发送数据都要明确指定接收端的地址 ;每个UDP数据报都有一个长度,接收端必须以该长度为最小单位将其所有内容一次性读出,否则数据将被截断

SCTP协议

为了在因特网上传输电话信号而设计的

应用层

处理应用程序的逻辑,在用户空间实现,(数据链路层、网络层和传输层负责处理网络通信细节 ,在内核空间中实现 )。可以通过/etc/services文件查看所有知名的应用层协议,以及它们都能使用哪些传输层服务。

ping(应用程序):它利用ICMP报文检测网络连接 ,可跳过传输层直接使用网络层提供的服务

telnet :远程登录协议

OSPF(开放最短路径优先) : 一种动态路由更新协议,用于路由器之间的通信 ,可能跳过传输层直接使用网络层提供的服务

DNS 机器域名到IP地址的转换

封装

应用程序数据沿着协议栈从上往下依次传递,每层协议都将在上层数据的基础上加上自己的头部信息

 经过TCP封装后的数据称为TCP报文段,TCP头部信息和TCP内核缓冲区数据一起构成了TCP报文段,当发送端应用程序使用send(或者write)函数向一个TCP连接写入数据时,内核中的TCP模块首先把这些数据复制到与该连接对应的TCP内核发送缓冲区中,然后TCP模块调用IP模块提供的服务,传递的参数包括TCP头部信息和TCP发送缓冲区中的数据,即TCP报文段 。

经过UDP封装后的数据称为UDP数据报,UDP无须为应用层数据保存副本 ,当一个UDP数据报被成功发送之后,UDP内核缓冲区中的该数据报就被丢弃了。如果应用程序检测到该数据报未能被接收端正确接收,并打算重发这个数据报,则应用程序需要重新从用户空间将该数据报拷贝到UDP内核发送缓冲区中

经过IP封装后的数据称为IP数据报,数据部分就是一个TCP报文段、UDP数据报或者ICMP报文

经过数据链路层封装的数据称为帧,MTU(帧的最大传输单元),以太网帧的MTU是1500字节,过长的IP数据报可能需要被分片

分用

帧到达目的主机时,将沿着协议栈自底向上依次传递,各层协议依次处理帧中本层负责的头部数据,以获取所需的信息,并最终将处理后的帧交给目标应用程序 。帧的头部需要提供某个字段来区分不同协议

ARP

IP地址到以太网地址的转换 。原理:主机向自己所在的网络广播一个ARP请求,该请求包含目标机器的网络地址。此网络上的其他机器都将收到这个请求,但只有被请求的目标机器会回应一个ARP应答,其中包含自己的物理地址。

通常,ARP维护一个高速缓存,其中包含经常访问,或最近访问的机器的IP地址到物理地址的映射,避免了重复的ARP请求

socket

将应用程序数据从用户缓冲区中复制到TCP/UDP内核发送缓冲区,以交付内核来发送数据,或者是从内核TCP/UDP接收缓冲区中复制数据到用户缓冲区,以读取数据

应用程序可以通过它们来修改内核中各层协议的某些头部信息或其他数据结构,从而精细地控制底层通信的行为,如设置TTL

第2章 IP协议详解

IP服务的特点

为上层协议提供无状态、无连接、不可靠的服务。

无状态 :所有IP数据报的发送、传输和接收都是相互独立、没有上下文关系的 ,接收端的IP模块无法检测到乱序和重复(IP数据报头部标识字段用来处理IP分片和重组,不是用来指示接收顺序 )

无连接 :IP通信双方都不长久地维持对方的任何信息,每次发送数据需指明对方的IP地址

不可靠 :不能保证IP数据报准确地到达接收端,上层协议需要自己实现数据确认、超时重传等机制

 

IP分片

 

长度为1501字节的IP数据报被拆分成两个IP分片,第一个IP分片长度为1500字节,第二个IP分片的长度为21字节。每个IP分片都包含自己的IP头部(20字节),且第一个IP分片的IP头部设置了MF标志,而第二个IP分片的IP头部则没有设置该标志,因为它已经是最后一个分片了。原始IP数据报中的ICMP头部内容被完整地复制到了第一个IP分片中。第二个IP分片不包含ICMP头部信息,因为IP模块重组该ICMP报文的时候只需要一份ICMP头部信息,重复传送这个信息没有任何益处。1473字节的ICMP报文数据的前1472字节被IP模块复制到第一个IP分片中,使其总长度为1500字节,从而满足MTU的要求;而多出的最后1字节则被复制到第二个IP分片中。

IP路由

IP模块接收到来自数据链路层的IP数据报时 ,先对该数据报的头部做CRC校验,无误分析其头部 ;如果头部设置了源站选路选项,则IP模块调用数据报转发子模块来处理该数据报 ,如果该IP数据报是发送给本机的,则IP模块就根据数据报头部中的协议字段来决定将它派发给哪个上层应用 ;如果该IP数据报不是发送给本机的,也调用数据报转发子模块来处理该数据报。

数据报转发子模块将首先检测系统是否允许转发,允许,然后将它交给IP数据报输出子模块;IP模块实现数据报路由的核心数据结构是路由表

路由表如何给定数据报的目标IP地:

1、查找路由表中和数据报的目标IP地址完全匹配的主机IP地址。如果找到,就使用该路由项,没找到则转步骤2

2、查找路由表中和数据报的目标IP地址具有相同网路ID的网络IP地址,如果找到,就使用该路由项;没找到则转步骤3

3、选择默认路由项,这通常意味着数据报的下一跳路由是网关

重定向

ICMP重定向报文可用于更新路由表,给源端发送一个ICMP重定向报文,以告诉它一个更合理的下一跳路由器。

第3章 TCP协议详解

TCP服务的特点

面向连接:通信的双方先建立连接,为该连接分配必要的内核资源,以管理连接的状态和 连接上数据的传输,一对一通信

字节流:发送端执行的写操作次数和接收端执行的读操作次数之间没有任何数量关系,应用程序对数据的发送和接收是没有边界限制的

可靠传输:采用发送应答机制,超时重传机制,TCP协议还会对接收到的TCP报文段重排、整理,再交付给应用层

半关闭

通信的一端可以发送结束报文段给对方,告诉它本端已经完成了数据的发送,但允许继续接收来自对方的数据,直到对方也发送结束报文段以关闭连接。TCP连接的这种状态称为半关闭

连接超时

服务器对于客户端发送出的同步报文段没有应答,如果重连仍然无效,则通知应用程序连接超时

 

TIME_WAIT状态

在这个状态,客户端连接要等待一段长为2MSL(Maximum Segment Life,报文段最大生存时间)的时间,才能完全关闭。MSL是TCP报文段在网络中的最大生存时间 ,标准文档建议2 min。

TIME_WAIT状态存在的原因有两点:

可靠地终止TCP连接。--报文段7丢失 ,那么服务器将重发结束报文段 ,客户端需要停留在某个状态以处理重复收到的结束报文段

保证让迟来的TCP报文段有足够的时间被识别并丢弃。--防止应用程序能够立即建立一个和刚关闭的连接相同的IP地址和端口号 (可能接收到属于原来的连接的应用程序数据 )

复位报文段

TCP连接的一端会向另一端发送携带RST标志的报文段,即复位报文段,以通知对方关闭连接或重新建立连接 ,3种情况:

1、访问不存在的端口和TIME_WAIT状态的连接

2、异常终止连接

3、客户端(或服务器)往处于半打开状态的连接写入数据,则对方将回应一个复位报文段。

Nagle

Nagle算法要求一个TCP连接的通信双方在任意时刻都最多只能发送一个未被确认的TCP报文段,在该TCP报文段的确认到达之前不能发送其他TCP报文段。另一方面,发送方在等待确认的同时收集本端需要发送的微量数据,并在确认到来时以一个TCP报文段将它们全部发出。这样就极大地减少了网络上的微小TCP报文段的数量。该算法的另一个优点在于其自适应性:确认到达得越快,数据也就发送得越快

带外数据

用于迅速通告对方本端发生的重要事件,比普通数据(也称为带内数据)有更高的优先级,它应该总是立即被发送,而不论发送缓冲区中是否有排队等待发送的普通数据 。带外数据的传输可以使用一条独立的传输层连接,也可以映射到传输普通数据的连接中。

TCP利用其头部中的紧急指针标志和紧急指针两个字段,给应用程序提供了一种紧急方式 。根据紧急指针所指的位置确定带外数据的位置,并将它读入一个特殊的缓存中。这个缓存只有1字节,称为带外缓存。如果上层应用程序没有及时将带外数据从带外缓存中读出,则后续的带外数据(如果有的话)将覆盖它

TCP超时重传

TCP服务必须能够重传超时时间内未收到确认的TCP报文段。TCP模块为每个TCP报文段都维护一个重传定时器

拥塞控制

TCP模块还有一个重要的任务,就是提高网络利用率,降低丢包率,并保证网络资源对每条数据流的公平性。这就是所谓的拥塞控制 .四个部分 :慢启动、拥塞避免、快速重传和快速恢复

控制发送端向网络一次连续写入的数据量SWND(发送窗口 ),不过,发送端最终以TCP报文段来发送数据,所以SWND限定了发送端能连续发送的TCP报文段数量。发送端需要合理地选择SWND的大小。如果SWND太小,会引起明显的网络延迟;反之,如果SWND太大,则容易导致网络拥塞。

发送端需要合理地选择SWND的大小。接收方可通过其接收通告窗口(RWND)来控制发送端的SWND 。但这显然不够,所以发送端引入了一个称为拥塞窗口(Congestion Window,CWND)的状态变量。实际的SWND值是RWND和CWND中的较小者

慢启动和拥塞避免

TCP连接建立好之后,CWND将被设置成初始值IW(Initial Window),其大小为2~4个SMSS(TCP报文段的最大长度(仅指数据部分)),其值一般等于MSS,此后发送端每收到接收端的一个确认,增加

 当CWND的大小超过慢启动门限该值时,TCP拥塞控制将进入拥塞避免阶段

快速重传和快速恢复

发送端如果连续收到3个重复的确认报文段,就认为是拥塞发生了。后它启用快速重传和快速恢复算法来处理拥塞,过程如下:

1.当收到第3个重复的确认报文段时,按照式(3-3)计算ssthresh,然后立即重传丢失的报文段,并按照式(3-4)设置CWND。

 

 2.每次收到1个重复的确认时,设置CWND=CWND+SMSS。此时发送端可以发送新的TCP报文段(如果新的CWND允许的话)

3.当收到新数据的确认时,设置CWND=ssthresh(ssthresh是新的慢启动门限值,由第一步计算得到)。

快速重传和快速恢复完成之后,拥塞控制将恢复到拥塞避免阶段,这一点由第3步操作可得知。

第4章 TCP/IP通信案例:访问Internet上的Web服务器

HTTP代理服务器的工作原理

在HTTP通信链上,客户端和目标服务器之间通常存在某些中转代理服务器,它们提供对目标资源的中转访问。一个HTTP请求可能被多个代理服务器转发,后面的服务器称为前面服务器的上游服务器。

正向代理服务器

客户端自己设置代理服务器的地址。客户的每次请求都将直接发送到该代理服务器,并由代理服务器来请求目标资源。

反向代理服务器

反向代理则被设置在服务器端,因而客户端无须进行任何设置。用代理服务器来接收Internet上的连接请求,然后将请求转发给内部网络上的服务器,并将从内部服务器上得到的结果返回给客户端。

透明代理服务器

透明代理只能设置在网关上。 用户访问Internet的数据报必然都经过网关,如果在网关上设置代理,则该代理对用户来说显然是透明的。透明代理可以看作正向代理的一种特殊情况

路由转发

虽然IP数据报是先发送到路由器,但IP头部的源端IP地址和目的端IP地址在转发过程中是始终不变的,帧头部的源端物理地址和目的端物理地址在转发过程中则是一直在变化的。

本地名称查询

通过域名来访问Internet上的某台主机时,需要使用DNS服务来获取该主机的IP地址。但如果我们通过主机名来访问本地局域网上的机器,则可通过本地的静态文件来获得该机器的IP地址

HTTP请求

GET http://www.baidu.com/index.html HTTP/1.0
User-Agent:Wget/1.12(linux-gnu)
Host:www.baidu.com
Connection:close

 

HTTP应答

HTTP/1.0 200 OK
Server:BWS/1.0
Content-Length:8024
Content-Type:text/html;charset=gbk
SetCookie:BAIDUID=A5B6C72D68CF639CE8896FD79A03FBD8:FG=1;expires=Wed,04-
Jul-42 00:10:47 GMT;path=/;domain=.baidu.com
Via:1.0 localhost(squid/3.0 STABLE18)

HTTP协议是一种无状态的协议,使用额外的手段来保持HTTP连接状态,常见的解决方法就是Cookie。 Cookie是服务器发送给客户端的特殊信息(通过HTTP应答的头部字段“SetCookie”),客户端每次向服务器发送请求的时候都需要带上这些信息(通过HTTP请求的头部字段“Cookie”)。这样服务器就可以区分不同的客户了。基于浏览器的自动登录就是用Cookie实现的。

第5章 Linux网络编程基础API

#include<arpa/inet.h>
in_addr_t inet_addr(const char*strptr);
int inet_aton(const char*cp,struct in_addr*inp);
char*inet_ntoa(struct in_addr in);

inet_addr函数将用点分十进制字符串表示的IPv4地址转化为用网络字节序整数表示的IPv4地址。它失败时返回INADDR_NONE。

inet_aton函数完成和inet_addr同样的功能,但是将转化结果存储于参数inp指向的地址结构中。它成功时返回1,失败则返回0。

inet_ntoa函数将用网络字节序整数表示的IPv4地址转化为用点分十进制字符串表示的IPv4地址。 用一个静态变量存储转化结果,函数的返回值指向该静态内存,因此inet_ntoa是不可重入的。

inet_pton函数将用字符串表示的IP地址src(用点分十进制字符串表示的IPv4地址或用十六进制字符串表示的IPv6地址)转换成用网络字节序整数表示的IP地址,并把转换结果存储于dst指向的内存中 ,成功时返回1 ,失败则返回0

inet_ntop函数进行相反的转换,前三个参数的含义与inet_pton的参数相同,最后一个参数cnt指定目标存储单元的大小。 成功时返回目标存储单元的地址 ,失败则返回NULL

#include<sys/socket.h>
int getsockname(int sockfd,struct
sockaddr*address,socklen_t*address_len);
int getpeername(int sockfd,struct
sockaddr*address,socklen_t*address_len);

getsockname获取sockfd对应的本端socket地址,并将其存储于address参数指定的内存中,该socket地址的长度则存储于address_len参数指向的变量中。 成功时返回0,失败 返回-1

getpeername获取sockfd对应的远端socket地址,其参数及返回值的含义与getsockname的参数及返回值相同

struct hostent*gethostbyname(const char*name);
struct hostent*gethostbyaddr(const void*addr,size_t len,int type);
struct hostent
{
    char*h_name;/*主机名*/
    char**h_aliases;/*主机别名列表,可能有多个*/
    int h_addrtype;/*地址类型(地址族)*/
    int h_length;/*地址长度*/
    char**h_addr_list/*按网络字节序列出的主机IP地址列表*/
};

gethostbyname函数根据主机名称获取主机的完整信息, gethostbyaddr函数根据IP地址获取主机的完整信息。

#include<netdb.h>
struct servent*getservbyname(const char*name,const char*proto);
struct servent*getservbyport(int port,const char*proto);
struct servent
{
    char*s_name;/*服务名称*/
    char**s_aliases;/*服务的别名列表,可能有多个*/
    int s_port;/*端口号*/
    char*s_proto;/*服务类型,通常是tcp或者udp*/
};

getservbyname函数根据名称获取某个服务的完整信息, getservbyport函数根据端口号获取某个服务的完整信息。

例子

#include<sys/socket.h>
#include<netinet/in.h>
#include<netdb.h>
#include<stdio.h>
#include<unistd.h>
#include<assert.h>
int main(int argc,char*argv[]){
    assert(argc==2);
    char*host=argv[1];
    /*获取目标主机地址信息*/
    struct hostent*hostinfo=gethostbyname(host);
    assert(hostinfo);
    /*获取daytime服务信息*/
    struct servent*servinfo=getservbyname("daytime","tcp");
    assert(servinfo);
    printf("daytime port is%d\n",ntohs(servinfo->s_port));
    struct sockaddr_in address;
    address.sin_family=AF_INET;
    address.sin_port=servinfo->s_port;
    /*注意下面的代码,因为h_addr_list本身是使用网络字节序的地址列表,所以使用其
    中的IP地址时,无须对目标IP地址转换字节序*/
    address.sin_addr=*(struct in_addr*)*hostinfo->h_addr_list;
    int sockfd=socket(AF_INET,SOCK_STREAM,0);
    int result=connect(sockfd,(struct sockaddr*)&
    address,sizeof(address));
    assert(result!=-1);
    char buffer[128];
    result=read(sockfd,buffer,sizeof(buffer));
    assert(result>0);
    buffer[result]='\0';
    printf("the day tiem is:%s",buffer);
    close(sockfd);
    return 0;
}

#include<netdb.h>
int getaddrinfo(const char*hostname,const char*service,const
struct addrinfo*hints,struct addrinfo**result);
​
struct addrinfo
{
    int ai_flags;/*见后文*/
    int ai_family;/*地址族*/
    int ai_socktype;/*服务类型,SOCK_STREAM或SOCK_DGRAM*/
    int ai_protocol;/*见后文*/
    socklen_t ai_addrlen;/*socket地址ai_addr的长度*/
    主机的别名char*ai_canonname;/*主机的别名*/
    struct sockaddr*ai_addr;/*指向socket地址*/
    struct addrinfo*ai_next;/*指向下一个sockinfo结构的对象*/
};

getaddrinfo函数既能通过主机名获得IP地址(内部使用的是gethostbyname函数),也能通过服务名获得端口号(内部使用的是getservbyname函数)。

hostname参数可以接收主机名,也可以接收字符串表示的IP地址(IPv4采用点分十进制字符串,IPv6则采用十六进制字符串)。

service参数可以接收服务名,也可以接收字符串表示的十进制端口号。

hints参数是应用程序给getaddrinfo的一个提示,以对getaddrinfo的输出进行更精确的控制。可以被设置为NULL,表示允许getaddrinfo反馈任何可用的结果。使用hints参数的时候,可以设置其ai_flags,ai_family,ai_socktype和ai_protocol四个字段,其他字段则必须被设置为NULL。

result参数指向一个链表,该链表用于存储getaddrinfo反馈的结果。

例子

struct addrinfo hints
struct addrinfo*res;
bzero(&hints,sizeof(hints));
hints.ai_socktype=SOCK_STREAM;
getaddrinfo("ernest-laptop","daytime",&hints,&res);

getaddrinfo调用结束后,我们必须使用如下配对函数来释放这块内存:

void freeaddrinfo(struct addrinfo*res);

#include<netdb.h>
int getnameinfo(const struct sockaddr*sockaddr,socklen_t
addrlen,char*host,socklen_t hostlen,char*serv,socklen_t servlen,int
flags);

getnameinfo函数能通过socket地址同时获得以字符串表示的主机名(内部使用的是gethostbyaddr函数)和服务名(内部使用的是getservbyport函数)。

第6章 高级I/O函数

#include<unistd.h>
int pipe(int fd[2]);

pipe函数可用于创建一个管道,以实现进程间通信。 成功时返回0 ,失败返回-1

fd[0]和fd[1]分别构成管道的两端,往fd[1]写入的数据可以从fd[0]读出。 并且,fd[0]只能用于从管道读出数据,fd[1]则只能用于往管道写入数据。 这一对文件描述符都是阻塞的,直到管道内有数据可读;往一个满的管道(见后文)中写入数据,则write亦将被阻塞,直到管道有足够多的空闲空间可用。管道本身拥有一个容量限制,它规定如果应用程序不将数据从管道读走的话,该管道最多能被写入多少字节的数据。管道容量的大小默认是65536字节。我们可以使用fcntl函数来修改管道容量

#include<sys/types.h>
#include<sys/socket.h>
int socketpair(int domain,int type,int protocol,int fd[2]);

它能够方便地创建双向管道。

  • domain表示协议族,PF_UNIX或者AF_UNIX

  • type表示协议,可以是SOCK_STREAM或者SOCK_DGRAM,SOCK_STREAM基于TCP,SOCK_DGRAM基于UDP

  • protocol表示类型,只能为0

  • sv[2]表示套节字柄对,该两个句柄作用相同,均能进行读写双向操作

  • 返回结果, 0为创建成功,-1为创建失败

#include<unistd.h>
int dup(int file_descriptor);
int dup2(int file_descriptor_one,int file_descriptor_two);

有时我们希望把标准输入重定向到一个文件,或者把标准输出重定向到一个网络连接(比如CGI编程)。这可以通过用于复制文件描述符的dup或dup2函数来实现

dup函数创建一个新的文件描述符,该新文件描述符和原有文件描述符file_descriptor指向相同的文件、管道或者网络连接。并且dup返回的文件描述符总是取系统当前可用的最小整数值。dup2和dup类似,不过它将返回第一个不小于file_descriptor_two的整数值。dup和dup2系统调用失败时返回-1并设置errno。

例子 CGI服务器原理

#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
int main(int argc,char*argv[])
{
    if(argc<=2)
    {
    printf("usage:%s ip_address port_number\n",basename(argv[0]));
    return 1;
    }
    const char*ip=argv[1];
    int port=atoi(argv[2]);
    struct sockaddr_in address;
    bzero(&address,sizeof(address));
    address.sin_family=AF_INET;
    inet_pton(AF_INET,ip,&address.sin_addr);
    address.sin_port=htons(port);
    int sock=socket(PF_INET,SOCK_STREAM,0);
    assert(sock>=0);
    int ret=bind(sock,(struct sockaddr*)&address,sizeof(address));
    assert(ret!=-1);
    ret=listen(sock,5);
    assert(ret!=-1);
    struct sockaddr_in client;
    socklen_t client_addrlength=sizeof(client);
    int connfd=accept(sock,(struct sockaddr*)&client,&
    client_addrlength);
    if(connfd<0)
    {
    printf("errno is:%d\n",errno);
    }
    else
    {
    close(STDOUT_FILENO);//关闭标准输出文件描述符STDOUT_FILENO(其值是1)
    dup(connfd);
    printf("abcd\n");
    close(connfd);
    }
    close(sock);
    return 0;
}

先关闭标准输出文件描述符STDOUT_FILENO(其值是1),然后复制socket文件描述符connfd。因为dup总是返回系统中最小的可用文件描述符,所以它的返回值实际上是1,即之前关闭的标准输出文件描述符的值。这样一来,服务器输出到标准输出的内容(这里是“abcd”)就会直接发送到与客户连接对应的socket上,因此printf调用的输出将被客户端获得(而不是显示在服务器程序的终端上)。这就是CGI服务器的基本工作原理。


#include<sys/sendfile.h>
ssize_t sendfile(int out_fd,int in_fd,off_t*offset,size_t count);

sendfile函数在两个文件描述符之间直接传递数据(完全在内核中操作),从而避免了内核缓冲区和用户缓冲区之间的数据拷贝,效率很高,这被称为零拷贝。

in_fd参数是待读出内容的文件描述符,

out_fd参数是待写入内容的文件描述符。

offset参数指定从读入文件流的哪个位置开始读,如果为空,则使用读入文件流默认的起始位置。

count参数指定在文件描述符in_fd和out_fd之间传输的字节数。

成功时返回传输的字节数,失败则返回-1并设置errno。

in_fd必须是一个支持类似mmap函数的文件描述符,即它必须指向真实的文件,不能是socket和管道;而out_fd则必须是一个socket。

#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<sys/sendfile.h>
int main(int argc,char*argv[])
{
    if(argc<=3)
    {
        printf("usage:%s ip_address port_number
        filename\n",basename(argv[0]));
        return 1;
    }
    const char*ip=argv[1];
    int port=atoi(argv[2]);
    const char*file_name=argv[3];
    int filefd=open(file_name,O_RDONLY);
    assert(filefd>0);
    struct stat stat_buf;
    fstat(filefd,&stat_buf);
    struct sockaddr_in address;
    bzero(&address,sizeof(address));
    address.sin_family=AF_INET;
    inet_pton(AF_INET,ip,&address.sin_addr);
    address.sin_port=htons(port);
    int sock=socket(PF_INET,SOCK_STREAM,0);
    assert(sock>=0);
    int ret=bind(sock,(struct sockaddr*)&address,sizeof(address));
    assert(ret!=-1);
    ret=listen(sock,5);
    assert(ret!=-1);
    struct sockaddr_in client;
    socklen_t client_addrlength=sizeof(client);
    int connfd=accept(sock,(struct sockaddr*)&client,&
    client_addrlength);
    if(connfd<0)
    {
        printf("errno is:%d\n",errno);
    }
    else
    {
        sendfile(connfd,filefd,NULL,stat_buf.st_size);
        close(connfd);
    }
    close(sock);
    return 0;
}

#include<sys/mman.h>
void*mmap(void*start,size_t length,int prot,int flags,int fd,off_t offset);
int munmap(void*start,size_t length);//成功时返回0,失败则返回-1

mmap函数用于申请一段内存空间。我们可以将这段内存作为进程间通信的共享内存,也可以将文件直接映射到其中。munmap函数则释放由mmap创建的这段内存空间。

start参数允许用户使用某个特定的地址作为这段内存的起始地址。如果它被设置成NULL,则系统自动分配一个地址。

length参数指定内存段的长度。

prot参数用来设置内存段的访问权限。它可以取以下几个值的按位或:

❑PROT_READ,内存段可读。 ❑PROT_WRITE,内存段可写。 ❑PROT_EXEC,内存段可执行。 ❑PROT_NONE,内存段不能被访问。

flags参数控制内存段内容被修改后程序的行为。

fd参数是被映射文件对应的文件描述符。它一般通过open系统调用获得。

offset参数设置从文件的何处开始映射

成功时返回指向目标内存区域的指针,失败则返回MAP_FAILED((void*)-1)并设置errno


#include<fcntl.h>
ssize_t splice(int fd_in,loff_t*off_in,int fd_out,loff_t*off_out,size_t len,unsigned int flags);

splice函数用于在两个文件描述符之间移动数据,也是零拷贝操作。

fd_in参数是待输入数据的文件描述符。如果fd_in是一个管道文件描述符,那么off_in参数必须被设置为NULL。如果fd_in不是一个管道文件描述符(比如socket),那么off_in表示从输入数据流的何处开始读取数据。此时,若off_in被设置为NULL,则表示从输入数据流的当前偏移位置读入;若off_in不为NULL,则它将指出具体的偏移位置。

fd_out/off_out参数的含义与fd_in/off_in相同,不过用于输出数据流

fd_in和fd_out必须至少有一个是管道文件描述符

成功时返回移动字节的数量 失败时返回-1

#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
int main(int argc,char*argv[])
{
    if(argc<=2)
    {
        printf("usage:%s ip_address port_number\n",basename(argv[0]));
        return 1;
    }
    const char*ip=argv[1];
    int port=atoi(argv[2]);
    struct sockaddr_in address;
    bzero(&address,sizeof(address));
    address.sin_family=AF_INET;
    inet_pton(AF_INET,ip,&address.sin_addr);
    address.sin_port=htons(port);
    int sock=socket(PF_INET,SOCK_STREAM,0);
    assert(sock>=0);
    int ret=bind(sock,(struct sockaddr*)&address,sizeof(address));
    assert(ret!=-1);
    ret=listen(sock,5);
    assert(ret!=-1);
    struct sockaddr_in client;
    socklen_t client_addrlength=sizeof(client);
    int connfd=accept(sock,(struct sockaddr*)&client,&
    client_addrlength);
    if(connfd<0)
    {
        printf("errno is:%d\n",errno);
    }
    else
    {
        int pipefd[2];
        assert(ret!=-1);
        ret=pipe(pipefd);/*创建管道*/
        /*将connfd上流入的客户数据定向到管道中*/
        ret=splice(connfd,NULL,pipefd[1],NULL,32768,SPLICE_F_MORE|SPLICE_
        F_MOVE);
        assert(ret!=-1);
        /*将管道的输出定向到connfd客户连接文件描述符*/
        ret=splice(pipefd[0],NULL,connfd,NULL,32768,SPLICE_F_MORE|SPLICE_
        F_MOVE);
        assert(ret!=-1);
        close(connfd);
    }
    close(sock);
    return 0;
}

将客户端的内容读入到pipefd[1]中,然后再使用splice函数从pipefd[0]中读出该内容到客户端,从而实现了简单高效的回射服务。整个过程未执行recv/send操作,因此也未涉及用户空间和内核空间之间的数据拷贝。


#include<fcntl.h>
ssize_t tee(int fd_in,int fd_out,size_t len,unsigned int flags);

在两个管道文件描述符之间复制数据,也是零拷贝操作

该函数的参数的含义与splice相同(但fd_in和fd_out必须都是管道文件描述符)。tee函数成功时返回在两个文件描述符之间复制的数据数量(字节数),失败时返回-1

#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
int main(int argc,char*argv[])
{   if(argc!=2)
    {
        printf("usage:%s<file>\n",argv[0]);
        return 1;
    }
    int filefd=open(argv[1],O_CREAT|O_WRONLY|O_TRUNC,0666);
    assert(filefd>0);
    int pipefd_stdout[2];
    int ret=pipe(pipefd_stdout);
    assert(ret!=-1);
    int pipefd_file[2];
    ret=pipe(pipefd_file);
    assert(ret!=-1);
    /*将标准输入内容输入管道pipefd_stdout*/
    ret=splice(STDIN_FILENO,NULL,pipefd_stdout[1],NULL,32768,SPLICE_
    F_MORE|SPLICE_F_MOVE);
    assert(ret!=-1);
    /*将管道pipefd_stdout的输出复制到管道pipefd_file的输入端*/
    ret=tee(pipefd_stdout[0],pipefd_file[1],32768,SPLICE_F_NONBLOCK)
    ;
    assert(ret!=-1);
    /*将管道pipefd_file的输出定向到文件描述符filefd上,从而将标准输入的内容写
    入文件*/
    ret=splice(pipefd_file[0],NULL,filefd,NULL,32768,SPLICE_F_MORE|S
    PLICE_F_MOVE);
    assert(ret!=-1);
    /*将管道pipefd_stdout的输出定向到标准输出,其内容和写入文件的内容完全一致
    */
    ret=splice(pipefd_stdout[0],NULL,STDOUT_FILENO,NULL,32768,SPLICE
    _F_MORE|SPLICE_F_MOVE);
    assert(ret!=-1);
    close(filefd);
    close(pipefd_stdout[0]);
    close(pipefd_stdout[1]);
    close(pipefd_file[0]);
    close(pipefd_file[1]);
    return 0;
}

#include<fcntl.h>
int fcntl(int fd,int cmd,…);

对文件描述符的各种控制操作

在网络编程中,fcntl函数通常用来将一个文件描述符设置为非阻塞的

int setnonblocking(int fd)
{
    int old_option=fcntl(fd,F_GETFL);/*获取文件描述符旧的状态标志*/
    int new_option=old_option|O_NONBLOCK;/*设置非阻塞标志*/
    fcntl(fd,F_SETFL,new_option);
    return old_option;/*返回文件描述符旧的状态标志,以便*/
    /*日后恢复该状态标志*/
}

第7章 Linux服务器程序规范

后台进程又称守护进程

它没有控制终端,因而也不会意外接收到用户输入。守护进程的父进程通常是init进程

日志

服务器的调试和维护都需要一个专业的日志系统 ,Linux提供一个守护进程来处理系统日志——syslogd ,升级版——rsyslogd。

用户进程是通过调用syslog函数生成系统日志的,将日志输出到一个UNIX本地域socket类型(AF_UNIX)的文件/dev/log中,rsyslogd则监听该文件以获取用户进程的输出 ,rsyslogd守护进程在接收到用户进程或内核输入的日志后,会把它们输出至某些特定的日志文件 ,调试信息会保存至/var/log/debug文件,普通信息保存至/var/log/messages文件,内核消息则保存至/var/log/kern.log文件

用户信息

当前进程的真实用户ID(UID)、有效用户ID(EUID)、真实组ID(GID)和有效组ID(EGID)

#include<sys/types.h>
#include<unistd.h>
uid_t getuid();/*获取真实用户ID*/
uid_t geteuid();/*获取有效用户ID*/
gid_t getgid();/*获取真实组ID*/
gid_t getegid();/*获取有效组ID*/
int setuid(uid_t uid);/*设置真实用户ID*/
int seteuid(uid_t uid);/*设置有效用户ID*/
int setgid(gid_t gid);/*设置真实组ID*/
int setegid(gid_t gid);/*设置有效组ID*/

一个进程拥有两个用户ID:UID和EUID。EUID存在的目的是方便资源访问:它使得运行程序的用户拥有该程序的有效用户的权限 ,比如su程序

进程组

进程PID ,进程组PGID

#include<unistd.h>
pid_t getpgid(pid_t pid);
int setpgid(pid_t pid,pid_t pgid);//将PID为pid的进程的PGID设置为pgid

每个进程组都有一个首领进程,其PGID和PID相同。

会话

有关联的进程组将形成一个会话

#include<unistd.h>
pid_t setsid(void);//创建一个会话,成功时返回新的进程组的PGID
pid_t getsid(pid_t pid);

该函数不能由进程组的首领进程调用 ,对于非组首领的进程,调用该函数不仅创建新会话,而且有如下 :

❑调用进程成为会话的首领,此时该进程是新会话的唯一成员。 ❑新建一个进程组,其PGID就是调用进程的PID,调用进程成为该组的首领。 ❑调用进程将甩开终端(如果有的话)。

第8章 高性能服务器程序框架

服务器模型

C/S模型

所有客户端都通过访问服务器来获取所需的资源

 

P2P模型

让网络上所有主机重新回归对等的地位 ,每台机器在消耗服务的同时也给别人提供服务

发现服务器:提供查找服务,使每个客户都能尽快地找到自己需要的资源

 

I/O模型

阻塞和非阻塞的概念能应用于所有文件描述符 ,阻塞的文件描述符为阻塞I/O,称非阻塞的文件描述符为非阻塞I/O。

阻塞I/O执行的系统调用可能因为无法立即完成而被操作系统挂起,直到等待的事件发生为止 ,可能被阻塞的系统调用包括accept、send、recv和connect。

非阻塞I/O执行的系统调用则总是立即返回,而不管事件是否已经发生,事件没有立即发生,这些系统调用就返回-1,对accept、send和recv而言,事件未发生时errno通常被设置成EAGAIN(意为“再来一次”)或者EWOULDBLOCK(意为“期望阻塞”);对connect而言,errno则被设置成EINPROGRESS(意为“在处理中”)。

非阻塞I/O通常要和I/O复用和SIGIO信号一起使用

I/O复用是最常使用的I/O通知机制,应用程序通过I/O复用函数向内核注册一组事件,内核通过I/O复用函数把其中就绪的事件通知给应用程序 ,I/O复用函数本身是阻塞的,它们能提高程序效率的原因在于它们具有同时监听多个I/O事件的能力。

阻塞I/O、I/O复用和信号驱动I/O都是同步I/O模型。因为在这三种I/O模型中,I/O的读写操作,都是在I/O事件发生之后,由应用程序来完成的。

异步I/O而言,用户可以直接对I/O执行读写操作,这些操作告诉内核用户读写缓冲区的位置,以及I/O操作完成之后内核通知应用程序的方式。异步I/O的读写操作总是立即返回,而不论I/O是否是阻塞的,因为真正的读写操作已经由内核接管

同步I/O模型要求用户代码自行执行I/O操作(将数据从内核缓冲区读入用户缓冲区,或将数据从用户缓冲区写入内核缓冲区),而异步I/O机制则由内核来执行I/O操作(数据在内核缓冲区和用户缓冲区之间的移动是由内核在“后台”完成的)。 你可以这样认为,同步I/O向应用程序通知的是I/O就绪事件,而异步I/O向应用程序通知的是I/O完成事件

 

两种高效的事件处理模式

同步I/O模型通常用于实现Reactor模式,异步I/O模型则用于实现Proactor模式。

Reactor模式

它要求主线程(I/O处理单元,下同)只负责监听文件描述上是否有事件发生,有的话就立即将该事件通知工作线程(逻辑单元,下同)。除此之外,主线程不做任何其他实质性的工作

使用同步I/O模型(以epoll_wait为例)实现的Reactor模式的工作流程是:

1)主线程往epoll内核事件表中注册socket上的读就绪事件。 2)主线程调用epoll_wait等待socket上有数据可读。 3)当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列。 4)睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件。 5)主线程调用epoll_wait等待socket可写。 6)当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列。 7)睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果。

Proactor模式

将所有I/O操作都交给主线程和内核来处理,工作线程仅仅负责业务逻辑

使用异步I/O模型(以aio_read和aio_write为例)实现的Proactor模式的工作流程是:

1)主线程调用aio_read函数向内核注册socket上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例,详情请参考sigevent的man手册)。 2)主线程继续处理其他逻辑。 3)当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用。 4)应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(仍然以信号为例)。 5)主线程继续处理其他逻辑。 6)当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕。 7)应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket。

两种高效的并发模式

并发模式是指I/O处理单元(处理客户连接)和多个逻辑单元(进程、线程)之间协调完成任务的方法。

半同步/半异步模式

在I/O模型中,“同步”和“异步”区分的是内核向应用程序通知的是何种I/O事件(是就绪事件还是完成事件),以及该由谁来完成I/O读写(是应用程序还是内核)。在并发模式中,“同步”指的是程序完全按照代码序列的顺序执行;“异步”指的是程序的执行需要由系统事件来驱动。常见的系统事件包括中断、信号等。

半同步/半异步模式中,同步线程用于处理客户逻辑,相当于逻辑单元;异步线程用于处理I/O事件,相当于I/O处理单元。异步线程监听到客户请求后,就将其封装成请求对象并插入请求队列中。请求队列将通知某个工作在同步模式的工作线程来读取并处理该请求对象。

领导者/追随者模式

多个工作线程轮流获得事件源集合,轮流监听、分发并处理事件的一种模式

当前的领导者如果检测到I/O事件,首先要从线程池中推选出新的领导者线程,然后处理I/O事件。此时,新的领导者等待新的I/O事件,而原来的领导者则处理I/O事件,二者实现了并发。

领导者/追随者模式包含如下几个组件:句柄集、线程集、事件处理器 和 具体的事件处理器

池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源分配。当服务器进入正式运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无须动态分配。

内存池通常用于socket的接收缓存和发送缓存

进程池 :可以直接从进程池或线程池中取得一个执行实体,而无须动态地调用fork或pthread_create等函数来创建进程和线程

连接池是服务器预先和数据库程序建立的一组连接的集合。当某个逻辑单元需要访问数据库时,它可以直接从连接池中取得一个连接的实体并使用之。待完成数据库的访问之后,逻辑 单元再将该连接返还给连接池。

数据复制

避免不必要的数据复制,如果内核可以直接处理从socket或者文件读入的数据,则应用程序就没必要将这些数据从内核缓冲区复制到应用程序缓冲区中,而是可以使用“零拷贝”函数。

上下文切换和锁

并发程序必须考虑上下文切换,即进程切换或线程切换导致的系统开销问题(占用大量的CPU时间)。

共享资源的加锁保护,锁引入的代码不仅不处理任何业务逻辑,而且需要访问内核资源

第9章 I/O复用

同时监听多个文件描述符

select

在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件,select能处理的异常情况只有一种:socket上接收到带外数据。

#include<sys/select.h>
int select(int nfds,fd_set*readfds,fd_set*writefds,fd_set*exceptfds,struct timeval*timeout);
​
FD_ZERO(fd_set*fdset);/*清除fdset的所有位*/
FD_SET(int fd,fd_set*fdset);/*设置fdset的位fd*/
FD_CLR(int fd,fd_set*fdset);/*清除fdset的位fd*/
int FD_ISSET(int fd,fd_set*fdset);/*测试fdset的位fd是否被设置*/
​
struct timeval
{
long tv_sec;/*秒数*/
long tv_usec;/*微秒数*/
};//timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。

nfds参数指定被监听的文件描述符的总数。它通常被设置为select监听的所有文件描述符中的最大值加1

readfds、writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合

select成功时返回就绪(可读、可写和异常)文件描述符的总数,内核将修改readfds、writefds和exceptfds参数来通知应用程序哪些文件描述符已经就绪

timeout参数用来设置select函数的超时时间

#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
int main(int argc,char*argv[])
{
    if(argc<=2)
    {
        printf("usage:%s ip_address port_number\n",basename(argv[0]));
        return 1;
    }
    const char*ip=argv[1];
    int port=atoi(argv[2]);
    int ret=0;
    struct sockaddr_in address;
    bzero(&address,sizeof(address));
    address.sin_family=AF_INET;
    inet_pton(AF_INET,ip,&address.sin_addr);
    address.sin_port=htons(port);
    int listenfd=socket(PF_INET,SOCK_STREAM,0);
    assert(listenfd>=0);
    ret=bind(listenfd,(struct sockaddr*)&address,sizeof(address));
    assert(ret!=-1);
    ret=listen(listenfd,5);
    assert(ret!=-1);
    struct sockaddr_in client_address;
    socklen_t client_addrlength=sizeof(client_address);
    int connfd=accept(listenfd,(struct sockaddr*)&client_address,&
    client_addrlength);
    if(connfd<0)
    {
        printf("errno is:%d\n",errno);
        close(listenfd);
    }
    char buf[1024];
    fd_set read_fds;
    fd_set exception_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&exception_fds);
    while(1)
    {
        memset(buf,'\0',sizeof(buf));
        /*每次调用select前都要重新在read_fds和exception_fds中设置文件描述符
        connfd,因为事件发生之后,文件描述符集合将被内核修改*/
        FD_SET(connfd,&read_fds);
        FD_SET(connfd,&exception_fds);
        ret=select(connfd+1,&read_fds,NULL,&exception_fds,NULL);
        if(ret<0)
        {
            printf("selection failure\n");
            break;
        }
        /*对于可读事件,采用普通的recv函数读取数据*/
        if(FD_ISSET(connfd,&read_fds))
        {
            ret=recv(connfd,buf,sizeof(buf)-1,0);
            if(ret<=0){
            break;
        }
        printf("get%d bytes of normal data:%s\n",ret,buf);
        }
        /*对于异常事件,采用带MSG_OOB标志的recv函数读取带外数据*/
        else if(FD_ISSET(connfd,&exception_fds))
        {
            ret=recv(connfd,buf,sizeof(buf)-1,MSG_OOB);
            if(ret<=0)
            {
                break;
            }
            printf("get%d bytes of oob data:%s\n",ret,buf);
        }
    }
    close(connfd);
    close(listenfd);
    return 0;
}

poll

和select类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。

#include<poll.h>
int poll(struct pollfd*fds,nfds_t nfds,int timeout);
​
struct pollfd
{
    int fd;/*文件描述符*/
    short events;/*注册的事件*/
    short revents;/*实际发生的事件,由内核填充*/
};

fds参数是一个pollfd结构类型的数组,它指定所有我们感兴趣的文件描述符上发生的可读、可写和异常等事件

 

nfds参数指定被监听事件集合fds的大小。

epoll

epoll使用一组函数来完成任务,而不是单个函数。

epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集

epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。这个文件描述符使用如下epoll_create函数来创建:

#include<sys/epoll.h>
int epoll_create(int size)//size告诉它事件表需要多大。

下面的函数用来操作epoll的内核事件表:

#include<sys/epoll.h>
int epoll_ctl(int epfd,int op,int fd,struct epoll_event*event)

fd参数是要操作的文件描述符,op参数则指定操作类型。操作类型有如下3种:

❑EPOLL_CTL_ADD,往事件表中注册fd上的事件。 ❑EPOLL_CTL_MOD,修改fd上的注册事件。 ❑EPOLL_CTL_DEL,删除fd上的注册事件

event参数指定事件

struct epoll_event
{
    __uint32_t events;/*epoll事件,和poll对应的宏前加上“E”,如EPOLLIN*/
    epoll_data_t data;/*用户数据*/
};  
​
typedef union epoll_data
{
    void*ptr;//指定与fd相关的用户数据
    int fd;//指定事件所从属的目标文件描述符
    uint32_t u32;
    uint64_t u64;
}epoll_data_t;

epoll_wait

epoll系列系统调用的主要接口是epoll_wait函数,该函数成功时返回就绪的文件描述符的个数

#include<sys/epoll.h>
int epoll_wait(int epfd,struct epoll_event*events,int
maxevents,int timeout);//maxevents参数指定最多监听多少个事件

epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中

poll和epoll在使用上的差别

/*如何索引poll返回的就绪文件描述符*/
int ret=poll(fds,MAX_EVENT_NUMBER,-1);
/*必须遍历所有已注册文件描述符并找到其中的就绪者(当然,可以利用ret来稍做优化)*/
for(int i=0;i<MAX_EVENT_NUMBER;++i)
{
    if(fds[i].revents&POLLIN)/*判断第i个文件描述符是否就绪*/
    {
        int sockfd=fds[i].fd;
        /*处理sockfd*/
    }
}
/*如何索引epoll返回的就绪文件描述符*/
int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
/*仅遍历就绪的ret个文件描述符*/
for(int i=0;i<ret;i++)
{
    int sockfd=events[i].data.fd;
    /*sockfd肯定就绪,直接处理*/
}

LT和ET模式

epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。

对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。

采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用 程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。

ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
​
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
/*将文件描述符设置成非阻塞的*/
int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}
/*将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,参数enable_et指定是否对fd启用ET模式*/
void addfd( int epollfd, int fd, bool enable_et )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if( enable_et )
    {
        event.events |= EPOLLET;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}
/*LT模式的工作流程*/
void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, false );/*对connfd禁用ET模式*/
        }
        else if ( events[i].events & EPOLLIN )
        {
            /*只要socket读缓存中还有未读出的数据,这段代码就被触发*/
            printf( "event trigger once\n" );
            memset( buf, '\0', BUFFER_SIZE );
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret <= 0 )
            {
                close( sockfd );
                continue;
            }
            printf( "get %d bytes of content: %s\n", ret, buf );
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}
/*ET模式的工作流程*/
void et( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
/*这段代码不会被重复触发,所以我们循环读取数据,以确保把socket读缓存中的所有数据读出*/
            printf( "event trigger once\n" );
            while( 1 )
            {
                memset( buf, '\0', BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                /*对于非阻塞IO,下面的条件成立表示数据已经全部读取完毕。此后,epoll就能再次触发sockfd上的EPOLLIN事件,以驱动下一次读操作*/
                    if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later\n" );
                        break;
                    }
                    close( sockfd );
                    break;
                }
                else if( ret == 0 )
                {
                    close( sockfd );
                }
                else
                {
                    printf( "get %d bytes of content: %s\n", ret, buf );
                }
            }
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}
​
int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );
​
    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
​
    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );
​
    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );
​
    ret = listen( listenfd, 5 );
    assert( ret != -1 );
​
    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );
​
    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
    
        lt( events, ret, epollfd, listenfd );/*使用LT模式*/
        //et( events, ret, epollfd, listenfd );/*使用ET模式*/
    }
​
    close( listenfd );
    return 0;
}
​

EPOLLONESHOT事件

使用ET模式,一个socket上的某个事件还是可能被触发多次,可能出现了两个线程同时操作一个socket的局面。我们期望的是一个socket连接在任一时刻都只被一个线程处理。使用epoll的EPOLLONESHOT事件实现。

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
​
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds
{
   int epollfd;
   int sockfd;
};
​
int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}
/*将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示的epoll内核事件表中,参
数oneshot指定是否注册fd上的EPOLLONESHOT事件*/
void addfd( int epollfd, int fd, bool oneshot )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if( oneshot )
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}
/*重置fd上的事件。这样操作之后,尽管fd上的EPOLLONESHOT事件被注册,但是操
作系统仍然会触发fd上的EPOLLIN事件,且只触发一次*/
void reset_oneshot( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}
/*工作线程*/
void* worker( void* arg )
{
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, '\0', BUFFER_SIZE );
    /*循环读取sockfd上的数据,直到遇到EAGAIN错误*/
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 )
        {
            if( errno == EAGAIN )
            {
                reset_oneshot( epollfd, sockfd );
                printf( "read later\n" );
                break;
            }
        }
        else
        {
            printf( "get content: %s\n", buf );
            sleep( 5 );/*休眠5s,模拟数据处理过程*/
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
}
​
int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );
​
    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
​
    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );
​
    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );
​
    ret = listen( listenfd, 5 );
    assert( ret != -1 );
​
    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    /*注意,监听socket listenfd上是不能注册EPOLLONESHOT事件的,否则应用程序
只能处理一个客户连接!因为后续的客户连接请求将不再触发listenfd上的EPOLLIN事件*/
    addfd( epollfd, listenfd, false );
​
    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
    
        for ( int i = 0; i < ret; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                /*对每个非监听文件描述符都注册EPOLLONESHOT事件*/
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN )
            {
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                /*新启动一个工作线程为sockfd服务*/
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }
​
    close( listenfd );
    return 0;
}

三组I/O复用函数的比较

 

 

I/O复用的高级应用二:聊天室程序

聊天室客户端程序

#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <poll.h>
#include <fcntl.h>
​
#define BUFFER_SIZE 64
​
int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );
​
    struct sockaddr_in server_address;
    bzero( &server_address, sizeof( server_address ) );
    server_address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &server_address.sin_addr );
    server_address.sin_port = htons( port );
​
    int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( sockfd >= 0 );
    if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
    {
        printf( "connection failed\n" );
        close( sockfd );
        return 1;
    }
​
    pollfd fds[2];
    /*注册文件描述符0(标准输入)和文件描述符sockfd上的可读事件*/
    fds[0].fd = 0;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = sockfd;
    fds[1].events = POLLIN | POLLRDHUP;
    fds[1].revents = 0;
    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret = pipe( pipefd );
    assert( ret != -1 );
​
    while( 1 )
    {
        ret = poll( fds, 2, -1 );
        if( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }
​
        if( fds[1].revents & POLLRDHUP )
        {
            printf( "server close the connection\n" );
            break;
        }
        else if( fds[1].revents & POLLIN )
        {
            memset( read_buf, '\0', BUFFER_SIZE );
            recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
            printf( "%s\n", read_buf );
        }
​
        if( fds[0].revents & POLLIN )
        {
            /*使用splice将用户输入的数据直接写到sockfd上(零拷贝)*/
            ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
            ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
        }
    }
    
    close( sockfd );
    return 0;
}
​

服务器

#define _GNU_SOURCE 1
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <poll.h>
​
#define USER_LIMIT 5/*最大用户数量*/
#define BUFFER_SIZE 64/*读缓冲区的大小*/
#define FD_LIMIT 65535/*文件描述符数量限制*/
/*客户数据:客户端socket地址、待写到客户端的数据的位置、从客户端读入的数据*/
struct client_data
{
    sockaddr_in address;
    char* write_buf;
    char buf[ BUFFER_SIZE ];
};
​
int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}
​
int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );
​
    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
​
    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );
​
    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );
​
    ret = listen( listenfd, 5 );
    assert( ret != -1 );
    /*创建users数组,分配FD_LIMIT个client_data对象。可以预期:每个可能的
socket连接都可以获得一个这样的对象,并且socket的值可以直接用来索引(作为数组的
下标)socket连接对应的client_data对象,这是将socket和客户数据关联的简单而高
效的方式*/
    client_data* users = new client_data[FD_LIMIT];
    /*尽管我们分配了足够多的client_data对象,但为了提高poll的性能,仍然有必要
限制用户的数量*/
    pollfd fds[USER_LIMIT+1];
    int user_counter = 0;
    for( int i = 1; i <= USER_LIMIT; ++i )
    {
        fds[i].fd = -1;
        fds[i].events = 0;
    }
    fds[0].fd = listenfd;
    fds[0].events = POLLIN | POLLERR;
    fds[0].revents = 0;
​
    while( 1 )
    {
        ret = poll( fds, user_counter+1, -1 );
        if ( ret < 0 )
        {
            printf( "poll failure\n" );
            break;
        }
    
        for( int i = 0; i < user_counter+1; ++i )
        {
            if( ( fds[i].fd == listenfd ) && ( fds[i].revents & POLLIN ) )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d\n", errno );
                    continue;
                }
                /*如果请求太多,则关闭新到的连接*/
                if( user_counter >= USER_LIMIT )
                {
                    const char* info = "too many users\n";
                    printf( "%s", info );
                    send( connfd, info, strlen( info ), 0 );
                    close( connfd );
                    continue;
                }
                /*对于新的连接,同时修改fds和users数组。前文已经提到,                         users[connfd]对应于新连接文件描述符connfd的客户数据*/
                user_counter++;
                users[connfd].address = client_address;
                setnonblocking( connfd );
                fds[user_counter].fd = connfd;
                fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
                fds[user_counter].revents = 0;
                printf( "comes a new user, now have %d users\n", user_counter );
            }
            else if( fds[i].revents & POLLERR )
            {
                printf( "get an error from %d\n", fds[i].fd );
                char errors[ 100 ];
                memset( errors, '\0', 100 );
                socklen_t length = sizeof( errors );
                if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )
                {
                    printf( "get socket option failed\n" );
                }
                continue;
            }
            else if( fds[i].revents & POLLRDHUP )
            {
                /*如果客户端关闭连接,则服务器也关闭对应的连接,并将用户总数减1*/
                users[fds[i].fd] = users[fds[user_counter].fd];
                close( fds[i].fd );
                fds[i] = fds[user_counter];
                i--;
                user_counter--;
                printf( "a client left\n" );
            }
            else if( fds[i].revents & POLLIN )
            {
                int connfd = fds[i].fd;
                memset( users[connfd].buf, '\0', BUFFER_SIZE );
                ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 );
                printf( "get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd );
                if( ret < 0 )
                {
                    /*如果读操作出错,则关闭连接*/
                    if( errno != EAGAIN )
                    {
                        close( connfd );
                        users[fds[i].fd] = users[fds[user_counter].fd];
                        fds[i] = fds[user_counter];
                        i--;
                        user_counter--;
                    }
                }
                else if( ret == 0 )
                {
                    printf( "code should not come to here\n" );
                }
                else
                {
                    /*如果接收到客户数据,则通知其他socket连接准备写数据*/
                    for( int j = 1; j <= user_counter; ++j )
                    {
                        if( fds[j].fd == connfd )
                        {
                            continue;
                        }
                        
                        fds[j].events |= ~POLLIN;
                        fds[j].events |= POLLOUT;
                        users[fds[j].fd].write_buf = users[connfd].buf;
                    }
                }
            }
            else if( fds[i].revents & POLLOUT )
            {
                int connfd = fds[i].fd;
                if( ! users[connfd].write_buf )
                {
                    continue;
                }
                ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 );
                users[connfd].write_buf = NULL;
                /*写完数据后需要重新注册fds[i]上的可读事件*/
                fds[i].events |= ~POLLOUT;
                fds[i].events |= POLLIN;
            }
        }
    }
​
    delete [] users;
    close( listenfd );
    return 0;
}
​

同时处理TCP和UDP服务

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
​
#define MAX_EVENT_NUMBER 1024
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024
​
int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}
​
void addfd( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    //event.events = EPOLLIN | EPOLLET;
    event.events = EPOLLIN;
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}
​
int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );
​
    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
    /*创建TCP socket,并将其绑定到端口port上*/
    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );
​
    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );
​
    ret = listen( listenfd, 5 );
    assert( ret != -1 );
    /*创建UDP socket,并将其绑定到端口port上*/
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );
    int udpfd = socket( PF_INET, SOCK_DGRAM, 0 );
    assert( udpfd >= 0 );
​
    ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );
​
    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    /*注册TCP socket和UDP socket上的可读事件*/
    addfd( epollfd, listenfd );
    addfd( epollfd, udpfd );
​
    while( 1 )
    {
        int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( number < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }
    
        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd );
            }
            else if ( sockfd == udpfd )
            {
                char buf[ UDP_BUFFER_SIZE ];
                memset( buf, '\0', UDP_BUFFER_SIZE );
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
​
                ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength );
                if( ret > 0 )
                {
                    sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength );
                }
            }
            else if ( events[i].events & EPOLLIN )
            {
                char buf[ TCP_BUFFER_SIZE ];
                while( 1 )
                {
                    memset( buf, '\0', TCP_BUFFER_SIZE );
                    ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 );
                    if( ret < 0 )
                    {
                        if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                        {
                            break;
                        }
                        close( sockfd );
                        break;
                    }
                    else if( ret == 0 )
                    {
                        close( sockfd );
                    }
                    else
                    {
                        send( sockfd, buf, ret, 0 );
                    }
                }
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }
​
    close( listenfd );
    return 0;
}
​

第10章 信号

sigaction函数

#include <signal.h>
int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
  • signum表示操作的信号。

  • act表示对信号设置新的处理方式。

  • oldact表示信号原来的处理方式。

  • 返回值,0 表示成功,-1 表示有错误发生

struct sigaction {
    void (*sa_handler)(int);
    void (*sa_sigaction)(int, siginfo_t *, void *);
    sigset_t sa_mask;
    int sa_flags;
    void (*sa_restorer)(void);
}
  • sa_handler是一个函数指针,指向信号处理函数

  • sa_sigaction同样是信号处理函数,有三个参数,可以获得关于信号更详细的信息

  • sa_mask用来指定在信号处理函数执行期间需要被屏蔽的信号

  • sa_flags用于指定信号处理的行为

    • SA_RESTART,使被信号打断的系统调用自动重新发起

    • SA_NOCLDSTOP,使父进程在它的子进程暂停或继续运行时不会收到 SIGCHLD 信号

    • SA_NOCLDWAIT,使父进程在它的子进程退出时不会收到 SIGCHLD 信号,这时子进程如果退出也不会成为僵尸进程

    • SA_NODEFER,使对信号的屏蔽无效,即在信号处理函数执行期间仍能发出这个信号

    • SA_RESETHAND,信号处理之后重新设置为默认的处理方式

    • SA_SIGINFO,使用 sa_sigaction 成员而不是 sa_handler 作为信号处理函数

  • sa_restorer一般不使用

sigfillset函数

#include <signal.h>
​
int sigfillset(sigset_t *set);

用来将参数set信号集初始化,然后把所有的信号加入到此信号集里。

alarm函数

#include <unistd.h>;
​
#define SIGALRM  14     //由alarm系统调用产生timer时钟信号
#define SIGTERM  15     //终端发送的终止信号
unsigned int alarm(unsigned int seconds);

设置信号传送闹钟,即用来设置信号SIGALRM在经过参数seconds秒数后发送给目前的进程。如果未设置信号SIGALRM的处理函数,那么alarm()默认处理终止进程.

kill

#include<sys/types.h>
#include<signal.h>
int kill(pid_t pid,int sig);

进程给其他进程发送信号,该函数把信号sig发送给目标进程;目标进程由pid参数指定

 

SIGHUP

当挂起进程的控制终端时,SIGHUP信号将被触发,通常利用SIGHUP信号来强制服务器重读配置文件

SIGPIPE

往一个读端关闭的管道或socket连接中写数据将引发SIGPIPE信号

SIGURG

内核通知应用程序带外数据到达

第13章 多进程编程

fork

#include<sys/types.h>
#include<unistd.h>
pid_t fork(void);

在父进程中返回的是子进程的PID,在子进程中则返回0。失败时返回-1,并设置errno

fork函数复制当前进程,在内核进程表中创建一个新的进程表项,子进程的代码与父进程完全相同,同时它还会复制父进程的数据(堆数据、栈数据和静态数据),写时复制 ,文件描述符的引用计数加1,父进程的用户根目录、当前工作目录等变量的引用计数均会加1。

僵尸进程

当子进程结束运行时,内核不会立即释放该进程的进程表表项,以满足父进程后续对该子进程退出信息的查询 。在子进程结束运行之后,父进程读取其退出状态之前,我们称该子进程处于僵尸态。或父进程结束或者异常终止,而子进程继续运行

避免了僵尸进程

#include<sys/types.h>
#include<sys/wait.h>
pid_t wait(int*stat_loc);
pid_t waitpid(pid_t pid,int*stat_loc,int options);

wait函数将阻塞进程,直到该进程的某个子进程结束运行为止。它返回结束运行的子进程的PID,并将该子进程的退出状态信息存储于stat_loc参数指向的内存中

waitpid只等待由pid参数指定的子进程。如果pid取值为-1,那么它就和wait函数相同,即等待任意一个子进程结束。stat_loc参数的含义和wait函数的stat_loc参数相同。options的取值是WNOHANG时,waitpid调用将是非阻塞的

管道

管道只能用于有关联的两个进程(比如父、子进程)间的通信。管道是父进程和子进程间通信的常用手段。

信号量

进程的同步问题,以确保任一时刻只有一个进程可以拥有对资源的独占式访问

程序对共享资源的访问引发了进程之间的竞态条件,对共享资源的访问代码称为临界区或关键代码

信号量是一种特殊的变量,它只能取自然数值并且只支持两种操作:等待(wait)和信号(signa),对信号量的这两种操作更常用的称呼是P、V操作

假设有信号量SV,则对它的P、V操作含义如下

❑P(SV),如果SV的值大于0,就将它减1;如果SV的值为0,则挂起进程的执行。 ❑V(SV),如果有其他进程因为等待SV而挂起,则唤醒之;如果没有,则将SV加1。

共享内存

共享内存通常和其他进程间通信方式一起使用

消息队列

两个进程之间传递二进制块数据的一种简单有效的方式

第14章 多线程编程

线程

是程序中完成一个独立任务的完整执行序列,即一个可调度的实体,为内核线程和用户 线程。内核线程,在有的系统上也称为LWP(Light Weight Process,轻量级进程),运行在内核空间,由内核来调度;用户线程运行在用户空间,由线程库来调度。当进程的一个内核线程获得CPU的使用权时,它就加载并运行一个用户线程。可见,内核线程相当于用户线程运行的“容器”。

一个进程可以拥有M个内核线程和N个用户线程,其中M≤N。 按照M:N的取值,线程的实现方式可分为三种模式:完全在用户空间实现、完全由内核调度和双层调度

完全在用户空间实现的线程无须内核的支持,内核甚至根本不知道这些线程的存在。进程的所有执行线程共享该进程的时间片,它们对外表现出相同的优先级。 这种实现方式, M个用户空间线程对应1个内核线程,而该内核线程实际上就是进程本身 。优点:创建和调度线程都无须内核的干预,因此速度相当快。并且由于它不占用额外的内核资源,缺点:对于多处理器系统,一个进程的多个线程无法运行在不同的CPU上,因为内核是按照其最小调度单位来分配CPU的

完全由内核调度的模式将创建、调度线程的任务都交给了内核,运行在用户空间的线程库无须执行管理任务,这与完全在用户空间实现的线程恰恰相反。二者的优缺点也正好互换。

双层调度模式是前两种实现模式的混合体:内核调度M个内核线程,线程库调度N个用户线程。这种线程实现方式结合了前两种方式的优点:不但不会消耗过多的内核资源,而且线程切换速度也较快,同时它可以充分利用多处理器的优势

创建线程

#include<pthread.h>
int pthread_create(pthread_t*thread,const
pthread_attr_t*attr,void*(*start_routine)(void*),void*arg);

thread参数是新线程的标识符,attr参数用于设置新线程的属性。给它传递NULL表示使用默认线程属性。 start_routine和arg参数分别指定新线程将运行的函数及其参数。 pthread_create成功时返回0

#include<pthread.h>
void pthread_exit(void*retval);

安全、干净地退出,通过retval参数向线程的回收者传递其退出信息

#include<pthread.h>
int pthread_join(pthread_t thread,void**retval);

一个进程中的所有线程都可以调用pthread_join函数来等待线程结束。thread参数是目标线程的标识符,retval参数则是目标线程返回的退出信息。该函数会一直阻塞,直到被回收的线程结束为止

#include<pthread.h>
int pthread_cancel(pthread_t thread);
​
int pthread_setcancelstate(int state,int*oldstate);//第一个参数分别用于设置线程的取消状态(是否允许取消)和取消类型.第二个参数则分别记录线程原来的取消状态和取消类型
int pthread_setcanceltype(int type,int*oldtype)

异常终止一个线程,即取消线程,

线程同步的机制

POSIX信号量、互斥锁(互斥量)和条件变量

POSIX信号量以sem_开头

#include<semaphore.h>
int sem_init(sem_t*sem,int pshared,unsigned int value);
int sem_destroy(sem_t*sem);
int sem_wait(sem_t*sem);
int sem_trywait(sem_t*sem);
int sem_post(sem_t*sem);

sem_init函数用于初始化一个未命名的信号量(POSIX信号量API支持命名信号量,不过本书不讨论它)。pshared参数指定信号量的类型。如果其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量就可以在多个进程之间共享。value参数指定信号量的初始值。此外,初始化一个已经被初始化的信号量将导致不可预期的结果。

sem_destroy函数用于销毁信号量,以释放其占用的内核资源。如果销毁一个正被其他线程等待的信号量,则将导致不可预期的结果。

sem_wait函数以原子操作的方式将信号量的值减1。如果信号量的值为0,则sem_wait将被阻塞,直到这个信号量具有非0值。

sem_trywait与sem_wait函数相似,不过它始终立即返回,而不论被操作的信号量是否具有非0值,相当于sem_wait的非阻塞版本。当信号量的值非0时,sem_trywait对信号量执行减1操作。当信号量的值为0时,它将返回-1并设置errno为EAGAIN。

sem_post函数以原子操作的方式将信号量的值加1。当信号量的值大于0时,其他正在调用sem_wait等待信号量的线程将被唤醒。上面这些函数成功时返回0,失败则返回-1并设置errno。

互斥锁(也称互斥量)可以用于保护关键代码段,以确保其独占式的访问

#include<pthread.h>
int pthread_mutex_init(pthread_mutex_t*mutex,const
pthread_mutexattr_t*mutexattr);
int pthread_mutex_destroy(pthread_mutex_t*mutex);
int pthread_mutex_lock(pthread_mutex_t*mutex);
int pthread_mutex_trylock(pthread_mutex_t*mutex);
int pthread_mutex_unlock(pthread_mutex_t*mutex);

pthread_mutex_init函数用于初始化互斥锁。mutexattr参数指定互斥锁的属性。如果将它设置为NULL,则表示使用默认属性

pthread_mutex_destroy函数用于销毁互斥锁,以释放其占用的内核资源。销毁一个已经加锁的互斥锁将导致不可预期的后果

pthread_mutex_lock函数以原子操作的方式给一个互斥锁加锁。如果目标互斥锁已经被锁上,则pthread_mutex_lock调用将阻塞,直到该互斥锁的占有者将其解锁。

pthread_mutex_trylock与pthread_mutex_lock函数类似,不过它始终立即返回,而不论被操作的互斥锁是否已经被加锁,相当于pthread_mutex_lock的非阻塞版本。当目标互斥锁未被加锁时,pthread_mutex_trylock对互斥锁执行加锁操作。当互斥锁已经被加锁时,pthread_mutex_trylock将返回错误码EBUSY。需要注意的是,这里讨论的pthread_mutex_lock和pthread_mutex_trylock的行为是针对普通锁而言的

pthread_mutex_unlock函数以原子操作的方式给一个互斥锁解锁。如果此时有其他线程正在等待这个互斥锁,则这些线程中的某一个将获得它。

死锁:在一个线程中对一个已经加锁的普通锁再次加锁,将导致死锁。

条件变量 :用于在线程之间同步共享数据的值,条件变量提供了一种线程间的通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程。

#include<pthread.h>
int pthread_cond_init(pthread_cond_t*cond,const
pthread_condattr_t*cond_attr);//参数cond指向要操作的目标条件变量
int pthread_cond_destroy(pthread_cond_t*cond);
int pthread_cond_broadcast(pthread_cond_t*cond);
int pthread_cond_signal(pthread_cond_t*cond);
int pthread_cond_wait(pthread_cond_t*cond,pthread_mutex_t*mutex);

pthread_cond_init函数用于初始化条件变量。cond_attr参数指定条件变量的属性。如果将它设置为NULL,则表示使用默认属性

pthread_cond_destroy函数用于销毁条件变量,以释放其占用的内核资源。销毁一个正在被等待的条件变量将失败并返回EBUSY

pthread_cond_broadcast函数以广播的方式唤醒所有等待目标条件变量的线程。

pthread_cond_signal函数用于唤醒一个等待目标条件变量的线程。

pthread_cond_wait函数用于等待目标条件变量。mutex参数是用于保护条件变量的互斥锁,以确保pthread_cond_wait操作的原子性。

3种

线程同步机制分别封装成3个类

#ifndef LOCKER_H
#define LOCKER_H
​
#include <exception>
#include <pthread.h>
#include <semaphore.h>
/*封装信号量的类*/
class sem
{
public:
    /*创建并初始化信号量*/
    sem()
    {
        if( sem_init( &m_sem, 0, 0 ) != 0 )
        {
            /*构造函数没有返回值,可以通过抛出异常来报告错误*/
            throw std::exception();
        }
    }
    /*销毁信号量*/
    ~sem()
    {
        sem_destroy( &m_sem );
    }
    /*等待信号量*/
    bool wait()
    {
        return sem_wait( &m_sem ) == 0;
    }
    /*增加信号量*/
    bool post()
    {
        return sem_post( &m_sem ) == 0;
    }
​
private:
    sem_t m_sem;
};
​
/*封装互斥锁的类*/
class locker
{
public:
    /*创建并初始化互斥锁*/
    locker()
    {
        if( pthread_mutex_init( &m_mutex, NULL ) != 0 )
        {
            throw std::exception();
        }
    }
    /*销毁互斥锁*/
    ~locker()
    {
        pthread_mutex_destroy( &m_mutex );
    }
    /*获取互斥锁*/
    bool lock()
    {
        return pthread_mutex_lock( &m_mutex ) == 0;
    }
    /*释放互斥锁*/
    bool unlock()
    {
        return pthread_mutex_unlock( &m_mutex ) == 0;
    }
​
private:
    pthread_mutex_t m_mutex;
};
​
/*封装条件变量的类*/
class cond
{
public:
    /*创建并初始化条件变量*/
    cond()
    {
        if( pthread_mutex_init( &m_mutex, NULL ) != 0 )
        {
            throw std::exception();
        }
        if ( pthread_cond_init( &m_cond, NULL ) != 0 )
        {
            /*构造函数中一旦出现问题,就应该立即释放已经成功分配了的资源*/
            pthread_mutex_destroy( &m_mutex );
            throw std::exception();
        }
    }
    /*销毁条件变量*/
    ~cond()
    {
        pthread_mutex_destroy( &m_mutex );
        pthread_cond_destroy( &m_cond );
    }
    /*等待条件变量*/
    bool wait()
    {
        int ret = 0;
        pthread_mutex_lock( &m_mutex );
        ret = pthread_cond_wait( &m_cond, &m_mutex );
        pthread_mutex_unlock( &m_mutex );
        return ret == 0;
    }
    /*唤醒等待条件变量的线程*/
    bool signal()
    {
        return pthread_cond_signal( &m_cond ) == 0;
    }
​
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};
​
#endif
​

可重入

如果一个函数能被多个线程同时调用且不发生竞态条件,则我们称它是线程安全的(thread safe),或者说它是可重入函数

多不可重入的库函数提供了对应的可重入版本 ,在原函数名尾部加上_r ,比如,函数 localtime对应的可重入函数是localtime_r ,在多线程程序中调用库函数,一定要使用其可重入版本,否则可能导致预想不到的结果

第15章 进程池和线程池

进程池

动态创建进程(或线程)是比较耗费时间的,这将导致较慢的客户响应。由服务器预先创建的一组子进程,这些子进程的数目在3~10个之间 ,进程池中的所有子进程都运行着相同的代码,并具有相同的属性,比如优先级、PGID等。当有新的任务到来时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。

主进程选择哪个子进程来为新任务服务,则有两种方式:

❑主进程使用某种算法来主动选择子进程。 随机算法和Round Robin(轮流选取)算法等

❑主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,主进程将任务添加到工作队列中。

当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。 最简单的方法是,在父进程和子进程之间预先建立好一条管道

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到