02-大模型面试题
概述
本章聚焦大语言模型 (LLM) 相关的面试题,涵盖模型架构、训练技术、推理优化、应用开发等核心主题。
1. Transformer 架构
1.1 基础问题
Q1: 详细解释 Self-Attention 机制的计算过程?
答案要点:
Self-Attention 计算流程
输入: X ∈ R^(n×d) (n: 序列长度, d: 维度)
Step 1: 计算 Q, K, V
┌─────────────────────────────────────────────────────┐
│ Q = X × W_Q (Query) │
│ K = X × W_K (Key) │
│ V = X × W_V (Value) │
│ │
│ 其中 W_Q, W_K, W_V ∈ R^(d×d_k) │
└─────────────────────────────────────────────────────┘
Step 2: 计算注意力分数
┌─────────────────────────────────────────────────────┐
│ Q × K^T │
│ Scores = ───────────────── │
│ √d_k │
│ │
│ Scores ∈ R^(n×n) │
│ 除以 √d_k 防止梯度消失 (点积值过大导致 softmax 饱和) │
└─────────────────────────────────────────────────────┘
Step 3: Softmax 归一化
┌─────────────────────────────────────────────────────┐
│ Attention_weights = softmax(Scores) │
│ │
│ 每行和为 1,表示对其他位置的关注程度 │
└─────────────────────────────────────────────────────┘
Step 4: 加权求和
┌─────────────────────────────────────────────────────┐
│ Output = Attention_weights × V │
│ │
│ Output ∈ R^(n×d_k) │
└─────────────────────────────────────────────────────┘
"""
Self-Attention 实现
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
class SelfAttention(nn.Module):
"""单头自注意力"""
def __init__(self, d_model, d_k=None):
super().__init__()
self.d_k = d_k or d_model
self.W_q = nn.Linear(d_model, self.d_k, bias=False)
self.W_k = nn.Linear(d_model, self.d_k, bias=False)
self.W_v = nn.Linear(d_model, self.d_k, bias=False)
def forward(self, x, mask=None):
"""
Args:
x: (batch, seq_len, d_model)
mask: (batch, seq_len, seq_len) or (seq_len, seq_len)
"""
Q = self.W_q(x) # (batch, seq_len, d_k)
K = self.W_k(x)
V = self.W_v(x)
# 计算注意力分数
scores = torch.matmul(Q, K.transpose(-2, -1)) # (batch, seq_len, seq_len)
scores = scores / math.sqrt(self.d_k)
# 应用 mask (因果注意力或 padding mask)
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
# Softmax
attention_weights = F.softmax(scores, dim=-1)
# 加权求和
output = torch.matmul(attention_weights, V)
return output, attention_weights
class MultiHeadAttention(nn.Module):
"""多头注意力"""
def __init__(self, d_model, num_heads):
super().__init__()
assert d_model % num_heads == 0
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
    def forward(self, x, kv=None, mask=None):
        # kv 为 None 时做自注意力; 传入外部序列 (如 encoder 输出) 时即为交叉注意力
        batch_size, seq_len, _ = x.shape
        kv = x if kv is None else kv
        kv_len = kv.size(1)
        # 线性变换并分头
        Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k)
        K = self.W_k(kv).view(batch_size, kv_len, self.num_heads, self.d_k)
        V = self.W_v(kv).view(batch_size, kv_len, self.num_heads, self.d_k)
# 转置: (batch, heads, seq_len, d_k)
Q = Q.transpose(1, 2)
K = K.transpose(1, 2)
V = V.transpose(1, 2)
# 注意力计算
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
attention = F.softmax(scores, dim=-1)
context = torch.matmul(attention, V)
# 合并多头
context = context.transpose(1, 2).contiguous()
context = context.view(batch_size, seq_len, self.d_model)
# 输出投影
output = self.W_o(context)
return output
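下面是一个简单的形状自检示例 (随机数据,假设 d_model=512、8 个头),便于快速验证上述实现:

# 形状自检 (随机数据)
x = torch.randn(2, 10, 512)                        # (batch, seq_len, d_model)
self_attn = SelfAttention(d_model=512)
out, weights = self_attn(x)
print(out.shape, weights.shape)                    # (2, 10, 512) (2, 10, 10)
mha = MultiHeadAttention(d_model=512, num_heads=8)
causal_mask = torch.tril(torch.ones(10, 10))       # 下三角因果 mask
print(mha(x, mask=causal_mask).shape)              # (2, 10, 512)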
Q2: 为什么 Transformer 需要位置编码?有哪些位置编码方法?
答案要点:
位置编码的必要性
Self-Attention 的特点:
┌─────────────────────────────────────────────────────┐
│ Attention(Q, K, V) 是置换等变的 (Permutation Equivariant)
│ │
│ 即: Attention(Perm(X)) = Perm(Attention(X)) │
│ │
│ 这意味着打乱输入顺序,输出也相应打乱 │
│ 模型无法区分 "猫吃鱼" 和 "鱼吃猫" │
└─────────────────────────────────────────────────────┘
位置编码方法:
1. 正弦位置编码 (Sinusoidal)
┌─────────────────────────────────────────────────────┐
│ PE(pos, 2i) = sin(pos / 10000^(2i/d)) │
│ PE(pos, 2i+1) = cos(pos / 10000^(2i/d)) │
│ │
│ 优点: 可外推到更长序列 │
│ 缺点: 固定编码,无法学习 │
└─────────────────────────────────────────────────────┘
2. 可学习位置编码 (Learned)
┌─────────────────────────────────────────────────────┐
│ PE = Embedding(position_ids) │
│ │
│ 优点: 可学习最优表示 │
│ 缺点: 无法外推,最大长度固定 │
└─────────────────────────────────────────────────────┘
3. 旋转位置编码 (RoPE)
┌─────────────────────────────────────────────────────┐
│ 将位置信息编码到 Q/K 的旋转中 │
│ q_m · k_n = f(q, m) · f(k, n) = g(q, k, m-n) │
│ │
│ 优点: 相对位置编码,可外推 │
│ 应用: LLaMA, Qwen 等 │
└─────────────────────────────────────────────────────┘
4. ALiBi (Attention with Linear Biases)
┌─────────────────────────────────────────────────────┐
│ 直接在注意力分数上加线性偏置 │
│ Scores[i,j] -= m × |i - j| │
│ │
│ 优点: 简单高效,外推性好 │
│ 应用: BLOOM, MPT 等 │
└─────────────────────────────────────────────────────┘
"""
位置编码实现
"""
# 1. 正弦位置编码
class SinusoidalPositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
super().__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(
torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))
def forward(self, x):
return x + self.pe[:, :x.size(1)]
# 2. 旋转位置编码 (RoPE)
class RotaryPositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=2048, base=10000):
super().__init__()
self.d_model = d_model
# 计算频率
inv_freq = 1.0 / (base ** (torch.arange(0, d_model, 2).float() / d_model))
self.register_buffer('inv_freq', inv_freq)
# 预计算
self._build_cache(max_len)
def _build_cache(self, seq_len):
t = torch.arange(seq_len, device=self.inv_freq.device)
freqs = torch.outer(t, self.inv_freq)
emb = torch.cat((freqs, freqs), dim=-1)
self.register_buffer('cos_cached', emb.cos())
self.register_buffer('sin_cached', emb.sin())
def _rotate_half(self, x):
x1, x2 = x[..., :x.shape[-1]//2], x[..., x.shape[-1]//2:]
return torch.cat((-x2, x1), dim=-1)
def forward(self, q, k, seq_len):
cos = self.cos_cached[:seq_len].unsqueeze(0).unsqueeze(0)
sin = self.sin_cached[:seq_len].unsqueeze(0).unsqueeze(0)
q_embed = (q * cos) + (self._rotate_half(q) * sin)
k_embed = (k * cos) + (self._rotate_half(k) * sin)
return q_embed, k_embed
# 3. ALiBi
class ALiBiAttention(nn.Module):
def __init__(self, num_heads):
super().__init__()
# 每个头有不同的斜率
slopes = self._get_slopes(num_heads)
self.register_buffer('slopes', slopes)
def _get_slopes(self, n_heads):
"""计算 ALiBi 斜率"""
def get_slopes_power_of_2(n):
start = 2 ** (-(2 ** -(math.log2(n) - 3)))
ratio = start
return [start * ratio ** i for i in range(n)]
if math.log2(n_heads).is_integer():
return torch.tensor(get_slopes_power_of_2(n_heads))
else:
closest_power_of_2 = 2 ** math.floor(math.log2(n_heads))
return torch.tensor(
get_slopes_power_of_2(closest_power_of_2) +
get_slopes_power_of_2(2 * closest_power_of_2)[0::2][:n_heads - closest_power_of_2]
)
def forward(self, attention_scores, seq_len):
# 构建位置偏置矩阵
positions = torch.arange(seq_len, device=attention_scores.device)
relative_positions = positions.unsqueeze(0) - positions.unsqueeze(1)
# 应用斜率
alibi = self.slopes.unsqueeze(1).unsqueeze(1) * relative_positions.abs().unsqueeze(0)
return attention_scores - alibi
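补充一个小的数值验证 (随机数据):RoPE 的关键性质是旋转后 q·k 的内积只依赖相对位置 m−n,可以这样检查:

# 验证 RoPE 的相对位置性质
head_dim, seq_len = 64, 16
rope = RotaryPositionalEncoding(head_dim, max_len=seq_len)
q = torch.randn(1, 1, 1, head_dim).expand(1, 1, seq_len, head_dim)  # 同一向量复制到各位置
k = torch.randn(1, 1, 1, head_dim).expand(1, 1, seq_len, head_dim)
q_rot, k_rot = rope(q, k, seq_len)
dot_a = (q_rot[0, 0, 5] * k_rot[0, 0, 3]).sum()    # 相对位移为 2
dot_b = (q_rot[0, 0, 9] * k_rot[0, 0, 7]).sum()    # 相对位移同为 2
print(torch.allclose(dot_a, dot_b, atol=1e-4))     # True: 内积只依赖 m - n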
Q3: 解释 Flash Attention 的原理和优势?
答案要点:
标准 Attention vs Flash Attention
标准 Attention 的内存瓶颈:
┌─────────────────────────────────────────────────────┐
│ 1. 计算 S = QK^T 需要存储 O(N²) 的矩阵 │
│ 2. 计算 P = softmax(S) 需要存储 O(N²) 的矩阵 │
│ 3. 计算 O = PV 最终输出 │
│ │
│ 总内存: O(N²) │
│ HBM 带宽成为瓶颈 │
└─────────────────────────────────────────────────────┘
GPU 内存层次:
┌─────────────────────────────────────────────────────┐
│ SRAM (片上): ~20MB, ~19TB/s │
│ ↕ (快速) │
│ HBM (显存): ~40GB, ~1.5TB/s │
│ ↕ (慢速) │
│ CPU RAM │
└─────────────────────────────────────────────────────┘
Flash Attention 核心思想:分块计算,减少 HBM 访问
┌─────────────────────────────────────────────────────┐
│ 将 Q, K, V 分成小块,在 SRAM 中完成计算 │
│ │
│ for block_q in Q_blocks: │
│ for block_k in K_blocks: │
│ # 在 SRAM 中计算 │
│ S_block = block_q @ block_k.T │
│ # 增量更新 softmax 和输出 │
│ update_output(O, S_block) │
│ │
│ 关键技巧: Online Softmax (增量计算 softmax) │
└─────────────────────────────────────────────────────┘
"""
Flash Attention 简化实现 (展示核心思想)
"""
def flash_attention_forward(Q, K, V, block_size=64):
"""
Flash Attention 前向传播
核心: 分块计算 + Online Softmax
"""
batch, heads, seq_len, d = Q.shape
# 输出和统计量
O = torch.zeros_like(Q)
    L = torch.zeros(batch, heads, seq_len, 1, device=Q.device)  # softmax 分母 (运行中的指数和)
    M = torch.full((batch, heads, seq_len, 1), float('-inf'), device=Q.device)  # 运行中的行最大值
# 分块
num_blocks = (seq_len + block_size - 1) // block_size
for block_j in range(num_blocks):
# K, V 块
j_start = block_j * block_size
j_end = min(j_start + block_size, seq_len)
K_j = K[:, :, j_start:j_end, :]
V_j = V[:, :, j_start:j_end, :]
for block_i in range(num_blocks):
# Q 块
i_start = block_i * block_size
i_end = min(i_start + block_size, seq_len)
Q_i = Q[:, :, i_start:i_end, :]
# 计算注意力分数
S_ij = torch.matmul(Q_i, K_j.transpose(-2, -1)) / math.sqrt(d)
            # 因果 mask: 完全在 Q 块之后的 K 块直接跳过
            if j_start > i_end - 1:
                continue
            # 对角块内部还需要逐元素的因果 mask (j > i 的位置置为 -inf)
            i_pos = torch.arange(i_start, i_end, device=Q.device).unsqueeze(-1)
            j_pos = torch.arange(j_start, j_end, device=Q.device).unsqueeze(0)
            S_ij = S_ij.masked_fill(j_pos > i_pos, float('-inf'))
# Online Softmax 更新
M_i = M[:, :, i_start:i_end, :]
L_i = L[:, :, i_start:i_end, :]
O_i = O[:, :, i_start:i_end, :]
# 新的 max
M_ij = S_ij.max(dim=-1, keepdim=True).values
M_new = torch.maximum(M_i, M_ij)
# 更新 exp 和 sum
exp_old = torch.exp(M_i - M_new)
exp_new = torch.exp(S_ij - M_new)
L_new = exp_old * L_i + exp_new.sum(dim=-1, keepdim=True)
            # 更新输出: 旧输出先乘回旧的归一化因子 L_i, 与新块贡献合并后再归一化
            O_new = (exp_old * L_i * O_i + torch.matmul(exp_new, V_j)) / L_new
# 写回
M[:, :, i_start:i_end, :] = M_new
L[:, :, i_start:i_end, :] = L_new
O[:, :, i_start:i_end, :] = O_new
return O
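# 一致性自检 (随机数据): 上述分块实现带因果 mask, 结果应与标准因果注意力一致
def _standard_causal_attention(Q, K, V):
    d = Q.shape[-1]
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d)
    causal = torch.tril(torch.ones(Q.shape[-2], K.shape[-2], device=Q.device))
    scores = scores.masked_fill(causal == 0, float('-inf'))
    return torch.matmul(F.softmax(scores, dim=-1), V)

Q_test = torch.randn(1, 2, 128, 32)
K_test, V_test = torch.randn_like(Q_test), torch.randn_like(Q_test)
assert torch.allclose(
    flash_attention_forward(Q_test, K_test, V_test),
    _standard_causal_attention(Q_test, K_test, V_test),
    atol=1e-4
)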
# 实际使用 Flash Attention
from flash_attn import flash_attn_func
def use_flash_attention(q, k, v, causal=True):
"""
使用 Flash Attention 库
要求:
- q, k, v: (batch, seq_len, heads, head_dim)
- 半精度 (fp16/bf16)
"""
output = flash_attn_func(
q, k, v,
causal=causal,
softmax_scale=1.0 / math.sqrt(q.shape[-1])
)
return output
1.2 进阶问题
Q4: 比较 GPT 和 BERT 的架构差异?各适用什么场景?
答案要点:
GPT vs BERT 架构对比
┌─────────────────────────────────────────────────────────────────────┐
│ GPT (Decoder-Only) │
├─────────────────────────────────────────────────────────────────────┤
│ 注意力模式: 因果注意力 (Causal Attention) │
│ │
│ Token 1: [●][ ][ ][ ][ ] 只能看到自己 │
│ Token 2: [●][●][ ][ ][ ] 能看到 1 │
│ Token 3: [●][●][●][ ][ ] 能看到 1,2 │
│ Token 4: [●][●][●][●][ ] 能看到 1,2,3 │
│ │
│ 训练目标: 下一个 token 预测 (Next Token Prediction) │
│ P(x_t | x_1, x_2, ..., x_{t-1}) │
│ │
│ 适用场景: 文本生成、对话、代码生成 │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ BERT (Encoder-Only) │
├─────────────────────────────────────────────────────────────────────┤
│ 注意力模式: 双向注意力 (Bidirectional Attention) │
│ │
│ Token 1: [●][●][●][●][●] 能看到所有 │
│ Token 2: [●][●][●][●][●] 能看到所有 │
│ Token 3: [●][●][●][●][●] 能看到所有 │
│ │
│ 训练目标: 掩码语言模型 (Masked Language Model) │
│ P(x_mask | x_1, ..., x_{mask-1}, x_{mask+1}, ..., x_n) │
│ │
│ 适用场景: 文本分类、NER、问答、语义相似度 │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ T5 (Encoder-Decoder) │
├─────────────────────────────────────────────────────────────────────┤
│ Encoder: 双向注意力处理输入 │
│ Decoder: 因果注意力 + Cross-Attention │
│ │
│ 训练目标: Span Corruption + Seq2Seq │
│ │
│ 适用场景: 翻译、摘要、问答 │
└─────────────────────────────────────────────────────────────────────┘
"""
不同架构的实现对比
"""
# GPT 风格 (Decoder-Only)
class GPTBlock(nn.Module):
def __init__(self, d_model, num_heads, d_ff):
super().__init__()
self.ln1 = nn.LayerNorm(d_model)
self.attn = CausalSelfAttention(d_model, num_heads)
self.ln2 = nn.LayerNorm(d_model)
self.mlp = MLP(d_model, d_ff)
def forward(self, x):
# Pre-LN 架构
x = x + self.attn(self.ln1(x))
x = x + self.mlp(self.ln2(x))
return x
class CausalSelfAttention(nn.Module):
def __init__(self, d_model, num_heads):
super().__init__()
self.mha = MultiHeadAttention(d_model, num_heads)
def forward(self, x):
seq_len = x.size(1)
# 因果 mask
mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device))
return self.mha(x, mask=mask)
# BERT 风格 (Encoder-Only)
class BERTBlock(nn.Module):
def __init__(self, d_model, num_heads, d_ff):
super().__init__()
        self.attn = nn.MultiheadAttention(d_model, num_heads, batch_first=True)  # 输入为 (batch, seq, d_model)
self.ln1 = nn.LayerNorm(d_model)
self.mlp = MLP(d_model, d_ff)
self.ln2 = nn.LayerNorm(d_model)
def forward(self, x, attention_mask=None):
# Post-LN 架构 (原始 BERT)
        attn_out, _ = self.attn(x, x, x, key_padding_mask=attention_mask)  # 注意: key_padding_mask 中为 True 的位置会被屏蔽
x = self.ln1(x + attn_out)
x = self.ln2(x + self.mlp(x))
return x
# T5 风格 (Encoder-Decoder)
class T5Block(nn.Module):
def __init__(self, d_model, num_heads, d_ff, is_decoder=False):
super().__init__()
self.is_decoder = is_decoder
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.ln1 = nn.LayerNorm(d_model)
if is_decoder:
self.cross_attn = MultiHeadAttention(d_model, num_heads)
self.ln2 = nn.LayerNorm(d_model)
self.mlp = MLP(d_model, d_ff)
self.ln_final = nn.LayerNorm(d_model)
def forward(self, x, encoder_output=None, causal_mask=None):
# Self-Attention
x = x + self.self_attn(self.ln1(x), mask=causal_mask)
        # Cross-Attention (仅 Decoder): query 来自解码器, key/value 来自 encoder 输出
        if self.is_decoder and encoder_output is not None:
            x = x + self.cross_attn(self.ln2(x), kv=encoder_output)
# FFN
x = x + self.mlp(self.ln_final(x))
return x
Q5: MoE (Mixture of Experts) 架构的原理和挑战?
答案要点:
MoE 架构
基本原理:
┌─────────────────────────────────────────────────────────────────────┐
│ │
│ 输入 x ──▶ Router ──▶ 选择 Top-K 专家 │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ Expert 1 │ Expert 2 │ ... │ Expert N │ │
│ │ (FFN) │ (FFN) │ │ (FFN) │ │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 加权求和 ──▶ 输出 │
│ │
│ y = Σ g(x)_i × Expert_i(x) │
│ 其中 g(x) 是 router 输出的权重 │
└─────────────────────────────────────────────────────────────────────┘
优势:
┌─────────────────────────────────────────────────────────────────────┐
│ 1. 参数量大但计算量可控 │
│ - 总参数: N × 专家参数 │
│ - 激活参数: K × 专家参数 (K << N) │
│ │
│ 2. 专家分工 │
│ - 不同专家学习不同的知识/能力 │
└─────────────────────────────────────────────────────────────────────┘
挑战:
┌─────────────────────────────────────────────────────────────────────┐
│ 1. 负载不均衡 │
│ - 某些专家被频繁选择,其他专家闲置 │
│ - 解决: 辅助损失 (Auxiliary Loss) │
│ │
│ 2. 训练不稳定 │
│ - Router 梯度稀疏 │
│ - 解决: Expert Choice, Token Choice 等策略 │
│ │
│ 3. 分布式通信 │
│ - 专家分布在不同 GPU,需要 All-to-All 通信 │
│ - 解决: 专家并行 + 优化通信 │
└─────────────────────────────────────────────────────────────────────┘
"""
MoE 实现
"""
class Router(nn.Module):
"""Top-K 路由器"""
def __init__(self, d_model, num_experts, top_k=2):
super().__init__()
self.top_k = top_k
self.num_experts = num_experts
self.gate = nn.Linear(d_model, num_experts, bias=False)
def forward(self, x):
"""
Args:
x: (batch, seq_len, d_model)
Returns:
gates: (batch, seq_len, top_k) - 权重
indices: (batch, seq_len, top_k) - 专家索引
"""
# 计算路由概率
logits = self.gate(x) # (batch, seq_len, num_experts)
# Top-K 选择
top_k_logits, top_k_indices = torch.topk(logits, self.top_k, dim=-1)
# Softmax 归一化 (只在选中的专家上)
top_k_gates = F.softmax(top_k_logits, dim=-1)
return top_k_gates, top_k_indices
class MoELayer(nn.Module):
"""MoE 层"""
def __init__(self, d_model, d_ff, num_experts=8, top_k=2):
super().__init__()
self.num_experts = num_experts
self.top_k = top_k
# 路由器
self.router = Router(d_model, num_experts, top_k)
# 专家 (每个专家是一个 FFN)
self.experts = nn.ModuleList([
nn.Sequential(
nn.Linear(d_model, d_ff),
nn.GELU(),
nn.Linear(d_ff, d_model)
)
for _ in range(num_experts)
])
def forward(self, x):
batch, seq_len, d_model = x.shape
# 路由
gates, indices = self.router(x) # (B, S, K), (B, S, K)
# 展平便于处理
x_flat = x.view(-1, d_model) # (B*S, D)
gates_flat = gates.view(-1, self.top_k) # (B*S, K)
indices_flat = indices.view(-1, self.top_k) # (B*S, K)
# 计算输出
output = torch.zeros_like(x_flat)
for k in range(self.top_k):
expert_indices = indices_flat[:, k] # (B*S,)
expert_gates = gates_flat[:, k:k+1] # (B*S, 1)
for e in range(self.num_experts):
# 找到路由到专家 e 的 token
mask = (expert_indices == e)
if mask.sum() == 0:
continue
expert_input = x_flat[mask]
expert_output = self.experts[e](expert_input)
output[mask] += expert_gates[mask] * expert_output
return output.view(batch, seq_len, d_model)
def load_balance_loss(self, gates, indices):
"""负载均衡损失"""
# 计算每个专家的负载
num_tokens = gates.numel() // self.top_k
# f_i: 路由到专家 i 的 token 比例
expert_counts = torch.zeros(self.num_experts, device=gates.device)
for e in range(self.num_experts):
expert_counts[e] = (indices == e).sum()
f = expert_counts / num_tokens
# P_i: 路由到专家 i 的平均概率
P = torch.zeros(self.num_experts, device=gates.device)
for e in range(self.num_experts):
mask = (indices == e)
if mask.sum() > 0:
P[e] = gates[mask].mean()
# 负载均衡损失: 鼓励均匀分配
loss = self.num_experts * (f * P).sum()
return loss
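下面是一个最小使用示例 (随机数据),展示前向计算与负载均衡辅助损失如何配合使用 (forward 内部也会调用 router,这里为演示单独取一次路由结果):

# MoE 使用示例
moe = MoELayer(d_model=256, d_ff=1024, num_experts=8, top_k=2)
x = torch.randn(2, 16, 256)
y = moe(x)                              # (2, 16, 256)
gates, indices = moe.router(x)          # 取路由结果计算辅助损失
aux_loss = moe.load_balance_loss(gates, indices)
# total_loss = task_loss + 0.01 * aux_loss   # 与主任务损失加权求和 (task_loss 为示意)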
2. 训练技术
2.1 预训练
Q6: 大模型预训练的关键技术有哪些?
答案要点:
大模型预训练关键技术
1. 数据处理
┌─────────────────────────────────────────────────────────────────────┐
│ 数据清洗 │
│ ├─ 去重 (MinHash, SimHash) │
│ ├─ 过滤低质量内容 │
│ ├─ 隐私数据脱敏 │
│ └─ 有害内容过滤 │
│ │
│ 数据配比 │
│ ├─ Web 数据: ~70% │
│ ├─ 书籍: ~15% │
│ ├─ 代码: ~10% │
│ └─ 学术论文: ~5% │
└─────────────────────────────────────────────────────────────────────┘
2. 训练稳定性
┌─────────────────────────────────────────────────────────────────────┐
│ 学习率调度 │
│ ├─ Warmup: 线性增加到峰值 │
│ └─ Decay: 余弦退火或线性衰减 │
│ │
│ 梯度处理 │
│ ├─ 梯度裁剪 (Gradient Clipping) │
│ ├─ 梯度累积 (Gradient Accumulation) │
│ └─ 梯度检查点 (Gradient Checkpointing) │
│ │
│ 数值稳定 │
│ ├─ 混合精度 (BF16 优于 FP16) │
│ ├─ Loss Scaling │
│ └─ 层归一化位置 (Pre-LN vs Post-LN) │
└─────────────────────────────────────────────────────────────────────┘
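把上面的学习率调度、梯度累积与梯度裁剪串起来,大致是下面这个样子 (超参为假设值,model/dataloader 为示意):

# 训练稳定性示意: warmup + 余弦退火 + 梯度累积 + 梯度裁剪 (超参为假设值)
def lr_lambda(step, warmup_steps=2000, total_steps=100000):
    """先线性 warmup, 再余弦衰减"""
    if step < warmup_steps:
        return step / max(1, warmup_steps)
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return 0.5 * (1 + math.cos(math.pi * progress))

# optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, betas=(0.9, 0.95), weight_decay=0.1)
# scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
# accum_steps = 128
# for step, batch in enumerate(dataloader):
#     loss = model(**batch).loss / accum_steps                       # 梯度累积: 损失按累积步数缩放
#     loss.backward()
#     if (step + 1) % accum_steps == 0:
#         torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)    # 梯度裁剪
#         optimizer.step(); scheduler.step(); optimizer.zero_grad()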
3. 并行策略
┌─────────────────────────────────────────────────────────────────────┐
│ 3D 并行 │
│ ├─ 数据并行 (DP): 不同 GPU 处理不同数据 │
│ ├─ 张量并行 (TP): 层内切分 (Megatron 风格) │
│ ├─ 流水线并行 (PP): 层间切分 │
│ └─ 专家并行 (EP): MoE 专家分布 │
│ │
│ 示例配置 (GPT-3 175B): │
│ ├─ TP = 8 (单节点 8 GPU) │
│ ├─ PP = 8 (8 个流水线阶段) │
│ └─ DP = 64 (64 个数据并行副本) │
│ 总 GPU 数 = 8 × 8 × 64 = 4096 │
└─────────────────────────────────────────────────────────────────────┘
"""
预训练配置示例
"""
# DeepSpeed 配置
deepspeed_config = {
"train_batch_size": 2048,
"train_micro_batch_size_per_gpu": 2,
"gradient_accumulation_steps": 128,
"optimizer": {
"type": "AdamW",
"params": {
"lr": 1e-4,
"betas": [0.9, 0.95],
"eps": 1e-8,
"weight_decay": 0.1
}
},
"scheduler": {
"type": "WarmupDecayLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 1e-4,
"warmup_num_steps": 2000,
"total_num_steps": 100000
}
},
"fp16": {
"enabled": True,
"loss_scale": 0,
"loss_scale_window": 1000,
"initial_scale_power": 16,
"hysteresis": 2,
"min_loss_scale": 1
},
"zero_optimization": {
"stage": 3,
"offload_optimizer": {"device": "cpu"},
"offload_param": {"device": "none"},
"overlap_comm": True,
"contiguous_gradients": True
},
"gradient_clipping": 1.0,
"activation_checkpointing": {
"partition_activations": True,
"contiguous_memory_optimization": True
}
}
# Megatron-LM 张量并行
import torch.distributed as dist

class TensorParallelLinear(nn.Module):
"""张量并行线性层 (列并行)"""
def __init__(self, in_features, out_features, tp_size, tp_rank):
super().__init__()
self.tp_size = tp_size
self.tp_rank = tp_rank
# 每个 rank 只持有 1/tp_size 的权重
self.local_out = out_features // tp_size
self.weight = nn.Parameter(
torch.empty(self.local_out, in_features)
)
nn.init.kaiming_uniform_(self.weight)
def forward(self, x):
# 本地计算
local_out = F.linear(x, self.weight)
# AllGather 收集所有结果
output_list = [torch.empty_like(local_out) for _ in range(self.tp_size)]
dist.all_gather(output_list, local_out)
return torch.cat(output_list, dim=-1)
2.2 微调与对齐
Q7: 解释 LoRA 的原理和实现?
答案要点:
LoRA (Low-Rank Adaptation) 原理
核心思想:
┌─────────────────────────────────────────────────────────────────────┐
│ 预训练权重: W₀ ∈ R^(d×k) │
│ 微调变化: ΔW ∈ R^(d×k) │
│ │
│ 假设 ΔW 是低秩的: ΔW = BA │
│ 其中 B ∈ R^(d×r), A ∈ R^(r×k), r << min(d, k) │
│ │
│ 微调后: W = W₀ + BA │
│ │
│ 参数量对比: │
│ - 全量微调: d × k │
│ - LoRA: (d + k) × r │
│ - 参数量之比: (d + k) × r / (d × k) (当 d = k 时约为 2r/d) │
└─────────────────────────────────────────────────────────────────────┘
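以一个假设的 4096×4096 投影矩阵、r=8 为例,可以直观感受参数量差异:

# LoRA 参数量估算 (假设 d = k = 4096, r = 8)
d, k, r = 4096, 4096, 8
full_ft = d * k                      # 全量微调: 16,777,216
lora = (d + k) * r                   # LoRA:     65,536
print(f"{lora / full_ft:.2%}")       # 约 0.39%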
示意图:
┌─────────────────────────────────────────────────────────────────────┐
│ │
│ x │
│ │ │
│ ┌─────┴─────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌───┐ ┌───┐ │
│ │W₀ │ │ A │ r×k │
│ │ │ └─┬─┘ │
│ │d×k│ │ │
│ │ │ ▼ │
│ │ │ ┌───┐ │
│ │ │ │ B │ d×r │
│ └─┬─┘ └─┬─┘ │
│ │ │ │
│ │ × α/r │ │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ output │
│ │
└─────────────────────────────────────────────────────────────────────┘
"""
LoRA 实现
"""
class LoRALinear(nn.Module):
"""LoRA 线性层"""
def __init__(self, original_layer, r=8, alpha=16, dropout=0.1):
"""
Args:
original_layer: 原始线性层
r: LoRA 秩
alpha: 缩放因子
dropout: Dropout 概率
"""
super().__init__()
self.original = original_layer
self.r = r
self.alpha = alpha
self.scaling = alpha / r
in_features = original_layer.in_features
out_features = original_layer.out_features
# 冻结原始权重
for param in self.original.parameters():
param.requires_grad = False
# LoRA 矩阵
self.lora_A = nn.Parameter(torch.zeros(r, in_features))
self.lora_B = nn.Parameter(torch.zeros(out_features, r))
# 初始化
nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
nn.init.zeros_(self.lora_B) # B 初始化为 0,确保初始输出不变
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# 原始输出
original_output = self.original(x)
# LoRA 输出
lora_output = self.dropout(x)
lora_output = F.linear(lora_output, self.lora_A)
lora_output = F.linear(lora_output, self.lora_B)
return original_output + lora_output * self.scaling
def merge_weights(self):
"""合并权重 (推理时使用)"""
self.original.weight.data += self.scaling * (self.lora_B @ self.lora_A)
return self.original
def apply_lora_to_model(model, r=8, alpha=16, target_modules=['q_proj', 'v_proj']):
"""给模型应用 LoRA"""
for name, module in model.named_modules():
if any(target in name for target in target_modules):
if isinstance(module, nn.Linear):
parent_name = '.'.join(name.split('.')[:-1])
child_name = name.split('.')[-1]
parent = model.get_submodule(parent_name) if parent_name else model
lora_layer = LoRALinear(module, r=r, alpha=alpha)
setattr(parent, child_name, lora_layer)
return model
# 使用 PEFT 库
from peft import get_peft_model, LoraConfig, TaskType
def create_lora_model(base_model):
"""使用 PEFT 创建 LoRA 模型"""
lora_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
r=8,
lora_alpha=32,
lora_dropout=0.1,
target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
bias="none"
)
model = get_peft_model(base_model, lora_config)
model.print_trainable_parameters()
return model
Q8: RLHF 的三个阶段是什么?各有什么挑战?
答案要点:
RLHF 三阶段
Stage 1: 监督微调 (SFT)
┌─────────────────────────────────────────────────────────────────────┐
│ 输入: 预训练模型 + 高质量指令数据 │
│ 输出: 能遵循指令的模型 │
│ │
│ 数据格式: │
│ { │
│ "instruction": "解释量子计算的基本原理", │
│ "input": "", │
│ "output": "量子计算是..." │
│ } │
│ │
│ 挑战: │
│ - 高质量数据获取成本高 │
│ - 数据多样性和覆盖度 │
│ - 避免遗忘预训练知识 │
└─────────────────────────────────────────────────────────────────────┘
Stage 2: 奖励模型训练 (RM)
┌─────────────────────────────────────────────────────────────────────┐
│ 输入: SFT 模型 + 人类偏好数据 (排序) │
│ 输出: 能评估回答质量的奖励模型 │
│ │
│ 数据格式: │
│ { │
│ "prompt": "写一首关于春天的诗", │
│ "chosen": "春风拂面暖...", (人类偏好) │
│ "rejected": "春天来了..." (人类不偏好) │
│ } │
│ │
│ 训练目标: Bradley-Terry 模型 │
│ Loss = -log(σ(r(chosen) - r(rejected))) │
│ │
│ 挑战: │
│ - 人类偏好不一致 │
│ - 奖励 hacking │
│ - 分布外泛化 │
└─────────────────────────────────────────────────────────────────────┘
Stage 3: 强化学习优化 (PPO)
┌─────────────────────────────────────────────────────────────────────┐
│ 输入: SFT 模型 + 奖励模型 │
│ 输出: 对齐后的模型 │
│ │
│ 目标函数: │
│ max E[r(x,y)] - β × KL(π || π_ref) │
│ │
│ π: 当前策略 │
│ π_ref: 参考策略 (SFT 模型) │
│ β: KL 惩罚系数 │
│ │
│ 挑战: │
│ - 训练不稳定 │
│ - 奖励模型过拟合 │
│ - 计算资源消耗大 (需要 4 个模型) │
└─────────────────────────────────────────────────────────────────────┘
"""
RLHF 实现
"""
# Stage 2: 奖励模型
class RewardModel(nn.Module):
def __init__(self, base_model):
super().__init__()
self.backbone = base_model
self.reward_head = nn.Linear(
base_model.config.hidden_size, 1
)
def forward(self, input_ids, attention_mask):
outputs = self.backbone(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
# 取最后一个 token 的隐藏状态
last_hidden = outputs.hidden_states[-1]
# 找到序列的最后一个非 padding token
sequence_lengths = attention_mask.sum(dim=1) - 1
last_token_hidden = last_hidden[
torch.arange(last_hidden.size(0)),
sequence_lengths
]
reward = self.reward_head(last_token_hidden)
return reward
def train_reward_model(model, dataloader, optimizer, epochs=3):
"""训练奖励模型"""
model.train()
for epoch in range(epochs):
for batch in dataloader:
chosen_ids = batch['chosen_input_ids']
chosen_mask = batch['chosen_attention_mask']
rejected_ids = batch['rejected_input_ids']
rejected_mask = batch['rejected_attention_mask']
# 计算奖励
r_chosen = model(chosen_ids, chosen_mask)
r_rejected = model(rejected_ids, rejected_mask)
# Bradley-Terry 损失
loss = -F.logsigmoid(r_chosen - r_rejected).mean()
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Stage 3: PPO
class PPOTrainer:
def __init__(self, policy_model, ref_model, reward_model,
tokenizer, kl_coef=0.1):
self.policy = policy_model
self.ref = ref_model
self.reward = reward_model
self.tokenizer = tokenizer
self.kl_coef = kl_coef
def compute_rewards(self, query_tensors, response_tensors):
"""计算奖励 (包含 KL 惩罚)"""
with torch.no_grad():
            # 奖励模型分数 (简化: 假设 batch 内无 padding)
            full_ids = torch.cat([query_tensors, response_tensors], dim=1)
            rm_scores = self.reward(full_ids, torch.ones_like(full_ids))
# KL 散度
policy_logits = self.policy(
response_tensors,
attention_mask=torch.ones_like(response_tensors)
).logits
ref_logits = self.ref(
response_tensors,
attention_mask=torch.ones_like(response_tensors)
).logits
kl_div = F.kl_div(
F.log_softmax(policy_logits, dim=-1),
F.softmax(ref_logits, dim=-1),
reduction='batchmean'
)
rewards = rm_scores - self.kl_coef * kl_div
return rewards
def ppo_step(self, queries, responses, old_logprobs, advantages):
"""PPO 更新步骤"""
        # 计算新的 log 概率 (示意: 实践中应 gather 出实际生成 token 的逐 token log 概率)
new_logits = self.policy(responses).logits
new_logprobs = F.log_softmax(new_logits, dim=-1)
# 概率比
ratio = torch.exp(new_logprobs - old_logprobs)
# PPO Clipped 目标
clip_eps = 0.2
pg_loss1 = -advantages * ratio
pg_loss2 = -advantages * torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps)
pg_loss = torch.max(pg_loss1, pg_loss2).mean()
return pg_loss
# DPO (Direct Preference Optimization) - RLHF 的简化替代
def dpo_loss(policy_model, ref_model, chosen_ids, rejected_ids, beta=0.1):
"""
DPO 直接优化偏好,无需单独的奖励模型
Loss = -log σ(β × (log π(y_w|x)/π_ref(y_w|x) - log π(y_l|x)/π_ref(y_l|x)))
"""
# 计算 log 概率
policy_chosen_logps = get_logprobs(policy_model, chosen_ids)
policy_rejected_logps = get_logprobs(policy_model, rejected_ids)
with torch.no_grad():
ref_chosen_logps = get_logprobs(ref_model, chosen_ids)
ref_rejected_logps = get_logprobs(ref_model, rejected_ids)
# DPO 损失
chosen_rewards = beta * (policy_chosen_logps - ref_chosen_logps)
rejected_rewards = beta * (policy_rejected_logps - ref_rejected_logps)
loss = -F.logsigmoid(chosen_rewards - rejected_rewards).mean()
return loss
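dpo_loss 中用到的 get_logprobs 原文没有给出,下面是一个常见写法的示意 (teacher forcing 下对逐 token log 概率求和;实践中通常只对回复部分的 token 求和):

def get_logprobs(model, input_ids, attention_mask=None):
    """示意实现: 返回每条序列的逐 token log 概率之和"""
    logits = model(input_ids=input_ids, attention_mask=attention_mask).logits[:, :-1, :]
    labels = input_ids[:, 1:]                     # 第 t 个位置预测第 t+1 个 token
    logprobs = F.log_softmax(logits, dim=-1)
    token_logprobs = logprobs.gather(-1, labels.unsqueeze(-1)).squeeze(-1)
    if attention_mask is not None:
        token_logprobs = token_logprobs * attention_mask[:, 1:]
    return token_logprobs.sum(dim=-1)             # (batch,)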
3. 推理优化
3.1 基础问题
Q9: 解释 KV Cache 的原理和优化方法?
答案要点:
KV Cache 原理
自回归生成过程:
┌─────────────────────────────────────────────────────────────────────┐
│ Step 1: "The" → 计算 K₁, V₁ │
│ Step 2: "The cat" → 计算 K₂, V₂ (重复计算 K₁, V₁) │
│ Step 3: "The cat sat" → 计算 K₃, V₃ (重复计算 K₁, V₁, K₂, V₂) │
│ ... │
│ │
│ 问题: O(n²) 的重复计算 │
└─────────────────────────────────────────────────────────────────────┘
KV Cache 优化:
┌─────────────────────────────────────────────────────────────────────┐
│ 缓存之前的 K, V,只计算新 token 的 K, V │
│ │
│ Step 1: Q₁ @ [K₁]^T → cache K₁, V₁ │
│ Step 2: Q₂ @ [K₁, K₂]^T → 只计算 K₂, V₂ │
│ Step 3: Q₃ @ [K₁, K₂, K₃]^T → 只计算 K₃, V₃ │
│ │
│ 显存占用 ≈ 2 × batch × num_layers × seq_len × num_heads × head_dim × 每元素字节数 │
└─────────────────────────────────────────────────────────────────────┘
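按上面的公式,以一组接近 7B 模型的假设配置粗算一下:

# KV Cache 显存估算 (假设: 32 层, 32 头, head_dim=128, 4K 上下文, FP16)
batch, layers, heads, head_dim, seq_len, bytes_per_elem = 1, 32, 32, 128, 4096, 2
kv_bytes = 2 * batch * layers * seq_len * heads * head_dim * bytes_per_elem
print(f"{kv_bytes / 1024**3:.2f} GB")   # 2.00 GB / 条 4K 序列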
KV Cache 优化技术:
┌─────────────────────────────────────────────────────────────────────┐
│ 1. Multi-Query Attention (MQA) │
│ - 所有注意力头共享 K, V │
│ - 显存降低: num_heads 倍 │
│ │
│ 2. Grouped-Query Attention (GQA) │
│ - K, V 分组共享 │
│ - 平衡质量和效率 │
│ │
│ 3. PagedAttention (vLLM) │
│ - 类似操作系统的虚拟内存管理 │
│ - 按需分配,减少碎片 │
│ │
│ 4. 量化 KV Cache │
│ - FP16 → INT8/INT4 │
│ - 显存降低 2-4 倍 │
└─────────────────────────────────────────────────────────────────────┘
"""
KV Cache 实现
"""
class KVCache:
"""基础 KV Cache"""
def __init__(self, num_layers, max_batch, max_seq_len,
num_heads, head_dim, device, dtype=torch.float16):
self.num_layers = num_layers
self.max_seq_len = max_seq_len
# 预分配缓存
cache_shape = (num_layers, 2, max_batch, num_heads, max_seq_len, head_dim)
self.cache = torch.zeros(cache_shape, device=device, dtype=dtype)
# 当前序列长度
self.seq_len = 0
def update(self, layer_idx, k, v):
"""更新缓存"""
# k, v: (batch, heads, seq_len, head_dim)
new_seq_len = k.shape[2]
self.cache[layer_idx, 0, :, :, self.seq_len:self.seq_len + new_seq_len, :] = k
self.cache[layer_idx, 1, :, :, self.seq_len:self.seq_len + new_seq_len, :] = v
self.seq_len += new_seq_len
def get(self, layer_idx):
"""获取缓存的 K, V"""
k = self.cache[layer_idx, 0, :, :, :self.seq_len, :]
v = self.cache[layer_idx, 1, :, :, :self.seq_len, :]
return k, v
class PagedKVCache:
"""分页 KV Cache (vLLM 风格)"""
def __init__(self, num_layers, num_heads, head_dim,
block_size=16, num_blocks=1000, device='cuda'):
self.block_size = block_size
self.num_blocks = num_blocks
# 物理块 (预分配的内存池)
block_shape = (num_layers, 2, num_heads, block_size, head_dim)
self.blocks = torch.zeros(num_blocks, *block_shape, device=device)
# 块分配表
self.free_blocks = list(range(num_blocks))
self.block_tables = {} # request_id -> [block_ids]
def allocate(self, request_id, num_tokens):
"""为请求分配块"""
num_blocks_needed = (num_tokens + self.block_size - 1) // self.block_size
if len(self.free_blocks) < num_blocks_needed:
raise MemoryError("No free blocks")
allocated = [self.free_blocks.pop() for _ in range(num_blocks_needed)]
self.block_tables[request_id] = allocated
return allocated
def free(self, request_id):
"""释放请求的块"""
if request_id in self.block_tables:
self.free_blocks.extend(self.block_tables.pop(request_id))
def update(self, request_id, layer_idx, k, v, position):
"""更新指定位置的 KV"""
block_idx = position // self.block_size
block_offset = position % self.block_size
physical_block = self.block_tables[request_id][block_idx]
self.blocks[physical_block, layer_idx, 0, :, block_offset, :] = k
self.blocks[physical_block, layer_idx, 1, :, block_offset, :] = v
# GQA (Grouped-Query Attention)
class GroupedQueryAttention(nn.Module):
"""分组查询注意力"""
def __init__(self, d_model, num_heads, num_kv_heads):
super().__init__()
self.num_heads = num_heads
self.num_kv_heads = num_kv_heads
self.num_groups = num_heads // num_kv_heads
self.head_dim = d_model // num_heads
self.q_proj = nn.Linear(d_model, num_heads * self.head_dim)
self.k_proj = nn.Linear(d_model, num_kv_heads * self.head_dim)
self.v_proj = nn.Linear(d_model, num_kv_heads * self.head_dim)
self.o_proj = nn.Linear(num_heads * self.head_dim, d_model)
def forward(self, x, kv_cache=None, position=0):
batch, seq_len, _ = x.shape
q = self.q_proj(x).view(batch, seq_len, self.num_heads, self.head_dim)
k = self.k_proj(x).view(batch, seq_len, self.num_kv_heads, self.head_dim)
v = self.v_proj(x).view(batch, seq_len, self.num_kv_heads, self.head_dim)
# 更新 KV Cache
if kv_cache is not None:
k, v = kv_cache.update_and_get(k, v, position)
# 扩展 K, V 以匹配 Q 的头数
k = k.repeat_interleave(self.num_groups, dim=2)
v = v.repeat_interleave(self.num_groups, dim=2)
# 注意力计算
q = q.transpose(1, 2)
k = k.transpose(1, 2)
v = v.transpose(1, 2)
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
attn = F.softmax(scores, dim=-1)
output = torch.matmul(attn, v)
output = output.transpose(1, 2).contiguous().view(batch, seq_len, -1)
return self.o_proj(output)
Q10: 投机解码 (Speculative Decoding) 的原理?
答案要点:
投机解码原理
标准自回归解码:
┌─────────────────────────────────────────────────────────────────────┐
│ 每步只生成一个 token,受制于内存带宽 │
│ │
│ Step 1: [prompt] → token1 (一次大模型调用) │
│ Step 2: [prompt, token1] → token2 (一次大模型调用) │
│ Step 3: [prompt, token1, token2] → token3 (一次大模型调用) │
│ ... │
│ │
│ N 个 token 需要 N 次大模型调用 │
└─────────────────────────────────────────────────────────────────────┘
投机解码:
┌─────────────────────────────────────────────────────────────────────┐
│ 用小模型 (draft model) 快速生成多个候选 token │
│ 用大模型 (target model) 一次验证多个 token │
│ │
│ Step 1: 小模型生成 K 个候选 [t1, t2, t3, t4] │
│ Step 2: 大模型并行验证 → 接受 [t1, t2, t3], 拒绝 t4 │
│ Step 3: 大模型在拒绝位置采样正确 token │
│ │
│ 期望每次大模型调用产出 (1 - α^(K+1)) / (1 - α) 个 token (α 为单步接受率) │
└─────────────────────────────────────────────────────────────────────┘
验证算法 (保证输出分布不变):
┌─────────────────────────────────────────────────────────────────────┐
│ 对每个候选 token t: │
│ │
│ p = target_model_prob(t) │
│ q = draft_model_prob(t) │
│ │
│ if random() < min(1, p/q): │
│ accept t │
│ else: │
│ reject t, 从归一化后的 max(0, p - q) 分布中重新采样 │
└─────────────────────────────────────────────────────────────────────┘
"""
投机解码实现
"""
class SpeculativeDecoder:
"""投机解码器"""
def __init__(self, target_model, draft_model, tokenizer,
num_speculative_tokens=4):
self.target = target_model
self.draft = draft_model
self.tokenizer = tokenizer
self.K = num_speculative_tokens
@torch.no_grad()
def generate(self, input_ids, max_new_tokens):
"""投机生成"""
generated = input_ids.clone()
while generated.shape[1] - input_ids.shape[1] < max_new_tokens:
# 1. Draft 阶段: 小模型生成 K 个候选
draft_tokens = self._draft_tokens(generated)
# 2. Verify 阶段: 大模型验证
accepted_tokens, next_token = self._verify_tokens(
generated, draft_tokens
)
# 3. 更新生成序列
generated = torch.cat([generated, accepted_tokens, next_token], dim=1)
return generated
def _draft_tokens(self, prefix):
"""用小模型生成候选 token"""
draft_tokens = []
draft_input = prefix.clone()
for _ in range(self.K):
logits = self.draft(draft_input).logits[:, -1, :]
probs = F.softmax(logits, dim=-1)
# 采样
token = torch.multinomial(probs, num_samples=1)
draft_tokens.append(token)
draft_input = torch.cat([draft_input, token], dim=1)
return torch.cat(draft_tokens, dim=1) # (batch, K)
    def _verify_tokens(self, prefix, draft_tokens):
        """用大模型验证 (简化实现, 假设 batch_size = 1)"""
batch_size = prefix.shape[0]
# 拼接 prefix 和 draft tokens
full_input = torch.cat([prefix, draft_tokens], dim=1)
# 大模型前向 (一次性计算所有位置的概率)
target_logits = self.target(full_input).logits
# 小模型概率 (重新计算以获取完整分布)
draft_logits = self.draft(full_input).logits
# 验证每个位置
accepted = []
prefix_len = prefix.shape[1]
for i in range(self.K):
pos = prefix_len + i
draft_token = draft_tokens[:, i:i+1]
# 目标模型在 pos-1 位置对 pos token 的概率
target_probs = F.softmax(target_logits[:, pos-1, :], dim=-1)
draft_probs = F.softmax(draft_logits[:, pos-1, :], dim=-1)
p = target_probs.gather(1, draft_token).squeeze(-1)
q = draft_probs.gather(1, draft_token).squeeze(-1)
# 接受概率
accept_prob = torch.minimum(torch.ones_like(p), p / (q + 1e-10))
accept_mask = torch.rand_like(accept_prob) < accept_prob
if accept_mask.all():
accepted.append(draft_token)
else:
# 某个位置被拒绝,从调整分布中采样
adjusted_probs = torch.clamp(target_probs - draft_probs, min=0)
adjusted_probs = adjusted_probs / adjusted_probs.sum(dim=-1, keepdim=True)
next_token = torch.multinomial(adjusted_probs, num_samples=1)
break
else:
# 所有候选都被接受,从大模型采样下一个
next_probs = F.softmax(target_logits[:, -1, :], dim=-1)
next_token = torch.multinomial(next_probs, num_samples=1)
accepted_tokens = torch.cat(accepted, dim=1) if accepted else torch.empty(batch_size, 0, dtype=torch.long, device=prefix.device)
return accepted_tokens, next_token
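一个假设的使用示意 (模型名仅为占位,draft 模型需与 target 模型共享词表、通常小一个数量级):

# 使用示意 (模型名为占位符)
# from transformers import AutoModelForCausalLM, AutoTokenizer
# tokenizer = AutoTokenizer.from_pretrained("target-model")
# target = AutoModelForCausalLM.from_pretrained("target-model").cuda().eval()
# draft = AutoModelForCausalLM.from_pretrained("draft-model").cuda().eval()
# decoder = SpeculativeDecoder(target, draft, tokenizer, num_speculative_tokens=4)
# input_ids = tokenizer("从前有座山,", return_tensors="pt").input_ids.cuda()
# output_ids = decoder.generate(input_ids, max_new_tokens=64)
# print(tokenizer.decode(output_ids[0], skip_special_tokens=True))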
4. 应用开发
4.1 RAG 系统
Q11: 设计一个高质量的 RAG 系统需要考虑哪些方面?
答案要点:
RAG 系统设计要点
1. 文档处理
┌─────────────────────────────────────────────────────────────────────┐
│ 分块策略 │
│ ├─ 固定大小分块 (简单但可能切断语义) │
│ ├─ 语义分块 (按段落/句子边界) │
│ ├─ 递归分块 (层次结构) │
│ └─ 滑动窗口 (带重叠) │
│ │
│ 元数据提取 │
│ ├─ 文档标题、章节 │
│ ├─ 创建时间、来源 │
│ └─ 实体、关键词 │
└─────────────────────────────────────────────────────────────────────┘
2. 检索优化
┌─────────────────────────────────────────────────────────────────────┐
│ 混合检索 │
│ ├─ 向量检索 (语义相似) │
│ ├─ BM25 (关键词匹配) │
│ └─ 融合排序 (RRF) │
│ │
│ 检索增强 │
│ ├─ Query Expansion (扩展查询) │
│ ├─ HyDE (假设文档嵌入) │
│ ├─ Multi-Query (多角度查询) │
│ └─ Reranking (重排序) │
└─────────────────────────────────────────────────────────────────────┘
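上表中的 HyDE (假设文档嵌入) 思路是:先让 LLM 写一段假设性答案,再用该答案的向量去检索真实文档。下面是一个示意 (沿用后文 AdvancedRAG 中 llm / embedding_model / vector_store 的接口假设):

# HyDE 检索示意
def hyde_retrieve(query, llm, embedding_model, vector_store, top_k=5):
    prompt = f"请针对下面的问题写一段可能的回答 (允许不完全准确):\n问题: {query}\n回答:"
    hypothetical_doc = llm.generate(prompt, max_tokens=200)           # 1. 生成假设性答案
    hyde_embedding = embedding_model.encode([hypothetical_doc])[0]    # 2. 向量化假设文档
    return vector_store.search(hyde_embedding, top_k=top_k)           # 3. 用该向量检索真实文档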
3. 生成优化
┌─────────────────────────────────────────────────────────────────────┐
│ 上下文构建 │
│ ├─ 相关性排序 │
│ ├─ 去重和合并 │
│ └─ 长度控制 │
│ │
│ 生成控制 │
│ ├─ 引用标注 │
│ ├─ 幻觉检测 │
│ └─ 答案验证 │
└─────────────────────────────────────────────────────────────────────┘
"""
RAG 系统实现
"""
class AdvancedRAG:
"""高级 RAG 系统"""
def __init__(self, embedding_model, llm, vector_store):
self.embedding_model = embedding_model
self.llm = llm
self.vector_store = vector_store
self.bm25 = None
def index_documents(self, documents, chunk_size=512, overlap=50):
"""索引文档"""
chunks = []
for doc in documents:
# 语义分块
doc_chunks = self._semantic_chunking(doc, chunk_size, overlap)
chunks.extend(doc_chunks)
# 向量化
embeddings = self.embedding_model.encode([c['text'] for c in chunks])
# 存储
self.vector_store.add(embeddings, chunks)
# 构建 BM25 索引
from rank_bm25 import BM25Okapi
tokenized = [c['text'].split() for c in chunks]
self.bm25 = BM25Okapi(tokenized)
self.bm25_chunks = chunks
    def _semantic_chunking(self, document, chunk_size, overlap):
        """语义分块 (简化版: 按段落聚合到 chunk_size; overlap 参数在此版本中未使用)"""
# 按段落分割
paragraphs = document['text'].split('\n\n')
chunks = []
current_chunk = ""
for para in paragraphs:
if len(current_chunk) + len(para) < chunk_size:
current_chunk += para + "\n\n"
else:
if current_chunk:
chunks.append({
'text': current_chunk.strip(),
'metadata': document.get('metadata', {})
})
current_chunk = para + "\n\n"
if current_chunk:
chunks.append({
'text': current_chunk.strip(),
'metadata': document.get('metadata', {})
})
return chunks
def retrieve(self, query, top_k=5, use_hybrid=True):
"""混合检索"""
results = []
# 向量检索
query_embedding = self.embedding_model.encode([query])[0]
vector_results = self.vector_store.search(query_embedding, top_k=top_k)
if use_hybrid and self.bm25:
# BM25 检索
tokenized_query = query.split()
bm25_scores = self.bm25.get_scores(tokenized_query)
bm25_top_k = sorted(
enumerate(bm25_scores),
key=lambda x: x[1],
reverse=True
)[:top_k]
# RRF 融合
results = self._reciprocal_rank_fusion(
vector_results, bm25_top_k
)
else:
results = vector_results
return results
def _reciprocal_rank_fusion(self, vector_results, bm25_results, k=60):
"""倒数排名融合"""
scores = {}
for rank, (doc_id, score) in enumerate(vector_results):
scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
        for rank, (doc_idx, bm25_score) in enumerate(bm25_results):
            # 简化处理: 实际应将 BM25 结果映射回与向量检索一致的 doc_id 再做融合
            doc_id = f"bm25_{doc_idx}"
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
# 排序
sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return sorted_results
def query_expansion(self, query):
"""查询扩展"""
prompt = f"""Given the following query, generate 3 alternative versions that might help find relevant information:
Query: {query}
Alternative queries:
1."""
expanded = self.llm.generate(prompt, max_tokens=100)
queries = [query] + [q.strip() for q in expanded.split('\n') if q.strip()]
return queries
def generate_answer(self, query, contexts, with_citations=True):
"""生成答案"""
# 构建上下文
context_str = "\n\n".join([
f"[{i+1}] {ctx['text']}"
for i, ctx in enumerate(contexts)
])
if with_citations:
prompt = f"""Based on the following context, answer the question. Include citations [1], [2], etc. when using information from the context.
Context:
{context_str}
Question: {query}
Answer with citations:"""
else:
prompt = f"""Based on the following context, answer the question.
Context:
{context_str}
Question: {query}
Answer:"""
answer = self.llm.generate(prompt, max_tokens=500)
# 验证答案
if with_citations:
answer = self._validate_citations(answer, len(contexts))
return answer
def _validate_citations(self, answer, num_contexts):
"""验证引用"""
import re
citations = re.findall(r'\[(\d+)\]', answer)
# 检查引用是否有效
invalid_citations = [c for c in citations if int(c) > num_contexts]
if invalid_citations:
# 移除无效引用
for c in invalid_citations:
answer = answer.replace(f'[{c}]', '')
return answer
4.2 Agent 系统
Q12: 如何设计一个可靠的 LLM Agent 系统?
答案要点:
Agent 系统设计
核心组件:
┌─────────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 规划器 (Planner) │ │
│ │ - 任务分解 │ │
│ │ - 目标设定 │ │
│ │ - 策略选择 │ │
│ └──────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┴──────────────────────────────────┐ │
│ │ 执行器 (Executor) │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Tool 1 │ │ Tool 2 │ │ Tool N │ │ │
│ │ │ (搜索) │ │ (计算) │ │ (API) │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │
│ └──────────────────────────┬──────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────┴──────────────────────────────────┐ │
│ │ 记忆系统 (Memory) │ │
│ │ - 短期记忆 (对话上下文) │ │
│ │ - 长期记忆 (向量存储) │ │
│ │ - 工作记忆 (当前任务状态) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
可靠性设计:
┌─────────────────────────────────────────────────────────────────────┐
│ 1. 错误处理 │
│ - 工具调用失败重试 │
│ - 超时处理 │
│ - 回退策略 │
│ │
│ 2. 安全机制 │
│ - 工具权限控制 │
│ - 输入验证 │
│ - 输出审核 │
│ │
│ 3. 可观测性 │
│ - 执行轨迹记录 │
│ - 决策日志 │
│ - 性能监控 │
└─────────────────────────────────────────────────────────────────────┘
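针对上面的错误处理要求,工具调用通常会包一层重试与退避,下面是一个纯 Python 的简单示意 (超时控制在实践中可结合子进程或异步任务实现):

# 工具调用的重试 + 指数退避包装 (示意)
import time

def with_retry(func, max_retries=3, backoff=1.0):
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                time.sleep(backoff * (2 ** attempt))   # 指数退避
    return wrapper

# 用法示意: Tool(name="search", ..., function=with_retry(search_tool))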
"""
Agent 系统实现
"""
from dataclasses import dataclass
from typing import List, Dict, Any, Callable, Optional
from enum import Enum
class ActionStatus(Enum):
SUCCESS = "success"
FAILED = "failed"
PENDING = "pending"
@dataclass
class Tool:
name: str
description: str
function: Callable
parameters: Dict[str, Any]
@dataclass
class Action:
tool_name: str
parameters: Dict[str, Any]
thought: str
@dataclass
class Observation:
result: Any
status: ActionStatus
error: Optional[str] = None
class ReActAgent:
"""ReAct 风格的 Agent"""
def __init__(self, llm, tools: List[Tool], max_iterations=10):
self.llm = llm
self.tools = {t.name: t for t in tools}
self.max_iterations = max_iterations
self.memory = []
def run(self, task: str) -> str:
"""执行任务"""
self.memory = [{"role": "user", "content": task}]
for i in range(self.max_iterations):
# 思考下一步
action = self._think()
if action is None:
# 任务完成
return self._extract_final_answer()
# 执行动作
observation = self._execute(action)
# 更新记忆
self.memory.append({
"role": "assistant",
"content": f"Thought: {action.thought}\nAction: {action.tool_name}\nAction Input: {action.parameters}"
})
self.memory.append({
"role": "observation",
"content": f"Observation: {observation.result}"
})
# 检查是否失败
if observation.status == ActionStatus.FAILED:
# 尝试恢复
recovery_action = self._recover_from_failure(observation.error)
if recovery_action:
continue
else:
return f"Task failed: {observation.error}"
return "Max iterations reached"
def _think(self) -> Optional[Action]:
"""生成下一步动作"""
tools_desc = "\n".join([
f"- {t.name}: {t.description}"
for t in self.tools.values()
])
prompt = f"""You are a helpful assistant that can use tools to complete tasks.
Available tools:
{tools_desc}
Based on the conversation history, decide the next action.
If the task is complete, respond with "Final Answer: [your answer]"
Otherwise, respond with:
Thought: [your reasoning]
Action: [tool name]
Action Input: [parameters as JSON]
History:
{self._format_memory()}
Next step:"""
        response = self.llm.generate(prompt, max_tokens=500)
        self._last_response = response  # 记录原始回复, 便于后续提取 Final Answer
        return self._parse_action(response)
def _parse_action(self, response: str) -> Optional[Action]:
"""解析 LLM 响应"""
if "Final Answer:" in response:
return None
# 提取 Thought, Action, Action Input
import re
import json
thought_match = re.search(r'Thought:\s*(.+?)(?=Action:|$)', response, re.DOTALL)
action_match = re.search(r'Action:\s*(\w+)', response)
input_match = re.search(r'Action Input:\s*({.+})', response, re.DOTALL)
if not action_match:
return None
thought = thought_match.group(1).strip() if thought_match else ""
tool_name = action_match.group(1)
try:
parameters = json.loads(input_match.group(1)) if input_match else {}
except json.JSONDecodeError:
parameters = {}
return Action(tool_name=tool_name, parameters=parameters, thought=thought)
def _execute(self, action: Action) -> Observation:
"""执行动作"""
tool = self.tools.get(action.tool_name)
if not tool:
return Observation(
result=None,
status=ActionStatus.FAILED,
error=f"Unknown tool: {action.tool_name}"
)
try:
result = tool.function(**action.parameters)
return Observation(result=result, status=ActionStatus.SUCCESS)
except Exception as e:
return Observation(
result=None,
status=ActionStatus.FAILED,
error=str(e)
)
def _recover_from_failure(self, error: str) -> Optional[Action]:
"""从失败中恢复"""
prompt = f"""The previous action failed with error: {error}
How should we proceed? Options:
1. Retry with different parameters
2. Try a different approach
3. Give up
Respond with your decision and next action if applicable."""
response = self.llm.generate(prompt, max_tokens=200)
if "give up" in response.lower():
return None
return self._parse_action(response)
def _format_memory(self) -> str:
"""格式化记忆"""
return "\n".join([
f"{m['role']}: {m['content']}"
for m in self.memory
])
    def _extract_final_answer(self) -> str:
        """提取最终答案 (优先从最后一次 LLM 回复中解析)"""
        last_response = getattr(self, '_last_response', '')
        if not last_response and self.memory:
            last_response = self.memory[-1]['content']
        if "Final Answer:" in last_response:
            return last_response.split("Final Answer:")[-1].strip()
        return last_response
# 工具定义示例
def search_tool(query: str) -> str:
"""搜索工具"""
# 实际实现会调用搜索 API
return f"Search results for '{query}': ..."
def calculator_tool(expression: str) -> float:
    """计算器工具 (注意: eval 仅为演示, 生产环境应使用安全的表达式解析)"""
    return eval(expression)
# 创建 Agent
tools = [
Tool(
name="search",
description="Search the web for information",
function=search_tool,
parameters={"query": "string"}
),
Tool(
name="calculator",
description="Evaluate mathematical expressions",
function=calculator_tool,
parameters={"expression": "string"}
)
]
# agent = ReActAgent(llm, tools)
# result = agent.run("What is the population of China times 2?")
总结
本章覆盖了大模型相关的核心面试题:
- Transformer 架构:Self-Attention、位置编码、Flash Attention、MoE
- 训练技术:预训练、LoRA、RLHF/DPO
- 推理优化:KV Cache、投机解码
- 应用开发:RAG、Agent
面试准备建议:
- 深入理解原理,而非只记公式
- 能够手写核心代码实现
- 了解最新进展和工程实践
- 准备好讨论实际项目经验