前言
作为服务器,核心自然是高效的处理来自client的多个连接啦,那问题在于,如何高效的处理client的连接呢?这里就介绍两种架构:单Reactor架构和主丛Reactor架构。
单Reactor架构
单Reactor架构的核心为,由一个主线程监听包括ServerFd在内的所有fd,当某个fd 可读/可写的时候,就把它交给线程池中的某个线程去处理,核心流程大概是:
while(true) {
epoll_wait();
/*遍历所有监听到的event*/
for(所有活跃的events的fd) {
if(fd == listenfd) dealListen(); /*如果是listen fd可读 ,则accept建立新的连接*/
if(fd == readfd) dealRead(); /*如果clientfd可读,就交给线程池处理可读*/
if(fd == writefd) dealWrite(); /*如果可写 交给线程池处理可写*/
}
}
/*伪代码*/
dealListen()
{
do{
int retFd = accept();
if(retFd < 0) /*没有待处理的fd了*/
break;
/*其它和retFd相关的处理*/
}while(true);
}
这看起来是个很合理的实现,但是在某些场景下是会遇到性能瓶颈的,试想这样一个场景:如果同时突然间出现了大量的client请求链接,那我们的epoll_wait监听到的应该是listenFd可读,调用dealListen()来处理listen事件,dealListen可能是调用多次accept函数直到返回值小于0,才表明没有新的连接了,这个过程很费时间,也就意味着:哪怕有些client已经建立连接了,并且触发可读/可写了,主线程都无法及时将事件交给线程池处理,而这就是它的性能瓶颈
这就引入了我们的主丛Reactor架构,它的处理逻辑是:主线程的epoll只负责监听,监听到了就把这个fd交给其他的epoll去处理(监听它的可读可写),这样,在面对突发IO连接的时候,也能及时响应。
其中主Reactor的核心是Acceptor中处理listenfd的可读/写操作,然后把监听到的clientfd分发给其他的sub_reactor,当client_fd可读可写的时候,由对应的sub_reactor的epoll监听到进行读写。
有了思路,剩下的事情就是怎么做,我们来看看muduo库的实现。
Channel类—>对fd的封装
对于每个fd,不管是clientfd还是listenfd也好,监听到可读/可写后,都应该调用自身对应的可读/写回调函数进行处理;同时对于每个fd,他也有自己对应的epoll(在这里用EventLoop对epoll和thread进一步的封装)
class Channel : noncopyable
{
public:
/*只是起了个别名的作用*/
using EventCallback = std::function<void()>; // muduo仍使用typedef
using ReadEventCallback = std::function<void(Timestamp)>;// read和时间挂钩了
Channel(EventLoop *loop, int fd);
~Channel();
// fd得到Poller通知以后 处理事件 handleEvent在EventLoop::loop()中调用
void handleEvent(Timestamp receiveTime);
// 设置回调函数对象
// 用move比直接赋值好
// 直接赋值有两次拷贝操作(实参到形参 形参到类内对象) 但是move简化了第二次
// 使用move的前提:移动的资源在堆上,且支持这样的操作
void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }
// 防止当channel被手动remove掉 channel还在执行回调操作
void tie(const std::shared_ptr<void> &);
int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revt) { revents_ = revt; }
// 设置fd相应的事件状态 相当于epoll_ctl add delete
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
/*只是可读*/
void setReading() { events_ = kReadEvent; update(); }
/*只是可写*/
void setWriting() { events_ = kWriteEvent; update(); }
// 返回fd当前的事件状态
bool isNoneEvent() const { return events_ == kNoneEvent; }
bool isWriting() const { return events_ & kWriteEvent; }
bool isReading() const { return events_ & kReadEvent; }
int index() { return index_; }
void set_index(int idx) { index_ = idx; }
// one loop per thread
EventLoop *ownerLoop() { return loop_; }
void remove();
private:
void update();
void handleEventWithGuard(Timestamp receiveTime);
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop *loop_; // 事件循环
const int fd_; // fd,Poller监听的对象
int events_; // 注册fd感兴趣的事件(只是一个表示,通过update调用到epoller的ctl重新设置flag)
int revents_; // Poller返回的具体发生的事件(poller在wait到之后通过set_revents返回值)
int index_;
std::weak_ptr<void> tie_; /*观察对象是否存在*/
bool tied_;
// 因为channel通道里可获知fd最终发生的具体的事件events,所以它负责调用具体事件的回调操作
ReadEventCallback readCallback_; /*具体可读函数得看具体实现是咋样的*/
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};
/*核心处理函数,当可读/写/ERROR发生的时候 调用对应的回调*/
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
//LOG_INFO<<"channel handleEvent revents:"<<revents_;
// LOG_INFO("channel handleEvent revents:[%d]", revents_);
// 关闭
if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)) // 当TcpConnection对应Channel 通过shutdown 关闭写端 epoll触发EPOLLHUP
{
if (closeCallback_)
{
closeCallback_();
}
}
// 错误
if (revents_ & EPOLLERR)
{
if (errorCallback_)
{
errorCallback_();
}
}
// 读
if (revents_ & (EPOLLIN | EPOLLPRI))
{
if (readCallback_)
{
readCallback_(receiveTime);
}
}
// 写
if (revents_ & EPOLLOUT)
{
if (writeCallback_)
{
writeCallback_();
}
}
}
/*update只是对epoll_ctl的进一步封装*/
/*只是可读*/
void setReading() { events_ = kReadEvent; update(); }
/*只是可写*/
void setWriting() { events_ = kWriteEvent; update(); }
Epoller—>对Epoll的封装
对于常用API进行了抽象和封装,epoll本身也归属于某个EventLoop。
EventLoop----“one loop per thread的体现”
对于主Reactor也好,子Reactor也好,它们每个都是对应着某个线程,这也就是我们为什么抽象EventLoop这样一个类,那么思考EventLoop得有什么呢?
- 得有个线程处理函数func一直在执行
- fuc要做两件事:(1) 处理epoll_wait;(2) 处理和其它线程/资源交互的信息(比如主线程怎么把clientfd分发给其他线程)
清楚了这两件事,那如何做呢?我们来看看muduo库的做法
线程间交互: eventfd + 回调函数机制
如果想要和其它线程/资源交互,能想到的一点就是利用信号量/互斥量等操作,但是这样就有了另一个问题:我们的线程处理函数是要同时执行epoll_wait的,等不到的时候可是阻塞当前线程的,所以对于线程间的交互,处理的会不及时。
while(true)
{
epoll_wait(超时时间);
for(){处理所有的事件};
lock(); /*最坏情况得等到超时时间到了才能处理*/
for(){处理线程间的交互事件}
unlock();
}
这里就引入了一个eventfd。我们给每一个EventLoop的Epoll,除了要监视clientfd之外,还需要额外监视一个fd----eventfd,关于它如果想多了解可以自行搜索,粗略的讲,eventfd里面有个计数器,每次write()写就会增加(此时会触发epoll可读),而每次读read()就会清零计数器。
那以此来看其它线程可以怎么通过eventfd来和当前线程交互呢?—就是queueInLoop函数
// 把cb放入队列中 唤醒loop所在的线程执行cb(由其他线程或者本线程调用)
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);/*存储了所有需要交互的函数*/
}
/**
* || callingPendingFunctors的意思是 当前loop正在执行回调中 但是loop的pendingFunctors_中又加入了新的回调 需要通过wakeup写事件
* 唤醒相应的需要执行上面回调操作的loop的线程 让loop()下一次poller_->poll()不再阻塞(阻塞的话会延迟前一次新加入的回调的执行),然后
* 继续执行pendingFunctors_中的回调函数
**/
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup(); // 唤醒loop所在线程,本质就是对eventfd进行写的操作
}
}
// 当epoll触发的时候,eventfd对应的Channel绑定的回调函数
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof(one)); /*不清零就会一直epoll可读*/
if (n != sizeof(one))
{
//LOG_ERROR<<"EventLoop::handleRead() reads"<<n<<"bytes instead of 8";
LOG_ERROR("EventLoop::handleRead() reads[%d]bytes instead of 8",n);
}
}
依次就能写出这个EventLoop的loop函数的逻辑啦,eventfd确实帮了大忙简化了很多逻辑
EventLoop的核心----loop()函数
void EventLoop::loop()
{
looping_ = true;
quit_ = false;
//LOG_INFO<<"EventLoop start looping";
LOG_INFO("EventLoop start looping");
while (!quit_)
{
activeChannels_.clear();/*清空容器所有变量*/
pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel *channel : activeChannels_)
{
//LOG_INFO("active channel fd:[%d]",channel->fd());
// Poller监听哪些channel发生了事件 然后上报给EventLoop 通知channel处理相应的事件
// 包括cilentfd和eventfd两类(子Reactor)
// 包括listenfd和eventfd两类(主Reactor)
// 当然还有一个timefd,可以epoll + timefd处理一些超时事件
// 感兴趣可以了解一下
channel->handleEvent(pollRetureTime_);
}
/**
* 执行当前EventLoop事件循环需要处理的回调操作 对于线程数 >=2 的情况 IO线程 mainloop(mainReactor) 主要工作:
* accept接收连接 => 将accept返回的connfd打包为Channel => TcpServer::newConnection通过轮询将TcpConnection对象分配给subloop处理
*
* mainloop调用queueInLoop将回调加入subloop(该回调需要subloop执行 但subloop还在poller_->poll处阻塞) queueInLoop通过wakeup将subloop唤醒
**/
doPendingFunctors(); /*调用vector保存的所有回调函数*/
}
//LOG_INFO<<"EventLoopstop looping";
LOG_INFO("EventLoopstop looping")
looping_ = false;
}
Acceptor:ListenFd的封装
Acceptor里面的成员函数,就是主Reactor对应的线程的操作函数。我们来想Acceptor需要做什么:
- 得能开启listen
- 监听到可读事件后,需要调用对应的可读回调函数处理
- 在回调函数中,应该把clientfd分配给某个EventLoop
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop)
, acceptSocket_(createNonblocking())
, acceptChannel_(loop, acceptSocket_.fd())
, listenning_(false)
{
//LOG_INFO("server id:[%d]",acceptSocket_.fd());
//LOG_INFO("Acceptor make success");
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr);
// TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop)
// baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));
}
Acceptor::~Acceptor()
{
acceptChannel_.disableAll(); // 把从Poller中感兴趣的事件删除掉
acceptChannel_.remove(); // 调用EventLoop->removeChannel => Poller->removeChannel 把Poller的ChannelMap对应的部分删除
}
void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // 开启listen
acceptChannel_.enableReading(); // acceptChannel_注册至Poller,要不怎么监听呢
}
// listenfd有事件发生了,就是有新用户连接了
void Acceptor::handleRead()
{
InetAddress peerAddr;
do
{
int connfd = acceptSocket_.accept(&peerAddr);
if(connfd < 0)
break;
//LOG_INFO("listen fd:[%d] success",connfd);
//fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFD, 0) | O_NONBLOCK);(不用 已经设计过了)
if (connfd >= 0)
{
if (NewConnectionCallback_) /*Tcp Server中调用的*/
{
/*这里实现看需求了,所以封装成回调函数的形式更灵活*/
NewConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop 唤醒并分发当前的新客户端的Channel
}
else
{
::close(connfd);
}
}
else
{
LOG_ERROR("accept Err");
if (errno == EMFILE)
{
LOG_ERROR("sockfd reached limit");
}
}
} while (true);
}
EventLoopThreadPool----对应subReactor的操作
#include "../MultiReactor/EventLoopThread.h"
#include "../MultiReactor/EventLoop.h"
EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,
const std::string &name)
: loop_(nullptr)
, exiting_(false)
, thread_(std::bind(&EventLoopThread::threadFunc, this), name)
, mutex_()
, cond_()
, callback_(cb)
{
}
// 由谁调用呢
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
if (loop_ != nullptr)
{
loop_->quit();
thread_.join();//调用EventLoopThread析构的线程会阻塞 直到thread_对应的线程运行完毕
}
}
/*是让主Reactor调用的*/
EventLoop *EventLoopThread::startLoop()
{
thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程
EventLoop *loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this](){return loop_ != nullptr;});
loop = loop_;
}
return loop;
}
// 下面这个方法 是在单独的新线程里运行的
void EventLoopThread::threadFunc()
{
// 给每个线程创建一个EventLoop
EventLoop loop;
if(callback_)
{
callback_(&loop);//如果设置了回调函数
}// 新线程调用回调就能获得自己的EventLoop了
// 所有的Loop是由谁创建的呢?
{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one();
}
loop.loop(); // 执行EventLoop的loop() 开启了底层的Poller的poll()
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}
#include <memory>
#include "../MultiReactor/EventLoopThreadPool.h"
#include "../MultiReactor/EventLoopThread.h"
#include "../Log/log.h"
EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
: baseLoop_(baseLoop), name_(nameArg), started_(false), numThreads_(12), next_(0)
{
}
/*析构就是啥也不做?不过好像确实这个是整个程序的生命周期*/
EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
/*池子确定了回调函数是什么*/
/*这是一个创建成功的回调函数 新线程创建成功了通过callback就能获得自己的专属loop_*/
EventLoopThread *t = new EventLoopThread(cb, buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
loops_.push_back(t->startLoop()); // 底层创建线程 绑定一个新的EventLoop 并返回该loop的地址
}
if (numThreads_ == 0 && cb) // 整个服务端只有一个线程运行baseLoop
{
cb(baseLoop_);
}
}
// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoop
EventLoop *EventLoopThreadPool::getNextLoop()
{
// 如果只设置一个线程 也就是只有一个mainReactor 无subReactor
// 那么轮询只有一个线程 getNextLoop()每次都返回当前的baseLoop_
EventLoop *loop = baseLoop_;
// 通过轮询获取下一个处理事件的loop
// 如果没设置多线程数量,则不会进去,相当于直接返回baseLoop
if(!loops_.empty())
{
loop = loops_[next_];
++next_;
// 轮询
if(next_ >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
std::vector<EventLoop *> EventLoopThreadPool::getAllLoops()
{
if (loops_.empty())
{
return std::vector<EventLoop *>(1, baseLoop_);
}
else
{
return loops_;
}
}