epoll与IO多路复用
章节概述
epoll是Linux提供的高效IO多路复用机制,是构建高并发网络服务的基石。本章将深入探讨epoll的工作原理、使用方法,以及如何基于epoll构建高性能服务器。
学习目标:
- 理解IO多路复用的概念和必要性
- 掌握epoll的工作原理和API使用
- 学会区分LT和ET模式
- 能够使用epoll构建高性能网络服务
核心概念
1. IO模型对比
阻塞IO(Blocking IO):
应用调用read()
↓
等待数据(阻塞)
↓
数据到达
↓
复制数据到用户空间
↓
返回
非阻塞IO(Non-blocking IO):
应用调用read()
↓
数据未就绪 → 立即返回EAGAIN
↓
应用轮询
↓
数据就绪 → 返回数据
IO多路复用(IO Multiplexing):
epoll_wait()监听多个fd
↓
某个fd就绪
↓
返回就绪的fd列表
↓
应用处理就绪的fd
对比表:
模型 | 并发能力 | CPU占用 | 复杂度 | 适用场景 |
---|---|---|---|---|
阻塞IO | 低(每连接一线程) | 低 | 简单 | 并发低的场景 |
非阻塞IO | 中(需轮询) | 高 | 中等 | 不推荐 |
select/poll | 中(有限制) | 中 | 中等 | 小规模并发 |
epoll | 高(无限制) | 低 | 复杂 | 高并发场景 |
2. epoll核心架构
┌─────────────────────────────────────┐
│ 用户空间应用 │
│ epoll_create() │
│ epoll_ctl(ADD/MOD/DEL) │
│ epoll_wait() │
└──────────┬──────────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 内核空间 │
│ ┌─────────────────────────────┐ │
│ │ epoll instance │ │
│ │ ┌──────────┐ ┌──────────┐│ │
│ │ │ 红黑树 │ │就绪链表 ││ │
│ │ │(所有fd) │ │(就绪fd) ││ │
│ │ └──────────┘ └──────────┘│ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
关键组件:
- 红黑树:存储所有监听的文件描述符
- 就绪链表:存储已就绪的文件描述符
- 等待队列:阻塞等待的进程队列
3. LT vs ET模式
水平触发(Level Triggered, LT):
特点:
- 只要fd就绪,就会一直通知
- 类似边缘中断
- 不处理也会再次通知
适用:
- 对实时性要求不高
- 编程简单
- 容错性好
边缘触发(Edge Triggered, ET):
特点:
- 只在状态变化时通知一次
- 需要一次性读完所有数据
- 漏掉则不再通知
适用:
- 高性能要求
- 需要配合非阻塞IO
- 编程复杂但高效
对比示例:
假设有1KB数据到达,应用只读取了512字节
LT模式:
epoll_wait()会再次返回该fd(因为还有数据)
ET模式:
epoll_wait()不会再返回该fd(除非有新数据到达)
必须在第一次通知时读完所有数据
源码解析
1. epoll实例创建
关键文件: fs/eventpoll.c
// epoll_create系统调用
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return do_epoll_create(0);
}
// 创建epoll实例
static int do_epoll_create(int flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
// 分配eventpoll结构
error = ep_alloc(&ep);
if (error < 0)
return error;
// 获取未使用的文件描述符
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
// 创建匿名inode和文件
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
// eventpoll核心结构
struct eventpoll {
spinlock_t lock; // 保护eventpoll的自旋锁
struct mutex mtx; // 保护eventpoll的互斥锁
wait_queue_head_t wq; // epoll_wait()等待队列
wait_queue_head_t poll_wait; // file->poll()等待队列
struct list_head rdllist; // 就绪链表
struct rb_root_cached rbr; // 红黑树根节点
struct epitem *ovflist; // 溢出链表
};
2. 添加/修改/删除监听
// epoll_ctl系统调用
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
// 复制用户空间的event
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;
// 获取epoll文件
f = fdget(epfd);
if (!f.file)
return -EBADF;
// 获取目标文件
tf = fdget(fd);
if (!tf.file)
goto error_fput;
ep = f.file->private_data;
mutex_lock(&ep->mtx);
// 查找是否已存在
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
fdput(tf);
error_fput:
fdput(f);
return error;
}
// 插入新的epitem到红黑树
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
struct epitem *epi;
struct ep_pqueue epq;
// 分配epitem
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
// 初始化epitem
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
// 插入红黑树
ep_rbtree_insert(ep, epi);
// 注册回调函数
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 调用文件的poll函数
revents = ep_item_poll(epi, &epq.pt, 1);
// 如果已经就绪,加入就绪链表
if (revents && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
// 唤醒等待的进程
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
}
return 0;
}
3. 等待事件
// epoll_wait系统调用
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct fd f;
struct eventpoll *ep;
// 获取epoll文件
f = fdget(epfd);
if (!f.file)
return -EBADF;
ep = f.file->private_data;
// 等待事件
error = ep_poll(ep, events, maxevents, timeout);
fdput(f);
return error;
}
// epoll核心等待逻辑
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
// 检查就绪链表
eavail = ep_events_available(ep);
// 如果没有就绪事件,进入等待
if (!eavail) {
// 计算超时时间
if (timeout > 0) {
expires = ktime_add_ns(ktime_get(), timeout * NSEC_PER_MSEC);
to = &expires;
}
// 初始化等待队列项
init_waitqueue_entry(&wait, current);
// 加入等待队列
__add_wait_queue_exclusive(&ep->wq, &wait);
for (;;) {
// 设置进程状态为可中断睡眠
set_current_state(TASK_INTERRUPTIBLE);
// 再次检查就绪链表
if (!list_empty_careful(&ep->rdllist) || timed_out)
break;
// 检查信号
if (signal_pending(current)) {
res = -EINTR;
break;
}
// 睡眠等待
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;
}
// 从等待队列移除
__remove_wait_queue(&ep->wq, &wait);
__set_current_state(TASK_RUNNING);
}
// 发送就绪事件到用户空间
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}
️ 实用命令
1. 查看epoll使用情况
查看进程的fd:
# 查看进程打开的文件描述符
ls -l /proc/<pid>/fd
# 查看epoll实例
ls -l /proc/<pid>/fd | grep eventpoll
# 查看epoll详细信息
cat /proc/<pid>/fdinfo/<epoll_fd>
示例输出:
pos: 0
flags: 02
mnt_id: 10
tfd: 5 events: 19 data: 7fffa0b2c920
tfd: 6 events: 19 data: 7fffa0b2c928
tfd: 7 events: 19 data: 7fffa0b2c930
2. 监控epoll性能
使用strace追踪:
# 追踪epoll相关系统调用
strace -e epoll_create,epoll_ctl,epoll_wait -p <pid>
# 统计系统调用
strace -c -e epoll_create,epoll_ctl,epoll_wait -p <pid>
代码示例
1. epoll基础示例(LT模式)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <errno.h>
#define MAX_EVENTS 10
#define PORT 8080
// 设置非阻塞
int setnonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl F_GETFL");
return -1;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL");
return -1;
}
return 0;
}
int main() {
int listen_fd, conn_fd, epoll_fd, nfds;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len;
struct epoll_event ev, events[MAX_EVENTS];
char buffer[1024];
// 创建监听socket
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd < 0) {
perror("socket");
exit(1);
}
// 设置地址重用
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
// 设置非阻塞
setnonblocking(listen_fd);
// 绑定地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind");
exit(1);
}
// 监听
if (listen(listen_fd, SOMAXCONN) < 0) {
perror("listen");
exit(1);
}
// 创建epoll实例
epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
perror("epoll_create1");
exit(1);
}
// 添加监听socket到epoll
ev.events = EPOLLIN; // LT模式(默认)
ev.data.fd = listen_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev) < 0) {
perror("epoll_ctl: listen_fd");
exit(1);
}
printf("服务器启动,监听端口 %d\n", PORT);
// 事件循环
while (1) {
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds < 0) {
perror("epoll_wait");
break;
}
for (int i = 0; i < nfds; i++) {
if (events[i].data.fd == listen_fd) {
// 新连接
client_len = sizeof(client_addr);
conn_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_len);
if (conn_fd < 0) {
perror("accept");
continue;
}
printf("接受新连接: fd=%d\n", conn_fd);
setnonblocking(conn_fd);
// 添加到epoll
ev.events = EPOLLIN;
ev.data.fd = conn_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev) < 0) {
perror("epoll_ctl: conn_fd");
close(conn_fd);
}
} else {
// 数据可读
int fd = events[i].data.fd;
int n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0) {
buffer[n] = '\0';
printf("接收 fd=%d: %s\n", fd, buffer);
// 回显
write(fd, buffer, n);
} else if (n == 0) {
// 连接关闭
printf("连接关闭: fd=%d\n", fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
} else {
if (errno != EAGAIN) {
perror("read");
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
}
}
}
}
close(epoll_fd);
close(listen_fd);
return 0;
}
2. epoll ET模式示例
// ET模式需要循环读取直到EAGAIN
void handle_et_read(int fd) {
char buffer[1024];
int n;
// ET模式必须循环读取
while (1) {
n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0) {
buffer[n] = '\0';
printf("读取 %d 字节: %s\n", n, buffer);
// 处理数据...
} else if (n == 0) {
// 连接关闭
printf("连接关闭\n");
close(fd);
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 数据读完了
printf("数据读完\n");
break;
} else {
perror("read");
close(fd);
break;
}
}
}
}
// 添加ET模式监听
ev.events = EPOLLIN | EPOLLET; // 边缘触发
ev.data.fd = conn_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev);
3. Go语言封装epoll
package main
import (
"fmt"
"net"
"syscall"
)
type EpollServer struct {
listener net.Listener
epfd int
events []syscall.EpollEvent
}
func NewEpollServer(addr string) (*EpollServer, error) {
// 创建监听器
listener, err := net.Listen("tcp", addr)
if (err != nil {
return nil, err
}
// 创建epoll实例
epfd, err := syscall.EpollCreate1(0)
if err != nil {
listener.Close()
return nil, err
}
server := &EpollServer{
listener: listener,
epfd: epfd,
events: make([]syscall.EpollEvent, 128),
}
// 获取监听socket的fd
file, _ := listener.(*net.TCPListener).File()
fd := int(file.Fd())
// 添加到epoll
event := syscall.EpollEvent{
Events: syscall.EPOLLIN,
Fd: int32(fd),
}
if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
listener.Close()
syscall.Close(epfd)
return nil, err
}
return server, nil
}
func (s *EpollServer) Run() {
for {
// 等待事件
n, err := syscall.EpollWait(s.epfd, s.events, -1)
if err != nil {
fmt.Printf("epoll_wait错误: %v\n", err)
continue
}
// 处理事件
for i := 0; i < n; i++ {
fd := int(s.events[i].Fd)
// 处理连接或数据...
fmt.Printf("事件就绪: fd=%d\n", fd)
}
}
}
func main() {
server, err := NewEpollServer(":8080")
if err != nil {
panic(err)
}
fmt.Println("服务器启动")
server.Run()
}
动手实验
实验1:LT vs ET性能对比
目标: 理解两种模式的性能差异
步骤:
- 编译LT和ET版本:
gcc -o epoll_lt epoll_lt.c
gcc -o epoll_et epoll_et.c
- 使用ab进行压测:
# 终端1:启动服务器
./epoll_lt
# 终端2:压力测试
ab -n 10000 -c 100 http://127.0.0.1:8080/
- 对比性能指标:
- 吞吐量(RPS)
- 延迟(ms)
- CPU使用率
实验2:epoll扩展性测试
目标: 测试epoll处理大量并发的能力
步骤:
- 创建并发连接测试:
#!/bin/bash
# concurrent_test.sh
for i in {1..1000}; do
(echo "test $i"; sleep 10) | nc localhost 8080 &
done
- 观测系统状态:
# 查看连接数
ss -tan | grep 8080 | wc -l
# 查看epoll信息
ls -l /proc/$(pgrep epoll_lt)/fd | grep eventpoll
实验3:epoll源码追踪
目标: 使用工具追踪epoll的工作流程
步骤:
- 使用strace追踪:
strace -e epoll_create,epoll_ctl,epoll_wait -s 1000 ./epoll_lt
- 使用bpftrace追踪内核:
sudo bpftrace -e 'kprobe:ep_poll { printf("epoll_wait调用\n"); }'
常见问题
Q1: 什么时候应该使用ET模式?
A: ET模式适用于:
- 高性能要求的场景
- 能够保证一次性读完所有数据
- 配合非阻塞IO使用
- 愿意接受更复杂的编程模型
Q2: epoll的EPOLLONESHOT有什么用?
A: EPOLLONESHOT用于:
- 多线程环境避免竞争
- 事件只触发一次,处理完需要重新注册
- 确保一个socket不会被多个线程同时处理
Q3: epoll能监听普通文件吗?
A: 不能。epoll只能监听支持poll操作的文件描述符,如:
- socket
- pipe
- eventfd
- timerfd
- signalfd 普通文件总是返回就绪状态,不适合用epoll
Q4: epoll和select/poll的本质区别是什么?
A: 核心区别:
- select/poll每次都要遍历所有fd
- epoll使用回调机制,只返回就绪的fd
- epoll没有fd数量限制
- epoll在内核中维护就绪链表
复习题
选择题
epoll使用什么数据结构存储所有监听的fd?
- A. 链表
- B. 数组
- C. 红黑树
- D. 哈希表
ET模式的特点是:
- A. 只要有数据就通知
- B. 状态变化时通知一次
- C. 永远不通知
- D. 随机通知
epoll_wait的返回值表示:
- A. 总的fd数量
- B. 就绪的fd数量
- C. 错误码
- D. 超时时间
EPOLLONESHOT的作用是:
- A. 只监听一次
- B. 触发后自动删除
- C. 触发后禁用,需重新启用
- D. 单线程使用
epoll相比select的优势不包括:
- A. 没有fd数量限制
- B. 不需要遍历所有fd
- C. 更简单的API
- D. 更好的性能
简答题
解释LT和ET模式的区别及适用场景。
为什么epoll比select/poll更高效?
描述epoll的完整工作流程。
ET模式为什么必须配合非阻塞IO?
如何使用epoll实现超时机制?
实战题
编程题: 使用epoll实现一个高性能的echo服务器,支持10000并发连接。
性能优化题: 一个基于epoll的服务器在高并发下性能不佳,请分析可能的原因并提出优化方案。
调试题: 通过strace发现epoll_wait频繁返回但没有就绪fd,请分析可能的原因。
扩展阅读
推荐资源
深入方向
- io_uring(新一代异步IO)
- IOCP(Windows完成端口)
- kqueue(BSD的IO多路复用)
- Reactor vs Proactor模式
下一章预告: 我们将深入探讨进程与线程管理,包括fork/exec机制、僵尸进程处理、线程调度策略等内容。