Linux POSIX消息队列遇到的问题和使用方法

发布于:2024-05-04 ⋅ 阅读:(32) ⋅ 点赞:(0)

一、开发环境及消息队列介绍

开发板:nuc980

1.ARM Linux中消息队列的原理
  在ARM Linux中,消息队列是通过POSIX(Portable Operating System Interface)标准实现的。消息队列在内核中维护,通过文件系统(如/dev/mqueue)或系统调用来访问。
消息队列具有以下几个主要特点
异步通信:发送进程和接收进程不需要同时处于活动状态。发送进程可以将消息放入队列,然后继续进行其他操作;接收进程可以在需要时从队列中取出消息。
消息独立性:队列中的每个消息都是独立的,拥有自己的属性和内容。
顺序性:消息队列保证消息按照发送的顺序被接收。

2.在ARM Linux中,使用消息队列通常涉及以下步骤
定义消息队列和消息结构体:需要定义消息队列的名字和消息的结构体。
创建或打开消息队列:使用mq_open函数创建或打开一个已存在的消息队列。该函数返回一个消息队列描述符,用于后续的读写操作。
发送消息:使用mq_send函数将消息发送到消息队列。该函数需要指定消息队列描述符、要发送的消息以及消息的属性(如优先级)。
接收消息:使用mq_receive函数从消息队列中接收消息。该函数需要指定消息队列描述符以及用于存储接收到的消息的缓冲区。
关闭消息队列:使用mq_close函数关闭已打开的消息队列描述符。
删除消息队列:如果不再需要消息队列,可以使用mq_unlink函数将其从文件系统中删除。
ARM Linux中消息队列的应用

3.消息队列在ARM Linux嵌入式系统中具有广泛的应用场景
多任务协同:在嵌入式系统中,经常需要多个任务协同工作。消息队列提供了一种方便的任务间通信方式,使得任务之间可以相互传递数据和控制信息。
数据采集与处理:在数据采集系统中,消息队列可以用于存储从传感器或其他数据源获取的数据。处理任务可以从队列中取出数据进行处理,实现数据的实时处理和分析。
实时通信:在需要实时通信的嵌入式系统中,消息队列可以确保消息的可靠传输和顺序接收。这对于需要保证数据一致性和可靠性的应用至关重要。

二、问题描述

使用了Linux POSIX消息队列用于线程间通信,启动程序报错-mq_open error: Function not implemented

mq_open error: Function not implemented

三、解决办法

内核开启POSIX消息队列的支持
在这里插入图片描述

四、测试代码

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <fcntl.h>  
#include <sys/stat.h>  
#include <mqueue.h>  
#include <pthread.h>  
#include <errno.h>
  
#define QUEUE_NAME "/my_mq"  
#define MSG_MAX_SIZE 256  
  
struct mq_msg {  
    char text[MSG_MAX_SIZE];  
};  
  
// 发送线程函数  
void *sender_thread(void *arg) {  
    mqd_t mqdes;  
    //struct mq_msg msg;  
    char buf[]="Hello from Sender!";
  
    mqdes = mq_open(QUEUE_NAME, O_WRONLY);  
    if (mqdes == (mqd_t)-1) {  
        perror("mq_open");  
        exit(EXIT_FAILURE);  
    }  
  
    //strcpy(msg.text, "Hello from Sender!");  
    //if (mq_send(mqdes, (const char *)&msg, sizeof(struct mq_msg), 0) == -1) 
    if (mq_send(mqdes, buf, sizeof(buf), 0) == -1) 
    {  
        perror("mq_send");  
        exit(EXIT_FAILURE);  
    }  
  
    mq_close(mqdes);  
    return NULL;  
}  
  
// 接收线程函数  
void *receiver_thread(void *arg) {  
    mqd_t mqdes;  
    struct mq_msg msg;  
    ssize_t bytes_read;  
  
    mqdes = mq_open(QUEUE_NAME, O_RDONLY);  
    if (mqdes == (mqd_t)-1) {  
        perror("mq_open");  
        exit(EXIT_FAILURE);  
    }  
  
    //bytes_read = mq_receive(mqdes, (char *)&msg, sizeof(struct mq_msg), NULL);  
    bytes_read = mq_receive(mqdes, &msg.text[0], sizeof(struct mq_msg), NULL);  
    if (bytes_read == -1) {  
        perror("mq_receive");  
        exit(EXIT_FAILURE);  
    }  
  
    printf("Received message: %s\n", msg.text);  
  
    mq_close(mqdes);  
    return NULL;  
}  
  
int main() {  
    pthread_t sender_tid, receiver_tid;  
    mqd_t mqdes;  
    struct mq_attr attr;  
    int ret;
  
    // 创建消息队列(如果需要)  
    mqdes = mq_open(QUEUE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666, NULL);  
    if (mqdes != (mqd_t)-1) {  
        ret = mq_getattr(mqdes, &attr);
        if(ret < 0) {
            perror("mq_getattr");
            return (-1);
        }
        // 设置队列属性(可选) 
        attr.mq_flags = 0; 
        attr.mq_maxmsg = 10;  
        attr.mq_msgsize = sizeof(struct mq_msg);  
        attr.mq_curmsgs = 0;  
        if (mq_setattr(mqdes, &attr, NULL) == -1) {  
            perror("mq_setattr");  
            exit(EXIT_FAILURE);  
        }  
        mq_close(mqdes); // 创建后关闭,因为将在线程中重新打开  
    } else if (errno != EEXIST) {  
        perror("mq_open");  
        exit(EXIT_FAILURE);  
    }  
  
    // 创建发送者和接收者线程  

    // 创建接收者线程(注意:在实际应用中,你可能希望先启动接收者线程)  
    if (pthread_create(&receiver_tid, NULL, receiver_thread, NULL)) {  
        perror("pthread_create receiver");  
        return 1;  
    }  

    if (pthread_create(&sender_tid, NULL, sender_thread, NULL)) {  
        perror("pthread_create sender");  
        exit(EXIT_FAILURE);  
    }
  
    // 等待线程完成  
    pthread_join(sender_tid, NULL);  
    pthread_join(receiver_tid, NULL);  
  
    // 在实际应用中,你可能还需要处理消息的清理和队列的删除  
  
    return 0;  
}

带互斥量的测试程序,如下:

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <fcntl.h>  
#include <sys/stat.h>  
#include <mqueue.h>  
#include <pthread.h>  
  
#define QUEUE_NAME "/my_mq"  
#define MSG_MAX_SIZE 256  
  
struct mq_msg {  
    char text[MSG_MAX_SIZE];  
};  
  
// 全局变量:消息队列描述符和互斥锁  
mqd_t mqdes;  
pthread_mutex_t mq_mutex;  
  
// 发送消息到队列的函数  
void send_message(const char *msg) {  
    struct mq_msg mq_msg;  
    strncpy(mq_msg.text, msg, MSG_MAX_SIZE);  
  
    pthread_mutex_lock(&mq_mutex); // 锁定互斥锁  
    if (mq_send(mqdes, (const char *)&mq_msg, sizeof(struct mq_msg), 0) == -1) {  
        perror("mq_send");  
        exit(EXIT_FAILURE);  
    }  
    pthread_mutex_unlock(&mq_mutex); // 解锁互斥锁  
}  
  
// 从队列接收消息的函数  
void receive_message() {  
    struct mq_msg mq_msg;  
    ssize_t bytes_read;  
  
    pthread_mutex_lock(&mq_mutex); // 锁定互斥锁  
    bytes_read = mq_receive(mqdes, (char *)&mq_msg, sizeof(struct mq_msg), NULL);  
    if (bytes_read == -1) {  
        perror("mq_receive");  
        exit(EXIT_FAILURE);  
    }  
    printf("Received message: %s\n", mq_msg.text);  
    pthread_mutex_unlock(&mq_mutex); // 解锁互斥锁  
}  
  
// 发送者线程函数  
void *sender_thread(void *arg) {  
    send_message("Hello from Sender!");  
    return NULL;  
}  
  
// 接收者线程函数  
void *receiver_thread(void *arg) {  
    receive_message();  
    return NULL;  
}  
  
int main() {  
    pthread_t sender_tid, receiver_tid;  
  
    // 初始化互斥锁  
    if (pthread_mutex_init(&mq_mutex, NULL) != 0) {  
        perror("pthread_mutex_init");  
        exit(EXIT_FAILURE);  
    }  
  
    // 打开或创建消息队列  
    mqdes = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0666, NULL);  
    if (mqdes == (mqd_t)-1) {  
        perror("mq_open");  
        exit(EXIT_FAILURE);  
    }  
  
    // 创建发送者和接收者线程  
    if (pthread_create(&sender_tid, NULL, sender_thread, NULL) != 0) {  
        perror("pthread_create sender");  
        exit(EXIT_FAILURE);  
    }  
    if (pthread_create(&receiver_tid, NULL, receiver_thread, NULL) != 0) {  
        perror("pthread_create receiver");  
        exit(EXIT_FAILURE);  
    }  
  
    // 等待线程完成  
    pthread_join(sender_tid, NULL);  
    pthread_join(receiver_tid, NULL);  
  
    // 关闭消息队列  
    mq_close(mqdes);  
  
    // 销毁互斥锁  
    pthread_mutex_destroy(&mq_mutex);  
  
    return 0;  
}

编译程序时,需要链接实时库(librt)

-lrt
arm-linux-gcc -o mq_example main.c -lrt -lpthread

测试程序报错,错误提示:

mq_receive: Message too long

在这里插入图片描述
修改代码如下:

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <fcntl.h>  
#include <sys/stat.h>  
#include <mqueue.h>  
#include <pthread.h>  
#include <errno.h>
#include <unistd.h>
  
#define MAXSIZE (10)
#define BUFFER (8192)
#define FILE_NAME "/posix"
struct msg_type {
 int len;
 char buf[MAXSIZE];
};

// 发送线程函数  
void *sender_thread(void *arg) {  
    mqd_t msgq_id_tx;
    struct msg_type msg_tx;
    unsigned int prio_tx = 1;
    struct mq_attr msgq_attr_tx;
    int ret_tx;
    static int i_tx;

    msgq_id_tx = mq_open(FILE_NAME, O_WRONLY);
    if(msgq_id_tx == (mqd_t)-1) {
        perror("mq_open");
        return NULL;
    }
    for(;;)
    {
        memset(msg_tx.buf, 0, MAXSIZE);
        sprintf(msg_tx.buf, "%d", i_tx++);
        strcat(msg_tx.buf,"-hello world");
        //sprintf(msg_tx.buf, "%s", "hello world");
        msg_tx.len = strlen(msg_tx.buf);
        fprintf(stdout, "send msg.buf = %s\n", msg_tx.buf);
        ret_tx= mq_send(msgq_id_tx, (char*)&msg_tx, sizeof(struct msg_type), prio_tx);
        if(ret_tx < 0) {
            perror("mq_send");
            return NULL;
        }
        sleep(1);
        
    }
    sleep(60);
    ret_tx = mq_close(msgq_id_tx);
    if(ret_tx < 0) {
        perror("mq_close");
        return NULL;
    }
    ret_tx = mq_unlink(FILE_NAME);
    if(ret_tx < 0) {
        perror("mq_unlink");
        return NULL;
    }
    return NULL;  

}  
  
// 接收线程函数  
void *receiver_thread(void *arg) {  
    mqd_t msgq_id_rx;
    unsigned int sender_rx;
    struct msg_type msg_rx;
    struct mq_attr msgq_attr_rx;
    long recv_size_rx = BUFFER;
    int ret_rx;
    int i_rx;
    msgq_id_rx = mq_open(FILE_NAME, O_RDWR);
    if(msgq_id_rx == (mqd_t)-1) {
        perror("mq_open");
        return NULL;
    }
    ret_rx = mq_getattr(msgq_id_rx, &msgq_attr_rx);
    if(ret_rx < 0) {
        perror("mq_getattr");
        return NULL;
    }
    printf("msgq_attr_rx.mq_msgsize=%d\r\n",msgq_attr_rx.mq_msgsize);
    printf("msgq_attr_rx.mq_maxmsg=%d\r\n",msgq_attr_rx.mq_maxmsg);
    if(recv_size_rx < msgq_attr_rx.mq_msgsize) {
        recv_size_rx = msgq_attr_rx.mq_msgsize;
    }
    for(;;)
    {
        msg_rx.len = -1;
        memset(msg_rx.buf, 0, MAXSIZE);
        ret_rx = mq_receive(msgq_id_rx, (char*)&msg_rx, recv_size_rx, &sender_rx);
        if (ret_rx < 0) {
            perror("mq_receive");
            return NULL;
        }
        fprintf(stdout, "rx msg.len = %d, msg.buf = %s\r\n", msg_rx.len, msg_rx.buf);
        printf("***********************\r\n");
        printf("len=%d\r\n",strlen(msg_rx.buf));
        printf("data=\r\n%s\r\n",msg_rx.buf);
        printf("$$$$$$$$$$$$$$$$$$$$$$$\r\n");
        sleep(1);

    }
    ret_rx = mq_close(msgq_id_rx);
    if(ret_rx < 0) {
        perror("mq_close");
        return NULL;
    }
    return NULL;  
}  
  
int main() {  
    pthread_t sender_tid, receiver_tid;  
    mqd_t msgq_id;
    struct mq_attr msgq_attr;
    int ret;
  
    // 创建消息队列(如果需要)  
    msgq_id = mq_open(FILE_NAME, O_CREAT | O_EXCL | O_RDWR, 0666, NULL);  
    if (msgq_id != (mqd_t)-1) {  
        ret = mq_getattr(msgq_id, &msgq_attr);
        if(ret < 0) {
            perror("mq_getattr");
            return (-1);
        }
        // 设置队列属性(可选) 
        // msgq_attr.mq_flags = 0; 
        // msgq_attr.mq_maxmsg = 10;  
        // msgq_attr.mq_msgsize = sizeof(struct msg_type);  
        // msgq_attr.mq_curmsgs = 0;  
        ret = mq_setattr(msgq_id, &msgq_attr, NULL);
        if(ret < 0) {
            perror("mq_setattr");
            return (-1);
        }
        mq_close(msgq_id); // 创建后关闭,因为将在线程中重新打开  
    } 
    else if (errno != EEXIST) 
    {  
        perror("mq_open");  
        exit(EXIT_FAILURE);  
    }  
  
    // 创建发送者和接收者线程  
    if (pthread_create(&sender_tid, NULL, sender_thread, NULL)) {  
        perror("pthread_create sender");  
        exit(EXIT_FAILURE);  
    }
    // 创建接收者线程(注意:在实际应用中,你可能希望先启动接收者线程)  
    if (pthread_create(&receiver_tid, NULL, receiver_thread, NULL)) {  
        perror("pthread_create receiver");  
        return 1;  
    }  
  
    // 等待线程完成  
    pthread_join(sender_tid, NULL);  
    pthread_join(receiver_tid, NULL);  
  
    // 在实际应用中,你可能还需要处理消息的清理和队列的删除  
  
    return 0;  
}

编译,程序正常启动,但是不知道为啥,接收的数据有问题。
在这里插入图片描述

在这里插入图片描述
长度超过12,数据就不全了。将 MAXSIZE 的宏定义 改大即可。

注意:
POSIX消息队列与System V消息队列的差别:
对象引用计数,可以更为安全的删除对象;
与System V消息类型不同,POSIX消息有一个关联的优先级,消息之间是严格的按照优先级排队接收;
提供了一个特性允许队列中的一条消息可异步通知进程;
可选组件,通过CONFIG_POSIX_MQUEUE配置。

参考
Linux系统编程手册/52-POSIX-消息队列/


网站公告

今日签到

点亮在社区的每一天
去签到