Ragflow 源码:ragflow_server.py

发布于:2025-06-27 ⋅ 阅读:(11) ⋅ 点赞:(0)

介绍

ragflow_server.py 是 RAGFlow 项目的主服务器程序,负责启动和管理 RAGFlow 的核心服务。以下是它的主要功能和作用:

1. 初始化和配置

  • 日志初始化:使用 initRootLogger 初始化根日志记录器
  • 版本显示:打印 RAGFlow 的 ASCII 艺术标志和版本信息
  • 配置加载
    • 加载 API 设置 (settings.init_settings())
    • 打印 RAG 相关配置 (print_rag_settings())
    • 支持调试模式 (RAGFLOW_DEBUGPY_LISTEN 环境变量)

2. 数据库管理

  • 数据库初始化
    • init_web_db() 初始化 Web 数据库表
    • init_web_data() 初始化基础数据
  • 运行时配置:通过 RuntimeConfig 类管理运行时配置

3. 核心功能

  • 文档进度更新
    • 启动后台线程 (update_progress) 定期更新文档处理进度
    • 使用 Redis 分布式锁确保线程安全
  • 插件管理:通过 GlobalPluginManager.load_plugins() 加载插件

4. HTTP 服务

  • Web 服务启动
    • 使用 Werkzeug 的 run_simple 启动 HTTP 服务器
    • 支持多线程和调试模式
    • 监听指定的主机 IP 和端口

5. 信号处理

  • 注册信号处理器 (signal_handler) 用于优雅关闭
  • 处理 SIGINT (Ctrl+C) 和 SIGTERM 信号

6. 调试支持

  • 支持通过 debugpy 进行远程调试
  • 可通过 --debug 参数启用调试模式

流程图

系统架构

主程序 日志系统 数据库 配置系统 后台线程 HTTP服务器 Debugger Redis DocumentService 初始化日志(initRootLogger) 打印版本和配置 初始化数据库(init_web_db) 初始化数据(init_web_data) 解析命令行参数 初始化运行时配置 加载插件 启动debugpy监听 alt [调试模式] 启动文档进度更新线程 获取分布式锁 定期更新进度 启动Werkzeug服务器(run_simple) 处理HTTP请求 每6秒更新进度 loop [运行中] 设置停止事件(stop_event) 关闭服务器 opt [收到终止信号] 主程序 日志系统 数据库 配置系统 后台线程 HTTP服务器 Debugger Redis DocumentService

代码解释

1. 初始化系统

# 日志初始化(结构化日志记录)
initRootLogger("ragflow_server")  # 创建根日志器,输出到文件和控制台

# 配置加载(分层配置体系)
settings.init_settings()          # 加载api/settings.py基础配置
print_rag_settings()             # 打印rag/settings.py的RAG专用配置
show_configs()                   # 显示当前运行环境配置

# 数据库初始化(ORM迁移+基础数据)
init_web_db()                    # 创建所有数据库表结构
init_web_data()                  # 初始化管理员账号等基础数据

2. 运行时控制

# 命令行参数解析
parser = argparse.ArgumentParser()
parser.add_argument("--debug", action="store_true")  # 调试模式开关
args = parser.parse_args()

# 运行时配置(动态环境变量)
RuntimeConfig.init_env()          # 加载环境变量
RuntimeConfig.init_config(       # 初始化核心参数
    JOB_SERVER_HOST=settings.HOST_IP,
    HTTP_PORT=settings.HOST_PORT
)

3. 核心服务

# 文档进度更新服务(分布式锁保障)
def update_progress():
    redis_lock = RedisDistributedLock("update_progress", timeout=60)
    while not stop_event.is_set():
        try:
            if redis_lock.acquire():          # 获取Redis分布式锁
                DocumentService.update_progress()  # 批量更新文档处理状态
            stop_event.wait(6)                # 每6秒执行一次
        finally:
            redis_lock.release()               # 确保锁释放

# HTTP服务(Werkzeug增强服务器)
run_simple(
    hostname=settings.HOST_IP,    # 支持0.0.0.0监听
    port=settings.HOST_PORT,
    application=app,              # Flask应用对象
    threaded=True,                # 多线程模式
    use_reloader=RuntimeConfig.DEBUG  # 调试时自动重载
)


网站公告

今日签到

点亮在社区的每一天
去签到