下面是一个完整的 C 语言实现,创建一个线程来接收消息队列中的数据,缓存到队列中,然后逐个处理。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <unistd.h>
// 消息结构体
typedef struct {
long mtype; // 消息类型
char mtext[1024]; // 消息内容
} Message;
// 数据节点结构
typedef struct DataNode {
Message message;
struct DataNode *next;
} DataNode;
// 线程共享数据结构
typedef struct {
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
DataNode *head; // 队列头指针
DataNode *tail; // 队列尾指针
int msgq_id; // 消息队列ID
int running; // 线程运行标志
} ThreadData;
// 初始化线程共享数据
int init_thread_data(ThreadData *data, int msgq_id) {
data->head = NULL;
data->tail = NULL;
data->msgq_id = msgq_id;
data->running = 1;
if (pthread_mutex_init(&data->lock, NULL) {
perror("pthread_mutex_init failed");
return -1;
}
if (pthread_cond_init(&data->cond, NULL)) {
perror("pthread_cond_init failed");
pthread_mutex_destroy(&data->lock);
return -1;
}
return 0;
}
// 清理线程共享数据
void cleanup_thread_data(ThreadData *data) {
pthread_mutex_lock(&data->lock);
// 清空队列
DataNode *current = data->head;
while (current != NULL) {
DataNode *temp = current;
current = current->next;
free(temp);
}
data->head = NULL;
data->tail = NULL;
pthread_mutex_unlock(&data->lock);
pthread_mutex_destroy(&data->lock);
pthread_cond_destroy(&data->cond);
}
// 添加消息到队列
void enqueue_message(ThreadData *data, const Message *msg) {
DataNode *node = (DataNode *)malloc(sizeof(DataNode));
if (!node) {
perror("malloc failed");
return;
}
node->message = *msg;
node->next = NULL;
pthread_mutex_lock(&data->lock);
if (data->tail == NULL) {
data->head = node;
data->tail = node;
} else {
data->tail->next = node;
data->tail = node;
}
// 通知有数据到达
pthread_cond_signal(&data->cond);
pthread_mutex_unlock(&data->lock);
}
// 从队列获取消息
int dequeue_message(ThreadData *data, Message *msg) {
pthread_mutex_lock(&data->lock);
// 等待队列不为空
while (data->head == NULL && data->running) {
pthread_cond_wait(&data->cond, &data->lock);
}
if (!data->running) {
pthread_mutex_unlock(&data->lock);
return -1;
}
DataNode *node = data->head;
*msg = node->message;
data->head = node->next;
if (data->head == NULL) {
data->tail = NULL;
}
pthread_mutex_unlock(&data->lock);
free(node);
return 0;
}
// 处理消息的函数
void process_message(const Message *msg) {
printf("Processing message type %ld: %s\n", msg->mtype, msg->mtext);
// 这里添加实际的消息处理逻辑
}
// 消息处理线程函数
void *process_thread_func(void *arg) {
ThreadData *data = (ThreadData *)arg;
Message msg;
while (1) {
if (dequeue_message(data, &msg) {
break; // 线程终止
}
process_message(&msg);
}
return NULL;
}
// 消息接收线程函数
void *receive_thread_func(void *arg) {
ThreadData *data = (ThreadData *)arg;
Message msg;
while (data->running) {
// 接收消息 (阻塞方式)
ssize_t len = msgrcv(data->msgq_id, &msg, sizeof(msg.mtext), 0, 0);
if (len == -1) {
if (errno == EINTR) {
continue; // 被信号中断,继续
}
perror("msgrcv failed");
break;
}
msg.mtext[len] = '\0'; // 确保字符串结束
// 将消息加入队列
enqueue_message(data, &msg);
}
// 通知处理线程退出
pthread_mutex_lock(&data->lock);
pthread_cond_signal(&data->cond);
pthread_mutex_unlock(&data->lock);
return NULL;
}
int main() {
// 创建或获取消息队列
key_t key = ftok(".", 'a');
if (key == -1) {
perror("ftok failed");
return 1;
}
int msgq_id = msgget(key, IPC_CREAT | 0666);
if (msgq_id == -1) {
perror("msgget failed");
return 1;
}
printf("Message queue created with id: %d\n", msgq_id);
// 初始化线程共享数据
ThreadData thread_data;
if (init_thread_data(&thread_data, msgq_id)) {
return 1;
}
// 创建接收线程和处理线程
pthread_t receive_thread, process_thread;
if (pthread_create(&receive_thread, NULL, receive_thread_func, &thread_data)) {
perror("pthread_create receive_thread failed");
cleanup_thread_data(&thread_data);
return 1;
}
if (pthread_create(&process_thread, NULL, process_thread_func, &thread_data)) {
perror("pthread_create process_thread failed");
thread_data.running = 0;
pthread_join(receive_thread, NULL);
cleanup_thread_data(&thread_data);
return 1;
}
// 主线程等待用户输入退出
printf("Press Enter to exit...\n");
getchar();
// 设置停止标志
thread_data.running = 0;
// 唤醒可能正在等待的线程
pthread_mutex_lock(&thread_data.lock);
pthread_cond_signal(&thread_data.cond);
pthread_mutex_unlock(&thread_data.lock);
// 等待线程结束
pthread_join(receive_thread, NULL);
pthread_join(process_thread, NULL);
// 清理资源
cleanup_thread_data(&thread_data);
// 删除消息队列 (可选)
if (msgctl(msgq_id, IPC_RMID, NULL) == -1) {
perror("msgctl IPC_RMID failed");
}
return 0;
}
代码说明
1. 消息队列结构:
◦ 使用标准的 System V 消息队列
◦ 消息类型为 Message 结构体
2. 线程安全队列:
◦ 使用链表实现 FIFO 队列
◦ 使用互斥锁 (pthread_mutex_t) 保护共享数据
◦ 使用条件变量 (pthread_cond_t) 实现生产者-消费者模型
3. 线程设计:
◦ 接收线程: 从消息队列接收消息并放入处理队列
◦ 处理线程: 从处理队列取出消息并处理
4. 同步机制:
◦ 当队列为空时,处理线程等待条件变量
◦ 当有新消息到达时,接收线程通知条件变量
5. 优雅退出:
◦ 设置 running 标志位控制线程退出
◦ 确保资源正确释放
编译与运行
编译命令:
gcc -o msgq_processor msgq_processor.c -lpthread
运行:
./msgq_processor
扩展建议
1. 可以添加更复杂的错误处理机制
2. 可以增加队列最大长度限制
3. 可以添加统计功能,如处理消息数量统计
4. 可以根据需要修改消息处理函数 process_message() 的实现
这个实现提供了基本的框架,你可以根据实际需求进行修改和扩展。