Chapter 18: Frontier AI Technology Trends

This chapter takes a deep look at the most important frontier AI developments of 2024, including the Mixture of Experts architecture, long-context processing, model quantization, diffusion models, and AI video generation, with complete code implementations and hands-on guidance.

18.1 The MoE Architecture: Mixture of Experts

Mixture of Experts (MoE) is a breakthrough model architecture that uses conditional computation to scale models more efficiently.

18.1.1 Core Principles of MoE

The core idea of MoE is to decompose one large model into multiple "expert" networks and activate only a subset of them on each inference step, retaining model capacity while reducing compute cost.

Key concepts:

  1. Experts: multiple parallel feed-forward networks
  2. Gating network: decides which experts to activate
  3. Sparse activation: each token uses only the top-K experts
  4. Load balancing: keeps expert utilization even

Mathematical formulation:

Given an input x, the output of an MoE layer is:
y = Σ(i=1 to N) G(x)_i * E_i(x)

where:
- G(x) is the gating function, which outputs weights for the N experts
- E_i(x) is the output of the i-th expert
- typically only the top-K experts are used (K << N)

Gating function:
G(x) = Softmax(TopK(x · W_g))

Load-balancing loss:
L_balance = α * Σ(i=1 to N) f_i * P_i

where:
- f_i is the fraction of tokens routed to expert i
- P_i is the mean gate probability of expert i
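
To make the balance-loss formula concrete, here is a minimal numeric sketch (the expert count, routing statistics, and α value below are illustrative, not taken from any particular model):

import torch

# Hypothetical routing statistics for N = 4 experts over one batch
f = torch.tensor([0.40, 0.30, 0.20, 0.10])  # fraction of tokens per expert
P = torch.tensor([0.45, 0.25, 0.20, 0.10])  # mean gate probability per expert
alpha = 0.01                                # illustrative loss weight

# L_balance = alpha * sum_i f_i * P_i, exactly as defined above
# (many implementations, e.g. Switch Transformer, also multiply by N)
loss = alpha * torch.sum(f * P)
print(f"{loss.item():.6f}")  # 0.003050; uniform routing (f = P = 1/4) gives 0.002500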

18.1.2 A Complete MoE Implementation

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple, Optional
import numpy as np

class Expert(nn.Module):
    """A single expert network - a standard FFN"""
    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        self.w1 = nn.Linear(d_model, d_ff)
        self.w2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: [batch_size, seq_len, d_model]
        return self.w2(self.dropout(F.gelu(self.w1(x))))

class TopKGate(nn.Module):
    """Top-K gating network"""
    def __init__(
        self,
        d_model: int,
        num_experts: int,
        top_k: int = 2,
        capacity_factor: float = 1.25,
        noisy_gate: bool = True
    ):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k
        self.capacity_factor = capacity_factor
        self.noisy_gate = noisy_gate

        # Gating weights
        self.w_gate = nn.Linear(d_model, num_experts, bias=False)

        # Noise parameters (for exploration during training)
        if noisy_gate:
            self.w_noise = nn.Linear(d_model, num_experts, bias=False)

    def forward(
        self,
        x: torch.Tensor,
        train: bool = True
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Args:
            x: [batch_size, seq_len, d_model]

        Returns:
            gates: [batch_size, seq_len, top_k] - gate weights
            indices: [batch_size, seq_len, top_k] - selected expert indices
            load: [num_experts] - per-expert load
        """
        # Compute the gating logits
        logits = self.w_gate(x)  # [batch_size, seq_len, num_experts]

        # Add noise (training only)
        if self.noisy_gate and train:
            noise_logits = self.w_noise(x)
            noise = torch.randn_like(logits) * F.softplus(noise_logits)
            logits = logits + noise

        # Top-K selection
        gates = F.softmax(logits, dim=-1)
        top_k_gates, top_k_indices = torch.topk(gates, self.top_k, dim=-1)

        # Renormalize
        top_k_gates = top_k_gates / (top_k_gates.sum(dim=-1, keepdim=True) + 1e-8)

        # Compute the load (used for load balancing)
        load = self._calculate_load(top_k_indices)

        return top_k_gates, top_k_indices, load

    def _calculate_load(self, indices: torch.Tensor) -> torch.Tensor:
        """Compute the load on each expert"""
        batch_size, seq_len, top_k = indices.shape

        # Count how often each expert is selected
        load = torch.zeros(
            self.num_experts,
            device=indices.device,
            dtype=torch.float32
        )

        indices_flat = indices.view(-1)
        load.scatter_add_(
            0,
            indices_flat,
            torch.ones_like(indices_flat, dtype=torch.float32)
        )

        # Normalize
        load = load / (batch_size * seq_len * top_k)
        return load

class MoELayer(nn.Module):
    """A complete MoE layer"""
    def __init__(
        self,
        d_model: int,
        d_ff: int,
        num_experts: int = 8,
        top_k: int = 2,
        dropout: float = 0.1,
        capacity_factor: float = 1.25,
        balance_loss_weight: float = 0.01
    ):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k
        self.balance_loss_weight = balance_loss_weight

        # Create the expert networks
        self.experts = nn.ModuleList([
            Expert(d_model, d_ff, dropout)
            for _ in range(num_experts)
        ])

        # Gating network
        self.gate = TopKGate(d_model, num_experts, top_k, capacity_factor)

    def forward(
        self,
        x: torch.Tensor,
        train: bool = True
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Args:
            x: [batch_size, seq_len, d_model]

        Returns:
            output: [batch_size, seq_len, d_model]
            aux_loss: load-balancing loss
        """
        batch_size, seq_len, d_model = x.shape

        # Gating
        gates, indices, load = self.gate(x, train)
        # gates: [batch_size, seq_len, top_k]
        # indices: [batch_size, seq_len, top_k]

        # Initialize the output
        output = torch.zeros_like(x)

        # Let each expert process the tokens assigned to it
        for i in range(self.num_experts):
            # Find every position routed to this expert
            expert_mask = (indices == i).any(dim=-1)  # [batch_size, seq_len]

            if expert_mask.any():
                # Gather the tokens assigned to this expert
                expert_input = x[expert_mask]  # [num_tokens, d_model]

                # Run the expert
                expert_output = self.experts[i](expert_input)

                # Fetch the corresponding gate weights
                expert_gates = gates[expert_mask]  # [num_tokens, top_k]
                expert_indices = indices[expert_mask]  # [num_tokens, top_k]

                # Locate this expert's slot within the top_k selections
                expert_positions = (expert_indices == i).nonzero(as_tuple=True)[1]
                expert_weights = expert_gates[
                    torch.arange(expert_gates.size(0)),
                    expert_positions
                ].unsqueeze(-1)

                # Weighted accumulation into the output
                output[expert_mask] += expert_weights * expert_output

        # Compute the load-balancing loss
        aux_loss = self._calculate_balance_loss(load)

        return output, aux_loss

    def _calculate_balance_loss(self, load: torch.Tensor) -> torch.Tensor:
        """
        Compute the load-balancing loss.
        Goal: push each expert's load toward 1/num_experts.
        This implementation uses the KL divergence between the observed
        load distribution and the uniform distribution (it is zero when
        the load is perfectly balanced).
        """
        balance_loss = torch.sum(load * torch.log(load * self.num_experts + 1e-8))

        return self.balance_loss_weight * balance_loss

class MoETransformerBlock(nn.Module):
    """A Transformer block with an MoE feed-forward layer"""
    def __init__(
        self,
        d_model: int = 768,
        num_heads: int = 12,
        d_ff: int = 3072,
        num_experts: int = 8,
        top_k: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()

        # Self-attention
        self.attention = nn.MultiheadAttention(
            d_model,
            num_heads,
            dropout=dropout,
            batch_first=True
        )
        self.norm1 = nn.LayerNorm(d_model)

        # MoE layer
        self.moe = MoELayer(
            d_model=d_model,
            d_ff=d_ff,
            num_experts=num_experts,
            top_k=top_k,
            dropout=dropout
        )
        self.norm2 = nn.LayerNorm(d_model)

    def forward(
        self,
        x: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        # Self-attention + residual
        attn_out, _ = self.attention(x, x, x, attn_mask=mask)
        x = self.norm1(x + attn_out)

        # MoE + residual
        moe_out, aux_loss = self.moe(x)
        x = self.norm2(x + moe_out)

        return x, aux_loss

# Usage example
def example_moe_usage():
    """MoE usage example"""
    batch_size = 4
    seq_len = 128
    d_model = 768

    # Create an MoE Transformer block
    moe_block = MoETransformerBlock(
        d_model=d_model,
        num_heads=12,
        d_ff=3072,
        num_experts=8,
        top_k=2
    )

    # Random input
    x = torch.randn(batch_size, seq_len, d_model)

    # Forward pass
    output, aux_loss = moe_block(x)

    print(f"Input shape: {x.shape}")
    print(f"Output shape: {output.shape}")
    print(f"Load-balancing loss: {aux_loss.item():.6f}")

    # Example training loop
    optimizer = torch.optim.Adam(moe_block.parameters(), lr=1e-4)

    for step in range(10):
        optimizer.zero_grad()

        # Forward pass
        output, aux_loss = moe_block(x)

        # Main-task loss (random targets, for demonstration only)
        target = torch.randn_like(output)
        main_loss = F.mse_loss(output, target)

        # Total loss = main-task loss + load-balancing loss
        total_loss = main_loss + aux_loss

        # Backward pass
        total_loss.backward()
        optimizer.step()

        if step % 5 == 0:
            print(f"Step {step}: Main Loss={main_loss.item():.4f}, "
                  f"Aux Loss={aux_loss.item():.6f}")

if __name__ == "__main__":
    example_moe_usage()

18.1.3 The Mixtral MoE Implementation

Mistral AI's Mixtral 8x7B is one of the most successful open-source MoE models.

import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModelForCausalLM
from typing import Optional, List

class MixtralConfig:
    """Mixtral model configuration"""
    def __init__(
        self,
        vocab_size: int = 32000,
        hidden_size: int = 4096,
        intermediate_size: int = 14336,
        num_hidden_layers: int = 32,
        num_attention_heads: int = 32,
        num_key_value_heads: int = 8,  # GQA
        num_experts: int = 8,
        num_experts_per_token: int = 2,
        max_position_embeddings: int = 32768,
        rope_theta: float = 10000.0,
        rms_norm_eps: float = 1e-5,
    ):
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.num_key_value_heads = num_key_value_heads
        self.num_experts = num_experts
        self.num_experts_per_token = num_experts_per_token
        self.max_position_embeddings = max_position_embeddings
        self.rope_theta = rope_theta
        self.rms_norm_eps = rms_norm_eps

class MixtralSparseMoeBlock(nn.Module):
    """Mixtral's MoE block"""
    def __init__(self, config: MixtralConfig):
        super().__init__()
        self.hidden_dim = config.hidden_size
        self.ffn_dim = config.intermediate_size
        self.num_experts = config.num_experts
        self.top_k = config.num_experts_per_token

        # Gating network
        self.gate = nn.Linear(self.hidden_dim, self.num_experts, bias=False)

        # Expert networks (each expert is a full FFN)
        self.experts = nn.ModuleList([
            MixtralExpert(self.hidden_dim, self.ffn_dim)
            for _ in range(self.num_experts)
        ])

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        batch_size, seq_len, hidden_dim = hidden_states.shape

        # Flatten to 2D for processing
        hidden_states_flat = hidden_states.view(-1, hidden_dim)

        # Compute the routing weights
        router_logits = self.gate(hidden_states_flat)
        routing_weights = torch.softmax(router_logits, dim=-1)

        # Select the top-k experts
        routing_weights, selected_experts = torch.topk(
            routing_weights, self.top_k, dim=-1
        )

        # Renormalize
        routing_weights /= routing_weights.sum(dim=-1, keepdim=True)

        # Initialize the output
        final_hidden_states = torch.zeros_like(hidden_states_flat)

        # Let each expert process its tokens
        expert_mask = torch.nn.functional.one_hot(
            selected_experts,
            num_classes=self.num_experts
        ).permute(2, 1, 0)

        for expert_idx in range(self.num_experts):
            expert = self.experts[expert_idx]
            idx, top_x = torch.where(expert_mask[expert_idx])

            if top_x.shape[0] == 0:
                continue

            # Process the tokens routed to this expert
            current_state = hidden_states_flat[None, top_x].reshape(-1, hidden_dim)
            current_hidden_states = expert(current_state)

            # Apply the routing weights
            current_hidden_states *= routing_weights[top_x, idx, None]

            # Accumulate into the output
            final_hidden_states.index_add_(
                0, top_x, current_hidden_states.to(hidden_states.dtype)
            )

        # Restore the original shape
        final_hidden_states = final_hidden_states.reshape(
            batch_size, seq_len, hidden_dim
        )

        return final_hidden_states

class MixtralExpert(nn.Module):
    """Mixtral expert network"""
    def __init__(self, hidden_dim: int, ffn_dim: int):
        super().__init__()
        self.w1 = nn.Linear(hidden_dim, ffn_dim, bias=False)
        self.w2 = nn.Linear(ffn_dim, hidden_dim, bias=False)
        self.w3 = nn.Linear(hidden_dim, ffn_dim, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # SwiGLU activation
        return self.w2(nn.functional.silu(self.w1(x)) * self.w3(x))

# Using the Hugging Face Mixtral model
def use_mixtral_model():
    """Run the pretrained Mixtral model"""
    model_name = "mistralai/Mixtral-8x7B-Instruct-v0.1"

    # Load the tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Load with 8-bit quantization to save memory (requires bitsandbytes)
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="auto",
        load_in_8bit=True,
        torch_dtype=torch.float16
    )

    # Inference example
    prompt = "Explain the concept of Mixture of Experts in AI:"
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    outputs = model.generate(
        **inputs,
        max_new_tokens=200,
        temperature=0.7,
        top_p=0.9,
        do_sample=True
    )

    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    print(generated_text)

18.1.4 Advantages and Challenges of MoE

Advantages:

  1. Compute efficiency: the parameter count is large, but only a fraction of it is activated per inference step
  2. Scalability: model capacity can be grown by adding experts
  3. Specialization: different experts can learn to handle different kinds of input

Challenges:

  1. Load imbalance: some experts can be over-used
  2. Training instability: requires a carefully designed load-balancing strategy
  3. Memory footprint: all experts must be held in memory
  4. Communication overhead: expert-to-expert traffic is expensive in distributed training

Numbers from practice (see the parameter sketch below):

  • Mixtral 8x7B: 47B total parameters, but only ~13B activated per token
  • Matches or beats GPT-3.5 on most benchmarks, with roughly 6x faster inference than Llama 2 70B
  • Supports a 32K context length
  • DeepSeek-V2: 236B total parameters with 21B active; performance approaching GPT-4 on some benchmarks
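
As a sanity check on the Mixtral figures above, the back-of-the-envelope sketch below estimates total vs. active parameters from the same configuration values used in the MixtralConfig class in 18.1.3 (SwiGLU experts with three weight matrices; embeddings approximated, norms ignored, so treat the output as a rough estimate):

# Rough parameter count for a Mixtral-8x7B-style model
hidden, inter = 4096, 14336
layers, experts, active = 32, 8, 2
heads, kv_heads, head_dim = 32, 8, 128
vocab = 32000

expert_params = 3 * hidden * inter                  # w1, w2, w3 (SwiGLU)
attn_params = (hidden * heads * head_dim            # q_proj
               + 2 * hidden * kv_heads * head_dim   # k_proj, v_proj (GQA)
               + heads * head_dim * hidden)         # o_proj
embed_params = 2 * vocab * hidden                   # input + output embeddings

total = layers * (experts * expert_params + attn_params) + embed_params
active_total = layers * (active * expert_params + attn_params) + embed_params

print(f"total:  {total / 1e9:.1f}B")         # ~46.7B
print(f"active: {active_total / 1e9:.1f}B")  # ~12.9B per token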

18.2 Long-Context Techniques

Long-context processing was a major breakthrough of 2024, letting models handle inputs of hundreds of thousands or even a million tokens.

18.2.1 Analyzing the Context-Length Bottleneck

The problem with standard attention:

# Complexity of standard self-attention
import torch
import torch.nn as nn
import math

def standard_attention(Q, K, V, mask=None):
    """
    Standard scaled dot-product attention

    Complexity:
    - Time: O(n²d), where n is the sequence length and d the feature dimension
    - Space: O(n²), to store the attention matrix
    """
    d_k = Q.size(-1)

    # QK^T: [batch, heads, n, n]
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    # Softmax: [batch, heads, n, n]
    attn_weights = torch.softmax(scores, dim=-1)

    # Output: [batch, heads, n, d]
    output = torch.matmul(attn_weights, V)

    return output, attn_weights

# Complexity analysis
def analyze_attention_complexity():
    """Estimate memory and compute for different sequence lengths"""
    batch_size = 1
    num_heads = 32
    d_head = 128

    seq_lengths = [512, 2048, 8192, 32768, 131072]

    print("Seq len  | Attn matrix elems | Memory (GB) | FLOPs (B)")
    print("-" * 65)

    for seq_len in seq_lengths:
        # Attention matrix size: [batch, heads, n, n]
        attn_matrix_size = batch_size * num_heads * seq_len * seq_len

        # Memory footprint (float16)
        memory_gb = attn_matrix_size * 2 / (1024**3)

        # FLOPs: QK^T + Softmax + AttnV
        flops = (
            2 * seq_len * seq_len * d_head +  # QK^T
            4 * seq_len * seq_len +            # Softmax
            2 * seq_len * seq_len * d_head     # AttnV
        ) * num_heads
        flops_b = flops / 1e9

        print(f"{seq_len:8d} | {attn_matrix_size:17,d} | "
              f"{memory_gb:11.2f} | {flops_b:10.2f}")

if __name__ == "__main__":
    analyze_attention_complexity()

Output (FLOPs recomputed to match the formula in the code above):

Seq len  | Attn matrix elems | Memory (GB) | FLOPs (B)
-----------------------------------------------------------------
     512 |         8,388,608 |        0.02 |       4.33
    2048 |       134,217,728 |        0.25 |      69.26
    8192 |     2,147,483,648 |        4.00 |    1108.10
   32768 |    34,359,738,368 |       64.00 |   17729.62
  131072 |   549,755,813,888 |     1024.00 |  283674.00

18.2.2 RoPE Positional-Encoding Extrapolation

RoPE (Rotary Position Embedding) supports length extrapolation and is a key enabler of long contexts.

import torch
import torch.nn as nn
import math
from typing import Optional, Tuple

class RotaryPositionEmbedding(nn.Module):
    """
    Rotary Position Embedding (RoPE)
    Paper: RoFormer: Enhanced Transformer with Rotary Position Embedding
    """
    def __init__(
        self,
        dim: int,
        max_seq_len: int = 2048,
        base: float = 10000.0
    ):
        super().__init__()
        self.dim = dim
        self.max_seq_len = max_seq_len
        self.base = base

        # Precompute the frequencies
        inv_freq = 1.0 / (
            base ** (torch.arange(0, dim, 2).float() / dim)
        )
        self.register_buffer('inv_freq', inv_freq)

    def forward(
        self,
        seq_len: int,
        device: torch.device
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Generate the cos and sin caches

        Returns:
            cos_cached: [seq_len, dim]
            sin_cached: [seq_len, dim]
        """
        # Position indices
        t = torch.arange(seq_len, device=device).type_as(self.inv_freq)

        # Frequency * position
        freqs = torch.einsum('i,j->ij', t, self.inv_freq)

        # Concatenate to match the full dimension
        emb = torch.cat((freqs, freqs), dim=-1)

        return emb.cos(), emb.sin()

def rotate_half(x: torch.Tensor) -> torch.Tensor:
    """Split the feature dimension in half and rotate"""
    x1, x2 = x[..., :x.shape[-1] // 2], x[..., x.shape[-1] // 2:]
    return torch.cat((-x2, x1), dim=-1)

def apply_rotary_pos_emb(
    q: torch.Tensor,
    k: torch.Tensor,
    cos: torch.Tensor,
    sin: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Apply rotary position embeddings

    Args:
        q, k: [batch, heads, seq_len, dim]
        cos, sin: [seq_len, dim]
    """
    # Broadcast to match q and k
    cos = cos[None, None, :, :]  # [1, 1, seq_len, dim]
    sin = sin[None, None, :, :]

    # Apply the rotation
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)

    return q_embed, k_embed

class RoPEScaledAttention(nn.Module):
    """Attention layer with RoPE, supporting length extrapolation"""
    def __init__(
        self,
        d_model: int,
        num_heads: int,
        max_seq_len: int = 2048,
        rope_base: float = 10000.0,
        rope_scaling: Optional[dict] = None
    ):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_head = d_model // num_heads

        # QKV projections
        self.q_proj = nn.Linear(d_model, d_model, bias=False)
        self.k_proj = nn.Linear(d_model, d_model, bias=False)
        self.v_proj = nn.Linear(d_model, d_model, bias=False)
        self.o_proj = nn.Linear(d_model, d_model, bias=False)

        # RoPE
        self.rope = RotaryPositionEmbedding(
            self.d_head,
            max_seq_len,
            rope_base
        )

        # RoPE scaling (for length extrapolation)
        self.rope_scaling = rope_scaling
        if rope_scaling:
            self.rope_scale_factor = rope_scaling.get('factor', 1.0)
            self.rope_scale_type = rope_scaling.get('type', 'linear')
        else:
            self.rope_scale_factor = 1.0
            self.rope_scale_type = None

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        batch_size, seq_len, _ = hidden_states.shape

        # QKV projections
        Q = self.q_proj(hidden_states)
        K = self.k_proj(hidden_states)
        V = self.v_proj(hidden_states)

        # Reshape into heads
        Q = Q.view(batch_size, seq_len, self.num_heads, self.d_head)
        K = K.view(batch_size, seq_len, self.num_heads, self.d_head)
        V = V.view(batch_size, seq_len, self.num_heads, self.d_head)

        # Transpose: [batch, heads, seq_len, d_head]
        Q = Q.transpose(1, 2)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)

        # Apply RoPE
        cos, sin = self.rope(seq_len, hidden_states.device)

        # RoPE scaling (for extrapolation)
        if self.rope_scaling:
            cos, sin = self._apply_rope_scaling(cos, sin, seq_len)

        Q, K = apply_rotary_pos_emb(Q, K, cos, sin)

        # Compute attention
        attn_output = self._compute_attention(Q, K, V, attention_mask)

        # Output projection
        attn_output = attn_output.transpose(1, 2).contiguous()
        attn_output = attn_output.view(batch_size, seq_len, self.d_model)
        output = self.o_proj(attn_output)

        return output

    def _apply_rope_scaling(
        self,
        cos: torch.Tensor,
        sin: torch.Tensor,
        seq_len: int
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Apply RoPE scaling to support length extrapolation"""
        if self.rope_scale_type == 'linear':
            # Linear interpolation
            max_seq_len = self.rope.max_seq_len
            if seq_len > max_seq_len:
                # Interpolate positions down into the trained range
                scale = seq_len / max_seq_len
                positions = torch.arange(seq_len, device=cos.device) / scale

                # Recompute the frequencies
                freqs = torch.einsum(
                    'i,j->ij',
                    positions,
                    self.rope.inv_freq
                )
                emb = torch.cat((freqs, freqs), dim=-1)
                cos = emb.cos()
                sin = emb.sin()

        elif self.rope_scale_type == 'dynamic':
            # Dynamic NTK interpolation
            scale = self.rope_scale_factor
            max_seq_len = self.rope.max_seq_len * scale

            # Adjust the base to preserve high-frequency information
            new_base = self.rope.base * (
                (scale * seq_len / max_seq_len) - (scale - 1)
            ) ** (self.d_head / (self.d_head - 2))

            inv_freq = 1.0 / (
                new_base ** (
                    torch.arange(0, self.d_head, 2, device=cos.device).float()
                    / self.d_head
                )
            )

            t = torch.arange(seq_len, device=cos.device).type_as(inv_freq)
            freqs = torch.einsum('i,j->ij', t, inv_freq)
            emb = torch.cat((freqs, freqs), dim=-1)
            cos = emb.cos()
            sin = emb.sin()

        return cos, sin

    def _compute_attention(
        self,
        Q: torch.Tensor,
        K: torch.Tensor,
        V: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        """Scaled dot-product attention"""
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_head)

        if mask is not None:
            scores = scores + mask

        attn_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attn_weights, V)

        return output

# Test RoPE extrapolation
def test_rope_extrapolation():
    """Check RoPE's ability to extrapolate to longer sequences"""
    d_model = 768
    num_heads = 12
    train_seq_len = 2048
    test_seq_len = 8192  # 4x the training length

    # Build the model (with RoPE scaling)
    rope_scaling = {
        'type': 'dynamic',
        'factor': 4.0
    }

    model = RoPEScaledAttention(
        d_model=d_model,
        num_heads=num_heads,
        max_seq_len=train_seq_len,
        rope_scaling=rope_scaling
    )

    # Short sequence (training length)
    short_input = torch.randn(1, train_seq_len, d_model)
    short_output = model(short_input)
    print(f"Short sequence ({train_seq_len}): output shape {short_output.shape}")

    # Long sequence (extrapolation)
    long_input = torch.randn(1, test_seq_len, d_model)
    long_output = model(long_input)
    print(f"Long sequence ({test_seq_len}): output shape {long_output.shape}")

    print("\nRoPE extrapolation succeeded!")

if __name__ == "__main__":
    test_rope_extrapolation()

18.2.3 Windowed Attention and FlashAttention

Sliding-window attention:

import torch
import torch.nn as nn
from typing import Optional

def sliding_window_attention(
    Q: torch.Tensor,
    K: torch.Tensor,
    V: torch.Tensor,
    window_size: int = 512
) -> torch.Tensor:
    """
    Sliding-window attention

    Each token attends only to the tokens inside its window.
    Complexity: O(n * w * d), where w is the window size.

    Args:
        Q, K, V: [batch, heads, seq_len, d_head]
        window_size: window size
    """
    batch_size, num_heads, seq_len, d_head = Q.shape

    # Build the sliding-window mask (1 = masked out, 0 = attended)
    mask = torch.ones(seq_len, seq_len, device=Q.device)
    for i in range(seq_len):
        start = max(0, i - window_size // 2)
        end = min(seq_len, i + window_size // 2 + 1)
        mask[i, start:end] = 0

    # Attention computation
    scores = torch.matmul(Q, K.transpose(-2, -1)) / (d_head ** 0.5)
    scores = scores.masked_fill(mask.bool(), float('-inf'))
    attn_weights = torch.softmax(scores, dim=-1)
    output = torch.matmul(attn_weights, V)

    return output

class FlashAttention(nn.Module):
    """
    Simplified FlashAttention-2 implementation

    Core ideas:
    1. Blockwise computation to reduce HBM traffic
    2. Online softmax, avoiding materializing the full attention matrix
    3. Recompute instead of storing intermediates
    """
    def __init__(self, dropout: float = 0.0):
        super().__init__()
        self.dropout = dropout

    def forward(
        self,
        Q: torch.Tensor,
        K: torch.Tensor,
        V: torch.Tensor,
        causal: bool = False,
        block_size: int = 256
    ) -> torch.Tensor:
        """
        FlashAttention forward pass

        Args:
            Q, K, V: [batch, heads, seq_len, d_head]
            causal: whether to apply a causal mask
            block_size: block size
        """
        batch_size, num_heads, seq_len, d_head = Q.shape

        # Number of blocks
        num_blocks = (seq_len + block_size - 1) // block_size

        # Output buffers
        O = torch.zeros_like(Q)
        l = torch.zeros(batch_size, num_heads, seq_len, 1, device=Q.device)
        m = torch.full(
            (batch_size, num_heads, seq_len, 1),
            float('-inf'),
            device=Q.device
        )

        # Process block by block
        for i in range(num_blocks):
            # Q block
            q_start = i * block_size
            q_end = min((i + 1) * block_size, seq_len)
            Q_block = Q[:, :, q_start:q_end, :]

            # Initialize block-level statistics
            O_block = torch.zeros_like(Q_block)
            l_block = torch.zeros(
                batch_size, num_heads, q_end - q_start, 1,
                device=Q.device
            )
            m_block = torch.full(
                (batch_size, num_heads, q_end - q_start, 1),
                float('-inf'),
                device=Q.device
            )

            for j in range(num_blocks):
                # Causal-mask block skipping
                if causal and j > i:
                    continue

                # K, V blocks
                k_start = j * block_size
                k_end = min((j + 1) * block_size, seq_len)
                K_block = K[:, :, k_start:k_end, :]
                V_block = V[:, :, k_start:k_end, :]

                # Attention scores
                S_block = torch.matmul(Q_block, K_block.transpose(-2, -1))
                S_block = S_block / (d_head ** 0.5)

                # Causal mask: mask entries where the key's global index
                # exceeds the query's global index
                if causal:
                    block_mask = torch.triu(
                        torch.ones(
                            q_end - q_start, k_end - k_start,
                            device=Q.device
                        ),
                        diagonal=q_start - k_start + 1
                    )
                    S_block = S_block.masked_fill(
                        block_mask.bool(),
                        float('-inf')
                    )

                # Online softmax update
                m_block_new = torch.maximum(
                    m_block,
                    S_block.max(dim=-1, keepdim=True)[0]
                )

                # Update the running exponential sum
                alpha = torch.exp(m_block - m_block_new)
                beta = torch.exp(S_block - m_block_new)

                l_block_new = alpha * l_block + beta.sum(dim=-1, keepdim=True)

                # Update the output
                O_block = (
                    alpha * O_block +
                    torch.matmul(beta, V_block)
                )

                m_block = m_block_new
                l_block = l_block_new

            # Normalize
            O_block = O_block / l_block

            # Write back to the global output
            O[:, :, q_start:q_end, :] = O_block

        return O

# Performance comparison
def compare_attention_methods():
    """Compare the speed of the attention variants above"""
    import time

    batch_size = 2
    num_heads = 12
    seq_len = 4096
    d_head = 64

    Q = torch.randn(batch_size, num_heads, seq_len, d_head, device='cuda')
    K = torch.randn(batch_size, num_heads, seq_len, d_head, device='cuda')
    V = torch.randn(batch_size, num_heads, seq_len, d_head, device='cuda')

    # Standard attention (defined in the 18.2.1 snippet)
    torch.cuda.synchronize()
    start = time.time()
    standard_out, _ = standard_attention(Q, K, V)
    torch.cuda.synchronize()
    standard_time = time.time() - start

    # Sliding-window attention
    torch.cuda.synchronize()
    start = time.time()
    window_out = sliding_window_attention(Q, K, V, window_size=512)
    torch.cuda.synchronize()
    window_time = time.time() - start

    # FlashAttention
    flash_attn = FlashAttention().cuda()
    torch.cuda.synchronize()
    start = time.time()
    flash_out = flash_attn(Q, K, V)
    torch.cuda.synchronize()
    flash_time = time.time() - start

    print(f"Sequence length: {seq_len}")
    print(f"Standard attention: {standard_time:.4f}s")
    print(f"Window attention: {window_time:.4f}s ({standard_time/window_time:.2f}x)")
    print(f"FlashAttention: {flash_time:.4f}s ({standard_time/flash_time:.2f}x)")

if __name__ == "__main__":
    if torch.cuda.is_available():
        compare_attention_methods()

18.2.4 Long-Context Models in Practice

Claude 3 (Anthropic, 2024)

Capabilities:
  - Context window: 200K tokens
  - In real tests: accurately retrieves information from 200K-token documents
  - "Needle in a haystack" test: near perfect

Techniques:
  - Improved positional encodings
  - Attention optimizations
  - KV cache compression (see the sizing sketch below)

Applications:
  - Analyzing entire books
  - Understanding large codebases
  - Long-running conversational memory
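
One reason KV-cache compression matters at this scale: the cache grows linearly with context length. A minimal sizing sketch (the layer and head dimensions below are illustrative assumptions for a 70B-class GQA model; Claude's actual architecture is not public):

# KV cache = 2 (K and V) * layers * kv_heads * head_dim * seq_len * bytes/elem
layers, kv_heads, head_dim = 80, 8, 128  # hypothetical GQA configuration
seq_len, bytes_fp16 = 200_000, 2

cache_bytes = 2 * layers * kv_heads * head_dim * seq_len * bytes_fp16
print(f"{cache_bytes / 1024**3:.1f} GB per sequence")  # ~61 GB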

Gemini 1.5 Pro (Google, 2024)

Capabilities:
  - Context window: 1M tokens
  - Can process:
    - 11 hours of audio
    - 1 hour of video
    - 700K lines of code
    - multiple books

Techniques:
  - MoE architecture
  - Multimodal understanding
  - Efficient attention mechanisms

Breakthroughs:
  - First million-token-class model
  - Unified multimodal long context

GPT-4 Turbo (OpenAI, 2023-2024)

Capabilities:
  - Context window: 128K tokens
  - Roughly 300 pages of text

Applications:
  - Long-document summarization
  - Complex code review
  - Sustained conversations

Pricing:
  - Input: $0.01/1K tokens
  - Output: $0.03/1K tokens
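
At those rates, a quick cost sketch for one maximally filled request:

# GPT-4 Turbo cost arithmetic (rates as listed above)
input_rate, output_rate = 0.01, 0.03   # USD per 1K tokens
input_tokens, output_tokens = 128_000, 1_000

cost = input_tokens / 1000 * input_rate + output_tokens / 1000 * output_rate
print(f"${cost:.2f}")  # $1.31 to fill the whole 128K window once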

18.3 Model Quantization and Miniaturization

Model quantization is the key technique for deploying models to edge devices.

18.3.1 Quantization Principles

Quantization basics:

import torch
import torch.nn as nn
import numpy as np
from typing import Tuple

def quantize_tensor(tensor: torch.Tensor, num_bits: int = 8) -> Tuple[torch.Tensor, float, float]:
    """
    Asymmetric (affine) quantization

    Quantization formulas:
    q = round(x / scale + zero_point)
    x_reconstructed = (q - zero_point) * scale

    Args:
        tensor: original floating-point tensor
        num_bits: bit width

    Returns:
        quantized: quantized integer tensor
        scale: quantization scale
        zero_point: zero point
    """
    # Quantized value range
    qmin = -(2 ** (num_bits - 1))
    qmax = 2 ** (num_bits - 1) - 1

    # Compute the scale
    min_val = tensor.min().item()
    max_val = tensor.max().item()

    scale = (max_val - min_val) / (qmax - qmin)
    zero_point = qmin - min_val / scale

    # Quantize
    quantized = torch.clamp(
        torch.round(tensor / scale + zero_point),
        qmin, qmax
    ).to(torch.int8)

    return quantized, scale, zero_point

def dequantize_tensor(
    quantized: torch.Tensor,
    scale: float,
    zero_point: float
) -> torch.Tensor:
    """Dequantize back to floating point"""
    return (quantized.float() - zero_point) * scale

# INT8 quantization example
def int8_quantization_example():
    """INT8 quantization example"""
    # Original weights
    weight = torch.randn(512, 512) * 0.1

    # Quantize
    weight_q, scale, zero_point = quantize_tensor(weight, num_bits=8)

    # Dequantize
    weight_dq = dequantize_tensor(weight_q, scale, zero_point)

    # Reconstruction error
    error = (weight - weight_dq).abs().mean()

    # Memory savings
    original_size = weight.element_size() * weight.nelement() / 1024  # KB
    quantized_size = weight_q.element_size() * weight_q.nelement() / 1024

    print(f"Original weights: {original_size:.2f} KB (FP32)")
    print(f"Quantized weights: {quantized_size:.2f} KB (INT8)")
    print(f"Compression ratio: {original_size / quantized_size:.2f}x")
    print(f"Mean error: {error:.6f}")

if __name__ == "__main__":
    int8_quantization_example()

18.3.2 GPTQ Quantization

GPTQ (Accurate Post-Training Quantization for Generative Pre-trained Transformers) is a high-accuracy weight-quantization method.

import torch
import torch.nn as nn
from typing import List, Tuple
import math

class GPTQ:
    """
    GPTQ quantizer

    Paper: GPTQ: Accurate Post-Training Quantization for Generative Pre-trained Transformers

    Core ideas:
    1. Quantize layer by layer, minimizing reconstruction error
    2. Use (approximate) Hessian information to guide quantization
    3. Greedy column-by-column optimization
    """
    def __init__(
        self,
        layer: nn.Linear,
        bits: int = 4,
        group_size: int = 128,
        actorder: bool = True
    ):
        self.layer = layer
        self.bits = bits
        self.group_size = group_size
        self.actorder = actorder

        self.dev = layer.weight.device
        self.rows = layer.weight.shape[0]
        self.columns = layer.weight.shape[1]

        # (Approximate) Hessian matrix
        self.H = torch.zeros(
            (self.columns, self.columns),
            device=self.dev
        )
        self.nsamples = 0

    def add_batch(self, inp: torch.Tensor):
        """
        Add a batch of samples to estimate the Hessian

        Args:
            inp: [batch_size, seq_len, in_features]
        """
        if len(inp.shape) == 3:
            inp = inp.reshape(-1, inp.shape[-1])

        tmp = inp.shape[0]

        # Running estimate of H = 2 * X^T X: rescale the old estimate and
        # update the sample count before using it (this avoids the
        # division by zero on the first call)
        self.H *= self.nsamples / (self.nsamples + tmp)
        self.nsamples += tmp
        inp = math.sqrt(2 / self.nsamples) * inp.float()
        self.H += inp.t().matmul(inp)

    def quantize(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Run GPTQ quantization

        Returns:
            quantized_weight: the quantized weights
            scales: quantization scales
        """
        W = self.layer.weight.data.clone()

        # Make sure the Hessian has been estimated
        if self.nsamples == 0:
            raise ValueError("Call add_batch with calibration samples first")

        # Quantized value range
        qmin = -(2 ** (self.bits - 1))
        qmax = 2 ** (self.bits - 1) - 1

        # Initialization
        H = self.H
        dead = torch.diag(H) == 0
        H[dead, dead] = 1
        W[:, dead] = 0

        # Cholesky decomposition
        try:
            damp = 0.01 * torch.mean(torch.diag(H))
            diag = torch.arange(self.columns, device=self.dev)
            H[diag, diag] += damp
            H = torch.linalg.cholesky(H)
            H = torch.cholesky_inverse(H)
            H = torch.linalg.cholesky(H, upper=True)
            Hinv = H
        except RuntimeError:
            print("Cholesky decomposition failed; falling back to a diagonal approximation")
            Hinv = torch.diag(1.0 / torch.diag(H))

        # Quantization order
        if self.actorder:
            perm = torch.argsort(torch.diag(H), descending=True)
            W = W[:, perm]
            Hinv = Hinv[perm][:, perm]

        # Grouped quantization
        Q = torch.zeros_like(W)
        scales = torch.zeros(
            (self.rows, (self.columns + self.group_size - 1) // self.group_size),
            device=self.dev
        )

        Err = torch.zeros_like(W)

        for i1 in range(0, self.columns, self.group_size):
            i2 = min(i1 + self.group_size, self.columns)
            count = i2 - i1

            W1 = W[:, i1:i2].clone()
            Q1 = torch.zeros_like(W1)
            Err1 = torch.zeros_like(W1)
            H1 = Hinv[i1:i2, i1:i2]

            # One per-row scale for the whole group, fixed up front so it is
            # consistent with the integer conversion below
            scale = W1.abs().max(dim=1)[0] / qmax  # [rows]
            scale = torch.clamp(scale, min=1e-8)
            scales[:, i1 // self.group_size] = scale

            for i in range(count):
                w = W1[:, i]
                d = H1[i, i]

                # Quantize this column
                q = torch.clamp(
                    torch.round(w / scale),
                    qmin, qmax
                )
                Q1[:, i] = q * scale

                # Quantization error weighted by the Hessian diagonal
                err = (w - Q1[:, i]) / d
                Err1[:, i] = err

                # Propagate the error to the remaining columns
                W1[:, i:] -= err.unsqueeze(1).matmul(H1[i, i:].unsqueeze(0))

            Q[:, i1:i2] = Q1
            Err[:, i1:i2] = Err1

        # Restore the original column order
        if self.actorder:
            invperm = torch.argsort(perm)
            Q = Q[:, invperm]

        # Convert to integers
        Q_int = torch.zeros_like(Q, dtype=torch.int8)
        for i in range(scales.shape[1]):
            i1 = i * self.group_size
            i2 = min(i1 + self.group_size, self.columns)
            scale = scales[:, i:i+1]
            Q_int[:, i1:i2] = torch.clamp(
                torch.round(Q[:, i1:i2] / scale),
                qmin, qmax
            ).to(torch.int8)

        return Q_int, scales

# Quantizing a model with GPTQ
def quantize_model_with_gptq(
    model: nn.Module,
    calibration_data: torch.Tensor,
    bits: int = 4
) -> nn.Module:
    """
    Quantize an entire model with GPTQ

    Args:
        model: the model to quantize
        calibration_data: calibration samples
        bits: bit width
    """
    # Collect all Linear layers
    linear_layers = []
    for name, module in model.named_modules():
        if isinstance(module, nn.Linear):
            linear_layers.append((name, module))

    print(f"Found {len(linear_layers)} linear layers")

    # Quantize layer by layer
    for name, layer in linear_layers:
        print(f"Quantizing layer: {name}")

        # Create a GPTQ quantizer
        gptq = GPTQ(layer, bits=bits)

        # Collect activations (simplified here; a real pipeline registers
        # forward hooks to capture each layer's actual inputs during
        # calibration passes)
        with torch.no_grad():
            fake_input = torch.randn(
                calibration_data.shape[0],
                layer.in_features,
                device=layer.weight.device
            )
            gptq.add_batch(fake_input)

        # Quantize
        q_weight, scales = gptq.quantize()

        # Replace the weights (simplified; in practice you swap in a custom quantized layer)
        print(f"  original: {layer.weight.shape}, quantized: {q_weight.shape}")

    return model

# Example
def gptq_example():
    """GPTQ quantization example"""
    # A simple model
    model = nn.Sequential(
        nn.Linear(512, 1024),
        nn.ReLU(),
        nn.Linear(1024, 512)
    )

    # Calibration data
    calib_data = torch.randn(100, 512)

    # Quantize
    quantized_model = quantize_model_with_gptq(model, calib_data, bits=4)

    print("\nGPTQ quantization finished!")

if __name__ == "__main__":
    gptq_example()

18.3.3 AWQ Quantization

AWQ (Activation-aware Weight Quantization) improves quantization accuracy by protecting the most important weight channels.

import torch
import torch.nn as nn
from typing import Dict, List, Tuple

class AWQ:
    """
    AWQ quantizer

    Paper: AWQ: Activation-aware Weight Quantization for LLM Compression and Acceleration

    Core ideas:
    1. Identify the weight channels that matter most for the activations
    2. Quantize those channels more gently (via per-channel scaling)
    3. Use per-channel scales
    """
    def __init__(
        self,
        layer: nn.Linear,
        bits: int = 4,
        group_size: int = 128,
        alpha: float = 0.5
    ):
        self.layer = layer
        self.bits = bits
        self.group_size = group_size
        self.alpha = alpha

        self.dev = layer.weight.device

        # Activation statistics
        self.act_scales = None
        self.nsamples = 0

    def collect_activations(self, inp: torch.Tensor):
        """
        Collect activation statistics

        Args:
            inp: [batch_size, seq_len, in_features]
        """
        if len(inp.shape) == 3:
            inp = inp.reshape(-1, inp.shape[-1])

        # Mean activation magnitude per channel
        scales = inp.abs().mean(dim=0)

        if self.act_scales is None:
            self.act_scales = scales
        else:
            # Exponential moving average
            self.act_scales = 0.9 * self.act_scales + 0.1 * scales

        self.nsamples += inp.shape[0]

    def compute_scaling_factors(self) -> torch.Tensor:
        """
        Compute per-channel scaling factors

        Returns:
            s: [in_features] scaling factors
        """
        if self.act_scales is None:
            raise ValueError("Collect activations first")

        # Scale channels by activation importance
        s = self.act_scales.pow(self.alpha)

        # Normalize (preserve the overall scale)
        s = s / s.mean()

        return s

    def quantize(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Run AWQ quantization

        Returns:
            quantized_weight: the quantized weights
            scales: quantization scales
            scaling_factors: per-channel scaling factors
        """
        W = self.layer.weight.data.clone()

        # Compute the scaling factors
        s = self.compute_scaling_factors()

        # Apply the scaling
        W_scaled = W * s.view(1, -1)

        # Grouped quantization
        qmin = -(2 ** (self.bits - 1))
        qmax = 2 ** (self.bits - 1) - 1

        out_features, in_features = W_scaled.shape
        num_groups = (in_features + self.group_size - 1) // self.group_size

        Q = torch.zeros_like(W_scaled, dtype=torch.int8)
        scales = torch.zeros(out_features, num_groups, device=self.dev)

        for g in range(num_groups):
            start = g * self.group_size
            end = min(start + self.group_size, in_features)

            W_group = W_scaled[:, start:end]

            # Per-output-channel scale
            scale = W_group.abs().max(dim=1)[0] / qmax
            scales[:, g] = scale

            # Quantize
            Q[:, start:end] = torch.clamp(
                torch.round(W_group / scale.view(-1, 1)),
                qmin, qmax
            ).to(torch.int8)

        return Q, scales, s

class AWQLinear(nn.Module):
    """A linear layer with AWQ-quantized weights"""
    def __init__(
        self,
        in_features: int,
        out_features: int,
        bits: int = 4,
        group_size: int = 128
    ):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.bits = bits
        self.group_size = group_size

        # Quantized weights
        self.register_buffer(
            'qweight',
            torch.zeros(
                (out_features, in_features),
                dtype=torch.int8
            )
        )

        # Scales
        num_groups = (in_features + group_size - 1) // group_size
        self.register_buffer(
            'scales',
            torch.zeros((out_features, num_groups))
        )

        # Per-channel scaling factors
        self.register_buffer(
            'scaling_factors',
            torch.ones(in_features)
        )

        # Optional bias (unused in this sketch)
        self.register_buffer('bias', None)

    def pack_weights(
        self,
        qweight: torch.Tensor,
        scales: torch.Tensor,
        scaling_factors: torch.Tensor
    ):
        """Pack the quantized weights into the layer"""
        self.qweight = qweight
        self.scales = scales
        self.scaling_factors = scaling_factors

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass

        Args:
            x: [batch_size, seq_len, in_features]
        """
        # Apply the input scaling (the inverse of the weight scaling)
        x_scaled = x / self.scaling_factors.view(1, 1, -1)

        # Dequantize the weights group by group and accumulate
        out_features, in_features = self.qweight.shape
        num_groups = self.scales.shape[1]

        output = torch.zeros(
            x.shape[0], x.shape[1], out_features,
            device=x.device, dtype=x.dtype
        )

        for g in range(num_groups):
            start = g * self.group_size
            end = min(start + self.group_size, in_features)

            # Dequantize
            W_dequant = (
                self.qweight[:, start:end].float() *
                self.scales[:, g:g+1]
            )

            # Matmul for this group
            output += torch.matmul(
                x_scaled[:, :, start:end],
                W_dequant.t()
            )

        if self.bias is not None:
            output += self.bias

        return output

# Usage example
def awq_example():
    """AWQ quantization example"""
    # Original layer
    layer = nn.Linear(512, 1024).cuda()

    # Create an AWQ quantizer
    awq = AWQ(layer, bits=4, group_size=128)

    # Collect activations
    for _ in range(10):
        fake_input = torch.randn(32, 128, 512).cuda()
        awq.collect_activations(fake_input)

    # Quantize
    qweight, scales, scaling_factors = awq.quantize()

    # Build the quantized layer (note: this sketch omits the bias term,
    # which contributes to the error measured below)
    q_layer = AWQLinear(512, 1024, bits=4, group_size=128).cuda()
    q_layer.pack_weights(qweight, scales, scaling_factors)

    # Test
    test_input = torch.randn(1, 10, 512).cuda()

    with torch.no_grad():
        original_output = layer(test_input)
        quantized_output = q_layer(test_input)

    error = (original_output - quantized_output).abs().mean()
    print(f"Quantization error: {error:.6f}")

    # Memory savings
    original_size = layer.weight.nelement() * layer.weight.element_size() / 1024 / 1024
    quantized_size = (
        qweight.nelement() * qweight.element_size() +
        scales.nelement() * scales.element_size()
    ) / 1024 / 1024

    print(f"Original size: {original_size:.2f} MB")
    print(f"Quantized size: {quantized_size:.2f} MB")
    print(f"Compression ratio: {original_size / quantized_size:.2f}x")

if __name__ == "__main__":
    if torch.cuda.is_available():
        awq_example()

18.3.4 Knowledge Distillation

Knowledge distillation is a key technique for shrinking models.

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict

class DistillationLoss(nn.Module):
    """
    Knowledge-distillation loss

    Combines hard labels and soft labels
    """
    def __init__(
        self,
        temperature: float = 3.0,
        alpha: float = 0.5
    ):
        """
        Args:
            temperature: softening temperature
            alpha: weight of the soft-label term
        """
        super().__init__()
        self.temperature = temperature
        self.alpha = alpha

    def forward(
        self,
        student_logits: torch.Tensor,
        teacher_logits: torch.Tensor,
        labels: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute the distillation loss

        Args:
            student_logits: student outputs [batch, num_classes]
            teacher_logits: teacher outputs [batch, num_classes]
            labels: ground-truth labels [batch]
        """
        # Hard-label loss
        hard_loss = F.cross_entropy(student_logits, labels)

        # Soft-label loss (KL divergence)
        soft_student = F.log_softmax(
            student_logits / self.temperature,
            dim=-1
        )
        soft_teacher = F.softmax(
            teacher_logits / self.temperature,
            dim=-1
        )
        soft_loss = F.kl_div(
            soft_student,
            soft_teacher,
            reduction='batchmean'
        ) * (self.temperature ** 2)

        # Combined loss
        total_loss = (
            self.alpha * soft_loss +
            (1 - self.alpha) * hard_loss
        )

        return total_loss

class DistillationTrainer:
    """Knowledge-distillation trainer"""
    def __init__(
        self,
        teacher_model: nn.Module,
        student_model: nn.Module,
        temperature: float = 3.0,
        alpha: float = 0.5,
        learning_rate: float = 1e-4
    ):
        self.teacher = teacher_model
        self.student = student_model

        # Freeze the teacher
        for param in self.teacher.parameters():
            param.requires_grad = False
        self.teacher.eval()

        # Loss and optimizer
        self.criterion = DistillationLoss(temperature, alpha)
        self.optimizer = torch.optim.AdamW(
            self.student.parameters(),
            lr=learning_rate
        )

    def train_step(
        self,
        inputs: torch.Tensor,
        labels: torch.Tensor
    ) -> float:
        """One training step"""
        self.student.train()

        # Teacher inference (no gradients)
        with torch.no_grad():
            teacher_logits = self.teacher(inputs)

        # Student inference
        student_logits = self.student(inputs)

        # Loss
        loss = self.criterion(student_logits, teacher_logits, labels)

        # Backward pass
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def evaluate(
        self,
        dataloader: torch.utils.data.DataLoader
    ) -> Dict[str, float]:
        """Evaluate the student model"""
        self.student.eval()

        total_loss = 0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, labels in dataloader:
                # Inference
                teacher_logits = self.teacher(inputs)
                student_logits = self.student(inputs)

                # Loss
                loss = self.criterion(
                    student_logits,
                    teacher_logits,
                    labels
                )
                total_loss += loss.item()

                # Accuracy
                _, predicted = student_logits.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

        return {
            'loss': total_loss / len(dataloader),
            'accuracy': 100. * correct / total
        }

# Full example: distilling BERT into a TinyBERT-sized student
def distill_bert_example():
    """BERT knowledge-distillation example"""
    from transformers import (
        BertForSequenceClassification,
        BertConfig
    )

    class LogitsWrapper(nn.Module):
        """HF models return ModelOutput objects; the trainer expects raw logits"""
        def __init__(self, model: nn.Module):
            super().__init__()
            self.model = model

        def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
            return self.model(input_ids).logits

    # Teacher model (BERT-base)
    teacher_config = BertConfig(
        hidden_size=768,
        num_hidden_layers=12,
        num_attention_heads=12,
        intermediate_size=3072,
        num_labels=2
    )
    teacher = LogitsWrapper(BertForSequenceClassification(teacher_config))

    # Student model (TinyBERT-sized)
    student_config = BertConfig(
        hidden_size=312,
        num_hidden_layers=4,
        num_attention_heads=12,
        intermediate_size=1200,
        num_labels=2
    )
    student = LogitsWrapper(BertForSequenceClassification(student_config))

    # Parameter counts
    teacher_params = sum(p.numel() for p in teacher.parameters())
    student_params = sum(p.numel() for p in student.parameters())

    print(f"Teacher parameters: {teacher_params:,} ({teacher_params/1e6:.1f}M)")
    print(f"Student parameters: {student_params:,} ({student_params/1e6:.1f}M)")
    print(f"Compression ratio: {teacher_params/student_params:.2f}x")

    # Build the distillation trainer
    trainer = DistillationTrainer(
        teacher_model=teacher,
        student_model=student,
        temperature=3.0,
        alpha=0.7
    )

    # Fake training data
    fake_inputs = torch.randint(0, 30000, (32, 128))
    fake_labels = torch.randint(0, 2, (32,))

    # One training step
    loss = trainer.train_step(fake_inputs, fake_labels)
    print(f"\nTraining loss: {loss:.4f}")

if __name__ == "__main__":
    distill_bert_example()

18.3.5 Edge Deployment: MLC-LLM

MLC-LLM (Machine Learning Compilation for LLMs) is a framework for deploying models on edge devices.

# MLC-LLM configuration example
"""
MLC-LLM deployment flow:

1. Quantize the model
2. Compile and optimize
3. Deploy to the device

Supported platforms:
- iOS (iPhone, iPad)
- Android
- WebGPU (browsers)
- CUDA, Metal, Vulkan
"""

# Configuration file mlc-config.json
mlc_config = {
    "model_name": "Llama-2-7B-Chat",
    "quantization": {
        "mode": "q4f16_1",  # 4-bit weights, 16-bit activations
        "group_size": 32,
        "symmetric": True
    },
    "context_window_size": 4096,
    "prefill_chunk_size": 2048,
    "tensor_parallel_shards": 1,
    "max_batch_size": 1
}

# Python deployment code
def deploy_with_mlc():
    """Deploy a model with MLC-LLM"""
    from mlc_chat import ChatModule
    from mlc_chat.callback import StreamToStdout

    # Load the model
    cm = ChatModule(
        model="Llama-2-7b-chat-hf-q4f16_1",
        device="cuda:0"
    )

    # Inference
    output = cm.generate(
        prompt="What is the meaning of life?",
        progress_callback=StreamToStdout()
    )

    print(output)

# iOS deployment example (Swift)
ios_code = """
import MLCChat

class ChatViewModel: ObservableObject {
    private var chat: ChatModule?
    @Published var messages: [String] = []

    init() {
        // Load the model
        chat = ChatModule(modelPath: "Llama-2-7b-chat-q4f16_1")
    }

    func sendMessage(_ text: String) {
        chat?.generate(prompt: text) { response in
            DispatchQueue.main.async {
                self.messages.append(response)
            }
        }
    }
}
"""

# Android deployment example (Kotlin)
android_code = """
import ai.mlc.mlcchat.ChatModule

class ChatViewModel : ViewModel() {
    private lateinit var chatModule: ChatModule
    private val _messages = MutableLiveData<List<String>>()
    val messages: LiveData<List<String>> = _messages

    init {
        // Load the model
        chatModule = ChatModule(
            modelPath = "Llama-2-7b-chat-q4f16_1",
            device = "opencl"
        )
    }

    fun sendMessage(text: String) {
        viewModelScope.launch {
            val response = chatModule.generate(text)
            _messages.value = _messages.value?.plus(response)
        }
    }
}
"""

print("MLC-LLM部署配置已生成")
print("\n支持的量化模式:")
print("- q3f16_1: 3bit权重")
print("- q4f16_1: 4bit权重(推荐)")
print("- q4f16_0: 4bit权重(快速)")
print("- q0f32: 无量化(精度最高)")

Typical numbers in practice (see the memory sketch below):

  • INT8 quantization: 2x memory compression, 1.5-2x inference speedup, <1% accuracy loss
  • INT4 quantization (GPTQ/AWQ): 4x compression, ~3x speedup, 1-3% accuracy loss
  • Knowledge distillation: 10-20x compression (BERT → TinyBERT), 2-5% accuracy loss
  • MLC-LLM: runs Llama-2-7B on an iPhone 14 Pro at roughly 10 tokens/s
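
To tie those ratios to concrete numbers, here is a rough sketch of the weight-memory footprint of a 7B-parameter model at different precisions (the group size and per-group scale overhead are illustrative assumptions):

# Approximate weight memory of a 7B model
params = 7e9

fp16 = params * 2                         # 2 bytes per parameter
int8 = params * 1                         # 1 byte per parameter (plus small scale overhead)
int4 = params * 0.5 + (params / 128) * 2  # 0.5 bytes + one fp16 scale per 128-weight group

for name, size in [("FP16", fp16), ("INT8", int8), ("INT4", int4)]:
    print(f"{name}: {size / 1024**3:.2f} GB")
# FP16: 13.04 GB, INT8: 6.52 GB, INT4: 3.36 GB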

18.4 Diffusion Models

Diffusion models have revolutionized image generation.

18.4.1 Principles of Diffusion Models

Core math:

import torch
import torch.nn as nn
import math
from typing import Tuple

class DDPMScheduler:
    """
    DDPM (Denoising Diffusion Probabilistic Models) scheduler

    Forward process (noising):
    q(x_t | x_0) = N(x_t; √(ᾱ_t) x_0, (1 - ᾱ_t)I)

    Reverse process (denoising):
    p_θ(x_{t-1} | x_t) = N(x_{t-1}; μ_θ(x_t, t), Σ_θ(x_t, t))
    """
    def __init__(
        self,
        num_train_timesteps: int = 1000,
        beta_start: float = 0.0001,
        beta_end: float = 0.02,
        beta_schedule: str = "linear"
    ):
        self.num_train_timesteps = num_train_timesteps

        # Build the beta schedule
        if beta_schedule == "linear":
            self.betas = torch.linspace(
                beta_start, beta_end, num_train_timesteps
            )
        elif beta_schedule == "cosine":
            self.betas = self._cosine_beta_schedule(
                num_train_timesteps
            )
        else:
            raise ValueError(f"Unknown schedule: {beta_schedule}")

        # Precompute frequently used terms
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
        self.alphas_cumprod_prev = torch.cat([
            torch.tensor([1.0]),
            self.alphas_cumprod[:-1]
        ])

        # Constants used during sampling
        self.sqrt_alphas_cumprod = torch.sqrt(self.alphas_cumprod)
        self.sqrt_one_minus_alphas_cumprod = torch.sqrt(
            1.0 - self.alphas_cumprod
        )

        # Posterior variance
        self.posterior_variance = (
            self.betas * (1.0 - self.alphas_cumprod_prev) /
            (1.0 - self.alphas_cumprod)
        )

    def _cosine_beta_schedule(self, timesteps: int, s: float = 0.008):
        """Cosine schedule (the improved-DDPM variant)"""
        steps = timesteps + 1
        x = torch.linspace(0, timesteps, steps)
        alphas_cumprod = torch.cos(
            ((x / timesteps) + s) / (1 + s) * math.pi * 0.5
        ) ** 2
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        return torch.clip(betas, 0.0001, 0.9999)

    def add_noise(
        self,
        original_samples: torch.Tensor,
        noise: torch.Tensor,
        timesteps: torch.Tensor
    ) -> torch.Tensor:
        """
        Forward process: add noise

        x_t = √(ᾱ_t) * x_0 + √(1 - ᾱ_t) * ε
        """
        # Move the precomputed tables to the device of the timestep indices
        # (they live on the CPU by default)
        sqrt_alpha_prod = self.sqrt_alphas_cumprod.to(
            timesteps.device
        )[timesteps]
        sqrt_one_minus_alpha_prod = self.sqrt_one_minus_alphas_cumprod.to(
            timesteps.device
        )[timesteps]

        # Reshape for broadcasting
        while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
            sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)
            sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)

        noisy_samples = (
            sqrt_alpha_prod * original_samples +
            sqrt_one_minus_alpha_prod * noise
        )

        return noisy_samples

    def step(
        self,
        model_output: torch.Tensor,
        timestep: int,
        sample: torch.Tensor
    ) -> torch.Tensor:
        """
        Reverse process: one denoising step (a simplified DDIM-style
        update plus posterior noise)

        Predict x_{t-1} given x_t and ε_θ(x_t, t)
        """
        t = timestep

        # 1. Predict the original sample from the model output
        alpha_prod_t = self.alphas_cumprod[t]
        beta_prod_t = 1 - alpha_prod_t

        # Predict x_0
        pred_original_sample = (
            sample - torch.sqrt(beta_prod_t) * model_output
        ) / torch.sqrt(alpha_prod_t)

        # 2. Compute the "direction pointing to x_t" term
        alpha_prod_t_prev = (
            self.alphas_cumprod_prev[t] if t > 0
            else torch.tensor(1.0)
        )

        pred_sample_direction = torch.sqrt(
            1 - alpha_prod_t_prev
        ) * model_output

        # 3. Compute x_{t-1}
        pred_prev_sample = (
            torch.sqrt(alpha_prod_t_prev) * pred_original_sample +
            pred_sample_direction
        )

        # 4. Add noise (except at the final step)
        if t > 0:
            noise = torch.randn_like(sample)
            variance = torch.sqrt(self.posterior_variance[t]) * noise
            pred_prev_sample = pred_prev_sample + variance

        return pred_prev_sample

class SimpleUNet(nn.Module):
    """A simplified UNet denoising network"""
    def __init__(
        self,
        in_channels: int = 3,
        out_channels: int = 3,
        channels: int = 128,
        time_emb_dim: int = 256
    ):
        super().__init__()

        # Time embedding: project the sinusoidal embedding up, then to the
        # bottleneck width so it can be added to the mid feature map
        self.time_mlp = nn.Sequential(
            nn.Linear(channels, time_emb_dim),
            nn.SiLU(),
            nn.Linear(time_emb_dim, channels * 4)
        )

        # Simplified UNet structure
        self.down1 = nn.Conv2d(in_channels, channels, 3, padding=1)
        self.down2 = nn.Conv2d(channels, channels * 2, 3, padding=1)
        self.down3 = nn.Conv2d(channels * 2, channels * 4, 3, padding=1)

        self.mid = nn.Conv2d(channels * 4, channels * 4, 3, padding=1)

        # Upsampling path; channel counts chosen to match the skip connections
        self.up1 = nn.ConvTranspose2d(channels * 4, channels * 4, 2, 2)
        self.up2 = nn.ConvTranspose2d(channels * 4, channels * 2, 2, 2)
        self.up3 = nn.ConvTranspose2d(channels * 2, channels, 2, 2)

        self.out = nn.Conv2d(channels, out_channels, 1)

    def sinusoidal_embedding(self, timesteps, dim):
        """Sinusoidal timestep embedding"""
        half_dim = dim // 2
        emb = math.log(10000) / (half_dim - 1)
        emb = torch.exp(
            torch.arange(half_dim, device=timesteps.device) * -emb
        )
        emb = timesteps[:, None] * emb[None, :]
        emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=-1)
        return emb

    def forward(self, x, timesteps):
        # Time embedding
        t_emb = self.sinusoidal_embedding(timesteps, 128)
        t_emb = self.time_mlp(t_emb)  # [batch, channels * 4]

        # Downsampling path (simplified)
        d1 = self.down1(x)                                # [B, C, H, W]
        d2 = self.down2(nn.functional.avg_pool2d(d1, 2))  # [B, 2C, H/2, W/2]
        d3 = self.down3(nn.functional.avg_pool2d(d2, 2))  # [B, 4C, H/4, W/4]

        # Bottleneck, conditioned on the timestep embedding
        m = self.mid(nn.functional.avg_pool2d(d3, 2))     # [B, 4C, H/8, W/8]
        m = m + t_emb[:, :, None, None]

        # Upsampling path with additive skip connections
        u1 = self.up1(m)          # [B, 4C, H/4, W/4]
        u2 = self.up2(u1 + d3)    # [B, 2C, H/2, W/2]
        u3 = self.up3(u2 + d2)    # [B, C, H, W]

        return self.out(u3 + d1)

# 训练循环
def train_ddpm():
    """DDPM训练示例"""
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # 创建模型和调度器
    model = SimpleUNet(in_channels=3, out_channels=3).to(device)
    scheduler = DDPMScheduler(num_train_timesteps=1000)

    # 优化器
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    # 训练步骤
    def train_step(images):
        optimizer.zero_grad()

        batch_size = images.shape[0]

        # 随机选择时间步
        timesteps = torch.randint(
            0, scheduler.num_train_timesteps, (batch_size,),
            device=device
        )

        # 生成噪声
        noise = torch.randn_like(images)

        # 添加噪声
        noisy_images = scheduler.add_noise(images, noise, timesteps)

        # 预测噪声
        noise_pred = model(noisy_images, timesteps)

        # 计算损失
        loss = torch.nn.functional.mse_loss(noise_pred, noise)

        # 反向传播
        loss.backward()
        optimizer.step()

        return loss.item()

    # 模拟训练
    for step in range(10):
        fake_images = torch.randn(4, 3, 64, 64, device=device)
        loss = train_step(fake_images)
        print(f"Step {step}: Loss = {loss:.4f}")

    # 采样
    @torch.no_grad()
    def sample(num_samples=1):
        model.eval()

        # 从纯噪声开始
        image = torch.randn(num_samples, 3, 64, 64, device=device)

        # 逐步去噪
        for t in reversed(range(scheduler.num_train_timesteps)):
            timestep = torch.tensor([t], device=device).repeat(num_samples)

            # 预测噪声
            noise_pred = model(image, timestep)

            # 去噪一步
            image = scheduler.step(noise_pred, t, image)

            if t % 100 == 0:
                print(f"Sampling step {t}")

        return image

    # 生成图像
    print("\n生成图像...")
    generated = sample(num_samples=4)
    print(f"生成完成!形状: {generated.shape}")

if __name__ == "__main__":
    train_ddpm()

18.4.2 Stable Diffusion详解

Stable Diffusion架构:

文本 → CLIP Text Encoder → 文本嵌入
    ↓
潜在扩散(Latent Diffusion):
    图像 → VAE Encoder → 潜在空间 (压缩8倍)
    潜在空间 + 文本嵌入 → U-Net去噪 → 去噪潜在变量
    去噪潜在变量 → VAE Decoder → 生成图像

核心组件:

  1. CLIP Text Encoder: 理解文本提示
  2. VAE Encoder/Decoder: 压缩/解压图像
  3. U-Net: 在潜在空间去噪
  4. 调度器: 控制去噪过程

使用diffusers库的最小生成示例:
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler
import torch

# 使用Stable Diffusion生成图像
def generate_with_sd():
    """使用Stable Diffusion生成图像"""

    # 加载模型
    model_id = "stabilityai/stable-diffusion-2-1"
    pipe = StableDiffusionPipeline.from_pretrained(
        model_id,
        torch_dtype=torch.float16
    )

    # 使用更快的调度器
    pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)

    # 移到GPU
    pipe = pipe.to("cuda")

    # 文本提示
    prompt = """
    A serene landscape at sunset, with mountains in the background,
    a calm lake reflecting the sky, and cherry blossoms in the foreground.
    Ultra detailed, 8k, cinematic lighting, highly detailed.
    """

    negative_prompt = """
    blurry, low quality, distorted, ugly, bad anatomy
    """

    # 生成
    with torch.autocast("cuda"):
        images = pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=25,  # 推理步数
            guidance_scale=7.5,       # CFG scale
            width=768,
            height=768,
            num_images_per_prompt=4  # 生成4张
        ).images

    # 保存
    for i, image in enumerate(images):
        image.save(f"output_{i}.png")

    return images
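
其中guidance_scale对应Classifier-Free Guidance(CFG)的强度 s:每一步去噪同时计算有/无条件两个噪声估计,再沿差值方向外推:

ε̂ = ε_uncond + s · (ε_cond − ε_uncond)

s越大生成越贴合提示词,但过大容易过饱和与失真;在diffusers中,负面提示词实际替换的是无条件分支的文本嵌入。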

18.4.3 ControlNet与LoRA

ControlNet - 可控生成:

from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
from PIL import Image
import torch
import cv2
import numpy as np

def use_controlnet():
    """使用ControlNet进行可控生成"""

    # 加载ControlNet模型(Canny边缘检测)
    controlnet = ControlNetModel.from_pretrained(
        "lllyasviel/control_v11p_sd15_canny",
        torch_dtype=torch.float16
    )

    # 创建pipeline
    pipe = StableDiffusionControlNetPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5",
        controlnet=controlnet,
        torch_dtype=torch.float16
    )
    pipe.to("cuda")

    # 准备控制图像
    image = load_image("input.jpg")
    image = np.array(image)

    # Canny边缘检测
    low_threshold = 100
    high_threshold = 200
    image = cv2.Canny(image, low_threshold, high_threshold)
    image = image[:, :, None]
    image = np.concatenate([image, image, image], axis=2)
    control_image = Image.fromarray(image)

    # 生成
    prompt = "a professional photograph of a beautiful landscape"
    output = pipe(
        prompt=prompt,
        image=control_image,
        num_inference_steps=20,
        controlnet_conditioning_scale=1.0
    ).images[0]

    output.save("controlnet_output.png")
    return output

LoRA - 高效微调:

from diffusers import DiffusionPipeline
import torch

def use_lora():
    """使用LoRA风格化生成"""

    # 加载基础模型
    pipe = DiffusionPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5",
        torch_dtype=torch.float16
    )
    pipe.to("cuda")

    # 加载LoRA权重
    pipe.load_lora_weights("style_lora_weights.safetensors")

    # 生成
    prompt = "a portrait of a woman in anime style"
    image = pipe(
        prompt=prompt,
        num_inference_steps=30,
        guidance_scale=7.5
    ).images[0]

    image.save("lora_output.png")
    return image

# 训练自定义LoRA
def train_custom_lora():
    """训练自定义LoRA"""

    from peft import LoraConfig, get_peft_model
    from diffusers import StableDiffusionPipeline

    # LoRA配置
    lora_config = LoraConfig(
        r=8,  # LoRA rank
        lora_alpha=32,
        target_modules=["to_q", "to_k", "to_v", "to_out.0"],
        lora_dropout=0.05,
        bias="none"
    )

    # 应用LoRA到模型
    model = StableDiffusionPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5"
    ).unet

    model = get_peft_model(model, lora_config)

    # 训练参数
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    all_params = sum(p.numel() for p in model.parameters())

    print(f"可训练参数: {trainable_params:,} ({trainable_params/all_params*100:.2f}%)")
    print(f"总参数: {all_params:,}")

    return model

18.4.4 SDXL与Stable Diffusion 3

SDXL架构特点:

  • 更大的U-Net骨干网络
  • 双文本编码器(CLIP ViT-L + OpenCLIP ViT-bigG)
  • 原生1024×1024训练分辨率
  • 可选的Refiner精修模型

使用示例:

from diffusers import StableDiffusionXLPipeline
import torch

def use_sdxl():
    """使用SDXL生成高分辨率图像"""

    # 加载SDXL
    pipe = StableDiffusionXLPipeline.from_pretrained(
        "stabilityai/stable-diffusion-xl-base-1.0",
        torch_dtype=torch.float16,
        variant="fp16",
        use_safetensors=True
    )
    pipe.to("cuda")

    # 生成1024x1024图像
    prompt = """
    masterpiece, best quality, ultra-detailed, 8k wallpaper,
    a futuristic city at night with neon lights,
    cyberpunk style, highly detailed architecture
    """

    negative_prompt = "blurry, low quality, distorted"

    image = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        num_inference_steps=40,
        guidance_scale=7.5,
        width=1024,
        height=1024
    ).images[0]

    image.save("sdxl_output.png")
    return image

Stable Diffusion 3 特点:

  • 使用Diffusion Transformer (DiT)架构(核心块见下方示意)
  • 多模态理解能力更强
  • 文本渲染改进
  • 更好的提示词理解
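
下面用一个极简的DiT块勾勒这一架构的核心:条件向量(时间步+文本嵌入)通过adaLN调制Transformer块。这是假设性的示意实现,维度与命名仅为演示,并非SD3真实代码:

import torch
import torch.nn as nn

class DiTBlock(nn.Module):
    """极简DiT块:adaLN调制 + 自注意力 + MLP(仅示意)"""
    def __init__(self, dim: int = 384, num_heads: int = 6):
        super().__init__()
        self.norm1 = nn.LayerNorm(dim, elementwise_affine=False)
        self.attn = nn.MultiheadAttention(dim, num_heads, batch_first=True)
        self.norm2 = nn.LayerNorm(dim, elementwise_affine=False)
        self.mlp = nn.Sequential(
            nn.Linear(dim, dim * 4), nn.GELU(), nn.Linear(dim * 4, dim)
        )
        # 由条件向量回归6组调制参数;真实实现通常将其零初始化(adaLN-Zero)
        self.ada_ln = nn.Linear(dim, dim * 6)

    def forward(self, x: torch.Tensor, cond: torch.Tensor) -> torch.Tensor:
        # x: [B, N, dim] patch token序列; cond: [B, dim] 条件嵌入
        s1, b1, g1, s2, b2, g2 = self.ada_ln(cond).unsqueeze(1).chunk(6, dim=-1)
        h = self.norm1(x) * (1 + s1) + b1
        x = x + g1 * self.attn(h, h, h)[0]
        h = self.norm2(x) * (1 + s2) + b2
        return x + g2 * self.mlp(h)

block = DiTBlock()
tokens = torch.randn(2, 256, 384)  # 2个样本,16×16=256个patch token
cond = torch.randn(2, 384)
print(block(tokens, cond).shape)   # torch.Size([2, 256, 384])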

18.5 AI视频生成

AI视频生成是2024年最激动人心的突破之一。

18.5.1 Sora技术解析

Sora (OpenAI, 2024)

能力:
  - 最长60秒视频
  - 分辨率: 1920×1080
  - 多镜头切换
  - 复杂场景理解

技术架构:
  - Diffusion Transformer (DiT)
  - Spacetime Patches(时空补丁,切分方式见下方示意)
  - 可变分辨率/时长训练
  - 视频压缩网络

创新点:
  1. 统一的视觉数据表示
  2. 端到端学习时空一致性
  3. 文本→视频的强大理解
  4. 物理世界模拟能力

示例提示词:
  "A stylish woman walks down a Tokyo street filled with warm glowing neon
   and animated city signage. She wears a black leather jacket, a long red
   dress, and black boots, and carries a black purse."
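
时空补丁是Sora的关键输入表示:把视频张量同时沿时间和空间切块,展平成统一的token序列。下面是一个示意性切分函数(形状约定为假设,Sora真实实现未公开):

import torch

def to_spacetime_patches(video, pt=2, ph=16, pw=16):
    """video: [B, C, T, H, W] -> token序列 [B, N, pt*ph*pw*C]"""
    B, C, T, H, W = video.shape
    assert T % pt == 0 and H % ph == 0 and W % pw == 0
    x = video.reshape(B, C, T // pt, pt, H // ph, ph, W // pw, pw)
    # 重排为 [B, T', H', W', pt, ph, pw, C],再把每个补丁展平成一个token
    x = x.permute(0, 2, 4, 6, 3, 5, 7, 1)
    return x.flatten(4).flatten(1, 3)

video = torch.randn(1, 3, 16, 64, 64)  # 16帧的64×64视频
patches = to_spacetime_patches(video)
print(patches.shape)  # torch.Size([1, 128, 1536]) = [B, 8*4*4, 2*16*16*3]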

18.5.2 视频生成模型对比

1. Runway Gen-2 & Gen-3

Gen-2 (2023):
  - 4秒视频
  - 文本/图像→视频
  - 商用产品

Gen-3 (2024):
  - 10秒视频
  - 更高保真度
  - 更好的运动控制
  - API可用

2. Pika Labs

特点:
  - 3秒视频
  - 易用性强
  - 网页界面
  - 图像动画化

Pika 1.0:
  - 更长视频
  - 编辑功能
  - 风格控制

3. Stable Video Diffusion

from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
import torch

def generate_video_svd():
    """使用SVD生成视频"""

    pipe = StableVideoDiffusionPipeline.from_pretrained(
        "stabilityai/stable-video-diffusion-img2vid-xt",
        torch_dtype=torch.float16,
        variant="fp16"
    )
    pipe.to("cuda")

    # 输入图像
    image = load_image("input.jpg")
    image = image.resize((1024, 576))

    # 生成视频帧
    frames = pipe(
        image,
        decode_chunk_size=8,
        num_frames=25,  # 生成25帧
        motion_bucket_id=127,  # 运动强度(0-255)
        noise_aug_strength=0.02
    ).frames[0]

    # 导出视频
    export_to_video(frames, "output.mp4", fps=7)

    print(f"生成 {len(frames)} 帧视频")
    return frames

18.5.3 视频生成的挑战

技术挑战:

  1. 时间一致性: 前后帧保持连贯,避免闪烁
  2. 物理真实性: 运动符合物理规律
  3. 高分辨率: 视频需要更高质量
  4. 计算成本: 视频≈图像×帧数,成本随帧数线性增长,时序注意力还会带来额外的平方级开销
  5. 可控性: 精确控制内容、镜头、运动

未来方向:

  • 更长时长(数分钟)
  • 实时生成
  • 交互式编辑
  • 多模态控制(文本+草图+音频)

18.6 开源模型生态

18.6.1 LLaMA系列演进

Meta LLaMA:

LLaMA 3.1 (2024.07):
  规模: 8B, 70B, 405B
  数据: 15T+ tokens
  特点:
    - 开源最强模型(405B)
    - 多语言(8种)
    - 128K上下文窗口
    - 数学推理增强
  性能: 405B接近GPT-4
  许可: Llama 3.1 (可商用)

对比:
  LLaMA 3.1 405B vs GPT-4:
    - MMLU: 88.6% vs 86.4%
    - GSM8K: 96.8% vs 92.0%
    - HumanEval: 89.0% vs 67.0%
    - 成本: 开源免费 vs 闭源付费
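
下面是通过transformers加载并调用LLaMA 3.1指令模型的最小示例(模型仓库名以HuggingFace Hub实际页面为准,需先申请访问授权;这里以8B为例以便单卡运行):

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_id = "meta-llama/Meta-Llama-3.1-8B-Instruct"  # 需先在Hub获得授权
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,  # 较新GPU上推荐bf16
    device_map="auto"            # 自动分配到可用GPU
)

messages = [{"role": "user", "content": "用一句话解释MoE架构"}]
inputs = tokenizer.apply_chat_template(
    messages, add_generation_prompt=True, return_tensors="pt"
).to(model.device)

outputs = model.generate(inputs, max_new_tokens=128)
print(tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True))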

18.6.2 中国开源模型

Qwen (通义千问)

Qwen2 (2024.06):
  规模: 0.5B - 72B
  创新:
    - 支持29种语言
    - 128K上下文
    - 代码和数学大幅提升
    - 极强的中文能力

性能:
  Qwen2-72B:
    - MMLU: 84.2%
    - C-Eval (中文): 91.6%
    - HumanEval: 64.6%

DeepSeek

DeepSeek-V2 (2024):
  架构: MoE (236B总参数, 21B激活)
  创新:
    - 多头潜在注意力(MLA,原理示意见下)
    - 极致成本效率
    - 128K上下文

性能:
  - 接近GPT-4水平
  - 训练成本仅GPT-4的1/10
  - 开源可商用
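
MLA的核心思想:KV cache里只缓存一个低秩潜在向量c,计算注意力时再升维出K/V,从而大幅压缩显存。下面是忽略RoPE与多头细节的极简示意(假设性实现,非DeepSeek官方代码):

import torch
import torch.nn as nn

class LatentKV(nn.Module):
    """MLA思想示意:缓存低秩的c,而非完整K/V"""
    def __init__(self, d_model=512, d_latent=64, d_head=64):
        super().__init__()
        self.down = nn.Linear(d_model, d_latent)  # 压缩:只有c进入KV cache
        self.up_k = nn.Linear(d_latent, d_head)   # 用时再升维出K
        self.up_v = nn.Linear(d_latent, d_head)   # 和V

    def forward(self, h):
        c = self.down(h)  # [B, T, d_latent],实际缓存的对象
        return self.up_k(c), self.up_v(c)

kv = LatentKV()
h = torch.randn(2, 10, 512)
k, v = kv(h)
print(k.shape, v.shape)  # 每token只需缓存64维的c,而非完整K+V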

18.6.3 开源模型排行榜 (2024)

模型             参数    MMLU    AlpacaEval   许可         发布时间
LLaMA 3.1 405B   405B    88.6%   -            Llama 3.1    2024.07
LLaMA 3.1 70B    70B     86.0%   51%          Llama 3.1    2024.07
Qwen2 72B        72B     84.2%   42%          Apache 2.0   2024.06
Mixtral 8x22B    141B    77.8%   38%          Apache 2.0   2024.04
DeepSeek-V2      236B    81.7%   -            MIT          2024.05
Mistral 7B       7B      62.5%   23%          Apache 2.0   2023.09

18.7 本章总结

本章深入探讨了AI领域的前沿技术趋势:

核心要点:

  1. MoE架构: 大模型新范式,平衡规模与效率(Mixtral 8x7B, DeepSeek-V2)
  2. 长上下文: 从4K到百万token的突破(Gemini 1.5 Pro 1M tokens)
  3. 模型量化: GPTQ、AWQ等技术实现4x-8x压缩,精度损失<3%
  4. Diffusion模型: 从DDPM到Stable Diffusion,革命性图像生成
  5. AI视频生成: Sora引领,60秒高质量视频生成
  6. 开源生态: LLaMA 3.1 405B等推动AI民主化

实践建议:

  • 关注MoE架构在大规模模型中的应用
  • 掌握量化技术(GPTQ、AWQ、bitsandbytes),优化部署(4-bit加载示例见下方)
  • 学习Diffusion原理,应用于创意生成
  • 跟踪开源模型进展,选择合适工具
  • 实验长上下文技术,拓展应用场景
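
作为上面量化建议的落地示例,下面用transformers的BitsAndBytesConfig做4-bit NF4加载(模型名仅为示例,可替换为任何支持的模型):

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",              # NF4量化类型
    bnb_4bit_compute_dtype=torch.bfloat16,  # 计算时反量化为bf16
    bnb_4bit_use_double_quant=True          # 二次量化进一步省显存
)

model_id = "mistralai/Mistral-7B-Instruct-v0.2"  # 示例模型
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    device_map="auto"
)
print(f"显存占用: {model.get_memory_footprint() / 1e9:.1f} GB")  # 约为fp16的1/3~1/4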

未来展望:

  • MoE将成为主流架构
  • 百万级上下文将普及
  • 极致量化(INT2/INT1)
  • 多模态统一模型
  • 实时视频生成

参考资源

论文:

  • Mixtral of Experts (Mistral AI, 2023)
  • FlashAttention-2 (Dao et al., 2023)
  • GPTQ: Accurate Post-Training Quantization (Frantar et al., 2023)
  • AWQ: Activation-aware Weight Quantization (Lin et al., 2023)
  • Denoising Diffusion Probabilistic Models (Ho et al., 2020)
  • High-Resolution Image Synthesis with Latent Diffusion Models (Rombach et al., 2022)

代码库:

  • https://github.com/mistralai/mistral-src
  • https://github.com/Dao-AILab/flash-attention
  • https://github.com/huggingface/diffusers
  • https://github.com/vllm-project/vllm
  • https://github.com/AutoGPTQ/AutoGPTQ
  • https://github.com/mit-han-lab/llm-awq

工具:

  • HuggingFace Transformers
  • bitsandbytes
  • llama.cpp
  • vLLM
  • AUTOMATIC1111/stable-diffusion-webui