Reactor 模式全解:实现非阻塞 I/O 多路复用

发布于:2024-03-29 ⋅ 阅读:(23) ⋅ 点赞:(0)

4302f48a32e14b49bdb6314e2b0af1ba.png

8d21465ea8f14802aced98dcb5f9bb43.png

6ed7c2d2f8f64ce898ef83d0c1aed539.png

Reactor网络模式是什么?

Reactor网络模式时目前网络最常用的网络模式。如果你使用Netty,那么你在使用Reactor;如果你使用Twisted,那么你子啊使用Reactor;如果你使用netpoll,那么你在使用Reactor。

这里先给出答案:
Reactor = I/O多路复用+非阻塞I/O。

什么是I/O多路复用?

我们还是先使用文字拆解来看看每个词是什么意思吧。

拆词解释

I/O

I/O表示输入和输出,英文为Input/Output。I为输入,O为输出。我们日常编程中操作最多的无非就是网络和文件了,这两类就属于I/O,我们通常称为网络I/O和文件I/O。

下面是两个Java和Go操作文件I/O的例子:
java按行读取文件:

  try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

 

Go按行读取文件:

    file, _ := os.Open(fileName)
    defer file.Close()

    reader := bufio.NewReader(file)
    for {
        line,err := reader.ReadString('\n')
        if err != nil {
            break
        }
    fmt.Print(line)

 

好的,I/O搞清楚了我们就是搞清楚多路。

多路

多路字面意思就是多条路,放在计算机网络编程中的话,一般是指多个通道或者数据源。比如:你的进程或者需要打开很多的文件或者有很多网络连接,并监控这些文件或者网络连接是否发生变化(也就是是否产生一些事件)以进行必要的处理。

复用

复用的字面意思就是重复使用。我们把多路放到计算机网络编程中的话,一般是指重复使用一个或者几个线程,这里的关键是线程一定要很少而且重复使用它来完成I/O+多路。
总的来说就是:重复使用一个或者几个献策会给你来完成多路I/O的变化(事件)处理。

给I/O多路复用下个定义

好了,有了以上的背景或许你已经对I/O多路复用有了自己的理解和定义。这儿我根据自己的理解来对I/O多路复用进行定义:
I/O多路复用就是使用一个或者几个进程或者线程来完成大量通道或者数据源的事件监控和处理。

I/O多路复用复用了什么?

复用了线程。
在没有I/O多路复用之前,可能客户端每创建一个连接服务端都需要新建一个线程来处理事件,这样10K个客户端连接就需要10K个线程,服务端应对这些连接很吃力,因为创建线程有开销,切换线程有开销,还有同步,锁,死锁等问题。

那有了I/O多路复用之后,服务端可能1个线程就可以应对10K个连接的事件。

一般使用什么技术实现I/O多路复用

I/O多路复用技术实现依赖于操作系统,但是主流操作系统都是支持,下面是三大操作系统对I/O多路复用的支持:

  1. Linux: 这个是目前的大哥,Linux使用epoll,当然还有select, poll,目前网络上基本都用epoll
  2. MacOS: Kqueue
  3. Windows: IOCP I/O完成端口

I/O多路复用就告一段落,看下非阻塞I/O。

什么是非阻塞I/O

非阻塞I/O是相对于阻塞I/O而言的,它们之间的区别就是你进行I/O操作时是否阻塞你后续的执行。非阻塞不会阻塞后续执行,而阻塞会。这就好比:
你用某App网上下单到店取一样。假设你直接到店里面用手机下单,你必须在店里等待食物准备好。在这个过程中,你不能去做其他任何事情,直到拿到东西后,你才离开。

而非阻塞I/O就像是你网上下单起手配送,在起手配送期间你可以和你的朋友或者同时唠唠嗑,等骑手把东西送到给你打电话的时候你就下去拿。

总的来说:阻塞I/O中的程序在等待I/O完成时会一直停留在相应操作上,不会执行后续的代码。与之相反,在非阻塞I/O模式下,程序会立即返回一个状态值,如果I/O尚未完成,则程序可以继续执行其他任务,然后随后再次检查I/O状态。
阻塞I/O: 死等
非阻塞I/O:立即返回,下次重试

I/O多路复用和非阻塞I/O组合到一起擦出什么样的火花?

Reactor设计模式结合了非阻塞I/O和I/O多路复用,使得单个线程就能高效地处理多个网络通信。这种结合擦出的“火花”就是使事件驱动的网络服务器变得可能,这种服务器可以以非常轻量级的方式支持大规模并发连接。

在Reactor模式中,一个中央分派器(Reactor)负责监听所有I/O事件(使用select、poll、epoll等系统调用),并且当某个事件发生时,它将调用预先注册的回调函数来处理这些事件。由于采用了非阻塞I/O,这个中央分派器在等待I/O事件时不会被阻塞,这使得它可以在任何给定时间处理上千甚至上万个不同的I/O请求。
下面是单线程的Reactor模型:

单线程Reactor模式

e47b5644f5874f69998ce30f1f2099e7.png

快速实现一个Reactor

  1. 代码关键点1:Reactor线程创建一个事件循环(可能这会勾起你想起Netty的Boss)
 // 创建线程,执行事件循环
    pthread_t accept_threads[2];
    for (int i = 0; i < 1; i++) {
        printf("create acceptor thread. index: %d\n", i);
        // run_event_loop为事件的处理函数,循环处理
        pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);
        pthread_detach(accept_threads[i]);
    }

 

  1. 代码关键点2: 事件处理线程(可能这会勾起你想起Netty的Worker)
// 事件发生后的处理函数: handle_io_event
pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);

 

  1. 代码关键点3: 非阻塞I/O
// 设置文件描述符为非阻塞I/O
fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);

// 设置非阻塞
fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

 

  1. 完整代码
#include <stdio.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>

#define PORT 12345
#define MAX_EVENTS 10
#define BUFF_SIZE 1024
#define WORKER_SIZE 4

// epoll file descriptor
int epoll_fd;

// handlers
void* run_event_loop(void* arg);
void* handle_io_event(void* arg);
void wait_to_death();

int main() {
    int server_fd;
    struct sockaddr_in server_addr;

    // 创建server
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    // 设置非阻塞
    fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

    // 绑定
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(PORT);

    printf("binding\n");
    bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

    printf("listen\n");
    // 监听
    listen(server_fd, MAX_EVENTS);

    printf("epoll create\n");
    // epoll创建
    epoll_fd = epoll_create1(0);

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = server_fd;
    printf("epoll add\n");
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event);

    // 创建线程,执行事件循环
    pthread_t accept_threads[2];
    for (int i = 0; i < 1; i++) {
        printf("create acceptor thread. index: %d\n", i);
        pthread_create(&accept_threads[i], NULL, run_event_loop, &server_fd);
        pthread_detach(accept_threads[i]);
    }

    wait_to_death();

    close(epoll_fd);
    close(server_fd);

    return 0;
}

void* run_event_loop(void* arg) {
    int server_fd = *(int*)arg;

    while (1) {
        struct epoll_event events[MAX_EVENTS];
        int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == server_fd) {
                // 新连接
                int client_fd = accept(server_fd, NULL, NULL);

                // 设置非阻塞
                fcntl(client_fd, F_SETFL, fcntl(client_fd, F_GETFL, 0) | O_NONBLOCK);

                // worker线程负责处理这个事件
                pthread_t worker_thread;
                pthread_create(&worker_thread, NULL, handle_io_event, &client_fd);
            }
        }
    }
}

void* handle_io_event(void* arg) {
    int client_fd = *(int*)arg;

    while (1) {
        char buff[BUFF_SIZE] = {0};
        int len = read(client_fd, buff, BUFF_SIZE);
        if (len <= 0) {
            close(client_fd);

            struct epoll_event event;
            event.events = EPOLLIN;
            event.data.fd = client_fd;
            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, &event);

            break;
        }
        else {
            printf("Received %s from client\n", buff);
        }
    }

    return NULL;
}

void wait_to_death() {
    sigset_t allset;
    sigemptyset(&allset);
    sigaddset(&allset, SIGINT); // Ctrl+C
    sigaddset(&allset, SIGQUIT); // Ctrl+\

    int sig;
    for (;;) {
        int err = sigwait(&allset, &sig);
        if (err == 0) {
            printf("received signal %d, prepare to exit\n", sig);
            break;
        }
    }
}

 

搞定收工,如有错误请指正,谢谢

 

本文含有隐藏内容,请 开通VIP 后查看