01-NCCL Source Code Deep Dive
Overview
NCCL (NVIDIA Collective Communications Library) is NVIDIA's multi-GPU collective communication library and a core building block of distributed deep learning training. This article walks through NCCL's architecture, core algorithms, and source-level implementation.
NCCL Architecture
Overall architecture
┌────────────────────────────────────────────────────────────────┐
│ User API Layer                                                 │
│   ncclAllReduce │ ncclBroadcast │ ncclReduce │ ncclAllGather   │
│   ncclReduceScatter │ ncclAllToAll │ ncclSend │ ncclRecv       │
└────────────────────────────────────────────────────────────────┘
                                │
┌────────────────────────────────────────────────────────────────┐
│ Communication Abstraction Layer                                │
│   Channel Manager │ Ring Algorithm │ Tree Algorithm            │
│   CollNet (SHARP)                                              │
└────────────────────────────────────────────────────────────────┘
                                │
┌────────────────────────────────────────────────────────────────┐
│ Transport Layer                                                │
│   P2P: NVLink / PCIe                                           │
│   Net: Socket / IB Verbs / RDMA                                │
└────────────────────────────────────────────────────────────────┘
                                │
┌────────────────────────────────────────────────────────────────┐
│ Infrastructure Layer                                           │
│   Topology discovery │ Memory management │ CUDA streams        │
│   Error handling │ Logging                                     │
└────────────────────────────────────────────────────────────────┘
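To put the user API layer in context, here is a minimal single-process, multi-GPU usage sketch built only from public NCCL calls (ncclCommInitAll, ncclGroupStart/End, ncclAllReduce). The buffer size and device count are arbitrary example values, and error handling is reduced to bare macros.
// Minimal single-process, multi-GPU AllReduce using the public NCCL API.
#include <nccl.h>
#include <cuda_runtime.h>
#include <stdio.h>
#include <stdlib.h>

#define CUDACHECK(cmd) do { cudaError_t e = (cmd); if (e != cudaSuccess) { \
  printf("CUDA error: %s\n", cudaGetErrorString(e)); exit(1); } } while (0)
#define NCCLCHECK(cmd) do { ncclResult_t r = (cmd); if (r != ncclSuccess) { \
  printf("NCCL error: %s\n", ncclGetErrorString(r)); exit(1); } } while (0)

int main() {
  int nDev = 0;
  CUDACHECK(cudaGetDeviceCount(&nDev));
  const size_t count = 1 << 20;                      // 1M floats per GPU (example value)

  ncclComm_t*   comms   = (ncclComm_t*)malloc(nDev * sizeof(ncclComm_t));
  cudaStream_t* streams = (cudaStream_t*)malloc(nDev * sizeof(cudaStream_t));
  float**       buf     = (float**)malloc(nDev * sizeof(float*));

  for (int i = 0; i < nDev; i++) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaStreamCreate(&streams[i]));
    CUDACHECK(cudaMalloc(&buf[i], count * sizeof(float)));
  }
  // One communicator per device, all owned by this process.
  NCCLCHECK(ncclCommInitAll(comms, nDev, NULL));

  // In-place AllReduce (sum) across all devices, grouped so the per-device
  // calls are submitted as one NCCL operation.
  NCCLCHECK(ncclGroupStart());
  for (int i = 0; i < nDev; i++) {
    NCCLCHECK(ncclAllReduce(buf[i], buf[i], count, ncclFloat, ncclSum,
                            comms[i], streams[i]));
  }
  NCCLCHECK(ncclGroupEnd());

  for (int i = 0; i < nDev; i++) {
    CUDACHECK(cudaSetDevice(i));
    CUDACHECK(cudaStreamSynchronize(streams[i]));
    ncclCommDestroy(comms[i]);
  }
  return 0;
}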
Core data structures
// comm.h / devcomm.h - core data structures (simplified for this article)
// Communicator: the communication domain
struct ncclComm {
  // Identity
  int rank;                 // rank of this process within the communicator
  int nRanks;               // total number of ranks in the communicator
  uint64_t commHash;        // unique identifier of the communicator
  // Topology information
  struct ncclTopoGraph* graphs[NCCL_NUM_ALGORITHMS];
  // Channel management
  int nChannels;
  struct ncclChannel channels[MAXCHANNELS];
  // Transport
  struct ncclTransport* transports;
  struct ncclP2p p2p;
  struct ncclConnect* connectSend;
  struct ncclConnect* connectRecv;
  // Memory
  struct ncclMemory memory;
  // State
  cudaStream_t userStream;
  struct ncclAsyncState async;
};
// Channel: a communication channel
struct ncclChannel {
  // Ring topology
  struct ncclRing ring;
  // Tree topology
  struct ncclTree tree;
  // Peer information
  struct ncclPeer* peers;
  // Buffers
  struct ncclDevChannelPeer* devPeers;
  void* buffers;
};
// Ring structure
struct ncclRing {
  int prev;        // previous rank in the ring
  int next;        // next rank in the ring
  int* userRanks;  // ring order expressed in user ranks
  int index;       // this rank's position in the ring
};
// Tree structure
struct ncclTree {
  int depth;
  int up;                         // parent node
  int down[NCCL_MAX_TREE_ARITY];  // child nodes
  int nDown;                      // number of children
};
// Transport peer
struct ncclPeer {
  struct ncclConnector send[NCCL_MAX_CONNS];
  struct ncclConnector recv[NCCL_MAX_CONNS];
};
// Connector
struct ncclConnector {
  int connected;
  struct ncclTransportComm* transportComm;
  void* transportResources;
  void* conn;
  struct ncclConnInfo* connInfo;
};
// Transport resources
struct ncclTransportResources {
  void* sendMem;
  void* recvMem;
  int* sendOffsets;
  int* recvOffsets;
  struct ncclSendMem* sendMemCuda;
  struct ncclRecvMem* recvMemCuda;
};
Topology Discovery
Topology detection flow
// topo.cc - core topology discovery code
// Entry point for system topology detection
ncclResult_t ncclTopoGetSystem(struct ncclComm* comm, struct ncclTopoSystem** system) {
  NCCLCHECK(ncclCalloc(system, 1));
  // 1. Detect CPUs
  NCCLCHECK(ncclTopoGetCpuSystem(*system));
  // 2. Detect GPUs
  NCCLCHECK(ncclTopoAddGpus(*system));
  // 3. Detect NICs
  NCCLCHECK(ncclTopoAddNics(*system));
  // 4. Detect interconnects (NVLink, PCIe, network)
  NCCLCHECK(ncclTopoConnectNodes(*system));
  // 5. Compute link bandwidths and paths
  NCCLCHECK(ncclTopoComputePaths(*system));
  return ncclSuccess;
}
// GPU detection
ncclResult_t ncclTopoAddGpus(struct ncclTopoSystem* system) {
  int nGpus;
  CUDACHECK(cudaGetDeviceCount(&nGpus));
  for (int g = 0; g < nGpus; g++) {
    struct ncclTopoNode* gpu;
    NCCLCHECK(ncclTopoCreateNode(system, &gpu, GPU, g));
    // Query GPU attributes
    cudaDeviceProp prop;
    CUDACHECK(cudaGetDeviceProperties(&prop, g));
    gpu->gpu.dev = g;
    gpu->gpu.cudaCompCap = prop.major * 10 + prop.minor;
    // Detect NVLink connections via NVML.
    // (Simplified: the CUDA device index may differ from the NVML index;
    // the real code looks the device up by PCI bus id.)
    nvmlDevice_t nvmlDev;
    nvmlPciInfo_t pciInfo;
    if (nvmlDeviceGetHandleByIndex(g, &nvmlDev) == NVML_SUCCESS) {
      for (int p = 0; p < NVML_NVLINK_MAX_LINKS; p++) {
        nvmlReturn_t ret = nvmlDeviceGetNvLinkRemotePciInfo_v2(nvmlDev, p, &pciInfo);
        if (ret == NVML_SUCCESS) {
          // Record the NVLink connection
          NCCLCHECK(ncclTopoConnectGpu(system, g, pciInfo, LINK_NVL, p));
        }
      }
    }
    // Detect the PCIe topology
    NCCLCHECK(ncclTopoGetPciPath(system, gpu));
  }
  return ncclSuccess;
}
// Path computation
ncclResult_t ncclTopoComputePaths(struct ncclTopoSystem* system) {
  // Compute shortest paths between all nodes with Floyd-Warshall
  int nNodes = system->nodes[GPU].count +
               system->nodes[NIC].count +
               system->nodes[CPU].count;
  float** dist = allocMatrix(nNodes, nNodes);
  int** next = allocMatrix(nNodes, nNodes);
  // Initialize the distance matrix
  for (int i = 0; i < nNodes; i++) {
    for (int j = 0; j < nNodes; j++) {
      dist[i][j] = (i == j) ? 0 : INFINITY;
      next[i][j] = -1;
    }
  }
  // Fill in the directly connected edges
  for (int i = 0; i < system->nLinks; i++) {
    struct ncclTopoLink* link = &system->links[i];
    float bw = linkBandwidth(link->type);
    if (bw > 0) {
      int from = nodeIndex(link->from);
      int to = nodeIndex(link->to);
      dist[from][to] = 1.0f / bw;  // use inverse bandwidth as the distance
      next[from][to] = to;
    }
  }
  // Floyd-Warshall relaxation
  for (int k = 0; k < nNodes; k++) {
    for (int i = 0; i < nNodes; i++) {
      for (int j = 0; j < nNodes; j++) {
        if (dist[i][k] + dist[k][j] < dist[i][j]) {
          dist[i][j] = dist[i][k] + dist[k][j];
          next[i][j] = next[i][k];
        }
      }
    }
  }
  // Store the resulting paths
  NCCLCHECK(ncclTopoStorePaths(system, dist, next, nNodes));
  return ncclSuccess;
}
// NVLink bandwidth lookup
float getNvLinkBandwidth(int version, int nLinks) {
  // Per-link unidirectional NVLink bandwidth (GB/s):
  // NVLink 1.0 (P100): 20 GB/s per link
  // NVLink 2.0 (V100): 25 GB/s per link
  // NVLink 3.0 (A100): 25 GB/s per link, but up to 12 links per GPU
  // NVLink 4.0 (H100): 25 GB/s per link, but up to 18 links per GPU
  static const float bwPerLink[] = {20.0f, 25.0f, 25.0f, 25.0f};
  if (version < 1 || version > 4) return 0.0f;
  return bwPerLink[version - 1] * nLinks;
}
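To make the inverse-bandwidth metric concrete, the following toy program (illustrative only, not NCCL code) runs the same Floyd-Warshall relaxation over a three-node GPU0–GPU1–NIC example. Summing 1/bandwidth over the hops means a path's effective bandwidth is dominated by its slowest link; the real ncclTopoComputePaths logic is considerably more involved.
// Toy illustration of the "distance = 1/bandwidth" metric.
// GPU0 -- NVLink (300 GB/s) -- GPU1 -- PCIe (~25 GB/s) -- NIC
#include <stdio.h>

#define N 3
#define INF 1e30f

int main(void) {
  float bw[N][N] = {             // direct link bandwidth in GB/s, 0 = no link
    {0, 300, 0},                 // GPU0 -- GPU1 via NVLink
    {300, 0, 25},                // GPU1 -- NIC  via PCIe
    {0, 25, 0},
  };
  float dist[N][N];
  for (int i = 0; i < N; i++)
    for (int j = 0; j < N; j++)
      dist[i][j] = (i == j) ? 0.0f : (bw[i][j] > 0 ? 1.0f / bw[i][j] : INF);

  for (int k = 0; k < N; k++)    // Floyd-Warshall relaxation
    for (int i = 0; i < N; i++)
      for (int j = 0; j < N; j++)
        if (dist[i][k] + dist[k][j] < dist[i][j])
          dist[i][j] = dist[i][k] + dist[k][j];

  // GPU0 -> NIC: 1/300 + 1/25 seconds per GB => ~23.1 GB/s effective
  printf("GPU0 -> NIC effective bandwidth: %.1f GB/s\n", 1.0f / dist[0][2]);
  return 0;
}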
Building the topology graphs
// graph.cc - topology graph construction
// Build the Ring graph
ncclResult_t ncclTopoComputeRing(
    struct ncclTopoSystem* system,
    struct ncclTopoGraph* graph,
    int nChannels
) {
  int nGpus = system->nodes[GPU].count;
  // Search for the best ring order.
  // Goal: maximize the bandwidth of the ring.
  int* order = (int*)malloc(nGpus * sizeof(int));
  float bestBandwidth = 0;
  int* bestOrder = (int*)malloc(nGpus * sizeof(int));
  // Try different starting points and directions
  for (int start = 0; start < nGpus; start++) {
    for (int dir = 0; dir < 2; dir++) {
      // Greedily build a ring
      float bandwidth = buildRingGreedy(system, order, start, dir);
      if (bandwidth > bestBandwidth) {
        bestBandwidth = bandwidth;
        memcpy(bestOrder, order, nGpus * sizeof(int));
      }
    }
  }
  // Create multiple channels
  for (int c = 0; c < nChannels; c++) {
    graph->nChannels = c + 1;
    graph->channels[c].ring.nNodes = nGpus;
    // Rotate the ring so different channels exercise different links
    int rotation = (c * nGpus) / nChannels;
    for (int i = 0; i < nGpus; i++) {
      graph->channels[c].ring.nodes[i] = bestOrder[(i + rotation) % nGpus];
    }
  }
  free(order);
  free(bestOrder);
  return ncclSuccess;
}
// Build the Tree graph
ncclResult_t ncclTopoComputeTree(
    struct ncclTopoSystem* system,
    struct ncclTopoGraph* graph,
    int nChannels
) {
  int nGpus = system->nodes[GPU].count;
  // Pick the root (typically the GPU with the best network connectivity)
  int root = selectTreeRoot(system);
  // Build the tree with a BFS
  for (int c = 0; c < nChannels; c++) {
    // Different channels use different roots
    int channelRoot = (root + c) % nGpus;
    struct ncclTopoTree* tree = &graph->channels[c].tree;
    tree->nNodes = nGpus;
    // BFS traversal
    int* visited = (int*)calloc(nGpus, sizeof(int));
    int* queue = (int*)malloc(nGpus * sizeof(int));
    int head = 0, tail = 0;
    queue[tail++] = channelRoot;
    visited[channelRoot] = 1;
    tree->nodes[channelRoot].up = -1;
    while (head < tail) {
      int node = queue[head++];
      int nChildren = 0;
      // Visit the neighbors
      for (int i = 0; i < nGpus; i++) {
        if (!visited[i] && isConnected(system, node, i)) {
          visited[i] = 1;
          queue[tail++] = i;
          tree->nodes[i].up = node;
          tree->nodes[node].down[nChildren++] = i;
        }
      }
      tree->nodes[node].nDown = nChildren;
    }
    free(visited);
    free(queue);
  }
  return ncclSuccess;
}
AllReduce Implementation
Ring AllReduce
// all_reduce.cc - core of the Ring AllReduce implementation
/*
 * Ring AllReduce algorithm:
 *
 * Phase 1: Reduce-Scatter
 *   - Each GPU splits its data into N chunks (N = number of GPUs)
 *   - After N-1 communication steps, each GPU holds one fully reduced chunk
 *
 * Phase 2: AllGather
 *   - After another N-1 steps, the reduced chunks are circulated to all GPUs
 *
 * Total traffic per GPU: 2 * (N-1)/N * DataSize
 * Communication steps:   2 * (N-1)
 */
// Kernel: Ring AllReduce (heavily simplified: the real kernel goes through
// ncclPrimitives, keeps NCCL_STEPS FIFO slots in flight per connection, and
// pipelines much finer-grained slices)
template<typename T, int UNROLL>
__global__ void ncclAllReduceRingKernel(
    struct ncclDevComm* comm,
    const T* sendbuff,
    T* recvbuff,
    size_t count,
    int nChannels,
    int nRanks,
    ncclRedOp_t op
) {
  int tid = threadIdx.x;
  int bid = blockIdx.x;
  int nthreads = blockDim.x;
  int channelId = bid % nChannels;
  struct ncclChannel* channel = &comm->channels[channelId];
  struct ncclRing* ring = &channel->ring;
  int prevRank = ring->prev;
  int nextRank = ring->next;
  // Chunk owned by each rank (assumes count is divisible by nRanks;
  // the per-channel partitioning of the buffer is also elided here)
  size_t chunkSize = count / nRanks;
  // Staging buffers and flow-control counters shared with the two neighbors
  T* sendBuff = (T*)channel->devPeers[nextRank].send->conn.buffs[0];
  T* recvBuff = (T*)channel->devPeers[prevRank].recv->conn.buffs[0];
  volatile int* sendHead = channel->devPeers[nextRank].send->conn.head;
  volatile int* recvTail = channel->devPeers[prevRank].recv->conn.tail;
  int step = 0;  // number of chunks this rank has pushed to `next`

  // ======== Phase 1: Reduce-Scatter ========
  // Step 0: push the chunk this rank owns into next's staging buffer.
  size_t ownOffset = (size_t)ring->index * chunkSize;
  for (size_t i = tid; i < chunkSize; i += nthreads) {
    sendBuff[i] = sendbuff[ownOffset + i];
  }
  __threadfence_system();
  step++;
  if (tid == 0) *sendHead = step;
  __syncthreads();
  // Steps 1..N-1: wait for the partial sum pushed by prev, reduce it with the
  // local data for the same chunk, and forward the new partial sum. The last
  // step yields the fully reduced chunk, which is stored into recvbuff and
  // pushed on as the first AllGather message.
  for (int s = 1; s < nRanks; s++) {
    int chunk = (ring->index - s + nRanks) % nRanks;
    size_t offset = (size_t)chunk * chunkSize;
    if (tid == 0) {
      while (*recvTail < step) { /* spin until prev's push #step has landed */ }
    }
    __syncthreads();
    for (size_t i = tid; i < chunkSize; i += nthreads) {
      T val = reduce<T>(sendbuff[offset + i], recvBuff[i], op);
      if (s == nRanks - 1) recvbuff[offset + i] = val;  // fully reduced chunk
      sendBuff[i] = val;                                // forward the partial/final sum
    }
    __threadfence_system();
    step++;
    if (tid == 0) *sendHead = step;
    __syncthreads();
  }

  // ======== Phase 2: AllGather ========
  // Each step receives one fully reduced chunk from prev, stores it into the
  // output buffer and, except on the last step, forwards it to next.
  for (int s = 0; s < nRanks - 1; s++) {
    int chunk = (ring->index - s + nRanks) % nRanks;
    size_t offset = (size_t)chunk * chunkSize;
    if (tid == 0) {
      while (*recvTail < step) { /* wait for prev's fully reduced chunk */ }
    }
    __syncthreads();
    for (size_t i = tid; i < chunkSize; i += nthreads) {
      T val = recvBuff[i];
      recvbuff[offset + i] = val;
      if (s < nRanks - 2) sendBuff[i] = val;  // keep the chunk moving around the ring
    }
    __threadfence_system();
    if (s < nRanks - 2) {
      step++;
      if (tid == 0) *sendHead = step;
    }
    __syncthreads();
  }
  // Note: the UNROLL template parameter (manual loop unrolling) and the
  // backpressure that keeps a rank from overwriting a staging slot before the
  // peer has consumed it are both elided in this sketch.
}
// Launch Ring AllReduce
ncclResult_t ncclAllReduceRingLaunch(
    struct ncclComm* comm,
    const void* sendbuff,
    void* recvbuff,
    size_t count,
    ncclDataType_t datatype,
    ncclRedOp_t op,
    cudaStream_t stream
) {
  // Pick the kernel configuration
  int nChannels = comm->nChannels;
  int nBlocks = nChannels;
  int nThreads = 512;
  dim3 grid(nBlocks);
  dim3 block(nThreads);
  // Dispatch on the data type
  switch (datatype) {
    case ncclFloat32:
      ncclAllReduceRingKernel<float, 8><<<grid, block, 0, stream>>>(
          comm->devComm, (const float*)sendbuff, (float*)recvbuff,
          count, nChannels, comm->nRanks, op);
      break;
    case ncclFloat16:
      ncclAllReduceRingKernel<half, 8><<<grid, block, 0, stream>>>(
          comm->devComm, (const half*)sendbuff, (half*)recvbuff,
          count, nChannels, comm->nRanks, op);
      break;
    // ... other types
    default:
      return ncclInvalidArgument;
  }
  return ncclSuccess;
}
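The 2(N-1)/N factor from the algorithm comment above is also the correction that nccl-tests applies when converting measured time into "bus bandwidth". The following arithmetic sketch makes the numbers concrete; the 8-GPU count, buffer size, and timing are made-up example values.
// Ring AllReduce traffic model: each rank sends and receives
// 2*(N-1)/N * dataSize bytes in 2*(N-1) steps.
#include <stdio.h>

int main(void) {
  const int    nRanks   = 8;
  const double dataSize = 1.0 * (1 << 30);   // 1 GiB in bytes
  const double timeSec  = 10e-3;             // hypothetical measured time

  double bytesPerRank = 2.0 * (nRanks - 1) / nRanks * dataSize;  // ~1.75 GiB
  double algoBw = dataSize / timeSec / 1e9;                      // "algorithm" bandwidth
  double busBw  = algoBw * 2.0 * (nRanks - 1) / nRanks;          // per-link "bus" bandwidth

  printf("traffic per rank: %.2f GiB\n", bytesPerRank / (1 << 30));
  printf("algo bandwidth  : %.1f GB/s\n", algoBw);
  printf("bus  bandwidth  : %.1f GB/s\n", busBw);
  return 0;
}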
Tree AllReduce
// Tree AllReduce implementation
/*
 * Tree AllReduce algorithm:
 *
 * Phase 1: Reduce (leaves -> root)
 *   - Leaf nodes send their data to their parent
 *   - Each parent reduces and passes the result further up
 *
 * Phase 2: Broadcast (root -> leaves)
 *   - The root broadcasts the result to its children
 *   - Children keep broadcasting downwards
 *
 * Communication steps: 2 * log(N)
 * Lower latency than Ring, but lower bandwidth utilization
 */
template<typename T>
__global__ void ncclAllReduceTreeKernel(
    struct ncclDevComm* comm,
    const T* sendbuff,
    T* recvbuff,
    size_t count,
    int channelId,
    ncclRedOp_t op
) {
  struct ncclChannel* channel = &comm->channels[channelId];
  struct ncclTree* tree = &channel->tree;
  int tid = threadIdx.x;
  int nthreads = blockDim.x;
  int up = tree->up;
  int* down = tree->down;
  int nDown = tree->nDown;
  // Communication buffers towards the parent and the children
  T* sendUpBuff = (up >= 0) ?
      (T*)channel->devPeers[up].send->conn.buffs[0] : nullptr;
  T* recvUpBuff = (up >= 0) ?
      (T*)channel->devPeers[up].recv->conn.buffs[0] : nullptr;
  T* sendDownBuff[NCCL_MAX_TREE_ARITY];
  T* recvDownBuff[NCCL_MAX_TREE_ARITY];
  for (int i = 0; i < nDown; i++) {
    sendDownBuff[i] = (T*)channel->devPeers[down[i]].send->conn.buffs[0];
    recvDownBuff[i] = (T*)channel->devPeers[down[i]].recv->conn.buffs[0];
  }
  // ======== Phase 1: Reduce (up) ========
  // Wait for the children's data
  for (int i = 0; i < nDown; i++) {
    waitRecv(channel->devPeers[down[i]].recv, 0);
  }
  // Reduce: local data + children's data
  for (size_t i = tid; i < count; i += nthreads) {
    T val = sendbuff[i];
    // Fold in every child's contribution
    for (int c = 0; c < nDown; c++) {
      val = reduce<T>(val, recvDownBuff[c][i], op);
    }
    if (up >= 0) {
      // Not the root: pass the partial sum to the parent
      sendUpBuff[i] = val;
    } else {
      // Root: this is the final result
      recvbuff[i] = val;
    }
  }
  // Tell the parent the data is ready
  if (up >= 0) {
    __syncthreads();
    __threadfence_system();
    signalSend(channel->devPeers[up].send, 0);
  }
  // ======== Phase 2: Broadcast (down) ========
  // Non-root nodes wait for the result from the parent
  if (up >= 0) {
    waitRecv(channel->devPeers[up].recv, 1);
    // Copy to the local output and forward to the children
    for (size_t i = tid; i < count; i += nthreads) {
      T val = recvUpBuff[i];
      recvbuff[i] = val;
      for (int c = 0; c < nDown; c++) {
        sendDownBuff[c][i] = val;
      }
    }
  } else {
    // Root: send the result directly to the children
    for (size_t i = tid; i < count; i += nthreads) {
      T val = recvbuff[i];
      for (int c = 0; c < nDown; c++) {
        sendDownBuff[c][i] = val;
      }
    }
  }
  // Notify the children
  __syncthreads();
  __threadfence_system();
  for (int c = 0; c < nDown; c++) {
    signalSend(channel->devPeers[down[c]].send, 1);
  }
}
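When does NCCL prefer Tree over Ring? The real answer comes from measured tuning tables, but a rough alpha-beta cost model already shows the crossover: trees win for small, latency-bound messages, rings win as messages grow and bandwidth dominates. The sketch below is an illustrative approximation only; the constants are hypothetical and not NCCL's actual model.
// Rough alpha-beta model comparing Ring and Tree AllReduce.
// alpha: per-step latency (s), beta: inverse bandwidth (s/byte).
#include <math.h>
#include <stdio.h>

static double ringTime(int n, double bytes, double alpha, double beta) {
  return 2.0 * (n - 1) * alpha + 2.0 * (n - 1) / n * bytes * beta;
}
static double treeTime(int n, double bytes, double alpha, double beta) {
  // Reduce up + broadcast down: ~2*log2(N) latency terms, ~2x the data per rank.
  return 2.0 * log2((double)n) * alpha + 2.0 * bytes * beta;
}

int main(void) {
  const int    n     = 16;
  const double alpha = 5e-6;        // 5 us per step (hypothetical)
  const double beta  = 1.0 / 20e9;  // 20 GB/s per link (hypothetical)
  for (double bytes = 1e3; bytes <= 1e9; bytes *= 1e3) {
    printf("%10.0f B: ring %8.1f us, tree %8.1f us\n", bytes,
           ringTime(n, bytes, alpha, beta) * 1e6,
           treeTime(n, bytes, alpha, beta) * 1e6);
  }
  return 0;
}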
Transport Layer Implementation
P2P Transport (NVLink/PCIe)
// transport/p2p.cc - P2P transport implementation
// Establish a P2P connection
ncclResult_t p2pConnect(
    struct ncclComm* comm,
    int peer,
    struct ncclConnector* send,
    struct ncclConnector* recv
) {
  int cudaDev = comm->cudaDev;                 // local CUDA device
  int peerDev = comm->peerInfo[peer].cudaDev;  // peer's device, exchanged during bootstrap
  // Check whether P2P access is possible
  int canAccessPeer;
  CUDACHECK(cudaDeviceCanAccessPeer(&canAccessPeer, cudaDev, peerDev));
  if (canAccessPeer) {
    // Enable P2P access
    cudaError_t err = cudaDeviceEnablePeerAccess(peerDev, 0);
    if (err != cudaSuccess && err != cudaErrorPeerAccessAlreadyEnabled) {
      WARN("cudaDeviceEnablePeerAccess failed");
      return ncclUnhandledCudaError;
    }
  }
  // Allocate the transport resources
  struct p2pResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  // Allocate the send/receive staging buffers
  NCCLCHECK(ncclCudaMalloc(&resources->sendMem, comm->buffSizes[0]));
  NCCLCHECK(ncclCudaMalloc(&resources->recvMem, comm->buffSizes[1]));
  // Wire up the connection information
  send->transportResources = resources;
  send->conn = &resources->sendConn;
  recv->transportResources = resources;
  recv->conn = &resources->recvConn;
  // Export CUDA IPC handles when the peer lives in another process on the
  // same node (the handles are then exchanged with the peer during setup)
  if (comm->peerInfo[peer].pidHash != comm->peerInfo[comm->rank].pidHash) {
    CUDACHECK(cudaIpcGetMemHandle(&resources->sendIpc, resources->sendMem));
    CUDACHECK(cudaIpcGetMemHandle(&resources->recvIpc, resources->recvMem));
  }
  return ncclSuccess;
}
// P2P send on the device: write directly into the peer's staging buffer
template<typename T>
__device__ void p2pSend(
    struct ncclConnector* conn,
    const T* src,
    size_t count
) {
  T* dst = (T*)conn->conn.buffs[0];
  // Write straight into the remote buffer over NVLink/PCIe
  for (size_t i = threadIdx.x; i < count; i += blockDim.x) {
    dst[i] = src[i];
  }
  __threadfence_system();
  // Bump the head pointer to notify the peer
  if (threadIdx.x == 0) {
    atomicAdd((int*)conn->conn.head, 1);
  }
}
// P2P receive on the device
template<typename T>
__device__ void p2pRecv(
    struct ncclConnector* conn,
    T* dst,
    size_t count,
    int step
) {
  // Wait until the data has arrived
  volatile int* tail = conn->conn.tail;
  while (*tail < step) {
    // spin wait
  }
  T* src = (T*)conn->conn.buffs[0];
  // Copy into the local destination
  for (size_t i = threadIdx.x; i < count; i += blockDim.x) {
    dst[i] = src[i];
  }
}
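The connection code above only shows the exporting side (cudaIpcGetMemHandle). For cross-process P2P, the peer process must import the handle before it can write into the buffer. A minimal sketch of that importing side follows; how the handle travels between processes (in NCCL, over the bootstrap network) is elided, and importPeerBuffer is a hypothetical helper, not an NCCL function.
// Importing side of cross-process P2P: map the exporter's buffer into this
// process's address space so kernels here can write into it directly.
#include <cuda_runtime.h>

// 'handle' was produced by cudaIpcGetMemHandle in the exporting process and
// received here out of band (sockets, shared memory, MPI, ...).
void* importPeerBuffer(cudaIpcMemHandle_t handle) {
  void* devPtr = NULL;
  cudaError_t err = cudaIpcOpenMemHandle(&devPtr, handle,
                                         cudaIpcMemLazyEnablePeerAccess);
  if (err != cudaSuccess) return NULL;
  // devPtr can now be passed to kernels on this device; writes travel over
  // NVLink/PCIe directly into the exporting process's allocation.
  // Later: cudaIpcCloseMemHandle(devPtr);
  return devPtr;
}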
Network Transport (IB Verbs)
// transport/net_ib.cc - InfiniBand transport implementation
// IB initialization
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
  // Enumerate the IB devices
  int nDevs = 0;
  struct ibv_device** devList = ibv_get_device_list(&nDevs);
  if (devList == NULL || nDevs == 0) {
    WARN("No IB devices found");
    return ncclInternalError;
  }
  // Walk the devices
  for (int d = 0; d < nDevs; d++) {
    struct ibv_context* ctx = ibv_open_device(devList[d]);
    if (ctx == NULL) continue;
    // Query the device attributes
    struct ibv_device_attr devAttr;
    if (ibv_query_device(ctx, &devAttr) != 0) {
      ibv_close_device(ctx);
      continue;
    }
    // Query the ports
    for (int p = 1; p <= devAttr.phys_port_cnt; p++) {
      struct ibv_port_attr portAttr;
      if (ibv_query_port(ctx, p, &portAttr) != 0) continue;
      if (portAttr.state != IBV_PORT_ACTIVE) continue;
      // Record the usable port
      struct ncclIbDev* dev = ncclIbDevs + ncclNIbDevs++;
      dev->context = ctx;
      dev->portNum = p;
      dev->pkey = portAttr.pkey_tbl_len > 0 ? 0 : -1;
      dev->maxQp = devAttr.max_qp;
      strncpy(dev->name, ibv_get_device_name(devList[d]), MAXNAMESIZE);
    }
  }
  ibv_free_device_list(devList);
  return ncclSuccess;
}
// Establish an IB connection
ncclResult_t ncclIbConnect(
    int dev,
    void* handle,
    struct ncclConnect* connectInfo
) {
  struct ncclIbDev* ibDev = ncclIbDevs + dev;
  // Allocate a Protection Domain
  struct ibv_pd* pd = ibv_alloc_pd(ibDev->context);
  // Create a Completion Queue
  struct ibv_cq* cq = ibv_create_cq(
      ibDev->context,
      NCCL_IB_MAX_CQ_SIZE,
      NULL, NULL, 0);
  // Create a Queue Pair
  struct ibv_qp_init_attr qpInitAttr = {
    .send_cq = cq,
    .recv_cq = cq,
    .cap = {
      .max_send_wr = NCCL_IB_MAX_QP_WR,
      .max_recv_wr = NCCL_IB_MAX_QP_WR,
      .max_send_sge = 1,
      .max_recv_sge = 1,
    },
    .qp_type = IBV_QPT_RC,  // Reliable Connection
  };
  struct ibv_qp* qp = ibv_create_qp(pd, &qpInitAttr);
  // QP state transitions: RESET -> INIT -> RTR -> RTS
  NCCLCHECK(ncclIbModifyQp(qp, IBV_QPS_INIT, ibDev->portNum));
  NCCLCHECK(ncclIbModifyQp(qp, IBV_QPS_RTR, connectInfo));
  NCCLCHECK(ncclIbModifyQp(qp, IBV_QPS_RTS));
  // Register the communication buffer as a Memory Region
  // (buffer/size come from the transport resources set up by the caller;
  // their plumbing is omitted in this simplified listing)
  struct ncclIbMr* mr;
  NCCLCHECK(ncclCalloc(&mr, 1));
  mr->mr = ibv_reg_mr(pd, buffer, size,
      IBV_ACCESS_LOCAL_WRITE |
      IBV_ACCESS_REMOTE_WRITE |
      IBV_ACCESS_REMOTE_READ);
  return ncclSuccess;
}
// IB RDMA Write
ncclResult_t ncclIbRdmaWrite(
    struct ncclIbQp* qp,
    void* localAddr,
    uint32_t localKey,
    void* remoteAddr,
    uint32_t remoteKey,
    size_t size
) {
  struct ibv_sge sge = {
    .addr = (uint64_t)localAddr,
    .length = (uint32_t)size,
    .lkey = localKey,
  };
  struct ibv_send_wr wr = {
    .wr_id = (uint64_t)qp,
    .sg_list = &sge,
    .num_sge = 1,
    .opcode = IBV_WR_RDMA_WRITE,
    .send_flags = IBV_SEND_SIGNALED,
    .wr.rdma = {
      .remote_addr = (uint64_t)remoteAddr,
      .rkey = remoteKey,
    },
  };
  struct ibv_send_wr* bad_wr;
  int ret = ibv_post_send(qp->qp, &wr, &bad_wr);
  if (ret != 0) {
    WARN("ibv_post_send failed: %d", ret);
    return ncclSystemError;
  }
  return ncclSuccess;
}
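Because the write above is posted with IBV_SEND_SIGNALED, its completion eventually has to be reaped from the completion queue; NCCL does this asynchronously in the net transport's test() path. A minimal polling sketch is shown below; pollCompletions is a hypothetical helper, not an NCCL function.
// Reap completions for previously posted (signaled) work requests.
// Returns 0 on success, -1 on any completion error.
#include <infiniband/verbs.h>
#include <stdio.h>

int pollCompletions(struct ibv_cq* cq, int expected) {
  struct ibv_wc wc[4];
  int done = 0;
  while (done < expected) {
    int n = ibv_poll_cq(cq, 4, wc);   // non-blocking; returns the number of completions
    if (n < 0) return -1;
    for (int i = 0; i < n; i++) {
      if (wc[i].status != IBV_WC_SUCCESS) {
        fprintf(stderr, "WC error: %s (wr_id=%llu)\n",
                ibv_wc_status_str(wc[i].status),
                (unsigned long long)wc[i].wr_id);
        return -1;
      }
    }
    done += n;
  }
  return 0;
}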
Performance Optimizations
Multi-channel parallelism
// Multi-channel configuration
ncclResult_t ncclTopoGetNchannels(
    struct ncclTopoSystem* system,
    int* nChannels
) {
  // Derive the channel count from the topology
  int nGpus = system->nodes[GPU].count;
  int nLinks = getMaxNvLinks(system);
  // Each NVLink can feed one channel, but too many channels
  // increase synchronization overhead
  *nChannels = min(nLinks, NCCL_MAX_NCHANNELS);
  *nChannels = max(*nChannels, 2);  // at least 2
  return ncclSuccess;
}
// Channel load balancing
template<typename T>
__global__ void multiChannelAllReduce(
    struct ncclDevComm* comm,
    const T* sendbuff,
    T* recvbuff,
    size_t count
) {
  int channelId = blockIdx.x % comm->nChannels;
  int nChannels = comm->nChannels;
  // Each channel handles one slice of the data
  size_t channelOffset = (count / nChannels) * channelId;
  size_t channelCount = (channelId == nChannels - 1) ?
      count - channelOffset : count / nChannels;
  // Run the per-channel AllReduce on this slice
  channelAllReduce<T>(
      comm,
      sendbuff + channelOffset,
      recvbuff + channelOffset,
      channelCount,
      channelId);
}
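In practice the channel count can also be pinned from outside using NCCL's documented environment variables NCCL_MIN_NCHANNELS and NCCL_MAX_NCHANNELS, which must be set before the communicator is created. The values below are illustrative, not recommendations.
// Pin NCCL's channel count before communicator creation.
#include <stdlib.h>

void configureChannels(void) {
  setenv("NCCL_MIN_NCHANNELS", "4", 1);   // lower bound on channels
  setenv("NCCL_MAX_NCHANNELS", "16", 1);  // upper bound on channels
  setenv("NCCL_DEBUG", "INFO", 1);        // print the rings/trees actually built
  // ncclCommInitRank()/ncclCommInitAll() called after this point will pick up
  // these values.
}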
Pipelining
// Overlapping communication with computation
ncclResult_t ncclAllReduceOverlap(
    struct ncclComm* comm,
    const void* sendbuff,
    void* recvbuff,
    size_t count,
    ncclDataType_t datatype,
    ncclRedOp_t op,
    cudaStream_t stream
) {
  // Process the buffer in chunks to form a pipeline
  size_t typeSize = ncclTypeSize(datatype);  // element size in bytes
  size_t chunkSize = NCCL_CHUNK_SIZE;
  int nChunks = (count + chunkSize - 1) / chunkSize;
  for (int c = 0; c < nChunks; c++) {
    size_t offset = c * chunkSize;
    size_t thisCount = (count - offset < chunkSize) ? count - offset : chunkSize;
    // Communicate the current chunk
    NCCLCHECK(ncclAllReduceChunk(
        comm,
        (const char*)sendbuff + offset * typeSize,
        (char*)recvbuff + offset * typeSize,
        thisCount,
        datatype, op, stream));
    // Overlap with computation: the caller can run kernels on other streams
    // while earlier chunks are still in flight
  }
  return ncclSuccess;
}
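On the user side, "overlap with computation" concretely means issuing the collective on a dedicated communication stream and ordering it against the compute stream with events. The sketch below assumes one communicator per process; launchNextCompute and the gradient buffer are placeholders for application state.
// Overlapping an AllReduce with independent computation using two CUDA streams.
#include <cuda_runtime.h>
#include <nccl.h>

void allreduceOverlapped(ncclComm_t comm,
                         float* grads, size_t count,
                         cudaStream_t computeStream, cudaStream_t commStream,
                         void (*launchNextCompute)(cudaStream_t)) {
  cudaEvent_t gradsReady, commDone;
  cudaEventCreateWithFlags(&gradsReady, cudaEventDisableTiming);
  cudaEventCreateWithFlags(&commDone, cudaEventDisableTiming);

  // 1. Gradients were produced on computeStream; make commStream wait for them.
  cudaEventRecord(gradsReady, computeStream);
  cudaStreamWaitEvent(commStream, gradsReady, 0);

  // 2. The collective proceeds on commStream ...
  ncclAllReduce(grads, grads, count, ncclFloat, ncclSum, comm, commStream);
  cudaEventRecord(commDone, commStream);

  // 3. ... while independent work (e.g. the next layer's backward pass)
  //    keeps running on computeStream.
  launchNextCompute(computeStream);

  // 4. Anything that consumes the reduced gradients must wait for the collective.
  cudaStreamWaitEvent(computeStream, commDone, 0);

  cudaEventDestroy(gradsReady);
  cudaEventDestroy(commDone);
}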
Summary
NCCL key takeaways
| Component | Role | Key techniques |
|---|---|---|
| Topology discovery | Detect GPU/network interconnects | NVLink, PCIe, Floyd-Warshall |
| Ring AllReduce | High bandwidth utilization | 2(N-1)/N bandwidth efficiency |
| Tree AllReduce | Low latency | log(N) communication steps |
| P2P transport | Direct GPU-to-GPU communication | NVLink, GPUDirect |
| Network transport | Cross-node communication | IB Verbs, RDMA |
Performance tuning checklist
□ Pick the algorithm based on the topology (Ring vs Tree)
□ Configure a sensible number of channels
□ Enable NVLink and GPUDirect RDMA
□ Use multiple channels in parallel
□ Overlap communication with computation