WebSocket功能完整解析

发布于:2025-08-31 ⋅ 阅读:(23) ⋅ 点赞:(0)

WebSocket功能完整解析

概述

使用WebSocket实现实时双向通信。整个WebSocket系统设计得非常完善,包含了连接管理、消息处理、状态同步等完整功能。

架构设计

1. 分层架构

┌─────────────────┐
│   页面组件层     │  (chat/[id].tsx)
├─────────────────┤
│   状态管理层     │  (stores/chat)
├─────────────────┤
│   Hook封装层     │  (hooks/useWebSocket)
├─────────────────┤
│   工具类层       │  (utils/websocket.ts)
├─────────────────┤
│   WebSocket API  │  (浏览器原生API)
└─────────────────┘

核心组件详解

1. WebSocket管理器 (src/utils/websocket.ts)

这是整个WebSocket系统的核心底层,负责与浏览器原生WebSocket API交互。

主要功能:
export class WebSocketManager {
  private ws: WebSocket | null = null
  private options: WebSocketOptions

  // 连接WebSocket
  connect(): Promise<void>
  
  // 断开连接
  disconnect(): void
  
  // 发送消息
  send(data: any): void
  
  // 获取连接状态
  get isConnected(): boolean
  
  // 设置各种回调函数
  setMessageCallback(callback: (message: any) => void): void
  setConnectCallback(callback: () => void): void
  setDisconnectCallback(callback: () => void): void
}
关键特性:
  • Promise化连接:使用Promise包装连接过程,便于异步处理
  • 自动JSON解析:接收消息时自动解析JSON格式
  • 错误处理:完善的错误处理和日志记录
  • 状态管理:实时跟踪连接状态

2. WebSocket Hook (src/hooks/useWebSocket.ts)

这是Vue 3 Composition API的封装层,将WebSocket管理器与Vue组件生命周期结合。

主要功能:
export function useWebSocket(options: WSOptions) {
  const socketManager = ref<WebSocketManager | null>(null)

  // 连接WebSocket
  const connect = async (params?: WSOptions) => {
    // 检查token
    if (!useCookie('token').value) return
    
    // 创建连接管理器
    socketManager.value = createWebSocketManager(
      typeof innerParmas === 'function' ? innerParmas() : innerParmas
    )
    
    // 建立连接
    await socketManager.value?.connect()
    
    // 设置断开连接回调
    socketManager.value?.setDisconnectCallback(() => {
      const { message } = useGlobalComponent()
      message.error('网络连接断开,请刷新页面')
    })
  }

  // 发送消息
  const sendMessage = <T>(data: T) => {
    if (!socketManager.value?.isConnected) {
      const { message } = useGlobalComponent()
      message.error('网络连接断开,请刷新页面')
    }
    socketManager.value?.send(data)
  }

  // 断开连接
  const disconnect = () => {
    socketManager.value?.disconnect()
  }

  // 检查连接状态
  const isConnected = () => socketManager.value?.isConnected

  // 组件卸载时自动断开连接
  onUnmounted(disconnect)

  return { sendMessage, connect, disconnect, isConnected }
}
关键特性:
  • 自动生命周期管理:组件卸载时自动断开连接
  • Token验证:连接前检查用户token
  • 错误提示:连接断开时自动显示错误信息
  • 类型安全:完整的TypeScript类型支持

3. 聊天状态管理 (src/stores/chat/methods.ts)

这是业务逻辑层,使用Pinia管理聊天相关的状态和操作。

核心状态:
const state = reactive<ChatState>({
  list: [],           // 消息列表
  title: '',          // 会话标题
  isFirst: false,     // 是否首次会话
  agentState: {       // Agent状态
    content: 'switch Agent can complete the selection process',
    show: false,
    modelType: 'model',
    thinking: false,
    network: false,
  },
})
WebSocket集成:
const { sendMessage, connect, disconnect, isConnected } = useWebSocket(
  () => ({
    // WebSocket服务器地址
    url: `ws://192.168.201.49:8088/api/websocket?token=${useCookie('token').value}`,
    
    // 消息处理回调
    onMessage: msg => {
      // 处理接收到的消息
      state.list = onMessage(msg, state.list)
      
      // 处理会话完成逻辑
      if (state.isFirst && [NotificationType.FINISH, NotificationType.END].includes(msg?.data?.type)) {
        getChatSummary(msg.data.session_id).then(res => {
          state.title = res
          state.isFirst = false
        })
      }
    },
  })
)
发送消息功能:
const send = (data: string, id: number) => {
  const lastMsg = state.list[state.list.length - 1]
  let callerInstanceId = ''
  
  // 获取调用者实例ID
  if (lastMsg && 'caller' in lastMsg) {
    callerInstanceId = (lastMsg?.caller as { instance_id: string })?.instance_id
  }
  
  // 创建用户消息和响应消息
  const msg = createUserMessage(data, id, callerInstanceId)
  const question = createResponseMessage(data, id, callerInstanceId)
  
  // 更新消息列表
  if (state.list.length) {
    if (state.list[state.list.length - 1]?.session_id === id) {
      state.list = [...state.list, question]
    } else {
      state.list = [question]
      state.isFirst = true
    }
  } else {
    state.list = [question]
    state.isFirst = true
  }
  
  // 发送WebSocket消息
  sendMessage(msg)
  return question
}
路由监听:
watch(
  () => route.path,
  () => {
    if (import.meta.server) return
    
    // 重置状态
    setTitle('')
    state.list = []
    
    // 检查token
    if (!useCookie('token').value) {
      return disconnect()
    }
    
    // 如果已连接则跳过
    if (isConnected()) return
    
    // 建立连接
    connect()
  },
  { immediate: true }
)

4. 消息处理工具 (src/stores/chat/utils.ts)

这是消息处理的核心逻辑,负责解析、转换和更新消息。

消息创建函数:

创建用户消息:

export function createUserMessage(
  content: string,
  session_id: number,
  instance_id?: string
): ChatMessageWrapper {
  return {
    direction: ChatDirection.INPUT,
    type: ChatType.CHAT,
    data: {
      session_id,
      type: 'user_input',
      content: [{ id: 0, step: 0, type: 'text', content }],
      agents: [{
        agent_id: 'agent-commander',
        custom_id: 'agent-commander',
        published_version: '1.0.0',
        url: 'local',
        type: '',
        agent_provider_id: 2106919896481796,
        description: '',
        name: '',
      }],
      tools: [],
      caller: { type: 'user', instance_id: instance_id || '' },
    },
  }
}

创建响应消息:

export function createResponseMessage(
  content: string,
  session_id: number,
  instance_id?: string
): ResponseMessageData {
  return {
    id: dayjs().valueOf(),
    type: NotificationType.QUESTION,
    content: [{ id: 0, type: 'text', content, step: 0 }],
    timestamp: Date.now(),
    read: false,
    session_id,
    event_id: '',
    task_id: 0,
    agent_id: '',
    agent_instance_id: 0,
    parent_agent_instance_id: 0,
    step: 0,
    artifact: [],
    action: {},
    ext: {},
    call_batch_id: '',
    caller_type: 'user',
    caller_instance_id: instance_id || '',
    user_id: '',
    stream_uuid: '',
    tree: [],
    has_todo: false,
  }
}
核心消息处理函数:
export function onMessage(
  msg: ChatMessageWrapper,
  messages: ResponseMessageData[]
) {
  const data = msg.data as ResponseMessageData
  
  if (msg.type === ChatType.CHAT && data.type) {
    if (data.type === NotificationType.START) {
      // 开始新会话
      return [...messages, { ...data, tree: [] }]
    } else {
      // 更新现有消息
      const targetIndex = messages.findIndex(i => i.event_id === data.event_id)
      if (targetIndex === -1) return messages

      const newMessages = [...messages]
      const targetData = newMessages[targetIndex]!
      const target = [...targetData.content]

      // 处理内容更新(关键:累加内容)
      if (data.type !== NotificationType.AGENT_START) {
        data.content.forEach(item => {
          const targetContentItemIndex = target.findIndex(i =>
            i.id === data.agent_instance_id &&
            i.type === item.type &&
            i.step === data.step
          )
          
          if (targetContentItemIndex !== -1) {
            // 累加内容(实现流式渲染)
            const currentItem = target[targetContentItemIndex]!
            target[targetContentItemIndex] = {
              ...currentItem,
              content: currentItem.content + item.content,
            }
          } else {
            // 添加新内容项
            target.push({
              id: data.agent_instance_id,
              type: item.type,
              content: item.content,
              step: data.step,
            })
          }
        })
      }

      // 处理附件
      data.artifact.forEach(item => {
        // 处理附件状态更新逻辑
      })
      
      // 更新树形结构
      createTree(targetData.tree, data)

      const newData = { ...data, tree: [...targetData.tree], content: target }
      newMessages[targetIndex] = newData
      return newMessages
    }
  }
  return messages
}

5. 树形结构管理 (src/stores/chat/utils.ts)

负责管理AI Agent的执行流程树形结构。

export function createTree(
  tree: ResponseMessageData['tree'],
  data: ResponseMessageData
) {
  const node: (typeof tree)[0] = {
    plan_id: data.agent_instance_id,
    type: 'base',
    loading: true,
    parent_plan_id: data.parent_agent_instance_id,
    child_plan_ids: [],
    content: [],
    status: 'start',
    label: data.agent_id,
    timestamp: data.timestamp,
  }
  
  // 处理不同类型的消息
  switch (data.type) {
    case NotificationType.AGENT_START:
      // Agent开始执行
      node.content = data.content.reduce((acc, item) => {
        if (item.type === 'text') {
          acc.push({
            id: data.agent_instance_id,
            type: item.type,
            content: item.content,
            step: data.step,
          })
        }
        return acc
      }, [] as (typeof tree)[0]['content'])
      tree.push(node)
      break
      
    case NotificationType.AGENT_END:
      // Agent执行结束
      target.loading = false
      target.status = 'end'
      break
      
    case NotificationType.ARTIFACT_START:
      // 附件处理开始
      tree.push(node)
      break
      
    case NotificationType.ARTIFACT_END:
      // 附件处理结束
      target.loading = false
      target.status = 'end'
      break
      
    case NotificationType.ARTIFACT_RESULT:
      // 附件处理结果
      target.content = data.content.map(item => ({
        id: data.agent_instance_id,
        type: 'artifact',
        content: JSON.stringify(item),
        step: data.step,
        status: 'end',
      }))
      break
  }
  
  // 更新父子关系
  tree.forEach((item, index) => {
    if (!index) {
      item.child_plan_ids = []
      return
    }
    if (item.parent_plan_id === -1) {
      tree[0]!.child_plan_ids.push(item.plan_id)
    }
    item.child_plan_ids = tree.reduce((acc, curr) => {
      if (curr.parent_plan_id === item.plan_id) {
        acc.push(curr.plan_id)
      }
      return acc
    }, [] as number[])
  })
}

消息类型系统

1. 基础枚举类型 (src/types/chat.ts)

// WebSocket消息类型
export enum ChatType {
  CHAT = 'chat',           // 聊天消息
  HEADRBEAT = 'heartbeat', // 心跳
  CONNECTED = 'connected', // 连接
  FINISH = 'finish',       // 正常结束
  END = 'end',             // SSE断开结束
  START = 'start',         // 开始
}

// 通知类型(业务消息类型)
export enum NotificationType {
  QUESTION = 'question',           // 用户问题
  START = 'start',                 // 开始
  MODEL_START = 'model_start',     // 模型开始
  MODEL_RESULT = 'model_result',   // 模型结果
  MODEL_END = 'model_end',         // 模型结束
  AGENT_START = 'agent_start',     // Agent开始
  AGENT_RESULT = 'agent_result',   // Agent结果
  AGENT_END = 'agent_end',         // Agent结束
  ARTIFACT_START = 'artifact_start', // 附件开始
  ARTIFACT_RESULT = 'artifact_result', // 附件结果
  ARTIFACT_END = 'artifact_end',   // 附件结束
  FINISH = 'finish',               // 完成
  ERROR = 'error',                 // 错误
  END = 'end',                     // 结束
}

2. 消息数据结构

// 响应消息数据
export interface ResponseMessageData {
  has_todo: boolean
  id: number
  type: NotificationType
  content: ContentItem[]           // 消息内容数组
  timestamp: number
  read: boolean
  session_id: number
  event_id: string
  task_id: number
  agent_id: string
  agent_instance_id: number
  parent_agent_instance_id: number
  step: number
  artifact: Artifact[]            // 附件数组
  action: {}
  ext: {}
  call_batch_id: string
  caller_type: 'user' | 'agent' | 'event_source' | 'unknown'
  caller_instance_id: string
  user_id: string
  stream_uuid: string
  tree: WSTreeNode[]              // 树形结构
}

// 内容项
export interface ContentItem {
  id: number
  type: 'text' | 'think' | 'artifact' | 'image_url' | 'image_base64'
  status?: 'start' | 'end'
  content: string
  read?: boolean
  step: number
}

// 树节点
export interface WSTreeNode {
  plan_id: number
  type: 'start' | 'base'
  loading: boolean
  parent_plan_id: number | null
  child_plan_ids: number[]
  content: ContentItem[]
  status: 'start' | 'end'
  label: string
  timestamp: number
}

页面集成

1. 聊天页面 (src/pages/chat/[id].tsx)

// 发送消息回调
const sendCallback = (val: string) => {
  console.log('发送消息123123:', val)
  
  // 检查WebSocket连接状态
  if (!chatStore.isConnected()) {
    return message.error('WebSocket未连接, 无法发送消息')
  }
  
  if (!route.params.id) return
  
  // 发送消息并更新UI
  chatStore.value.list = [
    ...chatStore.value.list,
    chatStore.send(val, Number(route.params.id)),
  ]
}

// 监听消息变化,更新画布
watch(
  () => chatStore.value.list,
  val => {
    if (val[val.length - 1]?.tree) {
      if (timer) return
      timer = setTimeout(() => {
        timer = null
        const { tree } = chatStore.value.list[chatStore.value.list.length - 1]!
        state.data = generateVueFlowData(tree)  // 生成画布数据
        state.loading = false
      }, 300)  // 防抖处理
    }
  }
)

SSE支持

项目还支持Server-Sent Events (SSE)作为备选方案:

1. SSE连接类 (src/utils/sse.ts)

export class SSEConnection {
  private eventSource: EventSource | null = null
  private messageCallback?: (data: any) => void

  // 连接SSE
  connect(): Promise<void>
  
  // 断开连接
  disconnect(): void
  
  // 设置消息回调
  setMessageCallback(callback: (data: any) => void): void
  
  // 获取连接状态
  get isConnected(): boolean
}

2. SSE API (src/api/chat/method.ts)

// 创建SSE连接
export function createChatSSE(
  sessionId: string,
  options?: {
    onMessage?: (data: any) => void
  }
): SSEConnection {
  const baseURL = '/api'
  const sseUrl = `${baseURL}/events/${sessionId}`
  return createSSEConnection(sseUrl, options)
}

完整工作流程

1. 连接建立流程

1. 用户访问聊天页面
2. 检查用户token
3. 创建WebSocket连接管理器
4. 建立WebSocket连接
5. 设置消息处理回调
6. 设置断开连接回调

2. 消息发送流程

1. 用户在输入框输入消息
2. 点击发送按钮
3. 检查WebSocket连接状态
4. 创建用户消息对象
5. 创建响应消息对象
6. 更新本地消息列表
7. 通过WebSocket发送消息
8. 触发UI更新

3. 消息接收流程

1. WebSocket接收到服务器消息
2. 解析JSON数据
3. 调用onMessage处理函数
4. 根据消息类型进行不同处理
5. 更新消息列表状态
6. 触发Vue响应式更新
7. 更新UI显示
8. 如果是流式消息,累加内容实现打字机效果

4. 流式渲染机制

1. 服务器发送部分内容
2. 前端接收并累加到现有内容
3. 触发Vue响应式更新
4. TextChunk组件检测内容变化
5. 启动打字机动画
6. 逐字显示内容
7. 动画完成后停止
8. 等待下一批内容

关键特性总结

1. 实时双向通信

  • WebSocket提供全双工通信
  • 支持实时消息发送和接收

2. 流式渲染

  • 内容累加机制实现流式显示
  • 打字机效果提升用户体验

3. 状态管理

  • Pinia管理全局状态
  • 响应式更新确保UI同步

4. 错误处理

  • 完善的连接错误处理
  • 自动重连和用户提示

5. 生命周期管理

  • 自动连接和断开
  • 组件卸载时清理资源

6. 类型安全

  • 完整的TypeScript类型定义
  • 编译时错误检查

7. 扩展性

  • 支持多种消息类型
  • 模块化设计便于扩展

这个WebSocket系统设计得非常完善,既保证了功能的完整性,又考虑了性能和用户体验。通过分层架构和模块化设计,代码结构清晰,易于维护和扩展。