news 2026/1/22 0:31:01

.NET+AI | Workflow | 工作流快速开始(2)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
.NET+AI | Workflow | 工作流快速开始(2)

Workflow 概览与核心概念

理解 MAF Workflow 架构并创建第一个工作流

📚 课程目标

本节课将带你快速入门MAF Workflow Orchestration (工作流编排),你将学习:

  • ✅ 理解 Workflow 在 AI 应用中的价值和定位

  • ✅ 掌握 Workflow 的核心构建块: Step (步骤)、Edge (边)、Executor (执行器)

  • ✅ 创建并运行第一个线性工作流

  • ✅ 掌握流式运行 (RunStreamingAsync) 机制

  • ✅ 理解工作流即 Agent (Workflow as Agent) 理念

📋 前置知识

在学习本课之前,建议先掌握:

  • 第4章 MAF Agent: 理解 Agent 基础概念、AIAgentAgentThread的使用

  • 第1章 MEAI(可选): 了解IChatClient接口有助于理解后续的 Agent 集成

如果你还不熟悉 MAF Agent 的基础概念,建议先学习第4章的内容。

🎯 什么是 Workflow Orchestration?

为什么需要工作流编排?

在企业级 AI 应用开发中,我们经常面临以下挑战:

单个 Agent 难以处理复杂任务: 一个 Agent 无法同时擅长需求分析、代码生成、测试等多个领域
业务逻辑与 AI 调用耦合: 复杂的条件判断、循环、并发控制分散在代码各处
流程难以可视化和维护: 多步骤的 AI 处理流程缺乏清晰的结构
缺乏统一的状态管理: 多个步骤之间的数据传递和状态共享容易出错

Workflow Orchestration (工作流编排)正是为了解决这些问题而生:

模块化设计: 将复杂任务拆分为独立的 Executor 和 Agent
清晰的流程定义: 使用 Builder API 构建可读、可维护的流程图
灵活的流程控制: 支持条件分支、循环迭代、并发执行等复杂模式
统一的状态管理: 内置WorkflowContext管理跨步骤的状态和数据
实时流式反馈: 通过事件机制实时监控工作流的执行进度

MAF Workflow 的定位

MAF Workflow位于应用层和 Agent 层之间,负责:

  • 📐编排多个 Agent: 决定 Agent 的执行顺序、条件和并发策略

  • 🔄管理数据流: 在 Agent 和 Executor 之间传递数据

  • 📊监控执行状态: 实时报告工作流的执行进度和结果

  • 🛡️错误处理: 统一处理执行过程中的异常和重试逻辑

🧩 Workflow 的核心构建块

Workflow 由以下三个核心概念组成:

1️⃣Executor (执行器)- 处理单元

Executor 是工作流的基本处理单元,负责执行具体的业务逻辑。

类型定义:

public abstract class Executor<TInput, TOutput> : Executor { public abstract ValueTask<TOutput> HandleAsync( TInput input, IWorkflowContext context, CancellationToken cancellationToken = default); }

特点:

  • 📥强类型输入:TInput定义接收的数据类型

  • 📤强类型输出:TOutput定义返回的数据类型

  • 🔧纯逻辑处理: 可以是数据转换、验证、格式化等任何操作

  • 🤖Agent 也是 Executor: AIAgent 可以直接作为 Executor 使用

示例用途:

  • 文本转换 (大写、反转、清理)

  • 数据验证 (格式检查、安全过滤)

  • 结果聚合 (合并多个输出)

  • AI 调用 (通过 Agent)

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

2️⃣Edge (边)- 数据流路径

Edge 定义了 Executor 之间的数据流动方向和连接关系。

类型:

  • 顺序边 (Sequential Edge):A → B- 数据从 A 流向 B

  • 条件边 (Conditional Edge):A → B or C- 根据条件选择路径

  • Fan-out 边:A → [B, C, D]- 一个输出分发给多个 Executor

  • Fan-in 边:[B, C, D] → E- 多个输出汇聚到一个 Executor

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

3️⃣Workflow (工作流)- 完整流程定义

Workflow 是由多个 Executor 通过 Edge 连接而成的完整处理流程。

构建方式:

// 使用 WorkflowBuilder 构建工作流 WorkflowBuilder builder = new(startExecutor); builder.AddEdge(executorA, executorB); builder.AddEdge(executorB, executorC); builder.WithOutputFrom(executorC); Workflow workflow = builder.Build();

执行方式:

  • 同步执行:RunAsync()- 等待完整结果

  • 流式执行:StreamAsync()- 实时接收事件流

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

核心概念关系图

🔧 环境准备

首先我们需要导入必要的依赖包和 Helper 类。

// 导入 MAF Workflow 核心包 #!import ../helper/MafHelper.cs using Microsoft.Agents.AI.Workflows; using Microsoft.Agents.AI; using Microsoft.Extensions.AI; using System.ComponentModel; Console.WriteLine("✅ 依赖包加载完成");

🚀 第一个工作流: 文本处理管道

让我们从一个简单的例子开始,构建一个文本处理工作流:

业务场景: 将用户输入的文本 → 转为大写 → 反转顺序

步骤 1: 定义第一个 Executor - 大写转换

我们将实现一个自定义 Executor 来处理文本大写转换。

using System.Threading; /// <summary> /// 大写转换执行器 - 将输入文本转换为大写 /// </summary> publicclassUppercaseExecutor : Executor<string, string> { public UppercaseExecutor() : base("UppercaseExecutor") { } /// <summary> /// 核心处理方法 - 将输入文本转为大写 /// </summary> public override ValueTask<string> HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken = default) { // 执行业务逻辑: 转为大写 string result = message.ToUpperInvariant(); // 返回处理结果 (会自动沿着 Edge 传递给下一个 Executor) return ValueTask.FromResult(result); } } Console.WriteLine("✅ UppercaseExecutor 定义完成");

代码解析:

🔵继承Executor<TInput, TOutput>:

  • TInput = string: 接收字符串输入

  • TOutput = string: 返回字符串输出

  • 构造函数传入"UppercaseExecutor"作为唯一标识符

🔵实现HandleAsync方法:

  • message: 接收上一个步骤传递的数据 (对于第一个 Executor,是工作流的输入)

  • context: 工作流上下文,用于访问共享状态、发布事件等 (后续课程详解)

  • cancellationToken: 用于取消操作

  • 返回值: 会自动作为消息沿着 Edge 传递给下一个 Executor

🔵业务逻辑:

  • 使用ToUpperInvariant()进行文化无关的大写转换

  • 返回ValueTask<string>(高性能异步模式)

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

步骤 2: 定义第二个 Executor - 文本反转

/// <summary> /// 文本反转执行器 - 将输入文本反转 /// </summary> publicclassReverseTextExecutor : Executor<string, string> { public ReverseTextExecutor() : base("ReverseTextExecutor") { } /// <summary> /// 核心处理方法 - 反转输入文本 /// </summary> public override ValueTask<string> HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken = default) { // 执行业务逻辑: 反转字符串 string result = string.Concat(message.Reverse()); // 返回处理结果 return ValueTask.FromResult(result); } } Console.WriteLine("✅ ReverseTextExecutor 定义完成");

代码解析:

  • 同样继承Executor<string, string>

  • 使用Reverse()+string.Concat()反转字符串

  • 返回结果会成为工作流的最终输出 (如果它是最后一个 Executor)

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

步骤 3: 使用 WorkflowBuilder 构建工作流

现在我们将两个 Executor 通过 Edge 连接起来,组成完整的工作流。

// 创建 Executor 实例 UppercaseExecutor uppercaseExecutor = new UppercaseExecutor(); ReverseTextExecutor reverseExecutor = new ReverseTextExecutor(); // 使用 WorkflowBuilder 构建工作流 WorkflowBuilder builder = new WorkflowBuilder(uppercaseExecutor); // 指定起始 Executor builder.AddEdge(uppercaseExecutor, reverseExecutor); // 添加边: uppercase → reverse builder.WithOutputFrom(reverseExecutor); // 指定输出来自 reverse // 构建最终的工作流对象 Workflow workflow = builder.Build(); Console.WriteLine("✅ 工作流构建完成"); new { 起始节点 = "UppercaseExecutor", 边 = "UppercaseExecutor → ReverseTextExecutor", 输出节点 = "ReverseTextExecutor" }.Display();

代码解析:

🔵new WorkflowBuilder(uppercaseExecutor):

  • 创建 WorkflowBuilder 并指定起始 Executor

  • 工作流的输入数据会首先传递给这个 Executor

🔵AddEdge(source, target):

  • 添加一条有向边(Edge),定义数据流动方向

  • source的输出会自动传递给target的输入

  • 类型检查:sourceTOutput必须匹配targetTInput

🔵WithOutputFrom(executor):

  • 指定工作流的最终输出来自哪个 Executor

  • 如果不调用此方法,默认输出来自最后一个 Executor

🔵Build():

  • 完成构建,返回一个不可变的Workflow对象

  • 此时会进行拓扑验证: 检查是否有循环依赖、孤立节点等

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

步骤 4: 同步执行工作流

现在让我们执行工作流并获取结果。

// 执行工作流 - 同步模式 string input = "Hello, World!"; Console.WriteLine($"📥 输入: {input}"); Console.WriteLine("⏱️ 开始执行工作流...\n"); awaitusing (Run run = await InProcessExecution.RunAsync(workflow, input)) { // 处理执行结果 Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); Console.WriteLine("📊 执行结果:\n"); foreach (WorkflowEvent evt in run.NewEvents) { if (evt is ExecutorCompletedEvent executorCompleted) { new { 执行器 = executorCompleted.ExecutorId, 输出数据 = executorCompleted.Data }.Display(); Console.WriteLine(); } } Console.WriteLine("✅ 工作流执行完成"); }

代码解析:

🔵InProcessExecution.RunAsync(workflow, input):

  • 当前进程内执行工作流 (同步等待完成)

  • input参数会传递给起始 Executor

  • 返回Run对象,包含执行结果和事件

🔵Run.NewEvents:

  • 获取工作流执行过程中产生的所有事件

  • 主要事件类型:

    • WorkflowStartedEvent: 工作流启动

    • ExecutorCompletedEvent: 某个 Executor 执行完成

    • WorkflowCompletedEvent: 工作流完成

🔵ExecutorCompletedEvent:

  • ExecutorId: 执行器的唯一标识符

  • Data: 执行器的输出数据 (object 类型,需要转换)

  • 事件按执行顺序排列

预期输出:

执行器: UppercaseExecutor 输出数据: HELLO, WORLD! 执行器: ReverseTextExecutor 输出数据: !DLROW ,OLLEH

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

💡 关键理解:

RunAsync阻塞式执行:

  • 等待整个工作流完成后才返回

  • 一次性获取所有事件

  • 适合短时间任务不需要实时反馈的场景

但在企业级应用中,我们经常需要:

  • ⏱️ 实时显示长时间运行任务的进度

  • 📊 向用户报告每个步骤的执行状态

  • 🛡️ 及时发现和处理中间步骤的错误

这时就需要使用流式执行 (Streaming Execution)模式。

🌊 流式执行: 实时监控工作流进度

流式执行允许我们在工作流运行过程中实时接收事件流,而不是等待全部完成。

对比:

特性

同步执行 (RunAsync)

流式执行 (StreamAsync)

执行模式

阻塞等待全部完成

返回异步流 (IAsyncEnumerable)

事件获取

一次性获取所有事件

实时逐个接收事件

适用场景

短时间任务、批处理

长时间任务、实时反馈

用户体验

等待后一次性显示结果

逐步显示进度

步骤 5: 使用 StreamAsync 实现流式执行

// 执行工作流 - 流式模式 string streamingInput = "Workflow Streaming!"; Console.WriteLine($"📥 输入: {streamingInput}"); Console.WriteLine("⏱️ 开始流式执行工作流...\n"); awaitusing (StreamingRun streamingRun = await InProcessExecution.StreamAsync(workflow, streamingInput)) { Console.WriteLine("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"); Console.WriteLine("📊 实时执行进度:\n"); // 实时监听事件流 awaitforeach (WorkflowEvent evt in streamingRun.WatchStreamAsync()) { if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine($"⚡ [{DateTime.Now:HH:mm:ss.fff}] {executorCompleted.ExecutorId} 完成"); new { 输出数据 = executorCompleted.Data }.Display(); Console.WriteLine(); } elseif (evt is WorkflowStartedEvent) { Console.WriteLine($"🚀 [{DateTime.Now:HH:mm:ss.fff}] 工作流启动\n"); } elseif (evt is WorkflowOutputEvent) { Console.WriteLine($"✅ [{DateTime.Now:HH:mm:ss.fff}] 工作流完成"); Console.WriteLine($"✅ 工作流输出:{evt.Data}"); } } }

代码解析:

🔵InProcessExecution.StreamAsync(workflow, input):

  • 启动流式执行,立即返回StreamingRun对象

  • 不会阻塞等待工作流完成

🔵streamingRun.WatchStreamAsync():

  • 返回IAsyncEnumerable<WorkflowEvent>异步流

  • 使用await foreach实时迭代事件

  • 每当一个 Executor 完成,就会产生一个新的ExecutorCompletedEvent

🔵时间戳DateTime.Now:

  • 演示事件是逐个实时产生

  • 在实际的长时间任务中,你会看到明显的时间间隔

🔵主要事件类型:

  • WorkflowStartedEvent: 工作流开始

  • ExecutorCompletedEvent: 单个 Executor 完成

  • WorkflowOutputEvent: 整个工作流完成

预期输出:

🚀 [14:23:45.123] 工作流启动 ⚡ [14:23:45.125] UppercaseExecutor 完成 输出数据: WORKFLOW STREAMING! ⚡ [14:23:45.127] ReverseTextExecutor 完成 输出数据: !GNIMAERTS WOLFKROW ✅ [14:23:45.128] 工作流完成

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

💡 流式执行的优势:

实时反馈: 用户可以立即看到每个步骤的进度
早期错误检测: 可以在中间步骤失败时立即处理
更好的用户体验: 避免长时间的"黑盒等待"
灵活的事件处理: 可以订阅特定类型的事件,实现日志、监控等功能

在后续课程中,我们会学习如何定义自定义事件来传递更丰富的业务信息。

⚡ 简化写法: 使用 Lambda 表达式创建 Executor

对于简单的转换逻辑,每次都定义完整的 Executor 类显得过于繁琐。MAF 提供了BindAsExecutor扩展方法,允许我们将普通函数绑定为 Executor。

步骤 6: 使用 Lambda 表达式重构工作流

// 使用 Lambda 表达式定义转换逻辑 Func<string, string> toUpperFunc = input => input.ToUpperInvariant(); Func<string, string> reverseFunc = input => string.Concat(input.Reverse()); // 将函数绑定为 Executor var upperExecutor = toUpperFunc.BindAsExecutor("UpperExecutor"); var reverseExecutor2 = reverseFunc.BindAsExecutor("ReverseExecutor"); // 构建工作流 (与之前相同的流程) WorkflowBuilder lambdaBuilder = new WorkflowBuilder(upperExecutor); lambdaBuilder.AddEdge(upperExecutor, reverseExecutor2); lambdaBuilder.WithOutputFrom(reverseExecutor2); Workflow lambdaWorkflow = lambdaBuilder.Build(); Console.WriteLine("✅ 使用 Lambda 表达式的工作流构建完成");

测试 Lambda 工作流

// 测试 Lambda 工作流 string lambdaInput = "Lambda Workflow!"; Console.WriteLine($"📥 输入: {lambdaInput}"); awaitusing (Run lambdaRun = await InProcessExecution.RunAsync(lambdaWorkflow, lambdaInput)) { Console.WriteLine("\n📊 执行结果:"); foreach (WorkflowEvent evt in lambdaRun.NewEvents) { if (evt is ExecutorCompletedEvent executorCompleted) { new { 执行器 = executorCompleted.ExecutorId, 输出 = executorCompleted.Data }.Display(); } } }

代码解析:

🔵BindAsExecutor(name):

  • Func<TInput, TOutput>绑定为Executor<TInput, TOutput>

  • 参数name: Executor 的唯一标识符

  • 限制: 只适用于同步、无副作用的纯函数转换

🔵适用场景:

  • ✅ 数据格式转换 (大写、小写、Trim)

  • ✅ 简单计算 (字符串拼接、数学运算)

  • ✅ 数据映射 (DTO 转换)

🔵不适用场景:

  • ❌ 需要访问IWorkflowContext的逻辑

  • ❌ 需要异步操作 (如数据库查询、API调用)

  • ❌ 复杂的状态管理

建议:

  • 简单转换用 Lambda

  • 复杂逻辑用完整的 Executor 类

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

📝 本课总结

恭喜! 你已经掌握了 MAF Workflow 的核心概念和基础用法:

✅ 核心收获

1️⃣理解 Workflow 的价值:

  • 模块化设计: 将复杂任务拆分为独立的 Executor

  • 清晰的流程定义: 使用 Builder API 构建可维护的流程

  • 统一的状态管理: 内置 WorkflowContext 管理状态

2️⃣掌握核心构建块:

  • Executor: 处理单元,强类型输入输出

  • Edge: 数据流路径,定义 Executor 之间的连接

  • Workflow: 完整流程,由 Executor 和 Edge 组成

3️⃣学会构建工作流:

  • 定义自定义 Executor (继承Executor<TInput, TOutput>)

  • 使用WorkflowBuilder连接 Executor

  • 使用BindAsExecutor简化简单逻辑

4️⃣掌握执行模式:

  • 同步执行:RunAsync()- 等待完整结果

  • 流式执行:StreamAsync()- 实时接收事件流

  • 理解ExecutorCompletedEvent等核心事件

5️⃣了解高级概念:

  • Workflow as Agent: 工作流可以被封装为 Agent

  • 为后续学习 Agent 集成、多轮对话打下基础

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

📚 下一步学习

在下一节课中,我们将学习:

  • 🔧 如何实现自定义 Executor 的高级功能

  • 📊 深入理解IWorkflowContext的使用

  • 🛡️ 错误处理与 Cancellation Token 管理

  • 🎯 企业级 Executor 的最佳实践

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

💡 思考题

  1. 场景: 你需要构建一个"文章处理工作流": 输入原始文章 → 去除特殊字符 → 提取关键词 → 生成摘要。请设计需要哪些 Executor,以及它们之间的连接关系。

  2. 对比: 什么时候应该使用RunAsync,什么时候应该使用StreamAsync? 请结合实际业务场景思考。

  3. 扩展: 如果要在工作流中加入"人工审核"步骤 (需要暂停等待人工输入),应该如何实现? (提示: 思考IWorkflowContext可能提供的能力)

在后续课程中,这些问题都会得到解答!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/20 0:21:31

PyTorch-2.x-Universal-Dev-v1.0环境部署教程:OpenCV-Python-headless应用解析

PyTorch-2.x-Universal-Dev-v1.0环境部署教程&#xff1a;OpenCV-Python-headless应用解析 1. 引言 随着深度学习项目的复杂度不断提升&#xff0c;构建一个稳定、高效且开箱即用的开发环境已成为提升研发效率的关键环节。PyTorch-2.x-Universal-Dev-v1.0 镜像正是为此而生—…

作者头像 李华
网站建设 2026/1/21 23:05:57

Qwen3-Embedding-4B调用报错?常见问题排查步骤详解

Qwen3-Embedding-4B调用报错&#xff1f;常见问题排查步骤详解 1. 背景与问题引入 在基于大模型的语义理解系统中&#xff0c;文本嵌入&#xff08;Text Embedding&#xff09;是实现检索、聚类、分类等任务的核心前置能力。Qwen3-Embedding-4B作为通义千问系列最新推出的中等…

作者头像 李华
网站建设 2026/1/21 22:30:28

Qwen3-14B模型监控方案:推理性能实时分析工具

Qwen3-14B模型监控方案&#xff1a;推理性能实时分析工具 你是不是也遇到过这样的场景&#xff1a;作为MLE&#xff08;机器学习工程师&#xff09;&#xff0c;手头要上线一个基于Qwen3-14B的大模型服务&#xff0c;但生产环境部署前必须做一轮完整的压力测试。可问题是——你…

作者头像 李华
网站建设 2026/1/20 0:18:56

YOLOE镜像使用心得:高效又省心的检测方案

YOLOE镜像使用心得&#xff1a;高效又省心的检测方案 在智能安防、工业质检和自动驾驶等实时视觉任务中&#xff0c;目标检测与实例分割模型正面临前所未有的挑战&#xff1a;不仅要识别预定义类别&#xff0c;还需应对开放世界中的未知物体。传统YOLO系列虽推理高效&#xff…

作者头像 李华
网站建设 2026/1/20 0:18:36

24l01话筒在无线麦克风中的实践应用

用nRF24L01打造高性能无线麦克风&#xff1a;从芯片原理到实战调优你有没有遇到过这样的场景&#xff1f;在小型演讲厅里&#xff0c;主持人刚开口&#xff0c;麦克风就“滋啦”一声爆出杂音&#xff1b;或者直播时延迟半拍&#xff0c;声音和口型对不上&#xff1b;更别提那些…

作者头像 李华