设备拓扑感知调度
概述
在大规模 AI 集群中,加速器之间的拓扑关系对训练性能有重大影响。拓扑感知调度通过理解设备互联关系、NUMA 亲和性等,优化任务放置,最大化通信效率。本文深入探讨拓扑感知调度的原理与实现。
拓扑结构分析
GPU 服务器拓扑
┌─────────────────────────────────────────────────────────────────┐
│ 典型 8-GPU 服务器拓扑 (DGX A100) │
├─────────────────────────────────────────────────────────────────┤
│ │
│ NUMA Node 0 NUMA Node 1 │
│ ┌────────────────────────┐ ┌────────────────────────┐ │
│ │ │ │ │ │
│ │ ┌──────┐ ┌──────┐ │ │ ┌──────┐ ┌──────┐ │ │
│ │ │ CPU │ │ CPU │ │ │ │ CPU │ │ CPU │ │ │
│ │ │ 0 │ │ 1 │ │ │ │ 2 │ │ 3 │ │ │
│ │ └──┬───┘ └──┬───┘ │ │ └──┬───┘ └──┬───┘ │ │
│ │ │ │ │ │ │ │ │ │
│ │ ┌──┴─────────┴──┐ │ │ ┌──┴─────────┴──┐ │ │
│ │ │ Memory │ │ │ │ Memory │ │ │
│ │ │ 512GB │ │ │ │ 512GB │ │ │
│ │ └───────────────┘ │ │ └───────────────┘ │ │
│ │ │ │ │ │ │ │
│ │ ═══════╪═════════ │ QPI │ ══════╪═════════ │ │
│ │ │ │◄────────►│ │ │ │
│ │ ┌──────┴──────┐ │ │ ┌──────┴──────┐ │ │
│ │ │ PCIe │ │ │ │ PCIe │ │ │
│ │ │ Switch │ │ │ │ Switch │ │ │
│ │ └──────┬──────┘ │ │ └──────┬──────┘ │ │
│ │ │ │ │ │ │ │
│ └─────────┼─────────────┘ └──────────┼────────────┘ │
│ │ │ │
│ ══════════╪═══════════════════════════════════╪═══════════ │
│ │ NVSwitch Fabric │ │
│ ══════════╪═══════════════════════════════════╪═══════════ │
│ │ │ │
│ ┌─────────┼───────────────────────────────────┼─────────┐ │
│ │ │ │ │ │
│ │ ┌──────┴──────┐ ┌──────┴──────┐ │ │
│ │ │ NVSwitch │◄───────────────────►│ NVSwitch │ │ │
│ │ │ 0 │ │ 1 │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ │ │
│ │ │ │ │ │
│ │ ┌──────┴──────┐ ┌──────┴──────┐ │ │
│ │ │ NVSwitch │◄───────────────────►│ NVSwitch │ │ │
│ │ │ 2 │ │ 3 │ │ │
│ │ └──────┬──────┘ └──────┬──────┘ │ │
│ │ │ │ │ │
│ └─────────┼───────────────────────────────────┼─────────┘ │
│ │ │ │
│ ┌─────────┴──────┐ ┌─────────┐ ┌──────┴─────────┐ │
│ │ GPU Group 0 │ │ ... │ │ GPU Group 1 │ │
│ │ ┌────┐ ┌────┐│ │ │ │┌────┐ ┌────┐ │ │
│ │ │GPU0│ │GPU1││◄───►│NVLink │◄───►││GPU4│ │GPU5│ │ │
│ │ └────┘ └────┘│ │600GB/s │ │└────┘ └────┘ │ │
│ │ ┌────┐ ┌────┐│ │ │ │┌────┐ ┌────┐ │ │
│ │ │GPU2│ │GPU3││◄───►│per pair │◄───►││GPU6│ │GPU7│ │ │
│ │ └────┘ └────┘│ │ │ │└────┘ └────┘ │ │
│ └───────────────┘ └─────────┘ └────────────────┘ │
│ │
│ 带宽层次: │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ NVLink (同 NVSwitch): 600 GB/s (双向) │ │
│ │ NVLink (跨 NVSwitch): 300 GB/s │ │
│ │ PCIe Gen4 x16: 32 GB/s │ │
│ │ 跨 NUMA (QPI): ~25 GB/s │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
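基于上表的带宽层次,可以粗略感受同一份数据走不同链路的代价差异。下面是一个最小示意(带宽直接取上表数值,仅用于量级对比,非实测结果):

# 按上表带宽估算 1 GB 数据的单向传输时间 (量级示意)
LINK_BANDWIDTH_GBPS = {
    "NVLink (同 NVSwitch)": 600,
    "NVLink (跨 NVSwitch)": 300,
    "PCIe Gen4 x16": 32,
    "跨 NUMA (QPI/UPI)": 25,
}

def transfer_time_ms(data_gb: float, bandwidth_gbps: float) -> float:
    """传输时间 (ms) = 数据量 (GB) / 带宽 (GB/s) * 1000"""
    return data_gb / bandwidth_gbps * 1000

for link, bw in LINK_BANDWIDTH_GBPS.items():
    print(f"{link:<22s}: {transfer_time_ms(1.0, bw):6.2f} ms / GB")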
拓扑距离矩阵
"""
GPU 拓扑分析与距离计算
"""
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
import subprocess
import json
class ConnectionType(Enum):
"""连接类型"""
NVLINK = "NVLink"
NVSWITCH = "NVSwitch"
PCIE = "PCIe"
SYS = "SYS" # 跨 NUMA
NODE = "NODE" # 跨节点
@dataclass
class GPUConnection:
"""GPU 连接信息"""
gpu_a: int
gpu_b: int
connection_type: ConnectionType
bandwidth_gbps: float
latency_us: float
@dataclass
class NUMANode:
"""NUMA 节点"""
node_id: int
cpus: List[int]
memory_mb: int
gpus: List[int]
class TopologyAnalyzer:
"""拓扑分析器"""
# 典型带宽值 (GB/s)
BANDWIDTH = {
        ConnectionType.NVLINK: 600,    # A100 第三代 NVLink, 600 GB/s 双向聚合带宽
ConnectionType.NVSWITCH: 600, # 通过 NVSwitch
ConnectionType.PCIE: 32, # PCIe Gen4 x16
ConnectionType.SYS: 25, # 跨 NUMA (QPI/UPI)
        ConnectionType.NODE: 12.5      # 跨节点 (100 Gbps InfiniBand/以太网)
}
# 典型延迟值 (μs)
LATENCY = {
ConnectionType.NVLINK: 1,
ConnectionType.NVSWITCH: 2,
ConnectionType.PCIE: 5,
ConnectionType.SYS: 10,
ConnectionType.NODE: 50
}
def __init__(self):
self.gpu_count = 0
self.numa_nodes: List[NUMANode] = []
self.connections: List[GPUConnection] = []
self.distance_matrix: Optional[np.ndarray] = None
def discover_topology(self) -> bool:
"""发现系统拓扑"""
try:
# 获取 GPU 拓扑
self._discover_gpu_topology()
# 获取 NUMA 拓扑
self._discover_numa_topology()
# 构建距离矩阵
self._build_distance_matrix()
return True
except Exception as e:
print(f"Topology discovery failed: {e}")
return False
def _discover_gpu_topology(self):
"""发现 GPU 拓扑"""
try:
# 使用 nvidia-smi topo
result = subprocess.run(
["nvidia-smi", "topo", "-m"],
capture_output=True, text=True
)
if result.returncode != 0:
raise RuntimeError("nvidia-smi failed")
# 解析输出
lines = result.stdout.strip().split('\n')
# 找到 GPU 行
gpu_lines = [l for l in lines if l.startswith('GPU')]
self.gpu_count = len(gpu_lines)
# 解析连接矩阵
for i, line in enumerate(gpu_lines):
parts = line.split()
if len(parts) < self.gpu_count + 1:
continue
for j in range(self.gpu_count):
if i == j:
continue
conn_str = parts[j + 1]
conn_type = self._parse_connection_type(conn_str)
self.connections.append(GPUConnection(
gpu_a=i,
gpu_b=j,
connection_type=conn_type,
bandwidth_gbps=self.BANDWIDTH[conn_type],
latency_us=self.LATENCY[conn_type]
))
except FileNotFoundError:
# 模拟 8-GPU 系统
self._simulate_dgx_topology()
    def _parse_connection_type(self, conn_str: str) -> ConnectionType:
        """解析 nvidia-smi topo -m 输出中的连接类型 (NV#/PIX/PXB/PHB/NODE/SYS)"""
        conn_str = conn_str.upper()
        if conn_str.startswith("NV"):
            # nvidia-smi 将 NVLink/NVSwitch 连接统一显示为 NV<链路数>
            return ConnectionType.NVLINK
        elif conn_str in ("PIX", "PXB", "PHB", "NODE"):
            # 同一 NUMA 节点内经 PCIe / Host Bridge 的连接
            return ConnectionType.PCIE
        elif "SYS" in conn_str:
            # 跨 NUMA 节点 (经 QPI/UPI 等 CPU 互联)
            return ConnectionType.SYS
        else:
            return ConnectionType.PCIE
    def _simulate_dgx_topology(self):
        """模拟 DGX A100 拓扑: 8 卡经 NVSwitch 全互联"""
        self.gpu_count = 8
        for i in range(8):
            for j in range(8):
                if i == j:
                    continue
                # DGX A100 上所有 GPU (含跨 NUMA) 均通过 NVSwitch 全互联
                conn_type = ConnectionType.NVSWITCH
                self.connections.append(GPUConnection(
                    gpu_a=i,
                    gpu_b=j,
                    connection_type=conn_type,
                    bandwidth_gbps=self.BANDWIDTH[conn_type],
                    latency_us=self.LATENCY[conn_type]
                ))
def _discover_numa_topology(self):
"""发现 NUMA 拓扑"""
try:
# 获取 NUMA 信息
result = subprocess.run(
["numactl", "--hardware"],
capture_output=True, text=True
)
if result.returncode == 0:
# 解析 numactl 输出
pass
except FileNotFoundError:
pass
# 默认配置
if not self.numa_nodes:
# 假设 2 个 NUMA 节点,每个 4 个 GPU
self.numa_nodes = [
NUMANode(
node_id=0,
cpus=list(range(0, 64)),
memory_mb=512 * 1024,
gpus=[0, 1, 2, 3]
),
NUMANode(
node_id=1,
cpus=list(range(64, 128)),
memory_mb=512 * 1024,
gpus=[4, 5, 6, 7]
)
]
def _build_distance_matrix(self):
"""构建距离矩阵"""
# 使用延迟作为距离度量
self.distance_matrix = np.zeros((self.gpu_count, self.gpu_count))
for conn in self.connections:
# 距离 = 延迟 (标准化)
distance = conn.latency_us / self.LATENCY[ConnectionType.NVLINK]
self.distance_matrix[conn.gpu_a][conn.gpu_b] = distance
def get_optimal_placement(self, num_gpus: int) -> List[int]:
"""
获取最优 GPU 放置
使用贪心算法选择通信开销最小的 GPU 组合
"""
if num_gpus > self.gpu_count:
raise ValueError(f"Requested {num_gpus} GPUs but only {self.gpu_count} available")
if num_gpus == self.gpu_count:
return list(range(self.gpu_count))
# 贪心选择
selected = [0] # 从 GPU 0 开始
while len(selected) < num_gpus:
best_gpu = -1
best_cost = float('inf')
for gpu in range(self.gpu_count):
if gpu in selected:
continue
# 计算添加此 GPU 的总通信成本
cost = sum(self.distance_matrix[gpu][s] for s in selected)
if cost < best_cost:
best_cost = cost
best_gpu = gpu
selected.append(best_gpu)
return sorted(selected)
def get_gpu_groups(self, group_size: int) -> List[List[int]]:
"""
获取 GPU 分组 (用于数据并行)
将 GPU 分成若干组,组内通信开销最小
"""
if self.gpu_count % group_size != 0:
raise ValueError(f"Cannot evenly divide {self.gpu_count} GPUs into groups of {group_size}")
num_groups = self.gpu_count // group_size
groups = []
used = set()
for _ in range(num_groups):
# 选择一个起始 GPU
start_gpu = -1
for gpu in range(self.gpu_count):
if gpu not in used:
start_gpu = gpu
break
group = [start_gpu]
used.add(start_gpu)
# 贪心添加最近的 GPU
while len(group) < group_size:
best_gpu = -1
best_cost = float('inf')
for gpu in range(self.gpu_count):
if gpu in used:
continue
cost = sum(self.distance_matrix[gpu][g] for g in group)
if cost < best_cost:
best_cost = cost
best_gpu = gpu
group.append(best_gpu)
used.add(best_gpu)
groups.append(sorted(group))
return groups
def get_numa_affinity(self, gpu_ids: List[int]) -> Dict[int, int]:
"""获取 GPU 的 NUMA 亲和性"""
affinity = {}
for gpu_id in gpu_ids:
for numa_node in self.numa_nodes:
if gpu_id in numa_node.gpus:
affinity[gpu_id] = numa_node.node_id
break
return affinity
def estimate_communication_time(
self,
gpu_ids: List[int],
data_size_mb: float,
pattern: str = "all_reduce"
) -> float:
"""
估算通信时间
Args:
gpu_ids: 参与通信的 GPU
data_size_mb: 数据大小 (MB)
pattern: 通信模式 (all_reduce, all_gather, reduce_scatter)
"""
if len(gpu_ids) < 2:
return 0
        # 获取最慢的连接 (瓶颈带宽)
        min_bandwidth = float('inf')
        for i in range(len(gpu_ids)):
            for j in range(i + 1, len(gpu_ids)):
                for conn in self.connections:
                    if conn.gpu_a == gpu_ids[i] and conn.gpu_b == gpu_ids[j]:
                        min_bandwidth = min(min_bandwidth, conn.bandwidth_gbps)
                        break
        if min_bandwidth == float('inf'):
            # 未找到任何连接信息时按 PCIe 带宽退化估算, 避免除以无穷
            min_bandwidth = self.BANDWIDTH[ConnectionType.PCIE]
        # Ring AllReduce 通信量: 2 * (n-1)/n * data_size
        n = len(gpu_ids)
if pattern == "all_reduce":
effective_data = 2 * (n - 1) / n * data_size_mb
elif pattern == "all_gather":
effective_data = (n - 1) / n * data_size_mb
else:
effective_data = data_size_mb
# 转换为 GB
effective_data_gb = effective_data / 1024
# 计算时间 (秒)
time_s = effective_data_gb / min_bandwidth
return time_s * 1000 # 返回毫秒
def print_topology_matrix(self):
"""打印拓扑矩阵"""
print("\nGPU Topology Matrix:")
print("=" * 60)
# 表头
header = " " + " ".join(f"GPU{i:2d}" for i in range(self.gpu_count))
print(header)
# 每行
for i in range(self.gpu_count):
row = f"GPU{i:2d}"
for j in range(self.gpu_count):
if i == j:
row += " X "
else:
# 找连接类型
conn_type = "?"
for conn in self.connections:
if conn.gpu_a == i and conn.gpu_b == j:
conn_type = conn.connection_type.value[:3]
break
row += f" {conn_type:>4}"
print(row)
print("=" * 60)
    def export_to_kubernetes_config(self) -> Dict:
        """导出为 Kubernetes ConfigMap (data 字段的值必须是字符串, 因此序列化为 JSON)"""
        topology = {
            "gpu_count": self.gpu_count,
            "numa_nodes": [
                {
                    "node_id": n.node_id,
                    "gpus": n.gpus,
                    "cpus": n.cpus[:8] + ["..."]  # 简化
                }
                for n in self.numa_nodes
            ],
            "connections": [
                {
                    "from": c.gpu_a,
                    "to": c.gpu_b,
                    "type": c.connection_type.value,
                    "bandwidth_gbps": c.bandwidth_gbps
                }
                for c in self.connections[:16]  # 简化
            ]
        }
        return {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": "gpu-topology-config"
            },
            "data": {
                "topology.json": json.dumps(topology, indent=2)
            }
        }
# 使用示例
if __name__ == "__main__":
analyzer = TopologyAnalyzer()
analyzer.discover_topology()
print(f"Discovered {analyzer.gpu_count} GPUs")
analyzer.print_topology_matrix()
# 获取最优放置
optimal_4gpu = analyzer.get_optimal_placement(4)
print(f"\nOptimal 4-GPU placement: {optimal_4gpu}")
# 获取分组
groups = analyzer.get_gpu_groups(2)
print(f"GPU groups (size=2): {groups}")
# 估算通信时间
comm_time = analyzer.estimate_communication_time(
gpu_ids=list(range(8)),
data_size_mb=100,
pattern="all_reduce"
)
print(f"Estimated AllReduce time for 100MB: {comm_time:.2f}ms")
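以上面的用例为例,可以手算验证 Ring AllReduce 的估算:8 卡对 100 MB 数据做 AllReduce 时,有效通信量为 2 × (8−1)/8 × 100 MB = 175 MB ≈ 0.171 GB;若瓶颈带宽取 NVSwitch 的 600 GB/s,则耗时约 0.171 ÷ 600 ≈ 0.28 ms,与 estimate_communication_time 的输出一致。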
Kubernetes 拓扑感知调度
调度器扩展
"""
Kubernetes 拓扑感知调度器扩展
"""
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
import json
import logging
@dataclass
class Node:
"""Kubernetes 节点"""
name: str
gpu_count: int
gpu_topology: Dict
numa_topology: Dict
allocatable_gpus: Set[int]
labels: Dict[str, str]
@dataclass
class Pod:
"""Pod 请求"""
name: str
namespace: str
gpu_request: int
topology_policy: str # none, single-numa, best-effort, restricted
anti_affinity_pods: List[str]
preferred_node: Optional[str]
@dataclass
class SchedulingResult:
"""调度结果"""
node_name: str
gpu_ids: List[int]
numa_node: Optional[int]
score: float
class TopologyAwareScheduler:
"""拓扑感知调度器"""
def __init__(self):
self.nodes: Dict[str, Node] = {}
self.logger = logging.getLogger(__name__)
def register_node(self, node: Node):
"""注册节点"""
self.nodes[node.name] = node
def filter_nodes(self, pod: Pod) -> List[Node]:
"""过滤符合条件的节点"""
filtered = []
for node in self.nodes.values():
# 检查 GPU 数量
if len(node.allocatable_gpus) < pod.gpu_request:
continue
# 检查拓扑策略
if pod.topology_policy == "single-numa":
# 必须在单个 NUMA 节点内
if not self._can_fit_single_numa(node, pod.gpu_request):
continue
# 检查反亲和性
if self._has_anti_affinity_conflict(node, pod):
continue
filtered.append(node)
return filtered
def _can_fit_single_numa(self, node: Node, gpu_count: int) -> bool:
"""检查是否可以在单个 NUMA 节点内放置"""
numa_topology = node.numa_topology
for numa_id, numa_gpus in numa_topology.items():
available = [g for g in numa_gpus if g in node.allocatable_gpus]
if len(available) >= gpu_count:
return True
return False
def _has_anti_affinity_conflict(self, node: Node, pod: Pod) -> bool:
"""检查反亲和性冲突"""
# 简化实现:检查节点标签
return False
def score_nodes(
self,
pod: Pod,
filtered_nodes: List[Node]
) -> List[SchedulingResult]:
"""对节点评分"""
results = []
for node in filtered_nodes:
# 计算最优 GPU 放置
gpu_ids, numa_node = self._find_optimal_gpus(node, pod)
if gpu_ids is None:
continue
# 计算得分
score = self._calculate_score(node, pod, gpu_ids, numa_node)
results.append(SchedulingResult(
node_name=node.name,
gpu_ids=gpu_ids,
numa_node=numa_node,
score=score
))
# 按得分排序
results.sort(key=lambda r: r.score, reverse=True)
return results
def _find_optimal_gpus(
self,
node: Node,
pod: Pod
) -> tuple:
"""找到最优 GPU 组合"""
gpu_count = pod.gpu_request
allocatable = list(node.allocatable_gpus)
if len(allocatable) < gpu_count:
return None, None
# 根据策略选择
if pod.topology_policy == "single-numa":
return self._find_single_numa_gpus(node, gpu_count)
elif pod.topology_policy == "best-effort":
return self._find_best_effort_gpus(node, gpu_count)
else:
# none 策略:简单选择
return allocatable[:gpu_count], None
def _find_single_numa_gpus(
self,
node: Node,
gpu_count: int
) -> tuple:
"""在单个 NUMA 节点内查找 GPU"""
        numa_topology = node.numa_topology
        best_numa = None
        best_gpus = None
        best_available = None
        for numa_id, numa_gpus in numa_topology.items():
            available = [g for g in numa_gpus if g in node.allocatable_gpus]
            if len(available) >= gpu_count:
                # 选择可用 GPU 数量最少但仍放得下的 NUMA 节点 (best-fit, 减少碎片)
                if best_available is None or len(available) < best_available:
                    best_available = len(available)
                    best_numa = numa_id
                    best_gpus = available[:gpu_count]
        return best_gpus, best_numa
def _find_best_effort_gpus(
self,
node: Node,
gpu_count: int
) -> tuple:
"""尽力而为查找最优 GPU"""
# 使用拓扑分析器找到最优组合
topology = node.gpu_topology
allocatable = list(node.allocatable_gpus)
if len(allocatable) <= gpu_count:
return allocatable, None
# 贪心算法选择通信开销最小的 GPU
distance_matrix = topology.get("distance_matrix", {})
selected = [allocatable[0]]
while len(selected) < gpu_count:
best_gpu = None
best_cost = float('inf')
for gpu in allocatable:
if gpu in selected:
continue
# 计算到已选 GPU 的总距离
cost = sum(
distance_matrix.get(f"{gpu}-{s}", 1)
for s in selected
)
if cost < best_cost:
best_cost = cost
best_gpu = gpu
if best_gpu is not None:
selected.append(best_gpu)
else:
break
# 确定主要 NUMA 节点
numa_topology = node.numa_topology
numa_counts = {}
for numa_id, numa_gpus in numa_topology.items():
count = len([g for g in selected if g in numa_gpus])
numa_counts[numa_id] = count
primary_numa = max(numa_counts, key=numa_counts.get) if numa_counts else None
return selected, primary_numa
def _calculate_score(
self,
node: Node,
pod: Pod,
gpu_ids: List[int],
numa_node: Optional[int]
) -> float:
"""计算调度得分"""
        # 各分项按权重累加, 总分约在 0-100 之间 (与下方各分项注释的取值范围一致)
        score = 0.0
# 1. 拓扑亲和性得分 (0-40)
topology_score = self._topology_affinity_score(node, gpu_ids)
score += topology_score * 0.4
# 2. NUMA 亲和性得分 (0-30)
numa_score = self._numa_affinity_score(node, gpu_ids, numa_node)
score += numa_score * 0.3
# 3. 资源均衡得分 (0-20)
balance_score = self._resource_balance_score(node, gpu_ids)
score += balance_score * 0.2
# 4. 节点偏好得分 (0-10)
if pod.preferred_node == node.name:
score += 10
return score
def _topology_affinity_score(
self,
node: Node,
gpu_ids: List[int]
) -> float:
"""拓扑亲和性得分"""
if len(gpu_ids) < 2:
return 100.0
topology = node.gpu_topology
connections = topology.get("connections", [])
# 计算平均连接带宽
total_bandwidth = 0
count = 0
for i in range(len(gpu_ids)):
for j in range(i + 1, len(gpu_ids)):
for conn in connections:
if (conn["from"] == gpu_ids[i] and conn["to"] == gpu_ids[j]) or \
(conn["from"] == gpu_ids[j] and conn["to"] == gpu_ids[i]):
total_bandwidth += conn.get("bandwidth_gbps", 0)
count += 1
break
if count == 0:
return 50.0
avg_bandwidth = total_bandwidth / count
# 归一化到 0-100
max_bandwidth = 600 # NVLink
return min(100.0, (avg_bandwidth / max_bandwidth) * 100)
def _numa_affinity_score(
self,
node: Node,
gpu_ids: List[int],
numa_node: Optional[int]
) -> float:
"""NUMA 亲和性得分"""
if numa_node is None:
return 50.0
numa_topology = node.numa_topology
numa_gpus = numa_topology.get(numa_node, [])
# 计算 GPU 在同一 NUMA 节点的比例
same_numa_count = len([g for g in gpu_ids if g in numa_gpus])
ratio = same_numa_count / len(gpu_ids)
return ratio * 100
def _resource_balance_score(
self,
node: Node,
gpu_ids: List[int]
) -> float:
"""资源均衡得分"""
# 优先选择空闲资源较多的节点
remaining = len(node.allocatable_gpus) - len(gpu_ids)
total = node.gpu_count
# 剩余资源比例
ratio = remaining / total
# 适度利用得分更高 (避免碎片)
if ratio > 0.5:
return (1 - ratio) * 100
else:
return ratio * 100
def schedule(self, pod: Pod) -> Optional[SchedulingResult]:
"""执行调度"""
# 1. 过滤节点
filtered = self.filter_nodes(pod)
if not filtered:
self.logger.warning(f"No nodes available for pod {pod.name}")
return None
# 2. 评分
results = self.score_nodes(pod, filtered)
if not results:
return None
# 3. 选择最高分节点
best = results[0]
self.logger.info(
f"Scheduled pod {pod.name} to node {best.node_name} "
f"with GPUs {best.gpu_ids} (score: {best.score:.2f})"
)
return best
# Kubernetes 调度器扩展点实现
class KubernetesSchedulerExtender:
"""Kubernetes 调度器扩展器"""
def __init__(self, scheduler: TopologyAwareScheduler):
self.scheduler = scheduler
def filter(self, request: Dict) -> Dict:
"""Filter 扩展点"""
pod_info = request.get("Pod", {})
nodes = request.get("Nodes", {}).get("Items", [])
# 解析 Pod
pod = self._parse_pod(pod_info)
# 获取可用节点列表
node_names = [n["metadata"]["name"] for n in nodes]
# 过滤
filtered_nodes = self.scheduler.filter_nodes(pod)
filtered_names = [n.name for n in filtered_nodes if n.name in node_names]
return {
"Nodes": {
"Items": [n for n in nodes if n["metadata"]["name"] in filtered_names]
}
}
def prioritize(self, request: Dict) -> List[Dict]:
"""Prioritize 扩展点"""
pod_info = request.get("Pod", {})
nodes = request.get("Nodes", {}).get("Items", [])
pod = self._parse_pod(pod_info)
# 获取节点对象
node_objs = [self.scheduler.nodes.get(n["metadata"]["name"])
for n in nodes
if n["metadata"]["name"] in self.scheduler.nodes]
node_objs = [n for n in node_objs if n is not None]
# 评分
results = self.scheduler.score_nodes(pod, node_objs)
# 转换为 Kubernetes 格式 (0-10)
priorities = []
for result in results:
priorities.append({
"Host": result.node_name,
"Score": int(result.score / 10) # 归一化到 0-10
})
return priorities
def bind(self, request: Dict) -> Dict:
"""Bind 扩展点"""
pod_info = request.get("Pod", {})
node_name = request.get("Node", "")
pod = self._parse_pod(pod_info)
node = self.scheduler.nodes.get(node_name)
if not node:
return {"Error": f"Node {node_name} not found"}
# 执行调度
gpu_ids, numa_node = self.scheduler._find_optimal_gpus(node, pod)
if gpu_ids is None:
return {"Error": "No suitable GPUs found"}
# 生成绑定结果
        binding = {
            "apiVersion": "v1",
            "kind": "Binding",
            "metadata": {
                "name": pod.name,
                "namespace": pod.namespace,
                "annotations": {
                    "scheduling.k8s.io/gpu-ids": ",".join(map(str, gpu_ids)),
                    # numa_node 可能为 0, 必须用 is not None 判断, 否则 NUMA 0 会被写成空串
                    "scheduling.k8s.io/numa-node": str(numa_node) if numa_node is not None else ""
                }
},
"target": {
"apiVersion": "v1",
"kind": "Node",
"name": node_name
}
}
# 更新节点可用 GPU
node.allocatable_gpus -= set(gpu_ids)
return binding
def _parse_pod(self, pod_info: Dict) -> Pod:
"""解析 Pod 信息"""
metadata = pod_info.get("metadata", {})
spec = pod_info.get("spec", {})
annotations = metadata.get("annotations", {})
# 获取 GPU 请求
gpu_request = 0
for container in spec.get("containers", []):
resources = container.get("resources", {})
limits = resources.get("limits", {})
gpu_request += int(limits.get("nvidia.com/gpu", 0))
# 获取拓扑策略
topology_policy = annotations.get(
"scheduling.k8s.io/topology-policy",
"best-effort"
)
return Pod(
name=metadata.get("name", ""),
namespace=metadata.get("namespace", "default"),
gpu_request=gpu_request,
topology_policy=topology_policy,
anti_affinity_pods=[],
preferred_node=None
)
# 使用示例
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# 创建调度器
scheduler = TopologyAwareScheduler()
# 注册节点
node1 = Node(
name="gpu-node-1",
gpu_count=8,
gpu_topology={
"connections": [
{"from": 0, "to": 1, "type": "NVSwitch", "bandwidth_gbps": 600},
{"from": 0, "to": 2, "type": "NVSwitch", "bandwidth_gbps": 600},
{"from": 0, "to": 3, "type": "NVSwitch", "bandwidth_gbps": 600},
# ... 更多连接
]
},
numa_topology={
0: [0, 1, 2, 3],
1: [4, 5, 6, 7]
},
allocatable_gpus={0, 1, 2, 3, 4, 5, 6, 7},
labels={"accelerator": "a100"}
)
scheduler.register_node(node1)
# 调度 Pod
pod = Pod(
name="training-job",
namespace="ml",
gpu_request=4,
topology_policy="single-numa",
anti_affinity_pods=[],
preferred_node=None
)
result = scheduler.schedule(pod)
if result:
print(f"Scheduled to: {result.node_name}")
print(f"GPU IDs: {result.gpu_ids}")
print(f"NUMA Node: {result.numa_node}")
print(f"Score: {result.score}")
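上面的 KubernetesSchedulerExtender 需要以 HTTP 服务的形式暴露给 kube-scheduler,才能与下文 extender 配置中的 urlPrefix 对接。下面给出一个最小示意,假设使用 Flask(额外依赖)暴露 /filter、/prioritize、/bind 三个端点;端口与路径仅为示例,需与实际的 extender 配置保持一致:

# 最小示意: 用 Flask 将上述扩展器暴露为 HTTP 服务 (flask 为假设的额外依赖)
from flask import Flask, request, jsonify

app = Flask(__name__)
scheduler = TopologyAwareScheduler()          # 节点注册逻辑同上文示例, 此处省略
extender = KubernetesSchedulerExtender(scheduler)

@app.route("/filter", methods=["POST"])
def handle_filter():
    # kube-scheduler 以 POST JSON 的方式调用各扩展点
    return jsonify(extender.filter(request.get_json()))

@app.route("/prioritize", methods=["POST"])
def handle_prioritize():
    return jsonify(extender.prioritize(request.get_json()))

@app.route("/bind", methods=["POST"])
def handle_bind():
    return jsonify(extender.bind(request.get_json()))

if __name__ == "__main__":
    # 与下文 extender 配置中的 urlPrefix (http://topology-scheduler-extender:8888) 对应
    app.run(host="0.0.0.0", port=8888)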
Kubernetes 配置示例
# GPU 拓扑调度器配置
---
apiVersion: v1
kind: ConfigMap
metadata:
name: topology-scheduler-config
namespace: kube-system
data:
scheduler-config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
- schedulerName: topology-aware-scheduler
plugins:
filter:
enabled:
- name: NodeResourcesFit
- name: TopologyAwareFilter
score:
enabled:
- name: NodeResourcesBalancedAllocation
weight: 1
- name: TopologyAwareScore
weight: 5
- name: NUMAAffinityScore
weight: 3
pluginConfig:
- name: TopologyAwareFilter
args:
# 拓扑策略支持
supportedPolicies:
- none
- single-numa
- best-effort
- restricted
- name: TopologyAwareScore
args:
# 评分权重
bandwidthWeight: 0.4
latencyWeight: 0.3
fragmentationWeight: 0.3
- name: NUMAAffinityScore
args:
# NUMA 亲和性评分
sameNUMABonus: 50
crossNUMAPenalty: 20
---
# 调度器扩展器配置
apiVersion: v1
kind: ConfigMap
metadata:
name: scheduler-extender-config
namespace: kube-system
data:
extender-config.yaml: |
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
extenders:
- urlPrefix: "http://topology-scheduler-extender:8888"
filterVerb: "filter"
prioritizeVerb: "prioritize"
bindVerb: "bind"
weight: 10
enableHTTPS: false
nodeCacheCapable: true
managedResources:
- name: nvidia.com/gpu
ignoredByScheduler: false
- name: huawei.com/Ascend910B
ignoredByScheduler: false
ignorable: false
---
# 拓扑感知调度的 Pod 示例
apiVersion: v1
kind: Pod
metadata:
name: distributed-training
namespace: ml-jobs
annotations:
# 拓扑策略
scheduling.k8s.io/topology-policy: "single-numa"
# NUMA 亲和性
scheduling.k8s.io/numa-affinity: "required"
spec:
schedulerName: topology-aware-scheduler
containers:
- name: trainer
image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
resources:
limits:
nvidia.com/gpu: 4
memory: "128Gi"
requests:
nvidia.com/gpu: 4
memory: "128Gi"
env:
- name: CUDA_VISIBLE_DEVICES
valueFrom:
fieldRef:
fieldPath: metadata.annotations['scheduling.k8s.io/gpu-ids']
nodeSelector:
accelerator: nvidia-a100
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
---
# 节点拓扑标签
# 由节点代理自动发现和更新
apiVersion: v1
kind: Node
metadata:
name: gpu-node-1
labels:
# 基础标签
accelerator: nvidia-a100
gpu-count: "8"
# 拓扑标签
topology.kubernetes.io/gpu-interconnect: nvswitch
topology.kubernetes.io/numa-nodes: "2"
topology.kubernetes.io/gpus-per-numa: "4"
# 能力标签
topology.kubernetes.io/single-numa-4gpu: "true"
topology.kubernetes.io/single-numa-8gpu: "false"
annotations:
# 详细拓扑信息
topology.kubernetes.io/gpu-topology: |
{
"gpus": [
{"id": 0, "numa": 0, "pcie": "0000:3b:00.0"},
{"id": 1, "numa": 0, "pcie": "0000:86:00.0"},
{"id": 2, "numa": 0, "pcie": "0000:af:00.0"},
{"id": 3, "numa": 0, "pcie": "0000:d8:00.0"},
{"id": 4, "numa": 1, "pcie": "0000:3c:00.0"},
{"id": 5, "numa": 1, "pcie": "0000:87:00.0"},
{"id": 6, "numa": 1, "pcie": "0000:b0:00.0"},
{"id": 7, "numa": 1, "pcie": "0000:d9:00.0"}
],
"nvlinks": [
{"from": 0, "to": 1, "bandwidth_gbps": 600},
{"from": 0, "to": 2, "bandwidth_gbps": 600}
]
}
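上述拓扑标签与注解通常由节点上的代理(例如基于 GPU Feature Discovery 的扩展)自动发现并写回 Node 对象。下面是一个简化示意,假设复用前文的 TopologyAnalyzer 生成标签字典;build_topology_labels 为本文虚构的函数名,写回 Node 的步骤(kubectl 或 Kubernetes API)在此省略:

# 简化示意: 节点代理根据本机拓扑生成 Node 标签 (复用前文的 TopologyAnalyzer)
def build_topology_labels(analyzer: "TopologyAnalyzer") -> dict:
    numa_count = max(len(analyzer.numa_nodes), 1)
    return {
        "gpu-count": str(analyzer.gpu_count),
        "topology.kubernetes.io/numa-nodes": str(numa_count),
        "topology.kubernetes.io/gpus-per-numa": str(analyzer.gpu_count // numa_count),
        # 是否存在能放下 4 卡的单个 NUMA 节点
        "topology.kubernetes.io/single-numa-4gpu": str(
            any(len(n.gpus) >= 4 for n in analyzer.numa_nodes)
        ).lower(),
    }

analyzer = TopologyAnalyzer()
analyzer.discover_topology()
print(build_topology_labels(analyzer))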
跨节点拓扑优化
集群级拓扑分析
"""
集群级拓扑分析与优化
"""
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import dataclass, field
import networkx as nx
import numpy as np
@dataclass
class ClusterNode:
"""集群节点"""
name: str
rack: str
switch: str
gpu_count: int
network_bandwidth_gbps: float # 网络带宽
@dataclass
class NetworkLink:
"""网络链路"""
node_a: str
node_b: str
bandwidth_gbps: float
latency_ms: float
hops: int
class ClusterTopologyAnalyzer:
"""集群拓扑分析器"""
def __init__(self):
self.nodes: Dict[str, ClusterNode] = {}
self.links: List[NetworkLink] = []
self.graph = nx.Graph()
def add_node(self, node: ClusterNode):
"""添加节点"""
self.nodes[node.name] = node
self.graph.add_node(node.name, **{
"rack": node.rack,
"switch": node.switch,
"gpu_count": node.gpu_count
})
def add_link(self, link: NetworkLink):
"""添加链路"""
self.links.append(link)
self.graph.add_edge(
link.node_a,
link.node_b,
bandwidth=link.bandwidth_gbps,
latency=link.latency_ms,
hops=link.hops
)
def get_network_distance(self, node_a: str, node_b: str) -> float:
"""获取网络距离"""
if node_a == node_b:
return 0
if not self.graph.has_edge(node_a, node_b):
# 计算最短路径
try:
path = nx.shortest_path(
self.graph, node_a, node_b,
weight='latency'
)
# 累计延迟
total_latency = 0
for i in range(len(path) - 1):
edge_data = self.graph.edges[path[i], path[i+1]]
total_latency += edge_data.get('latency', 1)
return total_latency
except nx.NetworkXNoPath:
return float('inf')
edge_data = self.graph.edges[node_a, node_b]
return edge_data.get('latency', 1)
def get_bandwidth(self, node_a: str, node_b: str) -> float:
"""获取节点间带宽"""
if node_a == node_b:
return float('inf')
if not self.graph.has_edge(node_a, node_b):
try:
path = nx.shortest_path(self.graph, node_a, node_b)
# 瓶颈带宽
min_bandwidth = float('inf')
for i in range(len(path) - 1):
edge_data = self.graph.edges[path[i], path[i+1]]
min_bandwidth = min(min_bandwidth, edge_data.get('bandwidth', 0))
return min_bandwidth
except nx.NetworkXNoPath:
return 0
edge_data = self.graph.edges[node_a, node_b]
return edge_data.get('bandwidth', 0)
def find_optimal_node_set(
self,
node_count: int,
gpu_per_node: int,
locality_preference: str = "rack" # rack, switch, any
) -> List[str]:
"""
找到最优节点集合
Args:
node_count: 需要的节点数
gpu_per_node: 每节点 GPU 数
locality_preference: 局部性偏好
"""
# 过滤满足 GPU 要求的节点
candidates = [
name for name, node in self.nodes.items()
if node.gpu_count >= gpu_per_node
]
if len(candidates) < node_count:
raise ValueError(f"Not enough nodes with {gpu_per_node} GPUs")
if locality_preference == "rack":
return self._find_same_rack_nodes(candidates, node_count)
elif locality_preference == "switch":
return self._find_same_switch_nodes(candidates, node_count)
else:
return self._find_lowest_latency_nodes(candidates, node_count)
def _find_same_rack_nodes(
self,
candidates: List[str],
count: int
) -> List[str]:
"""在同一机架内查找节点"""
# 按机架分组
rack_nodes: Dict[str, List[str]] = {}
for name in candidates:
rack = self.nodes[name].rack
if rack not in rack_nodes:
rack_nodes[rack] = []
rack_nodes[rack].append(name)
# 找最大的机架
for rack, nodes in sorted(rack_nodes.items(), key=lambda x: -len(x[1])):
if len(nodes) >= count:
return nodes[:count]
# 没有单一机架满足,使用最近的机架组合
return self._find_lowest_latency_nodes(candidates, count)
def _find_same_switch_nodes(
self,
candidates: List[str],
count: int
) -> List[str]:
"""在同一交换机下查找节点"""
switch_nodes: Dict[str, List[str]] = {}
for name in candidates:
switch = self.nodes[name].switch
if switch not in switch_nodes:
switch_nodes[switch] = []
switch_nodes[switch].append(name)
for switch, nodes in sorted(switch_nodes.items(), key=lambda x: -len(x[1])):
if len(nodes) >= count:
return nodes[:count]
return self._find_lowest_latency_nodes(candidates, count)
def _find_lowest_latency_nodes(
self,
candidates: List[str],
count: int
) -> List[str]:
"""找到延迟最低的节点组合"""
if len(candidates) <= count:
return candidates
# 使用贪心算法
selected = [candidates[0]]
while len(selected) < count:
best_node = None
best_total_latency = float('inf')
for node in candidates:
if node in selected:
continue
# 计算到已选节点的总延迟
total_latency = sum(
self.get_network_distance(node, s)
for s in selected
)
if total_latency < best_total_latency:
best_total_latency = total_latency
best_node = node
if best_node:
selected.append(best_node)
else:
break
return selected
def estimate_allreduce_time(
self,
nodes: List[str],
data_size_mb: float,
algorithm: str = "ring"
) -> float:
"""
估算 AllReduce 时间
Args:
nodes: 参与的节点
data_size_mb: 数据大小
algorithm: ring | tree | recursive_halving
"""
if len(nodes) < 2:
return 0
n = len(nodes)
data_size_gb = data_size_mb / 1024
if algorithm == "ring":
# Ring AllReduce: 2 * (n-1) * data_size / (n * bandwidth)
# 找瓶颈带宽
min_bandwidth = float('inf')
for i in range(len(nodes)):
j = (i + 1) % len(nodes)
bw = self.get_bandwidth(nodes[i], nodes[j])
min_bandwidth = min(min_bandwidth, bw)
if min_bandwidth == 0:
return float('inf')
time_s = 2 * (n - 1) * data_size_gb / (n * min_bandwidth)
return time_s * 1000 # ms
elif algorithm == "tree":
# 树形 AllReduce: 2 * log2(n) * data_size / bandwidth
import math
depth = math.ceil(math.log2(n))
            avg_bandwidth = np.mean([
                self.get_bandwidth(nodes[i], nodes[j])
                for i in range(len(nodes))
                for j in range(i + 1, len(nodes))
            ])
            if avg_bandwidth == 0:
                return float('inf')
            time_s = 2 * depth * data_size_gb / avg_bandwidth
            return time_s * 1000
else:
# 简单估算
avg_latency = np.mean([
self.get_network_distance(nodes[i], nodes[j])
for i in range(len(nodes))
for j in range(i + 1, len(nodes))
])
return avg_latency * n
def visualize_topology(self) -> str:
"""可视化拓扑结构"""
lines = ["Cluster Topology:"]
lines.append("=" * 50)
# 按机架分组
racks: Dict[str, List[str]] = {}
for name, node in self.nodes.items():
if node.rack not in racks:
racks[node.rack] = []
racks[node.rack].append(name)
for rack, nodes in sorted(racks.items()):
lines.append(f"\nRack: {rack}")
lines.append("-" * 30)
for node_name in nodes:
node = self.nodes[node_name]
lines.append(
f" {node_name}: {node.gpu_count} GPUs, "
f"Switch: {node.switch}"
)
return "\n".join(lines)
class JobPlacement:
"""作业放置优化"""
def __init__(self, cluster: ClusterTopologyAnalyzer):
self.cluster = cluster
def optimize_placement(
self,
job_requirements: Dict
) -> Dict:
"""
优化作业放置
Args:
job_requirements: {
"total_gpus": 32,
"gpus_per_node": 8,
"communication_heavy": True,
"data_locality": "path/to/data"
}
"""
total_gpus = job_requirements.get("total_gpus", 8)
gpus_per_node = job_requirements.get("gpus_per_node", 8)
communication_heavy = job_requirements.get("communication_heavy", False)
node_count = (total_gpus + gpus_per_node - 1) // gpus_per_node
# 选择局部性策略
if communication_heavy:
locality = "switch" # 通信密集型优先同交换机
else:
locality = "rack"
# 找最优节点
nodes = self.cluster.find_optimal_node_set(
node_count=node_count,
gpu_per_node=gpus_per_node,
locality_preference=locality
)
# 估算通信时间
comm_time = self.cluster.estimate_allreduce_time(
nodes=nodes,
data_size_mb=100,
algorithm="ring"
)
return {
"nodes": nodes,
"node_count": len(nodes),
"gpus_per_node": gpus_per_node,
"total_gpus": len(nodes) * gpus_per_node,
"estimated_allreduce_100mb_ms": comm_time,
"placement_quality": self._evaluate_placement(nodes)
}
def _evaluate_placement(self, nodes: List[str]) -> str:
"""评估放置质量"""
if len(nodes) < 2:
return "optimal"
# 检查是否在同一机架
racks = set(self.cluster.nodes[n].rack for n in nodes)
if len(racks) == 1:
return "optimal (same rack)"
# 检查是否在同一交换机
switches = set(self.cluster.nodes[n].switch for n in nodes)
if len(switches) == 1:
return "good (same switch)"
if len(racks) <= 2:
return "acceptable (adjacent racks)"
return "suboptimal (distributed)"
# 生成示例集群拓扑
def create_sample_cluster() -> ClusterTopologyAnalyzer:
"""创建示例集群"""
cluster = ClusterTopologyAnalyzer()
# 添加节点 (2 个机架,每个机架 4 个节点)
for rack_id in range(2):
for node_id in range(4):
name = f"node-{rack_id}-{node_id}"
cluster.add_node(ClusterNode(
name=name,
rack=f"rack-{rack_id}",
switch=f"tor-{rack_id}",
gpu_count=8,
network_bandwidth_gbps=100
))
# 添加链路
# 同机架内 (低延迟)
for rack_id in range(2):
for i in range(4):
for j in range(i + 1, 4):
node_a = f"node-{rack_id}-{i}"
node_b = f"node-{rack_id}-{j}"
cluster.add_link(NetworkLink(
node_a=node_a,
node_b=node_b,
bandwidth_gbps=100,
latency_ms=0.1,
hops=1
))
# 跨机架 (高延迟)
for i in range(4):
for j in range(4):
node_a = f"node-0-{i}"
node_b = f"node-1-{j}"
cluster.add_link(NetworkLink(
node_a=node_a,
node_b=node_b,
bandwidth_gbps=50,
latency_ms=0.5,
hops=3
))
return cluster
# 使用示例
if __name__ == "__main__":
# 创建集群
cluster = create_sample_cluster()
print(cluster.visualize_topology())
# 作业放置优化
placement = JobPlacement(cluster)
# 32 GPU 通信密集型作业
result = placement.optimize_placement({
"total_gpus": 32,
"gpus_per_node": 8,
"communication_heavy": True
})
print("\n" + "=" * 50)
print("Job Placement Result:")
print(f"Nodes: {result['nodes']}")
print(f"Total GPUs: {result['total_gpus']}")
print(f"AllReduce Time (100MB): {result['estimated_allreduce_100mb_ms']:.2f}ms")
print(f"Quality: {result['placement_quality']}")
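以上面的示例集群验证估算:通信密集型作业会被放到同一机架(tor-0)下的 4 个节点,链路瓶颈带宽 100 GB/s,Ring AllReduce 100 MB 的耗时约为 2 × (4−1) × (100/1024) GB ÷ (4 × 100 GB/s) ≈ 1.5 ms;若这 4 个节点被拆到两个机架,瓶颈带宽降为 50 GB/s,耗时约翻倍到 2.9 ms 左右,这正是局部性优先策略想要避免的退化。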
NUMA 亲和性优化
NUMA 感知内存管理
"""
NUMA 感知内存管理
"""
import os
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
import subprocess
@dataclass
class NUMAInfo:
"""NUMA 信息"""
node_id: int
cpus: List[int]
memory_total_mb: int
memory_free_mb: int
gpus: List[int]
distances: Dict[int, int] # 到其他 NUMA 节点的距离
class NUMAManager:
"""NUMA 管理器"""
def __init__(self):
self.numa_nodes: Dict[int, NUMAInfo] = {}
self._discover_numa_topology()
def _discover_numa_topology(self):
"""发现 NUMA 拓扑"""
try:
# 获取 NUMA 节点数
result = subprocess.run(
["numactl", "--hardware"],
capture_output=True, text=True
)
if result.returncode == 0:
self._parse_numactl_output(result.stdout)
else:
# 模拟配置
self._simulate_numa_config()
except FileNotFoundError:
self._simulate_numa_config()
# 发现 GPU 到 NUMA 的映射
self._discover_gpu_numa_mapping()
def _parse_numactl_output(self, output: str):
"""解析 numactl 输出"""
lines = output.strip().split('\n')
current_node = None
for line in lines:
if line.startswith("node"):
parts = line.split()
if "cpus:" in line:
node_id = int(parts[1])
cpus = [int(c) for c in parts[3:]]
if node_id not in self.numa_nodes:
self.numa_nodes[node_id] = NUMAInfo(
node_id=node_id,
cpus=cpus,
memory_total_mb=0,
memory_free_mb=0,
gpus=[],
distances={}
)
else:
self.numa_nodes[node_id].cpus = cpus
elif "size:" in line:
node_id = int(parts[1])
size_mb = int(parts[3])
if node_id in self.numa_nodes:
self.numa_nodes[node_id].memory_total_mb = size_mb
elif "free:" in line:
node_id = int(parts[1])
free_mb = int(parts[3])
if node_id in self.numa_nodes:
self.numa_nodes[node_id].memory_free_mb = free_mb
elif "node distances:" in line.lower():
# 解析距离矩阵
pass
def _simulate_numa_config(self):
"""模拟 NUMA 配置"""
# 2 个 NUMA 节点的典型配置
self.numa_nodes = {
0: NUMAInfo(
node_id=0,
cpus=list(range(0, 64)),
memory_total_mb=512 * 1024,
memory_free_mb=400 * 1024,
gpus=[0, 1, 2, 3],
distances={0: 10, 1: 21}
),
1: NUMAInfo(
node_id=1,
cpus=list(range(64, 128)),
memory_total_mb=512 * 1024,
memory_free_mb=400 * 1024,
gpus=[4, 5, 6, 7],
distances={0: 21, 1: 10}
)
}
def _discover_gpu_numa_mapping(self):
"""发现 GPU 到 NUMA 的映射"""
try:
result = subprocess.run(
["nvidia-smi", "topo", "-m"],
capture_output=True, text=True
)
if result.returncode == 0:
# 解析 NUMA 亲和性
pass
except FileNotFoundError:
pass
def get_gpu_numa_node(self, gpu_id: int) -> Optional[int]:
"""获取 GPU 所属的 NUMA 节点"""
for node_id, numa_info in self.numa_nodes.items():
if gpu_id in numa_info.gpus:
return node_id
return None
def get_optimal_cpus(
self,
gpu_ids: List[int],
cpu_count: int
) -> List[int]:
"""
获取与 GPU 亲和的最优 CPU 列表
优先选择与 GPU 同一 NUMA 节点的 CPU
"""
# 统计 GPU 在各 NUMA 节点的分布
numa_gpu_count: Dict[int, int] = {}
for gpu_id in gpu_ids:
numa_node = self.get_gpu_numa_node(gpu_id)
if numa_node is not None:
numa_gpu_count[numa_node] = numa_gpu_count.get(numa_node, 0) + 1
# 按 GPU 数量排序 NUMA 节点
sorted_numas = sorted(
numa_gpu_count.items(),
key=lambda x: -x[1]
)
selected_cpus = []
for numa_id, _ in sorted_numas:
numa_info = self.numa_nodes.get(numa_id)
if numa_info:
available = [c for c in numa_info.cpus if c not in selected_cpus]
need = cpu_count - len(selected_cpus)
selected_cpus.extend(available[:need])
if len(selected_cpus) >= cpu_count:
break
# 如果还不够,从其他 NUMA 节点补充
if len(selected_cpus) < cpu_count:
for numa_id, numa_info in self.numa_nodes.items():
if numa_id in numa_gpu_count:
continue
available = [c for c in numa_info.cpus if c not in selected_cpus]
need = cpu_count - len(selected_cpus)
selected_cpus.extend(available[:need])
if len(selected_cpus) >= cpu_count:
break
return selected_cpus[:cpu_count]
    def generate_cpu_affinity_command(
        self,
        gpu_ids: List[int],
        command: List[str],
        cpu_count: int = 16
    ) -> List[str]:
        """生成带 CPU/内存亲和性的 numactl 启动命令"""
        cpus = self.get_optimal_cpus(gpu_ids, cpu_count)
        cpu_list = ",".join(map(str, cpus))
        numa_nodes = self._get_numa_nodes_for_cpus(cpus)
        return [
            "numactl",
            f"--physcpubind={cpu_list}",  # 绑定到与 GPU 亲和的具体 CPU 核
            f"--membind={numa_nodes}",    # 内存分配绑定到对应 NUMA 节点
            "--"
        ] + command
def _get_numa_nodes_for_cpus(self, cpus: List[int]) -> str:
"""获取 CPU 所属的 NUMA 节点"""
numa_nodes = set()
for cpu in cpus:
for node_id, numa_info in self.numa_nodes.items():
if cpu in numa_info.cpus:
numa_nodes.add(node_id)
break
return ",".join(map(str, sorted(numa_nodes)))
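# 补充示意: _discover_gpu_numa_mapping 的一种可行实现思路 —— 通过 sysfs 查询 GPU 所属 NUMA 节点。
# 假设 nvidia-smi 可用且系统暴露 /sys/bus/pci/devices/<bdf>/numa_node, 仅作草图, 未在所有环境验证。
def discover_gpu_numa_via_sysfs() -> Dict[int, int]:
    """返回 {gpu_id: numa_node}, 查询失败时返回空字典"""
    mapping: Dict[int, int] = {}
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=index,pci.bus_id", "--format=csv,noheader"],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            return mapping
        for line in result.stdout.strip().split('\n'):
            idx_str, bus_id = [x.strip() for x in line.split(',')]
            # nvidia-smi 输出形如 00000000:3B:00.0, sysfs 路径使用 4 位 domain 的小写形式
            bdf = bus_id.lower()[4:] if len(bus_id) == 16 else bus_id.lower()
            path = f"/sys/bus/pci/devices/{bdf}/numa_node"
            if os.path.exists(path):
                with open(path) as f:
                    numa = int(f.read().strip())
                if numa >= 0:  # -1 表示内核未提供 NUMA 信息
                    mapping[int(idx_str)] = numa
    except (FileNotFoundError, ValueError):
        pass
    return mapping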
# Kubernetes NUMA 感知配置
class KubernetesNUMAConfig:
"""Kubernetes NUMA 感知配置生成"""
@staticmethod
def generate_topology_manager_config() -> Dict:
"""生成拓扑管理器配置"""
return {
"apiVersion": "kubelet.config.k8s.io/v1beta1",
"kind": "KubeletConfiguration",
"topologyManagerPolicy": "best-effort", # none, best-effort, restricted, single-numa-node
"topologyManagerScope": "pod", # container, pod
"cpuManagerPolicy": "static",
"cpuManagerPolicyOptions": {
"full-pcpus-only": "true",
"distribute-cpus-across-numa": "false"
},
"memoryManagerPolicy": "Static",
"reservedMemory": [
{
"numaNode": 0,
"limits": {
"memory": "1Gi"
}
},
{
"numaNode": 1,
"limits": {
"memory": "1Gi"
}
}
]
}
@staticmethod
def generate_numa_aware_pod(
name: str,
image: str,
gpu_count: int,
cpu_count: int,
memory: str
) -> Dict:
"""生成 NUMA 感知的 Pod 配置"""
return {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": name,
"annotations": {
# NUMA 策略注解
"topology.kubernetes.io/policy": "single-numa-node"
}
},
"spec": {
"containers": [{
"name": "main",
"image": image,
"resources": {
"limits": {
"nvidia.com/gpu": str(gpu_count),
"cpu": str(cpu_count),
"memory": memory
},
"requests": {
"nvidia.com/gpu": str(gpu_count),
"cpu": str(cpu_count),
"memory": memory
}
}
}],
                # 注意: NUMA 对齐由 kubelet Topology Manager 的 single-numa-node 策略保证,
                # 下面的拓扑分布约束只控制 Pod 在 zone 之间的分布, 实际使用时还需补充 labelSelector
                "topologySpreadConstraints": [{
                    "maxSkew": 1,
                    "topologyKey": "topology.kubernetes.io/zone",
                    "whenUnsatisfiable": "DoNotSchedule"
                }]
}
}
# 使用示例
if __name__ == "__main__":
numa_mgr = NUMAManager()
# 获取最优 CPU
gpu_ids = [0, 1, 2, 3]
optimal_cpus = numa_mgr.get_optimal_cpus(gpu_ids, cpu_count=32)
print(f"Optimal CPUs for GPUs {gpu_ids}: {optimal_cpus}")
# 生成带亲和性的命令
cmd = numa_mgr.generate_cpu_affinity_command(
gpu_ids=gpu_ids,
command=["python", "train.py"],
cpu_count=32
)
print(f"Command with NUMA affinity: {' '.join(cmd)}")
# Kubernetes 配置
k8s_config = KubernetesNUMAConfig.generate_numa_aware_pod(
name="training-pod",
image="pytorch/pytorch:latest",
gpu_count=4,
cpu_count=32,
memory="128Gi"
)
import yaml
print("\nKubernetes Pod Config:")
print(yaml.dump(k8s_config))
最佳实践
拓扑优化检查清单
# 拓扑感知调度最佳实践
topology_optimization:
# 节点级优化
node_level:
- name: "启用 NUMA 感知"
config:
kubelet:
topologyManagerPolicy: "best-effort"
cpuManagerPolicy: "static"
- name: "GPU 拓扑发现"
tools:
- "nvidia-smi topo -m"
- "nvidia-smi nvlink --status"
automation: "使用 GPU Feature Discovery"
- name: "网络拓扑标注"
labels:
- "topology.kubernetes.io/rack"
- "topology.kubernetes.io/switch"
- "topology.kubernetes.io/zone"
# 集群级优化
cluster_level:
- name: "使用拓扑感知调度器"
scheduler: "topology-aware-scheduler"
config:
plugins:
- TopologyAwareFilter
- TopologyAwareScore
- name: "作业放置优化"
strategies:
communication_heavy: "same-switch"
compute_heavy: "spread"
mixed: "best-effort"
- name: "Gang Scheduling"
tools:
- "Volcano"
        - "scheduler-plugins (Coscheduling)"
# 应用级优化
application_level:
- name: "通信模式优化"
patterns:
- ring_allreduce: "网络拓扑匹配"
- hierarchical_allreduce: "多级聚合"
- pipeline_parallel: "考虑带宽"
- name: "数据放置"
strategies:
- "数据本地性"
- "缓存预热"
---
# 监控与告警
monitoring:
metrics:
- "GPU 到 GPU 通信延迟"
- "NUMA 跨节点内存访问"
- "网络带宽利用率"
- "AllReduce 时间"
alerts:
- name: "拓扑不匹配告警"
condition: "GPU 分布跨越多个 NUMA 节点"
severity: "warning"
- name: "通信瓶颈告警"
condition: "AllReduce 时间 > 阈值"
severity: "critical"
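检查清单中提到的 hierarchical_allreduce(多级聚合)可以用一个粗略公式理解:节点内先做 reduce-scatter/all-gather,节点间只对 1/g 的数据分片做 Ring AllReduce。下面是一个简化示意(intra_bw、inter_bw 为假设的带宽参数,公式为近似估算):

# 简化示意: 分层 AllReduce 时间估算 (intra_bw 为节点内带宽, inter_bw 为节点间瓶颈带宽, 均为假设值)
def hierarchical_allreduce_ms(data_gb: float, gpus_per_node: int, node_count: int,
                              intra_bw_gbps: float = 600.0, inter_bw_gbps: float = 12.5) -> float:
    g, n = gpus_per_node, node_count
    # 节点内 reduce-scatter + all-gather: 约 2*(g-1)/g * data / intra_bw
    intra = 2 * (g - 1) / g * data_gb / intra_bw_gbps
    # 节点间对 1/g 的分片做 Ring AllReduce: 约 2*(n-1)/n * (data/g) / inter_bw
    inter = 2 * (n - 1) / n * (data_gb / g) / inter_bw_gbps if n > 1 else 0.0
    return (intra + inter) * 1000

# 例: 4 节点 x 8 卡, 对 1 GB 梯度做分层 AllReduce
print(f"{hierarchical_allreduce_ms(1.0, 8, 4):.2f} ms")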
总结
设备拓扑感知调度是大规模 AI 训练的关键优化:
- 节点内拓扑:NVLink/NVSwitch 连接、NUMA 亲和性
- 集群拓扑:机架、交换机、网络带宽
- 调度策略:局部性优先、通信开销最小化
- NUMA 优化:CPU 亲和性、内存绑定
通过拓扑感知调度,可以显著提升分布式训练的通信效率和整体性能。