Redis系列-哈希表
摘要:本文将对redis哈希表进行介绍,主要侧重从源代码的角度展开,所使用的源码版本为6.0.19。
关键词:Redis,C,源代码,哈希表
源码分析
结构体定义和构造
下面是一些重要结构体的定义,在源码文件dict.h
中,哈希表的实现则在源码文件dict.c
中。
// 定义的是哈希表每一个位置处链表中的一个节点
typedef struct dictEntry {
void *key;
union { // union联合体节省内存
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used; // 已有的元素数量
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2]; // redis 渐进式rehash需要新旧两个哈希表
// 表示rehash的进程,如果为-1,表示当前不处于rehash中
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 表示当前正在哈希表上迭代的线程数目
unsigned long iterators; /* number of iterators currently running */
} dict;
创建一个哈希表的函数代码如下:
static void _dictReset(dictht *ht)
{
ht->table = NULL;
ht->size = 0;
ht->sizemask = 0;
ht->used = 0;
}
/* Create a new hash table */
// type结构体里包含一些针对具体类型的哈希函数,比较函数,注意到Entry结构体定义中的key和val都是void*指针,一定程度上支持泛型
dict *dictCreate(dictType *type,
void *privDataPtr)
{
dict *d = zmalloc(sizeof(*d));
_dictInit(d,type,privDataPtr);
return d;
}
/* Initialize the hash table */
int _dictInit(dict *d, dictType *type,
void *privDataPtr)
{
_dictReset(&d->ht[0]);
_dictReset(&d->ht[1]);
d->type = type;
d->privdata = privDataPtr;
d->rehashidx = -1;
d->iterators = 0;
return DICT_OK;
}
dictAdd 插入元素
相关的源码如下:
int dictAdd(dict *d, void *key, void *val)
{
// 插入一个节点,并且返回指向那个节点的指针,用来赋值(处于底层代码复用的目的)
dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val); // 赋值
return DICT_OK;
}
/* Low level add or find:
* This function adds the entry but instead of setting a value returns the
* dictEntry structure to the user, that will make sure to fill the value
* field as he wishes.
*
* This function is also directly exposed to the user API to be called
* mainly in order to store non-pointers inside the hash value, example:
*
* entry = dictAddRaw(dict,mykey,NULL);
* if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
*
* Return values:
*
* If key already exists NULL is returned, and "*existing" is populated
* with the existing entry if existing is not NULL.
*
* If key was added, the hash entry is returned to be manipulated by the caller.
*/
// 这个函数用来实现插入或者查找,如果哈希表中有这个key了,则返回null,但是会将传入的二级指针参数指向这个存在的entry,如果不存在,则插入一个,返回这个新创建的entry
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
// 如果当前处于rehash阶段,则先尝试完成一个桶的rehash
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
// key已经存在
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
// 如果当前处于rehash状态,则每一次插入key-value都会进行一次rehash,将rehash的时间分散到每一次操作中
// 和scan、keys的选择有异曲同工之妙,毕竟redis是单线程,宁愿要多次短时操作,不要单次长时操作
// 并且如果正在rehash,则选择新的ht[1], 选择选择ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
// 查找过程,如果key不存在,则返回待插入节点所在的桶位置,如果存在,返回-1,且传入的二级指针将会指向这个节点
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR) // 判断需要需要扩容
return -1;
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break; // 非rehash阶段,不需要去看1号桶(新桶)
}
return idx;
}
总结一下大致的过程如下:
- 首先判断是否处于rehash阶段,是的话,则进行一个旧bucket的搬迁
- 判断是否需要扩容,如果需要扩容,则首先完成扩容
- 先在零号哈希表里去找,如果找不到且当前处于rehash阶段,则到1号哈希表里去找。如果找不到,则说明需要插入这个元素,返回这个元素在哈希表中bucket的编号,找到了则直接返回这个找到的节点。
- 如果上一步中没有在哈希表中找到这个key,下面正式进入插入阶段,直接用头插法插入到哈希表对应bucket链表的头部。注意,如果当前处于rehash阶段,会优先插入到新的1号哈希表里。
- 返回插入的节点,进行相应的赋值,没有在插入的时候赋值是因为复用底层函数的目标,底层函数要复用,不能执行赋值操作。
dictReplace
// dictReplace 增加一个元素或者更新一个元素,如果key已经存在,
int dictReplace(dict *d, void *key, void *val)
{
dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will succeed. */
// 传入二级指针,如果key不存在则插入,否则返回key对应的节点
entry = dictAddRaw(d,key,&existing);
if (entry) {
dictSetVal(d, entry, val);
return 1;
}
/* Set the new value and free the old one. Note that it is important
* to do that in this order, as the value may just be exactly the same
* as the previous one. In this context, think to reference counting,
* you want to increment (set), and then decrement (free), and not the
* reverse. */
auxentry = *existing;
dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
基于底层复用的函数dictAddRaw
,这个函数实现起来非常简单,不存在则插入,存在则更新。
dictDelete
相关代码如下:
// 底层工具函数,其中第三个参数nofree表示是否需要删除找到的entry,高层的dictDelete传入0,表示直接删除,dictUnlink 相当于只是从哈希表中摘出了这个节点
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
uint64_t h, idx;
dictEntry *he, *prevHe;
int table;
// 空哈希表
if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d); // 正在rehash,则尝试搬迁一个旧bucket
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
prevHe = NULL; // 保存前驱节点,便于删除链表节点
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
/* Unlink the element from the list */
if (prevHe)
prevHe->next = he->next;
else
d->ht[table].table[idx] = he->next;
if (!nofree) { // 直接删除
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
d->ht[table].used--;
return he;
}
prevHe = he;
he = he->next;
}
if (!dictIsRehashing(d)) break; // 只有当前处于rehash阶段,才会去查1号哈希表
}
return NULL; /* not found */
}
/* Remove an element, returning DICT_OK on success or DICT_ERR if the
* element was not found. */
int dictDelete(dict *ht, const void *key) {
return dictGenericDelete(ht,key,0) ? DICT_OK : DICT_ERR;
}
/* Remove an element from the table, but without actually releasing
* the key, value and dictionary entry. The dictionary entry is returned
* if the element was found (and unlinked from the table), and the user
* should later call `dictFreeUnlinkedEntry()` with it in order to release it.
* Otherwise if the key is not found, NULL is returned.
*
* This function is useful when we want to remove something from the hash
* table but want to use its value before actually deleting the entry.
* Without this function the pattern would require two lookups:
*
* entry = dictFind(...);
* // Do something with entry
* dictDelete(dictionary,entry);
*
* Thanks to this function it is possible to avoid this, and use
* instead:
*
* entry = dictUnlink(dictionary,entry);
* // Do something with entry
* dictFreeUnlinkedEntry(entry); // <- This does not need to lookup again.
*/
dictEntry *dictUnlink(dict *ht, const void *key) {
return dictGenericDelete(ht,key,1);
}
其中dictDelete
是直接函数,不关心被删除的节点,dictUnlink
只是从哈希表中摘出了这个节点,可能还需要访问这个节点的值。如果当前处于rehash阶段,则0号哈希表和1号哈希表的对应节点都会被删除。
dictFind
代码如下,大致逻辑和前面的类似,先在0号哈希表里去查找,只有当前处于rehash阶段,才会去1号哈希表里去查找。
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
uint64_t h, idx, table;
// 空哈希表
if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d); // 尝试进行一次搬迁
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}
dictExpand
首先来看什么时候会扩容,代码如下:
// 判断是否需要扩容,如果需要,则进行扩容
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
// 处于rehash阶段,不扩容
if (dictIsRehashing(d)) return DICT_OK;
/* If the hash table is empty expand it to the initial size. */
// 初始化
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
// 扩容策略:
// 1. 配置为可以扩容情况下,只要元素数量大于了桶的数量,将会扩容,负载因子大于1
// 2. 配置为避免扩容情况下,只要元素数量大于五倍的桶数量,相当于负载因子大于5,
if ((dict_can_resize == DICT_RESIZE_ENABLE &&
d->ht[0].used >= d->ht[0].size) ||
(dict_can_resize != DICT_RESIZE_FORBID &&
d->ht[0].used / d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size)
{
unsigned long i = DICT_HT_INITIAL_SIZE;
if (size >= LONG_MAX) return LONG_MAX + 1LU;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}
扩容的条件大致有两个,其中dict_can_resize是文件中的一个static的全局变量,默认是DICT_RESIZE_ENABLE
,处于这种状态下,只要哈希表中的元素数量超过了桶的数量,就会扩容。如果该值被设置为DICT_RESIZE_AVOID
,则如果哈希表中的元素数量超过了桶数量的五倍(5也是一个全局变量dict_force_resize_ratio
),也会发生扩容。但是如果该值被设置为DICT_RESIZE_FORBID
,就不会扩容了。当前也要检查是否处于rehash阶段,如果处于这样的阶段,也不会去扩容。
下面来看一下具体扩容的代码,如下:
int dictExpand(dict *d, unsigned long size)
{
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
// 如果当前处于rehash阶段,返回错误
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n; /* the new hash table */
// 从4开始不断乘以2,直到找到一个可行2的幂并且大于size
unsigned long realsize = _dictNextPower(size);
/* Rehashing to the same table size is not useful. */
// 不需要扩容
if (realsize == d->ht[0].size) return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1; // 掩码设置
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
// 初始化
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0; // 标识当前处于rehash阶段,下一步要处理搬迁的桶的编号为0
return DICT_OK;
}
首先根据传入的size(已有元素数量的两倍)确定一个比其大的2的幂,作为新哈希表桶的数量,由于采用的是渐进式rehash的方法,所以这里会构造一个新哈希表作为一号哈希表,然后设置rehash进度为0,即下一步要搬迁的桶的编号为0。
dictRehash
下面来看rehash的相关代码,函数_dictRehashStep
会在插入更新删除查找等操作中被调用,进行一步搬迁,即最多搬迁一个非空桶。同时,在搬迁之前还会检查,如果resize策略被设置为禁止或者当前不处于rehash阶段,则直接返回,或者策略为尽可能避免avoid rehash,且新哈希表桶数量小于旧哈希表桶数量的五倍(dict_force_resize_ratio
),也会直接返回。且旧哈希表里有空桶,所以这里只会扫描10 * n个空桶,然后返回,这样做的目的是避免长时间在扫描空桶,增加了一条命令的响应延迟。
// dictRehash 执行n步的rehash
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */ // 最高的访问桶的次数
// doct_can_resize 是一个全局的static变量,默认设置为DICT_RESIZE_ENABLE,表示可以扩容
// 当不允许扩容或者当前不处于rehash阶段的时候,返回0不再需要进行rehash
if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
// 当扩容策略被设置为尽可能避免avoid,且新ht的桶数量相比旧ht桶的数量没有达到一定的阈值,则直接返回0,不进行rehash
if (dict_can_resize == DICT_RESIZE_AVOID &&
(d->ht[1].size / d->ht[0].size < dict_force_resize_ratio))
{
return 0;
}
while(n-- && d->ht[0].used != 0) { // 开始搬迁
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// rehashidx=-1,说明当前不处于rehash阶段,否则处于rehash阶段,且该值表示rehash到了那个桶
assert(d->ht[0].size > (unsigned long)d->rehashidx);
// 找到一个非空的桶,进行搬迁,注意最多扫描10 * n个空桶,如果超过这个,也会返回,避免长时间扫描带来的服务停顿
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
// 开始搬迁de这个桶
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask; // 找到其在新桶中的位置
de->next = d->ht[1].table[h]; // 头插法
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
// 检查一下是否完成了所有搬迁工作
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
static void _dictRehashStep(dict *d) {
// 此时没有安全的迭代器正在迭代,进行一次rehash
if (d->iterators == 0) dictRehash(d,1);
}