Llama-factory源码详细解读

发布于:2024-07-11 ⋅ 阅读:(26) ⋅ 点赞:(0)

微调

采用逐行调试的方法,细节来看SFT代码

AutoModelForCausalLM
peft
AutoTokenizer
DataCollatorForSeq2Seq
run_sft
load_tokenizer
get_dataset
load_model
get_dataset_list
load_single_dataset
get_preprocess_and_print_func
from_pretrained
register_autoclass
get_peft_model
from_pretrained
model
tokenizer
dataset
data_collator
trainer
  • 准备项:

    配置参数信息,推荐一种方法直接配置 YAML文件,并加载到sys.argv 变量里

    # 在train_bash.py 文件里顶部添加这两行代码
    import sys
    sys.argv.append("/workspace/LLaMA-Factory/config.yaml") # 这是 yaml 文件路径
    # 也就是把 终端里命令行里的参数信息,直接方法 yaml 文件里,直接 debug train_bash.py 文件
    

    解析参数:

    # 参数解析代码 在 LLaMA-Factory/src/llmtuner/hparams/parser.py 文件里
    def _parse_train_args(args: Optional[Dict[str, Any]] = None) -> _TRAIN_CLS:
        parser = HfArgumentParser(_TRAIN_ARGS)
        return _parse_args(parser, args)
    """
    HfArgumentParser 是 Hugging Face Transformers 库中的一个类,它用于解析命令行参数和配置文件,以便于在训练和部署基于Transformers的模型时使用。
    HfArgumentParser 继承自 Python 的内置 argparse 模块,提供了对 YAML 配置文件的支持,可以在不同的环境中配置和共享参数。
    _TRAIN_ARGS:[基于 dataclass 修饰的 类配置]
    """
    # 读文件
    parser.parse_yaml_file(os.path.abspath(sys.argv[1]))
    """
    一个替代的辅助方法,完全不使用 `argparse`,而是加载一个yaml文件并填充数据类类型。
    参数:
        yaml_file (`str` or `os.PathLike`):
            要解析的yaml文件的文件名
        allow_extra_keys (`bool`, 可选, 默认为 `False`):
            默认为 False。如果为 False,并且yaml文件中包含未被解析的键,将引发异常。
     返回:
        一个元组,包含:
           - 数据类实例,按照它们传递给初始化器的顺序排列
    """
    

    仔细阅读代码不难发现:

    parser.parse_args_into_dataclasses(return_remaining_strings=True) 同理也可以实现,将命令行参数解析为指定数据类类型的实例。展开内部细看:

    parse_args_into_dataclasses 函数入参说明

    • args 参数是一个字符串列表,包含了要解析的命令行参数。如果没有提供,默认会使用 sys.argv,这是 Python 中一个包含命令行参数的列表。
    • return_remaining_strings 参数是一个布尔值,如果设置为 True,函数不仅会返回解析后的参数,还会返回一个列表,包含所有未被解析器识别的剩余参数字符串。
    • look_for_args_file 参数是一个布尔值,如果设置为 True,函数会在与当前进程的入口点脚本同名的目录下查找一个名为 “.args” 的文件,并将其内容作为额外的命令行参数添加到 args 中。# 当参数信息也可以保存在 .args 后缀文件里,但要求文件名与启动脚本文件名一致,如 train_bash.args.
    • args_filename 参数是一个字符串,如果被指定,函数将使用这个文件名来代替默认的 “.args” 文件。
    • args_file_flag 参数是一个字符串,如果被指定,函数会在命令行参数中查找这个标志,并使用标志后面的文件作为参数文件。如果命令行中多次出现这个标志,将以最后一次出现的文件为准。
    for dtype in self.dataclass_types:
        keys = {f.name for f in dataclasses.fields(dtype) if f.init}
        inputs = {k: v for k, v in vars(namespace).items() if k in keys}
        for k in keys:
            delattr(namespace, k)
            obj = dtype(**inputs)
            outputs.append(obj)
    """
    后续代码: 将所有参数信息,全部加载到 namespace 里, 然后遍历每一个数据类,找到预先定义的参数,并从 namespace 里去除该属性。
    """        
    # namespace 怎么来的,看以下代码,设置参数属性并校验。
    if namespace is None:
        namespace = Namespace()
        for action in self._actions:
            if action.dest is not SUPPRESS:
                if not hasattr(namespace, action.dest):
                    if action.default is not SUPPRESS:
                        setattr(namespace, action.dest, action.default)    
    
    """如果必要参数缺失,直接抛出错误:"""                   
    #  train_bash.py: error: the following arguments are required: --model_name_or_path, --output_dir                 
    
  • SFT 核心代码块

    LLaMA-Factory/src/llmtuner/train/sft/workflow.py

    • load_tokenizer

      tokenizer = AutoTokenizer.from_pretrained(
                              model_args.model_name_or_path,
                              use_fast=model_args.use_fast_tokenizer,
                              split_special_tokens=model_args.split_special_tokens,
                              padding_side="right",
                              **init_kwargs,)	
      patch_tokenizer(tokenizer)
      """
      patch_tokenizer 目的是确保 tokenizer 对象有一个正确实现的 _pad 方法,这个方法用于处理序列填充(padding)。
      如果 tokenizer 对象没有从 PreTrainedTokenizerBase 继承 _pad 方法,确保 tokenizer 可以正确地进行序列填充。
      tokenizer._pad = MethodType(PreTrainedTokenizerBase._pad, tokenizer)
      """
      
    • get_dataset

      get_dataset_list 函数这部分主要是数据读取和数据转化为DatasetAttr 属性,load_single_dataset 从文件中读取数据,并对齐操作。

      • 文件 DATA_CONFIG 也就是 'dataset_info.json' (依赖文件)读取到 dataset_info里,调试注意相对路径问题,建议dataset_dir参数
      • column_names.extend(["prompt", "query", "response", "history"]) : dataset_info 里 columns 字段依次对齐到上述字段。
      1.load_dataset

      这段代码使用 Hugging Face Transformers 库中的 load_dataset 函数来加载一个数据集。官方翻译如下:

      这个函数在幕后执行以下操作:
      1. 如果库中没有缓存,则从 path 下载并导入数据集脚本。
      
          如果数据集没有数据集脚本,那么会导入一个通用的数据集脚本(JSON, CSV, Parquet, text 等)。
          数据集脚本是小型的 Python 脚本(定义了数据集构建器)。它们定义了数据集的引用、信息和格式,包含了原始数据文件的路径或 URL,以及从加载示例的代码。
      
      2. 运行数据集脚本,这将:
      
          * 如果本地或缓存中尚不可用,从原始 URL(参见脚本)下载数据集文件。
          * 处理并将数据集缓存为类型化的 Arrow 表格以便缓存。Arrow 表格是任意长度的、支持嵌套对象的类型化表格,可以映射到 numpy/pandas/python 通用		类型。它们可以直接从磁盘访问,加载到 RAM 中,甚至可以通过网络流式传输。
      
      3.返回由 split 请求的分割构建的数据集(默认:所有分割)。
      
      dataset = load_dataset(
                      path=data_path,
                      name=data_name,
                      data_dir=data_dir,
                      data_files=data_files,
                      split=data_args.split,
                      cache_dir=model_args.cache_dir,
                      token=model_args.hf_hub_token,
                      streaming=(data_args.streaming and (dataset_attr.load_from != "file")),
                      **kwargs,
      )
      """
      path: 根据传入的 path,使用的数据集构建器可能来自一个通用的数据集脚本(例如 JSON, CSV, Parquet, text 等)或者来自数据集目录内的数据集脚本(一个 Python 文件)。
      
      对于本地数据集:
              如果 path 是一个本地目录(只包含数据文件) -> 根据目录内容加载一个通用的数据集构建器(csv, json, text 等) 
              例如:'./path/to/directory/with/my/csv/data'。
              如果 path 是一个本地数据集脚本或一个包含本地数据集脚本的目录(如果脚本与目录同名) -> 从数据集脚本加载数据集构建器
              例如:'./dataset/squad' 或 './dataset/squad/squad.py'。
      
      name: 数据集的名称。对于 Hugging Face Hub 上的数据集,这是数据集的版本。对于本地数据集,这个参数通常不需要设置。
      data_dir: 数据集所在的目录。如果指定了 data_dir,load_dataset 将在这个目录中查找数据集。
      data_files: 一个字典或列表,指定了数据集文件的位置。
      split: 要加载的数据集的分割(split)。这可以是 "train"、"validation"、"test" 等,或者是自定义的分割名称。
      cache_dir: 缓存数据集的目录。如果指定了 cache_dir,下载的数据集将被缓存在这个目录中,以便于下次快速加载。
      token: Hugging Face Hub 的访问令牌。如果数据集是私有的或者是需要认证的,可以使用这个参数来提供访问令牌。
      streaming: 是否以流式方式加载数据集。如果设置为 True,数据集将以流式方式加载,这意味着数据集不会被完全加载到内存中,而是按需加载。这对于大型数据集非常有用。
      **kwargs: 其他关键字参数。这些参数将传递给 load_dataset 函数,用于处理特定情况或提供额外的配置选项。
      """
      

      load_dataset 具体实施细节底层代码相对复杂,大致过程:

      step1:模块加工 --> dataset_module_factory
      step2: 类构建 --> get_dataset_builder_class
      

      返回结果如下:

      Dataset({
          features: ['history', 'input', 'output', 'instruction'],
          num_rows: 54
      })
      
      3.align_dataset

      数据对齐操作,转化为问答对形式

      Dataset({
          features: ['prompt', 'response', 'system', 'tools'],
          num_rows: 54
      })
      """
      展开来看:
      'prompt': [{'role': 'user', 'content': ''}]
      'response':[{'role': 'assistant', 'content': ''}]
      'system':''
      'tools':''
      """
      
      4.get_preprocess_and_print_func 数据处理

      转化模型所需三要素条件,input_ids,attention_mask,labels, 采用移位multi_label 方式,预测时,只计算回应的损失。

      """
      IGNORE_INDEX : -100
      source_ids: prompt   ---tokenzier2ids---> ids 
      target_ids: response
      """
      source_mask = [IGNORE_INDEX] * len(source_ids)
      input_ids += source_ids + target_ids
      labels += source_mask + target_ids
      
    • load_model

      model = AutoModelForCausalLM.from_pretrained(model_args.model_name_or_path, config=config, **init_kwargs)
      patch_model(model, tokenizer, model_args, is_trainable)
      # patch_model 的解释
      """
      model.generate = MethodType(PreTrainedModel.generate, model) 如果 generate 方法不是从 GenerationMixin 继承的,这行代码将 PreTrainedModel 类的 generate 方法动态地绑定到 model 对象上,以确保模型具有正确的生成功能。
      
      _resize_embedding_layer(model, tokenizer) 如果需要调整词汇表大小,这个函数会被调用,以调整模型的嵌入层大小以匹配新的词汇表。
      """
      
      • init_adapter

        接着看下 Lora 微调代码细节:

        from peft import LoraConfig
        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            inference_mode=False,
            modules_to_save=finetuning_args.additional_target,
            use_dora=finetuning_args.use_dora,
            **peft_kwargs,
        )
        model = get_peft_model(model, lora_config)
        

        步进 PeftModelForCausalLM(PeftModel) , PeftModel(PushToHubMixin, torch.nn.Module), LoraModel(BaseTuner)

        转到 BaseTuner__init__方法,执行了inject_adapter 函数,该函数用于创建适配器层(adapter layers)并将其替换为目标模块(target modules)。调用 peft.mapping.get_peft_model 函数时,如果传入了非提示调优(non-prompt tuning)的适配器类,则在内部自动调用的方法。

        key_list = [key for key, _ in model.named_modules()
        # key_list 是模型结构模块, 列举如下            
        """
        'transformer.wte', 'transformer.drop', 'transformer.rotary_emb', 'transformer.h', 'transformer.h.0', 'transformer.h.0.ln_1', 'transformer.h.0.attn', 'transformer.h.0.attn.c_attn', 'transformer.h.0.attn.c_proj', 'transformer.h.0.attn...tn_dropout', 'transformer.h.0.ln_2', 'transformer.h.0.mlp'
        """      
        

        如果peft_config配置参数target_modules 中设置为 ‘c_attn’,则将模型中以 ’ c_attn’ 后缀结尾的 block,进行替换

        parent, target, target_name = _get_submodules(model, key)
        
        print(parent)
        QWenAttention(
          (c_attn): Linear(in_features=2048, out_features=6144, bias=True)
          (c_proj): Linear(in_features=2048, out_features=2048, bias=False)
          (attn_dropout): Dropout(p=0.0, inplace=False)
        )
        
        print(target)
        Linear(in_features=2048, out_features=6144, bias=True)
        
        print(target_name)
        c_attn
        

        接下来参数的核心是产生新的module 和取代旧的 module

        self._create_new_module(lora_config, adapter_name, target, **kwargs)
        self._replace_module(parent, target_name, new_module, target)
        

        新的module 产生:

        #  peft.tuners.lora.layer.py
        lora.Linear(
          (base_layer): Linear(in_features=5120, out_features=15360, bias=True)
          (lora_dropout): ModuleDict(
            (default): Identity()
          )
          (lora_A): ModuleDict(
            (default): Linear(in_features=5120, out_features=16, bias=False)
          )
          (lora_B): ModuleDict(
            (default): Linear(in_features=16, out_features=15360, bias=False)
          )
          (lora_embedding_A): ParameterDict()
          (lora_embedding_B): ParameterDict()
        )
        

        具体实施细节可以参看LoraLayer类和 self.update_layer 方法。

        new_module 结构如下:

        new_module:
        lora.Linear(
          (base_layer): Linear(in_features=5120, out_features=15360, bias=True)
          (lora_dropout): ModuleDict(
            (default): Identity()
          )
          (lora_A): ModuleDict(
            (default): Linear(in_features=5120, out_features=16, bias=False)
          )
          (lora_B): ModuleDict(
            (default): Linear(in_features=16, out_features=15360, bias=False)
          )
          (lora_embedding_A): ParameterDict()
          (lora_embedding_B): ParameterDict()
        )
        
            def _replace_module(self, parent, child_name, new_module, child):
                setattr(parent, child_name, new_module)
                if hasattr(child, "base_layer"):
                    child = child.base_layer
        
                if not hasattr(new_module, "base_layer"):
                    new_module.weight = child.weight
                    if hasattr(child, "bias"):
                        new_module.bias = child.bias
        
                if getattr(child, "state", None) is not None:
                    if hasattr(new_module, "base_layer"):
                        new_module.base_layer.state = child.state
                    else:
                        new_module.state = child.state
                    new_module.to(child.weight.device)
        
                for name, module in new_module.named_modules():
                    if (self.prefix in name) or ("ranknum" in name):
                        weight = child.qweight if hasattr(child, "qweight") else child.weight
                        module.to(weight.device)
        

        这块代码显示module替换的过程,setattr(parent, child_name, new_module) 这行代码将新模块 new_module 赋值给父模块 parent 的属性 child_name,从而替换掉原来的子模块 child.

        child = child.base_layer 如果 child 是一个包装器,这行代码将 child 设置为它的基础层(原始模块)

        weight = child.qweight if hasattr(child, "qweight") else child.weight 这行代码检查原始模块是否有 qweight 属性,这通常表示原始模块的权重已经被量化。

    • DataCollatorForSeq2Seq

      """迭代器,用于构建dataloader"""
      data_collator = DataCollatorForSeq2Seq(
      tokenizer=tokenizer,
      pad_to_multiple_of=8 if tokenizer.padding_side == "right" else None,  # for shift short attention
      label_pad_token_id=IGNORE_INDEX if data_args.ignore_pad_token_for_loss else tokenizer.pad_token_id,
      )
      
    • CustomSeq2SeqTrainer: Initialize Trainer

      trainer = CustomSeq2SeqTrainer(
                          model=model,
                          args=training_args,
                          finetuning_args=finetuning_args,
                          tokenizer=tokenizer,
                          data_collator=data_collator,
                          callbacks=callbacks,
                          compute_metrics=ComputeMetrics(tokenizer) if training_args.predict_with_generate else None,
                          **split_dataset(dataset, data_args, training_args),
      )
      """
      split_dataset(dataset, data_args, training_args)
      {'train_dataset': Dataset({
          featur...ows: 43
      }), 'eval_dataset': Dataset({
          featur...ows: 11
      })}
      """   	
      
  • pt 核心代码块

    这部分详细介绍下预训练的代码,结构流程与上节相似,差异性如下。

    get_dataset :不需要使用template模板,启用了“打包”(packing)功能,样本将被组织成一系列的组,每组内的文本项将是cutoff_len 长度,每个文本项之间用eos_token分隔.

     text_examples = [messages[0]["content"] + tokenizer.eos_token for messages in examples["prompt"]]
    

    提供三种训练方法: full ,lora ,freeze 三种方法,主要介绍后者,即只调模型中少量的block模块。

    • freeze

             freeze_modules = {"all"}
             for name, _ in model.named_modules():
                  if ".0." in name:
                      freeze_modules.add(name.split(".0.")[-1].split(".")[0])
                  elif ".1." in name:  # MoD starts from layer 1
                      freeze_modules.add(name.split(".1.")[-1].split(".")[0])     
      """
      name:模型中 block的命名.例如:
      transformer.h.0
      transformer.h.0.ln_1
      transformer.h.0.attn
      transformer.h.0.attn.c_attn
      transformer.h.0.attn.c_proj
      transformer.h.0.attn.attn_dropout
      transformer.h.0.ln_2
      transformer.h.0.mlp
      transformer.h.0.mlp.w1
      transformer.h.0.mlp.w2
      transformer.h.0.mlp.c_proj  
      """
      # 执行后 freeze_modules :{'ln_1', 'all', 'mlp', 'ln_2', 'attn'}     
              trainable_layers = []
              for module_name in finetuning_args.name_module_trainable:
                  if module_name not in freeze_modules:
                      raise ValueError(
                          "Module {} is not found, please choose from {}".format(module_name, ", ".join(freeze_modules))
                      )
      
                  for idx in trainable_layer_ids:
                      trainable_layers.append(".{:d}.{}".format(idx, module_name if module_name != "all" else ""))
      # 执行后确定可训练的模块 ['.22.', '.23.']
              for name, param in model.named_parameters():
                  if any(trainable_layer in name for trainable_layer in trainable_layers):
                      if (not finetuning_args.pure_bf16) and (not finetuning_args.use_badam):
                          param.data = param.data.to(torch.float32)
                  else:
                      param.requires_grad_(False)
      
  • 模型训练
    trainer.train(resume_from_checkpoint=training_args.resume_from_checkpoint)
    

    步入到 _inner_training_loop :

    step1: 加载数据迭代器 train_dataloader = self.get_train_dataloader()

    + 观察数据值 next(iter(train_dataloader))
    {'input_ids': tensor([[   854, 100817, 101923,  ..., 112277,  18493,  29490],
            [    23,   7948,     21,  ..., 100627, 104147, 100178]],
           device='cuda:0'), 
    'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
            [0, 0, 0,  ..., 0, 0, 0]], device='cuda:0'), 
    'attention_mask': tensor([[1, 1, 1,  ..., 1, 1, 1],
            [1, 1, 1,  ..., 1, 1, 1]], device='cuda:0'), 
    'labels': tensor([[   854, 100817, 101923,  ..., 112277,  18493,  29490],
            [    23,   7948,     21,  ..., 100627, 104147, 100178]],device='cuda:0')}
    

    step2:设置训练控制变量

     num_train_epochs
     num_update_steps_per_epoch
     max_steps
     num_examples
    

    假设yaml文件参数配置如下:

    num_train_epochs    :   8 
    per_device_train_batch_size   : 2 
    gradient_accumulation_steps  :  2  
    eval_steps           :    10  
    save_steps           :   500
    

    如果训练样本集合数量num_examples = 43 , 先计算整体total_train_batch_size = per_device_train_batch_size * gradient_accumulation_steps * n_gpus 。假设gpu数量设置1,那么总体total_train_batch_size = 4, 迭代一轮内,扫描完全部的训练样本,参数更新所需要总步数:num_update_steps_per_epoch = 43//4 = 11

    gradient_accumulation_steps 作用是累积梯度,使得模型参数在一次更新之前能够基于多个批次的梯度进行更新。具体来说,当 gradient_accumulation_steps 设置为一个大于1的整数时,每个批次的梯度不会立即用来更新模型参数,而是累积起来。直到累积了 gradient_accumulation_steps 个批次的梯度后,这些梯度才会被用来计算参数的更新。这种方法在训练时可以减少内存的使用,因为每个批次的样本数量减少了,同时保持了较大的有效批量大小,这对于模型的收敛和性能是有益的。

    step3: 设置优化器和任务调度器

    self.create_optimizer_and_scheduler(num_training_steps=max_steps)
    

    step4: 状态,回调,控制

    • TrainerState : 这个类包含一个内部状态(inner state),该状态在模型和优化器进行检查点(checkpointing)保存时会被一同保存下来,并且会传递给 TrainerCallback,并传递给 [TrainerCallback]。

      # 实例化
      self.state
      >>>
      TrainerState(epoch=0, global_step=0, max_steps=296, logging_steps=10, eval_steps=10, save_steps=500, train_batch_size=2, num_train_epochs=8, num_input_tokens_seen=0, total_flos=0, log_history=[], best_metric=None, best_model_checkpoint=None, is_local_process_zero=True, is_world_process_zero=True, is_hyper_param_search=False, trial_name=None, trial_params=None)
      
    • TrainerCallback: 这是一个内部类,它按顺序调用回调函数列表。

      # 实例化
      self.callback_handler
      # 可调用方法主要有:
      """
      30:'eval_dataloader'                31:'lr_scheduler'
      32:'model'                          33:'on_epoch_begin'
      34:'on_epoch_end'                   35:'on_evaluate'
      36:'on_init_end'					37:'on_log'
      38:'on_predict'						39:'on_prediction_step'
      40:'on_save'						41:'on_step_begin'
      42:'on_step_end'					43:'on_substep_end'
      44:'on_train_begin'					45:'on_train_end'
      46:'optimizer'						47:'pop_callback'
      48:'remove_callback'				49:'tokenizer'
      50:'train_dataloader'
      """
      
    • TrainerControl: 专门用来管理训练过程中控制流程的类。在训练过程中,可能需要根据某些条件改变流程,比如提前终止训练、改变学习率等。

      # 实例化
      self.control
      >>>
      TrainerControl(should_training_stop=False, should_epoch_stop=False, should_save=False, should_evaluate=False, should_log=False)
      
      • # 控制开关更新顺序
        self.control = self.callback_handler.on_train_begin(args, self.state, self.control)
        self.control = self.callback_handler.on_epoch_begin(args, self.state, self.control)
        self.control = self.callback_handler.on_step_begin(args, self.state, self.control)
        self.control = self.callback_handler.on_substep_end(args, self.state, self.control)
        self.control = self.callback_handler.on_step_end(args, self.state, self.control)
        ...
        ...
        self.control = self.callback_handler.on_train_end(args, self.state, self.control)
        

进入训练循环,对每批数据进行Loss 求解,并更新梯度。

with  self.accelerator.accumulate(model):
      tr_loss_step = self.training_step(model, inputs)
"""
tr_loss_step:
    tensor(1.4847, device='cuda:0')
"""  

整个训练代码逻辑已介绍完毕:

CUDA_VISIBLE_DEVICES=0 python src/train_bash.py \
    --stage sft \
    --do_train \
    --model_name_or_path path_to_llama_model \
    --dataset alpaca_gpt4_zh \
    --template default \
    --finetuning_type lora \
    --lora_target q_proj,v_proj \
    --output_dir path_to_sft_checkpoint \
    --overwrite_cache \
    --per_device_train_batch_size 4 \
    --gradient_accumulation_steps 4 \
    --lr_scheduler_type cosine \
    --logging_steps 10 \
    --save_steps 1000 \
    --learning_rate 5e-5 \
    --num_train_epochs 3.0 \
    --plot_loss \
    --fp16
  • 模型评估
    _maybe_log_save_evaluate
    evaluate
    evaluation_loop
    prediction_step

.