HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

04-vLLM推理引擎源码解析

概述

vLLM是UC Berkeley开发的高性能LLM推理引擎,其核心创新是PagedAttention技术,实现了接近理论峰值的推理吞吐。本章深入解析vLLM的核心源码实现。

vLLM 架构

整体架构

┌─────────────────────────────────────────────────────────────────────────┐
│                         vLLM 架构                                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                        API Layer                                 │    │
│  │                                                                  │    │
│  │  OpenAI Compatible API       Async Engine API                   │    │
│  │  /v1/completions             AsyncLLMEngine                     │    │
│  │  /v1/chat/completions        generate()                         │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                │                                         │
│  ┌─────────────────────────────┴───────────────────────────────────┐    │
│  │                        Engine Layer                              │    │
│  │                                                                  │    │
│  │  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐       │    │
│  │  │   LLMEngine   │  │   Scheduler   │  │ BlockManager  │       │    │
│  │  │               │  │               │  │               │       │    │
│  │  │  ├─ add_req   │  │  ├─ schedule  │  │  ├─ allocate │       │    │
│  │  │  └─ step      │  │  └─ _schedule │  │  └─ free     │       │    │
│  │  └───────────────┘  └───────────────┘  └───────────────┘       │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                │                                         │
│  ┌─────────────────────────────┴───────────────────────────────────┐    │
│  │                        Worker Layer                              │    │
│  │                                                                  │    │
│  │  ┌───────────────────────────────────────────────────────────┐ │    │
│  │  │                      Worker                                │ │    │
│  │  │                                                            │ │    │
│  │  │   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │ │    │
│  │  │   │ ModelRunner │   │ CacheEngine │   │ KV Cache    │    │ │    │
│  │  │   │             │   │             │   │             │    │ │    │
│  │  │   │ ├─ execute  │   │ ├─ swap_in  │   │ (GPU Mem)   │    │ │    │
│  │  │   │ └─ profile  │   │ └─ swap_out │   │             │    │ │    │
│  │  │   └─────────────┘   └─────────────┘   └─────────────┘    │ │    │
│  │  └───────────────────────────────────────────────────────────┘ │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                │                                         │
│  ┌─────────────────────────────┴───────────────────────────────────┐    │
│  │                        Model Layer                               │    │
│  │                                                                  │    │
│  │  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐       │    │
│  │  │  Attention    │  │  MLP          │  │  Embedding    │       │    │
│  │  │  (PagedAttn)  │  │               │  │               │       │    │
│  │  └───────────────┘  └───────────────┘  └───────────────┘       │    │
│  └─────────────────────────────────────────────────────────────────┘    │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

源码目录结构

vllm/
├── entrypoints/
│   ├── openai/
│   │   ├── api_server.py       # OpenAI兼容API服务
│   │   └── serving_chat.py     # Chat API实现
│   └── llm.py                  # 简单Python API
│
├── engine/
│   ├── llm_engine.py           # 核心引擎
│   ├── async_llm_engine.py     # 异步引擎
│   └── arg_utils.py            # 参数处理
│
├── core/
│   ├── scheduler.py            # 调度器
│   ├── block_manager.py        # Block管理
│   └── block.py                # Block定义
│
├── worker/
│   ├── worker.py               # Worker进程
│   ├── model_runner.py         # 模型执行
│   └── cache_engine.py         # 缓存引擎
│
├── model_executor/
│   ├── layers/
│   │   ├── attention.py        # PagedAttention
│   │   └── linear.py           # 量化线性层
│   ├── models/
│   │   ├── llama.py            # LLaMA模型
│   │   └── gpt2.py             # GPT-2模型
│   └── parallel_utils/
│       └── tensor_parallel.py  # 张量并行
│
├── attention/
│   ├── backends/
│   │   ├── flash_attn.py       # FlashAttention后端
│   │   └── xformers.py         # xFormers后端
│   └── ops/
│       └── paged_attn.py       # PagedAttention CUDA kernel
│
└── distributed/
    └── communication_op.py     # 分布式通信

PagedAttention 核心原理

KV Cache 显存问题

┌─────────────────────────────────────────────────────────────────────────┐
│                     KV Cache 显存问题                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  传统KV Cache分配:                                                       │
│  ═══════════════════                                                     │
│                                                                          │
│  问题1: 预分配导致浪费                                                   │
│  ─────────────────────                                                   │
│                                                                          │
│  Request 1 (实际长度 100):                                              │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░│    │
│  │   Used (100)                    Wasted (Max_len - 100)       │    │
│  └────────────────────────────────────────────────────────────────┘    │
│                                                                          │
│  Request 2 (实际长度 500):                                              │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │████████████████████████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░│    │
│  │   Used (500)                    Wasted                       │    │
│  └────────────────────────────────────────────────────────────────┘    │
│                                                                          │
│  预分配 max_seq_len (如2048) 的连续内存                                  │
│  实际使用率可能只有 20-40%                                               │
│                                                                          │
│  问题2: 连续分配限制并发                                                 │
│  ─────────────────────────                                               │
│                                                                          │
│  GPU Memory:                                                             │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │  Req1    │    Req2    │   Free   │  Req3  │  Free  │  Free  │    │
│  └────────────────────────────────────────────────────────────────┘    │
│                                                                          │
│  新请求Req4需要连续2048空间, 但碎片化导致无法分配                         │
│  即使总空闲空间足够                                                      │
│                                                                          │
│                                                                          │
│  PagedAttention 解决方案:                                               │
│  ═══════════════════════════                                             │
│                                                                          │
│  将KV Cache分成固定大小的Block (类似OS的内存分页)                        │
│                                                                          │
│  ┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬───────┐    │
│  │Block 0│Block 1│Block 2│Block 3│Block 4│Block 5│Block 6│Block 7│    │
│  └───────┴───────┴───────┴───────┴───────┴───────┴───────┴───────┘    │
│                                                                          │
│  Request 1: [Block 0] → [Block 3] → [Block 5]  (按需分配)               │
│  Request 2: [Block 1] → [Block 2] → [Block 4] → [Block 6]               │
│  Request 3: [Block 7]                                                   │
│                                                                          │
│  优势:                                                                   │
│  • 按需分配: 不预留空间                                                  │
│  • 消除碎片: 任意空闲Block可用                                           │
│  • 支持共享: 相同prefix的请求共享Block                                   │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Block 数据结构

# vllm/core/block.py

@dataclass
class PhysicalTokenBlock:
    """
    物理Block: 实际存储KV cache的GPU内存块

    每个Block存储固定数量token的KV cache
    """
    device: Device           # CPU or GPU
    block_number: int        # 物理Block编号
    block_size: int          # Block大小 (token数)
    ref_count: int = 0       # 引用计数 (用于共享)

    def __post_init__(self):
        # 实际的KV tensor在CacheEngine中分配
        pass


@dataclass
class LogicalTokenBlock:
    """
    逻辑Block: 请求视角的Block

    一个逻辑Block可能映射到一个物理Block
    多个逻辑Block可以共享同一个物理Block (prefix caching)
    """
    block_number: int
    block_size: int
    token_ids: List[int] = field(default_factory=list)
    num_tokens: int = 0

    def is_full(self) -> bool:
        return self.num_tokens == self.block_size

    def append_token(self, token_id: int) -> None:
        assert not self.is_full()
        self.token_ids.append(token_id)
        self.num_tokens += 1

    def get_num_empty_slots(self) -> int:
        return self.block_size - self.num_tokens


class BlockTable:
    """
    Block表: 维护逻辑Block到物理Block的映射

    类似操作系统的页表
    """

    def __init__(self):
        self.logical_to_physical: Dict[int, int] = {}

    def add_mapping(self, logical_block_num: int, physical_block_num: int):
        self.logical_to_physical[logical_block_num] = physical_block_num

    def get_physical_block(self, logical_block_num: int) -> int:
        return self.logical_to_physical[logical_block_num]

    def get_block_table(self) -> List[int]:
        """返回物理Block编号列表 (传给CUDA kernel)"""
        return [self.logical_to_physical[i]
                for i in range(len(self.logical_to_physical))]

BlockManager 实现

# vllm/core/block_manager.py

class BlockSpaceManager:
    """
    Block空间管理器

    管理GPU和CPU上的物理Block分配和释放
    """

    def __init__(
        self,
        block_size: int,
        num_gpu_blocks: int,
        num_cpu_blocks: int,
        watermark: float = 0.01,
        sliding_window: Optional[int] = None,
    ):
        self.block_size = block_size
        self.num_gpu_blocks = num_gpu_blocks
        self.num_cpu_blocks = num_cpu_blocks

        # GPU Block池: 空闲的物理Block
        self.gpu_allocator = BlockAllocator(
            Device.GPU, block_size, num_gpu_blocks
        )

        # CPU Block池 (用于swap)
        self.cpu_allocator = BlockAllocator(
            Device.CPU, block_size, num_cpu_blocks
        )

        # 每个sequence的Block表
        self.block_tables: Dict[int, BlockTable] = {}

        # watermark: 预留的最小空闲Block数
        self.watermark_blocks = int(num_gpu_blocks * watermark)

    def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:
        """检查是否能为新请求分配Block"""
        # 计算需要的Block数
        num_required_blocks = self._get_num_required_blocks(seq_group)

        # 检查空闲Block是否足够
        num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()

        if num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:
            return AllocStatus.OK
        elif num_free_gpu_blocks >= num_required_blocks:
            return AllocStatus.LATER  # 等待其他请求完成
        else:
            return AllocStatus.NEVER  # 请求太长,永远无法分配

    def allocate(self, seq_group: SequenceGroup) -> None:
        """为新请求分配Block"""
        seq = seq_group.get_seqs()[0]

        # 计算需要的Block数
        num_prompt_blocks = (len(seq.prompt_token_ids) + self.block_size - 1) // self.block_size

        # 分配物理Block
        block_table = BlockTable()

        for logical_idx in range(num_prompt_blocks):
            # 分配物理Block
            physical_block = self.gpu_allocator.allocate()
            block_table.add_mapping(logical_idx, physical_block.block_number)

        self.block_tables[seq.seq_id] = block_table

    def can_append_slot(self, seq_group: SequenceGroup) -> bool:
        """检查是否能追加一个新token"""
        seq = seq_group.get_seqs()[0]
        block_table = self.block_tables[seq.seq_id]

        # 检查最后一个Block是否有空间
        last_block_idx = len(block_table.logical_to_physical) - 1
        last_block = self.gpu_allocator.get_block(
            block_table.get_physical_block(last_block_idx)
        )

        if last_block.get_num_empty_slots() > 0:
            return True

        # 需要分配新Block
        return self.gpu_allocator.get_num_free_blocks() >= 1

    def append_slot(self, seq: Sequence) -> Optional[Tuple[int, int]]:
        """追加一个slot用于新生成的token"""
        block_table = self.block_tables[seq.seq_id]

        # 检查最后一个Block
        if len(block_table.logical_to_physical) > 0:
            last_logical_idx = len(block_table.logical_to_physical) - 1
            last_physical_idx = block_table.get_physical_block(last_logical_idx)
            last_block = self.gpu_allocator.get_block(last_physical_idx)

            if last_block.get_num_empty_slots() > 0:
                # 最后一个Block还有空间
                return None

        # 需要分配新Block
        physical_block = self.gpu_allocator.allocate()
        new_logical_idx = len(block_table.logical_to_physical)
        block_table.add_mapping(new_logical_idx, physical_block.block_number)

        return (new_logical_idx, physical_block.block_number)

    def free(self, seq: Sequence) -> None:
        """释放sequence的所有Block"""
        block_table = self.block_tables.pop(seq.seq_id)

        for physical_block_num in block_table.logical_to_physical.values():
            self.gpu_allocator.free(physical_block_num)

    def swap_out(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """将GPU Block换出到CPU"""
        mapping = {}

        for seq in seq_group.get_seqs():
            block_table = self.block_tables[seq.seq_id]

            for logical_idx, gpu_block_num in block_table.logical_to_physical.items():
                # 分配CPU Block
                cpu_block = self.cpu_allocator.allocate()

                # 记录映射 (用于数据拷贝)
                mapping[gpu_block_num] = cpu_block.block_number

                # 更新Block表
                block_table.add_mapping(logical_idx, cpu_block.block_number)

                # 释放GPU Block
                self.gpu_allocator.free(gpu_block_num)

        return mapping

    def swap_in(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """将CPU Block换入到GPU"""
        mapping = {}

        for seq in seq_group.get_seqs():
            block_table = self.block_tables[seq.seq_id]

            for logical_idx, cpu_block_num in block_table.logical_to_physical.items():
                # 分配GPU Block
                gpu_block = self.gpu_allocator.allocate()

                # 记录映射
                mapping[cpu_block_num] = gpu_block.block_number

                # 更新Block表
                block_table.add_mapping(logical_idx, gpu_block.block_number)

                # 释放CPU Block
                self.cpu_allocator.free(cpu_block_num)

        return mapping

Scheduler 调度器

调度策略

# vllm/core/scheduler.py

class Scheduler:
    """
    请求调度器

    决定:
    1. 哪些请求可以在当前step执行
    2. 哪些请求需要preempt (换出)
    3. 哪些请求可以swap回来
    """

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
    ):
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config

        # Block管理器
        self.block_manager = BlockSpaceManager(
            block_size=cache_config.block_size,
            num_gpu_blocks=cache_config.num_gpu_blocks,
            num_cpu_blocks=cache_config.num_cpu_blocks,
        )

        # 请求队列
        self.waiting: Deque[SequenceGroup] = deque()  # 等待处理
        self.running: Deque[SequenceGroup] = deque()  # 正在运行
        self.swapped: Deque[SequenceGroup] = deque()  # 已换出到CPU

    def add_seq_group(self, seq_group: SequenceGroup) -> None:
        """添加新请求到等待队列"""
        self.waiting.append(seq_group)

    def schedule(self) -> Tuple[SchedulerOutputs, bool]:
        """
        主调度函数

        返回:
        - SchedulerOutputs: 调度结果
        - bool: 是否有prompt (prefill)
        """
        # 1. 首先处理swapped队列 (优先恢复之前被抢占的请求)
        self._schedule_swapped()

        # 2. 处理running队列 (decode阶段的请求)
        self._schedule_running()

        # 3. 处理waiting队列 (新请求,prefill阶段)
        self._schedule_waiting()

        # 构建输出
        return self._build_scheduler_outputs()

    def _schedule_waiting(self) -> None:
        """调度等待队列中的请求"""
        while self.waiting:
            seq_group = self.waiting[0]

            # 检查是否能分配Block
            alloc_status = self.block_manager.can_allocate(seq_group)

            if alloc_status == AllocStatus.OK:
                # 可以分配,移到running队列
                self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
            elif alloc_status == AllocStatus.LATER:
                # 暂时不能分配,等待
                break
            else:  # NEVER
                # 请求太长,无法处理
                self.waiting.popleft()
                self._abort_seq_group(seq_group)

    def _schedule_running(self) -> None:
        """调度正在运行的请求"""
        # 检查是否有足够的Block继续生成
        preempted = []

        for seq_group in self.running:
            # 检查是否能追加新token
            if not self.block_manager.can_append_slot(seq_group):
                # Block不足,需要抢占
                preempted.append(seq_group)

        # 抢占优先级最低的请求
        for seq_group in reversed(preempted):
            self._preempt(seq_group)
            self.running.remove(seq_group)
            self.swapped.append(seq_group)

    def _schedule_swapped(self) -> None:
        """调度被换出的请求"""
        while self.swapped:
            seq_group = self.swapped[0]

            # 检查是否能swap回来
            if self.block_manager.can_swap_in(seq_group):
                self.swapped.popleft()
                self._swap_in(seq_group)
                self.running.append(seq_group)
            else:
                break

    def _preempt(self, seq_group: SequenceGroup) -> None:
        """抢占请求 (swap out到CPU)"""
        preemption_mode = self.scheduler_config.preemption_mode

        if preemption_mode == PreemptionMode.SWAP:
            # 换出到CPU
            self._swap_out(seq_group)
        else:
            # 重新计算 (丢弃KV cache,之后重算)
            self._recompute(seq_group)

    def _allocate(self, seq_group: SequenceGroup) -> None:
        """为请求分配Block"""
        self.block_manager.allocate(seq_group)

    def _swap_out(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """换出到CPU"""
        return self.block_manager.swap_out(seq_group)

    def _swap_in(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """从CPU换入"""
        return self.block_manager.swap_in(seq_group)

Continuous Batching

┌─────────────────────────────────────────────────────────────────────────┐
│                    Continuous Batching                                   │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  传统Static Batching:                                                    │
│  ═══════════════════════                                                 │
│                                                                          │
│  Time →                                                                  │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │ Req1: [===Prefill===][=====Decode=====][PAD][PAD][PAD]        │    │
│  │ Req2: [===Prefill===][===Decode===][PAD][PAD][PAD][PAD][PAD]  │    │
│  │ Req3: [===Prefill===][=======Decode=======][PAD][PAD]         │    │
│  └────────────────────────────────────────────────────────────────┘    │
│                                                                          │
│  所有请求必须等到最长的完成才能处理新请求                                 │
│  短请求等待时有大量padding浪费                                           │
│                                                                          │
│                                                                          │
│  Continuous Batching:                                                    │
│  ═════════════════════                                                   │
│                                                                          │
│  Time →                                                                  │
│  Step 1: [Req1 Prefill][Req2 Prefill][Req3 Prefill]                    │
│  Step 2: [Req1 Decode ][Req2 Decode ][Req3 Decode ]                    │
│  Step 3: [Req1 Decode ][Req2 DONE→Req4 Prefill][Req3 Decode ]          │
│  Step 4: [Req1 Decode ][Req4 Decode ][Req3 Decode ]                    │
│  Step 5: [Req1 DONE→Req5][Req4 Decode ][Req3 Decode ]                  │
│  ...                                                                     │
│                                                                          │
│  每个step独立调度:                                                       │
│  • 完成的请求立即移除                                                    │
│  • 空出的slot立即填入新请求                                              │
│  • 无padding浪费                                                         │
│  • 吞吐量大幅提升                                                        │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

PagedAttention CUDA Kernel

Kernel 实现

// vllm/attention/ops/paged_attention.cu (简化版)

template <typename scalar_t, int HEAD_SIZE, int BLOCK_SIZE, int NUM_THREADS>
__global__ void paged_attention_kernel(
    scalar_t* __restrict__ out,           // [num_seqs, num_heads, head_size]
    const scalar_t* __restrict__ query,   // [num_seqs, num_heads, head_size]
    const scalar_t* __restrict__ key_cache,   // [num_blocks, num_heads, head_size, block_size]
    const scalar_t* __restrict__ value_cache, // [num_blocks, num_heads, head_size, block_size]
    const int* __restrict__ block_tables, // [num_seqs, max_num_blocks_per_seq]
    const int* __restrict__ context_lens, // [num_seqs]
    const int max_num_blocks_per_seq,
    const float scale
) {
    // 获取当前处理的sequence和head
    const int seq_idx = blockIdx.x;
    const int head_idx = blockIdx.y;
    const int thread_idx = threadIdx.x;

    // 获取context长度
    const int context_len = context_lens[seq_idx];
    const int num_blocks = (context_len + BLOCK_SIZE - 1) / BLOCK_SIZE;

    // Query: 当前token的query
    const scalar_t* q = query + seq_idx * num_heads * HEAD_SIZE + head_idx * HEAD_SIZE;

    // 初始化累加器
    float qk_max = -FLT_MAX;
    float exp_sum = 0.0f;
    float acc[HEAD_SIZE] = {0.0f};

    // 遍历所有Block
    for (int block_idx = 0; block_idx < num_blocks; block_idx++) {
        // 从Block表获取物理Block编号
        const int physical_block_num = block_tables[seq_idx * max_num_blocks_per_seq + block_idx];

        // Key Cache指针
        const scalar_t* k_block = key_cache +
            physical_block_num * num_heads * HEAD_SIZE * BLOCK_SIZE +
            head_idx * HEAD_SIZE * BLOCK_SIZE;

        // Value Cache指针
        const scalar_t* v_block = value_cache +
            physical_block_num * num_heads * HEAD_SIZE * BLOCK_SIZE +
            head_idx * HEAD_SIZE * BLOCK_SIZE;

        // 计算Block内有效token数
        const int block_start = block_idx * BLOCK_SIZE;
        const int block_end = min(block_start + BLOCK_SIZE, context_len);
        const int num_valid_tokens = block_end - block_start;

        // 计算QK
        for (int token_idx = thread_idx; token_idx < num_valid_tokens; token_idx += NUM_THREADS) {
            float qk = 0.0f;

            // Q @ K^T
            for (int d = 0; d < HEAD_SIZE; d++) {
                qk += float(q[d]) * float(k_block[d * BLOCK_SIZE + token_idx]);
            }
            qk *= scale;

            // 更新max
            qk_max = fmaxf(qk_max, qk);
        }
    }

    // 跨线程同步max
    __shared__ float shared_qk_max;
    qk_max = blockReduceMax(qk_max);
    if (thread_idx == 0) {
        shared_qk_max = qk_max;
    }
    __syncthreads();
    qk_max = shared_qk_max;

    // 第二遍: 计算softmax和value累加
    for (int block_idx = 0; block_idx < num_blocks; block_idx++) {
        const int physical_block_num = block_tables[seq_idx * max_num_blocks_per_seq + block_idx];

        const scalar_t* k_block = key_cache + ...;
        const scalar_t* v_block = value_cache + ...;

        const int block_start = block_idx * BLOCK_SIZE;
        const int block_end = min(block_start + BLOCK_SIZE, context_len);
        const int num_valid_tokens = block_end - block_start;

        for (int token_idx = thread_idx; token_idx < num_valid_tokens; token_idx += NUM_THREADS) {
            // 重新计算QK
            float qk = 0.0f;
            for (int d = 0; d < HEAD_SIZE; d++) {
                qk += float(q[d]) * float(k_block[d * BLOCK_SIZE + token_idx]);
            }
            qk *= scale;

            // Softmax
            float p = expf(qk - qk_max);
            exp_sum += p;

            // 累加V
            for (int d = 0; d < HEAD_SIZE; d++) {
                acc[d] += p * float(v_block[d * BLOCK_SIZE + token_idx]);
            }
        }
    }

    // 跨线程同步exp_sum
    __shared__ float shared_exp_sum;
    exp_sum = blockReduceSum(exp_sum);
    if (thread_idx == 0) {
        shared_exp_sum = exp_sum;
    }
    __syncthreads();
    exp_sum = shared_exp_sum;

    // Normalize
    const float inv_exp_sum = 1.0f / (exp_sum + 1e-6f);
    for (int d = 0; d < HEAD_SIZE; d++) {
        acc[d] *= inv_exp_sum;
    }

    // 写入输出
    scalar_t* output = out + seq_idx * num_heads * HEAD_SIZE + head_idx * HEAD_SIZE;
    for (int d = thread_idx; d < HEAD_SIZE; d += NUM_THREADS) {
        output[d] = scalar_t(acc[d]);
    }
}

Python接口

# vllm/attention/backends/flash_attn.py

class FlashAttentionBackend(AttentionBackend):
    """
    使用FlashAttention的Attention后端
    """

    @staticmethod
    def get_impl_cls() -> Type["FlashAttentionImpl"]:
        return FlashAttentionImpl


class FlashAttentionImpl(AttentionImpl):

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        kv_cache: torch.Tensor,
        attn_metadata: FlashAttentionMetadata,
        k_scale: float = 1.0,
        v_scale: float = 1.0,
    ) -> torch.Tensor:
        """
        执行PagedAttention

        Args:
            query: [num_tokens, num_heads, head_size]
            key: [num_tokens, num_kv_heads, head_size]
            value: [num_tokens, num_kv_heads, head_size]
            kv_cache: [2, num_blocks, block_size, num_kv_heads, head_size]
            attn_metadata: 包含block_tables, context_lens等
        """
        num_tokens, num_heads, head_size = query.shape

        # 更新KV Cache
        if kv_cache is not None:
            key_cache = kv_cache[0]
            value_cache = kv_cache[1]

            # 将新的KV写入cache
            ops.reshape_and_cache(
                key,
                value,
                key_cache,
                value_cache,
                attn_metadata.slot_mapping,
            )

        # Prefill (prompt处理)
        if attn_metadata.prefill_metadata is not None:
            output = flash_attn_varlen_func(
                q=query,
                k=key,
                v=value,
                cu_seqlens_q=attn_metadata.prefill_metadata.seq_start_loc,
                cu_seqlens_k=attn_metadata.prefill_metadata.seq_start_loc,
                max_seqlen_q=attn_metadata.prefill_metadata.max_seq_len,
                max_seqlen_k=attn_metadata.prefill_metadata.max_seq_len,
                softmax_scale=self.scale,
                causal=True,
            )

        # Decode (生成阶段)
        if attn_metadata.decode_metadata is not None:
            output = paged_attention_v1(
                out=output,
                query=query,
                key_cache=key_cache,
                value_cache=value_cache,
                num_kv_heads=self.num_kv_heads,
                scale=self.scale,
                block_tables=attn_metadata.decode_metadata.block_tables,
                context_lens=attn_metadata.decode_metadata.context_lens,
                block_size=self.block_size,
                max_context_len=attn_metadata.decode_metadata.max_context_len,
            )

        return output

LLMEngine 核心实现

# vllm/engine/llm_engine.py

class LLMEngine:
    """
    vLLM核心引擎

    管理模型、调度器、Worker
    """

    def __init__(
        self,
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        lora_config: Optional[LoRAConfig] = None,
    ):
        self.model_config = model_config
        self.cache_config = cache_config

        # 初始化调度器
        self.scheduler = Scheduler(scheduler_config, cache_config)

        # 初始化Worker (可能是分布式)
        self._init_workers(
            model_config,
            parallel_config,
            device_config,
        )

        # 初始化KV Cache
        self._init_cache()

    def _init_workers(self, model_config, parallel_config, device_config):
        """初始化Worker进程"""
        if parallel_config.world_size == 1:
            # 单GPU
            self.workers = [Worker(model_config, parallel_config, 0)]
        else:
            # 多GPU (使用Ray分布式)
            self.workers = []
            for rank in range(parallel_config.world_size):
                worker = ray.remote(Worker).remote(
                    model_config, parallel_config, rank
                )
                self.workers.append(worker)

    def _init_cache(self):
        """Profile并初始化KV Cache"""
        # Profile: 确定能分配多少Block
        num_gpu_blocks, num_cpu_blocks = self._profile_num_available_blocks()

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

        # 初始化Cache
        self._run_workers("init_cache_engine", cache_config=self.cache_config)

    def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
    ) -> None:
        """添加新请求"""
        # Tokenize
        prompt_token_ids = self.tokenizer.encode(prompt)

        # 创建Sequence
        seq = Sequence(seq_id, prompt_token_ids)
        seq_group = SequenceGroup(request_id, [seq], sampling_params)

        # 添加到调度器
        self.scheduler.add_seq_group(seq_group)

    def step(self) -> List[RequestOutput]:
        """
        执行一步推理

        这是vLLM的核心循环
        """
        # 1. 调度: 决定哪些请求参与本次推理
        scheduler_outputs = self.scheduler.schedule()

        if scheduler_outputs.is_empty():
            return []

        # 2. 准备输入
        execute_model_req = self._prepare_model_input(scheduler_outputs)

        # 3. 执行模型
        output = self._run_workers(
            "execute_model",
            execute_model_req=execute_model_req,
        )

        # 4. 处理输出
        return self._process_model_outputs(output, scheduler_outputs)

    def _prepare_model_input(
        self,
        scheduler_outputs: SchedulerOutputs
    ) -> ExecuteModelRequest:
        """准备模型输入"""
        # 收集所有要处理的序列
        seq_group_metadata_list = []

        for seq_group in scheduler_outputs.scheduled_seq_groups:
            seq_data = {}
            for seq in seq_group.get_seqs():
                seq_data[seq.seq_id] = SequenceData(
                    prompt_token_ids=seq.prompt_token_ids,
                    output_token_ids=seq.output_token_ids,
                )

            # Block表
            block_tables = {}
            for seq in seq_group.get_seqs():
                block_table = self.scheduler.block_manager.get_block_table(seq)
                block_tables[seq.seq_id] = block_table

            metadata = SequenceGroupMetadata(
                request_id=seq_group.request_id,
                is_prompt=scheduler_outputs.prompt_run,
                seq_data=seq_data,
                sampling_params=seq_group.sampling_params,
                block_tables=block_tables,
            )
            seq_group_metadata_list.append(metadata)

        return ExecuteModelRequest(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
            blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
            blocks_to_copy=scheduler_outputs.blocks_to_copy,
        )

    def _process_model_outputs(
        self,
        output: SamplerOutput,
        scheduler_outputs: SchedulerOutputs
    ) -> List[RequestOutput]:
        """处理模型输出"""
        request_outputs = []

        for seq_group, outputs in zip(
            scheduler_outputs.scheduled_seq_groups,
            output.outputs
        ):
            for seq, output in zip(seq_group.get_seqs(), outputs):
                # 获取采样结果
                token_id = output.samples[0].output_token
                logprob = output.samples[0].logprobs

                # 追加到序列
                seq.append_token_id(token_id, logprob)

                # 分配新的slot (如果需要)
                self.scheduler.block_manager.append_slot(seq)

                # 检查是否完成
                if self._check_stop(seq, seq_group.sampling_params):
                    seq.status = SequenceStatus.FINISHED_STOPPED

            # 构建输出
            request_output = RequestOutput(
                request_id=seq_group.request_id,
                prompt=seq_group.prompt,
                outputs=[...],
                finished=seq_group.is_finished(),
            )
            request_outputs.append(request_output)

        return request_outputs

总结

vLLM 核心技术

┌─────────────────────────────────────────────────────────────────────────┐
│                        vLLM 核心技术                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  PagedAttention                                                          │
│  ──────────────                                                          │
│  • Block化KV Cache管理                                                   │
│  • 消除显存碎片                                                          │
│  • 支持动态分配                                                          │
│  • 接近100%显存利用率                                                    │
│                                                                          │
│  Continuous Batching                                                     │
│  ────────────────────                                                    │
│  • 每个step独立调度                                                      │
│  • 完成即释放,新请求即入队                                               │
│  • 无padding浪费                                                         │
│  • 吞吐量提升2-3倍                                                       │
│                                                                          │
│  KV Cache Swapping                                                       │
│  ─────────────────                                                       │
│  • GPU Block换出到CPU                                                    │
│  • 支持更多并发请求                                                      │
│  • 抢占式调度                                                            │
│                                                                          │
│  Block Table                                                             │
│  ────────────                                                            │
│  • 逻辑Block到物理Block映射                                              │
│  • 类似OS页表                                                            │
│  • 支持共享 (prefix caching)                                             │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

面试高频问题

  1. PagedAttention如何解决KV Cache碎片问题?
  2. Continuous Batching相比Static Batching的优势?
  3. vLLM的调度策略是什么?
  4. Block Table是如何工作的?
  5. KV Cache Swapping的实现原理?
Prev
03-Megatron-LM源码解析
Next
05-HuggingFace Transformers源码解析