【llama.cpp】qwen2_vl_surgery.py详解

发布于:2025-09-05 ⋅ 阅读:(25) ⋅ 点赞:(0)

目录

■官方代码

■详解

qwen2_vl_surgery.py



■官方代码

# 克隆llama.cpp

git clone https://github.com/HimariO/llama.cpp.qwen2.5vl.git

cd llama.cpp.qwen2.5vl

git checkout qwen25-vl-20250404

■详解

qwen2_vl_surgery.py

这段代码用于将 Qwen2-VL 模型的视觉部分转换为 GGUF 格式,支持 fp32/fp16 精度。主要功能包括:

  1. 加载模型和配置;
  2. 提取视觉模块权重并重命名;
  3. 将权重写入 GGUF 文件,供推理使用。
import argparse
from typing import Dict

import torch
import numpy as np
from gguf import *
from transformers import (
    AutoProcessor,
    Qwen2VLConfig,
    Qwen2VLProcessor,
    Qwen2VLForConditionalGeneration,
    Qwen2_5_VLConfig, # type: ignore[reportAttributeAccessIssue]
    Qwen2_5_VLForConditionalGeneration, # type: ignore[reportAttributeAccessIssue]
)


VISION = "clip.vision"


def k(raw_key: str, arch: str) -> str:
    """
    格式化键名字符串,将架构信息插入到键名模板中
    
    参数:
        raw_key (str): 包含架构占位符的键名模板字符串,使用 {arch} 作为占位符
        arch (str): 架构名称,用于替换模板中的占位符
    
    返回:
        str: 格式化后的键名字符串,其中 {arch} 占位符被实际的架构名称替换
    """
    return raw_key.format(arch=arch)

class VL2:
    """
    VL2 类用于处理视觉-语言模型中的张量名称转换和提取视觉模块的权重。
    """

    @staticmethod
    def to_gguf_name(name: str) -> str:
        """
        将原始模型中的参数名称转换为 GGUF 格式兼容的名称。

        参数:
            name (str): 原始模型中的参数名称。

        返回:
            str: 转换后的 GGUF 兼容名称。
        """
        og = name
        name = name.replace("text_model", "t").replace("vision_model", "v")
        name = name.replace("blocks", "blk").replace("embeddings.", "")
        name = name.replace("attn.", "attn_")
        name = name.replace("mlp.fc1", "ffn_down").replace("mlp.fc2", "ffn_up").replace("proj.", "out.")
        # name = name.replace("layrnorm", "ln").replace("layer_norm", "ln").replace("layernorm", "ln")
        name = name.replace("norm1", "ln1").replace("norm2", "ln2")
        name = name.replace("merger.mlp", 'mm')
        print(f"[to_gguf_name] {og} --> {name}")
        return name

    @classmethod
    def find_vision_tensors(cls, qwen2vl, dtype) -> Dict[str, np.ndarray]:
        """
        提取视觉模型中的所有张量,并根据需要进行重命名和拆分。

        参数:
            qwen2vl: 包含视觉模型的完整模型对象。
            dtype: 目标数据类型,用于非归一化层权重的转换。

        返回:
            Dict[str, np.ndarray]: 一个字典,键是转换后的张量名称,值是对应的 NumPy 数组。
        """
        vision_model = qwen2vl.visual
        tensor_map = {}

        # 遍历视觉模型的所有状态字典项
        for name, ten in vision_model.state_dict().items():
            ten = ten.numpy()

            # 处理 QKV 合并的线性层(注意力机制中常见的合并查询、键、值)
            if 'qkv' in name:
                if ten.ndim == 2:  # 权重矩阵
                    c3, _ = ten.shape
                else:              # 偏置向量
                    c3 = ten.shape[0]
                assert c3 % 3 == 0
                c = c3 // 3
                wq = ten[:c]
                wk = ten[c: c * 2]
                wv = ten[c * 2:]

                base_name = f"vision_model.{name}"
                # 分别保存 Q、K、V 的权重
                tensor_map[cls.to_gguf_name(base_name).replace("qkv", "q")] = wq
                tensor_map[cls.to_gguf_name(base_name).replace("qkv", "k")] = wk
                tensor_map[cls.to_gguf_name(base_name).replace("qkv", "v")] = wv

            # 处理 merger 模块中的 MLP 和 LayerNorm 层
            elif 'merger' in name:
                if name.endswith("ln_q.weight"):
                    tensor_map['v.post_ln.weight'] = ten
                elif name.endswith("ln_q.bias"):
                    tensor_map['v.post_ln.bias'] = ten
                else:
                    # "merger.mlp.%d.weight/bias" --> "mm.%d.weight/bias"
                    tensor_map[cls.to_gguf_name(name)] = ten

            # 特殊处理 patch embedding 中的 3D 卷积核,将其拆分为两个 2D 卷积核
            elif 'patch_embed.proj.weight' in name:
                # NOTE: split Conv3D into Conv2Ds
                # 从输入张量中提取时空patch嵌入权重
                # 该函数假设输入张量包含两个时间步的patch嵌入信息
                # 
                # 参数:
                #     ten: 输入张量,形状为(c1, c2, kt, kh, kw)
                #          c1, c2: 通道维度
                #          kt: 时间维度,当前实现要求必须为2
                #          kh, kw: 空间维度(高度和宽度)
                #     tensor_map: 字典,用于存储提取的权重张量
                #
                # 重要假设:
                #     - 时间patch大小必须为2,这是当前实现的限制
                #     - 输入张量的前两个维度表示通道信息
                #     - 第三个维度表示时间步,只处理两个时间步的情况
                #
                # 处理逻辑:
                #     - 将输入张量在时间维度上分离
                #     - 第0个时间步的权重存储为"v.patch_embd.weight"
                #     - 第1个时间步的权重存储为"v.patch_embd.weight.1"
                
                c1, c2, kt, kh, kw = ten.shape
                assert kt == 2, "Current implmentation only support temporal_patch_size of 2"
                tensor_map["v.patch_embd.weight"] = ten[:, :, 0, ...]
                tensor_map["v.patch_embd.weight.1"] = ten[:, :, 1, ...]
            # 其他常规张量直接映射并重命名
            else:
                tensor_map[cls.to_gguf_name(f"vision_model.{name}")] = ten

        # 根据张量维度决定是否转换为指定的数据类型
        for new_name, ten in tensor_map.items():
            if ten.ndim <= 1 or new_name.endswith("_norm.weight"):
                tensor_map[new_name] = ten.astype(np.float32)
            else:
                tensor_map[new_name] = ten.astype(dtype)

        # 添加一个占位的位置编码张量(dummy tensor)
        # 该代码块用于在tensor_map字典中添加一个名为"v.position_embd.weight"的位置编码权重张量,
        # 该张量初始化为10x10的零矩阵,数据类型为float32,作为占位符使用
        tensor_map["v.position_embd.weight"] = np.zeros([10, 10], dtype=np.float32)

        return tensor_map


class VL25(VL2):
    """
    VL25类继承自VL2类,用于处理模型名称到GGUF格式名称的转换。
    """

    @staticmethod
    def to_gguf_name(name: str) -> str:
        """
        将模型层名称转换为GGUF格式的名称。
        
        该函数通过一系列字符串替换操作,将原始模型名称中的特定关键词
        替换为GGUF格式约定的缩写形式。
        
        参数:
            name (str): 原始模型层名称
            
        返回:
            str: 转换后的GGUF格式名称
        """
        og = name
        
        # 替换模型类型相关关键词
        name = name.replace("text_model", "t").replace("vision_model", "v")
        
        # 替换结构相关关键词
        name = name.replace("blocks", "blk").replace("embeddings.", "")
        
        # 替换注意力机制相关关键词
        name = name.replace("attn.", "attn_")
        
        # 替换MLP层相关关键词
        name = name.replace("mlp.down_proj", "ffn_down").replace("mlp.up_proj", "ffn_up")
        name = name.replace("mlp.gate_proj", "ffn_gate").replace("proj.", "out.")
        
        # 替换归一化层相关关键词
        name = name.replace("norm1", "ln1").replace("norm2", "ln2")
        
        # 替换融合模块相关关键词
        name = name.replace("merger.mlp", 'mm')
        
        print(f"[vl25][to_gguf_name] {og} --> {name}")
        return name


def main(args):
    """
    主函数,用于将 Qwen2VL 或 Qwen2.5VL 模型的视觉编码器部分导出为 GGUF 格式。

    参数:
        args: 命令行参数对象,包含以下属性:
            - data_type (str): 数据类型,支持 'fp32' 或 'fp16'。
            - model_name (str): 模型名称或本地路径。
            - model_type (str): 模型类型,支持 "qwen2vl" 或 "qwen2.5vl"。

    返回值:
        无返回值。输出为一个以 `-vision.gguf` 结尾的 GGUF 文件。
    """

    # 根据指定的数据类型设置 PyTorch 和 NumPy 的数据类型以及 GGUF 文件类型标识
    if args.data_type == 'fp32':
        dtype = torch.float32
        np_dtype = np.float32
        ftype = 0
    elif args.data_type == 'fp16':
        dtype = torch.float16
        np_dtype = np.float16
        ftype = 1
    else:
        raise ValueError()

    local_model = False
    model_path = ""
    model_name = args.model_name
    print("model_name: ", model_name)

    # 加载对应类型的模型并获取其配置信息和视觉配置
    if args.model_type == "qwen2vl":
        qwen2vl = Qwen2VLForConditionalGeneration.from_pretrained(
            model_name, torch_dtype=dtype, device_map="cpu"
        )
        cfg: Qwen2VLConfig = qwen2vl.config  # type: ignore[reportAssignmentType]
        vcfg = cfg.vision_config
    else:
        qwen2vl = Qwen2_5_VLForConditionalGeneration.from_pretrained(
            model_name, torch_dtype=dtype, device_map="cpu"
        )
        cfg: Qwen2_5_VLConfig = qwen2vl.config  # type: ignore[reportAssignmentType]
        vcfg = cfg.vision_config

    # 判断模型是否来自本地路径,并处理路径和模型名
    if os.path.isdir(model_name):
        local_model = True
        if model_name.endswith(os.sep):
            model_name = model_name[:-1]
        model_path = model_name
        model_name = os.path.basename(model_name)

    # 设置输出文件名
    fname_out = f"{model_name.replace('/', '-').lower()}-vision.gguf"

    # 初始化 GGUF 写入器并添加基本元数据
    fout = GGUFWriter(path=fname_out, arch="clip")
    fout.add_description("image encoder for Qwen2VL")
    fout.add_file_type(ftype)
    fout.add_bool("clip.has_text_encoder", False)
    fout.add_bool("clip.has_vision_encoder", True)
    fout.add_bool("clip.has_qwen2vl_merger", True)
    fout.add_string("clip.projector_type", "qwen2vl_merger")

    # 根据激活函数类型设置相应的布尔标志
    print(cfg.vision_config)
    if 'silu' in cfg.vision_config.hidden_act.lower():
        fout.add_bool("clip.use_silu", True)
        fout.add_bool("clip.use_gelu", False)
    elif 'gelu' in cfg.vision_config.hidden_act.lower():
        fout.add_bool("clip.use_silu", False)
        fout.add_bool("clip.use_gelu", 'quick' not in cfg.vision_config.hidden_act.lower())
    else:
        raise ValueError()

    # 根据模型类型添加特定的视觉模型参数
    if args.model_type == "qwen2.5vl":
        fout.add_bool("clip.use_glu_mlp", True)  # gate linear unit MLP layer in vision model
        fout.add_bool("clip.use_rms_norm", True)
        fout.add_array("clip.vision.fullatt_block_indexes", vcfg.fullatt_block_indexes)
        fout.add_uint32("clip.vision.window_size", vcfg.window_size)
        fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.hidden_size)
        fout.add_uint32("clip.vision.projection_dim", vcfg.out_hidden_size)
    else:
        fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.embed_dim)
        fout.add_uint32("clip.vision.projection_dim", vcfg.hidden_size)

    # 获取模型中的视觉相关张量并写入 GGUF 文件
    if args.model_type == "qwen2.5vl":
        tensor_map = VL25.find_vision_tensors(qwen2vl, np_dtype)
    else:
        tensor_map = VL2.find_vision_tensors(qwen2vl, np_dtype)
    for name, data in tensor_map.items():
        fout.add_tensor(name, data)

    # 添加视觉模型的基本结构参数
    fout.add_uint32("clip.vision.patch_size", vcfg.patch_size)
    fout.add_uint32("clip.vision.image_size", 14 * 40)  # some reasonable size that is divable by (14*2)
    fout.add_uint32(k(KEY_ATTENTION_HEAD_COUNT, VISION), vcfg.num_heads)
    fout.add_float32(k(KEY_ATTENTION_LAYERNORM_EPS, VISION), 1e-6)
    fout.add_uint32(k(KEY_BLOCK_COUNT, VISION), vcfg.depth)
    fout.add_uint32(k(KEY_FEED_FORWARD_LENGTH, VISION), 0)  # not sure what this does, put 0 here as a placeholder
    fout.add_name(model_name)

    """
    HACK: Since vision rope related parameter aren't stored in the `Qwen2VLConfig,
            it will be hardcoded in the `clip_image_build_graph` from `clip.cpp`.
    """

    # 加载处理器以获取图像预处理参数(均值和标准差)
    if local_model:
        processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_path)
    else:
        processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_name)
    fout.add_array("clip.vision.image_mean", processor.image_processor.image_mean) # type: ignore[reportAttributeAccessIssue]
    fout.add_array("clip.vision.image_std", processor.image_processor.image_std) # type: ignore[reportAttributeAccessIssue]

    # 将所有数据写入文件并关闭写入器
    fout.write_header_to_file()
    fout.write_kv_data_to_file()
    fout.write_tensors_to_file()
    fout.close()
    print("save model as: ", fname_out)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("model_name", nargs='?', default="Qwen/Qwen2-VL-2B-Instruct")
    parser.add_argument("--model_type", nargs='?', choices=['qwen2vl', 'qwen2.5vl'], default="qwen2vl")
    parser.add_argument("--data_type", nargs='?', choices=['fp32', 'fp16'], default="fp32")
    args = parser.parse_args()
    main(args)

至此,本文分享的内容就结束了。


网站公告

今日签到

点亮在社区的每一天
去签到