GPU 虚拟化技术
概述
GPU 虚拟化是 AI 基础设施的关键技术,它允许多个工作负载共享物理 GPU 资源,提高资源利用率,降低成本。本文深入探讨 GPU 虚拟化的原理、主流方案及实践。
GPU 虚拟化原理
虚拟化层次
┌─────────────────────────────────────────────────────────────────┐
│ GPU 虚拟化架构层次 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 应用层虚拟化 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 容器化方案 │ │ 框架级隔离 │ │ 进程级共享 │ │
│ │ (Docker GPU) │ │ (TF/PyTorch)│ │ (MPS/MIG) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ───────┴────────────────┴────────────────┴───────────── │
│ │
│ 驱动层虚拟化 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ API 转发 │ │ 显存虚拟化 │ │ 算力分片 │ │
│ │ (vGPU/GRID) │ │ (Memory Pool)│ │ (Time Slice)│ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ───────┴────────────────┴────────────────┴───────────── │
│ │
│ 硬件层虚拟化 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ SR-IOV │ │ MIG │ │ 硬件分区 │ │
│ │ (PCIe VF) │ │ (A100/H100) │ │ (Gaudi/TPU) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
资源隔离维度
# GPU 虚拟化资源隔离
resource_isolation:
# 算力隔离
compute:
- sm_partitioning # SM 分区
- time_slicing # 时间片
- priority_scheduling # 优先级调度
# 显存隔离
memory:
- physical_partition # 物理分区 (MIG)
- virtual_memory # 虚拟显存
- memory_limit # 显存限制
# 带宽隔离
bandwidth:
- nvlink_partition # NVLink 分区
- pcie_bandwidth # PCIe 带宽
- memory_bandwidth # 显存带宽
# 故障隔离
fault:
- ecc_isolation # ECC 错误隔离
- process_isolation # 进程级隔离
- vm_isolation # 虚拟机隔离
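在上述各个隔离维度中,落地门槛最低的是进程级显存限制:即使不做硬件分区,也可以在框架层为单个进程设置显存上限。下面是一个基于 PyTorch 的最小示意(25% 的比例仅为示例值):
import torch

if torch.cuda.is_available():
    # 将本进程在 GPU 0 上可用的显存限制为总显存的 25%
    torch.cuda.set_per_process_memory_fraction(0.25, device=0)
    total = torch.cuda.get_device_properties(0).total_memory
    print(f"进程显存上限约 {total * 0.25 / 1024**3:.1f} GiB")
    # 超出上限的分配会在本进程内触发 OOM,而不会挤占同卡其他进程的显存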
NVIDIA MIG 技术
MIG 架构详解
┌─────────────────────────────────────────────────────────────────┐
│ NVIDIA A100 MIG 架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Full GPU (80GB HBM2e, 108 SMs) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ GPC 0 │ │ GPC 1 │ │ GPC 2 │ │ GPC 3 │ │ │
│ │ │ 14 SMs │ │ 14 SMs │ │ 14 SMs │ │ 14 SMs │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ GPC 4 │ │ GPC 5 │ │ GPC 6 │ │ GPC 7 │ │ │
│ │ │ 14 SMs │ │ 14 SMs │ │ 14 SMs │ │ 14 SMs │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │ │
│ │ ═════╪═══════════╪═══════════╪═══════════╪═════════ │ │
│ │ │ Memory Controller & L2 Cache │ │ │
│ │ ═════╪═══════════╪═══════════╪═══════════╪═════════ │ │
│ │ │ │ │ │ │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ HBM2e Memory (80GB) │ │ │
│ │ │ Slice 0 │ Slice 1 │ ... │ Slice 7 │ │ │
│ │ │ 10GB │ 10GB │ │ 10GB │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ MIG 分区示例: │
│ ┌───────────────────┬───────────────────┬─────────────────┐ │
│ │ 1g.10gb (×7) │ 2g.20gb (×3) │ 3g.40gb (×2) │ │
│ │ 7个小实例 │ 3个中实例 │ 2个大实例 │ │
│ └───────────────────┴───────────────────┴─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
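启用 MIG 后,每个 GPU 实例会作为独立设备暴露给应用:先用 nvidia-smi -L 查看各实例的 UUID,再通过 CUDA_VISIBLE_DEVICES 把进程绑定到某个实例。下面是一个最小示意(UUID 仅为占位,需要替换为实际值):
import os

# 必须在任何 CUDA 初始化之前设置;实际 UUID 通过 `nvidia-smi -L` 获取
os.environ["CUDA_VISIBLE_DEVICES"] = "MIG-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

import torch

if torch.cuda.is_available():
    # 对应用而言,该 MIG 实例就是 cuda:0,可见显存即该实例的切片大小
    print(torch.cuda.get_device_name(0))
    print(torch.cuda.get_device_properties(0).total_memory / 1024**3, "GiB")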
MIG 配置管理
"""
NVIDIA MIG 配置管理工具
"""
import subprocess
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class MIGProfile(Enum):
"""MIG 配置文件"""
    PROFILE_1G_5GB = "1g.5gb"     # A100-40GB
    PROFILE_1G_10GB = "1g.10gb"   # A100-80GB
    PROFILE_1G_20GB = "1g.20gb"   # A100-80GB
    PROFILE_2G_10GB = "2g.10gb"   # A100-40GB
    PROFILE_2G_20GB = "2g.20gb"   # A100-80GB
    PROFILE_3G_20GB = "3g.20gb"   # A100-40GB
    PROFILE_3G_40GB = "3g.40gb"   # A100-80GB
    PROFILE_4G_40GB = "4g.40gb"   # A100-80GB
    PROFILE_7G_40GB = "7g.40gb"   # A100-40GB 完整 GPU
    PROFILE_7G_80GB = "7g.80gb"   # A100-80GB 完整 GPU
@dataclass
class GPUInstance:
"""GPU 实例"""
gi_id: int
profile: str
placement_start: int
placement_size: int
@dataclass
class ComputeInstance:
"""计算实例"""
ci_id: int
gi_id: int
profile: str
class MIGManager:
"""MIG 管理器"""
def __init__(self, gpu_index: int = 0):
self.gpu_index = gpu_index
self._validate_mig_support()
def _validate_mig_support(self):
"""验证 MIG 支持"""
result = self._run_nvidia_smi([
"nvidia-smi", "-i", str(self.gpu_index),
"--query-gpu=mig.mode.current",
"--format=csv,noheader"
])
if "Enabled" not in result and "[N/A]" in result:
raise RuntimeError(f"GPU {self.gpu_index} does not support MIG")
def _run_nvidia_smi(self, cmd: List[str]) -> str:
"""执行 nvidia-smi 命令"""
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Command failed: {result.stderr}")
return result.stdout.strip()
def enable_mig(self) -> bool:
"""启用 MIG 模式"""
try:
self._run_nvidia_smi([
"nvidia-smi", "-i", str(self.gpu_index),
"-mig", "1"
])
print(f"MIG enabled on GPU {self.gpu_index}. Reboot required.")
return True
except Exception as e:
print(f"Failed to enable MIG: {e}")
return False
def disable_mig(self) -> bool:
"""禁用 MIG 模式"""
try:
# 首先销毁所有实例
self.destroy_all_instances()
self._run_nvidia_smi([
"nvidia-smi", "-i", str(self.gpu_index),
"-mig", "0"
])
print(f"MIG disabled on GPU {self.gpu_index}. Reboot required.")
return True
except Exception as e:
print(f"Failed to disable MIG: {e}")
return False
def get_mig_status(self) -> Dict:
"""获取 MIG 状态"""
# 获取 MIG 模式
mode = self._run_nvidia_smi([
"nvidia-smi", "-i", str(self.gpu_index),
"--query-gpu=mig.mode.current",
"--format=csv,noheader"
])
# 获取 GPU 实例
gi_output = self._run_nvidia_smi([
"nvidia-smi", "mig", "-lgi", "-i", str(self.gpu_index)
])
# 获取计算实例
ci_output = self._run_nvidia_smi([
"nvidia-smi", "mig", "-lci", "-i", str(self.gpu_index)
])
return {
"gpu_index": self.gpu_index,
"mig_mode": mode,
"gpu_instances": gi_output,
"compute_instances": ci_output
}
def list_available_profiles(self) -> List[Dict]:
"""列出可用的 MIG 配置"""
output = self._run_nvidia_smi([
"nvidia-smi", "mig", "-lgip", "-i", str(self.gpu_index)
])
profiles = []
for line in output.split('\n'):
    # 数据行形如 "|   0  MIG 1g.10gb   19   7/7   9.50   No   14  ..."
    # 表头与分隔线不含 "MIG <profile>" 字段;注意不能按"行内含 MIG"来跳过,
    # 否则数据行也会被过滤掉(输出格式可能随驱动版本略有差异)
    parts = line.replace('|', ' ').split()
    if len(parts) < 6 or parts[1] != 'MIG':
        continue
    profiles.append({
        "gpu": parts[0],
        "profile": parts[2],
        "id": parts[3],
        "instances": parts[4],   # Free/Total
        "memory": parts[5],      # GiB
        "sms": parts[7] if len(parts) > 7 else "N/A"
    })
return profiles
def create_gpu_instance(self, profile: str) -> Optional[int]:
"""创建 GPU 实例"""
try:
output = self._run_nvidia_smi([
"nvidia-smi", "mig", "-cgi", profile,
"-i", str(self.gpu_index)
])
# 解析输出获取 GI ID
for line in output.split('\n'):
if 'Successfully created GPU instance' in line:
gi_id = int(line.split('ID')[1].split()[0])
print(f"Created GPU instance {gi_id} with profile {profile}")
return gi_id
return None
except Exception as e:
print(f"Failed to create GPU instance: {e}")
return None
def create_compute_instance(self, gi_id: int, profile: str = None) -> Optional[int]:
"""创建计算实例"""
try:
cmd = [
"nvidia-smi", "mig", "-cci",
"-gi", str(gi_id),
"-i", str(self.gpu_index)
]
if profile:
cmd.insert(3, profile)
output = self._run_nvidia_smi(cmd)
for line in output.split('\n'):
if 'Successfully created compute instance' in line:
ci_id = int(line.split('ID')[1].split()[0])
print(f"Created compute instance {ci_id} on GPU instance {gi_id}")
return ci_id
return None
except Exception as e:
print(f"Failed to create compute instance: {e}")
return None
def destroy_gpu_instance(self, gi_id: int) -> bool:
"""销毁 GPU 实例"""
try:
# 首先销毁所有关联的计算实例
self._run_nvidia_smi([
"nvidia-smi", "mig", "-dci",
"-gi", str(gi_id),
"-i", str(self.gpu_index)
])
# 销毁 GPU 实例
self._run_nvidia_smi([
"nvidia-smi", "mig", "-dgi",
"-gi", str(gi_id),
"-i", str(self.gpu_index)
])
print(f"Destroyed GPU instance {gi_id}")
return True
except Exception as e:
print(f"Failed to destroy GPU instance: {e}")
return False
def destroy_all_instances(self) -> bool:
"""销毁所有实例"""
try:
# 销毁所有计算实例
self._run_nvidia_smi([
"nvidia-smi", "mig", "-dci",
"-i", str(self.gpu_index)
])
# 销毁所有 GPU 实例
self._run_nvidia_smi([
"nvidia-smi", "mig", "-dgi",
"-i", str(self.gpu_index)
])
print("Destroyed all MIG instances")
return True
except Exception as e:
print(f"Failed to destroy instances: {e}")
return False
def apply_profile(self, profile_config: List[str]) -> bool:
"""
应用 MIG 配置
Args:
profile_config: 配置列表,如 ["1g.10gb", "1g.10gb", "2g.20gb"]
"""
try:
# 销毁现有实例
self.destroy_all_instances()
# 创建新实例;任一 profile 创建失败则整体返回 False
success = True
for profile in profile_config:
    gi_id = self.create_gpu_instance(profile)
    if gi_id is not None:
        self.create_compute_instance(gi_id)
    else:
        success = False
return success
except Exception as e:
print(f"Failed to apply profile: {e}")
return False
class MIGProfileTemplates:
"""预定义的 MIG 配置模板"""
# A100-40GB 配置模板
A100_40GB_TEMPLATES = {
# 7 个小实例 - 适合推理
"inference_small": ["1g.5gb"] * 7,
# 3 个中等实例 - 平衡配置
"balanced": ["2g.10gb", "2g.10gb", "3g.20gb"],
# 2 个大实例 - 适合训练
"training": ["3g.20gb", "4g.20gb"],
# 1 大 + 多小 - 混合工作负载
"mixed": ["4g.20gb", "1g.5gb", "1g.5gb", "1g.5gb"],
# 全 GPU - 单任务
"full": ["7g.40gb"]
}
# A100-80GB 配置模板
A100_80GB_TEMPLATES = {
"inference_small": ["1g.10gb"] * 7,
"balanced": ["2g.20gb", "2g.20gb", "3g.40gb"],
"training": ["3g.40gb", "4g.40gb"],
"mixed": ["4g.40gb", "1g.10gb", "1g.10gb", "1g.10gb"],
"full": ["7g.80gb"]
}
@classmethod
def get_template(cls, gpu_model: str, template_name: str) -> List[str]:
"""获取配置模板"""
if "80GB" in gpu_model or "80G" in gpu_model:
templates = cls.A100_80GB_TEMPLATES
else:
templates = cls.A100_40GB_TEMPLATES
return templates.get(template_name, templates["balanced"])
# Kubernetes MIG 设备插件配置
class KubernetesMIGConfig:
"""Kubernetes MIG 配置生成器"""
@staticmethod
def generate_device_plugin_config(
strategy: str = "mixed",
mig_configs: Dict[str, List[str]] = None
) -> Dict:
"""
生成设备插件配置
Args:
strategy: single | mixed | none
mig_configs: GPU 索引到 MIG 配置的映射
"""
config = {
"version": "v1",
"sharing": {
"mig": {
"strategy": strategy,
"devices": {}
}
}
}
if mig_configs:
for gpu_index, profiles in mig_configs.items():
config["sharing"]["mig"]["devices"][gpu_index] = profiles
return config
@staticmethod
def generate_resource_request(profile: str, count: int = 1) -> Dict:
"""生成资源请求"""
# MIG 资源格式: nvidia.com/mig-<profile>
resource_name = f"nvidia.com/mig-{profile}"
return {
"resources": {
"limits": {
resource_name: str(count)
},
"requests": {
resource_name: str(count)
}
}
}
# 使用示例
if __name__ == "__main__":
# MIG 管理示例
# mig = MIGManager(gpu_index=0)
# mig.apply_profile(["2g.20gb", "2g.20gb", "3g.40gb"])
# Kubernetes 配置示例
k8s_config = KubernetesMIGConfig.generate_device_plugin_config(
strategy="mixed",
mig_configs={
"0": ["1g.10gb", "1g.10gb", "2g.20gb", "3g.40gb"],
"1": ["7g.80gb"] # 全 GPU
}
)
print(json.dumps(k8s_config, indent=2))
MIG Kubernetes 集成
# nvidia-device-plugin-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: nvidia-device-plugin-config
namespace: kube-system
data:
config.yaml: |
version: v1
flags:
migStrategy: "mixed" # none, single, mixed
sharing:
timeSlicing:
renameByDefault: false
resources:
- name: nvidia.com/gpu
replicas: 2 # 时间片复制因子
---
# MIG 配置 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: mig-parted-config
namespace: gpu-operator
data:
config.yaml: |
version: v1
mig-configs:
# 推理优化配置
all-1g.10gb:
- devices: all
mig-enabled: true
mig-devices:
"1g.10gb": 7
# 平衡配置
all-balanced:
- devices: all
mig-enabled: true
mig-devices:
"2g.20gb": 2
"3g.40gb": 1
# 训练配置
all-3g.40gb:
- devices: all
mig-enabled: true
mig-devices:
"3g.40gb": 2
# 混合配置 - 不同节点不同策略
custom-config:
- devices: [0, 1]
mig-enabled: true
mig-devices:
"1g.10gb": 7
- devices: [2, 3]
mig-enabled: true
mig-devices:
"3g.40gb": 2
- devices: [4, 5, 6, 7]
mig-enabled: false
---
# 使用 MIG 实例的 Pod
apiVersion: v1
kind: Pod
metadata:
name: mig-inference-pod
spec:
containers:
- name: inference
image: nvcr.io/nvidia/pytorch:23.10-py3
resources:
limits:
nvidia.com/mig-1g.10gb: 1 # 请求 1g.10gb MIG 实例
command: ["python", "inference.py"]
nodeSelector:
nvidia.com/mig.config: "all-1g.10gb"
---
# MIG 感知的调度器配置
apiVersion: v1
kind: ConfigMap
metadata:
name: scheduler-config
namespace: kube-system
data:
scheduler-config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: gpu-scheduler
plugins:
filter:
enabled:
- name: NodeResourcesFit
score:
enabled:
- name: NodeResourcesBalancedAllocation
weight: 1
- name: NodeResourcesFit
weight: 1
pluginConfig:
- name: NodeResourcesFit
args:
scoringStrategy:
type: LeastAllocated
resources:
- name: nvidia.com/mig-1g.10gb
weight: 1
- name: nvidia.com/mig-2g.20gb
weight: 2
- name: nvidia.com/mig-3g.40gb
weight: 3
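下面用一小段 Python 演示 MIG 资源请求在 Pod 规格中的形态,资源名格式与前文 KubernetesMIGConfig.generate_resource_request 一致(nvidia.com/mig-<profile>),镜像与命令沿用上面的示例:
import json

profile, count = "1g.10gb", 1
resource_name = f"nvidia.com/mig-{profile}"

pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "mig-inference-pod"},
    "spec": {
        "containers": [{
            "name": "inference",
            "image": "nvcr.io/nvidia/pytorch:23.10-py3",
            "command": ["python", "inference.py"],
            "resources": {
                "limits": {resource_name: str(count)},
                "requests": {resource_name: str(count)},
            },
        }],
        "nodeSelector": {"nvidia.com/mig.config": "all-1g.10gb"},
    },
}
print(json.dumps(pod, indent=2))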
GPU 时间片共享
时间片原理
┌─────────────────────────────────────────────────────────────────┐
│ GPU 时间片调度原理 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 时间轴: │
│ ┌────┬────┬────┬────┬────┬────┬────┬────┬────┬────┐ │
│ │ P1 │ P2 │ P3 │ P1 │ P2 │ P3 │ P1 │ P2 │ P3 │... │ │
│ └────┴────┴────┴────┴────┴────┴────┴────┴────┴────┘ │
│ ↑ │
│ └─ 时间片 (默认 60ms) │
│ │
│ 上下文切换开销: │
│ ┌──────────────────────────────────────────────────┐ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ Run │ Save│ Run │ Load│ Run │ │ │
│ │ │ P1 │ Ctx │ P2 │ Ctx │ P3 │ │ │
│ │ └─────┘ └─────┘ └─────┘ │ │
│ │ ↑ ↑ │ │
│ │ └───────────┴─ 切换开销 (~100μs) │ │
│ └──────────────────────────────────────────────────┘ │
│ │
│ MPS vs 时间片: │
│ ┌────────────────────┬────────────────────────────┐ │
│ │ 时间片 │ MPS │ │
│ ├────────────────────┼────────────────────────────┤ │
│ │ 串行执行 │ 并行执行 │ │
│ │ 显存独占 │ 显存共享 │ │
│ │ 完全隔离 │ 部分隔离 │ │
│ │ 高切换开销 │ 低切换开销 │ │
│ │ 兼容性好 │ 限制较多 │ │
│ └────────────────────┴────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
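按上图的量级可以粗略估算切换开销对有效算力的影响:单个租户的有效时间占比约为 时间片 / (时间片 + 切换开销)。下面的小脚本代入几组示意数值(真实取值取决于调度器实现与负载特征):
def effective_utilization(slice_ms: float, switch_us: float = 100.0) -> float:
    """考虑上下文切换开销后,单个时间片周期内真正用于计算的时间占比"""
    switch_ms = switch_us / 1000.0
    return slice_ms / (slice_ms + switch_ms)

for slice_ms in (1, 5, 20, 60):
    print(f"time slice {slice_ms:>3} ms -> 有效占比 {effective_utilization(slice_ms):.2%}")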
时间片配置实现
"""
GPU 时间片共享配置
"""
import os
from typing import Dict, List, Optional
from dataclasses import dataclass
import yaml
@dataclass
class TimeSlicingConfig:
"""时间片配置"""
replicas: int = 2
rename_by_default: bool = False
fail_requests_greater_than_one: bool = False
def to_dict(self) -> Dict:
return {
"replicas": self.replicas,
"renameByDefault": self.rename_by_default,
"failRequestsGreaterThanOne": self.fail_requests_greater_than_one
}
class GPUTimeSlicingManager:
"""GPU 时间片管理器"""
def __init__(self):
self.default_config = TimeSlicingConfig()
def generate_device_plugin_config(
self,
gpu_configs: Dict[str, TimeSlicingConfig] = None,
mig_strategy: str = "none"
) -> Dict:
"""
生成设备插件配置
Args:
gpu_configs: GPU 类型到配置的映射
mig_strategy: MIG 策略
"""
config = {
"version": "v1",
"flags": {
"migStrategy": mig_strategy
},
"sharing": {
"timeSlicing": {
"renameByDefault": self.default_config.rename_by_default,
"failRequestsGreaterThanOne": self.default_config.fail_requests_greater_than_one,
"resources": []
}
}
}
if gpu_configs:
for gpu_name, ts_config in gpu_configs.items():
config["sharing"]["timeSlicing"]["resources"].append({
"name": gpu_name,
"replicas": ts_config.replicas
})
else:
# 默认配置
config["sharing"]["timeSlicing"]["resources"].append({
"name": "nvidia.com/gpu",
"replicas": self.default_config.replicas
})
return config
def generate_kubernetes_manifests(
self,
replicas: int = 4,
namespace: str = "kube-system"
) -> str:
"""生成 Kubernetes 清单"""
manifests = f"""
# ConfigMap for time-slicing
apiVersion: v1
kind: ConfigMap
metadata:
name: time-slicing-config
namespace: {namespace}
data:
time-slicing-config.yaml: |
version: v1
sharing:
timeSlicing:
renameByDefault: true
failRequestsGreaterThanOne: false
resources:
- name: nvidia.com/gpu
replicas: {replicas}
---
# 更新 NVIDIA Device Plugin
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: nvidia-device-plugin-daemonset
namespace: {namespace}
spec:
selector:
matchLabels:
name: nvidia-device-plugin-ds
template:
metadata:
labels:
name: nvidia-device-plugin-ds
spec:
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
containers:
- name: nvidia-device-plugin-ctr
image: nvcr.io/nvidia/k8s-device-plugin:v0.14.1
env:
- name: CONFIG_FILE
value: /etc/nvidia/time-slicing-config.yaml
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
- name: config
mountPath: /etc/nvidia
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
- name: config
configMap:
name: time-slicing-config
"""
return manifests
class AdaptiveTimeSlicing:
"""自适应时间片调整"""
def __init__(self, min_replicas: int = 2, max_replicas: int = 10):
self.min_replicas = min_replicas
self.max_replicas = max_replicas
self.current_replicas = min_replicas
def calculate_optimal_replicas(
self,
workload_metrics: Dict
) -> int:
"""
根据工作负载计算最佳复制因子
考虑因素:
- GPU 利用率
- 显存使用率
- 平均任务时长
- 队列深度
"""
gpu_util = workload_metrics.get("gpu_utilization", 50)
memory_util = workload_metrics.get("memory_utilization", 50)
avg_task_duration = workload_metrics.get("avg_task_duration_ms", 100)
queue_depth = workload_metrics.get("queue_depth", 0)
# 基础复制因子
base_replicas = self.min_replicas
# 根据 GPU 利用率调整
if gpu_util < 30:
# 低利用率,增加复制
base_replicas = min(self.max_replicas, base_replicas + 2)
elif gpu_util > 80:
# 高利用率,减少复制
base_replicas = max(self.min_replicas, base_replicas - 1)
# 根据显存使用率调整
memory_factor = 1.0 - (memory_util / 100)
base_replicas = int(base_replicas * (1 + memory_factor * 0.5))
# 根据任务时长调整
if avg_task_duration < 50: # 短任务
base_replicas = min(self.max_replicas, base_replicas + 1)
elif avg_task_duration > 500: # 长任务
base_replicas = max(self.min_replicas, base_replicas - 1)
# 根据队列深度调整
if queue_depth > 10:
base_replicas = min(self.max_replicas, base_replicas + 1)
return min(self.max_replicas, max(self.min_replicas, base_replicas))
def should_update(self, new_replicas: int) -> bool:
"""判断是否需要更新配置"""
# 避免频繁变更
if abs(new_replicas - self.current_replicas) < 2:
return False
return True
# 使用示例
if __name__ == "__main__":
manager = GPUTimeSlicingManager()
# 生成基础配置
config = manager.generate_device_plugin_config(
gpu_configs={
"nvidia.com/gpu": TimeSlicingConfig(replicas=4),
}
)
print(yaml.dump(config))
# 生成 Kubernetes 清单
manifests = manager.generate_kubernetes_manifests(replicas=4)
print(manifests)
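上面的示例没有用到 AdaptiveTimeSlicing,下面补一个最小调用示意(与前文类定义放在同一文件中运行,指标数值为假设):
adaptive = AdaptiveTimeSlicing(min_replicas=2, max_replicas=10)

# 假设从监控系统取得的负载指标
metrics = {
    "gpu_utilization": 25,        # 低利用率
    "memory_utilization": 40,
    "avg_task_duration_ms": 30,   # 短任务
    "queue_depth": 12,
}

replicas = adaptive.calculate_optimal_replicas(metrics)
if adaptive.should_update(replicas):
    adaptive.current_replicas = replicas
    # 此处可以重新生成时间片 ConfigMap 并滚动更新 device plugin
print(f"建议的复制因子: {replicas}")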
vGPU 虚拟化方案
vGPU 架构
┌─────────────────────────────────────────────────────────────────┐
│ vGPU 虚拟化架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 虚拟机 / 容器层 │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ VM 1 │ │ VM 2 │ │ VM 3 │ │ VM 4 │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │ App │ │ │ │ App │ │ │ │ App │ │ │ │ App │ │ │
│ │ ├───────┤ │ │ ├───────┤ │ │ ├───────┤ │ │ ├───────┤ │ │
│ │ │ CUDA │ │ │ │ CUDA │ │ │ │ CUDA │ │ │ │ CUDA │ │ │
│ │ ├───────┤ │ │ ├───────┤ │ │ ├───────┤ │ │ ├───────┤ │ │
│ │ │vGPU │ │ │ │vGPU │ │ │ │vGPU │ │ │ │vGPU │ │ │
│ │ │Driver │ │ │ │Driver │ │ │ │Driver │ │ │ │Driver │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │ │ │
│ ══════╪══════════════╪══════════════╪══════════════╪════════ │
│ │ Hypervisor / Container Runtime │ │
│ ══════╪══════════════╪══════════════╪══════════════╪════════ │
│ │ │ │ │ │
│ vGPU Manager (NVIDIA GRID / HAMi / gpushare) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Scheduler │ │ Memory Mgr │ │ Profile Mgr │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ - 时间片 │ │ - 显存分配 │ │ - vGPU配置 │ │ │
│ │ │ - 优先级 │ │ - 显存隔离 │ │ - QoS策略 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ════════════════════════════╪════════════════════════════════ │
│ │ │
│ 物理 GPU │ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ NVIDIA GPU │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │SM Bank1│ │SM Bank2│ │SM Bank3│ │SM Bank4│ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ │ ┌──────────────────────────────────────────┐ │ │
│ │ │ GPU Memory (VRAM) │ │ │
│ │ └──────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
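图中 Memory Mgr 的"显存隔离"在软件方案里通常靠拦截分配 API 并按配额核算来实现(HAMi、gpushare 等在 CUDA 库层做拦截)。下面用 PyTorch 写一个概念性的小示意,只演示"先查配额、再放行分配"的思路,并非任何方案的真实实现:
import torch

class MemoryQuota:
    """概念示意:用户态按配额核算显存,超额则拒绝分配"""

    def __init__(self, limit_bytes: int, device: int = 0):
        self.limit_bytes = limit_bytes
        self.device = device

    def alloc(self, *shape, dtype=torch.float32) -> torch.Tensor:
        # 估算本次分配大小,与当前已分配显存合并后检查配额
        need = torch.tensor([], dtype=dtype).element_size()
        for dim in shape:
            need *= dim
        used = torch.cuda.memory_allocated(self.device)
        if used + need > self.limit_bytes:
            raise MemoryError(
                f"quota exceeded: used={used}, need={need}, limit={self.limit_bytes}"
            )
        return torch.empty(*shape, dtype=dtype, device=f"cuda:{self.device}")

if torch.cuda.is_available():
    quota = MemoryQuota(limit_bytes=2 * 1024**3)   # 假设本进程限额 2GiB
    x = quota.alloc(1024, 1024)                    # 约 4MB,放行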
HAMi 开源 GPU 共享方案
"""
HAMi (Heterogeneous AI Computing Virtualization Middleware) 配置
开源 GPU 共享方案
"""
from typing import Dict, List, Optional
from dataclasses import dataclass
import yaml
@dataclass
class HAMiDeviceConfig:
"""HAMi 设备配置"""
gpu_memory: str # 显存大小,如 "4Gi"
gpu_cores: int # GPU 核心百分比 0-100
priority: int = 0 # 优先级
class HAMiManager:
"""HAMi 管理器"""
def __init__(self):
self.namespace = "hami-system"
def generate_installation_manifest(self) -> str:
"""生成安装清单"""
return f"""
# HAMi 命名空间
apiVersion: v1
kind: Namespace
metadata:
name: {self.namespace}
---
# 使用 Helm 安装
# helm repo add hami https://project-hami.github.io/HAMi/
# helm install hami hami/hami -n {self.namespace}
# 或使用 YAML 部署
# HAMi Scheduler
apiVersion: apps/v1
kind: Deployment
metadata:
name: hami-scheduler
namespace: {self.namespace}
spec:
replicas: 1
selector:
matchLabels:
app: hami-scheduler
template:
metadata:
labels:
app: hami-scheduler
spec:
serviceAccountName: hami-scheduler
containers:
- name: scheduler
image: projecthami/hami:v2.3
args:
- scheduler
- --config=/config/config.yaml
volumeMounts:
- name: config
mountPath: /config
volumes:
- name: config
configMap:
name: hami-scheduler-config
---
# HAMi Device Plugin
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: hami-device-plugin
namespace: {self.namespace}
spec:
selector:
matchLabels:
app: hami-device-plugin
template:
metadata:
labels:
app: hami-device-plugin
spec:
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
containers:
- name: device-plugin
image: projecthami/hami:v2.3
args:
- device-plugin
- --node-name=$(NODE_NAME)
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
privileged: true
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
- name: pod-resources
mountPath: /var/lib/kubelet/pod-resources
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
- name: pod-resources
hostPath:
path: /var/lib/kubelet/pod-resources
"""
def generate_pod_spec(
    self,
    name: str,
    image: str,
    gpu_memory_mb: int,
    gpu_cores: int = 100,
    gpu_count: int = 1
) -> Dict:
    """
    生成使用 HAMi 的 Pod 规格
    Args:
        name: Pod 名称
        image: 容器镜像
        gpu_memory_mb: 显存大小,单位 MB (对应扩展资源 nvidia.com/gpumem)
        gpu_cores: GPU 算力百分比 0-100 (对应扩展资源 nvidia.com/gpucores)
        gpu_count: GPU 数量
    """
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": name
        },
        "spec": {
            "schedulerName": "hami-scheduler",
            "containers": [{
                "name": "main",
                "image": image,
                "resources": {
                    "limits": {
                        # HAMi 通过扩展资源声明显存与算力配额
                        "nvidia.com/gpu": str(gpu_count),
                        "nvidia.com/gpumem": str(gpu_memory_mb),
                        "nvidia.com/gpucores": str(gpu_cores)
                    }
                }
            }]
        }
    }
def generate_scheduler_config(
self,
default_memory: str = "4Gi",
default_cores: int = 100,
enable_core_isolation: bool = True
) -> Dict:
"""生成调度器配置"""
return {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": "hami-scheduler-config",
"namespace": self.namespace
},
"data": {
"config.yaml": yaml.dump({
"scheduler": {
"defaultMemory": default_memory,
"defaultCores": default_cores,
"enableCoreIsolation": enable_core_isolation,
"resourceName": "nvidia.com/gpu",
"schedulerPolicy": "binpack", # binpack | spread
"nodeSelector": {},
"tolerations": []
},
"devicePlugin": {
"enableHealthCheck": True,
"healthCheckInterval": "30s"
}
})
}
}
class GPUShareScheduler:
"""GPU 共享调度器"""
def __init__(self):
self.namespace = "gpu-share"
def generate_gpushare_deployment(self) -> str:
"""生成 gpushare 部署清单"""
return """
# gpushare-scheduler-extender
apiVersion: apps/v1
kind: Deployment
metadata:
name: gpushare-scheduler-extender
namespace: kube-system
spec:
replicas: 1
selector:
matchLabels:
app: gpushare-scheduler-extender
template:
metadata:
labels:
app: gpushare-scheduler-extender
spec:
serviceAccountName: gpushare-scheduler-extender
containers:
- name: extender
image: registry.cn-hangzhou.aliyuncs.com/acs/k8s-gpushare-scheduler-extender:v2
env:
- name: LOG_LEVEL
value: info
ports:
- containerPort: 12345
---
# gpushare-device-plugin
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: gpushare-device-plugin
namespace: kube-system
spec:
selector:
matchLabels:
app: gpushare-device-plugin
template:
metadata:
labels:
app: gpushare-device-plugin
spec:
tolerations:
- operator: Exists
containers:
- name: device-plugin
image: registry.cn-hangzhou.aliyuncs.com/acs/k8s-gpushare-plugin:v2
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
privileged: true
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
---
# 调度器配置
apiVersion: v1
kind: ConfigMap
metadata:
name: scheduler-extender-config
namespace: kube-system
data:
scheduler-extender-config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
extenders:
- urlPrefix: "http://gpushare-scheduler-extender:12345/gpushare-scheduler"
filterVerb: "filter"
bindVerb: "bind"
enableHTTPS: false
nodeCacheCapable: true
managedResources:
- name: aliyun.com/gpu-mem
ignoredByScheduler: false
ignorable: false
"""
def generate_pod_with_gpu_share(
self,
name: str,
image: str,
gpu_memory_gb: int
) -> Dict:
"""生成使用 GPU 共享的 Pod"""
return {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": name
},
"spec": {
"schedulerName": "gpushare-scheduler",
"containers": [{
"name": "main",
"image": image,
"resources": {
"limits": {
"aliyun.com/gpu-mem": gpu_memory_gb
}
}
}]
}
}
# 使用示例
if __name__ == "__main__":
# HAMi 示例
hami = HAMiManager()
# 生成 Pod 配置
pod_spec = hami.generate_pod_spec(
    name="inference-pod",
    image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
    gpu_memory_mb=4096,   # 申请 4GiB 显存 (nvidia.com/gpumem 以 MB 计)
    gpu_cores=50          # 使用 50% GPU 算力 (nvidia.com/gpucores)
)
print(yaml.dump(pod_spec))
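GPUShareScheduler 在上面的示例中没有被调用,下面补一个生成 gpushare 共享 Pod 的最小示意(与前文类定义同文件运行):
gpushare = GPUShareScheduler()
shared_pod = gpushare.generate_pod_with_gpu_share(
    name="shared-inference",
    image="pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime",
    gpu_memory_gb=4   # aliyun.com/gpu-mem 以 GiB 计
)
print(yaml.dump(shared_pod))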
显存虚拟化
显存池化架构
┌─────────────────────────────────────────────────────────────────┐
│ 显存池化架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 应用层 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ App 1 │ │ App 2 │ │ App 3 │ │ App 4 │ │
│ │ 需要8G │ │ 需要4G │ │ 需要12G │ │ 需要6G │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ ═════╪════════════╪════════════╪════════════╪════════════════ │
│ │ │ │
│ 显存虚拟化层 (Unified Memory Manager) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Virtual Address Space │ │ │
│ │ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │ │
│ │ │ │ App1 │ │ App2 │ │ App3 │ │ App4 │ │ Free │ │ │ │
│ │ │ │ 8G │ │ 4G │ │ 12G │ │ 6G │ │ │ │ │ │
│ │ │ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ └──────┘ │ │ │
│ │ └─────┼────────┼────────┼────────┼──────────────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ┌─────┴────────┴────────┴────────┴─────────────────┐ │ │
│ │ │ Page Table / Memory Map │ │ │
│ │ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │ │
│ │ │ │ P0 │ P1 │ P2 │ P3 │ P4 │ P5 │ .. │ Pn │ │ │ │
│ │ │ │GPU0│GPU0│GPU1│GPU1│HOST│GPU0│ │ │ │ │ │
│ │ │ └────┴────┴────┴────┴────┴────┴────┴────┘ │ │ │
│ │ └──────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 特性: │ │
│ │ • 按需分配 (On-demand allocation) │ │
│ │ • 页面迁移 (Page migration) │ │
│ │ • 超分配 (Oversubscription) │ │
│ │ • 透明换出 (Transparent swap) │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ════════════════════════════╪════════════════════════════════ │
│ │ │
│ 物理资源层 │ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ GPU 0 (24GB) │ GPU 1 (24GB) │ Host (128GB) │ │
│ │ ┌──────────────┐ │ ┌──────────────┐ │ ┌────────────┐ │ │
│ │ │ VRAM │ │ │ VRAM │ │ │ RAM │ │ │
│ │ │ 实际: 18GB │ │ │ 实际: 20GB │ │ │ 溢出: 12GB│ │ │
│ │ └──────────────┘ │ └──────────────┘ │ └────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
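图中的"超分配 + 页面迁移"在 NVIDIA 平台上由 Unified Memory(托管内存)提供驱动层支持。下面是一个基于 CuPy 的最小示意(假设已安装 CuPy 且驱动支持托管内存):把默认分配器切换为 managed memory 后,数组总量可以超过单卡物理显存,超出部分由驱动按页在主机内存与显存之间迁移:
import cupy as cp

# 使用 CUDA managed memory 作为默认分配器,允许显存超分
cp.cuda.set_allocator(cp.cuda.MemoryPool(cp.cuda.malloc_managed).malloc)

# 数组大小可按需调大以超过物理显存,触发按需页面迁移(代价是带宽与延迟)
x = cp.zeros((4096, 4096, 32), dtype=cp.float32)   # 约 2 GiB
x += 1.0
print(float(x.sum()))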
显存管理实现
"""
GPU 显存虚拟化管理
"""
import torch
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from threading import Lock
import logging
from enum import Enum
class MemoryLocation(Enum):
"""内存位置"""
GPU = "gpu"
HOST = "host"
DISK = "disk"
@dataclass
class MemoryBlock:
"""内存块"""
id: str
size: int # bytes
location: MemoryLocation
gpu_id: Optional[int] = None
last_access: float = 0.0
access_count: int = 0
pinned: bool = False
def __post_init__(self):
import time
self.last_access = time.time()
@dataclass
class TensorHandle:
"""张量句柄"""
tensor_id: str
shape: Tuple[int, ...]
dtype: torch.dtype
blocks: List[str] = field(default_factory=list)
materialized: bool = False
class VirtualMemoryManager:
"""虚拟显存管理器"""
def __init__(
self,
gpu_ids: List[int],
enable_oversubscription: bool = True,
oversubscription_ratio: float = 1.5,
page_size: int = 2 * 1024 * 1024 # 2MB
):
self.gpu_ids = gpu_ids
self.enable_oversubscription = enable_oversubscription
self.oversubscription_ratio = oversubscription_ratio
self.page_size = page_size
self.blocks: Dict[str, MemoryBlock] = {}
self.tensors: Dict[str, TensorHandle] = {}
self.gpu_usage: Dict[int, int] = {gid: 0 for gid in gpu_ids}
self.host_usage: int = 0
self._lock = Lock()
self.logger = logging.getLogger(__name__)
# 初始化 GPU 信息
self.gpu_capacities = self._get_gpu_capacities()
self.virtual_capacities = {
gid: int(cap * oversubscription_ratio)
for gid, cap in self.gpu_capacities.items()
}
def _get_gpu_capacities(self) -> Dict[int, int]:
"""获取 GPU 显存容量"""
capacities = {}
for gid in self.gpu_ids:
props = torch.cuda.get_device_properties(gid)
capacities[gid] = props.total_memory
return capacities
def allocate(
self,
size: int,
preferred_gpu: Optional[int] = None,
pinned: bool = False
) -> str:
"""
分配虚拟内存
Args:
size: 请求大小
preferred_gpu: 首选 GPU
pinned: 是否固定在 GPU
"""
with self._lock:
# 对齐到页大小
aligned_size = ((size + self.page_size - 1) // self.page_size) * self.page_size
# 选择目标 GPU
target_gpu = self._select_gpu(aligned_size, preferred_gpu)
# 确定初始位置
if target_gpu is not None and self._can_fit_gpu(target_gpu, aligned_size):
location = MemoryLocation.GPU
gpu_id = target_gpu
else:
location = MemoryLocation.HOST
gpu_id = None
# 创建内存块
block_id = f"block_{len(self.blocks)}"
block = MemoryBlock(
id=block_id,
size=aligned_size,
location=location,
gpu_id=gpu_id,
pinned=pinned
)
self.blocks[block_id] = block
# 更新使用统计
if location == MemoryLocation.GPU:
self.gpu_usage[gpu_id] += aligned_size
else:
self.host_usage += aligned_size
self.logger.info(
f"Allocated {aligned_size} bytes at {location.value}"
f"{f' (GPU {gpu_id})' if gpu_id is not None else ''}"
)
return block_id
def _select_gpu(
self,
size: int,
preferred_gpu: Optional[int]
) -> Optional[int]:
"""选择目标 GPU"""
if preferred_gpu is not None:
if self._can_fit_gpu(preferred_gpu, size):
return preferred_gpu
# 选择使用率最低的 GPU
best_gpu = None
min_usage = float('inf')
for gid in self.gpu_ids:
if self._can_fit_gpu(gid, size):
usage = self.gpu_usage[gid] / self.gpu_capacities[gid]
if usage < min_usage:
min_usage = usage
best_gpu = gid
return best_gpu
def _can_fit_gpu(self, gpu_id: int, size: int) -> bool:
"""检查是否能放入 GPU"""
current = self.gpu_usage[gpu_id]
capacity = self.virtual_capacities[gpu_id] if self.enable_oversubscription \
else self.gpu_capacities[gpu_id]
return current + size <= capacity
def free(self, block_id: str):
"""释放内存块"""
with self._lock:
if block_id not in self.blocks:
return
block = self.blocks[block_id]
if block.location == MemoryLocation.GPU:
self.gpu_usage[block.gpu_id] -= block.size
else:
self.host_usage -= block.size
del self.blocks[block_id]
def migrate(
self,
block_id: str,
target_location: MemoryLocation,
target_gpu: Optional[int] = None
) -> bool:
"""迁移内存块"""
with self._lock:
if block_id not in self.blocks:
return False
block = self.blocks[block_id]
if block.pinned:
self.logger.warning(f"Cannot migrate pinned block {block_id}")
return False
# 检查目标容量
if target_location == MemoryLocation.GPU:
if not self._can_fit_gpu(target_gpu, block.size):
return False
# 更新使用统计
if block.location == MemoryLocation.GPU:
self.gpu_usage[block.gpu_id] -= block.size
else:
self.host_usage -= block.size
if target_location == MemoryLocation.GPU:
self.gpu_usage[target_gpu] += block.size
else:
self.host_usage += block.size
# 更新块信息
block.location = target_location
block.gpu_id = target_gpu
self.logger.info(
f"Migrated block {block_id} to {target_location.value}"
)
return True
def evict_to_host(self, gpu_id: int, required_size: int) -> int:
"""
将数据从 GPU 驱逐到主机
使用 LRU 策略选择要驱逐的块
"""
evicted_size = 0
# 收集可驱逐的块,按访问时间排序
evictable = [
block for block in self.blocks.values()
if block.gpu_id == gpu_id and not block.pinned
]
evictable.sort(key=lambda b: b.last_access)
for block in evictable:
if evicted_size >= required_size:
break
if self.migrate(block.id, MemoryLocation.HOST):
evicted_size += block.size
return evicted_size
def prefetch_to_gpu(
self,
block_ids: List[str],
gpu_id: int
) -> int:
"""预取数据到 GPU"""
prefetched = 0
for block_id in block_ids:
if block_id not in self.blocks:
continue
block = self.blocks[block_id]
if block.location != MemoryLocation.GPU or block.gpu_id != gpu_id:
# 可能需要先驱逐其他数据
if not self._can_fit_gpu(gpu_id, block.size):
self.evict_to_host(gpu_id, block.size)
if self.migrate(block_id, MemoryLocation.GPU, gpu_id):
prefetched += 1
return prefetched
def get_statistics(self) -> Dict:
"""获取统计信息"""
stats = {
"gpu_usage": {},
"host_usage": self.host_usage,
"total_blocks": len(self.blocks),
"blocks_by_location": {
MemoryLocation.GPU.value: 0,
MemoryLocation.HOST.value: 0
}
}
for gid in self.gpu_ids:
stats["gpu_usage"][gid] = {
"used": self.gpu_usage[gid],
"capacity": self.gpu_capacities[gid],
"virtual_capacity": self.virtual_capacities[gid],
"utilization": self.gpu_usage[gid] / self.gpu_capacities[gid]
}
for block in self.blocks.values():
stats["blocks_by_location"][block.location.value] += 1
return stats
class UnifiedMemoryTensor:
"""统一内存张量"""
def __init__(
self,
vmm: VirtualMemoryManager,
shape: Tuple[int, ...],
dtype: torch.dtype = torch.float32,
device: Optional[int] = None
):
self.vmm = vmm
self.shape = shape
self.dtype = dtype
self.preferred_device = device
# 计算大小
self.element_size = torch.tensor([], dtype=dtype).element_size()
self.total_size = int(np.prod(shape)) * self.element_size
# 分配虚拟内存
self.block_id = vmm.allocate(self.total_size, device)
# 实际数据(延迟初始化)
self._data: Optional[torch.Tensor] = None
self._location: Optional[MemoryLocation] = None
def materialize(self, device: Optional[int] = None) -> torch.Tensor:
"""
具现化张量
确保数据在指定设备上
"""
# 注意不能写成 `device or self.preferred_device`,否则 device=0 会被当作 False
target_device = device if device is not None else self.preferred_device
block = self.vmm.blocks[self.block_id]
# 如果数据不在目标位置,进行迁移
if target_device is not None:
if block.location != MemoryLocation.GPU or block.gpu_id != target_device:
self.vmm.migrate(
self.block_id,
MemoryLocation.GPU,
target_device
)
# 创建或获取张量
if self._data is None:
if target_device is not None:
self._data = torch.empty(
self.shape,
dtype=self.dtype,
device=f"cuda:{target_device}"
)
else:
self._data = torch.empty(self.shape, dtype=self.dtype)
self._location = block.location
elif self._location != block.location:
# 位置已变更,移动数据
if block.location == MemoryLocation.GPU:
self._data = self._data.to(f"cuda:{block.gpu_id}")
else:
self._data = self._data.cpu()
self._location = block.location
# 更新访问统计
import time
block.last_access = time.time()
block.access_count += 1
return self._data
def release(self):
"""释放张量"""
self._data = None
self.vmm.free(self.block_id)
def __del__(self):
if hasattr(self, 'vmm') and hasattr(self, 'block_id'):
self.release()
# 使用示例
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# 创建虚拟内存管理器
if torch.cuda.is_available():
vmm = VirtualMemoryManager(
gpu_ids=[0],
enable_oversubscription=True,
oversubscription_ratio=1.5
)
# 创建统一内存张量
tensor = UnifiedMemoryTensor(
vmm,
shape=(1024, 1024),
dtype=torch.float32,
device=0
)
# 具现化并使用
data = tensor.materialize()
data.fill_(1.0)
# 查看统计
print(vmm.get_statistics())
# 释放
tensor.release()
性能监控与优化
虚拟化性能监控
"""
GPU 虚拟化性能监控
"""
import time
import threading
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass
from collections import deque
import json
@dataclass
class VGPUMetrics:
"""vGPU 指标"""
timestamp: float
gpu_id: int
instance_id: str
# 利用率指标
compute_utilization: float # 0-100
memory_utilization: float # 0-100
encoder_utilization: float # 0-100
decoder_utilization: float # 0-100
# 内存指标
memory_used: int # bytes
memory_total: int # bytes
# 性能指标
sm_clock: int # MHz
memory_clock: int # MHz
temperature: int # Celsius
power_usage: int # Watts
# 虚拟化特有指标
context_switches: int
time_slice_violations: int
memory_migrations: int
class VGPUMonitor:
"""vGPU 监控器"""
def __init__(
self,
collection_interval: float = 1.0,
history_size: int = 3600
):
self.collection_interval = collection_interval
self.history_size = history_size
self.metrics_history: Dict[str, deque] = {}
self.alerts: List[Dict] = []
self.alert_callbacks: List[Callable] = []
self._running = False
self._thread: Optional[threading.Thread] = None
def start(self):
"""启动监控"""
self._running = True
self._thread = threading.Thread(target=self._collection_loop)
self._thread.daemon = True
self._thread.start()
def stop(self):
"""停止监控"""
self._running = False
if self._thread:
self._thread.join()
def _collection_loop(self):
"""采集循环"""
while self._running:
try:
metrics = self._collect_metrics()
self._process_metrics(metrics)
except Exception as e:
print(f"Metrics collection error: {e}")
time.sleep(self.collection_interval)
def _collect_metrics(self) -> List[VGPUMetrics]:
"""采集指标"""
metrics = []
# 使用 nvidia-smi 或 NVML 采集
# 这里是示例实现
try:
import pynvml
pynvml.nvmlInit()
device_count = pynvml.nvmlDeviceGetCount()
for i in range(device_count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
# 获取利用率
util = pynvml.nvmlDeviceGetUtilizationRates(handle)
# 获取内存
mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
# 获取时钟
sm_clock = pynvml.nvmlDeviceGetClockInfo(
handle, pynvml.NVML_CLOCK_SM
)
mem_clock = pynvml.nvmlDeviceGetClockInfo(
handle, pynvml.NVML_CLOCK_MEM
)
# 获取温度
temp = pynvml.nvmlDeviceGetTemperature(
handle, pynvml.NVML_TEMPERATURE_GPU
)
# 获取功耗
power = pynvml.nvmlDeviceGetPowerUsage(handle) // 1000
# 尝试获取 MIG 实例信息
try:
mig_mode = pynvml.nvmlDeviceGetMigMode(handle)
if mig_mode[0] == pynvml.NVML_DEVICE_MIG_ENABLE:
# 遍历 MIG 实例
mig_count = pynvml.nvmlDeviceGetMaxMigDeviceCount(handle)
for j in range(mig_count):
try:
mig_handle = pynvml.nvmlDeviceGetMigDeviceHandleByIndex(
handle, j
)
# 采集 MIG 实例指标
mig_mem = pynvml.nvmlDeviceGetMemoryInfo(mig_handle)
metrics.append(VGPUMetrics(
timestamp=time.time(),
gpu_id=i,
instance_id=f"mig-{j}",
compute_utilization=0, # MIG 不支持细粒度利用率
memory_utilization=mig_mem.used / mig_mem.total * 100,
encoder_utilization=0,
decoder_utilization=0,
memory_used=mig_mem.used,
memory_total=mig_mem.total,
sm_clock=sm_clock,
memory_clock=mem_clock,
temperature=temp,
power_usage=power,
context_switches=0,
time_slice_violations=0,
memory_migrations=0
))
except pynvml.NVMLError:
    # 该 MIG 位置可能尚未创建实例,跳过
    pass
except pynvml.NVMLError:
    # 查询 MIG 信息失败(如驱动过旧),只采集整卡指标
    pass
# 整卡指标
metrics.append(VGPUMetrics(
timestamp=time.time(),
gpu_id=i,
instance_id="full",
compute_utilization=util.gpu,
memory_utilization=mem.used / mem.total * 100,
encoder_utilization=0,
decoder_utilization=0,
memory_used=mem.used,
memory_total=mem.total,
sm_clock=sm_clock,
memory_clock=mem_clock,
temperature=temp,
power_usage=power,
context_switches=0,
time_slice_violations=0,
memory_migrations=0
))
pynvml.nvmlShutdown()
except ImportError:
pass
return metrics
def _process_metrics(self, metrics: List[VGPUMetrics]):
"""处理指标"""
for m in metrics:
key = f"{m.gpu_id}:{m.instance_id}"
if key not in self.metrics_history:
self.metrics_history[key] = deque(maxlen=self.history_size)
self.metrics_history[key].append(m)
# 检查告警
self._check_alerts(m)
def _check_alerts(self, metrics: VGPUMetrics):
"""检查告警条件"""
alerts = []
# 高温告警
if metrics.temperature > 85:
alerts.append({
"type": "high_temperature",
"severity": "critical",
"gpu_id": metrics.gpu_id,
"instance_id": metrics.instance_id,
"value": metrics.temperature,
"threshold": 85,
"message": f"GPU {metrics.gpu_id} temperature critical: {metrics.temperature}°C"
})
elif metrics.temperature > 80:
alerts.append({
"type": "high_temperature",
"severity": "warning",
"gpu_id": metrics.gpu_id,
"instance_id": metrics.instance_id,
"value": metrics.temperature,
"threshold": 80,
"message": f"GPU {metrics.gpu_id} temperature warning: {metrics.temperature}°C"
})
# 显存告警
memory_util = metrics.memory_used / metrics.memory_total * 100
if memory_util > 95:
alerts.append({
"type": "high_memory",
"severity": "critical",
"gpu_id": metrics.gpu_id,
"instance_id": metrics.instance_id,
"value": memory_util,
"threshold": 95,
"message": f"GPU {metrics.gpu_id} memory critical: {memory_util:.1f}%"
})
# 触发告警回调
for alert in alerts:
alert["timestamp"] = time.time()
self.alerts.append(alert)
for callback in self.alert_callbacks:
try:
callback(alert)
except Exception as e:
print(f"Alert callback error: {e}")
def add_alert_callback(self, callback: Callable):
"""添加告警回调"""
self.alert_callbacks.append(callback)
def get_current_metrics(self, gpu_id: int = None) -> List[VGPUMetrics]:
"""获取当前指标"""
result = []
for key, history in self.metrics_history.items():
if history:
m = history[-1]
if gpu_id is None or m.gpu_id == gpu_id:
result.append(m)
return result
def get_metrics_summary(self, gpu_id: int, window_seconds: int = 60) -> Dict:
"""获取指标摘要"""
cutoff = time.time() - window_seconds
summary = {
"avg_compute_utilization": 0,
"avg_memory_utilization": 0,
"max_temperature": 0,
"avg_power_usage": 0,
"sample_count": 0
}
# 汇总该 GPU 在时间窗口内所有实例的样本,避免不同实例的结果互相覆盖
samples = []
for history in self.metrics_history.values():
    samples.extend(
        m for m in history
        if m.gpu_id == gpu_id and m.timestamp > cutoff
    )
if samples:
    summary["avg_compute_utilization"] = sum(
        m.compute_utilization for m in samples
    ) / len(samples)
    summary["avg_memory_utilization"] = sum(
        m.memory_utilization for m in samples
    ) / len(samples)
    summary["max_temperature"] = max(m.temperature for m in samples)
    summary["avg_power_usage"] = sum(
        m.power_usage for m in samples
    ) / len(samples)
    summary["sample_count"] = len(samples)
return summary
def export_metrics(self, format: str = "json") -> str:
"""导出指标"""
data = {
"timestamp": time.time(),
"metrics": {}
}
for key, history in self.metrics_history.items():
if history:
latest = history[-1]
data["metrics"][key] = {
"compute_utilization": latest.compute_utilization,
"memory_utilization": latest.memory_utilization,
"memory_used": latest.memory_used,
"memory_total": latest.memory_total,
"temperature": latest.temperature,
"power_usage": latest.power_usage
}
if format == "json":
return json.dumps(data, indent=2)
else:
return str(data)
# Prometheus 指标导出
class PrometheusVGPUExporter:
"""Prometheus vGPU 指标导出器"""
def __init__(self, monitor: VGPUMonitor, port: int = 9400):
self.monitor = monitor
self.port = port
def generate_metrics(self) -> str:
"""生成 Prometheus 格式指标"""
lines = []
lines.append("# HELP vgpu_compute_utilization GPU compute utilization")
lines.append("# TYPE vgpu_compute_utilization gauge")
lines.append("# HELP vgpu_memory_utilization GPU memory utilization")
lines.append("# TYPE vgpu_memory_utilization gauge")
lines.append("# HELP vgpu_memory_used GPU memory used in bytes")
lines.append("# TYPE vgpu_memory_used gauge")
lines.append("# HELP vgpu_temperature GPU temperature in Celsius")
lines.append("# TYPE vgpu_temperature gauge")
lines.append("# HELP vgpu_power_usage GPU power usage in Watts")
lines.append("# TYPE vgpu_power_usage gauge")
for metrics in self.monitor.get_current_metrics():
labels = f'gpu_id="{metrics.gpu_id}",instance_id="{metrics.instance_id}"'
lines.append(
f'vgpu_compute_utilization{{{labels}}} {metrics.compute_utilization}'
)
lines.append(
f'vgpu_memory_utilization{{{labels}}} {metrics.memory_utilization}'
)
lines.append(
f'vgpu_memory_used{{{labels}}} {metrics.memory_used}'
)
lines.append(
f'vgpu_temperature{{{labels}}} {metrics.temperature}'
)
lines.append(
f'vgpu_power_usage{{{labels}}} {metrics.power_usage}'
)
return '\n'.join(lines)
# 使用示例
if __name__ == "__main__":
# 创建监控器
monitor = VGPUMonitor(collection_interval=1.0)
# 添加告警回调
def alert_handler(alert):
print(f"Alert: {alert['message']}")
monitor.add_alert_callback(alert_handler)
# 启动监控
monitor.start()
# 运行一段时间
time.sleep(10)
# 获取指标摘要
summary = monitor.get_metrics_summary(gpu_id=0, window_seconds=60)
print(f"Summary: {summary}")
# 导出指标
print(monitor.export_metrics())
# 停止监控
monitor.stop()
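PrometheusVGPUExporter 只负责生成指标文本,要被 Prometheus 抓取还需要暴露一个 /metrics HTTP 端点。下面是仅用标准库的最小示意(与前文类定义同文件运行;生产环境更常见的做法是直接部署 DCGM Exporter):
from http.server import BaseHTTPRequestHandler, HTTPServer

def serve_metrics(exporter: PrometheusVGPUExporter, host: str = "0.0.0.0"):
    """用标准库起一个简单的 /metrics 端点,供 Prometheus 抓取"""

    class MetricsHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/metrics":
                self.send_response(404)
                self.end_headers()
                return
            body = exporter.generate_metrics().encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.end_headers()
            self.wfile.write(body)

    HTTPServer((host, exporter.port), MetricsHandler).serve_forever()

# serve_metrics(PrometheusVGPUExporter(monitor, port=9400))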
最佳实践
虚拟化方案选择
# GPU 虚拟化方案选择指南
virtualization_selection:
# 场景 1: 推理服务
inference:
recommended: "MIG + Time Slicing"
reasoning:
- 推理任务通常显存需求小
- 需要高并发处理
- 延迟敏感
configuration:
mig_profile: "1g.10gb" # 小实例
time_slicing_replicas: 2
# 场景 2: 训练任务
training:
recommended: "MIG (大实例) 或 Full GPU"
reasoning:
- 训练需要大量显存
- 需要稳定的计算资源
- 长时间运行
configuration:
mig_profile: "3g.40gb" # 大实例
# 场景 3: 开发测试
development:
recommended: "Time Slicing + HAMi"
reasoning:
- 资源需求变化大
- 需要灵活分配
- 成本敏感
configuration:
time_slicing_replicas: 4
hami_gpu_memory: "4Gi"
hami_gpu_cores: 25
# 场景 4: 混合工作负载
mixed:
recommended: "MIG 混合配置"
reasoning:
- 不同任务需求不同
- 需要资源隔离
- 优化整体利用率
configuration:
mig_profiles:
- "1g.10gb": 4 # 推理
- "3g.40gb": 1 # 训练
---
# 虚拟化配置检查清单
checklist:
pre_deployment:
- [ ] 确认 GPU 支持 MIG(A100/A30/H100)
- [ ] 规划 MIG 分区策略
- [ ] 评估时间片开销
- [ ] 配置设备插件
monitoring:
- [ ] 部署 GPU 监控
- [ ] 配置告警规则
- [ ] 设置性能基线
optimization:
- [ ] 定期评估利用率
- [ ] 调整分区配置
- [ ] 优化调度策略
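上面的选择指南可以浓缩成一个简单的映射函数,便于在自动化脚本里做初步决策(映射内容与上文一致,仅为示意,实际选择仍需结合硬件型号与隔离要求):
def recommend_virtualization(workload: str) -> dict:
    """按工作负载类型返回一个示意性的虚拟化方案建议"""
    guide = {
        "inference":   {"scheme": "MIG + Time Slicing", "mig_profile": "1g.10gb", "replicas": 2},
        "training":    {"scheme": "MIG 大实例 / Full GPU", "mig_profile": "3g.40gb"},
        "development": {"scheme": "Time Slicing + HAMi", "replicas": 4, "gpu_cores": 25},
        "mixed":       {"scheme": "MIG 混合配置", "mig_profiles": {"1g.10gb": 4, "3g.40gb": 1}},
    }
    return guide.get(workload, guide["mixed"])

print(recommend_virtualization("inference"))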
Kubernetes 集成配置
# 完整的 GPU 虚拟化 Kubernetes 配置
---
# GPU Operator 配置
apiVersion: nvidia.com/v1
kind: ClusterPolicy
metadata:
name: cluster-policy
spec:
operator:
defaultRuntime: containerd
driver:
enabled: true
version: "535.104.05"
toolkit:
enabled: true
devicePlugin:
enabled: true
config:
name: nvidia-device-plugin-config
default: "any"
mig:
strategy: mixed
migManager:
enabled: true
config:
name: mig-parted-config
default: "all-balanced"
---
# 设备插件配置
apiVersion: v1
kind: ConfigMap
metadata:
name: nvidia-device-plugin-config
namespace: gpu-operator
data:
any: |
version: v1
flags:
migStrategy: mixed
sharing:
timeSlicing:
renameByDefault: false
failRequestsGreaterThanOne: false
resources:
- name: nvidia.com/gpu
replicas: 2
inference: |
version: v1
flags:
migStrategy: single
sharing:
timeSlicing:
resources:
- name: nvidia.com/gpu
replicas: 4
---
# MIG 分区配置
apiVersion: v1
kind: ConfigMap
metadata:
name: mig-parted-config
namespace: gpu-operator
data:
config.yaml: |
version: v1
mig-configs:
all-disabled:
- devices: all
mig-enabled: false
all-balanced:
- devices: all
mig-enabled: true
mig-devices:
"1g.10gb": 4
"2g.20gb": 1
all-inference:
- devices: all
mig-enabled: true
mig-devices:
"1g.10gb": 7
all-training:
- devices: all
mig-enabled: true
mig-devices:
"3g.40gb": 2
---
# 节点标签策略
apiVersion: v1
kind: ConfigMap
metadata:
name: gpu-node-labels
data:
labeling-rules.yaml: |
# 根据 GPU 类型打标签
- match:
gpu-model: "A100-SXM4-80GB"
labels:
gpu.nvidia.com/class: high-end
gpu.nvidia.com/mig-capable: "true"
- match:
gpu-model: "A100-SXM4-40GB"
labels:
gpu.nvidia.com/class: mid-range
gpu.nvidia.com/mig-capable: "true"
- match:
gpu-model: "V100"
labels:
gpu.nvidia.com/class: legacy
gpu.nvidia.com/mig-capable: "false"
---
# ResourceQuota 限制
apiVersion: v1
kind: ResourceQuota
metadata:
name: gpu-quota
namespace: ml-workloads
spec:
hard:
# 全 GPU
nvidia.com/gpu: "8"
# MIG 实例
nvidia.com/mig-1g.10gb: "28"
nvidia.com/mig-2g.20gb: "6"
nvidia.com/mig-3g.40gb: "4"
# HAMi 资源
nvidia.com/gpumem: "320Gi"
nvidia.com/gpucores: "800"
总结
GPU 虚拟化技术是提高 AI 基础设施资源利用率的关键:
- MIG 技术:硬件级分区,提供真正的资源隔离
- 时间片共享:软件级共享,适合轻量级工作负载
- vGPU 方案:灵活的显存和算力分配
- 显存虚拟化:支持超分配和按需迁移
选择合适的虚拟化方案需要考虑:
- 工作负载特性(训练/推理)
- 隔离需求(硬隔离/软隔离)
- 资源利用率目标
- 运维复杂度