首页 > 系统相关 >进程间通信-POSIX 消息队列

进程间通信-POSIX 消息队列

时间:2024-03-07 20:13:07浏览次数:30  
标签:attr 队列 间通信 POSIX 消息 mq include 函数

POSIX 消息队列

POSIX 消息队列可以认为是一个消息链表。进程(线程)可以往里写消息,也可以从里面取出消息。可以在不相关的进程之间发送和接收数据。

创建(打开)消息队列-mq_open()函数

mq_open()函数用于打开或创建一个消息队列,该函数定义如下:

#include <mqueue.h>

 mqd_t mq_open(const char *name, int oflag);
 mqd_t mq_open(const char *name, int oflag, mode_t mode,
               struct mq_attr *attr);

参数说明

  • name:消息队列的名称,形式为一个斜杠 "/" 开头的字符串,类似于文件路径。
  • oflag:打开标志,用于指定消息队列的打开方式,可以是以下之一或它们的组合:
    • O_RDONLY:只读方式打开消息队列。
    • O_WRONLY:只写方式打开消息队列。
    • O_RDWR:读写方式打开消息队列。
    • O_CREAT:如果消息队列不存在,则创建它。
    • O_EXCL:与 O_CREAT 同时使用,确保创建一个新的消息队列。
  • mode:用于指定创建的消息队列的操作权限。类似于 chmod 命令中的权限设置。
  • attr:指向结构体 mq_attr 的指针,指定消息队列的属性,如最大消息数、消息大小等。

返回值

  • 如果函数执行成功,返回一个非负整数,表示消息队列的标识符(文件描述符)。
  • 如果发生错误,返回 -1。可以通过 errno 变量来获取具体的错误信息。

注意事项

  • 消息队列的名称在系统中必须是唯一的,不同进程可以通过相同名称打开同一个消息队列。
  • 消息队列的名称需要以斜杠 "/" 开头,必须符合路径名规则(最多由PATH_MAX个字节构成,包括结尾的空字节)。
  • 创建的消息队列以文件形式保存在 /dev/mqueque 目录下。

用法

 1 #include<stdio.h>
 2 #include<fcntl.h>
 3 #include<mqueue.h>
 4 #include<sys/stat.h>
 5 
 6 int main(int argc, char** argv)
 7 {
 8     mqd_t mqd = mq_open("/myqueue", O_RDONLY | O_CREAT | O_EXCL, 0666, NULL);
 9     if(mqd == -1)
10     {
11         perror("mq_open");
12         return -1;
13     }
14 
15     return 0;
16 }

如果代码中使用了 POSIX 消息队列相关代码,在编译时,需要指定额外的库文件,例如:

gcc my_mqueue.c -o a.out -lrt

输出:

$ ./a.out
$ ls -l /dev/mqueue/
总用量 0
-rw-rw-r-- 1 test test 80 12月  21 10:04 myqueue

关闭消息队列-mq_close()函数

mq_close()函数用于关闭一个已经打开的消息队列描述符,该函数定义如下:

#include <mqueue.h>

int mq_close(mqd_t mqdes);

参数说明

  • mqdes:待关闭的消息队列描述符。

返回值

  • 如果函数执行成功,返回0。
  • 如果发生错误,返回 -1。可以通过 errno 变量来获取具体的错误信息。

删除消息队列-mq_unlink()函数

mq_unlink()函数用于删除已经存在的消息队列,即删除消息队列的名称。该函数定义如下:

#include <mqueue.h>

int mq_unlink(const char *name);

参数说明

  • mqdes:要删除的消息队列的名称。

返回值

  • 如果函数执行成功,返回0。
  • 如果发生错误,返回 -1。可以通过 errno 变量来获取具体的错误信息。

注意事项

  • 如果在调用 mq_unlink() 函数之前,没有其他进程打开该消息队列,那么该消息队列将被立即删除,并且所有之前打开的描述符将成为无效。
  • 如果有其他进程打开了该消息队列,但没有对该消息队列进行写入或读取操作,那么调用 mq_unlink() 函数后,对该消息队列的访问将立即被阻止,但消息队列本身将保持存在。
  • 如果有其他进程当前正在使用该消息队列(执行了读写操作),删除操作将会延迟到所有打开的描述符都关闭或相应进程终止时才会生效。

消息队列的属性

struct mq_attr结构体

struct mq_attr结构体用于描述 POSIX 消息队列属性,包含如下成员:

struct mq_attr {
    long mq_flags;
    long mq_maxmsg;
    long mq_msgsize;
    long mq_curmsgs;
};

各个成员的含义如下:

  • mq_flags:标志位,取值为0 或者 O_NONBLOCK(非阻塞)
  • mq_maxmsg: 是消息队列中允许的最大消息数,默认是10。
  • mq_msgsize:消息队列中消息的最大大小,单位是字节,默认是8192字节。
  • mq_curmsgs:输出参数,当前队列中的消息数。

查看消息队列默认值:

cat /proc/sys/fs/mqueue/msg_max     #查看消息队列的消息最大长度
cat /proc/sys/fs/mqueue/msgsize_max #查看消息队列的消息最大个数

在消息队列的四个属性中:

  • mq_curmsgs只能获取不能设置。
  • mq_flags只能通过 mq_setattr() 函数设置,该函数的唯一作用就是设置或清除非阻塞标志。
  • mq_maxmsg 和 mq_msgsize 只能在创建新队列时由 mq_open() 函数的 attr 参数设置。
  • mq_maxmsg 和 mq_msgsize 必须同时指定,否则 mq_open() 函数创建新队列会失败。
  • O_NONBLOCK 标识不能通过 mq_flags 指定,可以通过 mq_open() 函数 oflag 参数指定,如果想要移除,通过 mq_setattr() 函数设置。

获取消息队列属性-mq_getattr()函数

mq_getattr()函数用于获取消息队列的属性信息,该函数定义如下:

#include <mqueue.h>

int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat); 

参数说明

  • mqdes:要获取属性的消息队列描述符。
  • mqstat:指向 struct mq_attr 结构的指针,用于存储获取的属性信息。

返回值

  • 如果函数执行成功,返回0。
  • 如果发生错误,返回 -1。可以通过 errno 变量来获取具体的错误信息。

设置消息队列属性-mq_setattr()函数

mq_setattr()函数用于设置消息队列的属性,该函数定义如下:

#include <mqueue.h>

int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, 
               struct mq_attr *omqstat); 

参数说明

  • mqdes:要设置属性的消息队列描述符。
  • mqstat:指向 struct mq_attr 结构的指针,指向新的属性信息。
  • omqstat:指向 struct mq_attr 结构的指针,保存原先的属性信息。

返回值

  • 如果函数执行成功,返回0。
  • 如果发生错误,返回 -1。可以通过 errno 变量来获取具体的错误信息。

消息发送与接收

消息发送-mq_send()函数

mq_send() 函数用于向消息队列发送消息,该函数定义如下:

#include <mqueue.h>

int mq_send(mqd_t mqdes, const char *msg_ptr, 
                size_t msg_len, unsigned int msg_prio);           

参数说明

  • mqdes:消息队列描述符。
  • msg_ptr:指向要发送的消息的指针。
  • msg_len:要发送的消息的长度,不能超过消息队列的最大长度。
  • msg_prio:消息的优先级, 数值越大,优先级越高。优先级低的消息会排在优先级高的消息之后。

返回值

  • 如果函数执行成功,返回0。
  • 如果发生错误,返回 -1。可以通过 errno 变量来获取具体的错误信息。
  • 如果消息队列已满,mq_send() 函数会阻塞直到有空间可用。
  • 如果设置了非阻塞标志,此时会立即返回,错误码为 EAGAIN。

消息接收-mq_receive函数

mq_receive() 函数用于从消息队列接收消息。该函数定义如下:

#include <mqueue.h>

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, 
                         size_t msg_len, unsigned int *msg_prio);

参数说明

  • mqdes:消息队列描述符。
  • msg_ptr:指向接收消息的缓冲区的指针,用于存储接收到的消息。
  • msg_len:接收消息的最大长度,必须大于等于消息的实际长度。
  • msg_prio:用于存储接收到的消息的优先级。

返回值

  • 如果函数执行成功,返回接收到的消息的长度(即实际读取的字节数)。
  • 如果发生错误,返回 -1。可以通过 errno 变量来获取具体的错误信息。
  • 如果消息队列为空,mq_receive() 函数会阻塞直到有消息可用。
  • 如果设置了非阻塞标志,此时会立即返回,错误码为 EAGAIN。

示例

进程A负责读取:

 1 #include<stdio.h>
 2 #include<sys/stat.h>
 3 #include<mqueue.h>
 4 #include<fcntl.h>
 5 #include<errno.h>
 6 #include<unistd.h>
 7 #include<stdlib.h>
 8 
 9 int main(int argc, char** argv)
10 {
11     mqd_t mqd = mq_open("/my_mqueue", O_RDONLY);
12     if(mqd == -1)
13     {
14         perror("mq_open");
15         return -1;
16     }
17 
18     struct mq_attr attr;
19     if(mq_getattr(mqd, &attr) == -1)
20     {
21         perror("mq_getattr");
22         return -1;
23     }
24     
25     char* pBuf = (char*)malloc(attr.mq_msgsize);
26     if(pBuf == NULL)
27     {
28         perror("malloc");
29         return -1;
30     }
31 
32     int prio;
33     while(1)
34     {
35         int ret = mq_receive(mqd, pBuf,attr.mq_msgsize, &prio);
36         if(ret == -1)
37         {
38             if(errno == EAGAIN)
39             {
40                 printf("no block, contimue to read\n");
41                 continue;
42             }
43             perror("mq_receive");
44             return -1;
45         }
46         pBuf[ret] = '\0';
47         printf("read data : %s, prio : %d\n", pBuf, prio);
48         sleep(1);
49     }
50 
51     free(pBuf);
52     return 0;
53 }

进程B负责写入:

 1 #include<stdio.h>
 2 #include<stdlib.h>
 3 #include<string.h>
 4 #include<unistd.h>
 5 #include<sys/stat.h>
 6 #include<errno.h>
 7 #include<mqueue.h>
 8 
 9 int main(int argc, char** argv)
10 {
11     struct mq_attr attr0;
12     //attr0.mq_flags = O_NONBLOCK;      //在此处指定无效
13     attr0.mq_maxmsg = 5;
14     attr0.mq_msgsize = 4096;
15     mqd_t mqd;
16 
17 AGAIN:
18     mqd = mq_open("/my_mqueue", O_WRONLY | O_CREAT | O_EXCL /*| O_NONBLOCK*/, 0666, &attr0);
19     if(mqd == -1)
20     {
21         if(errno == EEXIST)
22         {
23             mq_unlink("/my_mqueue");
24             goto AGAIN;
25         }
26         perror("mq_open");
27         return -1;
28     }
29 
30     struct mq_attr attr;
31     if(mq_getattr(mqd, &attr) == -1)
32     {
33         perror("mq_getattr");
34         return -1;
35     }
36 
37     char* pBuf = (char*)malloc(attr.mq_msgsize);
38     if(pBuf == NULL)
39     {
40         perror("malloc");
41         return -1;
42     }
43 
44 /*  
45     attr.mq_flags = 0;    //取消非阻塞
46     if(mq_setattr(mqd, &attr, NULL) == -1)
47     {
48         perror("mq_setattr");
49         return -1;
50     }
51 */
52     int i = 0;
53     while(1)
54     {
55         sprintf(pBuf, "Data%d", i++);
56         int ret = mq_send(mqd, pBuf, strlen(pBuf) + 1, 10);
57         if(ret == -1)
58         {
59             if(errno == EAGAIN)
60             {
61                 printf("no block, continue to write\n");
62                 continue;
63             }
64             perror("mq_send");
65             return -1;
66         }
67         printf("send data : %s\n", pBuf);
68         sleep(1);
69     }
70 
71     free(pBuf);
72     return 0;
73 }

标签:attr,队列,间通信,POSIX,消息,mq,include,函数
From: https://www.cnblogs.com/BroccoliFighter/p/18059650

相关文章

  • 进程间通信-POSIX 共享内存
    POSIX共享内存POSIX共享内存是一种在Linux系统上使用的共享内存机制,它允许多个进程可以访问同一个内存区域,从而实现进程间的数据共享。共享内存是可用IPC机制中最快的,使用共享内存不必频繁拷贝数据。但也需要注意,由于共享内存段中的数据可以被多个进程同时访问,因此需要在程序......
  • 第三节:队列相关(滑动窗口最大值、)
    一.        二.        三.         !作       者:Yaopengfei(姚鹏飞)博客地址:http://www.cnblogs.com/yaopengfei/声     明1:如有错误,欢迎讨论,请勿谩骂^_^。声     明2:原创博客请在转载......
  • Unity3D 渲染队列 ZTest与ZWrite详解
    在Unity3D中,渲染队列(RenderingQueue)是一个非常重要的概念,它决定了游戏中各个物体的渲染顺序和优先级。而在渲染队列中,ZTest和ZWrite又是两个关键的参数,它们决定了物体在渲染的过程中如何处理深度测试和深度写入。本文将详细介绍Unity3D中的渲染队列、ZTest和ZWrite的概念,并给出相......
  • 云消息队列 Confluent 版正式上线!
    作者:阿里云消息队列前言在2023年杭州云栖大会上,Confluent成为阿里云技术合作伙伴,在此基础上,双方展开了深度合作,并在今天(3月1日)正式上线“云消息队列Confluent版”。通过将 Confluent在ApacheKafka领域的专业技术及实战经验与阿里云强大的云基础设施及服务体系相结合,......
  • 第三节:队列相关(滑动窗口最大值、)
    一.        二.        三.         !作       者:Yaopengfei(姚鹏飞)博客地址:http://www.cnblogs.com/yaopengfei/声     明1:如有错误,欢迎讨论,请勿谩骂^_^。声     明2:原创博客请在转载......
  • 并发队列
    并发队列目录并发队列为什么要使用队列并发队列简介阻塞队列BlockQueue什么是阻塞队列BlockingQueue主要方法ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueueSynchronousQueue注意点DelayQueue非阻塞队列ConcurrentLinkedQueue如何选择自己的队列......
  • (34/60)柠檬水找零、根据身高重建队列、用最少数量的箭引爆气球
    柠檬水找零leetcode:860.柠檬水找零贪心法思路遍历一遍数组,只关注面值5、10的钞票的数量每轮判断:如果是5,five++;如果是10,判断还有没有5,有的话five--;如果是20,检查有没有一张10、一张5,ten--,five--。或者三张5,five-=3。贪心:先消耗面值10的钞票,因为它更万能。复杂度分析时间......
  • 多线程系列(十三) -一文带你搞懂阻塞队列
    一、摘要在之前的文章中,我们介绍了生产者和消费者模型的最基本实现思路,相信大家对它已经有一个初步的认识。在Java的并发包里面还有一个非常重要的接口:BlockingQueue。BlockingQueue是一个阻塞队列,更为准确的解释是:BlockingQueue是一个基于阻塞机制实现的线程安全的队列。通......
  • laravel8 + redis 队列
      执行命令生成job: phpartisanmake:job自定义名称修改queue.php配置文件'redis'=>['driver'=>'redis','connection'=>'queue',【databases.php中单独配置一个redis的链接名为queue】'queue'=>en......
  • Unity3D 渲染队列 ZTest与ZWrite详解
    在Unity3D中,渲染队列(RenderingQueue)是一个非常重要的概念,它决定了游戏中各个物体的渲染顺序和优先级。而在渲染队列中,ZTest和ZWrite又是两个关键的参数,它们决定了物体在渲染的过程中如何处理深度测试和深度写入。本文将详细介绍Unity3D中的渲染队列、ZTest和ZWrite的概念,并给出相......