在现代Python开发中,异步编程已经成为提高程序性能的重要手段,特别是在处理网络请求、数据库操作或AI模型调用等耗时操作时。本文将通过实际的LangGraph 示例,深入解析async
的真正作用,并揭示一个常见误区:为什么异步顺序执行与同步执行时间相近?
async的核心作用
async
的主要价值在于创建异步编程环境,让程序在等待耗时操作时不被阻塞,从而提高执行效率。但是,很多开发者对异步编程存在一个根本性的误解。
常见误区:async ≠ 自动加速
许多人认为只要在函数前加上async
,程序就会自动变快。这是错误的!
让我们通过一个LangGraph的实际例子来说明:
from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph
from dotenv import load_dotenv
load_dotenv()
# 初始化 LLM 模型
llm = ChatDeepSeek(model="deepseek-chat")
# 异步节点定义
async def async_node(state: MessagesState):
new_message = await llm.ainvoke(state["messages"])
return {"messages": [new_message]}
builder = StateGraph(MessagesState).add_node(async_node).set_entry_point("node")
graph = builder.compile()
完整的性能对比示例
以下是一个可以完整运行的性能测试示例:
import asyncio
import time
from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph
from dotenv import load_dotenv
load_dotenv()
# 初始化 LLM 模型
llm = ChatDeepSeek(model="deepseek-chat")
# 同步版本的节点
def sync_node(state: MessagesState):
"""同步版本:会阻塞等待"""
new_message = llm.invoke(state["messages"])
return {"messages": [new_message]}
# 异步版本的节点
async def async_node(state: MessagesState):
"""异步版本:可以并发执行"""
new_message = await llm.ainvoke(state["messages"])
return {"messages": [new_message]}
# 创建同步图
sync_builder = StateGraph(MessagesState).add_node("node", sync_node).set_entry_point("node")
sync_graph = sync_builder.compile()
# 创建异步图
async_builder = StateGraph(MessagesState).add_node("node", async_node).set_entry_point("node")
async_graph = async_builder.compile()
# 测试消息
messages = [
{"role": "user", "content": "你好,请介绍一下自己"},
{"role": "user", "content": "请解释一下什么是人工智能"},
{"role": "user", "content": "给我讲个笑话吧"},
{"role": "user", "content": "请推荐几本好书"},
{"role": "user", "content": "今天天气怎么样?"}
]
def test_sync_sequential():
"""测试同步顺序执行"""
print("同步顺序执行测试...")
start_time = time.time()
results = []
for i, msg in enumerate(messages):
print(f" 处理消息 {i+1}/{len(messages)}...")
result = sync_graph.invoke({"messages": [msg]})
results.append(result)
end_time = time.time()
duration = end_time - start_time
print(f"同步执行完成,总耗时: {duration:.2f} 秒")
return results, duration
async def test_async_sequential():
"""测试异步顺序执行"""
print("异步顺序执行测试...")
start_time = time.time()
results = []
for i, msg in enumerate(messages):
print(f" 处理消息 {i+1}/{len(messages)}...")
result = await async_graph.ainvoke({"messages": [msg]})
results.append(result)
end_time = time.time()
duration = end_time - start_time
print(f"异步顺序执行完成,总耗时: {duration:.2f} 秒")
return results, duration
async def test_async_concurrent():
"""测试异步并发执行"""
print("异步并发执行测试...")
start_time = time.time()
# 创建所有任务
tasks = []
for i, msg in enumerate(messages):
print(f" 启动任务 {i+1}/{len(messages)}...")
task = async_graph.ainvoke({"messages": [msg]})
tasks.append(task)
# 并发执行所有任务
print(" 所有任务并发运行中...")
results = await asyncio.gather(*tasks)
end_time = time.time()
duration = end_time - start_time
print(f"异步并发执行完成,总耗时: {duration:.2f} 秒")
return results, duration
async def main():
"""主函数:运行所有测试"""
print("=" * 60)
print("LangGraph 异步 vs 同步性能测试")
print("=" * 60)
print(f"测试场景:处理 {len(messages)} 个 LLM 请求")
print()
# 1. 同步顺序执行
sync_results, sync_time = test_sync_sequential()
print()
# 2. 异步顺序执行
async_seq_results, async_seq_time = await test_async_sequential()
print()
# 3. 异步并发执行
async_con_results, async_con_time = await test_async_concurrent()
print()
# 性能对比分析
print("=" * 60)
print("性能对比分析")
print("=" * 60)
print(f"同步顺序执行: {sync_time:.2f} 秒")
print(f"异步顺序执行: {async_seq_time:.2f} 秒")
print(f"异步并发执行: {async_con_time:.2f} 秒")
print()
# 计算性能提升
if async_con_time > 0:
speedup_vs_sync = sync_time / async_con_time
speedup_vs_async_seq = async_seq_time / async_con_time
print("性能提升:")
print(f"异步并发 vs 同步顺序: {speedup_vs_sync:.1f}x 倍速提升")
print(f"异步并发 vs 异步顺序: {speedup_vs_async_seq:.1f}x 倍速提升")
# 运行测试
if __name__ == "__main__":
asyncio.run(main())
三种执行方式的性能对比
1. 同步顺序执行
def test_sync_sequential():
results = []
for msg in messages:
result = sync_graph.invoke({"messages": [msg]})
results.append(result)
return results
# 执行时间线:
# [请求1---等待---响应1] [请求2---等待---响应2] [请求3---等待---响应3] ...
# 总耗时:约 10-15 秒(5个请求 × 每个2-3秒)
2. 异步顺序执行
async def test_async_sequential():
results = []
for msg in messages:
result = await async_graph.ainvoke({"messages": [msg]}) # 还是逐个等待
results.append(result)
return results
# 执行时间线:
# [请求1---等待---响应1] [请求2---等待---响应2] [请求3---等待---响应3] ...
# 总耗时:约 10-15 秒(与同步执行相近)
3. 异步并发执行
async def test_async_concurrent():
# 关键:同时启动所有任务
tasks = [async_graph.ainvoke({"messages": [msg]}) for msg in messages]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
return results
# 执行时间线:
# [请求1---等待---响应1]
# [请求2---等待---响应2] ← 同时进行
# [请求3---等待---响应3] ← 同时进行
# [请求4---等待---响应4] ← 同时进行
# [请求5---等待---响应5] ← 同时进行
# 总耗时:约 2-3 秒(接近单个请求时间)
为什么异步顺序执行时间相近?
这个现象困惑了很多开发者。让我们深入分析原因:
控制权的概念
在异步编程中,控制权指的是CPU当前正在执行哪段代码的决定权。
同步执行中的控制权
def sync_function():
print("开始")
result = llm.invoke(messages) # CPU 在这里"卡住"等待
print("结束")
return result
# 执行流程:
# 1. CPU 执行 print("开始")
# 2. CPU 调用 llm.invoke()
# 3. CPU 完全停止,等待网络响应(2-3秒)
# 4. 收到响应后,CPU 继续执行 print("结束")
在步骤3中,CPU被完全占用但什么都不做,这就是"阻塞"。
异步执行中的控制权转移
async def async_function():
print("开始")
result = await llm.ainvoke(messages) # 让出控制权
print("结束")
return result
# 执行流程:
# 1. CPU 执行 print("开始")
# 2. CPU 调用 llm.ainvoke()
# 3. 遇到 await,CPU 说:"我先去做别的事,响应来了再叫我"
# 4. CPU 可以执行其他任务
# 5. 网络响应到达,CPU 重新获得控制权
# 6. CPU 继续执行 print("结束")
关键洞察:让出控制权 ≠ 时间节省
# 异步但没有性能提升(顺序执行)
for msg in messages:
result = await process_message(msg) # 还是一个接一个等待
# 异步真正的优势(并发执行)
tasks = [process_message(msg) for msg in messages]
results = await asyncio.gather(*tasks) # 同时处理所有
异步顺序执行时间相近的原因:
- 都是顺序执行:两种方式都是"处理完第一个请求,再处理第二个"
- 等待时间相同:每个LLM调用的网络延迟和处理时间是一样的
- 没有并发优势:异步顺序执行没有利用异步的核心优势——并发
实际运行和测试
将上述代码保存为 async_test.py
,运行后会看到类似输出:
============================================================
🧪 LangGraph 异步 vs 同步性能测试
============================================================
📝 测试场景:处理 5 个 LLM 请求
🔄 同步顺序执行测试...
处理消息 1/5...
处理消息 2/5...
处理消息 3/5...
处理消息 4/5...
处理消息 5/5...
✅ 同步执行完成,总耗时: 148.37 秒
⏳ 异步顺序执行测试...
处理消息 1/5...
处理消息 2/5...
处理消息 3/5...
处理消息 4/5...
处理消息 5/5...
✅ 异步顺序执行完成,总耗时: 147.72 秒
🚀 异步并发执行测试...
启动任务 1/5...
启动任务 2/5...
启动任务 3/5...
启动任务 4/5...
启动任务 5/5...
🔥 所有任务并发运行中...
✅ 异步执行完成,总耗时: 67.24 秒
============================================================
📊 性能对比分析
============================================================
同步顺序执行: 148.37 秒
异步顺序执行: 147.72 秒
异步并发执行: 67.24 秒
🎯 性能提升:
异步并发 vs 同步顺序: 2.2x 倍速提升
异步并发 vs 异步顺序: 2.2x 倍速提升
💡 关键发现:
• 异步并发执行可以显著减少总耗时
• 当有多个独立的 LLM 调用时,并发执行效果最明显
• 异步顺序执行与同步执行时间相近(都是逐个等待)
• 实际加速比取决于网络延迟和 LLM 响应时间
实际应用指导
何时使用异步?
适合使用异步的场景:
- 多个独立的网络请求(如批量API调用)
- 并发的数据库查询
- 同时处理多个用户请求
- I/O密集型任务
不适合使用异步的场景:
- CPU密集型计算
- 必须顺序执行的依赖任务
- 简单的单次操作
最佳实践
# 错误用法:异步但无性能提升
async def bad_example():
result1 = await api_call_1()
result2 = await api_call_2() # 依赖result1
result3 = await api_call_3() # 依赖result2
return [result1, result2, result3]
# 改进:部分并发
async def better_example():
# 可以并发的部分
task1 = api_call_1()
task2 = independent_api_call()
result1, result2 = await asyncio.gather(task1, task2)
# 依赖前面结果的部分
result3 = await api_call_3(result1)
return [result1, result2, result3]
# 最佳:完全并发(当任务独立时)
async def best_example():
tasks = [
api_call_1(),
api_call_2(),
api_call_3(),
api_call_4(),
api_call_5()
]
results = await asyncio.gather(*tasks)
return results
总结
- async的真正价值:不在于让单个任务变快,而在于让多个任务可以同时进行
- 异步顺序执行时间相近:因为还是逐个等待,没有发挥并发优势
- 性能提升的关键:使用
asyncio.gather()
或类似机制实现真正的并发 - 实际应用:在设计异步程序时,要识别哪些任务可以并发执行
异步编程是一个强大的工具,但只有正确使用才能发挥其真正的威力。记住:异步的魅力不在于等待得更快,而在于可以同时等待多件事情。
延伸思考:在你的项目中,有哪些场景可以从顺序执行改为并发执行?试着识别那些相互独立的异步操作,这通常是性能优化的黄金机会。