1 跳表的特点
在 Redis 5.x 的源码中, zset (order set) 的编码为 ziplist 或 skiplist (Redis 6.0 后变为 listpack 或 dict + skiplist)。
skiplist 的特点
- 本身是一种空间换时间的有序数据结构
- 本身可以达到了二分查找的效果, 时间复杂度在 O(log n), 最坏 O(n), 和平衡树一样
- 实现逻辑简单, 数据变更时, 数据的再平衡 (有序) 比平衡树简单很多
而 skiplist 本质可以看作是一个有序链表, 保留着链表的变更简单, 同时通过空间换时间的方式提升查询的效率。
注: 下面的分析也是按照 Redis 5.x 进行分析。
2 跳表的查询
既然 skiplist 是本质是一个有序链表, 我们先简单的了解一下有序链表。
如上图, 是一个简单的有序链表。
所有的数据按照升序/降序的方式组织在一起, 每个节点都有一个指向下一个节点的指针, 最后一个节点指向了 null。
如果需要从当前的链表中查找 22, 从第一个节点开始一直往下找 1 -> 4 -> 6 -> 19 -> 22, 需要经过 5 个节点。
有没有办法加速这个查询的过程吗?
制定一个这样的规则:
- 从当前的链表随机选择几个节点重新构成一个新的有序链表
- 第一个节点必定被选中
假设, 原本的有序链表, 变成了如下:
这时, 可以从新的链表中查找 22, 同样从第一个节点开始找 1 -> 6 -> 19 -> 22, 经过链表的节点少了, 这次只需要经过 4 个节点。
这样可以加速查找, 那么是否可以以新的链表为原型, 按照相关的规则, 再次构建一条更短的有序链表
假设, 这时的有序链表变为
这时, 同样是查询 22, 只需要经过 2 个节点 (1 -> 22)
制定这样的规则, 现在看起来可以加速查询的效率, 但是有个问题: 要查找的数据不在新建的链表中。
还是这张图, 假设现在我们现在需要查找的数据是 54。
这时如果直接从先建的第 2 条链表中查找, 是查询不到的, 但是 54 实际是存在的。
所以需要继续追加规则
- 新增层的概念, 同一条链表为同一层, 原始的链表为 0 层 (也可以定为 1 层), 逐层递增
那么最新的链表变为:
再加一条规则
- 通过 0 层产生的节点之间是一个整体, 一个整体内, 高层的节点可以下降到低一层的节点, 但是无法往上升
最新的链表又变成如下:
加两条查询的规则
- 查找数据时, 必须从最高层开始找
- 查找过程中, 通过和同一层的后一个元素比较大小, 决定在同一层继续往后找, 还是在同一个整体的下一层
在最新的链表中, 查找 54 时, 过程如下:
- 从最高层的的第 1 个节点开始, 也就是第 2 层的第 1 个节点 1 开始
- 此时第 2 层第 1 个节点 1 的后一个节点为 22, 查找的数字比 22 大, 跳到同一层的下一个节点 22
- 此时到了第 2 层第 2个节点 22, 它的后一个节点为 null, 只能下到自己的下一层的节点, 也就是第 1 层的第 4 个节点 22
- 此时到了第 1 层第 4 个节点 22 , 同样后一个节点为 null, 只能下到自己的下一层的节点, 也就是第 1 层的第 5 个节点 22
- 此时到了第 0 层第 5 个节点 22 , 后一个节点为 54, 找到了
此时经过的节点为 1 -> (22, 22, 22, 一个整体的 3 层) -> 54, 可以看作经过了 5 个节点。
看起来和直接查找 1 -> 4 -> 6 -> 19 -> 22 -> 54 的 5 个节点一样。
但是在实际中, 一个整体的几个层的节点可以用数组进行组织, 达到随机访问的效果, 也就是这里实际是 3 个节点的。
最后的结构基本就是跳表结构的雏形, 从中可以大体到跳表的结构的性质, 这里对跳表结构做一个定义
- 跳表本质是一个有序的单向链表
- 链表中的每个节点除了存放自己的排序值, 还有若干个层节点, 层节点存储着当前节点的排序值, 和同一层的下一个节点或者 null
- 查询时, 从第一个节点的最高层开始, 通过比较当前层的下一个节点, 确定是在往同一层的下一个节点, 还是同一个节点的下一层查询
3 Redis 对跳表的实现
具体的源码可以查看 t_zset.c 这个文件。
3.1 跳表中的几个对象的定义
3.1.1 跳表本身的定义
可以看到跳表本身的定义很简单
- header 跳表的头节点
- tail 跳表的尾节点
- length 跳表中第一层 (数据最完整的) 的节点长度
- level 跳表中节点中的最高层数
typedef struct zskiplist {
// 跳表的头节点 和 跳表的尾节点
struct zskiplistNode *header, *tail;
// 跳表的节点长度, 只算第一层的节点
unsigned long length;
// 跳表的节点中的最高层数
int level;
} zskiplist;
3.1.2 跳表中的节点定义
节点的定义也很简单
- ele 节点存储的数据
- score 节点的分值
- backward 节点的前置节点, 主要用于从表尾向表头遍历时使用
- levels 跳表的特点, 层节点
typedef struct zskiplistNode {
// 节点存储的数据
sds ele;
// 当前节点的分值
double score;
// 当前节点的前置节点, 也就是前一个节点
// 主要用于从表尾向表头遍历时使用
struct zskiplistNode *backward;
// 当前节点层节点
struct zskiplistLevel levels[];
}zskiplistNode;
3.1.3 跳表节点中的层定义
层节点的定义
- forward 当前层节点指向的下一个跳表节点
- span 当前跳表节点和 forward 所指的跳表节点之间跳过了多少个节点
struct zskiplistLevel {
// 当前层节点指向的下一个跳表表节点, 不是具体的层节点
struct zskiplistNode *forward;
// 以第一层为标准 (第一层数据是最全的, 有多少数据, 第一层就有多少个节点)
// 当前层节点指向的链表节点, 中间跳过了多少个跳表节点
unsigned long span;
} zskiplistLevel;
span 的定义了解
假设现在有一个跳表如下
跳表中有 4 个节点 (头节点不算), 分别存储了 A, B, C, D, 每个节点上面的 L 代表了层节点, 括号里面的数字就是 span 的值。
SkipListNode-1 这个节点的 L4 层, 下一个节点为 null, 到达 null 中间跨过了 SkipListNode-2, SkipListNode-3, SkipListNode-4, null 本身, 总共 4 个节点, 所以 SkipListNode-1 的 L4 层的 span 为 4。
同理 SkipListNode-1 节点的 L3 层, 下一个节点为 SkipListNode-3, 中间跨过了 SkipListNode-2, SkipListNode-3 本身, 所以对应的 span 为 2。
到这里, 可以理解层节点的 span 的含义: 当前跳表节点和 forward 所指的跳表节点 (也就是下一个跳表节点) 之间跨过了多少个节点, 跨过的节点本身也算是一个节点。
3.2 创建一个跳表对象
/**
* 跳表创建
* @return 创建的跳表
*/
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 为 zskiplist 分配内存, 可以看为 Java 中的 new zskiplist()
zsl = zmalloc(sizeof(*zsl));
// 当前跳表的最大层高, 默认为 1, 头节点不算
zsl->level = 1;
// 当前跳表的节点长度, 默认为 0, 头节点不算
zsl->length = 0;
// 创建头节点
// ZSKIPLIST_MAXLEVEL, 跳表中节点的最大层数, 默认为 32
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 初始头节点的所有层节点的数据
// 下一个链表节点为 null, 跳过的节点 span = 0
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
// 头节点的前置节点为 NULL
zsl->header->backward = NULL;
// 跳表的尾节点设置为 NULL
zsl->tail = NULL;
return zsl;
}
/**
* 创建链表节点
* @param level 创建的节点的层高
* @param score 创建的节点的分值
* @param ele 创建的节点的内容
* @return 创建的节点
*/
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
// 为跳表的节点分配内存
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
// 设置跳表节点的分数为入参的 score, 内容为入参的 ele
zn->score = score;
zn->ele = ele;
return zn;
}
从图中可以看到, Redis 跳表的初始状态
- Redis 中限制了节点的层高最大为 32 层 (层数会消耗空间的, 需要有一个合理的限制值)
- 跳表的头节点, 默认就是一个最大层高的节点, 但是里面所有内容都指向了 null
这个逻辑很简单, 创建了一个层高 32 的跳表节点, 跳表的头节点执行这个节点, 没了, 节点内的数据都为默认值。
3.3 向跳表添加第一个元素
/**
* 向跳表中新增一个节点
* @param zsl 新增节点的跳表
* @param score 分值
* @param ele 内容
*/
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// update 保存的是新增节点每一层的前一个节点 (也就是这个节点的下一个节点就是新增的节点)
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// rank[i] 存储的是头节点到 update[i] 的中间跨过的跳表节点
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
// zsl->level 为当前跳表除头结点外的最大层数
// 1. 通过遍历跳表, 找到新增节点的应该保存的位置
// 从头节点的这个最大层数出发, 向后或向下遍历, 找到新增节点时, 每一层的前一个节点, 同时记录头节点到这个前一个节点的跨过的节点数
for (i = zsl->level-1; i >= 0; i--) {
// for 循环控制同一个节点的下层遍历, 也就是向下遍历
// 下面的 while 循环控制同一层的遍历, 也就是向后遍历
// 把上一层经过的跨过节点数继承下来, 然后加上这一层后面的节点(有的话), 就是这一层真正跨过的节点数
// 从 for 循环进来, level - 1, 也就是向下遍历, 向下遍历前的节点和向下遍历达到的节点, 只是层数不一样, 实际还是同一个节点,
// 所以原本节点的上一层的跨过的节点数继承下来, 再加上这一层后面处理的节点数, 就是这一层真正跨过的节点数
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 先比较分数, 分数相同的话, 再比较内容
while (x->level[i].forward
&& (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 新增位置的前一个节点就是 x 了
update[i] = x;
}
// 2. 通过随机函数, 获取新增节点的层高, 如果随机出来的层高比现在的最大的层高还高, 多对出来的层进行初始化
level = zslRandomLevel();
// 随机出来的层高比现在的层高还高的话
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
// 更新最高层数
zsl->level = level;
}
// 3. 创建新增节点, 保存到跳表表中
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
// 链表的基本操作, 修改节点的上下一个节点
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
// 更新新增节点每一层的 span, 也就是新增节点的每一层到下一个节点所跨过的节点数
// 从上面的 for 循环可以看出, rank[i] 保存的是头节点到 update[i] 节点, 中间经过的节点数
// rank[0] 第一层, update[0] 的下一个节点就是新增的节点, 中间是没有跨过任何其他节点的
// rank[0] - rank[i], 可以得到 update[i] 到新增节点中间跨过的节点数
// 这时 update[i] 对应层的 span 减去 上一步计算出来的节点就是新增节点的 span 了
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
// 4. 更新插入节点的前置节点的其他层的跨度
// 上面的 zslRandomLevel() 可能随机出一个比原本层高还低的层数
// 这时新增了一个节点, 那么原本在新增节点前面的节点的 span 都要加 1
// 小于等于随机出来的 level 的在上一步处理了, 大于随机出的 level, 但是小于原本的 level 的, 这里处理
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 5. 确定新增节点的前一个节点
x->backward = (update[0] == zsl->header) ? NULL : update[0];
// 6. 更新跳表的头尾节点
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
/**
* 返回一个随机值, 用于确定节点的层高
* 规则: 层数越高的概率越低
* 设负载因子 ZSKIPLIST_P = p, 这里等于 0.25
* 第 n 层的概率为 = p ^ (n - 1) * (1 - p)
* 第一层 = 0.25 ^ (1 - 1) * (1 - 0.25) = 0.25
* 第二层 = 0.25 ^ (2 - 1) * (1 - 0.25) = 0.1875
* 第三层 = 0.25 ^ (3 - 1) * (1 - 0.25) = 0.140625
* ...
* 第三十二层 = 0.25 ^ (32 - 1) * (1 - 0.25) = 0.000000000
*/
int zslRandomLevel(void) {
// 层数最低为 1
int level = 1;
// 0xFFFF = 0000 0000 0000 0000 1111 1111 1111 1111
// ZSKIPLIST_P = 0.25f
while ((random() & 0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
//
return (level < ZSKIPLIST_MAXLEVEL ) ? level : ZSKIPLIST_MAXLEVEL;
}
假设现在有一个跳表如上, 需要往里面新增一个 score = 4 的节点, 这个节点的层高不定, 层节点里面的下一个节点和跨节点数不确定
- 新增节点在绿色节点的后面
- 新增节点的层高不确定, 但是可以知道, 新增节点时, 前 5 层的上一个节点分别为 Node2, Node2, Node2, Node1, Node1, 这些数据就存在了 update[] 数组中
- 确定了 update[] 时, 那么也可以确定头节点到这些节点跨过的节点数: 3, 3, 3, 1, 1 这些数据存在 rank[] 数组中
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward
&& (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
}
就是为了计算出上面 update 和 rank 的数据。
3.4 指定范围的查询
/**
* 从跳表中找到第一个符合范围的节点
* @param zsl 跳表
* @param range 范围
*/
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
// 判断跳表的分值是否包含了查询范围内的值
if (!zslIsInRange(zsl,range))
return NULL;
x = zsl->header;
// 从头节点出发, 找到第一个节点的下一个节点的分值大于等于范围的最小值的节点
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score,range))
x = x->level[i].forward;
}
// 获取符合条件的节点
x = x->level[0].forward;
serverAssert(x != NULL);
// 判断节点的分值是否小于等于范围的最大值, 不符合返回 NULL
if (!zslValueLteMax(x->score,range))
return NULL;
return x;
}
/**
* 判断跳表节点的分值是否在范围内
* @param zsl 跳表
* @param range 范围
* @return 1: 存在 0: 不存在
*/
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
// 入参的范围值有问题, 直接返回不存在
if (range->min > range->max || (range->min == range->max && (range->minex || range->maxex)))
return 0;
// 获取尾巴节点, 也就是分值最大的节点
x = zsl->tail;
// 尾节点不存在或者节点的分数小于范围的最小值, 不存在
if (x == NULL || !zslValueGteMin(x->score,range))
return 0;
// 获取头节点的下一个节点, 也就是分值最小的节点
x = zsl->header->level[0].forward;
// 节点不存在, 或者节点的分数大于范围的最大值, 不存在
if (x == NULL || !zslValueLteMax(x->score,range))
return 0;
// 返回存在
return 1;
}
/**
* 判断入参的数值是否大于或大于等于范围的最小值
* @param value 入参的数值
* @param spec 范围
*/
int zslValueGteMin(double value, zrangespec *spec) {
// minex 是否可以等于判断值, 1: 大于 0: 大于等于
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
/**
* 判断入参的数组是否小于或小于等于范围的最大值
*/
int zslValueLteMax(double value, zrangespec *spec) {
// maxex 是否可以等于判断值, 1: 小于 0: 小于等于
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
到这里, Redis 对跳表的实现的大体逻辑基本就分解完了。
4 问题
4.1 跳表的最大层数为 32
本身跳表的层节点是需要消耗空间的, 所以需要有一个上限。
而 Redis 提供的确定层高的函数
- 第一层的概率为 100%
- 第二层的概率为 25%
- 第五层的概率为 7.9%
- 第十三层的概率为 0.7%
- 第二十一层的概率为 0.07%
想达到 32 层基本不可能的。
4.2 Redis中 为啥不用红黑树二用跳表
作者原文, 概括如下
- 跳表占用的内存会比红黑树多, 但是多的内存很有限
- 跳表的实现比红黑树简单
- 范围查询的支持, 比红黑树简单