【学习/记录】torchrun/DDP使用方法与注意事项


概述

基于pytorch自身的分布式训练,通过调用nccl等backend实现多机多卡并行训练。

面向 DDP手动启动 的实现

启动方式

python main.py

面向 torchrun 的实现

启动方式

torchrun main.py

重要环境变量参数说明

变量名 定义及说明
LOCAL_RANK 本地排名
RANK 全局排名
GROUP_RANK worker 组的排名。一个介于 0 和 max_nnodes 之间的数字。当每个节点运行一个 worker 组时,这是节点的排名
ROLE_RANK 所有具有相同角色的 worker 中的 worker 的排名。worker 的角色在 WorkerSpec 中指定
LOCAL_WORLD_SIZE 本地世界大小(例如本地运行的 worker 数);等于在 torchrun 上指定的 --nproc-per-node
WORLD_SIZE 世界大小(作业中 worker 的总数)
ROLE_WORLD_SIZE 使用在 WorkerSpec 中指定的相同角色启动的 worker 的总数
MASTER_ADDR 运行排名为 0 的 worker 的主机的 FQDN;用于初始化 Torch 分布式后端
MASTER_PORT MASTER_ADDR 上的端口,可用于托管 C10d TCP 存储
TORCHELASTIC_RESTART_COUNT 到目前为止的 worker 组重启次数
TORCHELASTIC_MAX_RESTARTS 配置的最大重启次数
TORCHELASTIC_RUN_ID 等于 rendezvous run_id(例如唯一的作业 ID)
PYTHON_EXEC 系统可执行文件覆盖。如果提供,python 用户脚本将使用 PYTHON_EXEC 的值作为可执行文件。默认情况下使用 sys.executable

rank问题

端口问题

个人模版

参考:

Pytorch - DDP教程 Pytorch - torchrun 弹性启动

import argparse
import logging
import os
import sys
from importlib import reload

import datasets
import torch
import torch.distributed as dist
import torch.nn as nn
from einops import rearrange
from torch.distributed.elastic.multiprocessing.errors import record
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim.adamw import AdamW
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torchvision import transforms

class CustomModel(nn.Module):
    def __init__(self, dim, heads=4, dim_head=32):
        super().__init__()
        self.heads = heads
        hidden_dim = dim_head * heads
        self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)
        self.to_out = nn.Conv2d(hidden_dim, dim, 1)

    def forward(self, x):
        b, c, h, w = x.shape
        qkv = self.to_qkv(x)
        q, k, v = rearrange(
            qkv, "b (qkv heads c) h w -> qkv b heads c (h w)", heads=self.heads, qkv=3
        )
        k = k.softmax(dim=-1)
        context = torch.einsum("bhdn,bhen->bhde", k, v)
        out = torch.einsum("bhde,bhdn->bhen", context, q)
        out = rearrange(
            out, "b heads c (h w) -> b (heads c) h w", heads=self.heads, h=h, w=w
        )
        return self.to_out(out)

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

@record
def main(custom_args):
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    print(f"Start running basic DDP example on rank {rank}.")

    # create model and move it to GPU with id rank
    gpu = rank % torch.cuda.device_count()
    world_size = dist.get_world_size()
    torch.cuda.set_device(gpu)
    torch.backends.cudnn.benchmark = True
    # ...
    dataset = Dataset()
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank) if (world_size > 1) else None
    dataloader = DataLoader(
        dataset, 
        sampler=sampler
    )
    rank_iter = iter(dataloader) 
    model=ToyModel().cuda()
    ddp_model = DDP(model, device_ids=[gpu])
    loss_fn = nn.MSELoss()
    optimizer = AdamW(ddp_model.parameters())
    optimizer.zero_grad()

    # train
    # batch = next(rank_iter)
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(gpu)
    loss = loss_fn(outputs, labels)
    loss.backward()
    optimizer.step()

    # A step finished
    # loss = dist.all_reduce(loss, op=torch.distributed.SUM)
    # print(loss / world_size)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Argparser for configuring [code base name to think of] codebase"
    )
    parser.add_argument("--cfg", type=str, default="config.yaml")
    args = parser.parse_args()
    main(args)

单机多卡模版

多机多卡模版

torch.distributed.launchtorchrun

主要优势

  1. worker 故障由重新启动所有 worker 优雅地处理。
  2. worker 的 RANK 和 WORLD_SIZE 被自动分配。
  3. 允许在最小和最大大小之间更改节点数量(弹性)。

修改说明

如果您的训练脚本适用于 torch.distributed.launch,则它将继续适用于 torchrun,但存在以下差异

  1. 无需手动传递 RANK、WORLD_SIZE、MASTER_ADDR 和 MASTER_PORT。
  2. 可以提供 rdzv_backend 和 rdzv_endpoint。对于大多数用户,这将设置为 c10d(参阅 rendezvous)。默认的 rdzv_backend 会创建一个非弹性 rendezvous,其中 rdzv_endpoint 保存主地址。
  3. 确保您的脚本中包含 load_checkpoint(path) 和 save_checkpoint(path) 逻辑。当任意数量的工作进程失败时,我们会使用相同的程序参数重启所有工作进程,因此您将丢失到最近检查点的进度(参阅弹性启动)。
  4. use_env 标志已移除。如果您之前通过解析 --local-rank 选项来解析本地排名,则需要从环境变量 LOCAL_RANK 中获取本地排名(例如,int(os.environ["LOCAL_RANK"]))。

以下是一个训练脚本的说明性示例,该脚本在每个 epoch 存储 checkpoint,因此在失败时丢失进度的最坏情况是一个完整 epoch 的训练量。

def main():
     args = parse_args(sys.argv[1:])
     state = load_checkpoint(args.checkpoint_path)
     initialize(state)

     # torch.distributed.run ensures that this will work
     # by exporting all the env vars needed to initialize the process group
     torch.distributed.init_process_group(backend=args.backend)

     for i in range(state.epoch, state.total_num_epochs)
          for batch in iter(state.dataset)
              train(batch, state.model)

          state.epoch += 1
          save_checkpoint(state)

梯度累积技术(gradient_accumulation)

参考:

(翻译自accelerate文档) 梯度累积是一种,通过在几个批次上累积梯度来完成的,允许在比你的机器通常能够适应内存更大的批量大小上进行训练的技术,你可以训练。这是,并且仅在执行了一定数量的批次之后才对优化器进行步进。 虽然技术上标准的梯度累积代码在分布式设置中可以正常工作(下方示例代码),但这不是最有效的方法,您可能会遇到相当大的减速! 在本教程中,您将看到如何快速设置梯度累积,并使用Accelerate中提供的实用程序执行它,总共添加一行新代码! 此示例使用一个非常简单的PyTorch训练循环,该循环每两个批次执行一次梯度累积:

device = "cuda"
model.to(device)

gradient_accumulation_steps = 2

for index, batch in enumerate(training_dataloader):
    inputs, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)
    outputs = model(inputs)
    loss = loss_function(outputs, targets)
    loss = loss / gradient_accumulation_steps
    loss.backward()
    if (index + 1) % gradient_accumulation_steps == 0:
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

变更为:

+ from accelerate import Accelerator
+ accelerator = Accelerator()

+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+     model, optimizer, training_dataloader, scheduler
+ )

  for index, batch in enumerate(training_dataloader):
      inputs, targets = batch
-     inputs = inputs.to(device)
-     targets = targets.to(device)
      outputs = model(inputs)
      loss = loss_function(outputs, targets)
      loss = loss / gradient_accumulation_steps
+     accelerator.backward(loss)
      if (index+1) % gradient_accumulation_steps == 0:
          optimizer.step()
          scheduler.step()
          optimizer.zero_grad()

debug推荐使用

使用record修饰器,

from torch.distributed.elastic.multiprocessing.errors import record

@record
def main():
    # do train
    pass

if __name__ == "__main__":
    main()

多卡损失(仅值,不涉及反向传播)收集方法与对backward某些常见误解的解释

https://github.com/NVIDIA/DeepLearningExamples/blob/777d174008c365a5d62799a86f135a4f171f620e/PyTorch/Classification/ConvNets/image_classification/training.py#L231

https://github.com/NVIDIA/DeepLearningExamples/blob/777d174008c365a5d62799a86f135a4f171f620e/PyTorch/Classification/ConvNets/image_classification/utils.py#L117-L123

根据:https://github.com/huggingface/diffusers/pull/9482#issuecomment-2400424101

参考页面

Pytorch torchrun 弹性启动 Training Script DDP内部设计

声明:烈火灼冰|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - 【学习/记录】torchrun/DDP使用方法与注意事项


离离沐雪踏轻尘