RAG-5-案例1

发布于:2025-09-09 ⋅ 阅读:(21) ⋅ 点赞:(0)

          本文通过整合前面4篇博文,包括文本读取、切片、向量化及向量数据库等技术,实现了一个完整的简单的RAG应用案例,回顾加强前面的知识。

        本文对话模型使用智谱GLM-4, embedding模型使用智谱embedding-3

一、数据加载

案例使用网页数据百度百科黑神话悟空,地址:https://baike.baidu.com/item/%E9%BB%91%E7%A5%9E%E8%AF%9D%EF%BC%9A%E6%82%9F%E7%A9%BA/53303078

import json

import bs4
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import ZhipuAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pymilvus import MilvusClient, FieldSchema, DataType, Function, FunctionType, CollectionSchema
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI


# 加载文档
loader = WebBaseLoader(
    web_paths=("https://baike.baidu.com/item/%E9%BB%91%E7%A5%9E%E8%AF%9D%EF%BC%9A%E6%82%9F%E7%A9%BA/53303078",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("mainContent_XnadV",)
        )
    )
)

docs = loader.load()
print(f"文档数量:{len(docs)}")
print(f"文档内容:{docs[0].page_content}")
print(f"文档元数据:{docs[0].metadata}")

二、数据切片

# 文档切片
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,
    chunk_overlap=200
)
splits_doc = text_splitter.split_documents(docs)
print(f"文档切片数量:{len(splits_doc)}")
print(f"文档切片内容:{splits_doc[0].page_content}")
print(f"文档切片元数据:{splits_doc[0].metadata}")

chunk_size=800:
这个参数定义了每个文本块的最大长度为800个字符
文本分割器会尝试将文档分割成不超过800个字符的块
注意这是字符数而不是字节数,对于中文文本,一个汉字算作一个字符
chunk_overlap=200:
这个参数定义了相邻文本块之间的重叠部分为200个字符
重叠的目的是避免在分割点处丢失上下文信息
例如,如果第一个块是字符0-800,第二个块会从字符600开始(800-200=600)到140

三、embedding

#embedding模型
embedding_generator = ZhipuAIEmbeddings(
    api_key=ZHIPU_API_KEY,
    model="embedding-3",
    dimensions=1536
)

texts = [document.page_content for document in splits_doc]
# 生成向量
embeddings = embedding_generator.embed_documents(texts)

四、导入到向量数据库

#向量数据库
client = MilvusClient(uri='http://127.0.0.1:19530')

# 删除已存在的集合(如果存在)
collection_name = "rag_collection"
if client.has_collection(collection_name):
    client.drop_collection(collection_name)


fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    # 这里使用结巴分词
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2500, enable_analyzer=True,
                analyzer_params={"tokenizer": "jieba", "filter": ["cnalphanumonly"]}),
    FieldSchema(name="metadata", dtype=DataType.VARCHAR, max_length=500),
    # 向量维度 我这里使用openai的embedding生成的向量默认维度是1536
    # embedding_llm1.embed_documents(titles)
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
    # 稀疏向量字段
    FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR)
]

bm25_function = Function(
    name="bm25",
    input_field_names=["text"],
    output_field_names=["sparse_vector"],
    function_type=FunctionType.BM25,
)
schema = CollectionSchema(fields=fields, functions=[bm25_function], description="文档集合")

client.create_collection(
    collection_name=collection_name,
    schema=schema,
    primary_key="id",
    consistency_level="Strong",  # 一致性级别
    enable_dynamic_field=True,  # 动态字段 如果设为 True,Milvus 将自动接受未在 schema 中定义的新字段数据,并将其存储为 JSON 格式。
    auto_id=True  # 自动生成ID。
)

insert_data = []

for i in range(len(splits_doc)):
    insert_data.append({
        "text": splits_doc[i].page_content,
        "metadata": json.dumps(splits_doc[i].metadata, ensure_ascii=False),#元数据从dict转成json
        "vector": embeddings[i],
    })
res = client.insert(collection_name=collection_name, data=insert_data)
print(res)

# 创建索引参数对象
index_params = client.prepare_index_params()

# 添加一个索引,指定字段名称、索引类型、度量类型和参数
index_params.add_index(
    field_name="vector",
    index_type="HNSW",
    metric_type="IP",
    params={"M": 16, "efConstruction": 64}
)

# 为 sparse_vector 字段添加 SPARSE_INVERTED_INDEX 索引
index_params.add_index(
    field_name="sparse_vector",
    index_type="SPARSE_INVERTED_INDEX",
    metric_type="BM25",
    params={
        "inverted_index_algo": "DAAT_MAXSCORE",
        "bm25_k1": 1.2,
        "bm25_b": 0.75
    }

)

# 创建索引
client.create_index(collection_name, index_params)

五、检索向量数据库

这里我直接使用向量检索也可以混合检索,实现可参考:https://blog.csdn.net/sunqingzhong44/article/details/148929075?spm=1001.2014.3001.5502

client.load_collection(COLLECTION_NAME)

def search_milvus(query):
    """在Milvus中搜索相关文档"""
    # 生成查询向量
    query_vector = embedding_generator.embed_query(query)

    # 执行搜索
    results = client.search(
        collection_name=COLLECTION_NAME,
        data=[query_vector],
        anns_field="vector",
        output_fields=["text", "metadata"],
        limit=3
    )

    # 提取文本内容
    texts = []
    for result in results[0]:  # 第一个查询的结果
        texts.append(result['entity']['text'])

    return texts

print(search_milvus("黑神话悟空的社会影响"))

六、整合大模型

#大模型
chat = ChatOpenAI(
    api_key=ZHIPU_API_KEY,
    base_url=ZHIPU_BASE_URL,
    model_name="GLM-4",
    temperature=0.5
)
# 提示模板
prompt = hub.pull("rlm/rag-prompt")

#检索结果整合到一起
def format_docs(docs):
    """将文档列表格式化为字符串"""
    return "\n\n".join(docs)


# 构建RAG链
rag_chain = (
    {"context": lambda x: format_docs(search_milvus(x)), "question": RunnablePassthrough()}
    | prompt
    | chat
    | StrOutputParser()
)

七、测试验证

# 使用RAG链回答问题
response = rag_chain.invoke("黑神话悟空的社会影响")
print(response)

输出如下:

《黑神话:悟空》在2024年12月入选中国国际传播热词和十大网络用语,2025年1月写入浙江省政府工作报告,显示出其对中国文化及游戏产业的社会影响。此外,该游戏在国内外市场取得巨大成功,销量和收入均创下纪录。

和百度百科基本一致:

# 使用RAG链回答问题
response = rag_chain.invoke("黑神话悟空有哪些角色")
print(response)

输出如下:

《黑神话:悟空》的主要角色包括主角“天命人”和若干其他角色。

这个问题回答的不是很好,后面接入知识图谱,我们可以再试一下


网站公告

今日签到

点亮在社区的每一天
去签到