文章目录
说明
- 本文学自赋范社区公开课,仅供学习和交流使用,不用作任何商业用途!
一 Agents SDK接入MCP
1.1 MCP技术回顾
1.2 MCP基础实践流程
- 先尝试手动实现一遍
MCP
实践流程,再考虑将已经部署好的server
带入Agents SDK
中,作为tools
进行调用。 - 一个极简的天气查询MCP基本执行流程:
1.2.1 天气查询服务器Server创建流程
- 创建一个天气查询的服务器,通过openweather,创建一个能够实时查询天气的服务器(server)。
curl -s "https://api.openweathermap.org/data/2.5/weather?q=Beijing&units=metric&appid=xxx"
- 执行结果:
{ "coord": { "lon": 116.3972, "lat": 39.9075 }, "weather": [ { "id": 804, "main": "Clouds", "description": "阴,多云", "icon": "04n" } ], "base": "stations", "main": { "temp": 22.36, "feels_like": 22.77, "temp_min": 22.36, "temp_max": 22.36, "pressure": 1007, "humidity": 81, "sea_level": 1007, "grnd_level": 1002 }, "visibility": 10000, "wind": { "speed": 1.42, "deg": 26, "gust": 3.23 }, "clouds": { "all": 100 }, "dt": 1753014180, "sys": { "country": "CN", "sunrise": 1752958921, "sunset": 1753011546 }, "timezone": 28800, "id": 1816670, "name": "Beijing", "cod": 200 }
1.2.2 服务器依赖安装和代码编写
- 创建项目目录,创建并激活虚拟环境。
uv init mcp-weather cd mcp-weather uv venv .venv\Scripts\activate
- 在当前虚拟环境中添加如下依赖:
pip install mcp httpx openai python-dotenv pypinyin openai-agents
- 创建server服务器代码文件
server.py
:import json import httpx from typing import Any from mcp.server.fastmcp import FastMCP # 初始化mcp服务器 mcp=FastMCP("WeatherServer") #OpenWeather API 配置 OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather" API_KEY ="xxx" USER_AGENT = "weather-app/1.0" async def fetch_weather(city: str) -> dict[str, Any]|None: """ 获取天气信息 """ params={ "q": city, "appid": API_KEY, "units": "metric", "lang": "zh_cn" } headers={ "User-Agent": USER_AGENT } async with httpx.AsyncClient() as client: response = await client.get(OPENWEATHER_API_BASE, params=params, headers=headers,timeout=1000) if response.status_code == 200: return response.json() else: return None def format_weather(data: dict[str,Any] | str)->str: """ 解析天气数据字典,提取关键信息并格式化输出。 功能:对可能缺失的嵌套数据字段进行容错处理,确保返回内容完整。 参数: data: 天气API返回的原始数据字典 返回: 格式化后的天气信息字符串 """ # 基础位置信息(城市、国家)- 缺失时显示"未知" city = data.get("name", "未知") # 城市名称(顶层字段) country = data.get("sys", {}).get("country", "未知") # 国家代码(嵌套在sys字段中) # 天气核心指标 - 缺失时显示"N/A"(Not Available) main_data = data.get("main", {}) # 提取main字段(包含温度、湿度等) temperature = main_data.get("temp", "N/A") # 温度 humidity = main_data.get("humidity", "N/A") # 湿度 wind_data = data.get("wind", {}) # 提取wind字段(包含风速等) wind_speed = wind_data.get("speed", "N/A") # 风速 # 天气描述 - weather字段可能为空列表,默认返回第一个元素的描述 weather_list = data.get("weather", [{}]) # 提取weather数组(默认空字典避免索引错误) weather_description = weather_list[0].get("description", "未知") # 天气状况描述 # 格式化输出字符串(使用f-string拼接,添加emoji直观展示) weather_info = ( f"🌍 {city}, {country}\n" f"🌡️ 温度:{temperature}℃\n" f"💧 湿度:{humidity}%\n" f"💨 风速:{wind_speed} m/s\n" f"☁️ 天气:{weather_description}\n" ) return weather_info @mcp.tool() async def query_weather(city: str) -> str: """ 查询天气信息并返回结果 """ weather_data = await fetch_weather(city) if weather_data: return format_weather(weather_data) else: return "无法获取天气信息。请检查城市名称是否正确。" if __name__=="__main__": mcp.run(transport='stdio')
1.2.3 环境配置文件
- 创建.env文件
BASE_URL="https://api.siliconflow.cn/v1/chat/completions"
MODEL=deepseek-ai/DeepSeek-V3
OPENAI_API_KEY="sk-xxx"
1.2.4 客户端代码编写
- 创建一个可以和
server
进行通信的客户端,需要注意的是,该客户端需要包含大模型调用的基础信息。我们需要编写一个client.py
脚本。import asyncio import os import json import sys from typing import Optional from contextlib import AsyncExitStack from openai.types.chat import ChatCompletionToolParam from openai import OpenAI from dotenv import load_dotenv from mcp import ClientSession,StdioServerParameters from mcp.client.stdio import stdio_client from pypinyin import lazy_pinyin, Style # 加载env文件,确保配置正确 load_dotenv() class MCPClient: def __init__(self): """初始化MCP客户端""" self.write = None self.stdio = None self.exit_stack = AsyncExitStack() self.base_url=os.getenv("BASE_URL") self.model = os.getenv("MODEL") self.openai_api_key = os.getenv("OPENAI_API_KEY") if not self.openai_api_key: raise ValueError("OPENAI_API_KEY未设置") self.client=OpenAI(api_key=self.openai_api_key, base_url=self.base_url) self.session: Optional[ClientSession] = None self.exit_stack=AsyncExitStack() async def connect_to_server(self, server_script_path:str): """连接MCP服务器并列出可用工具""" is_python=server_script_path.endswith(".py") is_js=server_script_path.endswith(".js") if not (is_python or is_js): raise ValueError("服务器脚本必须以.py或.js结尾") command="python" if is_python else "node" server_params=StdioServerParameters( command=command, args=[server_script_path], env=None) # 启动MCP服务器并建立通信 stdio_transport=await self.exit_stack.enter_async_context(stdio_client(server_params)) self.stdio,self.write=stdio_transport self.session= await self.exit_stack.enter_async_context(ClientSession(self.stdio,self.write)) await self.session.initialize() # 列出MCP服务器上的工具 response=await self.session.list_tools() tools=response.tools print("\n已连接到服务器,支持以下工具:",[tool.name for tool in tools]) async def process_query(self, query:str)-> str: """ 使用大模型处理查询并调用可用的MCP工具(Function Calling) """ messages=[{"role": "user","content": query}] response=await self.session.list_tools() available_tools = [ ChatCompletionToolParam( type="function", function={ "name": tool.name, "description": tool.description, "parameters": tool.inputSchema } ) for tool in response.tools ] response= self.client.chat.completions.create( model=self.model, messages=messages, tools=available_tools ) # 处理返回的内容 content=response.choices[0] # 检查是否使用了工具 if content.finish_reason == "tool_calls": # 何时需要使用工具就解析工具 tool_call = content.message.tool_calls[0] tool_name=tool_call.function.name tool_args=json.loads(tool_call.function.arguments) # 如果调用的是 query_weather 工具,处理城市名称 if tool_name == "query_weather" and "city" in tool_args: city = tool_args["city"] # 简单判断是否为中文城市名 if any('\u4e00' <= c <= '\u9fff' for c in city): # 转换为拼音,首字母大写 pinyin_city = ''.join([word.capitalize() for word in lazy_pinyin(city)]) tool_args["city"] = pinyin_city # 执行工具 result=await self.session.call_tool(tool_name, tool_args) print(f"\n\n[Calling Tool: {tool_name} with args: {tool_args}]") # 将工具调用和结果添加到消息历史中 messages.append(content.message.model_dump()) messages.append({ "role": "tool", "content": result.content[0].text, "tool_call_id": tool_call.id }) # 将上面的结果再返回给大模型用于最终的效果 response=self.client.chat.completions.create(model=self.model,messages=messages) return response.choices[0].message.content # 如果调用工具直接返回结果 return content.message.content async def chat_loop(self): """运行交互式聊天循环""" print("\nMCP客户端已启动!输入'quit'退出") while True: try: query=input("\n you:").strip() if query.lower() == "quit": break response=await self.process_query(query) print(f"\n ai: {response}") except Exception as e: print(f"\n Error: {e}") async def cleanup(self): """清理资源""" await self.exit_stack.aclose() async def main(): if len(sys.argv)<2: print("Usage: python client.py <server_address>") sys.exit(1) client=MCPClient() try: await client.connect_to_server(sys.argv[1]) await client.chat_loop() finally: await client.cleanup() if __name__ == "__main__": asyncio.run(main())
1.3 测试运行
进入项目目录,激活虚拟环境
cd ./mcp-weather source .venv/bin/activate
运行MCP客户端和服务器
uv run client.py server.py
(mcp-weather) D:\Code\mcp-study\mcp-weather>uv run client.py server.py 已连接到服务器,支持以下工具: ['query_weather'] MCP客户端已启动!输入'quit'退出 you:请问北京今天天气如何? [Calling Tool: query_weather with args: {'city': 'BeiJing'}] ai: 北京今天的天气情况如下: 🌍 **北京,中国** 🌡️ **温度**:22.85℃ 💧 **湿度**:74% 💨 **风速**:2.14 m/s ☁️ **天气状况**:阴天,多云 请根据实际需要增减衣物,出行注意安全!
二 MCP+Agents SDK基础调用
Agents SDK
可以将某个对应的Agent
封装为client
与外部定义好的server
进行通信。客户端为client_agent.py
、服务端weather_server.py
2.1 weather_server.py
import json
import httpx
from typing import Any
from mcp.server.fastmcp import FastMCP
# 初始化mcp服务器
mcp=FastMCP("WeatherServer")
#OpenWeather API 配置
OPENWEATHER_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
API_KEY ="xxx"
USER_AGENT = "weather-app/1.0"
async def fetch_weather(city: str) -> dict[str, Any]|None:
"""
获取天气信息
"""
params={
"q": city,
"appid": API_KEY,
"units": "metric",
"lang": "zh_cn"
}
headers={
"User-Agent": USER_AGENT
}
async with httpx.AsyncClient() as client:
response = await client.get(OPENWEATHER_API_BASE, params=params, headers=headers,timeout=1000)
if response.status_code == 200:
return response.json()
else:
print(f"Error fetching weather data: {response.status_code}, {response.text}") # 增加日志输
return None
def format_weather(data: dict[str,Any] | str)->str:
"""
解析天气数据字典,提取关键信息并格式化输出。
功能:对可能缺失的嵌套数据字段进行容错处理,确保返回内容完整。
参数:
data: 天气API返回的原始数据字典
返回:
格式化后的天气信息字符串
"""
# 基础位置信息(城市、国家)- 缺失时显示"未知"
city = data.get("name", "未知") # 城市名称(顶层字段)
country = data.get("sys", {}).get("country", "未知") # 国家代码(嵌套在sys字段中)
# 天气核心指标 - 缺失时显示"N/A"(Not Available)
main_data = data.get("main", {}) # 提取main字段(包含温度、湿度等)
temperature = main_data.get("temp", "N/A") # 温度
humidity = main_data.get("humidity", "N/A") # 湿度
wind_data = data.get("wind", {}) # 提取wind字段(包含风速等)
wind_speed = wind_data.get("speed", "N/A") # 风速
# 天气描述 - weather字段可能为空列表,默认返回第一个元素的描述
weather_list = data.get("weather", [{}]) # 提取weather数组(默认空字典避免索引错误)
weather_description = weather_list[0].get("description", "未知") # 天气状况描述
# 格式化输出字符串(使用f-string拼接,添加emoji直观展示)
weather_info = (
f"🌍 {city}, {country}\n"
f"🌡️ 温度:{temperature}℃\n"
f"💧 湿度:{humidity}%\n"
f"💨 风速:{wind_speed} m/s\n"
f"☁️ 天气:{weather_description}\n"
)
return weather_info
@mcp.tool()
async def query_weather(city: str) -> str:
"""
查询天气信息并返回结果
"""
weather_data = await fetch_weather(city)
if weather_data:
return format_weather(weather_data)
else:
return "无法获取天气信息。请检查城市名称是否正确。"
if __name__=="__main__":
mcp.run(transport='stdio')
2.2 client_agent.py
import asyncio
import time
from openai import AsyncOpenAI
from agents import OpenAIChatCompletionsModel,Agent, Runner, gen_trace_id, trace, set_default_openai_client
from agents.mcp import MCPServer, MCPServerStdio
from agents.model_settings import ModelSettings
from agents import set_tracing_disabled # or from your framework's module
OPENAI_API_KEY="hk-xxx"
OPENAI_API_BASE="https://api.openai-hk.com/v1"
MODEL="deepseek-v3"
# 创建一个Agent对象并调用DeepSeek模型
external_client = AsyncOpenAI(
base_url = OPENAI_API_BASE,
api_key= OPENAI_API_KEY,
)
set_default_openai_client(external_client)
set_tracing_disabled(True)
deepseek_model=OpenAIChatCompletionsModel(
model=MODEL,
openai_client=external_client)
async def run(mcp_server: MCPServer):
agent = Agent(
name="Assistant",
instructions="你是一名助人为乐的助手",
mcp_servers=[mcp_server],
model=deepseek_model
)
message = "请帮我查询Beijing天气如何?"
print(f"Running: {message}")
result = await Runner.run(starting_agent=agent, input=message)
print(result.final_output)
async def mcp_run():
async with MCPServerStdio(
name="Weather Server",
cache_tools_list=True,
params = {"command": "uv","args": ["run", "weather_server.py"]}
) as server:
await run(server)
if __name__ == "__main__":
asyncio.run(mcp_run())
2.3 运行测试
(mcp-weather) D:\Code\mcp-study\mcp-weather>uv run client_agent.py
Running: 请帮我查询Beijing天气如何?
北京当前的天气情况如下:
- **温度**:34.66℃
- **湿度**:54%
- **风速**:2.78 m/s
- **天气状况**:晴
天气晴朗,但温度较高,请注意防晒和补水!