文章目录
前言
本文主要探讨linux下进程间的通信方式之消息队列,最后通过一个简单的示例实现一个消息队列的创建与销毁,发送与接收的功能
1. 消息队列概念
Linux下的消息队列是一种进程间通信(IPC)的机制,它允许不同进程之间通过向消息队列发送和接收消息来实现进程间的数据交互。
消息队列在内核中是以链表的形式组织和管理消息的,每个消息队列都有一个与之对应的链表。当发送一个消息到消息队列时,发送的消息会被构造成一个结构对象并添加到消息队列链表的尾部。同样,当接收一个消息时,也是从链表查找到一个匹配的消息类型,并从链表中删除该消息节点。每一条消息都有自己的消息类型,消息类型用整数来表示且必须大于0,使用ipcs -q 可以查看系统消息队列的使用情况。
注意:消息队列的生命周期并不随进程结束而结束,而是随操作系统的运行而持续存在,我们需要显示地调用接口或者命令才能删除
2. 消息队列的应用场景
消息队列既可以应用于进程间的通信,也可以应用于线程间的数据传递。一般用于处理异步事件,比如有些消息是不需要及时处理的可以先放到消息队列里,等需要的时候再读取出来。需要注意的是,消息队列本身是有一些限制的,每个消息的最大长度限制为MSGMAX,每个消息队列的总的字节数限制为MSGMNB,系统上消息队列的总数上限为MSGMNI。这些限制的具体值取决于特定的操作系统和配置,你可以在 <sys/msg.h> 头文件中查看它们的具体值。
3. 消息队列接口分类
3.1 System V消息队列
优点:提供了底层的控制方式,对于需要更底层控制的传统IPC场景可能更为适用。
缺点:在某些无竞争条件下,其性能可能会稍低于POSIX消息队列,因为它可能会陷入内核。另外,由于它是UNIX特定的,所以在跨平台兼容性方面可能较差。
3.2 POSIX消息队列
优点:提供了更现代化的接口和更灵活的内存管理,通常具有更好的可移植性。此外,POSIX消息队列还支持消息的持久化,即使系统崩溃也不会丢失消息。在性能上,POSIX消息队列在无竞争条件下通常表现较好,因为它不会陷入内核。
缺点:虽然POSIX消息队列在多个方面表现出色,但它也增加了系统的复杂性,需要额外的配置和维护。此外,由于消息是异步传递的,可能会引入一定的延迟,并且在某些情况下可能导致一致性问题,需要特殊处理。
本文我们先来探讨一下System V消息队列
4. 消息队列相关操作函数
接口调用主要涉及到 ftok、msgget、msgsnd、msgrcv和msgctl五个接口
4.1 ftok 函数(获取一个key值)
函数原型 | key_t ftok(const char *pathname, int proj_id); |
功能 | 基于文件路径和proj_id子序号生成一个键值 |
参数 | |
pathname: 文件路径 | |
proj_id: 子序号 | |
返回值 | 成功返回一个键值,失败返回-1并设置errno指明错误的原因 |
4.2 msgget 函数(根据key值获取一个消息队列操作符)
函数原型 | int msgget(key_t key, int msgflg); |
功能 | 创建或获取一个消息队列描述符 |
参数 | |
key: 键值,唯一标识一个消息队列,可取由ftok创建的key值或指定的一个非负整数值 | |
msgflg: 这是一组标志,用于控制 msgget 函数的行为 | |
返回值 | 成功返回一个消息队列操作符,失败返回-1并设置errno指明错误的原因 |
关于msgflg我们先来看一下来自man手册的这段描述:
If msgflg specifies both IPC_CREAT and IPC_EXCL and a message queue already exists for key, then msgget() fails with errno set to EEXIST. (This is analogous to the effect of the combination O_CREAT | O_EXCL for open(2).)
意思就是说如果msgflg同时指定了IPC_CREAT和IPC_EXCL并且指定该key的消息队列已经存在的情况下,函数将返回失败-1并设置errno标识为EEXIST,这类似于open函数O_CREAT | O_EXCL的组合效果。
简而言之就是当指定msgflg为IPC_CREAT | IPC_EXCL时,若消息队列不存在则创建,否则返回失败并设置erron标识为EEXIST。
再来看一下关于消息队列权限位的描述:
Upon creation, the least significant bits of the argument msgflg define the permissions of the message queue.These permission bits have the same format and semantics as the permissions specified for the mode argument of open(2). (The execute permissions are not used.)
意思就是说在创建时,msgflag参数的最低有效位定义了该消息队列的权限。比如说我们可以将这个标志位或上0666,这样表示所有的用户度可以对该消息队列进行读写操作。
因此我们可以设置标志位为IPC_CREAT | IPC_EXCL | 0666,当消息队列不存在则创建,否则返回失败并设置errno标志为EEXIST。
4.3 msgctl 函数(设置消息队列属性)
函数原型 | int msgctl(int msqid, int cmd, struct msqid_ds *buf); |
功能 | 获取或设置消息队列的属性 |
参数 | |
msqid: 由msgget函数返回的消息队列标识符 | |
cmd: 操作指令 | |
buf: 一个指向 msqid_ds 结构的指针。 当 cmd 是 IPC_STAT 时,该结构用于获取消息队列的状态信息 当 cmd 是 IPC_SET 时,该结构用于设置消息队列的属性 当 cmd 是 IPC_RMID 时,设置为NULL表示删除该消息队列 | |
返回值 | 成功返回0,失败返回-1并设置errno指明错误的原因 |
以下是msqid_ds结构体的定义以及具体的含义:
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of creation or last
modification by msgctl() */
unsigned long msg_cbytes; /* # of bytes in queue */
msgqnum_t msg_qnum; /* # number of messages in queue */
msglen_t msg_qbytes; /* Maximum # of bytes in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
关于ipc_perm结构体的定义以及具体含义如下:
struct ipc_perm {
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
4.4 msgsnd 函数(往消息队列写入数据)
函数原型 | int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); |
功能 | 向标识符为msgid的消息队列中写入消息 |
参数 | |
msqid: 消息队列标识符 | |
msgp: msgp参数是一个指针,指向调用者定义的结构 | |
msgsz: 发送消息正文的字节数,注意这里的是指正文内容mtext里面数据的字节数,不含消息类型占用的4个字节 | |
msgflg: 发送消息标志位 | |
返回值 | 成功返回0,失败返回-1并设置errno指明错误的原因 |
msgp参数是一个指针,指向调用者定义的结构体,其一般形式如下:
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
关于msgflag标志位常见有以下几种:
msgflg标志位 | 描述 |
---|---|
0 | 当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列 |
IPC_NOWAIT | 当消息队列已满的时候,msgsnd函数不等待立即返回 |
IPC_NOERROR | 若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程 |
4.5 msgrcv 函数(从消息队列获取数据)
函数原型 | ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); |
功能 | 向标识符为msgid的消息队列中写入消息 |
参数 | |
msqid: 消息队列标识符 | |
msgp: 指针,指向接收数据结构体 | |
msgsz: 接收的正文内容mtext字节大小,不含消息类型占用的4个字节 | |
msgtyp: 要接收的消息队列的消息类型 | |
msgflg:接收消息标志位 | |
返回值 | 成功返回实际读取到的字节数,失败返回-1并设置errno指明错误的原因 |
关于msgflag标志位常见有以下几种:
msgflg标志位 | 描述 |
---|---|
0 | 阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待 |
IPC_NOWAIT | 如果没有返回条件的消息调用立即返回,此时错误码为ENOMSG |
IPC_NOERROR | 如果队列中满足条件的消息内容大于所请求的size字节,则把该消息截断,截断部分将被丢弃 |
4.6 部分常见错误代码:
错误代码 | 描述 |
---|---|
EAGAIN | 参数msgflg设为IPC_NOWAIT,而消息队列已满 |
EIDRM | 标识符为msqid的消息队列已被删除 |
EACCESS | 无权限写入消息队列 |
EFAULT | 参数msgp指向无效的内存地址 |
EINVAL | 无效的参数msqid、msgsz或参数消息类型type小于0 |
E2BIG | 消息数据长度大于msgsz而msgflag没有设置IPC_NOERROR |
ENOMSG | 参数msgflg设为IPC_NOWAIT,而消息队列中无消息可读 |
EINTR | 等待读取队列内的消息情况下被信号中断 |
5. 消息队列应用实例
5.1 System V消息队列应用示例代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MSGSIZE 1024
struct msgbuf
{
long mtype;
char mtext[80];
};
void usage(char *prog_name, char *msg)
{
if (msg != NULL)
fputs(msg, stderr);
fprintf(stderr, "Usage: %s [options]\n", prog_name);
fprintf(stderr, "Options are:\n");
fprintf(stderr, "-s \"msg\" send message using msgsnd()\n");
fprintf(stderr, "-r read message using msgrcv()\n");
fprintf(stderr, "-t message type (default is 1)\n");
fprintf(stderr, "-k message queue key (default is 1234)\n");
fprintf(stderr, "-d keyvalue remove a message queue\n");
exit(EXIT_FAILURE);
}
/*
* init_data_msg 初始化消息队列,设置消息队列大小
*
* return 返回消息的的msg_id
*/
int msg_queue_init(key_t key)
{
int ret, msg_id;
msg_id = msgget(key, IPC_CREAT | IPC_EXCL | 0666);//创建
if (msg_id == -1) {
if (errno == EEXIST) {
printf("Message queue which key: %d is already exit !\n", key);
msg_id = msgget(key, 0);
return msg_id;
} else {
printf("Message queue creation failed: %s(errno: %d)\n", strerror(errno), errno);
return -1;
}
}
// 普通用户会设置失败,可能跟系统的限制或SELinux/AppArmor策略有关,要以root运行才行
struct msqid_ds buf;
memset(&buf, 0, sizeof(struct msqid_ds));
ret = msgctl(msg_id, IPC_STAT, &buf);
if (ret == -1) {
printf("msgctl IPC_STAT failed: %s(errno: %d)\n", strerror(errno), errno);
}
buf.msg_qbytes = 1048576;//1M
ret = msgctl(msg_id, IPC_SET, &buf);
if (ret == -1) {
printf("data msgctl IPC_SET failed: %s(errno: %d)\n", strerror(errno), errno);
}
return msg_id;
}
// 删除指定key值的消息队列
int delete_msg_queue(key_t key) {
int msgid;
// 通过key值获取消息队列ID
msgid = msgget(key, 0);
if (msgid == -1) {
if (errno == ENOENT) {
fprintf(stderr, "Message queue with key %d does not exist.\n", key);
}
return -1;
}
// 删除消息队列
if (msgctl(msgid, IPC_RMID, NULL) == -1) {
perror("msgctl");
return -1;
}
printf("Message queue with key %d deleted successfully.\n", key);
return 0;
}
void send_msg(int qid, int msgtype, const char* mtext)
{
struct msgbuf msg;
time_t t;
msg.mtype = msgtype;
time(&t);
snprintf(msg.mtext, sizeof(msg.mtext), "%s at %s", mtext,
ctime(&t));
if (msgsnd(qid, &msg, sizeof(msg.mtext),
IPC_NOWAIT) == -1) {
perror("msgsnd error");
exit(EXIT_FAILURE);
}
printf("sent: %s\n", msg.mtext);
}
void get_msg(int qid, int msgtype)
{
struct msgbuf msg;
if (msgrcv(qid, &msg, sizeof(msg.mtext), msgtype,
MSG_NOERROR | IPC_NOWAIT) == -1) {
if (errno != ENOMSG) {
perror("msgrcv");
exit(EXIT_FAILURE);
}
printf("No message available for msgrcv()\n");
} else
printf("message received: %s\n", msg.mtext);
}
int main(int argc, char *argv[])
{
int qid, opt;
int mode = 0; /* 1 - send, 2 - receive , 3 - delete*/
int msgtype = 1;
int msgkey = 1234;
char sendbuf[MSGSIZE] = {0};
while ((opt = getopt(argc, argv, "s:rt:k:d:")) != -1) {
switch (opt)
{
case 's':
mode = 1;
memcpy(sendbuf, optarg, strlen(optarg));
break;
case 'r':
mode = 2;
break;
case 't':
msgtype = atoi(optarg);
if (msgtype <= 0)
usage(argv[0], "-t option must be greater than 0\n");
break;
case 'k':
msgkey = atoi(optarg);
break;
case 'd':
mode = 3;
msgkey = atoi(optarg);
break;
default:
usage(argv[0], "Unrecognized option\n");
}
}
if (mode == 0)
usage(argv[0], "must use either -s or -r option\n");
else if (mode == 3)
delete_msg_queue(msgkey);
else {
qid = msg_queue_init(msgkey);
if (mode == 2)
get_msg(qid, msgtype);
else
send_msg(qid, msgtype, sendbuf);
}
return 0;
}
5.2 应用示例讲解:
1.创建一个消息队列并向该消息队列发送消息(默认消息队列key=1234,type=1)
执行指令 ./example -k 56789 -s “hello world !”
可以看到创建了一个key值为56789(换算成十六进制为0x0000ddd5)的消息队列,并其中写入了两条消息
2.从消息队列里读取消息
执行指令 ./example -k 56789 -r
3.删除消息队列
执行指令 ./example -d 56789
标签:IPC,队列,System,int,消息,key,Linux,msg From: https://blog.csdn.net/Mr_Jaychong/article/details/139879338