基本介绍
Distributed Data-Parallel Training(DDP)
模型被复制到每个进程中,其中每个模型副本获得一组不同的输入数据样本,DDP
主要负责梯度通信,保持模型副本同步,并将其与梯度计算重叠,以加快计算速度。
单机多gpu
训练DataParallel
易于使用,但单进程多线程的特性容易受到GIL
的限制,速度较慢且容易出现显存不均匀等问题
1 2 3
| import torch gpus = [0,1,2,3] model = torch.nn.DataParallel(model,device_ids=gpus,output_device=gpus[0])
|
单机多gpu
训练DistributedDataParallel
通过MPI
实现CPU
通信,通过NCCL
实现GPU
通信,自动将代码分配给多个进程,分别在多个gpu
上运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import torch import torch.distributed as dist
dist.init_process_group(backend='nccl')
train_dataset = ... train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) train_loader = torch.utils.data.DataLoader(train_dataset,sampler=train_sampler)
model = ... model = torch.nn.parallel.DistributedDataParallel(model, device_ids=list(range(torch.cuda.device_count())))
|
1 2
| # 在执行时,需要调用启动器启动,在执行过程中启动器将当前进程的index传递给python CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py
|
也可使用torch.multiprocessing
来替代这个启动器,见下文。
TorchElastic
为了避免多进程时,由于某个进程出现故障导致不同步或者崩溃等,若预计在训练过程中会发生故障或者资源会动态离开或者加入,则可使用torchelastic
。
模型并行训练
通过将单个模型拆分到不同的GPU
上,即相应地实现模型的forward
方法在不同的GPU
上移动中间输出,以共同为更大的模型服务。
基本的使用
1 2 3 4 5 6 7 8 9 10 11
| import torch
class Model(torch.nn.Module): def __init__(self): super(Model,self).__init__() self.net1 = torch.nn.Sequential(...).to('cuda:0') self.net2 = torch.nn.Sequential(...).to('cuda:1') def forward(self,x): return self.net2(self.net1(x).to('cuda:1'))
|
注意在使用时网络中间输出和标签的设备位置,并且由于GPU
之间张量的复制,导致速度比单GPU
更慢。
即为了避免在执行过程中,两个GPU
之间存在一个处于空闲状态,则将每个批次划分为多个pipeline
,使得当一个split
到达第二个子网络时,下一个split
也馈入第一个子网络。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import torch class Model(torch.nn.Module): def __init__(self): super(Model,self).__init__() self.net1 = torch.nn.Sequential(...).to('cuda:0') self.net2 = torch.nn.Sequential(...).to('cuda:1') self.split_size = 10
def forward(self,x): splits = iter(x.split(self.split_size, dim=0)) s_next = next(splits) s_prev = self.net1(s_next).to('cuda:1')
ret = [] for s_next in splits: s_prev = self.net2(s_prev) ret.append(s_prev) s_prev = self.net1(s_next).to('cuda:1') s_prev = self.net2(s_prev) ret.append(s_prev) return torch.cat(ret)
|
split_size
值较小会导致许多微小的CUDA
内核启动,而使用较大的 split_size
会导致在第一个和最后一个分割期间出现较长的空闲时间,因此split_size
的大小是一个可调的超参数。
但是对于上述方法,使用的是默认流操作,即下一个split
的计算不能与上一个split
的复制操作重叠,但是由于是两个不同的张量,因此是可以在两个GPU
上使用多个流,后续再探讨。
分布式数据并行训练
基本的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
import torch import torch.distributed as dist import torch.multiprocessing as mp from torch.nn.parallel import DistributedDataParallel as DDP
import os def setup(rank, world_size): os.environ['MASTER_ADDR'] = 'localhost' os.environ['MASTER_PORT'] = '12355' dist.init_process_group(backend='gloo',rank=rank,world_size=world_size)
def cleanup(): dist.destroy_process_group()
class Model(torch.nn.Module): def __init__(self): super(Model,self).__init__() self.net = ...
def forward(self,x): return self.net(x)
def basic(rank, world_size): print(f'running DDP example on rank {rank}.') setup(rank, world_size)
model = Model().to(rank) ddp_model = DDP(model, device_ids=[rank])
...
cleanup()
def run(demo_fn, world_size): mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True)
|
由于DDP
将模型状态从rank 0
进程广播到DDP
构造函数中的所有其他进程,因此不必担心不同的DDP
进程从不同的模型参数初始值开始;同时DDP
包装了较低级别的分布式通信详细信息,梯度的同步通信发生在反向传播时,并且与反向计算重叠。当backward()
返回时,param.grad
已经包含了同步的梯度张量。
使用DDP时保存和加载模型
一种优化方法是仅在一个进程中保存模型,然后将其加载到所有进程中,从而减少写开销。因为所有过程都从相同的参数开始,并且梯度在反向传播中同步,因此优化程序应将参数设置为相同的值。如果使用此优化,需要确保在保存完成之前不要启动所有进程。此外,在加载模块时,需要提供适当的map_location
参数以防止进程进入其他人的设备。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| def basic(rank, world_size): print(f'running DDP example on rank {rank}.') setup(rank, world_size)
model = Model().to(rank) ddp_model = DDP(model, device_ids=[rank]) path = tempfile.gettempdir() + "/model.checkpoint" if rank==0: torch.save(ddp_model.state_dict(), path) dist.barrier() map_location = {'cuda:%d'%0: 'cuda:%d'%rank} ddp_model.load_state_dict(torch.load(path,map_location=map_location)) ... if rank==0: os.remove(path)
cleanup()
|
将DDP
与模型并行性结合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import torch
class Model(torch.nn.Module): def __init__(self, dev0, dev1): super(Model,self).__init__() self.dev0 = dev0 self.dev1 = dev1 self.net1 = torch.nn.Sequential(...).to(dev0) self.net2 = torch.nn.Sequential(...).to(dev1) def forward(self,x): return self.net2(self.net1(x.to(self.dev0)).to(self.dev1)) def basic(rank, world_size): print(f'running DDP example on rank {rank}.') setup(rank, world_size)
dev0 = rank*2 dev1 = rank*2 + 1 model = Model(dev0,dev1) ddp_model = DDP(model) path = tempfile.gettempdir() + "/model.checkpoint"
...
cleanup()
|
在将多GPU
模型传递给DDP
时,不能设置device_ids
和output_device
,输入和输出数据将通过应用程序或model
的forward()
方法放置在适当的设备中。
参考:
SINGLE-MACHINE MODEL PARALLEL BEST PRACTICES
GETTING STARTED WITH DISTRIBUTED DATA PARALLEL