1.RAG数据流水线示意图
构建RAG系统:涉及的技术链路环节: 文档加载器->文档转换器->文本嵌入模型->向量存储->检索器
2.Retriever是什么
统一接口:标准化检索流程,无论数据来源如何,最终输出Document对象列表。
多源混合检索:支持同时查询向量库、传统数据库和搜索引擎【提高召回率】
与VectorStore的关系:Retriever不直接管理存储,依赖VectorStore(如FAISS、Chroma)实现向量化与检索。
RAG中的角色:作为检索增强生成(RAG)流程的“数据入口”,为生成模型提供精准上下文
有多个实现:VectorStoreRetriever、MultiQueryRetriever、SelfQueryRetriever等
特点
模块化设计:支持插件式扩展,可自定义检索算法(如混合搜索、重排序)。
异步支持:通过async_get_relevant_documents实现高并发场景下的高效检索。
链式调用:可与LangChain的其他组件(如Text Splitters、Memory)无缝集成。
# from langchain_core.retrievers import BaseRetriever
3.similarity search as_retriever 检索实操
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_milvus import Milvus
from langchain_core.documents import Document
# 初始化模型
embeddings = DashScopeEmbeddings(
model="text-embedding-v2", # 第二代通用模型
max_retries=3,
dashscope_api_key="sk-0011111111",
)
document_1 = Document(
page_content="LangChain支持多种数据库集成",
metadata={"source": "langchain.net/doc1"},
)
document_2 = Document(
page_content="Milvus擅长处理向量搜索",
metadata={"source": "langchain.net/doc2"},
)
document_3 = Document(
page_content="我要去学AI课程",
metadata={"source": "langchain.net/doc3"},
)
document_4 = Document(
page_content="今天天气不错",
metadata={"source": "langchain.net/doc4"},
)
documents = [document_1, document_2, document_3, document_4]
vector_store = Milvus.from_documents(
documents=documents,
embedding=embeddings,
collection_name="nnw_retriever_test",
connection_args={"uri": "http://111.11.111.111:19530"},
)
# 默认是 similarity search
retriever = vector_store.as_retriever(search_kwargs={"k": 2})
results = retriever.invoke("如何进行数据库操作")
for result in results:
print(f"内容 {result.page_content} 元数据 {result.metadata}")
4.MultiQueryRetriever 提升召回率
当原始查询不够明确时,或者当文档库中的内容使用不同的术语表达同一概念时
单一查询可能无法有效检案到所有相关内容;
或者用户的问题可能有不同的表达方式,导致的检索结果不理想,
需要从多个角度切入才能找到最相关的文档片段。这种情况下,生成多个变体查询可以提高召回率,确保覆盖更多相关文档。
MultiQueryRetriever
通过生成多个相关查询来增强检索效果,解决单一查询可能不够全面或存在歧义的问题。
原理:
查询扩展技术:通过LLM生成N个相关查询(如改写、扩展、翻译),合并结果去重,生成多个变体查询
双重增强效果:提升召回率(+25%↑)和准确率(+18%↑)的平衡
代码:
from langchain_community.embeddings import DashScopeEmbeddings
# from langchain.vectorstores import Milvus
from langchain_milvus import Milvus
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import TextLoader
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_text_splitters import RecursiveCharacterTextSplitter
import logging
# 设置日志记录的基本配置
logging.basicConfig()
# 设置多查询检索器的日志记录级别为INFO
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)
# 使用TextLoader加载文本数据
loader = TextLoader("data/qa.txt", encoding="utf-8")
# 加载数据到变量中
data = loader.load()
# 初始化文本分割器,将文本分割成小块
text_splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10)
# 执行文本分割
splits = text_splitter.split_documents(data)
# 初始化模型
embedding = DashScopeEmbeddings(
model="text-embedding-v2", # 第二代通用模型
max_retries=3,
dashscope_api_key="sk-005c3c2111xxxxx",
)
# 初始化向量数据库
vector_store = Milvus.from_documents(
documents=splits,
embedding=embedding,
collection_name="nnw_mulit_retriever2",
connection_args={"uri": "http://111.11.111.111:19530"},
)
# 定义问题
question = "不知道为啥抽筋了"
# 初始化语言模型
llm = ChatOpenAI(
model_name="qwen-plus",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="sk-005c111111111111",
temperature=0.7,
)
# 从语言模型中创建多查询检索器
retriever_from_llm = MultiQueryRetriever.from_llm(
retriever=vector_store.as_retriever(), llm=llm
)
# 使用检索器执行问题检索
results = retriever_from_llm.invoke(question)
# 打印检索到的结果数量
len(results)
# 遍历并打印每个检索结果的内容和元数据
for result in results:
print(f"内容 {result.page_content} 元数据 {result.metadata}")
通过打印出来的log可以看到 ,将输入的一个问题,生成了3个不同角度的问题。通过生成多个相关查询来增强检索效果,解决单一查询可能不够全面或存在歧义的问题。
5.RAG综合查询
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_milvus import Milvus
from langchain.schema.runnable import RunnablePassthrough
from langchain.prompts import PromptTemplate
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import TextLoader
# 设置Milvus Collection名称。
COLLECTION_NAME = "nnw_doc_qa_db"
# 使用TextLoader加载文本数据
loader = TextLoader("data/qa.txt", encoding="utf-8")
# 加载数据到变量中
docs = loader.load()
# 初始化RecursiveCharacterTextSplitter,用于切分文档。
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=0)
# 使用LangChain将输入文档按照chunk_size切分。
all_splits = text_splitter.split_documents(docs)
# 初始化DashScopeEmbeddings,设置embedding模型为DashScope的text-embedding-v2。
embeddings = DashScopeEmbeddings(
model="text-embedding-v2", # 第二代通用模型
max_retries=3,
dashscope_api_key="sk-005111111111108",
)
# 创建connection,为阿里云Milvus的访问域名。
connection_args = {"uri": "http://111.11.111.111:19530"}
# 创建Collection。
vector_store = Milvus(
embedding_function=embeddings,
connection_args=connection_args,
collection_name=COLLECTION_NAME,
drop_old=True,
).from_documents(
all_splits,
embedding=embeddings,
collection_name=COLLECTION_NAME,
connection_args=connection_args,
)
# 初始化ChatOpenAI模型。
llm = ChatOpenAI(
model_name="qwen-plus",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="sk-005c3c25f6dxxxxx",
temperature=0.7,
)
# 将上述相似性检索的结果作为retriever,提出问题输入到LLM之后,获取检索增强之后的回答。
retriever = vector_store.as_retriever()
print("as_retriever", retriever)
# 定义PromptTemplate,用于构建输入给LLM的prompt。
template = """你是AI文档助手,使用以下上下文来回答最后的问题。
如果你不知道答案,就说你不知道,不要试图编造答案。
最多使用10句话,并尽可能简洁地回答。总是在答案的末尾说“谢谢你的提问!”.
{context}
问题: {question}
"""
rag_prompt = PromptTemplate.from_template(template)
# 构建Retrieval-Augmented Generation链。
rag_chain = {"context": retriever, "question": RunnablePassthrough()} | rag_prompt | llm
result = rag_chain.invoke("被宠物抓伤怎么办.")
# 调用rag_chain回答问题。
print("回答", result.content)
6.加入LangSmith查看log
安装依赖:pip install langsmith==0.3.19
然后获取key 然后再加入下面这段内容
import os
import logging
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "lsv2_pt_111111942ca0"
os.environ["LANGSMITH_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGSMITH_PROJECT"] = "nnw_rag-retriever-demo"
logging.basicConfig(level=logging.DEBUG)
RunnableParallel下面的2个是并行的,透传了用户输入 + 检索
根据log得知,最终我们的检索结果将会和输入的问题以及提示词一起 发送给大模型,作为大模型的输入来源 ,然后由大模型进行整理后输出。