背景
本文从一个有序单链表的数据结构开始,讲解有序单链表的插入、查询过程,通过插入、查询过程来引出有序单链表对于"高效"插入、查询的问题,然后通过对有序单链表的不断的改造,逐渐形成一个新的数据结构-跳跃表,并推导出跳跃表的、插入、查询时间复杂度,然后引出我们的主题<>的实现,通过对其数据结构、高度算法、插入、查询、删除的逐行源码分析让大家充分的了解其底层的实现,最后再通过跳表跟哈希表、平衡树、Btree之间的对比来说明为什么Redis会选择跳跃表这种数据结构作为Sorted-Set的实现。希望通过本文让大家真正掌握跳表这种数据结构,掌握Redis的Sorted-Set底层实现的原理。
1. 单链表
1.1
单链表数据结构
单链表是一种链式存取的数据结构,用一组地址任意的存储单元存放线性表中的数据元素,链表中的数据是以结点来表示的,每个结点的构成:元素(数据元素的映像) + 指针(指示后继元素存储位置),元素就是存储数据的存储单元,指针就是连接每个结点的地址数据。数据结构如下:
typedef struct node {
int data;//代表数据域
struct LinkNode *next;//代表指针域,指向直接后继元素
}LinkNode;
如图1.1展示了一个单链表:
了解了单链表的数据结构,下面我们通过研究单链表的插入、查询引出其时间复杂度,最后再引出具体单链表的问题。我们先看一下插入过程。
1.2
单链表插入过程
我们先生成一个10、20、30这样的链表:
LinkNode* InitLinkNode(){
//创建一个头结点
LinkNode *p = (LinkNode *)malloc(sizeof(LinkNode));
//声明一个指针指向头结点,用于遍历链表
LinkNode *temp = p;
//生成链表
for (int i = 1; i < 40; i += 10) {
LinkNode *node = (LinkNode *)malloc(sizeof(LinkNode));
node->data = i;
a->next = NULL;
temp->next = a;
temp = temp->next;
}
return p;
}
生成的链表如图1.2:
然后我们插入一个数字25,我看一下过程是什么样的?
LinkNode* insert(LinkNode *p, int data){
LinkNode *temp = p;//创建临时结点temp
while(temp != NULL) {
if (temp->next == NULL || temp->next->data > data)
{
LinkNode *tempNode = (LinkNode *)malloc(sizeof(LinkNode));
tempNode->data = data;
tempNode->next = temp->next;
temp->next = tempNode;
break;
}
temp = temp->next;
}
return temp;
}
第一次对比过程,如图1.3,跟链表的第一个结点最对比,结果10小于25,往后进行对比:
第二次对比过程,如图1.4,跟链表的第二个点做对比,结果20也小于25,继续往后对比:
第三次对比过程,如图1.5,跟链表第三个结点做对比,结果30大于25,那我们就找到了插入的位置在20和30之间:
最后我们把数字25插入到20和30之间,我们看一下最后的链表,如图1.6,我们插入过程的时间复杂度是O(N)。
1.3
单链表的查询过程
同样的,我们看想查找数字25,我看一下过程是什么样的?
int Find(LinkNode *p,int data){
LinkNode *temp = p;
while (temp != NULL) {
if (temp->next != NULL && temp->next->data == data) {
return 1;
}
temp = temp->next;
}
return -1;
}
查询第一次对比过程,如图1.7,25大于第一个结点10,往后对比:
查询第二次对比过程,如图1.8,25大于第二个结点20,继续往后对比:
查询第三次对比过程,如图1.9,25等于第三个结点25,返回。
我们通过程序得知,我们的查询的一个时间复杂度也是O(N)。
1.4
单链表的问题
我们通过分析单链表的插入、查询的过程得知,它们的时间复杂度都是O(N),那假如我们想降低时间复杂度呢?很明显单链表已经不能满足我们的需求了,那我们要怎么做呢?
2. 跳跃表
2.1
跳跃表的形成过程
通过上文我们认识到了一个单链表的结构,并通过分析插入、查询得知他们的时间复杂度都是O(N),如果我们想降低时间复杂度,我们需要怎么做呢?
第一种:转换为哈希表,第二种:转换成平衡树,第三种:转换为B-树,但是这些数据结构都有其不合适的地方(后续3.9章节会有跟各个数据结构的对比情况),如果我们就是希望以链表为基础的数据结构,有其他的合适的数据结构吗?
答案是有的,首先我们回想一下我们的书的目录的结构,下面是(图2.0)是时间简史的目录,假如我们想看黑洞这章节,我们能够很容易知道从267页开始就是讲的黑洞,我们也能够很容易的知道第302页就是第八章宇宙的起源和命运的开始,这其实是我们提前把章节打上了一个索引。
那对于链表呢?我们也可以给它加一下索引来提高我们的查询效率。假如我们现在有一个有序链表,其元素为1、3、5、7、10、12、15、19,我们可以在链表的基础上,每两个结点提取一个结点作为索引,在通过索引层的结点去原始链表去找元素,如图2.1**:
这样我们找一个数字12,没有索引的流程是1 -> 3 -> 5 -> 7 -> 10 -> 12返回,现在的流程是从一级索引开始查:1 -> 5 -> 10 -> 12,查询次数从6次减少到了4次,甚至我们还可以再基于一级索引每两个结点再提取一个结点作为二级索引,如图2.1:
这样我们找一个数字12,流程是: 1 -> 10 -> 12,查询次数从4次减少到了3次,不过我们的数据量现在只有 8 个元素,假如说我们有几千个元素那查询效果就非常明显了。像这种在原来有序链表的基础上增加了多级索引来提高查询效率数据结构,我们称之为跳表,也叫跳跃表,那不知道大家有没有一个疑问,我们怎么去确定我们有多少级的索引(后见章节3.2)?我们跳表的查询,插入时间复杂度是怎么样的呢?下面我们先推导一下跳表的时间复杂度。
2.2
跳跃表的时间复杂度推导
假如我们要找元素12,我们来观察一下我们的查询路径,如图2.3所示,在二级索引和元素1、10对比没找到,在一级索引又进行了元素15对比,还是没有找到,最后再原始链表层又和12进行了对比,找到了元素12。
假如我们的原始链表有N个结点,假如没两个结点我们提取一个结点,可以得出以下结论:
1、那么一级索引结点个数N / 2,那么假如我们的索引有M阶,第M阶索引就是:N / 2^M
2、假如第M阶的结点个数是 2,那么可以得出 N / 2^m = 2,得出时间复杂度为M = log(N) - 1
3、假如我们每次遍历 K个结点,那么得出时间复杂度是O(K * log(N))
4、K一般是个常数,所以最后的时间复杂度是O(log(N))
2.3
跳跃表的插入过程
我们来看一下简单的插入过程,我们插入一个元素11,首先我们先执行查询过程,第一步,跟二级索引的1做比较:如图2.4:
发现1结点小于11新插入的结点,则完后对比,再跟10作对比,发现10结点也小于 11结点,再往后对比,发现10后面是NULL,如图2.5:
则下降一级索引,然后再往后跟15比较,发现15比11大,则再下降一级索引,然后再跟12做对比,发现12也大于11,那我们就找到了我们的插入位置,就是在10、12之间,如图2.6和图2.7:
跟12做对比:如图2.7:
最后我们找到插入位置在元素10和元素12之间,然后就可以进行插入动作了,插入完成后,整个跳表如下图2.8所示:
我们也推导一下插入过程的时间复杂度,通过插入过程我们知道,想要插入一个元素,我们要先找到插入的位置,这个过程是一个查询过程,经过上一节我们的查询一个结点的时间复杂度是O(log(N)),那我们插入一个结点的时间复杂度也是O(log(N))。
通过本小节,我们了解到跳表的形成过程,时间复杂度的推动,插入、查询过程,下面我们通过研究Redis的有序集合的底层存储,来探究一下,跳表是怎么在Redis的应用的。
3. Redis跳跃表底层存储
3.1
Sorted-Set底层存储数据结构
3.1.1 zskiplistNode结构
zskiplistNode是跳表的结点结构,数据结构如下:
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
结构属性介绍:
● ele:存的是集合的值
● score:存的一个double类型的排序字段,通过这个字段来进行集合值的排序
● backward:链表指针,指向当天元素的前一个元素
● Level[]:跳表的高度
• forward:指向下一个元素
• span:跨度,当前结点到下一个结点中间元素的个数
3.1.2 zskiplist结构
zskiplist就是跳表,它用来管理整个跳表的高度、长度、头结点、尾结点,其数据结构如下:
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
结构属性介绍:
● header:指向跳表的头结点
● tail:指向跳表的尾结点
● length:跳表的高度
● level:当前调整的高度
这两个结构就组成了我们Redis中的跳表的数据结构,下面我们先通过一张Redis的Sorted-Set图(图3.1)来分别解释一下每个属性具体的含义:
3.2
Sorted-Set高度算法探究
根据第二节跳表的形成我们了解到可以提取结点来做多层索引来提高查询的效率,我们怎么来确定跳表的高度呢?对于结点不插入和删除,我们又怎么来实时调整跳表的高度呢?我们来看一下Redis是怎么做的。
//跳表最高层级为64
//跳表的随机因子为 0.25
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
根据代码我们先回答一下第一个问题:怎么来确定跳表的高度?回答:随机生成,对没有看错,就是随机生成,但是为了保证查询的性能,我们尽量让找出一套算法能够达到一个O(log(N))的时间复杂度,我们先看一下这个函数,random()会生成一个32位的随机数,跟 0xFFFF做与操作,其实就是把高16位清零,得到一个处于0x0000-0xFFFF的数字,然后每次循环判断生产的这个数字是否小于自己的1/4,如果成立则高度+1,循环结束在判断一下生产的level是否小于 64层,小于的话,就用生成的层数,否则用64层。
我们看一下推导过程,我们定义随机因子为p:
● 结点层数至少为1,而大于1的结点层数,满足一个概率分布。
● 结点层数恰好等于1的概率为1 - p
● 结点层数大于等于2的改为为p,而结点层数等于2的概率为p * (1 - p)
● 结点层数大于等于3的概率为p p,而结点层数等于3的概率为p p * (1-p)
● 结点层数大于等于4的概率为p p p,而结点层数等于4的概率为p p p * (1-p)
● 结点层数大于等于5的概率为p p p p,而结点层数等于5的概率为p p p p * (1-p)
..........
那么,一个结点的平均层数的计算公式如下:
Redis定义的随机因子p是 1/4,那通过这个公式能够得平均高度是:1.33。
3.3
Sorted-Set头结点
初始化源码分析
源代码:创建一个skiplist
/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
//先申请一块内存
zsl = zmalloc(sizeof(*zsl));
//默认给跳表的高度赋值为1
zsl->level = 1;
//默认给跳表的长度赋值为0
zsl->length = 0;
//创建一个头结点,然后跳表指向头结点,可参考函数zslCreateNode
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//这里是循环的给头结点的每个元素赋值
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
//跳表尾部指向为NULL
zsl->tail = NULL;
return zsl;
}
创建一个skiplistNode结点:
//创建一个skiplistNode结点,有三个参数:level: 层级,score: 排序key,sds: 跳表值
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
//同样先通过传过来的高度申请结点的内存
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
//给跳表结点score赋值
zn->score = score;
//给跳表结点ele赋值
zn->ele = ele;
return zn;
}
我们通过图3.2来了解一下,头结点初始化后结构是什么样的。
3.4
Sorted-Set插入元素源码分析
3.4.1 插入函数解析
下面我们来通过分析一个元素的插入流程来详细的了解一下源码,我们先看一下整个函数,然后我们会逐步的拆解每一次循环的含义。
/* Insert a new node in the skiplist. Assumes the element does not already
* exist (up to the caller to enforce that). The skiplist takes ownership
* of the passed SDS string 'ele'. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
//赋值跳表的头结点给变量x
x = zsl->header;
//从最高层开始查找我们合适的位置
for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
//生产随机的高度
level = zslRandomLevel();
//如果新节点的层数比表中其他节点的层数都要大
//那么初始化表头节点中未使用的层,并将它们记录到 update 数组中
//将来也指向新节点
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
//创建一个新的结点
x = zslCreateNode(level,score,ele);
//插入新结点的过程
for (i = 0; i < level; i++) {
//设置新节点的 forward 指针
x->level[i].forward = update[i]->level[i].forward;
//将旧结点的各个节点的 forward 指针指向新节点
update[i]->level[i].forward = x;
//计算新结点的跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//更新结点插入之后,旧结点的跨度
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
//调整新结点的backward
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
//更新跳表的长度
zsl->length++;
return x;
}
通过对整体函数的解析,我们发现插入一个元素基本上分为4个步骤:
1、查到插入的位置(查询过程);
2、生产新结点高度并调整跳表的高度;
3、插入元素;
4、调整backward并更新跳表的长度;
下面我们统一逐步分析每一次循环的代码来了解一下整个插入过程。
3.4.2 查到插入的位置
我们假如跳表现在已经有元素1、11、23三个元素了,我们想插入一个新元素21,如图3.3所示:
我们先看一下找到插入位置的源代码:
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
查找节点(score=21,level=2)的插入位置,逻辑如下:
1)第一次for循环,i=1。x现在为跳跃表的头节点;
2)现在i的值与zsl->level-1相等,所以rank[1]的值为0;
3)header->level[1].forward存在,并且header->level[1].forward->score(1)小于要插入值的score,所以while循环可以进入,rank[1]=1,x赋值为第一个节点;
4)第一个节点的第1层的forward指向NULL,所以while循环不会再进入,经过第一次for循环,rank[1]=1,x和update[1]都为第一个节点(score=1)。
5)for循环进入第二次,i=0。x为跳跃表第一个节点(score=1) 6)现在i的值与zsl->level-1不相等,所以rank[0]等于rank[1]的值赋值为1; 6)x->level[0]->forward存在,并且x->level[0].foreard->score(11)小于要插入的score,所以while循环可以进入,rank[0]=2,x为第二个节点(score=11)。
7)x->level[0]->forward存在,并且x->level[0].foreard->score(23)大于要插入的score,所以while不会再进入,经过第二次for循环,rank[0]=2,x和update[0]都为第二个节点(score=11),如图3.5所示:
这个时候我们就找到插入位置在元素11和元素23中间。
3.4.3 调整跳表的高度
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
假如我们生成的高度是3,这个时候我们需要调整一下整个跳表的高度,此时i的值是2,level的值是3,所以我们只能进入一次循环,此时rank[2] = 0,update[2]指向头结点,update[2]->level[2].span = 3,zsl->level = 3,调整完高度,如图3.6所示:
3.4.4 插入元素
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
level的值为3,所以for循环可以执行三遍,插入过程如下:
for循环第一遍:
1)x的level[0]的forward为update[0]的level[0]的forward节点,即x->level[0].forward为score=23的节点;
2)update[0]的level[0]的下个节点为新插入的节点;
3)rank[0]的值为0,update[0]->level[0].span=1,所以x->level[0].span=1、update[0]->level[0].span=1。
我们再看一下经过第一次for循环,我们的结构,如图3.7所示:
for循环第二遍:
1)x的level[1]的forward为update[1]的level[1]的forward节点,即x->level[1].forward为NULL;
2)update[1]的level[1]的下个节点为新插入的节点;
3)rank[1]的值为1,update[1]->level[1].span等于2,所以x->level[1].span=1
4)update[1]->level[1].span=2。
我们再看一下经过第二次for循环,我们的结构,如图3.8所示:
for循环第三遍:
1)x的level[2]的forward为update[2]的level[2]的forward节点,即x->level[2].forward为NULL;
2)update[2]的level[2]的下个节点为新插入的节点;
3)rank[2]的值为2,因为update[2]->level[2].span等于跳跃表的总长度3,所以x->level[2].span=1;
4)update[2]->level[2].span=3。
我们再看一下经过第三次for循环,我们的结构,如图3.9所示:
至此,我们的插入过程完成。
3.4.5 调整backward并更新跳表的长度
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
x->backward的值等于,判断update[0] == zsl->header是否相等,如果想当代表是个空跳表,则赋值为NULL,如果不是的话,就把update[0]的赋给它,也就是说指向score = 11的backward。
第二步判断是否是尾结点,如果不是则调整对应的backward,如果是就更新跳表的尾部指向。
最后再更新跳表的length = 4,调整完的结构如图3.10所示:
好了,我们的插入元素整个源码分析到此告一段落,本小节通过分析插入元素21的过程带大家分析每一句源码的涵义,希望对大家了解插入过程有所帮助。
3.5
Sorted-Set查询
一个元素源码分析
下面通过zslGetRank函数来看一下查询一个元素过程,下面是具体的源代码:
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
//先把头结点保存起来
x = zsl->header;
//从跳表的最高层开始循环查找
for (i = zsl->level-1; i >= 0; i--) {
//先从最高层开始查找
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
//如果查找到相同的元素,则直接返回,rank
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
//否则返回0
return 0;
}
我们可以通过源代码来看,其实查找一个元素跟我们插入过程中查找插入位置的代码基本一致,具体查询过程此小节就不详细展开了,(可以查看插入过程查找插入位置的详细讲解)。
3.6
Sorted-Set查询
多个元素源码分析
3.6.1 整体源码分析
查询多个元素我们通过zrange和zrevrange的命令的实现来进行源码分析:
void zrangeGenericCommand(client *c, int reverse) {
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
long llen;
long rangelen;
//中间省略一下无用的代码
.....
//获取跳表当前的长度,请参数小节3.9.1
llen = zsetLength(zobj);
//初始化一下开始游标和结束游标
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
//异常判断
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
//得到获取元素的个数
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply */
addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
//如果是压缩列表,这次我们不关心压测列表,可以先跳过这段代码
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (reverse)
eptr = ziplistIndex(zl,-2-(2*start));
else
eptr = ziplistIndex(zl,2*start);
serverAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
while (rangelen--) {
serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
addReplyBulkLongLong(c,vlong);
else
addReplyBulkCBuffer(c,vstr,vlen);
if (withscores)
addReplyDouble(c,zzlGetScore(sptr));
if (reverse)
zzlPrev(zl,&eptr,&sptr);
else
zzlNext(zl,&eptr,&sptr);
}
//如果是跳表
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele;
/* Check if starting point is trivial, before doing log(N) lookup. */
//如果逆序,则从后往前取
if (reverse) {
ln = zsl->tail;
//如果start不是从0开始,则先要找到start的那个起点的结点
if (start > 0)
//通过rank获取该结点,请参考3.9.2
ln = zslGetElementByRank(zsl,llen-start);
} else {
//从第一个结点开始
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl,start+1);
}
//循环这次去除取数据的个数,直到元素取完
while(rangelen--) {
//断言
serverAssertWithInfo(c,zobj,ln != NULL);
ele = ln->ele;
//把数据写到对client的缓冲区中,批量返回请参考3.9.3
addReplyBulkCBuffer(c,ele,sdslen(ele));
//假如带了withscores参数
if (withscores)
//把结点的score(double类型)也同时写到缓冲区,请参考3.9.4
addReplyDouble(c,ln->score);
//下一个结点,如果从头那,则用forward找到下一个结点,如果是从尾部拿,则用backward找打前一个结点
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}
这个过程就非常简单了,我们通过两张图来分别描述一下从头取元素,从尾取元素的过程。
3.6.2 Zrange查询过程
命令:zrange mytest 0 -1,从头部取元素的过程,1 -> 11 -> 21 -> 23 结束。如图3.12**:
3.6.3 Zrevrange查询过程
命令:zrevrange mytest 0 -1,从尾部取元素的过程,23 -> 21 -> 11 -> 1 结束。如图3.13**:
3.7
Sorted-Set删除元素源码分析
3.7.1 删除结点整体源码分析
本小节通过删除一个元素的过程来分析一下源代码,看具体过程是什么样的,首先我们还是来介绍一下删除函数的源代码:
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
//同样先从最高层找到要删除元素
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
//设置span和forward
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
//请参数3.9.5
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
删除结点的过程我们通过源代码能够得知分为两个步骤:
1、查找要删除的结点;
2、设置span和forward;
假如我们想删除score = 21,ele = hij的的过程是什么样的?请接着往下看:
3.7.2 查找删除结点源码分析
先看一下具体代码:
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
//同样先从最高层找到要删除元素
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
中间每次循环的查询过程省略了(如果想了解请查看3.4小节或者3.6小节),通过循环赋值,最后得出update[2] 指向header、update[1] 指向score = 1的结点、update[0]指向score = 11的结点,如图3.14所示:
3.7.3 设置span和forward
先看一下具体的源代码:
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
//更新每个update结点的span和forward
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
//调整backward
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
我们看一下更新每个update结点后的span和forward之后,跳的结构是什么样的?如图3.15所示:
最后再看一下调整backward之后,跳的结构是什么样的?如图3.16所示:
至此,我们的删除结点动作完成。
3.8
Redis为什么要选择跳表
通过对第二章和第三章节的了解,我们知道Redis的有序集合使用跳表这样的数据结构,那为什么Redis要使用跳表呢?有没有更好的选择呢?
3.8.1 跳表 VS 哈希表
哈希表也叫散列表,可以根据一个key找到具体的一个value,它通过一定的算法把key映射到不同的Hash桶,在Hash冲突不是很大的时间它的时间复杂度是O(1)。
哈希表可以很容易的通过一个key,来找到相对应的value,那为什么不用哈希表呢?
1、哈希表无序,我们的想实现一个有序的数据结构,这个不满足需求
2、没办法范围查询
基于有序和范围查询的特性跳表更具优势。
3.8.2 跳表 VS 平衡树
平衡树是两个子树高度差绝对值不超过 1,并且左右两个子树也都是平衡二叉树这样的数据结构,为了保持树的平衡需要在插入、删除的时候做左旋、右旋的操作来让树保持平衡,红黑树更复杂(具体红黑树的结构这里不做详细赘述)
时间复杂度对比:
数据结构 | 查询 | 插入 | 删除 |
跳表 | O(log(N)) | O(log(N)) | O(log(N)) |
平衡树 | O(log(N)) | O(log(N)) | O(log(N)) |
我们通过对比发现,几乎平衡树在查询、插入、删除的时间复杂度跟跳表一模一样,那我们为啥不选择平衡树呢?
1、实现难易,跳表实现比较简单,平衡树实现很复杂
2、范围查询方便,跳表只需要通过找到范围开始结点,然后顺着前后把元素找齐就行了,平衡树如果范围查询,则很困难,我们先要找到指定范围的小值之后,再通过中序遍历继续寻找其他不超过大值的结点。
针对数据结构的实现、范围查询这两点来看,跳表更具优势。
3.8.3 跳表 VS Btree
Btree是为了磁盘存储设置设计的一种多叉平衡查找树,像我们了解的数据库Mysql的引擎Myisam就是B树,Innodb是B+树(图3.17),我们其实很容易能够想到我们的Mysql不仅支持单键查询,也支持范围查询。那我们是不是可以用B+树来实现呢?
其实是为了更省空间,Redis是内存型软件,不是基于硬盘索引这点也要考虑,Btree的结点出了数据还有左右两个指针,而跳表只有一个一个向后的指针,占用的空间更少。所以针对空间跳表更具优势。
3.8.4 作者的选择的原因
1) They are not very memory intensive. It's up to you basically. Changing parameters about the probability of a node to have a given number of levels will make then less memory intensive than btrees.(更省内存)
2) A sorted set is often target of many ZRANGE or ZREVRANGE operations, that is, traversing the skip list as a linked list. With this operation the cache locality of skip lists is at least as good as with other kind of balanced trees.(范围查询比平衡树方便)
3) They are simpler to implement, debug, and so forth. For instance thanks to the skip list simplicity I received a patch (already in Redis master) with augmented skip lists implementing ZRANK in O(log(N)). It required little changes to the code.About the Append Only durability & speed, I don't think it is a good idea to optimize Redis at cost of more code and more complexity for a use case that IMHO should be rare for the Redis target (fsync() at every command). Almost no one is using this feature even with ACID SQL databases, as the performance hint is big anyway.About threads: our experience shows that Redis is mostly I/O bound. I'm using threads to serve things from Virtual Memory. The long term solution to exploit all the cores, assuming your link is so fast that you can saturate a single core, is running multiple instances of Redis (no locks, almost fully scalable linearly with number of cores), and using the "Redis Cluster" solution that I plan to develop in the future.(更简单)
根据这三个小节的论述相信大家已经对Redis选择跳表而不是哈希表、平衡树、Btree的原因已经有了较深的了解了吧。
3.9
公共函数源码解析
3.9.1 获取跳表的长度
unsigned long zsetLength(const robj *zobj) {
unsigned long length = 0;
//如果是压缩列表
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
length = zzlLength(zobj->ptr);
//如果是跳表
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
//直接返回结构跳表的长度
length = ((const zset*)zobj->ptr)->zsl->length;
} else {
serverPanic("Unknown sorted set encoding");
}
return length;
}
3.9.2 按照Rank查到该结点
/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
//也是同样的从最高层开始寻找
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
//如果找到rank相等的,返回该结点
if (traversed == rank) {
return x;
}
}
return NULL;
}
3.9.3 批量增加写数据到缓冲区
/* Add a C buffer as bulk reply */
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
addReplyLongLongWithPrefix(c,len,'$');
addReplyString(c,p,len);
addReply(c,shared.crlf);
}
3.9.4 写double类型的数据到缓冲区
/* Add a double as a bulk reply */
void addReplyDouble(client *c, double d) {
char dbuf[128], sbuf[128];
int dlen, slen;
if (isinf(d)) {
/* Libc in odd systems (Hi Solaris!) will format infinite in a
* different way, so better to handle it in an explicit way. */
addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
} else {
dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
addReplyString(c,sbuf,slen);
}
}
3.9.5 释放跳表结点
void zslFreeNode(zskiplistNode *node) {
sdsfree(node->ele);
zfree(node);
}
4. 结语、参考资料
4.1
结语
本文先从链表开始,描述一个链表插入和查询的过程,推导出来链表的时间复杂度是O(N),然后从设置问题怎么样提高时间复杂度呢?通过《时间简史》的目录受到启发,可以在链表提取结点形成多级索引,来提高我们的查询和插入的效率,到最后推出我们真正要讲的sorted-set的源码实现,通过对sorted-set的底层数据结构,插入过程,查询单个元素过程,查询多个元素过程,删除一个sorted-set的结点,以及最后一小节跟红黑树之间的对比让大家能够对Redis的有序集合底层有个彻底的了解,希望大家能够有所收获,由于笔者水平有限,文章中难免会有一些错误或者不准确的地方,望各位大佬指正批评。
4.2
参考内容
图书:
[1] 作者.陈雷 书名《Redis 5设计与源码分析》 M.版次:2019年8月第一版第一次印刷。出版地: 北京市西城区。出版者:机械工业出版社。出版年:2019年
官方源代码:
https://github.com/redis/redis/blob/unstable/src/server.h
https://github.com/redis/redis/blob/unstable/src/server.c
https://github.com/redis/redis/blob/unstable/src/t_set.c
https://github.com/redis/redis/blob/unstable/src/ziplist.c
https://github.com/redis/redis/blob/unstable/src/ziplist.h
往期推荐
+
+
+
+
致力于互联网教育技术的创新和推广
扫码关注我们 / @学而思网校技术团队
微信扫一扫
关注该公众号