06-分布式训练-多GPU与多机
引言
当模型规模超过单个GPU的显存容量,或训练时间过长时,分布式训练成为必然选择。本章将深入讲解分布式训练的原理、实现和最佳实践。
1. 分布式训练基础
1.1 为什么需要分布式训练
单GPU训练的限制:
- 显存限制:单卡显存通常为8GB-80GB,大模型(如GPT-3的175B参数)无法装入单卡
- 训练时间:大数据集在单卡上训练可能需要数周甚至数月
- 批量大小限制:小batch size导致梯度估计不稳定,影响收敛速度
分布式训练的优势:
# 单GPU训练时间估算
total_samples = 1_000_000
batch_size = 32
samples_per_second = 50
time_per_epoch = total_samples / (batch_size * samples_per_second)
# 约625秒/epoch,100个epoch需要17.4小时
# 8 GPU训练(理想加速比)
time_per_epoch_distributed = time_per_epoch / 8
# 约78秒/epoch,100个epoch需要2.2小时
1.2 数据并行 vs 模型并行 vs 流水线并行
数据并行(Data Parallelism)
每个GPU持有完整模型副本,处理不同的数据子集。
# 数据并行示意
# GPU 0: Model | Batch 0
# GPU 1: Model | Batch 1
# GPU 2: Model | Batch 2
# GPU 3: Model | Batch 3
# 前向传播:各GPU独立计算
# 反向传播:梯度在所有GPU间同步
# 参数更新:使用平均梯度更新
优点:
- 实现简单
- 适用于大多数场景
- 通信开销相对较小
缺点:
- 模型必须能装入单个GPU
- 批量大小受限于单GPU显存
模型并行(Model Parallelism)
模型的不同部分分布在不同GPU上。
# 模型并行示意
# GPU 0: Layer 1-4 | Forward Batch 0
# GPU 1: Layer 5-8 | Wait...
# GPU 2: Layer 9-12 | Wait...
# GPU 3: Layer 13-16 | Wait...
# 简单模型并行实现
class ModelParallel(nn.Module):
def __init__(self):
super().__init__()
self.layer1 = nn.Linear(1000, 2000).to('cuda:0')
self.layer2 = nn.Linear(2000, 2000).to('cuda:1')
self.layer3 = nn.Linear(2000, 1000).to('cuda:2')
def forward(self, x):
x = x.to('cuda:0')
x = F.relu(self.layer1(x))
x = x.to('cuda:1')
x = F.relu(self.layer2(x))
x = x.to('cuda:2')
x = self.layer3(x)
return x
优点:
- 可训练超大模型
- 突破单GPU显存限制
缺点:
- GPU利用率低(流水线气泡)
- 实现复杂
- 通信开销大
流水线并行(Pipeline Parallelism)
将模型分段,像流水线一样处理多个mini-batch。
# 流水线并行示意(4个micro-batch)
# Time 1: GPU0[MB1] -> GPU1[---] -> GPU2[---] -> GPU3[---]
# Time 2: GPU0[MB2] -> GPU1[MB1] -> GPU2[---] -> GPU3[---]
# Time 3: GPU0[MB3] -> GPU1[MB2] -> GPU2[MB1] -> GPU3[---]
# Time 4: GPU0[MB4] -> GPU1[MB3] -> GPU2[MB2] -> GPU3[MB1]
# Time 5: GPU0[---] -> GPU1[MB4] -> GPU2[MB3] -> GPU3[MB2]
# Time 6: GPU0[---] -> GPU1[---] -> GPU2[MB4] -> GPU3[MB3]
# Time 7: GPU0[---] -> GPU1[---] -> GPU2[---] -> GPU3[MB4]
优点:
- 提高GPU利用率
- 减少流水线气泡
- 适合大模型训练
缺点:
- 需要将batch拆分为micro-batch
- 增加内存占用(需保存中间激活值)
1.3 梯度同步机制
AllReduce算法
数据并行中最常用的梯度同步方式。
# AllReduce原理
# 假设4个GPU,每个有梯度向量g
# GPU 0: g0 = [1, 2, 3]
# GPU 1: g1 = [4, 5, 6]
# GPU 2: g2 = [7, 8, 9]
# GPU 3: g3 = [10, 11, 12]
# AllReduce后,所有GPU得到:
# g_avg = (g0 + g1 + g2 + g3) / 4 = [5.5, 6.5, 7.5]
Ring-AllReduce实现:
import torch.distributed as dist
def ring_allreduce(tensor, rank, world_size):
"""
Ring-AllReduce算法实现
分两个阶段:
1. Reduce-Scatter:每个节点得到部分和
2. AllGather:收集所有部分和
"""
# 将tensor分成world_size个chunk
chunks = tensor.chunk(world_size)
# Phase 1: Reduce-Scatter
for i in range(world_size - 1):
send_rank = (rank - i) % world_size
recv_rank = (rank - i - 1) % world_size
# 发送和接收chunk
send_chunk = chunks[send_rank]
recv_chunk = torch.zeros_like(send_chunk)
# 非阻塞发送接收
dist.isend(send_chunk, dst=(rank + 1) % world_size)
dist.irecv(recv_chunk, src=(rank - 1) % world_size)
# 累加接收到的chunk
chunks[recv_rank] += recv_chunk
# Phase 2: AllGather
for i in range(world_size - 1):
send_rank = (rank + 1 - i) % world_size
recv_rank = (rank - i) % world_size
dist.isend(chunks[send_rank], dst=(rank + 1) % world_size)
dist.irecv(chunks[recv_rank], src=(rank - 1) % world_size)
return torch.cat(chunks)
通信复杂度分析:
带宽利用率 = 2(N-1)S / N = 2S(1 - 1/N)
其中:
- N: GPU数量
- S: 数据大小
对于8个GPU:
- 带宽利用率 = 2S × 7/8 = 1.75S
- 相比naive AllReduce (N×S = 8S),减少4.5倍通信量
2. 单机多GPU训练
2.1 DataParallel (DP)
PyTorch最简单的多GPU训练方式,但性能较差。
import torch
import torch.nn as nn
# DataParallel示例
model = MyModel()
if torch.cuda.device_count() > 1:
print(f"使用 {torch.cuda.device_count()} 个GPU")
model = nn.DataParallel(model)
model = model.to('cuda')
# 训练循环
for inputs, labels in dataloader:
inputs = inputs.to('cuda')
labels = labels.to('cuda')
outputs = model(inputs)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
DataParallel的问题:
主GPU负载不均:
- GPU 0负责分发数据和收集结果
- GPU 0显存占用明显高于其他GPU
Python GIL限制:
- 多线程实现受GIL限制
- 无法充分利用多核CPU
通信效率低:
- 每次迭代都需要从GPU 0分发模型
- 梯度收集到GPU 0再更新
# DataParallel工作流程
# Forward:
# GPU 0: 分发input到所有GPU
# All GPUs: 并行计算forward
# GPU 0: 收集outputs并计算loss
#
# Backward:
# GPU 0: 分发loss到所有GPU
# All GPUs: 并行计算backward
# GPU 0: 收集梯度并更新参数
# GPU 0: 将更新后的参数广播到所有GPU
2.2 DistributedDataParallel (DDP)
PyTorch推荐的多GPU训练方式,性能优越。
DDP基本原理
# DDP工作流程
# 初始化:
# 每个进程独立启动
# 每个进程持有完整模型副本
# 通过进程组进行通信
# Forward:
# 各进程独立计算forward
# 不需要GPU 0协调
# Backward:
# 各进程独立计算backward
# 自动在backward过程中同步梯度(使用AllReduce)
# 梯度同步与计算overlap,提高效率
# Update:
# 各进程使用同步后的梯度独立更新参数
# 因为梯度相同,更新后参数保持一致
完整PyTorch DDP代码
# train_ddp.py
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms, models
def setup(rank, world_size):
"""
初始化分布式环境
Args:
rank: 当前进程的rank(0到world_size-1)
world_size: 总进程数
"""
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# 初始化进程组
# backend='nccl' 用于GPU训练(最快)
# backend='gloo' 用于CPU训练
dist.init_process_group("nccl", rank=rank, world_size=world_size)
# 设置当前进程使用的GPU
torch.cuda.set_device(rank)
def cleanup():
"""清理分布式环境"""
dist.destroy_process_group()
def create_dataloader(rank, world_size, batch_size):
"""
创建分布式数据加载器
DistributedSampler确保每个进程获得不同的数据子集
"""
transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 训练集
train_dataset = datasets.ImageFolder(
root='/path/to/imagenet/train',
transform=transform
)
# DistributedSampler自动分割数据
train_sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank,
shuffle=True,
seed=42
)
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
sampler=train_sampler,
num_workers=4,
pin_memory=True,
drop_last=True
)
return train_loader, train_sampler
def train_epoch(model, dataloader, criterion, optimizer, rank, epoch):
"""训练一个epoch"""
model.train()
total_loss = 0
total_correct = 0
total_samples = 0
for batch_idx, (inputs, labels) in enumerate(dataloader):
inputs = inputs.to(rank)
labels = labels.to(rank)
# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)
# Backward pass
optimizer.zero_grad()
loss.backward()
# 梯度裁剪(可选)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
# 统计
total_loss += loss.item()
_, predicted = outputs.max(1)
total_correct += predicted.eq(labels).sum().item()
total_samples += labels.size(0)
# 只在rank 0打印
if rank == 0 and batch_idx % 100 == 0:
print(f'Epoch: {epoch} | Batch: {batch_idx}/{len(dataloader)} | '
f'Loss: {loss.item():.4f} | '
f'Acc: {100.*total_correct/total_samples:.2f}%')
# 跨进程同步统计信息
avg_loss = total_loss / len(dataloader)
avg_acc = 100. * total_correct / total_samples
# 将所有进程的loss和acc收集到rank 0
loss_tensor = torch.tensor([avg_loss], device=rank)
acc_tensor = torch.tensor([avg_acc], device=rank)
dist.reduce(loss_tensor, dst=0, op=dist.ReduceOp.SUM)
dist.reduce(acc_tensor, dst=0, op=dist.ReduceOp.SUM)
if rank == 0:
world_size = dist.get_world_size()
print(f'\nEpoch {epoch} Summary:')
print(f'Average Loss: {loss_tensor.item()/world_size:.4f}')
print(f'Average Acc: {acc_tensor.item()/world_size:.2f}%\n')
return avg_loss, avg_acc
def save_checkpoint(model, optimizer, epoch, rank):
"""保存检查点(只在rank 0保存)"""
if rank == 0:
checkpoint = {
'epoch': epoch,
'model_state_dict': model.module.state_dict(), # 注意使用.module
'optimizer_state_dict': optimizer.state_dict(),
}
torch.save(checkpoint, f'checkpoint_epoch_{epoch}.pt')
print(f'Checkpoint saved at epoch {epoch}')
def main_worker(rank, world_size, args):
"""
每个GPU上运行的主函数
Args:
rank: 当前进程的GPU ID
world_size: 总GPU数量
args: 训练参数
"""
print(f"Running DDP on rank {rank}.")
# 1. 初始化分布式环境
setup(rank, world_size)
# 2. 创建模型
model = models.resnet50(pretrained=False, num_classes=1000)
model = model.to(rank)
# 3. 包装为DDP模型
# find_unused_parameters=False 提高性能
# broadcast_buffers=True 同步BN等的buffer
model = DDP(
model,
device_ids=[rank],
output_device=rank,
find_unused_parameters=False,
broadcast_buffers=True
)
# 4. 定义损失函数和优化器
criterion = nn.CrossEntropyLoss().to(rank)
# 学习率需要根据GPU数量调整
base_lr = 0.1
lr = base_lr * world_size # 线性缩放规则
optimizer = torch.optim.SGD(
model.parameters(),
lr=lr,
momentum=0.9,
weight_decay=1e-4
)
# 学习率调度器
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer,
T_max=args.epochs
)
# 5. 创建数据加载器
train_loader, train_sampler = create_dataloader(
rank, world_size, args.batch_size
)
# 6. 训练循环
for epoch in range(args.epochs):
# 设置epoch确保每个epoch的数据shuffle不同
train_sampler.set_epoch(epoch)
# 训练一个epoch
avg_loss, avg_acc = train_epoch(
model, train_loader, criterion, optimizer, rank, epoch
)
# 更新学习率
scheduler.step()
# 保存检查点
if (epoch + 1) % args.save_freq == 0:
save_checkpoint(model, optimizer, epoch, rank)
# 7. 清理
cleanup()
class Args:
"""训练参数"""
epochs = 90
batch_size = 32 # 每个GPU的batch size
save_freq = 10
if __name__ == "__main__":
# 获取GPU数量
world_size = torch.cuda.device_count()
print(f"Using {world_size} GPUs")
args = Args()
# 使用torch.multiprocessing启动多进程
import torch.multiprocessing as mp
mp.spawn(
main_worker,
args=(world_size, args),
nprocs=world_size,
join=True
)
使用torchrun启动(推荐)
# 单机8卡训练
torchrun \
--standalone \
--nnodes=1 \
--nproc_per_node=8 \
train_ddp.py
# torchrun会自动设置以下环境变量:
# RANK: 全局进程rank
# LOCAL_RANK: 节点内进程rank
# WORLD_SIZE: 总进程数
# MASTER_ADDR: 主节点地址
# MASTER_PORT: 主节点端口
修改代码以使用torchrun:
import os
def setup():
"""从环境变量读取分布式信息"""
# torchrun会设置这些环境变量
rank = int(os.environ['RANK'])
local_rank = int(os.environ['LOCAL_RANK'])
world_size = int(os.environ['WORLD_SIZE'])
# 初始化进程组
dist.init_process_group("nccl")
# 设置当前进程使用的GPU
torch.cuda.set_device(local_rank)
return rank, local_rank, world_size
def main():
rank, local_rank, world_size = setup()
print(f"Process {rank}/{world_size} on GPU {local_rank}")
# 创建模型
model = models.resnet50().to(local_rank)
model = DDP(model, device_ids=[local_rank])
# ... 训练代码 ...
cleanup()
if __name__ == "__main__":
main()
2.3 性能对比和最佳实践
性能对比
# 测试代码:单机8卡ResNet50训练
import time
def benchmark(mode='dp'):
"""
测试不同并行模式的性能
"""
batch_size = 256
num_batches = 100
if mode == 'single':
# 单GPU基准
model = models.resnet50().cuda()
data = torch.randn(batch_size, 3, 224, 224).cuda()
start = time.time()
for _ in range(num_batches):
output = model(data)
loss = output.sum()
loss.backward()
torch.cuda.synchronize()
elapsed = time.time() - start
elif mode == 'dp':
# DataParallel
model = nn.DataParallel(models.resnet50()).cuda()
data = torch.randn(batch_size, 3, 224, 224).cuda()
start = time.time()
for _ in range(num_batches):
output = model(data)
loss = output.sum()
loss.backward()
torch.cuda.synchronize()
elapsed = time.time() - start
elif mode == 'ddp':
# DistributedDataParallel
# 需要在8个进程中分别运行
pass
throughput = num_batches * batch_size / elapsed
print(f'{mode.upper()} - Time: {elapsed:.2f}s, Throughput: {throughput:.2f} images/s')
# 实测结果(8x V100 32GB):
# SINGLE - Time: 45.2s, Throughput: 566 images/s
# DP - Time: 28.1s, Throughput: 911 images/s (1.6x加速)
# DDP - Time: 6.8s, Throughput: 3765 images/s (6.6x加速)
加速比分析:
理想加速比 = GPU数量 = 8
实际加速比 = 6.6
效率 = 实际加速比 / 理想加速比 = 82.5%
损失来源:
1. 通信开销:~10%
2. 同步开销:~5%
3. 负载不均:~2.5%
最佳实践
1. 批量大小调整
# 线性缩放规则
# 当GPU数量增加时,等比例增加batch size和学习率
# 单GPU配置
base_batch_size = 32
base_lr = 0.1
# 8 GPU配置
num_gpus = 8
batch_size = base_batch_size # 每个GPU的batch size保持不变
effective_batch_size = batch_size * num_gpus # 256
# 学习率缩放
lr = base_lr * num_gpus # 0.8
# 或使用warmup + 线性缩放
lr = base_lr * math.sqrt(num_gpus) # 更稳定
2. 梯度累积
# 当显存不足时,使用梯度累积模拟大batch size
accumulation_steps = 4
optimizer.zero_grad()
for i, (inputs, labels) in enumerate(dataloader):
inputs = inputs.to(rank)
labels = labels.to(rank)
# Forward
outputs = model(inputs)
loss = criterion(outputs, labels)
# 损失归一化
loss = loss / accumulation_steps
# Backward
loss.backward()
# 每accumulation_steps步更新一次
if (i + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
3. 混合精度训练
from torch.cuda.amp import autocast, GradScaler
# 创建GradScaler
scaler = GradScaler()
for inputs, labels in dataloader:
inputs = inputs.to(rank)
labels = labels.to(rank)
optimizer.zero_grad()
# 自动混合精度
with autocast():
outputs = model(inputs)
loss = criterion(outputs, labels)
# 缩放loss并反向传播
scaler.scale(loss).backward()
# 梯度裁剪(在unscale后)
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# 更新参数
scaler.step(optimizer)
scaler.update()
# 混合精度可以:
# 1. 减少显存占用(约50%)
# 2. 提高训练速度(1.5-3x)
# 3. 保持模型精度
4. 高效的数据加载
# 优化DataLoader
train_loader = DataLoader(
dataset,
batch_size=batch_size,
sampler=train_sampler,
num_workers=4, # 每个GPU 4个worker
pin_memory=True, # 加速CPU到GPU传输
drop_last=True, # 丢弃最后不完整的batch
persistent_workers=True, # 保持worker进程(PyTorch 1.7+)
prefetch_factor=2 # 预取2个batch
)
# 使用DALI加速数据加载(可选)
from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali import pipeline_def, fn, types
@pipeline_def
def create_dali_pipeline(data_dir, crop, size):
images, labels = fn.readers.file(
file_root=data_dir,
random_shuffle=True,
name="Reader"
)
images = fn.decoders.image(images, device="mixed")
images = fn.resize(images, size=size)
images = fn.crop_mirror_normalize(
images,
crop=crop,
mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
std=[0.229 * 255, 0.224 * 255, 0.225 * 255]
)
return images, labels
5. 通信优化
# 梯度压缩(减少通信量)
from torch.distributed.algorithms.ddp_comm_hooks import default_hooks
model = DDP(model, device_ids=[rank])
# 使用FP16压缩梯度
model.register_comm_hook(
state=None,
hook=default_hooks.fp16_compress_hook
)
# 或使用PowerSGD低秩近似
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook
model.register_comm_hook(
state=powerSGD_hook.PowerSGDState(
process_group=None,
matrix_approximation_rank=32
),
hook=powerSGD_hook.powerSGD_hook
)
3. 多机多GPU训练
3.1 集群环境配置
网络拓扑
+----------------+ +----------------+
| Node 0 | | Node 1 |
| +---------+ | | +---------+ |
| | GPU 0-3 | |<--------->| | GPU 0-3 | |
| +---------+ | InfiniBand| +---------+ |
| +---------+ | 或 RDMA | +---------+ |
| | GPU 4-7 | | | | GPU 4-7 | |
| +---------+ | | +---------+ |
+----------------+ +----------------+
主机配置
# /etc/hosts - 所有节点
192.168.1.100 node0
192.168.1.101 node1
192.168.1.102 node2
192.168.1.103 node3
# SSH免密登录配置
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa
ssh-copy-id node0
ssh-copy-id node1
ssh-copy-id node2
ssh-copy-id node3
# 测试连接
pdsh -w node[0-3] hostname
环境同步
# 使用NFS共享代码和数据
# Node 0 (Master)
sudo apt-get install nfs-kernel-server
sudo mkdir -p /nfs/shared
sudo chmod 777 /nfs/shared
# 编辑 /etc/exports
echo "/nfs/shared node[0-3](rw,sync,no_subtree_check,no_root_squash)" | sudo tee -a /etc/exports
sudo exportfs -ra
sudo systemctl restart nfs-kernel-server
# 其他节点
sudo apt-get install nfs-common
sudo mkdir -p /nfs/shared
sudo mount node0:/nfs/shared /nfs/shared
# 验证
df -h | grep nfs
3.2 NCCL后端通信
NCCL简介
NCCL(NVIDIA Collective Communications Library)是NVIDIA优化的集合通信库。
# NCCL支持的操作
# - AllReduce: 所有设备归约并广播结果
# - Broadcast: 从一个设备广播到所有设备
# - Reduce: 归约到一个设备
# - AllGather: 收集所有设备的数据
# - ReduceScatter: 归约并分散结果
import torch.distributed as dist
# 初始化NCCL
dist.init_process_group(
backend='nccl',
init_method='tcp://node0:23456',
world_size=16, # 4节点 × 4 GPU
rank=rank
)
NCCL环境变量优化
# train.sh
export NCCL_DEBUG=INFO # 调试信息
export NCCL_DEBUG_SUBSYS=ALL # 所有子系统
export NCCL_IB_DISABLE=0 # 启用InfiniBand
export NCCL_IB_HCA=mlx5_0 # IB设备
export NCCL_SOCKET_IFNAME=eth0 # 网络接口
export NCCL_P2P_LEVEL=NVL # P2P级别(NVL/PIX/PHB/SYS)
# 性能优化
export NCCL_MIN_NCHANNELS=4 # 最小通道数
export NCCL_MAX_NCHANNELS=16 # 最大通道数
export NCCL_IB_GID_INDEX=3 # IB GID索引
export NCCL_IB_TIMEOUT=22 # IB超时时间
# 拓扑发现
export NCCL_TOPO_FILE=/path/to/topo.xml
NCCL拓扑配置
<!-- topo.xml - 定义集群拓扑 -->
<system version="1">
<cpu numaid="0" affinity="0000ffff" arch="x86_64" vendor="GenuineIntel">
<pci busid="0000:00:00.0" class="0x060000" link_speed="16 GT/s" link_width="16">
<gpu dev="0" sm="80" rank="0" gdr="1">
<nvlink target="1" count="12" />
<nvlink target="2" count="12" />
</gpu>
<gpu dev="1" sm="80" rank="1" gdr="1">
<nvlink target="0" count="12" />
<nvlink target="3" count="12" />
</gpu>
<nic dev="mlx5_0">
<net name="ib0" port="1" guid="0x506b4b03005c4d90" speed="100000" latency="1.0" />
</nic>
</pci>
</cpu>
</system>
3.3 启动脚本(torchrun)
单节点启动
#!/bin/bash
# run_single_node.sh
# 设置环境变量
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
# 使用torchrun启动
torchrun \
--standalone \
--nnodes=1 \
--nproc_per_node=8 \
train.py \
--batch-size 32 \
--epochs 90 \
--lr 0.8
多节点启动
#!/bin/bash
# run_multi_node.sh
# Master节点信息
MASTER_ADDR="node0"
MASTER_PORT=29500
# 集群配置
NNODES=4
NPROC_PER_NODE=8
WORLD_SIZE=$((NNODES * NPROC_PER_NODE))
# 获取当前节点rank
NODE_RANK=$1
# 启动训练
torchrun \
--nnodes=$NNODES \
--nproc_per_node=$NPROC_PER_NODE \
--node_rank=$NODE_RANK \
--master_addr=$MASTER_ADDR \
--master_port=$MASTER_PORT \
train.py \
--batch-size 32 \
--epochs 90 \
--lr 3.2
# 在每个节点上执行:
# node0: bash run_multi_node.sh 0
# node1: bash run_multi_node.sh 1
# node2: bash run_multi_node.sh 2
# node3: bash run_multi_node.sh 3
使用pdsh批量启动
#!/bin/bash
# launch_all.sh
MASTER_ADDR="node0"
MASTER_PORT=29500
NNODES=4
NPROC_PER_NODE=8
# 在所有节点上启动
for NODE_RANK in {0..3}; do
NODE="node${NODE_RANK}"
pdsh -w $NODE "cd /nfs/shared/project && \
torchrun \
--nnodes=$NNODES \
--nproc_per_node=$NPROC_PER_NODE \
--node_rank=$NODE_RANK \
--master_addr=$MASTER_ADDR \
--master_port=$MASTER_PORT \
train.py \
--batch-size 32 \
--epochs 90 \
--lr 3.2 \
> logs/node${NODE_RANK}.log 2>&1" &
done
wait
echo "All nodes started"
3.4 节点间通信优化
梯度分桶(Gradient Bucketing)
# DDP自动将梯度分组为多个bucket
# 每个bucket独立进行AllReduce,实现通信与计算overlap
model = DDP(
model,
device_ids=[local_rank],
bucket_cap_mb=25, # 每个bucket最大25MB
gradient_as_bucket_view=True # 避免内存拷贝
)
# 原理:
# Backward过程中:
# 1. 计算layer N的梯度
# 2. 将梯度加入bucket
# 3. 当bucket满时,启动AllReduce(异步)
# 4. 继续计算layer N-1的梯度
#
# 这样AllReduce与梯度计算overlap,隐藏通信时间
梯度压缩
# 使用梯度压缩减少通信量
from torch.distributed.algorithms.ddp_comm_hooks import default_hooks
# FP16压缩
def fp16_compress_hook(state, bucket):
"""
将FP32梯度压缩为FP16
减少50%通信量
"""
# 压缩
compressed_tensor = bucket.buffer().to(torch.float16)
# AllReduce
dist.all_reduce(compressed_tensor)
# 解压缩
bucket.buffer().copy_(compressed_tensor.to(torch.float32))
return bucket.buffer()
model.register_comm_hook(state=None, hook=fp16_compress_hook)
# PowerSGD低秩压缩
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook
state = powerSGD_hook.PowerSGDState(
process_group=None,
matrix_approximation_rank=32, # 秩
start_powerSGD_iter=10, # 从第10次迭代开始使用
)
model.register_comm_hook(state=state, hook=powerSGD_hook.powerSGD_hook)
# PowerSGD原理:
# 梯度矩阵 G ≈ P × Q^T
# 其中 P (m×r), Q (n×r), r << min(m,n)
#
# 通信量从 m×n 减少到 (m+n)×r
# 例如:m=n=4096, r=32
# 压缩率 = 4096*4096 / (4096+4096)*32 = 64倍
异步通信
# 使用异步通信overlap计算和通信
class AsyncDDP:
def __init__(self, model, bucket_size=25*1024*1024):
self.model = model
self.bucket_size = bucket_size
self.buckets = []
self.handles = []
# 注册backward hook
for param in model.parameters():
if param.requires_grad:
param.register_hook(self._make_hook(param))
def _make_hook(self, param):
def hook(grad):
# 将梯度加入bucket
self.buckets.append(grad)
# 当bucket满时,启动异步AllReduce
if sum(b.numel() * b.element_size() for b in self.buckets) >= self.bucket_size:
self._launch_allreduce()
return hook
def _launch_allreduce(self):
if not self.buckets:
return
# 合并bucket
flat_grad = torch.cat([b.flatten() for b in self.buckets])
# 异步AllReduce
handle = dist.all_reduce(flat_grad, async_op=True)
self.handles.append((handle, flat_grad, self.buckets))
self.buckets = []
def synchronize(self):
"""等待所有通信完成"""
for handle, flat_grad, buckets in self.handles:
handle.wait()
# 拆分并写回
offset = 0
for bucket in buckets:
numel = bucket.numel()
bucket.copy_(flat_grad[offset:offset+numel].view_as(bucket))
offset += numel
self.handles = []
# 使用
async_model = AsyncDDP(model)
for inputs, labels in dataloader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward() # 触发异步AllReduce
async_model.synchronize() # 等待通信完成
optimizer.step()
4. 大模型训练技术
4.1 ZeRO优化器(DeepSpeed)
ZeRO原理
ZeRO(Zero Redundancy Optimizer)通过分片优化器状态、梯度和参数来减少显存占用。
# 显存占用分析(以Adam优化器为例)
# 假设模型参数量为 Ψ
# 标准训练:
# - 模型参数(FP32): 4Ψ bytes
# - 梯度(FP32): 4Ψ bytes
# - 优化器状态(FP32 momentum + variance): 8Ψ bytes
# 总计:16Ψ bytes
# 混合精度训练:
# - 模型参数(FP16): 2Ψ bytes
# - 模型参数(FP32 master copy): 4Ψ bytes
# - 梯度(FP16): 2Ψ bytes
# - 梯度(FP32): 4Ψ bytes
# - 优化器状态(FP32): 8Ψ bytes
# 总计:20Ψ bytes (更大!)
ZeRO的三个阶段:
# ZeRO-1: 优化器状态分片
# 每个GPU只保存 1/N 的优化器状态
# 显存节约:8Ψ → 8Ψ/N
# ZeRO-2: 梯度分片
# 每个GPU只保存 1/N 的梯度
# 显存节约:(8Ψ + 4Ψ) → 8Ψ/N + 4Ψ/N
# ZeRO-3: 参数分片
# 每个GPU只保存 1/N 的参数
# 显存节约:(8Ψ + 4Ψ + 4Ψ) → (8Ψ + 4Ψ + 4Ψ)/N
DeepSpeed配置
{
"train_batch_size": 256,
"train_micro_batch_size_per_gpu": 8,
"gradient_accumulation_steps": 4,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.001,
"betas": [0.9, 0.999],
"eps": 1e-8,
"weight_decay": 0.0
}
},
"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 0.001,
"warmup_num_steps": 1000
}
},
"fp16": {
"enabled": true,
"loss_scale": 0,
"loss_scale_window": 1000,
"hysteresis": 2,
"min_loss_scale": 1
},
"zero_optimization": {
"stage": 3,
"offload_optimizer": {
"device": "cpu",
"pin_memory": true
},
"offload_param": {
"device": "cpu",
"pin_memory": true
},
"overlap_comm": true,
"contiguous_gradients": true,
"sub_group_size": 1e9,
"reduce_bucket_size": 5e8,
"stage3_prefetch_bucket_size": 5e8,
"stage3_param_persistence_threshold": 1e6,
"stage3_max_live_parameters": 1e9,
"stage3_max_reuse_distance": 1e9,
"stage3_gather_16bit_weights_on_model_save": true
},
"gradient_clipping": 1.0,
"wall_clock_breakdown": false
}
DeepSpeed训练代码
import deepspeed
from deepspeed.ops.adam import DeepSpeedCPUAdam
def create_model():
"""创建大模型"""
from transformers import GPT2LMHeadModel, GPT2Config
config = GPT2Config(
vocab_size=50257,
n_positions=1024,
n_embd=2048,
n_layer=48,
n_head=32
)
model = GPT2LMHeadModel(config)
return model
def train_deepspeed():
# 初始化分布式
deepspeed.init_distributed()
# 创建模型
model = create_model()
# 定义优化器(使用CPU优化器)
optimizer = DeepSpeedCPUAdam(model.parameters(), lr=1e-4)
# 创建DeepSpeed引擎
model_engine, optimizer, train_loader, _ = deepspeed.initialize(
model=model,
optimizer=optimizer,
config='deepspeed_config.json',
training_data=train_dataset
)
# 训练循环
for epoch in range(num_epochs):
for batch in train_loader:
inputs = batch['input_ids'].to(model_engine.device)
labels = batch['labels'].to(model_engine.device)
# Forward
outputs = model_engine(inputs, labels=labels)
loss = outputs.loss
# Backward(包含梯度缩放和裁剪)
model_engine.backward(loss)
# 参数更新
model_engine.step()
# 保存模型
model_engine.save_checkpoint('checkpoints', tag=f'epoch_{epoch}')
if __name__ == "__main__":
train_deepspeed()
启动DeepSpeed训练:
# 单机8卡
deepspeed --num_gpus=8 train_deepspeed.py
# 多机训练
deepspeed --hostfile=hostfile \
--num_gpus=8 \
--num_nodes=4 \
train_deepspeed.py
# hostfile格式:
# node0 slots=8
# node1 slots=8
# node2 slots=8
# node3 slots=8
4.2 3D并行(数据+模型+流水线)
3D并行原理
# 3D并行组合:
# - 数据并行(Data Parallelism, DP): 跨多个GPU复制模型
# - 模型并行(Tensor Parallelism, TP): 将单层切分到多个GPU
# - 流水线并行(Pipeline Parallelism, PP): 将多层切分到多个GPU64个GPU训练GPT-3
# DP=8, TP=4, PP=2
#
# Pipeline Stage 0 (Layer 1-48):
# DP Group 0: [GPU 0-3] (4-way tensor parallel)
# DP Group 1: [GPU 4-7] (4-way tensor parallel)
# ...
# DP Group 7: [GPU 28-31] (4-way tensor parallel)
#
# Pipeline Stage 1 (Layer 49-96):
# DP Group 0: [GPU 32-35]
# DP Group 1: [GPU 36-39]
# ...
# DP Group 7: [GPU 60-63]
Megatron-LM实现
# 张量并行(Tensor Parallelism)实现
import torch
import torch.nn as nn
import torch.distributed as dist
class ColumnParallelLinear(nn.Module):
"""
列并行Linear层
将权重矩阵按列切分到多个GPU
Y = XW + b
W = [W_0, W_1, ..., W_{n-1}]
GPU i计算:Y_i = XW_i + b_i
最后拼接:Y = [Y_0, Y_1, ..., Y_{n-1}]
"""
def __init__(self, input_size, output_size, tensor_parallel_size):
super().__init__()
self.input_size = input_size
self.output_size = output_size
self.tensor_parallel_size = tensor_parallel_size
# 每个GPU的输出维度
self.output_size_per_partition = output_size // tensor_parallel_size
# 创建权重(只包含当前GPU的部分)
self.weight = nn.Parameter(
torch.empty(self.output_size_per_partition, input_size)
)
self.bias = nn.Parameter(
torch.empty(self.output_size_per_partition)
)
self._initialize_weights()
def _initialize_weights(self):
nn.init.xavier_normal_(self.weight)
nn.init.zeros_(self.bias)
def forward(self, x):
# 输入 x: [batch, seq_len, input_size]
# 在所有GPU上广播输入(如果还没有)
# 注:实际实现中,输入已经在所有GPU上复制
# 矩阵乘法(每个GPU只计算部分输出)
output = torch.matmul(x, self.weight.t())
output = output + self.bias
# output: [batch, seq_len, output_size_per_partition]
return output
class RowParallelLinear(nn.Module):
"""
行并行Linear层
将权重矩阵按行切分到多个GPU
Y = XW + b
W = [W_0]
[W_1]
[...]
[W_{n-1}]
输入X也被切分:X = [X_0, X_1, ..., X_{n-1}]
GPU i计算:Y_i = X_i W_i
最后归约:Y = sum(Y_i) + b
"""
def __init__(self, input_size, output_size, tensor_parallel_size):
super().__init__()
self.input_size = input_size
self.output_size = output_size
self.tensor_parallel_size = tensor_parallel_size
# 每个GPU的输入维度
self.input_size_per_partition = input_size // tensor_parallel_size
# 创建权重
self.weight = nn.Parameter(
torch.empty(output_size, self.input_size_per_partition)
)
self.bias = nn.Parameter(torch.empty(output_size))
self._initialize_weights()
def _initialize_weights(self):
nn.init.xavier_normal_(self.weight)
nn.init.zeros_(self.bias)
def forward(self, x):
# 输入 x: [batch, seq_len, input_size_per_partition]
# 每个GPU只有部分输入
# 矩阵乘法
output = torch.matmul(x, self.weight.t())
# AllReduce求和(跨tensor parallel组)
dist.all_reduce(output, group=self.tensor_parallel_group)
# 只在一个GPU上加bias
if dist.get_rank(self.tensor_parallel_group) == 0:
output = output + self.bias
return output
class ParallelAttention(nn.Module):
"""
并行Multi-Head Attention
"""
def __init__(self, hidden_size, num_heads, tensor_parallel_size):
super().__init__()
self.hidden_size = hidden_size
self.num_heads = num_heads
self.tensor_parallel_size = tensor_parallel_size
# 每个GPU的head数量
self.num_heads_per_partition = num_heads // tensor_parallel_size
self.head_dim = hidden_size // num_heads
# QKV投影(列并行)
self.qkv_proj = ColumnParallelLinear(
hidden_size,
3 * hidden_size,
tensor_parallel_size
)
# 输出投影(行并行)
self.out_proj = RowParallelLinear(
hidden_size,
hidden_size,
tensor_parallel_size
)
def forward(self, x, mask=None):
batch_size, seq_len, _ = x.shape
# QKV投影
qkv = self.qkv_proj(x)
# 分离Q, K, V
qkv = qkv.reshape(
batch_size, seq_len,
self.num_heads_per_partition, 3 * self.head_dim
)
qkv = qkv.permute(0, 2, 1, 3) # [batch, heads, seq, 3*head_dim]
q, k, v = qkv.chunk(3, dim=-1)
# Scaled dot-product attention
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
attn = F.softmax(scores, dim=-1)
# 应用attention
output = torch.matmul(attn, v)
# 重塑
output = output.permute(0, 2, 1, 3).contiguous()
output = output.reshape(batch_size, seq_len, -1)
# 输出投影
output = self.out_proj(output)
return output
class ParallelMLP(nn.Module):
"""
并行MLP(Feed-Forward)
"""
def __init__(self, hidden_size, intermediate_size, tensor_parallel_size):
super().__init__()
# 第一层:列并行
self.fc1 = ColumnParallelLinear(
hidden_size,
intermediate_size,
tensor_parallel_size
)
# 第二层:行并行
self.fc2 = RowParallelLinear(
intermediate_size,
hidden_size,
tensor_parallel_size
)
def forward(self, x):
x = self.fc1(x)
x = F.gelu(x)
x = self.fc2(x)
return x
流水线并行实现
class PipelineParallelModel(nn.Module):
"""
流水线并行模型
将模型切分为多个stage,每个stage在不同GPU上
"""
def __init__(self, num_layers, hidden_size, num_stages):
super().__init__()
self.num_layers = num_layers
self.num_stages = num_stages
self.layers_per_stage = num_layers // num_stages
# 获取当前stage
self.stage_id = self._get_stage_id()
# 创建当前stage的层
start_layer = self.stage_id * self.layers_per_stage
end_layer = start_layer + self.layers_per_stage
self.layers = nn.ModuleList([
TransformerLayer(hidden_size)
for _ in range(start_layer, end_layer)
])
def _get_stage_id(self):
"""根据rank确定stage ID"""
rank = dist.get_rank()
world_size = dist.get_world_size()
return rank % self.num_stages
def forward(self, x):
"""前向传播(单个stage)"""
for layer in self.layers:
x = layer(x)
return x
class PipelineEngine:
"""
流水线引擎:管理micro-batch的调度
"""
def __init__(self, model, num_microbatches, num_stages):
self.model = model
self.num_microbatches = num_microbatches
self.num_stages = num_stages
self.stage_id = model.stage_id
def train_batch(self, batch):
"""
训练一个batch(使用GPipe调度)
"""
# 将batch分成多个micro-batch
microbatches = self._split_batch(batch, self.num_microbatches)
# Forward阶段
activations = []
for mb in microbatches:
if self.stage_id == 0:
# 第一个stage:处理输入
act = self.model(mb)
else:
# 其他stage:接收上一stage的输出
act = self._recv_activation()
act = self.model(act)
# 发送到下一stage(如果不是最后一个stage)
if self.stage_id < self.num_stages - 1:
self._send_activation(act)
activations.append(act)
# Backward阶段
for i in range(self.num_microbatches - 1, -1, -1):
if self.stage_id == self.num_stages - 1:
# 最后一个stage:计算loss
loss = self._compute_loss(activations[i])
grad = torch.autograd.grad(loss, activations[i])[0]
else:
# 其他stage:接收梯度
grad = self._recv_grad()
# Backward
activations[i].backward(grad)
# 发送梯度到上一stage(如果不是第一个stage)
if self.stage_id > 0:
grad_input = activations[i].grad
self._send_grad(grad_input)
def _split_batch(self, batch, num_microbatches):
"""将batch分成micro-batches"""
batch_size = batch.size(0)
microbatch_size = batch_size // num_microbatches
return [
batch[i*microbatch_size:(i+1)*microbatch_size]
for i in range(num_microbatches)
]
def _send_activation(self, act):
"""发送activation到下一stage"""
next_rank = self.stage_id + 1
dist.send(act, dst=next_rank)
def _recv_activation(self):
"""从上一stage接收activation"""
prev_rank = self.stage_id - 1
act = torch.empty_like(...) # 需要知道shape
dist.recv(act, src=prev_rank)
return act
def _send_grad(self, grad):
"""发送梯度到上一stage"""
prev_rank = self.stage_id - 1
dist.send(grad, dst=prev_rank)
def _recv_grad(self):
"""从下一stage接收梯度"""
next_rank = self.stage_id + 1
grad = torch.empty_like(...)
dist.recv(grad, src=next_rank)
return grad
4.3 FSDP(全分片数据并行)
FSDP原理
PyTorch原生的ZeRO-3实现。
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import CPUOffload
from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
# FSDP配置
fsdp_config = {
# 自动包装策略:按参数量分组
'auto_wrap_policy': size_based_auto_wrap_policy,
'min_num_params': 1e6, # 至少1M参数才包装
# CPU卸载
'cpu_offload': CPUOffload(offload_params=True),
# 混合精度
'mixed_precision': True,
# Backward预取
'backward_prefetch': 'BACKWARD_PRE',
# Forward预取
'forward_prefetch': True,
}
# 使用FSDP包装模型
model = MyLargeModel()
model = FSDP(model, **fsdp_config)
# 训练
for batch in dataloader:
optimizer.zero_grad()
outputs = model(batch)
loss = criterion(outputs, batch['labels'])
loss.backward()
optimizer.step()
FSDP完整示例
import torch
import torch.nn as nn
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import MixedPrecision, BackwardPrefetch, ShardingStrategy
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
from transformers import GPT2LMHeadModel, GPT2Config
from functools import partial
def setup_fsdp():
"""初始化FSDP环境"""
dist.init_process_group("nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
def create_fsdp_model():
"""创建FSDP模型"""
# 创建大模型
config = GPT2Config(
vocab_size=50257,
n_positions=2048,
n_embd=4096,
n_layer=48,
n_head=32
)
model = GPT2LMHeadModel(config)
# 混合精度配置
mixed_precision_policy = MixedPrecision(
param_dtype=torch.float16,
reduce_dtype=torch.float16,
buffer_dtype=torch.float16,
)
# 自动包装策略(针对Transformer)
from transformers.models.gpt2.modeling_gpt2 import GPT2Block
auto_wrap_policy = partial(
transformer_auto_wrap_policy,
transformer_layer_cls={GPT2Block}
)
# 包装为FSDP
model = FSDP(
model,
auto_wrap_policy=auto_wrap_policy,
mixed_precision=mixed_precision_policy,
backward_prefetch=BackwardPrefetch.BACKWARD_PRE,
sharding_strategy=ShardingStrategy.FULL_SHARD, # ZeRO-3
cpu_offload=CPUOffload(offload_params=False),
device_id=torch.cuda.current_device(),
)
return model
def train_fsdp():
"""FSDP训练"""
setup_fsdp()
model = create_fsdp_model()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
# 数据加载器
train_loader = create_dataloader()
# 训练循环
for epoch in range(num_epochs):
for batch in train_loader:
input_ids = batch['input_ids'].cuda()
labels = batch['labels'].cuda()
# Forward
outputs = model(input_ids, labels=labels)
loss = outputs.loss
# Backward
loss.backward()
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
# 更新
optimizer.step()
optimizer.zero_grad()
# 保存模型
if dist.get_rank() == 0:
# FSDP模型需要特殊保存
with FSDP.state_dict_type(
model,
StateDictType.FULL_STATE_DICT,
FullStateDictConfig(offload_to_cpu=True, rank0_only=True)
):
state_dict = model.state_dict()
torch.save(state_dict, "model.pt")
if __name__ == "__main__":
train_fsdp()
5. 完整实战代码
5.1 8卡训练脚本
# train_distributed.py - 完整的8卡训练脚本
import os
import argparse
import time
import json
from datetime import datetime
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
from torch.cuda.amp import autocast, GradScaler
from torchvision import datasets, transforms, models
from torch.utils.tensorboard import SummaryWriter
def parse_args():
parser = argparse.ArgumentParser(description='Distributed Training')
parser.add_argument('--data-dir', type=str, required=True)
parser.add_argument('--output-dir', type=str, default='./outputs')
parser.add_argument('--batch-size', type=int, default=32)
parser.add_argument('--epochs', type=int, default=90)
parser.add_argument('--lr', type=float, default=0.1)
parser.add_argument('--momentum', type=float, default=0.9)
parser.add_argument('--weight-decay', type=float, default=1e-4)
parser.add_argument('--print-freq', type=int, default=100)
parser.add_argument('--save-freq', type=int, default=10)
parser.add_argument('--resume', type=str, default='')
parser.add_argument('--amp', action='store_true', help='Use mixed precision')
return parser.parse_args()
class AverageMeter:
"""计算和存储平均值和当前值"""
def __init__(self, name):
self.name = name
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
class ProgressMeter:
"""进度显示"""
def __init__(self, num_batches, meters, prefix=""):
self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
self.meters = meters
self.prefix = prefix
def display(self, batch, rank):
if rank == 0:
entries = [self.prefix + self.batch_fmtstr.format(batch)]
entries += [str(meter) for meter in self.meters]
print('\t'.join(entries))
def _get_batch_fmtstr(self, num_batches):
num_digits = len(str(num_batches // 1))
fmt = '{:' + str(num_digits) + 'd}'
return '[' + fmt + '/' + fmt.format(num_batches) + ']'
def __str__(self):
loss_str = []
for meter in self.meters:
loss_str.append(f"{meter.name}: {meter.avg:.4f}")
return " ".join(loss_str)
def setup():
"""初始化分布式环境"""
dist.init_process_group("nccl")
rank = dist.get_rank()
local_rank = int(os.environ['LOCAL_RANK'])
world_size = dist.get_world_size()
torch.cuda.set_device(local_rank)
return rank, local_rank, world_size
def cleanup():
dist.destroy_process_group()
def create_model(num_classes=1000):
"""创建模型"""
model = models.resnet50(pretrained=False, num_classes=num_classes)
return model
def create_dataloader(args, rank, world_size):
"""创建数据加载器"""
# 数据增强
train_transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
val_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 加载数据集
train_dataset = datasets.ImageFolder(
root=os.path.join(args.data_dir, 'train'),
transform=train_transform
)
val_dataset = datasets.ImageFolder(
root=os.path.join(args.data_dir, 'val'),
transform=val_transform
)
# 分布式采样器
train_sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=rank,
shuffle=True,
drop_last=True
)
val_sampler = DistributedSampler(
val_dataset,
num_replicas=world_size,
rank=rank,
shuffle=False
)
# 数据加载器
train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size,
sampler=train_sampler,
num_workers=4,
pin_memory=True,
persistent_workers=True
)
val_loader = DataLoader(
val_dataset,
batch_size=args.batch_size,
sampler=val_sampler,
num_workers=4,
pin_memory=True
)
return train_loader, val_loader, train_sampler
def train_epoch(train_loader, model, criterion, optimizer, scaler, epoch, args, rank):
"""训练一个epoch"""
batch_time = AverageMeter('Time')
data_time = AverageMeter('Data')
losses = AverageMeter('Loss')
top1 = AverageMeter('Acc@1')
top5 = AverageMeter('Acc@5')
progress = ProgressMeter(
len(train_loader),
[batch_time, data_time, losses, top1, top5],
prefix=f"Epoch: [{epoch}]"
)
model.train()
end = time.time()
for i, (images, target) in enumerate(train_loader):
# 数据加载时间
data_time.update(time.time() - end)
images = images.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
# 混合精度训练
if args.amp:
with autocast():
output = model(images)
loss = criterion(output, target)
optimizer.zero_grad()
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
output = model(images)
loss = criterion(output, target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 计算准确率
acc1, acc5 = accuracy(output, target, topk=(1, 5))
# 更新统计
losses.update(loss.item(), images.size(0))
top1.update(acc1.item(), images.size(0))
top5.update(acc5.item(), images.size(0))
# 计算时间
batch_time.update(time.time() - end)
end = time.time()
# 打印进度
if i % args.print_freq == 0:
progress.display(i, rank)
return losses.avg, top1.avg, top5.avg
def validate(val_loader, model, criterion, rank):
"""验证"""
losses = AverageMeter('Loss')
top1 = AverageMeter('Acc@1')
top5 = AverageMeter('Acc@5')
model.eval()
with torch.no_grad():
for images, target in val_loader:
images = images.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
output = model(images)
loss = criterion(output, target)
acc1, acc5 = accuracy(output, target, topk=(1, 5))
losses.update(loss.item(), images.size(0))
top1.update(acc1.item(), images.size(0))
top5.update(acc5.item(), images.size(0))
# 同步所有GPU的统计信息
losses_tensor = torch.tensor([losses.avg]).cuda()
top1_tensor = torch.tensor([top1.avg]).cuda()
top5_tensor = torch.tensor([top5.avg]).cuda()
dist.all_reduce(losses_tensor)
dist.all_reduce(top1_tensor)
dist.all_reduce(top5_tensor)
world_size = dist.get_world_size()
losses_avg = losses_tensor.item() / world_size
top1_avg = top1_tensor.item() / world_size
top5_avg = top5_tensor.item() / world_size
if rank == 0:
print(f'\nValidation: Loss {losses_avg:.4f}, '
f'Acc@1 {top1_avg:.2f}%, Acc@5 {top5_avg:.2f}%\n')
return losses_avg, top1_avg, top5_avg
def accuracy(output, target, topk=(1,)):
"""计算top-k准确率"""
with torch.no_grad():
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
def save_checkpoint(state, filename, rank):
"""保存检查点"""
if rank == 0:
torch.save(state, filename)
print(f'Checkpoint saved: {filename}')
def main():
args = parse_args()
# 初始化分布式
rank, local_rank, world_size = setup()
# 创建输出目录
if rank == 0:
os.makedirs(args.output_dir, exist_ok=True)
# 保存配置
with open(os.path.join(args.output_dir, 'config.json'), 'w') as f:
json.dump(vars(args), f, indent=2)
# TensorBoard
writer = SummaryWriter(os.path.join(args.output_dir, 'logs'))
# 创建模型
model = create_model()
model = model.cuda()
model = DDP(model, device_ids=[local_rank])
# 损失函数
criterion = nn.CrossEntropyLoss().cuda()
# 优化器
optimizer = torch.optim.SGD(
model.parameters(),
lr=args.lr * world_size, # 线性缩放学习率
momentum=args.momentum,
weight_decay=args.weight_decay
)
# 学习率调度器
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer,
T_max=args.epochs
)
# 混合精度
scaler = GradScaler() if args.amp else None
# 数据加载器
train_loader, val_loader, train_sampler = create_dataloader(args, rank, world_size)
# 恢复训练
start_epoch = 0
best_acc1 = 0
if args.resume:
if os.path.isfile(args.resume):
checkpoint = torch.load(args.resume, map_location=f'cuda:{local_rank}')
start_epoch = checkpoint['epoch']
best_acc1 = checkpoint['best_acc1']
model.load_state_dict(checkpoint['state_dict'])
optimizer.load_state_dict(checkpoint['optimizer'])
scheduler.load_state_dict(checkpoint['scheduler'])
if rank == 0:
print(f"Resumed from epoch {start_epoch}")
# 训练循环
for epoch in range(start_epoch, args.epochs):
train_sampler.set_epoch(epoch)
# 训练
train_loss, train_acc1, train_acc5 = train_epoch(
train_loader, model, criterion, optimizer, scaler, epoch, args, rank
)
# 验证
val_loss, val_acc1, val_acc5 = validate(val_loader, model, criterion, rank)
# 更新学习率
scheduler.step()
# TensorBoard
if rank == 0:
writer.add_scalar('Loss/train', train_loss, epoch)
writer.add_scalar('Loss/val', val_loss, epoch)
writer.add_scalar('Acc1/train', train_acc1, epoch)
writer.add_scalar('Acc1/val', val_acc1, epoch)
writer.add_scalar('Acc5/train', train_acc5, epoch)
writer.add_scalar('Acc5/val', val_acc5, epoch)
writer.add_scalar('LR', optimizer.param_groups[0]['lr'], epoch)
# 保存最佳模型
is_best = val_acc1 > best_acc1
best_acc1 = max(val_acc1, best_acc1)
if (epoch + 1) % args.save_freq == 0 or is_best:
save_checkpoint({
'epoch': epoch + 1,
'state_dict': model.state_dict(),
'best_acc1': best_acc1,
'optimizer': optimizer.state_dict(),
'scheduler': scheduler.state_dict(),
}, os.path.join(args.output_dir, f'checkpoint_epoch_{epoch+1}.pt'), rank)
if is_best:
save_checkpoint({
'epoch': epoch + 1,
'state_dict': model.state_dict(),
'best_acc1': best_acc1,
}, os.path.join(args.output_dir, 'model_best.pt'), rank)
# 保存最终模型
save_checkpoint({
'epoch': args.epochs,
'state_dict': model.state_dict(),
'best_acc1': best_acc1,
}, os.path.join(args.output_dir, 'model_final.pt'), rank)
if rank == 0:
writer.close()
print(f'Training completed. Best Acc@1: {best_acc1:.2f}%')
cleanup()
if __name__ == '__main__':
main()
5.2 多节点启动命令
#!/bin/bash
# launch_multinode.sh
# 集群配置
MASTER_ADDR="node0"
MASTER_PORT=29500
NNODES=4
NPROC_PER_NODE=8
# 数据和输出路径
DATA_DIR="/nfs/shared/imagenet"
OUTPUT_DIR="/nfs/shared/outputs/exp_$(date +%Y%m%d_%H%M%S)"
# 训练参数
BATCH_SIZE=32
EPOCHS=90
LR=0.8 # 32 (total batch size 1024) * 0.025
# 启动函数
launch_node() {
NODE_RANK=$1
NODE="node${NODE_RANK}"
echo "Launching on ${NODE} (rank ${NODE_RANK})"
ssh ${NODE} "cd /nfs/shared/project && \
torchrun \
--nnodes=${NNODES} \
--nproc_per_node=${NPROC_PER_NODE} \
--node_rank=${NODE_RANK} \
--master_addr=${MASTER_ADDR} \
--master_port=${MASTER_PORT} \
train_distributed.py \
--data-dir ${DATA_DIR} \
--output-dir ${OUTPUT_DIR} \
--batch-size ${BATCH_SIZE} \
--epochs ${EPOCHS} \
--lr ${LR} \
--amp \
> ${OUTPUT_DIR}/node${NODE_RANK}.log 2>&1" &
}
# 创建输出目录
mkdir -p ${OUTPUT_DIR}
# 启动所有节点
for NODE_RANK in {0..3}; do
launch_node ${NODE_RANK}
done
# 等待所有节点完成
wait
echo "All nodes completed"
5.3 监控和调试
# monitor.py - 训练监控脚本
import psutil
import GPUtil
import time
import json
from datetime import datetime
class GPUMonitor:
"""GPU监控"""
def __init__(self, log_file='gpu_stats.jsonl'):
self.log_file = log_file
def get_gpu_stats(self):
"""获取GPU统计信息"""
gpus = GPUtil.getGPUs()
stats = []
for gpu in gpus:
stats.append({
'id': gpu.id,
'name': gpu.name,
'load': gpu.load * 100,
'memory_used': gpu.memoryUsed,
'memory_total': gpu.memoryTotal,
'memory_util': gpu.memoryUtil * 100,
'temperature': gpu.temperature,
})
return stats
def get_cpu_stats(self):
"""获取CPU统计信息"""
return {
'percent': psutil.cpu_percent(interval=1),
'count': psutil.cpu_count(),
'memory': psutil.virtual_memory().percent
}
def log_stats(self):
"""记录统计信息"""
stats = {
'timestamp': datetime.now().isoformat(),
'gpu': self.get_gpu_stats(),
'cpu': self.get_cpu_stats()
}
with open(self.log_file, 'a') as f:
f.write(json.dumps(stats) + '\n')
return stats
def monitor(self, interval=5):
"""持续监控"""
print("Starting GPU monitoring...")
try:
while True:
stats = self.log_stats()
# 打印到控制台
print(f"\n[{stats['timestamp']}]")
print(f"CPU: {stats['cpu']['percent']:.1f}%, "
f"Memory: {stats['cpu']['memory']:.1f}%")
for gpu in stats['gpu']:
print(f"GPU {gpu['id']}: {gpu['name']}")
print(f" Load: {gpu['load']:.1f}%")
print(f" Memory: {gpu['memory_used']:.0f}MB / "
f"{gpu['memory_total']:.0f}MB ({gpu['memory_util']:.1f}%)")
print(f" Temp: {gpu['temperature']}°C")
time.sleep(interval)
except KeyboardInterrupt:
print("\nMonitoring stopped")
if __name__ == "__main__":
monitor = GPUMonitor()
monitor.monitor(interval=5)
# 在训练时启动监控
python monitor.py &
# 实时查看日志
tail -f gpu_stats.jsonl | jq .
# 绘制GPU利用率曲线
python plot_stats.py gpu_stats.jsonl