【Linux取经路】进程通信之匿名管道

发布于:2024-05-15 ⋅ 阅读:(164) ⋅ 点赞:(0)

在这里插入图片描述

一、进程间通信介绍

1.1 进程间通信是什么?

是两个或多个进程实现数据层面的交互,因为进程之间是存在独立性的,所以进程通信的成本比较高。

1.2 进程间通信的目的

  • 数据传输:一个进程需要将它的数据发送给另外一个进程。

  • 资源共享:多个进程之间共享同样的资源。

  • 通知事件:一个进程需要向另一个或者一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时需要通知父进程)。

  • 进程控制:有些进程希望完全控制另一个进程的执行(如 Debug 进程),此时控制进程希望及时知道它的状态改变。

1.3 进程通信该如何实现

进程通信的本质是:必须让不同的进程看到同一份“资源”。这个“资源”就是一种特定形式的内存空间。为了不破坏进程的独立性,这个资源是由操作系统提供的。进程访问这个空间进行通信,本质就是访问操作系统。进程代表的就是用户,因为一个进程本质上是从一个 .c 源代码进化而成的,而源代码是程序员写的,操作系统是不相信用户的,这意味着程序员在代码中不能直接去访问操作系统提供的资源,必须通过系统调用去创建、使用、释放这个资源。在操作系统内部可能会存在多组进程之间都需要通信,因此资源可能会有多份,操作系统需要通过“先描述,再组织”的形式将多份资源管理起来,一般操作系统会有一个独立的通信模块(IPC通信模块),隶属于文件系统。 进程间通信有 system Vposix 两个标准,前者主要是针对本机内部通信,后者是针对网络通信。在这两个标准出来之前也就是操作系统还没有通信模块的时候,进程之间也可以通过基于文件级别的通信方式,管道进行通信。

  • 管道:匿名地址 pipe、命名管道。

  • System V IPC:System V 消息队列、System V 共享内存、System V 信号量。

  • POSIX IPC:消息队列、共享内存、信号量、互斥量、条件变量、读写锁。

二、管道

管道是 Unix 中最古老的进程将通信的形式,我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”

2.1 匿名管道

2.1.1 站在文件描述符角度深入理解管道

在这里插入图片描述
管道本质上是一种内存级文件,它不用往磁盘上进行刷新。上图是以创建子进程为基础,来演示管道的原理。首先父进程以读写方式分两次打开一个文件,分两次的原因是为了获得两个 struct file 对象,这样对一个文件就有两个读写指针,让读写操作使用各自独立的指针,这样读写之间就不会相互影响。读写指针记录了当前文件读取或写入的位置,一个 struct file 中只有一个读写指针,在向文件写入(或读取)的时候,读写指针会发生移动,然后再去读取(写入),此时读写指针已经不再最初的位置,无法将刚写入的内容读取上来,因此这里需要分两次以不同的方式打开同一个文件。接着创建子进程,子进程会继承父进程中打开的文件,也就是继承父进程的文件描述符表,此时父子进程就会共享同一个文件资源,子进程可以通过4号文件描述符向文件中进行写入,父进程就可以通过3号文件描述符从文件中进程读取,此时父子进程就实现了数据传输,也就是通信。一般为了避免误操作,根据需要只会将读写其中的一个文件描述符保留,另外一个关闭,上图中的虚线就表示,在开始通信之前,将不需要的文件描述符进行关闭。通过上面的描述可以发现,这种通信模式只能是单向的,所以我们就把它叫做管道。如果要实现双向通信,可以创建两个管道。

小Tips:父进程可能创建多个子进程,暂且把它们成为“兄弟进程”,兄弟进程之间也可以采用上述的方式进行管道通信。此外,子进程可能还会继续创建子进程,暂且把它叫做“孙子进程”,孙子进程和爷爷进程、父进程、叔叔进程之间都可以采用上述的方式进行管道通信。

结论:上面这种管道通信方式,只适用于具备血缘关系的进程之间进行通信

2.1.2 接口使用

可以用 pipe 系统调用来创建一个管道,下面是函数声明:

int pipe(int pipefd[2]);

参数:一个有两个元素的整形数组,输出型参数,将两个文件描述符数字返回给用户使用,其中 pipefd[0]
中存的是读对应的文件描述符,pipefd[1] 中存的是写对应的文件描述符下标。

返回值:管道创建成功,0被返回;创建失败,-1被返回,错误码被设置。

#include <unistd.h>
#include <iostream>

using namespace std;

#define N 2

int main()
{
    int pipefd[N] = {0};
    int ret = pipe(pipefd);
    if(ret == -1)
    {
        perror("pipe");
        return errno;
    }

    cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << endl;
    return 0;
}

在这里插入图片描述
上面代码执行的工作是创建管道,接下来需要创建子进程进行通信。

#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cstdlib>
#include <cstdio>
#include <iostream>
#include <string>
#include <string.h>

using namespace std;

#define N 2
#define NUM 1024

void Writer(int wfd)
{
    string str = "Hello, I am child";
    pid_t self_id = getpid();
    int num = 0;

    char buffer[NUM];
    while (true)
    {
        // 构建发送字符串
        buffer[0] = 0; // 字符串清空,只是为了提醒阅读代码的人,我把这个数组当做字符串了
        snprintf(buffer, sizeof(buffer), "%s-%d-%d", str.c_str(), self_id, num);

        write(wfd, buffer, strlen(buffer));
        sleep(1);
        num++;
    }
}

void Reader(int rfd)
{
    char buffer[NUM];

    while(true)
    {
        ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
        if(n > 0)
        {
            buffer[n] = 0;
        }

        cout << "father("<< getpid() << ")  get a message: " << buffer << endl;
    }
}

int main()
{
    int pipefd[N] = {0};
    int ret = pipe(pipefd);
    if (ret == -1)
    {
        perror("pipe");
        return errno;
    }

    // cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << endl;

    // child -> W, father -> R
    pid_t id = fork();
    if (id > 0)
    {
        // 父进程
        close(pipefd[1]);
        // IPC code
        Reader(pipefd[0]);

        pid_t rid = waitpid(id, nullptr, 0); // 不关心退出嘛码和状态,阻塞式等待
        if(rid < 0)
        {
            // 等待失败
            perror("waitpid");
            return errno;
        }
        close(pipefd[0]);
    }
    else if (id == 0)
    {
        // 子进程
        close(pipefd[0]);
        // IPC code
        Writer(pipefd[1]);
        close(pipefd[1]);
        exit(0);
    }
    else
    {
        perror("fork");
        return errno;
    }
    return 0;
}

在这里插入图片描述
代码中只让子进程进行了 sleep,父进程并没有进行 sleep,但是通过运行结果可以发现,父进程并没有一直去管道中读取并进行打印,而是在子进程向管道中写入后才去读取打印。并且,子进程一定是在父进程读取结束后才进行写入的,没有出现父进程只读取了一半,然后子进程就进行写入。这里可以得出一个结论:父子进程是会进行协同的,会有同步与互斥,以保证管道文件的数据安全。read 读到管道的结尾,write 就会从管道的开始处进行写入。

小Tips:整个通信过程,数据一共发生了两次拷贝,第一次是将数据从子进程的 buffer 中拷贝到管道中(管道文件的页缓冲区中),第二次拷贝是将文件页缓冲区中的内容拷贝到父进程的 buffer 中。

在这里插入图片描述
从上面的运行结果可以得出一个结论:管道是面向字节流的,即无论写端写入了多少次,对于读端来说,只要管道中有数据,有多少就读多少,前提是读端的缓冲区足够大。虽然写端每次都是按照一个字符串一个字符串写入进管道,但是对读端来说,管道里面就是一个个的字符,把这些字符按照某种格式还原成一个一个的字符串这是用户来做的事情,管道不管。

2.1.3 PIPE_BUFFER 和 Pipe capacity

PIPE_BUFFER 是内核管道缓冲区的容量,这个值可以通过 ulimit -a 来查看:

在这里插入图片描述
在这里插入图片描述
如果写入的大小小于 PIPE_BUFFER,也就是小于 512bytes * 8 = 4096bytes = 4kb,那么写入操作就是原子的,也就是要写入的数据应该被连续的写入到管道。

Pipe capacity,表示管道容量的大小,由 PIPE_BUFFER 和缓冲条数的数量来共同决定其大小,缓冲条目的数量与 Linux 内核的版本有关,我这里的数量是16。

在这里插入图片描述

2.1.4 管道中的四种情况

管道中情况总结:读写端正常,管道如果为空,读端就要堵塞;读写端正常,管道如果被写满,写端就要被阻塞;读端正常,写端关闭,读端就会读到0,表示读到了管道(文件)结尾,不会被阻塞;写端正常,读端关闭,操作系统会通过 13 号信号把正在写入的进程 kill 掉。

在这里插入图片描述

2.1.5 管道特征总结

  • 具有血缘关系的进程进行进程间通信。

  • 管道只能单向通信。

  • 父子进程是会协同的,进行同步与互斥,以保证管道文件中数据的安全。

  • 管道是面向字节流的。

  • 管道是基于文件的,而文件的生命周期是随进程的,进程如果退出了,管道也会被自动关闭掉。

2.2 匿名管道使用场景

2.2.1 命令行中的管道

命令行中的 | 底层就是通过 pipe 来创建管道的。它的实现原理是:bash 对输入的指令做分析,统计出指令中 | 的个数,创建出对应数量的管道,然后通过 fork 创建出一批子进程,然后进行重定向工作,将管道左边进程的输出重定向到管道文件中,将管道右边进程的输入重定向到管道文件中,然后通过程序替换去执行指令。程序替换不会影响预先设置好的重定向。

2.2.2 基于管道的简易进程池

进程池就是把一个一个的进程当做资源,提前准备好,需要的时候直接分配,无需再去调用 fork 创建子进程。系统调用是有成本的,当程序执行到系统调用的时候,首先使用类似 int 80H 的软中断指令,在通过系统调用进入内核态的时候,需要保存用户程序的上下文数据,再由用户栈切入内核栈,进入内核态。在内核态返回用户态的时候,需要恢复用户程序的上下文。实际上的操作会更复杂。因此,为了减少用户态和内核态之间的切换次数,以提高系统效率,池化技术应运而生,池化技术就是一次多申请一些系统资源,在用户(程序)需要的时候,直接从池子里面分配即可。

#include <unistd.h>
#include <vector>
#include <cstdio>
#include <cerrno>
#include <string>
#include <iostream>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"
#include <sys/types.h>
#include <sys/wait.h>

const int processnum = 5;

class pipeline
{
public:
    pipeline(pid_t rprocessid, int wfd, std::string processname)
        : _rprocessid(rprocessid),
          _wfd(wfd),
          _processname(processname)
    {
    }

public:
    pid_t _rprocessid;        // 读取端的进程 id
    int _wfd;                 // 写入端的文件描述符
    std::string _processname; // 子进程的名字
};

void slaver()
{
    while (true)
    {
        int cmdcode = 0;
        int n = read(0, &cmdcode, sizeof(int));
        if (n == sizeof(int))
        {
            // 执行 cmdcod 对应的任务
            std::cout << "child-" << getpid() << "-say@ get a cmdcode: " << cmdcode << std::endl;
            if(cmdcode >=0 && cmdcode < tasks.size())
            {
                tasks[cmdcode]();
            }
        }
        else if(n == 0) break;
    }
}

void InitProcessPool(std::vector<pipeline> *pipelines)
{
    for (int i = 0; i < processnum; i++)
    {
        // 创建管道
        int pipefd[2] = {0};
        int ret = pipe(pipefd);
        if (ret == -1)
        {
            perror("pipe");
            return;
        }

        // 创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // child
            close(pipefd[1]);
            // code
            dup2(pipefd[0], 0);
            slaver();
            exit(0);
        }
        else if (id > 0)
        {
            // father
            close(pipefd[0]);
            std::string processname = "process-" + std::to_string(i);
            pipelines->push_back(pipeline(id, pipefd[1], processname));
        }
        else
        {
            perror("fork");
            return;
        }
    }
}

void Debug(const std::vector<pipeline> pipelines)
{
    for (auto &c : pipelines)
    {
        std::cout << c._processname << "-" << c._rprocessid << "-" << c._wfd << std::endl;
    }
    sleep(1000);
}

void ctrlSlaverRandom(const std::vector<pipeline> &pipelines) // 随机选择子进程
{
    for(int i = 1; i <= 100; i++)
    {
        // 1. 选择任务
        int cmdcode = rand() % tasks.size();
        // 2. 选择进程
        int processpos = rand() % processnum;
        // 3. 发送任务
        std::cout << "father say: " << "cmdcode-"  << cmdcode << ", already sendto: " << pipelines[processpos]._rprocessid << " processname: " << pipelines[processpos]._processname << std::endl;
        write(pipelines[processpos]._wfd, &cmdcode, sizeof(cmdcode));
        sleep(1);
    }
}

void ctrlSlaverPoll(const std::vector<pipeline> &pipelines) // 子进程轮询
{
    int which = 0;
    int cnt = 5;
    while(cnt--)
    {
        // 1. 选择任务
        int cmdcode = rand() % tasks.size();
        // 2. 选择进程----轮询方式
        // 3. 发送任务
        std::cout << "father say: " << "cmdcode-"  << cmdcode << ", already sendto: " << pipelines[which]._rprocessid << " processname: " << pipelines[which]._processname << std::endl;

        write(pipelines[which]._wfd, &cmdcode, sizeof(cmdcode));

        which++;
        which %= pipelines.size();
        sleep(1);
    }
}


void QuitProcess(const std::vector<pipeline> &pipelines)
{
    for(const auto &c : pipelines) close(c._wfd); // 写端关闭,读端正常会读到0
    sleep(5);
    for(const auto &c : pipelines) waitpid(c._rprocessid, NULL, 0);
    sleep(5);
}

int main()
{
    srand((unsigned int)time(NULL));
    std::vector<pipeline> pipelines;
    LoadTask(&tasks);
    // 1、 初始化
    InitProcessPool(&pipelines);

    // test
    // Debug(pipelines);

    // 2、开始控制子进程----给子进程布置任务
    ctrlSlaverPoll(pipelines);

    // 3、 结束
    QuitProcess(pipelines);
    return 0;
}

在这里插入图片描述
上面代码中存在一个小问题,因为,父进程是需要向管道中进行写入的,所以,父进程对创建出管道的读端始终没有关闭,每次只把写端进行关闭,而新创建的子进程会继承父进程中的所有文件描述符,以父进程和子进程 A 之间的管道为例,这就导致,后创建的子进程继承了之前管道的写端描述符,这样其实是有问题的。问题一是后创建的子进程可以向之前的管道中进行写入。问题二是,在结束的时候如果处理不当程序会出现卡住的现象,上面说过,写端关闭,读端就会读到0,可以通过判断,让子进程结束终止任务,如果忽略了上图展示的 Bug,父进程在 close(4) 之后就立刻去调用 waitpid 等待子进程A,此时因为实际上,在其它的子进程中也有指向该管道的写端,而只是在父进程中调用 close(4),把父进程中的写端关闭了,所以子进程A并不会读到0,而是读写端都正常,管道为空,子进程A会阻塞等待。下面这样的代码就是错误的。

void QuitProcess(const std::vector<pipeline> &pipelines)
{
    for(const auto &c : pipelines) 
    {
        close(c._wfd); // 写端关闭,读端正常会读到0
        waitpid(c._rprocessid, NULL, 0);
    }
}

正确的写法有以下几种:

void QuitProcess(const std::vector<pipeline> &pipelines)
{
    for(const auto &c : pipelines) close(c._wfd); // 写端关闭,读端正常会读到0
    sleep(5);
    for(const auto &c : pipelines) waitpid(c._rprocessid, NULL, 0);
}

这种写法正确的原因是,最后一个子进程和父进程之间的管道,就只有父进程中的一个读端,通过 for 循环将父进程中所有的读端都关闭,虽然前面的子进程并不会退出,但是最后一个子进程一定会退出处于僵尸状态,最后一个进程退出,它里面的文件描述符就会全部关闭,这就回间接导致指向倒数第二个管道的所有读端也被关闭了,这样倒数第二个子进程就会退出,以此类推,最终所有的子进程都会退出处于僵尸状态。然后再去通过 for 循环去回收所有的子进程,此时就能回收成功。

void QuitProcess(const std::vector<pipeline> &pipelines)
{
    for(int i = pipelines.size()-1; i >= 0; i--)
    {
        close(pipelines[i]._wfd);
        waitpid(pipelines[i]._rprocessid, NULL, 0);
    }
}

上面这样写也是对的,倒着去关闭父进程中的读端,然后立即回收。除了上面这两种方法外,还可以在子进程最开始的时候,将继承下来的无用的文件描述符进行关闭,因此需要定一个 oldfd 数组,记录父进程每次创建出管道的写端。

#include <unistd.h>
#include <vector>
#include <cstdio>
#include <cerrno>
#include <string>
#include <iostream>
#include <cstdlib>
#include <ctime>
#include "Task.hpp"
#include <sys/types.h>
#include <sys/wait.h>

const int processnum = 5;

class pipeline
{
public:
    pipeline(pid_t rprocessid, int wfd, std::string processname)
        : _rprocessid(rprocessid),
          _wfd(wfd),
          _processname(processname)
    {
    }

public:
    pid_t _rprocessid;        // 读取端的进程 id
    int _wfd;                 // 写入端的文件描述符
    std::string _processname; // 子进程的名字
};

void slaver()
{
    while (true)
    {
        int cmdcode = 0;
        int n = read(0, &cmdcode, sizeof(int));
        if (n == sizeof(int))
        {
            // 执行 cmdcod 对应的任务
            std::cout << "child-" << getpid() << "-say@ get a cmdcode: " << cmdcode << std::endl;
            if(cmdcode >=0 && cmdcode < tasks.size())
            {
                tasks[cmdcode]();
            }
        }
        else if(n == 0) break;
    }
}

void InitProcessPool(std::vector<pipeline> *pipelines)
{
    std::vector<int> oldfd;
    for (int i = 0; i < processnum; i++)
    {
        // 创建管道
        int pipefd[2] = {0};
        int ret = pipe(pipefd);
        if (ret == -1)
        {
            perror("pipe");
            return;
        }

        // 创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            sleep(10);
            for(auto c : oldfd) close(c);
            // child
            close(pipefd[1]);
            // code
            dup2(pipefd[0], 0);
            slaver();
            exit(0);
        }
        else if (id > 0)
        {
            // father
            close(pipefd[0]);
            std::string processname = "process-" + std::to_string(i);
            pipelines->push_back(pipeline(id, pipefd[1], processname));
            oldfd.push_back(pipefd[1]);
        }
        else
        {
            perror("fork");
            return;
        }
    }
}

void Debug(const std::vector<pipeline> pipelines)
{
    for (auto &c : pipelines)
    {
        std::cout << c._processname << "-" << c._rprocessid << "-" << c._wfd << std::endl;
    }
    sleep(1000);
}

void ctrlSlaverRandom(const std::vector<pipeline> &pipelines) // 随机选择子进程
{
    for(int i = 1; i <= 100; i++)
    {
        // 1. 选择任务
        int cmdcode = rand() % tasks.size();
        // 2. 选择进程
        int processpos = rand() % processnum;
        // 3. 发送任务
        std::cout << "father say: " << "cmdcode-"  << cmdcode << ", already sendto: " << pipelines[processpos]._rprocessid << " processname: " << pipelines[processpos]._processname << std::endl;
        write(pipelines[processpos]._wfd, &cmdcode, sizeof(cmdcode));
        sleep(1);
    }
}

void ctrlSlaverPoll(const std::vector<pipeline> &pipelines) // 子进程轮询
{
    int which = 0;
    int cnt = 10;
    while(cnt--)
    {
        // 1. 选择任务
        int cmdcode = rand() % tasks.size();
        // 2. 选择进程----轮询方式
        // 3. 发送任务
        std::cout << "father say: " << "cmdcode-"  << cmdcode << ", already sendto: " << pipelines[which]._rprocessid << " processname: " << pipelines[which]._processname << std::endl;

        write(pipelines[which]._wfd, &cmdcode, sizeof(cmdcode));

        which++;
        which %= pipelines.size();
        sleep(1);
    }
}


// void QuitProcess(const std::vector<pipeline> &pipelines)
// {
//     for(const auto &c : pipelines) close(c._wfd); // 写端关闭,读端正常会读到0
//     sleep(5);
//     for(const auto &c : pipelines) waitpid(c._rprocessid, NULL, 0);
// }

// void QuitProcess(const std::vector<pipeline> &pipelines)
// {
//     for(const auto &c : pipelines) 
//     {
//         close(c._wfd); // 写端关闭,读端正常会读到0
//         waitpid(c._rprocessid, NULL, 0);
//     }
    
//     // sleep(5);
// }

// void QuitProcess(const std::vector<pipeline> &pipelines)
// {
//     for(int i = pipelines.size()-1; i >= 0; i--)
//     {
//         close(pipelines[i]._wfd);
//         sleep(2);
//         waitpid(pipelines[i]._rprocessid, NULL, 0);
//     }
// }

void QuitProcess(const std::vector<pipeline> &pipelines)
{
    for(const auto &c : pipelines) 
    {
        close(c._wfd); // 写端关闭,读端正常会读到0
        waitpid(c._rprocessid, NULL, 0);
    }
}

int main()
{
    srand((unsigned int)time(NULL));
    std::vector<pipeline> pipelines;
    LoadTask(&tasks);
    // 1、 初始化
    InitProcessPool(&pipelines);

    // test
    // Debug(pipelines);

    // 2、开始控制子进程----给子进程布置任务
    ctrlSlaverPoll(pipelines);

    // 3、 结束
    QuitProcess(pipelines);
    return 0;
}

三、结语

今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到