简介

本文基于 Redis3.0 版本的源码解读跳表的实现

Redis3.0 源码链接:[redis/redis at 3.0.0](https://github.com/redis/redis/tree/3.0.0)

建议阅读:

本文按照以下顺序进行:

背景知识

Redis 内置了多种数据结构,其中`zset`(有序集合,英文名:sorted set)兼具集合的元素唯一性以及有序性(元素有序性)两个特性,被广泛用在日常业务需求的开发中

zset由多个数据结构组合实现的,而跳表(zskiplist)就是实现有序性的关键数据结构,Redis 就是通过跳表实现了zset结构的排序、根据排名的范围获取等操作

Redis 自己实现了跳表结构,为符合自身业务操作的需要增加了一些额外的字段以及操作,虽然和原始论文中的结构不太一样,但基本的操作逻辑都是相同的

源码解读

结构体定义

zset

1
2
3
4
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;

实现zset的结构体定义,从结构体中的成员

  • dictdict):实现集合的特性
  • zslzskiplist):实现有序(排序)的特性

就可以知道zset就是通过组合dictRedis 底层数据结构之一,即哈希表)、zskiplist(跳表)两种数据结构实现的

zskiplist

1
2
3
4
5
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
  • headertail:头尾指针
  • length:元素个数
  • level:当前跳表的最高层级

zskiplistNode

1
2
3
4
5
6
7
8
9
10
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
robj *obj;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;
  • rojb:元素值
  • score:分数,也就是跳表原始论文中的key
  • backward:回退指针,主要用于反向遍历使用
  • level
    • forward:前进指针
    • span :前进指针的跨度(用来记录对应的前进指针向前时跨过的元素个数,基于这个字段可以很方便的计算排位)

1738048075089

常量定义

1
2
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
Redis 跳跃表的最高层级是32层,层晋升概率为*0.25*

随机函数

1
2
3
4
5
6
7
8
9
10
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}

通过zslCreate可以创建一个zskiplist指针

插入

Redis 跳表引入了一些额外的字段/机制以符合实际业务操作的需要

函数声明

1
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);

相比于原始论文跳表的实现而言多了以下两块逻辑:

  1. span(跨度)的维护
  2. backward(回退指针)的维护

除了维护上述的两块逻辑,可以发现实际上代码的实现和论文伪代码基本相同

也是分为三个步骤

  1. 遍历跳表,找到符合的节点位置,并记录每一层需要更新的节点(插入的节点对应的前驱节点)(也就是记录update数组)

    • 增加rank数组记录每层的排位
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
    /* store rank that is crossed to reach the insert position */
    // 初始化排位信息,如果是最顶层,则初始化为0,其他情况就是上一层的rank的值
    // 比如从4层开始遍历,rank[3]=0,rank[2]=rank[3]
    // 其实也很好理解,当前层的排位应该基于上一层的排位接着计数
    rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
    while (x->level[i].forward &&
    (x->level[i].forward->score < score ||
    (x->level[i].forward->score == score &&
    compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
    // 这里的比较除了根据score比较之外,还增加了score相同情况下根据obj(值)比较的场景
    // 更新排位,增加跨度就是新的排位
    rank[i] += x->level[i].span;
    x = x->level[i].forward;
    }
    // 记录节点(这个节点就是插入新节点的每一层对应的前驱节点)
    // 如果说新节点的层级超过当前跳表的最高层级,这里是一定记录不到的,所以才有下面单独判断并处理的逻辑
    // 保证代码逻辑统一也让代码更具可读性
    update[i] = x;
    }
  2. 调用随机函数生成新节点层级,如果新节点层级超过当前跳表最高层,则更新update数组(超过最高层的前驱节点就是头节点,但是遍历过程不会记录,为了让后续能够用统一代码逻辑处理,单独进行判断并更新update数组,还有同步更新跳表level字段

    • 增加span字段的初始化逻辑
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 调用随机函数生成level
    level = zslRandomLevel();
    // 如果新节点level超过当前跳表最高层
    if (level > zsl->level) {
    // 只处理超出当前跳表最高层的部分
    // (剩下的都会在遍历过程中被记录,只有超过的部分不会被记录,所以单独在这里处理)
    for (i = zsl->level; i < level; i++) {
    // 排位初始化,和上面遍历过程的初始化逻辑相同
    rank[i] = 0;
    // 新节点层级是最高的,所以只会有一个元素,前驱节点只能是header
    update[i] = zsl->header;
    // 同理,只有一个元素,所以跨度就是跳表长度(没更新前,跳表长度在最后才更新,所以这里就是原长度)
    update[i]->level[i].span = zsl->length;
    }
    // 更新跳表的最高层级
    zsl->level = level;
    }
  3. 创建新节点,更新跳表

    • 增加span字段的更新
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
    // 从下往上更新
    // 以下两句和链表插入的是相同的,就是将新节点插入的逻辑
    x->level[i].forward = update[i]->level[i].forward;
    update[i]->level[i].forward = x;

    /* update span covered by update[i] as x is inserted here */
    // 根据每层节点的排位推算出跨度
    // 更新新节点的跨度
    x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    // 更新每层前驱节点的跨度(考虑到前驱节点可能是那一层的最后一个节点,所以这里不能简单地直接自增1)
    update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
  4. Redis 多了最后一个步骤(其余字段的更新)
    1. 更新span跨度
    2. 更新backward
    3. 更新tail
    4. 更新length
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
        /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
    // 对于新节点没有涉及到的层级,跨度直接自增1即可
    update[i]->level[i].span++;
    }
    // 更新BW指针,BW指针没有多层的概念
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
    // 如果新节点不是最后一个节点,那就要更新后继节点BW指向新节点
    x->level[0].forward->backward = x;
    else
    // 新节点是最后一个节点,更新tail
    zsl->tail = x;
    // 跳表长度自增1
    zsl->length++;
    return x;

跨度更新

以下面的跳表插入score=8的节点为例

insert.svg

rank数组:[4, 4, 2, 2]

update数组:[B, B, A, A]AB为节点代号)

新节点

1
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);

针对新节点,需要更新新节点对应的每一层的前进指针的跨度

  1. $i=0$,更新新节点第一层的跨度

    $update[0] = B$

    $update[0]->level[0].span = 1$

    $rank[0] - rank[0] = 4 - 4 = 0$

    $x->level[0].span = 1 - 0 = 1$

  2. $i=1$,更新新节点第二层的跨度

    $update[1] = B$

    $update[1]->level[1].span = 1$

    $rank[0] - rank[1] = 0$

    $x->level[1].span = 1 - 0 = 1$

  3. $i=2$,更新新节点第三层的跨度

    $update[2] = A$

    $update[2]->level[2].span = 3$

    $rank[0] -rank[2] = 4 - 2 = 2$

    $x->level[2].span = 3 - 2 = 1$

  4. $i=3$,更新新节点第四层的跨度

    $update[3] = A$

    $update[3]->level[3].span = 3$

    $rank[0] - rank[3] = 4 - 2 = 2$

    $x->level[3].span = 3 - 2 = 1$

前驱节点

新节点层级内的节点

1
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
  1. $i=0$,更新第一层节点(节点A

    $update[0]->level[0].span = (rank[0] - rank[0]) + 1 = (4 - 4) + 1 = 1$

  2. $i=1$,更新第二层节点(节点A

    $update[1]->level[0].span = (rank[0] - rank[1]) + 1 = (4 - 4) + 1 = 1$

  3. $i=2$,更新第三层节点(节点B

    $update[2]->level[2].span = (rank[0] - rank[2]) + 1 = (4 - 2) + 1 = 3$

  4. $i=3$,更新第三层节点(节点B

    $update[3]->level[3].span = (rank[0] - rank[3]) + 1 = (4 - 2) + 1 = 3$

超过新节点层级的节点

1
2
3
4
for (i = level; i < zsl->level; i++) {
// 对于新节点没有涉及到的层级,跨度直接自增1即可
update[i]->level[i].span++;
}

对于这些节点(超过新节点)的层级由于直接跳过新节点,上层能感知到的就是多跨过了一个节点,所以直接进行遍历并自增1即可

最终插入结果如下图

蓝色标注即为新增的指针

insert-result.svg

查找

函数声明

1
2
// zslGetRank 根据给定的分数以及值返回节点的排位
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o);

函数实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}

/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0;
}

删除

删除操作和插入操作类似,只是在找到删除节点后,需要将各层节点的前驱节点更新,然后删除节点

函数声明

1
int zslDelete(zskiplist *zsl, double score, robj *obj);

函数实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* Delete an element with matching score/object from the skiplist. */
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
}
return 0; /* not found */
}

参考资料