有序集合实现
学习目标
- 深入理解有序集合的双数据结构设计原理
- 掌握跳表+字典的协同工作机制
- 实现完整的 ZADD/ZRANGE/ZRANK 等命令
- 理解分数更新和排名计算的优化策略
- 掌握有序集合的性能优化技巧
️ 有序集合架构设计
1. 双数据结构设计
Redis 有序集合使用跳表+字典的双数据结构设计,实现 O(log n) 的插入、删除、查找和范围查询:
┌─────────────────────────────────────────────────────────────┐
│ 有序集合架构 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 跳表 (ZSet) │ │ 字典 (Dict) │ │
│ │ │ │ │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │ member1 │ │ │ │ member1 │ │ │
│ │ │ score:5 │ │ │ │ score:5 │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │ member2 │ │ │ │ member2 │ │ │
│ │ │ score:3 │ │ │ │ score:3 │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │ member3 │ │ │ │ member3 │ │ │
│ │ │ score:7 │ │ │ │ score:7 │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │
│ └─────────────────┘ └─────────────────┘ │
│ │ │ │
│ └───────────┬───────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 协同工作机制 │ │
│ │ • 跳表:按分数排序,支持范围查询 │ │
│ │ • 字典:按成员查找,支持 O(1) 访问 │ │
│ │ • 数据一致性:两个结构保持同步 │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
2. 数据结构选择原因
跳表的优势:
- 支持按分数排序
- 高效的范围查询 O(log n + k)
- 简单的实现和维护
- 良好的并发性能
字典的优势:
- O(1) 的成员查找
- 支持成员存在性检查
- 快速更新分数
- 内存效率高
️ Go 语言有序集合实现
1. 基础结构定义
// zset/zset.go
package zset
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 有序集合成员
type ZSetMember struct {
Member string
Score float64
}
// 跳表节点
type ZSkipListNode struct {
member string
score float64
level int
forward []*ZSkipListNode
backward *ZSkipListNode
}
// 跳表
type ZSkipList struct {
header *ZSkipListNode
tail *ZSkipListNode
length int
level int
maxLevel int
}
// 有序集合
type ZSet struct {
dict map[string]*ZSkipListNode // 字典:member -> node
zsl *ZSkipList // 跳表:按分数排序
mu sync.RWMutex
}
// 创建有序集合
func NewZSet() *ZSet {
return &ZSet{
dict: make(map[string]*ZSkipListNode),
zsl: NewZSkipList(),
}
}
// 创建跳表
func NewZSkipList() *ZSkipList {
header := &ZSkipListNode{
member: "",
score: 0,
level: 0,
forward: make([]*ZSkipListNode, 32),
backward: nil,
}
return &ZSkipList{
header: header,
tail: nil,
length: 0,
level: 0,
maxLevel: 32,
}
}
// 创建跳表节点
func newZSkipListNode(member string, score float64, level int) *ZSkipListNode {
return &ZSkipListNode{
member: member,
score: score,
level: level,
forward: make([]*ZSkipListNode, level+1),
backward: nil,
}
}
// 随机生成层数
func (zsl *ZSkipList) randomLevel() int {
level := 0
for rand.Float64() < 0.5 && level < zsl.maxLevel-1 {
level++
}
return level
}
2. 跳表操作实现
// zset/skiplist_operations.go
package zset
// 插入节点
func (zsl *ZSkipList) Insert(member string, score float64) *ZSkipListNode {
// 查找插入位置
update := make([]*ZSkipListNode, zsl.maxLevel)
current := zsl.header
// 从最高层开始搜索
for i := zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
(current.forward[i].score < score ||
(current.forward[i].score == score &&
current.forward[i].member < member)) {
current = current.forward[i]
}
update[i] = current
}
// 生成随机层数
level := zsl.randomLevel()
// 如果新层数大于当前层数,更新头节点
if level > zsl.level {
for i := zsl.level + 1; i <= level; i++ {
update[i] = zsl.header
}
zsl.level = level
}
// 创建新节点
newNode := newZSkipListNode(member, score, level)
// 插入节点
for i := 0; i <= level; i++ {
newNode.forward[i] = update[i].forward[i]
update[i].forward[i] = newNode
}
// 设置前向指针
if newNode.forward[0] != nil {
newNode.forward[0].backward = newNode
} else {
zsl.tail = newNode
}
newNode.backward = update[0]
zsl.length++
return newNode
}
// 删除节点
func (zsl *ZSkipList) Delete(member string, score float64) bool {
// 查找节点
update := make([]*ZSkipListNode, zsl.maxLevel)
current := zsl.header
for i := zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
(current.forward[i].score < score ||
(current.forward[i].score == score &&
current.forward[i].member < member)) {
current = current.forward[i]
}
update[i] = current
}
current = current.forward[0]
// 检查是否找到
if current == nil || current.member != member || current.score != score {
return false
}
// 删除节点
for i := 0; i <= current.level; i++ {
update[i].forward[i] = current.forward[i]
}
// 更新前向指针
if current.forward[0] != nil {
current.forward[0].backward = current.backward
} else {
zsl.tail = current.backward
}
if current.backward != nil {
current.backward.forward[0] = current.forward[0]
}
// 更新层数
for zsl.level > 0 && zsl.header.forward[zsl.level] == nil {
zsl.level--
}
zsl.length--
return true
}
// 查找节点
func (zsl *ZSkipList) Find(member string, score float64) *ZSkipListNode {
current := zsl.header
for i := zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
(current.forward[i].score < score ||
(current.forward[i].score == score &&
current.forward[i].member < member)) {
current = current.forward[i]
}
}
current = current.forward[0]
if current != nil && current.member == member && current.score == score {
return current
}
return nil
}
// 获取排名
func (zsl *ZSkipList) GetRank(member string, score float64) int {
rank := 0
current := zsl.header
for i := zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
(current.forward[i].score < score ||
(current.forward[i].score == score &&
current.forward[i].member < member)) {
rank += (1 << i)
current = current.forward[i]
}
}
current = current.forward[0]
if current != nil && current.member == member && current.score == score {
return rank
}
return -1
}
// 根据排名获取节点
func (zsl *ZSkipList) GetByRank(rank int) *ZSkipListNode {
if rank < 0 || rank >= zsl.length {
return nil
}
current := zsl.header
currentRank := 0
for i := zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
currentRank+(1<<i) <= rank {
currentRank += (1 << i)
current = current.forward[i]
}
}
return current
}
// 获取范围排名
func (zsl *ZSkipList) GetRangeByRank(start, end int) []*ZSkipListNode {
if start < 0 || end >= zsl.length || start > end {
return nil
}
var result []*ZSkipListNode
current := zsl.header
currentRank := 0
// 移动到起始位置
for i := zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
currentRank+(1<<i) <= start {
currentRank += (1 << i)
current = current.forward[i]
}
}
// 收集范围内的节点
for i := start; i <= end; i++ {
if current != nil {
result = append(result, current)
current = current.forward[0]
}
}
return result
}
// 获取分数范围
func (zsl *ZSkipList) GetRangeByScore(minScore, maxScore float64) []*ZSkipListNode {
var result []*ZSkipListNode
current := zsl.header
// 移动到起始位置
for i := zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
current.forward[i].score < minScore {
current = current.forward[i]
}
}
// 收集范围内的节点
current = current.forward[0]
for current != nil && current.score <= maxScore {
result = append(result, current)
current = current.forward[0]
}
return result
}
// 获取长度
func (zsl *ZSkipList) Length() int {
return zsl.length
}
3. 有序集合操作实现
// zset/zset_operations.go
package zset
// 添加成员
func (zs *ZSet) ZAdd(member string, score float64) int {
zs.mu.Lock()
defer zs.mu.Unlock()
// 检查成员是否已存在
if existingNode, exists := zs.dict[member]; exists {
// 更新分数
oldScore := existingNode.score
if oldScore == score {
return 0 // 分数未变化
}
// 从跳表中删除旧节点
zs.zsl.Delete(member, oldScore)
// 插入新节点
newNode := zs.zsl.Insert(member, score)
zs.dict[member] = newNode
return 1 // 更新了一个成员
} else {
// 添加新成员
newNode := zs.zsl.Insert(member, score)
zs.dict[member] = newNode
return 1 // 添加了一个成员
}
}
// 删除成员
func (zs *ZSet) ZRem(member string) int {
zs.mu.Lock()
defer zs.mu.Unlock()
if node, exists := zs.dict[member]; exists {
// 从跳表中删除
zs.zsl.Delete(member, node.score)
// 从字典中删除
delete(zs.dict, member)
return 1 // 删除了一个成员
}
return 0 // 成员不存在
}
// 获取成员分数
func (zs *ZSet) ZScore(member string) (float64, bool) {
zs.mu.RLock()
defer zs.mu.RUnlock()
if node, exists := zs.dict[member]; exists {
return node.score, true
}
return 0, false
}
// 获取成员排名
func (zs *ZSet) ZRank(member string) (int, bool) {
zs.mu.RLock()
defer zs.mu.RUnlock()
if node, exists := zs.dict[member]; exists {
rank := zs.zsl.GetRank(member, node.score)
return rank, true
}
return -1, false
}
// 获取成员排名(从后往前)
func (zs *ZSet) ZRevRank(member string) (int, bool) {
zs.mu.RLock()
defer zs.mu.RUnlock()
if node, exists := zs.dict[member]; exists {
rank := zs.zsl.GetRank(member, node.score)
if rank != -1 {
return zs.zsl.Length() - 1 - rank, true
}
}
return -1, false
}
// 获取成员数量
func (zs *ZSet) ZCard() int {
zs.mu.RLock()
defer zs.mu.RUnlock()
return zs.zsl.Length()
}
// 检查成员是否存在
func (zs *ZSet) ZExists(member string) bool {
zs.mu.RLock()
defer zs.mu.RUnlock()
_, exists := zs.dict[member]
return exists
}
4. 范围查询实现
// zset/range_operations.go
package zset
// 按排名范围获取成员
func (zs *ZSet) ZRange(start, end int) []ZSetMember {
zs.mu.RLock()
defer zs.mu.RUnlock()
nodes := zs.zsl.GetRangeByRank(start, end)
if nodes == nil {
return nil
}
result := make([]ZSetMember, len(nodes))
for i, node := range nodes {
result[i] = ZSetMember{
Member: node.member,
Score: node.score,
}
}
return result
}
// 按排名范围获取成员(从后往前)
func (zs *ZSet) ZRevRange(start, end int) []ZSetMember {
zs.mu.RLock()
defer zs.mu.RUnlock()
length := zs.zsl.Length()
if start < 0 {
start = length + start
}
if end < 0 {
end = length + end
}
if start > end || start >= length {
return nil
}
// 转换为正向排名
revStart := length - 1 - end
revEnd := length - 1 - start
nodes := zs.zsl.GetRangeByRank(revStart, revEnd)
if nodes == nil {
return nil
}
// 反转结果
result := make([]ZSetMember, len(nodes))
for i, node := range nodes {
result[len(nodes)-1-i] = ZSetMember{
Member: node.member,
Score: node.score,
}
}
return result
}
// 按分数范围获取成员
func (zs *ZSet) ZRangeByScore(minScore, maxScore float64) []ZSetMember {
zs.mu.RLock()
defer zs.mu.RUnlock()
nodes := zs.zsl.GetRangeByScore(minScore, maxScore)
if nodes == nil {
return nil
}
result := make([]ZSetMember, len(nodes))
for i, node := range nodes {
result[i] = ZSetMember{
Member: node.member,
Score: node.score,
}
}
return result
}
// 按分数范围获取成员(带限制)
func (zs *ZSet) ZRangeByScoreWithLimit(minScore, maxScore float64, offset, count int) []ZSetMember {
zs.mu.RLock()
defer zs.mu.RUnlock()
nodes := zs.zsl.GetRangeByScore(minScore, maxScore)
if nodes == nil {
return nil
}
// 应用偏移和限制
if offset >= len(nodes) {
return nil
}
end := offset + count
if end > len(nodes) {
end = len(nodes)
}
result := make([]ZSetMember, end-offset)
for i := offset; i < end; i++ {
result[i-offset] = ZSetMember{
Member: nodes[i].member,
Score: nodes[i].score,
}
}
return result
}
// 按排名范围获取成员(带分数)
func (zs *ZSet) ZRangeWithScores(start, end int) []ZSetMember {
return zs.ZRange(start, end)
}
// 按排名范围获取成员(从后往前,带分数)
func (zs *ZSet) ZRevRangeWithScores(start, end int) []ZSetMember {
return zs.ZRevRange(start, end)
}
// 按分数范围获取成员(带分数)
func (zs *ZSet) ZRangeByScoreWithScores(minScore, maxScore float64) []ZSetMember {
return zs.ZRangeByScore(minScore, maxScore)
}
5. 统计操作实现
// zset/count_operations.go
package zset
// 统计分数范围内的成员数量
func (zs *ZSet) ZCount(minScore, maxScore float64) int {
zs.mu.RLock()
defer zs.mu.RUnlock()
count := 0
current := zs.zsl.header
// 移动到起始位置
for i := zs.zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
current.forward[i].score < minScore {
current = current.forward[i]
}
}
// 统计范围内的成员
current = current.forward[0]
for current != nil && current.score <= maxScore {
count++
current = current.forward[0]
}
return count
}
// 获取分数范围内的成员数量(带限制)
func (zs *ZSet) ZCountWithLimit(minScore, maxScore float64, offset, count int) int {
zs.mu.RLock()
defer zs.mu.RUnlock()
totalCount := 0
current := zs.zsl.header
// 移动到起始位置
for i := zs.zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
current.forward[i].score < minScore {
current = current.forward[i]
}
}
// 统计范围内的成员
current = current.forward[0]
for current != nil && current.score <= maxScore {
if totalCount >= offset {
if totalCount-offset >= count {
break
}
}
totalCount++
current = current.forward[0]
}
if totalCount <= offset {
return 0
}
result := totalCount - offset
if result > count {
result = count
}
return result
}
// 获取分数范围内的成员数量(排除边界)
func (zs *ZSet) ZCountExclusive(minScore, maxScore float64) int {
zs.mu.RLock()
defer zs.mu.RUnlock()
count := 0
current := zs.zsl.header
// 移动到起始位置
for i := zs.zsl.level; i >= 0; i-- {
for current.forward[i] != nil &&
current.forward[i].score <= minScore {
current = current.forward[i]
}
}
// 统计范围内的成员(排除边界)
current = current.forward[0]
for current != nil && current.score < maxScore {
count++
current = current.forward[0]
}
return count
}
6. 集合运算实现
// zset/set_operations.go
package zset
// 并集
func (zs *ZSet) ZUnion(other *ZSet) *ZSet {
result := NewZSet()
// 添加当前集合的所有成员
for member, node := range zs.dict {
result.ZAdd(member, node.score)
}
// 添加另一个集合的所有成员
for member, node := range other.dict {
result.ZAdd(member, node.score)
}
return result
}
// 交集
func (zs *ZSet) ZInter(other *ZSet) *ZSet {
result := NewZSet()
// 遍历较小的集合
if zs.ZCard() < other.ZCard() {
for member, node := range zs.dict {
if other.ZExists(member) {
result.ZAdd(member, node.score)
}
}
} else {
for member, node := range other.dict {
if zs.ZExists(member) {
result.ZAdd(member, node.score)
}
}
}
return result
}
// 差集
func (zs *ZSet) ZDiff(other *ZSet) *ZSet {
result := NewZSet()
// 添加当前集合中不在另一个集合中的成员
for member, node := range zs.dict {
if !other.ZExists(member) {
result.ZAdd(member, node.score)
}
}
return result
}
// 对称差集
func (zs *ZSet) ZSymDiff(other *ZSet) *ZSet {
result := NewZSet()
// 添加当前集合中不在另一个集合中的成员
for member, node := range zs.dict {
if !other.ZExists(member) {
result.ZAdd(member, node.score)
}
}
// 添加另一个集合中不在当前集合中的成员
for member, node := range other.dict {
if !zs.ZExists(member) {
result.ZAdd(member, node.score)
}
}
return result
}
测试验证
1. 单元测试
// zset/zset_test.go
package zset
import (
"testing"
)
func TestZSetBasic(t *testing.T) {
zs := NewZSet()
// 测试添加成员
count := zs.ZAdd("member1", 5.0)
if count != 1 {
t.Errorf("Expected count 1, got %d", count)
}
count = zs.ZAdd("member2", 3.0)
if count != 1 {
t.Errorf("Expected count 1, got %d", count)
}
count = zs.ZAdd("member3", 7.0)
if count != 1 {
t.Errorf("Expected count 1, got %d", count)
}
// 测试获取分数
score, exists := zs.ZScore("member1")
if !exists || score != 5.0 {
t.Errorf("Expected score 5.0, got %f", score)
}
// 测试获取排名
rank, exists := zs.ZRank("member1")
if !exists || rank != 1 {
t.Errorf("Expected rank 1, got %d", rank)
}
// 测试范围查询
members := zs.ZRange(0, 2)
if len(members) != 3 {
t.Errorf("Expected 3 members, got %d", len(members))
}
// 验证排序
if members[0].Member != "member2" || members[0].Score != 3.0 {
t.Errorf("Expected member2 with score 3.0, got %s with score %f",
members[0].Member, members[0].Score)
}
if members[1].Member != "member1" || members[1].Score != 5.0 {
t.Errorf("Expected member1 with score 5.0, got %s with score %f",
members[1].Member, members[1].Score)
}
if members[2].Member != "member3" || members[2].Score != 7.0 {
t.Errorf("Expected member3 with score 7.0, got %s with score %f",
members[2].Member, members[2].Score)
}
}
func TestZSetUpdate(t *testing.T) {
zs := NewZSet()
// 添加成员
zs.ZAdd("member1", 5.0)
// 更新分数
count := zs.ZAdd("member1", 8.0)
if count != 1 {
t.Errorf("Expected count 1, got %d", count)
}
// 验证更新
score, exists := zs.ZScore("member1")
if !exists || score != 8.0 {
t.Errorf("Expected score 8.0, got %f", score)
}
// 验证排名
rank, exists := zs.ZRank("member1")
if !exists || rank != 0 {
t.Errorf("Expected rank 0, got %d", rank)
}
}
func TestZSetRange(t *testing.T) {
zs := NewZSet()
// 添加测试数据
for i := 0; i < 10; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
// 测试按排名范围查询
members := zs.ZRange(0, 4)
if len(members) != 5 {
t.Errorf("Expected 5 members, got %d", len(members))
}
// 验证排序
for i, member := range members {
expected := fmt.Sprintf("member%d", i)
if member.Member != expected {
t.Errorf("Expected %s, got %s", expected, member.Member)
}
}
// 测试按分数范围查询
members = zs.ZRangeByScore(3.0, 7.0)
if len(members) != 5 {
t.Errorf("Expected 5 members, got %d", len(members))
}
// 验证分数范围
for _, member := range members {
if member.Score < 3.0 || member.Score > 7.0 {
t.Errorf("Score %f out of range [3.0, 7.0]", member.Score)
}
}
}
func TestZSetCount(t *testing.T) {
zs := NewZSet()
// 添加测试数据
for i := 0; i < 10; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
// 测试统计
count := zs.ZCount(3.0, 7.0)
if count != 5 {
t.Errorf("Expected count 5, got %d", count)
}
count = zs.ZCount(0.0, 9.0)
if count != 10 {
t.Errorf("Expected count 10, got %d", count)
}
count = zs.ZCount(10.0, 20.0)
if count != 0 {
t.Errorf("Expected count 0, got %d", count)
}
}
func TestZSetSetOperations(t *testing.T) {
zs1 := NewZSet()
zs2 := NewZSet()
// 添加数据到第一个集合
zs1.ZAdd("member1", 1.0)
zs1.ZAdd("member2", 2.0)
zs1.ZAdd("member3", 3.0)
// 添加数据到第二个集合
zs2.ZAdd("member2", 2.0)
zs2.ZAdd("member3", 3.0)
zs2.ZAdd("member4", 4.0)
// 测试并集
union := zs1.ZUnion(zs2)
if union.ZCard() != 4 {
t.Errorf("Expected union size 4, got %d", union.ZCard())
}
// 测试交集
inter := zs1.ZInter(zs2)
if inter.ZCard() != 2 {
t.Errorf("Expected intersection size 2, got %d", inter.ZCard())
}
// 测试差集
diff := zs1.ZDiff(zs2)
if diff.ZCard() != 1 {
t.Errorf("Expected difference size 1, got %d", diff.ZCard())
}
}
2. 性能基准测试
// zset/benchmark_test.go
package zset
import (
"testing"
)
func BenchmarkZAdd(b *testing.B) {
zs := NewZSet()
b.ResetTimer()
for i := 0; i < b.N; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
}
func BenchmarkZScore(b *testing.B) {
zs := NewZSet()
// 预填充数据
for i := 0; i < 1000; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
member := fmt.Sprintf("member%d", i%1000)
zs.ZScore(member)
}
}
func BenchmarkZRank(b *testing.B) {
zs := NewZSet()
// 预填充数据
for i := 0; i < 1000; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
member := fmt.Sprintf("member%d", i%1000)
zs.ZRank(member)
}
}
func BenchmarkZRange(b *testing.B) {
zs := NewZSet()
// 预填充数据
for i := 0; i < 1000; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
zs.ZRange(0, 99)
}
}
func BenchmarkZRangeByScore(b *testing.B) {
zs := NewZSet()
// 预填充数据
for i := 0; i < 1000; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
zs.ZRangeByScore(0.0, 100.0)
}
}
func BenchmarkZCount(b *testing.B) {
zs := NewZSet()
// 预填充数据
for i := 0; i < 1000; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
zs.ZCount(0.0, 100.0)
}
}
3. 并发测试
// zset/concurrent_test.go
package zset
import (
"sync"
"testing"
)
func TestZSetConcurrent(t *testing.T) {
zs := NewZSet()
const numGoroutines = 10
const numOperations = 1000
var wg sync.WaitGroup
// 并发写入
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
member := fmt.Sprintf("member_%d_%d", goroutineID, j)
score := float64(goroutineID*numOperations + j)
zs.ZAdd(member, score)
}
}(i)
}
wg.Wait()
// 验证结果
expectedCount := numGoroutines * numOperations
actualCount := zs.ZCard()
if actualCount != expectedCount {
t.Errorf("Expected count %d, got %d", expectedCount, actualCount)
}
}
func TestZSetConcurrentReadWrite(t *testing.T) {
zs := NewZSet()
const numGoroutines = 10
const numOperations = 1000
var wg sync.WaitGroup
// 预填充数据
for i := 0; i < 1000; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
// 并发读取
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
member := fmt.Sprintf("member%d", j%1000)
zs.ZScore(member)
zs.ZRank(member)
}
}(i)
}
// 并发写入
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
member := fmt.Sprintf("new_member_%d_%d", goroutineID, j)
score := float64(goroutineID*numOperations + j)
zs.ZAdd(member, score)
}
}(i)
}
wg.Wait()
// 验证结果
expectedCount := 1000 + numGoroutines*numOperations
actualCount := zs.ZCard()
if actualCount != expectedCount {
t.Errorf("Expected count %d, got %d", expectedCount, actualCount)
}
}
性能对比分析
1. 操作复杂度对比
操作 | 时间复杂度 | 空间复杂度 | 说明 |
---|---|---|---|
ZADD | O(log n) | O(1) | 跳表插入 + 字典更新 |
ZREM | O(log n) | O(1) | 跳表删除 + 字典删除 |
ZSCORE | O(1) | O(1) | 字典查找 |
ZRANK | O(log n) | O(1) | 跳表排名计算 |
ZRANGE | O(log n + k) | O(k) | k 为结果数量 |
ZRANGEBYSCORE | O(log n + k) | O(k) | k 为结果数量 |
2. 内存使用对比
特性 | 跳表+字典 | 平衡树 | 说明 |
---|---|---|---|
内存开销 | 中等 | 中等 | 跳表需要额外指针 |
内存局部性 | 好 | 好 | 跳表节点连续存储 |
缓存友好性 | 好 | 好 | 跳表访问模式更好 |
内存碎片 | 少 | 少 | 连续分配减少碎片 |
3. 性能测试结果
// 基准测试结果示例
func BenchmarkComparison(b *testing.B) {
// 有序集合性能
b.Run("ZSet", func(b *testing.B) {
zs := NewZSet()
for i := 0; i < b.N; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
zs.ZAdd(member, score)
}
})
// Go 切片性能(模拟)
b.Run("GoSlice", func(b *testing.B) {
var slice []ZSetMember
for i := 0; i < b.N; i++ {
member := fmt.Sprintf("member%d", i)
score := float64(i)
slice = append(slice, ZSetMember{Member: member, Score: score})
}
})
}
面试要点
1. 有序集合的设计原理
答案要点:
- 双数据结构:跳表+字典协同工作
- 跳表作用:按分数排序,支持范围查询
- 字典作用:O(1) 成员查找和分数更新
- 数据一致性:两个结构保持同步
2. 跳表 vs 平衡树的优势
答案要点:
- 实现简单:跳表比平衡树更容易实现
- 并发友好:支持无锁并发操作
- 范围查询:高效支持范围查询
- 内存效率:比平衡树使用更少内存
3. 分数更新的优化策略
答案要点:
- 先删除后插入:避免跳表结构破坏
- 原子操作:确保数据一致性
- 批量更新:减少锁竞争
- 内存预分配:减少内存分配开销
4. 范围查询的优化
答案要点:
- 索引优化:使用跳表快速定位
- 批量处理:一次处理多个结果
- 内存复用:减少内存分配
- 缓存友好:提高访问效率
总结
通过本章学习,我们深入理解了:
- 有序集合的双数据结构设计原理
- 跳表+字典的协同工作机制实现
- 完整的 ZADD/ZRANGE/ZRANK 等命令实现
- 分数更新和排名计算的优化策略
- 有序集合的性能优化技巧
有序集合为 Redis 提供了高效的有序数据存储和查询能力。在下一章中,我们将学习内存管理与对象系统,了解 Redis 如何优化内存使用和对象生命周期管理。