Topology-Aware Scheduling
Overview
In distributed deep learning, inter-GPU communication bandwidth directly affects training performance. Different GPU interconnects (NVLink, PCIe, cross-node networks) can differ in performance by more than 10x. Topology-aware scheduling understands the hardware topology and places related GPU tasks where communication is cheapest, making it a key technique for large-scale AI training.
1. GPU Topology Fundamentals
1.1 GPU Interconnect Architecture
DGX A100 server topology (simplified):

                         ┌───────────┐
                         │ NVSwitch  │
                         │ (6 chips) │
                         └─────┬─────┘
   ┌───────┬───────┬───────┬───┴───┬───────┬───────┬───────┐
┌──┴──┐ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐
│GPU 0│ │GPU 1│ │GPU 2│ │GPU 3│ │GPU 4│ │GPU 5│ │GPU 6│ │GPU 7│
└──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘
   └───────┴───┬───┴───────┘       └───────┴───┬───┴───────┘
         ┌─────┴─────┐                   ┌─────┴─────┐
         │   CPU 0   │                   │   CPU 1   │
         │ (NUMA 0)  │                   │ (NUMA 1)  │
         └─────┬─────┘                   └─────┬─────┘
         ┌─────┴─────┐                   ┌─────┴─────┐
         │ IB NIC(s) │                   │ IB NIC(s) │
         └───────────┘                   └───────────┘
Bandwidth comparison:

| Interconnect | Bandwidth (GB/s) | Latency | Typical use |
|---|---|---|---|
| NVSwitch | 600 (bidirectional) | < 1 μs | Large-scale parallel training |
| NVLink (A100) | 300 (unidirectional) | < 1 μs | Direct GPU-to-GPU |
| PCIe Gen4 x16 | 32 | ~2 μs | General interconnect |
| InfiniBand HDR | 25 | ~2 μs | Cross-node communication |
| 100G Ethernet | 12.5 | ~10 μs | Datacenter network |
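To make the table concrete, here is a small back-of-the-envelope sketch (not part of the scheduler code; the numbers simply mirror the table above) that estimates how long moving a 1 GB gradient buffer takes over each interconnect, ignoring latency and protocol overhead:

package main

import "fmt"

func main() {
    // Link bandwidths from the table above, in GB/s.
    links := []struct {
        name string
        bw   float64
    }{
        {"NVSwitch", 600},
        {"NVLink (A100)", 300},
        {"PCIe Gen4 x16", 32},
        {"InfiniBand HDR", 25},
        {"100G Ethernet", 12.5},
    }
    const dataGB = 1.0 // a 1 GB gradient buffer
    for _, l := range links {
        // Pure transfer time: size / bandwidth.
        fmt.Printf("%-15s %7.2f ms\n", l.name, dataGB/l.bw*1000)
    }
}

The spread runs from under 2 ms on NVSwitch to 80 ms on 100G Ethernet, which is exactly why placement relative to the interconnect matters.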
1.2 Topology Hierarchy
// pkg/topology/types.go
package topology

// TopologyLevel describes how closely two GPUs are connected.
type TopologyLevel int

const (
    // TopologyLevelSame: the same device
    TopologyLevelSame TopologyLevel = iota
    // TopologyLevelNVLink: directly connected via NVLink
    TopologyLevelNVLink
    // TopologyLevelNVSwitch: connected through NVSwitch
    TopologyLevelNVSwitch
    // TopologyLevelPIX: connected through a PCIe switch
    TopologyLevelPIX
    // TopologyLevelPHB: connected through a PCIe host bridge
    TopologyLevelPHB
    // TopologyLevelNUMA: across NUMA nodes
    TopologyLevelNUMA
    // TopologyLevelSYS: across the system interconnect (QPI/UPI)
    TopologyLevelSYS
    // TopologyLevelNode: across physical nodes
    TopologyLevelNode
)

// TopologyLevelBandwidth maps each level to its typical bandwidth (GB/s).
var TopologyLevelBandwidth = map[TopologyLevel]float64{
    TopologyLevelSame:     1000, // effectively unbounded
    TopologyLevelNVLink:   300,  // NVLink 3.0
    TopologyLevelNVSwitch: 600,  // NVSwitch full fabric
    TopologyLevelPIX:      32,   // PCIe 4.0 x16
    TopologyLevelPHB:      32,   // PCIe host bridge
    TopologyLevelNUMA:     25,   // QPI/UPI
    TopologyLevelSYS:      20,   // system bus
    TopologyLevelNode:     25,   // InfiniBand HDR
}

// TopologyMatrix is the GPU topology matrix for one node.
type TopologyMatrix struct {
    // Number of GPUs
    GPUCount int
    // Levels[i][j] is the topology level between GPU i and GPU j
    Levels [][]TopologyLevel
    // NUMA node of each GPU
    NUMANodes []int
    // NIC associated with each GPU
    NICMapping map[int]string
}

// NewTopologyMatrix creates a topology matrix.
func NewTopologyMatrix(gpuCount int) *TopologyMatrix {
    levels := make([][]TopologyLevel, gpuCount)
    for i := range levels {
        levels[i] = make([]TopologyLevel, gpuCount)
        for j := range levels[i] {
            if i == j {
                levels[i][j] = TopologyLevelSame
            } else {
                levels[i][j] = TopologyLevelSYS // default
            }
        }
    }
    return &TopologyMatrix{
        GPUCount:   gpuCount,
        Levels:     levels,
        NUMANodes:  make([]int, gpuCount),
        NICMapping: make(map[int]string),
    }
}

// GetBandwidth returns the bandwidth between two GPUs.
func (m *TopologyMatrix) GetBandwidth(gpu1, gpu2 int) float64 {
    if gpu1 < 0 || gpu1 >= m.GPUCount || gpu2 < 0 || gpu2 >= m.GPUCount {
        return 0
    }
    return TopologyLevelBandwidth[m.Levels[gpu1][gpu2]]
}

// GetMinBandwidth returns the minimum pairwise bandwidth within a GPU set.
func (m *TopologyMatrix) GetMinBandwidth(gpus []int) float64 {
    if len(gpus) <= 1 {
        return TopologyLevelBandwidth[TopologyLevelSame]
    }
    minBW := TopologyLevelBandwidth[TopologyLevelSame]
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            bw := m.GetBandwidth(gpus[i], gpus[j])
            if bw < minBW {
                minBW = bw
            }
        }
    }
    return minBW
}

// GetAverageTopologyLevel returns the average pairwise topology level.
func (m *TopologyMatrix) GetAverageTopologyLevel(gpus []int) float64 {
    if len(gpus) <= 1 {
        return float64(TopologyLevelSame)
    }
    sum := 0.0
    count := 0
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            sum += float64(m.Levels[gpus[i]][gpus[j]])
            count++
        }
    }
    return sum / float64(count)
}
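A quick usage sketch of the matrix API. The values are hypothetical (a real matrix comes from the discovery code in the next section), and the snippet assumes it lives in the same package as the types above:

package main

import "fmt"

func main() {
    // Hypothetical 4-GPU node: GPUs 0/1 and 2/3 are NVLink pairs,
    // everything else stays at the SYS default.
    m := NewTopologyMatrix(4)
    m.Levels[0][1], m.Levels[1][0] = TopologyLevelNVLink, TopologyLevelNVLink
    m.Levels[2][3], m.Levels[3][2] = TopologyLevelNVLink, TopologyLevelNVLink

    fmt.Println(m.GetBandwidth(0, 1))              // 300 (NVLink)
    fmt.Println(m.GetMinBandwidth([]int{0, 1, 2})) // 20 (the 0-2 pair crosses SYS)
    fmt.Println(m.GetAverageTopologyLevel([]int{0, 1, 2}))
}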
1.3 Topology Discovery
// pkg/topology/discovery.go
package topology

import (
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "strings"

    "github.com/NVIDIA/go-nvml/pkg/nvml"
)

// TopologyDiscoverer discovers the GPU topology via NVML and sysfs.
type TopologyDiscoverer struct {
    nvmlInited bool
}

// NewTopologyDiscoverer creates a discoverer.
func NewTopologyDiscoverer() (*TopologyDiscoverer, error) {
    ret := nvml.Init()
    if ret != nvml.SUCCESS {
        return nil, fmt.Errorf("failed to initialize NVML: %v", nvml.ErrorString(ret))
    }
    return &TopologyDiscoverer{nvmlInited: true}, nil
}

// Close shuts the discoverer down.
func (d *TopologyDiscoverer) Close() {
    if d.nvmlInited {
        nvml.Shutdown()
    }
}

// Discover builds the GPU topology matrix.
func (d *TopologyDiscoverer) Discover() (*TopologyMatrix, error) {
    count, ret := nvml.DeviceGetCount()
    if ret != nvml.SUCCESS {
        return nil, fmt.Errorf("failed to get device count: %v", nvml.ErrorString(ret))
    }
    matrix := NewTopologyMatrix(count)
    // Discover pairwise GPU topology.
    for i := 0; i < count; i++ {
        device1, ret := nvml.DeviceGetHandleByIndex(i)
        if ret != nvml.SUCCESS {
            continue
        }
        // Look up the NUMA node.
        pciInfo, ret := device1.GetPciInfo()
        if ret == nvml.SUCCESS {
            busID := fmt.Sprintf("%04x:%02x:%02x.0", pciInfo.Domain, pciInfo.Bus, pciInfo.Device)
            numa, err := d.getNUMANode(busID)
            if err == nil {
                matrix.NUMANodes[i] = numa
            }
        }
        for j := i + 1; j < count; j++ {
            device2, ret := nvml.DeviceGetHandleByIndex(j)
            if ret != nvml.SUCCESS {
                continue
            }
            level := d.getTopologyLevel(device1, device2)
            matrix.Levels[i][j] = level
            matrix.Levels[j][i] = level
        }
    }
    // Discover NIC affinity.
    d.discoverNICMapping(matrix)
    return matrix, nil
}
// getTopologyLevel determines the topology level between two GPUs.
func (d *TopologyDiscoverer) getTopologyLevel(dev1, dev2 nvml.Device) TopologyLevel {
    // Check for an NVLink connection first.
    hasNVLink := d.checkNVLinkConnection(dev1, dev2)
    if hasNVLink {
        // Decide whether it goes through NVSwitch: on an NVSwitch fabric,
        // every GPU has an NVLink path to every other GPU.
        count, _ := nvml.DeviceGetCount()
        nvlinkCount := 0
        for i := 0; i < count; i++ {
            dev, _ := nvml.DeviceGetHandleByIndex(i)
            if d.checkNVLinkConnection(dev1, dev) {
                nvlinkCount++
            }
        }
        // NVLink to all other GPUs implies an NVSwitch fabric.
        if nvlinkCount >= count-1 {
            return TopologyLevelNVSwitch
        }
        return TopologyLevelNVLink
    }
    // Fall back to the NVML topology API.
    pathInfo, ret := nvml.DeviceGetTopologyCommonAncestor(dev1, dev2)
    if ret != nvml.SUCCESS {
        return TopologyLevelSYS
    }
    switch pathInfo {
    case nvml.TOPOLOGY_INTERNAL:
        return TopologyLevelSame
    case nvml.TOPOLOGY_SINGLE:
        return TopologyLevelPIX
    case nvml.TOPOLOGY_MULTIPLE:
        return TopologyLevelPHB
    case nvml.TOPOLOGY_HOSTBRIDGE:
        return TopologyLevelPHB
    case nvml.TOPOLOGY_NODE:
        return TopologyLevelNUMA
    case nvml.TOPOLOGY_SYSTEM:
        return TopologyLevelSYS
    default:
        return TopologyLevelSYS
    }
}
// checkNVLinkConnection reports whether two GPUs are directly linked by NVLink.
func (d *TopologyDiscoverer) checkNVLinkConnection(dev1, dev2 nvml.Device) bool {
    // Get dev2's index.
    idx2, ret := dev2.GetIndex()
    if ret != nvml.SUCCESS {
        return false
    }
    // Walk all of dev1's NVLinks (an A100 has up to 12).
    for link := 0; link < 12; link++ {
        // PCI info of the remote end of this link.
        remotePci, ret := dev1.GetNvLinkRemotePciInfo(link)
        if ret != nvml.SUCCESS {
            continue
        }
        // go-nvml exposes the bus ID as a fixed, NUL-terminated int8 array;
        // convert it to a Go string before the lookup.
        buf := make([]byte, 0, len(remotePci.BusIdLegacy))
        for _, c := range remotePci.BusIdLegacy {
            if c == 0 {
                break
            }
            buf = append(buf, byte(c))
        }
        // Resolve the remote device from its PCI bus ID.
        remoteDev, ret := nvml.DeviceGetHandleByPciBusId(string(buf))
        if ret != nvml.SUCCESS {
            continue
        }
        remoteIdx, ret := remoteDev.GetIndex()
        if ret == nvml.SUCCESS && remoteIdx == idx2 {
            return true
        }
    }
    return false
}
// getNUMANode returns the NUMA node of a PCI device.
func (d *TopologyDiscoverer) getNUMANode(busID string) (int, error) {
    // Read /sys/bus/pci/devices/{busID}/numa_node.
    path := filepath.Join("/sys/bus/pci/devices", busID, "numa_node")
    data, err := os.ReadFile(path)
    if err != nil {
        return 0, err
    }
    numa, err := strconv.Atoi(strings.TrimSpace(string(data)))
    if err != nil {
        return 0, err
    }
    // -1 means no NUMA affinity.
    if numa < 0 {
        return 0, nil
    }
    return numa, nil
}
// discoverNICMapping associates GPUs with NICs on the same NUMA node.
func (d *TopologyDiscoverer) discoverNICMapping(matrix *TopologyMatrix) {
    // Enumerate InfiniBand devices.
    ibDevices, err := filepath.Glob("/sys/class/infiniband/*/device/numa_node")
    if err != nil {
        return
    }
    for _, path := range ibDevices {
        data, err := os.ReadFile(path)
        if err != nil {
            continue
        }
        numa, err := strconv.Atoi(strings.TrimSpace(string(data)))
        if err != nil || numa < 0 {
            continue
        }
        // Extract the device name from the path
        // (/sys/class/infiniband/<name>/device/numa_node).
        parts := strings.Split(path, "/")
        if len(parts) >= 5 {
            deviceName := parts[4]
            // Associate this NIC with every GPU on the same NUMA node.
            for gpuIdx, gpuNuma := range matrix.NUMANodes {
                if gpuNuma == numa {
                    matrix.NICMapping[gpuIdx] = deviceName
                }
            }
        }
    }
}
// PrintTopology prints the topology matrix.
func (d *TopologyDiscoverer) PrintTopology(matrix *TopologyMatrix) {
    fmt.Println("GPU Topology Matrix:")
    fmt.Println("====================")
    // Header row.
    fmt.Printf("%6s", "")
    for i := 0; i < matrix.GPUCount; i++ {
        fmt.Printf(" GPU%-3d", i)
    }
    fmt.Println()
    // Matrix body.
    levelNames := map[TopologyLevel]string{
        TopologyLevelSame:     "X",
        TopologyLevelNVLink:   "NVL",
        TopologyLevelNVSwitch: "NVS",
        TopologyLevelPIX:      "PIX",
        TopologyLevelPHB:      "PHB",
        TopologyLevelNUMA:     "NODE",
        TopologyLevelSYS:      "SYS",
    }
    for i := 0; i < matrix.GPUCount; i++ {
        fmt.Printf("GPU%-3d", i)
        for j := 0; j < matrix.GPUCount; j++ {
            name := levelNames[matrix.Levels[i][j]]
            fmt.Printf(" %-5s", name)
        }
        fmt.Printf(" NUMA=%d", matrix.NUMANodes[i])
        if nic, ok := matrix.NICMapping[i]; ok {
            fmt.Printf(" NIC=%s", nic)
        }
        fmt.Println()
    }
    fmt.Println()
    fmt.Println("Legend:")
    fmt.Println("  X    = same device")
    fmt.Println("  NVL  = NVLink")
    fmt.Println("  NVS  = NVSwitch")
    fmt.Println("  PIX  = PCIe switch")
    fmt.Println("  PHB  = PCIe Host Bridge")
    fmt.Println("  NODE = NUMA node")
    fmt.Println("  SYS  = System interconnect")
}
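A minimal driver program for the discoverer, assuming it runs on a host with the NVIDIA driver installed and lives in the same package as the code above:

package main

import "log"

func main() {
    d, err := NewTopologyDiscoverer()
    if err != nil {
        log.Fatal(err) // e.g. no NVIDIA driver on this host
    }
    defer d.Close()

    matrix, err := d.Discover()
    if err != nil {
        log.Fatal(err)
    }
    // Prints a matrix similar to `nvidia-smi topo -m`.
    d.PrintTopology(matrix)
}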
2. Topology-Aware Scheduling Algorithms
2.1 GPU Allocation Policies
// pkg/scheduler/allocation.go
package scheduler

import (
    "fmt"
    "sort"
)

// AllocationPolicy selects how GPUs are allocated.
type AllocationPolicy string

const (
    // PolicyBestFit picks the combination with the best topology.
    PolicyBestFit AllocationPolicy = "BestFit"
    // PolicyFirstFit picks the first combination that satisfies the constraints.
    PolicyFirstFit AllocationPolicy = "FirstFit"
    // PolicySpread spreads GPUs across physical locations.
    PolicySpread AllocationPolicy = "Spread"
    // PolicyPack packs GPUs into the same physical location.
    PolicyPack AllocationPolicy = "Pack"
)

// GPUAllocator allocates GPUs according to a policy.
type GPUAllocator struct {
    topology *TopologyMatrix
    policy   AllocationPolicy
}

// NewGPUAllocator creates an allocator.
func NewGPUAllocator(topology *TopologyMatrix, policy AllocationPolicy) *GPUAllocator {
    return &GPUAllocator{
        topology: topology,
        policy:   policy,
    }
}

// Allocate picks count GPUs from the available set.
func (a *GPUAllocator) Allocate(available []int, count int, constraint TopologyConstraint) ([]int, error) {
    if len(available) < count {
        return nil, fmt.Errorf("insufficient GPUs: need %d, have %d", count, len(available))
    }
    if count == 1 {
        return []int{available[0]}, nil
    }
    switch a.policy {
    case PolicyBestFit:
        return a.allocateBestFit(available, count, constraint)
    case PolicyFirstFit:
        return a.allocateFirstFit(available, count, constraint)
    case PolicySpread:
        return a.allocateSpread(available, count)
    case PolicyPack:
        return a.allocatePack(available, count)
    default:
        return a.allocateBestFit(available, count, constraint)
    }
}

// TopologyConstraint expresses topology requirements for an allocation.
type TopologyConstraint struct {
    // Worst acceptable topology level between any pair
    MinLevel TopologyLevel
    // Require all GPUs on the same NUMA node
    SameNUMA bool
    // Require NVLink between all pairs
    RequireNVLink bool
    // Minimum pairwise bandwidth (GB/s)
    MinBandwidth float64
}
// allocateBestFit enumerates combinations and picks the best-scoring one.
func (a *GPUAllocator) allocateBestFit(available []int, count int, constraint TopologyConstraint) ([]int, error) {
    // Generate all candidate combinations. This is O(C(n, k)), which is
    // fine for a single node's GPU count but not for large n.
    combinations := a.generateCombinations(available, count)
    if len(combinations) == 0 {
        return nil, fmt.Errorf("no valid combinations")
    }
    // Score every combination.
    type scoredCombination struct {
        gpus  []int
        score float64
    }
    var valid []scoredCombination
    for _, combo := range combinations {
        // Enforce the constraints.
        if !a.checkConstraint(combo, constraint) {
            continue
        }
        // Compute the topology score.
        score := a.scoreTopology(combo)
        valid = append(valid, scoredCombination{gpus: combo, score: score})
    }
    if len(valid) == 0 {
        return nil, fmt.Errorf("no combination satisfies constraints")
    }
    // Sort by score, highest first.
    sort.Slice(valid, func(i, j int) bool {
        return valid[i].score > valid[j].score
    })
    return valid[0].gpus, nil
}
// allocateFirstFit returns the first combination that satisfies the constraints.
func (a *GPUAllocator) allocateFirstFit(available []int, count int, constraint TopologyConstraint) ([]int, error) {
    // Group by NUMA node first.
    numaGroups := make(map[int][]int)
    for _, gpu := range available {
        numa := a.topology.NUMANodes[gpu]
        numaGroups[numa] = append(numaGroups[numa], gpu)
    }
    // Try a single NUMA node first.
    for _, gpus := range numaGroups {
        if len(gpus) >= count {
            combo := gpus[:count]
            if a.checkConstraint(combo, constraint) {
                return combo, nil
            }
        }
    }
    // Then fall back to crossing NUMA nodes.
    if len(available) >= count {
        combo := available[:count]
        if a.checkConstraint(combo, constraint) {
            return combo, nil
        }
    }
    return nil, fmt.Errorf("no valid allocation found")
}
// allocateSpread round-robins the allocation across NUMA nodes.
func (a *GPUAllocator) allocateSpread(available []int, count int) ([]int, error) {
    // Group by NUMA node.
    numaGroups := make(map[int][]int)
    for _, gpu := range available {
        numa := a.topology.NUMANodes[gpu]
        numaGroups[numa] = append(numaGroups[numa], gpu)
    }
    // Round-robin across nodes.
    result := make([]int, 0, count)
    numaOrder := make([]int, 0, len(numaGroups))
    for numa := range numaGroups {
        numaOrder = append(numaOrder, numa)
    }
    sort.Ints(numaOrder)
    idx := 0
    for len(result) < count {
        numa := numaOrder[idx%len(numaOrder)]
        if len(numaGroups[numa]) > 0 {
            result = append(result, numaGroups[numa][0])
            numaGroups[numa] = numaGroups[numa][1:]
        }
        idx++
        // Guard against an infinite loop.
        if idx > count*len(numaOrder) {
            break
        }
    }
    if len(result) < count {
        return nil, fmt.Errorf("cannot spread allocate %d GPUs", count)
    }
    return result, nil
}
// allocatePack concentrates the allocation on as few NUMA nodes as possible.
func (a *GPUAllocator) allocatePack(available []int, count int) ([]int, error) {
    // Group by NUMA node.
    numaGroups := make(map[int][]int)
    for _, gpu := range available {
        numa := a.topology.NUMANodes[gpu]
        numaGroups[numa] = append(numaGroups[numa], gpu)
    }
    // Find the NUMA node holding the most GPUs.
    var maxNUMA int
    maxCount := 0
    for numa, gpus := range numaGroups {
        if len(gpus) > maxCount {
            maxCount = len(gpus)
            maxNUMA = numa
        }
    }
    // Allocate from that node first.
    result := make([]int, 0, count)
    if len(numaGroups[maxNUMA]) >= count {
        return numaGroups[maxNUMA][:count], nil
    }
    result = append(result, numaGroups[maxNUMA]...)
    // Top up from the other nodes.
    for numa, gpus := range numaGroups {
        if numa == maxNUMA {
            continue
        }
        for _, gpu := range gpus {
            if len(result) >= count {
                break
            }
            result = append(result, gpu)
        }
    }
    if len(result) < count {
        return nil, fmt.Errorf("cannot pack allocate %d GPUs", count)
    }
    return result, nil
}
// generateCombinations returns all k-element combinations of elements.
func (a *GPUAllocator) generateCombinations(elements []int, k int) [][]int {
    var result [][]int
    var combine func(start int, current []int)
    combine = func(start int, current []int) {
        if len(current) == k {
            combo := make([]int, k)
            copy(combo, current)
            result = append(result, combo)
            return
        }
        for i := start; i <= len(elements)-(k-len(current)); i++ {
            combine(i+1, append(current, elements[i]))
        }
    }
    combine(0, nil)
    return result
}
// checkConstraint verifies a combination against the topology constraint.
func (a *GPUAllocator) checkConstraint(gpus []int, constraint TopologyConstraint) bool {
    if len(gpus) <= 1 {
        return true
    }
    // NUMA constraint.
    if constraint.SameNUMA {
        numa := a.topology.NUMANodes[gpus[0]]
        for _, gpu := range gpus[1:] {
            if a.topology.NUMANodes[gpu] != numa {
                return false
            }
        }
    }
    // Worst acceptable topology level (larger values are more distant).
    // The zero value (TopologyLevelSame) is treated as "no level constraint";
    // otherwise an empty TopologyConstraint{} would reject every multi-GPU set.
    if constraint.MinLevel > TopologyLevelSame {
        for i := 0; i < len(gpus); i++ {
            for j := i + 1; j < len(gpus); j++ {
                level := a.topology.Levels[gpus[i]][gpus[j]]
                if level > constraint.MinLevel {
                    return false
                }
            }
        }
    }
    // NVLink requirement.
    if constraint.RequireNVLink {
        for i := 0; i < len(gpus); i++ {
            for j := i + 1; j < len(gpus); j++ {
                level := a.topology.Levels[gpus[i]][gpus[j]]
                if level != TopologyLevelNVLink && level != TopologyLevelNVSwitch {
                    return false
                }
            }
        }
    }
    // Minimum bandwidth.
    if constraint.MinBandwidth > 0 {
        minBW := a.topology.GetMinBandwidth(gpus)
        if minBW < constraint.MinBandwidth {
            return false
        }
    }
    return true
}
// scoreTopology scores a combination by its average pairwise bandwidth.
func (a *GPUAllocator) scoreTopology(gpus []int) float64 {
    if len(gpus) <= 1 {
        return 100.0
    }
    // Average pairwise bandwidth.
    totalBW := 0.0
    count := 0
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            totalBW += a.topology.GetBandwidth(gpus[i], gpus[j])
            count++
        }
    }
    avgBW := totalBW / float64(count)
    // Normalize to [0, 100], taking NVSwitch (600 GB/s) as the maximum.
    score := avgBW / 600.0 * 100.0
    if score > 100.0 {
        score = 100.0
    }
    return score
}
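A usage sketch tying the allocator to the hypothetical 4-GPU matrix from section 1.2 (same single-package assumption; note that leaving MinLevel at its zero value means "no level constraint"):

package main

import (
    "fmt"
    "log"
)

func main() {
    // Hypothetical 4-GPU node with NVLink pairs (0,1) and (2,3).
    matrix := NewTopologyMatrix(4)
    matrix.Levels[0][1], matrix.Levels[1][0] = TopologyLevelNVLink, TopologyLevelNVLink
    matrix.Levels[2][3], matrix.Levels[3][2] = TopologyLevelNVLink, TopologyLevelNVLink

    alloc := NewGPUAllocator(matrix, PolicyBestFit)
    gpus, err := alloc.Allocate([]int{0, 1, 2, 3}, 2, TopologyConstraint{
        RequireNVLink: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(gpus) // [0 1] or [2 3]: an NVLink pair beats any cross-pair combination
}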
2.2 NCCL Topology Optimization
// pkg/scheduler/nccl.go
package scheduler

import (
    "fmt"
    "strings"
)

// NCCLTopologyGenerator generates NCCL configuration from the topology.
type NCCLTopologyGenerator struct {
    topology *TopologyMatrix
}

// NewNCCLTopologyGenerator creates a generator.
func NewNCCLTopologyGenerator(topology *TopologyMatrix) *NCCLTopologyGenerator {
    return &NCCLTopologyGenerator{topology: topology}
}

// GenerateNCCLEnv produces NCCL environment variables for a GPU set.
func (g *NCCLTopologyGenerator) GenerateNCCLEnv(gpus []int) map[string]string {
    env := make(map[string]string)
    // Base settings.
    env["NCCL_DEBUG"] = "INFO"
    env["NCCL_DEBUG_SUBSYS"] = "INIT,COLL"
    // Tune NCCL according to the topology.
    hasNVLink := g.hasNVLinkBetween(gpus)
    hasNVSwitch := g.hasNVSwitch(gpus)
    sameNUMA := g.isSameNUMA(gpus)
    if hasNVSwitch {
        // NVSwitch system
        env["NCCL_NVLS_ENABLE"] = "1"         // enable NVLink SHARP
        env["NCCL_ALGO"] = "Ring,Tree"        // allow Ring and Tree algorithms
        env["NCCL_PROTO"] = "LL,LL128,Simple" // allow all protocols
    } else if hasNVLink {
        // NVLink without NVSwitch
        env["NCCL_P2P_LEVEL"] = "NVL"
        env["NCCL_ALGO"] = "Ring"
        env["NCCL_PROTO"] = "LL,LL128"
    } else {
        // PCIe only
        env["NCCL_P2P_LEVEL"] = "PHB"
        env["NCCL_ALGO"] = "Ring"
        env["NCCL_PROTO"] = "Simple"
    }
    // NUMA tuning.
    if sameNUMA {
        env["NCCL_SHM_USE_CUDA_MEMCPY"] = "0"
    } else {
        env["NCCL_SHM_USE_CUDA_MEMCPY"] = "1"
    }
    // Restrict the visible CUDA devices.
    cudaDevices := make([]string, len(gpus))
    for i, gpu := range gpus {
        cudaDevices[i] = fmt.Sprintf("%d", gpu)
    }
    env["CUDA_VISIBLE_DEVICES"] = strings.Join(cudaDevices, ",")
    return env
}
// GenerateNCCLTopologyFile renders an NCCL topology file for a GPU set.
func (g *NCCLTopologyGenerator) GenerateNCCLTopologyFile(gpus []int) string {
    var sb strings.Builder
    sb.WriteString("# NCCL Topology File\n")
    sb.WriteString("# Generated by GPU Scheduler\n\n")
    sb.WriteString("<system version=\"1\">\n")
    // CPU (NUMA) entries.
    numaNums := make(map[int]bool)
    for _, gpu := range gpus {
        numa := g.topology.NUMANodes[gpu]
        numaNums[numa] = true
    }
    for numa := range numaNums {
        sb.WriteString(fmt.Sprintf("  <cpu numaid=\"%d\" affinity=\"0x%x\" arch=\"x86_64\" vendor=\"AuthenticAMD\">\n",
            numa, 1<<numa))
        // GPUs on this NUMA node.
        for _, gpu := range gpus {
            if g.topology.NUMANodes[gpu] == numa {
                sb.WriteString(fmt.Sprintf("    <pci busid=\"%s\" class=\"0x030200\" link_speed=\"16 GT/s\" link_width=\"16\">\n",
                    g.getGPUBusID(gpu)))
                sb.WriteString(fmt.Sprintf("      <gpu dev=\"%d\" sm=\"80\" rank=\"%d\"/>\n", gpu, gpu))
                sb.WriteString("    </pci>\n")
            }
        }
        sb.WriteString("  </cpu>\n")
    }
    // NVLink edges.
    sb.WriteString("  <nvlink>\n")
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            if g.isNVLinkConnected(gpus[i], gpus[j]) {
                sb.WriteString(fmt.Sprintf("    <link gpu=\"%d\" tgpu=\"%d\" count=\"12\" bw=\"300\"/>\n",
                    gpus[i], gpus[j]))
            }
        }
    }
    sb.WriteString("  </nvlink>\n")
    sb.WriteString("</system>\n")
    return sb.String()
}
// hasNVLinkBetween reports whether any pair in the set is NVLink-connected.
func (g *NCCLTopologyGenerator) hasNVLinkBetween(gpus []int) bool {
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            if g.isNVLinkConnected(gpus[i], gpus[j]) {
                return true
            }
        }
    }
    return false
}

// hasNVSwitch reports whether the set sits on an NVSwitch fabric.
func (g *NCCLTopologyGenerator) hasNVSwitch(gpus []int) bool {
    // NVSwitch signature: every pair is NVLink-connected.
    for i := 0; i < len(gpus); i++ {
        for j := i + 1; j < len(gpus); j++ {
            level := g.topology.Levels[gpus[i]][gpus[j]]
            if level != TopologyLevelNVSwitch && level != TopologyLevelNVLink {
                return false
            }
        }
    }
    return true
}

// isSameNUMA reports whether all GPUs share one NUMA node.
func (g *NCCLTopologyGenerator) isSameNUMA(gpus []int) bool {
    if len(gpus) <= 1 {
        return true
    }
    numa := g.topology.NUMANodes[gpus[0]]
    for _, gpu := range gpus[1:] {
        if g.topology.NUMANodes[gpu] != numa {
            return false
        }
    }
    return true
}

// isNVLinkConnected reports whether two GPUs are connected by NVLink.
func (g *NCCLTopologyGenerator) isNVLinkConnected(gpu1, gpu2 int) bool {
    level := g.topology.Levels[gpu1][gpu2]
    return level == TopologyLevelNVLink || level == TopologyLevelNVSwitch
}

// getGPUBusID returns the PCIe bus ID of a GPU.
func (g *NCCLTopologyGenerator) getGPUBusID(gpu int) string {
    // A real implementation should query NVML; this is a placeholder.
    return fmt.Sprintf("0000:%02x:00.0", gpu*8+1)
}
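A brief usage sketch (same hypothetical matrix and single-package assumption as the earlier examples) showing how the generated environment could be turned into shell exports:

package main

import "fmt"

func main() {
    // Hypothetical matrix: GPUs 0/1 form an NVLink pair.
    matrix := NewTopologyMatrix(4)
    matrix.Levels[0][1], matrix.Levels[1][0] = TopologyLevelNVLink, TopologyLevelNVLink

    gen := NewNCCLTopologyGenerator(matrix)
    for k, v := range gen.GenerateNCCLEnv([]int{0, 1}) {
        fmt.Printf("export %s=%s\n", k, v)
    }
    // Expected output includes:
    //   export NCCL_P2P_LEVEL=NVL
    //   export CUDA_VISIBLE_DEVICES=0,1
}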
2.3 Ring Topology Optimization
// pkg/scheduler/ring.go
package scheduler

import (
    "fmt"
    "math"
)

// RingOptimizer optimizes the GPU ring order.
type RingOptimizer struct {
    topology *TopologyMatrix
}

// NewRingOptimizer creates a ring optimizer.
func NewRingOptimizer(topology *TopologyMatrix) *RingOptimizer {
    return &RingOptimizer{topology: topology}
}

// OptimizeRing reorders GPUs to maximize ring communication bandwidth.
func (o *RingOptimizer) OptimizeRing(gpus []int) []int {
    if len(gpus) <= 2 {
        return gpus
    }
    // Greedy construction: starting from an arbitrary node, repeatedly
    // pick the unvisited GPU with the highest bandwidth to the current one.
    result := make([]int, 0, len(gpus))
    remaining := make(map[int]bool)
    for _, gpu := range gpus {
        remaining[gpu] = true
    }
    // Start from the first GPU.
    current := gpus[0]
    result = append(result, current)
    delete(remaining, current)
    for len(remaining) > 0 {
        // Pick the highest-bandwidth next hop.
        bestNext := -1
        bestBW := float64(-1)
        for gpu := range remaining {
            bw := o.topology.GetBandwidth(current, gpu)
            if bw > bestBW {
                bestBW = bw
                bestNext = gpu
            }
        }
        result = append(result, bestNext)
        delete(remaining, bestNext)
        current = bestNext
    }
    // Refine with 2-opt local search.
    result = o.twoOpt(result)
    return result
}
// twoOpt performs 2-opt local search on the ring.
func (o *RingOptimizer) twoOpt(ring []int) []int {
    n := len(ring)
    improved := true
    for improved {
        improved = false
        for i := 0; i < n-1; i++ {
            for j := i + 2; j < n; j++ {
                // Cost of the current pair of edges.
                currentCost := o.edgeCost(ring[i], ring[i+1]) +
                    o.edgeCost(ring[j], ring[(j+1)%n])
                // Cost after the 2-opt swap.
                newCost := o.edgeCost(ring[i], ring[j]) +
                    o.edgeCost(ring[i+1], ring[(j+1)%n])
                if newCost < currentCost {
                    // Reverse the segment [i+1, j].
                    for l, r := i+1, j; l < r; l, r = l+1, r-1 {
                        ring[l], ring[r] = ring[r], ring[l]
                    }
                    improved = true
                }
            }
        }
    }
    return ring
}

// edgeCost is the cost of an edge (the reciprocal of its bandwidth).
func (o *RingOptimizer) edgeCost(gpu1, gpu2 int) float64 {
    bw := o.topology.GetBandwidth(gpu1, gpu2)
    if bw <= 0 {
        return math.MaxFloat64
    }
    return 1.0 / bw
}
// GetRingBandwidth returns the ring's effective bandwidth (its slowest edge).
func (o *RingOptimizer) GetRingBandwidth(ring []int) float64 {
    if len(ring) < 2 {
        return 0
    }
    minBW := math.MaxFloat64
    for i := 0; i < len(ring); i++ {
        next := (i + 1) % len(ring)
        bw := o.topology.GetBandwidth(ring[i], ring[next])
        if bw < minBW {
            minBW = bw
        }
    }
    return minBW
}

// CalculateAllReduceBandwidth estimates the effective bandwidth of a ring
// AllReduce. For n ranks:
//   time = 2 * (n-1) * dataSize / (n * bandwidth)
//   effective bandwidth = dataSize / time = n * bandwidth / (2 * (n-1))
// dataSize cancels out, so only the ring size and edge bandwidth matter.
func (o *RingOptimizer) CalculateAllReduceBandwidth(ring []int, dataSize int64) float64 {
    if len(ring) < 2 {
        return 0
    }
    n := len(ring)
    ringBW := o.GetRingBandwidth(ring)
    effectiveBW := float64(n) * ringBW / (2.0 * float64(n-1))
    return effectiveBW
}

// PrintRing prints the ring order and its bandwidth.
func (o *RingOptimizer) PrintRing(ring []int) {
    fmt.Printf("Ring order: ")
    for i, gpu := range ring {
        if i > 0 {
            bw := o.topology.GetBandwidth(ring[i-1], gpu)
            fmt.Printf(" --%.0fGB/s--> ", bw)
        }
        fmt.Printf("GPU%d", gpu)
    }
    // Close the ring back to the first GPU.
    bw := o.topology.GetBandwidth(ring[len(ring)-1], ring[0])
    fmt.Printf(" --%.0fGB/s--> GPU%d\n", bw, ring[0])
    fmt.Printf("Ring bandwidth: %.0f GB/s\n", o.GetRingBandwidth(ring))
}
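A usage sketch under the same assumptions as before (hypothetical matrix, single package):

package main

func main() {
    // Hypothetical topology: NVLink pairs (0,1) and (2,3), SYS elsewhere.
    matrix := NewTopologyMatrix(4)
    matrix.Levels[0][1], matrix.Levels[1][0] = TopologyLevelNVLink, TopologyLevelNVLink
    matrix.Levels[2][3], matrix.Levels[3][2] = TopologyLevelNVLink, TopologyLevelNVLink

    opt := NewRingOptimizer(matrix)
    ring := opt.OptimizeRing([]int{0, 2, 1, 3})
    // A good order keeps each NVLink pair adjacent (e.g. 0-1-2-3 rather than
    // 0-2-1-3), so only two of the four ring edges cross the slow SYS link.
    opt.PrintRing(ring)
}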
3. Implementing Topology-Aware Scheduling in Kubernetes
3.1 Topology Manager Integration
# kubelet-config.yaml
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
# Enable the Topology Manager
topologyManagerPolicy: "single-numa-node" # or "best-effort", "restricted", "none"
topologyManagerScope: "container" # or "pod"
# CPU Manager
cpuManagerPolicy: "static"
cpuManagerReconcilePeriod: "10s"
# Memory Manager (optional)
memoryManagerPolicy: "Static"
reservedMemory:
- numaNode: 0
  limits:
    memory: 1Gi
- numaNode: 1
  limits:
    memory: 1Gi
3.2 Extending the Device Plugin with Topology Support
// pkg/deviceplugin/topology.go
package deviceplugin

import (
    "context"
    "fmt"
    "strings"

    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)

// TopologyAwareDevicePlugin is a topology-aware device plugin.
// (TopologyMatrix, GPUAllocator, and friends are the types defined in the
// earlier sections, assumed importable by this package.)
type TopologyAwareDevicePlugin struct {
    BaseDevicePlugin
    topology *TopologyMatrix
}

// GetDevicePluginOptions returns the plugin's options.
func (p *TopologyAwareDevicePlugin) GetDevicePluginOptions(ctx context.Context,
    _ *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
    return &pluginapi.DevicePluginOptions{
        // Ask kubelet to call GetPreferredAllocation.
        PreStartRequired:                true,
        GetPreferredAllocationAvailable: true,
    }, nil
}
// GetPreferredAllocation returns the preferred device allocation.
func (p *TopologyAwareDevicePlugin) GetPreferredAllocation(ctx context.Context,
    req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
    resp := &pluginapi.PreferredAllocationResponse{}
    for _, containerReq := range req.ContainerRequests {
        // Available and mandatory devices.
        available := containerReq.AvailableDeviceIDs
        mustInclude := containerReq.MustIncludeDeviceIDs
        count := int(containerReq.AllocationSize)
        // Convert device IDs to GPU indices.
        availableIndices := p.deviceIDsToIndices(available)
        mustIncludeIndices := p.deviceIDsToIndices(mustInclude)
        // Topology-aware allocation.
        allocator := NewGPUAllocator(p.topology, PolicyBestFit)
        // Seed the result with the mandatory devices.
        var toAllocate []int
        if len(mustIncludeIndices) > 0 {
            toAllocate = mustIncludeIndices
            count -= len(mustIncludeIndices)
            // Remove them from the available pool.
            availableIndices = p.removeIndices(availableIndices, mustIncludeIndices)
        }
        if count > 0 {
            constraint := TopologyConstraint{
                MinLevel:      TopologyLevelPHB,
                RequireNVLink: true, // prefer NVLink
            }
            allocated, err := allocator.Allocate(availableIndices, count, constraint)
            if err != nil {
                // Fall back: drop the NVLink requirement.
                constraint.RequireNVLink = false
                allocated, err = allocator.Allocate(availableIndices, count, constraint)
            }
            if err == nil {
                toAllocate = append(toAllocate, allocated...)
            }
        }
        // Convert back to device IDs.
        deviceIDs := p.indicesToDeviceIDs(toAllocate)
        resp.ContainerResponses = append(resp.ContainerResponses,
            &pluginapi.ContainerPreferredAllocationResponse{
                DeviceIDs: deviceIDs,
            })
    }
    return resp, nil
}
// Allocate allocates the requested devices.
func (p *TopologyAwareDevicePlugin) Allocate(ctx context.Context,
    req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    resp := &pluginapi.AllocateResponse{}
    for _, containerReq := range req.ContainerRequests {
        deviceIDs := containerReq.DevicesIDs
        gpuIndices := p.deviceIDsToIndices(deviceIDs)
        // Generate the NCCL configuration.
        ncclGen := NewNCCLTopologyGenerator(p.topology)
        ncclEnv := ncclGen.GenerateNCCLEnv(gpuIndices)
        // Optimize the ring order.
        ringOpt := NewRingOptimizer(p.topology)
        optimizedRing := ringOpt.OptimizeRing(gpuIndices)
        // Render the topology file.
        topoFile := ncclGen.GenerateNCCLTopologyFile(optimizedRing)
        containerResp := &pluginapi.ContainerAllocateResponse{
            Envs:        ncclEnv,
            Annotations: make(map[string]string),
        }
        // Topology annotations.
        containerResp.Annotations["gpu.topology/ring-order"] =
            p.indicesToString(optimizedRing)
        containerResp.Annotations["gpu.topology/bandwidth"] =
            fmt.Sprintf("%.0f", ringOpt.GetRingBandwidth(optimizedRing))
        // Mount the topology file.
        containerResp.Mounts = append(containerResp.Mounts, &pluginapi.Mount{
            ContainerPath: "/etc/nccl-topo.xml",
            HostPath:      p.writeTopoFile(topoFile),
            ReadOnly:      true,
        })
        // Devices expected by the NVIDIA runtime.
        containerResp.Envs["NVIDIA_VISIBLE_DEVICES"] = strings.Join(deviceIDs, ",")
        resp.ContainerResponses = append(resp.ContainerResponses, containerResp)
    }
    return resp, nil
}
// deviceIDsToIndices converts device IDs to GPU indices.
func (p *TopologyAwareDevicePlugin) deviceIDsToIndices(ids []string) []int {
    indices := make([]int, len(ids))
    for i, id := range ids {
        // IDs are assumed to be "GPU-{uuid}" or plain indices.
        indices[i] = p.getGPUIndex(id)
    }
    return indices
}

// indicesToDeviceIDs converts GPU indices back to device IDs.
func (p *TopologyAwareDevicePlugin) indicesToDeviceIDs(indices []int) []string {
    ids := make([]string, len(indices))
    for i, idx := range indices {
        ids[i] = p.getDeviceID(idx)
    }
    return ids
}
3.3 Topology Handling in the Scheduler Plugin
// pkg/scheduler/topology_plugin.go
package scheduler

import (
    "context"
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
    TopologyPluginName = "GPUTopologyAware"
    // Annotations
    AnnotationTopologyPolicy = "gpu.topology/policy"
    AnnotationMinBandwidth   = "gpu.topology/min-bandwidth"
    AnnotationRequireNVLink  = "gpu.topology/require-nvlink"
    // Topology policies
    TopologyPolicySingleNUMA = "single-numa"
    TopologyPolicyNVLink     = "nvlink"
    TopologyPolicyBestEffort = "best-effort"
    TopologyPolicySpread     = "spread"
)

// TopologyAwarePlugin is the topology-aware scheduler plugin.
type TopologyAwarePlugin struct {
    handle          framework.Handle
    topologyManager *TopologyManager
}

// TopologyManager caches per-node topologies.
type TopologyManager struct {
    // Node name -> topology
    nodeTopologies map[string]*TopologyMatrix
}

// GetTopology returns the cached topology for a node (nil if unknown).
func (tm *TopologyManager) GetTopology(nodeName string) *TopologyMatrix {
    return tm.nodeTopologies[nodeName]
}
// PreFilter parses the pod's topology policy once per scheduling cycle.
func (p *TopologyAwarePlugin) PreFilter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    // Parse the topology policy from annotations.
    policy := p.parseTopologyPolicy(pod)
    // Stash it in the cycle state for Filter/Score.
    state.Write("TopologyPolicy", policy)
    return nil, framework.NewStatus(framework.Success)
}

// Filter rejects nodes that cannot satisfy the topology constraint.
func (p *TopologyAwarePlugin) Filter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    // Requested GPU count.
    gpuCount := p.getGPURequest(pod)
    if gpuCount == 0 {
        return framework.NewStatus(framework.Success)
    }
    // Fetch the parsed policy.
    policyData, err := state.Read("TopologyPolicy")
    if err != nil {
        return framework.NewStatus(framework.Success)
    }
    policy := policyData.(*TopologyPolicy)
    // Node topology.
    nodeName := nodeInfo.Node().Name
    topo := p.topologyManager.GetTopology(nodeName)
    if topo == nil {
        // No topology information; fall back to the default behavior.
        return framework.NewStatus(framework.Success)
    }
    // Available GPUs on this node.
    availableGPUs := p.getAvailableGPUs(nodeInfo)
    // Can the constraint be satisfied at all?
    allocator := NewGPUAllocator(topo, PolicyBestFit)
    _, err = allocator.Allocate(availableGPUs, int(gpuCount), policy.Constraint)
    if err != nil {
        return framework.NewStatus(framework.Unschedulable,
            fmt.Sprintf("topology constraint cannot be satisfied: %v", err))
    }
    return framework.NewStatus(framework.Success)
}
// Score ranks nodes by the topology quality they can offer.
func (p *TopologyAwarePlugin) Score(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    gpuCount := p.getGPURequest(pod)
    if gpuCount == 0 {
        return 50, framework.NewStatus(framework.Success)
    }
    // Node topology.
    topo := p.topologyManager.GetTopology(nodeName)
    if topo == nil {
        return 50, framework.NewStatus(framework.Success)
    }
    // Topology score.
    score := p.calculateTopologyScore(topo, int(gpuCount))
    return score, framework.NewStatus(framework.Success)
}
// TopologyPolicy is a pod's parsed topology policy.
type TopologyPolicy struct {
    Policy     string
    Constraint TopologyConstraint
}

// Clone implements framework.StateData so the policy can live in CycleState.
func (t *TopologyPolicy) Clone() framework.StateData {
    return t
}

// parseTopologyPolicy reads the policy from pod annotations.
func (p *TopologyAwarePlugin) parseTopologyPolicy(pod *v1.Pod) *TopologyPolicy {
    policy := &TopologyPolicy{
        Policy: TopologyPolicyBestEffort,
        Constraint: TopologyConstraint{
            MinLevel: TopologyLevelSYS,
        },
    }
    if pod.Annotations == nil {
        return policy
    }
    // Policy name.
    if policyStr, ok := pod.Annotations[AnnotationTopologyPolicy]; ok {
        policy.Policy = policyStr
        switch policyStr {
        case TopologyPolicySingleNUMA:
            policy.Constraint.SameNUMA = true
        case TopologyPolicyNVLink:
            policy.Constraint.RequireNVLink = true
        }
    }
    // Minimum bandwidth.
    if bwStr, ok := pod.Annotations[AnnotationMinBandwidth]; ok {
        var bw float64
        fmt.Sscanf(bwStr, "%f", &bw)
        policy.Constraint.MinBandwidth = bw
    }
    // Explicit NVLink requirement.
    if nvlink, ok := pod.Annotations[AnnotationRequireNVLink]; ok && nvlink == "true" {
        policy.Constraint.RequireNVLink = true
    }
    return policy
}
// calculateTopologyScore scores a node by the best allocation it can offer.
func (p *TopologyAwarePlugin) calculateTopologyScore(topo *TopologyMatrix, gpuCount int) int64 {
    // Collect all GPU indices on the node.
    availableGPUs := make([]int, topo.GPUCount)
    for i := 0; i < topo.GPUCount; i++ {
        availableGPUs[i] = i
    }
    // Find the best-fit combination.
    allocator := NewGPUAllocator(topo, PolicyBestFit)
    allocated, err := allocator.Allocate(availableGPUs, gpuCount, TopologyConstraint{})
    if err != nil {
        return 0
    }
    // Bandwidth score of the chosen combination.
    minBW := topo.GetMinBandwidth(allocated)
    // Normalize to [0, 100]; NVSwitch (600 GB/s) maps to 100.
    score := int64(minBW / 6.0)
    if score > 100 {
        score = 100
    }
    return score
}
// getGPURequest sums the pod's GPU requests.
func (p *TopologyAwarePlugin) getGPURequest(pod *v1.Pod) int64 {
    var total int64
    for _, c := range pod.Spec.Containers {
        if c.Resources.Requests != nil {
            if gpu, ok := c.Resources.Requests["nvidia.com/gpu"]; ok {
                total += gpu.Value()
            }
        }
    }
    return total
}

// getAvailableGPUs returns the node's available GPU indices.
func (p *TopologyAwarePlugin) getAvailableGPUs(nodeInfo *framework.NodeInfo) []int {
    // A real implementation should consult the device plugin or a CRD;
    // this simplification assumes GPUs 0..allocatable-1 are free.
    var available []int
    allocatable := nodeInfo.Allocatable
    if gpuRes, ok := allocatable.ScalarResources["nvidia.com/gpu"]; ok {
        for i := 0; i < int(gpuRes); i++ {
            available = append(available, i)
        }
    }
    return available
}
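For completeness, an out-of-tree scheduler built from this plugin would be registered roughly as follows. This is a sketch only: scheduler.New is a hypothetical factory returning a *TopologyAwarePlugin, the module path is made up, and the exact PluginFactory signature varies across Kubernetes versions:

// cmd/gpu-scheduler/main.go (sketch)
package main

import (
    "os"

    "k8s.io/kubernetes/cmd/kube-scheduler/app"
    // scheduler "example.com/gpu-scheduler/pkg/scheduler" // hypothetical path
)

func main() {
    // app.WithPlugin registers the plugin factory under its name so it can
    // be enabled via a KubeSchedulerConfiguration profile.
    command := app.NewSchedulerCommand(
        app.WithPlugin(scheduler.TopologyPluginName, scheduler.New),
    )
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}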
4. Cross-Node Topology Awareness
4.1 Multi-Node GPU Topology
// pkg/topology/cluster.go
package topology

import (
    "fmt"
    "sort"
)

// ClusterTopology combines per-node GPU topologies with the network topology.
type ClusterTopology struct {
    // Per-node GPU topologies
    NodeTopologies map[string]*TopologyMatrix
    // Inter-node network topology
    NetworkTopology *NetworkTopology
}

// NetworkTopology describes the inter-node network.
type NetworkTopology struct {
    // Number of nodes
    NodeCount int
    // Bandwidth matrix (GB/s)
    Bandwidth [][]float64
    // Latency matrix (μs)
    Latency [][]float64
    // Node name -> index
    NodeIndex map[string]int
}

// NewClusterTopology creates a cluster topology.
func NewClusterTopology() *ClusterTopology {
    return &ClusterTopology{
        NodeTopologies: make(map[string]*TopologyMatrix),
        NetworkTopology: &NetworkTopology{
            NodeIndex: make(map[string]int),
        },
    }
}

// AddNode registers a node and grows the network matrices.
func (c *ClusterTopology) AddNode(nodeName string, topo *TopologyMatrix) {
    c.NodeTopologies[nodeName] = topo
    // Update the network topology.
    if _, exists := c.NetworkTopology.NodeIndex[nodeName]; !exists {
        idx := c.NetworkTopology.NodeCount
        c.NetworkTopology.NodeIndex[nodeName] = idx
        c.NetworkTopology.NodeCount++
        // Grow the bandwidth matrix.
        newBW := make([][]float64, c.NetworkTopology.NodeCount)
        for i := range newBW {
            newBW[i] = make([]float64, c.NetworkTopology.NodeCount)
            if i < len(c.NetworkTopology.Bandwidth) {
                copy(newBW[i], c.NetworkTopology.Bandwidth[i])
            }
        }
        c.NetworkTopology.Bandwidth = newBW
        // Grow the latency matrix.
        newLatency := make([][]float64, c.NetworkTopology.NodeCount)
        for i := range newLatency {
            newLatency[i] = make([]float64, c.NetworkTopology.NodeCount)
            if i < len(c.NetworkTopology.Latency) {
                copy(newLatency[i], c.NetworkTopology.Latency[i])
            }
        }
        c.NetworkTopology.Latency = newLatency
    }
}
// SetNetworkConnection records the network link between two nodes.
func (c *ClusterTopology) SetNetworkConnection(node1, node2 string, bandwidth, latency float64) {
    idx1, ok1 := c.NetworkTopology.NodeIndex[node1]
    idx2, ok2 := c.NetworkTopology.NodeIndex[node2]
    if !ok1 || !ok2 {
        return
    }
    c.NetworkTopology.Bandwidth[idx1][idx2] = bandwidth
    c.NetworkTopology.Bandwidth[idx2][idx1] = bandwidth
    c.NetworkTopology.Latency[idx1][idx2] = latency
    c.NetworkTopology.Latency[idx2][idx1] = latency
}

// GPULocation identifies a GPU by node name and index.
type GPULocation struct {
    NodeName string
    GPUIndex int
}

// GetInterNodeBandwidth returns the bandwidth between two GPUs, which may sit on different nodes.
func (c *ClusterTopology) GetInterNodeBandwidth(loc1, loc2 GPULocation) float64 {
    if loc1.NodeName == loc2.NodeName {
        // Same node: use the intra-node topology.
        topo := c.NodeTopologies[loc1.NodeName]
        if topo != nil {
            return topo.GetBandwidth(loc1.GPUIndex, loc2.GPUIndex)
        }
        return 0
    }
    // Different nodes: use the network topology.
    idx1, ok1 := c.NetworkTopology.NodeIndex[loc1.NodeName]
    idx2, ok2 := c.NetworkTopology.NodeIndex[loc2.NodeName]
    if !ok1 || !ok2 {
        return 0
    }
    return c.NetworkTopology.Bandwidth[idx1][idx2]
}
// MultiNodeAllocation is the result of a multi-node allocation.
type MultiNodeAllocation struct {
    // GPUs allocated per node
    NodeAllocations map[string][]int
    // Total number of GPUs
    TotalGPUs int
    // Minimum bandwidth across the allocation
    MinBandwidth float64
    // Ring order
    RingOrder []GPULocation
}

// MultiNodeAllocator allocates GPUs across nodes.
type MultiNodeAllocator struct {
    clusterTopo *ClusterTopology
}

// NewMultiNodeAllocator creates a multi-node allocator.
func NewMultiNodeAllocator(clusterTopo *ClusterTopology) *MultiNodeAllocator {
    return &MultiNodeAllocator{clusterTopo: clusterTopo}
}

// Allocate distributes totalGPUs across the cluster.
func (a *MultiNodeAllocator) Allocate(totalGPUs int, constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {
    // Count available GPUs per node.
    nodeGPUs := make(map[string]int)
    for nodeName, topo := range a.clusterTopo.NodeTopologies {
        nodeGPUs[nodeName] = topo.GPUCount
    }
    // Dispatch by policy.
    switch constraint.Policy {
    case "pack":
        return a.allocatePack(totalGPUs, nodeGPUs, constraint)
    case "spread":
        return a.allocateSpread(totalGPUs, nodeGPUs, constraint)
    default:
        return a.allocateBestFit(totalGPUs, nodeGPUs, constraint)
    }
}

// MultiNodeConstraint constrains a multi-node allocation.
type MultiNodeConstraint struct {
    // Allocation policy
    Policy string
    // Maximum number of nodes
    MaxNodes int
    // Minimum GPUs per node
    MinGPUsPerNode int
    // Minimum inter-node network bandwidth (GB/s)
    MinNetworkBandwidth float64
}
// allocateBestFit fills the largest nodes first, preferring NVLink inside each node.
func (a *MultiNodeAllocator) allocateBestFit(totalGPUs int, nodeGPUs map[string]int,
    constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {
    // Sort nodes by available GPU count.
    type nodeInfo struct {
        name string
        gpus int
    }
    nodes := make([]nodeInfo, 0, len(nodeGPUs))
    for name, gpus := range nodeGPUs {
        nodes = append(nodes, nodeInfo{name, gpus})
    }
    sort.Slice(nodes, func(i, j int) bool {
        return nodes[i].gpus > nodes[j].gpus
    })
    result := &MultiNodeAllocation{
        NodeAllocations: make(map[string][]int),
    }
    remaining := totalGPUs
    nodeCount := 0
    for _, node := range nodes {
        if remaining <= 0 {
            break
        }
        if constraint.MaxNodes > 0 && nodeCount >= constraint.MaxNodes {
            break
        }
        // How many to take from this node.
        allocate := min(remaining, node.gpus)
        // Respect the per-node minimum.
        if constraint.MinGPUsPerNode > 0 && allocate < constraint.MinGPUsPerNode {
            continue
        }
        // Topology-optimal allocation inside the node.
        topo := a.clusterTopo.NodeTopologies[node.name]
        allocator := NewGPUAllocator(topo, PolicyBestFit)
        available := make([]int, topo.GPUCount)
        for i := range available {
            available[i] = i
        }
        allocated, err := allocator.Allocate(available, allocate, TopologyConstraint{
            RequireNVLink: true, // prefer NVLink
        })
        if err != nil {
            // Fall back without the NVLink requirement.
            allocated, _ = allocator.Allocate(available, allocate, TopologyConstraint{})
        }
        result.NodeAllocations[node.name] = allocated
        remaining -= len(allocated)
        nodeCount++
    }
    if remaining > 0 {
        return nil, fmt.Errorf("cannot allocate %d GPUs, %d remaining", totalGPUs, remaining)
    }
    // Fill in totals, bandwidth, and ring order.
    result.TotalGPUs = totalGPUs
    result.MinBandwidth = a.calculateMinBandwidth(result)
    result.RingOrder = a.optimizeMultiNodeRing(result)
    return result, nil
}
// allocatePack uses as few nodes as possible.
func (a *MultiNodeAllocator) allocatePack(totalGPUs int, nodeGPUs map[string]int,
    constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {
    // Greedily pick the smallest node set that can hold all GPUs.
    type nodeInfo struct {
        name string
        gpus int
    }
    nodes := make([]nodeInfo, 0, len(nodeGPUs))
    for name, gpus := range nodeGPUs {
        nodes = append(nodes, nodeInfo{name, gpus})
    }
    // Descending by GPU count.
    sort.Slice(nodes, func(i, j int) bool {
        return nodes[i].gpus > nodes[j].gpus
    })
    result := &MultiNodeAllocation{
        NodeAllocations: make(map[string][]int),
    }
    remaining := totalGPUs
    for _, node := range nodes {
        if remaining <= 0 {
            break
        }
        allocate := min(remaining, node.gpus)
        // Enforce the network-bandwidth floor.
        if len(result.NodeAllocations) > 0 && constraint.MinNetworkBandwidth > 0 {
            // Check connectivity to every already-chosen node.
            meetsRequirement := true
            for existingNode := range result.NodeAllocations {
                idx1 := a.clusterTopo.NetworkTopology.NodeIndex[existingNode]
                idx2 := a.clusterTopo.NetworkTopology.NodeIndex[node.name]
                bw := a.clusterTopo.NetworkTopology.Bandwidth[idx1][idx2]
                if bw < constraint.MinNetworkBandwidth {
                    meetsRequirement = false
                    break
                }
            }
            if !meetsRequirement {
                continue
            }
        }
        topo := a.clusterTopo.NodeTopologies[node.name]
        available := make([]int, topo.GPUCount)
        for i := range available {
            available[i] = i
        }
        allocator := NewGPUAllocator(topo, PolicyPack)
        allocated, _ := allocator.Allocate(available, allocate, TopologyConstraint{})
        result.NodeAllocations[node.name] = allocated
        remaining -= len(allocated)
    }
    if remaining > 0 {
        return nil, fmt.Errorf("pack allocation failed, %d GPUs remaining", remaining)
    }
    result.TotalGPUs = totalGPUs
    result.MinBandwidth = a.calculateMinBandwidth(result)
    result.RingOrder = a.optimizeMultiNodeRing(result)
    return result, nil
}
// allocateSpread distributes GPUs evenly across nodes.
func (a *MultiNodeAllocator) allocateSpread(totalGPUs int, nodeGPUs map[string]int,
    constraint MultiNodeConstraint) (*MultiNodeAllocation, error) {
    result := &MultiNodeAllocation{
        NodeAllocations: make(map[string][]int),
    }
    // GPUs per node.
    nodeCount := len(nodeGPUs)
    if constraint.MaxNodes > 0 && nodeCount > constraint.MaxNodes {
        nodeCount = constraint.MaxNodes
    }
    gpusPerNode := totalGPUs / nodeCount
    extraGPUs := totalGPUs % nodeCount
    i := 0
    for nodeName := range nodeGPUs {
        if i >= nodeCount {
            break
        }
        allocate := gpusPerNode
        if i < extraGPUs {
            allocate++
        }
        topo := a.clusterTopo.NodeTopologies[nodeName]
        available := make([]int, topo.GPUCount)
        for j := range available {
            available[j] = j
        }
        allocator := NewGPUAllocator(topo, PolicyBestFit)
        allocated, err := allocator.Allocate(available, allocate, TopologyConstraint{})
        if err != nil {
            continue
        }
        result.NodeAllocations[nodeName] = allocated
        i++
    }
    result.TotalGPUs = totalGPUs
    result.MinBandwidth = a.calculateMinBandwidth(result)
    result.RingOrder = a.optimizeMultiNodeRing(result)
    return result, nil
}
// calculateMinBandwidth finds the slowest link in an allocation.
func (a *MultiNodeAllocator) calculateMinBandwidth(alloc *MultiNodeAllocation) float64 {
    minBW := float64(1e9)
    // Intra-node bandwidth.
    for nodeName, gpus := range alloc.NodeAllocations {
        topo := a.clusterTopo.NodeTopologies[nodeName]
        bw := topo.GetMinBandwidth(gpus)
        if bw < minBW {
            minBW = bw
        }
    }
    // Inter-node bandwidth.
    nodes := make([]string, 0, len(alloc.NodeAllocations))
    for nodeName := range alloc.NodeAllocations {
        nodes = append(nodes, nodeName)
    }
    for i := 0; i < len(nodes); i++ {
        for j := i + 1; j < len(nodes); j++ {
            idx1 := a.clusterTopo.NetworkTopology.NodeIndex[nodes[i]]
            idx2 := a.clusterTopo.NetworkTopology.NodeIndex[nodes[j]]
            bw := a.clusterTopo.NetworkTopology.Bandwidth[idx1][idx2]
            if bw < minBW {
                minBW = bw
            }
        }
    }
    return minBW
}
// optimizeMultiNodeRing orders the allocated GPUs into a ring that minimizes
// cross-node hops: GPUs are grouped per node, each group is ring-optimized
// locally, then the per-node segments are concatenated.
func (a *MultiNodeAllocator) optimizeMultiNodeRing(alloc *MultiNodeAllocation) []GPULocation {
    var ring []GPULocation
    // Collect all GPU locations.
    for nodeName, gpus := range alloc.NodeAllocations {
        for _, gpu := range gpus {
            ring = append(ring, GPULocation{NodeName: nodeName, GPUIndex: gpu})
        }
    }
    // Group by node so each node becomes one contiguous segment.
    nodeGroups := make(map[string][]GPULocation)
    for _, loc := range ring {
        nodeGroups[loc.NodeName] = append(nodeGroups[loc.NodeName], loc)
    }
    // Optimize within each node.
    for nodeName, locs := range nodeGroups {
        topo := a.clusterTopo.NodeTopologies[nodeName]
        gpuIndices := make([]int, len(locs))
        for i, loc := range locs {
            gpuIndices[i] = loc.GPUIndex
        }
        ringOpt := NewRingOptimizer(topo)
        optimizedIndices := ringOpt.OptimizeRing(gpuIndices)
        for i, idx := range optimizedIndices {
            nodeGroups[nodeName][i].GPUIndex = idx
        }
    }
    // Concatenate the per-node segments.
    result := make([]GPULocation, 0, len(ring))
    for _, locs := range nodeGroups {
        result = append(result, locs...)
    }
    return result
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}
4.2 Scheduling Distributed Training
# distributed-training-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: distributed-training
  namespace: ml-workloads
  annotations:
    # Topology-aware scheduling
    gpu.topology/policy: "nvlink"
    gpu.topology/min-bandwidth: "100"
    # Multi-node settings
    gpu.topology/multi-node-policy: "pack"
    gpu.topology/max-nodes: "2"
    gpu.topology/min-gpus-per-node: "4"
spec:
  # Indexed completion mode is required for the
  # batch.kubernetes.io/job-completion-index annotation used below.
  completionMode: Indexed
  parallelism: 2
  completions: 2
  template:
    metadata:
      labels:
        job-name: distributed-training
    spec:
      schedulerName: gpu-scheduler
      # Gives each pod a stable DNS name under the headless service below.
      subdomain: distributed-training
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchLabels:
                job-name: distributed-training
            topologyKey: kubernetes.io/hostname
      containers:
      - name: trainer
        image: pytorch/pytorch:2.0-cuda12.1-cudnn8-runtime
        command:
        - torchrun
        - --nproc_per_node=4
        - --nnodes=2
        - --node_rank=$(POD_INDEX)
        - --master_addr=$(MASTER_ADDR)
        - --master_port=29500
        - train.py
        env:
        - name: POD_INDEX
          valueFrom:
            fieldRef:
              fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']
        - name: MASTER_ADDR
          value: "distributed-training-0.distributed-training"
        - name: NCCL_DEBUG
          value: INFO
        - name: NCCL_IB_DISABLE
          value: "0"
        resources:
          limits:
            nvidia.com/gpu: 4
          requests:
            nvidia.com/gpu: 4
            cpu: "16"
            memory: 64Gi
        volumeMounts:
        - name: shm
          mountPath: /dev/shm
      volumes:
      - name: shm
        emptyDir:
          medium: Memory
          sizeLimit: 32Gi
      restartPolicy: Never
  backoffLimit: 3
---
apiVersion: v1
kind: Service
metadata:
  name: distributed-training
  namespace: ml-workloads
spec:
  clusterIP: None
  selector:
    job-name: distributed-training
  ports:
  - port: 29500
    name: nccl
5. Topology-Aware Scheduling in Practice
5.1 Performance Benchmarking
# benchmark_topology.py
import torch
import torch.distributed as dist
import time
import os

def benchmark_allreduce(size_mb, iterations=100):
    """Benchmark AllReduce performance."""
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    # Create the test tensor (MB to float32 element count).
    tensor = torch.randn(size_mb * 1024 * 256, device='cuda')
    # Warm up.
    for _ in range(10):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()
    # Timed runs.
    start = time.time()
    for _ in range(iterations):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()
    elapsed = time.time() - start
    # Compute bandwidth.
    # Ring AllReduce transfers 2 * (n-1) / n * data_size per rank.
    data_size = size_mb * 1024 * 1024  # bytes
    transfer_size = 2 * (world_size - 1) / world_size * data_size
    bandwidth = transfer_size * iterations / elapsed / 1e9  # GB/s
    if rank == 0:
        print(f"Size: {size_mb}MB, Iterations: {iterations}")
        print(f"Time: {elapsed:.3f}s")
        print(f"Bandwidth: {bandwidth:.2f} GB/s")
        print(f"Latency: {elapsed/iterations*1000:.3f} ms")
    return bandwidth
def main():
    dist.init_process_group(backend='nccl')
    rank = dist.get_rank()
    # Bind this process to its GPU.
    local_rank = int(os.environ.get('LOCAL_RANK', 0))
    torch.cuda.set_device(local_rank)
    if rank == 0:
        print(f"World size: {dist.get_world_size()}")
        print(f"Backend: {dist.get_backend()}")
        print(f"CUDA device: {torch.cuda.get_device_name()}")
        print()
    # Sweep message sizes.
    sizes = [1, 10, 100, 1000]
    for size in sizes:
        benchmark_allreduce(size)
        if rank == 0:
            print("-" * 40)
    dist.destroy_process_group()

if __name__ == '__main__':
    main()
5.2 Topology Verification Script
#!/bin/bash
# verify_topology.sh - verify the GPU topology configuration
echo "=== GPU Topology Verification ==="
echo
# GPU inventory
echo "1. GPU Information:"
nvidia-smi --query-gpu=index,name,uuid,memory.total --format=csv
echo
# Topology matrix
echo "2. GPU Topology:"
nvidia-smi topo -m
echo
# NVLink status
echo "3. NVLINK Status:"
nvidia-smi nvlink -s
echo
# NUMA affinity
echo "4. NUMA Affinity:"
for i in $(seq 0 $(($(nvidia-smi -L | wc -l) - 1))); do
    busid=$(nvidia-smi --id=$i --query-gpu=pci.bus_id --format=csv,noheader)
    # nvidia-smi reports an 8-digit uppercase domain (e.g. 00000000:07:00.0);
    # sysfs uses a lowercase 4-digit one (0000:07:00.0).
    busid=$(echo "$busid" | tr '[:upper:]' '[:lower:]')
    busid=${busid:4}
    numa=$(cat /sys/bus/pci/devices/$busid/numa_node 2>/dev/null || echo "N/A")
    echo "GPU $i: Bus=$busid, NUMA=$numa"
done
echo
# Network interfaces
echo "5. Network Interfaces:"
if [ -d /sys/class/infiniband ]; then
    for dev in /sys/class/infiniband/*; do
        name=$(basename $dev)
        numa=$(cat $dev/device/numa_node 2>/dev/null || echo "N/A")
        state=$(cat $dev/ports/1/state 2>/dev/null || echo "N/A")
        echo "IB: $name, NUMA=$numa, State=$state"
    done
else
    echo "No InfiniBand devices found"
fi
echo
# NCCL environment
echo "6. NCCL Environment:"
env | grep NCCL | sort
echo
# P2P bandwidth test
echo "7. P2P Bandwidth Test:"
P2P_TEST=/usr/local/cuda/samples/1_Utilities/p2pBandwidthLatencyTest/p2pBandwidthLatencyTest
if [ -x "$P2P_TEST" ]; then
    "$P2P_TEST"
else
    echo "p2pBandwidthLatencyTest not found"
fi
5.3 Monitoring Topology Utilization
# prometheus-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: gpu-topology-rules
  namespace: monitoring
spec:
  groups:
  - name: gpu-topology
    rules:
    # Topology-aware scheduling success rate
    - record: gpu_topology_scheduling_success_rate
      expr: |
        sum(rate(gpu_scheduler_topology_schedules_total{satisfied="true"}[5m])) /
        sum(rate(gpu_scheduler_topology_schedules_total[5m]))
    # NVLink utilization
    - record: gpu_nvlink_utilization
      expr: |
        sum(DCGM_FI_DEV_NVLINK_BANDWIDTH_TOTAL) by (gpu) /
        (300 * 12) # A100: 300 GB/s * 12 links
    # Share of allocations that cross NUMA boundaries
    - record: gpu_cross_numa_scheduling_ratio
      expr: |
        sum(gpu_scheduler_allocations{same_numa="false"}) /
        sum(gpu_scheduler_allocations)
    # Alert: topology-constraint failure rate too high
    - alert: HighTopologyConstraintFailureRate
      expr: gpu_topology_scheduling_success_rate < 0.8
      for: 10m
      labels:
        severity: warning
      annotations:
        summary: "GPU topology constraint failure rate is high"
        description: "More than 20% of topology-constrained scheduling requests are failing"
    # Alert: frequent cross-NUMA scheduling
    - alert: HighCrossNUMAScheduling
      expr: gpu_cross_numa_scheduling_ratio > 0.5
      for: 30m
      labels:
        severity: warning
      annotations:
        summary: "High cross-NUMA GPU scheduling ratio"
        description: "More than 50% of multi-GPU allocations are crossing NUMA boundaries"
6. Best Practices
6.1 Topology Policy Selection Guide
| Scenario | Recommended policy | GPU count | Notes |
|---|---|---|---|
| Large-model training | nvlink + pack | 8+ | Maximize communication bandwidth |
| Model parallelism | single-numa | 2-4 | Avoid NUMA-crossing overhead |
| Data parallelism | best-effort | any | Flexible placement |
| Inference serving | spread | 1-2 | Improve availability |
| Hybrid parallelism | nvlink | 4-8 | Balance bandwidth and scale |
6.2 NCCL Tuning Recommendations
# nccl-tuning-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: nccl-tuning
  namespace: ml-workloads
data:
  # Base settings
  NCCL_DEBUG: "WARN"
  NCCL_DEBUG_SUBSYS: "INIT,COLL"
  # NVSwitch systems
  nvswitch.env: |
    NCCL_NVLS_ENABLE=1
    NCCL_ALGO=Ring,Tree
    NCCL_PROTO=LL,LL128,Simple
    NCCL_MIN_NCHANNELS=32
    NCCL_MAX_NCHANNELS=32
  # NVLink without NVSwitch
  nvlink.env: |
    NCCL_P2P_LEVEL=NVL
    NCCL_ALGO=Ring
    NCCL_PROTO=LL,LL128
    NCCL_MIN_NCHANNELS=16
  # PCIe-only systems
  pcie.env: |
    NCCL_P2P_LEVEL=PHB
    NCCL_SHM_USE_CUDA_MEMCPY=1
    NCCL_ALGO=Ring
    NCCL_PROTO=Simple
  # Multi-node systems
  multinode.env: |
    NCCL_IB_DISABLE=0
    NCCL_NET_GDR_LEVEL=5
    NCCL_IB_GID_INDEX=3
    NCCL_SOCKET_IFNAME=eth0
    NCCL_IB_HCA=mlx5
6.3 Failure Handling
// pkg/scheduler/fallback.go
package scheduler

import (
    "fmt"
    "log"
)

// FallbackAllocator retries with relaxed constraints when strict allocation fails.
type FallbackAllocator struct {
    primary   *GPUAllocator
    secondary *GPUAllocator
}

// Allocate allocates with fallback.
func (f *FallbackAllocator) Allocate(available []int, count int,
    constraint TopologyConstraint) ([]int, error) {
    // Try the primary allocator (strict topology constraints).
    result, err := f.primary.Allocate(available, count, constraint)
    if err == nil {
        return result, nil
    }
    // Log the degradation. A production build would also emit a metric here;
    // the original sketch calls an internal metrics.RecordTopologyFallback hook.
    log.Printf("primary allocation failed: %v, trying fallback", err)
    // Retry with the secondary allocator and relaxed constraints.
    relaxedConstraint := TopologyConstraint{
        MinLevel: TopologyLevelSYS, // most permissive
    }
    result, err = f.secondary.Allocate(available, count, relaxedConstraint)
    if err != nil {
        return nil, fmt.Errorf("both primary and fallback allocation failed: %v", err)
    }
    // Callers can attach a warning annotation and decide whether to
    // proceed with the degraded placement.
    return result, nil
}
Summary
This chapter walked through the principles and implementation of topology-aware GPU scheduling:
- GPU topology fundamentals: NVLink, NVSwitch, PCIe, and NUMA interconnects and their performance characteristics
- Topology discovery: using NVML and sysfs to discover the GPU topology
- Allocation algorithms: the BestFit, FirstFit, Pack, and Spread policies
- Ring optimization: greedy construction plus 2-opt refinement of the AllReduce ring order
- Multi-node scheduling: cross-node topology awareness and network-bandwidth optimization
- Kubernetes integration: the Topology Manager and a scheduler-plugin implementation
Topology-aware scheduling is a key optimization for large-scale AI training; exploiting the GPU topology well can significantly improve distributed training performance.
In the next chapter we turn to elastic GPU scheduling: dynamically scaling GPU resources and preemptive scheduling.