【MCP Node.js SDK 全栈进阶指南】中级篇(5):MCP客户端高级开发

发布于:2025-05-01 ⋅ 阅读:(13) ⋅ 点赞:(0)

在前面的系列文章中,我们主要关注了服务器端的开发,包括基础服务器构建、资源设计、身份验证与授权以及错误处理与日志系统。本篇文章将转向客户端,探讨MCP TypeScript-SDK在客户端开发中的高级应用。

客户端开发是MCP应用的重要组成部分,它连接了用户与服务器,提供了交互界面和功能入口。一个设计良好的客户端不仅能提供流畅的用户体验,还能确保与服务器的高效通信和数据处理。在本文中,我们将深入探讨MCP客户端API的高级用法、自定义客户端封装、跨平台客户端设计以及客户端性能优化等关键主题。

一、客户端API高级用法

MCP TypeScript-SDK提供了强大而灵活的客户端API,使开发者能够与服务器进行高效交互。让我们从基础开始,逐步深入探索其高级用法。

1.1 客户端初始化与配置

在使用MCP客户端之前,需要正确初始化并配置相关参数。以下是一个完整的客户端初始化示例:

import {
    McpClient, HttpTransport } from '@modelcontextprotocol/sdk';

// 基本客户端初始化
const client = new McpClient({
   
  // 客户端唯一标识符
  clientId: 'my-mcp-client-v1',
  
  // 传输层配置
  transport: new HttpTransport({
   
    baseUrl: 'https://api.example.com/mcp',
    
    // 自定义请求头
    headers: {
   
      'X-Client-Version': '1.0.0',
    },
    
    // 身份验证令牌(可选)
    authToken: process.env.MCP_AUTH_TOKEN,
    
    // 请求超时(毫秒)
    timeout: 30000,
    
    // 重试配置
    retry: {
   
      maxRetries: 3,
      initialDelay: 300,
      maxDelay: 5000,
      factor: 2,
      retryOnStatusCodes: [408, 429, 500, 502, 503, 504],
    },
  }),
  
  // 高级配置项
  options: {
   
    // 是否启用流式响应
    enableStreaming: true,
    
    // 扩展/插件配置
    plugins: [
      // 例如:日志插件
      createLoggerPlugin({
   
        level: 'info',
        logRequests: true,
        logResponses: true,
      }),
      
      // 例如:遥测插件
      createTelemetryPlugin({
   
        endpoint: 'https://telemetry.example.com',
        sampleRate: 0.1, // 采样率10%
      }),
    ],
    
    // 默认请求超时(毫秒)
    defaultTimeout: 60000,
    
    // 保持活动连接设置
    keepAlive: true,
    keepAliveMsecs: 30000,
    
    // 自定义序列化/反序列化
    serializer: {
   
      serialize: (data) => JSON.stringify(data),
      deserialize: (data) => JSON.parse(data),
    },
  },
});

// 客户端初始化完成后的操作
await client.initialize();

// 当不再需要客户端时,应正确关闭连接
process.on('SIGTERM', async () => {
   
  await client.shutdown();
});

这个示例涵盖了MCP客户端初始化的多个方面,包括传输层配置、身份验证、重试策略、插件系统和序列化选项等。根据项目需求,你可以调整这些参数以获得最佳性能和用户体验。

1.2 资源操作的高级技巧

MCP客户端提供了多种方式与服务器端资源进行交互。以下是一些高级用法示例:

// 1. 批量获取资源
const userResults = await client.resources.getMany('users', {
   
  ids: ['user-1', 'user-2', 'user-3'],
  include: ['profile', 'preferences'], // 包含相关资源
});

// 处理批量结果
for (const user of userResults.successful) {
   
  console.log(`成功获取用户: ${
     user.id}`);
}

for (const failure of userResults.failed) {
   
  console.error(`获取用户失败: ${
     failure.id}, 错误: ${
     failure.error.message}`);
}

// 2. 使用查询参数过滤资源
const activeUsers = await client.resources.query('users', {
   
  filter: {
   
    status: 'active',
    lastLogin: {
    $gt: '2023-01-01' },
    role: {
    $in: ['admin', 'editor'] },
  },
  sort: {
    lastLogin: 'desc' },
  limit: 50,
  page: 1,
  fields: ['id', 'name', 'email', 'role'], // 字段投影
});

// 3. 递归获取资源树
const projectWithAllAssets = await client.resources.get('projects', 'project-1', {
   
  depth: 3, // 递归深度
  include: ['tasks', 'assets', 'members'],
  fields: {
    // 每种资源类型的字段投影
    projects: ['id', 'name', 'description', 'createdAt'],
    tasks: ['id', 'title', 'status', 'assignee'],
    assets: ['id', 'name', 'type', 'url'],
    members: ['id', 'name', 'role'],
  },
});

// 4. 条件资源更新
const updateResult = await client.resources.update('documents', 'doc-1', {
   
  title: '更新的文档标题',
  content: '更新的内容...',
}, {
   
  // 仅当版本匹配时更新(乐观并发控制)
  ifMatch: {
    version: 5 },
  
  // 或者使用条件表达式
  ifCondition: {
   
    lastModifiedBy: currentUser.id, // 仅当最后修改者是当前用户
    status: {
    $ne: 'locked' }, // 且状态不是锁定的
  },
});

// 5. 部分更新(补丁操作)
const patchResult = await client.resources.patch('users', 'user-1', [
  {
    op: 'replace', path: '/email', value: 'new.email@example.com' },
  {
    op: 'add', path: '/tags', value: 'premium' },
  {
    op: 'remove', path: '/temporaryFlags/newUser' },
]);

// 6. 批量操作
const batchResults = await client.batch([
  {
    
    type: 'get', 
    resource: 'users', 
    id: 'user-1',
    params: {
    fields: ['id', 'name', 'email'] }
  },
  {
    
    type: 'update', 
    resource: 'documents', 
    id: 'doc-1',
    data: {
    status: 'published' }
  },
  {
    
    type: 'create', 
    resource: 'comments',
    data: {
    
      documentId: 'doc-1',
      content: '这是一条新评论',
      author: 'user-1'
    }
  },
]);

// 处理批量操作结果
batchResults.forEach((result, index) => {
   
  if (result.status === 'fulfilled') {
   
    console.log(`操作 ${
     index} 成功:`, result.value);
  } else {
   
    console.error(`操作 ${
     index} 失败:`, result.reason);
  }
});

// 7. 使用自定义操作
const documentStats = await client.resources.execute('documents', 'doc-1', 'calculateStatistics', {
   
  includeWordCount: true,
  includeReadingTime: true,
});

// 8. 订阅资源变更
const unsubscribe = await client.resources.subscribe('projects', 'project-1', (update) => {
   
  console.log(`项目更新:`, update);
  if (update.type === 'deleted') {
   
    // 处理资源被删除的情况
    unsubscribe(); // 取消订阅
  } else if (update.type === 'updated') {
   
    // 处理资源更新
    updateUI(update.data);
  }
});

这些示例展示了MCP客户端API在处理资源时的高级用法,包括批量操作、条件更新、部分更新、自定义操作和资源订阅等。通过这些高级API,你可以构建更加灵活和高效的客户端应用程序。

1.3 工具调用与流式处理

MCP客户端支持调用服务器定义的工具,并能够处理流式响应。以下是一些高级用法:

// 1. 基本工具调用
const translationResult = await client.tools.execute('translateText', {
   
  text: '你好,世界!',
  sourceLanguage: 'zh',
  targetLanguage: 'en',
});

console.log(`翻译结果: ${
     translationResult.translatedText}`);

// 2. 流式工具调用
const stream = await client.tools.executeStreaming('generateContent', {
   
  prompt: '请写一篇关于人工智能的短文',
  maxLength: 500,
  temperature: 0.7,
});

// 处理流式响应
for await (const chunk of stream) {
   
  if (chunk.type === 'content') {
   
    // 处理内容块
    console.log(chunk.content);
    updateUI(chunk.content);
  } else if (chunk.type === 'metadata') {
   
    // 处理元数据
    console.log('元数据:', chunk.data);
  } else if (chunk.type === 'error') {
   
    // 处理错误
    console.error('流处理错误:', chunk.error);
    break;
  }
}

// 3. 取消长时间运行的工具调用
const abortController = new AbortController();
const {
    signal } = abortController;

// 设置5秒后自动取消
setTimeout(() => {
   
  abortController.abort('操作超时');
}, 5000);

try {
   
  const result = await client.tools.execute('processLargeDataset', {
   
    datasetId: 'dataset-123',
    operations: ['analyze', 'transform', 'summarize'],
  }, {
    signal });
  
  console.log('处理完成:', result);
} catch (error) {
   
  if (error.name === 'AbortError') {
   
    console.log('操作已取消:', error.message);
  } else {
   
    console.error('处理错误:', error);
  }
}

// 4. 并行工具调用
const [weatherResult, translationResult, newsResult] = await Promise.all([
  client.tools.execute('getWeather', {
    city: '北京' }),
  client.tools.execute('translateText', {
    
    text: '美好的一天',
    sourceLanguage: 'zh',
    targetLanguage: 'en'
  }),
  client.tools.execute('fetchNews', {
    category: 'technology', count: 5 }),
]);

// 5. 工具调用重试与错误处理
try {
   
  const result = await client.tools.execute('unreliableOperation', {
    
    inputData: '测试数据'
  }, {
   
    retry: {
   
      maxRetries: 5,
      initialDelay: 500,
      maxDelay: 10000,
      factor: 2,
      retryCondition: (error) => {
   
        // 自定义重试条件
        return error.isTransient === true || 
               error.code === 'RESOURCE_UNAVAILABLE';
      },
      onRetry: (error, attempt) => {
   
        console.log(`重试尝试 ${
     attempt} 失败:`, error);
      },
    },
  });
  
  console.log('操作成功:', result);
} catch (error) {
   
  console.error('所有重试都失败:', error);
  
  if (error.details?.suggestions) {
   
    console.log('建议的解决方案:', error.details.suggestions);
  }
}

// 6. 复杂工具调用与上下文管理
const sessionContext = {
    sessionId: generateUniqueId() };

const step1 = await client.tools.execute('startWorkflow', {
    
  workflowType: 'dataProcessing',
  initialData: {
    /* ... */ },
}, {
    context: sessionContext });

// 使用来自第一步的数据和同一会话上下文
const step2 = await client.tools.execute('processWorkflowStep', {
   
  stepId: 'transform',
  workflowId: step1.workflowId,
  parameters: {
    /* ... */ },
}, {
    context: sessionContext });

// 最后一步,仍使用同一会话上下文
const result = await client.tools.execute('completeWorkflow', {
   
  workflowId: step1.workflowId,
  finalizeOptions: {
    /* ... */ },
}, {
    context: sessionContext });

这些示例展示了MCP客户端在工具调用方面的高级用法,包括流式处理、取消操作、并行调用、重试策略和上下文管理等。通过这些高级功能,你可以构建更加强大和灵活的客户端应用程序。

1.4 会话与状态管理

MCP客户端提供了会话和状态管理功能,使你能够维护客户端与服务器之间的状态一致性。以下是一些高级用法:

// 1. 创建和管理会话
const session = await client.sessions.create({
   
  // 会话初始化参数
  userProfile: {
   
    id: 'user-123',
    name: '张三',
    role: 'editor',
  },
  preferences: {
   
    theme: 'dark',
    language: 'zh-CN',
  },
  // 会话持久化选项
  persistence: {
   
    type: 'localStorage', // 或 'sessionStorage', 'memory', 'custom'
    key: 'mcp-user-session',
  },
});

// 获取会话信息
console.log(`会话ID: ${
     session.id}`);
console.log(`会话创建时间: ${
     session.createdAt}`);
console.log(`会话过期时间: ${
     session.expiresAt}`);

// 更新会话数据
await session.update({
   
  preferences: {
   
    theme: 'light', // 更新主题
  },
});

// 2. 状态同步和冲突解决
const documentState = await client.state.sync('document', 'doc-123', {
   
  // 本地状态
  localState: {
   
    content: '文档内容...',
    version: 5,
    lastEditedAt: new Date().toISOString(),
  },
  
  // 冲突解决策略
  conflictResolution: {
   
    strategy: 'lastWriteWins', // 或 'serverWins', 'clientWins', 'merge', 'manual'
    
    // 自定义合并函数(当strategy为'merge'时使用)
    merge: (clientState, serverState) => {
   
      return {
   
        ...serverState,
        content: clientState.content, // 使用客户端内容
        version: Math.max(clientState.version, serverState.version) + 1,
        lastEditedAt: new Date().toISOString(),
        mergedAt: new Date().toISOString(),
      };
    },
    
    // 手动冲突解决回调(当strategy为'manual'时使用)
    onConflict: async (clientState, serverState) => {
   
      // 显示冲突解决UI
      const resolution = await showConflictResolutionDialog(
        clientState, 
        serverState
      );
      
      return resolution;
    },
  },
  
  // 差异计算选项(用于优化网络传输)
  diff: {
   
    enabled: true,
    algorithm: 'jsonpatch', // 或 'custom'
    
    // 自定义差异计算(当algorithm为'custom'时使用)
    calculate: (oldState, newState) => {
   
      // 计算状态差异
      // 返回最小差异表示
    },
    
    // 自定义差异应用(当algorithm为'custom'时使用)
    apply: (state, diff) => {
   
      // 应用差异到状态
      // 返回新状态
    },
  },
});

// 3. 实时状态订阅
const unsubscribe = await client.state.subscribe('chatRoom', 'room-456', {
   
  // 初始同步选项
  initialSync: true,
  
  // 状态更新处理
  onUpdate: (newState, previousState) => {
   
    console.log('聊天室状态更新:', newState);
    updateChatUI(newState);
  },
  
  // 错误处理
  onError: (error) => {
   
    console.error('状态订阅错误:', error);
    reconnectIfNeeded(error);
  },
  
  // 差异处理(可选,用于优化)
  processDiff: true,
});

// 4. 离线状态管理与同步
const offlineManager = client.offline.createManager({
   
  states: ['documents', 'tasks', 'contacts'],
  
  // 存储配置
  storage: {
   
    type: 'indexedDB', // 或 'localStorage', 'custom'
    databaseName: 'mcp-offline-data',
    version: 1,
  },
  
  // 同步配置
  sync: {
   
    // 自动同步间隔(毫秒)
    interval: 60000, // 1分钟
    
    // 同步触发器
    triggers: {
   
      onNetworkReconnect: true,
      onAppForeground: true,
      onStateChange: true,
    },
    
    // 同步优先级
    priorities: {
   
      'documents': 10, // 高优先级
      'tasks': 5,     // 中优先级
      'contacts': 1,  // 低优先级
    },
    
    // 冲突解决策略(可以针对不同状态类型设置不同策略)
    conflictResolution: {
   
      'documents': 'merge',
      'tasks': 'serverWins',
      'contacts': 'lastWriteWins',
    },
  },
  
  // 离线操作队列
  operations: {
   
    // 最大队列长度
    maxQueueLength: 1000,
    
    // 操作压缩(合并多个操作)
    compress: true,
    
    // 操作优先级
    prioritize: (op1, op2) => {
   
      // 返回优先级比较结果
      return op1.priority - op2.priority;
    },
  },
});

// 手动触发同步
await offlineManager.syncNow({
   
  states: ['documents'], // 只同步文档
  force: true, // 强制同步,忽略时间间隔限制
});

// 获取同步状态
const syncStatus = offlineManager.getSyncStatus();
console.log('上次同步时间:', syncStatus.lastSyncTime);
console.log('同步进度:', syncStatus.syncProgress);
console.log('待处理操作数:', syncStatus.pendingOperations);

// 注册同步事件监听器
offlineManager.events.on('syncStart', () => {
   
  showSyncIndicator();
});

offlineManager.events.on('syncComplete', (result) => {
   
  hideSyncIndicator();
  if (result.success) {
   
    showSuccessNotification('同步完成');
  } else {
   
    showErrorNotification('同步失败', result.error);
  }
});

这些示例展示了MCP客户端在会话和状态管理方面的高级用法,包括会话创建与管理、状态同步与冲突解决、实时状态订阅以及离线状态管理等。通过这些功能,你可以构建具有强大状态管理能力的客户端应用程序,为用户提供流畅的体验,即使在网络不稳定的情况下也能正常工作。

二、自定义客户端封装

在实际应用中,MCP默认的客户端API虽然功能强大,但有时需要针对特定业务需求进行定制化封装,以提供更加简洁和语义化的API。本章将探讨如何基于MCP SDK创建自定义客户端封装。

2.1 领域特定客户端设计

当你的应用有特定的业务领域时,创建领域特定的客户端可以极大提高开发效率和代码可读性。以下是一个内容管理系统(CMS)的客户端封装示例:

import {
    McpClient, HttpTransport } from '@modelcontextprotocol/sdk';

/**
 * CMS客户端类,特定于内容管理系统领域
 */
export class CmsClient {
   
  private client: McpClient;
  
  constructor(options: {
   
    apiUrl: string;
    authToken?: string;
    timeout?: number;
  }) {
   
    // 初始化底层MCP客户端
    this.client = new McpClient({
   
      clientId: 'cms-client-v1',
      transport: new HttpTransport({
   
        baseUrl: options.apiUrl,
        authToken: options.authToken,
        timeout: options.timeout || 30000,
      }),
    });
  }
  
  /**
   * 初始化客户端
   */
  async initialize(): Promise<void> {
   
    await this.client.initialize();
  }
  
  /**
   * 关闭客户端连接
   */
  async shutdown(): Promise<void> {
   
    await this.client.shutdown();
  }
  
  /**
   * 文章相关操作
   */
  articles = {
   
    /**
     * 获取文章列表
     */
    list: async (options?: {
   
      page?: numbe

网站公告

今日签到

点亮在社区的每一天
去签到