import fitz # PyMuPDF
import json
from pathlib import Path
def process_pdfs_to_chunks(datas_dir: Path, output_json_path: Path):
"""
使用 PyMuPDF 直接从 PDF 提取每页文本,并生成最终的 JSON 文件。
Args:
datas_dir (Path): 包含 PDF 文件的输入目录。
output_json_path (Path): 最终输出的 JSON 文件路径。
"""
all_chunks = []
# 递归查找 datas_dir 目录下的所有 .pdf 文件
pdf_files = list(datas_dir.rglob('*.pdf'))
if not pdf_files:
print(f"警告:在目录 '{datas_dir}' 中未找到任何 PDF 文件。")
return
print(f"找到 {len(pdf_files)} 个 PDF 文件,开始处理...")
for pdf_path in pdf_files:
file_name_stem = pdf_path.stem # 文件名(不含扩展名)
full_file_name = pdf_path.name # 完整文件名(含扩展名)
print(f" - 正在处理: {full_file_name}")
try:
# 使用 with 语句确保文件被正确关闭
with fitz.open(pdf_path) as doc:
# 遍历 PDF 的每一页
for page_idx, page in enumerate(doc):
# 提取当前页面的所有文本
content = page.get_text("text")
# 如果页面没有文本内容,则跳过
if not content.strip():
continue
# 构建符合最终格式的 chunk 字典
chunk = {
"id": f"{file_name_stem}_page_{page_idx}",
"content": content,
"metadata": {
"page": page_idx, # 0-based page index
"file_name": full_file_name
}
}
all_chunks.append(chunk)
except Exception as e:
print(f"处理文件 '{pdf_path}' 时发生错误: {e}")
# 确保输出目录存在
output_json_path.parent.mkdir(parents=True, exist_ok=True)
# 将所有 chunks 写入一个 JSON 文件
with open(output_json_path, 'w', encoding='utf-8') as f:
json.dump(all_chunks, f, ensure_ascii=False, indent=2)
print(f"\n处理完成!所有内容已保存至: {output_json_path}")
def main():
base_dir = Path(__file__).parent
datas_dir = base_dir / 'datas'
chunk_json_path = base_dir / 'all_pdf_page_chunks.json'
process_pdfs_to_chunks(datas_dir, chunk_json_path)
if __name__ == '__main__':
main()
本次赛题的核心目标是打造一个能看懂图片、读懂文字、并将两者关联起来思考的AI助手,构建一个先进的智能问答系统,以应对真实世界中复杂的、图文混排的信息环境。
让 AI模型能够阅读并理解包含大量图标、图像和文字的pdf文档 ,基于信息回答用户问题。
能找到答案的同时还需要标注出答案的出处,比如源自于哪一个文件的哪一页。
多模态检索增强生成 (Multimodal RAG)!其中需要涉及到——
多模态信息处理 (Multimodal Information Processing)、向量化与检索技术 (Embeddings & Retrieval)
跨模态检索与关联 (Cross-Modal Retrieval)、大语言模型(LLM)的应用与推理 (LLM Application & Reasoning)
相关知识点及参考资料
PDF文档解析库PyMuPDF官方教程:PyMuPDF 1.26.3 documentation
强大的中文OCR工具PaddleOCR:https://github.com/PaddlePaddle/PaddleOCR
领先的中文文本向量化模型库FlagEmbedding (BGE模型):https://github.com/FlagOpen/FlagEmbedding
经典图文多模态向量化模型CLIP (Hugging Face实现):https://huggingface.co/docs/transformers/model_doc/clip
高性能向量检索引擎FAISS入门指南:https://github.com/facebookresearch/faiss/wiki/Getting-started
简单易用的向量数据库ChromaDB快速上手:Getting Started - Chroma Docs
通义千问Qwen大模型官方仓库 (含多模态VL模型):https://github.com/QwenLM/Qwen-VL
集成化RAG开发框架LlamaIndex五分钟入门:Redirecting...
Xinference官方仓库(模型推理框架):
https://github.com/xorbitsai/inference
此次 多模态RAG任务 有四大核心要素
此次赛题的核心不仅仅是简单的问答,而是基于给定的pdf知识库的、可溯源的多模态 问答。
数据源:一堆图文混排的PDF,这是我们唯一的数据。
可溯源:必须明确指出答案的出处。
多模态:问题可能需要理解文本,也可能需要理解图表(图像)。
问答:根据检索的信息生成一个回答。
输出 (Output):我们需要提交什么?
我们的最终任务是为 test.json
中的每一个问题,预测出三个信息: 答案 ( answer
) 、 来源文件名 ( filename
) 和 来源页码 ( page
) 。
总结一下 :整个任务流程就是,
读取
test.json
里的一个问题,驱动你的系统去
财报数据库
中查找信息,然后生成答案和出处,
最后将这几项信息作为一行写入到最终的
submit.json
文件中。
对 test.json
中的所有问题重复此过程,即可得到最终的提交文件.
其中的train.json
文件主要是用来在训练非生成式模型环节中使用的,比如训练embedding模型,或者是微调LLM。
遇到的卡点及解决建议
在实际操作中,最主要的瓶颈在于 时间消耗 :
pymupdf解析 :处理整个财报数据库会稍微消耗一些时间,特别是如果使用基于深度学习的方式提取内容,比如mineru,不过我们本次baseline使用pymupdf速度会有比较大的提升。
批量Embedding :将接近5000的内容块进行向量化,也会消耗不少时间,如果是基于CPU运行的话大概会慢十倍,使用A6000这样的GPU也需要消耗大概1分钟的时间。
核心痛点 :如果在处理过程中代码出现一个小错误,比如数据格式没对齐,就需要从头再来,这将浪费大量时间。
解决与建议 : 不要在一个脚本里完成所有事 。强烈建议使用 Jupyter Notebook 进行开发调试,并将流程拆分:
第一阶段:解析 。在一个Notebook中,专门负责调用pymupdf,将所有PDF解析为JSON并 保存到本地 。这个阶段成功运行一次后,就不再需要重复执行。
第二阶段:预处理与Embedding 。在另一个Notebook中,读取第一步生成的JSON文件,进行图片描述生成、数据清洗,并调用Embedding模型。将最终包含向量的知识库 保存为持久化文件 。
第三阶段:检索与生成 。在第三个Notebook中,加载第二步保存好的知识库,专注于调试检索逻辑和Prompt工程。
通过这种 分步执行、缓存中间结果 的方式,可以极大地提高调试效率,每次修改只需运行对应的、耗时较短的模块。
重申一下关系:
脚本 | 职责 | 输入 → 输出 |
---|---|---|
fitz_pipeline_all_2.py |
纯 PDF 解析 + 切分 | datas/*.pdf → all_pdf_page_chunks.json (仅文本) |
rag_from_page_chunks.py |
Embedding + 检索 + LLM 问答 | all_pdf_page_chunks.json → 向量库 → rag_top1_pred.json (问答结果) |
前者只负责“把 PDF 变成可溯源的文本块”;后者完全依赖前者生成的 all_pdf_page_chunks.json
,再完成“向量化 → 语义检索 → LLM 回答”。
两个脚本在数据流上是上下游关系,没有重复计算。
先安装一下新库PyMuPDF:
pip install -i https://pypi.mirrors.ustc.edu.cn/simple/ \
PyMuPDF langchain langchain-text-splitters tiktoken
第一次运行时解压了财报数据库,以后再运行时可以跳过此步骤,修改程序为:
import os
import zipfile
output_dir = "datas/财报数据库"
zip_file = "datas/财报数据库.zip"
if not os.path.exists(output_dir) or not os.listdir(output_dir):
with zipfile.ZipFile(zip_file, 'r') as z:
z.extractall("datas/")
print("已解压财报数据库。")
else:
print("财报数据库已存在,跳过解压。")
第二句不变:
!pip install -r requirements.txt
第三句原为:
!python fitz_pipeline_all_1.py
改为:
import os, subprocess
json_file = "all_pdf_page_chunks.json"
if not os.path.exists(json_file):
subprocess.run(["python", "fitz_pipeline_all_1.py"])
else:
print("JSON 已存在,跳过提取。")
原文件rag_from_page_chunks.py 程序是一套端到端(RAG)流水线,作用可以一句话概括为:
把“PDF 年报→问答”全自动完成:先给每段文本做向量索引,再对用户问题做语义检索,最后用本地大模型生成带出处(文件名+页码)的答案。
具体拆成 4 步:
加载
读取all_pdf_page_chunks.json
(由前面fitz_pipeline_all.py
生成),得到{content, metadata:{filename, page}}
列表。向量化
用本地 Embedding 模型(OpenAI-API 格式)把content
转成向量,存进SimpleVectorStore
,建立可语义检索的索引。检索
用户输入问题 → 同样转成向量 → 在向量库中做余弦相似度检索,返回最相关前k
段原文。生成答案
把检索到的原文拼接成上下文,连同问题一起送给本地 LLM(Qwen/Kimi/GLM 等),要求模型输出 固定 JSON
最后决定拆成两个部份:
build_index.py # 只负责 Embedding → 保存向量库
rag_eval.py # 只负责检索 + 生成
需要反复调试 Prompt / 时间敏感:拆能省 90 % 时间。
build_index.py如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
build_index.py
功能:读取 all_pdf_page_chunks.json,为所有 chunk 生成 Embedding,
并将 (chunks, embeddings) 持久化到 vector_store.pkl
依赖:pip install PyMuPDF tiktoken tqdm
"""
import json
import pickle
import os
import hashlib
from pathlib import Path
from typing import List, Dict, Any
from get_text_embedding import get_text_embedding
# ---------- 配置 ----------
VECTOR_STORE_PATH = Path("vector_store.pkl")
CHUNK_JSON_PATH = Path("all_pdf_page_chunks.json")
# 如果 JSON 与向量库同时存在且哈希一致,就跳过 Embedding
CACHE_KEY_FILE = Path("vector_store.hash")
def _compute_file_hash(path: Path) -> str:
"""计算文件 SHA256"""
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def need_rebuild() -> bool:
"""判断是否需要重新 Embedding"""
if not VECTOR_STORE_PATH.exists() or not CACHE_KEY_FILE.exists():
return True
with open(CACHE_KEY_FILE, "r", encoding="utf-8") as f:
cached_hash = f.read().strip()
return cached_hash != _compute_file_hash(CHUNK_JSON_PATH)
def build_vector_store():
if not CHUNK_JSON_PATH.exists():
raise FileNotFoundError(CHUNK_JSON_PATH)
if not need_rebuild():
print("✅ 向量库已是最新,跳过 Embedding。")
return
print("🔄 加载 JSON...")
with open(CHUNK_JSON_PATH, "r", encoding="utf-8") as f:
chunks: List[Dict[str, Any]] = json.load(f)
texts = [c["content"] for c in chunks]
print(f"共 {len(texts)} 段文本,开始 Embedding...")
embeddings = get_text_embedding(
texts,
api_key=os.getenv("LOCAL_API_KEY"),
base_url=os.getenv("LOCAL_BASE_URL"),
embedding_model=os.getenv("LOCAL_EMBEDDING_MODEL"),
batch_size=64
)
print("✅ Embedding 完成,写入向量库...")
# 持久化
with open(VECTOR_STORE_PATH, "wb") as f:
pickle.dump({"chunks": chunks, "embeddings": embeddings}, f)
# 写缓存哈希
with open(CACHE_KEY_FILE, "w", encoding="utf-8") as f:
f.write(_compute_file_hash(CHUNK_JSON_PATH))
print(f"🎉 向量库已保存到 {VECTOR_STORE_PATH.resolve()}")
if __name__ == "__main__":
build_vector_store()
rag_eval.py如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
rag_eval.py
功能:加载 vector_store.pkl,完成语义检索 + LLM 问答
输出:rag_top1_pred.json(可直接提交)
依赖:pickle, numpy, tqdm, openai, dotenv
"""
import os
import json
import pickle
import numpy as np
from pathlib import Path
from typing import List, Dict, Any
from tqdm import tqdm
from dotenv import load_dotenv
from openai import OpenAI
# ----------------- 环境 -----------------
load_dotenv()
# ----------------- 工具函数 -----------------
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> np.ndarray:
a_norm = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-8)
b_norm = b / (np.linalg.norm(b) + 1e-8)
return a_norm @ b_norm
# ----------------- 简易向量库 -----------------
class VectorStore:
def __init__(self, pkl_path: Path):
with open(pkl_path, "rb") as f:
data = pickle.load(f)
self.chunks: List[Dict[str, Any]] = data["chunks"]
self.embeddings: np.ndarray = np.array(data["embeddings"], dtype=np.float32)
def search(self, query_emb: np.ndarray, top_k: int = 5) -> List[Dict[str, Any]]:
scores = cosine_similarity(self.embeddings, query_emb).flatten()
top_idx = np.argsort(scores)[::-1][:top_k]
return [self.chunks[i] for i in top_idx]
# ----------------- Embedding -----------------
def embed_text(text: str) -> np.ndarray:
# 复用 get_text_embedding 单条接口
from get_text_embedding import get_text_embedding
emb = get_text_embedding(
[text],
api_key=os.getenv("LOCAL_API_KEY"),
base_url=os.getenv("LOCAL_BASE_URL"),
embedding_model=os.getenv("LOCAL_EMBEDDING_MODEL"),
batch_size=1
)
return np.array(emb[0], dtype=np.float32)
# ----------------- LLM 调用 -----------------
def llm_generate(question: str, context_chunks: List[Dict[str, Any]]) -> Dict[str, str]:
context = "\n".join([
f"[文件名]{c['metadata']['filename']} [页码]{c['metadata']['page']}\n{c['content']}"
for c in context_chunks
])
prompt = (
f"你是一名金融分析助手,请根据以下检索内容回答问题。\n"
f"请严格按照如下 JSON 格式返回:\n"
f'{{"answer": "简洁回答", "filename": "来源文件", "page": "页码"}}'
f"\n检索内容:\n{context}\n\n问题:{question}\n"
)
client = OpenAI(
api_key=os.getenv("LOCAL_API_KEY"),
base_url=os.getenv("LOCAL_BASE_URL")
)
resp = client.chat.completions.create(
model=os.getenv("LOCAL_TEXT_MODEL"),
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=1024
)
raw = resp.choices[0].message.content.strip()
# 简易解析
try:
out = json.loads(raw)
answer = out.get("answer", "")
filename = out.get("filename", "")
page = str(out.get("page", ""))
except Exception:
answer = raw
filename = context_chunks[0]["metadata"]["filename"] if context_chunks else ""
page = str(context_chunks[0]["metadata"]["page"]) if context_chunks else ""
return {"answer": answer, "filename": filename, "page": page}
# ----------------- 主流程 -----------------
def generate_answers(test_json: Path, out_json: Path, top_k: int = 5) -> None:
if not test_json.exists():
raise FileNotFoundError(test_json)
with open(test_json, "r", encoding="utf-8") as f:
test_data = json.load(f)
vector_store = VectorStore(Path("vector_store.pkl"))
results = []
for item in tqdm(test_data, desc="问答中"):
question = item["question"]
query_emb = embed_text(question)
chunks = vector_store.search(query_emb, top_k=top_k)
ans = llm_generate(question, chunks)
results.append({
"question": question,
"answer": ans["answer"],
"filename": ans["filename"],
"page": ans["page"]
})
with open(out_json, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"结果已保存到 {out_json}")
# ----------------- 入口 -----------------
if __name__ == "__main__":
generate_answers(
test_json=Path("./datas/test.json"),
out_json=Path("./rag_top1_pred.json"),
top_k=5
)
由于上面的程序运行太慢,优化速度版如下:
pip install faiss-cpu # GPU 用户用 faiss-gpu
pip install faiss-cpu # GPU 用户用 faiss-gpu
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
极速版 rag_eval.py
1. faiss 向量检索
2. 批量 Embedding
3. 并发 LLM
"""
import os, json, pickle, itertools, math, time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from tqdm import tqdm
import faiss
from dotenv import load_dotenv
from openai import OpenAI
from typing import List, Dict, Any
load_dotenv()
# ----------------- 工具 -----------------
def cosine_normalize(x: np.ndarray) -> np.ndarray:
return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-8)
# ----------------- 1. 向量库 -----------------
class FaissIndex:
def __init__(self, pkl_path: Path):
with open(pkl_path, "rb") as f:
data = pickle.load(f)
self.chunks = data["chunks"]
self.embeddings = np.array(data["embeddings"], dtype=np.float32)
self.embeddings = cosine_normalize(self.embeddings)
dim = self.embeddings.shape[1]
self.index = faiss.IndexFlatIP(dim) # 内积 == cosine
self.index.add(self.embeddings)
def search(self, query_emb: np.ndarray, top_k: int = 5) -> List[Dict[str, Any]]:
scores, idxs = self.index.search(query_emb, top_k)
return [self.chunks[i] for i in idxs[0]]
# ----------------- 2. 批量 Embedding -----------------
from get_text_embedding import get_text_embedding
def embed_batch(texts: List[str], batch_size: int = 64) -> np.ndarray:
return np.array(
get_text_embedding(
texts,
api_key=os.getenv("LOCAL_API_KEY"),
base_url=os.getenv("LOCAL_BASE_URL"),
embedding_model=os.getenv("LOCAL_EMBEDDING_MODEL"),
batch_size=batch_size
),
dtype=np.float32
)
# ----------------- 3. LLM 并发 -----------------
client = OpenAI(
api_key=os.getenv("LOCAL_API_KEY"),
base_url=os.getenv("LOCAL_BASE_URL")
)
model = os.getenv("LOCAL_TEXT_MODEL")
def llm_answer(question: str, chunks: List[Dict[str, Any]]) -> Dict[str, str]:
ctx = "\n".join([
f"[文件名]{c['metadata']['filename']} [页码]{c['metadata']['page']}\n{c['content']}"
for c in chunks
])
prompt = (
f"请回答用户问题,并返回 JSON:"
f'{{"answer": "...", "filename": "...", "page": "..."}}'
f"\n检索内容:\n{ctx}\n\n问题:{question}"
)
resp = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
max_tokens=512
)
raw = resp.choices[0].message.content.strip()
try:
out = json.loads(raw)
return {
"answer": out.get("answer", ""),
"filename": out.get("filename", ""),
"page": str(out.get("page", ""))
}
except Exception:
return {"answer": raw, "filename": "", "page": ""}
# ----------------- 主流程 -----------------
def main():
vector_store = FaissIndex(Path("vector_store.pkl"))
test_path = Path("./datas/test.json")
with open(test_path, "r", encoding="utf-8") as f:
questions = json.load(f)
# 批量 Embedding
q_texts = [q["question"] for q in questions]
q_embs = cosine_normalize(embed_batch(q_texts))
assert len(q_embs) == len(questions)
# 并发问答
def job(i):
q = questions[i]
chunks = vector_store.search(q_embs[i:i+1], top_k=5)
ans = llm_answer(q["question"], chunks)
return {
"question": q["question"],
"answer": ans["answer"],
"filename": ans["filename"],
"page": ans["page"]
}
with ThreadPoolExecutor(max_workers=10) as pool:
results = list(
tqdm(pool.map(job, range(len(questions))), total=len(questions), desc="并发问答")
)
out_path = Path("./rag_top1_pred.json")
with open(out_path, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"✅ 完成,共 {len(results)} 题 → {out_path}")
if __name__ == "__main__":
main()