GPU Scheduler Implementation

Overview

In Kubernetes, scheduling GPU resources is far more complex than scheduling CPU and memory. Although the default scheduler supports Extended Resources, it cannot see GPU topology, NVLink interconnects, MIG partitions, or other device characteristics. This chapter dissects the Kubernetes scheduler architecture and then implements a GPU-aware custom scheduler.

1. Kubernetes Scheduler Architecture

1.1 Core Scheduling Flow

┌─────────────────────────────────────────────────────────────────────┐
│                    Kubernetes Scheduler Architecture                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌──────────────┐    ┌───────────────────────────────────────────┐  │
│  │   Informer   │───▶│              Scheduling Queue             │  │
│  │   (Watch)    │    │  ┌─────────┐ ┌─────────┐ ┌─────────────┐  │  │
│  └──────────────┘    │  │ Active  │ │ Backoff │ │ Unschedulable│  │  │
│                      │  │  Queue  │ │  Queue  │ │    Queue     │  │  │
│                      │  └────┬────┘ └────┬────┘ └──────────────┘  │  │
│                      └───────┼───────────┼───────────────────────┘  │
│                              │           │                          │
│                              ▼           │                          │
│  ┌───────────────────────────────────────┼───────────────────────┐  │
│  │           Scheduling Cycle            │                       │  │
│  │  ┌─────────────────────────────────┐  │                       │  │
│  │  │       Scheduling Framework      │  │                       │  │
│  │  │                                 │  │                       │  │
│  │  │  PreFilter ──▶ Filter ──▶ PostFilter                       │  │
│  │  │       │                    │                               │  │
│  │  │       ▼                    ▼                               │  │
│  │  │  PreScore ──▶ Score ──▶ NormalizeScore                     │  │
│  │  │       │                    │                               │  │
│  │  │       ▼                    ▼                               │  │
│  │  │   Reserve ──▶ Permit ──▶ PreBind ──▶ Bind ──▶ PostBind     │  │
│  │  │                                                            │  │
│  │  └─────────────────────────────────┘                          │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

1.2 Scheduling Framework Extension Points

// The Scheduling Framework defines the scheduler's extension points.
// Path: kubernetes/pkg/scheduler/framework/interface.go

// PreFilterPlugin runs before filtering, to pre-process or validate Pod info.
type PreFilterPlugin interface {
    Plugin
    // PreFilter is called at the beginning of the scheduling cycle.
    // It can compute the Pod's resource needs, check preconditions, etc.
    PreFilter(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
    // PreFilterExtensions returns the extension interface.
    PreFilterExtensions() PreFilterExtensions
}

// FilterPlugin filters out nodes that cannot run the Pod.
type FilterPlugin interface {
    Plugin
    // Filter is called once per node and reports whether the node can run the Pod.
    // This is the core filtering logic.
    Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}

// PostFilterPlugin runs when every node has been filtered out.
type PostFilterPlugin interface {
    Plugin
    // PostFilter attempts preemption or similar actions to make the Pod schedulable.
    PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod,
               filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}

// PreScorePlugin performs pre-processing before scoring.
type PreScorePlugin interface {
    Plugin
    PreScore(ctx context.Context, state *CycleState, pod *v1.Pod,
             nodes []*v1.Node) *Status
}

// ScorePlugin scores the nodes that passed filtering.
type ScorePlugin interface {
    Plugin
    // Score returns the node's score, in the range [0, 100].
    Score(ctx context.Context, state *CycleState, pod *v1.Pod,
          nodeName string) (int64, *Status)
    // ScoreExtensions returns the normalization extension.
    ScoreExtensions() ScoreExtensions
}

// ReservePlugin reserves resources before binding.
type ReservePlugin interface {
    Plugin
    // Reserve reserves resources on the node.
    Reserve(ctx context.Context, state *CycleState, pod *v1.Pod,
            nodeName string) *Status
    // Unreserve rolls back the reservation (called when a later step fails).
    Unreserve(ctx context.Context, state *CycleState, pod *v1.Pod,
              nodeName string)
}

// PermitPlugin performs a final check before binding.
type PermitPlugin interface {
    Plugin
    // Permit can return:
    // - Success: bind immediately
    // - Wait: wait for the given duration
    // - Deny: reject the scheduling attempt
    Permit(ctx context.Context, state *CycleState, pod *v1.Pod,
           nodeName string) (*Status, time.Duration)
}

// BindPlugin performs the actual binding.
type BindPlugin interface {
    Plugin
    // Bind binds the Pod to the node.
    Bind(ctx context.Context, state *CycleState, pod *v1.Pod,
         nodeName string) *Status
}
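
Plugins pass data between extension points through the CycleState argument. One detail that is easy to miss: values stored via state.Write must implement the framework.StateData interface, i.e. expose a Clone method. A minimal sketch (the type and key names here are purely illustrative):

// exampleState carries per-cycle data between extension points.
// CycleState requires stored values to implement framework.StateData.
type exampleState struct {
    gpuCount int64
}

// Clone satisfies framework.StateData; the framework may copy cycle state.
func (s *exampleState) Clone() framework.StateData {
    c := *s
    return &c
}

// In PreFilter: state.Write("gpu-example", &exampleState{gpuCount: 4})
// In Filter:    data, err := state.Read("gpu-example")
//               if err == nil { s := data.(*exampleState); _ = s }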

1.3 How the Default Scheduler Handles GPU Resources

The default scheduler handles GPU resources through the NodeResourcesFit plugin:

// Path: kubernetes/pkg/scheduler/framework/plugins/noderesources/fit.go

// NodeResourcesFit checks whether a node has enough resources.
type NodeResourcesFit struct {
    // Resources to ignore
    ignoredResources      sets.String
    // Resource groups to ignore
    ignoredResourceGroups sets.String
    // Whether Pod overhead is taken into account
    enablePodOverhead     bool
    // Scoring strategy
    scoreStrategy         *config.ScoringStrategy
}

// Filter rejects nodes with insufficient resources.
func (f *NodeResourcesFit) Filter(ctx context.Context, cycleState *framework.CycleState,
    pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {

    // Compute the Pod's resource request
    podRequest := f.getResourceRequest(pod)

    // Check the node's available resources
    insufficientResources := fitsRequest(podRequest, nodeInfo,
        f.ignoredResources, f.ignoredResourceGroups)

    if len(insufficientResources) != 0 {
        // Not enough resources; report the failure reasons
        failureReasons := make([]string, 0, len(insufficientResources))
        for _, r := range insufficientResources {
            failureReasons = append(failureReasons, r.Reason)
        }
        return framework.NewStatus(framework.Unschedulable, failureReasons...)
    }
    return nil
}

// fitsRequest checks whether the node's resources satisfy the request.
func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo,
    ignoredExtendedResources, ignoredResourceGroups sets.String) []InsufficientResource {

    insufficientResources := make([]InsufficientResource, 0, 4)

    // Check CPU
    if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - nodeInfo.Requested.MilliCPU) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            ResourceName: v1.ResourceCPU,
            Reason:       "Insufficient cpu",
            Requested:    podRequest.MilliCPU,
            Used:         nodeInfo.Requested.MilliCPU,
            Capacity:     nodeInfo.Allocatable.MilliCPU,
        })
    }

    // Check memory
    if podRequest.Memory > (nodeInfo.Allocatable.Memory - nodeInfo.Requested.Memory) {
        insufficientResources = append(insufficientResources, InsufficientResource{
            ResourceName: v1.ResourceMemory,
            Reason:       "Insufficient memory",
            Requested:    podRequest.Memory,
            Used:         nodeInfo.Requested.Memory,
            Capacity:     nodeInfo.Allocatable.Memory,
        })
    }

    // Check extended resources (including GPUs)
    for rName, rQuant := range podRequest.ScalarResources {
        // Skip ignored resources
        if ignoredExtendedResources.Has(string(rName)) {
            continue
        }

        // Skip resources whose group is ignored
        if ignoredResourceGroups.Len() > 0 {
            group := strings.Split(string(rName), "/")[0]
            if ignoredResourceGroups.Has(group) {
                continue
            }
        }

        // The key part: check extended resources such as GPUs
        if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] -
                     nodeInfo.Requested.ScalarResources[rName]) {
            insufficientResources = append(insufficientResources, InsufficientResource{
                ResourceName: rName,
                Reason:       fmt.Sprintf("Insufficient %v", rName),
                Requested:    rQuant,
                Used:         nodeInfo.Requested.ScalarResources[rName],
                Capacity:     nodeInfo.Allocatable.ScalarResources[rName],
            })
        }
    }

    return insufficientResources
}

Limitations of the default scheduler:

  1. Count-only matching: it checks GPU quantity but ignores GPU model
  2. No topology awareness: NVLink and PCIe topology are not considered
  3. No fragmentation control: scheduling decisions can fragment GPU resources (see the example below)
  4. No MIG awareness: MIG instances cannot be allocated intelligently
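
A concrete illustration of the fragmentation problem: suppose two nodes each have 8 free GPUs, and jobs requesting 4, 4, and 8 GPUs arrive in that order. A count-only scheduler may place the two 4-GPU jobs on different nodes. Eight GPUs remain free in total, but no single node has 8, so the 8-GPU job cannot be scheduled; packing both 4-GPU jobs onto one node would have kept an intact 8-GPU node available.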

2. Designing a GPU-Aware Scheduler

2.1 Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                     GPU-Aware Scheduler Architecture                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌────────────────────────────────────────────────────────────────────┐ │
│  │                         Scheduler Plugins                          │ │
│  │                                                                    │ │
│  │  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐ │ │
│  │  │  GPUPreFilter    │  │   GPUFilter      │  │   GPUScore       │ │ │
│  │  │                  │  │                  │  │                  │ │ │
│  │  │ - Parse GPU req  │  │ - Model matching │  │ - Topology score │ │ │
│  │  │ - Check annots   │  │ - Topo constraint│  │ - Fragment score │ │ │
│  │  │ - Prep state     │  │ - MIG partitions │  │ - Util score     │ │ │
│  │  └──────────────────┘  └──────────────────┘  └──────────────────┘ │ │
│  │                                                                    │ │
│  │  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐ │ │
│  │  │  GPUReserve      │  │   GPUBind        │  │   GPUPostBind    │ │ │
│  │  │                  │  │                  │  │                  │ │ │
│  │  │ - Reserve GPUs   │  │ - Assign GPUs    │  │ - Update metrics │ │ │
│  │  │ - Update cache   │  │ - Update status  │  │ - Audit logging  │ │ │
│  │  └──────────────────┘  └──────────────────┘  └──────────────────┘ │ │
│  └────────────────────────────────────────────────────────────────────┘ │
│                                      │                                   │
│                                      ▼                                   │
│  ┌────────────────────────────────────────────────────────────────────┐ │
│  │                         GPU State Manager                          │ │
│  │                                                                    │ │
│  │  ┌──────────────┐  ┌──────────────┐  ┌────────────────────────┐   │ │
│  │  │  Node GPU    │  │  GPU Topo    │  │     MIG Manager        │   │ │
│  │  │   Cache      │  │   Graph      │  │                        │   │ │
│  │  │              │  │              │  │  ┌──────────────────┐  │   │ │
│  │  │ GPU 0: Free  │  │  [0]──[1]    │  │  │ GPU 0: 3g.40gb   │  │   │ │
│  │  │ GPU 1: Used  │  │   │    │     │  │  │ GPU 1: 7g.80gb   │  │   │ │
│  │  │ GPU 2: Free  │  │  [2]──[3]    │  │  └──────────────────┘  │   │ │
│  │  └──────────────┘  └──────────────┘  └────────────────────────┘   │ │
│  └────────────────────────────────────────────────────────────────────┘ │
│                                      │                                   │
│                                      ▼                                   │
│  ┌────────────────────────────────────────────────────────────────────┐ │
│  │                           Data Sources                             │ │
│  │                                                                    │ │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────────┐ │ │
│  │  │   Node CRD   │  │ GPU Metrics  │  │    Device Plugin API     │ │ │
│  │  │              │  │ (Prometheus) │  │                          │ │ │
│  │  └──────────────┘  └──────────────┘  └──────────────────────────┘ │ │
│  └────────────────────────────────────────────────────────────────────┘ │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

2.2 GPU State Data Structures

package gpuscheduler

import (
    "fmt"
    "sync"
    "time"
)

// GPUDevice represents a single GPU device.
type GPUDevice struct {
    // Device UUID
    UUID string `json:"uuid"`
    // Device index
    Index int `json:"index"`
    // GPU model
    Model string `json:"model"`
    // Total memory (MB)
    MemoryTotal int64 `json:"memoryTotal"`
    // Free memory (MB)
    MemoryFree int64 `json:"memoryFree"`
    // Compute capability
    ComputeCapability string `json:"computeCapability"`
    // Whether MIG is enabled
    MIGEnabled bool `json:"migEnabled"`
    // MIG device instances
    MIGDevices []MIGDevice `json:"migDevices,omitempty"`
    // Current state
    State DeviceState `json:"state"`
    // Pod the device is allocated to
    AllocatedPod string `json:"allocatedPod,omitempty"`
    // Health status
    Health DeviceHealth `json:"health"`
    // NVLink peers
    NVLinkPeers []int `json:"nvlinkPeers,omitempty"`
    // PCIe info
    PCIeInfo PCIeInfo `json:"pcieInfo"`
}

// DeviceState is the GPU device state.
type DeviceState string

const (
    DeviceStateFree      DeviceState = "Free"
    DeviceStateAllocated DeviceState = "Allocated"
    DeviceStateReserved  DeviceState = "Reserved"
    DeviceStateFailed    DeviceState = "Failed"
)

// DeviceHealth is the GPU health status.
type DeviceHealth string

const (
    DeviceHealthy   DeviceHealth = "Healthy"
    DeviceUnhealthy DeviceHealth = "Unhealthy"
    DeviceUnknown   DeviceHealth = "Unknown"
)

// MIGDevice is a MIG device instance.
type MIGDevice struct {
    // MIG UUID
    UUID string `json:"uuid"`
    // MIG profile (e.g. "1g.5gb")
    Profile string `json:"profile"`
    // GPU instance ID
    GIIndex int `json:"giIndex"`
    // Compute instance ID
    CIIndex int `json:"ciIndex"`
    // State
    State DeviceState `json:"state"`
    // Pod the instance is allocated to
    AllocatedPod string `json:"allocatedPod,omitempty"`
}

// PCIeInfo holds PCIe topology information.
type PCIeInfo struct {
    // Bus ID
    BusID string `json:"busId"`
    // NUMA node
    NUMANode int `json:"numaNode"`
    // PCIe generation
    Generation int `json:"generation"`
    // Link width
    LinkWidth int `json:"linkWidth"`
}

// NodeGPUInfo describes a node's GPUs.
type NodeGPUInfo struct {
    // Node name
    NodeName string `json:"nodeName"`
    // GPU devices
    GPUs []GPUDevice `json:"gpus"`
    // GPU topology
    Topology *GPUTopology `json:"topology"`
    // Driver version
    DriverVersion string `json:"driverVersion"`
    // CUDA version
    CUDAVersion string `json:"cudaVersion"`
    // Last update time
    LastUpdated time.Time `json:"lastUpdated"`
}

// GPUTopology is the GPU topology of a node.
type GPUTopology struct {
    // Node name
    NodeName string `json:"nodeName"`
    // Link matrix: Links[i][j] is the link type between GPU i and GPU j
    Links [][]LinkType `json:"links"`
    // NUMA affinity: NUMANodes[i] is the NUMA node GPU i belongs to
    NUMANodes []int `json:"numaNodes"`
}

// LinkType is the type of link between two GPUs.
type LinkType string

const (
    LinkTypeNone      LinkType = "None"
    LinkTypePCIe      LinkType = "PCIe"
    LinkTypeNVLink    LinkType = "NVLink"
    LinkTypeNVSwitch  LinkType = "NVSwitch"
    LinkTypeSameBoard LinkType = "SameBoard"
)

// GPUStateManager manages cached GPU state for all nodes.
type GPUStateManager struct {
    mu sync.RWMutex
    // Per-node GPU info cache
    nodeGPUInfo map[string]*NodeGPUInfo
    // Update channel
    updateChan chan *NodeGPUInfo
    // Stop signal
    stopChan chan struct{}
}

// NewGPUStateManager creates a state manager.
func NewGPUStateManager() *GPUStateManager {
    return &GPUStateManager{
        nodeGPUInfo: make(map[string]*NodeGPUInfo),
        updateChan:  make(chan *NodeGPUInfo, 100),
        stopChan:    make(chan struct{}),
    }
}

// GetNodeGPUInfo returns a node's GPU info.
func (m *GPUStateManager) GetNodeGPUInfo(nodeName string) (*NodeGPUInfo, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    info, ok := m.nodeGPUInfo[nodeName]
    return info, ok
}

// UpdateNodeGPUInfo updates a node's GPU info.
func (m *GPUStateManager) UpdateNodeGPUInfo(info *NodeGPUInfo) {
    m.mu.Lock()
    defer m.mu.Unlock()
    info.LastUpdated = time.Now()
    m.nodeGPUInfo[info.NodeName] = info
}

// GetAvailableGPUs returns the node's free, healthy GPUs.
func (m *GPUStateManager) GetAvailableGPUs(nodeName string) []GPUDevice {
    m.mu.RLock()
    defer m.mu.RUnlock()

    info, ok := m.nodeGPUInfo[nodeName]
    if !ok {
        return nil
    }

    var available []GPUDevice
    for _, gpu := range info.GPUs {
        if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
            available = append(available, gpu)
        }
    }
    return available
}

// ReserveGPUs reserves GPU devices for a pod.
func (m *GPUStateManager) ReserveGPUs(nodeName string, gpuIndices []int, podName string) error {
    m.mu.Lock()
    defer m.mu.Unlock()

    info, ok := m.nodeGPUInfo[nodeName]
    if !ok {
        return fmt.Errorf("node %s not found", nodeName)
    }

    // Verify that every requested GPU is free
    for _, idx := range gpuIndices {
        if idx >= len(info.GPUs) {
            return fmt.Errorf("GPU index %d out of range", idx)
        }
        if info.GPUs[idx].State != DeviceStateFree {
            return fmt.Errorf("GPU %d is not free", idx)
        }
    }

    // Reserve the GPUs
    for _, idx := range gpuIndices {
        info.GPUs[idx].State = DeviceStateReserved
        info.GPUs[idx].AllocatedPod = podName
    }

    return nil
}

// UnreserveGPUs rolls back a reservation.
func (m *GPUStateManager) UnreserveGPUs(nodeName string, gpuIndices []int) {
    m.mu.Lock()
    defer m.mu.Unlock()

    info, ok := m.nodeGPUInfo[nodeName]
    if !ok {
        return
    }

    for _, idx := range gpuIndices {
        if idx < len(info.GPUs) && info.GPUs[idx].State == DeviceStateReserved {
            info.GPUs[idx].State = DeviceStateFree
            info.GPUs[idx].AllocatedPod = ""
        }
    }
}
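
The intended call sequence is Reserve followed by Permit/Bind, with Unreserve as the rollback path. A short usage sketch of the state manager (node and pod names are made up for illustration):

// Illustrative usage of GPUStateManager.
func exampleReserveFlow() {
    m := NewGPUStateManager()

    // Populate the cache (normally fed by the GPU info collector in section 5).
    m.UpdateNodeGPUInfo(&NodeGPUInfo{
        NodeName: "node-a100-01",
        GPUs: []GPUDevice{
            {Index: 0, Model: "A100-SXM4-80GB", State: DeviceStateFree, Health: DeviceHealthy},
            {Index: 1, Model: "A100-SXM4-80GB", State: DeviceStateFree, Health: DeviceHealthy},
        },
    })

    // Reserve two GPUs for a pod; this fails if any of them is not free.
    if err := m.ReserveGPUs("node-a100-01", []int{0, 1}, "ml/train-0"); err != nil {
        return // another scheduling cycle won the race
    }

    // If a later phase (Permit/Bind) fails, roll the reservation back.
    m.UnreserveGPUs("node-a100-01", []int{0, 1})
}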

3. Scheduler Plugin Implementation

3.1 The PreFilter Plugin

package gpuscheduler

import (
    "context"
    "fmt"
    "strconv"
    "strings"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
    // PluginName is the name the plugin registers under.
    PluginName = "GPUScheduler"

    // GPU resource names
    ResourceNvidiaGPU = "nvidia.com/gpu"
    ResourceMIGPrefix = "nvidia.com/mig-"

    // Annotation keys
    AnnotationGPUModel       = "gpu.scheduler/model"
    AnnotationGPUMemory      = "gpu.scheduler/memory"
    AnnotationGPUTopology    = "gpu.scheduler/topology"
    AnnotationMIGProfile     = "gpu.scheduler/mig-profile"
    AnnotationPreferNVLink   = "gpu.scheduler/prefer-nvlink"
    AnnotationPreferSameNUMA = "gpu.scheduler/prefer-same-numa"

    // Cycle-state keys
    StateKeyGPURequest = "GPURequest"
)

// GPUSchedulerPlugin is the GPU scheduling plugin.
type GPUSchedulerPlugin struct {
    handle       framework.Handle
    stateManager *GPUStateManager
}

// GPURequest describes a Pod's GPU request.
type GPURequest struct {
    // Number of GPUs requested
    Count int64
    // Requested GPU model (optional)
    Model string
    // Minimum GPU memory (MB)
    MinMemory int64
    // MIG profile (optional)
    MIGProfile string
    // Topology constraint
    TopologyConstraint string
    // Prefer NVLink-connected GPUs
    PreferNVLink bool
    // Prefer GPUs on the same NUMA node
    PreferSameNUMA bool
}

// Clone implements framework.StateData, which is required for storing a
// *GPURequest in the CycleState (see PreFilter below).
func (r *GPURequest) Clone() framework.StateData {
    c := *r
    return &c
}

// Name returns the plugin name.
func (g *GPUSchedulerPlugin) Name() string {
    return PluginName
}

// New creates a plugin instance.
func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
    return &GPUSchedulerPlugin{
        handle:       handle,
        stateManager: NewGPUStateManager(),
    }, nil
}

// PreFilter runs at the start of the scheduling cycle.
func (g *GPUSchedulerPlugin) PreFilter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {

    // Parse the GPU request
    gpuRequest, err := g.parseGPURequest(pod)
    if err != nil {
        return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable,
            fmt.Sprintf("failed to parse GPU request: %v", err))
    }

    // Nothing to do if the Pod does not request GPUs
    if gpuRequest.Count == 0 {
        return nil, framework.NewStatus(framework.Success)
    }

    // Validate the request
    if err := g.validateGPURequest(gpuRequest); err != nil {
        return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable,
            fmt.Sprintf("invalid GPU request: %v", err))
    }

    // Store the request in the scheduling cycle state
    state.Write(StateKeyGPURequest, gpuRequest)

    return nil, framework.NewStatus(framework.Success)
}

// PreFilterExtensions returns the extension interface.
func (g *GPUSchedulerPlugin) PreFilterExtensions() framework.PreFilterExtensions {
    return nil
}

// parseGPURequest parses the Pod's GPU request.
func (g *GPUSchedulerPlugin) parseGPURequest(pod *v1.Pod) (*GPURequest, error) {
    request := &GPURequest{}

    // Sum the GPU counts from the containers' resource requests
    for _, container := range pod.Spec.Containers {
        if container.Resources.Requests != nil {
            // Standard GPU resource
            if gpuQty, ok := container.Resources.Requests[ResourceNvidiaGPU]; ok {
                request.Count += gpuQty.Value()
            }

            // MIG resources
            for resName, qty := range container.Resources.Requests {
                if strings.HasPrefix(string(resName), ResourceMIGPrefix) {
                    request.Count += qty.Value()
                    // Extract the MIG profile
                    request.MIGProfile = strings.TrimPrefix(string(resName), ResourceMIGPrefix)
                }
            }
        }
    }

    // Read additional requirements from annotations
    if pod.Annotations != nil {
        if model, ok := pod.Annotations[AnnotationGPUModel]; ok {
            request.Model = model
        }

        if memory, ok := pod.Annotations[AnnotationGPUMemory]; ok {
            memMB, err := strconv.ParseInt(memory, 10, 64)
            if err != nil {
                return nil, fmt.Errorf("invalid memory annotation: %v", err)
            }
            request.MinMemory = memMB
        }

        if topology, ok := pod.Annotations[AnnotationGPUTopology]; ok {
            request.TopologyConstraint = topology
        }

        if mig, ok := pod.Annotations[AnnotationMIGProfile]; ok {
            request.MIGProfile = mig
        }

        if prefer, ok := pod.Annotations[AnnotationPreferNVLink]; ok {
            request.PreferNVLink = prefer == "true"
        }

        if prefer, ok := pod.Annotations[AnnotationPreferSameNUMA]; ok {
            request.PreferSameNUMA = prefer == "true"
        }
    }

    return request, nil
}

// validateGPURequest validates the GPU request.
func (g *GPUSchedulerPlugin) validateGPURequest(request *GPURequest) error {
    // Validate the GPU count
    if request.Count < 0 {
        return fmt.Errorf("negative GPU count: %d", request.Count)
    }

    if request.Count > 8 {
        // A single node typically has at most 8 GPUs
        return fmt.Errorf("GPU count %d exceeds maximum", request.Count)
    }

    // Validate the MIG profile
    if request.MIGProfile != "" {
        validProfiles := []string{
            "1g.5gb", "1g.10gb", "1g.20gb",
            "2g.10gb", "2g.20gb",
            "3g.20gb", "3g.40gb",
            "4g.40gb",
            "7g.40gb", "7g.80gb",
        }
        valid := false
        for _, p := range validProfiles {
            if request.MIGProfile == p {
                valid = true
                break
            }
        }
        if !valid {
            return fmt.Errorf("invalid MIG profile: %s", request.MIGProfile)
        }
    }

    // Validate the topology constraint
    if request.TopologyConstraint != "" {
        validConstraints := []string{"single", "nvlink", "same-numa", "any"}
        valid := false
        for _, c := range validConstraints {
            if request.TopologyConstraint == c {
                valid = true
                break
            }
        }
        if !valid {
            return fmt.Errorf("invalid topology constraint: %s", request.TopologyConstraint)
        }
    }

    return nil
}
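
To make the annotation contract concrete, here is a hedged sketch of how a unit test might construct a Pod and what parseGPURequest should extract from it (it assumes the apimachinery packages metav1 and resource are imported alongside the ones above):

// A pod requesting 4 NVLink-connected A100s via resources plus annotations.
pod := &v1.Pod{
    ObjectMeta: metav1.ObjectMeta{
        Name: "trainer-0",
        Annotations: map[string]string{
            AnnotationGPUModel:       "A100-SXM4-80GB",
            AnnotationGPUTopology:    "nvlink",
            AnnotationPreferSameNUMA: "true",
        },
    },
    Spec: v1.PodSpec{
        Containers: []v1.Container{{
            Name: "trainer",
            Resources: v1.ResourceRequirements{
                Requests: v1.ResourceList{
                    ResourceNvidiaGPU: resource.MustParse("4"),
                },
            },
        }},
    },
}

// parseGPURequest(pod) should yield:
//   &GPURequest{Count: 4, Model: "A100-SXM4-80GB",
//               TopologyConstraint: "nvlink", PreferSameNUMA: true}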

3.2 The Filter Plugin

// Filter rejects nodes that cannot satisfy the GPU request.
func (g *GPUSchedulerPlugin) Filter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {

    // Fetch the GPU request from the cycle state
    gpuRequestData, err := state.Read(StateKeyGPURequest)
    if err != nil {
        // No GPU request recorded; nothing to check
        return framework.NewStatus(framework.Success)
    }

    gpuRequest := gpuRequestData.(*GPURequest)
    if gpuRequest.Count == 0 {
        return framework.NewStatus(framework.Success)
    }

    nodeName := nodeInfo.Node().Name

    // Look up the node's GPU info
    nodeGPUInfo, ok := g.stateManager.GetNodeGPUInfo(nodeName)
    if !ok {
        return framework.NewStatus(framework.Unschedulable,
            fmt.Sprintf("node %s has no GPU info", nodeName))
    }

    // Check the GPU count
    availableGPUs := g.stateManager.GetAvailableGPUs(nodeName)
    if int64(len(availableGPUs)) < gpuRequest.Count {
        return framework.NewStatus(framework.Unschedulable,
            fmt.Sprintf("insufficient GPUs: need %d, available %d",
                gpuRequest.Count, len(availableGPUs)))
    }

    // Check the GPU model
    if gpuRequest.Model != "" {
        matchingGPUs := 0
        for _, gpu := range availableGPUs {
            if gpu.Model == gpuRequest.Model {
                matchingGPUs++
            }
        }
        if int64(matchingGPUs) < gpuRequest.Count {
            return framework.NewStatus(framework.Unschedulable,
                fmt.Sprintf("insufficient GPUs with model %s: need %d, available %d",
                    gpuRequest.Model, gpuRequest.Count, matchingGPUs))
        }
    }

    // Check the memory requirement
    if gpuRequest.MinMemory > 0 {
        matchingGPUs := 0
        for _, gpu := range availableGPUs {
            if gpu.MemoryTotal >= gpuRequest.MinMemory {
                matchingGPUs++
            }
        }
        if int64(matchingGPUs) < gpuRequest.Count {
            return framework.NewStatus(framework.Unschedulable,
                fmt.Sprintf("insufficient GPUs with memory >= %dMB: need %d, available %d",
                    gpuRequest.MinMemory, gpuRequest.Count, matchingGPUs))
        }
    }

    // Check MIG availability
    if gpuRequest.MIGProfile != "" {
        if !g.checkMIGAvailability(nodeGPUInfo, gpuRequest) {
            return framework.NewStatus(framework.Unschedulable,
                fmt.Sprintf("no available MIG profile %s", gpuRequest.MIGProfile))
        }
    }

    // Check the topology constraint
    if gpuRequest.TopologyConstraint != "" {
        if !g.checkTopologyConstraint(nodeGPUInfo, gpuRequest) {
            return framework.NewStatus(framework.Unschedulable,
                fmt.Sprintf("topology constraint %s cannot be satisfied",
                    gpuRequest.TopologyConstraint))
        }
    }

    return framework.NewStatus(framework.Success)
}

// checkMIGAvailability checks whether enough free MIG instances of the
// requested profile exist on the node.
func (g *GPUSchedulerPlugin) checkMIGAvailability(info *NodeGPUInfo,
    request *GPURequest) bool {

    availableCount := int64(0)

    for _, gpu := range info.GPUs {
        if !gpu.MIGEnabled {
            continue
        }
        for _, mig := range gpu.MIGDevices {
            if mig.Profile == request.MIGProfile && mig.State == DeviceStateFree {
                availableCount++
            }
        }
    }

    return availableCount >= request.Count
}

// checkTopologyConstraint checks whether the topology constraint can be met.
func (g *GPUSchedulerPlugin) checkTopologyConstraint(info *NodeGPUInfo,
    request *GPURequest) bool {

    availableGPUs := make([]int, 0)
    for i, gpu := range info.GPUs {
        if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
            availableGPUs = append(availableGPUs, i)
        }
    }

    if int64(len(availableGPUs)) < request.Count {
        return false
    }

    switch request.TopologyConstraint {
    case "single":
        // A single GPU: any node qualifies
        return request.Count == 1

    case "nvlink":
        // Requires NVLink-connected GPUs
        return g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, int(request.Count)) != nil

    case "same-numa":
        // Requires GPUs on the same NUMA node
        return g.findSameNUMAGPUs(info.Topology, availableGPUs, int(request.Count)) != nil

    case "any":
        return true

    default:
        return true
    }
}

// findNVLinkConnectedGPUs finds a group of NVLink-connected GPUs.
func (g *GPUSchedulerPlugin) findNVLinkConnectedGPUs(topo *GPUTopology,
    available []int, count int) []int {

    if topo == nil || len(topo.Links) == 0 {
        return nil
    }

    // Build the NVLink adjacency graph over the available GPUs
    nvlinkGraph := make(map[int][]int)
    for _, i := range available {
        for _, j := range available {
            if i != j && topo.Links[i][j] == LinkTypeNVLink {
                nvlinkGraph[i] = append(nvlinkGraph[i], j)
            }
        }
    }

    // DFS to find a connected component with enough GPUs
    visited := make(map[int]bool)

    var dfs func(node int, group []int) []int
    dfs = func(node int, group []int) []int {
        if len(group) >= count {
            return group
        }

        visited[node] = true
        group = append(group, node)

        for _, neighbor := range nvlinkGraph[node] {
            if !visited[neighbor] {
                if found := dfs(neighbor, group); len(found) >= count {
                    return found
                }
            }
        }

        return group
    }

    for _, gpu := range available {
        if !visited[gpu] {
            visited = make(map[int]bool)
            if found := dfs(gpu, nil); len(found) >= count {
                return found[:count]
            }
        }
    }

    return nil
}

// findSameNUMAGPUs finds GPUs that share a NUMA node.
func (g *GPUSchedulerPlugin) findSameNUMAGPUs(topo *GPUTopology,
    available []int, count int) []int {

    if topo == nil || len(topo.NUMANodes) == 0 {
        return nil
    }

    // Group the available GPUs by NUMA node
    numaGroups := make(map[int][]int)
    for _, gpu := range available {
        numa := topo.NUMANodes[gpu]
        numaGroups[numa] = append(numaGroups[numa], gpu)
    }

    // Return the first NUMA group large enough for the request
    for _, gpus := range numaGroups {
        if len(gpus) >= count {
            return gpus[:count]
        }
    }

    return nil
}
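
A small worked example helps sanity-check the two helpers. Assume a hypothetical 4-GPU node where GPUs {0, 1} and {2, 3} form NVLink pairs sitting on NUMA nodes 0 and 1 respectively:

// Hypothetical topology: {0,1} and {2,3} are NVLink pairs on NUMA 0 and 1.
topo := &GPUTopology{
    Links: [][]LinkType{
        {LinkTypeSameBoard, LinkTypeNVLink, LinkTypePCIe, LinkTypePCIe},
        {LinkTypeNVLink, LinkTypeSameBoard, LinkTypePCIe, LinkTypePCIe},
        {LinkTypePCIe, LinkTypePCIe, LinkTypeSameBoard, LinkTypeNVLink},
        {LinkTypePCIe, LinkTypePCIe, LinkTypeNVLink, LinkTypeSameBoard},
    },
    NUMANodes: []int{0, 0, 1, 1},
}
available := []int{0, 1, 2, 3}

// findNVLinkConnectedGPUs(topo, available, 2) → an NVLink pair such as [0 1]
// findNVLinkConnectedGPUs(topo, available, 3) → nil (no 3-GPU NVLink component)
// findSameNUMAGPUs(topo, available, 2)        → a NUMA-local pair such as [0 1]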

3.3 The Score Plugin

// Score scores a node for the pod.
func (g *GPUSchedulerPlugin) Score(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeName string) (int64, *framework.Status) {

    // Fetch the GPU request
    gpuRequestData, err := state.Read(StateKeyGPURequest)
    if err != nil {
        return 0, framework.NewStatus(framework.Success)
    }

    gpuRequest := gpuRequestData.(*GPURequest)
    if gpuRequest.Count == 0 {
        return 0, framework.NewStatus(framework.Success)
    }

    nodeGPUInfo, ok := g.stateManager.GetNodeGPUInfo(nodeName)
    if !ok {
        return 0, framework.NewStatus(framework.Success)
    }

    score := int64(0)

    // 1. Topology score (0-40 points)
    topoScore := g.calculateTopologyScore(nodeGPUInfo, gpuRequest)
    score += topoScore

    // 2. Fragmentation score (0-30 points)
    fragScore := g.calculateFragmentationScore(nodeGPUInfo, gpuRequest)
    score += fragScore

    // 3. Load-balancing score (0-20 points)
    balanceScore := g.calculateBalanceScore(nodeGPUInfo)
    score += balanceScore

    // 4. Model-match score (0-10 points)
    modelScore := g.calculateModelScore(nodeGPUInfo, gpuRequest)
    score += modelScore

    return score, framework.NewStatus(framework.Success)
}

// ScoreExtensions returns the score extension.
func (g *GPUSchedulerPlugin) ScoreExtensions() framework.ScoreExtensions {
    return g
}

// NormalizeScore rescales the raw scores into [0, 100].
func (g *GPUSchedulerPlugin) NormalizeScore(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {

    // Find the maximum score
    var maxScore int64 = 0
    for _, score := range scores {
        if score.Score > maxScore {
            maxScore = score.Score
        }
    }

    // Rescale
    if maxScore > 0 {
        for i := range scores {
            scores[i].Score = scores[i].Score * framework.MaxNodeScore / maxScore
        }
    }

    return framework.NewStatus(framework.Success)
}

// calculateTopologyScore computes the topology score.
func (g *GPUSchedulerPlugin) calculateTopologyScore(info *NodeGPUInfo,
    request *GPURequest) int64 {

    if info.Topology == nil || request.Count <= 1 {
        return 20 // single GPU or no topology info: medium score
    }

    availableGPUs := make([]int, 0)
    for i, gpu := range info.GPUs {
        if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
            availableGPUs = append(availableGPUs, i)
        }
    }

    score := int64(0)

    // NVLink connectivity
    if request.PreferNVLink || request.TopologyConstraint == "nvlink" {
        nvlinkGPUs := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, int(request.Count))
        if nvlinkGPUs != nil {
            score += 40 // full marks
        } else {
            score += 10 // NVLink cannot be satisfied: low score
        }
    } else {
        // Even without an explicit preference, NVLink connectivity earns extra points
        nvlinkGPUs := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, int(request.Count))
        if nvlinkGPUs != nil {
            score += 30
        } else {
            score += 20
        }
    }

    // NUMA affinity
    if request.PreferSameNUMA || request.TopologyConstraint == "same-numa" {
        sameNUMAGPUs := g.findSameNUMAGPUs(info.Topology, availableGPUs, int(request.Count))
        if sameNUMAGPUs != nil {
            score += 10 // bonus
        }
    }

    return score
}

// calculateFragmentationScore computes the fragmentation score.
// Goal: avoid leaving behind GPU fragments that are hard to reuse.
func (g *GPUSchedulerPlugin) calculateFragmentationScore(info *NodeGPUInfo,
    request *GPURequest) int64 {

    // Count the available GPUs
    availableCount := int64(0)
    for _, gpu := range info.GPUs {
        if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
            availableCount++
        }
    }

    totalGPUs := int64(len(info.GPUs))
    remaining := availableCount - request.Count

    // Strategy:
    // 1. Prefer nodes that the request fills completely (remaining = 0)
    // 2. Otherwise prefer nodes that keep plenty of GPUs free (less fragmentation)
    // 3. Avoid leaving exactly 1 GPU behind (hard to reuse)

    if remaining == 0 {
        return 30 // perfect fit
    }

    if remaining == 1 {
        return 5 // a single leftover GPU: undesirable
    }

    if remaining >= totalGPUs/2 {
        return 25 // more than half the GPUs stay free
    }

    return 15 // middling
}

// calculateBalanceScore computes the load-balancing score.
func (g *GPUSchedulerPlugin) calculateBalanceScore(info *NodeGPUInfo) int64 {
    usedCount := 0
    totalCount := len(info.GPUs)

    for _, gpu := range info.GPUs {
        if gpu.State != DeviceStateFree {
            usedCount++
        }
    }

    // Utilization ratio
    utilization := float64(usedCount) / float64(totalCount)

    // Prefer nodes with fairly high (but not full) utilization.
    // This keeps some nodes fully idle, which helps schedule large jobs later.
    if utilization >= 0.8 {
        return 10 // nearly full: deprioritize
    }
    if utilization >= 0.5 {
        return 20 // medium utilization: best
    }
    if utilization >= 0.2 {
        return 15 // low utilization
    }
    return 10 // almost idle
}

// calculateModelScore computes the GPU-model match score.
func (g *GPUSchedulerPlugin) calculateModelScore(info *NodeGPUInfo,
    request *GPURequest) int64 {

    if request.Model == "" {
        return 5 // no model requirement: base score
    }

    matchCount := 0
    for _, gpu := range info.GPUs {
        if gpu.Model == request.Model && gpu.State == DeviceStateFree {
            matchCount++
        }
    }

    if int64(matchCount) >= request.Count {
        return 10 // model requirement satisfied
    }

    return 0
}
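
Summing the sub-scores shows how the weights interact. For a 2-GPU request with prefer-nvlink and prefer-same-numa set, a node offering an NVLink-connected, NUMA-local pair that the request consumes exactly (remaining = 0) at roughly 50% utilization and with matching models scores 40 + 10 + 30 + 20 + 10 = 110 raw points. Raw totals can therefore exceed 100; NormalizeScore rescales every candidate into [0, MaxNodeScore], so only the relative ordering between nodes matters.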

3.4 The Reserve and Bind Plugins

// selectedGPUsState wraps the chosen GPU indices so they can be stored in
// the CycleState (stored values must implement framework.StateData).
type selectedGPUsState []int

// Clone implements framework.StateData.
func (s selectedGPUsState) Clone() framework.StateData { return s }

// Reserve reserves the selected GPUs for the pod.
func (g *GPUSchedulerPlugin) Reserve(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeName string) *framework.Status {

    gpuRequestData, err := state.Read(StateKeyGPURequest)
    if err != nil {
        return framework.NewStatus(framework.Success)
    }

    gpuRequest := gpuRequestData.(*GPURequest)
    if gpuRequest.Count == 0 {
        return framework.NewStatus(framework.Success)
    }

    // Pick the concrete GPUs to allocate
    selectedGPUs, err := g.selectGPUs(nodeName, gpuRequest)
    if err != nil {
        return framework.NewStatus(framework.Unschedulable,
            fmt.Sprintf("failed to select GPUs: %v", err))
    }

    // Reserve the GPUs
    podName := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
    if err := g.stateManager.ReserveGPUs(nodeName, selectedGPUs, podName); err != nil {
        return framework.NewStatus(framework.Unschedulable,
            fmt.Sprintf("failed to reserve GPUs: %v", err))
    }

    // Record the selection in the cycle state
    state.Write("SelectedGPUs", selectedGPUsState(selectedGPUs))

    return framework.NewStatus(framework.Success)
}

// Unreserve rolls back the reservation.
func (g *GPUSchedulerPlugin) Unreserve(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeName string) {

    selectedGPUsData, err := state.Read("SelectedGPUs")
    if err != nil {
        return
    }

    selectedGPUs := selectedGPUsData.(selectedGPUsState)
    g.stateManager.UnreserveGPUs(nodeName, selectedGPUs)
}

// selectGPUs picks the concrete GPUs to allocate on the node.
func (g *GPUSchedulerPlugin) selectGPUs(nodeName string, request *GPURequest) ([]int, error) {
    info, ok := g.stateManager.GetNodeGPUInfo(nodeName)
    if !ok {
        return nil, fmt.Errorf("node %s not found", nodeName)
    }

    availableGPUs := make([]int, 0)
    for i, gpu := range info.GPUs {
        if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
            // Model check
            if request.Model != "" && gpu.Model != request.Model {
                continue
            }
            // Memory check
            if request.MinMemory > 0 && gpu.MemoryTotal < request.MinMemory {
                continue
            }
            availableGPUs = append(availableGPUs, i)
        }
    }

    count := int(request.Count)

    // Select according to the topology constraint
    switch request.TopologyConstraint {
    case "nvlink":
        selected := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, count)
        if selected != nil {
            return selected, nil
        }
        return nil, fmt.Errorf("cannot find NVLink-connected GPUs")

    case "same-numa":
        selected := g.findSameNUMAGPUs(info.Topology, availableGPUs, count)
        if selected != nil {
            return selected, nil
        }
        return nil, fmt.Errorf("cannot find GPUs in the same NUMA node")

    default:
        // Prefer NVLink-connected GPUs if requested
        if request.PreferNVLink && info.Topology != nil {
            selected := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, count)
            if selected != nil {
                return selected, nil
            }
        }

        // Prefer GPUs on the same NUMA node if requested
        if request.PreferSameNUMA && info.Topology != nil {
            selected := g.findSameNUMAGPUs(info.Topology, availableGPUs, count)
            if selected != nil {
                return selected, nil
            }
        }

        // Fall back to the first N available GPUs
        if len(availableGPUs) >= count {
            return availableGPUs[:count], nil
        }

        return nil, fmt.Errorf("insufficient GPUs")
    }
}
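
For completeness, here is a minimal sketch of how such a plugin is typically compiled into a scheduler binary, in the style of the kubernetes-sigs/scheduler-plugins project. The module path is hypothetical, and plugin factory signatures vary slightly across Kubernetes releases; this matches the New signature used above:

// cmd/gpu-scheduler/main.go
package main

import (
    "os"

    "k8s.io/component-base/cli"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"

    "example.com/gpu-scheduler/pkg/gpuscheduler" // hypothetical module path
)

func main() {
    // Register the plugin under the name referenced in scheduler-config.yaml,
    // then run the standard kube-scheduler command with it linked in.
    command := app.NewSchedulerCommand(
        app.WithPlugin(gpuscheduler.PluginName, gpuscheduler.New),
    )
    os.Exit(cli.Run(command))
}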

4. Deploying and Configuring the Scheduler

4.1 Scheduler Configuration File

# scheduler-config.yaml
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: true
  resourceNamespace: kube-system
  resourceName: gpu-scheduler
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf
profiles:
  - schedulerName: gpu-scheduler
    plugins:
      # Enable the GPU plugin alongside the default plugins
      preFilter:
        enabled:
          - name: GPUScheduler
      filter:
        enabled:
          - name: GPUScheduler
      preScore:
        enabled:
          - name: GPUScheduler
      score:
        enabled:
          - name: GPUScheduler
            weight: 10
      reserve:
        enabled:
          - name: GPUScheduler
      preBind:
        enabled:
          - name: GPUScheduler
    pluginConfig:
      - name: GPUScheduler
        args:
          # GPU state sync interval
          syncInterval: 30s
          # Enable topology-aware scheduling
          topologyAware: true
          # Fragmentation policy
          fragmentationPolicy: "minimize"
          # Default preferences
          defaultPreferences:
            preferNVLink: true
            preferSameNUMA: true
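
The pluginConfig args above have to be decoded by the plugin at startup. A hedged sketch of a matching Go args struct follows; the field names simply mirror the YAML keys, and DecodeInto from k8s.io/kubernetes/pkg/scheduler/framework/runtime can populate such a struct from the raw object handed to New:

// GPUSchedulerArgs mirrors the pluginConfig args in scheduler-config.yaml.
// Field names are assumptions chosen to match the YAML keys above.
type GPUSchedulerArgs struct {
    SyncInterval        metav1.Duration `json:"syncInterval,omitempty"`
    TopologyAware       bool            `json:"topologyAware,omitempty"`
    FragmentationPolicy string          `json:"fragmentationPolicy,omitempty"`
    DefaultPreferences  struct {
        PreferNVLink   bool `json:"preferNVLink,omitempty"`
        PreferSameNUMA bool `json:"preferSameNUMA,omitempty"`
    } `json:"defaultPreferences,omitempty"`
}

// Inside New(obj runtime.Object, handle framework.Handle):
//     args := &GPUSchedulerArgs{}
//     if err := frameworkruntime.DecodeInto(obj, args); err != nil {
//         return nil, err
//     }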

4.2 Deployment YAML

# gpu-scheduler-deployment.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: gpu-scheduler
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: gpu-scheduler
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "watch", "update", "patch"]
  - apiGroups: [""]
    resources: ["pods/binding"]
    verbs: ["create"]
  - apiGroups: [""]
    resources: ["pods/status"]
    verbs: ["update", "patch"]
  - apiGroups: [""]
    resources: ["nodes"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "patch", "update"]
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  # GPU-related CRDs
  - apiGroups: ["gpu.resource.io"]
    resources: ["nodegpuinfos", "gpuallocations"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: gpu-scheduler
subjects:
  - kind: ServiceAccount
    name: gpu-scheduler
    namespace: kube-system
roleRef:
  kind: ClusterRole
  name: gpu-scheduler
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gpu-scheduler
  namespace: kube-system
  labels:
    component: gpu-scheduler
spec:
  replicas: 2  # HA deployment
  selector:
    matchLabels:
      component: gpu-scheduler
  template:
    metadata:
      labels:
        component: gpu-scheduler
    spec:
      serviceAccountName: gpu-scheduler
      priorityClassName: system-cluster-critical
      nodeSelector:
        node-role.kubernetes.io/control-plane: ""
      tolerations:
        - key: node-role.kubernetes.io/control-plane
          effect: NoSchedule
      containers:
        - name: gpu-scheduler
          image: gpu-scheduler:v1.0.0
          imagePullPolicy: IfNotPresent
          command:
            - /gpu-scheduler
            - --config=/etc/kubernetes/scheduler-config.yaml
            - --v=4
          resources:
            requests:
              cpu: 200m
              memory: 256Mi
            limits:
              cpu: "1"
              memory: 1Gi
          livenessProbe:
            httpGet:
              path: /healthz
              port: 10259
              scheme: HTTPS
            initialDelaySeconds: 15
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /healthz
              port: 10259
              scheme: HTTPS
            initialDelaySeconds: 5
            periodSeconds: 10
          volumeMounts:
            - name: config
              mountPath: /etc/kubernetes
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: gpu-scheduler-config
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: gpu-scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: true
      resourceNamespace: kube-system
      resourceName: gpu-scheduler
    profiles:
      - schedulerName: gpu-scheduler
        plugins:
          preFilter:
            enabled:
              - name: GPUScheduler
          filter:
            enabled:
              - name: GPUScheduler
          score:
            enabled:
              - name: GPUScheduler
                weight: 10
          reserve:
            enabled:
              - name: GPUScheduler

4.3 Using the Custom Scheduler

# gpu-training-job.yaml
apiVersion: v1
kind: Pod
metadata:
  name: gpu-training
  namespace: ml-workloads
  annotations:
    # Requested GPU model
    gpu.scheduler/model: "A100-SXM4-80GB"
    # Minimum GPU memory (MB)
    gpu.scheduler/memory: "40000"
    # Topology constraint: require NVLink-connected GPUs
    gpu.scheduler/topology: "nvlink"
    # Prefer GPUs on the same NUMA node
    gpu.scheduler/prefer-same-numa: "true"
spec:
  # Use the custom scheduler
  schedulerName: gpu-scheduler
  containers:
    - name: training
      image: pytorch/pytorch:2.0-cuda12.1-cudnn8-runtime
      command: ["python", "train.py"]
      resources:
        limits:
          nvidia.com/gpu: 4
        requests:
          nvidia.com/gpu: 4
          cpu: "8"
          memory: 64Gi
      env:
        - name: NCCL_DEBUG
          value: INFO
        - name: NCCL_IB_DISABLE
          value: "0"
      volumeMounts:
        - name: data
          mountPath: /data
  volumes:
    - name: data
      persistentVolumeClaim:
        claimName: training-data
  restartPolicy: Never

5. GPU Info Collector

5.1 Node GPU Info CRD

# gpu-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: nodegpuinfos.gpu.resource.io
spec:
  group: gpu.resource.io
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                nodeName:
                  type: string
                driverVersion:
                  type: string
                cudaVersion:
                  type: string
                gpus:
                  type: array
                  items:
                    type: object
                    properties:
                      uuid:
                        type: string
                      index:
                        type: integer
                      model:
                        type: string
                      memoryTotal:
                        type: integer
                      computeCapability:
                        type: string
                      migEnabled:
                        type: boolean
                      pcieInfo:
                        type: object
                        properties:
                          busId:
                            type: string
                          numaNode:
                            type: integer
                          generation:
                            type: integer
                          linkWidth:
                            type: integer
                      nvlinkPeers:
                        type: array
                        items:
                          type: integer
                topology:
                  type: object
                  properties:
                    links:
                      type: array
                      items:
                        type: array
                        items:
                          type: string
                    numaNodes:
                      type: array
                      items:
                        type: integer
            status:
              type: object
              properties:
                lastUpdated:
                  type: string
                  format: date-time
                gpuStates:
                  type: array
                  items:
                    type: object
                    properties:
                      index:
                        type: integer
                      state:
                        type: string
                      health:
                        type: string
                      allocatedPod:
                        type: string
                      memoryUsed:
                        type: integer
                      utilization:
                        type: integer
  scope: Cluster
  names:
    plural: nodegpuinfos
    singular: nodegpuinfo
    kind: NodeGPUInfo
    shortNames:
      - ngi

5.2 The GPU Info Collector

// cmd/gpu-collector/main.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "os"
    "time"

    "github.com/NVIDIA/go-nvml/pkg/nvml"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

type GPUCollector struct {
    nodeName   string
    kubeClient kubernetes.Interface
    interval   time.Duration
}

func NewGPUCollector() (*GPUCollector, error) {
    // Node name, injected via the downward API
    nodeName := os.Getenv("NODE_NAME")
    if nodeName == "" {
        return nil, fmt.Errorf("NODE_NAME env not set")
    }

    // In-cluster Kubernetes client
    config, err := rest.InClusterConfig()
    if err != nil {
        return nil, fmt.Errorf("failed to get k8s config: %v", err)
    }

    client, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, fmt.Errorf("failed to create k8s client: %v", err)
    }

    return &GPUCollector{
        nodeName:   nodeName,
        kubeClient: client,
        interval:   30 * time.Second,
    }, nil
}

func (c *GPUCollector) Run(ctx context.Context) error {
    // Initialize NVML
    ret := nvml.Init()
    if ret != nvml.SUCCESS {
        return fmt.Errorf("failed to initialize NVML: %v", nvml.ErrorString(ret))
    }
    defer nvml.Shutdown()

    ticker := time.NewTicker(c.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := c.collectAndUpdate(); err != nil {
                fmt.Printf("Failed to collect GPU info: %v\n", err)
            }
        }
    }
}

func (c *GPUCollector) collectAndUpdate() error {
    info, err := c.collectGPUInfo()
    if err != nil {
        return err
    }

    return c.updateNodeGPUInfo(info)
}

func (c *GPUCollector) collectGPUInfo() (*NodeGPUInfo, error) {
    // Driver version
    driverVersion, ret := nvml.SystemGetDriverVersion()
    if ret != nvml.SUCCESS {
        return nil, fmt.Errorf("failed to get driver version: %v", nvml.ErrorString(ret))
    }

    // CUDA version
    cudaVersion, ret := nvml.SystemGetCudaDriverVersion_v2()
    if ret != nvml.SUCCESS {
        return nil, fmt.Errorf("failed to get CUDA version: %v", nvml.ErrorString(ret))
    }

    // GPU count
    count, ret := nvml.DeviceGetCount()
    if ret != nvml.SUCCESS {
        return nil, fmt.Errorf("failed to get device count: %v", nvml.ErrorString(ret))
    }

    info := &NodeGPUInfo{
        NodeName:      c.nodeName,
        DriverVersion: driverVersion,
        CUDAVersion:   fmt.Sprintf("%d.%d", cudaVersion/1000, (cudaVersion%1000)/10),
        GPUs:          make([]GPUDevice, count),
    }

    // Collect per-GPU information
    for i := 0; i < count; i++ {
        device, ret := nvml.DeviceGetHandleByIndex(i)
        if ret != nvml.SUCCESS {
            continue
        }

        info.GPUs[i] = c.collectDeviceInfo(device, i)
    }

    // Collect topology information
    info.Topology = c.collectTopology(count)

    return info, nil
}

func (c *GPUCollector) collectDeviceInfo(device nvml.Device, index int) GPUDevice {
    gpu := GPUDevice{Index: index}

    // UUID
    uuid, ret := device.GetUUID()
    if ret == nvml.SUCCESS {
        gpu.UUID = uuid
    }

    // Model name
    name, ret := device.GetName()
    if ret == nvml.SUCCESS {
        gpu.Model = name
    }

    // Memory
    memory, ret := device.GetMemoryInfo()
    if ret == nvml.SUCCESS {
        gpu.MemoryTotal = int64(memory.Total / 1024 / 1024) // MB
        gpu.MemoryFree = int64(memory.Free / 1024 / 1024)
    }

    // Compute capability
    major, minor, ret := device.GetCudaComputeCapability()
    if ret == nvml.SUCCESS {
        gpu.ComputeCapability = fmt.Sprintf("%d.%d", major, minor)
    }

    // MIG mode
    mode, _, ret := device.GetMigMode()
    if ret == nvml.SUCCESS {
        gpu.MIGEnabled = mode == nvml.DEVICE_MIG_ENABLE
    }

    // If MIG is enabled, collect the MIG devices
    if gpu.MIGEnabled {
        gpu.MIGDevices = c.collectMIGDevices(device)
    }

    // PCIe info
    pciInfo, ret := device.GetPciInfo()
    if ret == nvml.SUCCESS {
        gpu.PCIeInfo = PCIeInfo{
            BusID: fmt.Sprintf("%04x:%02x:%02x.0",
                pciInfo.Domain, pciInfo.Bus, pciInfo.Device),
        }
    }

    // NUMA node
    // Note: obtained from sysfs rather than NVML
    numaNode, err := c.getNUMANode(gpu.PCIeInfo.BusID)
    if err == nil {
        gpu.PCIeInfo.NUMANode = numaNode
    }

    // Health status
    gpu.Health = DeviceHealthy

    return gpu
}

func (c *GPUCollector) collectMIGDevices(device nvml.Device) []MIGDevice {
    var migDevices []MIGDevice

    // Maximum number of MIG devices
    maxGI, ret := device.GetMaxMigDeviceCount()
    if ret != nvml.SUCCESS || maxGI == 0 {
        return migDevices
    }

    // Enumerate the MIG devices
    for i := 0; i < maxGI; i++ {
        migDevice, ret := device.GetMigDeviceHandleByIndex(i)
        if ret != nvml.SUCCESS {
            continue
        }

        mig := MIGDevice{
            GIIndex: i,
        }

        uuid, ret := migDevice.GetUUID()
        if ret == nvml.SUCCESS {
            mig.UUID = uuid
        }

        // Profile information, e.g. 3g.40gb
        attr, ret := migDevice.GetAttributes()
        if ret == nvml.SUCCESS {
            mig.Profile = fmt.Sprintf("%dg.%dgb",
                attr.GpuInstanceSliceCount,
                attr.MemorySizeMB/1024)
        }

        mig.State = DeviceStateFree
        migDevices = append(migDevices, mig)
    }

    return migDevices
}

func (c *GPUCollector) collectTopology(gpuCount int) *GPUTopology {
    topo := &GPUTopology{
        NodeName:  c.nodeName,
        Links:     make([][]LinkType, gpuCount),
        NUMANodes: make([]int, gpuCount),
    }

    for i := 0; i < gpuCount; i++ {
        topo.Links[i] = make([]LinkType, gpuCount)

        device1, ret := nvml.DeviceGetHandleByIndex(i)
        if ret != nvml.SUCCESS {
            continue
        }

        for j := 0; j < gpuCount; j++ {
            if i == j {
                topo.Links[i][j] = LinkTypeSameBoard
                continue
            }

            device2, ret := nvml.DeviceGetHandleByIndex(j)
            if ret != nvml.SUCCESS {
                topo.Links[i][j] = LinkTypeNone
                continue
            }

            // Query the topology relationship between the two GPUs
            pathInfo, ret := nvml.DeviceGetTopologyCommonAncestor(device1, device2)
            if ret != nvml.SUCCESS {
                topo.Links[i][j] = LinkTypeNone
                continue
            }

            switch pathInfo {
            case nvml.TOPOLOGY_INTERNAL:
                topo.Links[i][j] = LinkTypeSameBoard
            case nvml.TOPOLOGY_SINGLE:
                // Check for NVLink. Caveat: GetNvLinkCapability takes an
                // NVLink link index, not a peer GPU index; using j here is a
                // simplification, and a production collector should enumerate
                // links and match the remote device's PCI info.
                nvlinkCap, ret := device1.GetNvLinkCapability(j, nvml.NVLINK_CAP_P2P_SUPPORTED)
                if ret == nvml.SUCCESS && nvlinkCap > 0 {
                    topo.Links[i][j] = LinkTypeNVLink
                } else {
                    topo.Links[i][j] = LinkTypePCIe
                }
            case nvml.TOPOLOGY_MULTIPLE:
                topo.Links[i][j] = LinkTypePCIe
            case nvml.TOPOLOGY_HOSTBRIDGE:
                topo.Links[i][j] = LinkTypePCIe
            case nvml.TOPOLOGY_NODE:
                topo.Links[i][j] = LinkTypePCIe
            case nvml.TOPOLOGY_SYSTEM:
                topo.Links[i][j] = LinkTypePCIe
            default:
                topo.Links[i][j] = LinkTypeNone
            }
        }
    }

    return topo
}

func (c *GPUCollector) getNUMANode(busID string) (int, error) {
    // Read /sys/bus/pci/devices/{busID}/numa_node
    path := fmt.Sprintf("/sys/bus/pci/devices/%s/numa_node", busID)
    data, err := os.ReadFile(path)
    if err != nil {
        return 0, err
    }

    var numa int
    fmt.Sscanf(string(data), "%d", &numa)
    return numa, nil
}

func (c *GPUCollector) updateNodeGPUInfo(info *NodeGPUInfo) error {
    // A full implementation would update the NodeGPUInfo CRD; this simplified
    // example stores the data in a ConfigMap instead.
    data, err := json.Marshal(info)
    if err != nil {
        return err
    }

    cm := &corev1.ConfigMap{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("gpu-info-%s", c.nodeName),
            Namespace: "kube-system",
            Labels: map[string]string{
                "app":  "gpu-collector",
                "node": c.nodeName,
            },
        },
        Data: map[string]string{
            "gpu-info": string(data),
        },
    }

    _, err = c.kubeClient.CoreV1().ConfigMaps("kube-system").Update(
        context.Background(), cm, metav1.UpdateOptions{})
    if err != nil {
        // Create the ConfigMap if it does not exist yet
        _, err = c.kubeClient.CoreV1().ConfigMaps("kube-system").Create(
            context.Background(), cm, metav1.CreateOptions{})
    }

    return err
}

func main() {
    collector, err := NewGPUCollector()
    if err != nil {
        fmt.Printf("Failed to create collector: %v\n", err)
        os.Exit(1)
    }

    ctx := context.Background()
    if err := collector.Run(ctx); err != nil {
        fmt.Printf("Collector error: %v\n", err)
        os.Exit(1)
    }
}

5.3 Deploying the Collector

The collector runs as a DaemonSet on every GPU node. Note that the gpu-collector ServiceAccount must hold RBAC permissions to create and update ConfigMaps in kube-system, since that is where the collector publishes its GPU info.

# gpu-collector-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: gpu-collector
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: gpu-collector
  template:
    metadata:
      labels:
        app: gpu-collector
    spec:
      serviceAccountName: gpu-collector
      nodeSelector:
        nvidia.com/gpu.present: "true"
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
      containers:
        - name: collector
          image: gpu-collector:v1.0.0
          env:
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          securityContext:
            privileged: true
          volumeMounts:
            - name: device-plugins
              mountPath: /var/lib/kubelet/device-plugins
            - name: sys
              mountPath: /sys
              readOnly: true
          resources:
            requests:
              cpu: 50m
              memory: 64Mi
            limits:
              cpu: 200m
              memory: 256Mi
      volumes:
        - name: device-plugins
          hostPath:
            path: /var/lib/kubelet/device-plugins
        - name: sys
          hostPath:
            path: /sys

6. Scheduler Extension: Scheduler Extender

Besides the Scheduling Framework, scheduling can also be extended externally through a Scheduler Extender. Because the extender is invoked as an HTTP webhook, every scheduling cycle pays network round-trips, but it lets the extension logic run outside the scheduler process:

6.1 Extender Architecture

┌──────────────────────────────────────────────────────────────────────┐
│                    Scheduler Extender Architecture                    │
├──────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌─────────────────┐                    ┌─────────────────────────┐  │
│  │                 │  HTTP Webhook      │                         │  │
│  │  kube-scheduler │◄──────────────────▶│   GPU Scheduler         │  │
│  │                 │                    │   Extender              │  │
│  │  (default)      │                    │                         │  │
│  └────────┬────────┘                    │  - Filter               │  │
│           │                             │  - Prioritize           │  │
│           │                             │  - Bind                 │  │
│           │                             │                         │  │
│           ▼                             └───────────┬─────────────┘  │
│  ┌─────────────────┐                               │                 │
│  │                 │                               │                 │
│  │   Pod Queue     │                               ▼                 │
│  │                 │                    ┌─────────────────────────┐  │
│  └─────────────────┘                    │    GPU State Store      │  │
│                                         │                         │  │
│                                         │  - Node GPU Info        │  │
│                                         │  - Allocation Cache     │  │
│                                         │  - Topology Data        │  │
│                                         └─────────────────────────┘  │
│                                                                       │
└──────────────────────────────────────────────────────────────────────┘

6.2 Extender Implementation

// pkg/extender/server.go
package extender

import (
    "encoding/json"
    "fmt"
    "net/http"

    v1 "k8s.io/api/core/v1"
    extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

type GPUExtender struct {
    stateManager *GPUStateManager
}

func NewGPUExtender(stateManager *GPUStateManager) *GPUExtender {
    return &GPUExtender{stateManager: stateManager}
}

// Filter rejects nodes that cannot satisfy the Pod's GPU request
func (e *GPUExtender) Filter(w http.ResponseWriter, r *http.Request) {
    var args extenderv1.ExtenderArgs
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    result := &extenderv1.ExtenderFilterResult{
        Nodes:       &v1.NodeList{},
        FailedNodes: make(extenderv1.FailedNodesMap),
    }

    // Parse the GPU request from the Pod spec
    gpuRequest := parseGPURequestFromPod(args.Pod)
    if gpuRequest.Count == 0 {
        // No GPUs requested: every node passes
        result.Nodes = args.Nodes
        writeResponse(w, result)
        return
    }

    // Filter the candidate nodes
    for _, node := range args.Nodes.Items {
        if err := e.checkNode(&node, gpuRequest); err != nil {
            result.FailedNodes[node.Name] = err.Error()
        } else {
            result.Nodes.Items = append(result.Nodes.Items, node)
        }
    }

    writeResponse(w, result)
}

// Prioritize scores the candidate nodes
func (e *GPUExtender) Prioritize(w http.ResponseWriter, r *http.Request) {
    var args extenderv1.ExtenderArgs
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    result := make(extenderv1.HostPriorityList, len(args.Nodes.Items))

    gpuRequest := parseGPURequestFromPod(args.Pod)

    for i, node := range args.Nodes.Items {
        score := e.scoreNode(&node, gpuRequest)
        result[i] = extenderv1.HostPriority{
            Host:  node.Name,
            Score: score,
        }
    }

    writeResponse(w, result)
}

// Bind allocates concrete GPU devices and binds the Pod to the node
func (e *GPUExtender) Bind(w http.ResponseWriter, r *http.Request) {
    var args extenderv1.ExtenderBindingArgs
    if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    result := &extenderv1.ExtenderBindingResult{}

    // Allocate the specific GPU devices
    if err := e.allocateGPUs(args.PodName, args.PodNamespace, args.Node); err != nil {
        result.Error = err.Error()
        writeResponse(w, result)
        return
    }

    // Perform the actual binding
    if err := e.bindPod(args.PodName, args.PodNamespace, args.Node); err != nil {
        result.Error = err.Error()
    }

    writeResponse(w, result)
}

func (e *GPUExtender) checkNode(node *v1.Node, request *GPURequest) error {
    // The info itself is not needed here; we only verify the node is known.
    if _, ok := e.stateManager.GetNodeGPUInfo(node.Name); !ok {
        return fmt.Errorf("no GPU info for node %s", node.Name)
    }

    available := e.stateManager.GetAvailableGPUs(node.Name)
    if int64(len(available)) < request.Count {
        return fmt.Errorf("insufficient GPUs: need %d, have %d",
            request.Count, len(available))
    }

    // Check the requested GPU model
    if request.Model != "" {
        matchCount := 0
        for _, gpu := range available {
            if gpu.Model == request.Model {
                matchCount++
            }
        }
        if int64(matchCount) < request.Count {
            return fmt.Errorf("insufficient GPUs with model %s", request.Model)
        }
    }

    return nil
}

func (e *GPUExtender) scoreNode(node *v1.Node, request *GPURequest) int64 {
    if request.Count == 0 {
        return 50
    }

    nodeInfo, ok := e.stateManager.GetNodeGPUInfo(node.Name)
    if !ok {
        return 0
    }

    // Note: extender priorities are conventionally capped at
    // extenderv1.MaxExtenderPriority (10); this example scores on a 0-100
    // scale for readability and should be normalized in production.
    score := int64(50)

    // Adjust the score by how many GPUs would remain after this allocation,
    // to avoid stranding hard-to-use leftovers (fragmentation).
    available := e.stateManager.GetAvailableGPUs(node.Name)
    totalGPUs := len(nodeInfo.GPUs)

    remaining := int64(len(available)) - request.Count
    if remaining == 0 {
        score += 30 // perfect fit: no GPUs stranded
    } else if remaining >= int64(totalGPUs)/2 {
        score += 20 // plenty of headroom left for future pods
    } else if remaining == 1 {
        score -= 10 // a single leftover GPU is hard to utilize
    }

    return score
}

func writeResponse(w http.ResponseWriter, result interface{}) {
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(result)
}

// StartServer starts the HTTP endpoints consumed by kube-scheduler
func (e *GPUExtender) StartServer(addr string) error {
    http.HandleFunc("/filter", e.Filter)
    http.HandleFunc("/prioritize", e.Prioritize)
    http.HandleFunc("/bind", e.Bind)

    return http.ListenAndServe(addr, nil)
}
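
The handlers above lean on two helpers that are not shown. The sketches below are illustrative only: parseGPURequestFromPod assumes the GPURequest type used above exposes Count and Model fields and reads the desired model from a hypothetical gpu.example.com/model annotation; bindPod assumes GPUExtender also carries a kubeClient (kubernetes.Interface) field and issues the binding through client-go's Pods "bind" subresource.

// pkg/extender/helpers.go (sketch)
package extender

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// parseGPURequestFromPod sums nvidia.com/gpu limits across all containers.
func parseGPURequestFromPod(pod *v1.Pod) *GPURequest {
    req := &GPURequest{}
    for _, c := range pod.Spec.Containers {
        if q, ok := c.Resources.Limits[v1.ResourceName("nvidia.com/gpu")]; ok {
            req.Count += q.Value()
        }
    }
    // Hypothetical annotation carrying the requested GPU model.
    req.Model = pod.Annotations["gpu.example.com/model"]
    return req
}

// bindPod binds the Pod to the chosen node via the "bind" subresource.
func (e *GPUExtender) bindPod(name, namespace, node string) error {
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
        Target:     v1.ObjectReference{Kind: "Node", Name: node},
    }
    return e.kubeClient.CoreV1().Pods(namespace).Bind(
        context.Background(), binding, metav1.CreateOptions{})
}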

6.3 Extender Configuration

Note: the v1 Policy API shown below was removed in Kubernetes v1.23; on current clusters the equivalent settings live under the extenders field of KubeSchedulerConfiguration, with essentially the same per-extender structure.

# scheduler-policy.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-policy
  namespace: kube-system
data:
  policy.cfg: |
    {
      "kind": "Policy",
      "apiVersion": "v1",
      "extenders": [
        {
          "urlPrefix": "http://gpu-scheduler-extender.kube-system.svc:8888",
          "filterVerb": "filter",
          "prioritizeVerb": "prioritize",
          "bindVerb": "bind",
          "weight": 10,
          "enableHttps": false,
          "nodeCacheCapable": false,
          "managedResources": [
            {
              "name": "nvidia.com/gpu",
              "ignoredByScheduler": false
            }
          ],
          "ignorable": false
        }
      ]
    }

7. Scheduler Monitoring and Debugging

7.1 Scheduling Metrics

// pkg/metrics/metrics.go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // Scheduling latency
    SchedulingLatency = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "gpu_scheduler_scheduling_duration_seconds",
            Help:    "GPU scheduling latency in seconds",
            Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
        },
        []string{"result", "gpu_model"},
    )

    // Scheduling attempts by result
    SchedulingAttempts = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "gpu_scheduler_scheduling_attempts_total",
            Help: "Number of scheduling attempts",
        },
        []string{"result", "failure_reason"},
    )

    // Allocated GPUs per node
    GPUAllocations = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "gpu_scheduler_gpu_allocations",
            Help: "Number of GPU allocations per node",
        },
        []string{"node", "gpu_model"},
    )

    // Available GPUs per node
    GPUAvailable = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "gpu_scheduler_gpu_available",
            Help: "Number of available GPUs per node",
        },
        []string{"node", "gpu_model"},
    )

    // Topology-aware scheduling decisions
    TopologyAwareSchedules = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "gpu_scheduler_topology_aware_schedules_total",
            Help: "Number of topology-aware scheduling decisions",
        },
        []string{"constraint_type", "satisfied"},
    )

    // Fragmentation metric
    GPUFragmentation = promauto.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "gpu_scheduler_fragmentation_ratio",
            Help: "GPU fragmentation ratio per node (0-1)",
        },
        []string{"node"},
    )
)

// RecordSchedulingDuration records the latency of a scheduling attempt
func RecordSchedulingDuration(duration float64, result, gpuModel string) {
    SchedulingLatency.WithLabelValues(result, gpuModel).Observe(duration)
}

// RecordSchedulingAttempt records a scheduling attempt and its outcome
func RecordSchedulingAttempt(result, failureReason string) {
    SchedulingAttempts.WithLabelValues(result, failureReason).Inc()
}

// UpdateGPUMetrics refreshes the per-node GPU gauges
func UpdateGPUMetrics(nodeInfo *NodeGPUInfo) {
    allocatedByModel := make(map[string]int)
    availableByModel := make(map[string]int)

    for _, gpu := range nodeInfo.GPUs {
        if gpu.State == DeviceStateFree {
            availableByModel[gpu.Model]++
        } else {
            allocatedByModel[gpu.Model]++
        }
    }

    for model, count := range allocatedByModel {
        GPUAllocations.WithLabelValues(nodeInfo.NodeName, model).Set(float64(count))
    }

    for model, count := range availableByModel {
        GPUAvailable.WithLabelValues(nodeInfo.NodeName, model).Set(float64(count))
    }

    // Compute the fragmentation ratio
    fragRatio := calculateFragmentation(nodeInfo)
    GPUFragmentation.WithLabelValues(nodeInfo.NodeName).Set(fragRatio)
}

// calculateFragmentation computes a rough fragmentation ratio for a node.
func calculateFragmentation(info *NodeGPUInfo) float64 {
    if len(info.GPUs) == 0 {
        return 0
    }

    // Simple heuristic: free GPU counts that cannot cleanly satisfy common
    // request sizes (2, 4, 8) are treated as fragmented.
    freeCount := 0
    for _, gpu := range info.GPUs {
        if gpu.State == DeviceStateFree {
            freeCount++
        }
    }

    switch {
    case freeCount == 0:
        return 0 // fully allocated: nothing left to fragment
    case freeCount == 1 || freeCount == 3:
        return 0.7 // odd leftovers are hard to place
    case freeCount >= 5 && freeCount <= 7:
        return 0.3
    default:
        return 0.1
    }
}
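
Exposing these metrics requires no extra registration step: promauto wires the collectors into the default Prometheus registry, so a plain promhttp endpoint is enough. A minimal sketch (the file name and listen address are illustrative):

// pkg/metrics/server.go (sketch)
package metrics

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// StartMetricsServer serves the default registry on /metrics, which is
// where promauto registered the collectors defined above.
func StartMetricsServer(addr string) error {
    mux := http.NewServeMux()
    mux.Handle("/metrics", promhttp.Handler())
    return http.ListenAndServe(addr, mux)
}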

7.2 Grafana Dashboard

{
  "dashboard": {
    "title": "GPU Scheduler Dashboard",
    "panels": [
      {
        "title": "Scheduling Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, sum(rate(gpu_scheduler_scheduling_duration_seconds_bucket[5m])) by (le, result))",
            "legendFormat": "p99 - {{result}}"
          },
          {
            "expr": "histogram_quantile(0.50, sum(rate(gpu_scheduler_scheduling_duration_seconds_bucket[5m])) by (le, result))",
            "legendFormat": "p50 - {{result}}"
          }
        ]
      },
      {
        "title": "Scheduling Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(gpu_scheduler_scheduling_attempts_total{result=\"success\"}[5m])) / sum(rate(gpu_scheduler_scheduling_attempts_total[5m])) * 100",
            "legendFormat": "Success Rate %"
          }
        ]
      },
      {
        "title": "GPU Availability by Node",
        "type": "table",
        "targets": [
          {
            "expr": "gpu_scheduler_gpu_available",
            "format": "table"
          }
        ]
      },
      {
        "title": "GPU Fragmentation",
        "type": "gauge",
        "targets": [
          {
            "expr": "avg(gpu_scheduler_fragmentation_ratio)",
            "legendFormat": "Avg Fragmentation"
          }
        ],
        "options": {
          "maxValue": 1,
          "thresholds": [
            {"value": 0, "color": "green"},
            {"value": 0.3, "color": "yellow"},
            {"value": 0.6, "color": "red"}
          ]
        }
      },
      {
        "title": "Topology-Aware Schedules",
        "type": "piechart",
        "targets": [
          {
            "expr": "sum(gpu_scheduler_topology_aware_schedules_total) by (constraint_type, satisfied)",
            "legendFormat": "{{constraint_type}} - {{satisfied}}"
          }
        ]
      }
    ]
  }
}

7.3 Debugging Commands

# Tail the scheduler logs
kubectl logs -n kube-system -l component=gpu-scheduler -f

# Inspect a Pod's scheduling events
kubectl describe pod <pod-name> | grep -A 20 Events

# Check a node's GPU info as published by the collector
kubectl get configmap -n kube-system gpu-info-<node-name> -o jsonpath='{.data.gpu-info}' | jq .

# Simulate scheduling (dry-run)
kubectl run gpu-test --image=nvidia/cuda:12.1-runtime --dry-run=client \
  --overrides='{"spec":{"schedulerName":"gpu-scheduler","containers":[{"name":"test","resources":{"limits":{"nvidia.com/gpu":"2"}}}]}}' -o yaml

# Inspect scheduler metrics
kubectl port-forward -n kube-system svc/gpu-scheduler-metrics 8080:8080
curl http://localhost:8080/metrics | grep gpu_scheduler

8. Best Practices

8.1 Choosing a Scheduling Strategy

Scenario               Recommended Strategy     Rationale
Large-scale training   NVLINK-first             Minimizes inter-GPU communication overhead
Inference serving      Minimal fragmentation    Improves resource utilization
Mixed workloads        Balanced strategy        Accommodates all workload types
MIG scenarios          Profile matching         Precise resource allocation
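
One way to act on this table is to map each workload class onto a set of score weights for the strategies above. The sketch below is illustrative; the type, function, and weight values are assumptions rather than part of the original scheduler:

// Illustrative mapping from workload class to scoring weights.
type StrategyWeights struct {
    NVLinkAffinity    float64 // prefer NVLink-connected GPU sets
    AntiFragmentation float64 // prefer placements that avoid stranded GPUs
    Spread            float64 // prefer spreading load across nodes
}

func weightsFor(workload string) StrategyWeights {
    switch workload {
    case "training":
        return StrategyWeights{NVLinkAffinity: 0.7, AntiFragmentation: 0.2, Spread: 0.1}
    case "inference":
        return StrategyWeights{NVLinkAffinity: 0.1, AntiFragmentation: 0.7, Spread: 0.2}
    default: // mixed workloads
        return StrategyWeights{NVLinkAffinity: 0.4, AntiFragmentation: 0.4, Spread: 0.2}
    }
}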

8.2 Performance Optimization Tips

  1. Cache GPU state: avoid querying NVML on every scheduling cycle (a cache sketch follows this list)
  2. Batch processing: coalesce multiple scheduling requests
  3. Asynchronous updates: decouple state collection from scheduling
  4. Warm the cache: preload node information at scheduler startup
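
A minimal sketch of tip 1: caching per-node GPU state with a TTL so NVML is only re-scanned when an entry goes stale. It reuses the NodeGPUInfo type from earlier; the file path, cache type, and method names are illustrative:

// pkg/scheduler/statecache.go (sketch)
package scheduler

import (
    "sync"
    "time"
)

type gpuStateCache struct {
    mu    sync.RWMutex
    data  map[string]*NodeGPUInfo
    stamp map[string]time.Time
    ttl   time.Duration
}

func newGPUStateCache(ttl time.Duration) *gpuStateCache {
    return &gpuStateCache{
        data:  make(map[string]*NodeGPUInfo),
        stamp: make(map[string]time.Time),
        ttl:   ttl,
    }
}

// Get returns cached info for a node, invoking refresh (e.g. a full NVML
// scan) only when the cached entry is missing or older than the TTL.
func (c *gpuStateCache) Get(node string, refresh func() (*NodeGPUInfo, error)) (*NodeGPUInfo, error) {
    c.mu.RLock()
    info, ok := c.data[node]
    fresh := ok && time.Since(c.stamp[node]) < c.ttl
    c.mu.RUnlock()
    if fresh {
        return info, nil
    }

    info, err := refresh()
    if err != nil {
        return nil, err
    }

    c.mu.Lock()
    c.data[node] = info
    c.stamp[node] = time.Now()
    c.mu.Unlock()
    return info, nil
}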

8.3 Failure Handling

# Example Pod spec: handling scheduling failures
apiVersion: v1
kind: Pod
metadata:
  name: gpu-job
spec:
  schedulerName: gpu-scheduler
  # Tolerate cordoned/unschedulable nodes
  tolerations:
    - key: "node.kubernetes.io/unschedulable"
      operator: "Exists"
      effect: "NoSchedule"
  # Priority class: urgent jobs are scheduled first
  priorityClassName: high-priority
  containers:
    - name: main
      resources:
        limits:
          nvidia.com/gpu: 2
  # Restart behavior after container failure
  restartPolicy: OnFailure

Summary

This chapter walked through the implementation of a Kubernetes GPU scheduler:

  1. Scheduling framework: the extension-point mechanism of the Scheduling Framework
  2. GPU state management: data structures for GPU topology and allocation state
  3. Scheduling plugins: implementing the PreFilter, Filter, Score, and Reserve plugins
  4. Topology awareness: supporting NVLINK, NUMA, and other topology constraints
  5. Extender pattern: extending scheduling capability via HTTP webhooks
  6. Monitoring and debugging: Prometheus metrics and Grafana visualization

The next chapter explores topology-aware scheduling in depth, showing how GPU topology information can be used to optimize distributed training performance.
