taro + vue3 实现小程序sse长连接实时对话

发布于:2025-05-29 ⋅ 阅读:(48) ⋅ 点赞:(0)

前言

taro.request是可以实现sse长连接的,但是呢其中有俩大坑,找了许多资料也没解决,后续解决办法也与后端商量改用WebSocket来实现。

代码实现

SSEManager.js:

import { getAccessToken } from "../xx/xx";
import { TextDecoder } from 'text-encoding';
import Taro from "@tarojs/taro";
const regexp = /\[DONE\]/;   // 规定结束标志
const decoder = new TextDecoder("utf-8");

/**
 * 外部需要限制SSE连接,无法获取时,自行处理
 **/
export function getSSEConnection(
  connectId,
  sessionId,
  messageEmitter,
  closeHandler
) {
  console.log("[SSE] getSSEConnection", connectId, sessionId);
  let token = getAccessToken();

  const requestTask = Taro.request({
    url:'http://xxxx:xx/chatConnect',
    method: "GET",
    data: { id: connectId },
    responseType: "arraybuffer",        // 响应的数据类型
    header: {
      Authorization: token,
      Accept: "text/event-stream",
    },
    enableChunked: true,                // 关键配置  开启后数据将以分块形式传输
    timeout:6 * 1000,                   // 该配置在sse中没用
    fail: (err) => {
      console.log("err", err);
      
      if (err.errMsg.includes("timeout")) {
        closeHandler('timeout');
        console.log('回答超时了',connectId, sessionId);
      } else {
        closeHandler('fail');
        console.log('回答出错了',connectId, sessionId);
      }
    }
  });

  // 接收到新的chunk时触发
  requestTask.onChunkReceived((chunk) => {
    const responseText = decoder.decode(chunk.data);
    let plain = responseText.replace(/^data:/gm, "");
    plain = plain
    .replace(/\n{2,}/g, "\n")    
    .replace(/^\n+|\n+$/g, "")
    .trim();

    messageEmitter(sessionId, plain);
    if (regexp.test(responseText)) {
      console.log('回答完成',connectId, sessionId);
      closeHandler('complete');
    }
  });

  return requestTask
}

使用

import { getSSEConnection } from '../sse/SSEManager.js';
import mitt from 'mitt';
const eventBus = mitt();
const sessionId = ref () // 会话id
let chat

const chatSendEvent = async () => {
  eventBus.on(sessionId.value, (data) => {
     // 对获取的数据块处理
     console.log( data);
     chat += data
  });
  // 获取SSE链接
  getSSEConnectionEvent()
}

const getSSEConnectionEvent = async () => {

  const globalId = nanoid(64); // 链接id 请求参数

  requestTaskCleanup.value = getSSEConnection(
    globalId,
    sessionId.value,
    (key, value) => { eventBus.emit(key, value); },
    async (why) => {
      console.log("[event] close!", why);
      if (why == 'complete') {}
      // TODO 超时 
      else if (why == 'timeout') {} 
      else if (why == 'abort') {}
      else {}

      eventBus.all.clear();
    }
  )
  // 发送提示词 
  sendStreamEvent(globalId)

}

以上就是整个过程使用,其中的数据处理根据业务来定。现在来讲讲其中的坑:

坑 

超时无法捕获:

在开启分块传输后,timeout配置就不生效了,超时也无法捕获了。

解决:

自己设置定时器,在超时后断开连接,requestTask.abort(); 但是这个方法是无效的,无法断开。

参考以下文章,可知该问题是一直没得到解决的。 微信开放社区https://developers.weixin.qq.com/community/develop/doc/000caa13bd8fb080ed7d318fb57800

在前端无法主动断开连接的情况下,前端只能实现不接收后端返回的数据块,表面上看起来像是断开了连接。

实现代码如下(SSEManager.js):

超时的时候 通过一个变量状态来控制是否继续接收返回的数据块。当然也可以通过与后端设置一样的超时时间 通过后端的报错 来直接处理。

import { getAccessToken } from "../xx/xx";
import { TextDecoder } from 'text-encoding';
import Taro from "@tarojs/taro";
const regexp = /\[DONE\]/;

const decoder = new TextDecoder("utf-8");
/**
 * 外部需要限制SSE连接,无法获取时,自行处理
 *
 * **/
export function getSSEConnection(
  connectId,
  sessionId,
  messageEmitter,
  closeHandler
) {
  console.log("[SSE] getSSEConnection", connectId, sessionId);
  let token = getAccessToken();
  let shouldIgnoreData = false;  // 新增 控制是否接收数据块 为true 不接收

  //  新增 设置超时定时器
  const timeoutTimer = setTimeout(() => {
    if (!shouldIgnoreData) {
      closeHandler('timeout');
      cleanup();
    }
  }, 182 * 1000 );  //  保底报错结束  这里保底是指超时时间与后端设置一样的

  const requestTask = Taro.request({
    url: `http://xxxx:xx/chatConnect`,
    method: "GET",
    data: { id: connectId },
    responseType: "arraybuffer",
    header: {
      Authorization: token,
      Accept: "text/event-stream",
    },
    enableChunked: true,
  });

  // 新增 资源清理函数
  const cleanup = () => {
    if (shouldIgnoreData) return;

    shouldIgnoreData = true;  // 暂解决超时处理 忽略后续数据
    clearTimeout(timeoutTimer);
    // 终止请求--------TODO未终止
    // 取消监听获取数据块 与使用 shouldIgnoreData 变量来控制数据的接收是一样的 可选其一种方式
    requestTask.offChunkReceived(messageEmitter);   
    requestTask.abort();   // 不生效
  };

  //  新增 请求报错处理 返回函数
  requestTask.onHeadersReceived((res) => {
    console.log("SSE request onHeadersReceived:", res);
    if (shouldIgnoreData) return; // 终止后 忽略数据
    clearTimeout(timeoutTimer);
    if (res.statusCode !== 200) {
      closeHandler('fail');
      cleanup();
    }
  });

  requestTask.onChunkReceived((chunk) => {
    if (shouldIgnoreData) return; // 终止后忽略数据

    const responseText = decoder.decode(chunk.data);
    let plain = responseText.replace(/^data:/gm, "");
    plain = plain
    .replace(/\n{2,}/g, "\n")    
    .replace(/^\n+|\n+$/g, "")
    .trim();

    messageEmitter(sessionId, plain);
    if (regexp.test(responseText)) {
      console.log('回答完成');
      closeHandler('complete');
      cleanup();
    }
  });

  // 返回函数 主动不接数据
  const f = () => {
    closeHandler('abort')
    cleanup();
  }

  return f
}

请求无法暂停:

由于requestTask.abort();   // 不生效 ,前端若想处理也可以通过以上行为 不接收返回数据

来实现,改变视觉效果。

总结

以上都是前端来实现的一种方式,在开发过程中以上问题也可以通过和后端配合来解决。

使用taro.request 和 wx.request 实现sse长连接 都是一样的坑(taro.request是基于wx.request封装的),建议改用WebSocket (自己也与后端商议改用了

有更多解决办法,欢迎评论区分享