跳表-实现篇(Redis)
简介
本文基于 Redis3.0 版本的源码解读跳表的实现
Redis3.0 源码链接:[redis/redis at 3.0.0](https://github.com/redis/redis/tree/3.0.0)建议阅读:
本文按照以下顺序进行:
背景知识
Redis 内置了多种数据结构,其中`zset`(有序集合,英文名:sorted set)兼具集合的元素唯一性以及有序性(元素有序性)两个特性,被广泛用在日常业务需求的开发中zset
由多个数据结构组合实现的,而跳表(zskiplist
)就是实现有序性的关键数据结构,Redis 就是通过跳表实现了zset
结构的排序、根据排名的范围获取等操作
源码解读
结构体定义
zset
1 | typedef struct zset { |
实现zset
的结构体定义,从结构体中的成员
dict
(dict
):实现集合的特性zsl
(zskiplist
):实现有序(排序)的特性
就可以知道zset
就是通过组合dict
( Redis 底层数据结构之一,即哈希表)、zskiplist
(跳表)两种数据结构实现的
zskiplist
1 | typedef struct zskiplist { |
header
、tail
:头尾指针length
:元素个数level
:当前跳表的最高层级
zskiplistNode
1 | /* ZSETs use a specialized version of Skiplists */ |
rojb
:元素值score
:分数,也就是跳表原始论文中的key
backward
:回退指针,主要用于反向遍历使用level
forward
:前进指针span
:前进指针的跨度(用来记录对应的前进指针向前时跨过的元素个数,基于这个字段可以很方便的计算排位)
常量定义
1 |
随机函数
1 | /* Returns a random level for the new skiplist node we are going to create. |
初始化
1 | zskiplist *zslCreate(void) { |
通过zslCreate
可以创建一个zskiplist
指针
插入
Redis 跳表引入了一些额外的字段/机制以符合实际业务操作的需要函数声明
1 | zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); |
相比于原始论文跳表的实现而言多了以下两块逻辑:
span
(跨度)的维护backward
(回退指针)的维护
除了维护上述的两块逻辑,可以发现实际上代码的实现和论文伪代码基本相同
也是分为三个步骤
遍历跳表,找到符合的节点位置,并记录每一层需要更新的节点(插入的节点对应的前驱节点)(也就是记录
update
数组)- 增加
rank
数组记录每层的排位
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
// 初始化排位信息,如果是最顶层,则初始化为0,其他情况就是上一层的rank的值
// 比如从4层开始遍历,rank[3]=0,rank[2]=rank[3]
// 其实也很好理解,当前层的排位应该基于上一层的排位接着计数
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 这里的比较除了根据score比较之外,还增加了score相同情况下根据obj(值)比较的场景
// 更新排位,增加跨度就是新的排位
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 记录节点(这个节点就是插入新节点的每一层对应的前驱节点)
// 如果说新节点的层级超过当前跳表的最高层级,这里是一定记录不到的,所以才有下面单独判断并处理的逻辑
// 保证代码逻辑统一也让代码更具可读性
update[i] = x;
}- 增加
调用随机函数生成新节点层级,如果新节点层级超过当前跳表最高层,则更新
update
数组(超过最高层的前驱节点就是头节点,但是遍历过程不会记录,为了让后续能够用统一代码逻辑处理,单独进行判断并更新update
数组,还有同步更新跳表level
字段- 增加
span
字段的初始化逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// 调用随机函数生成level
level = zslRandomLevel();
// 如果新节点level超过当前跳表最高层
if (level > zsl->level) {
// 只处理超出当前跳表最高层的部分
// (剩下的都会在遍历过程中被记录,只有超过的部分不会被记录,所以单独在这里处理)
for (i = zsl->level; i < level; i++) {
// 排位初始化,和上面遍历过程的初始化逻辑相同
rank[i] = 0;
// 新节点层级是最高的,所以只会有一个元素,前驱节点只能是header
update[i] = zsl->header;
// 同理,只有一个元素,所以跨度就是跳表长度(没更新前,跳表长度在最后才更新,所以这里就是原长度)
update[i]->level[i].span = zsl->length;
}
// 更新跳表的最高层级
zsl->level = level;
}- 增加
创建新节点,更新跳表
- 增加
span
字段的更新
1
2
3
4
5
6
7
8
9
10
11
12
13
14x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
// 从下往上更新
// 以下两句和链表插入的是相同的,就是将新节点插入的逻辑
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
// 根据每层节点的排位推算出跨度
// 更新新节点的跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
// 更新每层前驱节点的跨度(考虑到前驱节点可能是那一层的最后一个节点,所以这里不能简单地直接自增1)
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}- 增加
- Redis 多了最后一个步骤(其余字段的更新)
- 更新
span
跨度 - 更新
backward
- 更新
tail
- 更新
length
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
// 对于新节点没有涉及到的层级,跨度直接自增1即可
update[i]->level[i].span++;
}
// 更新BW指针,BW指针没有多层的概念
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
// 如果新节点不是最后一个节点,那就要更新后继节点BW指向新节点
x->level[0].forward->backward = x;
else
// 新节点是最后一个节点,更新tail
zsl->tail = x;
// 跳表长度自增1
zsl->length++;
return x; - 更新
跨度更新
以下面的跳表插入score=8的节点为例
rank数组:[4, 4, 2, 2]
update数组:[B, B, A, A]
(A
、B
为节点代号)
新节点
1 | x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); |
针对新节点,需要更新新节点对应的每一层的前进指针的跨度
$i=0$,更新新节点第一层的跨度
$update[0] = B$
$update[0]->level[0].span = 1$
$rank[0] - rank[0] = 4 - 4 = 0$
$x->level[0].span = 1 - 0 = 1$
$i=1$,更新新节点第二层的跨度
$update[1] = B$
$update[1]->level[1].span = 1$
$rank[0] - rank[1] = 0$
$x->level[1].span = 1 - 0 = 1$
$i=2$,更新新节点第三层的跨度
$update[2] = A$
$update[2]->level[2].span = 3$
$rank[0] -rank[2] = 4 - 2 = 2$
$x->level[2].span = 3 - 2 = 1$
$i=3$,更新新节点第四层的跨度
$update[3] = A$
$update[3]->level[3].span = 3$
$rank[0] - rank[3] = 4 - 2 = 2$
$x->level[3].span = 3 - 2 = 1$
前驱节点
新节点层级内的节点
1 | update[i]->level[i].span = (rank[0] - rank[i]) + 1; |
$i=0$,更新第一层节点(节点
A
)$update[0]->level[0].span = (rank[0] - rank[0]) + 1 = (4 - 4) + 1 = 1$
$i=1$,更新第二层节点(节点
A
)$update[1]->level[0].span = (rank[0] - rank[1]) + 1 = (4 - 4) + 1 = 1$
$i=2$,更新第三层节点(节点
B
)$update[2]->level[2].span = (rank[0] - rank[2]) + 1 = (4 - 2) + 1 = 3$
$i=3$,更新第三层节点(节点
B
)$update[3]->level[3].span = (rank[0] - rank[3]) + 1 = (4 - 2) + 1 = 3$
超过新节点层级的节点
1 | for (i = level; i < zsl->level; i++) { |
对于这些节点(超过新节点)的层级由于直接跳过新节点,上层能感知到的就是多跨过了一个节点,所以直接进行遍历并自增1即可
最终插入结果如下图
蓝色标注即为新增的指针
查找
函数声明
1 | // zslGetRank 根据给定的分数以及值返回节点的排位 |
函数实现
1 | /* Find the rank for an element by both score and key. |
删除
删除操作和插入操作类似,只是在找到删除节点后,需要将各层节点的前驱节点更新,然后删除节点
函数声明
1 | int zslDelete(zskiplist *zsl, double score, robj *obj); |
函数实现
1 | /* Delete an element with matching score/object from the skiplist. */ |