HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 系统设计实战

    • 系统设计面试教程
    • 系统设计方法论
    • 01-短链系统设计
    • 02 - 秒杀系统设计
    • 03 - IM 即时通讯系统设计
    • 04 - Feed 流系统设计
    • 05 - 分布式 ID 生成器设计
    • 06 - 限流系统设计
    • 第7章:搜索引擎设计
    • 08 - 推荐系统设计
    • 09 - 支付系统设计
    • 10 - 电商系统设计
    • 11 - 直播系统设计
    • 第12章:缓存系统设计
    • 第13章:消息队列设计
    • 第14章:分布式事务
    • 15 - 监控系统设计

11 - 直播系统设计

> 面试频率: 需求类型指标
并发观众单直播间 100 万
延迟< 3 秒(CDN)、< 500ms(WebRTC)
可用性99.9%
带宽单主播 5 Mbps 上行,单观众 2 Mbps 下行
消息延迟聊天/弹幕 < 500ms

1.3 面试官可能的追问

Q1: 直播和点播有什么区别?

A1:

  • 直播:实时推送,延迟低(< 3秒),无法快进
  • 点播:预录制,可缓存,可快进/回退
  • 技术:直播需要推流服务器、CDN分发

Q2: 如何降低直播延迟?

A2:

  • 协议选择:WebRTC(< 500ms)> RTMP(1-3s)> HLS(5-10s)
  • 编码优化:H.265 压缩率更高
  • CDN 节点:边缘节点靠近用户
  • 弱网优化:动态码率调整

2. 容量估算

2.1 场景假设

假设为一个中型直播平台设计系统:

  • 日活用户(DAU):500 万
  • 同时在线用户:50 万
  • 同时在线直播间:5000 个
  • 平均每直播间观众:100 人
  • 主播推流码率:2 Mbps
  • 观众拉流码率:1 Mbps

2.2 带宽估算

上行带宽(主播推流)

同时在线直播间 = 5000 个
单主播码率 = 2 Mbps

总上行带宽 = 5000 × 2 Mbps = 10 Gbps

下行带宽(观众拉流)

同时在线观众 = 50 万
单观众码率 = 1 Mbps

总下行带宽 = 50 万 × 1 Mbps = 500 Gbps(500 Tbps)

CDN 成本估算

带宽成本(按 0.5 元/GB):
日流量 = 500 Gbps × 86400 秒 / 8 = 5.4 PB
日成本 = 5.4 PB × 1024 × 0.5 元 ≈ 2,764,800 元
月成本 ≈ 8300 万元

2.3 消息 QPS 估算

聊天消息

同时在线用户 = 50 万
每用户每分钟发消息 = 1 条

消息 QPS = 50 万 / 60 ≈ 8,333 QPS
峰值 QPS = 8,333 × 3 ≈ 25,000 QPS

弹幕

弹幕 QPS = 聊天 QPS × 2 ≈ 50,000 QPS

2.4 存储估算

直播回放

单直播时长 = 2 小时
单直播大小 = 2 Mbps × 7200 秒 / 8 = 1.8 GB

日直播数 = 5000 × 5(每天 5 场)= 25,000 场
日存储 = 25,000 × 1.8 GB = 45 TB

月存储(保留 3 个月)= 45 TB × 90 ≈ 4 PB

聊天记录

单条消息 = 200 字节
日消息数 = 8,333 QPS × 86400 ≈ 7.2 亿条
日存储 = 7.2 亿 × 200 字节 ≈ 144 GB

月存储 = 144 GB × 30 ≈ 4.3 TB

2.5 服务器估算

推流服务器

单服务器支持并发推流 = 200 路
所需服务器 = 5000 / 200 = 25 台

聊天服务器

单服务器 QPS = 10,000
所需服务器 = 25,000 / 10,000 × 2(冗余)= 5 台

3. API 设计

3.1 核心 API

3.1.1 创建直播间

POST /api/v1/live/rooms

Request:
{
  "title": "户外探险直播",
  "cover_url": "https://cdn.example.com/cover.jpg",
  "category": "outdoor",
  "tags": ["探险", "户外"],
  "notice": "欢迎来到我的直播间"
}

Response:
{
  "code": 0,
  "message": "success",
  "data": {
    "room_id": "12345",
    "push_url": "rtmp://push.example.com/live/12345?key=abc123",
    "play_urls": {
      "rtmp": "rtmp://play.example.com/live/12345",
      "flv": "https://play.example.com/live/12345.flv",
      "hls": "https://play.example.com/live/12345.m3u8"
    },
    "status": "created",
    "created_at": "2023-11-13T14:30:00Z"
  }
}

3.1.2 开始推流

POST /api/v1/live/rooms/{room_id}/start

Response:
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "living",
    "online_count": 0,
    "started_at": "2023-11-13T14:35:00Z"
  }
}

3.1.3 获取直播间信息

GET /api/v1/live/rooms/{room_id}

Response:
{
  "code": 0,
  "message": "success",
  "data": {
    "room_id": "12345",
    "title": "户外探险直播",
    "status": "living",              // created | living | paused | ended
    "online_count": 1234,
    "streamer": {
      "user_id": 1001,
      "nickname": "探险家",
      "avatar": "https://cdn.example.com/avatar.jpg"
    },
    "play_urls": {
      "hd": "https://play.example.com/live/12345_hd.flv",
      "sd": "https://play.example.com/live/12345_sd.flv"
    },
    "stats": {
      "total_views": 50000,
      "likes": 1200,
      "gifts_count": 500
    }
  }
}

3.1.4 发送聊天消息

POST /api/v1/live/rooms/{room_id}/messages

Request:
{
  "message_type": "text",           // text | image | gift
  "content": "主播加油!",
  "metadata": {}
}

Response:
{
  "code": 0,
  "message": "success",
  "data": {
    "message_id": "msg_123456",
    "timestamp": 1699876200
  }
}

3.1.5 发送弹幕

POST /api/v1/live/rooms/{room_id}/danmaku

Request:
{
  "content": "666",
  "color": "#FFFFFF",
  "position": "scroll"              // scroll | top | bottom
}

Response:
{
  "code": 0,
  "message": "success"
}

3.1.6 赠送礼物

POST /api/v1/live/rooms/{room_id}/gifts

Request:
{
  "gift_id": 1001,
  "count": 1,
  "receiver_user_id": 1001
}

Response:
{
  "code": 0,
  "message": "success",
  "data": {
    "order_id": "gift_order_123",
    "total_amount": 10.00,
    "balance_after": 990.00
  }
}

4. 数据模型设计

4.1 直播间表

CREATE TABLE live_rooms (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    room_id VARCHAR(64) NOT NULL UNIQUE,
    
    -- 基本信息
    title VARCHAR(256) NOT NULL,
    cover_url VARCHAR(512),
    category VARCHAR(50),
    tags JSON,
    notice TEXT,
    
    -- 主播信息
    streamer_user_id BIGINT NOT NULL,
    
    -- 状态
    status VARCHAR(20) NOT NULL COMMENT 'created|living|paused|ended',
    
    -- 推拉流地址
    push_url VARCHAR(512),
    play_url_rtmp VARCHAR(512),
    play_url_flv VARCHAR(512),
    play_url_hls VARCHAR(512),
    
    -- 统计信息
    online_count INT DEFAULT 0,
    total_views INT DEFAULT 0,
    likes_count INT DEFAULT 0,
    gifts_count INT DEFAULT 0,
    
    -- 时间
    started_at DATETIME,
    ended_at DATETIME,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    INDEX idx_streamer (streamer_user_id),
    INDEX idx_status (status),
    INDEX idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播间表';

4.2 聊天消息表

CREATE TABLE chat_messages (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL UNIQUE,
    
    room_id VARCHAR(64) NOT NULL,
    user_id BIGINT NOT NULL,
    
    message_type VARCHAR(20) NOT NULL COMMENT 'text|image|gift|system',
    content TEXT,
    
    -- 元数据(JSON)
    metadata JSON,
    
    timestamp BIGINT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_room_time (room_id, timestamp),
    INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='聊天消息表';

4.3 弹幕表

CREATE TABLE danmaku (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    
    room_id VARCHAR(64) NOT NULL,
    user_id BIGINT NOT NULL,
    
    content VARCHAR(200) NOT NULL,
    color VARCHAR(20) DEFAULT '#FFFFFF',
    position VARCHAR(20) DEFAULT 'scroll',
    
    timestamp BIGINT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_room_time (room_id, timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='弹幕表';

4.4 礼物表

CREATE TABLE gifts (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    gift_id INT NOT NULL UNIQUE,
    
    name VARCHAR(100) NOT NULL,
    icon_url VARCHAR(512),
    animation_url VARCHAR(512),
    
    price DECIMAL(10, 2) NOT NULL,
    
    -- 礼物类型
    type VARCHAR(20) COMMENT 'normal|luxury|combo',
    
    -- 特效
    effect JSON COMMENT '{"duration": 3, "animation": "firework"}',
    
    status VARCHAR(20) DEFAULT 'active',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_price (price)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='礼物表';

4.5 礼物订单表

CREATE TABLE gift_orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id VARCHAR(64) NOT NULL UNIQUE,
    
    room_id VARCHAR(64) NOT NULL,
    
    -- 送礼人
    sender_user_id BIGINT NOT NULL,
    
    -- 收礼人(主播)
    receiver_user_id BIGINT NOT NULL,
    
    gift_id INT NOT NULL,
    count INT DEFAULT 1,
    
    -- 金额
    unit_price DECIMAL(10, 2) NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    
    status VARCHAR(20) DEFAULT 'success',
    
    timestamp BIGINT NOT NULL,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_room (room_id),
    INDEX idx_sender (sender_user_id),
    INDEX idx_receiver (receiver_user_id),
    INDEX idx_timestamp (timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='礼物订单表';

4.6 在线用户表(Redis)

# 直播间在线用户集合
ZADD live:room:{room_id}:online {timestamp} {user_id}

# 用户在线状态
SET user:{user_id}:online:room {room_id} EX 30

# 直播间在线数
GET live:room:{room_id}:online_count

5. 架构设计

5.1 整体架构

┌─────────────────────────────────────────────────────────────┐
│                        主播端                                │
│                 (OBS / 移动端推流)                           │
└─────────────────────┬───────────────────────────────────────┘
                      │ RTMP / WebRTC
                      
┌─────────────────────────────────────────────────────────────┐
│                    推流服务器                                │
│               (SRS / Nginx-RTMP)                            │
│                                                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                │
│  │  转码    │  │  录制    │  │  截图    │                │
│  └──────────┘  └──────────┘  └──────────┘                │
└─────────────┬───────────────────────────────────────────────┘
              │
              
┌─────────────────────────────────────────────────────────────┐
│                        CDN                                  │
│              (边缘节点分发)                                  │
│                                                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                │
│  │  华北    │  │  华东    │  │  华南    │                │
│  └──────────┘  └──────────┘  └──────────┘                │
└─────────────┬───────────────────────────────────────────────┘
              │ HLS / FLV / WebRTC
              
┌─────────────────────────────────────────────────────────────┐
│                       观众端                                 │
│                 (Web / App / H5)                            │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│                    业务服务层                                │
│                                                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │ 直播间   │  │ 聊天室   │  │ 弹幕     │  │ 礼物     │  │
│  │ 服务     │  │ 服务     │  │ 服务     │  │ 服务     │  │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘  │
└─────────────────────────────────────────────────────────────┘

5.2 V1: 基础直播系统(MVP)

适用场景:初创产品、直播间 < 100 个

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"
    
    "github.com/google/uuid"
    "github.com/gorilla/mux"
    "github.com/gorilla/websocket"
)

// ==================== 数据结构 ====================

// LiveRoom 直播间
type LiveRoom struct {
    RoomID         string    `json:"room_id"`
    Title          string    `json:"title"`
    StreamerUserID int64     `json:"streamer_user_id"`
    Status         string    `json:"status"` // created | living | ended
    PushURL        string    `json:"push_url"`
    PlayURLs       *PlayURLs `json:"play_urls"`
    OnlineCount    int       `json:"online_count"`
    TotalViews     int       `json:"total_views"`
    StartedAt      time.Time `json:"started_at,omitempty"`
    CreatedAt      time.Time `json:"created_at"`
}

// PlayURLs 播放地址
type PlayURLs struct {
    RTMP string `json:"rtmp"`
    FLV  string `json:"flv"`
    HLS  string `json:"hls"`
}

// ChatMessage 聊天消息
type ChatMessage struct {
    MessageID   string                 `json:"message_id"`
    RoomID      string                 `json:"room_id"`
    UserID      int64                  `json:"user_id"`
    Nickname    string                 `json:"nickname"`
    MessageType string                 `json:"message_type"` // text | gift | system
    Content     string                 `json:"content"`
    Metadata    map[string]interface{} `json:"metadata,omitempty"`
    Timestamp   int64                  `json:"timestamp"`
}

// ==================== 直播间服务 ====================

// LiveRoomService 直播间服务
type LiveRoomService struct {
    rooms map[string]*LiveRoom
    mu    sync.RWMutex
}

// NewLiveRoomService 创建直播间服务
func NewLiveRoomService() *LiveRoomService {
    return &LiveRoomService{
        rooms: make(map[string]*LiveRoom),
    }
}

// CreateRoom 创建直播间
func (s *LiveRoomService) CreateRoom(title string, streamerUserID int64) (*LiveRoom, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    roomID := generateRoomID()
    
    room := &LiveRoom{
        RoomID:         roomID,
        Title:          title,
        StreamerUserID: streamerUserID,
        Status:         "created",
        PushURL:        fmt.Sprintf("rtmp://push.example.com/live/%s", roomID),
        PlayURLs: &PlayURLs{
            RTMP: fmt.Sprintf("rtmp://play.example.com/live/%s", roomID),
            FLV:  fmt.Sprintf("https://play.example.com/live/%s.flv", roomID),
            HLS:  fmt.Sprintf("https://play.example.com/live/%s.m3u8", roomID),
        },
        OnlineCount: 0,
        TotalViews:  0,
        CreatedAt:   time.Now(),
    }
    
    s.rooms[roomID] = room
    
    return room, nil
}

// StartLive 开始直播
func (s *LiveRoomService) StartLive(roomID string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    room, exists := s.rooms[roomID]
    if !exists {
        return fmt.Errorf("直播间不存在")
    }
    
    if room.Status == "living" {
        return fmt.Errorf("直播已开始")
    }
    
    room.Status = "living"
    room.StartedAt = time.Now()
    
    return nil
}

// GetRoom 获取直播间信息
func (s *LiveRoomService) GetRoom(roomID string) (*LiveRoom, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    room, exists := s.rooms[roomID]
    if !exists {
        return nil, fmt.Errorf("直播间不存在")
    }
    
    return room, nil
}

// IncrOnlineCount 增加在线人数
func (s *LiveRoomService) IncrOnlineCount(roomID string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if room, exists := s.rooms[roomID]; exists {
        room.OnlineCount++
        room.TotalViews++
    }
}

// DecrOnlineCount 减少在线人数
func (s *LiveRoomService) DecrOnlineCount(roomID string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if room, exists := s.rooms[roomID]; exists && room.OnlineCount > 0 {
        room.OnlineCount--
    }
}

// ==================== 聊天室服务 ====================

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

// ChatRoomService 聊天室服务
type ChatRoomService struct {
    // 房间 -> 客户端连接
    rooms map[string]map[*Client]bool
    
    // 广播消息通道
    broadcast chan *ChatMessage
    
    // 注册通道
    register chan *Client
    
    // 注销通道
    unregister chan *Client
    
    mu sync.RWMutex
}

// Client WebSocket 客户端
type Client struct {
    RoomID   string
    UserID   int64
    Nickname string
    Conn     *websocket.Conn
    Send     chan []byte
}

// NewChatRoomService 创建聊天室服务
func NewChatRoomService() *ChatRoomService {
    return &ChatRoomService{
        rooms:      make(map[string]map[*Client]bool),
        broadcast:  make(chan *ChatMessage, 256),
        register:   make(chan *Client),
        unregister: make(chan *Client),
    }
}

// Run 运行聊天室服务
func (s *ChatRoomService) Run() {
    for {
        select {
        case client := <-s.register:
            s.mu.Lock()
            if s.rooms[client.RoomID] == nil {
                s.rooms[client.RoomID] = make(map[*Client]bool)
            }
            s.rooms[client.RoomID][client] = true
            s.mu.Unlock()
            
            fmt.Printf("用户 %d 加入直播间 %s\n", client.UserID, client.RoomID)
            
        case client := <-s.unregister:
            s.mu.Lock()
            if clients, ok := s.rooms[client.RoomID]; ok {
                if _, exists := clients[client]; exists {
                    delete(clients, client)
                    close(client.Send)
                    
                    if len(clients) == 0 {
                        delete(s.rooms, client.RoomID)
                    }
                }
            }
            s.mu.Unlock()
            
            fmt.Printf("用户 %d 离开直播间 %s\n", client.UserID, client.RoomID)
            
        case message := <-s.broadcast:
            s.mu.RLock()
            clients := s.rooms[message.RoomID]
            s.mu.RUnlock()
            
            // 广播消息
            data, _ := json.Marshal(message)
            for client := range clients {
                select {
                case client.Send <- data:
                default:
                    close(client.Send)
                    delete(clients, client)
                }
            }
        }
    }
}

// Broadcast 广播消息
func (s *ChatRoomService) Broadcast(message *ChatMessage) {
    s.broadcast <- message
}

// HandleWebSocket 处理 WebSocket 连接
func (s *ChatRoomService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
    // 升级为 WebSocket
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        fmt.Printf("WebSocket upgrade failed: %v\n", err)
        return
    }
    
    // 从 URL 参数获取信息
    roomID := r.URL.Query().Get("room_id")
    userID := int64(12345) // 实际应该从 token 解析
    nickname := "用户" + uuid.New().String()[:8]
    
    client := &Client{
        RoomID:   roomID,
        UserID:   userID,
        Nickname: nickname,
        Conn:     conn,
        Send:     make(chan []byte, 256),
    }
    
    // 注册客户端
    s.register <- client
    
    // 启动读写协程
    go client.writePump()
    go client.readPump(s)
}

// readPump 读取客户端消息
func (c *Client) readPump(service *ChatRoomService) {
    defer func() {
        service.unregister <- c
        c.Conn.Close()
    }()
    
    c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
    c.Conn.SetPongHandler(func(string) error {
        c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
        return nil
    })
    
    for {
        _, messageData, err := c.Conn.ReadMessage()
        if err != nil {
            break
        }
        
        // 解析消息
        var msg map[string]interface{}
        if err := json.Unmarshal(messageData, &msg); err != nil {
            continue
        }
        
        // 构建聊天消息
        chatMsg := &ChatMessage{
            MessageID:   generateMessageID(),
            RoomID:      c.RoomID,
            UserID:      c.UserID,
            Nickname:    c.Nickname,
            MessageType: "text",
            Content:     msg["content"].(string),
            Timestamp:   time.Now().Unix(),
        }
        
        // 广播
        service.Broadcast(chatMsg)
    }
}

// writePump 发送消息给客户端
func (c *Client) writePump() {
    ticker := time.NewTicker(54 * time.Second)
    defer func() {
        ticker.Stop()
        c.Conn.Close()
    }()
    
    for {
        select {
        case message, ok := <-c.Send:
            c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            
            if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
                return
            }
            
        case <-ticker.C:
            c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

// ==================== HTTP Handlers ====================

type LiveHandler struct {
    liveService *LiveRoomService
    chatService *ChatRoomService
}

func (h *LiveHandler) CreateRoomHandler(w http.ResponseWriter, r *http.Request) {
    var req struct {
        Title          string `json:"title"`
        StreamerUserID int64  `json:"streamer_user_id"`
    }
    
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    
    room, err := h.liveService.CreateRoom(req.Title, req.StreamerUserID)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    json.NewEncoder(w).Encode(map[string]interface{}{
        "code":    0,
        "message": "success",
        "data":    room,
    })
}

func (h *LiveHandler) GetRoomHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    roomID := vars["room_id"]
    
    room, err := h.liveService.GetRoom(roomID)
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    json.NewEncoder(w).Encode(map[string]interface{}{
        "code":    0,
        "message": "success",
        "data":    room,
    })
}

func (h *LiveHandler) StartLiveHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    roomID := vars["room_id"]
    
    if err := h.liveService.StartLive(roomID); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    
    json.NewEncoder(w).Encode(map[string]interface{}{
        "code":    0,
        "message": "success",
    })
}

// ==================== 辅助函数 ====================

func generateRoomID() string {
    return uuid.New().String()[:8]
}

func generateMessageID() string {
    return "msg_" + uuid.New().String()
}

// ==================== Main ====================

func main() {
    liveService := NewLiveRoomService()
    chatService := NewChatRoomService()
    
    // 启动聊天室服务
    go chatService.Run()
    
    handler := &LiveHandler{
        liveService: liveService,
        chatService: chatService,
    }
    
    // 设置路由
    r := mux.NewRouter()
    r.HandleFunc("/api/v1/live/rooms", handler.CreateRoomHandler).Methods("POST")
    r.HandleFunc("/api/v1/live/rooms/{room_id}", handler.GetRoomHandler).Methods("GET")
    r.HandleFunc("/api/v1/live/rooms/{room_id}/start", handler.StartLiveHandler).Methods("POST")
    r.HandleFunc("/ws/chat", chatService.HandleWebSocket)
    
    fmt.Println("直播服务启动在 :8080")
    http.ListenAndServe(":8080", r)
}

V1 特点:

  • 简单易实现
  • 支持基础直播功能
  • WebSocket 实时聊天
  • 单机部署,无法扩展
  • 无消息持久化

5.3 V2: 分布式架构 + 消息队列

优化点:

  1. Redis Pub/Sub 实现分布式聊天室
  2. Kafka 消息持久化
  3. 弹幕限流
  4. 礼物系统
// ==================== 分布式聊天室(Redis Pub/Sub)====================

package distributed

import (
    "context"
    "encoding/json"
    "fmt"
    
    "github.com/go-redis/redis/v8"
)

// DistributedChatService 分布式聊天服务
type DistributedChatService struct {
    redis *redis.Client
    
    // 本地连接管理
    localClients map[string]map[*Client]bool
    
    // 订阅的房间
    subscriptions map[string]*redis.PubSub
}

// NewDistributedChatService 创建分布式聊天服务
func NewDistributedChatService(redisClient *redis.Client) *DistributedChatService {
    return &DistributedChatService{
        redis:         redisClient,
        localClients:  make(map[string]map[*Client]bool),
        subscriptions: make(map[string]*redis.PubSub),
    }
}

// SubscribeRoom 订阅房间消息
func (s *DistributedChatService) SubscribeRoom(roomID string) {
    if _, exists := s.subscriptions[roomID]; exists {
        return
    }
    
    ctx := context.Background()
    pubsub := s.redis.Subscribe(ctx, fmt.Sprintf("chat:room:%s", roomID))
    s.subscriptions[roomID] = pubsub
    
    // 接收消息
    go func() {
        ch := pubsub.Channel()
        for msg := range ch {
            var chatMsg ChatMessage
            if err := json.Unmarshal([]byte(msg.Payload), &chatMsg); err != nil {
                continue
            }
            
            // 广播给本地客户端
            s.broadcastToLocal(roomID, &chatMsg)
        }
    }()
}

// PublishMessage 发布消息到 Redis
func (s *DistributedChatService) PublishMessage(message *ChatMessage) error {
    ctx := context.Background()
    
    // 1. 发布到 Redis Pub/Sub
    data, _ := json.Marshal(message)
    channelName := fmt.Sprintf("chat:room:%s", message.RoomID)
    
    if err := s.redis.Publish(ctx, channelName, data).Err(); err != nil {
        return err
    }
    
    // 2. 异步写入 Kafka(持久化)
    go s.writeToKafka(message)
    
    return nil
}

// broadcastToLocal 广播给本地客户端
func (s *DistributedChatService) broadcastToLocal(roomID string, message *ChatMessage) {
    clients := s.localClients[roomID]
    data, _ := json.Marshal(message)
    
    for client := range clients {
        select {
        case client.Send <- data:
        default:
            close(client.Send)
            delete(clients, client)
        }
    }
}

// writeToKafka 写入 Kafka
func (s *DistributedChatService) writeToKafka(message *ChatMessage) {
    // 实际实现:发送到 Kafka
    // kafkaProducer.Produce("chat-messages", message)
}

// ==================== 弹幕限流 ====================

import (
    "time"
    
    "golang.org/x/time/rate"
)

// DanmakuLimiter 弹幕限流器
type DanmakuLimiter struct {
    // 用户级限流
    userLimiters map[int64]*rate.Limiter
    
    // 房间级限流
    roomLimiters map[string]*rate.Limiter
}

// NewDanmakuLimiter 创建弹幕限流器
func NewDanmakuLimiter() *DanmakuLimiter {
    return &DanmakuLimiter{
        userLimiters: make(map[int64]*rate.Limiter),
        roomLimiters: make(map[string]*rate.Limiter),
    }
}

// AllowUser 检查用户是否允许发送弹幕
func (l *DanmakuLimiter) AllowUser(userID int64) bool {
    limiter, exists := l.userLimiters[userID]
    if !exists {
        // 每用户每秒 3 条
        limiter = rate.NewLimiter(rate.Every(time.Second/3), 3)
        l.userLimiters[userID] = limiter
    }
    
    return limiter.Allow()
}

// AllowRoom 检查房间是否允许接收弹幕
func (l *DanmakuLimiter) AllowRoom(roomID string) bool {
    limiter, exists := l.roomLimiters[roomID]
    if !exists {
        // 每房间每秒 100 条
        limiter = rate.NewLimiter(rate.Every(time.Second/100), 100)
        l.roomLimiters[roomID] = limiter
    }
    
    return limiter.Allow()
}

// ==================== 礼物系统 ====================

package gift

import (
    "context"
    "fmt"
)

// GiftService 礼物服务
type GiftService struct {
    db    *sql.DB
    redis *redis.Client
}

// SendGift 赠送礼物
func (s *GiftService) SendGift(
    roomID string,
    senderUserID, receiverUserID int64,
    giftID int,
    count int,
) (*GiftOrder, error) {
    // 1. 查询礼物信息
    gift, err := s.getGift(giftID)
    if err != nil {
        return nil, err
    }
    
    // 2. 计算总金额
    totalAmount := gift.Price * float64(count)
    
    // 3. 扣除用户余额(调用支付服务)
    if err := s.deductBalance(senderUserID, totalAmount); err != nil {
        return nil, err
    }
    
    // 4. 创建礼物订单
    order := &GiftOrder{
        OrderID:         generateOrderID(),
        RoomID:          roomID,
        SenderUserID:    senderUserID,
        ReceiverUserID:  receiverUserID,
        GiftID:          giftID,
        Count:           count,
        UnitPrice:       gift.Price,
        TotalAmount:     totalAmount,
        Status:          "success",
        Timestamp:       time.Now().Unix(),
    }
    
    _, err = s.db.Exec(`
        INSERT INTO gift_orders (
            order_id, room_id, sender_user_id, receiver_user_id,
            gift_id, count, unit_price, total_amount, status, timestamp
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    `, order.OrderID, order.RoomID, order.SenderUserID, order.ReceiverUserID,
        order.GiftID, order.Count, order.UnitPrice, order.TotalAmount,
        order.Status, order.Timestamp)
    
    if err != nil {
        // 回滚余额
        s.refundBalance(senderUserID, totalAmount)
        return nil, err
    }
    
    // 5. 发送礼物消息到聊天室
    giftMessage := &ChatMessage{
        MessageID:   generateMessageID(),
        RoomID:      roomID,
        UserID:      senderUserID,
        MessageType: "gift",
        Content:     fmt.Sprintf("赠送了 %d 个 %s", count, gift.Name),
        Metadata: map[string]interface{}{
            "gift_id":       giftID,
            "gift_name":     gift.Name,
            "count":         count,
            "animation_url": gift.AnimationURL,
        },
        Timestamp: time.Now().Unix(),
    }
    
    s.chatService.PublishMessage(giftMessage)
    
    // 6. 更新主播收益(异步)
    go s.updateStreamerIncome(receiverUserID, totalAmount*0.5) // 50% 分成
    
    return order, nil
}

type GiftOrder struct {
    OrderID        string
    RoomID         string
    SenderUserID   int64
    ReceiverUserID int64
    GiftID         int
    Count          int
    UnitPrice      float64
    TotalAmount    float64
    Status         string
    Timestamp      int64
}

type Gift struct {
    GiftID       int
    Name         string
    Price        float64
    AnimationURL string
}

func (s *GiftService) getGift(giftID int) (*Gift, error) {
    var gift Gift
    err := s.db.QueryRow(`
        SELECT gift_id, name, price, animation_url 
        FROM gifts 
        WHERE gift_id = ?
    `, giftID).Scan(&gift.GiftID, &gift.Name, &gift.Price, &gift.AnimationURL)
    
    return &gift, err
}

func (s *GiftService) deductBalance(userID int64, amount float64) error {
    // 调用支付服务
    return nil
}

func (s *GiftService) refundBalance(userID int64, amount float64) error {
    // 退款
    return nil
}

func (s *GiftService) updateStreamerIncome(userID int64, amount float64) error {
    // 更新主播收益
    return nil
}

func generateOrderID() string {
    return "gift_" + uuid.New().String()
}

V2 特点:

  • 分布式架构,支持水平扩展
  • Redis Pub/Sub 实现消息广播
  • Kafka 持久化消息
  • 弹幕限流
  • 礼物系统

5.4 V3: CDN 优化 + 低延迟

优化点:

  1. WebRTC 低延迟推拉流
  2. CDN 智能调度
  3. 多清晰度转码
  4. 回放系统
// ==================== CDN 调度 ====================

package cdn

import (
    "fmt"
    "math"
)

// CDNScheduler CDN 调度器
type CDNScheduler struct {
    nodes []*CDNNode
}

// CDNNode CDN 节点
type CDNNode struct {
    NodeID    string
    Region    string  // "华北" | "华东" | "华南"
    Bandwidth float64 // 可用带宽(Gbps)
    Load      float64 // 负载(0-1)
    Latency   int     // 延迟(ms)
}

// SelectBestNode 选择最佳 CDN 节点
func (s *CDNScheduler) SelectBestNode(userRegion string, userIP string) *CDNNode {
    var bestNode *CDNNode
    bestScore := -1.0
    
    for _, node := range s.nodes {
        // 计算综合分数
        score := s.calculateScore(node, userRegion)
        
        if score > bestScore {
            bestScore = score
            bestNode = node
        }
    }
    
    return bestNode
}

// calculateScore 计算节点分数
func (s *CDNScheduler) calculateScore(node *CDNNode, userRegion string) float64 {
    // 1. 地域匹配(同地域加分)
    regionScore := 0.0
    if node.Region == userRegion {
        regionScore = 1.0
    } else {
        regionScore = 0.5
    }
    
    // 2. 负载(负载越低越好)
    loadScore := 1.0 - node.Load
    
    // 3. 延迟(延迟越低越好)
    latencyScore := 1.0 / (1.0 + float64(node.Latency)/100.0)
    
    // 4. 带宽(带宽越大越好)
    bandwidthScore := math.Log(node.Bandwidth + 1)
    
    // 加权综合
    score := regionScore*0.4 + loadScore*0.3 + latencyScore*0.2 + bandwidthScore*0.1
    
    return score
}

// ==================== 多清晰度转码 ====================

package transcode

// TranscodeService 转码服务
type TranscodeService struct {
    ffmpegPath string
}

// TranscodeProfile 转码配置
type TranscodeProfile struct {
    Name       string // "超清" | "高清" | "标清"
    Resolution string // "1920x1080" | "1280x720" | "640x480"
    Bitrate    string // "2000k" | "1000k" | "500k"
    Framerate  int    // 30 | 25 | 15
}

// Transcode 转码
func (s *TranscodeService) Transcode(inputURL string, profiles []*TranscodeProfile) error {
    for _, profile := range profiles {
        outputURL := fmt.Sprintf("%s_%s.flv", inputURL, profile.Name)
        
        // 构建 ffmpeg 命令
        cmd := fmt.Sprintf(`
            %s -i %s \
            -c:v libx264 -preset veryfast \
            -b:v %s -r %d \
            -s %s \
            -c:a aac -b:a 128k \
            -f flv %s
        `, s.ffmpegPath, inputURL, profile.Bitrate, profile.Framerate,
            profile.Resolution, outputURL)
        
        // 异步执行
        go s.executeFFmpeg(cmd)
    }
    
    return nil
}

func (s *TranscodeService) executeFFmpeg(cmd string) {
    // 执行 ffmpeg 命令
    // exec.Command("sh", "-c", cmd).Run()
}

// ==================== 直播回放 ====================

package replay

import (
    "fmt"
    "time"
)

// ReplayService 回放服务
type ReplayService struct {
    db         *sql.DB
    storageURL string
}

// CreateReplay 创建回放
func (s *ReplayService) CreateReplay(roomID string, startTime, endTime time.Time) (*Replay, error) {
    replayID := generateReplayID()
    
    // 1. 从录制文件生成回放
    recordFile := fmt.Sprintf("%s/record/%s_%d.flv",
        s.storageURL, roomID, startTime.Unix())
    
    replayFile := fmt.Sprintf("%s/replay/%s.mp4", s.storageURL, replayID)
    
    // 2. 转换为 MP4 格式
    if err := s.convertToMP4(recordFile, replayFile); err != nil {
        return nil, err
    }
    
    // 3. 生成封面图
    coverURL := s.generateCover(replayFile)
    
    // 4. 保存到数据库
    duration := endTime.Sub(startTime).Seconds()
    
    replay := &Replay{
        ReplayID:  replayID,
        RoomID:    roomID,
        Title:     "直播回放",
        VideoURL:  replayFile,
        CoverURL:  coverURL,
        Duration:  int(duration),
        StartTime: startTime,
        EndTime:   endTime,
        CreatedAt: time.Now(),
    }
    
    _, err := s.db.Exec(`
        INSERT INTO replays (
            replay_id, room_id, title, video_url, cover_url,
            duration, start_time, end_time, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    `, replay.ReplayID, replay.RoomID, replay.Title, replay.VideoURL,
        replay.CoverURL, replay.Duration, replay.StartTime, replay.EndTime,
        replay.CreatedAt)
    
    return replay, err
}

type Replay struct {
    ReplayID  string
    RoomID    string
    Title     string
    VideoURL  string
    CoverURL  string
    Duration  int
    StartTime time.Time
    EndTime   time.Time
    CreatedAt time.Time
}

func (s *ReplayService) convertToMP4(inputFile, outputFile string) error {
    // ffmpeg 转换
    return nil
}

func (s *ReplayService) generateCover(videoFile string) string {
    // 生成封面图
    return ""
}

func generateReplayID() string {
    return "replay_" + uuid.New().String()
}

V3 特点:

  • CDN 智能调度
  • 多清晰度自适应
  • WebRTC 低延迟(< 500ms)
  • 直播回放
  • 支持百万并发

6. 核心优化

6.1 延迟优化

问题:直播延迟高(> 5 秒)

解决方案:

协议延迟场景
HLS5-10s普通直播
RTMP2-5s传统直播
HTTP-FLV1-3s低延迟直播
WebRTC< 500ms实时互动

优化策略:

  1. 使用 HTTP-FLV / WebRTC
  2. 减少 GOP 大小(关键帧间隔)
  3. CDN 边缘节点
  4. 码率自适应

6.2 弱网优化

// 动态码率调整
type AdaptiveBitrateService struct {
    profiles []*BitrateProfile
}

type BitrateProfile struct {
    Bitrate    int    // kbps
    Resolution string
    Quality    string // "超清" | "高清" | "标清"
}

// SelectProfile 根据网络状况选择码率
func (s *AdaptiveBitrateService) SelectProfile(bandwidth int) *BitrateProfile {
    // 选择最接近但不超过带宽的配置
    for i := len(s.profiles) - 1; i >= 0; i-- {
        if s.profiles[i].Bitrate <= bandwidth*80/100 { // 预留 20% 缓冲
            return s.profiles[i]
        }
    }
    
    // 返回最低画质
    return s.profiles[0]
}

6.3 大规模并发优化

// 长连接优化
type ConnectionPool struct {
    maxConns    int
    activeConns int
    mu          sync.Mutex
}

// Acquire 获取连接
func (p *ConnectionPool) Acquire() bool {
    p.mu.Lock()
    defer p.mu.Unlock()
    
    if p.activeConns >= p.maxConns {
        return false
    }
    
    p.activeConns++
    return true
}

// Release 释放连接
func (p *ConnectionPool) Release() {
    p.mu.Lock()
    defer p.mu.Unlock()
    
    p.activeConns--
}

7. 监控与告警

var (
    liveRoomCount = prometheus.NewGauge(
        prometheus.GaugeOpts{
            Name: "live_room_count",
            Help: "Current live room count",
        },
    )
    
    onlineUserCount = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "live_online_user_count",
            Help: "Online user count per room",
        },
        []string{"room_id"},
    )
    
    messageLatency = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "live_message_latency_seconds",
            Help:    "Message latency",
            Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1},
        },
        []string{"message_type"},
    )
    
    streamBitrate = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "live_stream_bitrate_kbps",
            Help: "Stream bitrate",
        },
        []string{"room_id", "quality"},
    )
)

8. 面试问答(10个高频题)

直播的推流和拉流是什么?

答:

  • 推流(Push):主播将音视频流推送到服务器

    • 协议:RTMP、WebRTC
    • 工具:OBS、FFmpeg、移动端 SDK
  • 拉流(Pull):观众从服务器拉取视频流

    • 协议:HLS、HTTP-FLV、WebRTC
    • 播放器:Video.js、ijkplayer

流程:

主播端 → RTMP推流 → 推流服务器 → 转码/录制 → CDN → HLS/FLV拉流 → 观众端

如何降低直播延迟?

答:

协议选择:

  • HLS:5-10s(HTTP + M3U8切片)
  • RTMP:2-5s
  • HTTP-FLV:1-3s
  • WebRTC:< 500ms

优化方法:

  1. 减少 GOP:关键帧间隔从 3s → 1s
  2. CDN 优化:边缘节点缓存
  3. 编码优化:H.265 压缩率更高
  4. 网络优化:UDP 代替 TCP(WebRTC)

如何实现百万并发?

答:

架构优化:

  1. CDN 分发:减轻源站压力
单源站 10 Gbps
CDN 回源 1:100
支持 1000 Gbps = 100 万观众(1 Mbps/人)
  1. 长连接优化(聊天):
  • 单机支持 100 万连接(C10M)
  • 使用 epoll(Linux)、kqueue(macOS)
  1. 消息队列:
  • Redis Pub/Sub 广播
  • Kafka 持久化
  1. 分布式部署:
  • 多机房、多地域
  • 负载均衡

聊天室如何实现?

答:

单机版:WebSocket + 内存广播

分布式版:Redis Pub/Sub

// 发送消息
redis.Publish("chat:room:12345", message)

// 订阅消息
pubsub := redis.Subscribe("chat:room:12345")
for msg := range pubsub.Channel() {
    broadcastToLocalClients(msg)
}

消息持久化:

  • Kafka 存储历史消息
  • 用户上线后拉取离线消息

弹幕如何实现?

答:

发送流程:

用户发送 → 敏感词过滤 → 限流 → 广播给所有观众

限流策略:

// 用户级:每秒 3 条
userLimiter := rate.NewLimiter(3, 3)

// 房间级:每秒 100 条
roomLimiter := rate.NewLimiter(100, 100)

敏感词过滤:

  • DFA 算法(确定性有限状态自动机)
  • Trie 树
  • 正则表达式

显示优化:

  • 前端随机轨道
  • 碰撞检测
  • 限制同屏数量(< 50 条)

礼物系统如何设计?

答:

流程:

  1. 用户点击送礼
  2. 扣除用户余额
  3. 创建礼物订单
  4. 广播礼物消息(带动画)
  5. 更新主播收益

关键点:

  • 事务性:扣款和订单原子性
  • 幂等性:防止重复扣款
  • 分成:主播 50%,平台 50%
func SendGift(userID, streamerID int64, giftID int) error {
    tx, _ := db.Begin()
    defer tx.Rollback()
    
    // 1. 扣款
    tx.Exec(`UPDATE accounts SET balance = balance - ? WHERE user_id = ?`, giftPrice, userID)
    
    // 2. 创建订单
    tx.Exec(`INSERT INTO gift_orders ...`)
    
    // 3. 增加主播收益
    tx.Exec(`UPDATE accounts SET balance = balance + ? WHERE user_id = ?`, giftPrice*0.5, streamerID)
    
    return tx.Commit()
}

如何实现直播回放?

答:

录制:

  • 推流服务器(SRS、Nginx-RTMP)自动录制
  • 格式:FLV、MP4

回放生成:

  1. 直播结束后触发任务
  2. 转码为 MP4(ffmpeg)
  3. 生成封面图
  4. 上传到对象存储(OSS)
  5. 入库

优化:

  • 切片(HLS):支持快进
  • 多清晰度
  • CDN 缓存

如何监控直播质量?

答:

推流质量:

  • 帧率(FPS):>= 25
  • 码率(Bitrate):稳定,无剧烈波动
  • 丢包率:< 1%

拉流质量:

  • 首屏时间:< 2s
  • 卡顿率:< 5%
  • 播放成功率:> 99%

指标采集:

// 推流服务器采集
type StreamMetrics struct {
    RoomID    string
    FPS       int
    Bitrate   int // kbps
    PacketLoss float64
}

// 上报到 Prometheus
streamBitrate.WithLabelValues(roomID, "hd").Set(float64(bitrate))

告警:

  • 帧率 < 15:推流异常
  • 卡顿率 > 10%:网络问题
  • 在线人数暴跌:服务故障

WebRTC 和 RTMP 有什么区别?

答:

对比项RTMPWebRTC
延迟2-5s< 500ms
协议TCPUDP(SRTP)
浏览器支持需要 Flash原生支持
穿透 NAT困难STUN/TURN
应用场景传统直播实时互动、连麦

选择建议:

  • 普通直播:RTMP + HTTP-FLV
  • 连麦、视频会议:WebRTC

如何设计大规模直播系统的架构?

答:

整体架构:

┌──────────┐
│  主播端  │ RTMP推流
└─────┬────┘
      ↓
┌─────────────┐
│ 推流服务器  │ 转码、录制
│ (SRS集群)   │
└─────┬───────┘
      ↓
┌─────────────┐
│    CDN      │ 分发
│  (边缘节点) │
└─────┬───────┘
      ↓ HLS/FLV
┌──────────┐
│  观众端  │
└──────────┘

关键点:

  1. 推流:集群部署,负载均衡
  2. 转码:GPU 加速,多清晰度
  3. CDN:边缘节点,智能调度
  4. 聊天:Redis Pub/Sub,分布式
  5. 存储:对象存储(OSS),CDN 缓存
  6. 监控:Prometheus + Grafana

容量规划:

  • 推流服务器:5000 路 / 25 台 = 200 路/台
  • CDN 带宽:50 万观众 × 1 Mbps = 500 Gbps
  • 聊天服务器:25,000 QPS / 5 台 = 5000 QPS/台

9. 总结

核心要点

  1. 推拉流

    • RTMP 推流、HLS/FLV 拉流
    • 转码、录制、截图
  2. 聊天室

    • WebSocket 实时通信
    • Redis Pub/Sub 分布式
    • Kafka 消息持久化
  3. 弹幕

    • 限流(用户级、房间级)
    • 敏感词过滤
    • 前端碰撞检测
  4. 低延迟

    • WebRTC(< 500ms)
    • HTTP-FLV(1-3s)
    • CDN 边缘节点
  5. 大规模并发

    • CDN 分发(1:100)
    • 长连接优化(C10M)
    • 消息队列

架构演进

V1: 单机 WebSocket 聊天室
    ↓
V2: Redis Pub/Sub + Kafka 持久化
    ↓
V3: CDN + WebRTC + 多清晰度

本章完,祝面试顺利!

Prev
10 - 电商系统设计
Next
第12章:缓存系统设计