RTOS 内存管理实现分析 [ RT-Thread ]

发布于:2022-10-15 ⋅ 阅读:(427) ⋅ 点赞:(0)

在单片机编程界一般分为两派,一派是裸奔派,一派是系统派。

裸奔就是我们常见的main开头,一堆初始化函数,一个while大循环干到底,
把系统要监测处理的任务全放在里面扫描,再配合中断函数进行各种控制的程序,学名叫前后台程序。

系统派就是在单片机上架上操作系统,进行多进程(任务)并发运行,因为单片机(MCU)一般没有MMU,因此通常跑的是RTOS,即实时操作系统。有了OS,就可以把系统任务按一定的方式划分为多个小的任务(多个进程),由操作系统进行调度,以实现并发运行的效果。

采用RTOS有很多好处,随便说几个点吧:硬实时响应,基于优先级抢占,提高应用程序对时间关键事件的响应。系统性能最大化。紧密集成的中间件,进行内存管理,任务间的通信等。有了RTOS,程序的层次也更加清晰,给系统添加功能也更方便,这一切在大型项目中越发的明显。
前提是得对RTOS比较了解,对多进程/多任务编程有一定认识,线程安全,进程间通信、同步,可重入函数,移植、裁剪等等,不然驾驭
不了反而要起副作用。

有一段时间,吴工把听过的没听过的,用过的没用过的RTOS都研究或了解了一把,尤其喜欢找那种装在导弹上或上过火星的RTOS,感觉更牛逼一点,后来发现其实没必要。关于RTOS后面有空再专门写篇文章介绍,今天主要聊聊国产RTOS,RT-thread 的内存管理。

我们知道计算机系统中,变量、中间数据一般存放在 RAM 中,只有在实际使用时才将它们从 RAM 调入到 CPU 中进行运算。一些数据需要的内存大小需要在程序运行过程中根据实际情况确定,这就要求系统具有对内存空间进行动态管理的能力,在用户需要一段内存空间时,向系统申请,系统选择一段合适的内存空间分配给用户,用户使用完毕后,再释放回系统,以便系统将该段内存空间回收再利用。

由于实时系统中对时间的要求非常严格,内存管理往往要比通用操作系统要求苛刻得多:

1)分配内存的时间必须是确定的。一般内存管理算法是根据需要存储的数据的长度在内存中去寻找一个与这段数据相适应的空闲内存块,然后将数据存储在里面。而寻找这样一个空闲内存块所耗费的时间是不确定的,因此对于实时系统来说,这就是不可接受的,实时系统必须要保证内存块的分配过程在可预测的确定时间内完成,否则实时任务对外部事件的响应也将变得不可确定。

2)随着内存不断被分配和释放,整个内存区域会产生越来越多的碎片(因为在使用过程中,申请了一些内存,其中一些释放了,导致内存空间中存在一些小的内存块,它们地址不连续,不能够作为一整块的大内存分配出去),系统中还有足够的空闲内存,但因为它们地址并非连续,不能组成一块连续的完整内存块,会使得程序不能申请到大的内存。对于通用系统而言,这种不恰当的内存分配算法可以通过重新启动系统来解决,但是对于那些需要常年不间断地工作于野外的嵌入式系统来说,就变得让人无法接受了。

3)嵌入式系统的资源环境也是不尽相同,有些系统的资源比较紧张,只有数十 KB 的内存可供分配,而有些系统则存在数 MB 的内存,如何为这些不同的系统,选择适合它们的高效率的内存分配算法,就将变得复杂化。

RT-Thread 操作系统在内存管理上,根据上层应用及系统资源的不同,有针对性地提供了不同的内存分配管理算法。

总体上可分为两类:内存堆管理与内存池管理。


内存堆管理

内存堆管理又根据具体内存设备划分为三种情况:

第一种是针对小内存块的分配管理(小内存管理算法);
第二种是针对大内存块的分配管理(slab 管理算法);
第三种是针对多内存堆的分配情况(memheap 管理算法)

内存堆管理器可以分配任意大小的内存块,非常灵活和方便。但其也存在明显的缺点:
一是分配效率不高,在每次分配时,都要空闲内存块查找;
二是容易产生内存碎片。

内存池管理

为了提高内存分配的效率,并且避免内存碎片,RT-Thread 提供了另外一种内存管理方法:内存池(Memory Pool)。

内存池是一种内存分配方式,用于分配大量大小相同的小内存块,它可以极大地加快内存分配与释放的速度,且能尽量避免内存碎片化。此外,RT-Thread 的内存池支持线程挂起功能,当内存池中无空闲内存块时,申请线程会被挂起,直到内存池中有新的可用内存块,再将挂起的申请线程唤醒。

内存堆管理相对简单,我们着重了解RT-thread 的内存池的实现及管理。以RTT最新稳定版本4.1.0的内核为蓝本。

\include\rtdef.h
/**
 * Base structure of Memory pool object
 */
struct rt_mempool
{
    struct rt_object parent;                            /**< inherit from rt_object */

    void            *start_address;                     /**< memory pool start */
    rt_size_t        size;                              /**< size of memory pool */

    rt_size_t        block_size;                        /**< size of memory blocks */
    rt_uint8_t      *block_list;                        /**< memory blocks list */

    rt_size_t        block_total_count;                 /**< numbers of memory block */
    rt_size_t        block_free_count;                  /**< numbers of free memory block */

    rt_list_t        suspend_thread;                    /**< threads pended on this resource */
};
typedef struct rt_mempool *rt_mp_t;

这个结构体称为内存池控制块,是操作系统用于管理内存池的一个数据结构,其继承于struct rt_object,由此可知内存池也是一种内核对象。它会存放内存池的一些信息,例如内存池名,内存池数据区域开始地址,内存缓冲区,内存块大小,块数,内存块与内存块之间连接用的链表结构,因内存块不可用而挂起的线程等待事件集合等。由此结构体实例化的对象就是内存池。

每一个内存池对象由上述结构组成,其中 suspend_thread 形成了一个申请线程等待列表,即当内存池中无可用内存块,并且申请线程允许等待时,申请线程将挂起在 suspend_thread 链表上。

内存池在创建时先向系统申请一大块内存,然后分成同样大小的多个小内存块,小内存块直接通过链表连接起来(此链表也称为空闲链表)。每次分配的时候,从空闲链表中取出链头上第一个内存块,提供给申请者。

从下图中可以看到,物理内存中允许存在多个大小不同的内存池,每一个内存池又由多个空闲内存块组成,内核用它们来进行内存管理。当一个内存池对象被创建时,内存池对象就被分配给了一个内存池控制块。

内核负责给内存池分配内存池控制块,它同时也接收用户线程的分配内存块申请,当获得这些信息后,内核就可以从内存池中为用户线程分配内存。内存池一旦初始化完成,内部的内存块大小将不能再做调整。


首先来看看内存池是怎么创建和初始化的,毕竟用内存池之前先得将其创建出来:

/**
 * @brief This function will create a mempool object and allocate the memory pool from
 *        heap.
 *
 * @param name is the name of memory pool.
 *
 * @param block_count is the count of blocks in memory pool.
 *
 * @param block_size is the size for each block.
 *
 * @return the created mempool object
 */
rt_mp_t rt_mp_create(const char *name,
                     rt_size_t   block_count,
                     rt_size_t   block_size)
{
    rt_uint8_t *block_ptr;
    struct rt_mempool *mp;
    register rt_size_t offset;

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* parameter check */
    RT_ASSERT(name != RT_NULL);
    RT_ASSERT(block_count > 0 && block_size > 0);

    /* allocate object */
    mp = (struct rt_mempool *)rt_object_allocate(RT_Object_Class_MemPool, name);
    /* allocate object failed */
    if (mp == RT_NULL)
        return RT_NULL;

    /* initialize memory pool */
    block_size     = RT_ALIGN(block_size, RT_ALIGN_SIZE);
    mp->block_size = block_size;
    mp->size       = (block_size + sizeof(rt_uint8_t *)) * block_count;

    /* allocate memory */
    mp->start_address = rt_malloc((block_size + sizeof(rt_uint8_t *)) *
                                  block_count);
    if (mp->start_address == RT_NULL)
    {
        /* no memory, delete memory pool object */
        rt_object_delete(&(mp->parent));

        return RT_NULL;
    }

    mp->block_total_count = block_count;
    mp->block_free_count  = mp->block_total_count;

    /* initialize suspended thread list */
    rt_list_init(&(mp->suspend_thread));

    /* initialize free block list */
    block_ptr = (rt_uint8_t *)mp->start_address;
    for (offset = 0; offset < mp->block_total_count; offset ++)
    {
        *(rt_uint8_t **)(block_ptr + offset * (block_size + sizeof(rt_uint8_t *)))
            = block_ptr + (offset + 1) * (block_size + sizeof(rt_uint8_t *));
    }

    *(rt_uint8_t **)(block_ptr + (offset - 1) * (block_size + sizeof(rt_uint8_t *)))
        = RT_NULL;

    mp->block_list = block_ptr;

    return mp;
}

代码分析:

/* allocate object */
    mp = (struct rt_mempool *)rt_object_allocate(RT_Object_Class_MemPool, name);

这句代码就是创建一个内存池控制块,注释也很清楚表明了这个意思, 其中RT_Object_Class_MemPool表示内核对象类型,内核要根据这个
参数获取对象的尺寸,进而分配相应的内存,同时对其基类进行初始化。

/* allocate memory */
    mp->start_address = rt_malloc((block_size + sizeof(rt_uint8_t *)) * block_count);

这句代码就是从堆上划出一大块内存作为内存池,劈成block_count个内存块,每个内存块为(block_size + sizeof(rt_uint8_t *),之所以加上一个sizeof(rt_uint8_t *)是因为每个内存块上要附一个内存块链表指针,以把每个内存块串起来。让mp->start_address指向内存池首地址,这样内存池控制块和内存池就扯上关系了。

如下图所示 ,对用户看来每个内存块是block_size大小(例如60字节),对内核看来就是(block_size + sizeof(rt_uint8_t *)字节(即64字节)。

再看这块代码:

/* initialize free block list */
    block_ptr = (rt_uint8_t *)mp->start_address;
    for (offset = 0; offset < mp->block_total_count; offset ++)
    {
        *(rt_uint8_t **)(block_ptr + offset * (block_size + sizeof(rt_uint8_t *)))
            = block_ptr + (offset + 1) * (block_size + sizeof(rt_uint8_t *));
    }

    *(rt_uint8_t **)(block_ptr + (offset - 1) * (block_size + sizeof(rt_uint8_t *)))
        = RT_NULL;

    mp->block_list = block_ptr;

这块代码的作用是将内存块串起来,看的出来,内存池里的内存块是构成一个单向链表的。

我们来分析一下这段代码是怎么做的:
1. block_ptr = (rt_uint8_t *)mp->start_address, 让block_ptr指向内存池首地址。

2. 重点来看这句:

*(rt_uint8_t **)(block_ptr + offset * (block_size + sizeof(rt_uint8_t *)))
            = block_ptr + (offset + 1) * (block_size + sizeof(rt_uint8_t *));

这句代码是将下一个内存块的地址存入当前内存块链表单元中,这里采用了*(rt_uint8_t **)这种操作,看上去是不是有点奇技淫巧的感觉,
实际上就是解引用,看下图就能明白:

 

 

我们写个模拟程序可测试一下这种搞法:

int main(void)
{
    int* pa = (int*)malloc(16); //模拟内存块1
    int* pb = (int*)malloc(16); //模拟内存块2

    printf("&pa:%x, *(&pa):%x,pa:%x,  *pa:%d\n", &pa, *(&pa), pa, *pa);
    printf("&pb:%x, *(&pb):%x,pb:%x,  *pb:%d\n", &pb, *(&pb), pb, *pb);

    *(int**)(pa) = pb; //内存块1的链表单元指向内存块2的首地址

    printf("&pa:%x, *(&pa):%x,pa:%x,  *pa:%x\n", &pa, *(&pa), pa, *pa);
    printf("&pb:%x, *(&pb):%x,pb:%x,  *pb:%x\n", &pb, *(&pb), pb, *pb);

    free(pa); //注意已改地址了实际没用
    free(pb);
}

编译运行:

 

创建内存块后,默认所有的值都是0,当执行完*(int**)(pa) = pb 后(offset=0的情况),pa所指向的内存单元存储了pb所指向的内存块的地址。

吴工画了两个草图,简要形象的来分析一下这个过程:
在执行*(int**)(pa) = pb 这句代码之前内存关系如下:

 在执行*(int**)(pa) = pb 这句代码之后内存关系如下:

 

 

看到没有,所有的地址关系都有没有变化,变化的是内存块1中存入的值。通过这种方法配合for循环就将所有的内存块链接起来了。

3. 通过mp->block_list = block_ptr;这句代码,将内存池挂在内存池控制块的链表上。这样内存池控制块就可以找到内存块1,通过内存块1找到
内存块2,通过内存块2找到内存块3,以此类推。

到此也就完成了内存池的创建, 可以愉快的进行下一步了。

分配和释放内存块

分配内存块

从指定的内存池中分配一个内存块,我们来看一下具体的实现过程:

void *rt_mp_alloc(rt_mp_t mp, rt_int32_t time)
{
    rt_uint8_t *block_ptr;
    register rt_base_t level;
    struct rt_thread *thread;
    rt_uint32_t before_sleep = 0;

    /* parameter check */
    RT_ASSERT(mp != RT_NULL);

    /* get current thread */
    thread = rt_thread_self();

    /* disable interrupt */
    level = rt_hw_interrupt_disable();

    while (mp->block_free_count == 0)
    {
        /* memory block is unavailable. */
        if (time == 0)
        {
            /* enable interrupt */
            rt_hw_interrupt_enable(level);

            rt_set_errno(-RT_ETIMEOUT);

            return RT_NULL;
        }

        RT_DEBUG_NOT_IN_INTERRUPT;

        thread->error = RT_EOK;

        /* need suspend thread */
        rt_thread_suspend(thread);
        rt_list_insert_after(&(mp->suspend_thread), &(thread->tlist));

        if (time > 0)
        {
            /* get the start tick of timer */
            before_sleep = rt_tick_get();

            /* init thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &time);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do a schedule */
        rt_schedule();

        if (thread->error != RT_EOK)
            return RT_NULL;

        if (time > 0)
        {
            time -= rt_tick_get() - before_sleep;
            if (time < 0)
                time = 0;
        }
        /* disable interrupt */
        level = rt_hw_interrupt_disable();
    }

    /* memory block is available. decrease the free block counter */
    mp->block_free_count--;

    /* get block from block list */
    block_ptr = mp->block_list;
    RT_ASSERT(block_ptr != RT_NULL);

    /* Setup the next free node. */
    mp->block_list = *(rt_uint8_t **)block_ptr;

    /* point to memory pool */
    *(rt_uint8_t **)block_ptr = (rt_uint8_t *)mp;

    /* enable interrupt */
    rt_hw_interrupt_enable(level);

    RT_OBJECT_HOOK_CALL(rt_mp_alloc_hook,
                        (mp, (rt_uint8_t *)(block_ptr + sizeof(rt_uint8_t *))));

    return (rt_uint8_t *)(block_ptr + sizeof(rt_uint8_t *));
}

代码逻辑非常清楚,注释也很详细,也很好理解。

其中 time 参数的含义是申请分配内存块的超时时间。

while (mp->block_free_count == 0){...}

表示如果内存池中已经没有空闲内存块,则判断超时时间设置:
若超时时间设置为零,则立刻返回空内存块;
若等待时间大于零,则把当前线程挂起在该内存池对象上,直到内存池中有可用的自由内存块,或等待时间到达。

/* memory block is available. decrease the free block counter */
    mp->block_free_count--;

如果内存池中有可用的内存块,则从内存池的空闲块链表上取下一个内存块,减少空闲块数目并返回这个内存块;

我们来看一下,内核是怎样从内存池中取下一个内存块的:

/* get block from block list */
    block_ptr = mp->block_list;

首先获取到内存块链表头,指向第一个内存块。

/* Setup the next free node. */
    mp->block_list = *(rt_uint8_t **)block_ptr;

然后,将链表头指向下一个内存块的地址。由前面的创建内存池的讲解我们知道第一个内存块的第一个单元里存的就是下一个内存块的地址,
 *(rt_uint8_t **)block_ptr 就是取出下一个内存块的地址,现在这个内存块是链表的第一个节点。

/* point to memory pool */
    *(rt_uint8_t **)block_ptr = (rt_uint8_t *)mp;

接下来将内存池控制块的地址存入到被取下来的这个内存块的首个单元地址中(原来这个地方是存下一个内存块的地址),以保持和内存池
的联系,不然就要成野孩子了,出去了就找不到回家的路了。如下图所示 :

return (rt_uint8_t *)(block_ptr + sizeof(rt_uint8_t *));

最后,返回第一个内存块的用户区域地址(注意是用户可见区域)。


释放内存块

任何内存块使用完后都必须被释放,否则会造成内存泄露,
首先通过需要被释放的内存块指针计算出该内存块所在的(或所属于的)内存池对象,然后增加内存池对象的可用内存块数目,并把该被释放的内存块加入空闲内存块链表上。接着判断该内存池对象上是否有挂起的线程,如果有,则唤醒挂起线程链表上的首线程。

看看具体实现过程,猜测应该是和上面的rt_mp_alloc相反的操作:

/**
 * @brief This function will release a memory block.
 *
 * @param block the address of memory block to be released.
 */
void rt_mp_free(void *block)
{
    rt_uint8_t **block_ptr;
    struct rt_mempool *mp;
    struct rt_thread *thread;
    register rt_base_t level;

    /* parameter check */
    if (block == RT_NULL) return;

    /* get the control block of pool which the block belongs to */
    block_ptr = (rt_uint8_t **)((rt_uint8_t *)block - sizeof(rt_uint8_t *));
    mp        = (struct rt_mempool *)*block_ptr;

    RT_OBJECT_HOOK_CALL(rt_mp_free_hook, (mp, block));

    /* disable interrupt */
    level = rt_hw_interrupt_disable();

    /* increase the free block count */
    mp->block_free_count ++;

    /* link the block into the block list */
    *block_ptr = mp->block_list;
    mp->block_list = (rt_uint8_t *)block_ptr;

    if (!rt_list_isempty(&(mp->suspend_thread)))
    {
        /* get the suspended thread */
        thread = rt_list_entry(mp->suspend_thread.next,
                               struct rt_thread,
                               tlist);

        /* set error */
        thread->error = RT_EOK;

        /* resume thread */
        rt_thread_resume(thread);

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do a schedule */
        rt_schedule();

        return;
    }

    /* enable interrupt */
    rt_hw_interrupt_enable(level);
}

分析:

/* get the control block of pool which the block belongs to */
    block_ptr = (rt_uint8_t **)((rt_uint8_t *)block - sizeof(rt_uint8_t *)); //计算内存的首地址,注意生长方向。
    mp        = (struct rt_mempool *)*block_ptr; 

在获取内存时将内存池控制块mp的值存入了内存块里,现在释放内存时,是时候取回来了,离家的孩子没有忘记回家的路。

/* link the block into the block list */
    *block_ptr = mp->block_list;
    mp->block_list = (rt_uint8_t *)block_ptr;

看,是不是上面分配时的逆操作。先把mp->block_list所指向的内存块地址存入到回收的这个内存块里,然后把回收的这个内存块地址存入内存块队列头上。
这个操作就是把这个回家的内存块插入到内存块队列的队头上,现在他是第一个空闲节点了。


以上就是RTT内核内存池管理的主要内容了,逻辑是很清晰的,代码是很优雅的。

这里也对这个国产操作RTOS推荐一波。

很多年前在用UCOS时就关注这个系统,只是那时吴工还有点“崇洋媚外”,没怎么使用。近几年RTT发展明显快了很多,
生态也起来了,社区也非常活跃,系统更新迭代也是很迅速的。很多芯片和开发板都做了适配,官方也出了IDE,傻瓜式配置,降低了使用门槛,体验过,不过
吴工在项目中从来没用过,都是直接撸代码的。这个系统运行还是非常稳定的,实际项目中表现很不错。如果玩过LINUX的话,会发现RTT和LINXU非常像,因为
RTT最早就是从LINUX中脱胎出来的,而且也遵行POSIX标准,就是说很多应用代码直接从LINUX拿过来就能用的。RTT从2006年开始,到现在也发展了有16年之久,
在官方和所有开发者和支持者的努力下,发展的越来越有自己的特色。

希望国产RTOS发展越来越好吧!欢迎关注公众号。
 

 

 

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到