这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 @with_session
实现数据库会话管理。
📘 一、整体功能概述
该模块主要实现以下功能:
功能 | 描述 |
---|---|
✅ 添加知识库 | 如果不存在则添加,否则更新信息 |
✅ 列出所有知识库 | 可设置文件数量过滤条件 |
✅ 检查知识库是否存在 | 使用不区分大小写的匹配方式 |
✅ 加载知识库基本信息 | 获取名称、向量库类型、嵌入模型 |
✅ 删除知识库 | 支持按名称删除 |
✅ 获取知识库详细信息 | 返回字典格式数据,便于接口返回 |
此外还提供了一个测试入口函数,用于验证这些数据库操作是否正常工作。
🧠 二、代码结构详解
🔹 第一部分:导入依赖与模块说明
from server.db.models.knowledge_base_model import KnowledgeBaseModel
from server.db.session import with_session
KnowledgeBaseModel
:SQLAlchemy ORM 定义的知识库表模型with_session
:一个装饰器,自动注入 SQLAlchemy 的 session 对象,避免手动打开/关闭数据库连接
🔹 第二部分:核心数据库操作函数
1. add_kb_to_db(session, kb_name, kb_info, vs_type, embed_model)
🎯 功能:
- 向数据库中添加一个新的知识库
- 若已存在同名知识库,则更新其信息
📌 参数说明:
kb_name
: 知识库名称(唯一)kb_info
: 知识库简介(用于 AI agent 理解用途)vs_type
: 向量库类型(如 FAISS、Chroma)embed_model
: 嵌入模型名称(如 text-embedding-ada-002)
🔄 工作流程:
- 查询是否有同名知识库(忽略大小写)
- 若无,则创建并插入新记录
- 若有,则更新 info、向量库类型和嵌入模型字段
✅ 返回值:
始终返回 True
,表示执行成功
2. list_kbs_from_db(session, min_file_count: int = -1)
🎯 功能:
列出所有文件数量大于指定值的知识库名称列表
📌 参数说明:
min_file_count
: 最小文件数,用于筛选活跃的知识库
🔄 工作流程:
- 查询所有满足条件的
kb_name
- 将结果从
(kb_name,)
转换为kb_name
字符串列表
✅ 示例输出:
["test_kb", "math_kb", "law_kb"]
3. kb_exists(session, kb_name)
🎯 功能:
检查某个知识库是否存在于数据库中
📌 参数说明:
kb_name
: 知识库名称(支持模糊匹配,不区分大小写)
✅ 返回值:
布尔值,True
表示存在,False
表示不存在
4. load_kb_from_db(session, kb_name)
🎯 功能:
加载知识库的基本配置信息(名称、向量库类型、嵌入模型)
📌 参数说明:
kb_name
: 知识库名称(支持模糊匹配)
✅ 返回值:
元组形式 (kb_name, vs_type, embed_model)
,如果不存在则返回 (None, None, None)
5. delete_kb_from_db(session, kb_name)
🎯 功能:
根据知识库名称删除对应条目
📌 参数说明:
kb_name
: 知识库名称(支持模糊匹配)
✅ 返回值:
始终返回 True
,表示操作成功(即使未找到要删除的对象)
6. get_kb_detail(session, kb_name)
🎯 功能:
获取知识库的完整信息,包括:
- 名称
- 简介
- 向量库类型
- 嵌入模型
- 文件数量
- 创建时间
✅ 返回值:
- 存在时返回字典:
{ "kb_name": "test_kb", "kb_info": "这是一个用于测试的知识库", "vs_type": "FAISS", "embed_model": "text-embedding-ada-002", "file_count": 0, "create_time": datetime.datetime(...) }
- 不存在时返回空字典
{}
🔁 三、装饰器机制:@with_session
@with_session
def add_kb_to_db(session, kb_name, kb_info, vs_type, embed_model):
🧩 作用:
- 自动为你打开数据库会话(session)
- 函数执行完毕后自动提交事务或回滚异常
- 避免手动管理 session,提高代码可维护性
💡 类似于上下文管理器:
with get_db() as session:
# 执行数据库操作
📐 四、ORM 模型结构(来自 knowledge_base_model.py)
你使用的 KnowledgeBaseModel
应该是如下结构(简化版):
class KnowledgeBaseModel(Base):
__tablename__ = 'knowledge_base'
id = Column(Integer, primary_key=True, autoincrement=True)
kb_name = Column(String(50), unique=True, comment='知识库名称')
kb_info = Column(String(200), comment='知识库简介')
vs_type = Column(String(50), comment='向量库类型')
embed_model = Column(String(50), comment='嵌入模型名称')
file_count = Column(Integer, default=0, comment='文件数量')
create_time = Column(DateTime, default=func.now(), comment='创建时间')
🧪 五、测试入口函数(if name == “main”)
这是程序的主入口,用于运行测试流程。
✅ 测试步骤如下:
添加知识库
- 使用
add_kb_to_db()
添加一个名为test_kb
的知识库
- 使用
检查是否存在
- 使用
kb_exists()
判断知识库是否入库成功
- 使用
获取详细信息
- 使用
get_kb_detail()
输出当前知识库的所有字段信息
- 使用
加载基本信息
- 使用
load_kb_from_db()
获取kb_name
,vs_type
,embed_model
- 使用
列出所有知识库
- 使用
list_kbs_from_db()
获取当前数据库中所有知识库名称
- 使用
删除知识库
- 使用
delete_kb_from_db()
删除刚添加的知识库
- 使用
再次检查是否存在
- 确认删除是否成功
📋 六、测试流程示意
🧪 开始测试知识库数据库操作函数...
📌 正在添加知识库:test_kb
✅ 添加完成
🔍 知识库 test_kb 是否存在?是
📝 获取知识库 test_kb 的详细信息
🧠 加载知识库信息:test_kb, FAISS, text-embedding-ada-002
📋 当前数据库中的知识库列表:
['test_kb']
🗑️ 正在删除知识库:test_kb
✅ 删除完成
🔍 删除后,知识库 test_kb 是否还存在?否
🎉 所有测试通过!
📦 七、适用场景与扩展建议
✅ 适用场景:
场景 | 描述 |
---|---|
🧠 AI 助手后台 | 管理多个知识库,支持文档问答 |
📄 RAG 架构 | 记录每个知识库的向量库类型与嵌入模型 |
📈 数据统计 | 统计知识库文件数量、创建时间等 |
🧩 多任务适配 | 不同知识库使用不同 embedding 模型 |
🛠️ 推荐扩展方向:
扩展点 | 描述 |
---|---|
✅ 增加 hash 校验字段 | 避免重复上传相同文件 |
✅ 添加文件路径字段 | file_path TEXT ,方便定位实际文件位置 |
✅ 支持软删除 | is_deleted BOOLEAN DEFAULT False |
✅ 支持异步状态标记 | status ENUM('pending', 'processing', 'done') |
✅ 多线程安全优化 | 提升大规模并发访问效率 |
📊 八、实战代码部分展示:
#!/usr/bin/env python
# coding=utf-8
"""
@author: zgw
@date: 2025/6/7 16:07
@source from:
"""
from server.db.models.knowledge_base_model import KnowledgeBaseModel
from server.db.session import with_session
@with_session
def add_kb_to_db(session, kb_name, kb_info, vs_type, embed_model):
# 创建知识库实例
kb = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(kb_name)).first()
if not kb:
kb = KnowledgeBaseModel(kb_name=kb_name, kb_info=kb_info, vs_type=vs_type, embed_model=embed_model)
session.add(kb)
else: # update kb with new vs_type and embed_model
kb.kb_info = kb_info
kb.vs_type = vs_type
kb.embed_model = embed_model
return True
@with_session
def list_kbs_from_db(session, min_file_count: int = -1):
kbs = session.query(KnowledgeBaseModel.kb_name).filter(KnowledgeBaseModel.file_count > min_file_count).all()
kbs = [kb[0] for kb in kbs]
return kbs
@with_session
def kb_exists(session, kb_name):
kb = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(kb_name)).first()
status = True if kb else False
return status
@with_session
def load_kb_from_db(session, kb_name):
kb = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(kb_name)).first()
if kb:
kb_name, vs_type, embed_model = kb.kb_name, kb.vs_type, kb.embed_model
else:
kb_name, vs_type, embed_model = None, None, None
return kb_name, vs_type, embed_model
@with_session
def delete_kb_from_db(session, kb_name):
kb = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(kb_name)).first()
if kb:
session.delete(kb)
return True
@with_session
def get_kb_detail(session, kb_name: str) -> dict:
kb: KnowledgeBaseModel = session.query(KnowledgeBaseModel).filter(KnowledgeBaseModel.kb_name.ilike(kb_name)).first()
if kb:
return {
"kb_name": kb.kb_name,
"kb_info": kb.kb_info,
"vs_type": kb.vs_type,
"embed_model": kb.embed_model,
"file_count": kb.file_count,
"create_time": kb.create_time,
}
else:
return {}
# ========================
# 测试入口
# ========================
if __name__ == "__main__":
test_kb_name = "test_kb"
test_kb_info = "这是一个用于测试的知识库"
test_vs_type = "FAISS"
test_embed_model = "text-embedding-ada-002"
print("🧪 开始测试知识库数据库操作函数...")
# 1. 添加知识库
print(f"📌 正在添加知识库:{test_kb_name}")
add_kb_to_db(
kb_name=test_kb_name,
kb_info=test_kb_info,
vs_type=test_vs_type,
embed_model=test_embed_model
)
print("✅ 添加完成")
# 2. 检查是否存在
exists = kb_exists(kb_name=test_kb_name)
print(f"🔍 知识库 {test_kb_name} 是否存在?{'是' if exists else '否'}")
assert exists is True, "❌ 添加知识库失败"
# 3. 获取详细信息
print(f"📝 获取知识库 {test_kb_name} 的详细信息")
detail = get_kb_detail(test_kb_name)
print("Detail:", detail)
# 4. 加载基本信息
kb_name, vs_type, embed_model = load_kb_from_db(test_kb_name)
print(f"🧠 加载知识库信息:{kb_name}, {vs_type}, {embed_model}")
assert kb_name == test_kb_name, "❌ 加载知识库名称错误"
assert vs_type == test_vs_type, "❌ 向量库类型不一致"
assert embed_model == test_embed_model, "❌ 嵌入模型不一致"
# 5. 列出所有知识库
print("📋 当前数据库中的知识库列表:")
all_kbs = list_kbs_from_db(min_file_count=-1)
print(all_kbs)
assert test_kb_name in all_kbs, "❌ 列表中未找到刚添加的知识库"
# 6. 删除知识库
print(f"🗑️ 正在删除知识库:{test_kb_name}")
delete_kb_from_db(test_kb_name)
print("✅ 删除完成")
# 7. 再次检查是否存在
exists_after_delete = kb_exists(kb_name=test_kb_name)
print(f"🔍 删除后,知识库 {test_kb_name} 是否还存在?{'是' if exists_after_delete else '否'}")
assert exists_after_delete is False, "❌ 删除失败"
print("🎉 所有测试通过!")