In the LLM + Agent + MCP framework:
- LLM: remains the "brain" of the agents, giving them the ability to understand, reason, generate, and plan; it is also used to process and exploit the shared context.
- Agent: an autonomous unit with a specific R&D function. Agents perceive the shared context, decide their own actions based on built-in logic and LLM reasoning, and carry out tasks by invoking tools and updating the shared context.
- MCP (Master Context Protocol) Service: a service exposing an API that:
  * Stores and retrieves the interaction history (conversations, thoughts, actions, observations) of all Agents within a given task or session.
  * Stores and updates shared working state, key variables, intermediate results, and flags that need joint attention (e.g., "needs human approval", "conflict encountered").
  * Links relevant long-term memory (Vector DB, KG) to the current task context.
  * Optionally handles event notification, alerting the relevant Agents when a key part of the shared context is updated.
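To make the shared context concrete, a single task's context document might look like the following minimal sketch (the field names match the TaskContext model in the pseudo-code later; the values are purely illustrative):

{
  "task_id": "3f2b9c10-...",
  "created_at": 1714000000.0,
  "messages": [
    {"sender_id": "lit-review-agent-1", "sender_type": "agent",
     "type": "thought", "content": "No papers collected yet; start a search.",
     "timestamp": 1714000012.5}
  ],
  "shared_state": {
    "status": "RUNNING",
    "initial_goal": "Survey recent work on topic X",
    "literature_review_status": "IN_PROGRESS"
  }
}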
An R&D Automation Platform Based on LLM + Agent + MCP (Context Protocol)
Architecture layers:
- User/Input Layer: the user submits an R&D goal. A Task Initiator receives the input, creates a new task session, and initializes the MCP Context.
- Task Orchestration (Simplified): there is no longer a complex central Planner generating a detailed DAG. Instead, the Task Initiator or a simple Session Manager launches an initial Agent (or a group of Agents) and points them at the newly created MCP Context. From then on, progress is driven mainly by autonomous collaboration among Agents and their responses to the shared context.
- Agent Layer:
  - Various R&D Agent instances.
  - Internal structure of each Agent:
    - LLM Core: the Agent's brain.
    - Memory Module: accesses long-term memory (Vector DB, KG).
    - Tool Use Module: invokes external tools.
    - MCP Interaction Module: calls the MCP Service API to perform the following operations:
      - GetContext(task_id): fetch the full or filtered shared context for the current task.
      - AppendMessage(task_id, message_type, content): append a new message (e.g., its own thought, action, observation) to the history.
      - UpdateSharedState(task_id, key, value): update the shared working state.
      - GetSharedState(task_id, key): read the shared state.
      - NotifyContextUpdate(task_id, key): (optional) notify the MCP Service that a key piece of state has been updated.
  - Agent decision loop (LLM-driven): perceive the context (via the MCP Service) -> LLM reasoning -> decide on an action (tool call or context update) -> execute the action -> observe the result -> update the context (via the MCP Service) -> repeat.
- Tools & Environment Layer: external R&D tools wrapped as services that Agents can call.
- Data & Knowledge Layer: stores data and long-term knowledge. Agents interact with this layer through their Memory Module; the MCP Service may also access it to enrich the context.
- MCP (Master Context Protocol) Service Layer: a dedicated service layer that provides the API used by the MCP Interaction Module above and manages the underlying storage.
  - Context Storage: a database (e.g., PostgreSQL with JSONB, or MongoDB) stores task history and shared state.
  - Knowledge Linking: may integrate with a Vector DB/KG to attach relevant knowledge snippets based on the context.
  - Event System (Optional): Kafka/RabbitMQ for context-update notifications (see the sketch after this list).
- Observability Layer: collects logs, metrics, and traces. Tracing matters especially in the MCP (Context Protocol) model: you understand the flow by following Agent interactions and context changes.
- Global State Monitoring & Human-in-the-Loop: an independent service or UI that watches the global task state and key flags stored in the MCP Service (e.g., "needs human approval"). When a relevant state is detected, it notifies a human. The human inspects the context through the UI and can modify the shared state or interact with Agents directly (by adding specific messages to the Context).
- Learning & Evolution Layer: analyzes the rich interaction histories and task outcomes stored in the MCP Service to learn Agent collaboration patterns, optimize prompts, improve tool use, and train iteration strategies.
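One way the optional Event System could be wired: the MCP Service publishes a lightweight event whenever the shared state changes, so Agents can react instead of polling. A minimal sketch using Redis Pub/Sub (the channel naming and payload shape are assumptions of this sketch; Kafka or RabbitMQ would be analogous):

# Hypothetical event hook for the MCP Service, assuming a Redis instance
# reachable at localhost:6379 and a per-task channel naming scheme.
import json
import redis

event_bus = redis.Redis(host="localhost", port=6379)

def notify_context_update(task_id: str, updated_keys) -> None:
    # Publish a small event; Agents subscribed to "task:{task_id}:updates"
    # can re-fetch the context on demand instead of polling on a timer.
    event = {"task_id": task_id, "updated_keys": list(updated_keys)}
    event_bus.publish(f"task:{task_id}:updates", json.dumps(event))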
How the LLM is used at each layer (refined):
- Task Initiator: uses the LLM to lightly parse the initial goal, decide which initial Agents to launch, and initialize the MCP Context.
- Agent (core logic):
  - Contextual Reasoning: the LLM receives the current context (history, state) fetched from the MCP Service, combines it with the Agent's role and long-term memory, and reasons over it. The prompt looks like: "You are a [Agent Role]. The current task goal is X. Here is the discussion and state so far: [history and state fetched from the MCP Service]. What is your next action? Record your thought process and action in the context."
  - Self-Planning: the LLM generates the Agent's own action plan within the current context.
  - Tool Parameter Generation: the LLM produces concrete tool-call parameters from the context and the plan.
  - Observation Interpretation: the LLM interprets tool results or other Agents' updates fetched from the MCP Service.
  - Context Formatting: the LLM generates the structured or unstructured content (thoughts, actions, observations, summaries, findings) to be appended to the shared context; one possible format is sketched after this list.
- Global State Monitoring / HITL: the LLM can help analyze the shared context, extract key summaries, or detect potential problems, and present them to a human.
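To make Context Formatting (and the action parsing it feeds) concrete, here is one possible response format the Agent prompt could request. The action types mirror those used in the agent pseudo-code below; the tool name comes from that pseudo-code, while the parameters are illustrative assumptions:

Thought: The shared state shows no literature has been collected yet, so I
should search for papers matching the initial goal's keywords.
Action:
{
  "type": "tool_call",
  "details": {
    "tool_name": "SearchPubMedTool",
    "tool_params": {"query": "topic X recent reviews", "max_results": 20}
  }
}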
Tech stack and implementation approach (based on the MCP Service):
- MCP (Master Context Protocol) Service implementation:
  - Backend Framework: FastAPI or Spring Boot for a RESTful API.
  - Context Storage: PostgreSQL with a JSONB column, or MongoDB, storing one Context Document per task, containing history (an array of messages) and shared_state (a JSON object).
  - API Endpoints:
    - POST /tasks: create a new task context
    - GET /tasks/{task_id}/context: fetch the context
    - POST /tasks/{task_id}/messages: append a message to the history
    - GET/POST/PUT /tasks/{task_id}/state: read/update the shared state
  - Knowledge Linking: internal logic that queries the Vector DB or KG using keywords or entities found in the Context and attaches the relevant knowledge snippets (or links to them); a sketch follows the service code below.
- Agent Layer implementation:
  - Agent Framework: AutoGen (well suited to multi-Agent conversation; its communication mechanism just needs to be adapted to use the MCP Service) or LangChain (for building individual Agents).
  - MCP Interaction Module: the Agent code wraps calls to the MCP Service API (a minimal client sketch follows this list).
  - Agent Reasoning Prompt: the core prompt guides the Agent to fetch and update the context via the MCP API.
  - Deployment: containerized on Kubernetes.
- Task Initiator & Global State Monitor:
  - A simple web service or CLI tool acts as the Initiator: it calls the MCP Service to create a new task and launches the initial Agents.
  - The Global State Monitor is a background service that periodically queries task state from the MCP Service (or listens for context-update events) and displays it in a UI.
- Workflow Engine (optional / reduced role): if stronger flow control is needed (sequencing, conditional branches), a Workflow Engine is still useful, but its steps are no longer "call an Agent to do all the work"; they become "wait until some state in the MCP Context reaches a given value" or "add a message to the Context to trigger a given Agent". Alternatively, a dedicated "process manager Agent" can use the MCP to coordinate other Agents through a flow.
- Other layers: similar to the earlier design (Kubernetes, Kafka/RabbitMQ, databases, observability stack).
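The agent and initiator pseudo-code below imports an MCPClient from mcp_client without ever defining it. A minimal sketch of what that client might look like, assuming the requests library and the endpoints of the FastAPI service below (the method signatures follow the calls made in the pseudo-code):

# mcp_client.py - minimal sketch of a client for the MCP Service API.
from typing import Any, Dict, List
import requests

class MCPClient:
    def __init__(self, mcp_service_url: str):
        self.base_url = mcp_service_url.rstrip("/")

    def create_task(self, initial_goal: str) -> Dict[str, Any]:
        # initial_goal is a query parameter because the service declares it as a bare str
        resp = requests.post(f"{self.base_url}/tasks", params={"initial_goal": initial_goal})
        resp.raise_for_status()
        return resp.json()

    def get_context(self, task_id: str) -> Dict[str, Any]:
        resp = requests.get(f"{self.base_url}/tasks/{task_id}/context")
        resp.raise_for_status()
        return resp.json()

    def add_message(self, task_id: str, sender_id: str, msg_type: str, content: Any,
                    sender_type: str = "agent") -> None:
        payload = {"sender_id": sender_id, "sender_type": sender_type,
                   "type": msg_type, "content": content}
        requests.post(f"{self.base_url}/tasks/{task_id}/messages", json=payload).raise_for_status()

    def update_shared_state(self, task_id: str, updates: Dict[str, Any]) -> None:
        requests.put(f"{self.base_url}/tasks/{task_id}/state", json=updates).raise_for_status()

    def get_shared_state(self, task_id: str) -> Dict[str, Any]:
        resp = requests.get(f"{self.base_url}/tasks/{task_id}/state")
        resp.raise_for_status()
        return resp.json()

    def list_tasks(self) -> List[Dict[str, Any]]:
        # Relies on the GET /tasks endpoint sketched in the service code below
        resp = requests.get(f"{self.base_url}/tasks")
        resp.raise_for_status()
        return resp.json()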
Conceptual pseudo-code for the implementation flow:
1. MCP (Master Context Protocol) Service (Simplified FastAPI Example)
# Pseudo-code for MCP Service using FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field  # Field is needed for default_factory below
from typing import List, Dict, Any
import uuid
import time
app = FastAPI()
# In-memory storage for simplicity, replace with DB in production
contexts: Dict[str, Dict[str, Any]] = {}
class Message(BaseModel):
    sender_id: str
    sender_type: str  # e.g., "agent", "human", "system"
    type: str  # e.g., "thought", "action", "observation", "tool_call", "tool_result", "human_input", "status_update"
    content: Any
    timestamp: float = Field(default_factory=time.time)

class TaskContext(BaseModel):
    task_id: str
    created_at: float = Field(default_factory=time.time)
    messages: List[Message] = []  # History of interactions
    shared_state: Dict[str, Any] = {}  # Key-value pairs for shared state
    # Add fields for linked knowledge, relevant documents, etc.

@app.post("/tasks", response_model=TaskContext)
def create_task(initial_goal: str):
    task_id = str(uuid.uuid4())
    new_context = TaskContext(task_id=task_id, shared_state={"status": "CREATED", "initial_goal": initial_goal})
    contexts[task_id] = new_context.model_dump()  # Store as dict for in-place mutation
    print(f"Task {task_id} created.")
    return new_context

@app.get("/tasks/{task_id}/context", response_model=TaskContext)
def get_context(task_id: str):
    if task_id not in contexts:
        raise HTTPException(status_code=404, detail="Task not found")
    return TaskContext(**contexts[task_id])  # Return as a Pydantic model

@app.post("/tasks/{task_id}/messages")
def add_message(task_id: str, message: Message):
    if task_id not in contexts:
        raise HTTPException(status_code=404, detail="Task not found")
    contexts[task_id]['messages'].append(message.model_dump())
    # Optional: trigger an event notification here
    print(f"Task {task_id}: Added message from {message.sender_id} ({message.type})")
    return {"status": "success"}

@app.put("/tasks/{task_id}/state")
def update_shared_state(task_id: str, state_update: Dict[str, Any]):
    if task_id not in contexts:
        raise HTTPException(status_code=404, detail="Task not found")
    contexts[task_id]['shared_state'].update(state_update)
    # Optional: trigger an event notification if specific keys were updated
    print(f"Task {task_id}: Updated shared state: {state_update}")
    return {"status": "success"}

@app.get("/tasks/{task_id}/state", response_model=Dict[str, Any])
def get_shared_state(task_id: str):
    if task_id not in contexts:
        raise HTTPException(status_code=404, detail="Task not found")
    return contexts[task_id]['shared_state']
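# The Global State Monitor below calls a list_tasks() client method. A minimal
# sketch of the corresponding endpoint (an addition assumed here, returning
# lightweight per-task summaries rather than full contexts):
@app.get("/tasks")
def list_tasks():
    return [
        {"task_id": tid,
         "status": ctx['shared_state'].get("status"),
         "initial_goal": ctx['shared_state'].get("initial_goal")}
        for tid, ctx in contexts.items()
    ]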
# Run with: uvicorn mcp_service:app --reload
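The Knowledge Linking behavior described above (querying a Vector DB or KG with keywords or entities from the Context and attaching snippets) could hook into add_message. A rough sketch; the vector_db client and its search(text, top_k) method are assumptions standing in for whatever store is actually used:

# Hypothetical knowledge-linking hook inside the MCP Service.
def link_relevant_knowledge(task_id: str, message_content: str, top_k: int = 3):
    snippets = vector_db.search(message_content, top_k=top_k)  # assumed client
    # Attach the snippets to the shared state (or store links instead)
    contexts[task_id]['shared_state'].setdefault('linked_knowledge', []).extend(
        {"source": s["source"], "text": s["text"]} for s in snippets
    )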
2. Agent Layer: Example Agent (Literature Review Agent) - Interacting with MCP
# Pseudo-code for a Literature Review Agent's internal logic (simplified)
import json
import time

from agent_core_base import AgentCore  # Base class handles LLM, Memory, Tools
from tools import SearchPubMedTool, SummarizeTextTool
from mcp_client import MCPClient  # Custom client for the MCP Service API (sketched above)

class LiteratureReviewAgent(AgentCore):  # Inherit from the base Agent class
    def __init__(self, id, llm, memory, tools, mcp_client: MCPClient, task_id: str):
        super().__init__(id, llm, memory, tools)
        self.mcp_client = mcp_client
        self.task_id = task_id  # The Agent is initialized for a specific task context
    def run(self):
        # The Agent's main execution loop
        print(f"Agent {self.id}: Starting execution for task {self.task_id}")
        try:
            while True:
                # 1. Get the current context from the MCP
                context = self.mcp_client.get_context(self.task_id)
                current_history = context['messages']
                shared_state = context['shared_state']
                # Check whether the task is already completed or needs human intervention
                if shared_state.get('status') in ('COMPLETED', 'NEEDS_HUMAN_REVIEW'):
                    print(f"Agent {self.id}: Task status is {shared_state.get('status')}. Exiting.")
                    break  # Exit the loop
                # 2. LLM reasoning based on the context
                # Craft a prompt that includes the history and state from the context
                prompt = self.generate_reasoning_prompt(current_history, shared_state)
                llm_response = self.llm.chat(prompt)  # Assuming a chat model
                thought = extract_thought_from_llm_response(llm_response)  # Needs parsing (sketched below)
                action = extract_action_from_llm_response(llm_response)  # Needs parsing (tool call or state update)
                # 3. Log the thought to the MCP
                self.mcp_client.add_message(self.task_id, self.id, "thought", thought)
                if action:
                    action_type = action['type']
                    action_details = action['details']
                    # 4. Log the action to the MCP
                    self.mcp_client.add_message(self.task_id, self.id, "action", action)
                    if action_type == "tool_call":
                        tool_name = action_details['tool_name']
                        tool_params = action_details['tool_params']
                        print(f"Agent {self.id}: Calling tool {tool_name} with {tool_params}")
                        try:
                            tool_result = self.use_tool(tool_name, tool_params)
                            print(f"Agent {self.id}: Tool {tool_name} returned {tool_result}")
                            # 5. Log the tool result to the MCP (this single message
                            # also serves as the Agent's observation)
                            self.mcp_client.add_message(self.task_id, self.id, "tool_result", tool_result)
                            # 6. Based on the result, maybe update the shared state
                            if self.check_if_literature_complete(tool_result):  # Agent's own logic
                                self.mcp_client.update_shared_state(self.task_id, {"literature_review_status": "COMPLETED", "literature_summary": tool_result['summary']})
                                print(f"Agent {self.id}: Literature review task completed.")
                                # The Agent might choose to exit here, or let the LLM decide the next step
                                # break  # Example: exit after completion
                        except Exception as tool_error:
                            error_msg = f"Tool {tool_name} failed: {tool_error}"
                            print(error_msg)
                            # 5. Log the error to the MCP
                            self.mcp_client.add_message(self.task_id, self.id, "observation", {"error": error_msg})
                            self.mcp_client.update_shared_state(self.task_id, {"status": "ERROR_IN_LIT_REVIEW", "error_details": error_msg})
                            # The Agent might decide to stop or seek help
                            break  # Example: exit on a major error
                    elif action_type == "update_state":
                        state_updates = action_details['updates']
                        self.mcp_client.update_shared_state(self.task_id, state_updates)
                        print(f"Agent {self.id}: Updated shared state with {state_updates}")
                    elif action_type == "delegate":  # Example: the Agent delegates to another agent type
                        target_agent_type = action_details['agent_type']
                        delegation_task = action_details['task_description']
                        # Log the delegation to the context so other agents can see it
                        self.mcp_client.add_message(self.task_id, self.id, "delegation", {"to_agent_type": target_agent_type, "description": delegation_task})
                        # The Session Manager or another Agent may pick this up and launch
                        # the new agent; this requires other components monitoring the context
                    elif action_type == "report_completion":  # The Agent signals that its part is done
                        final_summary = action_details['summary']
                        self.mcp_client.update_shared_state(self.task_id, {"literature_review_status": "FINALIZED", "final_summary": final_summary})
                        self.mcp_client.add_message(self.task_id, self.id, "status_update", {"status": "Literature review finalized"})
                        print(f"Agent {self.id}: Reported finalization.")
                        # break  # The Agent might exit after its sub-task is done
                    # ... other action types (e.g., ask_human_for_help)
                else:  # The LLM produced no valid action or signaled completion implicitly
                    # Needs logic to detect whether the LLM is stuck or implicitly finished
                    print(f"Agent {self.id}: LLM returned no valid action or finished.")
                    # Maybe update the state to signal potential stagnation or sub-task completion?
                    # self.mcp_client.update_shared_state(self.task_id, {"status": "LIT_REVIEW_AGENT_STALLED"})
                    break  # Example: exit the loop
                time.sleep(5)  # Avoid a tight loop; give other agents a chance to act
        except Exception as e:
            print(f"Agent {self.id} encountered a fatal error: {e}")
            # Log the fatal error and update the global state
            self.mcp_client.add_message(self.task_id, self.id, "system_error", {"error": str(e)})
            self.mcp_client.update_shared_state(self.task_id, {"status": "FATAL_ERROR", "error_source": self.id, "error_details": str(e)})
    def generate_reasoning_prompt(self, history, shared_state):
        # Craft the core prompt for the LLM, injecting the relevant history and state
        history_text = "\n".join([f"{msg['sender_type']} ({msg['sender_id']}) [{msg['type']}]: {msg['content']}" for msg in history])
        state_text = json.dumps(shared_state, indent=2)
        prompt = f"""
You are a {self.agent_type} with ID {self.id}. Your current task context (ID: {self.task_id}) is managed by the Master Context Protocol Service.
Review the task history and shared state below. Determine your next step to contribute to the overall R&D goal defined in the shared state ('initial_goal').
Your actions should be one of the following types: 'tool_call', 'update_state', 'delegate', 'report_completion', 'ask_human_for_help'.
Output your thought process first, then your chosen action in JSON format. If no action is needed now, explain why.
Current Shared State:
{state_text}
Task History:
{history_text}
Available Tools: {list(self.tools.keys())}
Think step-by-step. What is the current situation? What needs to be done next based on the goal and state? Which action is most appropriate?
"""
        return prompt
    def check_if_literature_complete(self, tool_result):
        # Agent-specific logic to determine whether its sub-task is done,
        # e.g., check whether a summary was generated or enough papers were reviewed.
        # A minimal stand-in heuristic (an assumption, not the real criterion):
        return isinstance(tool_result, dict) and bool(tool_result.get("summary"))

    # The use_tool method would be similar to previous examples, calling external services.
    # The AgentCore base class would handle the LLM interaction and basic structure.
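The run() loop relies on extract_thought_from_llm_response and extract_action_from_llm_response, which are referenced but never defined. A minimal sketch, assuming the LLM follows the "thought first, then a JSON action" format the reasoning prompt asks for:

import json
import re

def extract_thought_from_llm_response(llm_response: str) -> str:
    # Everything before the first JSON object is treated as the thought
    brace = llm_response.find("{")
    return llm_response[:brace].strip() if brace != -1 else llm_response.strip()

def extract_action_from_llm_response(llm_response: str):
    # Grab the first {...} block and parse it; return None if there is no valid action
    match = re.search(r"\{.*\}", llm_response, re.DOTALL)
    if not match:
        return None
    try:
        action = json.loads(match.group(0))
        return action if "type" in action else None
    except json.JSONDecodeError:
        return None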
3. Task Initiator & Global State Monitor (Simplified)
# Pseudo-code for the Task Initiator (e.g., a simple script or service)
import time
import uuid

from mcp_client import MCPClient

mcp_client = MCPClient(mcp_service_url="http://mcp_service:8000")  # Point at the MCP Service

def start_new_rnd_task(user_goal: str):
    print(f"Initiating R&D task with goal: {user_goal}")
    # 1. Create a new context for the task
    new_context = mcp_client.create_task(user_goal)
    task_id = new_context['task_id']
    print(f"New task created with ID: {task_id}")
    # 2. Start the initial Agent(s) and hand them the task_id.
    # This requires knowing which agent(s) should start based on the initial goal;
    # the logic could be hardcoded, rule-based, or even LLM-decided by the Initiator.
    initial_agent_types = ["Literature Review Agent", "Hypothesis Generation Agent"]  # Example
    for agent_type in initial_agent_types:
        # Assume some way to launch an agent instance (e.g., calling the Kubernetes API
        # to create a Pod, or calling an Agent Management Service)
        launch_agent_instance(agent_type, task_id)  # Starts the Agent process
        print(f"Launched initial agent: {agent_type} for task {task_id}")
    print("Initial agents launched. Monitoring task state via MCP.")
# Pseudo-code for the Global State Monitor (e.g., a background process or UI component)
def monitor_rnd_tasks():
    mcp_client = MCPClient(mcp_service_url="http://mcp_service:8000")
    while True:
        # 1. Get an overview of tasks (e.g., list all tasks or recent ones)
        tasks_overview = mcp_client.list_tasks()  # Uses the GET /tasks endpoint sketched above
        for task_summary in tasks_overview:
            task_id = task_summary['task_id']
            current_state = mcp_client.get_shared_state(task_id)
            print(f"Task {task_id}: Status - {current_state.get('status', 'N/A')}, Goal - {current_state.get('initial_goal', 'N/A')}")
            # Check for specific conditions that need human attention
            if current_state.get('status') == 'NEEDS_HUMAN_REVIEW':
                print(f"ALERT: Task {task_id} needs human review!")
                # Trigger a notification (email, Slack) with a link to the UI showing the context
            if current_state.get('status') == 'FATAL_ERROR':
                print(f"ALERT: Task {task_id} encountered a FATAL ERROR!")
                # Trigger a notification
        time.sleep(60)  # Check every minute
# launch_agent_instance(agent_type, task_id) pseudo-code:
# This function belongs to the Agent Management layer, not to the MCP itself.
# It would typically interact with Kubernetes to deploy a Pod, passing task_id
# and the MCP Service endpoint as environment variables or arguments so the
# Agent knows which context to join.
def launch_agent_instance(agent_type, task_id):
    # Example: call the Kubernetes API to create a Pod
    # (assuming the official `kubernetes` Python client and in-cluster config)
    from kubernetes import client, config
    config.load_incluster_config()
    k8s_client = client.CoreV1Api()
    pod_manifest = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"generateName": f"{agent_type.lower().replace(' ', '-')}-"},
        "spec": {
            "containers": [{
                "name": "agent",
                "image": f"your_agent_image:{agent_type.lower().replace(' ', '-')}",  # Different images or config per agent type
                "env": [
                    {"name": "AGENT_ID", "value": str(uuid.uuid4())},
                    {"name": "AGENT_TYPE", "value": agent_type},
                    {"name": "TASK_ID", "value": task_id},
                    {"name": "MCP_SERVICE_URL", "value": "http://mcp_service:8000"},
                    # ... other env vars for the LLM endpoint, DBs, etc.
                ]
            }],
            "restartPolicy": "Never"  # Or OnFailure, depending on the desired behavior
        }
    }
    k8s_client.create_namespaced_pod(namespace="default", body=pod_manifest)
    print(f"Requested Kubernetes to launch a {agent_type} pod for task {task_id}")
Integration with Dify / AutoGen:
- Dify: Dify's Agent capabilities and Workflows can be used within this framework to build individual Agent types with sophisticated tool use or RAG. Treat a Dify Agent App or Workflow as a black-box service callable over an API, internally combining an LLM and tools. Your custom Agent code (like the LiteratureReviewAgent in the pseudo-code above) then calls the Dify Agent's API to complete a sub-task rather than invoking low-level tools directly. Dify's RAG and prompt-management capabilities remain very useful. A sketch of such a call follows this list.
- AutoGen: AutoGen is itself a multi-Agent framework with its own message-passing collaboration mechanism. One option is to skip AutoGen's built-in group chat or workflows and use its Agent classes only as the basis for individual Agent instances (each handling its own LLM, tool use, etc.), modifying the AutoGen Agent's communication layer so it sends and receives messages through your MCP Service API (i.e., the send method writes to the MCP, and the receive method reads the relevant messages from it). Or, more simply, treat an entire AutoGen GroupChat as one complex Agent type: it receives a task via the MCP Service, runs AutoGen's own machinery internally, and finally writes the end result and a summary of the key steps back into the MCP Context.
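As a concrete sketch of the Dify option: a custom Agent can call a Dify App as a black-box sub-task executor over HTTP. The endpoint path and payload below follow Dify's chat-messages API, but the base URL, the API key, and the decision to expose it as a plain function are assumptions of this sketch:

import requests

DIFY_BASE_URL = "https://api.dify.ai/v1"  # Or a self-hosted Dify instance
DIFY_API_KEY = "app-..."  # The Dify App's API key (placeholder)

def call_dify_agent(query: str, user_id: str) -> str:
    # Invoke a Dify Agent App and return its answer text
    resp = requests.post(
        f"{DIFY_BASE_URL}/chat-messages",
        headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
        json={"inputs": {}, "query": query, "response_mode": "blocking", "user": user_id},
    )
    resp.raise_for_status()
    return resp.json()["answer"]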
Advantages (compared with a central controller):
- More decentralized / autonomous: Agents act more on the shared context and their own logic, which is in principle more flexible.
- Potentially greater resilience: one Agent's failure does not necessarily stall the whole system (provided other Agents can notice it and take over or seek help).
- Flexible collaboration patterns: not limited to a preset DAG; more dynamic collaboration can emerge through Agents' conversation and state updates in the Context.
- Easier to debug and understand (if the MCP is implemented well): the whole task execution and all Agent interactions are recorded in the shared Context, making them easy to trace.
Challenges (compared with a central controller):
- Coordination complexity: ensuring all Agents share an understanding of the task goal and current state, while avoiding conflicts, duplicated work, or missed steps, requires more careful Agent design and prompt engineering.
- Convergence: without a strong central coordinator driving the flow, how do you ensure the Agent team eventually converges on completing the task rather than deadlocking or looping unproductively? Agents need strong built-in self-correction and the ability to seek help (from a human or a dedicated manager Agent).
- Context management: the shared context can become very large and noisy. How does an Agent (LLM) extract the relevant information from a large history and state for decision-making without hitting context-window limits? This requires advanced context-management techniques (e.g., RAG over the Context, summarization; a sketch follows this list).
- Difficulty of global optimization: globally optimal resource allocation and scheduling are hard, because decisions are distributed across the individual Agents.
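One possible mitigation for the context-management challenge is to compress older history before building the reasoning prompt. A minimal summarization sketch, assuming the same llm.chat(prompt) interface used in the Agent pseudo-code (the cutoff of 20 messages is an arbitrary assumption):

def build_context_window(llm, messages, max_recent: int = 20):
    # Keep the most recent messages verbatim and compress the rest into a
    # single LLM-generated summary, bounding the prompt size.
    if len(messages) <= max_recent:
        return messages
    older, recent = messages[:-max_recent], messages[-max_recent:]
    older_text = "\n".join(f"[{m['type']}] {m['sender_id']}: {m['content']}" for m in older)
    summary = llm.chat("Summarize the following task history in a few sentences, "
                       "keeping decisions, results, and open issues:\n" + older_text)
    summary_msg = {"sender_id": "summarizer", "sender_type": "system",
                   "type": "summary", "content": summary}
    return [summary_msg] + recent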
Summary:
The LLM + Agent + MCP (Master Context Protocol) model offers a viable and more autonomous approach to building an R&D automation platform. Its core idea is to use the shared context as the central medium for Agent collaboration: the MCP Service manages this shared context, while Agents perceive the environment, execute tasks, and affect the environment by interacting with the MCP Service.
The keys to implementation are designing a robust MCP Service API; writing Agent prompts and internal logic that can effectively use the shared context for reasoning and collaboration; and building a Global State Monitor and Human-in-the-Loop mechanism that watches the shared state and intervenes when appropriate.
This model is especially suited to R&D tasks that call for a more flexible, more human-team-like collaboration style, but it places higher demands on Agent autonomy and robustness, and it requires solving hard problems such as shared-context management and task convergence.