https://github.com/rickyang1114/DDP-practice/tree/main
https://github.com/pytorch/examples/tree/main/imagenet
简易DDP模版,只考虑单机多卡,完整程序在这里。
首先,我们在if __name__ == '__main__'
中启动 DDP:
61if __name__ == '__main__':
2 args = prepare() ###
3 time_start = time.time()
4 mp.spawn(main, args=(args, ), nprocs=torch.cuda.device_count()) #import torch.multiprocessing as mp
5 time_elapsed = time.time() - time_start
6 print(f'\ntime elapsed: {time_elapsed:.2f} seconds')
spawn
函数的主要参数包括以下几个:
fn
,即上面传入的main
函数。每个线程将执行一次该函数
args
,即fn
所需的参数。传给fn
的参数必须写成元组的形式,哪怕像上面一样只有一个
nprocs
启动的进程数,将其设置为world_size
即可。不传默认为1,与world_size
不一致会导致进程等待同步而一直停滞。
入口 → spawn nprocs=4 → 启动4个训练进程 ↘ 每个子进程 → 初始化 DDP(world_size=4, rank=0~3) ↘ 所有进程通过通信组形成 DDP 同步机制
再假设有个复杂场景:跨两台机器进行 DDP 训练,但每台机器上使用的 GPU 数不同:
机器 A(8卡):只使用 GPU 3、4、5(共 3 张)
机器 B(4卡):使用所有 GPU 0、1、2、3(共 4 张)
总共 7 个进程 / 7 张卡,此时:
参数 | 值 | 说明 |
---|---|---|
world_size | 7 | 全部进程数量 = 3 + 4 |
nprocs | 每台机器的卡数 | A 机器是 3 ,B 机器是 4 |
rank | 每个进程的全局编号 | A 机器是 0~2 ,B 机器是 3~6 |
机器A启动:
71# 只在这台机器使用 GPU 3、4、5
2CUDA_VISIBLE_DEVICES=3,4,5 \
3MASTER_ADDR=192.168.1.100 \
4MASTER_PORT=29500 \
5WORLD_SIZE=7 \
6RANK=0 \
7python train_ddp.py
机器B启动:
x
1CUDA_VISIBLE_DEVICES=0,1,2,3 \
2MASTER_ADDR=192.168.1.100 \
3MASTER_PORT=29500 \
4WORLD_SIZE=7 \
5RANK=3 \ # 对应全局 rank
6python train_ddp.py
所以多机多卡对我们训练来说差不多的。类似 rank = int(os.environ["RANK"])
获取一下global rank即可。 底层通信机制不一样,但无需感知。
本地进程间通信(通过 NCCL 共享内存 + socket)、跨节点网络通信(NCCL over TCP)
单机多卡时MASTER_ADDR配置为本地回环地址localhost,而多机多卡时会配置为主节点IP。
在prepare
函数里面,也进行了一些 DDP 的配置:
xxxxxxxxxx
1def prepare():
2 parser = argparse.ArgumentParser()
3 parser.add_argument('--gpu', default='0,1')
4 parser.add_argument('-e',
5 '--epochs',
6 default=3,
7 type=int,
8 metavar='N',
9 help='number of total epochs to run')
10 parser.add_argument('-b',
11 '--batch_size',
12 default=32,
13 type=int,
14 metavar='N',
15 help='number of batchsize')
16 args = parser.parse_args()
17
18 # 下面几行是新加的,用于启动多进程 DDP。使用 torchrun 启动时只需要设置使用的 GPU
19 os.environ['MASTER_ADDR'] = 'localhost' # 0号机器的本地回环地址。
20 os.environ['MASTER_PORT'] = '19198' # 0号机器的可用端口,随便选一个没被占用的
21 os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu # 使用哪些 GPU
22 world_size = torch.cuda.device_count() # 就是上一行使用的 GPU 数量
23 os.environ['WORLD_SIZE'] = str(world_size)
24 return args
再来看看main
函数里面添加了什么。首先是其添加一个额外的参数local_rank
(在mp.spawn
里面不用传,会自动分配,但都是从0开始,所以只能叫local rank)
x1def main(local_rank, args):
2 init_ddp(local_rank) ### 进程初始化
3 model = ConvNet().cuda() ### 模型的 forward 方法变了
4 model = nn.SyncBatchNorm.convert_sync_batchnorm(model) ### 转换模型的 BN 层
5 model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank]) ### 套 DDP
6 criterion = nn.CrossEntropyLoss().cuda()
7 optimizer = torch.optim.SGD(model.parameters(), 1e-4)
8 scaler = GradScaler() ### 用于混合精度训练
9
10 train_dataset = torchvision.datasets.MNIST(root='./data',train=True,transform=transforms.ToTensor(),download=True)
11 train_sampler = torch.utils.data.distributed.DistributedSampler(
12 train_dataset) ### 用于在 DDP 环境下采样
13 g = get_ddp_generator() ###
14 train_dloader = torch.utils.data.DataLoader(
15 dataset=train_dataset,
16 batch_size=args.batch_size,
17 shuffle=False, ### shuffle is mutually exclusive with sampler
18 num_workers=4,
19 pin_memory=True,
20 sampler=train_sampler,
21 generator=g,
22 ) ### 添加额外的 generator,随机种子
23
24 test_dataset = torchvision.datasets.MNIST(
25 root="./data", train=False, transform=transforms.ToTensor(), download=True
26 )
27 test_sampler = torch.utils.data.distributed.DistributedSampler(
28 test_dataset
29 ) ### 用于在 DDP 环境下采样
30 test_dloader = torch.utils.data.DataLoader(
31 dataset=test_dataset,
32 batch_size=args.batch_size,
33 shuffle=False,
34 num_workers=2,
35 pin_memory=True,
36 sampler=test_sampler,
37 )
38 test_dataset)
39
40 for epoch in range(args.epochs):
41 if local_rank == 0: ### 防止每个进程都输出一次
42 print(f'begin training of epoch {epoch + 1}/{args.epochs}')
43 train_dloader.sampler.set_epoch(epoch) ### DDP里是需要控制shuffle的种子
44 train(model, train_dloader, criterion, optimizer, scaler)
45 if local_rank == 0:
46 print(f'begin testing')
47 test(model, test_dloader)
48 if local_rank == 0: ### 防止每个进程都保存一次
49 torch.save({'model': model.state_dict(), 'scaler': scaler.state_dict()}, 'ddp_checkpoint.pt')
50 dist.destroy_process_group() ### 最后摧毁进程,和 init_process_group 相对
首先,根据用init_ddp
函数对模型进行初始化。这里我们使用 nccl 后端,并用 env 作为初始化方法:
xxxxxxxxxx
141def init_ddp(local_rank):
2 # 有了这一句之后,在转换device的时候直接使用 a=a.cuda()即可,否则要用a=a.cuda(local_rank)
3 torch.cuda.set_device(local_rank)
4 os.environ['RANK'] = str(local_rank)
5 dist.init_process_group(backend='nccl', init_method='env://')
6 # 或 dist.init_process_group("nccl", rank=rank, world_size=world_size)
7 # 但更推荐init_method='env://'
8 # 你只需要 设置好环境变量,PyTorch 会自动读取并初始化:
9 # shell 中设置:
10 # MASTER_ADDR=192.168.1.10
11 # MASTER_PORT=12345
12 # RANK=3
13 # WORLD_SIZE=8
14
不过我觉得device = torch.device('cuda:{}'.format(rank)),然后统一to(device)更好
在完成了该初始化后,可以很轻松地在需要时获得local_rank
、world_size
,而不需要作为额外参数从main
中一层一层往下传。
x
1import torch.distributed as dist
2local_rank = dist.get_rank()
3world_size = dist.get_world_size()
比如需要print
, log
, save_state_dict
时,由于多个进程拥有相同的副本,故只需要一个进程执行即可,比如:
41if local_rank == 0:
2 print(f'begin testing')
3if local_rank == 0: ### 防止每个进程都保存一次
4 torch.save({'model': model.state_dict(), 'scaler': scaler.state_dict()}, 'ddp_checkpoint.pt')
为了加速推理,我们在模型的forward
方法里套一个torch.cuda.amp.autocast()
:
使得forward
函数变为:
x
1def forward(self, x):
2 with torch.cuda.amp.autocast(): # 混合精度,加速推理
3 out = self.layer1(x)
4 out = self.layer2(out)
5 out = out.reshape(out.size(0), -1)
6 out = self.fc(out)
7 return out
autocast 也可以在推理的时候再套,但是在这里套最方便,而且适用于所有情况。
在模型改变之后,使用convert_sync_batchnorm
和DistributedDataParallel
对模型进行包装。
创建 scaler,用于训练时对 loss 进行 scale:
xxxxxxxxxx
21from torch.cuda.amp import GradScaler
2scaler = GradScaler() ### 用于混合精度训练
训练时,需要使用 DDP 的sampler,并且在num_workers > 1
时需要传入generator
,否则对于同一个worker,所有进程的augmentation相同,减弱训练的随机性。详细分析参见这篇文章。
xxxxxxxxxx
161def get_ddp_generator(seed=3407):
2 local_rank = dist.get_rank()
3 g = torch.Generator()
4 g.manual_seed(seed + local_rank)
5 return g
6
7train_sampler = torch.utils.data.distributed.DistributedSampler(
8 train_dataset) ### 用于在 DDP 环境下采样
9g = get_ddp_generator() ###
10train_dloader = torch.utils.data.DataLoader(dataset=train_dataset,
11 batch_size=args.batch_size,
12 shuffle=False, ### shuffle 通过 sampler 完成
13 num_workers=4,
14 pin_memory=True,
15 sampler=train_sampler,
16 generator=g) ### 添加额外的 generator
并且在多个epoch
的训练时,需要设置train_dloader.sampler.set_epoch(epoch)
。
下面来看看train
函数。
xxxxxxxxxx
111def train(model, train_dloader, criterion, optimizer, scaler):
2 model.train()
3 for images, labels in train_dloader:
4 images = images.cuda()
5 labels = labels.cuda()
6 outputs = model(images)
7 loss = criterion(outputs, labels)
8 optimizer.zero_grad()
9 scaler.scale(loss).backward() ###
10 scaler.step(optimizer) ###
11 scaler.update() ###
最后三行发生了改变。相较于原始的loss.backward
、optimizer.step()
,这里通过scaler
对梯度进行缩放,防止由于使用混合精度导致损失下溢,并且对scaler
自身的状态进行更新呢。如果有多个loss
,它们也使用同一个scaler
。如果需要保存模型的state_dict
并且在后续继续训练(比如预训练-微调模式),最好连带scaler
的状态一起保留,并在后续的微调过程中和模型的参数一同加载。
测试时,需要将多个进程的数据reduce
到一张卡上。注意,在test
函数的外面加上if local_rank == 0
,否则多个进程会彼此等待而陷入死锁。
xxxxxxxxxx
171def test(model, test_dloader):
2 local_rank = dist.get_rank()
3 model.eval()
4 size = torch.tensor(0.).cuda()
5 correct = torch.tensor(0.).cuda()
6 for images, labels in test_dloader:
7 images = images.cuda()
8 labels = labels.cuda()
9 with torch.no_grad():
10 outputs = model(images)
11 size += images.size(0)
12 correct += (outputs.argmax(1) == labels).type(torch.float).sum()
13 dist.reduce(size, 0, op=dist.ReduceOp.SUM) ###
14 dist.reduce(correct, 0, op=dist.ReduceOp.SUM) ###
15 if local_rank == 0:
16 acc = correct / size
17 print(f'Accuracy is {acc:.2%}')
注释的两行即为所需添加的reduce
操作。
至此,添加的代码讲解完毕。
启动的方式变化不大:
xxxxxxxxxx
11python ddp_main.py --gpu 0,1
相应的结果:
xxxxxxxxxx
71begin training of epoch 1/3
2begin training of epoch 2/3
3begin training of epoch 3/3
4begin testing
5Accuracy is 89.21%
6
7time elapsed: 30.82 seconds
上述是通过mp.spawn
启动。mp
模块对multiprocessing
库进行封装,并没有特定针对DDP
。我们还可以通过官方推荐的torchrun
进行启动。完整的程序在这里。
相比mp.spawn
启动,torchrun
自动控制一些环境变量的设置,因而更为方便。我们只需要设置os.environ['CUDA_VISIBLE_DEVICES']
即可(不设置默认为该机器上的所有GPU),而无需设置os.environ['MASTER_ADDR']
等。此外,main
函数不再需要local_rank
参数。程序入口变为:
xxxxxxxxxx
81if __name__ == '__main__':
2 args = prepare()
3 time_start = time.time()
4 main(args)
5 time_elapsed = time.time() - time_start
6 local_rank = int(os.environ['LOCAL_RANK'])
7 if local_rank == 0:
8 print(f'\ntime elapsed: {time_elapsed:.2f} seconds')
运行脚本的命令由python
变为了torchrun
,如下:
x
1torchrun --standalone --nproc_per_node=2 ddp_main_torchrun.py --gpu 0,1
其中,nproc_per_node
表示进程数,将其设置为使用的GPU数量即可。
torchrun
是 PyTorch 官方推荐的启动分布式训练(DDP)的工具,是对早期 python -m torch.distributed.launch
的升级替代。它非常适合 单机多卡 和 多机多卡 的训练脚本。
11torchrun [OPTIONS] your_training_script.py [SCRIPT_ARGS...]
参数 | 作用 | 示例 |
---|---|---|
--nproc_per_node | 当前节点使用的 GPU 数量(即几张卡) | --nproc_per_node=4 |
--nnodes | 总共多少个节点(机器) | --nnodes=2 |
--node_rank | 当前节点的编号(0 开始) | --node_rank=0 |
--master_addr | 主节点 IP(或 hostname) | --master_addr=192.168.1.100 |
--master_port | 主节点端口(通信端口) | --master_port=29500 |
--rdzv_backend | 通信后端,一般默认不写 | 默认使用 c10d |
--max_restarts | 进程最大重启次数 | 可选,调试用 |
--monitor_interval | 监测失败进程的间隔时间(秒) | 可选 |
每个进程的
RANK
会自动计算为node_rank * nproc_per_node + local_rank
小知识:不同节点GPU数量不一样,nproc_per_node设置为不同即可。只是很少有这种场景。
xxxxxxxxxx
1
2# 方案1: mp.spawn()
3 mp.spawn(main_fun,
4 args=(opt.world_size, opt),
5 nprocs=opt.world_size,
6 join=True)
7# 方案2: Process
8 from torch.multiprocessing import Process
9 world_size = opt.world_size
10 processes = []
11 for rank in range(world_size):
12 p = Process(target=main_fun, args=(rank, world_size, opt))
13 p.start()
14 processes.append(p)
15 for p in processes:
16 p.join()
17
18# 方案3: torchrun
在写完 DDP 的代码之后,最好检查一遍,否则很容易因为漏了什么而出现莫名奇妙的错误,比如程序卡着不动了,也不报错)
大致需要检查:
DDP 初始化有没有完成,包括if __name__ == '__main__'
里和main
函数里的。退出main
函数时摧毁进程。
模型的封装,包括autocast,BN 层的转化和 DDP 封装
指定train_dloader
的sampler
、generator
和shuffle
,并且在每个epoch
设置sampler
,测试集、验证集同理。
训练时使用scaler
对loss
进行scale
对于print
、log
、save
等操作,仅在一个线程上进行。
测试时进行reduce
多个线程大致相当于增大了相应倍数的batch_size
,最好相应地调一调batch_size
和学习率。本文没有进行调节,导致测试获得的准确率有一些差别。
模型较小时速度差别不大,反而DDP与混合精度可能因为一些初始化和精度转换耗费额外时间而更慢。在模型较大时,DDP + 混合精度的速度要明显高于常规,且能降低显存占用。