当 AI Agent 遇上工作流编排:微软 Agent Framework 的 Workflow 深度解析
"如果说单个 AI Agent 是一位专家,那么 Workflow 就是让这些专家协同作战的指挥官。"
引言:从单打独斗到团队协作
还记得你第一次使用 ChatGPT 的感觉吗?那种"哇,AI 真的能理解我"的惊喜。但很快你就会发现,当任务变得复杂时,单个 AI 对话就像是让一个人同时扮演厨师、服务员和收银员——理论上可行,实际上一团糟。
这就是为什么微软推出了 Agent Framework,而其中的 Workflow 功能,正是解决这个问题的杀手锏。今天,我们就来深入探讨这个让 AI Agent 从"单兵作战"升级为"特种部队"的技术。
一、Workflow 是什么?为什么我们需要它?
1.1 现实世界的痛点
想象一个场景:你需要构建一个智能客服系统,它要能够:
-
理解用户的问题(自然语言处理)
-
查询订单数据库(数据检索)
-
判断是否需要人工介入(决策逻辑)
-
生成友好的回复(内容生成)
如果用传统的单一 AI Agent 来处理,你会遇到什么问题?
-
上下文混乱:一个 Agent 要记住所有状态,容易"精神分裂"
-
职责不清:什么都做,往往什么都做不好
-
难以扩展:新增功能就像在意大利面代码里加调料
-
无法并行:明明可以同时做的事,却要排队等待
1.2 Workflow 的核心理念
Microsoft Agent Framework 的 Workflow 采用了一个优雅的设计哲学:
Workflow = Executors(执行器)+ Edges(边)+ Orchestration(编排)
这就像搭积木:
-
Executor:每个积木块,专注做一件事
-
Edge:积木块之间的连接方式
-
Workflow:最终搭建出的复杂结构
让我们看一个最简单的例子:
// 创建两个执行器
UppercaseExecutor uppercase = new();
ReverseTextExecutor reverse = new();// 用边连接它们
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);var workflow = builder.Build();// 执行:输入 "Hello" → 转大写 "HELLO" → 反转 "OLLEH"
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello");
看到了吗?每个 Executor 只做一件事,但通过 Edge 连接后,就能完成复杂的任务。这就是组合的力量。
二、Workflow 的核心组件深度剖析
2.1 Executor:工作流的基本单元
Executor 是 Workflow 的"原子",它的设计哲学是:单一职责,明确输入输出。
2.1.1 Executor 的三种形态
形态一:无返回值的 Executor
internal sealed class LoggerExecutor() : Executor<string>("Logger")
{public override ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default){Console.WriteLine($"[LOG] {message}");return default; // 不返回任何值}
}
形态二:有返回值的 Executor
internal sealed class UppercaseExecutor() : Executor<string, string>("Uppercase")
{public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default){return ValueTask.FromResult(message.ToUpperInvariant());}
}
形态三:AI Agent 作为 Executor
这是最有趣的部分!Agent Framework 允许你把任何 AI Agent 包装成 Executor:
var chatClient = new AzureOpenAIClient(endpoint, credential).GetChatClient(deploymentName).AsIChatClient();// 创建一个翻译 Agent
AIAgent frenchAgent = new ChatClientAgent(chatClient,"You are a translation assistant that translates to French."
);// Agent 可以直接用在 Workflow 中!
var workflow = new WorkflowBuilder(frenchAgent).AddEdge(frenchAgent, spanishAgent).AddEdge(spanishAgent, englishAgent).Build();
2.1.2 Executor 的生命周期
每个 Executor 都有完整的生命周期管理:
protected internal virtual ValueTask InitializeAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{// 初始化资源、加载配置等
}protected internal virtual ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{// 保存检查点前的准备工作
}protected internal virtual ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{// 从检查点恢复后的初始化
}
这种设计让 Executor 不仅仅是一个函数,而是一个有状态、可管理的组件。
2.2 Edge:数据流动的艺术
如果说 Executor 是节点,那么 Edge 就是连接这些节点的"血管"。Agent Framework 提供了三种 Edge 类型:
2.2.1 Direct Edge:一对一连接
最简单的边,数据从一个 Executor 流向另一个:
builder.AddEdge(sourceExecutor, targetExecutor);
但这还不够强大,我们可以加上条件判断:
builder.AddEdge<int>(source, target, condition: value => value > 0 // 只有正数才会传递
);
这就像给数据流加上了"阀门",只有满足条件的数据才能通过。
2.2.2 Fan-Out Edge:一对多广播
想象一个场景:你需要同时咨询三个专家的意见。Fan-Out 就是为此而生:
ChatClientAgent physicist = new(client, "Physics expert");
ChatClientAgent chemist = new(client, "Chemistry expert");
ChatClientAgent biologist = new(client, "Biology expert");builder.AddFanOutEdge(startExecutor, targets: [physicist, chemist, biologist]
);
所有专家会并行处理同一个问题,大大提升效率!
你还可以自定义分发策略:
builder.AddFanOutEdge<string>(source,partitioner: (message, count) => {// 根据消息内容决定发给哪些 Executorif (message.Contains("urgent"))return [0, 1]; // 发给前两个elsereturn [2]; // 只发给第三个},targets: [executor1, executor2, executor3]
);
2.2.3 Fan-In Edge:多对一汇聚
有了 Fan-Out,自然需要 Fan-In 来收集结果:
var aggregator = new ResultAggregatorExecutor();builder.AddFanInEdge(aggregator, sources: [physicist, chemist, biologist]
);
Aggregator 会等待所有源 Executor 完成后,再统一处理结果。这就是经典的 Map-Reduce 模式!
三、Workflow 的高级模式:从理论到实战
3.1 Sequential Pattern:流水线模式
这是最直观的模式,就像工厂流水线一样,每个环节依次处理:
var workflow = AgentWorkflowBuilder.BuildSequential("Translation Pipeline",GetTranslationAgent("French", client),GetTranslationAgent("Spanish", client),GetTranslationAgent("English", client)
);// 输入:"Hello"
// → 法语:"Bonjour"
// → 西班牙语:"Hola"
// → 英语:"Hello"
适用场景:
-
数据处理管道(清洗 → 转换 → 验证)
-
多步骤审批流程
-
渐进式内容生成
性能特点:
-
延迟:累加(每个步骤的延迟相加)
-
吞吐量:受限于最慢的环节
-
资源占用:低(同一时间只有一个 Executor 在工作)
3.2 Concurrent Pattern:并行处理模式
当多个任务互不依赖时,为什么要让它们排队?并行处理才是王道:
var workflow = AgentWorkflowBuilder.BuildConcurrent("Multi-Expert Consultation",agents: [physicistAgent, chemistAgent, biologistAgent],aggregator: results => {// 自定义聚合逻辑return results.SelectMany(list => list).OrderBy(msg => msg.Timestamp).ToList();}
);
实战案例:智能文档分析系统
假设你要分析一份技术文档,需要:
-
提取关键词(NLP Agent)
-
生成摘要(Summarization Agent)
-
检测技术栈(Tech Stack Detector Agent)
这三个任务完全独立,用并行模式可以将处理时间从 30 秒降到 10 秒!
var nlpAgent = new ChatClientAgent(client, "Extract keywords");
var summaryAgent = new ChatClientAgent(client, "Generate summary");
var techAgent = new ChatClientAgent(client, "Detect tech stack");var workflow = AgentWorkflowBuilder.BuildConcurrent("Document Analyzer",[nlpAgent, summaryAgent, techAgent]
);await using var run = await InProcessExecution.StreamAsync(workflow, new ChatMessage(ChatRole.User, documentContent)
);
性能对比:
| 模式 | 总耗时 | CPU 利用率 | 适用场景 |
|---|---|---|---|
| Sequential | 30s | 33% | 任务有依赖关系 |
| Concurrent | 10s | 90% | 任务相互独立 |
3.3 Handoffs Pattern:智能转接模式
这是最接近真实世界的模式。想象一个客服中心:
-
前台接待(Triage Agent)判断问题类型
-
技术支持(Tech Agent)处理技术问题
-
销售顾问(Sales Agent)处理购买咨询
-
必要时可以转回前台
var triageAgent = new ChatClientAgent(client,"Determine which specialist to route to. ALWAYS handoff.","triage_agent"
);var techAgent = new ChatClientAgent(client,"Handle technical support questions only.","tech_support"
);var salesAgent = new ChatClientAgent(client,"Handle sales and pricing questions only.","sales_agent"
);var workflow = AgentWorkflowBuilder.CreateHandoffBuilderWith(triageAgent).WithHandoffs(triageAgent, [techAgent, salesAgent]).WithHandoffs([techAgent, salesAgent], triageAgent).Build();
Handoff 的魔法:AI Tool Calling
Handoff 是如何实现的?答案是 AI Function Calling!
当你配置 Handoff 时,Framework 会自动为每个目标 Agent 生成一个 Tool:
// 自动生成的 Tool
{"name": "handoff_to_tech_support","description": "Handle technical support questions only.","parameters": { ... }
}
当 Triage Agent 判断需要技术支持时,它会"调用"这个 Tool,Framework 拦截这个调用,将对话转交给 Tech Agent。整个过程对用户来说是无缝的!
3.4 Group Chat Pattern:圆桌会议模式
如果说 Handoffs 是"点对点转接",那么 Group Chat 就是"所有人都在一个会议室":
var workflow = AgentWorkflowBuilder.CreateGroupChatBuilderWith(agents => new RoundRobinGroupChatManager(agents) { MaximumIterationCount = 5 }).AddParticipants(GetTranslationAgent("French", client),GetTranslationAgent("Spanish", client),GetTranslationAgent("English", client)).Build();
Group Chat Manager 的职责:
-
决定下一个发言的 Agent
-
控制对话轮次
-
判断何时结束讨论
你可以实现自定义的 Manager:
public class SmartGroupChatManager : GroupChatManager
{protected override async ValueTask<AIAgent?> SelectNextAgentAsync(IReadOnlyList<ChatMessage> history,CancellationToken cancellationToken){// 使用 AI 来决定下一个发言者var decision = await _decisionAgent.RunAsync($"Based on the conversation, who should speak next? {string.Join(", ", _agents.Select(a => a.Name))}");return _agents.FirstOrDefault(a => decision.Contains(a.Name, StringComparison.OrdinalIgnoreCase));}
}
四、Workflow 的状态管理:时间旅行不是梦
4.1 Checkpoint:工作流的"存档点"
想象你在玩一个复杂的 RPG 游戏,突然断电了。如果没有存档,你得从头开始。Workflow 的 Checkpoint 就是这样的"存档系统"。
var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();await using Checkpointed<StreamingRun> run = await InProcessExecution.StreamAsync(workflow, input, checkpointManager);await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
{if (evt is SuperStepCompletedEvent superStepEvt){// 每个 Super Step 完成后自动创建 CheckpointCheckpointInfo? checkpoint = superStepEvt.CompletionInfo!.Checkpoint;if (checkpoint is not null){checkpoints.Add(checkpoint);Console.WriteLine($"Checkpoint {checkpoints.Count} saved!");}}
}
Super Step 是什么?
Workflow 的执行不是连续的,而是分成一个个"超级步骤":
Super Step 1: [Executor A] 完成
Super Step 2: [Executor B, Executor C] 并行完成
Super Step 3: [Executor D] 完成
每个 Super Step 结束后,系统会自动保存状态。这样设计的好处:
-
粒度合适:不会太频繁(每条消息一个),也不会太稀疏
-
状态一致:Super Step 内的所有 Executor 要么全部完成,要么全部未完成
-
易于恢复:从任何 Checkpoint 恢复都能保证状态一致
4.2 时间旅行:回到过去
有了 Checkpoint,你可以做一些"魔法操作":
// 恢复到第 5 个 Checkpoint
CheckpointInfo savedCheckpoint = checkpoints[4];
await run.RestoreCheckpointAsync(savedCheckpoint);// 从这个点继续执行
await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync())
{// 继续处理...
}
实战场景:A/B 测试
假设你有一个复杂的 Workflow,前面 5 个步骤都一样,但第 6 步有两种不同的策略。你可以:
-
执行到第 5 步,保存 Checkpoint
-
用策略 A 完成剩余步骤,记录结果
-
恢复到第 5 步的 Checkpoint
-
用策略 B 完成剩余步骤,记录结果
-
对比两种策略的效果
这在传统编程中需要复杂的状态管理,但在 Workflow 中只需要几行代码!
4.3 Shared State:Executor 之间的"共享内存"
有时候,多个 Executor 需要共享一些数据。比如:
-
累加器(统计处理了多少条数据)
-
缓存(避免重复计算)
-
配置(所有 Executor 共享的参数)
public class CounterExecutor : StatefulExecutor<string>
{private int _count = 0;public CounterExecutor() : base("Counter", new StatefulExecutorOptions { StateScope = StateScope.Workflow // 工作流级别的状态}){ }public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default){_count++;Console.WriteLine($"Processed {_count} messages");// 状态会自动保存到 Checkpointawait context.SendMessageAsync($"Count: {_count}");}
}
状态作用域:
| 作用域 | 生命周期 | 适用场景 |
|---|---|---|
Executor | 单个 Executor 实例 | 临时缓存 |
Run | 单次 Workflow 执行 | 会话状态 |
Workflow | 整个 Workflow | 全局配置 |
五、人机协作:Human-in-the-Loop 模式
5.1 为什么需要人类介入?
AI 再强大,也有它的局限性:
-
关键决策:涉及法律、伦理的决定需要人类审核
-
异常处理:AI 遇到边界情况时需要人类指导
-
质量把关:重要内容发布前需要人工审查
Agent Framework 提供了优雅的 Request Port 机制来实现人机协作。
5.2 Request Port:工作流的"暂停按钮"
Request Port 就像是工作流中的一个"等待点",当执行到这里时,会暂停并向外部请求输入:
// 创建一个 Request Port
var humanInputPort = RequestPort.Create<NumberSignal, int>("HumanInput");// 在 Workflow 中使用
var workflow = new WorkflowBuilder(judgeExecutor).AddEdge(judgeExecutor, humanInputPort).AddEdge(humanInputPort, judgeExecutor).Build();
实战案例:数字猜谜游戏
让我们看一个完整的例子:
public class JudgeExecutor : Executor<int, NumberSignal>
{private const int TargetNumber = 42;public override ValueTask<NumberSignal> HandleAsync(int guess, IWorkflowContext context, CancellationToken cancellationToken = default){if (guess == TargetNumber){await context.YieldOutputAsync("Correct! You win!");return ValueTask.FromResult(NumberSignal.Correct);}else if (guess > TargetNumber){return ValueTask.FromResult(NumberSignal.Above);}else{return ValueTask.FromResult(NumberSignal.Below);}}
}// 执行 Workflow
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init
);await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{if (evt is RequestInfoEvent requestEvt){// Workflow 请求人类输入Console.Write("Enter your guess: ");int guess = int.Parse(Console.ReadLine()!);// 发送响应await run.SendResponseAsync(requestEvt.Request.CreateResponse(guess));}else if (evt is WorkflowOutputEvent outputEvt){Console.WriteLine(outputEvt.Data);break;}
}
交互流程:
1. Workflow 启动 → JudgeExecutor 发送 Init 信号
2. 到达 Request Port → 触发 RequestInfoEvent
3. 外部程序接收事件 → 提示用户输入
4. 用户输入 50 → 发送 ExternalResponse
5. JudgeExecutor 判断 → 返回 Above 信号
6. 循环直到猜对...
5.3 高级场景:审批流程
在企业应用中,审批流程是典型的人机协作场景:
public class ApprovalWorkflow
{public static Workflow Build(){var documentProcessor = new DocumentProcessorExecutor();var aiReviewer = new AIReviewerAgent(client);var humanApprovalPort = RequestPort.Create<ReviewResult, ApprovalDecision>("HumanApproval");var publisher = new PublisherExecutor();return new WorkflowBuilder(documentProcessor).AddEdge(documentProcessor, aiReviewer).AddEdge<ReviewResult>(aiReviewer, humanApprovalPort,condition: result => result.Confidence < 0.9 // 置信度低时需要人工审核).AddEdge<ReviewResult>(aiReviewer,publisher,condition: result => result.Confidence >= 0.9 // 置信度高时直接发布).AddEdge(humanApprovalPort, publisher).Build();}
}
这个 Workflow 实现了"智能分流":
-
AI 有把握的内容(置信度 ≥ 0.9):自动发布
-
AI 不确定的内容(置信度 < 0.9):人工审核
这样既保证了质量,又提高了效率!
六、声明式 Workflow:低代码的未来
6.1 从命令式到声明式
前面我们看到的都是命令式的 Workflow 定义:
var builder = new WorkflowBuilder(start);
builder.AddEdge(start, middle);
builder.AddEdge(middle, end);
var workflow = builder.Build();
但对于非开发人员,这还是太复杂了。Agent Framework 提供了声明式 Workflow,用 JSON 配置就能定义复杂的流程!
6.2 声明式 Workflow 示例
{"name": "CustomerServiceWorkflow","description": "Intelligent customer service automation","actions": [{"id": "start","type": "Question","properties": {"prompt": "How can I help you today?"}},{"id": "classify","type": "InvokeAzureAgent","properties": {"agentId": "classifier-agent","input": "{{start.response}}"}},{"id": "route","type": "ConditionGroup","conditions": [{"condition": "{{classify.category}} == 'technical'","next": "tech-support"},{"condition": "{{classify.category}} == 'billing'","next": "billing-agent"}]},{"id": "tech-support","type": "InvokeAzureAgent","properties": {"agentId": "tech-support-agent"}},{"id": "billing-agent","type": "InvokeAzureAgent","properties": {"agentId": "billing-agent"}}]
}
6.3 声明式 Workflow 的优势
1. 可视化设计
配合可视化工具,业务人员可以像画流程图一样设计 Workflow:
[用户提问] → [AI分类] → [条件判断]├─ 技术问题 → [技术支持Agent]└─ 账单问题 → [账单Agent]
2. 动态加载
声明式 Workflow 可以在运行时加载和修改:
var workflowJson = await File.ReadAllTextAsync("workflow.json");
var workflow = await DeclarativeWorkflowBuilder.BuildFromJsonAsync(workflowJson);// 无需重新编译,直接执行新的 Workflow
await InProcessExecution.RunAsync(workflow, input);
3. 版本管理
Workflow 定义存储为 JSON,可以轻松进行版本控制:
git diff workflow-v1.json workflow-v2.json
4. 跨平台共享
同一个 Workflow 定义可以在不同的平台上执行:
-
.NET 应用
-
Azure Functions
-
Power Automate(未来可能支持)
6.4 声明式 Workflow 的内置 Actions
Agent Framework 提供了丰富的内置 Actions:
状态管理类:
-
SetVariable:设置变量 -
SetMultipleVariables:批量设置变量 -
ClearAllVariables:清空所有变量
对话管理类:
-
CreateConversation:创建新对话 -
AddConversationMessage:添加消息 -
RetrieveConversationMessages:检索历史消息
控制流类:
-
ConditionGroup:条件分支 -
Foreach:循环遍历 -
GotoAction:跳转到指定步骤
AI 集成类:
-
InvokeAzureAgent:调用 Azure AI Agent -
Question:请求用户输入
数据处理类:
-
ParseValue:解析数据 -
EditTableV2:编辑表格数据
这些 Actions 可以像乐高积木一样组合,构建出复杂的业务逻辑!
七、性能优化与最佳实践
7.1 并发控制:不是越多越好
虽然并发能提升性能,但也要注意资源限制:
// ❌ 不好的做法:无限制并发
var workflow = new WorkflowBuilder(start);
for (int i = 0; i < 1000; i++)
{var agent = new ChatClientAgent(client, $"Agent {i}");builder.AddFanOutEdge(start, agent);
}// ✅ 好的做法:分批处理
var batchSize = 10;
var batches = agents.Chunk(batchSize);foreach (var batch in batches)
{var batchWorkflow = AgentWorkflowBuilder.BuildConcurrent(batch);// 执行批次...
}
7.2 状态管理:选择合适的作用域
// ❌ 不好的做法:所有状态都用 Workflow 级别
public class MyExecutor : StatefulExecutor<string>
{public MyExecutor() : base("MyExecutor", new StatefulExecutorOptions { StateScope = StateScope.Workflow // 会一直占用内存}){ }
}// ✅ 好的做法:根据需要选择作用域
public class MyExecutor : StatefulExecutor<string>
{public MyExecutor() : base("MyExecutor", new StatefulExecutorOptions { StateScope = StateScope.Run // 执行完成后自动清理}){ }
}
7.3 Checkpoint 策略:平衡性能与可靠性
// 策略 1:每个 Super Step 都保存(默认)
// 优点:恢复粒度细
// 缺点:I/O 开销大// 策略 2:只在关键节点保存
var checkpointManager = new SelectiveCheckpointManager(shouldCheckpoint: (stepInfo) => stepInfo.ExecutorIds.Any(id => id.StartsWith("Critical"))
);// 策略 3:定时保存
var checkpointManager = new TimedCheckpointManager(interval: TimeSpan.FromMinutes(5)
);
7.4 错误处理:优雅降级
public class ResilientExecutor : Executor<string, string>
{private readonly int _maxRetries = 3;public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default){for (int i = 0; i < _maxRetries; i++){try{return await ProcessAsync(message, cancellationToken);}catch (Exception ex) when (i < _maxRetries - 1){await context.AddEventAsync(new WorkflowWarningEvent($"Retry {i + 1}/{_maxRetries}: {ex.Message}"),cancellationToken);await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, i)), cancellationToken);}}// 最后一次失败,返回降级结果return "Service temporarily unavailable. Please try again later.";}
}
八、实战案例:构建智能文档处理系统
让我们通过一个完整的案例,把前面学到的知识串起来。
8.1 需求分析
假设我们要构建一个智能文档处理系统,功能包括:
-
文档上传:用户上传 PDF/Word 文档
-
内容提取:提取文本、图片、表格
- 多维度分析:
-
情感分析(正面/负面/中性)
-
关键词提取
-
摘要生成
-
技术栈识别(如果是技术文档)
-
-
质量评估:AI 评估文档质量
-
人工审核:质量低于阈值时需要人工审核
-
结果输出:生成分析报告
8.2 架构设计
[文档上传] ↓
[内容提取Executor]↓
[Fan-Out: 并行分析]├─ [情感分析Agent]├─ [关键词提取Agent]├─ [摘要生成Agent]└─ [技术栈识别Agent]↓
[Fan-In: 结果汇总Executor]↓
[质量评估Agent]↓
[条件分支]├─ 质量高 → [自动发布Executor]└─ 质量低 → [人工审核Port] → [发布Executor]
8.3 代码实现
Step 1: 定义 Executors
// 内容提取 Executor
public class ContentExtractorExecutor : Executor<DocumentUpload, ExtractedContent>
{public ContentExtractorExecutor() : base("ContentExtractor") { }public override async ValueTask<ExtractedContent> HandleAsync(DocumentUpload upload, IWorkflowContext context, CancellationToken cancellationToken = default){// 使用 PDF 库提取内容var content = await ExtractContentFromDocument(upload.FilePath);await context.AddEventAsync(new WorkflowOutputEvent($"Extracted {content.Text.Length} characters"),cancellationToken);return content;}
}// 结果汇总 Executor
public class ResultAggregatorExecutor : Executor<AnalysisResult>
{private readonly List<AnalysisResult> _results = new();private const int ExpectedResultCount = 4; // 4 个分析 Agentpublic ResultAggregatorExecutor() : base("ResultAggregator") { }public override async ValueTask HandleAsync(AnalysisResult result, IWorkflowContext context, CancellationToken cancellationToken = default){_results.Add(result);if (_results.Count == ExpectedResultCount){var aggregated = new AggregatedAnalysis{Sentiment = _results.First(r => r.Type == "Sentiment").Value,Keywords = _results.First(r => r.Type == "Keywords").Value,Summary = _results.First(r => r.Type == "Summary").Value,TechStack = _results.First(r => r.Type == "TechStack").Value};await context.SendMessageAsync(aggregated, cancellationToken: cancellationToken);}}
}// 发布 Executor
public class PublisherExecutor : Executor<PublishRequest, PublishResult>
{public PublisherExecutor() : base("Publisher") { }public override async ValueTask<PublishResult> HandleAsync(PublishRequest request, IWorkflowContext context, CancellationToken cancellationToken = default){// 发布到数据库或文件系统var result = await PublishToStorage(request);await context.YieldOutputAsync(result, cancellationToken);return result;}
}
Step 2: 创建 AI Agents
public class DocumentAnalysisAgents
{private readonly IChatClient _client;public DocumentAnalysisAgents(IChatClient client){_client = client;}public ChatClientAgent CreateSentimentAgent() => new(_client,"""You are a sentiment analysis expert. Analyze the provided text and return:- Overall sentiment (Positive/Negative/Neutral)- Confidence score (0-1)- Key phrases that influenced the sentimentReturn your analysis in JSON format.""","sentiment_analyzer");public ChatClientAgent CreateKeywordAgent() => new(_client,"""You are a keyword extraction expert. Extract the most important keywords and phrases.Return top 10 keywords ranked by importance in JSON format.""","keyword_extractor");public ChatClientAgent CreateSummaryAgent() => new(_client,"""You are a summarization expert. Create a concise summary (max 200 words) that captures the main points of the document.""","summarizer");public ChatClientAgent CreateTechStackAgent() => new(_client,"""You are a technology stack detector. Identify programming languages, frameworks,databases, and tools mentioned in the document. Return as a structured list.""","tech_detector");public ChatClientAgent CreateQualityAgent() => new(_client,"""You are a document quality assessor. Evaluate the document based on:- Clarity and coherence- Technical accuracy- Completeness- Professional writing qualityReturn a quality score (0-100) and detailed feedback in JSON format.""","quality_assessor");
}
Step 3: 构建 Workflow
public class DocumentProcessingWorkflow
{public static Workflow Build(IChatClient client){var agents = new DocumentAnalysisAgents(client);// 创建 Executorsvar extractor = new ContentExtractorExecutor();var sentimentAgent = agents.CreateSentimentAgent();var keywordAgent = agents.CreateKeywordAgent();var summaryAgent = agents.CreateSummaryAgent();var techAgent = agents.CreateTechStackAgent();var aggregator = new ResultAggregatorExecutor();var qualityAgent = agents.CreateQualityAgent();var humanReviewPort = RequestPort.Create<QualityReport, ReviewDecision>("HumanReview");var publisher = new PublisherExecutor();// 构建 Workflowvar builder = new WorkflowBuilder(extractor);// Fan-Out: 并行分析builder.AddFanOutEdge(extractor,targets: [sentimentAgent, keywordAgent, summaryAgent, techAgent]);// 所有分析结果汇总builder.AddFanInEdge(aggregator,sources: [sentimentAgent, keywordAgent, summaryAgent, techAgent]);// 质量评估builder.AddEdge(aggregator, qualityAgent);// 条件分支:高质量直接发布,低质量需要人工审核builder.AddEdge<QualityReport>(qualityAgent,publisher,condition: report => report.Score >= 80);builder.AddEdge<QualityReport>(qualityAgent,humanReviewPort,condition: report => report.Score < 80);builder.AddEdge(humanReviewPort, publisher);return builder.WithName("Intelligent Document Processing").WithDescription("Automated document analysis with human-in-the-loop quality control").WithOutputFrom(publisher).Build();}
}
Step 4: 执行 Workflow
public class DocumentProcessingService
{private readonly Workflow _workflow;private readonly CheckpointManager _checkpointManager;public DocumentProcessingService(IChatClient client){_workflow = DocumentProcessingWorkflow.Build(client);_checkpointManager = CheckpointManager.Default;}public async Task<PublishResult> ProcessDocumentAsync(string filePath,CancellationToken cancellationToken = default){var upload = new DocumentUpload { FilePath = filePath };await using var run = await InProcessExecution.StreamAsync(_workflow,upload,_checkpointManager);PublishResult? result = null;await foreach (WorkflowEvent evt in run.Run.WatchStreamAsync()){switch (evt){case ExecutorCompletedEvent completed:Console.WriteLine($"✓ {completed.ExecutorId} completed");break;case SuperStepCompletedEvent superStep:Console.WriteLine($"⚡ Super Step {superStep.CompletionInfo!.SuperStepId} completed");if (superStep.CompletionInfo.Checkpoint is not null){Console.WriteLine($"💾 Checkpoint saved");}break;case RequestInfoEvent request:// 需要人工审核var decision = await RequestHumanReviewAsync(request.Request.DataAs<QualityReport>());await run.Run.SendResponseAsync(request.Request.CreateResponse(decision));break;case WorkflowOutputEvent output:result = output.As<PublishResult>();Console.WriteLine($"✅ Document published: {result!.Url}");break;case WorkflowErrorEvent error:Console.WriteLine($"❌ Error: {error.Message}");throw new Exception(error.Message);}}return result ?? throw new InvalidOperationException("Workflow did not produce output");}private async Task<ReviewDecision> RequestHumanReviewAsync(QualityReport report){Console.WriteLine($"\n⚠️ Human review required!");Console.WriteLine($"Quality Score: {report.Score}/100");Console.WriteLine($"Feedback: {report.Feedback}");Console.Write("Approve? (y/n): ");var input = Console.ReadLine();return new ReviewDecision{Approved = input?.ToLower() == "y",ReviewerComments = input?.ToLower() == "y" ? "Approved by reviewer" : "Rejected - needs improvement"};}
}
8.4 性能分析
让我们对比一下不同实现方式的性能:
方案 1:顺序执行
内容提取(2s) → 情感分析(3s) → 关键词提取(3s) → 摘要生成(4s) → 技术栈识别(3s) → 质量评估(2s)
总耗时:17 秒
方案 2:并行分析(我们的实现)
内容提取(2s) → [并行: 情感(3s), 关键词(3s), 摘要(4s), 技术栈(3s)] → 质量评估(2s)
总耗时:8 秒 (节省 53%)
方案 3:完全并行(包括质量评估)
内容提取(2s) → [并行: 所有分析 + 质量评估]
总耗时:6 秒 (节省 65%)
但可能导致质量评估不准确,因为它需要完整的分析结果
8.5 扩展性考虑
这个系统可以轻松扩展:
1. 添加新的分析维度
var plagiarismAgent = new ChatClientAgent(client, "Detect plagiarism");
builder.AddFanOutEdge(extractor, [/* existing agents */, plagiarismAgent]);
builder.AddFanInEdge(aggregator, [/* existing agents */, plagiarismAgent]);
2. 支持多语言
var languageDetector = new LanguageDetectorExecutor();
builder.AddEdge(extractor, languageDetector);// 根据语言选择不同的分析 Agent
builder.AddEdge<DetectedLanguage>(languageDetector,englishAnalysisAgent,condition: lang => lang.Code == "en"
);
builder.AddEdge<DetectedLanguage>(languageDetector,chineseAnalysisAgent,condition: lang => lang.Code == "zh"
);
3. 集成外部服务
public class ExternalAPIExecutor : Executor<string, APIResult>
{private readonly HttpClient _httpClient;public override async ValueTask<APIResult> HandleAsync(string data, IWorkflowContext context, CancellationToken cancellationToken = default){var response = await _httpClient.PostAsJsonAsync("https://api.example.com/analyze",data,cancellationToken);return await response.Content.ReadFromJsonAsync<APIResult>(cancellationToken);}
}
九、可观测性:让 Workflow 透明化
9.1 内置的事件系统
Agent Framework 提供了丰富的事件类型:
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{switch (evt){case WorkflowStartedEvent started:Console.WriteLine($"🚀 Workflow started: {started.WorkflowId}");break;case ExecutorInvokedEvent invoked:Console.WriteLine($"▶️ Invoking {invoked.ExecutorId}");break;case ExecutorCompletedEvent completed:Console.WriteLine($"✅ {completed.ExecutorId} completed in {completed.Duration}ms");break;case ExecutorFailedEvent failed:Console.WriteLine($"❌ {failed.ExecutorId} failed: {failed.Exception.Message}");break;case SuperStepStartedEvent stepStarted:Console.WriteLine($"⚡ Super Step {stepStarted.StartInfo.SuperStepId} started");break;case SuperStepCompletedEvent stepCompleted:Console.WriteLine($"⚡ Super Step completed with {stepCompleted.CompletionInfo!.ExecutorCount} executors");break;case AgentRunUpdateEvent agentUpdate:Console.Write(agentUpdate.Update.Text); // 实时流式输出break;case WorkflowOutputEvent output:Console.WriteLine($"📤 Output: {output.Data}");break;}
}
9.2 集成 OpenTelemetry
Agent Framework 原生支持 OpenTelemetry:
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;var tracerProvider = Sdk.CreateTracerProviderBuilder().SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("DocumentProcessingService")).AddSource("Microsoft.Agents.AI.Workflows").AddConsoleExporter().AddOtlpExporter(options =>{options.Endpoint = new Uri("http://localhost:4317");}).Build();// 执行 Workflow,所有操作都会自动记录到 OpenTelemetry
await using var run = await InProcessExecution.RunAsync(workflow, input);
9.3 自定义指标收集
public class MetricsCollectorExecutor : Executor<object>
{private static readonly Counter<long> s_messageCounter = Meter.CreateCounter<long>("workflow.messages.processed");private static readonly Histogram<double> s_processingTime = Meter.CreateHistogram<double>("workflow.processing.duration");public override async ValueTask HandleAsync(object message, IWorkflowContext context, CancellationToken cancellationToken = default){var stopwatch = Stopwatch.StartNew();try{await ProcessMessageAsync(message, cancellationToken);s_messageCounter.Add(1, new KeyValuePair<string, object?>("status", "success"));}catch (Exception ex){s_messageCounter.Add(1, new KeyValuePair<string, object?>("status", "failure"));throw;}finally{stopwatch.Stop();s_processingTime.Record(stopwatch.ElapsedMilliseconds);}}
}
十、Workflow 的高级特性与技巧
10.1 子工作流:模块化的艺术
当 Workflow 变得复杂时,我们需要将其拆分成更小的、可复用的单元。这就是子工作流的用武之地。
10.1.1 为什么需要子工作流?
想象你在构建一个电商系统,订单处理流程可能包括:
-
库存检查
-
支付处理
-
物流安排
-
通知发送
每个环节本身就是一个复杂的流程。如果全部写在一个 Workflow 里,会变成"意大利面代码"。
10.1.2 子工作流的实现
// 定义一个支付处理子工作流
public class PaymentWorkflow
{public static Workflow Build(){var validator = new PaymentValidatorExecutor();var processor = new PaymentProcessorExecutor();var notifier = new PaymentNotifierExecutor();return new WorkflowBuilder(validator).AddEdge(validator, processor).AddEdge(processor, notifier).WithOutputFrom(notifier).Build();}
}// 在主工作流中使用子工作流
public class OrderWorkflow
{public static Workflow Build(){var inventoryCheck = new InventoryCheckExecutor();// 将子工作流包装成 Executorvar paymentWorkflow = new WorkflowHostExecutor(PaymentWorkflow.Build(),"PaymentSubWorkflow");var shipping = new ShippingExecutor();return new WorkflowBuilder(inventoryCheck).AddEdge(inventoryCheck, paymentWorkflow).AddEdge(paymentWorkflow, shipping).WithOutputFrom(shipping).Build();}
}
子工作流的优势:
-
模块化:每个子工作流可以独立开发和测试
-
复用性:支付流程可以在多个地方使用
-
可维护性:修改支付逻辑不影响其他部分
-
可测试性:可以单独测试子工作流
10.1.3 子工作流的通信
子工作流不是黑盒,它可以与父工作流通信:
public class SubworkflowWithEventsExecutor : Executor<OrderData>
{private readonly Workflow _subworkflow;public override async ValueTask HandleAsync(OrderData order, IWorkflowContext context, CancellationToken cancellationToken = default){await using var subRun = await InProcessExecution.StreamAsync(_subworkflow, order);await foreach (WorkflowEvent evt in subRun.WatchStreamAsync()){// 将子工作流的事件转发到父工作流if (evt is SubworkflowWarningEvent warning){await context.AddEventAsync(new WorkflowWarningEvent($"[SubWorkflow] {warning.Message}"),cancellationToken);}else if (evt is WorkflowOutputEvent output){await context.SendMessageAsync(output.Data, cancellationToken: cancellationToken);}}}
}
10.2 动态工作流:运行时构建
有时候,Workflow 的结构需要根据运行时的数据来决定。比如:
-
根据用户权限决定审批流程
-
根据订单金额决定是否需要额外验证
-
根据文档类型选择不同的处理流程
public class DynamicWorkflowBuilder
{public static Workflow BuildForUser(User user, IChatClient client){var start = new StartExecutor();var builder = new WorkflowBuilder(start);// 根据用户角色动态添加步骤if (user.Role == "Admin"){var adminAgent = new ChatClientAgent(client, "Admin assistant");builder.AddEdge(start, adminAgent);}else if (user.Role == "Manager"){var managerAgent = new ChatClientAgent(client, "Manager assistant");var approvalPort = RequestPort.Create<ApprovalRequest, bool>("ManagerApproval");builder.AddEdge(start, managerAgent);builder.AddEdge(managerAgent, approvalPort);}else{var basicAgent = new ChatClientAgent(client, "Basic assistant");builder.AddEdge(start, basicAgent);}return builder.Build();}
}
10.3 循环与迭代
有些场景需要重复执行某些步骤,直到满足条件为止。
10.3.1 简单循环
public class LoopWorkflow
{public static Workflow Build(){var processor = new DataProcessorExecutor();var validator = new ValidatorExecutor();var builder = new WorkflowBuilder(processor);// 如果验证失败,回到处理器重新处理builder.AddEdge<ValidationResult>(validator,processor,condition: result => !result.IsValid);// 如果验证成功,输出结果builder.AddEdge<ValidationResult>(validator,new OutputExecutor(),condition: result => result.IsValid);return builder.Build();}
}
10.3.2 带计数器的循环
public class RetryExecutor : StatefulExecutor<string, string>
{private int _attemptCount = 0;private const int MaxAttempts = 3;public RetryExecutor() : base("RetryExecutor", new StatefulExecutorOptions { StateScope = StateScope.Run }){ }public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default){_attemptCount++;try{return await ProcessWithRetry(message, cancellationToken);}catch (Exception ex) when (_attemptCount < MaxAttempts){await context.AddEventAsync(new WorkflowWarningEvent($"Attempt {_attemptCount} failed, retrying..."),cancellationToken);// 发送消息给自己,触发重试await context.SendMessageAsync(message, cancellationToken: cancellationToken);return string.Empty; // 临时返回}catch (Exception ex){throw new InvalidOperationException($"Failed after {MaxAttempts} attempts", ex);}}
}
10.4 条件路由的高级用法
10.4.1 Switch-Case 模式
public class RouterExecutor : Executor<Message, RoutingDecision>
{public override ValueTask<RoutingDecision> HandleAsync(Message message, IWorkflowContext context, CancellationToken cancellationToken = default){var decision = message.Type switch{"urgent" => new RoutingDecision { Target = "UrgentHandler", Priority = 1 },"normal" => new RoutingDecision { Target = "NormalHandler", Priority = 2 },"low" => new RoutingDecision { Target = "LowPriorityHandler", Priority = 3 },_ => new RoutingDecision { Target = "DefaultHandler", Priority = 4 }};return ValueTask.FromResult(decision);}
}// 在 WorkflowBuilder 中使用
var router = new RouterExecutor();
var urgentHandler = new UrgentHandlerExecutor();
var normalHandler = new NormalHandlerExecutor();
var lowHandler = new LowPriorityHandlerExecutor();builder.AddEdge<RoutingDecision>(router, urgentHandler, condition: d => d.Target == "UrgentHandler");
builder.AddEdge<RoutingDecision>(router, normalHandler, condition: d => d.Target == "NormalHandler");
builder.AddEdge<RoutingDecision>(router, lowHandler, condition: d => d.Target == "LowPriorityHandler");
10.4.2 多条件选择
有时候一个消息需要同时发送到多个目标:
// 使用 Fan-Out 的 partitioner 实现多选
builder.AddFanOutEdge<Message>(source,partitioner: (message, count) =>{var targets = new List<int>();if (message.RequiresLogging)targets.Add(0); // Loggerif (message.RequiresNotification)targets.Add(1); // Notifierif (message.RequiresArchiving)targets.Add(2); // Archiverreturn targets;},targets: [logger, notifier, archiver]
);
十一、生产环境部署考虑
11.1 容错与恢复
11.1.1 Checkpoint 持久化
在生产环境中,内存中的 Checkpoint 是不够的,我们需要持久化存储:
public class DatabaseCheckpointStore : ICheckpointStore
{private readonly DbContext _dbContext;public async Task SaveCheckpointAsync(string checkpointId, byte[] data, CancellationToken cancellationToken){var checkpoint = new CheckpointEntity{Id = checkpointId,Data = data,CreatedAt = DateTime.UtcNow};_dbContext.Checkpoints.Add(checkpoint);await _dbContext.SaveChangesAsync(cancellationToken);}public async Task<byte[]?> LoadCheckpointAsync(string checkpointId, CancellationToken cancellationToken){var checkpoint = await _dbContext.Checkpoints.FirstOrDefaultAsync(c => c.Id == checkpointId, cancellationToken);return checkpoint?.Data;}
}// 使用自定义的 Checkpoint Store
var checkpointManager = new CheckpointManager(new DatabaseCheckpointStore(dbContext)
);
11.1.2 分布式 Checkpoint
对于分布式系统,可以使用 Redis 或 Azure Blob Storage:
public class RedisCheckpointStore : ICheckpointStore
{private readonly IConnectionMultiplexer _redis;public async Task SaveCheckpointAsync(string checkpointId, byte[] data, CancellationToken cancellationToken){var db = _redis.GetDatabase();await db.StringSetAsync($"checkpoint:{checkpointId}", data,expiry: TimeSpan.FromDays(7) // 7 天后自动过期);}public async Task<byte[]?> LoadCheckpointAsync(string checkpointId, CancellationToken cancellationToken){var db = _redis.GetDatabase();var data = await db.StringGetAsync($"checkpoint:{checkpointId}");return data.HasValue ? (byte[])data : null;}
}
11.2 性能监控
11.2.1 集成 Application Insights
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.DataContracts;public class MonitoredWorkflowExecutor : Executor<string>
{private readonly TelemetryClient _telemetry;public MonitoredWorkflowExecutor(TelemetryClient telemetry) : base("MonitoredExecutor"){_telemetry = telemetry;}public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default){using var operation = _telemetry.StartOperation<RequestTelemetry>("ProcessMessage");operation.Telemetry.Properties["MessageLength"] = message.Length.ToString();try{await ProcessMessageAsync(message, cancellationToken);operation.Telemetry.Success = true;}catch (Exception ex){operation.Telemetry.Success = false;_telemetry.TrackException(ex);throw;}}
}
11.2.2 自定义健康检查
public class WorkflowHealthCheck : IHealthCheck
{private readonly Workflow _workflow;public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default){try{// 执行一个简单的测试 Workflowawait using var run = await InProcessExecution.RunAsync(_workflow, "health-check",cancellationToken: cancellationToken);var completed = false;await foreach (var evt in run.NewEvents){if (evt is WorkflowOutputEvent){completed = true;break;}}return completed ? HealthCheckResult.Healthy("Workflow is operational"): HealthCheckResult.Degraded("Workflow did not complete");}catch (Exception ex){return HealthCheckResult.Unhealthy("Workflow failed", ex);}}
}// 在 Startup.cs 中注册
services.AddHealthChecks().AddCheck<WorkflowHealthCheck>("workflow");
11.3 安全性考虑
11.3.1 输入验证
public class SecureExecutor : Executor<UserInput, ProcessedData>
{public override async ValueTask<ProcessedData> HandleAsync(UserInput input, IWorkflowContext context, CancellationToken cancellationToken = default){// 验证输入if (string.IsNullOrWhiteSpace(input.Data)){throw new ArgumentException("Input data cannot be empty");}// 检查输入长度if (input.Data.Length > 10000){throw new ArgumentException("Input data exceeds maximum length");}// 清理潜在的恶意内容var sanitized = SanitizeInput(input.Data);return await ProcessSecurely(sanitized, cancellationToken);}private string SanitizeInput(string input){// 移除 HTML 标签、SQL 注入尝试等return System.Net.WebUtility.HtmlEncode(input);}
}
11.3.2 敏感数据处理
public class SecureDataExecutor : Executor<SensitiveData>
{private readonly IDataProtector _protector;public SecureDataExecutor(IDataProtectionProvider provider) : base("SecureDataExecutor"){_protector = provider.CreateProtector("WorkflowDataProtection");}protected internal override async ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default){// 在保存 Checkpoint 前加密敏感数据if (_sensitiveData != null){_encryptedData = _protector.Protect(_sensitiveData);_sensitiveData = null; // 清除明文}}protected internal override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default){// 从 Checkpoint 恢复后解密数据if (_encryptedData != null){_sensitiveData = _protector.Unprotect(_encryptedData);}}
}
11.4 扩展性设计
11.4.1 水平扩展
对于高负载场景,可以将 Workflow 部署到多个实例:
public class DistributedWorkflowService
{private readonly IMessageQueue _queue;private readonly Workflow _workflow;public async Task EnqueueWorkflowAsync(WorkflowRequest request){// 将请求放入队列await _queue.PublishAsync("workflow-queue", request);}public async Task ProcessWorkflowsAsync(CancellationToken cancellationToken){// 多个实例可以并行处理队列中的请求await foreach (var request in _queue.SubscribeAsync("workflow-queue", cancellationToken)){try{await using var run = await InProcessExecution.RunAsync(_workflow, request.Input,cancellationToken: cancellationToken);// 处理结果...}catch (Exception ex){// 错误处理和重试逻辑await HandleErrorAsync(request, ex);}}}
}
11.4.2 资源限制
public class ThrottledWorkflowService
{private readonly SemaphoreSlim _semaphore;private readonly Workflow _workflow;public ThrottledWorkflowService(Workflow workflow, int maxConcurrency = 10){_workflow = workflow;_semaphore = new SemaphoreSlim(maxConcurrency);}public async Task<TOutput> ExecuteAsync<TInput, TOutput>(TInput input, CancellationToken cancellationToken = default){await _semaphore.WaitAsync(cancellationToken);try{await using var run = await InProcessExecution.RunAsync(_workflow, input,cancellationToken: cancellationToken);// 提取输出...return default!;}finally{_semaphore.Release();}}
}
十二、未来展望与趋势
12.1 AI 驱动的工作流优化
未来的 Workflow 系统可能会使用 AI 来自动优化执行路径:
// 概念性代码 - 未来可能的功能
public class AIOptimizedWorkflow
{private readonly Workflow _workflow;private readonly AIOptimizer _optimizer;public async Task<Workflow> OptimizeAsync(){// AI 分析历史执行数据var executionHistory = await LoadExecutionHistoryAsync();// 识别瓶颈和优化机会var insights = await _optimizer.AnalyzeAsync(executionHistory);// 自动重构 Workflowif (insights.SuggestParallelization){return RebuildWithParallelism(_workflow, insights.ParallelizableSteps);}return _workflow;}
}
12.2 低代码/无代码平台集成
声明式 Workflow 为低代码平台铺平了道路:
[可视化设计器]↓ 导出
[JSON Workflow 定义]↓ 加载
[Agent Framework 执行]
未来可能会有:
-
拖拽式 Workflow 设计器
-
实时预览和调试
-
模板市场(预定义的 Workflow 模板)
-
协作编辑(多人同时设计 Workflow)
12.3 边缘计算与 Workflow
随着边缘计算的发展,Workflow 可能会在边缘设备上运行:
// IoT 设备上的轻量级 Workflow
public class EdgeWorkflow
{public static Workflow BuildForEdge(){var sensor = new SensorDataExecutor();var filter = new DataFilterExecutor();var localAnalysis = new LocalAnalysisExecutor();var cloudSync = new CloudSyncExecutor();return new WorkflowBuilder(sensor).AddEdge(sensor, filter).AddEdge<SensorData>(filter,localAnalysis,condition: data => data.IsNormal // 正常数据本地处理).AddEdge<SensorData>(filter,cloudSync,condition: data => !data.IsNormal // 异常数据上传云端).Build();}
}
12.4 多模态 Workflow
未来的 Workflow 将支持更多模态的数据处理:
public class MultiModalWorkflow
{public static Workflow Build(IChatClient client){var imageAnalyzer = new ImageAnalysisAgent(client);var audioTranscriber = new AudioTranscriptionAgent(client);var textAnalyzer = new TextAnalysisAgent(client);var videoProcessor = new VideoProcessingAgent(client);var synthesizer = new MultiModalSynthesisAgent(client);return new WorkflowBuilder(new InputRouterExecutor()).AddFanOutEdge(router,targets: [imageAnalyzer, audioTranscriber, textAnalyzer, videoProcessor]).AddFanInEdge(synthesizer, sources: [imageAnalyzer, audioTranscriber, textAnalyzer, videoProcessor]).WithOutputFrom(synthesizer).Build();}
}
更多AIGC文章
RAG技术全解:从原理到实战的简明指南
更多VibeCoding文章

