Device Topology-Aware Scheduling

Overview

In large-scale AI clusters, the topology between accelerators has a significant impact on training performance. Topology-aware scheduling understands device interconnects, NUMA affinity, and related constraints in order to optimize task placement and maximize communication efficiency. This article takes a deep dive into the principles and implementation of topology-aware scheduling.

Topology Analysis

GPU Server Topology

┌─────────────────────────────────────────────────────────────────┐
│            Typical 8-GPU Server Topology (DGX A100)             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   NUMA Node 0                         NUMA Node 1               │
│   ┌──────────────────────┐            ┌──────────────────────┐  │
│   │  CPU 0     CPU 1     │    QPI     │  CPU 2     CPU 3     │  │
│   │  Memory 512 GB       │◄──────────►│  Memory 512 GB       │  │
│   │  PCIe Switch         │            │  PCIe Switch         │  │
│   └──────────┬───────────┘            └───────────┬──────────┘  │
│              │                                    │             │
│   ═══════════╪═════════ NVSwitch Fabric ══════════╪═══════════  │
│              │     (4 x NVSwitch, all-to-all)     │             │
│   ┌──────────┴───────────┐            ┌───────────┴──────────┐  │
│   │  GPU Group 0         │   NVLink   │  GPU Group 1         │  │
│   │  GPU0 GPU1 GPU2 GPU3 │◄──────────►│  GPU4 GPU5 GPU6 GPU7 │  │
│   └──────────────────────┘  600 GB/s  └──────────────────────┘  │
│                             per pair                            │
│                                                                 │
│  Bandwidth hierarchy:                                           │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  NVLink (same NVSwitch pair):  600 GB/s (bidirectional)   │  │
│  │  NVLink (cross-NVSwitch):      300 GB/s                   │  │
│  │  PCIe Gen4 x16:                32 GB/s                    │  │
│  │  Cross-NUMA (QPI):             ~25 GB/s                   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
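
To make the bandwidth hierarchy concrete: moving 1 GB between GPUs behind the same NVSwitch takes roughly 1/600 s ≈ 1.7 ms, while the same transfer across the QPI link takes about 1/25 s = 40 ms, a ~24x gap. Avoiding placements that force traffic over the slow links is precisely what topology-aware scheduling is for.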

Topology Distance Matrix

"""
GPU topology analysis and distance computation
"""
import json
import subprocess
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

import numpy as np


class ConnectionType(Enum):
    """Connection type between two GPUs"""
    NVLINK = "NVLink"
    NVSWITCH = "NVSwitch"
    PCIE = "PCIe"
    SYS = "SYS"  # cross-NUMA
    NODE = "NODE"  # cross-node


@dataclass
class GPUConnection:
    """GPU interconnect information"""
    gpu_a: int
    gpu_b: int
    connection_type: ConnectionType
    bandwidth_gbps: float
    latency_us: float


@dataclass
class NUMANode:
    """NUMA node"""
    node_id: int
    cpus: List[int]
    memory_mb: int
    gpus: List[int]


class TopologyAnalyzer:
    """Topology analyzer"""

    # Typical bandwidth values (GB/s)
    BANDWIDTH = {
        ConnectionType.NVLINK: 600,    # A100 NVLink (3rd generation)
        ConnectionType.NVSWITCH: 600,  # via NVSwitch
        ConnectionType.PCIE: 32,       # PCIe Gen4 x16
        ConnectionType.SYS: 25,        # cross-NUMA (QPI/UPI)
        ConnectionType.NODE: 12.5      # cross-node (100 Gb/s fabric)
    }

    # Typical latency values (μs)
    LATENCY = {
        ConnectionType.NVLINK: 1,
        ConnectionType.NVSWITCH: 2,
        ConnectionType.PCIE: 5,
        ConnectionType.SYS: 10,
        ConnectionType.NODE: 50
    }

    def __init__(self):
        self.gpu_count = 0
        self.numa_nodes: List[NUMANode] = []
        self.connections: List[GPUConnection] = []
        self.distance_matrix: Optional[np.ndarray] = None

    def discover_topology(self) -> bool:
        """Discover the system topology"""
        try:
            # Discover the GPU topology
            self._discover_gpu_topology()
            # Discover the NUMA topology
            self._discover_numa_topology()
            # Build the distance matrix
            self._build_distance_matrix()
            return True
        except Exception as e:
            print(f"Topology discovery failed: {e}")
            return False

    def _discover_gpu_topology(self):
        """Discover the GPU topology"""
        try:
            # Use nvidia-smi topo -m
            result = subprocess.run(
                ["nvidia-smi", "topo", "-m"],
                capture_output=True, text=True
            )

            if result.returncode != 0:
                raise RuntimeError("nvidia-smi failed")

            # Parse the output
            lines = result.stdout.strip().split('\n')

            # Keep the per-GPU rows
            gpu_lines = [l for l in lines if l.startswith('GPU')]
            self.gpu_count = len(gpu_lines)

            # Parse the connectivity matrix
            for i, line in enumerate(gpu_lines):
                parts = line.split()
                if len(parts) < self.gpu_count + 1:
                    continue

                for j in range(self.gpu_count):
                    if i == j:
                        continue

                    conn_str = parts[j + 1]
                    conn_type = self._parse_connection_type(conn_str)

                    self.connections.append(GPUConnection(
                        gpu_a=i,
                        gpu_b=j,
                        connection_type=conn_type,
                        bandwidth_gbps=self.BANDWIDTH[conn_type],
                        latency_us=self.LATENCY[conn_type]
                    ))

        except FileNotFoundError:
            # Fall back to a simulated 8-GPU system
            self._simulate_dgx_topology()

    def _parse_connection_type(self, conn_str: str) -> ConnectionType:
        """Parse a connection-type string from the nvidia-smi matrix"""
        conn_str = conn_str.upper()
        # Check NVSwitch first: "NVS" contains "NV" and would otherwise be
        # misclassified as plain NVLink by the branch below
        if "NVS" in conn_str or "NVSWITCH" in conn_str:
            return ConnectionType.NVSWITCH
        elif "NV" in conn_str:
            return ConnectionType.NVLINK
        elif "PIX" in conn_str or "PXB" in conn_str:
            return ConnectionType.PCIE
        elif "SYS" in conn_str:
            return ConnectionType.SYS
        elif "NODE" in conn_str:
            return ConnectionType.NODE
        else:
            return ConnectionType.PCIE

    def _simulate_dgx_topology(self):
        """Simulate a DGX A100 topology"""
        self.gpu_count = 8

        # The NVSwitch fabric is all-to-all: every GPU pair, including pairs
        # on different NUMA nodes, is connected via NVSwitch
        for i in range(8):
            for j in range(8):
                if i != j:
                    conn_type = ConnectionType.NVSWITCH

                    self.connections.append(GPUConnection(
                        gpu_a=i,
                        gpu_b=j,
                        connection_type=conn_type,
                        bandwidth_gbps=self.BANDWIDTH[conn_type],
                        latency_us=self.LATENCY[conn_type]
                    ))

    def _discover_numa_topology(self):
        """Discover the NUMA topology"""
        try:
            # Query NUMA information
            result = subprocess.run(
                ["numactl", "--hardware"],
                capture_output=True, text=True
            )

            if result.returncode == 0:
                # Parsing of the numactl output is omitted here
                pass

        except FileNotFoundError:
            pass

        # Default configuration
        if not self.numa_nodes:
            # Assume 2 NUMA nodes with 4 GPUs each
            self.numa_nodes = [
                NUMANode(
                    node_id=0,
                    cpus=list(range(0, 64)),
                    memory_mb=512 * 1024,
                    gpus=[0, 1, 2, 3]
                ),
                NUMANode(
                    node_id=1,
                    cpus=list(range(64, 128)),
                    memory_mb=512 * 1024,
                    gpus=[4, 5, 6, 7]
                )
            ]

    def _build_distance_matrix(self):
        """Build the distance matrix"""
        # Use latency as the distance metric
        self.distance_matrix = np.zeros((self.gpu_count, self.gpu_count))

        for conn in self.connections:
            # Distance = latency, normalized to the NVLink latency
            distance = conn.latency_us / self.LATENCY[ConnectionType.NVLINK]
            self.distance_matrix[conn.gpu_a][conn.gpu_b] = distance

    def get_optimal_placement(self, num_gpus: int) -> List[int]:
        """
        Find an optimal GPU placement.

        Greedily selects the combination with the lowest communication cost.
        """
        if num_gpus > self.gpu_count:
            raise ValueError(f"Requested {num_gpus} GPUs but only {self.gpu_count} available")

        if num_gpus == self.gpu_count:
            return list(range(self.gpu_count))

        # Greedy selection
        selected = [0]  # start from GPU 0

        while len(selected) < num_gpus:
            best_gpu = -1
            best_cost = float('inf')

            for gpu in range(self.gpu_count):
                if gpu in selected:
                    continue

                # Communication cost of adding this GPU to the set
                cost = sum(self.distance_matrix[gpu][s] for s in selected)

                if cost < best_cost:
                    best_cost = cost
                    best_gpu = gpu

            selected.append(best_gpu)

        return sorted(selected)

    def get_gpu_groups(self, group_size: int) -> List[List[int]]:
        """
        Partition GPUs into groups (e.g. for data parallelism).

        Groups are chosen to minimize intra-group communication cost.
        """
        if self.gpu_count % group_size != 0:
            raise ValueError(f"Cannot evenly divide {self.gpu_count} GPUs into groups of {group_size}")

        num_groups = self.gpu_count // group_size
        groups = []
        used = set()

        for _ in range(num_groups):
            # Pick a starting GPU
            start_gpu = -1
            for gpu in range(self.gpu_count):
                if gpu not in used:
                    start_gpu = gpu
                    break

            group = [start_gpu]
            used.add(start_gpu)

            # Greedily add the closest GPU
            while len(group) < group_size:
                best_gpu = -1
                best_cost = float('inf')

                for gpu in range(self.gpu_count):
                    if gpu in used:
                        continue

                    cost = sum(self.distance_matrix[gpu][g] for g in group)

                    if cost < best_cost:
                        best_cost = cost
                        best_gpu = gpu

                group.append(best_gpu)
                used.add(best_gpu)

            groups.append(sorted(group))

        return groups

    def get_numa_affinity(self, gpu_ids: List[int]) -> Dict[int, int]:
        """Get the NUMA affinity of the given GPUs"""
        affinity = {}

        for gpu_id in gpu_ids:
            for numa_node in self.numa_nodes:
                if gpu_id in numa_node.gpus:
                    affinity[gpu_id] = numa_node.node_id
                    break

        return affinity

    def estimate_communication_time(
        self,
        gpu_ids: List[int],
        data_size_mb: float,
        pattern: str = "all_reduce"
    ) -> float:
        """
        Estimate communication time.

        Args:
            gpu_ids: GPUs participating in the collective
            data_size_mb: data size (MB)
            pattern: communication pattern (all_reduce, all_gather, reduce_scatter)
        """
        if len(gpu_ids) < 2:
            return 0

        # Find the slowest link among the participants
        min_bandwidth = float('inf')
        for i in range(len(gpu_ids)):
            for j in range(i + 1, len(gpu_ids)):
                for conn in self.connections:
                    if conn.gpu_a == gpu_ids[i] and conn.gpu_b == gpu_ids[j]:
                        min_bandwidth = min(min_bandwidth, conn.bandwidth_gbps)
                        break

        # Ring AllReduce traffic per GPU: 2 * (n-1)/n * data_size
        n = len(gpu_ids)
        if pattern == "all_reduce":
            effective_data = 2 * (n - 1) / n * data_size_mb
        elif pattern == "all_gather":
            effective_data = (n - 1) / n * data_size_mb
        else:
            effective_data = data_size_mb

        # Convert to GB
        effective_data_gb = effective_data / 1024

        # Time in seconds
        time_s = effective_data_gb / min_bandwidth

        return time_s * 1000  # return milliseconds

    def print_topology_matrix(self):
        """Print the topology matrix"""
        print("\nGPU Topology Matrix:")
        print("=" * 60)

        # Header row
        header = "     " + " ".join(f"GPU{i:2d}" for i in range(self.gpu_count))
        print(header)

        # One row per GPU
        for i in range(self.gpu_count):
            row = f"GPU{i:2d}"
            for j in range(self.gpu_count):
                if i == j:
                    row += "    X "
                else:
                    # Look up the connection type
                    conn_type = "?"
                    for conn in self.connections:
                        if conn.gpu_a == i and conn.gpu_b == j:
                            conn_type = conn.connection_type.value[:3]
                            break
                    row += f" {conn_type:>4}"
            print(row)

        print("=" * 60)

    def export_to_kubernetes_config(self) -> Dict:
        """Export the topology as a Kubernetes ConfigMap"""
        # ConfigMap data values must be strings, so serialize to JSON
        topology = {
            "gpu_count": self.gpu_count,
            "numa_nodes": [
                {
                    "node_id": n.node_id,
                    "gpus": n.gpus,
                    "cpus": n.cpus[:8] + ["..."]  # truncated for brevity
                }
                for n in self.numa_nodes
            ],
            "connections": [
                {
                    "from": c.gpu_a,
                    "to": c.gpu_b,
                    "type": c.connection_type.value,
                    "bandwidth_gbps": c.bandwidth_gbps
                }
                for c in self.connections[:16]  # truncated for brevity
            ]
        }
        config = {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {
                "name": "gpu-topology-config"
            },
            "data": {
                "topology.json": json.dumps(topology, indent=2)
            }
        }
        return config


# Usage example
if __name__ == "__main__":
    analyzer = TopologyAnalyzer()
    analyzer.discover_topology()

    print(f"Discovered {analyzer.gpu_count} GPUs")
    analyzer.print_topology_matrix()

    # Optimal placement
    optimal_4gpu = analyzer.get_optimal_placement(4)
    print(f"\nOptimal 4-GPU placement: {optimal_4gpu}")

    # GPU grouping
    groups = analyzer.get_gpu_groups(2)
    print(f"GPU groups (size=2): {groups}")

    # Estimate communication time
    comm_time = analyzer.estimate_communication_time(
        gpu_ids=list(range(8)),
        data_size_mb=100,
        pattern="all_reduce"
    )
    print(f"Estimated AllReduce time for 100MB: {comm_time:.2f}ms")
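
As a quick sanity check on the estimate: with n = 8 GPUs and 100 MB of data, Ring AllReduce moves 2 x (8-1)/8 x 100 MB ≈ 175 MB per GPU; at the 600 GB/s NVSwitch bandwidth assumed in BANDWIDTH, that is 175/1024/600 s ≈ 0.29 ms, which is what estimate_communication_time returns.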

Topology-Aware Scheduling in Kubernetes

Scheduler Extension

"""
Kubernetes topology-aware scheduler extension
"""
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass
class Node:
    """Kubernetes node"""
    name: str
    gpu_count: int
    gpu_topology: Dict
    numa_topology: Dict
    allocatable_gpus: Set[int]
    labels: Dict[str, str]


@dataclass
class Pod:
    """Pod request"""
    name: str
    namespace: str
    gpu_request: int
    topology_policy: str  # none, single-numa, best-effort, restricted
    anti_affinity_pods: List[str]
    preferred_node: Optional[str]


@dataclass
class SchedulingResult:
    """Scheduling result"""
    node_name: str
    gpu_ids: List[int]
    numa_node: Optional[int]
    score: float


class TopologyAwareScheduler:
    """Topology-aware scheduler"""

    def __init__(self):
        self.nodes: Dict[str, Node] = {}
        self.logger = logging.getLogger(__name__)

    def register_node(self, node: Node):
        """Register a node"""
        self.nodes[node.name] = node

    def filter_nodes(self, pod: Pod) -> List[Node]:
        """Filter nodes that can satisfy the pod"""
        filtered = []

        for node in self.nodes.values():
            # Check GPU availability
            if len(node.allocatable_gpus) < pod.gpu_request:
                continue

            # Check the topology policy
            if pod.topology_policy == "single-numa":
                # Must fit within a single NUMA node
                if not self._can_fit_single_numa(node, pod.gpu_request):
                    continue

            # Check anti-affinity
            if self._has_anti_affinity_conflict(node, pod):
                continue

            filtered.append(node)

        return filtered

    def _can_fit_single_numa(self, node: Node, gpu_count: int) -> bool:
        """Check whether the request fits within a single NUMA node"""
        numa_topology = node.numa_topology

        for numa_id, numa_gpus in numa_topology.items():
            available = [g for g in numa_gpus if g in node.allocatable_gpus]
            if len(available) >= gpu_count:
                return True

        return False

    def _has_anti_affinity_conflict(self, node: Node, pod: Pod) -> bool:
        """Check for anti-affinity conflicts"""
        # Simplified: a full implementation would inspect the pods on the node
        return False

    def score_nodes(
        self,
        pod: Pod,
        filtered_nodes: List[Node]
    ) -> List[SchedulingResult]:
        """Score the candidate nodes"""
        results = []

        for node in filtered_nodes:
            # Compute the best GPU placement on this node
            gpu_ids, numa_node = self._find_optimal_gpus(node, pod)

            if gpu_ids is None:
                continue

            # Compute the score
            score = self._calculate_score(node, pod, gpu_ids, numa_node)

            results.append(SchedulingResult(
                node_name=node.name,
                gpu_ids=gpu_ids,
                numa_node=numa_node,
                score=score
            ))

        # Sort by score, best first
        results.sort(key=lambda r: r.score, reverse=True)
        return results

    def _find_optimal_gpus(
        self,
        node: Node,
        pod: Pod
    ) -> tuple:
        """Find the optimal GPU combination"""
        gpu_count = pod.gpu_request
        allocatable = list(node.allocatable_gpus)

        if len(allocatable) < gpu_count:
            return None, None

        # Select according to the topology policy
        if pod.topology_policy == "single-numa":
            return self._find_single_numa_gpus(node, gpu_count)
        elif pod.topology_policy == "best-effort":
            return self._find_best_effort_gpus(node, gpu_count)
        else:
            # "none" policy: naive selection
            return allocatable[:gpu_count], None

    def _find_single_numa_gpus(
        self,
        node: Node,
        gpu_count: int
    ) -> tuple:
        """Find GPUs within a single NUMA node"""
        numa_topology = node.numa_topology

        best_numa = None
        best_available = None

        for numa_id, numa_gpus in numa_topology.items():
            available = [g for g in numa_gpus if g in node.allocatable_gpus]

            if len(available) >= gpu_count:
                # Prefer the NUMA node with the tightest fit (least slack)
                if best_available is None or len(available) < len(best_available):
                    best_numa = numa_id
                    best_available = available

        if best_available is None:
            return None, None

        return best_available[:gpu_count], best_numa

    def _find_best_effort_gpus(
        self,
        node: Node,
        gpu_count: int
    ) -> tuple:
        """Best-effort search for a good GPU combination"""
        # Use the node's topology information to minimize communication cost
        topology = node.gpu_topology
        allocatable = list(node.allocatable_gpus)

        if len(allocatable) <= gpu_count:
            return allocatable, None

        # Greedily pick the GPUs with the lowest communication cost
        distance_matrix = topology.get("distance_matrix", {})

        selected = [allocatable[0]]

        while len(selected) < gpu_count:
            best_gpu = None
            best_cost = float('inf')

            for gpu in allocatable:
                if gpu in selected:
                    continue

                # Total distance to the already-selected GPUs
                cost = sum(
                    distance_matrix.get(f"{gpu}-{s}", 1)
                    for s in selected
                )

                if cost < best_cost:
                    best_cost = cost
                    best_gpu = gpu

            if best_gpu is not None:
                selected.append(best_gpu)
            else:
                break

        # Determine the primary NUMA node
        numa_topology = node.numa_topology
        numa_counts = {}
        for numa_id, numa_gpus in numa_topology.items():
            count = len([g for g in selected if g in numa_gpus])
            numa_counts[numa_id] = count

        primary_numa = max(numa_counts, key=numa_counts.get) if numa_counts else None

        return selected, primary_numa

    def _calculate_score(
        self,
        node: Node,
        pod: Pod,
        gpu_ids: List[int],
        numa_node: Optional[int]
    ) -> float:
        """Compute the scheduling score"""
        score = 100.0

        # 1. Topology affinity (up to +40)
        topology_score = self._topology_affinity_score(node, gpu_ids)
        score += topology_score * 0.4

        # 2. NUMA affinity (up to +30)
        numa_score = self._numa_affinity_score(node, gpu_ids, numa_node)
        score += numa_score * 0.3

        # 3. Resource balance (up to +20)
        balance_score = self._resource_balance_score(node, gpu_ids)
        score += balance_score * 0.2

        # 4. Node preference (up to +10)
        if pod.preferred_node == node.name:
            score += 10

        return score

    def _topology_affinity_score(
        self,
        node: Node,
        gpu_ids: List[int]
    ) -> float:
        """Topology affinity score"""
        if len(gpu_ids) < 2:
            return 100.0

        topology = node.gpu_topology
        connections = topology.get("connections", [])

        # Average link bandwidth among the selected GPUs
        total_bandwidth = 0
        count = 0

        for i in range(len(gpu_ids)):
            for j in range(i + 1, len(gpu_ids)):
                for conn in connections:
                    if (conn["from"] == gpu_ids[i] and conn["to"] == gpu_ids[j]) or \
                       (conn["from"] == gpu_ids[j] and conn["to"] == gpu_ids[i]):
                        total_bandwidth += conn.get("bandwidth_gbps", 0)
                        count += 1
                        break

        if count == 0:
            return 50.0

        avg_bandwidth = total_bandwidth / count

        # Normalize to 0-100
        max_bandwidth = 600  # NVLink
        return min(100.0, (avg_bandwidth / max_bandwidth) * 100)

    def _numa_affinity_score(
        self,
        node: Node,
        gpu_ids: List[int],
        numa_node: Optional[int]
    ) -> float:
        """NUMA affinity score"""
        if numa_node is None:
            return 50.0

        numa_topology = node.numa_topology
        numa_gpus = numa_topology.get(numa_node, [])

        # Fraction of GPUs on the primary NUMA node
        same_numa_count = len([g for g in gpu_ids if g in numa_gpus])
        ratio = same_numa_count / len(gpu_ids)

        return ratio * 100

    def _resource_balance_score(
        self,
        node: Node,
        gpu_ids: List[int]
    ) -> float:
        """Resource balance score"""
        # Prefer nodes with more free resources
        remaining = len(node.allocatable_gpus) - len(gpu_ids)
        total = node.gpu_count

        # Fraction of GPUs remaining after this placement
        ratio = remaining / total

        # Moderate utilization scores best (avoids fragmentation)
        if ratio > 0.5:
            return (1 - ratio) * 100
        else:
            return ratio * 100

    def schedule(self, pod: Pod) -> Optional[SchedulingResult]:
        """Run the scheduling pipeline"""
        # 1. Filter
        filtered = self.filter_nodes(pod)

        if not filtered:
            self.logger.warning(f"No nodes available for pod {pod.name}")
            return None

        # 2. Score
        results = self.score_nodes(pod, filtered)

        if not results:
            return None

        # 3. Pick the highest-scoring node
        best = results[0]
        self.logger.info(
            f"Scheduled pod {pod.name} to node {best.node_name} "
            f"with GPUs {best.gpu_ids} (score: {best.score:.2f})"
        )

        return best


# Kubernetes scheduler extender implementation
class KubernetesSchedulerExtender:
    """Kubernetes scheduler extender"""

    def __init__(self, scheduler: TopologyAwareScheduler):
        self.scheduler = scheduler

    def filter(self, request: Dict) -> Dict:
        """Filter extension point"""
        pod_info = request.get("Pod", {})
        nodes = request.get("Nodes", {}).get("Items", [])

        # Parse the pod
        pod = self._parse_pod(pod_info)

        # Candidate node names
        node_names = [n["metadata"]["name"] for n in nodes]

        # Filter
        filtered_nodes = self.scheduler.filter_nodes(pod)
        filtered_names = [n.name for n in filtered_nodes if n.name in node_names]

        return {
            "Nodes": {
                "Items": [n for n in nodes if n["metadata"]["name"] in filtered_names]
            }
        }

    def prioritize(self, request: Dict) -> List[Dict]:
        """Prioritize extension point"""
        pod_info = request.get("Pod", {})
        nodes = request.get("Nodes", {}).get("Items", [])

        pod = self._parse_pod(pod_info)

        # Resolve node objects
        node_objs = [self.scheduler.nodes.get(n["metadata"]["name"])
                     for n in nodes
                     if n["metadata"]["name"] in self.scheduler.nodes]
        node_objs = [n for n in node_objs if n is not None]

        # Score
        results = self.scheduler.score_nodes(pod, node_objs)

        # Convert to the extender priority format (0-10)
        priorities = []
        for result in results:
            priorities.append({
                "Host": result.node_name,
                "Score": int(result.score / 10)  # normalize to 0-10
            })

        return priorities

    def bind(self, request: Dict) -> Dict:
        """Bind extension point"""
        pod_info = request.get("Pod", {})
        node_name = request.get("Node", "")

        pod = self._parse_pod(pod_info)
        node = self.scheduler.nodes.get(node_name)

        if not node:
            return {"Error": f"Node {node_name} not found"}

        # Compute the placement
        gpu_ids, numa_node = self.scheduler._find_optimal_gpus(node, pod)

        if gpu_ids is None:
            return {"Error": "No suitable GPUs found"}

        # Build the binding object
        binding = {
            "apiVersion": "v1",
            "kind": "Binding",
            "metadata": {
                "name": pod.name,
                "namespace": pod.namespace,
                "annotations": {
                    "scheduling.k8s.io/gpu-ids": ",".join(map(str, gpu_ids)),
                    # numa_node may legitimately be 0, so compare against None
                    "scheduling.k8s.io/numa-node": str(numa_node) if numa_node is not None else ""
                }
            },
            "target": {
                "apiVersion": "v1",
                "kind": "Node",
                "name": node_name
            }
        }

        # Update the node's allocatable GPUs
        node.allocatable_gpus -= set(gpu_ids)

        return binding

    def _parse_pod(self, pod_info: Dict) -> Pod:
        """Parse pod information"""
        metadata = pod_info.get("metadata", {})
        spec = pod_info.get("spec", {})
        annotations = metadata.get("annotations", {})

        # Aggregate the GPU request across containers
        gpu_request = 0
        for container in spec.get("containers", []):
            resources = container.get("resources", {})
            limits = resources.get("limits", {})
            gpu_request += int(limits.get("nvidia.com/gpu", 0))

        # Topology policy annotation
        topology_policy = annotations.get(
            "scheduling.k8s.io/topology-policy",
            "best-effort"
        )

        return Pod(
            name=metadata.get("name", ""),
            namespace=metadata.get("namespace", "default"),
            gpu_request=gpu_request,
            topology_policy=topology_policy,
            anti_affinity_pods=[],
            preferred_node=None
        )


# Usage example
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Create the scheduler
    scheduler = TopologyAwareScheduler()

    # Register a node
    node1 = Node(
        name="gpu-node-1",
        gpu_count=8,
        gpu_topology={
            "connections": [
                {"from": 0, "to": 1, "type": "NVSwitch", "bandwidth_gbps": 600},
                {"from": 0, "to": 2, "type": "NVSwitch", "bandwidth_gbps": 600},
                {"from": 0, "to": 3, "type": "NVSwitch", "bandwidth_gbps": 600},
                # ... more connections
            ]
        },
        numa_topology={
            0: [0, 1, 2, 3],
            1: [4, 5, 6, 7]
        },
        allocatable_gpus={0, 1, 2, 3, 4, 5, 6, 7},
        labels={"accelerator": "a100"}
    )
    scheduler.register_node(node1)

    # Schedule a pod
    pod = Pod(
        name="training-job",
        namespace="ml",
        gpu_request=4,
        topology_policy="single-numa",
        anti_affinity_pods=[],
        preferred_node=None
    )

    result = scheduler.schedule(pod)
    if result:
        print(f"Scheduled to: {result.node_name}")
        print(f"GPU IDs: {result.gpu_ids}")
        print(f"NUMA Node: {result.numa_node}")
        print(f"Score: {result.score}")
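
The extender above only implements the decision logic; kube-scheduler talks to it over HTTP using the verbs configured in the next section. A minimal sketch of the HTTP wiring, assuming Flask is available (the route names and port are illustrative and must match the extender urlPrefix):

"""
Minimal HTTP wiring for the extender (a sketch; assumes Flask is installed)
"""
from flask import Flask, jsonify, request

app = Flask(__name__)
scheduler = TopologyAwareScheduler()   # nodes would be registered at startup
extender = KubernetesSchedulerExtender(scheduler)


@app.route("/filter", methods=["POST"])
def filter_endpoint():
    # kube-scheduler POSTs an ExtenderArgs JSON body to each verb
    return jsonify(extender.filter(request.get_json()))


@app.route("/prioritize", methods=["POST"])
def prioritize_endpoint():
    return jsonify(extender.prioritize(request.get_json()))


@app.route("/bind", methods=["POST"])
def bind_endpoint():
    return jsonify(extender.bind(request.get_json()))


if __name__ == "__main__":
    # The port must match the urlPrefix configured for the extender
    app.run(host="0.0.0.0", port=8888)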

Kubernetes Configuration Examples

# GPU topology scheduler configuration
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: topology-scheduler-config
  namespace: kube-system
data:
  scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    profiles:
    - schedulerName: topology-aware-scheduler
      plugins:
        filter:
          enabled:
          - name: NodeResourcesFit
          - name: TopologyAwareFilter
        score:
          enabled:
          - name: NodeResourcesBalancedAllocation
            weight: 1
          - name: TopologyAwareScore
            weight: 5
          - name: NUMAAffinityScore
            weight: 3

      pluginConfig:
      - name: TopologyAwareFilter
        args:
          # Supported topology policies
          supportedPolicies:
          - none
          - single-numa
          - best-effort
          - restricted

      - name: TopologyAwareScore
        args:
          # Scoring weights
          bandwidthWeight: 0.4
          latencyWeight: 0.3
          fragmentationWeight: 0.3

      - name: NUMAAffinityScore
        args:
          # NUMA affinity scoring
          sameNUMABonus: 50
          crossNUMAPenalty: 20

---
# Scheduler extender configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-extender-config
  namespace: kube-system
data:
  extender-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    extenders:
    - urlPrefix: "http://topology-scheduler-extender:8888"
      filterVerb: "filter"
      prioritizeVerb: "prioritize"
      bindVerb: "bind"
      weight: 10
      enableHTTPS: false
      nodeCacheCapable: true
      managedResources:
      - name: nvidia.com/gpu
        ignoredByScheduler: false
      - name: huawei.com/Ascend910B
        ignoredByScheduler: false
      ignorable: false

---
# Example pod using topology-aware scheduling
apiVersion: v1
kind: Pod
metadata:
  name: distributed-training
  namespace: ml-jobs
  annotations:
    # Topology policy
    scheduling.k8s.io/topology-policy: "single-numa"
    # NUMA affinity
    scheduling.k8s.io/numa-affinity: "required"
spec:
  schedulerName: topology-aware-scheduler
  containers:
  - name: trainer
    image: pytorch/pytorch:2.0.0-cuda11.7-cudnn8-runtime
    resources:
      limits:
        nvidia.com/gpu: 4
        memory: "128Gi"
      requests:
        nvidia.com/gpu: 4
        memory: "128Gi"
    env:
    - name: CUDA_VISIBLE_DEVICES
      valueFrom:
        fieldRef:
          fieldPath: metadata.annotations['scheduling.k8s.io/gpu-ids']
  nodeSelector:
    accelerator: nvidia-a100
  tolerations:
  - key: nvidia.com/gpu
    operator: Exists
    effect: NoSchedule

---
# Node topology labels
# Discovered and kept up to date by a node agent (sketched below)
apiVersion: v1
kind: Node
metadata:
  name: gpu-node-1
  labels:
    # Basic labels
    accelerator: nvidia-a100
    gpu-count: "8"

    # Topology labels
    topology.kubernetes.io/gpu-interconnect: nvswitch
    topology.kubernetes.io/numa-nodes: "2"
    topology.kubernetes.io/gpus-per-numa: "4"

    # Capability labels
    topology.kubernetes.io/single-numa-4gpu: "true"
    topology.kubernetes.io/single-numa-8gpu: "false"

  annotations:
    # Detailed topology information
    topology.kubernetes.io/gpu-topology: |
      {
        "gpus": [
          {"id": 0, "numa": 0, "pcie": "0000:3b:00.0"},
          {"id": 1, "numa": 0, "pcie": "0000:86:00.0"},
          {"id": 2, "numa": 0, "pcie": "0000:af:00.0"},
          {"id": 3, "numa": 0, "pcie": "0000:d8:00.0"},
          {"id": 4, "numa": 1, "pcie": "0000:3c:00.0"},
          {"id": 5, "numa": 1, "pcie": "0000:87:00.0"},
          {"id": 6, "numa": 1, "pcie": "0000:b0:00.0"},
          {"id": 7, "numa": 1, "pcie": "0000:d9:00.0"}
        ],
        "nvlinks": [
          {"from": 0, "to": 1, "bandwidth_gbps": 600},
          {"from": 0, "to": 2, "bandwidth_gbps": 600}
        ]
      }
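
The labels above are meant to be maintained automatically. A minimal sketch of such a node agent, assuming kubectl is available in its environment and reusing the TopologyAnalyzer class from earlier (the label keys mirror the example above):

"""
Node agent sketch: publish discovered topology as node labels (assumes kubectl)
"""
import subprocess


def label_node(node_name: str, analyzer) -> None:
    """Apply topology labels to a node via kubectl (illustrative keys)."""
    gpus_per_numa = len(analyzer.numa_nodes[0].gpus) if analyzer.numa_nodes else 0
    labels = {
        "gpu-count": str(analyzer.gpu_count),
        "topology.kubernetes.io/numa-nodes": str(len(analyzer.numa_nodes)),
        "topology.kubernetes.io/gpus-per-numa": str(gpus_per_numa),
    }
    args = ["kubectl", "label", "node", node_name, "--overwrite"]
    args += [f"{key}={value}" for key, value in labels.items()]
    subprocess.run(args, check=True)


if __name__ == "__main__":
    agent_analyzer = TopologyAnalyzer()  # from the topology analysis section
    agent_analyzer.discover_topology()
    label_node("gpu-node-1", agent_analyzer)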

Cross-Node Topology Optimization

Cluster-Level Topology Analysis

"""
Cluster-level topology analysis and optimization
"""
from dataclasses import dataclass
from typing import Dict, List

import networkx as nx
import numpy as np


@dataclass
class ClusterNode:
    """Cluster node"""
    name: str
    rack: str
    switch: str
    gpu_count: int
    network_bandwidth_gbps: float  # network bandwidth


@dataclass
class NetworkLink:
    """Network link"""
    node_a: str
    node_b: str
    bandwidth_gbps: float
    latency_ms: float
    hops: int


class ClusterTopologyAnalyzer:
    """Cluster topology analyzer"""

    def __init__(self):
        self.nodes: Dict[str, ClusterNode] = {}
        self.links: List[NetworkLink] = []
        self.graph = nx.Graph()

    def add_node(self, node: ClusterNode):
        """Add a node"""
        self.nodes[node.name] = node
        self.graph.add_node(node.name, **{
            "rack": node.rack,
            "switch": node.switch,
            "gpu_count": node.gpu_count
        })

    def add_link(self, link: NetworkLink):
        """Add a link"""
        self.links.append(link)
        self.graph.add_edge(
            link.node_a,
            link.node_b,
            bandwidth=link.bandwidth_gbps,
            latency=link.latency_ms,
            hops=link.hops
        )

    def get_network_distance(self, node_a: str, node_b: str) -> float:
        """Network distance between two nodes (cumulative latency)"""
        if node_a == node_b:
            return 0

        if not self.graph.has_edge(node_a, node_b):
            # Compute the shortest path
            try:
                path = nx.shortest_path(
                    self.graph, node_a, node_b,
                    weight='latency'
                )
                # Accumulate latency along the path
                total_latency = 0
                for i in range(len(path) - 1):
                    edge_data = self.graph.edges[path[i], path[i+1]]
                    total_latency += edge_data.get('latency', 1)
                return total_latency
            except nx.NetworkXNoPath:
                return float('inf')

        edge_data = self.graph.edges[node_a, node_b]
        return edge_data.get('latency', 1)

    def get_bandwidth(self, node_a: str, node_b: str) -> float:
        """Bandwidth between two nodes (bottleneck along the path)"""
        if node_a == node_b:
            return float('inf')

        if not self.graph.has_edge(node_a, node_b):
            try:
                path = nx.shortest_path(self.graph, node_a, node_b)
                # Bottleneck bandwidth
                min_bandwidth = float('inf')
                for i in range(len(path) - 1):
                    edge_data = self.graph.edges[path[i], path[i+1]]
                    min_bandwidth = min(min_bandwidth, edge_data.get('bandwidth', 0))
                return min_bandwidth
            except nx.NetworkXNoPath:
                return 0

        edge_data = self.graph.edges[node_a, node_b]
        return edge_data.get('bandwidth', 0)

    def find_optimal_node_set(
        self,
        node_count: int,
        gpu_per_node: int,
        locality_preference: str = "rack"  # rack, switch, any
    ) -> List[str]:
        """
        Find an optimal set of nodes.

        Args:
            node_count: number of nodes required
            gpu_per_node: GPUs required on each node
            locality_preference: locality preference
        """
        # Keep only nodes that satisfy the per-node GPU requirement
        candidates = [
            name for name, node in self.nodes.items()
            if node.gpu_count >= gpu_per_node
        ]

        if len(candidates) < node_count:
            raise ValueError(f"Not enough nodes with {gpu_per_node} GPUs")

        if locality_preference == "rack":
            return self._find_same_rack_nodes(candidates, node_count)
        elif locality_preference == "switch":
            return self._find_same_switch_nodes(candidates, node_count)
        else:
            return self._find_lowest_latency_nodes(candidates, node_count)

    def _find_same_rack_nodes(
        self,
        candidates: List[str],
        count: int
    ) -> List[str]:
        """Find nodes within the same rack"""
        # Group candidates by rack
        rack_nodes: Dict[str, List[str]] = {}
        for name in candidates:
            rack = self.nodes[name].rack
            if rack not in rack_nodes:
                rack_nodes[rack] = []
            rack_nodes[rack].append(name)

        # Try the rack with the most candidates first
        for rack, nodes in sorted(rack_nodes.items(), key=lambda x: -len(x[1])):
            if len(nodes) >= count:
                return nodes[:count]

        # No single rack suffices; fall back to the lowest-latency set
        return self._find_lowest_latency_nodes(candidates, count)

    def _find_same_switch_nodes(
        self,
        candidates: List[str],
        count: int
    ) -> List[str]:
        """Find nodes under the same switch"""
        switch_nodes: Dict[str, List[str]] = {}
        for name in candidates:
            switch = self.nodes[name].switch
            if switch not in switch_nodes:
                switch_nodes[switch] = []
            switch_nodes[switch].append(name)

        for switch, nodes in sorted(switch_nodes.items(), key=lambda x: -len(x[1])):
            if len(nodes) >= count:
                return nodes[:count]

        return self._find_lowest_latency_nodes(candidates, count)

    def _find_lowest_latency_nodes(
        self,
        candidates: List[str],
        count: int
    ) -> List[str]:
        """Find the node set with the lowest total latency"""
        if len(candidates) <= count:
            return candidates

        # Greedy selection
        selected = [candidates[0]]

        while len(selected) < count:
            best_node = None
            best_total_latency = float('inf')

            for node in candidates:
                if node in selected:
                    continue

                # Total latency to the already-selected nodes
                total_latency = sum(
                    self.get_network_distance(node, s)
                    for s in selected
                )

                if total_latency < best_total_latency:
                    best_total_latency = total_latency
                    best_node = node

            if best_node:
                selected.append(best_node)
            else:
                break

        return selected

    def estimate_allreduce_time(
        self,
        nodes: List[str],
        data_size_mb: float,
        algorithm: str = "ring"
    ) -> float:
        """
        Estimate AllReduce time.

        Args:
            nodes: participating nodes
            data_size_mb: data size (MB)
            algorithm: ring | tree | recursive_halving
        """
        if len(nodes) < 2:
            return 0

        n = len(nodes)
        data_size_gb = data_size_mb / 1024

        if algorithm == "ring":
            # Ring AllReduce: 2 * (n-1) * data_size / (n * bandwidth)
            # Find the bottleneck bandwidth around the ring
            min_bandwidth = float('inf')
            for i in range(len(nodes)):
                j = (i + 1) % len(nodes)
                bw = self.get_bandwidth(nodes[i], nodes[j])
                min_bandwidth = min(min_bandwidth, bw)

            if min_bandwidth == 0:
                return float('inf')

            time_s = 2 * (n - 1) * data_size_gb / (n * min_bandwidth)
            return time_s * 1000  # ms

        elif algorithm == "tree":
            # Tree AllReduce: 2 * log2(n) * data_size / bandwidth
            import math
            depth = math.ceil(math.log2(n))

            avg_bandwidth = np.mean([
                self.get_bandwidth(nodes[i], nodes[j])
                for i in range(len(nodes))
                for j in range(i + 1, len(nodes))
            ])

            time_s = 2 * depth * data_size_gb / avg_bandwidth
            return time_s * 1000

        else:
            # Rough estimate
            avg_latency = np.mean([
                self.get_network_distance(nodes[i], nodes[j])
                for i in range(len(nodes))
                for j in range(i + 1, len(nodes))
            ])
            return avg_latency * n

    def visualize_topology(self) -> str:
        """Render the topology as text"""
        lines = ["Cluster Topology:"]
        lines.append("=" * 50)

        # Group by rack
        racks: Dict[str, List[str]] = {}
        for name, node in self.nodes.items():
            if node.rack not in racks:
                racks[node.rack] = []
            racks[node.rack].append(name)

        for rack, nodes in sorted(racks.items()):
            lines.append(f"\nRack: {rack}")
            lines.append("-" * 30)
            for node_name in nodes:
                node = self.nodes[node_name]
                lines.append(
                    f"  {node_name}: {node.gpu_count} GPUs, "
                    f"Switch: {node.switch}"
                )

        return "\n".join(lines)


class JobPlacement:
    """Job placement optimization"""

    def __init__(self, cluster: ClusterTopologyAnalyzer):
        self.cluster = cluster

    def optimize_placement(
        self,
        job_requirements: Dict
    ) -> Dict:
        """
        Optimize job placement.

        Args:
            job_requirements: {
                "total_gpus": 32,
                "gpus_per_node": 8,
                "communication_heavy": True,
                "data_locality": "path/to/data"
            }
        """
        total_gpus = job_requirements.get("total_gpus", 8)
        gpus_per_node = job_requirements.get("gpus_per_node", 8)
        communication_heavy = job_requirements.get("communication_heavy", False)

        node_count = (total_gpus + gpus_per_node - 1) // gpus_per_node

        # Choose a locality strategy
        if communication_heavy:
            locality = "switch"  # communication-heavy jobs prefer the same switch
        else:
            locality = "rack"

        # Find the best nodes
        nodes = self.cluster.find_optimal_node_set(
            node_count=node_count,
            gpu_per_node=gpus_per_node,
            locality_preference=locality
        )

        # Estimate communication time
        comm_time = self.cluster.estimate_allreduce_time(
            nodes=nodes,
            data_size_mb=100,
            algorithm="ring"
        )

        return {
            "nodes": nodes,
            "node_count": len(nodes),
            "gpus_per_node": gpus_per_node,
            "total_gpus": len(nodes) * gpus_per_node,
            "estimated_allreduce_100mb_ms": comm_time,
            "placement_quality": self._evaluate_placement(nodes)
        }

    def _evaluate_placement(self, nodes: List[str]) -> str:
        """Evaluate placement quality"""
        if len(nodes) < 2:
            return "optimal"

        # All in the same rack?
        racks = set(self.cluster.nodes[n].rack for n in nodes)
        if len(racks) == 1:
            return "optimal (same rack)"

        # All under the same switch?
        switches = set(self.cluster.nodes[n].switch for n in nodes)
        if len(switches) == 1:
            return "good (same switch)"

        if len(racks) <= 2:
            return "acceptable (adjacent racks)"

        return "suboptimal (distributed)"


# Build a sample cluster topology
def create_sample_cluster() -> ClusterTopologyAnalyzer:
    """Create a sample cluster"""
    cluster = ClusterTopologyAnalyzer()

    # Add nodes (2 racks, 4 nodes per rack)
    for rack_id in range(2):
        for node_id in range(4):
            name = f"node-{rack_id}-{node_id}"
            cluster.add_node(ClusterNode(
                name=name,
                rack=f"rack-{rack_id}",
                switch=f"tor-{rack_id}",
                gpu_count=8,
                network_bandwidth_gbps=100
            ))

    # Add links
    # Intra-rack (low latency)
    for rack_id in range(2):
        for i in range(4):
            for j in range(i + 1, 4):
                node_a = f"node-{rack_id}-{i}"
                node_b = f"node-{rack_id}-{j}"
                cluster.add_link(NetworkLink(
                    node_a=node_a,
                    node_b=node_b,
                    bandwidth_gbps=100,
                    latency_ms=0.1,
                    hops=1
                ))

    # Cross-rack (higher latency, lower bandwidth)
    for i in range(4):
        for j in range(4):
            node_a = f"node-0-{i}"
            node_b = f"node-1-{j}"
            cluster.add_link(NetworkLink(
                node_a=node_a,
                node_b=node_b,
                bandwidth_gbps=50,
                latency_ms=0.5,
                hops=3
            ))

    return cluster


# Usage example
if __name__ == "__main__":
    # Build the cluster
    cluster = create_sample_cluster()
    print(cluster.visualize_topology())

    # Job placement optimization
    placement = JobPlacement(cluster)

    # A communication-heavy 32-GPU job
    result = placement.optimize_placement({
        "total_gpus": 32,
        "gpus_per_node": 8,
        "communication_heavy": True
    })

    print("\n" + "=" * 50)
    print("Job Placement Result:")
    print(f"Nodes: {result['nodes']}")
    print(f"Total GPUs: {result['total_gpus']}")
    print(f"AllReduce Time (100MB): {result['estimated_allreduce_100mb_ms']:.2f}ms")
    print(f"Quality: {result['placement_quality']}")
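
    # Illustrative comparison (a sketch on the sample cluster above): an
    # intra-rack ring vs. a ring spanning both racks. With the sample link
    # speeds, the cross-rack ring is throttled by the 50 GB/s inter-rack
    # links and takes about twice as long.
    same_rack = [f"node-0-{i}" for i in range(4)]
    cross_rack = ["node-0-0", "node-0-1", "node-1-0", "node-1-1"]
    for label, ring in [("same rack", same_rack), ("cross rack", cross_rack)]:
        t = cluster.estimate_allreduce_time(ring, data_size_mb=100, algorithm="ring")
        print(f"Ring AllReduce 100MB ({label}): {t:.2f}ms")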

NUMA Affinity Optimization

NUMA-Aware Memory Management

"""
NUMA-aware memory management
"""
import subprocess
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class NUMAInfo:
    """NUMA information"""
    node_id: int
    cpus: List[int]
    memory_total_mb: int
    memory_free_mb: int
    gpus: List[int]
    distances: Dict[int, int]  # distance to other NUMA nodes


class NUMAManager:
    """NUMA manager"""

    def __init__(self):
        self.numa_nodes: Dict[int, NUMAInfo] = {}
        self._discover_numa_topology()

    def _discover_numa_topology(self):
        """Discover the NUMA topology"""
        try:
            # Query the NUMA hardware layout
            result = subprocess.run(
                ["numactl", "--hardware"],
                capture_output=True, text=True
            )

            if result.returncode == 0:
                self._parse_numactl_output(result.stdout)
            else:
                # Fall back to a simulated configuration
                self._simulate_numa_config()

        except FileNotFoundError:
            self._simulate_numa_config()

        # Discover the GPU-to-NUMA mapping
        self._discover_gpu_numa_mapping()

    def _parse_numactl_output(self, output: str):
        """Parse `numactl --hardware` output"""
        lines = output.strip().split('\n')

        for line in lines:
            if line.startswith("node"):
                parts = line.split()
                if "cpus:" in line:
                    node_id = int(parts[1])
                    cpus = [int(c) for c in parts[3:]]

                    if node_id not in self.numa_nodes:
                        self.numa_nodes[node_id] = NUMAInfo(
                            node_id=node_id,
                            cpus=cpus,
                            memory_total_mb=0,
                            memory_free_mb=0,
                            gpus=[],
                            distances={}
                        )
                    else:
                        self.numa_nodes[node_id].cpus = cpus

                elif "size:" in line:
                    node_id = int(parts[1])
                    size_mb = int(parts[3])
                    if node_id in self.numa_nodes:
                        self.numa_nodes[node_id].memory_total_mb = size_mb

                elif "free:" in line:
                    node_id = int(parts[1])
                    free_mb = int(parts[3])
                    if node_id in self.numa_nodes:
                        self.numa_nodes[node_id].memory_free_mb = free_mb

            elif "node distances:" in line.lower():
                # Parsing of the distance matrix is omitted here
                pass

    def _simulate_numa_config(self):
        """Simulate a NUMA configuration"""
        # Typical two-NUMA-node layout
        self.numa_nodes = {
            0: NUMAInfo(
                node_id=0,
                cpus=list(range(0, 64)),
                memory_total_mb=512 * 1024,
                memory_free_mb=400 * 1024,
                gpus=[0, 1, 2, 3],
                distances={0: 10, 1: 21}
            ),
            1: NUMAInfo(
                node_id=1,
                cpus=list(range(64, 128)),
                memory_total_mb=512 * 1024,
                memory_free_mb=400 * 1024,
                gpus=[4, 5, 6, 7],
                distances={0: 21, 1: 10}
            )
        }

    def _discover_gpu_numa_mapping(self):
        """Discover the GPU-to-NUMA mapping"""
        try:
            result = subprocess.run(
                ["nvidia-smi", "topo", "-m"],
                capture_output=True, text=True
            )

            if result.returncode == 0:
                # Parsing of the NUMA affinity column is omitted here
                pass
        except FileNotFoundError:
            pass

    def get_gpu_numa_node(self, gpu_id: int) -> Optional[int]:
        """Return the NUMA node a GPU belongs to"""
        for node_id, numa_info in self.numa_nodes.items():
            if gpu_id in numa_info.gpus:
                return node_id
        return None

    def get_optimal_cpus(
        self,
        gpu_ids: List[int],
        cpu_count: int
    ) -> List[int]:
        """
        Pick the CPUs with the best affinity to the given GPUs.

        CPUs on the same NUMA node as the GPUs are preferred.
        """
        # Count how the GPUs are distributed across NUMA nodes
        numa_gpu_count: Dict[int, int] = {}
        for gpu_id in gpu_ids:
            numa_node = self.get_gpu_numa_node(gpu_id)
            if numa_node is not None:
                numa_gpu_count[numa_node] = numa_gpu_count.get(numa_node, 0) + 1

        # Sort NUMA nodes by GPU count, descending
        sorted_numas = sorted(
            numa_gpu_count.items(),
            key=lambda x: -x[1]
        )

        selected_cpus = []

        for numa_id, _ in sorted_numas:
            numa_info = self.numa_nodes.get(numa_id)
            if numa_info:
                available = [c for c in numa_info.cpus if c not in selected_cpus]
                need = cpu_count - len(selected_cpus)
                selected_cpus.extend(available[:need])

                if len(selected_cpus) >= cpu_count:
                    break

        # If still short, top up from the remaining NUMA nodes
        if len(selected_cpus) < cpu_count:
            for numa_id, numa_info in self.numa_nodes.items():
                if numa_id in numa_gpu_count:
                    continue
                available = [c for c in numa_info.cpus if c not in selected_cpus]
                need = cpu_count - len(selected_cpus)
                selected_cpus.extend(available[:need])

                if len(selected_cpus) >= cpu_count:
                    break

        return selected_cpus[:cpu_count]

    def generate_cpu_affinity_command(
        self,
        gpu_ids: List[int],
        command: List[str],
        cpu_count: int = 16
    ) -> List[str]:
        """Wrap a command with numactl CPU and memory affinity"""
        cpus = self.get_optimal_cpus(gpu_ids, cpu_count)
        cpu_list = ",".join(map(str, cpus))
        numa_nodes = self._get_numa_nodes_for_cpus(cpus)

        return [
            "numactl",
            f"--physcpubind={cpu_list}",
            f"--membind={numa_nodes}",
            "--"
        ] + command

    def _get_numa_nodes_for_cpus(self, cpus: List[int]) -> str:
        """Return the NUMA nodes the given CPUs belong to"""
        numa_nodes = set()
        for cpu in cpus:
            for node_id, numa_info in self.numa_nodes.items():
                if cpu in numa_info.cpus:
                    numa_nodes.add(node_id)
                    break
        return ",".join(map(str, sorted(numa_nodes)))


# NUMA-aware configuration generation for Kubernetes
class KubernetesNUMAConfig:
    """Generates NUMA-aware Kubernetes configuration"""

    @staticmethod
    def generate_topology_manager_config() -> Dict:
        """Generate a kubelet Topology Manager configuration"""
        return {
            "apiVersion": "kubelet.config.k8s.io/v1beta1",
            "kind": "KubeletConfiguration",
            "topologyManagerPolicy": "best-effort",  # none, best-effort, restricted, single-numa-node
            "topologyManagerScope": "pod",  # container, pod
            "cpuManagerPolicy": "static",
            "cpuManagerPolicyOptions": {
                "full-pcpus-only": "true",
                "distribute-cpus-across-numa": "false"
            },
            "memoryManagerPolicy": "Static",
            "reservedMemory": [
                {
                    "numaNode": 0,
                    "limits": {
                        "memory": "1Gi"
                    }
                },
                {
                    "numaNode": 1,
                    "limits": {
                        "memory": "1Gi"
                    }
                }
            ]
        }

    @staticmethod
    def generate_numa_aware_pod(
        name: str,
        image: str,
        gpu_count: int,
        cpu_count: int,
        memory: str
    ) -> Dict:
        """Generate a NUMA-aware pod spec"""
        return {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": name,
                "annotations": {
                    # NUMA policy annotation
                    "topology.kubernetes.io/policy": "single-numa-node"
                }
            },
            "spec": {
                "containers": [{
                    "name": "main",
                    "image": image,
                    "resources": {
                        "limits": {
                            "nvidia.com/gpu": str(gpu_count),
                            "cpu": str(cpu_count),
                            "memory": memory
                        },
                        "requests": {
                            "nvidia.com/gpu": str(gpu_count),
                            "cpu": str(cpu_count),
                            "memory": memory
                        }
                    }
                }],
                # Note: NUMA alignment itself is enforced by the kubelet
                # Topology Manager; Guaranteed QoS (requests == limits above)
                # is what makes the pod eligible for single-NUMA alignment
                "topologySpreadConstraints": [{
                    "maxSkew": 1,
                    "topologyKey": "topology.kubernetes.io/zone",
                    "whenUnsatisfiable": "DoNotSchedule"
                }]
            }
        }


# Usage example
if __name__ == "__main__":
    numa_mgr = NUMAManager()

    # Pick the best CPUs
    gpu_ids = [0, 1, 2, 3]
    optimal_cpus = numa_mgr.get_optimal_cpus(gpu_ids, cpu_count=32)
    print(f"Optimal CPUs for GPUs {gpu_ids}: {optimal_cpus}")

    # Build a command with NUMA affinity
    cmd = numa_mgr.generate_cpu_affinity_command(
        gpu_ids=gpu_ids,
        command=["python", "train.py"],
        cpu_count=32
    )
    print(f"Command with NUMA affinity: {' '.join(cmd)}")

    # Kubernetes configuration
    k8s_config = KubernetesNUMAConfig.generate_numa_aware_pod(
        name="training-pod",
        image="pytorch/pytorch:latest",
        gpu_count=4,
        cpu_count=32,
        memory="128Gi"
    )
    import yaml
    print("\nKubernetes Pod Config:")
    print(yaml.dump(k8s_config))
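
_discover_gpu_numa_mapping above is left as a stub. On Linux the mapping can be read directly from sysfs instead of parsing the nvidia-smi topology matrix: every PCI device exposes a numa_node file. A minimal sketch under that assumption (standard sysfs paths; the kernel reports -1 when the NUMA node is unknown):

"""
Sketch: map GPUs to NUMA nodes via sysfs (Linux only)
"""
import subprocess


def gpu_numa_mapping() -> dict:
    """Return {gpu_index: numa_node} from /sys/bus/pci/devices/*/numa_node."""
    result = subprocess.run(
        ["nvidia-smi", "--query-gpu=index,pci.bus_id", "--format=csv,noheader"],
        capture_output=True, text=True, check=True
    )
    mapping = {}
    for line in result.stdout.strip().splitlines():
        index, bus_id = [part.strip() for part in line.split(",")]
        # nvidia-smi reports e.g. 00000000:3B:00.0; sysfs uses 0000:3b:00.0
        pci_addr = bus_id[-12:].lower()
        with open(f"/sys/bus/pci/devices/{pci_addr}/numa_node") as f:
            mapping[int(index)] = int(f.read().strip())
    return mapping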

Best Practices

Topology Optimization Checklist

# Topology-aware scheduling best practices
topology_optimization:
  # Node-level optimization
  node_level:
    - name: "Enable NUMA awareness"
      config:
        kubelet:
          topologyManagerPolicy: "best-effort"
          cpuManagerPolicy: "static"

    - name: "GPU topology discovery"
      tools:
        - "nvidia-smi topo -m"
        - "nvidia-smi nvlink --status"
      automation: "Use GPU Feature Discovery"

    - name: "Network topology labeling"
      labels:
        - "topology.kubernetes.io/rack"
        - "topology.kubernetes.io/switch"
        - "topology.kubernetes.io/zone"

  # Cluster-level optimization
  cluster_level:
    - name: "Use a topology-aware scheduler"
      scheduler: "topology-aware-scheduler"
      config:
        plugins:
          - TopologyAwareFilter
          - TopologyAwareScore

    - name: "Job placement optimization"
      strategies:
        communication_heavy: "same-switch"
        compute_heavy: "spread"
        mixed: "best-effort"

    - name: "Gang scheduling"
      tools:
        - "Volcano"
        - "Kubernetes Gang Scheduling"

  # Application-level optimization
  application_level:
    - name: "Communication pattern optimization"
      patterns:
        - ring_allreduce: "match the network topology"
        - hierarchical_allreduce: "multi-level aggregation"
        - pipeline_parallel: "account for link bandwidth"

    - name: "Data placement"
      strategies:
        - "data locality"
        - "cache warm-up"

---
# Monitoring and alerting
monitoring:
  metrics:
    - "GPU-to-GPU communication latency"
    - "cross-NUMA memory accesses"
    - "network bandwidth utilization"
    - "AllReduce time"

  alerts:
    - name: "Topology mismatch"
      condition: "GPU allocation spans multiple NUMA nodes"
      severity: "warning"

    - name: "Communication bottleneck"
      condition: "AllReduce time > threshold"
      severity: "critical"

Summary

Device topology-aware scheduling is a key optimization for large-scale AI training:

  1. Intra-node topology: NVLink/NVSwitch connectivity, NUMA affinity
  2. Cluster topology: racks, switches, network bandwidth
  3. Scheduling policy: locality first, minimal communication cost
  4. NUMA optimization: CPU affinity, memory binding

With topology-aware scheduling, the communication efficiency and overall performance of distributed training improve significantly.
