目录
一.接口
1.1epoll_creaet
注意返回值是一个文件描述符
创建一个epoll模型
1.2epoll_ctl
返回值:
第一个参数是epoll_create的返回值
第二个参数表示动作,用三个宏来表示.
第三个参数是需要监听的 fd.
第四个参数是告诉内核需要监听什么事.
第二个参数的取值:
• EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
• EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件;
• EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
第四个参数:
• EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
• EPOLLOUT : 表示对应的文件描述符可以写;
• EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外 数据到来);
• EPOLLERR : 表示对应的文件描述符发生错误;
• EPOLLHUP : 表示对应的文件描述符被挂断;
• EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平 触发(Level Triggered)来说的.
• EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继 续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里
1.3epoll_wait
• epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核 只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存).
• maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建 epoll_create()时的 size.
• 参数 timeout 是超时时间 (毫秒,0 会立即返回,-1 是永久阻塞).
• 如果函数调用成功,返回对应 I/O 上已准备好的文件描述符数目,如返回 0 表 示已超时, 返回小于 0 表示函数失败.
二.细节问题
epoll模型实际上就是三种东西,红黑树,就绪队列,回调机制。epoll_ctl实际上就是维护红黑树的,用户告诉内核,要求内核帮我去关心哪些fd。epoll_wait就是内核告诉用户,哪一个fd上面的某些事情已经就绪了。
2.1 工作原理
细节:epoll_ctl的作用:向红黑树中插入节点,向底层回调注册回调方法。
当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构 体,这个结构体中有两个成员与 epoll 的使用方式密切相关.
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件
*/
struct rb_root rbr;
/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
• 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方 法向 epoll 对象中添加进来的事件.
• 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高 效的识别出来(红黑树的插入时间效率是 lgn,其中 n 为树的高度).
• 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,也就是 说,当响应的事件发生时会调用这个回调方法.
• 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双 链表中.
• 在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体.
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的 eventpoll 对象
struct epoll_event event; //期待发生的事件类型
}
• 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可.(有事件就绪了,提前注册的回调机制会自动的把红黑树的节点添加到双链表中)
• 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用 户. 这个操作的时间复杂度是 O(1).
2.2 epoll的demo
#pragma once
#include <iostream>
#include <memory>
#include <unistd.h>
#include <cstring>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Common.hpp"
using namespace SocketModule;
class EpollServer
{
const static int size = 64;
const int defaultfd = -1;
public:
EpollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isruning(false), _epfd(defaultfd)
{
// 1.创建listensocket
_listensock->BuildTcpSocketMethod(port);
// 2.创建epoll模型
_epfd = epoll_create(256);
if (_epfd < 0)
{
LOG(LogLevel::FATAL) << "epoll_create error...";
exit(EPOLL_CREATE_ERR);
}
LOG(LogLevel::FATAL) << "epoll_create success , epfd: " << _epfd;
// 3.将listensocket设置到内核中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = _listensock->Fd();
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Fd(), &ev);
if (n < 0)
{
LOG(LogLevel::FATAL) << "add listensocket failed";
exit(EPOLL_CTL_ERR);
}
}
void Start()
{
int timeout = -1;
_isruning = true;
while (true)
{
int n = epoll_wait(_epfd, revs, size, timeout);
switch (n)
{
case 0:
LOG(LogLevel::DEBUG) << "timeout...";
break;
case -1:
LOG(LogLevel::ERROR) << "epoll error";
break;
default:
Dispatcher(n);
break;
}
}
}
void Dispatcher(int n)
{
for (int i = 0; i < n; i++)
{
if (revs[i].events & EPOLLIN)
{
if (revs[i].data.fd == _listensock->Fd())
{
// 新链接到来
Accepter();
}
else
{
Recver(i);
}
}
}
}
void Accepter()
{
InetAddr client;
int sockfd = _listensock->Accept(&client); // 这里一定不会阻塞,等和拷贝分离了
if (sockfd >= 0)
{
// 获取新链接成功
LOG(LogLevel::INFO) << "get a new link , sockfd: " << sockfd << "client is: " << client.StringAddr();
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;
int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);
if (n < 0)
{
LOG(LogLevel::WARING) << "add socket failed";
}
else
{
LOG(LogLevel::INFO) << "add socket success";
}
}
}
void Recver(int pos)
{
// recv的时候肯定也不会阻塞
char buffer[1024];
ssize_t n = recv(revs[pos].data.fd, buffer, sizeof(buffer) - 1, 0); // 这样写是有bug的,tcp是面向字节流的
if (n > 0)
{
buffer[n] = 0;
std::cout << "client say& " << buffer << std::endl;
}
else if (n == 0)
{
LOG(LogLevel::DEBUG) << "client quit";
// 不让epoll关心这个fd了
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);
if (n < 0)
{
LOG(LogLevel::FATAL) << "del socket failed";
exit(EPOLL_CTL_ERR);
}
close(revs[pos].data.fd); // 先移除,在关闭
}
else
{
LOG(LogLevel::ERROR) << "recv error";
// 不让epoll关心这个fd了
int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);
if (n < 0)
{
LOG(LogLevel::FATAL) << "del socket failed";
exit(EPOLL_CTL_ERR);
}
close(revs[pos].data.fd); // 先移除,在关闭
}
}
~EpollServer()
{
_listensock->Close();
if (_epfd > 0)
close(_epfd);
}
private:
std::unique_ptr<Socket> _listensock;
bool _isruning;
int _epfd;
struct epoll_event revs[size];
};
2.3 epoll的优点
• 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要 每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
• 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到 内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)
• 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符 结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就 绪. 这个操作时间复杂度 O(1). 即使文件描述符数目很多, 效率也不会受到影响.
• 没有数量限制: 文件描述符数目无上限.
三. LT 与 ET模式
理解ET
使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工 程实践" 上的要求. 假设这样的场景: 服务器接收到一个 10k 的请求, 会向客户端返回一个应答数据. 如果客 户端收不到应答, 不会发送第二个 10k 请求.
如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话(read 不能保证一 次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的 9k 数据 就会待在缓冲区中.
此时由于 epoll 是 ET 模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返 回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回
但是问题来了.
• 服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.
• 客户端要读到服务器的响应, 才会发送下一个请求
• 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数 据.
所以, 为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完), 于是就可以使用 非阻塞轮训的方式来读缓冲区, 保证一定能把完整的请求都读出来.而如果是 LT 没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件 描述符读就绪.
四. reactor
Reactor.hpp
#pragma once
#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
class Reactor
{
private:
static const int revs_num = 128;
bool IsConnectionExists(std::shared_ptr<Connection> &conn)
{
return IsConnectionExistsHelper(conn->GetSockFd());
}
bool IsConnectionExists(int sockfd)
{
return IsConnectionExistsHelper(sockfd);
}
bool IsConnectionExistsHelper(int sockfd)
{
auto iter = _connections.find(sockfd);
if (iter == _connections.end())
{
return false;
}
return true;
}
bool IsConnectionEmpty()
{
return _connections.empty();
}
public:
Reactor() : _epoll_ptr(std::make_unique<Epoller>()), _isruning(false)
{
}
void Start()
{
if (IsConnectionEmpty())
{
return;
}
_isruning = true;
while (true)
{
PrintConnection();
int n = _epoll_ptr->Wait(_revs, revs_num, -1);
for (int i = 0; i < n; i++)
{
int sockfd = _revs[i].data.fd;
uint32_t revents = _revs[i].events;
//将所有的异常处理转换为IO错误
if(revents & EPOLLERR)
revents |= (EPOLLIN | EPOLLOUT);//只要是出错了就打开读写端
if(revents & EPOLLHUP)
revents |= (EPOLLIN | EPOLLOUT);
if(revents & EPOLLIN)
{
//不用区分异常了,因为统一处理
//不用区分是listensocket还是普通事件就绪
if(IsConnectionExists(sockfd))
_connections[sockfd]->Recver();
}
if(revents & EPOLLOUT)
{
if(IsConnectionExists(sockfd))
_connections[sockfd]->Sender();
}
}
}
_isruning = false;
}
void AddNewConnection(std::shared_ptr<Connection> &conn)
{
if (IsConnectionExists(conn))
{
LOG(LogLevel::WARING) << "conn is exists: " << conn->GetSockFd();
return;
}
// 1.把conn对应的fd和他关心的事件写到内核
uint32_t events = conn->GetEvent();
int sockfd = conn->GetSockFd();
_epoll_ptr->Add(sockfd, events);
// *.设置回指指针
conn->SetOwner(this);
// 2.把connection对象添加到connections内部
_connections[sockfd] = conn;
}
void EnableReadWrite(int sockfd,bool enableread,bool enablewrite)
{
// 不要重复添加
if (!IsConnectionExists(sockfd))
{
LOG(LogLevel::WARING) << "EnableReadWrite: conn is exists: " << sockfd;
return;
}
// 修改当前的sockfd对应的connection关心的事件
uint32_t events = (EPOLLET | (enableread ? EPOLLIN : 0) | (enablewrite ? EPOLLOUT : 0));
_connections[sockfd]->SetEvent(events);
//再去写透到内核中
_epoll_ptr->Mod(sockfd,events);
}
void DelConnection(int sockfd)
{
_epoll_ptr->Del(sockfd);
_connections.erase(sockfd);
close(sockfd);
}
void PrintConnection()
{
std::cout << "当前正在管理的fd:" << std::endl;
for(auto &conn : _connections)
{
std::cout << conn.second->GetSockFd() << " ";
}
std::cout << "\r\n";
}
~Reactor()
{
}
private:
// 1.epoll模型
std::unique_ptr<Epoller> _epoll_ptr;
// // 2.listensocket 单独封装管理
// std::shared_ptr<Listener> _listener_ptr;
// 3.每一个fd都需要一个单独的输入输出缓冲区,管理套接字
std::unordered_map<int, std::shared_ptr<Connection>> _connections;
// 4.就绪的所有事件
struct epoll_event _revs[revs_num];
bool _isruning;
};
Listener.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Channel.hpp"
#include "Connection.hpp"
using namespace SocketModule;
class Listener : public Connection
{
public:
Listener(int port = 8080) : _port(port), _listensock(std::make_unique<TcpSocket>())
{
_listensock->BuildTcpSocketMethod(_port);
SetEvent(EPOLLIN | EPOLLET);
SetNonBlock(_listensock->Fd());
}
int GetSockFd()
{
return _listensock->Fd();
}
void Recver() override
{
InetAddr client;
//虽然是新链接到来了,但是只有一个链接吗,
//while,ET,设置fd为非阻塞
while (true)
{
int sockfd = _listensock->Accept(&client);
if(sockfd == ACCEPT_ERR)
{
break;
}
else if(sockfd == ACCEPT_CONTINUE)
{
continue;
}
else if(sockfd == ACCEPT_DONE)
{
break;
}
else
{
//是一个合法的fd,但是怎么去添加到_connections里?需要回调指针
std::shared_ptr<Connection> conn = std::make_shared<Channel>(sockfd,client);
conn->SetEvent(EPOLLIN | EPOLLET);
if(_handler != nullptr)
conn->RegisterHandler(_handler);
GetOwner()->AddNewConnection(conn);
}
}
}
// std::string& Inbuffer() override
// {}
// std::string& AppendOutBuffer(std::string& out) override
// {}
void Sender() override
{}
void Excepter() override
{}
~Listener()
{
}
private:
int _port;
std::unique_ptr<Socket> _listensock;
};
Channel.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Common.hpp"
#include "Connection.hpp"
#include "InetAddr.hpp"
#define SIZE 1024
class Channel : public Connection
{
public:
Channel(int sockfd,const InetAddr& client) : _sockfd(sockfd),_client_addr(client)
{
SetNonBlock(sockfd);
}
int GetSockFd()
{
return _sockfd;
}
//保证把本轮数据读完 (while循环)
//即便是读完了,怎么知道数据由完整的报文,如果是多个报文呢?(协议)
void Recver() override
{
char buffer[SIZE];
while(true)
{
buffer[0] = 0;
ssize_t n = recv(_sockfd,buffer,sizeof(buffer) - 1, 0);
if(n > 0)
{
buffer[n] = 0;
_inbuffer += buffer;
}
else if(n == 0)
{
Excepter();
return;
}
else
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
break;//本轮数据读完了
}
else if(errno == EINTR)
{
continue;
}
else
{
Excepter();
return;
}
}
}
LOG(LogLevel::DEBUG) << _inbuffer;
if(!_inbuffer.empty())
{
// _handler(std::shared_ptr<Connection>(this));
_outbuffer += _handler(_inbuffer);
}
if(!_outbuffer.empty())
{
Sender();
// GetOwner()->EnableReadWrite(_sockfd,true,true);
}
}
// std::string& Inbuffer() override
// {
// return _inbuffer;
// }
// std::string& AppendOutBuffer(std::string& out) override
// {
// _outbuffer += out;
// return _outbuffer;
// }
void Sender() override
{
while (true)
{
ssize_t n = send(_sockfd,_outbuffer.c_str(),_outbuffer.size(),0);//非阻塞发
if(n > 0)
{
_outbuffer.erase(0,n);
if(_outbuffer.empty())
break;
}
else if(n == 0)
{
break;
}
else
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
else if(errno == EINTR)
{
continue;
}
else
{
Excepter();
return;
}
}
}
// 1.数据发送完毕
// 2.发送条件不具备
if(!_outbuffer.empty())
{
// 开启对写事件的关心
GetOwner()->EnableReadWrite(_sockfd,true,true);
}
else
{
GetOwner()->EnableReadWrite(_sockfd,true,false);
}
}
void Excepter() override
{
GetOwner()->DelConnection(_sockfd);
}
~Channel()
{
}
private:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
InetAddr _client_addr;
// handler_t _handler;
};
Connection.hpp
#pragma once
#include <iostream>
#include <string>
#include "InetAddr.hpp"
class Reactor;
class Connection;
using handler_t = std::function<std::string (std::string &)>;
class Connection
{
public:
Connection():_owner(nullptr),_events(0)
{
}
virtual void Recver() = 0;
virtual void Sender() = 0;
virtual void Excepter() = 0;
virtual int GetSockFd() = 0;
// virtual std::string& Inbuffer() = 0;
// virtual std::string& AppendOutBuffer(std::string& out) = 0;
void RegisterHandler(handler_t handler)
{
_handler = handler;
}
void SetEvent(const uint32_t &events)
{
_events = events;
}
uint32_t GetEvent()
{
return _events;
}
void SetOwner(Reactor *owner)
{
_owner = owner;
}
Reactor *GetOwner()
{
return _owner;
}
~Connection()
{
}
private:
// 回指指针,用于listensocket添加普通套接字
Reactor *_owner;
// 关心事件
uint32_t _events;
public:
handler_t _handler;
};
Main.cc
#include <iostream>
#include "Log.hpp"
#include "Reactor.hpp"
#include "Listener.hpp"
#include "Protocol.hpp"
#include "NetCal.hpp"
static void Usage(std::string proc)
{
std::cerr << "Usage: " << proc << " port" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
Enable_Console_Log_Strtegy();
int port = std::stoi(argv[1]);
// 构建业务模块
std::shared_ptr<Cal> cal = std::make_shared<Cal>();
// 构建协议对象
std::shared_ptr<Protocol> protocal = std::make_shared<Protocol>([&cal](Request& req) -> Response{
return cal->Execute(req);
});
// 构建listener对象
std::shared_ptr<Connection> conn = std::make_shared<Listener>(port);
conn->RegisterHandler([&protocal](std::string& inbuffer)->std::string{
std::string response_str;
//可能不止一个报文
while(true)
{
std::string package;
if(!protocal->Decode(inbuffer,&package))
break;
response_str += protocal->Execute(package);
}
return response_str;
});
std::unique_ptr<Reactor> tsvr = std::make_unique<Reactor>();
tsvr->AddNewConnection(conn);
tsvr->Start();
return 0;
}