首页 > 系统相关 >进程间通信-进程池

进程间通信-进程池

时间:2024-09-07 16:21:50浏览次数:10  
标签:std channels int 间通信 进程 include channel

目录

理解​

完整代码

 完善代码

 回收子进程:​

 不回收子进程:

子进程使用重定向优化


理解


#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>

void work(int rfd)
{
}

// master
class Channel
{
private:
    int _wfd;
    pid_t _subprocessid;
    std::string _name; // 信道名字
public:
    Channel(int wfd, pid_t id, const std::string name) // 构造函数
        : _wfd(wfd), _subprocessid(id), _name(name)    // 构造函数初始化列表
    {
    }

    int getfd() { return _wfd; }
    int getid() { return _subprocessid; }
    std::string getname() { return _name; }

    ~Channel() // 析构函数
    {
    }
};

//  ./processpool 5
int main(int argc, char *argv[])
{
    std::vector<Channel> channels;
    if (argc != 2) // 说明不用创建子进程,不会用
    {
        std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
    for(int i=0;i<num;i++)
    {
        int pipefd[2];
        int n=pipe(pipefd);//创建管道
        if(n<0)exit(1);//创建管道失败,那么就无需与子进程通信了

        pid_t id=fork();//创建子进程
        if(id==0){//子进程不用创建子进程,只需父进程即可
            //child  --r
            close(pipefd[1]);
            work(pipefd[0]);//进行工作
            exit(0);
        }

        //farther  --w
        close(pipefd[0]);
        //a.此时父进程已经有了子进程的pid  b.父进程的w端 
        std::string channel_name="channel-"+std::to_string(i);//构建一个channel名称
        channels.push_back(Channel(pipefd[1],id,channel_name));
    }
    // test
    for (auto &channel : channels)
    {
        std::cout << "***************************************" << std::endl;
        std::cout << channel.getname() << std::endl; // 取出
        std::cout << channel.getid() << std::endl;   // 取出
        std::cout << channel.getfd() << std::endl;   // 取出
    }

    return 0;
}

完整代码

#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>
#include "task.hpp"

void work(int rfd)
{
    while (true)
    {
        int command = 0;
        int n = read(rfd, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
        if (n == sizeof(int))
        {
            std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
            excuttask(command); // 执行
        }
    }
}

// master
class Channel
{
private:
    int _wfd;
    pid_t _subprocessid;
    std::string _name; // 信道名字
public:
    Channel(int wfd, pid_t id, const std::string name) // 构造函数
        : _wfd(wfd), _subprocessid(id), _name(name)    // 构造函数初始化列表
    {
    }
    int getfd() { return _wfd; }
    int getid() { return _subprocessid; }
    std::string getname() { return _name; }
    ~Channel() // 析构函数
    {
    }
};

// 形参类型和命名规范
//  const & :输出
//  & :输入输出型参数
//  * :输出型参数
void creatchannelandsun(int num, std::vector<Channel> *channels)
{
    for (int i = 0; i < num; i++)
    {
        int pipefd[2];
        int n = pipe(pipefd); // 创建管道
        if (n < 0)
            exit(1); // 创建管道失败,那么就无需与子进程通信了

        pid_t id = fork(); // 创建子进程
        if (id == 0)
        { // 子进程不用创建子进程,只需父进程即可
            // child  --r
            close(pipefd[1]);
            work(pipefd[0]); // 进行工作
            exit(0);
        }

        // farther  --w
        close(pipefd[0]);
        // a.此时父进程已经有了子进程的pid  b.父进程的w端
        std::string channel_name = "channel-" + std::to_string(i); // 构建一个channel名称
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

int nextchannel(int channelnum)
{ // 形成一个0 1 ...到channelnum的编号
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;
    return channel;
}

void sendtask(Channel &channel, int taskcommand)
{
    write(channel.getfd(), &taskcommand, sizeof(taskcommand)); // 写
}

//  ./processpool 5
int main(int argc, char *argv[])
{
    if (argc != 2) // 说明不用创建子进程,不会用
    {
        std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
    inittask();                   // 装载任务

    std::vector<Channel> channels;
    creatchannelandsun(num, &channels); // 创建信道和子进程

    // 通过channel控制子进程
    while (true)
    {
        sleep(1);//每隔一秒发布一个
        // 第一步选择一个任务 
        int taskcommand = selecttask();
        // 第二步选择一个信道和进程,其实是在vector中选择
        int index_channel = nextchannel(channels.size());
        // 第三步发送任务
        sendtask(channels[index_channel], taskcommand);
        std::cout<<std::endl;
std::cout<<"taskcommand: "<<taskcommand<<" channel: "<<channels[index_channel].getname()<<" subprocess: "<<channels[index_channel].getid()<<std::endl;
    }
    return 0;
}
//以前我们都是用.h表示头文件声明,.cpp表示实现
//那么我们用.hpp也是c++的一种头文件,他允许将声明和实现和在一个文件里,那么就有一个好处,像这种代码无法形成库,即使形成库也是开源形成的
#pragma once
#include<iostream>
#include<ctime>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>

#define tasknum 3

typedef void (*task_t)();//task_t  返回值为void,参数为空的函数指针

void print(){//三个任务列表
    std::cout<<"i am a printf task"<<std::endl;
}
void download(){
    std::cout<<"i am a download task"<<std::endl;
}
void flush(){
    std::cout<<"i am a flush task"<<std::endl;
}

task_t tasks[tasknum];//函数指针数组

void inittask(){//初始化任务
    srand(time(nullptr)^getpid());//种时间和pid随机种子
    //srand(seed): 这个函数用于用指定的 seed(种子)初始化随机数生成器。
    //seed 的值决定了 rand() 生成的随机数序列。如果使用相同的种子值,每次生成的随机数序列都是一样的。
    //time(nullptr) 提供了一个基于当前时间的种子值。getpid() 提供了一个进程的唯一标识符。
    //用异或操作符 ^ 将这两个值混合在一起,产生一个更为变化的种子值。
    tasks[0]=print;
    tasks[1]=download;
    tasks[2]=flush;
}

void excuttask(int n){//执行任务
    if(n<0||n>2)return;
    tasks[n]();//调用
}

int selecttask(){//随机选择任务
    return rand()%tasknum;
}

 完善代码

#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include<sys/wait.h>
#include <sys/types.h>
#include "task.hpp"

void work(int rfd)
{
    while (true)
    {
        int command = 0;
        int n = read(rfd, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
        if (n == sizeof(int))
        {
            std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
            excuttask(command); // 执行
        }
        else if(n==0){
            std::cout<<"sub process: "<<getpid()<<" quit"<<std::endl;
            break;
        }
    }
}

// master
class Channel
{
private:
    int _wfd;
    pid_t _subprocessid;
    std::string _name; // 信道名字
public:
    Channel(int wfd, pid_t id, const std::string name) // 构造函数
        : _wfd(wfd), _subprocessid(id), _name(name)    // 构造函数初始化列表
    {
    }
    int getfd() { return _wfd; }
    int getid() { return _subprocessid; }
    std::string getname() { return _name; }
    void closechannel(){//关闭文件描述符
        close(_wfd);
    }
    void wait(){
        pid_t n=waitpid(_subprocessid,nullptr,0);
        if(n>0){
            std::cout<<"wait "<<n<<" success"<<std::endl;
        }
    }
    ~Channel() // 析构函数
    {
    }
};

// 形参类型和命名规范
//  const & :输出
//  & :输入输出型参数
//  * :输出型参数
void creatchannelandsun(int num, std::vector<Channel> *channels)
{
    for (int i = 0; i < num; i++)
    {
        int pipefd[2];
        int n = pipe(pipefd); // 创建管道
        if (n < 0)
            exit(1); // 创建管道失败,那么就无需与子进程通信了

        pid_t id = fork(); // 创建子进程
        if (id == 0)
        { // 子进程不用创建子进程,只需父进程即可
            // child  --r
            close(pipefd[1]);
            work(pipefd[0]); // 进行工作
            exit(0);
        }

        // farther  --w
        close(pipefd[0]);
        // a.此时父进程已经有了子进程的pid  b.父进程的w端
        std::string channel_name = "channel-" + std::to_string(i); // 构建一个channel名称
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

int nextchannel(int channelnum)
{ // 形成一个0 1 ...到channelnum的编号
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;
    return channel;
}

void sendtask(Channel &channel, int taskcommand)
{
    write(channel.getfd(), &taskcommand, sizeof(taskcommand)); // 写
}

void ctrlprocessonce(std::vector<Channel> &channels){//只做一次任务
        sleep(1);//每隔一秒发布一个
        // 第一步选择一个任务 
        int taskcommand = selecttask();
        // 第二步选择一个信道和进程,其实是在vector中选择
        int index_channel = nextchannel(channels.size());
        // 第三步发送任务
        sendtask(channels[index_channel], taskcommand);
        std::cout<<std::endl;
std::cout<<"taskcommand: "<<taskcommand<<" channel: "<<channels[index_channel].getname()<<" subprocess: "<<channels[index_channel].getid()<<std::endl;
}

void ctrlprocess(std::vector<Channel> &channels,int times=-1){
    if(times>0){//固定次数
        while(times--){//根据times控制
            ctrlprocessonce(channels);
        }
    }
    else{//缺省一直
    while(true){//一直控制
        ctrlprocessonce(channels);
    }
    }
}

//  ./processpool 5
int main(int argc, char *argv[])
{
    if (argc != 2) // 说明不用创建子进程,不会用
    {
        std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
    inittask();                   // 装载任务

    std::vector<Channel> channels;
    creatchannelandsun(num, &channels); // 创建信道和子进程

    // 通过channel控制子进程
    ctrlprocess(channels,10);//控制10次退出

    //回收进程,把写端关闭那么所有子进程读到0就break退出了
    for(auto &channel:channels){
        channel.closechannel();
    }

    //注意进程回收,则遍历关闭
    for(auto &channel:channels){
        channel.wait();
    }//如果不等待那么子进程就是僵尸进程了
    return 0;
}
//以前我们都是用.h表示头文件声明,.cpp表示实现
//那么我们用.hpp也是c++的一种头文件,他允许将声明和实现和在一个文件里,那么就有一个好处,像这种代码无法形成库,即使形成库也是开源形成的
#pragma once
#include<iostream>
#include<ctime>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>

#define tasknum 3

typedef void (*task_t)();//task_t  返回值为void,参数为空的函数指针

void print(){//三个任务列表
    std::cout<<"i am a printf task"<<std::endl;
}
void download(){
    std::cout<<"i am a download task"<<std::endl;
}
void flush(){
    std::cout<<"i am a flush task"<<std::endl;
}

task_t tasks[tasknum];//函数指针数组

void inittask(){//初始化任务
    srand(time(nullptr)^getpid());//种时间和pid随机种子
    //srand(seed): 这个函数用于用指定的 seed(种子)初始化随机数生成器。
    //seed 的值决定了 rand() 生成的随机数序列。如果使用相同的种子值,每次生成的随机数序列都是一样的。
    //time(nullptr) 提供了一个基于当前时间的种子值。getpid() 提供了一个进程的唯一标识符。
    //用异或操作符 ^ 将这两个值混合在一起,产生一个更为变化的种子值。
    tasks[0]=print;
    tasks[1]=download;
    tasks[2]=flush;
}

void excuttask(int n){//执行任务
    if(n<0||n>2)return;
    tasks[n]();//调用
}

int selecttask(){//随机选择任务
    return rand()%tasknum;
}

 回收子进程:

 不回收子进程:

子进程使用重定向优化

#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include<sys/wait.h>
#include <sys/types.h>
#include "task.hpp"

// void work(int rfd)//执行任务工作
// {
//     while (true)
//     {
//         int command = 0;
//         int n = read(rfd, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
//         if (n == sizeof(int))
//         {
//             std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
//             excuttask(command); // 执行
//         }
//         else if(n==0){
//             std::cout<<"sub process: "<<getpid()<<" quit"<<std::endl;
//             break;
//         }
//     }
// }

void work()//执行任务工作
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
        //此时从标准输入去读,没有管道的概念了,对于子进程来说有人通过标准输入将任务给你
        if (n == sizeof(int))
        {
            std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
            excuttask(command); // 执行
        }
        else if(n==0){
            std::cout<<"sub process: "<<getpid()<<" quit"<<std::endl;
            break;
        }
    }
}

// master
class Channel
{
private:
    int _wfd;
    pid_t _subprocessid;
    std::string _name; // 信道名字
public:
    Channel(int wfd, pid_t id, const std::string name) // 构造函数
        : _wfd(wfd), _subprocessid(id), _name(name)    // 构造函数初始化列表
    {
    }
    int getfd() { return _wfd; }
    int getid() { return _subprocessid; }
    std::string getname() { return _name; }
    void closechannel(){//关闭文件描述符
        close(_wfd);
    }
    void wait(){
        pid_t n=waitpid(_subprocessid,nullptr,0);
        if(n>0){
            std::cout<<"wait "<<n<<" success"<<std::endl;
        }
    }
    ~Channel() // 析构函数
    {
    }
};

// 形参类型和命名规范
//  const & :输出
//  & :输入输出型参数
//  * :输出型参数
void creatchannelandsun(int num, std::vector<Channel> *channels)
{
    for (int i = 0; i < num; i++)
    {
        int pipefd[2];
        int n = pipe(pipefd); // 创建管道
        if (n < 0)
            exit(1); // 创建管道失败,那么就无需与子进程通信了

        pid_t id = fork(); // 创建子进程
        if (id == 0)
        { // 子进程不用创建子进程,只需父进程即可
            // child  --r
            close(pipefd[1]);
            dup2(pipefd[0],0);//将管道的读端,重定向到标准输入
            //本来应该在管道的读端读任务,现在做重定向,把0号文件描述符指向管道的读端
            work(); // 进行工作,此时不用传参
            exit(0);
        }

        // farther  --w
        close(pipefd[0]);
        // a.此时父进程已经有了子进程的pid  b.父进程的w端
        std::string channel_name = "channel-" + std::to_string(i); // 构建一个channel名称
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

int nextchannel(int channelnum)//选定一个管道
{ // 形成一个0 1 ...到channelnum的编号
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;
    return channel;
}

void sendtask(Channel &channel, int taskcommand)//派发什么任务
{
    write(channel.getfd(), &taskcommand, sizeof(taskcommand)); // 写
}

void ctrlprocessonce(std::vector<Channel> &channels){//只做一次任务
        sleep(1);//每隔一秒发布一个
        // 第一步选择一个任务 
        int taskcommand = selecttask();
        // 第二步选择一个信道和进程,其实是在vector中选择
        int index_channel = nextchannel(channels.size());
        // 第三步发送任务
        sendtask(channels[index_channel], taskcommand);
        std::cout<<std::endl;
std::cout<<"taskcommand: "<<taskcommand<<" channel: "<<channels[index_channel].getname()<<" subprocess: "<<channels[index_channel].getid()<<std::endl;
}

void ctrlprocess(std::vector<Channel> &channels,int times=-1){//控制派发任务次数
    if(times>0){//固定次数
        while(times--){//根据times控制
            ctrlprocessonce(channels);
        }
    }
    else{//缺省一直
    while(true){//一直控制
        ctrlprocessonce(channels);
    }
    }
}

//  ./processpool 5
int main(int argc, char *argv[])
{
    if (argc != 2) // 说明不用创建子进程,不会用
    {
        std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
    inittask();                   // 装载任务

    std::vector<Channel> channels;
    creatchannelandsun(num, &channels); // 创建信道和子进程

    // 通过channel控制子进程
    ctrlprocess(channels,10);//控制10次退出
    //或者ctrlprocess(channels);//使用缺省参数一直控制

    //回收进程,把写端关闭那么所有子进程读到0就break退出了
    for(auto &channel:channels){
        channel.closechannel();
    }
    //注意进程回收,则遍历关闭
    for(auto &channel:channels){
        channel.wait();
    }//如果不等待那么子进程就是僵尸进程了
    return 0;
}
//以前我们都是用.h表示头文件声明,.cpp表示实现
//那么我们用.hpp也是c++的一种头文件,他允许将声明和实现和在一个文件里,那么就有一个好处,像这种代码无法形成库,即使形成库也是开源形成的
#pragma once
#include<iostream>
#include<ctime>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>

#define tasknum 3

typedef void (*task_t)();//task_t  返回值为void,参数为空的函数指针

void print(){//三个任务列表
    std::cout<<"i am a printf task"<<std::endl;
}
void download(){
    std::cout<<"i am a download task"<<std::endl;
}
void flush(){
    std::cout<<"i am a flush task"<<std::endl;
}

task_t tasks[tasknum];//函数指针数组

void inittask(){//初始化任务
    srand(time(nullptr)^getpid());//种时间和pid随机种子
    //srand(seed): 这个函数用于用指定的 seed(种子)初始化随机数生成器。
    //seed 的值决定了 rand() 生成的随机数序列。如果使用相同的种子值,每次生成的随机数序列都是一样的。
    //time(nullptr) 提供了一个基于当前时间的种子值。getpid() 提供了一个进程的唯一标识符。
    //用异或操作符 ^ 将这两个值混合在一起,产生一个更为变化的种子值。
    tasks[0]=print;
    tasks[1]=download;
    tasks[2]=flush;
}

void excuttask(int n){//执行任务
    if(n<0||n>2)return;
    tasks[n]();//调用
}

int selecttask(){//随机选择任务
    return rand()%tasknum;
}

 那么有个问题?为什么不能关闭一个再等待

 为什么呢?这是一个bug

 为什么全部关闭后再挨个等待就可以呢?

假如有10个子进程,那么第一个子进程有一个读端10个写端,第二个子进程9个写端1个读端,最后一个进程1个读端1个写端;
如果我们把它全部关完了,那么此时他从上往下遍历,管道最后只有最后一个会释放,那么它对应的上一个管道的写端也会释放,所以递归式的逆向关闭;

需要注意的是函数名就是地址;

标签:std,channels,int,间通信,进程,include,channel
From: https://blog.csdn.net/yiqizhuashuimub/article/details/141931965

相关文章

  • shell脚本监控一个进程号,进程号没有就输出error
    你可以使用一个简单的Shell脚本来监控进程号(PID),如果进程不存在则输出error。以下是一个示例脚本:#!/bin/bash#输入要监控的进程号pid=$1#检查进程是否存在ifps-p$pid>/dev/nullthenecho"Process$pidisrunning."elseecho"error:Process$pid......
  • kswapd0进程占用cpu非常高
    早上,遇到一起故障,调查显示kswapd0进程的内存使用率很高,系统负载从平时的0.x升高到了260多。查到原因是,同事代码逻辑错误,不断的死循环向表里面添加数据,记录数达到了2.7千万。然后对该表altertableaddindexxxx,导致了mysqld内存不断的膨胀,导致需要使用到了swap分区的虚拟内存,然......
  • Linux中的进程优先级与设置方法
    在Linux系统中,进程优先级是影响进程调度的重要因素。进程优先级决定了操作系统在多任务环境中分配CPU时间的方式。以下是关于Linux中进程优先级的详细介绍及其设置方法。1.进程优先级概述优先级范围:Linux中的优先级通常使用一个值来表示,范围从0到139:实时优先级:范......
  • 什么是进程中断
    进程中断是指在操作系统中,当前正在执行的进程被暂时挂起或中止,以便处理某种特定事件或条件。这种机制允许操作系统在多个进程之间进行切换,确保系统能够响应外部事件或高优先级的任务。以下是对进程中断的详细解释:1.中断的类型硬件中断:由外部设备(如键盘、鼠标、网络适配器等)发......
  • 什么是不可中断进程
    不可中断进程(UninterruptibleProcess)是指在某些情况下,进程无法被外部中断信号(如硬中断或软件中断)打断或终止。这种状态通常与特定的内核操作有关,比如等待I/O操作的完成。以下是对不可中断进程的详细解释:1.不可中断进程的特征状态:不可中断进程通常处于D状态(Uninterruptibl......
  • 【操作系统】进程同步之共享内存
    进程的线程共享进程资源,而进程共享计算机资源。在某种程度上,多进程是共享物理内存的。由于操作系统的进程管理,不同的进程有自己独立的内存空间,互不干扰。但是共享内存可以打破这个限制。共享内存允许不相关的进程访问同一片物理内存。共享内存是两个进程之间共享和传递数......
  • 查看文件(或文件夹)被哪个进程使用【文件已在另一程序中打开】
    原文链接:https://www.cnblogs.com/liushui-sky/p/8135292.htmlwindows系统中当我们在删除某个文件或文件夹时有时会提示该文件有程序在使用不能被删除,这时相当惆怅。那么可以用这个方法来找到是哪个进程在占用该文件: 1:打开任务管理器选择“性能” 2:单击下部的“资源监视器”......
  • 进程间通信(IPC):概念、分类与信号机制(2)
    文章目录进程间通信(IPC):概念、分类与信号机制引言IPC的分类信号机制信号周期信号的产生信号的发送信号的接收信号处理示例IPC的其他方法管道通信消息队列共享内存套接字通信服务器端代码客户端代码优点与缺点优点缺点结论进程间通信(IPC):概念、分类与信号机制引言......
  • python 多进程的 Process 和 Queue 的使用
    QuestionfrommultiprocessingimportProcess,Queue解释下这个多进程AnswerfrommultiprocessingimportProcess,Queue是用于多进程处理的模块。详细解释多进程:multiprocessing模块提供了类似于threading模块的API,但它使用的是进程而不是线程。每个进程都有自己的......
  • python语言基础(七)--多进程多线程
    多进程,多线程1、多任务概述多个任务同时执行目的节约资源,充分利用CPU资源,提高效率表现形式并发:针对于单核CPU来讲的,如果有多个任务同时请求执行,但是同一瞬间CPU只能执行1个(任务),于是就安排它们交替执行.因为时间间隔非常短(CPU执行速度太快......