golang/java实现跳表的数据结构

发布于:2024-12-18 ⋅ 阅读:(213) ⋅ 点赞:(0)

1、跳表数据结构说明

最近在写一款中间件,类似于redis的中间件,我准备用golang语言来编写,目前已经实现了redis的大部分数据结构,比如string、hash、set、list,而开始实现zset的时候发现要使用跳表,在网上实现的跳表数据结构感觉都不太合适,所以就自己来实现了一个,我也是瞅着空闲的时间来实现了一把,我也不知道性能如何,还没来得及测试,欢迎大家在此基础上进行修正和讨论;
跳表(Skip List)是一种概率性的动态数据结构,通常用于实现有序集合(如集合、映射等)的查找、插入和删除操作。它能够在平均情况下提供对数时间复杂度(O(log n)),比起传统的链表和二叉搜索树,它在实现上更简单。

1.1、跳表的基本结构

跳表由多个层级的链表构成,每一层都是对下一层链表的“跳跃”,因此得名“跳表”。最底层是一个有序的链表,其上层链表则是从下层链表中按一定规则选出的元素组成。跳表的结构通常由以下几个层级构成:

第0层(底层):这是最基础的一层,包含了所有的元素,形成一个有序链表。
第1层及以上:这些层是通过概率性的方式从第0层元素中选择一部分元素构成的。在每一层,节点的出现概率通常是固定的(例如每个节点有50%的概率出现在上一层)。

1.2、跳表的操作

查找(Search):

从最上层的链表开始查找,按照节点的值进行比较,如果当前节点的值比目标值小,就跳到下一个节点,否则下降到下一层继续查找。
这种方式可以减少查找的时间复杂度,期望时间为 O(log n)。
插入(Insert):

首先进行查找操作,找到插入位置。
然后根据一定的概率决定是否在上层创建新节点,最终插入节点到每一层对应的链表中。插入操作也有期望时间 O(log n)。
删除(Delete):

类似于查找,找到待删除的节点后,将其从每一层的链表中移除,时间复杂度为 O(log n)。

1.3、跳表的优点

简单易实现:跳表比起平衡二叉树(如AVL树、红黑树等)实现起来更加简单,尤其在处理动态变化的数据时。
动态平衡:跳表是基于概率的动态数据结构,不需要像平衡树那样进行显式的重平衡操作。
较好的空间利用:相比于红黑树的每个节点存储颜色、父子指针,跳表的每个节点只需要存储向后和向下的指针。

1.4、跳表的缺点

空间开销:由于需要维护多层链表,跳表的空间复杂度是 O(n),其中 n 是元素的数量。对于每个元素,可能需要在不同层级上维护多个指针。
不适用于非常小的数据集:对于非常小的数据集,跳表可能并不如线性结构(如链表)高效。
跳表在很多场景中都能提供较好的性能,特别是在需要频繁插入和删除操作的有序集合中,常用于数据库、缓存系统、分布式系统等场景。

1.5、我的实现和设计

我这边设计是按照我跳表的思想进行设计的,首先有一个垂直指针指向下一层,每一层有一个水平的单向指针指向了下一个元素,每个节点是作为一个Entry来构成,Entry里面包含了两个元素,一个是element,一个score,我这边只想做redis的zset的数据结构,所以就没考虑其他数据类型,当然是可以抽出一个通用的跳表结构,我优点忙,就难得整了,设计大概如下:

在这里插入图片描述
从图中可以清楚的指导,节点SkipListNode里面的有一个right指针和down指针分别对应水平指针和垂直指针,用来和水平节点和上下节点建立关系的;每一层的head节点本身不存在实际的数据,只作为一个头结点存在,我是根据自身的一些常见设计的,可能毕竟简单,但是够用足以,各位看官如果觉得哪里不合理的或者有更好的方式请在评论区讨论;

2、java的实现

本来我最早是用golang实现的,但是golang写起来感觉还是没java那么舒服,所以就先用java实现一下。

2.1、SkipListNode.java

class SkipListNode {
    String element; //元素的名称
    double score; //元素的分数
    SkipListNode right;//水平指针
    SkipListNode down;//垂直指针

    public SkipListNode(String element, double score) {
        this.element = element;
        this.score = score;
        this.right = null;
        this.down = null;
    }
}

2.2、SkipList.java

class SkipList {
    private static final int MAX_LEVEL = 8; // 最大层数
    private int currentMaxLevel = 1;       // 当前最大层数
    private SkipListNode head;
    private int levels;
    private Map<String, SkipListNode> ref;
    private int size;

    public SkipList() {
        this.head = new SkipListNode(null, Double.NEGATIVE_INFINITY); // 头节点,起始无穷小
        this.levels = 1;
        this.ref = new HashMap<>();
        size = 0;
    }

    // 随机生成层数
    private int randomLevel() {
        int level = 1;
        int maxAllowedLevel = Math.min(currentMaxLevel + 1, MAX_LEVEL);
        while (level < maxAllowedLevel && Math.random() < 0.5) {
            level++;
        }
        return level;
    }

    // 插入元素
    public void insert(String element, double score) {
        // Step 1: 查找插入位置,并记录路径
        List<SkipListNode> path = new ArrayList<>();
        SkipListNode current = head;

        // 查找插入位置并记录路径
        while (current != null) {
            while (current.right != null && current.right.score < score) {
                current = current.right;
            }
            path.add(current); // 记录路径
            current = current.down;
        }

        // Step 2: 生成新节点的层数
        int newLevel = randomLevel();
        currentMaxLevel = Math.max(currentMaxLevel, newLevel); // 更新当前最大层数

        // Step 3: 创建新节点,并逐层插入
        SkipListNode downNode = null; // 用来连接下层的节点
        SkipListNode newNode = new SkipListNode(element, score); // 只创建一次新节点

        // Step 4: 按层插入节点
        for (int i = 0; i < newLevel; i++) {
            // 如果路径中没有足够的层,则扩展头节点
            if (i >= path.size()) {
                SkipListNode newHead = new SkipListNode(null, Double.NEGATIVE_INFINITY); // 创建新头节点
                newHead.down = head; // 将原头节点挂到新头节点下层
                head = newHead; // 更新头节点
                levels++;
                path.add(0, newHead); // 将新头节点添加到路径中
            }

            // 当前层的前一个节点
            SkipListNode prev = path.get(path.size() - 1 - i);

            // 插入当前层的新节点
            newNode.right = prev.right; // 将当前节点的 right 指向原来节点的 right
            prev.right = newNode;       // 将前一个节点的 right 指向新节点
            newNode.down = downNode;    // 将新节点的 down 指向下层节点
            downNode = newNode;         // 更新下层节点为当前新节点

            // 如果新层还需要节点,继续创建新节点实例
            if (i < newLevel - 1) {
                newNode = new SkipListNode(element, score); // 在此处复用对象
            }
        }
        ref.put(element, newNode);
        size++;
    }


    // 查找单个元素
    public SkipListNode find(String element) {
        SkipListNode current = head;
        while (current != null) {
            // 在右边链表中查找匹配的节点
            while (current.right != null && current.right.score <= Double.POSITIVE_INFINITY) {
                if (current.right.element != null && current.right.element.equals(element)) {
                    return current.right;
                }
                current = current.right;
            }
            // 如果当前层找不到,向下层移动
            current = current.down;
        }
        return null; // 未找到时返回 null
    }

    public List<String> rangeSearchIndex(int startIndex, int endIndex) {
        List<String> result = new ArrayList<>();
        SkipListNode current = head;

        // 找到最底层的起始节点
        while (current.down != null) {
            current = current.down;
        }
        current = current.right; // 跳过头节点

        // 遍历跳表,找到索引范围内的元素
        int currentIndex = 0;
        while (current != null && currentIndex <= endIndex) {
            if (currentIndex >= startIndex) {
                result.add(current.element);
            }
            current = current.right;
            currentIndex++;
        }
        return result;
    }

    // 查找范围内的元素
    public List<String> rangeSearch(double startScore, double endScore) {
        List<String> result = new ArrayList<>();
        SkipListNode current = head;
        // 找到范围的起点
        while (current != null) {
            while (current.right != null) {
                if (current.right.score < startScore) {
                    current = current.right;
                } else {
                    //找到0 level
                    if (current.down != null) {
                        while (current.down != null) {
                            current = current.down;
                        }
                    }
                    while (current.right != null) {
                        if (current.right.score >= startScore && current.right.score <= endScore) {
                            result.add(current.right.element);
                        }
                        current = current.right;
                    }
                }

            }
            current = current.down;
        }
        return result;
    }

    // 查找排名,通过层索引到 0 level的位置,然后从 0 level开始搜索
    public int rank(String element) {
        SkipListNode current = head;
        int rank = 0; // 记录排名
        // 首先找到目标节点
        SkipListNode targetNode = ref.get(element);
        if (targetNode == null) {
            return -1; // 如果元素不存在,返回 -1 表示未找到
        }
        double targetScore = targetNode.score;
        // 遍历跳表,计算排名
        boolean found = false;
        end:
        while (current != null) {
            // 向右移动,累加小于目标分值的节点数量
            SkipListNode rightNode = current.right;
            while (rightNode != null && rightNode.score <= targetScore) {
                if (rightNode.score == targetScore) {
                    //找到targetScore的0 level的位置
                    found = true;
                    if (rightNode.down == null) {
                        //如果rightNode.down为空,代表是0 level
                        current = rightNode;
                    } else {
                        //找到0 level的最大位置
                        while (rightNode.down != null) {
                            rightNode = rightNode.down;
                        }
                        current = rightNode;
                    }
                    break end;
                }
                rightNode = rightNode.right;
            }
            // 向下层移动
            current = current.down;
        }
        if (!found) {
            return -1;
        }
        SkipListNode find0Level = head;
        // 找到最底层的起始节点
        while (find0Level.down != null) {
            find0Level = find0Level.down;
        }
        find0Level = find0Level.right; // 跳过头节点

        while (find0Level != null && find0Level != current) {
            rank++;
            find0Level = find0Level.right;
        }
        // 最后加上当前层中等于目标分值的节点
        return rank + 1; // 排名从 1 开始
    }


    // 打印跳表(调试用)
    public void print() {
        SkipListNode current = head;
        while (current != null) {
            SkipListNode node = current.right;
            while (node != null) {
                System.out.print("(" + node.element + ", " + node.score + ") ");
                node = node.right;
            }
            System.out.println();
            current = current.down;
        }
    }

    public boolean delete(String element) {
        SkipListNode current = head;
        boolean found = false;

        SkipListNode skipListNode = ref.get(element);
        if (skipListNode == null) {
            return false;
        }
        // 遍历每一层,找到并删除目标节点
        while (current != null) {
            while (current.right != null ) {
                if(current.right.element.equals(element)){
                    found = true;
                    current.right = current.right.right; // 跳过目标节点
                    break;
                }
                current = current.right;
            }
            // 向下层移动
            current = current.down;
        }
        if (found){
            size--;
        }
        // 如果目标节点存在但已被删除,返回 true,否则返回 false
        return found;
    }

java的实现比较简单,只是大概得实现了一下,没有太多废话,具体大家可以看代码,注释还是挺清楚的

3、golang的实现

golang的实现就稍微详细一些,因为我用于zset的实现结构里面在,但是也可能存在一些问题,我还没来得及的详细的测试,目前还在开发中

3.1、Entry

// Entry 每个节点存入的数据Entry
type Entry struct {
	Element string  //存入的元素
	Score   float64 //元素对应的分值
}

3.2、Node

// Node 节点
type Node struct {
	Entry *Entry //节点Entry
	Right *Node  //水平指针
	Down  *Node  //垂直指针
}

3.3、SkipList

// SkipList 跳表结构
type SkipList struct {
	CurrentMaxLevel uint8               //当前最大的层级
	Head            *Node               //头节点
	Levels          uint8               //层数
	Ref             map[string]*float64 //引用关系
	Size            uint16              //存入的元素个数
	NodeCount       uint16
}

3.4、初始化创建的一些函数


// 初始化节点
func createNode(element string, score float64) *Node {
	return &Node{
		Entry: &Entry{
			Element: element,
			Score:   score,
		},
		Right: nil,
		Down:  nil,
	}
}
//创建一个空的SkipList
func NewSkipList() *SkipList {
	return &SkipList{
		CurrentMaxLevel: 1,
		Levels:          1,
		Ref:             make(map[string]*float64),
		Head:            createNode("", utils.MinFloatValue()), // 头节点,起始无穷小
		Size:            0,
	}
}

createNode:主要用来创建节点使用
NewSkipList:创建一个新的跳表

3.5、绑定SkipList的方法


func (sl *SkipList) Insert(element string, score float64) {
	sl.Push(&Entry{
		Element: element,
		Score:   score,
	})
}

func (sl *SkipList) Push(entry *Entry) {
	// Step 1: 查找插入位置,并记录路径
	path := make([]*Node, 0)
	current := sl.Head
	score := entry.Score
	element := entry.Element
	sl.Remove(element)
	// 查找插入位置并记录路径
	for i := 0; current != nil; i++ {
		for current.Right != nil && current.Right.Entry.Score < score {
			current = current.Right
		}
		path = append(path, current) // 记录路径
		current = current.Down
	}
	// Step 2: 生成新节点的层数
	newLevel := sl.randomLevel()
	if newLevel > sl.CurrentMaxLevel {
		sl.CurrentMaxLevel = newLevel
	}
	// Step 3: 创建新节点,并逐层插入
	var downNode *Node                    // 用来连接下层的节点
	newNode := createNode(element, score) // 只创建一次新节点
	// Step 4: 按层插入节点
	for i := 0; i < int(newLevel); i++ {
		// 如果路径中没有足够的层,则扩展头节点
		if i >= len(path) {
			newHead := createNode("", utils.MinFloatValue()) // 创建新头节点
			newHead.Down = sl.Head                           // 将原头节点挂到新头节点下层
			sl.Head = newHead                                // 更新头节点
			sl.Levels++
			newPath := make([]*Node, len(path)+1)
			newPath[0] = newHead
			copy(newPath[1:], path)
			path = newPath // 将新头节点添加到路径中
		}
		sl.NodeCount++
		// 当前层的前一个节点
		prev := path[len(path)-1-i]
		// 插入当前层的新节点
		newNode.Right = prev.Right // 将当前节点的 right 指向原来节点的 right
		prev.Right = newNode       // 将前一个节点的 right 指向新节点
		newNode.Down = downNode    // 将新节点的 down 指向下层节点
		downNode = newNode         // 更新下层节点为当前新节点
		// 如果新层还需要节点,继续创建新节点实例
		if i < int(newLevel-1) {
			newNode = createNode(element, score) // 在此处复用对象
		}
	}
	sl.Ref[element] = &score
	sl.Size++
}

func (sl *SkipList) Find(element string) *Entry {
	if score, ok := sl.Ref[element]; ok {
		return &Entry{element, *score}
	}
	return nil
}

// 在 正态分布(又叫高斯分布)的 randomLevel 中,mean 和 stddev 是决定生成的层数分布的两个关键参数:
// mean(均值):正态分布的均值决定了层数的集中趋势,即跳表生成的节点层数的“中心”位置。一般来说,mean 越大,跳表的层数就越高,生成的高层节点的概率也越大。
// 在跳表的上下文中,mean 通常代表了跳表的 预期层数。例如,如果 mean = 3,则大多数节点的层数会集中在 3 层附近。
// stddev(标准差):标准差决定了层数的 分散程度。标准差越小,层数会越集中在均值附近;标准差越大,层数的变化范围越广,生成高层节点的概率会更大,也会导致跳表更高。
// 小标准差意味着跳表的层数相对稳定,大多数节点的层数接近于 mean。
// 大标准差意味着层数的变化较大,可能会有一些节点生成非常高层数的情况。
// todo 这个随机生成层数的算法没有经过数据的验证,后面测试中如果发现分布不均匀的时候再来调整
func (sl *SkipList) randomLevel() uint8 {
	var mean float64
	var stddev float64
	if sl.NodeCount <= 1000 {
		//当节点数小于1000的时候,我们设置均值为3,标准差为0.5
		mean = 3.0
		stddev = 0.5
	} else {
		// 根据节点数动态设置 mean。使用 log2(nodeCount) 或者比例
		mean = math.Log2(float64(sl.NodeCount))
		if mean > float64(maxLevel) {
			mean = float64(maxLevel) // 层数不会超过最大值
		}
		// 当跳表当前层数较低时,可以增加 mean,帮助生成更多的高层节点
		if sl.CurrentMaxLevel < maxLevel {
			mean = (mean + float64(sl.CurrentMaxLevel)) / 2
		}
		// 根据当前层数设置 stddev
		// 如果当前层数较小,增加 stddev,允许更大的层数波动
		stddev = mean / 2
		if sl.CurrentMaxLevel < 3 {
			stddev = mean // 增加波动性
		} else {
			stddev = mean / 2 // 当层数多时,减少波动
		}
	}

	// 使用正态分布生成一个随机数
	level := uint8(mean + stddev*rand.NormFloat64())
	//随机生成的层数不能大于了当前最大层上一层,比如当前最大的是3,生成的level最大只能是4,不能是大于4
	if level > sl.CurrentMaxLevel {
		level = sl.CurrentMaxLevel + 1
	}
	if level > maxLevel {
		level = maxLevel
	}
	return level
}

// Rank 查找排名,通过层索引到 0 level的位置,然后从 0 level开始搜索
func (sl *SkipList) Rank(element string) int {
	current := sl.Head
	rank := 0 // 记录排名
	// 首先找到目标节点
	targetScore := sl.Ref[element]
	if targetScore == nil {
		return -1 // 如果元素不存在,返回 -1 表示未找到
	}
	// 遍历跳表,计算排名
	found := false
end:
	for current != nil {
		// 向右移动,累加小于目标分值的节点数量
		rightNode := current.Right
		for rightNode != nil && rightNode.Entry.Score <= *targetScore {
			if rightNode.Entry.Score == *targetScore {
				//找到targetScore的0 level的位置
				found = true
				if rightNode.Down == nil {
					//如果rightNode.down为空,代表是0 level
					current = rightNode
				} else {
					//找到0 level的最大位置
					for rightNode.Down != nil {
						rightNode = rightNode.Down
					}
					current = rightNode
				}
				break end
			}
			rightNode = rightNode.Right
		}
		// 向下层移动
		current = current.Down
	}
	if !found {
		return -1
	}
	find0Level := sl.Head
	// 找到最底层的起始节点
	for find0Level.Down != nil {
		find0Level = find0Level.Down
	}
	find0Level = find0Level.Right // 跳过头节点

	for find0Level != nil && find0Level != current {
		rank++
		find0Level = find0Level.Right
	}
	// 最后加上当前层中等于目标分值的节点
	return rank + 1 // 排名从 1 开始
}

func (sl *SkipList) FindForScore(element string) float64 {
	if score, exists := sl.Ref[element]; exists {
		return *score
	}
	return utils.MinFloatValue()
}

func (sl *SkipList) RangeCount(startScore, endScore float64) (count uint16) {
	current := sl.Head
	for current != nil {
		for current.Right != nil {
			if current.Right.Entry.Score < startScore {
				current = current.Right
			} else {
				//找到0 level
				if current.Down != nil {
					for current.Down != nil {
						current = current.Down
					}
				}
				for current.Right != nil {
					if current.Right.Entry.Score >= startScore && current.Right.Entry.Score <= endScore {
						count++
					}
					current = current.Right
				}
			}
		}
		current = current.Down
	}
	return
}

func (sl *SkipList) Range(startScore, endScore float64, offset, count int, reverse bool) (ret []*context.Pair[float64, string]) {
	current := sl.Head
	limit := count > 0
	if limit {
		ret = make([]*context.Pair[float64, string], count)
	}
	// 找到范围的起点
	randIndex := 0
	offsetIndex := 0
endFind:
	for current != nil {
		for current.Right != nil {
			if current.Right.Entry.Score < startScore {
				current = current.Right
			} else {
				//找到0 level
				if current.Down != nil {
					for current.Down != nil {
						current = current.Down
					}
				}
				for current.Right != nil {
					if current.Right.Entry.Score >= startScore && current.Right.Entry.Score <= endScore {
						if offsetIndex >= offset {
							if limit && !reverse {
								ret[randIndex] = &context.Pair[float64, string]{Key: current.Right.Entry.Element, Value: current.Right.Entry.Score}
								randIndex++
								offsetIndex++
								if randIndex >= count {
									break endFind
								}
							} else {
								ret = append(ret, &context.Pair[float64, string]{Key: current.Right.Entry.Element, Value: current.Right.Entry.Score})
							}
						} else {
							offsetIndex++
						}
					}
					current = current.Right
				}
			}
		}
		current = current.Down
	}
	return
}

func (sl *SkipList) RangeIndex(startIndex, endIndex int) (ret []context.Pair[float64, string]) {
	// 处理负索引,-1代表最后一个元素,-2代表倒数第二个元素
	ret = make([]context.Pair[float64, string], 0)
	if endIndex < 0 {
		endIndex = int(sl.Size) + endIndex // 将负数索引转换为正数
	}
	if startIndex < 0 {
		startIndex = int(sl.Size) + startIndex // 将负数索引转换为正数
	}
	current := sl.Head
	// 找到最底层的起始节点
	for current.Down != nil {
		current = current.Down
	}
	current = current.Right // 跳过头节点
	// 遍历跳表,找到索引范围内的元素
	currentIndex := 0
	for current != nil && currentIndex <= endIndex {
		if currentIndex >= startIndex {
			ret = append(ret, context.Pair[float64, string]{Key: current.Entry.Element, Value: current.Entry.Score})
		}
		current = current.Right
		currentIndex++
	}
	return
}

func (sl *SkipList) Remove(element string) bool {
	current := sl.Head
	found := false
	score := sl.Ref[element]
	if score == nil {
		return false
	}
	// 遍历每一层,找到并删除目标节点
	for current != nil {
		//每一层都是从头开始搜索
		rightNode := current
		currLayerFound := false
		for rightNode.Right != nil && rightNode.Right.Entry.Score <= *score {
			if rightNode.Right.Entry.Element == element {
				rightNode.Right = rightNode.Right.Right // 跳过目标节点
				found = true
				//如果搜索到了,那么修改下down的指针
				current = rightNode.Down
				currLayerFound = true
				sl.NodeCount--
				break
			}
			rightNode = rightNode.Right
		}
		// 向下层移动
		if !currLayerFound {
			current = current.Down
		}
	}
	if found {
		sl.Size--
		delete(sl.Ref, element)
		return true
	}
	// 如果目标节点存在但已被删除,返回 true,否则返回 false
	return false
}

4、最后看下中间件的全貌

4.1、客户端

在这里插入图片描述

4.2、服务端

在这里插入图片描述

4.3、数据节点

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到