事件循环与 I/O 多路复用
学习目标
- 深入理解 I/O 多路复用的三种机制(select/poll/epoll)
- 掌握 Go 语言 netpoller 的实现原理
- 实现完整的事件循环系统(文件事件+时间事件)
- 理解非阻塞 I/O 和边缘触发/水平触发模式
- 掌握事件驱动的性能优化技巧
事件循环概述
事件循环是 Redis 高性能的核心机制,它通过 I/O 多路复用技术同时监听多个文件描述符,当有事件发生时立即处理,避免了阻塞等待。
事件循环架构
┌─────────────────────────────────────────────────────────────┐
│ 事件循环架构 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 客户端1 │ │ 客户端2 │ │ 客户端N │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ I/O 多路复用层 │ │
│ │ (select/poll/epoll/kqueue) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 事件循环核心 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 文件事件 │ │ 时间事件 │ │ 事件分发器 │ │ │
│ │ │ 处理器 │ │ 处理器 │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 命令处理器 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 命令解析 │ │ 命令执行 │ │ 响应发送 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
I/O 多路复用机制对比
1. select 机制
select 是最早的 I/O 多路复用机制,但存在一些限制:
特点:
- 跨平台兼容性好
- 文件描述符数量限制(通常 1024)
- 每次调用需要重新设置 fd_set
- O(n) 时间复杂度扫描
实现原理:
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
2. poll 机制
poll 解决了 select 的一些限制:
特点:
- 没有文件描述符数量限制
- 使用链表存储,更灵活
- 仍然需要 O(n) 扫描
- 跨平台兼容性好
实现原理:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
3. epoll 机制(Linux)
epoll 是 Linux 下最高效的 I/O 多路复用机制:
特点:
- 支持大量文件描述符(数万个)
- O(1) 时间复杂度
- 边缘触发和水平触发模式
- 内存映射,减少数据拷贝
实现原理:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
4. kqueue 机制(BSD/macOS)
kqueue 是 BSD 系统的高效事件通知机制:
特点:
- 支持多种事件类型
- 高性能,低延迟
- 支持文件系统事件
- macOS 和 FreeBSD 原生支持
️ Go 语言 netpoller 实现
Go 语言在运行时层面实现了高效的网络 I/O 多路复用:
1. 核心结构
// runtime/netpoll.go (简化版)
type pollDesc struct {
link *pollDesc
fd uintptr
rg uintptr // 读 goroutine
wg uintptr // 写 goroutine
closing bool
seq uintptr
}
type pollCache struct {
lock mutex
first *pollDesc
}
var pollcache pollCache
2. 事件循环实现
// eventloop/event_loop.go
package eventloop
import (
"context"
"fmt"
"net"
"runtime"
"sync"
"syscall"
"time"
)
// 事件类型
type EventType int
const (
EventRead EventType = 1 << iota
EventWrite
EventError
)
// 事件结构
type Event struct {
FD int
Type EventType
Data interface{}
}
// 事件处理器接口
type EventHandler interface {
HandleRead(fd int) error
HandleWrite(fd int) error
HandleError(fd int) error
}
// 事件循环
type EventLoop struct {
mu sync.RWMutex
handlers map[int]EventHandler
events chan Event
stopCh chan struct{}
wg sync.WaitGroup
maxEvents int
timeout time.Duration
}
// 创建事件循环
func NewEventLoop(maxEvents int, timeout time.Duration) *EventLoop {
return &EventLoop{
handlers: make(map[int]EventHandler),
events: make(chan Event, maxEvents),
stopCh: make(chan struct{}),
maxEvents: maxEvents,
timeout: timeout,
}
}
// 启动事件循环
func (el *EventLoop) Start() error {
el.wg.Add(1)
go el.run()
return nil
}
// 停止事件循环
func (el *EventLoop) Stop() {
close(el.stopCh)
el.wg.Wait()
}
// 注册事件处理器
func (el *EventLoop) RegisterHandler(fd int, handler EventHandler) {
el.mu.Lock()
defer el.mu.Unlock()
el.handlers[fd] = handler
}
// 注销事件处理器
func (el *EventLoop) UnregisterHandler(fd int) {
el.mu.Lock()
defer el.mu.Unlock()
delete(el.handlers, fd)
}
// 事件循环主循环
func (el *EventLoop) run() {
defer el.wg.Done()
// 创建 epoll 实例
epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if err != nil {
fmt.Printf("Failed to create epoll: %v\n", err)
return
}
defer syscall.Close(epfd)
events := make([]syscall.EpollEvent, el.maxEvents)
for {
select {
case <-el.stopCh:
return
default:
// 等待事件
n, err := syscall.EpollWait(epfd, events, int(el.timeout.Milliseconds()))
if err != nil {
if err == syscall.EINTR {
continue
}
fmt.Printf("EpollWait error: %v\n", err)
continue
}
// 处理事件
for i := 0; i < n; i++ {
el.handleEvent(events[i])
}
}
}
}
// 处理单个事件
func (el *EventLoop) handleEvent(event syscall.EpollEvent) {
fd := int(event.Fd)
el.mu.RLock()
handler, exists := el.handlers[fd]
el.mu.RUnlock()
if !exists {
return
}
// 根据事件类型分发处理
if event.Events&syscall.EPOLLIN != 0 {
if err := handler.HandleRead(fd); err != nil {
handler.HandleError(fd)
}
}
if event.Events&syscall.EPOLLOUT != 0 {
if err := handler.HandleWrite(fd); err != nil {
handler.HandleError(fd)
}
}
if event.Events&(syscall.EPOLLERR|syscall.EPOLLHUP) != 0 {
handler.HandleError(fd)
}
}
// 添加文件描述符到 epoll
func (el *EventLoop) AddFD(fd int, events EventType) error {
var epollEvents uint32
if events&EventRead != 0 {
epollEvents |= syscall.EPOLLIN
}
if events&EventWrite != 0 {
epollEvents |= syscall.EPOLLOUT
}
event := syscall.EpollEvent{
Events: epollEvents,
Fd: int32(fd),
}
return syscall.EpollCtl(syscall.EPOLL_CTL_ADD, fd, &event)
}
// 修改文件描述符事件
func (el *EventLoop) ModifyFD(fd int, events EventType) error {
var epollEvents uint32
if events&EventRead != 0 {
epollEvents |= syscall.EPOLLIN
}
if events&EventWrite != 0 {
epollEvents |= syscall.EPOLLOUT
}
event := syscall.EpollEvent{
Events: epollEvents,
Fd: int32(fd),
}
return syscall.EpollCtl(syscall.EPOLL_CTL_MOD, fd, &event)
}
// 删除文件描述符
func (el *EventLoop) RemoveFD(fd int) error {
return syscall.EpollCtl(syscall.EPOLL_CTL_DEL, fd, nil)
}
⏰ 时间事件系统
Redis 的时间事件用于处理定时任务,如过期键清理、AOF 刷盘等:
1. 时间事件结构
// eventloop/time_event.go
package eventloop
import (
"container/heap"
"sync"
"time"
)
// 时间事件
type TimeEvent struct {
ID int64
When time.Time
Interval time.Duration
Handler func() error
Repeat bool
}
// 时间事件堆
type TimeEventHeap []*TimeEvent
func (h TimeEventHeap) Len() int { return len(h) }
func (h TimeEventHeap) Less(i, j int) bool { return h[i].When.Before(h[j].When) }
func (h TimeEventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *TimeEventHeap) Push(x interface{}) {
*h = append(*h, x.(*TimeEvent))
}
func (h *TimeEventHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// 时间事件管理器
type TimeEventManager struct {
mu sync.RWMutex
heap TimeEventHeap
nextID int64
}
func NewTimeEventManager() *TimeEventManager {
return &TimeEventManager{
heap: make(TimeEventHeap, 0),
nextID: 1,
}
}
// 添加时间事件
func (tem *TimeEventManager) AddTimeEvent(when time.Time, interval time.Duration, handler func() error, repeat bool) int64 {
tem.mu.Lock()
defer tem.mu.Unlock()
event := &TimeEvent{
ID: tem.nextID,
When: when,
Interval: interval,
Handler: handler,
Repeat: repeat,
}
tem.nextID++
heap.Push(&tem.heap, event)
return event.ID
}
// 删除时间事件
func (tem *TimeEventManager) RemoveTimeEvent(id int64) bool {
tem.mu.Lock()
defer tem.mu.Unlock()
for i, event := range tem.heap {
if event.ID == id {
heap.Remove(&tem.heap, i)
return true
}
}
return false
}
// 获取下一个时间事件
func (tem *TimeEventManager) GetNextTimeEvent() *TimeEvent {
tem.mu.RLock()
defer tem.mu.RUnlock()
if len(tem.heap) == 0 {
return nil
}
return tem.heap[0]
}
// 处理到期的时间事件
func (tem *TimeEventManager) ProcessExpiredEvents() []error {
tem.mu.Lock()
defer tem.mu.Unlock()
var errors []error
now := time.Now()
for len(tem.heap) > 0 && !tem.heap[0].When.After(now) {
event := heap.Pop(&tem.heap).(*TimeEvent)
// 执行事件处理器
if err := event.Handler(); err != nil {
errors = append(errors, err)
}
// 如果是重复事件,重新添加到堆中
if event.Repeat {
event.When = now.Add(event.Interval)
heap.Push(&tem.heap, event)
}
}
return errors
}
完整的事件循环实现
1. 集成事件循环
// eventloop/integrated_loop.go
package eventloop
import (
"context"
"fmt"
"net"
"sync"
"time"
)
// 集成事件循环
type IntegratedEventLoop struct {
fileEventLoop *EventLoop
timeEventMgr *TimeEventManager
stopCh chan struct{}
wg sync.WaitGroup
}
func NewIntegratedEventLoop() *IntegratedEventLoop {
return &IntegratedEventLoop{
fileEventLoop: NewEventLoop(1024, 10*time.Millisecond),
timeEventMgr: NewTimeEventManager(),
stopCh: make(chan struct{}),
}
}
// 启动集成事件循环
func (iel *IntegratedEventLoop) Start() error {
// 启动文件事件循环
if err := iel.fileEventLoop.Start(); err != nil {
return err
}
// 启动时间事件处理协程
iel.wg.Add(1)
go iel.processTimeEvents()
return nil
}
// 停止集成事件循环
func (iel *IntegratedEventLoop) Stop() {
close(iel.stopCh)
iel.fileEventLoop.Stop()
iel.wg.Wait()
}
// 处理时间事件
func (iel *IntegratedEventLoop) processTimeEvents() {
defer iel.wg.Done()
ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-iel.stopCh:
return
case <-ticker.C:
// 处理到期的时间事件
if errors := iel.timeEventMgr.ProcessExpiredEvents(); len(errors) > 0 {
for _, err := range errors {
fmt.Printf("Time event error: %v\n", err)
}
}
}
}
}
// 添加文件事件
func (iel *IntegratedEventLoop) AddFileEvent(fd int, events EventType, handler EventHandler) error {
iel.fileEventLoop.RegisterHandler(fd, handler)
return iel.fileEventLoop.AddFD(fd, events)
}
// 添加时间事件
func (iel *IntegratedEventLoop) AddTimeEvent(when time.Time, interval time.Duration, handler func() error, repeat bool) int64 {
return iel.timeEventMgr.AddTimeEvent(when, interval, handler, repeat)
}
// 删除文件事件
func (iel *IntegratedEventLoop) RemoveFileEvent(fd int) error {
iel.fileEventLoop.UnregisterHandler(fd)
return iel.fileEventLoop.RemoveFD(fd)
}
// 删除时间事件
func (iel *IntegratedEventLoop) RemoveTimeEvent(id int64) bool {
return iel.timeEventMgr.RemoveTimeEvent(id)
}
2. Redis 服务器集成
// eventloop/redis_server.go
package eventloop
import (
"bufio"
"fmt"
"net"
"strconv"
"strings"
"time"
)
// Redis 连接处理器
type RedisConnectionHandler struct {
conn net.Conn
reader *bufio.Reader
writer *bufio.Writer
server *RedisServer
}
func NewRedisConnectionHandler(conn net.Conn, server *RedisServer) *RedisConnectionHandler {
return &RedisConnectionHandler{
conn: conn,
reader: bufio.NewReader(conn),
writer: bufio.NewWriter(conn),
server: server,
}
}
// 处理读事件
func (rch *RedisConnectionHandler) HandleRead(fd int) error {
// 解析命令
cmd, err := rch.parseCommand()
if err != nil {
return err
}
// 执行命令
response := rch.server.ExecuteCommand(cmd)
// 发送响应
return rch.sendResponse(response)
}
// 处理写事件
func (rch *RedisConnectionHandler) HandleWrite(fd int) error {
return rch.writer.Flush()
}
// 处理错误事件
func (rch *RedisConnectionHandler) HandleError(fd int) error {
rch.conn.Close()
return nil
}
// 解析命令(简化版)
func (rch *RedisConnectionHandler) parseCommand() ([]string, error) {
line, err := rch.reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSpace(line)
if line == "" {
return nil, fmt.Errorf("empty command")
}
return strings.Fields(line), nil
}
// 发送响应
func (rch *RedisConnectionHandler) sendResponse(response string) error {
_, err := rch.writer.WriteString(response + "\r\n")
return err
}
// Redis 服务器
type RedisServer struct {
loop *IntegratedEventLoop
store map[string]string
mu sync.RWMutex
}
func NewRedisServer() *RedisServer {
return &RedisServer{
loop: NewIntegratedEventLoop(),
store: make(map[string]string),
}
}
// 启动服务器
func (rs *RedisServer) Start(addr string) error {
// 启动事件循环
if err := rs.loop.Start(); err != nil {
return err
}
// 启动监听
listener, err := net.Listen("tcp", addr)
if err != nil {
return err
}
fmt.Printf("Redis server listening on %s\n", addr)
// 添加过期键清理时间事件
rs.loop.AddTimeEvent(
time.Now().Add(time.Second),
time.Second,
rs.cleanExpiredKeys,
true,
)
// 接受连接
go rs.acceptConnections(listener)
return nil
}
// 接受连接
func (rs *RedisServer) acceptConnections(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
continue
}
// 创建连接处理器
handler := NewRedisConnectionHandler(conn, rs)
// 添加到事件循环
if err := rs.loop.AddFileEvent(
int(conn.(*net.TCPConn).File().Fd()),
EventRead,
handler,
); err != nil {
conn.Close()
continue
}
}
}
// 执行命令
func (rs *RedisServer) ExecuteCommand(cmd []string) string {
if len(cmd) == 0 {
return "-ERR empty command"
}
switch strings.ToUpper(cmd[0]) {
case "PING":
return "+PONG"
case "SET":
if len(cmd) < 3 {
return "-ERR wrong number of arguments"
}
rs.mu.Lock()
rs.store[cmd[1]] = cmd[2]
rs.mu.Unlock()
return "+OK"
case "GET":
if len(cmd) < 2 {
return "-ERR wrong number of arguments"
}
rs.mu.RLock()
value, exists := rs.store[cmd[1]]
rs.mu.RUnlock()
if !exists {
return "$-1"
}
return fmt.Sprintf("$%d\r\n%s", len(value), value)
default:
return "-ERR unknown command"
}
}
// 清理过期键(示例)
func (rs *RedisServer) cleanExpiredKeys() error {
// 这里可以实现过期键清理逻辑
return nil
}
// 停止服务器
func (rs *RedisServer) Stop() {
rs.loop.Stop()
}
性能基准测试
1. 事件循环性能测试
// eventloop/benchmark_test.go
package eventloop
import (
"fmt"
"net"
"testing"
"time"
)
func BenchmarkEventLoop(b *testing.B) {
loop := NewIntegratedEventLoop()
loop.Start()
defer loop.Stop()
// 创建测试连接
conn, err := net.Dial("tcp", "127.0.0.1:6380")
if err != nil {
b.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// 模拟命令处理
cmd := fmt.Sprintf("SET key%d value%d\r\n", i, i)
conn.Write([]byte(cmd))
// 读取响应
buf := make([]byte, 1024)
conn.Read(buf)
}
}
func BenchmarkTimeEvents(b *testing.B) {
mgr := NewTimeEventManager()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// 添加时间事件
mgr.AddTimeEvent(
time.Now().Add(time.Millisecond),
time.Millisecond,
func() error { return nil },
false,
)
// 处理事件
mgr.ProcessExpiredEvents()
}
}
2. 并发性能测试
// eventloop/concurrent_test.go
package eventloop
import (
"net"
"sync"
"testing"
"time"
)
func TestConcurrentConnections(t *testing.T) {
server := NewRedisServer()
if err := server.Start(":6381"); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
defer server.Stop()
const numConnections = 100
const commandsPerConnection = 100
var wg sync.WaitGroup
for i := 0; i < numConnections; i++ {
wg.Add(1)
go func(connID int) {
defer wg.Done()
conn, err := net.Dial("tcp", "127.0.0.1:6381")
if err != nil {
t.Errorf("Connection %d failed: %v", connID, err)
return
}
defer conn.Close()
for j := 0; j < commandsPerConnection; j++ {
cmd := fmt.Sprintf("SET key%d_%d value%d_%d\r\n", connID, j, connID, j)
if _, err := conn.Write([]byte(cmd)); err != nil {
t.Errorf("Write failed: %v", err)
return
}
// 读取响应
buf := make([]byte, 1024)
if _, err := conn.Read(buf); err != nil {
t.Errorf("Read failed: %v", err)
return
}
}
}(i)
}
wg.Wait()
}
性能分析
1. 事件处理延迟
// eventloop/latency_test.go
package eventloop
import (
"testing"
"time"
)
func TestEventLatency(t *testing.T) {
loop := NewIntegratedEventLoop()
loop.Start()
defer loop.Stop()
var totalLatency time.Duration
const numEvents = 1000
for i := 0; i < numEvents; i++ {
start := time.Now()
// 添加时间事件
loop.AddTimeEvent(
time.Now().Add(time.Microsecond),
0,
func() error {
totalLatency += time.Since(start)
return nil
},
false,
)
time.Sleep(time.Microsecond)
}
avgLatency := totalLatency / numEvents
t.Logf("Average event latency: %v", avgLatency)
if avgLatency > 100*time.Microsecond {
t.Errorf("Event latency too high: %v", avgLatency)
}
}
2. 内存使用分析
// eventloop/memory_test.go
package eventloop
import (
"runtime"
"testing"
)
func TestMemoryUsage(t *testing.T) {
var m1, m2 runtime.MemStats
runtime.ReadMemStats(&m1)
// 创建大量事件
loop := NewIntegratedEventLoop()
loop.Start()
for i := 0; i < 10000; i++ {
loop.AddTimeEvent(
time.Now().Add(time.Hour),
time.Hour,
func() error { return nil },
true,
)
}
runtime.ReadMemStats(&m2)
heapGrowth := m2.HeapAlloc - m1.HeapAlloc
t.Logf("Memory usage: %d bytes", heapGrowth)
if heapGrowth > 10*1024*1024 { // 10MB
t.Errorf("Memory usage too high: %d bytes", heapGrowth)
}
loop.Stop()
}
面试要点
1. I/O 多路复用的区别
答案要点:
- select:跨平台,但有 fd 数量限制,O(n) 扫描
- poll:无 fd 数量限制,但仍需 O(n) 扫描
- epoll:Linux 高效实现,O(1) 复杂度,支持边缘触发
- kqueue:BSD 系统,支持多种事件类型
2. 边缘触发 vs 水平触发
答案要点:
- 水平触发:只要 fd 可读/可写就通知,可能重复通知
- 边缘触发:只在状态变化时通知一次,需要一次性处理完所有数据
- 选择建议:高并发场景用边缘触发,简单场景用水平触发
3. Go netpoller 的优势
答案要点:
- 用户态调度:避免内核态/用户态切换
- Goroutine 复用:一个线程处理多个连接
- 自动负载均衡:工作窃取算法
- 内存效率:栈空间小,创建成本低
4. 事件循环的性能优化
答案要点:
- 批量处理:一次处理多个事件
- 零拷贝:减少内存拷贝
- 对象池:复用对象减少 GC
- 非阻塞 I/O:避免线程阻塞
总结
通过本章学习,我们深入理解了:
- I/O 多路复用的三种机制和性能对比
- Go netpoller的实现原理和优势
- 事件循环的完整实现和优化技巧
- 时间事件系统的设计和实现
- 性能测试和优化方法
这些知识为构建高性能的 Redis 服务器提供了坚实的事件驱动基础。在下一章中,我们将学习字符串与 SDS 实现,了解 Redis 如何优化字符串操作。