【C++ - 仿mudou库one thread one loop式高并发服务器实现】

发布于:2025-05-18 ⋅ 阅读:(18) ⋅ 点赞:(0)

文章目录


项目介绍

通过多Reactor多线程:多I/O多路复用+线程池(业务处理)的处理模式,搭建高并发服务器组件。

主Reactor线程监控监听描述符,获取新建连接,随后分发给从属Reactor进行通信事件监控,从Reactor监控各自描述符的读写事件,一旦事件触发,则接收数据分发给Work线程池,Work线程池分配独立的线程进行业务处理,处理完毕后,将响应交给子Reactor线程进行数据响应。

利用CPU多核资源达到并发业务处理的目的。

项目模块和服务器主要设计模式

关于整个项目,分为两大模块

一、Server服务器模块

二、应用层协议支持的的协议模块。

项目主要流程

项⽬的主要流程:

  1. 在实例化TcpServer对象过程中,完成BaseLoop的设置,Acceptor对象的实例化,以及EventLoop线程池的实例化,以及std::shared_ptr的hash表的实例化。
  2. 为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置
    Connection的各项回调,并使⽤shared_ptr进⾏管理,并添加到hash表中进⾏管理,并为
    Connection选择⼀个EventLoop线程,为Connection添加⼀个定时销毁任务,为Connection添加
    事件监控
  3. 启动BaseLoop。
  4. 通过Poller模块对当前模块管理内的所有描述符进⾏IO事件监控,有描述符事件就绪后,通过描述符对应的Channel进⾏事件处理。
  5. 所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进⾏执⾏。
  6. 由于epoll的事件监控,有可能会因为没有事件到来⽽持续阻塞,导致任务队列中的任务不能及时得到执⾏,因此创建了eventfd,添加到Poller的事件监控中,⽤于实现每次向任务队列添加任务的时候,通过向eventfd写⼊数据来唤醒epoll的阻塞。
  7. 实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。
  8. 当描述符在Poller模块中就绪了IO可读事件,则调⽤描述符对应Channel中保存的读事件处理函
    数,进⾏数据读取,将socket接收缓冲区全部读取到Connection管理的⽤⼾态接收缓冲区中。然
    后调⽤由组件使⽤者传⼊的新数据到来回调函数进⾏处理。
  9. 组件使⽤者进⾏数据的业务处理完毕后,通过Connection向使⽤者提供的数据发送接⼝,将数据
    写⼊Connection的发送缓冲区中。
  10. 启动描述符在Poll模块中的IO写事件监控,就绪后,调⽤Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进⾏⾯向系统的实际数据发送。

前置知识

1.bind函数

bind函数一般用来绑定一些回调函数。
一般在各个模块中,比如该项目下的Channel模块,EventLoop模块,Connection模块等。
都需要监听获取到新链接,并对链接进行事件监控管理,一旦触发新事件,就要对事件进行具体的处理,如数据接收发送解析处理等,业务处理等。

为了让各个模块更清晰明确地完成这些模块的任务,就把具体的数据处理,事件处理的具体操作用具体的接口完成,在这些模块中只需要绑定具体的接口即可完成数据处理。

这样就降低了代码直接的耦合,让各个模块直接的功能更加清晰。

bind函数使用案例

//  function<返回值类型(参数一类型,参数二类型,...)> 函数名 = [选择值传递,引用传递等](参数类型  形参名,参数类型 形参名, ...) -> 返回值类型{
//            ... 
//   };
//参数1:绑定一个函数,参数2:传递该函数的参数,std::placeholders::_1 可以用来传递更多参数(但只能是int类型)
//bind (Fn&& fn, Arge& args,..)        

//使用案例
void print(const std::string& str, int num)
{
    std::cout << str << num << std::endl;
}

int main()
{
    using Task = std::function<void()>; 

    std::vector<Task> array;
    
    array.push_back(std::bind(print,"hello",10));
    array.push_back(std::bind(print,"hello world",10));
    array.push_back(std::bind(print,"nihao",10));
    array.push_back(std::bind(print,"bite",10));

    for(auto & func : array)
    {
        func();
    }
    return 0;
}

2.定时器任务TimerTask和时间轮思想TimerWheel

在当前的并发服务器中,必须要重视一个问题:客户端连接上服务器后,长时间不进行数据通信,但是也不关闭,就会造成资源空耗。
所以就需要设置定时任务,定时将超时的连接进行释放。
在这里插入图片描述
下面是定时任务对象TimerTask类,用于封装定时任务。
在这里插入图片描述
时间轮思想:
单独创建一个定时器,并给该定时器设置一个超时时间,时间到了之后该定时器会去遍历服务器的所有链接,检测哪些连接是超时的,但是如果有上万个链接,这样无疑效率非常低。

所以就引进时间轮的算法来优化,提高效率。

定义⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。
在这里插入图片描述
上述操作也有⼀些缺陷,⽐如我们如果要定义⼀个60s后的任务,则需要将数组的元素个数设置
为60才可以,如果设置⼀⼩时后的定时任务,则需要定义3600个元素的数组。
因此,可以采⽤多层级的时间轮,有秒针轮,分针轮,时针轮, 60<time<3600则time/60就是分针轮
对应存储的位置,当tick/3600等于对应位置的时候,将其位置的任务向分针,秒针轮进⾏移动。

在这里插入图片描述

复杂思考:
在这里插入图片描述
具体一个task任务放到定时器中的实例图:
在这里插入图片描述
但是编写时间轮模块类还需要注意一个问题:
在这里插入图片描述
TimerTask定时任务对象和TimerWheel时间轮的联合详细解析图:
在这里插入图片描述

3.正则表达式

使用示例

regex_match(src,matches,e);
src:源字符串
matches:容器
e:匹配规则
#include <iostream>
#include <string>
#include <regex>

int main()
{
    std::string src = "/abcd/1234";
    
    //匹配规则
    std::regex e("/abcd/(\\d+)");
    //匹配以 /abcd/ 开始,后面跟的一个或多个数字字符的字符串,并在匹配过程中提取这个字符串
    // "\\"表示转义字符,如果是 "\d+",这样'\d'就是转义字符了
    // d 表示后面匹配一个数字字符,d+匹配的是多个数字字符,直到匹配到第一个非数字字符为止
    std::smatch matches; //存放匹配成功的字符串的容器
    //第一个是src字符串本身,后面就是匹配成功的字符串。

    bool ret = std::regex_match(src,matches,e); 
    if(ret == false)
        return -1;
    for(auto &s: matches)
    {
        std::cout << s << std::endl;
    }
    
    return 0;
}

使用正则表达式对HTTP进行请求

  • 1.提取匹配方法,GET|HEAD|PUSH|PUT|DELETE 等等
  • 2.提取HTTP资源路径。从请求方法之后的"空格"开始直到遇到"?"结束,这段字符串就是资源路径。
  • 3.提取查询字符串。也就是提取"?“之后的字符串,直到空格截止(不要”?")
  • 4.提取协议版本
int main()
{
    // HTTP请求行格式 "GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1\r\n"
    std::string str = "GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1";
    std::smatch matches; //存储容器
    //请求方法的匹配: GET HEAD POST PUT DELETE ..
    std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)\\?(.*) (\\HTTP/1\\.[01])"); // "."匹配任意非\r\n的单个字符,"*"表示能匹配前面的"."0次或多次
    //很明显,这里就是匹配除了GET之后的所有字符

    // [^?] :匹配非"?"字符,*表示匹配0次或多次
    
    // \\?(.*) 匹配一个以"?"开始的字符(不要这个问号)并持续匹配直到遇到空格,注意有一个空格 
    // \\?(.*)与 (\\?.*)不同,第一个不要"?"了,第二个却要"?"
    
    // (\\HTTP/1\\.[01]): 匹配HTTP/1. [01]:表示匹配0或者1 , "."表示匹配任意非\r\n的单个字符,但是这里我们想要的是真正的"."
    // 那就要使用 "\\."转义成单个"."
    bool ret = std::regex_match(str,matches,e);
    if(ret == false)
        return -1;

    for(auto &s : matches)
        std::cout << s << std::endl;
    return 0;
}

4.通用型容器Any类

在Connection对链接进行管理时,不可避免地会涉及到获取应用层协议信息,并进行解析和处理。d但是应用层协议有那么多种,我们不能只单独实现针对一个协议的代码,只能实现一个通用类型的容器来保存该这些应用层协议的请求和解析。

这个容器必须能保存各种不同类型结构的数据。
设计思想:

  • 1.设计一个模板类,但是通过该类实例化对象的时候要传递模板参数,如:Any< int > a = 10…
  • 我就是因为不知道数据类型菜设计的Any类,显然这种方法行不通。
  1. 同样设计一个类,只是这个类中再设计一个模板类,让这个模板类存储数据
class Any
{
    template<class T>
    class placeholder
    {
    private:
        T _val;
    };
};

但是这样实例化对象的时候,同样要传递模板参数。
所以,还应该设计一个父类holder,让placeholder继承该类,通过holder类的指针或者引用,即可访问placeholder类的成员函数。即可实现Any类只存储一个holder类的指针,就能访问到存储的数据。

Any类的设计思想:
在这里插入图片描述
具体实现:

class Any
{
private:
    class holder       //父类,用来被继承的
    {
    public:
        virtual ~holder() {}
        virtual const std::type_info& type() = 0 ;     
        virtual holder* clone() = 0 ;           
    };

    //子类才是用来保存数据的
    template<class T>
    class placeholder : public holder      //子类继承父类
    {
    public:
        placeholder(const T& val) : _val(val) {}  //内置类型不做处理,自定义类型T会调用它的拷贝构造
        // 获取子类对象的数据类型
        virtual const std::type_info& type() { return typeid(T); } 
        // 针对当前子类对象自身,克隆一个新的子类对象
        virtual holder* clone() { return new placeholder(_val); }
    public:
        T _val;
    };

    holder* _content;

public:
    Any() :_content(nullptr) {}
    template<class T>
    Any(const T& val) : _content(new placeholder<T>(val)) {}               //根据数据类型构造一个Any对象
    Any(const Any& other) : _content(other._content == nullptr ? nullptr : other._content->clone()) {}     //拷贝构造
    //这里的拷贝构造不能直接复制一个other地址,而是要复制other对象内的父类指针指向的子类对象(placeholder)的地址
    ~Any() {delete _content;}

    Any& swap(Any& other)  //加了const后,other的_content是const修饰的,而原本的_content是可修改的
                  //这样就会出现权限放大的情况
    {
        std::swap(_content, other._content); 
        return *this;    //返回对象本身是为了支持连续swap操作/赋值操作
    }

    template<class T> 
    T* get()                       //返回子类对象保存的数据的指针
    {
        //我当前保存的数据类型与想要返回的数据类型不匹配
        assert(typeid(T) == _content->type()) ; //条件成立继续,不成立报错

        return &(((placeholder<T>*)_content)->_val);
        // _content是一个父类指针,先转化成子类.
    }

    template<class T>
    Any& operator=(const T& val)    //赋值运算符重载
    {
        Any(val).swap(*this); //为val构造一个临时Any通用容器对象,然后与自身Any容器的_content指针交换
        //交换后不会再关心自身容器的_content了,因为交换之后,临时对象出了作用域会调用析构函数进行销毁
        
        //此时保留的就是val构造出来的_content;
        return *this;  // 为了支持连续赋值
    }

    Any& operator=(const Any& other)
    {
        Any(other).swap(*this);
        //调用拷贝构造为other对象构造一个新的临时Any对象,用该对象与我自身的_content指针交换,交换后
        //原来_content指针保存的数据就跟随临时对象调用析构销毁了

        //这样既不影响到other,又能销毁原指针指向的数据。
        return *this;   //支持连续的赋值重载
    }

服务器设计模式

服务器使用Reactor的事件监听和分发处理分离的设计模式。

简单解释就是:谁触发了我的事件,我就处理谁。
在这里插入图片描述
下面有三种模式介绍,但本次项目最主要采用的是第三种模式:多Reactor多线程模式。

1)单Reactor单线程模式

在这里插入图片描述

优点:所有操作均在单一线程完成,实现较为简单,没有线程间竞争问题。
缺点:无法有效利用CPU多核资源,容易达到性能瓶颈。

2)单Reactor多线程模式

在这里插入图片描述

优点:利用了CPU多核的优势,提高了性能,将业务处理和IO过程分离,降低了代码的耦合度。
缺点:单Reactor线程包含了对客户端进行监听和响应,在高并发情况下,仍然是一个串行化过程,也就是在每一时刻都有多个客户端发起请求,单Reactor线程需要处理完上一个客户端的请求才能处理下个客户端的请求,只有处理完请求时,才能处理下一波客户端的新连接,容易达到性能瓶颈。

3)多Reactor多线程模式(本次项目主要借鉴的模式)

在这里插入图片描述

优点:充分利用CPU多核资源,主从Reactor各司其职。
缺点:执行流不是越多越好,如果执行流过多,反而会增加CPU对线程切换调度的成本。

项目实现目标框架

该项目要实现的是一个基于Reactor模式下的
one thread one loop式的高并发服务器。简单理解就是一个Reactor从属线程处理:

  • 1.IO事件监控
  • 2.IO事件处理
  • 3.业务处理

在这里插入图片描述
也就是在多Reactor多线程的基础上,删掉专门进行业务处理的线程池,将IO事件的监听,处理以及业务处理集为一体,由从属Reactor线程一并处理。

下面来介绍Server服务器模块的各个子功能模块。

一、Server服务器模块

该模块就是对所有连接和线程进行管理。
具体分为:

  • 1.监听连接管理
  • 2.通信连接管理
  • 3.超时连接管理

1)Buffer模块

下面是Buffer模块的设计思路:

在这里插入图片描述

2)Socket模块

在这里插入图片描述
Socket模块是对socket套接字的封装,该模块实现了socket套接字的各项操作。

知识点:开启地址重用

端口重用选项(SO_REUSEADDR)允许在同一地址和端口上绑定多个套接字,常用于多线程服务器和快速重启服务器程序。

在这里插入图片描述

/* int setsockopt(int socket,int level,
       int opt_name,void* opt_val,socklen_t opt_len);
*/
 void ReuseAddress()
 {
     int val = 1;
                                 //SOL_SOCKET是对套接字操作级别
     int ret = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR,(void*)&val,sizeof(val));
     val = 1;
     int ret1 = setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT,(void*)&val,sizeof(val));

 }

3)Channel模块

在这里插入图片描述
Channel模块相当于是为socket监控到的文件描述符而生的。

Channel模块类设计思想
在这里插入图片描述

需要注意的是:Channel模块并不是独立的,由于要对文件描述符对应的连接进行事件监控,所以要调用Poller模块的功能进行具体的事件监控。

4)Poller模块

Poller是用来监听管理Channel的,相当于所有一对一服务员的总经理。

监控IO事件,相当于把所有IO事件文件描述符都放到一个具体的容器中管理起来。
Poller模块的底层数据结构就是使用红黑树将一个个要监控的时间添加进红黑树的节点中。
Poller模块设计思路:

在这里插入图片描述
在这里插入图片描述

5)EventLoop模块

系统接口eventfd——线程间的事件通知

在这里插入图片描述
总结:
向eventfd返回的文件描述符写入数据时,会触发事件通知,如果其他进程或线程正在等待(例如通过 poll()、select() 或 epoll() 监听该文件描述符),它们会被唤醒。
读取eventfd返回的文件符的数据时,会消费事件。


EventLoop模块可以理解成上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,
Socket模块的⼀个整体封装,进⾏所有描述符的事件监控。
同时,EventLoop模块一定是一个EventLoop对应一个Connection连接。

Eventoop模块为了保证线程安全,因此必须保证整个Connection连接的所有操作都要在同一线程下执行。

由于EventLoop模块监控了所有事件,且添加了上面的TimerWheel时间轮,所以可以为每个链接都设置一个非活跃链接定时销毁任务,只要链接在规定时间内没有进行通信,就释放该链接,避免链接空耗资源。

如何保证一个Connection连接的所有操作都放在同一个线程中执行?
将该链接的所有操作都放入到一个QueueTask任务队列中,由EventLoop进行管理。
当事件处理完成后,再在EventLoop对应的线程中执行任务队列的操作。

但是,因为EventLoop的所有事件监控都交给了Poller模块中的epoll监控,如果没有事件就绪,epoll就会阻塞住,则任务队列中的任务就不能执行。
为了解决这个问题,eventfd的作用就出现了:当向任务队列中添加任务时,就可以通过向eventfd中写入数据来唤醒正在阻塞的epoll,从而让任务池的操作能够执行。

EventLoop具体操作流程:

  • EventLoop的Poller模块对当前管理的所有文件描述符进行事件监控,当有事件到来时,将事件分发给文件描述符对应的Channel进行事件处理。
  • 当所有的就绪事件都处理完毕后,对任务队列QueueTask中的所有任务进行执行。
  • 由于epoll的事件监控,有可能因为没有事件到来而阻塞,导致任务不能有效执行。所以创建了eventfd,添加到Poller的事件监控中,每次向任务队列中添加任务时,通过向eventfd写入数据,来唤醒阻塞的epoll。

EventLoop联合其他模块的思想流程图:

在这里插入图片描述

在这里插入图片描述

6)TimerTask和TimerWheel模块

7) Connection模块

Connection模块是对Buffer,Socket,Channel模块进行整合并封装,实现整体对套接字的管理。
Connection模块为每一个服务器Accept到的新链接都设置一个Connection对象进行管理。
对链接的所有具体操作,都是通过这个模块提供的功能完成。

在Connection中管理的几个部分:

  • 1.套接字管理
  • 2.连接的事件管理
  • 3.缓冲区管理,将Socket模块接收到的数据拿过来放到我当下的缓冲区模块中。
  • 4.协议上下文的管理(Any类)(有可能请求还不完整,或者请求的正文还没有发送完,则将数据保存到Any类中,等待下次请求发送完成后再取出来进行解析。
  • 5.回调函数的管理,设置各种回调函数,然后交给Channel模块具体执行。

Connection模块设计思想:
在这里插入图片描述

为什么要将Conenction模块交给shared_ptr进行管理呢?
因为一旦要对Connection链接进行操作,而Connection链接已经释放,其他地方并不知道Connection释放了,这就导致内存错误访问。
使用shared_ptr对Connection进行操作的时候,保存了一份shared_ptr,因此就算其他地方进行了释放操作,也只是对shared_ptr的计数器-1.并不会导致Connection释放。

实现操作:让Connection继承std::enable_shared_from_this<Connection> ,这样在对Connection操作时,就可以安全地传递一份shared_ptr<Connection>

Connection模块联合其他模块的功能关系流程图:

在这里插入图片描述
注意,Connection模块也不是独立的模块,它不仅仅对上述模块整合,由于该服务器是多Reactor多线程模式,有多线程就涉及到线程安全问题,所以有些操作必须只能在线程内执行,所以就关联到EventLoop模块。

8)Acceptor模块

设计思想:
在这里插入图片描述

9)LoopThread模块

线程间的同步和互斥知识点

线程的同步就是当多个线程获取同一个共享资源时,需要进行排队,不能每个线程都并发获取同一个共享资源。所以线程的同步就是线程排队获取共享资源。

线程间的互斥,当我这个线程在申请共享资源时,你的线程就不能申请共享资源。
这样实现了一个共享资源(临界资源)只能同时有一个线程申请。这就是线程间的互斥。

可以看到,线程同步和互斥是同时存在的。

在这里插入图片描述

10)LoopThreadPool模块

设计思想:
在这里插入图片描述

二、HTTP协议模块

需要注意的是HTTP协议是⼀个运⾏在TCP协议之上的应⽤层协议,这⼀点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应⽤层基于HTTP协议格式进⾏数据的组织和解析来明确客⼾端的请求并完成业务处理。

因此实现HTTP服务器简单理解,只需要以下⼏步即可

  1. 搭建⼀个TCP服务器,接收客户端请求。
  2. 以HTTP协议格式进⾏解析请求数据,明确客户端⽬的。
  3. 明确客户端请求⽬的后提供对应服务。
  4. 将服务结果⼀HTTP协议格式进⾏组织,发送给客户端

设计思想

在这里插入图片描述

应用层协议:Http服务器设计

2)HttpRequest模块

在这里插入图片描述

3)HttpResponse模块

在这里插入图片描述

4)HttpContext对Http请求进行解析模块

这个模块是⼀个HTTP请求接收的上下⽂模块,主要是为了防⽌在⼀次接收的数据中,不是⼀个完整的HTTP请求,则解析过程并未完成,⽆法进⾏完整的请求处理,需要在下次接收到新数据后继续根据上下⽂进⾏解析,最终得到⼀个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要⼀个上下⽂来进⾏控制接收和处理节奏。

在这里插入图片描述

HttpServer模块:对前面几个模块的功能整合

在这里插入图片描述

遇到的问题

1.vector定义在_capacity之后,导致vector在使用时没有开空间而直接使用[]访问导致空间使用错误问题。

在这里插入图片描述

2.再调试Buffer类的接口函数时,预期结果是从i = 1打印到i = 300

在这里插入图片描述
然而却出现了断层现象。
经过调试发现,在这里插入图片描述
在获取write前沿空间大小时,代码写错了,应该用size的。

此时混淆了capacity()和size()函数的区别
以及忘记了reserve()和resize()函数的具体用法。
遂立即复习。

3.

在这里插入图片描述

在这里插入图片描述

4.对Connection联合调试时,未对_socket进行初始化,导致后续无法recv

知识复习:在类的成员函数后面加上const,表示告诉编译器我不会在函数内部对this指针进行修改。

5. 请求方法必须是Get/Head方法

在对http服务器进行测试时。
在判断一个资源是否是静态资源的时候,请求的方法应该必须是GET方法或者是HEAD方法。
所以在逻辑判断时:

if(req._method != “GET” && req._method != “HEAD”)
如果请求方法不是GET方法且不是HEAD方法,则判断错误,return false;

而我的代码写成了

if(req._method != “GET” || req._method != “HEAD”)
意味着就算该方法是GET方法,同样满足req._method != "HEAD"这个判断条件,也会退出。

同时还学到了如何进行调试,先根据正常的HTTP访问的流程,客户端对服务器访问发送数据时,服务器肯定会对到来的新链接设置回调处理,然后对HTTP请求进行解析,然后响应给客户端。
根据这个流程看调用了哪些函数,分别进入了哪些函数,然后就打印一些调试信息一个个函数地查看是哪个函数的逻辑出现了问题!!

项目涉及到的知识点

md5sum + 文件名

对一个文件进行大量算法计算,最后得出一个标准字符串md5,如果两个文件的md5值是一样的,则说明这两个文件的内容一模一样,只要两个文件内容有一点点点不一样,则得出的md5值也会大有不同。

上传大文件

dd if=/dev/zero of=./hello.txt bs=1G count=1

多态

通用类型容器Any类,保存的是holder类的指针(引用也可以),而holder类被placeholder类继承,所以holder类的指针可以访问到placeholder类的成员函数,而让placeholder类通过实例化对象专门用来保存任意类型的数据。

给服务器上强度

1.长链接请求测试

2.超时链接释放测试

3.数据中多条请求处理测试

4.通过Put方法上传大文件测试

性能测试采⽤webbench进⾏服务器性能测试。Webbench是知名的⽹站压⼒测试⼯具,webbench的
标准测试可以向我们展⽰服务器的两项内容: 每秒钟相应请求数 和 每秒钟传输数据量 ,即QPS和吞
吐。


网站公告

今日签到

点亮在社区的每一天
去签到