1. 进程间通信
(1) 概念
进程间通信(IPC) 就是不同进程间交换数据的方法,进程间是独立的所以不能访问彼此的内存,需要某种机制来通信(管道、消息队列,共享内存等)
(2) 目的
数据传输:一个进程需要他的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源
通知事件:一个进程需要向另一个或一组进程发送消息,通知它们发生了某种事件(例如子进程终止通知父进程)
进程控制:有些进程希望完全控制另一个进程的执行,此时控制进程希望能够拦截另一个进程的所有陷入和异常,并及时知道它的状态改变
(3) 本质
让不同的进程先看到同一份资源(内存) 才有通信的条件。 同一份资源由操作系统OS来提供系统调用,OS的接口->设计统一的通信接口。
2. 匿名管道
管道是Unix中进程通信的形式,把一个进程链接到另一个进程的数据流称为一个管道.
匿名管道是单向通信:一端写另一端读。用于做父子通信,通过对文件的读写来实现通信,通过文件描述符表来实现fd。
使用原理是创建一个子进程,让父子进程都指向一个文件,最后我们就可以让父进程写入或读取,子进程读取或写入数据了。
创建匿名管道
#include <unistd.h>
int fd[2];
pipe(fd);
pipe系统调用,创建匿名管道,fd数组存储两个文件描述符
所需头文件<unistd.h>
fd[0]:读端(从管道文件读取数据)
fd[1]:写端(向管道文件写入数据)
管道的五种特性
- 只能用来进行具有血缘关系的进程间通信(常用于父与子)
- 管道文件自带同步机制 快的等待慢的。同一时间只允许一个进程进行写入或读取操作,并且保证同一个公共资源同一时刻只能被一个进程使用,两个进程不能同时操作(互斥)
- 管道是面向字节流的 其大小最大为64kb,65536字节
- 管道是单向通信的(特殊的半双工通信)
- 管道文件的生命周期是随进程的(所有拥有其读写端fd的都close关闭)进程终止时,所有未关闭的fd都会被内核自动关闭。
四种通信情况
- 写慢,读快---读端就要阻塞(等待写端写入)。
- 写快,读慢---满了的时候,写就要阻塞等待
- 写关闭,读继续---read就会读到返回值为0,表示文件结尾
- 读关闭,写继续---写端再写入也没有任何意义了
操作系统OS不做无意义的事情,OS会(发送异常信号)杀掉写端进程
五种特性四种通信情况也适用于命名管道
3. 进程池实现
三个类来描述再组织
channel
描述这个管道
class Channel
{
public:
Channel(int wfd, pid_t chpid)
: _wfd(wfd),
_chpid(chpid)
{
// to_string 将数字转化为string形式
_name = "channel-" + std::to_string(_wfd) + "+" + std::to_string(_chpid);
}
int Fd() { return _wfd; } // 返回_wfd
pid_t Chpid() { return _chpid; } //..
std::string Name() { return _name; } // 返回名字
void Send(int code)
{
int n= write(_wfd,&code,sizeof code);
(void)n;//调用一下n,防止检查
}
void Close()
{
close(_wfd);
}
void Wait()
{
pid_t rid=waitpid(_chpid,nullptr,0);
(void)rid;//操作一下这个变量
}
~Channel() {}
private:
int _wfd; // 文件描述符
pid_t _chpid; // 对应的子进程是谁
std::string _name; // 命名信道方便区分
};
Senf写入任务码
Close关闭写端fd
Wait等待子进程
ChannelManage
用来管理创建的匿名管道
// 管理Channel管道
class ChannelManage
{
public:
ChannelManage()
{
}
void InsterChannel(int wfd, pid_t chpid) // 写的fd 与子进程pid
{
// Channel c(wfd,chpid);
// _channel.push_back(std::move(c));
_channel.emplace_back(wfd, chpid); // 免去拷贝
}
void PrintfChannel()
{
for (auto &channel : _channel)
{
std::cout << channel.Name() << std::endl;
}
}
Channel& Select()
{
auto& c=_channel[_next];
_next++;
if(_next==_channel.size())
_next=0;
return c;
}
// void StopSubprocess()//暂停子进程
// {
// for(auto& channel:_channel)
// {
// channel.Close();//关闭接口
// std::cout<<"关闭:"<<channel.Name()<<std::endl;
// }
// }
// void WaitSubprocess()//等待子进程结束
// {
// for(auto& channel:_channel)
// {
// channel.Wait();//等待进程
// std::cout<<"回收:"<<channel.Name()<<std::endl;
// }
// }
void CloseAll()
{
for(auto& channel:_channel)
{
channel.Close();//关闭父进程历史打开的写端
// std::cout<<"关闭:"<<channel.Name()<<std::endl;
}
}
void CloseAndWait()
{
//越早开辟的进程,写端引用计数的越多,关闭不了
for(auto& channel:_channel)
{
channel.Close();//关闭接口
std::cout<<"关闭:"<<channel.Name()<<std::endl;
channel.Wait();//等待进程
std::cout<<"回收:"<<channel.Name()<<std::endl;
}
//解决方法1:
//倒着关闭所有进程
// for(int i= _channel.size()-1;i>=0;i--)
// {
// _channel[i].Close();//关闭接口
// std::cout<<"关闭:"<<_channel[i].Name()<<std::endl;
// _channel[i].Wait();//等待进程
// std::cout<<"回收:"<<_channel[i].Name()<<std::endl;
// }
//解决方案2:
//让父进程一个指向所有管道写端w
}
~ChannelManage()
{
}
private:
std::vector<Channel> _channel; // 存储管道
int _next=0;
};
InsterChannel :插入新的匿名管道 写端fd 与 子进程的pid
PrintfChannel :输出所有匿名管道的名字
Select :按顺序返回管道
CloseAll :关闭父进程历史打开的写端,让写端基本不会重复打开
CloseAndWait :关闭写端,CloseAll用来解决写端引用计数过多无法关闭的情况。
Select的作用
干活不能只让一个人干,执行任务不能总让一个子进程去执行所以有三种方法
1. 轮询 2. 随机 3. Channel增添一个负载指标,通过这个指标来选择谁干活
这里Select采用了轮询的方式
TaskManager 与 ProcessPool
typedef void (*task_t)();//函数指针
void PrintfLog()
{
std::cout<<"我是打印日志的任务"<<std::endl;
}
void Download()
{
std::cout<<"我是下载文件的任务"<<std::endl;
}
void Upload()
{
std::cout<<"我是上传文件的任务"<<std::endl;
}
class TaskManager
{
public:
TaskManager()
{
srand(time(nullptr));
}
void Register(task_t t)//注册
{
_tasks.push_back(t);
}
int Code()
{
return rand()%_tasks.size();
}
void Execute(int code)
{
if(code>=0&&code< _tasks.size())//在数组范围内
{
_tasks[code]();//根据传递来的任务码来执行任务
}
}
~TaskManager()
{
}
private:
std::vector<task_t> _tasks;
};
存储各种任务
class ProcessPool
{
public:
ProcessPool(int num=5)
: _process_num(num)
{
_tm.Register(PrintfLog);//将任务都存入_tm管理
_tm.Register(Download);
_tm.Register(Upload);
}
void Work(int rfd) // 读端文件描述符
{
while (true)
{
int code=0;
//最大读入长度为4,所以
ssize_t n=read(rfd,&code,sizeof code);
if(n > 0)//读成功
{
if(n!=sizeof code)//读取数据长度要为我们规定的长度
{
continue;//不然重新读
}
std::cout<<"子进程"<<getpid()<<"收到任务码,任务码为:"<<code<<std::endl;
//要执行任务了
_tm.Execute(code);//执行code这个任务
}
else if(n==0)//读到头了
{
std::cout<<"子进程退出"<<std::endl;
break;
}
else//n<0 读取失败了
{
std::cout<<"读取数据失败"<<std::endl;
break;
// exit(1);
}
// std::cout << "我是子进程,我的rfd是" << rfd << std::endl;
sleep(1);
}
}
bool Create() // 初始化管道
{
for (int i = 0; i < _process_num; i++)
{
// 创建匿名管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false; // 失败了
// 创建子进程
pid_t nid = fork();
if (nid == 0) // 子进程负责读
{
//让子进程关闭自己继承下来的哥哥进程的写端关闭就可以了
//只会影响引用计数不关闭父进程写端
_cm.CloseAll();//写实拷贝所以不会影响父进程。子进程每次关闭都只关子进程的
close(pipefd[1]); // 将写端关闭
Work(pipefd[0]); // 读端工作
close(pipefd[0]);
exit(0); // 工作完就退出即可
}
else if (nid < 0) // 创建失败
{
std::cout << "子进程创建失败" << std::endl;
return false;
}
else // 父进程
{
// char buf[1024];
close(pipefd[0]); // 读端关闭
_cm.InsterChannel(pipefd[1], nid); // 将这个子进程加入数组保存
// close(pipefd[1]); // 写端关闭要在最后才关闭
}
}
return true;
}
void Debug()
{
_cm.PrintfChannel();
}
void PushTask()//写任务
{
//不能选择一个子进程一直用啊
//下面三种实现负载均衡方案
//1. 轮询 2. 随机 3. Channel添加一个负载指标,通过是否达成指标来
//1. 选一个任务
int taskcode=_tm.Code();//随机选一个任务
//2. 选择一个信道 子进程
auto& c=_cm.Select();
std::cout<<"选择了一个子进程"<<c.Name()<<std::endl;
//3. 发送任务
c.Send(taskcode);//传任务码
std::cout<<"发送了一个任务码:"<<taskcode<<std::endl;
}
void Stop()
{
// //关闭父进程所有的wfd
// _cm.StopSubprocess();
// //回收所有子进程
// _cm.WaitSubprocess();
_cm.CloseAndWait();
}
~ProcessPool()
{
}
private:
ChannelManage _cm; // 管理所有的管道
int _process_num; // 进程数量
TaskManager _tm;//管理所有的任务
};
三个参数
ChannelManage _cm; // 管理所有的管道
int _process_num; // 记录进程数量
TaskManager _tm;//管理所有的任务
Create:pipe创建匿名管道,并将父子进程通过if else分开执行任务
Work :参数为读端文件描述符,所执行任务码通过read 来获得
PushTask :选择一个任务来执行通过轮询机制
Stop :关闭所有写端
4. 源码
processpol.hpp
hpp文件是一种在C++中的特殊头文件,通常用于将类的声明和实现放在同一个文件中。
.h文件和.cpp文件将类的声明和实现分离的方式不同,hpp文件将这两者结合在一起。
#pragma once
#include <cstdio>
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <vector>
#include <string>
#include<sys/wait.h>
#include<sys/types.h>
#include "Task.hpp"
class Channel
{
public:
Channel(int wfd, pid_t chpid)
: _wfd(wfd),
_chpid(chpid)
{
// to_string 将数字转化为string形式
_name = "channel-" + std::to_string(_wfd) + "+" + std::to_string(_chpid);
}
int Fd() { return _wfd; } // 返回_wfd
pid_t Chpid() { return _chpid; } //..
std::string Name() { return _name; } // 返回名字
void Send(int code)
{
int n= write(_wfd,&code,sizeof code);
(void)n;//调用一下n,防止检查
}
void Close()
{
close(_wfd);
}
void Wait()
{
pid_t rid=waitpid(_chpid,nullptr,0);
(void)rid;//操作一下这个变量
}
~Channel() {}
private:
int _wfd; // 文件描述符
pid_t _chpid; // 对应的子进程是谁
std::string _name; // 命名信道方便区分
};
// 管理Channel管道
class ChannelManage
{
public:
ChannelManage()
{
}
void InsterChannel(int wfd, pid_t chpid) // 写的fd 与子进程pid
{
// Channel c(wfd,chpid);
// _channel.push_back(std::move(c));
_channel.emplace_back(wfd, chpid); // 免去拷贝
}
void PrintfChannel()
{
for (auto &channel : _channel)
{
std::cout << channel.Name() << std::endl;
}
}
Channel& Select()
{
auto& c=_channel[_next];
_next++;
if(_next==_channel.size())
_next=0;
return c;
}
// void StopSubprocess()//暂停子进程
// {
// for(auto& channel:_channel)
// {
// channel.Close();//关闭接口
// std::cout<<"关闭:"<<channel.Name()<<std::endl;
// }
// }
// void WaitSubprocess()//等待子进程结束
// {
// for(auto& channel:_channel)
// {
// channel.Wait();//等待进程
// std::cout<<"回收:"<<channel.Name()<<std::endl;
// }
// }
void CloseAll()
{
for(auto& channel:_channel)
{
channel.Close();//关闭父进程历史打开的写端
// std::cout<<"关闭:"<<channel.Name()<<std::endl;
}
}
void CloseAndWait()
{
//越早开辟的进程,写端引用计数的越多,关闭不了
for(auto& channel:_channel)
{
channel.Close();//关闭接口
std::cout<<"关闭:"<<channel.Name()<<std::endl;
channel.Wait();//等待进程
std::cout<<"回收:"<<channel.Name()<<std::endl;
}
//解决方法1:
//倒着关闭所有进程
// for(int i= _channel.size()-1;i>=0;i--)
// {
// _channel[i].Close();//关闭接口
// std::cout<<"关闭:"<<_channel[i].Name()<<std::endl;
// _channel[i].Wait();//等待进程
// std::cout<<"回收:"<<_channel[i].Name()<<std::endl;
// }
//解决方案2:
//让父进程一个指向所有管道写端w
}
~ChannelManage()
{
}
private:
std::vector<Channel> _channel; // 存储管道
int _next=0;
};
class ProcessPool
{
public:
ProcessPool(int num=5)
: _process_num(num)
{
_tm.Register(PrintfLog);//将任务都存入_tm管理
_tm.Register(Download);
_tm.Register(Upload);
}
void Work(int rfd) // 读端文件描述符
{
while (true)
{
int code=0;
//最大读入长度为4,所以
ssize_t n=read(rfd,&code,sizeof code);
if(n > 0)//读成功
{
if(n!=sizeof code)//读取数据长度要为我们规定的长度
{
continue;//不然重新读
}
std::cout<<"子进程"<<getpid()<<"收到任务码,任务码为:"<<code<<std::endl;
//要执行任务了
_tm.Execute(code);//执行code这个任务
}
else if(n==0)//读到头了
{
std::cout<<"子进程退出"<<std::endl;
break;
}
else//n<0 读取失败了
{
std::cout<<"读取数据失败"<<std::endl;
break;
// exit(1);
}
// std::cout << "我是子进程,我的rfd是" << rfd << std::endl;
sleep(1);
}
}
bool Create() // 初始化管道
{
for (int i = 0; i < _process_num; i++)
{
// 创建匿名管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
return false; // 失败了
// 创建子进程
pid_t nid = fork();
if (nid == 0) // 子进程负责读
{
//让子进程关闭自己继承下来的哥哥进程的写端关闭就可以了
//只会影响引用计数不关闭父进程写端
_cm.CloseAll();//写实拷贝所以不会影响父进程。子进程每次关闭都只关子进程的
close(pipefd[1]); // 将写端关闭
Work(pipefd[0]); // 读端工作
close(pipefd[0]);
exit(0); // 工作完就退出即可
}
else if (nid < 0) // 创建失败
{
std::cout << "子进程创建失败" << std::endl;
return false;
}
else // 父进程
{
// char buf[1024];
close(pipefd[0]); // 读端关闭
_cm.InsterChannel(pipefd[1], nid); // 将这个子进程加入数组保存
// close(pipefd[1]); // 写端关闭要在最后才关闭
}
}
return true;
}
void Debug()
{
_cm.PrintfChannel();
}
void PushTask()//写任务
{
//不能选择一个子进程一直用啊
//下面三种实现负载均衡方案
//1. 轮询 2. 随机 3. Channel添加一个负载指标,通过是否达成指标来
//1. 选一个任务
int taskcode=_tm.Code();//随机选一个任务
//2. 选择一个信道 子进程
auto& c=_cm.Select();
std::cout<<"选择了一个子进程"<<c.Name()<<std::endl;
//3. 发送任务
c.Send(taskcode);//传任务码
std::cout<<"发送了一个任务码:"<<taskcode<<std::endl;
}
void Stop()
{
// //关闭父进程所有的wfd
// _cm.StopSubprocess();
// //回收所有子进程
// _cm.WaitSubprocess();
_cm.CloseAndWait();
}
~ProcessPool()
{
}
private:
ChannelManage _cm; // 管理所有的管道
int _process_num; // 进程数量
TaskManager _tm;//管理所有的任务
};
Task.hpp
#pragma once
#include <cstdio>
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <vector>
#include <string>
#include <ctime>
typedef void (*task_t)();//函数指针
void PrintfLog()
{
std::cout<<"我是打印日志的任务"<<std::endl;
}
void Download()
{
std::cout<<"我是下载文件的任务"<<std::endl;
}
void Upload()
{
std::cout<<"我是上传文件的任务"<<std::endl;
}
class TaskManager
{
public:
TaskManager()
{
srand(time(nullptr));
}
void Register(task_t t)//注册
{
_tasks.push_back(t);
}
int Code()
{
return rand()%_tasks.size();
}
void Execute(int code)
{
if(code>=0&&code< _tasks.size())//在数组范围内
{
_tasks[code]();//根据传递来的任务码来执行任务
}
}
~TaskManager()
{
}
private:
std::vector<task_t> _tasks;
};
main.cpp
#include"processpoll.hpp"
int main()
{
ProcessPool pp(5);
pp.Create();
// pp.Debug();
int cnt=10;
while(cnt--)
{
//1. 选择一个信道
pp.PushTask();
sleep(1);
}
pp.Stop();
// sleep(1000);
return 0;
}
这篇就到这里啦(๑′ᴗ‵๑)I Lᵒᵛᵉᵧₒᵤ❤