【仿muduo库实现并发服务器】eventloop模块

发布于:2025-07-07 ⋅ 阅读:(22) ⋅ 点赞:(0)

一.eventloop模块

一个Eventloop对应一个线程

Eventloop模块它是进行事件监控以及事件处理的模块。
一个Eventlopp对应一个线程。
当Eventloop监控了一个连接,而这个连接一旦有事件就绪,就会去处理。
如果这个连接在多个线程中都触发了事件(前提是描述符被多个线程Eventloop监控),就会存在线程安全问题。所以为了避免这个问题,就要求连接必须在一个线程中处理。

也就是连接的所有操作都必须在同一个线程中处理,将连接的事件监控,以及连接事件处理,关闭连接等操作都放在同一个线程中进行。

也因为连接的内部函数可能会被不同的线程同时执行,就会存在线程安全问题。

如何保证一个连接的所有操作都在同一个线程中执行呢?

连接是无法与线程进行绑定的,但是连接可以与Eventloop绑定,而Eventloop则与线程是绑定的。
所以每个连接都有对应的Eventloop,只要让连接去对应的Eventloop中执行就是在同一个线程中执行。

在这里插入图片描述
这其实就相当于把我们的channel跟eventloop给关联到了一起之间
的关联到了一起,而eventloop又跟线程是一一对应的,它是在线程里边去运行的。
eventloop模块的功能都是在同一个线程里边去完成的,一旦它监控了连接,并且连接绑定的是同一个eventloop,那也就意味着。这个连接呢,它的一个监控以及它的处理是同一个线程里边。

eventloop的处理流程:
1.首先在线程中对连接进行事件监控
2.当连接有事件就绪则就进行事件处理(调用回调函数)
3.所有的就绪事件都处理完了,再去任务队列中将任务一一取出来执行。


如何保证处理连接的回调函数中的操作都是在同一个线程中进行的呢?


解决方法:任务队列
给每个Eventloop都添加一个任务队列。
对连接的所有操作(事件监控,事件处理,关闭连接等)都进行一次封装,向外部提供的连接操作并不直接执行,而是将连接的操作以任务形式压入到任务队列里。在这里插入图片描述

不过要注意并不是将所有回调函数都压入到任务队列中去,当前执行函数的线程如果就是该连接绑定的eventloop对应的线程,那么就可以直接对函数执行,不需要再压入到任务队列中,如果不一样那么就需要压入到任务队列中。

因为如果别的线程要执行当前的线程中的conn的对象的函数的时候,是不会执行的,因为会涉及到安全问题,所以就需要吧他执行的任务放到这个任务队列中去,让当前线程自己执行。


通知机制eventfd
因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到胁行因此得有一个事件通知的东西,能够唤醒事件监控的阻塞

当线程将任务压入到任务队列时,会存在这样情况,任务队列里有任务,但不能执行。
为什么呢?因为epoll会因为没有事件就绪而阻塞住,一旦阻塞住了就无法往后执行任务队列里的任务了。
所以我们就需要一个通知机制,当一旦任务队列中有任务时,就唤醒可能阻塞的epoll监控。

1.成员变量

private:
    // 每一个eventloop对应一个线程,当线程创建时,eventloop就会被创建出来并绑定线程的id
    std::thread::id _thread_id;
    // eventloop是一个监控管理模块,里面封装了poller,用来监控所有的连接的事件
    Poller _poll;
    // 需要一个通知机制eventfd,用来唤醒可能因为IO事件监控没有事件就绪而阻塞,也就是epoll阻塞,需要唤醒它,往后执行任务队列里的任务
    int _event_fd;
    // eventfd也是一个描述符,也可以挂到poll里进行事件监控,poller监控的对象是Channel*,所以通过Channel来管理event_fd
    std::unique_ptr<Channel> _eventfd_channel;
    // 每个eventllop中都有对应的任务队列,里面存放着外界调用的函数
    using Function = std::function<void()>;
    // 如果别的线程要执行当前的线程中的conn的对象的函数的时候,是不会执行的,因为会涉及到安全问题,所以就需要吧他执行的任务放到这个任务队列中去,让当前线程自己执行,
    std::vector<Function> _task;
    // 任务队列可能存在线程安全问题,所以需要一把锁保护它
    std::mutex _mutex;
    //eventloop中还存在的功能是定时任务,所以需要时间轮定时器
    TimerWheel _timer_wheel;

std::thread::id _thread_id;//线程ID

作用1:绑定线程
一个eventloop对应一个线程,当线程创建后,该线程就会创建eventloop,并且eventloop会绑定该线程的id。

作用2:用来标识当前线程是否与eventloop绑定的线程一致
如果连接的一个函数回调里面要执行一个任务,那么这个任务如果本身就是有eventloop对应的线程执行的,那么该任务是可以直接执行的,如果不是eventloop对应的线程执行的那么就需要压入到任务队列去。

当事件就绪,需要处理的时候,处理的过程中,如果需要对连接进行某些操作,那么这些操作必须在当前eventloop对应的线程中执行,而不能由其他eventloop线程执行,要保证对连接的各项操作都是线程安全的。
1.如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行
2.如果执行的操作不再线程中,才需要加入任务池,等到事件处理完了然后执行任务

Poller _poll;

作用:对所有连接进行事件监控,以及事件处理

int _event_fd;

int _event_fd
std::unique_ptr _eventfd_channel;

eventfd事件通知机制,用来唤醒阻塞epoll。
eventfd也是一个文件描述符,则也可以挂到epoll上监控,即poller也可以对eventfd进行监控,而epoll监控的对象是channel,所以通过channel来管理eventfd。

std::vector<Function> _task;

using Function =std::function<void()>;
std::vector _task;
//任务队列可能存在线程安全问题,所以需要一把锁保护它
std::mutex _mutex;

作用:如果别的线程要执行当前的线程中的conn的对象的函数的时候,是不会执行的,因为会涉及到安全问题,所以就需要吧他执行的任务放到这个任务队列中去,让当前线程自己执行。

任务队列中的任务都是由其他线程通过调用RunInloop压入的函数。这个任务队列是存在线程安全的,所以在访问时,需要加锁访问。
不需要每次都一次一次的加锁访问任务队列,我们只需要加锁一次,然后定义一个临时的任务队列,将原来的任务队列中的任务全部交换到临时的中去,当全部取出来就可以解锁了。然后执行临时队列中的任务即可。

这里是引用
在这里插入图片描述
当获取到一个新连接之后,会调用这里的NewConnection函数,这个函数里面会为新连接创建connection,并且绑定eventloop,再下面这里设置非活跃超时销毁 以及 可读事件监听时,就会涉及当前线程,操作另一个线程的connection对应的操作函数

此时可能会有线程安全的问题,所以封装成任务加到队列里面,交由从属线程执行

TimerWheel _timer_wheel

eventloop另外一个主要功能就是管理着所有的定时任务,添加定时任务,刷新定时任务等。
所以封装一个时间轮定时器,用来对定时任务进行操作。
定时器里面有一个timerfd,本质也是一个计数器,一旦创建,内核就会每个一段事件往该描述符中发送次数。可以实现每秒钟执行一下任务。

2.EventLoop构造

需要构造的主要由线程id,事件通知event_fd以及管理该event_fd的channel对象。
需要创建一个eventfd,并且new一个channel对象管理event_fd,并设置该描述符事件就绪时的回调函数,设置可读监控。

public: // 构造
    EventLoop() : _thread_id(std::this_thread::get_id()),
                  _event_fd(CreateEventFd()),
                  _eventfd_channel(new Channel(this, _event_fd)),
                  _timer_wheel(this)
    {
        // 设置_event_fd的读回调函数,当事件就绪就会去执行
        _eventfd_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this));
        // 启动event_fd的读事件监控
        _eventfd_channel->EnableRead();
    }

3.针对eventfd的操作

对事件通知的操作主要有三个:
1.创建事件通知event_fd
2.构造event_fd可读事件就绪后的回调函数,即读取event_fd中的通知次数。
3.构造event_fd通知函数,也就是唤醒机制,本质上就是往event_fd描述符中发送一次通知。当发生一次数据,event_fd描述符的可读事件就会就绪,那么epoll就会被唤醒,往后执行。

public://针对通知机制event_fd的操作:创建eventfd,读取eventfd中数据(就绪回到函数),网eventfd中写数据(唤醒epoll)
     
     //1.创建通知机制event_fd
     static int CreateEventFd()
     {
        int efd=eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);
        if(efd<0)
        {
            ERRLog("createeventfd failed");
            abort();//异常退出
        }
        return efd;
     }
     //2.构建event_fd的读事件回调函数,读取eventfd中的通知次数,单纯的就是读取出来,没有作用
     void ReadEventFd()
     {
        int res;
        ssize_t ret=(_event_fd,&res,sizeof(res));
        if(ret<0)
        {
           // 有两种情况是合法的套接字是没有问题的
            if (errno == EAGAIN) // EAGAIN表示sock缓冲区中没有数据可读了
                return ;
            if (errno == EINTR) // EINTR表示在读取的过程中被信号中断了
                return ;
            ERRLog("readevent failed");
            abort();
        }
     }
     //3.构建唤醒机制,本质就是玩eventfd中发送一个数据,那么event的读事件就绪就会唤醒阻塞的epoll
     void WakeupEventFd()
     { 
        int val=1;`在这里插入代码片`
        ssize_t ret=write(_event_fd,&val,sizeof(val));
        if(ret<0)
        {
            // 有两种情况是合法的套接字是没有问题的
            if (errno == EAGAIN) // EAGAIN表示sock缓冲区中没有空间可写了
                return ;
            if (errno == EINTR) // EINTR表示在写入的过程中被信号中断了
                return ;
            ERRLog("wakeupevent failed");
            abort();
        }

     }

4.针对poller的操作

主要是封装poller中的添加/修改事件监控操作和移除事件监控操作。从而让channel对象可以调用Eventlooop中的事件监控操作对描述符进行监控。

public://针对poller的操作,要封装添加事件监控和移除事件监控的操作
     
     //1.添加或者修改某个连接的事件监控
     void UpdateEvent(Channel*channel)
     {
        return _poll.UpdateEvent(channel);
     }     
     //2.移除某个连接的事件监控
     void RemoveEvent(Channel*channel)
     {
        return _poll.RemoveEvent(channel);
     }

channel对象中调用eventloop中的事件监控操作:
// 在Channel前面只是声明了Eventloop,但Channel不知道Eventloopr类里的成员函数,在类里无法调用Eventloop的对象函数,只能放在外面
void Channel::Update()
{
    return _loop->UpdateEvent(this);
}
void Channel::Remove()
{
    return _loop->RemoveEvent(this);
}

5.针对threadID的操作

主要判断当前的线程是否与eventloop对应的线程是一致的,如果是一致的那么对于连接的操作可以直接执行,如果不是就需要压入到任务对立中执行。

public://针对线程id的操作
     //用来判断当前的线程是否是创建eventloop的线程
     bool IsInLopp()
     {
        return _thread_id==std::this_thread::get_id();
     } 

6.针对TaskQueue的操作

对于任务队列的操作主要有:
1.将任务函数压入到任务队列中
2.根据线程id,决定是否真正压入到任务队列中
3.执行任务队列中的所有任务。

要注意在任务函数压入到任务队列中后,理论上队列中的任务就必须有当前线程去执行,但是会存在epoll因为没有事件就绪而阻塞住,所以压入一个任务后,就必须唤醒epoll。

在访问到任务队列时,都必须加锁访问,保证线程安全。
执行任务队列中的操作的思想是:先将任务中的所有任务都交换出来,再依次执行。

public://针对任务队列_task的操作
     //1.将外界的任务(函数)插入到任务队列里
     void TaskInLoop(Function cb)
     {
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            //规定一个作用域,并且顶一个临时的锁对象,在定义域内加锁,出来就解锁。
            _task.push_back(cb);
        }
        //当任务队列里有任务了就要把它执行掉,但存在epoll因为没有事件就绪而阻塞住无法执行任务队列里的任务
        //所以插入一个就需要唤醒epoll防止它在阻塞中
        WakeupEventFd();
     }
     //2.有的任务是不用压入到任务队列里执行的,如果是eventloop对应的线程调用外部函数,则就直接去执行,如果是其他线程调用的,就需要压入到任务队列里去
     //所以外面真正调用的都是这个接口
     void RunInLoop(Function cb)
     {
        if(IsInLopp())return cb();//如果是eventloop对应的线程调用,则直接执行
        //如果不是则需要压入对eventloop的任务队列里执行
        return TaskInLoop(cb);
     }
     //3.执行任务队列里的任务
     void RunAllTask()
     {
        //利用一个临时vector,将task里面的任务全部交换出来,再全部执行
        std::vector<Function> temp_task;
        {
           //访问任务队列就需要加锁
           std::unique_lock<std::mutex> _lock(_mutex);
           _task.swap(temp_task);
        }
        //执行任务队列里的任务
        for(auto &task:temp_task)
        {
            task();
        } 
     }

7.针对定时器的操作

针对定时器的操作很简单,就是添加定时任务,刷新定时任务(本质就是延迟事件),终止定时任务,判断是否有该定时任务等操作。
就是封装一个定时器的任务即可。

public://针对定时器timer_wheel的操作
     //1.添加定时任务
     void AddTask(uint64_t id, uint32_t timeout, const TaskFunc cb)
     {
        return _timer_wheel.AddTask(id,timeout,cb);
     }
     //2.刷新,延迟定时任务
     void RefreshTask(uint64_t id)
     {
        return _timer_wheel.RefreshTask(id);
     }
     //3.终止定时任务
     void CancelTimer(uint64_t id)
     {
        return _timer_wheel.CancelTimer(id);
     }
     //4.查看是否有该定时任务
     bool HasTimer(uint64_t id)
     {
        return _timer_wheel.HasTimer(id);
     }

8.EventLoop的主要工作

eventloop的主要工作:
1.对描述符进行事件监控
2.描述符的事件就绪,则进行事件处理
3.执行任务队列中的任务。
循环往后的执行。

public://Eventloop的主要任务
      void Start()
      {
       while(1)
       {
         //1.进行事件监控,并获取就绪的事件和fd
        std::vector<Channel*> active;
        _poll.Poll(&active);
        //2.进行就绪事件处理
        for(auto &it: active)
        {
            it->HandlerEvent();
        }
        //3.执行任务队列里的任务
        RunAllTask();

       }
      }   
};

二.全部代码

class EventLoop
{
private:
   //每一个eventloop对应一个线程,当线程创建时,eventloop就会被创建出来并绑定线程的id
   std::thread::id _thread_id;    
   //eventloop是一个监控管理模块,里面封装了poller,用来监控所有的连接的事件
   Poller _poll;
   //需要一个通知机制eventfd,用来唤醒可能因为IO事件监控没有事件就绪而阻塞,也就是epoll阻塞,需要唤醒它,往后执行任务队列里的任务
   int _event_fd;
   //eventfd也是一个描述符,也可以挂到poll里进行事件监控,poller监控的对象是Channel*,所以通过Channel来管理event_fd
   std::unique_ptr<Channel> _eventfd_channel;
   //每个eventllop中都有对应的任务队列,里面存放着外界调用的函数
   using Function =std::function<void()>;
   //如果别的线程要执行当前的线程中的conn的对象的函数的时候,是不会执行的,因为会涉及到安全问题,所以就需要吧他执行的任务放到这个任务队列中去,让当前线程自己执行,
   std::vector<Function> _task;
   //任务队列可能存在线程安全问题,所以需要一把锁保护它
   std::mutex _mutex;

public: //构造
    EventLoop():_thread_id(std::this_thread::get_id()),
                _event_fd(CreateEventFd()),
                _eventfd_channel(new Channel(this,_event_fd))
                {
                 //设置_event_fd的读回调函数,当事件就绪就会去执行
                 _eventfd_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd,this));
                 //启动event_fd的读事件监控
                 _eventfd_channel->EnableRead();
                }
public://针对通知机制event_fd的操作:创建eventfd,读取eventfd中数据(就绪回到函数),网eventfd中写数据(唤醒epoll)
     
     //1.创建通知机制event_fd
     static int CreateEventFd()
     {
        int efd=eventfd(0,EFD_CLOEXEC|EFD_NONBLOCK);
        if(efd<0)
        {
            ERRLog("createeventfd failed");
            abort();//异常退出
        }
        return efd;
     }
     //2.构建event_fd的读事件回调函数,读取eventfd中的通知次数,单纯的就是读取出来,没有作用
     void ReadEventFd()
     {
        int res;
        ssize_t ret=(_event_fd,&res,sizeof(res));
        if(ret<0)
        {
           // 有两种情况是合法的套接字是没有问题的
            if (errno == EAGAIN) // EAGAIN表示sock缓冲区中没有数据可读了
                return ;
            if (errno == EINTR) // EINTR表示在读取的过程中被信号中断了
                return ;
            ERRLog("readevent failed");
            abort();
        }
     }
     //3.构建唤醒机制,本质就是玩eventfd中发送一个数据,那么event的读事件就绪就会唤醒阻塞的epoll
     void WakeupEventFd()
     { 
        int val=1;
        ssize_t ret=write(_event_fd,&val,sizeof(val));
        if(ret<0)
        {
            // 有两种情况是合法的套接字是没有问题的
            if (errno == EAGAIN) // EAGAIN表示sock缓冲区中没有空间可写了
                return ;
            if (errno == EINTR) // EINTR表示在写入的过程中被信号中断了
                return ;
            ERRLog("wakeupevent failed");
            abort();
        }

     }
public://针对poller的操作,要封装添加事件监控和移除事件监控的操作
     
     //1.添加或者修改某个连接的事件监控
     void UpdateEvent(Channel*channel)
     {
        return _poll.UpdateEvent(channel);
     }     
     //2.移除某个连接的事件监控
     void RemoveEvent(Channel*channel)
     {
        return _poll.RemoveEvent(channel);
     }
public://针对线程id的操作
     //用来判断当前的线程是否是创建eventloop的线程
     bool IsInLopp()
     {
        return _thread_id==std::this_thread::get_id();
     } 
public://针对任务队列_task的操作
     //1.将外界的任务(函数)插入到任务队列里
     void TaskInLoop(Function cb)
     {
        {
            std::unique_lock<std::mutex> _lock(_mutex);
            //规定一个作用域,并且顶一个临时的锁对象,在定义域内加锁,出来就解锁。
            _task.push_back(cb);
        }
        //当任务队列里有任务了就要把它执行掉,但存在epoll因为没有事件就绪而阻塞住无法执行任务队列里的任务
        //所以插入一个就需要唤醒epoll防止它在阻塞中
        WakeupEventFd();
     }
     //2.有的任务是不用压入到任务队列里执行的,如果是eventloop对应的线程调用外部函数,则就直接去执行,如果是其他线程调用的,就需要压入到任务队列里去
     //所以外面真正调用的都是这个接口
     void RunInLoop(Function cb)
     {
        if(IsInLopp())cb();//如果是eventloop对应的线程调用,则直接执行
        //如果不是则需要压入对eventloop的任务队列里执行
        TaskInLoop(cb);
     }
     //3.执行任务队列里的任务
     void RunAllTask()
     {
        //利用一个临时vector,将task里面的任务全部交换出来,再全部执行
        std::vector<Function> temp_task;
        {
           //访问任务队列就需要加锁
           std::unique_lock<std::mutex> _lock(_mutex);
           _task.swap(temp_task);
        }
        //执行任务队列里的任务
        for(auto &task:temp_task)
        {
            task();
        } 
     }
public://Eventloop的主要任务
      void Start()
      {
       while(1)
       {
         //1.进行事件监控,并获取就绪的事件和fd
        std::vector<Channel*> active;
        _poll.Poll(&active);
        //2.进行就绪事件处理
        for(auto &it: active)
        {
            it->HandlerEvent();
        }
        //3.执行任务队列里的任务
        RunAllTask();

       }
      }   

};
#include "../Server.hpp"


void Handleclose(Channel *channel)
{
    ERRLog("close:%d",channel->Fd());
    channel->Remove(); // 移除监控
    delete channel;
}
void HandleRead(Channel *channel)
{
    int fd = channel->Fd();
    char buf[1024] = {0};
    int ret = recv(fd, buf, 1023, 0);
    if (ret <= 0)
    {
       return Handleclose(channel);
    }
    buf[ret] = 0;
    ERRLog("%s",buf);
    channel->EnableWrite(); // 启动可写事件
}
void HandleError(Channel *channel)
{
    return Handleclose(channel); // 关闭释放
}
void HandleEvent(EventLoop* loop,Channel *channel,uint64_t timerid)
{
    loop->RefreshTask(timerid);
}
void HandleWrite(Channel *channel)
{
    int fd = channel->Fd();
    const char *data = "陶恩威你好呀";
    int ret = send(fd, data, strlen(data), 0);
    if (ret < 0)
    {
        return Handleclose(channel); // 关闭释放
    }
    channel->DisableWrite(); // 关闭写监控
}
void Acceptor(EventLoop *loop, Channel *lst_channel)
{
    int fd = lst_channel->Fd();
    int newfd = accept(fd, NULL, NULL);
    
    if (newfd < 0)
        return;
    uint64_t timerid=rand()%1000;
    Channel *channel = new Channel(loop, newfd);//获得一个新连接并挂到loop上
    channel->SetReadCallback(std::bind(HandleRead, channel));    // 为通信套接字设置可读事件的回调函数
    channel->SetWriteCallback(std::bind(HandleWrite, channel));  // 可写事件的回调函数
    channel->SetCloseCallback(std ::bind(Handleclose, channel)); // 关闭事件的回调函数
    channel->SetErrorCallback(std::bind(HandleError, channel));  // 关闭事件的回调函数
    channel->SetEventCallback(std::bind(HandleEvent, loop,channel,timerid));  // 关闭事件的回调函数
    //定时器的设置必须要在启动监听之前,防止先监听后立刻有了事件,但还没定时任务。该连接就会多存活
    loop->AddTask(timerid,10,std::bind(Handleclose,channel));
    channel->EnableRead();
}
int main()
{
    //Poller poll;
    srand(time(NULL));
    EventLoop loop;
    Socket lst_sock;
    lst_sock.CreateServer(8886);
    Channel *lst_channel = new Channel(&loop, lst_sock.Fd());              // 为listen套接字创建一个channel对象
    lst_channel->SetReadCallback(std::bind(Acceptor, &loop, lst_channel)); // 为listen套接字设置读回调函数Accept,用来接收新连接,并创建Channel对象,添加事件监控,启动写事件监控等
    lst_channel->EnableRead();                                             // 启动读事件监控
    loop.Start();
    lst_sock.Close();
    return 0;

}

#include "../Server.hpp"
int main()
{

    Socket cli_sock;
    cli_sock.CreateClient(8886, "112.126.85.136");
    
    for(int i=0;i<5;i++)
    {
        std::string str = "tew";
        cli_sock.Send(str.c_str(), str.size());
        char buf[1024] = {0};
        int n = cli_sock.Recv(buf, 1023);
        buf[n] = 0;
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while(1)sleep(1);
        

    return 0;
}

在这里插入图片描述

这个简单的服务器流程,就是一开始创建了一个eventloop。然后创建出监听连接,并绑定该eventloop,然后就设置该连接的回调函数。
最后启动lst_channel->EnableRead(); 这个函数就是将该套接字挂到poller上去进行事件监听。当有新连接时,读事件就绪,就会调用Acceptor,在Acceptor中,首先会创建一个新的套接字,并且也绑定该eventloop,然后设置对应的回调函数,最后启动channel->EnableRead();这个函数就是将新的套接字挂到poller上去进行事件监听。当有数据来时,读事件就会就绪。

添加定时销毁就是将任务添加到时间轮里,然后连接每操作一次都会调用一次任意事件,进行刷新时长。