基于MCP架构的LLM-Agent融合—构建AI Agent的技术体系与落地实践

发布于:2025-07-24 ⋅ 阅读:(30) ⋅ 点赞:(0)

随着大型语言模型(LLM)能力的不断提升,构建具备复杂任务处理能力的AI Agent系统成为研究与应用的热点。然而,赋予Agent调用外部工具的能力是其走向实用化的关键一步。本文旨在探讨一种新兴的标准化工具接口协议——Model Context Protocol (MCP),并深入分析如何将其集成到当前八种主流的LLM Agent开发框架中,包括OpenAI Agents SDK、LangGraph、LlamaIndex、AutoGen、Pydantic AI、SmolAgents、Camel和CrewAI。通过梳理各框架的核心特性、集成方法及代码示例,本文为开发者提供了一套清晰的实践指南,旨在促进Agent系统与外部世界的高效交互,推动AI Agent技术的标准化与工程化落地。

1. 什么是 MCP Server?

MCP(Model Context Protocol)Server 是一个标准化的工具接口协议,它允许 AI Agent 通过统一的方式调用各种外部工具和服务。无论是搜索引擎、数据库查询,还是API调用,MCP Server 都能提供标准化的接入方式。

MCP Server支持两种主要连接模式:

  • Stdio模式:通过命令行进程通信,适合本地开发
  • SSE模式:通过HTTP连接,适合生产环境部署

在这里插入图片描述

2. 主流LLM Agent框架与MCP集成实战

2.1 OpenAI Agents SDK - 轻量级多Agent协作

OpenAI官方推出的轻量级框架,特别适合构建简单的多Agent协作场景。

  • 集成要点 : 使用mcp-client-python库连接MCP Server,并将获取的工具传递给create_agent 函数。

  • 安装依赖: pip install openai-agents-sdk mcp-client-python

  • 优势 :API简洁,OpenAI生态无缝集成。

  • 应用场景 :创建“研究员”和“写手Agent,研究员调用MCP搜索工具获取信息,完成后将任务转交给写手生成报告。

    # 基础Agent设置
    from openai_agents import Agent, create_agent
    from mcp_client import MCPClient
    import asyncio
    
    async def setup_openai_agent_with_mcp():
        # 创建MCP客户端
        mcp_client = MCPClient()
        # 连接Tavily搜索工具
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        # 获取工具列表
        tools = await mcp_client.get_tools()
        # 创建Agent
        agent = create_agent(
            name="搜索助手",
            instructions="你是一个专业的搜索助手,能够帮助用户获取最新信息",
            tools=tools,
            model="gpt-4"
        )
        return agent, mcp_client
    
    # 运行Agent
    async def main():
        agent, mcp_client = await setup_openai_agent_with_mcp()
        # 与Agent对话
        response = await agent.run(
            "帮我搜索一下2025年AI领域的重要进展"
        )
        print(response.content)
        # 清理连接
        await mcp_client.disconnect()
    
    # 执行
    if __name__ == "__main__":
        asyncio.run(main())
    

创建和运行多Agent协作系统的异步函数,使用了类似 OpenAI Assistants API 的概念,并结合了 MCP(Model Control Protocol)客户端来集成外部工具(如 Tavily 搜索):

  • 安装依赖: pip install langgraph langchain-openai tavily-python

    import asyncio
    from typing import TypedDict, Annotated, Sequence
    from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
    from langchain_openai import ChatOpenAI
    from tavily import TavilyClient
    from langgraph.graph import StateGraph, END
    
    # 初始化模型
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    
    # 初始化 Tavily 搜索工具
    tavily = TavilyClient(api_key="your_tavily_api_key")  # 替换为你的 Tavily Key
    
    # 定义状态
    class AgentState(TypedDict):
        messages: Annotated[Sequence[BaseMessage], ...]
        current_agent: str
        research_data: str
    
    # 创建研究员 Agent
    async def researcher_node(state: AgentState):
        query = state['messages'][-1].content
        print("🔍 研究员正在搜集信息...")
        search_result = tavily.search(query=query, max_results=5)
        result_text = "\n\n".join([r["content"] for r in search_result["results"]])
        
        # 调用 LLM 做初步分析(可选)
        response = llm.invoke([
            HumanMessage(content=f"请总结以下关于'{query}'的信息,用于写科普文章:\n{result_text}")
        ])
        
        return {
            **state,
            "research_data": response.content,
            "messages": state["messages"] + [AIMessage(content=response.content)],
            "current_agent": "writer"
        }
    
    # 创建写手 Agent
    async def writer_node(state: AgentState):
        print("✍️ 写手正在撰写文章...")
        research_summary = state["research_data"]
        
        prompt = f"""
        请将以下研究内容整理成一篇通俗易懂的中文科普文章,风格生动,适合大众阅读:
    
        {research_summary}
        """
        
        response = llm.invoke([HumanMessage(content=prompt)])
        
        return {
            **state,
            "messages": state["messages"] + [AIMessage(content=response.content)],
            "current_agent": "end"
        }
    
    # 构建图
    async def create_agent_team():
        workflow = StateGraph(AgentState)
    
        workflow.add_node("researcher", researcher_node)
        workflow.add_node("writer", writer_node)
    
        workflow.set_entry_point("researcher")
    
        # 设置流转逻辑
        workflow.add_conditional_edges(
            "researcher",
            lambda x: "writer",
            {
                "writer": "writer"
            }
        )
        workflow.add_edge("writer", END)
    
        graph = workflow.compile()
        return graph
    
    # 运行任务
    async def run_team_task():
        app = await create_agent_team()
    
        inputs = {
            "messages": [HumanMessage(content="研究量子计算的最新突破,并整理成一篇科普文章")],
            "current_agent": "researcher",
            "research_data": ""
        }
    
        async for output in app.astream(inputs):
            for key, value in output.items():
                print(f"\n[Node: {key}]")
                if 'research_data' in value:
                    print("📊 研究完成,数据已生成...")
                if 'messages' in value and len(value['messages']) > 0:
                    last_msg = value['messages'][-1]
                    if isinstance(last_msg, AIMessage):
                        print("📝 输出:", last_msg.content[:200] + "..." if len(last_msg.content) > 200 else last_msg.content)
    
        # 最终结果
        final_msg = output.get("writer", {}).get("messages", [])[-1]
        print("\n" + "="*50)
        print("最终文章:\n", final_msg.content)
        print("="*50)
    
    # 主程序入口
    if __name__ == "__main__":
        asyncio.run(run_team_task())
    

    多Agent协作示例

    # 多Agent协作示例
    
    async def create_agent_team():
        # 创建MCP客户端
        mcp_client = MCPClient()
        
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        
        tools = await mcp_client.get_tools()
        
        # 创建研究员Agent
        researcher = create_agent(
            name="研究员",
            instructions="负责信息搜集和分析",
            tools=tools,
            model="gpt-4"
        )
        
        # 创建写手Agent
        writer = create_agent(
            name="写手",
            instructions="负责将研究结果整理成文章",
            model="gpt-4"
        )
        
        # 创建协作工作流
        from openai_agents import Handoffs
        
        # 配置Agent间的任务转交
        handoffs = Handoffs()
        handoffs.add_handoff(
            from_agent=researcher,
            to_agent=writer,
            condition="完成信息收集后转交给写手"
        )
        
        return researcher, writer, handoffs
    
    
    # 运行协作任务
    async def run_team_task():
        researcher, writer, handoffs = await create_agent_team()
        
        # 启动协作任务
        result = await researcher.run(
            "研究量子计算的最新突破,并整理成一篇科普文章",
            handoffs=handoffs
        )
        
        print(result.content)
    

2.2 LangGraph - 有状态图工作流

基于图结构的框架,适用于需要状态管理和复杂决策逻辑的长周期任务。

  • MCP集成 :使用load_mcp_toolsMultiServerMCPClient加载工具。

  • 应用场景 :构建“研究-分析-报告”三阶段工作流。研究节点调用MCP搜索,分析和报告节点则基于研究结果进行纯推理。

  • 优势 :可视化工作流,状态持久化,适合处理复杂、多步骤任务。

  • 安装依赖:pip install langgraph langchain-openai mcp-client-python

    from langgraph.prebuilt import create_react_agent
    from langchain_openai import ChatOpenAI
    from mcp_client import MCPClient, load_mcp_tools
    import asyncio
    async def create_langgraph_agent():
        # 方法1:使用load_mcp_tools
        tools = await load_mcp_tools(
            server_config={
                "command": "npx",
                "args": ["@tavily/mcp-server"]
            }
        )
        # 初始化LLM
        llm = ChatOpenAI(model="gpt-4")
        # 创建ReactAgent
        agent = create_react_agent(
            llm=llm,
            tools=tools,
            state_modifier="你是一个专业的AI助手,能够使用各种工具帮助用户解决问题"
        )
        return agent
    # 运行Agent
    async def run_langgraph_agent():
        agent = await create_langgraph_agent()
        # 执行任务
        config = {"configurable": {"thread_id": "1"}}
        result = await agent.ainvoke(
            {"messages": [("user", "帮我搜索并总结Python 3.12的新特性")]},
            config=config
        )
        print(result["messages"][-1].content)
    if __name__ == "__main__":
        asyncio.run(run_langgraph_agent())
    

集成多个 MCP Server(如 Tavily 搜索、数据库等)作为工具,供一个 Agent 使用,并通过 LangGraph 驱动 ReAct 模式完成任务。

  • 安装依赖: pip install langchain langchain-mcp tavily-python openai

    import asyncio
    import json
    import sys
    from typing import List, Dict, Any
    from langchain_core.tools import Tool
    from langchain_openai import ChatOpenAI
    from langchain.agents import create_react_agent
    from langchain.agents.agent import AgentExecutor
    from langchain.memory import ConversationBufferMemory
    
    # =======================
    # MCP 客户端:与 MCP Server 通信
    # =======================
    class MCPClient:
        def __init__(self, server_id: str, command: str, args: List[str]):
            self.server_id = server_id
            self.command = command
            self.args = args
            self.reader = None
            self.writer = None
    
        async def connect(self):
            print(f"🔁 启动 MCP Server: {self.server_id}")
            proc = await asyncio.create_subprocess_exec(
                self.command,
                *self.args,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=sys.stderr,
            )
            self.reader, self.writer = proc.stdin, proc.stdout
    
            # 读取初始化消息
            header = await self._read_line()
            if not header.startswith("MCP/"):
                raise RuntimeError(f"Invalid MCP server response: {header}")
    
            # 发送客户端握手
            self._send({"version": "0.2.0"})
    
            init = await self._read_json()
            if init["type"] != "init":
                raise RuntimeError("Expected init message")
    
            print(f"✅ MCP Server '{self.server_id}' 初始化完成,工具数: {len(init.get('tools', []))}")
            return init.get("tools", [])
    
        async def _read_line(self) -> str:
            line = await self.reader.readline()
            return line.decode().strip()
    
        def _send(self, data: dict):
            text = json.dumps(data, ensure_ascii=False)
            self.writer.write((text + "\n").encode())
            # 注意:不 await writer.drain() 可能导致问题
            # await self.writer.drain()
    
        async def _read_json(self) -> dict:
            while True:
                line = await self.reader.readline()
                if not line:
                    raise EOFError("Connection closed")
                try:
                    return json.loads(line.decode())
                except json.JSONDecodeError:
                    continue
    
        async def close(self):
            if self.writer:
                self.writer.close()
                await self.writer.wait_closed()
    
    
    # =======================
    # 创建多 Server Agent
    # =======================
    async def create_multi_server_agent():
        clients: List[MCPClient] = []
        all_tools: List[Tool] = []
    
        # 创建 MCP 客户端列表
        mcp_configs = [
            ("search", "npx", ["@tavily/mcp-server"]),
            # 示例:其他 MCP Server(需要实际存在)
            # ("db", "python", ["-m", "my_mcp_db_server"]),
        ]
    
        for server_id, cmd, args in mcp_configs:
            client = MCPClient(server_id, cmd, args)
            await client.connect()
            clients.append(client)
    
            # 获取工具并转换为 LangChain Tool
            tools = await client.get_all_tools()  # 实际上我们已经在 connect 返回了 tools
            for tool in tools:
                lc_tool = Tool(
                    name=f"{server_id}.{tool['name']}",
                    description=tool.get("description", "") or tool.get("prompt", ""),
                    func=lambda q, t=tool, c=client: asyncio.run(c.call_tool(t["name"], {"query": q})),
                )
                all_tools.append(lc_tool)
    
        # 初始化 LLM
        llm = ChatOpenAI(model="gpt-4o", temperature=0)
    
        # 创建 ReAct Agent
        from langchain import hub
        prompt = hub.pull("hwchase17/react")
    
        agent = create_react_agent(llm, all_tools, prompt)
        agent_executor = AgentExecutor(agent=agent, tools=all_tools, verbose=True)
    
        return agent_executor, clients
    
    
    # =======================
    # 调用工具(简化版,需增强)
    # =======================
    # 注意:上面的 lambda 有问题(闭包陷阱),需改写为:
    async def call_tool(client: MCPClient, tool_name: str, input_dict: dict):
        # 发送调用消息
        client._send({
            "type": "call",
            "method": tool_name,
            "params": input_dict
        })
    
        # 等待结果(简化处理,实际需处理流式/多消息)
        result = await client._read_json()
        if result["type"] == "result":
            return result["output"]
        elif result["type"] == "error":
            raise RuntimeError(result["error"])
        return str(result)
    
    
    # 修正:动态绑定时避免闭包问题
    def make_tool(client, tool_name, tool_info):
        async def func(input_val):
            if isinstance(input_val, dict):
                params = input_val
            else:
                params = {"query": input_val}
            try:
                result = await call_tool(client, tool_name, params)
                return json.dumps(result, ensure_ascii=False, indent=2)
            except Exception as e:
                return f"Error calling {tool_name}: {str(e)}"
    
        return Tool(
            name=f"{client.server_id}.{tool_name}",
            description=tool_info.get("description", "") or tool_info.get("prompt", "No description"),
            func=func,
            coroutine=func,
        )
    
    
    # 重新实现 create_multi_server_agent(修正版)
    async def create_multi_server_agent():
        clients: List[MCPClient] = []
        all_tools: List[Tool] = []
    
        mcp_configs = [
            ("search", "npx", ["@tavily/mcp-server"]),
            # 可扩展其他 MCP Server
        ]
    
        for server_id, cmd, args in mcp_configs:
            client = MCPClient(server_id, cmd, args)
            await client.connect()
            clients.append(client)
    
            # 获取工具
            tools = await client.get_all_tools()  # 实际上我们已经在 connect 返回了 tools
            for tool in tools:
                lc_tool = make_tool(client, tool["name"], tool)
                all_tools.append(lc_tool)
    
        llm = ChatOpenAI(model="gpt-4o", temperature=0)
        from langchain import hub
        prompt = hub.pull("hwchase17/react")
        agent = create_react_agent(llm, all_tools, prompt)
        agent_executor = AgentExecutor(agent=agent, tools=all_tools, verbose=True)
    
        return agent_executor, clients
    
    
    # =======================
    # 运行任务
    # =======================
    async def run_langgraph_agent():
        agent_executor, clients = await create_multi_server_agent()
    
        try:
            result = await agent_executor.ainvoke({
                "input": "量子计算最近有什么突破?请搜索并总结。",
                "chat_history": [],
            })
            print("\n\n🎯 最终结果:")
            print(result["output"])
        finally:
            for client in clients:
                await client.close()
    
    
    if __name__ == "__main__":
        asyncio.run(run_langgraph_agent())
    

    多Server集成示例

    # 多Server集成示例
    from langgraph.mcp import MultiServerMCPClient
    async def create_multi_server_agent():
    
        # 创建多Server客户端
        mcp_client = MultiServerMCPClient()
        
        # 添加搜索工具
        await mcp_client.add_server(
            "search",
            command="npx",
            args=["@tavily/mcp-server"]
        )
        
        # 添加数据库工具(示例)
        await mcp_client.add_server(
            "database",
            command="npx",
            args=["@example/database-mcp-server"]
        )
        
        # 获取所有工具
        all_tools = await mcp_client.get_all_tools()
        
        llm = ChatOpenAI(model="gpt-4")
        agent = create_react_agent(llm=llm, tools=all_tools)
        
        return agent, mcp_client
    

使用 LangGraph + LangChain 构建的 自动化研究工作流系统 ,让多个 AI Agent 分工协作,自动完成:“用户提问 → 搜集资料 → 分析信息 → 写成专业报告”

  • 安装依赖:pip install langgraph langchain-openai langchain-community tavily-python

    import asyncio
    from typing import TypedDict, Annotated, List
    from langgraph.graph import StateGraph, START, END
    from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
    from langchain_openai import ChatOpenAI
    from langchain.agents import create_react_agent
    from langchain_community.tools.tavily_search import TavilySearchResults
    from langchain_core.prompts import ChatPromptTemplate
    from operator import add
    
    # =======================
    # 自定义状态
    # =======================
    class AgentState(TypedDict):
        messages: Annotated[List[BaseMessage], add]        # 自动合并消息
        research_data: dict                                # 存储中间数据
        final_report: str                                  # 最终输出
    
    # =======================
    # 工具:使用 Tavily 搜索(替代 MCP)
    # =======================
    def load_tools():
        """加载外部工具,例如 Tavily 搜索"""
        search_tool = TavilySearchResults(
            max_results=5,
            include_images=False,
            include_raw_content=True
        )
        return [search_tool]
    
    # =======================
    # 节点函数
    # =======================
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    
    async def research_node(state: AgentState):
        """研究节点:调用搜索工具收集信息"""
        print("\n🔍 正在执行【研究节点】...")
        query = state["messages"][-1].content
        system_prompt = "你是一个专业研究员,擅长使用工具收集权威信息。"
    
        # 创建 ReAct Agent
        agent = create_react_agent(
            llm=llm,
            tools=load_tools(),
            state_modifier=system_prompt  # 实际上会被包装进 prompt
        )
    
        # 执行调用
        result = await agent.ainvoke({
            "messages": [
                ("system", system_prompt),
                ("human", f"请深入研究:{query}")
            ]
        })
    
        # 提取响应
        response_msg: AIMessage = result["messages"][-1]
        return {
            "messages": [response_msg],
            "research_data": {
                "raw_data": response_msg.content
            }
        }
    
    async def analysis_node(state: AgentState):
        """分析节点:对研究结果进行深度分析"""
        print("\n📊 正在执行【分析节点】...")
        raw_data = state["research_data"]["raw_data"]
    
        prompt = f"""
        你是一名资深行业分析师,请基于以下研究内容进行结构化分析:
    
        {raw_data}
    
        请输出:
        1. 关键发现(3-5条)
        2. 行业趋势判断
        3. 潜在挑战与机遇
        """
        response = await llm.ainvoke([
            ("system", "你是一个专业的数据分析师,逻辑严谨,表达清晰。"),
            ("human", prompt)
        ])
    
        return {
            "messages": [response],
            "research_data": {
                **state["research_data"],
                "analysis": response.content
            }
        }
    
    async def report_node(state: AgentState):
        """报告生成节点:输出结构化报告"""
        print("\n📝 正在执行【报告节点】...")
        raw_data = state["research_data"]["raw_data"]
        analysis = state["research_data"]["analysis"]
    
        prompt = f"""
        请根据以下内容撰写一份专业报告:
    
        【原始研究】
        {raw_data}
    
        【分析结论】
        {analysis}
    
        要求:
        - 标题明确
        - 分为“背景”、“关键发现”、“趋势预测”、“建议”四个部分
        - 语言正式,适合向管理层汇报
        """
        response = await llm.ainvoke([
            ("system", "你是一位资深咨询顾问,擅长撰写高质量行业报告。"),
            ("human", prompt)
        ])
    
        return {
            "messages": [response],
            "final_report": response.content
        }
    
    # =======================
    # 构建工作流
    # =======================
    async def create_research_workflow():
        workflow = StateGraph(AgentState)
    
        # 添加节点
        workflow.add_node("research", research_node)
        workflow.add_node("analysis", analysis_node)
        workflow.add_node("report", report_node)
    
        # 设置图结构
        workflow.add_edge(START, "research")
        workflow.add_edge("research", "analysis")
        workflow.add_edge("analysis", "report")
        workflow.add_edge("report", END)
    
        # 编译图
        app = workflow.compile()
        return app
    
    # =======================
    # 运行工作流
    # =======================
    async def run_research_workflow():
        app = await create_research_workflow()
    
        inputs = {
            "messages": [
                HumanMessage(content="人工智能在医疗领域的应用前景")
            ],
            "research_data": {},
            "final_report": ""
        }
    
        print("🚀 开始运行复杂研究工作流...\n")
        result = await app.ainvoke(inputs)
    
        print("\n" + "="*60)
        print("✅ 最终报告")
        print("="*60)
        print(result["final_report"])
        print("="*60)
    
    # =======================
    # 主入口
    # =======================
    if __name__ == "__main__":
        asyncio.run(run_research_workflow())
    

2.3 LlamaIndex - 企业级RAG+Agent

专为检索增强生成(RAG)设计,擅长处理企业知识库。

  • MCP集成 :使用McpToolSpec包装MCP工具。

  • 应用场景 :Agent优先查询企业内部的向量知识库(如产品手册),若信息不足,则通过MCP调用外部搜索引擎获取最新资讯。

  • 优势 :完美融合内部知识与外部信息,确保回答的全面性和时效性。

  • 安装依赖:pip install llama-index llama-index-agent-openai mcp-client-python

    # 安装和基础设置
    from llama_index.core import Settings
    from llama_index.llms.openai import OpenAI
    from llama_index.agent.openai import OpenAIAgent
    from llama_index.tools.mcp import McpToolSpec
    from mcp_client import BasicMCPClient
    import asyncio
    async def create_llamaindex_agent():
        # 配置全局设置
        Settings.llm = OpenAI(model="gpt-4")
        # 创建MCP客户端
        mcp_client = BasicMCPClient()
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        # 使用McpToolSpec包装MCP工具
        mcp_tool_spec = McpToolSpec(mcp_client)
        tools = mcp_tool_spec.to_tool_list()
        # 创建Agent
        agent = OpenAIAgent.from_tools(
            tools=tools,
            system_prompt="""
            你是一个专业的AI助手,具备以下能力:
            1. 使用搜索工具获取最新信息
            2. 分析和整理信息
            3. 提供专业建议
            请始终基于可靠的信息源进行回答。
            """,
            verbose=True
        )
        return agent, mcp_client
    # 运行Agent
    async def run_llamaindex_agent():
        agent, mcp_client = await create_llamaindex_agent()
        try:
            # 执行查询
            response = await agent.achat(
                "帮我搜索并分析2024年大语言模型的发展趋势"
            )
            print("Agent回复:")
            print(response.response)
            # 查看工具调用历史
            print("
    工具调用历史:")
            for i, source in enumerate(response.source_nodes):
                print(f"调用 {i+1}: {source.node.text[:100]}...")
        finally:
            await mcp_client.disconnect()
    if __name__ == "__main__":
        # 运行基础Agent
        asyncio.run(run_llamaindex_agent())
    

    RAG + Agent 集成

  • 安装依赖:pip install llama-index-core llama-index-embeddings-openai llama-index-llms-openai llama-index-readers-web tavily-python

    import asyncio
    from llama_index.core import Settings, VectorStoreIndex, Document
    from llama_index.core.tools import QueryEngineTool, ToolMetadata
    from llama_index.core.agent import ReActAgent
    from llama_index.llms.openai import OpenAI
    from llama_index.embeddings.openai import OpenAIEmbedding
    from llama_index.readers.web import TavilyWebReader
    
    # =======================
    # 全局设置
    # =======================
    Settings.llm = OpenAI(model="gpt-4o", temperature=0)
    Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")
    
    # =======================
    # 创建 RAG 知识库
    # =======================
    async def create_knowledge_base():
        documents = [
            Document(text="人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。"),
            Document(text="机器学习是人工智能的一个子领域,专注于开发能够从数据中学习的算法。"),
            Document(text="深度学习是机器学习的一个分支,使用多层神经网络来模拟人脑的工作方式。"),
            Document(text="卷积神经网络(CNN)在图像识别中表现优异,常用于医学影像分析。"),
            Document(text="Transformer 模型通过自注意力机制,在自然语言处理任务中取得突破。")
        ]
        index = VectorStoreIndex.from_documents(documents)
        return index
    
    # =======================
    # 创建 RAG 工具
    # =======================
    def create_rag_tool(index):
        return QueryEngineTool(
            query_engine=index.as_query_engine(similarity_top_k=3),
            metadata=ToolMetadata(
                name="knowledge_base",
                description="查询内部知识库,获取AI相关的基础概念和定义"
            )
        )
    
    # =======================
    # 创建外部搜索工具(替代 MCP)
    # =======================
    def create_search_tool():
        # 使用 TavilyWebReader 作为外部搜索工具
        web_reader = TavilyWebReader(
            api_key="your_tavily_api_key",  # 替换为你的 Tavily Key
            include_raw_content=True,
            max_results=3
        )
    
        async def search_fn(query: str):
            try:
                nodes = await web_reader.load_data([query])
                content = "\n\n".join([n.text for n in nodes])
                return content
            except Exception as e:
                return f"搜索出错: {str(e)}"
    
        return QueryEngineTool(
            query_engine=None,  # 手动定义函数
            metadata=ToolMetadata(
                name="web_search",
                description="搜索互联网以获取关于AI、科技等领域的最新进展"
            ),
            tool_fn=search_fn,
            async_tool_fn=search_fn,
        )
    
    # =======================
    # 创建 Agent
    # =======================
    async def create_rag_agent():
        # 1. 创建 RAG 知识库
        index = await create_knowledge_base()
    
        # 2. 创建工具
        rag_tool = create_rag_tool(index)
        search_tool = create_search_tool()
    
        all_tools = [rag_tool, search_tool]
    
        # 3. 创建 Agent
        agent = ReActAgent.from_tools(
            tools=all_tools,
            system_prompt="""
            你是一个专业的AI知识助手,具备以下能力:
            1. 使用内部知识库查询AI的基础概念和定义
            2. 使用网络搜索工具获取最新的技术进展和新闻
            3. 结合两者提供全面、准确的回答
    
            回答策略:
            - 对于基础概念(如“什么是深度学习”),优先使用内部知识库
            - 对于时效性问题(如“2025年最新进展”),必须使用搜索工具
            - 若问题包含两部分,请分别查询并综合回答
            """,
            verbose=True
        )
    
        return agent
    
    # =======================
    # 运行 Agent
    # =======================
    async def run_rag_agent():
        agent = await create_rag_agent()
    
        query = "什么是深度学习?它在2025年有哪些最新进展?"
        print(f"📌 问题:{query}\n")
    
        try:
            response = await agent.achat(query)
            print("🧠 RAG Agent 回复:")
            print(response.response)
        except Exception as e:
            print(f"❌ Agent 执行出错: {e}")
    
    # =======================
    # 主程序入口
    # =======================
    if __name__ == "__main__":
        asyncio.run(run_rag_agent())
    

2.4 AutoGen 0.4+ - 分布式多Agent系统

AutoGen是微软开发的多Agent协作框架,0.4版本开始引入了更强大的分布式能力。

  • MCP集成 :通过AutoGenMCPClient连接,支持SSE模式。应用场景 :

  • 构建分布式系统,一个本地Agent负责决策,另一个远程Agent通过SSE连接的MCP Server执行耗时的工具调用。

  • 优势 :强大的Agent通信和协作机制,适合大型、分布式的Agent系统。

  • 安装依赖:pip install autogen-agentchat mcp-client-python

    import asyncio
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_agentchat.ui import Console
    from mcp_client import StdioServerParams, SseServerParams
    from mcp_client.integrations.autogen import AutoGenMCPClient
    from autogen_agentchat.base import TaskResult
    from autogen_agentchat.messages import ChatMessage
    
    
    async def create_autogen_agents():
        # 创建MCP客户端
        mcp_client = AutoGenMCPClient()
        
        # 配置Stdio Server
        stdio_params = StdioServerParams(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        
        # 连接MCP Server
        await mcp_client.connect_server("search_tools", stdio_params)
        
        # 获取工具列表
        tools = await mcp_client.get_tools("search_tools")
        
        # 创建研究员Agent
        researcher = AssistantAgent(
            name="研究员",
            model_client=OpenAI(model="gpt-4"),
            tools=tools,
            system_message="""
            你是一个专业的研究员,负责:
            1. 收集和分析信息
            2. 使用搜索工具获取最新数据
            3. 提供准确的研究结果
            """
        )
        
        # 创建分析师Agent
        analyst = AssistantAgent(
            name="分析师",
            model_client=OpenAI(model="gpt-4"),
            system_message="""
            你是一个专业的数据分析师,负责:
            1. 分析研究员提供的数据
            2. 识别趋势和模式
            3. 提供深入见解
            """
        )
        
        # 创建报告员Agent
        reporter = AssistantAgent(
            name="报告员",
            model_client=OpenAI(model="gpt-4"),
            system_message="""
            你是一个专业的报告撰写专家,负责:
            1. 整理研究和分析结果
            2. 生成结构化报告
            3. 确保内容准确易懂
            """
        )
        
        return researcher, analyst, reporter, mcp_client
    
    
    async def run_autogen_team():
        researcher, analyst, reporter, mcp_client = await create_autogen_agents()
        try:
            # 创建团队
            team = RoundRobinGroupChat([researcher, analyst, reporter])
            
            # 执行任务
            result = await Console(
                team.run_stream(
                    task="请研究、分析并报告区块链技术在2024年的最新发展趋势"
                )
            )
            
            print("团队协作结果:")
            print(result.messages[-1].content)
        finally:
            await mcp_client.disconnect_all()
    
    
    async def create_distributed_system():
        # 创建多个MCP客户端(模拟分布式环境)
        search_client = AutoGenMCPClient()
        
        # 使用SSE连接远程MCP Server
        sse_params = SseServerParams(
            url="http://localhost:8000/mcp",
            headers={"Authorization": "Bearer your-token"}
        )
        
        await search_client.connect_server("remote_search", sse_params)
        
        # 创建专门的搜索Agent
        search_agent = AssistantAgent(
            name="搜索专家",
            model_client=OpenAI(model="gpt-4"),
            tools=await search_client.get_tools("remote_search"),
            system_message="你专门负责信息搜索和数据收集"
        )
        
        # 创建本地分析Agent
        local_analyst = AssistantAgent(
            name="本地分析师",
            model_client=OpenAI(model="gpt-4"),
            system_message="你负责分析从搜索Agent获得的数据"
        )
        
        # 创建分布式团队
        distributed_team = RoundRobinGroupChat([search_agent, local_analyst])
        
        return distributed_team, search_client
    
    
    if __name__ == "__main__":
        asyncio.run(run_autogen_team())
    

    自定义MCP工具集成

    from autogen_agentchat.base import Tool
    from typing import Any, Dict
    import json
    
    
    class CustomMCPTool(Tool):
        """自定义MCP工具包装器"""
        
        def __init__(self, mcp_client, tool_name: str):
            self.mcp_client = mcp_client
            self.tool_name = tool_name
        
        async def run(self, **kwargs) -> Any:
            """执行MCP工具"""
            try:
                result = await self.mcp_client.call_tool(
                    self.tool_name,
                    kwargs
                )
                return result
            except Exception as e:
                return f"工具调用失败: {str(e)}"
        
        @property
        def schema(self) -> Dict[str, Any]:
            """工具schema"""
            return {
                "type": "function",
                "function": {
                    "name": self.tool_name,
                    "description": f"调用{self.tool_name}工具",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "查询参数"
                            }
                        },
                        "required": ["query"]
                    }
                }
            }
    
    
    async def create_custom_tools_agent():
        # 创建MCP客户端
        mcp_client = AutoGenMCPClient()
    
        # 连接多个MCP服务
        await mcp_client.connect_server(
            "search",
            StdioServerParams(command="npx", args=["@tavily/mcp-server"])
        )
    
        # 创建自定义工具
        search_tool = CustomMCPTool(mcp_client, "tavily_search")
    
        # 创建Agent
        agent = AssistantAgent(
            name="多工具专家",
            model_client=OpenAI(model="gpt-4"),
            tools=[search_tool],
            system_message="你是一个能够使用多种工具的专家"
        )
    
        return agent, mcp_client
    
    
    # 运行自定义工具Agent
    async def run_custom_tools():
        agent, mcp_client = await create_custom_tools_agent()
        try:
            # 创建单Agent对话
            from autogen_agentchat.teams import Swarm
            team = Swarm([agent])
            result = await team.run(
                "使用搜索工具查找Python asyncio的最佳实践"
            )
            print("自定义工具结果:")
            print(result.messages[-1].content)
        finally:
            await mcp_client.disconnect_all()
    

2.5 Pydantic AI - 结构化输出

Pydantic AI结合了Pydantic的类型验证能力,强调输出的类型安全和结构化,特别适合需要JSON或特定数据模型的场景。

  • MCP集成 :通过MCPServerStdioMCPServerHTTP连接。
  • 应用场景:要求Agent返回结构化的ResearchReport对象,包含key_findings、search_results等字段,便于下游程序直接解析。
  • 优势 :输出可预测、可验证,减少后处理成本。
  • 安装依赖:pip install pydantic-ai mcp-client-python
    from pydantic_ai import Agent, ModelRetry
    from pydantic import BaseModel, Field
    from mcp_client import MCPServerStdio, MCPServerHTTP
    from typing import List, Optional
    import asyncio
    
    
    # 定义结构化输出模型
    class SearchResult(BaseModel):
        title: str = Field(description="搜索结果标题")
        content: str = Field(description="搜索结果内容")
        url: str = Field(description="搜索结果链接")
        relevance_score: float = Field(description="相关度评分", ge=0.0, le=1.0)
    
    
    class ResearchReport(BaseModel):
        topic: str = Field(description="研究主题")
        executive_summary: str = Field(description="执行摘要")
        key_findings: List[str] = Field(description="关键发现")
        search_results: List[SearchResult] = Field(description="搜索结果")
        conclusion: str = Field(description="结论")
        confidence_level: float = Field(description="置信度", ge=0.0, le=1.0)
    
    
    async def create_pydantic_agent():
        # 创建MCP Server连接
        mcp_server = MCPServerStdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        # 获取工具
        tools = await mcp_server.get_tools()
        # 创建Agent
        agent = Agent(
            model="openai:gpt-4",
            tools=tools,
            system_prompt="""
            你是一个专业的研究助手,能够:
            1. 使用搜索工具获取信息
            2. 分析和整理搜索结果
            3. 生成结构化的研究报告
            请确保输出符合指定的数据结构。
            """,
            result_type=ResearchReport
        )
        return agent, mcp_server
    
    
    async def run_pydantic_agent():
        agent, mcp_server = await create_pydantic_agent()
        try:
            # 执行结构化查询
            result = await agent.run(
                "研究人工智能在教育领域的应用现状,生成详细的研究报告"
            )
            print("结构化研究报告:")
            print(f"研究主题: {result.data.topic}")
            print(f"执行摘要: {result.data.executive_summary}")
            print(f"置信度: {result.data.confidence_level}")
            print("\n关键发现:")
            for i, finding in enumerate(result.data.key_findings, 1):
                print(f"{i}. {finding}")
            print("\n搜索结果:")
            for i, search_result in enumerate(result.data.search_results, 1):
                print(f"{i}. {search_result.title} (相关度: {search_result.relevance_score})")
            print(f"   链接: {search_result.url}")
            print(f"   摘要: {search_result.content[:100]}...")
            print(f"\n结论: {result.data.conclusion}")
        finally:
            await mcp_server.disconnect()
    
    
    class AnalysisStep(BaseModel):
        step_name: str = Field(description="分析步骤名称")
        description: str = Field(description="步骤描述")
        tools_used: List[str] = Field(description="使用的工具")
        findings: str = Field(description="发现内容")
        confidence: float = Field(description="步骤置信度", ge=0.0, le=1.0)
    
    
    class MultiStepAnalysis(BaseModel):
        query: str = Field(description="原始查询")
        analysis_steps: List[AnalysisStep] = Field(description="分析步骤")
        final_answer: str = Field(description="最终答案")
        overall_confidence: float = Field(description="整体置信度", ge=0.0, le=1.0)
        sources_cited: int = Field(description="引用源数量")
    
    
    async def create_advanced_pydantic_agent():
        # 创建HTTP MCP Server连接
        mcp_server = MCPServerHTTP(
            base_url="http://localhost:8000/mcp",
            headers={"Authorization": "Bearer your-token"}
        )
    
        tools = await mcp_server.get_tools()
    
        # 创建多步骤分析Agent
        agent = Agent(
            model="openai:gpt-4",
            tools=tools,
            system_prompt="""
            你是一个专业的分析师,能够进行多步骤深度分析。
    
            分析流程:
            1. 信息收集 - 使用搜索工具收集相关信息
            2. 数据验证 - 验证信息的准确性
            3. 深度分析 - 分析数据的含义和影响
            4. 结论综合 - 整合所有发现得出结论
    
            每个步骤都要记录使用的工具、发现的内容和置信度。
            """,
            result_type=MultiStepAnalysis
        )
    
        return agent, mcp_server
    
    
    async def run_advanced_analysis():
        agent, mcp_server = await create_advanced_pydantic_agent()
        try:
            result = await agent.run(
                "分析区块链技术在供应链管理中的应用前景和挑战"
            )
    
            print("多步骤分析结果:")
            print(f"查询: {result.data.query}")
            print(f"整体置信度: {result.data.overall_confidence}")
            print(f"引用源数量: {result.data.sources_cited}")
    
            print("\n分析步骤:")
            for i, step in enumerate(result.data.analysis_steps, 1):
                print(f"步骤 {i}: {step.step_name}")
                print(f"  描述: {step.description}")
                print(f"  使用工具: {', '.join(step.tools_used)}")
                print(f"  发现: {step.findings}")
                print(f"  置信度: {step.confidence}")
                print()
    
            print(f"最终答案: {result.data.final_answer}")
        finally:
            await mcp_server.disconnect()
    
    
    if __name__ == "__main__":
        # 运行基础结构化Agent
        asyncio.run(run_pydantic_agent())
    
        # 运行高级分析Agent
        print("\n" + "=" * 50 + "\n")
        asyncio.run(run_advanced_analysis())
    

2.6 SmolAgents - 轻量级代码生成

Hugging Face推出的轻量级框架,其核心是让Agent生成代码来调用工具。

  • MCP集成 :使用ToolCollection.from_mcp导入工具。

  • 应用场景 :Agent被要求“并行搜索多个主题”,它会生成Python asyncio.gather代码来并发调用MCP搜索工具。

  • 优势 :灵活性高,Agent可以自由组合和调度工具。

  • 安装依赖:pip install smolagents mcp-client-python

    from smolagents import CodeAgent, ToolCollection
    from mcp_client import MCPClient
    from smolagents import Tool
    import asyncio
    
    
    async def create_smol_agent():
        # 创建MCP客户端
        mcp_client = MCPClient()
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        # 使用ToolCollection导入MCP工具
        tools = ToolCollection.from_mcp(mcp_client)
        # 创建CodeAgent
        agent = CodeAgent(
            tools=tools,
            model="gpt-4",
            system_prompt="""
            你是一个专业的代码生成助手,能够:
            1. 理解用户需求
            2. 生成Python代码调用工具
            3. 执行代码并返回结果
            请总是生成清晰、可执行的代码。
            """
        )
        return agent, mcp_client
    
    
    async def run_smol_agent():
        agent, mcp_client = await create_smol_agent()
        try:
            # 执行代码生成任务
            result = await agent.run(
                """
                请帮我搜索"Python异步编程最佳实践",
                然后分析搜索结果,提取关键要点。
                """
            )
            print("SmolAgent生成的代码:")
            print(result.code)
            print("\n执行结果:")
            print(result.output)
        finally:
            await mcp_client.disconnect()
    
    
    # 自定义工具集成示例
    
    async def create_custom_smol_tools():
    
        # 创建自定义工具
        class CustomSearchTool(Tool):
            name = "custom_search"
            description = "自定义搜索工具,支持高级搜索选项"
    
            def __init__(self, mcp_client):
                self.mcp_client = mcp_client
                super().__init__()
    
            async def forward(self, query: str, max_results: int = 5) -> str:
                """执行自定义搜索"""
                try:
                    # 调用MCP工具
                    result = await self.mcp_client.call_tool(
                        "tavily_search",
                        {
                            "query": query,
                            "max_results": max_results
                        }
                    )
                    return result
                except Exception as e:
                    return f"搜索失败: {str(e)}"
    
            @property
            def inputs(self):
                return {
                    "query": {"type": "string", "description": "搜索查询"},
                    "max_results": {"type": "integer", "description": "最大结果数", "default": 5}
                }
    
            @property
            def output_type(self):
                return "string"
    
        # 创建MCP客户端
        mcp_client = MCPClient()
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
    
        # 创建自定义工具
        custom_tool = CustomSearchTool(mcp_client)
    
        # 创建工具集合
        tools = ToolCollection([custom_tool])
    
        # 创建Agent
        agent = CodeAgent(
            tools=tools,
            model="gpt-4",
            system_prompt="""
            你是一个专业的搜索分析师,能够:
            1. 使用自定义搜索工具
            2. 分析搜索结果
            3. 生成洞察报告
            """
        )
    
        return agent, mcp_client
    
    
    async def run_custom_smol_agent():
        agent, mcp_client = await create_custom_smol_tools()
        try:
            result = await agent.run(
                """
                使用自定义搜索工具搜索"机器学习模型部署"相关内容,
                限制结果数量为3个,然后分析这些结果的共同点。
                """
            )
            print("自定义SmolAgent结果:")
            print(result.code)
            print("\n输出:")
            print(result.output)
        finally:
            await mcp_client.disconnect()
    
    
    # 批量任务处理示例
    
    async def create_batch_processing_agent():
        # 创建MCP客户端
        mcp_client = MCPClient()
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
    
        tools = ToolCollection.from_mcp(mcp_client)
    
        # 创建批量处理Agent
        agent = CodeAgent(
            tools=tools,
            model="gpt-4",
            system_prompt="""
            你是一个批量任务处理专家,能够:
            1. 处理多个搜索任务
            2. 并行执行工具调用
            3. 汇总和分析结果
    
            请使用Python的asyncio来实现并行处理。
            """
        )
    
        return agent, mcp_client
    
    
    async def run_batch_processing():
        agent, mcp_client = await create_batch_processing_agent()
        try:
            # 批量搜索任务
            queries = [
                "人工智能在金融领域的应用",
                "区块链技术发展趋势",
                "云计算市场分析",
                "物联网安全挑战"
            ]
    
            result = await agent.run(
                f"""
                对以下查询进行批量搜索和分析:
                {queries}
    
                请:
                1. 并行执行所有搜索
                2. 分析每个查询的结果
                3. 生成综合报告
                """
            )
    
            print("批量处理结果:")
            print(result.code)
            print("\n综合报告:")
            print(result.output)
        finally:
            await mcp_client.disconnect()
    
    
    if __name__ == "__main__":
        # 运行基础SmolAgent
        asyncio.run(run_smol_agent())
    
        # 运行自定义工具Agent
        print("\n" + "=" * 50 + "\n")
        asyncio.run(run_custom_smol_agent())
    
        # 运行批量处理Agent
        print("\n" + "=" * 50 + "\n")
        asyncio.run(run_batch_processing())
    

2.7 Camel - 多Agent角色扮演

Camel专为多Agent角色扮演和协作任务设计,特别适合需要不同专业角色协作的场景。

  • MCP集成 :通过MCPToolkit提供工具。

  • 应用场景 :模拟“研究员”、“策略师”和“执行官”的团队协作。研究员使用MCP搜索市场数据,策略师据此制定策略,执行官规划落地步骤。

  • 优势 :生动地模拟了人类团队的协作过程。

  • 安装依赖:pip install camel-ai mcp-client-python

    from camel.agents import ChatAgent
    from camel.messages import BaseMessage
    from camel.types import RoleType, ModelType
    from camel.toolkits import MCPToolkit
    from mcp_client import MCPClient
    from camel.societies import RolePlaying
    import asyncio
    
    
    async def create_camel_agents():
        # 创建MCP客户端
        mcp_client = MCPClient()
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
    
        # 创建MCP工具包
        mcp_toolkit = MCPToolkit(mcp_client)
        tools = mcp_toolkit.get_tools()
    
        # 创建研究员Agent
        researcher = ChatAgent(
            system_message=BaseMessage.make_assistant_message(
                role_name="研究员",
                content="""
                你是一个专业的研究员,具备以下特点:
                1. 善于信息收集和分析
                2. 能够使用各种搜索工具
                3. 注重数据的准确性和可靠性
                4. 擅长发现趋势和模式
    
                你的任务是为团队提供准确、全面的研究数据。
                """
            ),
            model_type=ModelType.GPT_4,
            tools=tools
        )
    
        # 创建策略师Agent
        strategist = ChatAgent(
            system_message=BaseMessage.make_assistant_message(
                role_name="策略师",
                content="""
                你是一个资深的策略师,具备以下能力:
                1. 基于研究数据制定策略
                2. 分析市场趋势和机会
                3. 识别风险和挑战
                4. 提供可行的行动建议
    
                你的任务是将研究结果转化为实际的策略建议。
                """
            ),
            model_type=ModelType.GPT_4
        )
    
        # 创建执行官Agent
        executor = ChatAgent(
            system_message=BaseMessage.make_assistant_message(
                role_name="执行官",
                content="""
                你是一个经验丰富的执行官,专长包括:
                1. 将策略转化为具体行动计划
                2. 评估资源需求和时间安排
                3. 识别执行风险和解决方案
                4. 制定关键指标和里程碑
    
                你的任务是确保策略能够有效执行。
                """
            ),
            model_type=ModelType.GPT_4
        )
    
        return researcher, strategist, executor, mcp_client
    
    
    async def run_camel_collaboration():
        researcher, strategist, executor, mcp_client = await create_camel_agents()
        try:
            # 定义协作任务
            task = "制定一个关于人工智能在零售业应用的完整商业策略"
    
            # 第一阶段:研究员收集信息
            research_prompt = BaseMessage.make_user_message(
                role_name="项目经理",
                content=f"""
                请对以下主题进行全面研究:{task}
    
                需要重点关注:
                1. 当前市场状况
                2. 技术发展趋势
                3. 成功案例分析
                4. 潜在挑战和机遇
                """
            )
            research_result = await researcher.step(research_prompt)
            print("研究员报告:")
            print(research_result.msg.content)
            print("\n" + "="*50 + "\n")
    
            # 第二阶段:策略师制定策略
            strategy_prompt = BaseMessage.make_user_message(
                role_name="项目经理",
                content=f"""
                基于研究员的报告,请制定详细的商业策略:
    
                研究报告:
                {research_result.msg.content}
    
                请提供:
                1. 市场定位策略
                2. 技术实施方案
                3. 竞争优势分析
                4. 风险评估和应对
                """
            )
            strategy_result = await strategist.step(strategy_prompt)
            print("策略师方案:")
            print(strategy_result.msg.content)
            print("\n" + "="*50 + "\n")
    
            # 第三阶段:执行官制定执行计划
            execution_prompt = BaseMessage.make_user_message(
                role_name="项目经理",
                content=f"""
                基于策略师的方案,请制定详细的执行计划:
    
                策略方案:
                {strategy_result.msg.content}
    
                请提供:
                1. 具体行动步骤
                2. 资源需求和预算
                3. 时间表和里程碑
                4. 关键指标和评估方法
                """
            )
            execution_result = await executor.step(execution_prompt)
            print("执行官计划:")
            print(execution_result.msg.content)
    
        finally:
            await mcp_client.disconnect()
    
    
    # 动态角色分配示例
    async def create_dynamic_role_system():
    
        # 创建MCP客户端
        mcp_client = MCPClient()
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        mcp_toolkit = MCPToolkit(mcp_client)
        tools = mcp_toolkit.get_tools()
    
        # 创建角色扮演系统
        role_play = RolePlaying(
            assistant_role_name="AI专家",
            user_role_name="企业顾问",
            assistant_agent_kwargs={
                "model_type": ModelType.GPT_4,
                "tools": tools
            },
            user_agent_kwargs={
                "model_type": ModelType.GPT_4
            }
        )
    
        return role_play, mcp_client
    
    
    async def run_dynamic_roles():
        role_play, mcp_client = await create_dynamic_role_system()
        try:
            # 初始化对话
            task_prompt = """
            我需要为我的制造业公司制定一个AI转型战略。
            请AI专家研究当前的AI技术趋势,
            企业顾问则基于实际业务需求提供建议。
    
            让我们开始这个协作过程。
            """
    
            # 运行角色扮演对话
            input_msg = BaseMessage.make_user_message(
                role_name="项目发起人",
                content=task_prompt
            )
            print("开始角色扮演协作:")
            print(f"任务: {task_prompt}")
            print("\n" + "="*50 + "\n")
    
            # 运行多轮对话
            for i in range(3):  # 进行3轮对话
                assistant_msg, user_msg = await role_play.step(input_msg)
                print(f"第{i+1}轮对话:")
                print(f"AI专家: {assistant_msg.content}")
                print(f"企业顾问: {user_msg.content}")
                print("\n" + "-"*30 + "\n")
                # 准备下一轮输入
                input_msg = user_msg
    
        finally:
            await mcp_client.disconnect()
    
    
    # 专业团队协作示例
    async def create_professional_team():
        # 创建MCP客户端
        mcp_client = MCPClient()
        await mcp_client.connect_stdio(
            command="npx",
            args=["@tavily/mcp-server"]
        )
        mcp_toolkit = MCPToolkit(mcp_client)
        tools = mcp_toolkit.get_tools()
    
        # 定义专业角色
        roles = {
            "数据科学家": {
                "description": "专注于数据分析、机器学习模型开发和数据洞察",
                "tools": tools
            },
            "产品经理": {
                "description": "负责产品规划、用户需求分析和市场策略",
                "tools": tools
            },
            "技术架构师": {
                "description": "设计系统架构、技术选型和实施方案",
                "tools": []
            },
            "业务分析师": {
                "description": "分析业务流程、需求分析和效益评估",
                "tools": []
            }
        }
    
        # 创建Agent团队
        team = {}
        for role_name, role_config in roles.items():
            team[role_name] = ChatAgent(
                system_message=BaseMessage.make_assistant_message(
                    role_name=role_name,
                    content=f"""
                    你是一个专业的{role_name},职责是:{role_config['description']}
    
                    请以专业的角度参与团队协作,
                    提供你领域内的专业建议和见解。
                    """
                ),
                model_type=ModelType.GPT_4,
                tools=role_config.get('tools', [])
            )
    
        return team, mcp_client
    
    
    async def run_professional_team():
        team, mcp_client = await create_professional_team()
        try:
            # 团队协作任务
            project_brief = """
            项目:开发一个基于AI的客户服务聊天机器人
            需求:
            1. 能够理解客户问题并提供准确回答
            2. 支持多语言交互
            3. 具备学习和优化能力
            4. 集成现有客服系统
    
            请各位专家从自己的角度提供专业建议。
            """
    
            print("专业团队协作开始:")
            print(f"项目简介: {project_brief}")
            print("\n" + "="*50 + "\n")
    
            # 让每个角色分别提供建议
            for role_name, agent in team.items():
                prompt = BaseMessage.make_user_message(
                    role_name="项目经理",
                    content=f"""
                    项目简介:{project_brief}
    
                    请从{role_name}的角度,提供专业的建议和方案。
                    """
                )
                result = await agent.step(prompt)
                print(f"{role_name}的建议:")
                print(result.msg.content)
                print("\n" + "-"*30 + "\n")
    
            # 生成综合方案
            print("团队协作完成,各专家已提供专业建议。")
    
        finally:
            await mcp_client.disconnect()
    
    
    if __name__ == "__main__":
        # 运行基础协作
        asyncio.run(run_camel_collaboration())
    
        # 运行动态角色系统
        print("\n" + "="*70 + "\n")
        asyncio.run(run_dynamic_roles())
    
        # 运行专业团队
        print("\n" + "="*70 + "\n")
        asyncio.run(run_professional_team())
    

2.8 CrewAI - 结构化Agent团队

CrewAI专注于构建结构化的Agent团队(Crew),强调明确的角色(Role)、目标(Goal)和任务(Task)的结构化团队。

  • MCP集成 :需自定义MCPSearchTool类,包装MCP调用逻辑。

  • 应用场景 :创建一个“市场研究员”Agent,其目标是“收集和分析信息”,通过MCP工具完成研究任务。

  • 优势 :任务流程清晰,易于管理和监控。

  • 安装依赖:pip install crewai mcp-client-python

    # 安装第三方MCP适配器
    # pip install crewai-mcp-adapter
    
    from crewai import Agent, Task, Crew, Process
    from crewai.tools import BaseTool
    import asyncio
    import json
    import subprocess
    import sys
    from typing import Dict, Any, Optional
    import os
    
    
    # 简化的MCP客户端实现
    class MCPClient:
        def __init__(self):
            self.process = None
            self.connected = False
    
        async def connect_stdio(self, command: str, args: list, env: Optional[Dict[str, str]] = None):
            """连接到MCP服务器"""
            try:
                # 设置环境变量
                full_env = os.environ.copy()
                if env:
                    full_env.update(env)
    
                # 启动MCP服务器进程
                self.process = await asyncio.create_subprocess_exec(
                    command, *args,
                    stdin=asyncio.subprocess.PIPE,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=full_env
                )
    
                # 发送初始化请求
                init_request = {
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "initialize",
                    "params": {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {
                            "tools": {}
                        },
                        "clientInfo": {
                            "name": "CrewAI-MCP-Client",
                            "version": "1.0.0"
                        }
                    }
                }
                await self._send_request(init_request)
                response = await self._receive_response()
                if response.get("result"):
                    self.connected = True
                    print("MCP客户端连接成功")
                    return True
                else:
                    print(f"MCP连接失败: {response}")
                    return False
            except Exception as e:
                print(f"MCP连接错误: {e}")
                return False
    
        async def _send_request(self, request: Dict[str, Any]):
            """发送请求到MCP服务器"""
            if not self.process:
                raise Exception("MCP客户端未连接")
    
            request_str = json.dumps(request) + "\n"
            self.process.stdin.write(request_str.encode())
            await self.process.stdin.drain()
    
        async def _receive_response(self) -> Dict[str, Any]:
            """接收来自MCP服务器的响应"""
            if not self.process:
                raise Exception("MCP客户端未连接")
    
            line = await self.process.stdout.readline()
            if line:
                try:
                    return json.loads(line.decode().strip())
                except json.JSONDecodeError as e:
                    print(f"JSON解析错误: {e}")
                    return {"error": "JSON解析失败"}
            return {"error": "无响应"}
    
        async def call_tool(self, tool_name: str, params: Dict[str, Any]) -> str:
            """调用MCP工具"""
            if not self.connected:
                return "MCP客户端未连接"
    
            try:
                # 发送工具调用请求
                request = {
                    "jsonrpc": "2.0",
                    "id": 2,
                    "method": "tools/call",
                    "params": {
                        "name": tool_name,
                        "arguments": params
                    }
                }
                await self._send_request(request)
                response = await self._receive_response()
                if "result" in response:
                    content = response["result"].get("content", [])
                    if content:
                        return content[0].get("text", "无内容")
                    return "工具调用成功但无返回内容"
                else:
                    return f"工具调用失败: {response.get('error', '未知错误')}"
            except Exception as e:
                return f"工具调用异常: {str(e)}"
    
        async def disconnect(self):
            """断开MCP连接"""
            if self.process:
                self.process.terminate()
                await self.process.wait()
                self.connected = False
                print("MCP客户端已断开连接")
    
    
    # 创建MCP工具适配器
    class MCPSearchTool(BaseTool):
        name: str = "MCP搜索工具"
        description: str = "使用MCP协议进行网络搜索,获取最新信息"
    
        def __init__(self, mcp_client):
            super().__init__()
            self.mcp_client = mcp_client
    
        def _run(self, query: str) -> str:
            """执行搜索"""
            try:
                # 在新的事件循环中运行异步代码
                import asyncio
                try:
                    loop = asyncio.get_event_loop()
                    if loop.is_running():
                        # 如果事件循环正在运行,创建一个新的事件循环
                        import threading
                        result = [None]
                        exception = [None]
    
                        def run_in_thread():
                            new_loop = asyncio.new_event_loop()
                            asyncio.set_event_loop(new_loop)
                            try:
                                result[0] = new_loop.run_until_complete(
                                    self.mcp_client.call_tool("tavily_search", {"query": query})
                                )
                            except Exception as e:
                                exception[0] = e
                            finally:
                                new_loop.close()
    
                        thread = threading.Thread(target=run_in_thread)
                        thread.start()
                        thread.join()
    
                        if exception[0]:
                            raise exception[0]
                        return result[0]
                    else:
                        result = loop.run_until_complete(
                            self.mcp_client.call_tool("tavily_search", {"query": query})
                        )
                        return result
                except RuntimeError:
                    # 如果没有事件循环,创建一个新的
                    result = asyncio.run(
                        self.mcp_client.call_tool("tavily_search", {"query": query})
                    )
                    return result
            except Exception as e:
                return f"搜索失败: {str(e)}"
    
    
    async def create_crew_with_mcp():
        """创建带有MCP工具的Crew"""
        # 创建MCP客户端
        mcp_client = MCPClient()
    
        # 连接到Tavily搜索服务
        # 注意:需要设置TAVILY_API_KEY环境变量
        success = await mcp_client.connect_stdio(
            command="npx",
            args=["-y", "@tavily/mcp-server"],
            env={"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "your_api_key_here")}
        )
    
        if not success:
            print("警告:MCP连接失败,将使用模拟数据")
            # 创建模拟工具
            class MockSearchTool(BaseTool):
                name: str = "模拟搜索工具"
                description: str = "模拟搜索工具,用于演示"
    
                def _run(self, query: str) -> str:
                    return f"模拟搜索结果:关于'{query}'的信息 - 这是一个模拟搜索结果,包含了相关的基础信息。"
    
            search_tool = MockSearchTool()
        else:
            # 创建MCP搜索工具
            search_tool = MCPSearchTool(mcp_client)
    
        # 创建研究员Agent
        researcher = Agent(
            role='资深研究员',
            goal='收集和分析相关信息,提供准确的研究报告',
            backstory="""
            你是一个经验丰富的研究员,擅长从各种来源收集信息,
            进行深入分析,并提供有价值的洞察。你总是确保信息的准确性和相关性。
            """,
            tools=[search_tool],
            verbose=True,
            allow_delegation=False
        )
    
        # 创建分析师Agent
        analyst = Agent(
            role='数据分析师',
            goal='分析研究数据,识别趋势和模式,提供专业见解',
            backstory=""",
            你是一个专业的数据分析师,善于从复杂的数据中提取有意义的信息,
            识别趋势和模式,并提供基于数据的专业建议。
            """,
            verbose=True,
            allow_delegation=False
        )
    
        # 创建内容创作者Agent
        content_creator = Agent(
            role='内容创作专家',
            goal='将分析结果转化为易于理解的内容',
            backstory=""",
            你是一个专业的内容创作专家,擅长将复杂的分析结果
            转化为清晰、有吸引力的内容,确保读者能够轻松理解。
            """,
            verbose=True,
            allow_delegation=False
        )
    
        return researcher, analyst, content_creator, mcp_client
    
    
    async def create_crew_tasks(researcher, analyst, content_creator):
        """创建任务"""
        # 定义研究任务
        research_task = Task(
            description="""
            对"人工智能在医疗保健中的应用"进行全面研究。
    
            需要研究的方面:
            1. 当前应用现状
            2. 主要技术和解决方案
            3. 成功案例和失败教训
            4. 未来发展趋势
            5. 面临的挑战和机遇
    
            请提供详细的研究报告。
            """,
            expected_output="一份包含当前状况、技术方案、案例分析和趋势预测的详细研究报告",
            agent=researcher
        )
    
        # 定义分析任务
        analysis_task = Task(
            description=""",
            基于研究报告,进行深入的数据分析。
    
            分析要点:
            1. 市场规模和增长趋势
            2. 技术成熟度分析
            3. 竞争格局评估
            4. 投资机会识别
            5. 风险因素评估
    
            请提供专业的分析报告。
            """,
            expected_output="一份包含市场分析、技术评估、竞争分析和投资建议的专业分析报告",
            agent=analyst
        )
    
        # 定义内容创作任务
        content_task = Task(
            description=""",
            基于研究和分析报告,创作一篇高质量的科普文章。
    
            内容要求:
            1. 通俗易懂,适合普通读者
            2. 结构清晰,逻辑性强
            3. 包含实际案例和数据
            4. 提供实用的建议
            5. 字数控制在1500-2000字
    
            请创作一篇引人入胜的科普文章。
            """,
            expected_output="一篇1500-2000字的高质量科普文章,内容准确、易懂、有趣",
            agent=content_creator
        )
    
        return research_task, analysis_task, content_task
    
    
    async def create_advanced_crew():
        """创建高级Crew配置"""
        # 创建MCP客户端
        mcp_client = MCPClient()
        success = await mcp_client.connect_stdio(
            command="npx",
            args=["-y", "@tavily/mcp-server"],
            env={"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "your_api_key_here")}
        )
        if success:
            search_tool = MCPSearchTool(mcp_client)
        else:
            # 使用模拟工具
            class MockSearchTool(BaseTool):
                name: str = "模拟搜索工具"
                description: str = "模拟搜索工具,用于演示"
    
                def _run(self, query: str) -> str:
                    return f"模拟搜索结果:关于'{query}'的专业信息和最新数据。"
            search_tool = MockSearchTool()
    
        # 创建专业化的Agent团队
        market_researcher = Agent(
            role='市场研究专家',
            goal='专注于市场趋势和竞争分析',
            backstory='你是一个资深的市场研究专家,对各行业的市场动态有深入了解。',
            tools=[search_tool],
            verbose=True,
            max_iter=3,  # 最大迭代次数
            memory=True  # 启用记忆功能
        )
    
        tech_analyst = Agent(
            role='技术分析师',
            goal='专注于技术发展和创新分析',
            backstory='你是一个技术分析专家,对新兴技术和创新趋势有敏锐的洞察力。',
            tools=[search_tool],
            verbose=True,
            max_iter=3,
            memory=True
        )
    
        strategy_consultant = Agent(
            role='战略咨询师',
            goal='提供战略建议和实施方案',
            backstory='你是一个经验丰富的战略咨询师,擅长将研究和分析转化为可执行的战略方案。',
            verbose=True,
            max_iter=3,
            memory=True
        )
    
        return market_researcher, tech_analyst, strategy_consultant, mcp_client
    
    
    async def run_crew_ai():
        """运行CrewAI任务"""
        try:
            print("开始创建CrewAI团队...")
            # 创建Agents
            researcher, analyst, content_creator, mcp_client = await create_crew_with_mcp()
            # 创建任务
            research_task, analysis_task, content_task = await create_crew_tasks(
                researcher, analyst, content_creator
            )
            # 创建Crew
            crew = Crew(
                agents=[researcher, analyst, content_creator],
                tasks=[research_task, analysis_task, content_task],
                process=Process.sequential,  # 顺序执行
                verbose=2
            )
            print("📋 开始执行CrewAI任务...")
            # 执行任务
            result = crew.kickoff()
            print("\nCrewAI任务完成!")
            print("\n最终结果:")
            print("="*50)
            print(result)
            print("="*50)
            return result
        except Exception as e:
            print(f"执行过程中出现错误: {str(e)}")
            return None
        finally:
            # 清理资源
            if 'mcp_client' in locals():
                await mcp_client.disconnect()
    
    
    async def run_advanced_crew():
        """运行高级Crew配置"""
        try:
            print("开始创建高级CrewAI团队...")
            # 创建高级Agents
            market_researcher, tech_analyst, strategy_consultant, mcp_client = await create_advanced_crew()
            # 创建高级任务
            market_task = Task(
                description=""",
                对人工智能医疗保健市场进行深入的市场研究。
                重点关注:
                1. 全球市场规模和增长预测
                2. 主要参与者和竞争格局
                3. 区域市场差异
                4. 监管环境影响
                """,
                expected_output="详细的市场研究报告,包含数据分析和竞争格局",
                agent=market_researcher
            )
            tech_task = Task(
                description=""",
                分析人工智能在医疗保健中的技术发展趋势。
                重点关注:
                1. 关键技术栈和解决方案
                2. 技术成熟度评估
                3. 创新突破点
                4. 技术实施挑战
                """,
                expected_output="技术分析报告,包含技术路线图和实施建议",
                agent=tech_analyst
            )
            strategy_task = Task(
                description=""",
                基于市场研究和技术分析,制定战略建议。
                重点关注:
                1. 投资机会识别
                2. 风险评估和缓解
                3. 实施路径规划
                4. 成功因素分析
                """,
                expected_output="战略建议报告,包含可执行的行动方案",
                agent=strategy_consultant
            )
            # 创建高级Crew
            advanced_crew = Crew(
                agents=[market_researcher, tech_analyst, strategy_consultant],
                tasks=[market_task, tech_task, strategy_task],
                process=Process.sequential,
                verbose=2
            )
            print("📋 开始执行高级CrewAI任务...")
            # 执行任务
            result = advanced_crew.kickoff()
            print("\n高级CrewAI任务完成!")
            print("\n最终结果:")
            print("="*50)
            print(result)
            print("="*50)
            return result
        except Exception as e:
            print(f"❌ 执行过程中出现错误: {str(e)}")
            return None
        finally:
            # 清理资源
            if 'mcp_client' in locals():
                await mcp_client.disconnect()
    
    
    def main():
        """主函数"""
        print("CrewAI + MCP 集成演示")
        print("="*50)
    
        # 检查环境变量
        if not os.getenv("TAVILY_API_KEY"):
            print(" 警告:未设置TAVILY_API_KEY环境变量")
            print("   将使用模拟数据进行演示")
            print("   要获取真实搜索结果,请设置环境变量:")
            print("   export TAVILY_API_KEY=your_api_key_here")
            print()
    
        print("请选择运行模式:")
        print("1. 基础CrewAI演示")
        print("2. 高级CrewAI演示")
        choice = input("请输入选择 (1 或 2): ").strip()
        if choice == "1":
            asyncio.run(run_crew_ai())
        elif choice == "2":
            asyncio.run(run_advanced_crew())
        else:
            print("无效选择,运行基础演示...")
            asyncio.run(run_crew_ai())
    
    if __name__ == "__main__":
        main()
    

3. 结论与展望

本文系统性地说明了MCP协议作为AI Agent“神经中枢”的价值。通过将八大主流框架与MCP集成,我们证明了其强大的通用性和工程可行性。MCP不仅解决了工具调用的标准化问题,更促进了Agent生态的繁荣。

未来,MCP有望成为AI Agent领域的“TCP/IP协议”。我们期待看到更多企业级特性(如权限控制、审计日志、服务网格)被集成进来,推动AI Agent从实验室走向千行百业。


网站公告

今日签到

点亮在社区的每一天
去签到