目录
■官方代码
# 克隆llama.cpp
git clone https://github.com/HimariO/llama.cpp.qwen2.5vl.git
cd llama.cpp.qwen2.5vl
git checkout qwen25-vl-20250404
■详解
qwen2_vl_surgery.py
这段代码用于将 Qwen2-VL 模型的视觉部分转换为 GGUF 格式,支持 fp32/fp16 精度。主要功能包括:
- 加载模型和配置;
- 提取视觉模块权重并重命名;
- 将权重写入 GGUF 文件,供推理使用。
import argparse
from typing import Dict
import torch
import numpy as np
from gguf import *
from transformers import (
AutoProcessor,
Qwen2VLConfig,
Qwen2VLProcessor,
Qwen2VLForConditionalGeneration,
Qwen2_5_VLConfig, # type: ignore[reportAttributeAccessIssue]
Qwen2_5_VLForConditionalGeneration, # type: ignore[reportAttributeAccessIssue]
)
VISION = "clip.vision"
def k(raw_key: str, arch: str) -> str:
"""
格式化键名字符串,将架构信息插入到键名模板中
参数:
raw_key (str): 包含架构占位符的键名模板字符串,使用 {arch} 作为占位符
arch (str): 架构名称,用于替换模板中的占位符
返回:
str: 格式化后的键名字符串,其中 {arch} 占位符被实际的架构名称替换
"""
return raw_key.format(arch=arch)
class VL2:
"""
VL2 类用于处理视觉-语言模型中的张量名称转换和提取视觉模块的权重。
"""
@staticmethod
def to_gguf_name(name: str) -> str:
"""
将原始模型中的参数名称转换为 GGUF 格式兼容的名称。
参数:
name (str): 原始模型中的参数名称。
返回:
str: 转换后的 GGUF 兼容名称。
"""
og = name
name = name.replace("text_model", "t").replace("vision_model", "v")
name = name.replace("blocks", "blk").replace("embeddings.", "")
name = name.replace("attn.", "attn_")
name = name.replace("mlp.fc1", "ffn_down").replace("mlp.fc2", "ffn_up").replace("proj.", "out.")
# name = name.replace("layrnorm", "ln").replace("layer_norm", "ln").replace("layernorm", "ln")
name = name.replace("norm1", "ln1").replace("norm2", "ln2")
name = name.replace("merger.mlp", 'mm')
print(f"[to_gguf_name] {og} --> {name}")
return name
@classmethod
def find_vision_tensors(cls, qwen2vl, dtype) -> Dict[str, np.ndarray]:
"""
提取视觉模型中的所有张量,并根据需要进行重命名和拆分。
参数:
qwen2vl: 包含视觉模型的完整模型对象。
dtype: 目标数据类型,用于非归一化层权重的转换。
返回:
Dict[str, np.ndarray]: 一个字典,键是转换后的张量名称,值是对应的 NumPy 数组。
"""
vision_model = qwen2vl.visual
tensor_map = {}
# 遍历视觉模型的所有状态字典项
for name, ten in vision_model.state_dict().items():
ten = ten.numpy()
# 处理 QKV 合并的线性层(注意力机制中常见的合并查询、键、值)
if 'qkv' in name:
if ten.ndim == 2: # 权重矩阵
c3, _ = ten.shape
else: # 偏置向量
c3 = ten.shape[0]
assert c3 % 3 == 0
c = c3 // 3
wq = ten[:c]
wk = ten[c: c * 2]
wv = ten[c * 2:]
base_name = f"vision_model.{name}"
# 分别保存 Q、K、V 的权重
tensor_map[cls.to_gguf_name(base_name).replace("qkv", "q")] = wq
tensor_map[cls.to_gguf_name(base_name).replace("qkv", "k")] = wk
tensor_map[cls.to_gguf_name(base_name).replace("qkv", "v")] = wv
# 处理 merger 模块中的 MLP 和 LayerNorm 层
elif 'merger' in name:
if name.endswith("ln_q.weight"):
tensor_map['v.post_ln.weight'] = ten
elif name.endswith("ln_q.bias"):
tensor_map['v.post_ln.bias'] = ten
else:
# "merger.mlp.%d.weight/bias" --> "mm.%d.weight/bias"
tensor_map[cls.to_gguf_name(name)] = ten
# 特殊处理 patch embedding 中的 3D 卷积核,将其拆分为两个 2D 卷积核
elif 'patch_embed.proj.weight' in name:
# NOTE: split Conv3D into Conv2Ds
# 从输入张量中提取时空patch嵌入权重
# 该函数假设输入张量包含两个时间步的patch嵌入信息
#
# 参数:
# ten: 输入张量,形状为(c1, c2, kt, kh, kw)
# c1, c2: 通道维度
# kt: 时间维度,当前实现要求必须为2
# kh, kw: 空间维度(高度和宽度)
# tensor_map: 字典,用于存储提取的权重张量
#
# 重要假设:
# - 时间patch大小必须为2,这是当前实现的限制
# - 输入张量的前两个维度表示通道信息
# - 第三个维度表示时间步,只处理两个时间步的情况
#
# 处理逻辑:
# - 将输入张量在时间维度上分离
# - 第0个时间步的权重存储为"v.patch_embd.weight"
# - 第1个时间步的权重存储为"v.patch_embd.weight.1"
c1, c2, kt, kh, kw = ten.shape
assert kt == 2, "Current implmentation only support temporal_patch_size of 2"
tensor_map["v.patch_embd.weight"] = ten[:, :, 0, ...]
tensor_map["v.patch_embd.weight.1"] = ten[:, :, 1, ...]
# 其他常规张量直接映射并重命名
else:
tensor_map[cls.to_gguf_name(f"vision_model.{name}")] = ten
# 根据张量维度决定是否转换为指定的数据类型
for new_name, ten in tensor_map.items():
if ten.ndim <= 1 or new_name.endswith("_norm.weight"):
tensor_map[new_name] = ten.astype(np.float32)
else:
tensor_map[new_name] = ten.astype(dtype)
# 添加一个占位的位置编码张量(dummy tensor)
# 该代码块用于在tensor_map字典中添加一个名为"v.position_embd.weight"的位置编码权重张量,
# 该张量初始化为10x10的零矩阵,数据类型为float32,作为占位符使用
tensor_map["v.position_embd.weight"] = np.zeros([10, 10], dtype=np.float32)
return tensor_map
class VL25(VL2):
"""
VL25类继承自VL2类,用于处理模型名称到GGUF格式名称的转换。
"""
@staticmethod
def to_gguf_name(name: str) -> str:
"""
将模型层名称转换为GGUF格式的名称。
该函数通过一系列字符串替换操作,将原始模型名称中的特定关键词
替换为GGUF格式约定的缩写形式。
参数:
name (str): 原始模型层名称
返回:
str: 转换后的GGUF格式名称
"""
og = name
# 替换模型类型相关关键词
name = name.replace("text_model", "t").replace("vision_model", "v")
# 替换结构相关关键词
name = name.replace("blocks", "blk").replace("embeddings.", "")
# 替换注意力机制相关关键词
name = name.replace("attn.", "attn_")
# 替换MLP层相关关键词
name = name.replace("mlp.down_proj", "ffn_down").replace("mlp.up_proj", "ffn_up")
name = name.replace("mlp.gate_proj", "ffn_gate").replace("proj.", "out.")
# 替换归一化层相关关键词
name = name.replace("norm1", "ln1").replace("norm2", "ln2")
# 替换融合模块相关关键词
name = name.replace("merger.mlp", 'mm')
print(f"[vl25][to_gguf_name] {og} --> {name}")
return name
def main(args):
"""
主函数,用于将 Qwen2VL 或 Qwen2.5VL 模型的视觉编码器部分导出为 GGUF 格式。
参数:
args: 命令行参数对象,包含以下属性:
- data_type (str): 数据类型,支持 'fp32' 或 'fp16'。
- model_name (str): 模型名称或本地路径。
- model_type (str): 模型类型,支持 "qwen2vl" 或 "qwen2.5vl"。
返回值:
无返回值。输出为一个以 `-vision.gguf` 结尾的 GGUF 文件。
"""
# 根据指定的数据类型设置 PyTorch 和 NumPy 的数据类型以及 GGUF 文件类型标识
if args.data_type == 'fp32':
dtype = torch.float32
np_dtype = np.float32
ftype = 0
elif args.data_type == 'fp16':
dtype = torch.float16
np_dtype = np.float16
ftype = 1
else:
raise ValueError()
local_model = False
model_path = ""
model_name = args.model_name
print("model_name: ", model_name)
# 加载对应类型的模型并获取其配置信息和视觉配置
if args.model_type == "qwen2vl":
qwen2vl = Qwen2VLForConditionalGeneration.from_pretrained(
model_name, torch_dtype=dtype, device_map="cpu"
)
cfg: Qwen2VLConfig = qwen2vl.config # type: ignore[reportAssignmentType]
vcfg = cfg.vision_config
else:
qwen2vl = Qwen2_5_VLForConditionalGeneration.from_pretrained(
model_name, torch_dtype=dtype, device_map="cpu"
)
cfg: Qwen2_5_VLConfig = qwen2vl.config # type: ignore[reportAssignmentType]
vcfg = cfg.vision_config
# 判断模型是否来自本地路径,并处理路径和模型名
if os.path.isdir(model_name):
local_model = True
if model_name.endswith(os.sep):
model_name = model_name[:-1]
model_path = model_name
model_name = os.path.basename(model_name)
# 设置输出文件名
fname_out = f"{model_name.replace('/', '-').lower()}-vision.gguf"
# 初始化 GGUF 写入器并添加基本元数据
fout = GGUFWriter(path=fname_out, arch="clip")
fout.add_description("image encoder for Qwen2VL")
fout.add_file_type(ftype)
fout.add_bool("clip.has_text_encoder", False)
fout.add_bool("clip.has_vision_encoder", True)
fout.add_bool("clip.has_qwen2vl_merger", True)
fout.add_string("clip.projector_type", "qwen2vl_merger")
# 根据激活函数类型设置相应的布尔标志
print(cfg.vision_config)
if 'silu' in cfg.vision_config.hidden_act.lower():
fout.add_bool("clip.use_silu", True)
fout.add_bool("clip.use_gelu", False)
elif 'gelu' in cfg.vision_config.hidden_act.lower():
fout.add_bool("clip.use_silu", False)
fout.add_bool("clip.use_gelu", 'quick' not in cfg.vision_config.hidden_act.lower())
else:
raise ValueError()
# 根据模型类型添加特定的视觉模型参数
if args.model_type == "qwen2.5vl":
fout.add_bool("clip.use_glu_mlp", True) # gate linear unit MLP layer in vision model
fout.add_bool("clip.use_rms_norm", True)
fout.add_array("clip.vision.fullatt_block_indexes", vcfg.fullatt_block_indexes)
fout.add_uint32("clip.vision.window_size", vcfg.window_size)
fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.hidden_size)
fout.add_uint32("clip.vision.projection_dim", vcfg.out_hidden_size)
else:
fout.add_uint32(k(KEY_EMBEDDING_LENGTH, VISION), vcfg.embed_dim)
fout.add_uint32("clip.vision.projection_dim", vcfg.hidden_size)
# 获取模型中的视觉相关张量并写入 GGUF 文件
if args.model_type == "qwen2.5vl":
tensor_map = VL25.find_vision_tensors(qwen2vl, np_dtype)
else:
tensor_map = VL2.find_vision_tensors(qwen2vl, np_dtype)
for name, data in tensor_map.items():
fout.add_tensor(name, data)
# 添加视觉模型的基本结构参数
fout.add_uint32("clip.vision.patch_size", vcfg.patch_size)
fout.add_uint32("clip.vision.image_size", 14 * 40) # some reasonable size that is divable by (14*2)
fout.add_uint32(k(KEY_ATTENTION_HEAD_COUNT, VISION), vcfg.num_heads)
fout.add_float32(k(KEY_ATTENTION_LAYERNORM_EPS, VISION), 1e-6)
fout.add_uint32(k(KEY_BLOCK_COUNT, VISION), vcfg.depth)
fout.add_uint32(k(KEY_FEED_FORWARD_LENGTH, VISION), 0) # not sure what this does, put 0 here as a placeholder
fout.add_name(model_name)
"""
HACK: Since vision rope related parameter aren't stored in the `Qwen2VLConfig,
it will be hardcoded in the `clip_image_build_graph` from `clip.cpp`.
"""
# 加载处理器以获取图像预处理参数(均值和标准差)
if local_model:
processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_path)
else:
processor: Qwen2VLProcessor = AutoProcessor.from_pretrained(model_name)
fout.add_array("clip.vision.image_mean", processor.image_processor.image_mean) # type: ignore[reportAttributeAccessIssue]
fout.add_array("clip.vision.image_std", processor.image_processor.image_std) # type: ignore[reportAttributeAccessIssue]
# 将所有数据写入文件并关闭写入器
fout.write_header_to_file()
fout.write_kv_data_to_file()
fout.write_tensors_to_file()
fout.close()
print("save model as: ", fname_out)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("model_name", nargs='?', default="Qwen/Qwen2-VL-2B-Instruct")
parser.add_argument("--model_type", nargs='?', choices=['qwen2vl', 'qwen2.5vl'], default="qwen2vl")
parser.add_argument("--data_type", nargs='?', choices=['fp32', 'fp16'], default="fp32")
args = parser.parse_args()
main(args)
至此,本文分享的内容就结束了。