DeepSpeed Basic Usage


1. Basic usage of deepspeed

1.1 Installing deepspeed

Install: pip install deepspeed
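
A quick, optional sanity check that the package is importable from Python:

import deepspeed
print(deepspeed.__version__)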

1.2 Using deepspeed

Let's write a simple program to demonstrate it:

import torch
import deepspeed
from torch.utils.data import DataLoader, Dataset

# A simple custom dataset
class SimpleDataset(Dataset):
    def __init__(self):
        self.x = torch.linspace(-10, 10, 100).unsqueeze(1)
        self.y = 2 * self.x + 1  # linear relation: y = 2x + 1

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

class LinearModel(torch.nn.Module):
    def __init__(self):
        super(LinearModel, self).__init__()
        self.linear = torch.nn.Linear(1, 1)

    def forward(self, x):
        return self.linear(x)

ds_config = {
    "train_batch_size": 16,  # total batch size
    "gradient_accumulation_steps": 1,  # number of gradient accumulation steps
    "fp16": {  # whether to enable mixed-precision training
        "enabled": False
    },
    "zero_optimization": {  # whether to enable ZeRO optimization
        "stage": 0  # ZeRO stage: 0 means disabled
    }
}

def main():
    dataset = SimpleDataset()
    dataloader = DataLoader(dataset, batch_size=ds_config["train_batch_size"])  # note: rebound below to the DataLoader returned by deepspeed.initialize
    model = LinearModel()
    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    model_engine, optimizer, dataloader, _ = deepspeed.initialize(
        model=model,
        optimizer=optimizer,
        model_parameters=model.parameters(),
        config=ds_config,
        training_data=dataset
    )

    epochs = 200
    use_fp16 = ds_config.get("fp16", {}).get("enabled", False)
    for epoch in range(epochs):
        for i, (x, y) in enumerate(dataloader):
            x, y = x.to(model_engine.local_rank), y.to(model_engine.local_rank)
            if use_fp16:
                x, y = x.half(), y.half()
            outputs = model_engine(x)
            loss = criterion(outputs, y)
            model_engine.backward(loss)
            model_engine.step()

            if i % 10 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Step [{i}], Loss: {loss.item()}")

if __name__ == "__main__":
    main()

Run deepspeed train.py to launch the program on all GPUs of the current server. If you want to specify a particular GPU, run deepspeed --include localhost:2 train.py instead.

You can then watch the training process:

[2023-08-11 15:28:13,128] [INFO] [real_accelerator.py:110:get_accelerator] Setting ds_accelerator to cuda (auto detect)
......

2. Some deepspeed settings

2.1 A few parameters

Specify the port: deepspeed --master_port 29500 train.py
Specify the config file: deepspeed train.py --deepspeed_config ds_config.json
Gradient accumulation: just set the gradient_accumulation_steps parameter. In DeepSpeed, train_batch_size is the effective batch size and must equal train_micro_batch_size_per_gpu * gradient_accumulation_steps * number of GPUs (a consistent configuration is sketched after the snippet below). Taking the program above as an example, with gradient accumulation set to 4 nothing else needs to change: model_engine.step() automatically updates the parameters after every 4 steps. Internally it uses model_engine.is_gradient_accumulation_boundary() to decide whether it is time to update, which you can print with the following snippet:

if model_engine.is_gradient_accumulation_boundary():  # make sure this is an update step
    for param in model_engine.parameters():
        print(f"Epoch {epoch} iter {i}, Before update: {param.data}")

2.2 How to debug a deepspeed program in VS Code

Change the launch.json file to the following, where llava2 in the program line should be replaced with your own environment name:

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "/home/wangyh/miniconda3/envs/llava2/bin/deepspeed",
            "console": "integratedTerminal",
            "justMyCode": true,
            "args": [
                "--include", "localhost:7",
                "test.py",
                "--deepspeed_config", "/data/wangyh/mllms/deepspeed_test/config.json",
            ],
        }
    ]
}

2.3 The deepspeed.initialize() function

deepspeed.initialize(model=None, optimizer=None, args=None, lr_scheduler=None, mpu=None, dist_init_required=True, config=None, config_params=None, training_data=None, collate_fn=None, model_parameters=None)

model: the model to wrap

optimizer: the optimizer

args: command-line arguments

lr_scheduler: the learning-rate scheduler

mpu: the model-parallel unit

dist_init_required: whether to initialize the distributed environment

config: the configuration

config_params: dynamic configuration parameters

training_data: the training data

collate_fn: custom batch-collation function
- Optional, type: callable
- Customizes how samples are combined into a batch (similar to the collate_fn of DataLoader).
- Example: collate_fn = lambda x: torch.stack(x)

model_parameters: the model parameters
- Optional, type: list or iterable
- Explicitly specifies the list of model parameters to optimize. If not provided, model.parameters() is used.

A combined call using several of these arguments is sketched after this list.
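
A sketch reusing model, dataset, and ds_config from the example in section 1.2 (the collate_fn here is only an illustration, not required for that dataset):

def collate_fn(batch):
    # Stack individual (x, y) samples into batch tensors.
    xs, ys = zip(*batch)
    return torch.stack(xs), torch.stack(ys)

model_engine, optimizer, dataloader, lr_scheduler = deepspeed.initialize(
    model=model,                                              # the torch.nn.Module to wrap
    optimizer=torch.optim.Adam(model.parameters(), lr=1e-3),  # client optimizer
    training_data=dataset,                                    # DeepSpeed builds the DataLoader
    collate_fn=collate_fn,                                    # custom batch collation
    config=ds_config,                                         # dict or path to a JSON file
)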

2.4 Return values

deepspeed.initialize() returns the following 4 objects, which wrap the user's model, optimizer, and data loader and form the basis of DeepSpeed training.

  1. model_engine

    • Type: deepspeed.DeepSpeedEngine
    • The model wrapped by DeepSpeed; it replaces the original PyTorch model and provides the optimized training interface.
    • Example:
      model_engine.forward()
      model_engine.backward()
      model_engine.step()

  2. optimizer

    • Type: deepspeed.DeepSpeedOptimizer (or None)
    • The optimizer managed by DeepSpeed. If the user does not provide one, DeepSpeed creates it automatically.
  3. training_dataloader

    • Type: torch.utils.data.DataLoader (or None)
    • If training_data was provided, DeepSpeed automatically creates the training data loader for you.
  4. lr_scheduler

    • Type: torch.optim.lr_scheduler (or None)
    • The learning-rate scheduler managed by DeepSpeed.

2.5 Usage example

The following is a complete example showing how to build a training loop around deepspeed.initialize():

import torch
import deepspeed

# 1. Build the model and data
# (a small linear model so that the random tensors below are valid inputs;
#  a real pretrained model such as BERT would need tokenized inputs instead)
model = torch.nn.Linear(10, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
training_data = torch.utils.data.TensorDataset(torch.randn(100, 10), torch.randn(100, 1))

# 2. Configure DeepSpeed
ds_config = {
    "train_batch_size": 32,
    "gradient_accumulation_steps": 2,
    "fp16": {"enabled": True},
    "zero_optimization": {"stage": 1}
}

# 3. Initialize DeepSpeed
model_engine, optimizer, dataloader, _ = deepspeed.initialize(
    model=model,
    optimizer=optimizer,
    training_data=training_data,
    config=ds_config
)

# 4. Main training loop
for batch in dataloader:
    # Move the batch to the engine's device and cast to fp16 (fp16 is enabled above).
    inputs = batch[0].to(model_engine.device).half()
    labels = batch[1].to(model_engine.device).half()
    outputs = model_engine(inputs)
    loss = torch.nn.functional.mse_loss(outputs, labels)

    model_engine.backward(loss)
    model_engine.step()

3. deepspeed in applications

3.1 Basic introduction

DeepSpeed official tutorial
Huggingface official tutorial
DeepSpeed paper

  1. Optimizer state partitioning (ZeRO stage 1)
  2. Gradient partitioning (ZeRO stage 2)
  3. Parameter partitioning (ZeRO stage 3)
  4. Custom mixed precision training handling
  5. A range of fast CUDA-extension-based optimizers
  6. ZeRO-Offload to CPU and NVMe

We mostly use the first three. The first optimizes the optimizer states, the second mainly optimizes the gradients (so it has no effect at inference time), and the third partitions the model parameters (allowing a large model to be split across multiple GPUs).
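
As a rough illustration of what each stage saves, here is a back-of-the-envelope per-GPU memory estimate following the formulas in the ZeRO paper (mixed-precision Adam, K = 12 bytes of optimizer state per parameter); a sketch for intuition, not a DeepSpeed API:

def zero_memory_per_gpu_gb(num_params, num_gpus, stage, k=12):
    # Bytes per parameter: 2 (fp16 weights) + 2 (fp16 grads) + k (fp32 master weights + Adam moments).
    params = 2 * num_params
    grads = 2 * num_params
    optim = k * num_params
    if stage == 0:
        total = params + grads + optim                # everything replicated
    elif stage == 1:
        total = params + grads + optim / num_gpus     # optimizer states partitioned
    elif stage == 2:
        total = params + (grads + optim) / num_gpus   # + gradients partitioned
    else:
        total = (params + grads + optim) / num_gpus   # + parameters partitioned
    return total / 1024**3

# e.g. a 7B-parameter model on 8 GPUs
for s in range(4):
    print(f"stage {s}: {zero_memory_per_gpu_gb(7e9, 8, s):.1f} GB")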

Here we only cover how DeepSpeed is used with Huggingface.

Training: supports ZeRO stages 1, 2, 3 and ZeRO-Infinity
Inference: supports ZeRO stage 3 and ZeRO-Infinity

How to launch

# Standard PyTorch DDP launch
python -m torch.distributed.run --nproc_per_node=2 your_program.py <normal cl args> --deepspeed ds_config.json

# DeepSpeed's own launcher
deepspeed --num_gpus=2 your_program.py <normal cl args> --deepspeed ds_config.json

Note: if you want to use a specific GPU, say GPU 1, setting CUDA_VISIBLE_DEVICES does not work; you need to use the localhost syntax:

deepspeed --include localhost:1 examples/pytorch/translation/run_translation.py ...

Note that many parameters in the DeepSpeed config overlap with the Trainer arguments. To avoid conflicts, set those values to auto, which is automatically replaced with the correct or most efficient value (if the two sources disagree, training may fail).
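
For example, a config fragment like the following lets the Trainer fill in the batch-size and precision values (a sketch; exactly which keys you set to auto depends on your setup):

ds_config = {
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "fp16": {"enabled": "auto"},
}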

Stage two

{
    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "allgather_partitions": true,
        "allgather_bucket_size": 5e8,
        "overlap_comm": true,
        "reduce_scatter": true,
        "reduce_bucket_size": 5e8,
        "contiguous_gradients": true
    }
}

Stage three

{
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": true
        },
        "overlap_comm": true,
        "contiguous_gradients": true,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
        "stage3_gather_16bit_weights_on_model_save": true
    }
}

Others (fp16 and bf16 are shown together only for reference; enable at most one of them in practice)

{
    "fp16": {
        "enabled": true,
        "auto_cast": false,
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "consecutive_hysteresis": false,
        "min_loss_scale": 1
    },
    "bf16": {
        "enabled": true
    }
}

If grad_norm becomes nan or loss = 0 during training:
https://github.com/microsoft/DeepSpeed/issues/5242
Set overlap_comm to false.

DeepspeedExample

1. Basic principles of deepspeed

Models keep getting larger; large models of 7B, 13B, or 70B parameters often cannot even be trained on an A100-80G. DeepSpeed's idea is: when GPU memory is not enough, make up for it with CPU memory, trading time for space. The states that need optimizing here are the model parameters, optimizer states, and gradients.

DeepSpeed training is done through the DeepSpeed engine. The engine can wrap any model of type torch.nn.Module and provides a minimal set of APIs for training and checkpointing the model.

model_engine, optimizer, _, _ = deepspeed.initialize(args=cmd_args, model=model, model_parameters=params)

If you have already set up a distributed environment, you need to replace the following call:

torch.distributed.init_process_group(...)  # remove
deepspeed.init_distributed()  # replace

If you only need the distributed environment after deepspeed.initialize(), you don't have to call this function, because DeepSpeed sets up the distributed environment automatically during initialization. Either way, if torch.distributed.init_process_group has already been set up, it needs to be removed.

When we say "the distributed environment has already been set up", we mean that the communication and data-sharing configuration needed for a cluster (or several machines) to work together on a distributed job is already in place. In machine learning and deep learning, this usually means processing data and models in parallel across multiple GPUs or servers.

As for the case where "you only need to set up the distributed environment after deepspeed.initialize()", there are a few possible scenarios.

Training
DeepSpeed's training flow is not very different from PyTorch's, but under the hood DeepSpeed automatically performs the operations required for distributed data-parallel training with mixed precision and uses a predefined learning rate scheduler:

for step, batch in enumerate(data_loader):
    # forward() method
    loss = model_engine(batch)

    # runs backpropagation
    model_engine.backward(loss)

    # weight update
    model_engine.step()

Gradient Averaging: in DDP, backward ensures that after each training batch the gradients are averaged across the data-parallel processes. (This is easy to understand, and was also covered in detail in the earlier ZeRO post.)

Loss Scaling: in FP16/mixed-precision training, the DeepSpeed engine automatically handles loss scaling to avoid losing precision in the gradients.

Loss scaling deserves a short explanation. It is a common technique when training deep models with smaller data types such as FP16, meant to prevent numerical underflow in the gradient computation. With FP16, small gradients can become exactly 0 during backpropagation, so the weights can no longer be updated effectively. Loss scaling first scales up the loss, and hence the gradients, then after backpropagation scales the gradients back down, preserving more precision.

For a concrete implementation, see DynamicLossScaler in DeepSpeed. The core idea of the DeepSpeed implementation is to use the largest loss scale possible while keeping the values numerically stable.
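
A minimal sketch of the dynamic loss-scaling idea (the class and thresholds below are illustrative assumptions, not DeepSpeed's actual DynamicLossScaler):

import torch

class SimpleDynamicLossScaler:
    def __init__(self, init_scale=2**16, scale_window=1000, scale_factor=2.0, min_scale=1.0):
        self.scale = init_scale
        self.scale_window = scale_window   # consecutive good steps before growing the scale
        self.scale_factor = scale_factor
        self.min_scale = min_scale
        self.good_steps = 0

    def has_overflow(self, grads):
        # Any inf/nan in the scaled gradients means the current scale is too large.
        return any(not torch.isfinite(g).all() for g in grads if g is not None)

    def update(self, overflow):
        if overflow:
            # Skip this optimizer step and shrink the scale.
            self.scale = max(self.scale / self.scale_factor, self.min_scale)
            self.good_steps = 0
        else:
            self.good_steps += 1
            if self.good_steps % self.scale_window == 0:
                # Stable for long enough: try a larger scale to keep more precision.
                self.scale *= self.scale_factor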

Learning Rate Scheduler: when using a DeepSpeed learning-rate scheduler (specified in the ds_config.json file), DeepSpeed calls the scheduler's step() method at every training step (when model_engine.step() is executed). If you don't use DeepSpeed's learning-rate scheduler, there are two main cases:

To run a custom scheduler at every training step, pass it via the lr_scheduler= argument of deepspeed.initialize, and DeepSpeed will step it for you.
If instead the learning rate has to be adjusted at some special interval/period, do not hand the scheduler to DeepSpeed (which would step it every training step); manage it manually yourself (see the sketch below).
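
A sketch of the two cases; model, optimizer, ds_config, and train_one_epoch below are placeholders around the earlier examples, not DeepSpeed API:

# Case 1: pass the scheduler to DeepSpeed; it is stepped at every model_engine.step().
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.9)
model_engine, optimizer, _, scheduler = deepspeed.initialize(
    model=model, optimizer=optimizer, lr_scheduler=scheduler, config=ds_config
)

# Case 2: keep the scheduler outside DeepSpeed and step it on our own schedule,
# e.g. once per epoch instead of once per training step.
epoch_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.5)
for epoch in range(10):
    train_one_epoch(model_engine)   # hypothetical training helper
    epoch_scheduler.step()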
Model Checkpointing
To save or load DeepSpeed model checkpoints, we only need two functions: save_checkpoint and load_checkpoint. They use two arguments, ckpt_dir and ckpt_id, to uniquely identify a checkpoint.

Below is the example code given in the official DeepSpeed documentation.

# load checkpoint
# client_sd : Dict
_, client_sd = model_engine.load_checkpoint(args.load_dir, args.ckpt_id)
step = client_sd['step']

# advance data loader to ckpt step
dataloader_to_step(data_loader, step + 1)

for step, batch in enumerate(data_loader):

    # forward() method
    loss = model_engine(batch)

    # runs backpropagation
    model_engine.backward(loss)

    # weight update
    model_engine.step()

    # save checkpoint
    if step % args.save_interval == 0:
        client_sd['step'] = step
        ckpt_id = loss.item()
        model_engine.save_checkpoint(args.save_dir, ckpt_id, client_sd = client_sd)

The definition of client_sd in this code can be confusing. DeepSpeed automatically saves and restores the model, optimizer, and learning-rate scheduler states while hiding these details from the user. However, the user may have other information to save. To support this, DeepSpeed thoughtfully provides the corresponding interface: save_checkpoint accepts a state dict client_sd to be saved, and it can be read back from load_checkpoint.

⚠️ Important: all processes must call save_checkpoint and load_checkpoint, not just the rank-0 (master) process. This is because every process needs to save and restore its master weights, scheduler, and optimizer states. If only the master process called them, it would hang while waiting to synchronize with the other processes.

DeepSpeed Configuration
DeepSpeed can be configured through a JSON file, whose filename should be passed to the program as args.deepspeed_config. A simple example follows (a sketch of wiring it into a script comes after the example); the full set of options is documented in the DS_CONFIG doc.

{
  "train_batch_size": 8,
  "gradient_accumulation_steps": 1,
  "optimizer": {
    "type": "Adam",
    "params": {
      "lr": 0.00015
    }
  },
  "fp16": {
    "enabled": true
  },
  "zero_optimization": true
}
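
A sketch of wiring such a JSON file into a training script via the launcher arguments (the model here is a placeholder):

import argparse

import deepspeed
import torch

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int, default=-1)
parser = deepspeed.add_config_arguments(parser)   # adds --deepspeed / --deepspeed_config
args = parser.parse_args()

model = torch.nn.Linear(10, 1)                    # placeholder model
model_engine, optimizer, _, _ = deepspeed.initialize(
    args=args, model=model, model_parameters=model.parameters()
)
# Launched as: deepspeed train.py --deepspeed --deepspeed_config ds_config.json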

Resource Configuration (single-node, multi-GPU)
Official Section URL

In the single-node case, DeepSpeed is fairly simple to use, because it automatically detects the number of available GPUs. We only need --include and --exclude to select or exclude specific GPUs, using localhost as the hostname. For example, the following trains on GPU 0 and GPU 1.

⚠️ Note: you cannot use CUDA_VISIBLE_DEVICES to control which devices DeepSpeed uses.

deepspeed --include localhost:0,1 ...

The rest of this section is the CIFAR-10 example (cifar10_deepspeed.py): first the launch script, then the training script.

#!/bin/bash

deepspeed --bind_cores_to_rank cifar10_deepspeed.py --deepspeed $@
import argparse

import deepspeed
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from deepspeed.accelerator import get_accelerator
from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer


def add_argument():
    parser = argparse.ArgumentParser(description="CIFAR")

    # For train.
    parser.add_argument("--epochs", default=30, type=int, help="number of total epochs (default: 30)")
    parser.add_argument("--local_rank", type=int, default=-1, help="local rank passed from distributed launcher")
    parser.add_argument("--log-interval", type=int, default=2000, help="output logging information at a given interval",)
    # For mixed precision training.
    parser.add_argument("--dtype", default="fp16", type=str, choices=["bf16", "fp16", "fp32"], help="Datatype used for training",)
    # For ZeRO Optimization.
    parser.add_argument("--stage", default=0, type=int, choices=[0, 1, 2, 3], help="Datatype used for training",)

    # For MoE (Mixture of Experts).
    parser.add_argument("--moe", default=False, action="store_true", help="use deepspeed mixture of experts (moe)",)
    parser.add_argument("--ep-world-size", default=1, type=int, help="(moe) expert parallel world size")
    parser.add_argument("--num-experts", type=int, nargs="+", default=[1,], help="number of experts list, MoE related.",)
    parser.add_argument("--mlp-type", type=str, default="standard", help="Only applicable when num-experts > 1, accepts [standard, residual]",)
    parser.add_argument("--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported")
    parser.add_argument("--min-capacity", default=0, type=int, help="(moe) minimum capacity of an expert regardless of the capacity_factor",)
    parser.add_argument("--noisy-gate-policy", default=None, type=str, help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter")
    parser.add_argument("--moe-param-group", default=False, action="store_true", help="(moe) create separate moe param groups, required when using ZeRO w. MoE",)
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args


def create_moe_param_groups(model):
    """Create separate parameter groups for each expert."""
    parameters = {"params": [p for p in model.parameters()], "name": "parameters"}
    return split_params_into_different_moe_groups_for_optimizer(parameters)


def get_ds_config(args):
    """Get the DeepSpeed configuration dictionary."""
    ds_config = {
        "train_batch_size": 16,
        "steps_per_print": 2000,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.001,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7,
            },
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000,
            },
        },
        "gradient_clipping": 1.0,
        "prescale_gradients": False,
        "bf16": {"enabled": args.dtype == "bf16"},
        "fp16": {
            "enabled": args.dtype == "fp16",
            "fp16_master_weights_and_grads": False,
            "loss_scale": 0,
            "loss_scale_window": 500,
            "hysteresis": 2,
            "min_loss_scale": 1,
            "initial_scale_power": 15,
        },
        "wall_clock_breakdown": False,
        "zero_optimization": {
            "stage": args.stage,
            "allgather_partitions": True,
            "reduce_scatter": True,
            "allgather_bucket_size": 50000000,
            "reduce_bucket_size": 50000000,
            "overlap_comm": True,
            "contiguous_gradients": True,
            "cpu_offload": False,
        },
    }
    return ds_config


class Net(nn.Module):
    def __init__(self, args):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.moe = args.moe
        if self.moe:
            fc3 = nn.Linear(84, 84)
            self.moe_layer_list = []
            for n_e in args.num_experts:
                # Create moe layers based on the number of experts.
                self.moe_layer_list.append(
                    deepspeed.moe.layer.MoE(
                        hidden_size=84,
                        expert=fc3,
                        num_experts=n_e,
                        ep_size=args.ep_world_size,
                        use_residual=args.mlp_type == "residual",
                        k=args.top_k,
                        min_capacity=args.min_capacity,
                        noisy_gate_policy=args.noisy_gate_policy,
                    )
                )
            self.moe_layer_list = nn.ModuleList(self.moe_layer_list)
            self.fc4 = nn.Linear(84, 10)
        else:
            self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        if self.moe:
            for layer in self.moe_layer_list:
                x, _, _ = layer(x)
            x = self.fc4(x)
        else:
            x = self.fc3(x)
        return x


def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
    """Test the network on the test data.

    Args:
        model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
        testset (torch.utils.data.Dataset): the test dataset.
        local_device (str): the local device name.
        target_dtype (torch.dtype): the target datatype for the test data.
        test_batch_size (int): the test batch size.

    """
    classes = ("plane", "car", "bird", "cat", "deer", "dog", "frog", "horse", "ship", "truck",)
    testloader = torch.utils.data.DataLoader(testset, batch_size=test_batch_size, shuffle=False, num_workers=0)

    correct, total = 0, 0
    class_correct = list(0.0 for i in range(10))
    class_total = list(0.0 for i in range(10))

    # Start testing.
    model_engine.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            if target_dtype != None:
                images = images.to(target_dtype)
            outputs = model_engine(images.to(local_device))
            _, predicted = torch.max(outputs.data, 1)
            # Count the total accuracy.
            total += labels.size(0)
            correct += (predicted == labels.to(local_device)).sum().item()

            # Count the accuracy per class.
            batch_correct = (predicted == labels.to(local_device)).squeeze()
            for i in range(test_batch_size):
                label = labels[i]
                class_correct[label] += batch_correct[i].item()
                class_total[label] += 1

    if model_engine.local_rank == 0:
        print(f"Accuracy of the network on the {total} test images: {100 * correct / total : .0f} %")
        for i in range(10):
            print(f"Accuracy of {classes[i] : >5s} : {100 * class_correct[i] / class_total[i] : 2.0f} %")


def main(args):
    # Initialize DeepSpeed distributed backend.
    deepspeed.init_distributed()
    ########################################################################
    # Step1. Data Preparation.
    #
    # The output of torchvision datasets are PILImage images of range [0, 1].
    # We transform them to Tensors of normalized range [-1, 1].
    #
    # Note:
    #     If running on Windows and you get a BrokenPipeError, try setting
    #     the num_worker of torch.utils.data.DataLoader() to 0.
    ########################################################################
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    if torch.distributed.get_rank() != 0:
        torch.distributed.barrier()
    trainset = torchvision.datasets.CIFAR10(root="./data", train=True, download=True, transform=transform)
    testset = torchvision.datasets.CIFAR10(root="./data", train=False, download=True, transform=transform)
    if torch.distributed.get_rank() == 0:
        torch.distributed.barrier()

    ########################################################################
    # Step 2. Define the network with DeepSpeed.
    #
    # First, we define a Convolution Neural Network.
    # Then, we define the DeepSpeed configuration dictionary and use it to
    # initialize the DeepSpeed engine.
    ########################################################################
    net = Net(args)
    # Get list of parameters that require gradients.
    parameters = filter(lambda p: p.requires_grad, net.parameters())

    # If using MoE, create separate param groups for each expert.
    if args.moe_param_group:
        parameters = create_moe_param_groups(net)

    # Initialize DeepSpeed to use the following features.
    #   1) Distributed model.
    #   2) Distributed data loader.
    #   3) DeepSpeed optimizer.
    ds_config = get_ds_config(args)
    model_engine, optimizer, trainloader, __ = deepspeed.initialize(
        args=args,
        model=net,
        model_parameters=parameters,
        training_data=trainset,
        config=ds_config,
    )

    # Get the local device name (str) and local rank (int).
    local_device = get_accelerator().device_name(model_engine.local_rank)
    local_rank = model_engine.local_rank

    # For float32, target_dtype will be None so no datatype conversion needed.
    target_dtype = None
    if model_engine.bfloat16_enabled():
        target_dtype = torch.bfloat16
    elif model_engine.fp16_enabled():
        target_dtype = torch.half

    # Define the Classification Cross-Entropy loss function.
    criterion = nn.CrossEntropyLoss()

    ########################################################################
    # Step 3. Train the network.
    #
    # This is when things start to get interesting.
    # We simply have to loop over our data iterator, and feed the inputs to the
    # network and optimize. (DeepSpeed handles the distributed details for us!)
    ########################################################################

    for epoch in range(args.epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            # Get the inputs. ``data`` is a list of [inputs, labels].
            inputs, labels = data[0].to(local_device), data[1].to(local_device)

            # Try to convert to target_dtype if needed.
            if target_dtype != None:
                inputs = inputs.to(target_dtype)

            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)

            model_engine.backward(loss)
            model_engine.step()

            # Print statistics
            running_loss += loss.item()
            if local_rank == 0 and i % args.log_interval == (args.log_interval - 1):  # Print every log_interval mini-batches.
                print(f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}")
                running_loss = 0.0
    print("Finished Training")

    ########################################################################
    # Step 4. Test the network on the test data.
    ########################################################################
    test(model_engine, testset, local_device, target_dtype)


if __name__ == "__main__":
    args = add_argument()
    main(args)