本文对System V标准的共享内存和消息队列这两种进程间通信方式进行讨论,涉及原理、系统调用接口以及相关的内核数据结构,并给出相关示例代码。
System V 共享内存
基本原理
进程间通信必须要让不同的进程看到同一份内存资源,因为进程具有独立性,所以这份内存资源是操作系统提供的,接口是由操作系统设计的。顾名思义,共享内存就是让不同的进程看到同一块内存空间,对这块内存空间的申请、释放、挂接和控制,都必须借助系统调用接口来完成。
具体做法是,申请一块物理内存空间,将这块内存空间映射到不同进程的地址空间,进程地址空间是进程看待和管理自己资源的窗口,不同进程通过对这块内存的读写,可以进行进程间通信。
系统调用接口
Linux 提供的进程间通信接口无非分两大类:建立通信和取消通信。建立共享内存通信时,首先需要申请一块内存空间。承上,这块内存不能是任意一个用户进程申请的,只能借助系统调用接口。
使用shmget(2)
获取一块共享内存:
#include <sys/ipc.h>
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
//成功返回共享内存的shmid,失败返回-1
- 参数 key 标识了共享内存的唯一性,一个进程也是通过 key 保证自己拿到的是与其他进程相同的同一块共享内存,每块共享内存的 key 值都不能相同。key 值只能由用户自己约定,并且保证 key 值的唯一性,而不能交由操作系统随机生成,这是因为 key 值必须在不同的进程之间保持透明,在通信被成功建立之前,这一点只有设计通信的用户能做到。
- 参数 shmflg 指定获取共享内存的方式,由数个整型组合而成:
- IPC_CREAT 获取一块共享内存,如果已经存在就直接返回,否则就申请;可以单独使用。
- IPC_EXCL 只能和 IPC_CREAT 一起使用;如果共享内存已经存在,就报错返回。
- mode_flags 新申请一块共享内存时,还必须指明这块共享内存的权限,形式与文件的权限相同,可以用3个八进制数表示。
可以通过ftok(3)
辅助生成一个共享内存的 key 值:
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
//成功返回一个key值,失败返回-1
pathname 是一个文件路径,这个路径必须指向一个存在且可进入的文件;proj_id 是一个非零整数。系统会通过 pathname 和 proj_id 进行计算以得到一个冲突概率较小的 key 值并返回。
申请共享内存成功后,需要将这块内存映射到进程的地址空间,即挂接共享内存。
使用shmat(2)
挂接共享内存:
#include <sys/types.h>
#include <sys/shm.h>
void *shmat(int shmid, const void *shmaddr, int shmflg);
//成功返回共享内存的首地址,失败返回(void*)-1
shmid 为要挂接的共享内存的 id。shmaddr 为挂接共享内存的首地址,一般将这个参数设为 nullptr,表示让操作系统在地址空间中自动寻找一块空闲的空间加载。shmflg 指明加载共享内存的权限,如果置为 SHM_RDONLY,则以只读方式加载共享内存,进程对这块共享内存是只读的,并且必须具有对这块内存的读权限;如果置为 0,则以读写方式加载共享内存,进程对这块共享内存是可读可写的,并且必须具有对这块内存的读权限和写权限。
下面是取消挂接和对共享内存进行控制的接口。
使用shmgt(2)
取消对共享内存的挂接:
#include <sys/types.h>
#include <sys/shm.h>
int shmdt(const void *shmaddr);
//成功返回0,失败返回-1
对首地址为 shmaddr 的共享内存取消挂接,即取消共享内存在进程地址空间中的映射,此后进程无法对这块共享内存进行读写。当要取消进程间的通信时,首先要取消对共享内存的挂接。
使用shmctl(2)
对共享内存进行控制:
#include <sys/ipc.h>
#include <sys/shm.h>
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
shmid 为共享内存的 id,cmd 为操作命令,常用的有下面几个:
- IPC_STAT 获取共享内存的相关信息,包括共享内存的大小、共享内存的创建者和当前挂接数等,获取到的信息字段会被填入到一个 shmid_ds 结构体中,此时 buf 作为输出型参数,不能为空。
- IPC_RMID 标记释放共享内存(Mark the segment to be destroyed)。当一块共享内存被标记释放后,并不是立即被释放,而是禁止被挂接,直到已有的挂接数减为 0 时才被释放。执行 IPC_RMID 的进程必须是共享内存的拥有者或者具有对共享内存操作的权限。此时 buf 参数置为 nullptr 即可。
直接使用内存读写系列的函数即可对共享内存进行读写。
下面是一个 A 进程和 B 进程使用共享内存进行通信的简单demo代码:
/*
filename:comm.hpp
对获取和新建共享内存的接口进行封装
*/
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/fcntl.h>
#include "log.hpp"
const int mem_size = 4096; //默认共享内存的大小为 4096
const char* tok_file_name = "/home/shr/code/2023_y";
const int proj_id = 0x666;
Log log(MULTIFILE);
class shared_mem
{
public:
int shmid_get() //获取共享内存shmid
{
return getShmidHelper(IPC_CREAT);
}
int shm_creat() //创建一块新的共享内存并返回其shmid
{
return getShmidHelper(IPC_CREAT | IPC_EXCL | 0666);
}
private:
//获取 key 值
int getKey()
{
key_t key = ftok(tok_file_name, proj_id);
if(key < 0) {
log(FATAL, "%s:%d:getKey err", __FILE__, __LINE__);
exit(1);
}
log(LOG, "getKey done:%d", key);
return key;
}
int getShmidHelper(int flag)
{
//以指定方式新建或拿取shmid
int shmid = shmget(getKey(), mem_size, flag);
if(shmid < 0) {
log(FATAL, "%s:%d:shmget err", __FILE__, __LINE__);
exit(1);
}
return shmid;
}
};
/*
filename:processa.cpp
*/
#include <cstdio>
#include <cstring>
#include <string>
#include <iostream>
#include "comm.hpp"
int main()
{
int shmid = shared_mem().shm_creat(); //新建共享内存
std::cout << shmid << std::endl;
//attach共享内存
char* shmAddress = (char*)shmat(shmid, nullptr, 0);
struct shmid_ds shm_ds;
shmctl(shmid, IPC_STAT, &shm_ds); //获取共享内存的属性信息
std::cout << "shared memory ds-> " << std::endl;
printf("__key: 0x%x\n", shm_ds.shm_perm.__key);
std::cout << "segment size(bytes): " << shm_ds.shm_segsz << std::endl;
std::cout << "creator(PID): " << shm_ds.shm_cpid << std::endl;
std::cout << "numbers of attch: " << shm_ds.shm_nattch << std::endl;
std::string msg;
int cnt = 5;
while(cnt--)
{
std::cout << "please enter@ ";
std::getline(std::cin, msg);
//写入共享内存
memcpy(shmAddress, msg.c_str(), 1024);
}
shmdt(shmAddress); //detach共享内存
shmctl(shmid, IPC_RMID, nullptr); //释放共享内存
return 0;
}
/*
filename:processb.cpp
*/
#include <cstdio>
#include <iostream>
#include "comm.hpp"
int main()
{
//获取共享内存shmid
int shmid = shared_mem().shmid_get();
//挂接共享内存
char* shmAddress = (char*)shmat(shmid, nullptr, 0);
while(true) {
//不断读取共享内存
std::cout << "process b get a msg: " << shmAddress << std::endl;
sleep(1);
}
shmdt(shmAddress); //deattach共享内存
return 0;
}
在命令行层面,使用ipcs -m
命令查看当前系统中存在的共享内存;使用ipcrm -m [shmid]
标记释放共享内存,直到挂接数减为 0 时才真正释放,与上文的 IPC_RMID 选项同理。
相关内核数据结构
操作系统在对共享内存进行管理时,首先要将共享内存抽象为某种结构,这个结构即struct shmid_ds
,其中的字段如下:
可见其中包含了共享内存的各种属性字段,包括共享内存的大小、相关时间和创建者等信息,上文中指定 IPC_STAT 选项的 shmctl(2) 接口的原理即是将内核中 shmid_ds 的各个字段拷贝到用户自定义的缓冲区。
同时 shmid_ds 中包含了一个ipc_perm
结构,其中记录了共享内存的 key 值等相关信息,这个结构是 System V 进程间通信的关键,后续文章会对其进行更多的讨论。
shmid 和 key 值都标识了一块共享内存的唯一性,但是从 shmid_ds 和 ipc_perm 中的字段,以及相关的系统调用接口可以看出, key 值是在内核层面标识了共享内存的唯一性,而 shmid 在用户层面标识了共享内存的唯一性。
特性
通过上面对共享内存原理的讨论,以及示例代码的运行结果,可以总结出共享内存通信具有这样的特性:
- 使用共享内存进行通信的进程之间不进行协同。这是一个明显的现象,运行上文中的 demo 代码,可以发现 a 进程和 b 进程开始运行后,b 进程未等待 a 进程进行写入,当 a 进程将共享内存写满时,也未等待 b 进程读取。
- 共享内存中的数据只能由用户自己进行维护,进程之间的读写协议只能由用户自己制定。
- 共享内存是所有进程间通信方式中最快的,这是因为被挂接后,共享内存被每个进程视为自己的内存资源,直接进行读写即可,拷贝次数较少。
另外,共享内存的生命周期是随内核的,如果用户不主动释放,则共享内存一直存在。
共享内存通信本身不进行协同,但可以借助其他具有协同特性的通信方式实现协同。下面是使用管道对上文的demo代码进行修改后的结果:首先读、写进程会等待对方打开管道文件,双方打开管道后,当写进程写入共享内存完毕后,将管道内发送信息表示写操作完成,另一端的读进程收到管道信息,结束阻塞,向下执行代码,读共享内存中的信息。
/*
filename:comm.hpp
*/
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/fcntl.h>
#include "log.hpp"
const int mem_size = 1024;
const char* tok_file_name = "/home/shr/code/2023_y";
const char* FIFONAME = "file.fifo";
const int proj_id = 0x666;
Log log(MULTIFILE);
class shared_mem
{
public:
int shmid_get() //获取shmid
{
return getShmidHelper(IPC_CREAT);
}
int shm_creat() //创建shmid
{
return getShmidHelper(IPC_CREAT | IPC_EXCL | 0666);
}
private:
int getKey()
{
key_t key = ftok(tok_file_name, proj_id);
if(key < 0) {
log(FATAL, "%s:%d:getKey err", __FILE__, __LINE__);
exit(1);
}
log(LOG, "getKey done:%d", key);
return key;
}
int getShmidHelper(int flag)
{
//以指定方式新建或拿取shmid
int shmid = shmget(getKey(), mem_size, flag);
if(shmid < 0) {
log(FATAL, "%s:%d:shmget err", __FILE__, __LINE__);
exit(1);
}
return shmid;
}
};
//管理fifo管道
class FIFO
{
public:
FIFO(const char* fifoName = FIFONAME)
:_fifo_name(fifoName)
{
int ret = mkfifo(fifoName, 0666);
if(ret < 0) { log(FATAL, "FIFO:mkfifo err"); exit(1); }
log(LOG, "FIFO mked");
}
const char* getFifoName()
{
return _fifo_name;
}
~FIFO()
{
unlink(_fifo_name);
log(LOG, "FIFO closed");
}
private:
const char* _fifo_name = FIFONAME;
};
/*
filename:processa.cpp
*/
#include <cstdio>
#include <cstring>
#include <string>
#include <iostream>
#include "comm.hpp"
int main()
{
int shmid = shared_mem().shm_creat(); //新建共享内存
std::cout << shmid << std::endl;
char* shmAddress = (char*)shmat(shmid, nullptr, 0); //attach共享内存
FIFO fifo; //利用管道实现共享内存通信的进程之间的协同
int fd = open(fifo.getFifoName(), O_WRONLY); //会等待读端进程open管道,否则阻塞
if(fd < 0) { log(FATAL, "%s:%d:open fifo err", __FILE__, __LINE__); exit(1); }
struct shmid_ds shm_ds;
shmctl(shmid, IPC_STAT, &shm_ds); //获取共享内存的属性信息
std::cout << "shared memory ds-> " << std::endl;
printf("__key: 0x%x\n", shm_ds.shm_perm.__key);
std::cout << "segment size(bytes): " << shm_ds.shm_segsz << std::endl;
std::cout << "creator(PID): " << shm_ds.shm_cpid << std::endl;
std::cout << "numbers of attch: " << shm_ds.shm_nattch << std::endl;
std::string msg;
int cnt = 5;
while(cnt--)
{
std::cout << "please enter@ ";
std::getline(std::cin, msg);
memcpy(shmAddress, msg.c_str(), 1024);
char c = 'c';
ssize_t s = write(fd, &c, 1); //告诉对方进程,写入完毕,使对方进程结束read的阻塞
if(s < 0) { log(FATAL, "process a:write err"); exit(1); }
}
close(fd);
shmdt(shmAddress); //deattch共享内存
shmctl(shmid, IPC_RMID, nullptr); //释放共享内存
return 0;
}
/*
filename:processb.cpp
*/
#include <cstdio>
#include "comm.hpp"
int main()
{
int shmid = shared_mem().shmid_get();
char* shmAddress = (char*)shmat(shmid, nullptr, 0);
int fd = open(FIFONAME, O_RDONLY); //会等待写端进程open,否则阻塞
if(fd < 0) { log(FATAL, "%s:%d:open fifo err", __FILE__, __LINE__); exit(1); }
while(true)
{
char c;
ssize_t s = read(fd, &c, 1); //阻塞,直到read到对方进程写入管道的写入就绪的信息
if(s < 0) { log(FATAL, "process b:read err"); exit(1); }
else if(s == 0) { log(LOG, "process b read exit..."); break; } else {}
std::cout << "process b get a msg: " << shmAddress << std::endl;
}
shmdt(shmAddress); //deattach共享内存
return 0;
}
System V 消息队列
System V 消息队列的原理与接口与 System V 共享内存大致相同。
基本原理
消息队列是消息的链接表,存储在内核中,由操作系统进行维护。使多个进程看到同一个消息队列,即可以实现进程间的通信。
系统调用接口
使用msgget(2)
获取一个消息队列:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
//成功返回msqid,失败返回-1
参数意义与 shmget(2) 相同。同样可以使用ftok(3)
接口辅助获取 key 值。
使用msgctl(2)
对消息队列进行控制:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
//成功返回0,失败返回-1
参数意义与 shmctl(2) 相同。
使用msgsnd(2)
向消息队列中投放消息:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
//成功返回0,失败返回-1
使用消息队列进行通信时,要将信息封装成一个结构对象struct msgbuf {}
,这个 msgbuf 的标准格式如下:
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
可见,一条消息中包含两部分:消息类型和信息,为了区分,下文将整个 msgbuf 称为“消息”,而将实际的信息部分 mtext 称为“信息”。
将 msgbuf 对象整体在队列中投放和读取。msqid 指定要投放到哪个消息队列。msgp 是一个指向 msgbuf 的指针。msgsz 为 msgbuf 对象中 mtext 的大小。msgflg 可以指定消息的投放方式,一般情况下,msgsnd(2) 进行阻塞投放,如果消息队列为满,则等待读进程读取消息,直到消息队列中腾出空间再进行投放;当指定 msgflg 为 IPC_NOWAIT 时,msgsnd 不阻塞,如果当前队列为满,则直接投放失败,返回 EAGAIN。
使用msgrcv(2)
从队列中拿取消息:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtype,
int msgflg);
//若成功,返回实际读到的 mtext 部分的长度;若出错,返回-1
msqid 指定从哪个队列拿取。msgp 是一个指向作为输出型参数的 msgbuf 对象的指针。msgsz 为信息部分 mtext 的大小。msgtype 的存在可以实现接收者以非先进先出的次序取消息:
- 当 type > 0 时,拿取队列中消息类型值为 type 的第一个消息。
- 当 type < 0 时,拿取队列中消息类型值小于或等于 type 绝对值的第一个消息。
- 当 type == 0 时,拿取队列中的第一个消息。
对非先进先出次序读取的支持,可以在很多地方发挥作用。如果程序对消息的接收有优先权,则 type 可以设置为优先权值(多线程进行消息区分);当多个进程同时读取一个队列时,type 值可以进行消息的区分(多进程进行消息区分)。
msgrcv 默认以阻塞方式接收消息,直到消息队列中队头第一个消息符合条件才取消阻塞,进行读取。如果指定 msgflg 为 IPC_NOWAIT,则 msgrcv 不阻塞,若当前队头第一个元素不符合条件,直接接收失败返回 -1。
相关内核数据结构
与 struct shmid_ds 相似,内核中消息队列的抽象结构msqid_ds
中的字段如下:
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
struct ipc_perm {
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
与共享内存类似,在内核中,key 标识了消息队列的唯一性。
特性
消息队列具有下面几点特征:
- 消息队列通信会进行进程之间的协同,如果队列为满,则默认发送端阻塞;如果队列为空或者消息不符合条件,则默认接收端阻塞。
- 消息队列是面向 msgbuf 的,每次收发都以一个 msgbuf 对象为单位。