HiHuo

Building an LLM Inference Service from Scratch

Project Overview

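This project builds a production-grade LLM inference service from scratch, covering the full workflow: model loading, inference optimization, the API service, and monitoring and alerting.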

Project Goals

┌─────────────────────────────────────────────────────────────────────────────┐
│                     LLM Inference Service Architecture                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Client requests                                                           │
│       │                                                                     │
│       ▼                                                                     │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                         API Gateway (Nginx)                         │   │
│   │                Load balancing / Rate limiting / SSL                 │   │
│   └──────────────────────────────────┬──────────────────────────────────┘   │
│                                      │                                      │
│                                      ▼                                      │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                      FastAPI application layer                      │   │
│   │   • Validation   • Streaming   • Sessions   • Error handling        │   │
│   └──────────────────────────────────┬──────────────────────────────────┘   │
│                                      │                                      │
│                  ┌───────────────────┼───────────────────┐                  │
│                  ▼                   ▼                   ▼                  │
│         ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐         │
│         │ Engine #1       │ │ Engine #2       │ │ Engine #3       │         │
│         │ (vLLM)          │ │ (vLLM)          │ │ (vLLM)          │         │
│         │ GPU 0-1         │ │ GPU 2-3         │ │ GPU 4-5         │         │
│         └─────────────────┘ └─────────────────┘ └─────────────────┘         │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                        Monitoring & Logging                         │   │
│   │                Prometheus  │  Grafana  │  ELK Stack                 │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Target metrics:
• Time to first token (TTFT): < 500 ms
• Throughput: > 1000 tokens/s (per instance)
• Availability: 99.9%
• Concurrency: 100+ concurrent requests
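The targets above can be turned into a quick back-of-the-envelope check. The sketch below uses hypothetical helper names (per_stream_tokens_per_s, monthly_downtime_budget_min); it assumes the throughput target is shared evenly across concurrent streams and that availability is measured over a 30-day month.

```python
def per_stream_tokens_per_s(total_tokens_per_s: float, concurrency: int) -> float:
    """Average decode rate each stream sees if total throughput is shared evenly."""
    return total_tokens_per_s / concurrency


def monthly_downtime_budget_min(availability: float, days: int = 30) -> float:
    """Minutes of allowed downtime over `days` for an availability target."""
    return days * 24 * 60 * (1 - availability)


if __name__ == "__main__":
    # 1000 tokens/s shared by 100 concurrent requests
    print(f"{per_stream_tokens_per_s(1000, 100):.1f} tokens/s per stream")
    # 99.9% availability over a 30-day month
    print(f"{monthly_downtime_budget_min(0.999):.1f} min downtime budget")
```

At about 10 tokens/s per stream, a 500-token answer takes roughly 50 s to finish streaming, so TTFT alone says little about total response time; both numbers need to be measured.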

Tech Stack

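Component         Choice                Purpose
Inference engine  vLLM                  High-performance LLM inference
API framework     FastAPI               Async API service
Containerization  Docker                Container deployment
Orchestration     Kubernetes            Container orchestration
Load balancing    Nginx                 Reverse proxy
Monitoring        Prometheus + Grafana  Metrics monitoring
Logging           ELK Stack             Log analysis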

Phase 1: Basic Inference Service

1.1 Project Structure

llm-inference-service/
├── src/
│   ├── __init__.py
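│   ├── config.py           # Configuration management
│   ├── engine.py           # Inference engine wrapper
│   ├── api.py              # API routes
│   ├── schemas.py          # Request/response models
│   ├── middleware.py       # Middleware
│   └── main.py             # Entry point (python3 -m src.main)
├── tests/
│   ├── test_api.py
│   ├── test_engine.py
│   └── benchmark.py        # Load benchmark
├── deploy/
│   ├── Dockerfile
│   ├── docker-compose.yml
│   ├── nginx.conf          # Mounted by docker-compose.yml
│   └── k8s/
│       ├── deployment.yaml
│       ├── service.yaml
│       └── configmap.yaml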
├── monitoring/
│   ├── prometheus.yml
│   └── grafana/
├── requirements.txt
└── README.md

1.2 Configuration Management

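# src/config.py
"""
Configuration management
"""
from functools import lru_cache
from typing import List, Optional

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings. Every field can be overridden by an LLM_-prefixed
    environment variable (e.g. LLM_MODEL_NAME) or by a .env file."""

    model_config = SettingsConfigDict(
        env_file=".env",
        env_prefix="LLM_",
        # Allow field names starting with "model_" (model_name, model_path)
        protected_namespaces=(),
    )

    # Service
    app_name: str = "LLM Inference Service"
    host: str = "0.0.0.0"
    port: int = 8000
    # Keep at 1: every worker process would load its own copy of the model
    workers: int = 1

    # Model
    model_name: str = "meta-llama/Llama-2-7b-chat-hf"
    model_path: Optional[str] = None      # local path; takes precedence over model_name
    dtype: str = "float16"
    quantization: Optional[str] = None    # "awq", "gptq", or None

    # vLLM engine
    tensor_parallel_size: int = 1
    max_model_len: int = 4096
    gpu_memory_utilization: float = 0.9
    enforce_eager: bool = False

    # Default sampling parameters (used when a request omits them)
    max_tokens: int = 2048
    temperature: float = 0.7
    top_p: float = 0.9
    top_k: int = 50

    # Concurrency (not referenced by the engine/API code shown in this article)
    max_concurrent_requests: int = 100
    max_batch_size: int = 32
    request_timeout: float = 60.0

    # Authentication
    api_key_enabled: bool = False
    # From the environment, pass a JSON list: LLM_API_KEYS='["key1","key2"]'
    api_keys: List[str] = []

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"


@lru_cache()
def get_settings() -> Settings:
    return Settings()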
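With env_prefix="LLM_", pydantic-settings maps a variable such as LLM_TENSOR_PARALLEL_SIZE onto the tensor_parallel_size field, and by default matches names case-insensitively; this is how the Docker Compose and Kubernetes manifests later configure the service. The stdlib-only sketch below imitates that mapping for scalar fields so it can be tried without pydantic. apply_env_overrides is a hypothetical helper, not part of pydantic-settings, and its bool parsing is a simplified subset of pydantic's.

```python
from typing import Any, Dict, Mapping

# Simplified truthy strings (pydantic accepts a few more spellings)
_TRUE = {"1", "true", "yes", "on"}


def apply_env_overrides(defaults: Dict[str, Any], environ: Mapping[str, str],
                        prefix: str = "LLM_") -> Dict[str, Any]:
    """Return a copy of `defaults` with PREFIX_<FIELD> overrides applied.

    Hypothetical illustration of env_prefix handling: the field type is taken
    from the default value, and only str/int/float/bool are supported.
    """
    # Case-insensitive lookup: "llm_port" and "LLM_PORT" both match field "port"
    env = {key.upper(): value for key, value in environ.items()}
    result = dict(defaults)
    for field, default in defaults.items():
        raw = env.get((prefix + field).upper())
        if raw is None:
            continue
        if isinstance(default, bool):          # check bool first: bool is an int subclass
            result[field] = raw.strip().lower() in _TRUE
        elif isinstance(default, (int, float)):
            result[field] = type(default)(raw)
        else:
            result[field] = raw
    return result


defaults = {"model_name": "meta-llama/Llama-2-7b-chat-hf", "tensor_parallel_size": 1,
            "gpu_memory_utilization": 0.9, "enforce_eager": False}
env = {"LLM_TENSOR_PARALLEL_SIZE": "2", "LLM_ENFORCE_EAGER": "true", "HOME": "/root"}
print(apply_env_overrides(defaults, env))
```

Unprefixed variables such as HOME are ignored, which is why the LLM_ prefix keeps service settings from colliding with unrelated environment variables.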

1.3 Inference Engine Wrapper

# src/engine.py
"""
vLLM inference engine wrapper
"""
import asyncio
from typing import AsyncGenerator, List, Optional, Dict, Any
from dataclasses import dataclass
import time
import logging

from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams
from vllm.outputs import RequestOutput

from .config import get_settings

logger = logging.getLogger(__name__)


@dataclass
class GenerationResult:
    """Result of a non-streaming generation"""
    request_id: str
    text: str
    tokens: List[int]
    finish_reason: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float


@dataclass
class StreamChunk:
    """One incremental chunk of streaming output"""
    request_id: str
    text: str
    token_id: int
    is_finished: bool
    finish_reason: Optional[str] = None


class InferenceEngine:
    """Async wrapper around vLLM's AsyncLLMEngine"""

    def __init__(self):
        self.settings = get_settings()
        self.engine: Optional[AsyncLLMEngine] = None
        self._request_counter = 0
        self._lock = asyncio.Lock()

        # Running statistics (exported via /metrics)
        self.stats = {
            "total_requests": 0,
            "total_tokens_generated": 0,
            "total_latency_ms": 0,
            "active_requests": 0
        }

    async def initialize(self):
        """Create the vLLM engine and load the model onto the GPU(s)"""
        logger.info(f"Initializing inference engine with model: {self.settings.model_name}")

        engine_args = AsyncEngineArgs(
            model=self.settings.model_path or self.settings.model_name,
            dtype=self.settings.dtype,
            tensor_parallel_size=self.settings.tensor_parallel_size,
            max_model_len=self.settings.max_model_len,
            gpu_memory_utilization=self.settings.gpu_memory_utilization,
            enforce_eager=self.settings.enforce_eager,
            quantization=self.settings.quantization,
            trust_remote_code=True,
        )

        self.engine = AsyncLLMEngine.from_engine_args(engine_args)
        logger.info("Inference engine initialized successfully")

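    async def shutdown(self):
        """Shut down the engine"""
        if self.engine:
            # vLLM cleanup: the shutdown API differs between vLLM versions, so
            # nothing is called here; GPU memory is released when the process exits
            pass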
        logger.info("Inference engine shut down")

    async def _get_request_id(self) -> str:
        """Generate a unique request ID"""
        async with self._lock:
            self._request_counter += 1
            return f"req_{self._request_counter}_{int(time.time()*1000)}"

    async def generate(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        top_k: Optional[int] = None,
        stop: Optional[List[str]] = None,
        **kwargs
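    ) -> GenerationResult:
        """Non-streaming generation: wait for the final output and return it"""
        request_id = await self._get_request_id()
        start_time = time.time()

        s = self.settings
        # Compare with None explicitly: `temperature or default` would replace
        # temperature=0 (greedy decoding) with the default value
        sampling_params = SamplingParams(
            max_tokens=max_tokens if max_tokens is not None else s.max_tokens,
            temperature=temperature if temperature is not None else s.temperature,
            top_p=top_p if top_p is not None else s.top_p,
            top_k=top_k if top_k is not None else s.top_k,
            stop=stop,
        )

        self.stats["active_requests"] += 1

        try:
            # Positional arguments: the name of the prompt parameter has changed
            # between vLLM releases (prompt / inputs)
            results_generator = self.engine.generate(
                prompt, sampling_params, request_id
            )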

            final_output = None
            async for output in results_generator:
                final_output = output

            if final_output is None:
                raise RuntimeError("No output generated")

            output = final_output.outputs[0]
            latency_ms = (time.time() - start_time) * 1000

            # Update statistics
            self.stats["total_requests"] += 1
            self.stats["total_tokens_generated"] += len(output.token_ids)
            self.stats["total_latency_ms"] += latency_ms

            return GenerationResult(
                request_id=request_id,
                text=output.text,
                tokens=output.token_ids,
                finish_reason=output.finish_reason,
                prompt_tokens=len(final_output.prompt_token_ids),
                completion_tokens=len(output.token_ids),
                latency_ms=latency_ms
            )

        finally:
            self.stats["active_requests"] -= 1

    async def generate_stream(
        self,
        prompt: str,
        max_tokens: Optional[int] = None,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        top_k: Optional[int] = None,
        stop: Optional[List[str]] = None,
        **kwargs
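    ) -> AsyncGenerator[StreamChunk, None]:
        """Streaming generation: yield incremental text as it is produced"""
        request_id = await self._get_request_id()
        start_time = time.time()

        s = self.settings
        # Explicit None checks so that temperature=0 (greedy decoding) is respected
        sampling_params = SamplingParams(
            max_tokens=max_tokens if max_tokens is not None else s.max_tokens,
            temperature=temperature if temperature is not None else s.temperature,
            top_p=top_p if top_p is not None else s.top_p,
            top_k=top_k if top_k is not None else s.top_k,
            stop=stop,
        )

        self.stats["active_requests"] += 1
        previous_text = ""

        try:
            # Positional arguments work across vLLM releases
            results_generator = self.engine.generate(
                prompt, sampling_params, request_id
            )

            async for output in results_generator:
                if not output.outputs:
                    continue
                current_output = output.outputs[0]
                current_text = current_output.text

                # vLLM returns the cumulative text so far; send only the new suffix
                delta_text = current_text[len(previous_text):]
                previous_text = current_text

                is_finished = current_output.finish_reason is not None

                # Also emit the final step when its delta is empty (e.g. a stop
                # string was matched) so the client still receives finish_reason
                if delta_text or is_finished:
                    # ID of the most recent token
                    token_id = current_output.token_ids[-1] if current_output.token_ids else 0

                    yield StreamChunk(
                        request_id=request_id,
                        text=delta_text,
                        token_id=token_id,
                        is_finished=is_finished,
                        finish_reason=current_output.finish_reason
                    )

                if is_finished:
                    self.stats["total_tokens_generated"] += len(current_output.token_ids)

        finally:
            self.stats["active_requests"] -= 1
            self.stats["total_requests"] += 1
            # Count streaming latency too, so avg_latency_ms covers every request
            self.stats["total_latency_ms"] += (time.time() - start_time) * 1000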

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the statistics, including derived averages"""
        stats = self.stats.copy()

        if stats["total_requests"] > 0:
            stats["avg_latency_ms"] = stats["total_latency_ms"] / stats["total_requests"]
            stats["avg_tokens_per_request"] = stats["total_tokens_generated"] / stats["total_requests"]
        else:
            stats["avg_latency_ms"] = 0
            stats["avg_tokens_per_request"] = 0

        return stats


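# Global engine instance (one per process)
_engine: Optional[InferenceEngine] = None
# Safe to create at import time on Python 3.10+ (not bound to a loop until used)
_engine_lock = asyncio.Lock()


async def get_engine() -> InferenceEngine:
    """Return the process-wide engine, creating it on first use.

    The lock keeps two concurrent first callers from each loading a model.
    """
    global _engine
    if _engine is None:
        async with _engine_lock:
            if _engine is None:
                engine = InferenceEngine()
                await engine.initialize()
                _engine = engine
    return _engine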
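generate_stream relies on vLLM returning the cumulative text generated so far at each step, and slices off the prefix that was already sent. The framework-free sketch below reproduces just that bookkeeping over (cumulative_text, finish_reason) pairs, a hypothetical stand-in for RequestOutput objects. It also keeps the final step even when its delta is empty, so the finish reason still reaches the client.

```python
from typing import Iterable, Iterator, Optional, Tuple


def iter_deltas(
    steps: Iterable[Tuple[str, Optional[str]]],
) -> Iterator[Tuple[str, Optional[str]]]:
    """Turn (cumulative_text, finish_reason) steps into (delta_text, finish_reason).

    Each delta is the part of the cumulative text not sent before. Steps that
    add no text are skipped unless they carry a finish reason.
    """
    previous = ""
    for cumulative, finish_reason in steps:
        delta = cumulative[len(previous):]
        previous = cumulative
        if delta or finish_reason is not None:
            yield delta, finish_reason


steps = [("Hel", None), ("Hello", None), ("Hello", None),
         ("Hello!", None), ("Hello!", "stop")]
for delta, reason in iter_deltas(steps):
    print(repr(delta), reason)
```

Concatenating the deltas reproduces the full text, and the third step (no new text, not finished) produces no chunk at all.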

1.4 API Definitions

# src/schemas.py
"""
Request/response models (OpenAI-compatible)
"""
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from enum import Enum


class Role(str, Enum):
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


class Message(BaseModel):
    """A chat message"""
    role: Role
    content: str


class ChatCompletionRequest(BaseModel):
    """Chat completion request (OpenAI-compatible)"""
    model: str = Field(default="default")
    messages: List[Message]
    max_tokens: Optional[int] = Field(default=None, ge=1, le=4096)
    temperature: Optional[float] = Field(default=None, ge=0, le=2)
    top_p: Optional[float] = Field(default=None, ge=0, le=1)
    top_k: Optional[int] = Field(default=None, ge=1)
    stop: Optional[List[str]] = None
    stream: bool = False
    user: Optional[str] = None


class CompletionRequest(BaseModel):
    """Text completion request"""
    model: str = Field(default="default")
    prompt: str
    max_tokens: Optional[int] = Field(default=None, ge=1, le=4096)
    temperature: Optional[float] = Field(default=None, ge=0, le=2)
    top_p: Optional[float] = Field(default=None, ge=0, le=1)
    stop: Optional[List[str]] = None
    stream: bool = False


class Usage(BaseModel):
    """Token usage"""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


class Choice(BaseModel):
    """One generated choice"""
    index: int
    message: Message
    finish_reason: str


class ChatCompletionResponse(BaseModel):
    """Chat completion response"""
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[Choice]
    usage: Usage


class DeltaMessage(BaseModel):
    """Incremental message content in a streaming chunk"""
    role: Optional[str] = None
    content: Optional[str] = None


class StreamChoice(BaseModel):
    """One choice in a streaming chunk"""
    index: int
    delta: DeltaMessage
    finish_reason: Optional[str] = None


class ChatCompletionStreamResponse(BaseModel):
    """Streaming response chunk"""
    id: str
    object: str = "chat.completion.chunk"
    created: int
    model: str
    choices: List[StreamChoice]


class HealthResponse(BaseModel):
    """Health check response"""
    status: str
    model: str
    gpu_memory_used: Optional[float] = None
    active_requests: int

1.5 API Routes

# src/api.py
"""
API routes
"""
import time
from typing import AsyncGenerator, List, Optional

from fastapi import Depends, FastAPI, Header, HTTPException
from fastapi.responses import PlainTextResponse, StreamingResponse

from .schemas import (
    ChatCompletionRequest,
    ChatCompletionResponse,
    ChatCompletionStreamResponse,
    Message,
    Choice,
    StreamChoice,
    DeltaMessage,
    Usage,
    HealthResponse,
    Role
)
from .engine import get_engine, InferenceEngine
from .config import get_settings

app = FastAPI(title="LLM Inference Service", version="1.0.0")


def format_chat_prompt(messages: List[Message]) -> str:
    """Format chat messages into a Llama 2 Chat prompt.

    Other model families use different prompt templates.
    """
    prompt = ""
    system_message = None

    for msg in messages:
        if msg.role == Role.SYSTEM:
            # The system prompt is folded into the next user turn
            system_message = msg.content
        elif msg.role == Role.USER:
            if system_message:
                prompt += f"<s>[INST] <<SYS>>\n{system_message}\n<</SYS>>\n\n{msg.content} [/INST]"
                system_message = None
            else:
                prompt += f"<s>[INST] {msg.content} [/INST]"
        elif msg.role == Role.ASSISTANT:
            prompt += f" {msg.content} </s>"

    return prompt


async def verify_api_key(authorization: Optional[str] = Header(None)):
    """API key check; a no-op unless api_key_enabled is set"""
    settings = get_settings()

    if not settings.api_key_enabled:
        return True

    if not authorization:
        raise HTTPException(status_code=401, detail="Missing Authorization header")

    # Expected header: "Authorization: Bearer <api_key>"
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid Authorization format")

    api_key = authorization[len("Bearer "):]
    if api_key not in settings.api_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")

    return True


@app.get("/health", response_model=HealthResponse)
async def health_check(engine: InferenceEngine = Depends(get_engine)):
    """Health check"""
    stats = engine.get_stats()

    return HealthResponse(
        status="healthy",
        model=get_settings().model_name,
        active_requests=stats["active_requests"]
    )


@app.get("/v1/models")
async def list_models():
    """List available models"""
    settings = get_settings()
    return {
        "object": "list",
        "data": [
            {
                "id": settings.model_name,
                "object": "model",
                "owned_by": "organization",
                "permission": []
            }
        ]
    }


@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest,
    engine: InferenceEngine = Depends(get_engine),
    _: bool = Depends(verify_api_key)
):
    """Chat completions endpoint (OpenAI-compatible)"""
    # Build the prompt from the message list
    prompt = format_chat_prompt(request.messages)

    if request.stream:
        # stream_chat_response already yields complete "data: ...\n\n" SSE frames,
        # so pass them through unchanged (sse_starlette's EventSourceResponse would
        # wrap each string in a second "data:" field)
        return StreamingResponse(
            stream_chat_response(request, prompt, engine),
            media_type="text/event-stream"
        )

    # Non-streaming response
    result = await engine.generate(
        prompt=prompt,
        max_tokens=request.max_tokens,
        temperature=request.temperature,
        top_p=request.top_p,
        top_k=request.top_k,
        stop=request.stop
    )

    return ChatCompletionResponse(
        id=result.request_id,
        created=int(time.time()),
        model=request.model,
        choices=[
            Choice(
                index=0,
                message=Message(role=Role.ASSISTANT, content=result.text),
                finish_reason=result.finish_reason
            )
        ],
        usage=Usage(
            prompt_tokens=result.prompt_tokens,
            completion_tokens=result.completion_tokens,
            total_tokens=result.prompt_tokens + result.completion_tokens
        )
    )


async def stream_chat_response(
    request: ChatCompletionRequest,
    prompt: str,
    engine: InferenceEngine
) -> AsyncGenerator[str, None]:
    """Streaming chat response as server-sent events"""
    request_id = f"chatcmpl-{int(time.time()*1000)}"

    # The first chunk announces the assistant role
    first_chunk = ChatCompletionStreamResponse(
        id=request_id,
        created=int(time.time()),
        model=request.model,
        choices=[
            StreamChoice(
                index=0,
                delta=DeltaMessage(role="assistant"),
                finish_reason=None
            )
        ]
    )
    yield f"data: {first_chunk.model_dump_json()}\n\n"

    # Stream the generated text
    async for chunk in engine.generate_stream(
        prompt=prompt,
        max_tokens=request.max_tokens,
        temperature=request.temperature,
        top_p=request.top_p,
        top_k=request.top_k,
        stop=request.stop
    ):
        response = ChatCompletionStreamResponse(
            id=request_id,
            created=int(time.time()),
            model=request.model,
            choices=[
                StreamChoice(
                    index=0,
                    delta=DeltaMessage(content=chunk.text),
                    finish_reason=chunk.finish_reason if chunk.is_finished else None
                )
            ]
        )
        yield f"data: {response.model_dump_json()}\n\n"

        if chunk.is_finished:
            break

    yield "data: [DONE]\n\n"


@app.get("/metrics", response_class=PlainTextResponse)
async def metrics(engine: InferenceEngine = Depends(get_engine)):
    """Metrics in the Prometheus text exposition format"""
    stats = engine.get_stats()

    metrics_text = f"""# HELP llm_requests_total Total number of requests
# TYPE llm_requests_total counter
llm_requests_total {stats['total_requests']}

# HELP llm_tokens_generated_total Total tokens generated
# TYPE llm_tokens_generated_total counter
llm_tokens_generated_total {stats['total_tokens_generated']}

# HELP llm_active_requests Current active requests
# TYPE llm_active_requests gauge
llm_active_requests {stats['active_requests']}

# HELP llm_avg_latency_ms Average latency in milliseconds
# TYPE llm_avg_latency_ms gauge
llm_avg_latency_ms {stats['avg_latency_ms']:.2f}
"""

    # Return plain text: a bare str would be JSON-encoded by FastAPI (quoted,
    # with escaped newlines), which Prometheus cannot parse
    return PlainTextResponse(metrics_text, media_type="text/plain; version=0.0.4")
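On the client side, the stream produced by stream_chat_response is a series of `data:` lines, each holding one JSON chunk, ending with `data: [DONE]`. A minimal stdlib parser that reassembles the assistant text might look like the following; collect_stream_text is a hypothetical helper, and it assumes one JSON object per `data:` line, as this service emits.

```python
import json
from typing import Iterable, Optional, Tuple


def collect_stream_text(lines: Iterable[str]) -> Tuple[str, Optional[str]]:
    """Concatenate delta.content from SSE lines; return (text, finish_reason)."""
    parts = []
    finish_reason = None
    for line in lines:
        if not line.startswith("data: "):
            continue                      # blank separator lines between events
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break                         # end-of-stream sentinel
        choice = json.loads(payload)["choices"][0]
        content = choice["delta"].get("content")
        if content:                       # the role-only first chunk has no content
            parts.append(content)
        if choice.get("finish_reason") is not None:
            finish_reason = choice["finish_reason"]
    return "".join(parts), finish_reason


sample = [
    'data: {"choices": [{"index": 0, "delta": {"role": "assistant", "content": null}, "finish_reason": null}]}',
    "",
    'data: {"choices": [{"index": 0, "delta": {"content": "4"}, "finish_reason": null}]}',
    "",
    'data: {"choices": [{"index": 0, "delta": {"content": "."}, "finish_reason": "stop"}]}',
    "",
    "data: [DONE]",
]
print(collect_stream_text(sample))
```

With httpx, the lines yielded by `response.aiter_lines()` (as in the streaming test later on) can be collected into a list and passed in unchanged.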

1.6 Entry Point

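# src/main.py
"""
Application entry point
"""
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI

from .api import app
from .engine import get_engine
from .config import get_settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: load the model before serving, clean up on exit"""
    # Initialize the engine at startup so the first request does not pay for model loading
    engine = await get_engine()
    yield
    # Cleanup on shutdown
    await engine.shutdown()


# The app object is created in api.py, so attach the lifespan handler here
app.router.lifespan_context = lifespan


def main():
    settings = get_settings()

    uvicorn.run(
        "src.main:app",             # import string, so uvicorn loads this module and its lifespan
        host=settings.host,
        port=settings.port,
        workers=settings.workers,   # keep at 1: each worker loads its own model copy
        log_level=settings.log_level.lower()
    )


if __name__ == "__main__":
    main()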
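get_engine is also used as a FastAPI dependency, so if it is ever reached concurrently before the engine exists (for example, when api.py's app is served without the lifespan hook from main.py), several coroutines could each start loading a model. The self-contained sketch below shows the check, lock, re-check pattern with a dummy engine; FakeEngine and get_fake_engine are hypothetical stand-ins, and the sleep simulates slow model loading. Ten concurrent callers should trigger exactly one initialization.

```python
import asyncio
from typing import Optional


class FakeEngine:
    """Stand-in for InferenceEngine that counts initializations."""
    init_calls = 0

    async def initialize(self) -> None:
        FakeEngine.init_calls += 1
        await asyncio.sleep(0.05)  # simulate slow model loading


_engine: Optional[FakeEngine] = None
_lock: Optional[asyncio.Lock] = None


async def get_fake_engine() -> FakeEngine:
    global _engine, _lock
    if _engine is None:
        if _lock is None:
            _lock = asyncio.Lock()  # no await between check and assignment, so no race
        async with _lock:
            if _engine is None:     # re-check: another caller may have finished first
                engine = FakeEngine()
                await engine.initialize()
                _engine = engine    # publish only after initialization succeeds
    return _engine


async def main() -> int:
    engines = await asyncio.gather(*(get_fake_engine() for _ in range(10)))
    assert all(e is engines[0] for e in engines)
    return FakeEngine.init_calls


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Publishing the engine only after `initialize()` returns means a failed load leaves `_engine` as None, so the next caller retries instead of receiving a half-initialized object.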

Phase 2: Containerized Deployment

2.1 Dockerfile

# deploy/Dockerfile
# nvidia/cuda image tags include the full CUDA version (major.minor.patch)
FROM nvidia/cuda:12.1.1-devel-ubuntu22.04

# Environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# System dependencies (python3.10 is the default python3 on Ubuntu 22.04)
RUN apt-get update && apt-get install -y --no-install-recommends \
    python3.10 \
    python3-pip \
    python3-dev \
    git \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Working directory
WORKDIR /app

# Python dependencies (copied before the source so this layer stays cached)
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Source code
COPY src/ ./src/

# Health check (the start period gives the model time to load)
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Exposed port
EXPOSE 8000

# Start command
CMD ["python3", "-m", "src.main"]

2.2 Docker Compose

# deploy/docker-compose.yml
version: '3.8'

services:
  llm-service:
    build:
      context: ..
      dockerfile: deploy/Dockerfile
    ports:
      - "8000:8000"
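    environment:
      - LLM_MODEL_NAME=meta-llama/Llama-2-7b-chat-hf
      - LLM_TENSOR_PARALLEL_SIZE=1
      - LLM_GPU_MEMORY_UTILIZATION=0.9
      - LLM_LOG_LEVEL=INFO
      # meta-llama models are gated on Hugging Face; an access token is needed to download them
      - HF_TOKEN=${HF_TOKEN}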
    volumes:
      - ~/.cache/huggingface:/root/.cache/huggingface
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
    depends_on:
      - llm-service
    restart: unless-stopped

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ../monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana-data:/var/lib/grafana
    restart: unless-stopped

volumes:
  grafana-data:

2.3 Kubernetes Deployment

# deploy/k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-inference
  labels:
    app: llm-inference
spec:
  replicas: 1
  selector:
    matchLabels:
      app: llm-inference
  template:
    metadata:
      labels:
        app: llm-inference
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
      - name: llm-service
        image: llm-inference:latest
        ports:
        - containerPort: 8000
        resources:
          limits:
            nvidia.com/gpu: 2
            memory: "64Gi"
            cpu: "16"
          requests:
            nvidia.com/gpu: 2
            memory: "32Gi"
            cpu: "8"
        env:
        - name: LLM_MODEL_NAME
          valueFrom:
            configMapKeyRef:
              name: llm-config
              key: model_name
        - name: LLM_TENSOR_PARALLEL_SIZE
          value: "2"
        - name: LLM_GPU_MEMORY_UTILIZATION
          value: "0.9"
        envFrom:
        - secretRef:
            name: llm-secrets
        volumeMounts:
        - name: model-cache
          mountPath: /root/.cache/huggingface
        - name: shm
          mountPath: /dev/shm
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 120
          periodSeconds: 30
          timeoutSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 60
          periodSeconds: 10
      volumes:
      - name: model-cache
        persistentVolumeClaim:
          claimName: model-cache-pvc
      - name: shm
        emptyDir:
          medium: Memory
          sizeLimit: 16Gi
      nodeSelector:
        nvidia.com/gpu.product: NVIDIA-A100-SXM4-80GB
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
---
apiVersion: v1
kind: Service
metadata:
  name: llm-inference
spec:
  selector:
    app: llm-inference
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: llm-inference-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: llm-inference
  minReplicas: 1
  maxReplicas: 4
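  metrics:
  # CPU utilization is at best a rough proxy for load on a GPU-bound server
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  # Pods metrics are read from the custom metrics API, so llm_active_requests
  # must be exposed there by an adapter (e.g. Prometheus Adapter)
  - type: Pods
    pods:
      metric:
        name: llm_active_requests
      target:
        type: AverageValue
        averageValue: "50"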
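The HPA computes a proposal per metric as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue), leaves the replica count unchanged while that ratio stays within a tolerance (0.1 by default), takes the largest proposal across metrics, and clamps the result to minReplicas/maxReplicas. The sketch below applies that rule to the manifest's numbers. It is a simplification (no stabilization window, no handling of missing or unready pods), and desired_replicas is a hypothetical helper.

```python
import math
from typing import Dict, Tuple


def desired_replicas(current: int, metrics: Dict[str, Tuple[float, float]],
                     min_replicas: int, max_replicas: int,
                     tolerance: float = 0.1) -> int:
    """Simplified HPA rule. `metrics` maps name -> (current_value, target_value)."""
    proposals = []
    for current_value, target_value in metrics.values():
        ratio = current_value / target_value
        if abs(ratio - 1.0) <= tolerance:
            proposals.append(current)                 # within tolerance: no change
        else:
            proposals.append(math.ceil(current * ratio))
    desired = max(proposals)                          # the largest proposal wins
    return max(min_replicas, min(max_replicas, desired))


# 2 replicas; average CPU 60% (target 70%); 80 active requests per pod (target 50)
print(desired_replicas(2, {"cpu": (60, 70), "llm_active_requests": (80, 50)}, 1, 4))
```

Since each pod in this manifest requests 2 GPUs, scaling out to maxReplicas: 4 needs 8 free GPUs matching the nodeSelector.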

Phase 3: Monitoring and Optimization

3.1 Prometheus Configuration

# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'llm-inference'
    static_configs:
      - targets: ['llm-service:8000']

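  # Assumes an NVIDIA DCGM exporter reachable at dcgm-exporter:9400;
  # it is not part of the docker-compose.yml shown above
  - job_name: 'nvidia-gpu'
    static_configs:
      - targets: ['dcgm-exporter:9400']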
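To check that /metrics returns text Prometheus can scrape, a small parser for the subset this service emits (no labels, no timestamps) is handy in tests. parse_simple_metrics is a hypothetical helper, not part of any Prometheus client library.

```python
from typing import Dict


def parse_simple_metrics(text: str) -> Dict[str, float]:
    """Parse unlabeled 'name value' lines of the Prometheus text format."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):   # skip blanks, HELP and TYPE lines
            continue
        name, value = line.split()[:2]
        samples[name] = float(value)
    return samples


exposition = """# HELP llm_requests_total Total number of requests
# TYPE llm_requests_total counter
llm_requests_total 42

# HELP llm_active_requests Current active requests
# TYPE llm_active_requests gauge
llm_active_requests 3
"""
print(parse_simple_metrics(exposition))
```

If the endpoint's body came back JSON-encoded (one quoted line with escaped newlines), this parser raises ValueError instead of returning samples, which makes that failure mode easy to catch in a test.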

3.2 Grafana Dashboard

{
  "dashboard": {
    "title": "LLM Inference Dashboard",
    "panels": [
      {
        "title": "Requests per Second",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(llm_requests_total[1m])",
            "legendFormat": "RPS"
          }
        ]
      },
      {
        "title": "Average Latency",
        "type": "gauge",
        "targets": [
          {
            "expr": "llm_avg_latency_ms",
            "legendFormat": "Latency (ms)"
          }
        ]
      },
      {
        "title": "Active Requests",
        "type": "graph",
        "targets": [
          {
            "expr": "llm_active_requests",
            "legendFormat": "Active"
          }
        ]
      },
      {
        "title": "GPU Memory Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "DCGM_FI_DEV_FB_USED / DCGM_FI_DEV_FB_TOTAL * 100",
            "legendFormat": "GPU {{gpu}}"
          }
        ]
      },
      {
        "title": "Tokens per Second",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(llm_tokens_generated_total[1m])",
            "legendFormat": "Tokens/s"
          }
        ]
      }
    ]
  }
}
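The Requests per Second and Tokens per Second panels apply rate() to counters. As a rough mental model (Prometheus's real rate() also extrapolates to the edges of the window), the per-second rate is the counter's total increase divided by elapsed time, where any drop in value is treated as a counter reset, e.g. after a pod restart zeroes the in-memory stats. counter_rate is a hypothetical illustration.

```python
from typing import List, Tuple


def counter_rate(samples: List[Tuple[float, float]]) -> float:
    """Per-second increase of a counter from (timestamp_s, value) samples.

    A decrease between consecutive samples is treated as a reset to zero, so
    the post-reset value counts as the increase. No extrapolation is done.
    """
    if len(samples) < 2:
        return 0.0
    increase = 0.0
    for (_, prev), (_, cur) in zip(samples, samples[1:]):
        increase += cur - prev if cur >= prev else cur
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else 0.0


# Scraped every 15 s; the pod restarts between t=30 and t=45
scrapes = [(0, 100), (15, 160), (30, 220), (45, 40), (60, 100)]
print(counter_rate(scrapes))
```

Without reset handling, the restart would show up as a large negative increase and drag the panel below zero.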

Testing and Validation

Functional Tests

# tests/test_api.py
"""
API tests. They call a running service at BASE_URL and need pytest-asyncio.
"""
import pytest
import httpx

BASE_URL = "http://localhost:8000"


@pytest.mark.asyncio
async def test_health_check():
    """Health check"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{BASE_URL}/health")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "healthy"


@pytest.mark.asyncio
async def test_chat_completion():
    """Chat completion"""
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(
            f"{BASE_URL}/v1/chat/completions",
            json={
                "messages": [
                    {"role": "user", "content": "Hello, how are you?"}
                ],
                "max_tokens": 100
            }
        )
        assert response.status_code == 200
        data = response.json()
        assert "choices" in data
        assert len(data["choices"]) > 0


@pytest.mark.asyncio
async def test_streaming():
    """Streaming output"""
    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream(
            "POST",
            f"{BASE_URL}/v1/chat/completions",
            json={
                "messages": [
                    {"role": "user", "content": "Count from 1 to 5."}
                ],
                "stream": True,
                "max_tokens": 50
            }
        ) as response:
            assert response.status_code == 200
            chunks = []
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    chunks.append(line)
            assert len(chunks) > 0
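These tests need a running service and a GPU. The prompt layout itself can be checked offline: the sketch below re-implements the format_chat_prompt rules over plain (role, content) tuples, an interface assumed here so the check runs without FastAPI or vLLM, and spells out the exact Llama 2 string produced for a multi-turn conversation.

```python
from typing import List, Optional, Tuple


def llama2_prompt(messages: List[Tuple[str, str]]) -> str:
    """Same rules as format_chat_prompt, applied to (role, content) tuples."""
    prompt = ""
    system: Optional[str] = None
    for role, content in messages:
        if role == "system":
            system = content                    # folded into the next user turn
        elif role == "user":
            if system:
                prompt += f"<s>[INST] <<SYS>>\n{system}\n<</SYS>>\n\n{content} [/INST]"
                system = None
            else:
                prompt += f"<s>[INST] {content} [/INST]"
        elif role == "assistant":
            prompt += f" {content} </s>"
    return prompt


def test_multi_turn_prompt():
    messages = [("system", "Be brief."), ("user", "Hi"),
                ("assistant", "Hello!"), ("user", "Bye")]
    expected = ("<s>[INST] <<SYS>>\nBe brief.\n<</SYS>>\n\nHi [/INST]"
                " Hello! </s>"
                "<s>[INST] Bye [/INST]")
    assert llama2_prompt(messages) == expected
```

The expected string shows the system prompt living inside the first [INST] block and each completed exchange closed by </s>.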

Performance Tests

# tests/benchmark.py
"""
Load benchmark: end-to-end latency and throughput of non-streaming requests
"""
import asyncio
import time
from statistics import mean, quantiles, stdev

import httpx

BASE_URL = "http://localhost:8000"
CONCURRENT_REQUESTS = 10
TOTAL_REQUESTS = 100


async def single_request(client: httpx.AsyncClient) -> dict:
    """Send one request and record its latency and completion tokens"""
    start_time = time.perf_counter()

    response = await client.post(
        f"{BASE_URL}/v1/chat/completions",
        json={
            "messages": [
                {"role": "user", "content": "What is 2+2?"}
            ],
            "max_tokens": 50
        }
    )

    latency = (time.perf_counter() - start_time) * 1000
    # Failed requests carry no "usage" field; count them as 0 tokens
    tokens = response.json()["usage"]["completion_tokens"] if response.status_code == 200 else 0

    return {
        "latency_ms": latency,
        "tokens": tokens,
        "status": response.status_code
    }


async def run_benchmark():
    """Run the benchmark"""
    async with httpx.AsyncClient(timeout=60.0) as client:
        semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)

        async def limited_request():
            async with semaphore:
                return await single_request(client)

        # Warm-up (excluded from the results)
        print("Warming up...")
        await asyncio.gather(*[single_request(client) for _ in range(5)])

        # Measured run
        print(f"Running {TOTAL_REQUESTS} requests with {CONCURRENT_REQUESTS} concurrency...")
        start_time = time.perf_counter()

        results = await asyncio.gather(*[
            limited_request() for _ in range(TOTAL_REQUESTS)
        ])

        total_time = time.perf_counter() - start_time

        # Aggregate results
        latencies = [r["latency_ms"] for r in results]
        tokens = sum(r["tokens"] for r in results)
        success = sum(1 for r in results if r["status"] == 200)
        # 99 cut points: index 49 is the median (P50), index 98 is P99
        pct = quantiles(latencies, n=100)

        print("\n=== Benchmark Results ===")
        print(f"Total Requests: {TOTAL_REQUESTS}")
        print(f"Successful: {success}")
        print(f"Total Time: {total_time:.2f}s")
        print(f"Throughput: {TOTAL_REQUESTS/total_time:.2f} req/s")
        print(f"Tokens Generated: {tokens}")
        print(f"Token Throughput: {tokens/total_time:.2f} tokens/s")
        print("\nLatency (ms):")
        print(f"  Mean: {mean(latencies):.2f}")
        print(f"  Std: {stdev(latencies):.2f}")
        print(f"  Min: {min(latencies):.2f}")
        print(f"  Max: {max(latencies):.2f}")
        print(f"  P50: {pct[49]:.2f}")
        print(f"  P99: {pct[98]:.2f}")


if __name__ == "__main__":
    asyncio.run(run_benchmark())
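The benchmark above times whole non-streaming responses, so it does not measure the TTFT target from the project goals. TTFT can be taken from a streaming request instead: the time until the first chunk that carries content. The sketch below measures it over any async iterator of SSE lines; measure_ttft_ms is a hypothetical helper, and fake_stream is a simulated server standing in for `response.aiter_lines()` from an httpx streaming request.

```python
import asyncio
import json
import time
from typing import AsyncIterator, Optional


async def measure_ttft_ms(lines: AsyncIterator[str],
                          start: Optional[float] = None) -> Optional[float]:
    """Milliseconds until the first SSE chunk with non-empty delta.content.

    `start` should be a time.perf_counter() value taken just before the request
    is sent; if omitted, timing starts when iteration begins. Returns None if
    the stream ends without any content.
    """
    if start is None:
        start = time.perf_counter()
    async for line in lines:
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        delta = json.loads(payload)["choices"][0]["delta"]
        if delta.get("content"):
            return (time.perf_counter() - start) * 1000
    return None


async def fake_stream(first_token_delay_s: float) -> AsyncIterator[str]:
    """Simulated server: role chunk at once, first content after a delay."""
    yield 'data: {"choices": [{"delta": {"role": "assistant"}}]}'
    await asyncio.sleep(first_token_delay_s)
    yield 'data: {"choices": [{"delta": {"content": "Hi"}}]}'
    yield "data: [DONE]"


if __name__ == "__main__":
    ttft = asyncio.run(measure_ttft_ms(fake_stream(0.2)))
    print(f"TTFT: {ttft:.0f} ms")
```

Against the real service, this would run inside `client.stream("POST", ..., json={..., "stream": True})`, with `start` captured before opening the stream so connection setup and queueing are included.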

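Summary

This project implements a complete, production-grade LLM inference service:

  1. Inference engine: high-performance inference built on vLLM
  2. API service: an OpenAI-compatible API
  3. Streaming output: SSE (server-sent events) streaming responses
  4. Containerization: Docker + Kubernetes deployment
  5. Monitoring: Prometheus + Grafana

Possible extensions:

  • Multi-model management
  • A/B testing
  • A caching layer (Redis)
  • More sophisticated load-balancing strategies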