从 0 到 1 构建 Graph RAG 系统:本地图谱 + 通义千问落地实践

发布于:2025-06-24 ⋅ 阅读:(18) ⋅ 点赞:(0)

引言

本文适用于希望快速构建图谱增强问答系统(Graph RAG)的人群,涵盖从本地图谱构建到接入 Neo4j 的完整过程,结合通义千问进行问答。
Graph RAG 是将“领域知识图谱 + 语义检索 + 大模型问答”结合起来的智能问答系统,旨在让大家能够像问一个资深专家一样与系统交流,获取结构化、精准、上下文相关的答案。

一、项目背景与目标(Why)

  1. 场景介绍:
    在研发和数据分析过程中,我们每天都面临海量的结构化、非结构化信息:如 ECU 通信协议、故障树(FTA/FMEA)、设计文档、案例数据库、trace 日志说明、以及大量技术问答。这些知识分散、冗余、版本不一致,靠全文检索很难精准获取关键内容。相比传统的全文检索或向量搜索,Graph RAG 引入图结构,能够更精准表达实体间的语义关系,减少大模型“幻觉”,让知识更可信、可解释。

为了解决以上问题,我们构建了一个 “基于知识图谱的检索增强生成系统”(Graph-RAG):

  • 支持结构化+非结构化混合检索

  • 支持多轮技术问答

  • 支持故障树、部件依赖图等知识图谱关联理解

  • 支持接入大模型进行生成式问答(如通义千问)

  1. 项目目标:

    构建一个本地部署、私有可控的 Graph RAG(基于图结构增强的检索式问答)系统。

    结合大语言模型与知识图谱,提升问答的准确性、可解释性和可控性。

目标 说明
建立知识图谱 把领域知识显式结构化,避免大模型“幻觉”
实现语义增强检索 用户提问后,能根据图谱语义精确召回相关节点
支持生成式问答 不只是返回段落,还能基于图谱和文档组织答案
融合上下游系统 支持与 trace 数据、测试平台等接口融合

二、核心技术架构(How)

我会用一张系统结构图,简明扼要地讲清以下组件:
目录结构

graph_rag/
├── data/                         # 存储 PDF 文档或知识数据源
│   └── Core Services Handbuch.pdf
├── rag/                          # 核心模块代码集中此处
│   ├── __init__.py
│   ├── document_loader.py        # 读取和解析文档
│   ├── graph_builder.py          # 图构建 + Neo4j 连接整合
│   ├── graph_retriever.py        # 检索逻辑(包含 Agent)
│   ├── prompt_util.py            # Prompt 组装逻辑
│   └── qwen_client.py            # LLM API 封装
├── ui/                           # 若保留 Gradio 接口,则放这里
│   └── app.py
├── main.py                       # 项目入口
├── docker_Neo4j.sh               # Neo4j 部署脚本
├── requirements.txt              # 依赖管理

系统流程图

[ 文档加载 ][ 图谱构建(Graph / Neo4j) ][ 关键词图谱检索 GraphRetriever ][ 上下文拼接 + Prompt 构造 ][ 通义千问 LLM 生成式问答 ]

三、成果展示(What)

1. 核心demo

1.1 graph_builder.py:图谱构建与检索

使用 Neo4j 本地图数据库
支持 Cypher 查询构造,子图查询
提供图数据的导入、节点与边关系构建封装
from py2neo import Graph as Neo4jGraph
from typing import List, Dict
import re
from neo4j import GraphDatabase
from collections import defaultdict


# -------- 本地内存图结构构建 --------
class Graph:
    def __init__(self):
        self.nodes = set()
        self.edges = defaultdict(list)

    def add_entity(self, entity: str):
        """向图中添加一个实体节点"""
        self.nodes.add(entity)

    def add_relation(self, source: str, relation: str, target: str):
        self.edges[source].append((relation, target))
        self.nodes.add(source)
        self.nodes.add(target)

    def build_from_text(self, text: str):
        for line in text.split("\n"):
            # 中文关系句式
            cn_relations = re.findall(r'(\w+)[\s]*(连接|依赖|包括|属于|指向|调用|控制|访问)[\s]*(\w+)', line)
            for src, _, tgt in cn_relations:
                self.add_relation(src, tgt)

            # 英文关系句式
            en_relations = re.findall(r'(\w+)[\s]*(includes|connects|depends|calls|belongs|uses|controls|accesses)[\s]*(\w+)', line)
            for src, _, tgt in en_relations:
                self.add_relation(src, tgt)

    def get_related_nodes(self, query: str, depth=1):
        result = set()
        queue = [(query, 0)]
        while queue:
            node, d = queue.pop(0)
            if d > depth:
                continue
            result.add(node)
            for neighbor in self.edges.get(node, []):
                queue.append((neighbor, d + 1))
        return list(result)
    
    def get_context(self, keywords):
        matched_nodes = []
        for k in keywords:
            for node in self.nodes:
                if k.lower() in node.lower():
                    matched_nodes.append(node)

        expanded_nodes = set()
        for node in matched_nodes:
            expanded_nodes.add(node)
            for rel in self.edges.get(node, []):
                expanded_nodes.add(rel[1])  # 关系目标节点

        context = []
        for source in expanded_nodes:
            for relation, target in self.edges.get(source, []):
                context.append(f"{source} --{relation}--> {target}")

        return "\n".join(context)
    
    def close(self):
        pass  # 如果你不需要关闭任何连接

# -------- 持久化图操作(Neo4j) --------
class Neo4jGraph:
    def __init__(self, uri="bolt://localhost:7687", user="neo4j", password="test123"):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def add_entity(self, name, label="Entity"):
        with self.driver.session() as session:
            session.run(f"MERGE (e:{label} {{name: $name}})", name=name)

    def add_relation(self, subj, rel, obj):
        with self.driver.session() as session:
            session.run(f"""
                MERGE (a:Entity {{name: $subj}})
                MERGE (b:Entity {{name: $obj}})
                MERGE (a)-[r:{rel.upper()}]->(b)
            """, subj=subj, obj=obj)

    def get_context(self, keywords, limit=3):
        query = """
        MATCH (e:Entity)
        WHERE e.name IN $keywords
        WITH collect(e.name) as ents
        MATCH (a:Entity)-[r]->(b:Entity)
        WHERE a.name IN ents OR b.name IN ents
        RETURN a.name AS subject, type(r) AS relation, b.name AS object
        LIMIT $limit
        """
        with self.driver.session() as session:
            results = session.run(query, keywords=keywords, limit=limit)
            triples = []
            for record in results:
                triples.append(f"{record['subject']} {record['relation']} {record['object']}")
            return triples

# 示例代码块(可删除)
if __name__ == "__main__":
    # 测试内存图
    g = Graph()
    g.build_from_text("设备A连接设备B\n设备B依赖设备C")
    print("Memory Graph:", g.get_related_nodes("设备A"))

    # 测试Neo4j图
    with Neo4jGraph() as db:
        db.add_entity("Active Directory")
        db.add_entity("DHCP")
        db.add_entity("DNS")
        db.add_relation("Active Directory", "MANAGES", "Network Resources")
        db.add_relation("DHCP", "ASSIGNS", "IP Addresses")
        db.add_relation("DNS", "TRANSLATES", "Domain Names")
        print("Neo4j Graph:", db.get_context(["DHCP", "DNS"]))

1.2 图谱检索模块graph_retriever.py:从实体关系中提取上下文片段
该模块用于基于关键词从图谱中定位相关节点,并提取其关联上下文,供 LLM 构造回答。

from rag.prompt_util import build_prompt
from rag.qwen_client import QwenClient
from rag.graph_builder import Graph
import re

class GraphRetriever:
    def __init__(self, graph: Graph, raw_text: str):
        self.graph = graph
        self.raw_text = raw_text
        self.lines = self.raw_text.split("\n")

    def retrieve(self, query: str):
        keywords = re.findall(r'\w+', query.lower())
        # 用 query 中的每个关键词去匹配图中的节点
        matched_nodes = []
        for k in keywords:
            for node in self.graph.nodes:
                if k.lower() in node.lower():
                    matched_nodes.append(node)

        # 如果找到了相关节点,就用图结构扩展它们
        expanded_nodes = set()
        for n in matched_nodes:
            expanded_nodes.update(self.graph.get_related_nodes(n))

        print("🔍 相关实体节点(宽匹配+扩展):", expanded_nodes)

        # 从文本中找到相关段落
        matched_lines = [line for line in self.lines if any(n in line for n in expanded_nodes)]
        print("📄 检索到的上下文行数:", len(matched_lines))

        return "\n".join(matched_lines)

    def ask(self, query: str):
        context = self.retrieve(query)
        prompt = f"根据以下文档内容回答问题:\n{context}\n\n问题:{query}"
        return QwenClient().chat(prompt)  # ✅ 使用封装好的模型调用

1.3 prompt_util.py:模板化提示词管理

管理系统 Prompt 模板(如“你是一个图谱问答专家…”)
支持中英文 Prompt 切换
方便后期进行 Prompt tuning
def build_prompt(context_triples, question):
    prompt = "你是IT领域的专业问答助手,以下是相关知识点,请基于这些内容回答问题:\n"
    for i, triple in enumerate(context_triples, 1):
        prompt += f"知识点{i}: {triple}\n"
    prompt += f"请简洁回答以下问题:{question}\n"
    return prompt

1.4 qwen_client.py:通义千问调用封装

使用 vLLM 或 OpenAI 接口兼容结构调用通义千问本地服务
支持自动翻译、语言切换
返回内容结构清晰,方便后续解析
import requests

class QwenClient:
    def __init__(self):
        self.api_url = "http://localhost:8000/v1"  # 通义千问部署 API 地址
        self.api_key = "vllm"  # API 密钥(如有验证机制)
        self.embedding_model = "bge-base-zh"
        self.headers = {
            "Content-Type": "application/json",
            # 如果接口需要token认证,可以添加Authorization字段
            # "Authorization": f"Bearer {self.api_key}"
        }

    def chat(self, prompt: str) -> str:
        payload = {
            "model": "Qwen3-32B-AWQ",
            "messages": [
                {"role": "system", "content": "你是一个智能助手"},
                {"role": "user", "content": prompt}
            ]
        }

        try:
            response = requests.post(self.api_url, headers=self.headers, json=payload, timeout=30)
            response.raise_for_status()
            data = response.json()
            return data["choices"][0]["message"]["content"]
        except Exception as e:
            return f"Qwen 调用失败:{str(e)}"

1.5 ui/app.py:图谱 + 问答 UI 界面

使用 Streamlit 构建交互界面
图谱展示用 pyvis 或 Neo4j Browser
支持输入问题、显示结果、子图可视化
import gradio as gr
from rag.document_loader import load_document
from rag.graph_builder import Graph
from rag.graph_retriever import GraphRetriever

# 初始化图谱
raw_text = load_document("./data/Core Services Handbuch v1.00.pdf")
graph = Graph()
graph.build_from_text(raw_text)
retriever = GraphRetriever(graph, raw_text)

def qa(query):
    if not query.strip():
        return "请输入问题"
    return retriever.ask(query)

gr.Interface(
    fn=qa,
    inputs=gr.Textbox(label="请输入你的问题"),
    outputs=gr.Textbox(label="回答"),
    title="Graph RAG QA(图谱问答)",
    description="支持基于图结构的本地文档问答,模型为通义千问 Qwen3-32B",
).launch(
    server_name="0.0.0.0",  # ⬅️ 关键修改:绑定到 0.0.0.0
    server_port=7860        # ⬅️ 你也可以修改为你想要的端口号
)

2. 使用案例

以一个汽车行业知识图谱为例:

问题:“这辆车的刹车系统是否与ESP系统有关?”

系统流程:

graph_builder.py 查询出刹车系统与ESP相关的子图

qwen_client.py 根据子图与问题生成回答

ui/app.py 展示子图 + 回答:“ESP系统通过制动控制与刹车系统相关联……”

3. 依赖启动流程

#1. 创建虚拟环境(推荐 Python 3.10)
conda create -n graph_rag python=3.10 -y conda activate graph_rag

#2. 安装依赖
pip install -r requirements.txt

#3. 启动本地 Neo4j 图数据库容器
bash docker_Neo4j.sh

#4. 下载 spaCy 英文模型
python -m spacy download en_core_web_sm

#5. 运行 Gradio 界面
python main.py

#6. 浏览器打开 http://192.168.0.108:7860 输入问题即可

四、项目优势与创新点(Value)

图谱增强:不仅依赖语义匹配,而是走图关系,精准定位知识。
本地部署:数据隐私可控,模型离线可用。
Agent 驱动:支持多轮对话、复杂问题推理。
模块解耦:每部分可独立扩展,比如替换模型、接其他知识源。

五、系统部署情况

依赖环境

  • Python ≥ 3.10

  • Neo4j Desktop 或 Neo4j Server(默认 7687 端口)

  • 通义千问本地模型(如 Qwen-7B-Chat + vLLM)

项目模块 运行方式
后端服务 Python + FastAPI,已容器化,部署在内网 GPU 服务器
图数据库 Neo4j 5.0,运行在 Docker 容器中
大模型 通义千问,本地部署 / 内部 API
UI 界面 Streamlit + Websocket 支持

六、未来计划与协作建议(Next Steps)

支持多图谱融合(如文本+设备图谱)

多语言问答支持

图谱自动更新(增量构建机制)

接入 LangChain、Flowise 等生态工具

欢迎团队协作,共建插件模块(如:知识上传、对话分析)

网站公告

今日签到

点亮在社区的每一天
去签到