HiHuo
首页
博客
手册
工具
关于
首页
博客
手册
工具
关于
  • AI 基础设施深度教程

    • AI Infra 深度教程
    • GPU容器化

      • 01-GPU 架构基础
      • NVIDIA 容器运行时
      • GPU 共享与隔离
      • GPU 监控与调试
    • Kubernetes GPU调度

      • Device Plugin 机制深度解析
      • GPU 调度器实现
      • 拓扑感知调度
      • 弹性 GPU 调度
    • AI训练平台

      • 分布式训练框架
      • 训练任务调度
      • 模型存储与管理
      • 实验管理
      • 超参数优化
    • 推理服务

      • 推理引擎原理
      • 模型服务框架
      • 动态批处理
      • 推理优化技术
      • 多模型服务
    • 异构计算

      • 05-异构计算
      • 异构计算概述
      • GPU 虚拟化技术
      • NPU 与专用 AI 芯片
      • 设备拓扑感知调度
      • 算力池化与弹性调度
    • AI工作流引擎

      • 06-AI工作流引擎
      • AI 工作流引擎概述
      • Kubeflow Pipelines 深度实践
      • 03-Argo Workflows 深度实践
      • 04-数据版本管理
      • 05-实验跟踪与模型注册
    • MLOps实践

      • 07-MLOps实践
      • 01-MLOps 成熟度模型
      • 02-数据集工程
      • 03-Feature Store 特征存储
      • 04-模型评测体系
      • 05-模型安全与治理
    • AIOps实践

      • 08-AIOps实践
      • 01-AIOps概述与架构
      • 02-异常检测算法
      • 03-根因分析与告警聚合
      • 04-智能运维决策
      • 05-AIOps平台实战
    • 面试专题

      • 09-面试专题
      • 01-AI基础设施核心面试题
      • 02-大模型面试题
      • 03-系统设计面试题
    • CUDA编程与算子开发

      • 10-CUDA 编程与算子开发
      • 01-CUDA编程模型与内存层次
      • 02-高性能 Kernel 开发实战
      • 03-Tensor Core 与矩阵运算
      • 04-算子融合与优化技术
      • 05-Triton 编程入门
    • 通信与网络底层

      • 11-通信与网络底层
      • 01-NCCL 源码深度解析
      • 02-AllReduce 算法实现
      • 03-RDMA与InfiniBand原理
      • 04-网络拓扑与通信优化
      • 05-大规模集群网络架构
    • 框架源码解析

      • 12-框架源码解析
      • 01-PyTorch分布式源码解析
      • 02-DeepSpeed源码深度解析
      • 03-Megatron-LM源码解析
      • 04-vLLM推理引擎源码解析
      • 05-HuggingFace Transformers源码解析
    • 编译优化与图优化

      • 13-编译优化与图优化
      • 01-深度学习编译器概述
      • 02-TorchDynamo与torch.compile
      • 03-XLA编译器深度解析
      • 04-算子融合与Kernel优化
      • 05-自动调度与代码生成

01-NCCL 源码深度解析

概述

NCCL(NVIDIA Collective Communications Library)是 NVIDIA 开发的多 GPU 集合通信库,是分布式深度学习训练的核心组件。本文深入分析 NCCL 的架构设计、核心算法和源码实现。

NCCL 架构

整体架构

┌─────────────────────────────────────────────────────────────────────────┐
│                         NCCL 整体架构                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                       用户 API 层                                │   │
│  │   ncclAllReduce │ ncclBroadcast │ ncclReduce │ ncclAllGather    │   │
│  │   ncclReduceScatter │ ncclAllToAll │ ncclSend │ ncclRecv        │   │
│  └─────────────────────────────┬───────────────────────────────────┘   │
│                                │                                        │
│  ┌─────────────────────────────┴───────────────────────────────────┐   │
│  │                       通信抽象层                                  │   │
│  │                                                                  │   │
│  │   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐    │   │
│  │   │ Channel  │   │ Ring     │   │ Tree     │   │ CollNet  │    │   │
│  │   │ Manager  │   │ Algorithm│   │ Algorithm│   │(SHARP)   │    │   │
│  │   └──────────┘   └──────────┘   └──────────┘   └──────────┘    │   │
│  │                                                                  │   │
│  └─────────────────────────────┬───────────────────────────────────┘   │
│                                │                                        │
│  ┌─────────────────────────────┴───────────────────────────────────┐   │
│  │                       传输层                                      │   │
│  │                                                                  │   │
│  │   ┌──────────────────┐   ┌──────────────────┐                   │   │
│  │   │     P2P          │   │      Net         │                   │   │
│  │   │  ├─ NVLink       │   │  ├─ Socket       │                   │   │
│  │   │  └─ PCIe         │   │  ├─ IB Verbs     │                   │   │
│  │   │                  │   │  └─ RDMA         │                   │   │
│  │   └──────────────────┘   └──────────────────┘                   │   │
│  │                                                                  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                       基础设施层                                  │   │
│  │   拓扑发现 │ 内存管理 │ CUDA Stream │ 错误处理 │ 日志系统         │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

核心数据结构

// nccl.h - 核心数据结构

// Communicator:通信域
struct ncclComm {
    // 标识
    int rank;              // 当前进程在通信域中的排名
    int nRanks;            // 通信域中的总进程数
    uint64_t commHash;     // 通信域唯一标识

    // 拓扑信息
    struct ncclTopoGraph* graphs[NCCL_NUM_ALGORITHMS];

    // Channel 管理
    int nChannels;
    struct ncclChannel channels[MAXCHANNELS];

    // 传输
    struct ncclTransport* transports;
    struct ncclP2p p2p;
    struct ncclConnect* connectSend;
    struct ncclConnect* connectRecv;

    // 内存
    struct ncclMemory memory;

    // 状态
    cudaStream_t userStream;
    struct ncclAsyncState async;
};

// Channel:通信通道
struct ncclChannel {
    // Ring 拓扑
    struct ncclRing ring;

    // Tree 拓扑
    struct ncclTree tree;

    // Peer 信息
    struct ncclPeer* peers;

    // 缓冲区
    struct ncclDevChannelPeer* devPeers;
    void* buffers;
};

// Ring 结构
struct ncclRing {
    int prev;              // 前驱节点
    int next;              // 后继节点
    int* userRanks;        // 用户定义的 rank 顺序
    int index;
};

// Tree 结构
struct ncclTree {
    int depth;
    int up;                // 父节点
    int down[NCCL_MAX_TREE_ARITY];  // 子节点
    int ndown;             // 子节点数量
};

// 传输 Peer
struct ncclPeer {
    struct ncclConnector send[NCCL_MAX_CONNS];
    struct ncclConnector recv[NCCL_MAX_CONNS];
};

// 连接器
struct ncclConnector {
    int connected;
    struct ncclTransportComm* transportComm;
    void* conn;
    struct ncclConnInfo* connInfo;
};

// 传输资源
struct ncclTransportResources {
    void* sendMem;
    void* recvMem;
    int* sendOffsets;
    int* recvOffsets;
    struct ncclSendMem* sendMemCuda;
    struct ncclRecvMem* recvMemCuda;
};

拓扑发现

拓扑检测流程

// topo.cc - 拓扑发现核心代码

// 系统拓扑检测入口
ncclResult_t ncclTopoGetSystem(struct ncclComm* comm, struct ncclTopoSystem** system) {
    NCCLCHECK(ncclCalloc(system, 1));

    // 1. 检测 CPU
    NCCLCHECK(ncclTopoGetCpuSystem(*system));

    // 2. 检测 GPU
    NCCLCHECK(ncclTopoAddGpus(*system));

    // 3. 检测 NIC
    NCCLCHECK(ncclTopoAddNics(*system));

    // 4. 检测互联(NVLink、PCIe、网络)
    NCCLCHECK(ncclTopoConnectNodes(*system));

    // 5. 计算链路带宽
    NCCLCHECK(ncclTopoComputePaths(*system));

    return ncclSuccess;
}

// GPU 检测
ncclResult_t ncclTopoAddGpus(struct ncclTopoSystem* system) {
    int nGpus;
    CUDACHECK(cudaGetDeviceCount(&nGpus));

    for (int g = 0; g < nGpus; g++) {
        struct ncclTopoNode* gpu;
        NCCLCHECK(ncclTopoCreateNode(system, &gpu, GPU, g));

        // 获取 GPU 属性
        cudaDeviceProp prop;
        CUDACHECK(cudaGetDeviceProperties(&prop, g));

        gpu->gpu.dev = g;
        gpu->gpu.cudaCompCap = prop.major * 10 + prop.minor;
        gpu->gpu.gcn = prop.gcnArchName;  // AMD GPU

        // 检测 NVLink
        for (int p = 0; p < NVML_NVLINK_MAX_LINKS; p++) {
            nvmlReturn_t ret = nvmlDeviceGetNvLinkRemotePciInfo_v2(
                device, p, &pciInfo);
            if (ret == NVML_SUCCESS) {
                // 记录 NVLink 连接
                NCCLCHECK(ncclTopoConnectGpu(system, g, pciInfo, LINK_NVL, p));
            }
        }

        // 检测 PCIe 拓扑
        NCCLCHECK(ncclTopoGetPciPath(system, gpu));
    }

    return ncclSuccess;
}

// 路径计算
ncclResult_t ncclTopoComputePaths(struct ncclTopoSystem* system) {
    // 使用 Floyd-Warshall 算法计算所有节点间的最短路径

    int nNodes = system->nodes[GPU].count +
                 system->nodes[NIC].count +
                 system->nodes[CPU].count;

    float** dist = allocMatrix(nNodes, nNodes);
    int** next = allocMatrix(nNodes, nNodes);

    // 初始化距离矩阵
    for (int i = 0; i < nNodes; i++) {
        for (int j = 0; j < nNodes; j++) {
            dist[i][j] = (i == j) ? 0 : INFINITY;
            next[i][j] = -1;
        }
    }

    // 填充直接连接的边
    for (int i = 0; i < system->nLinks; i++) {
        struct ncclTopoLink* link = &system->links[i];
        float bw = linkBandwidth(link->type);
        if (bw > 0) {
            int from = nodeIndex(link->from);
            int to = nodeIndex(link->to);
            dist[from][to] = 1.0f / bw;  // 使用带宽倒数作为距离
            next[from][to] = to;
        }
    }

    // Floyd-Warshall
    for (int k = 0; k < nNodes; k++) {
        for (int i = 0; i < nNodes; i++) {
            for (int j = 0; j < nNodes; j++) {
                if (dist[i][k] + dist[k][j] < dist[i][j]) {
                    dist[i][j] = dist[i][k] + dist[k][j];
                    next[i][j] = next[i][k];
                }
            }
        }
    }

    // 存储路径信息
    NCCLCHECK(ncclTopoStorePaths(system, dist, next, nNodes));

    return ncclSuccess;
}

// NVLink 带宽检测
float getNvLinkBandwidth(int version, int nLinks) {
    // NVLink 带宽 (单向,GB/s)
    // NVLink 1.0: 20 GB/s per link
    // NVLink 2.0: 25 GB/s per link
    // NVLink 3.0: 25 GB/s per link (但带宽更高效)
    // NVLink 4.0: 25 GB/s per link
    float bwPerLink[] = {20.0, 25.0, 25.0, 25.0};
    return bwPerLink[version] * nLinks;
}

拓扑图构建

// graph.cc - 拓扑图构建

// 构建 Ring 图
ncclResult_t ncclTopoComputeRing(
    struct ncclTopoSystem* system,
    struct ncclTopoGraph* graph,
    int nChannels
) {
    int nGpus = system->nodes[GPU].count;

    // 寻找最优 Ring 顺序
    // 目标:最大化环路带宽

    int* order = (int*)malloc(nGpus * sizeof(int));
    float bestBandwidth = 0;
    int* bestOrder = (int*)malloc(nGpus * sizeof(int));

    // 尝试不同的起始点和方向
    for (int start = 0; start < nGpus; start++) {
        for (int dir = 0; dir < 2; dir++) {
            // 贪心构建环
            float bandwidth = buildRingGreedy(system, order, start, dir);

            if (bandwidth > bestBandwidth) {
                bestBandwidth = bandwidth;
                memcpy(bestOrder, order, nGpus * sizeof(int));
            }
        }
    }

    // 创建多 Channel
    for (int c = 0; c < nChannels; c++) {
        graph->nChannels = c + 1;
        graph->channels[c].ring.nNodes = nGpus;

        // 旋转 Ring 以利用不同的链路
        int rotation = (c * nGpus) / nChannels;
        for (int i = 0; i < nGpus; i++) {
            graph->channels[c].ring.nodes[i] =
                bestOrder[(i + rotation) % nGpus];
        }
    }

    return ncclSuccess;
}

// 构建 Tree 图
ncclResult_t ncclTopoComputeTree(
    struct ncclTopoSystem* system,
    struct ncclTopoGraph* graph,
    int nChannels
) {
    int nGpus = system->nodes[GPU].count;

    // 选择根节点(通常选择网络连接最好的 GPU)
    int root = selectTreeRoot(system);

    // BFS 构建树
    for (int c = 0; c < nChannels; c++) {
        // 不同 channel 使用不同的根
        int channelRoot = (root + c) % nGpus;

        struct ncclTopoTree* tree = &graph->channels[c].tree;
        tree->nNodes = nGpus;

        // BFS 遍历
        int* visited = (int*)calloc(nGpus, sizeof(int));
        int* queue = (int*)malloc(nGpus * sizeof(int));
        int head = 0, tail = 0;

        queue[tail++] = channelRoot;
        visited[channelRoot] = 1;
        tree->nodes[channelRoot].up = -1;

        while (head < tail) {
            int node = queue[head++];
            int nChildren = 0;

            // 遍历邻居
            for (int i = 0; i < nGpus; i++) {
                if (!visited[i] && isConnected(system, node, i)) {
                    visited[i] = 1;
                    queue[tail++] = i;

                    tree->nodes[i].up = node;
                    tree->nodes[node].down[nChildren++] = i;
                }
            }
            tree->nodes[node].nDown = nChildren;
        }

        free(visited);
        free(queue);
    }

    return ncclSuccess;
}

AllReduce 实现

Ring AllReduce

// all_reduce.cc - Ring AllReduce 核心实现

/*
 * Ring AllReduce 算法:
 *
 * 阶段 1: Reduce-Scatter
 * - 每个 GPU 将数据分成 N 份(N = GPU 数量)
 * - 经过 N-1 轮通信,每个 GPU 得到一份完整的归约结果
 *
 * 阶段 2: AllGather
 * - 经过 N-1 轮通信,广播归约结果到所有 GPU
 *
 * 总通信量: 2 * (N-1)/N * DataSize
 * 通信轮次: 2 * (N-1)
 */

// Kernel: Ring AllReduce
template<typename T, int UNROLL>
__global__ void ncclAllReduceRingKernel(
    struct ncclDevComm* comm,
    const T* sendbuff,
    T* recvbuff,
    size_t count,
    int nChannels,
    int nRanks,
    ncclRedOp_t op
) {
    int tid = threadIdx.x;
    int bid = blockIdx.x;
    int nthreads = blockDim.x;

    int channelId = bid % nChannels;
    struct ncclChannel* channel = &comm->channels[channelId];
    struct ncclRing* ring = &channel->ring;

    int prevRank = ring->prev;
    int nextRank = ring->next;

    // 每个 GPU 负责的数据块大小
    size_t chunkSize = count / nRanks;
    size_t loopSize = nChannels * chunkSize;

    // 获取发送/接收缓冲区
    T* sendBuff = (T*)channel->devPeers[nextRank].send->conn.buffs[0];
    T* recvBuff = (T*)channel->devPeers[prevRank].recv->conn.buffs[0];

    volatile int* sendHead = channel->devPeers[nextRank].send->conn.head;
    volatile int* recvTail = channel->devPeers[prevRank].recv->conn.tail;
    volatile int* sendTailCache = &channel->devPeers[nextRank].send->conn.tailCache;
    volatile int* recvHeadCache = &channel->devPeers[prevRank].recv->conn.headCache;

    int step = 0;

    // ======== Phase 1: Reduce-Scatter ========
    for (int s = 0; s < nRanks - 1; s++) {
        // 计算当前块索引
        int sendChunk = (ring->index - s + nRanks) % nRanks;
        int recvChunk = (ring->index - s - 1 + nRanks) % nRanks;

        size_t sendOffset = sendChunk * chunkSize;
        size_t recvOffset = recvChunk * chunkSize;

        // 等待接收就绪
        while (*recvHeadCache <= step) {
            *recvHeadCache = *recvTail;
        }

        // 从 sendbuff/recvbuff 加载,与接收数据归约
        for (int i = tid; i < chunkSize; i += nthreads * UNROLL) {
            T vals[UNROLL];

            #pragma unroll
            for (int u = 0; u < UNROLL; u++) {
                if (i + u * nthreads < chunkSize) {
                    T myVal = (s == 0) ?
                        sendbuff[sendOffset + i + u * nthreads] :
                        recvbuff[sendOffset + i + u * nthreads];
                    T peerVal = recvBuff[i + u * nthreads];

                    // 归约操作
                    vals[u] = reduce<T>(myVal, peerVal, op);
                }
            }

            // 写入发送缓冲区(供下一个 GPU 使用)
            #pragma unroll
            for (int u = 0; u < UNROLL; u++) {
                if (i + u * nthreads < chunkSize) {
                    sendBuff[i + u * nthreads] = vals[u];
                    recvbuff[recvOffset + i + u * nthreads] = vals[u];
                }
            }
        }

        // 更新步骤计数,通知下一个 GPU
        __threadfence_system();
        if (tid == 0) {
            *sendHead = step + 1;
        }
        step++;
    }

    // ======== Phase 2: AllGather ========
    for (int s = 0; s < nRanks - 1; s++) {
        int recvChunk = (ring->index - s + nRanks) % nRanks;
        size_t recvOffset = recvChunk * chunkSize;

        // 等待接收
        while (*recvHeadCache <= step) {
            *recvHeadCache = *recvTail;
        }

        // 复制接收数据到输出和发送缓冲区
        for (int i = tid; i < chunkSize; i += nthreads * UNROLL) {
            #pragma unroll
            for (int u = 0; u < UNROLL; u++) {
                if (i + u * nthreads < chunkSize) {
                    T val = recvBuff[i + u * nthreads];
                    sendBuff[i + u * nthreads] = val;
                    recvbuff[recvOffset + i + u * nthreads] = val;
                }
            }
        }

        __threadfence_system();
        if (tid == 0) {
            *sendHead = step + 1;
        }
        step++;
    }
}

// 启动 Ring AllReduce
ncclResult_t ncclAllReduceRingLaunch(
    struct ncclComm* comm,
    const void* sendbuff,
    void* recvbuff,
    size_t count,
    ncclDataType_t datatype,
    ncclRedOp_t op,
    cudaStream_t stream
) {
    // 选择 Kernel 配置
    int nChannels = comm->nChannels;
    int nBlocks = nChannels;
    int nThreads = 512;

    dim3 grid(nBlocks);
    dim3 block(nThreads);

    // 根据数据类型分发
    switch (datatype) {
        case ncclFloat32:
            ncclAllReduceRingKernel<float, 8><<<grid, block, 0, stream>>>(
                comm->devComm, (float*)sendbuff, (float*)recvbuff,
                count, nChannels, comm->nRanks, op);
            break;
        case ncclFloat16:
            ncclAllReduceRingKernel<half, 8><<<grid, block, 0, stream>>>(
                comm->devComm, (half*)sendbuff, (half*)recvbuff,
                count, nChannels, comm->nRanks, op);
            break;
        // ... 其他类型
    }

    return ncclSuccess;
}

Tree AllReduce

// Tree AllReduce 实现

/*
 * Tree AllReduce 算法:
 *
 * 阶段 1: Reduce (叶子 -> 根)
 * - 叶子节点发送数据给父节点
 * - 父节点归约后继续向上
 *
 * 阶段 2: Broadcast (根 -> 叶子)
 * - 根节点广播结果给子节点
 * - 子节点继续向下广播
 *
 * 通信轮次: 2 * log(N)
 * 延迟更低,但带宽利用率不如 Ring
 */

template<typename T>
__global__ void ncclAllReduceTreeKernel(
    struct ncclDevComm* comm,
    const T* sendbuff,
    T* recvbuff,
    size_t count,
    int channelId,
    ncclRedOp_t op
) {
    struct ncclChannel* channel = &comm->channels[channelId];
    struct ncclTree* tree = &channel->tree;

    int tid = threadIdx.x;
    int nthreads = blockDim.x;

    int up = tree->up;
    int* down = tree->down;
    int nDown = tree->nDown;

    // 获取通信缓冲区
    T* sendUpBuff = (up >= 0) ?
        (T*)channel->devPeers[up].send->conn.buffs[0] : nullptr;
    T* recvUpBuff = (up >= 0) ?
        (T*)channel->devPeers[up].recv->conn.buffs[0] : nullptr;

    T* sendDownBuff[NCCL_MAX_TREE_ARITY];
    T* recvDownBuff[NCCL_MAX_TREE_ARITY];
    for (int i = 0; i < nDown; i++) {
        sendDownBuff[i] = (T*)channel->devPeers[down[i]].send->conn.buffs[0];
        recvDownBuff[i] = (T*)channel->devPeers[down[i]].recv->conn.buffs[0];
    }

    // ======== Phase 1: Reduce (Up) ========
    // 等待子节点数据
    for (int i = 0; i < nDown; i++) {
        waitRecv(channel->devPeers[down[i]].recv, 0);
    }

    // 归约:本地数据 + 子节点数据
    for (size_t i = tid; i < count; i += nthreads) {
        T val = sendbuff[i];

        // 归约所有子节点的数据
        for (int c = 0; c < nDown; c++) {
            val = reduce<T>(val, recvDownBuff[c][i], op);
        }

        // 如果不是根节点,发送给父节点
        if (up >= 0) {
            sendUpBuff[i] = val;
        } else {
            // 根节点:保存结果
            recvbuff[i] = val;
        }
    }

    // 通知父节点
    if (up >= 0) {
        __threadfence_system();
        signalSend(channel->devPeers[up].send, 0);
    }

    // ======== Phase 2: Broadcast (Down) ========
    // 等待父节点数据(非根节点)
    if (up >= 0) {
        waitRecv(channel->devPeers[up].recv, 1);

        // 复制到本地输出
        for (size_t i = tid; i < count; i += nthreads) {
            T val = recvUpBuff[i];
            recvbuff[i] = val;

            // 发送给子节点
            for (int c = 0; c < nDown; c++) {
                sendDownBuff[c][i] = val;
            }
        }
    } else {
        // 根节点:直接发送给子节点
        for (size_t i = tid; i < count; i += nthreads) {
            T val = recvbuff[i];
            for (int c = 0; c < nDown; c++) {
                sendDownBuff[c][i] = val;
            }
        }
    }

    // 通知子节点
    __threadfence_system();
    for (int c = 0; c < nDown; c++) {
        signalSend(channel->devPeers[down[c]].send, 1);
    }
}

传输层实现

P2P 传输 (NVLink/PCIe)

// transport/p2p.cc - P2P 传输实现

// P2P 连接建立
ncclResult_t p2pConnect(
    struct ncclComm* comm,
    int peer,
    struct ncclConnector* send,
    struct ncclConnector* recv
) {
    int cudaDev = token->comm->cudaDev;
    int peerDev = token->peerDev;

    // 检查 P2P 可用性
    int canAccessPeer;
    CUDACHECK(cudaDeviceCanAccessPeer(&canAccessPeer, cudaDev, peerDev));

    if (canAccessPeer) {
        // 启用 P2P 访问
        cudaError_t err = cudaDeviceEnablePeerAccess(peerDev, 0);
        if (err != cudaSuccess && err != cudaErrorPeerAccessAlreadyEnabled) {
            WARN("cudaDeviceEnablePeerAccess failed");
            return ncclUnhandledCudaError;
        }
    }

    // 分配传输资源
    struct p2pResources* resources;
    NCCLCHECK(ncclCalloc(&resources, 1));

    // 分配发送/接收缓冲区
    NCCLCHECK(ncclCudaMalloc(&resources->sendMem, comm->buffSizes[0]));
    NCCLCHECK(ncclCudaMalloc(&resources->recvMem, comm->buffSizes[1]));

    // 设置连接信息
    send->transportResources = resources;
    send->conn = &resources->sendConn;

    recv->transportResources = resources;
    recv->conn = &resources->recvConn;

    // 设置 IPC handle(用于跨进程 P2P)
    if (comm->localRank != peer % comm->localRanks) {
        CUDACHECK(cudaIpcGetMemHandle(
            &send->conn->ipcHandle, resources->sendMem));
        CUDACHECK(cudaIpcGetMemHandle(
            &recv->conn->ipcHandle, resources->recvMem));
    }

    return ncclSuccess;
}

// P2P 数据传输 Kernel
template<typename T>
__device__ void p2pSend(
    struct ncclConnector* conn,
    const T* src,
    size_t count
) {
    T* dst = (T*)conn->conn.buffs[0];

    // 直接通过 NVLink/PCIe 写入对端内存
    for (size_t i = threadIdx.x; i < count; i += blockDim.x) {
        dst[i] = src[i];
    }

    __threadfence_system();

    // 更新 head 指针通知对端
    if (threadIdx.x == 0) {
        atomicAdd((int*)conn->conn.head, 1);
    }
}

template<typename T>
__device__ void p2pRecv(
    struct ncclConnector* conn,
    T* dst,
    size_t count,
    int step
) {
    // 等待数据到达
    volatile int* tail = conn->conn.tail;
    while (*tail < step) {
        // Spin wait
    }

    T* src = (T*)conn->conn.buffs[0];

    // 复制到本地
    for (size_t i = threadIdx.x; i < count; i += blockDim.x) {
        dst[i] = src[i];
    }
}

网络传输 (IB Verbs)

// transport/net_ib.cc - InfiniBand 传输实现

// IB 初始化
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
    // 获取 IB 设备列表
    ibv_device** devList = ibv_get_device_list(&nDevs);
    if (devList == NULL || nDevs == 0) {
        WARN("No IB devices found");
        return ncclInternalError;
    }

    // 遍历设备
    for (int d = 0; d < nDevs; d++) {
        struct ibv_context* ctx = ibv_open_device(devList[d]);
        if (ctx == NULL) continue;

        // 查询设备属性
        struct ibv_device_attr devAttr;
        if (ibv_query_device(ctx, &devAttr) != 0) {
            ibv_close_device(ctx);
            continue;
        }

        // 查询端口
        for (int p = 1; p <= devAttr.phys_port_cnt; p++) {
            struct ibv_port_attr portAttr;
            if (ibv_query_port(ctx, p, &portAttr) != 0) continue;

            if (portAttr.state != IBV_PORT_ACTIVE) continue;

            // 记录可用端口
            struct ncclIbDev* dev = ncclIbDevs + ncclNIbDevs++;
            dev->context = ctx;
            dev->portNum = p;
            dev->pkey = portAttr.pkey_tbl_len > 0 ? 0 : -1;
            dev->maxQp = devAttr.max_qp;
            strncpy(dev->name, ibv_get_device_name(devList[d]), MAXNAMESIZE);
        }
    }

    ibv_free_device_list(devList);
    return ncclSuccess;
}

// 建立 IB 连接
ncclResult_t ncclIbConnect(
    int dev,
    void* handle,
    struct ncclConnect* connectInfo
) {
    struct ncclIbDev* ibDev = ncclIbDevs + dev;

    // 分配 Protection Domain
    struct ibv_pd* pd = ibv_alloc_pd(ibDev->context);

    // 创建 Completion Queue
    struct ibv_cq* cq = ibv_create_cq(
        ibDev->context,
        NCCL_IB_MAX_CQ_SIZE,
        NULL, NULL, 0);

    // 创建 Queue Pair
    struct ibv_qp_init_attr qpInitAttr = {
        .send_cq = cq,
        .recv_cq = cq,
        .cap = {
            .max_send_wr = NCCL_IB_MAX_QP_WR,
            .max_recv_wr = NCCL_IB_MAX_QP_WR,
            .max_send_sge = 1,
            .max_recv_sge = 1,
        },
        .qp_type = IBV_QPT_RC,  // Reliable Connection
    };

    struct ibv_qp* qp = ibv_create_qp(pd, &qpInitAttr);

    // QP 状态转换: RESET -> INIT -> RTR -> RTS
    NCCLCHECK(ncclIbModifyQp(qp, IBV_QPS_INIT, ibDev->portNum));
    NCCLCHECK(ncclIbModifyQp(qp, IBV_QPS_RTR, connectInfo));
    NCCLCHECK(ncclIbModifyQp(qp, IBV_QPS_RTS));

    // 注册内存 (Memory Region)
    struct ncclIbMr* mr;
    NCCLCHECK(ncclCalloc(&mr, 1));
    mr->mr = ibv_reg_mr(pd, buffer, size,
        IBV_ACCESS_LOCAL_WRITE |
        IBV_ACCESS_REMOTE_WRITE |
        IBV_ACCESS_REMOTE_READ);

    return ncclSuccess;
}

// IB RDMA Write
ncclResult_t ncclIbRdmaWrite(
    struct ncclIbQp* qp,
    void* localAddr,
    uint32_t localKey,
    void* remoteAddr,
    uint32_t remoteKey,
    size_t size
) {
    struct ibv_sge sge = {
        .addr = (uint64_t)localAddr,
        .length = size,
        .lkey = localKey,
    };

    struct ibv_send_wr wr = {
        .wr_id = (uint64_t)qp,
        .sg_list = &sge,
        .num_sge = 1,
        .opcode = IBV_WR_RDMA_WRITE,
        .send_flags = IBV_SEND_SIGNALED,
        .wr.rdma = {
            .remote_addr = (uint64_t)remoteAddr,
            .rkey = remoteKey,
        },
    };

    struct ibv_send_wr* bad_wr;
    int ret = ibv_post_send(qp->qp, &wr, &bad_wr);
    if (ret != 0) {
        WARN("ibv_post_send failed: %d", ret);
        return ncclSystemError;
    }

    return ncclSuccess;
}

性能优化

多 Channel 并行

// 多 Channel 配置
ncclResult_t ncclTopoGetNchannels(
    struct ncclTopoSystem* system,
    int* nChannels
) {
    // 根据拓扑确定最优 channel 数
    int nGpus = system->nodes[GPU].count;
    int nLinks = getMaxNvLinks(system);

    // 每个 NVLink 可以支持一个 channel
    // 但过多 channel 会增加同步开销
    *nChannels = min(nLinks, NCCL_MAX_NCHANNELS);
    *nChannels = max(*nChannels, 2);  // 至少 2 个

    return ncclSuccess;
}

// Channel 负载均衡
template<typename T>
__global__ void multiChannelAllReduce(
    struct ncclDevComm* comm,
    const T* sendbuff,
    T* recvbuff,
    size_t count
) {
    int channelId = blockIdx.x % comm->nChannels;
    int nChannels = comm->nChannels;

    // 每个 channel 处理数据的一部分
    size_t channelOffset = (count / nChannels) * channelId;
    size_t channelCount = (channelId == nChannels - 1) ?
        count - channelOffset : count / nChannels;

    // 执行 channel 本地的 AllReduce
    channelAllReduce<T>(
        comm,
        sendbuff + channelOffset,
        recvbuff + channelOffset,
        channelCount,
        channelId);
}

流水线优化

// 通信计算重叠
ncclResult_t ncclAllReduceOverlap(
    struct ncclComm* comm,
    const void* sendbuff,
    void* recvbuff,
    size_t count,
    ncclDataType_t datatype,
    ncclRedOp_t op,
    cudaStream_t stream
) {
    // 分块处理,实现流水线
    size_t chunkSize = NCCL_CHUNK_SIZE;
    int nChunks = (count + chunkSize - 1) / chunkSize;

    for (int c = 0; c < nChunks; c++) {
        size_t offset = c * chunkSize;
        size_t thisCount = min(chunkSize, count - offset);

        // 当前块通信
        NCCLCHECK(ncclAllReduceChunk(
            comm,
            (char*)sendbuff + offset * typeSize,
            (char*)recvbuff + offset * typeSize,
            thisCount,
            datatype, op, stream));

        // 与计算重叠(用户在此期间可以进行计算)
    }

    return ncclSuccess;
}

总结

NCCL 核心要点

组件功能关键技术
拓扑发现检测 GPU/网络互联NVLink、PCIe、Floyd-Warshall
Ring AllReduce高带宽利用率2(N-1)/N 带宽效率
Tree AllReduce低延迟log(N) 通信轮次
P2P 传输GPU 直接通信NVLink、GPUDirect
网络传输跨节点通信IB Verbs、RDMA

性能调优建议

□ 根据拓扑选择算法 (Ring vs Tree)
□ 合理配置 Channel 数量
□ 启用 NVLink 和 GPUDirect RDMA
□ 使用多 Channel 并行
□ 实现通信计算重叠
Prev
11-通信与网络底层
Next
02-AllReduce 算法实现