基本介绍
Distributed Data-Parallel Training(DDP)
模型被复制到每个进程中,其中每个模型副本获得一组不同的输入数据样本,DDP 主要负责梯度通信,保持模型副本同步,并将其与梯度计算重叠,以加快计算速度。
单机多gpu训练DataParallel
易于使用,但单进程多线程的特性容易受到GIL的限制,速度较慢且容易出现显存不均匀等问题
1 2 3
   | import torch gpus = [0,1,2,3] model = torch.nn.DataParallel(model,device_ids=gpus,output_device=gpus[0])
   | 
 
单机多gpu训练DistributedDataParallel
通过MPI实现CPU通信,通过NCCL实现GPU通信,自动将代码分配给多个进程,分别在多个gpu上运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
   | import torch import torch.distributed as dist
 
  dist.init_process_group(backend='nccl')
 
  train_dataset = ... train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) train_loader = torch.utils.data.DataLoader(train_dataset,sampler=train_sampler)
 
  model = ...  model = torch.nn.parallel.DistributedDataParallel(model, device_ids=list(range(torch.cuda.device_count())))
   | 
 
1 2
   | # 在执行时,需要调用启动器启动,在执行过程中启动器将当前进程的index传递给python CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py
   | 
 
也可使用torch.multiprocessing来替代这个启动器,见下文。
TorchElastic
为了避免多进程时,由于某个进程出现故障导致不同步或者崩溃等,若预计在训练过程中会发生故障或者资源会动态离开或者加入,则可使用torchelastic。
模型并行训练
通过将单个模型拆分到不同的GPU上,即相应地实现模型的forward方法在不同的GPU上移动中间输出,以共同为更大的模型服务。
基本的使用
1 2 3 4 5 6 7 8 9 10 11
   | import torch
 
  class Model(torch.nn.Module):     def __init__(self):         super(Model,self).__init__()         self.net1 = torch.nn.Sequential(...).to('cuda:0')         self.net2 = torch.nn.Sequential(...).to('cuda:1')              def forward(self,x):         return self.net2(self.net1(x).to('cuda:1'))
   | 
 
注意在使用时网络中间输出和标签的设备位置,并且由于GPU之间张量的复制,导致速度比单GPU更慢。
即为了避免在执行过程中,两个GPU之间存在一个处于空闲状态,则将每个批次划分为多个pipeline,使得当一个split到达第二个子网络时,下一个split也馈入第一个子网络。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | import torch class Model(torch.nn.Module):     def __init__(self):         super(Model,self).__init__()         self.net1 = torch.nn.Sequential(...).to('cuda:0')         self.net2 = torch.nn.Sequential(...).to('cuda:1')         self.split_size = 10
      def forward(self,x):         splits = iter(x.split(self.split_size, dim=0))         s_next = next(splits)         s_prev = self.net1(s_next).to('cuda:1')
          ret = []         for s_next in splits:             s_prev = self.net2(s_prev)             ret.append(s_prev)             s_prev = self.net1(s_next).to('cuda:1')                  s_prev = self.net2(s_prev)         ret.append(s_prev)         return torch.cat(ret)
   | 
 
split_size 值较小会导致许多微小的CUDA内核启动,而使用较大的 split_size 会导致在第一个和最后一个分割期间出现较长的空闲时间,因此split_size的大小是一个可调的超参数。
但是对于上述方法,使用的是默认流操作,即下一个split的计算不能与上一个split的复制操作重叠,但是由于是两个不同的张量,因此是可以在两个GPU上使用多个流,后续再探讨。
分布式数据并行训练
基本的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
   | 
  import torch import torch.distributed as dist import torch.multiprocessing as mp from torch.nn.parallel import DistributedDataParallel as DDP
  import os def setup(rank, world_size):     os.environ['MASTER_ADDR'] = 'localhost'     os.environ['MASTER_PORT'] = '12355'          dist.init_process_group(backend='gloo',rank=rank,world_size=world_size)
  def cleanup():     dist.destroy_process_group()
 
 
  class Model(torch.nn.Module):     def __init__(self):         super(Model,self).__init__()         self.net = ...
      def forward(self,x):         return self.net(x)
  def basic(rank, world_size):     print(f'running DDP example on rank {rank}.')     setup(rank, world_size)
           model = Model().to(rank)      ddp_model = DDP(model, device_ids=[rank])
      ... 
      cleanup()
  def run(demo_fn, world_size):          mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True)
 
  | 
 
由于DDP将模型状态从rank 0进程广播到DDP构造函数中的所有其他进程,因此不必担心不同的DDP进程从不同的模型参数初始值开始;同时DDP包装了较低级别的分布式通信详细信息,梯度的同步通信发生在反向传播时,并且与反向计算重叠。当backward()返回时,param.grad已经包含了同步的梯度张量。
使用DDP时保存和加载模型
一种优化方法是仅在一个进程中保存模型,然后将其加载到所有进程中,从而减少写开销。因为所有过程都从相同的参数开始,并且梯度在反向传播中同步,因此优化程序应将参数设置为相同的值。如果使用此优化,需要确保在保存完成之前不要启动所有进程。此外,在加载模块时,需要提供适当的map_location 参数以防止进程进入其他人的设备。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
   | def basic(rank, world_size):     print(f'running DDP example on rank {rank}.')     setup(rank, world_size)
           model = Model().to(rank)      ddp_model = DDP(model, device_ids=[rank])          path = tempfile.gettempdir() + "/model.checkpoint"     if rank==0:                  torch.save(ddp_model.state_dict(), path)          dist.barrier()          map_location = {'cuda:%d'%0: 'cuda:%d'%rank}     ddp_model.load_state_dict(torch.load(path,map_location=map_location))          ...                if rank==0:         os.remove(path)
      cleanup()
   | 
 
将DDP与模型并行性结合
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
   | import torch
 
  class Model(torch.nn.Module):     def __init__(self, dev0, dev1):         super(Model,self).__init__()         self.dev0 = dev0         self.dev1 = dev1         self.net1 = torch.nn.Sequential(...).to(dev0)         self.net2 = torch.nn.Sequential(...).to(dev1)              def forward(self,x):         return self.net2(self.net1(x.to(self.dev0)).to(self.dev1))      def basic(rank, world_size):     print(f'running DDP example on rank {rank}.')     setup(rank, world_size)
           dev0 = rank*2     dev1 = rank*2 + 1     model = Model(dev0,dev1)     ddp_model = DDP(model)          path = tempfile.gettempdir() + "/model.checkpoint"
      ... 
      cleanup()
   | 
 
在将多GPU模型传递给DDP时,不能设置device_ids和output_device,输入和输出数据将通过应用程序或model的forward()方法放置在适当的设备中。
参考:
SINGLE-MACHINE MODEL PARALLEL BEST PRACTICES
GETTING STARTED WITH DISTRIBUTED DATA PARALLEL