【高并发内存池】三、线程缓存的设计

发布于:2025-09-04 ⋅ 阅读:(19) ⋅ 点赞:(0)


在这里插入图片描述

Ⅰ. thread cache整体设计

定长内存池只支持固定大小内存块的申请释放,因此定长内存池中只需要一个空闲链表管理释放回来的内存块。但是现在我们要支持申请和释放不同大小的内存块,那么我们就需要多个空闲链表来管理释放回来的内存块,因此 thread cache 实际上是一个哈希桶结构,每个桶中存放的都是一个空闲链表结构

每个线程都会有一个 thread cache 对象,这样每个线程在该缓存中获取对象和释放对象时就不需要加锁了。

​ 其中 thread cache 支持小于等于 256KB 内存的申请,如果我们将每种字节数的内存块都用一个空闲链表进行管理的话,那么此时我们就需要 256K 个空闲链表,光是存储这些空闲链表的头指针就需要消耗大量内存,这显然是得不偿失的。

所以这时我们可以选择做一些平衡的牺牲,想办法让这些字节数按照某种规则进行对齐,例如我们让这些字节数都按照 8 字节进行向上对齐,那么 thread cache 的结构就是下面这样的,此时当线程申请 1~8 字节的内存时会直接给出 8 字节,而当线程申请 9~16 字节的内存时会直接给出 16 字节,以此类推。

在这里插入图片描述

因此当线程要申请某一大小的内存块时,就需要经过某种计算得到对齐后的字节数,进而找到对应的哈希桶,如果该哈希桶中的空闲链表中有内存块,那就从空闲链表中头删一个内存块进行返回;如果该空闲链表已经为空了,那么就需要向下一层的 central cache 申请内存了

但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了 6 字节的内存,而 thread cache 却直接给了 8 字节的内存,这多给出的 2 字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片


鉴于当前项目比较复杂,我们最好对空闲链表这个结构进行封装,目前我们就提供 push()pop() 两个成员函数,对应的操作分别是将对象插入到空闲链表(头插)和从空闲链表获取一个对象(头删),后面在需要时还会添加对应的成员函数!

​ 对应的操作其实我们在讲定长内存池的时候就提到过,这里不再赘述,具体参考代码,结合图例进行思考:

// common.h

// 获取所指对象的开头指针大小的空间的地址
static inline void*& get_next(void* ptr)
{
	return (*(void**)ptr);
}

class FreeList
{
private:
	void* _freelist = nullptr; // 指向空闲链表头节点的指针
public:
	// 将对象插入空闲链表
	void push(void* obj)
	{
		assert(obj != nullptr);

		// 头插操作
		get_next(obj) = _freelist;
		_freelist = obj;
	}

	// 取出空闲链表中的空间
	void* pop()
	{
		assert(_freelist != nullptr);

		// 头删操作
		void* obj = _freelist;
		_freelist = get_next(obj);
		return obj;
	}

	bool empty() { return _freelist == nullptr; }
};

因此我们下面要设计的 thread cache 类实际就是一个数组,数组中存储的就是一个个的空闲链表,至于这个数组中到底存储了多少个空闲链表,就需要看我们在进行字节数对齐时具体用的是什么映射对齐规则了,下面我们来研究一下!

Ⅱ. 空闲链表的哈希桶与对象大小的映射关系

1、如何进行对齐?

上面已经说了,我们不能为每个字节数都设置一个对应的空闲链表,这样开销太大了,因此我们需要制定一个合适的映射对齐规则。

首先,这些内存块是会被链接到空闲链表上的,因此首先肯定是按 8 字节进行对齐是最合适的,因为我们必须保证这些内存块,无论是在 32 位平台下还是 64 位平台下,都至少能够存储得下一个指针。

但如果所有的字节数都按照 8 字节进行对齐的话,那么我们就需要建立 256 × 1024 ÷ 8 = 32768 个桶,这个数量是非常多的,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐,具体对齐方式如下:

在这里插入图片描述

​ 这种对其方式其实是 tcmalloc 中的对齐方式的简化版本!

2、该对齐规则对应的空间浪费率

虽然对齐产生的内碎片会引起一定程度的空间浪费,但 按照上面的对齐规则,我们可以将浪费率控制到 10% 左右。需要说明的是,1~128 这个区间我们不做讨论,因为 1 字节就算是对齐到 2 字节都有百分之五十的浪费率,这里我们就从第二个区间开始进行计算。

在这里插入图片描述

我们可以依据最大浪费率来看看效果怎么样!根据上面的公式,要想得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。比如 129 ~ 1024 这个区间,对应的对齐数是 16,此时因为最小的数字是 129,它得对其到 16 的倍数,所以要变成 144 才行,此时最大浪费的字节数就是 144 - 129 = 15,而最小对齐后的字节数就是这个区间内的前 16 个数所对齐到的字节数,也就是 144,那么该区间的最大浪费率也就是 15 ÷ 144 ≈ 10.42%

​ 同样的道理,后面两个区间的最大浪费率分别是 127 ÷ 1152 ≈ 11.02%1023 ÷ 9216 ≈ 11.10%,都是在保持在 10% 左右,已经非常的优秀了!

3、对齐和映射相关函数的编写

此时有了字节数的对齐规则后,我们就需要提供两个对应的函数,分别用于获取某一字节数对齐后的字节数,以及该字节数对应的哈希桶下标。关于处理对齐和映射的函数,我们可以将其封装到一个类当中。

// common.h

// 管理对象大小的对齐与映射规则
class AlignClass
{
public:
	// 传入一个对象大小size,返回它对应的对齐数
	static inline size_t get_align(size_t size);

	// 传入一个对象大小size,返回它当前所处的哈希桶下标
	static inline int get_index(size_t size);
};

需要注意的是,AlignClass 类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。(虽然编译器会自动将其设置为内联函数,但是我们最好也是显示设置)

① 获取向上对齐后的字节数

在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数 align() 进行进一步处理。

// common.h

// 整体控制在最多10%左右的内碎片浪费
// [1,128]					8byte对齐	    freelist[0,16)
// [128+1,1024]				16byte对齐	    freelist[16,72)
// [1024+1,8*1024]			128byte对齐	    freelist[72,128)
// [8*1024+1,64*1024]		1024byte对齐     freelist[128,184)
// [64*1024+1,256*1024]		8*1024byte对齐   freelist[184,208)

// 传入一个对象大小size,返回它对应的对齐数
static inline size_t get_align(size_t size)
{
    // 根据字节的范围,将size以及其范围对应的对齐数交给align替我们计算对齐数
    if (size <= 128) return align(size, 8);
    else if (size <= 1024) return align(size, 16);
    else if (size <= 8 * 1024) return align(size, 128);
    else if (size <= 64 * 1024) return align(size, 1024);
    else if (size <= 256 * 1024) return align(size, 8 * 1024);

    // 走到这说明超过了单个线程最大申请的空间大小,则进行断言错误然后返回-1即可
    assert(false);
    return -1;
}

此时我们就需要编写一个子函数,该子函数需要通过对齐数计算出某一字节数对齐后的字节数,我们通过位运算的方式来进行计算,实现 tcmalloc 的大佬们就是使用位运算的方法来计算,虽然这种位运算很难想到,但是也是一种思路,并且计算机执行位运算的速度是比执行乘法和除法更快的!

​ 其实现如下所示:

// common.h

private:
	static inline size_t align(size_t bytes, size_t alignNum)
	{
		// 比如bytes为1~8,则alignNum为8,此时前面bytes + alignNum - 1得到的最小值和最大值分别是8和15
		// 然后alignNum - 1得到的就是7,此时取反得到就是低三位比特位都是0,其它都是1
		// 如果此时为8,也就是0000 1000,而上面取反是1111 1000,此时它们相互按位与之后得到就是0000 1000也就是8
		// 如果此时为15, 就是0000 1111,而上面取反是1111 1000,此时它们相互按位与之后得到就是0000 1000依旧是8
		return ((bytes + alignNum - 1) & ~(alignNum - 1));
	}

② 获取对应哈希桶的下标

在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用一个子函数 index() 进行进一步处理!

​ 需要注意的是,此时我们并 不是传入该字节数的对齐数,而是对齐数写成以 2 为底时候的指数 n,将这个 n 值进行传入。比如对齐数是 8,传入的就是 3

​ 此外就是因为在下面的 index() 函数中计算下标的时候,因为 不同的对象大小是对应不同的对齐数的,所以要将当前区间的前面区间的大小都减掉,只剩下当前区间的大小,才能让它们对应自己区间上的对齐数!

​ 举个例子,如果 size1428,位于 [129, 1024] 区间,它对应的对齐数是 16,但是因为 size 的大小中包含了 [0, 128] 的大小,它们对应的对齐数是 8,这是不匹配的,所以要让 1428 - 128 = 1300,这才是要传给 index() 函数的第一个参数!

​ 还有就是因为 index() 返回的只是当前对象大小对应区间的下标,它是从 0 开始的,但是如果它不是第一个区间,比如上面的 size = 1428 它是属于 [129, 1024] 区间上的,它对应的哈希桶下标应该是从 16 开始的,此时就需要加上 [0, 128] 区间的桶数,才能让它从 16 开始,其它的区间也是一样的!

​ 也就是说 当前区间的哈希桶下标计算的时候需要加上前面区间的桶的个数!所以我们需要搞一个 groupArray 数组,事先给出每个区间有多少个空闲链表,这样子方便在拿到下标之后进行计算!

// common.h

// 传入一个对象大小size,返回它当前所处的哈希桶下标
static inline int get_index(size_t size)
{
    // 事先给出每个区间有多少个空闲链表
    static int groupArray[4] = { 16, 56, 56, 56 }; 

    // 这里对齐数传入的是对齐数以2为底,此时对应的指数
    // 并且这里因为index函数中计算下标的时候,只考虑当前字节数属于哪一个区间,所以要将前面区间的字节数减去
    if (size <= 128) 
        return index(size, 3);
    else if (size <= 1024) 
        return index(size - 128, 4) + groupArray[0];
    else if (size <= 8 * 1024) 
        return index(size - 1024, 7) + groupArray[0] + groupArray[1];
    else if (size <= 64 * 1024) 
        return index(size - 8*1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
    else if (size <= 256 * 1024) 
        return index(size - 256*1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];

    // 走到这说明超过了单个线程最大申请的空间大小,则进行断言错误然后返回-1即可
    assert(false);
    return -1;
}

此时我们需要编写一个子函数来继续进行处理,当然,为了提高效率下面也提供了一个用位运算来解决的方法(也是借鉴 tcmalloc 中的思路):

// common.h

private:
	static inline int index(size_t bytes, size_t alignShift)
	{
		// 比如bytes为1~8,则alignShift为3,此时前面(1 << alignShift) - 1得到的就是7
		// 然后bytes + 7 得到的区间就是8~15了
		// 接着让8~15区间的值右移alignShift位也就是3位然后减一,如下所示:
		// 如果为8,即0000 1000,右移3位得到0000 0001 - 1 = 0
		// 如果为15,即0000 1111,右移3位得到0000 0001 - 1 = 0
		// 可以看到它们都是属于对齐数8的空闲链表中,哈希桶中下标为0的对象!
		return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
	}

Ⅲ. thread cache类的实现

按照上述的对齐规则,thread cache 中桶的个数,也就是空闲链表的个数是 208,以及 thread cache 允许申请的最大内存大小 256KB,我们可以将这些数据按照如下方式进行定义:

// common.h

static const size_t THREAD_MAX_SIZE = 256 * 1024; // 单个线程最大申请的空间大小(单位为字节)
static const size_t FREELIST_NUMS = 208;		  // 空闲链表的个数

现在就可以对 ThreadCache 类进行定义了,thread cache 就是一个存储 208 个空闲链表的数组,然后提供申请和释放内存对象的接口即可,如下所示:

// ThreadCache.h
#pragma once
#include "Common.h"

class ThreadCache
{
private:
	FreeList _freelists[FREELIST_NUMS];
public:
	// 申请和释放内存对象
	void* allocate(size_t size);
	void deallocate(void* ptr, size_t size);
private:
	// 从中心缓存获取对象空间
	void* fetch_from_CentralCache(size_t index, size_t size);
};

thread cache 申请对象时通过所申请的字节数计算出对应的哈希桶下标,如果桶中空闲链表不为空,则从该空闲链表中取出一个对象进行返回即可;但如果此时空闲链表为空,那么我们就需要从 central cache 中申请空间了,这里的 fetch_from_CentralCache() 函数就是为了这个功能,它也是 thread cache 类中的一个成员函数,这在后面设计 CentralCache 类之后再进行实现!

// ThreadCache.cpp
#include "ThreadCache.h"

void* ThreadCache::allocate(size_t size)
{
	assert(size <= THREAD_MAX_SIZE); // 要求申请的空间是小于256KB的

	// 获取对齐数和下标,然后进行判断:
	//  1. 如果对应哈希桶中存在空闲的空间,则将其分配出来即可
	//  2. 如果没有空闲空间,则向下一层central cache进行内存申请
	size_t align_num = AlignClass::get_align(size);
	size_t index = AlignClass::get_index(size);
	if (!_freelists[index].empty())
		return _freelists[index].pop();
	else
		return fetch_from_CentralCache(index, align_num);
}

void ThreadCache::deallocate(void* ptr, size_t size)
{
	assert(size <= THREAD_MAX_SIZE); // 要求申请的空间是小于256KB的
	assert(ptr != nullptr);

	// 将ptr指向的空间插入到对应的哈希桶中即可
	size_t index = AlignClass::get_index(size);
	_freelists[index].push(ptr);
    
    //后面补充剩余细节...
}

Ⅳ. thread cacheTLS无锁访问

windows下的tls

linux下的tls

​ 线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。

​ 在 windows 中, 可以使用 __declspec 关键字声明 thread 变量,如下所示:

__declspec(thread) int i = 1; // 初始化一个只有该线程能访问的全局变量i

​ 所以我们想创建一个每个线程独立的 ThreadCache 类的话,并且因为我们在多文件内引入头文件,所以需要用 static 修饰,最后就是这样子初始化:

static __declspec(thread) ThreadCache* tls_thread_cache = nullptr;

​ 知道这些之后,我们先 创建一个新头文件 ConcurrentAlloc.h,它包含最终主函数要调用的申请内存的接口,相当于 malloc 那样子使用!

#pragma once
#include "Common.h"
#include "ThreadCache.h"

// 向高并发内存池申请空间的接口
static void* ConcurrentAlloc(size_t size);

// 释放内存的接口(实际上并不会真的释放,只是还给内存池管理了)
static void ConcurrentFree(void* ptr, size_t size);

​ 我们先简单的实现一下这两个接口(在后面的笔记中会补充其它实现细节,这不是最终版本!),因为不是每个线程被创建时就立马有了属于自己的 thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的 thread cache,因此在申请内存的函数中需要包含以下逻辑:

#pragma once
#include "Common.h"
#include "ThreadCache.h"

// 向高并发内存池申请空间的接口
static void* ConcurrentAlloc(size_t size)
{
    // 通过TLS机制,每个线程都能不加锁的获取到属于自己的ThreadCache对象,并且对于单个来说是全局的!
	if (tls_thread_cache == nullptr)
		tls_thread_cache = new ThreadCache;
	
	return tls_thread_cache->allocate(size);
}

// 释放内存的接口(实际上并不会真的释放,只是还给内存池管理了)
static void ConcurrentFree(void* ptr, size_t size)
{
	assert(tls_thread_cache != nullptr);
	tls_thread_cache->deallocate(ptr, size);
}

​ 最后我们来测试一下当前 TLS 机制的无锁访问是否有效,如下图所示:

在这里插入图片描述

在这里插入图片描述