【编程难度第三名】高性能异步系统 - 突破性能天花板的艺术
本系列文章
➤ [NO.1 调度器]
➤ [NO.2 一致性协议](Paxos / Raft)
➤ NO.3 高性能异步系统(消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)
一、为什么高性能异步系统如此困难?
1.1 核心挑战
代表系统:
- Nginx: 事件驱动架构,单机支持百万并发
- Redis: 单线程 + IO 多路复用,QPS 10 万+
- Netty: 异步网络框架,Dubbo/gRPC 的基础
- Envoy: 高性能代理,Service Mesh 的核心
- Kafka: 高吞吐消息队列,百万级 TPS
为什么难:
- 需要处理高并发(10 万+ QPS)
- 需要处理网络抖动、超时、重连
- 需要保证消息顺序(如果需要的话)
- 需要处理背压(backpressure)
- 需要优化性能(延迟 < 10ms,CPU < 50%)
与普通系统的维度差异:
普通系统:
- 并发: 100-1000 连接
- 延迟: 100-500ms
- 吞吐: 1000-10000 QPS
高性能异步系统:
- 并发: 10 万+ 连接
- 延迟: < 10ms (P99)
- 吞吐: 10 万+ QPS
- CPU: < 50%
- 内存: < 2GB
二、核心难点深度解析
2.1 难点一:高并发连接管理
传统模型的崩溃
一个连接一个线程模型:
问题:
- 10 万个连接 = 10 万个线程
- 每个线程栈空间 1MB
- 总内存 = 100GB (!)
- 线程切换开销巨大
- CPU 全部浪费在上下文切换上
- 系统完全无法工作
为什么会这样?
// 传统的阻塞 IO 模型
func handleClient(conn net.Conn) {
defer conn.Close()
buf := make([]byte, 4096)
for {
// 阻塞等待数据
n, err := conn.Read(buf)
if err != nil {
return
}
// 处理数据
response := process(buf[:n])
// 阻塞写入
conn.Write(response)
}
}
// 主函数
func main() {
listener, _ := net.Listen("tcp", ":8080")
for {
conn, _ := listener.Accept()
// 每个连接一个 goroutine/thread
go handleClient(conn)
}
}
问题:
conn.Read会阻塞,等待数据到来- 在等待期间,线程什么都不做,纯粹浪费
- 10 万个连接 = 10 万个阻塞的线程
---IO 多路复用 + 事件驱动
核心思想:
单线程管理多个连接:
- 不阻塞在任何一个连接上
- 使用 epoll/kqueue 监听所有连接
- 哪个连接有数据,就处理哪个
- 没有数据的连接不占用 CPU
Epoll 模型详解(Linux):
type EventLoop struct {
epollFd int
events []unix.EpollEvent
conns map[int]*Connection
mu sync.RWMutex
}
type Connection struct {
fd int
readBuf *bytes.Buffer
writeBuf *bytes.Buffer
handler func(*Connection, []byte)
}
func NewEventLoop() (*EventLoop, error) {
// 创建 epoll 实例
epollFd, err := unix.EpollCreate1(0)
if err != nil {
return nil, err
}
return &EventLoop{
epollFd: epollFd,
events: make([]unix.EpollEvent, 1024),
conns: make(map[int]*Connection),
}, nil
}
func (loop *EventLoop) AddConnection(fd int, conn *Connection) error {
loop.mu.Lock()
loop.conns[fd] = conn
loop.mu.Unlock()
// 注册到 epoll,监听读事件
event := unix.EpollEvent{
Events: unix.EPOLLIN | unix.EPOLLET, // 边缘触发
Fd: int32(fd),
}
return unix.EpollCtl(
loop.epollFd,
unix.EPOLL_CTL_ADD,
fd,
&event,
)
}
func (loop *EventLoop) Run() {
log.Info("event loop started")
for {
// 1. 等待事件(非阻塞,高效)
// 这一个系统调用可以监听 10 万个连接!
n, err := unix.EpollWait(
loop.epollFd,
loop.events,
-1, // 无限等待
)
if err != nil {
if err == unix.EINTR {
continue // 被信号中断,继续
}
log.Error("epoll wait failed", "err", err)
continue
}
// 2. 处理所有就绪的事件
for i := 0; i < n; i++ {
event := loop.events[i]
fd := int(event.Fd)
loop.mu.RLock()
conn, ok := loop.conns[fd]
loop.mu.RUnlock()
if !ok {
continue
}
// 可读事件
if event.Events&unix.EPOLLIN != 0 {
loop.handleRead(conn)
}
// 可写事件
if event.Events&unix.EPOLLOUT != 0 {
loop.handleWrite(conn)
}
// 错误或关闭
if event.Events&(unix.EPOLLERR|unix.EPOLLHUP) != 0 {
loop.handleClose(conn)
}
}
}
}
func (loop *EventLoop) handleRead(conn *Connection) {
// 非阻塞读取
buf := make([]byte, 4096)
for {
n, err := unix.Read(conn.fd, buf)
if err != nil {
if err == unix.EAGAIN {
// 数据读完了,等待下次事件
break
}
loop.handleClose(conn)
return
}
if n == 0 {
// 连接关闭
loop.handleClose(conn)
return
}
// 追加到读缓冲区
conn.readBuf.Write(buf[:n])
// 解析消息
loop.parseMessages(conn)
}
}
func (loop *EventLoop) parseMessages(conn *Connection) {
for {
// 尝试解析一条完整的消息
msg, n, ok := parseMessage(conn.readBuf.Bytes())
if !ok {
// 数据不完整,等待更多数据
break
}
// 移除已解析的数据
conn.readBuf.Next(n)
// 处理消息
conn.handler(conn, msg)
}
}
func (loop *EventLoop) handleWrite(conn *Connection) {
if conn.writeBuf.Len() == 0 {
// 没有数据要写,取消 EPOLLOUT 监听
unix.EpollCtl(
loop.epollFd,
unix.EPOLL_CTL_MOD,
conn.fd,
&unix.EpollEvent{
Events: unix.EPOLLIN | unix.EPOLLET,
Fd: int32(conn.fd),
},
)
return
}
// 非阻塞写入
for conn.writeBuf.Len() > 0 {
n, err := unix.Write(conn.fd, conn.writeBuf.Bytes())
if err != nil {
if err == unix.EAGAIN {
// 写缓冲区满了,等待下次事件
break
}
loop.handleClose(conn)
return
}
// 移除已写入的数据
conn.writeBuf.Next(n)
}
}
func (loop *EventLoop) Send(conn *Connection, data []byte) error {
// 写入发送缓冲区
conn.writeBuf.Write(data)
// 注册 EPOLLOUT 事件
return unix.EpollCtl(
loop.epollFd,
unix.EPOLL_CTL_MOD,
conn.fd,
&unix.EpollEvent{
Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLET,
Fd: int32(conn.fd),
},
)
}
func (loop *EventLoop) handleClose(conn *Connection) {
log.Info("connection closed", "fd", conn.fd)
// 从 epoll 移除
unix.EpollCtl(
loop.epollFd,
unix.EPOLL_CTL_DEL,
conn.fd,
nil,
)
// 关闭连接
unix.Close(conn.fd)
// 移除映射
loop.mu.Lock()
delete(loop.conns, conn.fd)
loop.mu.Unlock()
}
关键优势:
单线程处理 10 万连接:
- 一个线程通过 epoll 监听所有连接
- 只处理有数据的连接
- 没有线程切换开销
零拷贝:
- 数据直接从内核缓冲区到应用缓冲区
- 避免不必要的内存拷贝
非阻塞 IO:
- 永远不会阻塞在某个连接上
- 最大化 CPU 利用率
边缘触发(ET):
- 只在状态变化时触发
- 减少事件数量
- 需要一次读/写完所有数据
Reactor 模式
多线程 Reactor 架构:
type Reactor struct {
mainLoop *EventLoop // 主 Reactor,负责 accept
workerLoops []*EventLoop // 工作 Reactor,负责 IO
workerCount int
nextWorker int
mu sync.Mutex
}
func NewReactor(workerCount int) (*Reactor, error) {
mainLoop, err := NewEventLoop()
if err != nil {
return nil, err
}
workerLoops := make([]*EventLoop, workerCount)
for i := 0; i < workerCount; i++ {
workerLoops[i], err = NewEventLoop()
if err != nil {
return nil, err
}
}
return &Reactor{
mainLoop: mainLoop,
workerLoops: workerLoops,
workerCount: workerCount,
}, nil
}
func (r *Reactor) Start(port int) error {
// 创建监听 socket
listenFd, err := unix.Socket(
unix.AF_INET,
unix.SOCK_STREAM|unix.SOCK_NONBLOCK,
0,
)
if err != nil {
return err
}
// 绑定端口
addr := &unix.SockaddrInet4{Port: port}
copy(addr.Addr[:], net.ParseIP("0.0.0.0").To4())
if err := unix.Bind(listenFd, addr); err != nil {
return err
}
if err := unix.Listen(listenFd, 1024); err != nil {
return err
}
log.Info("reactor started", "port", port, "workers", r.workerCount)
// 启动所有 worker
for i, worker := range r.workerLoops {
go func(id int, w *EventLoop) {
log.Info("worker started", "id", id)
w.Run()
}(i, worker)
}
// 主线程负责 accept
r.acceptLoop(listenFd)
return nil
}
func (r *Reactor) acceptLoop(listenFd int) {
for {
// 非阻塞 accept
connFd, _, err := unix.Accept(listenFd)
if err != nil {
if err == unix.EAGAIN {
// 没有新连接,等待
time.Sleep(10 * time.Millisecond)
continue
}
log.Error("accept failed", "err", err)
continue
}
// 设置非阻塞
unix.SetNonblock(connFd, true)
// 设置 TCP_NODELAY(禁用 Nagle 算法,减少延迟)
unix.SetsockoptInt(connFd, unix.IPPROTO_TCP, unix.TCP_NODELAY, 1)
// 负载均衡:轮询分配给 worker
worker := r.selectWorker()
conn := &Connection{
fd: connFd,
readBuf: &bytes.Buffer{},
writeBuf: &bytes.Buffer{},
handler: r.handleMessage,
}
// 添加到 worker 的 event loop
if err := worker.AddConnection(connFd, conn); err != nil {
log.Error("add connection failed", "err", err)
unix.Close(connFd)
continue
}
log.Info("new connection",
"fd", connFd,
"worker", r.nextWorker-1)
}
}
func (r *Reactor) selectWorker() *EventLoop {
r.mu.Lock()
defer r.mu.Unlock()
worker := r.workerLoops[r.nextWorker]
r.nextWorker = (r.nextWorker + 1) % r.workerCount
return worker
}
func (r *Reactor) handleMessage(conn *Connection, msg []byte) {
// 处理消息
log.Info("received message",
"fd", conn.fd,
"len", len(msg))
// 回复
response := []byte("OK\n")
r.mainLoop.Send(conn, response)
}
架构优势:
主 Reactor:
- 负责 accept 新连接
- 分发连接给 worker
Worker Reactors:
- 负责 IO 读写
- 并发处理多个连接
- 数量 = CPU 核心数
效果:
- 充分利用多核 CPU
- 单机支持百万并发
- 延迟 < 1ms
2.2 难点二:背压(Backpressure)处理
问题场景
生产者: 每秒产生 10 万条消息
消费者: 每秒只能处理 5 万条消息
时刻 T1: 队列 0 条
时刻 T2: 队列 5 万条 (堆积 5 万)
时刻 T3: 队列 10 万条 (堆积 10 万)
时刻 T4: 队列 15 万条 (堆积 15 万)
...
时刻 T10: 队列 50 万条
时刻 T11: 内存爆炸 OOM!
时刻 T12: 系统崩溃!
解决方案一:动态限流
type Channel struct {
buffer chan Message
capacity int
paused bool
pauseTime time.Time
mu sync.Mutex
// 统计
sendCount int64
receiveCount int64
dropCount int64
}
func NewChannel(capacity int) *Channel {
return &Channel{
buffer: make(chan Message, capacity),
capacity: capacity,
}
}
func (ch *Channel) Send(msg Message) error {
ch.mu.Lock()
defer ch.mu.Unlock()
// 1. 检查容量
currentLen := len(ch.buffer)
usage := float64(currentLen) / float64(ch.capacity)
if usage >= 0.9 {
// 超过 90%,启用背压
if !ch.paused {
ch.paused = true
ch.pauseTime = time.Now()
ch.notifyProducer("SLOW_DOWN", usage)
log.Warn("backpressure activated",
"usage", usage,
"queueLen", currentLen)
}
// 策略 1: 阻塞发送(等待消费者追上)
select {
case ch.buffer <- msg:
atomic.AddInt64(&ch.sendCount, 1)
return nil
case <-time.After(5 * time.Second):
atomic.AddInt64(&ch.dropCount, 1)
return errors.New("channel full, timeout")
}
}
if usage >= 0.95 {
// 超过 95%,丢弃低优先级消息
if msg.Priority < PriorityHigh {
atomic.AddInt64(&ch.dropCount, 1)
log.Warn("message dropped",
"priority", msg.Priority,
"usage", usage)
return errors.New("channel overloaded, message dropped")
}
}
// 2. 正常发送
select {
case ch.buffer <- msg:
atomic.AddInt64(&ch.sendCount, 1)
return nil
default:
atomic.AddInt64(&ch.dropCount, 1)
return errors.New("channel full")
}
}
func (ch *Channel) Receive() (Message, error) {
msg := <-ch.buffer
atomic.AddInt64(&ch.receiveCount, 1)
// 3. 检查是否可以恢复
ch.mu.Lock()
defer ch.mu.Unlock()
if ch.paused {
currentLen := len(ch.buffer)
usage := float64(currentLen) / float64(ch.capacity)
if usage < 0.5 {
// 降到 50%,恢复
ch.paused = false
pauseDuration := time.Since(ch.pauseTime)
ch.notifyProducer("RESUME", usage)
log.Info("backpressure released",
"usage", usage,
"pauseDuration", pauseDuration)
}
}
return msg, nil
}
func (ch *Channel) notifyProducer(action string, usage float64) {
// 通知生产者调整速率
log.Info("notifying producer",
"action", action,
"usage", usage)
}
func (ch *Channel) Stats() ChannelStats {
return ChannelStats{
SendCount: atomic.LoadInt64(&ch.sendCount),
ReceiveCount: atomic.LoadInt64(&ch.receiveCount),
DropCount: atomic.LoadInt64(&ch.dropCount),
QueueLen: len(ch.buffer),
Capacity: ch.capacity,
}
}
解决方案二:令牌桶限流
type RateLimiter struct {
rate float64 // 每秒允许的请求数
burst float64 // 突发容量
tokens float64 // 当前可用令牌
lastUpdate time.Time
mu sync.Mutex
// 自适应调整
autoAdjust bool
targetLatency time.Duration
actualLatency time.Duration
adjustInterval time.Duration
lastAdjustTime time.Time
}
func NewRateLimiter(rate, burst float64) *RateLimiter {
return &RateLimiter{
rate: rate,
burst: burst,
tokens: burst,
lastUpdate: time.Now(),
autoAdjust: true,
targetLatency: 10 * time.Millisecond,
adjustInterval: 1 * time.Second,
lastAdjustTime: time.Now(),
}
}
func (rl *RateLimiter) Allow() bool {
rl.mu.Lock()
defer rl.mu.Unlock()
now := time.Now()
elapsed := now.Sub(rl.lastUpdate).Seconds()
// 恢复令牌(每秒恢复 rate 个)
rl.tokens += elapsed * rl.rate
if rl.tokens > rl.burst {
rl.tokens = rl.burst
}
rl.lastUpdate = now
// 检查是否有可用令牌
if rl.tokens >= 1 {
rl.tokens--
return true
}
return false
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
for {
if rl.Allow() {
return nil
}
// 计算需要等待的时间
rl.mu.Lock()
deficit := 1 - rl.tokens
delay := time.Duration(deficit/rl.rate) * time.Second
rl.mu.Unlock()
select {
case <-time.After(delay):
// 继续尝试
case <-ctx.Done():
return ctx.Err()
}
}
}
func (rl *RateLimiter) AdjustRate(newRate float64) {
rl.mu.Lock()
defer rl.mu.Unlock()
log.Info("adjusting rate",
"oldRate", rl.rate,
"newRate", newRate)
rl.rate = newRate
}
// 自适应调整速率
func (rl *RateLimiter) AutoAdjust(actualLatency time.Duration) {
if !rl.autoAdjust {
return
}
rl.mu.Lock()
defer rl.mu.Unlock()
now := time.Now()
if now.Sub(rl.lastAdjustTime) < rl.adjustInterval {
return
}
rl.lastAdjustTime = now
rl.actualLatency = actualLatency
// 计算调整因子
factor := float64(rl.targetLatency) / float64(actualLatency)
if factor > 1.1 {
// 延迟低于目标,可以提速
newRate := rl.rate * 1.1
log.Info("increasing rate",
"oldRate", rl.rate,
"newRate", newRate,
"latency", actualLatency)
rl.rate = newRate
} else if factor < 0.9 {
// 延迟高于目标,需要降速
newRate := rl.rate * 0.9
log.Warn("decreasing rate",
"oldRate", rl.rate,
"newRate", newRate,
"latency", actualLatency)
rl.rate = newRate
}
}
// 使用示例
func handleRequest(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 限流
if !rateLimiter.Allow() {
http.Error(w, "Too Many Requests", 429)
return
}
// 处理请求
result := processRequest(r)
// 记录延迟
latency := time.Since(start)
rateLimiter.AutoAdjust(latency)
w.Write(result)
}
2.3 难点三:消息顺序保证
挑战
场景: WebSocket 消息推送系统
用户 A 发送 3 条消息:
1. "Hello" (seq=1)
2. "How are you?" (seq=2)
3. "Bye" (seq=3)
网络层并发发送,可能乱序:
服务器收到: seq=3 -> seq=1 -> seq=2
用户 B 看到: "Bye" -> "Hello" -> "How are you?" (完全乱套!)
```序列号 + 重排序缓冲区
```go
type Message struct {
SeqNum uint64
Payload []byte
ReceivedAt time.Time
}
type OrderedChannel struct {
nextSeq uint64
pending map[uint64]Message
deliverChan chan Message
timeout time.Duration
mu sync.Mutex
// 统计
receivedCount uint64
deliveredCount uint64
droppedCount uint64
}
func NewOrderedChannel(bufferSize int, timeout time.Duration) *OrderedChannel {
oc := &OrderedChannel{
nextSeq: 1,
pending: make(map[uint64]Message),
deliverChan: make(chan Message, bufferSize),
timeout: timeout,
}
// 启动超时检查
go oc.timeoutChecker()
return oc
}
func (oc *OrderedChannel) Receive(msg Message) {
oc.mu.Lock()
defer oc.mu.Unlock()
atomic.AddUint64(&oc.receivedCount, 1)
// 记录接收时间
msg.ReceivedAt = time.Now()
log.Debug("received message",
"seq", msg.SeqNum,
"expected", oc.nextSeq)
// 1. 如果是旧消息,丢弃
if msg.SeqNum < oc.nextSeq {
log.Warn("dropping old message",
"seq", msg.SeqNum,
"expected", oc.nextSeq)
atomic.AddUint64(&oc.droppedCount, 1)
return
}
// 2. 存入待处理队列
oc.pending[msg.SeqNum] = msg
// 3. 尝试按序投递
oc.deliverPending()
}
func (oc *OrderedChannel) deliverPending() {
for {
msg, ok := oc.pending[oc.nextSeq]
if !ok {
// 下一条消息还没到
break
}
// 投递
select {
case oc.deliverChan <- msg:
atomic.AddUint64(&oc.deliveredCount, 1)
log.Debug("delivered message",
"seq", msg.SeqNum)
default:
// 投递队列满了,稍后重试
log.Warn("deliver queue full",
"seq", msg.SeqNum)
return
}
// 清理
delete(oc.pending, oc.nextSeq)
oc.nextSeq++
}
}
func (oc *OrderedChannel) timeoutChecker() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
oc.checkTimeout()
}
}
func (oc *OrderedChannel) checkTimeout() {
oc.mu.Lock()
defer oc.mu.Unlock()
if len(oc.pending) == 0 {
return
}
now := time.Now()
for seqNum, msg := range oc.pending {
if seqNum < oc.nextSeq {
// 旧消息,删除
delete(oc.pending, seqNum)
continue
}
if now.Sub(msg.ReceivedAt) > oc.timeout {
// 超时,可能丢包了
log.Warn("message timeout",
"seq", seqNum,
"expected", oc.nextSeq,
"age", now.Sub(msg.ReceivedAt))
// 策略 1: 跳过丢失的消息
if seqNum == oc.nextSeq {
log.Warn("skipping lost message",
"seq", oc.nextSeq)
atomic.AddUint64(&oc.droppedCount, 1)
oc.nextSeq++
// 尝试投递后续消息
oc.deliverPending()
}
// 策略 2: 请求重传(如果协议支持)
// requestRetransmit(oc.nextSeq)
}
}
}
func (oc *OrderedChannel) Deliver() <-chan Message {
return oc.deliverChan
}
func (oc *OrderedChannel) Stats() OrderedChannelStats {
oc.mu.Lock()
defer oc.mu.Unlock()
return OrderedChannelStats{
ReceivedCount: atomic.LoadUint64(&oc.receivedCount),
DeliveredCount: atomic.LoadUint64(&oc.deliveredCount),
DroppedCount: atomic.LoadUint64(&oc.droppedCount),
PendingCount: len(oc.pending),
NextSeq: oc.nextSeq,
}
}
分区顺序(Partition Ordering)
// 同一个分区的消息有序,不同分区之间无序
type PartitionedChannel struct {
partitions map[string]*OrderedChannel
numShards int
mu sync.RWMutex
// 统计
partitionCount int
}
func NewPartitionedChannel(numShards int) *PartitionedChannel {
return &PartitionedChannel{
partitions: make(map[string]*OrderedChannel),
numShards: numShards,
}
}
func (pc *PartitionedChannel) Send(partitionKey string, msg Message) {
// 获取或创建分区
partition := pc.getOrCreatePartition(partitionKey)
// 发送到分区
partition.Receive(msg)
}
func (pc *PartitionedChannel) getOrCreatePartition(key string) *OrderedChannel {
// 先尝试读锁
pc.mu.RLock()
partition, ok := pc.partitions[key]
pc.mu.RUnlock()
if ok {
return partition
}
// 需要创建,升级到写锁
pc.mu.Lock()
defer pc.mu.Unlock()
// 双重检查
partition, ok = pc.partitions[key]
if ok {
return partition
}
// 创建新分区
partition = NewOrderedChannel(1000, 5*time.Second)
pc.partitions[key] = partition
pc.partitionCount++
log.Info("created partition",
"key", key,
"totalPartitions", pc.partitionCount)
return partition
}
func (pc *PartitionedChannel) Subscribe(partitionKey string) <-chan Message {
partition := pc.getOrCreatePartition(partitionKey)
return partition.Deliver()
}
// 使用示例:WebSocket 服务器
func (server *WebSocketServer) handleUserMessage(userID string, msg Message) {
// 同一个用户的消息发送到同一个分区,保证顺序
server.channel.Send(userID, msg)
}
func (server *WebSocketServer) startUserWorker(userID string) {
// 订阅该用户的消息
msgChan := server.channel.Subscribe(userID)
for msg := range msgChan {
// 按顺序处理消息
server.processMessage(userID, msg)
}
}
2.4 难点四:零拷贝(Zero-Copy)优化
传统方式的开销
普通方式(4 次拷贝):
1. 磁盘 -> 内核缓冲区(DMA 拷贝)
2. 内核缓冲区 -> 用户空间(CPU 拷贝)
3. 用户空间 -> Socket 缓冲区(CPU 拷贝)
4. Socket 缓冲区 -> 网卡(DMA 拷贝)
总共: 2 次 CPU 拷贝 + 2 次 DMA 拷贝
CPU 拷贝非常昂贵!
零拷贝:sendfile
// 使用 sendfile 系统调用
func sendFile(w http.ResponseWriter, filePath string) error {
// 1. 打开文件
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// 2. 获取文件信息
info, err := file.Stat()
if err != nil {
return err
}
// 3. 设置响应头
w.Header().Set("Content-Length", strconv.FormatInt(info.Size(), 10))
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition",
fmt.Sprintf("attachment; filename=\"%s\"", filepath.Base(filePath)))
// 4. 获取底层的 TCP 连接
hijacker, ok := w.(http.Hijacker)
if !ok {
// Fallback: 使用普通方式
_, err = io.Copy(w, file)
return err
}
conn, _, err := hijacker.Hijack()
if err != nil {
return err
}
defer conn.Close()
// 5. 获取文件和 socket 的 fd
fileFd := int(file.Fd())
tcpConn := conn.(*net.TCPConn)
socketFile, err := tcpConn.File()
if err != nil {
return err
}
defer socketFile.Close()
socketFd := int(socketFile.Fd())
// 6. 使用 sendfile (零拷贝!)
// 数据直接从文件描述符传输到 socket
_, err = unix.Sendfile(socketFd, fileFd, nil, int(info.Size()))
return err
}
性能对比:
文件大小: 1GB
普通方式:
- CPU 拷贝: 2GB (用户空间 <-> 内核空间)
- 耗时: 2 秒
- CPU: 100%
零拷贝(sendfile):
- CPU 拷贝: 0GB
- 耗时: 0.5 秒
- CPU: 10%
性能提升: 4 倍!
零拷贝:mmap(内存映射)
func readFileWithMmap(filePath string) ([]byte, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return nil, err
}
// 将文件映射到内存
data, err := unix.Mmap(
int(file.Fd()),
0, // offset
int(info.Size()),
unix.PROT_READ,
unix.MAP_SHARED,
)
if err != nil {
return nil, err
}
// 数据在内存中,无需拷贝
// 操作系统会按需加载(page fault)
return data, nil
}
// 使用示例
func serveFile(w http.ResponseWriter, r *http.Request) {
data, err := readFileWithMmap("/path/to/large/file")
if err != nil {
http.Error(w, err.Error(), 500)
return
}
defer unix.Munmap(data)
// 直接写入(内核会优化)
w.Write(data)
}
三、性能优化技巧
3.1 对象池(减少 GC 压力)
var bufferPool = sync.Pool{
New: func() interface{} {
// 预分配 4KB buffer
return make([]byte, 4096)
},
}
func handleConnection(conn net.Conn) {
// 从池中获取 buffer
buf := bufferPool.Get().([]byte)
defer bufferPool.Put(buf) // 归还到池中
n, err := conn.Read(buf)
if err != nil {
return
}
// 处理数据
process(buf[:n])
}
// 性能对比
// 不使用对象池:
// - 每次分配新 buffer
// - GC 压力大
// - 吞吐: 50K QPS
//
// 使用对象池:
// - 复用 buffer
// - GC 压力小
// - 吞吐: 100K QPS
3.2 批量处理(减少系统调用)
type BatchWriter struct {
conn net.Conn
buffer *bytes.Buffer
mu sync.Mutex
timer *time.Timer
batchSize int
batchTime time.Duration
// 统计
writeCount int64
batchCount int64
}
func NewBatchWriter(conn net.Conn) *BatchWriter {
return &BatchWriter{
conn: conn,
buffer: &bytes.Buffer{},
batchSize: 4096, // 4KB
batchTime: 10 * time.Millisecond,
}
}
func (bw *BatchWriter) Write(data []byte) error {
bw.mu.Lock()
defer bw.mu.Unlock()
// 写入缓冲区
bw.buffer.Write(data)
atomic.AddInt64(&bw.writeCount, 1)
// 触发条件:
// 1. 缓冲区满(4KB)
// 2. 定时刷新(10ms)
if bw.buffer.Len() >= bw.batchSize {
return bw.flushLocked()
}
if bw.timer == nil {
bw.timer = time.AfterFunc(bw.batchTime, func() {
bw.mu.Lock()
bw.flushLocked()
bw.mu.Unlock()
})
}
return nil
}
func (bw *BatchWriter) flushLocked() error {
if bw.buffer.Len() == 0 {
return nil
}
// 批量写入(一次系统调用)
_, err := bw.conn.Write(bw.buffer.Bytes())
if err != nil {
return err
}
atomic.AddInt64(&bw.batchCount, 1)
log.Debug("batch flushed",
"bytes", bw.buffer.Len(),
"writes", atomic.LoadInt64(&bw.writeCount),
"batches", atomic.LoadInt64(&bw.batchCount))
bw.buffer.Reset()
if bw.timer != nil {
bw.timer.Stop()
bw.timer = nil
}
return nil
}
func (bw *BatchWriter) Flush() error {
bw.mu.Lock()
defer bw.mu.Unlock()
return bw.flushLocked()
}
// 性能对比
// 不批量处理:
// - 10 万次写入 = 10 万次系统调用
// - 吞吐: 30K QPS
//
// 批量处理:
// - 10 万次写入 = 1000 次系统调用
// - 吞吐: 100K QPS
3.3 预分配内存
// 差的做法
func bad() {
var results []Result
for i := 0; i < 10000; i++ {
results = append(results, process(i)) // 频繁扩容
}
}
// 好的做法
func good() {
results := make([]Result, 0, 10000) // 预分配容量
for i := 0; i < 10000; i++ {
results = append(results, process(i)) // 不需要扩容
}
}
// 性能对比
// 差的做法:
// - 多次扩容和拷贝
// - 耗时: 500ms
//
// 好的做法:
// - 一次分配,不扩容
// - 耗时: 100ms
3.4 并发处理 + Worker Pool
type WorkerPool struct {
workers int
taskChan chan Task
wg sync.WaitGroup
}
type Task func()
func NewWorkerPool(workers int) *WorkerPool {
return &WorkerPool{
workers: workers,
taskChan: make(chan Task, workers*2),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
}
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
log.Info("worker started", "id", id)
for task := range wp.taskChan {
task()
}
log.Info("worker stopped", "id", id)
}
func (wp *WorkerPool) Submit(task Task) {
wp.taskChan <- task
}
func (wp *WorkerPool) Stop() {
close(wp.taskChan)
wp.wg.Wait()
}
// 使用示例
func processRequests(requests []Request) {
pool := NewWorkerPool(runtime.NumCPU())
pool.Start()
defer pool.Stop()
for _, req := range requests {
r := req // 避免闭包问题
pool.Submit(func() {
handleRequest(r)
})
}
}
四、完整案例:高性能 WebSocket 服务器
type WebSocketServer struct {
reactor *Reactor
clients sync.Map // map[string]*Client
broadcast chan Message
register chan *Client
unregister chan *Client
// 性能指标
metrics struct {
connections int64
messages int64
errors int64
latency *LatencyHistogram
}
}
type Client struct {
ID string
conn *websocket.Conn
send chan Message
server *WebSocketServer
lastPing time.Time
}
func NewWebSocketServer(workers int) *WebSocketServer {
reactor, _ := NewReactor(workers)
return &WebSocketServer{
reactor: reactor,
broadcast: make(chan Message, 10000),
register: make(chan *Client, 1000),
unregister: make(chan *Client, 1000),
}
}
func (s *WebSocketServer) Run() {
// 启动广播协程
for i := 0; i < runtime.NumCPU(); i++ {
go s.handleBroadcast()
}
// 启动管理协程
go s.manage()
// 启动 Reactor
s.reactor.Start(8080)
}
func (s *WebSocketServer) manage() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case client := <-s.register:
s.clients.Store(client.ID, client)
atomic.AddInt64(&s.metrics.connections, 1)
log.Info("client registered",
"id", client.ID,
"total", atomic.LoadInt64(&s.metrics.connections))
case client := <-s.unregister:
s.clients.Delete(client.ID)
atomic.AddInt64(&s.metrics.connections, -1)
close(client.send)
log.Info("client unregistered",
"id", client.ID,
"total", atomic.LoadInt64(&s.metrics.connections))
case <-ticker.C:
// 检查僵尸连接
s.checkZombieConnections()
// 打印统计
s.printStats()
}
}
}
func (s *WebSocketServer) handleBroadcast() {
for msg := range s.broadcast {
start := time.Now()
atomic.AddInt64(&s.metrics.messages, 1)
// 广播消息到所有客户端
s.clients.Range(func(key, value interface{}) bool {
client := value.(*Client)
select {
case client.send <- msg:
// 发送成功
default:
// 发送失败,客户端可能阻塞
log.Warn("client send buffer full",
"id", client.ID)
s.unregister <- client
}
return true
})
// 记录延迟
latency := time.Since(start)
s.metrics.latency.Record(latency)
}
}
func (s *WebSocketServer) checkZombieConnections() {
now := time.Now()
s.clients.Range(func(key, value interface{}) bool {
client := value.(*Client)
if now.Sub(client.lastPing) > 60*time.Second {
log.Warn("zombie connection detected",
"id", client.ID,
"lastPing", client.lastPing)
s.unregister <- client
}
return true
})
}
func (s *WebSocketServer) printStats() {
log.Info("server stats",
"connections", atomic.LoadInt64(&s.metrics.connections),
"messages", atomic.LoadInt64(&s.metrics.messages),
"errors", atomic.LoadInt64(&s.metrics.errors),
"latency_p50", s.metrics.latency.Percentile(0.5),
"latency_p99", s.metrics.latency.Percentile(0.99))
}
func (c *Client) readPump() {
defer func() {
c.server.unregister <- c
c.conn.Close()
}()
// 设置读取超时
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.lastPing = time.Now()
return nil
})
for {
var msg Message
err := c.conn.ReadJSON(&msg)
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure) {
log.Error("read error", "id", c.ID, "err", err)
}
break
}
// 发送到广播频道
c.server.broadcast <- msg
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case msg, ok := <-c.send:
if !ok {
// 频道关闭
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 设置写入超时
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteJSON(msg); err != nil {
return
}
case <-ticker.C:
// 发送心跳
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
性能指标:
硬件: 8 核 CPU, 16GB 内存
连接数: 10 万+ 并发连接
吞吐量: 100 万 消息/秒
延迟: P99 < 10ms
CPU: < 50%
内存: < 2GB
五、总结
5.1 为什么高性能异步系统排第三?
难度仅次于调度器和一致性协议:
- 并发控制复杂: 10 万+ 连接管理
- 背压处理困难: 生产者-消费者速率不匹配
- 顺序保证棘手: 网络乱序、重传、丢包
- 性能优化深入: 零拷贝、对象池、批处理
但比前两者简单:
- 单机为主: 不像调度器需要分布式协调
- 状态相对简单: 不像 Raft 有复杂的选举和日志复制
- 有成熟模式: Reactor、Proactor、事件驱动
5.2 关键技术点
核心架构:
- IO 多路复用(epoll/kqueue)
- 事件驱动
- Reactor 模式
- 非阻塞 IO
性能优化:
- 零拷贝(sendfile/mmap)
- 对象池
- 批量处理
- 预分配内存
- Worker Pool
背压控制:
- 动态限流
- 令牌桶
- 自适应调整
顺序保证:
- 序列号
- 重排序缓冲区
- 分区顺序
5.3 学习路径
- 理解基础: IO 模型、epoll、事件驱动
- 看源码: Nginx、Redis、Netty
- 写 Demo: 实现简单的 Echo 服务器
- 压力测试: wrk、ab、JMeter
- 生产使用: 在真实项目中应用
高性能异步系统是性能优化的艺术,值得深入学习!
本系列文章
➤ [NO.1 调度器]
➤ [NO.2 一致性协议](Paxos / Raft)
➤ NO.3 高性能异步系统(消息队列、回调、重试)
➤ [NO.4 交易系统](钱的事不能错)
➤ [NO.5 普通业务系统](绝大多数人做的)