HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

01-AI基础设施核心面试题

概述

本章汇总 AI 基础设施领域的核心面试题,涵盖分布式训练、GPU 调度、模型服务、存储系统等关键主题,帮助读者全面准备技术面试。

1. 分布式训练

1.1 基础概念

Q1: 解释数据并行和模型并行的区别?

数据并行 vs 模型并行

┌─────────────────────────────────────────────────────────────────────┐
│                        数据并行                                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│     数据批次                                                         │
│    ┌─────────────────────────────────────────┐                     │
│    │  Batch 0  │  Batch 1  │  Batch 2  │ ... │                     │
│    └─────┬─────┴─────┬─────┴─────┬─────┴─────┘                     │
│          │           │           │                                  │
│          ▼           ▼           ▼                                  │
│     ┌─────────┐ ┌─────────┐ ┌─────────┐                            │
│     │  GPU 0  │ │  GPU 1  │ │  GPU 2  │  每个 GPU 有完整模型        │
│     │ Model   │ │ Model   │ │ Model   │                            │
│     │ Copy    │ │ Copy    │ │ Copy    │                            │
│     └────┬────┘ └────┬────┘ └────┬────┘                            │
│          │           │           │                                  │
│          └───────────┼───────────┘                                  │
│                      ▼                                              │
│              梯度聚合 (AllReduce)                                    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                        模型并行                                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│     完整模型                                                         │
│    ┌──────────────────────────────────────────┐                    │
│    │  Layer 0-3  │  Layer 4-7  │  Layer 8-11  │                    │
│    └──────┬──────┴──────┬──────┴──────┬───────┘                    │
│           │             │             │                             │
│           ▼             ▼             ▼                             │
│     ┌─────────┐   ┌─────────┐   ┌─────────┐                        │
│     │  GPU 0  │──▶│  GPU 1  │──▶│  GPU 2  │  每个 GPU 有部分模型    │
│     │Layer0-3 │   │Layer4-7 │   │Layer8-11│                        │
│     └─────────┘   └─────────┘   └─────────┘                        │
│                                                                     │
│     激活值在 GPU 之间传递 (Pipeline)                                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

答案要点:

"""
数据并行 (Data Parallelism)
"""
# 特点:
# 1. 每个 GPU 持有完整的模型副本
# 2. 训练数据被分割到不同 GPU
# 3. 前向和反向传播独立进行
# 4. 通过 AllReduce 同步梯度

# 适用场景:
# - 模型能放入单个 GPU 显存
# - 需要加速训练速度
# - 批量大小较大

# PyTorch 实现
import torch.nn.parallel as parallel

# DDP (推荐)
model = parallel.DistributedDataParallel(model, device_ids=[local_rank])

# DP (简单但效率较低)
model = parallel.DataParallel(model)

"""
模型并行 (Model Parallelism)
"""
# 特点:
# 1. 模型被切分到不同 GPU
# 2. 每个 GPU 只持有部分层
# 3. 激活值在 GPU 之间传递
# 4. 存在流水线气泡问题

# 适用场景:
# - 单个模型无法放入一个 GPU
# - 大型语言模型训练
# - 内存受限场景

# 手动实现示例
class ModelParallel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(1000, 1000).to('cuda:0')
        self.layer2 = nn.Linear(1000, 1000).to('cuda:1')

    def forward(self, x):
        x = self.layer1(x.to('cuda:0'))
        x = self.layer2(x.to('cuda:1'))
        return x

Q2: AllReduce 操作的原理是什么?常见实现算法有哪些?

答案要点:

AllReduce 操作流程

初始状态:每个 GPU 有局部梯度
GPU 0: [g0]    GPU 1: [g1]    GPU 2: [g2]    GPU 3: [g3]

AllReduce 后:每个 GPU 有全局聚合梯度
GPU 0: [Σg]   GPU 1: [Σg]   GPU 2: [Σg]   GPU 3: [Σg]

其中 Σg = g0 + g1 + g2 + g3


常见算法:

1. Ring AllReduce
┌───────────────────────────────────────────────────┐
│                                                   │
│    GPU 0 ──────▶ GPU 1 ──────▶ GPU 2             │
│      ▲                           │                │
│      │                           ▼                │
│      └─────────── GPU 3 ◀────────┘                │
│                                                   │
│  优点:带宽利用率高,O(N) 通信量                    │
│  缺点:延迟随节点数线性增长                         │
└───────────────────────────────────────────────────┘

2. Tree AllReduce
┌───────────────────────────────────────────────────┐
│                                                   │
│              Root (GPU 0)                         │
│              /         \                          │
│          GPU 1        GPU 2                       │
│          /   \        /   \                       │
│       GPU 3  GPU 4  GPU 5  GPU 6                 │
│                                                   │
│  优点:延迟 O(logN)                                │
│  缺点:根节点带宽瓶颈                              │
└───────────────────────────────────────────────────┘

3. Recursive Halving-Doubling
┌───────────────────────────────────────────────────┐
│  Step 1: 距离 1 的节点交换                         │
│  Step 2: 距离 2 的节点交换                         │
│  Step 3: 距离 4 的节点交换                         │
│  ...                                              │
│                                                   │
│  优点:适合小消息,延迟低                          │
│  缺点:大消息时带宽利用率不高                       │
└───────────────────────────────────────────────────┘
"""
Ring AllReduce 简化实现
"""
def ring_allreduce(tensors, world_size, rank):
    """
    Ring AllReduce 算法

    分为两个阶段:
    1. Scatter-Reduce: 将数据分成 N 份,每份通过环传递并累加
    2. Allgather: 将最终结果传播到所有节点
    """
    chunk_size = len(tensors) // world_size

    # Phase 1: Scatter-Reduce
    for step in range(world_size - 1):
        send_chunk = (rank - step) % world_size
        recv_chunk = (rank - step - 1) % world_size

        # 发送和接收
        send_data = tensors[send_chunk * chunk_size:(send_chunk + 1) * chunk_size]
        recv_data = receive_from((rank + 1) % world_size)
        send_to((rank - 1) % world_size, send_data)

        # 累加
        tensors[recv_chunk * chunk_size:(recv_chunk + 1) * chunk_size] += recv_data

    # Phase 2: Allgather
    for step in range(world_size - 1):
        send_chunk = (rank - step + 1) % world_size
        recv_chunk = (rank - step) % world_size

        send_data = tensors[send_chunk * chunk_size:(send_chunk + 1) * chunk_size]
        recv_data = receive_from((rank + 1) % world_size)
        send_to((rank - 1) % world_size, send_data)

        tensors[recv_chunk * chunk_size:(recv_chunk + 1) * chunk_size] = recv_data

    return tensors

# NCCL 使用示例
import torch.distributed as dist

# 初始化
dist.init_process_group(backend='nccl')

# AllReduce
tensor = torch.randn(1000).cuda()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

Q3: 解释 ZeRO 优化的三个阶段?

答案要点:

ZeRO (Zero Redundancy Optimizer) 三阶段

标准数据并行的显存占用 (每个 GPU):
┌─────────────────────────────────────────────────────┐
│  模型参数 (Φ)      │ 2Φ bytes (fp16)               │
│  梯度 (G)          │ 2Φ bytes (fp16)               │
│  优化器状态 (O)    │ 12Φ bytes (Adam: fp32 参数+动量+方差) │
│  总计              │ 16Φ bytes                      │
└─────────────────────────────────────────────────────┘

ZeRO-1: 优化器状态分片
┌─────────────────────────────────────────────────────┐
│  GPU 0: 参数 + 梯度 + 优化器状态[0:N/4]            │
│  GPU 1: 参数 + 梯度 + 优化器状态[N/4:N/2]          │
│  GPU 2: 参数 + 梯度 + 优化器状态[N/2:3N/4]         │
│  GPU 3: 参数 + 梯度 + 优化器状态[3N/4:N]           │
│                                                     │
│  显存: 4Φ + 12Φ/N bytes                            │
│  通信量: 不变                                       │
└─────────────────────────────────────────────────────┘

ZeRO-2: 梯度 + 优化器状态分片
┌─────────────────────────────────────────────────────┐
│  GPU 0: 参数 + 梯度[0:N/4] + 优化器状态[0:N/4]     │
│  GPU 1: 参数 + 梯度[N/4:N/2] + 优化器状态[N/4:N/2] │
│  ...                                                │
│                                                     │
│  显存: 2Φ + 14Φ/N bytes                            │
│  通信量: Reduce-Scatter 替代 AllReduce              │
└─────────────────────────────────────────────────────┘

ZeRO-3: 参数 + 梯度 + 优化器状态分片
┌─────────────────────────────────────────────────────┐
│  GPU 0: 参数[0:N/4] + 梯度[0:N/4] + 优化器[0:N/4]  │
│  GPU 1: 参数[N/4:N/2] + ...                        │
│  ...                                                │
│                                                     │
│  显存: 16Φ/N bytes                                 │
│  通信量: 前向/反向时需要 AllGather 参数             │
└─────────────────────────────────────────────────────┘
"""
DeepSpeed ZeRO 配置示例
"""

# ZeRO Stage 1 配置
zero_stage1_config = {
    "zero_optimization": {
        "stage": 1,
        "reduce_bucket_size": 5e8,
        "allgather_bucket_size": 5e8
    }
}

# ZeRO Stage 2 配置
zero_stage2_config = {
    "zero_optimization": {
        "stage": 2,
        "contiguous_gradients": True,
        "overlap_comm": True,
        "reduce_scatter": True,
        "reduce_bucket_size": 5e8,
        "allgather_bucket_size": 5e8
    }
}

# ZeRO Stage 3 配置
zero_stage3_config = {
    "zero_optimization": {
        "stage": 3,
        "contiguous_gradients": True,
        "stage3_prefetch_bucket_size": 5e8,
        "stage3_param_persistence_threshold": 1e6,
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
        "stage3_gather_16bit_weights_on_model_save": True
    }
}

# 使用 DeepSpeed
import deepspeed

model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config=zero_stage3_config
)

Q4: 梯度累积的作用是什么?如何正确实现?

答案要点:

"""
梯度累积 (Gradient Accumulation)

作用:
1. 在显存有限时模拟大批量训练
2. 等效批量 = 每步批量 × 累积步数
3. 减少通信频率(分布式训练)

注意事项:
1. 损失需要除以累积步数
2. 梯度同步只在累积完成时进行
3. 学习率可能需要调整
"""

def train_with_gradient_accumulation(
    model, dataloader, optimizer,
    accumulation_steps=4
):
    """正确的梯度累积实现"""
    model.train()
    optimizer.zero_grad()

    for i, (inputs, labels) in enumerate(dataloader):
        inputs = inputs.cuda()
        labels = labels.cuda()

        # 前向传播
        outputs = model(inputs)

        # 损失除以累积步数,保证梯度大小一致
        loss = criterion(outputs, labels) / accumulation_steps

        # 反向传播(梯度累积)
        loss.backward()

        # 每 accumulation_steps 步更新一次
        if (i + 1) % accumulation_steps == 0:
            # 梯度裁剪(可选)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            # 参数更新
            optimizer.step()
            optimizer.zero_grad()


"""
分布式训练中的梯度累积
"""
def train_distributed_with_accumulation(
    model, dataloader, optimizer,
    accumulation_steps=4
):
    """DDP + 梯度累积"""
    model.train()
    optimizer.zero_grad()

    for i, (inputs, labels) in enumerate(dataloader):
        # 判断是否是累积的最后一步
        is_accumulation_step = (i + 1) % accumulation_steps != 0

        # 非最后一步时禁用梯度同步
        with model.no_sync() if is_accumulation_step else nullcontext():
            outputs = model(inputs.cuda())
            loss = criterion(outputs, labels.cuda()) / accumulation_steps
            loss.backward()

        # 累积完成后更新
        if not is_accumulation_step:
            optimizer.step()
            optimizer.zero_grad()

1.2 进阶问题

Q5: 混合精度训练的原理?可能遇到什么问题?

答案要点:

混合精度训练 (Mixed Precision Training)

数值表示范围:
┌─────────────────────────────────────────────────────┐
│  FP32: 符号(1) + 指数(8) + 尾数(23) = 32位          │
│        范围: ±3.4e38, 精度: ~7位有效数字            │
│                                                     │
│  FP16: 符号(1) + 指数(5) + 尾数(10) = 16位          │
│        范围: ±65504, 精度: ~3位有效数字             │
│                                                     │
│  BF16: 符号(1) + 指数(8) + 尾数(7) = 16位           │
│        范围: ±3.4e38, 精度: ~2位有效数字            │
└─────────────────────────────────────────────────────┘

混合精度策略:
┌─────────────────────────────────────────────────────┐
│  FP16/BF16                    FP32                  │
│  ├─ 前向计算                  ├─ 主权重副本         │
│  ├─ 反向传播                  ├─ 损失缩放           │
│  └─ 梯度存储                  └─ 优化器状态         │
└─────────────────────────────────────────────────────┘
"""
混合精度训练实现
"""

# 问题 1: 下溢 (Underflow)
# FP16 最小正数约 6e-8,小于此值的梯度变为 0

# 解决方案: 损失缩放 (Loss Scaling)
scaler = torch.cuda.amp.GradScaler(init_scale=65536)

for inputs, labels in dataloader:
    optimizer.zero_grad()

    # 自动混合精度上下文
    with torch.cuda.amp.autocast():
        outputs = model(inputs)
        loss = criterion(outputs, labels)

    # 缩放损失并反向传播
    scaler.scale(loss).backward()

    # 反缩放梯度并更新
    scaler.step(optimizer)
    scaler.update()


# 问题 2: 上溢 (Overflow)
# FP16 最大值约 65504,大于此值变为 inf

# 解决方案: 动态损失缩放
class DynamicLossScaler:
    def __init__(self, init_scale=2**16,
                 growth_factor=2,
                 backoff_factor=0.5,
                 growth_interval=2000):
        self.scale = init_scale
        self.growth_factor = growth_factor
        self.backoff_factor = backoff_factor
        self.growth_interval = growth_interval
        self.steps_since_growth = 0

    def scale_loss(self, loss):
        return loss * self.scale

    def update(self, has_overflow):
        if has_overflow:
            # 检测到溢出,降低缩放因子
            self.scale *= self.backoff_factor
            self.steps_since_growth = 0
        else:
            self.steps_since_growth += 1
            if self.steps_since_growth >= self.growth_interval:
                # 稳定后增大缩放因子
                self.scale *= self.growth_factor
                self.steps_since_growth = 0


# 问题 3: 精度敏感层
# 某些操作对精度敏感,需要保持 FP32

# 解决方案: 选择性精度
with torch.cuda.amp.autocast():
    # 大部分计算使用 FP16
    x = model.conv(x)
    x = model.bn(x)

    # 精度敏感操作保持 FP32
    with torch.cuda.amp.autocast(enabled=False):
        x = x.float()
        x = model.layer_norm(x)

Q6: 流水线并行中的气泡问题如何解决?

答案要点:

流水线气泡问题

朴素流水线 (GPU 利用率低):
┌─────────────────────────────────────────────────────────────┐
│ GPU 0: │F0│  │  │  │  │B0│  │  │  │  │                     │
│ GPU 1: │  │F0│  │  │  │  │B0│  │  │  │                     │
│ GPU 2: │  │  │F0│  │  │  │  │B0│  │  │                     │
│ GPU 3: │  │  │  │F0│B0│  │  │  │  │  │                     │
│        └─────────────────────────────────────────────────┘ │
│        气泡 (Bubble) = 空闲时间                             │
└─────────────────────────────────────────────────────────────┘

GPipe (微批次):
┌─────────────────────────────────────────────────────────────┐
│ GPU 0: │F0│F1│F2│F3│  │  │B3│B2│B1│B0│                     │
│ GPU 1: │  │F0│F1│F2│F3│  │  │B3│B2│B1│B0│                  │
│ GPU 2: │  │  │F0│F1│F2│F3│  │  │B3│B2│B1│B0│               │
│ GPU 3: │  │  │  │F0│F1│F2│F3│B3│B2│B1│B0│                  │
│        └─────────────────────────────────────────────────┘ │
│        气泡减少,但仍存在                                   │
└─────────────────────────────────────────────────────────────┘

1F1B (One Forward One Backward):
┌─────────────────────────────────────────────────────────────┐
│ GPU 0: │F0│F1│F2│F3│B0│B1│B2│B3│                           │
│ GPU 1: │  │F0│F1│F2│B0│F3│B1│B2│B3│                        │
│ GPU 2: │  │  │F0│F1│B0│F2│B1│F3│B2│B3│                     │
│ GPU 3: │  │  │  │F0│B0│F1│B1│F2│B2│F3│B3│                  │
│        └─────────────────────────────────────────────────┘ │
│        气泡最小化,显存稳定                                 │
└─────────────────────────────────────────────────────────────┘
"""
1F1B 调度实现
"""

class PipelineScheduler:
    """1F1B 流水线调度器"""

    def __init__(self, num_stages, num_microbatches):
        self.num_stages = num_stages
        self.num_microbatches = num_microbatches

    def generate_schedule(self, stage_id):
        """生成指定 stage 的调度"""
        schedule = []

        # Warmup: 前向传播填充流水线
        warmup_steps = self.num_stages - stage_id - 1
        for i in range(warmup_steps):
            schedule.append(('forward', i))

        # Steady state: 1F1B
        steady_steps = self.num_microbatches - warmup_steps
        for i in range(steady_steps):
            schedule.append(('forward', warmup_steps + i))
            schedule.append(('backward', i))

        # Cooldown: 反向传播清空流水线
        for i in range(warmup_steps):
            schedule.append(('backward', steady_steps + i))

        return schedule


class PipelineStage(nn.Module):
    """流水线阶段"""

    def __init__(self, stage_id, num_stages, layers):
        super().__init__()
        self.stage_id = stage_id
        self.num_stages = num_stages
        self.layers = nn.Sequential(*layers)

        # 激活值缓存 (用于反向传播)
        self.activations = {}

    def forward_step(self, micro_batch_id, x):
        """前向传播一个微批次"""
        if self.stage_id > 0:
            # 从上一阶段接收
            x = self.recv_forward()

        # 保存输入用于反向
        self.activations[micro_batch_id] = x.detach().requires_grad_()

        # 计算
        output = self.layers(self.activations[micro_batch_id])

        if self.stage_id < self.num_stages - 1:
            # 发送到下一阶段
            self.send_forward(output)

        return output

    def backward_step(self, micro_batch_id, grad_output=None):
        """反向传播一个微批次"""
        if self.stage_id < self.num_stages - 1:
            # 从下一阶段接收梯度
            grad_output = self.recv_backward()

        # 取出保存的激活值
        activation = self.activations.pop(micro_batch_id)

        # 反向传播
        output = self.layers(activation)
        output.backward(grad_output)

        if self.stage_id > 0:
            # 发送梯度到上一阶段
            self.send_backward(activation.grad)

2. GPU 调度与资源管理

2.1 基础问题

Q7: 如何实现 GPU 的高效调度?常见的调度策略有哪些?

答案要点:

GPU 调度策略

1. 先来先服务 (FIFO)
┌─────────────────────────────────────────────────────┐
│  队列: Job1 → Job2 → Job3 → Job4                   │
│  优点: 简单公平                                     │
│  缺点: 可能导致小任务等待大任务                     │
└─────────────────────────────────────────────────────┘

2. 最短作业优先 (SJF)
┌─────────────────────────────────────────────────────┐
│  按预估时间排序                                     │
│  优点: 平均等待时间最小                            │
│  缺点: 大任务可能饿死,需要准确的时间预估          │
└─────────────────────────────────────────────────────┘

3. 装箱调度 (Bin Packing)
┌─────────────────────────────────────────────────────┐
│  GPU 0: [Job1: 4GB] [Job2: 2GB] [空闲: 2GB]        │
│  GPU 1: [Job3: 6GB] [空闲: 2GB]                    │
│  GPU 2: [Job4: 8GB]                                 │
│  优点: 资源利用率高                                │
│  缺点: 调度复杂度高                                │
└─────────────────────────────────────────────────────┘

4. 公平共享调度 (Fair Share)
┌─────────────────────────────────────────────────────┐
│  用户 A: 配额 50%, 当前使用 30%                     │
│  用户 B: 配额 30%, 当前使用 40%                     │
│  用户 C: 配额 20%, 当前使用 10%                     │
│                                                     │
│  下次调度优先级: C > A > B (按欠额排序)            │
└─────────────────────────────────────────────────────┘
"""
GPU 调度器实现
"""
from dataclasses import dataclass
from typing import List, Dict, Optional
import heapq

@dataclass
class GPUJob:
    job_id: str
    gpu_memory: int  # MB
    gpu_count: int
    priority: int
    submit_time: float
    estimated_duration: float

@dataclass
class GPU:
    gpu_id: str
    total_memory: int  # MB
    used_memory: int = 0
    running_jobs: List[str] = None

    def __post_init__(self):
        self.running_jobs = []

    @property
    def free_memory(self):
        return self.total_memory - self.used_memory


class GPUScheduler:
    """GPU 调度器"""

    def __init__(self, gpus: List[GPU]):
        self.gpus = {g.gpu_id: g for g in gpus}
        self.pending_queue: List[GPUJob] = []
        self.running_jobs: Dict[str, GPUJob] = {}

    def submit(self, job: GPUJob):
        """提交任务"""
        heapq.heappush(
            self.pending_queue,
            (-job.priority, job.submit_time, job)
        )
        self._try_schedule()

    def _try_schedule(self):
        """尝试调度"""
        scheduled = []

        for _, _, job in sorted(self.pending_queue):
            allocation = self._find_allocation(job)
            if allocation:
                self._allocate(job, allocation)
                scheduled.append(job)

        # 从队列移除已调度的任务
        self.pending_queue = [
            (p, t, j) for p, t, j in self.pending_queue
            if j not in scheduled
        ]
        heapq.heapify(self.pending_queue)

    def _find_allocation(self, job: GPUJob) -> Optional[List[str]]:
        """查找可用 GPU (Best Fit 策略)"""
        candidates = []

        for gpu in self.gpus.values():
            if gpu.free_memory >= job.gpu_memory:
                candidates.append((gpu.free_memory, gpu.gpu_id))

        if len(candidates) < job.gpu_count:
            return None

        # 选择剩余空间最小的 GPU (Best Fit)
        candidates.sort()
        return [gpu_id for _, gpu_id in candidates[:job.gpu_count]]

    def _allocate(self, job: GPUJob, gpu_ids: List[str]):
        """分配资源"""
        for gpu_id in gpu_ids:
            gpu = self.gpus[gpu_id]
            gpu.used_memory += job.gpu_memory
            gpu.running_jobs.append(job.job_id)

        self.running_jobs[job.job_id] = job

    def release(self, job_id: str):
        """释放资源"""
        job = self.running_jobs.pop(job_id, None)
        if job:
            for gpu in self.gpus.values():
                if job_id in gpu.running_jobs:
                    gpu.running_jobs.remove(job_id)
                    gpu.used_memory -= job.gpu_memory

        self._try_schedule()

Q8: Kubernetes 中如何管理 GPU 资源?

答案要点:

# GPU 资源管理配置

# 1. 基本 GPU 请求
apiVersion: v1
kind: Pod
metadata:
  name: gpu-pod
spec:
  containers:
  - name: cuda-container
    image: nvidia/cuda:11.0-base
    resources:
      limits:
        nvidia.com/gpu: 2  # 请求 2 个 GPU
---

# 2. 使用特定 GPU 型号 (Node Selector)
apiVersion: v1
kind: Pod
metadata:
  name: a100-pod
spec:
  nodeSelector:
    nvidia.com/gpu.product: "NVIDIA-A100-SXM4-80GB"
  containers:
  - name: training
    resources:
      limits:
        nvidia.com/gpu: 8

---
# 3. GPU 共享 (时间片)
# 使用 NVIDIA MPS 或 vGPU
apiVersion: v1
kind: Pod
metadata:
  name: shared-gpu-pod
spec:
  containers:
  - name: inference
    resources:
      limits:
        nvidia.com/gpu: 1
    env:
    - name: CUDA_MPS_PIPE_DIRECTORY
      value: /tmp/nvidia-mps

---
# 4. GPU 拓扑感知调度
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: gpu-topology-aware
value: 1000000
---
apiVersion: v1
kind: Pod
metadata:
  name: topology-pod
  annotations:
    # 请求同一 NVLink 域的 GPU
    nvidia.com/gpu-topology: "nvlink"
spec:
  containers:
  - name: multi-gpu
    resources:
      limits:
        nvidia.com/gpu: 4
"""
GPU 拓扑感知调度
"""
import subprocess
import xml.etree.ElementTree as ET

def get_gpu_topology():
    """获取 GPU 拓扑信息"""
    result = subprocess.run(
        ['nvidia-smi', 'topo', '-m'],
        capture_output=True, text=True
    )
    return parse_topology(result.stdout)

def parse_topology(output):
    """解析拓扑矩阵"""
    # 示例输出:
    #         GPU0    GPU1    GPU2    GPU3
    # GPU0     X      NV1     NV1     NV2
    # GPU1    NV1      X      NV2     NV1
    # GPU2    NV1     NV2      X      NV1
    # GPU3    NV2     NV1     NV1      X

    topology = {}
    lines = output.strip().split('\n')

    # 解析连接类型
    # NV1, NV2: NVLink 1代, 2代
    # PHB: PCIe 同一 Host Bridge
    # SYS: 跨 NUMA 节点

    return topology

def select_gpus_by_topology(num_gpus, prefer_nvlink=True):
    """根据拓扑选择 GPU"""
    topology = get_gpu_topology()

    if prefer_nvlink:
        # 优先选择 NVLink 连接的 GPU
        nvlink_groups = find_nvlink_groups(topology)
        for group in nvlink_groups:
            if len(group) >= num_gpus:
                return group[:num_gpus]

    # 回退到任意可用 GPU
    return list(range(num_gpus))

2.2 进阶问题

Q9: 如何实现 GPU 显存的动态管理和碎片整理?

答案要点:

"""
GPU 显存管理
"""
import torch
from typing import Dict, List

class GPUMemoryManager:
    """GPU 显存管理器"""

    def __init__(self, device_id: int = 0):
        self.device = torch.device(f'cuda:{device_id}')
        self.allocated_tensors: Dict[int, torch.Tensor] = {}
        self.memory_pool: List[torch.Tensor] = []

    def get_memory_info(self):
        """获取显存信息"""
        allocated = torch.cuda.memory_allocated(self.device)
        reserved = torch.cuda.memory_reserved(self.device)
        max_memory = torch.cuda.max_memory_allocated(self.device)

        return {
            'allocated': allocated / 1024**3,  # GB
            'reserved': reserved / 1024**3,
            'max_allocated': max_memory / 1024**3,
            'fragmentation': (reserved - allocated) / reserved if reserved > 0 else 0
        }

    def defragment(self):
        """显存碎片整理"""
        # 方法 1: 清空缓存
        torch.cuda.empty_cache()

        # 方法 2: 重新分配张量 (高级)
        # 将所有张量复制到 CPU,清空 GPU,再复制回来
        cpu_copies = {}
        for tensor_id, tensor in self.allocated_tensors.items():
            cpu_copies[tensor_id] = tensor.cpu()

        # 清空 GPU
        self.allocated_tensors.clear()
        torch.cuda.empty_cache()

        # 重新分配(连续)
        for tensor_id, cpu_tensor in cpu_copies.items():
            self.allocated_tensors[tensor_id] = cpu_tensor.to(self.device)

    def allocate_from_pool(self, shape, dtype=torch.float32):
        """从池中分配(减少碎片)"""
        required_size = torch.tensor(shape).prod().item() * dtype.itemsize

        # 查找合适的预分配块
        for i, tensor in enumerate(self.memory_pool):
            if tensor.numel() * tensor.element_size() >= required_size:
                # 复用现有块
                block = self.memory_pool.pop(i)
                return block[:shape[0]].view(shape)

        # 分配新块
        return torch.empty(shape, dtype=dtype, device=self.device)

    def return_to_pool(self, tensor: torch.Tensor):
        """归还到池中"""
        self.memory_pool.append(tensor)


# 显存优化技术

def gradient_checkpointing_example():
    """梯度检查点:用计算换显存"""
    from torch.utils.checkpoint import checkpoint

    class CheckpointedModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.layers = nn.ModuleList([
                nn.Linear(1024, 1024) for _ in range(100)
            ])

        def forward(self, x):
            for i, layer in enumerate(self.layers):
                if i % 10 == 0:
                    # 每 10 层设置检查点
                    x = checkpoint(self._forward_segment, x, i)
                else:
                    x = layer(x)
            return x

        def _forward_segment(self, x, start_idx):
            for j in range(10):
                if start_idx + j < len(self.layers):
                    x = self.layers[start_idx + j](x)
            return x


def offload_to_cpu():
    """CPU Offload:将部分参数/优化器状态放到 CPU"""
    # DeepSpeed ZeRO-Offload
    config = {
        "zero_optimization": {
            "stage": 2,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True
            }
        }
    }

3. 模型服务与推理

3.1 基础问题

Q10: 模型推理服务的关键性能指标有哪些?如何优化?

答案要点:

推理性能指标

1. 延迟 (Latency)
┌─────────────────────────────────────────────────────┐
│  P50: 50% 请求的响应时间                            │
│  P95: 95% 请求的响应时间                            │
│  P99: 99% 请求的响应时间                            │
│  目标: P99 < SLA 要求                               │
└─────────────────────────────────────────────────────┘

2. 吞吐量 (Throughput)
┌─────────────────────────────────────────────────────┐
│  QPS: 每秒查询数                                    │
│  Tokens/s: 每秒生成 token 数 (LLM)                  │
│  目标: 满足业务峰值需求                             │
└─────────────────────────────────────────────────────┘

3. 资源利用率
┌─────────────────────────────────────────────────────┐
│  GPU 利用率: 目标 > 80%                             │
│  显存利用率: 目标 > 70%                             │
│  CPU 利用率: 目标 50-70%                            │
└─────────────────────────────────────────────────────┘
"""
推理优化技术
"""

# 1. 模型量化
import torch.quantization as quant

def quantize_model(model):
    """动态量化 (推理时量化)"""
    quantized_model = quant.quantize_dynamic(
        model,
        {torch.nn.Linear, torch.nn.LSTM},
        dtype=torch.qint8
    )
    return quantized_model

def static_quantize(model, calibration_data):
    """静态量化 (需要校准数据)"""
    model.eval()

    # 准备量化
    model.qconfig = quant.get_default_qconfig('fbgemm')
    quant.prepare(model, inplace=True)

    # 校准
    with torch.no_grad():
        for data in calibration_data:
            model(data)

    # 转换
    quant.convert(model, inplace=True)
    return model


# 2. 批处理优化
class DynamicBatcher:
    """动态批处理"""

    def __init__(self, model, max_batch_size=32,
                 max_wait_time=0.01):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests = []
        self.lock = threading.Lock()

    def add_request(self, input_data):
        """添加请求"""
        future = asyncio.Future()

        with self.lock:
            self.pending_requests.append((input_data, future))

            if len(self.pending_requests) >= self.max_batch_size:
                self._process_batch()

        return future

    def _process_batch(self):
        """处理批次"""
        if not self.pending_requests:
            return

        batch_inputs = [r[0] for r in self.pending_requests]
        futures = [r[1] for r in self.pending_requests]
        self.pending_requests = []

        # 批量推理
        batch_tensor = torch.stack(batch_inputs)
        with torch.no_grad():
            outputs = self.model(batch_tensor)

        # 分发结果
        for i, future in enumerate(futures):
            future.set_result(outputs[i])


# 3. KV Cache 优化 (LLM)
class KVCache:
    """Key-Value 缓存管理"""

    def __init__(self, num_layers, num_heads,
                 head_dim, max_seq_len):
        self.num_layers = num_layers
        self.cache = {}

    def get_or_create(self, batch_id, device):
        """获取或创建缓存"""
        if batch_id not in self.cache:
            self.cache[batch_id] = [
                (
                    torch.zeros(1, self.num_heads, 0, self.head_dim, device=device),
                    torch.zeros(1, self.num_heads, 0, self.head_dim, device=device)
                )
                for _ in range(self.num_layers)
            ]
        return self.cache[batch_id]

    def update(self, batch_id, layer_idx, new_k, new_v):
        """更新缓存"""
        k, v = self.cache[batch_id][layer_idx]
        self.cache[batch_id][layer_idx] = (
            torch.cat([k, new_k], dim=2),
            torch.cat([v, new_v], dim=2)
        )


# 4. Continuous Batching (LLM)
class ContinuousBatcher:
    """连续批处理 (vLLM 风格)"""

    def __init__(self, model, max_batch_tokens=4096):
        self.model = model
        self.max_batch_tokens = max_batch_tokens
        self.running_requests = []
        self.waiting_requests = []

    def step(self):
        """执行一步推理"""
        # 1. 检查已完成的请求
        completed = []
        for req in self.running_requests:
            if req.is_finished():
                completed.append(req)

        for req in completed:
            self.running_requests.remove(req)

        # 2. 添加新请求
        current_tokens = sum(r.current_length for r in self.running_requests)

        while self.waiting_requests and \
              current_tokens < self.max_batch_tokens:
            req = self.waiting_requests.pop(0)
            self.running_requests.append(req)
            current_tokens += req.current_length

        # 3. 批量前向
        if self.running_requests:
            self._forward_batch()

Q11: 如何设计高可用的模型服务架构?

答案要点:

高可用推理服务架构

┌─────────────────────────────────────────────────────────────────────┐
│                        负载均衡层                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │  L4/L7 LB │ 健康检查 │ 会话保持 │ 限流                       │   │
│  └────────────────────────┬────────────────────────────────────┘   │
│                           │                                         │
│  ┌────────────────────────┴────────────────────────────────────┐   │
│  │                     API 网关                                 │   │
│  │  认证 │ 路由 │ 限流 │ 监控 │ 协议转换                       │   │
│  └────────────────────────┬────────────────────────────────────┘   │
│                           │                                         │
│  ┌────────────────────────┴────────────────────────────────────┐   │
│  │                   推理服务集群                               │   │
│  │                                                              │   │
│  │   ┌─────────┐  ┌─────────┐  ┌─────────┐                    │   │
│  │   │ Node 1  │  │ Node 2  │  │ Node 3  │                    │   │
│  │   │┌───────┐│  │┌───────┐│  │┌───────┐│                    │   │
│  │   ││Triton ││  ││Triton ││  ││Triton ││                    │   │
│  │   │└───────┘│  │└───────┘│  │└───────┘│                    │   │
│  │   │  GPU×4  │  │  GPU×4  │  │  GPU×4  │                    │   │
│  │   └─────────┘  └─────────┘  └─────────┘                    │   │
│  │                                                              │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                   模型仓库 (共享存储)                         │   │
│  │  Model Registry │ Version Control │ A/B Testing              │   │
│  └──────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
"""
高可用设计实现
"""

# 1. 健康检查
from fastapi import FastAPI, HTTPException
import asyncio

app = FastAPI()

class HealthChecker:
    def __init__(self, model):
        self.model = model
        self.is_healthy = True
        self.last_check = 0

    async def check(self):
        """执行健康检查"""
        try:
            # 模型推理测试
            test_input = torch.randn(1, 3, 224, 224).cuda()
            with torch.no_grad():
                _ = self.model(test_input)

            # GPU 状态检查
            memory_used = torch.cuda.memory_allocated()
            memory_total = torch.cuda.get_device_properties(0).total_memory

            if memory_used / memory_total > 0.95:
                return False, "GPU memory critical"

            self.is_healthy = True
            return True, "OK"

        except Exception as e:
            self.is_healthy = False
            return False, str(e)

@app.get("/health")
async def health_check():
    healthy, message = await health_checker.check()
    if not healthy:
        raise HTTPException(status_code=503, detail=message)
    return {"status": "healthy"}

@app.get("/ready")
async def readiness_check():
    if not model_loaded:
        raise HTTPException(status_code=503, detail="Model not loaded")
    return {"status": "ready"}


# 2. 优雅关闭
import signal

class GracefulShutdown:
    def __init__(self):
        self.shutdown = False
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _handle_sigterm(self, signum, frame):
        print("Received SIGTERM, initiating graceful shutdown...")
        self.shutdown = True

    async def wait_for_requests(self, timeout=30):
        """等待现有请求完成"""
        start = time.time()
        while active_requests > 0 and time.time() - start < timeout:
            await asyncio.sleep(0.1)


# 3. 自动扩缩容指标
class AutoscalingMetrics:
    """HPA 自定义指标"""

    def __init__(self):
        self.request_count = 0
        self.latency_sum = 0
        self.gpu_utilization = 0

    def record_request(self, latency):
        self.request_count += 1
        self.latency_sum += latency

    def get_metrics(self):
        return {
            "qps": self.request_count / 60,  # 过去 60 秒
            "avg_latency": self.latency_sum / max(self.request_count, 1),
            "gpu_utilization": self.gpu_utilization
        }


# 4. 模型版本管理
class ModelVersionManager:
    """模型版本管理"""

    def __init__(self, model_repo: str):
        self.model_repo = model_repo
        self.loaded_versions = {}
        self.active_version = None

    async def load_version(self, version: str):
        """加载指定版本"""
        model_path = f"{self.model_repo}/{version}/model.pt"
        model = torch.jit.load(model_path)
        model.cuda()
        model.eval()
        self.loaded_versions[version] = model

    async def switch_version(self, version: str):
        """切换活跃版本"""
        if version not in self.loaded_versions:
            await self.load_version(version)
        self.active_version = version

    async def canary_rollout(self, new_version: str,
                             traffic_percent: float = 10):
        """金丝雀发布"""
        await self.load_version(new_version)
        self.canary_version = new_version
        self.canary_traffic = traffic_percent

    def get_model_for_request(self):
        """获取处理请求的模型"""
        if hasattr(self, 'canary_version') and \
           random.random() * 100 < self.canary_traffic:
            return self.loaded_versions[self.canary_version]
        return self.loaded_versions[self.active_version]

4. 存储系统

4.1 基础问题

Q12: AI 训练场景对存储系统有什么特殊要求?

答案要点:

AI 存储需求分析

训练数据特征:
┌─────────────────────────────────────────────────────────────────┐
│  阶段          │ I/O 模式         │ 吞吐需求    │ 延迟要求      │
├─────────────────────────────────────────────────────────────────┤
│  数据加载      │ 大量顺序读       │ 高 (GB/s)  │ 低敏感        │
│  Checkpoint    │ 大块顺序写       │ 中 (GB/s)  │ 中等          │
│  日志/指标     │ 小块随机写       │ 低         │ 低敏感        │
│  模型保存      │ 大块顺序写       │ 中         │ 低敏感        │
└─────────────────────────────────────────────────────────────────┘

存储系统选型:
┌─────────────────────────────────────────────────────────────────┐
│  场景              │ 推荐存储          │ 原因                   │
├─────────────────────────────────────────────────────────────────┤
│  训练数据集        │ 分布式文件系统    │ 高吞吐,共享访问       │
│  (HDFS/Lustre)     │                   │                        │
├─────────────────────────────────────────────────────────────────┤
│  模型 Checkpoint   │ 并行文件系统      │ 低延迟,高 IOPS        │
│  (GPFS/BeeGFS)     │                   │                        │
├─────────────────────────────────────────────────────────────────┤
│  模型仓库          │ 对象存储          │ 版本管理,成本效益     │
│  (S3/MinIO)        │                   │                        │
├─────────────────────────────────────────────────────────────────┤
│  特征存储          │ 分布式 KV         │ 低延迟查询             │
│  (Redis/Cassandra) │                   │                        │
└─────────────────────────────────────────────────────────────────┘
"""
数据加载优化
"""

# 1. 多进程数据加载
from torch.utils.data import DataLoader

def create_optimized_dataloader(dataset, batch_size=32):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=8,           # 多进程加载
        pin_memory=True,         # 固定内存,加速 GPU 传输
        prefetch_factor=2,       # 预取因子
        persistent_workers=True  # 持久化 worker
    )


# 2. 内存映射数据集
import numpy as np

class MemmapDataset:
    """内存映射数据集(适合大数据)"""

    def __init__(self, data_path, shape, dtype=np.float32):
        self.data = np.memmap(
            data_path,
            dtype=dtype,
            mode='r',
            shape=shape
        )

    def __getitem__(self, idx):
        return torch.from_numpy(self.data[idx].copy())

    def __len__(self):
        return len(self.data)


# 3. 分布式数据加载
from torch.utils.data.distributed import DistributedSampler

def create_distributed_dataloader(dataset, world_size, rank):
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    return DataLoader(
        dataset,
        batch_size=32,
        sampler=sampler,
        num_workers=4,
        pin_memory=True
    )


# 4. 数据预处理流水线
class AsyncDataPipeline:
    """异步数据流水线"""

    def __init__(self, data_source, buffer_size=100):
        self.data_source = data_source
        self.buffer = queue.Queue(maxsize=buffer_size)
        self.stop_event = threading.Event()

    def start(self):
        """启动后台加载线程"""
        self.loader_thread = threading.Thread(target=self._load_data)
        self.loader_thread.start()

    def _load_data(self):
        """后台加载数据"""
        for item in self.data_source:
            if self.stop_event.is_set():
                break

            # 预处理
            processed = self._preprocess(item)
            self.buffer.put(processed)

    def _preprocess(self, item):
        """CPU 预处理"""
        # 解码、增强、归一化等
        return item

    def get_batch(self, batch_size):
        """获取批次"""
        batch = []
        for _ in range(batch_size):
            batch.append(self.buffer.get())
        return torch.stack(batch)

Q13: 如何优化 Checkpoint 保存和恢复的性能?

答案要点:

"""
Checkpoint 优化策略
"""

# 1. 异步 Checkpoint
class AsyncCheckpointer:
    """异步 Checkpoint 保存"""

    def __init__(self, save_dir, max_concurrent=2):
        self.save_dir = save_dir
        self.executor = ThreadPoolExecutor(max_workers=max_concurrent)
        self.pending_saves = []

    def save_async(self, state_dict, filename):
        """异步保存"""
        # 先复制到 CPU (非阻塞)
        cpu_state = {
            k: v.cpu().clone() if torch.is_tensor(v) else v
            for k, v in state_dict.items()
        }

        # 异步写入
        future = self.executor.submit(
            self._save_to_disk, cpu_state, filename
        )
        self.pending_saves.append(future)

        # 清理已完成的
        self.pending_saves = [f for f in self.pending_saves if not f.done()]

    def _save_to_disk(self, state_dict, filename):
        path = os.path.join(self.save_dir, filename)
        torch.save(state_dict, path)

    def wait_all(self):
        """等待所有保存完成"""
        for future in self.pending_saves:
            future.result()


# 2. 增量 Checkpoint
class IncrementalCheckpointer:
    """增量 Checkpoint"""

    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.last_state = {}

    def save_incremental(self, state_dict, step):
        """保存增量"""
        changed_keys = []

        for key, value in state_dict.items():
            if key not in self.last_state:
                changed_keys.append(key)
            elif not torch.equal(value, self.last_state[key]):
                changed_keys.append(key)

        # 只保存变化的部分
        incremental_state = {k: state_dict[k] for k in changed_keys}

        path = os.path.join(self.base_dir, f'incr_{step}.pt')
        torch.save({
            'changed_keys': changed_keys,
            'state': incremental_state
        }, path)

        self.last_state = {k: v.clone() for k, v in state_dict.items()}

    def load_incremental(self, base_step, target_step):
        """加载增量"""
        # 加载基础状态
        state = torch.load(f'{self.base_dir}/full_{base_step}.pt')

        # 应用增量
        for step in range(base_step + 1, target_step + 1):
            incr = torch.load(f'{self.base_dir}/incr_{step}.pt')
            for key in incr['changed_keys']:
                state[key] = incr['state'][key]

        return state


# 3. 分布式 Checkpoint
class DistributedCheckpointer:
    """分布式 Checkpoint (每个 rank 保存自己的部分)"""

    def __init__(self, save_dir, rank, world_size):
        self.save_dir = save_dir
        self.rank = rank
        self.world_size = world_size

    def save_sharded(self, model, optimizer, step):
        """分片保存"""
        # 每个 rank 保存自己的分片
        shard = {
            'model': self._get_local_state(model),
            'optimizer': optimizer.state_dict(),
            'step': step
        }

        path = f'{self.save_dir}/checkpoint_{step}_rank{self.rank}.pt'
        torch.save(shard, path)

        # 同步确保所有 rank 完成
        dist.barrier()

    def load_sharded(self, model, optimizer, step):
        """分片加载"""
        # 每个 rank 加载自己的分片
        path = f'{self.save_dir}/checkpoint_{step}_rank{self.rank}.pt'
        shard = torch.load(path, map_location=f'cuda:{self.rank}')

        self._load_local_state(model, shard['model'])
        optimizer.load_state_dict(shard['optimizer'])

        dist.barrier()
        return shard['step']


# 4. Checkpoint 压缩
import zlib

class CompressedCheckpointer:
    """压缩 Checkpoint"""

    def save_compressed(self, state_dict, path):
        """压缩保存"""
        import io

        buffer = io.BytesIO()
        torch.save(state_dict, buffer)

        compressed = zlib.compress(buffer.getvalue(), level=6)

        with open(path, 'wb') as f:
            f.write(compressed)

    def load_compressed(self, path):
        """加载压缩文件"""
        import io

        with open(path, 'rb') as f:
            compressed = f.read()

        decompressed = zlib.decompress(compressed)
        buffer = io.BytesIO(decompressed)

        return torch.load(buffer)

5. 系统设计题

5.1 设计一个模型训练平台

题目: 设计一个支持多租户的 GPU 训练平台,支持资源隔离、任务调度、监控告警。

答案框架:

系统架构

┌─────────────────────────────────────────────────────────────────────┐
│                          用户层                                       │
│  Web UI │ CLI │ SDK │ REST API                                      │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
┌───────────────────────────────┴─────────────────────────────────────┐
│                        平台服务层                                     │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐  │
│  │ 任务管理    │ │ 资源管理    │ │ 用户管理    │ │ 监控告警    │  │
│  │             │ │             │ │             │ │             │  │
│  │ - 任务提交  │ │ - GPU 调度  │ │ - 认证授权  │ │ - 指标采集  │  │
│  │ - 生命周期  │ │ - 配额管理  │ │ - 租户隔离  │ │ - 日志聚合  │  │
│  │ - 日志查看  │ │ - 优先级    │ │ - 计费统计  │ │ - 异常告警  │  │
│  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘  │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
┌───────────────────────────────┴─────────────────────────────────────┐
│                        编排调度层                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    Kubernetes                                │   │
│  │  - GPU Device Plugin                                        │   │
│  │  - Custom Scheduler                                         │   │
│  │  - Training Operator (PyTorchJob/TFJob)                    │   │
│  │  - Network Policy (租户隔离)                                 │   │
│  └─────────────────────────────────────────────────────────────┘   │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
┌───────────────────────────────┴─────────────────────────────────────┐
│                        基础设施层                                     │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐  │
│  │ GPU 集群    │ │ 存储系统    │ │ 网络        │ │ 镜像仓库    │  │
│  │ A100/H100   │ │ NFS/Lustre │ │ RDMA/RoCE   │ │ Harbor      │  │
│  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘  │
└─────────────────────────────────────────────────────────────────────┘
"""
核心组件设计
"""

# 1. 任务定义
@dataclass
class TrainingJob:
    job_id: str
    user_id: str
    tenant_id: str
    name: str
    image: str
    command: List[str]

    # 资源需求
    gpu_count: int
    gpu_type: str  # A100, V100, etc.
    cpu: int
    memory: str  # 64Gi

    # 分布式配置
    world_size: int = 1
    distributed_backend: str = 'nccl'

    # 优先级和调度
    priority: int = 0
    preemptible: bool = False
    max_runtime: int = 86400  # 秒

    # 存储挂载
    volumes: List[Dict] = None


# 2. 资源配额
@dataclass
class ResourceQuota:
    tenant_id: str
    gpu_limit: int
    gpu_used: int
    cpu_limit: int
    memory_limit: str
    storage_limit: str

    def can_allocate(self, job: TrainingJob) -> bool:
        return (self.gpu_used + job.gpu_count) <= self.gpu_limit


# 3. 调度器接口
class TrainingScheduler:
    def schedule(self, job: TrainingJob) -> Optional[str]:
        """调度任务,返回节点分配"""

        # 1. 检查配额
        quota = self.get_quota(job.tenant_id)
        if not quota.can_allocate(job):
            return None

        # 2. 查找可用资源
        candidates = self.find_available_nodes(job)
        if not candidates:
            return None

        # 3. 选择最优节点 (考虑拓扑)
        node = self.select_best_node(candidates, job)

        # 4. 分配资源
        self.allocate(node, job)

        return node


# 4. 监控系统
class TrainingMonitor:
    def collect_metrics(self, job_id: str) -> Dict:
        return {
            'gpu_utilization': self.get_gpu_util(job_id),
            'gpu_memory': self.get_gpu_memory(job_id),
            'throughput': self.get_throughput(job_id),
            'loss': self.get_loss(job_id),
            'learning_rate': self.get_lr(job_id)
        }

    def check_anomaly(self, metrics: Dict) -> List[str]:
        anomalies = []

        if metrics['gpu_utilization'] < 10:
            anomalies.append('GPU 利用率过低')

        if metrics['loss'] > 100:
            anomalies.append('Loss 异常')

        return anomalies

总结

本章覆盖了 AI 基础设施的核心面试题:

  1. 分布式训练:数据/模型并行、AllReduce、ZeRO、混合精度
  2. GPU 调度:调度策略、K8s GPU 管理、显存优化
  3. 模型服务:推理优化、高可用架构、动态批处理
  4. 存储系统:数据加载、Checkpoint 优化
  5. 系统设计:训练平台架构

关键要点:

  • 理解底层原理,不只是会用框架
  • 关注性能优化和资源效率
  • 具备系统设计和问题分析能力
  • 实践经验和故障处理能力

下一章将探讨大模型相关的面试题。

Prev
09-面试专题
Next
02-大模型面试题