从零搭建 LLM 推理服务
项目概述
本项目从零开始搭建一个生产级 LLM 推理服务，涵盖模型加载、推理优化、API 服务、监控告警全流程。
项目目标
┌─────────────────────────────────────────────────────────────────────────────┐
│ LLM 推理服务架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 客户端请求 │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ API Gateway (Nginx) │ │
│ │ 负载均衡 / 限流 / SSL │ │
│ └──────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ FastAPI 应用层 │ │
│ │ • 请求验证 • 流式响应 • 会话管理 • 错误处理 │ │
│ └──────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────┼─────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 推理引擎实例1 │ │ 推理引擎实例2 │ │ 推理引擎实例3 │ │
│ │ (vLLM) │ │ (vLLM) │ │ (vLLM) │ │
│ │ GPU 0-1 │ │ GPU 2-3 │ │ GPU 4-5 │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 监控 & 日志 │ │
│ │ Prometheus │ Grafana │ ELK Stack │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
核心指标目标:
• 首 Token 延迟 (TTFT): < 500ms
• 吞吐量: > 1000 tokens/s (单实例)
• 可用性: 99.9%
• 并发支持: 100+ 并发请求
技术栈
| 组件 | 技术选型 | 说明 |
|---|---|---|
| 推理引擎 | vLLM | 高性能 LLM 推理 |
| API 框架 | FastAPI | 异步 API 服务 |
| 容器化 | Docker | 容器部署 |
| 编排 | Kubernetes | 容器编排 |
| 负载均衡 | Nginx | 反向代理 |
| 监控 | Prometheus + Grafana | 指标监控 |
| 日志 | ELK Stack | 日志分析 |
第一阶段:基础推理服务
1.1 项目结构
llm-inference-service/
├── src/
│ ├── __init__.py
│ ├── config.py # 配置管理
│ ├── engine.py # 推理引擎封装
│ ├── api.py # API 路由
│ ├── schemas.py # 请求/响应模型
│ └── middleware.py # 中间件
├── tests/
│ ├── test_api.py
│ └── test_engine.py
├── deploy/
│ ├── Dockerfile
│ ├── docker-compose.yml
│ └── k8s/
│ ├── deployment.yaml
│ ├── service.yaml
│ └── configmap.yaml
├── monitoring/
│ ├── prometheus.yml
│ └── grafana/
├── requirements.txt
└── README.md
1.2 配置管理
# src/config.py
"""
配置管理
"""
from functools import lru_cache
from typing import Optional, List

from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application settings.

    Every field can be overridden by an environment variable named
    ``LLM_<FIELD>`` (for example ``LLM_MODEL_NAME``, as the Docker Compose and
    Kubernetes manifests do) or by an entry in a ``.env`` file; otherwise the
    defaults below apply.
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_prefix="LLM_",
        # model_name / model_path start with "model_", a namespace pydantic v2
        # protects (and warns about) by default; these fields shadow nothing.
        protected_namespaces=(),
    )

    # --- Service ---
    app_name: str = "LLM Inference Service"
    host: str = "0.0.0.0"
    port: int = 8000
    # uvicorn worker processes; each one runs the app lifespan and therefore
    # loads its own copy of the model.
    workers: int = 1

    # --- Model ---
    model_name: str = "meta-llama/Llama-2-7b-chat-hf"
    # When set, passed to vLLM instead of model_name.
    model_path: Optional[str] = None
    dtype: str = "float16"
    quantization: Optional[str] = None  # e.g. "awq", "gptq"; None = unquantised

    # --- vLLM engine ---
    tensor_parallel_size: int = 1
    max_model_len: int = 4096
    gpu_memory_utilization: float = 0.9
    enforce_eager: bool = False

    # --- Default sampling parameters (used when a request omits them) ---
    max_tokens: int = 2048
    temperature: float = 0.7
    top_p: float = 0.9
    top_k: int = 50

    # --- Concurrency ---
    # NOTE(review): these three are not currently read by the engine/API code.
    max_concurrent_requests: int = 100
    max_batch_size: int = 32
    request_timeout: float = 60.0

    # --- Authentication ---
    api_key_enabled: bool = False
    # List-typed settings are parsed from JSON in env vars,
    # e.g. LLM_API_KEYS='["key-1", "key-2"]'.
    api_keys: List[str] = []

    # --- Logging ---
    log_level: str = "INFO"
    log_format: str = "json"
@lru_cache(maxsize=None)
def get_settings() -> Settings:
    """Return the process-wide ``Settings`` instance.

    The first call reads the environment / ``.env`` file; the function takes
    no arguments, so the cache holds exactly one entry and every later call
    returns that same object.
    """
    loaded = Settings()
    return loaded
1.3 推理引擎封装
# src/engine.py
"""
vLLM 推理引擎封装
"""
import asyncio
from typing import AsyncGenerator, List, Optional, Dict, Any
from dataclasses import dataclass
import time
import logging
from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
from vllm.outputs import RequestOutput
from .config import get_settings
# Module logger; InferenceEngine uses it for initialisation/shutdown messages.
logger = logging.getLogger(__name__)
@dataclass
class GenerationResult:
    """Result of one non-streaming generation, built by ``InferenceEngine.generate``."""

    request_id: str
    text: str  # generated completion text
    tokens: List[int]  # token ids of the completion
    finish_reason: str  # vLLM finish reason of the completion
    prompt_tokens: int  # number of prompt token ids
    completion_tokens: int  # equals len(tokens)
    latency_ms: float  # wall-clock latency of the request, in milliseconds
@dataclass
class StreamChunk:
    """One incremental piece of a streamed generation, yielded by ``InferenceEngine.generate_stream``."""

    request_id: str
    text: str  # text added since the previous chunk
    token_id: int  # last token id generated so far (0 when none)
    is_finished: bool  # True on the final chunk of the stream
    finish_reason: Optional[str] = None  # vLLM finish reason; set on the final chunk
class InferenceEngine:
    """Async wrapper around a vLLM ``AsyncLLMEngine``.

    Builds ``SamplingParams`` from per-request overrides (falling back to the
    defaults in ``Settings``), drives the vLLM request stream and keeps simple
    in-process counters that the ``/health`` and ``/metrics`` endpoints read
    through :meth:`get_stats`.

    :meth:`initialize` must be awaited before :meth:`generate` or
    :meth:`generate_stream` is used.
    """

    def __init__(self):
        self.settings = get_settings()
        self.engine: Optional[AsyncLLMEngine] = None
        self._request_counter = 0
        self._lock = asyncio.Lock()

        # Cumulative counters. "total_requests" counts requests that ran to
        # completion (streaming and non-streaming alike), so that
        # total_latency_ms / total_requests is a consistent average.
        self.stats = {
            "total_requests": 0,
            "total_tokens_generated": 0,
            "total_latency_ms": 0,
            "active_requests": 0,
        }

    async def initialize(self):
        """Create the vLLM engine from the current settings.

        ``model_path`` takes precedence over ``model_name`` when set.
        """
        logger.info("Initializing inference engine with model: %s", self.settings.model_name)

        engine_args = AsyncEngineArgs(
            model=self.settings.model_path or self.settings.model_name,
            dtype=self.settings.dtype,
            tensor_parallel_size=self.settings.tensor_parallel_size,
            max_model_len=self.settings.max_model_len,
            gpu_memory_utilization=self.settings.gpu_memory_utilization,
            enforce_eager=self.settings.enforce_eager,
            quantization=self.settings.quantization,
            # Allows model repos that ship custom modelling code. That code is
            # executed, so only point this service at trusted models.
            trust_remote_code=True,
        )

        self.engine = AsyncLLMEngine.from_engine_args(engine_args)
        logger.info("Inference engine initialized successfully")

    async def shutdown(self):
        """Drop this wrapper's reference to the vLLM engine.

        NOTE(review): no explicit vLLM teardown call is made; GPU resources are
        presumably reclaimed at process exit — confirm for the vLLM version used.
        """
        if self.engine is not None:
            self.engine = None
        logger.info("Inference engine shut down")

    async def _get_request_id(self) -> str:
        """Return a new request id: a per-process counter plus a millisecond timestamp."""
        async with self._lock:
            self._request_counter += 1
            return f"req_{self._request_counter}_{int(time.time()*1000)}"

    def _require_engine(self) -> AsyncLLMEngine:
        """Return the vLLM engine, or raise if :meth:`initialize` has not run."""
        if self.engine is None:
            raise RuntimeError("Inference engine is not initialized; call initialize() first")
        return self.engine

    def _build_sampling_params(
        self,
        max_tokens: Optional[int],
        temperature: Optional[float],
        top_p: Optional[float],
        top_k: Optional[int],
        stop: Optional[List[str]],
    ) -> SamplingParams:
        """Merge per-request overrides with the configured defaults.

        Only ``None`` means "use the default", so explicit falsy values such as
        ``temperature=0.0`` (greedy decoding) are honoured rather than being
        replaced by the configured default.
        """
        defaults = self.settings
        return SamplingParams(
            max_tokens=defaults.max_tokens if max_tokens is None else max_tokens,
            temperature=defaults.temperature if temperature is None else temperature,
            top_p=defaults.top_p if top_p is None else top_p,
            top_k=defaults.top_k if top_k is None else top_k,
            stop=stop,
        )

    def _record_completion(self, completion_tokens: int, latency_ms: float) -> None:
        """Add one finished request to the cumulative counters."""
        self.stats["total_requests"] += 1
        self.stats["total_tokens_generated"] += completion_tokens
        self.stats["total_latency_ms"] += latency_ms

    async def generate(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        top_k: Optional[int] = None,
        stop: Optional[List[str]] = None,
        **kwargs,
    ) -> GenerationResult:
        """Run one prompt to completion (non-streaming).

        Args:
            prompt: Fully formatted prompt text.
            max_tokens, temperature, top_p, top_k: Per-request overrides;
                ``None`` falls back to the value in ``Settings``.
            stop: Optional stop strings forwarded to vLLM.
            **kwargs: Accepted for call-site compatibility and ignored.

        Returns:
            GenerationResult for the first completion of the request.

        Raises:
            RuntimeError: if the engine is not initialised or vLLM produced no output.
        """
        engine = self._require_engine()
        request_id = await self._get_request_id()
        started = time.perf_counter()
        sampling_params = self._build_sampling_params(max_tokens, temperature, top_p, top_k, stop)

        self.stats["active_requests"] += 1
        try:
            final_output = None
            # Each yielded output is a cumulative snapshot; only the last one matters here.
            async for output in engine.generate(
                prompt=prompt,
                sampling_params=sampling_params,
                request_id=request_id,
            ):
                final_output = output

            if final_output is None or not final_output.outputs:
                raise RuntimeError("No output generated")

            completion = final_output.outputs[0]
            latency_ms = (time.perf_counter() - started) * 1000
            self._record_completion(len(completion.token_ids), latency_ms)

            return GenerationResult(
                request_id=request_id,
                text=completion.text,
                tokens=completion.token_ids,
                finish_reason=completion.finish_reason,
                prompt_tokens=len(final_output.prompt_token_ids),
                completion_tokens=len(completion.token_ids),
                latency_ms=latency_ms,
            )
        finally:
            self.stats["active_requests"] -= 1

    async def generate_stream(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        top_k: Optional[int] = None,
        stop: Optional[List[str]] = None,
        **kwargs,
    ) -> AsyncGenerator[StreamChunk, None]:
        """Stream one prompt's completion as incremental text chunks.

        Parameters are the same as :meth:`generate`. Yields a StreamChunk for
        every snapshot that adds text, plus a final chunk with
        ``is_finished=True`` and the finish reason, even when that last step
        adds no visible text (for example when the final token is EOS).

        Raises:
            RuntimeError: if the engine is not initialised.
        """
        engine = self._require_engine()
        request_id = await self._get_request_id()
        started = time.perf_counter()
        sampling_params = self._build_sampling_params(max_tokens, temperature, top_p, top_k, stop)

        self.stats["active_requests"] += 1
        previous_text = ""
        try:
            async for output in engine.generate(
                prompt=prompt,
                sampling_params=sampling_params,
                request_id=request_id,
            ):
                if not output.outputs:
                    continue

                current = output.outputs[0]
                # vLLM reports the cumulative text; emit only the new suffix.
                delta_text = current.text[len(previous_text):]
                previous_text = current.text
                is_finished = current.finish_reason is not None

                if not delta_text and not is_finished:
                    continue

                if is_finished:
                    # Record before yielding: consumers usually stop iterating
                    # after the final chunk, so code after this yield may never run.
                    self._record_completion(
                        len(current.token_ids),
                        (time.perf_counter() - started) * 1000,
                    )

                yield StreamChunk(
                    request_id=request_id,
                    text=delta_text,
                    token_id=current.token_ids[-1] if current.token_ids else 0,
                    is_finished=is_finished,
                    finish_reason=current.finish_reason,
                )
        finally:
            self.stats["active_requests"] -= 1

    def get_stats(self) -> Dict[str, Any]:
        """Return a copy of the counters plus derived per-request averages."""
        stats = self.stats.copy()
        if stats["total_requests"] > 0:
            stats["avg_latency_ms"] = stats["total_latency_ms"] / stats["total_requests"]
            stats["avg_tokens_per_request"] = stats["total_tokens_generated"] / stats["total_requests"]
        else:
            stats["avg_latency_ms"] = 0
            stats["avg_tokens_per_request"] = 0
        return stats
# Process-wide engine instance, created lazily by get_engine().
_engine: Optional[InferenceEngine] = None


async def get_engine() -> InferenceEngine:
    """Return the shared InferenceEngine, creating and initialising it on first use.

    The global is assigned only after ``initialize()`` succeeds. A failed
    start-up (e.g. a model download error) therefore propagates to the caller
    and is retried on the next call, instead of caching a half-initialised
    engine whose ``engine`` attribute is still ``None``.
    """
    global _engine
    if _engine is None:
        candidate = InferenceEngine()
        await candidate.initialize()
        _engine = candidate
    return _engine
1.4 API 定义
# src/schemas.py
"""
请求/响应模型
"""
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from enum import Enum
class Role(str, Enum):
    """Author role of a chat message; the values are the OpenAI role strings."""

    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
class Message(BaseModel):
    """A single chat message."""

    role: Role
    content: str
class ChatCompletionRequest(BaseModel):
    """Chat completion request (OpenAI-compatible)."""

    # Echoed back in responses; the model actually served is Settings.model_name.
    model: str = Field(default="default")
    messages: List[Message]
    # Sampling overrides; None means "use the server default".
    # NOTE(review): le=4096 bounds only the completion length; prompt plus
    # completion must still fit the engine's max_model_len.
    max_tokens: Optional[int] = Field(default=None, ge=1, le=4096)
    temperature: Optional[float] = Field(default=None, ge=0, le=2)
    top_p: Optional[float] = Field(default=None, ge=0, le=1)
    # Not part of the OpenAI API; extension forwarded to the engine.
    top_k: Optional[int] = Field(default=None, ge=1)
    stop: Optional[List[str]] = None
    # True -> Server-Sent Events stream instead of one JSON response.
    stream: bool = False
    # Accepted for OpenAI compatibility; not read by the chat route.
    user: Optional[str] = None
class CompletionRequest(BaseModel):
    """Plain-text completion request (OpenAI ``/v1/completions`` shape).

    Carries the same sampling fields as ``ChatCompletionRequest``, including
    the non-OpenAI ``top_k`` extension, so both request types map onto the
    engine's ``generate`` parameters identically. ``None`` means "use the
    server default".
    NOTE(review): src/api.py imports this model but no route uses it yet.
    """

    model: str = Field(default="default")
    prompt: str
    max_tokens: Optional[int] = Field(default=None, ge=1, le=4096)
    temperature: Optional[float] = Field(default=None, ge=0, le=2)
    top_p: Optional[float] = Field(default=None, ge=0, le=1)
    top_k: Optional[int] = Field(default=None, ge=1)
    stop: Optional[List[str]] = None
    stream: bool = False
class Usage(BaseModel):
    """Token accounting for one completion."""

    prompt_tokens: int
    completion_tokens: int
    total_tokens: int  # prompt_tokens + completion_tokens
class Choice(BaseModel):
    """One generated alternative in a non-streaming chat response."""

    index: int
    message: Message
    # NOTE(review): required str; a None finish reason from the engine would
    # fail validation here — confirm the engine always sets it on completion.
    finish_reason: str
class ChatCompletionResponse(BaseModel):
    """Non-streaming chat completion response (OpenAI-compatible)."""

    id: str
    object: str = "chat.completion"
    created: int  # Unix timestamp in seconds
    model: str
    choices: List[Choice]
    usage: Usage
class DeltaMessage(BaseModel):
    """Incremental message content carried by one streaming chunk."""

    role: Optional[str] = None  # set only on the first chunk of a stream
    content: Optional[str] = None  # newly generated text
class StreamChoice(BaseModel):
    """One alternative inside a streaming chunk."""

    index: int
    delta: DeltaMessage
    finish_reason: Optional[str] = None  # set on the final content chunk only
class ChatCompletionStreamResponse(BaseModel):
    """One Server-Sent-Events chunk of a streaming chat completion."""

    id: str
    object: str = "chat.completion.chunk"
    created: int  # Unix timestamp in seconds
    model: str
    choices: List[StreamChoice]
class HealthResponse(BaseModel):
    """Body of the /health endpoint."""

    status: str
    model: str
    # Not populated by the /health route; unit unspecified — TODO define.
    gpu_memory_used: Optional[float] = None
    active_requests: int  # requests currently in flight in the engine
1.5 API 路由
# src/api.py
"""
API 路由
"""
import time
import asyncio
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import json
from .schemas import (
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionStreamResponse,
CompletionRequest,
Message,
Choice,
StreamChoice,
DeltaMessage,
Usage,
HealthResponse,
Role
)
from .engine import get_engine, InferenceEngine
from .config import get_settings
# The FastAPI application; the routes below register on it and src/main.py
# attaches the startup/shutdown lifespan to it.
app = FastAPI(title="LLM Inference Service", version="1.0.0")
def format_chat_prompt(messages: list[Message]) -> str:
    """Render chat messages into a single Llama 2 chat prompt.

    A system message is held back and folded into the ``<<SYS>>`` header of
    the next user turn. Each user turn opens ``<s>[INST] ... [/INST]`` and
    each assistant turn appends `` <content> </s>``. A later system message
    replaces an earlier one that has not yet been consumed, and a system
    message with no following user turn is dropped.

    NOTE(review): the prompt starts with a literal ``<s>``; if the tokenizer
    also prepends BOS, the model may see two BOS tokens — confirm.
    """
    pieces = []
    pending_system = None

    for turn in messages:
        if turn.role == Role.SYSTEM:
            pending_system = turn.content
            continue

        if turn.role == Role.ASSISTANT:
            pieces.append(f" {turn.content} </s>")
            continue

        if turn.role == Role.USER:
            if pending_system:
                pieces.append(
                    f"<s>[INST] <<SYS>>\n{pending_system}\n<</SYS>>\n\n{turn.content} [/INST]"
                )
                pending_system = None
            else:
                pieces.append(f"<s>[INST] {turn.content} [/INST]")

    return "".join(pieces)
async def verify_api_key(authorization: str = Header(None)):
    """FastAPI dependency enforcing ``Authorization: Bearer <key>`` when enabled.

    When ``api_key_enabled`` is False every request is allowed. Otherwise the
    token after the Bearer scheme must equal one of ``settings.api_keys``.

    Returns:
        True when the request is allowed.

    Raises:
        HTTPException: 401 for a missing header, a non-Bearer scheme, or an
            unknown key. Each 401 carries ``WWW-Authenticate: Bearer``.
    """
    import hmac  # local import keeps this snippet self-contained

    settings = get_settings()
    if not settings.api_key_enabled:
        return True

    challenge = {"WWW-Authenticate": "Bearer"}
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing Authorization header", headers=challenge)

    # The auth scheme name is case-insensitive (RFC 7235), so accept "bearer" too.
    scheme, separator, api_key = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Invalid Authorization format", headers=challenge)

    # Check every configured key with a constant-time comparison so response
    # timing does not reveal how much of a guessed key was correct.
    candidate = api_key.strip().encode("utf-8")
    matched = False
    for configured in settings.api_keys:
        matched |= hmac.compare_digest(candidate, configured.encode("utf-8"))

    if not candidate or not matched:
        raise HTTPException(status_code=401, detail="Invalid API key", headers=challenge)
    return True
@app.get("/health", response_model=HealthResponse)
async def health_check(engine: InferenceEngine = Depends(get_engine)):
    """Report service status, the configured model and the in-flight request count.

    Reports "healthy" whenever the engine dependency resolves; it does not
    probe the GPU, so ``gpu_memory_used`` is left unset.
    """
    in_flight = engine.get_stats()["active_requests"]
    served_model = get_settings().model_name
    return HealthResponse(status="healthy", model=served_model, active_requests=in_flight)
@app.get("/v1/models")
async def list_models():
    """OpenAI-style model listing containing only the configured model."""
    model_card = {
        "id": get_settings().model_name,
        "object": "model",
        "owned_by": "organization",
        "permission": [],
    }
    return {"object": "list", "data": [model_card]}
@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest,
    engine: InferenceEngine = Depends(get_engine),
    _: bool = Depends(verify_api_key),
):
    """OpenAI-compatible chat completion endpoint.

    With ``stream=true`` the reply is sent as Server-Sent Events produced by
    ``stream_chat_response``; otherwise one ``ChatCompletionResponse`` is
    returned after generation finishes.
    """
    prompt = format_chat_prompt(request.messages)

    if request.stream:
        # stream_chat_response already yields complete SSE frames
        # ("data: ...\n\n"), so forward them verbatim. EventSourceResponse would
        # wrap each frame a second time and emit "data: data: {...}", which
        # OpenAI-compatible clients cannot parse.
        return StreamingResponse(
            stream_chat_response(request, prompt, engine),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                # Ask Nginx (the gateway in front of this service) not to buffer the stream.
                "X-Accel-Buffering": "no",
            },
        )

    # Non-streaming response
    result = await engine.generate(
        prompt=prompt,
        max_tokens=request.max_tokens,
        temperature=request.temperature,
        top_p=request.top_p,
        top_k=request.top_k,
        stop=request.stop,
    )

    return ChatCompletionResponse(
        id=result.request_id,
        created=int(time.time()),
        model=request.model,
        choices=[
            Choice(
                index=0,
                message=Message(role=Role.ASSISTANT, content=result.text),
                finish_reason=result.finish_reason,
            )
        ],
        usage=Usage(
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            total_tokens=result.prompt_tokens + result.completion_tokens,
        ),
    )
async def stream_chat_response(
    request: ChatCompletionRequest,
    prompt: str,
    engine: InferenceEngine
) -> AsyncGenerator[str, None]:
    """Yield an OpenAI-style chat completion stream as raw SSE frames.

    Frame order: a chunk announcing ``role="assistant"``, one chunk per text
    delta from the engine (the final one carries ``finish_reason``), then the
    ``data: [DONE]`` sentinel. Every chunk shares one id and one ``created``
    timestamp, as in the OpenAI streaming format.
    """
    import uuid  # local import keeps this snippet self-contained

    # uuid4 keeps ids unique even for concurrent requests started in the same millisecond.
    request_id = f"chatcmpl-{uuid.uuid4().hex}"
    created = int(time.time())

    def sse_frame(delta: DeltaMessage, finish_reason=None) -> str:
        """Serialise one chunk as a complete SSE "data:" frame."""
        chunk = ChatCompletionStreamResponse(
            id=request_id,
            created=created,
            model=request.model,
            choices=[StreamChoice(index=0, delta=delta, finish_reason=finish_reason)],
        )
        return f"data: {chunk.model_dump_json()}\n\n"

    # Announce the assistant role first, as OpenAI streams do.
    yield sse_frame(DeltaMessage(role="assistant"))

    async for piece in engine.generate_stream(
        prompt=prompt,
        max_tokens=request.max_tokens,
        temperature=request.temperature,
        top_p=request.top_p,
        top_k=request.top_k,
        stop=request.stop,
    ):
        yield sse_frame(
            DeltaMessage(content=piece.text),
            piece.finish_reason if piece.is_finished else None,
        )
        if piece.is_finished:
            break

    yield "data: [DONE]\n\n"
@app.get("/metrics")
async def metrics(engine: InferenceEngine = Depends(get_engine)):
    """Expose the engine counters in the Prometheus text exposition format.

    The body is returned as ``text/plain; version=0.0.4``. Returning the
    bare string would make FastAPI JSON-encode it (quoted, with escaped
    newlines), which Prometheus cannot parse.
    """
    from fastapi.responses import PlainTextResponse  # local import keeps this snippet self-contained

    stats = engine.get_stats()
    body = (
        "# HELP llm_requests_total Total number of requests\n"
        "# TYPE llm_requests_total counter\n"
        f"llm_requests_total {stats['total_requests']}\n"
        "# HELP llm_tokens_generated_total Total tokens generated\n"
        "# TYPE llm_tokens_generated_total counter\n"
        f"llm_tokens_generated_total {stats['total_tokens_generated']}\n"
        "# HELP llm_active_requests Current active requests\n"
        "# TYPE llm_active_requests gauge\n"
        f"llm_active_requests {stats['active_requests']}\n"
        "# HELP llm_avg_latency_ms Average latency in milliseconds\n"
        "# TYPE llm_avg_latency_ms gauge\n"
        f"llm_avg_latency_ms {stats['avg_latency_ms']:.2f}\n"
    )
    return PlainTextResponse(body, media_type="text/plain; version=0.0.4")
1.6 启动脚本
# src/main.py
"""
应用入口
"""
import uvicorn
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from .api import app
from .engine import get_engine
from .config import get_settings
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Tie the inference engine's lifetime to the ASGI application's.

    Startup: the first ``get_engine()`` call builds and initialises the shared
    engine, so the model is loaded before any request is served.
    Shutdown: run the engine's cleanup hook.
    """
    shared_engine = await get_engine()
    yield
    await shared_engine.shutdown()


# The app imported from .api was created without a lifespan; install this one on its router.
app.router.lifespan_context = lifespan
def main():
    """Start uvicorn with the host, port, worker count and log level from Settings."""
    cfg = get_settings()
    server_options = dict(
        host=cfg.host,
        port=cfg.port,
        workers=cfg.workers,
        log_level=cfg.log_level.lower(),
    )
    # uvicorn needs an import string (not the app object) to spawn multiple workers.
    uvicorn.run("src.main:app", **server_options)


if __name__ == "__main__":
    main()
第二阶段:容器化部署
2.1 Dockerfile
# deploy/Dockerfile
# NVIDIA publishes CUDA images only under full version tags (12.1.0, 12.1.1, ...);
# "12.1-devel-ubuntu22.04" does not exist, so the build would fail at FROM.
FROM nvidia/cuda:12.1.0-devel-ubuntu22.04

# Environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# System dependencies (curl is used by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y --no-install-recommends \
        python3.10 \
        python3-pip \
        python3-dev \
        git \
        curl \
    && rm -rf /var/lib/apt/lists/*

# Working directory
WORKDIR /app

# Python dependencies are installed before the source is copied so this
# layer stays cached across source-only changes.
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Application source
COPY src/ ./src/

# Health check. Downloading and loading the model can take minutes; without a
# start period, Docker's defaults (30s interval, 3 retries) would mark the
# container unhealthy before the server is up.
HEALTHCHECK --interval=30s --timeout=10s --start-period=600s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Exposed port
EXPOSE 8000

# Start command
CMD ["python3", "-m", "src.main"]
2.2 Docker Compose
# deploy/docker-compose.yml
version: '3.8'

services:
  llm-service:
    build:
      context: ..
      dockerfile: deploy/Dockerfile
    ports:
      - "8000:8000"
    environment:
      - LLM_MODEL_NAME=meta-llama/Llama-2-7b-chat-hf
      - LLM_TENSOR_PARALLEL_SIZE=1
      - LLM_GPU_MEMORY_UTILIZATION=0.9
      - LLM_LOG_LEVEL=INFO
      # Llama 2 is gated on the Hugging Face Hub; pass the host's token through
      # (left unset in the container when the host does not define it).
      - HF_TOKEN
    # vLLM/PyTorch use shared memory; Docker's default 64 MB /dev/shm is too
    # small. 16gb matches the /dev/shm size used in the Kubernetes manifest.
    shm_size: "16gb"
    volumes:
      - ~/.cache/huggingface:/root/.cache/huggingface
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      # NOTE: nginx.conf is expected next to this file; it is not part of the project tree above.
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - llm-service
    restart: unless-stopped

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ../monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      # Set GRAFANA_ADMIN_PASSWORD on the host; the "admin" fallback is for local use only.
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin}
    volumes:
      - grafana-data:/var/lib/grafana
    restart: unless-stopped

volumes:
  grafana-data:
2.3 Kubernetes 部署
# deploy/k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-inference
  labels:
    app: llm-inference
spec:
  replicas: 1
  selector:
    matchLabels:
      app: llm-inference
  template:
    metadata:
      labels:
        app: llm-inference
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
        - name: llm-service
          image: llm-inference:latest
          ports:
            - containerPort: 8000
          resources:
            limits:
              nvidia.com/gpu: 2
              memory: "64Gi"
              cpu: "16"
            requests:
              nvidia.com/gpu: 2
              memory: "32Gi"
              cpu: "8"
          env:
            - name: LLM_MODEL_NAME
              valueFrom:
                configMapKeyRef:
                  name: llm-config
                  key: model_name
            # Should match the nvidia.com/gpu count above.
            - name: LLM_TENSOR_PARALLEL_SIZE
              value: "2"
            - name: LLM_GPU_MEMORY_UTILIZATION
              value: "0.9"
          envFrom:
            - secretRef:
                name: llm-secrets
          volumeMounts:
            - name: model-cache
              mountPath: /root/.cache/huggingface
            - name: shm
              mountPath: /dev/shm
          # Model download and load time varies widely. The startup probe
          # allows up to 60 x 10s = 10 minutes and holds back the liveness and
          # readiness probes until /health first succeeds. This avoids the pod
          # being killed mid-load, which a fixed initialDelaySeconds could cause.
          startupProbe:
            httpGet:
              path: /health
              port: 8000
            periodSeconds: 10
            failureThreshold: 60
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            periodSeconds: 30
            timeoutSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            periodSeconds: 10
      volumes:
        - name: model-cache
          persistentVolumeClaim:
            claimName: model-cache-pvc
        - name: shm
          emptyDir:
            medium: Memory
            sizeLimit: 16Gi
      nodeSelector:
        nvidia.com/gpu.product: NVIDIA-A100-SXM4-80GB
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
---
# ClusterIP Service: in-cluster clients reach the pods on port 80, which is
# forwarded to the container's port 8000.
apiVersion: v1
kind: Service
metadata:
  name: llm-inference
spec:
  # Selects pods carrying the Deployment's pod-template label.
  selector:
    app: llm-inference
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
# Scales the llm-inference Deployment between 1 and 4 replicas.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-inference
  minReplicas: 1
  maxReplicas: 4
  # With several metrics, the HPA computes a replica count for each and uses the largest.
  metrics:
    # CPU utilisation relative to the container's CPU request.
    # NOTE(review): inference is GPU-bound, so CPU may be a weak scaling signal — confirm.
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Per-pod average of the llm_active_requests gauge served on /metrics.
    # Pods-type metrics are read from the custom metrics API, so an adapter
    # (e.g. prometheus-adapter) must expose this series; none is defined here.
    # Each additional replica also needs 2 free GPUs (see the Deployment's resources).
    - type: Pods
      pods:
        metric:
          name: llm_active_requests
        target:
          type: AverageValue
          averageValue: "50"
第三阶段:监控与优化
3.1 Prometheus 配置
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # The FastAPI service (docker-compose service name and port). Uses Prometheus's
  # default metrics_path, /metrics, which the API serves.
  # NOTE(review): the prometheus.io/* pod annotations in the Kubernetes manifest
  # are not used by this static config; in-cluster scraping needs
  # kubernetes_sd_configs or a separate Prometheus setup.
  - job_name: 'llm-inference'
    static_configs:
      - targets: ['llm-service:8000']

  # NVIDIA DCGM exporter; supplies the GPU metrics used by the Grafana dashboard.
  # NOTE(review): no dcgm-exporter service is defined in deploy/docker-compose.yml.
  - job_name: 'nvidia-gpu'
    static_configs:
      - targets: ['dcgm-exporter:9400']
3.2 Grafana Dashboard
{
  "dashboard": {
    "title": "LLM Inference Dashboard",
    "panels": [
      {
        "title": "Requests per Second",
        "type": "timeseries",
        "targets": [
          {
            "expr": "rate(llm_requests_total[1m])",
            "legendFormat": "RPS"
          }
        ]
      },
      {
        "title": "Average Latency",
        "type": "gauge",
        "targets": [
          {
            "expr": "llm_avg_latency_ms",
            "legendFormat": "Latency (ms)"
          }
        ]
      },
      {
        "title": "Active Requests",
        "type": "timeseries",
        "targets": [
          {
            "expr": "llm_active_requests",
            "legendFormat": "Active"
          }
        ]
      },
      {
        "title": "GPU Memory Usage",
        "type": "timeseries",
        "targets": [
          {
            "expr": "DCGM_FI_DEV_FB_USED / (DCGM_FI_DEV_FB_USED + DCGM_FI_DEV_FB_FREE) * 100",
            "legendFormat": "GPU {{gpu}}"
          }
        ]
      },
      {
        "title": "Tokens per Second",
        "type": "timeseries",
        "targets": [
          {
            "expr": "rate(llm_tokens_generated_total[1m])",
            "legendFormat": "Tokens/s"
          }
        ]
      }
    ]
  }
}
测试验证
功能测试
# tests/test_api.py
"""
API 测试
"""
import pytest
import httpx
import asyncio
# Address of the running service under test (these tests do not start it).
BASE_URL = "http://localhost:8000"


@pytest.mark.asyncio
async def test_health_check():
    """GET /health answers 200 with status "healthy"."""
    async with httpx.AsyncClient() as http:
        resp = await http.get(f"{BASE_URL}/health")
    assert resp.status_code == 200
    assert resp.json()["status"] == "healthy"
@pytest.mark.asyncio
async def test_chat_completion():
    """A non-streaming chat completion answers 200 with at least one choice."""
    payload = {
        "messages": [{"role": "user", "content": "Hello, how are you?"}],
        "max_tokens": 100,
    }
    async with httpx.AsyncClient(timeout=60.0) as http:
        resp = await http.post(f"{BASE_URL}/v1/chat/completions", json=payload)
    assert resp.status_code == 200
    body = resp.json()
    assert "choices" in body
    assert len(body["choices"]) > 0
@pytest.mark.asyncio
async def test_streaming():
    """A streaming chat completion emits well-formed SSE chunks ending in [DONE].

    Every ``data:`` payload before the sentinel must parse as a JSON
    ``chat.completion.chunk``, and the first chunk must announce the
    assistant role. Malformed frames (for example a doubly wrapped
    ``data: data: {...}`` or an empty ``data:`` line) fail the JSON parse.
    """
    import json  # local import: only this test parses payloads

    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream(
            "POST",
            f"{BASE_URL}/v1/chat/completions",
            json={
                "messages": [
                    {"role": "user", "content": "Count from 1 to 5."}
                ],
                "stream": True,
                "max_tokens": 50,
            },
        ) as response:
            assert response.status_code == 200
            payloads = [
                line[len("data: "):]
                async for line in response.aiter_lines()
                if line.startswith("data: ")
            ]

    assert len(payloads) > 0
    assert payloads[-1] == "[DONE]"
    chunks = [json.loads(p) for p in payloads[:-1]]
    assert chunks, "no completion chunks before [DONE]"
    assert all(c["object"] == "chat.completion.chunk" for c in chunks)
    assert chunks[0]["choices"][0]["delta"]["role"] == "assistant"
性能测试
# tests/benchmark.py
"""
性能测试
"""
import asyncio
import httpx
import time
from statistics import mean, stdev
BASE_URL = "http://localhost:8000"
CONCURRENT_REQUESTS = 10  # max requests in flight during the timed run
TOTAL_REQUESTS = 100  # requests in the timed run (warm-up not included)


async def single_request(client: httpx.AsyncClient) -> dict:
    """Send one fixed chat completion and measure its wall-clock latency.

    Returns:
        dict with ``latency_ms``, ``tokens`` (completion tokens, 0 when the
        request failed) and ``status`` (HTTP status code, or 0 when no
        response was received). Failures are reported rather than raised, so
        one bad request does not abort the whole ``asyncio.gather`` batch.
    """
    start = time.perf_counter()
    try:
        response = await client.post(
            f"{BASE_URL}/v1/chat/completions",
            json={
                "messages": [
                    {"role": "user", "content": "What is 2+2?"}
                ],
                "max_tokens": 50,
            },
        )
    except httpx.HTTPError:
        # Timeouts / connection errors: record the attempt as a failure.
        return {"latency_ms": (time.perf_counter() - start) * 1000, "tokens": 0, "status": 0}

    latency = (time.perf_counter() - start) * 1000
    tokens = 0
    if response.status_code == 200:
        # Error responses have no "usage" block.
        tokens = response.json()["usage"]["completion_tokens"]

    return {
        "latency_ms": latency,
        "tokens": tokens,
        "status": response.status_code,
    }
async def run_benchmark():
    """Load-test BASE_URL and print a throughput and latency summary.

    Sends 5 unthrottled warm-up requests, then TOTAL_REQUESTS requests with
    at most CONCURRENT_REQUESTS in flight. Latency statistics cover every
    timed request, including failed ones. Percentiles use the nearest-rank
    method.
    """
    import math  # local import keeps this snippet self-contained

    def percentile(sorted_values, pct):
        """Nearest-rank percentile (pct in 0-100) of a sorted, non-empty list."""
        rank = max(1, math.ceil(pct / 100 * len(sorted_values)))
        return sorted_values[rank - 1]

    async with httpx.AsyncClient(timeout=60.0) as client:
        semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)

        async def limited_request():
            async with semaphore:
                return await single_request(client)

        # Warm-up (not measured)
        print("Warming up...")
        await asyncio.gather(*[single_request(client) for _ in range(5)])

        # Timed run
        print(f"Running {TOTAL_REQUESTS} requests with {CONCURRENT_REQUESTS} concurrency...")
        start_time = time.perf_counter()
        results = await asyncio.gather(*[
            limited_request() for _ in range(TOTAL_REQUESTS)
        ])
        total_time = time.perf_counter() - start_time

    if not results:
        print("No requests were run.")
        return

    # Summary statistics
    latencies = sorted(r["latency_ms"] for r in results)
    tokens = sum(r["tokens"] for r in results)
    success = sum(1 for r in results if r["status"] == 200)
    # statistics.stdev requires at least two samples.
    latency_std = stdev(latencies) if len(latencies) > 1 else 0.0

    print("\n=== Benchmark Results ===")
    print(f"Total Requests: {TOTAL_REQUESTS}")
    print(f"Successful: {success}")
    print(f"Total Time: {total_time:.2f}s")
    print(f"Throughput: {TOTAL_REQUESTS/total_time:.2f} req/s")
    print(f"Tokens Generated: {tokens}")
    print(f"Token Throughput: {tokens/total_time:.2f} tokens/s")
    print("\nLatency (ms):")
    print(f" Mean: {mean(latencies):.2f}")
    print(f" Std: {latency_std:.2f}")
    print(f" Min: {latencies[0]:.2f}")
    print(f" Max: {latencies[-1]:.2f}")
    print(f" P50: {percentile(latencies, 50):.2f}")
    print(f" P99: {percentile(latencies, 99):.2f}")


if __name__ == "__main__":
    asyncio.run(run_benchmark())
小结
本项目完整实现了一个生产级 LLM 推理服务:
- 推理引擎:基于 vLLM 的高性能推理
- API 服务:OpenAI 兼容的 API 接口
- 流式输出:SSE 流式响应
- 容器化:Docker + Kubernetes 部署
- 监控:Prometheus + Grafana
后续可扩展方向:
- 多模型管理
- A/B 测试
- 缓存层(Redis)
- 更复杂的负载均衡策略