一、网络基础
1.协议
1.1 协议的概念
典型协议:
1.2 分层模型
总结:
1.3 通信过程
总结:
1.4 以太网帧和ARP请求
(1)以太网帧格式
(2)ARP数据报格式
例:
ARP请求的作用范围是在局域网内,若局域网内无满足的物理主机,则会将请求发送打默认网关(路由器),由路由器发送到下一网络,再进行ARP请求。
总结:
1.5 IP协议
(1)数据报格式
总结:
一般跳数指经过了多少个路由器
1.6 UDP协议
数据报格式:
总结:
1.7 TCP协议
数据报格式:
总结:
1.8 CS和BS模型
(1)基本概念
(2)优缺点
总结:
(3)对比
可以大致认为安装软件或app的为C/S架构,使用web访问的为B/S架构
2. TCP协议
2.1 TCP通信时序–三次握手
三次握手发生在内核空间
SYN报文段不能携带数据,但要消耗一个序号,1000
确认报文段不能携带数据,也要消耗一个序号,8000;同时确认号为1001,表示1001号以前的数据已收到或下一个期望收到的序号为1001
第三个握手可以携带数据,若不携带数据,则不消耗序号
2.2 断开连接—四次挥手
当处于半关闭状态时,客户端关闭写缓冲区,保留读缓冲区
2.3 滑动窗口—TCP流量控制
滑动窗口的目的:匹配发送方的发送速率和接收方的接收速率
发送报文中的win字段保存当前发送窗口的大小
确认报文中的win字段保存接收方接收窗口的大小,发送方根据这个大小调整自己发送窗口的大小
总结:
2.4 TCP状态转换
图示1:
图示2:
总结:
2.5 端口复用
实例:
服务器崩溃后快速重启而不需要等待TIME_WAIT结束
总结:
2.6 半关闭和shutdown函数
总结:
二、 SOCKET编程
1. 套接字
1.1 套接字概念
1.2 套接字通信原理
套接字都是通信两边一边一个,组成套接字对。一个套接字只有一个文件描述符,同时表示发送和接收
总结:
2. 预备知识
2.1 网络字节序
网络字节序与主机字节序转换:
总结:
将无符号的32位整型的ip和端口与网络字节序的ip和端口转换
主要转换字节序,得到的结果也为32位二进制
2.2 IP地址转换函数
在绑定socket地址结构时,参三传入的应该是指向struct in_addr 的指针,而非结构体内的in_addr.s_addr;
总结:
可以将点分十进制的ip与网络字节序转换
将ip转换位网络字节序
2.3 sockaddr地址结构
具体结构:
结构体定义:
总结:
sin表示socket_internet
3. 网络套接字函数
3.1 socket模型的创建流程
图示:
n个客户端和1个服务端通信总共会创建2n+1 个socket
n(客户端socket) + n(与客户端对应的服务端socket) + 1(监听socket)
问:服务端的n个socket地址结构相同为什么不只创建一个socket用于通信?
总结:
3.2 socket和bind函数
socket():
bind()
总结:
注:socket函数的第三个参数设置为0,意味着选定第二个参数设置协议类型的协议。
SOCK_STREAM(流式协议) = 》 tcp
SOCK_DGRAM(报式协议) =》 udp
注2: bind的函数的参2是设置sockaddr_in类型的参数转换为sockaddr类型
3.3 listen和accept函数
listen()
accept()
accept得到的是与客户端建立连接的服务端socket的fd
而服务端通信的socket的地址结构继承于监听的socket
3.4 connect函数
总结:
客户端可以不使用bind绑定地址结构,会自动分配
4. C/S架构–TCP
4.1 通信流程与代码结合示意图
4.2 服务端代码server.c
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
void sys_err(const char* str){
perror(str);
exit(1);
}
int main(){
char buf[BUFSIZ]; // BUFSIZ系统默认设置为4096
// 创建一个socket=>ipv4,流式协议, tcp
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1)
sys_err("socket error");
// 绑定地址结构(ip + port)
struct sockaddr_in addr; // 创建sockaddr属性
addr.sin_family = AF_INET; // 设置ipv4
addr.sin_port = htons(8080); // 设置端口号
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 设置一个可用的ip地址,转换为网络地址
socklen_t size= sizeof(addr), client_size;
int ret = bind(sockfd, (struct sockaddr*)&addr, size);
if(ret == -1)
sys_err("bind error");
// 设置监听上限
ret = listen(sockfd, 20);
if(ret == -1)
sys_err("listen error");
// 开始监听
struct sockaddr_in c_addr; // 创建用于接收客户端的地址结构参数
// 参三为传入传出参数,传入原地址结构大小,传出新的地址结构大小
client_size = sizeof(c_addr);
int client_sockfd = accept(sockfd, (struct sockaddr*)&c_addr, &client_size);
if(client_sockfd == -1)
sys_err("accept error");
while(1){
ret = read(client_sockfd, &buf, sizeof(buf));
if(ret == 0)
break;
// 转换为大写
int i = 0;
for(i = 0; i < ret; i++)
buf[i] = toupper(buf[i]);
write(client_sockfd, &buf, ret); // 在服务端中写
}
close(client_sockfd);
close(sockfd);
return 0;
}
可以使用nc ip 端口号,测试服务端
服务端的读和写均是在服务端的读写缓冲区中
服务端打印客户端的ip+port
// 打印客户端的ip和端口
char c_ip[1024];
printf("client ip =%s, port = %d\n",
inet_ntop(AF_INET, &c_addr.sin_addr.s_addr, c_ip, 1024),
ntohs(c_addr.sin_port));
4.3 客户端代码client.c
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
void sys_err(const char* str){
perror(str);
exit(1);
}
int main(){
char buf[BUFSIZ];
// 创建一个服务器的地址结构
struct sockaddr_in s_addr; // 通过man 7 ip查具体格式
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(8080); // 端口号必须与服务器创建的一致
int ret = inet_pton(AF_INET, "127.0.0.1", &s_addr.sin_addr.s_addr);
if(ret < 0)
perror("inet_pton error");
// 创建socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1)
sys_err("socket error");
// 建立连接,连接服务端的socket
ret = connect(sockfd, (struct sockaddr*)&s_addr, sizeof(s_addr));
if(ret == -1)
perror("connect error");
printf("============\n");
while(1){
// 写数据
ret = read(STDIN_FILENO, buf, sizeof(buf));
ret = write(sockfd, buf, ret); // 往自己的发送缓冲区写
ret = read(sockfd, buf, sizeof(buf)); // 从自己的接收缓冲区读
printf("client:%s\n", buf);
}
close(sockfd);
return 0;
}
运行结果:
服务端与客户端的读/写,均是在自己的发送缓冲区/接收缓冲区中进行的。TCP会根据连接,将一方的发送缓冲区的内容传输到另一方的接收缓冲区。
5. 出错处理函数封装
5.1 wrap.h 和 wrap.c
封装出错处理函数的原因:为了简化main函数里的代码逻辑,减少重复但不可去除的判断错误的代码
wrap.h
#ifndef _WRAP_H_ // 头文件保护,防止头文件被多次引用
#define _WRAP_H_ // 一般命名方式为头文件的 "_大写_H_"
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
#include<errno.h>
void sys_err(const char* str);
int Socket(int domain, int type, int protocol);
int Bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
int Listen(int sockfd, int backlog);
int Accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
int Connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
ssize_t Read(int fd, void *buf, size_t count);
ssize_t Write(int fd, const void *buf, size_t count);
int Inet_pton(int af, const char *src, void *dst);
ssize_t Readn(int fd, void* vptr, size_t n); // 读n个字节的数据
ssize_t Writen(int fd, const void* vptr, size_t n); // 写n个字节的数据
ssize_t Readline(int fd, void* vptr, size_t maxlen);
#endif
将一些系统调用进行封装,使其包含错误处理函数
wrap.c
// 实现wrap.h中的接口
#include"wrap.h"
void sys_err(const char* str){
perror(str);
exit(1);
}
int Socket(int domain, int type, int protocol){
int ret = socket(domain, type, protocol);
if(ret == -1)
sys_err("socket error");
return ret;
}
int Bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen){
int ret = bind(sockfd, addr, addrlen);
if(ret == -1)
sys_err("bind error");
return ret;
}
int Listen(int sockfd, int backlog){
int ret = listen(sockfd, backlog);
if(ret == -1)
sys_err("listen error");
return ret;
}
int Accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen){
int ret;
// 当accept成功时,跳出循环
while((ret = accept(sockfd, addr, addrlen)) == -1){
// 由于accept是慢速系统调用,可能被信号打断
// 连接错误 / 被信号 打断
if((errno == ECONNABORTED) || (errno == EINTR))
continue; // 当这两种情况时,重新监听
else
sys_err("accept error"); // 当其他情况,就报错
}
return ret;
}
int Connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen){
int ret;
while((ret = connect(sockfd, addr, addrlen)) == -1){
if(errno == EINTR)
continue;
else
sys_err("connect error");
}
return ret;
}
ssize_t Read(int fd, void *buf, size_t count){
ssize_t ret;
while((ret = read(fd, buf, count)) == -1){
if(errno == EINTR) // 当read被信号打断时,重新read
continue;
else
sys_err("read error");
}
return ret;
}
ssize_t Write(int fd, const void *buf, size_t count){
ssize_t ret;
while((ret = write(fd, buf, count)) == -1){
if(errno == EINTR)
continue;
else
sys_err("write error");
}
return ret;
}
int Inet_pton(int af, const char *src, void *dst){
int ret = inet_pton(af, src, dst);
if(ret == -1)
sys_err("inet_pton error");
return ret;
}
// vptr是传出参数
ssize_t Readn(int fd, void* vptr, size_t n){ // 返回实际读取的字节数
size_t nleft; // 剩余未读取的字节数
ssize_t nread; // int本次实际读到的字节数
char* ptr;
ptr = vptr;
nleft = n;
while(nleft > 0){
if((nread = read(fd, ptr, nleft)) < 0){
if(errno == EINTR)
nread = 0;
else
return -1;
}
else if(nread == 0)
break;
nleft -= nread;
ptr += nread;
}
return n - nleft;
}
ssize_t Writen(int fd, const void* vptr, size_t n){ // 返回实际写入的字节数
size_t nleft; // 剩余未读取的字节数
size_t nwritten; // int本次实际写的字节数
const char* ptr;
ptr = vptr;
nleft = n;
while(nleft > 0){
if((nwritten = write(fd, ptr, nleft)) <= 0){
if(nwritten < 0 && errno == EINTR)
nwritten = 0;
else
break;
}
nleft -= nwritten;
ptr += nwritten;
}
return n - nwritten;
}
// 仅在当前文件内可以使用 , 读出一个字符传出
static sszie_t my_read(int fd, char* ptr){ // ptr为传出参数
// static修饰的变量的生命周期为整个程序运行期间,且只用初始化一次
static int read_cnt; // 静态缓冲区内剩余的字节数
static char* read_ptr; // 指向静态缓冲区中字符串的指针
static char read_buf[100]; // 创建一个静态缓冲区
if(read_cnt <= 0){ // 当静态缓冲区内无内容时,读内容进缓冲区
while((read_cnt = read(fd, read_buf, sizeof(read_buf))) <= 0){
if(read_cnt == 0)
return 0;
else if(errno == EINTR) // 当被信号打断,再执行一次
continue;
return -1;
}
read_ptr = read_buf; // 重置缓冲区的指针
}
// 读出一个字符,由ptr传出
read_cnt--;
*ptr = *read_ptr++;
return 1;
}
// vptr是传出参数
ssize_t Readline(int fd, void* vptr, size_t maxlen){
ssize_t n, rc; // n为已读字节数,rc表示本次读的结果
char c, *ptr; // c接收一个字符
ptr = vptr;
for(n = 1; n < maxlen; n++){
if((rc = my_read(fd, &c)) == 1){
*ptr++ = c;
if(c == '\n') // 遇到换行终止,读一行的内容
break;
}
else if(rc == 0){ // 当本次未读到内容
*ptr = 0; // 将当前位置设置为空字符null ‘\0’
return n - 1;
}
else // 出错
return -1;
}
*ptr = 0; // 最后一个字符设置为'\0'
return n;
}
实现wrap.h封装的所有函数
5.2 封装writen,readn和readline
// vptr是传出参数
ssize_t Readn(int fd, void* vptr, size_t n){ // 返回实际读取的字节数
size_t nleft; // 剩余未读取的字节数
ssize_t nread; // int本次实际读到的字节数
char* ptr;
ptr = vptr;
nleft = n;
while(nleft > 0){
if((nread = read(fd, ptr, nleft)) < 0){
if(errno == EINTR)
nread = 0;
else
return -1;
}
else if(nread == 0)
break;
nleft -= nread;
ptr += nread;
}
return n - nleft;
}
ssize_t Writen(int fd, const void* vptr, size_t n){ // 返回实际写入的字节数
size_t nleft; // 剩余未读取的字节数
size_t nwritten; // int本次实际写的字节数
const char* ptr;
ptr = vptr;
nleft = n;
while(nleft > 0){
if((nwritten = write(fd, ptr, nleft)) <= 0){
if(nwritten < 0 && errno == EINTR)
nwritten = 0;
else
break;
}
nleft -= nwritten;
ptr += nwritten;
}
return n - nwritten;
}
// 仅在当前文件内可以使用 , 读出一个字符传出
static sszie_t my_read(int fd, char* ptr){ // ptr为传出参数
// static修饰的变量的生命周期为整个程序运行期间,且只用初始化一次
static int read_cnt; // 静态缓冲区内剩余的字节数
static char* read_ptr; // 指向静态缓冲区中字符串的指针
static char read_buf[100]; // 创建一个静态缓冲区
if(read_cnt <= 0){ // 当静态缓冲区内无内容时,读内容进缓冲区
while((read_cnt = read(fd, read_buf, sizeof(read_buf))) <= 0){
if(read_cnt == 0)
return 0;
else if(errno == EINTR) // 当被信号打断,再执行一次
continue;
return -1;
}
read_ptr = read_buf; // 重置缓冲区的指针
}
// 读出一个字符,由ptr传出
read_cnt--;
*ptr = *read_ptr++;
return 1;
}
// vptr是传出参数
ssize_t Readline(int fd, void* vptr, size_t maxlen){
ssize_t n, rc; // n为已读字节数,rc表示本次读的结果
char c, *ptr; // c接收一个字符
ptr = vptr;
for(n = 1; n < maxlen; n++){
if((rc = my_read(fd, &c)) == 1){
*ptr++ = c;
if(c == '\n') // 遇到换行终止,读一行的内容
break;
}
else if(rc == 0){ // 当本次未读到内容
*ptr = 0; // 将当前位置设置为空字符null ‘\0’
return n - 1;
}
else // 出错
return -1;
}
*ptr = 0; // 最后一个字符设置为'\0'
return n;
}
readline函数是逐个字符读取判断是否为\n,进而判断是否读完一行
使用my_read函数,将一部分读的内容放入静态缓冲区,防止逐个判断字符时调用太多次系统调用read
readn和readline中的参二vptr均为传出参数,在函数执行结束,表示为读到内容的指针
static修饰局部变量:使局部变量的 生命周期延长至整个程序运行期间,但 作用域仍限于函数内(只能在该函数内访问)。
static修饰全局变量或函数:限制全局变量或函数的 链接属性,使其 仅在当前文件可见(避免命名冲突)。
5.3 修改后的server.c 和 client.c
server.c
#include"wrap.h"
int main(){
// BUFSIZ系统默认设置为4096
char buf[BUFSIZ], c_ip[1024]; // c_ip为客户端的ip
int sockfd, client_sockfd, ret;
// 创建一个socket=>ipv4,流式协议, tcp
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
// 绑定地址结构(ip + port)
struct sockaddr_in addr; // 创建sockaddr属性
addr.sin_family = AF_INET; // 设置ipv4
addr.sin_port = htons(8080); // 设置端口号
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 设置一个可用的ip地址,转换为网络地址
socklen_t size= sizeof(addr), client_size;
Bind(sockfd, (struct sockaddr*)&addr, size);
// 设置监听上限
Listen(sockfd, 20);
// 开始监听
struct sockaddr_in c_addr; // 创建用于接收客户端的地址结构参数
// 参三为传入传出参数,传入原地址结构大小,传出新的地址结构大小
client_size = sizeof(c_addr);
client_sockfd = Accept(sockfd, (struct sockaddr*)&c_addr, &client_size);
// 打印客户端的ip和端口
printf("client ip =%s, port = %d\n",
inet_ntop(AF_INET, &c_addr.sin_addr.s_addr, c_ip, 1024),
ntohs(c_addr.sin_port));
while(1){
ret = Read(client_sockfd, &buf, sizeof(buf));
if(ret == 0)
break;
// 转换为大写
int i = 0;
for(i = 0; i < ret; i++)
buf[i] = toupper(buf[i]);
// 在自己的输入缓冲区写,会发送到客户端的接收缓冲区
Write(client_sockfd, buf, ret);
}
close(client_sockfd);
close(sockfd);
return 0;
}
client.c
#include"wrap.h"
int main(){
char buf[BUFSIZ], recvbuf[BUFSIZ];
int ret;
// 创建一个服务器的地址结构
struct sockaddr_in s_addr; // 通过man 7 ip查具体格式
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(8080); // 端口号必须与服务器创建的一致
inet_pton(AF_INET, "127.0.0.1", &s_addr.sin_addr.s_addr);
// 创建socket
int sockfd = Socket(AF_INET, SOCK_STREAM, 0);
// 建立连接,连接服务端的socket
Connect(sockfd, (struct sockaddr*)&s_addr, sizeof(s_addr));
while(1){
// 写数据
ret = Read(STDIN_FILENO, buf, sizeof(buf));
if(ret == 1) { // 当输入\n或EOF时,跳出循环
printf("client exit\n");
break;
}
ret = Write(sockfd, buf, ret); // 往自己的发送缓冲区写
ret = Read(sockfd, recvbuf, sizeof(recvbuf)); // 从自己的接收缓冲区读
recvbuf[ret] = '\0'; // 手动添加结束符,read不会添加\0
printf("client:%s\n", recvbuf); // printf会读到\0终止
}
close(sockfd);
return 0;
}
在客户端代码中,判断每次从终端读到的数据是否为1( \n ),来判断是否结束输入
在传输过程中需手动添加 \0 , 因为read不会自动添加,而printf判断字符串结束是通过\0 判断,若不添加,则会出现之前的结果(仍在内存中)
运行结果:
5.4 read函数的返回值
5.5 总结
三、 高并发服务器
1. 多进程并发服务器
1.1 实现思路
1.2 serve2.c
// 多进程并发服务器
#include"wrap.h"
#include<sys/wait.h>
#define SRV_PORT 9999
void func_catch(int signum){
int wpid;
// 无僵尸进程(0),或无子进程(-1) 均会退出循环\g
while((wpid = waitpid(-1, NULL, WNOHANG)) > 0);
return;
}
int main(){
int lfd, cfd;
struct sockaddr_in srv_addr, clt_addr;
socklen_t clt_addr_len;
char buf[BUFSIZ];
int ret, i;
pid_t pid;
lfd = Socket(AF_INET, SOCK_STREAM, 0);
memset(&srv_addr, 0 , sizeof(srv_addr)); // 将该块内存置 \0
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(SRV_PORT);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
Bind(lfd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
Listen(lfd, 128);
clt_addr_len = sizeof(clt_addr);
while(1){
cfd = Accept(lfd, (struct sockaddr*)&clt_addr, &clt_addr_len); // 开始监听
pid = fork(); // 创建子进程
if(pid < 0){
sys_err("fork error");
}
else if(pid == 0){ // 子进程
close(lfd); // 关闭监听socket
break;
}
else{
close(cfd); // 关闭通信socket
// 注册信号捕捉SIGCHLD
struct sigaction act;
act.sa_handler = func_catch; // 设置回调函数
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
ret = sigaction(SIGCHLD, &act, NULL);
if(ret == -1)
sys_err("sigaction error");
continue;
}
}
if(pid == 0){
while(1){
ret = Read(cfd, buf, sizeof(buf));
if(ret == 0){
close(cfd);
exit(10);
}
for(i = 0; i < ret; i++)
buf[i] = toupper(buf[i]);
Write(cfd, buf, ret);
Write(STDOUT_FILENO, buf, ret);
}
}
return 0;
}
循环回收子进程,当子进程为空或僵尸进程队列为空时,跳出循环
2. 多线程并发服务器
2.1 实现思路
2.2 server3.c
// 多线程并发服务器
#include"wrap.h"
#include<sys/wait.h>
#define SRV_PORT 9999
void err_func(int num){
fprintf(stderr, "%s\n", strerror(num));
exit(1);
}
void* tfunc(void* arg){ // 线程函数
int cfd = *(int*)arg, ret = 0, i = 0;
char buf[BUFSIZ];
free(arg); // 释放保存cfd的内存
while(1){
ret = Read(cfd, buf, sizeof(buf));
if(ret == 0){
close(cfd);
exit(10);
}
for(i = 0; i < ret; i++)
buf[i] = toupper(buf[i]);
Write(cfd, buf, ret);
Write(STDOUT_FILENO, buf, ret);
}
close(cfd);
return NULL;
}
int main(){
int lfd, cfd;
struct sockaddr_in srv_addr, clt_addr;
socklen_t clt_addr_len;
int ret, i = 0;
pthread_t tid[256];
lfd = Socket(AF_INET, SOCK_STREAM, 0);
memset(&srv_addr, 0 , sizeof(srv_addr)); // 将该块内存置 \0
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(SRV_PORT);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
Bind(lfd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
Listen(lfd, 128);
clt_addr_len = sizeof(clt_addr);
while(1){
cfd = Accept(lfd, (struct sockaddr*)&clt_addr, &clt_addr_len); // 开始监听
int* client_fd = malloc(sizeof(int)); // 创建指向cfd的指针,堆区
*client_fd = cfd;
// 创建线程
ret = pthread_create(&tid[i], NULL, tfunc, (void*)client_fd);
if(ret != 0)
err_func(ret);
ret = pthread_detach(tid[i]); // 设置线程分离
if(ret != 0)
err_func(ret);
i++;
}
return 0;
}
创建线程时,传递的参数为指向cfd的指针,保证后续修改cfd时不会出错(提前malloc使用堆区内存保存该值,在线程中释放该块内存)
使用pthread_detach分离线程,使线程可以自动回收
3. 多路IO转接服务器
多路I/O转接(也称为I/O多路复用)是一种高效处理多个客户端连接的服务器设计模式,它允许单个进程/线程同时监视多个文件描述符(通常是套接字)的可读、可写或异常状态。
3.1 select
图示:
当客户端有连接请求发送过来时,首先发送给select,select接收到后给服务端发送消息,服务端调用accept,此时accept不会阻塞等待,而是直接连接客户端
(1)select函数原型分析
总结:
timeout决定了select在没有任何文件描述符就绪时最多等待多长时间
返回值为三个集合中总共的满足事件的数量
(2)select相关函数参数分析
总结:
(3)思路分析
(4)具体实现
#include"wrap.h"
int main(){
// BUFSIZ系统默认设置为4096
char buf[BUFSIZ], c_ip[1024]; // c_ip为客户端的ip
int sockfd, client_sockfd;
// 创建一个socket=>ipv4,流式协议, tcp
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
// 绑定地址结构(ip + port)
struct sockaddr_in addr, c_addr; // 创建sockaddr属性
addr.sin_family = AF_INET; // 设置ipv4
addr.sin_port = htons(9999); // 设置端口号
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 设置一个可用的ip地址,转换为网络地址
socklen_t size= sizeof(addr), client_size;
Bind(sockfd, (struct sockaddr*)&addr, size);
// 设置监听上限
Listen(sockfd, 20);
fd_set rset, allset; // 创建读集合和所有期望的读集合
FD_ZERO(&allset);
FD_SET(sockfd, &allset); // 将监听fd添加进集合
int maxfd = 0, ret, i, j, n;
maxfd = sockfd;
while(1){
// 设置读集合,传入时为期望监听的集合, 传出为实际有事件的集合
rset = allset;
ret = select(maxfd + 1, &rset, NULL, NULL, NULL);
if(ret == -1)
sys_err("select error");
// 查看监听fd是否在传出集合中(判断是否有新客户端请求连接)
if(FD_ISSET(sockfd, &rset)){
client_size = sizeof(c_addr);
client_sockfd = Accept(sockfd, (struct sockaddr*)&c_addr, &client_size);
FD_SET(client_sockfd, &allset); // 添加新的客户端入集合
if(maxfd < client_sockfd)
maxfd = client_sockfd; // 更新最大fd
if(ret == 1) // 当前事件只有一个且为监听时,跳过后续
continue;
}
// 若还有其他事件发生
for(i = sockfd + 1; i <= maxfd; i++){
if(FD_ISSET(i, &rset)){ // 查看i是否发生事件
n = Read(i, buf, sizeof(buf));
if(n == 0){ // 表示客户端关闭
close(i); // 关闭服务端与之对应客户端连接的套接字
FD_CLR(i, &allset); // 移除该fd
}
for(j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Write(i, buf, n);
Write(STDOUT_FILENO, buf, n);
}
}
}
close(sockfd);
return 0;
}
(5)select优缺点
(6)使用数组进行优化
#include"wrap.h"
int main(){
// BUFSIZ系统默认设置为4096
char buf[BUFSIZ], c_ip[1024]; // c_ip为客户端的ip
int sockfd, client_sockfd;
// 创建一个socket=>ipv4,流式协议, tcp
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
// 绑定地址结构(ip + port)
struct sockaddr_in addr, c_addr; // 创建sockaddr属性
addr.sin_family = AF_INET; // 设置ipv4
addr.sin_port = htons(9999); // 设置端口号
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 设置一个可用的ip地址,转换为网络地址
socklen_t size= sizeof(addr), client_size;
Bind(sockfd, (struct sockaddr*)&addr, size);
// 设置监听上限
Listen(sockfd, 20);
fd_set rset, allset; // 创建读集合和所有期望的读集合
FD_ZERO(&allset);
FD_SET(sockfd, &allset); // 将监听fd添加进集合
int maxfd = 0, ret, i, j, n;
maxfd = sockfd;
int client[FD_SETSIZE]; //FD_SETSIZE 默认为1024
int maxid = -1;
for(i = 0;i < 1024; i++)
client[i] = -1;
while(1){
// 设置读集合,传入时为期望监听的集合, 传出为实际有事件的集合
rset = allset;
ret = select(maxfd + 1, &rset, NULL, NULL, NULL);
if(ret == -1)
sys_err("select error");
// 查看监听fd是否在传出集合中(判断是否有新客户端请求连接)
if(FD_ISSET(sockfd, &rset)){
client_size = sizeof(c_addr);
client_sockfd = Accept(sockfd, (struct sockaddr*)&c_addr, &client_size);
for(i = 0; i < FD_SETSIZE; i++){
if(client[i] < 0){
client[i] = client_sockfd;
break;
}
}
if(i == 1024){
fputs("too many clients\n", stderr);
exit(1);
}
FD_SET(client_sockfd, &allset); // 添加新的客户端入集合
if(maxfd < client_sockfd)
maxfd = client_sockfd; // 更新最大fd
if(i > maxid)
maxid = i;
if(--ret == 0) // 当前事件只有一个且为监听时,跳过后续
continue;
}
// 若还有其他事件发生
int fd;
for(i = 0; i <= maxid; i++){
if((fd = client[i]) < 0)
continue;
if(FD_ISSET(fd, &rset)){ // 查看i是否发生事件
n = Read(fd, buf, sizeof(buf));
if(n == 0){ // 表示客户端关闭
close(fd); // 关闭服务端与之对应客户端连接的套接字
FD_CLR(fd, &allset); // 移除该fd
client[i] = -1; // 重置当前位置
}
else if(n > 0){
for(j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Write(fd, buf, n);
Write(STDOUT_FILENO, buf, n);
}
if(--ret == 0)
break;
}
}
}
close(sockfd);
return 0;
}
3.2 poll
(1)poll函数原型分析
总结:
(2)使用注意事项
判断是否有事件发生,使用位与运算判断
参数1使用的是结构体数组
(3)优缺点
(4)具体实现
#include"wrap.h"
#include<poll.h>
int main(){
// BUFSIZ系统默认设置为4096,INET_ADDRSTRLEN默认为16
char buf[BUFSIZ], c_ip[1024], str[INET_ADDRSTRLEN]; // c_ip为客户端的ip
int sockfd, client_sockfd;
// 创建一个socket=>ipv4,流式协议, tcp
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
// 绑定地址结构(ip + port)
struct sockaddr_in addr, c_addr; // 创建sockaddr属性
addr.sin_family = AF_INET; // 设置ipv4
addr.sin_port = htons(9999); // 设置端口号
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 设置一个可用的ip地址,转换为网络地址
socklen_t size= sizeof(addr), client_size;
Bind(sockfd, (struct sockaddr*)&addr, size);
// 设置监听上限
Listen(sockfd, 20);
int ret, i, j, n;
struct pollfd client[1024]; // 创建结构体数组
int maxi; // 指向当前最大使用过的文件描述符下标
for(i = 1;i < 1024; i++) // 初始化
client[i].fd = -1;
client[0].fd = sockfd;
client[0].events = POLLIN;
maxi = 0;
while(1){
ret = poll(client, maxi + 1, -1);
if(client[0].revents & POLLIN){ // 判断监听fd是否有事件
client_sockfd = Accept(sockfd, (struct sockaddr*)&c_addr, &client_size);
for(i = 0; i < 1024; i++){
if(client[i].fd < 0) { // 查找空位
client[i].fd = client_sockfd;
break;
}
}
if(i == 1024){
fputs("too many clients\n", stderr);
exit(1);
}
client[i].events = POLLIN; // 设置读事件
if(i > maxi)
maxi = i;
if(--ret == 0)
continue;
}
// 查看其他套接字是否触发事件
int fd;
for(i = 1; i <= maxi; i++){
if((fd = client[i].fd) < 0)
continue;
if(client[i].revents & POLLIN){
if((n = read(fd, buf, sizeof(buf))) < 0){
if(errno == EINTR ||
errno == EWOULDBLOCK || errno == EAGAIN
|| errno == ECONNRESET) { // 收到RST标志位
close(fd);
client[i].fd = -1; // 重置那个位置的fd
}
else
sys_err("read error");
}
else if(n == 0){
close(fd);
client[i].fd = -1;
}
else{
for(j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Write(fd, buf, n);
Write(STDOUT_FILENO, buf, n);
}
}
if(--ret == 0)
break;
}
}
close(sockfd);
return 0;
}
3.3 epoll
epoll 是 Linux 内核提供的一种高效的 I/O 事件通知机制,专门用于处理大量文件描述符的监控,是 select 和 poll 的替代方案。
流程:
图示:
(1)epoll_create函数
总结:
epoll_create函数会创建一个struct eventpoll 结构体,其中包含红黑树的根节点还有一个就绪链表(epoll_wait函数的传出参数是从内核的就绪链表复制而来)。返回一个操作该epoill的fd
每个结点内包含监听的文件描述符和epoll_event等
(2)epoll_ctl函数
结构体类型:
总结:
参3设置监听的fd, 参4的event.events参数设置参3fd监听的事件类型
参4的event.data.fd也能设置fd,一般与参三一致,但不是硬性要求,具体看所需的功能实现
(3)epoll_wait函数
总结:
传出参数events参数用于接收从内核就绪链表(rdllist)中传出的就绪事件。就绪链表在epoll_create时创建,由内核维护
(4)实现思路
(5)具体实现
#include"wrap.h"
#include<sys/epoll.h>
#define OPNE_MAX 1024
int main(){
// BUFSIZ系统默认设置为4096,INET_ADDRSTRLEN默认为16
char buf[BUFSIZ], c_ip[1024], str[INET_ADDRSTRLEN]; // c_ip为客户端的ip
int sockfd, client_sockfd;
// 创建一个socket=>ipv4,流式协议, tcp
sockfd = Socket(AF_INET, SOCK_STREAM, 0);
// 绑定地址结构(ip + port)
struct sockaddr_in addr, c_addr; // 创建sockaddr属性
addr.sin_family = AF_INET; // 设置ipv4
addr.sin_port = htons(9999); // 设置端口号
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 设置一个可用的ip地址,转换为网络地址
socklen_t size= sizeof(addr), client_size;
Bind(sockfd, (struct sockaddr*)&addr, size);
// 设置监听上限
Listen(sockfd, 20);
// temp用于epoll_ctl, ep数组用于epoll_wait
struct epoll_event temp,ep[OPNE_MAX];
int epollfd, ret, i = 0, j = 0, n;
// 创建一个epoll实例,此处的OPEN_MAX可突破1024上限
epollfd = epoll_create(OPNE_MAX);
if(epollfd == -1)
sys_err("epoll_create error");
// 初始化监听fd的event
temp.events = EPOLLIN; // 设置监听读事件
temp.data.fd = sockfd; // 可与参3的fd不同
ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &temp); // 插入红黑树
if(ret == -1)
sys_err("epoll_ctl error");
while(1){
// 参三为参二数组的大小,最大容量而非实际使用的大小
// 返回值为传出数组的实际使用大小
ret = epoll_wait(epollfd, ep, OPNE_MAX, -1); // 阻塞监听
if(ret == -1)
sys_err("epoll_wait error");
// 遍历数组
for(i = 0; i < ret; i++){
if(!(ep[i].events & EPOLLIN)) // 如果监听不是读事件,继续循环
continue;
int res = 0; // 临时存储各个返回值
if(ep[i].data.fd == sockfd){ // 当处理监听fd时
client_sockfd = Accept(sockfd, (struct sockaddr*)&c_addr, &client_size);
// 添加结点进红黑树
temp.events = EPOLLIN; // 设置事件为读
temp.data.fd = client_sockfd;
res = epoll_ctl(epollfd, EPOLL_CTL_ADD, client_sockfd, &temp);
if(res == -1)
sys_err("epoll_ctl error");
}
else{ // 当是其他客户端通信fd触发事件时
int fd = ep[i].data.fd;
n = read(fd, buf, sizeof(buf));
if(n == 0){ // 当读到0字节,表示结束通信
// 从红黑树中移除结点
res = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
if(res == -1)
sys_err("epoll_ctl error");
close(fd);
}
else if(n < 0){ // 出错
perror("read error");
res = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
else{
for(j = 0; j < n; j++)
buf[j] = toupper(buf[j]);
Write(fd, buf, n);
Write(STDOUT_FILENO, buf, n);
}
}
}
}
close(sockfd);
return 0;
}
epoll_wait设置非阻塞的情况,epoll_wait 超时仍然会检查 fd 状态
即使没有新事件,epoll_wait 也会检查注册的 fd 是否满足触发条件(如可读/可写)。
(6)优缺点
3.4 突破1024文件描述符设置(poll和epoll通用)
总结:
注:此方法不能影响select,因为select受fd_set位图的影响,而内核规定该位图的大小为1024位,因此最大上线为1024
4. epoll进阶
4.1 事件模型----ET和LT
ET模式下,仅在 socket 状态变化时触发一次事件,内核不会主动唤醒阻塞的 read,因为 ET 模式下:
事件通知完全依赖 epoll_wait 返回。
内核认为“状态变化已通知,后续责任在用户态”。
LT模式下,只要 socket 缓冲区有数据,内核就会持续标记其为“可读”。内核会在数据到达后 直接唤醒 read(不依赖 epoll_wait)
图示:
电平变换触发为ET模式,电平持续触发为LT模式
实例:
总结:
ET模式是在socket状态变幻时,会触发。即同一个socket在wait期间有数据流入/流出也会触发
若在超时事件后,即使没有事件触发,内核也会检查监听的fd是否有可读内容
4.2 epoll的ET非阻塞模式
epoll 应设置为 ET+非阻塞+轮询的方式一次读完全部需要的数据
总结:
ET模式下设置非阻塞的必要性:依赖epoll_wait
LT模式会一直检测缓冲区内是否有数据,如果因为read阻塞,当对端发送数据后,内核会唤醒read,满足条件后read会解除阻塞,进而继续后续代码
若为ET模式,则当线程被read阻塞,而ET模式下read的唤醒依赖epoll_wait的触发,而由于阻塞在read,因此不会进行到后续代码
注:epoll_wait设置为非阻塞的情况,当到达超时时长,即使没有新事件,epoll_wait也会检查其监控的fd是否满足条件(可读/可写)。
4.3 epoll反应堆
(1)模型综述
反应堆最大的不同是,将结点设置为读后,调用完处理函数后,自动修改为写监听,在写完后又转换为读监听
(2)具体代码及注释
#include"wrap.h"
#include<fcntl.h>
#define MAX_EVENTS 1024 // 监听上限数
#define BUFLEN 4096
#define SERV_PORT 8080
void recvdata(int fd, int events, void *arg); // cfd的读回调函数声明
void senddata(int fd, int events, void *arg);
// 描述就绪文件描述符相关信息(使用void* ptr接收)
struct myevent_s{
int fd; // 监听的fd
int events; // 对应的监听事件
void* arg; // 泛型参数
void (*call_back)(int fd, int events, void* arg); // 函数指针(回调函数)
int status; // 是否在监听,1-》在树上
char buf[BUFLEN]; // 缓冲区
int len; // 缓冲区大小
long last_active; // 记录每次加入红黑树的时间值
}
int g_efd; // 全局变量,保存红黑树的fd
struct myevent_s g_events[MAX_EVENTS + 1]; // 自定义结构体数组,+1-》lfd
// 将myevent_s的成员变量初始化=>传入的ev指向的结构体初始化
void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void*), void* arg){
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0; // 后续也不使用0这个值,会重新赋值(在eventadd时,根据传入的读/写)
ev->arg = arg; // call_back的参数之一,根据代码调用可知,arg保存的当前结构体的地址
ev->status = 0;
// memset(ev->buf, 0, sizeof(ev->buf));
// ev->len = 0;
ev->last_active = time(NULL); // 调用eventset的当前时间
return;
}
// 向epoll的红黑树上添加结点,通过参二控制传入的监听读/写
void eventadd(int efd, int events, struct myevent_s *ev){
struct epoll_event tmp = {0, {0}}; // 初始化一个变量, epoll_ctl的参数
int op; // epoll_ctl的参数,判断结点是添加/修改/删除
tmp.data.ptr = ev; // 将ptr赋值为一个自定义结构体的指针
tmp.events = ev->events = events; // EPOLLIN / EPOLLOUT(对结构体的event赋值)
if(ev->status == 0){ // 传入的结点未上树的情况
op = EPOLL_CTL_ADD; // 操作为添加
ev->status = 1; // 设置该结点已上树
}
if(epoll_ctl(efd, op, ev->fd, &tmp) < 0)
printf("event add failed [fd =%d], events[%d]\n", ev->fd, events);
else
printf("event add ok [fd=%d],op=%d, events[%0X]\n", ev->fd, op, events);
return;
}
// 当有文件描述符就绪,epoll返回,调用该函数与客户端建立链接
// lfd的回调函数
void acceptconn(int lfd, int events, void* arg){
struct sockaddr_in cin; // 客户端的地址结构
socklen_t len = sizeof(cin);
int cfd, i;
if((cfd = accept(lfd, (struct sockaddr*)&cin, &len)) == -1){
if(errno != EAGAIN && errno != EINTR){
// 错误处理,暂时没写
}
printf("%s, accept, %s\n", __func__, strerror(errno)); // __func__表示当前函数名
return;
}
do{ // 单层循环,为了满足条件break
for(i = 0; i < MAX_EVENTS; i++) // 找到空闲的g_events的位置
if(g_events[i].status == 0)
break; // 找到空位,跳出当前循环,执行单层循环后续代码
if(i == MAX_EVENTS){
printf("%s:max connect limit[%d]\n", __func__, MAX_EVENTS);
break; // 没有空位,跳出单层循环
}
int flag = 0; // 接收返回值
if((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0){ // 将cfd设置为非阻塞
printf("%s:fcnt nonblocking failed, %s\n", __func__, strerror(errno));
break;
}
// 刚创建的结点,默认设置为读监听 回调函数默认设置为读回调
// 给cfd设置一个myevent_s结构体,并初始化,同时设置cfd对应的回调函数
eventset(&g_events[i], cfd, recvdata, &g_events[i]);
// 将cfd对应结点添加进红黑树
eventadd(g_efd, EPOLLIN, &g_events[i]); // 设置监听读事件
}while(0);
printf("new connect[%s:%d][time:%ld], pos[%d]\n"inet_ntoa(cin.sin addr),ntohs(cin.sin port),g events[i].last active, i);
return;
}
// 移除结点
void eventdel(int efd, struct myevent_s *ev){
struct epoll_event tmp = {0, {0}};
if(ev->status != 1) // 如果已经不在树上
return;
tmp.data.ptr = NULL;
ev->status = 0; // 改变状态
epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &tmp); // 移除结点
return;
}
// cfd的 读 回调函数
void recvdata(int fd, int events, void* arg){
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
len = recv(fd, ev->buf, sizeof(ev->buf), 0); // 与read类似
eventdel(g_efd, ev); // 移除结点
if(len > 0){
ev->len = len; // 设置结构体中缓冲区的实际大小
ev->buf[len] = '\0'; // 便于printf
printf("C[%d]:%s\n", fd, ev->buf);
eventset(ev, fd, senddata, ev); // 将结点转换为 写的回调函数
eventadd(g_efd, EPOLLOUT, ev); // 结点监听写事件,添加进红黑树
}
else if(len == 0){
close(ev->fd);
printf("[fd=%d] pos[%ld], closed\n", fd, ev-g_events);
}
else{
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(error));
}
return;
}
// cfd的 写 回调函数
void senddata(int fd, int events, void* arg){
struct myevent_s *ev = (struct myevent_s*)arg;
int len;
len = send(fd, ev->buf, ev->len, 0); // 与write类似
eventdel(g_efd, ev); // 从红黑树移除
if(len > 0){
printf("send[fd=%d],[%d]%s\n", fd, len, ev->buf);
eventset(ev, fd, recvdata, ev); // 再将结点设置为读回调
eventadd(g_efd, EPOLLIN, ev); // 将结点设置读监听,放回树上
}
else{
close(ev->fd);
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}
return ;
}
// 创建socket,初始化lfd
void initlistensocket(int efd, short port){
struct sockaddr_in sin; // 服务端地址结构
int lfd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(lfd, F_SETFL, O_NONBLOCK); // 设置lfd为非阻塞
memset(&sin, 0, sizeof(sin)); // 将这块内存清零
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
sin.sin_addr = INADDR_ANY;
bind(lfd, (struct sockaddr*)&sin, sizeof(sin));
listen(lfd, 20);
// void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void*), void* arg)
eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);
// 将lfd结点添加进红黑树
eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
}
int main(int argc, char* argv[]){
unsigned short port = SERV_PORT; // 设置端口
if(argc == 2) // 可以手动传入端口
port = atoi(argv[1]);
g_efd = epoll_create(MAX_EVENTS + 1);// 创建红黑树,赋值给全局变量
if(g_efd < 0)
fprintf(stderr, "epoll_create error:%s\n", strerror(error));
initlistensocket(g_efd, port); // 初始化监听socket
struct epoll_event events[MAX_EVENTS + 1]; // epoll_wait的传出参数
printf("server running:port[%d]\n", port);
int i, checkpos = 0;
while(1){
// 超时验证,每次循环检测100个链接,超过60s不活跃就移除
long now = time(NULL); // 获取当前时间
for(i = 0; i < 100 ; i++, checkpos++){
if(checkpos == MAX_EVENTS)
checkpos = 0; // 到达最大就归零
if(g_events[checkpos].status != 1) // 不在树上,跳过
continue;
long duration = now - g_events[checkpos].last_active;
if(duration > 60){
close(g_events[checkpos].fd); // 关闭对应的fd
printf("[fd=%d],timeout\n", g_events[checkpos].fd);
evendel(g_efd, &g_events[checkpos]); // 从红黑树移除结点
}
}
// 监听红黑树,将满足的事件的fd添加到events数组中,超时时间设置为1000ms=1s
int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);
if(nfd < 0){
printf("epoll_wait error,exit\n");
break;
}
for(i = 0; i < nfd; i++){
// 使用自定义结构体myevent_s,接收epoll_event[i].data.ptr参数
// ptr存储的指向某个myevent_s结构体的地址(在eventadd中设置的)
struct myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
if((events[i].events & EPOLLIN) && (ev->event & EPOLLIN)){ // 读就绪事件
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if((events[i].events & EPOLLOUT) && (ev->event & EPOLLOUT)){ // 写就绪事件
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
}
(3)流程图图示
4.4 ctags
centos使用sudo yum install ctags安装
5. 线程池
5.1 线程池模型原理分析
5.2 线程池模块分析
5.3 线程池实现代码及注释
#include"wrap.h"
// 各子线程的任务结构体
typedef struct{
void *(*function)(void *); // 函数指针---回调函数
void *arg; // 回调函数的参数
}threadpool_task_t;
// 描述线程池相关信息
struct threadpool_t{
pthread_mutex_t lock; // 用于锁住本结构体
pthread_mutex_t thread_counter; //记录忙线程个数的锁,busy_thr_num
pthread_cond_t queue_not_full; // 条件变量,当任务队列满时,添加任务的进程阻塞
pthread_cond_t queue_not_empty; // 当任务队列为空时,阻塞等待添加任务
pthread_t *threads; // 存放每个线程的tid,数组
pthread_t adjust_tid; // 存管理线程的tid
threadpool_task_t *task_queue; // 任务队列,数组
int min_thr_num; // 线程池最小线程数
int max_thr_num; // 线程池最大线程数
int live_thr_num; // 当前存活线程数
int busy_thr_num; // 忙状态的线程数
int wait_exit_thr_num; // 要销毁的线程数
int queue_front; // 任务队列队头指针
int queue_rear; // 任务队列队尾指针
int queue_size; // 任务队列中实际任务数
int queue_max_size; // 任务队列最大容量
int shutdown; // 线程池是否使用,1--不使用,0--使用
}
// 工作线程的回调函数
void* threadpool_thread(void* threadpool){
threadpool_t *pool = (threadpool_t*)threadpool; // 接收传入的线程池指针
threadpool_task_t task; // 创建一个任务
while(true){
// 刚创建线程时,等待任务队列中有任务,否则阻塞
pthread_mutex_lock(&(pool->lock)); // 给结构体加锁
// queue_size == 0 说明没有任务,调wait阻塞,等待条件满足;如果有任务,则跳过该while
while((pool->queue_size == 0) && (!pool->shutdown)){
printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
// 条件是队列不为空,锁为结构体中的锁
// 先持有锁,发现不满足条件释放锁,等待条件满足(其他线程通知),再拿锁
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
// 清楚指定数目的线程======》 管理者线程控制瘦身清理线程
if(pool->wait_exit_thr_num > 0){
pool->wait_exit_thr_num--;
// 如果线程池里线程的个数大于最小值时可以结束当前线程
if(pool->live_thr_num > pool->min_thr_num){
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}
// 销毁线程池的时候触发
// 如果线程池处于关闭状态,关闭线程池中所有线程,自行退出
if(pool->shutdown){
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pthread_detach(pthread_self());
pthread_exit(NULL);
}
// 从任务队列里获取任务,是一个出队操作(从队头取)
task.function = pool->task_queue[pool->queue_front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
pool->queue_size--;
// 通知可以有新的任务加进来(消耗了一个任务,满足队列非满的条件)
pthread_mutex_unlock(&(pool->lock)); // 先解锁,改
pthread_cond_broadcast(&(pool->queue_not_full));
// 执行任务
printf("thread 0x%x start working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num++;
pthread_mutex_unlock(&(pool->thread_counter));
// 等价于 task.function(task.arg)
(*(task.function))(task.arg);
// 任务处理结束
printf("thread 0x%x end working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--;
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
// 管理者线程的回调函数
void* adjust_thread(void* threadpool){
int i;
threadpool_t *pool = (threadpool_t*)threadpool; // 接收传入参数
while(!(pool->shutdown)){ // 当线程池为关闭时
// 定时对线程池管理
sleep(DEFAULT_TIME); // 默认为10s
pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size; // 关注任务数
int live_thr_num = pool->live_thr_num; // 关注存活线程数
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num; // 关注忙线程数
pthread_mutex_unlock(&(pool->thread_counter));
// 创建新线程,算法:任务数大于最小线程池个数 且存活的线程数少于最大线程数(改)
if(queue_size >= pool->min_thr_num && live_thr_num < pool->max_thr_num){
pthread_mutex_lock(&(pool->lock));
int add = 0;
// 一次增加DEFAULT_THREAD_VARY个线程
for(i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
&& pool->live_thr_num < pool->max_thr_num; i++){
// 当线程数组该位未使用或者使用过但被销毁了,将新线程放入该位置
if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool);
add++;
pool->live_thr_num++;
}
}
pthread_mutex_unlock(&(pool->lock));
}
// 销毁线程,算法:忙线程* 2 < 存活的线程数 且存活线程数 大于 最小线程数
if((busy_thr_num * 2) < live_thr_num && live_thr_num > min_thr_num){
// 一次销毁DEFAULT_THREAD_VARY个线程
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; // 设置要销毁的线程数为DEFAULT_THREAD_VARY
pthread_mutex_unlock(&(pool->lock));
if(i = 0 ; i < DEFAULT_THREAD_VARY; i++){
// 通知空闲状态的线程,他们会在处理的函数中终止
pthread_cond_signal(&(pool->queue_not_empty));
}
}
}
return NULL;
}
// 创建线程池,传入 线程池最小线程数,最大线程数,和任务队列的最大容量
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){
int i;
threadpool_t *pool = NULL; // 线程池结构体指针
do{
// 开辟线程池空间
if((pool = (threadpool_t*) malloc(sizeof(threadpool_t))) == NULL){
printf("malloc threadpool fail\n");
break;
}
// 初始化线程池
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num; // 存活的线程数,初值设置为最小线程数
pool->wait_exit_thr_num = 0;
pool->queue_size = 0; // 任务队列中的任务数初值为0
pool->queue_front = 0;
pool->rear = 0;
pool->queue_max_size = queue_max_size;
pool->shutdown = false; // 是否关闭,设置false-》打开
// 根据最大线程上限数,给工作县城数组开辟空间,并清零
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * max_thr_num);
if(pool->threads == NULL){
printf("malloc threads fail\n");
break;
}
memset(pool->threads, 0, sizeof(pthread_t) * max_thr_num);
// 给任务队列开辟空间
pool->task_queue = (threadpool_task_t*)malloc(sizeof(threadpool_task_t) * queue_max_size);
if(pool->task_queue == NULL){
printf("malloc threadpool_task_t fail\n");
break;
}
// 初始化互斥锁、条件变量=>成功返回0
if(pthread_mutex_init(&(pool->lock), NULL) != 0
|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0
|| pthread_cond_init((&pool->queue_not_empty), NULL) != 0
|| pthread_cond_init((&pool->queue_not_full), NULL) != 0){
printf("init the lock or cond fail\n");
break;
}
// 创建最小数的线程,并启动这些线程
for(i = 0; i < min_thr_num; i++){
// threadpool_thread是回调函数,将pool的地址作为参数传给回调函数
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool);
printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]); // 打印线程id
}
// 创建管理者线程
pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void*)pool);
return pool;
}while(0);
threadpool_free(pool); // 释放已开辟的空间
return NULL;
}
// 向线程池中添加一个任务
int threadpool_add(threadpool_t *pool, void*(*function)(void* arg), void* arg){
pthread_mutex_lock(&(pool->lock));
// 当队列满时,条件变量 队列非满开始阻塞添加任务
while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)){
// 当有任务被消耗时,会发出信号解除阻塞
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
if(pool->shutdown){ // 当线程池未使用 ?
pthread_mutex_unlock(&(pool->lock));
pthread_cond_broadcast(&(pool->queue_not_empty));
return 0;
}
// 清空 工作线程的调用的回调函数的参数
if(pool->task_queue[pool->queue_rear].arg != NULL){
pool->task_queue[pool->queue_rear].arg = NULL;
}
// 添加到任务队列,向队尾添加
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;
pool->queue_size++;
// 添加完任务后,队列不为空,唤醒线程中,等待队列非空的线程
// 先释放锁,再通知wait的线程
pthread_mutex_unlock(&(pool->lock));
pthread_cond_signal(&(pool->queue_not_empty));
return 0;
}
// 销毁线程池
int threadpool_destroy(threadpool_t *pool){
int i;
if(pool == NULL){
return -1;
}
pool->shutdown = true; // 将线程池设置为 不使用
// 先销毁管理线程
pthread_join(pool->adjust_tid, NULL);
for(i = 0; i < pool->live_thr_num; i++){
// 通知所有空闲线程, 会在工作线程的回调函数中自动返回
pthread_cond_broadcast(&(pool->queue_not_empty));
}
for(i = 0; i < pool->live_thr_num; i++){
pthread_join(pool->threads[i], NULL);
}
threadpool_free(pool); // 释放线程池空间
return 0;
}
// 释放线程池空间
int threadpool_free(threadpool_t *pool){
if(pool == NULL){
return -1;
}
if(pool->task_queue){
free(pool->task_queue);
}
if(pool->threads){
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return 0;
}
int main(void){
// 创建一个线程池,最小线程为3,最大为100,任务队列最大100
threadpool_t *thp = threadpool_create(3, 100, 100);
printf("pool inited\n");
// 模拟创建任务
int num[20], i;
for(i = 0; i < 20; i++){
num[i] = i;
printf("add task %d\n", i);
// 模拟向线程池中添加任务
threadpool_add(thp, process, (void*)&num[i]);
}
sleep(10);
threadpool_destroy(thp); // 销毁线程池
return 0;
}
5.4 执行的流程图
6. C/S模型----UDP
6.1 TCP和UDP通信的优缺点
udp不借助select等即可实现并发,因为不需要连接
6.2 UDP实现C/S模型
6.3 recvfrom和sendto
(1)recvfrom函数
总结:
(2)sendto函数
总结:
6.4 代码实现
服务端 server.c
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
#define SERV_PORT 9557
int main(){
int sfd, ret, i;
char buf[1024], c_ip[1024];
struct sockaddr_in srv_addr, c_addr;
socklen_t client_addr_len;
// 创建一个套接字
sfd = socket(AF_INET, SOCK_DGRAM, 0); // 报式协议---udp
// 绑定地址结构
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(SERV_PORT);
srv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// 写法2 inet_pton(AF_INET, "127.0.0.1", &srv_addr.sin_addr);
bind(sfd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
// 设置监听上限
listen(sfd, 128);
client_addr_len = sizeof(c_addr);
while(1){
// 接收数据
ret = recvfrom(sfd, buf, sizeof(buf), 0, (struct sockaddr*)&c_addr, &client_addr_len);
// 打印对端ip和端口
inet_ntop(AF_INET, &c_addr.sin_addr, c_ip, sizeof(c_ip));
printf("client's IP :%s port:%d\n", c_ip, ntohs(c_addr.sin_port));
// 小写转大写
for(i = 0; i < ret; i++)
buf[i] = toupper(buf[i]);
// 写回对端和标准输出
sendto(sfd, buf, ret, 0, (struct sockaddr*)&c_addr, client_addr_len);
}
close(sfd);
return 0;
}
客户端 client.c
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<ctype.h>
#define SERV_PORT 9557
#define SERV_IP "127.0.0.1"
int main(){
int cfd, ret;
struct sockaddr_in srv_addr;
char buf[1024];
cfd = socket(AF_INET, SOCK_DGRAM, 0);
memset(&srv_addr, 0, sizeof(srv_addr)); // 将地址清零
srv_addr.sin_family = AF_INET;
srv_addr.sin_port = htons(SERV_PORT);
inet_pton(AF_INET, SERV_IP, &srv_addr.sin_addr);
while(1){
fgets(buf, sizeof(buf), stdin);
sendto(cfd, buf, strlen(buf), 0, (struct sockaddr*) &srv_addr, sizeof(srv_addr));
ret = recvfrom(cfd, buf, sizeof(buf), 0, NULL, 0);
write(STDOUT_FILENO, buf, ret);
}
close(cfd);
return 0;
}
运行结果;
服务端可以打印所有传输数据的客户端的ip和端口(实现并发)
客户端打印输入字符串的大写
7. 本地套接字domain(socket IPC)
实现本地的进程间通信
7.1 本地套接字和网络套接字对比
(1)地址结构对比
分别通过man 7 ip 和 man 7 unix 查看
总结:
unlink的作用是类似于删除一个文件,保证bind()创建的套接字未存在
bind的参三是实际的地址结构使用大小,并非开辟空间的大小
7.2 本地套接字的结构体
该结构体包含在 #include<sys/un.h> 中
7.3 服务端实现
#include"wrap.h"
#include<stddef.h>
#include<ctype.h>
#include<sys/un.h>
#define SERV_ADDR "srv.socket" // 设置套接字的文件名
int main(){
int lfd, cfd, len, size, i;
char buf[1024];
struct sockaddr_un srv_addr, clt_addr;
lfd = Socket(AF_UNIX, SOCK_STREAM, 0); // 参2两种都行
memset(&srv_addr, 0, sizeof(srv_addr)); // 将内存清零
// 初始化服务端的地址结构
srv_addr.sun_family = AF_UNIX;
strcpy(srv_addr.sun_path, SERV_ADDR);
// offsetof用于查看成员在结构体的偏移字节
// len计算该块空间内实际使用字节数
len = offsetof(struct sockaddr_un, sun_path) + strlen(srv_addr.sun_path);
unlink(SERV_ADDR); // 将该文件关闭,防止在创建socket已有同名文件
Bind(lfd, (struct sockaddr*)&srv_addr, len); // 参3不为sizeof(srv_addr)
Listen(lfd, 20);
printf("Accept ...\n");
while(1){
len = sizeof(clt_addr); // len保存客户端的地址结构大小
// clt_addr传出参数,保存对端的地址结构
cfd = Accept(lfd, (struct sockaddr*)&clt_addr, &len);
len -= offsetof(struct sockaddr_un, sun_path); // 保存对端文件名长度
clt_addr.sun_path[len] = '\0';
printf("client bind filename:%s\n", clt_addr.sun_path);
while((size = Read(cfd, buf, sizeof(buf))) > 0){
for(i = 0; i < size; i++)
buf[i] = toupper(buf[i]);
Write(cfd, buf, size);
}
close(cfd);
}
close(lfd);
return 0;
}
7.4 客户端实现
#include"wrap.h"
#include<stddef.h>
#include<ctype.h>
#include<sys/un.h>
#define SERV_ADDR "srv.socket"
#define CLT_ADDR "clt.socket"
int main(){
int fd, len, size, i;
char buf[1024];
struct sockaddr_un s_addr, c_addr;
fd = Socket(AF_UNIX, SOCK_STREAM, 0);
memset(&c_addr, 0, sizeof(c_addr));
// 初始化clt_addr
c_addr.sun_family = AF_UNIX;
strcpy(c_addr.sun_path, CLT_ADDR);
len = offsetof(struct sockaddr_un, sun_path) + strlen(c_addr.sun_path);
unlink(CLT_ADDR);
Bind(fd, (struct sockaddr*)&c_addr, len);
// 设置srv_addr
memset(&s_addr, 0, sizeof(s_addr));
s_addr.sun_family = AF_UNIX;
strcpy(s_addr.sun_path, SERV_ADDR);
len = offsetof(struct sockaddr_un, sun_path) + strlen(s_addr.sun_path);
// 连接服务器
Connect(fd, (struct sockaddr*)&s_addr, len);
printf("connect sucessful\n");
while(fgets(buf, sizeof(buf), stdin) != NULL){
len = strlen(buf);
Write(fd, buf, len);
len = Read(fd, buf , sizeof(buf));
Write(STDOUT_FILENO, buf, len);
}
close(fd);
return 0;
}
fgets读会在读到的内容后加 \0,而read不会
运行结果:
7.5 总结
四、 libevent库
1. libevent官方
libevent 是一个基于 epoll(及其他类似机制)构建的上层跨平台网络库
1.1 libevent库的下载及安装
(1)下载libevent
官方链接
下载安装包后,解压该安装包
解压完成后,会生成同名文件夹
(2)安装
(3)测试—利用sample目录下的hello_world.c
需链接动态库再编译,使用方法为-l + 动态库名(去掉前缀lib 和后缀.so-----例如libevent.so=>event)
运行结果:
报错原因:需要指定动态库所在目录
解决方法1:临时解决,设置环境变量
:为分隔符,表示库文件既可以在/usr/local/lib下寻找,也可以在LD_LIBRARY_PATH下寻找
解决方法2:永久生效(用户级)
结果:
另一终端:
2. libevent框架
2.1 创建event_base
在这一步,libevent 会自动检测当前操作系统所能支持的最高效的 I/O 多路复用机制。如果在 Linux 上,它毫无疑问会优先选择 epoll。
2.2 创建、添加事件
添加事件=》在内部,libevent 会通过 epoll_ctl 系统调用,将这个 sockfd 和其关注的事件(EV_READ)添加到 epoll 内核事件表中。
2.3 启动循环
这是一个无限循环,在内部它核心的工作就是:
- 调用 epoll_wait 系统调用,阻塞等待这些文件描述符上是否有事件发生。
- 当 epoll_wait 返回后,意味着有一个或多个事件就绪(例如,某个 socket 有数据可读了)。
- libevent 会根据 epoll_wait 返回的结果,将就绪的事件转换为 active state,然后依次触发调用你之前为每个事件设置好的回调函数。
2.4 释放event_base
2.5 其他相关函数(了解)
第一个返回指针的指针,可以赋值为字符数组指针
实例:
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<event2/event.h>
int main(){
int i = 0;
const char **buf1, *buf2;
struct event_base *base = event_base_new();
// 获取支持哪些多路IO
buf1 = event_get_supported_methods();
while(1){
if(buf1[i] == NULL)
break;
printf("%s\n", buf1[i++]);
}
// 查看当前用的多路IO
buf2 = event_base_get_method(base);
printf("当前使用:%s\n", buf2);
return 0;
}
运行结果:
注:在编译时,需添加 -levent
2.6 总结
3. 常规事件
3.1 创建事件对象
总结:
3.2 事件event操作
(1)添加事件到base上
(2)释放事件
(3)总结
3.3. 实例–使用命名管道通过libevent传输数据
读端:
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<event2/event.h>
void sys_err(const char* str){
perror(str);
exit(1);
}
// 读事件的回调函数
void read_cb(evutil_socket_t fd, short what, void* arg){
char buf[1024];
int len = read(fd, buf, sizeof(buf));
buf[len] = '\0';
printf("what = %s:, read from write:%s\n"
, what & EV_READ ? "YES":"NO", buf);
sleep(1);
return;
}
int main(){
unlink("testfifo");
mkfifo("testfifo", 0664); // 创建命名管道
int fd = open("testfifo", O_RDONLY | O_NONBLOCK);
if(fd == -1)
sys_err("open error");
// 创建event_base
struct event_base *base = event_base_new();
// 创建事件对象
struct event *ev = event_new(base, fd, EV_READ | EV_PERSIST, read_cb, NULL);
// 将事件增加到base上
event_add(ev, NULL);
// 启动循环监听(事件需设置为 EV|PERSIST)
event_base_dispatch(base);
// 释放base
event_base_free(base);
return 0;
}
写端:
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<string.h>
#include<event2/event.h>
void sys_err(const char* str){
perror(str);
exit(1);
}
// 读事件的回调函数
void write_cb(evutil_socket_t fd, short what, void* arg){
char buf[] = "hello libevent";
write(fd, buf, strlen(buf) + 1);
sleep(1);
return;
}
int main(){
int fd = open("testfifo", O_WRONLY | O_NONBLOCK);
if(fd == -1)
sys_err("open error");
// 创建event_base
struct event_base *base = event_base_new();
// 创建事件对象
struct event *ev = event_new(base, fd, EV_WRITE | EV_PERSIST, write_cb, NULL);
// 将事件增加到base上
event_add(ev, NULL);
// 启动循环监听(事件需设置为 EV|PERSIST)
event_base_dispatch(base);
// 释放base
event_base_free(base);
return 0;
}
运行结果:
3.4 未决和非未决
总结:
4. 带缓冲区的事件—bufferevent
4.1 bufferevent
使用bufferevent不需要手动将事件添加到base上
图示:
4.2 事件的创建、释放
总结:
自动注册:调用 bufferevent_socket_new() 时,bufferevent 会自动与 event_base 关联
隐式管理:bufferevent_enable() 和 bufferevent_disable() 会自动处理事件的添加和移除
无需手动操作:不需要调用 event_add() 或类似函数来管理这些事件
4.3 给读写缓冲区设置回调
readcb对应的读回调函数:
writecb对应的回调函数:
eventcb对应的回调函数:
总结:
4.4 缓冲区的关闭和开启
默认开启写缓冲,关闭读缓冲
总结:
4.5 客户端的连接和监听
(1)实现流程
(2)总结
(3)具体代码
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<ctype.h>
#include<string.h>
#include<event2/bufferevent.h>
#include<event2/listener.h>
#include<arpa/inet.h>
#include<sys/socket.h>
// 读回调
void read_cb(struct bufferevent* bev, void* ctx){
char buf[1024] = {0};
bufferevent_read(bev, buf, sizeof(buf));
printf("server say:%s\n", buf);
char *p = "I am client and receive your message sucessfully!";
// 写数据给客户端
bufferevent_write(bev, p, strlen(p) + 1);
sleep(1);
}
// 写回调
void write_cb(struct bufferevent* bev, void* ctx){
printf("I am client and write message sucessfully\n");
}
// 事件回调
void event_cb(struct bufferevent* bev, short events, void* ctx){
if(events & BEV_EVENT_EOF){ // 遇到文件结束指示
printf("connection closed\n");
}
else if(events & BEV_EVENT_ERROR){ // 操作时发生错误
printf("some other error\n");
}else if(events & BEV_EVENT_CONNECTED){ // 当连接成功时,写数据
printf("connect sucessfully\n");
// 连接成功后发送初始消息
char *greeting = "Hello server! I am client.";
bufferevent_write(bev, greeting, strlen(greeting) + 1);
}
if(events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)){
// 释放资源
bufferevent_free(bev);
printf("buffrevent free\n");
}
}
int main(){
// 创建事件基
struct event_base *base = event_base_new();
// 连接服务端
// 1.设置服务端地址结构
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr)); // 清空该块空间
addr.sin_family = AF_INET;
addr.sin_port = htons(9558);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
int fd = socket(AF_INET, SOCK_STREAM, 0); // 创建socket
// 2. 创建事件,并添加到base
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
// 3. 连接服务端
bufferevent_socket_connect(bev, (struct sockaddr*)&addr, sizeof(addr));
// 设置回调
bufferevent_setcb(bev, read_cb, write_cb, event_cb, NULL);
// 启动缓冲区
bufferevent_enable(bev, EV_READ);
// 循环监听
event_base_dispatch(base);
// 释放资源
event_base_free(base);
return 0;
}
注:客户端需自己创建socket,而服务端创建socket已经封装在创建监听器中了。
运行结果:
4.6 服务端的连接和监听
(1)创建监听器
创建监听者对象可以完成socket,bind,listen,accept四个函数的任务
(2)回调函数类型:
(3)实现流程
(4)释放监听服务器
(5)具体代码
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<ctype.h>
#include<string.h>
#include<event2/bufferevent.h>
#include<event2/listener.h>
#include<arpa/inet.h>
// 读回调
void read_cb(struct bufferevent* bev, void* ctx){
char buf[1024] = {0};
bufferevent_read(bev, buf, sizeof(buf));
printf("client say:%s\n", buf);
char *p = "I am server and receive your message sucessfully!";
// 写数据给客户端
bufferevent_write(bev, p, strlen(p) + 1);
sleep(1);
}
// 写回调
void write_cb(struct bufferevent* bev, void* ctx){
printf("I am server and write message sucessfully\n");
}
// 事件回调
void event_cb(struct bufferevent* bev, short events, void* ctx){
if(events & BEV_EVENT_EOF){ // 遇到文件结束指示
printf("connection closed\n");
}
else if(events & BEV_EVENT_ERROR){ // 操作时发生错误
printf("some other error\n");
}
// 释放资源
bufferevent_free(bev);
printf("buffrevent free\n");
}
// 当有新客户端连接时,会调用该函数(会将accept得到lfd传给fd)
void cb_listener(
struct evconnlistener* listener,evutil_socket_t fd,
struct sockaddr* addr,int len,void* ptr){
printf("connect new client\n");
// 接收传入的base
struct event_base *base = (struct event_base*)ptr;
// 创建新的事件,并添加
struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
// 设置事件的缓冲区回调
bufferevent_setcb(bev, read_cb, write_cb, event_cb, NULL);
// 打开缓冲区(默认读缓冲关闭,需启动)
bufferevent_enable(bev, EV_READ);
}
int main(){
struct sockaddr_in addr; // 监听器地址结构
// 创建event_base
struct event_base *base = event_base_new();
// 创建监听器
// 设置监听器地址结构
addr.sin_family = AF_INET;
addr.sin_port = htons(9558);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
// 参三传base的原因:回调函数中会使用到base,参三会直接作为回调函数的参数
struct evconnlistener *listener = evconnlistener_new_bind(
base, cb_listener, base,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
36, (struct sockaddr*)&addr, sizeof(addr)
);
// 启动循环
event_base_dispatch(base);
// 释放监听器
evconnlistener_free(listener);
// 释放事件基
event_base_free(base);
return 0;
}
运行结果:
(6)不使用read读数据的原因
int* myptr = (int*)malloc(4);
*myptr = fd;
// 设置事件的缓冲区回调
// bufferevent_setcb(bev, read_cb, write_cb, event_cb, NULL);
bufferevent_setcb(bev, read_cb, write_cb, event_cb, (void*)myptr);
这么写确实可以使用socket,但是会破坏libevent原有的逻辑,而且无法读到数据
五、 简单Web服务器
1. HTML
自学网站:
w3school
1.1 列表标签
例:
1.2 图片标签
例:
1.3 超链接标签
例:
还可以超链接标签中写图片标签,实现点击图片跳转
例:
1.4 表格标签
例:
1.5 文本和标题标签
例:
2. HTTP协议基础
2.1 请求消息(Request)
例:
2.2 响应消息(Response)
例:
2.3 http请求方法
2.4 http常用状态码
2.5 常见网络文件类型
3. 服务器的实现
3.1 单文件通信流程
3.2 正则表达式获取文件名
else if(ret > 0){ // 获取第一行的内容,并用正则表达式拆分为三部分
char method[16], path[256], protocol[16];
sscanf(buf, "%[^ ] %[^ ] %[^ ]", method, path, protocol); // % 是c语言标准一个检查集的开始
printf("method = %s, path =%s, protocol=%s", method, path, protocol);
}
3.3 判断文件是否存在
// get请求处理函数,判断文件是否存在并回发
void http_request(const char* filename){
struct stat sbuf;
int ret = stat(filename, &sbuf); // 判断文件是否存在
if(ret < 0){
// 回发错误页面404
perror("stat error");
exit(1);
}
// 判断文件类型
if(S_ISREG(sbuf.st_mode)){ // 普通文件
printf("---------It is a file\n");
// 回发 http协议应答
// 回发 给客户端请求数据内容
}
}
3.4 写出http应答协议头 和 发送请求的文件
// http响应协议头封装
// cfd, 错误号, 错误描述, 类型, 文件长度
void send_response(int cfd, int no, char* description, char* type, int len){
char buf[1024];
sprintf(buf, "HTTP/1.1 %d %s\r\n", no, description);
send(cfd, buf, strlen(buf), 0);
sprintf(buf, "%s\r\n", type);
send(cfd, buf, strlen(buf), 0);
sprintf(buf, "Content-Length:%d\r\n", len);
send(cfd, buf, strlen(buf), 0);
send(cfd, "\r\n", 2, 0);
}
// 发送请求的文件
void send_file(int cfd, const char* filename){
int fd = open(filename, O_RDONLY);
if(fd == -1){
// 发送错误页面404
perror("open error");
exit(1);
}
int n = 0;
char buf[1024] = {0};
while((n = read(fd, buf , sizeof(buf))) > 0){
send(cfd, buf, n, 0);
}
close(fd);
return;
}
char* get_file_type(const char* filename){
char* dot;
// 自右向左查找'.',dot保存 ". + 右边的字符串",无点返回NULL
dot = strrchr(filename, '.');
if(dot == NULL)
return "Content-Type:text/plain;charset=iso-8859-1";
if(strcmp(dot, ".png") == 0)
return "Content-Type:image/png";
if((strcmp(dot, ".jpg") == 0) || strcmp(dot, ".jepg") == 0)
return "Content-Type:image/jepg";
if(strcmp(dot, ".gif") == 0)
return "Content-Type:image/gif";
if(strcmp(dot, ".mp3") == 0)
return "Content-Type:audio/mpeg";
return "Content-Type:text/plain;charset=utf-8";
}
// get请求处理函数,判断文件是否存在并回发
void http_request(int cfd, const char* filename){
struct stat sbuf;
int ret = stat(filename, &sbuf); // 判断文件是否存在
if(ret < 0){
// 回发错误页面404
perror("stat error");
exit(1);
}
// 判断文件类型
if(S_ISREG(sbuf.st_mode)){ // 普通文件
printf("---------It is a file\n");
// 回发 http协议应答(不同的文件类型,需修改参4)
// send_response(cfd, 200, "OK", "Content-Type:text/plain;charset=iso-8859-1", sbuf.st_size);
// send_response(cfd, 200, "OK", "Content-Type:image/png", sbuf.st_size);
char* type = get_file_type(filename);
send_response(cfd, 200, "OK", type, -1);
// 回发 给客户端请求数据内容
send_file(cfd, filename);
}
}
3.5 错误页面404
3.6 处理目录文件
3.7 汉字字符的编码和解码
3.8 telnet调试
3.9 容错处理
3.10 具体代码
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<arpa/inet.h>
#define MAXSIZE 1024
// 读取\r\n一行的数据,保存数据结尾只保存/n
int get_line(int cfd, char* buf, int size){
int i = 0;
char c = '\0';
int n;
while((i < size - 1) && (c != '\n')){
n = recv(cfd, &c, 1, 0); // 相当于read
if(n > 0){
if(c == '\r'){
n = recv(cfd, &c, 1, MSG_PEEK); // 模拟读一次,缓冲区的数据不会消失
if((n > 0) && (c == '\n'))
n = recv(cfd, &c, 1, 0); // 实际读出数据
else
c = '\r'; // 若不满足,将c改回\r
}
buf[i++] = c;
}
else{
c = '\n';
}
}
buf[i] = '\0';
if(-1 == n)
i = n;
return i;
}
// 创建并初始化lfd
int init_listen_fd(int port, int efd){
int lfd, ret;
struct sockaddr_in s_addr;
// 创建lfd, 并绑定地址结构
lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd < 0){
perror("socket error");
exit(1);
}
s_addr.sin_family = AF_INET;
s_addr.sin_port = htons(port);
s_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// 端口复用
int opt = 1;
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
ret = bind(lfd, (struct sockaddr*)&s_addr, sizeof(s_addr));
if(ret < 0){
perror("bind error");
exit(1);
}
listen(lfd, 20);
// 将lfd的结点添加上树
struct epoll_event tmp;
tmp.events = EPOLLIN; // 监听读事件
tmp.data.fd = lfd; // fd为lfd
ret = epoll_ctl(efd, EPOLL_CTL_ADD, lfd, &tmp); // 将lfd结点上树
if(ret < 0){
perror("epoll_ctl error");
exit(1);
}
return lfd;
}
// lfd触发的事件
void do_accept(int lfd, int efd){
struct sockaddr_in c_addr;
socklen_t c_size;
// 创建新的套接字cfd
int cfd = accept(lfd, (struct sockaddr*)&c_addr, &c_size);
if(cfd < 0){
perror("accept error");
exit(1);
}
// 将cfd设置为非阻塞
int flags = fcntl(cfd, F_GETFL);
flags |= O_NONBLOCK;
fcntl(cfd, F_SETFL,flags);
// 创建新的结点
struct epoll_event tmp;
tmp.events = EPOLLIN | EPOLLET; // 设置为边沿触发
tmp.data.fd = cfd;
// 添加到树上
int ret = epoll_ctl(efd, EPOLL_CTL_ADD, cfd, &tmp);
if(ret == -1){
perror("epoll_ctl error");
exit(1);
}
}
// 断开连接的函数
void disconnect(int cfd, int efd){
int ret = epoll_ctl(efd, EPOLL_CTL_DEL, cfd, NULL);
if(ret < 0){
perror("epoll_ctl error");
exit(1);
}
close(cfd);
return;
}
// http响应协议头封装
// cfd, 错误号, 错误描述, 类型, 文件长度
void send_response(int cfd, int no, char* description, char* type, int len){
char buf[1024];
sprintf(buf, "HTTP/1.1 %d %s\r\n", no, description);
send(cfd, buf, strlen(buf), 0);
sprintf(buf, "%s\r\n", type);
send(cfd, buf, strlen(buf), 0);
sprintf(buf, "Content-Length:%d\r\n", len);
send(cfd, buf, strlen(buf), 0);
send(cfd, "\r\n", 2, 0);
}
// 发送请求的文件
void send_file(int cfd, const char* filename){
int fd = open(filename, O_RDONLY);
if(fd == -1){
// 发送错误页面404
perror("open error");
exit(1);
}
int n = 0;
char buf[1024] = {0};
while((n = read(fd, buf , sizeof(buf))) > 0){
send(cfd, buf, n, 0);
}
close(fd);
return;
}
char* get_file_type(const char* filename){
char* dot;
// 自右向左查找'.',dot保存 ". + 右边的字符串",无点返回NULL
dot = strrchr(filename, '.');
if(dot == NULL)
return "Content-Type:text/plain;charset=iso-8859-1";
if(strcmp(dot, ".png") == 0)
return "Content-Type:image/png";
if((strcmp(dot, ".jpg") == 0) || strcmp(dot, ".jepg") == 0)
return "Content-Type:image/jepg";
if(strcmp(dot, ".gif") == 0)
return "Content-Type:image/gif";
if(strcmp(dot, ".mp3") == 0)
return "Content-Type:audio/mpeg";
return "Content-Type:text/plain;charset=utf-8";
}
// get请求处理函数,判断文件是否存在并回发
void http_request(int cfd, const char* filename){
struct stat sbuf;
int ret = stat(filename, &sbuf); // 判断文件是否存在
if(ret < 0){
// 回发错误页面404
perror("stat error");
exit(1);
}
// 判断文件类型
if(S_ISREG(sbuf.st_mode)){ // 普通文件
printf("---------It is a file\n");
// 回发 http协议应答(不同的文件类型,需修改参4)
// send_response(cfd, 200, "OK", "Content-Type:text/plain;charset=iso-8859-1", sbuf.st_size);
// send_response(cfd, 200, "OK", "Content-Type:image/png", sbuf.st_size);
send_response(cfd, 200, "OK", get_file_type(filename), -1);
// 回发 给客户端请求数据内容
send_file(cfd, filename);
}
else if(S_ISDIR(sbuf.st_mode)){ // 目录文件
printf("---------It is a dir\n");
// 发协议头
// 发具体数据
}
}
// cfd触发的事件(读HTTP请求报文的第一行)
void do_read(int cfd, int efd){
char buf[1024] = {0};
int ret = get_line(cfd, buf, sizeof(buf));
if(ret == 0){
printf("对端关闭,断开链接\n");
disconnect(cfd, efd);
}
else if(ret > 0){ // 获取第一行的内容,并用正则表达式拆分为三部分
char method[16], path[256], protocol[16];
sscanf(buf, "%[^ ] %[^ ] %[^ ]", method, path, protocol); // % 是c语言标准一个检查集的开始
printf("method = %s, path =%s, protocol=%s", method, path, protocol);
// 读取剩余数据
char buf2[1024];
while(1){
ret = get_line(cfd, buf2, sizeof(buf2));
if(ret == 1 || ret == -1){ // 当接收到最后一个空行或出错时退出
break;
}
printf("%s\n", buf2);
sleep(1);
}
if(strncasecmp(method, "GET", 3) == 0){ // 判断是否为get请求
char* filename = path + 1; // 取文件名,"/hello.c"去掉 /
// 处理开始传空的情况
if(strcmp(path, "/") == 0)
filename = "./";
http_request(cfd, filename);
}
}
}
// 启动epoll
void epoll_run(int port){
struct epoll_event all_events[MAXSIZE]; // wait的传出参数
int ret, i;
// 创建监听树
int efd = epoll_create(MAXSIZE);
if(efd == -1){
perror("epoll_create error");
exit(1);
}
// 创建lfd,并添加到树上
int lfd = init_listen_fd(port ,efd);
// 循环监听红黑树
while(1){
// 阻塞监听对应结点的事件, 返回监听到的事件数
ret = epoll_wait(efd, all_events, MAXSIZE, -1);
if(ret == -1){
perror("epoll_wait error");
exit(1);
}
for(i = 0; i < ret; i++){
// 设置只处理读事件
struct epoll_event* pev = &all_events[i];
// 不是读事件就跳过
if(!(pev->events & EPOLLIN))
continue;
if(pev->data.fd == lfd){ // 当事件为lfd触发时
do_accept(lfd, efd);
}
else{ // 当为cfd触发时,读事件
do_read(pev->data.fd, efd);
}
}
}
}
int main(int argc, char* argv[]){
if(argc < 3){ // 当启动服务器参数不够,直接退出
printf("./server port path\n");
return 0;
}
// 获取用户输入的端口
int port = atoi(argv[1]);
// 改变当前工作目录=》传入的参3
int ret = chdir(argv[2]);
if(ret != 0){
perror("chdir error");
exit(1);
}
// 启动epoll
epoll_run(port);
return 0;
}
4. 正则表达式
4.1 基本语法
字符类:
数量限定符: