流式接口,断点续传解决方案及实现

发布于:2025-07-27 ⋅ 阅读:(24) ⋅ 点赞:(0)

背景:用户刷新页面或者切换tab页后断链的续流问题

目前有两种方案:

方案一:后端:Mongo + 实时存 (数据库压力大,每个流都要进行一次入库操作)    前端轮询

方案二:后端:Redis + Mongo (推荐, 流的过程中使用Redis,流数据结束后再去入一次库)

              前端:调新接口

方案一实现:

public SseEmitter sendinfo (QuestionnaireDTO dto) {       
      Flux<String> flux = algorithmUtils.code_bswj(dto, webClient);
      flux
                .doOnError(e -> {
                    try {
                        completeEmitter(emitter, e, isCompleted);
                    } catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                })// 处理客户端断开连接
                .doOnComplete(() -> {
                    try {
                        completeEmitter(emitter, null, isCompleted);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }) // 传null表示正常完成
                .subscribe(data -> {
                    try {
                        sendDataToEmitter(emitter, data, isCompleted);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });// 订阅 Flux 并发送数据到 SseEmitter
        return emitter;

}

private void sendDataToEmitter(SseEmitter emitter, String data, AtomicBoolean isCompleted) throws IOException {
        if (!isCompleted.get()) {
            try {
                processChunk(data, questionnaire,isCompleted,  traceId);
            } catch (IOException | JSONException e) {
                if (e instanceof ClientAbortException) {
                    this.emitter = new SseEmitterUTF8(1000000L);
                    return;
                }
                completeEmitter(emitter, e, isCompleted,questionnaire); // 处理发送过程中的异常
            }
        }
    }

private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted) throws IOException, JSONException {
        System.out.println("chunk handing before =========>:" + chunk);
        if (!StringUtils.hasText(chunk)) return;
        JSONObject jsonObject = new JSONObject(chunk);
        if (!jsonObject.getBoolean("is_success")) {
            emitter.send(jsonObject.getString("err_msg"));
            return;
        }

        String result = jsonObject.getString("results");
        JSONObject jsonObjectRes = new JSONObject(result);
        String type = jsonObjectRes.getString("type");//answer
        String value = jsonObjectRes.getString("value");
        JSONObject response = new JSONObject();
        JSONObject results = new JSONObject();
        if (type.equals("progress_indicator")) {
            JSONObject valueJson = new JSONObject(value);
            String text = valueJson.getString("text");
            if (text.equals("[DONE]")) {
                questionnaireRepository.save(questionnaire);
                completeEmitter(emitter, null, isCompleted, questionnaire);
                return;
            }
            results.put("type", "progress_indicator");
            results.put("data", text);
            if (isBase) {
                String progressBase = questionnaire.getProgressBase();
                progressBase = progressBase == null ? "" : progressBase;
                progressBase += text;
                questionnaire.setProgressBase(progressBase);
            } else {
                String progressCustom = questionnaire.getProgressCustom();
                progressCustom = progressCustom == null ? "" : progressCustom;
                progressCustom += text;
                questionnaire.setProgressCustom(progressCustom);
            }
            questionnaireRepository.save(questionnaire);
        } else if (type.equals("survey")) {
            JSONObject valueJson = new JSONObject(value);
            String customSurvey = valueJson.getString("custom_survey");
            questionnaire.setCustomQuestion(customSurvey);
            questionnaireRepository.save(questionnaire);
            results.put("type", "survey");
            results.put("data", customSurvey);
        }else if (type.equals("finished_thinking")) {
            JSONObject valueJson = new JSONObject(value);
            String text = valueJson.getString("text");
            results.put("type", "finished_thinking");
            results.put("data", text);
            if (isBase) {
                questionnaire.setStatus("SurveyGenerating");
            }else {
                questionnaire.setStatus("MapGenerating");
            }
            questionnaireRepository.save(questionnaire);
        }
        response.put("results", results);
        if (!isCompleted.get()){
            System.out.println("后端发送给前端时间:" + TimeUtil.getCurrentTime());
            emitter.send(response.toString());
        }

    }

通过实时插入数据库,前端感知到刷新或者切换tab后,根据状态轮询调用历史记录接口,因为此时后端与算法的流还没有断开,所以是在实时保存的。前端此时轮询历史接口可以伪造出流式输出的形式,使用户无感知。

方案二实现:(代码几乎同上,就是在处理流式的方法里做了改动)

private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted, String traceId) throws IOException, JSONException {
        System.out.println("chunk handing before =========>:" + chunk);
        if (!StringUtils.hasText(chunk)) return;
        JSONObject jsonObject = new JSONObject(chunk);
        if (!jsonObject.getBoolean("is_success")) {
            emitter.send(jsonObject.getString("err_msg"));
            return;
        }

        String result = jsonObject.getString("results");
        JSONObject jsonObjectRes = new JSONObject(result);
        String type = jsonObjectRes.getString("type");//answer
        String value = jsonObjectRes.getString("value");
        JSONObject response = new JSONObject();
        JSONObject results = new JSONObject();
        if (type.equals("progress_indicator")) {
            JSONObject valueJson = new JSONObject(value);
            String text = valueJson.getString("text");
            if (text.equals("[DONE]")) {
                
                questionnaireRepository.save(questionnaire);
                //设置redis中key为questionnaire.getId()的过期时间为5分钟,目前redis中是有这个key的
//                redisService.expire(questionnaire.getId(), 1);
                completeEmitter(emitter, null, isCompleted, questionnaire);
                return;
            }
            results.put("type", "progress_indicator");
            results.put("data", text);
            if (isBase) {
                String progressBase = questionnaire.getProgressBase();
                progressBase = progressBase == null ? "" : progressBase;
                progressBase += text;
                questionnaire.setProgressBase(progressBase);
            } else {
                String progressCustom = questionnaire.getProgressCustom();
                progressCustom = progressCustom == null ? "" : progressCustom;
                progressCustom += text;
                questionnaire.setProgressCustom(progressCustom);
            }
        } else if (type.equals("survey")) {
            JSONObject valueJson = new JSONObject(value);
            String customSurvey = valueJson.getString("custom_survey");
            questionnaire.setCustomQuestion(customSurvey);
            questionnaireRepository.save(questionnaire);
            accompanyLearningLog.uploadLogByTranceId("processChunk", "后端处理定制问卷", "INFO", JsonUtil.object2Json(customSurvey),traceId);
            results.put("type", "survey");
            results.put("data", customSurvey);
        } else if (type.equals("text")) {
            JSONObject valueJson = new JSONObject(value);
            String text = valueJson.getString("text");
            results.put("type", "text");
            results.put("data", text);
            questionnaire.setPlanCustom(text);
            accompanyLearningLog.uploadLogByTranceId("processChunk", "后端处理多链路地图", "INFO", JsonUtil.object2Json(text),traceId);
            questionnaireRepository.save(questionnaire);
        }else if (type.equals("finished_thinking")) {
            JSONObject valueJson = new JSONObject(value);
            String text = valueJson.getString("text");
            results.put("type", "finished_thinking");
            results.put("data", text);
            if (isBase) {
                questionnaire.setStatus("SurveyGenerating");
            }else {
                questionnaire.setStatus("MapGenerating");
            }
            questionnaireRepository.save(questionnaire);
        }
        response.put("results", results);
        //这里有了一个新的逻辑,把思维链保存的redis的队列中,然后如果前端断开连接,想要做断点续传,就从队列中取出,然后继续,队列的key是questionnaire.getId()
        // 把思维链保存到 Redis 队列中
        redisService.rightPush(questionnaire.getId(), response.toString());
        if (!isCompleted.get()){
            System.out.println("后端发送给前端时间:" + TimeUtil.getCurrentTime());
            emitter.send(response.toString());
        }

    }

前端刷新页面或者来回切换了tab后会调用我的新接口,也是一个流式输出的接口

    /**
     * 断点续传方法,用于从 Redis 中获取数据并通过 SseEmitter 发送给前端。
     * 如果数据库中有符合条件的数据则直接处理,否则从 Redis 中获取数据。
     */
    @Override
    public SseEmitter refreshStream(QuestionnaireDTO dto, String userId, String traceId) {
        // 创建一个 SseEmitter 对象,设置超时时间为 1000000 毫秒
        SseEmitter emitter = new SseEmitterUTF8(1000000L);
        // 先从数据库取数据,如果取到了就不查redis了
        /*Questionnaire questionnaire = questionnaireRepository.findByUserId(userId);
        if (questionnaire != null) {
            try {
                if(questionnaire.getStatus().equals("baseCompleted")||questionnaire.getStatus().equals("customCompleted")){
                    // 若问卷状态为 baseCompleted 或 customCompleted,发送 [DONE] 并完成 SseEmitter
                    emitter.send("[DONE]");
                    emitter.complete();
                }
                return emitter;
            } catch (Exception e) {
                // 处理异常,将错误信息记录到日志并完成 SseEmitter
                emitter.completeWithError(e);
                accompanyLearningLog.uploadLogByTranceId("refreshStream", "从数据库获取数据发送失败", "ERROR", JsonUtil.object2Json(e), traceId);
                return emitter;
            }
        }*/

        // 获取 Redis 中数据列表的键
        String key = dto.getId();
        // 从redis中取key为dto.getId()的列表的长度
        Long listLength = redisService.size(key);

        // 若 Redis 列表长度不为空且大于 0
        if (listLength != null && listLength > 0) {
            // 记录这个长度
            final long[] initialLength = {listLength};
            // 从redis中取列表中的数据
            List<Object> redisDataList = redisService.range(key, 0, initialLength[0] - 1);
            // 用于拼接 type 为 progress_indicator 的 data 数据
            StringBuilder progressData = new StringBuilder();
            // 标记 SseEmitter 是否已经完成
            AtomicBoolean isEmitterCompleted = new AtomicBoolean(false);


            // 遍历 Redis 数据列表
            for (Object data : redisDataList) {
                try {
                    // 将数据转换为 JSONObject
                    JSONObject jsonData = new JSONObject(data.toString());
                    // 获取 results 字段
                    JSONObject results = jsonData.getJSONObject("results");
                    // 获取 type 字段
                    String type = results.getString("type");
                    if("generate_indicator".equals(type)){
                        // 若 type 为 generate_indicator,直接发送数据
                        emitter.send(data.toString());
                    }else  if ("progress_indicator".equals(type)) {
                        // 若 type 为 progress_indicator,拼接 data 数据
                        progressData.append(results.getString("data"));
                    } else {
                        // 若 SseEmitter 未完成,发送非 progress_indicator 类型的数据
                        if (!isEmitterCompleted.get()) {
                            System.out.println("data:" + data.toString());
                            emitter.send(data.toString());
                        }
                    }
                } catch (JSONException e) {
                }catch (IOException e) {
                    // 处理发送数据时的 IO 异常,完成 SseEmitter 并记录日志
                    emitter.completeWithError(e);
                }
            }

            // 以{"results":{"type":"progress_indicator","data":"所有的data"}}格式推给前端
            try {
                // 若 progressData 不为空
                if (progressData.length() > 0) {
                    // 创建响应 JSON 对象
                    JSONObject response = new JSONObject();
                    JSONObject results = new JSONObject();
                    results.put("type", "progress_indicator");
                    results.put("data", progressData.toString());
                    response.put("results", results);
                    System.out.println("response:"+response.toString());
                    // 若 SseEmitter 未完成,发送拼接后的 progress_indicator 数据
                    if (!isEmitterCompleted.get()) {
                        emitter.send(response.toString());
                    }
                }

                // 从记录的长度开始,后面的就不需要拼接了,直接取到后推给前端就可以了
                // 使用数组包装 subscription
                final Disposable[] subscription = new Disposable[1];

                // 每秒检查一次 Redis 列表长度是否有变化
                subscription[0]  =  Flux.interval(Duration.ofSeconds(1))
                        .subscribe(interval -> {
                            // 获取当前 Redis 列表长度
                            Long currentLength = redisService.size(key);
                            if (currentLength != null && currentLength > initialLength[0]) {
                                // 获取新增的数据
                                List<Object> newDataList = redisService.range(key, initialLength[0], currentLength - 1);
                                for (Object newData : newDataList) {
                                    try {
                                        if(newData.toString().equals("[DONE]")){
                                            // 若数据为 [DONE],发送数据,完成 SseEmitter,取消订阅并删除 Redis 键
                                            if (!isEmitterCompleted.get()) {
                                                emitter.send(newData.toString());
                                                emitter.complete();
                                                isEmitterCompleted.set(true);
                                                // 取消订阅
                                                if (subscription[0] != null) {
                                                    subscription[0].dispose();
                                                }
                                                redisService.delete(key);
                                            }
                                            return;
                                        }
                                        System.out.println("newData:"+newData.toString());
                                        // 若 SseEmitter 未完成,发送新增数据
                                        if (!isEmitterCompleted.get()) {
                                            emitter.send(newData.toString());
                                            try {
                                                // 线程休眠 50 毫秒
                                                Thread.sleep(50);
                                            } catch (InterruptedException e) {
                                                // 处理线程休眠中断异常,完成 SseEmitter 
                                                Thread.currentThread().interrupt();
                                                emitter.completeWithError(e);
                             
                                            }
                                        }
                                    } catch (IOException e) {
                                        emitter.completeWithError(e);
               
                                    }
                                }
                                // 更新初始长度
                                initialLength[0] = currentLength;
                            }
                        });
             
            } catch (Exception e) {
                // 处理异常,完成 SseEmitter 并记录日志
                emitter.completeWithError(e);
            }
        } else {
            try {
                // 若 Redis 列表长度为空或为 0,完成 SseEmitter
                emitter.complete();
            } catch (Exception e) {
                // 处理异常,完成 SseEmitter 并记录日志
                emitter.completeWithError(e);
            }
        }
        return emitter;
    }


网站公告

今日签到

点亮在社区的每一天
去签到