Compute Pooling and Elastic Scheduling
Overview
Compute pooling unifies the management of scattered GPU/NPU resources so they can be allocated elastically and used efficiently. By pooling, physical boundaries dissolve and compute can be consumed on demand, much like a cloud service. This article covers the architecture, implementation, and elastic scheduling strategies of compute pooling.
Compute Pooling Architecture
Overall Architecture
┌─────────────────────────────────────────────────────────────────┐
│ 算力池化平台架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 用户接入层 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │
│ │ │ Web UI │ │ CLI │ │ API │ │ SDK/Client │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └──────┬──────┘ │ │
│ │ └───────────┬┴───────────┬┴──────────────┘ │ │
│ └───────────────────┼────────────┼────────────────────────┘ │
│ │ │ │
│ ════════════════════╪════════════╪════════════════════════ │
│ │ │ │
│ 资源管理层 │ │
│ ┌───────────────────┴────────────┴────────────────────────┐ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Resource Manager (资源管理器) │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ 资源发现 │ │ 资源抽象 │ │ 容量规划 │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Scheduler (调度器) │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ 队列管理 │ │ 优先级 │ │ 抢占策略 │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Gang调度 │ │ 弹性伸缩 │ │ 公平共享 │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Quota Manager (配额管理) │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ 租户配额 │ │ 计费统计 │ │ 用量告警 │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ════════════════════════════╪════════════════════════════════ │
│ │ │
│ 虚拟化层 │ │
│ ┌───────────────────────────┴───────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ GPU 虚拟化 │ │ 显存池化 │ │ 远程 GPU │ │ │
│ │ │ (MIG/vGPU) │ │ (Memory Pool)│ │ (RDMA/RPC) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Unified Device Layer (统一设备层) │ │ │
│ │ │ NVIDIA GPU │ AMD GPU │ 昇腾 NPU │ Gaudi │ TPU │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ════════════════════════════╪════════════════════════════════ │
│ │ │
│ 物理资源层 │ │
│ ┌───────────────────────────┴───────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ 物理集群 A (训练) │ │ │
│ │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │
│ │ │ │Node1│ │Node2│ │Node3│ │Node4│ ... (H100×8) │ │ │
│ │ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ 物理集群 B (推理) │ │ │
│ │ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │
│ │ │ │Node1│ │Node2│ │Node3│ │Node4│ ... (A10×4) │ │ │
│ │ │ └─────┘ └─────┘ └─────┘ └─────┘ │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Resource Abstraction Model
"""
Resource abstraction model for compute pooling.
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import uuid
class ResourceType(Enum):
    """Resource type."""
    GPU = "gpu"
    NPU = "npu"
    CPU = "cpu"
    MEMORY = "memory"
    STORAGE = "storage"
    NETWORK = "network"
class ResourceState(Enum):
    """Resource state."""
    AVAILABLE = "available"
    ALLOCATED = "allocated"
    RESERVED = "reserved"
    MAINTENANCE = "maintenance"
    FAILED = "failed"
@dataclass
class ResourceSpec:
    """Requested resource specification."""
    type: ResourceType
    model: str  # e.g., "A100-80GB", "910B"
    quantity: float  # fractional values supported (virtualized resources)
    attributes: Dict[str, str] = field(default_factory=dict)
@dataclass
class PhysicalResource:
    """A physical device."""
    id: str
    type: ResourceType
    model: str
    node_name: str
    state: ResourceState
    capacity: float  # total capacity
    allocated: float  # currently allocated
    attributes: Dict[str, str] = field(default_factory=dict)
    @property
    def available(self) -> float:
        return self.capacity - self.allocated
    def can_allocate(self, amount: float) -> bool:
        return self.available >= amount
@dataclass
class VirtualResource:
    """A virtual slice carved out of a physical resource."""
    id: str
    physical_id: str  # backing physical resource
    type: ResourceType
    model: str
    quantity: float
    state: ResourceState
    owner: Optional[str] = None  # tenant ID
    created_at: datetime = field(default_factory=datetime.now)
@dataclass
class ResourcePool:
    """Resource pool."""
    id: str
    name: str
    type: ResourceType
    physical_resources: List[str]  # IDs of member physical resources
    total_capacity: float
    allocated_capacity: float
    reserved_capacity: float
    labels: Dict[str, str] = field(default_factory=dict)
    quotas: Dict[str, float] = field(default_factory=dict)  # per-tenant quotas
    @property
    def available_capacity(self) -> float:
        return self.total_capacity - self.allocated_capacity - self.reserved_capacity
@dataclass
class ResourceRequest:
    """Resource request."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    tenant_id: str = ""
    resources: List[ResourceSpec] = field(default_factory=list)
    priority: int = 0
    preemptible: bool = True
    constraints: Dict[str, str] = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.now)
@dataclass
class ResourceAllocation:
    """Resource allocation."""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    request_id: str = ""
    tenant_id: str = ""
    resources: List[VirtualResource] = field(default_factory=list)
    state: str = "pending"
    created_at: datetime = field(default_factory=datetime.now)
    expires_at: Optional[datetime] = None
class ResourcePoolManager:
    """Resource pool manager."""
    def __init__(self):
        self.physical_resources: Dict[str, PhysicalResource] = {}
        self.virtual_resources: Dict[str, VirtualResource] = {}
        self.pools: Dict[str, ResourcePool] = {}
        self.allocations: Dict[str, ResourceAllocation] = {}
    def register_physical_resource(self, resource: PhysicalResource):
        """Register a physical resource."""
        self.physical_resources[resource.id] = resource
    def create_pool(
        self,
        name: str,
        resource_type: ResourceType,
        resource_ids: List[str],
        labels: Dict[str, str] = None
    ) -> ResourcePool:
        """Create a resource pool."""
        total_capacity = sum(
            self.physical_resources[rid].capacity
            for rid in resource_ids
            if rid in self.physical_resources
        )
        pool = ResourcePool(
            id=str(uuid.uuid4()),
            name=name,
            type=resource_type,
            physical_resources=resource_ids,
            total_capacity=total_capacity,
            allocated_capacity=0,
            reserved_capacity=0,
            labels=labels or {}
        )
        self.pools[pool.id] = pool
        return pool
    def set_quota(self, pool_id: str, tenant_id: str, quota: float):
        """Set a tenant quota on a pool."""
        if pool_id in self.pools:
            self.pools[pool_id].quotas[tenant_id] = quota
    def get_tenant_usage(self, pool_id: str, tenant_id: str) -> float:
        """Return a tenant's usage within a pool (active allocations only)."""
        pool = self.pools.get(pool_id)
        if not pool:
            return 0.0
        usage = 0.0
        for alloc in self.allocations.values():
            # Released allocations keep their resource list, so filter by state
            if alloc.tenant_id != tenant_id or alloc.state != "active":
                continue
            for vr in alloc.resources:
                if vr.physical_id in pool.physical_resources:
                    usage += vr.quantity
        return usage
    def allocate(
        self,
        request: ResourceRequest,
        pool_id: Optional[str] = None
    ) -> Optional[ResourceAllocation]:
        """Allocate resources for a request."""
        # Quota check
        pool = self.pools.get(pool_id) if pool_id else None
        if pool:
            quota = pool.quotas.get(request.tenant_id, float('inf'))
            current_usage = self.get_tenant_usage(pool_id, request.tenant_id)
            requested = sum(r.quantity for r in request.resources)
            if current_usage + requested > quota:
                return None
        # Perform the allocation
        allocation = ResourceAllocation(
            request_id=request.id,
            tenant_id=request.tenant_id,
            state="active"
        )
        for spec in request.resources:
            # Find an available physical resource, restricted to the pool if one was given
            for pr_id, pr in self.physical_resources.items():
                if pool and pr_id not in pool.physical_resources:
                    continue
                if pr.type == spec.type and pr.model == spec.model:
                    if pr.can_allocate(spec.quantity):
                        # Create a virtual resource backed by it
                        vr = VirtualResource(
                            id=str(uuid.uuid4()),
                            physical_id=pr_id,
                            type=spec.type,
                            model=spec.model,
                            quantity=spec.quantity,
                            state=ResourceState.ALLOCATED,
                            owner=request.tenant_id
                        )
                        self.virtual_resources[vr.id] = vr
                        allocation.resources.append(vr)
                        # Update physical usage
                        pr.allocated += spec.quantity
                        # Update pool usage
                        if pool:
                            pool.allocated_capacity += spec.quantity
                        break
        if len(allocation.resources) == len(request.resources):
            self.allocations[allocation.id] = allocation
            return allocation
        else:
            # Roll back the partial allocation
            self._rollback_allocation(allocation)
            return None
    def release(self, allocation_id: str):
        """Release an allocation."""
        allocation = self.allocations.get(allocation_id)
        if not allocation:
            return
        for vr in allocation.resources:
            # Update the physical resource
            pr = self.physical_resources.get(vr.physical_id)
            if pr:
                pr.allocated -= vr.quantity
            # Drop the virtual resource
            if vr.id in self.virtual_resources:
                del self.virtual_resources[vr.id]
            # Update pool usage
            for pool in self.pools.values():
                if vr.physical_id in pool.physical_resources:
                    pool.allocated_capacity -= vr.quantity
        allocation.state = "released"
    def _rollback_allocation(self, allocation: ResourceAllocation):
        """Roll back a partial allocation (mirrors release)."""
        for vr in allocation.resources:
            pr = self.physical_resources.get(vr.physical_id)
            if pr:
                pr.allocated -= vr.quantity
            if vr.id in self.virtual_resources:
                del self.virtual_resources[vr.id]
            # Also return capacity to any pool that tracked it
            for pool in self.pools.values():
                if vr.physical_id in pool.physical_resources:
                    pool.allocated_capacity -= vr.quantity
    def get_pool_stats(self, pool_id: str) -> Dict:
        """Return pool statistics."""
        pool = self.pools.get(pool_id)
        if not pool:
            return {}
        return {
            "id": pool.id,
            "name": pool.name,
            "type": pool.type.value,
            "total_capacity": pool.total_capacity,
            "allocated_capacity": pool.allocated_capacity,
            "reserved_capacity": pool.reserved_capacity,
            "available_capacity": pool.available_capacity,
            "utilization": pool.allocated_capacity / pool.total_capacity if pool.total_capacity > 0 else 0,
            "resource_count": len(pool.physical_resources),
            "tenant_quotas": pool.quotas
        }
# Usage example
if __name__ == "__main__":
    manager = ResourcePoolManager()
    # Register physical resources
    for i in range(8):
        manager.register_physical_resource(PhysicalResource(
            id=f"gpu-{i}",
            type=ResourceType.GPU,
            model="A100-80GB",
            node_name="node-0",
            state=ResourceState.AVAILABLE,
            capacity=1.0,
            allocated=0
        ))
    # Create a resource pool
    pool = manager.create_pool(
        name="training-pool",
        resource_type=ResourceType.GPU,
        resource_ids=[f"gpu-{i}" for i in range(8)],
        labels={"purpose": "training"}
    )
    # Set quotas
    manager.set_quota(pool.id, "tenant-a", 4.0)
    manager.set_quota(pool.id, "tenant-b", 4.0)
    # Allocate two whole GPUs. Each physical GPU has capacity 1.0, so the
    # request is expressed as one spec per device; a single spec with
    # quantity=2 could never fit on one device.
    request = ResourceRequest(
        tenant_id="tenant-a",
        resources=[
            ResourceSpec(type=ResourceType.GPU, model="A100-80GB", quantity=1.0)
            for _ in range(2)
        ]
    )
    allocation = manager.allocate(request, pool.id)
    if allocation:
        print(f"Allocated: {len(allocation.resources)} resources")
        for vr in allocation.resources:
            print(f"  - {vr.type.value} {vr.model}: {vr.quantity}")
    # Inspect pool state
    stats = manager.get_pool_stats(pool.id)
    print(f"\nPool Stats: {stats}")
Elastic Scheduling System
Queue Management and Scheduling
"""
Elastic scheduling system implementation.
"""
from typing import Dict, List, Optional, Callable, Set
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timedelta
import heapq
import threading
import time
import logging
class JobState(Enum):
    """Job state."""
    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PREEMPTED = "preempted"
    SUSPENDED = "suspended"
class QueuePolicy(Enum):
    """Queue scheduling policy."""
    FIFO = "fifo"
    PRIORITY = "priority"
    FAIR_SHARE = "fair_share"
    CAPACITY = "capacity"
@dataclass
class Job:
    """A schedulable job."""
    id: str
    name: str
    tenant_id: str
    queue_name: str
    priority: int = 0
    gpu_request: int = 1
    gpu_limit: int = 8
    preemptible: bool = True
    elastic: bool = False  # whether the job can scale elastically
    min_replicas: int = 1
    max_replicas: int = 1
    state: JobState = JobState.PENDING
    current_replicas: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    def __lt__(self, other):
        # Ordering for the priority queue (heapq is a min-heap)
        if self.priority != other.priority:
            return self.priority > other.priority  # higher priority first
        return self.created_at < other.created_at  # FIFO within a priority
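Because heapq is a min-heap, the inverted comparison above makes heappop return the highest-priority, oldest job first. A quick standalone check with hypothetical jobs and fixed timestamps (so the tie-break is deterministic):

# Ordering check: "b" beats "c" on created_at, both beat "a" on priority
demo: List[Job] = []
heapq.heappush(demo, Job(id="a", name="a", tenant_id="t", queue_name="q",
                         priority=10, created_at=datetime(2024, 1, 1, 0, 0, 0)))
heapq.heappush(demo, Job(id="b", name="b", tenant_id="t", queue_name="q",
                         priority=100, created_at=datetime(2024, 1, 1, 0, 0, 1)))
heapq.heappush(demo, Job(id="c", name="c", tenant_id="t", queue_name="q",
                         priority=100, created_at=datetime(2024, 1, 1, 0, 0, 2)))
print([heapq.heappop(demo).id for _ in range(3)])  # ['b', 'c', 'a']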
@dataclass
class Queue:
    """Scheduling queue."""
    name: str
    policy: QueuePolicy
    priority: int = 0
    max_capacity: int = 0  # capacity cap (GPUs)
    guaranteed_capacity: int = 0  # guaranteed capacity
    current_usage: int = 0
    weight: float = 1.0  # fair-share weight
    jobs: List[Job] = field(default_factory=list)
    tenants: Set[str] = field(default_factory=set)
    def add_job(self, job: Job):
        """Enqueue a job."""
        heapq.heappush(self.jobs, job)
        job.state = JobState.QUEUED
    def pop_job(self) -> Optional[Job]:
        """Dequeue the head job."""
        if self.jobs:
            return heapq.heappop(self.jobs)
        return None
    def peek_job(self) -> Optional[Job]:
        """Peek at the head job without removing it."""
        if self.jobs:
            return self.jobs[0]
        return None
class ElasticScheduler:
    """Elastic scheduler."""
    def __init__(
        self,
        total_capacity: int,
        preemption_enabled: bool = True
    ):
        self.total_capacity = total_capacity
        self.preemption_enabled = preemption_enabled
        self.queues: Dict[str, Queue] = {}
        self.running_jobs: Dict[str, Job] = {}
        self.allocations: Dict[str, int] = {}  # job_id -> gpu_count
        self._lock = threading.Lock()
        self._running = False
        self._scheduler_thread: Optional[threading.Thread] = None
        self.logger = logging.getLogger(__name__)
        # Lifecycle callbacks
        self.on_job_start: Optional[Callable] = None
        self.on_job_complete: Optional[Callable] = None
        self.on_job_preempt: Optional[Callable] = None
    def create_queue(
        self,
        name: str,
        policy: QueuePolicy = QueuePolicy.PRIORITY,
        max_capacity: int = 0,
        guaranteed_capacity: int = 0,
        weight: float = 1.0,
        priority: int = 0
    ) -> Queue:
        """Create a queue."""
        queue = Queue(
            name=name,
            policy=policy,
            max_capacity=max_capacity or self.total_capacity,
            guaranteed_capacity=guaranteed_capacity,
            weight=weight,
            priority=priority
        )
        self.queues[name] = queue
        return queue
    def submit_job(self, job: Job) -> bool:
        """Submit a job."""
        with self._lock:
            queue = self.queues.get(job.queue_name)
            if not queue:
                self.logger.error(f"Queue {job.queue_name} not found")
                return False
            queue.add_job(job)
            self.logger.info(f"Job {job.id} submitted to queue {job.queue_name}")
            return True
    def start(self):
        """Start the scheduler loop."""
        self._running = True
        self._scheduler_thread = threading.Thread(target=self._scheduling_loop)
        self._scheduler_thread.daemon = True
        self._scheduler_thread.start()
    def stop(self):
        """Stop the scheduler loop."""
        self._running = False
        if self._scheduler_thread:
            self._scheduler_thread.join()
    def _scheduling_loop(self):
        """Scheduling loop."""
        while self._running:
            try:
                self._schedule_once()
            except Exception as e:
                self.logger.error(f"Scheduling error: {e}")
            time.sleep(1)
    def _schedule_once(self):
        """Run one scheduling pass."""
        with self._lock:
            # Compute available capacity
            used_capacity = sum(self.allocations.values())
            available_capacity = self.total_capacity - used_capacity
            if available_capacity <= 0:
                # Try to preempt lower-priority work
                if self.preemption_enabled:
                    self._try_preemption()
                return
            # Walk queues by priority and weight
            sorted_queues = sorted(
                self.queues.values(),
                key=lambda q: (-q.priority, -q.weight)
            )
            for queue in sorted_queues:
                job = queue.peek_job()
                if not job:
                    continue
                # Respect the queue's capacity cap
                if queue.current_usage >= queue.max_capacity:
                    continue
                # How many GPUs can this job get right now?
                gpu_to_allocate = min(
                    job.gpu_request,
                    available_capacity,
                    queue.max_capacity - queue.current_usage
                )
                # Elastic jobs may start below gpu_request (down to
                # min_replicas); rigid jobs need their full request
                min_needed = job.min_replicas if job.elastic else job.gpu_request
                if gpu_to_allocate >= min_needed:
                    queue.pop_job()
                    self._start_job(job, gpu_to_allocate)
                    queue.current_usage += gpu_to_allocate
                    available_capacity -= gpu_to_allocate
            # Elastic scale-up with leftover capacity
            self._elastic_scaling(available_capacity)
    def _start_job(self, job: Job, gpu_count: int):
        """Start a job with the given GPU count."""
        job.state = JobState.RUNNING
        job.started_at = datetime.now()
        job.current_replicas = gpu_count
        self.running_jobs[job.id] = job
        self.allocations[job.id] = gpu_count
        self.logger.info(f"Started job {job.id} with {gpu_count} GPUs")
        if self.on_job_start:
            self.on_job_start(job, gpu_count)
    def _try_preemption(self):
        """Try to preempt lower-priority jobs for waiting ones."""
        # Collect waiting jobs
        waiting_jobs = []
        for queue in self.queues.values():
            for job in queue.jobs:
                if job.state == JobState.QUEUED:
                    waiting_jobs.append((queue.priority, job))
        if not waiting_jobs:
            return
        # Highest queue priority first
        waiting_jobs.sort(key=lambda x: -x[0])
        for queue_priority, waiting_job in waiting_jobs:
            # Find running jobs that may be preempted for this one
            preemptible_jobs = [
                (job_id, job)
                for job_id, job in self.running_jobs.items()
                if job.preemptible and job.priority < waiting_job.priority
            ]
            if not preemptible_jobs:
                continue
            # Lowest priority gets preempted first
            preemptible_jobs.sort(key=lambda x: x[1].priority)
            # Release resources until the waiting job fits
            released = 0
            for job_id, job in preemptible_jobs:
                if released >= waiting_job.gpu_request:
                    break
                # Record the job's GPUs before preempting: preemption
                # removes its allocation entry, so reading it afterwards
                # would always yield 0
                released += self.allocations.get(job_id, 0)
                self._preempt_job(job)
    def _preempt_job(self, job: Job):
        """Preempt a running job."""
        job.state = JobState.PREEMPTED
        # Release its GPUs
        gpu_count = self.allocations.pop(job.id, 0)
        # Update queue usage
        queue = self.queues.get(job.queue_name)
        if queue:
            queue.current_usage -= gpu_count
        # Remove from the running set
        self.running_jobs.pop(job.id, None)
        self.logger.info(f"Preempted job {job.id}")
        if self.on_job_preempt:
            self.on_job_preempt(job)
        # Re-queue the job
        if queue:
            job.state = JobState.PENDING
            queue.add_job(job)
    def _elastic_scaling(self, available_capacity: int):
        """Scale elastic jobs up with leftover capacity."""
        if available_capacity <= 0:
            return
        # Elastic jobs that can still grow
        elastic_jobs = [
            job for job in self.running_jobs.values()
            if job.elastic and job.current_replicas < job.max_replicas
        ]
        if not elastic_jobs:
            return
        # Hand out spare capacity by priority
        elastic_jobs.sort(key=lambda j: -j.priority)
        for job in elastic_jobs:
            if available_capacity <= 0:
                break
            can_add = min(
                job.max_replicas - job.current_replicas,
                available_capacity
            )
            if can_add > 0:
                job.current_replicas += can_add
                self.allocations[job.id] += can_add
                available_capacity -= can_add
                # Keep the queue's usage consistent, since complete_job
                # subtracts the full (scaled) allocation
                queue = self.queues.get(job.queue_name)
                if queue:
                    queue.current_usage += can_add
                self.logger.info(
                    f"Scaled up job {job.id} to {job.current_replicas} replicas"
                )
    def complete_job(self, job_id: str):
        """Mark a job completed and free its GPUs."""
        with self._lock:
            job = self.running_jobs.pop(job_id, None)
            if not job:
                return
            job.state = JobState.COMPLETED
            job.completed_at = datetime.now()
            # Free the GPUs
            gpu_count = self.allocations.pop(job_id, 0)
            # Update queue usage
            queue = self.queues.get(job.queue_name)
            if queue:
                queue.current_usage -= gpu_count
            self.logger.info(f"Completed job {job_id}")
            if self.on_job_complete:
                self.on_job_complete(job)
    def get_scheduler_stats(self) -> Dict:
        """Return scheduler statistics."""
        with self._lock:
            queued_jobs = sum(len(q.jobs) for q in self.queues.values())
            running_jobs = len(self.running_jobs)
            total_allocated = sum(self.allocations.values())
            return {
                "total_capacity": self.total_capacity,
                "allocated_capacity": total_allocated,
                "available_capacity": self.total_capacity - total_allocated,
                "utilization": total_allocated / self.total_capacity if self.total_capacity > 0 else 0,
                "queued_jobs": queued_jobs,
                "running_jobs": running_jobs,
                "queue_stats": {
                    name: {
                        "pending_jobs": len(q.jobs),
                        "current_usage": q.current_usage,
                        "max_capacity": q.max_capacity,
                        "guaranteed_capacity": q.guaranteed_capacity
                    }
                    for name, q in self.queues.items()
                }
            }
# Fair-share scheduler
class FairShareScheduler(ElasticScheduler):
    """Fair-share scheduler."""
    def __init__(self, total_capacity: int):
        super().__init__(total_capacity)
        self.tenant_usage: Dict[str, int] = {}
        self.tenant_share: Dict[str, float] = {}
    def set_tenant_share(self, tenant_id: str, share: float):
        """Set a tenant's share."""
        self.tenant_share[tenant_id] = share
    def _schedule_once(self):
        """One fair-share scheduling pass."""
        with self._lock:
            # Target capacity per tenant, proportional to its share
            total_share = sum(self.tenant_share.values())
            if total_share == 0:
                return
            tenant_target = {
                tid: int(self.total_capacity * share / total_share)
                for tid, share in self.tenant_share.items()
            }
            # Current usage per tenant
            self.tenant_usage = {}
            for job in self.running_jobs.values():
                self.tenant_usage[job.tenant_id] = \
                    self.tenant_usage.get(job.tenant_id, 0) + \
                    self.allocations.get(job.id, 0)
            # Most under-served tenants first (lowest usage/target ratio)
            tenants_by_deficit = sorted(
                tenant_target.items(),
                key=lambda x: self.tenant_usage.get(x[0], 0) / x[1] if x[1] > 0 else float('inf')
            )
            # Schedule jobs for the most under-served tenants
            available = self.total_capacity - sum(self.allocations.values())
            for tenant_id, target in tenants_by_deficit:
                if available <= 0:
                    break
                current = self.tenant_usage.get(tenant_id, 0)
                if current >= target:
                    continue
                # Find a queued job belonging to this tenant
                for queue in self.queues.values():
                    for job in list(queue.jobs):
                        if job.tenant_id != tenant_id:
                            continue
                        if job.state != JobState.QUEUED:
                            continue
                        gpu_to_allocate = min(
                            job.gpu_request,
                            available,
                            target - current
                        )
                        if gpu_to_allocate >= job.min_replicas:
                            queue.jobs.remove(job)
                            heapq.heapify(queue.jobs)
                            self._start_job(job, gpu_to_allocate)
                            queue.current_usage += gpu_to_allocate
                            available -= gpu_to_allocate
                            current += gpu_to_allocate
                            break
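A minimal illustration of the share computation, with hypothetical tenants and a direct call to the (private) scheduling step so the outcome is deterministic: with 64 GPUs and shares 3:1, tenant-a's target is 48 GPUs and tenant-b's is 16, so both 8-GPU jobs are below target and start.

# Fair-share sketch (hypothetical tenants/jobs; calls _schedule_once
# directly instead of start() to keep the outcome deterministic)
fair = FairShareScheduler(total_capacity=64)
fair.create_queue(name="default")
fair.set_tenant_share("tenant-a", 3.0)  # target: 48 GPUs
fair.set_tenant_share("tenant-b", 1.0)  # target: 16 GPUs
fair.submit_job(Job(id="fs-1", name="a-train", tenant_id="tenant-a",
                    queue_name="default", gpu_request=8))
fair.submit_job(Job(id="fs-2", name="b-train", tenant_id="tenant-b",
                    queue_name="default", gpu_request=8))
fair._schedule_once()  # both tenants are below target, so both jobs start
print(fair.get_scheduler_stats()["running_jobs"])  # 2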
# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Create the scheduler
    scheduler = ElasticScheduler(total_capacity=64)
    # Create queues
    scheduler.create_queue(
        name="high-priority",
        policy=QueuePolicy.PRIORITY,
        max_capacity=32,
        guaranteed_capacity=16,
        priority=100
    )
    scheduler.create_queue(
        name="normal",
        policy=QueuePolicy.PRIORITY,
        max_capacity=48,
        guaranteed_capacity=8,
        priority=50
    )
    scheduler.create_queue(
        name="low-priority",
        policy=QueuePolicy.PRIORITY,
        max_capacity=64,
        guaranteed_capacity=0,
        priority=10
    )
    # Start the scheduler
    scheduler.start()
    # Submit jobs
    job1 = Job(
        id="job-1",
        name="training-large",
        tenant_id="tenant-a",
        queue_name="high-priority",
        priority=100,
        gpu_request=8,
        preemptible=False
    )
    scheduler.submit_job(job1)
    job2 = Job(
        id="job-2",
        name="training-medium",
        tenant_id="tenant-b",
        queue_name="normal",
        priority=50,
        gpu_request=4,
        elastic=True,
        min_replicas=2,
        max_replicas=8
    )
    scheduler.submit_job(job2)
    # Give the loop a moment to schedule
    time.sleep(5)
    # Inspect state
    stats = scheduler.get_scheduler_stats()
    print(f"\nScheduler Stats: {stats}")
    # Complete a job
    scheduler.complete_job("job-1")
    time.sleep(2)
    stats = scheduler.get_scheduler_stats()
    print(f"\nAfter completion: {stats}")
    scheduler.stop()
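Preemption is easiest to see in isolation. A sketch with hypothetical jobs that drives the (private) scheduling step by hand rather than via start(), so each state transition is visible:

# Preemption sketch: fill the cluster with a preemptible low-priority
# job, then watch a high-priority job evict it
sched = ElasticScheduler(total_capacity=8)
sched.create_queue(name="q", priority=0)
sched.on_job_preempt = lambda job: print(f"preempted: {job.id}")
sched.submit_job(Job(id="bg", name="bg", tenant_id="t", queue_name="q",
                     priority=1, gpu_request=8, preemptible=True))
sched._schedule_once()            # "bg" takes all 8 GPUs
sched.submit_job(Job(id="urgent", name="urgent", tenant_id="t",
                     queue_name="q", priority=100, gpu_request=8,
                     preemptible=False))
sched._schedule_once()            # no capacity left -> "bg" is preempted
sched._schedule_once()            # "urgent" starts on the freed GPUs
print(list(sched.running_jobs))   # ['urgent']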
Remote GPU Access
rGPU Architecture
┌─────────────────────────────────────────────────────────────────┐
│ 远程 GPU (rGPU) 架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 客户端节点 GPU 服务器节点 │
│ ┌────────────────────┐ ┌────────────────────┐ │
│ │ │ │ │ │
│ │ ┌──────────────┐ │ │ ┌──────────────┐ │ │
│ │ │ Application │ │ │ │ GPU Driver │ │ │
│ │ │ (PyTorch) │ │ │ └──────┬───────┘ │ │
│ │ └──────┬───────┘ │ │ │ │ │
│ │ │ │ │ ┌──────┴───────┐ │ │
│ │ ┌──────┴───────┐ │ │ │ GPU 0-7 │ │ │
│ │ │ CUDA API │ │ │ │ ┌───┐ ┌───┐ │ │ │
│ │ └──────┬───────┘ │ │ │ │ G │ │ G │ │ │ │
│ │ │ │ │ │ │ P │ │ P │ │ │ │
│ │ ┌──────┴───────┐ │ │ │ │ U │ │ U │ │ │ │
│ │ │ rGPU Client │ │ RDMA/ │ │ └───┘ └───┘ │ │ │
│ │ │ ┌─────────┐ │ │ TCP │ │ │ │ │
│ │ │ │ API Hook│ │◄─┼────────────►┼─►│ ┌───┐ ┌───┐ │ │ │
│ │ │ ├─────────┤ │ │ │ │ │ G │ │ G │ │ │ │
│ │ │ │Serialize│ │ │ │ │ │ P │ │ P │ │ │ │
│ │ │ ├─────────┤ │ │ │ │ │ U │ │ U │ │ │ │
│ │ │ │ Network │ │ │ │ │ └───┘ └───┘ │ │ │
│ │ │ └─────────┘ │ │ │ │ │ │ │
│ │ └──────────────┘ │ │ └──────────────┘ │ │
│ │ │ │ │ │
│ │ 本地无 GPU │ │ ┌──────────────┐ │ │
│ │ │ │ │ rGPU Server │ │ │
│ │ │ │ │ ┌─────────┐ │ │ │
│ │ │ │ │ │ Handler │ │ │ │
│ │ │ │ │ ├─────────┤ │ │ │
│ │ │ │ │ │Execution│ │ │ │
│ │ │ │ │ ├─────────┤ │ │ │
│ │ │ │ │ │ Result │ │ │ │
│ │ │ │ │ └─────────┘ │ │ │
│ │ │ │ └──────────────┘ │ │
│ │ │ │ │ │
│ └────────────────────┘ └────────────────────┘ │
│ │
│ 数据流: │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. 应用调用 CUDA API │ │
│ │ 2. rGPU Client 拦截调用 │ │
│ │ 3. 序列化参数并通过网络发送 │ │
│ │ 4. rGPU Server 接收并在本地 GPU 执行 │ │
│ │ 5. 返回结果到客户端 │ │
│ │ │ │
│ │ 关键优化: │ │
│ │ • 批量 RPC 调用减少网络开销 │ │
│ │ • RDMA 直接内存访问减少 CPU 拷贝 │ │
│ │ • 智能缓存减少重复数据传输 │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
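The batching optimization called out in the diagram can be sketched independently of the framework that follows. BatchedClient below is a hypothetical wrapper, not part of the rGPU code in this article: it queues small calls and flushes them as one network round trip to amortize latency.

from typing import Any, Callable, List, Tuple

class BatchedClient:
    """Buffers small RPC calls and flushes them as one round trip."""
    def __init__(self, send_batch: Callable[[List[Tuple[str, tuple]]], List[Any]]):
        self._send_batch = send_batch  # transport: [(method, args)] -> results
        self._pending: List[Tuple[str, tuple]] = []
    def call(self, method: str, *args) -> int:
        """Queue a call; returns its position in the next flush."""
        self._pending.append((method, args))
        return len(self._pending) - 1
    def flush(self) -> List[Any]:
        """Send everything queued in a single round trip."""
        batch, self._pending = self._pending, []
        return self._send_batch(batch) if batch else []

# One round trip for three queued operations (echo transport for illustration)
demo = BatchedClient(lambda batch: [f"ok:{method}" for method, _ in batch])
demo.call("cuda_malloc", 1 << 20)
demo.call("tensor_create", (1024, 1024))
demo.call("tensor_op", "relu")
print(demo.flush())  # ['ok:cuda_malloc', 'ok:tensor_create', 'ok:tensor_op']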
rGPU Implementation Framework
"""
Remote GPU access framework (simplified implementation).
"""
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
import pickle
import socket
import threading
import struct
import logging
from enum import Enum
class RPCMethod(Enum):
    """RPC methods."""
    CUDA_MALLOC = "cuda_malloc"
    CUDA_FREE = "cuda_free"
    CUDA_MEMCPY = "cuda_memcpy"
    CUDA_LAUNCH_KERNEL = "cuda_launch_kernel"
    CUDA_SYNC = "cuda_sync"
    TENSOR_CREATE = "tensor_create"
    TENSOR_OP = "tensor_op"
    MODEL_FORWARD = "model_forward"
@dataclass
class RPCRequest:
    """RPC request."""
    id: int
    method: RPCMethod
    args: tuple
    kwargs: dict
@dataclass
class RPCResponse:
    """RPC response."""
    id: int
    success: bool
    result: Any
    error: Optional[str] = None
class rGPUServer:
    """Remote GPU server."""
    def __init__(self, host: str = "0.0.0.0", port: int = 50051):
        self.host = host
        self.port = port
        self.socket = None
        self._running = False
        # GPU-side state
        self.device_handles: Dict[int, Any] = {}
        self.tensor_cache: Dict[int, Any] = {}
        self.model_cache: Dict[str, Any] = {}
        self.logger = logging.getLogger(__name__)
        # Method dispatch table
        self.handlers: Dict[RPCMethod, Callable] = {
            RPCMethod.CUDA_MALLOC: self._handle_malloc,
            RPCMethod.CUDA_FREE: self._handle_free,
            RPCMethod.CUDA_MEMCPY: self._handle_memcpy,
            RPCMethod.TENSOR_CREATE: self._handle_tensor_create,
            RPCMethod.TENSOR_OP: self._handle_tensor_op,
            RPCMethod.MODEL_FORWARD: self._handle_model_forward,
        }
    def start(self):
        """Start serving."""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind((self.host, self.port))
        self.socket.listen(10)
        self._running = True
        self.logger.info(f"rGPU Server listening on {self.host}:{self.port}")
        while self._running:
            try:
                client_socket, address = self.socket.accept()
                self.logger.info(f"New connection from {address}")
                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket,)
                )
                client_thread.daemon = True
                client_thread.start()
            except Exception as e:
                self.logger.error(f"Accept error: {e}")
    def stop(self):
        """Stop serving."""
        self._running = False
        if self.socket:
            self.socket.close()
    def _handle_client(self, client_socket: socket.socket):
        """Serve one client connection."""
        try:
            while self._running:
                # Read the 4-byte length prefix (recv may return short reads)
                length_data = self._recv_exactly(client_socket, 4)
                if not length_data:
                    break
                length = struct.unpack("!I", length_data)[0]
                # Read the request payload
                data = self._recv_exactly(client_socket, length)
                if data is None:
                    break
                # Deserialize the request; pickle is only acceptable on a
                # trusted network (it executes arbitrary code on load)
                request: RPCRequest = pickle.loads(data)
                # Dispatch it
                response = self._process_request(request)
                # Send the length-prefixed response
                response_data = pickle.dumps(response)
                client_socket.sendall(struct.pack("!I", len(response_data)))
                client_socket.sendall(response_data)
        except Exception as e:
            self.logger.error(f"Client handler error: {e}")
        finally:
            client_socket.close()
    def _recv_exactly(self, sock: socket.socket, n: int) -> Optional[bytes]:
        """Receive exactly n bytes, or None if the peer closed the socket."""
        data = b""
        while len(data) < n:
            chunk = sock.recv(n - len(data))
            if not chunk:
                return None
            data += chunk
        return data
    def _process_request(self, request: RPCRequest) -> RPCResponse:
        """Dispatch a request to its handler."""
        try:
            handler = self.handlers.get(request.method)
            if not handler:
                return RPCResponse(
                    id=request.id,
                    success=False,
                    result=None,
                    error=f"Unknown method: {request.method}"
                )
            result = handler(*request.args, **request.kwargs)
            return RPCResponse(
                id=request.id,
                success=True,
                result=result
            )
        except Exception as e:
            return RPCResponse(
                id=request.id,
                success=False,
                result=None,
                error=str(e)
            )
    def _handle_malloc(self, size: int) -> int:
        """Allocate device memory."""
        import torch
        tensor = torch.empty(size, dtype=torch.uint8, device='cuda')
        handle_id = id(tensor)
        self.device_handles[handle_id] = tensor
        return handle_id
    def _handle_free(self, handle_id: int):
        """Free device memory."""
        if handle_id in self.device_handles:
            del self.device_handles[handle_id]
    def _handle_memcpy(
        self,
        src_data: bytes,
        dst_handle: int,
        direction: str
    ) -> Optional[bytes]:
        """Copy memory between host and device."""
        import torch
        if direction == "host_to_device":
            # Host to device; bytearray gives torch.frombuffer a writable buffer
            tensor = self.device_handles.get(dst_handle)
            if tensor is not None:
                tensor.copy_(torch.frombuffer(bytearray(src_data), dtype=torch.uint8))
            return None
        else:
            # Device to host
            tensor = self.device_handles.get(dst_handle)
            if tensor is not None:
                return tensor.cpu().numpy().tobytes()
            return None
    def _handle_tensor_create(
        self,
        shape: tuple,
        dtype: str,
        data: Optional[bytes] = None
    ) -> int:
        """Create a tensor on the server-side GPU."""
        import torch
        import numpy as np
        dtype_map = {
            "float32": (torch.float32, np.float32),
            "float16": (torch.float16, np.float16),
            "int64": (torch.int64, np.int64),
            "int32": (torch.int32, np.int32),
        }
        torch_dtype, np_dtype = dtype_map.get(dtype, (torch.float32, np.float32))
        if data:
            # Interpret the raw bytes with the requested dtype, not a fixed float32
            np_array = np.frombuffer(data, dtype=np_dtype).reshape(shape)
            tensor = torch.tensor(np_array, dtype=torch_dtype, device='cuda')
        else:
            tensor = torch.empty(shape, dtype=torch_dtype, device='cuda')
        handle_id = id(tensor)
        self.tensor_cache[handle_id] = tensor
        return handle_id
    def _handle_tensor_op(
        self,
        op_name: str,
        tensor_ids: List[int],
        *args,
        **kwargs
    ) -> int:
        """Execute a tensor operation by name."""
        import torch
        tensors = [self.tensor_cache[tid] for tid in tensor_ids]
        # Resolve the operation: a torch function or a tensor method
        if hasattr(torch, op_name):
            op_func = getattr(torch, op_name)
            result = op_func(*tensors, *args, **kwargs)
        elif len(tensors) > 0 and hasattr(tensors[0], op_name):
            method = getattr(tensors[0], op_name)
            result = method(*tensors[1:], *args, **kwargs)
        else:
            raise ValueError(f"Unknown operation: {op_name}")
        if isinstance(result, torch.Tensor):
            handle_id = id(result)
            self.tensor_cache[handle_id] = result
            return handle_id
        else:
            return result
    def _handle_model_forward(
        self,
        model_name: str,
        input_tensor_ids: List[int]
    ) -> List[int]:
        """Run a cached model's forward pass."""
        import torch  # each handler imports torch lazily
        model = self.model_cache.get(model_name)
        if not model:
            raise ValueError(f"Model {model_name} not found")
        inputs = [self.tensor_cache[tid] for tid in input_tensor_ids]
        with torch.no_grad():
            outputs = model(*inputs)
        if isinstance(outputs, (list, tuple)):
            result_ids = []
            for out in outputs:
                if isinstance(out, torch.Tensor):
                    handle_id = id(out)
                    self.tensor_cache[handle_id] = out
                    result_ids.append(handle_id)
            return result_ids
        elif isinstance(outputs, torch.Tensor):
            handle_id = id(outputs)
            self.tensor_cache[handle_id] = outputs
            return [handle_id]
        else:
            return []
class rGPUClient:
    """Remote GPU client."""
    def __init__(self, host: str, port: int = 50051):
        self.host = host
        self.port = port
        self.socket = None
        self._request_id = 0
        self._lock = threading.Lock()
    def connect(self):
        """Connect to the server."""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.host, self.port))
    def disconnect(self):
        """Close the connection."""
        if self.socket:
            self.socket.close()
    def _call(
        self,
        method: RPCMethod,
        *args,
        **kwargs
    ) -> Any:
        """Issue a blocking RPC call."""
        with self._lock:
            self._request_id += 1
            request = RPCRequest(
                id=self._request_id,
                method=method,
                args=args,
                kwargs=kwargs
            )
            # Send the length-prefixed request
            data = pickle.dumps(request)
            self.socket.sendall(struct.pack("!I", len(data)))
            self.socket.sendall(data)
            # Receive the length-prefixed response
            length_data = self.socket.recv(4)
            length = struct.unpack("!I", length_data)[0]
            response_data = b""
            while len(response_data) < length:
                chunk = self.socket.recv(min(length - len(response_data), 4096))
                if not chunk:
                    raise ConnectionError("Server closed the connection")
                response_data += chunk
            response: RPCResponse = pickle.loads(response_data)
            if not response.success:
                raise RuntimeError(response.error)
            return response.result
    def create_tensor(
        self,
        shape: tuple,
        dtype: str = "float32",
        data: Optional[bytes] = None
    ) -> int:
        """Create a tensor on the remote GPU."""
        return self._call(RPCMethod.TENSOR_CREATE, shape, dtype, data)
    def tensor_op(
        self,
        op_name: str,
        tensor_ids: List[int],
        *args,
        **kwargs
    ) -> int:
        """Run a tensor operation remotely."""
        return self._call(RPCMethod.TENSOR_OP, op_name, tensor_ids, *args, **kwargs)
    def model_forward(
        self,
        model_name: str,
        input_tensor_ids: List[int]
    ) -> List[int]:
        """Run a remote model forward pass."""
        return self._call(RPCMethod.MODEL_FORWARD, model_name, input_tensor_ids)
# Higher-level abstraction: remote tensors
class RemoteTensor:
    """Wrapper that makes remote tensor handles feel like local tensors."""
    def __init__(self, client: rGPUClient, handle_id: int, shape: tuple, dtype: str):
        self.client = client
        self.handle_id = handle_id
        self.shape = shape
        self.dtype = dtype
    def __add__(self, other: 'RemoteTensor') -> 'RemoteTensor':
        result_id = self.client.tensor_op("add", [self.handle_id, other.handle_id])
        return RemoteTensor(self.client, result_id, self.shape, self.dtype)
    def __mul__(self, other: 'RemoteTensor') -> 'RemoteTensor':
        result_id = self.client.tensor_op("mul", [self.handle_id, other.handle_id])
        return RemoteTensor(self.client, result_id, self.shape, self.dtype)
    def matmul(self, other: 'RemoteTensor') -> 'RemoteTensor':
        result_id = self.client.tensor_op("matmul", [self.handle_id, other.handle_id])
        # Shape of a 2-D matmul result
        new_shape = (self.shape[0], other.shape[1])
        return RemoteTensor(self.client, result_id, new_shape, self.dtype)
# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Server side (run on the GPU node)
    # server = rGPUServer(port=50051)
    # server.start()
    # Client side (run on a CPU-only node)
    # client = rGPUClient("gpu-server-address", 50051)
    # client.connect()
    # tensor_id = client.create_tensor((1024, 1024), "float32")
    # result_id = client.tensor_op("relu", [tensor_id])
    # client.disconnect()
    print("rGPU framework initialized")
Kubernetes Integration
Compute Pool Operator
# Compute pool CRD definition
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: computepools.ai.example.com
spec:
group: ai.example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
poolType:
type: string
enum: ["gpu", "npu", "mixed"]
capacity:
type: object
properties:
total:
type: integer
reserved:
type: integer
virtualization:
type: object
properties:
enabled:
type: boolean
strategy:
type: string
enum: ["mig", "time-slicing", "vgpu"]
quotas:
type: array
items:
type: object
properties:
tenant:
type: string
limit:
type: integer
guaranteed:
type: integer
status:
type: object
properties:
phase:
type: string
totalCapacity:
type: integer
allocatedCapacity:
type: integer
availableCapacity:
type: integer
scope: Cluster
names:
plural: computepools
singular: computepool
kind: ComputePool
shortNames:
- cp
---
# Example ComputePool
apiVersion: ai.example.com/v1
kind: ComputePool
metadata:
name: training-pool
spec:
poolType: gpu
capacity:
total: 64
reserved: 8
virtualization:
enabled: true
strategy: mig
quotas:
- tenant: team-a
limit: 32
guaranteed: 16
- tenant: team-b
limit: 24
guaranteed: 8
- tenant: team-c
limit: 16
guaranteed: 0
---
# Elastic job CRD
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: elasticjobs.ai.example.com
spec:
group: ai.example.com
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
pool:
type: string
priority:
type: integer
preemptible:
type: boolean
scaling:
type: object
properties:
minReplicas:
type: integer
maxReplicas:
type: integer
gpuPerReplica:
type: integer
template:
type: object
x-kubernetes-preserve-unknown-fields: true
scope: Namespaced
names:
plural: elasticjobs
singular: elasticjob
kind: ElasticJob
shortNames:
- ej
---
# Example ElasticJob
apiVersion: ai.example.com/v1
kind: ElasticJob
metadata:
name: llm-training
namespace: ml-workloads
spec:
pool: training-pool
priority: 100
preemptible: false
scaling:
minReplicas: 4
maxReplicas: 16
gpuPerReplica: 8
template:
spec:
containers:
- name: trainer
image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
command: ["torchrun"]
args:
- "--nproc_per_node=8"
- "--nnodes=$(WORLD_SIZE)"
- "--node_rank=$(RANK)"
- "--master_addr=$(MASTER_ADDR)"
- "--master_port=29500"
- "train.py"
resources:
limits:
nvidia.com/gpu: 8
memory: "512Gi"
env:
- name: NCCL_DEBUG
value: "INFO"
---
# Scheduler queue configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: scheduler-queues
namespace: kube-system
data:
queues.yaml: |
queues:
- name: high-priority
weight: 100
maxCapacity: 48
guaranteedCapacity: 32
preemptionPolicy: PreemptLowerPriority
- name: normal
weight: 50
maxCapacity: 64
guaranteedCapacity: 16
preemptionPolicy: PreemptLowerPriority
- name: preemptible
weight: 10
maxCapacity: 64
guaranteedCapacity: 0
preemptionPolicy: Never
fairShare:
enabled: true
preemptionPolicy: PreemptLowerPriority
---
# Volcano queue configuration
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
name: training-queue
spec:
weight: 1
capability:
nvidia.com/gpu: 32
reclaimable: true
state: Open
affinity:
nodeGroupAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- "gpu-training"
nodeGroupAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- "inference-only"
Monitoring and Billing
Resource Usage Monitoring
"""
Monitoring and billing for pooled compute.
"""
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict
import threading
import time
@dataclass
class UsageRecord:
    """A single usage record."""
    tenant_id: str
    resource_type: str
    resource_id: str
    start_time: datetime
    end_time: Optional[datetime] = None
    quantity: float = 0
    cost: float = 0
@dataclass
class BillingConfig:
    """Billing configuration."""
    resource_type: str
    price_per_hour: float
    minimum_billing_period: int = 60  # seconds
    discount_rules: Dict[int, float] = field(default_factory=dict)  # hours threshold -> discount factor
class UsageTracker:
    """Usage tracker."""
    def __init__(self):
        self.active_sessions: Dict[str, UsageRecord] = {}
        self.history: List[UsageRecord] = []
        self._lock = threading.Lock()
    def start_session(
        self,
        session_id: str,
        tenant_id: str,
        resource_type: str,
        resource_id: str,
        quantity: float
    ):
        """Open a usage session."""
        with self._lock:
            record = UsageRecord(
                tenant_id=tenant_id,
                resource_type=resource_type,
                resource_id=resource_id,
                start_time=datetime.now(),
                quantity=quantity
            )
            self.active_sessions[session_id] = record
    def end_session(self, session_id: str) -> Optional[UsageRecord]:
        """Close a usage session."""
        with self._lock:
            record = self.active_sessions.pop(session_id, None)
            if record:
                record.end_time = datetime.now()
                self.history.append(record)
            return record
    def get_current_usage(self, tenant_id: Optional[str] = None) -> Dict[str, float]:
        """Current usage, optionally filtered by tenant."""
        with self._lock:
            usage = defaultdict(float)
            for record in self.active_sessions.values():
                if tenant_id and record.tenant_id != tenant_id:
                    continue
                key = f"{record.resource_type}:{record.tenant_id}"
                usage[key] += record.quantity
            return dict(usage)
    def get_usage_summary(
        self,
        tenant_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> Dict:
        """Summarize a tenant's usage over a period."""
        with self._lock:
            summary = {
                "tenant_id": tenant_id,
                "period_start": start_time.isoformat(),
                "period_end": end_time.isoformat(),
                "resources": {}
            }
            for record in self.history:
                if record.tenant_id != tenant_id:
                    continue
                if record.start_time < start_time or record.start_time > end_time:
                    continue
                resource_key = record.resource_type
                if resource_key not in summary["resources"]:
                    summary["resources"][resource_key] = {
                        "total_hours": 0,
                        "total_cost": 0,
                        "session_count": 0
                    }
                # Session duration in hours
                duration = (record.end_time - record.start_time).total_seconds() / 3600
                summary["resources"][resource_key]["total_hours"] += duration * record.quantity
                summary["resources"][resource_key]["total_cost"] += record.cost
                summary["resources"][resource_key]["session_count"] += 1
            return summary
class BillingSystem:
    """Billing system."""
    def __init__(self, usage_tracker: UsageTracker):
        self.usage_tracker = usage_tracker
        self.billing_configs: Dict[str, BillingConfig] = {}
        self.tenant_balances: Dict[str, float] = defaultdict(float)
        self.invoices: List[Dict] = []
    def configure_pricing(
        self,
        resource_type: str,
        price_per_hour: float,
        discount_rules: Dict[int, float] = None
    ):
        """Configure pricing rules."""
        self.billing_configs[resource_type] = BillingConfig(
            resource_type=resource_type,
            price_per_hour=price_per_hour,
            discount_rules=discount_rules or {}
        )
    def calculate_cost(
        self,
        resource_type: str,
        duration_seconds: float,
        quantity: float
    ) -> float:
        """Compute the cost of a usage span."""
        config = self.billing_configs.get(resource_type)
        if not config:
            return 0
        # Enforce the minimum billing period
        billable_seconds = max(duration_seconds, config.minimum_billing_period)
        hours = billable_seconds / 3600
        base_cost = hours * quantity * config.price_per_hour
        # Apply volume discounts (largest qualifying tier wins)
        total_hours = hours * quantity
        discount = 1.0
        for threshold, disc in sorted(config.discount_rules.items(), reverse=True):
            if total_hours >= threshold:
                discount = disc
                break
        return base_cost * discount
    def charge_session(self, session_id: str) -> Optional[float]:
        """Settle a session and debit the tenant."""
        record = self.usage_tracker.end_session(session_id)
        if not record:
            return None
        duration = (record.end_time - record.start_time).total_seconds()
        cost = self.calculate_cost(
            record.resource_type,
            duration,
            record.quantity
        )
        record.cost = cost
        self.tenant_balances[record.tenant_id] -= cost
        return cost
    def add_credit(self, tenant_id: str, amount: float):
        """Add credit to a tenant's balance."""
        self.tenant_balances[tenant_id] += amount
    def get_balance(self, tenant_id: str) -> float:
        """Get a tenant's balance."""
        return self.tenant_balances[tenant_id]
    def generate_invoice(
        self,
        tenant_id: str,
        start_time: datetime,
        end_time: datetime
    ) -> Dict:
        """Generate an invoice for a billing period."""
        summary = self.usage_tracker.get_usage_summary(
            tenant_id, start_time, end_time
        )
        invoice = {
            "invoice_id": f"INV-{int(time.time())}",
            "tenant_id": tenant_id,
            "period": {
                "start": start_time.isoformat(),
                "end": end_time.isoformat()
            },
            "line_items": [],
            "total": 0,
            "generated_at": datetime.now().isoformat()
        }
        for resource_type, usage in summary.get("resources", {}).items():
            config = self.billing_configs.get(resource_type)
            price = config.price_per_hour if config else 0
            item = {
                "resource_type": resource_type,
                "hours": usage["total_hours"],
                "unit_price": price,
                "subtotal": usage["total_cost"],
                "sessions": usage["session_count"]
            }
            invoice["line_items"].append(item)
            invoice["total"] += usage["total_cost"]
        self.invoices.append(invoice)
        return invoice
class ResourceMonitor:
    """Resource usage monitor."""
    def __init__(self, usage_tracker: UsageTracker):
        self.usage_tracker = usage_tracker
        self.metrics: Dict[str, List[Dict]] = defaultdict(list)
        self._running = False
        self._thread: Optional[threading.Thread] = None
    def start(self, interval: int = 60):
        """Start collecting."""
        self._running = True
        self._thread = threading.Thread(target=self._collect_loop, args=(interval,))
        self._thread.daemon = True
        self._thread.start()
    def stop(self):
        """Stop collecting."""
        self._running = False
    def _collect_loop(self, interval: int):
        """Collection loop."""
        while self._running:
            self._collect_metrics()
            time.sleep(interval)
    def _collect_metrics(self):
        """Collect one sample per resource/tenant key."""
        timestamp = datetime.now()
        current_usage = self.usage_tracker.get_current_usage()
        for key, value in current_usage.items():
            self.metrics[key].append({
                "timestamp": timestamp.isoformat(),
                "value": value
            })
            # Keep only the last 24 hours
            cutoff = timestamp - timedelta(hours=24)
            self.metrics[key] = [
                m for m in self.metrics[key]
                if datetime.fromisoformat(m["timestamp"]) > cutoff
            ]
    def get_metrics(
        self,
        resource_key: str,
        start_time: datetime = None,
        end_time: datetime = None
    ) -> List[Dict]:
        """Fetch samples, optionally within a time window."""
        metrics = self.metrics.get(resource_key, [])
        if start_time or end_time:
            filtered = []
            for m in metrics:
                ts = datetime.fromisoformat(m["timestamp"])
                if start_time and ts < start_time:
                    continue
                if end_time and ts > end_time:
                    continue
                filtered.append(m)
            return filtered
        return metrics
    def export_prometheus_metrics(self) -> str:
        """Export current usage in the Prometheus text format."""
        lines = []
        lines.append("# HELP compute_pool_usage Current resource usage")
        lines.append("# TYPE compute_pool_usage gauge")
        current_usage = self.usage_tracker.get_current_usage()
        for key, value in current_usage.items():
            # Split on the first colon only, so resource types such as
            # "nvidia.com/gpu" stay intact
            resource_type, tenant_id = key.split(":", 1)
            lines.append(
                f'compute_pool_usage{{resource_type="{resource_type}",'
                f'tenant_id="{tenant_id}"}} {value}'
            )
        return '\n'.join(lines)
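To make the exporter scrapeable, a minimal standard-library HTTP endpoint is enough; serve_metrics and its port are illustrative additions under the assumption that a ResourceMonitor instance already exists, not part of the system above:

from http.server import BaseHTTPRequestHandler, HTTPServer

def serve_metrics(monitor: "ResourceMonitor", port: int = 9100):
    """Expose monitor.export_prometheus_metrics() at /metrics."""
    class MetricsHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            if self.path != "/metrics":
                self.send_response(404)
                self.end_headers()
                return
            body = monitor.export_prometheus_metrics().encode()
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4")
            self.end_headers()
            self.wfile.write(body)
    # Blocking call; run in a thread if the process does other work
    HTTPServer(("0.0.0.0", port), MetricsHandler).serve_forever()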
# Usage example
if __name__ == "__main__":
    # Usage tracker
    tracker = UsageTracker()
    # Billing system
    billing = BillingSystem(tracker)
    # Pricing
    billing.configure_pricing(
        resource_type="nvidia.com/gpu",
        price_per_hour=2.0,
        discount_rules={
            100: 0.9,   # 10% off above 100 GPU-hours
            500: 0.8,   # 20% off above 500 GPU-hours
            1000: 0.7   # 30% off above 1000 GPU-hours
        }
    )
    # Add credit
    billing.add_credit("tenant-a", 1000.0)
    # Open a session
    tracker.start_session(
        session_id="session-1",
        tenant_id="tenant-a",
        resource_type="nvidia.com/gpu",
        resource_id="gpu-0",
        quantity=4
    )
    # Simulate some usage
    time.sleep(2)
    # Settle
    cost = billing.charge_session("session-1")
    print(f"Session cost: ${cost:.2f}")
    print(f"Remaining balance: ${billing.get_balance('tenant-a'):.2f}")
    # Invoice
    invoice = billing.generate_invoice(
        tenant_id="tenant-a",
        start_time=datetime.now() - timedelta(days=1),
        end_time=datetime.now()
    )
    print(f"\nInvoice: {invoice}")
Best Practices
Compute Pooling Deployment Checklist
# Compute pooling best practices
deployment_checklist:
  # Resource planning
  resource_planning:
    - name: "Capacity planning"
      considerations:
        - Reserve 10-20% headroom for elastic scaling
        - Separate training pools from inference pools
        - Plan for peak load
    - name: "Quota policy"
      recommendations:
        - Set quotas per team
        - Guaranteed quota plus elastic burst quota
        - Review usage regularly
  # Scheduling
  scheduling:
    - name: "Queue design"
      patterns:
        - High-priority queue (production jobs)
        - Normal queue (development training)
        - Preemptible queue (ad-hoc jobs)
    - name: "Elastic scaling"
      policies:
        - Queue-depth based (see the sketch after this checklist)
        - Utilization based
        - Time-window based
  # Virtualization
  virtualization:
    - name: "MIG strategy"
      configs:
        training: "3g.40gb"   # large instances
        inference: "1g.10gb"  # small instances
        mixed: "balanced"
    - name: "Time slicing"
      recommendations:
        - "Inference services: replicas=4"
        - "Dev/test: replicas=2"
  # Monitoring and alerting
  monitoring:
    metrics:
      - "Resource utilization"
      - "Queue wait time"
      - "Job completion rate"
      - "Preemption count"
    alerts:
      - name: "Low utilization"
        condition: "utilization < 50%"
        action: "Review the scheduling policy"
      - name: "Long queue"
        condition: "queue_wait_time > 1h"
        action: "Scale out or adjust quotas"
---
# Reference architectures
reference_architecture:
  # Small (< 100 GPUs)
  small:
    pools: 1
    queues: 2
    scheduler: "Native Kubernetes"
    virtualization: "Time slicing"
  # Medium (100-500 GPUs)
  medium:
    pools: 2-3
    queues: 4-5
    scheduler: "Volcano"
    virtualization: "MIG + time slicing"
  # Large (> 500 GPUs)
  large:
    pools: "Per cluster/region"
    queues: "Hierarchical queues"
    scheduler: "Custom + Volcano"
    virtualization: "MIG"
    additional:
      - "Cross-cluster federation"
      - "Multi-level caching"
      - "Intelligent scheduling"
Summary
Compute pooling and elastic scheduling are key to building efficient AI infrastructure:
- Resource pooling: unify the management of scattered resources to raise utilization
- Elastic scheduling: allocate dynamically with load to make better use of resources
- Multi-tenant isolation: quota management, fair sharing, and priority preemption
- Monitoring and billing: usage tracking, cost allocation, and invoice generation
With sound pooling strategies and scheduling algorithms, resource efficiency can be maximized without sacrificing service quality.