简介
本文详细介绍了基于LangGraph 1.0+实现的人机协同(HITL)系统,重点解析了Interrupt机制的工作原理与实现方法。文章从系统架构设计到前后端代码实现,全面展示了如何构建工业级人机交互系统,以智能设备维护预测为应用场景,实现设备监控、故障预测、维护规划及工程师审批的完整流程。项目包含完整源码,适合大模型应用开发学习与实践。
摘要
人机协同(Human-in-the-Loop, HITL)是现代AI系统中的关键能力,它允许在AI工作流的特定节点插入人工干预,实现更精准、更可靠的业务流程。本文将深入分析一个基于LangGraph 1.0+实现的完整HITL系统,从架构设计到代码实现,从原理剖析到最佳实践,全方位讲解如何在实际项目中构建工业级的人机交互系统。
目录
- HITL系统概述
- 系统架构设计
- LangGraph Interrupt机制深度解析
- 后端实现:工作流与状态管理
- 前端实现:状态同步与交互界面
- 关键代码深度剖析
- 开发要点与最佳实践
- 总结
- 项目源代码下载
一、HITL系统概述
1.1 什么是HITL?
Human-in-the-Loop(人机协同)是一种将人工智能与人类智慧相结合的系统设计模式。在HITL系统中,AI系统可以在关键决策点暂停执行,等待人类专家的输入、审批或修改,然后将人类反馈整合到后续流程中。
HITL的核心价值在于结合了AI的效率优势和人类的判断能力,特别适用于高风险、高精度要求的业务场景,如医疗诊断、金融风控、工业质检等领域。
1.2 业务场景
本项目以智能设备维护预测为应用场景,展示了完整的HITL流程:
图1:智能设备维护预测HITL流程
在这个场景中,系统会持续监控设备传感器数据,通过AI分析预测潜在故障风险。当检测到高风险情况时,系统会暂停并等待工程师审批,而低风险情况则可以自动执行维护计划。这种设计既保证了高风险决策的可靠性,又提高了整体流程的效率。
1.3 系统界面展示
- 系统首页:
- 待审批页面(初始为空):
- 智能体流程中断,提示人类审批:
- 待审批页面显示审批请求:
- 人类审批:
- 人类审批后,智能体流程恢复执行完成
针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓
二、系统架构设计
2.1 整体架构
系统采用分层架构设计,各层职责明确,通过标准化接口通信,确保系统的可维护性和可扩展性。核心的HITL逻辑由LangGraph实现,负责工作流的定义、执行、中断和恢复。
图2:系统整体架构图
2.2 核心组件说明
| 组件 | 职责 | 技术栈 |
|---|---|---|
| 前端界面 | 用户交互、状态展示、实时更新 | React 18 + TypeScript + Vite |
| 审批API服务器 | 前后端通信桥梁 | Express.js + Node.js |
| 工作流引擎 | 业务逻辑执行、HITL控制 | LangGraph 1.0+ + FastAPI |
| 状态管理 | 工作流状态持久化 | Checkpointer + SQLite |
| 通信机制 | 状态同步 | HTTP + Polling |
- 架构设计考量:系统采用前后端分离架构,通过API进行通信,使得前后端可以独立开发、测试和部署。核心的工作流逻辑与业务API分离,提高了代码的复用性和可维护性。
三、 LangGraph Interrupt机制深度解析
3.1 什么是Interrupt?
Interrupt是LangGraph 1.0+引入的强大机制,允许在节点执行过程中暂停工作流,等待外部事件(如人工审批)后再恢复执行。
Interrupt机制是实现HITL的核心技术,它突破了传统工作流"一跑到底"的限制,使得工作流可以在任意节点暂停和恢复,为人类干预提供了可能。
3.2 工作原理
Interrupt机制的工作流程可以分为以下几个关键步骤:
- 工作流执行:客户端触发工作流,API调用LangGraph的invoke方法开始执行
- 节点处理:工作流按定义的节点顺序执行,直到遇到中断节点
- 触发中断:中断节点调用interrupt()方法,工作流暂停执行
- 状态保存:Checkpointer自动保存当前工作流状态到持久化存储
- 等待审批:前端收到中断状态,展示审批界面等待人工输入
- 提交决策:人类专家完成审批,前端提交审批结果
- 恢复执行:API调用带有resume参数的invoke方法,工作流从中断处恢复
- 处理结果:节点继续执行,处理审批结果,完成后续流程
图3:Interrupt/Resume机制时序图
💡深入理解:为什么 LangGraph 能精准“冻结”?
许多开发者会好奇:为什么 interrupt() 调用后,整个 Python 函数能像按了暂停键一样,并在几小时甚至几天后恢复?
- 状态快照(State Snapshots):每当工作流到达一个节点或触发 interrupt 时,Checkpointer 会将当前的 State 字典序列化并存入数据库(如 SQLite)。这个快照包含了当前所有变量的副本。
- 线程隔离与恢复:通过 thread_id,LangGraph 能在数据库中定位唯一的执行历史。当你调用 resume 时,系统并不是重新运行整个函数,而是**重放(Replay)**历史状态,并将 resume 的输入直接注入到 interrupt() 的返回值中。
- 不可变性原则:这种设计要求 State 中的数据必须是可序列化的 JSON 对象,这也就是为什么我们在代码中强调需要 clean_result_for_json 的原因。
3.3 关键特性
特性1:状态自动持久化
from langgraph.checkpoint.sqlite import SqliteSaverfrom langgraph.graph import StateGraph# 配置Checkpointercheckpointer = SqliteSaver.from_conn_string("sqlite:///state.db")# 构建工作流时指定checkpointerbuilder = StateGraph(state_schema)builder.compile( checkpointer=checkpointer, interrupt_before=["engineer_review"], # 可选:在指定节点前中断 interrupt_after=["engineer_review"] # 可选:在指定节点后中断)技术解析:Checkpointer是LangGraph提供的状态持久化机制,支持多种存储后端(如SQLite、PostgreSQL、Redis等)。当工作流中断时,Checkpointer会自动保存当前状态,包括:当前节点、状态数据、执行历史等关键信息,为后续恢复提供完整的上下文。
特性2:线程级状态隔离
# 每次调用都使用唯一线程IDthread_id = f"workflow_{equipment_id}_{int(time.time())}"# 工作流执行result = graph.invoke( initial_state, config={ "configurable": {"thread_id": thread_id}, # 或者使用resume参数恢复 "resume": human_decision })技术解析:thread_id是工作流实例的唯一标识,类似于会话ID。通过为每个工作流实例分配唯一的thread_id,Checkpointer可以区分不同的工作流状态,避免状态混淆。在分布式环境中,thread_id还可以用于跨服务节点定位工作流实例。
特性3:可中断节点
from langgraph.types import Commanddef engineer_review_node(self, state: Dict[str, Any]) -> Command: """工程师审查节点 - 可中断""" # ... 业务逻辑 ... # 触发中断,等待人工审批 human_decision = interrupt(review_request) # 工作流恢复时从此处继续 # human_decision包含审批结果 # 返回下一个节点 return Command( goto="execution_feedback", resume=human_decision )设计优势:在节点内部触发中断的方式提供了最大的灵活性,可以根据业务逻辑动态决定是否需要中断,而不是静态配置在节点边界。这使得系统可以实现更复杂的HITL逻辑,如基于风险评估结果动态决定是否需要人工干预。
四、后端实现:工作流与状态管理
4.1 工作流定义
文件:backend/app/agents/graph_builder.py
from langgraph.graph import StateGraph, ENDfrom langgraph.types import Command, interruptfrom typing import Dict, Any, Literalclass MaintenanceGraph: """维护预测工作流图""" def __init__(self): self.builder = StateGraph(dict) self._setup_nodes() self._setup_edges() self.graph = self.builder.compile( checkpointer=SqliteSaver.from_conn_string("sqlite:///./state.db") ) def _setup_nodes(self): """注册所有节点""" self.builder.add_node("equipment_monitor", self.equipment_monitor_node) self.builder.add_node("fault_prediction", self.fault_prediction_node) self.builder.add_node("maintenance_planning", self.maintenance_planning_node) self.builder.add_node("engineer_review", self.engineer_review_node) # 可中断节点 self.builder.add_node("execution_feedback", self.execution_feedback_node) self.builder.add_node("knowledge_update", self.knowledge_update_node) def engineer_review_node(self, state: Dict[str, Any]) -> Command: """ 工程师审查节点 - HITL核心实现 这个节点是整个HITL系统的关键: 1. 构建审查请求 2. 通知前端审批API 3. 触发interrupt暂停工作流 4. 等待人工审批 5. 恢复后处理审批结果 """ equipment_id = state.get("equipment_id") maintenance_plan = state.get("maintenance_plan") # 构建审查请求 review_request = { "type": "maintenance_plan_review", "equipment_id": equipment_id, "plan": maintenance_plan, "options": { "accept": "批准此维护计划", "edit": "修改计划内容", "feedback": "提供改进建议" } } # 通知前端审批API(可选) try: self._notify_frontend(review_request) except Exception as e: print(f"⚠️ 通知前端失败: {e}") # ⭐ 核心:触发中断,等待人工审批 # 工作流将暂停在此处,直到通过resume API恢复 human_decision = interrupt(review_request) # ⭐ 工作流恢复后的处理逻辑 # human_decision包含审批结果 decision_type = human_decision.get("type") if decision_type == "edit": # 处理编辑 edited_plan = human_decision.get("edited_plan") state["maintenance_plan"] = edited_plan state["decision"] = "edited" elif decision_type == "feedback": # 处理反馈 feedback = human_decision.get("feedback") state["feedback"] = feedback state["decision"] = "feedback_provided" else: # accept # 批准 state["decision"] = "approved" # 返回下一个节点 return Command( goto="execution_feedback", resume=human_decision ) def _notify_frontend(self, review_request: Dict[str, Any]): """通知前端审批系统""" import requests response = requests.post( 'http://localhost:3001/api/approval/request', json={ 'approvalId': f"APP_{review_request['equipment_id']}_{int(time.time())}", 'equipmentId': review_request['equipment_id'], 'plan': review_request['plan'], 'interruptData': review_request, 'notificationUrl': f"http://localhost:5173/approval" }, timeout=5 ) print(f"✅ 已通知前端审批服务器")工作流设计解析
MaintenanceGraph类定义了一个包含6个节点的完整工作流,其中engineer_review节点是实现HITL的核心。这个节点的实现包含以下关键步骤:
- 构建审查请求:收集设备ID、维护计划等关键信息,构建结构化的审查请求数据
- 通知前端:主动通知前端系统有新的审批请求,可通过WebSocket或HTTP请求实现
- 触发中断:调用interrupt()方法暂停工作流,传递审查请求数据
- 等待并处理审批结果:工作流恢复后,根据审批类型(批准/编辑/反馈)处理结果
- 返回下一个节点:通过Command对象指定下一个要执行的节点
节点间的数据传递通过state对象实现,这是一个字典结构,可以在工作流执行过程中持续累积和修改数据。这种设计使得节点间的通信简单直接,同时保持了节点的独立性。
🧠架构视角:Command 对象的妙用
在早期的 LangGraph 版本中,节点通常只返回更新后的状态。而在 1.0+ 版本中,引入了 Command 对象:
- 控制权显式化:通过 Command(goto=“…”),节点不再仅仅是数据处理器,它拥有了显式的路由控制权。
- Resume 语义:Command 结构中的 resume 字段是专门为 HITL 设计的。它告诉工作流引擎:“我已经拿到了人工干预的结果,请将其作为上次中断的补偿输入,并继续流转到下一个目标。”。
- 解耦节点逻辑:这种方式避免了在状态机中写死复杂的 if/else 跳转,让每个节点的逻辑更加内聚。
4.2 API端点实现
文件:backend/app/api/agent.py
from fastapi import APIRouter, HTTPExceptionfrom typing import Dict, Anyfrom datetime import datetimeimport jsonrouter = APIRouter()# 存储工作流状态的内存字典# 生产环境应使用数据库workflow_states = {}@router.post("/agent/workflow/{thread_id}/resume")async def resume_workflow( thread_id: str, human_input: Dict[str, Any]): """ 恢复被中断的工作流 这是HITL系统的核心API: 1. 接收审批决策 2. 通过Command(resume=...)恢复工作流 3. 返回最终结果 """ try: print(f"[RESUME] 收到恢复请求 - 线程ID: {thread_id}") print(f"[RESUME] 人机输入: {human_input}") if thread_id not in workflow_states: raise HTTPException(status_code=404, detail="工作流不存在") state = workflow_states[thread_id] print(f"[RESUME] 当前状态: {state.get('status')}") if state.get("status") != "awaiting_human_input": raise HTTPException(status_code=400, detail="工作流未中断或已恢复") # 更新工作流状态 state["status"] = "resuming" state["human_input"] = human_input state["resumed_at"] = datetime.utcnow().isoformat() # ⭐ 关键:使用Command(resume=...)恢复工作流 # resume_value将作为interrupt()的返回值 maintenance_graph = get_maintenance_graph() # 获取当前状态 current_state = state.get("initial_state", {}) # 恢复工作流 # 第一个参数是当前状态 # config中的resume参数是恢复时的输入 result = await maintenance_graph.graph.ainvoke( current_state, config={ "configurable": {"thread_id": thread_id}, "resume": human_input # ⭐ 关键:传递给interrupt() } ) # 更新工作流状态为完成 workflow_states[thread_id]["status"] = "completed" workflow_states[thread_id]["completed_at"] = datetime.utcnow().isoformat() # 清理结果以便JSON序列化 cleaned_result = clean_result_for_json(result) return { "success": True, "message": "工作流已恢复并完成", "data": { "thread_id": thread_id, "status": "completed", "workflow_result": cleaned_result } } except HTTPException: raise except Exception as e: error_msg = f"恢复工作流失败: {str(e)}" print(f"❌ {error_msg}") raise HTTPException(status_code=500, detail=error_msg)def clean_result_for_json(result): """ 清理结果中的不可序列化对象 这是生产环境必须的处理: LangGraph返回的结果可能包含复杂对象(如Interrupt), 需要转换为JSON可序列化的格式 """ if result is None: return {} if not isinstance(result, dict): return result cleaned = {} for key, value in result.items(): try: if key == "__interrupt__": # 特殊处理Interrupt对象 cleaned[key] = [{"type": "interrupt", "handled": True}] elif hasattr(value, '__dict__') or 'Interrupt' in str(type(value)): # 特殊处理所有对象类型 cleaned[key] = str(value) else: # 尝试JSON序列化以验证可序列化 json.dumps(value) cleaned[key] = value except (TypeError, ValueError) as e: print(f"⚠️ 键 {key} 不可序列化,转换为字符串: {e}") cleaned[key] = str(value) return cleaned@router.get("/agent/workflow/status_by_thread/{thread_id}")async def get_workflow_status_by_thread(thread_id: str): """ 查询工作流状态 前端轮询此API来检测工作流是否完成 """ try: if thread_id not in workflow_states: return { "success": True, "data": { "thread_id": thread_id, "status": "not_found", "message": "工作流不存在" } } state = workflow_states[thread_id] status = { "thread_id": thread_id, "equipment_id": state.get("equipment_id"), "current_step": state.get("current_step"), "status": state.get("status"), # 关键:completed/awaiting_human_input "created_at": state.get("created_at"), "awaiting_human_input": state.get("status") == "awaiting_human_input", "completed_at": state.get("completed_at"), "resumed_at": state.get("resumed_at") } return { "success": True, "data": status } except Exception as e: raise HTTPException(status_code=500, detail=f"获取工作流状态失败: {str(e)}")API设计解析
后端API层提供了两个核心接口用于实现HITL功能:
1. 工作流恢复API (/agent/workflow/{thread_id}/resume)
该API负责接收人工审批决策,并恢复被中断的工作流。其核心逻辑包括:
- 验证工作流状态,确保工作流存在且处于"awaiting_human_input"状态
- 更新工作流状态为"resuming"
- 调用LangGraph的ainvoke方法,通过config中的"resume"参数传递审批决策
- 工作流完成后,更新状态为"completed"
- 清理结果数据,确保可以JSON序列化
- 返回执行结果
核心技术点:通过在config中设置"resume"参数,LangGraph会自动将该值作为interrupt()方法的返回值,从而实现工作流从中断处恢复执行。这种设计使得工作流恢复逻辑与正常执行逻辑无缝集成,简化了系统设计。 :::
2. 状态查询API (/agent/workflow/status_by_thread/{thread_id})
该API提供工作流状态查询功能,支持前端轮询获取最新状态。返回的状态信息包括:
- 工作流ID (thread_id)
- 设备ID (equipment_id)
- 当前步骤 (current_step)
- 状态 (status) - 如"completed"、"awaiting_human_input"等
- 时间戳信息 - 创建时间、恢复时间、完成时间
- 是否等待人工输入 (awaiting_human_input)
生产环境考虑:示例代码中使用内存字典(workflow_states)存储工作流状态,这仅适用于开发环境。在生产环境中,应使用分布式缓存(如Redis)或数据库来存储状态,确保系统的可扩展性和可靠性。
针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓
五、前端实现:状态同步与交互界面
5.1 状态管理
文件:frontend/smart-maintenance-frontend/src/pages/AgentInterface.tsx
import React, { useState, useEffect, useRef } from 'react';interface AgentMessage { role: 'user' | 'assistant' | 'system'; content: string; timestamp: string;}const AgentInterface: React.FC = () => { // 核心状态 const [messages, setMessages] = useState<AgentMessage[]>([]); const [currentStep, setCurrentStep] = useState(''); const [agentStatus, setAgentStatus] = useState<'idle' | 'processing' | 'completed'>('idle'); const [currentThreadId, setCurrentThreadId] = useState<string | null>(null); // ⭐ 状态持久化:自动保存到sessionStorage const saveWorkflowState = (newMessages?: AgentMessage[], newCurrentStep?: string) => { if (newMessages) { sessionStorage.setItem('workflowMessages', JSON.stringify(newMessages)); } if (newCurrentStep !== undefined) { sessionStorage.setItem('workflowCurrentStep', newCurrentStep); } }; // ⭐ 状态恢复:从sessionStorage读取 const restoreWorkflowState = () => { const savedMessages = sessionStorage.getItem('workflowMessages'); const savedCurrentStep = sessionStorage.getItem('workflowCurrentStep'); const savedThreadId = sessionStorage.getItem('currentThreadId'); const savedAgentStatus = sessionStorage.getItem('agentStatus'); let restored = false; if (savedMessages) { try { const messages = JSON.parse(savedMessages); setMessages(messages); restored = true; } catch (e) { console.error('解析保存的消息失败:', e); } } if (savedCurrentStep) { setCurrentStep(savedCurrentStep); restored = true; } if (savedThreadId && savedAgentStatus) { setCurrentThreadId(savedThreadId); setAgentStatus(savedAgentStatus as any); restored = true; } return restored; }; // 页面加载时恢复状态 useEffect(() => { const hasSavedState = restoreWorkflowState(); if (!hasSavedState) { initializeAgent(); } }, []); // 状态变化时自动保存 useEffect(() => { if (messages.length > 0) { saveWorkflowState(messages); } }, [messages]);前端状态管理解析
前端状态管理是HITL系统用户体验的关键,直接影响用户对工作流状态的感知和操作。本实现通过以下机制确保状态的可靠性和一致性:
1. 核心状态定义
定义了四个核心状态变量:
messages- 存储工作流交互消息列表currentStep- 当前工作流步骤agentStatus- 工作流状态(idle/processing/completed)currentThreadId- 当前工作流ID
2. 状态持久化与恢复
通过sessionStorage实现状态的持久化存储和恢复:
saveWorkflowState- 将消息列表和当前步骤保存到sessionStoragerestoreWorkflowState- 从sessionStorage读取并恢复状态
用户体验优化:状态持久化确保用户在页面刷新或意外关闭后仍能恢复到之前的工作状态,这对于需要长时间审批或处理的工作流尤为重要。实现时需要注意处理JSON解析错误,确保系统的健壮性。
3. 自动保存机制
通过React的useEffect钩子实现状态变化时的自动保存:
// 状态变化时自动保存useEffect(() => { if (messages.length > 0) { saveWorkflowState(messages); }}, [messages]);这种设计确保状态变化时能够及时保存,避免数据丢失。
5.2 轮询机制
// ⭐ 核心:轮询检测工作流状态const pollWorkflowStatus = async (threadId: string) => { console.log(`🔄 [FRONTEND] 开始轮询工作流状态 - 线程ID: ${threadId}`); // 每秒检查一次 const checkInterval = setInterval(async () => { try { const statusResponse = await agentAPI.getWorkflowStatusByThread(threadId); const status = statusResponse.data; console.log('📊 [FRONTEND] 工作流状态响应:', status); if (status?.status === 'completed') { // ⭐ 检测到工作流完成 console.log('✅ [FRONTEND] 检测到工作流已完成!'); clearInterval(checkInterval); setAgentStatus('completed'); // 添加完成消息 const completedMessage: AgentMessage = { role: 'assistant', content: `✅ **工作流已完成!**\n\n📊 工作流已恢复并执行完毕。\n\n所有步骤:\n• 设备监控\n• 故障预测\n• 维护规划\n• 工程师审查\n• 执行反馈\n• 知识更新\n\n🎯 维护预测流程完成!`, timestamp: new Date().toISOString() }; // 更新消息列表 setMessages(prev => { const newMessages = [...prev, completedMessage]; // ⭐ 立即保存状态 saveWorkflowState(newMessages); return newMessages; }); toast.success('工作流已完成!'); setCurrentStep(''); } else if (status?.status === 'awaiting_human_input') { // 等待审批中 setCurrentStep(`${status?.current_step} - 等待工程师审批...`); } else if (status?.status === 'running' || status?.status === 'resuming') { // 执行中 setCurrentStep(status?.current_step || ''); } } catch (error) { console.error('❌ 轮询工作流状态出错:', error); // 继续轮询,不中断 } }, 1000); // 60秒后超时 setTimeout(() => { console.log('⏰ 轮询超时,停止检查工作流状态'); clearInterval(checkInterval); }, 60000);};轮询机制解析
轮询机制是前端获取工作流状态更新的关键,其核心逻辑包括:
- 使用setInterval设置每秒执行一次状态查询
- 调用状态查询API获取最新状态
- 根据返回状态执行不同逻辑:
- completed- 工作流完成,更新UI显示完成消息,清除轮询定时器
- awaiting_human_input- 等待人工输入,更新当前步骤提示
- running/resuming- 工作流执行中,更新当前步骤
- 设置60秒超时机制,避免无限轮询
- 错误处理 - 记录错误但不中断轮询
技术选型考量:示例中使用轮询机制实现状态同步,这是一种简单可靠的方案,适用于中小规模应用。对于大规模应用或对实时性要求更高的场景,可以考虑使用WebSocket或Server-Sent Events (SSE)替代轮询,减少服务器负载并提高实时性。
六、关键代码深度剖析
6.1 Interrupt机制的核心代码分析
关键点1:中断触发
def engineer_review_node(self, state: Dict[str, Any]) -> Command: """ 理解interrupt()的行为: 1. 当执行到interrupt()时,工作流立即暂停 2. interrupt()的参数会被保存,并在恢复时返回 3. 函数在此处"冻结",直到工作流被恢复 4. 恢复时,代码从此处继续执行 """ # 构建要传递给审批系统的数据 review_request = { "type": "maintenance_plan_review", "equipment_id": state.get("equipment_id"), "plan": state.get("maintenance_plan"), "risk_level": state.get("risk_level") } # ⭐ 触发中断 - 这是关键! # 工作流将暂停在此处,等待外部恢复 human_decision = interrupt(review_request) # ⭐ 工作流恢复后从此处继续执行 # human_decision包含审批结果 print(f"收到审批决策: {human_decision}") # 处理审批结果 if human_decision.get("type") == "accept": state["approved"] = True elif human_decision.get("type") == "edit": state["approved"] = True state["modified_plan"] = human_decision.get("edited_plan") elif human_decision.get("type") == "feedback": state["feedback"] = human_decision.get("feedback") # 返回下一个节点 return Command( goto="execution_feedback", # 下一个要执行的节点 resume=human_decision # 可选:传递数据给下一个节点 )深度解析:interrupt()方法是实现HITL的核心。当调用此方法时,LangGraph会执行以下操作:
- 保存当前节点的执行状态,包括函数调用栈和局部变量
- 将interrupt()的参数保存到持久化存储(通过Checkpointer)
- 返回一个特殊的中断标记,通知工作流引擎暂停执行
- 工作流引擎返回当前状态给API调用者
当工作流恢复时,LangGraph会:
- 从持久化存储中加载保存的状态
- 恢复函数执行环境
- 将resume参数作为interrupt()方法的返回值
- 从interrupt()调用处继续执行代码
关键点2:工作流恢复
async def resume_workflow(thread_id: str, human_input: Dict[str, Any]): """ 恢复工作流的核心逻辑: 1. 接收人工审批决策 2. 使用相同的thread_id 3. 通过config中的resume参数传递决策数据 4. LangGraph会自动从interrupt()处恢复执行 """ # 获取当前工作流状态 current_state = get_current_state(thread_id) # ⭐ 关键:使用resume参数恢复工作流 # 第一个参数是当前状态 # config中的resume是恢复时的输入 result = await graph.ainvoke( current_state, # 当前状态 config={ "configurable": {"thread_id": thread_id}, "resume": human_input # ⭐ 传递给interrupt()的返回值 } ) # 返回结果 return result技术解析:工作流恢复的关键在于使用与中断时相同的thread_id,并在config中提供resume参数。LangGraph通过以下机制实现恢复:
- 通过thread_id定位到被中断的工作流实例
- 从Checkpointer加载保存的工作流状态
- 将resume参数作为interrupt()的返回值
- 恢复工作流执行,从interrupt()调用处继续执行
- 执行完成后返回最终结果
这种设计使得工作流恢复逻辑与正常执行逻辑无缝集成,开发者无需编写额外的恢复处理代码。
关键点3:前端轮询检测
const pollWorkflowStatus = async (threadId: string) => { const checkInterval = setInterval(async () => { // 查询工作流状态 const status = await agentAPI.getWorkflowStatusByThread(threadId); if (status.data.status === 'completed') { // ⭐ 检测到完成,添加完成消息 const completedMessage = { role: 'assistant', content: '✅ 工作流已完成!', timestamp: new Date().toISOString() }; setMessages(prev => [...prev, completedMessage]); clearInterval(checkInterval); // 停止轮询 } }, 1000); // 每秒检查一次};实时性与性能平衡:轮询间隔的选择需要在实时性和服务器负载之间取得平衡。示例中使用1秒间隔,这对于大多数企业应用已经足够。对于实时性要求更高的场景,可以考虑:
- 实现自适应轮询 - 根据工作流状态动态调整轮询间隔
- 使用WebSocket实现真正的实时通信
- 采用长轮询(Long Polling)技术减少无效请求
无论采用何种技术,都需要实现超时机制,避免无限轮询导致的资源浪费。
6.2 状态持久化机制
// 自动保存状态useEffect(() => { // 消息变化时保存 if (messages.length > 0) { sessionStorage.setItem('workflowMessages', JSON.stringify(messages)); }}, [messages]);useEffect(() => { // 状态变化时保存 if (currentStep) { sessionStorage.setItem('workflowCurrentStep', currentStep); }}, [currentStep]);// 页面加载时恢复状态const restoreWorkflowState = () => { const savedMessages = sessionStorage.getItem('workflowMessages'); const savedCurrentStep = sessionStorage.getItem('workflowCurrentStep'); if (savedMessages) { const messages = JSON.parse(savedMessages); setMessages(messages); // 恢复消息列表 } if (savedCurrentStep) { setCurrentStep(savedCurrentStep); // 恢复当前步骤 }};状态管理策略:前端状态持久化是提升用户体验的关键功能,实现时需要考虑以下几点:
- 存储选择:sessionStorage适用于临时会话状态,localStorage适用于长期保存的数据
- 数据结构:确保存储的数据可以被JSON序列化和解析
- 性能优化:避免过于频繁的存储操作,可以使用防抖(debounce)技术优化
- 错误处理:处理JSON解析错误和存储容量限制等异常情况
- 安全考虑:避免在客户端存储敏感信息
七、开发要点与最佳实践
7.1 Interrupt开发要点
要点1:正确配置Checkpointer
# ✅ 正确:必须配置Checkpointerbuilder.compile( checkpointer=SqliteSaver.from_conn_string("sqlite:///./state.db"))# ❌ 错误:没有配置Checkpointerbuilder.compile() # 无法使用interrupt!重要提示:Checkpointer是使用Interrupt机制的前提条件。没有配置Checkpointer的工作流无法实现状态持久化,因此也无法支持中断和恢复功能。在开发环境中可以使用SQLiteSaver,而在生产环境中应考虑使用更强大的数据库如PostgreSQL或分布式存储如Redis。
🛡️工业级补充:审批环节的安全防范
虽然示例代码为了简洁未加入权限校验,但在生产环境中,HITL 必须考虑:
- 身份校验:在 resume_workflow API 中,必须验证提交决策的用户是否有权限操作该设备。
- 输入验证:人类是不可靠的。对 editedPlan 的回传必须进行严格的 Schema 校验,防止由于错误的 JSON 格式导致工作流在后续节点崩溃。
- 审计日志:每一次 interrupt 到 resume 的过程都应记录:谁在什么时间、基于什么数据、做了什么决定。
要点2:使用唯一的线程ID
# ✅ 正确:每次调用使用唯一线程IDthread_id = f"workflow_{equipment_id}_{int(time.time())}"result = graph.invoke(initial_state, config={"configurable": {"thread_id": thread_id}})# ❌ 错误:使用相同线程ID会导致状态冲突result = graph.invoke(initial_state) # 使用默认线程ID最佳实践:thread_id的设计应确保唯一性,避免不同工作流实例之间的状态冲突。推荐的thread_id生成策略包括:
- 结合业务ID(如设备ID、用户ID)和时间戳
- 使用UUID/GUID
- 结合分布式ID生成器(如Snowflake算法)
同时,thread_id应包含足够的业务信息,便于问题排查和监控。
要点3:合理使用interrupt_before/after
# 在指定节点前中断builder.compile( checkpointer=checkpointer, interrupt_before=["engineer_review"] # 在engineer_review节点前中断)# 在指定节点后中断builder.compile( checkpointer=checkpointer, interrupt_after=["engineer_review"] # 在engineer_review节点后中断)# 在节点内手动控制中断(推荐)def engineer_review_node(self, state): # 业务逻辑... human_decision = interrupt(data) # 手动控制中断时机 # 恢复后逻辑... return Command(goto="next")技术选型建议:
- 静态配置(interrupt_before/interrupt_after):适用于固定需要人工干预的节点,配置简单
- 动态中断(节点内调用interrupt()):适用于需要根据业务逻辑动态决定是否中断的场景,灵活性更高
实际项目中推荐使用动态中断方式,因为它允许根据实时业务数据(如风险评估结果)决定是否需要人工干预,更符合HITL的核心价值。
7.2 错误处理最佳实践
实践1:JSON序列化保护
def clean_result_for_json(result): """清理不可序列化对象""" cleaned = {} for key, value in result.items(): try: # 尝试JSON序列化 json.dumps(value) cleaned[key] = value except (TypeError, ValueError): # 转换不可序列化的对象 cleaned[key] = str(value) return cleaned最佳实践:LangGraph工作流返回的结果可能包含复杂对象,这些对象无法直接序列化为JSON。在API返回结果前进行清理处理,可以避免序列化错误。关键措施包括:
- 识别并特殊处理不可序列化的对象类型
- 提供友好的错误提示和日志记录
- 将复杂对象转换为字符串或简单字典表示
- 在开发阶段进行充分的序列化测试
实践2:超时处理
// 前端轮询超时setTimeout(() => { clearInterval(checkInterval); console.log('⏰ 轮询超时');}, 60000); // 60秒超时超时处理是确保系统稳定性的重要措施,应在以下场景实现:
- API调用:设置合理的请求超时时间
- 轮询机制:设置最大轮询时长,避免无限轮询
- 工作流执行:设置工作流整体执行超时时间
- 人工审批:设置审批超时自动处理机制
超时时间的设置应根据业务场景调整,太短可能导致正常流程被中断,太长则可能影响用户体验或导致资源泄漏。
实践3:异常捕获
async def resume_workflow(thread_id: str, human_input: Dict[str, Any]): try: # 恢复工作流 result = await graph.ainvoke(...) return {"success": True, "data": result} except Exception as e: # 记录错误 print(f"❌ 恢复工作流失败: {e}") # 返回友好错误 raise HTTPException(status_code=500, detail=str(e))异常处理最佳实践:
- 分层处理:在API层统一处理异常,避免将内部错误直接暴露给客户端
- 详细日志:记录异常堆栈信息,便于问题排查
- 指标监控:记录异常指标,支持监控和告警
- 用户友好:返回清晰的错误信息,指导用户如何处理
- 区分异常类型:对已知异常和未知异常分别处理
7.3 性能优化
优化1:减少轮询频率
// ✅ 合理:每秒轮询一次setInterval(async () => { const status = await checkStatus();}, 1000);// ❌ 过于频繁:每100ms轮询setInterval(async () => { const status = await checkStatus();}, 100); // 可能导致服务器压力过大轮询频率优化建议:
- 默认轮询间隔:1-5秒,根据业务实时性要求调整
- 自适应轮询:根据工作流状态动态调整间隔,如"等待审批"状态可增大间隔
- 批量请求:一次请求获取多个工作流状态,减少请求次数
- 长轮询:服务器在有数据更新时才返回响应,减少无效请求
轮询优化可以显著减少服务器负载和网络流量,特别是在系统规模扩大时效果更为明显。
优化2:使用WebSocket替代轮询(可选)
// WebSocket实现(更高效)const ws = new WebSocket('ws://localhost:8000/ws');ws.onmessage = (event) => { const status = JSON.parse(event.data); if (status.type === 'workflow_completed') { // 处理完成 }};WebSocket相比轮询的优势:
- 实时性更高:服务器可以主动推送状态更新
- 减少网络流量:无需频繁发送重复的请求头
- 降低服务器负载:减少无效请求处理
- 更好的用户体验:状态更新无延迟
适用场景:用户数量适中、实时性要求高的应用。对于大规模应用,需要考虑WebSocket服务器的扩展性和负载均衡问题。
优化3:状态批量更新
// ✅ 批量更新减少重渲染setMessages(prev => { const newMessages = [...prev, msg1, msg2, msg3]; saveWorkflowState(newMessages); // 一次性保存 return newMessages;});前端性能优化建议:
- 批量更新:合并多个状态更新操作,减少React重渲染
- 防抖处理:对频繁触发的操作(如输入、滚动)使用防抖优化
- 懒加载:对大型组件或数据采用懒加载策略
- 不可变数据:使用不可变数据结构,优化React渲染性能
- 虚拟滚动:对长列表使用虚拟滚动,只渲染可见区域
八、总结
8.1 项目总结
本项目成功实现了一个完整的HITL(人机协同)系统,展示了以下核心技术:
- LangGraph Interrupt机制
- 标准的中断/恢复流程
- 状态自动持久化
- 线程级状态隔离
- 前后端解耦架构
- HTTP API通信
- 实时状态轮询
- 状态持久化
- 工业级质量
- 完整的错误处理
- 类型安全设计
- 性能优化
💡下一步行动:
如果你想将这套系统应用到自己的项目中,建议从以下三个步骤开始:
- 定义你的中断点:梳理业务流程,找出哪些地方 AI 容易出错或法律合规性要求必须人工参与。
- 实现持久化层:配置好 SQLite 或 PostgreSQL 的 Checkpointer。
- 构建 UI 闭环:不仅要能显示状态,还要能让用户直观地看到“AI 为什么要请你帮忙”(例如显示 AI 的置信度分数)。
8.2 应用场景
HITL系统适用于多种场景:
- 金融风控- 大额交易需要人工审批
- 医疗诊断- AI辅助诊断需要医生确认
- 内容审核- 用户生成内容需要人工审核
- 工业质检- 异常检测需要工程师确认
- 法律文档- AI生成的法律文档需要律师审查
九、如何学习AI大模型?
大模型时代,火爆出圈的LLM大模型让程序员们开始重新评估自己的本领。 “AI会取代那些行业?”“谁的饭碗又将不保了?”等问题热议不断。
不如成为「掌握AI工具的技术人」,毕竟AI时代,谁先尝试,谁就能占得先机!
想正式转到一些新兴的 AI 行业,不仅需要系统的学习AI大模型。同时也要跟已有的技能结合,辅助编程提效,或上手实操应用,增加自己的职场竞争力。
但是LLM相关的内容很多,现在网上的老课程老教材关于LLM又太少。所以现在小白入门就只能靠自学,学习成本和门槛很高
那么针对所有自学遇到困难的同学们,我帮大家系统梳理大模型学习脉络,将这份LLM大模型资料分享出来:包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 😝有需要的小伙伴,可以扫描下方二维码领取🆓↓↓↓
学习路线
第一阶段: 从大模型系统设计入手,讲解大模型的主要方法;
第二阶段: 在通过大模型提示词工程从Prompts角度入手更好发挥模型的作用;
第三阶段: 大模型平台应用开发借助阿里云PAI平台构建电商领域虚拟试衣系统;
第四阶段: 大模型知识库应用开发以LangChain框架为例,构建物流行业咨询智能问答系统;
第五阶段: 大模型微调开发借助以大健康、新零售、新媒体领域构建适合当前领域大模型;
第六阶段: 以SD多模态大模型为主,搭建了文生图小程序案例;
第七阶段: 以大模型平台应用与开发为主,通过星火大模型,文心大模型等成熟大模型构建大模型行业应用。
👉学会后的收获:👈
• 基于大模型全栈工程实现(前端、后端、产品经理、设计、数据分析等),通过这门课可获得不同能力;
• 能够利用大模型解决相关实际项目需求: 大数据时代,越来越多的企业和机构需要处理海量数据,利用大模型技术可以更好地处理这些数据,提高数据分析和决策的准确性。因此,掌握大模型应用开发技能,可以让程序员更好地应对实际项目需求;
• 基于大模型和企业数据AI应用开发,实现大模型理论、掌握GPU算力、硬件、LangChain开发框架和项目实战技能, 学会Fine-tuning垂直训练大模型(数据准备、数据蒸馏、大模型部署)一站式掌握;
• 能够完成时下热门大模型垂直领域模型训练能力,提高程序员的编码能力: 大模型应用开发需要掌握机器学习算法、深度学习框架等技术,这些技术的掌握可以提高程序员的编码能力和分析能力,让程序员更加熟练地编写高质量的代码。
1.AI大模型学习路线图
2.100套AI大模型商业化落地方案
3.100集大模型视频教程
4.200本大模型PDF书籍
5.LLM面试题合集
6.AI产品经理资源合集
👉获取方式:
😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓