随着大语言模型(LLMs)的飞速发展,模型与应用之间的通信效率和灵活性变得至关重要。Model Context Protocol (MCP) 作为专为模型交互设计的协议,一直在不断进化以满足日益增长的需求。近期,MCP引入了一个令人振奋的新特性——StreamableHTTP 通信协议。这一特性旨在提供一种更高效、更通用、更易于集成的方式来实现模型服务器与客户端之间的流式数据交换。
1. MCP 新特性概览:StreamableHTTP 成为焦点
传统的 MCP 通信方式(如 stdio
)在某些场景下表现出色,尤其是在本地进程间通信。然而,随着模型服务的分布式部署、云原生架构的普及以及对 Web 友好性的追求,stdio
的局限性逐渐显现。
StreamableHTTP 的引入,为 MCP 带来了以下关键优势:
- Web 友好性:基于 HTTP/1.1 Chunked Transfer Encoding 或 HTTP/2 Streams,天然兼容现有的 Web 基础设施,如反向代理、负载均衡器、防火墙等。
- 标准化与通用性:HTTP 是应用最广泛的协议之一,开发者对此非常熟悉,降低了学习和集成成本。
- 双向流式处理:支持高效的双向流,这对于需要持续交换上下文或进行多轮对话的大模型应用至关重要。
- 持久连接:通过单个 HTTP 连接处理多个 MCP 请求和响应流,减少了连接建立的开销。
- 元数据处理:HTTP Headers 可以方便地携带元数据,简化了认证、路由等机制的实现。
可以预见,StreamableHTTP 将成为 MCP 在分布式环境中部署和应用的首选传输方式。
2. 为什么选择 StreamableHTTP?与 stdio、SSE 的对比
在深入 StreamableHTTP 实现之前,我们有必要理解为什么需要一种新的通信方式,以及它与现有方案(如 stdio
和 Server-Sent Events (SSE))相比有何优势。
MCP over stdio (
mcp://stdio
)- 优点:简单直接,适用于本地父子进程通信,延迟低。例如,一个应用直接启动并管理一个本地模型进程。
- 缺点:
- 非网络化:不适用于分布式系统或远程模型调用。
- 扩展性差:难以实现负载均衡和水平扩展。
- 单连接限制:通常一个
stdio
通道对应一个模型实例的完整生命周期。
Server-Sent Events (SSE)
- 优点:
- 基于 HTTP,Web 友好。
- 实现简单,用于服务器向客户端单向推送事件流。
- 缺点:
- 单向性:SSE 主要设计为服务器到客户端的单向流。虽然客户端可以通过独立的 HTTP 请求发送数据给服务器,但这并非原生的双向流,也无法在同一连接上高效处理客户端流式输入。
- 不完全适合 MCP 场景:MCP 通常需要双向流式交互,客户端可能流式发送输入(例如,长文档分块处理),服务器同时流式返回结果。SSE 在此方面能力有限。
- 元数据处理相对局限:虽然可以通过事件数据本身携带,但不如 HTTP Headers 灵活。
- 优点:
StreamableHTTP for MCP (
mcp://http
或mcp://https
)- 核心机制:利用 HTTP/1.1 的
Transfer-Encoding: chunked
或 HTTP/2 的 Streams 特性,在单个持久 HTTP 连接上实现双向、任意长度的数据流传输。 - 优点:
- 双向流:完美支持客户端到服务器、服务器到客户端的并发流数据。
- Web 基础设施兼容:轻松集成到现有网络架构中,如 API 网关、Service Mesh 等。
- 标准化:开发者熟悉 HTTP,有大量现成的库和工具支持。
- 元数据与控制:HTTP Headers 提供了丰富的元数据传递机制,方便实现认证、内容协商、错误处理等。
- 高性能,多路复用 (HTTP/2):HTTP/2 进一步支持在单个连接上并发处理多个请求和响应流,效率更高。
- 持久连接:减少了为每个 MCP 会话建立新 TCP 连接的开销。
- 核心机制:利用 HTTP/1.1 的
对比总结:
特性 | MCP over stdio | Server-Sent Events (SSE) | StreamableHTTP for MCP |
---|---|---|---|
方向性 | 双向(本地) | 单向(服务器 -> 客户端) | 双向 |
网络化 | 否 | 是 | 是 |
Web 兼容 | 否 | 是 | 是 |
基础设施 | 不适用 | 标准 Web 服务器 | 标准 Web 服务器/代理 |
持久连接 | 进程生命周期 | 是 (但常用于单向推送) | 是 (为双向流优化) |
元数据 | 有限 (依赖协议本身) | 有限 (事件内数据) | 丰富 (HTTP Headers) |
适用场景 | 本地模型集成 | 实时通知、简单数据流 | 分布式模型服务、复杂交互 |
显然,StreamableHTTP 通过结合 HTTP 的强大功能和流式处理的效率,为 MCP 提供了一个更现代化、更通用的解决方案,特别适合构建可扩展、高性能的大模型应用。
3. 实战:构建基于 StreamableHTTP 的 MCP 应用
接下来,我们将通过具体的 Python 代码示例,演示如何搭建一个简单的 MCP 服务器和客户端,它们之间通过 StreamableHTTP 进行通信。
3.1 环境准备
首先,确保您有一个 Python 环境 (推荐 Python 3.8+)。我们将使用虚拟环境来管理依赖:
python -m venv mcp_env
# Windows
# mcp_env\Scripts\activate
# macOS/Linux
source mcp_env/bin/activate
3.2 安装依赖
pip install "mcp"
pip install aiohttp uvicorn
3.3 MCP Server 端实现 (echo_server.py
)
我们将创建一个简单的 MCP 服务器,它承载一个 EchoModel
。
import asyncio
import logging
from mcp import (
ProcessContext,
McpModel,
McpOptions,
McpRequest,
McpServer,
# McpTransport, # McpTransport 基类在此示例中未直接使用,可省略
content,
error,
)
from mcp.transport.streamable_http import StreamableHttpTransport # 关键导入
# 配置日志,方便观察
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # 更详细的日志格式
)
logger = logging.getLogger(__name__) # 获取当前模块的 logger
class EchoModel(McpModel):
"""一个简单的回显模型,流式处理输入并流式返回"""
async def handle_request(self, request: McpRequest, process_context: ProcessContext):
request_id = request.request_id # 获取请求 ID 用于日志追踪
logger.info(f"EchoModel [{request_id}] received request.")
if request.content_type != content.CONTENT_TYPE_TEXT:
logger.warning(f"EchoModel [{request_id}] received unsupported content type: {request.content_type}")
# 抛出 MCPError 会被服务器框架捕获并发送给客户端
raise error.McpError(
error_type=error.ERROR_TYPE_BAD_REQUEST,
message=f"Unsupported content type: {request.content_type}. Expected '{content.CONTENT_TYPE_TEXT}'.",
)
try:
# 开始响应,指定内容类型
await process_context.begin_response(content_type=content.CONTENT_TYPE_TEXT)
logger.info(f"EchoModel [{request_id}] started response stream.")
# 流式读取请求内容并逐块回显
chunk_count = 0
async for chunk in request.read_chunks():
chunk_count += 1
if isinstance(chunk, bytes):
text_chunk = chunk.decode('utf-8')
logger.info(f"EchoModel [{request_id}] echoing chunk #{chunk_count}: '{text_chunk}'")
await process_context.write_chunk(text_chunk.encode('utf-8'))
# 在实际应用中,这里可能是模型处理的延迟
await asyncio.sleep(0.1) # 模拟处理延迟,更清晰地观察流式效果
else:
# 理论上 read_chunks() 应该总是返回 bytes,这是一个防御性日志
logger.warning(f"EchoModel [{request_id}] received non-bytes chunk (type: {type(chunk)}): {chunk}")
if chunk_count == 0:
logger.info(f"EchoModel [{request_id}] received no chunks in request body.")
# 即使没有输入块,也可能需要发送一个空响应或特定响应
# await process_context.write_chunk(b"Received empty stream.\n")
# 结束响应流
await process_context.end_response()
logger.info(f"EchoModel [{request_id}] finished response stream successfully.")
except error.McpError as e: # 捕获可预期的 MCP 错误
logger.error(f"EchoModel [{request_id}] McpError during handling: {e.message}")
# 重新抛出,让服务器框架处理
raise
except Exception as e:
logger.error(f"EchoModel [{request_id}] Unhandled exception during request processing: {e}", exc_info=True)
# 对于未处理的异常,构造并发送一个标准的 MCP 内部错误
# 确保在发送错误前响应流没有开始,或者以错误方式结束
if not process_context.response_begun:
await process_context.begin_response(
content_type=content.CONTENT_TYPE_MCP_ERROR,
is_error=True,
)
await process_context.write_chunk(
error.McpError(
error_type=error.ERROR_TYPE_INTERNAL, message=f"Internal server error: {str(e)}"
).to_json().encode('utf-8')
)
elif not process_context.response_ended: # 如果流已开始但未结束
# 尝试写入错误信息,但这可能因流的状态而出错
try:
await process_context.write_chunk(
error.McpError(
error_type=error.ERROR_TYPE_INTERNAL, message=f"Error mid-stream: {str(e)}"
).to_json().encode('utf-8')
)
except Exception as write_err:
logger.error(f"EchoModel [{request_id}] Could not write error to stream: {write_err}")
if not process_context.response_ended:
await process_context.end_response() # 总是尝试结束响应
async def main():
options = McpOptions() # 使用默认选项
models = {"echo/v1": EchoModel()} # 注册模型及其键
# 配置 StreamableHttpTransport
# 默认监听: host="localhost", port=8080, path="/mcp"
# 若要监听所有接口,可使用 host="0.0.0.0"
transport = StreamableHttpTransport(host="localhost", port=8080, path="/mcp")
logger.info(f"Initializing StreamableHTTP MCP server on http://{transport.host}:{transport.port}{transport.path}")
server = McpServer(options=options, models=models, transports=[transport])
try:
logger.info("Starting MCP server...")
await server.serve() # 运行服务器直到被中断
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received, shutting down server...")
except Exception as e:
logger.error(f"Server failed to run: {e}", exc_info=True)
finally:
logger.info("Attempting to shut down the server gracefully...")
await server.shutdown()
logger.info("Server has been shut down.")
if __name__ == "__main__":
asyncio.run(main())
代码解读 (Server):
- 导入模块:导入了 MCP SDK 的核心组件以及
StreamableHttpTransport
。 EchoModel(McpModel)
:- 继承自
McpModel
,这是实现自定义模型的标准方式。 handle_request
是核心方法,当服务器收到针对此模型的请求时被调用。- 它首先检查
content_type
,然后使用process_context.begin_response()
开始一个流式响应。 - 通过
async for chunk in request.read_chunks():
异步迭代读取客户端发送的流式数据块。 process_context.write_chunk()
将数据块写回给客户端。process_context.end_response()
标记响应流结束。- 包含了基本的错误处理逻辑。
- 继承自
main()
函数:- 创建
McpOptions
和模型字典models
。我们将EchoModel
实例注册到路径echo/v1
。 - 关键:
transport = StreamableHttpTransport()
创建了一个 StreamableHTTP 传输实例。默认情况下,它监听localhost:8080
的/mcp
路径。 McpServer
用选项、模型和传输方式列表进行初始化。server.serve()
启动服务器,server.shutdown()
用于优雅关闭。
- 创建
3.4 MCP Client 端实现 (echo_client.py
)
现在我们来编写一个客户端,它将连接到上述服务器,发送流式文本,并接收和打印服务器的流式回显。
import asyncio
import logging
from mcp import McpConnection, McpRequestOptions, content, error
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' # 更详细的日志格式
)
logger = logging.getLogger(__name__) # 获取当前模块的 logger
async def stream_data_generator(data_list, delay=0.2): # 稍微调整延迟以观察
"""一个异步生成器,模拟流式发送数据块"""
logger.info("Client data stream generator started.")
for i, item in enumerate(data_list):
chunk_to_send = item.encode('utf-8')
logger.info(f"Client sending chunk #{i+1}: '{item}'")
yield chunk_to_send
await asyncio.sleep(delay) # 模拟数据块之间的处理或网络延迟
logger.info("Client data stream generator finished.")
async def main():
# 服务器的 MCP 端点 URL。确保它与服务器配置完全匹配。
# `mcp+http` 表示使用 StreamableHTTP (非加密)
# `localhost:8080` 是服务器监听的地址和端口
# `/mcp` 是服务器 StreamableHttpTransport 配置的路径
server_mcp_url = "mcp+http://localhost:8080/mcp"
logger.info(f"Attempting to connect to MCP server at: {server_mcp_url}")
try:
# 建立到 MCP 服务器的连接。
# 对于 StreamableHTTP,这通常意味着客户端会发起一个 HTTP 请求 (可能升级到 WebSocket 或使用长轮询/chunked encoding)
async with McpConnection.create(target_url=server_mcp_url) as conn:
logger.info(f"Successfully connected to MCP server: {server_mcp_url}")
request_options = McpRequestOptions(
model_key="echo/v1", # 必须与服务器端注册的模型键匹配
content_type=content.CONTENT_TYPE_TEXT, # 指定请求体的内容类型
# accept_content_types=[content.CONTENT_TYPE_TEXT] # 可以指定期望的响应类型
)
# 准备要流式发送的数据
data_to_stream = ["Hello, ", "MCP Server! ", "This is a ", "streaming test."]
# 创建一个流式请求并获取响应处理器
# request_body_generator 参数接收一个异步生成器
logger.info(f"Client initiating streaming request for model '{request_options.model_key}'...")
response_handler = await conn.streaming_request(
request_options=request_options,
request_body_generator=stream_data_generator(data_to_stream)
)
logger.info(f"Client request sent. Request ID: {response_handler.request_id}. Waiting for response...")
# 检查响应元数据
logger.info(f"Response Content-Type: {response_handler.response_content_type}")
logger.info(f"Response Is Error: {response_handler.is_error}")
if response_handler.is_error:
logger.error("Server indicated an error in response.")
# 如果是错误,内容通常是 McpError 的 JSON 序列化形式
error_content_bytes = b""
async for chunk in response_handler.read_response_chunks():
error_content_bytes += chunk
try:
mcp_err = error.McpError.from_json(error_content_bytes.decode('utf-8'))
logger.error(f"MCP Error from server: Type='{mcp_err.error_type}', Message='{mcp_err.message}'")
except Exception as e:
logger.error(f"Could not parse MCP error from response: {error_content_bytes.decode('utf-8')}, Parse Error: {e}")
return # 错误发生,提前退出
# 流式接收和打印响应
full_response_text = ""
logger.info("Client receiving stream from server:")
chunk_count = 0
async for chunk in response_handler.read_response_chunks():
chunk_count +=1
if isinstance(chunk, bytes):
text_chunk = chunk.decode('utf-8')
print(text_chunk, end='', flush=True) # 实时打印,不带换行
full_response_text += text_chunk
else:
# 理论上 read_response_chunks() 应该总是返回 bytes
logger.warning(f"Client received non-bytes chunk (type: {type(chunk)}): {chunk}")
print() # 在所有块接收完毕后打印一个换行符
if chunk_count == 0:
logger.info("Client received an empty response stream.")
logger.info(f"Client received full response: '{full_response_text}'")
except ConnectionRefusedError:
logger.error(f"Connection refused. Ensure the MCP server is running at {server_mcp_url} and accessible.")
except error.McpError as e: # 捕获客户端操作中可能发生的 MCP 特定错误
logger.error(f"An MCPError occurred on the client-side: {e.message} (Type: {e.error_type})")
except Exception as e:
logger.error(f"An unexpected error occurred in the client: {e}", exc_info=True)
if __name__ == "__main__":
asyncio.run(main())
代码解读 (Client):
server_mcp_url
:定义了 MCP 服务器的 StreamableHTTP 端点。注意前缀是mcp+http
(或mcp+https
如果启用了 TLS)。路径/mcp
必须与服务器端StreamableHttpTransport
配置的path
一致。McpConnection.create(target_url=server_mcp_url)
:异步上下文管理器,用于建立和管理到服务器的连接。McpRequestOptions
:指定要调用的模型 (echo/v1
) 和内容类型。stream_data_generator
:这是一个异步生成器,用于模拟客户端流式发送请求体。在实际应用中,这可能来自文件、用户输入或其他流式来源。conn.streaming_request(...)
:发起一个流式请求。它需要request_options
和一个可选的request_body_generator
。此方法返回一个response_handler
对象。response_handler.read_response_chunks()
:异步迭代读取服务器返回的响应数据块。- 客户端代码会逐块打印服务器回显的内容。
3.5 运行与结果
启动服务器:
打开一个终端,激活虚拟环境,然后运行服务器脚本:python echo_server.py
INFO:__main__:Starting StreamableHTTP MCP server on localhost:8080/mcp INFO:aiohttp.access:127.0.0.1 [DD/Mon/YYYY:HH:MM:SS +0000] "GET /mcp HTTP/1.1" 101 - "-" "-" # (这是 WebSocket 升级请求,或者 HTTP streaming 初始请求)
运行客户端:
打开另一个终端,激活虚拟环境,然后运行客户端脚本:python echo_client.py
客户端输出:
INFO:__main__:Connected to MCP server at mcp+http://localhost:8080/mcp
INFO:__main__:Client sent request for model 'echo/v1'
INFO:__main__:Client sending chunk: 'Hello, '
INFO:__main__:Response Content-Type: text/plain
INFO:__main__:Client receiving stream:
Hello, INFO:__main__:Client sending chunk: 'MCP over HTTP! '
MCP over HTTP! INFO:__main__:Client sending chunk: 'This is '
This is INFO:__main__:Client sending chunk: 'a stream.'
a stream.
INFO:__main__:
Client received full response: 'Hello, MCP over HTTP! This is a stream.'
服务器端额外输出 (对应一次客户端连接和请求):
INFO:__main__:EchoModel received request: <some_request_id>
INFO:__main__:EchoModel [<some_request_id>] started response stream.
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'Hello, '
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'MCP over HTTP! '
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'This is '
INFO:__main__:EchoModel [<some_request_id>] echoing chunk: 'a stream.'
INFO:__main__:EchoModel [<some_request_id>] finished response stream.
客户端逐块发送数据,服务器也逐块接收并回显,客户端再逐块接收并打印。这清晰地展示了双向流式通信的过程。
4. 总结
StreamableHTTP 的引入不仅仅是一个传输选项的增加,它对 MCP 生态和 LLM 应用架构可能带来深远影响:
简化云原生部署:
- HTTP 是云原生环境的通用语言。MCP 服务可以更容易地容器化,并通过 Kubernetes 等编排工具进行管理。
- 可以无缝集成到 Service Mesh (如 Istio, Linkerd) 中,利用其流量管理、可观察性和安全特性。
- API 网关可以轻松地将 MCP 服务暴露给外部消费者,并处理认证、速率限制等。
提升互操作性:
- 任何支持标准 HTTP 客户端库的语言或平台都可以相对容易地与 MCP StreamableHTTP 服务器交互,即使没有原生的 MCP SDK。开发者只需理解 MCP 的消息分帧(通常是长度前缀或其他分隔符,具体取决于 StreamableHTTP 在 MCP 中的实现细节)和核心协议语义。
- 这为构建异构系统和集成现有 Web 服务提供了便利。
性能考量:
- HTTP/1.1 的 Chunked Encoding 已经相当成熟。
- HTTP/2 的流多路复用可以显著减少延迟,尤其是在高并发场景下,因为它允许在单个 TCP 连接上并行处理多个请求/响应流,避免了队头阻塞问题。MCP SDK 和服务器实现需要支持 HTTP/2 才能充分发挥此优势。
- 持久连接减少了 TCP 和 TLS握手的开销。
安全性:
- 可以直接利用 HTTPS (HTTP over TLS) 来确保传输层安全,这是 Web 安全的标准做法。
- HTTP Headers 可以方便地承载认证令牌 (如 JWT Bearer tokens),与常见的 Web 认证方案兼容。
未来展望与挑战:
- HTTP/3 (QUIC):随着 HTTP/3 的普及,未来 MCP StreamableHTTP 可能会支持基于 QUIC 的传输,进一步改善连接建立速度和弱网环境下的表现。
- 标准化细节:确保 MCP 消息在 HTTP 流中的分帧方式、错误处理、元数据约定等细节得到清晰的标准化和文档化至关重要。这包括如何区分不同的 MCP 消息(如果一个 HTTP 流承载多个独立的 MCP 消息)以及如何处理特定于 MCP 的错误与 HTTP 级别的错误。
- 开发者工具:需要更好的开发者工具来调试和监控 StreamableHTTP 上的 MCP 通信,例如能够解析和显示 MCP 消息流的代理工具。
与 gRPC 的比较:
gRPC 是另一个流行的基于 HTTP/2 的高性能 RPC 框架,也支持双向流。- 相似之处:都利用 HTTP/2 实现高效双向流。
- 不同之处:
- Schema 定义:gRPC 依赖 Protocol Buffers 进行接口定义和序列化,提供了强类型和代码生成。MCP 本身更侧重于上下文和交互模式,内容类型相对灵活 (如 JSON, text, bytes)。
- 生态和侧重点:gRPC 更通用,用于各种微服务通信。MCP 专为模型交互设计,其核心概念(如上下文管理、模型能力协商等)是其特有价值。
- StreamableHTTP for MCP 提供了在不引入 Protobuf 依赖的情况下,利用 HTTP/2 (或 HTTP/1.1) 流能力的途径。对于希望保持简单或已有 JSON/text 기반 API 的场景可能更有吸引力。