1、简介
消息队列(Message Queue) 是 Linux 提供的一种进程间通信(IPC)机制,允许进程通过发送和接收消息块来进行数据交换。与管道、共享内存不同,消息队列具有结构化、非阻塞和优先级控制等特点。
MQ 采用链表来实现消息队列,该链表是由系统内核维护,系统中可能有很多的 MQ,每个 MQ 用消息队列描述符(消息队列 ID)来区分。在进行任务间通信时,一个任务将消息加到 MQ 尾端,另一个任务从消息队列中取消息(不一定以先进先出来取消息,也可以按照消息类型字段取消息),这样就实现了任务间的通信。如下 MQ 的模型
消息队列的典型特点如下:
- 异步通信
发送方和接收方不需同时存在,消息可以先入队,稍后由接收方读取。 - 消息有结构
每条消息可以包含一个类型/优先级标识,便于有选择地接收特定类型的消息。 - 支持阻塞与非阻塞模式
接收和发送操作可配置为阻塞或非阻塞,提高通信灵活性。 - 容量限制与排队机制
消息队列有最大消息数和每条消息最大长度限制,超出后发送可能阻塞或失败。 - 跨进程通信(IPC)
支持多个进程之间通信,可用于分布式或模块化系统中。
2、SylixOS 下的消息队列
一个 SylixOS 消息队列必须要调用 API_MsgQueueCreate
函数创建之后才能使用,如果创建成功,API_MsgQueueCreate
函数将返回一个消息队列的句柄。
任务如果需要接收消息,可以调用 API_MsgQueueReceive
函数。发送消息可以调用 API_MsgQueueSend
函数。
SylixOS 支持消息优先级,让一些更紧急的消息得到及时处理。优先级由 API_MsgQueueSendEx2
函数入参控制。
当一个消息队列使用完毕后(并确保以后也不再使用),应该调用 API_MsgQueueDelete
函数将其删除,SylixOS 会回收该消息队列占用的内核资源。
使用消息队列接口时,要注意一些场景。例如,调用 API_MsgQueueSend、API_MsgQueueReceive 要考虑,该函数是否会引起睡眠?(这是由函数入参决定的)。以上只是案例,实际使用时要小心。
这里只讲解部分接口,完整的接口以及实现请阅读 SylixOS 内核源码。
2.1 创建消息队列
LW_API
LW_OBJECT_HANDLE API_MsgQueueCreate (CPCHAR pcName,
ULONG ulMaxMsgCounter,
size_t stMaxMsgByteSize,
ULONG ulOption,
LW_OBJECT_ID *pulId)
{
......
/* 函数返回的 pulId 就是 event ID,用来索引指定事件 */
__KERNEL_MODE_PROC(
pevent = _Allocate_Event_Object(); /* 申请事件 */
);
if (!pevent) {
__KERNEL_MODE_PROC(
_Free_MsgQueue_Object(pmsgqueue);
);
_DebugHandle(__ERRORMESSAGE_LEVEL, "there is no ID to build a msgqueue.\r\n");
_ErrorHandle(ERROR_KERNEL_LOW_MEMORY);
return (LW_OBJECT_HANDLE_INVALID);
}
/* 初始化消息缓冲区,消息缓冲区大小是不固定的,根据函数入参来决定 */
stMaxMsgByteSizeReal = ROUND_UP(stMaxMsgByteSize, sizeof(size_t))
+ sizeof(LW_CLASS_MSGNODE); /* 每条消息缓存大小 */
stHeapAllocateByteSize = (size_t)ulMaxMsgCounter
* stMaxMsgByteSizeReal; /* 需要分配的内存总大小 */
pvMemAllocate = __KHEAP_ALLOC(stHeapAllocateByteSize); /* 申请内存 */
if (!pvMemAllocate) {
__KERNEL_MODE_PROC(
_Free_MsgQueue_Object(pmsgqueue);
_Free_Event_Object(pevent);
);
_DebugHandle(__ERRORMESSAGE_LEVEL, "kernel low memory.\r\n");
_ErrorHandle(ERROR_KERNEL_LOW_MEMORY);
return (LW_OBJECT_HANDLE_INVALID);
}
if (pcName) { /* 拷贝名字 */
lib_strcpy(pevent->EVENT_cEventName, pcName);
} else {
pevent->EVENT_cEventName[0] = PX_EOS; /* 清空名字 */
}
pmsgqueue->MSGQUEUE_pvBuffer = pvMemAllocate;
pmsgqueue->MSGQUEUE_stMaxBytes = stMaxMsgByteSize;
/* 这里会将申请到的消息缓冲区,根号每个消息大小,组成一个消息链表 */
_MsgQueueClear(pmsgqueue, ulMaxMsgCounter); /* 缓存区准备好 */
......
}
2.2 发送消息队列
LW_API
ULONG API_MsgQueueSend2 (LW_OBJECT_HANDLE ulId,
const PVOID pvMsgBuffer,
size_t stMsgLen,
ULONG ulTimeout)
{
......
/* 通过事件 ID 找到对应事件,进而找到对应的消息队列 */
pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;
if (stMsgLen > pmsgqueue->MSGQUEUE_stMaxBytes) { /* 长度太长 */
__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */
_DebugHandle(__ERRORMESSAGE_LEVEL, "ulMsgLen invalidate.\r\n");
_ErrorHandle(ERROR_MSGQUEUE_MSG_LEN);
return (ERROR_MSGQUEUE_MSG_LEN);
}
if (_EventWaitNum(EVENT_MSG_Q_R, pevent)) { /* 有任务在等待消息 */
BOOL bSendOk = LW_TRUE;
/* 尝试将阻塞在接收该消息的任务由阻塞态,修改为就绪态 */
if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) { /* 优先级等待队列 */
_EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_R, ppringList); /* 激活优先级等待线程 */
ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);
} else {
_EVENT_DEL_Q_FIFO(EVENT_MSG_Q_R, ppringList); /* 激活FIFO等待线程 */
ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);
}
if ((stMsgLen > ptcb->TCB_stMaxByteSize) &&
!(ptcb->TCB_ulRecvOption & LW_OPTION_NOERROR)) { /* 是否允许自动截断 */
*ptcb->TCB_pstMsgByteSize = 0;
ptcb->TCB_stMaxByteSize = 0;
bSendOk = LW_FALSE;
} else {
stRealLen = (stMsgLen < ptcb->TCB_stMaxByteSize) ?
(stMsgLen) : (ptcb->TCB_stMaxByteSize); /* 确定信息拷贝长短 */
*ptcb->TCB_pstMsgByteSize = stRealLen; /* 保存长短 */
/*
* 注意,这里会将 msg 信息,直接拷贝到线程的 TCB 中 msg 指向的 buffer 中
* TCB 中 msg 指针,是在任务尝试接收消息被阻塞时设置的。msg 指针直接指向阻塞任务的接收消息缓冲区
* 这样可以省去一次拷贝操作,不需要在把消息拷贝到消息队列中
*/
lib_memcpy(ptcb->TCB_pvMsgQueueMessage, /* 传递消息 */
pvMsgBuffer,
stRealLen);
}
KN_INT_ENABLE(iregInterLevel); /* 使能中断 */
/* 尝试将阻塞在接收该消息的任务,分配 cpu 运行 */
_EventReadyHighLevel(ptcb,
LW_THREAD_STATUS_MSGQUEUE,
LW_SCHED_ACT_INTERRUPT); /* 处理 TCB */
MONITOR_EVT_LONG2(MONITOR_EVENT_ID_MSGQ, MONITOR_EVENT_MSGQ_POST,
ulId, ptcb->TCB_ulId, LW_NULL);
/* 这里会引起调度,尝试调度阻塞在接收该消息的任务 */
__KERNEL_EXIT(); /* 退出内核 */
if (bSendOk == LW_FALSE) {
goto __re_send; /* 重新发送 */
}
return (ERROR_NONE);
} else {
if (pevent->EVENT_ulCounter < pevent->EVENT_ulMaxCounter) { /* 检查是否还有空间加 */
pevent->EVENT_ulCounter++;
/* 这里会将消息放到消息队列缓冲区 */
_MsgQueuePut(pmsgqueue, pvMsgBuffer, stMsgLen,
EVENT_MSG_Q_PRIO_LOW); /* 保存消息 */
__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */
return (ERROR_NONE);
} else {
/* 下面就是处理,如果消息队列已经满了的情况,选择是否需要阻塞,等待消息队列有空闲 */
if ((ulTimeout == LW_OPTION_NOT_WAIT) ||
LW_CPU_GET_CUR_NESTING()) { /* 不需要等待 */
__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */
_ErrorHandle(ERROR_MSGQUEUE_FULL);
return (ERROR_MSGQUEUE_FULL);
}
LW_TCB_GET_CUR(ptcbCur); /* 当前任务控制块 */
ptcbCur->TCB_iPendQ = EVENT_MSG_Q_S;
ptcbCur->TCB_usStatus |= LW_THREAD_STATUS_MSGQUEUE; /* 写状态位,开始等待 */
ptcbCur->TCB_ucWaitTimeout = LW_WAIT_TIME_CLEAR; /* 清空等待时间 */
if (ulTimeout == LW_OPTION_WAIT_INFINITE) { /* 是否是无穷等待 */
ptcbCur->TCB_ulDelay = 0ul;
} else {
ptcbCur->TCB_ulDelay = ulTimeout; /* 设置超时时间 */
}
__KERNEL_TIME_GET_IGNIRQ(ulTimeSave, ULONG); /* 记录系统时间 */
if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {
_EVENT_INDEX_Q_PRIORITY(ptcbCur->TCB_ucPriority, ucPriorityIndex);
_EVENT_PRIORITY_Q_PTR(EVENT_MSG_Q_S, ppringList, ucPriorityIndex);
ptcbCur->TCB_ppringPriorityQueue = ppringList; /* 记录等待队列位置 */
_EventWaitPriority(pevent, ppringList); /* 加入优先级等待表 */
} else { /* 按 FIFO 等待 */
_EVENT_FIFO_Q_PTR(EVENT_MSG_Q_S, ppringList); /* 确定 FIFO 队列的位置 */
_EventWaitFifo(pevent, ppringList); /* 加入 FIFO 等待表 */
}
KN_INT_ENABLE(iregInterLevel); /* 使能中断 */
ulEventOption = pevent->EVENT_ulOption;
iSchedRet = __KERNEL_EXIT(); /* 调度器解锁 */
......
}
}
}
SylixOS 消息队列还支持紧急消息的发送,支持消息的优先级,保证了某些异常情况下的安全。函数接口为:API_MsgQueueSendEx2
,通过入参 ulOption
来控制消息的优先级。
/*********************************************************************************************************
消息队列发送选项 (URGENT 与 BROADCAST 不能同时设置)
*********************************************************************************************************/
#define LW_OPTION_URGENT 0x00000001 /* 消息队列紧急消息发送 */
#define LW_OPTION_URGENT_0 LW_OPTION_URGENT/* 最高紧急优先级 */
#define LW_OPTION_URGENT_1 0x00000011
#define LW_OPTION_URGENT_2 0x00000021
#define LW_OPTION_URGENT_3 0x00000031
#define LW_OPTION_URGENT_4 0x00000041
#define LW_OPTION_URGENT_5 0x00000051
#define LW_OPTION_URGENT_6 0x00000061
#define LW_OPTION_URGENT_7 0x00000071 /* 最低紧急优先级 */
#define LW_OPTION_BROADCAST 0x00000002 /* 消息队列广播发送 */
LW_API ULONG API_MsgQueueSendEx2(LW_OBJECT_HANDLE ulId,
const PVOID pvMsgBuffer,
size_t stMsgLen,
ULONG ulTimeout,
ULONG ulOption);/* 带有超时的发送消息高级接口 */
关于这里的消息优先级,我们讲解一下 _MsgQueuePut
函数。
/*********************************************************************************************************
** 函数名称: _MsgQueuePut
** 功能描述: 向消息队列中写入一个消息
** 输 入 : pmsgqueue 消息队列控制块
** : pvMsgBuffer 消息缓冲区
** : stMsgLen 消息长度
** : uiPrio 消息优先级
** 输 出 : NONE
** 全局变量:
** 调用模块:
*********************************************************************************************************/
VOID _MsgQueuePut (PLW_CLASS_MSGQUEUE pmsgqueue,
PVOID pvMsgBuffer,
size_t stMsgLen,
UINT uiPrio)
{
PLW_CLASS_MSGNODE pmsgnode;
/* 从消息队列中的空闲消息链表上,找到一个空闲消息块 */
pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate(&pmsgqueue->MSGQUEUE_pmonoFree);
pmsgnode->MSGNODE_stMsgLen = stMsgLen;
lib_memcpy((PVOID)(pmsgnode + 1), pvMsgBuffer, stMsgLen); /* 拷贝消息 */
/* 这里会设置优先级位图,就是为了确保,从消息队列中取数据时,取的一定是优先级最高的消息 */
if (pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio] == LW_NULL) {
pmsgqueue->MSGQUEUE_uiMap |= (1 << uiPrio); /* 设置优先级位图 */
}
/* 将消息块链接在指定的优先级链表上(尾插) */
_list_mono_free_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio],
&pmsgqueue->MSGQUEUE_pmonoTail[uiPrio],
&pmsgnode->MSGNODE_monoManage);
}
2.3 接收消息队列
/*********************************************************************************************************
** 函数名称: _MsgQueueGet
** 功能描述: 从消息队列中获得一个消息
** 输 入 : pmsgqueue 消息队列控制块
** : pvMsgBuffer 接收缓冲区
** : stMaxByteSize 缓冲区大小
** : pstMsgLen 获得消息的长度
** 输 出 : NONE
** 全局变量:
** 调用模块:
*********************************************************************************************************/
VOID _MsgQueueGet (PLW_CLASS_MSGQUEUE pmsgqueue,
PVOID pvMsgBuffer,
size_t stMaxByteSize,
size_t *pstMsgLen)
{
INT iQ;
PLW_CLASS_MSGNODE pmsgnode;
/* 在当前的消息队列中,找到优先级最高的消息块链表 */
iQ = archFindLsb(pmsgqueue->MSGQUEUE_uiMap) - 1; /* 计算优先级 */
_BugHandle(!pmsgqueue->MSGQUEUE_pmonoHeader[iQ], LW_TRUE, "buffer is empty!\r\n");
/* 从消息链表上取下一个消息块,取的顺序是从 header 开始取,即相同优先级,按照先入先出顺序 */
pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[iQ],
&pmsgqueue->MSGQUEUE_pmonoTail[iQ]);
if (pmsgqueue->MSGQUEUE_pmonoHeader[iQ] == LW_NULL) {
pmsgqueue->MSGQUEUE_uiMap &= ~(1 << iQ); /* 清除优先级位图 */
}
*pstMsgLen = (stMaxByteSize < pmsgnode->MSGNODE_stMsgLen) ?
(stMaxByteSize) : (pmsgnode->MSGNODE_stMsgLen); /* 确定拷贝信息数量 */
lib_memcpy(pvMsgBuffer, (PVOID)(pmsgnode + 1), *pstMsgLen); /* 拷贝消息 */
/* 将该消息块从消息链表中删除,并回收到消息队列的空闲链表上 */
_list_mono_free(&pmsgqueue->MSGQUEUE_pmonoFree, &pmsgnode->MSGNODE_monoManage);
}
API_MsgQueueReceive
函数中的重点,还是 _MsgQueueGet
函数。
LW_API
ULONG API_MsgQueueReceive (LW_OBJECT_HANDLE ulId,
PVOID pvMsgBuffer,
size_t stMaxByteSize,
size_t *pstMsgLen,
ULONG ulTimeout)
{
......
pevent = &_K_eventBuffer[usIndex];
iregInterLevel = __KERNEL_ENTER_IRQ(); /* 进入内核 */
if (_Event_Type_Invalid(usIndex, LW_TYPE_EVENT_MSGQUEUE)) {
__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */
_ErrorHandle(ERROR_MSGQUEUE_TYPE);
return (ERROR_MSGQUEUE_TYPE);
}
/* 通过指定事件 ID,获得消息队列结构 */
pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;
ptcbCur->TCB_ulRecvOption = LW_OPTION_NOERROR; /* 接收大消息自动截断 */
/* 如果消息队列中,消息不为空 */
if (pevent->EVENT_ulCounter) { /* 事件有效 */
pevent->EVENT_ulCounter--;
_MsgQueueGet(pmsgqueue, pvMsgBuffer,
stMaxByteSize, pstMsgLen); /* 获得消息 */
if (_EventWaitNum(EVENT_MSG_Q_S, pevent)) { /* 有任务在等待写消息 */
if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) { /* 优先级等待队列 */
_EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_S, ppringList); /* 激活优先级等待线程 */
ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);
} else {
_EVENT_DEL_Q_FIFO(EVENT_MSG_Q_S, ppringList); /* 激活FIFO等待线程 */
ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);
}
KN_INT_ENABLE(iregInterLevel); /* 使能中断 */
_EventReadyHighLevel(ptcb,
LW_THREAD_STATUS_MSGQUEUE,
LW_SCHED_ACT_INTERRUPT); /* 处理 TCB */
__KERNEL_EXIT(); /* 退出内核 */
} else {
__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */
}
return (ERROR_NONE);
}
/* 下面就是判断,如果消息队列为空,是否需要阻塞等待 */
if (ulTimeout == LW_OPTION_NOT_WAIT) { /* 不等待 */
__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */
_ErrorHandle(ERROR_THREAD_WAIT_TIMEOUT); /* 超时 */
return (ERROR_THREAD_WAIT_TIMEOUT);
}
ptcbCur->TCB_pstMsgByteSize = pstMsgLen;
ptcbCur->TCB_stMaxByteSize = stMaxByteSize;
/* 如果在 receive 消息时被阻塞,则将 buffer 赋值给 TCB 中的 msg 指针。再被唤醒时,数据已经被拷贝到 buffer 中了 */
ptcbCur->TCB_pvMsgQueueMessage = pvMsgBuffer; /* 记录信息 */
......
}