[AI] Qwen + RAG System

Published: 2025-02-10

A RAG system assembled from Qwen2.5-7B-Instruct (generation), stella-base-zh-v2 (dense retrieval), and bge-reranker-base (reranking): BM25 and embedding retrieval each propose candidate chunks, the reranker scores them, and the best chunk is handed to Qwen as context.

The test data and models are not included; both can be downloaded separately, e.g. from ModelScope.

Test data link: Streamlit
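
For the model downloads, a minimal sketch using ModelScope's snapshot_download is below; the model IDs are my assumption based on the model names above, so verify them on modelscope.cn before running:

from modelscope import snapshot_download

# Model IDs below are assumptions inferred from the model names -- check modelscope.cn.
llm_dir = snapshot_download('Qwen/Qwen2.5-7B-Instruct', cache_dir='/home/sky/model_data')
reranker_dir = snapshot_download('BAAI/bge-reranker-base', cache_dir='/home/sky/model_data')
embed_dir = snapshot_download('infgrad/stella-base-zh-v2', cache_dir='/home/sky/model_data')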

# -*- coding: utf-8 -*-

from flask import Flask, request, jsonify
import jieba
import json
import pdfplumber
from rank_bm25 import BM25Okapi
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer
import numpy as np


app = Flask(__name__)

# Model loading and initialization
device = "cuda" if torch.cuda.is_available() else "cpu"

rerank_tokenizer = AutoTokenizer.from_pretrained('/home/sky/model_data/bge-reranker-base')
rerank_model = AutoModelForSequenceClassification.from_pretrained('/home/sky/model_data/bge-reranker-base').to(device)

model_path = '/home/sky/model_data/Qwen/Qwen2.5-7B-Instruct'
tokenizer = AutoTokenizer.from_pretrained(model_path)
# device_map="auto" already dispatches the weights across available devices;
# chaining .to(device) on top of it is rejected by accelerate, so the model
# is left where device_map placed it.
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype="auto", device_map="auto")

sent_model = SentenceTransformer('/home/sky/model_data/stella-base-zh-v2')
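# stella-base-zh-v2 serves as the dense retriever; it reportedly needs no
# instruction/prefix text (check its model card), so queries and chunks are
# encoded as plain strings.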

pdf_content = []


def load_pdf(knowledge_data_path):
    global pdf_content
    pdf_content = []
    # Chunk every page into overlapping fixed-size pieces, keeping the page
    # number so answers can cite their source.
    with pdfplumber.open(knowledge_data_path) as pdf:
        for page_idx in range(len(pdf.pages)):
            text = pdf.pages[page_idx].extract_text() or ''  # extract_text() may return None
            for chunk_text in split_text_fixed_size(text, chunk_size=100, overlap_size=5):
                pdf_content.append({
                    'page': 'page_' + str(page_idx + 1),
                    'content': chunk_text
                })

    # Build both retrieval indexes over the chunks: BM25 for lexical matching
    # and stella sentence embeddings for semantic matching.
    global pdf_content_words, bm25, pdf_embeddings
    pdf_content_words = [jieba.lcut(x['content']) for x in pdf_content]
    bm25 = BM25Okapi(pdf_content_words)
    pdf_content_sentences = [x['content'] for x in pdf_content]
    pdf_embeddings = sent_model.encode(pdf_content_sentences, normalize_embeddings=True)
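    # With normalize_embeddings=True the vectors are unit-length, so the dot
    # product used in /qa (question_embedding @ pdf_embeddings.T) is exactly
    # cosine similarity.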


def split_text_fixed_size(text, chunk_size, overlap_size):
    # Slide a chunk_size-character window over the text, stepping by
    # chunk_size - overlap_size so consecutive chunks share overlap_size characters.
    new_text = []
    for i in range(0, len(text), chunk_size - overlap_size):
        new_text.append(text[i:i + chunk_size])
    return new_text
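
# Example (hypothetical input): split_text_fixed_size("abcdefghij", chunk_size=4, overlap_size=1)
# steps by 3 characters and yields ['abcd', 'defg', 'ghij', 'j'].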


def get_rank_index(max_score_page_idxs_, question_, pdf_content_):
    pairs = [[question_, pdf_content_[idx]['content']] for idx in max_score_page_idxs_]

    inputs = rerank_tokenizer(pairs, padding=True, truncation=True, return_tensors='pt', max_length=512).to(device)
    with torch.no_grad():
        logits = rerank_model(**inputs, return_dict=True).logits.view(-1).float().cpu().numpy()
        # Apply sigmoid to convert logits to probabilities
        scores = 1 / (1 + np.exp(-logits))  # Sigmoid function

    max_score_idx = scores.argmax()
    max_score = scores[max_score_idx]
    index = max_score_page_idxs_[max_score_idx]
    return max_score, index
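
# Note: both candidate lists (BM25 top-10 and dense top-10) are rescored by the
# same cross-encoder, so their best scores are directly comparable when /qa
# picks the final passage.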

def qwen_preprocess(tokenizer_, ziliao, question):
    if ziliao:
        # A reference passage was retrieved: instruct the model to answer
        # strictly from it, or to say the passage cannot answer the question.
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"帮我结合给定的资料,回答问题。如果问题答案无法从资料中获得,"
                                        f"输出结合给定的资料,无法回答问题。如果找到答案,就输出找到的答案。资料:{ziliao},问题:{question}"},
        ]
    else:
        # No usable passage: fall back to plain chat.
        messages = [
            {"role": "system", "content": "你是由智子引擎研发的AI小助手,可以帮助解答各种领域的问题。"},
            {"role": "user", "content": f"问题:{question}"},
        ]

    text = tokenizer_.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs_ = tokenizer_([text], return_tensors="pt").to(device)

    return model_inputs_
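
# Note: apply_chat_template(..., add_generation_prompt=True) above renders the
# messages in Qwen's ChatML format (<|im_start|>role ... <|im_end|>) and appends
# the assistant header, so model.generate() continues as the assistant's reply.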


@app.route('/qa', methods=['POST'])
def qa():
    data = request.get_json(silent=True) or {}  # tolerate missing/invalid JSON bodies
    question = data.get('question', '')

    if not question:
        return jsonify({'error': 'No question provided'}), 400

    # Lexical recall: BM25 over the jieba-tokenised question, keep the top 10 chunks.
    doc_scores = bm25.get_scores(jieba.lcut(question))
    bm25_score_page_idxs = doc_scores.argsort()[-10:][::-1]

    # Semantic recall: cosine similarity against the stella embeddings, keep the top 10.
    question_embedding = sent_model.encode([question], normalize_embeddings=True)
    score = question_embedding @ pdf_embeddings.T
    ste_score_page_idxs = score.argsort()[0][-10:][::-1]

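    # Rerank each candidate list with the cross-encoder and keep its best hit.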
    bm25_score, bm25_index = get_rank_index(bm25_score_page_idxs, question, pdf_content)
    ste_score, ste_index = get_rank_index(ste_score_page_idxs, question, pdf_content)

    # Keep whichever candidate the reranker scored higher, plus its score.
    max_score_page_idx = ste_index if ste_score >= bm25_score else bm25_index
    final_score = ste_score if ste_score >= bm25_score else bm25_score

    # Treat low reranker confidence as "the document does not answer this".
    threshold = 0.5
    if final_score < threshold:
        # Below the threshold: answer with Qwen alone, without reference material.
        model_inputs = qwen_preprocess(tokenizer, "", question)
        generated_ids = model.generate(
            model_inputs.input_ids,
            max_new_tokens=128,
            attention_mask=model_inputs.attention_mask,
            pad_token_id=tokenizer.eos_token_id
        )
        # Strip the prompt tokens so only the newly generated answer is decoded.
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
        return jsonify({
            'answer': response,
            'score': float(final_score),
            'reference_page': 'N/A'  # no reference page in the fallback branch
        })

    # Above the threshold: hand the best chunk to Qwen as reference material.
    model_inputs = qwen_preprocess(tokenizer, pdf_content[max_score_page_idx]['content'], question)

    generated_ids = model.generate(
        model_inputs.input_ids,
        max_new_tokens=128,
        attention_mask=model_inputs.attention_mask,
        pad_token_id=tokenizer.eos_token_id
    )
    # Strip the prompt tokens so only the newly generated answer is decoded.
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    return jsonify({
        'answer': response,
        'score': float(final_score),
        'reference_page': pdf_content[max_score_page_idx]['page']
    })


if __name__ == '__main__':
    knowledge_data_path = './初赛训练数据集.pdf'  # the "preliminary-round training dataset" PDF, downloaded separately
    load_pdf(knowledge_data_path)
    app.run(host='0.0.0.0', port=5201, debug=False)
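
With the server running, the /qa endpoint can be exercised with a minimal client like the sketch below (it assumes the server above is reachable on localhost:5201; the question is just a placeholder):

import requests

resp = requests.post(
    'http://127.0.0.1:5201/qa',
    json={'question': '文档的主要内容是什么?'},  # placeholder question
    timeout=300,  # generation with a 7B model can take a while
)
print(resp.json())  # -> {'answer': ..., 'score': ..., 'reference_page': ...}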