【C/C++】高性能网络编程之Reactor模型

发布于:2025-06-02 ⋅ 阅读:(23) ⋅ 点赞:(0)

Reactor模型

Reactor 模型是一种事件驱动的并发模型,广泛应用于高性能网络服务器开发中,比如 Nginx、Redis、Muduo、libevent 等,属于 同步非阻塞 IO(Reactor 与 Proactor 是两大典型 IO 模型)。


1 核心思想

Reactor 模型通过注册事件和对应的处理器(handler),当某个事件发生时,由事件分发器(Demultiplexer)将其分发给相应的处理器处理。其核心流程为:

事件发生 → 事件多路复用器检测到事件 → 分发事件给处理器 → 处理器处理事件

2 组成组件

组件 说明
Reactor(反应器) 核心调度模块,负责监听 IO 事件并分发到相应处理器
Demultiplexer(多路分发器) 常用 select / poll / epoll,用于等待事件
Handler(事件处理器) 处理具体的读/写/连接等事件逻辑
Acceptor(连接处理器) 处理新连接接入事件
Channel(通道) 封装描述符及事件类型、回调函数

3 工作流程图

+-------------------------+
|        Reactor          |
+-------------------------+
           |
           v
+-------------------------+
|  IO 多路复用器(epoll)   |
+-------------------------+
           |
     事件发生(如连接/读/写)
           |
           v
+-------------------------+
|   分发给对应的 Handler   |
+-------------------------+
           |
           v
+-------------------------+
|   执行读/写/业务处理逻辑  |
+-------------------------+

4 常见的三种 Reactor 模型

4.1 单 Reactor 单线程

  • 所有事件都由一个线程完成监听和处理。
  • 简单但性能有限,适用于低并发场景(如 Redis)。
Reactor 线程: 监听事件 → 处理事件(accept/read/write/业务处理)
  • 优点:结构简单,适合逻辑简单、IO 密集场景。
  • 缺点:处理慢或阻塞将影响其他事件。

4.2 单 Reactor 多线程

  • 主线程负责事件分发,多个工作线程负责业务处理。
  • 比如:主线程只处理 accept,交由线程池处理 read/write。
Reactor 线程: accept
Worker 线程池: read/write/业务处理
  • 优点:适合并发较高,业务处理耗时场景。
  • 缺点:需要加锁、线程同步,逻辑复杂。

4.3 多 Reactor 多线程(主从 Reactor)

  • 主 Reactor:只负责 accept,新连接后分发给从 Reactor。
  • 从 Reactor:每个从线程监听部分连接,处理其 IO 事件。
主 Reactor: accept
从 Reactor: read/write
工作线程池: 业务逻辑处理
  • Nginx、Muduo 等采用此结构。
  • 优点:高性能、高并发、分工明确。
  • 缺点:结构复杂,实现难度大。

5 和 Proactor 区别

特性 Reactor Proactor
IO 模式 同步非阻塞 异步 IO
事件处理 应用负责读/写处理 内核完成 IO,应用只处理结果
使用平台 POSIX/Linux 常见 Windows 支持良好
示例 Nginx、Muduo、libevent、Netty 等 Windows IOCP、Boost.Asio(部分模式)

6 入门教程c++

目标

  • epoll 多路复用;
  • 非阻塞套接字;
  • 事件注册与回调;
  • 支持新连接接入(Acceptor)和客户端数据读取(Handler);
  • 单 Reactor + 单线程模型

设计

组件 职责
Reactor 主循环,监听事件并分发
Channel 封装 fd + 事件类型 + 回调函数
Acceptor 监听 socket,接收新连接
Handler 处理连接上的 IO(如读/写)
EventLoop 封装 epoll,对应主线程或每个工作线程的事件循环

实现代码

reactor-demo/
├── main.cpp
├── EventLoop.h / .cpp
├── Channel.h / .cpp
├── Acceptor.h / .cpp
├── ConnectionHandler.h / .cpp
└── CMakeLists.txt

EventLoop.h

#pragma once
#include <vector>
#include <functional>

class Channel;

class EventLoop {
public:
    EventLoop();
    ~EventLoop();

    void loop();
    void updateChannel(Channel* channel);

private:
    int epoll_fd_;
    static const int MAX_EVENTS = 1024;
};

EventLoop.cpp

#include "EventLoop.h"
#include "Channel.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <iostream>

EventLoop::EventLoop() {
    epoll_fd_ = epoll_create1(0);
}

EventLoop::~EventLoop() {
    close(epoll_fd_);
}

void EventLoop::updateChannel(Channel* channel) {
    epoll_event ev;
    ev.events = channel->events();
    ev.data.ptr = channel;
    if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, channel->fd(), &ev) == -1) {
        std::cerr << "epoll_ctl failed\n";
    }
}

void EventLoop::loop() {
    epoll_event events[MAX_EVENTS];
    while (true) {
        int n = epoll_wait(epoll_fd_, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; ++i) {
            Channel* ch = static_cast<Channel*>(events[i].data.ptr);
            ch->handleEvent();
        }
    }
}

Channel.h

#pragma once
#include <functional>

class Channel {
public:
    using Callback = std::function<void()>;

    Channel(int fd);
    void setReadCallback(Callback cb);
    void handleEvent();
    int fd() const;
    int events() const;

private:
    int fd_;
    int events_;
    Callback read_callback_;
};

Channel.cpp

#include "Channel.h"
#include <sys/epoll.h>

Channel::Channel(int fd)
    : fd_(fd), events_(EPOLLIN) {}

void Channel::setReadCallback(Callback cb) {
    read_callback_ = std::move(cb);
}

void Channel::handleEvent() {
    if (read_callback_) read_callback_();
}

int Channel::fd() const { return fd_; }
int Channel::events() const { return events_; }

Acceptor.h

#pragma once
#include "Channel.h"

class EventLoop;

class Acceptor {
public:
    Acceptor(EventLoop* loop, int port, std::function<void(int)> new_conn_cb);
    void listen();

private:
    int listen_fd_;
    Channel channel_;
    std::function<void(int)> new_conn_cb_;
};

Acceptor.cpp

#include "Acceptor.h"
#include "EventLoop.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <iostream>

static int setNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

Acceptor::Acceptor(EventLoop* loop, int port, std::function<void(int)> cb)
    : listen_fd_(socket(AF_INET, SOCK_STREAM, 0)),
      channel_(listen_fd_),
      new_conn_cb_(cb) {

    sockaddr_in addr {};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    bind(listen_fd_, (sockaddr*)&addr, sizeof(addr));
    setNonBlocking(listen_fd_);
    ::listen(listen_fd_, SOMAXCONN);

    channel_.setReadCallback([this]() {
        sockaddr_in cli_addr;
        socklen_t len = sizeof(cli_addr);
        int conn_fd = accept(listen_fd_, (sockaddr*)&cli_addr, &len);
        if (conn_fd >= 0) {
            setNonBlocking(conn_fd);
            new_conn_cb_(conn_fd);
        }
    });

    loop->updateChannel(&channel_);
}

ConnectionHandler.h

#pragma once
#include "Channel.h"
#include <memory>

class EventLoop;

class ConnectionHandler {
public:
    ConnectionHandler(EventLoop* loop, int fd);

private:
    int conn_fd_;
    Channel channel_;
};

ConnectionHandler.cpp

#include "ConnectionHandler.h"
#include "EventLoop.h"
#include <unistd.h>
#include <iostream>

ConnectionHandler::ConnectionHandler(EventLoop* loop, int fd)
    : conn_fd_(fd), channel_(fd) {
    channel_.setReadCallback([this]() {
        char buf[1024] = {0};
        ssize_t n = read(conn_fd_, buf, sizeof(buf));
        if (n > 0) {
            std::cout << "Recv: " << buf;
            write(conn_fd_, buf, n);  // echo
        } else if (n == 0) {
            close(conn_fd_);
        }
    });
    loop->updateChannel(&channel_);
}

main.cpp

#include "EventLoop.h"
#include "Acceptor.h"
#include "ConnectionHandler.h"

int main() {
    EventLoop loop;

    Acceptor acceptor(&loop, 12345, [&](int conn_fd) {
        new ConnectionHandler(&loop, conn_fd);
    });

    loop.loop();
    return 0;
}

CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
project(ReactorDemo)

set(CMAKE_CXX_STANDARD 17)

add_executable(ReactorDemo
    main.cpp
    EventLoop.cpp
    Channel.cpp
    Acceptor.cpp
    ConnectionHandler.cpp
)

编译并运行:

mkdir build && cd build
cmake ..
make
./ReactorDemo

telnet 测试:

telnet 127.0.0.1 12345

输入数据,能看到回显,说明 Reactor 模型生效。

7 总结

  • Reactor 模型是现代高性能服务器开发的核心框架。
  • 利用 IO 多路复用机制,有效提升单线程处理多个客户端连接的能力。
  • 推荐结合 epoll、线程池、状态机 进一步构建高性能系统。

网站公告

今日签到

点亮在社区的每一天
去签到