https://github.com/rickyang1114/DDP-practice/tree/main
https://github.com/pytorch/examples/tree/main/imagenet
简易DDP模版,只考虑单机多卡,完整程序在这里。
首先,我们在if __name__ == '__main__'中启动 DDP:
61if __name__ == '__main__':2 args = prepare() ###3 time_start = time.time()4 mp.spawn(main, args=(args, ), nprocs=torch.cuda.device_count()) #import torch.multiprocessing as mp5 time_elapsed = time.time() - time_start6 print(f'\ntime elapsed: {time_elapsed:.2f} seconds')spawn函数的主要参数包括以下几个:
fn,即上面传入的main函数。每个线程将执行一次该函数
args,即fn所需的参数。传给fn的参数必须写成元组的形式,哪怕像上面一样只有一个
nprocs启动的进程数,将其设置为world_size即可。不传默认为1,与world_size不一致会导致进程等待同步而一直停滞。
入口 → spawn nprocs=4 → 启动4个训练进程 ↘ 每个子进程 → 初始化 DDP(world_size=4, rank=0~3) ↘ 所有进程通过通信组形成 DDP 同步机制
再假设有个复杂场景:跨两台机器进行 DDP 训练,但每台机器上使用的 GPU 数不同:
机器 A(8卡):只使用 GPU 3、4、5(共 3 张)
机器 B(4卡):使用所有 GPU 0、1、2、3(共 4 张)
总共 7 个进程 / 7 张卡,此时:
| 参数 | 值 | 说明 |
|---|---|---|
world_size | 7 | 全部进程数量 = 3 + 4 |
nprocs | 每台机器的卡数 | A 机器是 3,B 机器是 4 |
rank | 每个进程的全局编号 | A 机器是 0~2,B 机器是 3~6 |
机器A启动:
71# 只在这台机器使用 GPU 3、4、52CUDA_VISIBLE_DEVICES=3,4,5 \3MASTER_ADDR=192.168.1.100 \4MASTER_PORT=29500 \5WORLD_SIZE=7 \6RANK=0 \7python train_ddp.py机器B启动:
x
1CUDA_VISIBLE_DEVICES=0,1,2,3 \2MASTER_ADDR=192.168.1.100 \3MASTER_PORT=29500 \4WORLD_SIZE=7 \5RANK=3 \ # 对应全局 rank6python train_ddp.py所以多机多卡对我们训练来说差不多的。类似 rank = int(os.environ["RANK"]) 获取一下global rank即可。 底层通信机制不一样,但无需感知。
本地进程间通信(通过 NCCL 共享内存 + socket)、跨节点网络通信(NCCL over TCP)
单机多卡时MASTER_ADDR配置为本地回环地址localhost,而多机多卡时会配置为主节点IP。
在prepare函数里面,也进行了一些 DDP 的配置:
xxxxxxxxxx1def prepare():2 parser = argparse.ArgumentParser()3 parser.add_argument('--gpu', default='0,1')4 parser.add_argument('-e',5 '--epochs',6 default=3,7 type=int,8 metavar='N',9 help='number of total epochs to run')10 parser.add_argument('-b',11 '--batch_size',12 default=32,13 type=int,14 metavar='N',15 help='number of batchsize')16 args = parser.parse_args()17 18 # 下面几行是新加的,用于启动多进程 DDP。使用 torchrun 启动时只需要设置使用的 GPU19 os.environ['MASTER_ADDR'] = 'localhost' # 0号机器的本地回环地址。20 os.environ['MASTER_PORT'] = '19198' # 0号机器的可用端口,随便选一个没被占用的21 os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu # 使用哪些 GPU22 world_size = torch.cuda.device_count() # 就是上一行使用的 GPU 数量23 os.environ['WORLD_SIZE'] = str(world_size)24 return args再来看看main函数里面添加了什么。首先是其添加一个额外的参数local_rank(在mp.spawn里面不用传,会自动分配,但都是从0开始,所以只能叫local rank)
x1def main(local_rank, args):2 init_ddp(local_rank) ### 进程初始化3 model = ConvNet().cuda() ### 模型的 forward 方法变了4 model = nn.SyncBatchNorm.convert_sync_batchnorm(model) ### 转换模型的 BN 层5 model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank]) ### 套 DDP6 criterion = nn.CrossEntropyLoss().cuda()7 optimizer = torch.optim.SGD(model.parameters(), 1e-4)8 scaler = GradScaler() ### 用于混合精度训练9 10 train_dataset = torchvision.datasets.MNIST(root='./data',train=True,transform=transforms.ToTensor(),download=True)11 train_sampler = torch.utils.data.distributed.DistributedSampler(12 train_dataset) ### 用于在 DDP 环境下采样13 g = get_ddp_generator() ###14 train_dloader = torch.utils.data.DataLoader(15 dataset=train_dataset,16 batch_size=args.batch_size,17 shuffle=False, ### shuffle is mutually exclusive with sampler18 num_workers=4,19 pin_memory=True,20 sampler=train_sampler,21 generator=g,22 ) ### 添加额外的 generator,随机种子23 24 test_dataset = torchvision.datasets.MNIST(25 root="./data", train=False, transform=transforms.ToTensor(), download=True26 )27 test_sampler = torch.utils.data.distributed.DistributedSampler(28 test_dataset29 ) ### 用于在 DDP 环境下采样30 test_dloader = torch.utils.data.DataLoader(31 dataset=test_dataset,32 batch_size=args.batch_size,33 shuffle=False,34 num_workers=2,35 pin_memory=True,36 sampler=test_sampler,37 )38 test_dataset) 39 40 for epoch in range(args.epochs):41 if local_rank == 0: ### 防止每个进程都输出一次42 print(f'begin training of epoch {epoch + 1}/{args.epochs}')43 train_dloader.sampler.set_epoch(epoch) ### DDP里是需要控制shuffle的种子44 train(model, train_dloader, criterion, optimizer, scaler)45 if local_rank == 0:46 print(f'begin testing')47 test(model, test_dloader)48 if local_rank == 0: ### 防止每个进程都保存一次49 torch.save({'model': model.state_dict(), 'scaler': scaler.state_dict()}, 'ddp_checkpoint.pt')50 dist.destroy_process_group() ### 最后摧毁进程,和 init_process_group 相对首先,根据用init_ddp函数对模型进行初始化。这里我们使用 nccl 后端,并用 env 作为初始化方法:
xxxxxxxxxx141def init_ddp(local_rank):2 # 有了这一句之后,在转换device的时候直接使用 a=a.cuda()即可,否则要用a=a.cuda(local_rank)3 torch.cuda.set_device(local_rank) 4 os.environ['RANK'] = str(local_rank)5 dist.init_process_group(backend='nccl', init_method='env://')6 # 或 dist.init_process_group("nccl", rank=rank, world_size=world_size)7 # 但更推荐init_method='env://'8 # 你只需要 设置好环境变量,PyTorch 会自动读取并初始化:9 # shell 中设置:10 # MASTER_ADDR=192.168.1.1011 # MASTER_PORT=1234512 # RANK=313 # WORLD_SIZE=814 不过我觉得device = torch.device('cuda:{}'.format(rank)),然后统一to(device)更好
在完成了该初始化后,可以很轻松地在需要时获得local_rank、world_size,而不需要作为额外参数从main中一层一层往下传。
x
1import torch.distributed as dist2local_rank = dist.get_rank()3world_size = dist.get_world_size()比如需要print, log, save_state_dict时,由于多个进程拥有相同的副本,故只需要一个进程执行即可,比如:
41if local_rank == 0:2 print(f'begin testing')3if local_rank == 0: ### 防止每个进程都保存一次4 torch.save({'model': model.state_dict(), 'scaler': scaler.state_dict()}, 'ddp_checkpoint.pt')为了加速推理,我们在模型的forward方法里套一个torch.cuda.amp.autocast():
使得forward函数变为:
x
1def forward(self, x):2 with torch.cuda.amp.autocast(): # 混合精度,加速推理3 out = self.layer1(x)4 out = self.layer2(out)5 out = out.reshape(out.size(0), -1)6 out = self.fc(out)7 return outautocast 也可以在推理的时候再套,但是在这里套最方便,而且适用于所有情况。
在模型改变之后,使用convert_sync_batchnorm和DistributedDataParallel对模型进行包装。
创建 scaler,用于训练时对 loss 进行 scale:
xxxxxxxxxx21from torch.cuda.amp import GradScaler2scaler = GradScaler() ### 用于混合精度训练训练时,需要使用 DDP 的sampler,并且在num_workers > 1时需要传入generator,否则对于同一个worker,所有进程的augmentation相同,减弱训练的随机性。详细分析参见这篇文章。
xxxxxxxxxx161def get_ddp_generator(seed=3407):2 local_rank = dist.get_rank()3 g = torch.Generator()4 g.manual_seed(seed + local_rank)5 return g6
7train_sampler = torch.utils.data.distributed.DistributedSampler(8 train_dataset) ### 用于在 DDP 环境下采样9g = get_ddp_generator() ###10train_dloader = torch.utils.data.DataLoader(dataset=train_dataset,11 batch_size=args.batch_size,12 shuffle=False, ### shuffle 通过 sampler 完成13 num_workers=4,14 pin_memory=True,15 sampler=train_sampler,16 generator=g) ### 添加额外的 generator并且在多个epoch的训练时,需要设置train_dloader.sampler.set_epoch(epoch)。
下面来看看train函数。
xxxxxxxxxx111def train(model, train_dloader, criterion, optimizer, scaler):2 model.train()3 for images, labels in train_dloader:4 images = images.cuda()5 labels = labels.cuda()6 outputs = model(images)7 loss = criterion(outputs, labels)8 optimizer.zero_grad()9 scaler.scale(loss).backward() ###10 scaler.step(optimizer) ###11 scaler.update() ###最后三行发生了改变。相较于原始的loss.backward、optimizer.step(),这里通过scaler对梯度进行缩放,防止由于使用混合精度导致损失下溢,并且对scaler自身的状态进行更新呢。如果有多个loss,它们也使用同一个scaler。如果需要保存模型的state_dict并且在后续继续训练(比如预训练-微调模式),最好连带scaler的状态一起保留,并在后续的微调过程中和模型的参数一同加载。
测试时,需要将多个进程的数据reduce到一张卡上。注意,在test函数的外面加上if local_rank == 0,否则多个进程会彼此等待而陷入死锁。
xxxxxxxxxx171def test(model, test_dloader):2 local_rank = dist.get_rank()3 model.eval()4 size = torch.tensor(0.).cuda()5 correct = torch.tensor(0.).cuda()6 for images, labels in test_dloader:7 images = images.cuda()8 labels = labels.cuda()9 with torch.no_grad():10 outputs = model(images)11 size += images.size(0)12 correct += (outputs.argmax(1) == labels).type(torch.float).sum()13 dist.reduce(size, 0, op=dist.ReduceOp.SUM) ###14 dist.reduce(correct, 0, op=dist.ReduceOp.SUM) ###15 if local_rank == 0:16 acc = correct / size17 print(f'Accuracy is {acc:.2%}')注释的两行即为所需添加的reduce操作。
至此,添加的代码讲解完毕。
启动的方式变化不大:
xxxxxxxxxx11python ddp_main.py --gpu 0,1相应的结果:
xxxxxxxxxx71begin training of epoch 1/32begin training of epoch 2/33begin training of epoch 3/34begin testing5Accuracy is 89.21%6
7time elapsed: 30.82 seconds上述是通过mp.spawn启动。mp模块对multiprocessing库进行封装,并没有特定针对DDP。我们还可以通过官方推荐的torchrun进行启动。完整的程序在这里。
相比mp.spawn启动,torchrun自动控制一些环境变量的设置,因而更为方便。我们只需要设置os.environ['CUDA_VISIBLE_DEVICES']即可(不设置默认为该机器上的所有GPU),而无需设置os.environ['MASTER_ADDR']等。此外,main函数不再需要local_rank参数。程序入口变为:
xxxxxxxxxx81if __name__ == '__main__':2 args = prepare()3 time_start = time.time()4 main(args)5 time_elapsed = time.time() - time_start6 local_rank = int(os.environ['LOCAL_RANK'])7 if local_rank == 0:8 print(f'\ntime elapsed: {time_elapsed:.2f} seconds')运行脚本的命令由python变为了torchrun,如下:
x
1torchrun --standalone --nproc_per_node=2 ddp_main_torchrun.py --gpu 0,1其中,nproc_per_node表示进程数,将其设置为使用的GPU数量即可。
torchrun 是 PyTorch 官方推荐的启动分布式训练(DDP)的工具,是对早期 python -m torch.distributed.launch 的升级替代。它非常适合 单机多卡 和 多机多卡 的训练脚本。
11torchrun [OPTIONS] your_training_script.py [SCRIPT_ARGS...]| 参数 | 作用 | 示例 |
|---|---|---|
--nproc_per_node | 当前节点使用的 GPU 数量(即几张卡) | --nproc_per_node=4 |
--nnodes | 总共多少个节点(机器) | --nnodes=2 |
--node_rank | 当前节点的编号(0 开始) | --node_rank=0 |
--master_addr | 主节点 IP(或 hostname) | --master_addr=192.168.1.100 |
--master_port | 主节点端口(通信端口) | --master_port=29500 |
--rdzv_backend | 通信后端,一般默认不写 | 默认使用 c10d |
--max_restarts | 进程最大重启次数 | 可选,调试用 |
--monitor_interval | 监测失败进程的间隔时间(秒) | 可选 |
每个进程的
RANK会自动计算为node_rank * nproc_per_node + local_rank小知识:不同节点GPU数量不一样,nproc_per_node设置为不同即可。只是很少有这种场景。
xxxxxxxxxx1
2# 方案1: mp.spawn()3 mp.spawn(main_fun,4 args=(opt.world_size, opt),5 nprocs=opt.world_size,6 join=True)7# 方案2: Process8 from torch.multiprocessing import Process9 world_size = opt.world_size10 processes = []11 for rank in range(world_size):12 p = Process(target=main_fun, args=(rank, world_size, opt))13 p.start()14 processes.append(p)15 for p in processes:16 p.join()17
18# 方案3: torchrun
在写完 DDP 的代码之后,最好检查一遍,否则很容易因为漏了什么而出现莫名奇妙的错误,比如程序卡着不动了,也不报错)
大致需要检查:
DDP 初始化有没有完成,包括if __name__ == '__main__'里和main函数里的。退出main函数时摧毁进程。
模型的封装,包括autocast,BN 层的转化和 DDP 封装
指定train_dloader的sampler、generator和shuffle,并且在每个epoch设置sampler,测试集、验证集同理。
训练时使用scaler对loss进行scale
对于print、log、save等操作,仅在一个线程上进行。
测试时进行reduce
多个线程大致相当于增大了相应倍数的batch_size,最好相应地调一调batch_size和学习率。本文没有进行调节,导致测试获得的准确率有一些差别。
模型较小时速度差别不大,反而DDP与混合精度可能因为一些初始化和精度转换耗费额外时间而更慢。在模型较大时,DDP + 混合精度的速度要明显高于常规,且能降低显存占用。