随着大型语言模型(
LLM
)能力的不断提升,构建具备复杂任务处理能力的AI Agent
系统成为研究与应用的热点。然而,赋予Agent
调用外部工具的能力是其走向实用化的关键一步。本文旨在探讨一种新兴的标准化工具接口协议——Model Context Protocol (MCP)
,并深入分析如何将其集成到当前八种主流的LLM Agent
开发框架中,包括OpenAI Agents SDK、LangGraph、LlamaIndex、AutoGen、Pydantic AI、SmolAgents、Camel和CrewAI。通过梳理各框架的核心特性、集成方法及代码示例,本文为开发者提供了一套清晰的实践指南,旨在促进Agent系统与外部世界的高效交互,推动AI Agent
技术的标准化与工程化落地。
1. 什么是 MCP Server?
MCP(Model Context Protocol)Server
是一个标准化的工具接口协议,它允许 AI Agent
通过统一的方式调用各种外部工具和服务。无论是搜索引擎、数据库查询,还是API
调用,MCP Server
都能提供标准化的接入方式。
MCP Server
支持两种主要连接模式:
Stdio
模式:通过命令行进程通信,适合本地开发SSE
模式:通过HTTP连接,适合生产环境部署
2. 主流LLM Agent框架与MCP集成实战
2.1 OpenAI Agents SDK - 轻量级多Agent协作
OpenAI
官方推出的轻量级框架,特别适合构建简单的多Agent
协作场景。
集成要点 : 使用
mcp-client-python
库连接MCP Server
,并将获取的工具传递给create_agent
函数。安装依赖:
pip install openai-agents-sdk mcp-client-python
优势 :API简洁,OpenAI生态无缝集成。
应用场景 :创建“研究员”和“写手”
Agent
,研究员调用MCP
搜索工具获取信息,完成后将任务转交给写手生成报告。# 基础Agent设置 from openai_agents import Agent, create_agent from mcp_client import MCPClient import asyncio async def setup_openai_agent_with_mcp(): # 创建MCP客户端 mcp_client = MCPClient() # 连接Tavily搜索工具 await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) # 获取工具列表 tools = await mcp_client.get_tools() # 创建Agent agent = create_agent( name="搜索助手", instructions="你是一个专业的搜索助手,能够帮助用户获取最新信息", tools=tools, model="gpt-4" ) return agent, mcp_client # 运行Agent async def main(): agent, mcp_client = await setup_openai_agent_with_mcp() # 与Agent对话 response = await agent.run( "帮我搜索一下2025年AI领域的重要进展" ) print(response.content) # 清理连接 await mcp_client.disconnect() # 执行 if __name__ == "__main__": asyncio.run(main())
创建和运行多Agent协作系统的异步函数,使用了类似 OpenAI Assistants API 的概念,并结合了 MCP(Model Control Protocol)客户端来集成外部工具(如 Tavily 搜索):
安装依赖:
pip install langgraph langchain-openai tavily-python
import asyncio from typing import TypedDict, Annotated, Sequence from langchain_core.messages import BaseMessage, HumanMessage, AIMessage from langchain_openai import ChatOpenAI from tavily import TavilyClient from langgraph.graph import StateGraph, END # 初始化模型 llm = ChatOpenAI(model="gpt-4", temperature=0) # 初始化 Tavily 搜索工具 tavily = TavilyClient(api_key="your_tavily_api_key") # 替换为你的 Tavily Key # 定义状态 class AgentState(TypedDict): messages: Annotated[Sequence[BaseMessage], ...] current_agent: str research_data: str # 创建研究员 Agent async def researcher_node(state: AgentState): query = state['messages'][-1].content print("🔍 研究员正在搜集信息...") search_result = tavily.search(query=query, max_results=5) result_text = "\n\n".join([r["content"] for r in search_result["results"]]) # 调用 LLM 做初步分析(可选) response = llm.invoke([ HumanMessage(content=f"请总结以下关于'{query}'的信息,用于写科普文章:\n{result_text}") ]) return { **state, "research_data": response.content, "messages": state["messages"] + [AIMessage(content=response.content)], "current_agent": "writer" } # 创建写手 Agent async def writer_node(state: AgentState): print("✍️ 写手正在撰写文章...") research_summary = state["research_data"] prompt = f""" 请将以下研究内容整理成一篇通俗易懂的中文科普文章,风格生动,适合大众阅读: {research_summary} """ response = llm.invoke([HumanMessage(content=prompt)]) return { **state, "messages": state["messages"] + [AIMessage(content=response.content)], "current_agent": "end" } # 构建图 async def create_agent_team(): workflow = StateGraph(AgentState) workflow.add_node("researcher", researcher_node) workflow.add_node("writer", writer_node) workflow.set_entry_point("researcher") # 设置流转逻辑 workflow.add_conditional_edges( "researcher", lambda x: "writer", { "writer": "writer" } ) workflow.add_edge("writer", END) graph = workflow.compile() return graph # 运行任务 async def run_team_task(): app = await create_agent_team() inputs = { "messages": [HumanMessage(content="研究量子计算的最新突破,并整理成一篇科普文章")], "current_agent": "researcher", "research_data": "" } async for output in app.astream(inputs): for key, value in output.items(): print(f"\n[Node: {key}]") if 'research_data' in value: print("📊 研究完成,数据已生成...") if 'messages' in value and len(value['messages']) > 0: last_msg = value['messages'][-1] if isinstance(last_msg, AIMessage): print("📝 输出:", last_msg.content[:200] + "..." if len(last_msg.content) > 200 else last_msg.content) # 最终结果 final_msg = output.get("writer", {}).get("messages", [])[-1] print("\n" + "="*50) print("最终文章:\n", final_msg.content) print("="*50) # 主程序入口 if __name__ == "__main__": asyncio.run(run_team_task())
多Agent协作示例
# 多Agent协作示例 async def create_agent_team(): # 创建MCP客户端 mcp_client = MCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) tools = await mcp_client.get_tools() # 创建研究员Agent researcher = create_agent( name="研究员", instructions="负责信息搜集和分析", tools=tools, model="gpt-4" ) # 创建写手Agent writer = create_agent( name="写手", instructions="负责将研究结果整理成文章", model="gpt-4" ) # 创建协作工作流 from openai_agents import Handoffs # 配置Agent间的任务转交 handoffs = Handoffs() handoffs.add_handoff( from_agent=researcher, to_agent=writer, condition="完成信息收集后转交给写手" ) return researcher, writer, handoffs # 运行协作任务 async def run_team_task(): researcher, writer, handoffs = await create_agent_team() # 启动协作任务 result = await researcher.run( "研究量子计算的最新突破,并整理成一篇科普文章", handoffs=handoffs ) print(result.content)
2.2 LangGraph - 有状态图工作流
基于图结构的框架,适用于需要状态管理和复杂决策逻辑的长周期任务。
MCP集成 :使用
load_mcp_tools
或MultiServerMCPClient
加载工具。应用场景 :构建“研究-分析-报告”三阶段工作流。研究节点调用MCP搜索,分析和报告节点则基于研究结果进行纯推理。
优势 :可视化工作流,状态持久化,适合处理复杂、多步骤任务。
安装依赖:
pip install langgraph langchain-openai mcp-client-python
from langgraph.prebuilt import create_react_agent from langchain_openai import ChatOpenAI from mcp_client import MCPClient, load_mcp_tools import asyncio async def create_langgraph_agent(): # 方法1:使用load_mcp_tools tools = await load_mcp_tools( server_config={ "command": "npx", "args": ["@tavily/mcp-server"] } ) # 初始化LLM llm = ChatOpenAI(model="gpt-4") # 创建ReactAgent agent = create_react_agent( llm=llm, tools=tools, state_modifier="你是一个专业的AI助手,能够使用各种工具帮助用户解决问题" ) return agent # 运行Agent async def run_langgraph_agent(): agent = await create_langgraph_agent() # 执行任务 config = {"configurable": {"thread_id": "1"}} result = await agent.ainvoke( {"messages": [("user", "帮我搜索并总结Python 3.12的新特性")]}, config=config ) print(result["messages"][-1].content) if __name__ == "__main__": asyncio.run(run_langgraph_agent())
集成多个 MCP Server(如 Tavily 搜索、数据库等)作为工具,供一个 Agent 使用,并通过 LangGraph 驱动 ReAct 模式完成任务。
安装依赖:
pip install langchain langchain-mcp tavily-python openai
import asyncio import json import sys from typing import List, Dict, Any from langchain_core.tools import Tool from langchain_openai import ChatOpenAI from langchain.agents import create_react_agent from langchain.agents.agent import AgentExecutor from langchain.memory import ConversationBufferMemory # ======================= # MCP 客户端:与 MCP Server 通信 # ======================= class MCPClient: def __init__(self, server_id: str, command: str, args: List[str]): self.server_id = server_id self.command = command self.args = args self.reader = None self.writer = None async def connect(self): print(f"🔁 启动 MCP Server: {self.server_id}") proc = await asyncio.create_subprocess_exec( self.command, *self.args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=sys.stderr, ) self.reader, self.writer = proc.stdin, proc.stdout # 读取初始化消息 header = await self._read_line() if not header.startswith("MCP/"): raise RuntimeError(f"Invalid MCP server response: {header}") # 发送客户端握手 self._send({"version": "0.2.0"}) init = await self._read_json() if init["type"] != "init": raise RuntimeError("Expected init message") print(f"✅ MCP Server '{self.server_id}' 初始化完成,工具数: {len(init.get('tools', []))}") return init.get("tools", []) async def _read_line(self) -> str: line = await self.reader.readline() return line.decode().strip() def _send(self, data: dict): text = json.dumps(data, ensure_ascii=False) self.writer.write((text + "\n").encode()) # 注意:不 await writer.drain() 可能导致问题 # await self.writer.drain() async def _read_json(self) -> dict: while True: line = await self.reader.readline() if not line: raise EOFError("Connection closed") try: return json.loads(line.decode()) except json.JSONDecodeError: continue async def close(self): if self.writer: self.writer.close() await self.writer.wait_closed() # ======================= # 创建多 Server Agent # ======================= async def create_multi_server_agent(): clients: List[MCPClient] = [] all_tools: List[Tool] = [] # 创建 MCP 客户端列表 mcp_configs = [ ("search", "npx", ["@tavily/mcp-server"]), # 示例:其他 MCP Server(需要实际存在) # ("db", "python", ["-m", "my_mcp_db_server"]), ] for server_id, cmd, args in mcp_configs: client = MCPClient(server_id, cmd, args) await client.connect() clients.append(client) # 获取工具并转换为 LangChain Tool tools = await client.get_all_tools() # 实际上我们已经在 connect 返回了 tools for tool in tools: lc_tool = Tool( name=f"{server_id}.{tool['name']}", description=tool.get("description", "") or tool.get("prompt", ""), func=lambda q, t=tool, c=client: asyncio.run(c.call_tool(t["name"], {"query": q})), ) all_tools.append(lc_tool) # 初始化 LLM llm = ChatOpenAI(model="gpt-4o", temperature=0) # 创建 ReAct Agent from langchain import hub prompt = hub.pull("hwchase17/react") agent = create_react_agent(llm, all_tools, prompt) agent_executor = AgentExecutor(agent=agent, tools=all_tools, verbose=True) return agent_executor, clients # ======================= # 调用工具(简化版,需增强) # ======================= # 注意:上面的 lambda 有问题(闭包陷阱),需改写为: async def call_tool(client: MCPClient, tool_name: str, input_dict: dict): # 发送调用消息 client._send({ "type": "call", "method": tool_name, "params": input_dict }) # 等待结果(简化处理,实际需处理流式/多消息) result = await client._read_json() if result["type"] == "result": return result["output"] elif result["type"] == "error": raise RuntimeError(result["error"]) return str(result) # 修正:动态绑定时避免闭包问题 def make_tool(client, tool_name, tool_info): async def func(input_val): if isinstance(input_val, dict): params = input_val else: params = {"query": input_val} try: result = await call_tool(client, tool_name, params) return json.dumps(result, ensure_ascii=False, indent=2) except Exception as e: return f"Error calling {tool_name}: {str(e)}" return Tool( name=f"{client.server_id}.{tool_name}", description=tool_info.get("description", "") or tool_info.get("prompt", "No description"), func=func, coroutine=func, ) # 重新实现 create_multi_server_agent(修正版) async def create_multi_server_agent(): clients: List[MCPClient] = [] all_tools: List[Tool] = [] mcp_configs = [ ("search", "npx", ["@tavily/mcp-server"]), # 可扩展其他 MCP Server ] for server_id, cmd, args in mcp_configs: client = MCPClient(server_id, cmd, args) await client.connect() clients.append(client) # 获取工具 tools = await client.get_all_tools() # 实际上我们已经在 connect 返回了 tools for tool in tools: lc_tool = make_tool(client, tool["name"], tool) all_tools.append(lc_tool) llm = ChatOpenAI(model="gpt-4o", temperature=0) from langchain import hub prompt = hub.pull("hwchase17/react") agent = create_react_agent(llm, all_tools, prompt) agent_executor = AgentExecutor(agent=agent, tools=all_tools, verbose=True) return agent_executor, clients # ======================= # 运行任务 # ======================= async def run_langgraph_agent(): agent_executor, clients = await create_multi_server_agent() try: result = await agent_executor.ainvoke({ "input": "量子计算最近有什么突破?请搜索并总结。", "chat_history": [], }) print("\n\n🎯 最终结果:") print(result["output"]) finally: for client in clients: await client.close() if __name__ == "__main__": asyncio.run(run_langgraph_agent())
多Server集成示例
# 多Server集成示例 from langgraph.mcp import MultiServerMCPClient async def create_multi_server_agent(): # 创建多Server客户端 mcp_client = MultiServerMCPClient() # 添加搜索工具 await mcp_client.add_server( "search", command="npx", args=["@tavily/mcp-server"] ) # 添加数据库工具(示例) await mcp_client.add_server( "database", command="npx", args=["@example/database-mcp-server"] ) # 获取所有工具 all_tools = await mcp_client.get_all_tools() llm = ChatOpenAI(model="gpt-4") agent = create_react_agent(llm=llm, tools=all_tools) return agent, mcp_client
使用 LangGraph + LangChain 构建的 自动化研究工作流系统 ,让多个 AI Agent 分工协作,自动完成:“用户提问 → 搜集资料 → 分析信息 → 写成专业报告”
安装依赖:
pip install langgraph langchain-openai langchain-community tavily-python
import asyncio from typing import TypedDict, Annotated, List from langgraph.graph import StateGraph, START, END from langchain_core.messages import BaseMessage, HumanMessage, AIMessage from langchain_openai import ChatOpenAI from langchain.agents import create_react_agent from langchain_community.tools.tavily_search import TavilySearchResults from langchain_core.prompts import ChatPromptTemplate from operator import add # ======================= # 自定义状态 # ======================= class AgentState(TypedDict): messages: Annotated[List[BaseMessage], add] # 自动合并消息 research_data: dict # 存储中间数据 final_report: str # 最终输出 # ======================= # 工具:使用 Tavily 搜索(替代 MCP) # ======================= def load_tools(): """加载外部工具,例如 Tavily 搜索""" search_tool = TavilySearchResults( max_results=5, include_images=False, include_raw_content=True ) return [search_tool] # ======================= # 节点函数 # ======================= llm = ChatOpenAI(model="gpt-4o", temperature=0) async def research_node(state: AgentState): """研究节点:调用搜索工具收集信息""" print("\n🔍 正在执行【研究节点】...") query = state["messages"][-1].content system_prompt = "你是一个专业研究员,擅长使用工具收集权威信息。" # 创建 ReAct Agent agent = create_react_agent( llm=llm, tools=load_tools(), state_modifier=system_prompt # 实际上会被包装进 prompt ) # 执行调用 result = await agent.ainvoke({ "messages": [ ("system", system_prompt), ("human", f"请深入研究:{query}") ] }) # 提取响应 response_msg: AIMessage = result["messages"][-1] return { "messages": [response_msg], "research_data": { "raw_data": response_msg.content } } async def analysis_node(state: AgentState): """分析节点:对研究结果进行深度分析""" print("\n📊 正在执行【分析节点】...") raw_data = state["research_data"]["raw_data"] prompt = f""" 你是一名资深行业分析师,请基于以下研究内容进行结构化分析: {raw_data} 请输出: 1. 关键发现(3-5条) 2. 行业趋势判断 3. 潜在挑战与机遇 """ response = await llm.ainvoke([ ("system", "你是一个专业的数据分析师,逻辑严谨,表达清晰。"), ("human", prompt) ]) return { "messages": [response], "research_data": { **state["research_data"], "analysis": response.content } } async def report_node(state: AgentState): """报告生成节点:输出结构化报告""" print("\n📝 正在执行【报告节点】...") raw_data = state["research_data"]["raw_data"] analysis = state["research_data"]["analysis"] prompt = f""" 请根据以下内容撰写一份专业报告: 【原始研究】 {raw_data} 【分析结论】 {analysis} 要求: - 标题明确 - 分为“背景”、“关键发现”、“趋势预测”、“建议”四个部分 - 语言正式,适合向管理层汇报 """ response = await llm.ainvoke([ ("system", "你是一位资深咨询顾问,擅长撰写高质量行业报告。"), ("human", prompt) ]) return { "messages": [response], "final_report": response.content } # ======================= # 构建工作流 # ======================= async def create_research_workflow(): workflow = StateGraph(AgentState) # 添加节点 workflow.add_node("research", research_node) workflow.add_node("analysis", analysis_node) workflow.add_node("report", report_node) # 设置图结构 workflow.add_edge(START, "research") workflow.add_edge("research", "analysis") workflow.add_edge("analysis", "report") workflow.add_edge("report", END) # 编译图 app = workflow.compile() return app # ======================= # 运行工作流 # ======================= async def run_research_workflow(): app = await create_research_workflow() inputs = { "messages": [ HumanMessage(content="人工智能在医疗领域的应用前景") ], "research_data": {}, "final_report": "" } print("🚀 开始运行复杂研究工作流...\n") result = await app.ainvoke(inputs) print("\n" + "="*60) print("✅ 最终报告") print("="*60) print(result["final_report"]) print("="*60) # ======================= # 主入口 # ======================= if __name__ == "__main__": asyncio.run(run_research_workflow())
2.3 LlamaIndex - 企业级RAG+Agent
专为检索增强生成(RAG
)设计,擅长处理企业知识库。
MCP
集成 :使用McpToolSpec
包装MCP
工具。应用场景 :
Agent
优先查询企业内部的向量知识库(如产品手册),若信息不足,则通过MCP
调用外部搜索引擎获取最新资讯。优势 :完美融合内部知识与外部信息,确保回答的全面性和时效性。
安装依赖:
pip install llama-index llama-index-agent-openai mcp-client-python
# 安装和基础设置 from llama_index.core import Settings from llama_index.llms.openai import OpenAI from llama_index.agent.openai import OpenAIAgent from llama_index.tools.mcp import McpToolSpec from mcp_client import BasicMCPClient import asyncio async def create_llamaindex_agent(): # 配置全局设置 Settings.llm = OpenAI(model="gpt-4") # 创建MCP客户端 mcp_client = BasicMCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) # 使用McpToolSpec包装MCP工具 mcp_tool_spec = McpToolSpec(mcp_client) tools = mcp_tool_spec.to_tool_list() # 创建Agent agent = OpenAIAgent.from_tools( tools=tools, system_prompt=""" 你是一个专业的AI助手,具备以下能力: 1. 使用搜索工具获取最新信息 2. 分析和整理信息 3. 提供专业建议 请始终基于可靠的信息源进行回答。 """, verbose=True ) return agent, mcp_client # 运行Agent async def run_llamaindex_agent(): agent, mcp_client = await create_llamaindex_agent() try: # 执行查询 response = await agent.achat( "帮我搜索并分析2024年大语言模型的发展趋势" ) print("Agent回复:") print(response.response) # 查看工具调用历史 print(" 工具调用历史:") for i, source in enumerate(response.source_nodes): print(f"调用 {i+1}: {source.node.text[:100]}...") finally: await mcp_client.disconnect() if __name__ == "__main__": # 运行基础Agent asyncio.run(run_llamaindex_agent())
RAG + Agent 集成
安装依赖:
pip install llama-index-core llama-index-embeddings-openai llama-index-llms-openai llama-index-readers-web tavily-python
import asyncio from llama_index.core import Settings, VectorStoreIndex, Document from llama_index.core.tools import QueryEngineTool, ToolMetadata from llama_index.core.agent import ReActAgent from llama_index.llms.openai import OpenAI from llama_index.embeddings.openai import OpenAIEmbedding from llama_index.readers.web import TavilyWebReader # ======================= # 全局设置 # ======================= Settings.llm = OpenAI(model="gpt-4o", temperature=0) Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large") # ======================= # 创建 RAG 知识库 # ======================= async def create_knowledge_base(): documents = [ Document(text="人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。"), Document(text="机器学习是人工智能的一个子领域,专注于开发能够从数据中学习的算法。"), Document(text="深度学习是机器学习的一个分支,使用多层神经网络来模拟人脑的工作方式。"), Document(text="卷积神经网络(CNN)在图像识别中表现优异,常用于医学影像分析。"), Document(text="Transformer 模型通过自注意力机制,在自然语言处理任务中取得突破。") ] index = VectorStoreIndex.from_documents(documents) return index # ======================= # 创建 RAG 工具 # ======================= def create_rag_tool(index): return QueryEngineTool( query_engine=index.as_query_engine(similarity_top_k=3), metadata=ToolMetadata( name="knowledge_base", description="查询内部知识库,获取AI相关的基础概念和定义" ) ) # ======================= # 创建外部搜索工具(替代 MCP) # ======================= def create_search_tool(): # 使用 TavilyWebReader 作为外部搜索工具 web_reader = TavilyWebReader( api_key="your_tavily_api_key", # 替换为你的 Tavily Key include_raw_content=True, max_results=3 ) async def search_fn(query: str): try: nodes = await web_reader.load_data([query]) content = "\n\n".join([n.text for n in nodes]) return content except Exception as e: return f"搜索出错: {str(e)}" return QueryEngineTool( query_engine=None, # 手动定义函数 metadata=ToolMetadata( name="web_search", description="搜索互联网以获取关于AI、科技等领域的最新进展" ), tool_fn=search_fn, async_tool_fn=search_fn, ) # ======================= # 创建 Agent # ======================= async def create_rag_agent(): # 1. 创建 RAG 知识库 index = await create_knowledge_base() # 2. 创建工具 rag_tool = create_rag_tool(index) search_tool = create_search_tool() all_tools = [rag_tool, search_tool] # 3. 创建 Agent agent = ReActAgent.from_tools( tools=all_tools, system_prompt=""" 你是一个专业的AI知识助手,具备以下能力: 1. 使用内部知识库查询AI的基础概念和定义 2. 使用网络搜索工具获取最新的技术进展和新闻 3. 结合两者提供全面、准确的回答 回答策略: - 对于基础概念(如“什么是深度学习”),优先使用内部知识库 - 对于时效性问题(如“2025年最新进展”),必须使用搜索工具 - 若问题包含两部分,请分别查询并综合回答 """, verbose=True ) return agent # ======================= # 运行 Agent # ======================= async def run_rag_agent(): agent = await create_rag_agent() query = "什么是深度学习?它在2025年有哪些最新进展?" print(f"📌 问题:{query}\n") try: response = await agent.achat(query) print("🧠 RAG Agent 回复:") print(response.response) except Exception as e: print(f"❌ Agent 执行出错: {e}") # ======================= # 主程序入口 # ======================= if __name__ == "__main__": asyncio.run(run_rag_agent())
2.4 AutoGen 0.4+ - 分布式多Agent系统
AutoGen
是微软开发的多Agent
协作框架,0.4
版本开始引入了更强大的分布式能力。
MCP
集成 :通过AutoGenMCPClient
连接,支持SSE
模式。应用场景 :构建分布式系统,一个本地
Agent
负责决策,另一个远程Agent
通过SSE
连接的MCP Server
执行耗时的工具调用。优势 :强大的
Agent
通信和协作机制,适合大型、分布式的Agent
系统。安装依赖:
pip install autogen-agentchat mcp-client-python
import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.ui import Console from mcp_client import StdioServerParams, SseServerParams from mcp_client.integrations.autogen import AutoGenMCPClient from autogen_agentchat.base import TaskResult from autogen_agentchat.messages import ChatMessage async def create_autogen_agents(): # 创建MCP客户端 mcp_client = AutoGenMCPClient() # 配置Stdio Server stdio_params = StdioServerParams( command="npx", args=["@tavily/mcp-server"] ) # 连接MCP Server await mcp_client.connect_server("search_tools", stdio_params) # 获取工具列表 tools = await mcp_client.get_tools("search_tools") # 创建研究员Agent researcher = AssistantAgent( name="研究员", model_client=OpenAI(model="gpt-4"), tools=tools, system_message=""" 你是一个专业的研究员,负责: 1. 收集和分析信息 2. 使用搜索工具获取最新数据 3. 提供准确的研究结果 """ ) # 创建分析师Agent analyst = AssistantAgent( name="分析师", model_client=OpenAI(model="gpt-4"), system_message=""" 你是一个专业的数据分析师,负责: 1. 分析研究员提供的数据 2. 识别趋势和模式 3. 提供深入见解 """ ) # 创建报告员Agent reporter = AssistantAgent( name="报告员", model_client=OpenAI(model="gpt-4"), system_message=""" 你是一个专业的报告撰写专家,负责: 1. 整理研究和分析结果 2. 生成结构化报告 3. 确保内容准确易懂 """ ) return researcher, analyst, reporter, mcp_client async def run_autogen_team(): researcher, analyst, reporter, mcp_client = await create_autogen_agents() try: # 创建团队 team = RoundRobinGroupChat([researcher, analyst, reporter]) # 执行任务 result = await Console( team.run_stream( task="请研究、分析并报告区块链技术在2024年的最新发展趋势" ) ) print("团队协作结果:") print(result.messages[-1].content) finally: await mcp_client.disconnect_all() async def create_distributed_system(): # 创建多个MCP客户端(模拟分布式环境) search_client = AutoGenMCPClient() # 使用SSE连接远程MCP Server sse_params = SseServerParams( url="http://localhost:8000/mcp", headers={"Authorization": "Bearer your-token"} ) await search_client.connect_server("remote_search", sse_params) # 创建专门的搜索Agent search_agent = AssistantAgent( name="搜索专家", model_client=OpenAI(model="gpt-4"), tools=await search_client.get_tools("remote_search"), system_message="你专门负责信息搜索和数据收集" ) # 创建本地分析Agent local_analyst = AssistantAgent( name="本地分析师", model_client=OpenAI(model="gpt-4"), system_message="你负责分析从搜索Agent获得的数据" ) # 创建分布式团队 distributed_team = RoundRobinGroupChat([search_agent, local_analyst]) return distributed_team, search_client if __name__ == "__main__": asyncio.run(run_autogen_team())
自定义MCP工具集成
from autogen_agentchat.base import Tool from typing import Any, Dict import json class CustomMCPTool(Tool): """自定义MCP工具包装器""" def __init__(self, mcp_client, tool_name: str): self.mcp_client = mcp_client self.tool_name = tool_name async def run(self, **kwargs) -> Any: """执行MCP工具""" try: result = await self.mcp_client.call_tool( self.tool_name, kwargs ) return result except Exception as e: return f"工具调用失败: {str(e)}" @property def schema(self) -> Dict[str, Any]: """工具schema""" return { "type": "function", "function": { "name": self.tool_name, "description": f"调用{self.tool_name}工具", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "查询参数" } }, "required": ["query"] } } } async def create_custom_tools_agent(): # 创建MCP客户端 mcp_client = AutoGenMCPClient() # 连接多个MCP服务 await mcp_client.connect_server( "search", StdioServerParams(command="npx", args=["@tavily/mcp-server"]) ) # 创建自定义工具 search_tool = CustomMCPTool(mcp_client, "tavily_search") # 创建Agent agent = AssistantAgent( name="多工具专家", model_client=OpenAI(model="gpt-4"), tools=[search_tool], system_message="你是一个能够使用多种工具的专家" ) return agent, mcp_client # 运行自定义工具Agent async def run_custom_tools(): agent, mcp_client = await create_custom_tools_agent() try: # 创建单Agent对话 from autogen_agentchat.teams import Swarm team = Swarm([agent]) result = await team.run( "使用搜索工具查找Python asyncio的最佳实践" ) print("自定义工具结果:") print(result.messages[-1].content) finally: await mcp_client.disconnect_all()
2.5 Pydantic AI - 结构化输出
Pydantic AI
结合了Pydantic
的类型验证能力,强调输出的类型安全和结构化,特别适合需要JSON
或特定数据模型的场景。
MCP
集成 :通过MCPServerStdio
或MCPServerHTTP
连接。- 应用场景:要求
Agent
返回结构化的ResearchReport
对象,包含key_findings、search_results
等字段,便于下游程序直接解析。 - 优势 :输出可预测、可验证,减少后处理成本。
- 安装依赖:
pip install pydantic-ai mcp-client-python
from pydantic_ai import Agent, ModelRetry from pydantic import BaseModel, Field from mcp_client import MCPServerStdio, MCPServerHTTP from typing import List, Optional import asyncio # 定义结构化输出模型 class SearchResult(BaseModel): title: str = Field(description="搜索结果标题") content: str = Field(description="搜索结果内容") url: str = Field(description="搜索结果链接") relevance_score: float = Field(description="相关度评分", ge=0.0, le=1.0) class ResearchReport(BaseModel): topic: str = Field(description="研究主题") executive_summary: str = Field(description="执行摘要") key_findings: List[str] = Field(description="关键发现") search_results: List[SearchResult] = Field(description="搜索结果") conclusion: str = Field(description="结论") confidence_level: float = Field(description="置信度", ge=0.0, le=1.0) async def create_pydantic_agent(): # 创建MCP Server连接 mcp_server = MCPServerStdio( command="npx", args=["@tavily/mcp-server"] ) # 获取工具 tools = await mcp_server.get_tools() # 创建Agent agent = Agent( model="openai:gpt-4", tools=tools, system_prompt=""" 你是一个专业的研究助手,能够: 1. 使用搜索工具获取信息 2. 分析和整理搜索结果 3. 生成结构化的研究报告 请确保输出符合指定的数据结构。 """, result_type=ResearchReport ) return agent, mcp_server async def run_pydantic_agent(): agent, mcp_server = await create_pydantic_agent() try: # 执行结构化查询 result = await agent.run( "研究人工智能在教育领域的应用现状,生成详细的研究报告" ) print("结构化研究报告:") print(f"研究主题: {result.data.topic}") print(f"执行摘要: {result.data.executive_summary}") print(f"置信度: {result.data.confidence_level}") print("\n关键发现:") for i, finding in enumerate(result.data.key_findings, 1): print(f"{i}. {finding}") print("\n搜索结果:") for i, search_result in enumerate(result.data.search_results, 1): print(f"{i}. {search_result.title} (相关度: {search_result.relevance_score})") print(f" 链接: {search_result.url}") print(f" 摘要: {search_result.content[:100]}...") print(f"\n结论: {result.data.conclusion}") finally: await mcp_server.disconnect() class AnalysisStep(BaseModel): step_name: str = Field(description="分析步骤名称") description: str = Field(description="步骤描述") tools_used: List[str] = Field(description="使用的工具") findings: str = Field(description="发现内容") confidence: float = Field(description="步骤置信度", ge=0.0, le=1.0) class MultiStepAnalysis(BaseModel): query: str = Field(description="原始查询") analysis_steps: List[AnalysisStep] = Field(description="分析步骤") final_answer: str = Field(description="最终答案") overall_confidence: float = Field(description="整体置信度", ge=0.0, le=1.0) sources_cited: int = Field(description="引用源数量") async def create_advanced_pydantic_agent(): # 创建HTTP MCP Server连接 mcp_server = MCPServerHTTP( base_url="http://localhost:8000/mcp", headers={"Authorization": "Bearer your-token"} ) tools = await mcp_server.get_tools() # 创建多步骤分析Agent agent = Agent( model="openai:gpt-4", tools=tools, system_prompt=""" 你是一个专业的分析师,能够进行多步骤深度分析。 分析流程: 1. 信息收集 - 使用搜索工具收集相关信息 2. 数据验证 - 验证信息的准确性 3. 深度分析 - 分析数据的含义和影响 4. 结论综合 - 整合所有发现得出结论 每个步骤都要记录使用的工具、发现的内容和置信度。 """, result_type=MultiStepAnalysis ) return agent, mcp_server async def run_advanced_analysis(): agent, mcp_server = await create_advanced_pydantic_agent() try: result = await agent.run( "分析区块链技术在供应链管理中的应用前景和挑战" ) print("多步骤分析结果:") print(f"查询: {result.data.query}") print(f"整体置信度: {result.data.overall_confidence}") print(f"引用源数量: {result.data.sources_cited}") print("\n分析步骤:") for i, step in enumerate(result.data.analysis_steps, 1): print(f"步骤 {i}: {step.step_name}") print(f" 描述: {step.description}") print(f" 使用工具: {', '.join(step.tools_used)}") print(f" 发现: {step.findings}") print(f" 置信度: {step.confidence}") print() print(f"最终答案: {result.data.final_answer}") finally: await mcp_server.disconnect() if __name__ == "__main__": # 运行基础结构化Agent asyncio.run(run_pydantic_agent()) # 运行高级分析Agent print("\n" + "=" * 50 + "\n") asyncio.run(run_advanced_analysis())
2.6 SmolAgents - 轻量级代码生成
Hugging Face
推出的轻量级框架,其核心是让Agent
生成代码来调用工具。
MCP
集成 :使用ToolCollection.from_mcp
导入工具。应用场景 :
Agent
被要求“并行搜索多个主题”,它会生成Python
asyncio.gather
代码来并发调用MCP
搜索工具。优势 :灵活性高,
Agent
可以自由组合和调度工具。安装依赖:
pip install smolagents mcp-client-python
from smolagents import CodeAgent, ToolCollection from mcp_client import MCPClient from smolagents import Tool import asyncio async def create_smol_agent(): # 创建MCP客户端 mcp_client = MCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) # 使用ToolCollection导入MCP工具 tools = ToolCollection.from_mcp(mcp_client) # 创建CodeAgent agent = CodeAgent( tools=tools, model="gpt-4", system_prompt=""" 你是一个专业的代码生成助手,能够: 1. 理解用户需求 2. 生成Python代码调用工具 3. 执行代码并返回结果 请总是生成清晰、可执行的代码。 """ ) return agent, mcp_client async def run_smol_agent(): agent, mcp_client = await create_smol_agent() try: # 执行代码生成任务 result = await agent.run( """ 请帮我搜索"Python异步编程最佳实践", 然后分析搜索结果,提取关键要点。 """ ) print("SmolAgent生成的代码:") print(result.code) print("\n执行结果:") print(result.output) finally: await mcp_client.disconnect() # 自定义工具集成示例 async def create_custom_smol_tools(): # 创建自定义工具 class CustomSearchTool(Tool): name = "custom_search" description = "自定义搜索工具,支持高级搜索选项" def __init__(self, mcp_client): self.mcp_client = mcp_client super().__init__() async def forward(self, query: str, max_results: int = 5) -> str: """执行自定义搜索""" try: # 调用MCP工具 result = await self.mcp_client.call_tool( "tavily_search", { "query": query, "max_results": max_results } ) return result except Exception as e: return f"搜索失败: {str(e)}" @property def inputs(self): return { "query": {"type": "string", "description": "搜索查询"}, "max_results": {"type": "integer", "description": "最大结果数", "default": 5} } @property def output_type(self): return "string" # 创建MCP客户端 mcp_client = MCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) # 创建自定义工具 custom_tool = CustomSearchTool(mcp_client) # 创建工具集合 tools = ToolCollection([custom_tool]) # 创建Agent agent = CodeAgent( tools=tools, model="gpt-4", system_prompt=""" 你是一个专业的搜索分析师,能够: 1. 使用自定义搜索工具 2. 分析搜索结果 3. 生成洞察报告 """ ) return agent, mcp_client async def run_custom_smol_agent(): agent, mcp_client = await create_custom_smol_tools() try: result = await agent.run( """ 使用自定义搜索工具搜索"机器学习模型部署"相关内容, 限制结果数量为3个,然后分析这些结果的共同点。 """ ) print("自定义SmolAgent结果:") print(result.code) print("\n输出:") print(result.output) finally: await mcp_client.disconnect() # 批量任务处理示例 async def create_batch_processing_agent(): # 创建MCP客户端 mcp_client = MCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) tools = ToolCollection.from_mcp(mcp_client) # 创建批量处理Agent agent = CodeAgent( tools=tools, model="gpt-4", system_prompt=""" 你是一个批量任务处理专家,能够: 1. 处理多个搜索任务 2. 并行执行工具调用 3. 汇总和分析结果 请使用Python的asyncio来实现并行处理。 """ ) return agent, mcp_client async def run_batch_processing(): agent, mcp_client = await create_batch_processing_agent() try: # 批量搜索任务 queries = [ "人工智能在金融领域的应用", "区块链技术发展趋势", "云计算市场分析", "物联网安全挑战" ] result = await agent.run( f""" 对以下查询进行批量搜索和分析: {queries} 请: 1. 并行执行所有搜索 2. 分析每个查询的结果 3. 生成综合报告 """ ) print("批量处理结果:") print(result.code) print("\n综合报告:") print(result.output) finally: await mcp_client.disconnect() if __name__ == "__main__": # 运行基础SmolAgent asyncio.run(run_smol_agent()) # 运行自定义工具Agent print("\n" + "=" * 50 + "\n") asyncio.run(run_custom_smol_agent()) # 运行批量处理Agent print("\n" + "=" * 50 + "\n") asyncio.run(run_batch_processing())
2.7 Camel - 多Agent角色扮演
Camel
专为多Agent
角色扮演和协作任务设计,特别适合需要不同专业角色协作的场景。
MCP
集成 :通过MCPToolkit
提供工具。应用场景 :模拟“研究员”、“策略师”和“执行官”的团队协作。研究员使用
MCP
搜索市场数据,策略师据此制定策略,执行官规划落地步骤。优势 :生动地模拟了人类团队的协作过程。
安装依赖:
pip install camel-ai mcp-client-python
from camel.agents import ChatAgent from camel.messages import BaseMessage from camel.types import RoleType, ModelType from camel.toolkits import MCPToolkit from mcp_client import MCPClient from camel.societies import RolePlaying import asyncio async def create_camel_agents(): # 创建MCP客户端 mcp_client = MCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) # 创建MCP工具包 mcp_toolkit = MCPToolkit(mcp_client) tools = mcp_toolkit.get_tools() # 创建研究员Agent researcher = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="研究员", content=""" 你是一个专业的研究员,具备以下特点: 1. 善于信息收集和分析 2. 能够使用各种搜索工具 3. 注重数据的准确性和可靠性 4. 擅长发现趋势和模式 你的任务是为团队提供准确、全面的研究数据。 """ ), model_type=ModelType.GPT_4, tools=tools ) # 创建策略师Agent strategist = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="策略师", content=""" 你是一个资深的策略师,具备以下能力: 1. 基于研究数据制定策略 2. 分析市场趋势和机会 3. 识别风险和挑战 4. 提供可行的行动建议 你的任务是将研究结果转化为实际的策略建议。 """ ), model_type=ModelType.GPT_4 ) # 创建执行官Agent executor = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name="执行官", content=""" 你是一个经验丰富的执行官,专长包括: 1. 将策略转化为具体行动计划 2. 评估资源需求和时间安排 3. 识别执行风险和解决方案 4. 制定关键指标和里程碑 你的任务是确保策略能够有效执行。 """ ), model_type=ModelType.GPT_4 ) return researcher, strategist, executor, mcp_client async def run_camel_collaboration(): researcher, strategist, executor, mcp_client = await create_camel_agents() try: # 定义协作任务 task = "制定一个关于人工智能在零售业应用的完整商业策略" # 第一阶段:研究员收集信息 research_prompt = BaseMessage.make_user_message( role_name="项目经理", content=f""" 请对以下主题进行全面研究:{task} 需要重点关注: 1. 当前市场状况 2. 技术发展趋势 3. 成功案例分析 4. 潜在挑战和机遇 """ ) research_result = await researcher.step(research_prompt) print("研究员报告:") print(research_result.msg.content) print("\n" + "="*50 + "\n") # 第二阶段:策略师制定策略 strategy_prompt = BaseMessage.make_user_message( role_name="项目经理", content=f""" 基于研究员的报告,请制定详细的商业策略: 研究报告: {research_result.msg.content} 请提供: 1. 市场定位策略 2. 技术实施方案 3. 竞争优势分析 4. 风险评估和应对 """ ) strategy_result = await strategist.step(strategy_prompt) print("策略师方案:") print(strategy_result.msg.content) print("\n" + "="*50 + "\n") # 第三阶段:执行官制定执行计划 execution_prompt = BaseMessage.make_user_message( role_name="项目经理", content=f""" 基于策略师的方案,请制定详细的执行计划: 策略方案: {strategy_result.msg.content} 请提供: 1. 具体行动步骤 2. 资源需求和预算 3. 时间表和里程碑 4. 关键指标和评估方法 """ ) execution_result = await executor.step(execution_prompt) print("执行官计划:") print(execution_result.msg.content) finally: await mcp_client.disconnect() # 动态角色分配示例 async def create_dynamic_role_system(): # 创建MCP客户端 mcp_client = MCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) mcp_toolkit = MCPToolkit(mcp_client) tools = mcp_toolkit.get_tools() # 创建角色扮演系统 role_play = RolePlaying( assistant_role_name="AI专家", user_role_name="企业顾问", assistant_agent_kwargs={ "model_type": ModelType.GPT_4, "tools": tools }, user_agent_kwargs={ "model_type": ModelType.GPT_4 } ) return role_play, mcp_client async def run_dynamic_roles(): role_play, mcp_client = await create_dynamic_role_system() try: # 初始化对话 task_prompt = """ 我需要为我的制造业公司制定一个AI转型战略。 请AI专家研究当前的AI技术趋势, 企业顾问则基于实际业务需求提供建议。 让我们开始这个协作过程。 """ # 运行角色扮演对话 input_msg = BaseMessage.make_user_message( role_name="项目发起人", content=task_prompt ) print("开始角色扮演协作:") print(f"任务: {task_prompt}") print("\n" + "="*50 + "\n") # 运行多轮对话 for i in range(3): # 进行3轮对话 assistant_msg, user_msg = await role_play.step(input_msg) print(f"第{i+1}轮对话:") print(f"AI专家: {assistant_msg.content}") print(f"企业顾问: {user_msg.content}") print("\n" + "-"*30 + "\n") # 准备下一轮输入 input_msg = user_msg finally: await mcp_client.disconnect() # 专业团队协作示例 async def create_professional_team(): # 创建MCP客户端 mcp_client = MCPClient() await mcp_client.connect_stdio( command="npx", args=["@tavily/mcp-server"] ) mcp_toolkit = MCPToolkit(mcp_client) tools = mcp_toolkit.get_tools() # 定义专业角色 roles = { "数据科学家": { "description": "专注于数据分析、机器学习模型开发和数据洞察", "tools": tools }, "产品经理": { "description": "负责产品规划、用户需求分析和市场策略", "tools": tools }, "技术架构师": { "description": "设计系统架构、技术选型和实施方案", "tools": [] }, "业务分析师": { "description": "分析业务流程、需求分析和效益评估", "tools": [] } } # 创建Agent团队 team = {} for role_name, role_config in roles.items(): team[role_name] = ChatAgent( system_message=BaseMessage.make_assistant_message( role_name=role_name, content=f""" 你是一个专业的{role_name},职责是:{role_config['description']} 请以专业的角度参与团队协作, 提供你领域内的专业建议和见解。 """ ), model_type=ModelType.GPT_4, tools=role_config.get('tools', []) ) return team, mcp_client async def run_professional_team(): team, mcp_client = await create_professional_team() try: # 团队协作任务 project_brief = """ 项目:开发一个基于AI的客户服务聊天机器人 需求: 1. 能够理解客户问题并提供准确回答 2. 支持多语言交互 3. 具备学习和优化能力 4. 集成现有客服系统 请各位专家从自己的角度提供专业建议。 """ print("专业团队协作开始:") print(f"项目简介: {project_brief}") print("\n" + "="*50 + "\n") # 让每个角色分别提供建议 for role_name, agent in team.items(): prompt = BaseMessage.make_user_message( role_name="项目经理", content=f""" 项目简介:{project_brief} 请从{role_name}的角度,提供专业的建议和方案。 """ ) result = await agent.step(prompt) print(f"{role_name}的建议:") print(result.msg.content) print("\n" + "-"*30 + "\n") # 生成综合方案 print("团队协作完成,各专家已提供专业建议。") finally: await mcp_client.disconnect() if __name__ == "__main__": # 运行基础协作 asyncio.run(run_camel_collaboration()) # 运行动态角色系统 print("\n" + "="*70 + "\n") asyncio.run(run_dynamic_roles()) # 运行专业团队 print("\n" + "="*70 + "\n") asyncio.run(run_professional_team())
2.8 CrewAI - 结构化Agent团队
CrewAI
专注于构建结构化的Agent
团队(Crew
),强调明确的角色(Role
)、目标(Goal
)和任务(Task
)的结构化团队。
MCP
集成 :需自定义MCPSearchTool
类,包装MCP
调用逻辑。应用场景 :创建一个
“市场研究员”Agent
,其目标是“收集和分析信息”
,通过MCP
工具完成研究任务。优势 :任务流程清晰,易于管理和监控。
安装依赖:
pip install crewai mcp-client-python
# 安装第三方MCP适配器 # pip install crewai-mcp-adapter from crewai import Agent, Task, Crew, Process from crewai.tools import BaseTool import asyncio import json import subprocess import sys from typing import Dict, Any, Optional import os # 简化的MCP客户端实现 class MCPClient: def __init__(self): self.process = None self.connected = False async def connect_stdio(self, command: str, args: list, env: Optional[Dict[str, str]] = None): """连接到MCP服务器""" try: # 设置环境变量 full_env = os.environ.copy() if env: full_env.update(env) # 启动MCP服务器进程 self.process = await asyncio.create_subprocess_exec( command, *args, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=full_env ) # 发送初始化请求 init_request = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {} }, "clientInfo": { "name": "CrewAI-MCP-Client", "version": "1.0.0" } } } await self._send_request(init_request) response = await self._receive_response() if response.get("result"): self.connected = True print("MCP客户端连接成功") return True else: print(f"MCP连接失败: {response}") return False except Exception as e: print(f"MCP连接错误: {e}") return False async def _send_request(self, request: Dict[str, Any]): """发送请求到MCP服务器""" if not self.process: raise Exception("MCP客户端未连接") request_str = json.dumps(request) + "\n" self.process.stdin.write(request_str.encode()) await self.process.stdin.drain() async def _receive_response(self) -> Dict[str, Any]: """接收来自MCP服务器的响应""" if not self.process: raise Exception("MCP客户端未连接") line = await self.process.stdout.readline() if line: try: return json.loads(line.decode().strip()) except json.JSONDecodeError as e: print(f"JSON解析错误: {e}") return {"error": "JSON解析失败"} return {"error": "无响应"} async def call_tool(self, tool_name: str, params: Dict[str, Any]) -> str: """调用MCP工具""" if not self.connected: return "MCP客户端未连接" try: # 发送工具调用请求 request = { "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": tool_name, "arguments": params } } await self._send_request(request) response = await self._receive_response() if "result" in response: content = response["result"].get("content", []) if content: return content[0].get("text", "无内容") return "工具调用成功但无返回内容" else: return f"工具调用失败: {response.get('error', '未知错误')}" except Exception as e: return f"工具调用异常: {str(e)}" async def disconnect(self): """断开MCP连接""" if self.process: self.process.terminate() await self.process.wait() self.connected = False print("MCP客户端已断开连接") # 创建MCP工具适配器 class MCPSearchTool(BaseTool): name: str = "MCP搜索工具" description: str = "使用MCP协议进行网络搜索,获取最新信息" def __init__(self, mcp_client): super().__init__() self.mcp_client = mcp_client def _run(self, query: str) -> str: """执行搜索""" try: # 在新的事件循环中运行异步代码 import asyncio try: loop = asyncio.get_event_loop() if loop.is_running(): # 如果事件循环正在运行,创建一个新的事件循环 import threading result = [None] exception = [None] def run_in_thread(): new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) try: result[0] = new_loop.run_until_complete( self.mcp_client.call_tool("tavily_search", {"query": query}) ) except Exception as e: exception[0] = e finally: new_loop.close() thread = threading.Thread(target=run_in_thread) thread.start() thread.join() if exception[0]: raise exception[0] return result[0] else: result = loop.run_until_complete( self.mcp_client.call_tool("tavily_search", {"query": query}) ) return result except RuntimeError: # 如果没有事件循环,创建一个新的 result = asyncio.run( self.mcp_client.call_tool("tavily_search", {"query": query}) ) return result except Exception as e: return f"搜索失败: {str(e)}" async def create_crew_with_mcp(): """创建带有MCP工具的Crew""" # 创建MCP客户端 mcp_client = MCPClient() # 连接到Tavily搜索服务 # 注意:需要设置TAVILY_API_KEY环境变量 success = await mcp_client.connect_stdio( command="npx", args=["-y", "@tavily/mcp-server"], env={"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "your_api_key_here")} ) if not success: print("警告:MCP连接失败,将使用模拟数据") # 创建模拟工具 class MockSearchTool(BaseTool): name: str = "模拟搜索工具" description: str = "模拟搜索工具,用于演示" def _run(self, query: str) -> str: return f"模拟搜索结果:关于'{query}'的信息 - 这是一个模拟搜索结果,包含了相关的基础信息。" search_tool = MockSearchTool() else: # 创建MCP搜索工具 search_tool = MCPSearchTool(mcp_client) # 创建研究员Agent researcher = Agent( role='资深研究员', goal='收集和分析相关信息,提供准确的研究报告', backstory=""" 你是一个经验丰富的研究员,擅长从各种来源收集信息, 进行深入分析,并提供有价值的洞察。你总是确保信息的准确性和相关性。 """, tools=[search_tool], verbose=True, allow_delegation=False ) # 创建分析师Agent analyst = Agent( role='数据分析师', goal='分析研究数据,识别趋势和模式,提供专业见解', backstory=""", 你是一个专业的数据分析师,善于从复杂的数据中提取有意义的信息, 识别趋势和模式,并提供基于数据的专业建议。 """, verbose=True, allow_delegation=False ) # 创建内容创作者Agent content_creator = Agent( role='内容创作专家', goal='将分析结果转化为易于理解的内容', backstory=""", 你是一个专业的内容创作专家,擅长将复杂的分析结果 转化为清晰、有吸引力的内容,确保读者能够轻松理解。 """, verbose=True, allow_delegation=False ) return researcher, analyst, content_creator, mcp_client async def create_crew_tasks(researcher, analyst, content_creator): """创建任务""" # 定义研究任务 research_task = Task( description=""" 对"人工智能在医疗保健中的应用"进行全面研究。 需要研究的方面: 1. 当前应用现状 2. 主要技术和解决方案 3. 成功案例和失败教训 4. 未来发展趋势 5. 面临的挑战和机遇 请提供详细的研究报告。 """, expected_output="一份包含当前状况、技术方案、案例分析和趋势预测的详细研究报告", agent=researcher ) # 定义分析任务 analysis_task = Task( description=""", 基于研究报告,进行深入的数据分析。 分析要点: 1. 市场规模和增长趋势 2. 技术成熟度分析 3. 竞争格局评估 4. 投资机会识别 5. 风险因素评估 请提供专业的分析报告。 """, expected_output="一份包含市场分析、技术评估、竞争分析和投资建议的专业分析报告", agent=analyst ) # 定义内容创作任务 content_task = Task( description=""", 基于研究和分析报告,创作一篇高质量的科普文章。 内容要求: 1. 通俗易懂,适合普通读者 2. 结构清晰,逻辑性强 3. 包含实际案例和数据 4. 提供实用的建议 5. 字数控制在1500-2000字 请创作一篇引人入胜的科普文章。 """, expected_output="一篇1500-2000字的高质量科普文章,内容准确、易懂、有趣", agent=content_creator ) return research_task, analysis_task, content_task async def create_advanced_crew(): """创建高级Crew配置""" # 创建MCP客户端 mcp_client = MCPClient() success = await mcp_client.connect_stdio( command="npx", args=["-y", "@tavily/mcp-server"], env={"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "your_api_key_here")} ) if success: search_tool = MCPSearchTool(mcp_client) else: # 使用模拟工具 class MockSearchTool(BaseTool): name: str = "模拟搜索工具" description: str = "模拟搜索工具,用于演示" def _run(self, query: str) -> str: return f"模拟搜索结果:关于'{query}'的专业信息和最新数据。" search_tool = MockSearchTool() # 创建专业化的Agent团队 market_researcher = Agent( role='市场研究专家', goal='专注于市场趋势和竞争分析', backstory='你是一个资深的市场研究专家,对各行业的市场动态有深入了解。', tools=[search_tool], verbose=True, max_iter=3, # 最大迭代次数 memory=True # 启用记忆功能 ) tech_analyst = Agent( role='技术分析师', goal='专注于技术发展和创新分析', backstory='你是一个技术分析专家,对新兴技术和创新趋势有敏锐的洞察力。', tools=[search_tool], verbose=True, max_iter=3, memory=True ) strategy_consultant = Agent( role='战略咨询师', goal='提供战略建议和实施方案', backstory='你是一个经验丰富的战略咨询师,擅长将研究和分析转化为可执行的战略方案。', verbose=True, max_iter=3, memory=True ) return market_researcher, tech_analyst, strategy_consultant, mcp_client async def run_crew_ai(): """运行CrewAI任务""" try: print("开始创建CrewAI团队...") # 创建Agents researcher, analyst, content_creator, mcp_client = await create_crew_with_mcp() # 创建任务 research_task, analysis_task, content_task = await create_crew_tasks( researcher, analyst, content_creator ) # 创建Crew crew = Crew( agents=[researcher, analyst, content_creator], tasks=[research_task, analysis_task, content_task], process=Process.sequential, # 顺序执行 verbose=2 ) print("📋 开始执行CrewAI任务...") # 执行任务 result = crew.kickoff() print("\nCrewAI任务完成!") print("\n最终结果:") print("="*50) print(result) print("="*50) return result except Exception as e: print(f"执行过程中出现错误: {str(e)}") return None finally: # 清理资源 if 'mcp_client' in locals(): await mcp_client.disconnect() async def run_advanced_crew(): """运行高级Crew配置""" try: print("开始创建高级CrewAI团队...") # 创建高级Agents market_researcher, tech_analyst, strategy_consultant, mcp_client = await create_advanced_crew() # 创建高级任务 market_task = Task( description=""", 对人工智能医疗保健市场进行深入的市场研究。 重点关注: 1. 全球市场规模和增长预测 2. 主要参与者和竞争格局 3. 区域市场差异 4. 监管环境影响 """, expected_output="详细的市场研究报告,包含数据分析和竞争格局", agent=market_researcher ) tech_task = Task( description=""", 分析人工智能在医疗保健中的技术发展趋势。 重点关注: 1. 关键技术栈和解决方案 2. 技术成熟度评估 3. 创新突破点 4. 技术实施挑战 """, expected_output="技术分析报告,包含技术路线图和实施建议", agent=tech_analyst ) strategy_task = Task( description=""", 基于市场研究和技术分析,制定战略建议。 重点关注: 1. 投资机会识别 2. 风险评估和缓解 3. 实施路径规划 4. 成功因素分析 """, expected_output="战略建议报告,包含可执行的行动方案", agent=strategy_consultant ) # 创建高级Crew advanced_crew = Crew( agents=[market_researcher, tech_analyst, strategy_consultant], tasks=[market_task, tech_task, strategy_task], process=Process.sequential, verbose=2 ) print("📋 开始执行高级CrewAI任务...") # 执行任务 result = advanced_crew.kickoff() print("\n高级CrewAI任务完成!") print("\n最终结果:") print("="*50) print(result) print("="*50) return result except Exception as e: print(f"❌ 执行过程中出现错误: {str(e)}") return None finally: # 清理资源 if 'mcp_client' in locals(): await mcp_client.disconnect() def main(): """主函数""" print("CrewAI + MCP 集成演示") print("="*50) # 检查环境变量 if not os.getenv("TAVILY_API_KEY"): print(" 警告:未设置TAVILY_API_KEY环境变量") print(" 将使用模拟数据进行演示") print(" 要获取真实搜索结果,请设置环境变量:") print(" export TAVILY_API_KEY=your_api_key_here") print() print("请选择运行模式:") print("1. 基础CrewAI演示") print("2. 高级CrewAI演示") choice = input("请输入选择 (1 或 2): ").strip() if choice == "1": asyncio.run(run_crew_ai()) elif choice == "2": asyncio.run(run_advanced_crew()) else: print("无效选择,运行基础演示...") asyncio.run(run_crew_ai()) if __name__ == "__main__": main()
3. 结论与展望
本文系统性地说明了MCP
协议作为AI Agent
“神经中枢”的价值。通过将八大主流框架与MCP
集成,我们证明了其强大的通用性和工程可行性。MCP
不仅解决了工具调用的标准化问题,更促进了Agent
生态的繁荣。
未来,MCP
有望成为AI Agent
领域的“TCP/IP协议”
。我们期待看到更多企业级特性(如权限控制、审计日志、服务网格)被集成进来,推动AI Agent
从实验室走向千行百业。