Device Plugin 机制深度解析
从源码层面理解 Kubernetes 如何管理 GPU 等扩展资源
本章目标
- 深入理解 Kubernetes Extended Resources 机制
- 掌握 Device Plugin 架构和工作原理
- 分析 NVIDIA Device Plugin 源码实现
- 学会开发自定义 Device Plugin
1. Kubernetes 资源模型
1.1 原生资源 vs 扩展资源
┌─────────────────────────────────────────────────────────────────────┐
│ Kubernetes 资源模型 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 原生资源 (Native Resources) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ cpu │ │
│ │ ├── 单位: 核心数 (可以是小数,如 0.5, 100m) │ │
│ │ ├── 调度器理解其含义 │ │
│ │ └── kubelet 通过 cgroups 强制执行限制 │ │
│ │ │ │
│ │ memory │ │
│ │ ├── 单位: 字节 (Ki, Mi, Gi) │ │
│ │ ├── 调度器理解其含义 │ │
│ │ └── kubelet 通过 cgroups 强制执行限制 │ │
│ │ │ │
│ │ ephemeral-storage │ │
│ │ ├── 单位: 字节 │ │
│ │ └── 临时存储配额 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 扩展资源 (Extended Resources) │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 特点: │ │
│ │ ├── 必须是整数 (不能是小数) │ │
│ │ ├── 调度器只做数量匹配,不理解资源含义 │ │
│ │ ├── kubelet 不强制执行限制 (需要外部机制) │ │
│ │ └── 通过 Device Plugin 或 API 上报 │ │
│ │ │ │
│ │ 示例: │ │
│ │ ├── nvidia.com/gpu: 2 # NVIDIA GPU │ │
│ │ ├── amd.com/gpu: 1 # AMD GPU │ │
│ │ ├── nvidia.com/mig-3g.40gb: 1 # MIG 实例 │ │
│ │ ├── example.com/fpga: 1 # FPGA │ │
│ │ ├── example.com/rdma: 1 # RDMA 设备 │ │
│ │ └── hugepages-2Mi: 1000 # 大页内存 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ 扩展资源的限制: │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 1. 必须是整数 │ │
│ │ ✓ nvidia.com/gpu: 2 │ │
│ │ ✗ nvidia.com/gpu: 0.5 (不允许) │ │
│ │ │ │
│ │ 2. requests 必须等于 limits │ │
│ │ 如果只设置了一个,会自动填充另一个 │ │
│ │ │ │
│ │ 3. 不能超额分配 (不能 overcommit) │ │
│ │ 不像 CPU/内存可以设置 limits > requests │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 资源上报方式
# 方式 1: 通过 API 直接上报 (适合节点级资源)
# PATCH /api/v1/nodes/<node-name>/status
# 使用 kubectl patch 上报自定义资源
kubectl patch node <node-name> --type merge -p '
{
"status": {
"capacity": {
"example.com/custom-resource": "10"
}
}
}'
# 方式 2: 通过 Device Plugin (推荐方式)
# Device Plugin 会自动向 kubelet 注册并上报资源
# kubelet 再更新 Node status
# 查看节点的扩展资源
kubectl get node <node-name> -o jsonpath='{.status.capacity}'
# 输出示例:
# {
# "cpu": "64",
# "memory": "256Gi",
# "nvidia.com/gpu": "8",
# "nvidia.com/mig-3g.40gb": "2"
# }
2. Device Plugin 架构
2.1 整体架构
┌─────────────────────────────────────────────────────────────────────────────┐
│ Device Plugin 架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Kubernetes 控制面 │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ kube-scheduler │ │ kube-apiserver │ │ │
│ │ │ │ │ │ │ │
│ │ │ 资源匹配调度 │◄──────►│ Node Status │ │ │
│ │ │ (nvidia.com/ │ │ Capacity │ │ │
│ │ │ gpu: 2) │ │ Allocatable │ │ │
│ │ └─────────────────┘ └────────┬────────┘ │ │
│ │ │ │ │
│ └──────────────────────────────────────┼──────────────────────────────┘ │
│ │ watch/update │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ kubelet │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ │ Device Manager │ │ │
│ │ │ │ │ │
│ │ │ 职责: │ │ │
│ │ │ 1. 监听 Device Plugin 注册 │ │ │
│ │ │ 2. 调用 Device Plugin 获取设备列表 │ │ │
│ │ │ 3. 向 API Server 上报 Capacity/Allocatable │ │ │
│ │ │ 4. Pod 创建时分配设备 │ │ │
│ │ │ 5. 保存分配状态 (checkpoint) │ │ │
│ │ │ │ │ │
│ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Device Plugin 注册表 │ │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ nvidia.com/gpu → socket: /var/lib/kubelet/... │ │ │ │
│ │ │ │ amd.com/gpu → socket: /var/lib/kubelet/... │ │ │ │
│ │ │ │ example.com/fpga → socket: /var/lib/kubelet/... │ │ │ │
│ │ │ └──────────────────────────────────────────────────────┘ │ │ │
│ │ └───────────────────────────┬─────────────────────────────────┘ │ │
│ │ │ gRPC │ │
│ └──────────────────────────────┼──────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────┼───────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ NVIDIA Device │ │ AMD Device │ │ Custom Device │ │
│ │ Plugin │ │ Plugin │ │ Plugin │ │
│ │ │ │ │ │ │ │
│ │ - 发现 GPU │ │ - 发现 GPU │ │ - 发现设备 │ │
│ │ - 上报数量 │ │ - 上报数量 │ │ - 上报数量 │ │
│ │ - 分配设备 │ │ - 分配设备 │ │ - 分配设备 │ │
│ │ - 返回挂载信息 │ │ - 返回挂载信息 │ │ - 返回挂载信息 │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ NVIDIA GPU │ │ AMD GPU │ │ Custom Device │ │
│ │ /dev/nvidia0 │ │ /dev/dri/... │ │ /dev/xxx │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
2.2 gRPC 接口定义
// Device Plugin API v1beta1
// 位置: k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/api.proto
syntax = "proto3";
package v1beta1;
// Registration 服务 - Device Plugin 向 kubelet 注册
service Registration {
// Device Plugin 启动时调用此接口向 kubelet 注册
rpc Register(RegisterRequest) returns (Empty) {}
}
message RegisterRequest {
// API 版本,当前是 v1beta1
string version = 1;
// Unix socket 端点,kubelet 通过此 socket 与 Device Plugin 通信
string endpoint = 2;
// 资源名称,如 "nvidia.com/gpu"
string resource_name = 3;
// Device Plugin 选项
DevicePluginOptions options = 4;
}
message DevicePluginOptions {
// 是否需要在 Pod 创建前调用 PreStartContainer
bool pre_start_required = 1;
// 是否需要在 Pod 调度后调用 GetPreferredAllocation
bool get_preferred_allocation_available = 2;
}
// DevicePlugin 服务 - kubelet 调用 Device Plugin 的接口
service DevicePlugin {
// 获取 Device Plugin 支持的选项
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
// 流式接口:Device Plugin 持续向 kubelet 报告设备列表
// 设备状态变化时推送更新
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
// 为 Pod 分配设备
// kubelet 在创建容器前调用
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
// 获取首选的设备分配方案
// 用于拓扑感知调度
rpc GetPreferredAllocation(PreferredAllocationRequest)
returns (PreferredAllocationResponse) {}
// 容器启动前的钩子
rpc PreStartContainer(PreStartContainerRequest)
returns (PreStartContainerResponse) {}
}
// 设备信息
message Device {
// 设备 ID,如 "GPU-uuid-xxxx" 或 "0"
string ID = 1;
// 健康状态
string health = 2; // "Healthy" 或 "Unhealthy"
// 设备拓扑信息 (可选)
TopologyInfo topology = 3;
}
message TopologyInfo {
repeated NUMANode nodes = 1;
}
message NUMANode {
int64 ID = 1;
}
// ListAndWatch 响应
message ListAndWatchResponse {
repeated Device devices = 1;
}
// Allocate 请求
message AllocateRequest {
repeated ContainerAllocateRequest container_requests = 1;
}
message ContainerAllocateRequest {
// 请求分配的设备 ID 列表
repeated string devicesIDs = 1;
}
// Allocate 响应
message AllocateResponse {
repeated ContainerAllocateResponse container_responses = 1;
}
message ContainerAllocateResponse {
// 环境变量
map<string, string> envs = 1;
// 挂载点
repeated Mount mounts = 2;
// 设备节点
repeated DeviceSpec devices = 3;
// 容器注解
map<string, string> annotations = 4;
}
message Mount {
string container_path = 1;
string host_path = 2;
bool read_only = 3;
}
message DeviceSpec {
string container_path = 1;
string host_path = 2;
string permissions = 3; // "r", "w", "m" 的组合
}
2.3 Device Plugin 生命周期
┌─────────────────────────────────────────────────────────────────────┐
│ Device Plugin 生命周期 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 阶段 1: 注册 │
│ ─────────────────────────────────────────────────────────────── │
│ │
│ Device Plugin kubelet │
│ │ │ │
│ │ 1. 创建 gRPC Server │ │
│ │ 监听 /var/lib/kubelet/ │ │
│ │ device-plugins/<plugin>.sock │ │
│ │ │ │
│ │ 2. Register() │ │
│ │────────────────────────────────────►│ │
│ │ resource_name: nvidia.com/gpu │ │
│ │ endpoint: nvidia.sock │ │
│ │ │ │
│ │◄────────────────────────────────────│ │
│ │ OK │ │
│ │ │ │
│ │
│ 阶段 2: 设备发现 (ListAndWatch) │
│ ─────────────────────────────────────────────────────────────── │
│ │
│ Device Plugin kubelet │
│ │ │ │
│ │◄────────────────────────────────────│ │
│ │ ListAndWatch() │ │
│ │ │ │
│ │ 3. 扫描硬件设备 │ │
│ │ nvidia-smi / NVML │ │
│ │ │ │
│ │────────────────────────────────────►│ │
│ │ devices: [ │ │
│ │ {ID: "GPU-uuid-1", health: "Healthy"}, │
│ │ {ID: "GPU-uuid-2", health: "Healthy"} │
│ │ ] │ │
│ │ │ │
│ │ (设备状态变化时继续推送...) │ │
│ │────────────────────────────────────►│ │
│ │ devices: [...] │ 4. 更新 Node Status │
│ │ │ Capacity │
│ │ │ Allocatable │
│ │
│ 阶段 3: 设备分配 (Allocate) │
│ ─────────────────────────────────────────────────────────────── │
│ │
│ Device Plugin kubelet │
│ │ │ │
│ │ │ 5. Pod 被调度到节点 │
│ │ │ │
│ │◄────────────────────────────────────│ │
│ │ Allocate() │ │
│ │ devicesIDs: ["GPU-uuid-1"] │ │
│ │ │ │
│ │ 6. 准备设备 │ │
│ │ - 设置环境变量 │ │
│ │ - 准备挂载点 │ │
│ │ - 准备设备节点 │ │
│ │ │ │
│ │────────────────────────────────────►│ │
│ │ envs: { │ │
│ │ "NVIDIA_VISIBLE_DEVICES": "GPU-uuid-1" │
│ │ } │ │
│ │ mounts: [ │ │
│ │ {host: "/usr/lib/...", │ │
│ │ container: "/usr/lib/..."} │ │
│ │ ] │ │
│ │ devices: [ │ 7. 创建容器 │
│ │ {host: "/dev/nvidia0", │ 传递 envs/ │
│ │ container: "/dev/nvidia0"} │ mounts/devices │
│ │ ] │ │
│ │ │ │
│ │
│ 阶段 4: 健康检查和状态更新 │
│ ─────────────────────────────────────────────────────────────── │
│ │
│ Device Plugin kubelet │
│ │ │ │
│ │ (持续监控设备健康状态) │ │
│ │ │ │
│ │────────────────────────────────────►│ │
│ │ devices: [ │ │
│ │ {ID: "GPU-uuid-1", health: "Unhealthy"}, │
│ │ {ID: "GPU-uuid-2", health: "Healthy"} │
│ │ ] │ 8. 更新 Allocatable │
│ │ │ 不健康设备不分配 │
│ │ │ │
│ │
└─────────────────────────────────────────────────────────────────────┘
3. NVIDIA Device Plugin 源码分析
3.1 项目结构
# GitHub: github.com/NVIDIA/k8s-device-plugin
# 目录结构:
k8s-device-plugin/
├── cmd/
│ └── nvidia-device-plugin/
│ └── main.go # 入口
├── internal/
│ ├── plugin/ # Device Plugin 核心实现
│ │ ├── server.go # gRPC Server
│ │ └── manager.go # 设备管理
│ ├── rm/ # Resource Manager
│ │ ├── nvml.go # NVML 封装
│ │ └── mig.go # MIG 支持
│ └── vgpu/ # vGPU 支持
├── api/config/v1/ # 配置 API
│ └── config.go
└── deployments/ # 部署文件
└── helm/ # Helm Chart
3.2 核心代码分析
// ==================== 主入口 ====================
// cmd/nvidia-device-plugin/main.go
package main
import (
"github.com/NVIDIA/k8s-device-plugin/internal/plugin"
"github.com/NVIDIA/go-nvml/pkg/nvml"
)
func main() {
// 1. 初始化 NVML
ret := nvml.Init()
if ret != nvml.SUCCESS {
log.Fatal("Failed to initialize NVML")
}
defer nvml.Shutdown()
// 2. 加载配置
config := loadConfig()
// 3. 创建 Plugin Manager
manager := plugin.NewManager(config)
// 4. 启动并运行
manager.Run()
}
// ==================== Plugin Manager ====================
// internal/plugin/manager.go
type Manager struct {
config *config.Config
plugins map[string]*NvidiaDevicePlugin
nvml nvml.Interface
}
func NewManager(config *config.Config) *Manager {
return &Manager{
config: config,
plugins: make(map[string]*NvidiaDevicePlugin),
nvml: nvml.New(),
}
}
func (m *Manager) Run() error {
// 1. 发现 GPU 设备
devices, err := m.discoverDevices()
if err != nil {
return err
}
// 2. 根据配置决定暴露哪些资源
// 可能是完整 GPU、MIG 设备、或时分复用份额
resources := m.getResourcesToRegister(devices)
// 3. 为每种资源创建 Device Plugin
for _, resource := range resources {
plugin := NewNvidiaDevicePlugin(resource, m.config)
m.plugins[resource.Name] = plugin
// 4. 启动 Plugin (在单独 goroutine)
go plugin.Start()
}
// 5. 监听 kubelet 重启信号
// kubelet 重启后需要重新注册
m.watchKubeletRestart()
// 6. 阻塞等待
select {}
}
func (m *Manager) discoverDevices() ([]*Device, error) {
// 使用 NVML 发现 GPU
count, ret := m.nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("failed to get device count: %v", ret)
}
var devices []*Device
for i := 0; i < count; i++ {
device, ret := m.nvml.DeviceGetHandleByIndex(i)
if ret != nvml.SUCCESS {
continue
}
uuid, _ := device.GetUUID()
name, _ := device.GetName()
memory, _ := device.GetMemoryInfo()
// 检查 MIG 模式
migMode, _ := device.GetMigMode()
devices = append(devices, &Device{
Index: i,
UUID: uuid,
Name: name,
Memory: memory.Total,
MIGEnabled: migMode == nvml.DEVICE_MIG_ENABLE,
})
}
return devices, nil
}
// ==================== Device Plugin 实现 ====================
// internal/plugin/server.go
type NvidiaDevicePlugin struct {
resourceName string
socket string
server *grpc.Server
devices []*pluginapi.Device
health chan *pluginapi.Device
stop chan struct{}
}
func NewNvidiaDevicePlugin(resource *Resource, config *config.Config) *NvidiaDevicePlugin {
return &NvidiaDevicePlugin{
resourceName: resource.Name,
socket: fmt.Sprintf("%s-%s.sock", "nvidia", resource.Name),
devices: resource.Devices,
health: make(chan *pluginapi.Device),
stop: make(chan struct{}),
}
}
func (p *NvidiaDevicePlugin) Start() error {
// 1. 清理旧的 socket 文件
socketPath := filepath.Join(pluginapi.DevicePluginPath, p.socket)
os.Remove(socketPath)
// 2. 创建 gRPC Server
p.server = grpc.NewServer()
pluginapi.RegisterDevicePluginServer(p.server, p)
// 3. 监听 Unix Socket
listener, err := net.Listen("unix", socketPath)
if err != nil {
return err
}
// 4. 启动 gRPC Server
go p.server.Serve(listener)
// 5. 向 kubelet 注册
err = p.Register()
if err != nil {
return err
}
// 6. 启动健康检查
go p.healthCheck()
return nil
}
func (p *NvidiaDevicePlugin) Register() error {
// 连接 kubelet 的注册服务
conn, err := grpc.Dial(
pluginapi.KubeletSocket,
grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return err
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
// 发送注册请求
req := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: p.socket,
ResourceName: p.resourceName,
Options: &pluginapi.DevicePluginOptions{
PreStartRequired: true,
GetPreferredAllocationAvailable: true,
},
}
_, err = client.Register(context.Background(), req)
return err
}
// ListAndWatch 实现 - 流式上报设备列表
func (p *NvidiaDevicePlugin) ListAndWatch(
e *pluginapi.Empty,
s pluginapi.DevicePlugin_ListAndWatchServer,
) error {
// 首次发送完整设备列表
s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices})
// 持续监听设备状态变化
for {
select {
case <-p.stop:
return nil
case device := <-p.health:
// 设备健康状态变化,重新发送完整列表
p.updateDeviceHealth(device)
s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices})
}
}
}
// Allocate 实现 - 分配设备给容器
func (p *NvidiaDevicePlugin) Allocate(
ctx context.Context,
req *pluginapi.AllocateRequest,
) (*pluginapi.AllocateResponse, error) {
response := &pluginapi.AllocateResponse{}
for _, containerReq := range req.ContainerRequests {
// 获取请求的设备 ID
deviceIDs := containerReq.DevicesIDs
// 验证设备是否存在且健康
for _, id := range deviceIDs {
if !p.deviceExists(id) {
return nil, fmt.Errorf("device %s not found", id)
}
if !p.deviceHealthy(id) {
return nil, fmt.Errorf("device %s is unhealthy", id)
}
}
// 构建容器响应
containerResp := &pluginapi.ContainerAllocateResponse{
// 设置环境变量,告诉容器运行时使用哪些 GPU
Envs: map[string]string{
"NVIDIA_VISIBLE_DEVICES": strings.Join(deviceIDs, ","),
},
// 设备节点挂载 (实际上 nvidia-container-runtime 会处理)
Devices: []*pluginapi.DeviceSpec{
{
ContainerPath: "/dev/nvidiactl",
HostPath: "/dev/nvidiactl",
Permissions: "rw",
},
{
ContainerPath: "/dev/nvidia-uvm",
HostPath: "/dev/nvidia-uvm",
Permissions: "rw",
},
},
}
// 添加每个 GPU 的设备节点
for i, id := range deviceIDs {
containerResp.Devices = append(containerResp.Devices, &pluginapi.DeviceSpec{
ContainerPath: fmt.Sprintf("/dev/nvidia%d", i),
HostPath: fmt.Sprintf("/dev/nvidia%d", p.getDeviceIndex(id)),
Permissions: "rw",
})
}
response.ContainerResponses = append(response.ContainerResponses, containerResp)
}
return response, nil
}
// GetPreferredAllocation 实现 - 返回首选的设备分配方案
// 用于拓扑感知调度
func (p *NvidiaDevicePlugin) GetPreferredAllocation(
ctx context.Context,
req *pluginapi.PreferredAllocationRequest,
) (*pluginapi.PreferredAllocationResponse, error) {
response := &pluginapi.PreferredAllocationResponse{}
for _, containerReq := range req.ContainerRequests {
available := containerReq.AvailableDeviceIDs
mustInclude := containerReq.MustIncludeDeviceIDs
size := int(containerReq.AllocationSize)
// 获取首选分配
// 考虑 NVLink 连接、NUMA 亲和性等
preferred := p.getPreferredDevices(available, mustInclude, size)
response.ContainerResponses = append(response.ContainerResponses,
&pluginapi.ContainerPreferredAllocationResponse{
DeviceIDs: preferred,
})
}
return response, nil
}
// 获取首选设备 - 考虑拓扑
func (p *NvidiaDevicePlugin) getPreferredDevices(
available []string,
mustInclude []string,
size int,
) []string {
// 简化实现:优先选择同一 NVLink 域的 GPU
// 1. 首先包含必须包含的设备
selected := make(map[string]bool)
for _, id := range mustInclude {
selected[id] = true
}
// 2. 获取设备拓扑信息
topology := p.getDeviceTopology()
// 3. 按 NVLink 连接度排序
// 优先选择与已选设备有 NVLink 连接的设备
candidates := p.sortByNVLinkAffinity(available, selected, topology)
// 4. 选择剩余设备
for _, id := range candidates {
if len(selected) >= size {
break
}
if !selected[id] {
selected[id] = true
}
}
// 转换为列表
result := make([]string, 0, len(selected))
for id := range selected {
result = append(result, id)
}
return result
}
// 健康检查
func (p *NvidiaDevicePlugin) healthCheck() {
for {
select {
case <-p.stop:
return
case <-time.After(30 * time.Second):
for _, device := range p.devices {
health := p.checkDeviceHealth(device.ID)
if health != device.Health {
device.Health = health
p.health <- device
}
}
}
}
}
func (p *NvidiaDevicePlugin) checkDeviceHealth(id string) string {
// 使用 NVML 检查设备健康状态
handle, ret := nvml.DeviceGetHandleByUUID(id)
if ret != nvml.SUCCESS {
return pluginapi.Unhealthy
}
// 检查 ECC 错误
eccErrors, ret := handle.GetTotalEccErrors(
nvml.MEMORY_ERROR_TYPE_UNCORRECTED,
nvml.VOLATILE_ECC,
)
if ret == nvml.SUCCESS && eccErrors > 0 {
return pluginapi.Unhealthy
}
// 检查温度
temp, ret := handle.GetTemperature(nvml.TEMPERATURE_GPU)
if ret == nvml.SUCCESS && temp > 90 {
return pluginapi.Unhealthy
}
return pluginapi.Healthy
}
3.3 MIG 支持实现
// internal/rm/mig.go
// MIG 资源管理器
type MIGResourceManager struct {
nvml nvml.Interface
devices map[string]*MIGDevice
}
type MIGDevice struct {
ParentUUID string
GIProfileID int
CIProfileID int
UUID string
Memory uint64
Name string // 如 "3g.40gb"
}
func (m *MIGResourceManager) DiscoverMIGDevices() ([]*MIGDevice, error) {
var migDevices []*MIGDevice
// 获取所有 GPU
count, _ := m.nvml.DeviceGetCount()
for i := 0; i < count; i++ {
device, _ := m.nvml.DeviceGetHandleByIndex(i)
// 检查是否启用 MIG
migMode, _ := device.GetMigMode()
if migMode != nvml.DEVICE_MIG_ENABLE {
continue
}
parentUUID, _ := device.GetUUID()
// 获取所有 GPU Instance
giCount, _ := device.GetMaxMigDeviceCount()
for gi := 0; gi < giCount; gi++ {
giHandle, ret := device.GetMigDeviceHandleByIndex(gi)
if ret != nvml.SUCCESS {
continue
}
// 获取 MIG 设备信息
uuid, _ := giHandle.GetUUID()
memory, _ := giHandle.GetMemoryInfo()
// 获取 Profile 信息
giInfo, _ := giHandle.GetGpuInstanceProfileInfo()
ciInfo, _ := giHandle.GetComputeInstanceProfileInfo()
migDevice := &MIGDevice{
ParentUUID: parentUUID,
GIProfileID: giInfo.Id,
CIProfileID: ciInfo.Id,
UUID: uuid,
Memory: memory.Total,
Name: m.getProfileName(giInfo, ciInfo),
}
migDevices = append(migDevices, migDevice)
}
}
return migDevices, nil
}
// 根据 MIG Profile 生成资源名称
func (m *MIGResourceManager) GetResourceName(device *MIGDevice) string {
// 格式: nvidia.com/mig-<profile>
// 例如: nvidia.com/mig-3g.40gb
return fmt.Sprintf("nvidia.com/mig-%s", device.Name)
}
// MIG 策略配置
type MIGStrategy string
const (
// Single: 假设所有 GPU 都是相同的 MIG 配置
// 适合所有 GPU 配置相同的集群
MIGStrategySingle MIGStrategy = "single"
// Mixed: 支持 MIG 和非 MIG GPU 混合
// 会暴露 nvidia.com/gpu 和 nvidia.com/mig-* 两种资源
MIGStrategyMixed MIGStrategy = "mixed"
// None: 忽略 MIG 设备,只暴露完整 GPU
MIGStrategyNone MIGStrategy = "none"
)
4. Device Plugin 配置详解
4.1 NVIDIA Device Plugin 配置
# nvidia-device-plugin 配置文件
# /etc/nvidia-device-plugin/config.yaml
version: v1
# 共享配置
sharing:
# 时分复用配置
timeSlicing:
renameByDefault: false
failRequestsGreaterThanOne: false
resources:
- name: nvidia.com/gpu
replicas: 4 # 每个 GPU 可被 4 个 Pod 共享
# MIG 配置
mig:
strategy: single # single, mixed, none
resources:
- name: nvidia.com/gpu
devices: all
# 标志配置
flags:
# 指标设备 (用于监控)
metricsConfig: ""
# 配置文件路径
configFile: "/etc/nvidia-device-plugin/config.yaml"
# 是否启用 MPS
mps: false
# 健康检查配置
failOnInitError: true
deviceListStrategy: envvar # envvar 或 volume-mounts
deviceIDStrategy: uuid # uuid 或 index
# GDS (GPUDirect Storage) 支持
gdsEnabled: false
# MoFED (Mellanox OFED) 支持
mofedEnabled: false
---
# ConfigMap 格式部署
apiVersion: v1
kind: ConfigMap
metadata:
name: nvidia-device-plugin-config
namespace: kube-system
data:
config.yaml: |
version: v1
sharing:
timeSlicing:
resources:
- name: nvidia.com/gpu
replicas: 4
4.2 Helm 部署配置
# values.yaml for nvidia-device-plugin Helm chart
# 镜像配置
image:
repository: nvcr.io/nvidia/k8s-device-plugin
tag: v0.14.3
pullPolicy: IfNotPresent
# Device Plugin 配置
config:
# 配置名称 (从 ConfigMap 中选择)
name: ""
# 默认配置
default: |
version: v1
sharing:
timeSlicing:
resources:
- name: nvidia.com/gpu
replicas: 2
# 兼容性配置
compatWithCPUManager: false
# MIG 策略
migStrategy: none # none, single, mixed
# 失败策略
failOnInitError: true
# 设备列表策略
deviceListStrategy: envvar
# 设备 ID 策略
deviceIDStrategy: uuid
# 节点选择器
nodeSelector:
nvidia.com/gpu.present: "true"
# 容忍度
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
# 资源限制
resources:
requests:
cpu: 50m
memory: 50Mi
limits:
cpu: 100m
memory: 100Mi
# 运行时类
runtimeClassName: nvidia
# NFD (Node Feature Discovery) 集成
nfd:
enabled: true
# GFD (GPU Feature Discovery) 集成
gfd:
enabled: true
# Helm 安装命令
helm repo add nvdp https://nvidia.github.io/k8s-device-plugin
helm repo update
# 安装
helm install nvidia-device-plugin nvdp/nvidia-device-plugin \
--namespace kube-system \
--set migStrategy=single \
--set config.default="$(cat config.yaml)"
# 或使用 values 文件
helm install nvidia-device-plugin nvdp/nvidia-device-plugin \
--namespace kube-system \
-f values.yaml
5. 自定义 Device Plugin 开发
5.1 开发框架
// custom-device-plugin/main.go
// 自定义 Device Plugin 开发模板
package main
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"time"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
const (
resourceName = "example.com/custom-device"
socketName = "custom-device.sock"
)
// CustomDevicePlugin 自定义设备插件
type CustomDevicePlugin struct {
devices []*pluginapi.Device
socket string
server *grpc.Server
healthCheck chan *pluginapi.Device
stop chan struct{}
}
// NewCustomDevicePlugin 创建新的插件实例
func NewCustomDevicePlugin() *CustomDevicePlugin {
return &CustomDevicePlugin{
socket: filepath.Join(pluginapi.DevicePluginPath, socketName),
healthCheck: make(chan *pluginapi.Device),
stop: make(chan struct{}),
}
}
// discoverDevices 发现设备
// 这里需要实现具体的设备发现逻辑
func (p *CustomDevicePlugin) discoverDevices() []*pluginapi.Device {
// 示例:发现 4 个自定义设备
devices := make([]*pluginapi.Device, 4)
for i := 0; i < 4; i++ {
devices[i] = &pluginapi.Device{
ID: fmt.Sprintf("device-%d", i),
Health: pluginapi.Healthy,
// 可选:添加拓扑信息
Topology: &pluginapi.TopologyInfo{
Nodes: []*pluginapi.NUMANode{
{ID: int64(i % 2)}, // 假设有 2 个 NUMA 节点
},
},
}
}
return devices
}
// Start 启动插件
func (p *CustomDevicePlugin) Start() error {
// 1. 发现设备
p.devices = p.discoverDevices()
if len(p.devices) == 0 {
return fmt.Errorf("no devices found")
}
// 2. 清理旧 socket
os.Remove(p.socket)
// 3. 创建 gRPC server
p.server = grpc.NewServer()
pluginapi.RegisterDevicePluginServer(p.server, p)
// 4. 监听 socket
listener, err := net.Listen("unix", p.socket)
if err != nil {
return err
}
// 5. 启动 server
go func() {
p.server.Serve(listener)
}()
// 6. 等待 server 启动
conn, err := p.dial(p.socket, 5*time.Second)
if err != nil {
return err
}
conn.Close()
// 7. 注册到 kubelet
if err := p.register(); err != nil {
return err
}
// 8. 启动健康检查
go p.runHealthCheck()
return nil
}
// register 向 kubelet 注册
func (p *CustomDevicePlugin) register() error {
conn, err := p.dial(pluginapi.KubeletSocket, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
req := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: socketName,
ResourceName: resourceName,
Options: &pluginapi.DevicePluginOptions{
PreStartRequired: false,
GetPreferredAllocationAvailable: true,
},
}
_, err = client.Register(context.Background(), req)
return err
}
// GetDevicePluginOptions 返回插件选项
func (p *CustomDevicePlugin) GetDevicePluginOptions(
ctx context.Context,
e *pluginapi.Empty,
) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{
PreStartRequired: false,
GetPreferredAllocationAvailable: true,
}, nil
}
// ListAndWatch 列出并监控设备
func (p *CustomDevicePlugin) ListAndWatch(
e *pluginapi.Empty,
s pluginapi.DevicePlugin_ListAndWatchServer,
) error {
// 发送初始设备列表
s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices})
// 持续监控
for {
select {
case <-p.stop:
return nil
case device := <-p.healthCheck:
// 更新设备健康状态
for i, d := range p.devices {
if d.ID == device.ID {
p.devices[i].Health = device.Health
break
}
}
// 发送更新后的设备列表
s.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices})
}
}
}
// Allocate 分配设备
func (p *CustomDevicePlugin) Allocate(
ctx context.Context,
req *pluginapi.AllocateRequest,
) (*pluginapi.AllocateResponse, error) {
response := &pluginapi.AllocateResponse{}
for _, containerReq := range req.ContainerRequests {
containerResp := &pluginapi.ContainerAllocateResponse{
// 设置环境变量
Envs: map[string]string{
"CUSTOM_DEVICES": fmt.Sprintf("%v", containerReq.DevicesIDs),
},
// 挂载设备
Mounts: []*pluginapi.Mount{
{
ContainerPath: "/dev/custom",
HostPath: "/dev/custom",
ReadOnly: false,
},
},
// 设备节点
Devices: []*pluginapi.DeviceSpec{},
}
// 为每个请求的设备添加设备节点
for _, id := range containerReq.DevicesIDs {
containerResp.Devices = append(containerResp.Devices, &pluginapi.DeviceSpec{
ContainerPath: fmt.Sprintf("/dev/custom/%s", id),
HostPath: fmt.Sprintf("/dev/custom/%s", id),
Permissions: "rw",
})
}
response.ContainerResponses = append(response.ContainerResponses, containerResp)
}
return response, nil
}
// GetPreferredAllocation 返回首选分配方案
func (p *CustomDevicePlugin) GetPreferredAllocation(
ctx context.Context,
req *pluginapi.PreferredAllocationRequest,
) (*pluginapi.PreferredAllocationResponse, error) {
response := &pluginapi.PreferredAllocationResponse{}
for _, containerReq := range req.ContainerRequests {
// 简单实现:按顺序选择
var preferred []string
for _, id := range containerReq.AvailableDeviceIDs {
preferred = append(preferred, id)
if len(preferred) >= int(containerReq.AllocationSize) {
break
}
}
response.ContainerResponses = append(response.ContainerResponses,
&pluginapi.ContainerPreferredAllocationResponse{
DeviceIDs: preferred,
})
}
return response, nil
}
// PreStartContainer 容器启动前钩子
func (p *CustomDevicePlugin) PreStartContainer(
ctx context.Context,
req *pluginapi.PreStartContainerRequest,
) (*pluginapi.PreStartContainerResponse, error) {
// 可以在这里做一些容器启动前的准备工作
return &pluginapi.PreStartContainerResponse{}, nil
}
// runHealthCheck 运行健康检查
func (p *CustomDevicePlugin) runHealthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-p.stop:
return
case <-ticker.C:
for _, device := range p.devices {
health := p.checkHealth(device.ID)
if health != device.Health {
p.healthCheck <- &pluginapi.Device{
ID: device.ID,
Health: health,
}
}
}
}
}
}
// checkHealth 检查设备健康状态
func (p *CustomDevicePlugin) checkHealth(id string) string {
// 实现具体的健康检查逻辑
// 例如:检查设备文件是否存在
devicePath := fmt.Sprintf("/dev/custom/%s", id)
if _, err := os.Stat(devicePath); os.IsNotExist(err) {
return pluginapi.Unhealthy
}
return pluginapi.Healthy
}
// Stop 停止插件
func (p *CustomDevicePlugin) Stop() {
close(p.stop)
if p.server != nil {
p.server.Stop()
}
}
// dial 连接到 socket
func (p *CustomDevicePlugin) dial(socket string, timeout time.Duration) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return grpc.DialContext(ctx, socket,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
}
func main() {
plugin := NewCustomDevicePlugin()
// 启动插件
if err := plugin.Start(); err != nil {
fmt.Printf("Failed to start plugin: %v\n", err)
os.Exit(1)
}
fmt.Println("Custom Device Plugin started")
// 监听 kubelet 重启
// kubelet 重启后需要重新注册
watcher, err := fsnotify.NewWatcher()
if err != nil {
fmt.Printf("Failed to create watcher: %v\n", err)
os.Exit(1)
}
defer watcher.Close()
watcher.Add(pluginapi.DevicePluginPath)
for {
select {
case event := <-watcher.Events:
if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
fmt.Println("Kubelet restarted, re-registering...")
plugin.Stop()
plugin = NewCustomDevicePlugin()
if err := plugin.Start(); err != nil {
fmt.Printf("Failed to restart plugin: %v\n", err)
}
}
case err := <-watcher.Errors:
fmt.Printf("Watcher error: %v\n", err)
}
}
}
5.2 部署自定义 Device Plugin
# custom-device-plugin-deployment.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: custom-device-plugin
namespace: kube-system
labels:
app: custom-device-plugin
spec:
selector:
matchLabels:
app: custom-device-plugin
template:
metadata:
labels:
app: custom-device-plugin
spec:
nodeSelector:
# 只在有自定义设备的节点运行
example.com/custom-device: "true"
tolerations:
- key: example.com/custom-device
operator: Exists
effect: NoSchedule
containers:
- name: custom-device-plugin
image: your-registry/custom-device-plugin:v1.0
securityContext:
privileged: true # 访问设备需要
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
- name: dev
mountPath: /dev
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins
- name: dev
hostPath:
path: /dev
6. Checkpoint 机制
6.1 Device Manager Checkpoint
┌─────────────────────────────────────────────────────────────────────┐
│ Checkpoint 机制 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ kubelet 使用 checkpoint 保存设备分配状态 │
│ 用于在 kubelet 重启后恢复分配状态 │
│ │
│ Checkpoint 文件位置: │
│ /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint │
│ │
│ Checkpoint 内容 (JSON): │
│ { │
│ "Data": { │
│ "PodDeviceEntries": [ │
│ { │
│ "PodUID": "pod-uuid-1", │
│ "ContainerName": "container-1", │
│ "ResourceName": "nvidia.com/gpu", │
│ "DeviceIDs": ["GPU-uuid-1", "GPU-uuid-2"], │
│ "AllocResp": "<base64-encoded-allocate-response>" │
│ } │
│ ], │
│ "RegisteredDevices": { │
│ "nvidia.com/gpu": ["GPU-uuid-1", "GPU-uuid-2", "GPU-uuid-3"] │
│ } │
│ }, │
│ "Checksum": 12345678 │
│ } │
│ │
│ 重启恢复流程: │
│ 1. kubelet 启动 │
│ 2. 读取 checkpoint 文件 │
│ 3. 恢复 Pod 到设备的映射 │
│ 4. Device Plugin 重新注册 │
│ 5. 验证设备状态与 checkpoint 一致 │
│ 6. 如果设备丢失,标记相关 Pod 为 Failed │
│ │
└─────────────────────────────────────────────────────────────────────┘
# 查看 checkpoint 文件
cat /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint | jq .
# 强制清理 checkpoint (谨慎操作)
# 需要先停止 kubelet
systemctl stop kubelet
rm /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint
systemctl start kubelet
7. 本章总结
7.1 核心知识点
┌─────────────────────────────────────────────────────────────────────┐
│ Device Plugin 机制知识图谱 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 资源模型 │
│ ├── 原生资源: cpu, memory, ephemeral-storage │
│ └── 扩展资源: nvidia.com/gpu, 必须是整数 │
│ │
│ Device Plugin API (gRPC) │
│ ├── Register: 向 kubelet 注册 │
│ ├── ListAndWatch: 流式上报设备列表 │
│ ├── Allocate: 分配设备给容器 │
│ ├── GetPreferredAllocation: 拓扑感知分配 │
│ └── PreStartContainer: 启动前钩子 │
│ │
│ NVIDIA Device Plugin │
│ ├── 设备发现: 通过 NVML │
│ ├── MIG 支持: single/mixed/none 策略 │
│ ├── 时分复用: replicas 配置 │
│ └── 健康检查: ECC 错误、温度检测 │
│ │
│ 工作流程 │
│ ├── 1. Plugin 启动并创建 socket │
│ ├── 2. 向 kubelet 注册 │
│ ├── 3. ListAndWatch 上报设备 │
│ ├── 4. kubelet 更新 Node Capacity │
│ ├── 5. Pod 调度到节点 │
│ ├── 6. Allocate 分配设备 │
│ └── 7. 容器创建时注入设备 │
│ │
└─────────────────────────────────────────────────────────────────────┘
7.2 面试要点
Device Plugin 的作用是什么?
- 让 Kubernetes 能够管理非标准硬件资源(GPU、FPGA 等)
- 提供设备发现、上报、分配的标准接口
- 与 kubelet 通过 gRPC 通信
Device Plugin 的核心接口有哪些?
- Register: 注册到 kubelet
- ListAndWatch: 流式上报设备列表
- Allocate: 分配设备,返回环境变量、挂载点、设备节点
- GetPreferredAllocation: 拓扑感知分配
扩展资源与原生资源的区别?
- 扩展资源必须是整数
- requests 必须等于 limits
- 不能超额分配
- kubelet 不强制执行限制
如何实现 GPU 共享?
- MIG: 硬件分区,通过 mig strategy 配置
- 时分复用: 软件共享,通过 replicas 配置
Device Plugin 重启后如何恢复状态?
- kubelet 使用 checkpoint 文件保存分配状态
- 重启后读取 checkpoint 恢复映射关系
7.3 下一章预告
下一章我们将深入探讨 GPU 调度器实现:
- Kubernetes 调度器架构
- 调度器扩展点
- GPU 感知调度实现
- 自定义调度器开发