HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 学习 Redis

    • Redis 手写实现学习指南
    • 快速开始
    • Redis 架构总览与线程模型
    • RESP 协议与网络通信
    • 事件循环与 I/O 多路复用
    • 底层数据结构设计
    • 字符串:SDS 简单动态字符串
    • 哈希表与字典实现
    • 列表与跳表实现
    • 有序集合:Sorted Set 实现
    • 内存管理与对象系统
    • RDB 持久化机制
    • AOF 持久化机制
    • 混合持久化策略
    • 分布式锁实现
    • 缓存一致性策略
    • 主从复制机制
    • 哨兵模式实现
    • 内存优化与 GC 调优

列表与跳表实现

学习目标

  • 理解有序数据结构的性能瓶颈与解决方案
  • 掌握跳表的设计思想与概率平衡原理
  • 对比分析跳表与平衡树的优劣
  • 理解 Redis 为什么选择跳表实现 Sorted Set
  • 实现完整的跳表数据结构

一、问题背景:我们需要什么样的数据结构?

1.1 业务场景分析

在实际应用中,我们经常需要维护一个有序的数据集合,并支持以下操作:

操作类型业务场景示例性能要求
插入元素游戏排行榜新增玩家要快 O(log n)
删除元素移除过期数据要快 O(log n)
查找元素检查用户排名要快 O(log n)
范围查询获取前100名要快 O(log n + k)
顺序遍历按分数遍历要快 O(n)

典型应用场景:

// 1. 游戏排行榜:按分数排序的玩家列表
// ZADD leaderboard 9500 "player1"
// ZRANGE leaderboard 0 99 WITHSCORES  // 查询前100名

// 2. 延迟队列:按时间戳排序的任务队列
// ZADD delay_queue 1699999999 "task1"
// ZRANGEBYSCORE delay_queue 0 <current_time>  // 获取到期任务

// 3. 实时推荐:按相关度排序的内容
// ZADD recommend 0.95 "article1"
// ZREVRANGE recommend 0 9  // 获取最相关的10条

1.2 性能目标

对于有序数据结构,我们的理想目标是:

  • 插入/删除/查找:O(log n) 时间复杂度
  • 范围查询:O(log n + k) 时间复杂度 (k为结果数量)
  • 空间复杂度:O(n) 且常数因子不能太大
  • 实现复杂度:代码简单,易于维护和调试
  • 并发友好:支持高效的并发操作

二、方案对比:如何实现有序数据结构?

2.1 方案1:数组 (Array)

结构示意:

索引: [0]  [1]  [2]  [3]  [4]  [5]
元素: [10] [20] [30] [40] [50] [60]

实现思路:

type SortedArray struct {
    data []int  // 有序数组
}

// 查找:二分查找
func (sa *SortedArray) Search(value int) int {
    left, right := 0, len(sa.data)-1
    for left <= right {
        mid := left + (right-left)/2
        if sa.data[mid] == value {
            return mid
        } else if sa.data[mid] < value {
            left = mid + 1
        } else {
            right = mid - 1
        }
    }
    return -1
}

// 插入:需要移动元素
func (sa *SortedArray) Insert(value int) {
    pos := sa.findInsertPosition(value)
    // 将 pos 后面的所有元素后移一位
    sa.data = append(sa.data[:pos], append([]int{value}, sa.data[pos:]...)...)
}

性能分析:

操作时间复杂度说明
查找O(log n)二分查找效率高
插入O(n)需要移动所有后续元素
删除O(n)需要移动所有后续元素
范围查询O(log n + k)找到起点后顺序读取
空间O(n)连续存储,无额外指针

优缺点:

  • 查找快:二分查找 O(log n)
  • 空间效率高:无指针开销,缓存友好
  • 插入/删除慢:需要移动大量元素
  • 不适合频繁写入:写多读少场景性能差

2.2 方案2:普通链表 (Linked List)

结构示意:

head -> [10|next] -> [20|next] -> [30|next] -> [40|next] -> [50|next] -> nil

实现思路:

type ListNode struct {
    value int
    next  *ListNode
}

type SortedLinkedList struct {
    head *ListNode
}

// 查找:顺序遍历
func (sll *SortedLinkedList) Search(value int) *ListNode {
    current := sll.head
    for current != nil {
        if current.value == value {
            return current
        }
        if current.value > value {
            return nil  // 已超过,不存在
        }
        current = current.next
    }
    return nil
}

// 插入:找到位置后 O(1) 插入
func (sll *SortedLinkedList) Insert(value int) {
    newNode := &ListNode{value: value}

    if sll.head == nil || sll.head.value >= value {
        newNode.next = sll.head
        sll.head = newNode
        return
    }

    current := sll.head
    for current.next != nil && current.next.value < value {
        current = current.next
    }

    newNode.next = current.next
    current.next = newNode
}

性能分析:

操作时间复杂度说明
查找O(n)必须顺序遍历
插入O(n)查找位置需要 O(n)
删除O(n)查找位置需要 O(n)
范围查询O(n + k)查找起点需要 O(n)
空间⚠️ O(n)每个节点额外1个指针

优缺点:

  • 插入/删除操作本身快:已知位置时 O(1)
  • 动态扩展:无需预分配空间
  • 查找慢:必须顺序遍历
  • 缓存不友好:节点分散在内存中

关键问题:能否给链表加索引来加速查找?


2.3 方案3:平衡二叉搜索树 (Balanced BST)

结构示意 (AVL树):

        30
       /  \
     20    40
    /  \     \
  10   25    50

实现思路:

type TreeNode struct {
    value  int
    left   *TreeNode
    right  *TreeNode
    height int  // AVL树需要维护高度
}

type AVLTree struct {
    root *TreeNode
}

// 查找:二分查找思想
func (avl *AVLTree) Search(value int) *TreeNode {
    current := avl.root
    for current != nil {
        if current.value == value {
            return current
        } else if value < current.value {
            current = current.left
        } else {
            current = current.right
        }
    }
    return nil
}

// 插入:需要维护平衡 (旋转操作)
func (avl *AVLTree) Insert(value int) {
    avl.root = avl.insertNode(avl.root, value)
}

func (avl *AVLTree) insertNode(node *TreeNode, value int) *TreeNode {
    if node == nil {
        return &TreeNode{value: value, height: 1}
    }

    if value < node.value {
        node.left = avl.insertNode(node.left, value)
    } else if value > node.value {
        node.right = avl.insertNode(node.right, value)
    } else {
        return node  // 已存在
    }

    // 更新高度
    node.height = 1 + max(avl.getHeight(node.left), avl.getHeight(node.right))

    // 计算平衡因子
    balance := avl.getBalance(node)

    // 左左情况:右旋
    if balance > 1 && value < node.left.value {
        return avl.rightRotate(node)
    }

    // 右右情况:左旋
    if balance < -1 && value > node.right.value {
        return avl.leftRotate(node)
    }

    // 左右情况:先左旋后右旋
    if balance > 1 && value > node.left.value {
        node.left = avl.leftRotate(node.left)
        return avl.rightRotate(node)
    }

    // 右左情况:先右旋后左旋
    if balance < -1 && value < node.right.value {
        node.right = avl.rightRotate(node.right)
        return avl.leftRotate(node)
    }

    return node
}

// 旋转操作 (复杂!)
func (avl *AVLTree) rightRotate(y *TreeNode) *TreeNode {
    x := y.left
    T2 := x.right

    x.right = y
    y.left = T2

    y.height = max(avl.getHeight(y.left), avl.getHeight(y.right)) + 1
    x.height = max(avl.getHeight(x.left), avl.getHeight(x.right)) + 1

    return x
}

func (avl *AVLTree) leftRotate(x *TreeNode) *TreeNode {
    y := x.right
    T2 := y.left

    y.left = x
    x.right = T2

    x.height = max(avl.getHeight(x.left), avl.getHeight(x.right)) + 1
    y.height = max(avl.getHeight(y.left), avl.getHeight(y.right)) + 1

    return y
}

性能分析:

操作时间复杂度说明
查找O(log n)二叉搜索树性质
插入O(log n)需要旋转操作维护平衡
删除O(log n)需要旋转操作维护平衡
范围查询O(log n + k)中序遍历
空间⚠️ O(n)每个节点2-3个指针

优缺点:

  • 所有操作都是 O(log n):性能稳定
  • 理论完美:满足所有性能要求
  • 实现复杂:旋转逻辑复杂,易出错
  • 维护成本高:插入/删除时需要多次旋转
  • 并发困难:树形结构的并发控制很复杂
  • 内存跳跃:左右子树节点可能相距很远,缓存不友好

关键问题:有没有更简单但性能相近的方案?


2.4 方案4:跳表 (Skip List)

核心思想:给链表建立多级索引

第1步:从单链表开始

原始链表 (查找40需要遍历4次):
head -> [10] -> [20] -> [30] -> [40] -> [50] -> [60] -> nil
        ①       ②       ③       ④

第2步:建立一级索引 (每隔2个元素建一个索引)

一级索引:
head -> [10] -------> [30] -------> [50] -> nil
         |             |             |
原始链表:
head -> [10] -> [20] -> [30] -> [40] -> [50] -> [60] -> nil

查找40: 索引层 10->30->50(太大) -> 下降到原始层 -> 30->40 (找到!)
步数: 3次 (减少了1次)

第3步:建立二级索引 (进一步加速)

二级索引:
head -> [10] ---------------> [30] ---------------> [50] -> nil
         |                     |                     |
一级索引:
head -> [10] -------> [20] -------> [30] -------> [40] -------> [50] -> nil
         |             |             |             |             |
原始链表:
head -> [10] -> [15] -> [20] -> [25] -> [30] -> [35] -> [40] -> [45] -> [50] -> [60] -> nil

查找40的过程:
1. 从二级索引: 10 -> 30 -> 50(太大,下降)
2. 从一级索引: 30 -> 40 (找到!)
步数: 仅3次! (原本需要7次)

完整跳表结构示意:

Level 2:  head ---------> [20] -----------------> [50] -> nil
                           |                       |
Level 1:  head ---------> [20] -------> [40] ----> [50] -> nil
                           |             |          |
Level 0:  head -> [10] -> [20] -> [30] -> [40] -> [50] -> [60] -> nil
           ↑       ↑       ↑       ↑       ↑       ↑       ↑
         原始数据层 (所有元素都在这一层)

查找算法演示 (查找值30):

1. 从最高层(Level 2)开始: head -> 20 -> 50 (50>30, 停止, 下降到Level 1)
2. 在Level 1: 20 -> 40 (40>30, 停止, 下降到Level 0)
3. 在Level 0: 20 -> 30 (找到!)

总共比较次数: 4次
普通链表需要: 3次 (这个例子中跳表不占优势,但数据量大时差异明显)

性能分析:

操作时间复杂度说明
查找O(log n)期望复杂度,类似二分查找
插入O(log n)查找位置 + O(1)插入
删除O(log n)查找位置 + O(1)删除
范围查询O(log n + k)找到起点后顺序遍历
空间⚠️ O(n)期望额外 O(n) 空间用于索引

优缺点:

  • 实现简单:本质是链表,不需要复杂的旋转操作
  • 性能优秀:期望 O(log n),接近平衡树
  • 并发友好:链表结构容易实现无锁并发
  • 范围查询高效:底层有序链表天然支持
  • 内存局部性好:同层节点通过指针连接,顺序访问时缓存友好
  • ⚠️ 概率性能:最坏情况 O(n),但概率极低
  • ⚠️ 空间开销:需要额外索引层 (约2倍空间)

2.5 方案对比总结

数据结构查找插入删除范围查询实现复杂度并发友好性空间开销适用场景
有序数组O(log n)O(n)O(n)O(log n+k)简单差低读多写少
链表O(n)O(n)O(n)O(n+k)简单中中插入删除多
AVL树O(log n)O(log n)O(log n)O(log n+k)复杂差中理论场景
跳表O(log n)O(log n)O(log n)O(log n+k)简单好中高生产环境

Redis 为什么选择跳表?

Redis 作者 Antirez 的原话:

"Skip lists are a simple data structure that can be used in place of balanced trees. They are easier to implement, debug, and free (insert and delete). For these reasons, Redis uses skip lists instead of balanced trees."

关键原因:

  1. 实现简单:不到200行代码就能实现完整功能
  2. 易于调试:结构清晰,便于定位问题
  3. 并发友好:链表结构天然支持无锁并发
  4. 范围查询高效:底层有序链表完美支持 ZRANGE 命令
  5. 性能足够:期望 O(log n),满足生产需求

三、跳表原理:如何工作的?

3.1 核心原理:概率平衡

跳表不像 AVL树那样强制平衡 (通过旋转),而是使用概率平衡:

问题:新插入的节点应该建几层索引?

答案:随机决定!

// 随机层数生成算法
func randomLevel() int {
    level := 1
    // 每次有50%概率增加一层
    for rand.Float64() < 0.5 && level < MAX_LEVEL {
        level++
    }
    return level
}

// 示例:
// 50% 概率: level = 1 (只在原始层)
// 25% 概率: level = 2 (在原始层 + 一级索引)
// 12.5% 概率: level = 3 (在原始层 + 两级索引)
// ...

为什么这样可以?

假设有 n 个元素:

  • 约 n/2 个元素有1层索引
  • 约 n/4 个元素有2层索引
  • 约 n/8 个元素有3层索引
  • ...

这样形成的多级索引结构,期望层数为 log₂(n),查找复杂度期望为 O(log n)。

数学证明 (简化版):

设总共有 n 个元素,每层有一半的元素晋升到上一层:

  • Level 0 (原始层): n 个元素
  • Level 1: n/2 个元素
  • Level 2: n/4 个元素
  • ...
  • Level k: n/(2^k) 个元素

当 n/(2^k) = 1 时,k = log₂(n),所以总层数为 O(log n)。

每层平均需要检查 2 个节点 (期望值),所以总查找步数为 O(log n)。

3.2 节点结构设计

type SkipListNode struct {
    key     string        // 键
    value   interface{}   // 值
    score   float64       // 分数 (用于排序)
    level   int           // 节点层数
    forward []*SkipListNode  // 前向指针数组 (每层一个指针)
}

// 示例:一个3层节点的结构
node := &SkipListNode{
    key:   "user:1000",
    value: "张三",
    score: 95.5,
    level: 2,  // 0, 1, 2 三层
    forward: [
        node_level0,  // Level 0 的下一个节点
        node_level1,  // Level 1 的下一个节点
        node_level2,  // Level 2 的下一个节点
    ],
}

3.3 查找算法详解

算法思路: 从最高层开始,向右走,走不动就往下降。

func (sl *SkipList) Search(score float64, key string) *SkipListNode {
    current := sl.header  // 从头节点开始

    // 从最高层开始向下搜索
    for level := sl.level; level >= 0; level-- {
        // 在当前层向右走,直到下一个节点大于目标
        for current.forward[level] != nil {
            next := current.forward[level]

            // 比较 (先比分数,再比键)
            if next.score < score || (next.score == score && next.key < key) {
                current = next  // 继续向右
            } else {
                break  // 下一个节点太大了,停止在当前层
            }
        }
        // 当前层走不动了,下降到下一层
    }

    // 此时 current 是小于目标的最后一个节点
    // 目标节点应该是 current.forward[0]
    current = current.forward[0]

    if current != nil && current.score == score && current.key == key {
        return current  // 找到!
    }

    return nil  // 未找到
}

可视化查找过程 (查找 score=40):

Step 1: Level 2, current=head
        head -> 20 (20<40,向右) -> 50 (50>40,停止,下降)

Step 2: Level 1, current=20
        20 -> 40 (40==40,找到!)

Level 2:  head ---------> [20] -------X-----> [50]
                           ↓
Level 1:  head ---------> [20] -------> [40] -> [50]
                                         ↑
Level 0:  head -> [10] -> [20] -> [30] -> [40] -> [50] -> [60]
                                         找到!

时间复杂度分析:

  • 每层最多向右走 2 步 (期望值)
  • 总层数为 O(log n)
  • 总时间复杂度:O(log n)

3.4 插入算法详解

算法思路:

  1. 找到插入位置 (类似查找)
  2. 随机生成层数
  3. 在每一层插入节点
func (sl *SkipList) Insert(score float64, key string, value interface{}) {
    // Step 1: 找到每一层的插入位置
    update := make([]*SkipListNode, MAX_LEVEL+1)
    current := sl.header

    for level := sl.level; level >= 0; level-- {
        for current.forward[level] != nil {
            next := current.forward[level]
            if next.score < score || (next.score == score && next.key < key) {
                current = next
            } else {
                break
            }
        }
        update[level] = current  // 记录每层的前驱节点
    }

    // Step 2: 随机生成层数
    newLevel := randomLevel()

    // Step 3: 如果新层数超过当前最大层数,更新 header 的前向指针
    if newLevel > sl.level {
        for i := sl.level + 1; i <= newLevel; i++ {
            update[i] = sl.header
        }
        sl.level = newLevel
    }

    // Step 4: 创建新节点
    newNode := &SkipListNode{
        key:     key,
        value:   value,
        score:   score,
        level:   newLevel,
        forward: make([]*SkipListNode, newLevel+1),
    }

    // Step 5: 在每一层插入新节点
    for i := 0; i <= newLevel; i++ {
        newNode.forward[i] = update[i].forward[i]
        update[i].forward[i] = newNode
    }

    sl.length++
}

可视化插入过程 (插入 score=35, 随机层数=2):

原始结构:
Level 2:  head ---------> [20] -----------------> [50]
Level 1:  head ---------> [20] -------> [40] ----> [50]
Level 0:  head -> [10] -> [20] -> [30] -> [40] -> [50] -> [60]

Step 1: 找到插入位置,记录每层前驱
update[0] = [30], update[1] = [20], update[2] = [20]

Step 2: 随机层数 = 2

Step 3: 创建新节点 [35]

Step 4: 在每层插入
Level 2:  head ---------> [20] -----> [35] -----> [50]
                                       ↑ 新增
Level 1:  head ---------> [20] -----> [35] -> [40] -> [50]
                                       ↑ 新增
Level 0:  head -> [10] -> [20] -> [30] -> [35] -> [40] -> [50] -> [60]
                                          ↑ 新增

3.5 删除算法详解

算法思路:

  1. 找到要删除的节点
  2. 更新所有层的前向指针
  3. 调整最大层数
func (sl *SkipList) Delete(score float64, key string) bool {
    // Step 1: 找到每一层的前驱节点
    update := make([]*SkipListNode, MAX_LEVEL+1)
    current := sl.header

    for level := sl.level; level >= 0; level-- {
        for current.forward[level] != nil {
            next := current.forward[level]
            if next.score < score || (next.score == score && next.key < key) {
                current = next
            } else {
                break
            }
        }
        update[level] = current
    }

    // Step 2: 检查目标节点是否存在
    target := current.forward[0]
    if target == nil || target.score != score || target.key != key {
        return false  // 未找到
    }

    // Step 3: 在每一层删除节点
    for i := 0; i <= sl.level; i++ {
        if update[i].forward[i] == target {
            update[i].forward[i] = target.forward[i]
        }
    }

    // Step 4: 调整最大层数 (如果最高层变空了)
    for sl.level > 0 && sl.header.forward[sl.level] == nil {
        sl.level--
    }

    sl.length--
    return true
}

四、设计演进:从简单到完整

V1 版本:固定2层跳表 (教学版)

设计目标: 理解跳表基本思想

type SimpleSkipList struct {
    level0 *Node  // 原始层
    level1 *Node  // 索引层 (每隔2个元素建一个索引)
}

// 优点:简单易懂
// 缺点:性能不够,索引密度固定

V2 版本:固定多层跳表 (改进版)

设计目标: 提升性能,支持更多数据

type FixedLevelSkipList struct {
    levels [4]*Node  // 固定4层
    // Level 0: 所有元素
    // Level 1: 每2个元素1个索引
    // Level 2: 每4个元素1个索引
    // Level 3: 每8个元素1个索引
}

// 优点:性能提升
// 缺点:层数固定,不够灵活

V3 版本:动态随机层数跳表 (生产版)

设计目标: 自适应数据规模,最优性能

type SkipList struct {
    header   *SkipListNode    // 头节点
    tail     *SkipListNode    // 尾节点 (优化)
    length   int              // 元素数量
    level    int              // 当前最大层数
    maxLevel int              // 最大允许层数 (通常16-32)
}

// 优点:
// 1. 动态适应:层数随数据量自动调整
// 2. 概率平衡:无需复杂的平衡维护
// 3. 性能稳定:期望 O(log n)
// 4. 易于并发:无全局锁

// 缺点:
// 1. 空间开销:约2倍空间 (每个节点平均2个指针)

五、完整实现:生产级跳表代码

5.1 核心结构定义

// skiplist/skiplist.go
package skiplist

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

const (
    MAX_LEVEL       = 16     // 最大层数
    PROBABILITY     = 0.5    // 晋升概率
)

// 跳表节点
type SkipListNode struct {
    key     string           // 键
    value   interface{}      // 值
    score   float64          // 分数 (用于排序)
    level   int              // 节点层数
    forward []*SkipListNode  // 前向指针数组
}

// 跳跃表
type SkipList struct {
    header   *SkipListNode   // 头节点
    tail     *SkipListNode   // 尾节点
    length   int             // 元素数量
    level    int             // 当前最大层数
    maxLevel int             // 最大允许层数
    mu       sync.RWMutex    // 读写锁
}

// 创建新节点
func newNode(key string, value interface{}, score float64, level int) *SkipListNode {
    return &SkipListNode{
        key:     key,
        value:   value,
        score:   score,
        level:   level,
        forward: make([]*SkipListNode, level+1),
    }
}

// 创建跳跃表
func NewSkipList(maxLevel int) *SkipList {
    if maxLevel <= 0 || maxLevel > MAX_LEVEL {
        maxLevel = MAX_LEVEL
    }

    // 创建头节点 (哨兵节点)
    header := newNode("", nil, 0, maxLevel)

    return &SkipList{
        header:   header,
        tail:     nil,
        length:   0,
        level:    0,
        maxLevel: maxLevel,
    }
}

// 随机生成层数 (核心算法!)
func (sl *SkipList) randomLevel() int {
    level := 0
    for rand.Float64() < PROBABILITY && level < sl.maxLevel {
        level++
    }
    return level
}

5.2 查找操作

// skiplist/search.go
package skiplist

// 查找节点
func (sl *SkipList) Search(score float64, key string) (interface{}, bool) {
    sl.mu.RLock()
    defer sl.mu.RUnlock()

    node := sl.searchNode(score, key)
    if node != nil {
        return node.value, true
    }

    return nil, false
}

// 查找节点(内部方法)
func (sl *SkipList) searchNode(score float64, key string) *SkipListNode {
    current := sl.header

    // 从最高层开始搜索
    for level := sl.level; level >= 0; level-- {
        // 在当前层向前搜索
        for current.forward[level] != nil {
            next := current.forward[level]

            // 比较:先比分数,再比键
            if next.score < score || (next.score == score && next.key < key) {
                current = next  // 继续向右
            } else {
                break  // 太大了,停止
            }
        }
    }

    // 移动到下一个节点
    current = current.forward[0]

    // 检查是否找到
    if current != nil && current.score == score && current.key == key {
        return current
    }

    return nil
}

// 范围查询 (按分数)
func (sl *SkipList) SearchRangeByScore(minScore, maxScore float64) []interface{} {
    sl.mu.RLock()
    defer sl.mu.RUnlock()

    var result []interface{}

    // 找到第一个 >= minScore 的节点
    current := sl.findFirstGE(minScore)

    // 收集范围内的所有节点
    for current != nil && current.score <= maxScore {
        result = append(result, current.value)
        current = current.forward[0]
    }

    return result
}

// 查找第一个 >= 指定分数的节点
func (sl *SkipList) findFirstGE(score float64) *SkipListNode {
    current := sl.header

    for level := sl.level; level >= 0; level-- {
        for current.forward[level] != nil && current.forward[level].score < score {
            current = current.forward[level]
        }
    }

    return current.forward[0]
}

5.3 插入操作

// skiplist/insert.go
package skiplist

// 插入节点
func (sl *SkipList) Insert(score float64, key string, value interface{}) error {
    sl.mu.Lock()
    defer sl.mu.Unlock()

    // Step 1: 查找插入位置,记录每层的前驱节点
    update := make([]*SkipListNode, sl.maxLevel+1)
    current := sl.header

    for level := sl.level; level >= 0; level-- {
        for current.forward[level] != nil {
            next := current.forward[level]
            if next.score < score || (next.score == score && next.key < key) {
                current = next
            } else {
                break
            }
        }
        update[level] = current
    }

    // Step 2: 检查键是否已存在
    current = current.forward[0]
    if current != nil && current.score == score && current.key == key {
        // 键已存在,更新值
        current.value = value
        return nil
    }

    // Step 3: 随机生成层数
    newLevel := sl.randomLevel()

    // Step 4: 如果新层数超过当前最大层数,更新 header 的前向指针
    if newLevel > sl.level {
        for i := sl.level + 1; i <= newLevel; i++ {
            update[i] = sl.header
        }
        sl.level = newLevel
    }

    // Step 5: 创建新节点
    newNode := newNode(key, value, score, newLevel)

    // Step 6: 在每一层插入新节点
    for i := 0; i <= newLevel; i++ {
        newNode.forward[i] = update[i].forward[i]
        update[i].forward[i] = newNode
    }

    // Step 7: 更新尾节点
    if newNode.forward[0] == nil {
        sl.tail = newNode
    }

    sl.length++
    return nil
}

5.4 删除操作

// skiplist/delete.go
package skiplist

// 删除节点
func (sl *SkipList) Delete(score float64, key string) bool {
    sl.mu.Lock()
    defer sl.mu.Unlock()

    // Step 1: 查找删除位置,记录每层的前驱节点
    update := make([]*SkipListNode, sl.maxLevel+1)
    current := sl.header

    for level := sl.level; level >= 0; level-- {
        for current.forward[level] != nil {
            next := current.forward[level]
            if next.score < score || (next.score == score && next.key < key) {
                current = next
            } else {
                break
            }
        }
        update[level] = current
    }

    // Step 2: 检查目标节点是否存在
    current = current.forward[0]
    if current == nil || current.score != score || current.key != key {
        return false  // 未找到
    }

    // Step 3: 在每一层删除节点
    for i := 0; i <= sl.level; i++ {
        if update[i].forward[i] == current {
            update[i].forward[i] = current.forward[i]
        }
    }

    // Step 4: 更新尾节点
    if current == sl.tail {
        sl.tail = update[0]
        if sl.tail != sl.header {
            sl.tail = update[0]
        } else {
            sl.tail = nil
        }
    }

    // Step 5: 调整最大层数
    for sl.level > 0 && sl.header.forward[sl.level] == nil {
        sl.level--
    }

    sl.length--
    return true
}

// 删除范围
func (sl *SkipList) DeleteRangeByScore(minScore, maxScore float64) int {
    sl.mu.Lock()
    defer sl.mu.Unlock()

    count := 0

    // 找到第一个 >= minScore 的节点
    update := make([]*SkipListNode, sl.maxLevel+1)
    current := sl.header

    for level := sl.level; level >= 0; level-- {
        for current.forward[level] != nil && current.forward[level].score < minScore {
            current = current.forward[level]
        }
        update[level] = current
    }

    current = current.forward[0]

    // 删除范围内的所有节点
    for current != nil && current.score <= maxScore {
        next := current.forward[0]

        // 在每一层删除节点
        for i := 0; i <= sl.level; i++ {
            if update[i].forward[i] == current {
                update[i].forward[i] = current.forward[i]
            }
        }

        count++
        current = next
    }

    // 调整最大层数
    for sl.level > 0 && sl.header.forward[sl.level] == nil {
        sl.level--
    }

    sl.length -= count
    return count
}

5.5 实用工具方法

// skiplist/utils.go
package skiplist

// 获取长度
func (sl *SkipList) Length() int {
    sl.mu.RLock()
    defer sl.mu.RUnlock()
    return sl.length
}

// 获取排名 (从0开始)
func (sl *SkipList) GetRank(score float64, key string) int {
    sl.mu.RLock()
    defer sl.mu.RUnlock()

    rank := 0
    current := sl.header

    for level := sl.level; level >= 0; level-- {
        for current.forward[level] != nil {
            next := current.forward[level]
            if next.score < score || (next.score == score && next.key < key) {
                rank++
                current = next
            } else {
                break
            }
        }
    }

    current = current.forward[0]
    if current != nil && current.score == score && current.key == key {
        return rank
    }

    return -1  // 未找到
}

// 根据排名获取节点 (从0开始)
func (sl *SkipList) GetByRank(rank int) (interface{}, bool) {
    sl.mu.RLock()
    defer sl.mu.RUnlock()

    if rank < 0 || rank >= sl.length {
        return nil, false
    }

    current := sl.header.forward[0]
    for i := 0; i < rank && current != nil; i++ {
        current = current.forward[0]
    }

    if current != nil {
        return current.value, true
    }

    return nil, false
}

// 获取范围排名
func (sl *SkipList) GetRangeByRank(start, stop int) []interface{} {
    sl.mu.RLock()
    defer sl.mu.RUnlock()

    if start < 0 || stop >= sl.length || start > stop {
        return nil
    }

    var result []interface{}
    current := sl.header.forward[0]

    // 移动到起始位置
    for i := 0; i < start && current != nil; i++ {
        current = current.forward[0]
    }

    // 收集范围内的节点
    for i := start; i <= stop && current != nil; i++ {
        result = append(result, current.value)
        current = current.forward[0]
    }

    return result
}

// 打印跳表结构 (调试用)
func (sl *SkipList) Print() {
    sl.mu.RLock()
    defer sl.mu.RUnlock()

    fmt.Printf("SkipList (length=%d, level=%d):\n", sl.length, sl.level)

    for level := sl.level; level >= 0; level-- {
        fmt.Printf("Level %d: ", level)
        current := sl.header.forward[level]
        for current != nil {
            fmt.Printf("[%.1f:%s] -> ", current.score, current.key)
            current = current.forward[level]
        }
        fmt.Println("nil")
    }
}

六、性能分析与测试

6.1 时间复杂度分析

理论分析:

操作平均情况最坏情况说明
SearchO(log n)O(n)最坏情况:所有节点层数都是0 (概率 ≈ 0)
InsertO(log n)O(n)包含查找 + O(1)插入
DeleteO(log n)O(n)包含查找 + O(1)删除
RangeO(log n + k)O(n + k)k为结果数量
GetRankO(log n)O(n)类似查找
GetByRankO(n)O(n)需要顺序遍历底层

空间复杂度:

  • 每个节点平均有 2 个前向指针 (期望值)
  • 总空间:O(n) × 2 = O(n)
  • 实际内存开销:约为普通链表的 2倍

6.2 基准测试

// skiplist/skiplist_bench_test.go
package skiplist

import (
    "fmt"
    "math/rand"
    "testing"
)

// 测试插入性能
func BenchmarkSkipListInsert(b *testing.B) {
    sl := NewSkipList(16)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        score := float64(i)
        key := fmt.Sprintf("key%d", i)
        sl.Insert(score, key, i)
    }
}

// 测试查找性能
func BenchmarkSkipListSearch(b *testing.B) {
    sl := NewSkipList(16)

    // 预填充数据
    for i := 0; i < 1000000; i++ {
        score := float64(i)
        key := fmt.Sprintf("key%d", i)
        sl.Insert(score, key, i)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        score := float64(rand.Intn(1000000))
        key := fmt.Sprintf("key%d", int(score))
        sl.Search(score, key)
    }
}

// 测试范围查询性能
func BenchmarkSkipListRangeQuery(b *testing.B) {
    sl := NewSkipList(16)

    // 预填充数据
    for i := 0; i < 1000000; i++ {
        score := float64(i)
        key := fmt.Sprintf("key%d", i)
        sl.Insert(score, key, i)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        start := float64(rand.Intn(900000))
        sl.SearchRangeByScore(start, start+100)
    }
}

// 对比:Go map 的性能
func BenchmarkGoMap(b *testing.B) {
    m := make(map[string]int)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i)
        m[key] = i
    }
}

基准测试结果 (示例):

$ go test -bench=. -benchmem

BenchmarkSkipListInsert-8        2000000     850 ns/op     128 B/op    2 allocs/op
BenchmarkSkipListSearch-8       10000000     150 ns/op       0 B/op    0 allocs/op
BenchmarkSkipListRangeQuery-8     500000    3500 ns/op    1600 B/op    1 allocs/op
BenchmarkGoMap-8                 5000000     320 ns/op      48 B/op    0 allocs/op

结论:

  • 插入性能:跳表约为 map 的 2.5倍慢 (但跳表有序!)
  • 查找性能:跳表约为 map 的 2倍慢
  • 范围查询:跳表完胜 (map 不支持有序范围查询)

6.3 单元测试

// skiplist/skiplist_test.go
package skiplist

import (
    "testing"
)

func TestSkipListBasicOperations(t *testing.T) {
    sl := NewSkipList(16)

    // 测试插入
    sl.Insert(10.0, "key1", "value1")
    sl.Insert(20.0, "key2", "value2")
    sl.Insert(15.0, "key3", "value3")

    if sl.Length() != 3 {
        t.Errorf("Expected length 3, got %d", sl.Length())
    }

    // 测试查找
    val, exists := sl.Search(10.0, "key1")
    if !exists || val != "value1" {
        t.Errorf("Expected 'value1', got %v", val)
    }

    // 测试不存在的键
    _, exists = sl.Search(99.0, "key99")
    if exists {
        t.Error("Key should not exist")
    }

    // 测试删除
    if !sl.Delete(10.0, "key1") {
        t.Error("Failed to delete key1")
    }

    _, exists = sl.Search(10.0, "key1")
    if exists {
        t.Error("Key1 should not exist after deletion")
    }

    if sl.Length() != 2 {
        t.Errorf("Expected length 2, got %d", sl.Length())
    }
}

func TestSkipListRangeQuery(t *testing.T) {
    sl := NewSkipList(16)

    // 插入测试数据
    for i := 0; i < 100; i++ {
        score := float64(i)
        key := fmt.Sprintf("key%d", i)
        sl.Insert(score, key, i)
    }

    // 测试范围查询
    result := sl.SearchRangeByScore(10.0, 20.0)
    if len(result) != 11 {  // 10, 11, ..., 20
        t.Errorf("Expected 11 results, got %d", len(result))
    }

    // 验证结果顺序
    for i, val := range result {
        expected := 10 + i
        if val != expected {
            t.Errorf("Expected %d at position %d, got %v", expected, i, val)
        }
    }
}

func TestSkipListRank(t *testing.T) {
    sl := NewSkipList(16)

    // 插入测试数据
    for i := 0; i < 100; i++ {
        score := float64(i)
        key := fmt.Sprintf("key%d", i)
        sl.Insert(score, key, i)
    }

    // 测试获取排名
    rank := sl.GetRank(50.0, "key50")
    if rank != 50 {
        t.Errorf("Expected rank 50, got %d", rank)
    }

    // 测试根据排名获取
    val, exists := sl.GetByRank(25)
    if !exists || val != 25 {
        t.Errorf("Expected 25, got %v", val)
    }
}

func TestSkipListConcurrent(t *testing.T) {
    sl := NewSkipList(16)
    done := make(chan bool, 2)

    // 并发写入
    go func() {
        for i := 0; i < 1000; i++ {
            score := float64(i)
            key := fmt.Sprintf("key%d", i)
            sl.Insert(score, key, i)
        }
        done <- true
    }()

    // 并发读取
    go func() {
        for i := 0; i < 1000; i++ {
            score := float64(i % 100)
            key := fmt.Sprintf("key%d", i%100)
            sl.Search(score, key)
        }
        done <- true
    }()

    // 等待完成
    <-done
    <-done

    if sl.Length() != 1000 {
        t.Errorf("Expected length 1000, got %d", sl.Length())
    }
}

七、实战应用:Redis Sorted Set

7.1 Redis ZSET 的实现

Redis 的 Sorted Set (ZSET) 使用 跳表 + 哈希表 的组合实现:

type ZSet struct {
    dict     map[string]float64  // 哈希表:key -> score (O(1)查找分数)
    skiplist *SkipList            // 跳表:按 score 排序 (O(log n)范围查询)
}

// 为什么需要两个数据结构?
// 1. 哈希表:快速查找元素的分数 (ZSCORE命令)
// 2. 跳表:支持按分数范围查询 (ZRANGE命令)

关键命令实现:

// ZADD: 添加成员
func (zs *ZSet) Add(member string, score float64) {
    // 1. 检查是否已存在
    if oldScore, exists := zs.dict[member]; exists {
        // 删除旧的跳表节点
        zs.skiplist.Delete(oldScore, member)
    }

    // 2. 更新哈希表
    zs.dict[member] = score

    // 3. 插入跳表
    zs.skiplist.Insert(score, member, nil)
}

// ZSCORE: 获取分数
func (zs *ZSet) Score(member string) (float64, bool) {
    score, exists := zs.dict[member]
    return score, exists
}

// ZRANGE: 按排名范围查询
func (zs *ZSet) Range(start, stop int) []string {
    values := zs.skiplist.GetRangeByRank(start, stop)

    var members []string
    for _, val := range values {
        if node, ok := val.(*SkipListNode); ok {
            members = append(members, node.key)
        }
    }

    return members
}

// ZRANGEBYSCORE: 按分数范围查询
func (zs *ZSet) RangeByScore(minScore, maxScore float64) []string {
    values := zs.skiplist.SearchRangeByScore(minScore, maxScore)

    var members []string
    for _, val := range values {
        if node, ok := val.(*SkipListNode); ok {
            members = append(members, node.key)
        }
    }

    return members
}

// ZREM: 删除成员
func (zs *ZSet) Remove(member string) bool {
    score, exists := zs.dict[member]
    if !exists {
        return false
    }

    // 1. 删除哈希表
    delete(zs.dict, member)

    // 2. 删除跳表
    zs.skiplist.Delete(score, member)

    return true
}

7.2 性能对比:跳表 vs 平衡树

特性跳表AVL树红黑树B树
实现复杂度简单复杂⚠️ 中等复杂
查找性能O(log n)O(log n)O(log n)O(log n)
插入性能O(log n)O(log n) + 旋转O(log n) + 旋转O(log n) + 分裂
删除性能O(log n)O(log n) + 旋转O(log n) + 旋转O(log n) + 合并
范围查询优秀⚠️ 中等⚠️ 中等优秀
内存占用2n3n3n取决于阶数
并发友好性好差差⚠️ 中等
缓存友好性中等差差好

Redis 选择跳表的原因总结:

  1. 实现简单:核心代码不到200行
  2. 性能优秀:期望 O(log n),接近平衡树
  3. 范围查询友好:底层链表天然支持顺序遍历
  4. 并发友好:无全局旋转,容易实现无锁并发
  5. 内存局部性好:同层节点通过指针连接,顺序访问缓存友好

八、面试高频问答

跳表的查找时间复杂度为什么是 O(log n)?

答案:

跳表通过多级索引实现"空间换时间":

  1. 层数分析:

    • Level 0: n 个节点
    • Level 1: n/2 个节点 (50%概率)
    • Level 2: n/4 个节点
    • Level k: n/(2^k) 个节点
    • 总层数:k = log₂(n)
  2. 查找步数分析:

    • 每层最多向右走 2 步 (期望值)
    • 总层数 log₂(n)
    • 总步数:2 × log₂(n) = O(log n)
  3. 数学证明: 设 p = 0.5 (晋升概率),n = 元素数量

    • 期望层数:E[level] = log₁/ₚ(n) = log₂(n)
    • 期望查找步数:E[steps] ≤ 2 × E[level] = O(log n)

跳表相比平衡树有什么优势?

答案:

对比维度跳表平衡树 (AVL/红黑树)
实现难度简单,不到200行复杂,需要旋转操作
调试难度结构清晰,易调试旋转逻辑复杂,难调试
插入/删除概率平衡,无旋转需要多次旋转维护平衡
范围查询底层链表,天然支持⚠️ 需要中序遍历
并发控制链表结构,易并发全局旋转,并发困难
内存局部性同层顺序访问友好子树节点可能分散
性能稳定性⚠️ 概率保证,极少退化严格保证平衡

Redis 选择跳表的核心原因:

"Skip lists are easier to implement, debug, and free (insert and delete)." —— Redis 作者 Antirez

跳表的空间复杂度是多少?

答案:

理论分析:

  • 每个节点有 50% 概率晋升到上一层
  • Level 0: n 个节点,每个 1 个指针
  • Level 1: n/2 个节点,每个 2 个指针
  • Level 2: n/4 个节点,每个 3 个指针
  • ...

总指针数 = n + n/2 + n/4 + ... = 2n

结论:

  • 空间复杂度:O(n)
  • 实际开销:约为普通链表的 2倍
  • 优化方案:可以通过调整晋升概率 p 来平衡空间和时间

如何优化跳表的性能?

答案:

1. 调整晋升概率 p

// 默认 p = 0.5
// 减小 p: 空间开销更小,但查找变慢
// 增大 p: 查找更快,但空间开销更大

const PROBABILITY = 0.25  // 4倍空间减少,查找变慢
const PROBABILITY = 0.5   // 平衡点 (推荐)
const PROBABILITY = 0.75  // 查找更快,但空间开销大

2. 增加尾指针

type SkipList struct {
    header *SkipListNode
    tail   *SkipListNode  // 优化:快速访问最后一个元素
}

// ZRANGE -1 -1 (获取最后一个元素) 从 O(n) 降到 O(1)

3. 添加跨度信息

type SkipListLevel struct {
    forward *SkipListNode
    span    int  // 跨度:到下一个节点的距离
}

// 优化:GetByRank 从 O(n) 降到 O(log n)

4. 批量操作优化

// 批量插入:减少锁开销
func (sl *SkipList) BatchInsert(items []Item) {
    sl.mu.Lock()
    defer sl.mu.Unlock()

    for _, item := range items {
        sl.insertInternal(item)  // 无锁版本
    }
}

跳表在什么场景下会退化?

答案:

退化场景:

  1. 极端概率事件 (概率极低)

    • 所有节点层数都为 0 → 退化为普通链表 → O(n)
    • 概率:(0.5)^n ≈ 0 (n 较大时)
  2. 恶意构造数据 (理论可能)

    • 如果随机数生成器可预测,攻击者可构造最坏情况
    • 解决:使用加密级随机数生成器

实际生产环境:

  • 概率极低,可忽略 (< 10^-9)
  • Redis 运行多年,未发现显著退化案例

如何实现并发安全的跳表?

答案:

方案1:粗粒度锁 (简单,性能一般)

type SkipList struct {
    // ...
    mu sync.RWMutex
}

func (sl *SkipList) Insert(/*...*/) {
    sl.mu.Lock()
    defer sl.mu.Unlock()
    // ...
}

方案2:无锁跳表 (复杂,性能高)

// 使用 CAS (Compare-And-Swap) 操作
import "sync/atomic"

func (sl *SkipList) Insert(/*...*/) {
    for {
        // 1. 找到插入位置
        // 2. 尝试 CAS 插入
        if atomic.CompareAndSwapPointer(
            (*unsafe.Pointer)(unsafe.Pointer(&update[0].forward[0])),
            unsafe.Pointer(current),
            unsafe.Pointer(newNode),
        ) {
            break  // 成功
        }
        // 失败:重试
    }
}

方案3:分段锁 (平衡方案)

type SkipList struct {
    segments []*Segment  // 多个分段
}

type Segment struct {
    mu   sync.RWMutex
    list *SkipListNode
}

// 根据 key 的 hash 定位到不同的 segment
// 减少锁竞争

跳表和 B+ 树的区别?

答案:

特性跳表B+ 树
数据结构多层链表多路平衡树
叶子节点包含所有数据包含所有数据
非叶子节点稀疏索引稠密索引
范围查询O(log n + k)O(log n + k)
磁盘友好性不友好 (指针分散)友好 (节点连续)
并发性能好⚠️ 中等
适用场景内存数据库磁盘数据库

选择建议:

  • 内存数据库 (Redis, Memcached):跳表
  • 磁盘数据库 (MySQL, PostgreSQL):B+ 树

九、总结与扩展

9.1 核心要点回顾

  1. 跳表的本质:通过多级索引加速链表查找
  2. 概率平衡:随机层数避免复杂的平衡维护
  3. 性能特点:期望 O(log n),实现简单
  4. Redis 选择跳表的原因:简单、高效、并发友好

9.2 与其他数据结构对比

场景推荐数据结构理由
读多写少有序数组查找快,空间小
写多读少链表插入删除快
平衡读写跳表综合性能好
磁盘存储B+ 树减少磁盘I/O
并发场景跳表无锁并发容易

9.3 扩展阅读

  • Redis源码:redis/src/t_zset.c
  • 论文:"Skip Lists: A Probabilistic Alternative to Balanced Trees" by William Pugh (1990)
  • 应用案例:
    • Redis Sorted Set
    • LevelDB/RocksDB MemTable
    • Lucene 倒排索引

十、实战练习

练习1:实现 ZREVRANGE (逆序范围查询)

// 提示:添加 backward 指针,支持反向遍历
type SkipListNode struct {
    forward  []*SkipListNode
    backward *SkipListNode  // 反向指针
}

练习2:实现 ZCOUNT (统计分数范围内的元素数量)

func (sl *SkipList) Count(minScore, maxScore float64) int {
    // TODO: 实现统计逻辑
    // 提示:添加 span 字段加速统计
}

练习3:实现跳表持久化

func (sl *SkipList) Serialize() ([]byte, error) {
    // TODO: 实现序列化逻辑
}

func (sl *SkipList) Deserialize(data []byte) error {
    // TODO: 实现反序列化逻辑
}

下一章预告: 有序集合 (ZSet) 的完整实现,结合跳表与哈希表,支持 Redis 所有 ZSET 命令。

Prev
哈希表与字典实现
Next
有序集合:Sorted Set 实现