LangChain自动化工作流实战教程:从任务编排到智能决策
一、LangChain工作流引擎核心概念
1.1 工作流自动化设计模式
1.2 核心组件介绍
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnableBranch
)
from langchain.agents import AgentExecutor, Tool
from langchain.memory import ConversationBufferWindowMemory
二、环境配置与基础搭建
2.1 快速安装
pip install langchain langchain-openai langchain-experimental
pip install duckduckgo-search # 用于网络搜索工具
2.2 最小化工作流示例
from langchain_core.runnables import RunnablePassthrough
# 定义处理节点
def step1(input):
return {"output1": input["input"] + " processed"}
def step2(input):
return {"final": input["output1"].upper()}
# 构建工作流
workflow = (
RunnablePassthrough.assign(output1=step1)
| step2
)
# 执行工作流
workflow.invoke({"input": "test"})
三、复杂工作流编排实战
3.1 条件分支工作流
from langchain_core.runnables import RunnableBranch
# 定义分支条件
def route_condition(data):
if data["type"] == "A":
return "path_a"
return "path_b"
# 定义各分支处理
def path_a(data):
return {"result": "Handled by A"}
def path_b(data):
return {"result": "Handled by B"}
# 构建分支工作流
branch = RunnableBranch(
(lambda x: route_condition(x) == "path_a", path_a),
path_b
)
# 执行分支工作流
branch.invoke({"type": "B"})
3.2 并行执行工作流
# 定义并行任务
def fetch_news(data):
return {"news": "最新AI动态..."}
def fetch_weather(data):
return {"weather": "晴,25℃"}
# 构建并行流
parallel_workflow = RunnableParallel(
news=fetch_news,
weather=fetch_weather
)
# 执行并行流
parallel_workflow.invoke({})
四、集成外部工具与Agent
4.1 工具集成示例
from langchain.agents import Tool
from langchain.tools import DuckDuckGoSearchResults
# 创建工具集
search = DuckDuckGoSearchResults()
tools = [
Tool(
name="web_search",
func=search.run,
description="用于查询实时信息"
),
Tool(
name="calculator",
func=lambda x: str(eval(x)),
description="用于数学计算"
)
]
# 创建代理工作流
from langchain.agents import create_react_agent
agent = create_react_agent(
llm=ChatOpenAI(model="gpt-3.5-turbo"),
tools=tools,
prompt=prompt
)
4.2 自动化审批流程案例
def approval_workflow(doc):
# 文档分类
classifier = RunnableLambda(classify_document)
# 并行执行检查
checks = RunnableParallel(
legal_review=legal_check,
finance_review=finance_check
)
# 最终决策
decision = RunnableLambda(make_decision)
return (
{"document": doc}
| classifier
| checks
| decision
)
# 执行审批流
result = approval_workflow.invoke(contract_doc)
五、状态管理与错误处理
5.1 工作流状态跟踪
from langchain_core.runnables import RunnableConfig
def log_step(data, config):
print(f"Step {config.get('step')} executed")
return data
workflow_with_log = (
RunnableLambda(log_step).with_config({"step": 1})
| RunnableLambda(process_data).with_config({"step": 2})
)
5.2 异常处理机制
from langchain_core.runnables import RunnableLambda
def safe_operation(data):
try:
return risky_operation(data)
except Exception as e:
return {"error": str(e)}
workflow = RunnableLambda(safe_operation)
六、生产级部署方案
6.1 工作流服务化部署
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor
app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)
@app.post("/run-workflow")
async def run_workflow(input_data: dict):
future = executor.submit(workflow.invoke, input_data)
return {"status": "started", "task_id": future.task_id}
6.2 性能优化技巧
# 异步执行工作流
async def async_workflow(data):
return await workflow.ainvoke(data)
# 缓存中间结果
from langchain.cache import InMemoryCache
langchain.llm_cache = InMemoryCache()
七、典型应用场景案例
7.1 智能客服工单系统
7.2 自动化数据分析流程
def analysis_workflow():
return (
load_data
| clean_data
| RunnableParallel(
stats=calculate_statistics,
trends=identify_trends
)
| generate_report
)
最佳实践与常见问题
8.1 调试建议
# 可视化工作流
print(workflow.get_graph().draw_mermaid())
# 分步调试
for step in workflow.stream(input_data):
print("Intermediate:", step)
8.2 性能优化矩阵
优化方向 | 实施方法 | 预期提升 |
---|---|---|
并行化 | 使用RunnableParallel | 30-50% |
缓存 | 实现LLM结果缓存 | 40-70% |
批处理 | 使用batch_invoke方法 | 3-5x |
异步执行 | 使用ainvoke替代invoke | 2-3x |
完整项目示例:
git clone https://github.com/example/langchain-automation-demo
cd langchain-automation-demo
python finance_approval_workflow.py
提示:本教程基于LangChain 0.1.x版本,实际开发时请参考官方文档获取最新API变更。对于企业级应用,建议结合Airflow或Prefect等调度系统实现复杂工作流管理。