deepspeed 基本使用
package
本文字数:5.9k 字 | 阅读时长 ≈ 30 min

deepspeed 基本使用

package
本文字数:5.9k 字 | 阅读时长 ≈ 30 min

1. deepspeed 的基本用法

1.1 deepspeed 安装

deepspeed 的安装非常简单,只需要运行以下命令即可

pip install deepspeed

在此之前还需要安装 python,pytorch 等基本环境,这里就不赘述了

1.2 配置 json 文件

deepseed 的使用也非常简单,首先需要准备一个 json 文件,我们新建一个 config.json 文件来放训练的必要信息

# config.json
{
    "train_batch_size": 4,
    "steps_per_print": 2000,
    "optimizer": {
      "type": "Adam",
      "params": {
        "lr": 0.001,
        "betas": [
          0.8,
          0.999
        ],
        "eps": 1e-8,
        "weight_decay": 3e-7
      }
    },
    "scheduler": {
      "type": "WarmupLR",
      "params": {
        "warmup_min_lr": 0,
        "warmup_max_lr": 0.001,
        "warmup_num_steps": 1000
      }
    },
    "wall_clock_breakdown": false
  }

1.3 deepspeed 使用

1. 配置训练参数

def add_argument():
    parser = argparse.ArgumentParser(description='CIFAR')
    parser.add_argument('-b', '--batch_size', default=32, type=int, help='mini-batch size (default: 32)')
    parser.add_argument('-e', '--epochs', default=30, type=int, help='number of total epochs (default: 30)')
    parser.add_argument('--local_rank', type=int, default=-1, help='local rank passed from distributed launcher')
    parser.add_argument('--log-interval', type=int, default=2000, help="output logging information at a given interval")

    parser = deepspeed.add_config_arguments(parser)  # deepspeed的参数
    args = parser.parse_args()
    return args

args = add_argument()

2. 初始化网络

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16*5*5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

net = Net()
parameters = filter(lambda p: p.requires_grad, net.parameters())

3. 加载数据

transform = transforms.Compose(
  [transforms.ToTensor(),
 transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=16, shuffle=True, num_workers=2)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=False, num_workers=2)

4. 初始化 deepspeed

model_engine, optimizer, trainloader, __ = deepspeed.initialize(
    args=args, model=net, model_parameters=parameters, training_data=trainset)

5. 训练和测试

criterion = nn.CrossEntropyLoss()
for epoch in range(2):
    running_loss = 0.0
    for i, data in enumerate(trainloader):
        inputs, labels = data[0].to(model_engine.local_rank), data[1].to(model_engine.local_rank)
        outputs = model_engine(inputs)
        loss = criterion(outputs, labels)
        model_engine.backward(loss)
        model_engine.step()

        # print statistics
        running_loss += loss.item()
        if i % args.log_interval == (args.log_interval - 1):
            print('[%d %5d] loss: %.3f' % (epoch+1, i+1, running_loss / args.log_interval))
            running_loss = 0.0


correct = 0
total = 0
with torch.no_grad():
    for data in testloader:
        images, labels = data
        outputs = net(images.to(model_engine.local_rank))
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels.to(model_engine.local_rank)).sum().item()
print('Accuracy of the network on the 10000 test images: %d %%' % (100 * correct / total))

每一步都写好之后,我们运行下面命令来启动程序

deepspeed --include localhost:7 test.py --deepspeed_config config.json

接下来就可以看到训练过程了

[2023-08-11 15:28:13,128] [INFO] [real_accelerator.py:110:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-08-11 15:28:14,693] [WARNING] [runner.py:196:fetch_hostfile] Unable to find hostfile, will proceed with training with local resources only.
[2023-08-11 15:28:14,694] [INFO] [runner.py:555:main] cmd = /home/wangyh/miniconda3/envs/llava2/bin/python -u -m deepspeed.launcher.launch --world_info=eyJsb2NhbGhvc3QiOiBbN119 --master_addr=127.0.0.1 --master_port=29500 --enable_each_rank_log=None test.py --deepspeed_config config.json
[2023-08-11 15:28:15,862] [INFO] [real_accelerator.py:110:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-08-11 15:28:17,418] [INFO] [launch.py:145:main] WORLD INFO DICT: {'localhost': [7]}
[2023-08-11 15:28:17,418] [INFO] [launch.py:151:main] nnodes=1, num_local_procs=1, node_rank=0
[2023-08-11 15:28:17,418] [INFO] [launch.py:162:main] global_rank_mapping=defaultdict(<class 'list'>, {'localhost': [0]})
[2023-08-11 15:28:17,418] [INFO] [launch.py:163:main] dist_world_size=1
[2023-08-11 15:28:17,418] [INFO] [launch.py:165:main] Setting CUDA_VISIBLE_DEVICES=7
[2023-08-11 15:28:18,729] [INFO] [real_accelerator.py:110:get_accelerator] Setting ds_accelerator to cuda (auto detect)
Files already downloaded and verified
Files already downloaded and verified
[2023-08-11 15:28:21,691] [INFO] [logging.py:96:log_dist] [Rank -1] DeepSpeed info: version=0.9.5, git-hash=unknown, git-branch=unknown
[2023-08-11 15:28:21,691] [WARNING] [comm.py:152:init_deepspeed_backend] NCCL backend in DeepSpeed not yet implemented
[2023-08-11 15:28:21,691] [INFO] [comm.py:594:init_distributed] cdb=None
[2023-08-11 15:28:21,691] [INFO] [comm.py:625:init_distributed] Initializing TorchBackend in DeepSpeed with backend nccl
[2023-08-11 15:28:22,465] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed Flops Profiler Enabled: False
Installed CUDA version 11.3 does not match the version torch was compiled with 11.7 but since the APIs are compatible, accepting this combination
Using /home/wangyh/.cache/torch_extensions/py310_cu117 as PyTorch extensions root...
Detected CUDA files, patching ldflags
Emitting ninja build file /home/wangyh/.cache/torch_extensions/py310_cu117/fused_adam/build.ninja...
Building extension module fused_adam...
Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N)
ninja: no work to do.
Loading extension module fused_adam...
Time to load fused_adam op: 0.0886220932006836 seconds
[2023-08-11 15:28:22,849] [INFO] [logging.py:96:log_dist] [Rank 0] Using DeepSpeed Optimizer param name adam as basic optimizer
......

2. deepspeed 的进阶技巧

2.1 指定特定显卡

我们只展示单机多卡的情况,运行 deepspeed --include localhost:4,5,6,7 train.py --deepspeed_config config.json,其中 include 这个参数就可以指定卡的数量,这里我们指定 4,5,6,7 即四张卡来训练,如果不指定的话,deepspeed 会自动选择所有可用的卡来训练

如果还有其他的参数,在 --deepspeed_config 这个参数后面加即可

2.2 如何用 vscode 调试 deepspeed 程序

launch.json 文件改为以下内容即可,其中 program 那一行的 llava2 改为自己的环境名字

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "/home/wangyh/miniconda3/envs/llava2/bin/deepspeed",
            "console": "integratedTerminal",
            "justMyCode": true,
            "args": [
                "--include", "localhost:7",
                "test.py",
                "--deepspeed_config", "/data/wangyh/mllms/deepspeed_test/config.json",
            ],
        }
    ]
}

2.3 DeepSpeed 基本运行命令

deepspeed --master_port 29500 \
  --num_gpus 2 \
  train.py \
  --deepspeed ds_config.json

2.4 Stage2 和 Stage3 的一些例子

stage2 的基本作用

{
    "bfloat16": {
        "enabled": "auto"
    },
    "fp16": {
        "enabled": "auto",
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1
    },
    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": "auto",
            "betas": "auto",
            "eps": "auto",
            "weight_decay": "auto"
        }
    },
    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": "auto",
            "warmup_max_lr": "auto",
            "warmup_num_steps": "auto"
        }
    },
    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "allgather_partitions": true,
        "allgather_bucket_size": 2e8,
        "overlap_comm": true,
        "reduce_scatter": true,
        "reduce_bucket_size": 2e8,
        "contiguous_gradients": true
    },
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "steps_per_print": 1e5
}

stage3 的基本作用

{
    "bfloat16": {
        "enabled": false
    },
    "fp16": {
        "enabled": "auto",
        "loss_scale": 0,
        "loss_scale_window": 1000,
        "initial_scale_power": 16,
        "hysteresis": 2,
        "min_loss_scale": 1
    },
    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": "auto",
            "betas": "auto",
            "eps": "auto",
            "weight_decay": "auto"
        }
    },
    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": "auto",
            "warmup_max_lr": "auto",
            "warmup_num_steps": "auto"
        }
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": true
        },
        "overlap_comm": true,
        "contiguous_gradients": true,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
        "stage3_gather_fp16_weights_on_model_save": true
    },
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "steps_per_print": 1e5,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": false
}

3. deepspeed 应用分析

3.1 基本介绍

DeepSpeed 官方教程
Huggingface 官方教程
DeepSpeed 论文

  1. Optimizer state partitioning (ZeRO stage 1)
  2. Gradient partitioning (ZeRO stage 2)
  3. Parameter partitioning (ZeRO stage 3)
  4. Custom mixed precision training handling
  5. A range of fast CUDA-extension-based optimizers
  6. ZeRO-Offload to CPU and NVMe

最多的我们使用的是前三种,第一种是针对 Optimizer 的优化,第二种主要是对梯度的优化(因此在 inference 的时候是无效的),第三种是对模型参数的优化(可以大模型切分到多张卡上)

这里只介绍 DeepSpeed 在 Huggingface 中的应用

训练过程:支持 ZeRO stage1,2,3 以及 Infinity
推理过程:支持 ZeRO stage3 以及 Infinity

启动方式

# 常规的pytorch DDP启动
torch.distributed.run --nproc_per_node=2 your_program.py <normal cl args> --deepspeed ds_config.json

# deepspeed专属启动方式
deepspeed --num_gpus=2 your_program.py <normal cl args> --deepspeed ds_config.json

注意,如果你要指定使用的 GPU,比如你想用 GPU 1,那么使用 CUDA_VISIBLE_DEVICES 是无效的,需要使用 localhost 参数

deepspeed --include localhost:1 examples/pytorch/translation/run_translation.py ...

注意,在 deepspeed 文档中的很多参数会和 trainer 中的有冲突,所以为了避免冲突的发生,你需要将参数的值设为 auto,它能够自动替换为正确或者最有效的值(如果二者的参数不匹配,训练可能会失败)

Stage two

{
    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "allgather_partitions": true,
        "allgather_bucket_size": 5e8,
        "overlap_comm": true,
        "reduce_scatter": true,
        "reduce_bucket_size": 5e8,
        "contiguous_gradients": true
    }
}

Stage three

{
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": true
        },
        "overlap_comm": true,
        "contiguous_gradients": true,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
        "stage3_gather_16bit_weights_on_model_save": true
    }
}

Others

{
    "fp16": {
        "enabled": true,
        "auto_cast": false,
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "consecutive_hysteresis": false,
        "min_loss_scale": 1
    }
    "bf16": {
        "enabled": true
    }
}

训练的时候 grad_norm 为 nan 或者了 loss=0
https://github.com/microsoft/DeepSpeed/issues/5242
将 common_overlap 设为 false

DeepspeedExample

1. deepspeed 基本原理

现在的模型越来越大,特别是大模型动辄 7B,13B,70B,这种大模型甚至没办法在 A100-80G 上进行训练,deepseed 的思想就是 GPU 显存不够,CPU 内存来凑,用时间换空间,这里需要优化的参数包括: 模型参数,优化器参数,梯度

DeepSpeed 模型训练是通过 DeepSpeed 引擎完成的。这个引擎可以包装任何类型为 torch.nn.module 的模型,并且有一套最基本的 API 用于训练和检查点保存模型

model_engine, optimizer, _, _ = deepspeed.initialize(args=cmd_args, model=model,model_parameters=params,)

如果您已经设置了分布式环境,您将需要把以下代码:

torch.distributed.init_process_group(...)  # remove
deepspeed.init_distributed()  # replace

如果您在 deepspeed.initialize()之后才需要设置分布式环境,就不必使用这个函数,因为 DeepSpeed 会在初始化时自动设置分布式环境。但不管怎样,如果已经设置了 torch.distributed.init_process_group,就需要将其移除。

当我们说“已经设置了分布式环境”,这意味着在一个计算机集群或多台计算机之间,已经配置好了相互通信和数据共享的必要设置,从而使得这些计算机能够协同工作,共同执行分布式计算任务。在机器学习和深度学习领域,这通常涉及到在多个 GPU 或服务器上并行处理数据和模型。

关于“如果您在 deepspeed.initialize()之后才需要设置分布式环境”的问题,这里有几个可能的场景:

Training#
DeepSpeed 的 training 流畅和 Pytorch 其实差别不大,但实际上,DeepSpeed 可自动执行分布式数据并行训练(混合精度)所需的必要操作,并使用预定义的 learning rate scheduler

for step, batch in enumerate(data_loader):
    # forward() method
    loss = model_engine(batch)

    # runs backpropagation
    model_engine.backward(loss)

    # weight update
    model_engine.step()

Gradient Averaging:在 DDP 中,backward 会确保在每个 training batch 结束后,梯度在 DP 处理中被平均。(这个很好理解,在前面的 ZeRO 博客中也有详细介绍)

Loss Scaling:在 FP16/混合精度训练中,DeepSpeed 引擎会自动处理损失缩放,以避免在梯度中的精度损失。

这里的 Loss Scaling 可能需要一定的介绍。Loss scaling 是一种在训练深度学习模型时使用较小数据类型(如 FP16)时常用的技术,旨在防止在梯度计算中出现的数值下溢。由于使用了 FP16 等较小的数据类型,在返现传播中小梯度可能直接变为 0 了,从而无法有效的更新模型权重。Loss scaling 将先放大 Loss 进而放大梯度,再反向传播后再将梯度缩放回原来的大小,保持更多的精度信息。

具体的实现可以参考 DeepSpeed 中 DynamicLossScaler 的实现。 实际上,DeepSpeed 中实现的核心思想为,保证数值稳定的前提下,尽可能使用最大的 loss scale。

Learning Rate Scheduler:当使用 DeepSpeed 的学习率调度器(在 ds_config.json 文件中指定)时,DeepSpeed 会在每个训练步骤调用调度器的 step() 方法(当执行 model_engine.step() 时)。如果不使用 DeepSpeed 的学习率调度器的话,则主要分为以下两种情况:

在每个 training step 中执行自定义的调度器,可以在 deepspeed.initialize 添加参数 lr_scheduler=,用于定制化用户需要的 scheduler。
或者我们需要按照某些特殊的间隔/周期进行学习率的调整,这时候我们就无须传给 DeepSpeed (他会在每个 training step 执行),而是自己手动的管理。
Model Checkpointing#
如果我们需要保存或者加载 DeepSpeed 模型的 checkpointing,我们只需要使用两个函数:save_checkpoint 和 load_checkpoint 即可,他们用两个参数 ckpt_dir 和 ckpt_id 来唯一标识一个 checkpoint。

下面是 DeepSpeed 官方给出的 Example Code。

# load checkpoint
# client_sd : Dict
_, client_sd = model_engine.load_checkpoint(args.load_dir, args.ckpt_id)
step = client_sd['step']

# advance data loader to ckpt step
dataloader_to_step(data_loader, step + 1)

for step, batch in enumerate(data_loader):

    # forward() method
    loss = model_engine(batch)

    # runs backpropagation
    model_engine.backward(loss)

    # weight update
    model_engine.step()

    # save checkpoint
    if step % args.save_interval:
        client_sd['step'] = step
        ckpt_id = loss.item()
        model_engine.save_checkpoint(args.save_dir, ckpt_id, client_sd = client_sd)

这段代码 client_sd 的定义容易让人 confused。实际上,DeepSpeed 可以自动保存和恢复模型、优化器以及学习率调度器的状态,同时对用户隐藏这些细节。但是用户可能有一些其他的信息需要保存。为了支持这个功能,DeepSpeed 非常贴心的提供了对应的接口, save_checkpoint 接受一个状态字典 client_sd 用于保存,同时也能从 load_checkpoint 读取出来。

⚠️Important:所有进程都必须调用 load_checkpoint,而不仅仅是 rank 为 0 的进程(主进程)。 这是因为每个进程都需要保存其主权重,调度器和优化器状态。如果只有主进程调用该方法,那么在等待与其他进程同步时就会挂起。

DeepSpeed Configuration#
DeepSpeed 的配置可以通过一个 JSON 文件实现,具体的文件名在程序中应该被标识为:args.deepspeed_config。下面将简单的介绍一个 Example,完整的特性可以参考 DS_CONFIG doc。

{
  "train_batch_size": 8,
  "gradient_accumulation_steps": 1,
  "optimizer": {
    "type": "Adam",
    "params": {
      "lr": 0.00015
    }
  },
  "fp16": {
    "enabled": true
  },
  "zero_optimization": true
}

单机多卡 Resource Configuration (single-node)#
Official Section URL

在单机的情况下, DeepSpeed 的使用比较简单,因为 DeepSpeed 会自动的检测所有可用的 GPU 数量。所以,我们只需要使用 --include 和 --exclude 用来选择或者反选一些 GPU。用 localhost 来指代主机名,比如如下代码使用 GPU0,GPU1 进行训练。

⚠️ 注意,不能在 DeepSpeed 中使用 CUDA_VISIBLE_DEVICES 来控制应使用哪些设备。

deepspeed --include localhost:0,1 ...
#!/bin/bash

deepspeed --bind_cores_to_rank cifar10_deepspeed.py --deepspeed $@
import argparse

import deepspeed
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from deepspeed.accelerator import get_accelerator
from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer


def add_argument():
    parser = argparse.ArgumentParser(description="CIFAR")

    # For train.
    parser.add_argument("--epochs", default=30, type=int, help="number of total epochs (default: 30)")
    parser.add_argument("--local_rank", type=int, default=-1, help="local rank passed from distributed launcher")
    parser.add_argument("--log-interval", type=int, default=2000, help="output logging information at a given interval",)
    # For mixed precision training.
    parser.add_argument("--dtype", default="fp16", type=str, choices=["bf16", "fp16", "fp32"], help="Datatype used for training",)
    # For ZeRO Optimization.
    parser.add_argument("--stage", default=0, type=int, choices=[0, 1, 2, 3], help="Datatype used for training",)

    # For MoE (Mixture of Experts).
    parser.add_argument("--moe", default=False, action="store_true", help="use deepspeed mixture of experts (moe)",)
    parser.add_argument("--ep-world-size", default=1, type=int, help="(moe) expert parallel world size")
    parser.add_argument("--num-experts", type=int, nargs="+", default=[1,], help="number of experts list, MoE related.",)
    parser.add_argument("--mlp-type", type=str, default="standard", help="Only applicable when num-experts > 1, accepts [standard, residual]",)
    parser.add_argument("--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported")
    parser.add_argument("--min-capacity", default=0, type=int, help="(moe) minimum capacity of an expert regardless of the capacity_factor",)
    parser.add_argument("--noisy-gate-policy", default=None, type=str, help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter")
    parser.add_argument("--moe-param-group", default=False, action="store_true", help="(moe) create separate moe param groups, required when using ZeRO w. MoE",)
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args


def create_moe_param_groups(model):
    """Create separate parameter groups for each expert."""
    parameters = {"params": [p for p in model.parameters()], "name": "parameters"}
    return split_params_into_different_moe_groups_for_optimizer(parameters)


def get_ds_config(args):
    """Get the DeepSpeed configuration dictionary."""
    ds_config = {
        "train_batch_size": 16,
        "steps_per_print": 2000,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.001,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7,
            },
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000,
            },
        },
        "gradient_clipping": 1.0,
        "prescale_gradients": False,
        "bf16": {"enabled": args.dtype == "bf16"},
        "fp16": {
            "enabled": args.dtype == "fp16",
            "fp16_master_weights_and_grads": False,
            "loss_scale": 0,
            "loss_scale_window": 500,
            "hysteresis": 2,
            "min_loss_scale": 1,
            "initial_scale_power": 15,
        },
        "wall_clock_breakdown": False,
        "zero_optimization": {
            "stage": args.stage,
            "allgather_partitions": True,
            "reduce_scatter": True,
            "allgather_bucket_size": 50000000,
            "reduce_bucket_size": 50000000,
            "overlap_comm": True,
            "contiguous_gradients": True,
            "cpu_offload": False,
        },
    }
    return ds_config


class Net(nn.Module):
    def __init__(self, args):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.moe = args.moe
        if self.moe:
            fc3 = nn.Linear(84, 84)
            self.moe_layer_list = []
            for n_e in args.num_experts:
                # Create moe layers based on the number of experts.
                self.moe_layer_list.append(
                    deepspeed.moe.layer.MoE(
                        hidden_size=84,
                        expert=fc3,
                        num_experts=n_e,
                        ep_size=args.ep_world_size,
                        use_residual=args.mlp_type == "residual",
                        k=args.top_k,
                        min_capacity=args.min_capacity,
                        noisy_gate_policy=args.noisy_gate_policy,
                    )
                )
            self.moe_layer_list = nn.ModuleList(self.moe_layer_list)
            self.fc4 = nn.Linear(84, 10)
        else:
            self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        if self.moe:
            for layer in self.moe_layer_list:
                x, _, _ = layer(x)
            x = self.fc4(x)
        else:
            x = self.fc3(x)
        return x


def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
    """Test the network on the test data.

    Args:
        model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
        testset (torch.utils.data.Dataset): the test dataset.
        local_device (str): the local device name.
        target_dtype (torch.dtype): the target datatype for the test data.
        test_batch_size (int): the test batch size.

    """
    classes = ("plane", "car", "bird", "cat", "deer", "dog", "frog", "horse", "ship", "truck",)
    testloader = torch.utils.data.DataLoader(testset, batch_size=test_batch_size, shuffle=False, num_workers=0)

    correct, total = 0, 0
    class_correct = list(0.0 for i in range(10))
    class_total = list(0.0 for i in range(10))

    # Start testing.
    model_engine.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            if target_dtype != None:
                images = images.to(target_dtype)
            outputs = model_engine(images.to(local_device))
            _, predicted = torch.max(outputs.data, 1)
            # Count the total accuracy.
            total += labels.size(0)
            correct += (predicted == labels.to(local_device)).sum().item()

            # Count the accuracy per class.
            batch_correct = (predicted == labels.to(local_device)).squeeze()
            for i in range(test_batch_size):
                label = labels[i]
                class_correct[label] += batch_correct[i].item()
                class_total[label] += 1

    if model_engine.local_rank == 0:
        print(f"Accuracy of the network on the {total} test images: {100 * correct / total : .0f} %")
        for i in range(10):
            print(f"Accuracy of {classes[i] : >5s} : {100 * class_correct[i] / class_total[i] : 2.0f} %")


def main(args):
    # Initialize DeepSpeed distributed backend.
    deepspeed.init_distributed()
    ########################################################################
    # Step1. Data Preparation.
    #
    # The output of torchvision datasets are PILImage images of range [0, 1].
    # We transform them to Tensors of normalized range [-1, 1].
    #
    # Note:
    #     If running on Windows and you get a BrokenPipeError, try setting
    #     the num_worker of torch.utils.data.DataLoader() to 0.
    ########################################################################
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    if torch.distributed.get_rank() != 0:
        torch.distributed.barrier()
    trainset = torchvision.datasets.CIFAR10(root="./data", train=True, download=True, transform=transform)
    testset = torchvision.datasets.CIFAR10(root="./data", train=False, download=True, transform=transform)
    if torch.distributed.get_rank() == 0:
        torch.distributed.barrier()

    ########################################################################
    # Step 2. Define the network with DeepSpeed.
    #
    # First, we define a Convolution Neural Network.
    # Then, we define the DeepSpeed configuration dictionary and use it to
    # initialize the DeepSpeed engine.
    ########################################################################
    net = Net(args)
    # Get list of parameters that require gradients.
    parameters = filter(lambda p: p.requires_grad, net.parameters())

    # If using MoE, create separate param groups for each expert.
    if args.moe_param_group:
        parameters = create_moe_param_groups(net)

    # Initialize DeepSpeed to use the following features.
    #   1) Distributed model.
    #   2) Distributed data loader.
    #   3) DeepSpeed optimizer.
    ds_config = get_ds_config(args)
    model_engine, optimizer, trainloader, __ = deepspeed.initialize(
        args=args,
        model=net,
        model_parameters=parameters,
        training_data=trainset,
        config=ds_config,
    )

    # Get the local device name (str) and local rank (int).
    local_device = get_accelerator().device_name(model_engine.local_rank)
    local_rank = model_engine.local_rank

    # For float32, target_dtype will be None so no datatype conversion needed.
    target_dtype = None
    if model_engine.bfloat16_enabled():
        target_dtype = torch.bfloat16
    elif model_engine.fp16_enabled():
        target_dtype = torch.half

    # Define the Classification Cross-Entropy loss function.
    criterion = nn.CrossEntropyLoss()

    ########################################################################
    # Step 3. Train the network.
    #
    # This is when things start to get interesting.
    # We simply have to loop over our data iterator, and feed the inputs to the
    # network and optimize. (DeepSpeed handles the distributed details for us!)
    ########################################################################

    for epoch in range(args.epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            # Get the inputs. ``data`` is a list of [inputs, labels].
            inputs, labels = data[0].to(local_device), data[1].to(local_device)

            # Try to convert to target_dtype if needed.
            if target_dtype != None:
                inputs = inputs.to(target_dtype)

            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)

            model_engine.backward(loss)
            model_engine.step()

            # Print statistics
            running_loss += loss.item()
            if local_rank == 0 and i % args.log_interval == (args.log_interval - 1):  # Print every log_interval mini-batches.
                print(f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}")
                running_loss = 0.0
    print("Finished Training")

    ########################################################################
    # Step 4. Test the network on the test data.
    ########################################################################
    test(model_engine, testset, local_device, target_dtype)


if __name__ == "__main__":
    args = add_argument()
    main(args)
5月 06, 2025
4月 06, 2025
ufw