从0到1:Google ADK企业级多智能体开发终极指南(二)

发布于:2025-07-15 ⋅ 阅读:(33) ⋅ 点赞:(0)

从0到1:Google ADK企业级多智能体开发终极指南(二)

从0到1:Google ADK企业级多智能体开发终极指南文章中,我们组建了一个AI研究团队。本次将对这个团队进行一次重大升级,使其更智能、更强大、更接近前沿的自主智能体(Autonomous Agent)设计。

这里将引入两位新专家,并对现有专家进行能力升级,构建一个能够自主规划、执行代码、并由专门的写作AI生成报告的混合多智能体系统。

升级内容:

  1. 混合智能体架构:构建一个包含简单功能型专家(如搜索)和智能思考型专家(如报告撰写)的系统。
  2. AI调用AI:学习如何让一个智能体(协调者)调用另一个拥有独立思考能力的智能体(报告专家),这是一种极其强大的高级模式。
  3. 赋予AI代码执行能力:集成一个代码执行专家,让你的系统能进行数据计算、API调用等更复杂的任务。
  4. 端到端自主工作流:见证一个复杂的研究请求如何被自动拆解、搜索、计算、并由写作AI最终润色成文的完整过程。

Part 1: 全新蓝图 - AI深度研究智能体 2.0

团队扩充至四名成员,分工更加精细:

  1. 协调智能体 (Coordinator Agent)

    • 角色:项目总监。
    • 职责:理解用户最终目标,拆解任务,并智能地调度手下的三位专家。
    • 技术栈:Google ADK。
  2. 网络搜索专家 (Web Search Specialist)

    • 角色:信息搜集员。
    • 职责:提供一个简单的、基于函数的工具,用于网络搜索。
    • 技术栈:FastMCP (HTTP 微服务)。
  3. **代码执行专家 (Code Execution Specialist) **

    • 角色:Python 程序员。
    • 职责:提供一个能安全执行 Python 代码的工具,用于数据处理、计算等。
    • 技术栈:FastMCP (HTTP 微服务)。
  4. **报告撰写专家 (Report Writer Specialist) **

    • 角色:资深分析师与作家。
    • 职责:它本身就是一个完整的 ADK 智能体,拥有自己的大模型大脑。它接收原始数据,并利用其AI能力深度理解、分析,并撰写出高质量的报告。
    • 技术栈:Google ADK + FastMCP (将ADK智能体的能力封装为HTTP微服务)。
全新协作流程
          +-----------------+
用户  <=> |  协调智能体 (ADK) |  (项目总监)
          +-------+---------+
                  | 1. 理解并拆解任务
                  |
        /---------+----------\------------------\
       /          |           \                  \
      v           v            v                 v 5. "根据所有资料写报告"
+------------+  +-----------+  +------------------+  +----------------------+
| 网络搜索专家 |  |  代码执行专家 |  | ... 其他专家 ... |  | 报告撰写专家(ADK+MCP) |
| (HTTP@8001)|  | (HTTP@8003)|  |                  |  | (HTTP@8002)          |
+------------+  +-----------+  +------------------+  +----------------------+
      | 2. 返回结果 | 4. 返回结果       |                   | 6. 返回AI生成的报告
       \          |           /                  /
        \---------+----------/------------------/
                  | 7. 整合最终报告并回复用户
                  v
                 用户

Part 2: 锻造专家 - 创建独立的 MCP 微服务

步骤 1: 环境准备与项目结构

首先,确保你的开发环境已安装所有必要的库。

pip install google-adk python-dotenv fastmcp "uvicorn[standard]" sqlalchemy

然后,创建我们的项目文件夹结构:

adk-multi-agent-system/
  ├── .env                       # 存放你的API密钥
  ├── specialists/               # 专家服务目录
  │   ├── search_specialist.py   # 搜索专家服务
  │   ├── report_specialist.py   # 报告专家服务
  │   └── code_specialist.py     # 代码执行专家服务
  └── coordinator/               # 协调智能体目录
      ├── __init__.py
      └── agent.py

在项目根目录创建 .env 文件,并填入你的 Google Gemini API 密钥:

# .env file
GOOGLE_API_KEY="YOUR_GEMINI_API_KEY_HERE"
步骤 2: 网络搜索专家 (功能型服务)

这位专家负责提供基础的搜索能力。创建 specialists/search_specialist.py 文件并填入以下内容:

# specialists/search_specialist.py

from fastmcp import FastMCP
import asyncio
import uvicorn

mcp_app = FastMCP("WebSearcher_MCP_Service")

@mcp_app.tool
def search_web(query: str) -> dict:
    """
    根据给定的查询词在网上搜索信息,并返回关键结果。
    在实际应用中,这里会调用真正的搜索引擎API。
    """
    print(f"--- [搜索专家] 收到任务: 搜索 '{query}' ---")
  
    # 为了教学目的,我们返回模拟的搜索结果
    # 模拟搜索股价
    if "aapl" in query.lower() or "apple" in query.lower():
        return {"status": "success", "results": "模拟数据:苹果公司(AAPL)的当前股价是 $170"}
    if "googl" in query.lower() or "google" in query.lower():
        return {"status": "success", "results": "模拟数据:谷歌公司(GOOGL)的当前股价是 $150"}
      
    return {"status": "success", "results": f"关于 '{query}' 的搜索结果为空。"}

async def main():
    config = mcp_app.run(
        transport="http", 
        host="0.0.0.0", 
        port=8001, 
        path="/mcp"
    )
    server = uvicorn.Server(config)
    await server.serve()

if __name__ == "__main__":
    print("启动网络搜索专家 MCP 服务于 http://0.0.0.0:8001/mcp")
    asyncio.run(main())
步骤 3: 打造“代码执行专家” (新成员)

这位专家为我们的系统带来了计算能力。创建 specialists/code_specialist.py 文件:

# specialists/code_specialist.py

from fastmcp import FastMCP
import asyncio
import uvicorn
from contextlib import redirect_stdout
import io

mcp_app = FastMCP("CodeExecutor_MCP_Service")

@mcp_app.tool
def execute_python_code(code: str) -> dict:
    """
    安全地执行一段Python代码并返回其标准输出。
    主要用于数据计算、转换等。
    :param code: 要执行的Python代码字符串。
    """
    print(f"--- [代码专家] 收到任务: 执行代码 ---\n{code}\n---------")
  
    # ⚠️ 安全警告: 在生产环境中,`exec` 极其危险!
    # 必须在严格隔离的沙箱环境(如Docker容器、gVisor)中运行。
    # 这里为了教学目的,我们直接执行。
  
    local_vars = {}
    stdout_capture = io.StringIO()
  
    try:
        # 重定向标准输出,以捕获print语句的结果
        with redirect_stdout(stdout_capture):
            exec(code, {"__builtins__": __builtins__}, local_vars)
      
        output = stdout_capture.getvalue()
        # 修正:如果代码成功执行但没有输出,则返回一条明确的消息。
        if not output:
            output = "代码已成功执行,但没有产生任何标准输出。"
        return {"status": "success", "output": output}
    except Exception as e:
        return {"status": "error", "error_message": str(e)}

async def main():
    config = mcp_app.run(
        transport="http", 
        host="0.0.0.0", 
        port=8003, # 为新专家分配新端口
        path="/mcp"
    )
    server = uvicorn.Server(config)
    await server.serve()

if __name__ == "__main__":
    print("启动代码执行专家 MCP 服务于 http://0.0.0.0:8003/mcp")
    asyncio.run(main())
步骤 4: 升级“报告撰写专家” (智能型服务)

这是本次教程最核心的升级。我们将创建一个完整的ADK智能体,然后用FastMCP将它的“对话”能力封装成一个可远程调用的工具。

重构 specialists/report_specialist.py 文件:

# specialists/report_specialist.py (修正版)

import asyncio
import uvicorn
from typing import List, Dict
from dotenv import load_dotenv

# ADK 和 FastMCP 的双重导入
from google.adk.agents import Agent
from google.adk.events import Event # 导入Event以处理事件流
from fastmcp import FastMCP

# 加载环境变量以供ADK Agent使用
load_dotenv()

# --- 1. 定义一个拥有独立大脑的报告撰写 Agent ---
report_writing_agent = Agent(
    name="ProfessionalReportWriter",
    model="gemini-2.5-flash", # 这里可以使用更强大的模型如 gemini-2.5-pro 以获得更好的写作效果
    instruction="""
    你是一位顶级的商业分析师和报告撰写专家。
    你的任务是接收原始、杂乱的数据点(可能来自网络搜索、代码执行结果等),并完成以下工作:
    1.  **深度理解**: 分析数据背后的含义和相互关系。
    2.  **构建逻辑**: 组织一个清晰、有说服力的叙事结构。
    3.  **专业写作**: 使用精炼、专业的语言撰写一份格式优美的Markdown报告。
    你的输出必须是高质量、可直接呈现的报告成品,不要包含任何与报告无关的对话性文字。
    """,
    # 这个 Agent 自身没有工具,它只负责思考和写作
)

# --- 2. 创建一个 FastMCP 服务来“代理”这个 Agent 的能力 ---
mcp_app = FastMCP("ReportWriter_MCP_Service")

# --- 辅助函数:正确调用Agent并从事件流中提取最终结果 (修正了run_async的使用) ---
async def invoke_agent_and_get_response(agent: Agent, prompt: str) -> str:
    """
    使用 ADK 的 run_async 方法调用 Agent,并从返回的事件流中提取最终的文本输出。
    """
    final_response = ""
    # ADK 的 run_async 返回一个异步生成器,包含了所有执行事件
    async for event in agent.run_async(prompt):
        # 我们关心的是 agent_end 事件,它包含了最终的输出
        if event.type == "agent_end" and event.data.get("output_text"):
            final_response = event.data["output_text"]
            break # 拿到最终结果后即可退出
    return final_response

# --- 3. 定义一个 MCP 工具,其内部实现是调用上面的 Agent ---
@mcp_app.tool
async def generate_report_from_data(topic: str, context_data: str) -> str:
    """
    调用一个专业的写作AI,根据主题和上下文数据生成一份高质量的报告。
    :param topic: 报告的主题。
    :param context_data: 一个包含所有研究资料的字符串(如搜索结果、计算数据等)。
    """
    print(f"--- [报告专家] 收到任务,正在调用内部AI撰写 '{topic}' 的报告 ---")

    # 构造给内部写作AI的提示
    prompt = f"""
    请根据以下主题和数据,为我撰写一份详细的研究报告。
  
    主题: {topic}
  
    已知数据:
    ---
    {context_data}
    ---
    """
  
    # 【核心修正】: 使用正确的 invoke_agent_and_get_response 辅助函数
    # 这才是 "AI 调用 AI" 在 ADK 中的标准实现!
    response = await invoke_agent_and_get_response(report_writing_agent, prompt)
  
    return response

# --- 4. 运行 MCP 服务器 ---
async def main():
    config = mcp_app.run(
        transport="http",
        host="0.0.0.0",
        port=8002, # 保持报告专家的端口不变
        path="/mcp",
    )
    server = uvicorn.Server(config)
    await server.serve()

if __name__ == "__main__":
    print("启动报告撰写专家(AI版) MCP 服务于 http://0.0.0.0:8002/mcp")
    asyncio.run(main())

Part 3: 强化总监 - 升级协调智能体

协调智能体现在需要知道如何指挥它的新团队成员。它的大脑需要足够强大来理解复杂的指令并编排多步工作流。

首先,创建 coordinator/__init__.py 文件,内容如下:

# coordinator/__init__.py
from .agent import root_agent
__all__ = ["root_agent"]

然后,用以下修正后的版本更新 coordinator/agent.py 文件:

# coordinator/agent.py (最终修正版)

import asyncio
from typing import List, Optional

from dotenv import load_dotenv
from google.adk.agents import Agent
from google.adk.tools import BaseTool, BaseToolset
from google.adk.tools.base_toolset import ReadonlyContext
from google.adk.tools.mcp_tool import MCPToolset # 仍然需要MCPToolset来实际获取工具

load_dotenv()

# 定义所有专家服务的地址
SEARCH_SPECIALIST_URL = "http://localhost:8001/mcp"
REPORT_SPECIALIST_URL = "http://localhost:8002/mcp"
CODE_SPECIALIST_URL = "http://localhost:8003/mcp"

class AsyncMcpToolset(BaseToolset):
    """
    一个自定义工具集,用于异步地从 MCP URLs 加载工具。
    这是在 ADK 中处理需要异步初始化的工具的正确模式。
    """
    def __init__(self, urls: List[str]):
        self._urls = urls
        self._tools: Optional[List[BaseTool]] = None
        self._lock = asyncio.Lock()

    async def get_tools(
        self, readonly_context: Optional[ReadonlyContext] = None
    ) -> List[BaseTool]:
        """
        在首次需要时从 MCP 端点获取工具。
        ADK 框架会在运行时安全地调用这个异步方法。
        """
        async with self._lock:
            if self._tools is None:
                print("正在首次连接到所有专家 MCP 服务...")
                try:
                    # 调用原生的 MCPToolset.from_urls 来实际获取工具
                    mcp_tools, _ = await MCPToolset.from_urls(self._urls)
                    self._tools = list(mcp_tools)
                    tool_names = [tool.name for tool in self._tools]
                    print(f"成功连接!发现 {len(tool_names)} 个工具: {tool_names}")
                except Exception as e:
                    print(f"错误:连接到 MCP 服务失败: {e}")
                    self._tools = []  # 失败时返回空列表
        return self._tools if self._tools is not None else []

    async def close(self) -> None:
        """关闭工具集持有的任何资源。"""
        print("正在关闭 AsyncMcpToolset...")
        # 视具体需求,这里可以加入清理资源的代码
        await asyncio.sleep(0) # 模拟异步操作

# 创建我们自定义工具集的实例
mcp_toolset_wrapper = AsyncMcpToolset(
    urls=[SEARCH_SPECIALIST_URL, REPORT_SPECIALIST_URL, CODE_SPECIALIST_URL]
)

# ADK 要求智能体实例命名为 root_agent
# 我们现在可以安全地、同步地创建智能体实例了
root_agent = Agent(
    name="research_coordinator",
    model="gemini-2.5-pro", # 强烈建议使用更强的模型来处理复杂流程编排
    description="一个负责协调专家团队完成复杂研究任务的项目总监。",
    instruction="""
    你是一个极其聪明的项目总监。你的任务是根据用户的最终目标,自主规划并调用一个专家团队来完成任务。
  
    你的团队成员和对应的工具有:
    1.  **search_web**: 网络搜索专家,用于从互联网获取信息。
    2.  **execute_python_code**: 代码执行专家,用于进行计算、数据处理或运行脚本。当你需要精确计算时,必须使用此工具。
    3.  **generate_report_from_data**: 报告撰写专家(这是一个AI),在你收集完所有必要信息后,调用它来生成最终的、高质量的报告。

    你的标准工作流程是:
    1.  **规划**: 理解用户请求,拆分成多个步骤(如:先搜索,再计算,最后写报告)。
    2.  **执行**: 按顺序调用 `search_web` 和 `execute_python_code` 工具来收集和处理信息。
    3.  **汇总**: 将所有从其他工具获得的信息(搜索结果、代码输出等)整合成一个大的上下文信息 `context_data`。
    4.  **报告**: 调用 `generate_report_from_data` 工具,将汇总的 `context_data` 和用户原始的主题 `topic` 传递给它,获得最终报告。
    5.  **交付**: 将最终报告呈现给用户。

    **绝对禁止**自己编造数字或报告内容,你必须严格依赖工具的输出。在调用工具时,要思考清楚每个参数应该传入什么值。
    """,
    tools=[mcp_toolset_wrapper], # 将我们的 AsyncMcpToolset 实例传递给 Agent
)

print(f"协调智能体 '{root_agent.name}' 已创建,并配置了异步加载的 MCP 工具集。")

提示:我们为协调智能体升级到了 gemini-2.5-pro 并提供了非常详尽的指令(System Prompt)。对于负责复杂流程编排的“大脑”智能体,使用更强大的模型和清晰的指令至关重要,这能极大提升它自主规划和正确调用工具的能力。

Part 4: 全员就位 - 运行并见证奇迹

现在启动程序。你需要打开 4 个独立的终端窗口,因为每个专家和协调者都是独立运行的进程。

终端 1: 启动网络搜索专家

# 在 adk-multi-agent-system 根目录下运行
python specialists/search_specialist.py

你应该会看到输出: 启动网络搜索专家 MCP 服务于 http://0.0.0.0:8001/mcp

终端 2: 启动报告撰写专家

# 在 adk-multi-agent-system 根目录下运行
python specialists/report_specialist.py

你应该会看到输出: 启动报告撰写专家 MCP 服务于 http://0.0.0.0:8002/mcp

终端 3: 启动代码执行专家

# 在 adk-multi-agent-system 根目录下运行
python specialists/code_specialist.py

你应该会看到输出: 启动代码执行专家 MCP 服务于 http://0.0.0.0:8003/mcp

终端 4: 启动协调智能体 (总监)

# 在 adk-multi-agent-system 根目录下运行
adk web -a coordinator

你会看到协调智能体启动,并在首次需要工具时(例如你向它提问后),打印出正在连接以及成功发现的3个工具:search_web, generate_report_from_data, execute_python_code

现在,打开浏览器访问 ADK Web UI (通常是 http://127.0.0.1:8008 或命令行提示的地址),向你的项目总监提出一个需要多步骤、跨领域合作的挑战:

“请帮我查一下苹果公司(AAPL)和谷歌(GOOGL)的当前股价(请用模拟数据,比如苹果$170,谷歌$150),然后计算它们股价的差额,并基于这些信息为我撰写一份简短的市场观察报告。”

现在,请屏息观察!你将看到一场精彩的自动化协作:

  1. 协调智能体接收任务,并规划出:先搜索AAPL,再搜索GOOGL,然后计算差额,最后写报告。
  2. 它依次调用 search_web,两次搜索请求分别被发往搜索专家服务。
  3. 拿到两个股价信息后,它生成一段Python代码(例如 print(170 - 150)),并调用 execute_python_code,请求被发往代码专家服务。
  4. 代码专家执行代码并返回差额 “20”。
  5. 协调智能体现在拥有了所有原始数据(两家公司的股价、股价差额)。它将这些数据打包成一个大的 context_data 字符串,并调用 generate_report_from_data,请求被发往报告专家服务。
  6. 报告专家的内部AI开始工作,它理解了所有数据,并发挥其写作才能,生成一篇流畅、专业的报告。
  7. 最终,这份由AI撰写的报告被返回给协调智能体,并完美地呈现在你的聊天窗口中。

总结

通过本次升级,你构建的已不仅仅是一个多智能体系统,而是一个具备初步自主规划和智能协作能力的AI团队。

你掌握了企业级AI架构的精髓:

  • 服务专业化:让每个服务做它最擅长的事。
  • 能力分层:区分简单的功能性AI(如搜索、计算)和复杂的思考型AI(如报告撰写)。
  • 协议标准化:通过MCP让不同类型、不同技术栈的AI服务无缝协作,真正实现了即插即用。

这为你打开了通往更高级AI应用的大门,比如创建能自主编程、自主调试、甚至能自我改进的复杂系统。


网站公告

今日签到

点亮在社区的每一天
去签到