首页 > 其他分享 >system消息队列和posix消息队列

system消息队列和posix消息队列

时间:2024-04-28 21:45:42浏览次数:20  
标签:mqd 队列 system int mq 消息 posix notify

POSIX消息队列

System消息队列

主要函数

头文件#include<sys/msg.h>

  1. int msgget(key_t key, int oflag)
  2. int msgsnd(int msqid, const void * ptr, size_t length, int flag)
  3. ssize_t msgrcv (int msqid, void *ptr, size_t length, long type, int flag)
  4. int msgctl(int msqid, int cmd, struct msqid_ds *buf)

基本用法

  1. 创建或获取消息队列:使用msgget()函数来创建或获取一个消息队列。该函数接受一个键(key)和一个标志(flag)作为参数。如果键的值为IPC_PRIVATE 或当前没有消息队列与给定键相关联,将会创建一个新的消息队列。标志位可以用来指定权限组合。
  2. 往消息队列中放入消息:使用msgsnd()函数来往一个消息队列中放入一个消息。该函数接受四个参数,分别为消息队列标识符、指向消息的指针、消息的大小以及标志位。成功放入消息后,该函数返回0,否则返回-1并设置errno来表示错误原因。
  3. 从消息队列中读取消息:使用 msgrcv()函数来从一个消息队列中读取一个消息。该函数接受五个参数,分别为消息队列标识符、指向消息的指针、消息的最大大小、消息的类型以及标志位。成功读取消息后,该函数返回读取到的消息的大小,否则返回-1并设置errno来表示错误原因。
  4. 控制消息队列:使用msgctl()函数来对一个消息队列进行控制操作,如删除、设置权限等。该函数接受三个参数,分别为消息队列标识符、操作命令以及一个可选的参数。

POSIX消息队列

主要函数

头文件#include<mqueue.h>

  1. mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr attr);
  2. int mq_close(mqd_t mqdes);
  3. int mq_unlink(const char *name);
  4. int mq_getattr(mqd_t mqdes, struct mq_attr *attr); //获取消息队列的属性
  5. int mq_setattr(mqd_t mqdes, struct mq_attr *attr, struct mq_attr *oattr);//设置消息队列的属性
  6. int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
  7. ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);
  8. int mq_notify(mqd_t mqdes, const struct sigevent *notification);

基本用法

同步通知

  1. 创建或打开消息队列:使用mqd_t mq_open(const char *name,int oflag,mode_t mode,struct mq_attr *attr);函数来创建或打开一个消息队列。该函数接受队列名称、打开标志以及可选的权限和属性作为参数。如果队列不存在且指定了创建标志,将会创建一 个新的消息队列。成功创建或打开后,函数返回一个 消息队列描述符(mqd_t)。

    mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) 中oflag和 mode 参数说明

    参数oflag:同int open(const char *pathname, int flags, mode_t mode);函数的的oflag类似有 O_RDONLY、O_RDWR, O_WRONLY,除此之外还有 O_CREAT、O_EXCL(如果 O_CREAT 指定,但 name 不存在,就返回错误),O_NONBLOCK(以非阻塞方式打开消息队列,在正常情况下 mq_receive和mq_send 函数会阻塞的地方,使用该标志打开的消息队列会返回 EAGAIN 错误)。

    参数mode:同int open(const char *pathname, int flags, mode_t mode);函数的mode参数,用于指定权限位, 比如0644权限

  2. 发送消息:使用int mq_send(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned int msg_prio);函数来发送一个消息到指定的消息队列。 该函数接受消息队列描述符、指向消息的指针以及消 息的大小作为参数。发送消息时,可以指定消息的优 先级,较高的优先级数值表示较高的优先级。成功发 送后,函数返回0,否则返回-1并设置errno来表示错 误原因。

  3. 接收消息:使用ssize_t mq_receive(mqd_t mqdes, char *mdg_ptr,size_t msg_len,unsigned int *msg_prio);函数来从指定的消息队列中接收一个消 息。该函数接受消息队列描述符、指向接收缓冲区的 指针以及缓冲区的最大大小作为参数。接收消息时, 可以选择按优先级接收,也可以选择非阻塞接收。成 功接收后,函数返回接收到的消息的大小,否则返 回-1并设置errno来表示错误原因。

  4. 关闭消息队列:使用int mq_close(mqd_t mqdes); 函数来关闭一个已打开的消息队列。该函数接受消息 队列描述符作为参数。关闭消息队列后,相关的资源 将被释放。

  5. 删除消息队列:使用int mq_unlink(const char *name);函数来删除一个已存在的消息队列。该函数 接受队列名称作为参数。删除一个消息队列将会移除 与之关联的所有消息和状态.

    关于 struct mq_attr属性结构体:

    struct mq_attr
     {
       long mq_flags;//阻塞标志, 0(阻塞)或O_NONBLOCK
       long mq_maxmsg;//最大消息数
       long mq_msgsize;//每个消息最大大小
       long mq_curmsgs;//当前消息数
    };
    

    示例1:使用阻塞方式读写

     // 包含所需的头文件  
    #include <pthread.h>  // POSIX线程库  
    #include <stdio.h>    // 标准输入输出库  
    #include <stdlib.h>   // 标准库  
    #include <unistd.h>   // UNIX标准库  
    #include <mqueue.h>   // POSIX消息队列库  
    #include <string.h>   // 字符串处理库  
      
    // 定义消息队列的名称和要发送的消息  
    #define QUEUE_NAME "/test_queue"  // 消息队列的名称  
    #define MESSAGE "Hello, World!"   // 要发送的消息  
    
    // 全局的消息队列描述符  
    mqd_t mq; 
    
    // sender_thread函数:发送线程的主体
    void *sender_thread(void *arg) {    
        char message[] = MESSAGE;  // 创建要发送的消息的副本  
        printf("Sender thread started.\n");  // 打印发送线程开始的消息  
        mq_send(mq, message, strlen(message) + 1, 0);  // 发送消息到消息队列  
        printf("Message sent.\n");  // 打印消息已发送的消息  
        return NULL;  // 返回NULL,表示线程正常结束  
    }  
      
    // receiver_thread函数:接收线程的主体  
    void *receiver_thread(void *arg) {    
        char buffer[256];  // 创建用于接收消息的缓冲区  
        printf("Receiver thread started.\n");  // 打印接收线程开始的消息  
        mq_receive(mq, buffer, sizeof(buffer), NULL);  // 从消息队列接收消息  
        printf("Received message: %s\n", buffer);  // 打印已接收的消息  
        return NULL;  // 返回NULL,表示线程正常结束  
    }  
      
    // main函数:程序的入口点  
    int main() {    
        pthread_t sender, receiver;  // 创建发送和接收线程的标识符  
        struct mq_attr attr;  // 创建消息队列属性结构体变量  
      
        // 设置消息队列的属性值  
        attr.mq_flags = 0;  // 消息队列的标志位设置为0  
        attr.mq_maxmsg = 10;  // 消息队列的最大消息数设置为10  
        attr.mq_msgsize = 256;  // 消息队列的每个消息的最大大小设置为256字节  
        attr.mq_curmsgs = 0;  // 消息队列的当前消息数设置为0  
      
        // 打开或创建名为QUEUE_NAME的消息队列,并设置其属性为attr指定的值  
        mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, &attr);    
        if (mq == (mqd_t)-1) {  // 如果打开或创建失败,则打印错误信息并返回1  
            perror("mq_open");    
            return 1;    
        }  
      
        // 创建发送线程,如果创建失败,则打印错误信息并返回1  
        if (pthread_create(&sender, NULL, sender_thread, NULL) != 0) {    
            perror("pthread_create (sender)");    
            return 1;    
        }  
      
        // 创建接收线程,如果创建失败,则打印错误信息并返回1  
        if (pthread_create(&receiver, NULL, receiver_thread, NULL) != 0) {    
            perror("pthread_create (receiver)");    
            return 1;    
        }  
      
        // 等待发送线程结束,如果发送线程已经结束,则立即返回,否则阻塞等待其结束  
        pthread_join(sender, NULL);    
        // 等待接收线程结束,如果接收线程已经结束,则立即返回,否则阻塞等待其结束  
        pthread_join(receiver, NULL);    
      
        // 关闭已打开的消息队列描述符mq所引用的消息队列,并释放由该描述符占用的所有资源  
        mq_close(mq);    
        // 删除名为QUEUE_NAME的消息队列,并将其从系统中删除,如果成功则返回0,否则返回-1并设置errno以指示错误。
         mq_unlink(QUEUE_NAME);  // 删除消息队列
         return 0;  // 程序正常退出,返回0
    }
    

异步通知

  1. 创建或打开消息队列:使用mqd_t mq_open(const char *name,int oflag,mode_t mode,struct mq_attr *attr);函数来创建或打开一个消息队列。该函数接受 队列名称、打开标志以及可选的权限和属性作为参 数。如果队列不存在且指定了创建标志,将会创建一 个新的消息队列。成功创建或打开后,函数返回一个 消息队列描述符(mqd_t)。

  2. 设置异步通知:使用int mq_notify(mqd_t mqdes,const struct sigevent *notification);函数来注册一个进程以接收异步通知。该函数接受消息队列描述符、一个指向sigevent结构的指针以及一个通知 标志作为参数。在sigevent结构中,可以设置当消息 到达时要发送的信号或者要调用的回调函数。通过设 置用int mq_notify(mqd_t mqdes,const struct sigevent *notification);,当消息队列从空变为非空 时,已注册的进程将收到一个信号或触发一个回调函 数,以异步地通知该进程。

    mq_notify函数的使用注意事项:

    a. 注册撤销:当通知被发送给它的注册进程时,其注册会被撤销。这意味着,如果希望继续接收通知, 进程必须再次调用 mq_notify 以重新注册。

    b. 空队列与数据到来:消息机制触发条件是,在消息队列为空的情况下有数据到来才会触发。当消息队列不为空时,即使有新的数据到来也不会触发通知。

    c. 阻塞与通知:只有当没有任何线程阻塞在该队列的 mq_receive 调用的前提下,通知才会发出。这意味着,如果有线程正在等待接收消息,通知可能不会被发送。

  3. 发送消息:使用int mq_send(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned int msg_prio);函数来发送一个消息到指定的消息队列。 该函数接受消息队列描述符、指向消息的指针以及消 息的大小作为参数。发送消息时,可以指定消息的优 先级,较高的优先级数值表示较高的优先级。成功发 送后,函数返回0,否则返回-1并设置errno来表示错 误原因。

  4. 处理异步通知:当有新消息到达时,已注册的进程 将收到一个信号或触发一个回调函数。在信号处理函数或回调函数中,可以执行相关的操作来处理新到达的消息。例如,可以调用ssize_t mq_receive(mqd_t mqdes, char *mdg_ptr,size_t msg_len,unsigned int *msg_prio);来接收并处理消息。

  5. 关闭消息队列:使用int mq_close(mqd_t mqdes); 函数来关闭一个已打开的消息队列。该函数接受消息 队列描述符作为参数。关闭消息队列后,相关的资源 将被释放。

  6. 删除消息队列:使用int mq_unlink(const char *name);函数来删除一个已存在的消息队列。该函数 接受队列名称作为参数。删除一个消息队列将会移除 与之关联的所有消息和状态.

    1. struct sigevent和sigval_t sigev_val 的定义如下:
    union sigval {            /* Data passed with notification */
        int     sival_int;    /* Integer value */
        void   *sival_ptr;    /* Pointer value */
    };
    struct sigevent {
        int    sigev_notify;  /* Notification method */
        int    sigev_signo;   /* Notification signal */
        union sigval sigev_value;
        /* Data passed with notification */
        void (*sigev_notify_function) (union sigval);
        /* Function used for thread
                                        notification (SIGEV_THREAD) */
        void  *sigev_notify_attributes;
        /* Attributes for notification thread
                                        (SIGEV_THREAD) */
        pid_t  sigev_notify_thread_id;
        /* ID of thread to signal
                                        (SIGEV_THREAD_ID); Linux-specific */
    };
    
    1. sigev_notify取值:
    • ​ SIGEV_NONE:这个值表示不需要任何通知。当sigev_notify被设置为这个值时,即使事件发生了, 也不会有任何通知发送到进程。

    • ​ SIGEV_SIGNAL:事件发生时,将sigev_signo指定的信号发送给指定的进程;

    • ​ SIGEV_THREAD:事件发生时,内核会(在此进程内)以sigev_notify_attributes为线程属性创建一个线程,并让其执行sigev_notify_function,并以sigev_value为其参数

    1. sigev_signo: 在sigev_notify=SIGEV_SIGNAL时使用,指定信号类别, 例如SIGUSR1、SIGUSR2 等
    2. sigev_value: sigev_notify=SIGEV_SIGEV_THREAD时使用,作为sigev_notify_function的参数, 当发送信号时,这个值会传递给信号处理函数。

    示例2:使用mq_notify sigev_notify = SIGEV_THREAD异步通知的方式实现

    #include <mqueue.h>
    #include <pthread.h>
    #include <stdio.h>
    #include <errno.h>
    #include <string.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <signal.h>
    
    #define QUEQUE_NAME "/test_queue"
    #define MESSAGE "mqueque,test!"
    
    void* sender_thread(void* arg)
    {
        // 发送消息
        mqd_t mqd = *(mqd_t*)arg;
        int send_size = -1;
        char message[] = MESSAGE;
        printf("sender thread message=%s, mqd=%d\n", message, mqd);
        send_size = mq_send(mqd, message, strlen(message) + 1, 0);
        if (-1 == send_size)
        {
            if (errno == EAGAIN)
            {
                printf("message queque is full\n");
            }
            else
            {
                perror("mq_send");
            }
        }
        return NULL;
    }
    void notify_thread(union sigval sval)
    {
        // 获取消息队列描述符
        mqd_t mqd = -1;
        mqd = *((mqd_t*)sval.sival_ptr);
        // 定义一个缓冲区,用于存储接收到的消息
        char buffer[256];
        // 定义一个变量,用于存储接收到的消息的大小
        ssize_t bytes_read;
        // 定义一个结构体,用于重新注册消息队列的通知
        struct sigevent sev;
        // 打印提示信息
        printf("notify_thread started, mqd=%d\n", mqd);
        // 循环接收消息,直到队列为空
        while (1)
        {
            // 从消息队列中接收消息
            bytes_read = mq_receive(mqd, buffer, 256, NULL);
            // 如果接收失败,检查错误码
            if (bytes_read == -1)
            {
                // 如果错误码是EAGAIN,说明队列为空,跳出循环
                if (errno == EAGAIN)
                {
                    printf("queue is empty\n");
                    break;
                }
                // 否则,打印错误信息,退出程序
                else
                {
                    perror("mq_receive");
                    exit(1);
                }
            }
            // 如果接收成功,打印接收到的消息的大小和内容
            printf("read %ld bytes: %s\n", (long)bytes_read, buffer);
        }
        // 重新注册消息队列的通知,使用同样的回调函数和参数
        sev.sigev_notify = SIGEV_THREAD;
        sev.sigev_notify_function = notify_thread;
        sev.sigev_notify_attributes = NULL;
        sev.sigev_value.sival_ptr = &mqd;
        if (mq_notify(mqd, &sev) == -1)
        {
            perror("mq_notify");
            exit(1);
        }
    }
    
    int main(int argc, char* argv[])
    {
        pthread_t sender, receiver;
        //创建消息队列
        mqd_t mqd = -1;
        struct mq_attr attr;
        attr.mq_flags = 0;
        attr.mq_maxmsg = 10;
        attr.mq_msgsize = 256;
        attr.mq_curmsgs = 0;
        mqd = mq_open(QUEQUE_NAME, O_CREAT | O_RDWR, 0666, &attr);
        if (mqd == (mqd_t)-1)
        {
            perror("mq_open");
            return -1;
        }
        struct sigevent sev;
        // 注册消息队列的通知,使用线程模式,指定回调函数和参数
        sev.sigev_notify = SIGEV_THREAD;
        sev.sigev_notify_function = notify_thread;
        sev.sigev_notify_attributes = NULL;
        sev.sigev_value.sival_ptr = &mqd;
        if (mq_notify(mqd, &sev) == -1)
        {
            perror("mq_notify");
            exit(1);
        }
        if (pthread_create(&sender, NULL, sender_thread, (void*)&mqd) != 0)
        {
            perror("pthread_create sender");
            return -1;
        }
    
        pthread_join(sender, NULL);
        sleep(5);//等待触发并把消息读走
        //pthread_join(receiver, NULL);
        mq_close(mqd);
        mq_unlink(QUEQUE_NAME);
        //sleep(5);
        return 0;
    }
    

标签:mqd,队列,system,int,mq,消息,posix,notify
From: https://www.cnblogs.com/keep--fighting/p/18164543

相关文章

  • 信息安全管理系统(Information Security Management System,ISMS)
    一、中英文名称中文名称:信息安全管理系统英文名称:InformationSecurityManagementSystem,简称ISMS二、定义信息安全管理系统(ISMS)是一种对信息系统中的数据和信息进行采集、传输、存储、处理和应用的全方位保护和管理的信息系统。它采用一种集中的、系统化的方法,来管理组织的信......
  • 项目管理系统(Project Management System)
    项目管理系统”(ProjectManagementSystem):一、定义项目管理系统是基于现代管理学基础之上的一种新兴的管理学科,是项目的管理者应用专门管理项目的系统软件,在有限的资源约束下,运用系统的观点、方法和理论,对项目涉及的全部工作进行有效地管理。它从项目的投资决策开始到项目结束的......
  • 决策支持系统(Decision Support System,DSS)
    决策支持系统(DecisionSupportSystem,DSS)一。定义决策支持系统(DecisionSupportSystem,DSS)是辅助决策者通过数据、模型和知识,以人机交互方式进行半结构化或非结构化决策的计算机应用系统。它是管理信息系统(MIS)向更高一级发展而产生的先进信息管理系统,为决策者提供分析问题、建立......
  • 商业智能系统(Business Intelligence System,BI)
    商业智能BI系统信息一、中英文名称中文名称:商业智能系统英文名称:BusinessIntelligenceSystem,简称BI系统二、定义商业智能(BI)系统是一种利用现代数据仓库技术、在线分析处理技术、数据挖掘和数据展现技术来为企业提供决策支持的系统。它能够将企业中现有的数据转化为知识,帮助......
  • 知识管理系统(Knowledge Management System,KMS)
    知识管理系统(KnowledgeManagementSystem,KMS):一、定义知识管理系统是收集、处理、分享一个组织的全部知识的信息系统,通常由计算机系统支持。它利用软件系统或其他工具,对组织中大量的有价值的方案、策划、成果、经验等知识进行分类存储和管理,从而积累知识资产,避免流失,并促进知识......
  • 仓库管理系统(Warehouse Management System,WMS)
    仓库管理系统(WarehouseManagementSystem,简称WMS):一、定义仓库管理系统是一种用于管理仓储业务的信息化系统,它通过数学模型和信息手段对仓库管理的各个环节进行优化和调控,实现物流信息化自动化,优化仓库管理流程,提高存储效率,降低管理成本。二、作用仓库管理系统的主要作用包括:......
  • 实验信息管理系统(Laboratory Information Management System,LIMS)
    实验信息管理系统(LaboratoryInformationManagementSystem,简称LIMS):一、定义实验信息管理系统(LIMS)是以数据库为核心的信息化技术与实验室管理需求相结合的信息化管理工具。它将实验室的业务流程、环境、人员、仪器设备、标物标液、化学试剂、标准方法、图书资料、文件记录、科......
  • 财务管理系统(Financial Management System)
    财务管理系统(FinancialManagementSystem)是一个组织或企业用于管理和记录财务信息的软件或信息系统。一、定义财务管理系统旨在协助企业管理其财务运营、监控财务状况、制定财务决策以及遵守相关法规和合规要求。它结合了多个财务管理工具、模块和功能,以提供全面而高效的财务管......
  • Fast Training Algorithms for Deep Convolutional Fuzzy Systems With Application t
    类似深度卷积神经网络DCNN,模糊系统领域有个深度卷积模糊系统deepconvolutionalfuzzysystem(DCFS),每一层都是一个模糊系统,上一层的输出是下一层的输入。这篇论文目的是加速DCFS的计算速度,解决可解释性1990年提出,也用反向传播训练DCFS受困于低维度小数据集,大数据量时计算负担太......
  • SystemVerilog -- 6.4 Interface ~ Clocking Blocks
    SystemVerilogClockingBlocks默认情况下,模块端口和接口不指定信号之间的任何时序要求或同步方案。在clocking和endclocking之间定义的时钟块正是这样做的。它是与特定时钟同步的信号集合,有助于指定时钟和信号之间的定时要求。这将允许测试编写者更多地关注事务,而不是担心信号......