LangChain VectorStores Core: A Unified Interaction Layer for Multiple Vector Databases and the Storage Hub of RAG

Published: 2025-09-02


Contents

  1. Core Definition and Value
  2. Underlying Implementation Logic
  3. Code Practice
  4. Design Considerations
  5. Alternatives and Room for Optimization

1. Core Definition and Value

1.1 Essential Positioning

VectorStores is LangChain's unified abstraction over vector databases. As the central storage hub of RAG (Retrieval-Augmented Generation) systems, it exposes one consistent set of operations across otherwise very different databases.

Core value

  • Unified abstraction: one consistent API across 50+ vector databases
  • RAG hub: the core storage layer of retrieval-augmented generation systems
  • Simpler development: hides database-specific differences, lowering development complexity
  • Ecosystem synergy: integrates seamlessly with Embeddings and Retrievers

1.2 The Pain Points It Solves

The trouble with talking to vector databases directly

# Traditional approach: each database's own SDK
# Chroma
import chromadb
client = chromadb.Client()
collection = client.create_collection("docs")
collection.add(documents=texts, embeddings=embeddings, ids=ids)
results = collection.query(query_embeddings=[query_embedding], n_results=5)

# Pinecone
import pinecone
pinecone.init(api_key="xxx")
index = pinecone.Index("docs")
index.upsert(vectors=[(id, embedding, metadata)])
results = index.query(vector=query_embedding, top_k=5)

# Qdrant
from qdrant_client import QdrantClient
client = QdrantClient("localhost", port=6333)
client.upsert(collection_name="docs", points=[...])
results = client.search(collection_name="docs", query_vector=query_embedding, limit=5)

What the unified VectorStores interface buys you

# The unified LangChain VectorStores approach
from langchain_chroma import Chroma
from langchain_pinecone import Pinecone
from langchain_qdrant import Qdrant
from langchain_openai import OpenAIEmbeddings

# One interface; switch databases whenever you need to
embeddings = OpenAIEmbeddings()

# Pick any of these -- the interface is identical
vectorstore = Chroma(embedding_function=embeddings)
# vectorstore = Pinecone(embedding=embeddings, index_name="docs")
# vectorstore = Qdrant(embeddings=embeddings, collection_name="docs")

# Uniform operations
vectorstore.add_documents(documents)
results = vectorstore.similarity_search(query, k=5)

1.3 The Storage Hub of the RAG Pipeline

Indexing path: raw documents → text splitter (TextSplitter) → document chunks (Document Chunks) → embedding model (Embeddings) → vector embeddings → VectorStores (unified storage interface, backed by Chroma / Pinecone / Qdrant / FAISS / 50+ other databases)

Query path: user query → query embedding (Query Embedding) → VectorStores similarity search → retrieved documents (Retrieved Documents) → LLM answer generation (Answer Generation)
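The two paths above can be sketched as plain function composition. The following is an illustrative pure-Python stub: the splitter, embedding, and store here are toy stand-ins for intuition, not LangChain components.

```python
# Illustrative sketch of the two RAG paths around the vector store.
# Every component here is a stand-in stub, not a LangChain class.

def split(text: str, size: int = 20) -> list[str]:
    """Stub TextSplitter: fixed-size character chunks."""
    return [text[i:i + size] for i in range(0, len(text), size)]

def embed(chunk: str) -> list[float]:
    """Stub embedding: character-frequency vector over a tiny alphabet."""
    alphabet = "abcdefghijklmnopqrstuvwxyz "
    return [float(chunk.lower().count(c)) for c in alphabet]

store: list[tuple[list[float], str]] = []  # the "vector store"

def index(text: str) -> None:
    """Indexing path: split -> embed -> store."""
    for chunk in split(text):
        store.append((embed(chunk), chunk))

def retrieve(query: str, k: int = 2) -> list[str]:
    """Query path: embed query -> score against store -> top-k chunks."""
    q = embed(query)
    dot = lambda a, b: sum(x * y for x, y in zip(a, b))
    ranked = sorted(store, key=lambda item: dot(item[0], q), reverse=True)
    return [chunk for _, chunk in ranked[:k]]

index("vector stores keep embeddings. retrievers fetch relevant chunks for the LLM.")
print(retrieve("vector embeddings", k=1))
```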

1.4 Capability Overview

Key operations supported by VectorStores:

  1. Document management: add_documents(), delete(), get_by_ids()
  2. Similarity search: similarity_search(), similarity_search_with_score()
  3. Diversified retrieval: max_marginal_relevance_search() (the MMR algorithm)
  4. Retriever integration: as_retriever() converts the store into a Retriever object
  5. Async support: asynchronous variants of every operation
  6. Metadata filtering: conditional retrieval based on document metadata
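For intuition, max_marginal_relevance_search from item 3 can be reduced to the following pure-Python sketch of the MMR selection loop (cosine similarity over plain lists). This illustrates the algorithm only; it is not the langchain_core implementation.

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

def mmr(query: list[float], candidates: list[list[float]],
        k: int = 2, lambda_mult: float = 0.5) -> list[int]:
    """Return indices of k candidates balancing relevance and diversity.

    score(d) = lambda * sim(d, query) - (1 - lambda) * max sim(d, selected)
    """
    selected: list[int] = []
    while len(selected) < min(k, len(candidates)):
        best_idx, best_score = -1, -math.inf
        for i, cand in enumerate(candidates):
            if i in selected:
                continue
            relevance = cosine(cand, query)
            redundancy = max((cosine(cand, candidates[j]) for j in selected),
                             default=0.0)
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = i, score
        selected.append(best_idx)
    return selected

# Two identical vectors plus one distinct vector: pure similarity would
# return both duplicates; MMR takes one duplicate, then the distinct one.
docs = [[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]
print(mmr(query=[1.0, 0.1], candidates=docs, k=2))  # → [0, 2]
```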

2. Underlying Implementation Logic

2.1 Core Interface Definition

The VectorStore abstract base class defines the unified interface contract:

# libs/core/langchain_core/vectorstores/base.py
from abc import ABC, abstractmethod
from typing import Optional, Any, Callable, Iterable  # Iterable is needed by add_texts
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings

class VectorStore(ABC):
    """Unified interface for vector stores."""
    
    @abstractmethod
    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[Document]:
        """Return the documents most similar to the query."""
        
    @abstractmethod
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[list[dict]] = None,
        *,
        ids: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> list[str]:
        """Add texts to the vector store."""
        
    @classmethod
    @abstractmethod
    def from_texts(
        cls,
        texts: list[str],
        embedding: Embeddings,
        metadatas: Optional[list[dict]] = None,
        **kwargs: Any,
    ) -> "VectorStore":
        """Create a vector store instance from texts."""
        
    @property
    def embeddings(self) -> Optional[Embeddings]:
        """Access the underlying embeddings object."""
        return None
        
    def as_retriever(self, **kwargs: Any) -> "VectorStoreRetriever":
        """Convert the store into a retriever."""
        return VectorStoreRetriever(vectorstore=self, **kwargs)

2.2 How Multi-Database Adaptation Works

Chroma and the in-memory vector store illustrate the adapter mechanism:

2.2.1 The Chroma adapter
# libs/partners/chroma/langchain_chroma/vectorstores.py
import uuid
from typing import Any, Iterable, Optional

import chromadb
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

class Chroma(VectorStore):
    """Adapter for the Chroma vector database."""
    
    def __init__(
        self,
        collection_name: str = "langchain",
        embedding_function: Optional[Embeddings] = None,
        persist_directory: Optional[str] = None,
        client_settings: Optional[chromadb.config.Settings] = None,
        **kwargs
    ):
        # Initialize the Chroma client
        if persist_directory:
            self._client = chromadb.PersistentClient(path=persist_directory)
        else:
            self._client = chromadb.Client(settings=client_settings)
            
        self._embedding_function = embedding_function
        self._collection = self._client.get_or_create_collection(
            name=collection_name
        )
    
    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[Document]:
        """Similarity search implemented on top of Chroma."""
        if self._embedding_function:
            # Embed the query with the LangChain embedding model
            query_embedding = self._embedding_function.embed_query(query)
            results = self._collection.query(
                query_embeddings=[query_embedding],
                n_results=k,
                **kwargs
            )
        else:
            # Fall back to Chroma's built-in embedding
            results = self._collection.query(
                query_texts=[query],
                n_results=k,
                **kwargs
            )
        
        # Convert results into LangChain Document objects
        return self._results_to_docs(results)
    
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> list[str]:
        """Add texts to Chroma."""
        texts_list = list(texts)
        
        # Generate IDs
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts_list]
            
        # Compute embeddings
        embeddings = None
        if self._embedding_function:
            embeddings = self._embedding_function.embed_documents(texts_list)
            
        # Upsert into Chroma
        self._collection.upsert(
            documents=texts_list,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids
        )
        
        return ids
    
    @classmethod
    def from_texts(
        cls,
        texts: list[str],
        embedding: Embeddings,
        metadatas: Optional[list[dict]] = None,
        **kwargs: Any,
    ) -> "Chroma":
        """Create a Chroma instance from texts."""
        vectorstore = cls(embedding_function=embedding, **kwargs)
        vectorstore.add_texts(texts, metadatas)
        return vectorstore
2.2.2 The in-memory vector store
# libs/core/langchain_core/vectorstores/in_memory.py
import uuid
from typing import Any, Iterable, Optional

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_core.vectorstores.utils import cosine_similarity

class InMemoryVectorStore(VectorStore):
    """An in-memory vector store."""
    
    def __init__(self, embedding: Embeddings):
        self.embedding = embedding
        self.store: dict[str, dict] = {}  # {id: {vector, text, metadata}}
    
    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[Document]:
        """Search by cosine similarity."""
        if not self.store:
            return []
            
        # Embed the query
        query_embedding = self.embedding.embed_query(query)
        
        # Compute similarity against every stored vector
        similarities = []
        for doc_id, doc_data in self.store.items():
            similarity = cosine_similarity(
                [query_embedding], [doc_data["vector"]]
            )[0][0]
            similarities.append((doc_id, similarity))
        
        # Sort and keep the top k
        similarities.sort(key=lambda x: x[1], reverse=True)
        top_k = similarities[:k]
        
        results = []
        for doc_id, _ in top_k:
            doc_data = self.store[doc_id]
            results.append(Document(
                id=doc_id,
                page_content=doc_data["text"],
                metadata=doc_data["metadata"]
            ))
        
        return results
    
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[list[dict]] = None,
        ids: Optional[list[str]] = None,
        **kwargs: Any,
    ) -> list[str]:
        """Add texts to the in-memory store."""
        texts_list = list(texts)
        
        # Compute embeddings
        embeddings = self.embedding.embed_documents(texts_list)
        
        # Generate IDs
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts_list]
            
        # Store in memory
        for i, text in enumerate(texts_list):
            doc_id = ids[i]
            self.store[doc_id] = {
                "vector": embeddings[i],
                "text": text,
                "metadata": metadatas[i] if metadatas else {}
            }
        
        return ids

2.3 Metadata Management

VectorStores manages text content and metadata together through the Document object:

from langchain_core.documents import Document

# The Document structure
class Document:
    page_content: str      # document text
    metadata: dict        # metadata
    id: Optional[str]     # document ID

# Metadata usage example
documents = [
    Document(
        page_content="LangChain is a framework for building LLM applications",
        metadata={
            "source": "langchain_docs.pdf",
            "page": 1,
            "category": "framework",
            "author": "LangChain Team"
        }
    ),
    Document(
        page_content="Vector databases store and retrieve embedding vectors",
        metadata={
            "source": "vector_db_guide.pdf",
            "page": 5,
            "category": "database",
            "difficulty": "intermediate"
        }
    )
]

# Add to the vector store
vectorstore.add_documents(documents)

# Retrieval filtered by metadata
results = vectorstore.similarity_search(
    query="LangChain framework",
    k=5,
    filter={"category": "framework"}  # only retrieve framework-related documents
)

2.4 Similarity Algorithm Encapsulation

VectorStores abstracts away each database's similarity computation:

import math

class VectorStore(ABC):
    """Relevance scoring functions."""
    
    @staticmethod
    def _cosine_relevance_score_fn(distance: float) -> float:
        """Convert cosine distance into a relevance score."""
        return 1.0 - distance
    
    @staticmethod
    def _euclidean_relevance_score_fn(distance: float) -> float:
        """Convert Euclidean distance into a relevance score."""
        return 1.0 - distance / math.sqrt(2)
    
    @staticmethod
    def _max_inner_product_relevance_score_fn(distance: float) -> float:
        """Convert max-inner-product distance into a relevance score."""
        if distance > 0:
            return 1.0 - distance
        return -1.0 * distance
    
    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Select the relevance function for this store."""
        # Subclasses choose based on the underlying database's distance metric
        raise NotImplementedError
    
    def similarity_search_with_relevance_scores(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> list[tuple[Document, float]]:
        """Return documents with relevance scores normalized to [0, 1]."""
        relevance_score_fn = self._select_relevance_score_fn()
        docs_and_scores = self.similarity_search_with_score(query, k, **kwargs)
        return [(doc, relevance_score_fn(score)) for doc, score in docs_and_scores]
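To see what these conversions do numerically, here is a standalone pure-Python replay of the cosine and Euclidean functions above. The assertions mirror the common boundary cases; this is illustrative, not the library code itself.

```python
import math

def cosine_relevance(distance: float) -> float:
    # Mirrors _cosine_relevance_score_fn: cosine distance -> score
    return 1.0 - distance

def euclidean_relevance(distance: float) -> float:
    # Mirrors _euclidean_relevance_score_fn; for unit-normalized vectors with
    # non-negative similarity, the L2 distance lies in [0, sqrt(2)], hence
    # the sqrt(2) normalization
    return 1.0 - distance / math.sqrt(2)

# Identical vectors: cosine distance 0 -> perfect score 1.0
assert cosine_relevance(0.0) == 1.0
# Orthogonal unit vectors: cosine distance 1 -> score 0.0
assert cosine_relevance(1.0) == 0.0
# Orthogonal unit vectors: L2 distance sqrt(2) -> score 0.0
print(round(euclidean_relevance(math.sqrt(2)), 6))  # → 0.0
```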
3. Code Practice

3.1 Basic Practice 1: Unified Operations Across Vector Databases

Install dependencies

# Core packages
pip install langchain-core langchain-openai

# Vector databases
pip install langchain-chroma chromadb
pip install langchain-pinecone pinecone-client

Chroma vs. Pinecone: one interface for both
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
import os

# Set API keys
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["PINECONE_API_KEY"] = "your-pinecone-api-key"

# Initialize the embedding model
embeddings = OpenAIEmbeddings()

# Prepare test documents
documents = [
    Document(
        page_content="LangChain is a powerful framework for building LLM applications",
        metadata={"source": "doc1", "category": "framework"}
    ),
    Document(
        page_content="Vector databases store and retrieve embedding vectors efficiently",
        metadata={"source": "doc2", "category": "database"}
    ),
    Document(
        page_content="RAG combines retrieval with generation to extend an LLM's knowledge",
        metadata={"source": "doc3", "category": "technique"}
    )
]

# Option 1: Chroma (local storage)
from langchain_chroma import Chroma

print("=== Chroma vector store ===")
chroma_store = Chroma(
    collection_name="test_collection",
    embedding_function=embeddings,
    persist_directory="./chroma_db"  # local persistence
)

# Add documents
chroma_ids = chroma_store.add_documents(documents)
print(f"Chroma document IDs: {chroma_ids}")

# Similarity search
chroma_results = chroma_store.similarity_search(
    query="Features of the LangChain framework",
    k=2
)
print("Chroma search results:")
for i, doc in enumerate(chroma_results):
    print(f"  {i+1}. {doc.page_content}")
    print(f"     metadata: {doc.metadata}")

# Option 2: Pinecone (cloud storage)
from langchain_pinecone import Pinecone

print("\n=== Pinecone vector store ===")
pinecone_store = Pinecone(
    index_name="test-index",  # must be created in Pinecone beforehand
    embedding=embeddings
)

# Add documents (the interface is identical!)
pinecone_ids = pinecone_store.add_documents(documents)
print(f"Pinecone document IDs: {pinecone_ids}")

# Similarity search (the interface is identical!)
pinecone_results = pinecone_store.similarity_search(
    query="Features of the LangChain framework",
    k=2
)
print("Pinecone search results:")
for i, doc in enumerate(pinecone_results):
    print(f"  {i+1}. {doc.page_content}")
    print(f"     metadata: {doc.metadata}")

# A unified factory for switching stores
def create_vectorstore(store_type: str):
    """Factory: create any supported vector store behind one call."""
    if store_type == "chroma":
        return Chroma(
            collection_name="unified_collection",
            embedding_function=embeddings,
            persist_directory="./chroma_db"
        )
    elif store_type == "pinecone":
        return Pinecone(
            index_name="unified-index",
            embedding=embeddings
        )
    else:
        raise ValueError(f"Unsupported store type: {store_type}")

# Business code stays decoupled from the concrete database
def search_documents(vectorstore, query: str, k: int = 3):
    """Unified document search."""
    results = vectorstore.similarity_search(query, k=k)
    
    print(f"\nQuery: '{query}'")
    print(f"Found {len(results)} relevant documents:")
    
    for i, doc in enumerate(results):
        print(f"  {i+1}. {doc.page_content[:50]}...")
        print(f"     source: {doc.metadata.get('source', 'unknown')}")
    
    return results

# Swap databases at any time; the business logic does not change
current_store = create_vectorstore("chroma")  # or "pinecone"
search_documents(current_store, "Advantages of vector databases")

3.2 Basic Practice 2: Metadata Association and Filtered Retrieval

from langchain_core.documents import Document
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from typing import Dict, Any, Callable

# Initialization
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(
    collection_name="filtered_search",
    embedding_function=embeddings,
    persist_directory="./filtered_db"
)

# Documents with rich metadata
documents = [
    Document(
        page_content="Python is a high-level programming language widely used in data science and machine learning",
        metadata={
            "language": "python",
            "difficulty": "beginner",
            "category": "programming",
            "tags": ["data-science", "ml"],
            "author": "Zhang San",
            "publish_date": "2024-01-15"
        }
    ),
    Document(
        page_content="JavaScript is the core language of web development, used for both frontend and backend work",
        metadata={
            "language": "javascript",
            "difficulty": "intermediate",
            "category": "programming",
            "tags": ["web", "frontend"],
            "author": "Li Si",
            "publish_date": "2024-02-20"
        }
    ),
    Document(
        page_content="Docker containerization simplifies application deployment and management",
        metadata={
            "language": "general",
            "difficulty": "advanced",
            "category": "devops",
            "tags": ["container", "deployment"],
            "author": "Wang Wu",
            "publish_date": "2024-03-10"
        }
    ),
    Document(
        page_content="React is a popular frontend framework for building user interfaces",
        metadata={
            "language": "javascript",
            "difficulty": "intermediate",
            "category": "framework",
            "tags": ["frontend", "ui"],
            "author": "Zhao Liu",
            "publish_date": "2024-01-25"
        }
    )
]

# Add the documents
vectorstore.add_documents(documents)

# 1. Filter on a single field
print("=== Filter by programming language ===")
python_docs = vectorstore.similarity_search(
    query="characteristics of programming languages",
    k=5,
    filter={"language": "python"}  # only Python-related documents
)

for doc in python_docs:
    print(f"- {doc.page_content}")
    print(f"  language: {doc.metadata['language']}")

# 2. Filter on multiple fields
print("\n=== Multi-condition filtering ===")
intermediate_programming = vectorstore.similarity_search(
    query="development technologies",
    k=5,
    filter={
        "category": "programming",
        "difficulty": "intermediate"
    }
)

for doc in intermediate_programming:
    print(f"- {doc.page_content}")
    print(f"  difficulty: {doc.metadata['difficulty']}, category: {doc.metadata['category']}")

# 3. Filtered retrieval with scores
print("\n=== Retrieval with relevance scores ===")
results_with_scores = vectorstore.similarity_search_with_relevance_scores(
    query="frontend development technologies",
    k=3,
    filter={"category": "programming"}
)

for doc, score in results_with_scores:
    print(f"- relevance: {score:.3f}")
    print(f"  content: {doc.page_content}")
    print(f"  tags: {doc.metadata.get('tags', [])}")

# 4. A more complex query: combining several conditions
def advanced_search(
    vectorstore,
    query: str,
    filters: Dict[str, Any] = None,
    exclude_authors: list[str] = None,
    min_score: float = 0.0
):
    """Advanced search with post-filtering."""
    
    # Base retrieval
    results = vectorstore.similarity_search_with_relevance_scores(
        query=query,
        k=10,  # fetch more results first
        filter=filters or {}
    )
    
    # Post-filtering
    filtered_results = []
    for doc, score in results:
        # Score filter
        if score < min_score:
            continue
            
        # Author filter
        if exclude_authors and doc.metadata.get("author") in exclude_authors:
            continue
            
        filtered_results.append((doc, score))
    
    return filtered_results

# Using the advanced search
print("\n=== Advanced search example ===")
advanced_results = advanced_search(
    vectorstore=vectorstore,
    query="development frameworks and tools",
    filters={"difficulty": "intermediate"},
    exclude_authors=["Zhang San"],
    min_score=0.1
)

for doc, score in advanced_results:
    print(f"- score: {score:.3f}, author: {doc.metadata['author']}")
    print(f"  {doc.page_content}")

3.3 Advanced Practice: VectorStores + Embeddings + Retriever Working Together

A complete RAG retrieval layer:

# Extra dependencies: pip install langchain langchain-community
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os

# Set the API key
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"

class RAGSystem:
    """A complete RAG retrieval system."""
    
    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        llm_model: str = "gpt-3.5-turbo",
        persist_directory: str = "./rag_db"
    ):
        # Initialize components
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        
        # Initialize the vector store
        self.vectorstore = Chroma(
            collection_name="rag_collection",
            embedding_function=self.embeddings,
            persist_directory=persist_directory
        )
        
        # Document splitter
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?"]
        )
        
        # RAG prompt template
        self.rag_prompt = ChatPromptTemplate.from_template("""
Answer the question using the retrieved documents below. If the documents contain no relevant information, say that the answer cannot be found in the provided documents.

Relevant documents:
{context}

Question: {question}

Provide an accurate, detailed answer:
""")
        
        # The RAG chain is built later
        self.retriever = None
        self.rag_chain = None
    
    def add_documents(self, texts: list[str], metadatas: list[dict] = None):
        """Add documents to the knowledge base."""
        
        # Split long documents into chunks
        all_chunks = []
        all_metadatas = []
        
        for i, text in enumerate(texts):
            chunks = self.text_splitter.split_text(text)
            base_metadata = metadatas[i] if metadatas else {}
            
            for j, chunk in enumerate(chunks):
                chunk_metadata = {
                    **base_metadata,
                    "chunk_id": f"{i}_{j}",
                    "chunk_index": j,
                    "total_chunks": len(chunks)
                }
                all_chunks.append(chunk)
                all_metadatas.append(chunk_metadata)
        
        # Add to the vector store
        doc_objects = [
            Document(page_content=chunk, metadata=metadata)
            for chunk, metadata in zip(all_chunks, all_metadatas)
        ]
        
        ids = self.vectorstore.add_documents(doc_objects)
        print(f"Added {len(ids)} document chunks to the knowledge base")
        
        return ids
    
    def setup_retriever(
        self,
        search_type: str = "similarity",
        search_kwargs: dict = None
    ):
        """Configure the retriever."""
        
        default_kwargs = {"k": 4}
        if search_kwargs:
            default_kwargs.update(search_kwargs)
        
        self.retriever = self.vectorstore.as_retriever(
            search_type=search_type,
            search_kwargs=default_kwargs
        )
        
        # Build the RAG processing chain
        def format_docs(docs):
            """Format the retrieved documents."""
            formatted = []
            for i, doc in enumerate(docs):
                source = doc.metadata.get("source", "unknown source")
                formatted.append(f"Document {i+1} (source: {source}):\n{doc.page_content}")
            return "\n\n".join(formatted)
        
        self.rag_chain = (
            {
                "context": self.retriever | format_docs,
                "question": RunnablePassthrough()
            }
            | self.rag_prompt
            | self.llm
            | StrOutputParser()
        )
        
        print(f"Retriever configured: {search_type}, params: {default_kwargs}")
    
    def search(self, query: str, k: int = 4, with_scores: bool = False):
        """Retrieve relevant documents directly."""
        
        if with_scores:
            results = self.vectorstore.similarity_search_with_relevance_scores(
                query, k=k
            )
            return results
        else:
            results = self.vectorstore.similarity_search(query, k=k)
            return results
    
    def ask(self, question: str) -> str:
        """RAG question answering."""
        
        if not self.rag_chain:
            raise ValueError("Call setup_retriever() to configure the retriever first")
        
        # Run the RAG chain
        answer = self.rag_chain.invoke(question)
        return answer
    
    def ask_with_sources(self, question: str) -> dict:
        """RAG question answering with source information."""
        
        # Retrieve the relevant documents
        retrieved_docs = self.retriever.invoke(question)
        
        # Generate the answer
        answer = self.ask(question)
        
        # Collect source information
        sources = []
        for doc in retrieved_docs:
            source_info = {
                "content": doc.page_content[:200] + "...",
                "metadata": doc.metadata
            }
            sources.append(source_info)
        
        return {
            "answer": answer,
            "sources": sources,
            "num_sources": len(sources)
        }

# Usage example
def main():
    # Create the RAG system
    rag = RAGSystem()
    
    # Prepare the knowledge-base documents
    knowledge_docs = [
        """
        LangChain is an open-source framework for building large language model
        applications. It offers a rich set of components and tools that help
        developers assemble complex AI applications quickly. Its core concepts include:
        
        1. Chains: link multiple components into a processing pipeline
        2. Agents: decision-making entities that can use tools
        3. Memory: keep conversational context across turns
        4. Retrievers: fetch relevant information from external data sources
        5. VectorStores: store and retrieve embedding vectors
        
        LangChain supports many large language models, including OpenAI GPT,
        Anthropic Claude, and Google PaLM, all behind a unified calling interface.
        """,
        
        """
        A vector database is a database system specialized in storing and
        retrieving high-dimensional vector data. In AI applications, vector
        databases are mainly used for:
        
        1. Semantic search: search by meaning rather than keyword matching
        2. Recommendation systems: recommend similar content from user preference vectors
        3. Image retrieval: find similar images through image feature vectors
        4. Anomaly detection: spot data points that deviate from normal patterns
        
        Common vector databases include:
        - Chroma: a lightweight open-source vector database
        - Pinecone: a managed cloud vector database service
        - Qdrant: a high-performance vector search engine
        - Weaviate: an open-source vector search engine
        - FAISS: Facebook's open-source similarity search library
        """,
        
        """
        RAG (Retrieval-Augmented Generation) combines retrieval with generation.
        The RAG workflow consists of:
        
        1. Document preprocessing: split knowledge-base documents into chunks
        2. Vectorization: convert the chunks to vectors with an embedding model
        3. Storage: write the vectors into a vector database
        4. Retrieval: fetch the chunks relevant to a user query
        5. Generation: use the retrieved chunks as context to generate the answer
        
        Advantages of RAG:
        - Knowledge updates: refresh the knowledge base without retraining the model
        - Explainability: answers can be traced back to their source documents
        - Cost efficiency: far cheaper than fine-tuning a large model
        - Accuracy: answers are grounded in real documents, reducing hallucination
        """
    ]
    
    # Document metadata
    metadatas = [
        {"source": "langchain_guide.md", "category": "framework", "author": "Tech Team"},
        {"source": "vector_db_intro.md", "category": "database", "author": "Data Team"},
        {"source": "rag_tutorial.md", "category": "technique", "author": "AI Team"}
    ]
    
    # Add the documents to the knowledge base
    print("=== Building the knowledge base ===")
    rag.add_documents(knowledge_docs, metadatas)
    
    # Configure different retriever types
    print("\n=== Configuring the retriever ===")
    
    # 1. Basic similarity retrieval
    rag.setup_retriever(
        search_type="similarity",
        search_kwargs={"k": 3}
    )
    
    # Test direct retrieval
    print("\n=== Direct retrieval test ===")
    search_results = rag.search("What is a vector database", k=2, with_scores=True)
    for doc, score in search_results:
        print(f"relevance: {score:.3f}")
        print(f"content: {doc.page_content[:100]}...")
        print(f"source: {doc.metadata['source']}")
        print()
    
    # Test RAG question answering
    print("\n=== RAG QA test ===")
    
    questions = [
        "What are LangChain's core concepts?",
        "What scenarios are vector databases mainly used in?",
        "What are the advantages of RAG?",
        "How do I choose a suitable vector database?"
    ]
    
    for question in questions:
        print(f"\nQuestion: {question}")
        print("-" * 50)
        
        # Answer with sources
        result = rag.ask_with_sources(question)
        print(f"Answer: {result['answer']}")
        print(f"\nSources ({result['num_sources']}):")
        
        for i, source in enumerate(result['sources']):
            print(f"  {i+1}. {source['metadata']['source']} "
                  f"(category: {source['metadata']['category']})")
            print(f"     {source['content']}")

if __name__ == "__main__":
    main()

4. Design Considerations

4.1 Decoupling and Extensibility

The value of interface abstraction

# Design pattern: Strategy + Factory
from abc import ABC, abstractmethod
from typing import Protocol

class VectorStoreStrategy(Protocol):
    """Vector store strategy interface."""
    
    def add_documents(self, documents: list[Document]) -> list[str]:
        ...
    
    def similarity_search(self, query: str, k: int) -> list[Document]:
        ...

class RAGApplication:
    """A RAG application that depends on the abstraction, not a concrete store."""
    
    def __init__(self, vectorstore: VectorStoreStrategy):
        self.vectorstore = vectorstore  # dependency injection
    
    def process_query(self, query: str) -> str:
        # Business logic is decoupled from the concrete database
        docs = self.vectorstore.similarity_search(query, k=3)
        return self.generate_answer(docs, query)
    
    def generate_answer(self, docs: list[Document], query: str) -> str:
        # Generation logic...
        pass

# Factory: build different vector stores from configuration
def create_vectorstore(config: dict) -> VectorStoreStrategy:
    store_type = config.get("type")
    
    if store_type == "chroma":
        return Chroma(**config.get("chroma_params", {}))
    elif store_type == "pinecone":
        return Pinecone(**config.get("pinecone_params", {}))
    elif store_type == "qdrant":
        return Qdrant(**config.get("qdrant_params", {}))
    else:
        raise ValueError(f"Unsupported vector store type: {store_type}")

# A configuration-driven application
config = {
    "type": "chroma",  # switch to "pinecone" or "qdrant" by editing this value
    "chroma_params": {
        "collection_name": "my_collection",
        "persist_directory": "./db"
    }
}

vectorstore = create_vectorstore(config)
app = RAGApplication(vectorstore)

How extensibility shows up

  1. New database support: implement the VectorStore interface; business code stays untouched
  2. Feature growth: methods added to the base class are inherited by every subclass
  3. Flexible configuration: select the database through a configuration file
  4. Test-friendliness: unit tests can run against the in-memory implementation
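Point 1 can be made concrete with a duck-typed sketch: any object exposing the same method names slots into VectorStore-shaped application code. Everything below (ToyDocument, ToyVectorStore, the character-count embedding) is a hypothetical pure-Python illustration of the adapter shape, not a real langchain_core subclass.

```python
# Shape of a new adapter: implement the interface methods and application
# code written against the abstraction can use it unchanged.

class ToyDocument:
    def __init__(self, page_content: str, metadata: dict = None):
        self.page_content = page_content
        self.metadata = metadata or {}

class ToyVectorStore:
    """Minimal stand-in implementing the two methods an app needs."""

    def __init__(self):
        self._docs = []  # list of (vector, ToyDocument) pairs

    def _embed(self, text: str) -> list[float]:
        # Toy embedding: counts of a few characters
        return [float(text.count(c)) for c in "aeiou "]

    def add_documents(self, documents: list) -> list[str]:
        for doc in documents:
            self._docs.append((self._embed(doc.page_content), doc))
        return [str(i) for i in range(len(documents))]

    def similarity_search(self, query: str, k: int = 4) -> list:
        q = self._embed(query)
        dot = lambda a, b: sum(x * y for x, y in zip(a, b))
        ranked = sorted(self._docs, key=lambda p: dot(p[0], q), reverse=True)
        return [doc for _, doc in ranked[:k]]

store = ToyVectorStore()
store.add_documents([ToyDocument("a document about oceans"),
                     ToyDocument("xyz")])
print([d.page_content for d in store.similarity_search("ocean", k=1)])  # → ['a document about oceans']
```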

4.2 Developer Experience Compared

Traditional approach vs. the LangChain approach

# === Traditional approach: use database SDKs directly ===
class TraditionalRAG:
    def __init__(self, db_type: str):
        if db_type == "chroma":
            import chromadb
            self.client = chromadb.Client()
            self.collection = self.client.create_collection("docs")
        elif db_type == "pinecone":
            import pinecone
            pinecone.init(api_key="xxx")
            self.index = pinecone.Index("docs")
        # Every database needs its own initialization code...
    
    def add_documents(self, texts: list[str], embeddings: list[list[float]]):
        if hasattr(self, 'collection'):  # Chroma
            self.collection.add(
                documents=texts,
                embeddings=embeddings,
                ids=[str(i) for i in range(len(texts))]
            )
        elif hasattr(self, 'index'):  # Pinecone
            vectors = [(str(i), emb, {"text": text})
                      for i, (text, emb) in enumerate(zip(texts, embeddings))]
            self.index.upsert(vectors=vectors)
        # Every database needs its own insertion logic...
    
    def search(self, query_embedding: list[float], k: int):
        if hasattr(self, 'collection'):  # Chroma
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=k
            )
            return results['documents'][0]
        elif hasattr(self, 'index'):  # Pinecone
            results = self.index.query(
                vector=query_embedding,
                top_k=k,
                include_metadata=True
            )
            return [match['metadata']['text'] for match in results['matches']]
        # Every database needs its own search logic...

# === LangChain approach: one unified interface ===
class LangChainRAG:
    def __init__(self, vectorstore: VectorStore):
        self.vectorstore = vectorstore  # unified interface
    
    def add_documents(self, documents: list[Document]):
        return self.vectorstore.add_documents(documents)  # unified method
    
    def search(self, query: str, k: int):
        return self.vectorstore.similarity_search(query, k=k)  # unified method

# Developer experience, side by side
print("Problems with the traditional approach:")
print("1. A different API to learn for every database")
print("2. Switching databases means rewriting large amounts of code")
print("3. Hard to test: a real database environment is required")
print("4. Error handling and retry logic duplicated everywhere")

print("\nAdvantages of the LangChain approach:")
print("1. Learn one API, use any supported database")
print("2. Switching databases is a configuration change")
print("3. InMemoryVectorStore makes unit testing easy")
print("4. Unified error handling and built-in best practices")
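The testability point extends beyond InMemoryVectorStore: because business code depends only on the interface, a unit test can inject any fake with canned results, needing no database and no API key. FakeStore, FakeDoc, and answer_from_docs below are hypothetical names used only for this illustration.

```python
# A unit-test fake: returns canned documents, ignoring the query entirely.

class FakeDoc:
    def __init__(self, page_content: str):
        self.page_content = page_content

class FakeStore:
    """Implements just similarity_search, with fixed results."""
    def __init__(self, canned: list[str]):
        self._canned = [FakeDoc(t) for t in canned]

    def similarity_search(self, query: str, k: int = 4):
        return self._canned[:k]

def answer_from_docs(store, query: str) -> str:
    """Business code under test: only needs the interface, not a real DB."""
    docs = store.similarity_search(query, k=2)
    return " | ".join(d.page_content for d in docs)

fake = FakeStore(["chunk one", "chunk two", "chunk three"])
print(answer_from_docs(fake, "anything"))  # → chunk one | chunk two
```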

4.3 Functional Completeness

VectorStores covers the complete set of vector-database operations:

# 完整功能矩阵
class VectorStoreFunctionality:
    """VectorStores功能完整性展示"""
    
    # 1. 数据管理
    def data_management_demo(self, vectorstore: VectorStore):
        """数据管理功能"""
        
        # 添加文档
        docs = [Document(page_content="test", metadata={"id": 1})]
        ids = vectorstore.add_documents(docs)
        
        # 批量添加文本
        texts = ["text1", "text2", "text3"]
        metadatas = [{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}]
        text_ids = vectorstore.add_texts(texts, metadatas=metadatas)
        
        # 获取文档
        retrieved_docs = vectorstore.get_by_ids(ids)
        
        # 删除文档
        vectorstore.delete(ids=ids[:1])
        
        return {
            "added_docs": len(ids),
            "added_texts": len(text_ids),
            "retrieved": len(retrieved_docs)
        }
    
    # 2. Search functionality
    def search_functionality_demo(self, vectorstore: VectorStore):
        """Demonstrate the search APIs"""
        
        query = "test query"
        
        # Basic similarity search
        basic_results = vectorstore.similarity_search(query, k=3)
        
        # Search with raw similarity scores
        scored_results = vectorstore.similarity_search_with_score(query, k=3)
        
        # Search with normalized relevance scores
        relevance_results = vectorstore.similarity_search_with_relevance_scores(
            query, k=3
        )
        
        # Search by raw embedding vector
        embedding = [0.1] * 1536  # dummy query vector (assumes 1536-dim embeddings)
        vector_results = vectorstore.similarity_search_by_vector(embedding, k=3)
        
        # MMR search (Maximal Marginal Relevance)
        mmr_results = vectorstore.max_marginal_relevance_search(
            query, k=3, fetch_k=6, lambda_mult=0.5
        )
        
        return {
            "basic": len(basic_results),
            "scored": len(scored_results),
            "relevance": len(relevance_results),
            "vector": len(vector_results),
            "mmr": len(mmr_results)
        }
    
    # 3. Async support
    async def async_functionality_demo(self, vectorstore: VectorStore):
        """Demonstrate the async APIs"""
        
        # Async add
        docs = [Document(page_content="async test")]
        await vectorstore.aadd_documents(docs)
        
        # Async search
        results = await vectorstore.asimilarity_search("test query", k=3)
        
        # Async delete
        await vectorstore.adelete(ids=["test_id"])
        
        return len(results)
    
    # 4. Retriever integration
    def retriever_integration_demo(self, vectorstore: VectorStore):
        """Demonstrate retriever integration"""
        
        # Convert the store into a retriever
        retriever = vectorstore.as_retriever()
        
        # Configure retriever parameters
        custom_retriever = vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={
                "k": 5,
                "fetch_k": 10,
                "lambda_mult": 0.7
            }
        )
        
        # Score-threshold retriever
        threshold_retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={
                "score_threshold": 0.5,
                "k": 3
            }
        )
        
        return {
            "basic_retriever": type(retriever).__name__,
            "custom_retriever": custom_retriever.search_type,
            "threshold_retriever": threshold_retriever.search_kwargs
        }
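
The MMR search used above (`max_marginal_relevance_search`) trades relevance off against diversity. To make that step less of a black box, here is a minimal, dependency-free sketch of the algorithm; all names are illustrative and this is not LangChain's actual implementation, which runs on the `fetch_k` nearest candidates returned by the index:

```python
import math

def cosine(a, b):
    """Cosine similarity between two plain-list vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def mmr(query_vec, candidate_vecs, k=3, lambda_mult=0.5):
    """Return indices of k candidates balancing relevance and diversity."""
    selected = []
    remaining = list(range(len(candidate_vecs)))
    while remaining and len(selected) < k:
        best_idx, best_score = None, -float("inf")
        for i in remaining:
            relevance = cosine(query_vec, candidate_vecs[i])
            # Penalty: similarity to the most similar already-selected candidate
            redundancy = max(
                (cosine(candidate_vecs[i], candidate_vecs[j]) for j in selected),
                default=0.0,
            )
            score = lambda_mult * relevance - (1 - lambda_mult) * redundancy
            if score > best_score:
                best_idx, best_score = i, score
        selected.append(best_idx)
        remaining.remove(best_idx)
    return selected
```

With `lambda_mult` close to 1 the ranking approaches plain similarity search; lower values penalize candidates that resemble those already selected, which is why the second pick may skip a near-duplicate of the first.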

4.4 Ecosystem Integration

VectorStores integrates deeply with the rest of the LangChain ecosystem:

# Ecosystem integration example
from langchain.chains import RetrievalQA
from langchain.memory import VectorStoreRetrieverMemory
from langchain.agents import Tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

class EcosystemIntegration:
    """Ecosystem integration demos"""
    
    def __init__(self, vectorstore: VectorStore):
        self.vectorstore = vectorstore
        self.llm = ChatOpenAI()
    
    def qa_chain_integration(self):
        """Integration with QA chains"""
        
        # 1. RetrievalQA chain
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever()
        )
        
        # 2. Custom QA chain (LCEL)
        qa_prompt = ChatPromptTemplate.from_template("""
        Answer the question based on the following documents:
        {context}
        
        Question: {question}
        """)
        
        custom_chain = (
            {
                "context": self.vectorstore.as_retriever(),
                "question": RunnablePassthrough()
            }
            | qa_prompt
            | self.llm
            | StrOutputParser()
        )
        
        return qa_chain, custom_chain
    
    def memory_integration(self):
        """Integration with the memory system"""
        
        # Use the VectorStore as memory storage
        memory = VectorStoreRetrieverMemory(
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3})
        )
        
        # Use it in a conversation
        memory.save_context(
            {"input": "I like programming in Python"},
            {"output": "Python is a great language"}
        )
        
        # Retrieve relevant memories
        relevant_memories = memory.load_memory_variables(
            {"input": "Recommend a programming language"}
        )
        
        return memory, relevant_memories
    
    def agent_tool_integration(self):
        """Integration with Agent tools"""
        
        # Expose the VectorStore as an Agent tool
        def search_knowledge_base(query: str) -> str:
            """Search the internal knowledge base"""
            docs = self.vectorstore.similarity_search(query, k=3)
            return "\n".join([doc.page_content for doc in docs])
        
        knowledge_tool = Tool(
            name="KnowledgeBase",
            description="Search the internal knowledge base for relevant information",
            func=search_knowledge_base
        )
        
        return knowledge_tool

5. Alternatives and Optimization Opportunities

5.1 Comparison of Alternative Approaches

5.1.1 Using Database SDKs Directly

Advantages

  • Best performance, with no abstraction-layer overhead
  • Full functionality, including database-specific features
  • Fine-grained control, so every operation can be tuned

Disadvantages

  • High development complexity
  • Expensive to switch databases
  • Hard to maintain

# Performance comparison: direct SDK vs. LangChain wrapper
import time
import chromadb
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

def performance_comparison():
    """Performance comparison test"""
    
    # Prepare test data
    texts = [f"test document {i}" for i in range(1000)]
    embeddings_model = OpenAIEmbeddings()
    embeddings = embeddings_model.embed_documents(texts)
    
    # 1. Direct Chroma SDK (inserting precomputed embeddings)
    start_time = time.time()
    
    client = chromadb.Client()
    collection = client.create_collection("direct_test")
    collection.add(
        documents=texts,
        embeddings=embeddings,
        ids=[str(i) for i in range(len(texts))]
    )
    
    direct_time = time.time() - start_time
    
    # 2. LangChain Chroma
    # Note: add_texts re-embeds the texts internally, so this timing includes
    # embedding cost while the direct-SDK timing above does not; the raw
    # numbers therefore overstate the wrapper's overhead.
    start_time = time.time()
    
    vectorstore = Chroma(
        collection_name="langchain_test",
        embedding_function=embeddings_model
    )
    vectorstore.add_texts(texts)
    
    langchain_time = time.time() - start_time
    
    print(f"Direct SDK time: {direct_time:.2f}s")
    print(f"LangChain time: {langchain_time:.2f}s")
    print(f"Overhead: {(langchain_time/direct_time-1)*100:.1f}%")

5.1.2 A Custom Abstraction Layer

# A custom lightweight abstraction
from abc import ABC, abstractmethod
from typing import List, Tuple

class SimpleVectorDB(ABC):
    """Lightweight vector database abstraction"""
    
    @abstractmethod
    def add(self, texts: List[str], vectors: List[List[float]]) -> List[str]:
        pass
    
    @abstractmethod
    def search(self, vector: List[float], k: int) -> List[Tuple[str, float]]:
        pass

class SimpleChroma(SimpleVectorDB):
    """Simplified Chroma wrapper"""
    
    def __init__(self):
        import chromadb
        self.client = chromadb.Client()
        self.collection = self.client.create_collection("simple")
    
    def add(self, texts: List[str], vectors: List[List[float]]) -> List[str]:
        # Offset ids by the current collection size so repeated calls don't collide
        start = self.collection.count()
        ids = [str(start + i) for i in range(len(texts))]
        self.collection.add(
            documents=texts,
            embeddings=vectors,
            ids=ids
        )
        return ids
    
    def search(self, vector: List[float], k: int) -> List[Tuple[str, float]]:
        results = self.collection.query(
            query_embeddings=[vector],
            n_results=k
        )
        
        docs = results['documents'][0]
        distances = results['distances'][0]
        return list(zip(docs, distances))

# Advantages: lighter weight, better performance
# Disadvantages: limited functionality, poor ecosystem support
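
To make the SimpleVectorDB abstraction concrete without any database dependency, here is a hypothetical brute-force in-memory implementation (stdlib-only; the interface is repeated so the snippet is self-contained). It is fine for small corpora and unit tests, and illustrates what any backend must provide:

```python
import math
from abc import ABC, abstractmethod
from typing import List, Tuple

class SimpleVectorDB(ABC):
    """Minimal interface, repeated here for self-containment."""
    @abstractmethod
    def add(self, texts: List[str], vectors: List[List[float]]) -> List[str]: ...
    @abstractmethod
    def search(self, vector: List[float], k: int) -> List[Tuple[str, float]]: ...

class InMemoryVectorDB(SimpleVectorDB):
    """Brute-force cosine-similarity store over parallel Python lists."""
    def __init__(self):
        self._texts: List[str] = []
        self._vectors: List[List[float]] = []

    def add(self, texts: List[str], vectors: List[List[float]]) -> List[str]:
        # Offset ids by the current size so repeated calls don't collide
        start = len(self._texts)
        self._texts.extend(texts)
        self._vectors.extend(vectors)
        return [str(i) for i in range(start, len(self._texts))]

    def search(self, vector: List[float], k: int) -> List[Tuple[str, float]]:
        def cos(a, b):
            dot = sum(x * y for x, y in zip(a, b))
            na = math.sqrt(sum(x * x for x in a)) or 1.0
            nb = math.sqrt(sum(y * y for y in b)) or 1.0
            return dot / (na * nb)
        # Score every stored vector, highest similarity first
        scored = [(t, cos(vector, v)) for t, v in zip(self._texts, self._vectors)]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:k]
```

Scanning every vector is O(n) per query, which is exactly the cost that real vector databases avoid with ANN indexes; the interface, however, stays the same.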

5.2 Optimization Directions

5.2.1 Performance Optimization

class OptimizedVectorStore:
    """Performance-optimized vector store wrapper"""
    
    def __init__(self, base_store: VectorStore):
        self.base_store = base_store
        self.cache = {}  # query cache
        self.batch_size = 100  # batch size
    
    def add_documents_batch(self, documents: List[Document]) -> List[str]:
        """Optimized batch document insertion"""
        
        all_ids = []
        
        # Process in batches to avoid memory exhaustion
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            batch_ids = self.base_store.add_documents(batch)
            all_ids.extend(batch_ids)
        
        return all_ids
    
    def similarity_search_cached(
        self, 
        query: str, 
        k: int = 4,
        cache_ttl: int = 300  # cache entries for 5 minutes
    ) -> List[Document]:
        """Similarity search with a TTL cache"""
        
        import hashlib
        import time
        
        # Build the cache key
        cache_key = hashlib.md5(f"{query}_{k}".encode()).hexdigest()
        
        # Check the cache
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if time.time() - timestamp < cache_ttl:
                return cached_result
        
        # Run the search
        results = self.base_store.similarity_search(query, k=k)
        
        # Update the cache
        self.cache[cache_key] = (results, time.time())
        
        return results

5.2.2 Feature Enhancements

class EnhancedVectorStore:
    """Feature-enhanced vector store wrapper"""
    
    def __init__(self, base_store: VectorStore):
        self.base_store = base_store
    
    def hybrid_search(
        self,
        query: str,
        k: int = 4,
        alpha: float = 0.7  # weight of the vector-search score
    ) -> List[Document]:
        """Hybrid search: vector search + keyword search"""
        
        # Vector search
        vector_results = self.base_store.similarity_search_with_relevance_scores(
            query, k=k*2
        )
        
        # Keyword search (simple implementation)
        keyword_results = self._keyword_search(query, k=k*2)
        
        # Fuse the two result lists
        combined_results = self._combine_results(
            vector_results, keyword_results, alpha
        )
        
        return combined_results[:k]
    
    def _keyword_search(self, query: str, k: int) -> List[Tuple[Document, float]]:
        """Keyword search: score candidates by query-term overlap.
        
        Simplified implementation; production code would use BM25 or similar.
        """
        terms = set(query.lower().split())
        if not terms:
            return []
        candidates = self.base_store.similarity_search(query, k=k * 2)
        scored = [
            (doc, len(terms & set(doc.page_content.lower().split())) / len(terms))
            for doc in candidates
        ]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:k]
    
    def _combine_results(
        self, 
        vector_results: List[Tuple[Document, float]],
        keyword_results: List[Tuple[Document, float]],
        alpha: float
    ) -> List[Document]:
        """Weighted-score fusion; RRF (Reciprocal Rank Fusion) is a common alternative"""
        fused = {}  # page_content -> [doc, combined score]
        for doc, score in vector_results:
            fused.setdefault(doc.page_content, [doc, 0.0])[1] += alpha * score
        for doc, score in keyword_results:
            fused.setdefault(doc.page_content, [doc, 0.0])[1] += (1 - alpha) * score
        ranked = sorted(fused.values(), key=lambda pair: pair[1], reverse=True)
        return [doc for doc, _ in ranked]
    
    def multi_vector_search(
        self,
        queries: List[str],
        k: int = 4,
        aggregation: str = "max"  # max, mean, sum
    ) -> List[Document]:
        """Multi-query vector search"""
        
        all_results = {}
        
        for query in queries:
            results = self.base_store.similarity_search_with_relevance_scores(
                query, k=k*len(queries)
            )
            
            for doc, score in results:
                doc_id = doc.metadata.get("id", id(doc))
                if doc_id not in all_results:
                    all_results[doc_id] = {"doc": doc, "scores": []}
                all_results[doc_id]["scores"].append(score)
        
        # Aggregate the scores
        final_results = []
        for doc_id, data in all_results.items():
            scores = data["scores"]
            if aggregation == "max":
                final_score = max(scores)
            elif aggregation == "mean":
                final_score = sum(scores) / len(scores)
            elif aggregation == "sum":
                final_score = sum(scores)
            else:
                raise ValueError(f"Unknown aggregation: {aggregation}")
            
            final_results.append((data["doc"], final_score))
        
        # Sort and return
        final_results.sort(key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in final_results[:k]]
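
`_combine_results` mentions RRF (Reciprocal Rank Fusion) as a fusion option. Here is a standalone sketch over ranked lists of document ids; the function name is illustrative, and the constant 60 is the value commonly used in the RRF literature:

```python
from collections import defaultdict
from typing import Dict, List

def reciprocal_rank_fusion(rankings: List[List[str]], c: int = 60) -> List[str]:
    """Fuse ranked id lists: score(d) = sum over lists of 1 / (c + rank(d))."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (c + rank)
    # Higher fused score = better consensus rank across the input lists
    return sorted(scores, key=scores.get, reverse=True)
```

Because RRF uses only ranks, it sidesteps the problem that vector scores and keyword scores live on different scales, which is why it is often preferred over weighted-score fusion.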

5.2.3 Cost Optimization

class CostOptimizedVectorStore:
    """Cost-optimized vector store wrapper"""
    
    def __init__(self, base_store: VectorStore):
        self.base_store = base_store
        self.embedding_cache = {}  # embedding cache
        self.compression_enabled = True
    
    def add_documents_deduplicated(
        self, 
        documents: List[Document]
    ) -> List[str]:
        """Deduplicate documents before adding, to avoid redundant storage"""
        
        import hashlib
        
        # Hash each document's content
        unique_docs = {}
        for doc in documents:
            doc_hash = hashlib.md5(doc.page_content.encode()).hexdigest()
            if doc_hash not in unique_docs:
                unique_docs[doc_hash] = doc
        
        # Add only the unique documents
        unique_doc_list = list(unique_docs.values())
        print(f"Documents after deduplication: {len(unique_doc_list)}/{len(documents)}")
        
        return self.base_store.add_documents(unique_doc_list)
    
    def compressed_storage(self, documents: List[Document]) -> List[str]:
        """Compressed storage to reduce storage cost.
        
        Caveat: since the compressed blob replaces page_content, it is what
        gets embedded and returned by searches; in practice you would embed
        the original text and compress only the stored copy, decompressing
        on retrieval.
        """
        
        if not self.compression_enabled:
            return self.base_store.add_documents(documents)
        
        # Compress the document content
        compressed_docs = []
        for doc in documents:
            compressed_content = self._compress_text(doc.page_content)
            compressed_doc = Document(
                page_content=compressed_content,
                metadata={**doc.metadata, "compressed": True}
            )
            compressed_docs.append(compressed_doc)
        
        return self.base_store.add_documents(compressed_docs)
    
    def _compress_text(self, text: str) -> str:
        """Compress text to a base64-encoded gzip blob"""
        import gzip
        import base64
        
        compressed = gzip.compress(text.encode('utf-8'))
        return base64.b64encode(compressed).decode('ascii')
    
    def _decompress_text(self, compressed_text: str) -> str:
        """Decompress a base64-encoded gzip blob back to text"""
        import gzip
        import base64
        
        compressed = base64.b64decode(compressed_text.encode('ascii'))
        return gzip.decompress(compressed).decode('utf-8')
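
The gzip/base64 helpers are lossless, which is easy to verify in isolation (the functions are repeated here as free functions so the snippet stands alone):

```python
import base64
import gzip

def compress_text(text: str) -> str:
    """Gzip-compress text and wrap it in base64 so it stays a plain string."""
    return base64.b64encode(gzip.compress(text.encode("utf-8"))).decode("ascii")

def decompress_text(blob: str) -> str:
    """Invert compress_text: base64-decode, then gunzip back to text."""
    return gzip.decompress(base64.b64decode(blob.encode("ascii"))).decode("utf-8")

# Round-trip check on repetitive text, where gzip savings outweigh base64 overhead
sample = "LangChain VectorStores " * 50
blob = compress_text(sample)
assert decompress_text(blob) == sample
assert len(blob) < len(sample)
```

Note that base64 inflates the compressed bytes by about a third, so for short or high-entropy documents the "compressed" blob can end up larger than the original; measuring on real data before enabling compression is worthwhile.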

5.3 Summary

Core value of VectorStores

  1. Unified abstraction: one consistent interface across many vector databases
  2. Ecosystem integration: seamless interplay with other LangChain components
  3. Development efficiency: dramatically lowers the complexity of building RAG applications
  4. Flexible switching: smooth migration between different databases

Suitable scenarios

  • Rapidly building RAG applications
  • Projects where the database choice is still uncertain
  • Applications that need deep integration with the LangChain ecosystem
  • Teams with a relatively simple technology stack

Unsuitable scenarios

  • High-concurrency scenarios with extreme performance requirements
  • Use cases that depend on database-specific advanced features
  • Applications sensitive to abstraction-layer overhead

Used judiciously, VectorStores lets developers quickly build complete, maintainable RAG applications while retaining enough flexibility to adapt to future technical changes.

