异构计算概述

什么是异构计算

异构计算(Heterogeneous Computing)是指在一个系统中同时使用多种不同类型处理器的计算方式。在 AI 基础设施中,异构计算通常涉及 CPU、GPU、TPU、FPGA、NPU 等多种计算设备的协同工作。

异构计算的必要性

传统同构计算 vs 异构计算:

同构计算(仅 CPU):
┌─────────────────────────────────────────────────────────────┐
│ CPU 集群                                                     │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                           │
│ │ CPU │ │ CPU │ │ CPU │ │ CPU │  所有任务使用相同架构       │
│ └─────┘ └─────┘ └─────┘ └─────┘                           │
│                                                             │
│ 问题:                                                       │
│ - 并行计算能力有限                                           │
│ - 能效比低                                                   │
│ - 无法满足 AI 训练/推理需求                                  │
└─────────────────────────────────────────────────────────────┘

异构计算:
┌─────────────────────────────────────────────────────────────┐
│ 异构计算集群                                                 │
│                                                             │
│ ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐        │
│ │   CPU   │  │   GPU   │  │   TPU   │  │  FPGA   │        │
│ │ 通用计算 │  │ 并行计算 │  │ 张量计算 │  │ 定制加速 │        │
│ └─────────┘  └─────────┘  └─────────┘  └─────────┘        │
│                                                             │
│ 优势:                                                       │
│ - 任务匹配最优硬件                                           │
│ - 更高的计算效率                                             │
│ - 更好的能效比                                               │
│ - 灵活的资源配置                                             │
└─────────────────────────────────────────────────────────────┘

主流 AI 加速器对比

GPU (Graphics Processing Unit)

GPU 是目前最主流的 AI 加速器,以 NVIDIA 为代表:

package heterogeneous

// GPUSpec GPU 规格定义
type GPUSpec struct {
    Name            string
    Architecture    string
    ComputeCapability string
    CUDACores       int
    TensorCores     int
    MemoryGB        int
    MemoryBandwidth float64  // GB/s
    TDP             int      // Watts
    FP16TFLOPS      float64
    FP32TFLOPS      float64
    INT8TOPS        float64
    InterconnectType string  // NVLink, PCIe
    InterconnectBandwidth float64 // GB/s
}

// 主流 GPU 规格对比
var GPUCatalog = map[string]GPUSpec{
    "A100-80GB": {
        Name:            "NVIDIA A100",
        Architecture:    "Ampere",
        ComputeCapability: "8.0",
        CUDACores:       6912,
        TensorCores:     432,
        MemoryGB:        80,
        MemoryBandwidth: 2039,
        TDP:             400,
        FP16TFLOPS:      312,
        FP32TFLOPS:      19.5,
        INT8TOPS:        624,
        InterconnectType: "NVLink",
        InterconnectBandwidth: 600,
    },
    "H100-80GB": {
        Name:            "NVIDIA H100",
        Architecture:    "Hopper",
        ComputeCapability: "9.0",
        CUDACores:       16896,
        TensorCores:     528,
        MemoryGB:        80,
        MemoryBandwidth: 3350,
        TDP:             700,
        FP16TFLOPS:      989,
        FP32TFLOPS:      67,
        INT8TOPS:        1979,
        InterconnectType: "NVLink",
        InterconnectBandwidth: 900,
    },
    "H200-141GB": {
        Name:            "NVIDIA H200",
        Architecture:    "Hopper",
        ComputeCapability: "9.0",
        CUDACores:       16896,
        TensorCores:     528,
        MemoryGB:        141,
        MemoryBandwidth: 4800,
        TDP:             700,
        FP16TFLOPS:      989,
        FP32TFLOPS:      67,
        INT8TOPS:        1979,
        InterconnectType: "NVLink",
        InterconnectBandwidth: 900,
    },
    "L40S": {
        Name:            "NVIDIA L40S",
        Architecture:    "Ada Lovelace",
        ComputeCapability: "8.9",
        CUDACores:       18176,
        TensorCores:     568,
        MemoryGB:        48,
        MemoryBandwidth: 864,
        TDP:             350,
        FP16TFLOPS:      362,
        FP32TFLOPS:      91.6,
        INT8TOPS:        724,
        InterconnectType: "PCIe",
        InterconnectBandwidth: 64,
    },
    "RTX-4090": {
        Name:            "NVIDIA RTX 4090",
        Architecture:    "Ada Lovelace",
        ComputeCapability: "8.9",
        CUDACores:       16384,
        TensorCores:     512,
        MemoryGB:        24,
        MemoryBandwidth: 1008,
        TDP:             450,
        FP16TFLOPS:      330,
        FP32TFLOPS:      82.6,
        INT8TOPS:        660,
        InterconnectType: "PCIe",
        InterconnectBandwidth: 64,
    },
}

// GPUCapability GPU 能力枚举
type GPUCapability string

const (
    CapTensorCore   GPUCapability = "tensor_core"    // Tensor Core 支持
    CapFP8          GPUCapability = "fp8"            // FP8 支持
    CapNVLink       GPUCapability = "nvlink"         // NVLink 互连
    CapMIG          GPUCapability = "mig"            // 多实例 GPU
    CapTransformerEngine GPUCapability = "transformer_engine"
)

// GetGPUCapabilities 获取 GPU 能力
func GetGPUCapabilities(spec *GPUSpec) []GPUCapability {
    caps := []GPUCapability{}

    if spec.TensorCores > 0 {
        caps = append(caps, CapTensorCore)
    }

    if spec.Architecture == "Hopper" {
        caps = append(caps, CapFP8, CapTransformerEngine)
    }

    if spec.InterconnectType == "NVLink" {
        caps = append(caps, CapNVLink)
    }

    if spec.Architecture == "Ampere" || spec.Architecture == "Hopper" {
        caps = append(caps, CapMIG)
    }

    return caps
}

// CalculateModelFit 计算模型是否能在 GPU 上运行
func CalculateModelFit(spec *GPUSpec, modelParams int64, batchSize int, seqLen int) *ModelFitResult {
    // 估算模型内存需求(FP16)
    // 参数内存 = params * 2 bytes (FP16)
    paramMemGB := float64(modelParams*2) / 1e9

    // KV Cache 内存 = 2 * layers * hidden * batch * seq * 2 bytes
    // 简化估算:约为参数的 10-20% per 1K tokens
    kvCacheGB := paramMemGB * 0.15 * float64(seqLen) / 1024

    // 激活内存(峰值)
    activationGB := paramMemGB * 0.3 * float64(batchSize)

    totalRequired := paramMemGB + kvCacheGB + activationGB
    available := float64(spec.MemoryGB) * 0.9 // 预留 10% 碎片

    return &ModelFitResult{
        Fits:            totalRequired <= available,
        RequiredMemGB:   totalRequired,
        AvailableMemGB:  available,
        ParamMemGB:      paramMemGB,
        KVCacheMemGB:    kvCacheGB,
        ActivationMemGB: activationGB,
        UtilizationPct:  (totalRequired / available) * 100,
    }
}

type ModelFitResult struct {
    Fits            bool
    RequiredMemGB   float64
    AvailableMemGB  float64
    ParamMemGB      float64
    KVCacheMemGB    float64
    ActivationMemGB float64
    UtilizationPct  float64
}
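
下面给出一个简单的调用示例(ExampleModelFit 为本文新增的示意函数,模型参数量、batch size、序列长度均为假设值):用上文的 CalculateModelFit 粗略判断 70B 参数模型在 A100-80GB 与 H100-80GB 上能否单卡放下。按 FP16 计算,70B 参数仅权重就约 140 GB,单卡 80 GB 显然不够,需要量化或多卡切分。

package heterogeneous

import "fmt"

// ExampleModelFit 演示 CalculateModelFit 的用法(参数均为假设值)
func ExampleModelFit() {
    modelParams := int64(70e9) // 假设 70B 参数模型
    batchSize := 1
    seqLen := 4096

    for _, name := range []string{"A100-80GB", "H100-80GB"} {
        spec := GPUCatalog[name]
        fit := CalculateModelFit(&spec, modelParams, batchSize, seqLen)
        fmt.Printf("%s: 需要 %.1f GB / 可用 %.1f GB, 单卡可放下: %v\n",
            name, fit.RequiredMemGB, fit.AvailableMemGB, fit.Fits)
        fmt.Printf("  能力: %v\n", GetGPUCapabilities(&spec))
    }
}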

TPU (Tensor Processing Unit)

Google 的 TPU 专为机器学习设计:

package heterogeneous

// TPUSpec TPU 规格
type TPUSpec struct {
    Name           string
    Version        string
    ChipsPerPod    int
    HBMPerChip     int      // GB
    TFLOPSPerChip  float64  // BF16
    InterconnectBandwidth float64 // Gbps
    TDP            int      // Watts per chip
}

var TPUCatalog = map[string]TPUSpec{
    "v4": {
        Name:          "Cloud TPU v4",
        Version:       "v4",
        ChipsPerPod:   4096,
        HBMPerChip:    32,
        TFLOPSPerChip: 275,
        InterconnectBandwidth: 4800,
        TDP:           192,
    },
    "v5e": {
        Name:          "Cloud TPU v5e",
        Version:       "v5e",
        ChipsPerPod:   256,
        HBMPerChip:    16,
        TFLOPSPerChip: 197,
        InterconnectBandwidth: 1600,
        TDP:           150,
    },
    "v5p": {
        Name:          "Cloud TPU v5p",
        Version:       "v5p",
        ChipsPerPod:   8960,
        HBMPerChip:    95,
        TFLOPSPerChip: 459,
        InterconnectBandwidth: 4800,
        TDP:           350,
    },
}

// TPUTopology TPU 拓扑配置
type TPUTopology struct {
    Version     string
    Shape       [3]int  // x, y, z 维度
    NumChips    int
    Accelerators int
}

// 常见 TPU 配置
var TPUTopologies = []TPUTopology{
    {Version: "v4", Shape: [3]int{2, 2, 1}, NumChips: 4, Accelerators: 4},
    {Version: "v4", Shape: [3]int{2, 2, 4}, NumChips: 16, Accelerators: 16},
    {Version: "v4", Shape: [3]int{4, 4, 4}, NumChips: 64, Accelerators: 64},
    {Version: "v4", Shape: [3]int{8, 8, 8}, NumChips: 512, Accelerators: 512},
    {Version: "v5e", Shape: [3]int{2, 2, 1}, NumChips: 4, Accelerators: 4},
    {Version: "v5e", Shape: [3]int{4, 4, 1}, NumChips: 16, Accelerators: 16},
    {Version: "v5p", Shape: [3]int{2, 2, 2}, NumChips: 8, Accelerators: 8},
    {Version: "v5p", Shape: [3]int{4, 4, 4}, NumChips: 64, Accelerators: 64},
}

// TPUPodSlice TPU Pod 切片
type TPUPodSlice struct {
    Topology    TPUTopology
    TotalHBMGB  int
    TotalTFLOPS float64
    NumHosts    int
}

func NewTPUPodSlice(topology TPUTopology) *TPUPodSlice {
    spec := TPUCatalog[topology.Version]

    return &TPUPodSlice{
        Topology:    topology,
        TotalHBMGB:  topology.NumChips * spec.HBMPerChip,
        TotalTFLOPS: float64(topology.NumChips) * spec.TFLOPSPerChip,
        NumHosts:    (topology.NumChips + 3) / 4, // 每主机 4 chips
    }
}
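
下面是一个简单的使用示意:基于上文的拓扑配置构造一个 v5p 4x4x4 切片(64 chips),并汇总其 HBM 容量与 BF16 算力,数值直接来自上文目录数据(ExampleTPUPodSlice 为本文新增的示意函数):

package heterogeneous

import "fmt"

// ExampleTPUPodSlice 演示如何根据拓扑配置汇总一个 TPU 切片的资源(示意)
func ExampleTPUPodSlice() {
    // 取 v5p 的 4x4x4 拓扑(64 chips)
    topo := TPUTopology{Version: "v5p", Shape: [3]int{4, 4, 4}, NumChips: 64, Accelerators: 64}
    slice := NewTPUPodSlice(topo)

    fmt.Printf("v5p %dx%dx%d 切片: %d chips, %d GB HBM, %.0f TFLOPS (BF16), %d hosts\n",
        topo.Shape[0], topo.Shape[1], topo.Shape[2],
        topo.NumChips, slice.TotalHBMGB, slice.TotalTFLOPS, slice.NumHosts)
}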

其他加速器

package heterogeneous

// AcceleratorType 加速器类型
type AcceleratorType string

const (
    AccelGPU    AcceleratorType = "gpu"
    AccelTPU    AcceleratorType = "tpu"
    AccelFPGA   AcceleratorType = "fpga"
    AccelASIC   AcceleratorType = "asic"
    AccelNPU    AcceleratorType = "npu"
)

// AcceleratorSpec 通用加速器规格
type AcceleratorSpec struct {
    Type            AcceleratorType
    Vendor          string
    Model           string
    ComputeTFLOPS   float64
    MemoryGB        int
    MemoryBandwidth float64  // GB/s
    TDP             int
    Features        []string
    UseCases        []string
}

// 其他加速器目录
var OtherAccelerators = map[string]AcceleratorSpec{
    // Intel Gaudi
    "gaudi2": {
        Type:            AccelASIC,
        Vendor:          "Intel",
        Model:           "Gaudi 2",
        ComputeTFLOPS:   432,  // BF16
        MemoryGB:        96,
        MemoryBandwidth: 2450,
        TDP:             600,
        Features:        []string{"RoCE v2", "Scale-out"},
        UseCases:        []string{"LLM Training", "LLM Inference"},
    },
    "gaudi3": {
        Type:            AccelASIC,
        Vendor:          "Intel",
        Model:           "Gaudi 3",
        ComputeTFLOPS:   1835, // BF16
        MemoryGB:        128,
        MemoryBandwidth: 3700,
        TDP:             900,
        Features:        []string{"FP8", "Scale-out"},
        UseCases:        []string{"LLM Training", "LLM Inference"},
    },
    // AWS Inferentia/Trainium
    "trainium": {
        Type:            AccelASIC,
        Vendor:          "AWS",
        Model:           "Trainium",
        ComputeTFLOPS:   190,
        MemoryGB:        32,
        MemoryBandwidth: 820,
        TDP:             180,
        Features:        []string{"NeuronLink"},
        UseCases:        []string{"LLM Training"},
    },
    "trainium2": {
        Type:            AccelASIC,
        Vendor:          "AWS",
        Model:           "Trainium2",
        ComputeTFLOPS:   380,
        MemoryGB:        96,
        MemoryBandwidth: 2460,
        TDP:             300,
        Features:        []string{"NeuronLink v2"},
        UseCases:        []string{"LLM Training", "Inference"},
    },
    "inferentia2": {
        Type:            AccelASIC,
        Vendor:          "AWS",
        Model:           "Inferentia2",
        ComputeTFLOPS:   190,
        MemoryGB:        32,
        MemoryBandwidth: 820,
        TDP:             130,
        Features:        []string{"Low latency"},
        UseCases:        []string{"LLM Inference"},
    },
    // AMD
    "mi300x": {
        Type:            AccelGPU,
        Vendor:          "AMD",
        Model:           "Instinct MI300X",
        ComputeTFLOPS:   1307, // FP16
        MemoryGB:        192,
        MemoryBandwidth: 5300,
        TDP:             750,
        Features:        []string{"ROCm", "Infinity Fabric"},
        UseCases:        []string{"LLM Training", "LLM Inference"},
    },
    // Cerebras
    "wse2": {
        Type:            AccelASIC,
        Vendor:          "Cerebras",
        Model:           "WSE-2",
        ComputeTFLOPS:   2000, // Theoretical
        MemoryGB:        40,   // On-chip SRAM
        MemoryBandwidth: 20000, // On-chip
        TDP:             15000,
        Features:        []string{"Wafer-scale", "On-chip memory"},
        UseCases:        []string{"LLM Training"},
    },
    // Groq
    "lpu": {
        Type:            AccelASIC,
        Vendor:          "Groq",
        Model:           "LPU",
        ComputeTFLOPS:   750,
        MemoryGB:        230,  // 片上 SRAM;注意实际约为 230 MB/芯片(非 GB)
        MemoryBandwidth: 80000, // On-chip
        TDP:             300,
        Features:        []string{"Deterministic latency", "SRAM-only"},
        UseCases:        []string{"LLM Inference"},
    },
}

// AcceleratorComparison 加速器对比
type AcceleratorComparison struct {
    Name            string
    TFLOPSPerDollar float64
    TFLOPSPerWatt   float64
    MemoryPerTFLOPS float64
    BestFor         []string
}

func CompareAccelerators() []AcceleratorComparison {
    // 估算价格(每小时云端价格,美元)
    prices := map[string]float64{
        "H100-80GB":   3.50,
        "A100-80GB":   2.50,
        "L40S":        1.20,
        "gaudi2":      1.80,
        "mi300x":      3.00,
        "trainium2":   1.50,
        "inferentia2": 0.75,
    }

    comparisons := make([]AcceleratorComparison, 0)

    // GPU 对比
    for name, spec := range GPUCatalog {
        if price, ok := prices[name]; ok {
            comparisons = append(comparisons, AcceleratorComparison{
                Name:            name,
                TFLOPSPerDollar: spec.FP16TFLOPS / price,
                TFLOPSPerWatt:   spec.FP16TFLOPS / float64(spec.TDP),
                MemoryPerTFLOPS: float64(spec.MemoryGB) / spec.FP16TFLOPS,
                BestFor:         determineBestUse(spec.FP16TFLOPS, spec.MemoryGB),
            })
        }
    }

    return comparisons
}

func determineBestUse(tflops float64, memGB int) []string {
    uses := []string{}

    if tflops > 500 && memGB > 60 {
        uses = append(uses, "Large LLM Training")
    }
    if memGB > 40 {
        uses = append(uses, "Large Model Inference")
    }
    if tflops > 300 {
        uses = append(uses, "High Throughput Inference")
    }

    return uses
}
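
在此基础上,可以按"每美元算力"对对比结果排序,直观查看性价比(示意代码,PrintCostEfficiencyRanking 为本文新增;价格为上文的粗略估算,实际以云厂商报价为准):

package heterogeneous

import (
    "fmt"
    "sort"
)

// PrintCostEfficiencyRanking 按每美元 TFLOPS 对 GPU 做性价比排序(示意)
func PrintCostEfficiencyRanking() {
    comparisons := CompareAccelerators()

    sort.Slice(comparisons, func(i, j int) bool {
        return comparisons[i].TFLOPSPerDollar > comparisons[j].TFLOPSPerDollar
    })

    for _, c := range comparisons {
        fmt.Printf("%-12s %.0f TFLOPS/$  %.2f TFLOPS/W  适用: %v\n",
            c.Name, c.TFLOPSPerDollar, c.TFLOPSPerWatt, c.BestFor)
    }
}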

异构计算资源抽象

统一资源模型

package heterogeneous

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// ComputeDevice 计算设备抽象
type ComputeDevice interface {
    GetType() AcceleratorType
    GetID() string
    GetSpec() *DeviceSpec
    GetStatus() *DeviceStatus
    GetCapabilities() []string
    Allocate(ctx context.Context, req *AllocationRequest) (*Allocation, error)
    Release(allocationID string) error
}

// DeviceSpec 设备规格
type DeviceSpec struct {
    Type             AcceleratorType `json:"type"`
    Vendor           string          `json:"vendor"`
    Model            string          `json:"model"`
    ComputeCapacity  float64         `json:"compute_capacity"`  // TFLOPS
    MemoryCapacity   int64           `json:"memory_capacity"`   // Bytes
    MemoryBandwidth  float64         `json:"memory_bandwidth"`  // GB/s
    Interconnect     string          `json:"interconnect"`
    InterconnectBW   float64         `json:"interconnect_bw"`   // GB/s
    TDP              int             `json:"tdp"`
}

// DeviceStatus 设备状态
type DeviceStatus struct {
    Healthy          bool      `json:"healthy"`
    MemoryUsed       int64     `json:"memory_used"`
    MemoryFree       int64     `json:"memory_free"`
    Utilization      float64   `json:"utilization"`
    Temperature      int       `json:"temperature"`
    PowerUsage       int       `json:"power_usage"`
    Allocations      []string  `json:"allocations"`
    LastUpdated      time.Time `json:"last_updated"`
}

// AllocationRequest 资源分配请求
type AllocationRequest struct {
    RequestID        string            `json:"request_id"`
    DeviceType       AcceleratorType   `json:"device_type"`
    DeviceCount      int               `json:"device_count"`
    MemoryRequired   int64             `json:"memory_required"`
    ComputeRequired  float64           `json:"compute_required"`
    Exclusive        bool              `json:"exclusive"`
    Affinity         *AffinitySpec     `json:"affinity"`
    AntiAffinity     *AntiAffinitySpec `json:"anti_affinity"`
    Timeout          time.Duration     `json:"timeout"`
}

type AffinitySpec struct {
    SameNode    bool     `json:"same_node"`
    SameRack    bool     `json:"same_rack"`
    Interconnect string  `json:"interconnect"` // 要求的互连类型
    DeviceIDs   []string `json:"device_ids"`   // 指定设备
}

type AntiAffinitySpec struct {
    WorkloadIDs []string `json:"workload_ids"` // 避开的工作负载
}

// Allocation 资源分配结果
type Allocation struct {
    ID           string          `json:"id"`
    RequestID    string          `json:"request_id"`
    DeviceIDs    []string        `json:"device_ids"`
    NodeIDs      []string        `json:"node_ids"`
    Status       AllocationStatus `json:"status"`
    CreateTime   time.Time       `json:"create_time"`
    ExpireTime   *time.Time      `json:"expire_time"`
}

type AllocationStatus string

const (
    AllocStatusPending   AllocationStatus = "pending"
    AllocStatusActive    AllocationStatus = "active"
    AllocStatusReleasing AllocationStatus = "releasing"
    AllocStatusReleased  AllocationStatus = "released"
    AllocStatusFailed    AllocationStatus = "failed"
)

// DevicePool 设备池
type DevicePool struct {
    devices     map[string]ComputeDevice
    allocations map[string]*Allocation
    mu          sync.RWMutex
}

func NewDevicePool() *DevicePool {
    return &DevicePool{
        devices:     make(map[string]ComputeDevice),
        allocations: make(map[string]*Allocation),
    }
}

// RegisterDevice 注册设备
func (p *DevicePool) RegisterDevice(device ComputeDevice) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.devices[device.GetID()] = device
}

// GetDevicesByType 按类型获取设备
func (p *DevicePool) GetDevicesByType(deviceType AcceleratorType) []ComputeDevice {
    p.mu.RLock()
    defer p.mu.RUnlock()

    result := make([]ComputeDevice, 0)
    for _, device := range p.devices {
        if device.GetType() == deviceType {
            result = append(result, device)
        }
    }
    return result
}

// GetAvailableDevices 获取可用设备
func (p *DevicePool) GetAvailableDevices(req *AllocationRequest) []ComputeDevice {
    p.mu.RLock()
    defer p.mu.RUnlock()

    result := make([]ComputeDevice, 0)

    for _, device := range p.devices {
        // 检查设备类型
        if device.GetType() != req.DeviceType {
            continue
        }

        // 检查设备健康状态
        status := device.GetStatus()
        if !status.Healthy {
            continue
        }

        // 检查内存是否充足
        if status.MemoryFree < req.MemoryRequired {
            continue
        }

        // 检查是否已被独占
        if req.Exclusive && len(status.Allocations) > 0 {
            continue
        }

        // 检查亲和性
        if req.Affinity != nil && !p.checkAffinity(device, req.Affinity) {
            continue
        }

        result = append(result, device)
    }

    return result
}

func (p *DevicePool) checkAffinity(device ComputeDevice, affinity *AffinitySpec) bool {
    if len(affinity.DeviceIDs) > 0 {
        found := false
        for _, id := range affinity.DeviceIDs {
            if device.GetID() == id {
                found = true
                break
            }
        }
        if !found {
            return false
        }
    }

    if affinity.Interconnect != "" {
        spec := device.GetSpec()
        if spec.Interconnect != affinity.Interconnect {
            return false
        }
    }

    return true
}

// Allocate 分配设备
func (p *DevicePool) Allocate(ctx context.Context, req *AllocationRequest) (*Allocation, error) {
    available := p.GetAvailableDevices(req)

    if len(available) < req.DeviceCount {
        return nil, fmt.Errorf("not enough devices: need %d, available %d",
            req.DeviceCount, len(available))
    }

    // 选择最优设备组合
    selected := p.selectDevices(available, req)

    // 执行分配
    allocation := &Allocation{
        ID:         fmt.Sprintf("alloc-%d", time.Now().UnixNano()),
        RequestID:  req.RequestID,
        DeviceIDs:  make([]string, 0, len(selected)),
        NodeIDs:    make([]string, 0),
        Status:     AllocStatusPending,
        CreateTime: time.Now(),
    }

    for _, device := range selected {
        _, err := device.Allocate(ctx, req)
        if err != nil {
            // 回滚已分配的设备
            for _, id := range allocation.DeviceIDs {
                if d, ok := p.devices[id]; ok {
                    d.Release(allocation.ID)
                }
            }
            return nil, err
        }
        allocation.DeviceIDs = append(allocation.DeviceIDs, device.GetID())
    }

    allocation.Status = AllocStatusActive

    p.mu.Lock()
    p.allocations[allocation.ID] = allocation
    p.mu.Unlock()

    return allocation, nil
}

func (p *DevicePool) selectDevices(available []ComputeDevice, req *AllocationRequest) []ComputeDevice {
    // 简单的选择策略:按内存排序,选择内存最充足的
    // 实际实现需要考虑拓扑、互连等因素

    // 排序
    for i := 0; i < len(available)-1; i++ {
        for j := i + 1; j < len(available); j++ {
            if available[i].GetStatus().MemoryFree < available[j].GetStatus().MemoryFree {
                available[i], available[j] = available[j], available[i]
            }
        }
    }

    if req.DeviceCount <= len(available) {
        return available[:req.DeviceCount]
    }
    return available
}

// Release 释放分配
func (p *DevicePool) Release(allocationID string) error {
    p.mu.Lock()
    allocation, exists := p.allocations[allocationID]
    if !exists {
        p.mu.Unlock()
        return fmt.Errorf("allocation not found: %s", allocationID)
    }
    allocation.Status = AllocStatusReleasing
    p.mu.Unlock()

    // 释放每个设备
    for _, deviceID := range allocation.DeviceIDs {
        device, ok := p.devices[deviceID]
        if ok {
            device.Release(allocationID)
        }
    }

    p.mu.Lock()
    allocation.Status = AllocStatusReleased
    delete(p.allocations, allocationID)
    p.mu.Unlock()

    return nil
}
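
为了让上面的抽象更直观,下面给出一个最小化的 mock 设备实现和一次完整的分配/释放流程示意。注意这只是演示接口用法的草图:mockDevice、newMockGPU、ExampleDevicePool 等名称均为本文临时引入,内存记账与并发控制都做了简化,并非生产实现:

package heterogeneous

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// mockDevice 一个只在内存中记账的示意设备,实现 ComputeDevice 接口
type mockDevice struct {
    id     string
    spec   DeviceSpec
    mu     sync.Mutex
    status DeviceStatus
}

func newMockGPU(id string, memGB int64) *mockDevice {
    return &mockDevice{
        id: id,
        spec: DeviceSpec{
            Type:            AccelGPU,
            Vendor:          "NVIDIA",
            Model:           "NVIDIA A100",
            ComputeCapacity: 312,
            MemoryCapacity:  memGB << 30,
            Interconnect:    "NVLink",
        },
        status: DeviceStatus{
            Healthy:     true,
            MemoryFree:  memGB << 30,
            LastUpdated: time.Now(),
        },
    }
}

func (d *mockDevice) GetType() AcceleratorType  { return d.spec.Type }
func (d *mockDevice) GetID() string             { return d.id }
func (d *mockDevice) GetSpec() *DeviceSpec      { return &d.spec }
func (d *mockDevice) GetCapabilities() []string { return []string{"tensor_core", "nvlink"} }

func (d *mockDevice) GetStatus() *DeviceStatus {
    d.mu.Lock()
    defer d.mu.Unlock()
    s := d.status
    return &s
}

func (d *mockDevice) Allocate(ctx context.Context, req *AllocationRequest) (*Allocation, error) {
    d.mu.Lock()
    defer d.mu.Unlock()
    if d.status.MemoryFree < req.MemoryRequired {
        return nil, fmt.Errorf("device %s: insufficient memory", d.id)
    }
    d.status.MemoryFree -= req.MemoryRequired
    d.status.MemoryUsed += req.MemoryRequired
    d.status.Allocations = append(d.status.Allocations, req.RequestID)
    return &Allocation{RequestID: req.RequestID, DeviceIDs: []string{d.id}, Status: AllocStatusActive}, nil
}

func (d *mockDevice) Release(allocationID string) error {
    // 简化:不做精确的内存回退,直接重置记账(仅示意)
    d.mu.Lock()
    defer d.mu.Unlock()
    d.status.MemoryUsed = 0
    d.status.MemoryFree = d.spec.MemoryCapacity
    d.status.Allocations = nil
    return nil
}

// ExampleDevicePool 演示注册设备并完成一次分配/释放(示意)
func ExampleDevicePool() {
    pool := NewDevicePool()
    for i := 0; i < 4; i++ {
        pool.RegisterDevice(newMockGPU(fmt.Sprintf("GPU-%d", i), 80))
    }

    alloc, err := pool.Allocate(context.Background(), &AllocationRequest{
        RequestID:      "req-001",
        DeviceType:     AccelGPU,
        DeviceCount:    2,
        MemoryRequired: 40 << 30, // 40 GB
        Exclusive:      true,
    })
    if err != nil {
        fmt.Println("allocate failed:", err)
        return
    }
    fmt.Println("allocated devices:", alloc.DeviceIDs)
    _ = pool.Release(alloc.ID)
}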

异构计算调度策略

任务-设备匹配

package heterogeneous

import (
    "fmt"
    "math"
    "sort"
)

// WorkloadProfile 工作负载特征
type WorkloadProfile struct {
    Type            WorkloadType      `json:"type"`
    ModelSize       int64             `json:"model_size"`       // 参数量
    BatchSize       int               `json:"batch_size"`
    SeqLength       int               `json:"seq_length"`
    ComputeIntensity float64          `json:"compute_intensity"` // FLOPS/Byte
    MemoryBound     bool              `json:"memory_bound"`
    Requirements    []string          `json:"requirements"`      // 特殊需求
    SLO             *SLORequirements  `json:"slo"`
}

type WorkloadType string

const (
    WorkloadTraining     WorkloadType = "training"
    WorkloadInference    WorkloadType = "inference"
    WorkloadFineTuning   WorkloadType = "fine_tuning"
    WorkloadEmbedding    WorkloadType = "embedding"
)

type SLORequirements struct {
    MaxLatencyMs    int     `json:"max_latency_ms"`
    MinThroughput   float64 `json:"min_throughput"`   // tokens/s
    MaxCostPerHour  float64 `json:"max_cost_per_hour"`
}

// DeviceScore 设备评分
type DeviceScore struct {
    Device      ComputeDevice
    Score       float64
    Reasons     []string
    EstLatency  float64
    EstThroughput float64
    EstCost     float64
}

// WorkloadMatcher 工作负载匹配器
type WorkloadMatcher struct {
    devicePool  *DevicePool
    costModel   *CostModel
}

func NewWorkloadMatcher(pool *DevicePool) *WorkloadMatcher {
    return &WorkloadMatcher{
        devicePool: pool,
        costModel:  NewCostModel(),
    }
}

// Match 匹配最优设备
func (m *WorkloadMatcher) Match(workload *WorkloadProfile) []DeviceScore {
    scores := make([]DeviceScore, 0)

    // 获取所有设备类型
    deviceTypes := []AcceleratorType{AccelGPU, AccelTPU, AccelASIC}

    for _, deviceType := range deviceTypes {
        devices := m.devicePool.GetDevicesByType(deviceType)

        for _, device := range devices {
            score := m.scoreDevice(device, workload)
            if score.Score > 0 {
                scores = append(scores, score)
            }
        }
    }

    // 按分数排序
    sort.Slice(scores, func(i, j int) bool {
        return scores[i].Score > scores[j].Score
    })

    return scores
}

func (m *WorkloadMatcher) scoreDevice(device ComputeDevice, workload *WorkloadProfile) DeviceScore {
    score := DeviceScore{
        Device:  device,
        Reasons: make([]string, 0),
    }

    spec := device.GetSpec()
    status := device.GetStatus()

    // 1. 基础可行性检查
    if !status.Healthy {
        return score
    }

    memRequired := m.estimateMemoryRequired(workload)
    if status.MemoryFree < memRequired {
        return score
    }

    // 2. 计算能力评分
    computeScore := m.scoreComputeCapability(spec, workload)
    score.Reasons = append(score.Reasons,
        fmt.Sprintf("Compute score: %.2f", computeScore))

    // 3. 内存带宽评分
    memoryScore := m.scoreMemoryBandwidth(spec, workload)
    score.Reasons = append(score.Reasons,
        fmt.Sprintf("Memory bandwidth score: %.2f", memoryScore))

    // 4. 特性匹配评分
    featureScore := m.scoreFeatures(device, workload)
    score.Reasons = append(score.Reasons,
        fmt.Sprintf("Feature score: %.2f", featureScore))

    // 5. 成本效益评分
    costScore := m.scoreCostEfficiency(device, workload)
    score.Reasons = append(score.Reasons,
        fmt.Sprintf("Cost efficiency score: %.2f", costScore))

    // 6. 估算性能指标
    score.EstLatency = m.estimateLatency(spec, workload)
    score.EstThroughput = m.estimateThroughput(spec, workload)
    score.EstCost = m.costModel.EstimateCost(device, workload)

    // 7. SLO 检查
    sloScore := 1.0
    if workload.SLO != nil {
        if score.EstLatency > float64(workload.SLO.MaxLatencyMs) {
            sloScore = 0
            score.Reasons = append(score.Reasons, "Fails latency SLO")
        }
        if score.EstThroughput < workload.SLO.MinThroughput {
            sloScore *= 0.5
            score.Reasons = append(score.Reasons, "Below throughput SLO")
        }
        if score.EstCost > workload.SLO.MaxCostPerHour {
            sloScore *= 0.5
            score.Reasons = append(score.Reasons, "Exceeds cost SLO")
        }
    }

    // 综合评分
    score.Score = (computeScore*0.3 + memoryScore*0.2 +
        featureScore*0.2 + costScore*0.3) * sloScore

    return score
}

func (m *WorkloadMatcher) estimateMemoryRequired(workload *WorkloadProfile) int64 {
    // 参数内存(FP16)
    paramMem := workload.ModelSize * 2

    // KV Cache
    kvMem := int64(0)
    if workload.Type == WorkloadInference {
        // 简化估算
        kvMem = int64(float64(paramMem) * 0.2 * float64(workload.SeqLength) / 1024)
    }

    // 激活内存
    actMem := int64(float64(paramMem) * 0.3 * float64(workload.BatchSize))

    // 优化器状态(训练时)
    optMem := int64(0)
    if workload.Type == WorkloadTraining {
        optMem = paramMem * 6 // 梯度 + Adam 优化器状态(混合精度下的粗略估算)
    }

    return paramMem + kvMem + actMem + optMem
}

func (m *WorkloadMatcher) scoreComputeCapability(spec *DeviceSpec, workload *WorkloadProfile) float64 {
    requiredTFLOPS := m.estimateRequiredTFLOPS(workload)

    // 计算能力是否满足需求
    if spec.ComputeCapacity >= requiredTFLOPS {
        // 过剩不是越多越好,适度即可
        ratio := spec.ComputeCapacity / requiredTFLOPS
        if ratio > 4 {
            return 0.8 // 严重过剩
        }
        return 1.0 - (ratio-1)*0.1
    }

    // 不满足需求
    return spec.ComputeCapacity / requiredTFLOPS * 0.5
}

func (m *WorkloadMatcher) scoreMemoryBandwidth(spec *DeviceSpec, workload *WorkloadProfile) float64 {
    // 内存带宽对内存密集型工作负载更重要
    if workload.MemoryBound {
        // 需要更高的带宽
        requiredBW := float64(workload.ModelSize) * 2 / 1e9 * 100 // 简化估算
        if spec.MemoryBandwidth >= requiredBW {
            return 1.0
        }
        return spec.MemoryBandwidth / requiredBW
    }

    return 0.8 // 计算密集型对带宽要求较低
}

func (m *WorkloadMatcher) scoreFeatures(device ComputeDevice, workload *WorkloadProfile) float64 {
    caps := device.GetCapabilities()
    capSet := make(map[string]bool)
    for _, c := range caps {
        capSet[c] = true
    }

    matchCount := 0
    for _, req := range workload.Requirements {
        if capSet[req] {
            matchCount++
        }
    }

    if len(workload.Requirements) == 0 {
        return 1.0
    }

    return float64(matchCount) / float64(len(workload.Requirements))
}

func (m *WorkloadMatcher) scoreCostEfficiency(device ComputeDevice, workload *WorkloadProfile) float64 {
    cost := m.costModel.EstimateCost(device, workload)
    throughput := m.estimateThroughput(device.GetSpec(), workload)

    if throughput == 0 {
        return 0
    }

    // 每美元吞吐量
    efficiency := throughput / cost

    // 归一化(基于经验值)
    baselineEfficiency := 100.0 // tokens/$ baseline
    return math.Min(efficiency/baselineEfficiency, 1.5)
}

func (m *WorkloadMatcher) estimateRequiredTFLOPS(workload *WorkloadProfile) float64 {
    // 简化估算:每个 token 的前向计算约需 2 * params FLOPs,
    // 这里按 batch_size 折算为 TFLOPS 量级(未考虑 seq_len 与目标时延)
    return float64(workload.ModelSize) * 2 * float64(workload.BatchSize) / 1e12
}

func (m *WorkloadMatcher) estimateLatency(spec *DeviceSpec, workload *WorkloadProfile) float64 {
    // 简化的延迟估算
    computeTime := float64(workload.ModelSize*2) / (spec.ComputeCapacity * 1e12) * 1000 // ms
    memoryTime := float64(workload.ModelSize*2) / (spec.MemoryBandwidth * 1e9) * 1000   // ms

    return math.Max(computeTime, memoryTime) * float64(workload.SeqLength)
}

func (m *WorkloadMatcher) estimateThroughput(spec *DeviceSpec, workload *WorkloadProfile) float64 {
    latency := m.estimateLatency(spec, workload)
    if latency == 0 {
        return 0
    }
    return float64(workload.BatchSize*workload.SeqLength) / latency * 1000 // tokens/s
}

// CostModel 成本模型
type CostModel struct {
    prices map[string]float64 // 型号(DeviceSpec.Model)-> 每小时价格(美元)
}

func NewCostModel() *CostModel {
    return &CostModel{
        // 键与 DeviceSpec.Model 对应,便于 EstimateCost 直接查价
        prices: map[string]float64{
            "NVIDIA H100":  3.50,
            "NVIDIA A100":  2.50,
            "NVIDIA L40S":  1.20,
            "Cloud TPU v4": 3.00,
            "Gaudi 2":      1.80,
        },
    }
}

func (c *CostModel) EstimateCost(device ComputeDevice, workload *WorkloadProfile) float64 {
    spec := device.GetSpec()
    basePrice := c.prices[spec.Model]
    if basePrice == 0 {
        basePrice = 2.0 // 默认价格
    }
    return basePrice
}
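
承接上面的设备池示例,可以为一个推理工作负载构造 WorkloadProfile,交给 WorkloadMatcher 打分。下面是一个示意(ExampleWorkloadMatch 为本文新增;13B 模型、SLO 等均为假设值,分数取决于上文的简化估算公式):

package heterogeneous

import "fmt"

// ExampleWorkloadMatch 演示对推理负载的设备打分流程(示意)
func ExampleWorkloadMatch(pool *DevicePool) {
    matcher := NewWorkloadMatcher(pool)

    workload := &WorkloadProfile{
        Type:        WorkloadInference,
        ModelSize:   13e9, // 假设 13B 参数模型
        BatchSize:   8,
        SeqLength:   2048,
        MemoryBound: true,
        SLO: &SLORequirements{
            MaxLatencyMs:   200,
            MinThroughput:  500,
            MaxCostPerHour: 3.0,
        },
    }

    for i, s := range matcher.Match(workload) {
        fmt.Printf("#%d %s score=%.2f est_latency=%.1fms est_cost=$%.2f/h\n",
            i+1, s.Device.GetID(), s.Score, s.EstLatency, s.EstCost)
        for _, r := range s.Reasons {
            fmt.Println("   -", r)
        }
    }
}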

互连拓扑管理

设备拓扑发现

package heterogeneous

import (
    "fmt"
)

// TopologyType 拓扑类型
type TopologyType string

const (
    TopoNVLink   TopologyType = "nvlink"
    TopoPCIe     TopologyType = "pcie"
    TopoNVSwitch TopologyType = "nvswitch"
    TopoInfiniBand TopologyType = "infiniband"
    TopoEthernet TopologyType = "ethernet"
)

// DeviceTopology 设备拓扑
type DeviceTopology struct {
    Nodes       map[string]*TopologyNode
    Links       []*TopologyLink
    Switches    []*TopologySwitch
}

type TopologyNode struct {
    DeviceID    string
    DeviceType  AcceleratorType
    NodeID      string       // 物理节点 ID
    PCIeBus     string       // PCIe 总线地址
    NUMANode    int          // NUMA 节点
    Links       []string     // 连接的设备 ID
}

type TopologyLink struct {
    Source      string
    Target      string
    Type        TopologyType
    Bandwidth   float64      // GB/s
    Latency     float64      // us
    Hops        int
}

type TopologySwitch struct {
    ID          string
    Type        TopologyType
    Ports       int
    Bandwidth   float64
    ConnectedDevices []string
}

// TopologyManager 拓扑管理器
type TopologyManager struct {
    topology *DeviceTopology
}

func NewTopologyManager() *TopologyManager {
    return &TopologyManager{
        topology: &DeviceTopology{
            Nodes:    make(map[string]*TopologyNode),
            Links:    make([]*TopologyLink, 0),
            Switches: make([]*TopologySwitch, 0),
        },
    }
}

// DiscoverTopology 发现拓扑
func (tm *TopologyManager) DiscoverTopology() error {
    // 发现 NVIDIA GPU 拓扑
    if err := tm.discoverNVIDIATopology(); err != nil {
        return err
    }

    // 发现网络拓扑
    if err := tm.discoverNetworkTopology(); err != nil {
        return err
    }

    return nil
}

func (tm *TopologyManager) discoverNVIDIATopology() error {
    // 实际实现会调用 nvidia-smi 或 NVML 库
    // 这里提供示例拓扑

    // 8-GPU DGX 系统示例
    for i := 0; i < 8; i++ {
        node := &TopologyNode{
            DeviceID:   fmt.Sprintf("GPU-%d", i),
            DeviceType: AccelGPU,
            NodeID:     "dgx-01",
            PCIeBus:    fmt.Sprintf("0000:%02x:00.0", i*16),
            NUMANode:   i / 4,
            Links:      make([]string, 0),
        }
        tm.topology.Nodes[node.DeviceID] = node
    }

    // NVLink 连接(简化的全连接拓扑)
    for i := 0; i < 8; i++ {
        for j := i + 1; j < 8; j++ {
            // 简化假设:节点内 GPU 两两通过 NVLink 直连
            linkType := TopoNVLink
            bandwidth := 600.0 // A100 NVLink 聚合带宽(GB/s)
            latency := 0.5     // us

            if i/4 != j/4 {
                // 跨 NUMA 需要经过 NVSwitch
                bandwidth = 450.0
                latency = 1.0
            }

            link := &TopologyLink{
                Source:    fmt.Sprintf("GPU-%d", i),
                Target:    fmt.Sprintf("GPU-%d", j),
                Type:      linkType,
                Bandwidth: bandwidth,
                Latency:   latency,
                Hops:      1,
            }
            tm.topology.Links = append(tm.topology.Links, link)

            // 更新节点连接
            tm.topology.Nodes[fmt.Sprintf("GPU-%d", i)].Links = append(
                tm.topology.Nodes[fmt.Sprintf("GPU-%d", i)].Links,
                fmt.Sprintf("GPU-%d", j),
            )
            tm.topology.Nodes[fmt.Sprintf("GPU-%d", j)].Links = append(
                tm.topology.Nodes[fmt.Sprintf("GPU-%d", j)].Links,
                fmt.Sprintf("GPU-%d", i),
            )
        }
    }

    return nil
}

func (tm *TopologyManager) discoverNetworkTopology() error {
    // 发现 InfiniBand 或高速以太网拓扑
    return nil
}

// GetLink 获取两个设备间的链路
func (tm *TopologyManager) GetLink(deviceA, deviceB string) *TopologyLink {
    for _, link := range tm.topology.Links {
        if (link.Source == deviceA && link.Target == deviceB) ||
           (link.Source == deviceB && link.Target == deviceA) {
            return link
        }
    }
    return nil
}

// GetBandwidth 获取设备间带宽
func (tm *TopologyManager) GetBandwidth(deviceA, deviceB string) float64 {
    link := tm.GetLink(deviceA, deviceB)
    if link != nil {
        return link.Bandwidth
    }
    return 0
}

// FindOptimalDeviceGroup 找到最优设备组(最大化互连带宽)
func (tm *TopologyManager) FindOptimalDeviceGroup(
    candidates []string,
    count int,
) []string {

    if len(candidates) <= count {
        return candidates
    }

    // 使用贪心算法选择最优组合
    // 优先选择 NVLink 连接的设备

    result := make([]string, 0, count)
    remaining := make(map[string]bool)
    for _, c := range candidates {
        remaining[c] = true
    }

    // 选择第一个设备
    result = append(result, candidates[0])
    delete(remaining, candidates[0])

    // 贪心选择剩余设备
    for len(result) < count && len(remaining) > 0 {
        var bestDevice string
        bestScore := -1.0

        for device := range remaining {
            score := tm.calculateGroupScore(append(result, device))
            if score > bestScore {
                bestScore = score
                bestDevice = device
            }
        }

        if bestDevice != "" {
            result = append(result, bestDevice)
            delete(remaining, bestDevice)
        } else {
            break
        }
    }

    return result
}

func (tm *TopologyManager) calculateGroupScore(devices []string) float64 {
    if len(devices) < 2 {
        return 0
    }

    totalBW := 0.0
    for i := 0; i < len(devices); i++ {
        for j := i + 1; j < len(devices); j++ {
            bw := tm.GetBandwidth(devices[i], devices[j])
            totalBW += bw
        }
    }

    // 平均带宽
    numPairs := float64(len(devices) * (len(devices) - 1) / 2)
    return totalBW / numPairs
}

// IsNVLinkConnected 检查是否 NVLink 连接
func (tm *TopologyManager) IsNVLinkConnected(deviceA, deviceB string) bool {
    link := tm.GetLink(deviceA, deviceB)
    return link != nil && link.Type == TopoNVLink
}

// GetNUMANode 获取设备的 NUMA 节点
func (tm *TopologyManager) GetNUMANode(deviceID string) int {
    node := tm.topology.Nodes[deviceID]
    if node != nil {
        return node.NUMANode
    }
    return -1
}
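
下面演示拓扑管理器的基本用法:先调用 DiscoverTopology 构建(示例)拓扑,再从 8 张 GPU 中挑选互连带宽最优的 4 卡组合。注意这里的拓扑来自上文 discoverNVIDIATopology 的硬编码示例,并非真实探测结果(ExampleTopologyGrouping 为本文新增的示意函数):

package heterogeneous

import "fmt"

// ExampleTopologyGrouping 演示拓扑发现与最优设备组选择(示意)
func ExampleTopologyGrouping() {
    tm := NewTopologyManager()
    if err := tm.DiscoverTopology(); err != nil {
        fmt.Println("discover topology failed:", err)
        return
    }

    candidates := make([]string, 0, 8)
    for i := 0; i < 8; i++ {
        candidates = append(candidates, fmt.Sprintf("GPU-%d", i))
    }

    group := tm.FindOptimalDeviceGroup(candidates, 4)
    fmt.Println("selected group:", group)
    fmt.Printf("GPU-0 <-> GPU-1 bandwidth: %.0f GB/s, NVLink: %v\n",
        tm.GetBandwidth("GPU-0", "GPU-1"), tm.IsNVLinkConnected("GPU-0", "GPU-1"))
}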

小结

本章介绍了异构计算的基础概念和核心组件:

  1. 加速器对比:GPU、TPU、FPGA、ASIC 等各类加速器的特性与适用场景
  2. 资源抽象:统一的计算设备模型,屏蔽底层差异
  3. 调度策略:工作负载特征分析与设备匹配算法
  4. 拓扑管理:设备互连拓扑发现与优化分配

异构计算是构建高效 AI 基础设施的关键。通过合理的资源抽象和调度策略,可以充分发挥各类加速器的优势,实现最优的性价比。

下一章我们将深入探讨 GPU 虚拟化与共享,讲解如何在多租户环境中高效共享 GPU 资源。
