HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

拓扑感知调度

概述

在分布式深度学习中,GPU 间的通信带宽直接影响训练性能。不同的 GPU 互联方式(NVLINK、PCIe、跨节点网络)性能差异可达 10 倍以上。拓扑感知调度通过理解硬件拓扑结构,将相关 GPU 任务调度到通信最优的位置,是大规模 AI 训练的关键技术。

1. GPU 拓扑基础

1.1 GPU 互联架构

┌───────────────────────────────────────────────────────────────────────────┐
│                        DGX A100 Server Topology                           │
├───────────────────────────────────────────────────────────────────────────┤
│                                                                           │
│                            ┌─────────────┐                                │
│                            │  NVSwitch   │                                │
│                            │  (6 chips)  │                                │
│                            └──────┬──────┘                                │
│                                   │                                       │
│          ┌────────────────────────┼────────────────────────┐              │
│          │                        │                        │              │
│    ┌─────┴─────┐            ┌─────┴─────┐            ┌─────┴─────┐        │
│    │           │            │           │            │           │        │
│  ┌─┴─┐   ┌─┴─┐ │          ┌─┴─┐   ┌─┴─┐ │          ┌─┴─┐   ┌─┴─┐ │        │
│  │GPU│───│GPU│ │          │GPU│───│GPU│ │          │GPU│───│GPU│ │        │
│  │ 0 │   │ 1 │ │          │ 2 │   │ 3 │ │          │ 4 │   │ 5 │ │        │
│  └─┬─┘   └─┬─┘ │          └─┬─┘   └─┬─┘ │          └─┬─┘   └─┬─┘ │        │
│    │       │   │            │       │   │            │       │   │        │
│    │   NUMA 0  │            │   NUMA 1  │            │   NUMA 2  │        │
│    │       │   │            │       │   │            │       │   │        │
│  ┌─┴───────┴─┐ │          ┌─┴───────┴─┐ │          ┌─┴───────┴─┐ │        │
│  │   CPU 0   │ │          │   CPU 1   │ │          │   CPU 2   │ │        │
│  └───────────┘ │          └───────────┘ │          └───────────┘ │        │
│                │                        │                        │        │
│    ┌─────┐   ┌─┴─┐                    ┌─┴─┐   ┌─────┐           │        │
│    │GPU 6│───│GPU│                    │GPU│───│GPU 7│           │        │
│    └─────┘   │ 6 │                    │ 7 │   └─────┘           │        │
│              └───┘                    └───┘                      │        │
│                                                                  │        │
│  ┌────────────────────────────────────────────────────────────┐ │        │
│  │                       InfiniBand NICs                      │ │        │
│  │   NIC 0 (NUMA 0)    NIC 1 (NUMA 1)    NIC 2 (NUMA 2)      │ │        │
│  └────────────────────────────────────────────────────────────┘ │        │
│                                                                  │        │
└──────────────────────────────────────────────────────────────────┘        │
                                                                            │
                                                                            │
  带宽对比:                                                                 │
  ┌─────────────────────────────────────────────────────────────────────┐   │
  │ 互联类型         │ 带宽 (GB/s)    │ 延迟      │ 适用场景              │   │
  ├─────────────────────────────────────────────────────────────────────┤   │
  │ NVSwitch         │ 600 (双向)     │ < 1μs     │ 大规模并行训练        │   │
  │ NVLINK (A100)    │ 300 (单向)     │ < 1μs     │ GPU直连              │   │
  │ PCIe Gen4        │ 32             │ ~2μs      │ 普通互联              │   │
  │ InfiniBand HDR   │ 25             │ ~2μs      │ 跨节点通信            │   │
  │ 100G Ethernet    │ 12.5           │ ~10μs     │ 数据中心网络          │   │
  └─────────────────────────────────────────────────────────────────────┘   │

1.2 拓扑层次结构

// pkg/topology/types.go
package topology

// TopologyLevel 拓扑层次
type TopologyLevel int

const (
    // TopologyLevelSame 同一设备
    TopologyLevelSame TopologyLevel = iota
    // TopologyLevelNVLink 通过 NVLINK 直连
    TopologyLevelNVLink
    // TopologyLevelNVSwitch 通过 NVSwitch 连接
    TopologyLevelNVSwitch
    // TopologyLevelPIX 通过 PCIe switch 连接
    TopologyLevelPIX
    // TopologyLevelPHB 通过 PCIe Host Bridge 连接
    TopologyLevelPHB
    // TopologyLevelNUMA 跨 NUMA 节点
    TopologyLevelNUMA
    // TopologyLevelSYS 通过系统互联(QPI/UPI)
    TopologyLevelSYS
    // TopologyLevelNode 跨物理节点
    TopologyLevelNode
)

// TopologyLevelBandwidth 各层级典型带宽 (GB/s)
var TopologyLevelBandwidth = map[TopologyLevel]float64{
    TopologyLevelSame:     1000, // 理论无限
    TopologyLevelNVLink:   300,  // NVLINK 3.0
    TopologyLevelNVSwitch: 600,  // NVSwitch 全连接
    TopologyLevelPIX:      32,   // PCIe 4.0 x16
    TopologyLevelPHB:      32,   // PCIe Host Bridge
    TopologyLevelNUMA:     25,   // QPI/UPI
    TopologyLevelSYS:      20,   // 系统总线
    TopologyLevelNode:     25,   // InfiniBand HDR
}

// TopologyMatrix GPU 拓扑矩阵
type TopologyMatrix struct {
    // GPU 数量
    GPUCount int
    // 拓扑矩阵,Levels[i][j] 表示 GPU i 到 GPU j 的拓扑层级
    Levels [][]TopologyLevel
    // NUMA 亲和性
    NUMANodes []int
    // NIC 亲和性
    NICMapping map[int]string
}

// NewTopologyMatrix 创建拓扑矩阵
func NewTopologyMatrix(gpuCount int) *TopologyMatrix {
    levels := make([][]TopologyLevel, gpuCount)
    for i := range levels {
        levels[i] = make([]TopologyLevel, gpuCount)
        for j := range levels[i] {
            if i == j {
                levels[i][j] = TopologyLevelSame
            } else {
                levels[i][j] = TopologyLevelSYS // 默认
            }
        }
    }

    return &TopologyMatrix{
        GPUCount:   gpuCount,
        Levels:     levels,
        NUMANodes:  make([]int, gpuCount),
        NICMapping: make(map[int]string),
    }
}

// GetBandwidth 获取两个 GPU 间的带宽
func (m *TopologyMatrix) GetBandwidth(gpu1, gpu2 int) float64 {
    if gpu1 < 0 || gpu1 >= m.GPUCount || gpu2 < 0 || gpu2 >= m.GPUCount {
        return 0
    }
    return TopologyLevelBandwidth[m.Levels[gpu1][gpu2]]
}

// GetMinBandwidth 获取一组 GPU 的最小带宽
func (m *TopologyMatrix) GetMinBandwidth(gpus []int) float64 {
    if len(gpus) <= 1 {
        return TopologyLevelBandwidth[TopologyLevelSame]
    }

    minBW := TopologyLevelBandwidth[TopologyLevelSame]
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            bw := m.GetBandwidth(gpus[i], gpus[j])
            if bw < minBW {
                minBW = bw
            }
        }
    }
    return minBW
}

// GetAverageTopologyLevel 获取平均拓扑层级
func (m *TopologyMatrix) GetAverageTopologyLevel(gpus []int) float64 {
    if len(gpus) <= 1 {
        return float64(TopologyLevelSame)
    }

    sum := 0.0
    count := 0
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            sum += float64(m.Levels[gpus[i]][gpus[j]])
            count++
        }
    }
    return sum / float64(count)
}

1.3 拓扑发现

// pkg/topology/discovery.go
package topology

import (
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "strings"

    "github.com/NVIDIA/go-nvml/pkg/nvml"
)

// TopologyDiscoverer 拓扑发现器
type TopologyDiscoverer struct {
    nvmlInited bool
}

// NewTopologyDiscoverer 创建拓扑发现器
func NewTopologyDiscoverer() (*TopologyDiscoverer, error) {
    ret := nvml.Init()
    if ret != nvml.SUCCESS {
        return nil, fmt.Errorf("failed to initialize NVML: %v", nvml.ErrorString(ret))
    }

    return &TopologyDiscoverer{nvmlInited: true}, nil
}

// Close 关闭发现器
func (d *TopologyDiscoverer) Close() {
    if d.nvmlInited {
        nvml.Shutdown()
    }
}

// Discover 发现 GPU 拓扑
func (d *TopologyDiscoverer) Discover() (*TopologyMatrix, error) {
    count, ret := nvml.DeviceGetCount()
    if ret != nvml.SUCCESS {
        return nil, fmt.Errorf("failed to get device count: %v", nvml.ErrorString(ret))
    }

    matrix := NewTopologyMatrix(count)

    // 发现 GPU 间拓扑关系
    for i := 0; i < count; i++ {
        device1, ret := nvml.DeviceGetHandleByIndex(i)
        if ret != nvml.SUCCESS {
            continue
        }

        // 获取 NUMA 节点
        pciInfo, ret := device1.GetPciInfo()
        if ret == nvml.SUCCESS {
            busID := fmt.Sprintf("%04x:%02x:%02x.0", pciInfo.Domain, pciInfo.Bus, pciInfo.Device)
            numa, err := d.getNUMANode(busID)
            if err == nil {
                matrix.NUMANodes[i] = numa
            }
        }

        for j := i + 1; j < count; j++ {
            device2, ret := nvml.DeviceGetHandleByIndex(j)
            if ret != nvml.SUCCESS {
                continue
            }

            level := d.getTopologyLevel(device1, device2)
            matrix.Levels[i][j] = level
            matrix.Levels[j][i] = level
        }
    }

    // 发现 NIC 亲和性
    d.discoverNICMapping(matrix)

    return matrix, nil
}

// getTopologyLevel 获取两个 GPU 间的拓扑层级
func (d *TopologyDiscoverer) getTopologyLevel(dev1, dev2 nvml.Device) TopologyLevel {
    // 首先检查 NVLINK 连接
    hasNVLink := d.checkNVLinkConnection(dev1, dev2)
    if hasNVLink {
        // 检查是否通过 NVSwitch
        // NVSwitch 环境下所有 GPU 都有 NVLINK 连接
        count, _ := nvml.DeviceGetCount()
        nvlinkCount := 0
        for i := 0; i < count; i++ {
            dev, _ := nvml.DeviceGetHandleByIndex(i)
            if d.checkNVLinkConnection(dev1, dev) {
                nvlinkCount++
            }
        }
        // 如果 GPU 与所有其他 GPU 都有 NVLINK,说明是 NVSwitch
        if nvlinkCount >= count-1 {
            return TopologyLevelNVSwitch
        }
        return TopologyLevelNVLink
    }

    // 使用 NVML 拓扑 API
    pathInfo, ret := nvml.DeviceGetTopologyCommonAncestor(dev1, dev2)
    if ret != nvml.SUCCESS {
        return TopologyLevelSYS
    }

    switch pathInfo {
    case nvml.TOPOLOGY_INTERNAL:
        return TopologyLevelSame
    case nvml.TOPOLOGY_SINGLE:
        return TopologyLevelPIX
    case nvml.TOPOLOGY_MULTIPLE:
        return TopologyLevelPHB
    case nvml.TOPOLOGY_HOSTBRIDGE:
        return TopologyLevelPHB
    case nvml.TOPOLOGY_NODE:
        return TopologyLevelNUMA
    case nvml.TOPOLOGY_SYSTEM:
        return TopologyLevelSYS
    default:
        return TopologyLevelSYS
    }
}

// checkNVLinkConnection 检查 NVLINK 连接
func (d *TopologyDiscoverer) checkNVLinkConnection(dev1, dev2 nvml.Device) bool {
    // 获取 dev2 的索引
    idx2, ret := dev2.GetIndex()
    if ret != nvml.SUCCESS {
        return false
    }

    // 检查 dev1 的所有 NVLINK
    for link := uint32(0); link < 12; link++ { // A100 最多 12 条 NVLINK
        // 获取远端 PCIe 信息
        remotePci, ret := dev1.GetNvLinkRemotePciInfo(link)
        if ret != nvml.SUCCESS {
            continue
        }

        // 通过 PCIe 信息找到对应设备
        remoteDev, ret := nvml.DeviceGetHandleByPciBusId(remotePci.BusIdLegacy[:])
        if ret != nvml.SUCCESS {
            continue
        }

        remoteIdx, ret := remoteDev.GetIndex()
        if ret == nvml.SUCCESS && remoteIdx == idx2 {
            return true
        }
    }

    return false
}

// getNUMANode 获取设备的 NUMA 节点
func (d *TopologyDiscoverer) getNUMANode(busID string) (int, error) {
    // 读取 /sys/bus/pci/devices/{busID}/numa_node
    path := filepath.Join("/sys/bus/pci/devices", busID, "numa_node")
    data, err := os.ReadFile(path)
    if err != nil {
        return 0, err
    }

    numa, err := strconv.Atoi(strings.TrimSpace(string(data)))
    if err != nil {
        return 0, err
    }

    // NUMA -1 表示无亲和性
    if numa < 0 {
        return 0, nil
    }

    return numa, nil
}

// discoverNICMapping 发现 NIC 映射
func (d *TopologyDiscoverer) discoverNICMapping(matrix *TopologyMatrix) {
    // 读取 InfiniBand 设备
    ibDevices, err := filepath.Glob("/sys/class/infiniband/*/device/numa_node")
    if err != nil {
        return
    }

    for _, path := range ibDevices {
        data, err := os.ReadFile(path)
        if err != nil {
            continue
        }

        numa, err := strconv.Atoi(strings.TrimSpace(string(data)))
        if err != nil || numa < 0 {
            continue
        }

        // 提取设备名
        parts := strings.Split(path, "/")
        if len(parts) >= 5 {
            deviceName := parts[4]

            // 为该 NUMA 上的 GPU 关联此 NIC
            for gpuIdx, gpuNuma := range matrix.NUMANodes {
                if gpuNuma == numa {
                    matrix.NICMapping[gpuIdx] = deviceName
                }
            }
        }
    }
}

// PrintTopology 打印拓扑信息
func (d *TopologyDiscoverer) PrintTopology(matrix *TopologyMatrix) {
    fmt.Println("GPU Topology Matrix:")
    fmt.Println("====================")

    // 打印表头
    fmt.Printf("%6s", "")
    for i := 0; i < matrix.GPUCount; i++ {
        fmt.Printf(" GPU%-3d", i)
    }
    fmt.Println()

    // 打印矩阵
    levelNames := map[TopologyLevel]string{
        TopologyLevelSame:     "X",
        TopologyLevelNVLink:   "NVL",
        TopologyLevelNVSwitch: "NVS",
        TopologyLevelPIX:      "PIX",
        TopologyLevelPHB:      "PHB",
        TopologyLevelNUMA:     "NODE",
        TopologyLevelSYS:      "SYS",
    }

    for i := 0; i < matrix.GPUCount; i++ {
        fmt.Printf("GPU%-3d", i)
        for j := 0; j < matrix.GPUCount; j++ {
            name := levelNames[matrix.Levels[i][j]]
            fmt.Printf(" %-5s", name)
        }
        fmt.Printf("  NUMA=%d", matrix.NUMANodes[i])
        if nic, ok := matrix.NICMapping[i]; ok {
            fmt.Printf("  NIC=%s", nic)
        }
        fmt.Println()
    }

    fmt.Println()
    fmt.Println("Legend:")
    fmt.Println("  X    = same device")
    fmt.Println("  NVL  = NVLink")
    fmt.Println("  NVS  = NVSwitch")
    fmt.Println("  PIX  = PCIe switch")
    fmt.Println("  PHB  = PCIe Host Bridge")
    fmt.Println("  NODE = NUMA node")
    fmt.Println("  SYS  = System interconnect")
}

2. 拓扑感知调度算法

2.1 GPU 分配策略

// pkg/scheduler/allocation.go
package scheduler

import (
    "fmt"
    "sort"
)

// AllocationPolicy GPU 分配策略
type AllocationPolicy string

const (
    // PolicyBestFit 最佳适配:选择拓扑最优的 GPU 组合
    PolicyBestFit AllocationPolicy = "BestFit"
    // PolicyFirstFit 首次适配:选择第一个满足条件的组合
    PolicyFirstFit AllocationPolicy = "FirstFit"
    // PolicySpread 分散:尽量分布在不同物理位置
    PolicySpread AllocationPolicy = "Spread"
    // PolicyPack 紧凑:尽量使用同一物理位置
    PolicyPack AllocationPolicy = "Pack"
)

// GPUAllocator GPU 分配器
type GPUAllocator struct {
    topology *TopologyMatrix
    policy   AllocationPolicy
}

// NewGPUAllocator 创建分配器
func NewGPUAllocator(topology *TopologyMatrix, policy AllocationPolicy) *GPUAllocator {
    return &GPUAllocator{
        topology: topology,
        policy:   policy,
    }
}

// Allocate 分配 GPU
func (a *GPUAllocator) Allocate(available []int, count int, constraint TopologyConstraint) ([]int, error) {
    if len(available) < count {
        return nil, fmt.Errorf("insufficient GPUs: need %d, have %d", count, len(available))
    }

    if count == 1 {
        return []int{available[0]}, nil
    }

    switch a.policy {
    case PolicyBestFit:
        return a.allocateBestFit(available, count, constraint)
    case PolicyFirstFit:
        return a.allocateFirstFit(available, count, constraint)
    case PolicySpread:
        return a.allocateSpread(available, count)
    case PolicyPack:
        return a.allocatePack(available, count)
    default:
        return a.allocateBestFit(available, count, constraint)
    }
}

// TopologyConstraint 拓扑约束
type TopologyConstraint struct {
    // 最小拓扑层级要求
    MinLevel TopologyLevel
    // 是否必须同一 NUMA
    SameNUMA bool
    // 是否必须有 NVLINK
    RequireNVLink bool
    // 最小带宽要求 (GB/s)
    MinBandwidth float64
}

// allocateBestFit 最佳适配分配
func (a *GPUAllocator) allocateBestFit(available []int, count int, constraint TopologyConstraint) ([]int, error) {
    // 生成所有可能的组合
    combinations := a.generateCombinations(available, count)

    if len(combinations) == 0 {
        return nil, fmt.Errorf("no valid combinations")
    }

    // 评估每个组合
    type scoredCombination struct {
        gpus  []int
        score float64
    }

    var valid []scoredCombination

    for _, combo := range combinations {
        // 检查约束
        if !a.checkConstraint(combo, constraint) {
            continue
        }

        // 计算分数
        score := a.scoreTopology(combo)
        valid = append(valid, scoredCombination{gpus: combo, score: score})
    }

    if len(valid) == 0 {
        return nil, fmt.Errorf("no combination satisfies constraints")
    }

    // 按分数排序(分数越高越好)
    sort.Slice(valid, func(i, j int) bool {
        return valid[i].score > valid[j].score
    })

    return valid[0].gpus, nil
}

// allocateFirstFit 首次适配
func (a *GPUAllocator) allocateFirstFit(available []int, count int, constraint TopologyConstraint) ([]int, error) {
    // 优先按 NUMA 分组
    numaGroups := make(map[int][]int)
    for _, gpu := range available {
        numa := a.topology.NUMANodes[gpu]
        numaGroups[numa] = append(numaGroups[numa], gpu)
    }

    // 首先尝试同一 NUMA
    for _, gpus := range numaGroups {
        if len(gpus) >= count {
            combo := gpus[:count]
            if a.checkConstraint(combo, constraint) {
                return combo, nil
            }
        }
    }

    // 然后尝试跨 NUMA
    if len(available) >= count {
        combo := available[:count]
        if a.checkConstraint(combo, constraint) {
            return combo, nil
        }
    }

    return nil, fmt.Errorf("no valid allocation found")
}

// allocateSpread 分散分配
func (a *GPUAllocator) allocateSpread(available []int, count int) ([]int, error) {
    // 按 NUMA 分组
    numaGroups := make(map[int][]int)
    for _, gpu := range available {
        numa := a.topology.NUMANodes[gpu]
        numaGroups[numa] = append(numaGroups[numa], gpu)
    }

    // 轮询分配
    result := make([]int, 0, count)
    numaOrder := make([]int, 0, len(numaGroups))
    for numa := range numaGroups {
        numaOrder = append(numaOrder, numa)
    }
    sort.Ints(numaOrder)

    idx := 0
    for len(result) < count {
        numa := numaOrder[idx%len(numaOrder)]
        if len(numaGroups[numa]) > 0 {
            result = append(result, numaGroups[numa][0])
            numaGroups[numa] = numaGroups[numa][1:]
        }
        idx++

        // 防止无限循环
        if idx > count*len(numaOrder) {
            break
        }
    }

    if len(result) < count {
        return nil, fmt.Errorf("cannot spread allocate %d GPUs", count)
    }

    return result, nil
}

// allocatePack 紧凑分配
func (a *GPUAllocator) allocatePack(available []int, count int) ([]int, error) {
    // 按 NUMA 分组
    numaGroups := make(map[int][]int)
    for _, gpu := range available {
        numa := a.topology.NUMANodes[gpu]
        numaGroups[numa] = append(numaGroups[numa], gpu)
    }

    // 找到包含最多 GPU 的 NUMA
    var maxNUMA int
    maxCount := 0
    for numa, gpus := range numaGroups {
        if len(gpus) > maxCount {
            maxCount = len(gpus)
            maxNUMA = numa
        }
    }

    // 优先从该 NUMA 分配
    result := make([]int, 0, count)

    if len(numaGroups[maxNUMA]) >= count {
        return numaGroups[maxNUMA][:count], nil
    }

    result = append(result, numaGroups[maxNUMA]...)

    // 从其他 NUMA 补充
    for numa, gpus := range numaGroups {
        if numa == maxNUMA {
            continue
        }
        for _, gpu := range gpus {
            if len(result) >= count {
                break
            }
            result = append(result, gpu)
        }
    }

    if len(result) < count {
        return nil, fmt.Errorf("cannot pack allocate %d GPUs", count)
    }

    return result, nil
}

// generateCombinations 生成组合
func (a *GPUAllocator) generateCombinations(elements []int, k int) [][]int {
    var result [][]int

    var combine func(start int, current []int)
    combine = func(start int, current []int) {
        if len(current) == k {
            combo := make([]int, k)
            copy(combo, current)
            result = append(result, combo)
            return
        }

        for i := start; i <= len(elements)-(k-len(current)); i++ {
            combine(i+1, append(current, elements[i]))
        }
    }

    combine(0, nil)
    return result
}

// checkConstraint 检查约束
func (a *GPUAllocator) checkConstraint(gpus []int, constraint TopologyConstraint) bool {
    if len(gpus) <= 1 {
        return true
    }

    // 检查 NUMA 约束
    if constraint.SameNUMA {
        numa := a.topology.NUMANodes[gpus[0]]
        for _, gpu := range gpus[1:] {
            if a.topology.NUMANodes[gpu] != numa {
                return false
            }
        }
    }

    // 检查最小拓扑层级
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            level := a.topology.Levels[gpus[i]][gpus[j]]
            if level > constraint.MinLevel {
                return false
            }
        }
    }

    // 检查 NVLINK 要求
    if constraint.RequireNVLink {
        for i := 0; i < len(gpus); i++ {
            for j := i + 1; j < len(gpus); j++ {
                level := a.topology.Levels[gpus[i]][gpus[j]]
                if level != TopologyLevelNVLink && level != TopologyLevelNVSwitch {
                    return false
                }
            }
        }
    }

    // 检查最小带宽
    if constraint.MinBandwidth > 0 {
        minBW := a.topology.GetMinBandwidth(gpus)
        if minBW < constraint.MinBandwidth {
            return false
        }
    }

    return true
}

// scoreTopology 计算拓扑评分
func (a *GPUAllocator) scoreTopology(gpus []int) float64 {
    if len(gpus) <= 1 {
        return 100.0
    }

    // 计算平均带宽
    totalBW := 0.0
    count := 0
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            totalBW += a.topology.GetBandwidth(gpus[i], gpus[j])
            count++
        }
    }
    avgBW := totalBW / float64(count)

    // 归一化到 [0, 100]
    // 假设最大带宽 600 GB/s (NVSwitch)
    score := avgBW / 600.0 * 100.0
    if score > 100.0 {
        score = 100.0
    }

    return score
}

2.2 NCCL 拓扑优化

// pkg/scheduler/nccl.go
package scheduler

import (
    "fmt"
    "strings"
)

// NCCLTopologyGenerator NCCL 拓扑配置生成器
type NCCLTopologyGenerator struct {
    topology *TopologyMatrix
}

// NewNCCLTopologyGenerator 创建生成器
func NewNCCLTopologyGenerator(topology *TopologyMatrix) *NCCLTopologyGenerator {
    return &NCCLTopologyGenerator{topology: topology}
}

// GenerateNCCLEnv 生成 NCCL 环境变量
func (g *NCCLTopologyGenerator) GenerateNCCLEnv(gpus []int) map[string]string {
    env := make(map[string]string)

    // 基础配置
    env["NCCL_DEBUG"] = "INFO"
    env["NCCL_DEBUG_SUBSYS"] = "INIT,COLL"

    // 根据拓扑配置 NCCL
    hasNVLink := g.hasNVLinkBetween(gpus)
    hasNVSwitch := g.hasNVSwitch(gpus)
    sameNUMA := g.isSameNUMA(gpus)

    if hasNVSwitch {
        // NVSwitch 环境
        env["NCCL_NVLS_ENABLE"] = "1"          // 启用 NVLink SHARP
        env["NCCL_ALGO"] = "Ring,Tree"         // 使用 Ring 和 Tree 算法
        env["NCCL_PROTO"] = "LL,LL128,Simple"  // 使用所有协议
    } else if hasNVLink {
        // NVLINK 环境
        env["NCCL_P2P_LEVEL"] = "NVL"
        env["NCCL_ALGO"] = "Ring"
        env["NCCL_PROTO"] = "LL,LL128"
    } else {
        // PCIe 环境
        env["NCCL_P2P_LEVEL"] = "PHB"
        env["NCCL_ALGO"] = "Ring"
        env["NCCL_PROTO"] = "Simple"
    }

    // NUMA 优化
    if sameNUMA {
        env["NCCL_SHM_USE_CUDA_MEMCPY"] = "0"
    } else {
        env["NCCL_SHM_USE_CUDA_MEMCPY"] = "1"
    }

    // 设置 CUDA 可见设备
    cudaDevices := make([]string, len(gpus))
    for i, gpu := range gpus {
        cudaDevices[i] = fmt.Sprintf("%d", gpu)
    }
    env["CUDA_VISIBLE_DEVICES"] = strings.Join(cudaDevices, ",")

    return env
}

// GenerateNCCLTopologyFile 生成 NCCL 拓扑文件
func (g *NCCLTopologyGenerator) GenerateNCCLTopologyFile(gpus []int) string {
    var sb strings.Builder

    sb.WriteString("# NCCL Topology File\n")
    sb.WriteString("# Generated by GPU Scheduler\n\n")

    sb.WriteString("<system version=\"1\">\n")

    // CPU 信息
    numaNums := make(map[int]bool)
    for _, gpu := range gpus {
        numa := g.topology.NUMANodes[gpu]
        numaNums[numa] = true
    }

    for numa := range numaNums {
        sb.WriteString(fmt.Sprintf("  <cpu numaid=\"%d\" affinity=\"0x%x\" arch=\"x86_64\" vendor=\"AuthenticAMD\">\n",
            numa, 1<<numa))

        // 该 NUMA 上的 GPU
        for _, gpu := range gpus {
            if g.topology.NUMANodes[gpu] == numa {
                sb.WriteString(fmt.Sprintf("    <pci busid=\"%s\" class=\"0x030200\" link_speed=\"16 GT/s\" link_width=\"16\">\n",
                    g.getGPUBusID(gpu)))
                sb.WriteString(fmt.Sprintf("      <gpu dev=\"%d\" sm=\"80\" rank=\"%d\"/>\n", gpu, gpu))
                sb.WriteString("    </pci>\n")
            }
        }

        sb.WriteString("  </cpu>\n")
    }

    // NVLINK 连接
    sb.WriteString("  <nvlink>\n")
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            if g.isNVLinkConnected(gpus[i], gpus[j]) {
                sb.WriteString(fmt.Sprintf("    <link gpu=\"%d\" tgpu=\"%d\" count=\"12\" bw=\"300\"/>\n",
                    gpus[i], gpus[j]))
            }
        }
    }
    sb.WriteString("  </nvlink>\n")

    sb.WriteString("</system>\n")

    return sb.String()
}

// hasNVLinkBetween 检查 GPU 组是否有 NVLINK 连接
func (g *NCCLTopologyGenerator) hasNVLinkBetween(gpus []int) bool {
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            if g.isNVLinkConnected(gpus[i], gpus[j]) {
                return true
            }
        }
    }
    return false
}

// hasNVSwitch 检查是否使用 NVSwitch
func (g *NCCLTopologyGenerator) hasNVSwitch(gpus []int) bool {
    // NVSwitch 特征:所有 GPU 之间都有 NVLINK
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            level := g.topology.Levels[gpus[i]][gpus[j]]
            if level != TopologyLevelNVSwitch && level != TopologyLevelNVLink {
                return false
            }
        }
    }
    return true
}

// isSameNUMA 检查是否在同一 NUMA
func (g *NCCLTopologyGenerator) isSameNUMA(gpus []int) bool {
    if len(gpus) <= 1 {
        return true
    }
    numa := g.topology.NUMANodes[gpus[0]]
    for _, gpu := range gpus[1:] {
        if g.topology.NUMANodes[gpu] != numa {
            return false
        }
    }
    return true
}

// isNVLinkConnected 检查两个 GPU 是否 NVLINK 连接
func (g *NCCLTopologyGenerator) isNVLinkConnected(gpu1, gpu2 int) bool {
    level := g.topology.Levels[gpu1][gpu2]
    return level == TopologyLevelNVLink || level == TopologyLevelNVSwitch
}

// getGPUBusID 获取 GPU PCIe Bus ID
func (g *NCCLTopologyGenerator) getGPUBusID(gpu int) string {
    // 实际实现应从 NVML 获取
    return fmt.Sprintf("0000:%02x:00.0", gpu*8+1)
}

2.3 Ring 拓扑优化

// pkg/scheduler/ring.go
package scheduler

import (
    "fmt"
    "math"
)

// RingOptimizer Ring 拓扑优化器
type RingOptimizer struct {
    topology *TopologyMatrix
}

// NewRingOptimizer 创建 Ring 优化器
func NewRingOptimizer(topology *TopologyMatrix) *RingOptimizer {
    return &RingOptimizer{topology: topology}
}

// OptimizeRing 优化 GPU Ring 顺序
// 返回使 Ring 通信带宽最大的 GPU 排列顺序
func (o *RingOptimizer) OptimizeRing(gpus []int) []int {
    if len(gpus) <= 2 {
        return gpus
    }

    // 使用贪心算法构建 Ring
    // 从任意节点开始,每次选择与当前节点带宽最大的下一个节点
    result := make([]int, 0, len(gpus))
    remaining := make(map[int]bool)
    for _, gpu := range gpus {
        remaining[gpu] = true
    }

    // 从第一个 GPU 开始
    current := gpus[0]
    result = append(result, current)
    delete(remaining, current)

    for len(remaining) > 0 {
        // 找带宽最大的下一个节点
        bestNext := -1
        bestBW := float64(-1)

        for gpu := range remaining {
            bw := o.topology.GetBandwidth(current, gpu)
            if bw > bestBW {
                bestBW = bw
                bestNext = gpu
            }
        }

        result = append(result, bestNext)
        delete(remaining, bestNext)
        current = bestNext
    }

    // 使用 2-opt 局部优化
    result = o.twoOpt(result)

    return result
}

// twoOpt 2-opt 局部搜索优化
func (o *RingOptimizer) twoOpt(ring []int) []int {
    n := len(ring)
    improved := true

    for improved {
        improved = false
        for i := 0; i < n-1; i++ {
            for j := i + 2; j < n; j++ {
                // 计算当前代价
                currentCost := o.edgeCost(ring[i], ring[i+1]) +
                    o.edgeCost(ring[j], ring[(j+1)%n])

                // 计算交换后代价
                newCost := o.edgeCost(ring[i], ring[j]) +
                    o.edgeCost(ring[i+1], ring[(j+1)%n])

                if newCost < currentCost {
                    // 反转 [i+1, j] 区间
                    for l, r := i+1, j; l < r; l, r = l+1, r-1 {
                        ring[l], ring[r] = ring[r], ring[l]
                    }
                    improved = true
                }
            }
        }
    }

    return ring
}

// edgeCost 计算边的代价(带宽的倒数)
func (o *RingOptimizer) edgeCost(gpu1, gpu2 int) float64 {
    bw := o.topology.GetBandwidth(gpu1, gpu2)
    if bw <= 0 {
        return math.MaxFloat64
    }
    return 1.0 / bw
}

// GetRingBandwidth 获取 Ring 的有效带宽(最小边带宽)
func (o *RingOptimizer) GetRingBandwidth(ring []int) float64 {
    if len(ring) < 2 {
        return 0
    }

    minBW := math.MaxFloat64
    for i := 0; i < len(ring); i++ {
        next := (i + 1) % len(ring)
        bw := o.topology.GetBandwidth(ring[i], ring[next])
        if bw < minBW {
            minBW = bw
        }
    }

    return minBW
}

// CalculateAllReduceBandwidth 计算 AllReduce 有效带宽
// 使用 Ring AllReduce 算法
func (o *RingOptimizer) CalculateAllReduceBandwidth(ring []int, dataSize int64) float64 {
    if len(ring) < 2 {
        return 0
    }

    n := len(ring)
    ringBW := o.GetRingBandwidth(ring)

    // Ring AllReduce 公式:
    // 时间 = 2 * (n-1) * dataSize / (n * bandwidth)
    // 有效带宽 = dataSize / 时间 = n * bandwidth / (2 * (n-1))

    effectiveBW := float64(n) * ringBW / (2.0 * float64(n-1))
    return effectiveBW
}

// PrintRing 打印 Ring 信息
func (o *RingOptimizer) PrintRing(ring []int) {
    fmt.Printf("Ring order: ")
    for i, gpu := range ring {
        if i > 0 {
            bw := o.topology.GetBandwidth(ring[i-1], gpu)
            fmt.Printf(" --%.0fGB/s--> ", bw)
        }
        fmt.Printf("GPU%d", gpu)
    }
    // 打印首尾连接
    bw := o.topology.GetBandwidth(ring[len(ring)-1], ring[0])
    fmt.Printf(" --%.0fGB/s--> GPU%d\n", bw, ring[0])

    fmt.Printf("Ring bandwidth: %.0f GB/s\n", o.GetRingBandwidth(ring))
}

3. Kubernetes 拓扑感知调度实现

3.1 Topology Manager 集成

# kubelet-config.yaml
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
# 启用 Topology Manager
topologyManagerPolicy: "single-numa-node"  # 或 "best-effort", "restricted", "none"
topologyManagerScope: "container"  # 或 "pod"
# CPU Manager
cpuManagerPolicy: "static"
cpuManagerReconcilePeriod: "10s"
# Memory Manager (可选)
memoryManagerPolicy: "Static"
reservedMemory:
  - numaNode: 0
    limits:
      memory: 1Gi
  - numaNode: 1
    limits:
      memory: 1Gi

3.2 扩展 Device Plugin 支持拓扑

// pkg/deviceplugin/topology.go
package deviceplugin

import (
    "context"

    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// TopologyAwareDevicePlugin 拓扑感知 Device Plugin
type TopologyAwareDevicePlugin struct {
    BaseDevicePlugin
    topology *TopologyMatrix
}

// GetDevicePluginOptions 返回设备选项
func (p *TopologyAwareDevicePlugin) GetDevicePluginOptions(ctx context.Context,
    _ *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
    return &pluginapi.DevicePluginOptions{
        // 请求优先设备分配
        PreStartRequired:                true,
        GetPreferredAllocationAvailable: true,
    }, nil
}

// GetPreferredAllocation 返回首选的设备分配
func (p *TopologyAwareDevicePlugin) GetPreferredAllocation(ctx context.Context,
    req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {

    resp := &pluginapi.PreferredAllocationResponse{}

    for _, containerReq := range req.ContainerRequests {
        // 获取可用设备
        available := containerReq.AvailableDeviceIDs
        mustInclude := containerReq.MustIncludeDeviceIDs
        count := int(containerReq.AllocationSize)

        // 转换为 GPU 索引
        availableIndices := p.deviceIDsToIndices(available)
        mustIncludeIndices := p.deviceIDsToIndices(mustInclude)

        // 使用拓扑感知分配
        allocator := NewGPUAllocator(p.topology, PolicyBestFit)

        // 合并必须包含的设备
        var toAllocate []int
        if len(mustIncludeIndices) > 0 {
            toAllocate = mustIncludeIndices
            count -= len(mustIncludeIndices)
            // 从可用中移除已包含的
            availableIndices = p.removeIndices(availableIndices, mustIncludeIndices)
        }

        if count > 0 {
            constraint := TopologyConstraint{
                MinLevel:      TopologyLevelPHB,
                RequireNVLink: true, // 尽量使用 NVLINK
            }

            allocated, err := allocator.Allocate(availableIndices, count, constraint)
            if err != nil {
                // 降级:不要求 NVLINK
                constraint.RequireNVLink = false
                allocated, err = allocator.Allocate(availableIndices, count, constraint)
            }

            if err == nil {
                toAllocate = append(toAllocate, allocated...)
            }
        }

        // 转换回设备 ID
        deviceIDs := p.indicesToDeviceIDs(toAllocate)

        resp.ContainerResponses = append(resp.ContainerResponses,
            &pluginapi.ContainerPreferredAllocationResponse{
                DeviceIDs: deviceIDs,
            })
    }

    return resp, nil
}

// Allocate 分配设备
func (p *TopologyAwareDevicePlugin) Allocate(ctx context.Context,
    req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {

    resp := &pluginapi.AllocateResponse{}

    for _, containerReq := range req.ContainerRequests {
        deviceIDs := containerReq.DevicesIDs
        gpuIndices := p.deviceIDsToIndices(deviceIDs)

        // 生成 NCCL 配置
        ncclGen := NewNCCLTopologyGenerator(p.topology)
        ncclEnv := ncclGen.GenerateNCCLEnv(gpuIndices)

        // 优化 Ring 顺序
        ringOpt := NewRingOptimizer(p.topology)
        optimizedRing := ringOpt.OptimizeRing(gpuIndices)

        // 生成拓扑文件
        topoFile := ncclGen.GenerateNCCLTopologyFile(optimizedRing)

        containerResp := &pluginapi.ContainerAllocateResponse{
            Envs:        ncclEnv,
            Annotations: make(map[string]string),
        }

        // 添加拓扑注解
        containerResp.Annotations["gpu.topology/ring-order"] =
            p.indicesToString(optimizedRing)
        containerResp.Annotations["gpu.topology/bandwidth"] =
            fmt.Sprintf("%.0f", ringOpt.GetRingBandwidth(optimizedRing))

        // 挂载拓扑文件
        containerResp.Mounts = append(containerResp.Mounts, &pluginapi.Mount{
            ContainerPath: "/etc/nccl-topo.xml",
            HostPath:      p.writeTopoFile(topoFile),
            ReadOnly:      true,
        })

        // NVIDIA 运行时所需的设备
        containerResp.Envs["NVIDIA_VISIBLE_DEVICES"] = strings.Join(deviceIDs, ",")

        resp.ContainerResponses = append(resp.ContainerResponses, containerResp)
    }

    return resp, nil
}

// deviceIDsToIndices 设备 ID 转索引
func (p *TopologyAwareDevicePlugin) deviceIDsToIndices(ids []string) []int {
    indices := make([]int, len(ids))
    for i, id := range ids {
        // 假设 ID 格式为 "GPU-{uuid}" 或直接是索引
        indices[i] = p.getGPUIndex(id)
    }
    return indices
}

// indicesToDeviceIDs 索引转设备 ID
func (p *TopologyAwareDevicePlugin) indicesToDeviceIDs(indices []int) []string {
    ids := make([]string, len(indices))
    for i, idx := range indices {
        ids[i] = p.getDeviceID(idx)
    }
    return ids
}

3.3 调度器插件中的拓扑处理

// pkg/scheduler/topology_plugin.go
package scheduler

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
    TopologyPluginName = "GPUTopologyAware"

    // 注解
    AnnotationTopologyPolicy = "gpu.topology/policy"
    AnnotationMinBandwidth   = "gpu.topology/min-bandwidth"
    AnnotationRequireNVLink  = "gpu.topology/require-nvlink"

    // 拓扑策略
    TopologyPolicySingleNUMA = "single-numa"
    TopologyPolicyNVLink     = "nvlink"
    TopologyPolicyBestEffort = "best-effort"
    TopologyPolicySpread     = "spread"
)

// TopologyAwarePlugin 拓扑感知调度插件
type TopologyAwarePlugin struct {
    handle           framework.Handle
    topologyManager  *TopologyManager
}

// TopologyManager 拓扑管理器
type TopologyManager struct {
    // 节点拓扑缓存
    nodeTopologies map[string]*TopologyMatrix
}

// PreFilter 预过滤
func (p *TopologyAwarePlugin) PreFilter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {

    // 解析拓扑策略
    policy := p.parseTopologyPolicy(pod)

    // 存储到状态
    state.Write("TopologyPolicy", policy)

    return nil, framework.NewStatus(framework.Success)
}

// Filter 过滤
func (p *TopologyAwarePlugin) Filter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {

    // 获取 GPU 请求数量
    gpuCount := p.getGPURequest(pod)
    if gpuCount == 0 {
        return framework.NewStatus(framework.Success)
    }

    // 获取拓扑策略
    policyData, err := state.Read("TopologyPolicy")
    if err != nil {
        return framework.NewStatus(framework.Success)
    }
    policy := policyData.(*TopologyPolicy)

    // 获取节点拓扑
    nodeName := nodeInfo.Node().Name
    topo := p.topologyManager.GetTopology(nodeName)
    if topo == nil {
        // 无拓扑信息,使用默认行为
        return framework.NewStatus(framework.Success)
    }

    // 获取可用 GPU
    availableGPUs := p.getAvailableGPUs(nodeInfo)

    // 检查是否能满足拓扑约束
    allocator := NewGPUAllocator(topo, PolicyBestFit)
    _, err = allocator.Allocate(availableGPUs, int(gpuCount), policy.Constraint)
    if err != nil {
        return framework.NewStatus(framework.Unschedulable,
            fmt.Sprintf("topology constraint cannot be satisfied: %v", err))
    }

    return framework.NewStatus(framework.Success)
}

// Score 打分
func (p *TopologyAwarePlugin) Score(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeName string) (int64, *framework.Status) {

    gpuCount := p.getGPURequest(pod)
    if gpuCount == 0 {
        return 50, framework.NewStatus(framework.Success)
    }

    // 获取节点拓扑
    topo := p.topologyManager.GetTopology(nodeName)
    if topo == nil {
        return 50, framework.NewStatus(framework.Success)
    }

    // 计算拓扑得分
    score := p.calculateTopologyScore(topo, int(gpuCount))

    return score, framework.NewStatus(framework.Success)
}

// TopologyPolicy 拓扑策略
type TopologyPolicy struct {
    Policy     string
    Constraint TopologyConstraint
}

// parseTopologyPolicy 解析拓扑策略
func (p *TopologyAwarePlugin) parseTopologyPolicy(pod *v1.Pod) *TopologyPolicy {
    policy := &TopologyPolicy{
        Policy: TopologyPolicyBestEffort,
        Constraint: TopologyConstraint{
            MinLevel: TopologyLevelSYS,
        },
    }

    if pod.Annotations == nil {
        return policy
    }

    // 解析策略
    if policyStr, ok := pod.Annotations[AnnotationTopologyPolicy]; ok {
        policy.Policy = policyStr

        switch policyStr {
        case TopologyPolicySingleNUMA:
            policy.Constraint.SameNUMA = true
        case TopologyPolicyNVLink:
            policy.Constraint.RequireNVLink = true
        }
    }

    // 解析最小带宽
    if bwStr, ok := pod.Annotations[AnnotationMinBandwidth]; ok {
        var bw float64
        fmt.Sscanf(bwStr, "%f", &bw)
        policy.Constraint.MinBandwidth = bw
    }

    // 解析 NVLINK 要求
    if nvlink, ok := pod.Annotations[AnnotationRequireNVLink]; ok && nvlink == "true" {
        policy.Constraint.RequireNVLink = true
    }

    return policy
}

// calculateTopologyScore 计算拓扑得分
func (p *TopologyAwarePlugin) calculateTopologyScore(topo *TopologyMatrix, gpuCount int) int64 {
    // 基于可用 GPU 的拓扑质量评分

    // 收集可用 GPU 索引
    availableGPUs := make([]int, topo.GPUCount)
    for i := 0; i < topo.GPUCount; i++ {
        availableGPUs[i] = i
    }

    // 使用最佳适配找到最优组合
    allocator := NewGPUAllocator(topo, PolicyBestFit)
    allocated, err := allocator.Allocate(availableGPUs, gpuCount, TopologyConstraint{})
    if err != nil {
        return 0
    }

    // 计算分配组合的带宽得分
    minBW := topo.GetMinBandwidth(allocated)

    // 归一化到 [0, 100]
    // 基准:NVSwitch 600 GB/s = 100 分
    score := int64(minBW / 6.0)
    if score > 100 {
        score = 100
    }

    return score
}

// getGPURequest 获取 GPU 请求数量
func (p *TopologyAwarePlugin) getGPURequest(pod *v1.Pod) int64 {
    var total int64
    for _, c := range pod.Spec.Containers {
        if c.Resources.Requests != nil {
            if gpu, ok := c.Resources.Requests["nvidia.com/gpu"]; ok {
                total += gpu.Value()
            }
        }
    }
    return total
}

// getAvailableGPUs 获取节点可用 GPU
func (p *TopologyAwarePlugin) getAvailableGPUs(nodeInfo *framework.NodeInfo) []int {
    // 实际实现需要从 Device Plugin 或 CRD 获取
    // 这里简化处理
    var available []int
    allocatable := nodeInfo.Allocatable
    if gpuRes, ok := allocatable.ScalarResources["nvidia.com/gpu"]; ok {
        for i := 0; i < int(gpuRes); i++ {
            available = append(available, i)
        }
    }
    return available
}

4. 跨节点拓扑感知

4.1 多节点 GPU 拓扑

// pkg/topology/cluster.go
package topology

import (
    "fmt"
    "sort"
)

// ClusterTopology 集群拓扑
type ClusterTopology struct {
    // 节点拓扑
    NodeTopologies map[string]*TopologyMatrix
    // 节点间网络拓扑
    NetworkTopology *NetworkTopology
}

// NetworkTopology 网络拓扑
type NetworkTopology struct {
    // 节点数量
    NodeCount int
    // 网络带宽矩阵 (GB/s)
    Bandwidth [][]float64
    // 网络延迟矩阵 (μs)
    Latency [][]float64
    // 节点名映射
    NodeIndex map[string]int
}

// NewClusterTopology 创建集群拓扑
func NewClusterTopology() *ClusterTopology {
    return &ClusterTopology{
        NodeTopologies:  make(map[string]*TopologyMatrix),
        NetworkTopology: &NetworkTopology{
            NodeIndex: make(map[string]int),
        },
    }
}

// AddNode 添加节点
func (c *ClusterTopology) AddNode(nodeName string, topo *TopologyMatrix) {
    c.NodeTopologies[nodeName] = topo

    // 更新网络拓扑
    if _, exists := c.NetworkTopology.NodeIndex[nodeName]; !exists {
        idx := c.NetworkTopology.NodeCount
        c.NetworkTopology.NodeIndex[nodeName] = idx
        c.NetworkTopology.NodeCount++

        // 扩展带宽矩阵
        newBW := make([][]float64, c.NetworkTopology.NodeCount)
        for i := range newBW {
            newBW[i] = make([]float64, c.NetworkTopology.NodeCount)
            if i < len(c.NetworkTopology.Bandwidth) {
                copy(newBW[i], c.NetworkTopology.Bandwidth[i])
            }
        }
        c.NetworkTopology.Bandwidth = newBW

        // 扩展延迟矩阵
        newLatency := make([][]float64, c.NetworkTopology.NodeCount)
        for i := range newLatency {
            newLatency[i] = make([]float64, c.NetworkTopology.NodeCount)
            if i < len(c.NetworkTopology.Latency) {
                copy(newLatency[i], c.NetworkTopology.Latency[i])
            }
        }
        c.NetworkTopology.Latency = newLatency
    }
}

// SetNetworkConnection 设置节点间网络连接
func (c *ClusterTopology) SetNetworkConnection(node1, node2 string, bandwidth, latency float64) {
    idx1, ok1 := c.NetworkTopology.NodeIndex[node1]
    idx2, ok2 := c.NetworkTopology.NodeIndex[node2]

    if !ok1 || !ok2 {
        return
    }

    c.NetworkTopology.Bandwidth[idx1][idx2] = bandwidth
    c.NetworkTopology.Bandwidth[idx2][idx1] = bandwidth
    c.NetworkTopology.Latency[idx1][idx2] = latency
    c.NetworkTopology.Latency[idx2][idx1] = latency
}

// GPULocation GPU 位置
type GPULocation struct {
    NodeName string
    GPUIndex int
}

// GetInterNodeBandwidth 获取跨节点 GPU 间带宽
func (c *ClusterTopology) GetInterNodeBandwidth(loc1, loc2 GPULocation) float64 {
    if loc1.NodeName == loc2.NodeName {
        // 同节点,使用节点内拓扑
        topo := c.NodeTopologies[loc1.NodeName]
        if topo != nil {
            return topo.GetBandwidth(loc1.GPUIndex, loc2.GPUIndex)
        }
        return 0
    }

    // 跨节点,使用网络拓扑
    idx1, ok1 := c.NetworkTopology.NodeIndex[loc1.NodeName]
    idx2, ok2 := c.NetworkTopology.NodeIndex[loc2.NodeName]

    if !ok1 || !ok2 {
        return 0
    }

    return c.NetworkTopology.Bandwidth[idx1][idx2]
}

// MultiNodeAllocation 多节点分配结果
type MultiNodeAllocation struct {
    // 节点分配
    NodeAllocations map[string][]int
    // 总 GPU 数
    TotalGPUs int
    // 最小带宽
    MinBandwidth float64
    // Ring 顺序
    RingOrder []GPULocation
}

// MultiNodeAllocator 多节点分配器
type MultiNodeAllocator struct {
    clusterTopo *ClusterTopology
}

// NewMultiNodeAllocator 创建多节点分配器
func NewMultiNodeAllocator(clusterTopo *ClusterTopology) *MultiNodeAllocator {
    return &MultiNodeAllocator{clusterTopo: clusterTopo}
}

// Allocate 分配 GPU
func (a *MultiNodeAllocator) Allocate(totalGPUs int, constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {
    // 收集所有节点的可用 GPU
    nodeGPUs := make(map[string]int)
    for nodeName, topo := range a.clusterTopo.NodeTopologies {
        nodeGPUs[nodeName] = topo.GPUCount
    }

    // 根据策略分配
    switch constraint.Policy {
    case "pack":
        return a.allocatePack(totalGPUs, nodeGPUs, constraint)
    case "spread":
        return a.allocateSpread(totalGPUs, nodeGPUs, constraint)
    default:
        return a.allocateBestFit(totalGPUs, nodeGPUs, constraint)
    }
}

// MultiNodeConstraint 多节点约束
type MultiNodeConstraint struct {
    // 分配策略
    Policy string
    // 最大节点数
    MaxNodes int
    // 单节点最小 GPU 数
    MinGPUsPerNode int
    // 最小网络带宽 (GB/s)
    MinNetworkBandwidth float64
}

// allocateBestFit 最佳适配分配
func (a *MultiNodeAllocator) allocateBestFit(totalGPUs int, nodeGPUs map[string]int,
    constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {

    // 按 GPU 数量排序节点
    type nodeInfo struct {
        name string
        gpus int
    }
    nodes := make([]nodeInfo, 0, len(nodeGPUs))
    for name, gpus := range nodeGPUs {
        nodes = append(nodes, nodeInfo{name, gpus})
    }
    sort.Slice(nodes, func(i, j int) bool {
        return nodes[i].gpus > nodes[j].gpus
    })

    result := &MultiNodeAllocation{
        NodeAllocations: make(map[string][]int),
    }

    remaining := totalGPUs
    nodeCount := 0

    for _, node := range nodes {
        if remaining <= 0 {
            break
        }

        if constraint.MaxNodes > 0 && nodeCount >= constraint.MaxNodes {
            break
        }

        // 计算分配数量
        allocate := min(remaining, node.gpus)

        // 检查单节点最小 GPU 数
        if constraint.MinGPUsPerNode > 0 && allocate < constraint.MinGPUsPerNode {
            continue
        }

        // 节点内最优分配
        topo := a.clusterTopo.NodeTopologies[node.name]
        allocator := NewGPUAllocator(topo, PolicyBestFit)

        available := make([]int, topo.GPUCount)
        for i := range available {
            available[i] = i
        }

        allocated, err := allocator.Allocate(available, allocate, TopologyConstraint{
            RequireNVLink: true, // 优先 NVLINK
        })
        if err != nil {
            // 降级
            allocated, _ = allocator.Allocate(available, allocate, TopologyConstraint{})
        }

        result.NodeAllocations[node.name] = allocated
        remaining -= len(allocated)
        nodeCount++
    }

    if remaining > 0 {
        return nil, fmt.Errorf("cannot allocate %d GPUs, %d remaining", totalGPUs, remaining)
    }

    // 计算总 GPU 数和最小带宽
    result.TotalGPUs = totalGPUs
    result.MinBandwidth = a.calculateMinBandwidth(result)

    // 优化 Ring 顺序
    result.RingOrder = a.optimizeMultiNodeRing(result)

    return result, nil
}

// allocatePack 紧凑分配(尽量少用节点)
func (a *MultiNodeAllocator) allocatePack(totalGPUs int, nodeGPUs map[string]int,
    constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {

    // 找能容纳所有 GPU 的最小节点集合
    // 使用贪心算法

    type nodeInfo struct {
        name string
        gpus int
    }
    nodes := make([]nodeInfo, 0, len(nodeGPUs))
    for name, gpus := range nodeGPUs {
        nodes = append(nodes, nodeInfo{name, gpus})
    }
    // 按 GPU 数量降序
    sort.Slice(nodes, func(i, j int) bool {
        return nodes[i].gpus > nodes[j].gpus
    })

    result := &MultiNodeAllocation{
        NodeAllocations: make(map[string][]int),
    }

    remaining := totalGPUs

    for _, node := range nodes {
        if remaining <= 0 {
            break
        }

        allocate := min(remaining, node.gpus)

        // 检查网络带宽
        if len(result.NodeAllocations) > 0 && constraint.MinNetworkBandwidth > 0 {
            // 检查与已分配节点的网络连接
            meetsRequirement := true
            for existingNode := range result.NodeAllocations {
                idx1 := a.clusterTopo.NetworkTopology.NodeIndex[existingNode]
                idx2 := a.clusterTopo.NetworkTopology.NodeIndex[node.name]
                bw := a.clusterTopo.NetworkTopology.Bandwidth[idx1][idx2]
                if bw < constraint.MinNetworkBandwidth {
                    meetsRequirement = false
                    break
                }
            }
            if !meetsRequirement {
                continue
            }
        }

        topo := a.clusterTopo.NodeTopologies[node.name]
        available := make([]int, topo.GPUCount)
        for i := range available {
            available[i] = i
        }

        allocator := NewGPUAllocator(topo, PolicyPack)
        allocated, _ := allocator.Allocate(available, allocate, TopologyConstraint{})

        result.NodeAllocations[node.name] = allocated
        remaining -= len(allocated)
    }

    if remaining > 0 {
        return nil, fmt.Errorf("pack allocation failed, %d GPUs remaining", remaining)
    }

    result.TotalGPUs = totalGPUs
    result.MinBandwidth = a.calculateMinBandwidth(result)
    result.RingOrder = a.optimizeMultiNodeRing(result)

    return result, nil
}

// allocateSpread 分散分配
func (a *MultiNodeAllocator) allocateSpread(totalGPUs int, nodeGPUs map[string]int,
    constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {

    result := &MultiNodeAllocation{
        NodeAllocations: make(map[string][]int),
    }

    // 计算每节点分配数量
    nodeCount := len(nodeGPUs)
    if constraint.MaxNodes > 0 && nodeCount > constraint.MaxNodes {
        nodeCount = constraint.MaxNodes
    }

    gpusPerNode := totalGPUs / nodeCount
    extraGPUs := totalGPUs % nodeCount

    i := 0
    for nodeName := range nodeGPUs {
        if i >= nodeCount {
            break
        }

        allocate := gpusPerNode
        if i < extraGPUs {
            allocate++
        }

        topo := a.clusterTopo.NodeTopologies[nodeName]
        available := make([]int, topo.GPUCount)
        for j := range available {
            available[j] = j
        }

        allocator := NewGPUAllocator(topo, PolicyBestFit)
        allocated, err := allocator.Allocate(available, allocate, TopologyConstraint{})
        if err != nil {
            continue
        }

        result.NodeAllocations[nodeName] = allocated
        i++
    }

    result.TotalGPUs = totalGPUs
    result.MinBandwidth = a.calculateMinBandwidth(result)
    result.RingOrder = a.optimizeMultiNodeRing(result)

    return result, nil
}

// calculateMinBandwidth 计算最小带宽
func (a *MultiNodeAllocator) calculateMinBandwidth(alloc *MultiNodeAllocation) float64 {
    minBW := float64(1e9)

    // 节点内带宽
    for nodeName, gpus := range alloc.NodeAllocations {
        topo := a.clusterTopo.NodeTopologies[nodeName]
        bw := topo.GetMinBandwidth(gpus)
        if bw < minBW {
            minBW = bw
        }
    }

    // 跨节点带宽
    nodes := make([]string, 0, len(alloc.NodeAllocations))
    for nodeName := range alloc.NodeAllocations {
        nodes = append(nodes, nodeName)
    }

    for i := 0; i < len(nodes); i++ {
        for j := i + 1; j < len(nodes); j++ {
            idx1 := a.clusterTopo.NetworkTopology.NodeIndex[nodes[i]]
            idx2 := a.clusterTopo.NetworkTopology.NodeIndex[nodes[j]]
            bw := a.clusterTopo.NetworkTopology.Bandwidth[idx1][idx2]
            if bw < minBW {
                minBW = bw
            }
        }
    }

    return minBW
}

// optimizeMultiNodeRing 优化多节点 Ring
func (a *MultiNodeAllocator) optimizeMultiNodeRing(alloc *MultiNodeAllocation) []GPULocation {
    var ring []GPULocation

    // 收集所有 GPU 位置
    for nodeName, gpus := range alloc.NodeAllocations {
        for _, gpu := range gpus {
            ring = append(ring, GPULocation{NodeName: nodeName, GPUIndex: gpu})
        }
    }

    // 优化 Ring 顺序
    // 策略:尽量减少跨节点通信

    // 按节点分组
    nodeGroups := make(map[string][]GPULocation)
    for _, loc := range ring {
        nodeGroups[loc.NodeName] = append(nodeGroups[loc.NodeName], loc)
    }

    // 节点内优化
    for nodeName, locs := range nodeGroups {
        topo := a.clusterTopo.NodeTopologies[nodeName]
        gpuIndices := make([]int, len(locs))
        for i, loc := range locs {
            gpuIndices[i] = loc.GPUIndex
        }

        ringOpt := NewRingOptimizer(topo)
        optimizedIndices := ringOpt.OptimizeRing(gpuIndices)

        for i, idx := range optimizedIndices {
            nodeGroups[nodeName][i].GPUIndex = idx
        }
    }

    // 按节点顺序排列
    result := make([]GPULocation, 0, len(ring))
    for _, locs := range nodeGroups {
        result = append(result, locs...)
    }

    return result
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

4.2 分布式训练调度

# distributed-training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: distributed-training
  namespace: ml-workloads
  annotations:
    # 拓扑感知调度
    gpu.topology/policy: "nvlink"
    gpu.topology/min-bandwidth: "100"
    # 多节点配置
    gpu.topology/multi-node-policy: "pack"
    gpu.topology/max-nodes: "2"
    gpu.topology/min-gpus-per-node: "4"
spec:
  parallelism: 2
  completions: 2
  template:
    metadata:
      labels:
        job-name: distributed-training
    spec:
      schedulerName: gpu-scheduler
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchLabels:
                  job-name: distributed-training
              topologyKey: kubernetes.io/hostname
      containers:
        - name: trainer
          image: pytorch/pytorch:2.0-cuda12.1-cudnn8-runtime
          command:
            - torchrun
            - --nproc_per_node=4
            - --nnodes=2
            - --node_rank=$(POD_INDEX)
            - --master_addr=$(MASTER_ADDR)
            - --master_port=29500
            - train.py
          env:
            - name: POD_INDEX
              valueFrom:
                fieldRef:
                  fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']
            - name: MASTER_ADDR
              value: "distributed-training-0.distributed-training"
            - name: NCCL_DEBUG
              value: INFO
            - name: NCCL_IB_DISABLE
              value: "0"
          resources:
            limits:
              nvidia.com/gpu: 4
            requests:
              nvidia.com/gpu: 4
              cpu: "16"
              memory: 64Gi
          volumeMounts:
            - name: shm
              mountPath: /dev/shm
      volumes:
        - name: shm
          emptyDir:
            medium: Memory
            sizeLimit: 32Gi
      restartPolicy: Never
  backoffLimit: 3
---
apiVersion: v1
kind: Service
metadata:
  name: distributed-training
  namespace: ml-workloads
spec:
  clusterIP: None
  selector:
    job-name: distributed-training
  ports:
    - port: 29500
      name: nccl

5. 拓扑感知调度实践

5.1 性能对比测试

# benchmark_topology.py
import torch
import torch.distributed as dist
import time
import os

def benchmark_allreduce(size_mb, iterations=100):
    """测试 AllReduce 性能"""
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # 创建测试张量
    tensor = torch.randn(size_mb * 1024 * 256, device='cuda')  # MB to elements

    # 预热
    for _ in range(10):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()

    # 计时
    start = time.time()
    for _ in range(iterations):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()
    elapsed = time.time() - start

    # 计算带宽
    # AllReduce 传输量 = 2 * (n-1) / n * data_size
    data_size = size_mb * 1024 * 1024  # bytes
    transfer_size = 2 * (world_size - 1) / world_size * data_size
    bandwidth = transfer_size * iterations / elapsed / 1e9  # GB/s

    if rank == 0:
        print(f"Size: {size_mb}MB, Iterations: {iterations}")
        print(f"Time: {elapsed:.3f}s")
        print(f"Bandwidth: {bandwidth:.2f} GB/s")
        print(f"Latency: {elapsed/iterations*1000:.3f} ms")

    return bandwidth

def main():
    dist.init_process_group(backend='nccl')
    rank = dist.get_rank()

    # 设置 GPU
    local_rank = int(os.environ.get('LOCAL_RANK', 0))
    torch.cuda.set_device(local_rank)

    if rank == 0:
        print(f"World size: {dist.get_world_size()}")
        print(f"Backend: {dist.get_backend()}")
        print(f"CUDA device: {torch.cuda.get_device_name()}")
        print()

    # 不同大小的测试
    sizes = [1, 10, 100, 1000]
    for size in sizes:
        benchmark_allreduce(size)
        if rank == 0:
            print("-" * 40)

    dist.destroy_process_group()

if __name__ == '__main__':
    main()

5.2 拓扑验证脚本

#!/bin/bash
# verify_topology.sh - 验证 GPU 拓扑配置

echo "=== GPU Topology Verification ==="
echo

# 检查 GPU 信息
echo "1. GPU Information:"
nvidia-smi --query-gpu=index,name,uuid,memory.total --format=csv
echo

# 检查拓扑
echo "2. GPU Topology:"
nvidia-smi topo -m
echo

# 检查 NVLINK
echo "3. NVLINK Status:"
nvidia-smi nvlink -s
echo

# 检查 NUMA
echo "4. NUMA Affinity:"
for i in $(seq 0 $(($(nvidia-smi -L | wc -l) - 1))); do
    busid=$(nvidia-smi --id=$i --query-gpu=pci.bus_id --format=csv,noheader)
    numa=$(cat /sys/bus/pci/devices/$busid/numa_node 2>/dev/null || echo "N/A")
    echo "GPU $i: Bus=$busid, NUMA=$numa"
done
echo

# 检查网络
echo "5. Network Interfaces:"
if [ -d /sys/class/infiniband ]; then
    for dev in /sys/class/infiniband/*; do
        name=$(basename $dev)
        numa=$(cat $dev/device/numa_node 2>/dev/null || echo "N/A")
        state=$(cat $dev/ports/1/state 2>/dev/null || echo "N/A")
        echo "IB: $name, NUMA=$numa, State=$state"
    done
else
    echo "No InfiniBand devices found"
fi
echo

# 检查 NCCL 环境
echo "6. NCCL Environment:"
env | grep NCCL | sort
echo

# P2P 测试
echo "7. P2P Bandwidth Test:"
if command -v cuda_memtest &> /dev/null; then
    /usr/local/cuda/samples/1_Utilities/p2pBandwidthLatencyTest/p2pBandwidthLatencyTest
else
    echo "p2pBandwidthLatencyTest not found"
fi

5.3 监控拓扑使用情况

# prometheus-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: gpu-topology-rules
  namespace: monitoring
spec:
  groups:
    - name: gpu-topology
      rules:
        # 拓扑感知调度成功率
        - record: gpu_topology_scheduling_success_rate
          expr: |
            sum(rate(gpu_scheduler_topology_schedules_total{satisfied="true"}[5m])) /
            sum(rate(gpu_scheduler_topology_schedules_total[5m]))

        # NVLINK 使用率
        - record: gpu_nvlink_utilization
          expr: |
            sum(DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL) by (gpu) /
            (300 * 12)  # A100: 300GB/s * 12 links

        # 跨 NUMA 调度比例
        - record: gpu_cross_numa_scheduling_ratio
          expr: |
            sum(gpu_scheduler_allocations{same_numa="false"}) /
            sum(gpu_scheduler_allocations)

        # 告警:拓扑约束失败率过高
        - alert: HighTopologyConstraintFailureRate
          expr: gpu_topology_scheduling_success_rate < 0.8
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "GPU topology constraint failure rate is high"
            description: "More than 20% of topology-constrained scheduling requests are failing"

        # 告警:频繁跨 NUMA 调度
        - alert: HighCrossNUMAScheduling
          expr: gpu_cross_numa_scheduling_ratio > 0.5
          for: 30m
          labels:
            severity: warning
          annotations:
            summary: "High cross-NUMA GPU scheduling ratio"
            description: "More than 50% of multi-GPU allocations are crossing NUMA boundaries"

6. 最佳实践

6.1 拓扑策略选择指南

场景推荐策略GPU 数量说明
大模型训练nvlink + pack8+最大化通信带宽
模型并行single-numa2-4减少 NUMA 跨越开销
数据并行best-effortany灵活分配
推理服务spread1-2提高可用性
混合并行nvlink4-8平衡带宽和规模

6.2 NCCL 调优建议

# nccl-tuning-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: nccl-tuning
  namespace: ml-workloads
data:
  # 基础配置
  NCCL_DEBUG: "WARN"
  NCCL_DEBUG_SUBSYS: "INIT,COLL"

  # NVSwitch 环境
  nvswitch.env: |
    NCCL_NVLS_ENABLE=1
    NCCL_ALGO=Ring,Tree
    NCCL_PROTO=LL,LL128,Simple
    NCCL_MIN_NCHANNELS=32
    NCCL_MAX_NCHANNELS=32

  # NVLINK 环境(无 NVSwitch)
  nvlink.env: |
    NCCL_P2P_LEVEL=NVL
    NCCL_ALGO=Ring
    NCCL_PROTO=LL,LL128
    NCCL_MIN_NCHANNELS=16

  # PCIe 环境
  pcie.env: |
    NCCL_P2P_LEVEL=PHB
    NCCL_SHM_USE_CUDA_MEMCPY=1
    NCCL_ALGO=Ring
    NCCL_PROTO=Simple

  # 跨节点环境
  multinode.env: |
    NCCL_IB_DISABLE=0
    NCCL_NET_GDR_LEVEL=5
    NCCL_IB_GID_INDEX=3
    NCCL_SOCKET_IFNAME=eth0
    NCCL_IB_HCA=mlx5

6.3 故障处理

// pkg/scheduler/fallback.go
package scheduler

// FallbackAllocator 回退分配器
type FallbackAllocator struct {
    primary   *GPUAllocator
    secondary *GPUAllocator
}

// Allocate 带回退的分配
func (f *FallbackAllocator) Allocate(available []int, count int,
    constraint TopologyConstraint) ([]int, error) {

    // 尝试主分配器(严格拓扑约束)
    result, err := f.primary.Allocate(available, count, constraint)
    if err == nil {
        return result, nil
    }

    // 记录降级事件
    log.Warnf("Primary allocation failed: %v, trying fallback", err)
    metrics.RecordTopologyFallback(constraint.String())

    // 使用次级分配器(宽松约束)
    relaxedConstraint := TopologyConstraint{
        MinLevel: TopologyLevelSYS, // 最宽松
    }

    result, err = f.secondary.Allocate(available, count, relaxedConstraint)
    if err != nil {
        return nil, fmt.Errorf("both primary and fallback allocation failed: %v", err)
    }

    // 添加警告注解
    // 调用者可以根据此信息决定是否继续

    return result, nil
}

总结

本章深入讲解了 GPU 拓扑感知调度的原理与实现:

  1. GPU 拓扑基础:理解 NVLINK、NVSwitch、PCIe、NUMA 等互联方式及其性能特点
  2. 拓扑发现:使用 NVML 和系统文件发现 GPU 拓扑结构
  3. 分配算法:实现 BestFit、FirstFit、Pack、Spread 等分配策略
  4. Ring 优化:通过贪心和 2-opt 优化 AllReduce Ring 顺序
  5. 多节点调度:跨节点拓扑感知和网络带宽优化
  6. Kubernetes 集成:Topology Manager 和调度器插件实现

拓扑感知调度是大规模 AI 训练的关键优化手段,合理利用 GPU 拓扑可以显著提升分布式训练性能。

下一章我们将探讨 弹性 GPU 调度,讲解如何实现 GPU 资源的动态伸缩和抢占式调度。

Prev
GPU 调度器实现
Next
弹性 GPU 调度