内存池核心介绍
废话不多说,show you code.
我实现了两套内存池,一个是固定大小的内存池,一个是多重不同大小的内存池。
Fixed size memory pool
设计思路:
我们一个个看,首先我们定义了一个chunk, chunk 里面包含了n个block, 每个block都存放着一个data数据和一个next的指针,这个指针指向下一个block,形成一个链表。我们分配的时候是以一个个chunk来分配的,我们会把空闲的block 存放在freelist 里面。所以每次分配的时候都会先从freelist 中查看是否有空闲的block, 如果没有,再分配一个chunk。 但是呢,我们内存池考虑到thread-safe, 又要考虑到性能,所以使用了无锁操作,也就是原子操作(atomic)。 但是实际下来还是性能比较差,所以进一步优化使用threadcache, 也就是每个thread_local。这样就保证了每个thread都有自己的一份cache, 每次先分配的内存的时候就先从cache 分配,分配完再从freelist 中取。最后我们会把每个chunk数据push到LockFreeStack 里面管理,避免数据泄露。
LockFreeStack
无锁栈代码比较简单:
- 我们首先定义链表的节点Node,这个类是个模板类型的template, T就是数据的格式,next是指向下个节点的指针。Node的构造函数声明了一个可变参数模板,允许接受任意数量和类型的参数,并使用std::forward 完美转发到data成员的构造函数。
struct Node {
T data;
std::atomic<Node*> next;
template<typename... Args>
Node(Args&&... args): data(std::forward<Args>(args)...), next(nullptr) {}
};
std::atomic<Node*> head;
push 操作:
- 使用
std::move(item)
将传入的item值移动到新节点,避免不必要的复制, 并且利用了Node
类中的完美转发构造函数 - 使用
std::memory_order_relaxed
原子加载当前的头节点, 目前没有同步需求。 - 使用头插法把新节点插入到当前节点的前一个,这里循环条件使用CAS(Compare-And-Swap)操作尝试更新头节点
compare_exchange_weak
会比较head
与oldNode
是否相等- 如果相等,则将
head
设为newNode
并返回true
,循环结束 - 如果不相等(可能其他线程修改了
head
),则将oldNode
更新为当前的head
值并返回false
,循环继续
void push(T item) { auto* newNode = new Node(std::move(item)); Node* oldNode = head.load(std::memory_order_relaxed); do{ newNode->next.store(oldNode, std::memory_order_relaxed); } while(!head.compare_exchange_weak(oldNode, newNode, std::memory_order_release, std::memory_order_relaxed)); }
- 使用
pop操作:
- 读取当前栈顶节点指针,使用宽松内存序
- 如果头节点为
nullptr
,表示栈为空,返回false
- 否则:使用CAS操作原子地更新头节点:
- 比较当前头节点是否仍为
oldNode
- 若相同,将头节点更新为
oldNode->next
并返回true
- 若不同(其他线程已修改头节点),则更新
oldNode
为当前头节点值并返回false
- 比较当前头节点是否仍为
- 成功更新头节点后,将原头节点的数据移动到结果变量
- 释放原头节点内存
- 返回操作成功
bool pop(T& result) { Node* oldNode = head.load(std::memory_order_relaxed); if (oldNode) { if (head.compare_exchange_weak(oldNode, oldNode->next.load(std::memory_order_relaxed), std::memory_order_release, std::memory_order_relaxed)) { result = std::move(oldNode->data); delete oldNode; return true; } } return false; }
这里我们为什么使用
compare_exchange_weak
而不是compare_exchange_strong
呢?compare_exchange_weak
允许在某些平台上"假失败"(spurious failure),即使比较的值实际相等,但在实现上可能会返回失败。这种设计让它能够:- 在某些CPU架构上有更高的性能
- 避免昂贵的内存栅栏(memory barrier)操作
- 特别在ARM等架构上实现更高效
- compare_exchange_strong: 保证只在实际值不等于期望值时返回失败, 如果失败需要重试,性能开销比较大。
- compare_exchange_weak: 可能出现假失败,但在循环中使用时,这种差异不影响结果正确性
LockFreeFixedSizePool
无锁的固定大小内存池。
首先看下我们的block的定义,这个结构体包含三个主要部分:
alignas(T)
指令:确保Block结构体与T类型有相同的对齐要求,防止潜在的内存对齐问题data
数组:- 类型:
std::byte
数组,大小为sizeof(T)
- 作用:存储T类型对象的原始内存
- 布局:位于Block结构体起始位置
- 类型:
next
指针:- 类型:
std::atomic<Block*>
- 作用:指向链表中的下一个Block,用于构建无锁链表
- 初始值:nullptr
- 类型:
as_object()
方法:- 功能:将
data
数组转换为T类型指针 - 实现:使用
reinterpret_cast
进行类型转换 - 返回:指向
data
内存区域的T*
指针
- 功能:将
- 当对象被释放时,Block不会被系统回收,而是返回到池中供后续使用。每个Block作为无锁链表的节点,通过
next
原子指针链接。 将data
数组放在结构体起始位置,使得T对象指针可直接转换为Block指针。 当用户从内存池获取内存时,用户拿到的是指向data
数组的T*
指针。因为data
位于Block结构体的起始位置,并且通过alignas(T)
保证了对齐要求,所以可以安全地在deallocate
时,将用户的T*
指针转回Block*
指针。
struct alignas(T) Block { std::byte data[sizeof(T)]; // 每个block的大小为T std::atomic<Block*> next{nullptr}; T* as_object() { return reinterpret_cast<T*>(data);} };
chunk的结构体有两个成员:
blocks
: 一个指向Block
数组的智能指针,使用std::unique_ptr<Block[]>
管理内存count
: 表示这个Chunk
中包含的Block
数量.Chunk
实现了批量内存分配策略:- 一次性分配多个
Block
对象 - 默认情况下,
N = 4096
,单个Block
大小为sizeof(Block)
- 实际分配的
Block
数量由BLOCK_PER_CHUNK = N / sizeof(Block)
决定
- 一次性分配多个
std::unique_ptr<Block[]>
为Chunk
提供了自动内存管理:- 当
Chunk
被销毁时,blocks
指向的内存会被自动释放 - 避免了手动内存管理的复杂性和潜在的内存泄漏
- 当
- 构造函数接收一个
block_count
参数,并完成两个操作:- 分配指定数量的
Block
对象:std::make_unique<Block[]>(block_count)
- 存储块数量:
count(block_count)
- 分配指定数量的
struct Chunk { // 每个chunk包含N个block std::unique_ptr<Block[]> blocks; size_t count; Chunk(size_t block_count) :blocks(std::make_unique<Block[]>(block_count)), count(block_count) {} };
再看下我们的ThreadCache的结构体的实现。ThreadCache是一个线程局部内的缓存系统,用于优化无锁内存池的性能。它主要为每个线程提供本地内存缓存,减少全局竞争,提高分配效率。
- 这里为什么把缓存的block块设置为32, 并且每次取的大小设置为16。
- 减少竞争:每次获取多个块,减少对全局空闲列表的访问频率。
- 适中开销: 16个块是内存和性能的平衡点,既不会一次占用过多内存,又能有效减少同步操作
- 批量效率:填充缓存时批量获取更高效,如在后面的
fill_local_cache()
中使用。
- 这里我们为什么需要一个
pool_instance
的指针。因为我们在内存归还给全局freelist, 所以这里我们传递一个指向对象的指针。
struct ThreadCache { Block* blocks[32]; int count = 0; static constexpr size_t BATCH_SIZE = 16; // 批量获取块的数量 LockFreeFixedSizePool<T, N>* pool_instance = nullptr; ~ThreadCache() { if (count > 0 && pool_instance) { return_thread_cache(); } } // 将线程本地缓存归还全局链表 void return_thread_cache() { if (count == 0 || !pool_instance) return; //将本地缓存链接成链表 for (int i = 0; i < count - 1; ++i) { blocks[i]->next.store(blocks[i+1], std::memory_order_relaxed); } Block* old_head = pool_instance->free_list.load(std::memory_order_relaxed); Block* new_head = blocks[0]; Block* new_tail = blocks[count - 1]; do { new_tail->next.store(old_head, std::memory_order_relaxed); } while (!pool_instance->free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed)); count = 0; } };
- 这里为什么把缓存的block块设置为32, 并且每次取的大小设置为16。
继续看下我们的类成员的定义:
- 我们要把ThreadCache 声明为thread_local 类型,这样每个线程都会存在一份缓存
- free_list 为全局缓存,所有空闲的block块都会链接到这个list 里面。
- chunks 分配的chunk由LockFreeStack 来管理,避免内存泄露
- allocate_count 为分配一个block的计数, deallocated_count 为归还一个block的计数。
static inline thread_local ThreadCache local_cache; static inline constexpr size_t BLOCK_PER_CHUNK = N / sizeof(Block); std::atomic<Block*> free_list{nullptr}; LockFreeStack<std::unique_ptr<Chunk>> chunks; std::atomic<size_t> allocate_count{0}; std::atomic<size_t> deallocated_count{0};
分配一个新的chunk.
- 先使用make_unique分配一个chunk的大小,其中包含BLOCK_PER_CHUNK的block。
- 获取block的指针。
- 这里我们先把block串联起一个链表,然后使用一次原子操作即可。
- 第一次分配的都是空的,所有把所有的block都放进free_list 里面去。
- 最后把chunk放到LockFreeStack里面管理。
void allocate_new_chunk() { auto chunk = std::make_unique<Chunk>(BLOCK_PER_CHUNK); Block* blocks = chunk->blocks.get(); // 准备链表,只在最后一步进行一次原子操作 auto* first_block = &blocks[0]; for (size_t i = 0; i < chunk->count - 1; ++i) { blocks[i].next.store(&blocks[i+1], std::memory_order_relaxed); } Block* old_head = free_list.load(std::memory_order_relaxed); do { blocks[chunk->count-1].next.store(old_head, std::memory_order_relaxed); } while(!free_list.compare_exchange_weak(old_head, first_block, std::memory_order_release, std::memory_order_relaxed)); //将所有块链接到空闲列表 chunks.push(std::move(chunk)); }
然后我们再看看怎么把free_list 里面的数据批量取出来放到thread_cache里面去。
- 首先获取这个对象的instance, 如果缓存还有剩余,那么不需要从free_list里面取。
- 否则检查free_list 有没有空闲的块了,如果没有分配一个新的chunk.
- 这里我们直接从free_list 里面取ThreadCache::BATCH_SIZE 大小的block的链表,把获取的块放到chche的数组里面。
// 批量获取块到本地缓存 void fill_local_cache(ThreadCache& cache) { cache.pool_instance = this; // 如果本地缓存还有剩余,则直接返回 if (cache.count > 0) return; // 从全局空闲列表中获取块 int batch_size = ThreadCache::BATCH_SIZE; Block* head = nullptr; Block* tail = nullptr; Block* new_head = nullptr; Block* old_head = free_list.load(std::memory_order_acquire); do { // 如果全局链表为空,分配新的块 if (!old_head) { allocate_new_chunk(); old_head = free_list.load(std::memory_order_acquire); if (!old_head) return; // 如果还是仍然为空,返回 } head = old_head; Block* current = old_head; int count = 0; while (count < batch_size - 1 && current->next.load(std::memory_order_relaxed)) { current = current->next.load(std::memory_order_relaxed); ++count; } tail = current; new_head = tail->next.load(std::memory_order_relaxed); } while(!free_list.compare_exchange_weak(old_head, new_head, std::memory_order_release, std::memory_order_relaxed)); // 将获取到的块存储到本地缓存 Block* current = head; int i = 0; while (current != new_head) { cache.blocks[i++] = current; current = current->next.load(std::memory_order_relaxed); } cache.count = i; }
- 分配函数
- allocate 是参数是一个可变参数,允许n个参数原封不动传给T对象。
- 我们首先获取数据从cache里面获取,如果没有,调用fill_local_cache 缓存cache数据。
- 如果本地缓存失败了(一般不会出现,除非没内存了), 从free_list 分配一个block, 如果free_list 也没有了,则分配一个新的chunk。 最后返回指向数据的指针,也就是Block的首地址。
template<typename... Args> T* allocate(Args&&... args) { if (local_cache.count == 0) { fill_local_cache(local_cache); } Block* block = nullptr; // 尝试从本地缓存获取块 if (local_cache.count > 0) { block = local_cache.blocks[--local_cache.count]; } else { // 本地缓存填充失败,直接从全局获取单个块 Block* old_block = free_list.load(std::memory_order_acquire); // 先尝试从空闲列表中获取一个块 while(old_block) { if (free_list.compare_exchange_weak(old_block, old_block->next.load())) { block = old_block; break; } } // 如果空闲列表为空,则分配一个新的块 if (!block) { allocate_new_chunk(); old_block = free_list.load(std::memory_order_acquire); while(old_block) { if (free_list.compare_exchange_weak(old_block, old_block->next.load())) { block = old_block; break; } } } } allocate_count.fetch_add(1); if (block) { // 构造新对象 return new (block->as_object()) T(std::forward<Args>(args)...); } return nullptr; }
- 最后看下deallocate的函数
- 释放的时候先会去调用对象的析构函数
- 把数据的指针转为block的指针
- 这里我们有个策略就是如果本地缓存的数量,有可能归还超过总的大小,所以我们会预先检测本地缓存的是否已经大于了容量的80%, 如果已经超过的话,就先把一半的缓存还给free_list 里面,这样确保cache里面有足够的空间。
- 最后再把block 放到cache里面去。
void deallocate(T* ptr) { if (!ptr) return; ptr->~T(); /* Block 结构的第一个成员就是 data 数组 as_object() 方法返回的是 data 数组的起始地址 通过 alignas(T) 确保 Block 的对齐要求满足 T 的需求 因此,指向 T 对象的指针实际上就是指向 Block 结构体中 data 数组的指针,而 data 数组又在 Block 的起始位置,所以可以安全地将 T 指针转回 Block 指针。 内存布局: Block 对象: +----------------------+ | data[sizeof(T)] | <-- 用户获得的 T* 指针指向这里 +----------------------+ | next | +----------------------+ */ Block* block = reinterpret_cast<Block*>(ptr); local_cache.pool_instance = this; // 确定本地缓存容量 const int cache_capacity = static_cast<int>(sizeof(local_cache.blocks) / sizeof(local_cache.blocks[0])); // 如果本地缓存已近容量的80%,预先归还一半 if (local_cache.count >= cache_capacity * 0.8) { int half_count = local_cache.count / 2; // 构建链表 for (int i = 0; i < half_count - 1; ++i) { local_cache.blocks[i]->next.store(local_cache.blocks[i + 1], std::memory_order_relaxed); } // 归还到全局链表 Block* old_head = free_list.load(std::memory_order_relaxed); Block* cache_head = local_cache.blocks[0]; Block* cache_tail = local_cache.blocks[half_count - 1]; do { cache_tail->next.store(old_head, std::memory_order_relaxed); } while (!free_list.compare_exchange_weak(old_head, cache_head, std::memory_order_release, std::memory_order_relaxed)); // 压缩剩余缓存 for (int i = 0; i < local_cache.count - half_count; ++i) { local_cache.blocks[i] = local_cache.blocks[i + half_count]; } local_cache.count -= half_count; } // 现在将新块添加到本地缓存 local_cache.blocks[local_cache.count++] = block; deallocated_count.fetch_add(1, std::memory_order_relaxed); }
- 测试结果(和标准的分配器)
- 可以看到比标准的分配器还是快了一点。PS: 后面有时间可以继续优化下。
Standard allocator time: 2450 microseconds
Fixed size pool time: 1790 microseconds
源代码: MemoryPool