【Linux】进程间通信(二):利用匿名管道进行简单进程池的编写

发布于:2025-05-20 ⋅ 阅读:(21) ⋅ 点赞:(0)

📝前言:

这篇文章我们用刚学的进程间通信知识,进行一个简单进程池的编写,内容包括:

  1. 基本轮廓
  2. 关键模块实现
  3. 运行展示

🎬个人简介:努力学习ing
📋个人专栏:Linux
🎀CSDN主页 愚润求学
🌄其他专栏:C++学习笔记C语言入门基础python入门基础C++刷题专栏


一,基本轮廓

在这里插入图片描述

父进程负责将任务(数据)发送给子进程执行,所以父进程把任务写入管道,充当写端;子进程从管道中读取任务并处理,充当读端。

主要实现一下几个文件:

  • ProcessPool.hpp:简单进程池主题模块
  • Channel.hpp:信道模块
  • Task.hpp:任务分配模块
  • Main.cpp:测试模块

二,关键模块实现

Cannel

Cannel模块主要实现对信道的描述,需要实现:

  • 初始化信道,关闭信道
  • 获得管道的基本信息:信道在父进程文件描述符中的fd,以及信道对应的子进程的PID
  • 把任务码写入管道文件
#pragma once

#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <cstdio>
using namespace std;
class Channel
{
public:
    Channel(int wfd, pid_t subid) : _wfd(wfd), _Subid(subid)
    {
        _name = "Channel-" + to_string(wfd) + "-" + to_string(subid);
    }

    int Wfd()
    {
        return _wfd;
    }

    pid_t SubId()
    {
        return _Subid;
    }

    string Name()
    {
        return _name;
    }

    void cend(int task_code) // 把任务写入管道
    {
        write(_wfd, &task_code, sizeof(task_code)); // 直接二进制写入,后续也要二进制读出,面相字节流
    }

    void Close()
    {
        close(_wfd);
    }

private:
    int _wfd;     // 对应的父进程的文件描述符(也是信道对应的管道文件描述符)
    pid_t _Subid; // 对应的子进程的PID
    string _name; // 管道名字
};

ProcessPool.hpp

ProcessPool.hpp是进程池模块的主体。负责管理所有信道,并且所有的外层使用接口应该在这一层被封装好,则应该满足:

  • 启动进程池
  • 分配任务:1. 负载均衡的选择接受的信道;2. 把对应的任务码传给信道
  • 关闭进程池
  • 打印当前进程池信息
#pragma once

#include "Channel.hpp"
#include "Task.hpp"
#include <functional>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
using work_t = function<void()>; // 包装 无参数无返回的函数的调用,使调用接口统一(至少名字上统一)

class ProcessPool
{
public:
    ProcessPool(int n) : _n(n)
    {
    }

    bool StartPool() // 启动进程池
    {
        for (int i = 0; i < _n; i++)
        {
            // 建立管道文件
            int pipefd[2] = {0};
            int ret1 = pipe(pipefd);
            if (ret1 < 0)
                return false;

            // 创建进程
            int subid = fork();
            if (subid < 0)
                return false;

            // 子进程和父进程各自处理,形成信道
            if (subid == 0)
            {
                // 子进程
                // close(pipefd[1]); // 有问题的:关闭写端:
                // 正确写法:
                for (auto &c : Channels) // 关闭历史所有写端
                    c.Close();
                
                close(pipefd[1]); // 关闭本次管道文件的写端
                // 子进程工作
                work(pipefd[0]); // 传入管道的读端
                exit(0);
            }

            // 父进程
            close(pipefd[0]);                           // 关闭读端
            Channels.emplace_back(pipefd[1], subid); // 添加一个信道
        }
        return true;
    }

    void AllocTask(int task_nums) // 分发任务
    {
        // 用轮询的方式发
        int who = 0;
        while (task_nums--)
        {
            int task_code = tm.SelectTask();
            Channel &cur = Channels[who++];
            who %= _n;
            cur.cend(task_code);
            std::cout << "send " << task_code << " to " << cur.Name() << ", 任务还剩: " << task_nums << std::endl;
            sleep(1);
        }
    }

    void CleanPool() // 关闭进程池
    {
        for (auto &c : Channels)
        {
            // 关闭管道对应的文件描述符
            c.Close();
            cout << "关闭: " << c.Name() << endl;

            // 等待管道对应的进程退出
            pid_t rid = waitpid(c.SubId(), nullptr, 0);
            cout << "子进程: " << rid << " 等待成功" << endl;
        }
    }

    void PrintPool() // 打印进程池信息
    {
        for (int i = 0; i < _n; i++)
        {
            cout << "进程 " << i << ": " << Channels[i].Name() << endl;
        }
    }

private:
    vector<Channel> Channels; // 用 vector 组织管道
    size_t _n;                // 进程个数
};

其中要特别注意子进程关闭读端的时候:
不能只简单的close(pipefd[1]),因为有子进程继承读端的问题,即:

  • 如果父进程的文件描述符表中已经记录了其他信道的写端,则新建的子进程会把父进程已开的写端也拷贝下来
  • 这会导致后续CleanPoolClose 的时候,虽然关闭了父进程指向的对应管道的写端
  • 但是,依然有子进程指向那个管道文件,导致 fd没有变 0 ,写端没有完全关闭,就会一直阻塞住,读不到 0,就不会退出进程。
  • 解决方法:
      1. 倒着关(因为最后一个管道的读端没有子进程继承)
      1. 创建子进程时:在关闭写端时,把继承下来的兄弟进程的写端也全给关闭了

Task.hpp

Task.cpp用来描述子进程收到的任务,负责:

  • 模拟创建子进程的任务列表
  • 提供任务码生成函数
  • 解析获得的任务码
  • 子进程工作函数实现
#pragma once

#include <iostream>
#include <functional>
#include <vector>
using namespace std;

using task = function<void()>;

// 设置 3 个任务。
// 每次发任务,随机从以下三个任务里面选择
class TaskManager
{
public:
    TaskManager()
    {
        tasks.push_back([]()
                        { cout << "sub process[" << getpid() << " ] 执⾏访问数据库的任务\n"
                               << endl; });

        tasks.push_back([]()
                        { cout << "sub process[" << getpid() << " ] 执⾏ url 解析\n"
                               << endl; });

        tasks.push_back([]()
                        { cout << "sub process[" << getpid() << " ] 执⾏加密任务\n"
                               << endl; });
    }

    int SelectTask()
    {
        srand(time(nullptr));
        return rand() % tasks.size();
    }

    void Excute(int task_code) // 执行任务
    {
        if (task_code >= tasks.size() || task_code < 0)
            printf("任务码错误");
        tasks[task_code](); // 调用对应的任务
    }

private:
    vector<task> tasks;
};

TaskManager tm;

void work(pid_t rfd) // 子进程执行任务码对应的任务(对于每一个子进程而言,对应的 task_code 在他对应的管道文件里面
{
    while (true)
    {
        int task_code;
        int n = read(rfd, &task_code, sizeof(task_code));
        if (n == sizeof(int)) // 代表该进程的管道文件里面有任务
        {
            tm.Excute(task_code);
        }
        else if (n == 0)
        {
            std::cout << "pid: " << getpid() << " quit..." << std::endl;
            break;
        }
        else
        {
            // 当管道文件没有数据(读取错误),且写端没有关的时候,阻塞住
        }
    }
}

Main.cpp

Main.cpp为测试代码,调用进程池查看效果。

#include "ProcessPool.hpp"

int main()
{
    ProcessPool *pp = new ProcessPool(3);
    // 1. 初始化进程池
    pp->StartPool();
    // 2. 派发任务
    pp->AllocTask(10);
    // 3. 退出进程池
    pp->CleanPool();
    delete pp;
    return 0;
}

三,运行展示

当然,我的代码可能还存在很多BUG!
部分运行效果:
在这里插入图片描述

运行时和结束时对应进程变化:
在这里插入图片描述


🌈我的分享也就到此结束啦🌈
要是我的分享也能对你的学习起到帮助,那简直是太酷啦!
若有不足,还请大家多多指正,我们一起学习交流!
📢公主,王子:点赞👍→收藏⭐→关注🔍
感谢大家的观看和支持!祝大家都能得偿所愿,天天开心!!!


网站公告

今日签到

点亮在社区的每一天
去签到