本文通过整合前面4篇博文,包括文本读取、切片、向量化及向量数据库等技术,实现了一个完整的简单的RAG应用案例,回顾加强前面的知识。
本文对话模型使用智谱GLM-4, embedding模型使用智谱embedding-3
一、数据加载
案例使用网页数据百度百科黑神话悟空,地址:https://baike.baidu.com/item/%E9%BB%91%E7%A5%9E%E8%AF%9D%EF%BC%9A%E6%82%9F%E7%A9%BA/53303078
import json
import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import ZhipuAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pymilvus import MilvusClient, FieldSchema, DataType, Function, FunctionType, CollectionSchema
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
# 加载文档
loader = WebBaseLoader(
web_paths=("https://baike.baidu.com/item/%E9%BB%91%E7%A5%9E%E8%AF%9D%EF%BC%9A%E6%82%9F%E7%A9%BA/53303078",),
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(
class_=("mainContent_XnadV",)
)
)
)
docs = loader.load()
print(f"文档数量:{len(docs)}")
print(f"文档内容:{docs[0].page_content}")
print(f"文档元数据:{docs[0].metadata}")
二、数据切片
# 文档切片
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=200
)
splits_doc = text_splitter.split_documents(docs)
print(f"文档切片数量:{len(splits_doc)}")
print(f"文档切片内容:{splits_doc[0].page_content}")
print(f"文档切片元数据:{splits_doc[0].metadata}")
chunk_size=800:
这个参数定义了每个文本块的最大长度为800个字符
文本分割器会尝试将文档分割成不超过800个字符的块
注意这是字符数而不是字节数,对于中文文本,一个汉字算作一个字符
chunk_overlap=200:
这个参数定义了相邻文本块之间的重叠部分为200个字符
重叠的目的是避免在分割点处丢失上下文信息
例如,如果第一个块是字符0-800,第二个块会从字符600开始(800-200=600)到140
三、embedding
#embedding模型
embedding_generator = ZhipuAIEmbeddings(
api_key=ZHIPU_API_KEY,
model="embedding-3",
dimensions=1536
)
texts = [document.page_content for document in splits_doc]
# 生成向量
embeddings = embedding_generator.embed_documents(texts)
四、导入到向量数据库
#向量数据库
client = MilvusClient(uri='http://127.0.0.1:19530')
# 删除已存在的集合(如果存在)
collection_name = "rag_collection"
if client.has_collection(collection_name):
client.drop_collection(collection_name)
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
# 这里使用结巴分词
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2500, enable_analyzer=True,
analyzer_params={"tokenizer": "jieba", "filter": ["cnalphanumonly"]}),
FieldSchema(name="metadata", dtype=DataType.VARCHAR, max_length=500),
# 向量维度 我这里使用openai的embedding生成的向量默认维度是1536
# embedding_llm1.embed_documents(titles)
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
# 稀疏向量字段
FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR)
]
bm25_function = Function(
name="bm25",
input_field_names=["text"],
output_field_names=["sparse_vector"],
function_type=FunctionType.BM25,
)
schema = CollectionSchema(fields=fields, functions=[bm25_function], description="文档集合")
client.create_collection(
collection_name=collection_name,
schema=schema,
primary_key="id",
consistency_level="Strong", # 一致性级别
enable_dynamic_field=True, # 动态字段 如果设为 True,Milvus 将自动接受未在 schema 中定义的新字段数据,并将其存储为 JSON 格式。
auto_id=True # 自动生成ID。
)
insert_data = []
for i in range(len(splits_doc)):
insert_data.append({
"text": splits_doc[i].page_content,
"metadata": json.dumps(splits_doc[i].metadata, ensure_ascii=False),#元数据从dict转成json
"vector": embeddings[i],
})
res = client.insert(collection_name=collection_name, data=insert_data)
print(res)
# 创建索引参数对象
index_params = client.prepare_index_params()
# 添加一个索引,指定字段名称、索引类型、度量类型和参数
index_params.add_index(
field_name="vector",
index_type="HNSW",
metric_type="IP",
params={"M": 16, "efConstruction": 64}
)
# 为 sparse_vector 字段添加 SPARSE_INVERTED_INDEX 索引
index_params.add_index(
field_name="sparse_vector",
index_type="SPARSE_INVERTED_INDEX",
metric_type="BM25",
params={
"inverted_index_algo": "DAAT_MAXSCORE",
"bm25_k1": 1.2,
"bm25_b": 0.75
}
)
# 创建索引
client.create_index(collection_name, index_params)
五、检索向量数据库
这里我直接使用向量检索也可以混合检索,实现可参考:https://blog.csdn.net/sunqingzhong44/article/details/148929075?spm=1001.2014.3001.5502
client.load_collection(COLLECTION_NAME)
def search_milvus(query):
"""在Milvus中搜索相关文档"""
# 生成查询向量
query_vector = embedding_generator.embed_query(query)
# 执行搜索
results = client.search(
collection_name=COLLECTION_NAME,
data=[query_vector],
anns_field="vector",
output_fields=["text", "metadata"],
limit=3
)
# 提取文本内容
texts = []
for result in results[0]: # 第一个查询的结果
texts.append(result['entity']['text'])
return texts
print(search_milvus("黑神话悟空的社会影响"))
六、整合大模型
#大模型
chat = ChatOpenAI(
api_key=ZHIPU_API_KEY,
base_url=ZHIPU_BASE_URL,
model_name="GLM-4",
temperature=0.5
)
# 提示模板
prompt = hub.pull("rlm/rag-prompt")
#检索结果整合到一起
def format_docs(docs):
"""将文档列表格式化为字符串"""
return "\n\n".join(docs)
# 构建RAG链
rag_chain = (
{"context": lambda x: format_docs(search_milvus(x)), "question": RunnablePassthrough()}
| prompt
| chat
| StrOutputParser()
)
七、测试验证
# 使用RAG链回答问题
response = rag_chain.invoke("黑神话悟空的社会影响")
print(response)
输出如下:
《黑神话:悟空》在2024年12月入选中国国际传播热词和十大网络用语,2025年1月写入浙江省政府工作报告,显示出其对中国文化及游戏产业的社会影响。此外,该游戏在国内外市场取得巨大成功,销量和收入均创下纪录。
和百度百科基本一致:
# 使用RAG链回答问题
response = rag_chain.invoke("黑神话悟空有哪些角色")
print(response)
输出如下:
《黑神话:悟空》的主要角色包括主角“天命人”和若干其他角色。
这个问题回答的不是很好,后面接入知识图谱,我们可以再试一下