HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 学习 Redis

    • Redis 手写实现学习指南
    • 快速开始
    • Redis 架构总览与线程模型
    • RESP 协议与网络通信
    • 事件循环与 I/O 多路复用
    • 底层数据结构设计
    • 字符串:SDS 简单动态字符串
    • 哈希表与字典实现
    • 列表与跳表实现
    • 有序集合:Sorted Set 实现
    • 内存管理与对象系统
    • RDB 持久化机制
    • AOF 持久化机制
    • 混合持久化策略
    • 分布式锁实现
    • 缓存一致性策略
    • 主从复制机制
    • 哨兵模式实现
    • 内存优化与 GC 调优

哈希表与字典实现

学习目标

  • 理解哈希表的设计动机与核心价值
  • 对比分析不同数据结构的查找性能
  • 掌握哈希冲突解决方案的优劣对比
  • 理解渐进式 Rehash 的设计思想
  • 深入 Redis 字典的实现细节

一、问题背景:为什么需要哈希表?

1.1 业务场景分析

在实际应用中,我们经常需要进行快速的键值查找:

操作类型业务场景示例性能要求
查找用户信息查询 (ID → User)要快!O(1)
插入缓存数据 (Key → Value)要快!O(1)
更新更新配置 (Config → Value)要快!O(1)
删除清除过期缓存要快!O(1)

典型应用场景:

// 1. 用户会话管理
sessionStore["session_abc123"] = User{ID: 1000, Name: "张三"}

// 2. 数据库查询缓存
queryCache["SELECT * FROM users WHERE id=1000"] = cachedResult

// 3. 配置中心
configMap["db.host"] = "192.168.1.100"
configMap["db.port"] = "3306"

// 4. Redis 核心数据结构
// SET key value → 底层就是哈希表
// GET key       → O(1) 查找

1.2 性能目标

对于键值存储,我们的理想目标是:

  • 查找:O(1) 时间复杂度
  • 插入:O(1) 时间复杂度
  • 删除:O(1) 时间复杂度
  • 空间复杂度:O(n) 且常数因子可接受
  • 并发友好:支持高效的并发操作

二、方案对比:如何实现快速查找?

2.1 方案1:数组 (Direct Address)

思路: 用键直接作为数组索引

键值对:(1 → "Alice"), (5 → "Bob"), (100 → "Charlie")

数组实现:
索引:  [0]   [1]     [2]   [3]   [4]   [5]    ... [100]
元素: null "Alice" null null null "Bob" ... "Charlie"

实现代码:

type DirectArray struct {
    data [1000000]string  // 假设键的范围是 0-999999
}

func (da *DirectArray) Get(key int) string {
    return da.data[key]  // O(1) 直接访问
}

func (da *DirectArray) Set(key int, value string) {
    da.data[key] = value  // O(1) 直接设置
}

性能分析:

操作时间复杂度说明
查找O(1)直接索引访问
插入O(1)直接索引赋值
删除O(1)直接索引清空
空间O(U)U为键的范围大小

优缺点:

  • 极快:真正的 O(1) 访问
  • 简单:实现非常简单
  • 空间浪费严重:键范围大时浪费大量内存
  • 例如:键为手机号 (11位数字),需要 10^11 的数组!
  • 不支持非整数键:无法处理字符串键

结论: 只适用于键范围很小的特殊场景


2.2 方案2:有序数组 + 二分查找

思路: 维护有序数组,二分查找

有序键值对数组:
索引: [0]        [1]        [2]         [3]
键:   ["Alice"]  ["Bob"]   ["Charlie"] ["David"]
值:   [100]      [200]     [300]       [400]

实现代码:

type SortedArray struct {
    keys   []string
    values []interface{}
}

func (sa *SortedArray) Get(key string) (interface{}, bool) {
    // 二分查找
    left, right := 0, len(sa.keys)-1
    for left <= right {
        mid := left + (right-left)/2
        if sa.keys[mid] == key {
            return sa.values[mid], true
        } else if sa.keys[mid] < key {
            left = mid + 1
        } else {
            right = mid - 1
        }
    }
    return nil, false
}

func (sa *SortedArray) Set(key string, value interface{}) {
    // 二分查找位置
    pos := sa.findInsertPosition(key)

    // 插入需要移动元素 → O(n)
    sa.keys = append(sa.keys[:pos], append([]string{key}, sa.keys[pos:]...)...)
    sa.values = append(sa.values[:pos], append([]interface{}{value}, sa.values[pos:]...)...)
}

性能分析:

操作时间复杂度说明
查找⚠️ O(log n)二分查找
插入O(n)需要移动元素
删除O(n)需要移动元素
空间O(n)只存储实际数据

优缺点:

  • 空间效率高:只存储实际键值对
  • 支持任意键类型:可比较即可
  • ⚠️ 查找较慢:O(log n) 而非 O(1)
  • 写入慢:插入/删除需要移动元素

2.3 方案3:链表

思路: 顺序存储键值对

链表:
head → [key="Alice", val=100] → [key="Bob", val=200] → [key="Charlie", val=300] → nil

性能分析:

操作时间复杂度说明
查找O(n)顺序遍历
插入O(1)头部插入
删除O(n)需要先查找
空间O(n)只存储实际数据

优缺点:

  • 插入快:头部插入 O(1)
  • 查找极慢:O(n) 顺序遍历

2.4 方案4:哈希表 (Hash Table)

核心思想:通过哈希函数将键映射到数组索引

哈希函数:h(key) = hash(key) % table_size

示例:
h("Alice") = hash("Alice") % 8 = 12345 % 8 = 5
h("Bob")   = hash("Bob")   % 8 = 67890 % 8 = 2
h("Charlie") = hash("Charlie") % 8 = 11111 % 8 = 7

哈希表:
索引: [0]   [1]   [2]      [3]   [4]   [5]        [6]   [7]
     null  null  "Bob"→200 null  null  "Alice"→100 null  "Charlie"→300

基本实现:

type HashTable struct {
    buckets []interface{}
    size    int
}

func (ht *HashTable) hash(key string) int {
    h := 0
    for _, c := range key {
        h = h*31 + int(c)
    }
    return h % ht.size
}

func (ht *HashTable) Get(key string) (interface{}, bool) {
    index := ht.hash(key)
    return ht.buckets[index], true  // 简化版,忽略冲突
}

关键问题:哈希冲突!

问题场景:
h("Alice") = 5
h("Alex")  = 5  ← 冲突!两个键映射到同一个索引

如何解决?

2.5 方案对比总结

数据结构查找插入删除空间支持键类型适用场景
直接地址数组O(1)O(1)O(1)O(U)仅整数键范围极小
有序数组O(log n)O(n)O(n)O(n)可比较读多写少
链表O(n)O(1)O(n)O(n)任意插入频繁
哈希表O(1)*O(1)*O(1)*O(n)可哈希生产环境

* 平均情况下

结论:哈希表是键值查找的最佳方案!

但需要解决一个核心问题:哈希冲突


三、哈希冲突:如何解决?

3.1 什么是哈希冲突?

定义: 不同的键经过哈希函数后映射到同一个索引

示例:
hash("Alice") % 8 = 5
hash("Alex")  % 8 = 5  ← 冲突!

哈希表槽位 5:
[5] → 应该存 "Alice" 还是 "Alex"?

为什么会冲突?

  • 鸽笼原理:键的空间 >> 数组大小
    • 例如:字符串键有无限多个
    • 数组大小是有限的 (如 1000个槽位)
    • 必然有多个键映射到同一个槽位

3.2 解决方案对比

方案A:链地址法 (Separate Chaining)

思路: 每个槽位维护一个链表,冲突的键都存在链表中

哈希表结构:
索引 [0]: null
索引 [1]: null
索引 [2]: → ["Bob", 200] → ["Bobby", 250] → null
索引 [3]: null
索引 [4]: null
索引 [5]: → ["Alice", 100] → ["Alex", 150] → ["Alan", 180] → null
索引 [6]: null
索引 [7]: → ["Charlie", 300] → null

查找 "Alex" 的过程:
1. 计算哈希: h("Alex") = 5
2. 访问槽位 5
3. 遍历链表: "Alice" → "Alex" (找到!)

实现代码:

type ChainNode struct {
    key   string
    value interface{}
    next  *ChainNode
}

type ChainHashTable struct {
    buckets []*ChainNode
    size    int
    count   int
}

func (ht *ChainHashTable) Set(key string, value interface{}) {
    index := ht.hash(key)

    // 检查链表中是否已存在
    current := ht.buckets[index]
    for current != nil {
        if current.key == key {
            current.value = value  // 更新
            return
        }
        current = current.next
    }

    // 头插法插入新节点
    newNode := &ChainNode{key: key, value: value}
    newNode.next = ht.buckets[index]
    ht.buckets[index] = newNode
    ht.count++
}

func (ht *ChainHashTable) Get(key string) (interface{}, bool) {
    index := ht.hash(key)

    // 遍历链表查找
    current := ht.buckets[index]
    for current != nil {
        if current.key == key {
            return current.value, true
        }
        current = current.next
    }

    return nil, false
}

性能分析:

假设负载因子 α = n/m (n为元素数量, m为槽位数量)

操作平均情况最坏情况说明
查找O(1 + α)O(n)均匀分布时链表很短
插入O(1)O(1)头插法
删除O(1 + α)O(n)需要先查找

优缺点:

  • 实现简单:链表操作直观
  • 永不溢出:链表可无限扩展
  • 删除方便:只需断开链表节点
  • 负载因子灵活:可以 > 1
  • 内存不连续:指针跳跃,缓存不友好
  • 额外空间:每个节点需要指针

方案B:开放地址法 (Open Addressing)

思路: 冲突时,在哈希表内寻找下一个空槽位

B1. 线性探测 (Linear Probing)

公式: h(k, i) = (h(k) + i) mod m

示例:插入 "Alice", "Alex", "Alan" (都映射到索引5)

Step 1: 插入 "Alice"
索引 [0] [1] [2] [3] [4] [5]        [6] [7]
    null null null null null "Alice"→100 null null

Step 2: 插入 "Alex" (冲突! 索引5已占用)
尝试: 5 → 6 (空!) → 插入
索引 [0] [1] [2] [3] [4] [5]        [6]       [7]
    null null null null null "Alice"→100 "Alex"→150 null

Step 3: 插入 "Alan" (冲突! 索引5已占用)
尝试: 5 → 6 (占用) → 7 (空!) → 插入
索引 [0] [1] [2] [3] [4] [5]        [6]       [7]
    null null null null null "Alice"→100 "Alex"→150 "Alan"→180

查找 "Alan" 的过程:
1. h("Alan") = 5 → 检查: "Alice" ≠ "Alan"
2. 线性探测: 5+1=6 → 检查: "Alex" ≠ "Alan"
3. 线性探测: 6+1=7 → 检查: "Alan" == "Alan" (找到!)

问题:一次聚集 (Primary Clustering)

随着插入增多,会形成连续的占用区域:
索引 [0] [1] [2] [3]    [4]    [5]    [6]    [7]
    null null null "Dave" "Eve" "Frank" "Grace" "Helen"
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                   连续占用区域 → 查找效率下降!

B2. 二次探测 (Quadratic Probing)

公式: h(k, i) = (h(k) + i²) mod m

探测序列: h(k), h(k)+1, h(k)+4, h(k)+9, h(k)+16, ...

优点:减少一次聚集
缺点:可能存在二次聚集,且可能无法探测所有槽位

B3. 双重哈希 (Double Hashing)

公式: h(k, i) = (h₁(k) + i·h₂(k)) mod m

使用两个哈希函数:
h₁(k) = k mod 13
h₂(k) = 1 + (k mod 11)

探测序列: h₁(k), h₁(k)+h₂(k), h₁(k)+2·h₂(k), ...

优点:探测序列取决于键本身,减少聚集
缺点:计算两次哈希,实现复杂

开放地址法对比:

方法聚集程度实现复杂度查找效率
线性探测严重 (一次聚集)简单较差
二次探测中等 (二次聚集)中等中等
双重哈希最小复杂最好

开放地址法的优缺点:

  • 内存连续:缓存友好,访问更快
  • 无指针开销:节省空间
  • 删除复杂:需要标记"已删除"而非真删除
  • 负载因子受限:必须 < 1,否则无法插入
  • 性能退化:高负载时聚集严重
// 删除问题示例
哈希表: [A] [B] [C] [D]
         5   6   7   8

删除 B 后不能直接清空:
错误: [A] [ ] [C] [D]  → 查找 C 时会在 6 停止!
正确: [A] [DELETED] [C] [D]  → 查找 C 时会继续探测

3.3 方案选择:链地址法 vs 开放地址法

对比维度链地址法开放地址法
实现复杂度简单中等
删除操作容易复杂 (需要标记)
负载因子可 > 1必须 < 1
内存局部性差 (指针跳跃)好 (连续)
空间利用⚠️ 有指针开销无指针
性能稳定性稳定⚠️ 高负载退化
并发控制较容易困难

Redis 为什么选择链地址法?

  1. 删除频繁:Redis 缓存场景经常删除过期键
  2. 动态扩展:链表可以无限扩展,不担心满载
  3. 实现简单:代码清晰,易维护
  4. 并发友好:细粒度锁更容易实现

四、负载因子与扩容:何时 Rehash?

4.1 负载因子 (Load Factor)

定义: α = n / m

  • n = 已存储的键值对数量
  • m = 哈希表槽位数量

意义: 衡量哈希表的"拥挤程度"

示例:
哈希表大小 m = 8
已存储元素 n = 6
负载因子 α = 6/8 = 0.75

可视化:
索引 [0]  [1]   [2]    [3]  [4]   [5]     [6]  [7]
    null "Bob" null  "Eve" null "Alice" null "Charlie" → ["Dave"]
                                                         (链表)

75% 的槽位已被占用

4.2 负载因子对性能的影响

链地址法:

负载因子 α平均链表长度查找性能内存利用
α = 0.50.5优秀⚠️ 浪费 50%
α = 1.01.0好平衡
α = 2.02.0中等高
α = 5.05.0⚠️ 差高

开放地址法:

负载因子 α探测次数查找性能
α = 0.51.5优秀
α = 0.72.5好
α = 0.95.0⚠️ 差
α ≥ 1.0∞无法插入!

4.3 扩容策略

何时扩容?

不同实现的策略:

实现扩容阈值说明
Java HashMapα ≥ 0.75平衡性能和空间
Go mapα ≥ 6.5允许更高负载
Redis dictα ≥ 1.0 (常规)
α ≥ 5.0 (BGSAVE中)
渐进式rehash
Python dictα ≥ 2/3较保守

Redis 的双重策略:

// 1. 正常情况:负载因子 ≥ 1
if dict.used >= dict.size {
    expand()  // 立即扩容
}

// 2. BGSAVE/BGREWRITEAOF 期间:负载因子 ≥ 5
// 原因:fork子进程时,写时复制(COW)机制会复制内存
// 推迟扩容可以减少内存拷贝
if server.isSavingOrRewriting() {
    if dict.used >= dict.size * 5 {
        expand()  // 负载过高才扩容
    }
}

扩容大小:

通常扩容为 2倍 或 下一个2的幂

// 寻找大于等于 size 的最小 2^n
func nextPower(size int) int {
    n := 4  // 最小容量
    for n < size {
        n *= 2
    }
    return n
}

// 示例:
// 当前大小 8 → 扩容到 16
// 当前大小 100 → 扩容到 128

五、渐进式 Rehash:Redis 的优雅扩容

5.1 问题:传统 Rehash 的困境

传统 Rehash 过程:

1. 分配新的更大的哈希表
2. 遍历旧表,重新计算所有键的哈希值
3. 将所有键值对迁移到新表
4. 释放旧表

问题:如果有 1000万 个键,全部迁移可能需要数秒!
→ 在此期间,Redis 无法处理其他请求
→ 客户端超时、请求积压

可视化:

传统 Rehash:
时间轴: |-------- 正常服务 --------|XXXXX阻塞XXXXX|-------- 正常服务 --------|
                                   ↑
                           迁移1000万个键 (阻塞数秒)

5.2 解决方案:渐进式 Rehash

核心思想: 分多次、逐步地迁移键值对

渐进式 Rehash:
时间轴: |-- 正常+迁移 --|-- 正常+迁移 --|-- 正常+迁移 --|-- 正常 --|
         迁移100个键     迁移100个键     迁移100个键    完成!

每次操作只迁移少量键 → 单次操作耗时极短 → 用户无感知

5.3 渐进式 Rehash 原理

数据结构:双哈希表

type Dict struct {
    ht        [2]*HashTable  // 两个哈希表
    rehashidx int64          // rehash进度 (-1表示未进行)
}

// rehashidx 的含义:
// -1: 未在 rehash
// 0~size: 正在 rehash,表示已迁移到 ht[0] 的索引位置

可视化过程:

初始状态:只使用 ht[0]
ht[0]: [0][1][2][3][4][5][6][7]  (8个槽位, 6个键)
       [A][B][ ][C][ ][D][E][F]
ht[1]: null
rehashidx: -1

──────────────────────────────────────────

触发扩容:负载因子 ≥ 1
ht[0]: [0][1][2][3][4][5][6][7]  (旧表)
       [A][B][ ][C][ ][D][E][F]
ht[1]: [0][1][2][3][4][5][6][7][8][9][10][11][12][13][14][15]  (新表,16个槽位)
       [ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ]
rehashidx: 0  ← 开始迁移!

──────────────────────────────────────────

第1步:迁移索引0 (键A)
ht[0]: [X][B][ ][C][ ][D][E][F]  (A已迁移)
ht[1]: [ ][ ][ ][A][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ]
                ↑
            h(A) % 16 = 3
rehashidx: 1  ← 继续

──────────────────────────────────────────

第2步:迁移索引1 (键B)
ht[0]: [X][X][ ][C][ ][D][E][F]
ht[1]: [ ][ ][ ][A][ ][ ][B][ ][ ][ ][ ][ ][ ][ ][ ][ ]
                            ↑
                        h(B) % 16 = 6
rehashidx: 2

──────────────────────────────────────────

...继续迁移索引2,3,4,5,6,7...

──────────────────────────────────────────

完成:所有键已迁移
ht[0]: null  (释放旧表)
ht[1]: [C][ ][ ][A][ ][F][B][ ][D][ ][ ][ ][ ][E][ ][ ]
rehashidx: -1  ← 完成!

5.4 渐进式 Rehash 的操作策略

查找操作: 先查 ht[0],再查 ht[1]

func (d *Dict) Get(key string) (interface{}, bool) {
    // 1. 先在 ht[0] 查找
    if val, ok := d.ht[0].Get(key); ok {
        return val, true
    }

    // 2. 如果正在 rehash,再在 ht[1] 查找
    if d.isRehashing() {
        if val, ok := d.ht[1].Get(key); ok {
            return val, true
        }
    }

    return nil, false
}

插入操作: 只插入 ht[1]

func (d *Dict) Set(key string, value interface{}) {
    // 渐进式迁移1步
    if d.isRehashing() {
        d.rehashStep()
    }

    // 新键总是插入 ht[1] (如果正在rehash)
    table := d.ht[0]
    if d.isRehashing() {
        table = d.ht[1]
    }

    table.Set(key, value)
}

删除操作: 在两个表中都尝试删除

func (d *Dict) Delete(key string) bool {
    // 渐进式迁移1步
    if d.isRehashing() {
        d.rehashStep()
    }

    // 先尝试从 ht[0] 删除
    if d.ht[0].Delete(key) {
        return true
    }

    // 如果正在rehash,再尝试从 ht[1] 删除
    if d.isRehashing() {
        return d.ht[1].Delete(key)
    }

    return false
}

5.5 核心:rehashStep 实现

func (d *Dict) rehashStep() {
    // 每次只迁移 N 个桶 (Redis默认N=1)
    const N = 1

    for i := 0; i < N; i++ {
        // 检查是否完成
        if d.ht[0].used == 0 {
            // 所有键已迁移完成
            d.ht[0] = d.ht[1]
            d.ht[1] = nil
            d.rehashidx = -1
            return
        }

        // 跳过空桶
        for d.ht[0].table[d.rehashidx] == nil {
            d.rehashidx++
        }

        // 迁移当前桶的所有键
        entry := d.ht[0].table[d.rehashidx]
        for entry != nil {
            next := entry.next

            // 重新计算哈希值
            hash := d.hashFunc(entry.key)
            index := hash & d.ht[1].sizemask

            // 插入到新表 (头插法)
            entry.next = d.ht[1].table[index]
            d.ht[1].table[index] = entry

            d.ht[0].used--
            d.ht[1].used++

            entry = next
        }

        // 清空旧桶
        d.ht[0].table[d.rehashidx] = nil
        d.rehashidx++
    }
}

5.6 定时 Rehash:主动推进

除了在操作时被动迁移,Redis 还会在定时任务中主动推进 rehash:

// Redis 定时任务 (每100ms执行一次)
func serverCron() {
    // 每次最多花费 1ms 推进 rehash
    start := time.Now()

    for _, db := range server.dbs {
        if db.dict.isRehashing() {
            // 每次迁移100个桶
            for i := 0; i < 100; i++ {
                db.dict.rehashStep()

                // 超时退出
                if time.Since(start) >= time.Millisecond {
                    return
                }
            }
        }
    }
}

5.7 性能分析

单次操作耗时:

传统 rehash:
- 一次性迁移 1000万键
- 耗时:~5秒 (阻塞!)

渐进式 rehash:
- 每次操作迁移 1 个桶 (约 1-2个键)
- 额外耗时:~10微秒 (用户无感知)
- 总迁移时间:分散到 1000万次操作中

优缺点:

方案单次延迟总迁移时间内存占用复杂度
传统阻塞数秒5秒1倍简单
渐进式微秒级⚠️ 较长(分散)⚠️ 2倍(过程中)中等

结论: 渐进式 rehash 是高性能系统的必备技术!


六、完整实现:生产级 Redis 字典

6.1 核心结构定义

// dict/dict.go
package dict

import (
    "fmt"
    "sync"
)

const (
    // 初始哈希表大小
    DICT_HT_INITIAL_SIZE = 4

    // 强制 rehash 的负载因子阈值
    DICT_FORCE_RESIZE_RATIO = 5
)

// 字典类型 (支持不同类型的键值)
type DictType struct {
    HashFunc  func(key interface{}) uint64        // 哈希函数
    KeyDup    func(key interface{}) interface{}   // 键复制
    ValDup    func(val interface{}) interface{}   // 值复制
    KeyCompare func(k1, k2 interface{}) bool      // 键比较
    KeyDestructor func(key interface{})           // 键析构
    ValDestructor func(val interface{})           // 值析构
}

// 字典节点
type DictEntry struct {
    key   interface{}
    value interface{}
    next  *DictEntry  // 链表指针 (解决冲突)
}

// 哈希表
type DictHt struct {
    table    []*DictEntry  // 哈希表数组
    size     uint64        // 哈希表大小
    sizemask uint64        // 哈希表大小掩码 (size - 1)
    used     uint64        // 已有节点数量
}

// 字典
type Dict struct {
    dictType  *DictType     // 类型特定函数
    privdata  interface{}   // 私有数据
    ht        [2]*DictHt    // 两个哈希表 (用于渐进式rehash)
    rehashidx int64         // rehash进度 (-1表示未进行)
    iterators int64         // 当前运行的迭代器数量
    mu        sync.RWMutex  // 读写锁
}

// 创建字典
func NewDict(dictType *DictType) *Dict {
    return &Dict{
        dictType:  dictType,
        ht:        [2]*DictHt{newDictHt(DICT_HT_INITIAL_SIZE), nil},
        rehashidx: -1,
        iterators: 0,
    }
}

// 创建哈希表
func newDictHt(size uint64) *DictHt {
    // 确保 size 是 2 的幂
    realSize := nextPower(size)

    return &DictHt{
        table:    make([]*DictEntry, realSize),
        size:     realSize,
        sizemask: realSize - 1,
        used:     0,
    }
}

// 计算下一个 2 的幂
func nextPower(size uint64) uint64 {
    if size >= (1 << 63) {
        return 1 << 63
    }

    n := uint64(DICT_HT_INITIAL_SIZE)
    for n < size {
        n *= 2
    }
    return n
}

// 默认哈希函数 (SipHash)
func defaultHashFunc(key interface{}) uint64 {
    switch k := key.(type) {
    case string:
        return sipHash([]byte(k))
    case int:
        return uint64(k) * 0x9e3779b97f4a7c15
    case int64:
        return uint64(k) * 0x9e3779b97f4a7c15
    case uint64:
        return k * 0x9e3779b97f4a7c15
    default:
        return sipHash([]byte(fmt.Sprintf("%v", k)))
    }
}

// SipHash 简化实现
func sipHash(data []byte) uint64 {
    // 简化版本,生产环境应使用完整SipHash
    h := uint64(0xcbf29ce484222325)
    for _, b := range data {
        h ^= uint64(b)
        h *= 0x100000001b3
    }
    return h
}

6.2 基础操作实现

// dict/operations.go
package dict

// 设置键值对
func (d *Dict) Set(key, value interface{}) error {
    d.mu.Lock()
    defer d.mu.Unlock()

    // 渐进式 rehash
    if d.isRehashing() {
        d.rehashStep()
    }

    // 计算哈希值
    hash := d.dictType.HashFunc(key)

    // 确定使用哪个哈希表
    table := 0
    if d.isRehashing() {
        table = 1  // rehash期间,新键插入ht[1]
    }

    // 计算索引
    index := hash & d.ht[table].sizemask

    // 检查键是否已存在
    entry := d.ht[table].table[index]
    for entry != nil {
        if d.dictType.KeyCompare(entry.key, key) {
            // 更新值
            entry.value = value
            return nil
        }
        entry = entry.next
    }

    // 插入新节点 (头插法)
    newEntry := &DictEntry{
        key:   key,
        value: value,
        next:  d.ht[table].table[index],
    }
    d.ht[table].table[index] = newEntry
    d.ht[table].used++

    // 检查是否需要扩容
    d.expandIfNeeded()

    return nil
}

// 获取值
func (d *Dict) Get(key interface{}) (interface{}, bool) {
    d.mu.RLock()
    defer d.mu.RUnlock()

    if d.ht[0].used == 0 && (d.ht[1] == nil || d.ht[1].used == 0) {
        return nil, false
    }

    // 渐进式 rehash (读操作也推进)
    if d.isRehashing() {
        d.mu.RUnlock()
        d.mu.Lock()
        d.rehashStep()
        d.mu.Unlock()
        d.mu.RLock()
    }

    hash := d.dictType.HashFunc(key)

    // 先查 ht[0]
    if entry := d.findEntry(0, key, hash); entry != nil {
        return entry.value, true
    }

    // 如果在rehash,再查 ht[1]
    if d.isRehashing() {
        if entry := d.findEntry(1, key, hash); entry != nil {
            return entry.value, true
        }
    }

    return nil, false
}

// 删除键
func (d *Dict) Delete(key interface{}) bool {
    d.mu.Lock()
    defer d.mu.Unlock()

    if d.ht[0].used == 0 && (d.ht[1] == nil || d.ht[1].used == 0) {
        return false
    }

    // 渐进式 rehash
    if d.isRehashing() {
        d.rehashStep()
    }

    hash := d.dictType.HashFunc(key)

    // 先尝试从 ht[0] 删除
    if d.deleteEntry(0, key, hash) {
        return true
    }

    // 如果在rehash,再尝试从 ht[1] 删除
    if d.isRehashing() {
        return d.deleteEntry(1, key, hash)
    }

    return false
}

// 查找节点
func (d *Dict) findEntry(table int, key interface{}, hash uint64) *DictEntry {
    if d.ht[table] == nil {
        return nil
    }

    index := hash & d.ht[table].sizemask
    entry := d.ht[table].table[index]

    for entry != nil {
        if d.dictType.KeyCompare(entry.key, key) {
            return entry
        }
        entry = entry.next
    }

    return nil
}

// 删除节点
func (d *Dict) deleteEntry(table int, key interface{}, hash uint64) bool {
    if d.ht[table] == nil {
        return false
    }

    index := hash & d.ht[table].sizemask
    entry := d.ht[table].table[index]
    var prev *DictEntry

    for entry != nil {
        if d.dictType.KeyCompare(entry.key, key) {
            // 找到了,删除节点
            if prev == nil {
                d.ht[table].table[index] = entry.next
            } else {
                prev.next = entry.next
            }

            // 调用析构函数
            if d.dictType.KeyDestructor != nil {
                d.dictType.KeyDestructor(entry.key)
            }
            if d.dictType.ValDestructor != nil {
                d.dictType.ValDestructor(entry.value)
            }

            d.ht[table].used--
            return true
        }
        prev = entry
        entry = entry.next
    }

    return false
}

// 获取字典大小
func (d *Dict) Size() uint64 {
    d.mu.RLock()
    defer d.mu.RUnlock()

    size := d.ht[0].used
    if d.ht[1] != nil {
        size += d.ht[1].used
    }
    return size
}

// 检查是否为空
func (d *Dict) IsEmpty() bool {
    return d.Size() == 0
}

6.3 渐进式 Rehash 实现

// dict/rehash.go
package dict

// 检查是否正在 rehash
func (d *Dict) isRehashing() bool {
    return d.rehashidx != -1
}

// 渐进式 rehash 一步
func (d *Dict) rehashStep() {
    if d.iterators == 0 {
        d.rehash(1)
    }
}

// rehash N 个桶
func (d *Dict) rehash(n int) int {
    if !d.isRehashing() {
        return 0
    }

    emptyVisits := n * 10  // 最多访问 10*n 个空桶

    for n > 0 && d.ht[0].used != 0 {
        // 跳过空桶
        for d.ht[0].table[d.rehashidx] == nil {
            d.rehashidx++
            emptyVisits--
            if emptyVisits == 0 {
                return 1  // 访问太多空桶,下次继续
            }
        }

        // 获取当前桶
        entry := d.ht[0].table[d.rehashidx]

        // 迁移当前桶的所有节点
        for entry != nil {
            next := entry.next

            // 重新计算哈希值
            hash := d.dictType.HashFunc(entry.key)
            index := hash & d.ht[1].sizemask

            // 头插法插入 ht[1]
            entry.next = d.ht[1].table[index]
            d.ht[1].table[index] = entry

            d.ht[0].used--
            d.ht[1].used++

            entry = next
        }

        // 清空旧桶
        d.ht[0].table[d.rehashidx] = nil
        d.rehashidx++
        n--
    }

    // 检查是否完成
    if d.ht[0].used == 0 {
        // 释放 ht[0],将 ht[1] 设置为 ht[0]
        d.ht[0] = d.ht[1]
        d.ht[1] = nil
        d.rehashidx = -1
        return 0  // 完成
    }

    return 1  // 未完成
}

// 检查是否需要扩容
func (d *Dict) expandIfNeeded() {
    // 已在 rehash,不处理
    if d.isRehashing() {
        return
    }

    // 哈希表为空,初始化
    if d.ht[0].size == 0 {
        d.expand(DICT_HT_INITIAL_SIZE)
        return
    }

    // 负载因子检查
    // 1. used >= size 且没有子进程在save → 扩容
    // 2. used / size >= 5 → 强制扩容
    ratio := float64(d.ht[0].used) / float64(d.ht[0].size)

    if (d.ht[0].used >= d.ht[0].size && !serverIsBackgroundSaving()) ||
        ratio >= DICT_FORCE_RESIZE_RATIO {
        d.expand(d.ht[0].used * 2)
    }
}

// 扩容
func (d *Dict) expand(size uint64) error {
    // 已在 rehash
    if d.isRehashing() {
        return fmt.Errorf("already rehashing")
    }

    // 计算实际大小 (≥ size 的最小2的幂)
    realSize := nextPower(size)

    // 不能缩小
    if realSize < d.ht[0].used {
        return fmt.Errorf("expand size too small")
    }

    // 创建新哈希表
    d.ht[1] = newDictHt(realSize)
    d.rehashidx = 0

    return nil
}

// 缩容
func (d *Dict) shrinkIfNeeded() {
    // TODO: 实现缩容逻辑
    // 当负载因子 < 0.1 时,缩容到合适大小
}

// 模拟检查是否有后台保存任务
func serverIsBackgroundSaving() bool {
    // 简化实现,实际应检查 BGSAVE/BGREWRITEAOF
    return false
}

6.4 迭代器实现

// dict/iterator.go
package dict

// 字典迭代器
type DictIterator struct {
    d           *Dict
    table       int
    index       int64
    entry       *DictEntry
    nextEntry   *DictEntry
    fingerprint uint64
}

// 创建迭代器
func (d *Dict) GetIterator() *DictIterator {
    d.mu.RLock()

    iter := &DictIterator{
        d:           d,
        table:       0,
        index:       -1,
        entry:       nil,
        nextEntry:   nil,
        fingerprint: d.fingerprint(),
    }

    d.iterators++
    d.mu.RUnlock()

    return iter
}

// 获取下一个节点
func (iter *DictIterator) Next() *DictEntry {
    iter.d.mu.RLock()
    defer iter.d.mu.RUnlock()

    for {
        if iter.entry == nil {
            // 移动到下一个桶
            iter.index++

            // 检查是否遍历完当前表
            if iter.table == 0 && iter.index >= int64(iter.d.ht[0].size) {
                // 如果正在rehash,切换到ht[1]
                if iter.d.isRehashing() {
                    iter.table = 1
                    iter.index = 0
                } else {
                    break  // 遍历完成
                }
            } else if iter.table == 1 && iter.index >= int64(iter.d.ht[1].size) {
                break  // 遍历完成
            }

            // 获取当前桶的第一个节点
            if iter.table == 0 {
                iter.entry = iter.d.ht[0].table[iter.index]
            } else {
                iter.entry = iter.d.ht[1].table[iter.index]
            }
        } else {
            // 继续遍历当前链表
            iter.entry = iter.nextEntry
        }

        if iter.entry != nil {
            // 保存下一个节点
            iter.nextEntry = iter.entry.next
            return iter.entry
        }
    }

    return nil
}

// 释放迭代器
func (iter *DictIterator) Release() {
    iter.d.mu.Lock()
    defer iter.d.mu.Unlock()

    iter.d.iterators--

    // 检查指纹是否改变 (安全性检查)
    if iter.fingerprint != iter.d.fingerprint() {
        panic("dict was modified during iteration")
    }
}

// 计算指纹 (用于检测迭代期间是否被修改)
func (d *Dict) fingerprint() uint64 {
    integers := []uint64{
        uint64(uintptr(unsafe.Pointer(d.ht[0].table))),
        d.ht[0].size,
        d.ht[0].used,
    }

    if d.ht[1] != nil {
        integers = append(integers,
            uint64(uintptr(unsafe.Pointer(d.ht[1].table))),
            d.ht[1].size,
            d.ht[1].used,
        )
    }

    h := uint64(0)
    for _, v := range integers {
        h = h*31 + v
    }
    return h
}

七、性能测试与对比

7.1 基准测试

// dict/dict_bench_test.go
package dict

import (
    "fmt"
    "math/rand"
    "testing"
)

// 默认字典类型
var defaultDictType = &DictType{
    HashFunc: defaultHashFunc,
    KeyCompare: func(k1, k2 interface{}) bool {
        return k1 == k2
    },
}

// 测试插入性能
func BenchmarkDictSet(b *testing.B) {
    d := NewDict(defaultDictType)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i)
        d.Set(key, i)
    }
}

// 测试查找性能
func BenchmarkDictGet(b *testing.B) {
    d := NewDict(defaultDictType)

    // 预填充数据
    for i := 0; i < 1000000; i++ {
        key := fmt.Sprintf("key%d", i)
        d.Set(key, i)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", rand.Intn(1000000))
        d.Get(key)
    }
}

// 测试删除性能
func BenchmarkDictDelete(b *testing.B) {
    d := NewDict(defaultDictType)

    // 预填充数据
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i)
        d.Set(key, i)
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i)
        d.Delete(key)
    }
}

// 对比:Go 原生 map
func BenchmarkGoMap(b *testing.B) {
    m := make(map[string]int)

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        key := fmt.Sprintf("key%d", i)
        m[key] = i
    }
}

基准测试结果 (示例):

$ go test -bench=. -benchmem

BenchmarkDictSet-8        2000000    750 ns/op    120 B/op    2 allocs/op
BenchmarkDictGet-8       10000000    120 ns/op      0 B/op    0 allocs/op
BenchmarkDictDelete-8     5000000    280 ns/op      0 B/op    0 allocs/op
BenchmarkGoMap-8          5000000    250 ns/op     48 B/op    0 allocs/op

性能对比:

操作Redis DictGo map差距
插入750 ns250 ns3倍慢
查找120 ns60 ns2倍慢
删除280 ns200 ns1.4倍慢

为什么 Redis Dict 更慢?

  1. 渐进式 rehash 开销:每次操作都要推进 rehash
  2. 锁开销:sync.RWMutex 的锁开销
  3. 通用设计:支持自定义类型,牺牲性能换灵活性

但 Redis Dict 的优势:

  • 无阻塞扩容:渐进式 rehash
  • 可控内存:清晰的扩容策略
  • 灵活定制:支持自定义哈希函数和类型

八、实战应用:Redis 数据库与过期键

8.1 Redis 数据库结构

Redis 的每个数据库就是一个字典:

type RedisDb struct {
    dict    *Dict  // 主字典:存储所有键值对
    expires *Dict  // 过期字典:存储键的过期时间
    id      int    // 数据库编号
}

// SET key value
func (db *RedisDb) Set(key string, value interface{}) {
    db.dict.Set(key, value)
}

// GET key
func (db *RedisDb) Get(key string) (interface{}, bool) {
    // 先检查是否过期
    if db.IsExpired(key) {
        db.Delete(key)
        return nil, false
    }

    return db.dict.Get(key)
}

// EXPIRE key seconds
func (db *RedisDb) Expire(key string, seconds int64) {
    expireAt := time.Now().Unix() + seconds
    db.expires.Set(key, expireAt)
}

// 检查是否过期
func (db *RedisDb) IsExpired(key string) bool {
    expireAt, exists := db.expires.Get(key)
    if !exists {
        return false
    }

    return time.Now().Unix() > expireAt.(int64)
}

// DEL key
func (db *RedisDb) Delete(key string) bool {
    db.expires.Delete(key)  // 删除过期时间
    return db.dict.Delete(key)
}

8.2 Redis 命令映射

Redis命令字典操作说明
SET key valuedict.Set(key, value)O(1) 插入
GET keydict.Get(key)O(1) 查找
DEL keydict.Delete(key)O(1) 删除
EXISTS keydict.Get(key) != nilO(1) 检查存在
KEYS *遍历 dictO(n) 遍历所有键
DBSIZEdict.Size()O(1) 获取键数量
FLUSHDBdict.Clear()O(n) 清空数据库

九、面试高频问答

哈希表的时间复杂度为什么是 O(1)?

答案:

哈希表通过哈希函数将键直接映射到数组索引,实现常数时间访问:

查找 "Alice":
1. 计算哈希: h("Alice") = 12345
2. 取模: 12345 % 8 = 5
3. 访问: table[5] → O(1) 直接索引

对比:
- 数组查找:     需要遍历 → O(n)
- 二分查找:     需要比较 log n 次 → O(log n)
- 哈希表查找:   直接索引 → O(1) 

但要注意:

  • 平均 O(1):假设哈希均匀分布
  • 最坏 O(n):所有键冲突到同一个槽位 (极少发生)

为什么 Redis 使用链地址法而不是开放地址法?

答案:

对比维度链地址法 (Redis)开放地址法
删除操作简单复杂 (需要标记)
负载因子可 > 1必须 < 1
性能稳定性稳定⚠️ 高负载退化
并发控制较容易困难

Redis 的场景特点:

  1. 删除频繁:缓存过期、键删除
  2. 动态负载:键数量波动大
  3. 高并发:需要细粒度锁

结论: 链地址法更适合 Redis 的使用场景

什么是渐进式 Rehash? 为什么需要?

答案:

传统 Rehash 的问题:

一次性迁移 1000万键 → 阻塞 5秒 → 客户端超时 

渐进式 Rehash 的解决方案:

分多次迁移,每次只迁移少量键:
- 每次操作迁移 1 个桶 (约1-2个键)
- 额外耗时仅 10微秒
- 用户完全无感知 

实现要点:

  1. 双哈希表:ht[0] (旧表) + ht[1] (新表)
  2. 分散迁移:每次操作迁移一部分
  3. 查询两表:查找时先查 ht[0],再查 ht[1]
  4. 插入新表:新键只插入 ht[1]

负载因子应该设置为多少?

答案:

不同场景的选择:

场景推荐负载因子原因
内存敏感α = 2.0节省空间
性能优先α = 0.75平衡性能
极致性能α = 0.5链表极短

Redis 的策略:

// 1. 正常情况: α ≥ 1.0 → 扩容
if used >= size {
    expand()
}

// 2. BGSAVE期间: α ≥ 5.0 → 才扩容
// 原因: 写时复制(COW)机制,延迟扩容减少内存拷贝
if isBGSaving() && used >= size * 5 {
    expand()
}

权衡:

  • α 越小 → 性能越好,但浪费内存
  • α 越大 → 节省内存,但性能下降

哈希函数应该满足什么要求?

答案:

好的哈希函数必须:

  1. 确定性:相同输入产生相同输出
  2. 均匀性:输出均匀分布,减少冲突
  3. 雪崩效应:输入微小变化 → 输出巨大变化
  4. 高效性:计算速度快

示例对比:

//  差的哈希函数
func badHash(s string) uint64 {
    return uint64(len(s))  // 只依赖长度,冲突严重!
}

//  好的哈希函数 (FNV-1a)
func fnvHash(s string) uint64 {
    h := uint64(14695981039346656037)  // FNV offset basis
    for _, c := range s {
        h ^= uint64(c)
        h *= 1099511628211  // FNV prime
    }
    return h
}

Redis 使用的哈希函数:

  • SipHash:防止哈希洪水攻击 (Hash Flooding)
  • MurmurHash:高性能场景

如何防止哈希洪水攻击?

答案:

攻击原理:

1. 攻击者构造大量键,使其哈希到同一个槽位
2. 哈希表退化为链表 → O(n) 查找
3. CPU 100%,服务拒绝

示例:
正常情况: hash("user1") = 5, hash("user2") = 3 (分散)
攻击情况: hash("aaa") = 5, hash("bbb") = 5, hash("ccc") = 5 (全冲突!)

防御方案:

1. 使用加密级哈希函数 (SipHash)

// SipHash 使用随机种子,攻击者无法预测哈希值
func sipHash(key []byte, seed [16]byte) uint64 {
    // 密码学级别的哈希函数
    // 即使知道算法,不知道 seed 也无法构造冲突
}

2. 随机化种子

// 每次启动使用不同的随机种子
seed := generateRandomSeed()

3. 限制单个槽位的链表长度

// 当链表过长时,拒绝插入或报警
if chainLength > MAX_CHAIN_LENGTH {
    return error("possible hash flooding attack")
}

Redis 的防护:

  • 默认使用 SipHash
  • 随机种子在启动时生成
  • 单个键值对大小限制 (512MB)

哈希表与跳表、B+树的选择?

答案:

数据结构查找插入删除范围查询有序遍历适用场景
哈希表O(1)O(1)O(1)不支持无序精确查找 (Redis String/Hash)
跳表O(log n)O(log n)O(log n)O(log n+k)有序范围查询 (Redis Sorted Set)
B+树O(log n)O(log n)O(log n)O(log n+k)有序磁盘存储 (MySQL索引)

选择建议:

  • 需要 O(1) 精确查找 → 哈希表
  • 需要范围查询 + 内存存储 → 跳表
  • 需要范围查询 + 磁盘存储 → B+树

十、总结与扩展

10.1 核心要点回顾

  1. 哈希表的本质:通过哈希函数实现 O(1) 查找
  2. 冲突解决:链地址法 vs 开放地址法
  3. 渐进式 Rehash:Redis 无阻塞扩容的关键
  4. 负载因子:性能与空间的权衡

10.2 Redis 字典的优势

  1. O(1) 操作:查找、插入、删除都是常数时间
  2. 渐进式 rehash:无阻塞扩容
  3. 灵活定制:支持多种数据类型
  4. 并发友好:读写锁 + 细粒度锁

10.3 扩展阅读

  • Redis源码:redis/src/dict.c, redis/src/dict.h
  • 论文:SipHash - "SipHash: a fast short-input PRF"
  • 应用案例:
    • Redis 数据库键空间
    • Redis Hash 类型内部实现
    • Redis 订阅频道映射

十一、实战练习

练习1:实现一致性哈希

// 提示:环形哈希 + 虚拟节点
type ConsistentHash struct {
    circle map[uint64]string  // 哈希环
    nodes  []uint64           // 排序的哈希值
}

练习2:实现 LRU 缓存

// 提示:哈希表 + 双向链表
type LRUCache struct {
    capacity int
    cache    map[string]*Node
    head     *Node
    tail     *Node
}

练习3:实现布隆过滤器

// 提示:多个哈希函数 + 位数组
type BloomFilter struct {
    bits []bool
    hashFuncs []func(string) uint64
}

下一章预告: 有序集合 (Sorted Set) 的实现,结合哈希表与跳表,支持 O(1) 查找和 O(log n) 范围查询。

Prev
字符串:SDS 简单动态字符串
Next
列表与跳表实现