Redis系列-哈希表

发布于:2024-11-28 ⋅ 阅读:(98) ⋅ 点赞:(0)


摘要:本文将对redis哈希表进行介绍,主要侧重从源代码的角度展开,所使用的源码版本为6.0.19。
关键词:Redis,C,源代码,哈希表

源码分析

结构体定义和构造

下面是一些重要结构体的定义,在源码文件dict.h中,哈希表的实现则在源码文件dict.c中。

// 定义的是哈希表每一个位置处链表中的一个节点
typedef struct dictEntry {  
    void *key;
    union {   // union联合体节省内存
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

typedef struct dictht {
    dictEntry **table;
    unsigned long size;   
    unsigned long sizemask;
    unsigned long used;   // 已有的元素数量
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];   //  redis 渐进式rehash需要新旧两个哈希表
    // 表示rehash的进程,如果为-1,表示当前不处于rehash中
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    // 表示当前正在哈希表上迭代的线程数目
    unsigned long iterators; /* number of iterators currently running */
} dict;

创建一个哈希表的函数代码如下:

static void _dictReset(dictht *ht)
{
    ht->table = NULL;
    ht->size = 0;
    ht->sizemask = 0;
    ht->used = 0;
}

/* Create a new hash table */
// type结构体里包含一些针对具体类型的哈希函数,比较函数,注意到Entry结构体定义中的key和val都是void*指针,一定程度上支持泛型
dict *dictCreate(dictType *type,
        void *privDataPtr)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type,privDataPtr);
    return d;
}

/* Initialize the hash table */
int _dictInit(dict *d, dictType *type,
        void *privDataPtr)
{
    _dictReset(&d->ht[0]);
    _dictReset(&d->ht[1]);
    d->type = type;
    d->privdata = privDataPtr;
    d->rehashidx = -1;   
    d->iterators = 0;
    return DICT_OK;
}

dictAdd 插入元素

相关的源码如下:

int dictAdd(dict *d, void *key, void *val)
{
    // 插入一个节点,并且返回指向那个节点的指针,用来赋值(处于底层代码复用的目的)
    dictEntry *entry = dictAddRaw(d,key,NULL);

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);   // 赋值
    return DICT_OK;
}

/* Low level add or find:
 * This function adds the entry but instead of setting a value returns the
 * dictEntry structure to the user, that will make sure to fill the value
 * field as he wishes.
 *
 * This function is also directly exposed to the user API to be called
 * mainly in order to store non-pointers inside the hash value, example:
 *
 * entry = dictAddRaw(dict,mykey,NULL);
 * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
 *
 * Return values:
 *
 * If key already exists NULL is returned, and "*existing" is populated
 * with the existing entry if existing is not NULL.
 *
 * If key was added, the hash entry is returned to be manipulated by the caller.
 */
// 这个函数用来实现插入或者查找,如果哈希表中有这个key了,则返回null,但是会将传入的二级指针参数指向这个存在的entry,如果不存在,则插入一个,返回这个新创建的entry
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;
    // 如果当前处于rehash阶段,则先尝试完成一个桶的rehash
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    // key已经存在
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    // 如果当前处于rehash状态,则每一次插入key-value都会进行一次rehash,将rehash的时间分散到每一次操作中
    // 和scan、keys的选择有异曲同工之妙,毕竟redis是单线程,宁愿要多次短时操作,不要单次长时操作
    // 并且如果正在rehash,则选择新的ht[1], 选择选择ht[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
// 查找过程,如果key不存在,则返回待插入节点所在的桶位置,如果存在,返回-1,且传入的二级指针将会指向这个节点
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)  // 判断需要需要扩容
        return -1;
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;   // 非rehash阶段,不需要去看1号桶(新桶)
    }
    return idx;
}

总结一下大致的过程如下:

  1. 首先判断是否处于rehash阶段,是的话,则进行一个旧bucket的搬迁
  2. 判断是否需要扩容,如果需要扩容,则首先完成扩容
  3. 先在零号哈希表里去找,如果找不到且当前处于rehash阶段,则到1号哈希表里去找。如果找不到,则说明需要插入这个元素,返回这个元素在哈希表中bucket的编号,找到了则直接返回这个找到的节点。
  4. 如果上一步中没有在哈希表中找到这个key,下面正式进入插入阶段,直接用头插法插入到哈希表对应bucket链表的头部。注意,如果当前处于rehash阶段,会优先插入到新的1号哈希表里。
  5. 返回插入的节点,进行相应的赋值,没有在插入的时候赋值是因为复用底层函数的目标,底层函数要复用,不能执行赋值操作。

dictReplace

// dictReplace 增加一个元素或者更新一个元素,如果key已经存在,
int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, *existing, auxentry;

    /* Try to add the element. If the key
     * does not exists dictAdd will succeed. */
    // 传入二级指针,如果key不存在则插入,否则返回key对应的节点
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);  
        return 1;
    }

    /* Set the new value and free the old one. Note that it is important
     * to do that in this order, as the value may just be exactly the same
     * as the previous one. In this context, think to reference counting,
     * you want to increment (set), and then decrement (free), and not the
     * reverse. */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}

基于底层复用的函数dictAddRaw,这个函数实现起来非常简单,不存在则插入,存在则更新。

dictDelete

相关代码如下:

// 底层工具函数,其中第三个参数nofree表示是否需要删除找到的entry,高层的dictDelete传入0,表示直接删除,dictUnlink 相当于只是从哈希表中摘出了这个节点
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
    uint64_t h, idx;
    dictEntry *he, *prevHe;
    int table;
    // 空哈希表
    if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;

    if (dictIsRehashing(d)) _dictRehashStep(d);   // 正在rehash,则尝试搬迁一个旧bucket
    h = dictHashKey(d, key);

    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        prevHe = NULL;   // 保存前驱节点,便于删除链表节点
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                /* Unlink the element from the list */
                if (prevHe)
                    prevHe->next = he->next;
                else
                    d->ht[table].table[idx] = he->next;
                if (!nofree) {   // 直接删除
                    dictFreeKey(d, he);
                    dictFreeVal(d, he);
                    zfree(he);
                }
                d->ht[table].used--;
                return he;
            }
            prevHe = he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;   // 只有当前处于rehash阶段,才会去查1号哈希表
    }
    return NULL; /* not found */
}

/* Remove an element, returning DICT_OK on success or DICT_ERR if the
 * element was not found. */
int dictDelete(dict *ht, const void *key) {
    return dictGenericDelete(ht,key,0) ? DICT_OK : DICT_ERR;
}

/* Remove an element from the table, but without actually releasing
 * the key, value and dictionary entry. The dictionary entry is returned
 * if the element was found (and unlinked from the table), and the user
 * should later call `dictFreeUnlinkedEntry()` with it in order to release it.
 * Otherwise if the key is not found, NULL is returned.
 *
 * This function is useful when we want to remove something from the hash
 * table but want to use its value before actually deleting the entry.
 * Without this function the pattern would require two lookups:
 *
 *  entry = dictFind(...);
 *  // Do something with entry
 *  dictDelete(dictionary,entry);
 *
 * Thanks to this function it is possible to avoid this, and use
 * instead:
 *
 * entry = dictUnlink(dictionary,entry);
 * // Do something with entry
 * dictFreeUnlinkedEntry(entry); // <- This does not need to lookup again.
 */
dictEntry *dictUnlink(dict *ht, const void *key) {
    return dictGenericDelete(ht,key,1);
}

其中dictDelete是直接函数,不关心被删除的节点,dictUnlink只是从哈希表中摘出了这个节点,可能还需要访问这个节点的值。如果当前处于rehash阶段,则0号哈希表和1号哈希表的对应节点都会被删除。

dictFind

代码如下,大致逻辑和前面的类似,先在0号哈希表里去查找,只有当前处于rehash阶段,才会去1号哈希表里去查找。

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;
    // 空哈希表
    if (dictSize(d) == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);   // 尝试进行一次搬迁
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

dictExpand

首先来看什么时候会扩容,代码如下:

// 判断是否需要扩容,如果需要,则进行扩容
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    // 处于rehash阶段,不扩容
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 初始化
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    // 扩容策略:
    // 1. 配置为可以扩容情况下,只要元素数量大于了桶的数量,将会扩容,负载因子大于1
    // 2. 配置为避免扩容情况下,只要元素数量大于五倍的桶数量,相当于负载因子大于5,
    if ((dict_can_resize == DICT_RESIZE_ENABLE &&
         d->ht[0].used >= d->ht[0].size) ||
        (dict_can_resize != DICT_RESIZE_FORBID &&
         d->ht[0].used / d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size)
{
    unsigned long i = DICT_HT_INITIAL_SIZE;

    if (size >= LONG_MAX) return LONG_MAX + 1LU;
    while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }
}

扩容的条件大致有两个,其中dict_can_resize是文件中的一个static的全局变量,默认是DICT_RESIZE_ENABLE,处于这种状态下,只要哈希表中的元素数量超过了桶的数量,就会扩容。如果该值被设置为DICT_RESIZE_AVOID,则如果哈希表中的元素数量超过了桶数量的五倍(5也是一个全局变量dict_force_resize_ratio),也会发生扩容。但是如果该值被设置为DICT_RESIZE_FORBID,就不会扩容了。当前也要检查是否处于rehash阶段,如果处于这样的阶段,也不会去扩容。
下面来看一下具体扩容的代码,如下:

int dictExpand(dict *d, unsigned long size)
{
    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    // 如果当前处于rehash阶段,返回错误
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n; /* the new hash table */
    // 从4开始不断乘以2,直到找到一个可行2的幂并且大于size
    unsigned long realsize = _dictNextPower(size);

    /* Rehashing to the same table size is not useful. */
    // 不需要扩容
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;   // 掩码设置
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    // 初始化
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;   // 标识当前处于rehash阶段,下一步要处理搬迁的桶的编号为0
    return DICT_OK;
}

首先根据传入的size(已有元素数量的两倍)确定一个比其大的2的幂,作为新哈希表桶的数量,由于采用的是渐进式rehash的方法,所以这里会构造一个新哈希表作为一号哈希表,然后设置rehash进度为0,即下一步要搬迁的桶的编号为0。

dictRehash

下面来看rehash的相关代码,函数_dictRehashStep会在插入更新删除查找等操作中被调用,进行一步搬迁,即最多搬迁一个非空桶。同时,在搬迁之前还会检查,如果resize策略被设置为禁止或者当前不处于rehash阶段,则直接返回,或者策略为尽可能避免avoid rehash,且新哈希表桶数量小于旧哈希表桶数量的五倍(dict_force_resize_ratio),也会直接返回。且旧哈希表里有空桶,所以这里只会扫描10 * n个空桶,然后返回,这样做的目的是避免长时间在扫描空桶,增加了一条命令的响应延迟。

// dictRehash 执行n步的rehash
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */   // 最高的访问桶的次数
    // doct_can_resize 是一个全局的static变量,默认设置为DICT_RESIZE_ENABLE,表示可以扩容
    // 当不允许扩容或者当前不处于rehash阶段的时候,返回0不再需要进行rehash
    if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
    // 当扩容策略被设置为尽可能避免avoid,且新ht的桶数量相比旧ht桶的数量没有达到一定的阈值,则直接返回0,不进行rehash
    if (dict_can_resize == DICT_RESIZE_AVOID && 
        (d->ht[1].size / d->ht[0].size < dict_force_resize_ratio))
    {

        return 0;
    }

    while(n-- && d->ht[0].used != 0) {   // 开始搬迁
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        // rehashidx=-1,说明当前不处于rehash阶段,否则处于rehash阶段,且该值表示rehash到了那个桶
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        // 找到一个非空的桶,进行搬迁,注意最多扫描10 * n个空桶,如果超过这个,也会返回,避免长时间扫描带来的服务停顿
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        // 开始搬迁de这个桶
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;  // 找到其在新桶中的位置
            de->next = d->ht[1].table[h];  //  头插法
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    // 检查一下是否完成了所有搬迁工作
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    } 

    /* More to rehash... */
    return 1;
}
static void _dictRehashStep(dict *d) {
    // 此时没有安全的迭代器正在迭代,进行一次rehash
    if (d->iterators == 0) dictRehash(d,1);
}

参考


网站公告

今日签到

点亮在社区的每一天
去签到