第18章:AI前沿技术趋势
本章深入探讨2024年最前沿的AI技术发展,包括Mixture of Experts架构、长上下文处理、模型量化、扩散模型、AI视频生成等核心技术,并提供完整的代码实现和实践指南。
18.1 MoE架构:Mixture of Experts
Mixture of Experts (MoE) 是一种突破性的模型架构,通过条件计算实现更高效的模型扩展。
18.1.1 MoE核心原理
MoE的核心思想是将一个大模型分解为多个"专家"网络,每次推理只激活其中一部分专家,从而在保持模型容量的同时降低计算成本。
关键概念:
- 专家网络(Experts): 多个并行的前馈网络
- 门控网络(Gating Network): 决定激活哪些专家
- 稀疏激活(Sparse Activation): 每个token只使用Top-K个专家
- 负载均衡(Load Balancing): 确保专家使用均匀
数学原理:
给定输入 x,MoE层的输出:
y = Σ(i=1 to N) G(x)_i * E_i(x)
其中:
- G(x) 是门控函数,输出N个专家的权重
- E_i(x) 是第i个专家的输出
- 通常只选择Top-K个专家(K << N)
门控函数:
G(x) = Softmax(TopK(x · W_g))
负载均衡损失:
L_balance = α * Σ(i=1 to N) f_i * P_i
其中:
- f_i 是分配给专家i的token比例
- P_i 是专家i的平均门控概率
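在进入完整实现之前,下面用一个极小的数值示例演示上述门控与负载均衡公式(N=4个专家、Top-2路由,logits为随意构造的演示数据,仅作示意):
import torch
import torch.nn.functional as F

# 玩具示例:4个专家、Top-2路由,batch中只有1个token
logits = torch.tensor([1.2, -0.3, 0.8, 0.1])   # x · W_g 的结果(演示数据)
gates = F.softmax(logits, dim=-1)               # G(x):所有专家的权重
top2_vals, top2_idx = torch.topk(gates, k=2)    # 只保留Top-2专家
top2_vals = top2_vals / top2_vals.sum()         # 重新归一化,使权重和为1
print("选中的专家:", top2_idx.tolist())
print("对应权重:", top2_vals.tolist())

# 负载均衡损失(单token示意):f_i为分给专家i的token比例,P_i为平均门控概率
f = torch.zeros(4).scatter_(0, top2_idx, 0.5)   # 两个被选中的专家各占50%
P = gates                                        # 只有一个token时,平均门控概率即gates
alpha = 0.01
L_balance = alpha * torch.sum(f * P)             # 对应上文 L_balance = α * Σ f_i * P_i
print("L_balance =", L_balance.item())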
18.1.2 完整MoE实现
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple, Optional
import numpy as np
class Expert(nn.Module):
"""单个专家网络 - 标准的FFN"""
def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
super().__init__()
self.w1 = nn.Linear(d_model, d_ff)
self.w2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# x: [batch_size, seq_len, d_model]
return self.w2(self.dropout(F.gelu(self.w1(x))))
class TopKGate(nn.Module):
"""Top-K门控网络"""
def __init__(
self,
d_model: int,
num_experts: int,
top_k: int = 2,
capacity_factor: float = 1.25,
noisy_gate: bool = True
):
super().__init__()
self.num_experts = num_experts
self.top_k = top_k
self.capacity_factor = capacity_factor
self.noisy_gate = noisy_gate
# 门控权重
self.w_gate = nn.Linear(d_model, num_experts, bias=False)
# 噪声参数(用于训练时的探索)
if noisy_gate:
self.w_noise = nn.Linear(d_model, num_experts, bias=False)
def forward(
self,
x: torch.Tensor,
train: bool = True
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
Args:
x: [batch_size, seq_len, d_model]
Returns:
gates: [batch_size, seq_len, top_k] - 门控权重
indices: [batch_size, seq_len, top_k] - 选中的专家索引
load: [num_experts] - 每个专家的负载
"""
# 计算门控logits
logits = self.w_gate(x) # [batch_size, seq_len, num_experts]
# 添加噪声(训练时)
if self.noisy_gate and train:
noise_logits = self.w_noise(x)
noise = torch.randn_like(logits) * F.softplus(noise_logits)
logits = logits + noise
# Top-K选择
gates = F.softmax(logits, dim=-1)
top_k_gates, top_k_indices = torch.topk(gates, self.top_k, dim=-1)
# 重新归一化
top_k_gates = top_k_gates / (top_k_gates.sum(dim=-1, keepdim=True) + 1e-8)
# 计算负载(用于负载均衡)
load = self._calculate_load(top_k_indices)
return top_k_gates, top_k_indices, load
def _calculate_load(self, indices: torch.Tensor) -> torch.Tensor:
"""计算每个专家的负载"""
batch_size, seq_len, top_k = indices.shape
# 统计每个专家被选中的次数
load = torch.zeros(
self.num_experts,
device=indices.device,
dtype=torch.float32
)
indices_flat = indices.view(-1)
load.scatter_add_(
0,
indices_flat,
torch.ones_like(indices_flat, dtype=torch.float32)
)
# 归一化
load = load / (batch_size * seq_len * top_k)
return load
class MoELayer(nn.Module):
"""完整的MoE层"""
def __init__(
self,
d_model: int,
d_ff: int,
num_experts: int = 8,
top_k: int = 2,
dropout: float = 0.1,
capacity_factor: float = 1.25,
balance_loss_weight: float = 0.01
):
super().__init__()
self.num_experts = num_experts
self.top_k = top_k
self.balance_loss_weight = balance_loss_weight
# 创建专家网络
self.experts = nn.ModuleList([
Expert(d_model, d_ff, dropout)
for _ in range(num_experts)
])
# 门控网络
self.gate = TopKGate(d_model, num_experts, top_k, capacity_factor)
def forward(
self,
x: torch.Tensor,
train: bool = True
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Args:
x: [batch_size, seq_len, d_model]
Returns:
output: [batch_size, seq_len, d_model]
aux_loss: 负载均衡损失
"""
batch_size, seq_len, d_model = x.shape
# 门控
gates, indices, load = self.gate(x, train)
# gates: [batch_size, seq_len, top_k]
# indices: [batch_size, seq_len, top_k]
# 初始化输出
output = torch.zeros_like(x)
# 为每个专家处理分配的tokens
for i in range(self.num_experts):
# 找到分配给当前专家的所有位置
expert_mask = (indices == i).any(dim=-1) # [batch_size, seq_len]
if expert_mask.any():
# 提取分配给该专家的tokens
expert_input = x[expert_mask] # [num_tokens, d_model]
# 专家处理
expert_output = self.experts[i](expert_input)
# 获取对应的门控权重
expert_gates = gates[expert_mask] # [num_tokens, top_k]
expert_indices = indices[expert_mask] # [num_tokens, top_k]
# 找到当前专家在top_k中的位置
expert_positions = (expert_indices == i).nonzero(as_tuple=True)[1]
expert_weights = expert_gates[
torch.arange(expert_gates.size(0)),
expert_positions
].unsqueeze(-1)
# 加权累加到输出
output[expert_mask] += expert_weights * expert_output
# 计算负载均衡损失
aux_loss = self._calculate_balance_loss(load)
return output, aux_loss
def _calculate_balance_loss(self, load: torch.Tensor) -> torch.Tensor:
"""
计算负载均衡损失
目标:让每个专家的负载接近 1/num_experts
"""
# 负载分布与均匀分布(每个专家 1/num_experts)之间的KL散度:
# KL(load || uniform) = Σ load_i * log(load_i * num_experts)
balance_loss = torch.sum(load * torch.log(load * self.num_experts + 1e-8))
return self.balance_loss_weight * balance_loss
class MoETransformerBlock(nn.Module):
"""包含MoE的Transformer块"""
def __init__(
self,
d_model: int = 768,
num_heads: int = 12,
d_ff: int = 3072,
num_experts: int = 8,
top_k: int = 2,
dropout: float = 0.1
):
super().__init__()
# 自注意力
self.attention = nn.MultiheadAttention(
d_model,
num_heads,
dropout=dropout,
batch_first=True
)
self.norm1 = nn.LayerNorm(d_model)
# MoE层
self.moe = MoELayer(
d_model=d_model,
d_ff=d_ff,
num_experts=num_experts,
top_k=top_k,
dropout=dropout
)
self.norm2 = nn.LayerNorm(d_model)
def forward(
self,
x: torch.Tensor,
mask: Optional[torch.Tensor] = None
) -> Tuple[torch.Tensor, torch.Tensor]:
# 自注意力 + 残差
attn_out, _ = self.attention(x, x, x, attn_mask=mask)
x = self.norm1(x + attn_out)
# MoE + 残差
moe_out, aux_loss = self.moe(x)
x = self.norm2(x + moe_out)
return x, aux_loss
# 使用示例
def example_moe_usage():
"""MoE使用示例"""
batch_size = 4
seq_len = 128
d_model = 768
# 创建MoE Transformer块
moe_block = MoETransformerBlock(
d_model=d_model,
num_heads=12,
d_ff=3072,
num_experts=8,
top_k=2
)
# 随机输入
x = torch.randn(batch_size, seq_len, d_model)
# 前向传播
output, aux_loss = moe_block(x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
print(f"负载均衡损失: {aux_loss.item():.6f}")
# 训练循环示例
optimizer = torch.optim.Adam(moe_block.parameters(), lr=1e-4)
for step in range(10):
optimizer.zero_grad()
# 前向传播
output, aux_loss = moe_block(x)
# 主任务损失(这里用随机目标演示)
target = torch.randn_like(output)
main_loss = F.mse_loss(output, target)
# 总损失 = 主任务损失 + 负载均衡损失
total_loss = main_loss + aux_loss
# 反向传播
total_loss.backward()
optimizer.step()
if step % 5 == 0:
print(f"Step {step}: Main Loss={main_loss.item():.4f}, "
f"Aux Loss={aux_loss.item():.6f}")
if __name__ == "__main__":
example_moe_usage()
18.1.3 Mixtral MoE实现
Mistral AI的Mixtral 8x7B是最成功的开源MoE模型之一。
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import Optional, List
class MixtralConfig:
"""Mixtral模型配置"""
def __init__(
self,
vocab_size: int = 32000,
hidden_size: int = 4096,
intermediate_size: int = 14336,
num_hidden_layers: int = 32,
num_attention_heads: int = 32,
num_key_value_heads: int = 8, # GQA
num_experts: int = 8,
num_experts_per_token: int = 2,
max_position_embeddings: int = 32768,
rope_theta: float = 10000.0,
rms_norm_eps: float = 1e-5,
):
self.vocab_size = vocab_size
self.hidden_size = hidden_size
self.intermediate_size = intermediate_size
self.num_hidden_layers = num_hidden_layers
self.num_attention_heads = num_attention_heads
self.num_key_value_heads = num_key_value_heads
self.num_experts = num_experts
self.num_experts_per_token = num_experts_per_token
self.max_position_embeddings = max_position_embeddings
self.rope_theta = rope_theta
self.rms_norm_eps = rms_norm_eps
class MixtralSparseMoeBlock(nn.Module):
"""Mixtral的MoE实现"""
def __init__(self, config: MixtralConfig):
super().__init__()
self.hidden_dim = config.hidden_size
self.ffn_dim = config.intermediate_size
self.num_experts = config.num_experts
self.top_k = config.num_experts_per_token
# 门控网络
self.gate = nn.Linear(self.hidden_dim, self.num_experts, bias=False)
# 专家网络(每个专家是一个完整的FFN)
self.experts = nn.ModuleList([
MixtralExpert(self.hidden_dim, self.ffn_dim)
for _ in range(self.num_experts)
])
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
batch_size, seq_len, hidden_dim = hidden_states.shape
# 重塑为2D以便处理
hidden_states_flat = hidden_states.view(-1, hidden_dim)
# 计算路由权重
router_logits = self.gate(hidden_states_flat)
routing_weights = torch.softmax(router_logits, dim=-1)
# 选择top-k专家
routing_weights, selected_experts = torch.topk(
routing_weights, self.top_k, dim=-1
)
# 重新归一化
routing_weights /= routing_weights.sum(dim=-1, keepdim=True)
# 初始化输出
final_hidden_states = torch.zeros_like(hidden_states_flat)
# 为每个专家处理tokens
expert_mask = torch.nn.functional.one_hot(
selected_experts,
num_classes=self.num_experts
).permute(2, 1, 0)
for expert_idx in range(self.num_experts):
expert = self.experts[expert_idx]
idx, top_x = torch.where(expert_mask[expert_idx])
if top_x.shape[0] == 0:
continue
# 处理分配给该专家的tokens
current_state = hidden_states_flat[None, top_x].reshape(-1, hidden_dim)
current_hidden_states = expert(current_state)
# 应用路由权重
current_hidden_states *= routing_weights[top_x, idx, None]
# 累加到输出
final_hidden_states.index_add_(
0, top_x, current_hidden_states.to(hidden_states.dtype)
)
# 恢复原始形状
final_hidden_states = final_hidden_states.reshape(
batch_size, seq_len, hidden_dim
)
return final_hidden_states
class MixtralExpert(nn.Module):
"""Mixtral专家网络"""
def __init__(self, hidden_dim: int, ffn_dim: int):
super().__init__()
self.w1 = nn.Linear(hidden_dim, ffn_dim, bias=False)
self.w2 = nn.Linear(ffn_dim, hidden_dim, bias=False)
self.w3 = nn.Linear(hidden_dim, ffn_dim, bias=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
# SwiGLU activation
return self.w2(nn.functional.silu(self.w1(x)) * self.w3(x))
# 使用Hugging Face的Mixtral模型
def use_mixtral_model():
"""使用预训练的Mixtral模型"""
model_name = "mistralai/Mixtral-8x7B-Instruct-v0.1"
# 加载tokenizer和模型
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 使用8bit量化加载(节省内存)
model = AutoModelForCausalLM.from_pretrained(
model_name,
device_map="auto",
load_in_8bit=True,
torch_dtype=torch.float16
)
# 推理示例
prompt = "Explain the concept of Mixture of Experts in AI:"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
outputs = model.generate(
**inputs,
max_new_tokens=200,
temperature=0.7,
top_p=0.9,
do_sample=True
)
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(generated_text)
18.1.4 MoE的优势与挑战
优势:
- 计算效率: 虽然参数量大,但每次推理只激活部分参数
- 可扩展性: 可以通过增加专家数量来扩展模型容量
- 专业化: 不同专家可以学习处理不同类型的输入
挑战:
- 负载不均: 某些专家可能被过度使用
- 训练不稳定: 需要精心设计的负载均衡策略
- 内存占用: 所有专家都需要加载到内存
- 通信开销: 分布式训练时专家间的通信成本高
实际应用数据:
- Mixtral 8x7B: 47B参数,但每token只激活13B参数
- 性能匹敌GPT-3.5,推理速度约为Llama 2 70B的6倍(Mistral官方数据)
- 支持32K上下文长度
- DeepSeek-V2: 236B参数,激活21B,性能接近GPT-4
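下面的小脚本按18.1.3节将给出的MixtralConfig超参数,粗略验证"总参数约47B、每token激活约13B"这一数据(忽略LayerNorm、RoPE等少量参数,仅为示意性估算):
def mixtral_param_estimate():
    """按Mixtral 8x7B的公开超参数粗略估算总参数量与每token激活参数量"""
    hidden, ffn, layers = 4096, 14336, 32
    n_kv_heads, head_dim = 8, 128
    vocab = 32000
    experts, top_k = 8, 2

    # 注意力(GQA): q/o为hidden×hidden,k/v为hidden×(kv_heads*head_dim)
    attn = 2 * hidden * hidden + 2 * hidden * (n_kv_heads * head_dim)
    # 每个专家是SwiGLU FFN:w1, w2, w3三个矩阵
    expert = 3 * hidden * ffn
    # 每层 = 注意力 + 8个专家 + 门控
    per_layer_total = attn + experts * expert + hidden * experts
    per_layer_active = attn + top_k * expert + hidden * experts
    # 词嵌入 + 输出头
    embed = 2 * vocab * hidden

    total = layers * per_layer_total + embed
    active = layers * per_layer_active + embed
    print(f"总参数约 {total/1e9:.1f}B, 每token激活约 {active/1e9:.1f}B")

mixtral_param_estimate()   # 约 46.7B / 12.9B,与上文数据一致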
18.2 长上下文技术
长上下文处理是2024年的重要突破,使模型能够处理数十万甚至百万token的输入。
18.2.1 Context Length瓶颈分析
传统注意力的问题:
# 标准自注意力的复杂度
import torch
import torch.nn as nn
import math
def standard_attention(Q, K, V, mask=None):
"""
标准缩放点积注意力
复杂度:
- 时间: O(n²d),其中n是序列长度,d是特征维度
- 空间: O(n²),需要存储注意力矩阵
"""
d_k = Q.size(-1)
# QK^T: [batch, heads, n, n]
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
# Softmax: [batch, heads, n, n]
attn_weights = torch.softmax(scores, dim=-1)
# 输出: [batch, heads, n, d]
output = torch.matmul(attn_weights, V)
return output, attn_weights
# 复杂度分析
def analyze_attention_complexity():
"""分析不同序列长度的内存和计算需求"""
import numpy as np
batch_size = 1
num_heads = 32
d_head = 128
seq_lengths = [512, 2048, 8192, 32768, 131072]
print("序列长度 | 注意力矩阵大小 | 内存占用(GB) | FLOPs(B)")
print("-" * 65)
for seq_len in seq_lengths:
# 注意力矩阵大小: [batch, heads, n, n]
attn_matrix_size = batch_size * num_heads * seq_len * seq_len
# 内存占用(float16)
memory_gb = attn_matrix_size * 2 / (1024**3)
# FLOPs: QK^T + Softmax + AttnV
flops = (
2 * seq_len * seq_len * d_head + # QK^T
4 * seq_len * seq_len + # Softmax
2 * seq_len * seq_len * d_head # AttnV
) * num_heads
flops_b = flops / 1e9
print(f"{seq_len:8d} | {attn_matrix_size:14,d} | "
f"{memory_gb:11.2f} | {flops_b:10.2f}")
if __name__ == "__main__":
analyze_attention_complexity()
输出:
序列长度 | 注意力矩阵大小 | 内存占用(GB) | FLOPs(B)
-----------------------------------------------------------------
512 | 8,388,608 | 0.02 | 4.33
2048 | 134,217,728 | 0.25 | 69.26
8192 | 2,147,483,648 | 4.00 | 1108.10
32768 | 34,359,738,368 | 64.00 | 17729.62
131072 |549,755,813,888 | 1024.00 | 283674.00
18.2.2 RoPE位置编码外推
RoPE (Rotary Position Embedding) 支持长度外推,是长上下文的关键技术。
import torch
import torch.nn as nn
import math
from typing import Optional, Tuple
class RotaryPositionEmbedding(nn.Module):
"""
旋转位置编码(RoPE)
论文: RoFormer: Enhanced Transformer with Rotary Position Embedding
"""
def __init__(
self,
dim: int,
max_seq_len: int = 2048,
base: float = 10000.0
):
super().__init__()
self.dim = dim
self.max_seq_len = max_seq_len
self.base = base
# 预计算频率
inv_freq = 1.0 / (
base ** (torch.arange(0, dim, 2).float() / dim)
)
self.register_buffer('inv_freq', inv_freq)
def forward(
self,
seq_len: int,
device: torch.device
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
生成cos和sin缓存
Returns:
cos_cached: [seq_len, dim]
sin_cached: [seq_len, dim]
"""
# 位置索引
t = torch.arange(seq_len, device=device).type_as(self.inv_freq)
# 计算频率 * 位置
freqs = torch.einsum('i,j->ij', t, self.inv_freq)
# 拼接以匹配完整维度
emb = torch.cat((freqs, freqs), dim=-1)
return emb.cos(), emb.sin()
def rotate_half(x: torch.Tensor) -> torch.Tensor:
"""将特征维度分为两半并旋转"""
x1, x2 = x[..., :x.shape[-1] // 2], x[..., x.shape[-1] // 2:]
return torch.cat((-x2, x1), dim=-1)
def apply_rotary_pos_emb(
q: torch.Tensor,
k: torch.Tensor,
cos: torch.Tensor,
sin: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
"""
应用旋转位置编码
Args:
q, k: [batch, heads, seq_len, dim]
cos, sin: [seq_len, dim]
"""
# 扩展维度以匹配q, k
cos = cos[None, None, :, :] # [1, 1, seq_len, dim]
sin = sin[None, None, :, :]
# 应用旋转
q_embed = (q * cos) + (rotate_half(q) * sin)
k_embed = (k * cos) + (rotate_half(k) * sin)
return q_embed, k_embed
class RoPEScaledAttention(nn.Module):
"""带有RoPE的注意力层,支持长度外推"""
def __init__(
self,
d_model: int,
num_heads: int,
max_seq_len: int = 2048,
rope_base: float = 10000.0,
rope_scaling: Optional[dict] = None
):
super().__init__()
self.d_model = d_model
self.num_heads = num_heads
self.d_head = d_model // num_heads
# QKV投影
self.q_proj = nn.Linear(d_model, d_model, bias=False)
self.k_proj = nn.Linear(d_model, d_model, bias=False)
self.v_proj = nn.Linear(d_model, d_model, bias=False)
self.o_proj = nn.Linear(d_model, d_model, bias=False)
# RoPE
self.rope = RotaryPositionEmbedding(
self.d_head,
max_seq_len,
rope_base
)
# RoPE缩放(用于长度外推)
self.rope_scaling = rope_scaling
if rope_scaling:
self.rope_scale_factor = rope_scaling.get('factor', 1.0)
self.rope_scale_type = rope_scaling.get('type', 'linear')
else:
self.rope_scale_factor = 1.0
self.rope_scale_type = None
def forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
batch_size, seq_len, _ = hidden_states.shape
# QKV投影
Q = self.q_proj(hidden_states)
K = self.k_proj(hidden_states)
V = self.v_proj(hidden_states)
# 重塑为多头
Q = Q.view(batch_size, seq_len, self.num_heads, self.d_head)
K = K.view(batch_size, seq_len, self.num_heads, self.d_head)
V = V.view(batch_size, seq_len, self.num_heads, self.d_head)
# 转置: [batch, heads, seq_len, d_head]
Q = Q.transpose(1, 2)
K = K.transpose(1, 2)
V = V.transpose(1, 2)
# 应用RoPE
cos, sin = self.rope(seq_len, hidden_states.device)
# RoPE缩放(用于外推)
if self.rope_scaling:
cos, sin = self._apply_rope_scaling(cos, sin, seq_len)
Q, K = apply_rotary_pos_emb(Q, K, cos, sin)
# 计算注意力
attn_output = self._compute_attention(Q, K, V, attention_mask)
# 输出投影
attn_output = attn_output.transpose(1, 2).contiguous()
attn_output = attn_output.view(batch_size, seq_len, self.d_model)
output = self.o_proj(attn_output)
return output
def _apply_rope_scaling(
self,
cos: torch.Tensor,
sin: torch.Tensor,
seq_len: int
) -> Tuple[torch.Tensor, torch.Tensor]:
"""应用RoPE缩放以支持长度外推"""
if self.rope_scale_type == 'linear':
# 线性插值
max_seq_len = self.rope.max_seq_len
if seq_len > max_seq_len:
# 插值到更长的序列
scale = seq_len / max_seq_len
positions = torch.arange(seq_len, device=cos.device) / scale
# 重新计算频率
freqs = torch.einsum(
'i,j->ij',
positions,
self.rope.inv_freq
)
emb = torch.cat((freqs, freqs), dim=-1)
cos = emb.cos()
sin = emb.sin()
elif self.rope_scale_type == 'dynamic':
# 动态NTK插值
scale = self.rope_scale_factor
max_seq_len = self.rope.max_seq_len * scale
# 调整base以保持高频信息
new_base = self.rope.base * (
(scale * seq_len / max_seq_len) - (scale - 1)
) ** (self.d_head / (self.d_head - 2))
inv_freq = 1.0 / (
new_base ** (
torch.arange(0, self.d_head, 2, device=cos.device).float()
/ self.d_head
)
)
t = torch.arange(seq_len, device=cos.device).type_as(inv_freq)
freqs = torch.einsum('i,j->ij', t, inv_freq)
emb = torch.cat((freqs, freqs), dim=-1)
cos = emb.cos()
sin = emb.sin()
return cos, sin
def _compute_attention(
self,
Q: torch.Tensor,
K: torch.Tensor,
V: torch.Tensor,
mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
"""计算缩放点积注意力"""
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_head)
if mask is not None:
scores = scores + mask
attn_weights = torch.softmax(scores, dim=-1)
output = torch.matmul(attn_weights, V)
return output
# 测试RoPE外推
def test_rope_extrapolation():
"""测试RoPE的长度外推能力"""
d_model = 768
num_heads = 12
train_seq_len = 2048
test_seq_len = 8192 # 4倍于训练长度
# 创建模型(带RoPE缩放)
rope_scaling = {
'type': 'dynamic',
'factor': 4.0
}
model = RoPEScaledAttention(
d_model=d_model,
num_heads=num_heads,
max_seq_len=train_seq_len,
rope_scaling=rope_scaling
)
# 测试短序列(训练长度)
short_input = torch.randn(1, train_seq_len, d_model)
short_output = model(short_input)
print(f"短序列 ({train_seq_len}): 输出形状 {short_output.shape}")
# 测试长序列(外推)
long_input = torch.randn(1, test_seq_len, d_model)
long_output = model(long_input)
print(f"长序列 ({test_seq_len}): 输出形状 {long_output.shape}")
print("\nRoPE外推成功!")
if __name__ == "__main__":
test_rope_extrapolation()
18.2.3 窗口注意力与FlashAttention
滑动窗口注意力:
import torch
import torch.nn as nn
from typing import Optional
def sliding_window_attention(
Q: torch.Tensor,
K: torch.Tensor,
V: torch.Tensor,
window_size: int = 512
) -> torch.Tensor:
"""
滑动窗口注意力
每个token只关注窗口内的其他tokens
复杂度: O(n * w * d),其中w是窗口大小
Args:
Q, K, V: [batch, heads, seq_len, d_head]
window_size: 窗口大小
"""
batch_size, num_heads, seq_len, d_head = Q.shape
# 创建滑动窗口mask
mask = torch.ones(seq_len, seq_len, device=Q.device)
for i in range(seq_len):
start = max(0, i - window_size // 2)
end = min(seq_len, i + window_size // 2 + 1)
mask[i, start:end] = 0
# 注意力计算
scores = torch.matmul(Q, K.transpose(-2, -1)) / (d_head ** 0.5)
scores = scores.masked_fill(mask.bool(), float('-inf'))
attn_weights = torch.softmax(scores, dim=-1)
output = torch.matmul(attn_weights, V)
return output
class FlashAttention(nn.Module):
"""
FlashAttention-2 简化实现
核心思想:
1. 分块计算,减少HBM访问
2. 在线softmax,避免存储完整注意力矩阵
3. 重计算而非存储中间结果
"""
def __init__(self, dropout: float = 0.0):
super().__init__()
self.dropout = dropout
def forward(
self,
Q: torch.Tensor,
K: torch.Tensor,
V: torch.Tensor,
causal: bool = False,
block_size: int = 256
) -> torch.Tensor:
"""
FlashAttention前向传播
Args:
Q, K, V: [batch, heads, seq_len, d_head]
causal: 是否使用因果mask
block_size: 分块大小
"""
batch_size, num_heads, seq_len, d_head = Q.shape
# 分块数量
num_blocks = (seq_len + block_size - 1) // block_size
# 输出缓冲区
O = torch.zeros_like(Q)
l = torch.zeros(batch_size, num_heads, seq_len, 1, device=Q.device)
m = torch.full(
(batch_size, num_heads, seq_len, 1),
float('-inf'),
device=Q.device
)
# 分块处理
for i in range(num_blocks):
# Q块
q_start = i * block_size
q_end = min((i + 1) * block_size, seq_len)
Q_block = Q[:, :, q_start:q_end, :]
# 初始化块级统计
O_block = torch.zeros_like(Q_block)
l_block = torch.zeros(
batch_size, num_heads, q_end - q_start, 1,
device=Q.device
)
m_block = torch.full(
(batch_size, num_heads, q_end - q_start, 1),
float('-inf'),
device=Q.device
)
for j in range(num_blocks):
# 因果mask检查
if causal and j > i:
continue
# K, V块
k_start = j * block_size
k_end = min((j + 1) * block_size, seq_len)
K_block = K[:, :, k_start:k_end, :]
V_block = V[:, :, k_start:k_end, :]
# 计算注意力分数
S_block = torch.matmul(Q_block, K_block.transpose(-2, -1))
S_block = S_block / (d_head ** 0.5)
# 因果mask
if causal:
block_mask = torch.triu(
torch.ones(
q_end - q_start, k_end - k_start,
device=Q.device
),
diagonal=k_start - q_start + 1
)
S_block = S_block.masked_fill(
block_mask.bool(),
float('-inf')
)
# 在线softmax更新
m_block_new = torch.maximum(
m_block,
S_block.max(dim=-1, keepdim=True)[0]
)
# 更新指数和
alpha = torch.exp(m_block - m_block_new)
beta = torch.exp(S_block - m_block_new)
l_block_new = alpha * l_block + beta.sum(dim=-1, keepdim=True)
# 更新输出
O_block = (
alpha * O_block +
torch.matmul(beta, V_block)
)
m_block = m_block_new
l_block = l_block_new
# 归一化
O_block = O_block / l_block
# 写回全局输出
O[:, :, q_start:q_end, :] = O_block
return O
# 性能对比
def compare_attention_methods():
"""对比不同注意力方法的性能"""
import time
batch_size = 2
num_heads = 12
seq_len = 4096
d_head = 64
Q = torch.randn(batch_size, num_heads, seq_len, d_head, device='cuda')
K = torch.randn(batch_size, num_heads, seq_len, d_head, device='cuda')
V = torch.randn(batch_size, num_heads, seq_len, d_head, device='cuda')
# 标准注意力
torch.cuda.synchronize()
start = time.time()
standard_out, _ = standard_attention(Q, K, V)
torch.cuda.synchronize()
standard_time = time.time() - start
# 滑动窗口注意力
torch.cuda.synchronize()
start = time.time()
window_out = sliding_window_attention(Q, K, V, window_size=512)
torch.cuda.synchronize()
window_time = time.time() - start
# FlashAttention
flash_attn = FlashAttention().cuda()
torch.cuda.synchronize()
start = time.time()
flash_out = flash_attn(Q, K, V)
torch.cuda.synchronize()
flash_time = time.time() - start
print(f"序列长度: {seq_len}")
print(f"标准注意力: {standard_time:.4f}s")
print(f"窗口注意力: {window_time:.4f}s ({standard_time/window_time:.2f}x)")
print(f"FlashAttention: {flash_time:.4f}s ({standard_time/flash_time:.2f}x)")
if __name__ == "__main__":
if torch.cuda.is_available():
compare_attention_methods()
18.2.4 实际长上下文模型
Claude 3 (Anthropic, 2024)
能力:
- 上下文窗口: 200K tokens
- 实际测试: 能准确检索200K文档中的信息
- "大海捞针"测试: 接近完美
技术:
- 位置编码改进
- 注意力优化
- KV Cache压缩
应用:
- 分析整本书籍
- 理解大型代码库
- 长时间对话记忆
Gemini 1.5 Pro (Google, 2024)
能力:
- 上下文窗口: 1M tokens (100万!)
- 可处理:
- 11小时音频
- 1小时视频
- 700K代码行
- 多本书籍
技术:
- MoE架构
- 多模态理解
- 高效注意力机制
突破:
- 首个百万token级模型
- 多模态长上下文统一
GPT-4 Turbo (OpenAI, 2023-2024)
能力:
- 上下文窗口: 128K tokens
- 相当于300页文本
应用:
- 长文档摘要
- 复杂代码审查
- 持续对话
价格:
- 输入: $0.01/1K tokens
- 输出: $0.03/1K tokens
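基于上面列出的价格,可以粗略估算一次用满128K上下文的调用成本(示意性计算,实际价格以官方为准):
def estimate_gpt4_turbo_cost(input_tokens: int = 128_000, output_tokens: int = 1_000):
    """按上文列出的GPT-4 Turbo定价粗略估算单次调用成本(美元)"""
    input_price = 0.01 / 1000    # $0.01 / 1K input tokens
    output_price = 0.03 / 1000   # $0.03 / 1K output tokens
    cost = input_tokens * input_price + output_tokens * output_price
    print(f"输入{input_tokens:,} tokens + 输出{output_tokens:,} tokens ≈ ${cost:.2f}")
    return cost

estimate_gpt4_turbo_cost()   # 约 $1.31:填满长上下文的单次调用并不便宜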
18.3 模型量化与小型化
模型量化是将模型部署到边缘设备的关键技术。
18.3.1 量化原理
量化基础:
import torch
import torch.nn as nn
import numpy as np
from typing import Tuple
def quantize_tensor(tensor: torch.Tensor, num_bits: int = 8) -> Tuple[torch.Tensor, float, float]:
"""
非对称(affine)量化
量化公式:
q = round(x / scale + zero_point)
x_reconstructed = (q - zero_point) * scale
Args:
tensor: 原始浮点张量
num_bits: 量化位数
Returns:
quantized: 量化后的整数张量
scale: 量化比例
zero_point: 零点
"""
# 计算范围
qmin = -(2 ** (num_bits - 1))
qmax = 2 ** (num_bits - 1) - 1
# 计算scale
min_val = tensor.min().item()
max_val = tensor.max().item()
scale = (max_val - min_val) / (qmax - qmin)
zero_point = qmin - min_val / scale
# 量化
quantized = torch.clamp(
torch.round(tensor / scale + zero_point),
qmin, qmax
).to(torch.int8)
return quantized, scale, zero_point
def dequantize_tensor(
quantized: torch.Tensor,
scale: float,
zero_point: float
) -> torch.Tensor:
"""反量化"""
return (quantized.float() - zero_point) * scale
# INT8量化示例
def int8_quantization_example():
"""INT8量化示例"""
# 原始权重
weight = torch.randn(512, 512) * 0.1
# 量化
weight_q, scale, zero_point = quantize_tensor(weight, num_bits=8)
# 反量化
weight_dq = dequantize_tensor(weight_q, scale, zero_point)
# 计算误差
error = (weight - weight_dq).abs().mean()
# 内存节省
original_size = weight.element_size() * weight.nelement() / 1024 # KB
quantized_size = weight_q.element_size() * weight_q.nelement() / 1024
print(f"原始权重: {original_size:.2f} KB (FP32)")
print(f"量化权重: {quantized_size:.2f} KB (INT8)")
print(f"压缩比: {original_size / quantized_size:.2f}x")
print(f"平均误差: {error:.6f}")
if __name__ == "__main__":
int8_quantization_example()
18.3.2 GPTQ量化
GPTQ (Accurate Post-Training Quantization for Generative Pre-trained Transformers) 是一种高精度的权重量化方法。
import torch
import torch.nn as nn
from typing import List, Tuple
import math
class GPTQ:
"""
GPTQ量化器
论文: GPTQ: Accurate Post-Training Quantization for Generative Pre-trained Transformers
核心思想:
1. 逐层量化,最小化重构误差
2. 使用Hessian信息指导量化
3. 贪心逐列优化
"""
def __init__(
self,
layer: nn.Linear,
bits: int = 4,
group_size: int = 128,
actorder: bool = True
):
self.layer = layer
self.bits = bits
self.group_size = group_size
self.actorder = actorder
self.dev = layer.weight.device
self.rows = layer.weight.shape[0]
self.columns = layer.weight.shape[1]
# Hessian矩阵(近似)
self.H = torch.zeros(
(self.columns, self.columns),
device=self.dev
)
self.nsamples = 0
def add_batch(self, inp: torch.Tensor):
"""
添加一批样本来估计Hessian矩阵
Args:
inp: [batch_size, seq_len, in_features]
"""
if len(inp.shape) == 3:
inp = inp.reshape(-1, inp.shape[-1])
tmp = inp.shape[0]
# 先缩放已有的H并更新样本数,避免首次调用时除以零
self.H *= self.nsamples / (self.nsamples + tmp)
self.nsamples += tmp
# 累加 H ≈ 2/n * X^T X
inp = math.sqrt(2 / self.nsamples) * inp.float()
self.H += inp.t().matmul(inp)
def quantize(self) -> Tuple[torch.Tensor, torch.Tensor]:
"""
执行GPTQ量化
Returns:
quantized_weight: 量化后的权重
scales: 量化scale
"""
W = self.layer.weight.data.clone()
# 确保Hessian已经计算
if self.nsamples == 0:
raise ValueError("请先调用add_batch添加样本")
# 计算量化参数
qmin = -(2 ** (self.bits - 1))
qmax = 2 ** (self.bits - 1) - 1
# 初始化
H = self.H
dead = torch.diag(H) == 0
H[dead, dead] = 1
W[:, dead] = 0
# Cholesky分解
try:
damp = 0.01 * torch.mean(torch.diag(H))
diag = torch.arange(self.columns, device=self.dev)
H[diag, diag] += damp
H = torch.linalg.cholesky(H)
H = torch.cholesky_inverse(H)
H = torch.linalg.cholesky(H, upper=True)
Hinv = H
except Exception:
print("Cholesky分解失败,使用对角近似")
Hinv = torch.diag(1.0 / torch.diag(H))
# 量化顺序
if self.actorder:
perm = torch.argsort(torch.diag(H), descending=True)
W = W[:, perm]
Hinv = Hinv[perm][:, perm]
# 分组量化
Q = torch.zeros_like(W)
scales = torch.zeros(
(self.rows, (self.columns + self.group_size - 1) // self.group_size),
device=self.dev
)
Err = torch.zeros_like(W)
for i1 in range(0, self.columns, self.group_size):
i2 = min(i1 + self.group_size, self.columns)
count = i2 - i1
W1 = W[:, i1:i2].clone()
Q1 = torch.zeros_like(W1)
Err1 = torch.zeros_like(W1)
H1 = Hinv[i1:i2, i1:i2]
for i in range(count):
w = W1[:, i]
d = H1[i, i]
# 计算scale
scale = w.abs().max() / qmax
scales[:, i1 // self.group_size] = scale
# 量化
q = torch.clamp(
torch.round(w / scale),
qmin, qmax
)
Q1[:, i] = q * scale
# 更新误差
err = (w - Q1[:, i]) / d
Err1[:, i] = err
# 传播误差到后续列
W1[:, i:] -= err.unsqueeze(1).matmul(H1[i, i:].unsqueeze(0))
Q[:, i1:i2] = Q1
Err[:, i1:i2] = Err1
# 恢复原始顺序
if self.actorder:
invperm = torch.argsort(perm)
Q = Q[:, invperm]
# 转换为整数
Q_int = torch.zeros_like(Q, dtype=torch.int8)
for i in range(scales.shape[1]):
i1 = i * self.group_size
i2 = min(i1 + self.group_size, self.columns)
scale = scales[:, i:i+1]
Q_int[:, i1:i2] = torch.clamp(
torch.round(Q[:, i1:i2] / scale),
qmin, qmax
).to(torch.int8)
return Q_int, scales
# 使用GPTQ量化模型
def quantize_model_with_gptq(
model: nn.Module,
calibration_data: torch.Tensor,
bits: int = 4
) -> nn.Module:
"""
使用GPTQ量化整个模型
Args:
model: 要量化的模型
calibration_data: 校准数据
bits: 量化位数
"""
# 收集所有Linear层
linear_layers = []
for name, module in model.named_modules():
if isinstance(module, nn.Linear):
linear_layers.append((name, module))
print(f"找到 {len(linear_layers)} 个线性层")
# 逐层量化
for name, layer in linear_layers:
print(f"量化层: {name}")
# 创建GPTQ量化器
gptq = GPTQ(layer, bits=bits)
# 收集激活值(这里简化,实际需要hook)
# 实际应用中需要在前向传播时收集
with torch.no_grad():
fake_input = torch.randn(
calibration_data.shape[0],
layer.in_features,
device=layer.weight.device
)
gptq.add_batch(fake_input)
# 量化
q_weight, scales = gptq.quantize()
# 替换权重(这里简化,实际需要自定义量化层)
print(f" 原始: {layer.weight.shape}, 量化: {q_weight.shape}")
return model
# 示例
def gptq_example():
"""GPTQ量化示例"""
# 创建简单模型
model = nn.Sequential(
nn.Linear(512, 1024),
nn.ReLU(),
nn.Linear(1024, 512)
)
# 校准数据
calib_data = torch.randn(100, 512)
# 量化
quantized_model = quantize_model_with_gptq(model, calib_data, bits=4)
print("\nGPTQ量化完成!")
if __name__ == "__main__":
gptq_example()
18.3.3 AWQ量化
AWQ (Activation-aware Weight Quantization) 通过保护重要权重来提高量化精度。
import torch
import torch.nn as nn
from typing import Dict, List, Tuple
class AWQ:
"""
AWQ量化器
论文: AWQ: Activation-aware Weight Quantization for LLM Compression and Acceleration
核心思想:
1. 通过激活值幅度识别重要的权重通道
2. 量化前按比例放大这些通道,等效降低其量化误差
3. 推理时对输入做相应的逆缩放,并使用分组scale
"""
def __init__(
self,
layer: nn.Linear,
bits: int = 4,
group_size: int = 128,
alpha: float = 0.5
):
self.layer = layer
self.bits = bits
self.group_size = group_size
self.alpha = alpha
self.dev = layer.weight.device
# 激活值统计
self.act_scales = None
self.nsamples = 0
def collect_activations(self, inp: torch.Tensor):
"""
收集激活值统计
Args:
inp: [batch_size, seq_len, in_features]
"""
if len(inp.shape) == 3:
inp = inp.reshape(-1, inp.shape[-1])
# 计算每个通道的激活幅度
scales = inp.abs().mean(dim=0)
if self.act_scales is None:
self.act_scales = scales
else:
# 指数移动平均
self.act_scales = 0.9 * self.act_scales + 0.1 * scales
self.nsamples += inp.shape[0]
def compute_scaling_factors(self) -> torch.Tensor:
"""
计算每个通道的缩放因子
Returns:
s: [in_features] 缩放因子
"""
if self.act_scales is None:
raise ValueError("请先收集激活值")
# 基于激活值重要性的缩放
s = self.act_scales.pow(self.alpha)
# 归一化(保持整体scale)
s = s / s.mean()
return s
def quantize(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""
执行AWQ量化
Returns:
quantized_weight: 量化后的权重
scales: 量化scale
scaling_factors: 通道缩放因子
"""
W = self.layer.weight.data.clone()
# 计算缩放因子
s = self.compute_scaling_factors()
# 应用缩放
W_scaled = W * s.view(1, -1)
# 分组量化
qmin = -(2 ** (self.bits - 1))
qmax = 2 ** (self.bits - 1) - 1
out_features, in_features = W_scaled.shape
num_groups = (in_features + self.group_size - 1) // self.group_size
Q = torch.zeros_like(W_scaled, dtype=torch.int8)
scales = torch.zeros(out_features, num_groups, device=self.dev)
for g in range(num_groups):
start = g * self.group_size
end = min(start + self.group_size, in_features)
W_group = W_scaled[:, start:end]
# 计算scale(逐通道)
scale = W_group.abs().max(dim=1)[0] / qmax
scales[:, g] = scale
# 量化
Q[:, start:end] = torch.clamp(
torch.round(W_group / scale.view(-1, 1)),
qmin, qmax
).to(torch.int8)
return Q, scales, s
class AWQLinear(nn.Module):
"""AWQ量化的线性层"""
def __init__(
self,
in_features: int,
out_features: int,
bits: int = 4,
group_size: int = 128
):
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.bits = bits
self.group_size = group_size
# 量化权重
self.register_buffer(
'qweight',
torch.zeros(
(out_features, in_features),
dtype=torch.int8
)
)
# Scales
num_groups = (in_features + group_size - 1) // group_size
self.register_buffer(
'scales',
torch.zeros((out_features, num_groups))
)
# 通道缩放因子
self.register_buffer(
'scaling_factors',
torch.ones(in_features)
)
# 偏置(本简化实现默认不使用,保留None以兼容forward中的判断)
self.register_buffer('bias', None)
def pack_weights(
self,
qweight: torch.Tensor,
scales: torch.Tensor,
scaling_factors: torch.Tensor
):
"""打包量化权重"""
self.qweight = qweight
self.scales = scales
self.scaling_factors = scaling_factors
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播
Args:
x: [batch_size, seq_len, in_features]
"""
# 应用输入缩放
x_scaled = x / self.scaling_factors.view(1, 1, -1)
# 反量化权重并计算
out_features, in_features = self.qweight.shape
num_groups = self.scales.shape[1]
output = torch.zeros(
x.shape[0], x.shape[1], out_features,
device=x.device, dtype=x.dtype
)
for g in range(num_groups):
start = g * self.group_size
end = min(start + self.group_size, in_features)
# 反量化
W_dequant = (
self.qweight[:, start:end].float() *
self.scales[:, g:g+1]
)
# 计算
output += torch.matmul(
x_scaled[:, :, start:end],
W_dequant.t()
)
if self.bias is not None:
output += self.bias
return output
# 使用示例
def awq_example():
"""AWQ量化示例"""
# 原始层
layer = nn.Linear(512, 1024).cuda()
# 创建AWQ量化器
awq = AWQ(layer, bits=4, group_size=128)
# 收集激活值
for _ in range(10):
fake_input = torch.randn(32, 128, 512).cuda()
awq.collect_activations(fake_input)
# 量化
qweight, scales, scaling_factors = awq.quantize()
# 创建量化层
q_layer = AWQLinear(512, 1024, bits=4, group_size=128).cuda()
q_layer.pack_weights(qweight, scales, scaling_factors)
# 测试
test_input = torch.randn(1, 10, 512).cuda()
with torch.no_grad():
original_output = layer(test_input)
quantized_output = q_layer(test_input)
error = (original_output - quantized_output).abs().mean()
print(f"量化误差: {error:.6f}")
# 内存节省
original_size = layer.weight.nelement() * layer.weight.element_size() / 1024 / 1024
quantized_size = (
qweight.nelement() * qweight.element_size() +
scales.nelement() * scales.element_size()
) / 1024 / 1024
print(f"原始大小: {original_size:.2f} MB")
print(f"量化大小: {quantized_size:.2f} MB")
print(f"压缩比: {original_size / quantized_size:.2f}x")
if __name__ == "__main__":
if torch.cuda.is_available():
awq_example()
18.3.4 知识蒸馏
知识蒸馏是模型小型化的重要技术。
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict
class DistillationLoss(nn.Module):
"""
知识蒸馏损失
结合硬标签和软标签
"""
def __init__(
self,
temperature: float = 3.0,
alpha: float = 0.5
):
"""
Args:
temperature: 软化温度
alpha: 软标签权重
"""
super().__init__()
self.temperature = temperature
self.alpha = alpha
def forward(
self,
student_logits: torch.Tensor,
teacher_logits: torch.Tensor,
labels: torch.Tensor
) -> torch.Tensor:
"""
计算蒸馏损失
Args:
student_logits: 学生模型输出 [batch, num_classes]
teacher_logits: 教师模型输出 [batch, num_classes]
labels: 真实标签 [batch]
"""
# 硬标签损失
hard_loss = F.cross_entropy(student_logits, labels)
# 软标签损失(KL散度)
soft_student = F.log_softmax(
student_logits / self.temperature,
dim=-1
)
soft_teacher = F.softmax(
teacher_logits / self.temperature,
dim=-1
)
soft_loss = F.kl_div(
soft_student,
soft_teacher,
reduction='batchmean'
) * (self.temperature ** 2)
# 组合损失
total_loss = (
self.alpha * soft_loss +
(1 - self.alpha) * hard_loss
)
return total_loss
class DistillationTrainer:
"""知识蒸馏训练器"""
def __init__(
self,
teacher_model: nn.Module,
student_model: nn.Module,
temperature: float = 3.0,
alpha: float = 0.5,
learning_rate: float = 1e-4
):
self.teacher = teacher_model
self.student = student_model
# 冻结教师模型
for param in self.teacher.parameters():
param.requires_grad = False
self.teacher.eval()
# 损失和优化器
self.criterion = DistillationLoss(temperature, alpha)
self.optimizer = torch.optim.AdamW(
self.student.parameters(),
lr=learning_rate
)
def train_step(
self,
inputs: torch.Tensor,
labels: torch.Tensor
) -> float:
"""单步训练"""
self.student.train()
# 教师模型推理(不计算梯度)
with torch.no_grad():
teacher_outputs = self.teacher(inputs)
# 学生模型推理
student_outputs = self.student(inputs)
# 兼容HuggingFace的ModelOutput:若输出带logits属性则取logits
teacher_logits = teacher_outputs.logits if hasattr(teacher_outputs, 'logits') else teacher_outputs
student_logits = student_outputs.logits if hasattr(student_outputs, 'logits') else student_outputs
# 计算损失
loss = self.criterion(student_logits, teacher_logits, labels)
# 反向传播
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
def evaluate(
self,
dataloader: torch.utils.data.DataLoader
) -> Dict[str, float]:
"""评估模型"""
self.student.eval()
total_loss = 0
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in dataloader:
# 推理(同样兼容ModelOutput)
teacher_outputs = self.teacher(inputs)
student_outputs = self.student(inputs)
teacher_logits = teacher_outputs.logits if hasattr(teacher_outputs, 'logits') else teacher_outputs
student_logits = student_outputs.logits if hasattr(student_outputs, 'logits') else student_outputs
# 损失
loss = self.criterion(
student_logits,
teacher_logits,
labels
)
total_loss += loss.item()
# 准确率
_, predicted = student_logits.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
return {
'loss': total_loss / len(dataloader),
'accuracy': 100. * correct / total
}
# 完整示例:蒸馏BERT到TinyBERT
def distill_bert_example():
"""BERT知识蒸馏示例"""
from transformers import (
BertForSequenceClassification,
BertConfig,
BertTokenizer
)
# 教师模型(BERT-base)
teacher_config = BertConfig(
hidden_size=768,
num_hidden_layers=12,
num_attention_heads=12,
intermediate_size=3072,
num_labels=2
)
teacher = BertForSequenceClassification(teacher_config)
# 学生模型(TinyBERT)
student_config = BertConfig(
hidden_size=312,
num_hidden_layers=4,
num_attention_heads=12,
intermediate_size=1200,
num_labels=2
)
student = BertForSequenceClassification(student_config)
# 计算参数量
teacher_params = sum(p.numel() for p in teacher.parameters())
student_params = sum(p.numel() for p in student.parameters())
print(f"教师模型参数: {teacher_params:,} ({teacher_params/1e6:.1f}M)")
print(f"学生模型参数: {student_params:,} ({student_params/1e6:.1f}M)")
print(f"压缩比: {teacher_params/student_params:.2f}x")
# 创建蒸馏训练器
trainer = DistillationTrainer(
teacher_model=teacher,
student_model=student,
temperature=3.0,
alpha=0.7
)
# 模拟训练数据
fake_inputs = torch.randint(0, 30000, (32, 128))
fake_labels = torch.randint(0, 2, (32,))
# 训练一步
loss = trainer.train_step(fake_inputs, fake_labels)
print(f"\n训练损失: {loss:.4f}")
if __name__ == "__main__":
distill_bert_example()
18.3.5 边缘部署:MLC-LLM
MLC-LLM (Machine Learning Compilation for LLM) 是用于边缘设备部署的框架。
# MLC-LLM配置示例
"""
MLC-LLM部署流程:
1. 模型量化
2. 编译优化
3. 移动端部署
支持平台:
- iOS (iPhone, iPad)
- Android
- WebGPU (浏览器)
- CUDA, Metal, Vulkan
"""
# 配置文件 mlc-config.json
mlc_config = {
"model_name": "Llama-2-7B-Chat",
"quantization": {
"mode": "q4f16_1", # 4bit权重,16bit激活
"group_size": 32,
"symmetric": True
},
"context_window_size": 4096,
"prefill_chunk_size": 2048,
"tensor_parallel_shards": 1,
"max_batch_size": 1
}
# Python部署代码
def deploy_with_mlc():
"""使用MLC-LLM部署模型"""
from mlc_chat import ChatModule
from mlc_chat.callback import StreamToStdout
# 加载模型
cm = ChatModule(
model="Llama-2-7b-chat-hf-q4f16_1",
device="cuda:0"
)
# 推理
output = cm.generate(
prompt="What is the meaning of life?",
progress_callback=StreamToStdout()
)
print(output)
# iOS部署示例 (Swift)
ios_code = """
import MLCChat
class ChatViewModel: ObservableObject {
private var chat: ChatModule?
@Published var messages: [String] = []
init() {
// 加载模型
chat = ChatModule(modelPath: "Llama-2-7b-chat-q4f16_1")
}
func sendMessage(_ text: String) {
chat?.generate(prompt: text) { response in
DispatchQueue.main.async {
self.messages.append(response)
}
}
}
}
"""
# Android部署示例 (Kotlin)
android_code = """
import ai.mlc.mlcchat.ChatModule
class ChatViewModel : ViewModel() {
private lateinit var chatModule: ChatModule
private val _messages = MutableLiveData<List<String>>()
val messages: LiveData<List<String>> = _messages
init {
// 加载模型
chatModule = ChatModule(
modelPath = "Llama-2-7b-chat-q4f16_1",
device = "opencl"
)
}
fun sendMessage(text: String) {
viewModelScope.launch {
val response = chatModule.generate(text)
_messages.value = _messages.value?.plus(response)
}
}
}
"""
print("MLC-LLM部署配置已生成")
print("\n支持的量化模式:")
print("- q3f16_1: 3bit权重")
print("- q4f16_1: 4bit权重(推荐)")
print("- q4f16_0: 4bit权重(快速)")
print("- q0f32: 无量化(精度最高)")
实际应用数据:
- INT8量化:2x内存压缩,1.5-2x推理加速,精度下降<1%
- INT4量化(GPTQ/AWQ):4x压缩,3x加速,精度下降1-3%
- 知识蒸馏:10-20x压缩(BERT→TinyBERT),精度下降2-5%
- MLC-LLM:在iPhone 14 Pro上运行Llama-2-7B,约10 tokens/s
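结合这些数据,可以用几行代码估算一个7B模型在不同精度下仅权重部分的内存占用(粗略估算,忽略scale/zero_point等少量开销):
def weight_memory_gb(num_params: float, bits: int) -> float:
    """估算仅权重部分的内存占用(GB)"""
    return num_params * bits / 8 / (1024 ** 3)

for name, bits in [("FP16", 16), ("INT8", 8), ("INT4", 4)]:
    print(f"7B模型 {name}: {weight_memory_gb(7e9, bits):.1f} GB")
# 大致输出: FP16约13.0 GB、INT8约6.5 GB、INT4约3.3 GB
# 这解释了为什么INT4量化后的7B模型可以装进手机内存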
18.4 Diffusion模型
扩散模型彻底改变了图像生成领域。
18.4.1 扩散模型原理
核心数学:
import torch
import torch.nn as nn
import math
from typing import Tuple
class DDPMScheduler:
"""
DDPM (Denoising Diffusion Probabilistic Models) 调度器
前向过程(加噪):
q(x_t | x_0) = N(x_t; √(ᾱ_t) x_0, (1 - ᾱ_t)I)
反向过程(去噪):
p_θ(x_{t-1} | x_t) = N(x_{t-1}; μ_θ(x_t, t), Σ_θ(x_t, t))
"""
def __init__(
self,
num_train_timesteps: int = 1000,
beta_start: float = 0.0001,
beta_end: float = 0.02,
beta_schedule: str = "linear"
):
self.num_train_timesteps = num_train_timesteps
# 生成beta schedule
if beta_schedule == "linear":
self.betas = torch.linspace(
beta_start, beta_end, num_train_timesteps
)
elif beta_schedule == "cosine":
self.betas = self._cosine_beta_schedule(
num_train_timesteps
)
else:
raise ValueError(f"Unknown schedule: {beta_schedule}")
# 预计算常用项
self.alphas = 1.0 - self.betas
self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
self.alphas_cumprod_prev = torch.cat([
torch.tensor([1.0]),
self.alphas_cumprod[:-1]
])
# 用于采样的常数
self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
self.sqrt_one_minus_alphas_cumprod = torch.sqrt(
1.0 - self.alphas_cumprod
)
# 后验方差
self.posterior_variance = (
self.betas * (1.0 - self.alphas_cumprod_prev) /
(1.0 - self.alphas_cumprod)
)
def _cosine_beta_schedule(self, timesteps: int, s: float = 0.008):
"""Cosine schedule (DDPM改进版本)"""
steps = timesteps + 1
x = torch.linspace(0, timesteps, steps)
alphas_cumprod = torch.cos(
((x / timesteps) + s) / (1 + s) * math.pi * 0.5
) ** 2
alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
return torch.clip(betas, 0.0001, 0.9999)
def add_noise(
self,
original_samples: torch.Tensor,
noise: torch.Tensor,
timesteps: torch.Tensor
) -> torch.Tensor:
"""
前向过程:添加噪声
x_t = √(ᾱ_t) * x_0 + √(1 - ᾱ_t) * ε
"""
# 将调度器常量移到与样本相同的设备,避免GPU索引CPU张量时报错
sqrt_alpha_prod = self.sqrt_alphas_cumprod.to(original_samples.device)[timesteps]
sqrt_one_minus_alpha_prod = self.sqrt_one_minus_alphas_cumprod.to(
original_samples.device
)[timesteps]
# Reshape以便广播
while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)
noisy_samples = (
sqrt_alpha_prod * original_samples +
sqrt_one_minus_alpha_prod * noise
)
return noisy_samples
def step(
self,
model_output: torch.Tensor,
timestep: int,
sample: torch.Tensor
) -> torch.Tensor:
"""
反向过程:去噪一步
预测 x_{t-1} 给定 x_t 和 ε_θ(x_t, t)
"""
t = timestep
# 1. 从模型输出预测原始样本
alpha_prod_t = self.alphas_cumprod[t]
beta_prod_t = 1 - alpha_prod_t
# 预测 x_0
pred_original_sample = (
sample - torch.sqrt(beta_prod_t) * model_output
) / torch.sqrt(alpha_prod_t)
# 2. 计算"方向指向x_t"的项
alpha_prod_t_prev = (
self.alphas_cumprod_prev[t] if t > 0
else torch.tensor(1.0)
)
pred_sample_direction = torch.sqrt(
1 - alpha_prod_t_prev
) * model_output
# 3. 计算 x_{t-1}
pred_prev_sample = (
torch.sqrt(alpha_prod_t_prev) * pred_original_sample +
pred_sample_direction
)
# 4. 添加噪声(除了最后一步)
if t > 0:
noise = torch.randn_like(sample)
variance = torch.sqrt(self.posterior_variance[t]) * noise
pred_prev_sample = pred_prev_sample + variance
return pred_prev_sample
class SimpleUNet(nn.Module):
"""简化的UNet去噪网络"""
def __init__(
self,
in_channels: int = 3,
out_channels: int = 3,
channels: int = 128,
time_emb_dim: int = 256
):
super().__init__()
# 时间嵌入
self.time_mlp = nn.Sequential(
nn.Linear(channels, time_emb_dim),
nn.SiLU(),
nn.Linear(time_emb_dim, time_emb_dim)
)
self.channels = channels
# 简化的UNet结构(跳跃连接处的通道数保持一致)
self.down1 = nn.Conv2d(in_channels, channels, 3, padding=1)
self.down2 = nn.Conv2d(channels, channels * 2, 3, padding=1)
self.down3 = nn.Conv2d(channels * 2, channels * 4, 3, padding=1)
self.mid = nn.Conv2d(channels * 4, channels * 4, 3, padding=1)
# 将时间嵌入投影后注入中间层特征
self.time_proj = nn.Linear(time_emb_dim, channels * 4)
self.up1 = nn.ConvTranspose2d(channels * 4, channels * 4, 2, 2)
self.up2 = nn.ConvTranspose2d(channels * 4, channels * 2, 2, 2)
self.up3 = nn.ConvTranspose2d(channels * 2, channels, 2, 2)
self.out = nn.Conv2d(channels, out_channels, 1)
def sinusoidal_embedding(self, timesteps, dim):
"""正弦位置编码"""
half_dim = dim // 2
emb = math.log(10000) / (half_dim - 1)
emb = torch.exp(
torch.arange(half_dim, device=timesteps.device) * -emb
)
emb = timesteps[:, None] * emb[None, :]
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=-1)
return emb
def forward(self, x, timesteps):
# 时间嵌入
t_emb = self.sinusoidal_embedding(timesteps, self.channels)
t_emb = self.time_mlp(t_emb)
# 下采样路径
d1 = self.down1(x)  # [B, C, H, W]
d2 = self.down2(nn.functional.avg_pool2d(d1, 2))  # [B, 2C, H/2, W/2]
d3 = self.down3(nn.functional.avg_pool2d(d2, 2))  # [B, 4C, H/4, W/4]
# 中间层 + 时间条件
m = self.mid(nn.functional.avg_pool2d(d3, 2))  # [B, 4C, H/8, W/8]
m = m + self.time_proj(t_emb)[:, :, None, None]
# 上采样路径(通道数匹配的跳跃连接)
u1 = self.up1(m) + d3  # [B, 4C, H/4, W/4]
u2 = self.up2(u1) + d2  # [B, 2C, H/2, W/2]
u3 = self.up3(u2) + d1  # [B, C, H, W]
return self.out(u3)
# 训练循环
def train_ddpm():
"""DDPM训练示例"""
device = "cuda" if torch.cuda.is_available() else "cpu"
# 创建模型和调度器
model = SimpleUNet(in_channels=3, out_channels=3).to(device)
scheduler = DDPMScheduler(num_train_timesteps=1000)
# 优化器
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
# 训练步骤
def train_step(images):
optimizer.zero_grad()
batch_size = images.shape[0]
# 随机选择时间步
timesteps = torch.randint(
0, scheduler.num_train_timesteps, (batch_size,),
device=device
)
# 生成噪声
noise = torch.randn_like(images)
# 添加噪声
noisy_images = scheduler.add_noise(images, noise, timesteps)
# 预测噪声
noise_pred = model(noisy_images, timesteps)
# 计算损失
loss = torch.nn.functional.mse_loss(noise_pred, noise)
# 反向传播
loss.backward()
optimizer.step()
return loss.item()
# 模拟训练
for step in range(10):
fake_images = torch.randn(4, 3, 64, 64, device=device)
loss = train_step(fake_images)
print(f"Step {step}: Loss = {loss:.4f}")
# 采样
@torch.no_grad()
def sample(num_samples=1):
model.eval()
# 从纯噪声开始
image = torch.randn(num_samples, 3, 64, 64, device=device)
# 逐步去噪
for t in reversed(range(scheduler.num_train_timesteps)):
timestep = torch.tensor([t], device=device).repeat(num_samples)
# 预测噪声
noise_pred = model(image, timestep)
# 去噪一步
image = scheduler.step(noise_pred, t, image)
if t % 100 == 0:
print(f"Sampling step {t}")
return image
# 生成图像
print("\n生成图像...")
generated = sample(num_samples=4)
print(f"生成完成!形状: {generated.shape}")
if __name__ == "__main__":
train_ddpm()
18.4.2 Stable Diffusion详解
Stable Diffusion架构:
文本 → CLIP Text Encoder → 文本嵌入
↓
潜在扩散(Latent Diffusion):
图像 → VAE Encoder → 潜在空间 (压缩8倍)
潜在空间 + 文本嵌入 → U-Net去噪 → 去噪潜在变量
去噪潜在变量 → VAE Decoder → 生成图像
核心组件:
- CLIP Text Encoder: 理解文本提示
- VAE Encoder/Decoder: 压缩/解压图像
- U-Net: 在潜在空间去噪
- 调度器: 控制去噪过程
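下面的示例演示如何单独访问这些组件,并验证VAE把图像压缩到1/8空间分辨率(假设环境中已安装diffusers且可下载runwayml/stable-diffusion-v1-5权重):
from diffusers import StableDiffusionPipeline
import torch

def inspect_sd_components():
    """查看Stable Diffusion各组件,并验证潜在空间的8倍压缩"""
    pipe = StableDiffusionPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5",
        torch_dtype=torch.float16
    ).to("cuda")

    print(type(pipe.text_encoder).__name__)  # CLIPTextModel:文本编码器
    print(type(pipe.vae).__name__)           # AutoencoderKL:VAE编解码器
    print(type(pipe.unet).__name__)          # UNet2DConditionModel:去噪U-Net

    # 512x512的图像经VAE编码后是 4 x 64 x 64 的潜在变量(空间尺寸缩小8倍)
    fake_image = torch.randn(1, 3, 512, 512, dtype=torch.float16, device="cuda")
    with torch.no_grad():
        latents = pipe.vae.encode(fake_image).latent_dist.sample()
    print(latents.shape)  # torch.Size([1, 4, 64, 64])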
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler
import torch
# 使用Stable Diffusion生成图像
def generate_with_sd():
"""使用Stable Diffusion生成图像"""
# 加载模型
model_id = "stabilityai/stable-diffusion-2-1"
pipe = StableDiffusionPipeline.from_pretrained(
model_id,
torch_dtype=torch.float16
)
# 使用更快的调度器
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
# 移到GPU
pipe = pipe.to("cuda")
# 文本提示
prompt = """
A serene landscape at sunset, with mountains in the background,
a calm lake reflecting the sky, and cherry blossoms in the foreground.
Ultra detailed, 8k, cinematic lighting, highly detailed.
"""
negative_prompt = """
blurry, low quality, distorted, ugly, bad anatomy
"""
# 生成
with torch.autocast("cuda"):
images = pipe(
prompt=prompt,
negative_prompt=negative_prompt,
num_inference_steps=25, # 推理步数
guidance_scale=7.5, # CFG scale
width=768,
height=768,
num_images_per_prompt=4 # 生成4张
).images
# 保存
for i, image in enumerate(images):
image.save(f"output_{i}.png")
return images
18.4.3 ControlNet与LoRA
ControlNet - 可控生成:
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import torch
import cv2
import numpy as np
from PIL import Image
def use_controlnet():
"""使用ControlNet进行可控生成"""
# 加载ControlNet模型(Canny边缘检测)
controlnet = ControlNetModel.from_pretrained(
"lllyasviel/control_v11p_sd15_canny",
torch_dtype=torch.float16
)
# 创建pipeline
pipe = StableDiffusionControlNetPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5",
controlnet=controlnet,
torch_dtype=torch.float16
)
pipe.to("cuda")
# 准备控制图像
image = load_image("input.jpg")
image = np.array(image)
# Canny边缘检测
low_threshold = 100
high_threshold = 200
image = cv2.Canny(image, low_threshold, high_threshold)
image = image[:, :, None]
image = np.concatenate([image, image, image], axis=2)
control_image = Image.fromarray(image)
# 生成
prompt = "a professional photograph of a beautiful landscape"
output = pipe(
prompt=prompt,
image=control_image,
num_inference_steps=20,
controlnet_conditioning_scale=1.0
).images[0]
output.save("controlnet_output.png")
return output
LoRA - 高效微调:
from diffusers import DiffusionPipeline, StableDiffusionPipeline
import torch
def use_lora():
"""使用LoRA风格化生成"""
# 加载基础模型
pipe = DiffusionPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5",
torch_dtype=torch.float16
)
pipe.to("cuda")
# 加载LoRA权重
pipe.load_lora_weights("style_lora_weights.safetensors")
# 生成
prompt = "a portrait of a woman in anime style"
images = pipe(
prompt=prompt,
num_inference_steps=30,
guidance_scale=7.5
).images[0]
images.save("lora_output.png")
return images
# 训练自定义LoRA
def train_custom_lora():
"""训练自定义LoRA"""
from peft import LoraConfig, get_peft_model
# LoRA配置
lora_config = LoraConfig(
r=8, # LoRA rank
lora_alpha=32,
target_modules=["to_q", "to_k", "to_v", "to_out.0"],
lora_dropout=0.05,
bias="none"
)
# 应用LoRA到模型
model = StableDiffusionPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5"
).unet
model = get_peft_model(model, lora_config)
# 训练参数
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
all_params = sum(p.numel() for p in model.parameters())
print(f"可训练参数: {trainable_params:,} ({trainable_params/all_params*100:.2f}%)")
print(f"总参数: {all_params:,}")
return model
18.4.4 SDXL与Stable Diffusion 3
SDXL架构特点:
from diffusers import StableDiffusionXLPipeline
import torch
def use_sdxl():
"""使用SDXL生成高分辨率图像"""
# 加载SDXL
pipe = StableDiffusionXLPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0",
torch_dtype=torch.float16,
variant="fp16",
use_safetensors=True
)
pipe.to("cuda")
# 生成1024x1024图像
prompt = """
masterpiece, best quality, ultra-detailed, 8k wallpaper,
a futuristic city at night with neon lights,
cyberpunk style, highly detailed architecture
"""
negative_prompt = "blurry, low quality, distorted"
images = pipe(
prompt=prompt,
negative_prompt=negative_prompt,
num_inference_steps=40,
guidance_scale=7.5,
width=1024,
height=1024
).images[0]
images.save("sdxl_output.png")
return images
Stable Diffusion 3 特点:
- 使用Diffusion Transformer (DiT)架构
- 多模态理解能力更强
- 文本渲染改进
- 更好的提示词理解
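下面是一个通过diffusers调用SD3的最小示例(假设所用diffusers版本已提供StableDiffusion3Pipeline,且已获得stabilityai/stable-diffusion-3-medium-diffusers的访问授权):
from diffusers import StableDiffusion3Pipeline
import torch

def use_sd3():
    """使用Stable Diffusion 3(DiT架构)生成图像的最小示例"""
    pipe = StableDiffusion3Pipeline.from_pretrained(
        "stabilityai/stable-diffusion-3-medium-diffusers",
        torch_dtype=torch.float16
    )
    pipe.to("cuda")

    image = pipe(
        prompt="a sign that says 'Hello SD3', photorealistic",  # 顺便测试文本渲染能力
        negative_prompt="blurry, low quality",
        num_inference_steps=28,
        guidance_scale=7.0
    ).images[0]
    image.save("sd3_output.png")
    return image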
18.5 AI视频生成
AI视频生成是2024年最激动人心的突破之一。
18.5.1 Sora技术解析
Sora (OpenAI, 2024)
能力:
- 最长60秒视频
- 分辨率: 1920×1080
- 多镜头切换
- 复杂场景理解
技术架构:
- Diffusion Transformer (DiT)
- Spacetime Patches (时空补丁)
- 可变分辨率/时长训练
- 视频压缩网络
创新点:
1. 统一的视觉数据表示
2. 端到端学习时空一致性
3. 文本→视频的强大理解
4. 物理世界模拟能力
示例提示词:
"A stylish woman walks down a Tokyo street filled with warm glowing neon
and animated city signage. She wears a black leather jacket, a long red
dress, and black boots, and carries a black purse."
18.5.2 视频生成模型对比
1. Runway Gen-2 & Gen-3
Gen-2 (2023):
- 4秒视频
- 文本/图像→视频
- 商用产品
Gen-3 (2024):
- 10秒视频
- 更高保真度
- 更好的运动控制
- API可用
2. Pika Labs
特点:
- 3秒视频
- 易用性强
- 网页界面
- 图像动画化
Pika 1.0:
- 更长视频
- 编辑功能
- 风格控制
3. Stable Video Diffusion
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
import torch
def generate_video_svd():
"""使用SVD生成视频"""
pipe = StableVideoDiffusionPipeline.from_pretrained(
"stabilityai/stable-video-diffusion-img2vid-xt",
torch_dtype=torch.float16,
variant="fp16"
)
pipe.to("cuda")
# 输入图像
image = load_image("input.jpg")
image = image.resize((1024, 576))
# 生成视频帧
frames = pipe(
image,
decode_chunk_size=8,
num_frames=25, # 生成25帧
motion_bucket_id=127, # 运动强度(0-255)
noise_aug_strength=0.02
).frames[0]
# 导出视频
export_to_video(frames, "output.mp4", fps=7)
print(f"生成 {len(frames)} 帧视频")
return frames
18.5.3 视频生成的挑战
技术挑战:
- 时间一致性: 前后帧保持连贯,避免闪烁
- 物理真实性: 运动符合物理规律
- 高分辨率: 视频需要更高质量
- 计算成本: 视频≈图像×帧数,成本随帧数成倍增长、随分辨率平方增长(见下文的估算示例)
- 可控性: 精确控制内容、镜头、运动
未来方向:
- 更长时长(数分钟)
- 实时生成
- 交互式编辑
- 多模态控制(文本+草图+音频)
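下面用一个简单的估算说明"视频≈图像×帧数"带来的计算量放大(仅比较需要去噪的潜在变量规模,假设VAE下采样8倍、4个潜在通道,为示意性估算):
def latent_elements(h, w, frames=1, latent_channels=4, downscale=8):
    """估算潜在空间中需要去噪的元素数量(假设VAE下采样8倍)"""
    return frames * latent_channels * (h // downscale) * (w // downscale)

image = latent_elements(1024, 1024)                  # 单张1024x1024图像
video = latent_elements(1024, 576, frames=5 * 24)    # 5秒、24fps的视频
print(f"单张图像潜在变量元素数: {image:,}")
print(f"5秒视频潜在变量元素数: {video:,} (约为单图的 {video / image:.0f} 倍)")
# 若注意力同时作用于时空维度,代价还会随序列长度进一步以平方级增长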
18.6 开源模型生态
18.6.1 LLaMA系列演进
Meta LLaMA:
LLaMA 3.1 (2024.07):
规模: 8B, 70B, 405B
数据: 15T+ tokens
特点:
- 开源最强模型(405B)
- 多语言(8种)
- 128K上下文窗口
- 数学推理增强
性能: 405B接近GPT-4
许可: Llama 3.1 (可商用)
对比:
LLaMA 3.1 405B vs GPT-4:
- MMLU: 88.6% vs 86.4%
- GSM8K: 96.8% vs 92.0%
- HumanEval: 89.0% vs 67.0%
- 成本: 开源免费 vs 闭源付费
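以LLaMA 3.1为例,通过transformers加载开源权重并推理的最小示例如下(假设已在Hugging Face上获得该模型的访问授权,仓库名以实际页面为准):
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

def use_llama31():
    """加载并调用LLaMA 3.1 8B Instruct的最小示例"""
    model_id = "meta-llama/Llama-3.1-8B-Instruct"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        torch_dtype=torch.bfloat16,
        device_map="auto"
    )

    # 使用chat模板构造对话输入
    messages = [{"role": "user", "content": "用一句话解释什么是MoE架构。"}]
    inputs = tokenizer.apply_chat_template(
        messages, add_generation_prompt=True, return_tensors="pt"
    ).to(model.device)

    outputs = model.generate(inputs, max_new_tokens=128, temperature=0.7, do_sample=True)
    # 只解码新生成的部分
    print(tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True))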
18.6.2 中国开源模型
Qwen (通义千问)
Qwen2 (2024.06):
规模: 0.5B - 72B
创新:
- 支持29种语言
- 128K上下文
- 代码和数学大幅提升
- 极强的中文能力
性能:
Qwen2-72B:
- MMLU: 84.2%
- C-Eval (中文): 91.6%
- HumanEval: 64.6%
DeepSeek
DeepSeek-V2 (2024):
架构: MoE (236B总参数, 21B激活)
创新:
- 多头潜在注意力(MLA)
- 极致成本效率
- 128K上下文
性能:
- 接近GPT-4水平
- 训练成本仅GPT-4的1/10
- 开源可商用
18.6.3 开源模型排行榜 (2024)
| 模型 | 参数 | MMLU | AlpacaEval | 许可 | 发布时间 |
|---|---|---|---|---|---|
| LLaMA 3.1 405B | 405B | 88.6% | - | Llama 3.1 | 2024.07 |
| LLaMA 3.1 70B | 70B | 86.0% | 51% | Llama 3.1 | 2024.07 |
| Qwen2 72B | 72B | 84.2% | 42% | Apache 2.0 | 2024.06 |
| Mixtral 8x22B | 141B | 77.8% | 38% | Apache 2.0 | 2024.04 |
| DeepSeek-V2 | 236B | 81.7% | - | MIT | 2024.05 |
| Mistral 7B | 7B | 62.5% | 23% | Apache 2.0 | 2023.09 |
18.7 本章总结
本章深入探讨了AI领域的前沿技术趋势:
核心要点:
- MoE架构: 大模型新范式,平衡规模与效率(Mixtral 8x7B, DeepSeek-V2)
- 长上下文: 从4K到百万token的突破(Gemini 1.5 Pro 1M tokens)
- 模型量化: GPTQ、AWQ等技术实现4x-8x压缩,精度损失<3%
- Diffusion模型: 从DDPM到Stable Diffusion,革命性图像生成
- AI视频生成: Sora引领,60秒高质量视频生成
- 开源生态: LLaMA 3.1 405B等推动AI民主化
实践建议:
- 关注MoE架构在大规模模型中的应用
- 掌握量化技术(GPTQ、AWQ、bitsandbytes),优化部署
- 学习Diffusion原理,应用于创意生成
- 跟踪开源模型进展,选择合适工具
- 实验长上下文技术,拓展应用场景
未来展望:
- MoE将成为主流架构
- 百万级上下文将普及
- 极致量化(INT2/INT1)
- 多模态统一模型
- 实时视频生成
参考资源
论文:
- Mixtral of Experts (Mistral AI, 2023)
- FlashAttention-2 (Dao et al., 2023)
- GPTQ: Accurate Post-Training Quantization (Frantar et al., 2023)
- AWQ: Activation-aware Weight Quantization (Lin et al., 2023)
- Denoising Diffusion Probabilistic Models (Ho et al., 2020)
- High-Resolution Image Synthesis with Latent Diffusion Models (Rombach et al., 2022)
代码库:
- https://github.com/mistralai/mistral-src
- https://github.com/Dao-AILab/flash-attention
- https://github.com/huggingface/diffusers
- https://github.com/vllm-project/vllm
- https://github.com/AutoGPTQ/AutoGPTQ
- https://github.com/mit-han-lab/llm-awq
工具:
- HuggingFace Transformers
- bitsandbytes
- llama.cpp
- vLLM
- AUTOMATIC1111/stable-diffusion-webui