VxWorks 核心数据结构详解 【消息队列、环形缓冲区、管道、FIFO、双缓冲区、共享内存】

发布于:2025-08-29 ⋅ 阅读:(14) ⋅ 点赞:(0)

VxWorks 核心数据结构详解:特性、示例与应用场景

VxWorks 作为实时操作系统(RTOS),提供了多种专为嵌入式实时环境设计的数据结构,用于任务间通信(IPC)、数据缓冲和资源共享。这些数据结构各有特性,适用于不同的实时场景(如低延迟、高吞吐量、结构化消息传递等)。本文将详细介绍 VxWorks 中常用的数据结构,包括消息队列、环形缓冲区、管道、FIFO、双缓冲区和共享内存,并提供代码示例与场景推荐。

一、消息队列(Message Queues, msgQLib)

1.1 核心特性

消息队列是 VxWorks 中最常用的 IPC 机制之一,用于结构化消息传递,支持:

固定/可变长度消息(创建时指定最大消息大小);
消息优先级(0~99,数值越大优先级越高);
内置线程安全(通过内核信号量同步,多任务并发访问安全);
阻塞/非阻塞模式(发送/接收可设置超时时间)。

1.2 关键 API

函数名 功能描述
msgQCreate() 创建消息队列
msgQSend() 发送消息(支持优先级)
msgQReceive() 接收消息(按优先级/先进先出)
msgQDelete() 删除消息队列

1.3 使用示例:任务间传递结构化消息

场景:任务 A 向任务 B 发送包含传感器数据的结构化消息(如温度、湿度)。

vxworks_msgq_example.c

#include <vxWorks.h>
#include <msgQLib.h>
#include <taskLib.h>
#include <stdio.h>

// 定义消息结构
typedef struct {
    int temp;       // 温度
    int humidity;   // 湿度 
    int sensorId;   // 传感器ID
} SensorMsg;

MSG_Q_ID g_msgQId;  // 消息队列ID

// 发送消息任务
void senderTask() {
    SensorMsg msg = {25, 60, 101};  // 示例数据
    STATUS status;

    while (1) {
        // 发送消息(优先级0:默认先进先出;WAIT_FOREVER:阻塞直到发送成功)
        status = msgQSend(g_msgQId, (char*)&msg, sizeof(SensorMsg), 0, WAIT_FOREVER);
        if (status != OK) {
            printf("Failed to send message\n");
        }
        taskDelay(sysClkRateGet() * 1);  // 1秒发送一次
    }
}

// 接收消息任务
void receiverTask() {
    SensorMsg msg;
    int msgSize;

    while (1) {
        // 接收消息(WAIT_FOREVER:阻塞直到有消息)
        msgSize = msgQReceive(g_msgQId, (char*)&msg, sizeof(SensorMsg), WAIT_FOREVER);
        if (msgSize == sizeof(SensorMsg)) {
            printf("Received: Sensor %d, Temp: %d°C, Humidity: %d%%\n", 
                   msg.sensorId, msg.temp, msg.humidity);
        }
    }
}

// 初始化函数
void msgQExampleInit() {
    // 创建消息队列:最大5条消息,每条最大sizeof(SensorMsg)字节
    g_msgQId = msgQCreate(5, sizeof(SensorMsg), MSG_Q_FIFO);  // FIFO模式(忽略优先级)
    if (g_msgQId == NULL) {
        printf("Failed to create message queue\n");
        return;
    }

    // 创建发送任务和接收任务
    taskSpawn("tSender", 100, 0, 4096, (FUNCPTR)senderTask, 0,0,0,0,0,0,0,0,0,0);
    taskSpawn("tReceiver", 100, 0, 4096, (FUNCPTR)receiverTask, 0,0,0,0,0,0,0,0,0,0);
}

1.4 典型应用场景

任务间传递控制命令(如状态切换、配置参数);
多传感器数据上报(带优先级区分关键/非关键数据);
分布式系统中模块间的结构化消息通信。

二、环形缓冲区(Ring Buffer, rngLib)

2.1 核心特性

环形缓冲区(也称循环缓冲区)是轻量级字节流缓冲,基于数组实现,特点:

无消息边界,按字节流存储;
读写指针循环移动,空间利用率高;
无内置线程安全(多任务访问需手动同步);
适合单生产者-单消费者或低延迟数据缓冲。

2.2 关键 API

函数名 功能描述
rngCreate() 创建环形缓冲区
rngPut() 写入数据(返回实际写入字节数)
rngGet() 读取数据(返回实际读取字节数)
rngDelete() 删除环形缓冲区

2.3 使用示例:带同步的生产者-消费者模型

由于 rngLib 不保证线程安全,需通过信号量手动同步:

vxworks_ringbuf_example.c

#include <vxWorks.h>
#include <rngLib.h>
#include <semLib.h>
#include <taskLib.h>
#include <stdio.h>

RING_ID g_ringId;     // 环形缓冲区ID
SEM_ID g_semId;       // 互斥信号量(保护缓冲区访问)
#define BUF_SIZE 1024  // 缓冲区大小

// 生产者任务:写入数据到环形缓冲区
void producerTask() {
    char data[32];
    int count = 0;

    while (1) {
        // 构造数据(示例:递增计数器字符串)
        sprintf(data, "Data-%d", count++);
        
        // 获取信号量,保证线程安全
        semTake(g_semId, WAIT_FOREVER);
        // 写入数据(rngPut返回实际写入字节数,若缓冲区满则返回0)
        int bytesWritten = rngPut(g_ringId, data, strlen(data) + 1);  // +1包含'\0'
        semGive(g_semId);  // 释放信号量

        if (bytesWritten == 0) {
            printf("Ring buffer full, data dropped: %s\n", data);
        }
        taskDelay(sysClkRateGet() / 2);  // 每0.5秒写入一次
    }
}

// 消费者任务:从环形缓冲区读取数据
void consumerTask() {
    char buf[32];

    while (1) {
        semTake(g_semId, WAIT_FOREVER);
        // 读取数据(rngGet返回实际读取字节数,若缓冲区空则返回0)
        int bytesRead = rngGet(g_ringId, buf, sizeof(buf)-1);  // 留1字节给'\0'
        semGive(g_semId);

        if (bytesRead > 0) {
            buf[bytesRead] = '\0';  // 手动添加字符串结束符
            printf("Received: %s\n", buf);
        }
        taskDelay(sysClkRateGet() / 2);  // 每0.5秒读取一次
    }
}

// 初始化函数
void ringBufExampleInit() {
    // 创建环形缓冲区(大小BUF_SIZE字节)
    g_ringId = rngCreate(BUF_SIZE);
    if (g_ringId == NULL) {
        printf("Failed to create ring buffer\n");
        return;
    }

    // 创建互斥信号量(初始值1:允许一个任务访问)
    g_semId = semBCreate(SEM_Q_FIFO, SEM_FULL);  // SEM_FULL=1(可用)
    if (g_semId == NULL) {
        printf("Failed to create semaphore\n");
        rngDelete(g_ringId);
        return;
    }

    // 创建生产者和消费者任务
    taskSpawn("tProducer", 100, 0, 4096, (FUNCPTR)producerTask, 0,0,0,0,0,0,0,0,0,0);
    taskSpawn("tConsumer", 100, 0, 4096, (FUNCPTR)consumerTask, 0,0,0,0,0,0,0,0,0,0);
}

2.4 典型应用场景

中断服务程序(ISR)与任务间的低延迟数据传递(如UART接收缓冲);
传感器数据流缓冲(如加速度计、陀螺仪的连续采样数据);
日志数据临时缓存(避免频繁磁盘I/O)。

三、管道(Pipes, pipeLib)

3.1 核心特性

管道是单向字节流通信机制,类似Unix管道,特点:

字节流无边界,按顺序传输;
内置线程安全(通过内核信号量同步读写);
固定缓冲区大小,写入满时阻塞,读取空时阻塞;
适合简单的“生产者-消费者”字节流场景。

3.2 关键 API

函数名 功能描述
pipeCreate() 创建管道
pipeWrite() 写入数据(返回实际写入字节数)
pipeRead() 读取数据(返回实际读取字节数)
pipeDelete() 删除管道

3.3 使用示例:日志数据传输

vxworks_pipe_example.c

#include <vxworks.h>
#include <pipeLib.h>
#include <taskLib.h>
#include <stdio.h>

PIPE_ID g_pipeId;
#define PIPE_SIZE 2048  // 管道缓冲区大小

// 日志写入任务
void logWriterTask() {
    char logMsg[128];
    int count = 0;

    while (1) {
        sprintf(logMsg, "Log entry %d: System running normally\n", count++);
        // 写入管道(pipeWrite返回实际写入字节数,满时阻塞)
        int bytesWritten = pipeWrite(g_pipeId, logMsg, strlen(logMsg));
        if (bytesWritten != strlen(logMsg)) {
            printf("Failed to write all log data\n");
        }
        taskDelay(sysClkRateGet());  // 每1秒写入一条日志
    }
}

// 日志处理任务(如写入文件/网络)
void logProcessorTask() {
    char buf[128];

    while (1) {
        // 读取管道(pipeRead返回实际读取字节数,空时阻塞)
        int bytesRead = pipeRead(g_pipeId, buf, sizeof(buf)-1);
        if (bytesRead > 0) {
            buf[bytesRead] = '\0';
            printf("Processing log: %s", buf);  // 实际场景可替换为文件写入
        }
    }
}

// 初始化函数
void pipeExampleInit() {
    // 创建管道(大小PIPE_SIZE字节)
    g_pipeId = pipeCreate(PIPE_SIZE, 0);  // 0=默认选项
    if (g_pipeId == NULL) {
        printf("Failed to create pipe\n");
        return;
    }

    // 创建写入和处理任务
    taskSpawn("tLogWriter", 100, 0, 4096, (FUNCPTR)logWriterTask, 0,0,0,0,0,0,0,0,0,0);
    taskSpawn("tLogProcessor", 100, 0, 4096, (FUNCPTR)logProcessorTask, 0,0,0,0,0,0,0,0,0,0);
}

3.4 典型应用场景

日志数据转发(如从任务到日志服务);
字符设备数据透传(如UART到网络端口);
简单的单方向字节流通信(如命令行输出重定向)。

四、FIFO(fifoLib)

4.1 核心特性

FIFO(First-In-First-Out)是针对固定大小元素的轻量级缓冲,特点:

存储固定大小的元素(如整数、结构体指针);
无内置线程安全,需手动同步;
实现简单,内存开销小,适合小数据量场景。

4.2 关键 API

函数名 功能描述
fifoCreate() 创建FIFO(指定元素大小和数量)
fifoPut() 写入元素(返回OK/ERROR)
fifoGet() 读取元素(返回OK/ERROR)
fifoDelete() 删除FIFO

4.3 使用示例:固定大小整数缓冲

vxworks_fifo_example.c


#include <vxworks.h>
#include <fifoLib.h>
#include <semLib.h>
#include <taskLib.h>
#include <stdio.h>

FIFO_ID g_fifoId;
SEM_ID g_mutexId;  // 互斥锁保护FIFO访问
#define ELEM_SIZE sizeof(int)  // 元素大小:int
#define FIFO_DEPTH 10          // FIFO深度:10个元素

// 生产者任务:写入整数到FIFO
void fifoProducerTask() {
    int data = 0;

    while (1) {
        semTake(g_mutexId, WAIT_FOREVER);
        // 写入元素(FIFO满时返回ERROR)
        STATUS status = fifoPut(g_fifoId, &data);
        semGive(g_mutexId);

        if (status == OK) {
            printf("FIFO put: %d\n", data);
            data++;
        } else {
            printf("FIFO full, data %d dropped\n", data);
        }
        taskDelay(sysClkRateGet());  // 每1秒写入一次
    }
}

// 消费者任务:从FIFO读取整数
void fifoConsumerTask() {
    int data;

    while (1) {
        semTake(g_mutexId, WAIT_FOREVER);
        // 读取元素(FIFO空时返回ERROR)
        STATUS status = fifoGet(g_fifoId, &data);
        semGive(g_mutexId);

        if (status == OK) {
            printf("FIFO get: %d\n", data);
        }
        taskDelay(sysClkRateGet() / 2);  // 每0.5秒读取一次
    }
}

// 初始化函数
void fifoExampleInit() {
    // 创建FIFO:元素大小ELEM_SIZE,深度FIFO_DEPTH
    g_fifoId = fifoCreate(ELEM_SIZE, FIFO_DEPTH);
    if (g_fifoId == NULL) {
        printf("Failed to create FIFO\n");
        return;
    }

    // 创建互斥锁
    g_mutexId = semBCreate(SEM_Q_FIFO, SEM_FULL);
    if (g_mutexId == NULL) {
        printf("Failed to create mutex\n");
        fifoDelete(g_fifoId);
        return;
    }

    // 创建生产者和消费者任务
    taskSpawn("tFifoProducer", 100, 0, 4096, (FUNCPTR)fifoProducerTask, 0,0,0,0,0,0,0,0,0,0);
    taskSpawn("tFifoConsumer", 100, 0, 4096, (FUNCPTR)fifoConsumerTask, 0,0,0,0,0,0,0,0,0,0);
}

4.4 典型应用场景

中断服务程序(ISR)向任务传递事件标志(如中断计数);
固定大小的状态数据缓冲(如设备错误码队列);
资源池管理(如预分配的结构体指针缓冲)。

五、双缓冲区(Double Buffers, dblBufLib)

5.1 核心特性

双缓冲区通过两个缓冲区切换实现零拷贝数据传递,特点:

包含“活跃缓冲区”和“备用缓冲区”,通过原子切换指针实现数据交换;
生产者写入备用缓冲区,消费者读取活跃缓冲区,无读写冲突;
零拷贝设计,适合高频数据采集与处理(如传感器、图像处理)。

5.2 关键 API

函数名 功能描述
dblBufCreate() 创建双缓冲区(指定单个缓冲区大小)
dblBufSwap() 原子切换两个缓冲区(返回原活跃缓冲区指针)
dblBufGet() 获取当前活跃缓冲区指针

5.3 使用示例:高频传感器数据采集

vxworks_dblbuf_example.c




#include <vxworks.h>
#include <dblBufLib.h>
#include <taskLib.h>
#include <stdio.h>
#include <string.h>

DBL_BUF_ID g_dblBufId;
#define BUF_SIZE 1024  // 单个缓冲区大小(字节)

// 数据采集任务(生产者):填充备用缓冲区
void collectTask() {
    char* buf;
    int count = 0;

    while (1) {
        // 获取备用缓冲区(当前未被消费者使用)
        buf = (char*)dblBufGet(g_dblBufId, DBL_BUF_ALT);  // DBL_BUF_ALT=备用缓冲区

        // 模拟采集数据(填充缓冲区)
        sprintf(buf, "Sensor data batch %d: [", count);
        for (int i = 0; i < 5; i++) {
            char temp[32];
            sprintf(temp, "%d,", rand() % 100);  // 随机模拟传感器值
            strcat(buf, temp);
        }
        strcat(buf, "]");
        count++;

        // 切换缓冲区:将当前备用缓冲区变为活跃缓冲区,原活跃缓冲区变为备用
        dblBufSwap(g_dblBufId);
        taskDelay(sysClkRateGet() / 10);  // 每0.1秒采集一批数据
    }
}

// 数据处理任务(消费者):处理活跃缓冲区
void processTask() {
    char* buf;

    while (1) {
        // 获取活跃缓冲区(生产者已填充完成的数据)
        buf = (char*)dblBufGet(g_dblBufId, DBL_BUF_CURRENT);  // DBL_BUF_CURRENT=活跃缓冲区
        printf("Processing: %s\n", buf);  // 实际场景可替换为数据分析/存储

        taskDelay(sysClkRateGet() / 10);  // 每0.1秒处理一次
    }
}

// 初始化函数
void dblBufExampleInit() {
    // 创建双缓冲区:单个缓冲区大小BUF_SIZE字节
    g_dblBufId = dblBufCreate(BUF_SIZE);
    if (g_dblBufId == NULL) {
        printf("Failed to create double buffer\n");
        return;
    }

    // 创建采集和处理任务
    taskSpawn("tCollect", 100, 0, 4096, (FUNCPTR)collectTask, 0,0,0,0,0,0,0,0,0,0);
    taskSpawn("tProcess", 100, 0, 4096, (FUNCPTR)processTask, 0,0,0,0,0,0,0,0,0,0);
}

5.4 典型应用场景

实时图像处理(采集一帧时处理上一帧);
高频传感器数据采集(如1kHz加速度计数据);
网络数据包批量接收与解析(避免频繁内存拷贝)。

六、共享内存(Shared Memory, shmemLib)

6.1 核心特性

共享内存是物理连续的内存区域,允许多任务直接访问同一块内存,特点:

无内置数据结构或同步机制,需用户手动定义结构和同步;
读写速度接近内存访问(零内核拷贝),适合超大吞吐量数据;
需配合互斥锁(mutex)或信号量实现线程安全。

6.2 关键 API

函数名 功能描述
shmemCreate() 创建共享内存(指定大小)
shmemAttach() 将共享内存映射到任务地址空间
shmemDetach() 解除映射
shmemDelete() 删除共享内存

6.3 使用示例:多任务共享大数据结构

vxworks_shmem_example.c



#include <vxworks.h>
#include <shmemLib.h>
#include <semLib.h>
#include <taskLib.h>
#include <stdio.h>

#define SHMEM_SIZE 4096  // 共享内存大小
void* g_shmemAddr;      // 共享内存地址
SEM_ID g_shmemMutex;    // 互斥锁保护共享内存访问

// 共享数据结构(用户自定义)
typedef struct {
    int counter;
    char log[1024];
} SharedData;

// 写入任务:更新共享内存中的计数器和日志
void shmemWriterTask() {
    SharedData* data = (SharedData*)g_shmemAddr;

    while (1) {
        semTake(g_shmemMutex, WAIT_FOREVER);
        data->counter++;
        sprintf(data->log, "Writer updated counter to %d", data->counter);
        semGive(g_shmemMutex);

        taskDelay(sysClkRateGet());  // 每1秒更新一次
    }
}

// 读取任务:读取共享内存中的数据
void shmemReaderTask() {
    SharedData* data = (SharedData*)g_shmemAddr;

    while (1) {
        semTake(g_shmemMutex, WAIT_FOREVER);
        printf("Shared counter: %d, Log: %s\n", data->counter, data->log);
        semGive(g_shmemMutex);

        taskDelay(sysClkRateGet());  // 每1秒读取一次
    }
}

// 初始化函数
void shmemExampleInit() {
    // 创建共享内存
    g_shmemAddr = shmemCreate(SHMEM_SIZE, 0);  // 0=默认选项
    if (g_shmemAddr == NULL) {
        printf("Failed to create shared memory\n");
        return;
    }

    // 创建互斥锁
    g_shmemMutex = semBCreate(SEM_Q_FIFO, SEM_FULL);
    if (g_shmemMutex == NULL) {
        printf("Failed to create mutex\n");
        shmemDelete(g_shmemAddr);
        return;
    }

    // 初始化共享数据
    SharedData* initData = (SharedData*)g_shmemAddr;
    initData->counter = 0;
    strcpy(initData->log, "Initial state");

    // 创建读写任务
    taskSpawn("tShmemWriter", 100, 0, 4096, (FUNCPTR)shmemWriterTask, 0,0,0,0,0,0,0,0,0,0);
    taskSpawn("tShmemReader", 100, 0, 4096, (FUNCPTR)shmemReaderTask, 0,0,0,0,0,0,0,0,0,0);
}


6.4 典型应用场景

多任务共享超大数据集(如雷达点云、视频帧);
内核与用户任务间的数据交换;
分布式多CPU系统中的内存共享(需硬件支持)。

七、数据结构选择指南

数据结构 线程安全(默认) 数据类型 核心优势 典型场景
消息队列 ✅ 内置同步 结构化消息 支持优先级,无需手动同步 任务间命令/参数传递
环形缓冲区 ❌ 需手动同步 字节流 轻量,空间利用率高 传感器数据流缓冲
管道 ✅ 内置同步 字节流 简单单向通信 日志转发、字符设备透传
FIFO ❌ 需手动同步 固定大小元素 实现简单,内存开销小 中断事件传递、小数据缓冲
双缓冲区 ✅ 切换机制 批量数据 零拷贝,适合高频数据 实时图像/传感器数据处理
共享内存 ❌ 需手动同步 自定义结构 最高性能,超大吞吐量 多任务共享大数据集

总结

VxWorks 提供了丰富的数据结构工具集,选择时需结合线程安全需求、数据类型、吞吐量和实时性要求:

结构化消息传递 → 消息队列;
轻量级字节流缓冲 → 环形缓冲区(需同步);
单向简单通信 → 管道;
固定大小小数据 → FIFO;
高频零拷贝数据 → 双缓冲区;
超大吞吐量 → 共享内存(需自定义同步)。
合理选择数据结构是保证 VxWorks 实时系统高效、稳定运行的关键。


网站公告

今日签到

点亮在社区的每一天
去签到