哈希表与字典实现
学习目标
- 理解哈希表的设计动机与核心价值
- 对比分析不同数据结构的查找性能
- 掌握哈希冲突解决方案的优劣对比
- 理解渐进式 Rehash 的设计思想
- 深入 Redis 字典的实现细节
一、问题背景:为什么需要哈希表?
1.1 业务场景分析
在实际应用中,我们经常需要进行快速的键值查找:
| 操作类型 | 业务场景示例 | 性能要求 |
|---|---|---|
| 查找 | 用户信息查询 (ID → User) | 要快!O(1) |
| 插入 | 缓存数据 (Key → Value) | 要快!O(1) |
| 更新 | 更新配置 (Config → Value) | 要快!O(1) |
| 删除 | 清除过期缓存 | 要快!O(1) |
典型应用场景:
// 1. 用户会话管理
sessionStore["session_abc123"] = User{ID: 1000, Name: "张三"}
// 2. 数据库查询缓存
queryCache["SELECT * FROM users WHERE id=1000"] = cachedResult
// 3. 配置中心
configMap["db.host"] = "192.168.1.100"
configMap["db.port"] = "3306"
// 4. Redis 核心数据结构
// SET key value → 底层就是哈希表
// GET key → O(1) 查找
1.2 性能目标
对于键值存储,我们的理想目标是:
- 查找:O(1) 时间复杂度
- 插入:O(1) 时间复杂度
- 删除:O(1) 时间复杂度
- 空间复杂度:O(n) 且常数因子可接受
- 并发友好:支持高效的并发操作
二、方案对比:如何实现快速查找?
2.1 方案1:数组 (Direct Address)
思路: 用键直接作为数组索引
键值对:(1 → "Alice"), (5 → "Bob"), (100 → "Charlie")
数组实现:
索引: [0] [1] [2] [3] [4] [5] ... [100]
元素: null "Alice" null null null "Bob" ... "Charlie"
实现代码:
type DirectArray struct {
data [1000000]string // 假设键的范围是 0-999999
}
func (da *DirectArray) Get(key int) string {
return da.data[key] // O(1) 直接访问
}
func (da *DirectArray) Set(key int, value string) {
da.data[key] = value // O(1) 直接设置
}
性能分析:
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
| 查找 | O(1) | 直接索引访问 |
| 插入 | O(1) | 直接索引赋值 |
| 删除 | O(1) | 直接索引清空 |
| 空间 | O(U) | U为键的范围大小 |
优缺点:
- 极快:真正的 O(1) 访问
- 简单:实现非常简单
- 空间浪费严重:键范围大时浪费大量内存
- 例如:键为手机号 (11位数字),需要 10^11 的数组!
- 不支持非整数键:无法处理字符串键
结论: 只适用于键范围很小的特殊场景
2.2 方案2:有序数组 + 二分查找
思路: 维护有序数组,二分查找
有序键值对数组:
索引: [0] [1] [2] [3]
键: ["Alice"] ["Bob"] ["Charlie"] ["David"]
值: [100] [200] [300] [400]
实现代码:
type SortedArray struct {
keys []string
values []interface{}
}
func (sa *SortedArray) Get(key string) (interface{}, bool) {
// 二分查找
left, right := 0, len(sa.keys)-1
for left <= right {
mid := left + (right-left)/2
if sa.keys[mid] == key {
return sa.values[mid], true
} else if sa.keys[mid] < key {
left = mid + 1
} else {
right = mid - 1
}
}
return nil, false
}
func (sa *SortedArray) Set(key string, value interface{}) {
// 二分查找位置
pos := sa.findInsertPosition(key)
// 插入需要移动元素 → O(n)
sa.keys = append(sa.keys[:pos], append([]string{key}, sa.keys[pos:]...)...)
sa.values = append(sa.values[:pos], append([]interface{}{value}, sa.values[pos:]...)...)
}
性能分析:
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
| 查找 | ⚠️ O(log n) | 二分查找 |
| 插入 | O(n) | 需要移动元素 |
| 删除 | O(n) | 需要移动元素 |
| 空间 | O(n) | 只存储实际数据 |
优缺点:
- 空间效率高:只存储实际键值对
- 支持任意键类型:可比较即可
- ⚠️ 查找较慢:O(log n) 而非 O(1)
- 写入慢:插入/删除需要移动元素
2.3 方案3:链表
思路: 顺序存储键值对
链表:
head → [key="Alice", val=100] → [key="Bob", val=200] → [key="Charlie", val=300] → nil
性能分析:
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
| 查找 | O(n) | 顺序遍历 |
| 插入 | O(1) | 头部插入 |
| 删除 | O(n) | 需要先查找 |
| 空间 | O(n) | 只存储实际数据 |
优缺点:
- 插入快:头部插入 O(1)
- 查找极慢:O(n) 顺序遍历
2.4 方案4:哈希表 (Hash Table)
核心思想:通过哈希函数将键映射到数组索引
哈希函数:h(key) = hash(key) % table_size
示例:
h("Alice") = hash("Alice") % 8 = 12345 % 8 = 5
h("Bob") = hash("Bob") % 8 = 67890 % 8 = 2
h("Charlie") = hash("Charlie") % 8 = 11111 % 8 = 7
哈希表:
索引: [0] [1] [2] [3] [4] [5] [6] [7]
null null "Bob"→200 null null "Alice"→100 null "Charlie"→300
基本实现:
type HashTable struct {
buckets []interface{}
size int
}
func (ht *HashTable) hash(key string) int {
h := 0
for _, c := range key {
h = h*31 + int(c)
}
return h % ht.size
}
func (ht *HashTable) Get(key string) (interface{}, bool) {
index := ht.hash(key)
return ht.buckets[index], true // 简化版,忽略冲突
}
关键问题:哈希冲突!
问题场景:
h("Alice") = 5
h("Alex") = 5 ← 冲突!两个键映射到同一个索引
如何解决?
2.5 方案对比总结
| 数据结构 | 查找 | 插入 | 删除 | 空间 | 支持键类型 | 适用场景 |
|---|---|---|---|---|---|---|
| 直接地址数组 | O(1) | O(1) | O(1) | O(U) | 仅整数 | 键范围极小 |
| 有序数组 | O(log n) | O(n) | O(n) | O(n) | 可比较 | 读多写少 |
| 链表 | O(n) | O(1) | O(n) | O(n) | 任意 | 插入频繁 |
| 哈希表 | O(1)* | O(1)* | O(1)* | O(n) | 可哈希 | 生产环境 |
* 平均情况下
结论:哈希表是键值查找的最佳方案!
但需要解决一个核心问题:哈希冲突
三、哈希冲突:如何解决?
3.1 什么是哈希冲突?
定义: 不同的键经过哈希函数后映射到同一个索引
示例:
hash("Alice") % 8 = 5
hash("Alex") % 8 = 5 ← 冲突!
哈希表槽位 5:
[5] → 应该存 "Alice" 还是 "Alex"?
为什么会冲突?
- 鸽笼原理:键的空间 >> 数组大小
- 例如:字符串键有无限多个
- 数组大小是有限的 (如 1000个槽位)
- 必然有多个键映射到同一个槽位
3.2 解决方案对比
方案A:链地址法 (Separate Chaining)
思路: 每个槽位维护一个链表,冲突的键都存在链表中
哈希表结构:
索引 [0]: null
索引 [1]: null
索引 [2]: → ["Bob", 200] → ["Bobby", 250] → null
索引 [3]: null
索引 [4]: null
索引 [5]: → ["Alice", 100] → ["Alex", 150] → ["Alan", 180] → null
索引 [6]: null
索引 [7]: → ["Charlie", 300] → null
查找 "Alex" 的过程:
1. 计算哈希: h("Alex") = 5
2. 访问槽位 5
3. 遍历链表: "Alice" → "Alex" (找到!)
实现代码:
type ChainNode struct {
key string
value interface{}
next *ChainNode
}
type ChainHashTable struct {
buckets []*ChainNode
size int
count int
}
func (ht *ChainHashTable) Set(key string, value interface{}) {
index := ht.hash(key)
// 检查链表中是否已存在
current := ht.buckets[index]
for current != nil {
if current.key == key {
current.value = value // 更新
return
}
current = current.next
}
// 头插法插入新节点
newNode := &ChainNode{key: key, value: value}
newNode.next = ht.buckets[index]
ht.buckets[index] = newNode
ht.count++
}
func (ht *ChainHashTable) Get(key string) (interface{}, bool) {
index := ht.hash(key)
// 遍历链表查找
current := ht.buckets[index]
for current != nil {
if current.key == key {
return current.value, true
}
current = current.next
}
return nil, false
}
性能分析:
假设负载因子 α = n/m (n为元素数量, m为槽位数量)
| 操作 | 平均情况 | 最坏情况 | 说明 |
|---|---|---|---|
| 查找 | O(1 + α) | O(n) | 均匀分布时链表很短 |
| 插入 | O(1) | O(1) | 头插法 |
| 删除 | O(1 + α) | O(n) | 需要先查找 |
优缺点:
- 实现简单:链表操作直观
- 永不溢出:链表可无限扩展
- 删除方便:只需断开链表节点
- 负载因子灵活:可以 > 1
- 内存不连续:指针跳跃,缓存不友好
- 额外空间:每个节点需要指针
方案B:开放地址法 (Open Addressing)
思路: 冲突时,在哈希表内寻找下一个空槽位
B1. 线性探测 (Linear Probing)
公式: h(k, i) = (h(k) + i) mod m
示例:插入 "Alice", "Alex", "Alan" (都映射到索引5)
Step 1: 插入 "Alice"
索引 [0] [1] [2] [3] [4] [5] [6] [7]
null null null null null "Alice"→100 null null
Step 2: 插入 "Alex" (冲突! 索引5已占用)
尝试: 5 → 6 (空!) → 插入
索引 [0] [1] [2] [3] [4] [5] [6] [7]
null null null null null "Alice"→100 "Alex"→150 null
Step 3: 插入 "Alan" (冲突! 索引5已占用)
尝试: 5 → 6 (占用) → 7 (空!) → 插入
索引 [0] [1] [2] [3] [4] [5] [6] [7]
null null null null null "Alice"→100 "Alex"→150 "Alan"→180
查找 "Alan" 的过程:
1. h("Alan") = 5 → 检查: "Alice" ≠ "Alan"
2. 线性探测: 5+1=6 → 检查: "Alex" ≠ "Alan"
3. 线性探测: 6+1=7 → 检查: "Alan" == "Alan" (找到!)
问题:一次聚集 (Primary Clustering)
随着插入增多,会形成连续的占用区域:
索引 [0] [1] [2] [3] [4] [5] [6] [7]
null null null "Dave" "Eve" "Frank" "Grace" "Helen"
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
连续占用区域 → 查找效率下降!
B2. 二次探测 (Quadratic Probing)
公式: h(k, i) = (h(k) + i²) mod m
探测序列: h(k), h(k)+1, h(k)+4, h(k)+9, h(k)+16, ...
优点:减少一次聚集
缺点:可能存在二次聚集,且可能无法探测所有槽位
B3. 双重哈希 (Double Hashing)
公式: h(k, i) = (h₁(k) + i·h₂(k)) mod m
使用两个哈希函数:
h₁(k) = k mod 13
h₂(k) = 1 + (k mod 11)
探测序列: h₁(k), h₁(k)+h₂(k), h₁(k)+2·h₂(k), ...
优点:探测序列取决于键本身,减少聚集
缺点:计算两次哈希,实现复杂
开放地址法对比:
| 方法 | 聚集程度 | 实现复杂度 | 查找效率 |
|---|---|---|---|
| 线性探测 | 严重 (一次聚集) | 简单 | 较差 |
| 二次探测 | 中等 (二次聚集) | 中等 | 中等 |
| 双重哈希 | 最小 | 复杂 | 最好 |
开放地址法的优缺点:
- 内存连续:缓存友好,访问更快
- 无指针开销:节省空间
- 删除复杂:需要标记"已删除"而非真删除
- 负载因子受限:必须 < 1,否则无法插入
- 性能退化:高负载时聚集严重
// 删除问题示例
哈希表: [A] [B] [C] [D]
5 6 7 8
删除 B 后不能直接清空:
错误: [A] [ ] [C] [D] → 查找 C 时会在 6 停止!
正确: [A] [DELETED] [C] [D] → 查找 C 时会继续探测
3.3 方案选择:链地址法 vs 开放地址法
| 对比维度 | 链地址法 | 开放地址法 |
|---|---|---|
| 实现复杂度 | 简单 | 中等 |
| 删除操作 | 容易 | 复杂 (需要标记) |
| 负载因子 | 可 > 1 | 必须 < 1 |
| 内存局部性 | 差 (指针跳跃) | 好 (连续) |
| 空间利用 | ⚠️ 有指针开销 | 无指针 |
| 性能稳定性 | 稳定 | ⚠️ 高负载退化 |
| 并发控制 | 较容易 | 困难 |
Redis 为什么选择链地址法?
- 删除频繁:Redis 缓存场景经常删除过期键
- 动态扩展:链表可以无限扩展,不担心满载
- 实现简单:代码清晰,易维护
- 并发友好:细粒度锁更容易实现
四、负载因子与扩容:何时 Rehash?
4.1 负载因子 (Load Factor)
定义: α = n / m
- n = 已存储的键值对数量
- m = 哈希表槽位数量
意义: 衡量哈希表的"拥挤程度"
示例:
哈希表大小 m = 8
已存储元素 n = 6
负载因子 α = 6/8 = 0.75
可视化:
索引 [0] [1] [2] [3] [4] [5] [6] [7]
null "Bob" null "Eve" null "Alice" null "Charlie" → ["Dave"]
(链表)
75% 的槽位已被占用
4.2 负载因子对性能的影响
链地址法:
| 负载因子 α | 平均链表长度 | 查找性能 | 内存利用 |
|---|---|---|---|
| α = 0.5 | 0.5 | 优秀 | ⚠️ 浪费 50% |
| α = 1.0 | 1.0 | 好 | 平衡 |
| α = 2.0 | 2.0 | 中等 | 高 |
| α = 5.0 | 5.0 | ⚠️ 差 | 高 |
开放地址法:
| 负载因子 α | 探测次数 | 查找性能 |
|---|---|---|
| α = 0.5 | 1.5 | 优秀 |
| α = 0.7 | 2.5 | 好 |
| α = 0.9 | 5.0 | ⚠️ 差 |
| α ≥ 1.0 | ∞ | 无法插入! |
4.3 扩容策略
何时扩容?
不同实现的策略:
| 实现 | 扩容阈值 | 说明 |
|---|---|---|
| Java HashMap | α ≥ 0.75 | 平衡性能和空间 |
| Go map | α ≥ 6.5 | 允许更高负载 |
| Redis dict | α ≥ 1.0 (常规) α ≥ 5.0 (BGSAVE中) | 渐进式rehash |
| Python dict | α ≥ 2/3 | 较保守 |
Redis 的双重策略:
// 1. 正常情况:负载因子 ≥ 1
if dict.used >= dict.size {
expand() // 立即扩容
}
// 2. BGSAVE/BGREWRITEAOF 期间:负载因子 ≥ 5
// 原因:fork子进程时,写时复制(COW)机制会复制内存
// 推迟扩容可以减少内存拷贝
if server.isSavingOrRewriting() {
if dict.used >= dict.size * 5 {
expand() // 负载过高才扩容
}
}
扩容大小:
通常扩容为 2倍 或 下一个2的幂
// 寻找大于等于 size 的最小 2^n
func nextPower(size int) int {
n := 4 // 最小容量
for n < size {
n *= 2
}
return n
}
// 示例:
// 当前大小 8 → 扩容到 16
// 当前大小 100 → 扩容到 128
五、渐进式 Rehash:Redis 的优雅扩容
5.1 问题:传统 Rehash 的困境
传统 Rehash 过程:
1. 分配新的更大的哈希表
2. 遍历旧表,重新计算所有键的哈希值
3. 将所有键值对迁移到新表
4. 释放旧表
问题:如果有 1000万 个键,全部迁移可能需要数秒!
→ 在此期间,Redis 无法处理其他请求
→ 客户端超时、请求积压
可视化:
传统 Rehash:
时间轴: |-------- 正常服务 --------|XXXXX阻塞XXXXX|-------- 正常服务 --------|
↑
迁移1000万个键 (阻塞数秒)
5.2 解决方案:渐进式 Rehash
核心思想: 分多次、逐步地迁移键值对
渐进式 Rehash:
时间轴: |-- 正常+迁移 --|-- 正常+迁移 --|-- 正常+迁移 --|-- 正常 --|
迁移100个键 迁移100个键 迁移100个键 完成!
每次操作只迁移少量键 → 单次操作耗时极短 → 用户无感知
5.3 渐进式 Rehash 原理
数据结构:双哈希表
type Dict struct {
ht [2]*HashTable // 两个哈希表
rehashidx int64 // rehash进度 (-1表示未进行)
}
// rehashidx 的含义:
// -1: 未在 rehash
// 0~size: 正在 rehash,表示已迁移到 ht[0] 的索引位置
可视化过程:
初始状态:只使用 ht[0]
ht[0]: [0][1][2][3][4][5][6][7] (8个槽位, 6个键)
[A][B][ ][C][ ][D][E][F]
ht[1]: null
rehashidx: -1
──────────────────────────────────────────
触发扩容:负载因子 ≥ 1
ht[0]: [0][1][2][3][4][5][6][7] (旧表)
[A][B][ ][C][ ][D][E][F]
ht[1]: [0][1][2][3][4][5][6][7][8][9][10][11][12][13][14][15] (新表,16个槽位)
[ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ]
rehashidx: 0 ← 开始迁移!
──────────────────────────────────────────
第1步:迁移索引0 (键A)
ht[0]: [X][B][ ][C][ ][D][E][F] (A已迁移)
ht[1]: [ ][ ][ ][A][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ][ ]
↑
h(A) % 16 = 3
rehashidx: 1 ← 继续
──────────────────────────────────────────
第2步:迁移索引1 (键B)
ht[0]: [X][X][ ][C][ ][D][E][F]
ht[1]: [ ][ ][ ][A][ ][ ][B][ ][ ][ ][ ][ ][ ][ ][ ][ ]
↑
h(B) % 16 = 6
rehashidx: 2
──────────────────────────────────────────
...继续迁移索引2,3,4,5,6,7...
──────────────────────────────────────────
完成:所有键已迁移
ht[0]: null (释放旧表)
ht[1]: [C][ ][ ][A][ ][F][B][ ][D][ ][ ][ ][ ][E][ ][ ]
rehashidx: -1 ← 完成!
5.4 渐进式 Rehash 的操作策略
查找操作: 先查 ht[0],再查 ht[1]
func (d *Dict) Get(key string) (interface{}, bool) {
// 1. 先在 ht[0] 查找
if val, ok := d.ht[0].Get(key); ok {
return val, true
}
// 2. 如果正在 rehash,再在 ht[1] 查找
if d.isRehashing() {
if val, ok := d.ht[1].Get(key); ok {
return val, true
}
}
return nil, false
}
插入操作: 只插入 ht[1]
func (d *Dict) Set(key string, value interface{}) {
// 渐进式迁移1步
if d.isRehashing() {
d.rehashStep()
}
// 新键总是插入 ht[1] (如果正在rehash)
table := d.ht[0]
if d.isRehashing() {
table = d.ht[1]
}
table.Set(key, value)
}
删除操作: 在两个表中都尝试删除
func (d *Dict) Delete(key string) bool {
// 渐进式迁移1步
if d.isRehashing() {
d.rehashStep()
}
// 先尝试从 ht[0] 删除
if d.ht[0].Delete(key) {
return true
}
// 如果正在rehash,再尝试从 ht[1] 删除
if d.isRehashing() {
return d.ht[1].Delete(key)
}
return false
}
5.5 核心:rehashStep 实现
func (d *Dict) rehashStep() {
// 每次只迁移 N 个桶 (Redis默认N=1)
const N = 1
for i := 0; i < N; i++ {
// 检查是否完成
if d.ht[0].used == 0 {
// 所有键已迁移完成
d.ht[0] = d.ht[1]
d.ht[1] = nil
d.rehashidx = -1
return
}
// 跳过空桶
for d.ht[0].table[d.rehashidx] == nil {
d.rehashidx++
}
// 迁移当前桶的所有键
entry := d.ht[0].table[d.rehashidx]
for entry != nil {
next := entry.next
// 重新计算哈希值
hash := d.hashFunc(entry.key)
index := hash & d.ht[1].sizemask
// 插入到新表 (头插法)
entry.next = d.ht[1].table[index]
d.ht[1].table[index] = entry
d.ht[0].used--
d.ht[1].used++
entry = next
}
// 清空旧桶
d.ht[0].table[d.rehashidx] = nil
d.rehashidx++
}
}
5.6 定时 Rehash:主动推进
除了在操作时被动迁移,Redis 还会在定时任务中主动推进 rehash:
// Redis 定时任务 (每100ms执行一次)
func serverCron() {
// 每次最多花费 1ms 推进 rehash
start := time.Now()
for _, db := range server.dbs {
if db.dict.isRehashing() {
// 每次迁移100个桶
for i := 0; i < 100; i++ {
db.dict.rehashStep()
// 超时退出
if time.Since(start) >= time.Millisecond {
return
}
}
}
}
}
5.7 性能分析
单次操作耗时:
传统 rehash:
- 一次性迁移 1000万键
- 耗时:~5秒 (阻塞!)
渐进式 rehash:
- 每次操作迁移 1 个桶 (约 1-2个键)
- 额外耗时:~10微秒 (用户无感知)
- 总迁移时间:分散到 1000万次操作中
优缺点:
| 方案 | 单次延迟 | 总迁移时间 | 内存占用 | 复杂度 |
|---|---|---|---|---|
| 传统 | 阻塞数秒 | 5秒 | 1倍 | 简单 |
| 渐进式 | 微秒级 | ⚠️ 较长(分散) | ⚠️ 2倍(过程中) | 中等 |
结论: 渐进式 rehash 是高性能系统的必备技术!
六、完整实现:生产级 Redis 字典
6.1 核心结构定义
// dict/dict.go
package dict
import (
"fmt"
"sync"
)
const (
// 初始哈希表大小
DICT_HT_INITIAL_SIZE = 4
// 强制 rehash 的负载因子阈值
DICT_FORCE_RESIZE_RATIO = 5
)
// 字典类型 (支持不同类型的键值)
type DictType struct {
HashFunc func(key interface{}) uint64 // 哈希函数
KeyDup func(key interface{}) interface{} // 键复制
ValDup func(val interface{}) interface{} // 值复制
KeyCompare func(k1, k2 interface{}) bool // 键比较
KeyDestructor func(key interface{}) // 键析构
ValDestructor func(val interface{}) // 值析构
}
// 字典节点
type DictEntry struct {
key interface{}
value interface{}
next *DictEntry // 链表指针 (解决冲突)
}
// 哈希表
type DictHt struct {
table []*DictEntry // 哈希表数组
size uint64 // 哈希表大小
sizemask uint64 // 哈希表大小掩码 (size - 1)
used uint64 // 已有节点数量
}
// 字典
type Dict struct {
dictType *DictType // 类型特定函数
privdata interface{} // 私有数据
ht [2]*DictHt // 两个哈希表 (用于渐进式rehash)
rehashidx int64 // rehash进度 (-1表示未进行)
iterators int64 // 当前运行的迭代器数量
mu sync.RWMutex // 读写锁
}
// 创建字典
func NewDict(dictType *DictType) *Dict {
return &Dict{
dictType: dictType,
ht: [2]*DictHt{newDictHt(DICT_HT_INITIAL_SIZE), nil},
rehashidx: -1,
iterators: 0,
}
}
// 创建哈希表
func newDictHt(size uint64) *DictHt {
// 确保 size 是 2 的幂
realSize := nextPower(size)
return &DictHt{
table: make([]*DictEntry, realSize),
size: realSize,
sizemask: realSize - 1,
used: 0,
}
}
// 计算下一个 2 的幂
func nextPower(size uint64) uint64 {
if size >= (1 << 63) {
return 1 << 63
}
n := uint64(DICT_HT_INITIAL_SIZE)
for n < size {
n *= 2
}
return n
}
// 默认哈希函数 (SipHash)
func defaultHashFunc(key interface{}) uint64 {
switch k := key.(type) {
case string:
return sipHash([]byte(k))
case int:
return uint64(k) * 0x9e3779b97f4a7c15
case int64:
return uint64(k) * 0x9e3779b97f4a7c15
case uint64:
return k * 0x9e3779b97f4a7c15
default:
return sipHash([]byte(fmt.Sprintf("%v", k)))
}
}
// SipHash 简化实现
func sipHash(data []byte) uint64 {
// 简化版本,生产环境应使用完整SipHash
h := uint64(0xcbf29ce484222325)
for _, b := range data {
h ^= uint64(b)
h *= 0x100000001b3
}
return h
}
6.2 基础操作实现
// dict/operations.go
package dict
// 设置键值对
func (d *Dict) Set(key, value interface{}) error {
d.mu.Lock()
defer d.mu.Unlock()
// 渐进式 rehash
if d.isRehashing() {
d.rehashStep()
}
// 计算哈希值
hash := d.dictType.HashFunc(key)
// 确定使用哪个哈希表
table := 0
if d.isRehashing() {
table = 1 // rehash期间,新键插入ht[1]
}
// 计算索引
index := hash & d.ht[table].sizemask
// 检查键是否已存在
entry := d.ht[table].table[index]
for entry != nil {
if d.dictType.KeyCompare(entry.key, key) {
// 更新值
entry.value = value
return nil
}
entry = entry.next
}
// 插入新节点 (头插法)
newEntry := &DictEntry{
key: key,
value: value,
next: d.ht[table].table[index],
}
d.ht[table].table[index] = newEntry
d.ht[table].used++
// 检查是否需要扩容
d.expandIfNeeded()
return nil
}
// 获取值
func (d *Dict) Get(key interface{}) (interface{}, bool) {
d.mu.RLock()
defer d.mu.RUnlock()
if d.ht[0].used == 0 && (d.ht[1] == nil || d.ht[1].used == 0) {
return nil, false
}
// 渐进式 rehash (读操作也推进)
if d.isRehashing() {
d.mu.RUnlock()
d.mu.Lock()
d.rehashStep()
d.mu.Unlock()
d.mu.RLock()
}
hash := d.dictType.HashFunc(key)
// 先查 ht[0]
if entry := d.findEntry(0, key, hash); entry != nil {
return entry.value, true
}
// 如果在rehash,再查 ht[1]
if d.isRehashing() {
if entry := d.findEntry(1, key, hash); entry != nil {
return entry.value, true
}
}
return nil, false
}
// 删除键
func (d *Dict) Delete(key interface{}) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.ht[0].used == 0 && (d.ht[1] == nil || d.ht[1].used == 0) {
return false
}
// 渐进式 rehash
if d.isRehashing() {
d.rehashStep()
}
hash := d.dictType.HashFunc(key)
// 先尝试从 ht[0] 删除
if d.deleteEntry(0, key, hash) {
return true
}
// 如果在rehash,再尝试从 ht[1] 删除
if d.isRehashing() {
return d.deleteEntry(1, key, hash)
}
return false
}
// 查找节点
func (d *Dict) findEntry(table int, key interface{}, hash uint64) *DictEntry {
if d.ht[table] == nil {
return nil
}
index := hash & d.ht[table].sizemask
entry := d.ht[table].table[index]
for entry != nil {
if d.dictType.KeyCompare(entry.key, key) {
return entry
}
entry = entry.next
}
return nil
}
// 删除节点
func (d *Dict) deleteEntry(table int, key interface{}, hash uint64) bool {
if d.ht[table] == nil {
return false
}
index := hash & d.ht[table].sizemask
entry := d.ht[table].table[index]
var prev *DictEntry
for entry != nil {
if d.dictType.KeyCompare(entry.key, key) {
// 找到了,删除节点
if prev == nil {
d.ht[table].table[index] = entry.next
} else {
prev.next = entry.next
}
// 调用析构函数
if d.dictType.KeyDestructor != nil {
d.dictType.KeyDestructor(entry.key)
}
if d.dictType.ValDestructor != nil {
d.dictType.ValDestructor(entry.value)
}
d.ht[table].used--
return true
}
prev = entry
entry = entry.next
}
return false
}
// 获取字典大小
func (d *Dict) Size() uint64 {
d.mu.RLock()
defer d.mu.RUnlock()
size := d.ht[0].used
if d.ht[1] != nil {
size += d.ht[1].used
}
return size
}
// 检查是否为空
func (d *Dict) IsEmpty() bool {
return d.Size() == 0
}
6.3 渐进式 Rehash 实现
// dict/rehash.go
package dict
// 检查是否正在 rehash
func (d *Dict) isRehashing() bool {
return d.rehashidx != -1
}
// 渐进式 rehash 一步
func (d *Dict) rehashStep() {
if d.iterators == 0 {
d.rehash(1)
}
}
// rehash N 个桶
func (d *Dict) rehash(n int) int {
if !d.isRehashing() {
return 0
}
emptyVisits := n * 10 // 最多访问 10*n 个空桶
for n > 0 && d.ht[0].used != 0 {
// 跳过空桶
for d.ht[0].table[d.rehashidx] == nil {
d.rehashidx++
emptyVisits--
if emptyVisits == 0 {
return 1 // 访问太多空桶,下次继续
}
}
// 获取当前桶
entry := d.ht[0].table[d.rehashidx]
// 迁移当前桶的所有节点
for entry != nil {
next := entry.next
// 重新计算哈希值
hash := d.dictType.HashFunc(entry.key)
index := hash & d.ht[1].sizemask
// 头插法插入 ht[1]
entry.next = d.ht[1].table[index]
d.ht[1].table[index] = entry
d.ht[0].used--
d.ht[1].used++
entry = next
}
// 清空旧桶
d.ht[0].table[d.rehashidx] = nil
d.rehashidx++
n--
}
// 检查是否完成
if d.ht[0].used == 0 {
// 释放 ht[0],将 ht[1] 设置为 ht[0]
d.ht[0] = d.ht[1]
d.ht[1] = nil
d.rehashidx = -1
return 0 // 完成
}
return 1 // 未完成
}
// 检查是否需要扩容
func (d *Dict) expandIfNeeded() {
// 已在 rehash,不处理
if d.isRehashing() {
return
}
// 哈希表为空,初始化
if d.ht[0].size == 0 {
d.expand(DICT_HT_INITIAL_SIZE)
return
}
// 负载因子检查
// 1. used >= size 且没有子进程在save → 扩容
// 2. used / size >= 5 → 强制扩容
ratio := float64(d.ht[0].used) / float64(d.ht[0].size)
if (d.ht[0].used >= d.ht[0].size && !serverIsBackgroundSaving()) ||
ratio >= DICT_FORCE_RESIZE_RATIO {
d.expand(d.ht[0].used * 2)
}
}
// 扩容
func (d *Dict) expand(size uint64) error {
// 已在 rehash
if d.isRehashing() {
return fmt.Errorf("already rehashing")
}
// 计算实际大小 (≥ size 的最小2的幂)
realSize := nextPower(size)
// 不能缩小
if realSize < d.ht[0].used {
return fmt.Errorf("expand size too small")
}
// 创建新哈希表
d.ht[1] = newDictHt(realSize)
d.rehashidx = 0
return nil
}
// 缩容
func (d *Dict) shrinkIfNeeded() {
// TODO: 实现缩容逻辑
// 当负载因子 < 0.1 时,缩容到合适大小
}
// 模拟检查是否有后台保存任务
func serverIsBackgroundSaving() bool {
// 简化实现,实际应检查 BGSAVE/BGREWRITEAOF
return false
}
6.4 迭代器实现
// dict/iterator.go
package dict
// 字典迭代器
type DictIterator struct {
d *Dict
table int
index int64
entry *DictEntry
nextEntry *DictEntry
fingerprint uint64
}
// 创建迭代器
func (d *Dict) GetIterator() *DictIterator {
d.mu.RLock()
iter := &DictIterator{
d: d,
table: 0,
index: -1,
entry: nil,
nextEntry: nil,
fingerprint: d.fingerprint(),
}
d.iterators++
d.mu.RUnlock()
return iter
}
// 获取下一个节点
func (iter *DictIterator) Next() *DictEntry {
iter.d.mu.RLock()
defer iter.d.mu.RUnlock()
for {
if iter.entry == nil {
// 移动到下一个桶
iter.index++
// 检查是否遍历完当前表
if iter.table == 0 && iter.index >= int64(iter.d.ht[0].size) {
// 如果正在rehash,切换到ht[1]
if iter.d.isRehashing() {
iter.table = 1
iter.index = 0
} else {
break // 遍历完成
}
} else if iter.table == 1 && iter.index >= int64(iter.d.ht[1].size) {
break // 遍历完成
}
// 获取当前桶的第一个节点
if iter.table == 0 {
iter.entry = iter.d.ht[0].table[iter.index]
} else {
iter.entry = iter.d.ht[1].table[iter.index]
}
} else {
// 继续遍历当前链表
iter.entry = iter.nextEntry
}
if iter.entry != nil {
// 保存下一个节点
iter.nextEntry = iter.entry.next
return iter.entry
}
}
return nil
}
// 释放迭代器
func (iter *DictIterator) Release() {
iter.d.mu.Lock()
defer iter.d.mu.Unlock()
iter.d.iterators--
// 检查指纹是否改变 (安全性检查)
if iter.fingerprint != iter.d.fingerprint() {
panic("dict was modified during iteration")
}
}
// 计算指纹 (用于检测迭代期间是否被修改)
func (d *Dict) fingerprint() uint64 {
integers := []uint64{
uint64(uintptr(unsafe.Pointer(d.ht[0].table))),
d.ht[0].size,
d.ht[0].used,
}
if d.ht[1] != nil {
integers = append(integers,
uint64(uintptr(unsafe.Pointer(d.ht[1].table))),
d.ht[1].size,
d.ht[1].used,
)
}
h := uint64(0)
for _, v := range integers {
h = h*31 + v
}
return h
}
七、性能测试与对比
7.1 基准测试
// dict/dict_bench_test.go
package dict
import (
"fmt"
"math/rand"
"testing"
)
// 默认字典类型
var defaultDictType = &DictType{
HashFunc: defaultHashFunc,
KeyCompare: func(k1, k2 interface{}) bool {
return k1 == k2
},
}
// 测试插入性能
func BenchmarkDictSet(b *testing.B) {
d := NewDict(defaultDictType)
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i)
d.Set(key, i)
}
}
// 测试查找性能
func BenchmarkDictGet(b *testing.B) {
d := NewDict(defaultDictType)
// 预填充数据
for i := 0; i < 1000000; i++ {
key := fmt.Sprintf("key%d", i)
d.Set(key, i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", rand.Intn(1000000))
d.Get(key)
}
}
// 测试删除性能
func BenchmarkDictDelete(b *testing.B) {
d := NewDict(defaultDictType)
// 预填充数据
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i)
d.Set(key, i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i)
d.Delete(key)
}
}
// 对比:Go 原生 map
func BenchmarkGoMap(b *testing.B) {
m := make(map[string]int)
b.ResetTimer()
for i := 0; i < b.N; i++ {
key := fmt.Sprintf("key%d", i)
m[key] = i
}
}
基准测试结果 (示例):
$ go test -bench=. -benchmem
BenchmarkDictSet-8 2000000 750 ns/op 120 B/op 2 allocs/op
BenchmarkDictGet-8 10000000 120 ns/op 0 B/op 0 allocs/op
BenchmarkDictDelete-8 5000000 280 ns/op 0 B/op 0 allocs/op
BenchmarkGoMap-8 5000000 250 ns/op 48 B/op 0 allocs/op
性能对比:
| 操作 | Redis Dict | Go map | 差距 |
|---|---|---|---|
| 插入 | 750 ns | 250 ns | 3倍慢 |
| 查找 | 120 ns | 60 ns | 2倍慢 |
| 删除 | 280 ns | 200 ns | 1.4倍慢 |
为什么 Redis Dict 更慢?
- 渐进式 rehash 开销:每次操作都要推进 rehash
- 锁开销:sync.RWMutex 的锁开销
- 通用设计:支持自定义类型,牺牲性能换灵活性
但 Redis Dict 的优势:
- 无阻塞扩容:渐进式 rehash
- 可控内存:清晰的扩容策略
- 灵活定制:支持自定义哈希函数和类型
八、实战应用:Redis 数据库与过期键
8.1 Redis 数据库结构
Redis 的每个数据库就是一个字典:
type RedisDb struct {
dict *Dict // 主字典:存储所有键值对
expires *Dict // 过期字典:存储键的过期时间
id int // 数据库编号
}
// SET key value
func (db *RedisDb) Set(key string, value interface{}) {
db.dict.Set(key, value)
}
// GET key
func (db *RedisDb) Get(key string) (interface{}, bool) {
// 先检查是否过期
if db.IsExpired(key) {
db.Delete(key)
return nil, false
}
return db.dict.Get(key)
}
// EXPIRE key seconds
func (db *RedisDb) Expire(key string, seconds int64) {
expireAt := time.Now().Unix() + seconds
db.expires.Set(key, expireAt)
}
// 检查是否过期
func (db *RedisDb) IsExpired(key string) bool {
expireAt, exists := db.expires.Get(key)
if !exists {
return false
}
return time.Now().Unix() > expireAt.(int64)
}
// DEL key
func (db *RedisDb) Delete(key string) bool {
db.expires.Delete(key) // 删除过期时间
return db.dict.Delete(key)
}
8.2 Redis 命令映射
| Redis命令 | 字典操作 | 说明 |
|---|---|---|
SET key value | dict.Set(key, value) | O(1) 插入 |
GET key | dict.Get(key) | O(1) 查找 |
DEL key | dict.Delete(key) | O(1) 删除 |
EXISTS key | dict.Get(key) != nil | O(1) 检查存在 |
KEYS * | 遍历 dict | O(n) 遍历所有键 |
DBSIZE | dict.Size() | O(1) 获取键数量 |
FLUSHDB | dict.Clear() | O(n) 清空数据库 |
九、面试高频问答
哈希表的时间复杂度为什么是 O(1)?
答案:
哈希表通过哈希函数将键直接映射到数组索引,实现常数时间访问:
查找 "Alice":
1. 计算哈希: h("Alice") = 12345
2. 取模: 12345 % 8 = 5
3. 访问: table[5] → O(1) 直接索引
对比:
- 数组查找: 需要遍历 → O(n)
- 二分查找: 需要比较 log n 次 → O(log n)
- 哈希表查找: 直接索引 → O(1)
但要注意:
- 平均 O(1):假设哈希均匀分布
- 最坏 O(n):所有键冲突到同一个槽位 (极少发生)
为什么 Redis 使用链地址法而不是开放地址法?
答案:
| 对比维度 | 链地址法 (Redis) | 开放地址法 |
|---|---|---|
| 删除操作 | 简单 | 复杂 (需要标记) |
| 负载因子 | 可 > 1 | 必须 < 1 |
| 性能稳定性 | 稳定 | ⚠️ 高负载退化 |
| 并发控制 | 较容易 | 困难 |
Redis 的场景特点:
- 删除频繁:缓存过期、键删除
- 动态负载:键数量波动大
- 高并发:需要细粒度锁
结论: 链地址法更适合 Redis 的使用场景
什么是渐进式 Rehash? 为什么需要?
答案:
传统 Rehash 的问题:
一次性迁移 1000万键 → 阻塞 5秒 → 客户端超时
渐进式 Rehash 的解决方案:
分多次迁移,每次只迁移少量键:
- 每次操作迁移 1 个桶 (约1-2个键)
- 额外耗时仅 10微秒
- 用户完全无感知
实现要点:
- 双哈希表:ht[0] (旧表) + ht[1] (新表)
- 分散迁移:每次操作迁移一部分
- 查询两表:查找时先查 ht[0],再查 ht[1]
- 插入新表:新键只插入 ht[1]
负载因子应该设置为多少?
答案:
不同场景的选择:
| 场景 | 推荐负载因子 | 原因 |
|---|---|---|
| 内存敏感 | α = 2.0 | 节省空间 |
| 性能优先 | α = 0.75 | 平衡性能 |
| 极致性能 | α = 0.5 | 链表极短 |
Redis 的策略:
// 1. 正常情况: α ≥ 1.0 → 扩容
if used >= size {
expand()
}
// 2. BGSAVE期间: α ≥ 5.0 → 才扩容
// 原因: 写时复制(COW)机制,延迟扩容减少内存拷贝
if isBGSaving() && used >= size * 5 {
expand()
}
权衡:
- α 越小 → 性能越好,但浪费内存
- α 越大 → 节省内存,但性能下降
哈希函数应该满足什么要求?
答案:
好的哈希函数必须:
- 确定性:相同输入产生相同输出
- 均匀性:输出均匀分布,减少冲突
- 雪崩效应:输入微小变化 → 输出巨大变化
- 高效性:计算速度快
示例对比:
// 差的哈希函数
func badHash(s string) uint64 {
return uint64(len(s)) // 只依赖长度,冲突严重!
}
// 好的哈希函数 (FNV-1a)
func fnvHash(s string) uint64 {
h := uint64(14695981039346656037) // FNV offset basis
for _, c := range s {
h ^= uint64(c)
h *= 1099511628211 // FNV prime
}
return h
}
Redis 使用的哈希函数:
- SipHash:防止哈希洪水攻击 (Hash Flooding)
- MurmurHash:高性能场景
如何防止哈希洪水攻击?
答案:
攻击原理:
1. 攻击者构造大量键,使其哈希到同一个槽位
2. 哈希表退化为链表 → O(n) 查找
3. CPU 100%,服务拒绝
示例:
正常情况: hash("user1") = 5, hash("user2") = 3 (分散)
攻击情况: hash("aaa") = 5, hash("bbb") = 5, hash("ccc") = 5 (全冲突!)
防御方案:
1. 使用加密级哈希函数 (SipHash)
// SipHash 使用随机种子,攻击者无法预测哈希值
func sipHash(key []byte, seed [16]byte) uint64 {
// 密码学级别的哈希函数
// 即使知道算法,不知道 seed 也无法构造冲突
}
2. 随机化种子
// 每次启动使用不同的随机种子
seed := generateRandomSeed()
3. 限制单个槽位的链表长度
// 当链表过长时,拒绝插入或报警
if chainLength > MAX_CHAIN_LENGTH {
return error("possible hash flooding attack")
}
Redis 的防护:
- 默认使用 SipHash
- 随机种子在启动时生成
- 单个键值对大小限制 (512MB)
哈希表与跳表、B+树的选择?
答案:
| 数据结构 | 查找 | 插入 | 删除 | 范围查询 | 有序遍历 | 适用场景 |
|---|---|---|---|---|---|---|
| 哈希表 | O(1) | O(1) | O(1) | 不支持 | 无序 | 精确查找 (Redis String/Hash) |
| 跳表 | O(log n) | O(log n) | O(log n) | O(log n+k) | 有序 | 范围查询 (Redis Sorted Set) |
| B+树 | O(log n) | O(log n) | O(log n) | O(log n+k) | 有序 | 磁盘存储 (MySQL索引) |
选择建议:
- 需要 O(1) 精确查找 → 哈希表
- 需要范围查询 + 内存存储 → 跳表
- 需要范围查询 + 磁盘存储 → B+树
十、总结与扩展
10.1 核心要点回顾
- 哈希表的本质:通过哈希函数实现 O(1) 查找
- 冲突解决:链地址法 vs 开放地址法
- 渐进式 Rehash:Redis 无阻塞扩容的关键
- 负载因子:性能与空间的权衡
10.2 Redis 字典的优势
- O(1) 操作:查找、插入、删除都是常数时间
- 渐进式 rehash:无阻塞扩容
- 灵活定制:支持多种数据类型
- 并发友好:读写锁 + 细粒度锁
10.3 扩展阅读
- Redis源码:
redis/src/dict.c,redis/src/dict.h - 论文:SipHash - "SipHash: a fast short-input PRF"
- 应用案例:
- Redis 数据库键空间
- Redis Hash 类型内部实现
- Redis 订阅频道映射
十一、实战练习
练习1:实现一致性哈希
// 提示:环形哈希 + 虚拟节点
type ConsistentHash struct {
circle map[uint64]string // 哈希环
nodes []uint64 // 排序的哈希值
}
练习2:实现 LRU 缓存
// 提示:哈希表 + 双向链表
type LRUCache struct {
capacity int
cache map[string]*Node
head *Node
tail *Node
}
练习3:实现布隆过滤器
// 提示:多个哈希函数 + 位数组
type BloomFilter struct {
bits []bool
hashFuncs []func(string) uint64
}
下一章预告: 有序集合 (Sorted Set) 的实现,结合哈希表与跳表,支持 O(1) 查找和 O(log n) 范围查询。