一、进程间通信介绍
1.1 进程通信的目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
1.2进程间通信的发展
- 管道
- system V进程间通信
- POSIX进程间通信
现在我们在进程间通信中使用的最多的就是管道和共享内存。
1.3进程间通信分类
管道分为:
匿名管道pipe、命名管道
System V IPC分为:
System V消息队列、System V共享内存、System V信号量
POSIX IPC分为:
消息队列、共享内存、信号量、互斥量、条件变量、读写锁
二、进程间通信是什么呢?
简单的来说是:两个或者多个进程实现数据层面的交互
因为进程独立性的存在,导致进程通信的成本比较高。总的来说,通信是有成本的。
2.1为什么要进行进程间通信?
1.传输基本数据
2.发送命令
3.达到某种协同
4.通知
也就是说完成某种业务逻辑的自洽和完整。
2.2对进程通信的思考?
2.2.1如何进行进程间通信呢?
a.进程间通信的本质:必须让不同的进程看到同一份“资源”
b.“资源”?特定形式的内存空间(可以是内核文件或者是我们常见的文件)
c."资源"谁来提供? ----一般由操作系统提供,为什么不是我们两个进程之间的一个来提供呢? 假设一个进程提供,这个资源属于谁? 毕竟进程具有独立性,那么这个资源为这个进程独有,破坏进程独立性。
d.我们进程访问这个空间,进行通信,本质就是在访问操作系统!
2.2.2既然要进行通信,那么如何进行管理?
我们对进程之间通信的这个进行抽象理解。进程代表的就是用户,“资源”由操作系统创建,使用,释放。都是调用的系统提供的接口。
从底层设计,从接口设计,都要由操作系统独立设计。一般操作系统,会有一个独立的通信模块--隶属于文件系统 --IPC 通信模块
我们的有名管道和匿名管道都是属于IPC通信模块。
2.3通信标准的制定
定制标准 :
进程间通信是有标准的 ---systemV && posix
systemV主要用于本机内部通信
posix用于网络通信
进程间通信的分类
管道:
匿名管道pipe
命名管道
System V
System V 消息队列
System V 共享内存
System V 信号量
posix
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
三、管道
3.1什么是管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
特点:
单向通信,只允许一端读一端写,基于文件系统进行通信。
如何进行双向通信,也就是在单向管道的基础上,再多加一个管道。
管道通信需要两个通信的进程是父子关系(常用)/兄弟关系/爷孙关系(血缘关系),父进程进行拷贝子进程,使得父进程和子进程能看到同一份文件。底层原理,是他们拥有相同的files_struct。
3.2管道分类
在进行文件信息通信的时候,文件缓存区里面的文件如果没有名字,那么这样的管道通常被称为匿名管道。
那么如果我们提前或者在代码文件里面创建管道文件,管道文件有自己的名称,所以这种管道被称为有名管道。这里先不探究有名管道如何进行的进程通信。
3.3管道的特征
- 具有血缘关系的进程之间进行通信
- 管道之间只能进行单向通信
- 父子进程会进行协同的,同步与互斥的 ----保护管道文件的数据安全
- 管道是面向字节流的
- 管道是基于文件的,而文件的生命周期是随进程的!(管道的大小64kb)
深度理解,管道是有固定大小的?
如何进行验证这个管道的大小呢?
ulimit -a | grep pip
可以看到为512b的大小
3.4管道的状态
管道的四种情况:
1.读写端正常,管道如果为空,那么读端就会阻塞。
2.读写端正常,管道如果被写满,写端就要阻塞。
3.读端正常读,写端关闭,读端就会读到0,表示读到了文件(pipe)结尾,不会被阻塞
4.写端正常写,读端关闭了。操作系统就要杀掉正在写入的进程(这个得我们自己操作)。
问题来了,如何杀掉进程?
命令为 kill -9 pid
(pid为进程编号)
通过9号信号杀掉。(操作系统是不会做,低效,浪费等类似的工作。如果做了就是操作系统的bug)
3.4创建匿名管道
匿名管道是向我们的操作系统申请一个内核文件,利用文件缓冲区进行读写通信。fd[0]是读,fd[1]是写,在设计匿名管道的时候需要注意只能是一端读一端写,要将对应的另一端进行关闭。
父进程读、子进程写
如下是匿名管道进行通信的代码
#include<iostream>
#include<cstdlib>
#include<unistd.h>
#include<string>
#include<stdio.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<cstring>
using namespace std;
#define N 2
#define NUM 1024
//child 子进程
void Writer(int wfd)
{
string s="hello ,I am child";
pid_t self=getpid();
int number=0;
char buffer[NUM];
while(true)
{
sleep(1);
//构建字符数组
buffer[0]=0; //字符串清空
snprintf(buffer,sizeof(buffer),"%s-%d-%d",s.c_str(),self,number++);
//cout<<buffer<<endl;
//发送/给父进程 调用系统wirte接口
write(wfd,buffer,strlen(buffer));
}
}
void Reader(int rfd)
{
char buffer[NUM];
int cnt=0;
while(true)
{
sleep(1);
buffer[0]=0;
ssize_t n=read(rfd,buffer,sizeof(buffer));
if(n>0)
{
buffer[n]=0; // 0=="\0"
cout<<"father get a message[ "<<getpid()<<"]#"<<buffer<<endl;
}
if(n==0) //当返回n 说明写端已经关闭 已经读到了文件的结尾
{
printf("father read file done ....");
break;
}
//cout<<"n: "<<n<<endl;
cnt++;
if(cnt>5) break;
}
}
int main()
{
int pipefd[N]={0};
int n=pipe(pipefd);
if(n<0)
{
return 1;
}
// cout<<"pipefd[0]:"<<pipefd[0]<<" "<<"pipefd[1]"<<pipefd[1]<<" "<<endl;
pid_t id= fork();
if(id<0)
{
return 2;
}
else if(id==0)
{
//子进程
//child
//子进程可以写入 pipefd[0]:3 pipefd[1]4
close(pipefd[0]);
//IPC code //进行通信
Writer(pipefd[1]);
close(pipefd[1]);
exit(0);
}
//父进程
//father
//父进程进行读
close(pipefd[1]);
//IPC code //进行通信
Reader(pipefd[0]);
close(pipefd[0]);
cout<<"father close read fd:"<<id<<endl;
sleep(5);
//回收子进程
int status=0;
pid_t rid=waitpid(id,&status,0);
if(rid<0) return 3;
cout<<"wait child success: "<<rid<<"exit code: "<<((status>>8)&0xff)<<" exit signal: "<<(status&0x7f)<<endl;
sleep(5);
cout<<"father quit "<<endl;
return 0;
}
匿名管道必须是具有血缘关系的进程,我们用父子进程进行管道通信,如上是父亲读,儿子写,当没有消息在缓存区,那么father关闭读端,并等待子进程,防止变成僵尸进程。
子进程读、父进程写
如下是匿名管道通信代码
#include<iostream>
#include<cstdlib>
#include<unistd.h>
#include<string>
#include<stdio.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<cstring>
using namespace std;
#define N 2
#define NUM 1024
//父进程 parent
void Writer(int wfd)
{
string s="hello ,I am father";
pid_t self=getpid();
int number=0;
char buffer[NUM];
while(true)
{
//构建字符数组
buffer[0]=0; //字符串清空
snprintf(buffer,sizeof(buffer),"%s-%d-%d",s.c_str(),self,number++);
//cout<<buffer<<endl;
//发送/给父进程 调用系统wirte接口
write(wfd,buffer,strlen(buffer));
sleep(1);
}
}
void Reader(int rfd)
{
char buffer[NUM];
while(true)
{
buffer[0]=0;
ssize_t n=read(rfd,buffer,sizeof(buffer));
if(n>0)
{
buffer[n]=0; // 0=="\0"
cout<<"child get a message[ "<<getpid()<<"]#"<<buffer<<endl;
}
if(n==0) //当返回n 说明写端已经关闭 已经读到了文件的结尾
{
printf("child read file done ....");
}
//cout<<"n: "<<n<<endl;
sleep(1);
}
}
int main()
{
int pipefd[N]={0};
int n=pipe(pipefd);
if(n<0)
{
return 1;
}
// cout<<"pipefd[0]:"<<pipefd[0]<<" "<<"pipefd[1]"<<pipefd[1]<<" "<<endl;
pid_t id= fork();
if(id<0)
{
return 2;
}
else if(id==0)
{
//子进程
//child
//子进程可以读 pipefd[0]:3 pipefd[1]4
close(pipefd[1]);
//IPC code //进行通信
Reader(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//父进程
//father
//父进程进行写
close(pipefd[0]);
//IPC code //进行通信
Writer(pipefd[1]);
int number=0;
pid_t rid=waitpid(id,&number,0);
close(pipefd[1]);
sleep(5);
return 0;
}
3.5创建有名管道
有名管道是通过让不同的没有血缘关系的进程,看到同一份管道文件,这样是建立通信的前提,不同的进程通过仅读仅写的方式进行通信。先介绍一下创建管道文件的手册信息。
pathname是我们文件创建的位置。mode有很多模式。比如O_WRONLY、O_RDONLY等
返回值
当返回0就是创建管道文件成功。否者就是返回-1。
如下是我们有名管道的实现代码
processa.cc
//processa.cc
#include"log.hpp"
#include"comm.hpp"
//a进程进行写 那么就要打开文件 仅写
int main()
{
int fd=open(FILE_NAME,O_WRONLY); //仅写
if(fd<0)
{
perror("open");
//log(Fatal,"process a open 管道文件失败");
exit(1);
}
else
{
cout<<"process a open file done"<<endl;
}
//准备往管道文件里面写入数据
while(true)
{
string temp;
cout<<"process a Enter:";
getline(cin,temp);
write(fd,temp.c_str(),temp.size());
sleep(1);
}
return 0;
}
processb.cc
#include"log.hpp"
#include"comm.hpp"
int main()
{
int fd=open(FILE_NAME,O_RDONLY);
if(fd<0)
{
perror("open");
//log(Fatal,"process b open 管道文件失败");
exit(1);
}
else
{
cout<<"process b open file done"<<endl;
}
while(true)
{
//接收管道文件数据----read
char buffer[1024] = {0};
int x = read(fd, buffer, sizeof(buffer));
if(x>0)
{
//buffer有数据
buffer[x]=0;
cout<<"process a say: "<<buffer<<endl;
}
else if(x==0)
{
//写端已经关闭 读端也要开始退出
cout<<"process a is dead,now process b too"<<endl;
break;
}
}
return 0;
}
comm.hpp
#pragma once
#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <cstdlib>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#define FIFO_FILE "./myfifo"
#define MODE 0664
enum
{
FIFO_CREATE_ERR = 1,
FIFO_DELETE_ERR,
FIFO_OPEN_ERR
};
class Init
{
public:
Init()
{
// 创建管道
int n = mkfifo(FIFO_FILE, MODE);
if (n == -1)
{
perror("mkfifo");
exit(FIFO_CREATE_ERR);
}
}
~Init()
{
int m = unlink(FIFO_FILE);
if (m == -1)
{
perror("unlink");
exit(FIFO_DELETE_ERR);
}
}
};
makefile
.PHONY:all
all:processa processb
processa:processa.cc
g++ -o $@ $^ -g -std=c++11
processb:processb.cc
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f processa processb
3.6管道的应用场景
3.6.1我们常用的 |
1.cat test.txt | head -10 | tail -5
2.sleep 1111 | sleep 2222 | sleep 3333
ps ajx | grep sleep
ps ajx | grep 16657
可以发现父进程就是bash,我们对于这种类型的管道也认为是匿名管道。
3.4.2 管道的衍生场景
使用管道实现一个简易版本的进程池!
进程池是什么?
进程池是我们池化技术的一种,通过一个匿名管道进行实现,主进程和多个子进程进行管道通信,如一个分配任务的场景,通过主进程写,轮询的向没有任务的子进程发送任务信息,并让他输出收到的是几号任务的一个场景。
ProcessPool.cc
#include<string>
#include<vector>
#include<unistd.h>
#include<cstdlib>
#include<assert.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<iostream>
#include<ctime>
#include"Task.hpp"
const int processnum=10;
using std::cout;
using std::endl;
using std::cin;
std::vector<task_t> tasks;
struct channel
{
//初始化列表
channel(int cmdfd,int slaverid,const std::string &processname)
:_cmdfd(cmdfd),_slawverid(slaverid),_processname(processname)
{}
public:
int _cmdfd; // 发送任务的文件描述符
pid_t _slawverid; // 子进程的PID
std::string _processname; // 子进程的名字 -- 方便我们打印日志
};
void slaver() //
{
while(true) // 死锁
{
int cmdcode=0;
int n=read(0,&cmdcode,sizeof(int)); //如果父进程不给子进程发任务 -->子进程进行阻塞等待
if (n==sizeof(int))
{
//执行cmdcode的任务列表
cout<<"slaver say get a command: "<<getpid()<<": cmdcode: "<<cmdcode<<endl;
//合法下标 执行任务
if(cmdcode>=0&&cmdcode<tasks.size()) tasks[cmdcode]();
//任务执行完之后 无任务状态是阻塞状态等主进程写进来任务
}
else if(n==0)
{
break;
}
}
}
//输入 const &
//输出 *
// 输入 输出 &
void InitProcessPool(std::vector<channel>* channels)
{
for(int i=0;i<processnum;i++)
{
int pipefd[2]; //临时变量
int n=pipe(pipefd);
assert(!n);
(void)n;
pid_t id =fork();
if(id==0) //子进程 child
{
close(pipefd[1]); //关闭写端
dup2(pipefd[0],0); //重定向 0表示标准输入 相当于pipefd[0]子进程从标准输入读
close(pipefd[0]); //关闭
//slaver(pipefd[0]);
slaver();
//当子进程的read返回值为0,那么slaver函数退出
cout<<"process:"<<getpid()<<" quit "<<endl; //子进程不在进行接收任务 --->等价于读端关闭,写端正常
//此时的子进程应该被操作系统杀死
//子进程结束 发送了管道信号 子进程的状态变成僵尸进程等待主进程回收
exit(0);
}
//father
close(pipefd[0]); //关闭主进程的读端
//添加channel 字段
std::string name="process-" +std::to_string(i);
channels->push_back(channel(pipefd[1],id,name));
}
}
void Debug(const std::vector<channel>& channels)
{
//test
for(auto &c: channels)
{
cout<<c._cmdfd<<" "<<c._slawverid<<" "<<c._processname<<endl;
}
}
void Menu()
{
cout<<"#################################################################"<<endl;
cout<<"###########1.刷新日志############2.刷新出来野怪####################"<<endl;
cout<<"###########3.检测软件是否更新#####0.退出###########################"<<endl;
}
void ctrSlaver(const std::vector<channel>& channels)
{
int which=0;
while(true)
{
int select=0;
Menu();
cout<<"Please Enter@ ";
cin>>select;
if(select<=0 ||select >=4) break;
int cmdcode=select-1;
//1.选择任务
//int cmdcode=rand()%tasks.size();
//2.选择进程
//如何将任务都能分配到
//方法
//1随机数
//int processpos=rand()%channels.size();
//主进程的操作 ----写
cout<<"father say: "<<"cmdcode: "<<cmdcode
<<" already sendto "<<channels[which]._slawverid
<<" processname: "<<channels[which]._processname<<endl;
//3.发送任务
write(channels[which]._cmdfd,&cmdcode,sizeof(cmdcode));
//轮询分配任务 ---子进程完成打印的这个任务是瞬时的
which++;
which%=channels.size();
sleep(1);
}
}
void QuitProcess(const std::vector<channel>& channels)
{
//关闭所有的带有任务的子进程
for(const auto &c:channels) close(c._cmdfd); //子进程变成僵尸状态
for(const auto &c:channels) waitpid(c._slawverid,nullptr,0); //等到子进程关闭并进行回收
}
int main()
{
// std::vector<task_t> tasks; 不能定义在局部 否则就无法被slaver函数访问
LoadTask(&tasks);
srand(time(nullptr)^getpid()^1023);//种一个随机数种子
//在组织
std::vector<channel> channels;
//1.初始化
InitProcessPool(&channels);
//Debug(channels);
//2.控制子进程
ctrSlaver(channels);
//3.清理收尾
QuitProcess(channels);
}
Task.hpp
#pragma once
#include<iostream>
#include<vector>
//函数指针
typedef void (*task_t)();
using std::cout;
using std::endl;
void task1()
{
cout<<"lol 刷新日志"<<endl;
}
void task2()
{
cout<<"lol 更新野区,刷新出来野怪"<<endl;
}
void task3()
{
cout<<"lol 检测软件是否更新,如果需要,就提示用户"<<endl;
}
void LoadTask(std::vector<task_t> *tasks)
{
tasks->push_back(task1);
tasks->push_back(task2);
tasks->push_back(task3);
}
makefile
ProcessPool:ProcessPool.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -rf ProcessPool