Redis ZipList 编码


1 介绍

官方的介绍

The ziplist is a specially encoded dually linked list that is designed
to be very memory efficient. It stores both strings and integer values,
where integers are encoded as actual integers instead of a series of
characters. It allows push and pop operations on either side of the list
in O(1) time. However, because every operation requires a reallocation of
the memory used by the ziplist, the actual complexity is related to the
amount of memory used by the ziplist.

ziplist 是一个经过特殊编码的内存高效的双向链表。
它同时存储字符串和整数值, 其中整数被编码为实际整数, 而不是一系列字符。
它允许在 O(1) 时间内对列表的两边进行 push 和 pop 操作。
但是, 因为每个操作都需要重新分配 ziplist 所使用的内存, 所以实际的复杂性与 ziplist 所使用的内存数量有关。

ziplist 是一个双向链表。但是它不存储指向上一个链表节点和指向下一个链表节点的指针, 而是存储上一个节点长度和当前节点长度。
通过牺牲部分读写性能, 来换取高效的内存空间利用率, 是一种时间换空间的思想。

注: 下面的分析也是按照 Redis 5.x 进行分析。

2 ziplist 的实现逻辑

在 Redis 中, ziplist 虽然是一个双向链表, 却是通过一个 char[] 数组实现的, 先后遍历时, 借助在存储数据时冗余的上一个节点长度和当前节点长度, 计算后得到上下节点的位置。
所以, 在 Reids 中, **ziplist == char[]**。

首先明确一点, 在 C 语言中, char 类型只占 8 位 (这个很重要, 因为下面的内容基本都是会涉及到很多的字节内容), 整个 char[] 数组会被按照下面的格式进行划分。

Alt 'char[] 实现 ziplist 的方式'

整个 char[] 数组的划分如下:

  1. 0 ~ 3 位, 4 个字节, 叫做 zlbytes, 表示当前整个 ziplist 的字节长度, 也就是整个数组的长度, 因此压缩列表最多有 2^32 - 1 个字节
  2. 4 ~ 7 位, 4 个字节, 叫做 zltail, 表示当前 ziplist 的起始位置到最后一个 entry 元素的起始位置相差的字节数, 也就是 ziplist 的起始位置 + zltail = ziplist 最后一个 entry 的起始位置 (简单理解就是, 数组的第几位是最后一个 entry 的开始位置)
  3. 8 ~ 9 位, 2 个字节, 叫做 zllen, 表示当前整个 ziplist 中的 entry 个数, 也就是整个 ziplist 最多有 2^16 - 1 个 entry
  4. 10 ~ n 位, 字节数不定, 就是真正存储数据的 entry 集合
  5. 最后 1 位, 1 个字节, 叫做 zlend, 表示整个 ziplist 的结束位, 固定为 0xFF (255, 1111 1111)

整个数组中, 只有 entry 是存储数据的地方。

2.1 entry 的结构

整个 ziplist 的大体布局了解完了, 看一下存储数据的 entry, 整个 entry 的话分成了 3 部分

  1. previous_entry_length: 当前 entry 的前一个 entry 的占的字节长度
  2. encoding: 当前 entry 存储的数据内容是什么类型, 大体有 2 种:整数和字节数组 (也就是字符串)
  3. content: 需要存储的内容

previous_entry_length

表示前一个 entry 的字节长度, 会占 1 个或者 5 个字节, 占的字节数取决于上一个 entry 的的字节长度。
当前一个 entry 的长度小于 254 字节时, 用 1 个字节表示。
当前一个 entry 的长度大于或等于 254 字节时, 用 5 个字节来表示, 这 5 个字节中第一个固定为 0xFE (254, 1111 1110)

encoding

当前 entry 的编码, 不同的编码, 表示后面的 content 是不同的内容, 会占 1, 2 或者 5 个字节, 占的字节取决于存储在当前 entry 内的内容的格式

encoding 占的字节数 encoding 的二进制 二进制的前 2 位的 表示的内容
1 00 bbbbbb 00 长度最大为 63 的字节数组, encoding 后面的 6 位用来存储字节数组的长度
1 1100 0000 11 int_16 的整数
1 1101 0000 11 int 32 的整数
1 1110 0000 11 int 64 的整数
1 1111 0000 11 int 24 位的整数
1 1111 1110 11 8 位的整数
1 1111 bbbb 11 后面的 bbbb 的取值范围为 0001 到 1101 (避免和上面的 24 位和 8 位整数的影响), 表示 1 - 13, 这个编码表示的是后面没有 content 字段, 值存储在 encoding 后面 4 位, 表示值为 0 - 12, 进制值减 1
2 01 bbbbbb aaaa aaaa 01 长度最大为 2^14 - 1 的字节数组, 2 个字节的 encoding 后面的 14 位用来存储字节数组的长度
4 10_ bbbb bbbb aaaa aaaa cccc cccc dddd dddd 10 长度最大为 2^32 - 1 的字节数组, 5 个字节的 encoding 用后面的 4 个字节, 32 位用来存储字节数组的长度

从上面的编码可以看出, encoding 的第一个字节的前二位可以确定后面 content 的数据类型了, 11xx xxxx 为整数, 不是 11xx xxxx 就是字节数组, 字节数组再判断前二位, 知道具体的长度类型, 最终得到了存储数据长度。

content
没什么好说的, 真正存储的数据。

2.1.1 entry 代码中的实现 zlentry

通过上面的 entry 组成分析可以知道, 整体的数据都是压缩在 entry 中的, 这个存储没问题。
但是在代码的处理中, 不可能说每次需要用就进行进行一次解析, 比如需要内容的长度, 解析对应的 entry, 需要内容的具体值, 再解析对应的 entry。
这个基本不实际, 一般都是直接将一个 entry 解析完成, 在整个方法域中都能起作用。

Redis 5.x 中, entry 会被解析成 ZlEntry 对象, 解析后, 在整个方法的执行中都能通过这个 zlentry 获取到整个 entry 的内容。

zlentry 的定义

typedef struct zlentry {

    /** previous_entry_length 占用了多少个字节, 取值为 1 或者 5 */
    unsigned int prevrawlensize; 

    /** previous_entry_length 的表示的值 */
    unsigned int prevrawlen; 

    /** encoding 占用了多少个字节  1,2,5 */
    unsigned int lensize; 

    /** content 占用的字节数 */
    unsigned int len; 

    /** 
     * content 的数据类型, 用一些枚举替代, 不保存具体的值
     * 取值范围:
     * 0000 0000 长度最大为 63 的字节数组
     * 0100 0000 最大长度为 2^14 - 1 的字节数组 
     * 1000 0000 最大长度为 2^32 - 1 的字节数组
     * 1100 0000 整数
     */
    unsigned char encoding; 

    /** 头部的长度占用的长度, 等于 prevrawlensize + lensize */
    unsigned int headersize; 

    /** 当前 entry 在字节数组中的开始位置, 指向这个 entry 的指针开始 */
    unsigned char *p;

} zlentry;

Redis 5.x 的源码中, entry 解析为 zlentry 的方法有 2 个 zipEntryzipEntrySafe, 2 个的逻辑的一样的。
不同的是后面的 zipEntrySafe 会加上字节长度的校验, 确保当前 entry 的内容不会超过当前存储内容的 char[] 数组的长度。

下面的代码, 经过调整, 逻辑不变, 但是相对更容易理解, 所以和源码有点区别

/**
 * zlentry 解析
 * @param p 就是需要解析的 entry 的指针地址, 转为为 Java 就是 可以理解为一个 int, 表示在 char[] 数组的这个位置 entry 的开始位置
 * @param e 就是一个 zlentry 的地址引用, 将解析后的结果存入到这个 zlentry
 */
void zipEntry(unsigned char *p, zlentry *e) {

    // 从 p 的开始位置, 也就是 previous_entry_length 的位置解析出 prevrawlensize 和 prevrawlen, 存入到 e 对应的位置
    zip_decode_prevlen(p, e->prevrawlensize, e->prevrawlen);

    // 从 p + prevrawlensize 的位置, 也就是到了 encoding 的位置, 解析出 encoding,
    zip_entry_encoding(p + e->prevrawlensize, e->encoding);

    // 从 p + prevrawlensize 的位置, 也就是到了 encoding 的位置, 解析出 encoding, lensize, len 存入到 e 对应的位置
    zip_decode_length(p + e->prevrawlensize, e-> encoding, e->lensize, e->len);

    // e 的 headersize 的值等于 prevrawlensize + lensize 的值
    e->headersize = e->prevrawlensize + e->lensize; 

    // e 的 p 值等于入参的 p 值, 也就是 entry 开始的位置
    e->p = p;
}

void zip_decode_prevlen(ptr, prevlensize, prevlen) {

    // 解析 previous_entry_length 占的字节数
    // ZIP_BIG_PREVLEN = 0xfe, 1111 1110 
    // 上面说过了 previous_entry_length 的字节长度只有 1 和 5
    // 如果 5 个字节的话, 第一位必定是 0xfe
    // 所以 第一位如果是 0xfe, 那么 previous_entry_length 必定是 5 个字节, 否则就是 1 个字节
    (prevlensize) = (prt)[0] < ZIP_BIG_PREVLEN ? 1 : 5;

    if ((prevlensize) == 1) {
        // 1 个字节, 那么第一个字节就是 previous_entry_length 的值
        (prevlen) = (prt)[0]
    } else {
        // 不是 1 个字节, 那么就是 5 个字节, 取后面的 4 个字节拼接成一个 int 值
        (prevlen) = ((prt)[4] <<24)| ((prt)[3] <<16) | ((prt)[2] <<8)  | ((prt)[1])  
    }
}

void zip_entry_encoding(ptr, encoding) {

    // 注意此处的 ptr 不是 entry 的开始位置, 而是 entry + previous_entry_length 后的位置, 也就是 encoding 的开始位置
    // encoding 等于当前 encoding 的第一个字节
    (encoding) = ((ptr)[0]);

    // ZIP_STR_MASK = 0xc0, 二进制 1100 0000
    // 当前 ptr[0] 小于 1100 0000, 表示 encoding 当前表示的是字节数组
    if ((encoding) < ZIP_STR_MASK) {
        // encoding & 1100 0000, 只取前 2 位, 即可
        (encoding) &= ZIP_STR_MASK; 
    }
}


void zip_decode_length(ptr, encoding, lensize, len) {

    // encoding < 1100 0000, 字节数组
    if ((encoding) < ZIP_STR_MASK) {  

        if ((encoding) == ZIP_STR_06B) {

            // ZIP_STR_06B = 0, 二进制 0000 0000
            // 结果: 字节数组, 长度最大为 63 的字节数组, encoding 只需要使用 1 个字节存储
            (lensize) = 1;                                            
            // 0x3f = 63, 二进制 0011 1111, & 上 0011 1111, 得到 encoding 去掉前 2 位后的值, 也就是 content 的字节长度          
            (len) = (ptr)[0] & 0x3f; 

        } else if ((encoding) == ZIP_STR_14B) {  

            // ZIP_STR_14B = 64, 二进制 0100 0000
            // 结果: 字节数组, 2^14 - 1 的字节数组, encoding 需要使用 2 个字节存储
            (lensize) = 2;                                             
            // 得到存储在 encoding 2 个字节中 content 字节长度        
            (len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];

        } else if ((encoding) == ZIP_STR_32B) {

            // ZIP_STR_32B = 128, 二进制 10000000
            // 结果: 字节数组, 2^32 - 1 的字节数组, encoding 需要使用 5 个字节存储
            (lensize) = 5;  
            (len) = ((ptr)[1] << 24) | ((ptr)[2] << 16) | ((ptr)[3] <<  8) | ((ptr)[4]); 
        } else {

            // 异常情况处理
            (lensize) = 0;
            (len) = 0;
        }    

    } else {

        // 整数处理
        // 整数的话, 默认 encoding 只需要一个字节
        (lensize) = 1;

        // ZIP_INT_8B = 254, 二进制 11111110, 表示 8 位的整数, content 只需要 1 个字节
        if ((encoding) == ZIP_INT_8B)  (len) = 1; 
        
        // ZIP_INT_16B = 192, 二进制 1100 0000, 表示 16 位的整数, content 需要 2 个字节
        else if ((encoding) == ZIP_INT_16B) (len) = 2;                         
        
        // ZIP_INT_24B = 240, 二进制 1111 0000, 表示 24 位的整数, content 需要 3 个字节
        else if ((encoding) == ZIP_INT_24B) (len) = 3;  

        // ZIP_INT_32B = 208, 二进制 1101 0000, 表示 32 位的整数, content 需要 4 个字节
        else if ((encoding) == ZIP_INT_32B) (len) = 4;                         

        // ZIP_INT_64B = 224, 二进制 1110 0000, 表示 64 位的整数, content 需要 5 个字节
        else if ((encoding) == ZIP_INT_64B) (len) = 8; 

        // ZIP_INT_IMM_MIN = 241, 二进制 1111 0001 
        // ZIP_INT_IMM_MAX = 253, 二进制 1111 1101
        // 1111 0001 <= encoding <= 1111 1101, 没有 content,值直接存储在 encoding 
        else if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX)
            (len) = 0;
        else 
        // 异常情况处理
            (lensize) = (len) = 0;    
    }
}

上面的就是 entry 解码为 zlentry 的过程, 同样的 zlentry 编码为 entry, 就是过程逆过来的处理, 就不展开了。

3 ziplist 节省了哪些空间

我们可以知道 ziplist 是一个双向链表, 可以从 2 侧遍历。
通过 zltail 可以获取到最后一个 entry 的起始位置, 同时通过 entry 的 previous_entry_length 属性, 可以得到上一个 entry 的字节长度, 从而达到从后到前的遍历。
而从 ziplist 的 char[] 数组的第 10 位开始就是第一个 entry, 通过解码每一个 entry, 获取到每一个 entry 的长度, 也可以得到下一个 entry 的开始位置, 从而达到从前往后的遍历。
通过这种方式达到了用数组实现双向链表的操作。

这种方式每次遍历都需要解码编, 浪费了时间, 影响执行效率, 但是节省了空间, 那么节省了哪些空间呢?

在传统的双向链表中的每一个节点需要

  1. 指向前一个节点的指针, 64 位的系统中, 这个指针需要 8 个字节的内存
  2. 指向后一个节点的指针, 同样是 8 个字节的内存
  3. 节点中的内容指针, 指向存储的字符串的指针, 8 个字节
  4. C 语言中, 字符串的特殊处理, 末尾加上 \0 的结束符

最终需要 25 个字节的辅助, 假设现在有 3 个字符串 one, two, three, 内存中的情况如下:

Alt '普通链表的内存形式'

而通过上面的 entry 进行存储时

  1. previous_entry_length: 受前面的 entry 的长度影响, 但是最大为 5 个字节
  2. encoding: 受当前存储的 entry 的内容影响, 最大为 5 个字节
  3. content: 可以不需要借助 \0 结束
    所以最终最坏的情况 (不考虑里面存储的内容) 需要 10 个字节的辅助。

4 ziplist 在使用上的一些情况

下面以插入 (伪代码) 为例, 进行分析

/**
 * @param zl 插入的压缩列表
 * @param p 插入的位置指针, 原位置的 entry 变为当前插入 entry 的后一个节点
 * @param s 需要插入的字符串
 * @param slen 插入的字符串的长度
 */
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {

    // 向压缩列表插入元素有 4 种情况
	// 1. 列表没有数据的插入
	// 2. 列表有数据, 在尾部插入
	// 3. 列表有数据, 在中间插入
	// 4. 列表有数据, 在首部插入

    // 插入位置当前的 entry 的 previous_entry_length 占的字节数
    int prevlensize = 0;

    // 插入位置当前的 entry 的 previous_entry_length 的值, 也就是当前插入的位置前一个 entry 的字节长度
    int prevlen = 0;

        // ZIP_END = 0xff, 二进制 1111 1111
    if (p[0] != ZIP_END) {

        // 插入的位置不是压缩列表的尾部, 插入的情况 3, 4
        
        // 获取当前插入位置的 entry 的 previous_entry_length
        // 获取到的 prevlen 就等于插入 entry 的 previous_entry_length
        zip_decode_prevlen(p, prevlensize, prevlen);

    } else {
        
        // 插入的位置是压缩列表的尾部, 插入的情况 1,2

        // 获取到 zltail 指向的位置, zltail 一般指向的是最后一个 entry 的起始位置
        // 当没有 entry 在当前压缩列表, zltai 指向的位置就是 zllen, 值为 0xff
        char *ptail = getZlTailPos(zl);

        if (ptail[0] != ZIP_END) {

            // ptail 也就是 zltail 对应的位置的第一个字节不是 0xff, 表示插入的压缩列表有数据了, 情况 2
            // 获取 ptail 位置 entry 的字节长度, 也就是插入 entry 的 previous_entry_length
            prevlen = getPreEntryLength(zl, ptail);
        }
    }

    // 给予一个初始值, 避免警告, 同时给一个容易排查的值, 便于后面没有初始时排查
    long long value = 123456789;
    
    // 插入的 entry 的编码
    char encoding = 0;
    
    // 插入的 entry 的 content 需要的字节长度
    int reqlen = 0;

    // zipTryEncoding 的作用
    // 尝试将传入的 s 转为整数, 如果可以转换将转换后的值放到 value, 同时将其编码放到 encoding, 同时返回 true
    // 不能转换, 返回 false
    if (zipTryEncoding(s, slen, &value, &encoding)) {
        // 可以转换
        // 通过 encoding 获取当前的 int 需要的字节数, 也就是 content 的字节长度
        reqlen = zipIntSize(encoding);

    } else {
        // 不可以转换   
        // 那么字符串的长度是多少, 需要的字节数就是多少
        reqlen = slen;
    }

    // 加上上一个 entry 的长度
    // reqLen 的长度 = previous_entry_length + content
    reqlen += prevlen;

    // 加上当前的 encoding 需要的字节数
    // previous_entry_length + encoding + content, 这时候 reqLen 就是当前存储内容需要的字节数
    reqlen += getEncodingLength(encoding, slen);

    // 当前的位置已经有 entry 的情况
    // 插入的位置的当前的 entry, 将会变为插入 entry 的后一个 entry 
    // 那么当前的 entry 的 previous_entry_length 的字节数存储插入 entry 的字节长度 reqlen, 还需要多少个字节
    
    // 三种情况 4, -4, 0
    int nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;


    // 获取当前的压缩列表的字节长度
    int curlen = getZlbytes(zl);

    // 最新的压缩列表需要的在字节长度
    newlen = curlen + reqlen + nextdiff;

    // 按照新内存重新分配一个新的压缩列表, 会将旧的压缩列表的内容全部拷贝到新的压缩列表
    zl = ziplistResize(zl,newlen);

    // 更新 zlBytes 属性
    updateZlBytes(zl);
    // 更新 zlend 属性
    updateZlend(zl, newlen);

    // 插入的位置不是尾部, 需要迁移数据
    if (p[0] != ZIP_END) {

        // 空出一个 reqlen + nextdiff 的空间, 存放插入的 entry
        memmove(p+reqlen, p - nextdiff, curlen - offset - 1 + nextdiff);

        // 更新当前需要插入的位置的下一个 entry 的 previous_entry_length
        updateNextEntryPreviousEntryLength(p + reqlen, reqlen);

        // 更新 zlTail 属性
        updateZlTail(getZlTail() + reqlen + nextdiff);
    }

    // 当后一个 entry 的 previous_entry_length 的字节有变动, 进行连续更新的检查
    if (nextdiff != 0) {

        // 连续更新检查, 如果需要进行连续更新处理
        // 连续更新后面聊
        zl = ziplistCascadeUpdate(zl,p+reqlen);
    }

    // 更新当前 entry 的 previous_entry_length
    updateCurEntryPreviousEntryLength(p, prevlen);

    // 更新当前 entry 的 encoding
    updateCurEntryEncoding(p, encoding, slen);

    // 更新当前 entry 的 content
    if (zip2Str(encoding)) {
        // 字符串内容
        memcpy(p, s, slen);
    } else {
        // 整数内容
        zipSaveInteger(p, value, encoding);
    }

    // 更新 zlLen 属性
    updateZlLen(getZlLen() + 1);
    return zl;
}

从中可以看到整个新增的过程

  1. 如果插入的位置, 后面有 entry, 获取插入位置的 entry 的 previous_entry_length, 这个就是等一下插入 entry 的 previous_entry_length 值
  2. 如果插入的位置, 后面没有 entry, 需要获取到前一个 entry 的字节长度, 这个长度就是等一下插入 entry 的 previous_entry_length 值
  3. 获取当前插入的 entry 的编码 encoding
  4. 计算插入的 entry 需要的字节长度
  5. 计算插入的 entry 需要的字节长度 存入到插入位置的 entry 的 previous_entry_length, 还需要多少个字节 (0, 4, -4)
  6. 重新分配新的压缩列表, 更新 zlbytes, zlend
  7. 将新分配的压缩列表的要插入位置后面的内容, 往后移动, 空出需要插入的 entry 的空间
  8. 更新当前需要插入的位置的下一个 entry 的 previous_entry_length 为插入 entry 的字节长度
  9. 更新新的压缩列表的 zltail
  10. 当前需要插入的位置的下一个 entry 的 previous_entry_length 变动了, 进行连续更新的检查
  11. 逐步将需要插入的 entry 的 previous_entry_length, encoding, content 更新到新的压缩列表
  12. 更新新的压缩列表的 zllen

整个过程很复杂

  1. 涉及到插入 entry 的解码, 编码
  2. 新压缩列表的内存分配和旧数据的迁移
  3. 新的 entry 的编码
  4. 还有一个连续更新的问题

连锁更新

假设现在有一个列表 A(p_e_l=5) B(p_e_l=5) C(p_e_l=5), 这时候如果插入一个只需要 1 个字节的 entry, I(p_e_l=5)。
A(p_e_l=1) A 需要的字节数变小了, 假设刚好变为只需要 1 个字节,那么后面的 B 的 previous_entry_length 也需要变小,
导致 B 也变为只需要 1 个字节了, 导致 C 也需要变小, 从而引起了后面的连续更新

连锁更新会导致多次重新分配内存及数据复制,效率很低, 出现的情况在新增, 删除, 更新都有可能。
但是出现这种情况的概率是很低的, 所以 Redis 并没有为了避免连锁更新而采取措施。

上面的就是新增 entry 的过程, 基本可以从这个过程推导出更新删除的过程。


  目录