Python实现MCP Server的完整Demo

发布于:2025-07-08 ⋅ 阅读:(31) ⋅ 点赞:(0)

mcp server

from fastmcp import FastMCP
import logging

# 配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP("DemoServer")

@mcp.tool()
async def calculate(a: float, b: float, op: str) -> float:
    """数学计算器(支持加减乘除)"""
    ops = {
        '+': a + b,
        '-': a - b,
        '*': a * b,
        '/': a / b if b != 0 else float('nan')
    }
    result = ops.get(op, float('nan'))
    logger.info(f"计算: {a}{op}{b}={result}")
    return result

@mcp.resource("greet://{name}")
async def greeting(name: str) -> str:
    """个性化问候服务"""
    return f"您好, {name}! 当前时间:2025-07-07"

if __name__ == "__main__":
    # 启动服务(推荐HTTP模式便于远程调用)
    mcp.run(transport="streamable-http", port=8000)

mcp client

import asyncio
from mcp.client import HttpClient

async def main():
    async with HttpClient("http://localhost:8000") as client:
        # 调用计算工具
        calc_result = await client.call_tool(
            "calculate", 
            {"a": 5, "b": 3, "op": "*"}
        )
        print(f"计算结果: {calc_result}")

        # 访问问候资源
        greet_result = await client.get_resource(
            "greet://开发者"
        )
        print(greet_result)

asyncio.run(main())

关键配置说明:

  1. 需先安装fastmcp库:uv pip install fastmcp httpx15
  2. 支持三种传输模式:stdio(默认)、sse和streamable-http34
  3. 工具方法需用@mcp.tool()装饰器注册,资源用@mcp.resource()9
  4. 生产环境建议添加类型校验和错误处理

网站公告

今日签到

点亮在社区的每一天
去签到