04 - vLLM Inference Engine: A Source Code Walkthrough
Overview
vLLM is a high-performance LLM inference engine developed at UC Berkeley. Its core innovation is PagedAttention, which manages the KV cache in fixed-size blocks and, together with continuous batching, largely eliminates KV-cache memory waste and delivers a large throughput gain over naive serving. This chapter walks through the core parts of the vLLM source code.
vLLM Architecture
Overall Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│                             vLLM Architecture                             │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ API Layer │ │
│ │ │ │
│ │ OpenAI Compatible API Async Engine API │ │
│ │ /v1/completions AsyncLLMEngine │ │
│ │ /v1/chat/completions generate() │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────────┐ │
│ │ Engine Layer │ │
│ │ │ │
│ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │ LLMEngine │ │ Scheduler │ │ BlockManager │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ ├─ add_req │ │ ├─ schedule │ │ ├─ allocate │ │ │
│ │ │ └─ step │ │ └─ _schedule │ │ └─ free │ │ │
│ │ └───────────────┘ └───────────────┘ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────────┐ │
│ │ Worker Layer │ │
│ │ │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ Worker │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │
│ │ │ │ ModelRunner │ │ CacheEngine │ │ KV Cache │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ ├─ execute │ │ ├─ swap_in │ │ (GPU Mem) │ │ │ │
│ │ │ │ └─ profile │ │ └─ swap_out │ │ │ │ │ │
│ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┴───────────────────────────────────┐ │
│ │ Model Layer │ │
│ │ │ │
│ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │ Attention │ │ MLP │ │ Embedding │ │ │
│ │ │ (PagedAttn) │ │ │ │ │ │ │
│ │ └───────────────┘ └───────────────┘ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
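The API layer in the diagram is a thin wrapper around AsyncLLMEngine. As a rough illustration of streaming generation (constructor and generate() signatures differ across vLLM versions, so treat this as a sketch rather than the exact API; the model name is a placeholder):

import asyncio
from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine

async def main() -> None:
    # Build the async engine; the model name here is just a placeholder.
    engine = AsyncLLMEngine.from_engine_args(
        AsyncEngineArgs(model="meta-llama/Llama-2-7b-hf"))
    params = SamplingParams(temperature=0.8, max_tokens=64)

    # generate() is an async generator that yields partial RequestOutputs
    # as new tokens are produced for this request id.
    async for output in engine.generate("Hello, my name is", params, "req-0"):
        print(output.outputs[0].text)

asyncio.run(main())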
Source Directory Layout
vllm/
├── entrypoints/
│   ├── openai/
│   │   ├── api_server.py            # OpenAI-compatible API server
│   │   └── serving_chat.py          # Chat API implementation
│   └── llm.py                       # Simple offline Python API
│
├── engine/
│   ├── llm_engine.py                # Core engine
│   ├── async_llm_engine.py          # Async engine
│   └── arg_utils.py                 # Argument handling
│
├── core/
│   ├── scheduler.py                 # Scheduler
│   ├── block_manager.py             # Block management
│   └── block.py                     # Block definitions
│
├── worker/
│   ├── worker.py                    # Worker process
│   ├── model_runner.py              # Model execution
│   └── cache_engine.py              # Cache engine
│
├── model_executor/
│   ├── layers/
│   │   ├── attention.py             # PagedAttention
│   │   └── linear.py                # (Quantized) linear layers
│   ├── models/
│   │   ├── llama.py                 # LLaMA model
│   │   └── gpt2.py                  # GPT-2 model
│   └── parallel_utils/
│       └── tensor_parallel.py       # Tensor parallelism
│
├── attention/
│   ├── backends/
│   │   ├── flash_attn.py            # FlashAttention backend
│   │   └── xformers.py              # xFormers backend
│   └── ops/
│       └── paged_attn.py            # PagedAttention CUDA kernel bindings
│
└── distributed/
    └── communication_op.py          # Distributed communication ops
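For orientation, the two most common entry points map onto this tree as follows: vllm/entrypoints/llm.py exposes the offline LLM class, and vllm/entrypoints/openai/api_server.py starts the OpenAI-compatible server. A minimal usage sketch (the model name is a placeholder):

# Offline batch inference via vllm/entrypoints/llm.py
from vllm import LLM, SamplingParams

llm = LLM(model="meta-llama/Llama-2-7b-hf")            # builds an LLMEngine internally
params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=64)
outputs = llm.generate(["Hello, my name is"], params)
print(outputs[0].outputs[0].text)

# The OpenAI-compatible server is started as a module, e.g.:
#   python -m vllm.entrypoints.openai.api_server \
#       --model meta-llama/Llama-2-7b-hf --port 8000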
PagedAttention: Core Ideas
The KV Cache Memory Problem
┌─────────────────────────────────────────────────────────────────────────┐
│                       The KV Cache Memory Problem                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  Conventional KV cache allocation:                                       │
│  ═════════════════════════════════                                      │
│                                                                          │
│  Problem 1: pre-allocation wastes memory                                 │
│  ────────────────────────────────────────                               │
│                                                                          │
│  Request 1 (actual length 100):                                          │
│  ┌────────────────────────────────────────────────────────────────┐     │
│  │████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░│     │
│  │ Used (100)              Wasted (max_len - 100)                 │     │
│  └────────────────────────────────────────────────────────────────┘     │
│                                                                          │
│  Request 2 (actual length 500):                                          │
│  ┌────────────────────────────────────────────────────────────────┐     │
│  │████████████████████████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░│     │
│  │ Used (500)                              Wasted                 │     │
│  └────────────────────────────────────────────────────────────────┘     │
│                                                                          │
│  Contiguous memory is reserved for max_seq_len (e.g. 2048) tokens;       │
│  actual utilization is often only 20-40%.                                │
│                                                                          │
│  Problem 2: contiguous allocation limits concurrency                     │
│  ───────────────────────────────────────────────────                    │
│                                                                          │
│  GPU memory:                                                             │
│  ┌────────────────────────────────────────────────────────────────┐     │
│  │ Req1 │ Req2 │ Free │ Req3 │ Free │ Free │                      │     │
│  └────────────────────────────────────────────────────────────────┘     │
│                                                                          │
│  A new request Req4 needs 2048 contiguous slots, but fragmentation       │
│  makes the allocation fail even though total free space is enough.      │
│                                                                          │
│                                                                          │
│  The PagedAttention solution:                                            │
│  ═══════════════════════════                                            │
│                                                                          │
│  Split the KV cache into fixed-size blocks (like OS memory paging).      │
│                                                                          │
│  ┌───────┬───────┬───────┬───────┬───────┬───────┬───────┬───────┐      │
│  │Block 0│Block 1│Block 2│Block 3│Block 4│Block 5│Block 6│Block 7│      │
│  └───────┴───────┴───────┴───────┴───────┴───────┴───────┴───────┘      │
│                                                                          │
│  Request 1: [Block 0] → [Block 3] → [Block 5]        (allocated on demand)│
│  Request 2: [Block 1] → [Block 2] → [Block 4] → [Block 6]                │
│  Request 3: [Block 7]                                                    │
│                                                                          │
│  Advantages:                                                             │
│  • On-demand allocation: no space is reserved up front                   │
│  • No fragmentation: any free block can be used                          │
│  • Sharing: requests with the same prefix can share blocks               │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘
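To make the waste concrete: the KV cache of one token occupies 2 (K and V) x num_layers x num_kv_heads x head_dim x bytes_per_element. The back-of-the-envelope calculation below uses a LLaMA-7B-like configuration (32 layers, 32 KV heads, head_dim 128, fp16); the model numbers are illustrative assumptions, not values read from the vLLM source.

def kv_cache_bytes_per_token(num_layers: int, num_kv_heads: int,
                             head_dim: int, dtype_bytes: int = 2) -> int:
    """Bytes of KV cache one token occupies across all layers (K and V)."""
    return 2 * num_layers * num_kv_heads * head_dim * dtype_bytes

# LLaMA-7B-like configuration (illustrative numbers).
per_token = kv_cache_bytes_per_token(num_layers=32, num_kv_heads=32, head_dim=128)
print(per_token)                       # 524288 bytes = 0.5 MiB per token

# Pre-allocating max_seq_len = 2048 reserves ~1 GiB per request ...
print(per_token * 2048 / 2**30)        # ~1.0 GiB

# ... while PagedAttention only holds whole blocks for the tokens seen so far.
block_size = 16
context_len = 100
num_blocks = (context_len + block_size - 1) // block_size    # 7 blocks
print(num_blocks * block_size * per_token / 2**20)           # ~56 MiB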
Block Data Structures
# vllm/core/block.py
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PhysicalTokenBlock:
    """
    Physical block: a chunk of GPU memory that actually stores KV cache.
    Each block holds the KV cache of a fixed number of tokens.
    """
    device: Device          # Device enum (CPU or GPU), defined elsewhere in vLLM
    block_number: int       # physical block id
    block_size: int         # block size (number of tokens)
    ref_count: int = 0      # reference count (for sharing)

    def __post_init__(self):
        # The actual KV tensors are allocated by the CacheEngine.
        pass


@dataclass
class LogicalTokenBlock:
    """
    Logical block: a block as seen from a request's point of view.
    Each logical block maps to one physical block; logical blocks of
    different sequences may map to the same physical block (prefix caching).
    """
    block_number: int
    block_size: int
    token_ids: List[int] = field(default_factory=list)
    num_tokens: int = 0

    def is_full(self) -> bool:
        return self.num_tokens == self.block_size

    def append_token(self, token_id: int) -> None:
        assert not self.is_full()
        self.token_ids.append(token_id)
        self.num_tokens += 1

    def get_num_empty_slots(self) -> int:
        return self.block_size - self.num_tokens


class BlockTable:
    """
    Block table: maps logical block numbers to physical block numbers,
    analogous to an OS page table.
    """

    def __init__(self):
        self.logical_to_physical: Dict[int, int] = {}

    def add_mapping(self, logical_block_num: int, physical_block_num: int):
        self.logical_to_physical[logical_block_num] = physical_block_num

    def get_physical_block(self, logical_block_num: int) -> int:
        return self.logical_to_physical[logical_block_num]

    def get_block_table(self) -> List[int]:
        """Return the list of physical block numbers (passed to the CUDA kernel)."""
        return [self.logical_to_physical[i]
                for i in range(len(self.logical_to_physical))]
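A quick illustration of how these pieces fit together, using only the simplified classes above (the block numbers are made up):

block_size = 4

# A sequence fills its logical blocks token by token.
logical = LogicalTokenBlock(block_number=0, block_size=block_size)
for tok in [11, 42, 7, 99]:
    logical.append_token(tok)
assert logical.is_full() and logical.get_num_empty_slots() == 0

# The block table records which physical block backs each logical block.
table = BlockTable()
table.add_mapping(0, 17)      # logical block 0 -> physical block 17
table.add_mapping(1, 3)       # logical block 1 -> physical block 3
print(table.get_block_table())    # [17, 3] -- handed to the attention kernel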
BlockManager Implementation
# vllm/core/block_manager.py
from typing import Dict, Optional, Tuple


class BlockSpaceManager:
    """
    Block space manager.
    Allocates and frees physical blocks on the GPU and the CPU.
    """

    def __init__(
        self,
        block_size: int,
        num_gpu_blocks: int,
        num_cpu_blocks: int,
        watermark: float = 0.01,
        sliding_window: Optional[int] = None,
    ):
        self.block_size = block_size
        self.num_gpu_blocks = num_gpu_blocks
        self.num_cpu_blocks = num_cpu_blocks

        # GPU block pool: free physical blocks.
        self.gpu_allocator = BlockAllocator(
            Device.GPU, block_size, num_gpu_blocks
        )
        # CPU block pool (used for swapping).
        self.cpu_allocator = BlockAllocator(
            Device.CPU, block_size, num_cpu_blocks
        )

        # One block table per sequence.
        self.block_tables: Dict[int, BlockTable] = {}

        # Watermark: minimum number of free blocks to keep in reserve.
        self.watermark_blocks = int(num_gpu_blocks * watermark)

    def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:
        """Check whether blocks can be allocated for a new request."""
        # Number of blocks the request needs.
        num_required_blocks = self._get_num_required_blocks(seq_group)
        num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()

        # NEVER: the prompt is too long to fit even into an empty cache.
        if self.num_gpu_blocks - num_required_blocks < self.watermark_blocks:
            return AllocStatus.NEVER
        # OK: enough free blocks right now (keeping the watermark in reserve).
        if num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:
            return AllocStatus.OK
        # LATER: not enough right now, but possible once other requests finish.
        return AllocStatus.LATER

    def allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate blocks for a new request (prefill)."""
        seq = seq_group.get_seqs()[0]

        # Number of blocks needed to hold the prompt.
        num_prompt_blocks = (
            len(seq.prompt_token_ids) + self.block_size - 1) // self.block_size

        # Allocate physical blocks and record the mapping.
        block_table = BlockTable()
        for logical_idx in range(num_prompt_blocks):
            physical_block = self.gpu_allocator.allocate()
            block_table.add_mapping(logical_idx, physical_block.block_number)

        self.block_tables[seq.seq_id] = block_table

    def can_append_slot(self, seq_group: SequenceGroup) -> bool:
        """Check whether one more token can be appended."""
        seq = seq_group.get_seqs()[0]
        block_table = self.block_tables[seq.seq_id]

        # Number of tokens this sequence currently holds.
        num_tokens = len(seq.prompt_token_ids) + len(seq.output_token_ids)

        # If the last mapped block still has a free slot, no new block is needed.
        num_mapped_slots = len(block_table.logical_to_physical) * self.block_size
        if num_tokens < num_mapped_slots:
            return True

        # Otherwise appending requires one fresh block.
        return self.gpu_allocator.get_num_free_blocks() >= 1

    def append_slot(self, seq: Sequence) -> Optional[Tuple[int, int]]:
        """Reserve a slot for the next generated token.

        Returns (logical_idx, physical_block_number) if a new block had to be
        allocated, or None if the last block still had room.
        """
        block_table = self.block_tables[seq.seq_id]

        # Does the last mapped block still have an empty slot?
        num_tokens = len(seq.prompt_token_ids) + len(seq.output_token_ids)
        num_mapped_slots = len(block_table.logical_to_physical) * self.block_size
        if num_tokens < num_mapped_slots:
            return None

        # Map a fresh physical block for the next logical block.
        physical_block = self.gpu_allocator.allocate()
        new_logical_idx = len(block_table.logical_to_physical)
        block_table.add_mapping(new_logical_idx, physical_block.block_number)
        return (new_logical_idx, physical_block.block_number)

    def free(self, seq: Sequence) -> None:
        """Free all blocks held by a sequence."""
        block_table = self.block_tables.pop(seq.seq_id)
        for physical_block_num in block_table.logical_to_physical.values():
            self.gpu_allocator.free(physical_block_num)

    def swap_out(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """Move a request's GPU blocks out to CPU memory."""
        mapping = {}
        for seq in seq_group.get_seqs():
            block_table = self.block_tables[seq.seq_id]
            for logical_idx, gpu_block_num in block_table.logical_to_physical.items():
                # Allocate a CPU block.
                cpu_block = self.cpu_allocator.allocate()
                # Record the mapping (used for the actual data copy).
                mapping[gpu_block_num] = cpu_block.block_number
                # Re-point the block table at the CPU block.
                block_table.add_mapping(logical_idx, cpu_block.block_number)
                # Free the GPU block.
                self.gpu_allocator.free(gpu_block_num)
        return mapping

    def swap_in(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """Bring a request's blocks back from CPU to GPU memory."""
        mapping = {}
        for seq in seq_group.get_seqs():
            block_table = self.block_tables[seq.seq_id]
            for logical_idx, cpu_block_num in block_table.logical_to_physical.items():
                # Allocate a GPU block.
                gpu_block = self.gpu_allocator.allocate()
                # Record the mapping.
                mapping[cpu_block_num] = gpu_block.block_number
                # Re-point the block table at the GPU block.
                block_table.add_mapping(logical_idx, gpu_block.block_number)
                # Free the CPU block.
                self.cpu_allocator.free(cpu_block_num)
        return mapping
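The manager above relies on a BlockAllocator that the excerpt does not show. A minimal free-list sketch that matches the calls used above (allocate, free, get_num_free_blocks), with reference counting for shared blocks, could look like this; it is an illustration, not the exact vLLM implementation:

from typing import Dict, List


class BlockAllocator:
    """Free-list allocator over a fixed pool of physical blocks."""

    def __init__(self, device, block_size: int, num_blocks: int):
        self.device = device
        self.block_size = block_size
        # All blocks start out free.
        self.free_blocks: List[PhysicalTokenBlock] = [
            PhysicalTokenBlock(device=device, block_number=i,
                               block_size=block_size)
            for i in range(num_blocks)
        ]
        # Blocks currently handed out, keyed by block number.
        self.allocated: Dict[int, PhysicalTokenBlock] = {}

    def allocate(self) -> PhysicalTokenBlock:
        if not self.free_blocks:
            raise RuntimeError("out of KV cache blocks")
        block = self.free_blocks.pop()
        block.ref_count = 1
        self.allocated[block.block_number] = block
        return block

    def free(self, block_number: int) -> None:
        # Decrement the reference count; return the block to the pool only
        # when no sequence references it anymore (prefix sharing).
        block = self.allocated[block_number]
        block.ref_count -= 1
        if block.ref_count == 0:
            del self.allocated[block_number]
            self.free_blocks.append(block)

    def get_num_free_blocks(self) -> int:
        return len(self.free_blocks)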
The Scheduler
Scheduling Policy
# vllm/core/scheduler.py
from collections import deque
from typing import Deque, Dict, Tuple


class Scheduler:
    """
    Request scheduler. Each step it decides:
    1. which requests run in the current step,
    2. which requests must be preempted (swapped out),
    3. which swapped-out requests can be brought back.
    """

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
    ):
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config

        # Block manager.
        self.block_manager = BlockSpaceManager(
            block_size=cache_config.block_size,
            num_gpu_blocks=cache_config.num_gpu_blocks,
            num_cpu_blocks=cache_config.num_cpu_blocks,
        )

        # Request queues.
        self.waiting: Deque[SequenceGroup] = deque()   # waiting for prefill
        self.running: Deque[SequenceGroup] = deque()   # currently running
        self.swapped: Deque[SequenceGroup] = deque()   # swapped out to CPU

    def add_seq_group(self, seq_group: SequenceGroup) -> None:
        """Add a new request to the waiting queue."""
        self.waiting.append(seq_group)

    def schedule(self) -> Tuple[SchedulerOutputs, bool]:
        """
        Main scheduling entry point.
        Returns:
            - SchedulerOutputs: the scheduling decision
            - bool: whether this step runs a prompt (prefill)
        """
        # 1. Serve the swapped queue first (resume previously preempted requests).
        self._schedule_swapped()
        # 2. Serve the running queue (requests in the decode phase).
        self._schedule_running()
        # 3. Serve the waiting queue (new requests, prefill phase).
        self._schedule_waiting()
        # Build the outputs.
        return self._build_scheduler_outputs()

    def _schedule_waiting(self) -> None:
        """Schedule requests from the waiting queue."""
        while self.waiting:
            seq_group = self.waiting[0]

            # Can blocks be allocated for this request?
            alloc_status = self.block_manager.can_allocate(seq_group)
            if alloc_status == AllocStatus.OK:
                # Yes: move it to the running queue.
                self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
            elif alloc_status == AllocStatus.LATER:
                # Not right now: keep waiting (FCFS, so stop here).
                break
            else:  # NEVER
                # The request can never fit; abort it.
                self.waiting.popleft()
                self._abort_seq_group(seq_group)

    def _schedule_running(self) -> None:
        """Schedule requests that are already running."""
        # Check that every running request has enough blocks to keep generating.
        preempted = []
        for seq_group in self.running:
            # Can one more token be appended?
            if not self.block_manager.can_append_slot(seq_group):
                # Not enough blocks: this request must be preempted.
                preempted.append(seq_group)

        # Preempt the lowest-priority requests (iterate from the back).
        for seq_group in reversed(preempted):
            self._preempt(seq_group)
            self.running.remove(seq_group)
            self.swapped.append(seq_group)

    def _schedule_swapped(self) -> None:
        """Schedule requests that were swapped out."""
        while self.swapped:
            seq_group = self.swapped[0]
            # Can it be swapped back in?
            if self.block_manager.can_swap_in(seq_group):
                self.swapped.popleft()
                self._swap_in(seq_group)
                self.running.append(seq_group)
            else:
                break

    def _preempt(self, seq_group: SequenceGroup) -> None:
        """Preempt a request (swap out to CPU or recompute)."""
        preemption_mode = self.scheduler_config.preemption_mode
        if preemption_mode == PreemptionMode.SWAP:
            # Swap its KV cache out to CPU memory.
            self._swap_out(seq_group)
        else:
            # Recompute: drop the KV cache and redo the prefill later.
            self._recompute(seq_group)

    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate blocks for a request."""
        self.block_manager.allocate(seq_group)

    def _swap_out(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """Swap out to CPU."""
        return self.block_manager.swap_out(seq_group)

    def _swap_in(self, seq_group: SequenceGroup) -> Dict[int, int]:
        """Swap back in from CPU."""
        return self.block_manager.swap_in(seq_group)
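_schedule_swapped calls self.block_manager.can_swap_in, which the BlockSpaceManager excerpt above did not include. A minimal version consistent with swap_in (every CPU-resident block needs a free GPU block, while keeping the watermark in reserve) might look like the sketch below; treat it as illustrative, not the exact vLLM method:

def can_swap_in(self, seq_group: SequenceGroup) -> bool:
    """True if every block this group holds on the CPU fits back onto the GPU."""
    # Number of blocks the group currently occupies (all of them CPU-resident
    # after a swap-out in this simplified model).
    num_required = sum(
        len(self.block_tables[seq.seq_id].logical_to_physical)
        for seq in seq_group.get_seqs()
    )
    num_free = self.gpu_allocator.get_num_free_blocks()
    # Keep the watermark in reserve so running requests can still append slots.
    return num_free - num_required >= self.watermark_blocks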
Continuous Batching
┌─────────────────────────────────────────────────────────────────────────┐
│                           Continuous Batching                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  Conventional static batching:                                           │
│  ═════════════════════════════                                          │
│                                                                          │
│  Time →                                                                  │
│  ┌────────────────────────────────────────────────────────────────┐     │
│  │ Req1: [===Prefill===][=====Decode=====][PAD][PAD][PAD]         │     │
│  │ Req2: [===Prefill===][===Decode===][PAD][PAD][PAD][PAD][PAD]   │     │
│  │ Req3: [===Prefill===][=======Decode=======][PAD][PAD]          │     │
│  └────────────────────────────────────────────────────────────────┘     │
│                                                                          │
│  No new request can start until the longest one in the batch finishes;   │
│  short requests waste compute on padding while they wait.                │
│                                                                          │
│                                                                          │
│  Continuous batching:                                                    │
│  ═════════════════════                                                   │
│                                                                          │
│  Time →                                                                  │
│  Step 1: [Req1 Prefill][Req2 Prefill][Req3 Prefill]                      │
│  Step 2: [Req1 Decode ][Req2 Decode ][Req3 Decode ]                      │
│  Step 3: [Req1 Decode ][Req2 DONE→Req4 Prefill][Req3 Decode ]            │
│  Step 4: [Req1 Decode ][Req4 Decode ][Req3 Decode ]                      │
│  Step 5: [Req1 DONE→Req5][Req4 Decode ][Req3 Decode ]                    │
│  ...                                                                     │
│                                                                          │
│  Every step is scheduled independently:                                  │
│  • finished requests are removed immediately                             │
│  • freed slots are filled with new requests right away                   │
│  • no padding waste                                                      │
│  • much higher throughput                                                │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘
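The scheduling idea in the diagram can be condensed into a toy loop: each iteration admits new requests while capacity allows, decodes one token for everything running, and retires finished requests immediately. This is a self-contained simulation with made-up request lengths, not vLLM code:

from collections import deque

MAX_RUNNING = 3

# (request_id, number_of_tokens_to_generate) -- made-up workload
waiting = deque([("req1", 5), ("req2", 2), ("req3", 7), ("req4", 3), ("req5", 4)])
running = {}          # request_id -> tokens still to generate

step = 0
while waiting or running:
    step += 1
    # Admit new requests into any free batch slots (continuous batching).
    while waiting and len(running) < MAX_RUNNING:
        req_id, length = waiting.popleft()
        running[req_id] = length
    # One decode step for every running request.
    for req_id in list(running):
        running[req_id] -= 1
        if running[req_id] == 0:
            del running[req_id]       # finished requests leave immediately
    print(f"step {step:2d}: running={sorted(running)}")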
PagedAttention CUDA Kernel
Kernel Implementation
// vllm/attention/ops/paged_attention.cu (simplified)
// NOTE: this sketch keeps the structure of the real kernel but omits the
// cross-thread reduction of the per-thread value accumulator; the real kernel
// reduces qk_max, exp_sum AND the weighted-value accumulator across threads.
template <typename scalar_t, int HEAD_SIZE, int BLOCK_SIZE, int NUM_THREADS>
__global__ void paged_attention_kernel(
    scalar_t* __restrict__ out,               // [num_seqs, num_heads, head_size]
    const scalar_t* __restrict__ query,       // [num_seqs, num_heads, head_size]
    const scalar_t* __restrict__ key_cache,   // [num_blocks, num_heads, head_size, block_size]
    const scalar_t* __restrict__ value_cache, // [num_blocks, num_heads, head_size, block_size]
    const int* __restrict__ block_tables,     // [num_seqs, max_num_blocks_per_seq]
    const int* __restrict__ context_lens,     // [num_seqs]
    const int num_heads,
    const int max_num_blocks_per_seq,
    const float scale
) {
    // Which sequence / head this thread block handles.
    const int seq_idx = blockIdx.x;
    const int head_idx = blockIdx.y;
    const int thread_idx = threadIdx.x;

    // Context length of this sequence.
    const int context_len = context_lens[seq_idx];
    const int num_blocks = (context_len + BLOCK_SIZE - 1) / BLOCK_SIZE;

    // Query vector of the current token.
    const scalar_t* q = query + seq_idx * num_heads * HEAD_SIZE + head_idx * HEAD_SIZE;

    // Accumulators.
    float qk_max = -FLT_MAX;
    float exp_sum = 0.0f;
    float acc[HEAD_SIZE] = {0.0f};

    // First pass over all blocks: find the softmax max.
    for (int block_idx = 0; block_idx < num_blocks; block_idx++) {
        // Look up the physical block number in the block table.
        const int physical_block_num = block_tables[seq_idx * max_num_blocks_per_seq + block_idx];

        // Pointer to this block's keys for this head.
        const scalar_t* k_block = key_cache +
            physical_block_num * num_heads * HEAD_SIZE * BLOCK_SIZE +
            head_idx * HEAD_SIZE * BLOCK_SIZE;

        // Number of valid tokens inside the block.
        const int block_start = block_idx * BLOCK_SIZE;
        const int block_end = min(block_start + BLOCK_SIZE, context_len);
        const int num_valid_tokens = block_end - block_start;

        // Q . K for the tokens owned by this thread.
        for (int token_idx = thread_idx; token_idx < num_valid_tokens; token_idx += NUM_THREADS) {
            float qk = 0.0f;
            for (int d = 0; d < HEAD_SIZE; d++) {
                qk += float(q[d]) * float(k_block[d * BLOCK_SIZE + token_idx]);
            }
            qk *= scale;
            // Track the running max for a numerically stable softmax.
            qk_max = fmaxf(qk_max, qk);
        }
    }

    // Reduce the max across threads.
    __shared__ float shared_qk_max;
    qk_max = blockReduceMax(qk_max);    // helper: block-wide max reduction
    if (thread_idx == 0) {
        shared_qk_max = qk_max;
    }
    __syncthreads();
    qk_max = shared_qk_max;

    // Second pass: softmax weights and weighted sum of V.
    for (int block_idx = 0; block_idx < num_blocks; block_idx++) {
        const int physical_block_num = block_tables[seq_idx * max_num_blocks_per_seq + block_idx];

        // Same pointer arithmetic as in the first pass.
        const scalar_t* k_block = key_cache +
            physical_block_num * num_heads * HEAD_SIZE * BLOCK_SIZE +
            head_idx * HEAD_SIZE * BLOCK_SIZE;
        const scalar_t* v_block = value_cache +
            physical_block_num * num_heads * HEAD_SIZE * BLOCK_SIZE +
            head_idx * HEAD_SIZE * BLOCK_SIZE;

        const int block_start = block_idx * BLOCK_SIZE;
        const int block_end = min(block_start + BLOCK_SIZE, context_len);
        const int num_valid_tokens = block_end - block_start;

        for (int token_idx = thread_idx; token_idx < num_valid_tokens; token_idx += NUM_THREADS) {
            // Recompute Q . K (trades FLOPs for not storing the logits).
            float qk = 0.0f;
            for (int d = 0; d < HEAD_SIZE; d++) {
                qk += float(q[d]) * float(k_block[d * BLOCK_SIZE + token_idx]);
            }
            qk *= scale;

            // Softmax numerator.
            float p = expf(qk - qk_max);
            exp_sum += p;

            // Accumulate p * V.
            for (int d = 0; d < HEAD_SIZE; d++) {
                acc[d] += p * float(v_block[d * BLOCK_SIZE + token_idx]);
            }
        }
    }

    // Reduce exp_sum across threads.
    __shared__ float shared_exp_sum;
    exp_sum = blockReduceSum(exp_sum);  // helper: block-wide sum reduction
    if (thread_idx == 0) {
        shared_exp_sum = exp_sum;
    }
    __syncthreads();
    exp_sum = shared_exp_sum;

    // Normalize. (The real kernel also reduces acc[] across threads here.)
    const float inv_exp_sum = 1.0f / (exp_sum + 1e-6f);
    for (int d = 0; d < HEAD_SIZE; d++) {
        acc[d] *= inv_exp_sum;
    }

    // Write the output.
    scalar_t* output = out + seq_idx * num_heads * HEAD_SIZE + head_idx * HEAD_SIZE;
    for (int d = thread_idx; d < HEAD_SIZE; d += NUM_THREADS) {
        output[d] = scalar_t(acc[d]);
    }
}
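To see what the kernel computes without the CUDA details, here is a small PyTorch reference for single-query (decode) paged attention over one head. It gathers K/V through a block table exactly as the kernel does; the tensor layout is simplified relative to the real cache layout, so treat it as a functional reference only (function and variable names are ours, not vLLM's):

import torch

def paged_attention_reference(
    q: torch.Tensor,            # [head_size]
    key_cache: torch.Tensor,    # [num_blocks, block_size, head_size]
    value_cache: torch.Tensor,  # [num_blocks, block_size, head_size]
    block_table: list,          # physical block numbers of this sequence
    context_len: int,
    scale: float,
) -> torch.Tensor:
    # Gather this sequence's K/V blocks via the block table, then keep only
    # the first context_len token positions.
    k = key_cache[block_table].reshape(-1, key_cache.shape[-1])[:context_len]
    v = value_cache[block_table].reshape(-1, value_cache.shape[-1])[:context_len]
    # Standard scaled-dot-product attention for the single query token.
    scores = (k @ q) * scale                  # [context_len]
    probs = torch.softmax(scores, dim=-1)     # softmax over the context
    return probs @ v                          # [head_size]

# Tiny usage example with random data.
torch.manual_seed(0)
num_blocks, block_size, head_size = 8, 4, 16
key_cache = torch.randn(num_blocks, block_size, head_size)
value_cache = torch.randn(num_blocks, block_size, head_size)
q = torch.randn(head_size)
out = paged_attention_reference(q, key_cache, value_cache,
                                block_table=[5, 0, 3], context_len=10,
                                scale=head_size ** -0.5)
print(out.shape)    # torch.Size([16])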
Python Interface
# vllm/attention/backends/flash_attn.py
from typing import Type

import torch


class FlashAttentionBackend(AttentionBackend):
    """Attention backend built on FlashAttention."""

    @staticmethod
    def get_impl_cls() -> Type["FlashAttentionImpl"]:
        return FlashAttentionImpl


class FlashAttentionImpl(AttentionImpl):

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        kv_cache: torch.Tensor,
        attn_metadata: FlashAttentionMetadata,
        k_scale: float = 1.0,
        v_scale: float = 1.0,
    ) -> torch.Tensor:
        """
        Run PagedAttention.

        Args:
            query: [num_tokens, num_heads, head_size]
            key: [num_tokens, num_kv_heads, head_size]
            value: [num_tokens, num_kv_heads, head_size]
            kv_cache: [2, num_blocks, block_size, num_kv_heads, head_size]
            attn_metadata: carries block_tables, context_lens, slot_mapping, ...
        """
        num_tokens, num_heads, head_size = query.shape
        output = torch.empty_like(query)

        # Write the new K/V into the paged cache.
        if kv_cache is not None:
            key_cache = kv_cache[0]
            value_cache = kv_cache[1]
            # slot_mapping tells the kernel which cache slot each token goes to.
            ops.reshape_and_cache(
                key,
                value,
                key_cache,
                value_cache,
                attn_metadata.slot_mapping,
            )

        # Prefill (prompt processing): contiguous K/V, use FlashAttention.
        if attn_metadata.prefill_metadata is not None:
            output = flash_attn_varlen_func(
                q=query,
                k=key,
                v=value,
                cu_seqlens_q=attn_metadata.prefill_metadata.seq_start_loc,
                cu_seqlens_k=attn_metadata.prefill_metadata.seq_start_loc,
                max_seqlen_q=attn_metadata.prefill_metadata.max_seq_len,
                max_seqlen_k=attn_metadata.prefill_metadata.max_seq_len,
                softmax_scale=self.scale,
                causal=True,
            )

        # Decode (generation): one query token per sequence, paged KV cache.
        if attn_metadata.decode_metadata is not None:
            # paged_attention_v1 writes its result into `output` in place.
            paged_attention_v1(
                out=output,
                query=query,
                key_cache=key_cache,
                value_cache=value_cache,
                num_kv_heads=self.num_kv_heads,
                scale=self.scale,
                block_tables=attn_metadata.decode_metadata.block_tables,
                context_lens=attn_metadata.decode_metadata.context_lens,
                block_size=self.block_size,
                max_context_len=attn_metadata.decode_metadata.max_context_len,
            )

        return output
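reshape_and_cache needs slot_mapping: for every token in the batch, the flat index of the cache slot it should be written to, derived from the block table. The sketch below shows how such a mapping can be computed for one sequence; the real code builds it in the model runner and the helper name here is ours, not vLLM's:

from typing import List

def build_slot_mapping(block_table: List[int], start_pos: int,
                       num_new_tokens: int, block_size: int) -> List[int]:
    """Flat cache-slot index for each new token of one sequence.

    Token at position i lives in logical block i // block_size at offset
    i % block_size; the logical block is translated to a physical block via
    the block table, so slot = physical_block * block_size + offset.
    """
    slots = []
    for pos in range(start_pos, start_pos + num_new_tokens):
        physical_block = block_table[pos // block_size]
        slots.append(physical_block * block_size + pos % block_size)
    return slots

# A sequence whose logical blocks 0..2 map to physical blocks 17, 3, 9:
print(build_slot_mapping([17, 3, 9], start_pos=30, num_new_tokens=1, block_size=16))
# -> [62]   (position 30 -> logical block 1 -> physical block 3; 3 * 16 + 14 = 62)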
LLMEngine Core Implementation
# vllm/engine/llm_engine.py
from typing import List, Optional


class LLMEngine:
    """
    The core vLLM engine.
    Owns the model workers, the scheduler, and the KV cache.
    """

    def __init__(
        self,
        model_config: ModelConfig,
        cache_config: CacheConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        lora_config: Optional[LoRAConfig] = None,
    ):
        self.model_config = model_config
        self.cache_config = cache_config

        # Scheduler.
        self.scheduler = Scheduler(scheduler_config, cache_config)

        # Workers (possibly distributed).
        self._init_workers(
            model_config,
            parallel_config,
            device_config,
        )

        # KV cache.
        self._init_cache()

    def _init_workers(self, model_config, parallel_config, device_config):
        """Spawn the worker processes."""
        if parallel_config.world_size == 1:
            # Single GPU.
            self.workers = [Worker(model_config, parallel_config, 0)]
        else:
            # Multiple GPUs (distributed via Ray).
            self.workers = []
            for rank in range(parallel_config.world_size):
                worker = ray.remote(Worker).remote(
                    model_config, parallel_config, rank
                )
                self.workers.append(worker)

    def _init_cache(self):
        """Profile the model, then allocate the KV cache."""
        # Profiling decides how many blocks fit in GPU and CPU memory.
        num_gpu_blocks, num_cpu_blocks = self._profile_num_available_blocks()
        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks
        # Allocate the cache on every worker.
        self._run_workers("init_cache_engine", cache_config=self.cache_config)

    def add_request(
        self,
        request_id: str,
        prompt: str,
        sampling_params: SamplingParams,
    ) -> None:
        """Add a new request."""
        # Tokenize.
        prompt_token_ids = self.tokenizer.encode(prompt)
        # Create the sequence (sequence ids come from an internal counter).
        seq_id = next(self.seq_counter)
        seq = Sequence(seq_id, prompt_token_ids)
        seq_group = SequenceGroup(request_id, [seq], sampling_params)
        # Hand it to the scheduler.
        self.scheduler.add_seq_group(seq_group)

    def step(self) -> List[RequestOutput]:
        """
        Run one inference step.
        This is vLLM's core loop.
        """
        # 1. Schedule: decide which requests run in this step.
        scheduler_outputs = self.scheduler.schedule()
        if scheduler_outputs.is_empty():
            return []

        # 2. Prepare the model inputs.
        execute_model_req = self._prepare_model_input(scheduler_outputs)

        # 3. Execute the model on the workers.
        output = self._run_workers(
            "execute_model",
            execute_model_req=execute_model_req,
        )

        # 4. Process the outputs (sampling results, stop conditions, ...).
        return self._process_model_outputs(output, scheduler_outputs)

    def _prepare_model_input(
        self,
        scheduler_outputs: SchedulerOutputs
    ) -> ExecuteModelRequest:
        """Build the model input from the scheduling decision."""
        # Collect metadata for every scheduled sequence group.
        seq_group_metadata_list = []
        for seq_group in scheduler_outputs.scheduled_seq_groups:
            seq_data = {}
            for seq in seq_group.get_seqs():
                seq_data[seq.seq_id] = SequenceData(
                    prompt_token_ids=seq.prompt_token_ids,
                    output_token_ids=seq.output_token_ids,
                )

            # Block tables.
            block_tables = {}
            for seq in seq_group.get_seqs():
                block_table = self.scheduler.block_manager.get_block_table(seq)
                block_tables[seq.seq_id] = block_table

            metadata = SequenceGroupMetadata(
                request_id=seq_group.request_id,
                is_prompt=scheduler_outputs.prompt_run,
                seq_data=seq_data,
                sampling_params=seq_group.sampling_params,
                block_tables=block_tables,
            )
            seq_group_metadata_list.append(metadata)

        return ExecuteModelRequest(
            seq_group_metadata_list=seq_group_metadata_list,
            blocks_to_swap_in=scheduler_outputs.blocks_to_swap_in,
            blocks_to_swap_out=scheduler_outputs.blocks_to_swap_out,
            blocks_to_copy=scheduler_outputs.blocks_to_copy,
        )

    def _process_model_outputs(
        self,
        output: SamplerOutput,
        scheduler_outputs: SchedulerOutputs
    ) -> List[RequestOutput]:
        """Process the sampler output for every scheduled group."""
        request_outputs = []

        for seq_group, seq_group_output in zip(
            scheduler_outputs.scheduled_seq_groups,
            output.outputs
        ):
            for seq, seq_output in zip(seq_group.get_seqs(), seq_group_output):
                # Sampled token and its logprob.
                token_id = seq_output.samples[0].output_token
                logprob = seq_output.samples[0].logprobs

                # Append the token to the sequence.
                seq.append_token_id(token_id, logprob)

                # Reserve a new slot for it if the last block is full.
                self.scheduler.block_manager.append_slot(seq)

                # Check stop conditions.
                if self._check_stop(seq, seq_group.sampling_params):
                    seq.status = SequenceStatus.FINISHED_STOPPED

            # Build the per-request output.
            request_output = RequestOutput(
                request_id=seq_group.request_id,
                prompt=seq_group.prompt,
                outputs=[...],
                finished=seq_group.is_finished(),
            )
            request_outputs.append(request_output)

        return request_outputs
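Putting the pieces together, the offline LLM wrapper essentially drives add_request / step in a loop. A minimal sketch of that driver follows; has_unfinished_requests exists on LLMEngine, but construction and argument details vary across versions, so this is illustrative and the model name is a placeholder:

from vllm import EngineArgs, LLMEngine, SamplingParams

# Build the engine from CLI-style arguments.
engine = LLMEngine.from_engine_args(EngineArgs(model="meta-llama/Llama-2-7b-hf"))
params = SamplingParams(temperature=0.0, max_tokens=32)

# Queue a few requests; the scheduler batches them continuously.
for i, prompt in enumerate(["Hello,", "The capital of France is", "1 + 1 ="]):
    engine.add_request(request_id=str(i), prompt=prompt, sampling_params=params)

# Core loop: each step() schedules, runs one model forward pass, and returns
# the requests that produced new tokens or finished.
while engine.has_unfinished_requests():
    for request_output in engine.step():
        if request_output.finished:
            print(request_output.request_id, request_output.outputs[0].text)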
Summary
vLLM Core Techniques
┌─────────────────────────────────────────────────────────────────────────┐
│                           vLLM Core Techniques                           │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  PagedAttention                                                          │
│  ──────────────                                                          │
│  • Block-based KV cache management                                       │
│  • Eliminates memory fragmentation                                       │
│  • Allocates on demand                                                   │
│  • Near-100% KV cache memory utilization                                 │
│                                                                          │
│  Continuous Batching                                                     │
│  ────────────────────                                                    │
│  • Every step is scheduled independently                                 │
│  • Finished requests leave immediately; new requests join immediately    │
│  • No padding waste                                                      │
│  • 2-3x higher throughput                                                │
│                                                                          │
│  KV Cache Swapping                                                       │
│  ─────────────────                                                       │
│  • GPU blocks can be swapped out to CPU memory                           │
│  • Supports more concurrent requests                                     │
│  • Enables preemptive scheduling                                         │
│                                                                          │
│  Block Table                                                             │
│  ────────────                                                            │
│  • Maps logical blocks to physical blocks                                │
│  • Analogous to an OS page table                                         │
│  • Enables sharing (prefix caching)                                      │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘
Frequently Asked Interview Questions
- How does PagedAttention solve the KV cache fragmentation problem?
- What are the advantages of continuous batching over static batching?
- What is vLLM's scheduling policy?
- How does the block table work?
- How is KV cache swapping implemented?