Compute Pooling and Elastic Scheduling

Overview

Compute pooling brings scattered GPU/NPU resources under unified management so that they can be allocated elastically and used efficiently. Pooling dissolves physical boundaries and lets compute be consumed on demand, much like a cloud service. This article walks through the architecture of a compute pool, implementation approaches, and elastic scheduling strategies.

Compute Pooling Architecture

Overall Architecture Design

┌──────────────────────────────────────────────────────────────────┐
│              Compute Pooling Platform Architecture               │
├──────────────────────────────────────────────────────────────────┤
│  Access Layer                                                    │
│    Web UI │ CLI │ API │ SDK / Client                             │
├──────────────────────────────────────────────────────────────────┤
│  Resource Management Layer                                       │
│    Resource Manager: discovery │ abstraction │ capacity planning │
│    Scheduler: queue management │ priority │ preemption           │
│               gang scheduling │ elastic scaling │ fair share     │
│    Quota Manager: tenant quotas │ metering │ usage alerts        │
├──────────────────────────────────────────────────────────────────┤
│  Virtualization Layer                                            │
│    GPU virtualization (MIG/vGPU) │ memory pooling                │
│    remote GPU (RDMA/RPC)                                         │
│    Unified device layer: NVIDIA GPU │ AMD GPU │ Ascend NPU │     │
│                          Gaudi │ TPU                             │
├──────────────────────────────────────────────────────────────────┤
│  Physical Resource Layer                                         │
│    Cluster A (training):  Node1..NodeN  (H100 × 8 per node)      │
│    Cluster B (inference): Node1..NodeN  (A10 × 4 per node)       │
└──────────────────────────────────────────────────────────────────┘

Resource Abstraction Model

"""
算力池化资源抽象模型
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import uuid


class ResourceType(Enum):
    """资源类型"""
    GPU = "gpu"
    NPU = "npu"
    CPU = "cpu"
    MEMORY = "memory"
    STORAGE = "storage"
    NETWORK = "network"


class ResourceState(Enum):
    """资源状态"""
    AVAILABLE = "available"
    ALLOCATED = "allocated"
    RESERVED = "reserved"
    MAINTENANCE = "maintenance"
    FAILED = "failed"


@dataclass
class ResourceSpec:
    """资源规格"""
    type: ResourceType
    model: str  # e.g., "A100-80GB", "910B"
    quantity: float  # 支持小数(虚拟化场景)
    attributes: Dict[str, str] = field(default_factory=dict)


@dataclass
class PhysicalResource:
    """物理资源"""
    id: str
    type: ResourceType
    model: str
    node_name: str
    state: ResourceState
    capacity: float  # total capacity
    allocated: float  # currently allocated
    attributes: Dict[str, str] = field(default_factory=dict)

    @property
    def available(self) -> float:
        return self.capacity - self.allocated

    def can_allocate(self, amount: float) -> bool:
        return self.available >= amount


@dataclass
class VirtualResource:
    """虚拟资源"""
    id: str
    physical_id: str  # 对应的物理资源
    type: ResourceType
    model: str
    quantity: float
    state: ResourceState
    owner: Optional[str] = None  # tenant ID
    created_at: datetime = field(default_factory=datetime.now)


@dataclass
class ResourcePool:
    """资源池"""
    id: str
    name: str
    type: ResourceType
    physical_resources: List[str]  # 物理资源 ID 列表
    total_capacity: float
    allocated_capacity: float
    reserved_capacity: float
    labels: Dict[str, str] = field(default_factory=dict)
    quotas: Dict[str, float] = field(default_factory=dict)  # 租户配额

    @property
    def available_capacity(self) -> float:
        return self.total_capacity - self.allocated_capacity - self.reserved_capacity


@dataclass
class ResourceRequest:
    """资源请求"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    tenant_id: str = ""
    resources: List[ResourceSpec] = field(default_factory=list)
    priority: int = 0
    preemptible: bool = True
    constraints: Dict[str, str] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)


@dataclass
class ResourceAllocation:
    """资源分配"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    request_id: str = ""
    tenant_id: str = ""
    resources: List[VirtualResource] = field(default_factory=list)
    state: str = "pending"
    created_at: datetime = field(default_factory=datetime.now)
    expires_at: Optional[datetime] = None


class ResourcePoolManager:
    """资源池管理器"""

    def __init__(self):
        self.physical_resources: Dict[str, PhysicalResource] = {}
        self.virtual_resources: Dict[str, VirtualResource] = {}
        self.pools: Dict[str, ResourcePool] = {}
        self.allocations: Dict[str, ResourceAllocation] = {}

    def register_physical_resource(self, resource: PhysicalResource):
        """注册物理资源"""
        self.physical_resources[resource.id] = resource

    def create_pool(
        self,
        name: str,
        resource_type: ResourceType,
        resource_ids: List[str],
        labels: Dict[str, str] = None
    ) -> ResourcePool:
        """创建资源池"""
        total_capacity = sum(
            self.physical_resources[rid].capacity
            for rid in resource_ids
            if rid in self.physical_resources
        )

        pool = ResourcePool(
            id=str(uuid.uuid4()),
            name=name,
            type=resource_type,
            physical_resources=resource_ids,
            total_capacity=total_capacity,
            allocated_capacity=0,
            reserved_capacity=0,
            labels=labels or {}
        )

        self.pools[pool.id] = pool
        return pool

    def set_quota(self, pool_id: str, tenant_id: str, quota: float):
        """设置租户配额"""
        if pool_id in self.pools:
            self.pools[pool_id].quotas[tenant_id] = quota

    def get_tenant_usage(self, pool_id: str, tenant_id: str) -> float:
        """Return a tenant's current usage within a pool."""
        pool = self.pools.get(pool_id)
        if not pool:
            return 0.0

        usage = 0.0
        for alloc in self.allocations.values():
            if alloc.tenant_id != tenant_id:
                continue
            for vr in alloc.resources:
                if vr.physical_id in pool.physical_resources:
                    usage += vr.quantity
        return usage

    def allocate(
        self,
        request: ResourceRequest,
        pool_id: Optional[str] = None
    ) -> Optional[ResourceAllocation]:
        """分配资源"""
        # 检查配额
        if pool_id:
            pool = self.pools.get(pool_id)
            if pool:
                quota = pool.quotas.get(request.tenant_id, float('inf'))
                current_usage = self.get_tenant_usage(pool_id, request.tenant_id)
                requested = sum(r.quantity for r in request.resources)
                if current_usage + requested > quota:
                    return None

        # Perform the allocation
        allocation = ResourceAllocation(
            request_id=request.id,
            tenant_id=request.tenant_id,
            state="active"
        )

        for spec in request.resources:
            # Find an available physical resource (first fit)
            for pr_id, pr in self.physical_resources.items():
                if pr.type == spec.type and pr.model == spec.model:
                    if pr.can_allocate(spec.quantity):
                        # Create a virtual resource backed by it
                        vr = VirtualResource(
                            id=str(uuid.uuid4()),
                            physical_id=pr_id,
                            type=spec.type,
                            model=spec.model,
                            quantity=spec.quantity,
                            state=ResourceState.ALLOCATED,
                            owner=request.tenant_id
                        )
                        self.virtual_resources[vr.id] = vr
                        allocation.resources.append(vr)

                        # Update the physical resource's usage
                        pr.allocated += spec.quantity

                        # Update the pool's usage
                        if pool_id and pool_id in self.pools:
                            self.pools[pool_id].allocated_capacity += spec.quantity

                        break

        if len(allocation.resources) == len(request.resources):
            self.allocations[allocation.id] = allocation
            return allocation
        else:
            # Partial allocation: roll back
            self._rollback_allocation(allocation)
            return None

    def release(self, allocation_id: str):
        """释放资源"""
        allocation = self.allocations.get(allocation_id)
        if not allocation:
            return

        for vr in allocation.resources:
            # Return capacity to the backing physical resource
            pr = self.physical_resources.get(vr.physical_id)
            if pr:
                pr.allocated -= vr.quantity

            # Remove the virtual resource
            if vr.id in self.virtual_resources:
                del self.virtual_resources[vr.id]

            # Update every pool containing this physical resource
            for pool in self.pools.values():
                if vr.physical_id in pool.physical_resources:
                    pool.allocated_capacity -= vr.quantity

        allocation.state = "released"

    def _rollback_allocation(self, allocation: ResourceAllocation):
        """回滚分配"""
        for vr in allocation.resources:
            pr = self.physical_resources.get(vr.physical_id)
            if pr:
                pr.allocated -= vr.quantity
            if vr.id in self.virtual_resources:
                del self.virtual_resources[vr.id]

    def get_pool_stats(self, pool_id: str) -> Dict:
        """获取池统计信息"""
        pool = self.pools.get(pool_id)
        if not pool:
            return {}

        return {
            "id": pool.id,
            "name": pool.name,
            "type": pool.type.value,
            "total_capacity": pool.total_capacity,
            "allocated_capacity": pool.allocated_capacity,
            "reserved_capacity": pool.reserved_capacity,
            "available_capacity": pool.available_capacity,
            "utilization": pool.allocated_capacity / pool.total_capacity if pool.total_capacity > 0 else 0,
            "resource_count": len(pool.physical_resources),
            "tenant_quotas": pool.quotas
        }


# Usage example
if __name__ == "__main__":
    manager = ResourcePoolManager()

    # Register physical resources
    for i in range(8):
        manager.register_physical_resource(PhysicalResource(
            id=f"gpu-{i}",
            type=ResourceType.GPU,
            model="A100-80GB",
            node_name="node-0",
            state=ResourceState.AVAILABLE,
            capacity=1.0,
            allocated=0
        ))

    # Create a resource pool
    pool = manager.create_pool(
        name="training-pool",
        resource_type=ResourceType.GPU,
        resource_ids=[f"gpu-{i}" for i in range(8)],
        labels={"purpose": "training"}
    )

    # Set tenant quotas
    manager.set_quota(pool.id, "tenant-a", 4.0)
    manager.set_quota(pool.id, "tenant-b", 4.0)

    # Allocate resources
    request = ResourceRequest(
        tenant_id="tenant-a",
        resources=[
            ResourceSpec(type=ResourceType.GPU, model="A100-80GB", quantity=2)
        ]
    )

    allocation = manager.allocate(request, pool.id)
    if allocation:
        print(f"Allocated: {len(allocation.resources)} resources")
        for vr in allocation.resources:
            print(f"  - {vr.type.value} {vr.model}: {vr.quantity}")

    # Inspect pool state
    stats = manager.get_pool_stats(pool.id)
    print(f"\nPool Stats: {stats}")

Elastic Scheduling System

Queue Management and Scheduling

"""
弹性调度系统实现
"""
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import heapq
import threading
import time
import logging


class JobState(Enum):
    """作业状态"""
    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PREEMPTED = "preempted"
    SUSPENDED = "suspended"


class QueuePolicy(Enum):
    """队列策略"""
    FIFO = "fifo"
    PRIORITY = "priority"
    FAIR_SHARE = "fair_share"
    CAPACITY = "capacity"


@dataclass
class Job:
    """作业"""
    id: str
    name: str
    tenant_id: str
    queue_name: str
    priority: int = 0
    gpu_request: int = 1
    gpu_limit: int = 8
    preemptible: bool = True
    elastic: bool = False  # whether the job can scale elastically
    min_replicas: int = 1
    max_replicas: int = 1
    state: JobState = JobState.PENDING
    current_replicas: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    def __lt__(self, other):
        # Ordering for the priority queue (heapq is a min-heap)
        if self.priority != other.priority:
            return self.priority > other.priority  # higher priority first
        return self.created_at < other.created_at  # FIFO within equal priority


@dataclass
class Queue:
    """调度队列"""
    name: str
    policy: QueuePolicy
    priority: int = 0
    max_capacity: int = 0  # 最大容量 (GPU 数)
    guaranteed_capacity: int = 0  # 保证容量
    current_usage: int = 0
    weight: float = 1.0  # 公平共享权重
    jobs: List[Job] = field(default_factory=list)
    tenants: Set[str] = field(default_factory=set)

    def add_job(self, job: Job):
        """添加作业"""
        heapq.heappush(self.jobs, job)
        job.state = JobState.QUEUED

    def pop_job(self) -> Optional[Job]:
        """取出作业"""
        if self.jobs:
            return heapq.heappop(self.jobs)
        return None

    def peek_job(self) -> Optional[Job]:
        """查看队首作业"""
        if self.jobs:
            return self.jobs[0]
        return None


class ElasticScheduler:
    """弹性调度器"""

    def __init__(
        self,
        total_capacity: int,
        preemption_enabled: bool = True
    ):
        self.total_capacity = total_capacity
        self.preemption_enabled = preemption_enabled

        self.queues: Dict[str, Queue] = {}
        self.running_jobs: Dict[str, Job] = {}
        self.allocations: Dict[str, int] = {}  # job_id -> gpu_count

        self._lock = threading.Lock()
        self._running = False
        self._scheduler_thread: Optional[threading.Thread] = None

        self.logger = logging.getLogger(__name__)

        # Callbacks
        self.on_job_start: Optional[Callable] = None
        self.on_job_complete: Optional[Callable] = None
        self.on_job_preempt: Optional[Callable] = None

    def create_queue(
        self,
        name: str,
        policy: QueuePolicy = QueuePolicy.PRIORITY,
        max_capacity: int = 0,
        guaranteed_capacity: int = 0,
        weight: float = 1.0,
        priority: int = 0
    ) -> Queue:
        """创建队列"""
        queue = Queue(
            name=name,
            policy=policy,
            max_capacity=max_capacity or self.total_capacity,
            guaranteed_capacity=guaranteed_capacity,
            weight=weight,
            priority=priority
        )
        self.queues[name] = queue
        return queue

    def submit_job(self, job: Job) -> bool:
        """提交作业"""
        with self._lock:
            queue = self.queues.get(job.queue_name)
            if not queue:
                self.logger.error(f"Queue {job.queue_name} not found")
                return False

            queue.add_job(job)
            self.logger.info(f"Job {job.id} submitted to queue {job.queue_name}")
            return True

    def start(self):
        """启动调度器"""
        self._running = True
        self._scheduler_thread = threading.Thread(target=self._scheduling_loop)
        self._scheduler_thread.daemon = True
        self._scheduler_thread.start()

    def stop(self):
        """停止调度器"""
        self._running = False
        if self._scheduler_thread:
            self._scheduler_thread.join()

    def _scheduling_loop(self):
        """调度循环"""
        while self._running:
            try:
                self._schedule_once()
            except Exception as e:
                self.logger.error(f"Scheduling error: {e}")
            time.sleep(1)

    def _schedule_once(self):
        """执行一次调度"""
        with self._lock:
            # 计算可用容量
            used_capacity = sum(self.allocations.values())
            available_capacity = self.total_capacity - used_capacity

            if available_capacity <= 0:
                # 尝试抢占
                if self.preemption_enabled:
                    self._try_preemption()
                return

            # 按队列优先级和策略调度
            sorted_queues = sorted(
                self.queues.values(),
                key=lambda q: (-q.priority, -q.weight)
            )

            for queue in sorted_queues:
                job = queue.peek_job()
                if not job:
                    continue

                # Enforce the queue's capacity limit
                if queue.current_usage >= queue.max_capacity:
                    continue

                # Size the allocation for this job
                gpu_to_allocate = min(
                    job.gpu_request,
                    available_capacity,
                    queue.max_capacity - queue.current_usage
                )

                # Elastic jobs may start below their full request;
                # rigid jobs need the full GPU request
                required = job.min_replicas if job.elastic else job.gpu_request
                if gpu_to_allocate >= required:
                    queue.pop_job()
                    self._start_job(job, gpu_to_allocate)
                    queue.current_usage += gpu_to_allocate
                    available_capacity -= gpu_to_allocate

            # Elastic scale-up pass
            self._elastic_scaling(available_capacity)

    def _start_job(self, job: Job, gpu_count: int):
        """启动作业"""
        job.state = JobState.RUNNING
        job.started_at = datetime.now()
        job.current_replicas = gpu_count

        self.running_jobs[job.id] = job
        self.allocations[job.id] = gpu_count

        self.logger.info(f"Started job {job.id} with {gpu_count} GPUs")

        if self.on_job_start:
            self.on_job_start(job, gpu_count)

    def _try_preemption(self):
        """尝试抢占"""
        # 找到高优先级等待作业
        waiting_jobs = []
        for queue in self.queues.values():
            for job in queue.jobs:
                if job.state == JobState.QUEUED:
                    waiting_jobs.append((queue.priority, job))

        if not waiting_jobs:
            return

        # Sort by queue priority, highest first
        waiting_jobs.sort(key=lambda x: -x[0])

        for queue_priority, waiting_job in waiting_jobs:
            # Candidate victims: preemptible, lower-priority running jobs
            preemptible_jobs = [
                (job_id, job)
                for job_id, job in self.running_jobs.items()
                if job.preemptible and job.priority < waiting_job.priority
            ]

            if not preemptible_jobs:
                continue

            # Preempt the lowest-priority victims first
            preemptible_jobs.sort(key=lambda x: x[1].priority)

            # Release resources until the request can be satisfied
            released = 0
            for job_id, job in preemptible_jobs:
                if released >= waiting_job.gpu_request:
                    break

                self._preempt_job(job)
                released += self.allocations.get(job_id, 0)

    def _preempt_job(self, job: Job):
        """抢占作业"""
        job.state = JobState.PREEMPTED

        # Release its GPUs
        gpu_count = self.allocations.pop(job.id, 0)

        # Update queue usage
        queue = self.queues.get(job.queue_name)
        if queue:
            queue.current_usage -= gpu_count

        # Remove from the running set
        self.running_jobs.pop(job.id, None)

        self.logger.info(f"Preempted job {job.id}")

        if self.on_job_preempt:
            self.on_job_preempt(job)

        # Re-queue the job for later rescheduling
        if queue:
            job.state = JobState.PENDING
            queue.add_job(job)

    def _elastic_scaling(self, available_capacity: int):
        """弹性伸缩"""
        if available_capacity <= 0:
            return

        # Elastic jobs that can still grow
        elastic_jobs = [
            job for job in self.running_jobs.values()
            if job.elastic and job.current_replicas < job.max_replicas
        ]

        if not elastic_jobs:
            return

        # Grant spare capacity by priority
        elastic_jobs.sort(key=lambda j: -j.priority)

        for job in elastic_jobs:
            if available_capacity <= 0:
                break

            can_add = min(
                job.max_replicas - job.current_replicas,
                available_capacity
            )

            if can_add > 0:
                job.current_replicas += can_add
                self.allocations[job.id] += can_add
                available_capacity -= can_add

                self.logger.info(
                    f"Scaled up job {job.id} to {job.current_replicas} replicas"
                )

    def complete_job(self, job_id: str):
        """完成作业"""
        with self._lock:
            job = self.running_jobs.pop(job_id, None)
            if not job:
                return

            job.state = JobState.COMPLETED
            job.completed_at = datetime.now()

            # Release resources
            gpu_count = self.allocations.pop(job_id, 0)

            # Update queue usage
            queue = self.queues.get(job.queue_name)
            if queue:
                queue.current_usage -= gpu_count

            self.logger.info(f"Completed job {job_id}")

            if self.on_job_complete:
                self.on_job_complete(job)

    def get_scheduler_stats(self) -> Dict:
        """获取调度器统计"""
        with self._lock:
            queued_jobs = sum(len(q.jobs) for q in self.queues.values())
            running_jobs = len(self.running_jobs)
            total_allocated = sum(self.allocations.values())

            return {
                "total_capacity": self.total_capacity,
                "allocated_capacity": total_allocated,
                "available_capacity": self.total_capacity - total_allocated,
                "utilization": total_allocated / self.total_capacity if self.total_capacity > 0 else 0,
                "queued_jobs": queued_jobs,
                "running_jobs": running_jobs,
                "queue_stats": {
                    name: {
                        "pending_jobs": len(q.jobs),
                        "current_usage": q.current_usage,
                        "max_capacity": q.max_capacity,
                        "guaranteed_capacity": q.guaranteed_capacity
                    }
                    for name, q in self.queues.items()
                }
            }


# Fair-share scheduler
class FairShareScheduler(ElasticScheduler):
    """Fair-share scheduler: allocates capacity in proportion to tenant shares."""

    def __init__(self, total_capacity: int):
        super().__init__(total_capacity)
        self.tenant_usage: Dict[str, int] = {}
        self.tenant_share: Dict[str, float] = {}

    def set_tenant_share(self, tenant_id: str, share: float):
        """设置租户份额"""
        self.tenant_share[tenant_id] = share

    def _schedule_once(self):
        """公平共享调度"""
        with self._lock:
            # 计算每个租户的目标份额
            total_share = sum(self.tenant_share.values())
            if total_share == 0:
                return

            tenant_target = {
                tid: int(self.total_capacity * share / total_share)
                for tid, share in self.tenant_share.items()
            }

            # Current usage per tenant
            self.tenant_usage = {}
            for job in self.running_jobs.values():
                self.tenant_usage[job.tenant_id] = \
                    self.tenant_usage.get(job.tenant_id, 0) + \
                    self.allocations.get(job.id, 0)

            # Sort tenants by utilization relative to target (most underserved first)
            tenants_by_deficit = sorted(
                tenant_target.items(),
                key=lambda x: self.tenant_usage.get(x[0], 0) / x[1] if x[1] > 0 else float('inf')
            )

            # Schedule jobs for underserved tenants first
            available = self.total_capacity - sum(self.allocations.values())

            for tenant_id, target in tenants_by_deficit:
                if available <= 0:
                    break

                current = self.tenant_usage.get(tenant_id, 0)
                if current >= target:
                    continue

                # Find queued jobs belonging to this tenant
                for queue in self.queues.values():
                    for job in list(queue.jobs):
                        if job.tenant_id != tenant_id:
                            continue
                        if job.state != JobState.QUEUED:
                            continue

                        gpu_to_allocate = min(
                            job.gpu_request,
                            available,
                            target - current
                        )

                        required = job.min_replicas if job.elastic else job.gpu_request
                        if gpu_to_allocate >= required:
                            queue.jobs.remove(job)
                            heapq.heapify(queue.jobs)
                            self._start_job(job, gpu_to_allocate)
                            available -= gpu_to_allocate
                            current += gpu_to_allocate
                            break


# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Create the scheduler
    scheduler = ElasticScheduler(total_capacity=64)

    # Create queues
    scheduler.create_queue(
        name="high-priority",
        policy=QueuePolicy.PRIORITY,
        max_capacity=32,
        guaranteed_capacity=16,
        priority=100
    )

    scheduler.create_queue(
        name="normal",
        policy=QueuePolicy.PRIORITY,
        max_capacity=48,
        guaranteed_capacity=8,
        priority=50
    )

    scheduler.create_queue(
        name="low-priority",
        policy=QueuePolicy.PRIORITY,
        max_capacity=64,
        guaranteed_capacity=0,
        priority=10
    )

    # Start the scheduler
    scheduler.start()

    # Submit jobs
    job1 = Job(
        id="job-1",
        name="training-large",
        tenant_id="tenant-a",
        queue_name="high-priority",
        priority=100,
        gpu_request=8,
        preemptible=False
    )
    scheduler.submit_job(job1)

    job2 = Job(
        id="job-2",
        name="training-medium",
        tenant_id="tenant-b",
        queue_name="normal",
        priority=50,
        gpu_request=4,
        elastic=True,
        min_replicas=2,
        max_replicas=8
    )
    scheduler.submit_job(job2)

    # Let the scheduler run
    time.sleep(5)

    # Inspect state
    stats = scheduler.get_scheduler_stats()
    print(f"\nScheduler Stats: {stats}")

    # Complete a job
    scheduler.complete_job("job-1")

    time.sleep(2)
    stats = scheduler.get_scheduler_stats()
    print(f"\nAfter completion: {stats}")

    scheduler.stop()
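
The scheduler above admits jobs replica-by-replica, but distributed training usually needs gang scheduling: all replicas start together or not at all (the architecture diagram lists it; the code omits it). A minimal all-or-nothing admission check against the Job and Queue types above, as an illustrative sketch:

def can_gang_schedule(job: Job, available_capacity: int, queue: Queue) -> bool:
    """All-or-nothing admission: admit the job only if its full GPU
    request fits both the cluster and the queue right now."""
    return (
        job.gpu_request <= available_capacity
        and queue.current_usage + job.gpu_request <= queue.max_capacity
    )

A scheduling pass would gate _start_job on this check and otherwise leave the job at the head of its queue rather than starting a partial gang.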

Remote GPU Access

rGPU Architecture

┌──────────────────────────────────────────────────────────────────┐
│                   Remote GPU (rGPU) Architecture                 │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Client node (no local GPU)         GPU server node              │
│  ┌─────────────────────┐            ┌─────────────────────┐      │
│  │  Application        │            │  rGPU Server        │      │
│  │  (e.g. PyTorch)     │            │   - Handler         │      │
│  │        │            │            │   - Execution       │      │
│  │  CUDA API           │   RDMA /   │   - Result          │      │
│  │        │            │    TCP     │        │            │      │
│  │  rGPU Client        │◄──────────►│  GPU Driver         │      │
│  │   - API hook        │            │        │            │      │
│  │   - Serialization   │            │  GPU 0 ... GPU 7    │      │
│  │   - Network         │            │                     │      │
│  └─────────────────────┘            └─────────────────────┘      │
│                                                                  │
│  Data flow:                                                      │
│    1. The application calls a CUDA API                           │
│    2. The rGPU client intercepts the call                        │
│    3. Arguments are serialized and sent over the network         │
│    4. The rGPU server executes the call on a local GPU           │
│    5. Results are returned to the client                         │
│                                                                  │
│  Key optimizations:                                              │
│    • Batched RPC calls to cut network round-trips                │
│    • RDMA direct memory access to avoid CPU copies               │
│    • Caching to avoid re-transferring repeated data              │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

rGPU Implementation Framework

"""
远程 GPU 访问框架 (简化实现)
"""
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
import pickle
import socket
import threading
import struct
import logging
from enum import Enum


class RPCMethod(Enum):
    """RPC 方法"""
    CUDA_MALLOC = "cuda_malloc"
    CUDA_FREE = "cuda_free"
    CUDA_MEMCPY = "cuda_memcpy"
    CUDA_LAUNCH_KERNEL = "cuda_launch_kernel"
    CUDA_SYNC = "cuda_sync"
    TENSOR_CREATE = "tensor_create"
    TENSOR_OP = "tensor_op"
    MODEL_FORWARD = "model_forward"


@dataclass
class RPCRequest:
    """RPC 请求"""
    id: int
    method: RPCMethod
    args: tuple
    kwargs: dict


@dataclass
class RPCResponse:
    """RPC 响应"""
    id: int
    success: bool
    result: Any
    error: Optional[str] = None


class rGPUServer:
    """远程 GPU 服务端"""

    def __init__(self, host: str = "0.0.0.0", port: int = 50051):
        self.host = host
        self.port = port
        self.socket = None
        self._running = False

        # GPU-side state
        self.device_handles: Dict[int, Any] = {}
        self.tensor_cache: Dict[int, Any] = {}
        self.model_cache: Dict[str, Any] = {}

        self.logger = logging.getLogger(__name__)

        # Method dispatch table
        self.handlers: Dict[RPCMethod, Callable] = {
            RPCMethod.CUDA_MALLOC: self._handle_malloc,
            RPCMethod.CUDA_FREE: self._handle_free,
            RPCMethod.CUDA_MEMCPY: self._handle_memcpy,
            RPCMethod.TENSOR_CREATE: self._handle_tensor_create,
            RPCMethod.TENSOR_OP: self._handle_tensor_op,
            RPCMethod.MODEL_FORWARD: self._handle_model_forward,
        }

    def start(self):
        """启动服务"""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind((self.host, self.port))
        self.socket.listen(10)

        self._running = True
        self.logger.info(f"rGPU Server listening on {self.host}:{self.port}")

        while self._running:
            try:
                client_socket, address = self.socket.accept()
                self.logger.info(f"New connection from {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket,)
                )
                client_thread.daemon = True
                client_thread.start()
            except Exception as e:
                self.logger.error(f"Accept error: {e}")

    def stop(self):
        """停止服务"""
        self._running = False
        if self.socket:
            self.socket.close()

    def _handle_client(self, client_socket: socket.socket):
        """处理客户端连接"""
        try:
            while self._running:
                # 读取请求长度
                length_data = client_socket.recv(4)
                if not length_data:
                    break

                length = struct.unpack("!I", length_data)[0]

                # 读取请求数据
                data = b""
                while len(data) < length:
                    chunk = client_socket.recv(min(length - len(data), 4096))
                    if not chunk:
                        break
                    data += chunk

                # 反序列化请求
                request: RPCRequest = pickle.loads(data)

                # 处理请求
                response = self._process_request(request)

                # 发送响应
                response_data = pickle.dumps(response)
                client_socket.send(struct.pack("!I", len(response_data)))
                client_socket.sendall(response_data)

        except Exception as e:
            self.logger.error(f"Client handler error: {e}")
        finally:
            client_socket.close()

    def _process_request(self, request: RPCRequest) -> RPCResponse:
        """处理请求"""
        try:
            handler = self.handlers.get(request.method)
            if not handler:
                return RPCResponse(
                    id=request.id,
                    success=False,
                    result=None,
                    error=f"Unknown method: {request.method}"
                )

            result = handler(*request.args, **request.kwargs)
            return RPCResponse(
                id=request.id,
                success=True,
                result=result
            )
        except Exception as e:
            return RPCResponse(
                id=request.id,
                success=False,
                result=None,
                error=str(e)
            )

    def _handle_malloc(self, size: int) -> int:
        """处理显存分配"""
        import torch
        tensor = torch.empty(size, dtype=torch.uint8, device='cuda')
        handle_id = id(tensor)
        self.device_handles[handle_id] = tensor
        return handle_id

    def _handle_free(self, handle_id: int):
        """处理显存释放"""
        if handle_id in self.device_handles:
            del self.device_handles[handle_id]

    def _handle_memcpy(
        self,
        src_data: bytes,
        dst_handle: int,
        direction: str
    ) -> Optional[bytes]:
        """处理内存拷贝"""
        import torch

        if direction == "host_to_device":
            # 主机到设备
            tensor = self.device_handles.get(dst_handle)
            if tensor is not None:
                tensor.copy_(torch.frombuffer(src_data, dtype=torch.uint8))
            return None
        else:
            # 设备到主机
            tensor = self.device_handles.get(dst_handle)
            if tensor is not None:
                return tensor.cpu().numpy().tobytes()
            return None

    def _handle_tensor_create(
        self,
        shape: tuple,
        dtype: str,
        data: Optional[bytes] = None
    ) -> int:
        """创建张量"""
        import torch

        dtype_map = {
            "float32": torch.float32,
            "float16": torch.float16,
            "int64": torch.int64,
            "int32": torch.int32,
        }

        torch_dtype = dtype_map.get(dtype, torch.float32)

        if data:
            import numpy as np
            # Simplified: incoming buffers are assumed to be float32
            np_array = np.frombuffer(data, dtype=np.float32).reshape(shape)
            tensor = torch.from_numpy(np_array).to(device='cuda', dtype=torch_dtype)
        else:
            tensor = torch.empty(shape, dtype=torch_dtype, device='cuda')

        handle_id = id(tensor)
        self.tensor_cache[handle_id] = tensor
        return handle_id

    def _handle_tensor_op(
        self,
        op_name: str,
        tensor_ids: List[int],
        *args,
        **kwargs
    ) -> int:
        """执行张量操作"""
        import torch

        tensors = [self.tensor_cache[tid] for tid in tensor_ids]

        # Resolve the op: torch-level function or tensor method
        if hasattr(torch, op_name):
            op_func = getattr(torch, op_name)
            result = op_func(*tensors, *args, **kwargs)
        elif len(tensors) > 0 and hasattr(tensors[0], op_name):
            method = getattr(tensors[0], op_name)
            result = method(*tensors[1:], *args, **kwargs)
        else:
            raise ValueError(f"Unknown operation: {op_name}")

        if isinstance(result, torch.Tensor):
            handle_id = id(result)
            self.tensor_cache[handle_id] = result
            return handle_id
        else:
            return result

    def _handle_model_forward(
        self,
        model_name: str,
        input_tensor_ids: List[int]
    ) -> List[int]:
        """模型前向传播"""
        model = self.model_cache.get(model_name)
        if not model:
            raise ValueError(f"Model {model_name} not found")

        inputs = [self.tensor_cache[tid] for tid in input_tensor_ids]

        with torch.no_grad():
            outputs = model(*inputs)

        if isinstance(outputs, (list, tuple)):
            result_ids = []
            for out in outputs:
                if isinstance(out, torch.Tensor):
                    handle_id = id(out)
                    self.tensor_cache[handle_id] = out
                    result_ids.append(handle_id)
            return result_ids
        elif isinstance(outputs, torch.Tensor):
            handle_id = id(outputs)
            self.tensor_cache[handle_id] = outputs
            return [handle_id]
        else:
            return []


class rGPUClient:
    """远程 GPU 客户端"""

    def __init__(self, host: str, port: int = 50051):
        self.host = host
        self.port = port
        self.socket = None
        self._request_id = 0
        self._lock = threading.Lock()

    def connect(self):
        """连接服务端"""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.host, self.port))

    def disconnect(self):
        """断开连接"""
        if self.socket:
            self.socket.close()

    def _call(
        self,
        method: RPCMethod,
        *args,
        **kwargs
    ) -> Any:
        """RPC 调用"""
        with self._lock:
            self._request_id += 1
            request = RPCRequest(
                id=self._request_id,
                method=method,
                args=args,
                kwargs=kwargs
            )

            # 发送请求
            data = pickle.dumps(request)
            self.socket.send(struct.pack("!I", len(data)))
            self.socket.sendall(data)

            # Receive the length-prefixed response
            length_data = self.socket.recv(4)
            length = struct.unpack("!I", length_data)[0]

            response_data = b""
            while len(response_data) < length:
                chunk = self.socket.recv(min(length - len(response_data), 4096))
                response_data += chunk

            response: RPCResponse = pickle.loads(response_data)

            if not response.success:
                raise RuntimeError(response.error)

            return response.result

    def create_tensor(
        self,
        shape: tuple,
        dtype: str = "float32",
        data: Optional[bytes] = None
    ) -> int:
        """创建远程张量"""
        return self._call(RPCMethod.TENSOR_CREATE, shape, dtype, data)

    def tensor_op(
        self,
        op_name: str,
        tensor_ids: List[int],
        *args,
        **kwargs
    ) -> int:
        """执行张量操作"""
        return self._call(RPCMethod.TENSOR_OP, op_name, tensor_ids, *args, **kwargs)

    def model_forward(
        self,
        model_name: str,
        input_tensor_ids: List[int]
    ) -> List[int]:
        """模型前向传播"""
        return self._call(RPCMethod.MODEL_FORWARD, model_name, input_tensor_ids)


# Higher-level abstraction: remote tensors
class RemoteTensor:
    """Remote tensor wrapper with operator overloading."""

    def __init__(self, client: rGPUClient, handle_id: int, shape: tuple, dtype: str):
        self.client = client
        self.handle_id = handle_id
        self.shape = shape
        self.dtype = dtype

    def __add__(self, other: 'RemoteTensor') -> 'RemoteTensor':
        result_id = self.client.tensor_op("add", [self.handle_id, other.handle_id])
        return RemoteTensor(self.client, result_id, self.shape, self.dtype)

    def __mul__(self, other: 'RemoteTensor') -> 'RemoteTensor':
        result_id = self.client.tensor_op("mul", [self.handle_id, other.handle_id])
        return RemoteTensor(self.client, result_id, self.shape, self.dtype)

    def matmul(self, other: 'RemoteTensor') -> 'RemoteTensor':
        result_id = self.client.tensor_op("matmul", [self.handle_id, other.handle_id])
        # Shape of the matmul result
        new_shape = (self.shape[0], other.shape[1])
        return RemoteTensor(self.client, result_id, new_shape, self.dtype)


# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Server side (run on a GPU node):
    # server = rGPUServer(port=50051)
    # server.start()

    # Client side (run on a CPU-only node):
    # client = rGPUClient("gpu-server-address", 50051)
    # client.connect()
    # tensor_id = client.create_tensor((1024, 1024), "float32")
    # result_id = client.tensor_op("relu", [tensor_id])
    # client.disconnect()

    print("rGPU framework initialized")

Kubernetes Integration

Compute Pooling Operator

# Compute pool CRD definition
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: computepools.ai.example.com
spec:
  group: ai.example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                poolType:
                  type: string
                  enum: ["gpu", "npu", "mixed"]
                capacity:
                  type: object
                  properties:
                    total:
                      type: integer
                    reserved:
                      type: integer
                virtualization:
                  type: object
                  properties:
                    enabled:
                      type: boolean
                    strategy:
                      type: string
                      enum: ["mig", "time-slicing", "vgpu"]
                quotas:
                  type: array
                  items:
                    type: object
                    properties:
                      tenant:
                        type: string
                      limit:
                        type: integer
                      guaranteed:
                        type: integer
            status:
              type: object
              properties:
                phase:
                  type: string
                totalCapacity:
                  type: integer
                allocatedCapacity:
                  type: integer
                availableCapacity:
                  type: integer
  scope: Cluster
  names:
    plural: computepools
    singular: computepool
    kind: ComputePool
    shortNames:
      - cp

---
# Example compute pool
apiVersion: ai.example.com/v1
kind: ComputePool
metadata:
  name: training-pool
spec:
  poolType: gpu
  capacity:
    total: 64
    reserved: 8
  virtualization:
    enabled: true
    strategy: mig
  quotas:
    - tenant: team-a
      limit: 32
      guaranteed: 16
    - tenant: team-b
      limit: 24
      guaranteed: 8
    - tenant: team-c
      limit: 16
      guaranteed: 0

---
# Elastic job CRD
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: elasticjobs.ai.example.com
spec:
  group: ai.example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                pool:
                  type: string
                priority:
                  type: integer
                preemptible:
                  type: boolean
                scaling:
                  type: object
                  properties:
                    minReplicas:
                      type: integer
                    maxReplicas:
                      type: integer
                    gpuPerReplica:
                      type: integer
                template:
                  type: object
                  x-kubernetes-preserve-unknown-fields: true
  scope: Namespaced
  names:
    plural: elasticjobs
    singular: elasticjob
    kind: ElasticJob
    shortNames:
      - ej

---
# Example elastic job
apiVersion: ai.example.com/v1
kind: ElasticJob
metadata:
  name: llm-training
  namespace: ml-workloads
spec:
  pool: training-pool
  priority: 100
  preemptible: false
  scaling:
    minReplicas: 4
    maxReplicas: 16
    gpuPerReplica: 8
  template:
    spec:
      containers:
        - name: trainer
          image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
          command: ["torchrun"]
          args:
            - "--nproc_per_node=8"
            - "--nnodes=$(WORLD_SIZE)"
            - "--node_rank=$(RANK)"
            - "--master_addr=$(MASTER_ADDR)"
            - "--master_port=29500"
            - "train.py"
          resources:
            limits:
              nvidia.com/gpu: 8
              memory: "512Gi"
          env:
            - name: NCCL_DEBUG
              value: "INFO"

---
# Scheduler queue configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-queues
  namespace: kube-system
data:
  queues.yaml: |
    queues:
      - name: high-priority
        weight: 100
        maxCapacity: 48
        guaranteedCapacity: 32
        preemptionPolicy: PreemptLowerPriority

      - name: normal
        weight: 50
        maxCapacity: 64
        guaranteedCapacity: 16
        preemptionPolicy: PreemptLowerPriority

      - name: preemptible
        weight: 10
        maxCapacity: 64
        guaranteedCapacity: 0
        preemptionPolicy: Never

    fairShare:
      enabled: true
      preemptionPolicy: PreemptLowerPriority

---
# Volcano queue configuration
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: training-queue
spec:
  weight: 1
  capability:
    nvidia.com/gpu: 32
  reclaimable: true
  state: Open
  affinity:
    nodeGroupAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - "gpu-training"
    nodeGroupAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - "inference-only"

Monitoring and Billing

Resource Usage Monitoring

"""
算力池化监控与计费系统
"""
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict
import threading
import time


@dataclass
class UsageRecord:
    """使用记录"""
    tenant_id: str
    resource_type: str
    resource_id: str
    start_time: datetime
    end_time: Optional[datetime] = None
    quantity: float = 0
    cost: float = 0


@dataclass
class BillingConfig:
    """计费配置"""
    resource_type: str
    price_per_hour: float
    minimum_billing_period: int = 60  # 秒
    discount_rules: Dict[int, float] = field(default_factory=dict)  # 时长 -> 折扣


class UsageTracker:
    """使用量追踪器"""

    def __init__(self):
        self.active_sessions: Dict[str, UsageRecord] = {}
        self.history: List[UsageRecord] = []
        self._lock = threading.Lock()

    def start_session(
        self,
        session_id: str,
        tenant_id: str,
        resource_type: str,
        resource_id: str,
        quantity: float
    ):
        """开始使用会话"""
        with self._lock:
            record = UsageRecord(
                tenant_id=tenant_id,
                resource_type=resource_type,
                resource_id=resource_id,
                start_time=datetime.now(),
                quantity=quantity
            )
            self.active_sessions[session_id] = record

    def end_session(self, session_id: str) -> Optional[UsageRecord]:
        """结束使用会话"""
        with self._lock:
            record = self.active_sessions.pop(session_id, None)
            if record:
                record.end_time = datetime.now()
                self.history.append(record)
            return record

    def get_current_usage(self, tenant_id: Optional[str] = None) -> Dict[str, float]:
        """Return current usage, optionally filtered by tenant."""
        with self._lock:
            usage = defaultdict(float)
            for session_id, record in self.active_sessions.items():
                if tenant_id and record.tenant_id != tenant_id:
                    continue
                key = f"{record.resource_type}:{record.tenant_id}"
                usage[key] += record.quantity
            return dict(usage)

    def get_usage_summary(
        self,
        tenant_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> Dict:
        """获取使用量汇总"""
        with self._lock:
            summary = {
                "tenant_id": tenant_id,
                "period_start": start_time.isoformat(),
                "period_end": end_time.isoformat(),
                "resources": {}
            }

            for record in self.history:
                if record.tenant_id != tenant_id:
                    continue
                if record.start_time < start_time or record.start_time > end_time:
                    continue

                resource_key = record.resource_type
                if resource_key not in summary["resources"]:
                    summary["resources"][resource_key] = {
                        "total_hours": 0,
                        "total_cost": 0,
                        "session_count": 0
                    }

                # Accumulate usage duration in hours
                duration = (record.end_time - record.start_time).total_seconds() / 3600
                summary["resources"][resource_key]["total_hours"] += duration * record.quantity
                summary["resources"][resource_key]["total_cost"] += record.cost
                summary["resources"][resource_key]["session_count"] += 1

            return summary


class BillingSystem:
    """计费系统"""

    def __init__(self, usage_tracker: UsageTracker):
        self.usage_tracker = usage_tracker
        self.billing_configs: Dict[str, BillingConfig] = {}
        self.tenant_balances: Dict[str, float] = defaultdict(float)
        self.invoices: List[Dict] = []

    def configure_pricing(
        self,
        resource_type: str,
        price_per_hour: float,
        discount_rules: Dict[int, float] = None
    ):
        """配置计费规则"""
        self.billing_configs[resource_type] = BillingConfig(
            resource_type=resource_type,
            price_per_hour=price_per_hour,
            discount_rules=discount_rules or {}
        )

    def calculate_cost(
        self,
        resource_type: str,
        duration_seconds: float,
        quantity: float
    ) -> float:
        """计算费用"""
        config = self.billing_configs.get(resource_type)
        if not config:
            return 0

        # Enforce the minimum billing period
        billable_seconds = max(duration_seconds, config.minimum_billing_period)
        hours = billable_seconds / 3600

        base_cost = hours * quantity * config.price_per_hour

        # Apply the largest discount tier whose threshold is met
        total_hours = hours * quantity
        discount = 1.0
        for threshold, disc in sorted(config.discount_rules.items(), reverse=True):
            if total_hours >= threshold:
                discount = disc
                break

        return base_cost * discount
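
    # Worked example (hypothetical numbers): 4 GPUs for 30 hours at
    # $2.0/GPU-hour gives base_cost = 30 * 4 * 2.0 = $240. total_hours is
    # 120, so with a rule {100: 0.9} the 100-hour tier applies: 240 * 0.9 = $216.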

    def charge_session(self, session_id: str) -> Optional[float]:
        """结算会话"""
        record = self.usage_tracker.end_session(session_id)
        if not record:
            return None

        duration = (record.end_time - record.start_time).total_seconds()
        cost = self.calculate_cost(
            record.resource_type,
            duration,
            record.quantity
        )

        record.cost = cost
        self.tenant_balances[record.tenant_id] -= cost

        return cost

    def add_credit(self, tenant_id: str, amount: float):
        """增加信用额度"""
        self.tenant_balances[tenant_id] += amount

    def get_balance(self, tenant_id: str) -> float:
        """获取余额"""
        return self.tenant_balances[tenant_id]

    def generate_invoice(
        self,
        tenant_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> Dict:
        """生成账单"""
        summary = self.usage_tracker.get_usage_summary(
            tenant_id, start_time, end_time
        )

        invoice = {
            "invoice_id": f"INV-{int(time.time())}",
            "tenant_id": tenant_id,
            "period": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat()
            },
            "line_items": [],
            "total": 0,
            "generated_at": datetime.now().isoformat()
        }

        for resource_type, usage in summary.get("resources", {}).items():
            config = self.billing_configs.get(resource_type)
            price = config.price_per_hour if config else 0

            item = {
                "resource_type": resource_type,
                "hours": usage["total_hours"],
                "unit_price": price,
                "subtotal": usage["total_cost"],
                "sessions": usage["session_count"]
            }
            invoice["line_items"].append(item)
            invoice["total"] += usage["total_cost"]

        self.invoices.append(invoice)
        return invoice


class ResourceMonitor:
    """资源监控"""

    def __init__(self, usage_tracker: UsageTracker):
        self.usage_tracker = usage_tracker
        self.metrics: Dict[str, List[Dict]] = defaultdict(list)
        self._running = False

    def start(self, interval: int = 60):
        """启动监控"""
        self._running = True
        self._thread = threading.Thread(target=self._collect_loop, args=(interval,))
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """停止监控"""
        self._running = False

    def _collect_loop(self, interval: int):
        """采集循环"""
        while self._running:
            self._collect_metrics()
            time.sleep(interval)

    def _collect_metrics(self):
        """采集指标"""
        timestamp = datetime.now()
        current_usage = self.usage_tracker.get_current_usage()

        for key, value in current_usage.items():
            self.metrics[key].append({
                "timestamp": timestamp.isoformat(),
                "value": value
            })

            # Keep only the last 24 hours of data points
            cutoff = timestamp - timedelta(hours=24)
            self.metrics[key] = [
                m for m in self.metrics[key]
                if datetime.fromisoformat(m["timestamp"]) > cutoff
            ]

    def get_metrics(
        self,
        resource_key: str,
        start_time: datetime = None,
        end_time: datetime = None
    ) -> List[Dict]:
        """获取指标"""
        metrics = self.metrics.get(resource_key, [])

        if start_time or end_time:
            filtered = []
            for m in metrics:
                ts = datetime.fromisoformat(m["timestamp"])
                if start_time and ts < start_time:
                    continue
                if end_time and ts > end_time:
                    continue
                filtered.append(m)
            return filtered

        return metrics

    def export_prometheus_metrics(self) -> str:
        """导出 Prometheus 格式指标"""
        lines = []

        lines.append("# HELP compute_pool_usage Current resource usage")
        lines.append("# TYPE compute_pool_usage gauge")

        current_usage = self.usage_tracker.get_current_usage()
        for key, value in current_usage.items():
            # Key format is "<resource_type>:<tenant_id>"; maxsplit=1 keeps
            # any colon inside the tenant id intact.
            resource_type, tenant_id = key.split(":", 1)
            lines.append(
                f'compute_pool_usage{{resource_type="{resource_type}",'
                f'tenant_id="{tenant_id}"}} {value}'
            )

        return '\n'.join(lines)
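
    # Example scrape output, assuming tenant-a currently holds 4 GPUs:
    #   compute_pool_usage{resource_type="nvidia.com/gpu",tenant_id="tenant-a"} 4.0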


# Usage example
if __name__ == "__main__":
    # Create the usage tracker
    tracker = UsageTracker()

    # Create the billing system
    billing = BillingSystem(tracker)

    # Configure pricing
    billing.configure_pricing(
        resource_type="nvidia.com/gpu",
        price_per_hour=2.0,
        discount_rules={
            100: 0.9,   # 10% off above 100 hours
            500: 0.8,   # 20% off above 500 hours
            1000: 0.7   # 30% off above 1000 hours
        }
    )

    # Add credit for the tenant
    billing.add_credit("tenant-a", 1000.0)

    # Start a usage session
    tracker.start_session(
        session_id="session-1",
        tenant_id="tenant-a",
        resource_type="nvidia.com/gpu",
        resource_id="gpu-0",
        quantity=4
    )

    # Simulate some usage
    time.sleep(2)

    # Settle the session
    cost = billing.charge_session("session-1")
    print(f"Session cost: ${cost:.2f}")
    print(f"Remaining balance: ${billing.get_balance('tenant-a'):.2f}")

    # Generate an invoice
    invoice = billing.generate_invoice(
        tenant_id="tenant-a",
        start_time=datetime.now() - timedelta(days=1),
        end_time=datetime.now()
    )
    print(f"\nInvoice: {invoice}")

Best Practices

Compute Pooling Deployment Checklist

# Compute pooling best practices
deployment_checklist:
  # Resource planning
  resource_planning:
    - name: "Capacity planning"
      considerations:
        - Reserve 10-20% headroom for elastic scaling
        - Separate training pools from inference pools
        - Plan for peak load

    - name: "Quota policy"
      recommendations:
        - Set quotas per team
        - Guaranteed quota plus burstable quota
        - Review usage regularly

  # Scheduling strategy
  scheduling:
    - name: "Queue design"
      patterns:
        - High-priority queue (production jobs)
        - Normal queue (development training)
        - Preemptible queue (ad-hoc jobs)

    - name: "Elastic scaling"  # a queue-depth policy sketch follows this checklist
      policies:
        - Based on queue depth
        - Based on resource utilization
        - Based on time windows

  # Virtualization configuration
  virtualization:
    - name: "MIG strategy"
      configs:
        training: "3g.40gb"   # large instances
        inference: "1g.10gb"  # small instances
        mixed: "balanced"

    - name: "Time slicing"
      recommendations:
        - "Inference services: replicas=4"
        - "Development/testing: replicas=2"

  # Monitoring and alerting
  monitoring:
    metrics:
      - "Resource utilization"
      - "Queue wait time"
      - "Job completion rate"
      - "Preemption count"

    alerts:
      - name: "Low utilization"
        condition: "utilization < 50%"
        action: "Review the scheduling policy"

      - name: "Long queue"
        condition: "queue_wait_time > 1h"
        action: "Scale out or adjust quotas"

---
# Reference architectures
reference_architecture:
  # Small scale (< 100 GPUs)
  small:
    pools: 1
    queues: 2
    scheduler: "Kubernetes native"
    virtualization: "time slicing"

  # Medium scale (100-500 GPUs)
  medium:
    pools: 2-3
    queues: 4-5
    scheduler: "Volcano"
    virtualization: "MIG + time slicing"

  # Large scale (> 500 GPUs)
  large:
    pools: "per cluster / region"
    queues: "hierarchical queues"
    scheduler: "custom + Volcano"
    virtualization: "MIG"
    additional:
      - "Cross-cluster federation"
      - "Multi-level caching"
      - "Intelligent scheduling"

Summary

Compute pooling and elastic scheduling are key building blocks of efficient AI infrastructure:

  1. Resource pooling: manage scattered resources centrally to raise utilization
  2. Elastic scheduling: allocate dynamically with load to optimize resource usage
  3. Multi-tenant isolation: quota management, fair sharing, priority preemption
  4. Monitoring and billing: usage tracking, cost allocation, invoice generation

With sound pooling strategies and scheduling algorithms, you can maximize resource efficiency while preserving quality of service.
