Redis SkipList 编码


1 跳表的特点

Redis 5.x 的源码中, zset (order set) 的编码为 ziplist 或 skiplist (Redis 6.0 后变为 listpack 或 dict + skiplist)。

skiplist 的特点

  1. 本身是一种空间换时间有序数据结构
  2. 本身可以达到了二分查找的效果, 时间复杂度在 O(log n), 最坏 O(n), 和平衡树一样
  3. 实现逻辑简单, 数据变更时, 数据的再平衡 (有序) 比平衡树简单很多

而 skiplist 本质可以看作是一个有序链表, 保留着链表的变更简单, 同时通过空间换时间的方式提升查询的效率。

注: 下面的分析也是按照 Redis 5.x 进行分析。

2 跳表的查询

既然 skiplist 是本质是一个有序链表, 我们先简单的了解一下有序链表。

Alt '有序链表'

如上图, 是一个简单的有序链表。
所有的数据按照升序/降序的方式组织在一起, 每个节点都有一个指向下一个节点的指针, 最后一个节点指向了 null。

如果需要从当前的链表中查找 22, 从第一个节点开始一直往下找 1 -> 4 -> 6 -> 19 -> 22, 需要经过 5 个节点。

有没有办法加速这个查询的过程吗?

制定一个这样的规则:

  1. 从当前的链表随机选择几个节点重新构成一个新的有序链表
  2. 第一个节点必定被选中

假设, 原本的有序链表, 变成了如下:

Alt '双层有序链表'

这时, 可以从新的链表中查找 22, 同样从第一个节点开始找 1 -> 6 -> 19 -> 22, 经过链表的节点少了, 这次只需要经过 4 个节点。

这样可以加速查找, 那么是否可以以新的链表为原型, 按照相关的规则, 再次构建一条更短的有序链表

假设, 这时的有序链表变为

Alt '三层有序链表'

这时, 同样是查询 22, 只需要经过 2 个节点 (1 -> 22)

制定这样的规则, 现在看起来可以加速查询的效率, 但是有个问题: 要查找的数据不在新建的链表中

Alt '三层有序链表'

还是这张图, 假设现在我们现在需要查找的数据是 54。
这时如果直接从先建的第 2 条链表中查找, 是查询不到的, 但是 54 实际是存在的。

所以需要继续追加规则

  1. 新增的概念, 同一条链表为同一层, 原始的链表为 0 层 (也可以定为 1 层), 逐层递增

那么最新的链表变为:

Alt '明确三层分离的有序链表'

再加一条规则

  1. 通过 0 层产生的节点之间是一个整体, 一个整体内, 高层的节点可以下降到低一层的节点, 但是无法往上升

最新的链表又变成如下:

Alt '明确三层分离和节点有序链表'

加两条查询的规则

  1. 查找数据时, 必须从最高层开始找
  2. 查找过程中, 通过和同一层的后一个元素比较大小, 决定在同一层继续往后找, 还是在同一个整体的下一层

Alt '简易跳表查询'

在最新的链表中, 查找 54 时, 过程如下:

  1. 从最高层的的第 1 个节点开始, 也就是第 2 层的第 1 个节点 1 开始
  2. 此时第 2 层第 1 个节点 1 的后一个节点为 22, 查找的数字比 22 大, 跳到同一层的下一个节点 22
  3. 此时到了第 2 层第 2个节点 22, 它的后一个节点为 null, 只能下到自己的下一层的节点, 也就是第 1 层的第 4 个节点 22
  4. 此时到了第 1 层第 4 个节点 22 , 同样后一个节点为 null, 只能下到自己的下一层的节点, 也就是第 1 层的第 5 个节点 22
  5. 此时到了第 0 层第 5 个节点 22 , 后一个节点为 54, 找到了

此时经过的节点为 1 -> (22, 22, 22, 一个整体的 3 层) -> 54, 可以看作经过了 5 个节点。
看起来和直接查找 1 -> 4 -> 6 -> 19 -> 22 -> 54 的 5 个节点一样。

但是在实际中, 一个整体的几个层的节点可以用数组进行组织, 达到随机访问的效果, 也就是这里实际是 3 个节点的。

最后的结构基本就是跳表结构的雏形, 从中可以大体到跳表的结构的性质, 这里对跳表结构做一个定义

  1. 跳表本质是一个有序的单向链表
  2. 链表中的每个节点除了存放自己的排序值, 还有若干个节点, 层节点存储着当前节点的排序值, 和同一层的下一个节点或者 null
  3. 查询时, 从第一个节点的最高层开始, 通过比较当前层的下一个节点, 确定是在往同一层的下一个节点, 还是同一个节点的下一层查询

3 Redis 对跳表的实现

具体的源码可以查看 t_zset.c 这个文件。

3.1 跳表中的几个对象的定义

3.1.1 跳表本身的定义

Alt 'SkipList 声明'

可以看到跳表本身的定义很简单

  1. header 跳表的头节点
  2. tail 跳表的尾节点
  3. length 跳表中第一层 (数据最完整的) 的节点长度
  4. level 跳表中节点中的最高层数
typedef struct zskiplist {

    // 跳表的头节点 和 跳表的尾节点
    struct zskiplistNode *header, *tail;

    // 跳表的节点长度, 只算第一层的节点
    unsigned long length;

    // 跳表的节点中的最高层数
    int level;
} zskiplist;

3.1.2 跳表中的节点定义

Alt 'SkipNode 声明'

节点的定义也很简单

  1. ele 节点存储的数据
  2. score 节点的分值
  3. backward 节点的前置节点, 主要用于从表尾向表头遍历时使用
  4. levels 跳表的特点, 层节点
typedef struct zskiplistNode {

    // 节点存储的数据
    sds ele;

    // 当前节点的分值
    double score;

    // 当前节点的前置节点, 也就是前一个节点
    // 主要用于从表尾向表头遍历时使用
    struct zskiplistNode *backward;

    // 当前节点层节点    
    struct zskiplistLevel levels[];

}zskiplistNode;

3.1.3 跳表节点中的层定义

Alt 'SkipListLevel 声明'

层节点的定义

  1. forward 当前层节点指向的下一个跳表节点
  2. span 当前跳表节点和 forward 所指的跳表节点之间跳过了多少个节点
struct zskiplistLevel {

    // 当前层节点指向的下一个跳表表节点, 不是具体的层节点
    struct zskiplistNode *forward;

    // 以第一层为标准 (第一层数据是最全的, 有多少数据, 第一层就有多少个节点)
    // 当前层节点指向的链表节点, 中间跳过了多少个跳表节点 
    unsigned long span;
} zskiplistLevel;

span 的定义了解

假设现在有一个跳表如下

Alt '分析 Span 的跳表'

跳表中有 4 个节点 (头节点不算), 分别存储了 A, B, C, D, 每个节点上面的 L 代表了层节点, 括号里面的数字就是 span 的值。

SkipListNode-1 这个节点的 L4 层, 下一个节点为 null, 到达 null 中间跨过了 SkipListNode-2, SkipListNode-3, SkipListNode-4, null 本身, 总共 4 个节点, 所以 SkipListNode-1 的 L4 层的 span 为 4。

同理 SkipListNode-1 节点的 L3 层, 下一个节点为 SkipListNode-3, 中间跨过了 SkipListNode-2, SkipListNode-3 本身, 所以对应的 span 为 2。

到这里, 可以理解层节点的 span 的含义: 当前跳表节点和 forward 所指的跳表节点 (也就是下一个跳表节点) 之间跨过了多少个节点, 跨过的节点本身也算是一个节点。

3.2 创建一个跳表对象

/**
 * 跳表创建
 * @return 创建的跳表
 */
zskiplist *zslCreate(void) {

    int j;
    zskiplist *zsl;
    // 为 zskiplist 分配内存, 可以看为 Java 中的 new zskiplist()
    zsl = zmalloc(sizeof(*zsl));

    // 当前跳表的最大层高, 默认为 1, 头节点不算
    zsl->level = 1;
    // 当前跳表的节点长度, 默认为 0, 头节点不算
    zsl->length = 0;

    // 创建头节点
    // ZSKIPLIST_MAXLEVEL, 跳表中节点的最大层数, 默认为 32
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    // 初始头节点的所有层节点的数据
    // 下一个链表节点为 null, 跳过的节点 span = 0
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        zsl->header->level[j].forward = NULL;
        zsl->header->level[j].span = 0;
    }
    // 头节点的前置节点为 NULL
    zsl->header->backward = NULL;
    // 跳表的尾节点设置为 NULL
    zsl->tail = NULL;
    return zsl;
}

/**
 * 创建链表节点
 * @param level 创建的节点的层高
 * @param score 创建的节点的分值
 * @param ele   创建的节点的内容
 * @return 创建的节点
 */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    // 为跳表的节点分配内存
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    // 设置跳表节点的分数为入参的 score, 内容为入参的 ele
    zn->score = score;
    zn->ele = ele;
    return zn;
}

Alt '初始跳表'

从图中可以看到, Redis 跳表的初始状态

  1. Redis 中限制了节点的层高最大为 32 层 (层数会消耗空间的, 需要有一个合理的限制值)
  2. 跳表的头节点, 默认就是一个最大层高的节点, 但是里面所有内容都指向了 null

这个逻辑很简单, 创建了一个层高 32 的跳表节点, 跳表的头节点执行这个节点, 没了, 节点内的数据都为默认值。

3.3 向跳表添加第一个元素

/**
 * 向跳表中新增一个节点
 * @param zsl 新增节点的跳表
 * @param score 分值
 * @param ele  内容 
 */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    
    // update 保存的是新增节点每一层的前一个节点 (也就是这个节点的下一个节点就是新增的节点)
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

    // rank[i] 存储的是头节点到 update[i] 的中间跨过的跳表节点
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    
    x = zsl->header;
    
    // zsl->level 为当前跳表除头结点外的最大层数
    // 1. 通过遍历跳表, 找到新增节点的应该保存的位置 
    // 从头节点的这个最大层数出发, 向后或向下遍历, 找到新增节点时, 每一层的前一个节点, 同时记录头节点到这个前一个节点的跨过的节点数
    for (i = zsl->level-1; i >= 0; i--) {

        // for 循环控制同一个节点的下层遍历, 也就是向下遍历
        // 下面的 while 循环控制同一层的遍历, 也就是向后遍历
        
        
        // 把上一层经过的跨过节点数继承下来, 然后加上这一层后面的节点(有的话), 就是这一层真正跨过的节点数
        // 从 for 循环进来, level - 1, 也就是向下遍历, 向下遍历前的节点和向下遍历达到的节点, 只是层数不一样, 实际还是同一个节点,
        // 所以原本节点的上一层的跨过的节点数继承下来, 再加上这一层后面处理的节点数, 就是这一层真正跨过的节点数
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

        // 先比较分数, 分数相同的话, 再比较内容
        while (x->level[i].forward 
            && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        // 新增位置的前一个节点就是 x 了
        update[i] = x;
    }

    // 2. 通过随机函数, 获取新增节点的层高, 如果随机出来的层高比现在的最大的层高还高, 多对出来的层进行初始化
    level = zslRandomLevel();
    // 随机出来的层高比现在的层高还高的话
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        // 更新最高层数
        zsl->level = level;
    }

    // 3. 创建新增节点, 保存到跳表表中
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        // 链表的基本操作, 修改节点的上下一个节点
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        // 更新新增节点每一层的 span, 也就是新增节点的每一层到下一个节点所跨过的节点数

        // 从上面的 for 循环可以看出, rank[i] 保存的是头节点到 update[i] 节点, 中间经过的节点数
        // rank[0] 第一层, update[0] 的下一个节点就是新增的节点, 中间是没有跨过任何其他节点的
        // rank[0] - rank[i], 可以得到 update[i] 到新增节点中间跨过的节点数
        // 这时 update[i] 对应层的 span 减去 上一步计算出来的节点就是新增节点的 span 了
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    // 4. 更新插入节点的前置节点的其他层的跨度
    // 上面的 zslRandomLevel() 可能随机出一个比原本层高还低的层数
    // 这时新增了一个节点, 那么原本在新增节点前面的节点的 span 都要加 1
    // 小于等于随机出来的 level 的在上一步处理了,  大于随机出的 level, 但是小于原本的 level 的, 这里处理
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    // 5. 确定新增节点的前一个节点
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    
    // 6. 更新跳表的头尾节点
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;

    zsl->length++;
    return x;
}

/**
 * 返回一个随机值, 用于确定节点的层高
 * 规则: 层数越高的概率越低
 * 设负载因子 ZSKIPLIST_P = p, 这里等于 0.25
 * 第 n 层的概率为 = p ^ (n - 1) * (1 - p)
 * 第一层   = 0.25 ^ (1 - 1)  * (1 - 0.25) = 0.25
 * 第二层   = 0.25 ^ (2 - 1)  * (1 - 0.25) = 0.1875
 * 第三层   = 0.25 ^ (3 - 1)  * (1 - 0.25) = 0.140625
 * ...
 * 第三十二层 = 0.25 ^ (32 - 1) * (1 - 0.25) = 0.000000000
 */
int zslRandomLevel(void) {
    // 层数最低为 1
    int level = 1;
    // 0xFFFF = 0000 0000 0000 0000 1111 1111 1111 1111
    // ZSKIPLIST_P = 0.25f
    while ((random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    // 
    return (level < ZSKIPLIST_MAXLEVEL ) ? level : ZSKIPLIST_MAXLEVEL;
}

Alt '分析跳表新增'

假设现在有一个跳表如上, 需要往里面新增一个 score = 4 的节点, 这个节点的层高不定, 层节点里面的下一个节点和跨节点数不确定

  1. 新增节点在绿色节点的后面
  2. 新增节点的层高不确定, 但是可以知道, 新增节点时, 前 5 层的上一个节点分别为 Node2, Node2, Node2, Node1, Node1, 这些数据就存在了 update[] 数组中
  3. 确定了 update[] 时, 那么也可以确定头节点到这些节点跨过的节点数: 3, 3, 3, 1, 1 这些数据存在 rank[] 数组中
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    for (i = zsl->level-1; i >= 0; i--) {
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward 
            && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
}

就是为了计算出上面 update 和 rank 的数据。

3.4 指定范围的查询

/**
 * 从跳表中找到第一个符合范围的节点
 * @param zsl 跳表
 * @param range 范围
 */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {

    zskiplistNode *x;
    int i;

    // 判断跳表的分值是否包含了查询范围内的值
    if (!zslIsInRange(zsl,range)) 
        return NULL;

    x = zsl->header;
    // 从头节点出发, 找到第一个节点的下一个节点的分值大于等于范围的最小值的节点
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score,range))
            x = x->level[i].forward;
    }

    // 获取符合条件的节点
    x = x->level[0].forward;
    serverAssert(x != NULL);

    // 判断节点的分值是否小于等于范围的最大值, 不符合返回 NULL
    if (!zslValueLteMax(x->score,range)) 
        return NULL;
    return x;
}

/**
 * 判断跳表节点的分值是否在范围内
 * @param zsl 跳表
 * @param range 范围
 * @return 1: 存在 0: 不存在
 */
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;

    // 入参的范围值有问题, 直接返回不存在
    if (range->min > range->max || (range->min == range->max && (range->minex || range->maxex)))
        return 0;
    // 获取尾巴节点, 也就是分值最大的节点
    x = zsl->tail;

    // 尾节点不存在或者节点的分数小于范围的最小值, 不存在
    if (x == NULL || !zslValueGteMin(x->score,range))
        return 0;
    // 获取头节点的下一个节点, 也就是分值最小的节点
    x = zsl->header->level[0].forward;
    // 节点不存在, 或者节点的分数大于范围的最大值, 不存在
    if (x == NULL || !zslValueLteMax(x->score,range))
        return 0;

    // 返回存在
    return 1;
}

/**
 * 判断入参的数值是否大于或大于等于范围的最小值
 * @param value 入参的数值
 * @param spec 范围
 */
int zslValueGteMin(double value, zrangespec *spec) {
    // minex 是否可以等于判断值, 1: 大于 0: 大于等于
    return spec->minex ? (value > spec->min) : (value >= spec->min);
}

/**
 * 判断入参的数组是否小于或小于等于范围的最大值
 */
int zslValueLteMax(double value, zrangespec *spec) {
    // maxex 是否可以等于判断值, 1: 小于 0: 小于等于
    return spec->maxex ? (value < spec->max) : (value <= spec->max);
}

到这里, Redis 对跳表的实现的大体逻辑基本就分解完了。

4 问题

4.1 跳表的最大层数为 32

本身跳表的层节点是需要消耗空间的, 所以需要有一个上限。
而 Redis 提供的确定层高的函数

  1. 第一层的概率为 100%
  2. 第二层的概率为 25%
  3. 第五层的概率为 7.9%
  4. 第十三层的概率为 0.7%
  5. 第二十一层的概率为 0.07%

想达到 32 层基本不可能的。

4.2 Redis中 为啥不用红黑树二用跳表

作者原文, 概括如下

  1. 跳表占用的内存会比红黑树多, 但是多的内存很有限
  2. 跳表的实现比红黑树简单
  3. 范围查询的支持, 比红黑树简单

  目录