(4)LangGraph4j框架的人工干预(Human-in-Loop)

发布于:2025-07-15 ⋅ 阅读:(12) ⋅ 点赞:(0)

LangGraph4j框架的人工干预(Human-in-Loop)

在当今快速发展的技术浪潮中,人工智能和机器学习的应用已经渗透到各个领域,从自动化流程到智能决策支持,它们正在重塑我们的工作和生活方式。然而,尽管这些技术取得了巨大的进步,但在某些情况下,人类的智慧和经验仍然是不可或缺的。LangGraph4j的“人在回路”(Human-in-the-Loop)功能正是为了解决这一需求而设计的,它将人类的智慧与机器的效率相结合,为智能工作流带来了前所未有的灵活性和精准性。

一、什么是“Human-in-Loop”

“Human-in-Loop”功能是LangGraph4j的核心亮点之一。该功能允许在工作流的任何点引入人工干预,从而实现对模型输出的验证、更正或附加上下文。这种设计特别适用于大型语言模型(LLM)驱动的应用程序,因为这些模型的输出有时可能需要人工的进一步确认或调整。

二、为什么需要“Human-in-Loop”

尽管大型语言模型在处理自然语言方面表现出色,但它们并非万无一失。模型的输出可能会出现偏差、误解或缺乏必要的上下文信息。例如,在一个自动化的客户服务场景中,模型可能无法完全理解客户的复杂需求,或者在生成建议时忽略了某些关键因素。在这种情况下,人工干预就显得尤为重要。通过“Human-in-Loop”功能,人类可以对模型的输出进行审查、编辑和批准,确保最终结果的准确性和可靠性。

三、“Human-in-Loop”的关键功能

(一)持久执行状态

LangGraph4j的一个显著特点是其全局状态管理功能。该功能允许工作流在任何步骤暂停,等待人工输入,而不会丢失执行上下文。这种设计使得工作流可以在几分钟、几小时甚至几天后恢复执行,从暂停的地方继续进行。这对于需要异步人工审阅或输入的场景非常有用,例如在复杂的决策过程中,人类专家可能需要时间来评估模型的建议并提供反馈。

(二)灵活的集成点

LangGraph4j的“Human-in-Loop”功能提供了灵活的集成点,允许在工作流的任何阶段引入人工干预。这种灵活性使得开发者可以根据具体需求,将人工干预逻辑嵌入到工作流的关键环节。例如,在一个自动化营销流程中,可以在发送关键营销信息之前暂停工作流,让人工审核内容是否符合品牌形象和法律要求。

四、如何实现“Human-in-Loop”?

(1)中断机制

langgraph4j通过自定义异常来实现中断机制,这里我们定义了一个GraphInterruptException。

public class GraphInterruptException extends RuntimeException {

    public GraphInterruptException(String errorMessage) {
        super(errorMessage);
    }
}

当某个节点需要等待人工反馈时,会抛出这个异常,导致整个流程暂停。一旦获得用户反馈后,可以通过特定方法恢复流程的执行。

(2)状态管理

状态管理是实现human-in-loop的关键部分。每个节点都可以访问和修改全局状态State,这使得在不同节点之间传递信息变得简单。同时,状态对象还包含了是否继续执行(resume)的标志位,用于控制流程是否应该被中断。

(3)节点行为定义

通过实现NodeAction接口,可以自定义节点的行为。在这个接口的apply方法中,可以根据当前的状态决定是否需要中断流程,并且可以在中断前后对状态进行更新。

(4)代码实现

LangGraphStreamingServer 类提供了一个基于 Servlet 的流式服务,允许客户端通过 HTTP 请求与服务器端的状态图进行交互。其中,resume 参数的作用是实现人为干预。resume参数用于指示是否从之前的检查点恢复执行流程。当resume为true时,系统会尝试从指定的检查点开始继续执行任务。这种机制允许在任务执行过程中插入人工审核、确认或其他形式的人工干预。

protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
            response.setHeader("Accept", "application/json");
            response.setContentType("text/plain");
            response.setCharacterEncoding("UTF-8");

            //var session = request.getSession(true);
            //Objects.requireNonNull(session, "session cannot be null");

            var threadId = ofNullable(request.getParameter("thread"))
                    .orElseThrow(() -> new IllegalStateException("Missing thread id!"));

            var resume = ofNullable(request.getParameter("resume"))
                    .map(Boolean::parseBoolean).orElse(false);

            final PrintWriter writer = response.getWriter();

            // 开始异步处理
            var asyncContext = request.startAsync();

            try {
                AsyncGenerator<? extends NodeOutput<? extends AgentState>> generator = null;

                var persistentConfig = new PersistentConfig(threadId);

                var compiledGraph = graphCache.get(persistentConfig);

                final Map<String, Object> candidateDataMap;
                if (stateGraph.getStateSerializer() instanceof PlainTextStateSerializer<? extends AgentState> textSerializer) {
                    candidateDataMap = textSerializer.read(new InputStreamReader(request.getInputStream())).data();
                } else {
                    candidateDataMap = objectMapper.readValue(request.getInputStream(), new TypeReference<>() {});
                }

                var dataMap = candidateDataMap.entrySet().stream()
                        .map( entry -> {
                            var newValue = args.stream()
                                    .filter(arg -> arg.name().equals(entry.getKey()) && arg.converter() != null).findAny()
                                    .map(arg -> arg.converter.apply(entry.getValue()));
                            return newValue.map( v -> entryOf(entry.getKey(), v ))
                                    .orElse(entry);
                        })
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue) );

                // 保存输入
                aigcReportService.saveReport(threadId,String.valueOf(dataMap.get("input")),"",0);

                if (resume) {
                    log.trace("RESUME REQUEST PREPARE");

                    if (compiledGraph == null) {
                        throw new IllegalStateException("Missing CompiledGraph in session!");
                    }

                    var checkpointId = ofNullable(request.getParameter("checkpoint"))
                            .orElseThrow(() -> new IllegalStateException("Missing checkpoint id!"));

                    var node = request.getParameter("node");

                    var runnableConfig = RunnableConfig.builder()
                            .threadId(threadId)
                            .checkPointId(checkpointId)
                            .nextNode(node)
                            .build();

                    var stateSnapshot = compiledGraph.getState(runnableConfig);

                    runnableConfig = stateSnapshot.config();

                    log.trace("RESUME UPDATE STATE FORM {} USING CONFIG {}\n{}", node, runnableConfig, dataMap);

                    // 添加resume
                    dataMap.put(RESUME, true);
                    runnableConfig = compiledGraph.updateState(runnableConfig, dataMap, node);

                    log.trace("RESUME REQUEST STREAM {}", runnableConfig);

                    generator = compiledGraph.streamSnapshots(null, runnableConfig);

                } else {

                    log.trace("dataMap: {}", dataMap);

                    if (compiledGraph == null) {
                        compiledGraph = stateGraph.compile(compileConfig(persistentConfig));
                        graphCache.put(persistentConfig, compiledGraph);
                    }

                    generator = compiledGraph.streamSnapshots(dataMap, runnableConfig(persistentConfig));
                }

                CompiledGraph<?> finalCompiledGraph = compiledGraph;
                generator.forEachAsync(s -> {
                    try {
                        serializeOutput(writer, threadId, s);
                        writer.println();
                        writer.flush();
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        throw new CompletionException(e);
                    }
                })
                //.thenAccept(v -> writer.close())
                //.thenAccept(v -> asyncContext.complete())
                .thenAccept(v -> {
                    // 获取最终 state
                    StateSnapshot<? extends AgentState> finalState = finalCompiledGraph.getState(runnableConfig(persistentConfig));
                    // 提取最终结果
                    AgentState state = finalState.state();
                    String userInput = (String) state.value("input").orElseThrow();
                    String reportResult = (String) state.value("report_output").orElseThrow();
                    // 存储到数据库
                    aigcReportService.saveReport(threadId,userInput,reportResult,1);
                    writer.close();
                    asyncContext.complete();
                })
                .exceptionally(e -> {
                    log.error("Error streaming", e);
                    writer.close();
                    asyncContext.complete();
                    return null;
                });

            } catch (Throwable e) {
                log.error("Error streaming", e);
                throw new ServletException(e);
            }
        }

通过以上的介绍,我们确认了通过自定义异常实现中断,通过调整resume为true恢复执行。基于以上两个机制,我们可以实现一个简单的带有human-in-loop功能的节点:

public class HumanFeedbackNode implements NodeAction<State> {

    private static final Logger log = LoggerFactory.getLogger(HumanFeedbackNode.class);

    // always or conditioned
    private final String interruptStrategy;

    private Function<State, Boolean> interruptCondition;

    private Function<State, Map<String, Object>> stateUpdateFunc;

    public HumanFeedbackNode() {
        this.interruptStrategy = "always";
    }

    public HumanFeedbackNode(String interruptStrategy, Function<State, Boolean> interruptCondition) {
        this.interruptStrategy = interruptStrategy;
        this.interruptCondition = interruptCondition;
    }

    public HumanFeedbackNode(String interruptStrategy, Function<State, Boolean> interruptCondition,
                     Function<State, Map<String, Object>> stateUpdateFunc) {
        this.interruptStrategy = interruptStrategy;
        this.interruptCondition = interruptCondition;
        this.stateUpdateFunc = stateUpdateFunc;
    }

    @Override
    public Map<String, Object> apply(State state) throws GraphInterruptException{
        var shouldInterrupt = "always".equals(interruptStrategy)
                || ("conditioned".equals(interruptStrategy) && interruptCondition.apply(state));
        if (shouldInterrupt) {
            interrupt(state);
            Map<String, Object> data = new HashMap<>();
            if (stateUpdateFunc != null) {
                data = stateUpdateFunc.apply(state);
            }
            else {
                data = state.data();
            }
            return AgentState.updateState(data, Map.of(RESUME, false), State.SCHEMA);
        }
        return Map.of();
    }

    /**
     * 中断流程
     * @param state
     * @throws GraphInterruptException
     */
    private void interrupt(State state) throws GraphInterruptException{
        if (!state.isResume()) {
            log.warn("[HumanFeedbackNode] 流程中断了...");
            throw new GraphInterruptException("interrupt");
        }
    }
}
代码实现说明

构造函数:提供了多种构造方式,允许设置不同的中断策略(始终中断、条件中断等)。
apply方法:这是节点的主要逻辑所在。根据中断策略判断是否需要中断流程,如果需要,则调用interrupt方法并抛出GraphInterruptException。
interrupt方法:检查State状态中的resume的值,如果不为true,则抛出异常中断流程。
状态更新:在中断之后,可以选择使用自定义的状态更新函数或者直接返回当前状态的数据

五、总结

LangGraph4j的“Human-in-Loop”功能为智能工作流的提供了一个强大的工具。通过将人类的智慧与机器的效率相结合,它不仅提高了工作流的灵活性和精准性,还为企业和开发者提供了更多的控制权和安全保障。同时,“Human-in-Loop”功能也为我们提供了一个重要的启示:技术的进步并非意味着完全取代人类,而是通过与人类智慧的结合,实现更加高效、精准和人性化的解决方案。


网站公告

今日签到

点亮在社区的每一天
去签到