【网络编程】三、TCP网络套接字编程

发布于:2025-05-09 ⋅ 阅读:(22) ⋅ 点赞:(0)


在这里插入图片描述

TCP通信流程

​ 下图是创建的 tcp 通信时候,对应时刻的状态、调用函数等信息:
在这里插入图片描述

​ 可见这是比 udp 通信要复杂的,我们不仅仅要搞清楚如何进行通信,还要搞清楚通信的本质以及原理!这里我们先来解决如何搭建通信的环境!

Ⅰ. 服务器日志类实现

​ 一般对于服务器来说,我们都要有日志输出,这方便我们调试和查找错误!

​ 下面的 tcp 服务器中我们也会使用到日志输出,所以我们这里创建一个 log.hpp 来实现一个日志函数!

​ 实现的过程无非就是先定一个日志规则,比如下面的:

[日志等级][时间戳/时间][pid][调用函数:位置][message]

​ 然后就通过一些系统或者库函数来获取需要的信息,比如说时间可以使用三件套:time()localtime()strftime() 三个函数来实现时间的格式化输出;而 pid 可以通过 getpid() 系统调用来获取;而调用函数以及位置我们可以用 C 中自带的宏 __FILE__ 以及 __LINE__ 来获取。

​ 至于日志等级和打印的消息,则由我们调用方来传入参数,传入对应的等级就要对应上要输出到哪个文件中,如下面定义了五个等级,而分为了两个文件,这些都是自定义的!

​ 一般我们都会用到可变参数来作为 message 的内容,所以我们就要用到 C 中的 va_list 类型,然后通过 va_start() 函数和 vnsprintf() 函数来实现格式化!

#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
using namespace std;

const char* LOG_NORMAL = "log_normal.txt";
const char* LOG_ERROR =  "log_error.txt";
const int NUM = 1024;
enum Level{
    DEBUG = 0,
    NORMAL,
    WARING, 
    ERROR,
    FATAL
};
const char* to_levelstr(int level)
{
    switch(level)
    {
        case DEBUG: return "DEBUG";
        case NORMAL: return "NORMAL";
        case WARING: return "WARING";
        case ERROR: return "ERROR";
        case FATAL: return "FATAL";
        default: return nullptr;
    }
}

// 日志格式:[日志等级][时间戳/时间][pid][调用函数:位置][message]
void logMessage(int level, const char* format, ...)
{
    // 1. 先将时间戳转化为本地时间然后格式化
    char timebuffer[128];
    time_t timestamp = time(nullptr); 			 // 获取当前时间戳
    struct tm* timeinfo = localtime(&timestamp); // 转化为本地时间结构
    strftime(timebuffer, sizeof(timebuffer), "%Y-%m-%d %H:%M:%S", timeinfo); // 格式化时间字符串

    // 2. 拼凑前缀部分,是固定的
    char prefixbuffer[NUM];
    snprintf(prefixbuffer, sizeof(prefixbuffer), "[%s][%s][%d][%s:%d]", to_levelstr(level), timebuffer, getpid(), __FILE__, __LINE__);

    // 3. 格式化信息部分也就是后缀部分,是可变参数的内容 -- 通过vsnprintf格式化到数组中
    char msgbuffer[NUM];
    va_list start;
    va_start(start, format);
    vsnprintf(msgbuffer, sizeof(msgbuffer), format, start);

    // 4. 写到特定等级的文件中去
    FILE* normal = fopen(LOG_NORMAL, "a");
    FILE* error = fopen(LOG_ERROR, "a");
    if(normal != nullptr && error != nullptr)
    {
        FILE* toward = nullptr;
        if(level == Level::DEBUG || level == Level::NORMAL || level == Level::WARING)
            toward = normal;
        if(level == Level::ERROR || level == Level::FATAL)
            toward = error;
        
        if(toward != nullptr)
            fprintf(toward, "%s%s\n", prefixbuffer, msgbuffer);
        
        // 记得要关闭文件,否则不断调用之后,会占据很多描述符!
        fclose(normal);
        fclose(error);
    }
}

Ⅱ. TCP服务端

1、服务器创建流程

  1. 创建监听套接字(socket
  2. 绑定端口号和 IP 地址。这个端口号是写死的,而 IP 地址设为 0.0.0.0INADDR_ANY 两种写法(bind
  3. 服务器设置 socket 为监听状态(listen
  4. 服务器获取客户端连接请求(accept
  5. 服务器进行收发消息(send/recv

2、创建套接字 – socket

TCP 服务器在调用 socket 函数创建套接字时,参数设置如下:

  • 协议家族选择 AF_INET
  • 创建套接字时所需的服务类型应该是 SOCK_STREAM,因为我们编写的是 TCP 服务器,SOCK_STREAM 提供的就是一个 有序的、可靠的、全双工的、基于连接的字节流式服务。而之前的 UDP 服务器采用的 SOCK_DRAGM 则是无序、不可靠、面向数据报的服务!
  • 协议类型默认设置为 0 即可,让操作系统根据我们选择的服务类型去自己选择即可!
socket(AF_INET, SOCK_DGRAM, 0);   // 这是UDP网络通信:面向数据报
socket(AF_INET, SOCK_STREAM, 0);  // 这是TCP网络通信:面向字节流

​ 如果创建套接字后获得的文件描述符是小于 0 的,说明套接字创建失败,此时也就没必要进行后续操作了,直接终止程序即可。

3、绑定服务器 – bind

​ 套接字创建完毕后我们实际只是在系统层面上打开了一个文件,该文件还没有与网络关联起来,因此创建完套接字后我们还需要调用 bind 函数进行绑定操作。绑定操作和 UDP 是一模一样的,这里就不细讲,只说一下步骤:

  1. 定义一个 struct sockaddr_in 结构体,将服务器网络相关的属性信息填充到该结构体当中,比如协议家族、IP地址、端口号等。
  2. 填充服务器网络相关的属性信息时,协议家族对应就是 AF_INET,端口号就是当前TCP服务器程序的端口号。在设置端口号时,需要调用 htons 函数将端口号由主机序列转为网络序列。
  3. 如果使用的是云服务器,那么在设置服务器的 IP 地址时,不需要显示绑定 IP 地址,直接将 IP 地址设置为 INADDR_ANY 即可,此时服务器就可以从本地任何一张网卡当中读取数据。此外,由于 INADDR_ANY 本质就是 0,因此在设置时不需要进行网络字节序的转换。
  4. 填充完服务器网络相关的属性信息后,需要调用 bind 函数进行绑定。绑定实际就是将文件与网络关联起来,如果绑定失败也没必要进行后续操作了,直接终止程序即可。

🎏4、服务器监听 – listen

​ 因为 TCP 是面向连接的协议,需要使用 listen 函数将一个套接字设置为监听状态,直到收到客户端的建立连接请求!

#include <sys/types.h>          
#include <sys/socket.h>
int listen(int sockfd, int backlog);
  • 作用: 用于将一个套接字设置为监听模式,以便接受客户端的连接请求。
  • 参数:
    • sockfd:需要设置为监听模式的套接字描述符
    • backlog:在 linux 中,可以暂时简单理解为指定 完全连接队列 的最大长度,一般不要设置太大,设置为 510 即可。
      • 如果有多个客户端同时发来连接请求,它们会被放入到 半连接队列 中,等到建立连接完成之后就会放到该完全连接队列中,这里的 backlog 指的就是后面这个完全连接队列的最大长度,而前面的半连接队列的长度通常是在对应的文件中修改!
      • 这里不做详细介绍,后面我们讲完 TCP 协议之后会重新回头来介绍!
  • 返回值:
    • 成功返回 0
    • 失败返回 -1,并设置 errno 变量以指示错误类型。

说明一下:

  • 初始化 TCP 服务器时创建的套接字并不是普通的套接字,而应该叫做 监听套接字。为了表明寓意,我们将代码中套接字的名字由 sockfd 改为 _listenfd
  • 在初始化 TCP 服务器时,只有创建套接字成功、绑定成功、监听成功,此时 TCP 服务器的初始化才算完成!

🎏5、获取客户端连接请求 – accept

TCP 服务器初始化后就可以开始运行了,但 TCP 服务器在与客户端进行网络通信之前,服务器需要先获取到客户端的连接请求。前面的 listen 只是对连接请求进行监听,要获取的话得用下面的 accept 函数才能获取到客户端的连接请求:

#include <sys/types.h>         
#include <sys/socket.h>

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
  • 作用:用于从已经建立连接状态的完全连接队列中取出一个连接请求,并创建一个新的套接字用于与客户端进行通信
  • 参数:
    • sockfd:处于监听状态的套接字描述符
    • addr:指向一个 sockaddr 结构体的指针,用于存储客户端的地址信息。如果给 addr 参数传 NULL,表示不关心客户端的地址。
    • addrlenaddr 结构体的长度,是一个输入输出型参数,传入的是调用者提供的缓冲区 addr 的长度,以避免缓冲区溢出问题;传出的是客户端地址结构体的实际长度(有可能没有占满调用者提供的缓冲区)
  • 返回值:
    • 成功 返回一个新的套接字描述符,用于与客户端进行通信,这个新的套接字描述符是唯一的,只能用于与这个客户端进行通信
    • 失败返回 -1,并设置 errno 变量以指示错误类型。

使用说明:

  • 三次握手完成后,服务器才会调用 accept 函数接受连接。
  • accept 函数是一个 阻塞调用,即在没有新的连接请求到达时,它会一直等待。因此,通常会将 accept 函数放在一个循环中,以便不断接受新的连接请求。

accept函数返回的套接字描述符是什么,不是已经有一个了吗❓❓❓

​ 调用 accept 函数获取连接时,是从监听套接字当中获取的。如果 accept 函数获取连接成功,此时会返回接收到的套接字对应的文件描述符。

​ 监听套接字与 accept 函数返回的套接字的区别与作用如下所示:

  • 监听套接字:用于获取客户端发来的连接请求。accept 函数会不断从监听套接字当中获取新连接。
  • accept 函数返回的套接字:用于为本次 accept 获取到的连接提供服务。监听套接字的任务只是不断获取新连接,而真正为这些连接提供服务的套接字是 accept 函数返回的套接字,而不是监听套接字!

在这里插入图片描述

​ 举个简单的例子,监听套接字描述符 _listenfd(自定义的名称)和 accept 函数返回的描述符的关系,就相当于一个餐厅的人员,前者是在门店外面拉人的服务员,而一旦它把客人拉进来了,剩下的任务就不归它管了!剩下的工作就是后者来负责,其相当于是服务员,客人可以和服务员进行沟通选择菜系、座位等等需求,它们两个角色是工作任务是区分开的!

6、数据的发送与接收 – send && recv

​ 对于面向连接的 tcp 协议,通常我们使用 sendrecv 两个接口来进行数据的发送和接收,这和 udp 是区分开的!因为 tcp 在发送和接收数据的时候,其实已经是建立连接的了,所以不需要去关心对方的 sockaddr 结构体等信息!

​ 下面是发送数据的接口:

#include <sys/types.h>
#include <sys/socket.h>
ssize_t send(int sockfd, const void *buf, size_t len, int flags);
  • 参数说明:
    • sockfd:表示要发送数据的套接字文件描述符。
    • buf:指向要发送数据的缓冲区的指针。
    • len:表示要发送的数据的长度。
    • flags:一般设为 0 即可。用于指定发送操作的可选标志,如 MSG_DONTWAITMSG_NOSIGNAL 等。
  • 返回值:
    • 如果成功发送数据,返回实际发送的字节数
    • 如果发生错误,返回 -1,并设置 errno 来指示具体的错误类型。

​ 使用 send 函数时,需要确保套接字已经连接到目标地址,并且缓冲区中的数据已经准备好要发送。如果发送的数据长度超过套接字的发送缓冲区大小,send 函数可能会阻塞,直到有足够的空间来存储数据。

​ 需要注意的是,send 函数是一个阻塞调用,它会一直等待直到数据发送完成或发生错误。如果需要进行非阻塞的发送操作,可以使用 send 函数的 MSG_DONTWAIT 标志或使用非阻塞套接字。


​ 下面是接收数据的接口:

#include <sys/types.h>
#include <sys/socket.h>
ssize_t recv(int sockfd, void *buf, size_t len, int flags);
  • 参数说明:
    • sockfd:表示要接收数据的套接字描述符。
    • buf:指向接收数据的缓冲区的指针。
    • len:表示接收数据的最大长度。
    • flags:一般设为 0 即可。指定接收数据的行为选项,如是否设置为非阻塞模式等
  • 返回值:
    • 如果成功接收到数据,返回接收到的字节数
    • 如果连接已关闭,返回 0
    • 如果发生错误,返回 -1,并设置 errno 来指示具体的错误原因。

recv 函数是一个阻塞调用,当没有数据可接收时,它会一直等待,直到有数据到达或发生错误。如果需要非阻塞地接收数据,可以使用 recv 函数的非阻塞模式或结合使用 selectpoll 等函数来实现。

7、服务器框架搭建

① 服务器类实现 – tcpserver.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/wait.h>
#include "log.hpp"
using namespace std;

namespace Server
{
    enum {
        SOCKET_ERR = 1,
        BIND_ERR,
        LISTEN_ERR,
        ACCEPT_ERR,
        READ_ERR,
        WRITE_ERR
    };

    const int gbacklog = 5;      // 全连接队列的最大长度
    const uint16_t gport = 8080; // 默认端口号

    class tcpServer
    {
    private:
        string _ip;      // ip地址
        uint16_t _port;  // 端口号
        int _listenfd;   // 监听文件描述符,它是用来监听链接到来,获取新链接的!
        
    public:
        tcpServer(const uint16_t& port = gport) : _port(port), _listenfd(-1)
        {}

        void initServer()
        {
            // 1. 创建套接字
            _listenfd = socket(AF_INET, SOCK_STREAM, 0);
            if(_listenfd == -1)
            {
                logMessage(Level::FATAL, "socket error"); // 使用日志函数输出日志
                exit(SOCKET_ERR);
            }
            logMessage(Level::NORMAL, "socket success: %d", _listenfd);

            // 2. 绑定信息
            struct sockaddr_in local;
            memset(&local, 0, sizeof local);
            local.sin_family = AF_INET;
            local.sin_port = htons(_port);
            local.sin_addr.s_addr = INADDR_ANY;

            if((bind(_listenfd, (struct sockaddr*)&local, sizeof local)) < 0)
            {
                logMessage(Level::FATAL, "bind error");
                exit(BIND_ERR);
            }
            logMessage(Level::NORMAL, "bind success");

            // 3. 设置listen监听状态
            if(listen(_listenfd, gbacklog) < 0)
            {
                logMessage(Level::FATAL, "listen error");
                exit(LISTEN_ERR);
            }
            logMessage(Level::NORMAL, "listen success");
        }

        void start()
        {
            while(true)
            {
                // 4. 若监听到客户端的信息之后,进行accept
                struct sockaddr_in peer;
                socklen_t len = sizeof(peer);
                int server_sockfd = accept(_listenfd, (struct sockaddr*)&peer, &len);
                if(server_sockfd == -1)
                {
                    logMessage(Level::ERROR, "accept error");
                    continue;
                }
                logMessage(Level::NORMAL, "accept success, get new sockfd: %d", server_sockfd);

                // version1,客户端之间的通信都是阻塞的
                service_IO(server_sockfd);
                close(server_sockfd);   // 对使用完的套接字需要关闭,不然导致描述符泄漏问题
            }
        }
		
        // IO函数
        void service_IO(int sockfd)
        {
            char buffer[1024];
            while(true)
            {
                ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
                if(n == -1)
                {
                    logMessage(Level::ERROR, "recv error");
                    exit(1);
                }
                else if(n == 0) // 返回值为0表示客户端退出
                {
                    logMessage(Level::NORMAL, "client quit and I must quit, too!");
                    break;
                }
                else
                {
                    buffer[n] = '\0';
                    cout << "receive message is: " << buffer << endl;

                    // 写回给客户端
                    string outbuffer = buffer;
                    outbuffer += " server[echo]";
                    send(sockfd, outbuffer.c_str(), outbuffer.size(), 0);
                }
            }
        }  
    };
}

② 服务器主函数实现 – tcpserver.cpp

​ 这和 udp 中服务器的主函数基本是一模一样的!

#include "tcpserver.hpp"
#include <memory>
using namespace Server;

void Usage(string proc)
{
    cout << "Please follow the true usage:\n\t" << proc << " server_port\n";
}

int main(int argc, char* argv[])
{
    if(argc != 2)
    {
        Usage(argv[0]);
        exit(1);
    }
    uint16_t port = atoi(argv[1]);

    unique_ptr<tcpServer> tcpserver(new tcpServer(port));
    tcpserver->initServer();
    tcpserver->start();
    return 0;
}

在这里插入图片描述

Ⅲ. TCP客户端

1、客户端创建流程

  1. 创建套接字(socket
  2. 客户端需要 bind,但是客户端的绑定不需要我们自己写,操作系统会去绑定,也就是 无需手动 bind
  3. 客户端发起连接请求(connect
  4. 客户端进行收发消息(send/recv

2、客户端发起连接请求 – connect

​ 由于客户端不需要绑定,也不需要监听,因此当客户端创建完套接字后就可以向服务端发起连接请求。

​ 发起连接请求的函数叫做 connect,该函数的函数原型如下:

#include <sys/types.h>        
#include <sys/socket.h>
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • 参数说明:
    • sockfd:特定的套接字,表示通过该套接字发起连接请求。
    • addr:对端网络相关的属性信息,包括协议家族、IP地址、端口号等。
    • addrlen:传入的 addr 结构体的长度
  • 返回值:
    • 成功返回 0
    • 失败返回 -1,同时错误码会被设置。

​ 可以看出,调用 connect 函数向服务端发起连接请求时,需要传入服务端对应的网络信息,否则 connect 函数也不知道该客户端到底是要向哪一个服务端发起连接请求。

3、客户端框架搭建

① 客户端类实现 – tcpclient.hpp

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
using namespace std;

namespace Client
{
    const int NUM = 1024;

    class tcpClient
    {
    public:
        tcpClient(const string& ip, const uint16_t& port)
            :_destip(ip), _destport(port), _socketfd(-1)
        {}

        void initClient()
        {
            // 1.创建套接字
            _socketfd = socket(AF_INET, SOCK_STREAM, 0);
            if(_socketfd < 0)
            {
                std::cerr << "socket create error" << std::endl;
                exit(2);
            }
            // 2. tcp的客户端要不要bind?要的! 要不要显示的bind?不要!这里尤其是client port要让OS自定随机指定!
            // 3. 要不要listen?不要!
            // 4. 要不要accept? 不要!
            // 5. 要什么呢??要发起连接!
        }

        void run()
        {
            struct sockaddr_in server;
            memset(&server, 0, sizeof server);
            server.sin_family = AF_INET;
            server.sin_port = htons(_destport);
            server.sin_addr.s_addr = inet_addr(_destip.c_str());

            // 连接使用的是connect函数
            int n = connect(_socketfd, (struct sockaddr*)&server, sizeof server);
            if(n == -1)
            {
                cerr << "socket connect error" << std::endl;
            }
            else
            {
                std::string msg;
                while(true)
                {
                    cout << "Enter# ";
                    getline(cin, msg);
                    send(_socketfd, msg.c_str(), msg.size(), 0);

                    char buffer[NUM];
                    int n = recv(_socketfd, buffer, sizeof(buffer)-1, 0);
                    if(n > 0)
                    {
                        // 目前我们把读到的数据当成字符串, 截止目前
                        buffer[n] = 0;
                        cout << "Server回显# " << buffer << endl;
                    }
                    else
                    {
                        break;
                    }
                }
            }
        }

        ~tcpClient()
        {
            if(_socketfd >= 0) 
                close(_socketfd);
        }
    private:
        int _socketfd;
        string _destip;
        uint16_t _destport;
    };
}

② 客户端主函数实现 – tcpclient.cpp

​ 主函数没啥好说的,都是一样的!

#include "tcpclient.hpp"
#include <memory>
using namespace Client;

void Usage(string proc)
{
    cout << "Please follow the true usage:\n\t" << proc << " dest_ip dest_port\n\n";
}

int main(int argc, char* argv[])
{
    if(argc != 3)
    {
        Usage(argv[0]);
        exit(1);
    }
    string ip = argv[1];
    uint16_t port = atoi(argv[2]);

    unique_ptr<tcpClient> tcpclient(new tcpClient(ip, port));
    tcpclient->initClient();
    tcpclient->run();
    return 0;
}

在这里插入图片描述

Ⅳ. 多进程版的TCP服务器

单执行流的服务器的问题

​ 首先我们先来看一下之前单进程版服务器带来的缺点:

在这里插入图片描述

​ 然后我们把客户端一号退出,看看此时的情况:
在这里插入图片描述

​ 单执行流的服务器一次只能给一个客户端提供服务,此时服务器的资源并没有得到充分利用,因此服务器一般是不会写成单执行流的。要解决这个问题就需要将服务器改为多执行流的,此时就要 多进程多线程

多进程版的TCP服务器

​ 当服务端调用 accept 函数获取到新连接后不是由当前执行流为该连接提供服务,而是 当前执行流调用 fork 函数创建子进程,然后让子进程为父进程获取到的连接提供服务。

​ 由于父子进程是两个不同的执行流,当父进程调用 fork 创建出子进程后,父进程就可以继续从监听套接字当中获取新连接,而不用关心获取上来的连接是否服务完毕。

​ 需要注意的是,文件描述符表是隶属于一个进程的,而 子进程创建后会继承父进程的文件描述符表。比如父进程打开了一个文件,该文件对应的文件描述符是 3,此时父进程创建的子进程的 3 号文件描述符也会指向这个打开的文件,而如果子进程再创建一个子进程,那么子进程创建的子进程的 3 号文件描述符也同样会指向这个打开的文件。

在这里插入图片描述

​ 但当父进程创建子进程后,父子进程之间会保持独立性,此时父进程文件描述符表的变化不会影响子进程。最典型的代表就是匿名管道,父子进程在使用匿名管道进行通信时,父进程先调用 pipe 函数得到两个文件描述符,一个是管道读端的文件描述符,一个是管道写端的文件描述符,此时父进程创建出来的子进程就会继承这两个文件描述符,之后父子进程一个关闭管道的读端,另一个关闭管道的写端,这时父子进程文件描述符表的变化是不会相互影响的,此后父子进程就可以通过这个管道进行单向通信了。

​ 对于套接字文件也是一样的,父进程创建的子进程也会继承父进程的套接字文件,此时子进程就能够对特定的套接字文件进行读写操作,进而完成对对应客户端的服务。

等待子进程问题

​ 当父进程创建出子进程后,父进程是需要等待子进程退出的,否则子进程会变成僵尸进程,进而造成内存泄漏。因此服务端创建子进程后需要调用 waitwaitpid 函数对子进程进行等待。

阻塞式等待与非阻塞式等待:

  • 如果服务端采用阻塞的方式等待子进程,那么服务端还是需要等待服务完当前客户端,才能继续获取下一个连接请求,此时服务端仍然是以一种串行的方式为客户端提供服务。
  • 如果服务端采用非阻塞的方式等待子进程,虽然在子进程为客户端提供服务期间服务端可以继续获取新连接,但此时服务端就需要将所有子进程的PID保存下来,并且需要不断花费时间检测子进程是否退出。

​ 总之,服务端要等待子进程退出,无论采用阻塞式等待还是非阻塞式等待,都不尽人意。此时我们可以 考虑让服务端不等待子进程退出。让父进程不等待子进程退出,常见的方式有两种:

  1. 捕捉 SIGCHLD 信号,将其处理动作设置为忽略。
  2. 让父进程创建子进程,子进程再创建孙子进程,最后让孙子进程为客户端提供服务。

​ 下面我们演示第一种方式,进行捕捉 SIGCHLD 信号并进行忽略(代码的注释中其实也包含了第二种方式,有兴趣可以看看注释),所以此时多进程版本的服务器如下:(因为篇幅原因,与上面一样的代码就用省略号代替了!)

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include "log.hpp"
using namespace std;

namespace Server
{
    enum {
        SOCKET_ERR = 1,
        BIND_ERR,
        LISTEN_ERR,
        ACCEPT_ERR,
        READ_ERR,
        WRITE_ERR
    };

    const int gbacklog = 5;      // 全连接队列的最大长度
    const uint16_t gport = 8080; // 默认端口号

    class tcpServer
    {
    private:
        string _ip;      // ip地址
        uint16_t _port;  // 端口号
        int _listenfd;   // 监听文件描述符,它是用来监听链接到来,获取新链接的!
        
    public:
        tcpServer(const uint16_t& port = gport) : _port(port), _listenfd(-1)
        {}
        void initServer()
        {
            // ……
        }
        void start()
        {
            // 将SIGCHLD信号忽略之后,父进程才不会阻塞去等待子进程的退出,而是交给操作系统去回收子进程
            signal(SIGCHLD, SIG_IGN);
            while(true)
            {
                // 4. 若监听到客户端的信息之后,进行accept
                struct sockaddr_in peer;
                socklen_t len = sizeof(peer);
                int server_sockfd = accept(_listenfd, (struct sockaddr*)&peer, &len);
                if(server_sockfd == -1)
                {
                    logMessage(Level::ERROR, "accept error");
                    continue;
                }
                logMessage(Level::NORMAL, "accept success, get new sockfd: %d", server_sockfd);
                cout << "accept success, get new sockfd: " << server_sockfd << endl; // 显示打印当前的描述符

                // version2 多进程版本,主要学习思想,借助的是不被父进程负责回收的子进程或者孙子进程
                pid_t id= fork();
                if(id == 0)
                {
                    // 子进程:
                    close(_listenfd); // 关闭子进程拷贝的父进程中不必要的描述符比如这里的监听,以免误操作
                    
                    // if(fork() > 0) // 让当前的子进程退出
                    //     exit(1); 
                    // 也就是说到下面就剩下一个孙子进程,而它变成了孤儿进程,由OS来管理回收
                    // 这是属于第二种方式
                    
                    service_IO(server_sockfd); 
                    close(server_sockfd);
                    exit(0);
                }
                
                // 父进程:下面注释部分是第二种方式
                // 如果不是用信号忽略子进程的捕捉的话,那么就要等待,并且要阻塞等待
                // 因为非阻塞等待的话,如果描述符用完了,但是轮询时候访问到上面的accept函数
                // accept的返回值就是错误的也就是-1,那么就会一直continue了,造成无效的死循环,也就是回收不了进程了
                
                close(server_sockfd); // 注意这里描述符要关,防止文件不够用
                
                // pid_t ret = waitpid(id, nullptr, 0); 
                // if(ret > 0)
                //     cout << "wait success, this child id is " << ret << endl;
            }
        }

        void service_IO(int sockfd)
        {
            // ……
        }  
    };
}

在这里插入图片描述

​ 这样子我们就能实现简单的服务器并发服务了!

Ⅴ. 多线程版的TCP服务器

​ 我们都知道,创建进程的成本是很高的,创建进程时需要创建该进程对应的进程控制块(task_struct)、进程地址空间(mm_struct)、页表等数据结构。而创建线程的成本比创建进程的成本会小得多,因为线程本质是在进程地址空间内运行,创建出来的线程会共享该进程的大部分资源,因此在实现 多执行流的服务器时最好采用多线程进行实现。

​ 当服务进程调用 accept 函数获取到一个新连接后,就可以直接创建一个新线程,让该新线程为对应客户端提供服务。

​ 当然,主线程(服务进程)创建出新线程后,也是需要等待新线程退出的,否则也会造成类似于僵尸进程这样的问题。但对于线程来说,如果不想让主线程等待新线程退出,可以 让创建出来的新线程调用 pthread_detach 函数进行线程分离,当这个线程退出时系统会自动回收该线程所对应的资源。此时主线程(服务进程)就可以继续调用 accept 函数获取新连接,而不会因为等待线程退出而阻塞,而让新线程去服务对应的客户端!

各个线程共享同一张文件描述符表

​ 文件描述符表维护的是进程与文件之间的对应关系,因此一个进程对应一张文件描述符表。而主线程创建出来的新线程依旧属于这个进程,因此创建新线程时并不会为该线程创建独立的文件描述符表,所有的线程看到的都是同一张文件描述符表

在这里插入图片描述

​ 因此当服务进程(主线程)调用 accept 函数获取到一个文件描述符后,其他创建的新线程是能够直接访问这个文件描述符的。

​ 需要注意的是,虽然新线程能够直接访问主线程 accept 上来的文件描述符,但此时 新线程并不知道它所服务的客户端对应的是哪一个文件描述符,因此主线程创建新线程后需要告诉新线程对应应该访问的文件描述符的值,也就是告诉每个新线程在服务客户端时,应该对哪一个套接字进行操作。

文件描述符关闭的问题

由于此时所有线程看到的都是同一张文件描述符表,因此当某个线程要对这张文件描述符表做某种操作时,不仅要考虑当前线程,还要考虑其他线程。

  • 对于主线程 accept 上来的文件描述符,主线程不能对其进行关闭操作,该文件描述符的关闭操作应该由新线程来执行。因为是新线程为客户端提供服务的,只有当新线程为客户端提供的服务结束后才能将该文件描述符关闭。
  • 对于监听套接字,虽然创建出来的新线程不必关心监听套接字,但 新线程不能将监听套接字对应的文件描述符关闭,否则主线程就无法从监听套接字当中获取新连接了。

类内使用线程函数的问题

​ 首先,由于调用 pthread_create 函数创建线程时,新线程的执行函数是一个参数为 void*,返回值为 void* 的函数。如果我们 要将这个执行函数定义到类内,就需要将其定义为静态成员函数,否则这个执行函数的第一个参数是隐藏的 this 指针,就会报错!

​ 但在线程的执行例程当中会调用 service_IO 函数,由于线程执行函数是静态成员函数,静态成员函数无法调用非静态成员函数,因此我们需要做一些处理!方法有很多,比如说下面的方法:

  • service_IO 函数定义为静态成员函数,因为恰好 service_IO 函数内部进行的操作都是与类无关的,因此我们直接在 service_IO 函数前面加上一个 static 即可。
  • 封装一个 ThreadData 对象,其中包含的就是 tcpserver 对象的指针,然后在使用 pthread_create 的时候通过参数 args 传递过去,接着在线程执行函数中通过 ThreadData 对象找到 tcpserver 对象指针来访问类内的 service_IO 函数!

​ 这里 采用的是第二种方式,因为一般我们在线程执行函数中,还得关闭对应的套接字描述符,所以我们可以将套接字描述符也存放在 ThreadData 对象中一起通过 args 传参过去,将其 void* 类型的参数强转为 ThreadData* 类型,然后就能够拿到客户端对应的套接字、服务器对象的指针,进而调用 service_IO 函数为对应客户端提供服务,并且关闭描述符!

​ 所以总体代码如下所示:(因为篇幅原因,与上面一样的代码就用省略号代替了!)

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include "log.hpp"
using namespace std;

namespace Server
{
    enum {
        SOCKET_ERR = 1,
        BIND_ERR,
        LISTEN_ERR,
        ACCEPT_ERR,
        READ_ERR,
        WRITE_ERR
    };

    const int gbacklog = 5;      // 全连接队列的最大长度
    const uint16_t gport = 8080; // 默认端口号

    class tcpServer;	// 注意这里要有前置声明,这样子ThreadData才知道tcpServer对象指针是啥
    class ThreadData	// 封装一个数据对象类
    {
    public:
        tcpServer* _self; // 服务器对象指针
        int _sockfd;	  // 文件描述符
        ThreadData(tcpServer* self, int sockfd)
            :_self(self), _sockfd(sockfd)
        {}
    };

    class tcpServer
    {
    private:
        string _ip;      // ip地址
        uint16_t _port;  // 端口号
        int _listenfd;   // 监听文件描述符,它是用来监听链接到来,获取新链接的!
        
    public:
        tcpServer(const uint16_t& port = gport) : _port(port), _listenfd(-1)
        {}

        void initServer()
        {
            // ……
        }

        void start()
        {
            while(true)
            {
                // 4. 若监听到客户端的信息之后,进行accept
                struct sockaddr_in peer;
                socklen_t len = sizeof(peer);
                int server_sockfd = accept(_listenfd, (struct sockaddr*)&peer, &len);
                if(server_sockfd == -1)
                {
                    logMessage(Level::ERROR, "accept error");
                    continue;
                }
                logMessage(Level::NORMAL, "accept success, get new sockfd: %d", server_sockfd);
                cout << "accept success, get new sockfd: " << server_sockfd << endl;

                // version3 多线程版本,不能线程等待,
                // 因为这样子就变成了线程阻塞的形式了,所以最好就是用线程分离
                // 并且注意主线程不能关闭文件,不然新线程也会收到影响
                pthread_t tid;
                ThreadData* td = new ThreadData(this, server_sockfd);
                pthread_create(&tid, nullptr, thread_routine, td);
            }
        }

        static void* thread_routine(void* args)
        {
            // 记得线程分离
            pthread_detach(pthread_self());

            ThreadData* td = static_cast<ThreadData*>(args);
            td->_self->service_IO(td->_sockfd);
            close(td->_sockfd);
            delete td;
            return nullptr;
        }

        void service_IO(int sockfd)
        {
            // ……
        }  
    };
}

在这里插入图片描述

Ⅵ. 线程池版的TCP服务器

1、多线程版存在的缺点

  • 每当有新连接到来时,服务端的主线程都会重新为该客户端创建为其提供服务的新线程,而当服务结束后又会将该新线程销毁。这样做不仅麻烦,而且 效率低下,每当连接到来的时候服务端才创建对应提供服务的线程。
  • 如果有大量的客户端连接请求,此时服务端要为每一个客户端创建对应的服务线程。计算机当中的线程越多,CPU 的压力就越大,因为 CPU 要不断在这些线程之间来回切换,此时 CPU 在调度线程的时候,线程和线程之间切换的成本就会变得很高。此外,一旦线程太多,每一个线程再次被调度的周期就变长了,而线程是为客户端提供服务的,线程被调度的周期变长,客户端也迟迟得不到应答。

2、解决思路 – 线程池

针对这两个问题,对应的解决思路如下:

  • 可以在服务端 预先创建一批线程,当有客户端请求连接时就让这些线程为客户端提供服务,此时客户端一来就有线程为其提供服务,而不是当客户端来了才创建对应的服务线程。
  • 当某个线程为客户端提供完服务后,不要让该线程退出,而是让该线程继续为下一个客户端提供服务,如果当前没有客户端连接请求,则可以让该线程先进入休眠状态,当有客户端连接到来时再将该线程唤醒。
  • 服务端创建的这一批线程的数量不能太多,此时 CPU 的压力也就不会太大。此外,如果有客户端连接到来,但此时这一批线程都在给其他客户端提供服务,这时服务端不应该再创建线程,而应该让这个新来的连接请求在全连接队列进行排队,等服务端这一批线程中有空闲线程后,再将该连接请求获取上来并为其提供服务。

3、引入线程池

​ 实际要解决这里的问题我们就需要在服务端引入线程池,因为 线程池的存在就是为了避免处理短时间任务时创建与销毁线程的代价,此外,线程池还能够保证内核充分利用,防止过分调度

​ 线程池的工作流程差不多是这样子的:在线程池里面有一个任务队列,当有新的任务到来的时候,就可以将任务 put 到线程池当中。比如在线程池当中我们默认创建了 10 个线程,这些线程不断检测任务队列当中是否有任务,如果有任务就通过 take 函数拿出任务,然后调用该任务对应的 run 函数对该任务进行处理,如果线程池当中没有任务那么当前线程就会进入休眠状态。

​ 在之前的笔记当中已经详细介绍并实现了线程池,这里就直接将线程池的代码接入到当前的 TCP 服务器,因此下面只会讲解线程池接入的方法,如果对线程池的实现有疑问的可以去阅读那篇笔记!

① 线程池实现 – ThreadPool.hpp

​ 该线程池实现中对之前的线程池稍微做了点修改!

#pragma once 
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "LockGuard.hpp"
#include "Thread.hpp"
#include "ThreadData.hpp"
#include "log.hpp"

using namespace ThreadNS;
const int MAXCAP = 10; // 线程池最大线程数量

template <class T>
class ThreadPool
{
public:
    // 启动所有线程的函数
    void run()
    {
        for(const auto& t : _threads)
        {
            // 使用ThreadData类装载线程池和名称,方便后面打印
            ThreadData<T>* td = new ThreadData<T>(this, t->threadname());
            t->start(handlerTask, td);  
            std::cout << t->threadname() << " start......" << std::endl;
            logMessage(Level::DEBUG, "%s start......", t->threadname().c_str());
        }
    }

    // 向任务队列中放置任务的接口
    void put(const T& in)
    {
        // 队列的操作是线程不安全的,所以加锁
        LockGuard lock(&_mutex);
        _task_queue.push(in);
        pthread_cond_signal(&_cond); // 唤醒其中一个线程执行任务
    }

    // 拿取并且弹出任务队列中的任务
    T take()
    {
        // 此时只会有一个线程执行,所以不需要加锁
        T t = _task_queue.front();
        _task_queue.pop();
        return t;
    }

    ~ThreadPool()
    {
        pthread_cond_destroy(&_cond);
        pthread_mutex_destroy(&_mutex);
        for(const auto& t : _threads)
            delete t;
    }

    // c++11方式获取单例对象
    static ThreadPool<T>& GetInstance()
    {
        static ThreadPool<T> single_object;
        return single_object;
    }
private:
    // 封掉拷贝构造和赋值重载
    ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;
    ThreadPool(const ThreadPool<T>&) = delete;

    // 构造函数设为私有
    ThreadPool(const int& maxcap = MAXCAP)
        :_cap(maxcap)
    {
        // 初始化工作
        pthread_cond_init(&_cond, nullptr);
        pthread_mutex_init(&_mutex, nullptr);
        for(int i = 0; i < _cap; ++i)
            _threads.push_back(new Thread());
    }
private:
    // 线程将来在此获取来自任务队列中的任务和执行任务
    static void* handlerTask(void* args)
    {
        ThreadData<T>* td = static_cast< ThreadData<T> *>(args);
        while(true)
        {
            T t;
            {
                LockGuard lock(&td->_threadpool->_mutex);
                while(td->_threadpool->_task_queue.empty())
                {
                    pthread_cond_wait(&td->_threadpool->_cond, &td->_threadpool->_mutex); // 阻塞直到被put了任务后被唤醒
                }
                t = td->_threadpool->take(); // 拿取、弹出任务队列中的任务
            }
            t(); // 执行任务!
        }
        delete td;
        return nullptr;
    }
private:
    int _cap;                      // 线程池容量
    std::vector<Thread*> _threads; // 线程等待容器
    std::queue<T> _task_queue;     // 任务队列
    pthread_cond_t _cond;          // 用来线程等待和唤醒线程的条件变量
    pthread_mutex_t _mutex;        // 互斥锁,保护共享资源--任务队列
};

下面也贴出线程池实现依赖的一些自主头文件:

② 线程的封装类 – Thread.hpp

​ 该线程封装 pthread 的互斥锁,使得使用起来更加方便一些!

#pragma once
#include <iostream>
#include <functional>
#include <string>
#include <pthread.h>
#include <cstdio>

namespace ThreadNS
{
    class Thread
    {
        using func_t = std::function<void*(void*)>;
    private:
        func_t _callback;        // 线程执行函数
        void* _args;             // 线程函数参数
        std::string _threadname; // 线程自定义标识名称
        pthread_t _t;			 // 线程id

        static int _num;         // 计数器,用于标识几号线程
    public:
        Thread()
        {
            char namebuffer[1024];
            snprintf(namebuffer, sizeof namebuffer, "thread%d", _num++);
            _threadname = namebuffer;
        }

        // 创建线程
        void start(func_t callback, void *args = nullptr) 
        {
            _callback = callback;
            _args = args;
            pthread_create(&_t, nullptr, start_routine, this);
        }

        // 等待线程
        void join()
        {
            pthread_join(_t, nullptr);
        }

        std::string threadname()
        {
            return _threadname;
        }
    private:
        // 在类内创建线程,想让线程执行对应的方法,需要将方法设置成为static
        static void* start_routine(void* args) // 类内成员,有缺省参数!
        {
            Thread* _this = static_cast<Thread*>(args);
            return _this->_callback(_this->_args);
        }
    };
    int Thread::_num = 1; // 静态成员类外初始化
}

③ 守卫锁类 – LockGuard.hpp

​ 该类用于线程池中锁更安全的操作!

#pragma once
#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t* pmutex = nullptr)
        : _pmutex(pmutex)
    {}
    void lock()
    {
        if(_pmutex != nullptr)
            pthread_mutex_lock(_pmutex);
    }
    void unlock()
    {
        if(_pmutex != nullptr)
            pthread_mutex_unlock(_pmutex);
    }
private:
    pthread_mutex_t* _pmutex;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* pmutex)
        : _mutex(pmutex)
    {
        _mutex.lock();
    }
    ~LockGuard()
    {
        _mutex.unlock();
    }
private:
    Mutex _mutex;
};

④ 线程数据成员封装类 – ThreadData.hpp

​ 该类就是为了在线程池中,线程执行函数中传参的类型封装!

#pragma once
#include <iostream>
#include <string>
#include "ThreadPool.hpp"

// 需要有ThreadPool的声明,不然会报错
template <class T>
class ThreadPool;

// 线程池与名称的封装,成员设为public给外部使用
template <class T>
class ThreadData
{
public:
    ThreadData(ThreadPool<T>* threadpool, const std::string& name)
        : _threadpool(threadpool), _name(name)
    {}
public:
    ThreadPool<T>* _threadpool;
    std::string _name;
};

4、设计任务类 – Task.hpp

​ 接下来我们要做的是设计一个任务类 Task,该任务类当中需要包含客户端对应的套接字描述符。

​ 此外,任务类当中需要包含一个函数,当线程池中的线程拿到任务后就会直接调用这个函数对该任务进行处理,而实际处理这个任务的方法就是之前我们在服务器类当中的 service_IO 函数,服务端就是通过调用 service_IO 函数为客户端提供服务的!

​ 我们可以直接拿出服务类当中的 service_IO 函数,将其放到任务类当中作为任务类当中的执行,但这实际不利于软件分层。我们可以给任务类新增一个仿函数成员,当执行任务类当中的执行方法处理任务时就可以以回调的方式处理该任务!

​ 并且,这个执行方法,我们是可以直接通过仿函数来实现的,也就是说,在线程池类中,直接就是通过 类名() 来达到执行函数的效果!

​ 简单地说,就是将原来 tcpserver.hpp 中的 service_IO 函数放到一个新的文件 Task.hpp 中进行实现,然后通过 Task 类进行调用,达到服务器和业务处理的解耦作用!

#pragma once 
#include <iostream>
#include <functional>
#include <cstdio>
#include <string>
#include <fstream>
#include "tcpserver.hpp"
#include "log.hpp"

void service_IO(int sockfd)
{
    char buffer[1024];
    while(true)
    {
        ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if(n == -1)
        {
            logMessage(Level::ERROR, "recv error");
            exit(1);
        }
        else if(n == 0) // 返回值为0表示客户端退出
        {
            logMessage(Level::NORMAL, "client quit and I must quit, too!");
            break;
        }
        else
        {
            buffer[n] = '\0';
            cout << "receive message is: " << buffer << endl;

            // 写回给客户端
            string outbuffer = buffer;
            outbuffer += " server[echo]";
            send(sockfd, outbuffer.c_str(), outbuffer.size(), 0);
        }
    }
    close(sockfd); // 记得关闭文件描述符
}  

class Task
{
    using func_t = std::function<void(int)>;
public:
    Task()
    {}
    Task(int sockfd, func_t func)
        :_sockfd(sockfd), _callback(func)
    {}
    void operator()()
    {
        _callback(_sockfd);
    }
private:
    int _sockfd;
    func_t _callback;
};

5、服务器代码 – tcpserver.hpp

​ 其实还差个守护进程加持,可以看下一个笔记!

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "log.hpp"
using namespace std;

namespace Server
{
    enum {
        SOCKET_ERR = 1,
        BIND_ERR,
        LISTEN_ERR,
        ACCEPT_ERR,
        READ_ERR,
        WRITE_ERR
    };

    const int gbacklog = 5;      // 全连接队列的最大长度
    const uint16_t gport = 8080; // 默认端口号

    class tcpServer
    {
    private:
        string _ip;      // ip地址
        uint16_t _port;  // 端口号
        int _listenfd;   // 监听文件描述符
        
    public:
        tcpServer(const uint16_t& port = gport) : _port(port), _listenfd(-1)
        {}
        void initServer()
        {
            // ……
        }

        void start()
        {
            // 初始化线程池
            ThreadPool<Task>::GetInstance().run();
            logMessage(Level::NORMAL, "Thread init success");

            // signal(SIGCHLD, SIG_IGN);
            while(true)
            {
                // 4. 若监听到客户端的信息之后,进行accept
                struct sockaddr_in peer;
                socklen_t len = sizeof(peer);
                int server_sockfd = accept(_listenfd, (struct sockaddr*)&peer, &len);
                if(server_sockfd == -1)
                {
                    logMessage(Level::ERROR, "accept error");
                    continue;
                }
                logMessage(Level::NORMAL, "accept success, get new sockfd: %d", server_sockfd);
                cout << "accept success, get new sockfd: " << server_sockfd << endl;

                // version4 线程池版本,套用我们以前写的线程池
                // 并且这是一个单例对象,初始化在上面,这里是调用put了
                ThreadPool<Task>::GetInstance().put(Task(server_sockfd, service_IO));
            }
        }
    };
}

在这里插入图片描述
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到