AI 工作流引擎设计:从编排到执行的可复用流水线实践
一、从脚本拼接到工作流引擎:AI 自动化的工程化跃迁
当 AI 能力从单次对话扩展到多步骤任务时,开发者往往会经历一个演进过程:最初用 Python 脚本串联多个 API 调用,后来发现脚本越来越长、逻辑越来越复杂,最终不得不重新设计一套可复用的编排系统。这个演进过程的背后,是 AI 工作流从"能用"到"可靠"的工程化跃迁。
AI 工作流的核心痛点集中在三个层面。第一,步骤间的状态传递:上一步的输出是下一步的输入,但模型输出格式不稳定、中间结果可能缺失,导致后续步骤频繁失败。第二,错误恢复与重试:某个步骤超时或返回异常结果时,是从头开始还是从失败步骤恢复?不同策略对成本和延迟的影响截然不同。第三,并行与串行的编排:某些步骤可以并行执行以降低延迟,但并行步骤之间的依赖关系管理增加了编排复杂度。
这些痛点的本质,是缺乏一套声明式的工作流定义与执行引擎。开发者应该描述"做什么"而非"怎么做",让引擎负责调度、重试和状态管理。
二、工作流引擎架构:DAG 驱动的任务调度模型
AI 工作流引擎的核心数据结构是有向无环图(DAG)。每个工作流由多个节点组成,节点之间的边定义了数据依赖关系。引擎根据依赖关系自动确定执行顺序,无依赖的节点并行执行。
flowchart TD A[输入节点: 用户需求] --> B[意图分析节点] B --> C[知识检索节点] B --> D[上下文构建节点] C --> E[信息融合节点] D --> E E --> F[内容生成节点] F --> G[质量校验节点] G -- 通过 --> H[输出节点: 最终结果] G -- 未通过 --> F style A fill:#e1f5fe style H fill:#e8f5e9 style G fill:#fff3e0DAG 执行引擎的核心机制
引擎的执行过程分为三个阶段:拓扑排序确定执行顺序、按层调度执行节点、状态回滚支持断点恢复。每个节点的执行结果持久化到状态存储中,当工作流因故障中断时,可以从最后一个成功节点恢复执行,而非从头开始。
节点类型与接口抽象
工作流中的节点分为四类:输入节点(接收外部数据)、处理节点(调用模型或工具)、条件节点(根据上一步结果选择分支)、输出节点(返回最终结果)。所有节点实现统一的execute接口,引擎不关心节点内部的具体实现。
三、生产级工作流引擎实现:声明式编排与容错执行
工作流定义 DSL
// workflow/types.ts // 工作流定义的类型系统:声明式描述工作流结构 // 引擎根据定义自动调度执行,开发者无需关心执行顺序 interface WorkflowDefinition { id: string; name: string; version: string; // 节点列表:每个节点声明自己的输入来源 nodes: WorkflowNode[]; // 全局配置:超时、重试策略、模型选择 config: WorkflowConfig; } interface WorkflowNode { id: string; type: 'input' | 'processor' | 'condition' | 'output'; // 声明依赖:本节点的输入来自哪些节点的输出 dependsOn: string[]; // 节点执行器:具体处理逻辑 executor: NodeExecutor; // 节点级别的重试配置 retry?: RetryPolicy; } interface RetryPolicy { maxAttempts: number; // 指数退避基础间隔(毫秒) baseDelay: number; // 可重试的错误类型 retryableErrors: string[]; } interface WorkflowConfig { // 全局超时时间 globalTimeout: number; // 最大并行度 maxConcurrency: number; // 状态持久化存储 stateStore: 'memory' | 'redis' | 'database'; } // 节点执行器的统一接口 interface NodeExecutor { execute(input: Record<string, unknown>): Promise<NodeResult>; } interface NodeResult { success: boolean; data: Record<string, unknown>; // Token 消耗统计,用于成本追踪 tokenUsage?: { prompt: number; completion: number }; }工作流执行引擎
// workflow/engine.ts // DAG 驱动的工作流执行引擎 // 核心职责:拓扑排序、并行调度、状态持久化、断点恢复 export class WorkflowEngine { private stateStore: StateStore; constructor(private definition: WorkflowDefinition) { this.stateStore = this.createStateStore(definition.config.stateStore); } async run(initialInput: Record<string, unknown>): Promise<WorkflowResult> { // 第一步:拓扑排序,确定执行层级 const layers = this.topologicalSort(); // 第二步:初始化工作流状态 const runId = this.generateRunId(); await this.stateStore.initRun(runId, initialInput); // 第三步:逐层执行,同层节点并行 for (const layer of layers) { const parallelNodes = layer.map((nodeId) => this.executeNode(runId, nodeId) ); // 同层节点并行执行,受 maxConcurrency 限制 const results = await this.runWithConcurrencyLimit( parallelNodes, this.definition.config.maxConcurrency ); // 检查是否有节点失败且不可重试 const failedNode = results.find((r) => !r.success); if (failedNode) { // 持久化失败状态,支持后续断点恢复 await this.stateStore.markRunFailed(runId, failedNode); return { success: false, error: failedNode.error }; } } // 第四步:收集输出节点的结果 const output = await this.stateStore.getRunOutput(runId); return { success: true, data: output }; } // 从指定节点恢复执行,跳过已成功的节点 async resume(runId: string): Promise<WorkflowResult> { const completedNodes = await this.stateStore.getCompletedNodes(runId); const layers = this.topologicalSort(); // 跳过已完成的层,从第一个未完成的层开始 for (const layer of layers) { const allCompleted = layer.every((id) => completedNodes.includes(id)); if (allCompleted) continue; const pendingNodes = layer.filter( (id) => !completedNodes.includes(id) ); const results = await Promise.all( pendingNodes.map((nodeId) => this.executeNode(runId, nodeId)) ); const failedNode = results.find((r) => !r.success); if (failedNode) { await this.stateStore.markRunFailed(runId, failedNode); return { success: false, error: failedNode.error }; } } const output = await this.stateStore.getRunOutput(runId); return { success: true, data: output }; } private async executeNode( runId: string, nodeId: string ): Promise<NodeResult> { const node = this.definition.nodes.find((n) => n.id === nodeId)!; const retryPolicy = node.retry ?? { maxAttempts: 1, baseDelay: 1000, retryableErrors: [] }; // 收集依赖节点的输出作为本节点输入 const dependencies = await this.stateStore.getNodeResults( runId, node.dependsOn ); let lastError: Error | null = null; // 带重试的执行循环 for (let attempt = 1; attempt <= retryPolicy.maxAttempts; attempt++) { try { const result = await node.executor.execute(dependencies); if (result.success) { // 持久化成功结果 await this.stateStore.saveNodeResult(runId, nodeId, result); return result; } // 执行成功但业务逻辑失败(如模型返回空结果) lastError = new Error(`节点 ${nodeId} 返回失败结果`); } catch (error) { lastError = error as Error; // 判断是否为可重试错误 const isRetryable = retryPolicy.retryableErrors.some( (code) => lastError!.message.includes(code) ); if (!isRetryable) break; } // 指数退避等待 const delay = retryPolicy.baseDelay * Math.pow(2, attempt - 1); await this.sleep(delay); } return { success: false, data: {}, error: lastError!.message }; } private topologicalSort(): string[][] { // Kahn 算法实现拓扑排序,返回按层分组的节点 ID // 同层节点无依赖关系,可以并行执行 const nodes = this.definition.nodes; const inDegree = new Map<string, number>(); const adjacency = new Map<string, string[]>(); for (const node of nodes) { inDegree.set(node.id, node.dependsOn.length); for (const dep of node.dependsOn) { if (!adjacency.has(dep)) adjacency.set(dep, []); adjacency.get(dep)!.push(node.id); } } const layers: string[][] = []; let queue = nodes .filter((n) => n.dependsOn.length === 0) .map((n) => n.id); while (queue.length > 0) { layers.push(queue); const nextQueue: string[] = []; for (const nodeId of queue) { const neighbors = adjacency.get(nodeId) ?? []; for (const neighbor of neighbors) { const degree = inDegree.get(neighbor)! - 1; inDegree.set(neighbor, degree); if (degree === 0) nextQueue.push(neighbor); } } queue = nextQueue; } return layers; } private async runWithConcurrencyLimit( tasks: Promise<NodeResult>[], limit: number ): Promise<NodeResult[]> { // 简化的并发限制实现 // 生产环境应使用 p-limit 等成熟库 const results: NodeResult[] = []; for (let i = 0; i < tasks.length; i += limit) { const batch = tasks.slice(i, i + limit); const batchResults = await Promise.all(batch); results.push(...batchResults); } return results; } private sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)); } private generateRunId(): string { return `run_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; } private createStateStore(type: string): StateStore { // 根据配置创建状态存储实例 return new MemoryStateStore(); } }四、工作流引擎的架构权衡:灵活性、性能与可靠性的博弈
声明式 vs 命令式编排:声明式 DAG 的优势在于自动推导执行顺序和并行度,但它也限制了灵活性——动态分支(如根据上一步结果决定执行哪些节点)需要通过条件节点模拟,增加了定义复杂度。对于逻辑高度动态的场景,命令式编排(如 LangChain 的 LCEL)可能更直观。但命令式编排的代价是,开发者需要自行处理并行和错误恢复。
状态持久化的开销:将每个节点的执行结果持久化到 Redis 或数据库,保证了断点恢复能力,但也增加了延迟。对于执行时间短、失败成本低的工作流(如内容生成),内存存储足够;对于执行时间长、失败成本高的工作流(如数据处理管线),持久化是必须的。
重试策略的精细度:简单的固定间隔重试容易实现,但在 API 限流场景下可能加剧问题。指数退避 + 抖动(Jitter)是更优的策略,它避免了多个工作流实例在同一时刻重试导致的"惊群效应"。
并行度的实际收益:理论上游离节点越多,并行收益越大。但在 AI 工作流中,并行节点通常都调用同一个 API,过高的并行度反而触发限流。maxConcurrency的设置需要根据 API 的限流策略来调整,而非简单地设为 CPU 核心数。
五、总结
AI 工作流引擎的价值,在于将复杂的编排逻辑从业务代码中剥离,让开发者专注于节点逻辑本身。落地路线如下:
第一,采用 DAG 作为工作流的核心数据结构。通过拓扑排序自动推导执行顺序,同层节点并行执行,最大化吞吐量。声明式定义让工作流结构一目了然。
第二,实现断点恢复机制。每个节点的执行结果持久化,工作流中断后可从最后一个成功节点恢复。对于耗时长的 AI 工作流,这一能力直接关系到成本控制。
第三,设计精细的重试策略。区分可重试错误与不可重试错误,使用指数退避 + 抖动避免惊群效应。重试策略应在节点级别可配置,不同节点面对不同的失败模式。
第四,根据实际场景选择状态存储。短时低价值工作流用内存存储,长时高价值工作流用持久化存储。不要为了可靠性而牺牲所有场景的响应速度。