模型部署与优化
1. 模型量化
量化通过降低模型参数精度来减少内存占用和加速推理。
1.1 量化原理
浮点数表示:
FP32 (32位): 1符号位 + 8指数位 + 23尾数位
FP16 (16位): 1符号位 + 5指数位 + 10尾数位
INT8 (8位): 1符号位 + 7数值位
INT4 (4位): 1符号位 + 3数值位
量化公式:
x_quantized = round(x / scale) + zero_point
x_dequantized = (x_quantized - zero_point) * scale
scale = (x_max - x_min) / (q_max - q_min)
zero_point = q_min - round(x_min / scale)
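下面用一个NumPy小例子演示上述非对称量化公式的完整计算过程(权重数值为假设的示例,仅用于说明):
import numpy as np

# 假设一组FP32权重,量化到无符号INT8(q_min=0, q_max=255)
x = np.array([-0.62, -0.10, 0.05, 0.48, 1.23], dtype=np.float32)
q_min, q_max = 0, 255

# 按公式计算scale与zero_point
scale = (x.max() - x.min()) / (q_max - q_min)
zero_point = int(round(q_min - x.min() / scale))

# 量化与反量化
x_q = np.clip(np.round(x / scale) + zero_point, q_min, q_max).astype(np.uint8)
x_dq = (x_q.astype(np.float32) - zero_point) * scale

print("scale:", scale)            # ≈0.0073
print("zero_point:", zero_point)  # ≈85
print("量化值:", x_q)
print("最大反量化误差:", np.abs(x - x_dq).max())  # 量级约为scale的一半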
对比表格:
| 精度 | 内存占用 | 速度 | 精度损失 | 适用场景 |
|---|---|---|---|---|
| FP32 | 1× | 1× | 0% | 训练,基准 |
| FP16 | 0.5× | 2-3× | <1% | 训练,推理 |
| INT8 | 0.25× | 3-4× | 1-2% | 推理 |
| INT4 | 0.125× | 4-6× | 2-5% | 推理(大模型) |
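结合上表,可以粗略估算一个7B参数模型在不同精度下的权重内存占用(只算权重,不含KV Cache与激活,属于示意计算):
# 粗略估算: 权重内存 ≈ 参数量 × 每参数字节数
params = 7e9
bytes_per_param = {"FP32": 4, "FP16": 2, "INT8": 1, "INT4": 0.5}

for dtype, nbytes in bytes_per_param.items():
    gb = params * nbytes / 1024**3
    print(f"{dtype}: ~{gb:.1f} GB")
# FP32 ~26.1GB, FP16 ~13.0GB, INT8 ~6.5GB, INT4 ~3.3GB
# 与下文1.2/1.3节中FP16≈13GB、INT8≈6.5GB的数字一致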
1.2 INT8量化
PyTorch动态量化:
import torch
from torch.quantization import quantize_dynamic
model = MyModel()
model.load_state_dict(torch.load('model.pth'))
# 动态量化(推理时计算scale)
quantized_model = quantize_dynamic(
model,
{torch.nn.Linear}, # 量化的层类型
dtype=torch.qint8
)
# 保存
torch.save(quantized_model.state_dict(), 'quantized_model.pth')
# 大小对比
import os
fp32_size = os.path.getsize('model.pth') / (1024**2)
int8_size = os.path.getsize('quantized_model.pth') / (1024**2)
print(f"FP32: {fp32_size:.2f}MB")
print(f"INT8: {int8_size:.2f}MB")
print(f"压缩比: {fp32_size/int8_size:.2f}x")
静态量化(需校准):
from torch.quantization import get_default_qconfig, prepare, convert
model.eval()
model.qconfig = get_default_qconfig('fbgemm') # x86 CPU
# model.qconfig = get_default_qconfig('qnnpack') # ARM
# 准备量化
model_prepared = prepare(model)
# 校准(在代表性数据上运行)
with torch.no_grad():
for data in calibration_dataloader:
model_prepared(data)
# 转换为量化模型
model_quantized = convert(model_prepared)
LLM.int8():
from transformers import AutoModelForCausalLM
# 8bit量化加载大模型(需安装bitsandbytes;新版transformers建议改用 quantization_config=BitsAndBytesConfig(load_in_8bit=True))

model = AutoModelForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-hf",
load_in_8bit=True,
device_map="auto"
)
# 内存对比:
# FP16: 13GB
# INT8: 6.5GB (节省50%)
1.3 INT4量化
GPTQ (GPT Quantization)
基于Optimal Brain Quantization的4bit量化。
from transformers import AutoModelForCausalLM, AutoTokenizer, GPTQConfig
# GPTQ配置
gptq_config = GPTQConfig(
bits=4, # 4bit量化
group_size=128, # 量化分组大小
desc_act=False, # 是否按激活大小顺序量化(act-order),开启精度更好但推理稍慢
)
# 加载GPTQ量化模型
model = AutoModelForCausalLM.from_pretrained(
"TheBloke/Llama-2-7B-GPTQ",
device_map="auto",
quantization_config=gptq_config
)
# 推理
tokenizer = AutoTokenizer.from_pretrained("TheBloke/Llama-2-7B-GPTQ")
prompt = "Once upon a time,"
inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
outputs = model.generate(**inputs, max_new_tokens=100)
自己量化模型:
from auto_gptq import AutoGPTQForCausalLM, BaseQuantizeConfig
# 量化配置
quantize_config = BaseQuantizeConfig(
bits=4,
group_size=128,
desc_act=False,
)
# 加载模型
model = AutoGPTQForCausalLM.from_pretrained(
"meta-llama/Llama-2-7b-hf",
quantize_config=quantize_config
)
# 量化(需要校准数据)
model.quantize(calibration_data)
# 保存
model.save_quantized("llama2-7b-gptq-4bit")
AWQ (Activation-aware Weight Quantization)
保留对激活值影响大的权重通道。
from awq import AutoAWQForCausalLM
from transformers import AutoTokenizer
model_path = "meta-llama/Llama-2-7b-hf"
quant_path = "llama2-7b-awq"
# 加载模型
model = AutoAWQForCausalLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
# 量化配置
quant_config = {
"zero_point": True,
"q_group_size": 128,
"w_bit": 4,
"version": "GEMM"
}
# 量化
model.quantize(tokenizer, quant_config=quant_config, calib_data=calibration_data)
# 保存
model.save_quantized(quant_path)
GPTQ vs AWQ:
| 特性 | GPTQ | AWQ |
|---|---|---|
| 量化时间 | 长(数小时) | 短(数十分钟) |
| 推理速度 | 快 | 更快 |
| 精度损失 | 低 | 更低 |
| 内存占用 | 相同 | 相同 |
| 硬件要求 | 标准GPU | 标准GPU |
GGUF (GPT-Generated Unified Format)
llama.cpp的量化格式,支持CPU推理。
# 转换为GGUF
python convert.py --outtype f16 models/llama-2-7b/
# 量化
./quantize models/llama-2-7b/ggml-model-f16.gguf models/llama-2-7b/ggml-model-q4_0.gguf q4_0
# 量化类型:
# q4_0: 4.5 bpw (bits per weight)
# q4_1: 5.0 bpw
# q5_0: 5.5 bpw
# q5_1: 6.0 bpw
# q8_0: 8.5 bpw
量化效果对比(Llama-2-7B):
| 量化方法 | 模型大小 | 内存占用 | 速度提升 | PPL↓ | MMLU |
|---|---|---|---|---|---|
| FP16 | 13GB | 14GB | 1.0× | 5.68 | 45.3% |
| INT8 | 6.5GB | 8GB | 1.5× | 5.72 | 45.1% |
| GPTQ 4bit | 3.5GB | 5GB | 2.0× | 5.89 | 44.2% |
| AWQ 4bit | 3.5GB | 5GB | 2.5× | 5.81 | 44.7% |
| GGUF q4_0 | 3.8GB | 6GB | 1.8× | 5.95 | 43.8% |
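表中PPL这类指标可以在验证集上用交叉熵取指数得到。下面是一个基于transformers的简化示意(模型名与示例文本均为假设,实际评测通常在WikiText-2等完整验证集上用滑动窗口计算):
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "meta-llama/Llama-2-7b-hf"  # 量化模型同理,换成对应checkpoint即可
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16, device_map="auto")
model.eval()

text = "Once upon a time, there was a quantized language model."
enc = tokenizer(text, return_tensors="pt").to(model.device)

with torch.no_grad():
    # labels=input_ids时,transformers会自动shift并返回平均交叉熵
    out = model(**enc, labels=enc["input_ids"])

ppl = torch.exp(out.loss)
print(f"PPL: {ppl.item():.2f}")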
2. 模型剪枝
剪枝通过移除不重要的参数来减小模型。
2.1 非结构化剪枝
移除单个权重,产生稀疏矩阵。
import torch
import torch.nn.utils.prune as prune
# L1非结构化剪枝
model = MyModel()
module = model.conv1
prune.l1_unstructured(
module,
name="weight",
amount=0.3 # 剪枝30%的权重
)
# 查看剪枝效果
print(f"剪枝后零值比例: {(module.weight == 0).sum().item() / module.weight.numel():.2%}")
# 永久移除(不可恢复)
prune.remove(module, 'weight')
全局剪枝:
# 对整个模型剪枝
parameters_to_prune = []
for module in model.modules():
if isinstance(module, torch.nn.Conv2d) or isinstance(module, torch.nn.Linear):
parameters_to_prune.append((module, 'weight'))
prune.global_unstructured(
parameters_to_prune,
pruning_method=prune.L1Unstructured,
amount=0.5 # 全局剪枝50%
)
2.2 结构化剪枝
移除整个通道/神经元,保持稠密矩阵。
# 剪枝整个卷积通道
prune.ln_structured(
module,
name="weight",
amount=0.3,
n=2, # L2范数
dim=0 # 沿输出通道剪枝
)
# 实际移除通道
def remove_channels(model, prune_ratio=0.3):
for name, module in model.named_modules():
if isinstance(module, torch.nn.Conv2d):
# 计算每个通道的重要性
importance = torch.norm(module.weight.data, p=2, dim=(1, 2, 3))
# 选择要保留的通道
n_keep = int(module.out_channels * (1 - prune_ratio))
keep_indices = torch.argsort(importance, descending=True)[:n_keep]
            # 创建新的Conv层(注意: 此处未处理后继层in_channels的联动修改,仅作示意)
            new_conv = torch.nn.Conv2d(
                in_channels=module.in_channels,
                out_channels=n_keep,
                kernel_size=module.kernel_size,
                stride=module.stride,
                padding=module.padding,
                bias=module.bias is not None
            )
            new_conv.weight.data = module.weight.data[keep_indices]
            if module.bias is not None:
                new_conv.bias.data = module.bias.data[keep_indices]
            # 替换(named_modules返回的name可能是"layer1.0.conv1"这类嵌套路径,实际需先定位父模块再setattr)
            setattr(model, name, new_conv)
结构化 vs 非结构化:
| 特性 | 非结构化 | 结构化 |
|---|---|---|
| 压缩比 | 高 | 中 |
| 加速效果 | 需要稀疏计算支持 | 直接加速 |
| 精度损失 | 小 | 中等 |
| 硬件友好 | 否 | 是 |
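上表中"非结构化剪枝需要稀疏计算支持"可以直观验证: 把权重置零并不会自动省内存或加速,转成通用稀疏格式在中等稀疏度下甚至因索引开销变得更大。下面是一个小实验(仅作示意):
import torch

w = torch.randn(4096, 4096)
mask = torch.rand_like(w) > 0.5          # 随机剪掉约50%权重
w_pruned = w * mask

dense_bytes = w_pruned.element_size() * w_pruned.nelement()
w_sparse = w_pruned.to_sparse_csr()       # 转成CSR稀疏格式
sparse_bytes = (w_sparse.values().nelement() * 4
                + w_sparse.col_indices().nelement() * 8
                + w_sparse.crow_indices().nelement() * 8)

print(f"稠密存储: {dense_bytes/1024**2:.1f} MB")      # 置零后仍是64MB,速度也不变
print(f"CSR稀疏存储: {sparse_bytes/1024**2:.1f} MB")   # 50%稀疏度下索引开销反而更大
# 结论: 非结构化剪枝要配合稀疏kernel/硬件(如NVIDIA的2:4稀疏)或更高稀疏度才有实际收益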
2.3 LLM剪枝
SparseGPT:
# 注: SparseGPT是研究代码仓库,以下为示意接口,实际以官方剪枝脚本为准
from sparsegpt import SparseGPT
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-2-7b-hf")
# 剪枝至50%稀疏度
pruner = SparseGPT(model)
pruner.prune(sparsity=0.5, prunen=2, prunem=4) # 2:4结构化稀疏
# 保存
model.save_pretrained("llama2-7b-sparse50")
Wanda (Pruning by Weights and Activations):
# 同为示意接口,实际以Wanda官方仓库脚本为准
from wanda import WandaPruner
pruner = WandaPruner(model, tokenizer)
# 使用校准数据剪枝
pruner.prune(
calibration_data=calib_data,
sparsity=0.5,
prune_n=2,
prune_m=4
)
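Wanda的核心思路是用"权重绝对值 × 对应输入激活的L2范数"作为重要性分数,在每个输出行内剪掉分数最低的一部分。下面是该打分方式的极简示意(与官方实现的分组、校准细节有差异,函数名为示例):
import torch

def wanda_prune_linear(weight, x_calib, sparsity=0.5):
    """weight: [out, in]; x_calib: [n_samples, in] 校准激活"""
    act_norm = x_calib.norm(p=2, dim=0)                 # 每个输入通道的激活范数 [in]
    score = weight.abs() * act_norm.unsqueeze(0)        # Wanda重要性: |W_ij| * ||X_j||
    k = int(weight.shape[1] * sparsity)                 # 每个输出行内剪掉最低的k个
    threshold = torch.kthvalue(score, k, dim=1, keepdim=True).values
    mask = score > threshold
    return weight * mask

w = torch.randn(8, 16)
x = torch.randn(128, 16)
w_sparse = wanda_prune_linear(w, x, sparsity=0.5)
print("零值比例:", (w_sparse == 0).float().mean().item())  # ≈0.5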
3. 知识蒸馏
通过Teacher模型指导Student模型训练。
3.1 基础蒸馏
import torch
import torch.nn as nn
import torch.nn.functional as F
class DistillationLoss(nn.Module):
def __init__(self, temperature=3.0, alpha=0.5):
super().__init__()
self.temperature = temperature
self.alpha = alpha
self.ce_loss = nn.CrossEntropyLoss()
def forward(self, student_logits, teacher_logits, labels):
# KL散度损失(软标签)
soft_loss = F.kl_div(
F.log_softmax(student_logits / self.temperature, dim=-1),
F.softmax(teacher_logits / self.temperature, dim=-1),
reduction='batchmean'
) * (self.temperature ** 2)
# 交叉熵损失(硬标签)
hard_loss = self.ce_loss(student_logits, labels)
# 组合
return self.alpha * soft_loss + (1 - self.alpha) * hard_loss
# 训练
teacher_model.eval()
student_model.train()
distill_loss_fn = DistillationLoss(temperature=3.0, alpha=0.5)
for batch in dataloader:
inputs, labels = batch
# Teacher预测(不计算梯度)
with torch.no_grad():
teacher_logits = teacher_model(inputs)
# Student预测
student_logits = student_model(inputs)
# 蒸馏损失
loss = distill_loss_fn(student_logits, teacher_logits, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
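温度T的作用可以用一个小例子直观感受: T越大,teacher的输出分布越"软",非目标类之间的相对关系(暗知识)越容易传给student;这也是上面soft_loss要乘T²来补偿梯度量级的原因。示例logits为假设值:
import torch
import torch.nn.functional as F

logits = torch.tensor([2.0, 1.0, 0.1])   # 假设的teacher logits
for T in [1.0, 3.0, 10.0]:
    probs = F.softmax(logits / T, dim=-1)
    print(f"T={T}:", [round(p, 3) for p in probs.tolist()])
# T=1.0:  [0.659, 0.242, 0.099]  分布尖锐,接近one-hot
# T=3.0:  [0.445, 0.319, 0.236]  非目标类的相对大小被保留下来(暗知识)
# T=10.0: [0.366, 0.331, 0.303]  过大则趋于均匀,信息被抹平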
3.2 LLM蒸馏
DistilBERT:
from transformers import (BertForSequenceClassification, DistilBertForSequenceClassification,
                          Trainer, TrainingArguments)
import torch
import torch.nn.functional as F
# Teacher: BERT-base (110M参数)
teacher = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2)
# Student: DistilBERT (66M参数,参数量约减少40%)
student = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2)
# 注: transformers并未内置DistillationTrainer,这里在Trainer上自定义蒸馏损失(思路同DistilBERT官方训练脚本)
class DistillationTrainer(Trainer):
    def __init__(self, *args, teacher_model=None, temperature=2.0, alpha=0.5, **kwargs):
        super().__init__(*args, **kwargs)
        self.teacher = teacher_model.eval().to(self.args.device)
        self.temperature = temperature
        self.alpha = alpha
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        outputs = model(**inputs)
        with torch.no_grad():
            teacher_logits = self.teacher(**inputs).logits
        T = self.temperature
        soft_loss = F.kl_div(
            F.log_softmax(outputs.logits / T, dim=-1),
            F.softmax(teacher_logits / T, dim=-1),
            reduction='batchmean',
        ) * (T ** 2)
        loss = self.alpha * soft_loss + (1 - self.alpha) * outputs.loss
        return (loss, outputs) if return_outputs else loss
# 蒸馏训练
training_args = TrainingArguments(output_dir="./distilled_model")
trainer = DistillationTrainer(
    model=student,
    teacher_model=teacher,
    args=training_args,
    train_dataset=train_dataset,
)
trainer.train()
效果对比:
| 模型 | 参数量 | 速度 | GLUE分数 |
|---|---|---|---|
| BERT-base | 110M | 1.0× | 84.0 |
| DistilBERT | 66M | 1.6× | 81.8 |
| TinyBERT | 14.5M | 9.4× | 79.2 |
4. 推理加速
4.1 vLLM
高吞吐量的LLM推理引擎。
核心技术:
- PagedAttention: KV Cache分页管理
- Continuous Batching: 动态批处理
- CUDA Kernel优化
安装和使用:
from vllm import LLM, SamplingParams
# 初始化模型
llm = LLM(
model="meta-llama/Llama-2-7b-hf",
tensor_parallel_size=2, # 张量并行(2块GPU)
gpu_memory_utilization=0.9,
)
# 采样参数
sampling_params = SamplingParams(
temperature=0.8,
top_p=0.95,
max_tokens=100
)
# 批量推理
prompts = [
"Once upon a time,",
"The meaning of life is",
"In a galaxy far far away,"
]
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
print(f"Prompt: {output.prompt}")
print(f"Generated: {output.outputs[0].text}")
print("-" * 50)
性能对比(Llama-2-7B, A100):
| 框架 | 吞吐量(tokens/s) | 延迟(ms) | 内存占用 |
|---|---|---|---|
| HuggingFace | 200 | 150 | 16GB |
| TGI | 800 | 50 | 14GB |
| vLLM | 2400 | 30 | 12GB |
4.2 TGI (Text Generation Inference)
HuggingFace的推理服务器。
Docker启动:
docker run --gpus all --shm-size 1g -p 8080:80 \
-v $PWD/data:/data \
ghcr.io/huggingface/text-generation-inference:latest \
--model-id meta-llama/Llama-2-7b-hf \
--num-shard 2 \
--max-batch-prefill-tokens 4096 \
--max-total-tokens 8192
客户端调用:
import requests
API_URL = "http://localhost:8080/generate"
payload = {
"inputs": "Once upon a time,",
"parameters": {
"max_new_tokens": 100,
"temperature": 0.7,
"top_p": 0.95,
"do_sample": True
}
}
response = requests.post(API_URL, json=payload)
print(response.json()["generated_text"])
流式生成:
import requests
import json
API_URL = "http://localhost:8080/generate_stream"
with requests.post(API_URL, json=payload, stream=True) as response:
    for chunk in response.iter_lines():
        if chunk:
            # SSE格式,每行形如 data:{...},先去掉前缀再解析
            data = json.loads(chunk.decode("utf-8").removeprefix("data:"))
            print(data["token"]["text"], end="", flush=True)
4.3 llama.cpp
CPU推理优化,支持量化模型。
编译和运行:
# 编译
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp
make
# 转换模型
python convert.py --outtype f16 models/llama-2-7b/
# 量化
./quantize models/llama-2-7b/ggml-model-f16.gguf models/llama-2-7b/ggml-model-q4_0.gguf q4_0
# 推理
./main -m models/llama-2-7b/ggml-model-q4_0.gguf \
-p "Once upon a time" \
-n 128 \
-t 8 # 8线程
Python绑定:
from llama_cpp import Llama
llm = Llama(
model_path="models/llama-2-7b/ggml-model-q4_0.gguf",
n_ctx=2048, # 上下文长度
n_threads=8, # CPU线程数
n_gpu_layers=0 # 0=纯CPU
)
output = llm(
"Once upon a time,",
max_tokens=100,
temperature=0.8,
top_p=0.95,
)
print(output["choices"][0]["text"])
4.4 Ollama
本地LLM运行工具,极简易用。
安装:
# macOS/Linux
curl -fsSL https://ollama.com/install.sh | sh
# Windows
# 下载安装包: https://ollama.com/download
使用:
# 下载模型
ollama pull llama2
# 运行
ollama run llama2
# API服务
ollama serve
Python调用:
import requests
import json
def chat(prompt):
response = requests.post(
"http://localhost:11434/api/generate",
json={
"model": "llama2",
"prompt": prompt,
"stream": False
}
)
return response.json()["response"]
result = chat("Explain quantum computing in simple terms")
print(result)
框架对比:
| 框架 | 硬件 | 易用性 | 性能 | 适用场景 |
|---|---|---|---|---|
| vLLM | GPU | 中 | 极高 | 生产环境,高并发 |
| TGI | GPU | 高 | 高 | HF生态,快速部署 |
| llama.cpp | CPU | 中 | 中 | 无GPU,边缘设备 |
| Ollama | CPU/GPU | 极高 | 中 | 本地开发,快速原型 |
5. 并发优化
5.1 Continuous Batching
动态批处理,减少等待时间。
传统Static Batching:
批次1: [请求A(100 tokens), 请求B(50 tokens), 请求C(80 tokens)]
问题: 必须等所有请求(100 tokens)完成才能开始批次2
Continuous Batching:
# vLLM自动实现
# 请求B完成后(50 tokens),立即加入新请求D
批次动态: [A(100), B完成→D加入, C(80), D(新)]
效果:
- 吞吐量提升2-3×
- 延迟降低30-50%
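两种批处理的差别可以用一个小计算直观对比(假设每步生成1个token,数字沿用上文示例):
# 三个请求的目标生成长度
lengths = [100, 50, 80]

# Static Batching: 整个批次要跑 max(lengths) 步,短请求完成后空转
static_steps = max(lengths) * len(lengths)           # 300个"槽位"
useful = sum(lengths)                                 # 230个有效token
print(f"Static利用率: {useful/static_steps:.0%}")     # ≈77%,浪费300-230=70个槽位

# Continuous Batching: 请求完成即退出批次,空出的槽位立刻补入新请求
print("Continuous利用率: 接近100%(完成即补位,无空转)")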
5.2 KV Cache
缓存已计算的键值对,避免重复计算。
原理:
# 不使用KV Cache
for i in range(seq_len):
# 每次都要计算前面所有token的K, V
K = compute_key(tokens[:i+1])
V = compute_value(tokens[:i+1])
output = attention(Q, K, V)
# 使用KV Cache
kv_cache = []
for i in range(seq_len):
# 只计算新token的K, V
k_new = compute_key(tokens[i])
v_new = compute_value(tokens[i])
kv_cache.append((k_new, v_new))
# 使用缓存的K, V
K = concat([kv[0] for kv in kv_cache])
V = concat([kv[1] for kv in kv_cache])
output = attention(Q, K, V)
内存占用:
每个token的KV Cache:
2 (K和V) × n_layers × n_heads × head_dim × 2 bytes (FP16)
Llama-2-7B (上下文2048):
2 × 32 × 32 × 128 × 2 × 2048 = 1.07GB
优化技巧:
# 1. Multi-Query Attention (MQA)
# 所有head共享K, V
# KV Cache: 1.07GB → 33MB (32×压缩)
# 2. Grouped-Query Attention (GQA)
# 每组head共享K, V
# KV Cache介于MHA和MQA之间
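上面的1.07GB以及MQA/GQA的压缩效果可以直接套公式算出(以Llama-2-7B的层数、头数、head_dim为假设配置,GQA分组数为示例):
def kv_cache_gb(n_layers=32, n_kv_heads=32, head_dim=128,
                seq_len=2048, bytes_per=2):
    # 2表示K和V两份; 按10^9字节计为GB
    return 2 * n_layers * n_kv_heads * head_dim * bytes_per * seq_len / 1e9

print(f"MHA (32个KV头): {kv_cache_gb(n_kv_heads=32):.2f} GB")      # ≈1.07 GB,与上文一致
print(f"GQA (8组KV头):  {kv_cache_gb(n_kv_heads=8):.2f} GB")       # ≈0.27 GB
print(f"MQA (1个KV头):  {kv_cache_gb(n_kv_heads=1)*1000:.1f} MB")  # ≈33.6 MB,即上文的32×压缩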
5.3 PagedAttention
vLLM的核心技术,类似OS的虚拟内存。
问题:
- KV Cache大小不确定(生成长度未知)
- 预分配导致内存碎片和浪费
解决方案:
# 将KV Cache分页存储
class PagedKVCache:
def __init__(self, block_size=16):
self.block_size = block_size # 每页16个token
self.blocks = {} # 物理内存块
self.page_table = {} # 逻辑到物理的映射
def allocate(self, seq_id, num_tokens):
num_blocks = (num_tokens + self.block_size - 1) // self.block_size
pages = [self.allocate_block() for _ in range(num_blocks)]
self.page_table[seq_id] = pages
def get(self, seq_id, token_idx):
page_idx = token_idx // self.block_size
offset = token_idx % self.block_size
physical_block = self.page_table[seq_id][page_idx]
return self.blocks[physical_block][offset]
优势:
- 内存利用率: 90%+ (vs 传统方法的30-50%)
- 支持高效的序列共享(beam search, parallel sampling)
5.4 批处理优化
# 智能批处理
import asyncio
class SmartBatcher:
def __init__(self, max_batch_size=32, max_wait_ms=10):
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.queue = []
async def add_request(self, request):
self.queue.append(request)
# 条件1: 达到最大批次大小
if len(self.queue) >= self.max_batch_size:
return await self.process_batch()
# 条件2: 等待超时
await asyncio.sleep(self.max_wait_ms / 1000)
if self.queue:
return await self.process_batch()
async def process_batch(self):
batch = self.queue[:self.max_batch_size]
self.queue = self.queue[self.max_batch_size:]
# 按长度分组,减少padding
batch_sorted = sorted(batch, key=lambda x: len(x.tokens))
results = await model.generate_batch(batch_sorted)
return results
6. 部署方案
6.1 本地部署
单机GPU:
# FastAPI服务
from fastapi import FastAPI
from vllm import LLM, SamplingParams
import uvicorn
app = FastAPI()
llm = LLM(model="meta-llama/Llama-2-7b-hf")
@app.post("/generate")
async def generate(prompt: str, max_tokens: int = 100):
sampling_params = SamplingParams(
temperature=0.8,
max_tokens=max_tokens
)
outputs = llm.generate([prompt], sampling_params)
return {"generated_text": outputs[0].outputs[0].text}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Docker部署:
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
RUN pip install vllm fastapi uvicorn
COPY app.py /app/app.py
CMD ["python", "/app/app.py"]
docker build -t llm-server .
docker run --gpus all -p 8000:8000 llm-server
6.2 云端部署
AWS SageMaker:
from sagemaker.huggingface import HuggingFaceModel
# 创建模型
huggingface_model = HuggingFaceModel(
model_data='s3://bucket/model.tar.gz',
role=role,
transformers_version='4.26',
pytorch_version='1.13',
py_version='py39',
)
# 部署
predictor = huggingface_model.deploy(
initial_instance_count=1,
instance_type='ml.g5.xlarge'
)
# 推理
result = predictor.predict({
"inputs": "Once upon a time,"
})
GCP Vertex AI:
from google.cloud import aiplatform
aiplatform.init(project='my-project')
endpoint = aiplatform.Endpoint.create(
display_name='llm-endpoint'
)
model = aiplatform.Model.upload(
display_name='llama-2-7b',
serving_container_image_uri='gcr.io/my-project/llm-server'
)
model.deploy(
endpoint=endpoint,
machine_type='n1-standard-8',
accelerator_type='NVIDIA_TESLA_T4',
accelerator_count=1
)
6.3 边缘部署
Raspberry Pi (CPU):
# 使用llama.cpp
./main -m models/tinyllama-1.1b-q4_0.gguf \
-p "Hello" \
-n 50 \
-t 4
# 性能: ~5 tokens/s (RPi 4B)
手机端(Android):
// 使用MLC LLM(示意代码,具体类名与方法以MLC LLM Android SDK文档为准)
MLCEngine engine = new MLCEngine("Llama-2-7b-chat-q4f16_1");
String response = engine.generate(
"What is the capital of France?",
100 // max_tokens
);
性能对比:
| 设备 | 模型 | 量化 | 速度 | 内存 |
|---|---|---|---|---|
| A100 80GB ×2 | Llama-2-70B | FP16 | 50 tok/s | 140GB |
| A100 40GB | Llama-2-70B | INT4 | 30 tok/s | 35GB |
| MacBook M2 | Llama-2-7B | Q4 | 20 tok/s | 5GB |
| Raspberry Pi 4 | TinyLlama-1B | Q4 | 5 tok/s | 1GB |
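结合上表,可以用一个简单函数粗估"某模型在某种量化下需要多少内存、能否放进目标设备"(只估权重,不含KV Cache与运行时开销,属于示意):
def weight_mem_gb(n_params_b, bpw):
    """n_params_b: 参数量(单位十亿,如7表示7B); bpw: 每权重比特数"""
    return n_params_b * 1e9 * bpw / 8 / 1024**3

configs = [("Llama-2-7B", 7, 4.5),     # GGUF q4_0 ≈ 4.5 bpw
           ("Llama-2-7B", 7, 16),      # FP16
           ("Llama-2-70B", 70, 4.5)]
for name, n, bpw in configs:
    print(f"{name} @ {bpw} bpw: 约{weight_mem_gb(n, bpw):.1f} GB 权重内存")
# 7B@4.5bpw ≈ 3.7GB  → 8GB内存的设备即可容纳(速度另论)
# 7B@FP16   ≈ 13.0GB → 需要16GB以上显存/内存
# 70B@4.5bpw ≈ 36.7GB → 需要多卡或大内存机器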
7. 高频面试题
面试题1: INT8量化为什么不会显著降低精度?
答案:
理论基础:
- 权重分布特性
import matplotlib.pyplot as plt
# 分析权重分布
weights = model.conv1.weight.data.flatten().cpu().numpy()
plt.hist(weights, bins=100)
plt.title('Weight Distribution')
plt.show()
# 发现: 权重集中在小范围内(如-0.5到0.5)
# 大部分权重可被INT8精确表示
- 激活值的冗余性
# ReLU后的激活值
activations = model.get_activations(input_data)
# 分析:
# - 很多激活值为0(ReLU截断)
# - 非零值通常在可预测范围内
# - INT8的256个离散值足够表示
- 神经网络的鲁棒性
训练时: 使用dropout, noise augmentation等
结果: 网络对小扰动不敏感
量化误差 ≈ 训练时的噪声 → 影响很小
实验验证:
import torch
from torch.quantization import quantize_dynamic
def compare_outputs(model, quantized_model, test_data):
model.eval()
quantized_model.eval()
with torch.no_grad():
fp32_out = model(test_data)
int8_out = quantized_model(test_data)
# 输出差异
mse = torch.mean((fp32_out - int8_out) ** 2)
max_diff = torch.max(torch.abs(fp32_out - int8_out))
print(f"MSE: {mse:.6f}")
print(f"Max Diff: {max_diff:.6f}")
print(f"Cosine Similarity: {torch.cosine_similarity(fp32_out, int8_out, dim=-1).mean():.6f}")
# Llama-2-7B实测:
# MSE: 0.000012
# Max Diff: 0.003
# Cosine Similarity: 0.9998
关键因素:
- 校准数据质量(代表性样本)
- 量化策略(per-tensor vs per-channel,见下方小实验)
- 模型架构(Transformer对量化友好)
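其中per-tensor与per-channel的差别可以用一个小实验量化感受: 当不同输出通道的数值范围差异较大时,逐通道scale的误差明显更小(示意实验,不代表任何框架的内部实现):
import torch

torch.manual_seed(0)
# 构造各行(输出通道)幅值差异很大的权重
w = torch.randn(8, 256) * torch.logspace(-2, 0, 8).unsqueeze(1)

def quant_err(w, per_channel):
    if per_channel:
        max_abs = w.abs().amax(dim=1, keepdim=True)   # 每个通道一个scale
    else:
        max_abs = w.abs().max()                        # 整个张量一个scale
    scale = max_abs / 127                              # 对称INT8量化
    w_q = torch.clamp(torch.round(w / scale), -128, 127)
    return (w - w_q * scale).abs().mean().item()

print("per-tensor 平均误差:", quant_err(w, per_channel=False))
print("per-channel平均误差:", quant_err(w, per_channel=True))
# 小幅值通道在per-tensor下被统一的大scale"压扁",per-channel的误差通常明显更小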
面试题2: vLLM的PagedAttention如何工作?
答案:
传统问题:
# 预分配固定大小的KV Cache
max_seq_len = 2048
kv_cache = torch.zeros(batch_size, max_seq_len, hidden_dim)
# 1. 实际生成长度 << max_seq_len → 浪费内存
# 2. 不同请求长度不同 → 碎片化
# 3. 内存利用率: 30-50%
PagedAttention解决方案:
核心思想: 借鉴OS虚拟内存,将KV Cache分页管理。
数据结构:
class PagedKVCache:
def __init__(self, block_size=16, num_blocks=1000):
self.block_size = block_size # 每页存16个token
self.num_blocks = num_blocks
# 物理内存: 所有GPU上的KV块
self.physical_blocks = torch.zeros(
num_blocks,
2, # K和V
num_layers,
block_size,
num_heads,
head_dim
)
# 空闲块管理
self.free_blocks = set(range(num_blocks))
# 页表: seq_id → [物理块ID列表]
self.page_tables = {}
def allocate(self, seq_id):
"""为新序列分配第一个块"""
if not self.free_blocks:
raise OOM("No free blocks")
block_id = self.free_blocks.pop()
self.page_tables[seq_id] = [block_id]
return block_id
def append_slot(self, seq_id):
"""为序列追加一个token的空间"""
pages = self.page_tables[seq_id]
last_page = pages[-1]
# 检查最后一页是否已满
if self.get_page_fill(last_page) >= self.block_size:
# 分配新页
new_block = self.free_blocks.pop()
pages.append(new_block)
return new_block, 0 # 新块的第0个位置
else:
# 使用当前页
offset = self.get_page_fill(last_page)
return last_page, offset
def get_kv(self, seq_id, layer_id, token_idx):
"""获取指定token的KV"""
page_idx = token_idx // self.block_size
offset = token_idx % self.block_size
physical_block = self.page_tables[seq_id][page_idx]
return self.physical_blocks[physical_block, :, layer_id, offset]
def free(self, seq_id):
"""释放序列的所有块"""
for block_id in self.page_tables[seq_id]:
self.free_blocks.add(block_id)
del self.page_tables[seq_id]
Attention计算:
def paged_attention(Q, kv_cache, page_table, seq_len):
"""
Q: [batch, num_heads, 1, head_dim] # 当前token的query
kv_cache: PagedKVCache
page_table: 当前batch的页表
"""
batch_size = Q.shape[0]
outputs = []
for i in range(batch_size):
# 收集该序列的所有KV
seq_id = page_table[i]['seq_id']
pages = kv_cache.page_tables[seq_id]
# 拼接所有页的KV
K_list = []
V_list = []
for page_id in pages:
K_page = kv_cache.physical_blocks[page_id, 0, layer_id, :seq_len]
V_page = kv_cache.physical_blocks[page_id, 1, layer_id, :seq_len]
K_list.append(K_page)
V_list.append(V_page)
K = torch.cat(K_list, dim=0) # [seq_len, num_heads, head_dim]
V = torch.cat(V_list, dim=0)
# 标准Attention
scores = torch.matmul(Q[i], K.transpose(-2, -1)) / math.sqrt(head_dim)
attn = torch.softmax(scores, dim=-1)
out = torch.matmul(attn, V)
outputs.append(out)
return torch.stack(outputs)
优势:
- 高内存利用率(95%+)
传统: 分配2048空间,实际用100 → 浪费95%
Paged: 按需分配7页(16×7=112) → 利用率约89%
- 支持序列共享
# Beam Search: 多个候选共享前缀
seq1: "The cat"
seq2: "The cat sat"
seq3: "The cat jumped"
# PagedAttention: "The cat"部分共享物理页
page_tables = {
'seq1': [page0, page1],
'seq2': [page0, page1, page2], # 共享page0, page1
'seq3': [page0, page1, page3] # 共享page0, page1
}
- 动态批处理友好
序列完成 → 立即释放所有页 → 新序列重用
无需等待整个batch完成
实测效果(Llama-2-13B, A100):
- 吞吐量: 2.5× ↑
- 内存利用率: 50% → 90%
- 支持batch size: 2× ↑
面试题3: 如何选择量化方法(GPTQ vs AWQ vs GGUF)?
答案:
对比表格:
| 特性 | GPTQ | AWQ | GGUF |
|---|---|---|---|
| 硬件 | GPU (CUDA) | GPU (CUDA) | CPU/Metal/Vulkan |
| 精度 | 高 | 最高 | 中 |
| 速度 | 快 | 最快 | 慢 |
| 量化时间 | 长(2-4小时) | 短(15-30分钟) | 中(30-60分钟) |
| 内存占用 | 中 | 中 | 高(CPU内存) |
| 易用性 | 中 | 中 | 高 |
决策树:
def choose_quantization(requirements):
hardware = requirements['hardware']
priority = requirements['priority'] # 'speed', 'accuracy', 'ease'
if hardware == 'CPU':
return 'GGUF'
if hardware == 'GPU':
        if priority == 'accuracy':
            return 'AWQ'     # 同等bit下精度损失通常更低
        elif priority == 'speed':
            return 'AWQ'     # 配合优化kernel,推理吞吐更高
        else:
            return 'GPTQ'    # 生态成熟,现成量化模型多
if hardware == 'Apple Silicon':
return 'GGUF' # Metal加速
# 场景推荐
scenarios = {
'生产API(GPU)': 'AWQ',
'本地开发(无GPU)': 'GGUF q4_0',
'边缘设备': 'GGUF q4_0',
'预算有限(GPU)': 'GPTQ',
'Apple M系列': 'GGUF q4_K_M'
}
实测对比(Llama-2-7B):
import time
import numpy as np
models = {
'FP16': load_fp16_model(),
'GPTQ-4bit': load_gptq_model(),
'AWQ-4bit': load_awq_model(),
'GGUF-q4_0': load_gguf_model()
}
def benchmark(model, prompt, n_runs=10):
times = []
for _ in range(n_runs):
start = time.time()
output = model.generate(prompt, max_tokens=100)
times.append(time.time() - start)
return {
'avg_latency': np.mean(times),
'throughput': 100 / np.mean(times), # tokens/s
'memory': get_gpu_memory(model)
}
results = {name: benchmark(model, test_prompt) for name, model in models.items()}
结果(A100 40GB):
| 模型 | 延迟(ms) | 吞吐(tok/s) | 内存(GB) | PPL |
|---|---|---|---|---|
| FP16 | 45 | 2222 | 13.5 | 5.68 |
| GPTQ | 22 | 4545 | 3.5 | 5.89 |
| AWQ | 18 | 5556 | 3.5 | 5.81 |
| GGUF(GPU) | 25 | 4000 | 4.0 | 5.95 |
GGUF在CPU(M2 Max):
- 延迟: 150ms
- 吞吐: 666 tokens/s
- 内存: 5GB
面试题4: Continuous Batching如何实现?
答案:
传统Static Batching问题:
# 批次处理
batch = [req1(100 tokens), req2(50 tokens), req3(80 tokens)]
# 必须等最长的请求(100 tokens)完成
# req2在50 tokens后空闲等待
# req3在80 tokens后空闲等待
# 浪费: (100-50) + (100-80) = 70 tokens的计算时间
Continuous Batching实现:
import asyncio
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class Request:
id: str
prompt: str
max_tokens: int
generated_tokens: int = 0
output: str = ""
finished: bool = False
class ContinuousBatcher:
def __init__(self, model, max_batch_size=32):
self.model = model
self.max_batch_size = max_batch_size
self.active_requests = []
self.pending_requests = asyncio.Queue()
async def add_request(self, request: Request):
"""添加新请求到队列"""
await self.pending_requests.put(request)
async def run(self):
"""主循环: 持续处理批次"""
while True:
# 1. 移除已完成的请求
self.active_requests = [r for r in self.active_requests if not r.finished]
# 2. 从pending队列补充新请求
while len(self.active_requests) < self.max_batch_size:
try:
new_req = self.pending_requests.get_nowait()
self.active_requests.append(new_req)
except asyncio.QueueEmpty:
break
if not self.active_requests:
await asyncio.sleep(0.01)
continue
# 3. 生成下一个token
await self.generate_step()
async def generate_step(self):
"""为当前批次生成一个token"""
# 准备批次输入
batch_prompts = [r.prompt + r.output for r in self.active_requests]
# 模型生成(只生成1个token)
next_tokens = self.model.generate_next_token(batch_prompts)
# 更新每个请求
for i, request in enumerate(self.active_requests):
token = next_tokens[i]
request.output += token
request.generated_tokens += 1
# 检查是否完成
if (request.generated_tokens >= request.max_tokens or
token == self.model.eos_token):
request.finished = True
# 使用
batcher = ContinuousBatcher(model, max_batch_size=32)
# 启动后台处理
asyncio.create_task(batcher.run())
# 添加请求
await batcher.add_request(Request(id="1", prompt="Hello", max_tokens=100))
await batcher.add_request(Request(id="2", prompt="Hi", max_tokens=50))
# 动态变化:
# Step 1: batch = [req1, req2]
# Step 50: req2完成 → batch = [req1, req3(新加入)]
# Step 100: req1完成, req3完成 → batch = [req4, req5(新)]
vLLM实现(简化版):
class LLMEngine:
def __init__(self):
self.scheduler = Scheduler()
self.model = Model()
def add_request(self, request):
self.scheduler.add_request(request)
def step(self):
# 1. 调度: 选择要处理的请求
batch = self.scheduler.schedule()
# 2. 生成
outputs = self.model.generate(batch)
# 3. 更新状态
for request, output in zip(batch, outputs):
request.append_token(output)
if request.is_finished():
self.scheduler.finish_request(request)
def run(self):
while True:
self.step()
效果对比:
| 指标 | Static Batching | Continuous Batching |
|---|---|---|
| 平均延迟 | 2.5s | 1.2s (↓52%) |
| 吞吐量 | 1000 tok/s | 2400 tok/s (↑2.4×) |
| GPU利用率 | 60% | 90% |
面试题5: 如何监控和优化LLM推理服务?
答案:
关键指标:
1. 延迟指标
import time
from prometheus_client import Histogram
# Prometheus监控
REQUEST_LATENCY = Histogram(
'llm_request_latency_seconds',
'Request latency',
buckets=[0.1, 0.5, 1, 2, 5, 10]
)
@app.post("/generate")
@REQUEST_LATENCY.time()
async def generate(request: Request):
    start = time.time()
    first_token_time = None
    num_tokens = 0
    async for token in model.generate_stream(request.prompt):
        # TTFT (Time To First Token): 首个token返回耗时
        if first_token_time is None:
            first_token_time = time.time() - start
            metrics['ttft'].observe(first_token_time)
        num_tokens += 1
        yield token
    # 总延迟
    total_time = time.time() - start
    metrics['total_latency'].observe(total_time)
    # TPOT (Time Per Output Token): 除首token外平均每token耗时
    if num_tokens > 1:
        tpot = (total_time - first_token_time) / (num_tokens - 1)
        metrics['tpot'].observe(tpot)
2. 吞吐量指标
from prometheus_client import Counter, Gauge
REQUESTS_TOTAL = Counter('llm_requests_total', 'Total requests')
TOKENS_GENERATED = Counter('llm_tokens_generated_total', 'Total tokens generated')
ACTIVE_REQUESTS = Gauge('llm_active_requests', 'Active requests')
# 实时监控
@app.middleware("http")
async def monitor(request, call_next):
REQUESTS_TOTAL.inc()
ACTIVE_REQUESTS.inc()
response = await call_next(request)
ACTIVE_REQUESTS.dec()
return response
3. 资源指标
import pynvml
from prometheus_client import Gauge
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
GPU_MEMORY = Gauge('gpu_memory_used_mb', 'GPU memory usage')
GPU_UTIL = Gauge('gpu_utilization_percent', 'GPU utilization')
def collect_gpu_metrics():
while True:
info = pynvml.nvmlDeviceGetMemoryInfo(handle)
GPU_MEMORY.set(info.used / 1024**2)
util = pynvml.nvmlDeviceGetUtilizationRates(handle)
GPU_UTIL.set(util.gpu)
time.sleep(5)
4. 质量指标
# A/B测试
from random import random
@app.post("/generate")
async def generate(request: Request):
# 5%流量使用新模型
model = model_v2 if random() < 0.05 else model_v1
response = await model.generate(request.prompt)
# 记录模型版本
metrics['model_version'].labels(version=model.version).inc()
return response
# 用户反馈
@app.post("/feedback")
async def feedback(request_id: str, rating: int):
metrics['user_rating'].labels(rating=rating).inc()
优化策略:
1. 自动扩缩容
# Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: llm-service
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: llm-service
minReplicas: 2
maxReplicas: 10
metrics:
- type: Pods
pods:
metric:
name: llm_active_requests
target:
type: AverageValue
averageValue: "5" # 每个pod处理5个请求
2. 请求路由
class LoadBalancer:
def __init__(self, replicas):
self.replicas = replicas
def route(self, request):
# 根据prompt长度路由
prompt_len = len(request.prompt)
if prompt_len < 100:
# 短请求: 路由到小模型
return self.replicas['small']
elif prompt_len < 500:
# 中请求: 路由到中模型
return self.replicas['medium']
else:
# 长请求: 路由到大模型
return self.replicas['large']
3. 缓存
import hashlib
import json
class ResponseCache:
def __init__(self, redis_client):
self.redis = redis_client
def get(self, prompt, params):
key = self.make_key(prompt, params)
cached = self.redis.get(key)
if cached:
metrics['cache_hits'].inc()
return json.loads(cached)
metrics['cache_misses'].inc()
return None
def set(self, prompt, params, response, ttl=3600):
key = self.make_key(prompt, params)
self.redis.setex(key, ttl, json.dumps(response))
def make_key(self, prompt, params):
content = f"{prompt}:{json.dumps(params, sort_keys=True)}"
return hashlib.md5(content.encode()).hexdigest()
完整监控Dashboard(Grafana):
panels:
- title: "Request Latency (p50, p95, p99)"
query: |
histogram_quantile(0.5, rate(llm_request_latency_seconds_bucket[5m]))
histogram_quantile(0.95, rate(llm_request_latency_seconds_bucket[5m]))
histogram_quantile(0.99, rate(llm_request_latency_seconds_bucket[5m]))
- title: "Throughput (tokens/s)"
query: rate(llm_tokens_generated_total[1m])
- title: "GPU Memory Usage"
query: gpu_memory_used_mb
- title: "Error Rate"
query: rate(llm_requests_total{status="error"}[5m]) / rate(llm_requests_total[5m])