Implementing Redis in C++ : F
Redis C++ 实现笔记(F篇)
前言
本章代码及思路均来自Build Your Own Redis with C/C++
本文章只阐述我的理解想法,以及需要注意的地方。
本文章为续<<Implementing Redis in C++ : E>>所以本文章不再完整讲解全部代码,只讲解其不同的地方
Redis-like命令
在原作者接下来的步骤中,原作者新增了zadd
,zrem
,zscore
,zquery
的命令,这些命令都是与redis
的命令类似,我们简单的说一下这几个命令:
zadd
:将一个元素加入到有序集合中,如果这个元素已经存在,则更新这个元素的分数。
使用方法:zadd key score member
,如果数据不存在,添加成功后,返回 (int) 1,如果数据已经存在,则更新数据,返回 (int) 0
zrem
:从有序集合中删除一个元素。
使用方法:zrem key member
,删除成功后,返回 (int) 1,如果数据不存在,则返回 (int) 0
zscore
:获取有序集合中指定成员的分数。
使用方法:zscore key member
,返回 (double) score,如果数据不存在,则返回 (nil)
zquery
:获取有序集合中 >=(score, name) 区间的成员。
使用方法:zquery zset score name skip count
,返回 (array),包含有序集合中 >=(score, name) 区间的成员。其中skip
是跳过的数量,count
是返回的成员数量。
主体思路
为了快速理解作者的代码的思路,我将定义的结构体之间的关系以以下图示描述:
g_data
└── db : HMap
├── newer : HTab (正在使用)
│ ├── tab[0] ──> HNode(Entry1) ──> HNode(Entry2) ──> ...
│ │ │
│ │ └── Entry
│ │ ├── key = "foo"
│ │ ├── type = STR
│ │ ├── str = "hello"
│ │ └── zset = (unused if type=STR)
│ │
│ ├── tab[1] ──> HNode(Entry3) ──> ...
│ │ │
│ │ └── Entry
│ │ ├── key = "myzset"
│ │ ├── type = ZSET
│ │ ├── str = ""
│ │ └── zset : ZSet
│ │ ├── root (AVLNode*)
│ │ │ ┌── ZNode::tree(score=1.0, name="a")
│ │ │ root ─┼── ZNode::tree(score=2.0, name="b")
│ │ │ └── ZNode::tree(score=3.0, name="c")
│ │ │
│ │ └── hmap : HMap
│ │ ├── newer : HTab
│ │ │ ├── tab[hash("a")] ──> HNode(ZNode "a")
│ │ │ ├── tab[hash("b")] ──> HNode(ZNode "b")
│ │ │ └── tab[hash("c")] ──> HNode(ZNode "c")
│ │ ├── older : HTab (迁移用)
│ │ └── migrate : szie_t (迁移用)
│ │
│ ├── mask : size_t (hash 索引掩码, 形如 2^k-1)
│ └── size : size_t (元素数量)
│
├── older : HTab (rehash 迁移时的旧表)
│ ├── tab : HNode*[]
│ ├── mask
│ └── size
│
└── migrate_pos : size_t (迁移进度)
g_data
└── db : HMap
HMap (HashMap)
├── newer : HTab ---> 正在使用的新表
├── older : HTab ---> 迁移中的旧表
└── migrate_pos : size_t ---> 迁移进度
HTab (节点表)
├── tab : HNode** ---> 数组,每个元素是链表头指针
│ tab[0] ---> HNode ---> HNode ---> ...
│ tab[1] ---> HNode ---> ...
│ ...
├── mask : size_t ---> 用于计算索引 (hcode & mask)
└── size : size_t ---> 表中元素个数
HNode (链表节点)
├── next : HNode* ---> 指向下一个 HNode
└── hcode : uint64_t ---> 哈希值
Entry (用户数据)
├── node: HNode <--- 侵入式节点,链接到 HTab 的 tab[i] 链表
├── key: std::string <--- 用户原始数据的 键
├── type: uint32_t <--- 数据类型
├── str: std::string <--- 用户原始数据的 值
└── zset: ZSet <--- 存放有序集合数据
ZSet (有序集合)
├── root: AVLNode* ---> 根据(socre, name)进行排序的 AVL 树根节点
└── hmap: HMap ---> 根据 name 索引的哈希表
ZNode (有序集合节点)
├── tree: AVLNode
├── hmap: HNode
├── score: double
├── len: size_t
└── name: char*
AVLNode (AVL 树节点)
├── parent: AVLNode*
├── left: AVLNode*
├── right: AVLNode*
├── height: uint32_t
└── cnt: uint32_t
在上面的整体思路中,我们拓展了Entry
中存储数据的结构,让Entry
中既可以存储str
也可以存储zset
,同时仍然保持使用Entry
的键进行快速的查找,同时因为我们使用了ZSet
,
ZSet
中使用AVL树
来存储(score, name)
的数据,也使用HMap
来索引name
。
与之前的修改相比,现在的主要修改是使用ZNode
中的hmap
来接入HMap
,并使用ZNode
中的tree
来接入AVL树
。
因为在先前,我们已经实现了AVL树
和HMap
,而实现Redis-like 命令
,就可以使用我们先去实现的这两个算法,来高效的查找数据。
所以接下来我们就要实现上面的命令和逻辑。
ZSet{}, ZNode{}, znode_new()
struct ZSet{
AVLNode* root = nullptr; // index by (score, name)
HMap hmap; // index by name
};
struct ZNode{
AVLNode tree;
HNode hmap;
double score = 0;
size_t len = 0;
char name[0]; // flexible array
};
static ZNode* znode_new(const char* name, size_t len, double score){
ZNode* node = (ZNode*)malloc(sizeof(ZNode)+len);
assert(node);
avl_init(&node->tree);
node->hmap.next = nullptr;
node->hmap.hcode = str_hash((uint8_t*)name, len);
node->score = score;
node->len = len;
memcpy(&node->name[0], name, len);
return node;
}
具体的ZSet{}
与ZNode{}
结构体的关系我们已经在上面的图示中说明了,这里不再赘述。
其中我们需要注意的是ZNode{}
中的name[0]
,这是定义了一个灵活数组,用来保存name
,我们这样定义struct
的方式,并不能使用new
来创建结构体,因为new
只会创建静态结构体,也就是不会计算我们后面增加的内容的大小,而malloc()
可以创建动态结构体,可以灵活的分配内存大小。
因为我们的name
是char
类型,所以我们直接使用sizeof(ZNode)+len
也是可以的。
要是换成其他的数据,记得要进行计算后面的大小。
zadd()
zadd()
命令,用于添加一个元素到ZSet
中,示例:zadd zset 1 name1
,也就是向Entry
中存储键为key = zset
的AVLNode Entry::zset::root
的AVL树
中插入一个(score, name)
,同时,使用哈希插入到Entry
中存储键为key = zset
的HTab Entry::zset::hmap::newer
中,将name
作为key
,score
作为value
。
所以,zadd()
命令也就是分成了三个步骤:
- 检查是否已经存在
- 存在:更新
- 不存在:插入
bool zset_insert(ZSet* zset, const char* name, size_t len, double score){
ZNode* node = zset_lookup(zset, name, len); // check the node exist
if(node){
zset_update(zset, node, score);
return false; // update existing node
}
else{
node = znode_new(name, len, score);
hm_insert(&zset->hmap, &node->hmap);
tree_insert(zset, node);
return true;
}
}
我们一步步解释其中的函数。
// a helper structure for the hashtable lookup
struct HKey{
HNode node;
const char* name = nullptr;
size_t len = 0;
};
static bool hcmp(HNode* node, HNode* key){
ZNode* znode = container_of(node, ZNode, hmap);
HKey* hkey = container_of(key, HKey, node);
if(znode->len != hkey->len)
return false;
return 0 == memcmp(znode->name, hkey->name, znode->len);
}
// lookup by name
ZNode* zset_lookup(ZSet* zset, const char* name, size_t len){
if(!zset->root)
return nullptr;
HKey key;
key.node.hcode = str_hash((uint8_t*)name, len);
key.name = name;
key.len = len;
HNode* found = hm_lookup(&zset->hmap, &key.node, &hcmp);
return found ? container_of(found, ZNode, hmap) : nullptr ;
}
我们首先创建了一个HKey
结构体,用来辅助查找。
为什么要新建一个Hkey
结构体呢?
我们的hm_lookup
函数需要(HMap* hmap,HNode* key,bool (*eq)(HNode* , HNode*))
这四个参数,同时我们传入的函数指针hcmp
还会使用container_of
来获取到上级结构体,拿到上级结构体中的数据再次进行比对,最终确认我们要找的节点,在这同时我们不能单独的传入一个char*
类型,我们就需要一个结构体来承担,存储HNode*
和char*
等的作用。
注意:
HKey
中的HNode
代表的是ZNode
中的HNode
当我们找到对应的节点后,我们就可以使用container_of
来还原回上级结构体ZNode
了,并返回。
// compare by the (score, name) tuple
static bool zless(AVLNode* lhs, double score, const char* name, size_t len){
ZNode* zl = container_of(lhs, ZNode, tree);
if(zl->score != score){
return zl->score < score;
}
int rv = memcmp(zl->name, name, min(zl->len, len));
if (rv != 0) {
return rv < 0;
}
return zl->len < len;
}
static bool zless(AVLNode* lhs, AVLNode* rhs){
ZNode* zr = container_of(rhs, ZNode, tree);
return zless(lhs, zr->score, zr->name, zr->len);
}
static void tree_insert(ZSet* zset, ZNode* node){
AVLNode* parent = nullptr; // insert under this node
AVLNode* *from = &zset->root; // the incoming pointer to the next node
while(*from){ // tree search
parent = *from;
from = zless(&node->tree, parent) ? &parent->left : &parent->right;
}
*from = &node->tree; // attach to this node
node->tree.parent = parent;
zset->root = avl_fix(&node->tree);
}
因为我们要使用AVL树
来存储(score, name)
的信息,所以树在排序的时候也是,先比较score
,然后比较name
,然后进行存储,所以当我们在搜索树准备插入数据的时候,我们也需要使用container_of
来获取上级结构体中的(score, name)
数据,也就是我们写的zless
函数。
而tree_insert
函数,也是通过while zless
找到插入的位置,然后进行插入。
// update the score of an existing node
static void zset_update(ZSet* zset, ZNode* node, double score){
if(node->score == score)
return; // no change
zset->root = avl_del(&node->tree);
avl_init(&node->tree);
node->score = score;
tree_insert(zset, node);
}
更新代码似乎不用多说,需要注意的是,修改AVL树
中的数据,可能会破坏树的有序性,所以我们要先删除原来的数据,然后再插入新的数据。
avl_del
中删除后会调用avl_fix
来修复树,不必担心树被破坏。
static bool str2dbl(const std::string &s, double &out){
char* endp = nullptr;
out = strtod(s.c_str(), &endp);
return endp == s.c_str() + s.size() && !isnan(out);
}
static bool str2int(const std::string &s, int64_t &out){
char* endp = nullptr;
out = strtoll(s.c_str(), &endp, 10);
return endp == s.c_str() + s.size();
}
// zadd zset score name
static void do_zadd(std::vector<std::string> &cmd, Ring_buf &buf){
double score = 0;
if(!str2dbl(cmd[2], score)){
return out_err(buf, ERR_BAD_ARG, "expect float");
}
// lookup the zset
LookupKey key;
key.key.swap(cmd[1]);
key.node.hcode = str_hash((uint8_t*)key.key.data(), key.key.size());
HNode* hnode = hm_lookup(&g_data.db, &key.node, &entry_eq);
Entry* ent = nullptr;
if(!hnode){
// insert a new key
ent = entry_new(T_ZSET);
ent->key.swap(key.key);
ent->node.hcode = key.node.hcode;
hm_insert(&g_data.db, &ent->node);
}
else{
ent = container_of(hnode, Entry, node);
if(ent->type != T_ZSET){
return out_err(buf, ERR_BAD_TYP, "expect zset");
}
}
// add or update the tuple
const std::string &name = cmd[3];
bool added = zset_insert(&ent->zset, name.data(), name.size(), score);
return out_int(buf, (int64_t)added);
}
因为我们接收到的网络数据是字符串,所以需要将字符串转换成对应的数据。
str2dbl
和str2int
中,首先都先定义了char *end;
这是用来在使用strtod
和strtoll
时,存放解析结束的位置,如果遇到不能解析的符号,就停留在那里,否则,就指向字符串的末尾。
使用strtod
和strtoll
将字符串转换成对应的数据,stroll
中的10
就是按照十进制来转换的。
在判断 endp == s.c_str() + s.size()
时,其实就是在验证字符串是否被完全成功解析。
s.c_str()
会把std::string
转换成 C 风格字符串(const char*)
,返回一个指向首字符的指针。s.c_str() + s.size()
指向的是字符串的末尾(即最后一个字符的下一个位置,通常是\0
的位置)。- 如果
endp
没有指向这个末尾,就说明解析过程提前停止了,后面还有未能解析的内容,此时就认为解析失败。 - 只有当
endp
恰好等于字符串末尾时,才说明整个字符串都被正确解析。
举例说明:
- 输入
"123"
→endp
会指向字符串末尾,表示解析成功。 - 输入
"123abc"
→endp
会停在'a'
处,说明"abc"
不是合法数字,因此解析不完整。
在我们的do_add
中,我们仍然是使用了一个新的结构体来辅助查询,这个结构体的作用也和我们上面将的一样,hm_lookup
需要HNode
的参数,同时又需要上级的结构体中的内容来进行比对,也就是我们传入的entry_eq
,所以需要我们新建一个LookupKey
结构体。
zrem()
// delete by name
void zset_delete(ZSet* zset, ZNode* node){
// remove from the hashtable
HKey key;
key.node.hcode = node->hmap.hcode;
key.name = node->name;
key.len = node->len;
HNode* found = hm_delete(&zset->hmap, &key.node, &hcmp);
assert(found);
// remove from the AVL tree
zset->root = avl_del(&node->tree);
znode_del(node);
}
对于我们删除的函数HNode* hm_delete(HMap* hmap,HNode* key,bool (*eq)(HNode* , HNode*))
,同样也是需要我们的HKey
来辅助删除的,hcmp
通过比对node
上级结构体中的数据,来辅助查找对应的节点。
static const ZSet k_empty_zset;
static ZSet* expect_zset(std::string &s){
LookupKey key;
key.key.swap(s);
key.node.hcode = str_hash((uint8_t*)key.key.data(), key.key.size());
HNode* hnode = hm_lookup(&g_data.db, &key.node, &entry_eq);
if(!hnode){
// a non-existent key is treated as an empty zset
return (ZSet*)&k_empty_zset;
}
Entry* ent = container_of(hnode, Entry, node);
return ent->type == T_ZSET ? &ent->zset : nullptr;
}
static void do_zrem(std::vector<std::string> &cmd, Ring_buf &buf){
ZSet* zset = expect_zset(cmd[1]);
if(!zset){
return out_err(buf, ERR_BAD_TYP, "expect zset");
}
const std::string &name = cmd[2];
ZNode* znode = zset_lookup(zset, name.data(), name.size());
if(znode){
zset_delete(zset, znode);
}
return out_int(buf, znode ? 1 : 0);
}
LookupKey
的使用与HKey
类似,不过,LookupKey
中的HNode
代表(查找)的是Entry
中的HNode
我们的zrem
命令会先查找键,然后删除键中ZSet
中的元素,而数据的键存储在Entry
中,所以我们通过expect_zset
来获取键对应的ZSet
,然后通过zset_delete
来删除键对应的元素。
在这其中,我们还定义了一个static const ZSet k_zset_empty;
,用来作为,当key
不存在时,返回的ZSet
。
为什么我们要使用一个自己定义的空对象呢?
返回空对象 k_empty_zset
表key 不存在时等同空集合,并与 Redis 语义一致,让返回nullptr
只有一种情况,即key
存在但不是ZSet
。
zscore()
static void do_zscore(std::vector<std::string> &cmd, Ring_buf &buf){
ZSet* zset = expect_zset(cmd[1]);
if(!zset){
return out_err(buf, ERR_BAD_TYP, "expect zset");
}
const std::string &name = cmd[2];
ZNode* znode = zset_lookup(zset, name.data(), name.size());
return znode ? out_dbl(buf, znode->score) : out_nil(buf);
}
查找指定有序集(ZSet
)中的成员(name
)的分数(score
)
这个似乎就不用多说了,先使用expect_zset()
函数获取有序集,然后使用zset_lookup()
函数查找成员,如果找到则返回分数,否则返回nil
。
zquery()
用来查找指定有序集(ZSet
)中 **>=(score, name)**的成员
使用命令:zquery zset score name offset limit
其中offset
为查询的起始位置,limit
为查询的个数
// find the first (score, name) tuple that is >= key.
ZNode* zset_seekge(ZSet* zset, double score, const char* name, size_t len){
AVLNode* found = nullptr;
for(AVLNode* node = zset->root; node;){
if(zless(node, score, name, len))
{
node = node->right;
}
else
{
found = node;
node = node->left;
}
}
return found ? container_of(found, ZNode, tree) : nullptr;
}
首先我们使用zless
函数来判断当前节点的分数是否小于给定的分数。如果小于,则说明当前节点的分数小于给定的分数,那么我们就向右移动,同时记录当前大于 (score, name) 的节点,否则向左移动,直到找到第一个大于 (score, name) 的点。
// offset into the succeeding or preceding node
ZNode* znode_offset(ZNode* node, int64_t offset){
AVLNode* tnode = node ? avl_offset(&node->tree, offset) : nullptr;
return tnode ? container_of(tnode, ZNode, tree) : nullptr;
}
这个代码就是通过offset
找到,当前传入节点的前驱或后继
下面的代码是具体的实现
AVLNode* avl_offset(AVLNode* node, int64_t offset){
int64_t pos = 0; // the rank of difference from the starting node
while(offset != pos){
if(pos < offset && pos+avl_cnt(node->right) >= offset){
// the target is inside the right subtree
node = node->right;
pos += avl_cnt(node->left) + 1;
}
else if(pos > offset && pos - avl_cnt(node->left) <= offset){
// the target is inside the left subtree
node = node->left;
pos -= avl_cnt(node->right) + 1;
}
else{
// go to the parent
AVLNode* parent = node->parent;
if(!parent){
return nullptr;
}
if(parent->right == node){
pos -= avl_cnt(node->left)+1;
}
else{
pos += avl_cnt(node->right)+1;
}
node = parent;
}
}
return node;
}
3
/ \
2 5
/ /
1 4
index: 0 1 2 3 4
value: [1] [2] [3] [4] [5]
P --> O
offset
如果大于零,那就是要找当前节点的右子树的大小,如果右子树的大小不够,也就是说,P
的移动,无法移动到我们要找的节点,所以我们要向上移动,增大我们要找的右子树的范围,让我们的P
可以移动到对应的节点
P
在向上移动的过程中,如果在移动前的节点是其父节点的右子树,那我们挪上去后,P要相应的减去左子树的大小,也就是向左移动
为什么要向左移动,假设我们在移动前节点的父节点上,想要移动到右子树,也就是更大的值的地方,我们的P
是要向右移动的,而向右移动多少?那就是我们移动前的节点的左子树的大小(AVL中序遍历的输出是有序的,所以不必担心左子树的值比右子树的值大)
那么相应的,如果我们在移动前的节点是其父节点的左子树,那我们挪上去后,P要相应的加上右子树的大小,也就是向右移动
也正如上面我们所说的,我们要找到pos == offset
的位置
当pos < offset
时,也就是说,我们的Offset
所指的位置,还在P所指位置的右侧,我们要向右移动,如果我们的右子树足够大(avl_cnt)
,也就是说,P可以向右移动的距离足够大,那就可以直接移动到右子树中,查找对应的offset
所指的位置
如果右子树不够大,那就只能向上移动,扩大我们可以移动的范围
同理,当pos > offset
时,也就是说,我们的Offset
所指的位置,还在P
所指位置的左侧,我们要向左移动,如果我们的左子树足够大(avl_cnt)
,也就是说,P
可以向左移动的距离足够大,那就可以直接移动到左子树中,查找对应的offset
所指的位置
如果左子树不够大,那就只能向上移动,扩大我们可以移动的范围
从节点 3 出发,offset = +2 (也就是要找 5 节点)
我们发现 3 的右子树的大小是 2 (5 节点和 4 节点),也就是P所指的位置可以向右移动 2 个位置,刚好可以移动到 5 节点
那我们就进入右子树。
// zquery zset score name offset limit
static void do_zquery(std::vector<std::string> &cmd, Ring_buf &buf){
// parse args
double score = 0;
if(!str2dbl(cmd[2], score)){
return out_err(buf, ERR_BAD_ARG, "expect fp number");
}
const std::string &name = cmd[3];
int64_t offset = 0, limit = 0;
if(!str2int(cmd[4], offset) || !str2int(cmd[5],limit)){
return out_err(buf, ERR_BAD_ARG, "expect int");
}
// get the zset
ZSet* zset = expect_zset(cmd[1]);
if(!zset){
return out_err(buf, ERR_BAD_TYP, "expect zset");
}
// seek to the key
if(limit <= 0){
return out_arr(buf,0);
}
ZNode* znode = zset_seekge(zset, score, name.data(), name.size());
znode = znode_offset(znode, offset);
// output
size_t ctx = out_begin_arr(buf);
int64_t n = 0;
while(znode && n < limit){
out_str(buf, znode->name, znode->len);
out_dbl(buf, znode->score);
znode = znode_offset(znode, 1);
n += 2;
}
out_end_arr(buf, ctx, (uint32_t)n);
}
我们将上面的代码进行结合,先获取输入的score
和name
,再通过except_zset
获取到对应的有序表(ZSet
),然后通过zset_seekge
寻找到第一个大于(score, name)
的ZNode
,然后通过znode_offset
计算跳过的个数,最后进行输出。
end
这些就是代码修改的主体,其他的部分改动较小,我们就不再讲述了,鉴于代码放在这里实在太多,我给出我的github地址,大家可以去找study/dev_4
的目录进行查看
github地址:https://github.com/AuroBreeze/Implementing-Redis-in-C