模型部署与优化

1. 模型量化

量化通过降低模型参数精度来减少内存占用和加速推理。

1.1 量化原理

浮点数表示:

FP32 (32位): 1符号位 + 8指数位 + 23尾数位
FP16 (16位): 1符号位 + 5指数位 + 10尾数位
INT8 (8位): 1符号位 + 7数值位
INT4 (4位): 1符号位 + 3数值位

量化公式:

x_quantized = round(x / scale) + zero_point
x_dequantized = (x_quantized - zero_point) * scale

scale = (x_max - x_min) / (q_max - q_min)
zero_point = q_min - round(x_min / scale)
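
用一小段NumPy代码可以直接验证上面的公式(INT8非对称量化的最小示意,x_min/x_max直接取自张量本身):

import numpy as np

def quantize(x, q_min=-128, q_max=127):
    # 按上文公式计算scale与zero_point
    x_min, x_max = float(x.min()), float(x.max())
    scale = (x_max - x_min) / (q_max - q_min)
    zero_point = q_min - int(round(x_min / scale))
    x_q = np.clip(np.round(x / scale) + zero_point, q_min, q_max).astype(np.int8)
    return x_q, scale, zero_point

def dequantize(x_q, scale, zero_point):
    return (x_q.astype(np.float32) - zero_point) * scale

x = np.random.randn(1000).astype(np.float32)
x_q, scale, zp = quantize(x)
print("最大量化误差:", np.abs(x - dequantize(x_q, scale, zp)).max())  # 约为scale/2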

对比表格:

| 精度 | 内存占用 | 速度 | 精度损失 | 适用场景 |
|------|----------|------|----------|----------|
| FP32 | 1× | 1× | 0% | 训练,基准 |
| FP16 | 0.5× | 2-3× | <1% | 训练,推理 |
| INT8 | 0.25× | 3-4× | 1-2% | 推理 |
| INT4 | 0.125× | 4-6× | 2-5% | 推理(大模型) |

1.2 INT8量化

PyTorch动态量化:

import torch
from torch.quantization import quantize_dynamic

model = MyModel()
model.load_state_dict(torch.load('model.pth'))

# 动态量化(推理时计算scale)
quantized_model = quantize_dynamic(
    model,
    {torch.nn.Linear},  # 量化的层类型
    dtype=torch.qint8
)

# 保存
torch.save(quantized_model.state_dict(), 'quantized_model.pth')

# 大小对比
import os
fp32_size = os.path.getsize('model.pth') / (1024**2)
int8_size = os.path.getsize('quantized_model.pth') / (1024**2)
print(f"FP32: {fp32_size:.2f}MB")
print(f"INT8: {int8_size:.2f}MB")
print(f"压缩比: {fp32_size/int8_size:.2f}x")

静态量化(需校准):

from torch.quantization import get_default_qconfig, prepare, convert

model.eval()
model.qconfig = get_default_qconfig('fbgemm')  # x86 CPU
# model.qconfig = get_default_qconfig('qnnpack')  # ARM

# 准备量化
model_prepared = prepare(model)

# 校准(在代表性数据上运行)
with torch.no_grad():
    for data in calibration_dataloader:
        model_prepared(data)

# 转换为量化模型
model_quantized = convert(model_prepared)

LLM.int8():

from transformers import AutoModelForCausalLM

# 8bit量化加载大模型
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-hf",
    load_in_8bit=True,
    device_map="auto"
)

# 内存对比:
# FP16: 13GB
# INT8: 6.5GB (节省50%)
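
加载后可以用transformers自带的get_memory_footprint()粗略核对占用(返回参数与buffer的字节数,是近似值):

print(f"模型权重约占用 {model.get_memory_footprint() / 1024**3:.1f} GB")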

1.3 INT4量化

GPTQ (GPT Quantization)

基于Optimal Brain Quantization的4bit量化。

from transformers import AutoModelForCausalLM, GPTQConfig

# GPTQ配置
gptq_config = GPTQConfig(
    bits=4,                    # 4bit量化
    group_size=128,            # 量化分组大小
    desc_act=False,            # 是否按激活值大小降序决定量化顺序(act-order)
)

# 加载GPTQ量化模型
model = AutoModelForCausalLM.from_pretrained(
    "TheBloke/Llama-2-7B-GPTQ",
    device_map="auto",
    quantization_config=gptq_config
)

# 推理
inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
outputs = model.generate(**inputs, max_new_tokens=100)

自己量化模型:

from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig

# 量化配置
quantize_config = BaseQuantizeConfig(
    bits=4,
    group_size=128,
    desc_act=False,
)

# 加载模型
model = AutoGPTQForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-hf",
    quantize_config=quantize_config
)

# 量化(需要校准数据)
model.quantize(calibration_data)

# 保存
model.save_quantized("llama2-7b-gptq-4bit")

AWQ (Activation-aware Weight Quantization)

保留对激活值影响大的权重通道。

from awq import AutoAWQForCausalLM
from transformers import AutoTokenizer

model_path = "meta-llama/Llama-2-7b-hf"
quant_path = "llama2-7b-awq"

# 加载模型
model = AutoAWQForCausalLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# 量化配置
quant_config = {
    "zero_point": True,
    "q_group_size": 128,
    "w_bit": 4,
    "version": "GEMM"
}

# 量化
model.quantize(tokenizer, quant_config=quant_config, calib_data=calibration_data)

# 保存
model.save_quantized(quant_path)

GPTQ vs AWQ:

| 特性 | GPTQ | AWQ |
|------|------|-----|
| 量化时间 | 长(数小时) | 短(数十分钟) |
| 推理速度 | 快 | 更快 |
| 精度损失 | 低 | 更低 |
| 内存占用 | 相同 | 相同 |
| 硬件要求 | 标准GPU | 标准GPU |

GGUF (GPT-Generated Unified Format)

llama.cpp的量化格式,支持CPU推理。

# 转换为GGUF
python convert.py --outtype f16 models/llama-2-7b/

# 量化
./quantize models/llama-2-7b/ggml-model-f16.gguf models/llama-2-7b/ggml-model-q4_0.gguf q4_0

# 量化类型:
# q4_0: 4.5 bpw (bits per weight)
# q4_1: 5.0 bpw
# q5_0: 5.5 bpw
# q5_1: 6.0 bpw
# q8_0: 8.5 bpw
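
根据bpw可以粗略估算量化后的文件大小,以7B参数模型为例(忽略词表、元数据等开销的估算):

params = 7e9
for name, bpw in [("q4_0", 4.5), ("q5_0", 5.5), ("q8_0", 8.5)]:
    size_gib = params * bpw / 8 / 1024**3
    print(f"{name}: 约{size_gib:.1f}GB")
# q4_0约3.7GB,与下表中GGUF q4_0的3.8GB量级一致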

量化效果对比(Llama-2-7B):

| 量化方法 | 模型大小 | 内存占用 | 速度提升 | PPL↓ | MMLU |
|----------|----------|----------|----------|------|------|
| FP16 | 13GB | 14GB | 1.0× | 5.68 | 45.3% |
| INT8 | 6.5GB | 8GB | 1.5× | 5.72 | 45.1% |
| GPTQ 4bit | 3.5GB | 5GB | 2.0× | 5.89 | 44.2% |
| AWQ 4bit | 3.5GB | 5GB | 2.5× | 5.81 | 44.7% |
| GGUF q4_0 | 3.8GB | 6GB | 1.8× | 5.95 | 43.8% |

2. 模型剪枝

剪枝通过移除不重要的参数来减小模型。

2.1 非结构化剪枝

移除单个权重,产生稀疏矩阵。

import torch
import torch.nn.utils.prune as prune

# L1非结构化剪枝
model = MyModel()
module = model.conv1

prune.l1_unstructured(
    module,
    name="weight",
    amount=0.3  # 剪枝30%的权重
)

# 查看剪枝效果
print(f"剪枝后零值比例: {(module.weight == 0).sum().item() / module.weight.numel():.2%}")

# 永久移除(不可恢复)
prune.remove(module, 'weight')

全局剪枝:

# 对整个模型剪枝
parameters_to_prune = []
for module in model.modules():
    if isinstance(module, torch.nn.Conv2d) or isinstance(module, torch.nn.Linear):
        parameters_to_prune.append((module, 'weight'))

prune.global_unstructured(
    parameters_to_prune,
    pruning_method=prune.L1Unstructured,
    amount=0.5  # 全局剪枝50%
)

2.2 结构化剪枝

移除整个通道/神经元,保持稠密矩阵。

# 剪枝整个卷积通道
prune.ln_structured(
    module,
    name="weight",
    amount=0.3,
    n=2,          # L2范数
    dim=0         # 沿输出通道剪枝
)

# 实际移除通道
def remove_channels(model, prune_ratio=0.3):
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Conv2d):
            # 计算每个通道的重要性
            importance = torch.norm(module.weight.data, p=2, dim=(1, 2, 3))

            # 选择要保留的通道
            n_keep = int(module.out_channels * (1 - prune_ratio))
            keep_indices = torch.argsort(importance, descending=True)[:n_keep]

            # 创建新的Conv层(是否带bias与原层保持一致)
            new_conv = torch.nn.Conv2d(
                in_channels=module.in_channels,
                out_channels=n_keep,
                kernel_size=module.kernel_size,
                stride=module.stride,
                padding=module.padding,
                bias=module.bias is not None
            )
            new_conv.weight.data = module.weight.data[keep_indices].clone()
            if module.bias is not None:
                new_conv.bias.data = module.bias.data[keep_indices].clone()

            # 替换(注意: name可能带层级前缀,嵌套模块需逐级定位父模块;
            # 且下一层的in_channels也要同步调整,此处为简化写法)
            setattr(model, name, new_conv)

结构化 vs 非结构化:

| 特性 | 非结构化 | 结构化 |
|------|----------|--------|
| 压缩比 | 高 | 中 |
| 加速效果 | 需要稀疏计算支持 | 直接加速 |
| 精度损失 | 小 | 中等 |
| 硬件友好 | 否 | 是 |

2.3 LLM剪枝

SparseGPT:

from sparsegpt import SparseGPT  # 示意写法,实际接口以SparseGPT官方repo为准

model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b")

# 剪枝至50%稀疏度
pruner = SparseGPT(model)
pruner.prune(sparsity=0.5, prunen=2, prunem=4)  # 2:4结构化稀疏

# 保存
model.save_pretrained("llama2-7b-sparse50")

Wanda (Pruning by Weights and Activations):

from wanda import WandaPruner  # 示意写法,Wanda官方实现以脚本形式提供,接口以官方repo为准

pruner = WandaPruner(model, tokenizer)

# 使用校准数据剪枝
pruner.prune(
    calibration_data=calib_data,
    sparsity=0.5,
    prune_n=2,
    prune_m=4
)
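
上面两个工具中的prune_n=2、prune_m=4即2:4结构化稀疏:每4个连续权重只保留绝对值最大的2个。下面是不依赖这些库、用纯PyTorch生成2:4掩码的最小示意:

import torch

def make_2_4_mask(weight):
    # weight: [out, in],in需为4的倍数;每4个连续权重保留绝对值最大的2个
    out_dim, in_dim = weight.shape
    groups = weight.abs().reshape(out_dim, in_dim // 4, 4)
    topk = groups.topk(2, dim=-1).indices          # 每组top-2的下标
    mask = torch.zeros_like(groups)
    mask.scatter_(-1, topk, 1.0)                   # 被保留的位置置1
    return mask.reshape(out_dim, in_dim)

w = torch.randn(8, 16)
w_sparse = w * make_2_4_mask(w)
print("稀疏度:", (w_sparse == 0).float().mean().item())  # 0.5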

3. 知识蒸馏

通过Teacher模型指导Student模型训练。

3.1 基础蒸馏

import torch
import torch.nn as nn
import torch.nn.functional as F

class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0, alpha=0.5):
        super().__init__()
        self.temperature = temperature
        self.alpha = alpha
        self.ce_loss = nn.CrossEntropyLoss()

    def forward(self, student_logits, teacher_logits, labels):
        # KL散度损失(软标签)
        soft_loss = F.kl_div(
            F.log_softmax(student_logits / self.temperature, dim=-1),
            F.softmax(teacher_logits / self.temperature, dim=-1),
            reduction='batchmean'
        ) * (self.temperature ** 2)

        # 交叉熵损失(硬标签)
        hard_loss = self.ce_loss(student_logits, labels)

        # 组合
        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

# 训练
teacher_model.eval()
student_model.train()

distill_loss_fn = DistillationLoss(temperature=3.0, alpha=0.5)

for batch in dataloader:
    inputs, labels = batch

    # Teacher预测(不计算梯度)
    with torch.no_grad():
        teacher_logits = teacher_model(inputs)

    # Student预测
    student_logits = student_model(inputs)

    # 蒸馏损失
    loss = distill_loss_fn(student_logits, teacher_logits, labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

3.2 LLM蒸馏

DistilBERT:

from transformers import DistilBertForSequenceClassification, BertForSequenceClassification
# 注意: DistillationTrainingArguments / DistillationTrainer并非transformers内置API,
# 下面为示意写法,实际常见做法是继承Trainer、在compute_loss中加入KL蒸馏损失

# Teacher: BERT-base (110M参数)
teacher = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)

# Student: DistilBERT (66M参数, 40%压缩)
student = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2)

# 蒸馏训练
training_args = DistillationTrainingArguments(
    output_dir="./distilled_model",
    temperature=2.0,
    alpha_ce=0.5,
    alpha_mlm=0.5,
)

trainer = DistillationTrainer(
    teacher_model=teacher,
    student_model=student,
    args=training_args,
    train_dataset=train_dataset,
)

trainer.train()

效果对比:

| 模型 | 参数量 | 速度 | GLUE分数 |
|------|--------|------|----------|
| BERT-base | 110M | 1.0× | 84.0 |
| DistilBERT | 66M | 1.6× | 81.8 |
| TinyBERT | 14.5M | 9.4× | 79.2 |

4. 推理加速

4.1 vLLM

高吞吐量的LLM推理引擎。

核心技术:

  • PagedAttention: KV Cache分页管理
  • Continuous Batching: 动态批处理
  • CUDA Kernel优化

安装和使用:

from vllm import LLM, SamplingParams

# 初始化模型
llm = LLM(
    model="meta-llama/Llama-2-7b-hf",
    tensor_parallel_size=2,  # 张量并行(2块GPU)
    gpu_memory_utilization=0.9,
)

# 采样参数
sampling_params = SamplingParams(
    temperature=0.8,
    top_p=0.95,
    max_tokens=100
)

# 批量推理
prompts = [
    "Once upon a time,",
    "The meaning of life is",
    "In a galaxy far far away,"
]

outputs = llm.generate(prompts, sampling_params)

for output in outputs:
    print(f"Prompt: {output.prompt}")
    print(f"Generated: {output.outputs[0].text}")
    print("-" * 50)

性能对比(Llama-2-7B, A100):

| 框架 | 吞吐量(tokens/s) | 延迟(ms) | 内存占用 |
|------|------------------|----------|----------|
| HuggingFace | 200 | 150 | 16GB |
| TGI | 800 | 50 | 14GB |
| vLLM | 2400 | 30 | 12GB |

4.2 TGI (Text Generation Inference)

HuggingFace的推理服务器。

Docker启动:

docker run --gpus all --shm-size 1g -p 8080:80 \
    -v $PWD/data:/data \
    ghcr.io/huggingface/text-generation-inference:latest \
    --model-id meta-llama/Llama-2-7b-hf \
    --num-shard 2 \
    --max-batch-prefill-tokens 4096 \
    --max-total-tokens 8192

客户端调用:

import requests

API_URL = "http://localhost:8080/generate"

payload = {
    "inputs": "Once upon a time,",
    "parameters": {
        "max_new_tokens": 100,
        "temperature": 0.7,
        "top_p": 0.95,
        "do_sample": True
    }
}

response = requests.post(API_URL, json=payload)
print(response.json()["generated_text"])

流式生成:

import requests
import json

API_URL = "http://localhost:8080/generate_stream"

with requests.post(API_URL, json=payload, stream=True) as response:
    for chunk in response.iter_lines():
        if chunk:
            data = json.loads(chunk)
            print(data["token"]["text"], end="", flush=True)

4.3 llama.cpp

CPU推理优化,支持量化模型。

编译和运行:

# 编译
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp
make

# 转换模型
python convert.py --outtype f16 models/llama-2-7b/

# 量化
./quantize models/llama-2-7b/ggml-model-f16.gguf models/llama-2-7b/ggml-model-q4_0.gguf q4_0

# 推理
./main -m models/llama-2-7b/ggml-model-q4_0.gguf \
       -p "Once upon a time" \
       -n 128 \
       -t 8  # 8线程

Python绑定:

from llama_cpp import Llama

llm = Llama(
    model_path="models/llama-2-7b/ggml-model-q4_0.gguf",
    n_ctx=2048,      # 上下文长度
    n_threads=8,     # CPU线程数
    n_gpu_layers=0   # 0=纯CPU
)

output = llm(
    "Once upon a time,",
    max_tokens=100,
    temperature=0.8,
    top_p=0.95,
)

print(output["choices"][0]["text"])

4.4 Ollama

本地LLM运行工具,极简易用。

安装:

# macOS/Linux
curl -fsSL https://ollama.com/install.sh | sh

# Windows
# 下载安装包: https://ollama.com/download

使用:

# 下载模型
ollama pull llama2

# 运行
ollama run llama2

# API服务
ollama serve

Python调用:

import requests
import json

def chat(prompt):
    response = requests.post(
        "http://localhost:11434/api/generate",
        json={
            "model": "llama2",
            "prompt": prompt,
            "stream": False
        }
    )
    return response.json()["response"]

result = chat("Explain quantum computing in simple terms")
print(result)

框架对比:

| 框架 | 硬件 | 易用性 | 性能 | 适用场景 |
|------|------|--------|------|----------|
| vLLM | GPU | 中 | 极高 | 生产环境,高并发 |
| TGI | GPU | 高 | 高 | HF生态,快速部署 |
| llama.cpp | CPU | 中 | 中 | 无GPU,边缘设备 |
| Ollama | CPU/GPU | 极高 | 中 | 本地开发,快速原型 |

5. 并发优化

5.1 Continuous Batching

动态批处理,减少等待时间。

传统Static Batching:

批次1: [请求A(100 tokens), 请求B(50 tokens), 请求C(80 tokens)]
问题: 必须等所有请求(100 tokens)完成才能开始批次2

Continuous Batching:

# vLLM自动实现
# 请求B完成后(50 tokens),立即加入新请求D
批次动态: [A(100), B完成→D加入, C(80), D(新)]

效果:

  • 吞吐量提升2-3×
  • 延迟降低30-50%

5.2 KV Cache

缓存已计算的键值对,避免重复计算。

原理:

# 不使用KV Cache
for i in range(seq_len):
    # 每次都要计算前面所有token的K, V
    K = compute_key(tokens[:i+1])
    V = compute_value(tokens[:i+1])
    output = attention(Q, K, V)

# 使用KV Cache
kv_cache = []
for i in range(seq_len):
    # 只计算新token的K, V
    k_new = compute_key(tokens[i])
    v_new = compute_value(tokens[i])
    kv_cache.append((k_new, v_new))

    # 使用缓存的K, V
    K = concat([kv[0] for kv in kv_cache])
    V = concat([kv[1] for kv in kv_cache])
    output = attention(Q, K, V)

内存占用:

每个token的KV Cache:
  2 (K和V) × n_layers × n_heads × head_dim × 2 bytes (FP16)

Llama-2-7B (上下文2048):
  2 × 32 × 32 × 128 × 2 × 2048 = 1.07GB

优化技巧:

# 1. Multi-Query Attention (MQA)
# 所有head共享K, V
# KV Cache: 1.07GB → 33MB (32×压缩)

# 2. Grouped-Query Attention (GQA)
# 每组head共享K, V
# KV Cache介于MHA和MQA之间
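
把上面的公式写成一个小函数,可以直观看到MQA/GQA对KV Cache的压缩效果(层数、头数、head_dim按Llama-2-7B配置,GQA取8组仅作举例):

def kv_cache_bytes(n_layers=32, n_kv_heads=32, head_dim=128,
                   seq_len=2048, bytes_per_elem=2):
    # 2表示K和V各存一份
    return 2 * n_layers * n_kv_heads * head_dim * seq_len * bytes_per_elem

for name, kv_heads in [("MHA", 32), ("GQA(8组)", 8), ("MQA", 1)]:
    print(f"{name}: {kv_cache_bytes(n_kv_heads=kv_heads) / 1e6:.0f} MB")
# 约为: MHA 1074MB(≈1.07GB)、GQA 268MB、MQA 34MB,与上文的估算一致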

5.3 PagedAttention

vLLM的核心技术,类似OS的虚拟内存。

问题:

  • KV Cache大小不确定(生成长度未知)
  • 预分配导致内存碎片和浪费

解决方案:

# 将KV Cache分页存储
class PagedKVCache:
    def __init__(self, block_size=16):
        self.block_size = block_size  # 每页16个token
        self.blocks = {}  # 物理内存块
        self.page_table = {}  # 逻辑到物理的映射

    def allocate(self, seq_id, num_tokens):
        num_blocks = (num_tokens + self.block_size - 1) // self.block_size
        pages = [self.allocate_block() for _ in range(num_blocks)]
        self.page_table[seq_id] = pages

    def get(self, seq_id, token_idx):
        page_idx = token_idx // self.block_size
        offset = token_idx % self.block_size
        physical_block = self.page_table[seq_id][page_idx]
        return self.blocks[physical_block][offset]

优势:

  • 内存利用率: 90%+ (vs 传统方法的30-50%)
  • 支持高效的序列共享(beam search, parallel sampling)

5.4 批处理优化

# 智能批处理(满批或等待超时即触发)
import asyncio
class SmartBatcher:
    def __init__(self, max_batch_size=32, max_wait_ms=10):
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = []

    async def add_request(self, request):
        self.queue.append(request)

        # 条件1: 达到最大批次大小
        if len(self.queue) >= self.max_batch_size:
            return await self.process_batch()

        # 条件2: 等待超时
        await asyncio.sleep(self.max_wait_ms / 1000)
        if self.queue:
            return await self.process_batch()

    async def process_batch(self):
        batch = self.queue[:self.max_batch_size]
        self.queue = self.queue[self.max_batch_size:]

        # 按长度分组,减少padding
        batch_sorted = sorted(batch, key=lambda x: len(x.tokens))

        results = await model.generate_batch(batch_sorted)
        return results

6. 部署方案

6.1 本地部署

单机GPU:

# FastAPI服务
from fastapi import FastAPI
from vllm import LLM, SamplingParams
import uvicorn

app = FastAPI()
llm = LLM(model="meta-llama/Llama-2-7b-hf")

@app.post("/generate")
async def generate(prompt: str, max_tokens: int = 100):
    sampling_params = SamplingParams(
        temperature=0.8,
        max_tokens=max_tokens
    )
    outputs = llm.generate([prompt], sampling_params)
    return {"generated_text": outputs[0].outputs[0].text}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
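
服务启动后可以这样调用(示意;上面的接口把prompt/max_tokens声明为简单类型参数,FastAPI会将其作为查询参数处理):

import requests

resp = requests.post(
    "http://localhost:8000/generate",
    params={"prompt": "Once upon a time,", "max_tokens": 50},
)
print(resp.json()["generated_text"])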

Docker部署:

FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

RUN pip install vllm fastapi uvicorn

COPY app.py /app/app.py

CMD ["python", "/app/app.py"]
docker build -t llm-server .
docker run --gpus all -p 8000:8000 llm-server

6.2 云端部署

AWS SageMaker:

from sagemaker.huggingface import HuggingFaceModel

# 创建模型
huggingface_model = HuggingFaceModel(
    model_data='s3://bucket/model.tar.gz',
    role=role,
    transformers_version='4.26',
    pytorch_version='1.13',
    py_version='py39',
)

# 部署
predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type='ml.g5.xlarge'
)

# 推理
result = predictor.predict({
    "inputs": "Once upon a time,"
})

GCP Vertex AI:

from google.cloud import aiplatform

aiplatform.init(project='my-project')

endpoint = aiplatform.Endpoint.create(
    display_name='llm-endpoint'
)

model = aiplatform.Model.upload(
    display_name='llama-2-7b',
    serving_container_image_uri='gcr.io/my-project/llm-server'
)

model.deploy(
    endpoint=endpoint,
    machine_type='n1-standard-8',
    accelerator_type='NVIDIA_TESLA_T4',
    accelerator_count=1
)

6.3 边缘部署

Raspberry Pi (CPU):

# 使用llama.cpp
./main -m models/tinyllama-1.1b-q4_0.gguf \
       -p "Hello" \
       -n 50 \
       -t 4

# 性能: ~5 tokens/s (RPi 4B)

手机端(Android):

// 使用MLC LLM(示意代码,具体类名与方法以MLC LLM Android SDK为准)
MLCEngine engine = new MLCEngine("Llama-2-7b-chat-q4f16_1");

String response = engine.generate(
    "What is the capital of France?",
    100  // max_tokens
);

性能对比:

| 设备 | 模型 | 量化 | 速度 | 内存 |
|------|------|------|------|------|
| RTX 4090 | Llama-2-70B | FP16 | 50 tok/s | 140GB |
| A100 40GB | Llama-2-70B | INT4 | 30 tok/s | 35GB |
| MacBook M2 | Llama-2-7B | Q4 | 20 tok/s | 5GB |
| Raspberry Pi 4 | TinyLlama-1B | Q4 | 5 tok/s | 1GB |

7. 高频面试题

面试题1: INT8量化为什么不会显著降低精度?

答案:

理论基础:

  1. 权重分布特性
import matplotlib.pyplot as plt

# 分析权重分布
weights = model.conv1.weight.data.flatten().cpu().numpy()

plt.hist(weights, bins=100)
plt.title('Weight Distribution')
plt.show()

# 发现: 权重集中在小范围内(如-0.5到0.5)
# 大部分权重可被INT8精确表示
  2. 激活值的冗余性
# ReLU后的激活值
activations = model.get_activations(input_data)

# 分析:
# - 很多激活值为0(ReLU截断)
# - 非零值通常在可预测范围内
# - INT8的256个离散值足够表示
  3. 神经网络的鲁棒性
训练时: 使用dropout, noise augmentation等
结果: 网络对小扰动不敏感
量化误差 ≈ 训练时的噪声 → 影响很小

实验验证:

import torch
from torch.quantization import quantize_dynamic

def compare_outputs(model, quantized_model, test_data):
    model.eval()
    quantized_model.eval()

    with torch.no_grad():
        fp32_out = model(test_data)
        int8_out = quantized_model(test_data)

    # 输出差异
    mse = torch.mean((fp32_out - int8_out) ** 2)
    max_diff = torch.max(torch.abs(fp32_out - int8_out))

    print(f"MSE: {mse:.6f}")
    print(f"Max Diff: {max_diff:.6f}")
    print(f"Cosine Similarity: {torch.cosine_similarity(fp32_out, int8_out, dim=-1).mean():.6f}")

# Llama-2-7B实测:
# MSE: 0.000012
# Max Diff: 0.003
# Cosine Similarity: 0.9998

关键因素:

  • 校准数据质量(代表性样本)
  • 量化策略(per-tensor vs per-channel,见下文对比示例)
  • 模型架构(Transformer对量化友好)
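
per-tensor与per-channel的差别可以用一小段代码直观对比(对称量化的最小示意,仅用于说明两种scale取法的误差差异):

import torch

w = torch.randn(64, 128) * torch.logspace(-2, 0, 64).unsqueeze(1)  # 各输出通道幅度差异大

def quant_error(w, scale):
    w_q = torch.clamp(torch.round(w / scale), -128, 127)
    return (w - w_q * scale).abs().mean()

# per-tensor: 整个权重矩阵共用一个scale
scale_tensor = w.abs().max() / 127
# per-channel: 每个输出通道各自一个scale
scale_channel = w.abs().amax(dim=1, keepdim=True) / 127

print("per-tensor误差:", quant_error(w, scale_tensor).item())
print("per-channel误差:", quant_error(w, scale_channel).item())  # 明显更小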

面试题2: vLLM的PagedAttention如何工作?

答案:

传统问题:

# 预分配固定大小的KV Cache
max_seq_len = 2048
kv_cache = torch.zeros(batch_size, max_seq_len, hidden_dim)

# 1. 实际生成长度 << max_seq_len → 浪费内存
# 2. 不同请求长度不同 → 碎片化
# 3. 内存利用率: 30-50%

PagedAttention解决方案:

核心思想: 借鉴OS虚拟内存,将KV Cache分页管理。

数据结构:

class PagedKVCache:
    def __init__(self, block_size=16, num_blocks=1000):
        self.block_size = block_size  # 每页存16个token
        self.num_blocks = num_blocks

        # 物理内存: 所有GPU上的KV块
        self.physical_blocks = torch.zeros(
            num_blocks,
            2,  # K和V
            num_layers,
            block_size,
            num_heads,
            head_dim
        )

        # 空闲块管理
        self.free_blocks = set(range(num_blocks))

        # 页表: seq_id → [物理块ID列表]
        self.page_tables = {}

    def allocate(self, seq_id):
        """为新序列分配第一个块"""
        if not self.free_blocks:
            raise RuntimeError("No free blocks")  # 物理块耗尽(OOM)

        block_id = self.free_blocks.pop()
        self.page_tables[seq_id] = [block_id]
        return block_id

    def append_slot(self, seq_id):
        """为序列追加一个token的空间"""
        pages = self.page_tables[seq_id]
        last_page = pages[-1]

        # 检查最后一页是否已满(get_page_fill: 返回该块已写入的token数,实现略)
        if self.get_page_fill(last_page) >= self.block_size:
            # 分配新页
            new_block = self.free_blocks.pop()
            pages.append(new_block)
            return new_block, 0  # 新块的第0个位置
        else:
            # 使用当前页
            offset = self.get_page_fill(last_page)
            return last_page, offset

    def get_kv(self, seq_id, layer_id, token_idx):
        """获取指定token的KV"""
        page_idx = token_idx // self.block_size
        offset = token_idx % self.block_size

        physical_block = self.page_tables[seq_id][page_idx]
        return self.physical_blocks[physical_block, :, layer_id, offset]

    def free(self, seq_id):
        """释放序列的所有块"""
        for block_id in self.page_tables[seq_id]:
            self.free_blocks.add(block_id)
        del self.page_tables[seq_id]

Attention计算:

def paged_attention(Q, kv_cache, page_table, seq_len):
    """
    Q: [batch, num_heads, 1, head_dim]  # 当前token的query
    kv_cache: PagedKVCache
    page_table: 当前batch的页表
    """
    batch_size = Q.shape[0]
    outputs = []

    for i in range(batch_size):
        # 收集该序列的所有KV
        seq_id = page_table[i]['seq_id']
        pages = kv_cache.page_tables[seq_id]

        # 拼接所有页的KV
        K_list = []
        V_list = []
        for page_id in pages:
            K_page = kv_cache.physical_blocks[page_id, 0, layer_id, :seq_len]
            V_page = kv_cache.physical_blocks[page_id, 1, layer_id, :seq_len]
            K_list.append(K_page)
            V_list.append(V_page)

        K = torch.cat(K_list, dim=0)  # [seq_len, num_heads, head_dim]
        V = torch.cat(V_list, dim=0)

        # 标准Attention
        scores = torch.matmul(Q[i], K.transpose(-2, -1)) / math.sqrt(head_dim)
        attn = torch.softmax(scores, dim=-1)
        out = torch.matmul(attn, V)
        outputs.append(out)

    return torch.stack(outputs)

优势:

  1. 高内存利用率(95%+)
传统: 分配2048空间,实际用100 → 浪费95%
Paged: 按需分配7页(16×7=112) → 利用率约89%
  2. 支持序列共享
# Beam Search: 多个候选共享前缀
seq1: "The cat"
seq2: "The cat sat"
seq3: "The cat jumped"

# PagedAttention: "The cat"部分共享物理页
page_tables = {
    'seq1': [page0, page1],
    'seq2': [page0, page1, page2],  # 共享page0, page1
    'seq3': [page0, page1, page3]   # 共享page0, page1
}
  3. 动态批处理友好
序列完成 → 立即释放所有页 → 新序列重用
无需等待整个batch完成

实测效果(Llama-2-13B, A100):

  • 吞吐量: 2.5× ↑
  • 内存利用率: 50% → 90%
  • 支持batch size: 2× ↑

面试题3: 如何选择量化方法(GPTQ vs AWQ vs GGUF)?

答案:

对比表格:

| 特性 | GPTQ | AWQ | GGUF |
|------|------|-----|------|
| 硬件 | GPU (CUDA) | GPU (CUDA) | CPU/Metal/Vulkan |
| 精度 | 高 | 最高 | 中 |
| 速度 | 快 | 最快 | 慢 |
| 量化时间 | 长(2-4小时) | 短(15-30分钟) | 中(30-60分钟) |
| 内存占用 | 中 | 中 | 高(CPU内存) |
| 易用性 | 中 | 中 | 高 |

决策树:

def choose_quantization(requirements):
    hardware = requirements['hardware']
    priority = requirements['priority']  # 'speed', 'accuracy', 'ease'

    if hardware == 'CPU':
        return 'GGUF'

    if hardware == 'GPU':
        if priority in ('accuracy', 'speed'):
            return 'AWQ'   # 上表中精度和速度均为AWQ占优
        else:
            return 'GPTQ'  # 生态成熟,现成量化权重多

    if hardware == 'Apple Silicon':
        return 'GGUF'  # Metal加速

# 场景推荐
scenarios = {
    '生产API(GPU)': 'AWQ',
    '本地开发(无GPU)': 'GGUF q4_0',
    '边缘设备': 'GGUF q4_0',
    '预算有限(GPU)': 'GPTQ',
    'Apple M系列': 'GGUF q4_K_M'
}

实测对比(Llama-2-7B):

import time
import numpy as np

models = {
    'FP16': load_fp16_model(),
    'GPTQ-4bit': load_gptq_model(),
    'AWQ-4bit': load_awq_model(),
    'GGUF-q4_0': load_gguf_model()
}

def benchmark(model, prompt, n_runs=10):
    times = []
    for _ in range(n_runs):
        start = time.time()
        output = model.generate(prompt, max_tokens=100)
        times.append(time.time() - start)

    return {
        'avg_latency': np.mean(times),
        'throughput': 100 / np.mean(times),  # tokens/s
        'memory': get_gpu_memory(model)
    }

results = {name: benchmark(model, test_prompt) for name, model in models.items()}

结果(A100 40GB):

| 模型 | 延迟(ms) | 吞吐(tok/s) | 内存(GB) | PPL |
|------|----------|-------------|----------|-----|
| FP16 | 45 | 2222 | 13.5 | 5.68 |
| GPTQ | 22 | 4545 | 3.5 | 5.89 |
| AWQ | 18 | 5556 | 3.5 | 5.81 |
| GGUF(GPU) | 25 | 4000 | 4.0 | 5.95 |

GGUF在CPU(M2 Max):

  • 延迟: 150ms
  • 吞吐: 666 tokens/s
  • 内存: 5GB

面试题4: Continuous Batching如何实现?

答案:

传统Static Batching问题:

# 批次处理
batch = [req1(100 tokens), req2(50 tokens), req3(80 tokens)]
# 必须等最长的请求(100 tokens)完成
# req2在50 tokens后空闲等待
# req3在80 tokens后空闲等待
# 浪费: (100-50) + (100-80) = 70 tokens的计算时间

Continuous Batching实现:

import asyncio
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Request:
    id: str
    prompt: str
    max_tokens: int
    generated_tokens: int = 0
    output: str = ""
    finished: bool = False

class ContinuousBatcher:
    def __init__(self, model, max_batch_size=32):
        self.model = model
        self.max_batch_size = max_batch_size
        self.active_requests = []
        self.pending_requests = asyncio.Queue()

    async def add_request(self, request: Request):
        """添加新请求到队列"""
        await self.pending_requests.put(request)

    async def run(self):
        """主循环: 持续处理批次"""
        while True:
            # 1. 移除已完成的请求
            self.active_requests = [r for r in self.active_requests if not r.finished]

            # 2. 从pending队列补充新请求
            while len(self.active_requests) < self.max_batch_size:
                try:
                    new_req = self.pending_requests.get_nowait()
                    self.active_requests.append(new_req)
                except asyncio.QueueEmpty:
                    break

            if not self.active_requests:
                await asyncio.sleep(0.01)
                continue

            # 3. 生成下一个token
            await self.generate_step()

    async def generate_step(self):
        """为当前批次生成一个token"""
        # 准备批次输入
        batch_prompts = [r.prompt + r.output for r in self.active_requests]

        # 模型生成(只生成1个token)
        next_tokens = self.model.generate_next_token(batch_prompts)

        # 更新每个请求
        for i, request in enumerate(self.active_requests):
            token = next_tokens[i]
            request.output += token
            request.generated_tokens += 1

            # 检查是否完成
            if (request.generated_tokens >= request.max_tokens or
                token == self.model.eos_token):
                request.finished = True

# 使用
batcher = ContinuousBatcher(model, max_batch_size=32)

# 启动后台处理
asyncio.create_task(batcher.run())

# 添加请求
await batcher.add_request(Request(id="1", prompt="Hello", max_tokens=100))
await batcher.add_request(Request(id="2", prompt="Hi", max_tokens=50))

# 动态变化:
# Step 1: batch = [req1, req2]
# Step 50: req2完成 → batch = [req1, req3(新加入)]
# Step 100: req1完成, req3完成 → batch = [req4, req5(新)]

vLLM实现(简化版):

class LLMEngine:
    def __init__(self):
        self.scheduler = Scheduler()
        self.model = Model()

    def add_request(self, request):
        self.scheduler.add_request(request)

    def step(self):
        # 1. 调度: 选择要处理的请求
        batch = self.scheduler.schedule()

        # 2. 生成
        outputs = self.model.generate(batch)

        # 3. 更新状态
        for request, output in zip(batch, outputs):
            request.append_token(output)

            if request.is_finished():
                self.scheduler.finish_request(request)

    def run(self):
        while True:
            self.step()

效果对比:

| 指标 | Static Batching | Continuous Batching |
|------|-----------------|---------------------|
| 平均延迟 | 2.5s | 1.2s (↓52%) |
| 吞吐量 | 1000 tok/s | 2400 tok/s (↑2.4×) |
| GPU利用率 | 60% | 90% |

面试题5: 如何监控和优化LLM推理服务?

答案:

关键指标:

1. 延迟指标

import time
from prometheus_client import Histogram

# Prometheus监控
REQUEST_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'Request latency',
    buckets=[0.1, 0.5, 1, 2, 5, 10]
)

@app.post("/generate")
@REQUEST_LATENCY.time()
async def generate(request: Request):
    start = time.time()

    # TTFT (Time To First Token)
    first_token_time = None

    num_tokens = 0
    async for token in model.generate_stream(request.prompt):
        if first_token_time is None:
            first_token_time = time.time() - start
            metrics['ttft'].observe(first_token_time)

        num_tokens += 1
        yield token

    # 总延迟
    total_time = time.time() - start
    metrics['total_latency'].observe(total_time)

    # TPOT (Time Per Output Token): 扣除首token后的平均每token耗时
    tpot = (total_time - first_token_time) / max(num_tokens - 1, 1)
    metrics['tpot'].observe(tpot)

2. 吞吐量指标

from prometheus_client import Counter, Gauge

REQUESTS_TOTAL = Counter('llm_requests_total', 'Total requests')
TOKENS_GENERATED = Counter('llm_tokens_generated_total', 'Total tokens generated')
ACTIVE_REQUESTS = Gauge('llm_active_requests', 'Active requests')

# 实时监控
@app.middleware("http")
async def monitor(request, call_next):
    REQUESTS_TOTAL.inc()
    ACTIVE_REQUESTS.inc()

    response = await call_next(request)

    ACTIVE_REQUESTS.dec()
    return response

3. 资源指标

import pynvml
from prometheus_client import Gauge

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)

GPU_MEMORY = Gauge('gpu_memory_used_mb', 'GPU memory usage')
GPU_UTIL = Gauge('gpu_utilization_percent', 'GPU utilization')

def collect_gpu_metrics():
    while True:
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        GPU_MEMORY.set(info.used / 1024**2)

        util = pynvml.nvmlDeviceGetUtilizationRates(handle)
        GPU_UTIL.set(util.gpu)

        time.sleep(5)

4. 质量指标

# A/B测试
from random import random

@app.post("/generate")
async def generate(request: Request):
    # 5%流量使用新模型
    model = model_v2 if random() < 0.05 else model_v1

    response = await model.generate(request.prompt)

    # 记录模型版本
    metrics['model_version'].labels(version=model.version).inc()

    return response

# 用户反馈
@app.post("/feedback")
async def feedback(request_id: str, rating: int):
    metrics['user_rating'].labels(rating=rating).inc()

优化策略:

1. 自动扩缩容

# Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-service
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Pods
    pods:
      metric:
        name: llm_active_requests
      target:
        type: AverageValue
        averageValue: "5"  # 每个pod处理5个请求

2. 请求路由

class LoadBalancer:
    def __init__(self, replicas):
        self.replicas = replicas

    def route(self, request):
        # 根据prompt长度路由
        prompt_len = len(request.prompt)

        if prompt_len < 100:
            # 短请求: 路由到小模型
            return self.replicas['small']
        elif prompt_len < 500:
            # 中请求: 路由到中模型
            return self.replicas['medium']
        else:
            # 长请求: 路由到大模型
            return self.replicas['large']

3. 缓存

import hashlib
import json

class ResponseCache:
    def __init__(self, redis_client):
        self.redis = redis_client

    def get(self, prompt, params):
        key = self.make_key(prompt, params)
        cached = self.redis.get(key)
        if cached:
            metrics['cache_hits'].inc()
            return json.loads(cached)
        metrics['cache_misses'].inc()
        return None

    def set(self, prompt, params, response, ttl=3600):
        key = self.make_key(prompt, params)
        self.redis.setex(key, ttl, json.dumps(response))

    def make_key(self, prompt, params):
        content = f"{prompt}:{json.dumps(params, sort_keys=True)}"
        return hashlib.md5(content.encode()).hexdigest()
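
在生成接口中的典型用法(示意,redis_client、model等均假设已初始化):

cache = ResponseCache(redis_client)

@app.post("/generate")
async def generate(request: Request):
    params = {"max_tokens": request.max_tokens, "temperature": 0.7}

    cached = cache.get(request.prompt, params)
    if cached is not None:
        return cached  # 命中缓存,直接返回

    response = await model.generate(request.prompt, **params)
    cache.set(request.prompt, params, response)
    return response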

完整监控Dashboard(Grafana):

panels:
  - title: "Request Latency (p50, p95, p99)"
    query: |
      histogram_quantile(0.5, rate(llm_request_latency_seconds_bucket[5m]))
      histogram_quantile(0.95, rate(llm_request_latency_seconds_bucket[5m]))
      histogram_quantile(0.99, rate(llm_request_latency_seconds_bucket[5m]))

  - title: "Throughput (tokens/s)"
    query: rate(llm_tokens_generated_total[1m])

  - title: "GPU Memory Usage"
    query: gpu_memory_used_mb

  - title: "Error Rate"
    query: rate(llm_requests_total{status="error"}[5m]) / rate(llm_requests_total[5m])