在高并发网络编程中,传统的 阻塞 I/O + 多线程模型 已经难以支撑百万级连接。为了解决这一问题,Linux 提供了 epoll 等高效 I/O 复用机制,结合 Reactor 模式,我们可以构建一个性能优越的 WebSocket/HTTP Server。
一、什么是 Reactor 模式?
Reactor 模式 是一种典型的事件驱动设计模式,核心思想是:
Reactor:事件分发器,负责监听和分发事件(如
accept
、read
、write
)。Handler:事件处理器,根据事件类型执行相应的业务逻辑。
Demultiplexer:事件多路复用器(如
epoll
、select
、poll
),负责监控多个连接的状态。
一句话概括:
Reactor 模式就是用一个线程/少量线程,监听所有 socket 的事件,并分发给对应的处理函数。
二、代码结构
在这份实现中,我们将代码分为三个核心文件:
Reactor.c
负责epoll
事件循环,监听和分发事件。Webserver.c
负责 HTTP/WebSocket 协议解析和响应。server.h
定义连接对象struct conn
以及回调函数指针。
1. Reactor 主循环(Reactor.c)
while(1){ // mainloop
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);
for(int i = 0; i < nready; i++){
int connfd = events[i].data.fd;
if(events[i].events & EPOLLIN){
conn_list[connfd].r_action.recv_callback(connfd);
}
if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
}
}
这里就是典型的 Reactor 模式:
epoll_wait
监听所有事件。收到 可读事件(
EPOLLIN
)时,调用recv_callback
。收到 可写事件(
EPOLLOUT
)时,调用send_callback
。
2. 事件回调(accept / recv / send)
accept_cb:处理客户端连接建立。
recv_cb:读取请求(支持 HTTP / WebSocket)。
send_cb:发送响应(HTTP 响应 或 WebSocket 帧)。
例如 recv_cb
:
int recv_cb(int fd){
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if(count <= 0){
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
conn_list[fd].rlength = count;
// 分发给具体协议层(HTTP / WebSocket)
ws_request(&conn_list[fd]);
set_event(fd, EPOLLOUT, 0); // 切换到可写
return count;
}
这里可以看到,Reactor 层并不关心业务逻辑,它只负责分发事件。
3. WebServer 协议层(Webserver.c)
http_response
示例:
c->wlength = sprintf(c->wbuffer,
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Content-Length: 82\r\n\r\n"
"<html><head><title>Charon.html</title></head><body><h1>Charon</h1></body></html>\r\n");
如果是 HTTP 请求,直接返回 HTML 或静态文件。
如果是 WebSocket 请求,则交给
ws_request/ws_response
处理握手和数据帧。
三、Reactor 模式下的 WebSocket/HTTP
1. HTTP 请求流程
客户端发起 TCP 连接 →
accept_cb
客户端发送 HTTP 报文 →
recv_cb
→http_request
服务器生成 HTTP 响应 →
http_response
→send_cb
2. WebSocket 请求流程
客户端通过 HTTP 请求发起 WebSocket 握手。
服务器返回
Sec-WebSocket-Accept
完成升级。后续数据传输通过 WebSocket 帧,走
ws_request/ws_response
。
这样,整个 Reactor 框架既能处理 普通 HTTP,又能处理 WebSocket 实时通信。
四、Reactor 模式业务分层的优点
I/O 多路复用层(Reactor.c)
负责
epoll_wait
事件分发不关心业务逻辑
协议层(Webserver.c / Websocket.c)
HTTP / WebSocket 解析
封装成
request/response
接口
业务逻辑层
上层业务只需要实现自己的逻辑,不用关心
epoll
。例如:聊天室消息分发、文件传输、心跳检测。
对比点 | 普通 epoll 写法 | Reactor 分层写法 |
---|---|---|
代码结构 | 事件循环里塞满协议解析和业务逻辑 | 事件循环只负责分发,协议层和业务层独立 |
可读性 | 逻辑混杂,代码庞大难读 | 清晰分层,逻辑职责单一 |
可维护性 | 改协议或改业务逻辑时容易影响主循环 | 主循环稳定,业务改动只影响对应层 |
扩展性 | 新增协议必须改 epoll 循环 |
新协议只需增加新的回调函数 |
复用性 | 协议逻辑和业务绑死在一起 | I/O 层、协议层、业务层可独立复用 |
五、完整代码
1、Reactor.c
#include <errno.h> // 包含错误码定义
#include <stdio.h> // 包含标准输入输出函数
#include <sys/socket.h> // 包含套接字相关函数和结构体
#include <netinet/in.h> // 包含网络相关结构体定义
#include <string.h> // 包含字符串操作函数
#include <pthread.h> // 包含线程相关函数
#include <unistd.h> // 包含Unix标准函数
#include <sys/epoll.h> // 包含epoll相关函数和结构体
#include <errno.h> // 再次包含错误码定义(可能冗余)
#include <sys/time.h> // 包含时间相关函数和结构体
#include <stdlib.h> // 包含标准库函数
#include "server.h" // 包含自定义的服务器头文件
#define CONNECTION_SIZE 1024*1024 // 最大连接数定义
#define MAX_PORTS 20 // 最大监听端口数量
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) // 计算两个时间戳的毫秒差
// 回调函数声明
int accept_cb(int fd); // 处理新连接的回调函数
int recv_cb(int fd); // 处理接收数据的回调函数
int send_cb(int fd); // 处理发送数据的回调函数
int epfd = 0; // epoll实例文件描述符
struct timeval begin; // 用于计时的开始时间
struct conn *conn_list; // 连接列表指针,用于存储所有连接信息
/**
* 设置epoll事件
* @param fd 文件描述符
* @param event 事件类型(EPOLLIN/EPOLLOUT等)
* @param flag 1表示添加事件,0表示修改事件
*/
int set_event(int fd, int event, int flag){
if(flag){ // 非零表示添加事件
struct epoll_event ev;
ev.events = event; // 设置事件类型
ev.data.fd = fd; // 关联文件描述符
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); // 添加事件到epoll
}else{ // 零表示修改事件
struct epoll_event ev;
ev.events = event; // 设置新的事件类型
ev.data.fd = fd; // 关联文件描述符
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev); // 修改epoll中的事件
}
}
/**
* 注册事件和连接信息
* @param fd 文件描述符
* @param event 要监听的事件
*/
int event_register(int fd, int event){
if (fd < 0) return -1; // 无效的文件描述符
// 初始化连接信息
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb; // 设置接收回调
conn_list[fd].send_callback = send_cb; // 设置发送回调
// 初始化接收缓冲区
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;
// 初始化发送缓冲区
memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;
// 添加事件监听
set_event(fd, event, 1);
}
/**
* 处理新连接的回调函数
* @param fd 服务器监听套接字
*/
int accept_cb(int fd){
struct sockaddr_in clientaddr; // 客户端地址结构体
socklen_t len = sizeof(clientaddr); // 地址长度
// 接受新连接
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
if(clientfd < 0){ // 接受连接失败
printf("accept errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
// 注册新连接,监听读事件
event_register(clientfd, EPOLLIN);
// 每1000个连接打印一次信息,用于性能统计
if((clientfd % 1000) == 0){
struct timeval current;
gettimeofday(¤t, NULL); // 获取当前时间
// 计算时间差
int time_used = TIME_SUB_MS(current,begin);
memcpy(&begin, ¤t,sizeof(struct timeval)); // 更新开始时间
printf("accept finished: %d, time_used: %d\n", clientfd, time_used);
}
return 0;
}
/**
* 处理接收数据的回调函数
* @param fd 客户端连接套接字
*/
int recv_cb(int fd){
// 清空接收缓冲区
memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);
// 接收数据
int count = recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);
if(count == 0){ // 客户端断开连接
printf("client disconnect: %d\n",fd);
close(fd); // 关闭连接
epoll_ctl(epfd,EPOLL_CTL_DEL, fd, NULL); // 从epoll中移除事件
return 0;
}else if(count < 0) { // 接收数据出错
printf("count: %d, errno: %d, %s\n", count, errno, strerror(errno));
close(fd); // 关闭连接
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // 从epoll中移除事件
return 0;
}
// 记录接收数据长度
conn_list[fd].rlength = count;
#if 0 // 回声模式(当前禁用)
// 将接收缓冲区数据复制到发送缓冲区
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);
printf("[%d]RECV: %s\n", conn_list[fd].rlength, conn_list[fd].rbuffer);
#elif 0 // HTTP请求处理(当前禁用)
http_request(&conn_list[fd]);
#else // WebSocket请求处理(当前启用)
ws_request(&conn_list[fd]);
#endif
// 将事件切换为监听可写事件,准备发送数据
set_event(fd, EPOLLOUT, 0);
return count;
}
/**
* 处理发送数据的回调函数
* @param fd 客户端连接套接字
*/
int send_cb(int fd){
#if 0 // HTTP响应处理(当前禁用)
http_response(&conn_list[fd]);
#else // WebSocket响应处理(当前启用)
ws_response(&conn_list[fd]);
#endif
int count = 0; // 发送字节数
// 根据连接状态处理发送
if(conn_list[fd].status == 1){
// 状态1:发送数据并继续监听可写事件
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
set_event(fd, EPOLLOUT, 0);
} else if (conn_list[fd].status == 2){
// 状态2:继续监听可写事件
set_event(fd, EPOLLOUT, 0);
} else if (conn_list[fd].status == 0){
// 状态0:发送数据(如果有)并切换到监听可读事件
if(conn_list[fd].wlength != 0){
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
}
set_event(fd, EPOLLIN, 0);
}
return count;
}
/**
* 初始化服务器监听套接字
* @param port 监听端口
* @return 成功返回套接字描述符,失败返回-1
*/
int init_server(unsigned short port){
// 创建TCP套接字
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 初始化服务器地址结构体
struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET; // IPv4协议
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
servaddr.sin_port = htons(port); // 设置端口号
// 绑定套接字到端口
if(-1 == bind(sockfd,(struct sockaddr*)&servaddr,sizeof(struct sockaddr))){
printf("bind failed: %s\n",strerror(errno));
}
// 开始监听,最大等待队列长度为10
listen(sockfd,10);
return sockfd; // 返回监听套接字
}
/**
* 主函数
*/
int main(){
unsigned short port = 2000; // 起始监听端口
// 创建epoll实例
epfd = epoll_create(1);
// 分配连接列表内存
conn_list = malloc(sizeof(struct conn) * CONNECTION_SIZE);
if (!conn_list) { // 内存分配失败
perror("malloc conn_list failed");
return -1;
}
// 初始化连接列表
memset(conn_list, 0, sizeof(struct conn) * CONNECTION_SIZE);
// 初始化多个监听端口
int i = 0;
for(i = 0;i< MAX_PORTS;i++){
// 初始化服务器,监听从port开始的多个端口
int sockfd = init_server(port + i);
// 设置监听套接字的回调函数(接收新连接)
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback =accept_cb;
// 添加监听套接字到epoll,监听可读事件
set_event(sockfd, EPOLLIN, 1);
}
// 记录开始时间
gettimeofday(&begin, NULL);
// 主事件循环
while(1){
struct epoll_event events[1024] = {0}; // 存储就绪事件
// 等待事件发生,超时时间为-1(无限等待)
int nready = epoll_wait(epfd, events, 1024, -1);
// 处理所有就绪事件
int i = 0;
for(i = 0;i< nready;i++){
int connfd = events[i].data.fd; // 获取事件对应的文件描述符
// 处理可读事件
if(events[i].events & EPOLLIN){
conn_list[connfd].r_action.recv_callback(connfd);
}
// 处理可写事件
if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
}
}
// 释放连接列表内存(理论上不会执行到这里)
free(conn_list);
return 0;
}
2、Webserver.c
#include <stdio.h> // 标准输入输出库,用于printf等函数
#include <unistd.h> // Unix标准库,提供文件操作、进程控制等功能
#include <sys/stat.h> // 提供文件状态相关结构体和函数
#include <fcntl.h> // 文件控制相关函数,如open等
#include <string.h> // 字符串操作函数库
#include <sys/sendfile.h> // 提供sendfile函数,用于高效文件传输
#include <errno.h> // 错误处理相关定义
#include "server.h" // 自定义服务器头文件,包含连接结构体等定义
#define WEBSERVER_ROOTDIR "./" // 定义Web服务器的根目录为当前目录
/**
* 处理HTTP请求
* @param c 指向连接结构体的指针,包含客户端连接信息和缓冲区
* @return 函数目前未实现具体功能,返回值未定义
*/
int http_request(struct conn *c){
// 清空发送缓冲区,准备存储响应数据
memset(c->wbuffer, 0, BUFFER_LENGTH);
}
/**
* 生成HTTP响应
* @param c 指向连接结构体的指针,用于存储响应数据
* @return 生成的响应数据长度
*/
int http_response(struct conn *c){
// 构建HTTP响应报文并存储到发送缓冲区
// 格式化字符串包含HTTP头部和简单的HTML内容
c->wlength = sprintf(c->wbuffer,
"HTTP/1.1 200 OK\r\n" // HTTP版本和状态码(200表示成功)
"Content-Type: text/html\r\n" // 响应内容类型为HTML
"Accept-Ranges: bytes\r\n" // 支持字节范围请求
"Content-Length: 82\r\n" // 响应体长度(字节数)
"Date: Tue, 30 Apr 2024 13:16:46 GMT\r\n\r\n" // 响应日期
"<html><head><title>Charon.html</title></head><boby><h1>Charon</h1></body></html>\r\n\r\n"); // HTML响应体
return c->wlength; // 返回响应数据的长度
}
3、server.h
#ifndef __SERVER_H__
#define __SERVER_H__
#define BUFFER_LENGTH 1024
typedef int (*RCALLBACK)(int fd);
struct conn{
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
int status;
RCALLBACK send_callback;
union{
RCALLBACK recv_callback;
RCALLBACK accept_callback;
} r_action;
};
int http_request(struct conn *c);
int http_response(struct conn *c);
int ws_request(struct conn *c);
int ws_response(struct conn *c);
#endif