# LangChain VectorStores核心:多向量数据库统一交互层与RAG存储中枢
## 1. 核心定义与价值
### 1.1 本质定位
VectorStores 是LangChain框架中向量数据库的统一抽象接口,作为RAG(检索增强生成)系统的核心存储中枢,为不同向量数据库提供一致的操作接口。
核心价值:
- 统一抽象:为50+种向量数据库提供一致的API接口
- RAG中枢:作为检索增强生成系统的核心存储层
- 开发简化:屏蔽底层数据库差异,降低开发复杂度
- 生态协同:与Embeddings、Retrievers无缝集成
### 1.2 核心痛点解决
传统直接对接向量数据库的困境:
```python
# 传统方式:直接使用不同数据库SDK

# Chroma
import chromadb
client = chromadb.Client()
collection = client.create_collection("docs")
collection.add(documents=texts, embeddings=embeddings, ids=ids)
results = collection.query(query_embeddings=[query_embedding], n_results=5)

# Pinecone
import pinecone
pinecone.init(api_key="xxx")
index = pinecone.Index("docs")
index.upsert(vectors=[(id, embedding, metadata)])
results = index.query(vector=query_embedding, top_k=5)

# Qdrant
from qdrant_client import QdrantClient
client = QdrantClient("localhost", port=6333)
client.upsert(collection_name="docs", points=[...])
results = client.search(collection_name="docs", query_vector=query_embedding, limit=5)
```
VectorStores统一接口的优势:
```python
# LangChain VectorStores统一方式
from langchain_chroma import Chroma
from langchain_pinecone import Pinecone
from langchain_qdrant import Qdrant
from langchain_openai import OpenAIEmbeddings

# 统一的接口,可随时切换数据库
embeddings = OpenAIEmbeddings()

# 任选一种,接口完全一致
vectorstore = Chroma(embedding_function=embeddings)
# vectorstore = Pinecone(embedding=embeddings, index_name="docs")
# vectorstore = Qdrant(embeddings=embeddings, collection_name="docs")

# 统一的操作方式
vectorstore.add_documents(documents)
results = vectorstore.similarity_search(query, k=5)
```
### 1.3 RAG流程中的存储中枢地位
VectorStores处于RAG两条链路的交汇点:入库阶段,文档经加载、切分后由Embeddings向量化并写入向量存储;问答阶段,用户查询同样被向量化,由向量存储完成相似性检索,命中的文档作为上下文交给LLM生成回答。整体流程可概括为:文档加载 → 切分 → Embeddings向量化 → VectorStore存储 → Retriever检索 → LLM生成。
### 1.4 核心能力概述
VectorStores支持的关键操作:
- 文档管理:`add_documents()`、`delete()`、`get_by_ids()`
- 相似性检索:`similarity_search()`、`similarity_search_with_score()`
- 多样化检索:`max_marginal_relevance_search()`(MMR算法)
- 检索器集成:`as_retriever()` 转换为Retriever对象
- 异步支持:所有操作的异步版本
- 元数据过滤:基于文档元数据的条件检索

这些操作的统一用法可参考下面的最小示例。
## 2. 底层实现逻辑
### 2.1 核心接口定义
VectorStore抽象基类定义了统一的接口规范:
```python
# libs/core/langchain_core/vectorstores/base.py
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable, Optional

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings


class VectorStore(ABC):
    """向量存储的统一接口"""

    @abstractmethod
    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[Document]:
        """返回与查询最相似的文档"""

    @abstractmethod
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[list[dict]] = None,
        *,
        ids: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> list[str]:
        """添加文本到向量存储"""

    @classmethod
    @abstractmethod
    def from_texts(
        cls,
        texts: list[str],
        embedding: Embeddings,
        metadatas: Optional[list[dict]] = None,
        **kwargs: Any,
    ) -> "VectorStore":
        """从文本创建向量存储实例"""

    @property
    def embeddings(self) -> Optional[Embeddings]:
        """访问嵌入模型对象"""
        return None

    def as_retriever(self, **kwargs: Any) -> "VectorStoreRetriever":
        """转换为检索器对象(VectorStoreRetriever定义于同一模块)"""
        return VectorStoreRetriever(vectorstore=self, **kwargs)
```
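有了这套接口,适配一个新后端只需实现少数几个方法。下面是一个示意性的最小自定义适配器(玩具实现,用Python列表充当"数据库",打分用内积近似,仅用于说明接口要求,非生产实现):

```python
import uuid
from typing import Any, Iterable, Optional

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore


class ListVectorStore(VectorStore):
    """用Python列表充当存储后端的玩具实现"""

    def __init__(self, embedding: Embeddings):
        self._embedding = embedding
        self._rows: list[tuple[str, list[float], str, dict]] = []  # (id, 向量, 文本, 元数据)

    def add_texts(self, texts, metadatas=None, *, ids=None, **kwargs):
        texts = list(texts)
        ids = ids or [str(uuid.uuid4()) for _ in texts]
        vectors = self._embedding.embed_documents(texts)
        metadatas = metadatas or [{} for _ in texts]
        for i, t in enumerate(texts):
            self._rows.append((ids[i], vectors[i], t, metadatas[i]))
        return ids

    def similarity_search(self, query, k=4, **kwargs):
        qv = self._embedding.embed_query(query)
        # 按内积降序排序(近似相似度)
        scored = sorted(
            self._rows,
            key=lambda r: sum(a * b for a, b in zip(qv, r[1])),
            reverse=True,
        )
        return [Document(id=r[0], page_content=r[2], metadata=r[3]) for r in scored[:k]]

    @classmethod
    def from_texts(cls, texts, embedding, metadatas=None, **kwargs):
        store = cls(embedding)
        store.add_texts(texts, metadatas)
        return store
```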
### 2.2 多数据库适配原理
以Chroma和内存向量存储为例,展示适配机制:
#### 2.2.1 Chroma适配实现
```python
# libs/partners/chroma/langchain_chroma/vectorstores.py
import uuid
from typing import Any, Iterable, Optional

import chromadb

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore


class Chroma(VectorStore):
    """Chroma向量数据库适配器"""

    def __init__(
        self,
        collection_name: str = "langchain",
        embedding_function: Optional[Embeddings] = None,
        persist_directory: Optional[str] = None,
        client_settings: Optional[chromadb.config.Settings] = None,
        **kwargs,
    ):
        # 初始化Chroma客户端
        if persist_directory:
            self._client = chromadb.PersistentClient(path=persist_directory)
        else:
            self._client = chromadb.Client(settings=client_settings)
        self._embedding_function = embedding_function
        self._collection = self._client.get_or_create_collection(
            name=collection_name
        )

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[Document]:
        """实现Chroma的相似性搜索"""
        if self._embedding_function:
            # 使用LangChain嵌入模型
            query_embedding = self._embedding_function.embed_query(query)
            results = self._collection.query(
                query_embeddings=[query_embedding],
                n_results=k,
                **kwargs,
            )
        else:
            # 使用Chroma内置嵌入
            results = self._collection.query(
                query_texts=[query],
                n_results=k,
                **kwargs,
            )
        # 转换为LangChain Document格式
        return self._results_to_docs(results)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> list[str]:
        """添加文本到Chroma"""
        texts_list = list(texts)
        # 生成ID
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts_list]
        # 生成嵌入向量
        embeddings = None
        if self._embedding_function:
            embeddings = self._embedding_function.embed_documents(texts_list)
        # 存储到Chroma
        self._collection.upsert(
            documents=texts_list,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids,
        )
        return ids

    @classmethod
    def from_texts(
        cls,
        texts: list[str],
        embedding: Embeddings,
        metadatas: Optional[list[dict]] = None,
        **kwargs: Any,
    ) -> "Chroma":
        """从文本创建Chroma实例"""
        vectorstore = cls(embedding_function=embedding, **kwargs)
        vectorstore.add_texts(texts, metadatas)
        return vectorstore
```
#### 2.2.2 内存向量存储实现
```python
# libs/core/langchain_core/vectorstores/in_memory.py
import uuid
from typing import Any, Iterable, Optional

import numpy as np

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore


def cosine_similarity(x: list[list[float]], y: list[list[float]]) -> np.ndarray:
    """余弦相似度矩阵(简化实现,源码中位于vectorstores的工具模块)"""
    a, b = np.asarray(x, dtype=float), np.asarray(y, dtype=float)
    return (a @ b.T) / (
        np.linalg.norm(a, axis=1)[:, None] * np.linalg.norm(b, axis=1)[None, :]
    )


class InMemoryVectorStore(VectorStore):
    """内存向量存储实现"""

    def __init__(self, embedding: Embeddings):
        self.embedding = embedding
        self.store: dict[str, dict] = {}  # {id: {vector, text, metadata}}

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[Document]:
        """基于余弦相似度的搜索"""
        if not self.store:
            return []
        # 查询向量化
        query_embedding = self.embedding.embed_query(query)
        # 计算相似度
        similarities = []
        for doc_id, doc_data in self.store.items():
            similarity = cosine_similarity(
                [query_embedding], [doc_data["vector"]]
            )[0][0]
            similarities.append((doc_id, similarity))
        # 排序并返回top-k
        similarities.sort(key=lambda x: x[1], reverse=True)
        top_k = similarities[:k]
        results = []
        for doc_id, _ in top_k:
            doc_data = self.store[doc_id]
            results.append(
                Document(
                    id=doc_id,
                    page_content=doc_data["text"],
                    metadata=doc_data["metadata"],
                )
            )
        return results

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> list[str]:
        """添加文本到内存存储"""
        texts_list = list(texts)
        # 生成嵌入向量
        embeddings = self.embedding.embed_documents(texts_list)
        # 生成ID
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts_list]
        # 存储到内存
        for i, text in enumerate(texts_list):
            doc_id = ids[i]
            self.store[doc_id] = {
                "vector": embeddings[i],
                "text": text,
                "metadata": metadatas[i] if metadatas else {},
            }
        return ids
```
### 2.3 元数据管理逻辑
VectorStores通过Document对象统一管理文本内容和元数据:
```python
from langchain_core.documents import Document

# Document结构(核心字段):
#   page_content: str   文档内容
#   metadata: dict      元数据
#   id: Optional[str]   文档ID

# 元数据使用示例
documents = [
    Document(
        page_content="LangChain是一个用于构建LLM应用的框架",
        metadata={
            "source": "langchain_docs.pdf",
            "page": 1,
            "category": "framework",
            "author": "LangChain Team",
        },
    ),
    Document(
        page_content="向量数据库用于存储和检索嵌入向量",
        metadata={
            "source": "vector_db_guide.pdf",
            "page": 5,
            "category": "database",
            "difficulty": "intermediate",
        },
    ),
]

# 添加到向量存储
vectorstore.add_documents(documents)

# 基于元数据过滤检索
results = vectorstore.similarity_search(
    query="LangChain框架",
    k=5,
    filter={"category": "framework"},  # 只检索框架相关文档
)
```
### 2.4 相似性算法封装
VectorStores抽象了不同数据库的相似性计算:
```python
import math
from typing import Any, Callable


class VectorStore(ABC):  # 续2.1节的基类定义,此处仅展示评分相关方法
    """相似性评分函数封装"""

    @staticmethod
    def _cosine_relevance_score_fn(distance: float) -> float:
        """余弦距离转相似度评分"""
        return 1.0 - distance

    @staticmethod
    def _euclidean_relevance_score_fn(distance: float) -> float:
        """欧几里得距离转相似度评分"""
        return 1.0 - distance / math.sqrt(2)

    @staticmethod
    def _max_inner_product_relevance_score_fn(distance: float) -> float:
        """最大内积距离转相似度评分"""
        if distance > 0:
            return 1.0 - distance
        return -1.0 * distance

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """选择合适的相似度函数,子类根据底层数据库特性覆写"""
        raise NotImplementedError

    def similarity_search_with_relevance_scores(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[tuple[Document, float]]:
        """返回文档和标准化的相似度评分(0-1)"""
        relevance_score_fn = self._select_relevance_score_fn()
        docs_and_scores = self.similarity_search_with_score(query, k, **kwargs)
        return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores]
```
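直观感受一下两种转换函数的差异:下面的独立小示例(与上文公式一致,仅作演示)对同一条距离值0.2分别应用余弦与欧氏转换:

```python
import math

def cosine_score(distance: float) -> float:
    """余弦距离 -> 相似度评分"""
    return 1.0 - distance

def euclidean_score(distance: float) -> float:
    """欧几里得距离 -> 相似度评分"""
    return 1.0 - distance / math.sqrt(2)

for fn in (cosine_score, euclidean_score):
    print(fn.__name__, round(fn(0.2), 3))
# cosine_score 0.8
# euclidean_score 0.859
```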
---
## 3. 代码实践
### 3.1 基础实践1:多向量数据库统一操作
#### 安装依赖
```bash
# 核心包
pip install langchain-core langchain-openai
# 向量数据库
pip install langchain-chroma chromadb
pip install langchain-pinecone pinecone-client
```
#### Chroma vs Pinecone 统一接口对比
```python
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
import os

# 设置API密钥
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["PINECONE_API_KEY"] = "your-pinecone-api-key"

# 初始化嵌入模型
embeddings = OpenAIEmbeddings()

# 准备测试文档
documents = [
    Document(
        page_content="LangChain是一个强大的LLM应用开发框架",
        metadata={"source": "doc1", "category": "framework"},
    ),
    Document(
        page_content="向量数据库用于高效存储和检索嵌入向量",
        metadata={"source": "doc2", "category": "database"},
    ),
    Document(
        page_content="RAG结合了检索和生成,提升LLM的知识能力",
        metadata={"source": "doc3", "category": "technique"},
    ),
]

# 方案1:使用Chroma (本地存储)
from langchain_chroma import Chroma

print("=== Chroma向量存储 ===")
chroma_store = Chroma(
    collection_name="test_collection",
    embedding_function=embeddings,
    persist_directory="./chroma_db",  # 本地持久化
)

# 添加文档
chroma_ids = chroma_store.add_documents(documents)
print(f"Chroma添加文档ID: {chroma_ids}")

# 相似性搜索
chroma_results = chroma_store.similarity_search(
    query="LangChain框架的特点",
    k=2,
)
print("Chroma搜索结果:")
for i, doc in enumerate(chroma_results):
    print(f"  {i+1}. {doc.page_content}")
    print(f"     元数据: {doc.metadata}")

# 方案2:使用Pinecone (云端存储)
from langchain_pinecone import Pinecone

print("\n=== Pinecone向量存储 ===")
pinecone_store = Pinecone(
    index_name="test-index",  # 需要预先在Pinecone创建
    embedding=embeddings,
)

# 添加文档 (接口完全一致!)
pinecone_ids = pinecone_store.add_documents(documents)
print(f"Pinecone添加文档ID: {pinecone_ids}")

# 相似性搜索 (接口完全一致!)
pinecone_results = pinecone_store.similarity_search(
    query="LangChain框架的特点",
    k=2,
)
print("Pinecone搜索结果:")
for i, doc in enumerate(pinecone_results):
    print(f"  {i+1}. {doc.page_content}")
    print(f"     元数据: {doc.metadata}")

# 统一的切换函数
def create_vectorstore(store_type: str):
    """工厂函数:统一创建不同类型的向量存储"""
    if store_type == "chroma":
        return Chroma(
            collection_name="unified_collection",
            embedding_function=embeddings,
            persist_directory="./chroma_db",
        )
    elif store_type == "pinecone":
        return Pinecone(
            index_name="unified-index",
            embedding=embeddings,
        )
    else:
        raise ValueError(f"不支持的存储类型: {store_type}")

# 业务代码与具体数据库解耦
def search_documents(vectorstore, query: str, k: int = 3):
    """统一的文档搜索函数"""
    results = vectorstore.similarity_search(query, k=k)
    print(f"\n查询: '{query}'")
    print(f"找到 {len(results)} 个相关文档:")
    for i, doc in enumerate(results):
        print(f"  {i+1}. {doc.page_content[:50]}...")
        print(f"     来源: {doc.metadata.get('source', 'unknown')}")
    return results

# 可以随时切换数据库,业务逻辑不变
current_store = create_vectorstore("chroma")  # 或 "pinecone"
search_documents(current_store, "向量数据库的优势")
```
### 3.2 基础实践2:元数据关联与过滤检索
```python
from typing import Any, Dict

from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings

# 初始化
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    collection_name="filtered_search",
    embedding_function=embeddings,
    persist_directory="./filtered_db",
)

# 创建带有丰富元数据的文档
# 注意:Chroma的元数据只支持标量值(str/int/float/bool),因此tags用逗号分隔的字符串表示
documents = [
    Document(
        page_content="Python是一种高级编程语言,广泛用于数据科学和机器学习",
        metadata={
            "language": "python",
            "difficulty": "beginner",
            "category": "programming",
            "tags": "data-science,ml",
            "author": "张三",
            "publish_date": "2024-01-15",
        },
    ),
    Document(
        page_content="JavaScript是Web开发的核心语言,用于前端和后端开发",
        metadata={
            "language": "javascript",
            "difficulty": "intermediate",
            "category": "programming",
            "tags": "web,frontend",
            "author": "李四",
            "publish_date": "2024-02-20",
        },
    ),
    Document(
        page_content="Docker容器化技术简化了应用部署和管理",
        metadata={
            "language": "general",
            "difficulty": "advanced",
            "category": "devops",
            "tags": "container,deployment",
            "author": "王五",
            "publish_date": "2024-03-10",
        },
    ),
    Document(
        page_content="React是流行的前端框架,用于构建用户界面",
        metadata={
            "language": "javascript",
            "difficulty": "intermediate",
            "category": "framework",
            "tags": "frontend,ui",
            "author": "赵六",
            "publish_date": "2024-01-25",
        },
    ),
]

# 添加文档
vectorstore.add_documents(documents)

# 1. 基于单个字段过滤
print("=== 基于编程语言过滤 ===")
python_docs = vectorstore.similarity_search(
    query="编程语言特点",
    k=5,
    filter={"language": "python"},  # 只检索Python相关文档
)
for doc in python_docs:
    print(f"- {doc.page_content}")
    print(f"  语言: {doc.metadata['language']}")

# 2. 基于多个字段过滤(Chroma的多条件过滤需使用$and操作符)
print("\n=== 基于多条件过滤 ===")
intermediate_programming = vectorstore.similarity_search(
    query="开发技术",
    k=5,
    filter={
        "$and": [
            {"category": "programming"},
            {"difficulty": "intermediate"},
        ]
    },
)
for doc in intermediate_programming:
    print(f"- {doc.page_content}")
    print(f"  难度: {doc.metadata['difficulty']}, 分类: {doc.metadata['category']}")

# 3. 带评分的过滤检索
print("\n=== 带相似度评分的检索 ===")
results_with_scores = vectorstore.similarity_search_with_relevance_scores(
    query="前端开发技术",
    k=3,
    filter={"category": "programming"},
)
for doc, score in results_with_scores:
    print(f"- 相似度: {score:.3f}")
    print(f"  内容: {doc.page_content}")
    print(f"  标签: {doc.metadata.get('tags', '')}")

# 4. 复杂查询示例:组合多种条件
def advanced_search(
    vectorstore,
    query: str,
    filters: Dict[str, Any] = None,
    exclude_authors: list[str] = None,
    min_score: float = 0.0,
):
    """高级搜索函数"""
    # 基础检索(filters为空时传None,避免空字典触发Chroma报错)
    results = vectorstore.similarity_search_with_relevance_scores(
        query=query,
        k=10,  # 先获取更多结果
        filter=filters or None,
    )
    # 后处理过滤
    filtered_results = []
    for doc, score in results:
        # 评分过滤
        if score < min_score:
            continue
        # 作者过滤
        if exclude_authors and doc.metadata.get("author") in exclude_authors:
            continue
        filtered_results.append((doc, score))
    return filtered_results

# 使用高级搜索
print("\n=== 高级搜索示例 ===")
advanced_results = advanced_search(
    vectorstore=vectorstore,
    query="开发框架和工具",
    filters={"difficulty": "intermediate"},
    exclude_authors=["张三"],
    min_score=0.1,
)
for doc, score in advanced_results:
    print(f"- 评分: {score:.3f}, 作者: {doc.metadata['author']}")
    print(f"  {doc.page_content}")
### 3.3 进阶实践:VectorStores+Embeddings+Retriever联动
完整的RAG检索层实现:
```bash
# 安装额外依赖
pip install langchain langchain-community
```
```python
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os

# 设置API密钥
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"


class RAGSystem:
    """完整的RAG检索系统"""

    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        llm_model: str = "gpt-3.5-turbo",
        persist_directory: str = "./rag_db",
    ):
        # 初始化组件
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        # 初始化向量存储
        self.vectorstore = Chroma(
            collection_name="rag_collection",
            embedding_function=self.embeddings,
            persist_directory=persist_directory,
        )
        # 文档分割器
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?"],
        )
        # RAG提示模板
        self.rag_prompt = ChatPromptTemplate.from_template("""
基于以下检索到的相关文档回答问题。如果文档中没有相关信息,请说明无法从提供的文档中找到答案。

相关文档:
{context}

问题: {question}

请提供准确、详细的回答:
""")
        # 构建RAG链
        self.retriever = None
        self.rag_chain = None

    def add_documents(self, texts: list[str], metadatas: list[dict] = None):
        """添加文档到知识库"""
        # 分割长文档
        all_chunks = []
        all_metadatas = []
        for i, text in enumerate(texts):
            chunks = self.text_splitter.split_text(text)
            base_metadata = metadatas[i] if metadatas else {}
            for j, chunk in enumerate(chunks):
                chunk_metadata = {
                    **base_metadata,
                    "chunk_id": f"{i}_{j}",
                    "chunk_index": j,
                    "total_chunks": len(chunks),
                }
                all_chunks.append(chunk)
                all_metadatas.append(chunk_metadata)
        # 添加到向量存储
        doc_objects = [
            Document(page_content=chunk, metadata=metadata)
            for chunk, metadata in zip(all_chunks, all_metadatas)
        ]
        ids = self.vectorstore.add_documents(doc_objects)
        print(f"成功添加 {len(ids)} 个文档块到知识库")
        return ids

    def setup_retriever(
        self,
        search_type: str = "similarity",
        search_kwargs: dict = None,
    ):
        """配置检索器"""
        default_kwargs = {"k": 4}
        if search_kwargs:
            default_kwargs.update(search_kwargs)
        self.retriever = self.vectorstore.as_retriever(
            search_type=search_type,
            search_kwargs=default_kwargs,
        )

        # 构建RAG处理链
        def format_docs(docs):
            """格式化检索到的文档"""
            formatted = []
            for i, doc in enumerate(docs):
                source = doc.metadata.get("source", "未知来源")
                formatted.append(f"文档{i+1} (来源: {source}):\n{doc.page_content}")
            return "\n\n".join(formatted)

        self.rag_chain = (
            {
                "context": self.retriever | format_docs,
                "question": RunnablePassthrough(),
            }
            | self.rag_prompt
            | self.llm
            | StrOutputParser()
        )
        print(f"检索器配置完成: {search_type}, 参数: {default_kwargs}")

    def search(self, query: str, k: int = 4, with_scores: bool = False):
        """直接检索相关文档"""
        if with_scores:
            results = self.vectorstore.similarity_search_with_relevance_scores(
                query, k=k
            )
            return results
        else:
            results = self.vectorstore.similarity_search(query, k=k)
            return results

    def ask(self, question: str) -> str:
        """RAG问答"""
        if not self.rag_chain:
            raise ValueError("请先调用 setup_retriever() 配置检索器")
        # 执行RAG链
        answer = self.rag_chain.invoke(question)
        return answer

    def ask_with_sources(self, question: str) -> dict:
        """带来源信息的RAG问答"""
        # 检索相关文档
        retrieved_docs = self.retriever.invoke(question)
        # 生成回答
        answer = self.ask(question)
        # 整理来源信息
        sources = []
        for doc in retrieved_docs:
            source_info = {
                "content": doc.page_content[:200] + "...",
                "metadata": doc.metadata,
            }
            sources.append(source_info)
        return {
            "answer": answer,
            "sources": sources,
            "num_sources": len(sources),
        }


# 使用示例
def main():
    # 创建RAG系统
    rag = RAGSystem()

    # 准备知识库文档
    knowledge_docs = [
        """
        LangChain是一个用于构建大语言模型应用的开源框架。它提供了丰富的组件和工具,
        帮助开发者快速构建复杂的AI应用。LangChain的核心概念包括:
        1. Chains(链):将多个组件串联起来形成处理流程
        2. Agents(代理):能够使用工具并做出决策的智能体
        3. Memory(记忆):在对话中保持上下文信息
        4. Retrievers(检索器):从外部数据源检索相关信息
        5. VectorStores(向量存储):存储和检索嵌入向量
        LangChain支持多种大语言模型,包括OpenAI GPT、Anthropic Claude、
        Google PaLM等,并提供统一的接口进行调用。
        """,
        """
        向量数据库是专门用于存储和检索高维向量数据的数据库系统。在AI应用中,
        向量数据库主要用于:
        1. 语义搜索:基于文本语义而非关键词匹配进行搜索
        2. 推荐系统:根据用户偏好向量推荐相似内容
        3. 图像检索:通过图像特征向量找到相似图像
        4. 异常检测:识别与正常模式差异较大的数据点
        常见的向量数据库包括:
        - Chroma:轻量级的开源向量数据库
        - Pinecone:云端向量数据库服务
        - Qdrant:高性能的向量搜索引擎
        - Weaviate:开源的向量搜索引擎
        - FAISS:Facebook开源的相似性搜索库
        """,
        """
        RAG(Retrieval-Augmented Generation)是一种结合检索和生成的AI技术。
        RAG的工作流程包括:
        1. 文档预处理:将知识库文档分割成小块
        2. 向量化:使用嵌入模型将文档块转换为向量
        3. 存储:将向量存储到向量数据库中
        4. 检索:根据用户查询检索相关文档块
        5. 生成:将检索到的文档作为上下文,生成回答
        RAG的优势:
        - 知识更新:可以动态更新知识库而无需重训练模型
        - 可解释性:可以追溯回答的来源文档
        - 成本效益:比微调大模型成本更低
        - 准确性:基于真实文档生成回答,减少幻觉
        """,
    ]

    # 文档元数据
    metadatas = [
        {"source": "langchain_guide.md", "category": "framework", "author": "技术团队"},
        {"source": "vector_db_intro.md", "category": "database", "author": "数据团队"},
        {"source": "rag_tutorial.md", "category": "technique", "author": "AI团队"},
    ]

    # 添加文档到知识库
    print("=== 构建知识库 ===")
    rag.add_documents(knowledge_docs, metadatas)

    # 配置检索器
    print("\n=== 配置检索器 ===")
    # 1. 基础相似性检索
    rag.setup_retriever(
        search_type="similarity",
        search_kwargs={"k": 3},
    )

    # 测试直接检索
    print("\n=== 直接检索测试 ===")
    search_results = rag.search("什么是向量数据库", k=2, with_scores=True)
    for doc, score in search_results:
        print(f"相似度: {score:.3f}")
        print(f"内容: {doc.page_content[:100]}...")
        print(f"来源: {doc.metadata['source']}")
        print()

    # 测试RAG问答
    print("\n=== RAG问答测试 ===")
    questions = [
        "LangChain的核心概念有哪些?",
        "向量数据库主要用于什么场景?",
        "RAG技术的优势是什么?",
        "如何选择合适的向量数据库?",
    ]
    for question in questions:
        print(f"\n问题: {question}")
        print("-" * 50)
        # 带来源的回答
        result = rag.ask_with_sources(question)
        print(f"回答: {result['answer']}")
        print(f"\n参考来源 ({result['num_sources']} 个):")
        for i, source in enumerate(result["sources"]):
            print(f"  {i+1}. {source['metadata']['source']} "
                  f"(分类: {source['metadata']['category']})")
            print(f"     {source['content']}")


if __name__ == "__main__":
    main()
```
## 4. 设计考量
### 4.1 解耦与扩展性分析
接口抽象的价值:
```python
# 设计模式:策略模式 + 工厂模式
from typing import Protocol

from langchain_core.documents import Document
from langchain_chroma import Chroma
from langchain_pinecone import Pinecone
from langchain_qdrant import Qdrant


class VectorStoreStrategy(Protocol):
    """向量存储策略接口"""

    def add_documents(self, documents: list[Document]) -> list[str]:
        ...

    def similarity_search(self, query: str, k: int) -> list[Document]:
        ...


class RAGApplication:
    """RAG应用 - 依赖抽象而非具体实现"""

    def __init__(self, vectorstore: VectorStoreStrategy):
        self.vectorstore = vectorstore  # 依赖注入

    def process_query(self, query: str) -> str:
        # 业务逻辑与具体数据库解耦
        docs = self.vectorstore.similarity_search(query, k=3)
        return self.generate_answer(docs, query)

    def generate_answer(self, docs: list[Document], query: str) -> str:
        # 生成逻辑...
        pass


# 工厂函数:根据配置创建不同的向量存储
def create_vectorstore(config: dict) -> VectorStoreStrategy:
    store_type = config.get("type")
    if store_type == "chroma":
        return Chroma(**config.get("chroma_params", {}))
    elif store_type == "pinecone":
        return Pinecone(**config.get("pinecone_params", {}))
    elif store_type == "qdrant":
        return Qdrant(**config.get("qdrant_params", {}))
    else:
        raise ValueError(f"不支持的向量存储类型: {store_type}")


# 配置驱动的应用
config = {
    "type": "chroma",  # 可以轻松切换为 "pinecone" 或 "qdrant"
    "chroma_params": {
        "collection_name": "my_collection",
        "persist_directory": "./db",
    },
}
vectorstore = create_vectorstore(config)
app = RAGApplication(vectorstore)
```
扩展性体现:
- 新数据库支持:只需实现VectorStore接口,无需修改业务代码
- 功能增强:可以在基类中添加新方法,所有子类自动继承
- 配置灵活:通过配置文件控制使用哪种数据库
- 测试友好:可以使用内存实现进行单元测试(见下方示例)
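一个最小的单元测试示意(假设 RAGApplication 如上文定义;用 langchain_core 自带的 InMemoryVectorStore 与测试用的 DeterministicFakeEmbedding 代替真实数据库):

```python
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore


def test_rag_application_search():
    # 无需真实数据库,内存实现即可驱动业务逻辑
    store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=64))
    store.add_texts(["LangChain是LLM应用框架"])
    app = RAGApplication(store)
    docs = app.vectorstore.similarity_search("LangChain", k=1)
    assert len(docs) == 1
```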
### 4.2 开发者体验对比
传统方式 vs LangChain方式:
```python
from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore


# === 传统方式:直接使用数据库SDK ===
class TraditionalRAG:
    def __init__(self, db_type: str):
        if db_type == "chroma":
            import chromadb
            self.client = chromadb.Client()
            self.collection = self.client.create_collection("docs")
        elif db_type == "pinecone":
            import pinecone
            pinecone.init(api_key="xxx")
            self.index = pinecone.Index("docs")
        # 每种数据库都需要不同的初始化代码...

    def add_documents(self, texts: list[str], embeddings: list[list[float]]):
        if hasattr(self, 'collection'):  # Chroma
            self.collection.add(
                documents=texts,
                embeddings=embeddings,
                ids=[str(i) for i in range(len(texts))],
            )
        elif hasattr(self, 'index'):  # Pinecone
            vectors = [(str(i), emb, {"text": text})
                       for i, (text, emb) in enumerate(zip(texts, embeddings))]
            self.index.upsert(vectors=vectors)
        # 每种数据库都需要不同的添加逻辑...

    def search(self, query_embedding: list[float], k: int):
        if hasattr(self, 'collection'):  # Chroma
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=k,
            )
            return results['documents'][0]
        elif hasattr(self, 'index'):  # Pinecone
            results = self.index.query(
                vector=query_embedding,
                top_k=k,
                include_metadata=True,
            )
            return [match['metadata']['text'] for match in results['matches']]
        # 每种数据库都需要不同的搜索逻辑...


# === LangChain方式:统一接口 ===
class LangChainRAG:
    def __init__(self, vectorstore: VectorStore):
        self.vectorstore = vectorstore  # 统一接口

    def add_documents(self, documents: list[Document]):
        return self.vectorstore.add_documents(documents)  # 统一方法

    def search(self, query: str, k: int):
        return self.vectorstore.similarity_search(query, k=k)  # 统一方法


# 开发者体验对比
print("传统方式问题:")
print("1. 每种数据库需要学习不同的API")
print("2. 切换数据库需要重写大量代码")
print("3. 测试困难,需要搭建真实数据库环境")
print("4. 错误处理和重试逻辑需要重复实现")

print("\nLangChain方式优势:")
print("1. 学会一套API,适用所有支持的数据库")
print("2. 切换数据库只需修改配置")
print("3. 可以使用InMemoryVectorStore进行测试")
print("4. 统一的错误处理和最佳实践")
```
### 4.3 功能完整性解释
VectorStores提供了完整的向量数据库操作功能:
```python
from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore


# 完整功能矩阵
class VectorStoreFunctionality:
    """VectorStores功能完整性展示"""

    # 1. 数据管理
    def data_management_demo(self, vectorstore: VectorStore):
        """数据管理功能"""
        # 添加文档
        docs = [Document(page_content="test", metadata={"id": 1})]
        ids = vectorstore.add_documents(docs)
        # 批量添加文本
        texts = ["text1", "text2", "text3"]
        metadatas = [{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}]
        text_ids = vectorstore.add_texts(texts, metadatas=metadatas)
        # 获取文档
        retrieved_docs = vectorstore.get_by_ids(ids)
        # 删除文档
        vectorstore.delete(ids=ids[:1])
        return {
            "added_docs": len(ids),
            "added_texts": len(text_ids),
            "retrieved": len(retrieved_docs),
        }

    # 2. 搜索功能
    def search_functionality_demo(self, vectorstore: VectorStore):
        """搜索功能展示"""
        query = "测试查询"
        # 基础相似性搜索
        basic_results = vectorstore.similarity_search(query, k=3)
        # 带评分的搜索
        scored_results = vectorstore.similarity_search_with_score(query, k=3)
        # 带标准化评分的搜索
        relevance_results = vectorstore.similarity_search_with_relevance_scores(
            query, k=3
        )
        # 向量搜索
        embedding = [0.1] * 1536  # 假设的查询向量
        vector_results = vectorstore.similarity_search_by_vector(embedding, k=3)
        # MMR搜索(最大边际相关性)
        mmr_results = vectorstore.max_marginal_relevance_search(
            query, k=3, fetch_k=6, lambda_mult=0.5
        )
        return {
            "basic": len(basic_results),
            "scored": len(scored_results),
            "relevance": len(relevance_results),
            "vector": len(vector_results),
            "mmr": len(mmr_results),
        }

    # 3. 异步支持
    async def async_functionality_demo(self, vectorstore: VectorStore):
        """异步功能展示"""
        # 异步添加
        docs = [Document(page_content="async test")]
        await vectorstore.aadd_documents(docs)
        # 异步搜索
        results = await vectorstore.asimilarity_search("test query", k=3)
        # 异步删除
        await vectorstore.adelete(ids=["test_id"])
        return len(results)

    # 4. 检索器集成
    def retriever_integration_demo(self, vectorstore: VectorStore):
        """检索器集成展示"""
        # 转换为检索器
        retriever = vectorstore.as_retriever()
        # 配置检索器参数
        custom_retriever = vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={
                "k": 5,
                "fetch_k": 10,
                "lambda_mult": 0.7,
            },
        )
        # 阈值检索器
        threshold_retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "score_threshold": 0.5,
                "k": 3,
            },
        )
        return {
            "basic_retriever": type(retriever).__name__,
            "custom_retriever": custom_retriever.search_type,
            "threshold_retriever": threshold_retriever.search_kwargs,
        }
```
### 4.4 生态协同说明
VectorStores与LangChain生态的深度集成:
```python
# 生态协同示例
from langchain.chains import RetrievalQA
from langchain.memory import VectorStoreRetrieverMemory
from langchain.agents import Tool
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.vectorstores import VectorStore
from langchain_openai import ChatOpenAI


class EcosystemIntegration:
    """生态协同展示"""

    def __init__(self, vectorstore: VectorStore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI()

    def qa_chain_integration(self):
        """与QA链集成"""
        # 1. RetrievalQA链
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(),
        )
        # 2. 自定义QA链
        qa_prompt = ChatPromptTemplate.from_template("""
基于以下文档回答问题:
{context}
问题: {question}
""")
        custom_chain = (
            {
                "context": self.vectorstore.as_retriever(),
                "question": RunnablePassthrough(),
            }
            | qa_prompt
            | self.llm
            | StrOutputParser()
        )
        return qa_chain, custom_chain

    def memory_integration(self):
        """与记忆系统集成"""
        # VectorStore作为记忆存储
        memory = VectorStoreRetrieverMemory(
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3})
        )
        # 在对话中使用
        memory.save_context(
            {"input": "我喜欢Python编程"},
            {"output": "Python是一门很棒的语言"},
        )
        # 检索相关记忆
        relevant_memories = memory.load_memory_variables(
            {"input": "推荐一门编程语言"}
        )
        return memory, relevant_memories

    def agent_tool_integration(self):
        """与Agent工具集成"""
        # VectorStore作为Agent工具
        def search_knowledge_base(query: str) -> str:
            """搜索知识库工具"""
            docs = self.vectorstore.similarity_search(query, k=3)
            return "\n".join([doc.page_content for doc in docs])

        knowledge_tool = Tool(
            name="KnowledgeBase",
            description="搜索内部知识库获取相关信息",
            func=search_knowledge_base,
        )
        return knowledge_tool
```
## 5. 替代方案与优化空间
### 5.1 替代实现方案对比
#### 5.1.1 直接使用数据库SDK
优势:
- 性能最优,无抽象层开销
- 功能最全,可使用数据库特有功能
- 控制精细,可优化每个操作
劣势:
- 开发复杂度高
- 切换成本大
- 维护困难
```python
# 直接使用SDK的性能对比
import time

import chromadb
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings


def performance_comparison():
    """性能对比测试"""
    # 准备测试数据
    texts = [f"测试文档 {i}" for i in range(1000)]
    embeddings_model = OpenAIEmbeddings()
    embeddings = embeddings_model.embed_documents(texts)

    # 1. 直接使用Chroma SDK(复用上面预计算的嵌入)
    start_time = time.time()
    client = chromadb.Client()
    collection = client.create_collection("direct_test")
    collection.add(
        documents=texts,
        embeddings=embeddings,
        ids=[str(i) for i in range(len(texts))],
    )
    direct_time = time.time() - start_time

    # 2. 使用LangChain Chroma
    # 注意:add_texts会重新调用嵌入模型,此路径的计时包含嵌入耗时,对比时需留意
    start_time = time.time()
    vectorstore = Chroma(
        collection_name="langchain_test",
        embedding_function=embeddings_model,
    )
    vectorstore.add_texts(texts)
    langchain_time = time.time() - start_time

    print(f"直接SDK时间: {direct_time:.2f}s")
    print(f"LangChain时间: {langchain_time:.2f}s")
    print(f"性能差异: {(langchain_time/direct_time-1)*100:.1f}%")
```
#### 5.1.2 自定义抽象层
```python
# 自定义轻量级抽象
from abc import ABC, abstractmethod
from typing import List, Tuple


class SimpleVectorDB(ABC):
    """轻量级向量数据库抽象"""

    @abstractmethod
    def add(self, texts: List[str], vectors: List[List[float]]) -> List[str]:
        pass

    @abstractmethod
    def search(self, vector: List[float], k: int) -> List[Tuple[str, float]]:
        pass


class SimpleChroma(SimpleVectorDB):
    """简化的Chroma实现"""

    def __init__(self):
        import chromadb
        self.client = chromadb.Client()
        self.collection = self.client.create_collection("simple")

    def add(self, texts: List[str], vectors: List[List[float]]) -> List[str]:
        ids = [str(i) for i in range(len(texts))]
        self.collection.add(
            documents=texts,
            embeddings=vectors,
            ids=ids,
        )
        return ids

    def search(self, vector: List[float], k: int) -> List[Tuple[str, float]]:
        results = self.collection.query(
            query_embeddings=[vector],
            n_results=k,
        )
        docs = results['documents'][0]
        distances = results['distances'][0]
        return list(zip(docs, distances))

# 优势:更轻量,性能更好
# 劣势:功能有限,生态支持差
```
### 5.2 优化方向
#### 5.2.1 性能优化
```python
import hashlib
import time
from typing import List

from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore


class OptimizedVectorStore:
    """性能优化的向量存储"""

    def __init__(self, base_store: VectorStore):
        self.base_store = base_store
        self.cache = {}        # 查询缓存
        self.batch_size = 100  # 批处理大小

    def add_documents_batch(self, documents: List[Document]) -> List[str]:
        """批量添加文档优化"""
        all_ids = []
        # 分批处理,避免内存溢出
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            batch_ids = self.base_store.add_documents(batch)
            all_ids.extend(batch_ids)
        return all_ids

    def similarity_search_cached(
        self,
        query: str,
        k: int = 4,
        cache_ttl: int = 300,  # 缓存5分钟
    ) -> List[Document]:
        """带缓存的相似性搜索"""
        # 生成缓存键
        cache_key = hashlib.md5(f"{query}_{k}".encode()).hexdigest()
        # 检查缓存
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if time.time() - timestamp < cache_ttl:
                return cached_result
        # 执行搜索
        results = self.base_store.similarity_search(query, k=k)
        # 更新缓存
        self.cache[cache_key] = (results, time.time())
        return results
```
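一个简单的使用示意(假设以 langchain_core 自带的 InMemoryVectorStore 与 DeterministicFakeEmbedding 作为底层存储,仅用于演示缓存行为):

```python
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.vectorstores import InMemoryVectorStore

base = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=64))
base.add_texts(["文档A", "文档B"])

optimized = OptimizedVectorStore(base)
optimized.similarity_search_cached("文档", k=1)  # 第一次:真实查询并写入缓存
optimized.similarity_search_cached("文档", k=1)  # 第二次:TTL内直接命中缓存
```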
#### 5.2.2 功能优化
```python
from collections import defaultdict
from typing import List, Tuple

from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore


class EnhancedVectorStore:
    """功能增强的向量存储"""

    def __init__(self, base_store: VectorStore):
        self.base_store = base_store

    def hybrid_search(
        self,
        query: str,
        k: int = 4,
        alpha: float = 0.7,  # 向量搜索权重
    ) -> List[Document]:
        """混合搜索:向量搜索 + 关键词搜索"""
        # 向量搜索
        vector_results = self.base_store.similarity_search_with_relevance_scores(
            query, k=k * 2
        )
        # 关键词搜索(简单实现)
        keyword_results = self._keyword_search(query, k=k * 2)
        # 融合结果
        combined_results = self._combine_results(
            vector_results, keyword_results, alpha
        )
        return combined_results[:k]

    def _keyword_search(self, query: str, k: int) -> List[Tuple[Document, float]]:
        """关键词搜索的朴素实现:按查询词命中比例打分(实际可使用BM25等算法)"""
        terms = query.lower().split()
        candidates = self.base_store.similarity_search(query, k=k * 4)
        scored = []
        for doc in candidates:
            content = doc.page_content.lower()
            hits = sum(1 for t in terms if t in content)
            if hits:
                scored.append((doc, hits / len(terms)))
        scored.sort(key=lambda x: x[1], reverse=True)
        return scored[:k]

    def _combine_results(
        self,
        vector_results: List[Tuple[Document, float]],
        keyword_results: List[Tuple[Document, float]],
        alpha: float,
    ) -> List[Document]:
        """结果融合:按权重线性合并两路评分(也可改用RRF等融合算法)"""
        scores: dict = defaultdict(float)
        docs: dict = {}
        for doc, score in vector_results:
            key = doc.id or doc.page_content
            scores[key] += alpha * score
            docs[key] = doc
        for doc, score in keyword_results:
            key = doc.id or doc.page_content
            scores[key] += (1 - alpha) * score
            docs[key] = doc
        ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return [docs[key] for key, _ in ranked]

    def multi_vector_search(
        self,
        queries: List[str],
        k: int = 4,
        aggregation: str = "max",  # max, mean, sum
    ) -> List[Document]:
        """多查询向量搜索"""
        all_results = {}
        for query in queries:
            results = self.base_store.similarity_search_with_relevance_scores(
                query, k=k * len(queries)
            )
            for doc, score in results:
                doc_id = doc.metadata.get("id", id(doc))
                if doc_id not in all_results:
                    all_results[doc_id] = {"doc": doc, "scores": []}
                all_results[doc_id]["scores"].append(score)
        # 聚合评分
        final_results = []
        for doc_id, data in all_results.items():
            scores = data["scores"]
            if aggregation == "max":
                final_score = max(scores)
            elif aggregation == "mean":
                final_score = sum(scores) / len(scores)
            else:  # sum
                final_score = sum(scores)
            final_results.append((data["doc"], final_score))
        # 排序并返回
        final_results.sort(key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in final_results[:k]]
```
#### 5.2.3 成本优化
```python
import base64
import gzip
import hashlib
from typing import List

from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStore


class CostOptimizedVectorStore:
    """成本优化的向量存储"""

    def __init__(self, base_store: VectorStore):
        self.base_store = base_store
        self.embedding_cache = {}  # 嵌入缓存
        self.compression_enabled = True

    def add_documents_deduplicated(
        self,
        documents: List[Document],
    ) -> List[str]:
        """去重添加文档,避免重复存储"""
        # 计算文档哈希
        unique_docs = {}
        for doc in documents:
            doc_hash = hashlib.md5(doc.page_content.encode()).hexdigest()
            if doc_hash not in unique_docs:
                unique_docs[doc_hash] = doc
        # 只添加唯一文档
        unique_doc_list = list(unique_docs.values())
        print(f"去重后文档数量: {len(unique_doc_list)}/{len(documents)}")
        return self.base_store.add_documents(unique_doc_list)

    def compressed_storage(self, documents: List[Document]) -> List[str]:
        """压缩存储,减少存储成本
        注意:压缩后的内容会影响嵌入质量,实际应用中应在嵌入原文后另行存储压缩文本
        """
        if not self.compression_enabled:
            return self.base_store.add_documents(documents)
        # 压缩文档内容
        compressed_docs = []
        for doc in documents:
            compressed_content = self._compress_text(doc.page_content)
            compressed_doc = Document(
                page_content=compressed_content,
                metadata={**doc.metadata, "compressed": True},
            )
            compressed_docs.append(compressed_doc)
        return self.base_store.add_documents(compressed_docs)

    def _compress_text(self, text: str) -> str:
        """文本压缩"""
        compressed = gzip.compress(text.encode('utf-8'))
        return base64.b64encode(compressed).decode('ascii')

    def _decompress_text(self, compressed_text: str) -> str:
        """文本解压"""
        compressed = base64.b64decode(compressed_text.encode('ascii'))
        return gzip.decompress(compressed).decode('utf-8')
```
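压缩/解压工具方法的往返示意(仅演示上面两个私有辅助方法,base_store 传 None 即可,不涉及真实存储):

```python
store = CostOptimizedVectorStore(base_store=None)  # 仅用于调用压缩工具方法
text = "重复出现的长文本 " * 100
packed = store._compress_text(text)
assert store._decompress_text(packed) == text  # 无损往返
print(f"原始长度: {len(text)}, 压缩后长度: {len(packed)}")
```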
### 5.3 总结
VectorStores的核心价值:
- 统一抽象:为多种向量数据库提供一致接口
- 生态集成:与LangChain其他组件无缝协作
- 开发效率:大幅降低RAG应用开发复杂度
- 灵活切换:支持不同数据库间的平滑迁移
适用场景:
- 需要快速构建RAG应用的场景
- 对数据库选择有不确定性的项目
- 需要与LangChain生态深度集成的应用
- 团队技术栈相对简单的情况
不适用场景:
- 对性能有极致要求的高并发场景
- 需要使用数据库特有高级功能的情况
- 对抽象层开销敏感的应用
通过合理使用VectorStores,开发者可以快速构建功能完整、易于维护的RAG应用,同时保持足够的灵活性来应对未来的技术演进。