从Sonnet到Opus:一次解决RAG知识库流式输出难题的探索

发布于:2025-09-10 ⋅ 阅读:(20) ⋅ 点赞:(0)

又到周末,还得消耗消耗 ➡️ anyrouter 上的Claude资源,万一哪天都不能用了,也是浪费。

2025/9/5,Claude AI 的母公司 Anthropic 发布了一项新政策:即日起,Anthropic将不再对中国控股公司及其海外子公司开放服务。

想起来6月份做的RAG知识库,聊天并不是打字机流式输出。

而是接口完全返回后,一次输出,会给人感觉等待时间过长。

那就优化一下吧!

在这里插入图片描述

@app.post("/chat/stream")
async def chat_with_rag_stream(
    request: schemas.ChatRequest,
    fastapi_req: FastAPIRequest,
    db: Session = Depends(database.get_db),
):
    """
    流式智能问答接口,支持Agent模式和普通RAG模式动态切换。
    """
    query = request.query
    user_ip = fastapi_req.client.host
    session_id = request.session_id
    
    # --- 会话管理 ---
    if session_id:
        if not crud.get_session(db, session_id):
            raise HTTPException(status_code=404, detail="会话不存在")
    else:
        session_id = crud.create_chat_session(db, user_ip=user_ip, title=query[:50]).id
    
    crud.add_chat_message(db, session_id=session_id, role="user", content=query)

    # --- 获取模式并执行 ---
    is_agent_enabled = crud.get_system_config(db, "agent_enabled", "true") == "true"
    chat_history = crud.get_messages_for_agent(db, session_id)
    
    # 根据模式获取配置好的Agent Executor
    agent_executor = get_agent_executor(agent_mode=is_agent_enabled)
    
    async def generate_response():
        try:
            # 使用流式方式调用agent
            async for chunk in agent_executor.astream({
                "input": query,
                "chat_history": chat_history
            }):
                if "actions" in chunk:
                    # Agent正在考虑采取哪些行动
                    for action in chunk["actions"]:
                        yield f"data: {json.dumps({'type': 'action', 'content': action.tool})}\n\n"
                elif "steps" in chunk:
                    # Agent已完成行动并观察结果
                    for step in chunk["steps"]:
                        yield f"data: {json.dumps({'type': 'step', 'content': str(step.observation)[:100] + '...'})}\n\n"
                elif "output" in chunk:
                    # Agent已完成思考,这是最终输出
                    answer = chunk["output"]
                    # 记录AI回答
                    crud.add_chat_message(db, session_id=session_id, role="assistant", content=answer)
                    
                    # 提取源文档
                    source_documents = []
                    if "intermediate_steps" in chunk:
                        for step in chunk["intermediate_steps"]:
                            action, observation = step
                            if action.tool == "knowledge_base_search":
                                search_results = chroma_utils.query_collection(
                                    collection=kb_collection, query_texts=[query], n_results=3
                                )
                                if search_results and search_results['ids'][0]:
                                    source_documents = [
                                        schemas.SourceDocument(
                                            id=search_results['ids'][0][i],
                                            content=search_results['documents'][0][i],
                                            metadata=search_results['metadatas'][0][i],
                                            distance=search_results['distances'][0][i] if search_results.get('distances') and search_results['distances'][0] else None
                                        ) for i in range(len(search_results['ids'][0]))
                                    ]
                                break
                    
                    # 发送最终答案和源文档
                    yield f"data: {json.dumps({'type': 'final', 'content': answer, 'source_documents': [doc.dict() for doc in source_documents], 'session_id': session_id})}\n\n"
        except Exception as e:
            error_msg = f"调用Agent时出错: {e}"
            yield f"data: {json.dumps({'type': 'error', 'content': error_msg})}\n\n"
    
    return StreamingResponse(generate_response(), media_type="text/event-stream")

它改了一版,发现还是不对,接口完全把内容都返回来后,前台才模拟的打字机效果,这样使用体验就会觉得要等很久,用户体验差!

在这里插入图片描述
把这个问题拿去求助了一下ChatGPT,

我用Python+react 实现了一个RAG知识库,为什么始终做不到真正的打字机流式输出的,我不要那种接口完全返回后,再模拟打字的效果。我要真正的流式,就是主流大模型那样的


1. 你目前的问题本质

  • 很多 RAG 实现(尤其是 Python + React 这种自研组合),默认是:

    1. 检索:Python 先查向量数据库,把 context 拼接到 prompt;
    2. 调用模型接口:用完整 prompt 请求一次大模型;
    3. 得到完整响应后再返回给前端;
    4. 前端再用 setInterval 之类的假装“打字”。

这样看上去是流式,实质不是流式,因为服务端是一次性拿到结果


2. 真正的流式输出的原理

主流大模型(OpenAI, Anthropic, Ollama, vLLM, FastChat 等)在 API 层面支持 Server-Sent Events (SSE) 或 WebSocket,能让服务端边生成边推送。

关键点是:

  • Python 后端不要等模型生成完整结果,而是直接逐 token 读取模型的输出。
  • React 前端通过 EventSource 或 WebSocket 接收服务端的逐 token 数据,实时渲染。

3. Python 端实现方式

FastAPI + OpenAI API (或 vLLM) 为例:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openai

app = FastAPI()

@app.get("/chat")
async def chat(q: str):
    # 构造 prompt,这里加上你的RAG检索结果
    context = retriever_search(q)  # 伪代码
    prompt = f"已知资料:{context}\n\n问题:{q}\n回答:"

    async def generate():
        response = await openai.ChatCompletion.acreate(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True  # 开启流式
        )
        async for chunk in response:
            if "choices" in chunk:
                delta = chunk["choices"][0]["delta"].get("content", "")
                if delta:
                    yield delta  # 一点点推送给前端

    return StreamingResponse(generate(), media_type="text/event-stream")

这里的关键:

  • stream=True 开启流式;
  • yield 每个 token,StreamingResponse 自动转为 SSE

4. React 前端接收

在 React 里,你不能用 fetch,要用 EventSource

import { useState } from "react";

function Chat() {
  const [msg, setMsg] = useState("");

  const ask = () => {
    const evtSource = new EventSource("/chat?q=你好");
    evtSource.onmessage = (e) => {
      setMsg((prev) => prev + e.data);
    };
    evtSource.onerror = () => {
      evtSource.close();
    };
  };

  return (
    <div>
      <button onClick={ask}>提问</button>
      <div>{msg}</div>
    </div>
  );
}

这样 React 就能实时渲染模型生成的 token,而不是假打字机。


5. 结合 RAG 的注意点

  • 检索部分(向量数据库查询)不需要流式,因为检索是一次性返回的。

  • 真正需要流式的是 大模型推理部分

  • 所以架构是:

    1. 前端发问题;
    2. Python 检索知识 → 拼接 prompt;
    3. 调用大模型 API,开启 stream=True
    4. StreamingResponse 把 token 流推给前端。

把最后的总结提供给claude

在这里插入图片描述
在这里插入图片描述

始终解决不了问题,于是周六晚上就放弃了!

代码也回滚了!


周日早上起来想着要不换opus4试试,贵点就贵点!

执行/model 更换!

在这里插入图片描述

 我已经成功为你的RAG知识库系统实现了真正的流式输出功能:

  后端修改:

  1. 新增了 /chat/stream 接口:使用Server-Sent Events (SSE)协议实现真正的流式响应
  2. 使用AsyncIteratorCallbackHandler:捕获LLM的实时token输出
  3. 流式数据格式:
    - type: 'metadata' - 发送session_id和源文档信息
    - type: 'content' - 发送实时生成的文本片段
    - type: 'done' - 标记流式传输结束
    - type: 'error' - 错误处理

  前端修改:

  1. 使用原生Fetch API替代axios:支持ReadableStream读取
  2. 实时渲染:每接收到新的文本片段立即更新UI
  3. 添加流式光标动画:显示正在输入的视觉效果(闪烁的光标)
  4. 优化用户体验:立即显示空消息框,然后逐字填充内容

  启动方式:

  后端:
  cd backend
  pip install -r ../requirements.txt
  python -m uvicorn main:app --reload --host 0.0.0.0 --port 8005

  前端:
  cd frontend
  npm install
  npm start

  现在系统会提供真正的流式输出体验,而不是等待完整响应后再显示。用户可以看到AI实时生成的每个字符,就像ChatGPT那样的
  打字效果。

只有个光标闪动,内容还是一次输出!继续让它折腾。

● 现在我已经修复了后端的流式实现,使用了正确的 astream 方法来实现真正的流式输出。主要改进:

  1. 使用 streaming_llm.astream() 而不是callback方式,这是LangChain推荐的流式调用方式
  2. 直接处理chunk.content 获取实时生成的token
  3. 添加了Transfer-Encoding: chunked 头部确保分块传输
  4. 添加了更详细的错误追踪

它很快就想起来加日志了

在这里插入图片描述

看了新的控制台输出,我发现了问题:

  1. 数据确实是分批发送的 - 可以看到 "我可以帮助你""解答关于""这个知识库" 等分批接收
  2. 但所有数据在极短时间内(20:59:50.642 到 20:59:50.718)全部接收完 - 只用了不到100毫秒

  这说明问题不在前端,而是后端的延迟没有生效。可能是因为:
  1. 后端服务没有重启,还在使用旧代码
  2. 或者FastAPI的StreamingResponse有缓冲机制

  让我再优化一下后端代码,确保流式效果真正生效:

在这里插入图片描述
终于定位到问题了,opus 还是更聪明!

● 现在重启后端,再测试一次。请观察:
  1. 后端控制台 - 会显示每个chunk的发送时间
  2. 前端控制台 - 查看接收时间

  如果后端显示逐步发送(比如每隔100ms发送一次),但前端还是一次性接收,那可能是:
  1. 代理缓冲问题 - React的开发服务器代理可能在缓冲
  2. 浏览器缓冲 - 浏览器可能在等待更多数据才渲染

  请把两边的日志都发给我看看。

> 3.txt是浏览器显示的,4.txt是后台输出,我这次看到了,是后台彻底输出完,前台才开始显示,而且是一次性显示全部

● 完美!现在我看到问题了:

  从日志可以清楚地看到:
  1. 后端从 1735486099.8554 开始发送,到 1735486103.2935 结束(约3.4秒)
  2. 前端在 21:28:23.294(对应1735486103.294)才开始接收数据

  这说明React开发服务器的代理在缓冲整个响应,等到后端完全发送完毕后才转发给前端!

  这是React开发服务器代理的已知问题。解决方案是绕过代理,直接连接后端:

在这里插入图片描述
测试了一下,agent模式开启状态,还是等待时间过长,接口完全返回完,前端才开始动。

在这里插入图片描述


结论:

  • Agent模式下无法流式输出:
    当 is_agent_enabled 为 True 时,代码使用了 await agent_executor.ainvoke(),这会等待整个Agent执行完成才返回结果 Agent本身需要完整执行所有工具调用后才能得到最终答案,无法做到真正的流式输出

  • 非Agent模式下的流式实现是正确的(速度确实提升了)

如果单纯做内部知识库,那确实没必要使用agent调用工具!

另一个优化方向是目前还没有集成MCP服务进来

等后面有空再试着改一下,不用调用工具这种形式,改用MCP。
两者区别见➡️ MCP 服务设计与 Agent 模式的本质区别


花费情况

opus 确实贵,价格是 sonnet 的5倍!

在这里插入图片描述

一共花费了 $133,Opus 占了 $116

在这里插入图片描述

在这里插入图片描述


附代码

  1. Python后台接口
@app.post("/chat/stream")
async def chat_with_rag_stream(
    request: schemas.ChatRequest,
    fastapi_req: FastAPIRequest,
    db: Session = Depends(database.get_db),
):
    """
    流式智能问答接口,支持Agent模式和普通RAG模式动态切换。
    使用Server-Sent Events (SSE)格式返回流式数据。
    """
    query = request.query
    user_ip = fastapi_req.client.host
    session_id = request.session_id
    
    # --- 会话管理 ---
    if session_id:
        if not crud.get_session(db, session_id):
            raise HTTPException(status_code=404, detail="会话不存在")
    else:
        session_id = crud.create_chat_session(db, user_ip=user_ip, title=query[:50]).id
    
    crud.add_chat_message(db, session_id=session_id, role="user", content=query)

    # --- 获取模式并执行 ---
    is_agent_enabled = crud.get_system_config(db, "agent_enabled", "true") == "true"
    chat_history = crud.get_messages_for_agent(db, session_id)
    
    async def generate():
        try:
            # 检查是否启用Agent模式
            if is_agent_enabled:
                # Agent模式:使用Agent执行器,但不能真正流式(因为Agent需要完整执行)
                agent_executor = get_agent_executor(agent_mode=True)
                
                # 先发送元数据
                yield f"data: {json.dumps({'type': 'metadata', 'session_id': session_id}, ensure_ascii=False)}\n\n"
                
                # 执行Agent
                response = await agent_executor.ainvoke({
                    "input": query,
                    "chat_history": chat_history
                })
                
                answer = response.get("output", "抱歉,我无法回答这个问题。")
                
                # 提取源文档(如果使用了知识库)
                source_documents = []
                if "intermediate_steps" in response:
                    for step in response["intermediate_steps"]:
                        action, observation = step
                        if action.tool == "knowledge_base_search":
                            search_results = chroma_utils.query_collection(
                                collection=kb_collection, query_texts=[query], n_results=3
                            )
                            if search_results and search_results['ids'][0]:
                                source_documents = [
                                    {
                                        "id": search_results['ids'][0][i],
                                        "content": search_results['documents'][0][i][:200],
                                        "metadata": search_results['metadatas'][0][i],
                                        "distance": search_results['distances'][0][i] if search_results.get('distances') else None
                                    } for i in range(min(3, len(search_results['ids'][0])))
                                ]
                            break
                
                # 发送源文档
                if source_documents:
                    yield f"data: {json.dumps({'type': 'source_documents', 'source_documents': source_documents}, ensure_ascii=False)}\n\n"
                
                # 模拟流式输出Agent的回答
                buffer = ""
                for char in answer:
                    buffer += char
                    
                    # 决定何时发送
                    should_send = (
                        len(buffer) >= 10 or  # 累积10个字符
                        char in ',。!?;:、\n' or  # 遇到标点
                        buffer.endswith(' ')  # 或遇到空格
                    )
                    
                    if should_send:
                        yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
                        buffer = ""
                        await asyncio.sleep(0.1)  # 100ms延迟
                
                # 发送剩余内容
                if buffer:
                    yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
                
                # 保存到数据库
                crud.add_chat_message(db, session_id=session_id, role="assistant", content=answer)
                
            else:
                # 非Agent模式:使用普通LLM流式输出
                # 先进行知识库搜索获取上下文
                search_results = chroma_utils.query_collection(
                    collection=kb_collection, 
                    query_texts=[query], 
                    n_results=5
                )
                
                context = ""
                source_documents = []
                if search_results and search_results['documents'][0]:
                    # 构建上下文
                    context_docs = search_results['documents'][0][:3]
                    context = "\n\n".join([f"文档{i+1}:\n{doc}" for i, doc in enumerate(context_docs)])
                    
                    # 构建源文档信息
                    source_documents = [
                        {
                            "id": search_results['ids'][0][i],
                            "content": search_results['documents'][0][i][:200],  # 截取前200字符
                            "metadata": search_results['metadatas'][0][i],
                            "distance": search_results['distances'][0][i] if search_results.get('distances') else None
                        } for i in range(min(3, len(search_results['ids'][0])))
                    ]
                
                # 构建提示词
                if context:
                    prompt = f"""你是一个智能助手,请基于以下知识库内容回答用户问题。如果知识库内容不足以回答问题,请结合你的知识进行补充。

知识库内容:
{context}

历史对话:
{chat_history[-5:] if chat_history else '无'}

用户问题:{query}

请用中文回答:"""
                else:
                    prompt = f"""你是一个智能助手,请回答用户的问题。

历史对话:
{chat_history[-5:] if chat_history else '无'}

用户问题:{query}

请用中文回答:"""
                
                # 先发送session_id和源文档信息
                yield f"data: {json.dumps({'type': 'metadata', 'session_id': session_id, 'source_documents': source_documents}, ensure_ascii=False)}\n\n"
            
                # 使用llm_utils中的流式生成函数
                from .llm_utils import get_streaming_tongyi_llm
                
                # 获取流式生成器
                stream_generate = get_streaming_tongyi_llm()
                
                # 收集完整的响应内容用于保存
                full_content = ""
                buffer = ""
                
                import time
                chunk_count = 0
                first_chunk_time = None
                
                print(f"Starting stream at {time.time()}")
                
                # 使用流式生成器
                for delta in stream_generate(prompt):
                    chunk_count += 1
                    current_time = time.time()
                    if first_chunk_time is None:
                        first_chunk_time = current_time
                        print(f"First chunk received at {current_time}")
                    
                    elapsed = current_time - first_chunk_time
                    print(f"Chunk {chunk_count} at +{elapsed:.3f}s: {repr(delta)}")
                    
                    full_content += delta
                    buffer += delta
                    
                    # 决定何时发送
                    should_send = (
                        len(buffer) >= 10 or
                        any(p in buffer for p in ',。!?;:、\n.!?')
                    )
                    
                    if should_send:
                        print(f"  -> Sending buffer: {repr(buffer)}")
                        yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
                        buffer = ""
                        # 异步等待,让其他任务执行
                        await asyncio.sleep(0.01)
                
                # 发送剩余的buffer内容
                if buffer:
                    print(f"  -> Sending final buffer: {repr(buffer)}")
                    yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
                
                if chunk_count > 0:
                    print(f"Total chunks: {chunk_count}, Total time: {time.time() - first_chunk_time:.3f}s")
                    print(f"Total content length: {len(full_content)}")
                
                # 保存完整的回答到数据库
                if full_content:
                    crud.add_chat_message(db, session_id=session_id, role="assistant", content=full_content)
            
            # 发送结束信号
            yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
            
        except Exception as e:
            import traceback
            print(f"Stream error: {e}")
            print(traceback.format_exc())
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用nginx缓冲
            "Transfer-Encoding": "chunked",  # 确保分块传输
        }
    )

  1. 调大模型方法,stream=True 关键
def get_streaming_tongyi_llm():
    """
    获取支持流式输出的通义千问模型。
    使用原生DashScope SDK以确保真正的流式输出。
    """
    from dashscope import Generation
    import dashscope
    
    dashscope.api_key = settings.DASHSCOPE_API_KEY
    
    def stream_generate(prompt):
        """生成器函数,用于流式输出"""
        messages = [
            {"role": "system", "content": "你是一个智能助手。"},
            {"role": "user", "content": prompt}
        ]
        
        responses = Generation.call(
            model='qwen-plus',
            messages=messages,
            result_format='message',
            stream=True,  # 启用流式
            incremental_output=True,  # 增量输出
        )
        
        for response in responses:
            if response.status_code == 200:
                if response.output and 'choices' in response.output:
                    choices = response.output['choices']
                    if choices and len(choices) > 0:
                        message = choices[0].get('message', {})
                        delta = message.get('content', '')
                        if delta:
                            yield delta
    
    return stream_generate 
  1. 聊天发送 react 代码
const handleSend = async () => {
    if (!input.trim()) return;
    const userMsg = { role: 'user', content: input, time: dayjs().format('HH:mm:ss') };
    setMessages((msgs) => [...msgs, userMsg]);
    setInput('');
    setLoading(true);

    // 先添加一个空的bot消息,用于显示流式内容
    const botMsg = { 
      role: 'bot', 
      content: '', 
      time: dayjs().format('HH:mm:ss'), 
      sourceDocs: [],
      isStreaming: true 
    };
    setMessages(msgs => [...msgs, botMsg]);

    try {
      // 根据环境决定使用哪个URL
      // 在开发环境中直接访问后端以避免代理缓冲问题
      const apiUrl = process.env.NODE_ENV === 'development' 
        ? 'http://localhost:8005/chat/stream'  // 直接访问后端
        : '/api/chat/stream';  // 生产环境使用相对路径
      
      const response = await fetch(apiUrl, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ 
          query: userMsg.content, 
          session_id: currentSessionId 
        }),
      });

      if (!response.ok) {
        throw new Error('Network response was not ok');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      let fullContent = '';
      let sessionId = currentSessionId;
      let sourceDocs = [];

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        console.log('Buffer received:', buffer.substring(0, 100)); // 调试:查看收到的数据
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = line.slice(6);
            if (data.trim()) {
              try {
                const parsed = JSON.parse(data);
                
                if (parsed.type === 'metadata') {
                  // 更新session_id和源文档
                  if (parsed.session_id && !currentSessionId) {
                    sessionId = parsed.session_id;
                    setCurrentSessionId(parsed.session_id);
                    fetchSessions();
                  }
                  if (parsed.source_documents) {
                    sourceDocs = parsed.source_documents;
                  }
                } else if (parsed.type === 'source_documents') {
                  // 单独处理源文档类型(Agent模式可能会单独发送)
                  if (parsed.source_documents) {
                    sourceDocs = parsed.source_documents;
                    // 更新最后一条bot消息的源文档
                    setMessages(msgs => {
                      const newMsgs = [...msgs];
                      const lastMsg = newMsgs[newMsgs.length - 1];
                      if (lastMsg && lastMsg.role === 'bot') {
                        lastMsg.sourceDocs = sourceDocs;
                      }
                      return newMsgs;
                    });
                  }
                } else if (parsed.type === 'content') {
                  // 追加内容到最后一条消息
                  fullContent += parsed.content;
                  console.log('Received content:', parsed.content); // 调试日志
                  setMessages(msgs => {
                    const newMsgs = [...msgs];
                    const lastMsg = newMsgs[newMsgs.length - 1];
                    if (lastMsg && lastMsg.role === 'bot') {
                      lastMsg.content = fullContent;
                      lastMsg.sourceDocs = sourceDocs;
                    }
                    return newMsgs;
                  });
                } else if (parsed.type === 'done') {
                  // 流式结束,标记消息为非流式状态
                  setMessages(msgs => {
                    const newMsgs = [...msgs];
                    const lastMsg = newMsgs[newMsgs.length - 1];
                    if (lastMsg && lastMsg.role === 'bot') {
                      lastMsg.isStreaming = false;
                    }
                    return newMsgs;
                  });
                } else if (parsed.type === 'error') {
                  throw new Error(parsed.message || '流式响应出错');
                }
              } catch (e) {
                console.error('解析SSE数据失败:', e);
              }
            }
          }
        }
      }
    } catch (err) {
      console.error('Stream error:', err);
      antdMessage.error('问答失败,请稍后重试');
      // 移除空的bot消息
      setMessages(msgs => msgs.slice(0, -1));
    } finally {
      setLoading(false);
    }
  };

由于是本地调试,它直接这样绕过跨域了!

# --- CORS Middleware ---
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 开发环境允许所有来源
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],  # 暴露所有响应头
)

部署到服务器上➡️ 见这篇文章


至此,问题结束!

在这里插入图片描述

它提了几个 Agent模式流式输出改进方案,我懒得试了。
  基于您的需求,我设计了以下几种改进Agent模式流式输出的方案:

  方案1:Agent步骤实时流式输出(推荐)

  在Agent执行过程中实时流式输出每个步骤的状态和结果:

  1. 工具调用提示:当Agent决定调用某个工具时,立即流式输出"正在搜索知识库...""正在进行网络搜索..."等状态信息
  2. 工具结果摘要:工具执行完成后,流式输出简化的结果摘要
  3. 最终答案流式:Agent推理完成后,真正流式输出最终答案

  方案2:并行Agent执行与预流式输出

  1. 立即开始流式:接收到问题后立即开始流式输出分析过程
  2. 并行执行:在后台异步执行Agent,同时在前端显示分析步骤
  3. 无缝切换:Agent完成后无缝切换到真实答案的流式输出

  方案3:混合模式智能切换

  根据问题复杂度自动选择:
  - 简单问题:直接使用非Agent模式的真实流式输出
  - 复杂问题:使用改进的Agent模式流式输出

网站公告

今日签到

点亮在社区的每一天
去签到