SylixOS 下的消息队列

发布于:2025-06-22 ⋅ 阅读:(19) ⋅ 点赞:(0)

1、简介

  消息队列(Message Queue) 是 Linux 提供的一种进程间通信(IPC)机制,允许进程通过发送和接收消息块来进行数据交换。与管道、共享内存不同,消息队列具有结构化、非阻塞和优先级控制等特点。

  MQ 采用链表来实现消息队列,该链表是由系统内核维护,系统中可能有很多的 MQ,每个 MQ 用消息队列描述符(消息队列 ID)来区分。在进行任务间通信时,一个任务将消息加到 MQ 尾端,另一个任务从消息队列中取消息(不一定以先进先出来取消息,也可以按照消息类型字段取消息),这样就实现了任务间的通信。如下 MQ 的模型
在这里插入图片描述
  消息队列的典型特点如下:

  • 异步通信
    发送方和接收方不需同时存在,消息可以先入队,稍后由接收方读取。
  • 消息有结构
    每条消息可以包含一个类型/优先级标识,便于有选择地接收特定类型的消息。
  • 支持阻塞与非阻塞模式
    接收和发送操作可配置为阻塞或非阻塞,提高通信灵活性。
  • 容量限制与排队机制
    消息队列有最大消息数和每条消息最大长度限制,超出后发送可能阻塞或失败。
  • 跨进程通信(IPC)
    支持多个进程之间通信,可用于分布式或模块化系统中。

2、SylixOS 下的消息队列

  一个 SylixOS 消息队列必须要调用 API_MsgQueueCreate 函数创建之后才能使用,如果创建成功,API_MsgQueueCreate 函数将返回一个消息队列的句柄。

  任务如果需要接收消息,可以调用 API_MsgQueueReceive 函数。发送消息可以调用 API_MsgQueueSend 函数。

  SylixOS 支持消息优先级,让一些更紧急的消息得到及时处理。优先级由 API_MsgQueueSendEx2 函数入参控制。

  当一个消息队列使用完毕后(并确保以后也不再使用),应该调用 API_MsgQueueDelete 函数将其删除,SylixOS 会回收该消息队列占用的内核资源。

使用消息队列接口时,要注意一些场景。例如,调用 API_MsgQueueSend、API_MsgQueueReceive 要考虑,该函数是否会引起睡眠?(这是由函数入参决定的)。以上只是案例,实际使用时要小心。

这里只讲解部分接口,完整的接口以及实现请阅读 SylixOS 内核源码。

2.1 创建消息队列

LW_API  
LW_OBJECT_HANDLE  API_MsgQueueCreate (CPCHAR             pcName,
                                      ULONG              ulMaxMsgCounter,
                                      size_t             stMaxMsgByteSize,
                                      ULONG              ulOption,
                                      LW_OBJECT_ID      *pulId)
{
	......
	/* 函数返回的 pulId 就是 event ID,用来索引指定事件 */
	 __KERNEL_MODE_PROC(
        pevent = _Allocate_Event_Object();                              /*  申请事件                    */
    );

    if (!pevent) {
        __KERNEL_MODE_PROC(
            _Free_MsgQueue_Object(pmsgqueue);
        );
        _DebugHandle(__ERRORMESSAGE_LEVEL, "there is no ID to build a msgqueue.\r\n");
        _ErrorHandle(ERROR_KERNEL_LOW_MEMORY);
        return  (LW_OBJECT_HANDLE_INVALID);
    }
    
    /* 初始化消息缓冲区,消息缓冲区大小是不固定的,根据函数入参来决定 */
    stMaxMsgByteSizeReal   = ROUND_UP(stMaxMsgByteSize, sizeof(size_t))
                           + sizeof(LW_CLASS_MSGNODE);                  /*  每条消息缓存大小            */
    
    stHeapAllocateByteSize = (size_t)ulMaxMsgCounter
                           * stMaxMsgByteSizeReal;                      /*  需要分配的内存总大小        */

    pvMemAllocate = __KHEAP_ALLOC(stHeapAllocateByteSize);              /*  申请内存                    */
    if (!pvMemAllocate) {
        __KERNEL_MODE_PROC(
            _Free_MsgQueue_Object(pmsgqueue);
            _Free_Event_Object(pevent);
        );
        _DebugHandle(__ERRORMESSAGE_LEVEL, "kernel low memory.\r\n");
        _ErrorHandle(ERROR_KERNEL_LOW_MEMORY);
        return  (LW_OBJECT_HANDLE_INVALID);
    }
    
    if (pcName) {                                                       /*  拷贝名字                    */
        lib_strcpy(pevent->EVENT_cEventName, pcName);
    } else {
        pevent->EVENT_cEventName[0] = PX_EOS;                           /*  清空名字                    */
    }

    pmsgqueue->MSGQUEUE_pvBuffer   = pvMemAllocate;
    pmsgqueue->MSGQUEUE_stMaxBytes = stMaxMsgByteSize;
    
    /* 这里会将申请到的消息缓冲区,根号每个消息大小,组成一个消息链表 */
    _MsgQueueClear(pmsgqueue, ulMaxMsgCounter);                         /*  缓存区准备好                */
    ......
}

2.2 发送消息队列

LW_API  
ULONG  API_MsgQueueSend2 (LW_OBJECT_HANDLE  ulId,
                          const PVOID       pvMsgBuffer,
                          size_t            stMsgLen,
                          ULONG             ulTimeout)
{
	......
	/* 通过事件 ID 找到对应事件,进而找到对应的消息队列 */
	pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;

    if (stMsgLen > pmsgqueue->MSGQUEUE_stMaxBytes) {                    /*  长度太长                    */
        __KERNEL_EXIT_IRQ(iregInterLevel);                              /*  退出内核                    */
        _DebugHandle(__ERRORMESSAGE_LEVEL, "ulMsgLen invalidate.\r\n");
        _ErrorHandle(ERROR_MSGQUEUE_MSG_LEN);
        return  (ERROR_MSGQUEUE_MSG_LEN);
    }
    
    if (_EventWaitNum(EVENT_MSG_Q_R, pevent)) {                         /*  有任务在等待消息            */
        BOOL    bSendOk = LW_TRUE;
        /* 尝试将阻塞在接收该消息的任务由阻塞态,修改为就绪态 */
        if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {         /*  优先级等待队列              */
            _EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_R, ppringList);           /*  激活优先级等待线程          */
            ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);
        
        } else {
            _EVENT_DEL_Q_FIFO(EVENT_MSG_Q_R, ppringList);               /*  激活FIFO等待线程            */
            ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);
        }
        
        if ((stMsgLen > ptcb->TCB_stMaxByteSize) && 
            !(ptcb->TCB_ulRecvOption & LW_OPTION_NOERROR)) {            /*  是否允许自动截断            */
            *ptcb->TCB_pstMsgByteSize = 0;
            ptcb->TCB_stMaxByteSize = 0;
            bSendOk = LW_FALSE;
            
        } else {
            stRealLen = (stMsgLen < ptcb->TCB_stMaxByteSize) ?
                        (stMsgLen) : (ptcb->TCB_stMaxByteSize);         /*  确定信息拷贝长短            */
            
            *ptcb->TCB_pstMsgByteSize = stRealLen;                      /*  保存长短                    */
            /* 
             * 注意,这里会将 msg 信息,直接拷贝到线程的 TCB 中 msg 指向的 buffer 中
             * TCB 中 msg 指针,是在任务尝试接收消息被阻塞时设置的。msg 指针直接指向阻塞任务的接收消息缓冲区
             * 这样可以省去一次拷贝操作,不需要在把消息拷贝到消息队列中
             */
            lib_memcpy(ptcb->TCB_pvMsgQueueMessage,                     /*  传递消息                    */
                       pvMsgBuffer, 
                       stRealLen);
        }
                   
        KN_INT_ENABLE(iregInterLevel);                                  /*  使能中断                    */
        /* 尝试将阻塞在接收该消息的任务,分配 cpu 运行 */
        _EventReadyHighLevel(ptcb,
                             LW_THREAD_STATUS_MSGQUEUE,
                             LW_SCHED_ACT_INTERRUPT);                   /*  处理 TCB                    */
        
        MONITOR_EVT_LONG2(MONITOR_EVENT_ID_MSGQ, MONITOR_EVENT_MSGQ_POST, 
                          ulId, ptcb->TCB_ulId, LW_NULL);
        
        /* 这里会引起调度,尝试调度阻塞在接收该消息的任务 */
        __KERNEL_EXIT();                                                /*  退出内核                    */
        
        if (bSendOk == LW_FALSE) {
            goto    __re_send;                                          /*  重新发送                    */
        }
        
        return  (ERROR_NONE);
        
    } else {
        if (pevent->EVENT_ulCounter < pevent->EVENT_ulMaxCounter) {     /*  检查是否还有空间加          */
            pevent->EVENT_ulCounter++;
            /* 这里会将消息放到消息队列缓冲区 */
            _MsgQueuePut(pmsgqueue, pvMsgBuffer, stMsgLen, 
                         EVENT_MSG_Q_PRIO_LOW);                         /*  保存消息                    */
            __KERNEL_EXIT_IRQ(iregInterLevel);                          /*  退出内核                    */
            return  (ERROR_NONE);
        
        } else {
        	/* 下面就是处理,如果消息队列已经满了的情况,选择是否需要阻塞,等待消息队列有空闲 */
	        if ((ulTimeout == LW_OPTION_NOT_WAIT) || 
	            LW_CPU_GET_CUR_NESTING()) {                             /*  不需要等待                  */
	            __KERNEL_EXIT_IRQ(iregInterLevel);                      /*  退出内核                    */
	            _ErrorHandle(ERROR_MSGQUEUE_FULL);
	            return  (ERROR_MSGQUEUE_FULL);
	        }
        
        	LW_TCB_GET_CUR(ptcbCur);                                    /*  当前任务控制块              */
        
        	ptcbCur->TCB_iPendQ         = EVENT_MSG_Q_S;
        	ptcbCur->TCB_usStatus      |= LW_THREAD_STATUS_MSGQUEUE;    /*  写状态位,开始等待          */
        	ptcbCur->TCB_ucWaitTimeout  = LW_WAIT_TIME_CLEAR;           /*  清空等待时间                */
        
	        if (ulTimeout == LW_OPTION_WAIT_INFINITE) {                 /*  是否是无穷等待              */
	            ptcbCur->TCB_ulDelay = 0ul;
	        } else {
	            ptcbCur->TCB_ulDelay = ulTimeout;                       /*  设置超时时间                */
	        }
        	__KERNEL_TIME_GET_IGNIRQ(ulTimeSave, ULONG);                /*  记录系统时间                */

	        if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {
	            _EVENT_INDEX_Q_PRIORITY(ptcbCur->TCB_ucPriority, ucPriorityIndex);
	            _EVENT_PRIORITY_Q_PTR(EVENT_MSG_Q_S, ppringList, ucPriorityIndex);
	            ptcbCur->TCB_ppringPriorityQueue = ppringList;          /*  记录等待队列位置            */
	            _EventWaitPriority(pevent, ppringList);                 /*  加入优先级等待表            */
	            
	        } else {                                                    /*  按 FIFO 等待                */
	            _EVENT_FIFO_Q_PTR(EVENT_MSG_Q_S, ppringList);           /*  确定 FIFO 队列的位置        */
	            _EventWaitFifo(pevent, ppringList);                     /*  加入 FIFO 等待表            */
	        }
        
        	KN_INT_ENABLE(iregInterLevel);                              /*  使能中断                    */
        
        	ulEventOption = pevent->EVENT_ulOption;
        
        	iSchedRet = __KERNEL_EXIT();                                /*  调度器解锁                  */
			......        
	    }
	}
}

  SylixOS 消息队列还支持紧急消息的发送,支持消息的优先级,保证了某些异常情况下的安全。函数接口为:API_MsgQueueSendEx2 ,通过入参 ulOption 来控制消息的优先级。

/*********************************************************************************************************
  消息队列发送选项 (URGENT 与 BROADCAST 不能同时设置)
*********************************************************************************************************/

#define LW_OPTION_URGENT                                0x00000001      /*  消息队列紧急消息发送        */
#define LW_OPTION_URGENT_0                              LW_OPTION_URGENT/*  最高紧急优先级              */
#define LW_OPTION_URGENT_1                              0x00000011
#define LW_OPTION_URGENT_2                              0x00000021
#define LW_OPTION_URGENT_3                              0x00000031
#define LW_OPTION_URGENT_4                              0x00000041
#define LW_OPTION_URGENT_5                              0x00000051
#define LW_OPTION_URGENT_6                              0x00000061
#define LW_OPTION_URGENT_7                              0x00000071      /*  最低紧急优先级              */

#define LW_OPTION_BROADCAST                             0x00000002      /*  消息队列广播发送            */


LW_API ULONG            API_MsgQueueSendEx2(LW_OBJECT_HANDLE  ulId,
                                            const PVOID       pvMsgBuffer,
                                            size_t            stMsgLen,
                                            ULONG             ulTimeout,
                                            ULONG             ulOption);/*  带有超时的发送消息高级接口  */

  关于这里的消息优先级,我们讲解一下 _MsgQueuePut 函数。

/*********************************************************************************************************
** 函数名称: _MsgQueuePut
** 功能描述: 向消息队列中写入一个消息
** 输 入  : pmsgqueue     消息队列控制块 
**         : pvMsgBuffer   消息缓冲区
**         : stMsgLen      消息长度
**         : uiPrio        消息优先级
** 输 出  : NONE
** 全局变量: 
** 调用模块: 
*********************************************************************************************************/
VOID  _MsgQueuePut (PLW_CLASS_MSGQUEUE    pmsgqueue,
                    PVOID                 pvMsgBuffer,
                    size_t                stMsgLen, 
                    UINT                  uiPrio)
{
    PLW_CLASS_MSGNODE  pmsgnode;
    
    /* 从消息队列中的空闲消息链表上,找到一个空闲消息块 */
    pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate(&pmsgqueue->MSGQUEUE_pmonoFree);
    
    pmsgnode->MSGNODE_stMsgLen = stMsgLen;
    
    lib_memcpy((PVOID)(pmsgnode + 1), pvMsgBuffer, stMsgLen);           /*  拷贝消息                    */
    
    /* 这里会设置优先级位图,就是为了确保,从消息队列中取数据时,取的一定是优先级最高的消息 */
    if (pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio] == LW_NULL) {
        pmsgqueue->MSGQUEUE_uiMap |= (1 << uiPrio);                     /*  设置优先级位图              */
    }
    
    /* 将消息块链接在指定的优先级链表上(尾插) */
    _list_mono_free_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio], 
                        &pmsgqueue->MSGQUEUE_pmonoTail[uiPrio], 
                        &pmsgnode->MSGNODE_monoManage);
}

2.3 接收消息队列

/*********************************************************************************************************
** 函数名称: _MsgQueueGet
** 功能描述: 从消息队列中获得一个消息
** 输 入  : pmsgqueue     消息队列控制块
**         : pvMsgBuffer   接收缓冲区
**         : stMaxByteSize 缓冲区大小
**         : pstMsgLen     获得消息的长度
** 输 出  : NONE
** 全局变量: 
** 调用模块: 
*********************************************************************************************************/
VOID  _MsgQueueGet (PLW_CLASS_MSGQUEUE    pmsgqueue,
                    PVOID                 pvMsgBuffer,
                    size_t                stMaxByteSize,
                    size_t               *pstMsgLen)
{
    INT                 iQ;
    PLW_CLASS_MSGNODE   pmsgnode;
    
    /* 在当前的消息队列中,找到优先级最高的消息块链表 */
    iQ = archFindLsb(pmsgqueue->MSGQUEUE_uiMap) - 1;                    /*  计算优先级                  */
    
    _BugHandle(!pmsgqueue->MSGQUEUE_pmonoHeader[iQ], LW_TRUE, "buffer is empty!\r\n");
    
    /* 从消息链表上取下一个消息块,取的顺序是从 header 开始取,即相同优先级,按照先入先出顺序 */
    pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[iQ],
                                                          &pmsgqueue->MSGQUEUE_pmonoTail[iQ]);
    
    if (pmsgqueue->MSGQUEUE_pmonoHeader[iQ] == LW_NULL) {
        pmsgqueue->MSGQUEUE_uiMap &= ~(1 << iQ);                        /*  清除优先级位图              */
    }
    
    *pstMsgLen = (stMaxByteSize < pmsgnode->MSGNODE_stMsgLen) ? 
                 (stMaxByteSize) : (pmsgnode->MSGNODE_stMsgLen);        /*  确定拷贝信息数量            */
                 
    lib_memcpy(pvMsgBuffer, (PVOID)(pmsgnode + 1), *pstMsgLen);         /*  拷贝消息                    */
    
    /* 将该消息块从消息链表中删除,并回收到消息队列的空闲链表上 */
    _list_mono_free(&pmsgqueue->MSGQUEUE_pmonoFree, &pmsgnode->MSGNODE_monoManage);
}

  API_MsgQueueReceive 函数中的重点,还是 _MsgQueueGet 函数。

LW_API  
ULONG  API_MsgQueueReceive (LW_OBJECT_HANDLE    ulId,
                            PVOID               pvMsgBuffer,
                            size_t              stMaxByteSize,
                            size_t             *pstMsgLen,
                            ULONG               ulTimeout)

{
	......
	pevent = &_K_eventBuffer[usIndex];
    
    iregInterLevel = __KERNEL_ENTER_IRQ();                              /*  进入内核                    */
    if (_Event_Type_Invalid(usIndex, LW_TYPE_EVENT_MSGQUEUE)) {
        __KERNEL_EXIT_IRQ(iregInterLevel);                              /*  退出内核                    */
        _ErrorHandle(ERROR_MSGQUEUE_TYPE);
        return  (ERROR_MSGQUEUE_TYPE);
    }
    
    /* 通过指定事件 ID,获得消息队列结构 */
    pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;

    ptcbCur->TCB_ulRecvOption = LW_OPTION_NOERROR;                      /*  接收大消息自动截断          */
    
    /* 如果消息队列中,消息不为空 */
    if (pevent->EVENT_ulCounter) {                                      /*  事件有效                    */
       pevent->EVENT_ulCounter--;
       _MsgQueueGet(pmsgqueue, pvMsgBuffer, 
                    stMaxByteSize, pstMsgLen);                         /*  获得消息                    */
       
       if (_EventWaitNum(EVENT_MSG_Q_S, pevent)) {                     /*  有任务在等待写消息          */
           if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {     /*  优先级等待队列              */
               _EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_S, ppringList);       /*  激活优先级等待线程          */
               ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);
           
           } else {
               _EVENT_DEL_Q_FIFO(EVENT_MSG_Q_S, ppringList);           /*  激活FIFO等待线程            */
               ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);
           }
           
           KN_INT_ENABLE(iregInterLevel);                              /*  使能中断                    */
           _EventReadyHighLevel(ptcb,
                                LW_THREAD_STATUS_MSGQUEUE,
                                LW_SCHED_ACT_INTERRUPT);               /*  处理 TCB                    */
           __KERNEL_EXIT();                                            /*  退出内核                    */
       
       } else {
           __KERNEL_EXIT_IRQ(iregInterLevel);                          /*  退出内核                    */
       }
       return  (ERROR_NONE);
   }
   
   	 /* 下面就是判断,如果消息队列为空,是否需要阻塞等待 */
	 if (ulTimeout == LW_OPTION_NOT_WAIT) {                              /*  不等待                      */
	     __KERNEL_EXIT_IRQ(iregInterLevel);                              /*  退出内核                    */
	     _ErrorHandle(ERROR_THREAD_WAIT_TIMEOUT);                        /*  超时                        */
	     return  (ERROR_THREAD_WAIT_TIMEOUT);
	 }
	 
    ptcbCur->TCB_pstMsgByteSize    = pstMsgLen;
    ptcbCur->TCB_stMaxByteSize     = stMaxByteSize;
    /* 如果在 receive 消息时被阻塞,则将 buffer 赋值给 TCB 中的 msg 指针。再被唤醒时,数据已经被拷贝到 buffer 中了 */
    ptcbCur->TCB_pvMsgQueueMessage = pvMsgBuffer;                       /*  记录信息                    */
	 ......
}