Getting Started with DeepSpeed, a Large-Model Training Framework (1): Training Setup

Published: 2024-05-10


These notes are taken more or less directly from the official documentation:
https://deepspeed.readthedocs.io/en/latest/initialize.html

The usage example comes from:
https://github.com/OvJat/DeepSpeedTutorial


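The main difficulty in training large models is the sheer number of parameters, which often exceeds ten billion. Training such a model on a single GPU is essentially impossible, so the work has to be spread across multiple GPUs or multiple machines.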

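DeepSpeed is a distributed training library from Microsoft, designed to support larger models and to offer additional optimization strategies and tooling. For large-model training it provides strategies such as ZeRO and Offload.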
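
To make the motivation concrete, the arithmetic below estimates per-GPU memory for model states alone (weights, gradients, optimizer states) under mixed-precision Adam, using the 16-bytes-per-parameter accounting from the ZeRO paper. The 10-billion-parameter model and the 8-GPU count are illustrative assumptions, and activations and temporary buffers are ignored, so treat the numbers as a rough sketch.

```python
# Rough per-GPU memory for model states under mixed-precision Adam.
# Accounting from the ZeRO paper: 2 bytes fp16 weights + 2 bytes fp16 gradients
# + 12 bytes fp32 optimizer states (master weights, momentum, variance).
WEIGHT_BYTES = 2
GRAD_BYTES = 2
OPTIM_BYTES = 12


def model_state_gb(num_params: float, num_gpus: int, zero_stage: int) -> float:
    """Return estimated model-state memory per GPU in GB (1 GB = 1e9 bytes)."""
    weights = WEIGHT_BYTES * num_params
    grads = GRAD_BYTES * num_params
    optim = OPTIM_BYTES * num_params
    if zero_stage >= 1:  # stage 1 partitions the optimizer states
        optim /= num_gpus
    if zero_stage >= 2:  # stage 2 also partitions the gradients
        grads /= num_gpus
    if zero_stage >= 3:  # stage 3 also partitions the weights
        weights /= num_gpus
    return (weights + grads + optim) / 1e9


if __name__ == "__main__":
    # Hypothetical setup: 10 billion parameters on 8 GPUs.
    for stage in range(4):
        print(f"ZeRO stage {stage}: {model_state_gb(10e9, 8, stage):.1f} GB per GPU")
```

Under these assumptions the baseline (stage 0) needs about 160 GB per GPU for model states, while ZeRO stage 3 across 8 GPUs brings that down to about 20 GB.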

This post gives a brief introduction to using DeepSpeed.


1. Installation

pip install deepspeed
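
A quick way to confirm the install worked is to look up the installed package version. The helper below is a small sketch (the function name is my own); it returns None instead of raising when the package is missing. DeepSpeed also ships a ds_report command that prints a fuller environment report.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str = "deepspeed") -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version()
    print(f"deepspeed {version}" if version else "deepspeed is not installed")
```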

2. Training Setup

Step 1: Argument parsing

DeepSpeed uses argparse to pick up its settings from the command line. Calling

deepspeed.add_config_arguments()

adds DeepSpeed's built-in arguments to your application's own argument parser.

import argparse

import deepspeed

parser = argparse.ArgumentParser(description='My training script.')
parser.add_argument('--local_rank', type=int, default=-1,
                    help='local rank passed from distributed launcher')
# Include DeepSpeed configuration arguments
parser = deepspeed.add_config_arguments(parser)
cmd_args = parser.parse_args()
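
The sketch below shows what the parsed namespace looks like once the DeepSpeed arguments are registered. To my knowledge, add_config_arguments() registers a --deepspeed flag and a --deepspeed_config path, among others. The fallback branch, which registers stand-ins for those two when DeepSpeed is not installed, and the file name ds_config.json are assumptions made only so the example runs anywhere.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="My training script.")
    parser.add_argument("--local_rank", type=int, default=-1,
                        help="local rank passed from distributed launcher")
    try:
        import deepspeed
        parser = deepspeed.add_config_arguments(parser)
    except ImportError:
        # Stand-ins (assumption) for the two arguments used below, so the
        # example still runs on a machine without DeepSpeed installed.
        parser.add_argument("--deepspeed", action="store_true", default=False)
        parser.add_argument("--deepspeed_config", type=str, default=None)
    return parser


if __name__ == "__main__":
    # Simulate the command line that a launcher would pass to the script.
    argv = ["--local_rank", "0", "--deepspeed", "--deepspeed_config", "ds_config.json"]
    args = build_parser().parse_args(argv)
    print(args.local_rank, args.deepspeed, args.deepspeed_config)
```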

Step 2: Initializing the distributed backend

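deepspeed.init_distributed() is separate from the deepspeed.initialize() call in Step 3 and can be called directly on its own.
It is optional. It is only needed when the distributed backend must be set up before deepspeed.initialize() runs, for example

when using model parallelism, pipeline parallelism, or certain data loader scenarios.

In those cases, call it before the initialize() call in Step 3: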

deepspeed.init_distributed()
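
For context, the process-group information typically reaches each worker through the environment variables of PyTorch's env:// convention: RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT. The helper below is a hypothetical debugging aid, not part of DeepSpeed, and the single-process defaults are my own choice.

```python
import os
from typing import Mapping, Optional


def read_dist_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect the env:// rendezvous variables, with single-process defaults."""
    env = os.environ if env is None else env
    return {
        "rank": int(env.get("RANK", "0")),
        "local_rank": int(env.get("LOCAL_RANK", "0")),
        "world_size": int(env.get("WORLD_SIZE", "1")),
        "master_addr": env.get("MASTER_ADDR", "127.0.0.1"),
        "master_port": int(env.get("MASTER_PORT", "29500")),
    }


if __name__ == "__main__":
    # Printing this at the top of a training script helps when debugging launches.
    print(read_dist_env())
```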

Step 3: Training initialization

deepspeed.initialize() is the entry point for training with DeepSpeed, so call it first.
If the distributed backend has not been initialized yet, this call initializes it.
Example usage:

model_engine, optimizer, _, _ = deepspeed.initialize(args=cmd_args,
                                                     model=net,
                                                     model_parameters=net.parameters(),
                                                     training_data=ds)
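
The call above passes args=cmd_args, so the DeepSpeed configuration is read from args.deepspeed_config. initialize() also accepts the configuration directly through its config parameter, as a path or a dictionary. A minimal configuration can be sketched in Python as below. The keys are standard DeepSpeed configuration fields, but every value (batch sizes, learning rate, ZeRO stage 2 with CPU optimizer offload) and the 8-GPU world size are illustrative assumptions rather than settings from the tutorial repository.

```python
import json
import os
import tempfile

WORLD_SIZE = 8  # assumed number of GPUs

ds_config = {
    # DeepSpeed expects: train_batch_size ==
    # train_micro_batch_size_per_gpu * gradient_accumulation_steps * world size
    "train_micro_batch_size_per_gpu": 4,
    "gradient_accumulation_steps": 2,
    "train_batch_size": 4 * 2 * WORLD_SIZE,
    "optimizer": {"type": "Adam", "params": {"lr": 1e-4}},
    "fp16": {"enabled": True},
    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {"device": "cpu"},
    },
}


def write_config(config: dict, directory: str) -> str:
    """Write the config as JSON and return the file path."""
    path = os.path.join(directory, "ds_config.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = write_config(ds_config, tmp)
        with open(path, encoding="utf-8") as f:
            print(f.read())
```

Either form can then be handed to the API, as config=ds_config or config=path, and the path form can also be supplied on the command line through --deepspeed_config.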

The API is as follows:

def initialize(args=None,
               model: torch.nn.Module = None,
               optimizer: Optional[Union[Optimizer, DeepSpeedOptimizerCallable]] = None,
               model_parameters: Optional[torch.nn.Module] = None,
               training_data: Optional[torch.utils.data.Dataset] = None,
               lr_scheduler: Optional[Union[_LRScheduler, DeepSpeedSchedulerCallable]] = None,
               distributed_port: int = TORCH_DISTRIBUTED_DEFAULT_PORT,
               mpu=None,
               dist_init_required: Optional[bool] = None,
               collate_fn=None,
               config=None,
               config_params=None):
    """Initialize the DeepSpeed Engine.

    Arguments:
        args: an object containing local_rank and deepspeed_config fields.
            This is optional if `config` is passed.

        model: Required: nn.module class before apply any wrappers

        optimizer: Optional: a user defined Optimizer or Callable that returns an Optimizer object.
            This overrides any optimizer definition in the DeepSpeed json config.

        model_parameters: Optional: An iterable of torch.Tensors or dicts.
            Specifies what Tensors should be optimized.

        training_data: Optional: Dataset of type torch.utils.data.Dataset

        lr_scheduler: Optional: Learning Rate Scheduler Object or a Callable that takes an Optimizer and returns a Scheduler object.
            The scheduler object should define a get_lr(), step(), state_dict(), and load_state_dict() methods

        distributed_port: Optional: Master node (rank 0)'s free port that needs to be used for communication during distributed training

        mpu: Optional: A model parallelism unit object that implements
            get_{model,data}_parallel_{rank,group,world_size}()

        dist_init_required: Optional: None will auto-initialize torch distributed if needed,
            otherwise the user can force it to be initialized or not via boolean.

        collate_fn: Optional: Merges a list of samples to form a
            mini-batch of Tensor(s).  Used when using batched loading from a
            map-style dataset.

        config: Optional: Instead of requiring args.deepspeed_config you can pass your deepspeed config
            as an argument instead, as a path or a dictionary.

        config_params: Optional: Same as `config`, kept for backwards compatibility.

    Returns:
        A tuple of ``engine``, ``optimizer``, ``training_dataloader``, ``lr_scheduler``

        * ``engine``: DeepSpeed runtime engine which wraps the client model for distributed training.

        * ``optimizer``: Wrapped optimizer if a user defined ``optimizer`` is supplied, or if
          optimizer is specified in json config else ``None``.

        * ``training_dataloader``: DeepSpeed dataloader if ``training_data`` was supplied,
          otherwise ``None``.

        * ``lr_scheduler``: Wrapped lr scheduler if user ``lr_scheduler`` is passed, or
          if ``lr_scheduler`` specified in JSON configuration. Otherwise ``None``.
    """

3. Training Code

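import argparse
import logging
import time

import deepspeed
import numpy as np
import torch
import torch.nn.functional as tnf

# MyClassifier and MyDataset are defined in the tutorial repository linked above.


def parse_arguments():
    parser = argparse.ArgumentParser(description='deepspeed training script.')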
    parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')
    # Include DeepSpeed configuration arguments
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args


def train():
    args = parse_arguments()

    # init distributed
    deepspeed.init_distributed()

    # init model
    model = MyClassifier(3, 100, ch_multi=128)

    # init dataset
    ds = MyDataset((3, 512, 512), 100, sample_count=int(1e6))

    # init engine
    engine, optimizer, training_dataloader, lr_scheduler = deepspeed.initialize(
        args=args,
        model=model,
        model_parameters=model.parameters(),
        training_data=ds,
        # config=deepspeed_config,
    )

    # load checkpoint
    engine.load_checkpoint("./data/checkpoints/MyClassifier/")

    # train
    last_time = time.time()
    loss_list = []
    echo_interval = 10

    engine.train()
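    # Count steps from 1 so the logging interval lines up with completed steps.
    for step, (xx, yy) in enumerate(training_dataloader, start=1):
        # float16 inputs assume fp16 is enabled in the DeepSpeed config.
        xx = xx.to(device=engine.device, dtype=torch.float16)
        yy = yy.to(device=engine.device, dtype=torch.long).reshape(-1)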

        outputs = engine(xx)
        loss = tnf.cross_entropy(outputs, yy)
        engine.backward(loss)
        engine.step()
        loss_list.append(loss.detach().cpu().numpy())

        if step % echo_interval == 0:
            loss_avg = np.mean(loss_list[-echo_interval:])
            used_time = time.time() - last_time
            time_p_step = used_time / echo_interval
            if args.local_rank == 0:
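                # logging applies %-style formatting to extra arguments, so
                # format the str.format-style template explicitly.
                logging.info(
                    "[Train Step] Step:{:10d}  Loss:{:8.4f} | Time/Batch: {:6.4f}s".format(
                        step, loss_avg, time_p_step
                    )
                )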
            last_time = time.time()
    # save checkpoint
    engine.save_checkpoint("./data/checkpoints/MyClassifier/")
