11 - 直播系统设计
| > 面试频率: 需求类型 | 指标 |
|---|---|
| 并发观众 | 单直播间 100 万 |
| 延迟 | < 3 秒(CDN)、< 500ms(WebRTC) |
| 可用性 | 99.9% |
| 带宽 | 单主播 5 Mbps 上行,单观众 2 Mbps 下行 |
| 消息延迟 | 聊天/弹幕 < 500ms |
1.3 面试官可能的追问
Q1: 直播和点播有什么区别?
A1:
- 直播:实时推送,延迟低(< 3秒),无法快进
- 点播:预录制,可缓存,可快进/回退
- 技术:直播需要推流服务器、CDN分发
Q2: 如何降低直播延迟?
A2:
- 协议选择:WebRTC(< 500ms)> RTMP(1-3s)> HLS(5-10s)
- 编码优化:H.265 压缩率更高
- CDN 节点:边缘节点靠近用户
- 弱网优化:动态码率调整
2. 容量估算
2.1 场景假设
假设为一个中型直播平台设计系统:
- 日活用户(DAU):500 万
- 同时在线用户:50 万
- 同时在线直播间:5000 个
- 平均每直播间观众:100 人
- 主播推流码率:2 Mbps
- 观众拉流码率:1 Mbps
2.2 带宽估算
上行带宽(主播推流)
同时在线直播间 = 5000 个
单主播码率 = 2 Mbps
总上行带宽 = 5000 × 2 Mbps = 10 Gbps
下行带宽(观众拉流)
同时在线观众 = 50 万
单观众码率 = 1 Mbps
总下行带宽 = 50 万 × 1 Mbps = 500 Gbps(500 Tbps)
CDN 成本估算
带宽成本(按 0.5 元/GB):
日流量 = 500 Gbps × 86400 秒 / 8 = 5.4 PB
日成本 = 5.4 PB × 1024 × 0.5 元 ≈ 2,764,800 元
月成本 ≈ 8300 万元
2.3 消息 QPS 估算
聊天消息
同时在线用户 = 50 万
每用户每分钟发消息 = 1 条
消息 QPS = 50 万 / 60 ≈ 8,333 QPS
峰值 QPS = 8,333 × 3 ≈ 25,000 QPS
弹幕
弹幕 QPS = 聊天 QPS × 2 ≈ 50,000 QPS
2.4 存储估算
直播回放
单直播时长 = 2 小时
单直播大小 = 2 Mbps × 7200 秒 / 8 = 1.8 GB
日直播数 = 5000 × 5(每天 5 场)= 25,000 场
日存储 = 25,000 × 1.8 GB = 45 TB
月存储(保留 3 个月)= 45 TB × 90 ≈ 4 PB
聊天记录
单条消息 = 200 字节
日消息数 = 8,333 QPS × 86400 ≈ 7.2 亿条
日存储 = 7.2 亿 × 200 字节 ≈ 144 GB
月存储 = 144 GB × 30 ≈ 4.3 TB
2.5 服务器估算
推流服务器
单服务器支持并发推流 = 200 路
所需服务器 = 5000 / 200 = 25 台
聊天服务器
单服务器 QPS = 10,000
所需服务器 = 25,000 / 10,000 × 2(冗余)= 5 台
3. API 设计
3.1 核心 API
3.1.1 创建直播间
POST /api/v1/live/rooms
Request:
{
"title": "户外探险直播",
"cover_url": "https://cdn.example.com/cover.jpg",
"category": "outdoor",
"tags": ["探险", "户外"],
"notice": "欢迎来到我的直播间"
}
Response:
{
"code": 0,
"message": "success",
"data": {
"room_id": "12345",
"push_url": "rtmp://push.example.com/live/12345?key=abc123",
"play_urls": {
"rtmp": "rtmp://play.example.com/live/12345",
"flv": "https://play.example.com/live/12345.flv",
"hls": "https://play.example.com/live/12345.m3u8"
},
"status": "created",
"created_at": "2023-11-13T14:30:00Z"
}
}
3.1.2 开始推流
POST /api/v1/live/rooms/{room_id}/start
Response:
{
"code": 0,
"message": "success",
"data": {
"status": "living",
"online_count": 0,
"started_at": "2023-11-13T14:35:00Z"
}
}
3.1.3 获取直播间信息
GET /api/v1/live/rooms/{room_id}
Response:
{
"code": 0,
"message": "success",
"data": {
"room_id": "12345",
"title": "户外探险直播",
"status": "living", // created | living | paused | ended
"online_count": 1234,
"streamer": {
"user_id": 1001,
"nickname": "探险家",
"avatar": "https://cdn.example.com/avatar.jpg"
},
"play_urls": {
"hd": "https://play.example.com/live/12345_hd.flv",
"sd": "https://play.example.com/live/12345_sd.flv"
},
"stats": {
"total_views": 50000,
"likes": 1200,
"gifts_count": 500
}
}
}
3.1.4 发送聊天消息
POST /api/v1/live/rooms/{room_id}/messages
Request:
{
"message_type": "text", // text | image | gift
"content": "主播加油!",
"metadata": {}
}
Response:
{
"code": 0,
"message": "success",
"data": {
"message_id": "msg_123456",
"timestamp": 1699876200
}
}
3.1.5 发送弹幕
POST /api/v1/live/rooms/{room_id}/danmaku
Request:
{
"content": "666",
"color": "#FFFFFF",
"position": "scroll" // scroll | top | bottom
}
Response:
{
"code": 0,
"message": "success"
}
3.1.6 赠送礼物
POST /api/v1/live/rooms/{room_id}/gifts
Request:
{
"gift_id": 1001,
"count": 1,
"receiver_user_id": 1001
}
Response:
{
"code": 0,
"message": "success",
"data": {
"order_id": "gift_order_123",
"total_amount": 10.00,
"balance_after": 990.00
}
}
4. 数据模型设计
4.1 直播间表
CREATE TABLE live_rooms (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
room_id VARCHAR(64) NOT NULL UNIQUE,
-- 基本信息
title VARCHAR(256) NOT NULL,
cover_url VARCHAR(512),
category VARCHAR(50),
tags JSON,
notice TEXT,
-- 主播信息
streamer_user_id BIGINT NOT NULL,
-- 状态
status VARCHAR(20) NOT NULL COMMENT 'created|living|paused|ended',
-- 推拉流地址
push_url VARCHAR(512),
play_url_rtmp VARCHAR(512),
play_url_flv VARCHAR(512),
play_url_hls VARCHAR(512),
-- 统计信息
online_count INT DEFAULT 0,
total_views INT DEFAULT 0,
likes_count INT DEFAULT 0,
gifts_count INT DEFAULT 0,
-- 时间
started_at DATETIME,
ended_at DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_streamer (streamer_user_id),
INDEX idx_status (status),
INDEX idx_created (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播间表';
4.2 聊天消息表
CREATE TABLE chat_messages (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL UNIQUE,
room_id VARCHAR(64) NOT NULL,
user_id BIGINT NOT NULL,
message_type VARCHAR(20) NOT NULL COMMENT 'text|image|gift|system',
content TEXT,
-- 元数据(JSON)
metadata JSON,
timestamp BIGINT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_room_time (room_id, timestamp),
INDEX idx_user (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='聊天消息表';
4.3 弹幕表
CREATE TABLE danmaku (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
room_id VARCHAR(64) NOT NULL,
user_id BIGINT NOT NULL,
content VARCHAR(200) NOT NULL,
color VARCHAR(20) DEFAULT '#FFFFFF',
position VARCHAR(20) DEFAULT 'scroll',
timestamp BIGINT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_room_time (room_id, timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='弹幕表';
4.4 礼物表
CREATE TABLE gifts (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
gift_id INT NOT NULL UNIQUE,
name VARCHAR(100) NOT NULL,
icon_url VARCHAR(512),
animation_url VARCHAR(512),
price DECIMAL(10, 2) NOT NULL,
-- 礼物类型
type VARCHAR(20) COMMENT 'normal|luxury|combo',
-- 特效
effect JSON COMMENT '{"duration": 3, "animation": "firework"}',
status VARCHAR(20) DEFAULT 'active',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_price (price)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='礼物表';
4.5 礼物订单表
CREATE TABLE gift_orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id VARCHAR(64) NOT NULL UNIQUE,
room_id VARCHAR(64) NOT NULL,
-- 送礼人
sender_user_id BIGINT NOT NULL,
-- 收礼人(主播)
receiver_user_id BIGINT NOT NULL,
gift_id INT NOT NULL,
count INT DEFAULT 1,
-- 金额
unit_price DECIMAL(10, 2) NOT NULL,
total_amount DECIMAL(10, 2) NOT NULL,
status VARCHAR(20) DEFAULT 'success',
timestamp BIGINT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_room (room_id),
INDEX idx_sender (sender_user_id),
INDEX idx_receiver (receiver_user_id),
INDEX idx_timestamp (timestamp)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='礼物订单表';
4.6 在线用户表(Redis)
# 直播间在线用户集合
ZADD live:room:{room_id}:online {timestamp} {user_id}
# 用户在线状态
SET user:{user_id}:online:room {room_id} EX 30
# 直播间在线数
GET live:room:{room_id}:online_count
5. 架构设计
5.1 整体架构
┌─────────────────────────────────────────────────────────────┐
│ 主播端 │
│ (OBS / 移动端推流) │
└─────────────────────┬───────────────────────────────────────┘
│ RTMP / WebRTC
┌─────────────────────────────────────────────────────────────┐
│ 推流服务器 │
│ (SRS / Nginx-RTMP) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 转码 │ │ 录制 │ │ 截图 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────┬───────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ CDN │
│ (边缘节点分发) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 华北 │ │ 华东 │ │ 华南 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────┬───────────────────────────────────────────────┘
│ HLS / FLV / WebRTC
┌─────────────────────────────────────────────────────────────┐
│ 观众端 │
│ (Web / App / H5) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 业务服务层 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 直播间 │ │ 聊天室 │ │ 弹幕 │ │ 礼物 │ │
│ │ 服务 │ │ 服务 │ │ 服务 │ │ 服务 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
5.2 V1: 基础直播系统(MVP)
适用场景:初创产品、直播间 < 100 个
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
// ==================== 数据结构 ====================
// LiveRoom 直播间
type LiveRoom struct {
RoomID string `json:"room_id"`
Title string `json:"title"`
StreamerUserID int64 `json:"streamer_user_id"`
Status string `json:"status"` // created | living | ended
PushURL string `json:"push_url"`
PlayURLs *PlayURLs `json:"play_urls"`
OnlineCount int `json:"online_count"`
TotalViews int `json:"total_views"`
StartedAt time.Time `json:"started_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// PlayURLs 播放地址
type PlayURLs struct {
RTMP string `json:"rtmp"`
FLV string `json:"flv"`
HLS string `json:"hls"`
}
// ChatMessage 聊天消息
type ChatMessage struct {
MessageID string `json:"message_id"`
RoomID string `json:"room_id"`
UserID int64 `json:"user_id"`
Nickname string `json:"nickname"`
MessageType string `json:"message_type"` // text | gift | system
Content string `json:"content"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Timestamp int64 `json:"timestamp"`
}
// ==================== 直播间服务 ====================
// LiveRoomService 直播间服务
type LiveRoomService struct {
rooms map[string]*LiveRoom
mu sync.RWMutex
}
// NewLiveRoomService 创建直播间服务
func NewLiveRoomService() *LiveRoomService {
return &LiveRoomService{
rooms: make(map[string]*LiveRoom),
}
}
// CreateRoom 创建直播间
func (s *LiveRoomService) CreateRoom(title string, streamerUserID int64) (*LiveRoom, error) {
s.mu.Lock()
defer s.mu.Unlock()
roomID := generateRoomID()
room := &LiveRoom{
RoomID: roomID,
Title: title,
StreamerUserID: streamerUserID,
Status: "created",
PushURL: fmt.Sprintf("rtmp://push.example.com/live/%s", roomID),
PlayURLs: &PlayURLs{
RTMP: fmt.Sprintf("rtmp://play.example.com/live/%s", roomID),
FLV: fmt.Sprintf("https://play.example.com/live/%s.flv", roomID),
HLS: fmt.Sprintf("https://play.example.com/live/%s.m3u8", roomID),
},
OnlineCount: 0,
TotalViews: 0,
CreatedAt: time.Now(),
}
s.rooms[roomID] = room
return room, nil
}
// StartLive 开始直播
func (s *LiveRoomService) StartLive(roomID string) error {
s.mu.Lock()
defer s.mu.Unlock()
room, exists := s.rooms[roomID]
if !exists {
return fmt.Errorf("直播间不存在")
}
if room.Status == "living" {
return fmt.Errorf("直播已开始")
}
room.Status = "living"
room.StartedAt = time.Now()
return nil
}
// GetRoom 获取直播间信息
func (s *LiveRoomService) GetRoom(roomID string) (*LiveRoom, error) {
s.mu.RLock()
defer s.mu.RUnlock()
room, exists := s.rooms[roomID]
if !exists {
return nil, fmt.Errorf("直播间不存在")
}
return room, nil
}
// IncrOnlineCount 增加在线人数
func (s *LiveRoomService) IncrOnlineCount(roomID string) {
s.mu.Lock()
defer s.mu.Unlock()
if room, exists := s.rooms[roomID]; exists {
room.OnlineCount++
room.TotalViews++
}
}
// DecrOnlineCount 减少在线人数
func (s *LiveRoomService) DecrOnlineCount(roomID string) {
s.mu.Lock()
defer s.mu.Unlock()
if room, exists := s.rooms[roomID]; exists && room.OnlineCount > 0 {
room.OnlineCount--
}
}
// ==================== 聊天室服务 ====================
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// ChatRoomService 聊天室服务
type ChatRoomService struct {
// 房间 -> 客户端连接
rooms map[string]map[*Client]bool
// 广播消息通道
broadcast chan *ChatMessage
// 注册通道
register chan *Client
// 注销通道
unregister chan *Client
mu sync.RWMutex
}
// Client WebSocket 客户端
type Client struct {
RoomID string
UserID int64
Nickname string
Conn *websocket.Conn
Send chan []byte
}
// NewChatRoomService 创建聊天室服务
func NewChatRoomService() *ChatRoomService {
return &ChatRoomService{
rooms: make(map[string]map[*Client]bool),
broadcast: make(chan *ChatMessage, 256),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
// Run 运行聊天室服务
func (s *ChatRoomService) Run() {
for {
select {
case client := <-s.register:
s.mu.Lock()
if s.rooms[client.RoomID] == nil {
s.rooms[client.RoomID] = make(map[*Client]bool)
}
s.rooms[client.RoomID][client] = true
s.mu.Unlock()
fmt.Printf("用户 %d 加入直播间 %s\n", client.UserID, client.RoomID)
case client := <-s.unregister:
s.mu.Lock()
if clients, ok := s.rooms[client.RoomID]; ok {
if _, exists := clients[client]; exists {
delete(clients, client)
close(client.Send)
if len(clients) == 0 {
delete(s.rooms, client.RoomID)
}
}
}
s.mu.Unlock()
fmt.Printf("用户 %d 离开直播间 %s\n", client.UserID, client.RoomID)
case message := <-s.broadcast:
s.mu.RLock()
clients := s.rooms[message.RoomID]
s.mu.RUnlock()
// 广播消息
data, _ := json.Marshal(message)
for client := range clients {
select {
case client.Send <- data:
default:
close(client.Send)
delete(clients, client)
}
}
}
}
}
// Broadcast 广播消息
func (s *ChatRoomService) Broadcast(message *ChatMessage) {
s.broadcast <- message
}
// HandleWebSocket 处理 WebSocket 连接
func (s *ChatRoomService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
// 升级为 WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Printf("WebSocket upgrade failed: %v\n", err)
return
}
// 从 URL 参数获取信息
roomID := r.URL.Query().Get("room_id")
userID := int64(12345) // 实际应该从 token 解析
nickname := "用户" + uuid.New().String()[:8]
client := &Client{
RoomID: roomID,
UserID: userID,
Nickname: nickname,
Conn: conn,
Send: make(chan []byte, 256),
}
// 注册客户端
s.register <- client
// 启动读写协程
go client.writePump()
go client.readPump(s)
}
// readPump 读取客户端消息
func (c *Client) readPump(service *ChatRoomService) {
defer func() {
service.unregister <- c
c.Conn.Close()
}()
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, messageData, err := c.Conn.ReadMessage()
if err != nil {
break
}
// 解析消息
var msg map[string]interface{}
if err := json.Unmarshal(messageData, &msg); err != nil {
continue
}
// 构建聊天消息
chatMsg := &ChatMessage{
MessageID: generateMessageID(),
RoomID: c.RoomID,
UserID: c.UserID,
Nickname: c.Nickname,
MessageType: "text",
Content: msg["content"].(string),
Timestamp: time.Now().Unix(),
}
// 广播
service.Broadcast(chatMsg)
}
}
// writePump 发送消息给客户端
func (c *Client) writePump() {
ticker := time.NewTicker(54 * time.Second)
defer func() {
ticker.Stop()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
return
}
case <-ticker.C:
c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// ==================== HTTP Handlers ====================
type LiveHandler struct {
liveService *LiveRoomService
chatService *ChatRoomService
}
func (h *LiveHandler) CreateRoomHandler(w http.ResponseWriter, r *http.Request) {
var req struct {
Title string `json:"title"`
StreamerUserID int64 `json:"streamer_user_id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
room, err := h.liveService.CreateRoom(req.Title, req.StreamerUserID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(map[string]interface{}{
"code": 0,
"message": "success",
"data": room,
})
}
func (h *LiveHandler) GetRoomHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
roomID := vars["room_id"]
room, err := h.liveService.GetRoom(roomID)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(map[string]interface{}{
"code": 0,
"message": "success",
"data": room,
})
}
func (h *LiveHandler) StartLiveHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
roomID := vars["room_id"]
if err := h.liveService.StartLive(roomID); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
json.NewEncoder(w).Encode(map[string]interface{}{
"code": 0,
"message": "success",
})
}
// ==================== 辅助函数 ====================
func generateRoomID() string {
return uuid.New().String()[:8]
}
func generateMessageID() string {
return "msg_" + uuid.New().String()
}
// ==================== Main ====================
func main() {
liveService := NewLiveRoomService()
chatService := NewChatRoomService()
// 启动聊天室服务
go chatService.Run()
handler := &LiveHandler{
liveService: liveService,
chatService: chatService,
}
// 设置路由
r := mux.NewRouter()
r.HandleFunc("/api/v1/live/rooms", handler.CreateRoomHandler).Methods("POST")
r.HandleFunc("/api/v1/live/rooms/{room_id}", handler.GetRoomHandler).Methods("GET")
r.HandleFunc("/api/v1/live/rooms/{room_id}/start", handler.StartLiveHandler).Methods("POST")
r.HandleFunc("/ws/chat", chatService.HandleWebSocket)
fmt.Println("直播服务启动在 :8080")
http.ListenAndServe(":8080", r)
}
V1 特点:
- 简单易实现
- 支持基础直播功能
- WebSocket 实时聊天
- 单机部署,无法扩展
- 无消息持久化
5.3 V2: 分布式架构 + 消息队列
优化点:
- Redis Pub/Sub 实现分布式聊天室
- Kafka 消息持久化
- 弹幕限流
- 礼物系统
// ==================== 分布式聊天室(Redis Pub/Sub)====================
package distributed
import (
"context"
"encoding/json"
"fmt"
"github.com/go-redis/redis/v8"
)
// DistributedChatService 分布式聊天服务
type DistributedChatService struct {
redis *redis.Client
// 本地连接管理
localClients map[string]map[*Client]bool
// 订阅的房间
subscriptions map[string]*redis.PubSub
}
// NewDistributedChatService 创建分布式聊天服务
func NewDistributedChatService(redisClient *redis.Client) *DistributedChatService {
return &DistributedChatService{
redis: redisClient,
localClients: make(map[string]map[*Client]bool),
subscriptions: make(map[string]*redis.PubSub),
}
}
// SubscribeRoom 订阅房间消息
func (s *DistributedChatService) SubscribeRoom(roomID string) {
if _, exists := s.subscriptions[roomID]; exists {
return
}
ctx := context.Background()
pubsub := s.redis.Subscribe(ctx, fmt.Sprintf("chat:room:%s", roomID))
s.subscriptions[roomID] = pubsub
// 接收消息
go func() {
ch := pubsub.Channel()
for msg := range ch {
var chatMsg ChatMessage
if err := json.Unmarshal([]byte(msg.Payload), &chatMsg); err != nil {
continue
}
// 广播给本地客户端
s.broadcastToLocal(roomID, &chatMsg)
}
}()
}
// PublishMessage 发布消息到 Redis
func (s *DistributedChatService) PublishMessage(message *ChatMessage) error {
ctx := context.Background()
// 1. 发布到 Redis Pub/Sub
data, _ := json.Marshal(message)
channelName := fmt.Sprintf("chat:room:%s", message.RoomID)
if err := s.redis.Publish(ctx, channelName, data).Err(); err != nil {
return err
}
// 2. 异步写入 Kafka(持久化)
go s.writeToKafka(message)
return nil
}
// broadcastToLocal 广播给本地客户端
func (s *DistributedChatService) broadcastToLocal(roomID string, message *ChatMessage) {
clients := s.localClients[roomID]
data, _ := json.Marshal(message)
for client := range clients {
select {
case client.Send <- data:
default:
close(client.Send)
delete(clients, client)
}
}
}
// writeToKafka 写入 Kafka
func (s *DistributedChatService) writeToKafka(message *ChatMessage) {
// 实际实现:发送到 Kafka
// kafkaProducer.Produce("chat-messages", message)
}
// ==================== 弹幕限流 ====================
import (
"time"
"golang.org/x/time/rate"
)
// DanmakuLimiter 弹幕限流器
type DanmakuLimiter struct {
// 用户级限流
userLimiters map[int64]*rate.Limiter
// 房间级限流
roomLimiters map[string]*rate.Limiter
}
// NewDanmakuLimiter 创建弹幕限流器
func NewDanmakuLimiter() *DanmakuLimiter {
return &DanmakuLimiter{
userLimiters: make(map[int64]*rate.Limiter),
roomLimiters: make(map[string]*rate.Limiter),
}
}
// AllowUser 检查用户是否允许发送弹幕
func (l *DanmakuLimiter) AllowUser(userID int64) bool {
limiter, exists := l.userLimiters[userID]
if !exists {
// 每用户每秒 3 条
limiter = rate.NewLimiter(rate.Every(time.Second/3), 3)
l.userLimiters[userID] = limiter
}
return limiter.Allow()
}
// AllowRoom 检查房间是否允许接收弹幕
func (l *DanmakuLimiter) AllowRoom(roomID string) bool {
limiter, exists := l.roomLimiters[roomID]
if !exists {
// 每房间每秒 100 条
limiter = rate.NewLimiter(rate.Every(time.Second/100), 100)
l.roomLimiters[roomID] = limiter
}
return limiter.Allow()
}
// ==================== 礼物系统 ====================
package gift
import (
"context"
"fmt"
)
// GiftService 礼物服务
type GiftService struct {
db *sql.DB
redis *redis.Client
}
// SendGift 赠送礼物
func (s *GiftService) SendGift(
roomID string,
senderUserID, receiverUserID int64,
giftID int,
count int,
) (*GiftOrder, error) {
// 1. 查询礼物信息
gift, err := s.getGift(giftID)
if err != nil {
return nil, err
}
// 2. 计算总金额
totalAmount := gift.Price * float64(count)
// 3. 扣除用户余额(调用支付服务)
if err := s.deductBalance(senderUserID, totalAmount); err != nil {
return nil, err
}
// 4. 创建礼物订单
order := &GiftOrder{
OrderID: generateOrderID(),
RoomID: roomID,
SenderUserID: senderUserID,
ReceiverUserID: receiverUserID,
GiftID: giftID,
Count: count,
UnitPrice: gift.Price,
TotalAmount: totalAmount,
Status: "success",
Timestamp: time.Now().Unix(),
}
_, err = s.db.Exec(`
INSERT INTO gift_orders (
order_id, room_id, sender_user_id, receiver_user_id,
gift_id, count, unit_price, total_amount, status, timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, order.OrderID, order.RoomID, order.SenderUserID, order.ReceiverUserID,
order.GiftID, order.Count, order.UnitPrice, order.TotalAmount,
order.Status, order.Timestamp)
if err != nil {
// 回滚余额
s.refundBalance(senderUserID, totalAmount)
return nil, err
}
// 5. 发送礼物消息到聊天室
giftMessage := &ChatMessage{
MessageID: generateMessageID(),
RoomID: roomID,
UserID: senderUserID,
MessageType: "gift",
Content: fmt.Sprintf("赠送了 %d 个 %s", count, gift.Name),
Metadata: map[string]interface{}{
"gift_id": giftID,
"gift_name": gift.Name,
"count": count,
"animation_url": gift.AnimationURL,
},
Timestamp: time.Now().Unix(),
}
s.chatService.PublishMessage(giftMessage)
// 6. 更新主播收益(异步)
go s.updateStreamerIncome(receiverUserID, totalAmount*0.5) // 50% 分成
return order, nil
}
type GiftOrder struct {
OrderID string
RoomID string
SenderUserID int64
ReceiverUserID int64
GiftID int
Count int
UnitPrice float64
TotalAmount float64
Status string
Timestamp int64
}
type Gift struct {
GiftID int
Name string
Price float64
AnimationURL string
}
func (s *GiftService) getGift(giftID int) (*Gift, error) {
var gift Gift
err := s.db.QueryRow(`
SELECT gift_id, name, price, animation_url
FROM gifts
WHERE gift_id = ?
`, giftID).Scan(&gift.GiftID, &gift.Name, &gift.Price, &gift.AnimationURL)
return &gift, err
}
func (s *GiftService) deductBalance(userID int64, amount float64) error {
// 调用支付服务
return nil
}
func (s *GiftService) refundBalance(userID int64, amount float64) error {
// 退款
return nil
}
func (s *GiftService) updateStreamerIncome(userID int64, amount float64) error {
// 更新主播收益
return nil
}
func generateOrderID() string {
return "gift_" + uuid.New().String()
}
V2 特点:
- 分布式架构,支持水平扩展
- Redis Pub/Sub 实现消息广播
- Kafka 持久化消息
- 弹幕限流
- 礼物系统
5.4 V3: CDN 优化 + 低延迟
优化点:
- WebRTC 低延迟推拉流
- CDN 智能调度
- 多清晰度转码
- 回放系统
// ==================== CDN 调度 ====================
package cdn
import (
"fmt"
"math"
)
// CDNScheduler CDN 调度器
type CDNScheduler struct {
nodes []*CDNNode
}
// CDNNode CDN 节点
type CDNNode struct {
NodeID string
Region string // "华北" | "华东" | "华南"
Bandwidth float64 // 可用带宽(Gbps)
Load float64 // 负载(0-1)
Latency int // 延迟(ms)
}
// SelectBestNode 选择最佳 CDN 节点
func (s *CDNScheduler) SelectBestNode(userRegion string, userIP string) *CDNNode {
var bestNode *CDNNode
bestScore := -1.0
for _, node := range s.nodes {
// 计算综合分数
score := s.calculateScore(node, userRegion)
if score > bestScore {
bestScore = score
bestNode = node
}
}
return bestNode
}
// calculateScore 计算节点分数
func (s *CDNScheduler) calculateScore(node *CDNNode, userRegion string) float64 {
// 1. 地域匹配(同地域加分)
regionScore := 0.0
if node.Region == userRegion {
regionScore = 1.0
} else {
regionScore = 0.5
}
// 2. 负载(负载越低越好)
loadScore := 1.0 - node.Load
// 3. 延迟(延迟越低越好)
latencyScore := 1.0 / (1.0 + float64(node.Latency)/100.0)
// 4. 带宽(带宽越大越好)
bandwidthScore := math.Log(node.Bandwidth + 1)
// 加权综合
score := regionScore*0.4 + loadScore*0.3 + latencyScore*0.2 + bandwidthScore*0.1
return score
}
// ==================== 多清晰度转码 ====================
package transcode
// TranscodeService 转码服务
type TranscodeService struct {
ffmpegPath string
}
// TranscodeProfile 转码配置
type TranscodeProfile struct {
Name string // "超清" | "高清" | "标清"
Resolution string // "1920x1080" | "1280x720" | "640x480"
Bitrate string // "2000k" | "1000k" | "500k"
Framerate int // 30 | 25 | 15
}
// Transcode 转码
func (s *TranscodeService) Transcode(inputURL string, profiles []*TranscodeProfile) error {
for _, profile := range profiles {
outputURL := fmt.Sprintf("%s_%s.flv", inputURL, profile.Name)
// 构建 ffmpeg 命令
cmd := fmt.Sprintf(`
%s -i %s \
-c:v libx264 -preset veryfast \
-b:v %s -r %d \
-s %s \
-c:a aac -b:a 128k \
-f flv %s
`, s.ffmpegPath, inputURL, profile.Bitrate, profile.Framerate,
profile.Resolution, outputURL)
// 异步执行
go s.executeFFmpeg(cmd)
}
return nil
}
func (s *TranscodeService) executeFFmpeg(cmd string) {
// 执行 ffmpeg 命令
// exec.Command("sh", "-c", cmd).Run()
}
// ==================== 直播回放 ====================
package replay
import (
"fmt"
"time"
)
// ReplayService 回放服务
type ReplayService struct {
db *sql.DB
storageURL string
}
// CreateReplay 创建回放
func (s *ReplayService) CreateReplay(roomID string, startTime, endTime time.Time) (*Replay, error) {
replayID := generateReplayID()
// 1. 从录制文件生成回放
recordFile := fmt.Sprintf("%s/record/%s_%d.flv",
s.storageURL, roomID, startTime.Unix())
replayFile := fmt.Sprintf("%s/replay/%s.mp4", s.storageURL, replayID)
// 2. 转换为 MP4 格式
if err := s.convertToMP4(recordFile, replayFile); err != nil {
return nil, err
}
// 3. 生成封面图
coverURL := s.generateCover(replayFile)
// 4. 保存到数据库
duration := endTime.Sub(startTime).Seconds()
replay := &Replay{
ReplayID: replayID,
RoomID: roomID,
Title: "直播回放",
VideoURL: replayFile,
CoverURL: coverURL,
Duration: int(duration),
StartTime: startTime,
EndTime: endTime,
CreatedAt: time.Now(),
}
_, err := s.db.Exec(`
INSERT INTO replays (
replay_id, room_id, title, video_url, cover_url,
duration, start_time, end_time, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`, replay.ReplayID, replay.RoomID, replay.Title, replay.VideoURL,
replay.CoverURL, replay.Duration, replay.StartTime, replay.EndTime,
replay.CreatedAt)
return replay, err
}
type Replay struct {
ReplayID string
RoomID string
Title string
VideoURL string
CoverURL string
Duration int
StartTime time.Time
EndTime time.Time
CreatedAt time.Time
}
func (s *ReplayService) convertToMP4(inputFile, outputFile string) error {
// ffmpeg 转换
return nil
}
func (s *ReplayService) generateCover(videoFile string) string {
// 生成封面图
return ""
}
func generateReplayID() string {
return "replay_" + uuid.New().String()
}
V3 特点:
- CDN 智能调度
- 多清晰度自适应
- WebRTC 低延迟(< 500ms)
- 直播回放
- 支持百万并发
6. 核心优化
6.1 延迟优化
问题:直播延迟高(> 5 秒)
解决方案:
| 协议 | 延迟 | 场景 |
|---|---|---|
| HLS | 5-10s | 普通直播 |
| RTMP | 2-5s | 传统直播 |
| HTTP-FLV | 1-3s | 低延迟直播 |
| WebRTC | < 500ms | 实时互动 |
优化策略:
- 使用 HTTP-FLV / WebRTC
- 减少 GOP 大小(关键帧间隔)
- CDN 边缘节点
- 码率自适应
6.2 弱网优化
// 动态码率调整
type AdaptiveBitrateService struct {
profiles []*BitrateProfile
}
type BitrateProfile struct {
Bitrate int // kbps
Resolution string
Quality string // "超清" | "高清" | "标清"
}
// SelectProfile 根据网络状况选择码率
func (s *AdaptiveBitrateService) SelectProfile(bandwidth int) *BitrateProfile {
// 选择最接近但不超过带宽的配置
for i := len(s.profiles) - 1; i >= 0; i-- {
if s.profiles[i].Bitrate <= bandwidth*80/100 { // 预留 20% 缓冲
return s.profiles[i]
}
}
// 返回最低画质
return s.profiles[0]
}
6.3 大规模并发优化
// 长连接优化
type ConnectionPool struct {
maxConns int
activeConns int
mu sync.Mutex
}
// Acquire 获取连接
func (p *ConnectionPool) Acquire() bool {
p.mu.Lock()
defer p.mu.Unlock()
if p.activeConns >= p.maxConns {
return false
}
p.activeConns++
return true
}
// Release 释放连接
func (p *ConnectionPool) Release() {
p.mu.Lock()
defer p.mu.Unlock()
p.activeConns--
}
7. 监控与告警
var (
liveRoomCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "live_room_count",
Help: "Current live room count",
},
)
onlineUserCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "live_online_user_count",
Help: "Online user count per room",
},
[]string{"room_id"},
)
messageLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "live_message_latency_seconds",
Help: "Message latency",
Buckets: []float64{0.001, 0.01, 0.1, 0.5, 1},
},
[]string{"message_type"},
)
streamBitrate = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "live_stream_bitrate_kbps",
Help: "Stream bitrate",
},
[]string{"room_id", "quality"},
)
)
8. 面试问答(10个高频题)
直播的推流和拉流是什么?
答:
推流(Push):主播将音视频流推送到服务器
- 协议:RTMP、WebRTC
- 工具:OBS、FFmpeg、移动端 SDK
拉流(Pull):观众从服务器拉取视频流
- 协议:HLS、HTTP-FLV、WebRTC
- 播放器:Video.js、ijkplayer
流程:
主播端 → RTMP推流 → 推流服务器 → 转码/录制 → CDN → HLS/FLV拉流 → 观众端
如何降低直播延迟?
答:
协议选择:
- HLS:5-10s(HTTP + M3U8切片)
- RTMP:2-5s
- HTTP-FLV:1-3s
- WebRTC:< 500ms
优化方法:
- 减少 GOP:关键帧间隔从 3s → 1s
- CDN 优化:边缘节点缓存
- 编码优化:H.265 压缩率更高
- 网络优化:UDP 代替 TCP(WebRTC)
如何实现百万并发?
答:
架构优化:
- CDN 分发:减轻源站压力
单源站 10 Gbps
CDN 回源 1:100
支持 1000 Gbps = 100 万观众(1 Mbps/人)
- 长连接优化(聊天):
- 单机支持 100 万连接(C10M)
- 使用 epoll(Linux)、kqueue(macOS)
- 消息队列:
- Redis Pub/Sub 广播
- Kafka 持久化
- 分布式部署:
- 多机房、多地域
- 负载均衡
聊天室如何实现?
答:
单机版:WebSocket + 内存广播
分布式版:Redis Pub/Sub
// 发送消息
redis.Publish("chat:room:12345", message)
// 订阅消息
pubsub := redis.Subscribe("chat:room:12345")
for msg := range pubsub.Channel() {
broadcastToLocalClients(msg)
}
消息持久化:
- Kafka 存储历史消息
- 用户上线后拉取离线消息
弹幕如何实现?
答:
发送流程:
用户发送 → 敏感词过滤 → 限流 → 广播给所有观众
限流策略:
// 用户级:每秒 3 条
userLimiter := rate.NewLimiter(3, 3)
// 房间级:每秒 100 条
roomLimiter := rate.NewLimiter(100, 100)
敏感词过滤:
- DFA 算法(确定性有限状态自动机)
- Trie 树
- 正则表达式
显示优化:
- 前端随机轨道
- 碰撞检测
- 限制同屏数量(< 50 条)
礼物系统如何设计?
答:
流程:
- 用户点击送礼
- 扣除用户余额
- 创建礼物订单
- 广播礼物消息(带动画)
- 更新主播收益
关键点:
- 事务性:扣款和订单原子性
- 幂等性:防止重复扣款
- 分成:主播 50%,平台 50%
func SendGift(userID, streamerID int64, giftID int) error {
tx, _ := db.Begin()
defer tx.Rollback()
// 1. 扣款
tx.Exec(`UPDATE accounts SET balance = balance - ? WHERE user_id = ?`, giftPrice, userID)
// 2. 创建订单
tx.Exec(`INSERT INTO gift_orders ...`)
// 3. 增加主播收益
tx.Exec(`UPDATE accounts SET balance = balance + ? WHERE user_id = ?`, giftPrice*0.5, streamerID)
return tx.Commit()
}
如何实现直播回放?
答:
录制:
- 推流服务器(SRS、Nginx-RTMP)自动录制
- 格式:FLV、MP4
回放生成:
- 直播结束后触发任务
- 转码为 MP4(ffmpeg)
- 生成封面图
- 上传到对象存储(OSS)
- 入库
优化:
- 切片(HLS):支持快进
- 多清晰度
- CDN 缓存
如何监控直播质量?
答:
推流质量:
- 帧率(FPS):>= 25
- 码率(Bitrate):稳定,无剧烈波动
- 丢包率:< 1%
拉流质量:
- 首屏时间:< 2s
- 卡顿率:< 5%
- 播放成功率:> 99%
指标采集:
// 推流服务器采集
type StreamMetrics struct {
RoomID string
FPS int
Bitrate int // kbps
PacketLoss float64
}
// 上报到 Prometheus
streamBitrate.WithLabelValues(roomID, "hd").Set(float64(bitrate))
告警:
- 帧率 < 15:推流异常
- 卡顿率 > 10%:网络问题
- 在线人数暴跌:服务故障
WebRTC 和 RTMP 有什么区别?
答:
| 对比项 | RTMP | WebRTC |
|---|---|---|
| 延迟 | 2-5s | < 500ms |
| 协议 | TCP | UDP(SRTP) |
| 浏览器支持 | 需要 Flash | 原生支持 |
| 穿透 NAT | 困难 | STUN/TURN |
| 应用场景 | 传统直播 | 实时互动、连麦 |
选择建议:
- 普通直播:RTMP + HTTP-FLV
- 连麦、视频会议:WebRTC
如何设计大规模直播系统的架构?
答:
整体架构:
┌──────────┐
│ 主播端 │ RTMP推流
└─────┬────┘
↓
┌─────────────┐
│ 推流服务器 │ 转码、录制
│ (SRS集群) │
└─────┬───────┘
↓
┌─────────────┐
│ CDN │ 分发
│ (边缘节点) │
└─────┬───────┘
↓ HLS/FLV
┌──────────┐
│ 观众端 │
└──────────┘
关键点:
- 推流:集群部署,负载均衡
- 转码:GPU 加速,多清晰度
- CDN:边缘节点,智能调度
- 聊天:Redis Pub/Sub,分布式
- 存储:对象存储(OSS),CDN 缓存
- 监控:Prometheus + Grafana
容量规划:
- 推流服务器:5000 路 / 25 台 = 200 路/台
- CDN 带宽:50 万观众 × 1 Mbps = 500 Gbps
- 聊天服务器:25,000 QPS / 5 台 = 5000 QPS/台
9. 总结
核心要点
推拉流
- RTMP 推流、HLS/FLV 拉流
- 转码、录制、截图
聊天室
- WebSocket 实时通信
- Redis Pub/Sub 分布式
- Kafka 消息持久化
弹幕
- 限流(用户级、房间级)
- 敏感词过滤
- 前端碰撞检测
低延迟
- WebRTC(< 500ms)
- HTTP-FLV(1-3s)
- CDN 边缘节点
大规模并发
- CDN 分发(1:100)
- 长连接优化(C10M)
- 消息队列
架构演进
V1: 单机 WebSocket 聊天室
↓
V2: Redis Pub/Sub + Kafka 持久化
↓
V3: CDN + WebRTC + 多清晰度
本章完,祝面试顺利!