01-AI基础设施核心面试题
概述
本章汇总 AI 基础设施领域的核心面试题,涵盖分布式训练、GPU 调度、模型服务、存储系统等关键主题,帮助读者全面准备技术面试。
1. 分布式训练
1.1 基础概念
Q1: 解释数据并行和模型并行的区别?
数据并行 vs 模型并行
┌─────────────────────────────────────────────────────────────────────┐
│ 数据并行 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 数据批次 │
│ ┌─────────────────────────────────────────┐ │
│ │ Batch 0 │ Batch 1 │ Batch 2 │ ... │ │
│ └─────┬─────┴─────┬─────┴─────┬─────┴─────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ GPU 0 │ │ GPU 1 │ │ GPU 2 │ 每个 GPU 有完整模型 │
│ │ Model │ │ Model │ │ Model │ │
│ │ Copy │ │ Copy │ │ Copy │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────┼───────────┘ │
│ ▼ │
│ 梯度聚合 (AllReduce) │
│ │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ 模型并行 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 完整模型 │
│ ┌──────────────────────────────────────────┐ │
│ │ Layer 0-3 │ Layer 4-7 │ Layer 8-11 │ │
│ └──────┬──────┴──────┬──────┴──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ GPU 0 │──▶│ GPU 1 │──▶│ GPU 2 │ 每个 GPU 有部分模型 │
│ │Layer0-3 │ │Layer4-7 │ │Layer8-11│ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 激活值在 GPU 之间传递 (Pipeline) │
│ │
└─────────────────────────────────────────────────────────────────────┘
答案要点:
"""
数据并行 (Data Parallelism)
"""
# 特点:
# 1. 每个 GPU 持有完整的模型副本
# 2. 训练数据被分割到不同 GPU
# 3. 前向和反向传播独立进行
# 4. 通过 AllReduce 同步梯度
# 适用场景:
# - 模型能放入单个 GPU 显存
# - 需要加速训练速度
# - 批量大小较大
# PyTorch 实现
import torch.nn.parallel as parallel
# DDP (推荐)
model = parallel.DistributedDataParallel(model, device_ids=[local_rank])
# DP (简单但效率较低)
model = parallel.DataParallel(model)
"""
模型并行 (Model Parallelism)
"""
# 特点:
# 1. 模型被切分到不同 GPU
# 2. 每个 GPU 只持有部分层
# 3. 激活值在 GPU 之间传递
# 4. 存在流水线气泡问题
# 适用场景:
# - 单个模型无法放入一个 GPU
# - 大型语言模型训练
# - 内存受限场景
# 手动实现示例
class ModelParallel(nn.Module):
def __init__(self):
super().__init__()
self.layer1 = nn.Linear(1000, 1000).to('cuda:0')
self.layer2 = nn.Linear(1000, 1000).to('cuda:1')
def forward(self, x):
x = self.layer1(x.to('cuda:0'))
x = self.layer2(x.to('cuda:1'))
return x
Q2: AllReduce 操作的原理是什么?常见实现算法有哪些?
答案要点:
AllReduce 操作流程
初始状态:每个 GPU 有局部梯度
GPU 0: [g0] GPU 1: [g1] GPU 2: [g2] GPU 3: [g3]
AllReduce 后:每个 GPU 有全局聚合梯度
GPU 0: [Σg] GPU 1: [Σg] GPU 2: [Σg] GPU 3: [Σg]
其中 Σg = g0 + g1 + g2 + g3
常见算法:
1. Ring AllReduce
┌───────────────────────────────────────────────────┐
│ │
│ GPU 0 ──────▶ GPU 1 ──────▶ GPU 2 │
│ ▲ │ │
│ │ ▼ │
│ └─────────── GPU 3 ◀────────┘ │
│ │
│ 优点:带宽利用率高,O(N) 通信量 │
│ 缺点:延迟随节点数线性增长 │
└───────────────────────────────────────────────────┘
2. Tree AllReduce
┌───────────────────────────────────────────────────┐
│ │
│ Root (GPU 0) │
│ / \ │
│ GPU 1 GPU 2 │
│ / \ / \ │
│ GPU 3 GPU 4 GPU 5 GPU 6 │
│ │
│ 优点:延迟 O(logN) │
│ 缺点:根节点带宽瓶颈 │
└───────────────────────────────────────────────────┘
3. Recursive Halving-Doubling
┌───────────────────────────────────────────────────┐
│ Step 1: 距离 1 的节点交换 │
│ Step 2: 距离 2 的节点交换 │
│ Step 3: 距离 4 的节点交换 │
│ ... │
│ │
│ 优点:适合小消息,延迟低 │
│ 缺点:大消息时带宽利用率不高 │
└───────────────────────────────────────────────────┘
"""
Ring AllReduce 简化实现
"""
def ring_allreduce(tensors, world_size, rank):
"""
Ring AllReduce 算法
分为两个阶段:
1. Scatter-Reduce: 将数据分成 N 份,每份通过环传递并累加
2. Allgather: 将最终结果传播到所有节点
"""
chunk_size = len(tensors) // world_size
# Phase 1: Scatter-Reduce
for step in range(world_size - 1):
send_chunk = (rank - step) % world_size
recv_chunk = (rank - step - 1) % world_size
# 发送和接收
send_data = tensors[send_chunk * chunk_size:(send_chunk + 1) * chunk_size]
recv_data = receive_from((rank + 1) % world_size)
send_to((rank - 1) % world_size, send_data)
# 累加
tensors[recv_chunk * chunk_size:(recv_chunk + 1) * chunk_size] += recv_data
# Phase 2: Allgather
for step in range(world_size - 1):
send_chunk = (rank - step + 1) % world_size
recv_chunk = (rank - step) % world_size
send_data = tensors[send_chunk * chunk_size:(send_chunk + 1) * chunk_size]
recv_data = receive_from((rank + 1) % world_size)
send_to((rank - 1) % world_size, send_data)
tensors[recv_chunk * chunk_size:(recv_chunk + 1) * chunk_size] = recv_data
return tensors
# NCCL 使用示例
import torch.distributed as dist
# 初始化
dist.init_process_group(backend='nccl')
# AllReduce
tensor = torch.randn(1000).cuda()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
Q3: 解释 ZeRO 优化的三个阶段?
答案要点:
ZeRO (Zero Redundancy Optimizer) 三阶段
标准数据并行的显存占用 (每个 GPU):
┌─────────────────────────────────────────────────────┐
│ 模型参数 (Φ) │ 2Φ bytes (fp16) │
│ 梯度 (G) │ 2Φ bytes (fp16) │
│ 优化器状态 (O) │ 12Φ bytes (Adam: fp32 参数+动量+方差) │
│ 总计 │ 16Φ bytes │
└─────────────────────────────────────────────────────┘
ZeRO-1: 优化器状态分片
┌─────────────────────────────────────────────────────┐
│ GPU 0: 参数 + 梯度 + 优化器状态[0:N/4] │
│ GPU 1: 参数 + 梯度 + 优化器状态[N/4:N/2] │
│ GPU 2: 参数 + 梯度 + 优化器状态[N/2:3N/4] │
│ GPU 3: 参数 + 梯度 + 优化器状态[3N/4:N] │
│ │
│ 显存: 4Φ + 12Φ/N bytes │
│ 通信量: 不变 │
└─────────────────────────────────────────────────────┘
ZeRO-2: 梯度 + 优化器状态分片
┌─────────────────────────────────────────────────────┐
│ GPU 0: 参数 + 梯度[0:N/4] + 优化器状态[0:N/4] │
│ GPU 1: 参数 + 梯度[N/4:N/2] + 优化器状态[N/4:N/2] │
│ ... │
│ │
│ 显存: 2Φ + 14Φ/N bytes │
│ 通信量: Reduce-Scatter 替代 AllReduce │
└─────────────────────────────────────────────────────┘
ZeRO-3: 参数 + 梯度 + 优化器状态分片
┌─────────────────────────────────────────────────────┐
│ GPU 0: 参数[0:N/4] + 梯度[0:N/4] + 优化器[0:N/4] │
│ GPU 1: 参数[N/4:N/2] + ... │
│ ... │
│ │
│ 显存: 16Φ/N bytes │
│ 通信量: 前向/反向时需要 AllGather 参数 │
└─────────────────────────────────────────────────────┘
"""
DeepSpeed ZeRO 配置示例
"""
# ZeRO Stage 1 配置
zero_stage1_config = {
"zero_optimization": {
"stage": 1,
"reduce_bucket_size": 5e8,
"allgather_bucket_size": 5e8
}
}
# ZeRO Stage 2 配置
zero_stage2_config = {
"zero_optimization": {
"stage": 2,
"contiguous_gradients": True,
"overlap_comm": True,
"reduce_scatter": True,
"reduce_bucket_size": 5e8,
"allgather_bucket_size": 5e8
}
}
# ZeRO Stage 3 配置
zero_stage3_config = {
"zero_optimization": {
"stage": 3,
"contiguous_gradients": True,
"stage3_prefetch_bucket_size": 5e8,
"stage3_param_persistence_threshold": 1e6,
"stage3_max_live_parameters": 1e9,
"stage3_max_reuse_distance": 1e9,
"stage3_gather_16bit_weights_on_model_save": True
}
}
# 使用 DeepSpeed
import deepspeed
model_engine, optimizer, _, _ = deepspeed.initialize(
model=model,
model_parameters=model.parameters(),
config=zero_stage3_config
)
Q4: 梯度累积的作用是什么?如何正确实现?
答案要点:
"""
梯度累积 (Gradient Accumulation)
作用:
1. 在显存有限时模拟大批量训练
2. 等效批量 = 每步批量 × 累积步数
3. 减少通信频率(分布式训练)
注意事项:
1. 损失需要除以累积步数
2. 梯度同步只在累积完成时进行
3. 学习率可能需要调整
"""
def train_with_gradient_accumulation(
model, dataloader, optimizer,
accumulation_steps=4
):
"""正确的梯度累积实现"""
model.train()
optimizer.zero_grad()
for i, (inputs, labels) in enumerate(dataloader):
inputs = inputs.cuda()
labels = labels.cuda()
# 前向传播
outputs = model(inputs)
# 损失除以累积步数,保证梯度大小一致
loss = criterion(outputs, labels) / accumulation_steps
# 反向传播(梯度累积)
loss.backward()
# 每 accumulation_steps 步更新一次
if (i + 1) % accumulation_steps == 0:
# 梯度裁剪(可选)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 参数更新
optimizer.step()
optimizer.zero_grad()
"""
分布式训练中的梯度累积
"""
def train_distributed_with_accumulation(
model, dataloader, optimizer,
accumulation_steps=4
):
"""DDP + 梯度累积"""
model.train()
optimizer.zero_grad()
for i, (inputs, labels) in enumerate(dataloader):
# 判断是否是累积的最后一步
is_accumulation_step = (i + 1) % accumulation_steps != 0
# 非最后一步时禁用梯度同步
with model.no_sync() if is_accumulation_step else nullcontext():
outputs = model(inputs.cuda())
loss = criterion(outputs, labels.cuda()) / accumulation_steps
loss.backward()
# 累积完成后更新
if not is_accumulation_step:
optimizer.step()
optimizer.zero_grad()
1.2 进阶问题
Q5: 混合精度训练的原理?可能遇到什么问题?
答案要点:
混合精度训练 (Mixed Precision Training)
数值表示范围:
┌─────────────────────────────────────────────────────┐
│ FP32: 符号(1) + 指数(8) + 尾数(23) = 32位 │
│ 范围: ±3.4e38, 精度: ~7位有效数字 │
│ │
│ FP16: 符号(1) + 指数(5) + 尾数(10) = 16位 │
│ 范围: ±65504, 精度: ~3位有效数字 │
│ │
│ BF16: 符号(1) + 指数(8) + 尾数(7) = 16位 │
│ 范围: ±3.4e38, 精度: ~2位有效数字 │
└─────────────────────────────────────────────────────┘
混合精度策略:
┌─────────────────────────────────────────────────────┐
│ FP16/BF16 FP32 │
│ ├─ 前向计算 ├─ 主权重副本 │
│ ├─ 反向传播 ├─ 损失缩放 │
│ └─ 梯度存储 └─ 优化器状态 │
└─────────────────────────────────────────────────────┘
"""
混合精度训练实现
"""
# 问题 1: 下溢 (Underflow)
# FP16 最小正数约 6e-8,小于此值的梯度变为 0
# 解决方案: 损失缩放 (Loss Scaling)
scaler = torch.cuda.amp.GradScaler(init_scale=65536)
for inputs, labels in dataloader:
optimizer.zero_grad()
# 自动混合精度上下文
with torch.cuda.amp.autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
# 缩放损失并反向传播
scaler.scale(loss).backward()
# 反缩放梯度并更新
scaler.step(optimizer)
scaler.update()
# 问题 2: 上溢 (Overflow)
# FP16 最大值约 65504,大于此值变为 inf
# 解决方案: 动态损失缩放
class DynamicLossScaler:
def __init__(self, init_scale=2**16,
growth_factor=2,
backoff_factor=0.5,
growth_interval=2000):
self.scale = init_scale
self.growth_factor = growth_factor
self.backoff_factor = backoff_factor
self.growth_interval = growth_interval
self.steps_since_growth = 0
def scale_loss(self, loss):
return loss * self.scale
def update(self, has_overflow):
if has_overflow:
# 检测到溢出,降低缩放因子
self.scale *= self.backoff_factor
self.steps_since_growth = 0
else:
self.steps_since_growth += 1
if self.steps_since_growth >= self.growth_interval:
# 稳定后增大缩放因子
self.scale *= self.growth_factor
self.steps_since_growth = 0
# 问题 3: 精度敏感层
# 某些操作对精度敏感,需要保持 FP32
# 解决方案: 选择性精度
with torch.cuda.amp.autocast():
# 大部分计算使用 FP16
x = model.conv(x)
x = model.bn(x)
# 精度敏感操作保持 FP32
with torch.cuda.amp.autocast(enabled=False):
x = x.float()
x = model.layer_norm(x)
Q6: 流水线并行中的气泡问题如何解决?
答案要点:
流水线气泡问题
朴素流水线 (GPU 利用率低):
┌─────────────────────────────────────────────────────────────┐
│ GPU 0: │F0│ │ │ │ │B0│ │ │ │ │ │
│ GPU 1: │ │F0│ │ │ │ │B0│ │ │ │ │
│ GPU 2: │ │ │F0│ │ │ │ │B0│ │ │ │
│ GPU 3: │ │ │ │F0│B0│ │ │ │ │ │ │
│ └─────────────────────────────────────────────────┘ │
│ 气泡 (Bubble) = 空闲时间 │
└─────────────────────────────────────────────────────────────┘
GPipe (微批次):
┌─────────────────────────────────────────────────────────────┐
│ GPU 0: │F0│F1│F2│F3│ │ │B3│B2│B1│B0│ │
│ GPU 1: │ │F0│F1│F2│F3│ │ │B3│B2│B1│B0│ │
│ GPU 2: │ │ │F0│F1│F2│F3│ │ │B3│B2│B1│B0│ │
│ GPU 3: │ │ │ │F0│F1│F2│F3│B3│B2│B1│B0│ │
│ └─────────────────────────────────────────────────┘ │
│ 气泡减少,但仍存在 │
└─────────────────────────────────────────────────────────────┘
1F1B (One Forward One Backward):
┌─────────────────────────────────────────────────────────────┐
│ GPU 0: │F0│F1│F2│F3│B0│B1│B2│B3│ │
│ GPU 1: │ │F0│F1│F2│B0│F3│B1│B2│B3│ │
│ GPU 2: │ │ │F0│F1│B0│F2│B1│F3│B2│B3│ │
│ GPU 3: │ │ │ │F0│B0│F1│B1│F2│B2│F3│B3│ │
│ └─────────────────────────────────────────────────┘ │
│ 气泡最小化,显存稳定 │
└─────────────────────────────────────────────────────────────┘
"""
1F1B 调度实现
"""
class PipelineScheduler:
"""1F1B 流水线调度器"""
def __init__(self, num_stages, num_microbatches):
self.num_stages = num_stages
self.num_microbatches = num_microbatches
def generate_schedule(self, stage_id):
"""生成指定 stage 的调度"""
schedule = []
# Warmup: 前向传播填充流水线
warmup_steps = self.num_stages - stage_id - 1
for i in range(warmup_steps):
schedule.append(('forward', i))
# Steady state: 1F1B
steady_steps = self.num_microbatches - warmup_steps
for i in range(steady_steps):
schedule.append(('forward', warmup_steps + i))
schedule.append(('backward', i))
# Cooldown: 反向传播清空流水线
for i in range(warmup_steps):
schedule.append(('backward', steady_steps + i))
return schedule
class PipelineStage(nn.Module):
"""流水线阶段"""
def __init__(self, stage_id, num_stages, layers):
super().__init__()
self.stage_id = stage_id
self.num_stages = num_stages
self.layers = nn.Sequential(*layers)
# 激活值缓存 (用于反向传播)
self.activations = {}
def forward_step(self, micro_batch_id, x):
"""前向传播一个微批次"""
if self.stage_id > 0:
# 从上一阶段接收
x = self.recv_forward()
# 保存输入用于反向
self.activations[micro_batch_id] = x.detach().requires_grad_()
# 计算
output = self.layers(self.activations[micro_batch_id])
if self.stage_id < self.num_stages - 1:
# 发送到下一阶段
self.send_forward(output)
return output
def backward_step(self, micro_batch_id, grad_output=None):
"""反向传播一个微批次"""
if self.stage_id < self.num_stages - 1:
# 从下一阶段接收梯度
grad_output = self.recv_backward()
# 取出保存的激活值
activation = self.activations.pop(micro_batch_id)
# 反向传播
output = self.layers(activation)
output.backward(grad_output)
if self.stage_id > 0:
# 发送梯度到上一阶段
self.send_backward(activation.grad)
2. GPU 调度与资源管理
2.1 基础问题
Q7: 如何实现 GPU 的高效调度?常见的调度策略有哪些?
答案要点:
GPU 调度策略
1. 先来先服务 (FIFO)
┌─────────────────────────────────────────────────────┐
│ 队列: Job1 → Job2 → Job3 → Job4 │
│ 优点: 简单公平 │
│ 缺点: 可能导致小任务等待大任务 │
└─────────────────────────────────────────────────────┘
2. 最短作业优先 (SJF)
┌─────────────────────────────────────────────────────┐
│ 按预估时间排序 │
│ 优点: 平均等待时间最小 │
│ 缺点: 大任务可能饿死,需要准确的时间预估 │
└─────────────────────────────────────────────────────┘
3. 装箱调度 (Bin Packing)
┌─────────────────────────────────────────────────────┐
│ GPU 0: [Job1: 4GB] [Job2: 2GB] [空闲: 2GB] │
│ GPU 1: [Job3: 6GB] [空闲: 2GB] │
│ GPU 2: [Job4: 8GB] │
│ 优点: 资源利用率高 │
│ 缺点: 调度复杂度高 │
└─────────────────────────────────────────────────────┘
4. 公平共享调度 (Fair Share)
┌─────────────────────────────────────────────────────┐
│ 用户 A: 配额 50%, 当前使用 30% │
│ 用户 B: 配额 30%, 当前使用 40% │
│ 用户 C: 配额 20%, 当前使用 10% │
│ │
│ 下次调度优先级: C > A > B (按欠额排序) │
└─────────────────────────────────────────────────────┘
"""
GPU 调度器实现
"""
from dataclasses import dataclass
from typing import List, Dict, Optional
import heapq
@dataclass
class GPUJob:
job_id: str
gpu_memory: int # MB
gpu_count: int
priority: int
submit_time: float
estimated_duration: float
@dataclass
class GPU:
gpu_id: str
total_memory: int # MB
used_memory: int = 0
running_jobs: List[str] = None
def __post_init__(self):
self.running_jobs = []
@property
def free_memory(self):
return self.total_memory - self.used_memory
class GPUScheduler:
"""GPU 调度器"""
def __init__(self, gpus: List[GPU]):
self.gpus = {g.gpu_id: g for g in gpus}
self.pending_queue: List[GPUJob] = []
self.running_jobs: Dict[str, GPUJob] = {}
def submit(self, job: GPUJob):
"""提交任务"""
heapq.heappush(
self.pending_queue,
(-job.priority, job.submit_time, job)
)
self._try_schedule()
def _try_schedule(self):
"""尝试调度"""
scheduled = []
for _, _, job in sorted(self.pending_queue):
allocation = self._find_allocation(job)
if allocation:
self._allocate(job, allocation)
scheduled.append(job)
# 从队列移除已调度的任务
self.pending_queue = [
(p, t, j) for p, t, j in self.pending_queue
if j not in scheduled
]
heapq.heapify(self.pending_queue)
def _find_allocation(self, job: GPUJob) -> Optional[List[str]]:
"""查找可用 GPU (Best Fit 策略)"""
candidates = []
for gpu in self.gpus.values():
if gpu.free_memory >= job.gpu_memory:
candidates.append((gpu.free_memory, gpu.gpu_id))
if len(candidates) < job.gpu_count:
return None
# 选择剩余空间最小的 GPU (Best Fit)
candidates.sort()
return [gpu_id for _, gpu_id in candidates[:job.gpu_count]]
def _allocate(self, job: GPUJob, gpu_ids: List[str]):
"""分配资源"""
for gpu_id in gpu_ids:
gpu = self.gpus[gpu_id]
gpu.used_memory += job.gpu_memory
gpu.running_jobs.append(job.job_id)
self.running_jobs[job.job_id] = job
def release(self, job_id: str):
"""释放资源"""
job = self.running_jobs.pop(job_id, None)
if job:
for gpu in self.gpus.values():
if job_id in gpu.running_jobs:
gpu.running_jobs.remove(job_id)
gpu.used_memory -= job.gpu_memory
self._try_schedule()
Q8: Kubernetes 中如何管理 GPU 资源?
答案要点:
# GPU 资源管理配置
# 1. 基本 GPU 请求
apiVersion: v1
kind: Pod
metadata:
name: gpu-pod
spec:
containers:
- name: cuda-container
image: nvidia/cuda:11.0-base
resources:
limits:
nvidia.com/gpu: 2 # 请求 2 个 GPU
---
# 2. 使用特定 GPU 型号 (Node Selector)
apiVersion: v1
kind: Pod
metadata:
name: a100-pod
spec:
nodeSelector:
nvidia.com/gpu.product: "NVIDIA-A100-SXM4-80GB"
containers:
- name: training
resources:
limits:
nvidia.com/gpu: 8
---
# 3. GPU 共享 (时间片)
# 使用 NVIDIA MPS 或 vGPU
apiVersion: v1
kind: Pod
metadata:
name: shared-gpu-pod
spec:
containers:
- name: inference
resources:
limits:
nvidia.com/gpu: 1
env:
- name: CUDA_MPS_PIPE_DIRECTORY
value: /tmp/nvidia-mps
---
# 4. GPU 拓扑感知调度
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: gpu-topology-aware
value: 1000000
---
apiVersion: v1
kind: Pod
metadata:
name: topology-pod
annotations:
# 请求同一 NVLink 域的 GPU
nvidia.com/gpu-topology: "nvlink"
spec:
containers:
- name: multi-gpu
resources:
limits:
nvidia.com/gpu: 4
"""
GPU 拓扑感知调度
"""
import subprocess
import xml.etree.ElementTree as ET
def get_gpu_topology():
"""获取 GPU 拓扑信息"""
result = subprocess.run(
['nvidia-smi', 'topo', '-m'],
capture_output=True, text=True
)
return parse_topology(result.stdout)
def parse_topology(output):
"""解析拓扑矩阵"""
# 示例输出:
# GPU0 GPU1 GPU2 GPU3
# GPU0 X NV1 NV1 NV2
# GPU1 NV1 X NV2 NV1
# GPU2 NV1 NV2 X NV1
# GPU3 NV2 NV1 NV1 X
topology = {}
lines = output.strip().split('\n')
# 解析连接类型
# NV1, NV2: NVLink 1代, 2代
# PHB: PCIe 同一 Host Bridge
# SYS: 跨 NUMA 节点
return topology
def select_gpus_by_topology(num_gpus, prefer_nvlink=True):
"""根据拓扑选择 GPU"""
topology = get_gpu_topology()
if prefer_nvlink:
# 优先选择 NVLink 连接的 GPU
nvlink_groups = find_nvlink_groups(topology)
for group in nvlink_groups:
if len(group) >= num_gpus:
return group[:num_gpus]
# 回退到任意可用 GPU
return list(range(num_gpus))
2.2 进阶问题
Q9: 如何实现 GPU 显存的动态管理和碎片整理?
答案要点:
"""
GPU 显存管理
"""
import torch
from typing import Dict, List
class GPUMemoryManager:
"""GPU 显存管理器"""
def __init__(self, device_id: int = 0):
self.device = torch.device(f'cuda:{device_id}')
self.allocated_tensors: Dict[int, torch.Tensor] = {}
self.memory_pool: List[torch.Tensor] = []
def get_memory_info(self):
"""获取显存信息"""
allocated = torch.cuda.memory_allocated(self.device)
reserved = torch.cuda.memory_reserved(self.device)
max_memory = torch.cuda.max_memory_allocated(self.device)
return {
'allocated': allocated / 1024**3, # GB
'reserved': reserved / 1024**3,
'max_allocated': max_memory / 1024**3,
'fragmentation': (reserved - allocated) / reserved if reserved > 0 else 0
}
def defragment(self):
"""显存碎片整理"""
# 方法 1: 清空缓存
torch.cuda.empty_cache()
# 方法 2: 重新分配张量 (高级)
# 将所有张量复制到 CPU,清空 GPU,再复制回来
cpu_copies = {}
for tensor_id, tensor in self.allocated_tensors.items():
cpu_copies[tensor_id] = tensor.cpu()
# 清空 GPU
self.allocated_tensors.clear()
torch.cuda.empty_cache()
# 重新分配(连续)
for tensor_id, cpu_tensor in cpu_copies.items():
self.allocated_tensors[tensor_id] = cpu_tensor.to(self.device)
def allocate_from_pool(self, shape, dtype=torch.float32):
"""从池中分配(减少碎片)"""
required_size = torch.tensor(shape).prod().item() * dtype.itemsize
# 查找合适的预分配块
for i, tensor in enumerate(self.memory_pool):
if tensor.numel() * tensor.element_size() >= required_size:
# 复用现有块
block = self.memory_pool.pop(i)
return block[:shape[0]].view(shape)
# 分配新块
return torch.empty(shape, dtype=dtype, device=self.device)
def return_to_pool(self, tensor: torch.Tensor):
"""归还到池中"""
self.memory_pool.append(tensor)
# 显存优化技术
def gradient_checkpointing_example():
"""梯度检查点:用计算换显存"""
from torch.utils.checkpoint import checkpoint
class CheckpointedModel(nn.Module):
def __init__(self):
super().__init__()
self.layers = nn.ModuleList([
nn.Linear(1024, 1024) for _ in range(100)
])
def forward(self, x):
for i, layer in enumerate(self.layers):
if i % 10 == 0:
# 每 10 层设置检查点
x = checkpoint(self._forward_segment, x, i)
else:
x = layer(x)
return x
def _forward_segment(self, x, start_idx):
for j in range(10):
if start_idx + j < len(self.layers):
x = self.layers[start_idx + j](x)
return x
def offload_to_cpu():
"""CPU Offload:将部分参数/优化器状态放到 CPU"""
# DeepSpeed ZeRO-Offload
config = {
"zero_optimization": {
"stage": 2,
"offload_optimizer": {
"device": "cpu",
"pin_memory": True
},
"offload_param": {
"device": "cpu",
"pin_memory": True
}
}
}
3. 模型服务与推理
3.1 基础问题
Q10: 模型推理服务的关键性能指标有哪些?如何优化?
答案要点:
推理性能指标
1. 延迟 (Latency)
┌─────────────────────────────────────────────────────┐
│ P50: 50% 请求的响应时间 │
│ P95: 95% 请求的响应时间 │
│ P99: 99% 请求的响应时间 │
│ 目标: P99 < SLA 要求 │
└─────────────────────────────────────────────────────┘
2. 吞吐量 (Throughput)
┌─────────────────────────────────────────────────────┐
│ QPS: 每秒查询数 │
│ Tokens/s: 每秒生成 token 数 (LLM) │
│ 目标: 满足业务峰值需求 │
└─────────────────────────────────────────────────────┘
3. 资源利用率
┌─────────────────────────────────────────────────────┐
│ GPU 利用率: 目标 > 80% │
│ 显存利用率: 目标 > 70% │
│ CPU 利用率: 目标 50-70% │
└─────────────────────────────────────────────────────┘
"""
推理优化技术
"""
# 1. 模型量化
import torch.quantization as quant
def quantize_model(model):
"""动态量化 (推理时量化)"""
quantized_model = quant.quantize_dynamic(
model,
{torch.nn.Linear, torch.nn.LSTM},
dtype=torch.qint8
)
return quantized_model
def static_quantize(model, calibration_data):
"""静态量化 (需要校准数据)"""
model.eval()
# 准备量化
model.qconfig = quant.get_default_qconfig('fbgemm')
quant.prepare(model, inplace=True)
# 校准
with torch.no_grad():
for data in calibration_data:
model(data)
# 转换
quant.convert(model, inplace=True)
return model
# 2. 批处理优化
class DynamicBatcher:
"""动态批处理"""
def __init__(self, model, max_batch_size=32,
max_wait_time=0.01):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_time = max_wait_time
self.pending_requests = []
self.lock = threading.Lock()
def add_request(self, input_data):
"""添加请求"""
future = asyncio.Future()
with self.lock:
self.pending_requests.append((input_data, future))
if len(self.pending_requests) >= self.max_batch_size:
self._process_batch()
return future
def _process_batch(self):
"""处理批次"""
if not self.pending_requests:
return
batch_inputs = [r[0] for r in self.pending_requests]
futures = [r[1] for r in self.pending_requests]
self.pending_requests = []
# 批量推理
batch_tensor = torch.stack(batch_inputs)
with torch.no_grad():
outputs = self.model(batch_tensor)
# 分发结果
for i, future in enumerate(futures):
future.set_result(outputs[i])
# 3. KV Cache 优化 (LLM)
class KVCache:
"""Key-Value 缓存管理"""
def __init__(self, num_layers, num_heads,
head_dim, max_seq_len):
self.num_layers = num_layers
self.cache = {}
def get_or_create(self, batch_id, device):
"""获取或创建缓存"""
if batch_id not in self.cache:
self.cache[batch_id] = [
(
torch.zeros(1, self.num_heads, 0, self.head_dim, device=device),
torch.zeros(1, self.num_heads, 0, self.head_dim, device=device)
)
for _ in range(self.num_layers)
]
return self.cache[batch_id]
def update(self, batch_id, layer_idx, new_k, new_v):
"""更新缓存"""
k, v = self.cache[batch_id][layer_idx]
self.cache[batch_id][layer_idx] = (
torch.cat([k, new_k], dim=2),
torch.cat([v, new_v], dim=2)
)
# 4. Continuous Batching (LLM)
class ContinuousBatcher:
"""连续批处理 (vLLM 风格)"""
def __init__(self, model, max_batch_tokens=4096):
self.model = model
self.max_batch_tokens = max_batch_tokens
self.running_requests = []
self.waiting_requests = []
def step(self):
"""执行一步推理"""
# 1. 检查已完成的请求
completed = []
for req in self.running_requests:
if req.is_finished():
completed.append(req)
for req in completed:
self.running_requests.remove(req)
# 2. 添加新请求
current_tokens = sum(r.current_length for r in self.running_requests)
while self.waiting_requests and \
current_tokens < self.max_batch_tokens:
req = self.waiting_requests.pop(0)
self.running_requests.append(req)
current_tokens += req.current_length
# 3. 批量前向
if self.running_requests:
self._forward_batch()
Q11: 如何设计高可用的模型服务架构?
答案要点:
高可用推理服务架构
┌─────────────────────────────────────────────────────────────────────┐
│ 负载均衡层 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ L4/L7 LB │ 健康检查 │ 会话保持 │ 限流 │ │
│ └────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┴────────────────────────────────────┐ │
│ │ API 网关 │ │
│ │ 认证 │ 路由 │ 限流 │ 监控 │ 协议转换 │ │
│ └────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────┴────────────────────────────────────┐ │
│ │ 推理服务集群 │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ │
│ │ │┌───────┐│ │┌───────┐│ │┌───────┐│ │ │
│ │ ││Triton ││ ││Triton ││ ││Triton ││ │ │
│ │ │└───────┘│ │└───────┘│ │└───────┘│ │ │
│ │ │ GPU×4 │ │ GPU×4 │ │ GPU×4 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 模型仓库 (共享存储) │ │
│ │ Model Registry │ Version Control │ A/B Testing │ │
│ └──────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
"""
高可用设计实现
"""
# 1. 健康检查
from fastapi import FastAPI, HTTPException
import asyncio
app = FastAPI()
class HealthChecker:
def __init__(self, model):
self.model = model
self.is_healthy = True
self.last_check = 0
async def check(self):
"""执行健康检查"""
try:
# 模型推理测试
test_input = torch.randn(1, 3, 224, 224).cuda()
with torch.no_grad():
_ = self.model(test_input)
# GPU 状态检查
memory_used = torch.cuda.memory_allocated()
memory_total = torch.cuda.get_device_properties(0).total_memory
if memory_used / memory_total > 0.95:
return False, "GPU memory critical"
self.is_healthy = True
return True, "OK"
except Exception as e:
self.is_healthy = False
return False, str(e)
@app.get("/health")
async def health_check():
healthy, message = await health_checker.check()
if not healthy:
raise HTTPException(status_code=503, detail=message)
return {"status": "healthy"}
@app.get("/ready")
async def readiness_check():
if not model_loaded:
raise HTTPException(status_code=503, detail="Model not loaded")
return {"status": "ready"}
# 2. 优雅关闭
import signal
class GracefulShutdown:
def __init__(self):
self.shutdown = False
signal.signal(signal.SIGTERM, self._handle_sigterm)
def _handle_sigterm(self, signum, frame):
print("Received SIGTERM, initiating graceful shutdown...")
self.shutdown = True
async def wait_for_requests(self, timeout=30):
"""等待现有请求完成"""
start = time.time()
while active_requests > 0 and time.time() - start < timeout:
await asyncio.sleep(0.1)
# 3. 自动扩缩容指标
class AutoscalingMetrics:
"""HPA 自定义指标"""
def __init__(self):
self.request_count = 0
self.latency_sum = 0
self.gpu_utilization = 0
def record_request(self, latency):
self.request_count += 1
self.latency_sum += latency
def get_metrics(self):
return {
"qps": self.request_count / 60, # 过去 60 秒
"avg_latency": self.latency_sum / max(self.request_count, 1),
"gpu_utilization": self.gpu_utilization
}
# 4. 模型版本管理
class ModelVersionManager:
"""模型版本管理"""
def __init__(self, model_repo: str):
self.model_repo = model_repo
self.loaded_versions = {}
self.active_version = None
async def load_version(self, version: str):
"""加载指定版本"""
model_path = f"{self.model_repo}/{version}/model.pt"
model = torch.jit.load(model_path)
model.cuda()
model.eval()
self.loaded_versions[version] = model
async def switch_version(self, version: str):
"""切换活跃版本"""
if version not in self.loaded_versions:
await self.load_version(version)
self.active_version = version
async def canary_rollout(self, new_version: str,
traffic_percent: float = 10):
"""金丝雀发布"""
await self.load_version(new_version)
self.canary_version = new_version
self.canary_traffic = traffic_percent
def get_model_for_request(self):
"""获取处理请求的模型"""
if hasattr(self, 'canary_version') and \
random.random() * 100 < self.canary_traffic:
return self.loaded_versions[self.canary_version]
return self.loaded_versions[self.active_version]
4. 存储系统
4.1 基础问题
Q12: AI 训练场景对存储系统有什么特殊要求?
答案要点:
AI 存储需求分析
训练数据特征:
┌─────────────────────────────────────────────────────────────────┐
│ 阶段 │ I/O 模式 │ 吞吐需求 │ 延迟要求 │
├─────────────────────────────────────────────────────────────────┤
│ 数据加载 │ 大量顺序读 │ 高 (GB/s) │ 低敏感 │
│ Checkpoint │ 大块顺序写 │ 中 (GB/s) │ 中等 │
│ 日志/指标 │ 小块随机写 │ 低 │ 低敏感 │
│ 模型保存 │ 大块顺序写 │ 中 │ 低敏感 │
└─────────────────────────────────────────────────────────────────┘
存储系统选型:
┌─────────────────────────────────────────────────────────────────┐
│ 场景 │ 推荐存储 │ 原因 │
├─────────────────────────────────────────────────────────────────┤
│ 训练数据集 │ 分布式文件系统 │ 高吞吐,共享访问 │
│ (HDFS/Lustre) │ │ │
├─────────────────────────────────────────────────────────────────┤
│ 模型 Checkpoint │ 并行文件系统 │ 低延迟,高 IOPS │
│ (GPFS/BeeGFS) │ │ │
├─────────────────────────────────────────────────────────────────┤
│ 模型仓库 │ 对象存储 │ 版本管理,成本效益 │
│ (S3/MinIO) │ │ │
├─────────────────────────────────────────────────────────────────┤
│ 特征存储 │ 分布式 KV │ 低延迟查询 │
│ (Redis/Cassandra) │ │ │
└─────────────────────────────────────────────────────────────────┘
"""
数据加载优化
"""
# 1. 多进程数据加载
from torch.utils.data import DataLoader
def create_optimized_dataloader(dataset, batch_size=32):
return DataLoader(
dataset,
batch_size=batch_size,
shuffle=True,
num_workers=8, # 多进程加载
pin_memory=True, # 固定内存,加速 GPU 传输
prefetch_factor=2, # 预取因子
persistent_workers=True # 持久化 worker
)
# 2. 内存映射数据集
import numpy as np
class MemmapDataset:
"""内存映射数据集(适合大数据)"""
def __init__(self, data_path, shape, dtype=np.float32):
self.data = np.memmap(
data_path,
dtype=dtype,
mode='r',
shape=shape
)
def __getitem__(self, idx):
return torch.from_numpy(self.data[idx].copy())
def __len__(self):
return len(self.data)
# 3. 分布式数据加载
from torch.utils.data.distributed import DistributedSampler
def create_distributed_dataloader(dataset, world_size, rank):
sampler = DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank,
shuffle=True
)
return DataLoader(
dataset,
batch_size=32,
sampler=sampler,
num_workers=4,
pin_memory=True
)
# 4. 数据预处理流水线
class AsyncDataPipeline:
"""异步数据流水线"""
def __init__(self, data_source, buffer_size=100):
self.data_source = data_source
self.buffer = queue.Queue(maxsize=buffer_size)
self.stop_event = threading.Event()
def start(self):
"""启动后台加载线程"""
self.loader_thread = threading.Thread(target=self._load_data)
self.loader_thread.start()
def _load_data(self):
"""后台加载数据"""
for item in self.data_source:
if self.stop_event.is_set():
break
# 预处理
processed = self._preprocess(item)
self.buffer.put(processed)
def _preprocess(self, item):
"""CPU 预处理"""
# 解码、增强、归一化等
return item
def get_batch(self, batch_size):
"""获取批次"""
batch = []
for _ in range(batch_size):
batch.append(self.buffer.get())
return torch.stack(batch)
Q13: 如何优化 Checkpoint 保存和恢复的性能?
答案要点:
"""
Checkpoint 优化策略
"""
# 1. 异步 Checkpoint
class AsyncCheckpointer:
"""异步 Checkpoint 保存"""
def __init__(self, save_dir, max_concurrent=2):
self.save_dir = save_dir
self.executor = ThreadPoolExecutor(max_workers=max_concurrent)
self.pending_saves = []
def save_async(self, state_dict, filename):
"""异步保存"""
# 先复制到 CPU (非阻塞)
cpu_state = {
k: v.cpu().clone() if torch.is_tensor(v) else v
for k, v in state_dict.items()
}
# 异步写入
future = self.executor.submit(
self._save_to_disk, cpu_state, filename
)
self.pending_saves.append(future)
# 清理已完成的
self.pending_saves = [f for f in self.pending_saves if not f.done()]
def _save_to_disk(self, state_dict, filename):
path = os.path.join(self.save_dir, filename)
torch.save(state_dict, path)
def wait_all(self):
"""等待所有保存完成"""
for future in self.pending_saves:
future.result()
# 2. 增量 Checkpoint
class IncrementalCheckpointer:
"""增量 Checkpoint"""
def __init__(self, base_dir):
self.base_dir = base_dir
self.last_state = {}
def save_incremental(self, state_dict, step):
"""保存增量"""
changed_keys = []
for key, value in state_dict.items():
if key not in self.last_state:
changed_keys.append(key)
elif not torch.equal(value, self.last_state[key]):
changed_keys.append(key)
# 只保存变化的部分
incremental_state = {k: state_dict[k] for k in changed_keys}
path = os.path.join(self.base_dir, f'incr_{step}.pt')
torch.save({
'changed_keys': changed_keys,
'state': incremental_state
}, path)
self.last_state = {k: v.clone() for k, v in state_dict.items()}
def load_incremental(self, base_step, target_step):
"""加载增量"""
# 加载基础状态
state = torch.load(f'{self.base_dir}/full_{base_step}.pt')
# 应用增量
for step in range(base_step + 1, target_step + 1):
incr = torch.load(f'{self.base_dir}/incr_{step}.pt')
for key in incr['changed_keys']:
state[key] = incr['state'][key]
return state
# 3. 分布式 Checkpoint
class DistributedCheckpointer:
"""分布式 Checkpoint (每个 rank 保存自己的部分)"""
def __init__(self, save_dir, rank, world_size):
self.save_dir = save_dir
self.rank = rank
self.world_size = world_size
def save_sharded(self, model, optimizer, step):
"""分片保存"""
# 每个 rank 保存自己的分片
shard = {
'model': self._get_local_state(model),
'optimizer': optimizer.state_dict(),
'step': step
}
path = f'{self.save_dir}/checkpoint_{step}_rank{self.rank}.pt'
torch.save(shard, path)
# 同步确保所有 rank 完成
dist.barrier()
def load_sharded(self, model, optimizer, step):
"""分片加载"""
# 每个 rank 加载自己的分片
path = f'{self.save_dir}/checkpoint_{step}_rank{self.rank}.pt'
shard = torch.load(path, map_location=f'cuda:{self.rank}')
self._load_local_state(model, shard['model'])
optimizer.load_state_dict(shard['optimizer'])
dist.barrier()
return shard['step']
# 4. Checkpoint 压缩
import zlib
class CompressedCheckpointer:
"""压缩 Checkpoint"""
def save_compressed(self, state_dict, path):
"""压缩保存"""
import io
buffer = io.BytesIO()
torch.save(state_dict, buffer)
compressed = zlib.compress(buffer.getvalue(), level=6)
with open(path, 'wb') as f:
f.write(compressed)
def load_compressed(self, path):
"""加载压缩文件"""
import io
with open(path, 'rb') as f:
compressed = f.read()
decompressed = zlib.decompress(compressed)
buffer = io.BytesIO(decompressed)
return torch.load(buffer)
5. 系统设计题
5.1 设计一个模型训练平台
题目: 设计一个支持多租户的 GPU 训练平台,支持资源隔离、任务调度、监控告警。
答案框架:
系统架构
┌─────────────────────────────────────────────────────────────────────┐
│ 用户层 │
│ Web UI │ CLI │ SDK │ REST API │
└───────────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────────────┴─────────────────────────────────────┐
│ 平台服务层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 任务管理 │ │ 资源管理 │ │ 用户管理 │ │ 监控告警 │ │
│ │ │ │ │ │ │ │ │ │
│ │ - 任务提交 │ │ - GPU 调度 │ │ - 认证授权 │ │ - 指标采集 │ │
│ │ - 生命周期 │ │ - 配额管理 │ │ - 租户隔离 │ │ - 日志聚合 │ │
│ │ - 日志查看 │ │ - 优先级 │ │ - 计费统计 │ │ - 异常告警 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└───────────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────────────┴─────────────────────────────────────┐
│ 编排调度层 │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Kubernetes │ │
│ │ - GPU Device Plugin │ │
│ │ - Custom Scheduler │ │
│ │ - Training Operator (PyTorchJob/TFJob) │ │
│ │ - Network Policy (租户隔离) │ │
│ └─────────────────────────────────────────────────────────────┘ │
└───────────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────────────────┴─────────────────────────────────────┐
│ 基础设施层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ GPU 集群 │ │ 存储系统 │ │ 网络 │ │ 镜像仓库 │ │
│ │ A100/H100 │ │ NFS/Lustre │ │ RDMA/RoCE │ │ Harbor │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
"""
核心组件设计
"""
# 1. 任务定义
@dataclass
class TrainingJob:
job_id: str
user_id: str
tenant_id: str
name: str
image: str
command: List[str]
# 资源需求
gpu_count: int
gpu_type: str # A100, V100, etc.
cpu: int
memory: str # 64Gi
# 分布式配置
world_size: int = 1
distributed_backend: str = 'nccl'
# 优先级和调度
priority: int = 0
preemptible: bool = False
max_runtime: int = 86400 # 秒
# 存储挂载
volumes: List[Dict] = None
# 2. 资源配额
@dataclass
class ResourceQuota:
tenant_id: str
gpu_limit: int
gpu_used: int
cpu_limit: int
memory_limit: str
storage_limit: str
def can_allocate(self, job: TrainingJob) -> bool:
return (self.gpu_used + job.gpu_count) <= self.gpu_limit
# 3. 调度器接口
class TrainingScheduler:
def schedule(self, job: TrainingJob) -> Optional[str]:
"""调度任务,返回节点分配"""
# 1. 检查配额
quota = self.get_quota(job.tenant_id)
if not quota.can_allocate(job):
return None
# 2. 查找可用资源
candidates = self.find_available_nodes(job)
if not candidates:
return None
# 3. 选择最优节点 (考虑拓扑)
node = self.select_best_node(candidates, job)
# 4. 分配资源
self.allocate(node, job)
return node
# 4. 监控系统
class TrainingMonitor:
def collect_metrics(self, job_id: str) -> Dict:
return {
'gpu_utilization': self.get_gpu_util(job_id),
'gpu_memory': self.get_gpu_memory(job_id),
'throughput': self.get_throughput(job_id),
'loss': self.get_loss(job_id),
'learning_rate': self.get_lr(job_id)
}
def check_anomaly(self, metrics: Dict) -> List[str]:
anomalies = []
if metrics['gpu_utilization'] < 10:
anomalies.append('GPU 利用率过低')
if metrics['loss'] > 100:
anomalies.append('Loss 异常')
return anomalies
总结
本章覆盖了 AI 基础设施的核心面试题:
- 分布式训练:数据/模型并行、AllReduce、ZeRO、混合精度
- GPU 调度:调度策略、K8s GPU 管理、显存优化
- 模型服务:推理优化、高可用架构、动态批处理
- 存储系统:数据加载、Checkpoint 优化
- 系统设计:训练平台架构
关键要点:
- 理解底层原理,不只是会用框架
- 关注性能优化和资源效率
- 具备系统设计和问题分析能力
- 实践经验和故障处理能力
下一章将探讨大模型相关的面试题。