HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

GPU 虚拟化技术

概述

GPU 虚拟化是 AI 基础设施的关键技术,它允许多个工作负载共享物理 GPU 资源,提高资源利用率,降低成本。本文深入探讨 GPU 虚拟化的原理、主流方案及实践。

GPU 虚拟化原理

虚拟化层次

┌─────────────────────────────────────────────────────────────────┐
│                    GPU 虚拟化架构层次                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  应用层虚拟化                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  容器化方案   │  │  框架级隔离   │  │  进程级共享   │             │
│  │ (Docker GPU) │  │ (TF/PyTorch)│  │  (MPS/MIG)  │             │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘             │
│         │                │                │                     │
│  ───────┴────────────────┴────────────────┴─────────────        │
│                                                                 │
│  驱动层虚拟化                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  API 转发    │  │  显存虚拟化   │  │  算力分片    │             │
│  │ (vGPU/GRID) │  │ (Memory Pool)│  │ (Time Slice)│             │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘             │
│         │                │                │                     │
│  ───────┴────────────────┴────────────────┴─────────────        │
│                                                                 │
│  硬件层虚拟化                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │    SR-IOV   │  │     MIG     │  │   硬件分区    │             │
│  │  (PCIe VF)  │  │ (A100/H100) │  │  (Gaudi/TPU) │             │
│  └─────────────┘  └─────────────┘  └─────────────┘             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

资源隔离维度

# GPU 虚拟化资源隔离
resource_isolation:
  # 算力隔离
  compute:
    - sm_partitioning      # SM 分区
    - time_slicing         # 时间片
    - priority_scheduling  # 优先级调度

  # 显存隔离
  memory:
    - physical_partition   # 物理分区 (MIG)
    - virtual_memory       # 虚拟显存
    - memory_limit         # 显存限制

  # 带宽隔离
  bandwidth:
    - nvlink_partition     # NVLink 分区
    - pcie_bandwidth       # PCIe 带宽
    - memory_bandwidth     # 显存带宽

  # 故障隔离
  fault:
    - ecc_isolation        # ECC 错误隔离
    - process_isolation    # 进程级隔离
    - vm_isolation         # 虚拟机隔离

NVIDIA MIG 技术

MIG 架构详解

┌─────────────────────────────────────────────────────────────────┐
│                   NVIDIA A100 MIG 架构                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Full GPU (80GB HBM2e, 108 SMs)                                │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                                                         │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐       │   │
│  │  │  GPC 0  │ │  GPC 1  │ │  GPC 2  │ │  GPC 3  │       │   │
│  │  │ 14 SMs  │ │ 14 SMs  │ │ 14 SMs  │ │ 14 SMs  │       │   │
│  │  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘       │   │
│  │       │           │           │           │             │   │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐       │   │
│  │  │  GPC 4  │ │  GPC 5  │ │  GPC 6  │ │  GPC 7  │       │   │
│  │  │ 14 SMs  │ │ 14 SMs  │ │ 14 SMs  │ │ 14 SMs  │       │   │
│  │  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘       │   │
│  │       │           │           │           │             │   │
│  │  ═════╪═══════════╪═══════════╪═══════════╪═════════   │   │
│  │       │    Memory Controller & L2 Cache   │             │   │
│  │  ═════╪═══════════╪═══════════╪═══════════╪═════════   │   │
│  │       │           │           │           │             │   │
│  │  ┌─────────────────────────────────────────────┐       │   │
│  │  │              HBM2e Memory (80GB)             │       │   │
│  │  │  Slice 0 │ Slice 1 │ ... │ Slice 7          │       │   │
│  │  │   10GB   │  10GB   │     │  10GB             │       │   │
│  │  └─────────────────────────────────────────────┘       │   │
│  │                                                         │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                 │
│  MIG 分区示例:                                                   │
│  ┌───────────────────┬───────────────────┬─────────────────┐   │
│  │   1g.10gb (×7)    │   2g.20gb (×3)    │   3g.40gb (×2)  │   │
│  │   7个小实例        │   3个中实例        │    2个大实例     │   │
│  └───────────────────┴───────────────────┴─────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

MIG 配置管理

"""
NVIDIA MIG 配置管理工具
"""
import subprocess
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum


class MIGProfile(Enum):
    """MIG 配置文件"""
    PROFILE_1G_5GB = "1g.5gb"      # A30
    PROFILE_1G_10GB = "1g.10gb"    # A100-40GB
    PROFILE_1G_20GB = "1g.20gb"    # A100-80GB
    PROFILE_2G_10GB = "2g.10gb"    # A30
    PROFILE_2G_20GB = "2g.20gb"    # A100-40GB
    PROFILE_3G_20GB = "3g.20gb"    # A30
    PROFILE_3G_40GB = "3g.40gb"    # A100-40GB
    PROFILE_4G_40GB = "4g.40gb"    # A100-80GB
    PROFILE_7G_40GB = "7g.40gb"    # A100-40GB Full
    PROFILE_7G_80GB = "7g.80gb"    # A100-80GB Full


@dataclass
class GPUInstance:
    """GPU 实例"""
    gi_id: int
    profile: str
    placement_start: int
    placement_size: int


@dataclass
class ComputeInstance:
    """计算实例"""
    ci_id: int
    gi_id: int
    profile: str


class MIGManager:
    """MIG 管理器"""

    def __init__(self, gpu_index: int = 0):
        self.gpu_index = gpu_index
        self._validate_mig_support()

    def _validate_mig_support(self):
        """验证 MIG 支持"""
        result = self._run_nvidia_smi([
            "nvidia-smi", "-i", str(self.gpu_index),
            "--query-gpu=mig.mode.current",
            "--format=csv,noheader"
        ])

        if "Enabled" not in result and "[N/A]" in result:
            raise RuntimeError(f"GPU {self.gpu_index} does not support MIG")

    def _run_nvidia_smi(self, cmd: List[str]) -> str:
        """执行 nvidia-smi 命令"""
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Command failed: {result.stderr}")
        return result.stdout.strip()

    def enable_mig(self) -> bool:
        """启用 MIG 模式"""
        try:
            self._run_nvidia_smi([
                "nvidia-smi", "-i", str(self.gpu_index),
                "-mig", "1"
            ])
            print(f"MIG enabled on GPU {self.gpu_index}. Reboot required.")
            return True
        except Exception as e:
            print(f"Failed to enable MIG: {e}")
            return False

    def disable_mig(self) -> bool:
        """禁用 MIG 模式"""
        try:
            # 首先销毁所有实例
            self.destroy_all_instances()

            self._run_nvidia_smi([
                "nvidia-smi", "-i", str(self.gpu_index),
                "-mig", "0"
            ])
            print(f"MIG disabled on GPU {self.gpu_index}. Reboot required.")
            return True
        except Exception as e:
            print(f"Failed to disable MIG: {e}")
            return False

    def get_mig_status(self) -> Dict:
        """获取 MIG 状态"""
        # 获取 MIG 模式
        mode = self._run_nvidia_smi([
            "nvidia-smi", "-i", str(self.gpu_index),
            "--query-gpu=mig.mode.current",
            "--format=csv,noheader"
        ])

        # 获取 GPU 实例
        gi_output = self._run_nvidia_smi([
            "nvidia-smi", "mig", "-lgi", "-i", str(self.gpu_index)
        ])

        # 获取计算实例
        ci_output = self._run_nvidia_smi([
            "nvidia-smi", "mig", "-lci", "-i", str(self.gpu_index)
        ])

        return {
            "gpu_index": self.gpu_index,
            "mig_mode": mode,
            "gpu_instances": gi_output,
            "compute_instances": ci_output
        }

    def list_available_profiles(self) -> List[Dict]:
        """列出可用的 MIG 配置"""
        output = self._run_nvidia_smi([
            "nvidia-smi", "mig", "-lgip", "-i", str(self.gpu_index)
        ])

        profiles = []
        for line in output.split('\n'):
            if 'MIG' in line or 'GPU' in line or '=' in line:
                continue
            parts = line.split()
            if len(parts) >= 4:
                profiles.append({
                    "id": parts[0],
                    "instances": parts[1],
                    "memory": parts[2],
                    "sms": parts[3] if len(parts) > 3 else "N/A"
                })

        return profiles

    def create_gpu_instance(self, profile: str) -> Optional[int]:
        """创建 GPU 实例"""
        try:
            output = self._run_nvidia_smi([
                "nvidia-smi", "mig", "-cgi", profile,
                "-i", str(self.gpu_index)
            ])

            # 解析输出获取 GI ID
            for line in output.split('\n'):
                if 'Successfully created GPU instance' in line:
                    gi_id = int(line.split('ID')[1].split()[0])
                    print(f"Created GPU instance {gi_id} with profile {profile}")
                    return gi_id

            return None
        except Exception as e:
            print(f"Failed to create GPU instance: {e}")
            return None

    def create_compute_instance(self, gi_id: int, profile: str = None) -> Optional[int]:
        """创建计算实例"""
        try:
            cmd = [
                "nvidia-smi", "mig", "-cci",
                "-gi", str(gi_id),
                "-i", str(self.gpu_index)
            ]

            if profile:
                cmd.insert(3, profile)

            output = self._run_nvidia_smi(cmd)

            for line in output.split('\n'):
                if 'Successfully created compute instance' in line:
                    ci_id = int(line.split('ID')[1].split()[0])
                    print(f"Created compute instance {ci_id} on GPU instance {gi_id}")
                    return ci_id

            return None
        except Exception as e:
            print(f"Failed to create compute instance: {e}")
            return None

    def destroy_gpu_instance(self, gi_id: int) -> bool:
        """销毁 GPU 实例"""
        try:
            # 首先销毁所有关联的计算实例
            self._run_nvidia_smi([
                "nvidia-smi", "mig", "-dci",
                "-gi", str(gi_id),
                "-i", str(self.gpu_index)
            ])

            # 销毁 GPU 实例
            self._run_nvidia_smi([
                "nvidia-smi", "mig", "-dgi",
                "-gi", str(gi_id),
                "-i", str(self.gpu_index)
            ])

            print(f"Destroyed GPU instance {gi_id}")
            return True
        except Exception as e:
            print(f"Failed to destroy GPU instance: {e}")
            return False

    def destroy_all_instances(self) -> bool:
        """销毁所有实例"""
        try:
            # 销毁所有计算实例
            self._run_nvidia_smi([
                "nvidia-smi", "mig", "-dci",
                "-i", str(self.gpu_index)
            ])

            # 销毁所有 GPU 实例
            self._run_nvidia_smi([
                "nvidia-smi", "mig", "-dgi",
                "-i", str(self.gpu_index)
            ])

            print("Destroyed all MIG instances")
            return True
        except Exception as e:
            print(f"Failed to destroy instances: {e}")
            return False

    def apply_profile(self, profile_config: List[str]) -> bool:
        """
        应用 MIG 配置

        Args:
            profile_config: 配置列表,如 ["1g.10gb", "1g.10gb", "2g.20gb"]
        """
        try:
            # 销毁现有实例
            self.destroy_all_instances()

            # 创建新实例
            for profile in profile_config:
                gi_id = self.create_gpu_instance(profile)
                if gi_id is not None:
                    self.create_compute_instance(gi_id)

            return True
        except Exception as e:
            print(f"Failed to apply profile: {e}")
            return False


class MIGProfileTemplates:
    """预定义的 MIG 配置模板"""

    # A100-40GB 配置模板
    A100_40GB_TEMPLATES = {
        # 7 个小实例 - 适合推理
        "inference_small": ["1g.5gb"] * 7,

        # 3 个中等实例 - 平衡配置
        "balanced": ["2g.10gb", "2g.10gb", "3g.20gb"],

        # 2 个大实例 - 适合训练
        "training": ["3g.20gb", "4g.20gb"],

        # 1 大 + 多小 - 混合工作负载
        "mixed": ["4g.20gb", "1g.5gb", "1g.5gb", "1g.5gb"],

        # 全 GPU - 单任务
        "full": ["7g.40gb"]
    }

    # A100-80GB 配置模板
    A100_80GB_TEMPLATES = {
        "inference_small": ["1g.10gb"] * 7,
        "balanced": ["2g.20gb", "2g.20gb", "3g.40gb"],
        "training": ["3g.40gb", "4g.40gb"],
        "mixed": ["4g.40gb", "1g.10gb", "1g.10gb", "1g.10gb"],
        "full": ["7g.80gb"]
    }

    @classmethod
    def get_template(cls, gpu_model: str, template_name: str) -> List[str]:
        """获取配置模板"""
        if "80GB" in gpu_model or "80G" in gpu_model:
            templates = cls.A100_80GB_TEMPLATES
        else:
            templates = cls.A100_40GB_TEMPLATES

        return templates.get(template_name, templates["balanced"])


# Kubernetes MIG 设备插件配置
class KubernetesMIGConfig:
    """Kubernetes MIG 配置生成器"""

    @staticmethod
    def generate_device_plugin_config(
        strategy: str = "mixed",
        mig_configs: Dict[str, List[str]] = None
    ) -> Dict:
        """
        生成设备插件配置

        Args:
            strategy: single | mixed | none
            mig_configs: GPU 索引到 MIG 配置的映射
        """
        config = {
            "version": "v1",
            "sharing": {
                "mig": {
                    "strategy": strategy,
                    "devices": {}
                }
            }
        }

        if mig_configs:
            for gpu_index, profiles in mig_configs.items():
                config["sharing"]["mig"]["devices"][gpu_index] = profiles

        return config

    @staticmethod
    def generate_resource_request(profile: str, count: int = 1) -> Dict:
        """生成资源请求"""
        # MIG 资源格式: nvidia.com/mig-<profile>
        resource_name = f"nvidia.com/mig-{profile}"

        return {
            "resources": {
                "limits": {
                    resource_name: str(count)
                },
                "requests": {
                    resource_name: str(count)
                }
            }
        }


# 使用示例
if __name__ == "__main__":
    # MIG 管理示例
    # mig = MIGManager(gpu_index=0)
    # mig.apply_profile(["2g.20gb", "2g.20gb", "3g.40gb"])

    # Kubernetes 配置示例
    k8s_config = KubernetesMIGConfig.generate_device_plugin_config(
        strategy="mixed",
        mig_configs={
            "0": ["1g.10gb", "1g.10gb", "2g.20gb", "3g.40gb"],
            "1": ["7g.80gb"]  # 全 GPU
        }
    )
    print(json.dumps(k8s_config, indent=2))

MIG Kubernetes 集成

# nvidia-device-plugin-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: nvidia-device-plugin-config
  namespace: kube-system
data:
  config.yaml: |
    version: v1
    flags:
      migStrategy: "mixed"  # none, single, mixed
    sharing:
      timeSlicing:
        renameByDefault: false
        resources:
          - name: nvidia.com/gpu
            replicas: 2  # 时间片复制因子

---
# MIG 配置 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: mig-parted-config
  namespace: gpu-operator
data:
  config.yaml: |
    version: v1
    mig-configs:
      # 推理优化配置
      all-1g.10gb:
        - devices: all
          mig-enabled: true
          mig-devices:
            "1g.10gb": 7

      # 平衡配置
      all-balanced:
        - devices: all
          mig-enabled: true
          mig-devices:
            "2g.20gb": 2
            "3g.40gb": 1

      # 训练配置
      all-3g.40gb:
        - devices: all
          mig-enabled: true
          mig-devices:
            "3g.40gb": 2

      # 混合配置 - 不同节点不同策略
      custom-config:
        - devices: [0, 1]
          mig-enabled: true
          mig-devices:
            "1g.10gb": 7
        - devices: [2, 3]
          mig-enabled: true
          mig-devices:
            "3g.40gb": 2
        - devices: [4, 5, 6, 7]
          mig-enabled: false

---
# 使用 MIG 实例的 Pod
apiVersion: v1
kind: Pod
metadata:
  name: mig-inference-pod
spec:
  containers:
  - name: inference
    image: nvcr.io/nvidia/pytorch:23.10-py3
    resources:
      limits:
        nvidia.com/mig-1g.10gb: 1  # 请求 1g.10gb MIG 实例
    command: ["python", "inference.py"]
  nodeSelector:
    nvidia.com/mig.config: "all-1g.10gb"

---
# MIG 感知的调度器配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    profiles:
    - schedulerName: gpu-scheduler
      plugins:
        filter:
          enabled:
          - name: NodeResourcesFit
        score:
          enabled:
          - name: NodeResourcesBalancedAllocation
            weight: 1
          - name: NodeResourcesLeastAllocated
            weight: 1
      pluginConfig:
      - name: NodeResourcesFit
        args:
          scoringStrategy:
            type: LeastAllocated
            resources:
            - name: nvidia.com/mig-1g.10gb
              weight: 1
            - name: nvidia.com/mig-2g.20gb
              weight: 2
            - name: nvidia.com/mig-3g.40gb
              weight: 3

GPU 时间片共享

时间片原理

┌─────────────────────────────────────────────────────────────────┐
│                    GPU 时间片调度原理                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  时间轴:                                                         │
│  ┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┐           │
│  │ P1 │ P2 │ P3 │ P1 │ P2 │ P3 │ P1 │ P2 │ P3 │... │           │
│  └────┴────┴────┴────┴────┴────┴────┴────┴────┴────┘           │
│    ↑                                                            │
│    └─ 时间片 (默认 60ms)                                         │
│                                                                 │
│  上下文切换开销:                                                  │
│  ┌──────────────────────────────────────────────────┐          │
│  │  ┌─────┐     ┌─────┐     ┌─────┐                │          │
│  │  │ Run │ Save│ Run │ Load│ Run │                │          │
│  │  │ P1  │ Ctx │ P2  │ Ctx │ P3  │                │          │
│  │  └─────┘     └─────┘     └─────┘                │          │
│  │        ↑           ↑                            │          │
│  │        └───────────┴─ 切换开销 (~100μs)          │          │
│  └──────────────────────────────────────────────────┘          │
│                                                                 │
│  MPS vs 时间片:                                                  │
│  ┌────────────────────┬────────────────────────────┐           │
│  │     时间片          │         MPS               │           │
│  ├────────────────────┼────────────────────────────┤           │
│  │ 串行执行            │ 并行执行                   │           │
│  │ 显存独占            │ 显存共享                   │           │
│  │ 完全隔离            │ 部分隔离                   │           │
│  │ 高切换开销          │ 低切换开销                 │           │
│  │ 兼容性好            │ 限制较多                   │           │
│  └────────────────────┴────────────────────────────┘           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

时间片配置实现

"""
GPU 时间片共享配置
"""
import os
from typing import Dict, List, Optional
from dataclasses import dataclass
import yaml


@dataclass
class TimeSlicingConfig:
    """时间片配置"""
    replicas: int = 2
    rename_by_default: bool = False
    fail_requests_greater_than_one: bool = False

    def to_dict(self) -> Dict:
        return {
            "replicas": self.replicas,
            "renameByDefault": self.rename_by_default,
            "failRequestsGreaterThanOne": self.fail_requests_greater_than_one
        }


class GPUTimeSlicingManager:
    """GPU 时间片管理器"""

    def __init__(self):
        self.default_config = TimeSlicingConfig()

    def generate_device_plugin_config(
        self,
        gpu_configs: Dict[str, TimeSlicingConfig] = None,
        mig_strategy: str = "none"
    ) -> Dict:
        """
        生成设备插件配置

        Args:
            gpu_configs: GPU 类型到配置的映射
            mig_strategy: MIG 策略
        """
        config = {
            "version": "v1",
            "flags": {
                "migStrategy": mig_strategy
            },
            "sharing": {
                "timeSlicing": {
                    "renameByDefault": self.default_config.rename_by_default,
                    "failRequestsGreaterThanOne": self.default_config.fail_requests_greater_than_one,
                    "resources": []
                }
            }
        }

        if gpu_configs:
            for gpu_name, ts_config in gpu_configs.items():
                config["sharing"]["timeSlicing"]["resources"].append({
                    "name": gpu_name,
                    "replicas": ts_config.replicas
                })
        else:
            # 默认配置
            config["sharing"]["timeSlicing"]["resources"].append({
                "name": "nvidia.com/gpu",
                "replicas": self.default_config.replicas
            })

        return config

    def generate_kubernetes_manifests(
        self,
        replicas: int = 4,
        namespace: str = "kube-system"
    ) -> str:
        """生成 Kubernetes 清单"""
        manifests = f"""
# ConfigMap for time-slicing
apiVersion: v1
kind: ConfigMap
metadata:
  name: time-slicing-config
  namespace: {namespace}
data:
  time-slicing-config.yaml: |
    version: v1
    sharing:
      timeSlicing:
        renameByDefault: true
        failRequestsGreaterThanOne: false
        resources:
        - name: nvidia.com/gpu
          replicas: {replicas}

---
# 更新 NVIDIA Device Plugin
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: nvidia-device-plugin-daemonset
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      name: nvidia-device-plugin-ds
  template:
    metadata:
      labels:
        name: nvidia-device-plugin-ds
    spec:
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
      containers:
      - name: nvidia-device-plugin-ctr
        image: nvcr.io/nvidia/k8s-device-plugin:v0.14.1
        env:
        - name: CONFIG_FILE
          value: /etc/nvidia/time-slicing-config.yaml
        volumeMounts:
        - name: device-plugin
          mountPath: /var/lib/kubelet/device-plugins
        - name: config
          mountPath: /etc/nvidia
      volumes:
      - name: device-plugin
        hostPath:
          path: /var/lib/kubelet/device-plugins
      - name: config
        configMap:
          name: time-slicing-config
"""
        return manifests


class AdaptiveTimeSlicing:
    """自适应时间片调整"""

    def __init__(self, min_replicas: int = 2, max_replicas: int = 10):
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.current_replicas = min_replicas

    def calculate_optimal_replicas(
        self,
        workload_metrics: Dict
    ) -> int:
        """
        根据工作负载计算最佳复制因子

        考虑因素:
        - GPU 利用率
        - 显存使用率
        - 平均任务时长
        - 队列深度
        """
        gpu_util = workload_metrics.get("gpu_utilization", 50)
        memory_util = workload_metrics.get("memory_utilization", 50)
        avg_task_duration = workload_metrics.get("avg_task_duration_ms", 100)
        queue_depth = workload_metrics.get("queue_depth", 0)

        # 基础复制因子
        base_replicas = self.min_replicas

        # 根据 GPU 利用率调整
        if gpu_util < 30:
            # 低利用率,增加复制
            base_replicas = min(self.max_replicas, base_replicas + 2)
        elif gpu_util > 80:
            # 高利用率,减少复制
            base_replicas = max(self.min_replicas, base_replicas - 1)

        # 根据显存使用率调整
        memory_factor = 1.0 - (memory_util / 100)
        base_replicas = int(base_replicas * (1 + memory_factor * 0.5))

        # 根据任务时长调整
        if avg_task_duration < 50:  # 短任务
            base_replicas = min(self.max_replicas, base_replicas + 1)
        elif avg_task_duration > 500:  # 长任务
            base_replicas = max(self.min_replicas, base_replicas - 1)

        # 根据队列深度调整
        if queue_depth > 10:
            base_replicas = min(self.max_replicas, base_replicas + 1)

        return min(self.max_replicas, max(self.min_replicas, base_replicas))

    def should_update(self, new_replicas: int) -> bool:
        """判断是否需要更新配置"""
        # 避免频繁变更
        if abs(new_replicas - self.current_replicas) < 2:
            return False
        return True


# 使用示例
if __name__ == "__main__":
    manager = GPUTimeSlicingManager()

    # 生成基础配置
    config = manager.generate_device_plugin_config(
        gpu_configs={
            "nvidia.com/gpu": TimeSlicingConfig(replicas=4),
        }
    )
    print(yaml.dump(config))

    # 生成 Kubernetes 清单
    manifests = manager.generate_kubernetes_manifests(replicas=4)
    print(manifests)

vGPU 虚拟化方案

vGPU 架构

┌─────────────────────────────────────────────────────────────────┐
│                      vGPU 虚拟化架构                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  虚拟机 / 容器层                                                  │
│  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐   │
│  │   VM 1    │  │   VM 2    │  │   VM 3    │  │   VM 4    │   │
│  │ ┌───────┐ │  │ ┌───────┐ │  │ ┌───────┐ │  │ ┌───────┐ │   │
│  │ │ App   │ │  │ │ App   │ │  │ │ App   │ │  │ │ App   │ │   │
│  │ ├───────┤ │  │ ├───────┤ │  │ ├───────┤ │  │ ├───────┤ │   │
│  │ │ CUDA  │ │  │ │ CUDA  │ │  │ │ CUDA  │ │  │ │ CUDA  │ │   │
│  │ ├───────┤ │  │ ├───────┤ │  │ ├───────┤ │  │ ├───────┤ │   │
│  │ │vGPU   │ │  │ │vGPU   │ │  │ │vGPU   │ │  │ │vGPU   │ │   │
│  │ │Driver │ │  │ │Driver │ │  │ │Driver │ │  │ │Driver │ │   │
│  │ └───────┘ │  │ └───────┘ │  │ └───────┘ │  │ └───────┘ │   │
│  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘   │
│        │              │              │              │          │
│  ══════╪══════════════╪══════════════╪══════════════╪════════  │
│        │      Hypervisor / Container Runtime        │          │
│  ══════╪══════════════╪══════════════╪══════════════╪════════  │
│        │              │              │              │          │
│  vGPU Manager (NVIDIA GRID / HAMi / gpushare)                  │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │  │
│  │  │ Scheduler   │  │ Memory Mgr  │  │ Profile Mgr │       │  │
│  │  │             │  │             │  │             │       │  │
│  │  │ - 时间片     │  │ - 显存分配   │  │ - vGPU配置  │       │  │
│  │  │ - 优先级     │  │ - 显存隔离   │  │ - QoS策略   │       │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘       │  │
│  └──────────────────────────────────────────────────────────┘  │
│                              │                                  │
│  ════════════════════════════╪════════════════════════════════  │
│                              │                                  │
│  物理 GPU                     │                                  │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    NVIDIA GPU                             │  │
│  │  ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐             │  │
│  │  │SM Bank1│ │SM Bank2│ │SM Bank3│ │SM Bank4│             │  │
│  │  └────────┘ └────────┘ └────────┘ └────────┘             │  │
│  │  ┌──────────────────────────────────────────┐             │  │
│  │  │            GPU Memory (VRAM)             │             │  │
│  │  └──────────────────────────────────────────┘             │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

HAMi 开源 GPU 共享方案

"""
HAMi (Heterogeneous AI Computing Virtualization Middleware) 配置
开源 GPU 共享方案
"""
from typing import Dict, List, Optional
from dataclasses import dataclass
import yaml


@dataclass
class HAMiDeviceConfig:
    """HAMi 设备配置"""
    gpu_memory: str  # 显存大小,如 "4Gi"
    gpu_cores: int  # GPU 核心百分比 0-100
    priority: int = 0  # 优先级


class HAMiManager:
    """HAMi 管理器"""

    def __init__(self):
        self.namespace = "hami-system"

    def generate_installation_manifest(self) -> str:
        """生成安装清单"""
        return f"""
# HAMi 命名空间
apiVersion: v1
kind: Namespace
metadata:
  name: {self.namespace}

---
# 使用 Helm 安装
# helm repo add hami https://project-hami.github.io/HAMi/
# helm install hami hami/hami -n {self.namespace}

# 或使用 YAML 部署
# HAMi Scheduler
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hami-scheduler
  namespace: {self.namespace}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hami-scheduler
  template:
    metadata:
      labels:
        app: hami-scheduler
    spec:
      serviceAccountName: hami-scheduler
      containers:
      - name: scheduler
        image: projecthami/hami:v2.3
        args:
        - scheduler
        - --config=/config/config.yaml
        volumeMounts:
        - name: config
          mountPath: /config
      volumes:
      - name: config
        configMap:
          name: hami-scheduler-config

---
# HAMi Device Plugin
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: hami-device-plugin
  namespace: {self.namespace}
spec:
  selector:
    matchLabels:
      app: hami-device-plugin
  template:
    metadata:
      labels:
        app: hami-device-plugin
    spec:
      tolerations:
      - key: nvidia.com/gpu
        operator: Exists
        effect: NoSchedule
      containers:
      - name: device-plugin
        image: projecthami/hami:v2.3
        args:
        - device-plugin
        - --node-name=$(NODE_NAME)
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          privileged: true
        volumeMounts:
        - name: device-plugin
          mountPath: /var/lib/kubelet/device-plugins
        - name: pod-resources
          mountPath: /var/lib/kubelet/pod-resources
      volumes:
      - name: device-plugin
        hostPath:
          path: /var/lib/kubelet/device-plugins
      - name: pod-resources
        hostPath:
          path: /var/lib/kubelet/pod-resources
"""

    def generate_pod_spec(
        self,
        name: str,
        image: str,
        gpu_memory: str,
        gpu_cores: int = 100,
        gpu_count: int = 1
    ) -> Dict:
        """
        生成使用 HAMi 的 Pod 规格

        Args:
            name: Pod 名称
            image: 容器镜像
            gpu_memory: 显存大小 (如 "4Gi")
            gpu_cores: GPU 核心百分比 (0-100)
            gpu_count: GPU 数量
        """
        return {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": name,
                "annotations": {
                    # HAMi 注解
                    "hami.io/gpu-memory": gpu_memory,
                    "hami.io/gpu-cores": str(gpu_cores)
                }
            },
            "spec": {
                "schedulerName": "hami-scheduler",
                "containers": [{
                    "name": "main",
                    "image": image,
                    "resources": {
                        "limits": {
                            "nvidia.com/gpu": str(gpu_count)
                        }
                    }
                }]
            }
        }

    def generate_scheduler_config(
        self,
        default_memory: str = "4Gi",
        default_cores: int = 100,
        enable_core_isolation: bool = True
    ) -> Dict:
        """生成调度器配置"""
        return {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": "hami-scheduler-config",
                "namespace": self.namespace
            },
            "data": {
                "config.yaml": yaml.dump({
                    "scheduler": {
                        "defaultMemory": default_memory,
                        "defaultCores": default_cores,
                        "enableCoreIsolation": enable_core_isolation,
                        "resourceName": "nvidia.com/gpu",
                        "schedulerPolicy": "binpack",  # binpack | spread
                        "nodeSelector": {},
                        "tolerations": []
                    },
                    "devicePlugin": {
                        "enableHealthCheck": True,
                        "healthCheckInterval": "30s"
                    }
                })
            }
        }


class GPUShareScheduler:
    """GPU 共享调度器"""

    def __init__(self):
        self.namespace = "gpu-share"

    def generate_gpushare_deployment(self) -> str:
        """生成 gpushare 部署清单"""
        return """
# gpushare-scheduler-extender
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpushare-scheduler-extender
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gpushare-scheduler-extender
  template:
    metadata:
      labels:
        app: gpushare-scheduler-extender
    spec:
      serviceAccountName: gpushare-scheduler-extender
      containers:
      - name: extender
        image: registry.cn-hangzhou.aliyuncs.com/acs/k8s-gpushare-scheduler-extender:v2
        env:
        - name: LOG_LEVEL
          value: info
        ports:
        - containerPort: 12345

---
# gpushare-device-plugin
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: gpushare-device-plugin
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: gpushare-device-plugin
  template:
    metadata:
      labels:
        app: gpushare-device-plugin
    spec:
      tolerations:
      - operator: Exists
      containers:
      - name: device-plugin
        image: registry.cn-hangzhou.aliyuncs.com/acs/k8s-gpushare-plugin:v2
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          privileged: true
        volumeMounts:
        - name: device-plugin
          mountPath: /var/lib/kubelet/device-plugins
      volumes:
      - name: device-plugin
        hostPath:
          path: /var/lib/kubelet/device-plugins

---
# 调度器配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-extender-config
  namespace: kube-system
data:
  scheduler-extender-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    extenders:
    - urlPrefix: "http://gpushare-scheduler-extender:12345/gpushare-scheduler"
      filterVerb: "filter"
      bindVerb: "bind"
      enableHTTPS: false
      nodeCacheCapable: true
      managedResources:
      - name: aliyun.com/gpu-mem
        ignoredByScheduler: false
      ignorable: false
"""

    def generate_pod_with_gpu_share(
        self,
        name: str,
        image: str,
        gpu_memory_gb: int
    ) -> Dict:
        """生成使用 GPU 共享的 Pod"""
        return {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": name
            },
            "spec": {
                "schedulerName": "gpushare-scheduler",
                "containers": [{
                    "name": "main",
                    "image": image,
                    "resources": {
                        "limits": {
                            "aliyun.com/gpu-mem": gpu_memory_gb
                        }
                    }
                }]
            }
        }


# 使用示例
if __name__ == "__main__":
    # HAMi 示例
    hami = HAMiManager()

    # 生成 Pod 配置
    pod_spec = hami.generate_pod_spec(
        name="inference-pod",
        image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
        gpu_memory="4Gi",
        gpu_cores=50  # 使用 50% GPU 算力
    )
    print(yaml.dump(pod_spec))

显存虚拟化

显存池化架构

┌─────────────────────────────────────────────────────────────────┐
│                     显存池化架构                                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  应用层                                                          │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐            │
│  │  App 1  │  │  App 2  │  │  App 3  │  │  App 4  │            │
│  │  需要8G  │  │  需要4G  │  │  需要12G │  │  需要6G  │            │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘            │
│       │            │            │            │                  │
│  ═════╪════════════╪════════════╪════════════╪════════════════  │
│       │                                      │                  │
│  显存虚拟化层 (Unified Memory Manager)                           │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                                                          │  │
│  │  ┌────────────────────────────────────────────────────┐  │  │
│  │  │              Virtual Address Space                 │  │  │
│  │  │  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐     │  │  │
│  │  │  │ App1 │ │ App2 │ │ App3 │ │ App4 │ │ Free │     │  │  │
│  │  │  │  8G  │ │  4G  │ │ 12G  │ │  6G  │ │      │     │  │  │
│  │  │  └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └──────┘     │  │  │
│  │  └─────┼────────┼────────┼────────┼──────────────────┘  │  │
│  │        │        │        │        │                     │  │
│  │  ┌─────┴────────┴────────┴────────┴─────────────────┐   │  │
│  │  │              Page Table / Memory Map             │   │  │
│  │  │  ┌────┬────┬────┬────┬────┬────┬────┬────┐      │   │  │
│  │  │  │ P0 │ P1 │ P2 │ P3 │ P4 │ P5 │ .. │ Pn │      │   │  │
│  │  │  │GPU0│GPU0│GPU1│GPU1│HOST│GPU0│    │    │      │   │  │
│  │  │  └────┴────┴────┴────┴────┴────┴────┴────┘      │   │  │
│  │  └──────────────────────────────────────────────────┘   │  │
│  │                                                          │  │
│  │  特性:                                                    │  │
│  │  • 按需分配 (On-demand allocation)                        │  │
│  │  • 页面迁移 (Page migration)                              │  │
│  │  • 超分配 (Oversubscription)                              │  │
│  │  • 透明换出 (Transparent swap)                            │  │
│  │                                                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                              │                                  │
│  ════════════════════════════╪════════════════════════════════  │
│                              │                                  │
│  物理资源层                    │                                  │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │   GPU 0 (24GB)    │    GPU 1 (24GB)    │   Host (128GB)  │  │
│  │  ┌──────────────┐ │  ┌──────────────┐  │  ┌────────────┐ │  │
│  │  │    VRAM      │ │  │    VRAM      │  │  │    RAM     │ │  │
│  │  │  实际: 18GB   │ │  │  实际: 20GB   │  │  │  溢出: 12GB│ │  │
│  │  └──────────────┘ │  └──────────────┘  │  └────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

显存管理实现

"""
GPU 显存虚拟化管理
"""
import torch
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from threading import Lock
import logging
from enum import Enum


class MemoryLocation(Enum):
    """内存位置"""
    GPU = "gpu"
    HOST = "host"
    DISK = "disk"


@dataclass
class MemoryBlock:
    """内存块"""
    id: str
    size: int  # bytes
    location: MemoryLocation
    gpu_id: Optional[int] = None
    last_access: float = 0.0
    access_count: int = 0
    pinned: bool = False

    def __post_init__(self):
        import time
        self.last_access = time.time()


@dataclass
class TensorHandle:
    """张量句柄"""
    tensor_id: str
    shape: Tuple[int, ...]
    dtype: torch.dtype
    blocks: List[str] = field(default_factory=list)
    materialized: bool = False


class VirtualMemoryManager:
    """虚拟显存管理器"""

    def __init__(
        self,
        gpu_ids: List[int],
        enable_oversubscription: bool = True,
        oversubscription_ratio: float = 1.5,
        page_size: int = 2 * 1024 * 1024  # 2MB
    ):
        self.gpu_ids = gpu_ids
        self.enable_oversubscription = enable_oversubscription
        self.oversubscription_ratio = oversubscription_ratio
        self.page_size = page_size

        self.blocks: Dict[str, MemoryBlock] = {}
        self.tensors: Dict[str, TensorHandle] = {}
        self.gpu_usage: Dict[int, int] = {gid: 0 for gid in gpu_ids}
        self.host_usage: int = 0

        self._lock = Lock()
        self.logger = logging.getLogger(__name__)

        # 初始化 GPU 信息
        self.gpu_capacities = self._get_gpu_capacities()
        self.virtual_capacities = {
            gid: int(cap * oversubscription_ratio)
            for gid, cap in self.gpu_capacities.items()
        }

    def _get_gpu_capacities(self) -> Dict[int, int]:
        """获取 GPU 显存容量"""
        capacities = {}
        for gid in self.gpu_ids:
            props = torch.cuda.get_device_properties(gid)
            capacities[gid] = props.total_memory
        return capacities

    def allocate(
        self,
        size: int,
        preferred_gpu: Optional[int] = None,
        pinned: bool = False
    ) -> str:
        """
        分配虚拟内存

        Args:
            size: 请求大小
            preferred_gpu: 首选 GPU
            pinned: 是否固定在 GPU
        """
        with self._lock:
            # 对齐到页大小
            aligned_size = ((size + self.page_size - 1) // self.page_size) * self.page_size

            # 选择目标 GPU
            target_gpu = self._select_gpu(aligned_size, preferred_gpu)

            # 确定初始位置
            if target_gpu is not None and self._can_fit_gpu(target_gpu, aligned_size):
                location = MemoryLocation.GPU
                gpu_id = target_gpu
            else:
                location = MemoryLocation.HOST
                gpu_id = None

            # 创建内存块
            block_id = f"block_{len(self.blocks)}"
            block = MemoryBlock(
                id=block_id,
                size=aligned_size,
                location=location,
                gpu_id=gpu_id,
                pinned=pinned
            )

            self.blocks[block_id] = block

            # 更新使用统计
            if location == MemoryLocation.GPU:
                self.gpu_usage[gpu_id] += aligned_size
            else:
                self.host_usage += aligned_size

            self.logger.info(
                f"Allocated {aligned_size} bytes at {location.value}"
                f"{f' (GPU {gpu_id})' if gpu_id is not None else ''}"
            )

            return block_id

    def _select_gpu(
        self,
        size: int,
        preferred_gpu: Optional[int]
    ) -> Optional[int]:
        """选择目标 GPU"""
        if preferred_gpu is not None:
            if self._can_fit_gpu(preferred_gpu, size):
                return preferred_gpu

        # 选择使用率最低的 GPU
        best_gpu = None
        min_usage = float('inf')

        for gid in self.gpu_ids:
            if self._can_fit_gpu(gid, size):
                usage = self.gpu_usage[gid] / self.gpu_capacities[gid]
                if usage < min_usage:
                    min_usage = usage
                    best_gpu = gid

        return best_gpu

    def _can_fit_gpu(self, gpu_id: int, size: int) -> bool:
        """检查是否能放入 GPU"""
        current = self.gpu_usage[gpu_id]
        capacity = self.virtual_capacities[gpu_id] if self.enable_oversubscription \
                   else self.gpu_capacities[gpu_id]
        return current + size <= capacity

    def free(self, block_id: str):
        """释放内存块"""
        with self._lock:
            if block_id not in self.blocks:
                return

            block = self.blocks[block_id]

            if block.location == MemoryLocation.GPU:
                self.gpu_usage[block.gpu_id] -= block.size
            else:
                self.host_usage -= block.size

            del self.blocks[block_id]

    def migrate(
        self,
        block_id: str,
        target_location: MemoryLocation,
        target_gpu: Optional[int] = None
    ) -> bool:
        """迁移内存块"""
        with self._lock:
            if block_id not in self.blocks:
                return False

            block = self.blocks[block_id]

            if block.pinned:
                self.logger.warning(f"Cannot migrate pinned block {block_id}")
                return False

            # 检查目标容量
            if target_location == MemoryLocation.GPU:
                if not self._can_fit_gpu(target_gpu, block.size):
                    return False

            # 更新使用统计
            if block.location == MemoryLocation.GPU:
                self.gpu_usage[block.gpu_id] -= block.size
            else:
                self.host_usage -= block.size

            if target_location == MemoryLocation.GPU:
                self.gpu_usage[target_gpu] += block.size
            else:
                self.host_usage += block.size

            # 更新块信息
            block.location = target_location
            block.gpu_id = target_gpu

            self.logger.info(
                f"Migrated block {block_id} to {target_location.value}"
            )

            return True

    def evict_to_host(self, gpu_id: int, required_size: int) -> int:
        """
        将数据从 GPU 驱逐到主机

        使用 LRU 策略选择要驱逐的块
        """
        evicted_size = 0

        # 收集可驱逐的块,按访问时间排序
        evictable = [
            block for block in self.blocks.values()
            if block.gpu_id == gpu_id and not block.pinned
        ]
        evictable.sort(key=lambda b: b.last_access)

        for block in evictable:
            if evicted_size >= required_size:
                break

            if self.migrate(block.id, MemoryLocation.HOST):
                evicted_size += block.size

        return evicted_size

    def prefetch_to_gpu(
        self,
        block_ids: List[str],
        gpu_id: int
    ) -> int:
        """预取数据到 GPU"""
        prefetched = 0

        for block_id in block_ids:
            if block_id not in self.blocks:
                continue

            block = self.blocks[block_id]

            if block.location != MemoryLocation.GPU or block.gpu_id != gpu_id:
                # 可能需要先驱逐其他数据
                if not self._can_fit_gpu(gpu_id, block.size):
                    self.evict_to_host(gpu_id, block.size)

                if self.migrate(block_id, MemoryLocation.GPU, gpu_id):
                    prefetched += 1

        return prefetched

    def get_statistics(self) -> Dict:
        """获取统计信息"""
        stats = {
            "gpu_usage": {},
            "host_usage": self.host_usage,
            "total_blocks": len(self.blocks),
            "blocks_by_location": {
                MemoryLocation.GPU.value: 0,
                MemoryLocation.HOST.value: 0
            }
        }

        for gid in self.gpu_ids:
            stats["gpu_usage"][gid] = {
                "used": self.gpu_usage[gid],
                "capacity": self.gpu_capacities[gid],
                "virtual_capacity": self.virtual_capacities[gid],
                "utilization": self.gpu_usage[gid] / self.gpu_capacities[gid]
            }

        for block in self.blocks.values():
            stats["blocks_by_location"][block.location.value] += 1

        return stats


class UnifiedMemoryTensor:
    """统一内存张量"""

    def __init__(
        self,
        vmm: VirtualMemoryManager,
        shape: Tuple[int, ...],
        dtype: torch.dtype = torch.float32,
        device: Optional[int] = None
    ):
        self.vmm = vmm
        self.shape = shape
        self.dtype = dtype
        self.preferred_device = device

        # 计算大小
        self.element_size = torch.tensor([], dtype=dtype).element_size()
        self.total_size = int(np.prod(shape)) * self.element_size

        # 分配虚拟内存
        self.block_id = vmm.allocate(self.total_size, device)

        # 实际数据(延迟初始化)
        self._data: Optional[torch.Tensor] = None
        self._location: Optional[MemoryLocation] = None

    def materialize(self, device: Optional[int] = None) -> torch.Tensor:
        """
        具现化张量
        确保数据在指定设备上
        """
        target_device = device or self.preferred_device

        block = self.vmm.blocks[self.block_id]

        # 如果数据不在目标位置,进行迁移
        if target_device is not None:
            if block.location != MemoryLocation.GPU or block.gpu_id != target_device:
                self.vmm.migrate(
                    self.block_id,
                    MemoryLocation.GPU,
                    target_device
                )

        # 创建或获取张量
        if self._data is None:
            if target_device is not None:
                self._data = torch.empty(
                    self.shape,
                    dtype=self.dtype,
                    device=f"cuda:{target_device}"
                )
            else:
                self._data = torch.empty(self.shape, dtype=self.dtype)
            self._location = block.location
        elif self._location != block.location:
            # 位置已变更,移动数据
            if block.location == MemoryLocation.GPU:
                self._data = self._data.to(f"cuda:{block.gpu_id}")
            else:
                self._data = self._data.cpu()
            self._location = block.location

        # 更新访问统计
        import time
        block.last_access = time.time()
        block.access_count += 1

        return self._data

    def release(self):
        """释放张量"""
        self._data = None
        self.vmm.free(self.block_id)

    def __del__(self):
        if hasattr(self, 'vmm') and hasattr(self, 'block_id'):
            self.release()


# 使用示例
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # 创建虚拟内存管理器
    if torch.cuda.is_available():
        vmm = VirtualMemoryManager(
            gpu_ids=[0],
            enable_oversubscription=True,
            oversubscription_ratio=1.5
        )

        # 创建统一内存张量
        tensor = UnifiedMemoryTensor(
            vmm,
            shape=(1024, 1024),
            dtype=torch.float32,
            device=0
        )

        # 具现化并使用
        data = tensor.materialize()
        data.fill_(1.0)

        # 查看统计
        print(vmm.get_statistics())

        # 释放
        tensor.release()

性能监控与优化

虚拟化性能监控

"""
GPU 虚拟化性能监控
"""
import time
import threading
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass
from collections import deque
import json


@dataclass
class VGPUMetrics:
    """vGPU 指标"""
    timestamp: float
    gpu_id: int
    instance_id: str

    # 利用率指标
    compute_utilization: float  # 0-100
    memory_utilization: float   # 0-100
    encoder_utilization: float  # 0-100
    decoder_utilization: float  # 0-100

    # 内存指标
    memory_used: int  # bytes
    memory_total: int  # bytes

    # 性能指标
    sm_clock: int  # MHz
    memory_clock: int  # MHz
    temperature: int  # Celsius
    power_usage: int  # Watts

    # 虚拟化特有指标
    context_switches: int
    time_slice_violations: int
    memory_migrations: int


class VGPUMonitor:
    """vGPU 监控器"""

    def __init__(
        self,
        collection_interval: float = 1.0,
        history_size: int = 3600
    ):
        self.collection_interval = collection_interval
        self.history_size = history_size

        self.metrics_history: Dict[str, deque] = {}
        self.alerts: List[Dict] = []
        self.alert_callbacks: List[Callable] = []

        self._running = False
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """启动监控"""
        self._running = True
        self._thread = threading.Thread(target=self._collection_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """停止监控"""
        self._running = False
        if self._thread:
            self._thread.join()

    def _collection_loop(self):
        """采集循环"""
        while self._running:
            try:
                metrics = self._collect_metrics()
                self._process_metrics(metrics)
            except Exception as e:
                print(f"Metrics collection error: {e}")

            time.sleep(self.collection_interval)

    def _collect_metrics(self) -> List[VGPUMetrics]:
        """采集指标"""
        metrics = []

        # 使用 nvidia-smi 或 NVML 采集
        # 这里是示例实现
        try:
            import pynvml
            pynvml.nvmlInit()

            device_count = pynvml.nvmlDeviceGetCount()

            for i in range(device_count):
                handle = pynvml.nvmlDeviceGetHandleByIndex(i)

                # 获取利用率
                util = pynvml.nvmlDeviceGetUtilizationRates(handle)

                # 获取内存
                mem = pynvml.nvmlDeviceGetMemoryInfo(handle)

                # 获取时钟
                sm_clock = pynvml.nvmlDeviceGetClockInfo(
                    handle, pynvml.NVML_CLOCK_SM
                )
                mem_clock = pynvml.nvmlDeviceGetClockInfo(
                    handle, pynvml.NVML_CLOCK_MEM
                )

                # 获取温度
                temp = pynvml.nvmlDeviceGetTemperature(
                    handle, pynvml.NVML_TEMPERATURE_GPU
                )

                # 获取功耗
                power = pynvml.nvmlDeviceGetPowerUsage(handle) // 1000

                # 尝试获取 MIG 实例信息
                try:
                    mig_mode = pynvml.nvmlDeviceGetMigMode(handle)
                    if mig_mode[0] == pynvml.NVML_DEVICE_MIG_ENABLE:
                        # 遍历 MIG 实例
                        mig_count = pynvml.nvmlDeviceGetMaxMigDeviceCount(handle)
                        for j in range(mig_count):
                            try:
                                mig_handle = pynvml.nvmlDeviceGetMigDeviceHandleByIndex(
                                    handle, j
                                )
                                # 采集 MIG 实例指标
                                mig_mem = pynvml.nvmlDeviceGetMemoryInfo(mig_handle)
                                metrics.append(VGPUMetrics(
                                    timestamp=time.time(),
                                    gpu_id=i,
                                    instance_id=f"mig-{j}",
                                    compute_utilization=0,  # MIG 不支持细粒度利用率
                                    memory_utilization=mig_mem.used / mig_mem.total * 100,
                                    encoder_utilization=0,
                                    decoder_utilization=0,
                                    memory_used=mig_mem.used,
                                    memory_total=mig_mem.total,
                                    sm_clock=sm_clock,
                                    memory_clock=mem_clock,
                                    temperature=temp,
                                    power_usage=power,
                                    context_switches=0,
                                    time_slice_violations=0,
                                    memory_migrations=0
                                ))
                            except:
                                pass
                except:
                    pass

                # 整卡指标
                metrics.append(VGPUMetrics(
                    timestamp=time.time(),
                    gpu_id=i,
                    instance_id="full",
                    compute_utilization=util.gpu,
                    memory_utilization=mem.used / mem.total * 100,
                    encoder_utilization=0,
                    decoder_utilization=0,
                    memory_used=mem.used,
                    memory_total=mem.total,
                    sm_clock=sm_clock,
                    memory_clock=mem_clock,
                    temperature=temp,
                    power_usage=power,
                    context_switches=0,
                    time_slice_violations=0,
                    memory_migrations=0
                ))

            pynvml.nvmlShutdown()
        except ImportError:
            pass

        return metrics

    def _process_metrics(self, metrics: List[VGPUMetrics]):
        """处理指标"""
        for m in metrics:
            key = f"{m.gpu_id}:{m.instance_id}"

            if key not in self.metrics_history:
                self.metrics_history[key] = deque(maxlen=self.history_size)

            self.metrics_history[key].append(m)

            # 检查告警
            self._check_alerts(m)

    def _check_alerts(self, metrics: VGPUMetrics):
        """检查告警条件"""
        alerts = []

        # 高温告警
        if metrics.temperature > 85:
            alerts.append({
                "type": "high_temperature",
                "severity": "critical",
                "gpu_id": metrics.gpu_id,
                "instance_id": metrics.instance_id,
                "value": metrics.temperature,
                "threshold": 85,
                "message": f"GPU {metrics.gpu_id} temperature critical: {metrics.temperature}°C"
            })
        elif metrics.temperature > 80:
            alerts.append({
                "type": "high_temperature",
                "severity": "warning",
                "gpu_id": metrics.gpu_id,
                "instance_id": metrics.instance_id,
                "value": metrics.temperature,
                "threshold": 80,
                "message": f"GPU {metrics.gpu_id} temperature warning: {metrics.temperature}°C"
            })

        # 显存告警
        memory_util = metrics.memory_used / metrics.memory_total * 100
        if memory_util > 95:
            alerts.append({
                "type": "high_memory",
                "severity": "critical",
                "gpu_id": metrics.gpu_id,
                "instance_id": metrics.instance_id,
                "value": memory_util,
                "threshold": 95,
                "message": f"GPU {metrics.gpu_id} memory critical: {memory_util:.1f}%"
            })

        # 触发告警回调
        for alert in alerts:
            alert["timestamp"] = time.time()
            self.alerts.append(alert)

            for callback in self.alert_callbacks:
                try:
                    callback(alert)
                except Exception as e:
                    print(f"Alert callback error: {e}")

    def add_alert_callback(self, callback: Callable):
        """添加告警回调"""
        self.alert_callbacks.append(callback)

    def get_current_metrics(self, gpu_id: int = None) -> List[VGPUMetrics]:
        """获取当前指标"""
        result = []
        for key, history in self.metrics_history.items():
            if history:
                m = history[-1]
                if gpu_id is None or m.gpu_id == gpu_id:
                    result.append(m)
        return result

    def get_metrics_summary(self, gpu_id: int, window_seconds: int = 60) -> Dict:
        """获取指标摘要"""
        cutoff = time.time() - window_seconds

        summary = {
            "avg_compute_utilization": 0,
            "avg_memory_utilization": 0,
            "max_temperature": 0,
            "avg_power_usage": 0,
            "sample_count": 0
        }

        for key, history in self.metrics_history.items():
            samples = [m for m in history if m.gpu_id == gpu_id and m.timestamp > cutoff]

            if samples:
                summary["avg_compute_utilization"] = sum(
                    m.compute_utilization for m in samples
                ) / len(samples)
                summary["avg_memory_utilization"] = sum(
                    m.memory_utilization for m in samples
                ) / len(samples)
                summary["max_temperature"] = max(m.temperature for m in samples)
                summary["avg_power_usage"] = sum(
                    m.power_usage for m in samples
                ) / len(samples)
                summary["sample_count"] = len(samples)

        return summary

    def export_metrics(self, format: str = "json") -> str:
        """导出指标"""
        data = {
            "timestamp": time.time(),
            "metrics": {}
        }

        for key, history in self.metrics_history.items():
            if history:
                latest = history[-1]
                data["metrics"][key] = {
                    "compute_utilization": latest.compute_utilization,
                    "memory_utilization": latest.memory_utilization,
                    "memory_used": latest.memory_used,
                    "memory_total": latest.memory_total,
                    "temperature": latest.temperature,
                    "power_usage": latest.power_usage
                }

        if format == "json":
            return json.dumps(data, indent=2)
        else:
            return str(data)


# Prometheus 指标导出
class PrometheusVGPUExporter:
    """Prometheus vGPU 指标导出器"""

    def __init__(self, monitor: VGPUMonitor, port: int = 9400):
        self.monitor = monitor
        self.port = port

    def generate_metrics(self) -> str:
        """生成 Prometheus 格式指标"""
        lines = []

        lines.append("# HELP vgpu_compute_utilization GPU compute utilization")
        lines.append("# TYPE vgpu_compute_utilization gauge")

        lines.append("# HELP vgpu_memory_utilization GPU memory utilization")
        lines.append("# TYPE vgpu_memory_utilization gauge")

        lines.append("# HELP vgpu_memory_used GPU memory used in bytes")
        lines.append("# TYPE vgpu_memory_used gauge")

        lines.append("# HELP vgpu_temperature GPU temperature in Celsius")
        lines.append("# TYPE vgpu_temperature gauge")

        lines.append("# HELP vgpu_power_usage GPU power usage in Watts")
        lines.append("# TYPE vgpu_power_usage gauge")

        for metrics in self.monitor.get_current_metrics():
            labels = f'gpu_id="{metrics.gpu_id}",instance_id="{metrics.instance_id}"'

            lines.append(
                f'vgpu_compute_utilization{{{labels}}} {metrics.compute_utilization}'
            )
            lines.append(
                f'vgpu_memory_utilization{{{labels}}} {metrics.memory_utilization}'
            )
            lines.append(
                f'vgpu_memory_used{{{labels}}} {metrics.memory_used}'
            )
            lines.append(
                f'vgpu_temperature{{{labels}}} {metrics.temperature}'
            )
            lines.append(
                f'vgpu_power_usage{{{labels}}} {metrics.power_usage}'
            )

        return '\n'.join(lines)


# 使用示例
if __name__ == "__main__":
    # 创建监控器
    monitor = VGPUMonitor(collection_interval=1.0)

    # 添加告警回调
    def alert_handler(alert):
        print(f"Alert: {alert['message']}")

    monitor.add_alert_callback(alert_handler)

    # 启动监控
    monitor.start()

    # 运行一段时间
    time.sleep(10)

    # 获取指标摘要
    summary = monitor.get_metrics_summary(gpu_id=0, window_seconds=60)
    print(f"Summary: {summary}")

    # 导出指标
    print(monitor.export_metrics())

    # 停止监控
    monitor.stop()

最佳实践

虚拟化方案选择

# GPU 虚拟化方案选择指南
virtualization_selection:
  # 场景 1: 推理服务
  inference:
    recommended: "MIG + Time Slicing"
    reasoning:
      - 推理任务通常显存需求小
      - 需要高并发处理
      - 延迟敏感
    configuration:
      mig_profile: "1g.10gb"  # 小实例
      time_slicing_replicas: 2

  # 场景 2: 训练任务
  training:
    recommended: "MIG (大实例) 或 Full GPU"
    reasoning:
      - 训练需要大量显存
      - 需要稳定的计算资源
      - 长时间运行
    configuration:
      mig_profile: "3g.40gb"  # 大实例

  # 场景 3: 开发测试
  development:
    recommended: "Time Slicing + HAMi"
    reasoning:
      - 资源需求变化大
      - 需要灵活分配
      - 成本敏感
    configuration:
      time_slicing_replicas: 4
      hami_gpu_memory: "4Gi"
      hami_gpu_cores: 25

  # 场景 4: 混合工作负载
  mixed:
    recommended: "MIG 混合配置"
    reasoning:
      - 不同任务需求不同
      - 需要资源隔离
      - 优化整体利用率
    configuration:
      mig_profiles:
        - "1g.10gb": 4  # 推理
        - "3g.40gb": 1  # 训练

---
# 虚拟化配置检查清单
checklist:
  pre_deployment:
    - [ ] 确认 GPU 支持 MIG(A100/A30/H100)
    - [ ] 规划 MIG 分区策略
    - [ ] 评估时间片开销
    - [ ] 配置设备插件

  monitoring:
    - [ ] 部署 GPU 监控
    - [ ] 配置告警规则
    - [ ] 设置性能基线

  optimization:
    - [ ] 定期评估利用率
    - [ ] 调整分区配置
    - [ ] 优化调度策略

Kubernetes 集成配置

# 完整的 GPU 虚拟化 Kubernetes 配置
---
# GPU Operator 配置
apiVersion: nvidia.com/v1
kind: ClusterPolicy
metadata:
  name: cluster-policy
spec:
  operator:
    defaultRuntime: containerd

  driver:
    enabled: true
    version: "535.104.05"

  toolkit:
    enabled: true

  devicePlugin:
    enabled: true
    config:
      name: nvidia-device-plugin-config
      default: "any"

  mig:
    strategy: mixed

  migManager:
    enabled: true
    config:
      name: mig-parted-config
      default: "all-balanced"

---
# 设备插件配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: nvidia-device-plugin-config
  namespace: gpu-operator
data:
  any: |
    version: v1
    flags:
      migStrategy: mixed
    sharing:
      timeSlicing:
        renameByDefault: false
        failRequestsGreaterThanOne: false
        resources:
        - name: nvidia.com/gpu
          replicas: 2

  inference: |
    version: v1
    flags:
      migStrategy: single
    sharing:
      timeSlicing:
        resources:
        - name: nvidia.com/gpu
          replicas: 4

---
# MIG 分区配置
apiVersion: v1
kind: ConfigMap
metadata:
  name: mig-parted-config
  namespace: gpu-operator
data:
  config.yaml: |
    version: v1
    mig-configs:
      all-disabled:
        - devices: all
          mig-enabled: false

      all-balanced:
        - devices: all
          mig-enabled: true
          mig-devices:
            "1g.10gb": 4
            "2g.20gb": 1

      all-inference:
        - devices: all
          mig-enabled: true
          mig-devices:
            "1g.10gb": 7

      all-training:
        - devices: all
          mig-enabled: true
          mig-devices:
            "3g.40gb": 2

---
# 节点标签策略
apiVersion: v1
kind: ConfigMap
metadata:
  name: gpu-node-labels
data:
  labeling-rules.yaml: |
    # 根据 GPU 类型打标签
    - match:
        gpu-model: "A100-SXM4-80GB"
      labels:
        gpu.nvidia.com/class: high-end
        gpu.nvidia.com/mig-capable: "true"

    - match:
        gpu-model: "A100-SXM4-40GB"
      labels:
        gpu.nvidia.com/class: mid-range
        gpu.nvidia.com/mig-capable: "true"

    - match:
        gpu-model: "V100"
      labels:
        gpu.nvidia.com/class: legacy
        gpu.nvidia.com/mig-capable: "false"

---
# ResourceQuota 限制
apiVersion: v1
kind: ResourceQuota
metadata:
  name: gpu-quota
  namespace: ml-workloads
spec:
  hard:
    # 全 GPU
    nvidia.com/gpu: "8"

    # MIG 实例
    nvidia.com/mig-1g.10gb: "28"
    nvidia.com/mig-2g.20gb: "6"
    nvidia.com/mig-3g.40gb: "4"

    # HAMi 资源
    nvidia.com/gpumem: "320Gi"
    nvidia.com/gpucores: "800"

总结

GPU 虚拟化技术是提高 AI 基础设施资源利用率的关键:

  1. MIG 技术:硬件级分区,提供真正的资源隔离
  2. 时间片共享:软件级共享,适合轻量级工作负载
  3. vGPU 方案:灵活的显存和算力分配
  4. 显存虚拟化:支持超分配和按需迁移

选择合适的虚拟化方案需要考虑:

  • 工作负载特性(训练/推理)
  • 隔离需求(硬隔离/软隔离)
  • 资源利用率目标
  • 运维复杂度
Prev
异构计算概述
Next
NPU 与专用 AI 芯片