【高并发服务器】多路复用的总结 eventfd timerfd

发布于:2025-07-16 ⋅ 阅读:(17) ⋅ 点赞:(0)

多路复用

select

过程

过程:三次循环 两次用户态<–> 内核态的拷贝,且select返回时会直接修改fd_set,意味着下次使用时仍要执行步骤1

  1. 将需要关心的fdArr设置进fd_set
  2. 调用select,待监听的fd_set从用户空间复制到内核空间
  3. select遍历fd_array,判断时间就绪
  4. select返回,内核把就绪的 FD 集合复制回用户空间
  5. 遍历fdArr和fd_set,判断哪些fd就绪

demo

void Start()
{
    int listensock = _listenSock.getSocketFd();
    fd_array[0] = listensock;
    while(true)
    {
        fd_set rfds;
        FD_ZERO(&rfds);

        int maxfd = fd_array[0];
        for (int i = 0; i < fd_num_max; i++) // loop one
        {
            if (fd_array[i] == defaultFd)
                continue;
            FD_SET(fd_array[i], &rfds);
            if (maxfd < fd_array[i])
            {
                maxfd = fd_array[i];
                lg(Info, "max fd update, max fd is: %d", maxfd);
            }
        }

        struct timeval timeout = {0, 0};//null:阻塞等待; {0, 0}:非阻塞; 其余:指定时长

        int n = select(maxfd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
        switch (n)
        {
            case 0:
                cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << endl;
                break;
            case -1:
                cerr << "select error" << endl;
                break;
            default:
                cout << "get a new link!!!!!" << endl;
                Dispatcher(rfds);
                break;
        }
    }
}
void Dispatcher(fd_set &rfds)
{
    for (int i = 0; i < fd_num_max; i++) //loop two
    {
        int fd = fd_array[i];

        if (fd == defaultFd)
            continue;
        if (FD_ISSET(fd, &rfds))
        {
            if (fd == _listenSock.getSocketFd())
                Accepter(); // 连接管理器
            else           
                Recver(fd, i); // 读事件管理器
        }
    }
}

缺点

  1. 能够监视的文件描述符数量有上限
  2. 性能低下

poll

过程

  1. 在用户态维护一个结构体数组,存储需要关心的fd及其感兴趣的事件。
struct pollfd fds[10];
fds[0].fd = sockfd;
fds[0].events = POLLIN;  // 监听可读事件
  1. 调用 poll ,拷贝:pollfd数组从用户空间 → 内核空间
  2. 内核线性扫描pollfd数组,检查并更新每个FD的就绪状态:poll为每个感兴趣的fd注册一个等待事件,这些等待事件会被挂接到内核中相应的等待队列上
  3. 内核将修改后的pollfd数组从内核空间复制回用户空间
  4. 遍历结构体数组,判断哪些fd就绪

demo

void Start()
{
    _event_fds[0].fd = _listensock.getSocketFd();
    _event_fds[0].events = POLLIN;
    int timeout = 3000; // 3s
    while(true)
    {
        int n = poll(_event_fds, fd_num_max, timeout);
        switch (n)
        {
            case 0:
                cout << "time out... " << endl;
                break;
            case -1:
                cerr << "poll error" << endl;
                break;
            default:
                cout << "get a new link!!!!!" << endl;
                Dispatcher();
                break;
        }
    }
}
void Dispatcher()
{
    for (int i = 0; i < fd_num_max; i++) // loop
    {
        int fd = _event_fds[i].fd;
        if (fd == defaultFd)
            continue;

        if (_event_fds[i].revents & POLLIN)
        {
            if (fd == _listensock.getSocketFd())
                Accepter(); // 连接管理器
            else
                Recver(fd, i); // non listenfd
        }
    }
}

优缺

  1. 解除文件描述符数量上限
  2. 分离用户事件和内核返回的事件,不用每次使用前都循环设置感兴趣的fd
  3. 仍存在拷贝和循环的开销

epoll

过程

  1. 创建 epoll 模型:创建红黑树、就绪队列,注册底层回调机制。
int epfd = epoll_create(size);  // Linux 2.6.8 后 size 参数被忽略,填任意正数即可

红黑树用于存放通过epoll_ctl方法向epoll对象中添加进来的事件,所有添加到epoll模型中的事件都会与设备(网卡)驱动程序建立回调关系。
当响应的事件发生时会调用回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表即fd就绪队列中。

在epoll中,对于每一个事件,都会建立一个epitem结构体【红黑树结点】

struct epitem
{  
    struct rb_node  rbn;//红黑树节点  
    struct list_head    rdllink;//双向链表节点  
    struct epoll_filefd  ffd;  //事件句柄信息  
    struct eventpoll *ep;    //指向其所属的eventpoll对象  
    struct epoll_event event; //期待发生的事件类型  
} // fd event 就绪队列 所处的epoll模型 所处的红黑树结点
  1. 注册监听事件,用户空间 → 内核空间,将事件挂载到红黑树并注册回调机制
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 参数
    • op:操作类型(EPOLL_CTL_ADD/MOD/DEL)。
    • fd:要监听的 FD。
    • event:指定监听的事件类型(如 EPOLLINEPOLLET)。
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;  // 可读事件 + 边缘触发模式
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);  // 将 fd 加入 epoll 实例
  1. 等待事件发生,epoll会将就绪的事件放入就绪队列,调用epoll_wait时以O(1)复杂度判断是否存在就绪事件,以O(N)复杂度将就绪事件复制到 events 数组。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • 参数
    • events:用于存储就绪 FD 的数组(由用户分配)。
    • maxevents:最多返回的事件数。
    • timeout:超时时间(毫秒)。
  1. 处理就绪事件,用户遍历 events 数组,处理每个就绪的 FD:
struct epoll_event events[10];
int nfds = epoll_wait(epfd, events, 10, -1);  // 阻塞等待
for (int i = 0; i < nfds; i++) {
    if (events[i].events & EPOLLIN) {
        // 处理可读事件
    }
    if (events[i].events & EPOLLOUT) {
        // 处理可写事件
    }
}

demo

void Start()
{
    // 将listensock添加到epoll中 -> listensock和他关心的事件 添加到内核epoll模型中rb_tree
    _epollerPtr->EpllerUpdate(EPOLL_CTL_ADD, _listenSocketPtr->getSocketFd(), ET_EVENT_IN);
    struct epoll_event revs[num];
    while(true)
    {
        int n = _epollerPtr->EpollerWait(revs, num);
        if (n > 0) // 有事件就绪
        {
            lg(Debug, "event happened, first event fd is : %d", revs[0].data.fd);
            Dispatcher(revs, n);
        }
        else if (n == 0)
            lg(Info, "time out ...");
        else
            lg(Error, "epll wait error");
    }
}
void Dispatcher(struct epoll_event revs[], int num)
{
    for (int i = 0; i < num; i++)
    {
        uint32_t events = revs[i].events;
        int fd = revs[i].data.fd;
        if (events & ET_EVENT_IN) // 客户联连接读取事件就绪
        {
            if (fd == _listenSocketPtr->getSocketFd())
                Accepter();
            else // 普通读取事件就绪
                Recver(fd);
        }
        // else if (events & EVENT_OUT){}
        // else{}
    }
}
void Accepter() // 获取了一个新连接
{
    std::string clientip;
    uint16_t clientport;
    int sock = _listenSocketPtr->Accept(&clientip, &clientport);
    if (sock > 0)
    {
        // 不能直接读取 原因select/poll已讲
        _epollerPtr->EpllerUpdate(EPOLL_CTL_ADD, sock, ET_EVENT_IN);
        lg(Info, "get a new link, client info@ %s:%d", clientip.c_str(), clientport);
    }
}

void Recver(int fd)
{
    char buffer[1024];
    // 1.不一定是完整的报文
    // 2.读到部分报文 下次的Recver()函数中buffer是局部变量 
    // A报文如果发两次过来 也无法拼接成完整报文
    ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
    if (n > 0)
    {
        buffer[n] = 0;
        std::cout << "get a messge: " << buffer << std::endl;
        std::string echo_str = "server echo $ ";
        echo_str += buffer;
        write(fd, echo_str.c_str(), echo_str.size());
    }
    else if (n == 0)
    {
        lg(Info, "client quit, me too, close fd is : %d", fd);
        _epollerPtr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
        close(fd);
    }
    else
    {
        lg(Warning, "recv error: fd is : %d", fd);
        _epollerPtr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
        close(fd);
    }
}

优点

判断有没有事件就绪的时间复杂度为 O(1)。【只需要看队列是否为空】
获取就绪的时间复杂度为 O(n),需要将就绪队列中的节点一个一个拷贝到应用层。
fd 和 event 没有上限,该红黑树有多大由操作系统承载。
不需要在用户层由用户维护一个数组这样的数据结构来管理所有的文件描述符及其要关心的事件。
epoll_wait() 的返回值 n表示有 n 个 fd 就绪了。该接口会将已经就绪的节点放入输出型参数 events ,所以就绪事件是连续的,有 n 个!上层用户处理已经就绪的事件不需要像以前一样检测有哪些 fd 是非法的,哪些是没有就绪的了,只需要根据返回值 n,遍历 events 即可!

面试题

1. 总连接多、活跃连接少:epoll 更高效

epoll 的核心优势是 “事件驱动”+“内核维护状态”

  • 注册阶段epoll 通过 epoll_ctl 将所有需要监听的文件描述符(FD)注册到内核的红黑树中,后续无需重复传递整个 FD 集合(仅需调用 epoll_wait 等待事件)。
  • 事件通知:内核会主动记录活跃的 FD 并放入就绪队列,epoll_wait 直接返回就绪的 FD 列表(无需用户遍历所有 FD)。

当总连接多但活跃少(如百万级连接中仅几百个活跃)时:

  • epoll 只需处理少量活跃 FD,无需遍历全部连接,时间复杂度为 O(活跃数),效率极高。
  • select/poll 每次调用都需将所有 FD 从用户态拷贝到内核态,并由内核遍历全部 FD 检查状态(时间复杂度 O(总连接数)),总连接数越多,开销越大。
2. 总连接少、活跃连接多:select 可能更优

select 的劣势在“总连接多”时被放大,但在“总连接少”时,其 “简单直接” 的特点反而可能更高效:

  • select 无需提前注册 FD(每次调用直接传入 FD 集合),省去了 epollepoll_createepoll_ctl 等额外的系统调用和内核红黑树维护成本。
  • 当活跃连接多(接近总连接数)时,epoll 的“内核筛选活跃 FD”优势被削弱(因为大部分 FD 都是活跃的,筛选成本接近遍历全部),而 select 省去了红黑树操作的开销,反而可能更快。

例如:若总连接仅 10 个且 8 个活跃,select 一次拷贝 + 遍历 10 个 FD 的成本,可能低于 epoll 维护红黑树 + 调用 epoll_ctl 的成本。

总结

总连接多,设置一次关注就行,活跃连接少,事件回调即可。select在总连接多时的循环和拷贝存在极大开销。

总连接少,活跃连接多,循环和拷贝均有效且不用维护红黑树,就绪队列,回调机制等复杂机制。

基于时间轮的定时器

#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdint>
#include <functional>
#include <memory>
#include <unistd.h>
// g++ timerWheel.cpp -o timerWheel -std=c++11
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
    uint64_t _id;         // 定时任务ID
    uint32_t _expiration; // 定时任务的超时时间
    bool _isValid;        // 定时任务是否有效

    TaskFunc _onProcess;          // 定时任务到期 任务执行回调
    ReleaseFunc _removeFromTable; // 定时任务释放 删除哈希表记录
public:
    TimerTask(uint64_t id, uint32_t expireTime, const TaskFunc &task_cb)
        : _id(id), _expiration(expireTime), _onProcess(task_cb), _isValid(true)
    {
    }

    ~TimerTask()
    {
        // 任务有效 则执行任务
        if (_isValid == true)
            _onProcess();

        // 不论任务是否有效 都需要释放哈希表中的定时任务记录
        _removeFromTable();
    }

    void Cancel()
    {
        _isValid = false;
    }

    void setRelease(const ReleaseFunc &removeFromTable)
    {
        _removeFromTable = removeFromTable;
    }

    uint32_t getExpTime()
    {
        return _expiration;
    }
};

class TimerWheel
{
    using Task_wp = std::weak_ptr<TimerTask>;
    using Task_sp = std::shared_ptr<TimerTask>;

private:
    int _tick;     // 秒针 移动到哪个位置就执行该位置的特定事件
    int _capacity; // 时间轮最大数量/最大延迟时间
    std::vector<std::vector<Task_sp>> _wheel;
    std::unordered_map<uint64_t, Task_wp> _timers;

private:
    void Remove_Timers(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it != _timers.end())
            _timers.erase(it);
    }

public:
    TimerWheel()
        : _tick(0), _capacity(60), _wheel(_capacity)
    {
    }

    void AddTimerTask(uint64_t id, uint32_t expireTime, const TaskFunc &task_cb)
    {
        Task_sp taskPtr(new TimerTask(id, expireTime, task_cb));
        taskPtr->setRelease(std::bind(&TimerWheel::Remove_Timers, this, id));

        int pos = (_tick + expireTime) % _capacity;
        _wheel[pos].push_back(taskPtr);
        _timers[id] = Task_wp(taskPtr);
    }

    // 刷新/延迟定时任务
    void RefreshTimerTask(uint64_t id)
    {
        // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中
        auto it = _timers.find(id);
        if (it == _timers.end())
            return; // 定时任务不存在

        Task_sp taskPtr = it->second.lock();
        int expireTime = taskPtr->getExpTime();
        int pos = (_tick + expireTime) % _capacity;
        _wheel[pos].push_back(taskPtr);
    }

    void TimerCancel(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
            return;
        // 此处wp变为sp会增加引用计数
        // 但是taskPtr在该函数调用完后会析构引用计数--
        Task_sp taskPtr = it->second.lock();

        if (taskPtr)
            taskPtr->Cancel();
    }

    // 秒针每秒走一步 该函数每秒被执行一次
    void TickMove()
    {
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear(); // clear调用该时刻下所有定时任务指针的析构 即引用计数--
    }

    // 输出查看_timers
    void PrintTimers()
    {
        for (auto &it : _timers)
        {
            std::cout << "id: " << it.first << std::endl;
        }
    }
};

class A
{
public:
    A() { std::cout << "构造" << std::endl; }
    ~A() { std::cout << "析构" << std::endl; }
};

void Delete_APtr(A *pA)
{
    delete pA;
}

int main()
{
    A *pA = new A();
    TimerWheel timerWheel;
    timerWheel.AddTimerTask(1234, 3, std::bind(Delete_APtr, pA));

    for (int i = 0; i < 3; i++)
    {
        sleep(1);
        timerWheel.RefreshTimerTask(1234);
        std::cout << "定时任务被刷新" << std::endl;
        timerWheel.TickMove();
        timerWheel.PrintTimers();
    }
    timerWheel.TimerCancel(1234);

    while (1)
    {
        std::cout << "-------------------" << std::endl;
        timerWheel.TickMove();
        timerWheel.PrintTimers();

        sleep(1);
    }
    return 0;
}

eventfd timerfd


网站公告

今日签到

点亮在社区的每一天
去签到