0%

pytorch-分布式并行训练

基本介绍

Distributed Data-Parallel Training(DDP)

模型被复制到每个进程中,其中每个模型副本获得一组不同的输入数据样本,DDP 主要负责梯度通信,保持模型副本同步,并将其与梯度计算重叠,以加快计算速度。

单机多gpu训练DataParallel

易于使用,但单进程多线程的特性容易受到GIL的限制,速度较慢且容易出现显存不均匀等问题

1
2
3
import torch
gpus = [0,1,2,3]
model = torch.nn.DataParallel(model,device_ids=gpus,output_device=gpus[0])

单机多gpu训练DistributedDataParallel

通过MPI实现CPU通信,通过NCCL实现GPU通信,自动将代码分配给多个进程,分别在多个gpu上运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import torch
import torch.distributed as dist

# 设置GPU之间通信使用的后端及端口
dist.init_process_group(backend='nccl')

# 对数据集进行划分,即将每个batch划分为几个partition
train_dataset = ...
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(train_dataset,sampler=train_sampler)

# 包装模型,将不同GPU上求得的梯度进行汇总并同步计算结果
model = ...
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=list(range(torch.cuda.device_count())))
1
2
# 在执行时,需要调用启动器启动,在执行过程中启动器将当前进程的index传递给python
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py

也可使用torch.multiprocessing来替代这个启动器,见下文。

TorchElastic

为了避免多进程时,由于某个进程出现故障导致不同步或者崩溃等,若预计在训练过程中会发生故障或者资源会动态离开或者加入,则可使用torchelastic

模型并行训练

通过将单个模型拆分到不同的GPU上,即相应地实现模型的forward方法在不同的GPU上移动中间输出,以共同为更大的模型服务。

基本的使用
1
2
3
4
5
6
7
8
9
10
11
import torch

# 在模型构建时,设置每一部分模型的device
class Model(torch.nn.Module):
def __init__(self):
super(Model,self).__init__()
self.net1 = torch.nn.Sequential(...).to('cuda:0')
self.net2 = torch.nn.Sequential(...).to('cuda:1')

def forward(self,x):
return self.net2(self.net1(x).to('cuda:1'))

注意在使用时网络中间输出和标签的设备位置,并且由于GPU之间张量的复制,导致速度比单GPU更慢。

通过pipelining inputs加速

即为了避免在执行过程中,两个GPU之间存在一个处于空闲状态,则将每个批次划分为多个pipeline,使得当一个split到达第二个子网络时,下一个split也馈入第一个子网络。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import torch
class Model(torch.nn.Module):
def __init__(self):
super(Model,self).__init__()
self.net1 = torch.nn.Sequential(...).to('cuda:0')
self.net2 = torch.nn.Sequential(...).to('cuda:1')
self.split_size = 10

def forward(self,x):
splits = iter(x.split(self.split_size, dim=0))
s_next = next(splits)
s_prev = self.net1(s_next).to('cuda:1')

ret = []
for s_next in splits:
s_prev = self.net2(s_prev)
ret.append(s_prev)
s_prev = self.net1(s_next).to('cuda:1')

s_prev = self.net2(s_prev)
ret.append(s_prev)
return torch.cat(ret)

split_size 值较小会导致许多微小的CUDA内核启动,而使用较大的 split_size 会导致在第一个和最后一个分割期间出现较长的空闲时间,因此split_size的大小是一个可调的超参数。

但是对于上述方法,使用的是默认流操作,即下一个split的计算不能与上一个split的复制操作重叠,但是由于是两个不同的张量,因此是可以在两个GPU上使用多个流,后续再探讨。

分布式数据并行训练

基本的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 设置正确的进程组

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# 初始化进程组
dist.init_process_group(backend='gloo',rank=rank,world_size=world_size)

def cleanup():
dist.destroy_process_group()


# 创建模型
class Model(torch.nn.Module):
def __init__(self):
super(Model,self).__init__()
self.net = ...

def forward(self,x):
return self.net(x)

def basic(rank, world_size):
print(f'running DDP example on rank {rank}.')
setup(rank, world_size)

# 创建模型并将模型移动到对应的gpu id上
model = Model().to(rank) # 数据等也需要移动到rank id中去
ddp_model = DDP(model, device_ids=[rank])

... # loss optim等

cleanup()

def run(demo_fn, world_size):
# demo_fn是要运行的代码,nprocs是线程数,即每个线程执行demo_fn并向其中传入local_rank和args
mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True)

由于DDP将模型状态从rank 0进程广播到DDP构造函数中的所有其他进程,因此不必担心不同的DDP进程从不同的模型参数初始值开始;同时DDP包装了较低级别的分布式通信详细信息,梯度的同步通信发生在反向传播时,并且与反向计算重叠。当backward()返回时,param.grad已经包含了同步的梯度张量。

使用DDP时保存和加载模型

一种优化方法是仅在一个进程中保存模型,然后将其加载到所有进程中,从而减少写开销。因为所有过程都从相同的参数开始,并且梯度在反向传播中同步,因此优化程序应将参数设置为相同的值。如果使用此优化,需要确保在保存完成之前不要启动所有进程。此外,在加载模块时,需要提供适当的map_location 参数以防止进程进入其他人的设备。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def basic(rank, world_size):
print(f'running DDP example on rank {rank}.')
setup(rank, world_size)

# 创建模型并将模型移动到对应的gpu id上
model = Model().to(rank) # 数据等也需要移动到rank id中去
ddp_model = DDP(model, device_ids=[rank])

path = tempfile.gettempdir() + "/model.checkpoint"
if rank==0:
# 只在一个进程中保存模型
torch.save(ddp_model.state_dict(), path)
# 确保在进程1保存模型之后,进程1加载该模型
dist.barrier()
# 确保合适的map_location
map_location = {'cuda:%d'%0: 'cuda:%d'%rank}
ddp_model.load_state_dict(torch.load(path,map_location=map_location))

... # loss optim等

# 没有必要使用dist.barrier()来保护下面的文件删除,因为DDP的向后传递中的AllReduce操作已经充当了同步。
if rank==0:
os.remove(path)

cleanup()
DDP与模型并行性结合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import torch

# 在模型构建时,设置每一部分模型的device
class Model(torch.nn.Module):
def __init__(self, dev0, dev1):
super(Model,self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Sequential(...).to(dev0)
self.net2 = torch.nn.Sequential(...).to(dev1)

def forward(self,x):
return self.net2(self.net1(x.to(self.dev0)).to(self.dev1))

def basic(rank, world_size):
print(f'running DDP example on rank {rank}.')
setup(rank, world_size)

# 创建模型并将模型移动到对应的gpu id上
dev0 = rank*2
dev1 = rank*2 + 1
model = Model(dev0,dev1)
ddp_model = DDP(model)

path = tempfile.gettempdir() + "/model.checkpoint"

... # loss optim等

cleanup()

在将多GPU模型传递给DDP时,不能设置device_idsoutput_device,输入和输出数据将通过应用程序或modelforward()方法放置在适当的设备中。

参考:

SINGLE-MACHINE MODEL PARALLEL BEST PRACTICES

GETTING STARTED WITH DISTRIBUTED DATA PARALLEL