一、概念
数据并行(Data Parallel)是一种常见的并行化策略,用于加速模型训练过程。数据并行的基本思想是将训练数据分成多个子集,并将这些子集分配给多个处理单元(如GPU或CPU核心),每个处理单元独立地计算其子集上的梯度,然后将这些梯度汇总起来更新模型参数。当然,这也意味着每个处理单元上都需要有一个完整的模型副本。
二、原理及优势
1、原理
- 数据分割:首先,将训练数据集分成多个子集,每个子集分配给一个处理单元(如GPU)。
- 并行计算:其次,每个处理单元使用相同的模型副本在其子集上进行前向传播和后向传播,计算梯度。
- 梯度汇总:再次,将所有处理单元计算的梯度汇总(通常是求平均)。在每个迭代步骤结束时,所有GPU上的梯度需要被聚合。这个过程通常通过特定的通信操作来完成,如AllReduce操作。AllReduce操作会将所有GPU上的梯度相加,然后平均,以获得全局的梯度。
- 参数更新:再次,使用汇总后的梯度更新所有模型副本的参数。
2、优势
- 加速训练:通过并行处理多个数据子集,可以显著加快模型训练速度。
- 扩展性好:可以轻松扩展到多个 GPU 或多个机器上,适用于大规模数据集。
- 资源利用率高:充分利用多核 CPU 或多 GPU 的计算能力,提高资源利用率。
三、python实现
这里,我们演示一下在拥有两个GPU的服务器上实现数据并行。好消息是,我们并不需要自行编写数据并行的代码,而是可以使用PyTorch提供的torch.nn.DataParallel
或torch.nn.parallel.DistributedDataParalle。
DataParallel(DP)是一个简单的数据并行包装器,它可以自动地将我们的数据和模型复制到每个GPU上,并在每个GPU上并行执行前向和反向传播;而DistributedDataParallel(DDP)是一个更高级的数据并行实现,它提供了更好的性能和扩展性,特别是在多GPU和多节点环境中。
这里,我们使用简单便捷的DataParallel作为示例。需要注意的是,DP默认在第0个处理单元上汇总各个模型副本的输出并计算梯度,因此一般来说第0个处理单元的内存占用会比其他的处理单元要高一些。
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
class taskDataset(Dataset):
def __init__(self):
# 这里我们随机生成一百万条样本
self.data = torch.randn(2000, 20)
self.label = torch.randint(0, 2, (2000,))
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx], self.label[idx]
class taskModel(nn.Module):
def __init__(self):
super(taskModel, self).__init__()
self.fc1 = nn.Linear(20, 10)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(10, 3)
def forward(self, input):
x = self.fc1(input)
x = self.relu(x)
x = self.fc2(x)
print(f"\t In Model: input size {input.size()}, output size {x.size()}")
return x
# 初始化模型
model = taskModel()
# 如果有多个GPU,使用DataParallel包装模型,自动为每个GPU创建一个模型副本
if torch.cuda.device_count() > 1:
print(f"使用 {torch.cuda.device_count()} 个GPU训练模型!")
model = nn.DataParallel(model)
# 将各个模型副本移动到GPU上
model.to('cuda')
# 检查模型是否已经在多个GPU上复制
if isinstance(model, nn.DataParallel):
print("使用的GPU数量:", len(model.device_ids))
print("模型副本所在的GPU:", model.device_ids)
print("原始模型:", model.module)
else:
print("模型没有被封装在DataParallel中。")
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
# 加载数据集
dataset = taskDataset()
dataloader = DataLoader(dataset, batch_size=100, shuffle=True)
print('开始训练:')
# 训练模型10个epoch
model.train()
# 程序开始时间
start_time = time.time()
for iter in range(10):
for idx, (inputs, labels) in enumerate(dataloader):
# 自动会将当前batch的数据分成N个子batch传给每个GPU
print(f"iter {iter}, batch {idx}")
inputs, labels = inputs.to('cuda'), labels.to('cuda')
optimizer.zero_grad()
outputs = model(inputs)
print(f"\t Outside: input size {inputs.size()}, output_size {outputs.size()}")
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# 程序结束时间
end_time = time.time()
# 计算运行时间
elapsed_time = end_time - start_time
print(f"模型训练时间:{elapsed_time}秒")
对于单机单卡的小伙伴,我们不妨去白嫖一下Kaggle提供的GPU T4 × 2的服务器环境,运行数据并行代码之后可以看到,每个GPU都参与了模型训练。
同时,通过打印模型内部的输入输出大小,以及训练过程中调用模型时的输入输出大小,我们可以直观地评估当前的数据并行策略是否生效。下图可见,每个模型副本都分到了batch_size/2的样本,且torch自动汇总了各个模型副本的输出:
四、总结
DP是一个非常便捷的数据并行方法,但如果我们需要更高的性能和可拓展性,推荐使用DDP,这个笔者后面会介绍。此外,对于数据并行方法而言,不同处理单元之间的通信会消耗一定的时间,如果我们的batch_size较少,则通信频率提高了,相应的通信耗时也会变多,这也解释了为什么有时候使用数据并行训练的速度反而还不如单核GPU。数据集规模、batch_size设置都会对训练速度产生重要影响,当我们的数据集规模在百万以内或者模型结构较为简单时,并不推荐使用数据并行策略,而如果使用数据并行策略,那么batch_size可以适当调大一些。