HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

02-大模型面试题

概述

本章聚焦大语言模型 (LLM) 相关的面试题,涵盖模型架构、训练技术、推理优化、应用开发等核心主题。

1. Transformer 架构

1.1 基础问题

Q1: 详细解释 Self-Attention 机制的计算过程?

Self-Attention 计算流程

输入: X ∈ R^(n×d)  (n: 序列长度, d: 维度)

Step 1: 计算 Q, K, V
┌─────────────────────────────────────────────────────┐
│  Q = X × W_Q    (Query)                            │
│  K = X × W_K    (Key)                              │
│  V = X × W_V    (Value)                            │
│                                                     │
│  其中 W_Q, W_K, W_V ∈ R^(d×d_k)                    │
└─────────────────────────────────────────────────────┘

Step 2: 计算注意力分数
┌─────────────────────────────────────────────────────┐
│                    Q × K^T                          │
│  Scores = ─────────────────                         │
│              √d_k                                   │
│                                                     │
│  Scores ∈ R^(n×n)                                  │
│  除以 √d_k 防止梯度消失 (点积值过大导致 softmax 饱和) │
└─────────────────────────────────────────────────────┘

Step 3: Softmax 归一化
┌─────────────────────────────────────────────────────┐
│  Attention_weights = softmax(Scores)               │
│                                                     │
│  每行和为 1,表示对其他位置的关注程度                │
└─────────────────────────────────────────────────────┘

Step 4: 加权求和
┌─────────────────────────────────────────────────────┐
│  Output = Attention_weights × V                    │
│                                                     │
│  Output ∈ R^(n×d_k)                                │
└─────────────────────────────────────────────────────┘
"""
Self-Attention 实现
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class SelfAttention(nn.Module):
    """单头自注意力"""

    def __init__(self, d_model, d_k=None):
        super().__init__()
        self.d_k = d_k or d_model

        self.W_q = nn.Linear(d_model, self.d_k, bias=False)
        self.W_k = nn.Linear(d_model, self.d_k, bias=False)
        self.W_v = nn.Linear(d_model, self.d_k, bias=False)

    def forward(self, x, mask=None):
        """
        Args:
            x: (batch, seq_len, d_model)
            mask: (batch, seq_len, seq_len) or (seq_len, seq_len)
        """
        Q = self.W_q(x)  # (batch, seq_len, d_k)
        K = self.W_k(x)
        V = self.W_v(x)

        # 计算注意力分数
        scores = torch.matmul(Q, K.transpose(-2, -1))  # (batch, seq_len, seq_len)
        scores = scores / math.sqrt(self.d_k)

        # 应用 mask (因果注意力或 padding mask)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        # Softmax
        attention_weights = F.softmax(scores, dim=-1)

        # 加权求和
        output = torch.matmul(attention_weights, V)

        return output, attention_weights


class MultiHeadAttention(nn.Module):
    """多头注意力"""

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        batch_size, seq_len, _ = x.shape

        # 线性变换并分头
        Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k)
        K = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.d_k)
        V = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.d_k)

        # 转置: (batch, heads, seq_len, d_k)
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        # 注意力计算
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        attention = F.softmax(scores, dim=-1)
        context = torch.matmul(attention, V)

        # 合并多头
        context = context.transpose(1, 2).contiguous()
        context = context.view(batch_size, seq_len, self.d_model)

        # 输出投影
        output = self.W_o(context)

        return output

Q2: 为什么 Transformer 需要位置编码?有哪些位置编码方法?

答案要点:

位置编码的必要性

Self-Attention 的特点:
┌─────────────────────────────────────────────────────┐
│  Attention(Q, K, V) 是置换等变的 (Permutation Equivariant)
│                                                     │
│  即: Attention(Perm(X)) = Perm(Attention(X))       │
│                                                     │
│  这意味着打乱输入顺序,输出也相应打乱               │
│  模型无法区分 "猫吃鱼" 和 "鱼吃猫"                  │
└─────────────────────────────────────────────────────┘

位置编码方法:

1. 正弦位置编码 (Sinusoidal)
┌─────────────────────────────────────────────────────┐
│  PE(pos, 2i) = sin(pos / 10000^(2i/d))             │
│  PE(pos, 2i+1) = cos(pos / 10000^(2i/d))           │
│                                                     │
│  优点: 可外推到更长序列                            │
│  缺点: 固定编码,无法学习                          │
└─────────────────────────────────────────────────────┘

2. 可学习位置编码 (Learned)
┌─────────────────────────────────────────────────────┐
│  PE = Embedding(position_ids)                      │
│                                                     │
│  优点: 可学习最优表示                              │
│  缺点: 无法外推,最大长度固定                      │
└─────────────────────────────────────────────────────┘

3. 旋转位置编码 (RoPE)
┌─────────────────────────────────────────────────────┐
│  将位置信息编码到 Q/K 的旋转中                     │
│  q_m · k_n = f(q, m) · f(k, n) = g(q, k, m-n)     │
│                                                     │
│  优点: 相对位置编码,可外推                        │
│  应用: LLaMA, Qwen 等                              │
└─────────────────────────────────────────────────────┘

4. ALiBi (Attention with Linear Biases)
┌─────────────────────────────────────────────────────┐
│  直接在注意力分数上加线性偏置                       │
│  Scores[i,j] -= m × |i - j|                        │
│                                                     │
│  优点: 简单高效,外推性好                          │
│  应用: BLOOM, MPT 等                               │
└─────────────────────────────────────────────────────┘
"""
位置编码实现
"""

# 1. 正弦位置编码
class SinusoidalPositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]


# 2. 旋转位置编码 (RoPE)
class RotaryPositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=2048, base=10000):
        super().__init__()
        self.d_model = d_model

        # 计算频率
        inv_freq = 1.0 / (base ** (torch.arange(0, d_model, 2).float() / d_model))
        self.register_buffer('inv_freq', inv_freq)

        # 预计算
        self._build_cache(max_len)

    def _build_cache(self, seq_len):
        t = torch.arange(seq_len, device=self.inv_freq.device)
        freqs = torch.outer(t, self.inv_freq)
        emb = torch.cat((freqs, freqs), dim=-1)

        self.register_buffer('cos_cached', emb.cos())
        self.register_buffer('sin_cached', emb.sin())

    def _rotate_half(self, x):
        x1, x2 = x[..., :x.shape[-1]//2], x[..., x.shape[-1]//2:]
        return torch.cat((-x2, x1), dim=-1)

    def forward(self, q, k, seq_len):
        cos = self.cos_cached[:seq_len].unsqueeze(0).unsqueeze(0)
        sin = self.sin_cached[:seq_len].unsqueeze(0).unsqueeze(0)

        q_embed = (q * cos) + (self._rotate_half(q) * sin)
        k_embed = (k * cos) + (self._rotate_half(k) * sin)

        return q_embed, k_embed


# 3. ALiBi
class ALiBiAttention(nn.Module):
    def __init__(self, num_heads):
        super().__init__()
        # 每个头有不同的斜率
        slopes = self._get_slopes(num_heads)
        self.register_buffer('slopes', slopes)

    def _get_slopes(self, n_heads):
        """计算 ALiBi 斜率"""
        def get_slopes_power_of_2(n):
            start = 2 ** (-(2 ** -(math.log2(n) - 3)))
            ratio = start
            return [start * ratio ** i for i in range(n)]

        if math.log2(n_heads).is_integer():
            return torch.tensor(get_slopes_power_of_2(n_heads))
        else:
            closest_power_of_2 = 2 ** math.floor(math.log2(n_heads))
            return torch.tensor(
                get_slopes_power_of_2(closest_power_of_2) +
                get_slopes_power_of_2(2 * closest_power_of_2)[0::2][:n_heads - closest_power_of_2]
            )

    def forward(self, attention_scores, seq_len):
        # 构建位置偏置矩阵
        positions = torch.arange(seq_len, device=attention_scores.device)
        relative_positions = positions.unsqueeze(0) - positions.unsqueeze(1)

        # 应用斜率
        alibi = self.slopes.unsqueeze(1).unsqueeze(1) * relative_positions.abs().unsqueeze(0)

        return attention_scores - alibi

Q3: 解释 Flash Attention 的原理和优势?

答案要点:

标准 Attention vs Flash Attention

标准 Attention 的内存瓶颈:
┌─────────────────────────────────────────────────────┐
│  1. 计算 S = QK^T         需要存储 O(N²) 的矩阵     │
│  2. 计算 P = softmax(S)   需要存储 O(N²) 的矩阵     │
│  3. 计算 O = PV           最终输出                  │
│                                                     │
│  总内存: O(N²)                                      │
│  HBM 带宽成为瓶颈                                   │
└─────────────────────────────────────────────────────┘

GPU 内存层次:
┌─────────────────────────────────────────────────────┐
│  SRAM (片上): ~20MB, ~19TB/s                        │
│       ↕ (快速)                                      │
│  HBM (显存): ~40GB, ~1.5TB/s                        │
│       ↕ (慢速)                                      │
│  CPU RAM                                            │
└─────────────────────────────────────────────────────┘

Flash Attention 核心思想:分块计算,减少 HBM 访问
┌─────────────────────────────────────────────────────┐
│  将 Q, K, V 分成小块,在 SRAM 中完成计算            │
│                                                     │
│  for block_q in Q_blocks:                          │
│      for block_k in K_blocks:                      │
│          # 在 SRAM 中计算                          │
│          S_block = block_q @ block_k.T             │
│          # 增量更新 softmax 和输出                  │
│          update_output(O, S_block)                 │
│                                                     │
│  关键技巧: Online Softmax (增量计算 softmax)        │
└─────────────────────────────────────────────────────┘
"""
Flash Attention 简化实现 (展示核心思想)
"""

def flash_attention_forward(Q, K, V, block_size=64):
    """
    Flash Attention 前向传播

    核心: 分块计算 + Online Softmax
    """
    batch, heads, seq_len, d = Q.shape

    # 输出和统计量
    O = torch.zeros_like(Q)
    L = torch.zeros(batch, heads, seq_len, 1, device=Q.device)  # log-sum-exp
    M = torch.full((batch, heads, seq_len, 1), float('-inf'), device=Q.device)  # max

    # 分块
    num_blocks = (seq_len + block_size - 1) // block_size

    for block_j in range(num_blocks):
        # K, V 块
        j_start = block_j * block_size
        j_end = min(j_start + block_size, seq_len)

        K_j = K[:, :, j_start:j_end, :]
        V_j = V[:, :, j_start:j_end, :]

        for block_i in range(num_blocks):
            # Q 块
            i_start = block_i * block_size
            i_end = min(i_start + block_size, seq_len)

            Q_i = Q[:, :, i_start:i_end, :]

            # 计算注意力分数
            S_ij = torch.matmul(Q_i, K_j.transpose(-2, -1)) / math.sqrt(d)

            # 因果 mask
            if j_start > i_end - 1:  # 这个 K 块完全在 Q 块之后
                continue

            # Online Softmax 更新
            M_i = M[:, :, i_start:i_end, :]
            L_i = L[:, :, i_start:i_end, :]
            O_i = O[:, :, i_start:i_end, :]

            # 新的 max
            M_ij = S_ij.max(dim=-1, keepdim=True).values
            M_new = torch.maximum(M_i, M_ij)

            # 更新 exp 和 sum
            exp_old = torch.exp(M_i - M_new)
            exp_new = torch.exp(S_ij - M_new)
            L_new = exp_old * L_i + exp_new.sum(dim=-1, keepdim=True)

            # 更新输出
            O_new = (exp_old * O_i + torch.matmul(exp_new, V_j)) / L_new

            # 写回
            M[:, :, i_start:i_end, :] = M_new
            L[:, :, i_start:i_end, :] = L_new
            O[:, :, i_start:i_end, :] = O_new

    return O


# 实际使用 Flash Attention
from flash_attn import flash_attn_func

def use_flash_attention(q, k, v, causal=True):
    """
    使用 Flash Attention 库

    要求:
    - q, k, v: (batch, seq_len, heads, head_dim)
    - 半精度 (fp16/bf16)
    """
    output = flash_attn_func(
        q, k, v,
        causal=causal,
        softmax_scale=1.0 / math.sqrt(q.shape[-1])
    )
    return output

1.2 进阶问题

Q4: 比较 GPT 和 BERT 的架构差异?各适用什么场景?

答案要点:

GPT vs BERT 架构对比

┌─────────────────────────────────────────────────────────────────────┐
│                          GPT (Decoder-Only)                         │
├─────────────────────────────────────────────────────────────────────┤
│  注意力模式: 因果注意力 (Causal Attention)                          │
│                                                                     │
│  Token 1: [●][ ][ ][ ][ ]    只能看到自己                          │
│  Token 2: [●][●][ ][ ][ ]    能看到 1                               │
│  Token 3: [●][●][●][ ][ ]    能看到 1,2                             │
│  Token 4: [●][●][●][●][ ]    能看到 1,2,3                           │
│                                                                     │
│  训练目标: 下一个 token 预测 (Next Token Prediction)                │
│  P(x_t | x_1, x_2, ..., x_{t-1})                                   │
│                                                                     │
│  适用场景: 文本生成、对话、代码生成                                 │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                          BERT (Encoder-Only)                        │
├─────────────────────────────────────────────────────────────────────┤
│  注意力模式: 双向注意力 (Bidirectional Attention)                   │
│                                                                     │
│  Token 1: [●][●][●][●][●]    能看到所有                             │
│  Token 2: [●][●][●][●][●]    能看到所有                             │
│  Token 3: [●][●][●][●][●]    能看到所有                             │
│                                                                     │
│  训练目标: 掩码语言模型 (Masked Language Model)                     │
│  P(x_mask | x_1, ..., x_{mask-1}, x_{mask+1}, ..., x_n)            │
│                                                                     │
│  适用场景: 文本分类、NER、问答、语义相似度                          │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                          T5 (Encoder-Decoder)                       │
├─────────────────────────────────────────────────────────────────────┤
│  Encoder: 双向注意力处理输入                                        │
│  Decoder: 因果注意力 + Cross-Attention                              │
│                                                                     │
│  训练目标: Span Corruption + Seq2Seq                                │
│                                                                     │
│  适用场景: 翻译、摘要、问答                                         │
└─────────────────────────────────────────────────────────────────────┘
"""
不同架构的实现对比
"""

# GPT 风格 (Decoder-Only)
class GPTBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.attn = CausalSelfAttention(d_model, num_heads)
        self.ln2 = nn.LayerNorm(d_model)
        self.mlp = MLP(d_model, d_ff)

    def forward(self, x):
        # Pre-LN 架构
        x = x + self.attn(self.ln1(x))
        x = x + self.mlp(self.ln2(x))
        return x


class CausalSelfAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)

    def forward(self, x):
        seq_len = x.size(1)
        # 因果 mask
        mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device))
        return self.mha(x, mask=mask)


# BERT 风格 (Encoder-Only)
class BERTBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.attn = nn.MultiheadAttention(d_model, num_heads)
        self.ln1 = nn.LayerNorm(d_model)
        self.mlp = MLP(d_model, d_ff)
        self.ln2 = nn.LayerNorm(d_model)

    def forward(self, x, attention_mask=None):
        # Post-LN 架构 (原始 BERT)
        attn_out, _ = self.attn(x, x, x, key_padding_mask=attention_mask)
        x = self.ln1(x + attn_out)
        x = self.ln2(x + self.mlp(x))
        return x


# T5 风格 (Encoder-Decoder)
class T5Block(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, is_decoder=False):
        super().__init__()
        self.is_decoder = is_decoder

        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ln1 = nn.LayerNorm(d_model)

        if is_decoder:
            self.cross_attn = MultiHeadAttention(d_model, num_heads)
            self.ln2 = nn.LayerNorm(d_model)

        self.mlp = MLP(d_model, d_ff)
        self.ln_final = nn.LayerNorm(d_model)

    def forward(self, x, encoder_output=None, causal_mask=None):
        # Self-Attention
        x = x + self.self_attn(self.ln1(x), mask=causal_mask)

        # Cross-Attention (仅 Decoder)
        if self.is_decoder and encoder_output is not None:
            x = x + self.cross_attn(
                self.ln2(x),
                encoder_output,
                encoder_output
            )

        # FFN
        x = x + self.mlp(self.ln_final(x))
        return x

Q5: MoE (Mixture of Experts) 架构的原理和挑战?

答案要点:

MoE 架构

基本原理:
┌─────────────────────────────────────────────────────────────────────┐
│                                                                     │
│  输入 x ──▶ Router ──▶ 选择 Top-K 专家                             │
│              │                                                       │
│              ▼                                                       │
│       ┌──────────────────────────────────────────┐                 │
│       │  Expert 1  │  Expert 2  │  ...  │ Expert N │                │
│       │   (FFN)    │   (FFN)    │       │  (FFN)   │                │
│       └──────────────────────────────────────────┘                 │
│              │                                                       │
│              ▼                                                       │
│         加权求和 ──▶ 输出                                           │
│                                                                     │
│  y = Σ g(x)_i × Expert_i(x)                                        │
│  其中 g(x) 是 router 输出的权重                                     │
└─────────────────────────────────────────────────────────────────────┘

优势:
┌─────────────────────────────────────────────────────────────────────┐
│  1. 参数量大但计算量可控                                            │
│     - 总参数: N × 专家参数                                          │
│     - 激活参数: K × 专家参数 (K << N)                               │
│                                                                     │
│  2. 专家分工                                                        │
│     - 不同专家学习不同的知识/能力                                   │
└─────────────────────────────────────────────────────────────────────┘

挑战:
┌─────────────────────────────────────────────────────────────────────┐
│  1. 负载不均衡                                                      │
│     - 某些专家被频繁选择,其他专家闲置                              │
│     - 解决: 辅助损失 (Auxiliary Loss)                               │
│                                                                     │
│  2. 训练不稳定                                                      │
│     - Router 梯度稀疏                                               │
│     - 解决: Expert Choice, Token Choice 等策略                      │
│                                                                     │
│  3. 分布式通信                                                      │
│     - 专家分布在不同 GPU,需要 All-to-All 通信                      │
│     - 解决: 专家并行 + 优化通信                                     │
└─────────────────────────────────────────────────────────────────────┘
"""
MoE 实现
"""

class Router(nn.Module):
    """Top-K 路由器"""

    def __init__(self, d_model, num_experts, top_k=2):
        super().__init__()
        self.top_k = top_k
        self.num_experts = num_experts
        self.gate = nn.Linear(d_model, num_experts, bias=False)

    def forward(self, x):
        """
        Args:
            x: (batch, seq_len, d_model)

        Returns:
            gates: (batch, seq_len, top_k) - 权重
            indices: (batch, seq_len, top_k) - 专家索引
        """
        # 计算路由概率
        logits = self.gate(x)  # (batch, seq_len, num_experts)

        # Top-K 选择
        top_k_logits, top_k_indices = torch.topk(logits, self.top_k, dim=-1)

        # Softmax 归一化 (只在选中的专家上)
        top_k_gates = F.softmax(top_k_logits, dim=-1)

        return top_k_gates, top_k_indices


class MoELayer(nn.Module):
    """MoE 层"""

    def __init__(self, d_model, d_ff, num_experts=8, top_k=2):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k

        # 路由器
        self.router = Router(d_model, num_experts, top_k)

        # 专家 (每个专家是一个 FFN)
        self.experts = nn.ModuleList([
            nn.Sequential(
                nn.Linear(d_model, d_ff),
                nn.GELU(),
                nn.Linear(d_ff, d_model)
            )
            for _ in range(num_experts)
        ])

    def forward(self, x):
        batch, seq_len, d_model = x.shape

        # 路由
        gates, indices = self.router(x)  # (B, S, K), (B, S, K)

        # 展平便于处理
        x_flat = x.view(-1, d_model)  # (B*S, D)
        gates_flat = gates.view(-1, self.top_k)  # (B*S, K)
        indices_flat = indices.view(-1, self.top_k)  # (B*S, K)

        # 计算输出
        output = torch.zeros_like(x_flat)

        for k in range(self.top_k):
            expert_indices = indices_flat[:, k]  # (B*S,)
            expert_gates = gates_flat[:, k:k+1]  # (B*S, 1)

            for e in range(self.num_experts):
                # 找到路由到专家 e 的 token
                mask = (expert_indices == e)
                if mask.sum() == 0:
                    continue

                expert_input = x_flat[mask]
                expert_output = self.experts[e](expert_input)
                output[mask] += expert_gates[mask] * expert_output

        return output.view(batch, seq_len, d_model)

    def load_balance_loss(self, gates, indices):
        """负载均衡损失"""
        # 计算每个专家的负载
        num_tokens = gates.numel() // self.top_k

        # f_i: 路由到专家 i 的 token 比例
        expert_counts = torch.zeros(self.num_experts, device=gates.device)
        for e in range(self.num_experts):
            expert_counts[e] = (indices == e).sum()
        f = expert_counts / num_tokens

        # P_i: 路由到专家 i 的平均概率
        P = torch.zeros(self.num_experts, device=gates.device)
        for e in range(self.num_experts):
            mask = (indices == e)
            if mask.sum() > 0:
                P[e] = gates[mask].mean()

        # 负载均衡损失: 鼓励均匀分配
        loss = self.num_experts * (f * P).sum()

        return loss

2. 训练技术

2.1 预训练

Q6: 大模型预训练的关键技术有哪些?

答案要点:

大模型预训练关键技术

1. 数据处理
┌─────────────────────────────────────────────────────────────────────┐
│  数据清洗                                                           │
│  ├─ 去重 (MinHash, SimHash)                                        │
│  ├─ 过滤低质量内容                                                 │
│  ├─ 隐私数据脱敏                                                   │
│  └─ 有害内容过滤                                                   │
│                                                                     │
│  数据配比                                                           │
│  ├─ Web 数据: ~70%                                                 │
│  ├─ 书籍: ~15%                                                     │
│  ├─ 代码: ~10%                                                     │
│  └─ 学术论文: ~5%                                                  │
└─────────────────────────────────────────────────────────────────────┘

2. 训练稳定性
┌─────────────────────────────────────────────────────────────────────┐
│  学习率调度                                                         │
│  ├─ Warmup: 线性增加到峰值                                         │
│  └─ Decay: 余弦退火或线性衰减                                      │
│                                                                     │
│  梯度处理                                                           │
│  ├─ 梯度裁剪 (Gradient Clipping)                                   │
│  ├─ 梯度累积 (Gradient Accumulation)                               │
│  └─ 梯度检查点 (Gradient Checkpointing)                            │
│                                                                     │
│  数值稳定                                                           │
│  ├─ 混合精度 (BF16 优于 FP16)                                      │
│  ├─ Loss Scaling                                                   │
│  └─ 层归一化位置 (Pre-LN vs Post-LN)                               │
└─────────────────────────────────────────────────────────────────────┘

3. 并行策略
┌─────────────────────────────────────────────────────────────────────┐
│  3D 并行                                                            │
│  ├─ 数据并行 (DP): 不同 GPU 处理不同数据                           │
│  ├─ 张量并行 (TP): 层内切分 (Megatron 风格)                        │
│  ├─ 流水线并行 (PP): 层间切分                                      │
│  └─ 专家并行 (EP): MoE 专家分布                                    │
│                                                                     │
│  示例配置 (GPT-3 175B):                                            │
│  ├─ TP = 8 (单节点 8 GPU)                                          │
│  ├─ PP = 8 (8 个流水线阶段)                                        │
│  └─ DP = 64 (64 个数据并行副本)                                    │
│  总 GPU 数 = 8 × 8 × 64 = 4096                                     │
└─────────────────────────────────────────────────────────────────────┘
"""
预训练配置示例
"""

# DeepSpeed 配置
deepspeed_config = {
    "train_batch_size": 2048,
    "train_micro_batch_size_per_gpu": 2,
    "gradient_accumulation_steps": 128,

    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": 1e-4,
            "betas": [0.9, 0.95],
            "eps": 1e-8,
            "weight_decay": 0.1
        }
    },

    "scheduler": {
        "type": "WarmupDecayLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 1e-4,
            "warmup_num_steps": 2000,
            "total_num_steps": 100000
        }
    },

    "fp16": {
        "enabled": True,
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1
    },

    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu"},
        "offload_param": {"device": "none"},
        "overlap_comm": True,
        "contiguous_gradients": True
    },

    "gradient_clipping": 1.0,

    "activation_checkpointing": {
        "partition_activations": True,
        "contiguous_memory_optimization": True
    }
}


# Megatron-LM 张量并行
class TensorParallelLinear(nn.Module):
    """张量并行线性层 (列并行)"""

    def __init__(self, in_features, out_features, tp_size, tp_rank):
        super().__init__()
        self.tp_size = tp_size
        self.tp_rank = tp_rank

        # 每个 rank 只持有 1/tp_size 的权重
        self.local_out = out_features // tp_size
        self.weight = nn.Parameter(
            torch.empty(self.local_out, in_features)
        )
        nn.init.kaiming_uniform_(self.weight)

    def forward(self, x):
        # 本地计算
        local_out = F.linear(x, self.weight)

        # AllGather 收集所有结果
        output_list = [torch.empty_like(local_out) for _ in range(self.tp_size)]
        dist.all_gather(output_list, local_out)

        return torch.cat(output_list, dim=-1)

2.2 微调与对齐

Q7: 解释 LoRA 的原理和实现?

答案要点:

LoRA (Low-Rank Adaptation) 原理

核心思想:
┌─────────────────────────────────────────────────────────────────────┐
│  预训练权重: W₀ ∈ R^(d×k)                                          │
│  微调变化:   ΔW ∈ R^(d×k)                                          │
│                                                                     │
│  假设 ΔW 是低秩的: ΔW = BA                                         │
│  其中 B ∈ R^(d×r), A ∈ R^(r×k), r << min(d, k)                    │
│                                                                     │
│  微调后: W = W₀ + BA                                               │
│                                                                     │
│  参数量对比:                                                        │
│  - 全量微调: d × k                                                 │
│  - LoRA: (d + k) × r                                               │
│  - 节省比例: r/min(d,k)                                            │
└─────────────────────────────────────────────────────────────────────┘

示意图:
┌─────────────────────────────────────────────────────────────────────┐
│                                                                     │
│          x                                                          │
│          │                                                          │
│    ┌─────┴─────┐                                                   │
│    │           │                                                   │
│    ▼           ▼                                                   │
│  ┌───┐      ┌───┐                                                  │
│  │W₀ │      │ A │  r×k                                            │
│  │   │      └─┬─┘                                                  │
│  │d×k│        │                                                    │
│  │   │        ▼                                                    │
│  │   │      ┌───┐                                                  │
│  │   │      │ B │  d×r                                            │
│  └─┬─┘      └─┬─┘                                                  │
│    │          │                                                    │
│    │    × α/r │                                                    │
│    └────┬─────┘                                                    │
│         │                                                          │
│         ▼                                                          │
│       output                                                       │
│                                                                    │
└─────────────────────────────────────────────────────────────────────┘
"""
LoRA 实现
"""

class LoRALinear(nn.Module):
    """LoRA 线性层"""

    def __init__(self, original_layer, r=8, alpha=16, dropout=0.1):
        """
        Args:
            original_layer: 原始线性层
            r: LoRA 秩
            alpha: 缩放因子
            dropout: Dropout 概率
        """
        super().__init__()

        self.original = original_layer
        self.r = r
        self.alpha = alpha
        self.scaling = alpha / r

        in_features = original_layer.in_features
        out_features = original_layer.out_features

        # 冻结原始权重
        for param in self.original.parameters():
            param.requires_grad = False

        # LoRA 矩阵
        self.lora_A = nn.Parameter(torch.zeros(r, in_features))
        self.lora_B = nn.Parameter(torch.zeros(out_features, r))

        # 初始化
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)  # B 初始化为 0,确保初始输出不变

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # 原始输出
        original_output = self.original(x)

        # LoRA 输出
        lora_output = self.dropout(x)
        lora_output = F.linear(lora_output, self.lora_A)
        lora_output = F.linear(lora_output, self.lora_B)

        return original_output + lora_output * self.scaling

    def merge_weights(self):
        """合并权重 (推理时使用)"""
        self.original.weight.data += self.scaling * (self.lora_B @ self.lora_A)
        return self.original


def apply_lora_to_model(model, r=8, alpha=16, target_modules=['q_proj', 'v_proj']):
    """给模型应用 LoRA"""

    for name, module in model.named_modules():
        if any(target in name for target in target_modules):
            if isinstance(module, nn.Linear):
                parent_name = '.'.join(name.split('.')[:-1])
                child_name = name.split('.')[-1]

                parent = model.get_submodule(parent_name) if parent_name else model
                lora_layer = LoRALinear(module, r=r, alpha=alpha)
                setattr(parent, child_name, lora_layer)

    return model


# 使用 PEFT 库
from peft import get_peft_model, LoraConfig, TaskType

def create_lora_model(base_model):
    """使用 PEFT 创建 LoRA 模型"""

    lora_config = LoraConfig(
        task_type=TaskType.CAUSAL_LM,
        r=8,
        lora_alpha=32,
        lora_dropout=0.1,
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
        bias="none"
    )

    model = get_peft_model(base_model, lora_config)
    model.print_trainable_parameters()

    return model

Q8: RLHF 的三个阶段是什么?各有什么挑战?

答案要点:

RLHF 三阶段

Stage 1: 监督微调 (SFT)
┌─────────────────────────────────────────────────────────────────────┐
│  输入: 预训练模型 + 高质量指令数据                                   │
│  输出: 能遵循指令的模型                                             │
│                                                                     │
│  数据格式:                                                          │
│  {                                                                  │
│    "instruction": "解释量子计算的基本原理",                         │
│    "input": "",                                                     │
│    "output": "量子计算是..."                                        │
│  }                                                                  │
│                                                                     │
│  挑战:                                                              │
│  - 高质量数据获取成本高                                             │
│  - 数据多样性和覆盖度                                               │
│  - 避免遗忘预训练知识                                               │
└─────────────────────────────────────────────────────────────────────┘

Stage 2: 奖励模型训练 (RM)
┌─────────────────────────────────────────────────────────────────────┐
│  输入: SFT 模型 + 人类偏好数据 (排序)                               │
│  输出: 能评估回答质量的奖励模型                                     │
│                                                                     │
│  数据格式:                                                          │
│  {                                                                  │
│    "prompt": "写一首关于春天的诗",                                  │
│    "chosen": "春风拂面暖...",   (人类偏好)                          │
│    "rejected": "春天来了..."   (人类不偏好)                         │
│  }                                                                  │
│                                                                     │
│  训练目标: Bradley-Terry 模型                                       │
│  Loss = -log(σ(r(chosen) - r(rejected)))                           │
│                                                                     │
│  挑战:                                                              │
│  - 人类偏好不一致                                                   │
│  - 奖励 hacking                                                     │
│  - 分布外泛化                                                       │
└─────────────────────────────────────────────────────────────────────┘

Stage 3: 强化学习优化 (PPO)
┌─────────────────────────────────────────────────────────────────────┐
│  输入: SFT 模型 + 奖励模型                                          │
│  输出: 对齐后的模型                                                 │
│                                                                     │
│  目标函数:                                                          │
│  max E[r(x,y)] - β × KL(π || π_ref)                                │
│                                                                     │
│  π: 当前策略                                                        │
│  π_ref: 参考策略 (SFT 模型)                                        │
│  β: KL 惩罚系数                                                     │
│                                                                     │
│  挑战:                                                              │
│  - 训练不稳定                                                       │
│  - 奖励模型过拟合                                                   │
│  - 计算资源消耗大 (需要 4 个模型)                                   │
└─────────────────────────────────────────────────────────────────────┘
"""
RLHF 实现
"""

# Stage 2: 奖励模型
class RewardModel(nn.Module):
    def __init__(self, base_model):
        super().__init__()
        self.backbone = base_model
        self.reward_head = nn.Linear(
            base_model.config.hidden_size, 1
        )

    def forward(self, input_ids, attention_mask):
        outputs = self.backbone(
            input_ids=input_ids,
            attention_mask=attention_mask,
            output_hidden_states=True
        )

        # 取最后一个 token 的隐藏状态
        last_hidden = outputs.hidden_states[-1]
        # 找到序列的最后一个非 padding token
        sequence_lengths = attention_mask.sum(dim=1) - 1
        last_token_hidden = last_hidden[
            torch.arange(last_hidden.size(0)),
            sequence_lengths
        ]

        reward = self.reward_head(last_token_hidden)
        return reward


def train_reward_model(model, dataloader, optimizer, epochs=3):
    """训练奖励模型"""
    model.train()

    for epoch in range(epochs):
        for batch in dataloader:
            chosen_ids = batch['chosen_input_ids']
            chosen_mask = batch['chosen_attention_mask']
            rejected_ids = batch['rejected_input_ids']
            rejected_mask = batch['rejected_attention_mask']

            # 计算奖励
            r_chosen = model(chosen_ids, chosen_mask)
            r_rejected = model(rejected_ids, rejected_mask)

            # Bradley-Terry 损失
            loss = -F.logsigmoid(r_chosen - r_rejected).mean()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()


# Stage 3: PPO
class PPOTrainer:
    def __init__(self, policy_model, ref_model, reward_model,
                 tokenizer, kl_coef=0.1):
        self.policy = policy_model
        self.ref = ref_model
        self.reward = reward_model
        self.tokenizer = tokenizer
        self.kl_coef = kl_coef

    def compute_rewards(self, query_tensors, response_tensors):
        """计算奖励 (包含 KL 惩罚)"""
        with torch.no_grad():
            # 奖励模型分数
            full_ids = torch.cat([query_tensors, response_tensors], dim=1)
            rm_scores = self.reward(full_ids)

            # KL 散度
            policy_logits = self.policy(
                response_tensors,
                attention_mask=torch.ones_like(response_tensors)
            ).logits

            ref_logits = self.ref(
                response_tensors,
                attention_mask=torch.ones_like(response_tensors)
            ).logits

            kl_div = F.kl_div(
                F.log_softmax(policy_logits, dim=-1),
                F.softmax(ref_logits, dim=-1),
                reduction='batchmean'
            )

        rewards = rm_scores - self.kl_coef * kl_div
        return rewards

    def ppo_step(self, queries, responses, old_logprobs, advantages):
        """PPO 更新步骤"""
        # 计算新的 log 概率
        new_logits = self.policy(responses).logits
        new_logprobs = F.log_softmax(new_logits, dim=-1)

        # 概率比
        ratio = torch.exp(new_logprobs - old_logprobs)

        # PPO Clipped 目标
        clip_eps = 0.2
        pg_loss1 = -advantages * ratio
        pg_loss2 = -advantages * torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps)
        pg_loss = torch.max(pg_loss1, pg_loss2).mean()

        return pg_loss


# DPO (Direct Preference Optimization) - RLHF 的简化替代
def dpo_loss(policy_model, ref_model, chosen_ids, rejected_ids, beta=0.1):
    """
    DPO 直接优化偏好,无需单独的奖励模型

    Loss = -log σ(β × (log π(y_w|x)/π_ref(y_w|x) - log π(y_l|x)/π_ref(y_l|x)))
    """
    # 计算 log 概率
    policy_chosen_logps = get_logprobs(policy_model, chosen_ids)
    policy_rejected_logps = get_logprobs(policy_model, rejected_ids)

    with torch.no_grad():
        ref_chosen_logps = get_logprobs(ref_model, chosen_ids)
        ref_rejected_logps = get_logprobs(ref_model, rejected_ids)

    # DPO 损失
    chosen_rewards = beta * (policy_chosen_logps - ref_chosen_logps)
    rejected_rewards = beta * (policy_rejected_logps - ref_rejected_logps)

    loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()

    return loss

3. 推理优化

3.1 基础问题

Q9: 解释 KV Cache 的原理和优化方法?

答案要点:

KV Cache 原理

自回归生成过程:
┌─────────────────────────────────────────────────────────────────────┐
│  Step 1: "The" → 计算 K₁, V₁                                       │
│  Step 2: "The cat" → 计算 K₂, V₂ (重复计算 K₁, V₁)                 │
│  Step 3: "The cat sat" → 计算 K₃, V₃ (重复计算 K₁, V₁, K₂, V₂)    │
│  ...                                                                │
│                                                                     │
│  问题: O(n²) 的重复计算                                             │
└─────────────────────────────────────────────────────────────────────┘

KV Cache 优化:
┌─────────────────────────────────────────────────────────────────────┐
│  缓存之前的 K, V,只计算新 token 的 K, V                            │
│                                                                     │
│  Step 1: Q₁ @ [K₁]^T → cache K₁, V₁                                │
│  Step 2: Q₂ @ [K₁, K₂]^T → 只计算 K₂, V₂                           │
│  Step 3: Q₃ @ [K₁, K₂, K₃]^T → 只计算 K₃, V₃                       │
│                                                                     │
│  显存占用: 2 × batch × num_layers × seq_len × num_heads × head_dim │
└─────────────────────────────────────────────────────────────────────┘

KV Cache 优化技术:
┌─────────────────────────────────────────────────────────────────────┐
│  1. Multi-Query Attention (MQA)                                     │
│     - 所有注意力头共享 K, V                                         │
│     - 显存降低: num_heads 倍                                        │
│                                                                     │
│  2. Grouped-Query Attention (GQA)                                   │
│     - K, V 分组共享                                                 │
│     - 平衡质量和效率                                                │
│                                                                     │
│  3. PagedAttention (vLLM)                                          │
│     - 类似操作系统的虚拟内存管理                                    │
│     - 按需分配,减少碎片                                            │
│                                                                     │
│  4. 量化 KV Cache                                                   │
│     - FP16 → INT8/INT4                                             │
│     - 显存降低 2-4 倍                                               │
└─────────────────────────────────────────────────────────────────────┘
"""
KV Cache 实现
"""

class KVCache:
    """基础 KV Cache"""

    def __init__(self, num_layers, max_batch, max_seq_len,
                 num_heads, head_dim, device, dtype=torch.float16):
        self.num_layers = num_layers
        self.max_seq_len = max_seq_len

        # 预分配缓存
        cache_shape = (num_layers, 2, max_batch, num_heads, max_seq_len, head_dim)
        self.cache = torch.zeros(cache_shape, device=device, dtype=dtype)

        # 当前序列长度
        self.seq_len = 0

    def update(self, layer_idx, k, v):
        """更新缓存"""
        # k, v: (batch, heads, seq_len, head_dim)
        new_seq_len = k.shape[2]

        self.cache[layer_idx, 0, :, :, self.seq_len:self.seq_len + new_seq_len, :] = k
        self.cache[layer_idx, 1, :, :, self.seq_len:self.seq_len + new_seq_len, :] = v

        self.seq_len += new_seq_len

    def get(self, layer_idx):
        """获取缓存的 K, V"""
        k = self.cache[layer_idx, 0, :, :, :self.seq_len, :]
        v = self.cache[layer_idx, 1, :, :, :self.seq_len, :]
        return k, v


class PagedKVCache:
    """分页 KV Cache (vLLM 风格)"""

    def __init__(self, num_layers, num_heads, head_dim,
                 block_size=16, num_blocks=1000, device='cuda'):
        self.block_size = block_size
        self.num_blocks = num_blocks

        # 物理块 (预分配的内存池)
        block_shape = (num_layers, 2, num_heads, block_size, head_dim)
        self.blocks = torch.zeros(num_blocks, *block_shape, device=device)

        # 块分配表
        self.free_blocks = list(range(num_blocks))
        self.block_tables = {}  # request_id -> [block_ids]

    def allocate(self, request_id, num_tokens):
        """为请求分配块"""
        num_blocks_needed = (num_tokens + self.block_size - 1) // self.block_size

        if len(self.free_blocks) < num_blocks_needed:
            raise MemoryError("No free blocks")

        allocated = [self.free_blocks.pop() for _ in range(num_blocks_needed)]
        self.block_tables[request_id] = allocated

        return allocated

    def free(self, request_id):
        """释放请求的块"""
        if request_id in self.block_tables:
            self.free_blocks.extend(self.block_tables.pop(request_id))

    def update(self, request_id, layer_idx, k, v, position):
        """更新指定位置的 KV"""
        block_idx = position // self.block_size
        block_offset = position % self.block_size

        physical_block = self.block_tables[request_id][block_idx]

        self.blocks[physical_block, layer_idx, 0, :, block_offset, :] = k
        self.blocks[physical_block, layer_idx, 1, :, block_offset, :] = v


# GQA (Grouped-Query Attention)
class GroupedQueryAttention(nn.Module):
    """分组查询注意力"""

    def __init__(self, d_model, num_heads, num_kv_heads):
        super().__init__()
        self.num_heads = num_heads
        self.num_kv_heads = num_kv_heads
        self.num_groups = num_heads // num_kv_heads

        self.head_dim = d_model // num_heads

        self.q_proj = nn.Linear(d_model, num_heads * self.head_dim)
        self.k_proj = nn.Linear(d_model, num_kv_heads * self.head_dim)
        self.v_proj = nn.Linear(d_model, num_kv_heads * self.head_dim)
        self.o_proj = nn.Linear(num_heads * self.head_dim, d_model)

    def forward(self, x, kv_cache=None, position=0):
        batch, seq_len, _ = x.shape

        q = self.q_proj(x).view(batch, seq_len, self.num_heads, self.head_dim)
        k = self.k_proj(x).view(batch, seq_len, self.num_kv_heads, self.head_dim)
        v = self.v_proj(x).view(batch, seq_len, self.num_kv_heads, self.head_dim)

        # 更新 KV Cache
        if kv_cache is not None:
            k, v = kv_cache.update_and_get(k, v, position)

        # 扩展 K, V 以匹配 Q 的头数
        k = k.repeat_interleave(self.num_groups, dim=2)
        v = v.repeat_interleave(self.num_groups, dim=2)

        # 注意力计算
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attn = F.softmax(scores, dim=-1)
        output = torch.matmul(attn, v)

        output = output.transpose(1, 2).contiguous().view(batch, seq_len, -1)
        return self.o_proj(output)

Q10: 投机解码 (Speculative Decoding) 的原理?

答案要点:

投机解码原理

标准自回归解码:
┌─────────────────────────────────────────────────────────────────────┐
│  每步只生成一个 token,受制于内存带宽                               │
│                                                                     │
│  Step 1: [prompt] → token1 (一次大模型调用)                        │
│  Step 2: [prompt, token1] → token2 (一次大模型调用)                │
│  Step 3: [prompt, token1, token2] → token3 (一次大模型调用)        │
│  ...                                                                │
│                                                                     │
│  N 个 token 需要 N 次大模型调用                                     │
└─────────────────────────────────────────────────────────────────────┘

投机解码:
┌─────────────────────────────────────────────────────────────────────┐
│  用小模型 (draft model) 快速生成多个候选 token                      │
│  用大模型 (target model) 一次验证多个 token                         │
│                                                                     │
│  Step 1: 小模型生成 K 个候选 [t1, t2, t3, t4]                      │
│  Step 2: 大模型并行验证 → 接受 [t1, t2, t3], 拒绝 t4               │
│  Step 3: 大模型在拒绝位置采样正确 token                             │
│                                                                     │
│  加速比: 接受率 × K / (K + 1)                                       │
└─────────────────────────────────────────────────────────────────────┘

验证算法 (保证输出分布不变):
┌─────────────────────────────────────────────────────────────────────┐
│  对每个候选 token t:                                                │
│                                                                     │
│  p = target_model_prob(t)                                          │
│  q = draft_model_prob(t)                                           │
│                                                                     │
│  if random() < min(1, p/q):                                        │
│      accept t                                                       │
│  else:                                                              │
│      reject t, resample from (p - q)+                              │
└─────────────────────────────────────────────────────────────────────┘
"""
投机解码实现
"""

class SpeculativeDecoder:
    """投机解码器"""

    def __init__(self, target_model, draft_model, tokenizer,
                 num_speculative_tokens=4):
        self.target = target_model
        self.draft = draft_model
        self.tokenizer = tokenizer
        self.K = num_speculative_tokens

    @torch.no_grad()
    def generate(self, input_ids, max_new_tokens):
        """投机生成"""
        generated = input_ids.clone()

        while generated.shape[1] - input_ids.shape[1] < max_new_tokens:
            # 1. Draft 阶段: 小模型生成 K 个候选
            draft_tokens = self._draft_tokens(generated)

            # 2. Verify 阶段: 大模型验证
            accepted_tokens, next_token = self._verify_tokens(
                generated, draft_tokens
            )

            # 3. 更新生成序列
            generated = torch.cat([generated, accepted_tokens, next_token], dim=1)

        return generated

    def _draft_tokens(self, prefix):
        """用小模型生成候选 token"""
        draft_tokens = []
        draft_input = prefix.clone()

        for _ in range(self.K):
            logits = self.draft(draft_input).logits[:, -1, :]
            probs = F.softmax(logits, dim=-1)

            # 采样
            token = torch.multinomial(probs, num_samples=1)
            draft_tokens.append(token)
            draft_input = torch.cat([draft_input, token], dim=1)

        return torch.cat(draft_tokens, dim=1)  # (batch, K)

    def _verify_tokens(self, prefix, draft_tokens):
        """用大模型验证"""
        batch_size = prefix.shape[0]

        # 拼接 prefix 和 draft tokens
        full_input = torch.cat([prefix, draft_tokens], dim=1)

        # 大模型前向 (一次性计算所有位置的概率)
        target_logits = self.target(full_input).logits

        # 小模型概率 (重新计算以获取完整分布)
        draft_logits = self.draft(full_input).logits

        # 验证每个位置
        accepted = []
        prefix_len = prefix.shape[1]

        for i in range(self.K):
            pos = prefix_len + i
            draft_token = draft_tokens[:, i:i+1]

            # 目标模型在 pos-1 位置对 pos token 的概率
            target_probs = F.softmax(target_logits[:, pos-1, :], dim=-1)
            draft_probs = F.softmax(draft_logits[:, pos-1, :], dim=-1)

            p = target_probs.gather(1, draft_token).squeeze(-1)
            q = draft_probs.gather(1, draft_token).squeeze(-1)

            # 接受概率
            accept_prob = torch.minimum(torch.ones_like(p), p / (q + 1e-10))
            accept_mask = torch.rand_like(accept_prob) < accept_prob

            if accept_mask.all():
                accepted.append(draft_token)
            else:
                # 某个位置被拒绝,从调整分布中采样
                adjusted_probs = torch.clamp(target_probs - draft_probs, min=0)
                adjusted_probs = adjusted_probs / adjusted_probs.sum(dim=-1, keepdim=True)
                next_token = torch.multinomial(adjusted_probs, num_samples=1)
                break
        else:
            # 所有候选都被接受,从大模型采样下一个
            next_probs = F.softmax(target_logits[:, -1, :], dim=-1)
            next_token = torch.multinomial(next_probs, num_samples=1)

        accepted_tokens = torch.cat(accepted, dim=1) if accepted else torch.empty(batch_size, 0, dtype=torch.long, device=prefix.device)

        return accepted_tokens, next_token

4. 应用开发

4.1 RAG 系统

Q11: 设计一个高质量的 RAG 系统需要考虑哪些方面?

答案要点:

RAG 系统设计要点

1. 文档处理
┌─────────────────────────────────────────────────────────────────────┐
│  分块策略                                                           │
│  ├─ 固定大小分块 (简单但可能切断语义)                               │
│  ├─ 语义分块 (按段落/句子边界)                                      │
│  ├─ 递归分块 (层次结构)                                             │
│  └─ 滑动窗口 (带重叠)                                               │
│                                                                     │
│  元数据提取                                                         │
│  ├─ 文档标题、章节                                                  │
│  ├─ 创建时间、来源                                                  │
│  └─ 实体、关键词                                                    │
└─────────────────────────────────────────────────────────────────────┘

2. 检索优化
┌─────────────────────────────────────────────────────────────────────┐
│  混合检索                                                           │
│  ├─ 向量检索 (语义相似)                                             │
│  ├─ BM25 (关键词匹配)                                               │
│  └─ 融合排序 (RRF)                                                  │
│                                                                     │
│  检索增强                                                           │
│  ├─ Query Expansion (扩展查询)                                      │
│  ├─ HyDE (假设文档嵌入)                                             │
│  ├─ Multi-Query (多角度查询)                                        │
│  └─ Reranking (重排序)                                              │
└─────────────────────────────────────────────────────────────────────┘

3. 生成优化
┌─────────────────────────────────────────────────────────────────────┐
│  上下文构建                                                         │
│  ├─ 相关性排序                                                      │
│  ├─ 去重和合并                                                      │
│  └─ 长度控制                                                        │
│                                                                     │
│  生成控制                                                           │
│  ├─ 引用标注                                                        │
│  ├─ 幻觉检测                                                        │
│  └─ 答案验证                                                        │
└─────────────────────────────────────────────────────────────────────┘
"""
RAG 系统实现
"""

class AdvancedRAG:
    """高级 RAG 系统"""

    def __init__(self, embedding_model, llm, vector_store):
        self.embedding_model = embedding_model
        self.llm = llm
        self.vector_store = vector_store
        self.bm25 = None

    def index_documents(self, documents, chunk_size=512, overlap=50):
        """索引文档"""
        chunks = []

        for doc in documents:
            # 语义分块
            doc_chunks = self._semantic_chunking(doc, chunk_size, overlap)
            chunks.extend(doc_chunks)

        # 向量化
        embeddings = self.embedding_model.encode([c['text'] for c in chunks])

        # 存储
        self.vector_store.add(embeddings, chunks)

        # 构建 BM25 索引
        from rank_bm25 import BM25Okapi
        tokenized = [c['text'].split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)
        self.bm25_chunks = chunks

    def _semantic_chunking(self, document, chunk_size, overlap):
        """语义分块"""
        # 按段落分割
        paragraphs = document['text'].split('\n\n')

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) < chunk_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append({
                        'text': current_chunk.strip(),
                        'metadata': document.get('metadata', {})
                    })
                current_chunk = para + "\n\n"

        if current_chunk:
            chunks.append({
                'text': current_chunk.strip(),
                'metadata': document.get('metadata', {})
            })

        return chunks

    def retrieve(self, query, top_k=5, use_hybrid=True):
        """混合检索"""
        results = []

        # 向量检索
        query_embedding = self.embedding_model.encode([query])[0]
        vector_results = self.vector_store.search(query_embedding, top_k=top_k)

        if use_hybrid and self.bm25:
            # BM25 检索
            tokenized_query = query.split()
            bm25_scores = self.bm25.get_scores(tokenized_query)
            bm25_top_k = sorted(
                enumerate(bm25_scores),
                key=lambda x: x[1],
                reverse=True
            )[:top_k]

            # RRF 融合
            results = self._reciprocal_rank_fusion(
                vector_results, bm25_top_k
            )
        else:
            results = vector_results

        return results

    def _reciprocal_rank_fusion(self, vector_results, bm25_results, k=60):
        """倒数排名融合"""
        scores = {}

        for rank, (doc_id, score) in enumerate(vector_results):
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)

        for rank, (doc_idx, bm25_score) in enumerate(bm25_results):
            doc_id = f"bm25_{doc_idx}"
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)

        # 排序
        sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_results

    def query_expansion(self, query):
        """查询扩展"""
        prompt = f"""Given the following query, generate 3 alternative versions that might help find relevant information:

Query: {query}

Alternative queries:
1."""

        expanded = self.llm.generate(prompt, max_tokens=100)
        queries = [query] + [q.strip() for q in expanded.split('\n') if q.strip()]
        return queries

    def generate_answer(self, query, contexts, with_citations=True):
        """生成答案"""
        # 构建上下文
        context_str = "\n\n".join([
            f"[{i+1}] {ctx['text']}"
            for i, ctx in enumerate(contexts)
        ])

        if with_citations:
            prompt = f"""Based on the following context, answer the question. Include citations [1], [2], etc. when using information from the context.

Context:
{context_str}

Question: {query}

Answer with citations:"""
        else:
            prompt = f"""Based on the following context, answer the question.

Context:
{context_str}

Question: {query}

Answer:"""

        answer = self.llm.generate(prompt, max_tokens=500)

        # 验证答案
        if with_citations:
            answer = self._validate_citations(answer, len(contexts))

        return answer

    def _validate_citations(self, answer, num_contexts):
        """验证引用"""
        import re
        citations = re.findall(r'\[(\d+)\]', answer)

        # 检查引用是否有效
        invalid_citations = [c for c in citations if int(c) > num_contexts]
        if invalid_citations:
            # 移除无效引用
            for c in invalid_citations:
                answer = answer.replace(f'[{c}]', '')

        return answer

4.2 Agent 系统

Q12: 如何设计一个可靠的 LLM Agent 系统?

答案要点:

Agent 系统设计

核心组件:
┌─────────────────────────────────────────────────────────────────────┐
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      规划器 (Planner)                        │   │
│  │  - 任务分解                                                  │   │
│  │  - 目标设定                                                  │   │
│  │  - 策略选择                                                  │   │
│  └──────────────────────────┬──────────────────────────────────┘   │
│                              │                                      │
│  ┌──────────────────────────┴──────────────────────────────────┐   │
│  │                      执行器 (Executor)                       │   │
│  │                                                              │   │
│  │   ┌──────────┐  ┌──────────┐  ┌──────────┐                 │   │
│  │   │  Tool 1  │  │  Tool 2  │  │  Tool N  │                 │   │
│  │   │ (搜索)   │  │ (计算)   │  │ (API)    │                 │   │
│  │   └──────────┘  └──────────┘  └──────────┘                 │   │
│  │                                                              │   │
│  └──────────────────────────┬──────────────────────────────────┘   │
│                              │                                      │
│  ┌──────────────────────────┴──────────────────────────────────┐   │
│  │                      记忆系统 (Memory)                       │   │
│  │  - 短期记忆 (对话上下文)                                     │   │
│  │  - 长期记忆 (向量存储)                                       │   │
│  │  - 工作记忆 (当前任务状态)                                   │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

可靠性设计:
┌─────────────────────────────────────────────────────────────────────┐
│  1. 错误处理                                                        │
│     - 工具调用失败重试                                              │
│     - 超时处理                                                      │
│     - 回退策略                                                      │
│                                                                     │
│  2. 安全机制                                                        │
│     - 工具权限控制                                                  │
│     - 输入验证                                                      │
│     - 输出审核                                                      │
│                                                                     │
│  3. 可观测性                                                        │
│     - 执行轨迹记录                                                  │
│     - 决策日志                                                      │
│     - 性能监控                                                      │
└─────────────────────────────────────────────────────────────────────┘
"""
Agent 系统实现
"""

from dataclasses import dataclass
from typing import List, Dict, Any, Callable, Optional
from enum import Enum

class ActionStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    PENDING = "pending"

@dataclass
class Tool:
    name: str
    description: str
    function: Callable
    parameters: Dict[str, Any]

@dataclass
class Action:
    tool_name: str
    parameters: Dict[str, Any]
    thought: str

@dataclass
class Observation:
    result: Any
    status: ActionStatus
    error: Optional[str] = None


class ReActAgent:
    """ReAct 风格的 Agent"""

    def __init__(self, llm, tools: List[Tool], max_iterations=10):
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.max_iterations = max_iterations
        self.memory = []

    def run(self, task: str) -> str:
        """执行任务"""
        self.memory = [{"role": "user", "content": task}]

        for i in range(self.max_iterations):
            # 思考下一步
            action = self._think()

            if action is None:
                # 任务完成
                return self._extract_final_answer()

            # 执行动作
            observation = self._execute(action)

            # 更新记忆
            self.memory.append({
                "role": "assistant",
                "content": f"Thought: {action.thought}\nAction: {action.tool_name}\nAction Input: {action.parameters}"
            })
            self.memory.append({
                "role": "observation",
                "content": f"Observation: {observation.result}"
            })

            # 检查是否失败
            if observation.status == ActionStatus.FAILED:
                # 尝试恢复
                recovery_action = self._recover_from_failure(observation.error)
                if recovery_action:
                    continue
                else:
                    return f"Task failed: {observation.error}"

        return "Max iterations reached"

    def _think(self) -> Optional[Action]:
        """生成下一步动作"""
        tools_desc = "\n".join([
            f"- {t.name}: {t.description}"
            for t in self.tools.values()
        ])

        prompt = f"""You are a helpful assistant that can use tools to complete tasks.

Available tools:
{tools_desc}

Based on the conversation history, decide the next action.
If the task is complete, respond with "Final Answer: [your answer]"
Otherwise, respond with:
Thought: [your reasoning]
Action: [tool name]
Action Input: [parameters as JSON]

History:
{self._format_memory()}

Next step:"""

        response = self.llm.generate(prompt, max_tokens=500)

        return self._parse_action(response)

    def _parse_action(self, response: str) -> Optional[Action]:
        """解析 LLM 响应"""
        if "Final Answer:" in response:
            return None

        # 提取 Thought, Action, Action Input
        import re
        import json

        thought_match = re.search(r'Thought:\s*(.+?)(?=Action:|$)', response, re.DOTALL)
        action_match = re.search(r'Action:\s*(\w+)', response)
        input_match = re.search(r'Action Input:\s*({.+})', response, re.DOTALL)

        if not action_match:
            return None

        thought = thought_match.group(1).strip() if thought_match else ""
        tool_name = action_match.group(1)

        try:
            parameters = json.loads(input_match.group(1)) if input_match else {}
        except json.JSONDecodeError:
            parameters = {}

        return Action(tool_name=tool_name, parameters=parameters, thought=thought)

    def _execute(self, action: Action) -> Observation:
        """执行动作"""
        tool = self.tools.get(action.tool_name)

        if not tool:
            return Observation(
                result=None,
                status=ActionStatus.FAILED,
                error=f"Unknown tool: {action.tool_name}"
            )

        try:
            result = tool.function(**action.parameters)
            return Observation(result=result, status=ActionStatus.SUCCESS)
        except Exception as e:
            return Observation(
                result=None,
                status=ActionStatus.FAILED,
                error=str(e)
            )

    def _recover_from_failure(self, error: str) -> Optional[Action]:
        """从失败中恢复"""
        prompt = f"""The previous action failed with error: {error}

How should we proceed? Options:
1. Retry with different parameters
2. Try a different approach
3. Give up

Respond with your decision and next action if applicable."""

        response = self.llm.generate(prompt, max_tokens=200)

        if "give up" in response.lower():
            return None

        return self._parse_action(response)

    def _format_memory(self) -> str:
        """格式化记忆"""
        return "\n".join([
            f"{m['role']}: {m['content']}"
            for m in self.memory
        ])

    def _extract_final_answer(self) -> str:
        """提取最终答案"""
        last_response = self.memory[-1]['content'] if self.memory else ""
        if "Final Answer:" in last_response:
            return last_response.split("Final Answer:")[-1].strip()
        return last_response


# 工具定义示例
def search_tool(query: str) -> str:
    """搜索工具"""
    # 实际实现会调用搜索 API
    return f"Search results for '{query}': ..."

def calculator_tool(expression: str) -> float:
    """计算器工具"""
    return eval(expression)

# 创建 Agent
tools = [
    Tool(
        name="search",
        description="Search the web for information",
        function=search_tool,
        parameters={"query": "string"}
    ),
    Tool(
        name="calculator",
        description="Evaluate mathematical expressions",
        function=calculator_tool,
        parameters={"expression": "string"}
    )
]

# agent = ReActAgent(llm, tools)
# result = agent.run("What is the population of China times 2?")

总结

本章覆盖了大模型相关的核心面试题:

  1. Transformer 架构:Self-Attention、位置编码、Flash Attention、MoE
  2. 训练技术:预训练、LoRA、RLHF/DPO
  3. 推理优化:KV Cache、投机解码
  4. 应用开发:RAG、Agent

面试准备建议:

  • 深入理解原理,而非只记公式
  • 能够手写核心代码实现
  • 了解最新进展和工程实践
  • 准备好讨论实际项目经验
Prev
01-AI基础设施核心面试题
Next
03-系统设计面试题