GPU 调度器实现
概述
在 Kubernetes 中,GPU 资源的调度远比 CPU 和内存复杂。默认调度器虽然支持 Extended Resources,但无法感知 GPU 的拓扑结构、NVLINK 互联、MIG 分区等特性。本章深入剖析 Kubernetes 调度器架构,并实现一个 GPU 感知的自定义调度器。
1. Kubernetes 调度器架构
1.1 调度器核心流程
┌─────────────────────────────────────────────────────────────────────┐
│ Kubernetes Scheduler Architecture │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌───────────────────────────────────────────┐ │
│ │ Informer │───▶│ Scheduling Queue │ │
│ │ (Watch) │ │ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │
│ └──────────────┘ │ │ Active │ │ Backoff │ │ Unschedulable│ │ │
│ │ │ Queue │ │ Queue │ │ Queue │ │ │
│ │ └────┬────┘ └────┬────┘ └──────────────┘ │ │
│ └───────┼───────────┼───────────────────────┘ │
│ │ │ │
│ ▼ │ │
│ ┌───────────────────────────────────────┼───────────────────────┐ │
│ │ Scheduling Cycle │ │ │
│ │ ┌─────────────────────────────────┐ │ │ │
│ │ │ Scheduling Framework │ │ │ │
│ │ │ │ │ │ │
│ │ │ PreFilter ──▶ Filter ──▶ PostFilter │ │
│ │ │ │ │ │ │
│ │ │ ▼ ▼ │ │
│ │ │ PreScore ──▶ Score ──▶ NormalizeScore │ │
│ │ │ │ │ │ │
│ │ │ ▼ ▼ │ │
│ │ │ Reserve ──▶ Permit ──▶ PreBind ──▶ Bind ──▶ PostBind │ │
│ │ │ │ │
│ │ └─────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 调度框架扩展点
// Scheduling Framework 定义了调度器的扩展点
// 路径: kubernetes/pkg/scheduler/framework/interface.go
// PreFilterPlugin 在过滤之前执行,用于预处理或检查 Pod 信息
type PreFilterPlugin interface {
Plugin
// PreFilter 在调度周期开始时调用
// 可用于计算 Pod 需要的资源、检查必要条件等
PreFilter(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
// PreFilterExtensions 返回扩展接口
PreFilterExtensions() PreFilterExtensions
}
// FilterPlugin 过滤不满足条件的节点
type FilterPlugin interface {
Plugin
// Filter 对每个节点调用,返回节点是否适合运行 Pod
// 这是最核心的过滤逻辑
Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}
// PostFilterPlugin 当所有节点都被过滤掉时执行
type PostFilterPlugin interface {
Plugin
// PostFilter 尝试抢占等操作使 Pod 可调度
PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod,
filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}
// PreScorePlugin 在打分前执行预处理
type PreScorePlugin interface {
Plugin
PreScore(ctx context.Context, state *CycleState, pod *v1.Pod,
nodes []*v1.Node) *Status
}
// ScorePlugin 为通过过滤的节点打分
type ScorePlugin interface {
Plugin
// Score 返回节点的得分,范围 [0, 100]
Score(ctx context.Context, state *CycleState, pod *v1.Pod,
nodeName string) (int64, *Status)
// ScoreExtensions 返回归一化扩展
ScoreExtensions() ScoreExtensions
}
// ReservePlugin 在绑定前预留资源
type ReservePlugin interface {
Plugin
// Reserve 预留节点上的资源
Reserve(ctx context.Context, state *CycleState, pod *v1.Pod,
nodeName string) *Status
// Unreserve 取消预留(当后续步骤失败时调用)
Unreserve(ctx context.Context, state *CycleState, pod *v1.Pod,
nodeName string)
}
// PermitPlugin 在绑定前进行最终检查
type PermitPlugin interface {
Plugin
// Permit 可以返回:
// - Success: 立即绑定
// - Wait: 等待指定时间
// - Deny: 拒绝调度
Permit(ctx context.Context, state *CycleState, pod *v1.Pod,
nodeName string) (*Status, time.Duration)
}
// BindPlugin 执行实际的绑定操作
type BindPlugin interface {
Plugin
// Bind 将 Pod 绑定到节点
Bind(ctx context.Context, state *CycleState, pod *v1.Pod,
nodeName string) *Status
}
1.3 默认调度器的 GPU 资源处理
默认调度器通过 NodeResourcesFit 插件处理 GPU 资源:
// 路径: kubernetes/pkg/scheduler/framework/plugins/noderesources/fit.go
// NodeResourcesFit 检查节点是否有足够的资源
type NodeResourcesFit struct {
// 忽略的资源列表
ignoredResources sets.String
// 忽略的资源组
ignoredResourceGroups sets.String
// 是否启用 Pod overhead
enablePodOverhead bool
// 评分策略
scoreStrategy *config.ScoringStrategy
}
// Filter 过滤资源不足的节点
func (f *NodeResourcesFit) Filter(ctx context.Context, cycleState *framework.CycleState,
pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
// 获取 Pod 请求的资源
podRequest := f.getResourceRequest(pod)
// 检查节点可用资源
insufficientResources := fitsRequest(podRequest, nodeInfo,
f.ignoredResources, f.ignoredResourceGroups)
if len(insufficientResources) != 0 {
// 资源不足,返回失败原因
failureReasons := make([]string, 0, len(insufficientResources))
for _, r := range insufficientResources {
failureReasons = append(failureReasons, r.Reason)
}
return framework.NewStatus(framework.Unschedulable, failureReasons...)
}
return nil
}
// fitsRequest 检查节点资源是否满足请求
func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo,
ignoredExtendedResources, ignoredResourceGroups sets.String) []InsufficientResource {
insufficientResources := make([]InsufficientResource, 0, 4)
// 检查 CPU
if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - nodeInfo.Requested.MilliCPU) {
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: v1.ResourceCPU,
Reason: "Insufficient cpu",
Requested: podRequest.MilliCPU,
Used: nodeInfo.Requested.MilliCPU,
Capacity: nodeInfo.Allocatable.MilliCPU,
})
}
// 检查内存
if podRequest.Memory > (nodeInfo.Allocatable.Memory - nodeInfo.Requested.Memory) {
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: v1.ResourceMemory,
Reason: "Insufficient memory",
Requested: podRequest.Memory,
Used: nodeInfo.Requested.Memory,
Capacity: nodeInfo.Allocatable.Memory,
})
}
// 检查扩展资源(包括 GPU)
for rName, rQuant := range podRequest.ScalarResources {
// 跳过忽略的资源
if ignoredExtendedResources.Has(string(rName)) {
continue
}
// 检查资源组是否被忽略
if ignoredResourceGroups.Len() > 0 {
group := strings.Split(string(rName), "/")[0]
if ignoredResourceGroups.Has(group) {
continue
}
}
// 关键:检查 GPU 等扩展资源
if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] -
nodeInfo.Requested.ScalarResources[rName]) {
insufficientResources = append(insufficientResources, InsufficientResource{
ResourceName: rName,
Reason: fmt.Sprintf("Insufficient %v", rName),
Requested: rQuant,
Used: nodeInfo.Requested.ScalarResources[rName],
Capacity: nodeInfo.Allocatable.ScalarResources[rName],
})
}
}
return insufficientResources
}
默认调度器的局限性:
- 只做数量匹配:只检查 GPU 数量,不关心 GPU 型号
- 无拓扑感知:不考虑 NVLINK、PCIe 拓扑
- 无碎片优化:可能造成 GPU 资源碎片化
- 无 MIG 感知:无法智能分配 MIG 实例
2. GPU 感知调度器设计
2.1 架构设计
┌─────────────────────────────────────────────────────────────────────────┐
│ GPU-Aware Scheduler Architecture │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Scheduler Plugins │ │
│ │ │ │
│ │ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │ │
│ │ │ GPUPreFilter │ │ GPUFilter │ │ GPUScore │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ - 解析GPU请求 │ │ - GPU型号匹配 │ │ - 拓扑评分 │ │ │
│ │ │ - 检查注解 │ │ - 拓扑约束检查 │ │ - 碎片化评分 │ │ │
│ │ │ - 预处理状态 │ │ - MIG分区检查 │ │ - 利用率评分 │ │ │
│ │ └──────────────────┘ └──────────────────┘ └──────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │ │
│ │ │ GPUReserve │ │ GPUBind │ │ GPUPostBind │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ - 预留GPU设备 │ │ - 分配具体GPU │ │ - 更新监控指标 │ │ │
│ │ │ - 更新本地缓存 │ │ - 更新节点状态 │ │ - 记录审计日志 │ │ │
│ │ └──────────────────┘ └──────────────────┘ └──────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ GPU State Manager │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌────────────────────────┐ │ │
│ │ │ Node GPU │ │ GPU Topo │ │ MIG Manager │ │ │
│ │ │ Cache │ │ Graph │ │ │ │ │
│ │ │ │ │ │ │ ┌──────────────────┐ │ │ │
│ │ │ GPU 0: Free │ │ [0]──[1] │ │ │ GPU 0: 3g.40gb │ │ │ │
│ │ │ GPU 1: Used │ │ │ │ │ │ │ GPU 1: 7g.80gb │ │ │ │
│ │ │ GPU 2: Free │ │ [2]──[3] │ │ └──────────────────┘ │ │ │
│ │ └──────────────┘ └──────────────┘ └────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Data Sources │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────────┐ │ │
│ │ │ Node CRD │ │ GPU Metrics │ │ Device Plugin API │ │ │
│ │ │ │ │ (Prometheus) │ │ │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
2.2 GPU 状态数据结构
package gpuscheduler
import (
	"fmt"
	"sync"
	"time"
)
// GPUDevice 表示单个 GPU 设备
type GPUDevice struct {
// 设备 UUID
UUID string `json:"uuid"`
// 设备索引
Index int `json:"index"`
// GPU 型号
Model string `json:"model"`
// 显存大小(MB)
MemoryTotal int64 `json:"memoryTotal"`
// 可用显存(MB)
MemoryFree int64 `json:"memoryFree"`
// 计算能力
ComputeCapability string `json:"computeCapability"`
// 是否支持 MIG
MIGEnabled bool `json:"migEnabled"`
// MIG 设备列表
MIGDevices []MIGDevice `json:"migDevices,omitempty"`
// 当前状态
State DeviceState `json:"state"`
// 分配给的 Pod
AllocatedPod string `json:"allocatedPod,omitempty"`
// 健康状态
Health DeviceHealth `json:"health"`
// NVLINK 连接
NVLinkPeers []int `json:"nvlinkPeers,omitempty"`
// PCIe 信息
PCIeInfo PCIeInfo `json:"pcieInfo"`
}
// DeviceState GPU 设备状态
type DeviceState string
const (
DeviceStateFree DeviceState = "Free"
DeviceStateAllocated DeviceState = "Allocated"
DeviceStateReserved DeviceState = "Reserved"
DeviceStateFailed DeviceState = "Failed"
)
// DeviceHealth GPU 健康状态
type DeviceHealth string
const (
DeviceHealthy DeviceHealth = "Healthy"
DeviceUnhealthy DeviceHealth = "Unhealthy"
DeviceUnknown DeviceHealth = "Unknown"
)
// MIGDevice MIG 设备实例
type MIGDevice struct {
// MIG UUID
UUID string `json:"uuid"`
// MIG profile(如 "1g.5gb")
Profile string `json:"profile"`
// GPU 实例 ID
GIIndex int `json:"giIndex"`
// 计算实例 ID
CIIndex int `json:"ciIndex"`
// 状态
State DeviceState `json:"state"`
// 分配给的 Pod
AllocatedPod string `json:"allocatedPod,omitempty"`
}
// PCIeInfo PCIe 拓扑信息
type PCIeInfo struct {
// Bus ID
BusID string `json:"busId"`
// NUMA 节点
NUMANode int `json:"numaNode"`
// PCIe 代数
Generation int `json:"generation"`
// 链路宽度
LinkWidth int `json:"linkWidth"`
}
// NodeGPUInfo 节点的 GPU 信息
type NodeGPUInfo struct {
// 节点名称
NodeName string `json:"nodeName"`
// GPU 设备列表
GPUs []GPUDevice `json:"gpus"`
// GPU 拓扑
Topology *GPUTopology `json:"topology"`
// 驱动版本
DriverVersion string `json:"driverVersion"`
// CUDA 版本
CUDAVersion string `json:"cudaVersion"`
// 更新时间
LastUpdated time.Time `json:"lastUpdated"`
}
// GPUTopology GPU 拓扑结构
type GPUTopology struct {
// 节点名
NodeName string `json:"nodeName"`
// 连接矩阵,Links[i][j] 表示 GPU i 到 GPU j 的连接类型
Links [][]LinkType `json:"links"`
// NUMA 亲和性,NUMANodes[i] 表示 GPU i 所属的 NUMA 节点
NUMANodes []int `json:"numaNodes"`
}
// LinkType GPU 间连接类型
type LinkType string
const (
LinkTypeNone LinkType = "None"
LinkTypePCIe LinkType = "PCIe"
LinkTypeNVLink LinkType = "NVLink"
LinkTypeNVSwitch LinkType = "NVSwitch"
LinkTypeSameBoard LinkType = "SameBoard"
)
// GPUStateManager GPU 状态管理器
type GPUStateManager struct {
mu sync.RWMutex
// 节点 GPU 信息缓存
nodeGPUInfo map[string]*NodeGPUInfo
// 更新通道
updateChan chan *NodeGPUInfo
// 停止信号
stopChan chan struct{}
}
// NewGPUStateManager 创建状态管理器
func NewGPUStateManager() *GPUStateManager {
return &GPUStateManager{
nodeGPUInfo: make(map[string]*NodeGPUInfo),
updateChan: make(chan *NodeGPUInfo, 100),
stopChan: make(chan struct{}),
}
}
// GetNodeGPUInfo 获取节点 GPU 信息
func (m *GPUStateManager) GetNodeGPUInfo(nodeName string) (*NodeGPUInfo, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
info, ok := m.nodeGPUInfo[nodeName]
return info, ok
}
// UpdateNodeGPUInfo 更新节点 GPU 信息
func (m *GPUStateManager) UpdateNodeGPUInfo(info *NodeGPUInfo) {
m.mu.Lock()
defer m.mu.Unlock()
info.LastUpdated = time.Now()
m.nodeGPUInfo[info.NodeName] = info
}
// GetAvailableGPUs 获取节点可用的 GPU 列表
func (m *GPUStateManager) GetAvailableGPUs(nodeName string) []GPUDevice {
m.mu.RLock()
defer m.mu.RUnlock()
info, ok := m.nodeGPUInfo[nodeName]
if !ok {
return nil
}
var available []GPUDevice
for _, gpu := range info.GPUs {
if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
available = append(available, gpu)
}
}
return available
}
// ReserveGPUs 预留 GPU 设备
func (m *GPUStateManager) ReserveGPUs(nodeName string, gpuIndices []int, podName string) error {
m.mu.Lock()
defer m.mu.Unlock()
info, ok := m.nodeGPUInfo[nodeName]
if !ok {
return fmt.Errorf("node %s not found", nodeName)
}
// 检查所有 GPU 是否可用
for _, idx := range gpuIndices {
if idx >= len(info.GPUs) {
return fmt.Errorf("GPU index %d out of range", idx)
}
if info.GPUs[idx].State != DeviceStateFree {
return fmt.Errorf("GPU %d is not free", idx)
}
}
// 预留 GPU
for _, idx := range gpuIndices {
info.GPUs[idx].State = DeviceStateReserved
info.GPUs[idx].AllocatedPod = podName
}
return nil
}
// UnreserveGPUs 取消预留
func (m *GPUStateManager) UnreserveGPUs(nodeName string, gpuIndices []int) {
m.mu.Lock()
defer m.mu.Unlock()
info, ok := m.nodeGPUInfo[nodeName]
if !ok {
return
}
for _, idx := range gpuIndices {
if idx < len(info.GPUs) && info.GPUs[idx].State == DeviceStateReserved {
info.GPUs[idx].State = DeviceStateFree
info.GPUs[idx].AllocatedPod = ""
}
}
}
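下面给出一段最小化的使用示意(非正式实现),假设节点信息已由第 5 节的收集器写入缓存,演示 Reserve 与失败回滚 Unreserve 的典型调用顺序;其中的节点名、Pod 名均为示例值:
// 示意:在一个调度周期内预留 GPU,后续步骤失败时回滚(假设 mgr 已填充节点数据)
func demoReserveFlow(mgr *GPUStateManager) {
	nodeName, podName := "gpu-node-01", "ml-workloads/train-0" // 示例名称
	// 查询可用 GPU
	available := mgr.GetAvailableGPUs(nodeName)
	if len(available) < 2 {
		return
	}
	// 预留前两块可用 GPU
	indices := []int{available[0].Index, available[1].Index}
	if err := mgr.ReserveGPUs(nodeName, indices, podName); err != nil {
		return
	}
	// 假设后续 Permit/Bind 阶段失败,则回滚预留
	bindFailed := true
	if bindFailed {
		mgr.UnreserveGPUs(nodeName, indices)
	}
}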
3. 调度插件实现
3.1 PreFilter 插件
package gpuscheduler
import (
	"context"
	"fmt"
	"strconv"
	"strings"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)
const (
// PluginName 插件名称
PluginName = "GPUScheduler"
// GPU 资源名称
ResourceNvidiaGPU = "nvidia.com/gpu"
ResourceMIGPrefix = "nvidia.com/mig-"
// 注解键
AnnotationGPUModel = "gpu.scheduler/model"
AnnotationGPUMemory = "gpu.scheduler/memory"
AnnotationGPUTopology = "gpu.scheduler/topology"
AnnotationMIGProfile = "gpu.scheduler/mig-profile"
AnnotationPreferNVLink = "gpu.scheduler/prefer-nvlink"
AnnotationPreferSameNUMA = "gpu.scheduler/prefer-same-numa"
// 状态键
StateKeyGPURequest = "GPURequest"
)
// GPUSchedulerPlugin GPU 调度插件
type GPUSchedulerPlugin struct {
handle framework.Handle
stateManager *GPUStateManager
}
// GPURequest GPU 请求信息
type GPURequest struct {
// 请求的 GPU 数量
Count int64
// 指定的 GPU 型号(可选)
Model string
// 最小显存需求(MB)
MinMemory int64
// MIG profile(可选)
MIGProfile string
// 拓扑约束
TopologyConstraint string
// 是否偏好 NVLINK
PreferNVLink bool
// 是否偏好同一 NUMA
PreferSameNUMA bool
}
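// 补充示意(非上文原有代码):CycleState.Write 要求写入的值实现 framework.StateData 接口,
// 因此 GPURequest 需要提供 Clone 方法,否则 PreFilter 中的 state.Write 无法使用该类型。
func (r *GPURequest) Clone() framework.StateData {
	copied := *r
	return &copied
}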
// Name 返回插件名称
func (g *GPUSchedulerPlugin) Name() string {
return PluginName
}
// New 创建插件实例
func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
return &GPUSchedulerPlugin{
handle: handle,
stateManager: NewGPUStateManager(),
}, nil
}
// PreFilter 预过滤阶段
func (g *GPUSchedulerPlugin) PreFilter(ctx context.Context, state *framework.CycleState,
pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
// 解析 GPU 请求
gpuRequest, err := g.parseGPURequest(pod)
if err != nil {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable,
fmt.Sprintf("failed to parse GPU request: %v", err))
}
// 如果不需要 GPU,跳过
if gpuRequest.Count == 0 {
return nil, framework.NewStatus(framework.Success)
}
// 验证请求的合法性
if err := g.validateGPURequest(gpuRequest); err != nil {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable,
fmt.Sprintf("invalid GPU request: %v", err))
}
// 将请求信息存入调度周期状态
state.Write(StateKeyGPURequest, gpuRequest)
return nil, framework.NewStatus(framework.Success)
}
// PreFilterExtensions 返回扩展接口
func (g *GPUSchedulerPlugin) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
// parseGPURequest 解析 Pod 的 GPU 请求
func (g *GPUSchedulerPlugin) parseGPURequest(pod *v1.Pod) (*GPURequest, error) {
request := &GPURequest{}
// 从容器资源请求中获取 GPU 数量
for _, container := range pod.Spec.Containers {
if container.Resources.Requests != nil {
// 检查标准 GPU 资源
if gpuQty, ok := container.Resources.Requests[ResourceNvidiaGPU]; ok {
request.Count += gpuQty.Value()
}
// 检查 MIG 资源
for resName, qty := range container.Resources.Requests {
if strings.HasPrefix(string(resName), ResourceMIGPrefix) {
request.Count += qty.Value()
// 提取 MIG profile
request.MIGProfile = strings.TrimPrefix(string(resName), ResourceMIGPrefix)
}
}
}
}
// 从注解中获取附加需求
if pod.Annotations != nil {
if model, ok := pod.Annotations[AnnotationGPUModel]; ok {
request.Model = model
}
if memory, ok := pod.Annotations[AnnotationGPUMemory]; ok {
memMB, err := strconv.ParseInt(memory, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid memory annotation: %v", err)
}
request.MinMemory = memMB
}
if topology, ok := pod.Annotations[AnnotationGPUTopology]; ok {
request.TopologyConstraint = topology
}
if mig, ok := pod.Annotations[AnnotationMIGProfile]; ok {
request.MIGProfile = mig
}
if prefer, ok := pod.Annotations[AnnotationPreferNVLink]; ok {
request.PreferNVLink = prefer == "true"
}
if prefer, ok := pod.Annotations[AnnotationPreferSameNUMA]; ok {
request.PreferSameNUMA = prefer == "true"
}
}
return request, nil
}
// validateGPURequest 验证 GPU 请求
func (g *GPUSchedulerPlugin) validateGPURequest(request *GPURequest) error {
// 验证 GPU 数量
if request.Count < 0 {
return fmt.Errorf("negative GPU count: %d", request.Count)
}
if request.Count > 8 {
// 示例以常见的单机 8 卡为上限,实际实现中应做成可配置项
return fmt.Errorf("GPU count %d exceeds maximum", request.Count)
}
// 验证 MIG profile
if request.MIGProfile != "" {
validProfiles := []string{
"1g.5gb", "1g.10gb", "1g.20gb",
"2g.10gb", "2g.20gb",
"3g.20gb", "3g.40gb",
"4g.40gb",
"7g.40gb", "7g.80gb",
}
valid := false
for _, p := range validProfiles {
if request.MIGProfile == p {
valid = true
break
}
}
if !valid {
return fmt.Errorf("invalid MIG profile: %s", request.MIGProfile)
}
}
// 验证拓扑约束
if request.TopologyConstraint != "" {
validConstraints := []string{"single", "nvlink", "same-numa", "any"}
valid := false
for _, c := range validConstraints {
if request.TopologyConstraint == c {
valid = true
break
}
}
if !valid {
return fmt.Errorf("invalid topology constraint: %s", request.TopologyConstraint)
}
}
return nil
}
3.2 Filter 插件
// Filter 过滤不满足 GPU 需求的节点
func (g *GPUSchedulerPlugin) Filter(ctx context.Context, state *framework.CycleState,
pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
// 获取 GPU 请求
gpuRequestData, err := state.Read(StateKeyGPURequest)
if err != nil {
// 没有 GPU 请求,跳过
return framework.NewStatus(framework.Success)
}
gpuRequest := gpuRequestData.(*GPURequest)
if gpuRequest.Count == 0 {
return framework.NewStatus(framework.Success)
}
nodeName := nodeInfo.Node().Name
// 获取节点 GPU 信息
nodeGPUInfo, ok := g.stateManager.GetNodeGPUInfo(nodeName)
if !ok {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("node %s has no GPU info", nodeName))
}
// 检查 GPU 数量
availableGPUs := g.stateManager.GetAvailableGPUs(nodeName)
if int64(len(availableGPUs)) < gpuRequest.Count {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("insufficient GPUs: need %d, available %d",
gpuRequest.Count, len(availableGPUs)))
}
// 检查 GPU 型号
if gpuRequest.Model != "" {
matchingGPUs := 0
for _, gpu := range availableGPUs {
if gpu.Model == gpuRequest.Model {
matchingGPUs++
}
}
if int64(matchingGPUs) < gpuRequest.Count {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("insufficient GPUs with model %s: need %d, available %d",
gpuRequest.Model, gpuRequest.Count, matchingGPUs))
}
}
// 检查显存需求
if gpuRequest.MinMemory > 0 {
matchingGPUs := 0
for _, gpu := range availableGPUs {
if gpu.MemoryTotal >= gpuRequest.MinMemory {
matchingGPUs++
}
}
if int64(matchingGPUs) < gpuRequest.Count {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("insufficient GPUs with memory >= %dMB: need %d, available %d",
gpuRequest.MinMemory, gpuRequest.Count, matchingGPUs))
}
}
// 检查 MIG 配置
if gpuRequest.MIGProfile != "" {
if !g.checkMIGAvailability(nodeGPUInfo, gpuRequest) {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("no available MIG profile %s", gpuRequest.MIGProfile))
}
}
// 检查拓扑约束
if gpuRequest.TopologyConstraint != "" {
if !g.checkTopologyConstraint(nodeGPUInfo, gpuRequest) {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("topology constraint %s cannot be satisfied",
gpuRequest.TopologyConstraint))
}
}
return framework.NewStatus(framework.Success)
}
// checkMIGAvailability 检查 MIG 可用性
func (g *GPUSchedulerPlugin) checkMIGAvailability(info *NodeGPUInfo,
request *GPURequest) bool {
availableCount := int64(0)
for _, gpu := range info.GPUs {
if !gpu.MIGEnabled {
continue
}
for _, mig := range gpu.MIGDevices {
if mig.Profile == request.MIGProfile && mig.State == DeviceStateFree {
availableCount++
}
}
}
return availableCount >= request.Count
}
// checkTopologyConstraint 检查拓扑约束
func (g *GPUSchedulerPlugin) checkTopologyConstraint(info *NodeGPUInfo,
request *GPURequest) bool {
availableGPUs := make([]int, 0)
for i, gpu := range info.GPUs {
if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
availableGPUs = append(availableGPUs, i)
}
}
if int64(len(availableGPUs)) < request.Count {
return false
}
switch request.TopologyConstraint {
case "single":
// 单 GPU,任何节点都满足
return request.Count == 1
case "nvlink":
// 需要 NVLINK 连接的 GPU
return g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, int(request.Count)) != nil
case "same-numa":
// 需要同一 NUMA 节点的 GPU
return g.findSameNUMAGPUs(info.Topology, availableGPUs, int(request.Count)) != nil
case "any":
return true
default:
return true
}
}
// findNVLinkConnectedGPUs 找到 NVLINK 连接的 GPU 组合
func (g *GPUSchedulerPlugin) findNVLinkConnectedGPUs(topo *GPUTopology,
available []int, count int) []int {
if topo == nil || len(topo.Links) == 0 {
return nil
}
// 构建 NVLINK 连接图
nvlinkGraph := make(map[int][]int)
for _, i := range available {
for _, j := range available {
if i != j && topo.Links[i][j] == LinkTypeNVLink {
nvlinkGraph[i] = append(nvlinkGraph[i], j)
}
}
}
// 使用 DFS 找到连通分量
visited := make(map[int]bool)
var result []int
var dfs func(node int, group []int) []int
dfs = func(node int, group []int) []int {
if len(group) >= count {
return group
}
visited[node] = true
group = append(group, node)
for _, neighbor := range nvlinkGraph[node] {
if !visited[neighbor] {
if found := dfs(neighbor, group); len(found) >= count {
return found
}
}
}
return group
}
for _, gpu := range available {
if !visited[gpu] {
visited = make(map[int]bool)
if found := dfs(gpu, nil); len(found) >= count {
return found[:count]
}
}
}
return nil
}
// findSameNUMAGPUs 找到同一 NUMA 节点的 GPU
func (g *GPUSchedulerPlugin) findSameNUMAGPUs(topo *GPUTopology,
available []int, count int) []int {
if topo == nil || len(topo.NUMANodes) == 0 {
return nil
}
// 按 NUMA 节点分组
numaGroups := make(map[int][]int)
for _, gpu := range available {
numa := topo.NUMANodes[gpu]
numaGroups[numa] = append(numaGroups[numa], gpu)
}
// 找到满足数量要求的 NUMA 组
for _, gpus := range numaGroups {
if len(gpus) >= count {
return gpus[:count]
}
}
return nil
}
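为便于理解上述两个辅助函数的行为,下面给出一个最小示例:假设某 4 卡节点上 GPU 0-1 与 2-3 各为一组 NVLINK 直连,NUMA 划分为 0/0/1/1;示例中的拓扑数据为手工构造,仅用于说明返回结果:
func demoTopologyHelpers(g *GPUSchedulerPlugin) {
	// 手工构造 4 卡拓扑:0-1、2-3 之间为 NVLink,其余为 PCIe
	links := [][]LinkType{
		{LinkTypeSameBoard, LinkTypeNVLink, LinkTypePCIe, LinkTypePCIe},
		{LinkTypeNVLink, LinkTypeSameBoard, LinkTypePCIe, LinkTypePCIe},
		{LinkTypePCIe, LinkTypePCIe, LinkTypeSameBoard, LinkTypeNVLink},
		{LinkTypePCIe, LinkTypePCIe, LinkTypeNVLink, LinkTypeSameBoard},
	}
	topo := &GPUTopology{Links: links, NUMANodes: []int{0, 0, 1, 1}}
	available := []int{0, 1, 2, 3}
	// 需要 2 块 NVLink 直连 GPU:返回 [0 1] 或 [2 3]
	_ = g.findNVLinkConnectedGPUs(topo, available, 2)
	// 需要 2 块同一 NUMA 的 GPU:返回 NUMA0 的 [0 1] 或 NUMA1 的 [2 3]
	_ = g.findSameNUMAGPUs(topo, available, 2)
	// 需要 3 块 NVLink 直连 GPU:该拓扑下无法满足,返回 nil
	_ = g.findNVLinkConnectedGPUs(topo, available, 3)
}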
3.3 Score 插件
// Score 为节点打分
func (g *GPUSchedulerPlugin) Score(ctx context.Context, state *framework.CycleState,
pod *v1.Pod, nodeName string) (int64, *framework.Status) {
// 获取 GPU 请求
gpuRequestData, err := state.Read(StateKeyGPURequest)
if err != nil {
return 0, framework.NewStatus(framework.Success)
}
gpuRequest := gpuRequestData.(*GPURequest)
if gpuRequest.Count == 0 {
return 0, framework.NewStatus(framework.Success)
}
nodeGPUInfo, ok := g.stateManager.GetNodeGPUInfo(nodeName)
if !ok {
return 0, framework.NewStatus(framework.Success)
}
score := int64(0)
// 1. 拓扑得分(0-40分)
topoScore := g.calculateTopologyScore(nodeGPUInfo, gpuRequest)
score += topoScore
// 2. 碎片化得分(0-30分)
fragScore := g.calculateFragmentationScore(nodeGPUInfo, gpuRequest)
score += fragScore
// 3. 负载均衡得分(0-20分)
balanceScore := g.calculateBalanceScore(nodeGPUInfo)
score += balanceScore
// 4. 型号匹配得分(0-10分)
modelScore := g.calculateModelScore(nodeGPUInfo, gpuRequest)
score += modelScore
return score, framework.NewStatus(framework.Success)
}
// ScoreExtensions 返回得分扩展
func (g *GPUSchedulerPlugin) ScoreExtensions() framework.ScoreExtensions {
return g
}
// NormalizeScore 归一化得分到 [0, 100]
func (g *GPUSchedulerPlugin) NormalizeScore(ctx context.Context, state *framework.CycleState,
pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
// 找到最大分数
var maxScore int64 = 0
for _, score := range scores {
if score.Score > maxScore {
maxScore = score.Score
}
}
// 归一化
if maxScore > 0 {
for i := range scores {
scores[i].Score = scores[i].Score * framework.MaxNodeScore / maxScore
}
}
return framework.NewStatus(framework.Success)
}
// calculateTopologyScore 计算拓扑得分
func (g *GPUSchedulerPlugin) calculateTopologyScore(info *NodeGPUInfo,
request *GPURequest) int64 {
if info.Topology == nil || request.Count <= 1 {
return 20 // 单 GPU 或无拓扑信息,给中等分数
}
availableGPUs := make([]int, 0)
for i, gpu := range info.GPUs {
if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
availableGPUs = append(availableGPUs, i)
}
}
score := int64(0)
// 检查 NVLINK 连接
if request.PreferNVLink || request.TopologyConstraint == "nvlink" {
nvlinkGPUs := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, int(request.Count))
if nvlinkGPUs != nil {
score += 40 // 满分
} else {
score += 10 // 无法满足 NVLINK,给低分
}
} else {
// 即使没有要求,有 NVLINK 也加分
nvlinkGPUs := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, int(request.Count))
if nvlinkGPUs != nil {
score += 30
} else {
score += 20
}
}
// 检查 NUMA 亲和性
if request.PreferSameNUMA || request.TopologyConstraint == "same-numa" {
sameNUMAGPUs := g.findSameNUMAGPUs(info.Topology, availableGPUs, int(request.Count))
if sameNUMAGPUs != nil {
score += 10 // 额外加分
}
}
return score
}
// calculateFragmentationScore 计算碎片化得分
// 目标:避免留下难以利用的 GPU 碎片
func (g *GPUSchedulerPlugin) calculateFragmentationScore(info *NodeGPUInfo,
request *GPURequest) int64 {
// 统计可用 GPU 数量
availableCount := int64(0)
for _, gpu := range info.GPUs {
if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
availableCount++
}
}
totalGPUs := int64(len(info.GPUs))
remaining := availableCount - request.Count
// 策略:
// 1. 优先选择能完全使用的节点(remaining = 0)
// 2. 次选剩余 GPU 较多的节点(避免碎片)
// 3. 避免留下 1 个 GPU 的情况(很难利用)
if remaining == 0 {
return 30 // 完美匹配
}
if remaining == 1 {
return 5 // 留下 1 个 GPU,不太好
}
if remaining >= totalGPUs/2 {
return 25 // 还有一半以上可用
}
return 15 // 中等
}
// calculateBalanceScore 计算负载均衡得分
func (g *GPUSchedulerPlugin) calculateBalanceScore(info *NodeGPUInfo) int64 {
usedCount := 0
totalCount := len(info.GPUs)
for _, gpu := range info.GPUs {
if gpu.State != DeviceStateFree {
usedCount++
}
}
// 计算使用率
utilization := float64(usedCount) / float64(totalCount)
// 偏好使用率较高的节点(但不是满载)
// 这样可以让部分节点完全空闲,便于调度大任务
if utilization >= 0.8 {
return 10 // 接近满载,降低优先级
}
if utilization >= 0.5 {
return 20 // 中等使用率,最优
}
if utilization >= 0.2 {
return 15 // 使用率较低
}
return 10 // 几乎空闲
}
// calculateModelScore 计算 GPU 型号匹配得分
func (g *GPUSchedulerPlugin) calculateModelScore(info *NodeGPUInfo,
request *GPURequest) int64 {
if request.Model == "" {
return 5 // 没有型号要求,给基础分
}
matchCount := 0
for _, gpu := range info.GPUs {
if gpu.Model == request.Model && gpu.State == DeviceStateFree {
matchCount++
}
}
if int64(matchCount) >= request.Count {
return 10 // 满足型号要求
}
return 0
}
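按上述实现举例:假设 Pod 请求 4 块 GPU、未指定型号与拓扑约束,候选节点 A 共 8 块 GPU 且全部空闲、彼此 NVLINK 直连,则拓扑得分为 30(存在可用的 NVLINK 组合但未显式要求)、碎片化得分为 25(剩余 4 块,不少于总量一半)、负载均衡得分为 10(节点几乎空闲)、型号得分为 5(未指定型号),合计 70 分;随后 NormalizeScore 按所有候选节点中的最高分将结果归一化到 [0, 100]。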
3.4 Reserve 和 Bind 插件
// Reserve 预留资源
func (g *GPUSchedulerPlugin) Reserve(ctx context.Context, state *framework.CycleState,
pod *v1.Pod, nodeName string) *framework.Status {
gpuRequestData, err := state.Read(StateKeyGPURequest)
if err != nil {
return framework.NewStatus(framework.Success)
}
gpuRequest := gpuRequestData.(*GPURequest)
if gpuRequest.Count == 0 {
return framework.NewStatus(framework.Success)
}
// 选择要分配的 GPU
selectedGPUs, err := g.selectGPUs(nodeName, gpuRequest)
if err != nil {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("failed to select GPUs: %v", err))
}
// 预留 GPU
podName := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
if err := g.stateManager.ReserveGPUs(nodeName, selectedGPUs, podName); err != nil {
return framework.NewStatus(framework.Unschedulable,
fmt.Sprintf("failed to reserve GPUs: %v", err))
}
// 将选择的 GPU 存入状态
// 注意:CycleState.Write 的值需实现 framework.StateData(带 Clone 方法),
// 实际实现中应使用包装类型承载 []int,这里为保持简洁直接示意
state.Write("SelectedGPUs", selectedGPUs)
return framework.NewStatus(framework.Success)
}
// Unreserve 取消预留
func (g *GPUSchedulerPlugin) Unreserve(ctx context.Context, state *framework.CycleState,
pod *v1.Pod, nodeName string) {
selectedGPUsData, err := state.Read("SelectedGPUs")
if err != nil {
return
}
selectedGPUs := selectedGPUsData.([]int)
g.stateManager.UnreserveGPUs(nodeName, selectedGPUs)
}
// selectGPUs 选择要分配的 GPU
func (g *GPUSchedulerPlugin) selectGPUs(nodeName string, request *GPURequest) ([]int, error) {
info, ok := g.stateManager.GetNodeGPUInfo(nodeName)
if !ok {
return nil, fmt.Errorf("node %s not found", nodeName)
}
availableGPUs := make([]int, 0)
for i, gpu := range info.GPUs {
if gpu.State == DeviceStateFree && gpu.Health == DeviceHealthy {
// 检查型号
if request.Model != "" && gpu.Model != request.Model {
continue
}
// 检查显存
if request.MinMemory > 0 && gpu.MemoryTotal < request.MinMemory {
continue
}
availableGPUs = append(availableGPUs, i)
}
}
count := int(request.Count)
// 根据拓扑约束选择
switch request.TopologyConstraint {
case "nvlink":
selected := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, count)
if selected != nil {
return selected, nil
}
return nil, fmt.Errorf("cannot find NVLINK connected GPUs")
case "same-numa":
selected := g.findSameNUMAGPUs(info.Topology, availableGPUs, count)
if selected != nil {
return selected, nil
}
return nil, fmt.Errorf("cannot find GPUs in same NUMA")
default:
// 优先选择 NVLINK 连接的 GPU
if request.PreferNVLink && info.Topology != nil {
selected := g.findNVLinkConnectedGPUs(info.Topology, availableGPUs, count)
if selected != nil {
return selected, nil
}
}
// 优先选择同一 NUMA 的 GPU
if request.PreferSameNUMA && info.Topology != nil {
selected := g.findSameNUMAGPUs(info.Topology, availableGPUs, count)
if selected != nil {
return selected, nil
}
}
// 默认:选择前 N 个
if len(availableGPUs) >= count {
return availableGPUs[:count], nil
}
return nil, fmt.Errorf("insufficient GPUs")
}
}
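Reserve 只是在调度器内部缓存中预留了设备,具体选中的 GPU 序号还需要传递给节点侧组件(如 device plugin 或容器启动钩子)才能真正生效。一种常见做法是在 PreBind 阶段把选中的 GPU 写入 Pod 注解,绑定动作本身仍交给默认的 DefaultBinder 完成。下面是一个示意实现,对应 4.1 节配置中启用的 preBind 扩展点;其中注解键 gpu.scheduler/assigned-gpus 为本文假设,需与节点侧组件约定一致,另需额外导入 k8s.io/apimachinery/pkg/types 与 metav1 包:
// PreBind 在绑定前把选中的 GPU 序号写入 Pod 注解(示意实现)
func (g *GPUSchedulerPlugin) PreBind(ctx context.Context, state *framework.CycleState,
	pod *v1.Pod, nodeName string) *framework.Status {
	selectedGPUsData, err := state.Read("SelectedGPUs")
	if err != nil {
		// 没有 GPU 选择结果,说明该 Pod 不需要 GPU
		return framework.NewStatus(framework.Success)
	}
	selectedGPUs := selectedGPUsData.([]int)
	// 序列化为 "0,1,3" 形式
	parts := make([]string, 0, len(selectedGPUs))
	for _, idx := range selectedGPUs {
		parts = append(parts, strconv.Itoa(idx))
	}
	// 以 merge patch 方式写入注解;注解键为本文约定的示例值
	patch := fmt.Sprintf(`{"metadata":{"annotations":{"gpu.scheduler/assigned-gpus":"%s"}}}`,
		strings.Join(parts, ","))
	_, err = g.handle.ClientSet().CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name,
		types.MergePatchType, []byte(patch), metav1.PatchOptions{})
	if err != nil {
		return framework.NewStatus(framework.Error, fmt.Sprintf("failed to patch pod: %v", err))
	}
	return framework.NewStatus(framework.Success)
}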
4. 调度器部署与配置
4.1 调度器配置文件
# scheduler-config.yaml
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: true
resourceNamespace: kube-system
resourceName: gpu-scheduler
clientConnection:
kubeconfig: /etc/kubernetes/scheduler.conf
profiles:
- schedulerName: gpu-scheduler
plugins:
# 在默认插件之后添加 GPU 插件
preFilter:
enabled:
- name: GPUScheduler
filter:
enabled:
- name: GPUScheduler
score:
enabled:
- name: GPUScheduler
weight: 10
reserve:
enabled:
- name: GPUScheduler
preBind:
enabled:
- name: GPUScheduler
pluginConfig:
- name: GPUScheduler
args:
# GPU 状态同步间隔
syncInterval: 30s
# 是否启用拓扑感知
topologyAware: true
# 碎片化策略
fragmentationPolicy: "minimize"
# 默认偏好
defaultPreferences:
preferNVLink: true
preferSameNUMA: true
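上面的配置假定 GPUScheduler 插件已经编译进自定义调度器二进制。基于 Scheduling Framework 的插件通常以 out-of-tree 方式注册到 kube-scheduler 的命令入口,下面是一个入口函数的示意;其中的模块路径 example.com/gpu-scheduler 为假设值,且 PluginFactory 的签名在较新的 Kubernetes 版本中增加了 context 参数,此处与上文 New 函数的签名保持一致:
// cmd/gpu-scheduler/main.go(示意)
package main

import (
	"os"

	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	// 假设上文插件代码位于本仓库的 pkg/gpuscheduler 包中(路径为示例)
	gpuscheduler "example.com/gpu-scheduler/pkg/gpuscheduler"
)

func main() {
	// 将 GPUScheduler 插件注册到调度器命令中,
	// 之后即可通过 --config 指定上述 KubeSchedulerConfiguration 启用它
	command := app.NewSchedulerCommand(
		app.WithPlugin(gpuscheduler.PluginName, gpuscheduler.New),
	)
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}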
4.2 部署 YAML
# gpu-scheduler-deployment.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: gpu-scheduler
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: gpu-scheduler
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "update", "patch"]
- apiGroups: [""]
resources: ["pods/binding"]
verbs: ["create"]
- apiGroups: [""]
resources: ["pods/status"]
verbs: ["update", "patch"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch", "update"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# GPU 相关 CRD
- apiGroups: ["gpu.resource.io"]
resources: ["nodegpuinfos", "gpuallocations"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: gpu-scheduler
subjects:
- kind: ServiceAccount
name: gpu-scheduler
namespace: kube-system
roleRef:
kind: ClusterRole
name: gpu-scheduler
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: gpu-scheduler
namespace: kube-system
labels:
component: gpu-scheduler
spec:
replicas: 2 # HA 部署
selector:
matchLabels:
component: gpu-scheduler
template:
metadata:
labels:
component: gpu-scheduler
spec:
serviceAccountName: gpu-scheduler
priorityClassName: system-cluster-critical
nodeSelector:
node-role.kubernetes.io/control-plane: ""
tolerations:
- key: node-role.kubernetes.io/control-plane
effect: NoSchedule
containers:
- name: gpu-scheduler
image: gpu-scheduler:v1.0.0
imagePullPolicy: IfNotPresent
command:
- /gpu-scheduler
- --config=/etc/kubernetes/scheduler-config.yaml
- --v=4
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
cpu: "1"
memory: 1Gi
livenessProbe:
httpGet:
path: /healthz
port: 10259
scheme: HTTPS
initialDelaySeconds: 15
periodSeconds: 10
readinessProbe:
httpGet:
path: /healthz
port: 10259
scheme: HTTPS
initialDelaySeconds: 5
periodSeconds: 10
volumeMounts:
- name: config
mountPath: /etc/kubernetes
readOnly: true
volumes:
- name: config
configMap:
name: gpu-scheduler-config
---
apiVersion: v1
kind: ConfigMap
metadata:
name: gpu-scheduler-config
namespace: kube-system
data:
scheduler-config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: true
resourceNamespace: kube-system
resourceName: gpu-scheduler
profiles:
- schedulerName: gpu-scheduler
plugins:
preFilter:
enabled:
- name: GPUScheduler
filter:
enabled:
- name: GPUScheduler
score:
enabled:
- name: GPUScheduler
weight: 10
reserve:
enabled:
- name: GPUScheduler
4.3 使用自定义调度器
# gpu-training-job.yaml
apiVersion: v1
kind: Pod
metadata:
name: gpu-training
namespace: ml-workloads
annotations:
# 指定 GPU 型号
gpu.scheduler/model: "A100-SXM4-80GB"
# 最小显存需求
gpu.scheduler/memory: "40000"
# 拓扑约束:需要 NVLINK 连接
gpu.scheduler/topology: "nvlink"
# 偏好同一 NUMA
gpu.scheduler/prefer-same-numa: "true"
spec:
# 使用自定义调度器
schedulerName: gpu-scheduler
containers:
- name: training
image: pytorch/pytorch:2.0-cuda12.1-cudnn8-runtime
command: ["python", "train.py"]
resources:
limits:
nvidia.com/gpu: 4
requests:
nvidia.com/gpu: 4
cpu: "8"
memory: 64Gi
env:
- name: NCCL_DEBUG
value: INFO
- name: NCCL_IB_DISABLE
value: "0"
volumeMounts:
- name: data
mountPath: /data
volumes:
- name: data
persistentVolumeClaim:
claimName: training-data
restartPolicy: Never
5. GPU 信息收集器
5.1 Node GPU Info CRD
# gpu-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: nodegpuinfos.gpu.resource.io
spec:
group: gpu.resource.io
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
nodeName:
type: string
driverVersion:
type: string
cudaVersion:
type: string
gpus:
type: array
items:
type: object
properties:
uuid:
type: string
index:
type: integer
model:
type: string
memoryTotal:
type: integer
computeCapability:
type: string
migEnabled:
type: boolean
pcieInfo:
type: object
properties:
busId:
type: string
numaNode:
type: integer
generation:
type: integer
linkWidth:
type: integer
nvlinkPeers:
type: array
items:
type: integer
topology:
type: object
properties:
links:
type: array
items:
type: array
items:
type: string
numaNodes:
type: array
items:
type: integer
status:
type: object
properties:
lastUpdated:
type: string
format: date-time
gpuStates:
type: array
items:
type: object
properties:
index:
type: integer
state:
type: string
health:
type: string
allocatedPod:
type: string
memoryUsed:
type: integer
utilization:
type: integer
scope: Cluster
names:
plural: nodegpuinfos
singular: nodegpuinfo
kind: NodeGPUInfo
shortNames:
- ngi
5.2 GPU 信息收集 DaemonSet
// cmd/gpu-collector/main.go
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/NVIDIA/go-nvml/pkg/nvml"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
// 注:NodeGPUInfo、GPUDevice、GPUTopology 等类型复用 2.2 节 gpuscheduler 包中的定义,
// 此处假定收集器与调度器共享同一模块或已导入对应包。
type GPUCollector struct {
nodeName string
kubeClient kubernetes.Interface
interval time.Duration
}
func NewGPUCollector() (*GPUCollector, error) {
// 获取节点名
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return nil, fmt.Errorf("NODE_NAME env not set")
}
// 创建 k8s 客户端
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get k8s config: %v", err)
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create k8s client: %v", err)
}
return &GPUCollector{
nodeName: nodeName,
kubeClient: client,
interval: 30 * time.Second,
}, nil
}
func (c *GPUCollector) Run(ctx context.Context) error {
// 初始化 NVML
ret := nvml.Init()
if ret != nvml.SUCCESS {
return fmt.Errorf("failed to initialize NVML: %v", nvml.ErrorString(ret))
}
defer nvml.Shutdown()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := c.collectAndUpdate(); err != nil {
fmt.Printf("Failed to collect GPU info: %v\n", err)
}
}
}
}
func (c *GPUCollector) collectAndUpdate() error {
info, err := c.collectGPUInfo()
if err != nil {
return err
}
return c.updateNodeGPUInfo(info)
}
func (c *GPUCollector) collectGPUInfo() (*NodeGPUInfo, error) {
// 获取驱动版本
driverVersion, ret := nvml.SystemGetDriverVersion()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("failed to get driver version: %v", nvml.ErrorString(ret))
}
// 获取 CUDA 版本
cudaVersion, ret := nvml.SystemGetCudaDriverVersion_v2()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("failed to get CUDA version: %v", nvml.ErrorString(ret))
}
// 获取 GPU 数量
count, ret := nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("failed to get device count: %v", nvml.ErrorString(ret))
}
info := &NodeGPUInfo{
NodeName: c.nodeName,
DriverVersion: driverVersion,
CUDAVersion: fmt.Sprintf("%d.%d", cudaVersion/1000, (cudaVersion%1000)/10),
GPUs: make([]GPUDevice, count),
}
// 收集每个 GPU 的信息
for i := 0; i < count; i++ {
device, ret := nvml.DeviceGetHandleByIndex(i)
if ret != nvml.SUCCESS {
continue
}
info.GPUs[i] = c.collectDeviceInfo(device, i)
}
// 收集拓扑信息
info.Topology = c.collectTopology(count)
return info, nil
}
func (c *GPUCollector) collectDeviceInfo(device nvml.Device, index int) GPUDevice {
gpu := GPUDevice{Index: index}
// UUID
uuid, ret := device.GetUUID()
if ret == nvml.SUCCESS {
gpu.UUID = uuid
}
// 型号
name, ret := device.GetName()
if ret == nvml.SUCCESS {
gpu.Model = name
}
// 显存
memory, ret := device.GetMemoryInfo()
if ret == nvml.SUCCESS {
gpu.MemoryTotal = int64(memory.Total / 1024 / 1024) // MB
gpu.MemoryFree = int64(memory.Free / 1024 / 1024)
}
// 计算能力
major, minor, ret := device.GetCudaComputeCapability()
if ret == nvml.SUCCESS {
gpu.ComputeCapability = fmt.Sprintf("%d.%d", major, minor)
}
// MIG 状态
mode, _, ret := device.GetMigMode()
if ret == nvml.SUCCESS {
gpu.MIGEnabled = mode == nvml.DEVICE_MIG_ENABLE
}
// 如果 MIG 启用,收集 MIG 设备
if gpu.MIGEnabled {
gpu.MIGDevices = c.collectMIGDevices(device)
}
// PCIe 信息
pciInfo, ret := device.GetPciInfo()
if ret == nvml.SUCCESS {
gpu.PCIeInfo = PCIeInfo{
BusID: fmt.Sprintf("%04x:%02x:%02x.0",
pciInfo.Domain, pciInfo.Bus, pciInfo.Device),
}
}
// NUMA 节点
// 注意:需要通过系统文件获取
numaNode, err := c.getNUMANode(gpu.PCIeInfo.BusID)
if err == nil {
gpu.PCIeInfo.NUMANode = numaNode
}
// 健康状态
gpu.Health = DeviceHealthy
return gpu
}
func (c *GPUCollector) collectMIGDevices(device nvml.Device) []MIGDevice {
var migDevices []MIGDevice
// 获取最大 GPU 实例数
maxGI, ret := device.GetMaxMigDeviceCount()
if ret != nvml.SUCCESS || maxGI == 0 {
return migDevices
}
// 枚举 MIG 设备
for i := 0; i < maxGI; i++ {
migDevice, ret := device.GetMigDeviceHandleByIndex(i)
if ret != nvml.SUCCESS {
continue
}
mig := MIGDevice{
GIIndex: i,
}
uuid, ret := migDevice.GetUUID()
if ret == nvml.SUCCESS {
mig.UUID = uuid
}
// 获取 profile 信息
attr, ret := migDevice.GetAttributes()
if ret == nvml.SUCCESS {
mig.Profile = fmt.Sprintf("%dg.%dgb",
attr.GpuInstanceSliceCount,
attr.MemorySizeMB/1024)
}
mig.State = DeviceStateFree
migDevices = append(migDevices, mig)
}
return migDevices
}
func (c *GPUCollector) collectTopology(gpuCount int) *GPUTopology {
topo := &GPUTopology{
NodeName: c.nodeName,
Links: make([][]LinkType, gpuCount),
NUMANodes: make([]int, gpuCount),
}
for i := 0; i < gpuCount; i++ {
topo.Links[i] = make([]LinkType, gpuCount)
device1, ret := nvml.DeviceGetHandleByIndex(i)
if ret != nvml.SUCCESS {
continue
}
for j := 0; j < gpuCount; j++ {
if i == j {
topo.Links[i][j] = LinkTypeSameBoard
continue
}
device2, ret := nvml.DeviceGetHandleByIndex(j)
if ret != nvml.SUCCESS {
topo.Links[i][j] = LinkTypeNone
continue
}
// 获取 GPU 间的拓扑关系
pathInfo, ret := nvml.DeviceGetTopologyCommonAncestor(device1, device2)
if ret != nvml.SUCCESS {
topo.Links[i][j] = LinkTypeNone
continue
}
switch pathInfo {
case nvml.TOPOLOGY_INTERNAL:
topo.Links[i][j] = LinkTypeSameBoard
case nvml.TOPOLOGY_SINGLE:
// 简化处理:直接以对端 GPU 序号近似 NVLink 链路编号来探测能力;
// 严格实现应遍历 0~NVLINK_MAX_LINKS-1 的链路,并通过
// GetNvLinkRemotePciInfo 确认对端设备后再判定连接类型
nvlinkCap, ret := device1.GetNvLinkCapability(j, nvml.NVLINK_CAP_P2P_SUPPORTED)
if ret == nvml.SUCCESS && nvlinkCap > 0 {
topo.Links[i][j] = LinkTypeNVLink
} else {
topo.Links[i][j] = LinkTypePCIe
}
case nvml.TOPOLOGY_MULTIPLE:
topo.Links[i][j] = LinkTypePCIe
case nvml.TOPOLOGY_HOSTBRIDGE:
topo.Links[i][j] = LinkTypePCIe
case nvml.TOPOLOGY_NODE:
topo.Links[i][j] = LinkTypePCIe
case nvml.TOPOLOGY_SYSTEM:
topo.Links[i][j] = LinkTypePCIe
default:
topo.Links[i][j] = LinkTypeNone
}
}
}
return topo
}
func (c *GPUCollector) getNUMANode(busID string) (int, error) {
// 读取 /sys/bus/pci/devices/{busID}/numa_node
path := fmt.Sprintf("/sys/bus/pci/devices/%s/numa_node", busID)
data, err := os.ReadFile(path)
if err != nil {
return 0, err
}
var numa int
fmt.Sscanf(string(data), "%d", &numa)
return numa, nil
}
func (c *GPUCollector) updateNodeGPUInfo(info *NodeGPUInfo) error {
// 这里应该更新 CRD,简化示例使用 ConfigMap
data, err := json.Marshal(info)
if err != nil {
return err
}
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("gpu-info-%s", c.nodeName),
Namespace: "kube-system",
Labels: map[string]string{
"app": "gpu-collector",
"node": c.nodeName,
},
},
Data: map[string]string{
"gpu-info": string(data),
},
}
_, err = c.kubeClient.CoreV1().ConfigMaps("kube-system").Update(
context.Background(), cm, metav1.UpdateOptions{})
if err != nil {
// 如果不存在则创建
_, err = c.kubeClient.CoreV1().ConfigMaps("kube-system").Create(
context.Background(), cm, metav1.CreateOptions{})
}
return err
}
func main() {
collector, err := NewGPUCollector()
if err != nil {
fmt.Printf("Failed to create collector: %v\n", err)
os.Exit(1)
}
ctx := context.Background()
if err := collector.Run(ctx); err != nil {
fmt.Printf("Collector error: %v\n", err)
os.Exit(1)
}
}
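collectAndUpdate 为了简化直接写入 ConfigMap;若按 5.1 节的 CRD 存储,可以改用 dynamic client 写入 NodeGPUInfo 对象。下面是一个示意实现,GVR 与 5.1 节的 CRD 定义对应,错误处理从简,另需导入 k8s.io/client-go/dynamic、k8s.io/apimachinery/pkg/runtime、k8s.io/apimachinery/pkg/runtime/schema、k8s.io/apimachinery/pkg/apis/meta/v1/unstructured 与 apierrors "k8s.io/apimachinery/pkg/api/errors":
// 示意:用 dynamic client 将采集结果写入 NodeGPUInfo CRD(Cluster 级资源)
func (c *GPUCollector) updateNodeGPUInfoCRD(dyn dynamic.Interface, info *NodeGPUInfo) error {
	gvr := schema.GroupVersionResource{
		Group: "gpu.resource.io", Version: "v1alpha1", Resource: "nodegpuinfos",
	}
	// 将采集结构转换为 unstructured;CRD 为结构化 schema,未声明的字段会被服务端裁剪
	specMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(info)
	if err != nil {
		return err
	}
	obj := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "gpu.resource.io/v1alpha1",
		"kind":       "NodeGPUInfo",
		"metadata":   map[string]interface{}{"name": c.nodeName},
		"spec":       specMap,
	}}
	// 先尝试创建;对象已存在时改为带 resourceVersion 的更新
	_, err = dyn.Resource(gvr).Create(context.Background(), obj, metav1.CreateOptions{})
	if apierrors.IsAlreadyExists(err) {
		existing, getErr := dyn.Resource(gvr).Get(context.Background(), c.nodeName, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}
		obj.SetResourceVersion(existing.GetResourceVersion())
		_, err = dyn.Resource(gvr).Update(context.Background(), obj, metav1.UpdateOptions{})
	}
	return err
}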
5.3 部署收集器
# gpu-collector-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: gpu-collector
namespace: kube-system
spec:
selector:
matchLabels:
app: gpu-collector
template:
metadata:
labels:
app: gpu-collector
spec:
serviceAccountName: gpu-collector
nodeSelector:
nvidia.com/gpu.present: "true"
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
containers:
- name: collector
image: gpu-collector:v1.0.0
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
securityContext:
privileged: true
volumeMounts:
- name: device-plugins
mountPath: /var/lib/kubelet/device-plugins
- name: sys
mountPath: /sys
readOnly: true
resources:
requests:
cpu: 50m
memory: 64Mi
limits:
cpu: 200m
memory: 256Mi
volumes:
- name: device-plugins
hostPath:
path: /var/lib/kubelet/device-plugins
- name: sys
hostPath:
path: /sys
6. 调度器扩展:Scheduler Extender
除了使用 Scheduling Framework,还可以通过 Scheduler Extender 以 HTTP Webhook 的形式在调度器外部进行扩展。Extender 无需重新编译调度器,但每次调度都要经过一次网络往返,延迟和吞吐不如原生插件,更适合快速集成或无法替换调度器二进制的场景。
6.1 Extender 架构
┌──────────────────────────────────────────────────────────────────────┐
│ Scheduler Extender Architecture │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────────────┐ │
│ │ │ HTTP Webhook │ │ │
│ │ kube-scheduler │◄──────────────────▶│ GPU Scheduler │ │
│ │ │ │ Extender │ │
│ │ (default) │ │ │ │
│ └────────┬────────┘ │ - Filter │ │
│ │ │ - Prioritize │ │
│ │ │ - Bind │ │
│ │ │ │ │
│ ▼ └───────────┬─────────────┘ │
│ ┌─────────────────┐ │ │
│ │ │ │ │
│ │ Pod Queue │ ▼ │
│ │ │ ┌─────────────────────────┐ │
│ └─────────────────┘ │ GPU State Store │ │
│ │ │ │
│ │ - Node GPU Info │ │
│ │ - Allocation Cache │ │
│ │ - Topology Data │ │
│ └─────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────┘
6.2 Extender 实现
// pkg/extender/server.go
package extender
import (
	"encoding/json"
	"fmt"
	"net/http"

	v1 "k8s.io/api/core/v1"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
)
type GPUExtender struct {
stateManager *GPUStateManager
}
func NewGPUExtender(stateManager *GPUStateManager) *GPUExtender {
return &GPUExtender{stateManager: stateManager}
}
// Filter 过滤节点
func (e *GPUExtender) Filter(w http.ResponseWriter, r *http.Request) {
var args extenderv1.ExtenderArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
result := &extenderv1.ExtenderFilterResult{
Nodes: &v1.NodeList{},
FailedNodes: make(extenderv1.FailedNodesMap),
}
// 解析 GPU 请求
gpuRequest := parseGPURequestFromPod(args.Pod)
if gpuRequest.Count == 0 {
// 不需要 GPU,所有节点都通过
result.Nodes = args.Nodes
writeResponse(w, result)
return
}
// 过滤节点
for _, node := range args.Nodes.Items {
if err := e.checkNode(&node, gpuRequest); err != nil {
result.FailedNodes[node.Name] = err.Error()
} else {
result.Nodes.Items = append(result.Nodes.Items, node)
}
}
writeResponse(w, result)
}
// Prioritize 节点打分
func (e *GPUExtender) Prioritize(w http.ResponseWriter, r *http.Request) {
var args extenderv1.ExtenderArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
result := make(extenderv1.HostPriorityList, len(args.Nodes.Items))
gpuRequest := parseGPURequestFromPod(args.Pod)
for i, node := range args.Nodes.Items {
score := e.scoreNode(&node, gpuRequest)
result[i] = extenderv1.HostPriority{
Host: node.Name,
Score: score,
}
}
writeResponse(w, result)
}
// Bind 绑定 Pod
func (e *GPUExtender) Bind(w http.ResponseWriter, r *http.Request) {
var args extenderv1.ExtenderBindingArgs
if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
result := &extenderv1.ExtenderBindingResult{}
// 分配具体的 GPU 设备
if err := e.allocateGPUs(args.PodName, args.PodNamespace, args.Node); err != nil {
result.Error = err.Error()
writeResponse(w, result)
return
}
// 执行绑定
if err := e.bindPod(args.PodName, args.PodNamespace, args.Node); err != nil {
result.Error = err.Error()
}
writeResponse(w, result)
}
func (e *GPUExtender) checkNode(node *v1.Node, request *GPURequest) error {
	// 仅确认节点存在 GPU 信息;具体设备列表通过 GetAvailableGPUs 获取
	if _, ok := e.stateManager.GetNodeGPUInfo(node.Name); !ok {
		return fmt.Errorf("no GPU info for node %s", node.Name)
	}
available := e.stateManager.GetAvailableGPUs(node.Name)
if int64(len(available)) < request.Count {
return fmt.Errorf("insufficient GPUs: need %d, have %d",
request.Count, len(available))
}
// 检查型号
if request.Model != "" {
matchCount := 0
for _, gpu := range available {
if gpu.Model == request.Model {
matchCount++
}
}
if int64(matchCount) < request.Count {
return fmt.Errorf("insufficient GPUs with model %s", request.Model)
}
}
return nil
}
func (e *GPUExtender) scoreNode(node *v1.Node, request *GPURequest) int64 {
if request.Count == 0 {
return 50
}
nodeInfo, ok := e.stateManager.GetNodeGPUInfo(node.Name)
if !ok {
return 0
}
score := int64(50)
// 根据可用 GPU 数量调整分数
available := e.stateManager.GetAvailableGPUs(node.Name)
totalGPUs := len(nodeInfo.GPUs)
// 避免碎片化
remaining := int64(len(available)) - request.Count
if remaining == 0 {
score += 30 // 完美匹配
} else if remaining >= int64(totalGPUs)/2 {
score += 20
} else if remaining == 1 {
score -= 10 // 留下 1 个难利用
}
return score
}
func writeResponse(w http.ResponseWriter, result interface{}) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
}
// StartServer 启动 HTTP 服务
func (e *GPUExtender) StartServer(addr string) error {
http.HandleFunc("/filter", e.Filter)
http.HandleFunc("/prioritize", e.Prioritize)
http.HandleFunc("/bind", e.Bind)
return http.ListenAndServe(addr, nil)
}
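上面多处用到的 parseGPURequestFromPod 并未给出,其逻辑与 3.1 节插件中的 parseGPURequest 基本一致。下面是一个精简示意(只处理 GPU 数量与型号注解,完整解析请参考 3.1 节):
// parseGPURequestFromPod 从 Pod 中解析 GPU 请求(精简示意)
func parseGPURequestFromPod(pod *v1.Pod) *GPURequest {
	request := &GPURequest{}
	if pod == nil {
		return request
	}
	// 累加各容器声明的 nvidia.com/gpu 请求
	for _, container := range pod.Spec.Containers {
		if qty, ok := container.Resources.Requests["nvidia.com/gpu"]; ok {
			request.Count += qty.Value()
		}
	}
	// 从注解中读取型号要求
	if model, ok := pod.Annotations["gpu.scheduler/model"]; ok {
		request.Model = model
	}
	return request
}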
6.3 Extender 配置
注意:下面基于 Policy API 的配置方式只适用于较老的 Kubernetes 版本(Policy API 已在 1.23 移除),新版本应改用 KubeSchedulerConfiguration 中的 extenders 字段,字段含义与此处基本一致。
# scheduler-policy.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: scheduler-policy
namespace: kube-system
data:
policy.cfg: |
{
"kind": "Policy",
"apiVersion": "v1",
"extenders": [
{
"urlPrefix": "http://gpu-scheduler-extender.kube-system.svc:8888",
"filterVerb": "filter",
"prioritizeVerb": "prioritize",
"bindVerb": "bind",
"weight": 10,
"enableHttps": false,
"nodeCacheCapable": false,
"managedResources": [
{
"name": "nvidia.com/gpu",
"ignoredByScheduler": false
}
],
"ignorable": false
}
]
}
7. 调度器监控与调试
7.1 调度指标
// pkg/metrics/metrics.go
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// 调度延迟
SchedulingLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "gpu_scheduler_scheduling_duration_seconds",
Help: "GPU scheduling latency in seconds",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
},
[]string{"result", "gpu_model"},
)
// 调度结果
SchedulingAttempts = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "gpu_scheduler_scheduling_attempts_total",
Help: "Number of scheduling attempts",
},
[]string{"result", "failure_reason"},
)
// GPU 分配
GPUAllocations = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gpu_scheduler_gpu_allocations",
Help: "Number of GPU allocations per node",
},
[]string{"node", "gpu_model"},
)
// GPU 可用数量
GPUAvailable = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gpu_scheduler_gpu_available",
Help: "Number of available GPUs per node",
},
[]string{"node", "gpu_model"},
)
// 拓扑感知调度
TopologyAwareSchedules = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "gpu_scheduler_topology_aware_schedules_total",
Help: "Number of topology-aware scheduling decisions",
},
[]string{"constraint_type", "satisfied"},
)
// 碎片化指标
GPUFragmentation = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gpu_scheduler_fragmentation_ratio",
Help: "GPU fragmentation ratio per node (0-1)",
},
[]string{"node"},
)
)
// RecordSchedulingDuration 记录调度耗时
func RecordSchedulingDuration(duration float64, result, gpuModel string) {
SchedulingLatency.WithLabelValues(result, gpuModel).Observe(duration)
}
// RecordSchedulingAttempt 记录调度尝试
func RecordSchedulingAttempt(result, failureReason string) {
SchedulingAttempts.WithLabelValues(result, failureReason).Inc()
}
// UpdateGPUMetrics 更新 GPU 指标
func UpdateGPUMetrics(nodeInfo *NodeGPUInfo) {
allocatedByModel := make(map[string]int)
availableByModel := make(map[string]int)
for _, gpu := range nodeInfo.GPUs {
if gpu.State == DeviceStateFree {
availableByModel[gpu.Model]++
} else {
allocatedByModel[gpu.Model]++
}
}
for model, count := range allocatedByModel {
GPUAllocations.WithLabelValues(nodeInfo.NodeName, model).Set(float64(count))
}
for model, count := range availableByModel {
GPUAvailable.WithLabelValues(nodeInfo.NodeName, model).Set(float64(count))
}
// 计算碎片化比例
fragRatio := calculateFragmentation(nodeInfo)
GPUFragmentation.WithLabelValues(nodeInfo.NodeName).Set(fragRatio)
}
// calculateFragmentation 计算碎片化比例
func calculateFragmentation(info *NodeGPUInfo) float64 {
if len(info.GPUs) == 0 {
return 0
}
// 简单算法:如果剩余 GPU 无法满足常见请求(2、4、8),则视为碎片
freeCount := 0
for _, gpu := range info.GPUs {
if gpu.State == DeviceStateFree {
freeCount++
}
}
// 如果剩余 1 或 3 个 GPU,碎片化较高
if freeCount == 1 || freeCount == 3 {
return 0.7
}
if freeCount == 5 || freeCount == 6 || freeCount == 7 {
return 0.3
}
return 0.1
}
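这些指标需要通过 HTTP 暴露给 Prometheus 抓取。由于使用的是 promauto 的默认注册表,只需在调度器(或独立的指标服务)里挂载 promhttp 处理器即可。下面是一个示意,端口 8080 与 7.3 节的调试命令保持一致,其中 metrics 包的导入路径为假设值:
// 指标暴露入口示意:promauto 默认注册到全局 Registry,直接用 promhttp 暴露即可
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	// 空导入以触发上文 metrics 包中指标的注册(模块路径为示例)
	_ "example.com/gpu-scheduler/pkg/metrics"
)

func main() {
	http.Handle("/metrics", promhttp.Handler())
	// 端口与 7.3 节 port-forward 示例中的 8080 对应
	log.Fatal(http.ListenAndServe(":8080", nil))
}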
7.2 Grafana Dashboard
{
"dashboard": {
"title": "GPU Scheduler Dashboard",
"panels": [
{
"title": "Scheduling Latency",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.99, sum(rate(gpu_scheduler_scheduling_duration_seconds_bucket[5m])) by (le, result))",
"legendFormat": "p99 - {{result}}"
},
{
"expr": "histogram_quantile(0.50, sum(rate(gpu_scheduler_scheduling_duration_seconds_bucket[5m])) by (le, result))",
"legendFormat": "p50 - {{result}}"
}
]
},
{
"title": "Scheduling Success Rate",
"type": "stat",
"targets": [
{
"expr": "sum(rate(gpu_scheduler_scheduling_attempts_total{result=\"success\"}[5m])) / sum(rate(gpu_scheduler_scheduling_attempts_total[5m])) * 100",
"legendFormat": "Success Rate %"
}
]
},
{
"title": "GPU Availability by Node",
"type": "table",
"targets": [
{
"expr": "gpu_scheduler_gpu_available",
"format": "table"
}
]
},
{
"title": "GPU Fragmentation",
"type": "gauge",
"targets": [
{
"expr": "avg(gpu_scheduler_fragmentation_ratio)",
"legendFormat": "Avg Fragmentation"
}
],
"options": {
"maxValue": 1,
"thresholds": [
{"value": 0, "color": "green"},
{"value": 0.3, "color": "yellow"},
{"value": 0.6, "color": "red"}
]
}
},
{
"title": "Topology-Aware Schedules",
"type": "piechart",
"targets": [
{
"expr": "sum(gpu_scheduler_topology_aware_schedules_total) by (constraint_type, satisfied)",
"legendFormat": "{{constraint_type}} - {{satisfied}}"
}
]
}
]
}
}
7.3 调试命令
# 查看调度器日志
kubectl logs -n kube-system -l component=gpu-scheduler -f
# 查看 Pod 调度事件
kubectl describe pod <pod-name> | grep -A 20 Events
# 检查节点 GPU 信息
kubectl get configmap -n kube-system gpu-info-<node-name> -o jsonpath='{.data.gpu-info}' | jq .
# 模拟调度(dry-run)
kubectl run gpu-test --image=nvidia/cuda:12.1-runtime --dry-run=client \
--overrides='{"spec":{"schedulerName":"gpu-scheduler","containers":[{"name":"test","resources":{"limits":{"nvidia.com/gpu":"2"}}}]}}' -o yaml
# 查看调度器指标
kubectl port-forward -n kube-system svc/gpu-scheduler-metrics 8080:8080
curl http://localhost:8080/metrics | grep gpu_scheduler
8. 最佳实践
8.1 调度策略选择
| 场景 | 推荐策略 | 说明 |
|---|---|---|
| 大规模训练 | NVLINK 优先 | 减少通信开销 |
| 推理服务 | 碎片化最小 | 提高资源利用率 |
| 混合负载 | 均衡策略 | 兼顾各类任务 |
| MIG 场景 | Profile 匹配 | 精确资源分配 |
8.2 性能优化建议
- 缓存 GPU 状态:避免频繁查询 NVML
- 批量处理:合并多个调度请求
- 异步更新:状态收集与调度解耦(见本节末尾的示意代码)
- 预热缓存:启动时预加载节点信息
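针对"异步更新"一条,2.2 节的 GPUStateManager 已经预留了 updateChan 与 stopChan 字段但未展示消费逻辑,下面是一个可行的后台更新循环示意(非正式实现):
// Run 启动后台更新循环:收集端把最新节点信息写入 updateChan,
// 调度插件读取的始终是缓存,二者互不阻塞(示意实现)
func (m *GPUStateManager) Run() {
	go func() {
		for {
			select {
			case info := <-m.updateChan:
				m.UpdateNodeGPUInfo(info)
			case <-m.stopChan:
				return
			}
		}
	}()
}

// Stop 停止后台更新循环
func (m *GPUStateManager) Stop() {
	close(m.stopChan)
}

// Enqueue 供收集端异步提交最新的节点 GPU 信息
func (m *GPUStateManager) Enqueue(info *NodeGPUInfo) {
	select {
	case m.updateChan <- info:
	default:
		// 队列已满时丢弃本次更新,等待下一个采集周期
	}
}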
8.3 故障处理
# Pod 配置示例:处理调度失败
apiVersion: v1
kind: Pod
metadata:
name: gpu-job
spec:
schedulerName: gpu-scheduler
# 容忍不可调度污点(按需使用,并非调度失败重试机制)
tolerations:
- key: "node.kubernetes.io/unschedulable"
operator: "Exists"
effect: "NoSchedule"
# 优先级(紧急任务优先调度)
priorityClassName: high-priority
containers:
- name: main
resources:
limits:
nvidia.com/gpu: 2
# 容器失败后的重启策略(调度失败时 Pod 保持 Pending,由调度器持续重试)
restartPolicy: OnFailure
总结
本章深入讲解了 Kubernetes GPU 调度器的实现原理:
- 调度框架:理解 Scheduling Framework 的扩展点机制
- GPU 状态管理:设计数据结构存储 GPU 拓扑和状态
- 调度插件:实现 PreFilter、Filter、Score、Reserve 等插件
- 拓扑感知:支持 NVLINK、NUMA 等拓扑约束
- Extender 模式:通过 Webhook 扩展调度能力
- 监控调试:Prometheus 指标和 Grafana 可视化
下一章我们将深入探讨 拓扑感知调度,详细讲解如何利用 GPU 拓扑信息优化分布式训练性能。