HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • 系统设计实战

    • 系统设计面试教程
    • 系统设计方法论
    • 01-短链系统设计
    • 02 - 秒杀系统设计
    • 03 - IM 即时通讯系统设计
    • 04 - Feed 流系统设计
    • 05 - 分布式 ID 生成器设计
    • 06 - 限流系统设计
    • 第7章:搜索引擎设计
    • 08 - 推荐系统设计
    • 09 - 支付系统设计
    • 10 - 电商系统设计
    • 11 - 直播系统设计
    • 第12章:缓存系统设计
    • 第13章:消息队列设计
    • 第14章:分布式事务
    • 15 - 监控系统设计

03 - IM 即时通讯系统设计

面试频率: 难度等级: 推荐时长: 50-60 分钟

目录

  • 需求分析与澄清
  • 容量估算
  • API 设计
  • 数据模型设计
  • 架构设计
  • 核心算法与实现
  • 优化方案
  • 监控告警
  • 面试问答

需求分析与澄清

业务场景

IM(Instant Messaging)即时通讯系统是现代互联网应用的核心基础设施之一。典型应用场景:

  • 社交聊天(微信、QQ)
  • 企业协作(钉钉、Slack)
  • 客服系统
  • 直播间聊天
  • 游戏内聊天

核心挑战

IM 系统的五大核心挑战:
┌─────────────────────────────────────────────────────────┐
│ 1. 长连接管理                                            │
│    - 百万级并发长连接                                    │
│    - 连接保活与断线重连                                  │
│    - 连接路由(用户 → 服务器映射)                       │
│                                                          │
│ 2. 消息可靠性                                            │
│    - 消息不丢失(ACK 机制)                              │
│    - 消息不重复(幂等性)                                │
│    - 消息有序(时序保证)                                │
│                                                          │
│ 3. 实时性                                                │
│    - 端到端延迟 < 500ms                                  │
│    - 消息推送实时送达                                    │
│                                                          │
│ 4. 一致性                                                │
│    - 多端消息同步(手机、PC、Web)                       │
│    - 离线消息拉取                                        │
│    - 消息顺序一致性                                      │
│                                                          │
│ 5. 海量存储                                              │
│    - 消息持久化存储                                      │
│    - 历史消息快速检索                                    │
│    - 冷热数据分离                                        │
└─────────────────────────────────────────────────────────┘

功能性需求

面试官视角的关键问题

面试官: "设计一个类似微信的 IM 系统。"

你需要主动澄清以下需求:

Q1: 需要支持哪些消息类型?

 单聊(1v1)
 群聊(最多 500 人)
 文本、图片、语音、视频
 音视频通话(WebRTC,超出范围)
 朋友圈、公众号(独立系统)

Q2: 用户规模是多少?

DAU: 1 亿
在线用户: 5000 万(峰值)
平均每人发送消息: 50 条/天
日消息总量: 50 亿条
QPS: 50亿 / 86400 ≈ 6 万 QPS
峰值 QPS: 6万 * 3 = 18 万 QPS

Q3: 消息可靠性要求?

 消息必须不丢失(持久化)
 支持离线消息(最多 1 万条)
 消息顺序保证(同一会话内有序)
 不需要实时全局有序(分布式难以实现)

Q4: 多端同步?

 支持多端登录(手机、PC、Web)
 消息实时同步到所有设备
 已读状态同步

Q5: 群聊规模?

普通群: 最多 500 人
超级群: 最多 5000 人(只读消息,如直播间)

非功能性需求

需求类型具体要求优先级
高可用99.99% 可用性P0
低延迟端到端延迟 < 500ms (P99)P0
消息可靠性不丢失、不重复P0
扩展性支持亿级用户P0
安全性消息加密传输P1
可观测性消息链路追踪P1

容量估算

用户规模

【用户数据】
总用户数: 10 亿
DAU: 1 亿 (10%)
在线用户: 5000 万 (50%)
峰值在线: 8000 万 (晚上 8-10 点)

【消息量】
平均每人每天发送: 50 条
日消息总量: 1亿 * 50 = 50 亿条
QPS: 50亿 / 86400 ≈ 6 万 QPS
峰值 QPS: 18 万 QPS

连接数估算

【长连接】
在线用户: 5000 万
平均每用户设备数: 1.5 (手机 + PC)
总连接数: 5000万 * 1.5 = 7500 万

【单机连接数】
假设单机支持: 10 万连接(C10M 问题)
所需服务器: 7500万 / 10万 = 750 台

【连接保活】
心跳间隔: 30 秒
心跳 QPS: 7500万 / 30 ≈ 250 万 QPS

存储估算

消息存储

【单条消息大小】
消息 ID: 8 Bytes (int64)
会话 ID: 8 Bytes
发送者 ID: 8 Bytes
接收者 ID: 8 Bytes
消息类型: 1 Byte
消息内容: 500 Bytes (平均)
时间戳: 8 Bytes
状态标识: 1 Byte
总计: ≈ 550 Bytes

【存储量】
每日消息: 50 亿条
每日存储: 50亿 * 550B = 2.75 TB

一年存储: 2.75TB * 365 ≈ 1 PB
三年存储: 3 PB

【优化方案】
- 热数据(最近 3 个月): 1 PB(SSD)
- 温数据(3-12 个月): 2 PB(HDD)
- 冷数据(>12 个月): 对象存储(压缩)

多媒体存储

【图片】
每日图片消息: 5 亿条 (10%)
平均大小: 200 KB
每日存储: 5亿 * 200KB = 100 TB

【语音】
每日语音消息: 2 亿条 (4%)
平均大小: 50 KB
每日存储: 2亿 * 50KB = 10 TB

【视频】
每日视频消息: 1 亿条 (2%)
平均大小: 5 MB
每日存储: 1亿 * 5MB = 500 TB

【总计】
每日多媒体存储: 610 TB
一年: 610TB * 365 ≈ 220 PB

【优化方案】
- 使用对象存储(OSS/S3)
- CDN 加速分发
- 小文件合并存储
- 冷数据归档压缩

带宽估算

【下行带宽】(服务器 → 客户端)
消息推送: 18 万 QPS * 550 Bytes = 99 MB/s = 0.8 Gbps
多媒体下载: 100 TB / 86400s ≈ 1.2 GB/s = 9.6 Gbps
总计: ≈ 10 Gbps

【上行带宽】(客户端 → 服务器)
消息上传: 18 万 QPS * 550 Bytes = 99 MB/s = 0.8 Gbps
多媒体上传: 610 TB / 86400s ≈ 7 GB/s = 56 Gbps
总计: ≈ 60 Gbps

建议带宽: 100 Gbps(考虑 1.5 倍冗余)

【优化方案】
- 多媒体走 CDN(降低 80% 带宽成本)
- 消息压缩(Gzip/LZ4)
- 图片压缩(WebP)

数据库估算

【用户表】
1 亿 DAU * 1 KB = 100 GB

【会话表】
1 亿用户 * 平均 100 个会话 * 200 Bytes = 2 TB

【消息表】
50 亿条/天 * 550 Bytes = 2.75 TB/天
按 user_id 分 256 个表: 每表 10 GB/天

【群组表】
1 亿群 * 1 KB = 100 GB

【群成员关系表】
1 亿群 * 平均 100 人 * 16 Bytes = 160 GB

API 设计

WebSocket 协议设计

连接建立

// 客户端连接
const ws = new WebSocket('wss://im.example.com/ws?token=xxx&device_id=yyy');

ws.onopen = () => {
    console.log('WebSocket 连接成功');
};

ws.onmessage = (event) => {
    const msg = JSON.parse(event.data);
    handleMessage(msg);
};

ws.onerror = (error) => {
    console.error('WebSocket 错误', error);
};

ws.onclose = () => {
    console.log('WebSocket 连接关闭,尝试重连...');
    reconnect();
};

消息协议(JSON)

// 统一消息格式
{
  "type": "message",           // 消息类型: message, ack, ping, pong
  "msg_id": "1234567890",      // 消息 ID(全局唯一)
  "from_uid": 10001,           // 发送者 UID
  "to_uid": 10002,             // 接收者 UID(单聊)
  "to_gid": null,              // 群组 ID(群聊)
  "msg_type": "text",          // 消息类型: text, image, voice, video
  "content": "Hello!",         // 消息内容
  "timestamp": 1699804800000,  // 时间戳(毫秒)
  "seq": 1001,                 // 消息序号(会话内递增)
  "client_msg_id": "uuid-xxx"  // 客户端消息 ID(去重)
}

RESTful API 设计

1. 用户认证

# 登录获取 Token
POST /api/v1/auth/login

Request Body:
{
  "username": "user123",
  "password": "hashed_password",
  "device_id": "device_uuid"
}

Response 200:
{
  "code": 0,
  "data": {
    "user_id": 10001,
    "token": "jwt_token_xxx",
    "ws_url": "wss://im-gateway-1.example.com/ws",  // 分配的网关地址
    "expire_time": 1699891200
  }
}

2. 发送消息(HTTP 降级方案)

# 发送单聊消息
POST /api/v1/messages/send

Headers:
  Authorization: Bearer {token}

Request Body:
{
  "to_uid": 10002,
  "msg_type": "text",
  "content": "Hello!",
  "client_msg_id": "uuid-xxx"
}

Response 200:
{
  "code": 0,
  "data": {
    "msg_id": "1234567890",
    "timestamp": 1699804800000,
    "seq": 1001
  }
}

# 发送群聊消息
POST /api/v1/messages/send

Request Body:
{
  "to_gid": 2001,
  "msg_type": "text",
  "content": "Hello everyone!",
  "client_msg_id": "uuid-xxx"
}

3. 拉取离线消息

# 拉取离线消息(增量拉取)
GET /api/v1/messages/offline?last_seq=1000&limit=100

Response 200:
{
  "code": 0,
  "data": {
    "messages": [
      {
        "msg_id": "1234567890",
        "from_uid": 10001,
        "to_uid": 10002,
        "msg_type": "text",
        "content": "Hello!",
        "timestamp": 1699804800000,
        "seq": 1001
      }
    ],
    "has_more": true,
    "next_seq": 1100
  }
}

4. 消息历史查询

# 查询会话历史消息
GET /api/v1/messages/history?session_id=xxx&before_seq=1000&limit=20

Response 200:
{
  "code": 0,
  "data": {
    "messages": [ /* ... */ ],
    "has_more": true
  }
}

5. 会话管理

# 获取会话列表
GET /api/v1/sessions?page=1&size=20

Response 200:
{
  "code": 0,
  "data": {
    "sessions": [
      {
        "session_id": "session_xxx",
        "session_type": "single",  // single, group
        "peer_uid": 10002,
        "peer_name": "Alice",
        "last_msg": "Hello!",
        "last_msg_time": 1699804800000,
        "unread_count": 5,
        "seq": 1001
      }
    ],
    "total": 50
  }
}

# 清除未读数
POST /api/v1/sessions/{session_id}/read

Request Body:
{
  "seq": 1001  // 已读到的序号
}

6. 群组管理

# 创建群组
POST /api/v1/groups

Request Body:
{
  "name": "技术讨论组",
  "members": [10001, 10002, 10003]
}

Response 200:
{
  "code": 0,
  "data": {
    "group_id": 2001,
    "name": "技术讨论组",
    "created_at": 1699804800000
  }
}

# 加入群组
POST /api/v1/groups/{group_id}/members

Request Body:
{
  "user_ids": [10004, 10005]
}

# 退出群组
DELETE /api/v1/groups/{group_id}/members/{user_id}

消息类型定义

// 消息类型
const (
    MsgTypeText     = "text"     // 文本
    MsgTypeImage    = "image"    // 图片
    MsgTypeVoice    = "voice"    // 语音
    MsgTypeVideo    = "video"    // 视频
    MsgTypeFile     = "file"     // 文件
    MsgTypeLocation = "location" // 位置
    MsgTypeCard     = "card"     // 名片
    MsgTypeSystem   = "system"   // 系统消息
)

// 协议类型
const (
    ProtocolMessage = "message"  // 普通消息
    ProtocolAck     = "ack"      // 确认消息
    ProtocolPing    = "ping"     // 心跳请求
    ProtocolPong    = "pong"     // 心跳响应
    ProtocolKickoff = "kickoff"  // 踢下线
    ProtocolSync    = "sync"     // 同步请求
)

数据模型设计

核心表结构

1. 用户表 (user)

CREATE TABLE user (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '用户 ID',
    username VARCHAR(50) NOT NULL COMMENT '用户名',
    nickname VARCHAR(100) NOT NULL COMMENT '昵称',
    avatar VARCHAR(255) DEFAULT '' COMMENT '头像 URL',
    phone VARCHAR(20) DEFAULT '' COMMENT '手机号',
    email VARCHAR(100) DEFAULT '' COMMENT '邮箱',
    password_hash VARCHAR(255) NOT NULL COMMENT '密码哈希',
    status TINYINT UNSIGNED DEFAULT 1 COMMENT '状态: 0-禁用 1-正常',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    UNIQUE KEY uk_username (username),
    INDEX idx_phone (phone),
    INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';

2. 会话表 (session)

CREATE TABLE session (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '会话 ID',
    session_id VARCHAR(64) NOT NULL COMMENT '会话唯一标识',
    user_id BIGINT UNSIGNED NOT NULL COMMENT '用户 ID',
    session_type TINYINT UNSIGNED NOT NULL COMMENT '会话类型: 1-单聊 2-群聊',
    peer_id BIGINT UNSIGNED NOT NULL COMMENT '对端 ID(用户 ID 或群组 ID)',
    last_msg_id VARCHAR(64) DEFAULT '' COMMENT '最后一条消息 ID',
    last_msg_content TEXT COMMENT '最后一条消息内容(摘要)',
    last_msg_time TIMESTAMP NULL COMMENT '最后消息时间',
    unread_count INT UNSIGNED DEFAULT 0 COMMENT '未读消息数',
    seq BIGINT UNSIGNED DEFAULT 0 COMMENT '消息序号(用于增量拉取)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    UNIQUE KEY uk_user_session (user_id, session_id),
    INDEX idx_user_id (user_id),
    INDEX idx_last_msg_time (last_msg_time) COMMENT '按时间排序会话列表'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='会话表';

-- 分表策略: 按 user_id 分 256 张表

3. 消息表 (message)

CREATE TABLE message (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '自增 ID',
    msg_id VARCHAR(64) NOT NULL COMMENT '消息 ID(全局唯一)',
    session_id VARCHAR(64) NOT NULL COMMENT '会话 ID',
    from_uid BIGINT UNSIGNED NOT NULL COMMENT '发送者 UID',
    to_uid BIGINT UNSIGNED DEFAULT NULL COMMENT '接收者 UID(单聊)',
    to_gid BIGINT UNSIGNED DEFAULT NULL COMMENT '群组 ID(群聊)',
    msg_type VARCHAR(20) NOT NULL COMMENT '消息类型',
    content TEXT NOT NULL COMMENT '消息内容',
    seq BIGINT UNSIGNED NOT NULL COMMENT '消息序号(会话内递增)',
    client_msg_id VARCHAR(64) DEFAULT '' COMMENT '客户端消息 ID(去重)',
    timestamp BIGINT UNSIGNED NOT NULL COMMENT '时间戳(毫秒)',
    status TINYINT UNSIGNED DEFAULT 1 COMMENT '状态: 0-删除 1-正常',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    UNIQUE KEY uk_msg_id (msg_id),
    INDEX idx_session_seq (session_id, seq) COMMENT '会话内消息查询',
    INDEX idx_from_uid (from_uid),
    INDEX idx_to_uid (to_uid),
    INDEX idx_to_gid (to_gid),
    INDEX idx_timestamp (timestamp) COMMENT '时间范围查询'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息表';

-- 分表策略: 按 session_id 哈希分 256 张表
-- 消息表会非常大,建议按时间分表(每月一张表)

4. 群组表 (group_info)

CREATE TABLE group_info (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '群组 ID',
    group_id BIGINT UNSIGNED NOT NULL COMMENT '群组唯一 ID',
    name VARCHAR(100) NOT NULL COMMENT '群名称',
    avatar VARCHAR(255) DEFAULT '' COMMENT '群头像',
    owner_id BIGINT UNSIGNED NOT NULL COMMENT '群主 ID',
    member_count INT UNSIGNED DEFAULT 0 COMMENT '成员数量',
    max_members INT UNSIGNED DEFAULT 500 COMMENT '最大成员数',
    type TINYINT UNSIGNED DEFAULT 1 COMMENT '群类型: 1-普通群 2-超级群',
    status TINYINT UNSIGNED DEFAULT 1 COMMENT '状态: 0-解散 1-正常',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    UNIQUE KEY uk_group_id (group_id),
    INDEX idx_owner_id (owner_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群组表';

5. 群成员表 (group_member)

CREATE TABLE group_member (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '自增 ID',
    group_id BIGINT UNSIGNED NOT NULL COMMENT '群组 ID',
    user_id BIGINT UNSIGNED NOT NULL COMMENT '用户 ID',
    role TINYINT UNSIGNED DEFAULT 1 COMMENT '角色: 1-普通成员 2-管理员 3-群主',
    nickname VARCHAR(100) DEFAULT '' COMMENT '群昵称',
    join_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '加入时间',
    last_read_seq BIGINT UNSIGNED DEFAULT 0 COMMENT '最后已读序号',

    UNIQUE KEY uk_group_user (group_id, user_id),
    INDEX idx_group_id (group_id),
    INDEX idx_user_id (user_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群成员表';

6. 离线消息表 (offline_message)

CREATE TABLE offline_message (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '自增 ID',
    user_id BIGINT UNSIGNED NOT NULL COMMENT '用户 ID',
    msg_id VARCHAR(64) NOT NULL COMMENT '消息 ID',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_user_id (user_id),
    INDEX idx_created_at (created_at) COMMENT '定时清理'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='离线消息索引表';

-- 说明:
-- 离线消息只存储索引,真实消息在 message 表
-- 用户上线后拉取离线消息,然后删除索引
-- 最多保留 1 万条,超过则删除最早的

Redis 数据结构设计

1. 用户在线状态

【String 类型】
Key: user:online:{user_id}
Value: {gateway_server_id}_{connection_id}  # 用户所在网关和连接 ID
TTL: 300 秒(心跳刷新)

示例:
SET user:online:10001 "gateway-1_conn-abc123" EX 300
GET user:online:10001

2. 用户多端会话

【Hash 类型】
Key: user:sessions:{user_id}
Field: device_id
Value: {gateway_server_id}_{connection_id}
TTL: 永久(手动删除)

示例:
HSET user:sessions:10001 device-mobile "gateway-1_conn-abc"
HSET user:sessions:10001 device-pc "gateway-2_conn-def"
HGETALL user:sessions:10001

3. 会话序号(用于消息去重和增量拉取)

【String 类型】
Key: session:seq:{session_id}
Value: 当前最大序号
TTL: 永久

示例:
SET session:seq:session_xxx 1001
INCR session:seq:session_xxx  # 原子递增
GET session:seq:session_xxx

4. 消息 ACK 缓存

【Set 类型】
Key: msg:ack:{user_id}
Value: msg_id 集合(已确认的消息 ID)
TTL: 1 小时

示例:
SADD msg:ack:10001 "msg_123" "msg_456"
SISMEMBER msg:ack:10001 "msg_123"  # 检查是否已 ACK

5. 群成员列表缓存

【Set 类型】
Key: group:members:{group_id}
Value: user_id 集合
TTL: 1 小时

示例:
SADD group:members:2001 10001 10002 10003
SMEMBERS group:members:2001
SISMEMBER group:members:2001 10001  # 检查是否在群里

6. 未读消息计数

【Hash 类型】
Key: user:unread:{user_id}
Field: session_id
Value: 未读数
TTL: 永久

示例:
HINCRBY user:unread:10001 session_xxx 1  # 未读数 +1
HSET user:unread:10001 session_xxx 0     # 清零
HGETALL user:unread:10001

架构设计

V1 版本:单体架构(MVP)

架构图

客户端(WebSocket)
    ↓
┌────────────────────────────────────────┐
│         IM 服务器(单体)               │
│  ┌──────────────────────────────────┐  │
│  │  连接管理                        │  │
│  │  - WebSocket 连接池              │  │
│  │  - 心跳检测                      │  │
│  └──────────────────────────────────┘  │
│  ┌──────────────────────────────────┐  │
│  │  消息处理                        │  │
│  │  - 消息路由                      │  │
│  │  - 消息持久化                    │  │
│  │  - 离线消息                      │  │
│  └──────────────────────────────────┘  │
│  ┌──────────────────────────────────┐  │
│  │  业务逻辑                        │  │
│  │  - 会话管理                      │  │
│  │  - 群组管理                      │  │
│  └──────────────────────────────────┘  │
└────────────────────────────────────────┘
    ↓
┌────────────────────────────────────────┐
│         MySQL(单机)                   │
│  - 用户表                               │
│  - 消息表                               │
│  - 会话表                               │
│  - 群组表                               │
└────────────────────────────────────────┘

核心代码实现

WebSocket 连接管理

package server

import (
    "github.com/gorilla/websocket"
    "sync"
)

// Connection WebSocket 连接封装
type Connection struct {
    UserID     int64
    DeviceID   string
    Conn       *websocket.Conn
    SendChan   chan []byte
    CloseChan  chan struct{}
    LastActive time.Time
}

// ConnectionManager 连接管理器
type ConnectionManager struct {
    connections map[int64]*Connection  // user_id -> connection
    mu          sync.RWMutex
}

func NewConnectionManager() *ConnectionManager {
    return &ConnectionManager{
        connections: make(map[int64]*Connection),
    }
}

// AddConnection 添加连接
func (cm *ConnectionManager) AddConnection(userID int64, conn *Connection) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    cm.connections[userID] = conn

    log.Infof("用户 %d 已连接,当前在线: %d", userID, len(cm.connections))
}

// RemoveConnection 移除连接
func (cm *ConnectionManager) RemoveConnection(userID int64) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    delete(cm.connections, userID)

    log.Infof("用户 %d 已断开,当前在线: %d", userID, len(cm.connections))
}

// GetConnection 获取连接
func (cm *ConnectionManager) GetConnection(userID int64) (*Connection, bool) {
    cm.mu.RLock()
    defer cm.mu.RUnlock()
    conn, ok := cm.connections[userID]
    return conn, ok
}

// SendToUser 发送消息给用户
func (cm *ConnectionManager) SendToUser(userID int64, data []byte) error {
    conn, ok := cm.GetConnection(userID)
    if !ok {
        return errors.New("用户不在线")
    }

    select {
    case conn.SendChan <- data:
        return nil
    case <-time.After(5 * time.Second):
        return errors.New("发送超时")
    }
}

消息处理

package handler

import (
    "encoding/json"
    "time"
)

type MessageHandler struct {
    connMgr *ConnectionManager
    db      *sql.DB
}

// HandleMessage 处理收到的消息
func (h *MessageHandler) HandleMessage(conn *Connection, data []byte) error {
    var msg Message
    if err := json.Unmarshal(data, &msg); err != nil {
        return err
    }

    switch msg.Type {
    case ProtocolMessage:
        return h.handleChatMessage(conn, &msg)
    case ProtocolAck:
        return h.handleAck(conn, &msg)
    case ProtocolPing:
        return h.handlePing(conn)
    default:
        return errors.New("未知消息类型")
    }
}

// handleChatMessage 处理聊天消息
func (h *MessageHandler) handleChatMessage(conn *Connection, msg *Message) error {
    // 1. 生成消息 ID
    msg.MsgID = generateMsgID()
    msg.Timestamp = time.Now().UnixMilli()
    msg.FromUID = conn.UserID

    // 2. 持久化到数据库
    if err := h.saveMessage(msg); err != nil {
        return err
    }

    // 3. 发送给接收方
    if msg.ToUID != 0 {
        // 单聊
        return h.sendToUser(msg)
    } else if msg.ToGID != 0 {
        // 群聊
        return h.sendToGroup(msg)
    }

    return nil
}

// sendToUser 发送单聊消息
func (h *MessageHandler) sendToUser(msg *Message) error {
    // 检查接收方是否在线
    if h.connMgr.IsOnline(msg.ToUID) {
        // 在线,直接推送
        data, _ := json.Marshal(msg)
        return h.connMgr.SendToUser(msg.ToUID, data)
    } else {
        // 离线,存储离线消息
        return h.saveOfflineMessage(msg.ToUID, msg.MsgID)
    }
}

// sendToGroup 发送群聊消息
func (h *MessageHandler) sendToGroup(msg *Message) error {
    // 查询群成员
    members, err := h.getGroupMembers(msg.ToGID)
    if err != nil {
        return err
    }

    data, _ := json.Marshal(msg)

    // 遍历群成员,发送消息
    for _, memberID := range members {
        if memberID == msg.FromUID {
            continue  // 跳过发送者自己
        }

        if h.connMgr.IsOnline(memberID) {
            h.connMgr.SendToUser(memberID, data)
        } else {
            h.saveOfflineMessage(memberID, msg.MsgID)
        }
    }

    return nil
}

// saveMessage 保存消息到数据库
func (h *MessageHandler) saveMessage(msg *Message) error {
    sql := `
        INSERT INTO message (msg_id, session_id, from_uid, to_uid, to_gid, msg_type, content, seq, timestamp)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    `
    _, err := h.db.Exec(sql, msg.MsgID, msg.SessionID, msg.FromUID, msg.ToUID, msg.ToGID,
        msg.MsgType, msg.Content, msg.Seq, msg.Timestamp)
    return err
}

// saveOfflineMessage 保存离线消息索引
func (h *MessageHandler) saveOfflineMessage(userID int64, msgID string) error {
    sql := "INSERT INTO offline_message (user_id, msg_id) VALUES (?, ?)"
    _, err := h.db.Exec(sql, userID, msgID)
    return err
}

心跳保活

// handlePing 处理心跳
func (h *MessageHandler) handlePing(conn *Connection) error {
    // 更新最后活跃时间
    conn.LastActive = time.Now()

    // 发送 pong 响应
    pong := Message{Type: ProtocolPong, Timestamp: time.Now().UnixMilli()}
    data, _ := json.Marshal(pong)

    return conn.Conn.WriteMessage(websocket.TextMessage, data)
}

// HeartbeatChecker 心跳检查器
func (cm *ConnectionManager) HeartbeatChecker() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        cm.mu.Lock()
        for userID, conn := range cm.connections {
            // 如果 2 分钟没有活跃,关闭连接
            if time.Since(conn.LastActive) > 2*time.Minute {
                log.Warnf("用户 %d 心跳超时,关闭连接", userID)
                conn.Conn.Close()
                delete(cm.connections, userID)
            }
        }
        cm.mu.Unlock()
    }
}

V1 架构的问题

问题分析:
┌─────────────────────────────────────────────┐
│ 1. 单点故障                                  │
│    - 服务器宕机,所有用户掉线                │
│    - 无法水平扩展                            │
│                                              │
│ 2. 连接数限制                                │
│    - 单机最多 10 万连接(C10K 问题)         │
│    - 无法支撑百万级在线                      │
│                                              │
│ 3. 消息丢失风险                              │
│    - 内存中的消息未持久化                    │
│    - 重启后消息丢失                          │
│                                              │
│ 4. 跨服务器通信                              │
│    - 用户 A 在服务器 1,用户 B 在服务器 2    │
│    - 无法相互发送消息                        │
└─────────────────────────────────────────────┘

V2 版本:分布式架构 + 消息队列

架构图

客户端
    ↓
┌────────────────────────────────────────────┐
│         负载均衡(Nginx/LVS)               │
└────────────────────────────────────────────┘
    ↓
┌────────────────────────────────────────────┐
│      WebSocket 网关集群(100 台)           │
│  - 长连接管理                               │
│  - 心跳检测                                 │
│  - 消息收发                                 │
└────────────────────────────────────────────┘
    ↓                     ↓
┌─────────┐          ┌──────────────┐
│  Redis  │          │  Kafka MQ    │
│  集群   │          │  - 消息投递   │
│         │          │  - 解耦       │
└─────────┘          └──────────────┘
         ↓                     ↓
┌────────────────────────────────────────────┐
│      消息处理服务集群(50 台)              │
│  - 消息持久化                               │
│  - 离线消息                                 │
│  - 消息路由                                 │
└────────────────────────────────────────────┘
    ↓
┌────────────────────────────────────────────┐
│     MySQL 分库分表(8 库 * 256 表)         │
│     - 消息表分片                            │
│     - 会话表分片                            │
└────────────────────────────────────────────┘
    ↓
┌────────────────────────────────────────────┐
│         对象存储(OSS)                     │
│     - 图片、语音、视频                      │
└────────────────────────────────────────────┘

核心改进点

1. 用户路由(在 Redis 中记录用户所在网关)

package router

import (
    "github.com/go-redis/redis/v8"
)

type UserRouter struct {
    rdb *redis.Client
}

// RegisterUser 用户连接时注册路由
func (r *UserRouter) RegisterUser(ctx context.Context, userID int64, gatewayID, connID string) error {
    key := fmt.Sprintf("user:online:%d", userID)
    value := fmt.Sprintf("%s_%s", gatewayID, connID)

    // 设置 5 分钟过期(心跳刷新)
    return r.rdb.Set(ctx, key, value, 5*time.Minute).Err()
}

// GetUserGateway 查询用户所在网关
func (r *UserRouter) GetUserGateway(ctx context.Context, userID int64) (string, string, error) {
    key := fmt.Sprintf("user:online:%d", userID)
    value, err := r.rdb.Get(ctx, key).Result()
    if err != nil {
        return "", "", err
    }

    parts := strings.Split(value, "_")
    if len(parts) != 2 {
        return "", "", errors.New("invalid format")
    }

    return parts[0], parts[1], nil  // gatewayID, connID
}

// UnregisterUser 用户断开连接时注销路由
func (r *UserRouter) UnregisterUser(ctx context.Context, userID int64) error {
    key := fmt.Sprintf("user:online:%d", userID)
    return r.rdb.Del(ctx, key).Err()
}

2. 跨网关消息投递(Kafka)

package delivery

import (
    "github.com/Shopify/sarama"
)

type MessageDelivery struct {
    producer sarama.SyncProducer
    router   *UserRouter
    rpcPool  *GatewayRPCPool
}

// DeliverMessage 投递消息给用户
func (d *MessageDelivery) DeliverMessage(ctx context.Context, msg *Message) error {
    // 1. 查询用户所在网关
    gatewayID, connID, err := d.router.GetUserGateway(ctx, msg.ToUID)
    if err != nil {
        // 用户不在线,保存离线消息
        return d.saveOfflineMessage(msg)
    }

    // 2. 判断是否在本网关
    if gatewayID == currentGatewayID {
        // 本地投递
        return d.deliverLocal(connID, msg)
    } else {
        // 跨网关投递(通过 Kafka)
        return d.deliverRemote(gatewayID, connID, msg)
    }
}

// deliverRemote 跨网关投递
func (d *MessageDelivery) deliverRemote(gatewayID, connID string, msg *Message) error {
    // 构造投递消息
    deliveryMsg := DeliveryMessage{
        GatewayID: gatewayID,
        ConnID:    connID,
        Message:   msg,
    }

    data, _ := json.Marshal(deliveryMsg)

    // 发送到 Kafka(topic 按 gateway_id 分区)
    partition := hashGatewayID(gatewayID) % numPartitions
    _, _, err := d.producer.SendMessage(&sarama.ProducerMessage{
        Topic:     "im-delivery",
        Key:       sarama.StringEncoder(gatewayID),
        Value:     sarama.ByteEncoder(data),
        Partition: int32(partition),
    })

    return err
}

// Kafka 消费者(每个网关订阅自己的分区)
type DeliveryConsumer struct {
    connMgr *ConnectionManager
}

func (c *DeliveryConsumer) Consume(msg *sarama.ConsumerMessage) error {
    var deliveryMsg DeliveryMessage
    if err := json.Unmarshal(msg.Value, &deliveryMsg); err != nil {
        return err
    }

    // 本地投递
    return c.connMgr.SendToConnection(deliveryMsg.ConnID, deliveryMsg.Message)
}

3. 消息 ACK 机制

package ack

type AckManager struct {
    rdb *redis.Client
}

// SendMessage 发送消息并等待 ACK
func (h *MessageHandler) SendMessage(msg *Message) error {
    // 1. 发送消息
    data, _ := json.Marshal(msg)
    if err := h.delivery.DeliverMessage(context.Background(), msg); err != nil {
        return err
    }

    // 2. 等待 ACK(超时 10 秒)
    ackKey := fmt.Sprintf("msg:ack:%s", msg.MsgID)
    for i := 0; i < 20; i++ {  // 重试 20 次,每次 500ms
        exists, _ := h.rdb.Exists(context.Background(), ackKey).Result()
        if exists == 1 {
            // 收到 ACK
            return nil
        }
        time.Sleep(500 * time.Millisecond)
    }

    // 超时未收到 ACK,重试或保存离线消息
    log.Warnf("消息 %s 未收到 ACK", msg.MsgID)
    return errors.New("ACK timeout")
}

// HandleAck 处理客户端 ACK
func (h *MessageHandler) HandleAck(conn *Connection, ack *AckMessage) error {
    ackKey := fmt.Sprintf("msg:ack:%s", ack.MsgID)
    return h.rdb.Set(context.Background(), ackKey, "1", 10*time.Second).Err()
}

4. 离线消息拉取

// PullOfflineMessages 拉取离线消息
func (h *MessageHandler) PullOfflineMessages(userID int64, lastSeq int64) ([]*Message, error) {
    // 1. 查询离线消息索引
    query := `
        SELECT msg_id FROM offline_message
        WHERE user_id = ?
        ORDER BY created_at ASC
        LIMIT 100
    `
    rows, err := h.db.Query(query, userID)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var msgIDs []string
    for rows.Next() {
        var msgID string
        rows.Scan(&msgID)
        msgIDs = append(msgIDs, msgID)
    }

    // 2. 批量查询消息内容
    if len(msgIDs) == 0 {
        return []*Message{}, nil
    }

    placeholders := strings.Repeat("?,", len(msgIDs))
    placeholders = placeholders[:len(placeholders)-1]

    query = fmt.Sprintf(`
        SELECT msg_id, from_uid, to_uid, to_gid, msg_type, content, seq, timestamp
        FROM message
        WHERE msg_id IN (%s)
        ORDER BY timestamp ASC
    `, placeholders)

    args := make([]interface{}, len(msgIDs))
    for i, id := range msgIDs {
        args[i] = id
    }

    rows, err = h.db.Query(query, args...)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var messages []*Message
    for rows.Next() {
        var msg Message
        rows.Scan(&msg.MsgID, &msg.FromUID, &msg.ToUID, &msg.ToGID, &msg.MsgType,
            &msg.Content, &msg.Seq, &msg.Timestamp)
        messages = append(messages, &msg)
    }

    // 3. 删除已拉取的离线消息索引
    deleteSQL := fmt.Sprintf("DELETE FROM offline_message WHERE user_id = ? AND msg_id IN (%s)", placeholders)
    h.db.Exec(deleteSQL, append([]interface{}{userID}, args...)...)

    return messages, nil
}

V2 性能提升

性能对比:
┌────────────────────────────────────────────────┐
│ 指标              V1 版本      V2 版本         │
├────────────────────────────────────────────────┤
│ 最大在线用户      10 万       5000 万          │
│ QPS               5,000       18 万            │
│ 消息延迟 P99      200ms       100ms            │
│ 单点故障          是          否               │
│ 水平扩展          否          是               │
└────────────────────────────────────────────────┘

优化效果:
 支持水平扩展(网关集群 + 消息服务集群)
 消息不丢失(Kafka 持久化)
 跨网关通信(用户路由 + Kafka)
 高可用(任一网关宕机不影响其他)

V3 版本:高可用 + 全链路优化

架构图

                         客户端
                             ↓
        ┌────────────────────────────────────────┐
        │         DNS 智能解析(就近接入)        │
        └────────────────────────────────────────┘
                             ↓
    ┌───────────┬───────────┬───────────┬──────────┐
    │   华北    │   华东    │   华南    │  西南    │
    │   Region  │   Region  │   Region  │  Region  │
    └───────────┴───────────┴───────────┴──────────┘
                             ↓
        ┌────────────────────────────────────────┐
        │     负载均衡(LVS + Nginx)             │
        └────────────────────────────────────────┘
                             ↓
        ┌────────────────────────────────────────┐
        │    WebSocket 网关集群(1000 台)        │
        │    - 长连接管理(单机 10 万连接)       │
        │    - 心跳保活                           │
        │    - 消息收发                           │
        └────────────────────────────────────────┘
              ↓                           ↓
    ┌──────────────────┐       ┌──────────────────┐
    │   Redis 集群     │       │   Kafka 集群     │
    │   (哨兵 + 分片)   │       │   (3 副本)       │
    │   - 用户路由     │       │   - 消息投递     │
    │   - 在线状态     │       │   - 离线消息     │
    │   - 会话序号     │       │   - 消息总线     │
    └──────────────────┘       └──────────────────┘
                                      ↓
                         ┌──────────────────────┐
                         │  消息处理集群         │
                         │  (200 台)            │
                         │  - 消息持久化         │
                         │  - 离线消息           │
                         │  - 推送服务           │
                         └──────────────────────┘
                                      ↓
        ┌────────────────────────────────────────┐
        │     MySQL 分库分表(16 库 * 256 表)    │
        │     - 消息表按时间 + session_id 分片   │
        │     - 会话表按 user_id 分片             │
        └────────────────────────────────────────┘
                             ↓
        ┌────────────────────────────────────────┐
        │         对象存储 + CDN                  │
        │     - 图片、语音、视频                  │
        │     - 全球加速                          │
        └────────────────────────────────────────┘
                             ↓
        ┌────────────────────────────────────────┐
        │     监控告警 (Prometheus + Grafana)     │
        │     - 连接数监控                        │
        │     - 消息延迟监控                      │
        │     - 链路追踪                          │
        └────────────────────────────────────────┘

核心优化点

1. 多端同步

package sync

type MultiDeviceSync struct {
    router *UserRouter
}

// BroadcastToAllDevices 广播消息到用户所有设备
func (s *MultiDeviceSync) BroadcastToAllDevices(ctx context.Context, userID int64, msg *Message) error {
    // 1. 查询用户所有在线设备
    key := fmt.Sprintf("user:sessions:%d", userID)
    sessions, err := s.rdb.HGetAll(ctx, key).Result()
    if err != nil {
        return err
    }

    // 2. 遍历所有设备,发送消息
    for deviceID, gatewayConn := range sessions {
        parts := strings.Split(gatewayConn, "_")
        gatewayID, connID := parts[0], parts[1]

        log.Infof("发送消息到用户 %d 设备 %s (网关: %s)", userID, deviceID, gatewayID)

        if err := s.delivery.DeliverToGateway(gatewayID, connID, msg); err != nil {
            log.Errorf("发送失败: %v", err)
        }
    }

    return nil
}

2. 消息顺序保证

package seq

type SequenceManager struct {
    rdb *redis.Client
}

// AllocateSeq 为消息分配序号(会话内单调递增)
func (s *SequenceManager) AllocateSeq(ctx context.Context, sessionID string) (int64, error) {
    key := fmt.Sprintf("session:seq:%s", sessionID)
    seq, err := s.rdb.Incr(ctx, key).Result()
    return seq, err
}

// 消息按 seq 排序
type MessageSorter []*Message

func (m MessageSorter) Len() int           { return len(m) }
func (m MessageSorter) Less(i, j int) bool { return m[i].Seq < m[j].Seq }
func (m MessageSorter) Swap(i, j int)      { m[i], m[j] = m[j], m[i] }

// 客户端按 seq 去重和排序
func (c *Client) ProcessMessages(messages []*Message) {
    // 1. 去重(检查 seq 是否已处理)
    var newMessages []*Message
    for _, msg := range messages {
        if !c.isSeqProcessed(msg.Seq) {
            newMessages = append(newMessages, msg)
            c.markSeqProcessed(msg.Seq)
        }
    }

    // 2. 排序
    sort.Sort(MessageSorter(newMessages))

    // 3. 处理消息
    for _, msg := range newMessages {
        c.handleMessage(msg)
    }
}

3. 群聊消息扩散(读扩散 vs 写扩散)

【写扩散】(WeChat 模式)
- 消息发送时,复制 N 份给每个群成员
- 优点:读取快(每个用户只读自己的消息)
- 缺点:写入慢(N 次写操作)
- 适用:普通群(< 500 人)

【读扩散】(Slack 模式)
- 消息只写一份(group_id)
- 读取时查询群消息
- 优点:写入快(1 次写操作)
- 缺点:读取慢(需要过滤)
- 适用:超级群(> 500 人)
// 写扩散:普通群
func (h *MessageHandler) SendGroupMessage_WriteFanout(msg *Message) error {
    // 1. 保存一份群消息
    if err := h.saveMessage(msg); err != nil {
        return err
    }

    // 2. 查询群成员
    members, _ := h.getGroupMembers(msg.ToGID)

    // 3. 为每个成员创建一条消息记录(写扩散)
    for _, memberID := range members {
        if memberID == msg.FromUID {
            continue
        }

        // 创建会话消息
        sessionMsg := *msg
        sessionMsg.SessionID = fmt.Sprintf("group_%d_user_%d", msg.ToGID, memberID)
        sessionMsg.Seq, _ = h.seqMgr.AllocateSeq(context.Background(), sessionMsg.SessionID)

        // 保存到用户会话表
        h.saveSessionMessage(memberID, &sessionMsg)

        // 推送或离线
        if h.router.IsOnline(memberID) {
            h.delivery.DeliverMessage(context.Background(), &sessionMsg)
        } else {
            h.saveOfflineMessage(memberID, sessionMsg.MsgID)
        }
    }

    return nil
}

// 读扩散:超级群
func (h *MessageHandler) SendGroupMessage_ReadFanout(msg *Message) error {
    // 1. 只保存一份群消息
    if err := h.saveMessage(msg); err != nil {
        return err
    }

    // 2. 推送给在线成员(通过 Kafka 广播)
    h.kafka.Publish("group-message", GroupMessageEvent{
        GroupID: msg.ToGID,
        Message: msg,
    })

    return nil
}

// 读扩散:拉取群消息
func (h *MessageHandler) PullGroupMessages(userID, groupID, lastSeq int64) ([]*Message, error) {
    // 查询群消息(读时过滤)
    query := `
        SELECT msg_id, from_uid, msg_type, content, seq, timestamp
        FROM message
        WHERE to_gid = ? AND seq > ?
        ORDER BY seq ASC
        LIMIT 100
    `
    // ...
}

4. 数据库分片策略

package sharding

// 消息表分片:按 session_id 哈希 + 按月分表
func GetMessageTable(sessionID string, timestamp int64) string {
    // 1. 按 session_id 哈希,分 16 个库
    dbIndex := crc32.ChecksumIEEE([]byte(sessionID)) % 16

    // 2. 按时间分表(每月一张表)
    month := time.Unix(timestamp/1000, 0).Format("200601")  // 202511

    return fmt.Sprintf("db%d.message_%s", dbIndex, month)
}

// 会话表分片:按 user_id 分 256 张表
func GetSessionTable(userID int64) string {
    dbIndex := userID % 16
    tableIndex := (userID / 16) % 256
    return fmt.Sprintf("db%d.session_%d", dbIndex, tableIndex)
}

// 使用示例
func (h *MessageHandler) SaveMessage(msg *Message) error {
    tableName := GetMessageTable(msg.SessionID, msg.Timestamp)

    sql := fmt.Sprintf(`
        INSERT INTO %s (msg_id, session_id, from_uid, to_uid, msg_type, content, seq, timestamp)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    `, tableName)

    _, err := h.db.Exec(sql, msg.MsgID, msg.SessionID, msg.FromUID, msg.ToUID,
        msg.MsgType, msg.Content, msg.Seq, msg.Timestamp)
    return err
}

5. 断线重连与消息补偿

package reconnect

// ClientReconnect 客户端断线重连
func (c *Client) Reconnect() error {
    // 1. 指数退避重连
    backoff := 1 * time.Second
    maxBackoff := 60 * time.Second

    for {
        if err := c.Connect(); err == nil {
            // 连接成功
            c.onReconnected()
            return nil
        }

        log.Infof("重连失败,%v 后重试", backoff)
        time.Sleep(backoff)

        backoff *= 2
        if backoff > maxBackoff {
            backoff = maxBackoff
        }
    }
}

// onReconnected 重连成功后的处理
func (c *Client) onReconnected() {
    // 1. 拉取离线消息
    lastSeq := c.getLastSeq()
    messages, _ := c.pullOfflineMessages(lastSeq)

    // 2. 处理消息
    c.ProcessMessages(messages)

    // 3. 同步未读数
    c.syncUnreadCount()

    log.Info("重连成功,消息已同步")
}

V3 架构优势

高可用保障:
┌────────────────────────────────────────────┐
│ 1. 多地域部署                               │
│    - 4 个地域就近接入                       │
│    - DNS 智能解析                           │
│                                             │
│ 2. 服务多活                                 │
│    - 网关集群无状态                         │
│    - 任一节点故障不影响                     │
│                                             │
│ 3. 数据高可用                               │
│    - Redis 哨兵(主从自动切换)             │
│    - MySQL 主从 + 分库分表                  │
│    - Kafka 3 副本                           │
│                                             │
│ 4. 消息可靠性                               │
│    - ACK 机制                               │
│    - 离线消息                               │
│    - 消息补偿                               │
│                                             │
│ 5. 监控告警                                 │
│    - 全链路监控                             │
│    - 实时告警                               │
│    - 链路追踪                               │
└────────────────────────────────────────────┘

核心算法与实现

1. 消息 ID 生成(Snowflake)

package idgen

import (
    "fmt"
    "sync"
    "time"
)

// SnowflakeIDGenerator 雪花算法 ID 生成器
type SnowflakeIDGenerator struct {
    mu           sync.Mutex
    timestamp    int64
    machineID    int64  // 机器 ID (10 bits)
    sequence     int64  // 序列号 (12 bits)
}

const (
    epoch           = int64(1609459200000)  // 2021-01-01
    machineBits     = uint(10)
    sequenceBits    = uint(12)
    machineMax      = int64(-1) ^ (int64(-1) << machineBits)
    sequenceMask    = int64(-1) ^ (int64(-1) << sequenceBits)
    machineShift    = sequenceBits
    timestampShift  = machineBits + sequenceBits
)

func NewSnowflakeIDGenerator(machineID int64) *SnowflakeIDGenerator {
    return &SnowflakeIDGenerator{
        machineID: machineID,
    }
}

func (s *SnowflakeIDGenerator) NextID() (int64, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    now := time.Now().UnixNano() / 1e6

    if now < s.timestamp {
        return 0, fmt.Errorf("clock moved backwards")
    }

    if now == s.timestamp {
        s.sequence = (s.sequence + 1) & sequenceMask
        if s.sequence == 0 {
            for now <= s.timestamp {
                now = time.Now().UnixNano() / 1e6
            }
        }
    } else {
        s.sequence = 0
    }

    s.timestamp = now

    id := ((now - epoch) << timestampShift) |
          (s.machineID << machineShift) |
          s.sequence

    return id, nil
}

func GenerateMessageID(machineID int64) string {
    gen := NewSnowflakeIDGenerator(machineID)
    id, _ := gen.NextID()
    return fmt.Sprintf("%d", id)
}

2. 一致性哈希(网关负载均衡)

package consistent

import (
    "hash/crc32"
    "sort"
    "sync"
)

type ConsistentHash struct {
    ring       map[uint32]string  // 哈希环
    sortedKeys []uint32           // 排序的哈希值
    nodes      map[string]bool    // 节点集合
    replicas   int                // 虚拟节点数
    mu         sync.RWMutex
}

func NewConsistentHash(replicas int) *ConsistentHash {
    return &ConsistentHash{
        ring:     make(map[uint32]string),
        nodes:    make(map[string]bool),
        replicas: replicas,
    }
}

// AddNode 添加节点
func (c *ConsistentHash) AddNode(node string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.nodes[node] {
        return
    }

    c.nodes[node] = true

    // 添加虚拟节点
    for i := 0; i < c.replicas; i++ {
        hash := c.hashKey(fmt.Sprintf("%s#%d", node, i))
        c.ring[hash] = node
        c.sortedKeys = append(c.sortedKeys, hash)
    }

    sort.Slice(c.sortedKeys, func(i, j int) bool {
        return c.sortedKeys[i] < c.sortedKeys[j]
    })
}

// RemoveNode 移除节点
func (c *ConsistentHash) RemoveNode(node string) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if !c.nodes[node] {
        return
    }

    delete(c.nodes, node)

    for i := 0; i < c.replicas; i++ {
        hash := c.hashKey(fmt.Sprintf("%s#%d", node, i))
        delete(c.ring, hash)
    }

    // 重建 sortedKeys
    c.sortedKeys = c.sortedKeys[:0]
    for k := range c.ring {
        c.sortedKeys = append(c.sortedKeys, k)
    }
    sort.Slice(c.sortedKeys, func(i, j int) bool {
        return c.sortedKeys[i] < c.sortedKeys[j]
    })
}

// GetNode 获取节点
func (c *ConsistentHash) GetNode(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()

    if len(c.sortedKeys) == 0 {
        return ""
    }

    hash := c.hashKey(key)

    // 二分查找
    idx := sort.Search(len(c.sortedKeys), func(i int) bool {
        return c.sortedKeys[i] >= hash
    })

    if idx == len(c.sortedKeys) {
        idx = 0
    }

    return c.ring[c.sortedKeys[idx]]
}

func (c *ConsistentHash) hashKey(key string) uint32 {
    return crc32.ChecksumIEEE([]byte(key))
}

// 使用示例:根据 user_id 选择网关
func SelectGateway(userID int64, gateways []string) string {
    ch := NewConsistentHash(150)  // 150 个虚拟节点

    for _, gateway := range gateways {
        ch.AddNode(gateway)
    }

    return ch.GetNode(fmt.Sprintf("%d", userID))
}

3. 消息去重(布隆过滤器 + Redis)

package dedup

import (
    "github.com/bits-and-blooms/bloom/v3"
)

type MessageDeduplicator struct {
    bloom *bloom.BloomFilter
    rdb   *redis.Client
}

func NewMessageDeduplicator(expectedItems uint) *MessageDeduplicator {
    return &MessageDeduplicator{
        bloom: bloom.NewWithEstimates(expectedItems, 0.01),  // 1% 误判率
    }
}

// IsDuplicate 检查消息是否重复
func (d *MessageDeduplicator) IsDuplicate(ctx context.Context, clientMsgID string) (bool, error) {
    // 1. 先用布隆过滤器快速判断
    if !d.bloom.TestString(clientMsgID) {
        // 肯定不存在
        d.bloom.AddString(clientMsgID)
        return false, nil
    }

    // 2. 布隆过滤器说可能存在,进一步检查 Redis
    key := fmt.Sprintf("msg:dedup:%s", clientMsgID)
    exists, err := d.rdb.Exists(ctx, key).Result()
    if err != nil {
        return false, err
    }

    if exists == 1 {
        // 确实重复
        return true, nil
    }

    // 不重复,记录到 Redis(24 小时过期)
    d.rdb.Set(ctx, key, "1", 24*time.Hour)
    return false, nil
}

4. 限流算法(滑动窗口)

package ratelimit

type SlidingWindowRateLimiter struct {
    rdb *redis.Client
}

// Allow 判断是否允许通过(滑动窗口)
func (r *SlidingWindowRateLimiter) Allow(ctx context.Context, userID int64, limit int) (bool, error) {
    key := fmt.Sprintf("ratelimit:user:%d", userID)
    now := time.Now().UnixMilli()
    windowStart := now - 60000  // 1 分钟窗口

    // Lua 脚本保证原子性
    script := `
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local window_start = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])

        -- 删除窗口外的记录
        redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

        -- 统计窗口内的请求数
        local count = redis.call('ZCARD', key)

        if count < limit then
            -- 允许通过,记录本次请求
            redis.call('ZADD', key, now, now)
            redis.call('EXPIRE', key, 60)
            return 1
        else
            return 0
        end
    `

    result, err := r.rdb.Eval(ctx, script, []string{key}, now, windowStart, limit).Int()
    return result == 1, err
}

优化方案

1. 长连接优化

【TCP 优化】
# /etc/sysctl.conf
net.ipv4.tcp_tw_reuse = 1           # TIME_WAIT 重用
net.ipv4.tcp_tw_recycle = 0         # 禁用(NAT 环境有问题)
net.ipv4.tcp_fin_timeout = 15       # FIN_WAIT_2 超时时间
net.ipv4.tcp_keepalive_time = 300   # TCP keepalive 时间
net.ipv4.tcp_keepalive_intvl = 30
net.ipv4.tcp_keepalive_probes = 3

net.core.somaxconn = 65535          # 连接队列长度
net.ipv4.tcp_max_syn_backlog = 8192 # SYN 队列长度

# 文件描述符限制
ulimit -n 1000000

【应用层心跳】
- 客户端每 30 秒发送 ping
- 服务器 2 分钟未收到心跳,关闭连接
- 避免僵尸连接占用资源

【连接迁移】
- 用户从 WiFi 切换到 4G 网络
- IP 地址变化,WebSocket 断开
- 客户端自动重连,拉取离线消息

2. 消息压缩

package compression

import (
    "bytes"
    "compress/gzip"
    "io"
)

// CompressMessage 压缩消息
func CompressMessage(data []byte) ([]byte, error) {
    var buf bytes.Buffer
    writer := gzip.NewWriter(&buf)

    if _, err := writer.Write(data); err != nil {
        return nil, err
    }

    if err := writer.Close(); err != nil {
        return nil, err
    }

    return buf.Bytes(), nil
}

// DecompressMessage 解压消息
func DecompressMessage(data []byte) ([]byte, error) {
    reader, err := gzip.NewReader(bytes.NewReader(data))
    if err != nil {
        return nil, err
    }
    defer reader.Close()

    return io.ReadAll(reader)
}

// 使用场景:消息内容 > 1KB 时压缩
func (h *MessageHandler) SendMessage(msg *Message) error {
    data, _ := json.Marshal(msg)

    if len(data) > 1024 {
        // 压缩
        compressed, _ := CompressMessage(data)
        msg.Compressed = true
        msg.Content = base64.StdEncoding.EncodeToString(compressed)
    }

    // 发送...
}

3. 数据库冷热分离

【冷热数据划分】
热数据(最近 3 个月):
  - 存储:SSD
  - 查询频繁
  - 索引齐全

温数据(3-12 个月):
  - 存储:HDD
  - 查询较少
  - 保留关键索引

冷数据(>12 个月):
  - 存储:对象存储(S3/OSS)
  - 查询极少
  - 压缩存储

【自动归档】
- 定时任务每天凌晨执行
- 将 3 个月前的消息迁移到温数据表
- 将 12 个月前的消息归档到对象存储
- 删除原表数据
package archive

type MessageArchiver struct {
    db  *sql.DB
    oss *OSSClient
}

// ArchiveColdData 归档冷数据
func (a *MessageArchiver) ArchiveColdData() error {
    // 1. 查询 12 个月前的消息
    threshold := time.Now().AddDate(0, -12, 0)

    query := `
        SELECT * FROM message
        WHERE created_at < ?
        LIMIT 10000
    `
    rows, _ := a.db.Query(query, threshold)
    defer rows.Close()

    var messages []*Message
    for rows.Next() {
        var msg Message
        // scan...
        messages = append(messages, &msg)
    }

    if len(messages) == 0 {
        return nil
    }

    // 2. 压缩并上传到 OSS
    data, _ := json.Marshal(messages)
    compressed, _ := gzip.Compress(data)

    fileName := fmt.Sprintf("message_archive_%s.gz", time.Now().Format("20060102"))
    if err := a.oss.Upload(fileName, compressed); err != nil {
        return err
    }

    // 3. 删除原数据
    msgIDs := make([]string, len(messages))
    for i, msg := range messages {
        msgIDs[i] = msg.MsgID
    }

    placeholders := strings.Repeat("?,", len(msgIDs))
    placeholders = placeholders[:len(placeholders)-1]

    deleteSQL := fmt.Sprintf("DELETE FROM message WHERE msg_id IN (%s)", placeholders)
    args := make([]interface{}, len(msgIDs))
    for i, id := range msgIDs {
        args[i] = id
    }

    _, err := a.db.Exec(deleteSQL, args...)
    return err
}

4. 多媒体优化

【图片优化】
- 客户端上传前压缩(WebP 格式)
- 服务器生成缩略图(小图、中图、原图)
- CDN 加速分发
- 懒加载(滚动到可视区域再加载)

【语音优化】
- AMR 格式(高压缩比)
- 分片上传(大文件)
- 断点续传

【视频优化】
- H.264 编码
- 自适应码率(根据网络状况切换清晰度)
- CDN 分发

5. 缓存策略

【多级缓存】
L1: 客户端本地缓存
    - SQLite 存储消息
    - 减少网络请求

L2: CDN 缓存
    - 多媒体文件
    - 静态资源

L3: Redis 缓存
    - 用户在线状态
    - 会话信息
    - 群成员列表
    - TTL: 1 小时

L4: 应用本地缓存
    - 配置信息
    - 热点数据
    - LRU 淘汰

L5: 数据库
    - 持久化存储

监控告警

核心监控指标

1. 连接指标

【Prometheus 指标】

# 当前连接数
im_connections_total{gateway="gateway-1"}

# 连接建立速率
im_connections_established_rate

# 连接断开速率
im_connections_closed_rate

# 平均连接时长
im_connection_duration_seconds
var (
    ConnectionsTotal = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "im_connections_total",
            Help: "Current WebSocket connections",
        },
        []string{"gateway"},
    )

    ConnectionEstablishedTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "im_connections_established_total",
            Help: "Total connections established",
        },
        []string{"gateway"},
    )
)

// 使用示例
func (cm *ConnectionManager) AddConnection(conn *Connection) {
    cm.connections[conn.UserID] = conn
    ConnectionsTotal.WithLabelValues(gatewayID).Inc()
}

2. 消息指标

【核心指标】
# 消息发送总数
im_messages_sent_total{type="text|image|voice"}

# 消息接收总数
im_messages_received_total

# 消息延迟(端到端)
im_message_latency_seconds{quantile="0.5|0.9|0.99"}

# 消息丢失率
im_message_loss_rate

# 离线消息积压
im_offline_messages_pending{user_id}

3. Grafana 看板

【实时监控大屏】
┌────────────────────────────────────────────────┐
│  IM 系统实时监控                                │
├────────────────────────────────────────────────┤
│  [在线用户]  [消息 QPS]  [P99 延迟]  [成功率]  │
│   5000 万     18 万      100ms      99.9%      │
├────────────────────────────────────────────────┤
│  [连接数曲线(24 小时)]                        │
│  ▁▂▃▅▇█████▇▅▃▂▁                               │
├────────────────────────────────────────────────┤
│  [消息延迟分布]                                 │
│  P50: 50ms   P90: 80ms   P99: 100ms            │
├────────────────────────────────────────────────┤
│  [各网关连接数]                                 │
│  gateway-1: 100万                               │
│  gateway-2: 95万                                │
│  gateway-3: 90万                                │
└────────────────────────────────────────────────┘

告警规则

groups:
  - name: im_alerts
    rules:
      # 连接数过高
      - alert: HighConnections
        expr: im_connections_total > 100000
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "网关连接数过高"
          description: "{{ $labels.gateway }} 连接数 {{ $value }} 超过 10 万"

      # 消息延迟过高
      - alert: HighMessageLatency
        expr: histogram_quantile(0.99, rate(im_message_latency_seconds_bucket[5m])) > 0.5
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "消息延迟过高"
          description: "P99 延迟 {{ $value }}s 超过 500ms"

      # 消息丢失
      - alert: MessageLoss
        expr: rate(im_message_loss_total[5m]) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "消息丢失"
          description: "消息丢失率 {{ $value }} 超过 1%"

      # 离线消息积压
      - alert: OfflineMessageBacklog
        expr: im_offline_messages_pending > 5000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "离线消息积压"
          description: "用户 {{ $labels.user_id }} 离线消息积压 {{ $value }} 条"

面试问答

如何保证消息不丢失?

【多重保障】
1. 客户端层
   - 发送消息后等待服务器 ACK
   - 超时未收到 ACK,自动重试 3 次
   - 重试失败,标记为发送失败

2. 网关层
   - 收到消息立即 ACK
   - 写入 Kafka(持久化)
   - Kafka ACK 成功才算真正接收

3. Kafka 层
   - 3 副本存储
   - ISR 机制(至少 2 个副本确认)
   - 消息持久化到磁盘

4. 消费者层
   - 处理成功后才 commit offset
   - 失败重试 3 次
   - 最终失败进入死信队列

5. 数据库层
   - 消息持久化到 MySQL
   - 事务保证原子性

【关键点】
- ACK 机制:每一层都要确认
- 持久化:Kafka + MySQL 双重保障
- 重试:自动重试 + 死信队列
- 监控:消息丢失告警

如何保证消息有序?

【有序性保证】
1. 单聊消息有序
   - 同一会话分配递增的 seq(Redis INCR)
   - 客户端按 seq 排序展示
   - 乱序消息缓存,等待缺失的 seq

2. 群聊消息有序(相对有序)
   - 写扩散:每个用户有独立的 seq
   - 读扩散:按timestamp排序(允许短暂不一致)

3. 跨网关有序
   - 同一会话的消息发往同一分区(Kafka partition)
   - 分区内顺序消费

【不保证全局有序】
- 用户 A 和 B 同时发消息
- 全局有序需要全局锁(性能差)
- IM 系统只需要会话内有序

【客户端去重排序】
- 收到消息后按 seq 排序
- 如果收到 seq=105,但 seq=104 还没收到
- 先缓存 105,等收到 104 后一起展示

如何支持多端同步?

【同步机制】
1. 用户在 Redis 记录所有在线设备
   Key: user:sessions:{user_id}
   Value: {device_id: gateway_conn}

2. 收到消息后,广播到所有设备
   - 遍历 user:sessions
   - 依次发送到各设备网关

3. 已读状态同步
   - 用户在 A 设备读取消息
   - 更新 last_read_seq
   - 推送 sync 事件到其他设备
   - B、C 设备同步更新未读数

4. 离线消息拉取
   - 设备上线后,根据 last_seq 拉取增量消息
   - 确保各设备消息一致

【典型场景】
用户在手机发送消息 → PC、iPad 实时收到
用户在 PC 读消息 → 手机未读数同步清零

群聊消息如何扩散?

【写扩散 vs 读扩散】

写扩散(WeChat 模式):
  - 适用:普通群(< 500 人)
  - 原理:消息发送时,复制 N 份给每个成员
  - 优点:读取快(每人有独立收件箱)
  - 缺点:写入慢(N 次写操作)
  - 实现:
    for member in group_members:
        save_to_inbox(member, message)

读扩散(Slack 模式):
  - 适用:超级群(> 500 人,如直播间)
  - 原理:消息只写一份(group_id)
  - 优点:写入快(1 次写操作)
  - 缺点:读取慢(需要过滤、分页)
  - 实现:
    save_group_message(group_id, message)
    用户读取时: SELECT * FROM message WHERE group_id = ?

混合模式:
  - 普通群用写扩散
  - 超级群用读扩散
  - 根据群大小动态切换

【优化】
- 写扩散:异步写入(Kafka 消费者批量写)
- 读扩散:缓存群消息(Redis List)

如何实现已读未读功能?

【已读未读机制】
1. 未读数统计
   - Redis Hash: user:unread:{user_id}
     Field: session_id
     Value: unread_count

   - 收到新消息: HINCRBY user:unread:10001 session_xxx 1
   - 读取消息: HSET user:unread:10001 session_xxx 0

2. 已读回执(单聊)
   - 用户 A 发消息给 B
   - B 读取后发送 read_ack
   - A 收到 ack,消息标记为"已读"(双勾变蓝)

3. 群已读(复杂)
   - 群成员关系表记录 last_read_seq
   - 点击"谁看过"查询 last_read_seq > msg_seq 的成员

4. 多端同步
   - 用户在 PC 读消息
   - 更新 last_read_seq
   - 推送 sync 事件到手机
   - 手机未读数同步更新

【性能优化】
- 批量更新(不是每条消息都更新)
- 延迟更新(页面离开时更新)
- 缓存未读数(Redis)

如何处理消息重复?

【去重策略】
1. 客户端去重
   - 客户端生成 client_msg_id(UUID)
   - 消息表 UNIQUE KEY (client_msg_id)
   - 重复消息插入失败(忽略)

2. 服务端去重
   - 布隆过滤器 + Redis
   - 收到消息先检查是否存在
   - 存在则丢弃,不存在则处理

3. 消费者幂等
   - Kafka 重复消费
   - 订单号唯一索引
   - 重复插入自动忽略

【场景】
- 用户重复点击发送按钮
- 网络抖动,客户端重试
- Kafka 消费者重复消费

【实现】
func (h *MessageHandler) HandleMessage(msg *Message) error {
    // 检查是否重复
    isDup, _ := h.dedup.IsDuplicate(msg.ClientMsgID)
    if isDup {
        log.Warnf("重复消息: %s", msg.ClientMsgID)
        return nil  // 忽略
    }

    // 处理消息...
}

如何优化长连接性能?

【优化方向】
1. 操作系统优化
   - 文件描述符: ulimit -n 1000000
   - TCP 参数: net.core.somaxconn = 65535
   - TIME_WAIT 优化: net.ipv4.tcp_tw_reuse = 1

2. 应用层优化
   - 使用 epoll(Linux)/ kqueue(Mac)
   - 单机支持 10 万连接(C10K 问题)
   - 心跳检测:30 秒一次(减少开销)

3. 内存优化
   - 连接对象池化
   - 读写缓冲区复用
   - 及时回收僵尸连接

4. 网络优化
   - Nagle 算法:TCP_NODELAY(低延迟)
   - Keep-Alive:防止中间网络设备断开
   - 压缩:大消息启用 Gzip

5. 负载均衡
   - 一致性哈希:用户固定连接到某台网关
   - 减少重连迁移

【C10M 问题】
单机 10 万连接(C10K)已解决
单机 1000 万连接(C10M)需要:
  - 内核旁路(DPDK、Netmap)
  - 用户态协议栈(绕过内核)
  - 零拷贝技术

如何实现在线状态?

【在线状态管理】
1. 基于心跳
   - 客户端每 30 秒发送心跳
   - 服务器收到心跳,更新 Redis TTL
     SET user:online:10001 "1" EX 60
   - 60 秒内无心跳,Redis 过期,视为离线

2. 状态推送
   - 用户 A 上线/离线时,推送给好友
   - 好友列表可能很大(1000 人)
   - 优化:只推送给在线好友

3. 状态查询
   - 打开聊天窗口时,查询对方在线状态
   - Redis: EXISTS user:online:10002

4. 精细化状态
   - 在线:用户活跃
   - 离开:10 分钟无操作
   - 隐身:用户主动设置
   - 离线:断开连接

5. 多端在线
   - user:online:10001 = ["mobile", "pc"]
   - 任一设备在线即为在线

【性能优化】
- 批量查询在线状态(管道)
- 缓存好友在线状态(本地缓存 1 分钟)
- 延迟推送(聚合 5 秒内的状态变化)

如何设计敏感词过滤?

【敏感词过滤】
1. 算法选择
   - DFA(确定有限状态自动机)
   - Trie 树(前缀树)
   - AC 自动机(多模式匹配)
   - 性能:O(n),n 为文本长度

2. 实现
   - 服务端过滤(必须)
   - 客户端过滤(体验优化)

3. 敏感词库
   - 政治敏感词
   - 色情暴力词
   - 广告词
   - 定期更新

4. 处理策略
   - 替换:**、***
   - 拒绝:直接拒绝发送
   - 人工审核:疑似敏感,进队列

5. 变种处理
   - 同音字:习近平 → 习jin平
   - 拆分:法 轮 功
   - 符号:法@轮@功
   - 繁体:習近平

【代码示例】
type SensitiveFilter struct {
    trie *Trie
}

func (f *SensitiveFilter) Filter(text string) string {
    runes := []rune(text)
    for i := 0; i < len(runes); i++ {
        matched := f.trie.Match(runes[i:])
        if matched > 0 {
            // 替换为 ***
            for j := 0; j < matched; j++ {
                runes[i+j] = '*'
            }
        }
    }
    return string(runes)
}

从 0 到 1 设计 IM 系统,步骤是什么?

【设计步骤】
1. 需求分析(5 分钟)
   - 规模:1 亿 DAU,5000 万在线
   - 功能:单聊、群聊、离线消息
   - 一致性:消息不丢失、不重复、有序

2. 容量估算(5 分钟)
   - 连接数:7500 万(多端)
   - QPS:18 万
   - 存储:1 PB/年(消息)+ 220 PB/年(多媒体)
   - 服务器:1000 台网关 + 200 台消息服务

3. API 设计(5 分钟)
   - WebSocket 协议
   - 消息格式(JSON)
   - RESTful API(降级)

4. 数据模型(5 分钟)
   - user, session, message, group 表
   - Redis: 用户路由、在线状态、会话序号

5. 架构设计(20 分钟)
   - V1: 单体架构(快速验证)
   - V2: 分布式 + Kafka(支撑 5000 万在线)
   - V3: 多活 + 全链路优化(生产级)

6. 核心难点(15 分钟)
   - 消息可靠性:ACK + Kafka + MySQL
   - 跨网关通信:用户路由 + Kafka 投递
   - 多端同步:Redis 记录设备 + 广播
   - 消息有序:seq 机制
   - 群聊扩散:写扩散 vs 读扩散

7. 优化方案(5 分钟)
   - 长连接优化(C10K)
   - 消息压缩
   - 数据库分片
   - 冷热分离

【加分项】
- 画架构图
- 写核心代码(WebSocket、消息路由)
- 分析瓶颈(连接数、跨网关通信)
- 提出监控方案

Prev
02 - 秒杀系统设计
Next
04 - Feed 流系统设计