Linux 消息队列接收与处理线程实现

发布于:2025-07-15 ⋅ 阅读:(16) ⋅ 点赞:(0)

下面是一个完整的 C 语言实现,创建一个线程来接收消息队列中的数据,缓存到队列中,然后逐个处理。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <unistd.h>

// 消息结构体
typedef struct {
    long mtype;             // 消息类型
    char mtext[1024];      // 消息内容
} Message;

// 数据节点结构
typedef struct DataNode {
    Message message;
    struct DataNode *next;
} DataNode;

// 线程共享数据结构
typedef struct {
    pthread_mutex_t lock;   // 互斥锁
    pthread_cond_t cond;    // 条件变量
    DataNode *head;         // 队列头指针
    DataNode *tail;         // 队列尾指针
    int msgq_id;            // 消息队列ID
    int running;            // 线程运行标志
} ThreadData;

// 初始化线程共享数据
int init_thread_data(ThreadData *data, int msgq_id) {
    data->head = NULL;
    data->tail = NULL;
    data->msgq_id = msgq_id;
    data->running = 1;
    
    if (pthread_mutex_init(&data->lock, NULL) {
        perror("pthread_mutex_init failed");
        return -1;
    }
    
    if (pthread_cond_init(&data->cond, NULL)) {
        perror("pthread_cond_init failed");
        pthread_mutex_destroy(&data->lock);
        return -1;
    }
    
    return 0;
}

// 清理线程共享数据
void cleanup_thread_data(ThreadData *data) {
    pthread_mutex_lock(&data->lock);
    
    // 清空队列
    DataNode *current = data->head;
    while (current != NULL) {
        DataNode *temp = current;
        current = current->next;
        free(temp);
    }
    
    data->head = NULL;
    data->tail = NULL;
    
    pthread_mutex_unlock(&data->lock);
    pthread_mutex_destroy(&data->lock);
    pthread_cond_destroy(&data->cond);
}

// 添加消息到队列
void enqueue_message(ThreadData *data, const Message *msg) {
    DataNode *node = (DataNode *)malloc(sizeof(DataNode));
    if (!node) {
        perror("malloc failed");
        return;
    }
    
    node->message = *msg;
    node->next = NULL;
    
    pthread_mutex_lock(&data->lock);
    
    if (data->tail == NULL) {
        data->head = node;
        data->tail = node;
    } else {
        data->tail->next = node;
        data->tail = node;
    }
    
    // 通知有数据到达
    pthread_cond_signal(&data->cond);
    pthread_mutex_unlock(&data->lock);
}

// 从队列获取消息
int dequeue_message(ThreadData *data, Message *msg) {
    pthread_mutex_lock(&data->lock);
    
    // 等待队列不为空
    while (data->head == NULL && data->running) {
        pthread_cond_wait(&data->cond, &data->lock);
    }
    
    if (!data->running) {
        pthread_mutex_unlock(&data->lock);
        return -1;
    }
    
    DataNode *node = data->head;
    *msg = node->message;
    
    data->head = node->next;
    if (data->head == NULL) {
        data->tail = NULL;
    }
    
    pthread_mutex_unlock(&data->lock);
    
    free(node);
    return 0;
}

// 处理消息的函数
void process_message(const Message *msg) {
    printf("Processing message type %ld: %s\n", msg->mtype, msg->mtext);
    // 这里添加实际的消息处理逻辑
}

// 消息处理线程函数
void *process_thread_func(void *arg) {
    ThreadData *data = (ThreadData *)arg;
    Message msg;
    
    while (1) {
        if (dequeue_message(data, &msg) {
            break; // 线程终止
        }
        
        process_message(&msg);
    }
    
    return NULL;
}

// 消息接收线程函数
void *receive_thread_func(void *arg) {
    ThreadData *data = (ThreadData *)arg;
    Message msg;
    
    while (data->running) {
        // 接收消息 (阻塞方式)
        ssize_t len = msgrcv(data->msgq_id, &msg, sizeof(msg.mtext), 0, 0);
        if (len == -1) {
            if (errno == EINTR) {
                continue; // 被信号中断,继续
            }
            perror("msgrcv failed");
            break;
        }
        
        msg.mtext[len] = '\0'; // 确保字符串结束
        
        // 将消息加入队列
        enqueue_message(data, &msg);
    }
    
    // 通知处理线程退出
    pthread_mutex_lock(&data->lock);
    pthread_cond_signal(&data->cond);
    pthread_mutex_unlock(&data->lock);
    
    return NULL;
}

int main() {
    // 创建或获取消息队列
    key_t key = ftok(".", 'a');
    if (key == -1) {
        perror("ftok failed");
        return 1;
    }
    
    int msgq_id = msgget(key, IPC_CREAT | 0666);
    if (msgq_id == -1) {
        perror("msgget failed");
        return 1;
    }
    
    printf("Message queue created with id: %d\n", msgq_id);
    
    // 初始化线程共享数据
    ThreadData thread_data;
    if (init_thread_data(&thread_data, msgq_id)) {
        return 1;
    }
    
    // 创建接收线程和处理线程
    pthread_t receive_thread, process_thread;
    
    if (pthread_create(&receive_thread, NULL, receive_thread_func, &thread_data)) {
        perror("pthread_create receive_thread failed");
        cleanup_thread_data(&thread_data);
        return 1;
    }
    
    if (pthread_create(&process_thread, NULL, process_thread_func, &thread_data)) {
        perror("pthread_create process_thread failed");
        thread_data.running = 0;
        pthread_join(receive_thread, NULL);
        cleanup_thread_data(&thread_data);
        return 1;
    }
    
    // 主线程等待用户输入退出
    printf("Press Enter to exit...\n");
    getchar();
    
    // 设置停止标志
    thread_data.running = 0;
    
    // 唤醒可能正在等待的线程
    pthread_mutex_lock(&thread_data.lock);
    pthread_cond_signal(&thread_data.cond);
    pthread_mutex_unlock(&thread_data.lock);
    
    // 等待线程结束
    pthread_join(receive_thread, NULL);
    pthread_join(process_thread, NULL);
    
    // 清理资源
    cleanup_thread_data(&thread_data);
    
    // 删除消息队列 (可选)
    if (msgctl(msgq_id, IPC_RMID, NULL) == -1) {
        perror("msgctl IPC_RMID failed");
    }
    
    return 0;
}

代码说明
1. 消息队列结构:
◦ 使用标准的 System V 消息队列
◦ 消息类型为 Message 结构体
2. 线程安全队列:
◦ 使用链表实现 FIFO 队列
◦ 使用互斥锁 (pthread_mutex_t) 保护共享数据
◦ 使用条件变量 (pthread_cond_t) 实现生产者-消费者模型
3. 线程设计:
◦ 接收线程: 从消息队列接收消息并放入处理队列
◦ 处理线程: 从处理队列取出消息并处理
4. 同步机制:
◦ 当队列为空时,处理线程等待条件变量
◦ 当有新消息到达时,接收线程通知条件变量
5. 优雅退出:
◦ 设置 running 标志位控制线程退出
◦ 确保资源正确释放
编译与运行
编译命令:
gcc -o msgq_processor msgq_processor.c -lpthread
运行:
./msgq_processor
扩展建议
1. 可以添加更复杂的错误处理机制
2. 可以增加队列最大长度限制
3. 可以添加统计功能,如处理消息数量统计
4. 可以根据需要修改消息处理函数 process_message() 的实现
这个实现提供了基本的框架,你可以根据实际需求进行修改和扩展。


网站公告

今日签到

点亮在社区的每一天
去签到