消息队列
概念
-
消息队列是消息的链表,存放在内核中并由消息队列标识符标识,消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺陷。消息队列包括 POSIX 消息队列和 System V 消息队列。
-
消息队列是 UNIX 下不同进程之间实现共享资源的一种机制,UNIX 允许不同进程将格式化的数据流以消息队列形式发送给任意进程,有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。
原理
步骤
- 创建或者获取消息队列
- 读取或写入数据
- 删除消息队列(非必须)
创建/获取消息队列
功能
- 创建或者获取消息队列
函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
key
:IPC_PRIVATE
或 通过ftok()
获取msgflg
:IPC_CREAT | 0666
,内核中不存在指定队列就创建,否则就获取它IPC_CREAT | IPC_EXCL | 0666
,若队列已存在则出错(函数返回-1)
返回值
- 成功返回
msgid
(队列ID) - 失败返回-1,并设置errnor
使用命令获取消息队列
ipcs -q # interprocess communication status——进程间通信的状态 q——queue
读取/写入数据
写入数据
功能
- 往消息队列中写入数据
函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
-
msqid
:msgget()
的返回值(代表消息队列) -
msgp
:万能指针(写入的类型 + 数据)struct msgbuf { long mtype; // 消息类型(>0) // 消息正文,多少字节随你而定 // ... };
-
只要保证首4字节是一个整数(代表类型)就行(64位系统,建议用 long 类型)
-
正文部分是什么数据类型都没关系,因为消息队列传递的是 2 进制数据
-
示例:
struct msgbuf { long type; char name[20]; int age; } msg; struct msgbuf { long type; int start; int end; } msg;
-
-
msgsz
:数据域的字节数(不包含消息类型的大小)sizeof(struct msgbuf) - sizeof(long);
-
msgflg
:0 阻塞 IPC_NOWAIT 不阻塞
消息队列的容量(msgp)
-
消息的大小有一个最大限制,其定义在Linux/msg.h文件中
#define MSGMAX 8192
-
消息结构的总大小不能超过8192字节(包括type的大小)
返回值
- 成功返回0
- 失败返回-1,并设置 errno
读取数据
功能
- 从消息队列中读取数据
函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
-
msqid
:信息队列的ID号 -
msgp
:用来保存读取的信息struct msgbuf { long mtype; // 消息类型(>0) // 消息正文 // ... };
-
msgsz
:数据域的字节数sizeof(struct msgbuf) - sizeof(long);
-
msgtyp
:读取的数据 在消息队列中存储的 类型>0
:得到该种类型的第一个数据==0
:得到整个消息队列的第一个数据<0
:得到 小于或等于$|msgtyp|$类型的 最小类型的 第一个数据
-
msgflg
:0 阻塞 IPC_NOWAIT 不阻塞
返回值
- 成功返回 数据区的字节数
- 失败返回-1,并设置 errno
删除消息队列
功能
- 控制 消息队列
函数原型
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msqid
:消息队列的ID号cmd
:IPC_RMID
:删除消息队列,第三个参数为NULL
返回值
- IPC_RMID 成功返回0,失败返回-1,并设置 errno
实现双向通信
方法一:创建两个消息队列来进行通信
- 不同颜色,代表不同的消息队列
two_msg_way1_1.c
- 父进程中,使用通过 key1 建立的消息队列1,发送信息(数据类型为1)
- 子进程中,使用通过 key2 建立的消息队列2,接受信息(数据类型为2)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
// 消息的缓冲区结构
struct msgmbuf
{
long mtype; // 消息类型
char mtext[10]; // 消息内容
};
int main(void)
{
int ret = -1;
key_t key1, key2;
int smsg_id, rmsg_id; // 收消息与发消息的队列id
struct msgmbuf msg_mbuf; // 创建消息缓冲区
char* msgpath1 = "/usr/bin"; // 消息key1产生所用的路径
char* msgpath2 = "/usr/bin"; // 消息key2产生所用的路径
pid_t pid;
// 获得 key1 和 key2
key1 = ftok(msgpath1, 'b');
key2 = ftok(msgpath2, 'a');
if (key1 != -1 && key2 != -1)
{
printf("成功建立KEY\n");
}
else
{
perror("建立KEY失败");
exit(-1);
}
smsg_id = msgget(key1, IPC_CREAT | 0666); // 使用 key1 建立发消息的消息队列
rmsg_id = msgget(key2, IPC_CREAT | 0666); // 使用 key2 建立收消息的消息队列
if (-1 == smsg_id || -1 == rmsg_id)
{
perror("msgget error");
exit(-1);
}
// 通过fork()创建子进程,主进程进行发消息,子进程进行收消息
pid = fork();
while (1)
{
if (pid > 0) // 主进程
{
msg_mbuf.mtype = 1; // 设置发送的消息类型
scanf("%s", msg_mbuf.mtext); // 用户输入内容
if (strncmp(msg_mbuf.mtext, "end", 3) == 0) // 如果前三个字符为end,则跳出循环
{
break;
}
ret = msgsnd(smsg_id, &msg_mbuf, sizeof(struct msgmbuf) - sizeof(msg_mbuf.mtype),
0); // 发送消息
if (ret == -1)
{
perror("msgsnd error");
exit(-1);
}
}
else // 子进程
{
ret = msgrcv(rmsg_id, &msg_mbuf, sizeof(struct msgmbuf) - sizeof(msg_mbuf.mtype), 2,
0); //接收消息
if (-1 == ret)
{
perror("msgrcv error");
exit(-1);
}
else
{
printf("接收消息成功,长度:%d\n", ret);
printf("content:%s\n\n", msg_mbuf.mtext);
}
}
}
// 注意:两个程序删除的消息队列
ret = msgctl(smsg_id, IPC_RMID, NULL); // 删除发消息的队列
if (-1 == ret)
{
perror("msgctl error");
exit(-1);
}
return 0;
}
two_msg_way1_2.c
- 父进程中,使用通过 key2 建立的消息队列2,发送信息(数据类型为2)
- 子进程中,使用通过 key1 建立的消息队列1,接收信息(数据类型为1)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
// 消息的缓冲区结构
struct msgmbuf
{
long mtype; // 消息类型
char mtext[10]; // 消息内容
};
int main(void)
{
int ret = -1;
key_t key1, key2;
int smsg_id, rmsg_id; // 收消息与发消息的队列id
struct msgmbuf msg_mbuf; // 创建消息缓冲区
char* msgpath1 = "/usr/bin"; // 消息key1产生所用的路径
char* msgpath2 = "/usr/bin"; // 消息key2产生所用的路径
pid_t pid;
// 获得 key1 和 key2
key1 = ftok(msgpath1, 'b');
key2 = ftok(msgpath2, 'a');
if (key1 != -1 && key2 != -1)
{
printf("成功建立KEY\n");
}
else
{
perror("建立KEY失败");
exit(-1);
}
smsg_id = msgget(key2, IPC_CREAT | 0666); // 使用 key2 建立收消息的消息队列
rmsg_id = msgget(key1, IPC_CREAT | 0666); // 使用 key1 建立发消息的消息队列
if (-1 == smsg_id || -1 == rmsg_id)
{
perror("msgget error");
exit(-1);
}
// 通过fork()创建子进程,主进程进行发消息,子进程进行收消息
pid = fork();
while (1)
{
if (pid > 0) // 主进程
{
msg_mbuf.mtype = 2; // 设置发送的消息类型
scanf("%s", msg_mbuf.mtext); // 用户输入内容
if (strncmp(msg_mbuf.mtext, "end", 3) == 0) // 如果前三个字符为end,则跳出循环
{
break;
}
ret = msgsnd(smsg_id, &msg_mbuf, sizeof(struct msgmbuf) - sizeof(msg_mbuf.mtype),
0); // 发送消息
if (ret == -1)
{
perror("msgsnd error");
exit(-1);
}
}
else // 子进程
{
ret = msgrcv(rmsg_id, &msg_mbuf, sizeof(struct msgmbuf) - sizeof(msg_mbuf.mtype), 1,
0); //接收消息
if (-1 == ret)
{
perror("msgrcv error");
exit(-1);
}
else
{
printf("接收消息成功,长度:%d\n", ret);
printf("content:%s\n\n", msg_mbuf.mtext);
}
}
}
ret = msgctl(smsg_id, IPC_RMID, NULL); // 删除发消息的队列
if (-1 == ret)
{
perror("msgctl error");
exit(-1);
}
return 0;
}
方法二:通过创建不同的消息类型来进行双向通信
- 在同一个消息队列中,使用不同的消息类型来标识收发信息
- 图中不同的颜色,表示不同的消息类型