一、进程池
实现进程池,需要了解一些管道知识,具体请看从ELF到进程间通信:剖析Linux程序的加载与交互机制这篇文章。
在写进程池之前,先简单了解一下进程池。
将来,我们通过父进程创建出一批子进程,父进程要与子进程进行通信,就要建立信道。如果父进程不给子进程写入数据,子进程在读取数据时,会怎么样呢?
子进程会被阻塞
,父进程向子进程写入数据时,子进程读取数据之后,就可以继续做接下来的工作。
所以,父进程可以通过向管道写入数据,来控制子进程的启停。
.
processpool.hpp
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__
#include <stdio.h>
#include <iostream>
#include <string>
#include <vector>
#include <functional>
#include <stdlib.h>
#include <time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include"Task.hpp"
const int gdefault_process_num = 5;
using callback_t = std::function<void(int fd)>;
class Channel
{
public:
Channel()
{
}
Channel(int wfd, std::string name, pid_t id) : _wfd(wfd), _name(name), _sub_target(id)
{
}
void DebugPrint()
{
printf("channel name:%s, wfd:%d, target pid:%d\n", _name.c_str(), _wfd, _sub_target);
}
int Fd() { return _wfd; }
std::string Name() { return _name; }
pid_t Target() { return _sub_target; }
void Close() { close(_wfd); }
void wait()
{
int n = waitpid(_sub_target, nullptr, 0);
}
~Channel()
{
}
private:
int _wfd; // 写描述符
std::string _name; // 管道名
pid_t _sub_target; // 目标子进程
};
class ProcessPool
{
public:
ProcessPool(int num = gdefault_process_num) : _processnum(num)
{
srand((unsigned int)time(NULL));
}
bool InitProcessPool(callback_t cb)
{
Init it;
// error
// int pipefd[2] = {0};
// int n = pipe(pipefd);
// if (n < 0)
// return false;
// 创建子进程
for (int i = 0; i < gdefault_process_num; ++i)
{
// 管道放在循环外面的话,会导致多个子进程共用一个文件描述符,从而导致读失败了,因为子进程访问了其它进程的fd
// 创建信道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false;
pid_t id = fork();
if (id < 0)
return false;
if (id == 0)
{
// 子进程
// 关闭不需要的读写端
close(pipefd[1]);
//子进程关闭掉所有的写文件描述符
for(auto& c: _channels)
{
c.Close();
}
// printf("pipefd[0] = %d, pipefd[1] = %d\n", pipefd[0], pipefd[1]);
// 做自己的事情
cb(pipefd[0]);
exit(0);
}
// 父进程
close(pipefd[0]);
std::string name = "channel-" + std::to_string(i);
_channels.emplace_back(pipefd[1], name, id);
// sleep(1);
}
return true;
}
// sleep(3);
// std::cout << "父进程开始控制" << std::endl;
void CtrlSubProcessHelper(int& index)
{
int who = index;
index++;
index %= _channels.size();
int x = rand() % tasks.size();
std::cout << "选择信道: " << _channels[who].Name() << ", subtarget : " << _channels[who].Target() << std::endl;
write(_channels[who].Fd(), &x, sizeof(x));
sleep(2);
}
// 父进程控制子进程
// 采用轮询方案唤醒子进程-------负载均衡
void PollingCtrlSubProcess()
{
int index = 0;
while (true)
{
CtrlSubProcessHelper(index);
}
}
void PollingCtrlSubProcess(int count)
{
int index = 0;
while (count)
{
CtrlSubProcessHelper(index);
count--;
}
}
void WaitAllSubProcess()
{
// 子进程退出,关闭父进程的写描述符
for (auto &c : _channels)
{
c.Close();
}
for (auto &c : _channels)
{
c.wait();
}
}
~ProcessPool()
{
}
private:
std::vector<Channel> _channels; // 所有信道
int _processnum; // 多少个子进程
};
#endif
.
Task.hpp
#pragma once
#include<iostream>
#include<functional>
#include<vector>
using task_t = std::function<void()>;
void DownLoad()
{
std::cout << "我是一个 DownLoad 任务" << std::endl;
}
void MySql()
{
std::cout << "我是一个 MySql 任务" << std::endl;
}
void Sync()
{
std::cout << "我是一个数据刷新同步的任务" << std::endl;
}
void Log()
{
std::cout << "我是一个日志打印的任务" << std::endl;
}
std::vector<task_t> tasks;
class Init
{
public:
Init()
{
tasks.push_back(DownLoad);
tasks.push_back(MySql);
tasks.push_back(Sync);
tasks.push_back(Log);
}
};
.
main.cc
#include "processpool.hpp"
#include"Task.hpp"
int main()
{
ProcessPool pp;
pp.InitProcessPool([](int fd)
{
while(true)
{
std::cout << "子进程阻塞: " << getpid() << std::endl;
int code = 0;
//父进程写端未关闭,写端不写,子进程读取数据就会被阻塞
ssize_t n = read(fd, &code, sizeof(code));
if(n == sizeof(code))
{
std::cout << "子进程被唤醒" << getpid() << std::endl;
if(code >= 0 && code < tasks.size())
{
tasks[code]();
}
else
{
std::cerr << "父进程给我的任务码是不对的" << code << std::endl;
}
}
else if(n == 0)
{
std::cout << "子进程应该退出了" << std::endl;
break;
}
else
{
std::cout << "子进程读取失败" << std::endl;
break;
}
sleep(1);
} });
pp.PollingCtrlSubProcess(10);
// std::cout << "父进程退出了" << std::endl;
pp.WaitAllSubProcess();
return 0;
}
.
Makefile
process_pool:main.cc
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f process_pool
这就是进程池的实现啦。但是有一些细节问题需要解决。
void WaitAllSubProcess()
{
for(auto& c: _channels)
{
c.Close();
c.wait();
}
}
关一个写文件描述符,就让父进程回收子进程,是否可以呢?
可以看到,是有问题的。程序并没有退出,子进程也没有被回收,那么,这是为什么呢?
在解决这个问题之前,先来想想,为什么上面先关闭全部的写文件描述符,再回收子进程就可以呢?
这是因为,父进程关闭了全部的写文件描述符,此时父进程的写端关闭,写端不写,而子进程还在读取数据,那么就会读到文件末尾,返回 0,这时就会跳出循环,子进程就会相继退出,父进程就可以一个个回收子进程了
。
现在,再来解析刚才提出的问题,为什么程序没有退出,子进程没有被回收?
父进程创建管道,接着创建子进程,那么子进程就会继承父进程的文件描述符表
,第一个子进程拿到的是 3 和 4 ,接着子进程关闭 4 号文件描述符表,父进程关闭 3 号文件描述符表,第二个子进程拿到的是 3 和 5,子进程关闭 5 号文件描述符,父进程继续关闭 3 号文件描述符,以此类推。
是不是看着挺完美的,但是还有一点细节哦。我们说子进程会继承父进程的文件描述符表
,所以第二个子进程继承时也会继承父进程第一次打开的文件描述符,也就是 4 号文件描述符,此时这个 4 号文件描述符指向的依然是第一个管道,以此类推,第三个子进程就会有两个文件描述符分别指向第一个管道和第二个管道,所以第一个管道的引用计数就会随着子进程增多而逐步增多,最后一个子进程的管道才只有一个写文件描述符指向
。
所以,我们的第一种做法可以倒着关闭。
void WaitAllSubProcess()
{
for(int end = gdefault_process_num - 1; end >= 0; --end)
{
_channels[end].Close();
_channels[end].wait();
}
}
第二种做法就是需要每一个子进程先关闭自己所有的写文件描述符,然后就可以像刚才那样正向关闭
。
那么,要怎么关闭呢?不要忘了,我们的写文件描述符是保存在 Channel 里面的,子进程在继承的时候,也继承了类里面的对象。父进程先创建子进程,此时父进程还没有将描述符添加进去,所以子进程继承的是父进程之前的版本,比如,第一个子进程继承的对象 _channels 是空的,第二个子进程继承的 _channels里有第一个子进程的写文件描述符,所以,只需要在子进程的执行流里关闭掉所有的写文件描述符就可以了
。
InitProcessPool
函数
进程池到这里就结束啦。觉得不错的小伙伴给个一键三连吧。