首页 > 其他分享 >消息队列非阻塞

消息队列非阻塞

时间:2023-02-17 15:34:42浏览次数:31  
标签:IPC 队列 msgid 阻塞 消息 msg include

消息队列(也叫做报文队列)能够克服早期unix通信机制的一些缺点。作为早期unix通信机制之一的信号能够传送的信息量有限,后来虽然POSIX 1003.1b在信号的实时性方面作了拓广,使得信号在传递信息量方面有了相当程度的改进,但是信号这种通信方式更像"即时"的通信方式,它要求接受信号的进程在某个时间范围内对信号做出反应,因此该信号最多在接受信号进程的生命周期内才有意义,信号所传递的信息是接近于随进程持续的概念(process-persistent),管道及有名管道及有名管道则是典型的随进程持续IPC,并且,只能传送无格式的字节流无疑会给应用程序开发带来不便,另外,它的缓冲区大小也受到限制。

 

详见: http://www.ibm.com/developerworks/cn/linux/l-ipc/part3/

 

  1. /* 
  2.  * 
  3.  *       Filename:  producer.c 
  4.  * 
  5.  *    Description:  生产者进程 
  6.  * 
  7.  *        Version:  1.0 
  8.  *        Created:  09/30/2011 04:52:23 PM 
  9.  *       Revision:  none 
  10.  *       Compiler:  gcc(g++) 
  11.  * 
  12.  *         Author:  |Zhenghe Zhang|, |[email protected]
  13.  *        Company:  |Shenzhen XXX Technology Co., Ltd.| 
  14.  * 
  15.  */  
  16.   
  17. #include <stdio.h>  
  18. #include <stdlib.h>  
  19. #include <string.h>  
  20. #include <unistd.h>  
  21. #include <sys/ipc.h>  
  22. #include <sys/msg.h>  
  23. #include <error.h>  
  24.   
  25. #define BUFFER      10 //定义buf大小  
  26.   
  27. struct msgtype {  
  28.     long mtype;  
  29.     char buf1[BUFFER + 1];  
  30.     char buf2[BUFFER + 1];  
  31.     long size;  
  32. };  
  33.   
  34. int main()  
  35. {  
  36.     key_t msgkey;  
  37.   
  38.     /*消息队列*/  
  39.     int msgid;  
  40.     struct msgtype msg;  
  41.   
  42.     msgkey = ftok("/home/zhang/shmipcx", 10001);  
  43.     if(msgkey == -1)  
  44.     {  
  45.         perror("ftok");  
  46.         exit(1);  
  47.     }  
  48.   
  49.     /*得到消息队列标识符或创建一个消息队列对象并返回消息队列标识符*/  
  50.     msgid = msgget(msgkey, IPC_CREAT | 0666);  
  51.     if(msgid == -1)  
  52.     {  
  53.         perror("msgget");  
  54.         exit(1);  
  55.     }  
  56.   
  57.     int i = 0;  
  58.     while(i < 10)  
  59.     {  
  60.         memset(msg.buf1, 0, BUFFER + 1);  
  61.         memset(msg.buf2, 0, BUFFER + 1);  
  62.   
  63.         sprintf(msg.buf1, "buf1_0x%x", i);         
  64.         sprintf(msg.buf2, "buf2_0x%x", i + 'a');  
  65.          
  66.         msg.mtype = 1001;  
  67.         msg.size  = i;  
  68.   
  69.         printf("msg.mtype = %ld, msg.size = %ld\t", msg.mtype, msg.size);  
  70.         printf("msg.buf1 = %s, msg.buf2 = %s\n", msg.buf1, msg.buf2);  
  71.   
  72.         /*将msgp消息写入到标识符为msqid的消息队列*/  
  73.         /*msgsz, 要发送消息的大小,不含消息类型占用的4个字节,即mtext的长度*/  
  74.         /*msgflg 0:当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列*/  
  75.         /*msgflg IPC_NOWAIT:当消息队列已满的时候,msgsnd函数不等待立即返回*/  
  76.         /*msgflg IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程。*/          
  77.         if(msgsnd(msgid, &msg, sizeof(struct msgtype) - sizeof(long), 0) == -1)  
  78.         {  
  79.             perror("msgsnd");  
  80.             exit(1);  
  81.         }  
  82.   
  83.         i++;  
  84.         sleep(1);     
  85.     }  
  86.   
  87.     sleep(30); //等待消费者进程退出  
  88.   
  89.     /*获取和设置消息队列的属性*/  
  90.     /*cmd IPC_STAT:获得msgid的消息队列头数据到buf中*/  
  91.     /*cmd IPC_RMID:删除消息队列*/  
  92.     if(msgctl(msgid, IPC_RMID, 0) == -1)  
  93.     {  
  94.         perror("msgctl");  
  95.         exit(1);  
  96.     }  
  97.   
  98.     return 0;  
  99. }  
  100.   
  101. /* 
  102.  * 
  103.  *       Filename:  consumer.c 
  104.  * 
  105.  *    Description:  消费者进程 
  106.  * 
  107.  *        Version:  1.0 
  108.  *        Created:  09/30/2011 04:52:23 PM 
  109.  *       Revision:  none 
  110.  *       Compiler:  gcc(g++) 
  111.  * 
  112.  *         Author:  |Zhenghe Zhang|, |[email protected]
  113.  *        Company:  |Shenzhen XXX Technology Co., Ltd.| 
  114.  * 
  115.  */  
  116.   
  117. #include <stdio.h>  
  118. #include <stdlib.h>  
  119. #include <string.h>  
  120. #include <unistd.h>  
  121. #include <sys/ipc.h>  
  122. #include <sys/msg.h>  
  123. #include <error.h>  
  124.   
  125. #define BUFFER      10 //定义buf大小  
  126.   
  127. struct msgtype {  
  128.     long mtype;  
  129.     char buf1[BUFFER + 1];  
  130.     char buf2[BUFFER + 1];  
  131.     long size;  
  132. };  
  133.   
  134. int main()  
  135. {  
  136.     key_t msgkey;    
  137.   
  138.     /*消息队列*/  
  139.     int msgid;  
  140.     struct msgtype msg;  
  141.   
  142.   
  143.     msgkey = ftok("/home/zhang/shmipcx", 10001);  
  144.     if(msgkey == -1)  
  145.     {  
  146.         perror("ftok");  
  147.         exit(1);  
  148.     }  
  149.   
  150.     msgid = msgget(msgkey, IPC_EXCL | 0666);  
  151.     if(msgid == -1)  
  152.     {  
  153.         perror("msgget");  
  154.         exit(1);  
  155.     }  
  156.    
  157.     int i = 0;  
  158.   
  159.     while(i < 10) //运行一个consumenr,为 10 ,同时运行两个consumer进程,为 5    
  160.     {  
  161.         memset(msg.buf1, 0, BUFFER + 1);  
  162.         memset(msg.buf2, 0, BUFFER + 1);  
  163.         msg.mtype = 1001;  
  164.         msg.size  = -1;  
  165.    
  166.         /*从标识符为msqid的消息队列读取消息并存于msgp中,读取后把此消息从消息队列中删除*/  
  167.         /*msgp 存放消息的结构体,结构体类型要与msgsnd函数发送的类型相同*/  
  168.         /*msgsz 要接收消息的大小,不含消息类型占用的4个字节*/  
  169.         /*msgtyp 0 接收第一个消息*/  
  170.         /*msgtyp <0 接收类型等于或者小于msgtyp绝对值的第一个消息*/  
  171.         /*msgtyp >0 接收类型等于msgtyp的第一个消息*/  
  172.         /*msgflg 0 阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待*/  
  173.         /*msgflg IPC_NOWAIT 如果没有返回条件的消息调用立即返回,此时错误码为ENOMSG*/  
  174.         /*msgflg IPC_EXCEPT 与msgtype配合使用返回队列中第一个类型不为msgtype的消息*/  
  175.         /*msgflg IPC_NOERROR 如果队列中满足条件的消息内容大于所请求的size字节,则把该消息截断,截断部分将被丢弃*/  
  176.         if(msgrcv(msgid, &msg, sizeof(struct msgtype) - sizeof(long), msg.mtype, 0) == -1)  
  177.         {  
  178.             perror("msgrcv");  
  179.             exit(1);  
  180.         }  
  181.   
  182.         printf("msg.mtype = %ld, msg.size = %ld\t", msg.mtype, msg.size);  
  183.         printf("msg.buf1 = %s, msg.buf2 = %s\n", msg.buf1, msg.buf2);  
  184.   
  185.         i++;  
  186.         sleep(2);  
  187.     }  
  188.   
  189.     return 0;  
  190. }  

标签:IPC,队列,msgid,阻塞,消息,msg,include
From: https://www.cnblogs.com/kn-zheng/p/17130323.html

相关文章

  • 单调队列
    单调队列是什么呢?可以直接从问题开始来展开。Poj2823给定一个数列,从左至右输出每个长度为m的数列段内的最小数和最大数。数列长度:N<=106,m<=N暴力解很直观的一种解法,那就......
  • 堆-- 神奇的优先队列
    堆是什么?是一种特殊的完全二叉树,就像树一样。有没有发现这棵二叉树有一个特点?就是所有的父节点都比子节点小(PS:就是圆圈的数值,圆圈外面的编号是这个节点的编号,)那么符合这样......
  • Java: RocketMQ事务消息的优雅使用
    背景在项目中,技术方案需要使用事务消息来保证最终一致性达到实现业务的目的。但在一个服务中有多个业务需要使用事务消息发送不同的消息类型到不同的Topic时,RocketMQ的本......
  • 切换窗体或消息控制窗体的延时问题
    应用场景如:ShowWindow+SetForegroundWindow,用来切换窗体,有时切换失败对其他窗体SendMessageWM_KEYDOWN或其他消息控制动作,有时逻辑异常问题原因:其他窗......
  • 小程序消息推送之模板订阅
    1、如下所示,订阅自己所需要的模板  2、订阅成功 ......
  • 小程序之消息推送(node.js)
    1、打开微信公众平台,登录小程序后台配置界面,开发管理>消息推送(开启)>配置(注意:这里我使用的是https,明文传输,JSON格式,如果是http,建议使用加密或者兼容模式,那么在这个url接......
  • 【LeetCode栈与队列#06】前K个高频元素,以及pair、priority_queue的使用
    前K个高频元素力扣题目链接(opensnewwindow)给定一个非空的整数数组,返回其中出现频率前k高的元素。示例1:输入:nums=[1,1,1,2,2,3],k=2输出:[1,2]示......
  • 开发微信订阅消息推送
    1,微信基础配置WxContanst.java:1packagecom.pgo.modules.inv.subscribe;23publicclassWxContanst{45privatestaticStringAPPID="微信公众平台......
  • 【开源需求】C++多线程消息分发架构
    项目【gi_messager】开源项目:https://girakoo.com/联系方式:[email protected]需求简述在多线程环境中,为每个线程提供独立的消息队列MessageLoop。注:主线程默认自动......
  • 精确延时---多线程延时阻塞精度asm("nop") nanosleep usleep sleep select
    Linux平台延时之sleep、usleep、nanosleep、select比较 1、sleep的精度是秒2、usleep的精度是微妙,不精确3、select的精度是微妙,精确struct timevaldelay;delay.tv_......