pytorch 并行计算 DistributedDataParallel
pytorch
本文字数:2.3k 字 | 阅读时长 ≈ 10 min

pytorch 并行计算 DistributedDataParallel

pytorch
本文字数:2.3k 字 | 阅读时长 ≈ 10 min

参考 1 参考 2 参考 3

这部分是 nn.DataParallel 的后续,想看 nn.DataParallel点击这里

为什么要用 nn.parallel.DistributedDataParallel 呢,首先我们看 PyTorch 官网对 nn.DataParallel 的一段话

It is recommended to use DistributedDataParallel, instead of this class, to do multi-GPU training, even if there is only a single node. See: Use nn.parallel.DistributedDataParallel instead of multiprocessing or nn.DataParallel and Distributed Data Parallel.

这段话的意思是说即使在单机多卡中也建议使用 DistributedDataParallel,这就不得不说二者的区别

apex 是什么?apex 是由 Nvidia 维护的一个支持混合精度分布式训练的 PyTorch 扩展,不仅能加速收敛,还能节省显存,但由于本文是介绍并行计算,所以这里不作过多的 apex 介绍

1. 为什么要并行计算?

在我们训练大型数据集或者很大的模型时一块 GPU 很难放下,例如最初的 AlexNet 就是在两块 GPU 上计算的。并行计算一般采取两个策略:一个是模型并行,一个是数据并行。左图中是将模型的不同部分放在不同 GPU 上进行训练,最后汇总计算。而右图中是将数据放在不同 GPU 上进行训练,最后汇总计算,不仅能加快我们的计算速度,增大 BatchSize,一次 epoch 所需要的 iter 降低了,还能使结果更加精确(Batch 增大了)

2. 基本概念

3. DistributedDataParallel 的使用

nn.parallel.DistributedDataParallel 的使用一共有两种方法

3.1 数据处理

定义 main 函数以及对数据集的加载做变化

分布式的数据集加载不同于之前的单卡,这里需要将数据集分为 N 部分,N 为卡的数量。数据集加载方式变为: Datasets DistributedSampler BatchSampler DataLoader,有时候不用 BatchSampler

DistributedSampler 将数据集 N 等分,BatchSamper 将每一等分后的数据内部进行 batch 的划分

from torch.utils.data import BatchSampler, DataLoader
from torch.utils.data.distributed import DistributedSampler

train_datasets = MyDataSet(xxx)
# 给每个rank分配训练数据,比如一共800样本8张卡,那么每张卡对应分配100个样本
train_sampler = DistributedSampler(train_datasets)
# BatchSize=16,那么能分成100/16=6...4,多出4个样本,drop_last=True表示舍弃这四个样本,False将剩余4个样本为一组
train_batch_sampler = BatchSampler(train_sampler, batch_size, drop_last=True)
train_dataloader = DataLoader(train_datasets, batch_sampler=train_batch_sampler, pin_memory=True, num_workers=nw) # pin_memory将数据加载到GPU

3.2 torch.multiprocessing

torch.multiprocessing.spawn 是 PyTorch 的多进程包的一个函数,它用于启动多个进程,在某些情况下我们会看到使用 torch.distributed.launch 等工具来启动 DDP,而这里 mp.spawn 提供了一个更简洁的实现方式。其中 mp.spawn 的参数分别是:

mp.spawn(fn, args=(world_size), nprocs=world_size, join=True, daemon=False, start_method='spawn')

下面给一个完整的训练代码供调试~

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "6, 7"

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

def example(rank, world_size):
    # dist.init_process_group("gloo", init_method='tcp://127.0.0.1:6666', rank=rank, world_size=world_size)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
    model = nn.Linear(10, 10).to(rank)
    ddp_model = DDP(model, device_ids=[rank])  # construct DDP model
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    print("finished rank: {}".format(rank))

def main():
    world_size = torch.cuda.device_count()
    mp.spawn(example, args=(world_size,), nprocs=world_size, join=True)

if __name__=="__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

3.3 torch.distributed

下面代码是具体的初始化过程,其中有一个 dist.barrier()函数,这个函数在文末有详细说明,这里简单理解为:假设我们的 world_size=8,那么我们有 8 张 GPU 初始化,初始化有快有慢,快的 GPU 初始化会在 dist.barrier()处停下来等待,当所有的 GPU 都到达这个函数时,才会继续运行之后的代码

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "6, 7"

import torch
import argparse
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def init_distributed(args):
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        args.rank = int(os.environ["RANK"])
        args.world_size = int(os.environ['WORLD_SIZE'])
        args.gpu = int(os.environ['LOCAL_RANK'])
        print("os.environ[\"WORLD_SIZE\"]: ", os.environ["WORLD_SIZE"])
        print("os.environ[\"RANK\"]: ", os.environ["RANK"])
        print("os.environ[\"LOCAL_RANK\"]: ", os.environ["LOCAL_RANK"])
    else:
        print('Not using distributed mode')
        args.distributed = False
        return

    torch.cuda.set_device(args.gpu)
    args.dist_backend = 'nccl'  # 通信后端,nvidia GPU推荐使用NCCL
    print('| distributed init (rank {}): {}'.format(args.rank, args.dist_url), flush=True)
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank)
    dist.barrier()  # 等待所有进程都初始化完毕,即所有GPU都要运行到这一步以后在继续


def main(args):
    # dist.init_process_group("gloo", init_method='tcp://127.0.0.1:6666', rank=rank, world_size=world_size)
    init_distributed(args)
    rank = args.rank
    model = nn.Linear(10, 10).to(rank)
    ddp_model = DDP(model, device_ids=[rank])  # construct DDP model
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    print("finished rank: {}".format(rank))

if __name__=="__main__":
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('--local_rank', default=0, help='if use distributed mode, must use variable local_rank')
    args = parser.parse_args()
    main(args)

运行命令: python -m torch.distributed.launch --nproc_per_node=2 train.py

比如在单机多卡中 --nproc_per_node=2 会自动给 os.environ["WORLD_SIZE"] 赋值为 2,可以将上面程序和 torch.multiprocessing 的程序进行对比

3.4 模型保存

保存模型的 checkpoints。这里需要注意的是,我们需要在 self.gpu_id == 0 (或者是 rank == 0 时)进行 checkpoints 的保存,否则我们可能会保存多份相同的 checkpoints。

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
      self._save_checkpoint(epoch)

4. 二者区别

二者的区别仅在于 dist.init_process_group()函数之前,第一个不需要命令行指定参数,第二个需要,但是第二个代码更为简洁。之后的操作就完全一致了

5. 一些 BUG 和问题

1. runtimeerror: address already in use

这种情况是端口被占用了,可能是由于你上次调试之后端口依旧占用的缘故,假设 88889 端口被占用了,用以下命令查询其 PID,然后杀掉即可。第二种方法是将当前终端关闭,重新开一个他会自动解除占用

lsof -i:88889
或者
netstat -tunlp|grep 88889
kill -9 PID

2. 如何理解 dist.barrier()函数?

详细参考StackOverflow

单机多卡环境下使用分布式训练具有更快的速度。PyTorch 在分布式训练过程中,对于数据的读取是采用主进程预读取并缓存,然后其它进程从缓存中读取,不同进程之间的数据同步具体通过 torch.distributed.barrier()实现

举个例子(来自 StackOverflow)

  if args.local_rank not in [-1, 0]:
        torch.distributed.barrier()  # Make sure only the first process in distributed training will download model & vocab

        ... (loads the model and the vocabulary)

    if args.local_rank == 0:
        torch.distributed.barrier()  # Make sure only the first process in distributed training will download model & vocab

假设我们有 4 张卡[0, 1, 2, 3],其中[0]卡是 first process 或者 base process,有些操作不需要所有的卡同时进行,比如在预处理的时候只用 base process 即可。在上述代码中,第一个 if 是说除了主卡之外的卡运行到此处会被 barrier,也就是说运行到这里就停止了,而 base process 不会停止会继续运行,执行预加载模型等操作,当主卡运行到第二个 if 时,他也会进入到 barrier,就是说他已经预加载完了,现在他也需要被 barrier 了。==此时所有的卡都进入到了 barrier==,意味着所有的卡可以继续运行(主卡已经加载完了,这个数据所有的卡都可以使用),可以理解为小弟在等大哥发号施令(小弟都在 barrier),当大哥准备好了以后(进入到 barrier),就告诉小弟可以出发了(所有的卡从 barrier 撤出)

a process is blocked by a barrier until all processes have encountered a barrier, upon which the barrier is lifted for all processes

9月 09, 2024
9月 06, 2024