📝前言:
这篇文章我们用刚学的进程间通信知识,进行一个简单进程池的编写,内容包括:
- 基本轮廓
- 关键模块实现
- 运行展示
🎬个人简介:努力学习ing
📋个人专栏:Linux
🎀CSDN主页 愚润求学
🌄其他专栏:C++学习笔记,C语言入门基础,python入门基础,C++刷题专栏
一,基本轮廓
父进程负责将任务(数据)发送给子进程执行,所以父进程把任务写入管道,充当写端;子进程从管道中读取任务并处理,充当读端。
主要实现一下几个文件:
ProcessPool.hpp
:简单进程池主题模块Channel.hpp
:信道模块Task.hpp
:任务分配模块Main.cpp
:测试模块
二,关键模块实现
Cannel
Cannel
模块主要实现对信道的描述,需要实现:
- 初始化信道,关闭信道
- 获得管道的基本信息:信道在父进程文件描述符中的
fd
,以及信道对应的子进程的PID - 把任务码写入管道文件
#pragma once
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include <cstdio>
using namespace std;
class Channel
{
public:
Channel(int wfd, pid_t subid) : _wfd(wfd), _Subid(subid)
{
_name = "Channel-" + to_string(wfd) + "-" + to_string(subid);
}
int Wfd()
{
return _wfd;
}
pid_t SubId()
{
return _Subid;
}
string Name()
{
return _name;
}
void cend(int task_code) // 把任务写入管道
{
write(_wfd, &task_code, sizeof(task_code)); // 直接二进制写入,后续也要二进制读出,面相字节流
}
void Close()
{
close(_wfd);
}
private:
int _wfd; // 对应的父进程的文件描述符(也是信道对应的管道文件描述符)
pid_t _Subid; // 对应的子进程的PID
string _name; // 管道名字
};
ProcessPool.hpp
ProcessPool.hpp
是进程池模块的主体。负责管理所有信道,并且所有的外层使用接口应该在这一层被封装好,则应该满足:
- 启动进程池
- 分配任务:1. 负载均衡的选择接受的信道;2. 把对应的任务码传给信道
- 关闭进程池
- 打印当前进程池信息
#pragma once
#include "Channel.hpp"
#include "Task.hpp"
#include <functional>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
using work_t = function<void()>; // 包装 无参数无返回的函数的调用,使调用接口统一(至少名字上统一)
class ProcessPool
{
public:
ProcessPool(int n) : _n(n)
{
}
bool StartPool() // 启动进程池
{
for (int i = 0; i < _n; i++)
{
// 建立管道文件
int pipefd[2] = {0};
int ret1 = pipe(pipefd);
if (ret1 < 0)
return false;
// 创建进程
int subid = fork();
if (subid < 0)
return false;
// 子进程和父进程各自处理,形成信道
if (subid == 0)
{
// 子进程
// close(pipefd[1]); // 有问题的:关闭写端:
// 正确写法:
for (auto &c : Channels) // 关闭历史所有写端
c.Close();
close(pipefd[1]); // 关闭本次管道文件的写端
// 子进程工作
work(pipefd[0]); // 传入管道的读端
exit(0);
}
// 父进程
close(pipefd[0]); // 关闭读端
Channels.emplace_back(pipefd[1], subid); // 添加一个信道
}
return true;
}
void AllocTask(int task_nums) // 分发任务
{
// 用轮询的方式发
int who = 0;
while (task_nums--)
{
int task_code = tm.SelectTask();
Channel &cur = Channels[who++];
who %= _n;
cur.cend(task_code);
std::cout << "send " << task_code << " to " << cur.Name() << ", 任务还剩: " << task_nums << std::endl;
sleep(1);
}
}
void CleanPool() // 关闭进程池
{
for (auto &c : Channels)
{
// 关闭管道对应的文件描述符
c.Close();
cout << "关闭: " << c.Name() << endl;
// 等待管道对应的进程退出
pid_t rid = waitpid(c.SubId(), nullptr, 0);
cout << "子进程: " << rid << " 等待成功" << endl;
}
}
void PrintPool() // 打印进程池信息
{
for (int i = 0; i < _n; i++)
{
cout << "进程 " << i << ": " << Channels[i].Name() << endl;
}
}
private:
vector<Channel> Channels; // 用 vector 组织管道
size_t _n; // 进程个数
};
其中要特别注意子进程关闭读端的时候:
不能只简单的close(pipefd[1])
,因为有子进程继承读端的问题,即:
- 如果父进程的文件描述符表中已经记录了其他信道的写端,则新建的子进程会把父进程已开的写端也拷贝下来
- 这会导致后续
CleanPool
的Close
的时候,虽然关闭了父进程指向的对应管道的写端 - 但是,依然有子进程指向那个管道文件,导致
fd
没有变0
,写端没有完全关闭,就会一直阻塞住,读不到0
,就不会退出进程。 - 解决方法:
-
- 倒着关(因为最后一个管道的读端没有子进程继承)
-
- 创建子进程时:在关闭写端时,把继承下来的兄弟进程的写端也全给关闭了
-
Task.hpp
Task.cpp
用来描述子进程收到的任务,负责:
- 模拟创建子进程的任务列表
- 提供任务码生成函数
- 解析获得的任务码
- 子进程工作函数实现
#pragma once
#include <iostream>
#include <functional>
#include <vector>
using namespace std;
using task = function<void()>;
// 设置 3 个任务。
// 每次发任务,随机从以下三个任务里面选择
class TaskManager
{
public:
TaskManager()
{
tasks.push_back([]()
{ cout << "sub process[" << getpid() << " ] 执⾏访问数据库的任务\n"
<< endl; });
tasks.push_back([]()
{ cout << "sub process[" << getpid() << " ] 执⾏ url 解析\n"
<< endl; });
tasks.push_back([]()
{ cout << "sub process[" << getpid() << " ] 执⾏加密任务\n"
<< endl; });
}
int SelectTask()
{
srand(time(nullptr));
return rand() % tasks.size();
}
void Excute(int task_code) // 执行任务
{
if (task_code >= tasks.size() || task_code < 0)
printf("任务码错误");
tasks[task_code](); // 调用对应的任务
}
private:
vector<task> tasks;
};
TaskManager tm;
void work(pid_t rfd) // 子进程执行任务码对应的任务(对于每一个子进程而言,对应的 task_code 在他对应的管道文件里面
{
while (true)
{
int task_code;
int n = read(rfd, &task_code, sizeof(task_code));
if (n == sizeof(int)) // 代表该进程的管道文件里面有任务
{
tm.Excute(task_code);
}
else if (n == 0)
{
std::cout << "pid: " << getpid() << " quit..." << std::endl;
break;
}
else
{
// 当管道文件没有数据(读取错误),且写端没有关的时候,阻塞住
}
}
}
Main.cpp
Main.cpp
为测试代码,调用进程池查看效果。
#include "ProcessPool.hpp"
int main()
{
ProcessPool *pp = new ProcessPool(3);
// 1. 初始化进程池
pp->StartPool();
// 2. 派发任务
pp->AllocTask(10);
// 3. 退出进程池
pp->CleanPool();
delete pp;
return 0;
}
三,运行展示
当然,我的代码可能还存在很多BUG!
部分运行效果:
运行时和结束时对应进程变化:
🌈我的分享也就到此结束啦🌈
要是我的分享也能对你的学习起到帮助,那简直是太酷啦!
若有不足,还请大家多多指正,我们一起学习交流!
📢公主,王子:点赞👍→收藏⭐→关注🔍
感谢大家的观看和支持!祝大家都能得偿所愿,天天开心!!!