异构计算概述
什么是异构计算
异构计算(Heterogeneous Computing)是指在一个系统中同时使用多种不同类型处理器的计算方式。在 AI 基础设施中,异构计算通常涉及 CPU、GPU、TPU、FPGA、NPU 等多种计算设备的协同工作。
异构计算的必要性
传统同构计算 vs 异构计算:
同构计算(仅 CPU):
┌─────────────────────────────────────────────────────────────┐
│ CPU 集群 │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │ CPU │ │ CPU │ │ CPU │ │ CPU │ 所有任务使用相同架构 │
│ └─────┘ └─────┘ └─────┘ └─────┘ │
│ │
│ 问题: │
│ - 并行计算能力有限 │
│ - 能效比低 │
│ - 无法满足 AI 训练/推理需求 │
└─────────────────────────────────────────────────────────────┘
异构计算:
┌─────────────────────────────────────────────────────────────┐
│ 异构计算集群 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ CPU │ │ GPU │ │ TPU │ │ FPGA │ │
│ │ 通用计算 │ │ 并行计算 │ │ 张量计算 │ │ 定制加速 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 优势: │
│ - 任务匹配最优硬件 │
│ - 更高的计算效率 │
│ - 更好的能效比 │
│ - 灵活的资源配置 │
└─────────────────────────────────────────────────────────────┘
主流 AI 加速器对比
GPU (Graphics Processing Unit)
GPU 是目前最主流的 AI 加速器,以 NVIDIA 为代表:
package heterogeneous
import (
"fmt"
)
// GPUSpec GPU 规格定义
type GPUSpec struct {
Name string
Architecture string
ComputeCapability string
CUDACores int
TensorCores int
MemoryGB int
MemoryBandwidth float64 // GB/s
TDP int // Watts
FP16TFLOPS float64
FP32TFLOPS float64
INT8TOPS float64
InterconnectType string // NVLink, PCIe
InterconnectBandwidth float64 // GB/s
}
// 主流 GPU 规格对比
var GPUCatalog = map[string]GPUSpec{
"A100-80GB": {
Name: "NVIDIA A100",
Architecture: "Ampere",
ComputeCapability: "8.0",
CUDACores: 6912,
TensorCores: 432,
MemoryGB: 80,
MemoryBandwidth: 2039,
TDP: 400,
FP16TFLOPS: 312,
FP32TFLOPS: 19.5,
INT8TOPS: 624,
InterconnectType: "NVLink",
InterconnectBandwidth: 600,
},
"H100-80GB": {
Name: "NVIDIA H100",
Architecture: "Hopper",
ComputeCapability: "9.0",
CUDACores: 16896,
TensorCores: 528,
MemoryGB: 80,
MemoryBandwidth: 3350,
TDP: 700,
FP16TFLOPS: 989,
FP32TFLOPS: 67,
INT8TOPS: 1979,
InterconnectType: "NVLink",
InterconnectBandwidth: 900,
},
"H200-141GB": {
Name: "NVIDIA H200",
Architecture: "Hopper",
ComputeCapability: "9.0",
CUDACores: 16896,
TensorCores: 528,
MemoryGB: 141,
MemoryBandwidth: 4800,
TDP: 700,
FP16TFLOPS: 989,
FP32TFLOPS: 67,
INT8TOPS: 1979,
InterconnectType: "NVLink",
InterconnectBandwidth: 900,
},
"L40S": {
Name: "NVIDIA L40S",
Architecture: "Ada Lovelace",
ComputeCapability: "8.9",
CUDACores: 18176,
TensorCores: 568,
MemoryGB: 48,
MemoryBandwidth: 864,
TDP: 350,
FP16TFLOPS: 362,
FP32TFLOPS: 91.6,
INT8TOPS: 724,
InterconnectType: "PCIe",
InterconnectBandwidth: 64,
},
"RTX-4090": {
Name: "NVIDIA RTX 4090",
Architecture: "Ada Lovelace",
ComputeCapability: "8.9",
CUDACores: 16384,
TensorCores: 512,
MemoryGB: 24,
MemoryBandwidth: 1008,
TDP: 450,
FP16TFLOPS: 330,
FP32TFLOPS: 82.6,
INT8TOPS: 660,
InterconnectType: "PCIe",
InterconnectBandwidth: 64,
},
}
// GPUCapability GPU 能力枚举
type GPUCapability string
const (
CapTensorCore GPUCapability = "tensor_core" // Tensor Core 支持
CapFP8 GPUCapability = "fp8" // FP8 支持
CapNVLink GPUCapability = "nvlink" // NVLink 互连
CapMIG GPUCapability = "mig" // 多实例 GPU
CapTransformerEngine GPUCapability = "transformer_engine"
)
// GetGPUCapabilities 获取 GPU 能力
func GetGPUCapabilities(spec *GPUSpec) []GPUCapability {
caps := []GPUCapability{}
if spec.TensorCores > 0 {
caps = append(caps, CapTensorCore)
}
if spec.Architecture == "Hopper" {
caps = append(caps, CapFP8, CapTransformerEngine)
}
if spec.InterconnectType == "NVLink" {
caps = append(caps, CapNVLink)
}
if spec.Architecture == "Ampere" || spec.Architecture == "Hopper" {
caps = append(caps, CapMIG)
}
return caps
}
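// 用法示意(示例函数,非上文 API 的一部分):按上面的规则,
// H100 会同时具备 tensor_core、fp8、transformer_engine、nvlink、mig 五项能力
func exampleH100Capabilities() []GPUCapability {
    spec := GPUCatalog["H100-80GB"]
    return GetGPUCapabilities(&spec)
}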
// CalculateModelFit 计算模型是否能在 GPU 上运行
func CalculateModelFit(spec *GPUSpec, modelParams int64, batchSize int, seqLen int) *ModelFitResult {
// 估算模型内存需求(FP16)
// 参数内存 = params * 2 bytes (FP16)
paramMemGB := float64(modelParams*2) / 1e9
// KV Cache 内存 = 2 * layers * hidden * batch * seq * 2 bytes
// 简化估算:每 1K tokens 约占参数内存的 15%(经验范围 10%-20%)
kvCacheGB := paramMemGB * 0.15 * float64(seqLen) / 1024
// 激活内存(峰值)
activationGB := paramMemGB * 0.3 * float64(batchSize)
totalRequired := paramMemGB + kvCacheGB + activationGB
available := float64(spec.MemoryGB) * 0.9 // 预留 10% 给碎片与运行时开销
return &ModelFitResult{
Fits: totalRequired <= available,
RequiredMemGB: totalRequired,
AvailableMemGB: available,
ParamMemGB: paramMemGB,
KVCacheMemGB: kvCacheGB,
ActivationMemGB: activationGB,
UtilizationPct: (totalRequired / available) * 100,
}
}
type ModelFitResult struct {
Fits bool
RequiredMemGB float64
AvailableMemGB float64
ParamMemGB float64
KVCacheMemGB float64
ActivationMemGB float64
UtilizationPct float64
}
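下面给出一个简单的用法示意(数值来自上述粗略公式,仅供参考):检查 70B 参数模型能否以 FP16 放进单张 A100-80GB。按该估算,仅参数就约需 140 GB,加上 KV Cache 与激活后远超单卡可用显存,因此需要张量并行、流水并行或量化等手段。
// 用法示意:70B 模型 vs 单张 A100-80GB(示例函数)
func exampleModelFitA100() {
    spec := GPUCatalog["A100-80GB"]
    result := CalculateModelFit(&spec, 70_000_000_000, 1, 4096)
    // 按上述公式:参数约 140 GB + KV Cache 约 84 GB + 激活约 42 GB ≈ 266 GB,
    // 远超 72 GB 可用显存,Fits 为 false
    fmt.Printf("fits=%v required=%.1fGB available=%.1fGB util=%.0f%%\n",
        result.Fits, result.RequiredMemGB, result.AvailableMemGB, result.UtilizationPct)
}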
TPU (Tensor Processing Unit)
Google 的 TPU 专为机器学习设计:
package heterogeneous
// TPUSpec TPU 规格
type TPUSpec struct {
Name string
Version string
ChipsPerPod int
HBMPerChip int // GB
TFLOPSPerChip float64 // BF16
InterconnectBandwidth float64 // Gbps
TDP int // Watts per chip
}
var TPUCatalog = map[string]TPUSpec{
"v4": {
Name: "Cloud TPU v4",
Version: "v4",
ChipsPerPod: 4096,
HBMPerChip: 32,
TFLOPSPerChip: 275,
InterconnectBandwidth: 4800,
TDP: 192,
},
"v5e": {
Name: "Cloud TPU v5e",
Version: "v5e",
ChipsPerPod: 256,
HBMPerChip: 16,
TFLOPSPerChip: 197,
InterconnectBandwidth: 1600,
TDP: 150,
},
"v5p": {
Name: "Cloud TPU v5p",
Version: "v5p",
ChipsPerPod: 8960,
HBMPerChip: 95,
TFLOPSPerChip: 459,
InterconnectBandwidth: 4800,
TDP: 350,
},
}
// TPUTopology TPU 拓扑配置
type TPUTopology struct {
Version string
Shape [3]int // x, y, z 维度
NumChips int
Accelerators int
}
// 常见 TPU 配置
var TPUTopologies = []TPUTopology{
{Version: "v4", Shape: [3]int{2, 2, 1}, NumChips: 4, Accelerators: 4},
{Version: "v4", Shape: [3]int{2, 2, 4}, NumChips: 16, Accelerators: 16},
{Version: "v4", Shape: [3]int{4, 4, 4}, NumChips: 64, Accelerators: 64},
{Version: "v4", Shape: [3]int{8, 8, 8}, NumChips: 512, Accelerators: 512},
{Version: "v5e", Shape: [3]int{2, 2, 1}, NumChips: 4, Accelerators: 4},
{Version: "v5e", Shape: [3]int{4, 4, 1}, NumChips: 16, Accelerators: 16},
{Version: "v5p", Shape: [3]int{2, 2, 2}, NumChips: 8, Accelerators: 8},
{Version: "v5p", Shape: [3]int{4, 4, 4}, NumChips: 64, Accelerators: 64},
}
// TPUPodSlice TPU Pod 切片
type TPUPodSlice struct {
Topology TPUTopology
TotalHBMGB int
TotalTFLOPS float64
NumHosts int
}
func NewTPUPodSlice(topology TPUTopology) *TPUPodSlice {
spec := TPUCatalog[topology.Version]
return &TPUPodSlice{
Topology: topology,
TotalHBMGB: topology.NumChips * spec.HBMPerChip,
TotalTFLOPS: float64(topology.NumChips) * spec.TFLOPSPerChip,
NumHosts: (topology.NumChips + 3) / 4, // 每主机 4 chips
}
}
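一个简单的用法示意(按上文目录数值粗略推算,示例函数):构造 v5p 4x4x4 的 Pod 切片并查看聚合资源。
// 用法示意:v5p 4x4x4 切片(64 chips)的聚合资源
func exampleV5pSlice() *TPUPodSlice {
    topo := TPUTopology{Version: "v5p", Shape: [3]int{4, 4, 4}, NumChips: 64, Accelerators: 64}
    // 64 * 95 GB = 6080 GB HBM,64 * 459 ≈ 29376 TFLOPS(BF16),(64+3)/4 = 16 台主机
    return NewTPUPodSlice(topo)
}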
其他加速器
package heterogeneous
// AcceleratorType 加速器类型
type AcceleratorType string
const (
AccelGPU AcceleratorType = "gpu"
AccelTPU AcceleratorType = "tpu"
AccelFPGA AcceleratorType = "fpga"
AccelASIC AcceleratorType = "asic"
AccelNPU AcceleratorType = "npu"
)
// AcceleratorSpec 通用加速器规格
type AcceleratorSpec struct {
Type AcceleratorType
Vendor string
Model string
ComputeTFLOPS float64
MemoryGB int
MemoryBandwidth float64 // GB/s
TDP int
Features []string
UseCases []string
}
// 其他加速器目录
var OtherAccelerators = map[string]AcceleratorSpec{
// Intel Gaudi
"gaudi2": {
Type: AccelASIC,
Vendor: "Intel",
Model: "Gaudi 2",
ComputeTFLOPS: 432, // BF16
MemoryGB: 96,
MemoryBandwidth: 2450,
TDP: 600,
Features: []string{"RoCE v2", "Scale-out"},
UseCases: []string{"LLM Training", "LLM Inference"},
},
"gaudi3": {
Type: AccelASIC,
Vendor: "Intel",
Model: "Gaudi 3",
ComputeTFLOPS: 1835, // BF16
MemoryGB: 128,
MemoryBandwidth: 3700,
TDP: 900,
Features: []string{"FP8", "Scale-out"},
UseCases: []string{"LLM Training", "LLM Inference"},
},
// AWS Inferentia/Trainium
"trainium": {
Type: AccelASIC,
Vendor: "AWS",
Model: "Trainium",
ComputeTFLOPS: 190,
MemoryGB: 32,
MemoryBandwidth: 820,
TDP: 180,
Features: []string{"NeuronLink"},
UseCases: []string{"LLM Training"},
},
"trainium2": {
Type: AccelASIC,
Vendor: "AWS",
Model: "Trainium2",
ComputeTFLOPS: 380,
MemoryGB: 96,
MemoryBandwidth: 2460,
TDP: 300,
Features: []string{"NeuronLink v2"},
UseCases: []string{"LLM Training", "Inference"},
},
"inferentia2": {
Type: AccelASIC,
Vendor: "AWS",
Model: "Inferentia2",
ComputeTFLOPS: 190,
MemoryGB: 32,
MemoryBandwidth: 820,
TDP: 130,
Features: []string{"Low latency"},
UseCases: []string{"LLM Inference"},
},
// AMD
"mi300x": {
Type: AccelGPU,
Vendor: "AMD",
Model: "Instinct MI300X",
ComputeTFLOPS: 1307, // FP16
MemoryGB: 192,
MemoryBandwidth: 5300,
TDP: 750,
Features: []string{"ROCm", "Infinity Fabric"},
UseCases: []string{"LLM Training", "LLM Inference"},
},
// Cerebras
"wse2": {
Type: AccelASIC,
Vendor: "Cerebras",
Model: "WSE-2",
ComputeTFLOPS: 2000, // Theoretical
MemoryGB: 40, // On-chip SRAM
MemoryBandwidth: 20000000, // 片上 SRAM 带宽约 20 PB/s
TDP: 15000,
Features: []string{"Wafer-scale", "On-chip memory"},
UseCases: []string{"LLM Training"},
},
// Groq
"lpu": {
Type: AccelASIC,
Vendor: "Groq",
Model: "LPU",
ComputeTFLOPS: 750, // INT8 TOPS(FP16 约 188 TFLOPS)
MemoryGB: 0, // 片上 SRAM 仅约 230 MB/芯片,承载大模型需多芯片组合
MemoryBandwidth: 80000, // 片上 SRAM 带宽约 80 TB/s
TDP: 300,
Features: []string{"Deterministic latency", "SRAM-only"},
UseCases: []string{"LLM Inference"},
},
}
// AcceleratorComparison 加速器对比
type AcceleratorComparison struct {
Name string
TFLOPSPerDollar float64
TFLOPSPerWatt float64
MemoryPerTFLOPS float64
BestFor []string
}
func CompareAccelerators() []AcceleratorComparison {
// 估算价格(每小时云端价格,美元)
prices := map[string]float64{
"H100-80GB": 3.50,
"A100-80GB": 2.50,
"L40S": 1.20,
"gaudi2": 1.80,
"mi300x": 3.00,
"trainium2": 1.50,
"inferentia2": 0.75,
}
comparisons := make([]AcceleratorComparison, 0)
// GPU 对比
for name, spec := range GPUCatalog {
if price, ok := prices[name]; ok {
comparisons = append(comparisons, AcceleratorComparison{
Name: name,
TFLOPSPerDollar: spec.FP16TFLOPS / price,
TFLOPSPerWatt: spec.FP16TFLOPS / float64(spec.TDP),
MemoryPerTFLOPS: float64(spec.MemoryGB) / spec.FP16TFLOPS,
BestFor: determineBestUse(spec.FP16TFLOPS, spec.MemoryGB),
})
}
}
return comparisons
}
func determineBestUse(tflops float64, memGB int) []string {
uses := []string{}
if tflops > 500 && memGB > 60 {
uses = append(uses, "Large LLM Training")
}
if memGB > 40 {
uses = append(uses, "Large Model Inference")
}
if tflops > 300 {
uses = append(uses, "High Throughput Inference")
}
return uses
}
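按上表的估算价格可以做一个粗略解读(仅为示意,非采购建议):L40S 的每美元(小时)FP16 算力约 302 TFLOPS,甚至略高于 H100 的约 283,但其显存与互连带宽有限,更适合中小模型推理;H100 每瓦算力最高,配合大显存与 NVLink 更适合大模型训练;A100 居中,约 125 TFLOPS/美元。下面的示例函数展示如何取出这些数值。
// 用法示意:提取各 GPU 的每美元(小时)FP16 算力(示例函数)
func exampleGPUValue() map[string]float64 {
    perDollar := make(map[string]float64)
    for _, c := range CompareAccelerators() {
        perDollar[c.Name] = c.TFLOPSPerDollar
    }
    // 按上文数据约为:L40S ≈ 302、H100-80GB ≈ 283、A100-80GB ≈ 125
    return perDollar
}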
异构计算资源抽象
统一资源模型
package heterogeneous
import (
    "context"
    "fmt"
    "sort"
    "sync"
    "time"
)
// ComputeDevice 计算设备抽象
type ComputeDevice interface {
GetType() AcceleratorType
GetID() string
GetSpec() *DeviceSpec
GetStatus() *DeviceStatus
GetCapabilities() []string
Allocate(ctx context.Context, req *AllocationRequest) (*Allocation, error)
Release(allocationID string) error
}
// DeviceSpec 设备规格
type DeviceSpec struct {
Type AcceleratorType `json:"type"`
Vendor string `json:"vendor"`
Model string `json:"model"`
ComputeCapacity float64 `json:"compute_capacity"` // TFLOPS
MemoryCapacity int64 `json:"memory_capacity"` // Bytes
MemoryBandwidth float64 `json:"memory_bandwidth"` // GB/s
Interconnect string `json:"interconnect"`
InterconnectBW float64 `json:"interconnect_bw"` // GB/s
TDP int `json:"tdp"`
}
// DeviceStatus 设备状态
type DeviceStatus struct {
Healthy bool `json:"healthy"`
MemoryUsed int64 `json:"memory_used"`
MemoryFree int64 `json:"memory_free"`
Utilization float64 `json:"utilization"`
Temperature int `json:"temperature"`
PowerUsage int `json:"power_usage"`
Allocations []string `json:"allocations"`
LastUpdated time.Time `json:"last_updated"`
}
// AllocationRequest 资源分配请求
type AllocationRequest struct {
RequestID string `json:"request_id"`
DeviceType AcceleratorType `json:"device_type"`
DeviceCount int `json:"device_count"`
MemoryRequired int64 `json:"memory_required"`
ComputeRequired float64 `json:"compute_required"`
Exclusive bool `json:"exclusive"`
Affinity *AffinitySpec `json:"affinity"`
AntiAffinity *AntiAffinitySpec `json:"anti_affinity"`
Timeout time.Duration `json:"timeout"`
}
type AffinitySpec struct {
SameNode bool `json:"same_node"`
SameRack bool `json:"same_rack"`
Interconnect string `json:"interconnect"` // 要求的互连类型
DeviceIDs []string `json:"device_ids"` // 指定设备
}
type AntiAffinitySpec struct {
WorkloadIDs []string `json:"workload_ids"` // 避开的工作负载
}
// Allocation 资源分配结果
type Allocation struct {
ID string `json:"id"`
RequestID string `json:"request_id"`
DeviceIDs []string `json:"device_ids"`
NodeIDs []string `json:"node_ids"`
Status AllocationStatus `json:"status"`
CreateTime time.Time `json:"create_time"`
ExpireTime *time.Time `json:"expire_time"`
}
type AllocationStatus string
const (
AllocStatusPending AllocationStatus = "pending"
AllocStatusActive AllocationStatus = "active"
AllocStatusReleasing AllocationStatus = "releasing"
AllocStatusReleased AllocationStatus = "released"
AllocStatusFailed AllocationStatus = "failed"
)
// DevicePool 设备池
type DevicePool struct {
devices map[string]ComputeDevice
allocations map[string]*Allocation
mu sync.RWMutex
}
func NewDevicePool() *DevicePool {
return &DevicePool{
devices: make(map[string]ComputeDevice),
allocations: make(map[string]*Allocation),
}
}
// RegisterDevice 注册设备
func (p *DevicePool) RegisterDevice(device ComputeDevice) {
p.mu.Lock()
defer p.mu.Unlock()
p.devices[device.GetID()] = device
}
// GetDevicesByType 按类型获取设备
func (p *DevicePool) GetDevicesByType(deviceType AcceleratorType) []ComputeDevice {
p.mu.RLock()
defer p.mu.RUnlock()
result := make([]ComputeDevice, 0)
for _, device := range p.devices {
if device.GetType() == deviceType {
result = append(result, device)
}
}
return result
}
// GetAvailableDevices 获取可用设备
func (p *DevicePool) GetAvailableDevices(req *AllocationRequest) []ComputeDevice {
p.mu.RLock()
defer p.mu.RUnlock()
result := make([]ComputeDevice, 0)
for _, device := range p.devices {
// 检查设备类型
if device.GetType() != req.DeviceType {
continue
}
// 检查设备健康状态
status := device.GetStatus()
if !status.Healthy {
continue
}
// 检查内存是否充足
if status.MemoryFree < req.MemoryRequired {
continue
}
// 检查是否已被独占
if req.Exclusive && len(status.Allocations) > 0 {
continue
}
// 检查亲和性
if req.Affinity != nil && !p.checkAffinity(device, req.Affinity) {
continue
}
result = append(result, device)
}
return result
}
func (p *DevicePool) checkAffinity(device ComputeDevice, affinity *AffinitySpec) bool {
if len(affinity.DeviceIDs) > 0 {
found := false
for _, id := range affinity.DeviceIDs {
if device.GetID() == id {
found = true
break
}
}
if !found {
return false
}
}
if affinity.Interconnect != "" {
spec := device.GetSpec()
if spec.Interconnect != affinity.Interconnect {
return false
}
}
return true
}
// Allocate 分配设备
func (p *DevicePool) Allocate(ctx context.Context, req *AllocationRequest) (*Allocation, error) {
available := p.GetAvailableDevices(req)
if len(available) < req.DeviceCount {
return nil, fmt.Errorf("not enough devices: need %d, available %d",
req.DeviceCount, len(available))
}
// 选择最优设备组合
selected := p.selectDevices(available, req)
// 执行分配
allocation := &Allocation{
ID: fmt.Sprintf("alloc-%d", time.Now().UnixNano()),
RequestID: req.RequestID,
DeviceIDs: make([]string, 0, len(selected)),
NodeIDs: make([]string, 0),
Status: AllocStatusPending,
CreateTime: time.Now(),
}
for _, device := range selected {
_, err := device.Allocate(ctx, req)
if err != nil {
// 回滚已分配的设备
for _, id := range allocation.DeviceIDs {
if d, ok := p.devices[id]; ok {
d.Release(allocation.ID)
}
}
return nil, err
}
allocation.DeviceIDs = append(allocation.DeviceIDs, device.GetID())
}
allocation.Status = AllocStatusActive
p.mu.Lock()
p.allocations[allocation.ID] = allocation
p.mu.Unlock()
return allocation, nil
}
func (p *DevicePool) selectDevices(available []ComputeDevice, req *AllocationRequest) []ComputeDevice {
    // 简单的选择策略:按空闲内存从大到小排序,优先选择内存最充足的设备
    // 实际实现还需考虑拓扑、互连带宽等因素
    sort.Slice(available, func(i, j int) bool {
        return available[i].GetStatus().MemoryFree > available[j].GetStatus().MemoryFree
    })
    if req.DeviceCount <= len(available) {
        return available[:req.DeviceCount]
    }
    return available
}
// Release 释放分配
func (p *DevicePool) Release(allocationID string) error {
p.mu.Lock()
allocation, exists := p.allocations[allocationID]
if !exists {
p.mu.Unlock()
return fmt.Errorf("allocation not found: %s", allocationID)
}
allocation.Status = AllocStatusReleasing
p.mu.Unlock()
// 释放每个设备
for _, deviceID := range allocation.DeviceIDs {
device, ok := p.devices[deviceID]
if ok {
device.Release(allocationID)
}
}
p.mu.Lock()
allocation.Status = AllocStatusReleased
delete(p.allocations, allocationID)
p.mu.Unlock()
return nil
}
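上文的 ComputeDevice 只是接口定义。下面给出一个最小的内存版实现草图(假设与上文代码位于同一包内,context、fmt、sync、time 已导入),并演示注册设备、分配与释放的基本流程;mockGPUDevice、exampleDevicePool 等名称均为示意,不代表真实的驱动实现。
// mockGPUDevice 是 ComputeDevice 的最小内存实现,仅用于演示设备池的用法
type mockGPUDevice struct {
    mu     sync.Mutex
    id     string
    spec   DeviceSpec
    status DeviceStatus
}

func (d *mockGPUDevice) GetType() AcceleratorType { return d.spec.Type }
func (d *mockGPUDevice) GetID() string            { return d.id }
func (d *mockGPUDevice) GetSpec() *DeviceSpec     { return &d.spec }

func (d *mockGPUDevice) GetStatus() *DeviceStatus {
    d.mu.Lock()
    defer d.mu.Unlock()
    s := d.status // 返回副本,避免调用方直接修改内部状态
    return &s
}

func (d *mockGPUDevice) GetCapabilities() []string {
    return []string{string(CapTensorCore), string(CapNVLink)}
}

func (d *mockGPUDevice) Allocate(ctx context.Context, req *AllocationRequest) (*Allocation, error) {
    d.mu.Lock()
    defer d.mu.Unlock()
    if d.status.MemoryFree < req.MemoryRequired {
        return nil, fmt.Errorf("device %s: insufficient memory", d.id)
    }
    d.status.MemoryFree -= req.MemoryRequired
    d.status.MemoryUsed += req.MemoryRequired
    d.status.Allocations = append(d.status.Allocations, req.RequestID)
    return &Allocation{DeviceIDs: []string{d.id}, Status: AllocStatusActive, CreateTime: time.Now()}, nil
}

func (d *mockGPUDevice) Release(allocationID string) error {
    d.mu.Lock()
    defer d.mu.Unlock()
    // 演示实现:直接清空全部占用
    d.status.MemoryFree += d.status.MemoryUsed
    d.status.MemoryUsed = 0
    d.status.Allocations = nil
    return nil
}

// 用法示意:注册两块 80GB 设备,申请 1 块 40GB 的独占分配后释放
func exampleDevicePool(ctx context.Context) error {
    pool := NewDevicePool()
    for i := 0; i < 2; i++ {
        pool.RegisterDevice(&mockGPUDevice{
            id:     fmt.Sprintf("gpu-%d", i),
            spec:   DeviceSpec{Type: AccelGPU, Model: "A100-80GB", ComputeCapacity: 312, MemoryCapacity: 80 << 30},
            status: DeviceStatus{Healthy: true, MemoryFree: 80 << 30},
        })
    }
    alloc, err := pool.Allocate(ctx, &AllocationRequest{
        RequestID:      "req-1",
        DeviceType:     AccelGPU,
        DeviceCount:    1,
        MemoryRequired: 40 << 30,
        Exclusive:      true,
    })
    if err != nil {
        return err
    }
    return pool.Release(alloc.ID)
}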
异构计算调度策略
任务-设备匹配
package heterogeneous
import (
    "fmt"
    "math"
    "sort"
)
// WorkloadProfile 工作负载特征
type WorkloadProfile struct {
Type WorkloadType `json:"type"`
ModelSize int64 `json:"model_size"` // 参数量
BatchSize int `json:"batch_size"`
SeqLength int `json:"seq_length"`
ComputeIntensity float64 `json:"compute_intensity"` // FLOPS/Byte
MemoryBound bool `json:"memory_bound"`
Requirements []string `json:"requirements"` // 特殊需求
SLO *SLORequirements `json:"slo"`
}
type WorkloadType string
const (
WorkloadTraining WorkloadType = "training"
WorkloadInference WorkloadType = "inference"
WorkloadFineTuning WorkloadType = "fine_tuning"
WorkloadEmbedding WorkloadType = "embedding"
)
type SLORequirements struct {
MaxLatencyMs int `json:"max_latency_ms"`
MinThroughput float64 `json:"min_throughput"` // tokens/s
MaxCostPerHour float64 `json:"max_cost_per_hour"`
}
// DeviceScore 设备评分
type DeviceScore struct {
Device ComputeDevice
Score float64
Reasons []string
EstLatency float64
EstThroughput float64
EstCost float64
}
// WorkloadMatcher 工作负载匹配器
type WorkloadMatcher struct {
devicePool *DevicePool
costModel *CostModel
}
func NewWorkloadMatcher(pool *DevicePool) *WorkloadMatcher {
return &WorkloadMatcher{
devicePool: pool,
costModel: NewCostModel(),
}
}
// Match 匹配最优设备
func (m *WorkloadMatcher) Match(workload *WorkloadProfile) []DeviceScore {
scores := make([]DeviceScore, 0)
// 获取所有设备类型
deviceTypes := []AcceleratorType{AccelGPU, AccelTPU, AccelASIC}
for _, deviceType := range deviceTypes {
devices := m.devicePool.GetDevicesByType(deviceType)
for _, device := range devices {
score := m.scoreDevice(device, workload)
if score.Score > 0 {
scores = append(scores, score)
}
}
}
// 按分数排序
sort.Slice(scores, func(i, j int) bool {
return scores[i].Score > scores[j].Score
})
return scores
}
func (m *WorkloadMatcher) scoreDevice(device ComputeDevice, workload *WorkloadProfile) DeviceScore {
score := DeviceScore{
Device: device,
Reasons: make([]string, 0),
}
spec := device.GetSpec()
status := device.GetStatus()
// 1. 基础可行性检查
if !status.Healthy {
return score
}
memRequired := m.estimateMemoryRequired(workload)
if status.MemoryFree < memRequired {
return score
}
// 2. 计算能力评分
computeScore := m.scoreComputeCapability(spec, workload)
score.Reasons = append(score.Reasons,
fmt.Sprintf("Compute score: %.2f", computeScore))
// 3. 内存带宽评分
memoryScore := m.scoreMemoryBandwidth(spec, workload)
score.Reasons = append(score.Reasons,
fmt.Sprintf("Memory bandwidth score: %.2f", memoryScore))
// 4. 特性匹配评分
featureScore := m.scoreFeatures(device, workload)
score.Reasons = append(score.Reasons,
fmt.Sprintf("Feature score: %.2f", featureScore))
// 5. 成本效益评分
costScore := m.scoreCostEfficiency(device, workload)
score.Reasons = append(score.Reasons,
fmt.Sprintf("Cost efficiency score: %.2f", costScore))
// 6. 估算性能指标
score.EstLatency = m.estimateLatency(spec, workload)
score.EstThroughput = m.estimateThroughput(spec, workload)
score.EstCost = m.costModel.EstimateCost(device, workload)
// 7. SLO 检查
sloScore := 1.0
if workload.SLO != nil {
if score.EstLatency > float64(workload.SLO.MaxLatencyMs) {
sloScore = 0
score.Reasons = append(score.Reasons, "Fails latency SLO")
}
if score.EstThroughput < workload.SLO.MinThroughput {
sloScore *= 0.5
score.Reasons = append(score.Reasons, "Below throughput SLO")
}
if score.EstCost > workload.SLO.MaxCostPerHour {
sloScore *= 0.5
score.Reasons = append(score.Reasons, "Exceeds cost SLO")
}
}
// 综合评分
score.Score = (computeScore*0.3 + memoryScore*0.2 +
featureScore*0.2 + costScore*0.3) * sloScore
return score
}
func (m *WorkloadMatcher) estimateMemoryRequired(workload *WorkloadProfile) int64 {
// 参数内存(FP16)
paramMem := workload.ModelSize * 2
// KV Cache
kvMem := int64(0)
if workload.Type == WorkloadInference {
// 简化估算
kvMem = int64(float64(paramMem) * 0.2 * float64(workload.SeqLength) / 1024)
}
// 激活内存
actMem := int64(float64(paramMem) * 0.3 * float64(workload.BatchSize))
// 优化器状态(训练时)
optMem := int64(0)
if workload.Type == WorkloadTraining {
optMem = paramMem * 6 // 混合精度 Adam:FP32 主参数 + 一阶矩 m + 二阶矩 v ≈ 6 倍 FP16 参数内存
}
return paramMem + kvMem + actMem + optMem
}
func (m *WorkloadMatcher) scoreComputeCapability(spec *DeviceSpec, workload *WorkloadProfile) float64 {
requiredTFLOPS := m.estimateRequiredTFLOPS(workload)
// 计算能力是否满足需求
if spec.ComputeCapacity >= requiredTFLOPS {
// 过剩不是越多越好,适度即可
ratio := spec.ComputeCapacity / requiredTFLOPS
if ratio > 4 {
    return 0.7 // 严重过剩,性价比反而下降
}
return 1.0 - (ratio-1)*0.1
}
// 不满足需求
return spec.ComputeCapacity / requiredTFLOPS * 0.5
}
func (m *WorkloadMatcher) scoreMemoryBandwidth(spec *DeviceSpec, workload *WorkloadProfile) float64 {
// 内存带宽对内存密集型工作负载更重要
if workload.MemoryBound {
// 需要更高的带宽
requiredBW := float64(workload.ModelSize) * 2 / 1e9 * 100 // 简化估算
if spec.MemoryBandwidth >= requiredBW {
return 1.0
}
return spec.MemoryBandwidth / requiredBW
}
return 0.8 // 计算密集型对带宽要求较低
}
func (m *WorkloadMatcher) scoreFeatures(device ComputeDevice, workload *WorkloadProfile) float64 {
caps := device.GetCapabilities()
capSet := make(map[string]bool)
for _, c := range caps {
capSet[c] = true
}
matchCount := 0
for _, req := range workload.Requirements {
if capSet[req] {
matchCount++
}
}
if len(workload.Requirements) == 0 {
return 1.0
}
return float64(matchCount) / float64(len(workload.Requirements))
}
func (m *WorkloadMatcher) scoreCostEfficiency(device ComputeDevice, workload *WorkloadProfile) float64 {
cost := m.costModel.EstimateCost(device, workload)
throughput := m.estimateThroughput(device.GetSpec(), workload)
if throughput == 0 {
return 0
}
// 每美元吞吐量
efficiency := throughput / cost
// 归一化(基于经验值)
baselineEfficiency := 100.0 // tokens/$ baseline
return math.Min(efficiency/baselineEfficiency, 1.5)
}
func (m *WorkloadMatcher) estimateRequiredTFLOPS(workload *WorkloadProfile) float64 {
// 基于模型大小和批次大小估算
// 简化公式:每 token 前向约需 2 * params FLOPs,按每秒为整个 batch 各生成 1 个 token 折算
return float64(workload.ModelSize) * 2 * float64(workload.BatchSize) / 1e12
}
func (m *WorkloadMatcher) estimateLatency(spec *DeviceSpec, workload *WorkloadProfile) float64 {
// 简化的延迟估算
computeTime := float64(workload.ModelSize*2) / (spec.ComputeCapacity * 1e12) * 1000 // ms
memoryTime := float64(workload.ModelSize*2) / (spec.MemoryBandwidth * 1e9) * 1000 // ms
return math.Max(computeTime, memoryTime) * float64(workload.SeqLength)
}
func (m *WorkloadMatcher) estimateThroughput(spec *DeviceSpec, workload *WorkloadProfile) float64 {
latency := m.estimateLatency(spec, workload)
if latency == 0 {
return 0
}
return float64(workload.BatchSize*workload.SeqLength) / latency * 1000 // tokens/s
}
// CostModel 成本模型
type CostModel struct {
prices map[string]float64 // 设备ID -> 每小时价格
}
func NewCostModel() *CostModel {
return &CostModel{
prices: map[string]float64{
"H100-80GB": 3.50,
"A100-80GB": 2.50,
"L40S": 1.20,
"TPU-v4": 3.00,
"gaudi2": 1.80,
},
}
}
func (c *CostModel) EstimateCost(device ComputeDevice, workload *WorkloadProfile) float64 {
spec := device.GetSpec()
basePrice := c.prices[spec.Model]
if basePrice == 0 {
basePrice = 2.0 // 默认价格
}
return basePrice
}
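下面用两个示例函数演示上述估算公式的用法(函数名为示意,数值为粗略推算):一个估算 7B 模型推理与训练的显存需求,一个估算其在 A100-80GB 规格上的延迟与吞吐。从结果可以看到,解码阶段主要受显存带宽而非算力限制。
// 用法示意:7B 模型的显存需求估算(batch=8, seq=2048)
func exampleMemoryEstimate() (inferGB, trainGB float64) {
    m := NewWorkloadMatcher(NewDevicePool())
    infer := &WorkloadProfile{Type: WorkloadInference, ModelSize: 7_000_000_000, BatchSize: 8, SeqLength: 2048}
    train := &WorkloadProfile{Type: WorkloadTraining, ModelSize: 7_000_000_000, BatchSize: 8, SeqLength: 2048}
    // 推理:参数 14 GB + KV Cache 约 5.6 GB + 激活约 33.6 GB ≈ 53 GB
    // 训练:不计 KV Cache,但再加约 6 倍参数量的优化器状态 ≈ 132 GB
    return float64(m.estimateMemoryRequired(infer)) / 1e9,
        float64(m.estimateMemoryRequired(train)) / 1e9
}

// 用法示意:7B 模型在 A100-80GB 规格上的延迟/吞吐估算
func exampleA100Throughput() (latencyMs, tokensPerSec float64) {
    m := NewWorkloadMatcher(NewDevicePool())
    spec := &DeviceSpec{ComputeCapacity: 312, MemoryBandwidth: 2039}
    w := &WorkloadProfile{Type: WorkloadInference, ModelSize: 7_000_000_000, BatchSize: 8, SeqLength: 2048}
    // 单 token 受带宽限制约 6.9 ms,2048 token 约 14 s,折合约 1.2k tokens/s
    return m.estimateLatency(spec, w), m.estimateThroughput(spec, w)
}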
互连拓扑管理
设备拓扑发现
package heterogeneous
import (
"fmt"
)
// TopologyType 拓扑类型
type TopologyType string
const (
TopoNVLink TopologyType = "nvlink"
TopoPCIe TopologyType = "pcie"
TopoNVSwitch TopologyType = "nvswitch"
TopoInfiniBand TopologyType = "infiniband"
TopoEthernet TopologyType = "ethernet"
)
// DeviceTopology 设备拓扑
type DeviceTopology struct {
Nodes map[string]*TopologyNode
Links []*TopologyLink
Switches []*TopologySwitch
}
type TopologyNode struct {
DeviceID string
DeviceType AcceleratorType
NodeID string // 物理节点 ID
PCIeBus string // PCIe 总线地址
NUMANode int // NUMA 节点
Links []string // 连接的设备 ID
}
type TopologyLink struct {
Source string
Target string
Type TopologyType
Bandwidth float64 // GB/s
Latency float64 // us
Hops int
}
type TopologySwitch struct {
ID string
Type TopologyType
Ports int
Bandwidth float64
ConnectedDevices []string
}
// TopologyManager 拓扑管理器
type TopologyManager struct {
topology *DeviceTopology
}
func NewTopologyManager() *TopologyManager {
return &TopologyManager{
topology: &DeviceTopology{
Nodes: make(map[string]*TopologyNode),
Links: make([]*TopologyLink, 0),
Switches: make([]*TopologySwitch, 0),
},
}
}
// DiscoverTopology 发现拓扑
func (tm *TopologyManager) DiscoverTopology() error {
// 发现 NVIDIA GPU 拓扑
if err := tm.discoverNVIDIATopology(); err != nil {
return err
}
// 发现网络拓扑
if err := tm.discoverNetworkTopology(); err != nil {
return err
}
return nil
}
func (tm *TopologyManager) discoverNVIDIATopology() error {
// 实际实现会调用 nvidia-smi 或 NVML 库
// 这里提供示例拓扑
// 8-GPU DGX 系统示例
for i := 0; i < 8; i++ {
node := &TopologyNode{
DeviceID: fmt.Sprintf("GPU-%d", i),
DeviceType: AccelGPU,
NodeID: "dgx-01",
PCIeBus: fmt.Sprintf("0000:%02x:00.0", i*16),
NUMANode: i / 4,
Links: make([]string, 0),
}
tm.topology.Nodes[node.DeviceID] = node
}
// NVLink 连接(简化的全连接拓扑)
for i := 0; i < 8; i++ {
for j := i + 1; j < 8; j++ {
// 示例取值:同 NUMA 节点内按 NVLink 直连
linkType := TopoNVLink
bandwidth := 600.0 // GB/s,约为 A100 NVLink 3.0 总带宽
latency := 0.5 // us
if i/4 != j/4 {
// 跨 NUMA 需要经过 NVSwitch
bandwidth = 450.0
latency = 1.0
}
link := &TopologyLink{
Source: fmt.Sprintf("GPU-%d", i),
Target: fmt.Sprintf("GPU-%d", j),
Type: linkType,
Bandwidth: bandwidth,
Latency: latency,
Hops: 1,
}
tm.topology.Links = append(tm.topology.Links, link)
// 更新节点连接
tm.topology.Nodes[fmt.Sprintf("GPU-%d", i)].Links = append(
tm.topology.Nodes[fmt.Sprintf("GPU-%d", i)].Links,
fmt.Sprintf("GPU-%d", j),
)
tm.topology.Nodes[fmt.Sprintf("GPU-%d", j)].Links = append(
tm.topology.Nodes[fmt.Sprintf("GPU-%d", j)].Links,
fmt.Sprintf("GPU-%d", i),
)
}
}
return nil
}
func (tm *TopologyManager) discoverNetworkTopology() error {
// 发现 InfiniBand 或高速以太网拓扑
return nil
}
// GetLink 获取两个设备间的链路
func (tm *TopologyManager) GetLink(deviceA, deviceB string) *TopologyLink {
for _, link := range tm.topology.Links {
if (link.Source == deviceA && link.Target == deviceB) ||
(link.Source == deviceB && link.Target == deviceA) {
return link
}
}
return nil
}
// GetBandwidth 获取设备间带宽
func (tm *TopologyManager) GetBandwidth(deviceA, deviceB string) float64 {
link := tm.GetLink(deviceA, deviceB)
if link != nil {
return link.Bandwidth
}
return 0
}
// FindOptimalDeviceGroup 找到最优设备组(最大化互连带宽)
func (tm *TopologyManager) FindOptimalDeviceGroup(
candidates []string,
count int,
) []string {
if len(candidates) <= count {
return candidates
}
// 使用贪心算法选择最优组合
// 优先选择 NVLink 连接的设备
result := make([]string, 0, count)
remaining := make(map[string]bool)
for _, c := range candidates {
remaining[c] = true
}
// 选择第一个设备
result = append(result, candidates[0])
delete(remaining, candidates[0])
// 贪心选择剩余设备
for len(result) < count && len(remaining) > 0 {
var bestDevice string
bestScore := -1.0
for device := range remaining {
score := tm.calculateGroupScore(append(result, device))
if score > bestScore {
bestScore = score
bestDevice = device
}
}
if bestDevice != "" {
result = append(result, bestDevice)
delete(remaining, bestDevice)
} else {
break
}
}
return result
}
func (tm *TopologyManager) calculateGroupScore(devices []string) float64 {
if len(devices) < 2 {
return 0
}
totalBW := 0.0
for i := 0; i < len(devices); i++ {
for j := i + 1; j < len(devices); j++ {
bw := tm.GetBandwidth(devices[i], devices[j])
totalBW += bw
}
}
// 平均带宽
numPairs := float64(len(devices) * (len(devices) - 1) / 2)
return totalBW / numPairs
}
// IsNVLinkConnected 检查是否 NVLink 连接
func (tm *TopologyManager) IsNVLinkConnected(deviceA, deviceB string) bool {
link := tm.GetLink(deviceA, deviceB)
return link != nil && link.Type == TopoNVLink
}
// GetNUMANode 获取设备的 NUMA 节点
func (tm *TopologyManager) GetNUMANode(deviceID string) int {
node := tm.topology.Nodes[deviceID]
if node != nil {
return node.NUMANode
}
return -1
}
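一个简单的用法示意(基于上面的示例拓扑,exampleTopologyGrouping 为示意函数名):发现拓扑后,为一次 4 卡任务从 8 块 GPU 中挑选互连带宽最高的一组;按贪心策略,通常会优先落在同一 NUMA 节点内。
// 用法示意:为 4 卡任务挑选互连带宽最高的 GPU 组
func exampleTopologyGrouping() ([]string, error) {
    tm := NewTopologyManager()
    if err := tm.DiscoverTopology(); err != nil {
        return nil, err
    }
    candidates := []string{"GPU-0", "GPU-1", "GPU-2", "GPU-3", "GPU-4", "GPU-5", "GPU-6", "GPU-7"}
    // 以 GPU-0 为起点贪心扩展,预期选出同一 NUMA 节点内的 GPU-0~GPU-3
    return tm.FindOptimalDeviceGroup(candidates, 4), nil
}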
小结
本章介绍了异构计算的基础概念和核心组件:
- 加速器对比:GPU、TPU、FPGA、ASIC 等各类加速器的特性与适用场景
- 资源抽象:统一的计算设备模型,屏蔽底层差异
- 调度策略:工作负载特征分析与设备匹配算法
- 拓扑管理:设备互连拓扑发现与优化分配
异构计算是构建高效 AI 基础设施的关键。通过合理的资源抽象和调度策略,可以充分发挥各类加速器的优势,实现最优的性价比。
下一章我们将深入探讨 GPU 虚拟化与共享,讲解如何在多租户环境中高效共享 GPU 资源。