【网络编程】TCP 服务器并发编程:多进程、线程池与守护进程实践

发布于:2025-09-14 ⋅ 阅读:(22) ⋅ 点赞:(0)

请添加图片描述


半桔个人主页

 🔥 个人专栏: 《Linux手册》《手撕面试算法》《网络编程》

🔖很多人在喧嚣声中登场,也有少数人在静默中退出。 -张方宇-

前言

在互联网技术蓬勃发展的今天,高并发、高可靠的网络服务已成为各类应用的核心诉求 —— 从支撑海量用户的 Web 服务器,到实时交互的分布式系统,甚至是物联网设备的通信底座,高效的网络通信设计进程生命周期管理,始终是保障服务稳定运行的基石。

本文将聚焦 Linux 网络编程与进程管理 的核心技术,以 “从基础到进阶,从实现到优化” 的脉络展开:

  • 从最基础的 套接字接口 出发,剖析网络通信的底层逻辑;
  • 通过 TCP 服务器 的搭建,掌握客户端 - 服务端交互的核心流程;
  • 针对高并发场景,探索 多进程、线程池 等并行模型的设计,突破服务吞吐量的瓶颈;
  • 最终深入 进程组与守护进程 的实践,解决服务 “脱离终端、长期稳定运行” 的生产级需求。

TCP通信是面向字节流的,而UDP是面向最字节报的,因此两者通信方式上有本质的差异。

TCP面向字节流也就意味着,接收方读取上来的数据可能是不完整的,因此TCP通信要进行协议定制,规定一个消息从哪到哪是一个整体部分。关于协议的定制我们在下一篇博客中详细讲解,本篇文章我们假设通过TCP通信对方就可以拿到一个完整的数据。

套接字接口

TCP的接口和UDP接口有类似的,当时也有一些不同之处。
UDP通信的步骤就是:创建套接字,绑定,接收和发送消息;而TCP与其是不一样的。

  • TCP通信时面向连接的,需要通信双方先建立连接,服务器一般是比较“被动”的,服务器一直处于等待外界连接的状态(监听状态)。

因此在进行绑定完成之后,服务器要先进入监听状态,与客户端建立连接后才能进行通信:

int listen(int sockfd , int backlog)

  1. 参数一:套接字;
  2. 参数二: backlog 表示未完成连接队列(处于三次握手过程中)和已完成连接队列(三次握手完成,等待 accept 处理)的最大长度之和。用来调节连接时的并发量;
  3. 返回值:成功返回0,失败-1;

第二个接口,将服务器设置为监听模式之后,要对客户端的连接请求做出响应,要接收客户端的请求:

int accept(int sockfd , struct sockaddr_in *addr , socklen_t *addrlen)

  1. 参数一:套接字;
  2. 参数二:输出型参数,一个结构体,存储着客户端的ip和端口号信息;
  3. 参数三:输出型参数,表示第二个结构体的大小;
  4. 返回值:返回一个文件描述符,通过该文件描述符可以让直接使用writeread接口进行通信,就像从文件中进行读写一样。

注意:accept中的sockfd也属于文件描述符,只不过该描述符主要负责将底层的连接请求来上来,而不负责进行IO交互;而accept返回的文件描述符是专门用来进行IO交互的。

随着客户端越来越多,accept返回的文件描述符也就也来越多,每一个都负责与一个客户端进行通信。

客户端要与服务端建立连接,所以需要先服务端发送连接请求:

int connet(int sockfd , struct sockaddr* addr , socklen_t addrlen)

  1. 参数一:套接字;
  2. 参数二:结构体,内部包含要进行连接的IP和端口号;
  3. 参数三:参数二结构体的大小;
  4. 返回值:0表示成功,-1表示失败。

TCP服务器

使用一个类来实现TCP服务器:

  • 内存成员需要有IP和端口号,来进行绑定;
  • 并且需要将套接字存储起来,否则后续在不到套接字就会导致无法关闭对应的网络文件位置。
  • 此处在设计一个bool类型的变量,让用户可以控制时候打开服务器。

初始化的时候需要外界将这些参数都传进行保存起来,但是并不在初始化时创建套接字,而是当用户运行时才进行创建。

const std::string defaultip = "0.0.0.0";

class Server
{
public:
    Server(const uint16_t &port , const std::string &ip = defaultip)
    :port_(port) , ip_(ip)
    {}

private:
    uint16_t port_;
    std::string ip_;
    int sockfd_;
};

与UDP一样,为了保证服务器能够接收来自各个网卡上的数据,我们再对服务器进行绑定的时候使用ip为0。

在此之前我们需要思考以下接收到的信息如何进行处理?

如果我们直接让处理方法都在循环内完成,就会导致代码拓展性差,如果后续希望接入进程池就需要对代码进行重构,因此此处将对接收到的信息处理方法也单独封装一个类:

该类主要负责,将对信息进行处理,处理完后,向客户端返回数据,因此该类的成员必须有一个string用来存储待处理的信息,为了进行通信还需要拿到对应的文件描述符

我们可以在类中对调用运算符进行重载,在进行消息调用的时候更简单。
为了后续测试,我们先不进行太复杂的处理:

class Task
{
public:
    Task(const int & fd , const std::string message)
    :fd_(fd) , message_(message)
    {}

    bool operator()()
    {
        std::string ret = "I have got your message : " + message_;
        write(fd_ , ret.c_str() , ret.size()); 
        return true;
    }
private:
    int fd_;
    std::string message_;
};

现在可以对服务器进行初始化了,初始化主要分为3步:

  1. 创建套接字;
  2. 绑定;
  3. 设置监听模式。
    void Init()
    {
        // 1. 创建套接字
        // 2. 绑定
        // 3. 设置监听模式

        sockfd_ = socket(AF_INET , SOCK_STREAM , 0);
        if(sockfd_ < 0)
        {
            Log(Fatal) << "socket failed ";
            exit(Socket_Err);
        }

        struct sockaddr_in local;
        local.sin_family = AF_INET;
        local.sin_port = htons(port_);
        char clientip[32];
        inet_aton(ip_.c_str() , &local.sin_addr);
        if(bind(sockfd_ , (const struct sockaddr*)&local , sizeof(local)) < 0)
        {
            Log(Fatal) << "bind failed" ;
            exit(Bind_Err);
        }
        
        if(listen(sockfd_ , 10) < 0)
        {
            Log(Fatal) << "listen failed" ;
            exit(Listen_Err);
        }
    }

运行服务器了,运行服务器:

  1. 先建立连接;
  2. 读取数据;
  3. 做出反应。
    void Service(int fd_)
    {   
        char buffer[1024];
        while(1)
        {
            
            int n = read(fd_ , buffer , sizeof(buffer) - 1);
            if(n > 0)
            {
                buffer[n] = 0;
                Task task(fd_ , buffer);
                task();
            }
            else if(n == 0)
            {
                close(fd_);
                break;
            }
            else 
            {
                Log(Error) << "read error";
                close(fd_);
                break;
            }
        }
    }

    void Start()
    {
        // 1. 建立连接
        // 2. 读取消息
        // 3. 对消息进行处理,并返回
        struct sockaddr_in client;
        socklen_t len = sizeof(client);
        int fd = accept(sockfd_ , (struct sockaddr*)&client , &len);
        if(fd < 0)
        {
            Log(Warning) << "accept failed";
        }

        Service(fd);
    }

此处我们将服务单独进行了封装,方便后面接入多线程/多进程。

服务器的类编写完成,后面再进行拓展,当前先进行以下简单测试:
编写一个源文件来运行一下服务器:在执行的时候,必须给出端口号。

void Menu(char* argv[])
{
    std::cout << "\r" << argv[0] <<  "  [port] " << "\n";
}

int main(int argc , char* argv[])
{
    if(argc != 2)
    {
        Menu(argv);
        exit(1);
    }

    uint16_t port = std::stoi(argv[1]);

    Server server(port);
    server.Init();
    server.Start();
    return 0;
}

当前服务器编写完成了,但是客户端还没进行实现。如果想对服务端进行测试的话,可以先使用telnet工具,绑定本地环回地址127.0.0.1进行测试,但是只能起到本地通信的作用,不会将信息推送到网络中

下一步就是编写客户端了:

客户端的编写就比较简单了:

  1. 创建套接字;
  2. 发送连接请求;
  3. 连接成功,发送数据;
  4. 接收数据。

与服务端的编写类似,只不过要用到connect接口:

void Menu(char *argv[])
{
    std::cout << argv[0] << "  [ip] " << " [port] " << std::endl;
}

int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        Menu(argv);
        exit(1);
    }

    std::string ip = argv[1];
    uint16_t port = std::stoi(argv[2]);

    // 1.创建套接字
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        std::cerr << " socket failed ";
        exit(2);
    }

    // 2.发送连接请求
    struct sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    inet_aton(ip.c_str(), &server.sin_addr);
    int n = connect(sockfd, (sockaddr *)&server, sizeof(server));
    if (n < 0)
    {
        std::cerr << " connect failed ";
        exit(2);
    }

    // 3.进行通信
    std::string message;
    char buffer[1024];
    while (1)
    {
        std::cout << "Please Enter@";
        std::getline(std::cin, message);
        write(sockfd, message.c_str(), message.size());
        n = read(sockfd, buffer, sizeof(buffer) - 1);
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << buffer << std::endl;
        }

        if (message == "quit")
            break;
    }

    close(sockfd);
    return 0;
}

以上就是客户端和服务端的所有代码编写,只不过给服务端只能处理一个用户端。

为了能够同时处理多个用户端,此处我们需要使用多进程或多线程来实现。

TCP + 多进程

  • 父进程创建子进程,让子进程来与客户端进行交互;
  • 父进程只负责与子进程建立连接。

此处需要考虑子进程的回收问题,我们并不希望对子进程进行等待,因此有两种方案:

  1. 直接将SIGCHLD信号进行屏蔽;
  2. 使用孙子进程来完成与客户端通信,子进程直接回收;

此处我们采用孙子进程的方式直接回收子进程,让孙子进程被超卓系统领养。

此处我们仅需要对服务端类中得Start进行修改即可:

    void Start()
    {
        // 1. 建立连接
        // 2. 读取消息
        // 3. 对消息进行处理,并返回
        while (1)
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            int fd = accept(sockfd_, (struct sockaddr *)&client, &len);
            if (fd < 0)
            {
                Log(Warning) << "accept failed";
            }

            // 使用多进程来实现
            pid_t id = fork();
            if (id < 0)
            {
                Log(Fatal) << "fork failed";
            }
            else if (id == 0)
            {
                close(sockfd_);
                if (fork() == 0)   // 使用孙子进程进行通信
                {
                    Service(fd);
                    exit(0);
                }
                exit(0);
            }

            // 父进程直接将fd关闭,不允许父进程与客户端进行通信
            close(fd);
            pid_t rid = waitpid(id, nullptr, 0);  // 回收子进程
        }
    }

以上就是多进程服务端的修改,也很简单。

TCP + 线程池

  • 主线程先任务队列中添加任务,而线程池中的线程负责将任务取出来,执行。

引入线程池,向任务队列中放什么???

有两种方案:

  1. 对Task任务类进行从写;
  2. 向任务队列中放函数对象,让线程能够直接调用。

此处两种方法都实现一下:

重写Task任务

  • 我们希望主线程构建一个Task任务,加入到任务队列中,然后线程池中的线程拿出来执行。
  • 线程池中的线程如果想与用户端进行通信,就必须拿到文件描述符,因此Task类私有成员有一个文件描述符
  • task任务的调用运算符重载,应该变成原来的Service函数实现.

重写如下:

class Task
{
public:
    Task(const int &fd)
        : fd_(fd)
    {
    }

    void operator()()
    {
        char buffer[1024];
        while (1)
        {
            memset(buffer, 0, sizeof(buffer));
            int n = read(fd_, buffer, sizeof(buffer) - 1);
            if (n > 0)
            {
                buffer[n] = 0;
                std::string ret = "I have got your message : " + std::string(buffer);
                write(fd_, ret.c_str(), ret.size());
                if (strcmp(buffer, "quit") == 0)
                    break;
            }
            else if (n == 0)
            {
                close(fd_);
                break;
            }
            else
            {
                Log(Level::Error) << "read error";
                close(fd_);
                break;
            }
        }
    }

private:
    int fd_;
};

下一步就是对服务端的Start的函数进行重写,主线程负责向线程池放入Task对象:

    void Start()
    {
        // 1. 建立连接
        // 2. 读取消息
        // 3. 对消息进行处理,并返回
        std::unique_ptr<thread_poll<Task>>& ptp = thread_poll<Task>::GetInstance();
        ptp->run();
        while (1)
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            int fd = accept(sockfd_, (struct sockaddr *)&client, &len);
            if (fd < 0)
            {
                Log(Level::Warning) << "accept failed";
            }

            // 父进程直接将fd关闭,不允许父进程与客户端进行通信
            ptp->push(Task(fd));
        }
    }

通过这种方式,就实现了主线程向任务队列中放数据,由线程池中的线程来与用户端进行沟通。

放函数对象

我们已经有现成的函数调用对象了,就是服务端中的Service函数,但是如果线程池中的线程并没有在该函数中,因此也就没有this指针了,所以我们在传函数对象的时候,可以使用std::bind进行绑定,将this指针绑定到函数对象中,这样线程池中的线程就可以直接进行调用了。

我们只需要对Service函数进行绑定,保证线程池中的线程在调用的时候,不需要传递任何参数,可以直接调用即可:

    void Start()
    {
        // 1. 建立连接
        // 2. 读取消息
        // 3. 对消息进行处理,并返回
        using  fun_t =  std::function<void()>;
        std::unique_ptr<thread_poll<fun_t>>& ptp = thread_poll<fun_t>::GetInstance();
        ptp->run();
        while (1)
        {
            struct sockaddr_in client;
            socklen_t len = sizeof(client);
            int fd = accept(sockfd_, (struct sockaddr *)&client, &len);
            if (fd < 0)
            {
                Log(Level::Warning) << "accept failed";
            }

            // 父进程直接将fd关闭,不允许父进程与客户端进行通信
            fun_t func = std::bind(&Server::Service , this , fd);  // 绑定this指针和文件描述符
            ptp->push(func);
        }
    }

以上两种方法都比较常用,后一种方法实现上更简单一些。

客户端重连

当服务端挂掉或者读写出错时,我们上面的客户端会直接退出;当服务端出现问题的时候,我们并不应该将客户端直接退出,而是让客户端进行重连,即重新向服务端发送建立连接的请求

下面我们将进行模拟实现,客户端重连的机制:

  • 客户端重连,必定需要进行循环;当服务端挂掉时,让客户端重新进行connect尝试重新建立连接;
  • 我们也不能一直让客户端进行连接,当尝试连接的次数达到一定限制时,才让客户端退出。

下面时修改后的代码实现,我们的主循环内部有两个循环,一个用来控制重连的次数,另一个用来与服务端建立联系。

void Menu(char *argv[])
{
    std::cout << argv[0] << "  [ip] " << " [port] " << std::endl;
}

int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        Menu(argv);
        exit(1);
    }

    std::string ip = argv[1];
    uint16_t port = std::stoi(argv[2]);

    struct sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(port);
    inet_aton(ip.c_str(), &server.sin_addr);

    while (1)
    {
        int cnt = 0, n = 0 , sockfd = -1;
        const int max_cnt = 6;
        do
        {
            // 1.创建套接字
            sockfd = socket(AF_INET, SOCK_STREAM, 0);
            if (sockfd < 0)
            {
                std::cerr << " socket failed ";
                exit(2);
            }
            // 2.connext
            n = connect(sockfd, (sockaddr *)&server, sizeof(server));
            if (n < 0)
            {
                std::cout << "connet failed : " << cnt++ << std::endl;
                sleep(1);
            }
            else
                break;
        } while (cnt < max_cnt);
        if (cnt == max_cnt)
        {
            std::cout << "server error" << std::endl;
            return 0;
        }

        // 3.进行通信
        std::string message;
        char buffer[1024];
        while (1)
        {
            std::cout << "Please Enter@";
            std::getline(std::cin, message);
            write(sockfd, message.c_str(), message.size());
            n = read(sockfd, buffer, sizeof(buffer) - 1);
            if (n > 0)
            {
                buffer[n] = 0;
                std::cout << buffer << std::endl;
            }
            else
                break;

            if (message == "quit")
            {
                close(sockfd);
                return 0;
            }
        }
    }

    return 0;
}

客户端在直接进行连接的时候,会出现连接失败,因核心原因是 服务器重启时,原端口因 TCP TIME_WAIT 状态被占用,导致无法重新绑定端口(监听失败)

所以我们需要对服务器进行设置:在服务器的 socket 创建后、bind 前,添加 端口复用选项

int opt = 1;
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt));
// 防止偶发性的服务器无法进行立即重启

进程组与守护进程

在操作系统中我们有前台进程和后台进程;

  • 通过jobs指令可以查看后台进程;
  • fg + 任务号:将后台进程拿到前台;

但前台进程被暂停后,如果向前台进程发送19号信息,即SIGSTOP时,前台进程会被自动移动到后
台进程,此时bash命令行解释器会被移动到前台。

  • bg + 任务号,将后台暂停的进程继续执行。

在设计服务器的时候,我们希望服务器是后台进程,并且不受到用户的登录和退出的影响
下面解释如何做到:

进程组和会话

  • 在操作系统中有一个进程组的概念,进程组是一个或多个进程的集合,进程组中有一个组长:PID==PGID就是组长;
  • 组长负责创建一个进程组或者在进程组中创建进程;该组长进程执行完毕,并不会影响组内其他进程的执行;

一个进程组中的进程协作来完成任务,最常见的就是通过管道执行命令,管道中的所有命令都属于一个进程组

可以通过ps aj来查看进程的相关ID信息:

![[Pasted image 20250827194606.png]]

  • 在操作系统中又定义了session会话的概念,session指的是一个或多个进程组。
  • 通常默认一个会话与一个终端进行关联,在操作系统中会有一个初始会话,该会话与终端直接建立联系,控制终端可以向初始会话中的进程发送信号,同时当控制终端退出的时候,内部的所有进程,进程组都会被退出,这就会导致我们的服务器也会退出。

但是好在,当我们创建一个新会话的时候,新会话默认没有控制终端,这也就保证了新会话不受终端的登录和退出的控制。

因此只要让服务端自成一个新会话,就可以保证服务端持续运行。该进程不再与键盘关联,不受到登录和注销的影响,这种进程就被称为守护进程。下面看看守护进程如何实现。

守护进程

  • 一个进程组的组长不能自成会话,也就不能当守护进程。

因此在自成会话的时候,需要时子进程,让父进程直接退出,子进程作为孤儿进程自成会话。
我们通过pid_t setsid(void)来让一个进程自成会话。

  • 一般我们会选择将守护进程的一些信号进行忽略,防止收到信号影响;
  • 并且一般会更改目录,以及输入输出,将输入输出定向到/dev/null中。

现在让我们来实现守护进程:

const std::string defaultdir = "/";
const std::string nullfile = "/dev/null";

void Deamon(bool ischdir , bool isclose)
{
    // 1.忽略信号
    signal(SIGPIPE , SIG_IGN);
    signal(SIGPIPE , SIG_IGN);
    signal(SIGSTOP , SIG_IGN);

    // 2. 自成会话
    if(fork() > 0 ) exit(0);   // 父进程直接退出
    setsid();

    if(ischdir)
        chdir(defaultdir.c_str());

    if(isclose)    // 是否关闭文件
    {
        close(0);
        close(1);
        close(2);
    }
    else
    {
        int fd = open(nullfile.c_str() , O_RDWR);
        dup2(fd , 0);
        dup2(fd , 1);
        dup2(fd , 2);
    }
}

以上就是自己实现的守护进程接口。

实际上操作系统也提供了接口,让一个进程自成会话int daemon(int nochdir , int noclose),在这里就不再介绍了。