[mcp: McpSchema]-源码分析

发布于:2025-08-01 ⋅ 阅读:(15) ⋅ 点赞:(0)

JSONRPCMessage

JSONRPCMessage 是一个密封接口,表示 JSON-RPC 协议中的请求、通知和响应消息,提供统一的协议版本标识方法。

public sealed interface JSONRPCMessage permits JSONRPCRequest, JSONRPCNotification, JSONRPCResponse {
	String jsonrpc();
}

JSONRPCRequest

JSONRPCRequest 是 JSON-RPC 2.0 协议中的一种消息类型,用于客户端向服务器发送请求,包含方法、参数和请求ID。

@JsonInclude(JsonInclude.Include.NON_ABSENT)
@JsonIgnoreProperties(ignoreUnknown = true)
public record JSONRPCRequest( // @formatter:off
	@JsonProperty("jsonrpc") String jsonrpc,
	@JsonProperty("method") String method,
	@JsonProperty("id") Object id,
	@JsonProperty("params") Object params) implements JSONRPCMessage {
} // @formatter:on

Request

Request 是一个密封接口,表示不同类型的请求,如初始化、工具调用、消息创建等,提供一个 meta() 方法来获取元数据,并支持通过 progressToken() 方法提取进度令牌。

public sealed interface Request
	permits InitializeRequest, CallToolRequest, CreateMessageRequest, ElicitRequest, CompleteRequest,
	GetPromptRequest, ReadResourceRequest, SubscribeRequest, UnsubscribeRequest, PaginatedRequest {

	Map<String, Object> meta();

	default String progressToken() {
		if (meta() != null && meta().containsKey("progressToken")) {
			return meta().get("progressToken").toString();
		}
		return null;
	}
}

JSONRPCNotification

JSONRPCNotification 是 JSON-RPC 2.0 协议中的一种消息类型,用于服务器向客户端发送不需要响应的通知。

@JsonInclude(JsonInclude.Include.NON_ABSENT)
@JsonIgnoreProperties(ignoreUnknown = true)
public record JSONRPCNotification( // @formatter:off
	@JsonProperty("jsonrpc") String jsonrpc,
	@JsonProperty("method") String method,
	@JsonProperty("params") Object params) implements JSONRPCMessage {
} // @formatter:on

Notification

Notification 是一个密封接口,表示不同类型的通知,如进度通知、日志消息通知、资源更新通知等,提供一个 meta() 方法来获取通知的元数据。

public sealed interface Notification
	permits ProgressNotification, LoggingMessageNotification, ResourcesUpdatedNotification {

	Map<String, Object> meta();

}

JSONRPCResponse

JSONRPCResponse 是 JSON-RPC 2.0 协议中的一种消息类型,用于表示服务器对客户端请求的响应,包含结果或错误信息。

@JsonInclude(JsonInclude.Include.NON_ABSENT)
@JsonIgnoreProperties(ignoreUnknown = true)
public record JSONRPCResponse( // @formatter:off
	@JsonProperty("jsonrpc") String jsonrpc,
	@JsonProperty("id") Object id,
	@JsonProperty("result") Object result,
	@JsonProperty("error") JSONRPCError error) implements JSONRPCMessage {

	@JsonInclude(JsonInclude.Include.NON_ABSENT)
	@JsonIgnoreProperties(ignoreUnknown = true)
	public record JSONRPCError(
		@JsonProperty("code") int code,
		@JsonProperty("message") String message,
		@JsonProperty("data") Object data) {
	}
}// @formatter:on

Result

Result 是一个密封接口,用于表示不同操作的结果,并提供一个 meta() 方法来获取相关的元数据。

public sealed interface Result 
	permits InitializeResult, ListResourcesResult, ListResourceTemplatesResult, 
	ReadResourceResult, ListPromptsResult, GetPromptResult, ListToolsResult, CallToolResult,
	CreateMessageResult, ElicitResult, CompleteResult, ListRootsResult {

	Map<String, Object> meta();

}

McpClientSession

/**
 * Sends a JSON-RPC request and returns the response.
 * @param <T> The expected response type
 * @param method The method name to call
 * @param requestParams The request parameters
 * @param typeRef Type reference for response deserialization
 * @return A Mono containing the response
 */
@Override
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
	String requestId = this.generateRequestId();

	return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
		this.pendingResponses.put(requestId, sink);
		McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
				requestId, requestParams);
		this.transport.sendMessage(jsonrpcRequest)
			.contextWrite(ctx)
			// TODO: It's most efficient to create a dedicated Subscriber here
			.subscribe(v -> {
			}, error -> {
				this.pendingResponses.remove(requestId);
				sink.error(error);
			});
	})).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
		if (jsonRpcResponse.error() != null) {
			logger.error("Error handling request: {}", jsonRpcResponse.error());
			sink.error(new McpError(jsonRpcResponse.error()));
		}
		else {
			if (typeRef.getType().equals(Void.class)) {
				sink.complete();
			}
			else {
				sink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
			}
		}
	});
}

/**
 * Sends a JSON-RPC notification.
 * @param method The method name for the notification
 * @param params The notification parameters
 * @return A Mono that completes when the notification is sent
 */
@Override
public Mono<Void> sendNotification(String method, Object params) {
	McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
			method, params);
	return this.transport.sendMessage(jsonrpcNotification);
}


网站公告

今日签到

点亮在社区的每一天
去签到