【Lightrag 框架介绍与代码实战】

发布于:2025-06-21 ⋅ 阅读:(13) ⋅ 点赞:(0)

Lightrag 框架介绍与代码实战

Lightrag 是一个基于图结构的高效、轻量级 RAG(Retrieval-Augmented Generation)系统,旨在通过知识图谱增强大语言模型的推理能力。它能够从文档中提取实体和关系,构建图谱,并在查询时利用图结构进行高效的上下文检索,从而提升问答质量。

本指南将详细介绍 Lightrag 的代码框架结构、如何运行 demo 示例、启动服务以及可视化生成的知识图图谱。


一、Lightrag 项目结构简介

Lightrag 的核心目录结构如下:

lightrag/
├── lightrag/              # 核心代码模块
│   ├── api/               # 核心逻辑:api
│      ├── webui/          # 编译后的前端模块
│   ├── llm/               # 语言模块引擎模块
│   ├── server/            # Web 服务端代码(FastAPI + Vue 前端)
│   └── utils/             # 工具类函数:如日志、缓存、token 处理等
├── examples/              # 示例脚本
│   ├── lightrag_openai_compatible_demo.py         # 使用openai接口调用llm和embeding向量模型的,简单的 RAG 使用示例
│   └── graph_visual_with_html.py                  # 根据生成的graph_chunk_entity_relation.graphml转成html文件,以便可视化
├── lightrag_webui/                # lightrag的前端源码
├── lightrag-server         # 启动 Web 服务的主入口脚本
└── README.md               # 项目说明文档

当项目运行后,在运行目录下生成如下几个文件夹
├── dickens/ # 图数据,实体、切割文本等,也包括可以可视化知识图谱的graph_chunk_entity_relation.graphml等
├── inputs/ # upload的文档的存放路径

二、运行 Demo 示例

1. 运行 RAG Demo

新建.env到运行rag server的目录下

要给简单的.env的配置如下:

### This is sample file of .env


### Server Configuration
HOST=0.0.0.0
PORT=9621
WEBUI_TITLE='My Graph KB'
WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
OLLAMA_EMULATING_MODEL_TAG=latest
# WORKERS=2
# CORS_ORIGINS=http://localhost:3000,http://localhost:8080

SUMMARY_LANGUAGE=Chinese
### Number of duplicated entities/edges to trigger LLM re-summary on merge ( at least 3 is recommented)
# FORCE_LLM_SUMMARY_ON_MERGE=6
### Max tokens for entity/relations description after merge
# MAX_TOKEN_SUMMARY=500

### Number of parallel processing documents(Less than MAX_ASYNC/2 is recommended)
# MAX_PARALLEL_INSERT=2
### Chunk size for document splitting, 500~1500 is recommended
# CHUNK_SIZE=1200
# CHUNK_OVERLAP_SIZE=100

### LLM Configuration
ENABLE_LLM_CACHE=true
ENABLE_LLM_CACHE_FOR_EXTRACT=true
### Time out in seconds for LLM, None for infinite timeout
TIMEOUT=240
### Some models like o1-mini require temperature to be set to 1
TEMPERATURE=0
### Max concurrency requests of LLM
MAX_ASYNC=4
### MAX_TOKENS: max tokens send to LLM for entity relation summaries (less than context size of the model)
### MAX_TOKENS: set as num_ctx option for Ollama by API Server
MAX_TOKENS=32768
### LLM Binding type: openai, ollama, lollms, azure_openai
LLM_BINDING=openai
LLM_MODEL=Qwen2.5-32B-Instruct
LLM_BINDING_HOST=http://xx.225.xx.xxx:7919/v1
LLM_BINDING_API_KEY=openai
### Optional for Azure
# AZURE_OPENAI_API_VERSION=2024-08-01-preview
# AZURE_OPENAI_DEPLOYMENT=gpt-4o


### ticktoken cache
TIKTOKEN_CACHE_DIR=/home/test/ai/tiktoken_cache

### Embedding Configuration
### Embedding Binding type: openai, ollama, lollms, azure_openai
EMBEDDING_BINDING=openai
EMBEDDING_MODEL=bge-m3
EMBEDDING_DIM=1024
EMBEDDING_BINDING_API_KEY
# If the embedding service is deployed within the same Docker stack, use host.docker.internal instead of localhost
EMBEDDING_BINDING_HOST=http://xxx.xxx.xxx.XXX:port

参照examples下面openai和ollama可以写一个简单的demo如下:

import os
import asyncio
import inspect
import logging
import logging.config
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import openai_complete_if_cache
from lightrag.llm.ollama import ollama_embed
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
from lightrag.kg.shared_storage import initialize_pipeline_status

from dotenv import load_dotenv

load_dotenv(dotenv_path=".env", override=False)

WORKING_DIR = "./dickens"


def configure_logging():
    """Configure logging for the application"""

    # Reset any existing handlers to ensure clean configuration
    for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "lightrag"]:
        logger_instance = logging.getLogger(logger_name)
        logger_instance.handlers = []
        logger_instance.filters = []

    # Get log directory path from environment variable or use current directory
    log_dir = os.getenv("LOG_DIR", os.getcwd())
    log_file_path = os.path.abspath(
        os.path.join(log_dir, "lightrag_compatible_demo.log")
    )

    print(f"\nLightRAG compatible demo log file: {log_file_path}\n")
    os.makedirs(os.path.dirname(log_dir), exist_ok=True)

    # Get log file max size and backup count from environment variables
    log_max_bytes = int(os.getenv("LOG_MAX_BYTES", 10485760))  # Default 10MB
    log_backup_count = int(os.getenv("LOG_BACKUP_COUNT", 5))  # Default 5 backups

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(levelname)s: %(message)s",
                },
                "detailed": {
                    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "console": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stderr",
                },
                "file": {
                    "formatter": "detailed",
                    "class": "logging.handlers.RotatingFileHandler",
                    "filename": log_file_path,
                    "maxBytes": log_max_bytes,
                    "backupCount": log_backup_count,
                    "encoding": "utf-8",
                },
            },
            "loggers": {
                "lightrag": {
                    "handlers": ["console", "file"],
                    "level": "INFO",
                    "propagate": False,
                },
            },
        }
    )

    # Set the logger level to INFO
    logger.setLevel(logging.INFO)
    # Enable verbose debug if needed
    set_verbose_debug(os.getenv("VERBOSE_DEBUG", "false").lower() == "true")


if not os.path.exists(WORKING_DIR):
    os.mkdir(WORKING_DIR)


async def llm_model_func(
    prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs
) -> str:
    return await openai_complete_if_cache(
        os.getenv("LLM_MODEL", "aerogpt"),
        prompt,
        system_prompt=system_prompt,
        history_messages=history_messages,
        api_key=os.getenv("LLM_BINDING_API_KEY") or os.getenv("OPENAI_API_KEY"),
        base_url=os.getenv("LLM_BINDING_HOST", "http://10.225.124.217:7933/v1"),
        **kwargs,
    )


async def print_stream(stream):
    async for chunk in stream:
        if chunk:
            print(chunk, end="", flush=True)


async def initialize_rag():
    rag = LightRAG(
        working_dir=WORKING_DIR,
        llm_model_func=llm_model_func,
        embedding_func=EmbeddingFunc(
            embedding_dim=int(os.getenv("EMBEDDING_DIM", "1024")),
            max_token_size=int(os.getenv("MAX_EMBED_TOKENS", "8192")),
            func=lambda texts: ollama_embed(
                texts,
                embed_model=os.getenv("EMBEDDING_MODEL", "bge-m3:latest"),
                host=os.getenv("EMBEDDING_BINDING_HOST", "http://10.221.161.19:11434"),
            ),
        ),
    )

    await rag.initialize_storages()
    await initialize_pipeline_status()

    return rag


async def main():
    try:
        # Clear old data files
        files_to_delete = [
            "graph_chunk_entity_relation.graphml",
            "kv_store_doc_status.json",
            "kv_store_full_docs.json",
            "kv_store_text_chunks.json",
            "vdb_chunks.json",
            "vdb_entities.json",
            "vdb_relationships.json",
        ]

        for file in files_to_delete:
            file_path = os.path.join(WORKING_DIR, file)
            if os.path.exists(file_path):
                os.remove(file_path)
                print(f"Deleting old file:: {file_path}")

        # Initialize RAG instance
        rag = await initialize_rag()

        # Test embedding function
        test_text = ["This is a test string for embedding."]
        embedding = await rag.embedding_func(test_text)
        embedding_dim = embedding.shape[1]
        print("\n=======================")
        print("Test embedding function")
        print("========================")
        print(f"Test dict: {test_text}")
        print(f"Detected embedding dimension: {embedding_dim}\n\n")

        with open("./工单排查记录1.txt", "r", encoding="utf-8") as f:
            await rag.ainsert(f.read())

        # Perform naive search
        print("\n=====================")
        print("Query mode: naive")
        print("=====================")
        resp = await rag.aquery(
            "What are the top themes in this story?",
            param=QueryParam(mode="naive", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform local search
        print("\n=====================")
        print("Query mode: local")
        print("=====================")
        resp = await rag.aquery(
            "What are the top themes in this story?",
            param=QueryParam(mode="local", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform global search
        print("\n=====================")
        print("Query mode: global")
        print("=====================")
        resp = await rag.aquery(
            "What are the top themes in this story?",
            param=QueryParam(mode="global", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

        # Perform hybrid search
        print("\n=====================")
        print("Query mode: hybrid")
        print("=====================")
        resp = await rag.aquery(
            "What are the top themes in this story?",
            param=QueryParam(mode="hybrid", stream=True),
        )
        if inspect.isasyncgen(resp):
            await print_stream(resp)
        else:
            print(resp)

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if rag:
            await rag.finalize_storages()


if __name__ == "__main__":
    # Configure logging before running the main function
    configure_logging()
    asyncio.run(main())
    print("\nDone!")

执行命令
python lightrag_xxx_demo.py即可。


三、启动 Web 服务

1. 创建 .env 文件

在项目根目录创建 .env 文件并配置参数(参考前文配置示例)。

2. 启动服务

python lightrag-server

默认访问地址为:http://0.0.0.0:9621

你可以通过浏览器访问该地址,进入图形界面进行文档上传、查询和图谱查看。


四、可视化生成的 Graph

Lightrag 支持以图形化方式展示从文档中提取的实体和关系图谱。

1. 使用内置 Web UI 可视化

服务启动后,访问 Web 页面,在“Graph”标签页下即可看到生成的图谱。节点代表实体,边代表实体之间的关系。

注意:图谱的复杂度取决于文档内容和抽取设置(如 CHUNK_SIZE、MAX_TOKEN_SUMMARY 等)。

2. 手动导出图谱数据

你也可以手动导出图谱数据(JSON 格式),然后使用 Gephi 或 Python 中的 [networkx + matplotlib] 来可视化。

from lightrag.core.graph import KnowledgeGraph

kg = KnowledgeGraph.load_from_file(f"{WORKING_DIR}/graph.json")
kg.visualize()  # 将生成一个 HTML 文件或图像

3.使用命令转化为Html文件

在生成的dickens目录下有这个文件 graph_chunk_entity_relation.graphml
examples下面有graph_visual_with_html.py这个文件,运行这个文件,可以把上面的文件生成html文件,直接用浏览器打开即可

五、常见问题与调试技巧

1. 分词器缺失错误

如果提示 o200k_base.tiktoken not found,请下载该文件并按哈希命名放入指定目录:

wget https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken
import hashlib
print(hashlib.sha1("https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken".encode()).hexdigest())

重命名为输出的哈希值,并放入 .env 中配置的 TIKTOKEN_CACHE_DIR 路径。


六、总结

Lightrag 提供了一个完整的图结构增强型 RAG 解决方案,支持本地部署、Web 服务和图谱可视化。其模块化设计便于扩展和集成到企业级应用中。通过本文档,你应该已经掌握了以下内容:

  • Lightrag 的项目结构与核心模块
  • 如何运行本地 RAG 和图谱 demo
  • 如何启动 Web 服务并进行图形化交互
  • 如何可视化生成的知识图谱
  • 常见问题的解决方法

如需深入定制或扩展功能,建议阅读官方文档或源码中的注释说明。


📌 提示:所有操作均应在具备 GPU 加速的环境中运行以获得最佳性能。


网站公告

今日签到

点亮在社区的每一天
去签到