前言
上一篇演示了接入公网的高德地图sse服务,有人说我贴的代码不全,确实有些自定义工具类我不可能全部复制过来,复杂的功能大家一般也都会拆分开避免单文件过大,要查看完整代码还是去看完整项目的好,
这篇文章接入本地/内网的mcp服务实现联网搜索
1,先看最终效果
2,新建一个mcp服务项目或模块
引入pom依赖
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-mcp-server-webmvc</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<!-- Spring AI MCP 核心包 (手动实现SSE) -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-mcp</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<!-- Spring AI Model (基础接口) -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-model</artifactId>
<version>${spring-ai.version}</version>
</dependency>
application.yml添加必要的mcp服务信息,
服务端口假设9099
server:
port: 9099
spring:
ai:
mcp:
server:
enabled: true
type: SYNC
name: "LocalMcpServer"
version: "1.0.0"
stdio: false
sse-message-endpoint: "/mcp/message"
sse-endpoint: "/sse"
# MCP 服务能力配置
capabilities:
tool: true
resource: true
prompt: true
completion: false
roots: false
sampling: false
3,创建sse接收端点
private final ConcurrentMap<String, SseEmitter> clients = new ConcurrentHashMap<>();
/**
* SSE端点 - 匹配MCP客户端期望的路径
*/
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter sse(@RequestParam(value = "clientId", defaultValue = "default") String clientId) {
logger.info("🔗 新SSE连接: clientId={}", clientId);
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 无超时
clients.put(clientId, emitter);
// 发送初始事件
try {
emitter.send(SseEmitter.event()
.name("connect")
.data("SSE连接已建立 - clientId: " + clientId));
emitter.send(SseEmitter.event()
.name("mcp-ready")
.data("{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\",\"params\":{}}"));
} catch (IOException e) {
logger.error("发送初始SSE事件失败: {}", e.getMessage());
clients.remove(clientId);
emitter.complete();
}
// 设置完成和错误处理
emitter.onCompletion(() -> {
logger.info("🔌 SSE连接关闭: clientId={}", clientId);
clients.remove(clientId);
});
emitter.onError((ex) -> {
logger.error("❌ SSE连接错误: clientId={}, error={}", clientId, ex.getMessage());
clients.remove(clientId);
});
emitter.onTimeout(() -> {
logger.warn("⏰ SSE连接超时: clientId={}", clientId);
clients.remove(clientId);
});
return emitter;
}
4,获取联网搜索的key
https://www.searchapi.io/
登录注册即可拿到一个key,实际使用时直接
curl --get https://www.searchapi.io/api/v1/search \
-d engine="baidu" \
-d q="ERNIE Bot"
api_key可以放请求头,实测也可以直接当普通参数与q一起拼接到url后面,
更多的可选参数可以参考:https://www.searchapi.io/docs/baidu#api-parameters-search-query
将拿到的key配置到yml中随时读取
5,添加联网搜索工具类
/**
* 联网搜索工具配置类,提供基于SearchAPI的搜索功能。
*/
@Configuration
public class SearchToolsConfiguration {
private static final Logger logger = LoggerFactory.getLogger(SearchToolsConfiguration.class);
private static final int DEFAULT_NUM_RESULTS = 50;
private static final int MAX_NUM_RESULTS = 100;
@Value("${search.url}")
private String searchApiUrl;
private final RestTemplate restTemplate = new RestTemplate();
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 搜索工具集合:包含所有联网搜索相关的工具。
*/
@Bean
public List<McpServerFeatures.SyncToolSpecification> searchTools() {
return List.of(createSearchWebTool());
}
/**
* 创建联网搜索工具。
*/
private McpServerFeatures.SyncToolSpecification createSearchWebTool() {
logger.info("创建联网搜索工具,API URL: {}", searchApiUrl);
String schemaJson = """
{
"type": "object",
"properties": {
"q": {
"type": "string",
"description": "搜索查询内容,必填参数"
},
"num": {
"type": "integer",
"description": "返回结果数量,可选参数,默认50,最大100",
"minimum": 1,
"maximum": 100,
"default": 50
}
},
"required": ["q"]
}
""";
McpSchema.Tool tool = new McpSchema.Tool(
"search_web",
"执行联网搜索并返回分页的结构化搜索结果",
schemaJson
);
return new McpServerFeatures.SyncToolSpecification(
tool,
(exchange, arguments) -> {
try {
String query = (String) arguments.get("q");
if (query == null || query.trim().isEmpty()) {
return new McpSchema.CallToolResult("错误:搜索查询内容不能为空", true);
}
Integer num = DEFAULT_NUM_RESULTS;
if (arguments.containsKey("num")) {
Object numValue = arguments.get("num");
if (numValue instanceof Number) {
num = ((Number) numValue).intValue();
} else if (numValue instanceof String) {
try {
num = Integer.parseInt((String) numValue);
} catch (NumberFormatException e) {
logger.warn("无效的num参数值: {}, 使用默认值: {}", numValue, DEFAULT_NUM_RESULTS);
}
}
// 限制结果数量在合理范围内
num = Math.max(1, Math.min(num, MAX_NUM_RESULTS));
}
logger.info("执行联网搜索: query='{}', num={}", query, num);
// 构建完整的搜索URL,使用UriComponents处理特殊字符
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(searchApiUrl);
String url = builder.toUriString();
// 手动构建查询参数,确保正确编码
StringBuilder queryString = new StringBuilder();
queryString.append("q=").append(UriUtils.encode(query, "UTF-8"));
queryString.append("&num=").append(num);
// 如果URL已有查询参数,追加到现有参数
String fullUrl = url;
if (url.contains("?")) {
fullUrl = url + "&" + queryString.toString();
} else {
fullUrl = url + "?" + queryString.toString();
}
URI searchUri = URI.create(fullUrl);
logger.debug("搜索API请求URL: {}", searchUri);
// 调用搜索API
String response = restTemplate.getForObject(searchUri, String.class);
if (response == null || response.trim().isEmpty()) {
return new McpSchema.CallToolResult("错误:搜索API返回空结果", true);
}
// 解析并格式化结果
try {
Map<String, Object> searchResult = objectMapper.readValue(response, Map.class);
// 检查搜索结果状态
if (searchResult.containsKey("search_metadata")) {
Map<String, Object> metadata = (Map<String, Object>) searchResult.get("search_metadata");
String status = (String) metadata.get("status");
if (!"Success".equals(status)) {
logger.warn("搜索API返回非成功状态: {}", status);
return new McpSchema.CallToolResult("搜索失败,API返回状态: " + status, true);
}
}
// 提取有用的搜索结果
List<Map<String, Object>> organicResults = (List<Map<String, Object>>) searchResult.get("organic_results");
if (organicResults == null || organicResults.isEmpty()) {
return new McpSchema.CallToolResult("无内容", false);
}
// 直接返回organic_results的原始内容
String organicResultsJson = objectMapper.writeValueAsString(organicResults);
return new McpSchema.CallToolResult(organicResultsJson, false);
} catch (Exception e) {
logger.error("解析搜索结果失败", e);
// 如果无法解析JSON,直接返回原始响应
return new McpSchema.CallToolResult(response, false);
}
} catch (Exception e) {
logger.error("联网搜索失败", e);
return new McpSchema.CallToolResult("搜索错误: " + e.getMessage(), true);
}
}
);
}
}
6,实际调用
配置与前一章节的类似,在前端的mcp管理页面新增mcp,表单json填入:
{
"mcpServers": {
"LocalMcpServer": {
"url": "http://127.0.0.1:9099",
"type": "sse",
"sseEndpoint": "/sse"
}
}
}
后端对所有管理的模型做了维护,实际调用是从工厂取出再决定要不要使用mcp工具
/**
* 输出处理后端流式结果
* 优化流式输出处理逻辑:
* 1. 在开始阶段或空字符串输出时,状态保持为"think",表示模型正在思考
* 2. 只有当检测到有意义的内容(中文、数字、字母等)时,才将状态改为"running"
* 3. 在流结束时明确添加一个"stop"状态的消息,确保前端能正确处理结束状态
*
* @param messageList 模型消息,包括系统提示词、用户提示词、历史对话和媒体文件
* @param myModel 指定模型对象
* @param body 用户请求参数
* @return 处理后的FluxVO流
*/
private Flux<FluxVO> getFluxVOFlux(List<Message> messageList, AiModel myModel, QuestionVO body) {
Prompt prompt = new Prompt(messageList);
AtomicBoolean inThinking = new AtomicBoolean(false);
StringBuffer outputText = body.getMemory() ? new StringBuffer() : null;
ChatClient chatModel = myModel.getChatClient();
// 1. 先构造 Publisher<ChatResponse>
Flux<ChatResponse> publisher;
if (body.getUseTools()) {
List<ToolCallback> toolCallbacks = dynamicMcpClientManager.getAvailableToolCallbacks();
publisher = chatModel.prompt(prompt).toolCallbacks(toolCallbacks).stream().chatResponse();
} else {
publisher = chatModel.prompt(prompt).stream().chatResponse();
}
// 主动推送一条“处理中”消息
Flux<FluxVO> proactiveMsg = Flux.just(
FluxVO.builder().text("").status("before").build()
);
Flux<FluxVO> resp = Flux.from(publisher)
.doFirst(() -> {
System.out.println("-------------开始输出");
if (body.getMemory()) {
chatMemoryService.saveMessage(body);
}
})
.map(response -> {
String text = response.getResult().getOutput().getText();
if (text == null) {
text = "";
}
// 处理工具使用信息
if (!response.getResult().getOutput().getToolCalls().isEmpty()) {
for (AssistantMessage.ToolCall toolCall : response.getResult().getOutput().getToolCalls()) {
System.out.println("==================调用mcp工具====================");
System.out.println(toolCall.name());
}
}
if ("<think>".equals(text)) {
inThinking.set(true);
} else if ("</think>".equals(text)) {
inThinking.set(false);
}
boolean isStop = response.getResult().getMetadata().getFinishReason() != null && !response.getResult().getMetadata().getFinishReason().isEmpty();
String status = inThinking.get() ? "think" : (isStop ? "stop" : "running");
if (outputText != null) {
outputText.append(text);
}
return FluxVO.builder()
.text(text)
.status(status)
.build();
})
.doFinally(signalType -> {
System.out.println("-------------流式处理结束");
if (body.getMemory() && outputText != null) {
chatMemoryService.saveMessage(body.getSessionId(), "ASSISTANT", outputText.toString(), body.getModel());
}
})
.onErrorResume(error -> {
System.err.println("流式处理异常: " + error.getMessage());
return Flux.just(FluxVO.builder()
.text("AI服务异常,请稍后重试")
.status("stop")
.build());
});
// 先推 proactiveMsg,再推 publisher
return Flux.concat(proactiveMsg, resp);
}
7,其他
后端的动态mcp管理类做了重大优化,支持服务运行时动态添加mcp服务、定期健康检查、定时重连等,还是推荐去看我的完整目录
https://gitee.com/luckylanyu/springai-novel