【计算机网络】非阻塞IO——poll实现多路转接

发布于:2025-06-10 ⋅ 阅读:(21) ⋅ 点赞:(0)

🔥个人主页🔥:孤寂大仙V
🌈收录专栏🌈:计算机网络
🌹往期回顾🌹:【计算机网络】非阻塞IO——select实现多路转接
🔖流水不争,争的是滔滔不息


一、poll实现多路转接

在网络编程或多路 IO 编程中,我们经常需要同时监听多个文件描述符(fd),比如多个客户端的 socket 连接。这时,poll 就登场了。
poll 是 Linux 提供的一种 IO 多路复用机制,用于监听多个 fd 上的读写等事件,一旦有就绪,立刻通知我们处理。

poll的参数

int poll(struct pollfd fds[], nfds_t nfds, int timeout);
  1. 参数struct pollfd fds[]是关心的文件描述符数组,每一个元素代表一个要监听的 fd 及其感兴趣的事件和返回的事件。
struct pollfd {
    int fd;         // 要监听的文件描述符
    short events;   // 感兴趣的事件(由你设置)
    short revents;  // 实际返回的事件(由内核设置)
};

  1. 参数 nfds_t nfds这个是 fds[] 数组里有效元素的数量,简单说就是你监听几个文件描述符就写几。
  2. int timeout
    单位是 毫秒(ms),表示阻塞多久。timeout > 0:等待指定毫秒数后返回、timeout == 0:立即返回(非阻塞)、timeout < 0:永远阻塞,直到有事件发生。

函数返回值

  • 0:就绪的文件描述符数量
  • 0:超时,没有任何事件发生
  • <0:出错(比如信号中断)

常用事件(events/revents)取值

POLLIN     // 有数据可读
POLLOUT    // 可以写数据而不会阻塞
POLLERR    // 错误(由 revents 设置)
POLLHUP    // 对端关闭连接
POLLNVAL   // 描述符非法

poll的缺点

  1. 每次调用都要传入整个 fd 列表
    poll 的 pollfd 是个数组,内核不会保存状态,每次都得重新传。
    如果你有上千个连接,那每次 poll() 都会把这上千个 fd 重新复制到内核 → 代价大。
  2. 线性扫描效率低
    poll 返回的是“多少个 fd 就绪”,但你要线性扫描整个数组找出来。
    比如你监听 1000 个连接,但只有 1 个可读,你得从 0 扫到 999 才找到它。
  3. fd 数量仍有限制
    虽然 poll 相比 select 不再限制 1024 个,但:
    它还是受限于内核最大文件描述符数量(ulimit -n,比如 65535)
    每个 pollfd 占用空间较大,更不适合极大并发
  4. 无事件通知机制
    poll 只能“等和扫”,没有像 epoll 的 EPOLLONESHOT、EPOLLET(边缘触发)那种高级机制。
    没法在事件处理完后说“我暂时不关注这个 fd 了”。

二、poll实现非阻塞服务器

#pragma once
#include "Common.hpp"
#include "Log.hpp"
#include "Socket.hpp"
#include <sys/poll.h>

using namespace std;
using namespace LogModule;
using namespace SocketModule;

class PollServer
{
    const static int size = 4096;
    const static int defaultfd = -1;

public:
    PollServer(int port)
        : _isrunning(false), _listensockfd(make_unique<TcpSocket>())
    {
        _listensockfd->BuildTcpSocketServer(port); // 构造TCP服务器
        for (int i = 0; i < size; i++)
        {
            _fds[i].fd = defaultfd;
            _fds[i].events = 0;
            _fds[i].revents = 0;
        }
        _fds[0].fd = _listensockfd->FD();
        _fds[0].events = POLLIN;
    }

    void Start() // 服务器启动
    {
        int timeout = 1000; // 1000毫秒
        _isrunning = true;
        while (true)
        {
            int n = poll(_fds, size, timeout); // 多路转接只关系读事件
            switch (n)
            {
            case -1:
                LOG(LogLevel::ERROR) << "poll error"; // 异常
                break;
            case 0:
                LOG(LogLevel::WARNING) << "poll timeout"; // 超时
            default:
                LOG(LogLevel::INFO) << "事件就绪"; // 读事件就绪
                Dispatcher();                      // 派发
                break;
            }
        }
    }
    void Dispatcher() // 事件派发
    {
        for (int i = 0; i < size; i++)
        {
            if (_fds[i].fd == defaultfd) // 跳过
                continue;
            if (_fds[i].revents & POLLIN) // 是读就绪
            {
                if (_fds[i].fd == _listensockfd->FD())
                {
                    // listen 套接字
                    Accept();
                }
                else
                {
                    // 普通  套接字
                    Recv(i);
                }
            }
        }
    }
    void Accept()
    {
        InetAddr client;
        int sockfd = _listensockfd->AcceptOrDie(&client);
        LOG(LogLevel::DEBUG) << "accept a new client" << client.StringAddr();
        int pos = 0;
        for (; pos < size; pos++)
        {
            if (_fds[pos].fd == defaultfd)
                break; // 数组中找到空位
        }
        if (pos == size)
        {
            LOG(LogLevel::WARNING) << "poll server full";
            close(sockfd);
        }
        else
        {
            _fds[pos].fd = sockfd;
            _fds[pos].events = POLLIN;
            _fds[pos].revents = 0;
        }
    }
    void Recv(int pos) // 读数据
    {
        char buffer[1024];
        ssize_t n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 收信息
        if (n > 0)
        {
            buffer[n] = 0;
            cout << "client say@ " << buffer << endl;
        }
        else if (n == 0) // 客户端退出
        {
            LOG(LogLevel::INFO) << "client quit";
            _fds[pos].fd = defaultfd;
            _fds[pos].events = 0;
            _fds[pos].revents = 0;
            close(_fds[pos].fd);
        }
        else // 出现错误 异常
        {
            LOG(LogLevel::FATAL) << "recv error";
            _fds[pos].fd = defaultfd;
            _fds[pos].events = 0;
            _fds[pos].revents = 0;
            close(_fds[pos].fd);
        }
    }

    ~PollServer()
    {
    }

private:
    unique_ptr<Socket> _listensockfd;
    bool _isrunning;
    struct pollfd _fds[size];
};

构造

//私有成员变量
private:
    unique_ptr<Socket> _listensockfd;
    bool _isrunning;
    struct pollfd _fds[size];
//构造
    PollServer(int port)
        : _isrunning(false), _listensockfd(make_unique<TcpSocket>())
    {
        _listensockfd->BuildTcpSocketServer(port); // 构造TCP服务器
        for (int i = 0; i < size; i++)
        {
            _fds[i].fd = defaultfd;
            _fds[i].events = 0;
            _fds[i].revents = 0;
        }
        _fds[0].fd = _listensockfd->FD();
        _fds[0].events = POLLIN;
    }

私有成员变量中,把关心文件描述符的数组开好,构造服务器的时候遍历这个数组,把要监听的文件描述符设置进去,把要关系的状态设置为关系读事件。


服务器启动

void Start() // 服务器启动
    {
        int timeout = 1000; // 1000毫秒
        _isrunning = true;
        while (true)
        {
            int n = poll(_fds, size, timeout); // 多路转接只关系读事件
            switch (n)
            {
            case -1:
                LOG(LogLevel::ERROR) << "poll error"; // 异常
                break;
            case 0:
                LOG(LogLevel::WARNING) << "poll timeout"; // 超时
            default:
                LOG(LogLevel::INFO) << "事件就绪"; // 读事件就绪
                Dispatcher();                      // 派发
                break;
            }
        }
    }

用poll函数,进行多路转接,事件就绪就派发任务。


事件派发

void Dispatcher() // 事件派发
    {
        for (int i = 0; i < size; i++)
        {
            if (_fds[i].fd == defaultfd) // 跳过
                continue;
            if (_fds[i].revents & POLLIN) // 是读就绪
            {
                if (_fds[i].fd == _listensockfd->FD())
                {
                    // listen 套接字
                    Accept();
                }
                else
                {
                    // 普通  套接字
                    Recv(i);
                }
            }
        }
    }

对文件描述符数组,进行遍历。如果不是合法位置(没放文件描述符)就跳过这个位置。如果是读就绪然后判断是监听套接字合适普通套接字。

收到客户端的连接accept

void Accept()
    {
        InetAddr client;
        int sockfd = _listensockfd->AcceptOrDie(&client);
        LOG(LogLevel::DEBUG) << "accept a new client" << client.StringAddr();
        int pos = 0;
        for (; pos < size; pos++)
        {
            if (_fds[pos].fd == defaultfd)
                break; // 数组中找到空位
        }
        if (pos == size)
        {
            LOG(LogLevel::WARNING) << "poll server full";
            close(sockfd);
        }
        else
        {
            _fds[pos].fd = sockfd;
            _fds[pos].events = POLLIN;
            _fds[pos].revents = 0;
        }
    }

主要是收到客户端的连接,创建了accept的套接字,要把这个套接字放到数组中。在文件描述符数组中找到空位,把这个acceptfd设置进去。


读数据

void Recv(int pos) // 读数据
    {
        char buffer[1024];
        ssize_t n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 收信息
        if (n > 0)
        {
            buffer[n] = 0;
            cout << "client say@ " << buffer << endl;
        }
        else if (n == 0) // 客户端退出
        {
            LOG(LogLevel::INFO) << "client quit";
            _fds[pos].fd = defaultfd;
            _fds[pos].events = 0;
            _fds[pos].revents = 0;
            close(_fds[pos].fd);
        }
        else // 出现错误 异常
        {
            LOG(LogLevel::FATAL) << "recv error";
            _fds[pos].fd = defaultfd;
            _fds[pos].events = 0;
            _fds[pos].revents = 0;
            close(_fds[pos].fd);
        }

这时候,读数据已经是非阻塞的了,客户端退出和异常要把文件描述符数组的内容清空,然后关闭文件描述符。


在这里插入图片描述
源码:poll多路转接