Redis 架构总览与线程模型
学习目标
- 深入理解 Redis 的单线程架构设计
- 掌握事件循环和 I/O 多路复用机制
- 理解为什么单线程 Redis 仍然高性能
- 手写一个简化版的 Redis 服务器
️ Redis 架构总览
核心设计理念
Redis 采用单线程事件驱动的架构设计,这是其高性能的关键所在:
┌─────────────────────────────────────────────────────────────┐
│ Redis 服务器架构 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 客户端1 │ │ 客户端2 │ │ 客户端N │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 网络 I/O 层 │ │
│ │ (epoll/kqueue/select) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 事件循环 (Event Loop) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ File Events │ │Time Events │ │ 命令执行器 │ │ │
│ │ │ (网络I/O) │ │ (定时任务) │ │ (单线程) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 数据存储层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ String │ │ Hash │ │ List │ │ ZSet │ ... │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
单线程执行模型
Redis 的核心执行模型是单线程串行执行:
- 主线程:负责所有命令的解析、执行和响应
- 无锁设计:避免了多线程的锁竞争和上下文切换
- 原子操作:所有命令都是原子性的,不会被打断
事件循环机制
事件类型
Redis 的事件循环处理两种类型的事件:
1. File Events(文件事件)
- 网络 I/O 事件:客户端连接、数据读取、数据写入
- 实现方式:I/O 多路复用(epoll/kqueue/select)
- 处理流程:监听 → 就绪 → 处理 → 响应
2. Time Events(时间事件)
- 定时任务:过期键清理、AOF 刷盘、统计信息更新
- 实现方式:时间轮或最小堆
- 执行时机:在文件事件处理完成后执行
事件循环流程
func eventLoop() {
for {
// 1. 处理文件事件(网络 I/O)
processFileEvents()
// 2. 处理时间事件(定时任务)
processTimeEvents()
// 3. 等待新事件
waitForEvents()
}
}
高性能来源分析
1. 内存访问模式
- 顺序访问:Redis 数据结构设计优化了内存访问模式
- 缓存友好:充分利用 CPU 缓存,减少内存访问延迟
- 紧凑存储:使用压缩数据结构,提高内存利用率
2. 避免系统调用开销
- 批量处理:一次系统调用处理多个事件
- 零拷贝:减少数据在内核态和用户态之间的拷贝
- 管道化:支持客户端批量发送命令
3. 高效的数据结构
- SDS:优化的字符串实现
- 跳跃表:O(log n) 的有序集合操作
- 压缩列表:紧凑的列表和哈希实现
️ 手写实现:Mini Redis
让我们实现一个简化版的 Redis 服务器来理解核心架构:
项目结构
mini_redis/
├── main.go # 主程序入口
├── server/
│ ├── server.go # 服务器实现
│ ├── connection.go # 连接处理
│ └── command.go # 命令处理
├── protocol/
│ ├── resp.go # RESP 协议解析
│ └── encoder.go # 响应编码
├── storage/
│ ├── store.go # 存储引擎
│ └── types.go # 数据类型定义
└── utils/
└── logger.go # 日志工具
核心实现
1. 服务器结构
// server/server.go
package server
import (
"net"
"sync"
"time"
)
type Server struct {
addr string
store *storage.Store
listener net.Listener
stopCh chan struct{}
wg sync.WaitGroup
}
func NewServer(addr string) *Server {
return &Server{
addr: addr,
store: storage.NewStore(),
stopCh: make(chan struct{}),
}
}
func (s *Server) Start() error {
var err error
s.listener, err = net.Listen("tcp", s.addr)
if err != nil {
return err
}
// 启动过期键清理协程
go s.startExpirationCleaner()
// 主事件循环
for {
conn, err := s.listener.Accept()
if err != nil {
select {
case <-s.stopCh:
return nil
default:
continue
}
}
// 为每个连接启动处理协程
s.wg.Add(1)
go s.handleConnection(conn)
}
}
func (s *Server) Stop() {
close(s.stopCh)
s.listener.Close()
s.wg.Wait()
}
2. 连接处理
// server/connection.go
package server
import (
"bufio"
"net"
"time"
)
func (s *Server) handleConnection(conn net.Conn) {
defer func() {
conn.Close()
s.wg.Done()
}()
reader := bufio.NewReader(conn)
writer := bufio.NewWriter(conn)
for {
// 设置读取超时
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
// 解析命令
cmd, err := protocol.ParseCommand(reader)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return // 超时断开连接
}
protocol.WriteError(writer, "ERR protocol error")
writer.Flush()
return
}
// 执行命令
result := s.executeCommand(cmd)
// 发送响应
protocol.WriteResponse(writer, result)
writer.Flush()
}
}
3. 命令执行
// server/command.go
package server
import (
"strconv"
"time"
)
func (s *Server) executeCommand(cmd *protocol.Command) *protocol.Response {
switch cmd.Name {
case "PING":
return s.handlePing(cmd)
case "SET":
return s.handleSet(cmd)
case "GET":
return s.handleGet(cmd)
case "DEL":
return s.handleDel(cmd)
case "EXPIRE":
return s.handleExpire(cmd)
case "TTL":
return s.handleTTL(cmd)
case "INFO":
return s.handleInfo(cmd)
default:
return protocol.NewErrorResponse("ERR unknown command")
}
}
func (s *Server) handleSet(cmd *protocol.Command) *protocol.Response {
if len(cmd.Args) < 2 {
return protocol.NewErrorResponse("ERR wrong number of arguments")
}
key := cmd.Args[0]
value := cmd.Args[1]
ttl := int64(0)
// 处理 EX 参数
if len(cmd.Args) >= 4 && cmd.Args[2] == "EX" {
if sec, err := strconv.ParseInt(cmd.Args[3], 10, 64); err == nil {
ttl = sec
}
}
s.store.Set(key, value, ttl)
return protocol.NewSimpleStringResponse("OK")
}
func (s *Server) handleGet(cmd *protocol.Command) *protocol.Response {
if len(cmd.Args) != 1 {
return protocol.NewErrorResponse("ERR wrong number of arguments")
}
key := cmd.Args[0]
value, exists := s.store.Get(key)
if !exists {
return protocol.NewNullBulkStringResponse()
}
return protocol.NewBulkStringResponse(value)
}
4. 存储引擎
// storage/store.go
package storage
import (
"sync"
"time"
)
type Entry struct {
Value string
ExpiresAt int64 // Unix 时间戳,0 表示永不过期
}
type Store struct {
mu sync.RWMutex
data map[string]*Entry
}
func NewStore() *Store {
return &Store{
data: make(map[string]*Entry),
}
}
func (s *Store) Set(key, value string, ttl int64) {
s.mu.Lock()
defer s.mu.Unlock()
var expiresAt int64
if ttl > 0 {
expiresAt = time.Now().Unix() + ttl
}
s.data[key] = &Entry{
Value: value,
ExpiresAt: expiresAt,
}
}
func (s *Store) Get(key string) (string, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
entry, exists := s.data[key]
if !exists {
return "", false
}
// 检查是否过期
if entry.ExpiresAt > 0 && entry.ExpiresAt <= time.Now().Unix() {
return "", false
}
return entry.Value, true
}
func (s *Store) Delete(key string) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, exists := s.data[key]
if exists {
delete(s.data, key)
}
return exists
}
5. 过期键清理
// server/expiration.go
package server
import (
"time"
)
func (s *Server) startExpirationCleaner() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.cleanExpiredKeys()
case <-s.stopCh:
return
}
}
}
func (s *Server) cleanExpiredKeys() {
now := time.Now().Unix()
s.store.CleanExpired(now)
}
测试验证
运行服务器
# 编译并运行
go run main.go -addr :6380
# 在另一个终端测试
redis-cli -p 6380
测试命令
# 基础命令测试
127.0.0.1:6380> PING
PONG
127.0.0.1:6380> SET key value EX 10
OK
127.0.0.1:6380> GET key
"value"
127.0.0.1:6380> TTL key
(integer) 8
127.0.0.1:6380> DEL key
(integer) 1
127.0.0.1:6380> GET key
(nil)
性能测试
# 使用 redis-benchmark 测试
redis-benchmark -h 127.0.0.1 -p 6380 -t set,get -n 10000 -c 100
面试要点
1. 为什么 Redis 使用单线程?
答案要点:
- 避免锁竞争:多线程需要锁机制,增加复杂性和性能开销
- 简化设计:单线程模型更容易理解和维护
- CPU 不是瓶颈:Redis 主要受内存和网络 I/O 限制
- 原子操作:所有命令都是原子性的,保证数据一致性
2. 单线程 Redis 为什么还这么快?
答案要点:
- 内存操作:数据存储在内存中,访问速度极快
- 高效数据结构:使用优化的数据结构减少计算复杂度
- I/O 多路复用:epoll/kqueue 高效处理大量并发连接
- 避免上下文切换:单线程避免了线程切换的开销
- 管道化:支持批量命令处理
3. Redis 的事件循环是如何工作的?
答案要点:
- 文件事件:处理网络 I/O,使用 I/O 多路复用
- 时间事件:处理定时任务,如过期键清理
- 事件优先级:文件事件优先于时间事件
- 非阻塞:所有操作都是非阻塞的
4. Redis 如何处理阻塞操作?
答案要点:
- 避免阻塞:大部分操作都是 O(1) 或 O(log n)
- 后台线程:某些操作使用后台线程(如 AOF 刷盘)
- 分片:大键可以分片存储
- 监控:提供慢查询日志监控阻塞操作
深入思考
1. 单线程的局限性
虽然单线程有很多优势,但也存在一些局限性:
- CPU 密集型操作:如复杂计算会阻塞其他请求
- 大键操作:删除大键会阻塞服务器
- 网络 I/O:虽然使用多路复用,但仍有优化空间
2. 多线程优化
Redis 6.0 引入了多线程 I/O:
- I/O 线程:处理网络 I/O,不处理命令执行
- 主线程:仍然负责命令解析和执行
- 工作线程:处理某些后台任务
3. 性能调优建议
- 合理设置超时:避免长时间阻塞
- 监控慢查询:及时发现性能问题
- 优化数据结构:选择合适的 Redis 数据类型
- 网络优化:使用管道和批量操作
扩展阅读
总结
通过本章学习,我们深入理解了:
- Redis 的单线程架构设计及其优势
- 事件循环机制的工作原理
- 高性能来源的技术细节
- 手写实现了一个简化版 Redis 服务器
这些知识不仅有助于理解 Redis 的设计思想,也为后续学习更复杂的数据结构和分布式特性打下了坚实基础。
在下一章中,我们将深入学习 RESP 协议和网络通信机制,了解 Redis 如何与客户端进行高效的数据交换。