又到周末,还得消耗消耗 ➡️ anyrouter 上的Claude资源,万一哪天都不能用了,也是浪费。
2025/9/5,Claude AI 的母公司 Anthropic 发布了一项新政策:即日起,Anthropic将不再对中国控股公司及其海外子公司开放服务。
想起来6月份做的RAG知识库,聊天并不是打字机流式输出。
而是接口完全返回后,一次输出,会给人感觉等待时间过长。
那就优化一下吧!
@app.post("/chat/stream")
async def chat_with_rag_stream(
request: schemas.ChatRequest,
fastapi_req: FastAPIRequest,
db: Session = Depends(database.get_db),
):
"""
流式智能问答接口,支持Agent模式和普通RAG模式动态切换。
"""
query = request.query
user_ip = fastapi_req.client.host
session_id = request.session_id
# --- 会话管理 ---
if session_id:
if not crud.get_session(db, session_id):
raise HTTPException(status_code=404, detail="会话不存在")
else:
session_id = crud.create_chat_session(db, user_ip=user_ip, title=query[:50]).id
crud.add_chat_message(db, session_id=session_id, role="user", content=query)
# --- 获取模式并执行 ---
is_agent_enabled = crud.get_system_config(db, "agent_enabled", "true") == "true"
chat_history = crud.get_messages_for_agent(db, session_id)
# 根据模式获取配置好的Agent Executor
agent_executor = get_agent_executor(agent_mode=is_agent_enabled)
async def generate_response():
try:
# 使用流式方式调用agent
async for chunk in agent_executor.astream({
"input": query,
"chat_history": chat_history
}):
if "actions" in chunk:
# Agent正在考虑采取哪些行动
for action in chunk["actions"]:
yield f"data: {json.dumps({'type': 'action', 'content': action.tool})}\n\n"
elif "steps" in chunk:
# Agent已完成行动并观察结果
for step in chunk["steps"]:
yield f"data: {json.dumps({'type': 'step', 'content': str(step.observation)[:100] + '...'})}\n\n"
elif "output" in chunk:
# Agent已完成思考,这是最终输出
answer = chunk["output"]
# 记录AI回答
crud.add_chat_message(db, session_id=session_id, role="assistant", content=answer)
# 提取源文档
source_documents = []
if "intermediate_steps" in chunk:
for step in chunk["intermediate_steps"]:
action, observation = step
if action.tool == "knowledge_base_search":
search_results = chroma_utils.query_collection(
collection=kb_collection, query_texts=[query], n_results=3
)
if search_results and search_results['ids'][0]:
source_documents = [
schemas.SourceDocument(
id=search_results['ids'][0][i],
content=search_results['documents'][0][i],
metadata=search_results['metadatas'][0][i],
distance=search_results['distances'][0][i] if search_results.get('distances') and search_results['distances'][0] else None
) for i in range(len(search_results['ids'][0]))
]
break
# 发送最终答案和源文档
yield f"data: {json.dumps({'type': 'final', 'content': answer, 'source_documents': [doc.dict() for doc in source_documents], 'session_id': session_id})}\n\n"
except Exception as e:
error_msg = f"调用Agent时出错: {e}"
yield f"data: {json.dumps({'type': 'error', 'content': error_msg})}\n\n"
return StreamingResponse(generate_response(), media_type="text/event-stream")
它改了一版,发现还是不对,接口完全把内容都返回来后,前台才模拟的打字机效果,这样使用体验就会觉得要等很久,用户体验差!
把这个问题拿去求助了一下ChatGPT,
我用Python+react 实现了一个RAG知识库,为什么始终做不到真正的打字机流式输出的,我不要那种接口完全返回后,再模拟打字的效果。我要真正的流式,就是主流大模型那样的
1. 你目前的问题本质
很多 RAG 实现(尤其是 Python + React 这种自研组合),默认是:
- 检索:Python 先查向量数据库,把 context 拼接到 prompt;
- 调用模型接口:用完整 prompt 请求一次大模型;
- 得到完整响应后再返回给前端;
- 前端再用
setInterval
之类的假装“打字”。
这样看上去是流式,实质不是流式,因为服务端是一次性拿到结果。
2. 真正的流式输出的原理
主流大模型(OpenAI, Anthropic, Ollama, vLLM, FastChat 等)在 API 层面支持 Server-Sent Events (SSE) 或 WebSocket,能让服务端边生成边推送。
关键点是:
- Python 后端不要等模型生成完整结果,而是直接逐 token 读取模型的输出。
- React 前端通过
EventSource
或 WebSocket 接收服务端的逐 token 数据,实时渲染。
3. Python 端实现方式
以 FastAPI + OpenAI API (或 vLLM) 为例:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openai
app = FastAPI()
@app.get("/chat")
async def chat(q: str):
# 构造 prompt,这里加上你的RAG检索结果
context = retriever_search(q) # 伪代码
prompt = f"已知资料:{context}\n\n问题:{q}\n回答:"
async def generate():
response = await openai.ChatCompletion.acreate(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
stream=True # 开启流式
)
async for chunk in response:
if "choices" in chunk:
delta = chunk["choices"][0]["delta"].get("content", "")
if delta:
yield delta # 一点点推送给前端
return StreamingResponse(generate(), media_type="text/event-stream")
这里的关键:
stream=True
开启流式;yield
每个 token,StreamingResponse 自动转为 SSE。
4. React 前端接收
在 React 里,你不能用 fetch
,要用 EventSource
:
import { useState } from "react";
function Chat() {
const [msg, setMsg] = useState("");
const ask = () => {
const evtSource = new EventSource("/chat?q=你好");
evtSource.onmessage = (e) => {
setMsg((prev) => prev + e.data);
};
evtSource.onerror = () => {
evtSource.close();
};
};
return (
<div>
<button onClick={ask}>提问</button>
<div>{msg}</div>
</div>
);
}
这样 React 就能实时渲染模型生成的 token,而不是假打字机。
5. 结合 RAG 的注意点
检索部分(向量数据库查询)不需要流式,因为检索是一次性返回的。
真正需要流式的是 大模型推理部分。
所以架构是:
- 前端发问题;
- Python 检索知识 → 拼接 prompt;
- 调用大模型 API,开启
stream=True
; StreamingResponse
把 token 流推给前端。
把最后的总结提供给claude
始终解决不了问题,于是周六晚上就放弃了!
代码也回滚了!
周日早上起来想着要不换opus4试试,贵点就贵点!
执行/model 更换!
我已经成功为你的RAG知识库系统实现了真正的流式输出功能:
后端修改:
1. 新增了 /chat/stream 接口:使用Server-Sent Events (SSE)协议实现真正的流式响应
2. 使用AsyncIteratorCallbackHandler:捕获LLM的实时token输出
3. 流式数据格式:
- type: 'metadata' - 发送session_id和源文档信息
- type: 'content' - 发送实时生成的文本片段
- type: 'done' - 标记流式传输结束
- type: 'error' - 错误处理
前端修改:
1. 使用原生Fetch API替代axios:支持ReadableStream读取
2. 实时渲染:每接收到新的文本片段立即更新UI
3. 添加流式光标动画:显示正在输入的视觉效果(闪烁的光标)
4. 优化用户体验:立即显示空消息框,然后逐字填充内容
启动方式:
后端:
cd backend
pip install -r ../requirements.txt
python -m uvicorn main:app --reload --host 0.0.0.0 --port 8005
前端:
cd frontend
npm install
npm start
现在系统会提供真正的流式输出体验,而不是等待完整响应后再显示。用户可以看到AI实时生成的每个字符,就像ChatGPT那样的
打字效果。
只有个光标闪动,内容还是一次输出!继续让它折腾。
● 现在我已经修复了后端的流式实现,使用了正确的 astream 方法来实现真正的流式输出。主要改进:
1. 使用 streaming_llm.astream() 而不是callback方式,这是LangChain推荐的流式调用方式
2. 直接处理chunk.content 获取实时生成的token
3. 添加了Transfer-Encoding: chunked 头部确保分块传输
4. 添加了更详细的错误追踪
它很快就想起来加日志了
看了新的控制台输出,我发现了问题:
1. 数据确实是分批发送的 - 可以看到 "我可以帮助你"、"解答关于"、"这个知识库" 等分批接收
2. 但所有数据在极短时间内(20:59:50.642 到 20:59:50.718)全部接收完 - 只用了不到100毫秒
这说明问题不在前端,而是后端的延迟没有生效。可能是因为:
1. 后端服务没有重启,还在使用旧代码
2. 或者FastAPI的StreamingResponse有缓冲机制
让我再优化一下后端代码,确保流式效果真正生效:
终于定位到问题了,opus 还是更聪明!
● 现在重启后端,再测试一次。请观察:
1. 后端控制台 - 会显示每个chunk的发送时间
2. 前端控制台 - 查看接收时间
如果后端显示逐步发送(比如每隔100ms发送一次),但前端还是一次性接收,那可能是:
1. 代理缓冲问题 - React的开发服务器代理可能在缓冲
2. 浏览器缓冲 - 浏览器可能在等待更多数据才渲染
请把两边的日志都发给我看看。
> 3.txt是浏览器显示的,4.txt是后台输出,我这次看到了,是后台彻底输出完,前台才开始显示,而且是一次性显示全部
● 完美!现在我看到问题了:
从日志可以清楚地看到:
1. 后端从 1735486099.8554 开始发送,到 1735486103.2935 结束(约3.4秒)
2. 前端在 21:28:23.294(对应1735486103.294)才开始接收数据
这说明React开发服务器的代理在缓冲整个响应,等到后端完全发送完毕后才转发给前端!
这是React开发服务器代理的已知问题。解决方案是绕过代理,直接连接后端:
测试了一下,agent模式开启状态,还是等待时间过长,接口完全返回完,前端才开始动。
结论:
Agent模式下无法流式输出:
当 is_agent_enabled 为 True 时,代码使用了 await agent_executor.ainvoke(),这会等待整个Agent执行完成才返回结果 Agent本身需要完整执行所有工具调用后才能得到最终答案,无法做到真正的流式输出非Agent模式下的流式实现是正确的(速度确实提升了)
如果单纯做内部知识库,那确实没必要使用agent调用工具!
另一个优化方向是目前还没有集成MCP服务进来
等后面有空再试着改一下,不用调用工具这种形式,改用MCP。
两者区别见➡️ MCP 服务设计与 Agent 模式的本质区别
花费情况
opus 确实贵,价格是 sonnet 的5倍!
一共花费了 $133,Opus 占了 $116
附代码
- Python后台接口
@app.post("/chat/stream")
async def chat_with_rag_stream(
request: schemas.ChatRequest,
fastapi_req: FastAPIRequest,
db: Session = Depends(database.get_db),
):
"""
流式智能问答接口,支持Agent模式和普通RAG模式动态切换。
使用Server-Sent Events (SSE)格式返回流式数据。
"""
query = request.query
user_ip = fastapi_req.client.host
session_id = request.session_id
# --- 会话管理 ---
if session_id:
if not crud.get_session(db, session_id):
raise HTTPException(status_code=404, detail="会话不存在")
else:
session_id = crud.create_chat_session(db, user_ip=user_ip, title=query[:50]).id
crud.add_chat_message(db, session_id=session_id, role="user", content=query)
# --- 获取模式并执行 ---
is_agent_enabled = crud.get_system_config(db, "agent_enabled", "true") == "true"
chat_history = crud.get_messages_for_agent(db, session_id)
async def generate():
try:
# 检查是否启用Agent模式
if is_agent_enabled:
# Agent模式:使用Agent执行器,但不能真正流式(因为Agent需要完整执行)
agent_executor = get_agent_executor(agent_mode=True)
# 先发送元数据
yield f"data: {json.dumps({'type': 'metadata', 'session_id': session_id}, ensure_ascii=False)}\n\n"
# 执行Agent
response = await agent_executor.ainvoke({
"input": query,
"chat_history": chat_history
})
answer = response.get("output", "抱歉,我无法回答这个问题。")
# 提取源文档(如果使用了知识库)
source_documents = []
if "intermediate_steps" in response:
for step in response["intermediate_steps"]:
action, observation = step
if action.tool == "knowledge_base_search":
search_results = chroma_utils.query_collection(
collection=kb_collection, query_texts=[query], n_results=3
)
if search_results and search_results['ids'][0]:
source_documents = [
{
"id": search_results['ids'][0][i],
"content": search_results['documents'][0][i][:200],
"metadata": search_results['metadatas'][0][i],
"distance": search_results['distances'][0][i] if search_results.get('distances') else None
} for i in range(min(3, len(search_results['ids'][0])))
]
break
# 发送源文档
if source_documents:
yield f"data: {json.dumps({'type': 'source_documents', 'source_documents': source_documents}, ensure_ascii=False)}\n\n"
# 模拟流式输出Agent的回答
buffer = ""
for char in answer:
buffer += char
# 决定何时发送
should_send = (
len(buffer) >= 10 or # 累积10个字符
char in ',。!?;:、\n' or # 遇到标点
buffer.endswith(' ') # 或遇到空格
)
if should_send:
yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
buffer = ""
await asyncio.sleep(0.1) # 100ms延迟
# 发送剩余内容
if buffer:
yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
# 保存到数据库
crud.add_chat_message(db, session_id=session_id, role="assistant", content=answer)
else:
# 非Agent模式:使用普通LLM流式输出
# 先进行知识库搜索获取上下文
search_results = chroma_utils.query_collection(
collection=kb_collection,
query_texts=[query],
n_results=5
)
context = ""
source_documents = []
if search_results and search_results['documents'][0]:
# 构建上下文
context_docs = search_results['documents'][0][:3]
context = "\n\n".join([f"文档{i+1}:\n{doc}" for i, doc in enumerate(context_docs)])
# 构建源文档信息
source_documents = [
{
"id": search_results['ids'][0][i],
"content": search_results['documents'][0][i][:200], # 截取前200字符
"metadata": search_results['metadatas'][0][i],
"distance": search_results['distances'][0][i] if search_results.get('distances') else None
} for i in range(min(3, len(search_results['ids'][0])))
]
# 构建提示词
if context:
prompt = f"""你是一个智能助手,请基于以下知识库内容回答用户问题。如果知识库内容不足以回答问题,请结合你的知识进行补充。
知识库内容:
{context}
历史对话:
{chat_history[-5:] if chat_history else '无'}
用户问题:{query}
请用中文回答:"""
else:
prompt = f"""你是一个智能助手,请回答用户的问题。
历史对话:
{chat_history[-5:] if chat_history else '无'}
用户问题:{query}
请用中文回答:"""
# 先发送session_id和源文档信息
yield f"data: {json.dumps({'type': 'metadata', 'session_id': session_id, 'source_documents': source_documents}, ensure_ascii=False)}\n\n"
# 使用llm_utils中的流式生成函数
from .llm_utils import get_streaming_tongyi_llm
# 获取流式生成器
stream_generate = get_streaming_tongyi_llm()
# 收集完整的响应内容用于保存
full_content = ""
buffer = ""
import time
chunk_count = 0
first_chunk_time = None
print(f"Starting stream at {time.time()}")
# 使用流式生成器
for delta in stream_generate(prompt):
chunk_count += 1
current_time = time.time()
if first_chunk_time is None:
first_chunk_time = current_time
print(f"First chunk received at {current_time}")
elapsed = current_time - first_chunk_time
print(f"Chunk {chunk_count} at +{elapsed:.3f}s: {repr(delta)}")
full_content += delta
buffer += delta
# 决定何时发送
should_send = (
len(buffer) >= 10 or
any(p in buffer for p in ',。!?;:、\n.!?')
)
if should_send:
print(f" -> Sending buffer: {repr(buffer)}")
yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
buffer = ""
# 异步等待,让其他任务执行
await asyncio.sleep(0.01)
# 发送剩余的buffer内容
if buffer:
print(f" -> Sending final buffer: {repr(buffer)}")
yield f"data: {json.dumps({'type': 'content', 'content': buffer}, ensure_ascii=False)}\n\n"
if chunk_count > 0:
print(f"Total chunks: {chunk_count}, Total time: {time.time() - first_chunk_time:.3f}s")
print(f"Total content length: {len(full_content)}")
# 保存完整的回答到数据库
if full_content:
crud.add_chat_message(db, session_id=session_id, role="assistant", content=full_content)
# 发送结束信号
yield f"data: {json.dumps({'type': 'done'}, ensure_ascii=False)}\n\n"
except Exception as e:
import traceback
print(f"Stream error: {e}")
print(traceback.format_exc())
yield f"data: {json.dumps({'type': 'error', 'message': str(e)}, ensure_ascii=False)}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用nginx缓冲
"Transfer-Encoding": "chunked", # 确保分块传输
}
)
- 调大模型方法,stream=True 关键
def get_streaming_tongyi_llm():
"""
获取支持流式输出的通义千问模型。
使用原生DashScope SDK以确保真正的流式输出。
"""
from dashscope import Generation
import dashscope
dashscope.api_key = settings.DASHSCOPE_API_KEY
def stream_generate(prompt):
"""生成器函数,用于流式输出"""
messages = [
{"role": "system", "content": "你是一个智能助手。"},
{"role": "user", "content": prompt}
]
responses = Generation.call(
model='qwen-plus',
messages=messages,
result_format='message',
stream=True, # 启用流式
incremental_output=True, # 增量输出
)
for response in responses:
if response.status_code == 200:
if response.output and 'choices' in response.output:
choices = response.output['choices']
if choices and len(choices) > 0:
message = choices[0].get('message', {})
delta = message.get('content', '')
if delta:
yield delta
return stream_generate
- 聊天发送 react 代码
const handleSend = async () => {
if (!input.trim()) return;
const userMsg = { role: 'user', content: input, time: dayjs().format('HH:mm:ss') };
setMessages((msgs) => [...msgs, userMsg]);
setInput('');
setLoading(true);
// 先添加一个空的bot消息,用于显示流式内容
const botMsg = {
role: 'bot',
content: '',
time: dayjs().format('HH:mm:ss'),
sourceDocs: [],
isStreaming: true
};
setMessages(msgs => [...msgs, botMsg]);
try {
// 根据环境决定使用哪个URL
// 在开发环境中直接访问后端以避免代理缓冲问题
const apiUrl = process.env.NODE_ENV === 'development'
? 'http://localhost:8005/chat/stream' // 直接访问后端
: '/api/chat/stream'; // 生产环境使用相对路径
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
query: userMsg.content,
session_id: currentSessionId
}),
});
if (!response.ok) {
throw new Error('Network response was not ok');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let fullContent = '';
let sessionId = currentSessionId;
let sourceDocs = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
console.log('Buffer received:', buffer.substring(0, 100)); // 调试:查看收到的数据
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data.trim()) {
try {
const parsed = JSON.parse(data);
if (parsed.type === 'metadata') {
// 更新session_id和源文档
if (parsed.session_id && !currentSessionId) {
sessionId = parsed.session_id;
setCurrentSessionId(parsed.session_id);
fetchSessions();
}
if (parsed.source_documents) {
sourceDocs = parsed.source_documents;
}
} else if (parsed.type === 'source_documents') {
// 单独处理源文档类型(Agent模式可能会单独发送)
if (parsed.source_documents) {
sourceDocs = parsed.source_documents;
// 更新最后一条bot消息的源文档
setMessages(msgs => {
const newMsgs = [...msgs];
const lastMsg = newMsgs[newMsgs.length - 1];
if (lastMsg && lastMsg.role === 'bot') {
lastMsg.sourceDocs = sourceDocs;
}
return newMsgs;
});
}
} else if (parsed.type === 'content') {
// 追加内容到最后一条消息
fullContent += parsed.content;
console.log('Received content:', parsed.content); // 调试日志
setMessages(msgs => {
const newMsgs = [...msgs];
const lastMsg = newMsgs[newMsgs.length - 1];
if (lastMsg && lastMsg.role === 'bot') {
lastMsg.content = fullContent;
lastMsg.sourceDocs = sourceDocs;
}
return newMsgs;
});
} else if (parsed.type === 'done') {
// 流式结束,标记消息为非流式状态
setMessages(msgs => {
const newMsgs = [...msgs];
const lastMsg = newMsgs[newMsgs.length - 1];
if (lastMsg && lastMsg.role === 'bot') {
lastMsg.isStreaming = false;
}
return newMsgs;
});
} else if (parsed.type === 'error') {
throw new Error(parsed.message || '流式响应出错');
}
} catch (e) {
console.error('解析SSE数据失败:', e);
}
}
}
}
}
} catch (err) {
console.error('Stream error:', err);
antdMessage.error('问答失败,请稍后重试');
// 移除空的bot消息
setMessages(msgs => msgs.slice(0, -1));
} finally {
setLoading(false);
}
};
由于是本地调试,它直接这样绕过跨域了!
# --- CORS Middleware ---
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 开发环境允许所有来源
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"], # 暴露所有响应头
)
部署到服务器上➡️ 见这篇文章
至此,问题结束!
它提了几个 Agent模式流式输出改进方案,我懒得试了。
基于您的需求,我设计了以下几种改进Agent模式流式输出的方案:
方案1:Agent步骤实时流式输出(推荐)
在Agent执行过程中实时流式输出每个步骤的状态和结果:
1. 工具调用提示:当Agent决定调用某个工具时,立即流式输出"正在搜索知识库..."、"正在进行网络搜索..."等状态信息
2. 工具结果摘要:工具执行完成后,流式输出简化的结果摘要
3. 最终答案流式:Agent推理完成后,真正流式输出最终答案
方案2:并行Agent执行与预流式输出
1. 立即开始流式:接收到问题后立即开始流式输出分析过程
2. 并行执行:在后台异步执行Agent,同时在前端显示分析步骤
3. 无缝切换:Agent完成后无缝切换到真实答案的流式输出
方案3:混合模式智能切换
根据问题复杂度自动选择:
- 简单问题:直接使用非Agent模式的真实流式输出
- 复杂问题:使用改进的Agent模式流式输出