HiHuo
首页
博客
手册
工具
首页
博客
手册
工具
  • 系统底层修炼

    • 操作系统核心知识学习指南
    • CPU调度与上下文切换
    • CFS调度器原理与源码
    • 内存管理与虚拟内存
    • PageCache与内存回收
    • 文件系统与IO优化
    • 零拷贝与Direct I/O
    • 网络子系统架构
    • TCP协议深度解析
    • TCP问题排查实战
    • 网络性能优化
    • epoll与IO多路复用
    • 进程与线程管理
    • Go Runtime调度器GMP模型
    • 系统性能分析方法论
    • DPDK与用户态网络栈
    • eBPF与内核可观测性
    • 综合实战案例

epoll与IO多路复用

章节概述

epoll是Linux提供的高效IO多路复用机制,是构建高并发网络服务的基石。本章将深入探讨epoll的工作原理、使用方法,以及如何基于epoll构建高性能服务器。

学习目标:

  • 理解IO多路复用的概念和必要性
  • 掌握epoll的工作原理和API使用
  • 学会区分LT和ET模式
  • 能够使用epoll构建高性能网络服务

核心概念

1. IO模型对比

阻塞IO(Blocking IO):

应用调用read()
    ↓
等待数据(阻塞)
    ↓
数据到达
    ↓
复制数据到用户空间
    ↓
返回

非阻塞IO(Non-blocking IO):

应用调用read()
    ↓
数据未就绪 → 立即返回EAGAIN
    ↓
应用轮询
    ↓
数据就绪 → 返回数据

IO多路复用(IO Multiplexing):

epoll_wait()监听多个fd
    ↓
某个fd就绪
    ↓
返回就绪的fd列表
    ↓
应用处理就绪的fd

对比表:

模型并发能力CPU占用复杂度适用场景
阻塞IO低(每连接一线程)低简单并发低的场景
非阻塞IO中(需轮询)高中等不推荐
select/poll中(有限制)中中等小规模并发
epoll高(无限制)低复杂高并发场景

2. epoll核心架构

┌─────────────────────────────────────┐
│         用户空间应用                 │
│  epoll_create()                     │
│  epoll_ctl(ADD/MOD/DEL)            │
│  epoll_wait()                       │
└──────────┬──────────────────────────┘
           │
           ↓
┌─────────────────────────────────────┐
│         内核空间                     │
│  ┌─────────────────────────────┐   │
│  │    epoll instance           │   │
│  │  ┌──────────┐  ┌──────────┐│   │
│  │  │ 红黑树   │  │就绪链表  ││   │
│  │  │(所有fd) │  │(就绪fd)  ││   │
│  │  └──────────┘  └──────────┘│   │
│  └─────────────────────────────┘   │
└─────────────────────────────────────┘

关键组件:

  • 红黑树:存储所有监听的文件描述符
  • 就绪链表:存储已就绪的文件描述符
  • 等待队列:阻塞等待的进程队列

3. LT vs ET模式

水平触发(Level Triggered, LT):

特点:
- 只要fd就绪,就会一直通知
- 类似边缘中断
- 不处理也会再次通知

适用:
- 对实时性要求不高
- 编程简单
- 容错性好

边缘触发(Edge Triggered, ET):

特点:
- 只在状态变化时通知一次
- 需要一次性读完所有数据
- 漏掉则不再通知

适用:
- 高性能要求
- 需要配合非阻塞IO
- 编程复杂但高效

对比示例:

假设有1KB数据到达,应用只读取了512字节

LT模式:
epoll_wait()会再次返回该fd(因为还有数据)

ET模式:
epoll_wait()不会再返回该fd(除非有新数据到达)
必须在第一次通知时读完所有数据

源码解析

1. epoll实例创建

关键文件: fs/eventpoll.c

// epoll_create系统调用
SYSCALL_DEFINE1(epoll_create, int, size)
{
    if (size <= 0)
        return -EINVAL;
    
    return do_epoll_create(0);
}

// 创建epoll实例
static int do_epoll_create(int flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;
    
    // 分配eventpoll结构
    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    
    // 获取未使用的文件描述符
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    if (fd < 0) {
        error = fd;
        goto out_free_ep;
    }
    
    // 创建匿名inode和文件
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                             O_RDWR | (flags & O_CLOEXEC));
    if (IS_ERR(file)) {
        error = PTR_ERR(file);
        goto out_free_fd;
    }
    
    ep->file = file;
    fd_install(fd, file);
    return fd;
    
out_free_fd:
    put_unused_fd(fd);
out_free_ep:
    ep_free(ep);
    return error;
}

// eventpoll核心结构
struct eventpoll {
    spinlock_t lock;              // 保护eventpoll的自旋锁
    struct mutex mtx;             // 保护eventpoll的互斥锁
    wait_queue_head_t wq;         // epoll_wait()等待队列
    wait_queue_head_t poll_wait;  // file->poll()等待队列
    struct list_head rdllist;     // 就绪链表
    struct rb_root_cached rbr;    // 红黑树根节点
    struct epitem *ovflist;       // 溢出链表
};

2. 添加/修改/删除监听

// epoll_ctl系统调用
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
               struct epoll_event __user *, event)
{
    int error;
    int full_check = 0;
    struct fd f, tf;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    
    // 复制用户空间的event
    if (ep_op_has_event(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        return -EFAULT;
    
    // 获取epoll文件
    f = fdget(epfd);
    if (!f.file)
        return -EBADF;
    
    // 获取目标文件
    tf = fdget(fd);
    if (!tf.file)
        goto error_fput;
    
    ep = f.file->private_data;
    
    mutex_lock(&ep->mtx);
    
    // 查找是否已存在
    epi = ep_find(ep, tf.file, fd);
    
    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= EPOLLERR | EPOLLHUP;
            error = ep_insert(ep, &epds, tf.file, fd, full_check);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= EPOLLERR | EPOLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }
    
    mutex_unlock(&ep->mtx);
    
error_tgt_fput:
    fdput(tf);
error_fput:
    fdput(f);
    return error;
}

// 插入新的epitem到红黑树
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
                    struct file *tfile, int fd, int full_check)
{
    int error, pwake = 0;
    struct epitem *epi;
    struct ep_pqueue epq;
    
    // 分配epitem
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
        return -ENOMEM;
    
    // 初始化epitem
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    epi->next = EP_UNACTIVE_PTR;
    
    // 插入红黑树
    ep_rbtree_insert(ep, epi);
    
    // 注册回调函数
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    
    // 调用文件的poll函数
    revents = ep_item_poll(epi, &epq.pt, 1);
    
    // 如果已经就绪,加入就绪链表
    if (revents && !ep_is_linked(&epi->rdllink)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);
        
        // 唤醒等待的进程
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
    }
    
    return 0;
}

3. 等待事件

// epoll_wait系统调用
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
               int, maxevents, int, timeout)
{
    int error;
    struct fd f;
    struct eventpoll *ep;
    
    // 获取epoll文件
    f = fdget(epfd);
    if (!f.file)
        return -EBADF;
    
    ep = f.file->private_data;
    
    // 等待事件
    error = ep_poll(ep, events, maxevents, timeout);
    
    fdput(f);
    return error;
}

// epoll核心等待逻辑
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                  int maxevents, long timeout)
{
    int res = 0, eavail, timed_out = 0;
    u64 slack = 0;
    wait_queue_entry_t wait;
    ktime_t expires, *to = NULL;
    
    // 检查就绪链表
    eavail = ep_events_available(ep);
    
    // 如果没有就绪事件,进入等待
    if (!eavail) {
        // 计算超时时间
        if (timeout > 0) {
            expires = ktime_add_ns(ktime_get(), timeout * NSEC_PER_MSEC);
            to = &expires;
        }
        
        // 初始化等待队列项
        init_waitqueue_entry(&wait, current);
        
        // 加入等待队列
        __add_wait_queue_exclusive(&ep->wq, &wait);
        
        for (;;) {
            // 设置进程状态为可中断睡眠
            set_current_state(TASK_INTERRUPTIBLE);
            
            // 再次检查就绪链表
            if (!list_empty_careful(&ep->rdllist) || timed_out)
                break;
            
            // 检查信号
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }
            
            // 睡眠等待
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                timed_out = 1;
        }
        
        // 从等待队列移除
        __remove_wait_queue(&ep->wq, &wait);
        __set_current_state(TASK_RUNNING);
    }
    
    // 发送就绪事件到用户空间
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
        goto fetch_events;
    
    return res;
}

️ 实用命令

1. 查看epoll使用情况

查看进程的fd:

# 查看进程打开的文件描述符
ls -l /proc/<pid>/fd

# 查看epoll实例
ls -l /proc/<pid>/fd | grep eventpoll

# 查看epoll详细信息
cat /proc/<pid>/fdinfo/<epoll_fd>

示例输出:

pos:    0
flags:  02
mnt_id: 10
tfd:        5 events:       19 data: 7fffa0b2c920
tfd:        6 events:       19 data: 7fffa0b2c928
tfd:        7 events:       19 data: 7fffa0b2c930

2. 监控epoll性能

使用strace追踪:

# 追踪epoll相关系统调用
strace -e epoll_create,epoll_ctl,epoll_wait -p <pid>

# 统计系统调用
strace -c -e epoll_create,epoll_ctl,epoll_wait -p <pid>

代码示例

1. epoll基础示例(LT模式)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>

#define MAX_EVENTS 10
#define PORT 8080

// 设置非阻塞
int setnonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl F_GETFL");
        return -1;
    }
    
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl F_SETFL");
        return -1;
    }
    
    return 0;
}

int main() {
    int listen_fd, conn_fd, epoll_fd, nfds;
    struct sockaddr_in server_addr, client_addr;
    socklen_t client_len;
    struct epoll_event ev, events[MAX_EVENTS];
    char buffer[1024];
    
    // 创建监听socket
    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (listen_fd < 0) {
        perror("socket");
        exit(1);
    }
    
    // 设置地址重用
    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    
    // 设置非阻塞
    setnonblocking(listen_fd);
    
    // 绑定地址
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);
    
    if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind");
        exit(1);
    }
    
    // 监听
    if (listen(listen_fd, SOMAXCONN) < 0) {
        perror("listen");
        exit(1);
    }
    
    // 创建epoll实例
    epoll_fd = epoll_create1(0);
    if (epoll_fd < 0) {
        perror("epoll_create1");
        exit(1);
    }
    
    // 添加监听socket到epoll
    ev.events = EPOLLIN;  // LT模式(默认)
    ev.data.fd = listen_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) {
        perror("epoll_ctl: listen_fd");
        exit(1);
    }
    
    printf("服务器启动,监听端口 %d\n", PORT);
    
    // 事件循环
    while (1) {
        nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (nfds < 0) {
            perror("epoll_wait");
            break;
        }
        
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == listen_fd) {
                // 新连接
                client_len = sizeof(client_addr);
                conn_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_len);
                if (conn_fd < 0) {
                    perror("accept");
                    continue;
                }
                
                printf("接受新连接: fd=%d\n", conn_fd);
                
                setnonblocking(conn_fd);
                
                // 添加到epoll
                ev.events = EPOLLIN;
                ev.data.fd = conn_fd;
                if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) < 0) {
                    perror("epoll_ctl: conn_fd");
                    close(conn_fd);
                }
            } else {
                // 数据可读
                int fd = events[i].data.fd;
                int n = read(fd, buffer, sizeof(buffer) - 1);
                
                if (n > 0) {
                    buffer[n] = '\0';
                    printf("接收 fd=%d: %s\n", fd, buffer);
                    
                    // 回显
                    write(fd, buffer, n);
                } else if (n == 0) {
                    // 连接关闭
                    printf("连接关闭: fd=%d\n", fd);
                    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
                    close(fd);
                } else {
                    if (errno != EAGAIN) {
                        perror("read");
                        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
                        close(fd);
                    }
                }
            }
        }
    }
    
    close(epoll_fd);
    close(listen_fd);
    return 0;
}

2. epoll ET模式示例

// ET模式需要循环读取直到EAGAIN
void handle_et_read(int fd) {
    char buffer[1024];
    int n;
    
    // ET模式必须循环读取
    while (1) {
        n = read(fd, buffer, sizeof(buffer) - 1);
        
        if (n > 0) {
            buffer[n] = '\0';
            printf("读取 %d 字节: %s\n", n, buffer);
            
            // 处理数据...
            
        } else if (n == 0) {
            // 连接关闭
            printf("连接关闭\n");
            close(fd);
            break;
        } else {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 数据读完了
                printf("数据读完\n");
                break;
            } else {
                perror("read");
                close(fd);
                break;
            }
        }
    }
}

// 添加ET模式监听
ev.events = EPOLLIN | EPOLLET;  // 边缘触发
ev.data.fd = conn_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev);

3. Go语言封装epoll

package main

import (
    "fmt"
    "net"
    "syscall"
)

type EpollServer struct {
    listener net.Listener
    epfd     int
    events   []syscall.EpollEvent
}

func NewEpollServer(addr string) (*EpollServer, error) {
    // 创建监听器
    listener, err := net.Listen("tcp", addr)
    if (err != nil {
        return nil, err
    }
    
    // 创建epoll实例
    epfd, err := syscall.EpollCreate1(0)
    if err != nil {
        listener.Close()
        return nil, err
    }
    
    server := &EpollServer{
        listener: listener,
        epfd:     epfd,
        events:   make([]syscall.EpollEvent, 128),
    }
    
    // 获取监听socket的fd
    file, _ := listener.(*net.TCPListener).File()
    fd := int(file.Fd())
    
    // 添加到epoll
    event := syscall.EpollEvent{
        Events: syscall.EPOLLIN,
        Fd:     int32(fd),
    }
    
    if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
        listener.Close()
        syscall.Close(epfd)
        return nil, err
    }
    
    return server, nil
}

func (s *EpollServer) Run() {
    for {
        // 等待事件
        n, err := syscall.EpollWait(s.epfd, s.events, -1)
        if err != nil {
            fmt.Printf("epoll_wait错误: %v\n", err)
            continue
        }
        
        // 处理事件
        for i := 0; i < n; i++ {
            fd := int(s.events[i].Fd)
            
            // 处理连接或数据...
            fmt.Printf("事件就绪: fd=%d\n", fd)
        }
    }
}

func main() {
    server, err := NewEpollServer(":8080")
    if err != nil {
        panic(err)
    }
    
    fmt.Println("服务器启动")
    server.Run()
}

动手实验

实验1:LT vs ET性能对比

目标: 理解两种模式的性能差异

步骤:

  1. 编译LT和ET版本:
gcc -o epoll_lt epoll_lt.c
gcc -o epoll_et epoll_et.c
  1. 使用ab进行压测:
# 终端1:启动服务器
./epoll_lt

# 终端2:压力测试
ab -n 10000 -c 100 http://127.0.0.1:8080/
  1. 对比性能指标:
  • 吞吐量(RPS)
  • 延迟(ms)
  • CPU使用率

实验2:epoll扩展性测试

目标: 测试epoll处理大量并发的能力

步骤:

  1. 创建并发连接测试:
#!/bin/bash
# concurrent_test.sh

for i in {1..1000}; do
    (echo "test $i"; sleep 10) | nc localhost 8080 &
done
  1. 观测系统状态:
# 查看连接数
ss -tan | grep 8080 | wc -l

# 查看epoll信息
ls -l /proc/$(pgrep epoll_lt)/fd | grep eventpoll

实验3:epoll源码追踪

目标: 使用工具追踪epoll的工作流程

步骤:

  1. 使用strace追踪:
strace -e epoll_create,epoll_ctl,epoll_wait -s 1000 ./epoll_lt
  1. 使用bpftrace追踪内核:
sudo bpftrace -e 'kprobe:ep_poll { printf("epoll_wait调用\n"); }'

常见问题

Q1: 什么时候应该使用ET模式?

A: ET模式适用于:

  • 高性能要求的场景
  • 能够保证一次性读完所有数据
  • 配合非阻塞IO使用
  • 愿意接受更复杂的编程模型

Q2: epoll的EPOLLONESHOT有什么用?

A: EPOLLONESHOT用于:

  • 多线程环境避免竞争
  • 事件只触发一次,处理完需要重新注册
  • 确保一个socket不会被多个线程同时处理

Q3: epoll能监听普通文件吗?

A: 不能。epoll只能监听支持poll操作的文件描述符,如:

  • socket
  • pipe
  • eventfd
  • timerfd
  • signalfd 普通文件总是返回就绪状态,不适合用epoll

Q4: epoll和select/poll的本质区别是什么?

A: 核心区别:

  • select/poll每次都要遍历所有fd
  • epoll使用回调机制,只返回就绪的fd
  • epoll没有fd数量限制
  • epoll在内核中维护就绪链表

复习题

选择题

  1. epoll使用什么数据结构存储所有监听的fd?

    • A. 链表
    • B. 数组
    • C. 红黑树
    • D. 哈希表
  2. ET模式的特点是:

    • A. 只要有数据就通知
    • B. 状态变化时通知一次
    • C. 永远不通知
    • D. 随机通知
  3. epoll_wait的返回值表示:

    • A. 总的fd数量
    • B. 就绪的fd数量
    • C. 错误码
    • D. 超时时间
  4. EPOLLONESHOT的作用是:

    • A. 只监听一次
    • B. 触发后自动删除
    • C. 触发后禁用,需重新启用
    • D. 单线程使用
  5. epoll相比select的优势不包括:

    • A. 没有fd数量限制
    • B. 不需要遍历所有fd
    • C. 更简单的API
    • D. 更好的性能

简答题

  1. 解释LT和ET模式的区别及适用场景。

  2. 为什么epoll比select/poll更高效?

  3. 描述epoll的完整工作流程。

  4. ET模式为什么必须配合非阻塞IO?

  5. 如何使用epoll实现超时机制?

实战题

  1. 编程题: 使用epoll实现一个高性能的echo服务器,支持10000并发连接。

  2. 性能优化题: 一个基于epoll的服务器在高并发下性能不佳,请分析可能的原因并提出优化方案。

  3. 调试题: 通过strace发现epoll_wait频繁返回但没有就绪fd,请分析可能的原因。

扩展阅读

推荐资源

  • epoll源码
  • The C10K Problem
  • epoll vs select vs poll

深入方向

  • io_uring(新一代异步IO)
  • IOCP(Windows完成端口)
  • kqueue(BSD的IO多路复用)
  • Reactor vs Proactor模式

下一章预告: 我们将深入探讨进程与线程管理,包括fork/exec机制、僵尸进程处理、线程调度策略等内容。

Prev
网络性能优化
Next
进程与线程管理