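When AI Workflows Need "Human Wisdom": A Deep Dive into Human Intervention in Microsoft Agent Framework
"No matter how smart an AI is, there are moments when a human has to make the call."
Prologue: A Real Dilemma
Picture this: your AI customer-service system is handling a refund request. It has already analyzed the order details, shipping status, and customer history, and sentiment analysis even tells it the customer is getting agitated. Then comes the last step, deciding whether to approve a refund that exceeds the usual limit, and the AI hesitates.
This is not a question of the AI's capability but of where responsibility lies. Some decisions inherently call for human judgment, experience, and accountability.
That is today's topic: in an automated AI workflow, how do you hand control to a human gracefully?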
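1. Why Do Workflows Need "Human Intervention"?
1.1 The Limits of Automation
The appeal of AI workflows is automation, but automation is not a cure-all. Enterprise applications regularly run into situations like these:
- High-risk decisions: operations involving large sums of money or legal liability
- Exception handling: edge cases that fall outside the predefined rules
- Quality gates: subjective judgments such as content review or creative evaluation
- Compliance: some industries explicitly require a human in the loop (Human-in-the-Loop)
By analogy, an AI workflow is like autopilot on a highway. Most of the time you can trust the system, but when road conditions get complicated or something unexpected happens, a human driver still has to take the wheel.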
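1.2 Two Modes of Human Intervention
Microsoft Agent Framework offers two quite different ways to bring a human in, each suited to a different kind of workflow:
Mode 1: RequestPort
- Applies to: programmatic workflows
- Traits: flexible, strongly typed, fully controllable
- Use when: the business logic is complex and the flow needs precise control
Mode 2: Question Action
- Applies to: declarative workflows
- Traits: configuration-driven, low-code, quick to deploy
- Use when: the process is standardized and business users need to configure it
The following sections examine how each mode works and how to use it well.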
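2. RequestPort: The "Pause Button" for Programmatic Workflows
2.1 Core Concepts
The design philosophy behind RequestPort is interesting: it abstracts human intervention as a "special executor". From the workflow's point of view, a human and an AI agent are not fundamentally different. Both are processing units that receive a request and return a response.
Here is the core data structure:
// RequestPort defines a request-response contract
public record RequestPort<TRequest, TResponse>(
    string Id,          // unique port identifier
    Type Request,       // request type
    Type Response,      // response type
    bool AllowWrapped   // whether wrapped types are allowed
) : RequestPort(Id, Request, Response);
The strength of this design is that the generic parameters fix the request and response types as part of the port's contract. A port declared to expect a string response is not meant to receive an int, and the declared types give the compiler and the runtime something concrete to check against.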
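2.2 The Execution Flow in Detail
Executing a RequestPort spans two "super steps" (SuperStep):
Step 1: Emit the request
// When the workflow reaches the RequestPort
RequestPort numberRequest = RequestPort.Create<NumberSignal, int>("GuessNumber");

// The system automatically raises a RequestInfoEvent
public sealed class RequestInfoEvent(ExternalRequest request) : WorkflowEvent(request)
{
    public ExternalRequest Request => request;
}
This event is delivered to the "outside world", that is, your application. The workflow then enters a waiting state, like a paused video.
Step 2: Receive the response
// The external system handles the request
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    if (request.DataIs<NumberSignal>())
    {
        int userGuess = ReadIntegerFromConsole("Enter your guess: ");
        return request.CreateResponse(userGuess);
    }

    throw new NotSupportedException("Unsupported request type");
}

// Send the response back to the workflow
await handle.SendResponseAsync(response);
Once the workflow receives the response, it resumes from the point where it paused, like the video continuing to play.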
2.3 Worked Example: A Number-Guessing Game
A complete example shows what RequestPort can do:
internal static class WorkflowFactory
{
    internal static Workflow BuildWorkflow()
    {
        // Create the request port: requests are NumberSignal, responses are int
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

        // Create the judging executor
        JudgeExecutor judgeExecutor = new(targetNumber: 42);

        // Build the looping workflow
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)   // user input -> judge
            .AddEdge(judgeExecutor, numberRequestPort)   // verdict -> ask for input again
            .WithOutputFrom(judgeExecutor)
            .Build();
    }
}
The neat part of this workflow is that it forms a closed loop of human-machine interaction: the port asks, the human answers, the judge evaluates, and the port asks again until the guess is right.
// The judging executor
internal sealed class JudgeExecutor(int targetNumber) : Executor<int>("Judge")
{
    private int _tries;

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == targetNumber)
        {
            // Correct guess: yield the output and finish
            await context.YieldOutputAsync($"Congratulations! {targetNumber} was found after {_tries} tries!", cancellationToken);
        }
        else if (message < targetNumber)
        {
            // Too small: send a signal to continue the loop
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            // Too large: send a signal to continue the loop
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}
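2.4 Design Highlights
Highlight 1: Type-safe asynchronous communication
Traditional human-intervention mechanisms often rely on weakly typed payloads such as strings or JSON. RequestPort uses generics to impose a strongly typed contract:
// ExternalRequest exposes typed accessors
public TValue? DataAs<TValue>() => this.Data.As<TValue>();
public bool DataIs<TValue>() => this.Data.Is<TValue>();

// At the call site, check the payload type before using it.
// Note: this call uses a DataIs overload with an out parameter,
// which is not one of the two accessors listed above.
if (request.DataIs<NumberSignal>(out var signal))
{
    // Here signal is already a strongly typed NumberSignal
    switch (signal)
    {
        case NumberSignal.Init:
            // Handle the initial signal
            break;
    }
}
Highlight 2: Strong request-response correlation
Every request has a unique RequestId, and a response must match its request:
public record ExternalRequest(
    RequestPortInfo PortInfo,
    string RequestId,      // unique identifier
    PortableValue Data
)
{
    // CreateResponse carries the RequestId over automatically
    public ExternalResponse CreateResponse<T>(T data) =>
        new ExternalResponse(this.PortInfo, this.RequestId, new PortableValue(data));
}
This avoids "misrouted responses": even with several requests in flight at once, each response can be matched to the request it answers.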
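The correlation idea can be illustrated without the framework. The sketch below is a minimal, self-contained model and not the framework's implementation: `PendingRequestTable`, `Register`, and `TryComplete` are hypothetical names invented for this example. Each pending request is stored under its ID, and a response is accepted only if its ID matches a request that is still waiting.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not a framework type): tracks pending requests by ID.
public sealed class PendingRequestTable<TResponse>
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<TResponse>> _pending = new();

    // Registers a new request; returns its ID and a task that completes when the response arrives.
    public (string RequestId, Task<TResponse> Response) Register()
    {
        string requestId = Guid.NewGuid().ToString("N");
        var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[requestId] = tcs;
        return (requestId, tcs.Task);
    }

    // Completes the matching request; returns false for unknown or already-answered IDs.
    public bool TryComplete(string requestId, TResponse response) =>
        _pending.TryRemove(requestId, out var tcs) && tcs.TrySetResult(response);
}
```

Because responses are looked up by ID, two requests can be answered in any order, and a duplicate answer for the same ID is rejected rather than delivered to the wrong waiter.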
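Highlight 3: A natural fit with checkpoints
RequestPort works naturally with the workflow checkpoint mechanism:
// A checkpoint is created automatically at the end of each SuperStep
case SuperStepCompletedEvent superStepCompletedEvt:
    CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
    if (checkpoint is not null)
    {
        checkpoints.Add(checkpoint);
        Console.WriteLine($"Checkpoint created at step {checkpoints.Count}");
    }
    break;

// Execution can be restored from any saved checkpoint
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None);
In practice this means that, as long as checkpoints are persisted, the workflow can resume from the point where it was waiting for a human even after a crash, without losing user input that was already captured.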
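3. Question Action: "Human-Machine Dialogue" for Declarative Workflows
3.1 From Configuration to Execution
If RequestPort is a scalpel for developers, Question Action is a Swiss Army knife for business users. It brings a human into the workflow through declarative configuration, with no code to write.
At the heart of Question Action is QuestionExecutor, which implements a complete question-and-answer flow:
internal sealed class QuestionExecutor(
    Question model,                       // question configuration model
    WorkflowAgentProvider agentProvider,  // agent provider
    WorkflowFormulaState state            // formula state
) : DeclarativeActionExecutor<Question>(model, state)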
3.2 The Three-Phase Execution Model
Question Action executes in three phases:
Phase 1: Prepare
protected override async ValueTask<object?> ExecuteAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
    // Reset the prompt counter
    await this._promptCount.WriteAsync(context, 0).ConfigureAwait(false);

    // Check the target variable.
    // Note: despite its name, hasValue is true when the variable is blank,
    // so under AlwaysSkipIfVariableHasValue the question is asked only when no value exists yet.
    InitializablePropertyPath variable = Throw.IfNull(this.Model.Variable);
    bool hasValue = context.ReadState(variable.Path) is BlankValue;

    // Decide whether to ask, based on SkipQuestionMode
    SkipQuestionMode mode = this.Evaluator.GetValue(this.Model.SkipQuestionMode).Value;
    bool proceed = mode switch
    {
        SkipQuestionMode.SkipOnFirstExecutionIfVariableHasValue => !await this._hasExecuted.ReadAsync(context),
        SkipQuestionMode.AlwaysSkipIfVariableHasValue => hasValue,
        SkipQuestionMode.AlwaysAsk => true,
        _ => true,
    };

    if (proceed)
    {
        await this.PromptAsync(context, cancellationToken);
    }

    return default;
}
This lets the workflow "remember" an earlier answer and avoid asking the same question twice.
Phase 2: Input
public async ValueTask PrepareResponseAsync(IWorkflowContext context, ActionExecutorResult message, CancellationToken cancellationToken)
{
    int count = await this._promptCount.ReadAsync(context);

    // Format the prompt
    AnswerRequest inputRequest = new(this.FormatPrompt(this.Model.Prompt));

    // Send it to the external system
    await context.SendMessageAsync(inputRequest, cancellationToken);

    // Update the prompt counter
    await this._promptCount.WriteAsync(context, count + 1);
}
Phase 3: Capture
public async ValueTask CaptureResponseAsync(IWorkflowContext context, AnswerResponse message, CancellationToken cancellationToken)
{
    FormulaValue? extractedValue = null;

    if (message.Value is null)
    {
        // Unrecognized response
        string unrecognizedResponse = this.FormatPrompt(this.Model.UnrecognizedPrompt);
        await context.AddEventAsync(new MessageActivityEvent(unrecognizedResponse.Trim()), cancellationToken);
    }
    else
    {
        // Entity extraction and validation
        EntityExtractionResult entityResult = EntityExtractor.Parse(this.Model.Entity, message.Value.Text);
        if (entityResult.IsValid)
        {
            extractedValue = entityResult.Value;
        }
        else
        {
            // Invalid response
            string invalidResponse = this.Model.InvalidPrompt is not null
                ? this.FormatPrompt(this.Model.InvalidPrompt)
                : "Invalid response";
            await context.AddEventAsync(new MessageActivityEvent(invalidResponse.Trim()), cancellationToken);
        }
    }

    if (extractedValue is null)
    {
        // Prompt again
        await this.PromptAsync(context, cancellationToken);
    }
    else
    {
        // Store the value in the variable
        await this.AssignAsync(this.Model.Variable?.Path, extractedValue, context);
        await this._hasExecuted.WriteAsync(context, true);
        await context.SendResultMessageAsync(this.Id, cancellationToken);
    }
}
3.3 Built-In Retry
Question Action has retry logic built in:
private async ValueTask PromptAsync(IWorkflowContext context, CancellationToken cancellationToken)
{
    long repeatCount = this.Evaluator.GetValue(this.Model.RepeatCount).Value;
    int actualCount = await this._promptCount.ReadAsync(context);

    if (actualCount >= repeatCount)
    {
        // Retry limit reached: fall back to the default value
        ValueExpression defaultValueExpression = Throw.IfNull(this.Model.DefaultValue);
        DataValue defaultValue = this.Evaluator.GetValue(defaultValueExpression).Value;
        await this.AssignAsync(this.Model.Variable?.Path, defaultValue.ToFormula(), context);

        string defaultValueResponse = this.FormatPrompt(this.Model.DefaultValueResponse);
        await context.AddEventAsync(new MessageActivityEvent(defaultValueResponse.Trim()), cancellationToken);
        await context.SendResultMessageAsync(this.Id, cancellationToken);
    }
    else
    {
        // Keep waiting for user input
        await context.SendResultMessageAsync(this.Id, result: true, cancellationToken);
    }
}
The design is fault tolerant: if the user keeps entering invalid input, the workflow does not get stuck. Once the retry limit is reached it continues with the preconfigured default value.
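The same "bounded retries, then default" policy can be shown in isolation. This is a minimal sketch that does not use the framework: `BoundedPrompt` and `AskForNumber` are hypothetical names, and the `answers` sequence stands in for successive user replies.

```csharp
using System;
using System.Collections.Generic;

public static class BoundedPrompt
{
    // Reads at most maxAttempts answers; falls back to defaultValue when none parses as an integer.
    // `answers` stands in for successive user replies (a hypothetical input source).
    public static (int Value, int Attempts, bool UsedDefault) AskForNumber(
        IEnumerable<string> answers, int maxAttempts, int defaultValue)
    {
        int attempts = 0;
        foreach (string answer in answers)
        {
            if (attempts >= maxAttempts)
            {
                break; // retry budget exhausted
            }

            attempts++;
            if (int.TryParse(answer, out int value))
            {
                return (value, attempts, false);
            }
        }

        return (defaultValue, attempts, true);
    }
}
```

For example, `BoundedPrompt.AskForNumber(new[] { "abc", "42" }, 3, -1)` accepts the second answer, while three invalid answers in a row yield the default. The caller always gets a value, which is the property that keeps the workflow from stalling.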
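3.4 Entity Extraction: From Text to Structured Data
One powerful feature of Question Action is entity extraction:
// Extract structured information from the user's input
EntityExtractionResult entityResult = EntityExtractor.Parse(
    this.Model.Entity,      // entity definition (for example date, number, email)
    message.Value.Text      // the text the user entered
);

if (entityResult.IsValid)
{
    // Extraction succeeded: the result is a PowerFx formula value
    extractedValue = entityResult.Value;
}
This means a workflow can be configured to recognize specific kinds of input, for example:
- Date and time: "3 p.m. tomorrow" → a value such as DateTime(2024-11-12 15:00:00)
- Number: "one hundred twenty-three" → 123
- Email: "user@example.com" → format validated, address extracted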
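To make the idea of "entity type in, typed value out" concrete, here is a deliberately simple sketch built only on .NET base-library parsers. It is not the framework's EntityExtractor: `EntityKind` and `SimpleEntityParser` are hypothetical names, and unlike the examples above it handles only literal formats (digits, `yyyy-MM-dd` dates), not natural-language phrases.

```csharp
#nullable enable
using System;
using System.Globalization;
using System.Net.Mail;

public enum EntityKind { Number, Date, Email }

public static class SimpleEntityParser
{
    // Returns true and a typed value when `text` is a valid instance of `kind`.
    public static bool TryParse(EntityKind kind, string text, out object? value)
    {
        value = null;
        text = text.Trim();

        switch (kind)
        {
            case EntityKind.Number:
                if (int.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture, out int number))
                {
                    value = number;
                    return true;
                }
                return false;

            case EntityKind.Date:
                if (DateTime.TryParseExact(text, "yyyy-MM-dd", CultureInfo.InvariantCulture,
                        DateTimeStyles.None, out DateTime date))
                {
                    value = date;
                    return true;
                }
                return false;

            case EntityKind.Email:
                if (MailAddress.TryCreate(text, out MailAddress? address))
                {
                    value = address.Address;
                    return true;
                }
                return false;

            default:
                return false;
        }
    }
}
```

A failed parse plays the role of an invalid `EntityExtractionResult`: the caller would re-prompt instead of storing a value.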
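4. Comparing and Choosing Between the Two Modes
4.1 Technical Comparison
| Aspect | RequestPort | Question Action |
|---|---|---|
| Applies to | Programmatic workflows | Declarative workflows |
| Defined in | Code | JSON / configuration files |
| Type safety | Strong typing at compile time | Type checks at runtime |
| Flexibility | Very high, fully controllable | Moderate, limited by configuration |
| Learning curve | Steep, requires programming skills | Gentle, usable by business users |
| Entity extraction | Implement it yourself | Built in |
| Retry | Implement it yourself | Built in |
| State management | Manual | Automatic |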
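4.2 When to Use Which
Choose RequestPort when:
- The business logic is complex
  - The request content must be decided dynamically from context
  - The interaction spans multiple complex rounds
  - Branching needs precise control
- Performance requirements are high
  - Runtime overhead must be kept to a minimum
  - Type safety requirements are strict
  - Errors should be caught at compile time
- Developers are in charge
  - The team has strong programming skills
  - Deep customization is needed
  - The workflow must integrate closely with an existing codebase
Choose Question Action when:
- The process is standardized
  - The question-and-answer flow is relatively fixed
  - Configuration and deployment must be fast
  - Business rules change frequently
- Low-code is a requirement
  - Business users need to configure the flow themselves
  - Dependence on developers should be reduced
  - Prototypes need quick validation
- Built-in features matter most
  - Entity extraction is needed
  - Automatic retry is needed
  - Multi-language support is needed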
5. Practical Tips and Best Practices
5.1 Advanced RequestPort Usage
Tip 1: Drive the flow with enum signals
// Define a clear signal enum
internal enum ApprovalSignal
{
    PendingReview,    // awaiting review
    RequestMoreInfo,  // more information needed
    Approved,         // approved
    Rejected          // rejected
}

// Use the signal to drive the workflow
RequestPort approvalPort = RequestPort.Create<ApprovalSignal, ApprovalDecision>("Approval");

// In the executor, react differently to each signal.
// cancellationToken is passed by name, matching the JudgeExecutor example.
public override async ValueTask HandleAsync(ApprovalDecision decision, IWorkflowContext context, CancellationToken cancellationToken)
{
    switch (decision.Signal)
    {
        case ApprovalSignal.Approved:
            await context.SendMessageAsync(ProcessApproval(decision), cancellationToken: cancellationToken);
            break;
        case ApprovalSignal.RequestMoreInfo:
            // Loop back to the port and ask the reviewer again
            await context.SendMessageAsync(ApprovalSignal.PendingReview, cancellationToken: cancellationToken);
            break;
        case ApprovalSignal.Rejected:
            await context.YieldOutputAsync($"Request rejected: {decision.Reason}", cancellationToken);
            break;
    }
}
Tip 2: Carry context with the request
// Define a request type that carries rich information
internal sealed class ApprovalRequest
{
    public string RequestId { get; init; } = string.Empty;
    public decimal Amount { get; init; }
    public string Reason { get; init; } = string.Empty;
    public Dictionary<string, object> Context { get; init; } = new();
}

// The external handler has access to the full context
private static ExternalResponse HandleApprovalRequest(ExternalRequest request)
{
    if (request.DataIs<ApprovalRequest>(out var approvalReq))
    {
        Console.WriteLine($"Approval request {approvalReq.RequestId}");
        Console.WriteLine($"Amount: {approvalReq.Amount:C}");
        Console.WriteLine($"Reason: {approvalReq.Reason}");

        // Show the context
        foreach (var kvp in approvalReq.Context)
        {
            Console.WriteLine($"  {kvp.Key}: {kvp.Value}");
        }

        // Human decision
        var decision = GetHumanDecision();
        return request.CreateResponse(decision);
    }

    throw new NotSupportedException();
}
Tip 3: Handle timeouts
// Implement the timeout in the external handler
private static async Task<ExternalResponse> HandleWithTimeout(ExternalRequest request, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    try
    {
        // Wait for human input, but only up to the timeout
        var response = await GetHumanInputAsync(request, cts.Token);
        return request.CreateResponse(response);
    }
    catch (OperationCanceledException)
    {
        // Fall back to a default decision after the timeout
        Console.WriteLine("Human approval timed out; applying the default policy");
        return request.CreateResponse(GetDefaultDecision());
    }
}
5.2 Question Action Configuration Tips
Tip 1: Multi-language prompts
{
    "type": "Question",
    "id": "AskUserName",
    "variable": "userName",
    "prompt": {
        "text": "=If(User.Language = \"zh-CN\", \"请输入您的姓名\", \"Please enter your name\")"
    },
    "entity": "Text",
    "invalidPrompt": {
        "text": "=If(User.Language = \"zh-CN\", \"输入无效,请重试\", \"Invalid input, please try again\")"
    }
}
Tip 2: Conditional skipping
{
    "type": "Question",
    "id": "ConfirmEmail",
    "variable": "emailConfirmed",
    "skipQuestionMode": "AlwaysSkipIfVariableHasValue",
    "prompt": {
        "text": "Please confirm your email address"
    }
}
Tip 3: Sensible defaults
{
    "type": "Question",
    "id": "AskDeliveryDate",
    "variable": "deliveryDate",
    "prompt": {
        "text": "Please choose a delivery date"
    },
    "entity": "Date",
    "repeatCount": 3,
    "defaultValue": "=DateAdd(Today(), 3, Days)",
    "defaultValueResponse": {
        "text": "No valid input received; delivery has been scheduled for 3 days from now"
    }
}
5.3 General Best Practices
Practice 1: Explicit state management
// Use DurableProperty for state that must survive checkpoints
private readonly DurableProperty<int> _promptCount = new(nameof(_promptCount));
private readonly DurableProperty<bool> _hasExecuted = new(nameof(_hasExecuted));

// Saved automatically when a checkpoint is taken
// (StateKey and _currentState are the executor's own key and state field)
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
    return context.QueueStateUpdateAsync(StateKey, this._currentState, cancellationToken);
}

// Loaded automatically on restore
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
    this._currentState = await context.ReadStateAsync<State>(StateKey, cancellationToken);
}
Practice 2: Graceful error handling
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    try
    {
        switch (evt)
        {
            case RequestInfoEvent requestEvt:
                // Assumes an asynchronous handler here, unlike the synchronous one in section 2.2
                var response = await HandleExternalRequest(requestEvt.Request);
                await handle.SendResponseAsync(response);
                break;

            case WorkflowErrorEvent errorEvt:
                // Log the error without interrupting the event loop
                _logger.LogError(errorEvt.Exception, "Workflow execution error");
                await NotifyAdministrator(errorEvt);
                break;

            case WorkflowOutputEvent outputEvt:
                await ProcessOutput(outputEvt.Data);
                break;
        }
    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "Critical error while handling a workflow event");

        // Decide whether to continue or abort
        if (IsCriticalError(ex))
        {
            throw;
        }
    }
}
Practice 3: Design for observability
// Trace the human-intervention step with an Activity
using var activity = ActivitySource.StartActivity("HumanIntervention");
activity?.SetTag("request.id", request.RequestId);
activity?.SetTag("request.type", request.PortInfo.RequestType);
activity?.SetTag("user.id", currentUserId);

var startTime = DateTime.UtcNow;
var response = await GetHumanInput(request);
var duration = DateTime.UtcNow - startTime;

activity?.SetTag("response.duration_ms", duration.TotalMilliseconds);
activity?.SetTag("response.type", response.GetType().Name);

// Record metrics
_metrics.RecordHumanInterventionDuration(duration);
_metrics.IncrementHumanInterventionCount(request.PortInfo.PortId);
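6. Deeper Thoughts on the Architecture
6.1 Why a "Port" and Not a "Callback"?
Many people seeing RequestPort for the first time wonder why the framework does not simply use a callback.
// A hypothetical callback-based approach (not recommended)
workflow.OnNeedHumanInput += async (request) =>
{
    var response = await GetHumanInput(request);
    return response;
};
The RequestPort design is cleaner, for three reasons:
1. Decoupling. A port is part of the workflow definition, not a runtime side effect. That means:
- The structure of the workflow is fixed when it is built
- The human-intervention points can be analyzed statically
- Visualization and documentation are easier to generate
2. Serializability. The state of a RequestPort can be serialized completely into a checkpoint:
public sealed record RequestPort<TRequest, TResponse>(
    string Id,
    Type Request,
    Type Response,
    bool AllowWrapped
) : RequestPort(Id, Request, Response);
A callback function cannot be serialized, which would break the checkpoint mechanism.
3. Type safety. A port fixes the request and response type contract at compile time, whereas type checks on callbacks are often deferred to runtime.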
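6.2 SuperStep: The "Heartbeat" of a Workflow
Understanding SuperStep is the key to understanding human intervention.
What is a SuperStep?
A SuperStep is the atomic unit of workflow execution. Within one SuperStep:
- All ready executors run concurrently
- All messages are passed in memory
- State updates are buffered
When a SuperStep ends:
- State updates are committed
- A checkpoint is created (if enabled)
- Events are delivered to the outside world
Why does human intervention take two SuperSteps?
SuperStep 1: workflow → RequestInfoEvent → external system (the workflow pauses and waits for a response)
SuperStep 2: external system → ExternalResponse → workflow (the workflow resumes)
This design guarantees:
- Consistency: every SuperStep is atomic
- Recoverability: execution can resume from any SuperStep
- Observability: every SuperStep has a clear boundary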
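The round-based model described above can be sketched in a few lines. This is a toy model under simplifying assumptions, not the framework's scheduler: `SuperStepDemo` is a hypothetical name, messages are plain integers, one handler stands in for all executors, and handlers run sequentially rather than concurrently. What it does preserve is the key rule that messages produced during one step are buffered and only delivered in the next.

```csharp
using System;
using System.Collections.Generic;

public static class SuperStepDemo
{
    // Runs the handler in rounds. Messages produced during a round are buffered
    // and become visible only in the next round. Returns the number of rounds executed.
    public static int Run(
        IEnumerable<int> initialMessages,
        Func<int, IEnumerable<int>> handler,
        Action<int, IReadOnlyList<int>> onStepCompleted)
    {
        var inbox = new List<int>(initialMessages);
        int step = 0;

        while (inbox.Count > 0)
        {
            step++;
            var outbox = new List<int>();      // buffered until the step ends
            foreach (int message in inbox)
            {
                outbox.AddRange(handler(message));
            }

            // Step boundary: a real engine would commit state, checkpoint, and emit events here.
            onStepCompleted(step, outbox);
            inbox = outbox;
        }

        return step;
    }
}
```

In this model a human request would simply be a message that leaves the loop at a step boundary, and the response would arrive as a new inbox message for a later step, which is why the round trip spans two steps.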
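6.3 PortableValue: A Type Bridge Across Boundaries
PortableValue is a neat piece of design that solves the problem of passing typed values between different contexts:
public record PortableValue(object? Value)
{
    public T? As<T>() => Value is T typed ? typed : default;

    public bool Is<T>() => Value is T;

    public object? AsType(Type targetType)
    {
        if (Value is null) return null;
        if (targetType.IsInstanceOfType(Value)) return Value;

        // Attempt a type conversion
        return Convert.ChangeType(Value, targetType);
    }
}
Its roles are:
- Type erasure: hide the concrete type during serialization
- Type recovery: restore the type during deserialization
- Type conversion: convert safely where necessary
This is what allows a workflow to pass data between processes, or even between machines.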
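The erase-then-recover idea can be demonstrated with System.Text.Json alone. The sketch below is an illustration under assumptions, not how PortableValue is implemented: `Envelope`, `Wrap`, and `TryUnwrap` are hypothetical names, and the type is identified by its assembly-qualified name, which only works when both sides share the same assemblies.

```csharp
#nullable enable
using System;
using System.Text.Json;

// Hypothetical envelope (not a framework type): a type name stored next to a JSON payload.
public sealed record Envelope(string TypeName, string Json)
{
    // Erases the static type: after this call only strings remain.
    public static Envelope Wrap<T>(T value) =>
        new(typeof(T).AssemblyQualifiedName!, JsonSerializer.Serialize(value));

    // Recovers the value only when the stored type name matches T exactly.
    public bool TryUnwrap<T>(out T? value)
    {
        if (TypeName != typeof(T).AssemblyQualifiedName)
        {
            value = default;
            return false;
        }

        value = JsonSerializer.Deserialize<T>(Json);
        return true;
    }
}
```

Since an `Envelope` holds nothing but two strings, it can be written to a checkpoint or sent over the wire, and the receiver decides which type to ask for. Asking for the wrong type fails cleanly instead of producing a corrupted value.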
7. Real-World Scenarios
7.1 Case 1: Escalating from an AI Chatbot to a Human Agent
Scenario: a customer-service AI handles user inquiries and hands over to a human agent automatically when the question is complex or the user is upset.
Implementation:
// Define the escalation signals
internal enum EscalationSignal
{
    ComplexQuery,       // complex query
    EmotionalUser,      // emotional user
    HighValueCustomer,  // high-value customer
    PolicyException     // policy exception
}

// Create the human-agent port
RequestPort humanAgentPort = RequestPort.Create<EscalationContext, AgentResponse>("HumanAgent");

// Build the workflow.
// The condition uses a type pattern so it compiles whether the delegate parameter is object or AnalysisResult.
return new WorkflowBuilder(aiChatBot)
    .AddEdge(aiChatBot, complexityAnalyzer)
    .AddEdge(complexityAnalyzer, humanAgentPort, condition: result => result is AnalysisResult { RequiresHuman: true })
    .AddEdge(humanAgentPort, responseFormatter)
    .WithOutputFrom(responseFormatter)
    .Build();
Key implementation:
internal sealed class ComplexityAnalyzer() : Executor<ChatMessage, AnalysisResult>("Analyzer")
{
    public override async ValueTask<AnalysisResult> HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        // Analyze the message (the analyzer, scorer, and CRM service fields are assumed to be injected)
        var sentiment = await _sentimentAnalyzer.AnalyzeAsync(message.Content);
        var complexity = await _complexityScorer.ScoreAsync(message.Content);
        var customerValue = await _crmService.GetCustomerValueAsync(message.UserId);

        bool requiresHuman =
            (sentiment.IsNegative && sentiment.Intensity > 0.7) ||
            complexity.Score > 0.8 ||
            customerValue == CustomerTier.Premium;

        string reason = DetermineReason(sentiment, complexity, customerValue);

        return new AnalysisResult
        {
            RequiresHuman = requiresHuman,
            Reason = reason,
            Context = new EscalationContext
            {
                Reason = reason,   // also shown to the human agent
                OriginalMessage = message,
                Sentiment = sentiment,
                Complexity = complexity,
                CustomerTier = customerValue
            }
        };
    }
}
The human agent's console:
private static async Task<ExternalResponse> HandleHumanAgent(ExternalRequest request)
{
    if (request.DataIs<EscalationContext>(out var context))
    {
        // What the human agent sees
        Console.WriteLine("=== Human intervention request ===");
        Console.WriteLine($"Reason: {context.Reason}");
        Console.WriteLine($"Customer tier: {context.CustomerTier}");
        Console.WriteLine($"Sentiment: {context.Sentiment}");
        Console.WriteLine($"Original message: {context.OriginalMessage.Content}");
        Console.WriteLine("==================================");

        // The human agent responds
        var agentResponse = await GetAgentResponseFromUI();
        return request.CreateResponse(agentResponse);
    }

    throw new NotSupportedException();
}
7.2 Case 2: A Content Moderation Workflow
Scenario: user-generated content (UGC) goes through two levels of review, an AI first pass followed by human review.
Implementation:
// Configure the review flow with Question Action
{
    "workflow": {
        "name": "ContentModerationWorkflow",
        "actions": [
            {
                "type": "InvokeAzureAgent",
                "id": "AIModeration",
                "agentId": "content-moderator",
                "input": "=Content.Text"
            },
            {
                "type": "ConditionGroup",
                "id": "CheckAIResult",
                "conditions": [
                    {
                        "condition": "=AIModeration.Result.Confidence < 0.8",
                        "actions": [
                            {
                                "type": "Question",
                                "id": "HumanReview",
                                "variable": "humanDecision",
                                "prompt": {
                                    "text": "=Concatenate(\"AI review confidence is low (\", Text(AIModeration.Result.Confidence), \"), please review manually: \", Content.Text)"
                                },
                                "entity": "Choice",
                                "choices": ["Approve", "Reject", "Need more information"],
                                "repeatCount": 1,
                                "defaultValue": "Reject",
                                "defaultValueResponse": {
                                    "text": "Review timed out; the content was rejected automatically"
                                }
                            }
                        ]
                    }
                ]
            },
            {
                "type": "SetVariable",
                "id": "FinalDecision",
                "variable": "finalResult",
                "value": "=If(IsBlank(humanDecision), AIModeration.Result.Decision, humanDecision)"
            }
        ]
    }
}
7.3 Case 3: A Financial Approval Flow
Scenario: expense claims are routed automatically to different levels of approver depending on the amount.
Implementation:
internal static class ExpenseApprovalWorkflow
{
    public static Workflow Build()
    {
        // Create one port per approval level
        var managerApproval = RequestPort.Create<ApprovalRequest, ApprovalDecision>("ManagerApproval");
        var directorApproval = RequestPort.Create<ApprovalRequest, ApprovalDecision>("DirectorApproval");
        var cfoApproval = RequestPort.Create<ApprovalRequest, ApprovalDecision>("CFOApproval");

        var router = new ApprovalRouter();
        var processor = new ExpenseProcessor();

        return new WorkflowBuilder(router)
            // Route to an approver based on the amount
            .AddEdge(router, managerApproval, condition: req => req is ApprovalRequest { Amount: < 1000m })
            .AddEdge(router, directorApproval, condition: req => req is ApprovalRequest { Amount: >= 1000m and < 10000m })
            .AddEdge(router, cfoApproval, condition: req => req is ApprovalRequest { Amount: >= 10000m })
            // Every approval result flows to the processor. Plain edges are used because
            // only one approver runs per request; a fan-in edge would wait for all three.
            .AddEdge(managerApproval, processor)
            .AddEdge(directorApproval, processor)
            .AddEdge(cfoApproval, processor)
            .WithOutputFrom(processor)
            .Build();
    }
}

internal sealed class ApprovalRouter() : Executor<ExpenseRequest, ApprovalRequest>("Router")
{
    public override async ValueTask<ApprovalRequest> HandleAsync(ExpenseRequest request, IWorkflowContext context, CancellationToken cancellationToken)
    {
        // Enrich the approval request
        // (this ApprovalRequest has more fields than the minimal one in section 5.1)
        var approvalRequest = new ApprovalRequest
        {
            RequestId = request.Id,
            Amount = request.Amount,
            Category = request.Category,
            Submitter = request.SubmitterId,
            Reason = request.Reason,
            Attachments = request.Attachments,
            Context = new Dictionary<string, object>
            {
                ["SubmissionDate"] = DateTime.UtcNow,
                ["Department"] = await GetDepartment(request.SubmitterId),
                ["BudgetRemaining"] = await GetBudgetRemaining(request.Category)
            }
        };

        return approvalRequest;
    }
}
Handling the multi-level approvals:
private static async Task RunApprovalWorkflow(Workflow workflow)
{
    await using var handle = await InProcessExecution.StreamAsync(
        workflow,
        new ExpenseRequest { Amount = 5000, Category = "Travel" });

    await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
    {
        switch (evt)
        {
            case RequestInfoEvent requestEvt:
                // Route to the right approver by port ID
                var response = requestEvt.Request.PortInfo.PortId switch
                {
                    "ManagerApproval" => await GetManagerApproval(requestEvt.Request),
                    "DirectorApproval" => await GetDirectorApproval(requestEvt.Request),
                    "CFOApproval" => await GetCFOApproval(requestEvt.Request),
                    _ => throw new NotSupportedException()
                };
                await handle.SendResponseAsync(response);
                break;

            case WorkflowOutputEvent outputEvt:
                var result = outputEvt.DataAs<ExpenseResult>();
                Console.WriteLine($"Approval result: {result.Status}");
                if (result.Status == ApprovalStatus.Approved)
                {
                    await ProcessPayment(result);
                }
                break;
        }
    }
}

private static async Task<ExternalResponse> GetDirectorApproval(ExternalRequest request)
{
    var approvalReq = request.DataAs<ApprovalRequest>();

    // Notify the director
    await _notificationService.NotifyAsync(
        approvalReq.DirectorId,
        $"Pending approval: {approvalReq.Amount:C} - {approvalReq.Reason}");

    // Wait for the director's decision (it may come from mobile, web, and so on)
    var decision = await _approvalService.WaitForDecisionAsync(
        approvalReq.RequestId,
        timeout: TimeSpan.FromHours(24));

    return request.CreateResponse(decision);
}
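8. Performance and Scalability
8.1 Concurrency
When many users trigger human intervention at the same time, how do you keep the system from falling over?
Strategy 1: A request queue
internal sealed class HumanRequestQueue
{
    private readonly Channel<PendingRequest> _queue;
    private readonly SemaphoreSlim _concurrencyLimit;

    public HumanRequestQueue(int maxConcurrency = 10)
    {
        _queue = Channel.CreateUnbounded<PendingRequest>();
        _concurrencyLimit = new SemaphoreSlim(maxConcurrency);
    }

    public async Task<ExternalResponse> EnqueueAsync(ExternalRequest request, CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        var pending = new PendingRequest(request, tcs);
        await _queue.Writer.WriteAsync(pending, cancellationToken);
        return await tcs.Task;
    }

    public async Task ProcessQueueAsync(CancellationToken cancellationToken)
    {
        await foreach (var pending in _queue.Reader.ReadAllAsync(cancellationToken))
        {
            await _concurrencyLimit.WaitAsync(cancellationToken);
            _ = Task.Run(async () =>
            {
                try
                {
                    var response = await HandleRequest(pending.Request);
                    pending.CompletionSource.SetResult(response);
                }
                catch (Exception ex)
                {
                    // Propagate failures so the caller of EnqueueAsync does not wait forever
                    pending.CompletionSource.TrySetException(ex);
                }
                finally
                {
                    _concurrencyLimit.Release();
                }
            }, cancellationToken);
        }
    }
}
Strategy 2: Priority scheduling
internal sealed class PriorityHumanRequestScheduler
{
    private readonly PriorityQueue<PendingRequest, int> _queue = new();

    public async Task<ExternalResponse> ScheduleAsync(
        ExternalRequest request,
        int priority,   // a smaller number means a higher priority
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        var pending = new PendingRequest(request, tcs);

        lock (_queue)
        {
            _queue.Enqueue(pending, priority);
        }

        return await tcs.Task.WaitAsync(cancellationToken);
    }

    // A dispatcher (not shown) calls this to take the most urgent request and complete it
    public bool TryDequeue(out PendingRequest? pending)
    {
        lock (_queue)
        {
            return _queue.TryDequeue(out pending, out _);
        }
    }

    // Callers can use this to compute the priority argument for ScheduleAsync
    private int CalculatePriority(ExternalRequest request)
    {
        // Derive the priority from business rules
        if (request.DataIs<ApprovalRequest>(out var approval))
        {
            return approval.Amount switch
            {
                > 100000 => 1,  // high priority
                > 10000 => 2,   // medium priority
                _ => 3          // low priority
            };
        }

        return 5;   // default priority
    }
}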
8.2 Distributed Deployment
In a microservice architecture, the workflow engine and the human-handling system may be deployed as separate services.
Option 1: Message queue integration
internal sealed class MessageQueueRequestHandler : IExternalRequestSink
{
    private readonly IMessageQueue _queue;

    // ConcurrentDictionary, because requests and responses arrive on different threads
    private readonly ConcurrentDictionary<string, TaskCompletionSource<ExternalResponse>> _pendingRequests = new();

    public async Task<ExternalResponse> HandleRequestAsync(ExternalRequest request, CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pendingRequests[request.RequestId] = tcs;

        // Publish to the message queue
        await _queue.PublishAsync(new
        {
            Type = "HumanInterventionRequest",
            RequestId = request.RequestId,
            PortId = request.PortInfo.PortId,
            Data = request.Data
        }, cancellationToken);

        return await tcs.Task;
    }

    public Task OnResponseReceivedAsync(string requestId, ExternalResponse response)
    {
        if (_pendingRequests.TryRemove(requestId, out var tcs))
        {
            tcs.SetResult(response);
        }

        return Task.CompletedTask;
    }
}
Option 2: Real-time communication with SignalR
internal sealed class SignalRHumanInterface
{
    private readonly IHubContext<HumanInterventionHub> _hubContext;

    public async Task<ExternalResponse> RequestHumanInputAsync(
        ExternalRequest request,
        string userId,
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>();

        // Register the response handler
        HumanInterventionHub.RegisterResponseHandler(request.RequestId, tcs);

        // Push the request to the user's browser over SignalR
        await _hubContext.Clients.User(userId).SendAsync(
            "RequestInput",
            new
            {
                RequestId = request.RequestId,
                PortId = request.PortInfo.PortId,
                Data = request.Data,
                Timestamp = DateTime.UtcNow
            },
            cancellationToken);

        // Wait for the user's response
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutCts.CancelAfter(TimeSpan.FromMinutes(5));

        try
        {
            return await tcs.Task.WaitAsync(timeoutCts.Token);
        }
        catch (OperationCanceledException)
        {
            // Timeout handling
            return request.CreateResponse(GetDefaultResponse());
        }
    }
}

// SignalR Hub
public class HumanInterventionHub : Hub
{
    private static readonly ConcurrentDictionary<string, TaskCompletionSource<ExternalResponse>> _responseHandlers = new();

    public static void RegisterResponseHandler(string requestId, TaskCompletionSource<ExternalResponse> tcs)
    {
        _responseHandlers[requestId] = tcs;
    }

    public Task SubmitResponse(string requestId, object responseData)
    {
        if (_responseHandlers.TryRemove(requestId, out var tcs))
        {
            // Build the response object
            var response = new ExternalResponse(/* ... */);
            tcs.SetResult(response);
        }

        return Task.CompletedTask;
    }
}
8.3 State Persistence
Human-intervention flows that run for a long time need reliable state persistence.
internal sealed class PersistentHumanRequestManager
{
    private readonly ICheckpointStore _checkpointStore;
    private readonly IDistributedCache _cache;

    public async Task<ExternalResponse> HandleWithPersistenceAsync(
        ExternalRequest request,
        Workflow workflow,
        CancellationToken cancellationToken)
    {
        // Save the request state
        await _cache.SetAsync(
            $"request:{request.RequestId}",
            JsonSerializer.SerializeToUtf8Bytes(new
            {
                Request = request,
                Timestamp = DateTime.UtcNow,
                Status = "Pending"
            }),
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(7)
            },
            cancellationToken);

        // Run the workflow with checkpointing
        var checkpointManager = CheckpointManager.Default;
        await using var checkpointedRun = await InProcessExecution.StreamAsync(workflow, request, checkpointManager);

        // Wait for the response or a timeout
        var response = await WaitForResponseAsync(request.RequestId, cancellationToken);

        // Update the state
        await _cache.SetAsync(
            $"request:{request.RequestId}",
            JsonSerializer.SerializeToUtf8Bytes(new
            {
                Request = request,
                Response = response,
                Timestamp = DateTime.UtcNow,
                Status = "Completed"
            }),
            cancellationToken: cancellationToken);

        return response;
    }

    public async Task<ExternalResponse> ResumeFromCheckpointAsync(
        string requestId,
        CancellationToken cancellationToken)
    {
        // Restore the request information from the cache
        var requestData = await _cache.GetAsync($"request:{requestId}", cancellationToken);
        var requestInfo = JsonSerializer.Deserialize<RequestInfo>(requestData);

        // Restore the workflow from its checkpoint
        var checkpoint = await _checkpointStore.LoadAsync(requestInfo.CheckpointId, cancellationToken);
        // ... resume execution
    }
}
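9. Common Pitfalls and How to Avoid Them
9.1 Pitfall 1: Forgetting to Handle Timeouts
Problem: once a human-intervention request has been sent, the workflow hangs indefinitely if the user never responds.
Solution:
private static async Task<ExternalResponse> HandleWithTimeout(ExternalRequest request, TimeSpan timeout)
{
    // The token source has no timeout of its own; it is used to stop whichever task loses the race
    using var cts = new CancellationTokenSource();
    var responseTask = GetHumanInputAsync(request, cts.Token);
    var timeoutTask = Task.Delay(timeout, cts.Token);

    var completedTask = await Task.WhenAny(responseTask, timeoutTask);
    cts.Cancel();

    if (completedTask == responseTask)
    {
        return request.CreateResponse(await responseTask);
    }
    else
    {
        // Degrade gracefully after the timeout
        _logger.LogWarning("Request {RequestId} timed out; applying the default policy", request.RequestId);
        return request.CreateResponse(GetDefaultDecision(request));
    }
}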
9.2 Pitfall 2: Inconsistent State
Problem: in a distributed environment, the workflow's state and the human-handling system's state can drift out of sync.
Solution: use a distributed lock and a transaction
internal sealed class ConsistentHumanRequestHandler
{
    private readonly IDistributedLockProvider _lockProvider;
    private readonly ITransactionManager _transactionManager;

    public async Task<ExternalResponse> HandleConsistentlyAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        // Acquire the distributed lock.
        // Note: the lock and the transaction are held while waiting for the human,
        // so this pattern only suits responses expected within the lock duration.
        await using var lockHandle = await _lockProvider.AcquireLockAsync(
            $"request:{request.RequestId}",
            TimeSpan.FromMinutes(5),
            cancellationToken);

        // Do the work inside a transaction
        await using var transaction = await _transactionManager.BeginTransactionAsync(cancellationToken);
        try
        {
            // 1. Save the request state
            await SaveRequestStateAsync(request, transaction, cancellationToken);

            // 2. Send the notification
            await NotifyHumanAsync(request, cancellationToken);

            // 3. Wait for the response
            var response = await WaitForResponseAsync(request.RequestId, cancellationToken);

            // 4. Update the state
            await UpdateRequestStateAsync(request.RequestId, response, transaction, cancellationToken);

            // 5. Commit the transaction
            await transaction.CommitAsync(cancellationToken);

            return response;
        }
        catch
        {
            await transaction.RollbackAsync(cancellationToken);
            throw;
        }
    }
}
9.3 Pitfall 3: Memory Leaks
Problem: in long-running workflows, request handlers that are never cleaned up cause memory leaks.
Solution: use weak references and periodic cleanup
internal sealed class LeakFreeRequestManager
{
    private readonly ConcurrentDictionary<string, WeakReference<TaskCompletionSource<ExternalResponse>>> _pendingRequests = new();
    private readonly Timer _cleanupTimer;

    public LeakFreeRequestManager()
    {
        // Clean up expired requests once a minute
        _cleanupTimer = new Timer(CleanupExpiredRequests, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
    }

    public async Task<ExternalResponse> HandleRequestAsync(
        ExternalRequest request,
        TimeSpan timeout,
        CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<ExternalResponse>();
        _pendingRequests[request.RequestId] = new WeakReference<TaskCompletionSource<ExternalResponse>>(tcs);

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);

        try
        {
            return await tcs.Task.WaitAsync(cts.Token);
        }
        finally
        {
            // Remove the request once it has completed or timed out
            _pendingRequests.TryRemove(request.RequestId, out _);
        }
    }

    private void CleanupExpiredRequests(object? state)
    {
        var expiredKeys = new List<string>();
        foreach (var kvp in _pendingRequests)
        {
            if (!kvp.Value.TryGetTarget(out var tcs) || tcs.Task.IsCompleted)
            {
                expiredKeys.Add(kvp.Key);
            }
        }

        foreach (var key in expiredKeys)
        {
            _pendingRequests.TryRemove(key, out _);
        }

        _logger.LogDebug("Cleaned up {Count} expired requests", expiredKeys.Count);
    }
}
9.4 Pitfall 4: Type Mismatches
Problem: the response type does not match the type the request expects, which causes a runtime error.
Solution: validate types strictly
public static class SafeResponseCreator
{
    public static ExternalResponse CreateTypeSafeResponse<TExpected>(
        this ExternalRequest request,
        object responseData)
    {
        // Validate the response type
        if (!request.PortInfo.ResponseType.IsMatchPolymorphic(responseData.GetType()))
        {
            throw new InvalidOperationException(
                $"Response type {responseData.GetType().Name} does not match the expected type " +
                $"{request.PortInfo.ResponseType.TypeName}");
        }

        // Attempt a conversion
        if (responseData is not TExpected)
        {
            try
            {
                responseData = Convert.ChangeType(responseData, typeof(TExpected));
            }
            catch (Exception ex)
            {
                throw new InvalidCastException(
                    $"Cannot convert {responseData.GetType().Name} to {typeof(TExpected).Name}", ex);
            }
        }

        return request.CreateResponse(responseData);
    }
}

// Usage
var response = request.CreateTypeSafeResponse<ApprovalDecision>(userInput);
10. Outlook and Future Directions
10.1 AI-Assisted Human Intervention
Human intervention in the future should not be an all-or-nothing handover but a collaboration between AI and humans:
// The AI makes a recommendation; the human makes the final decision
internal sealed class AIAssistedHumanDecision
{
    public async Task<ExternalResponse> GetAssistedDecisionAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        if (request.DataIs<ApprovalRequest>(out var approvalReq))
        {
            // The AI analyzes the request and offers a recommendation
            var aiAnalysis = await _aiAgent.AnalyzeAsync(approvalReq);

            // What the human decision maker sees
            var enrichedRequest = new
            {
                Original = approvalReq,
                AIRecommendation = aiAnalysis.Recommendation,
                Confidence = aiAnalysis.Confidence,
                RiskFactors = aiAnalysis.RiskFactors,
                SimilarCases = await FindSimilarCases(approvalReq),
                PredictedOutcome = aiAnalysis.PredictedOutcome
            };

            // The human decides with the AI's recommendation in view
            var humanDecision = await GetHumanDecisionWithAIContext(enrichedRequest);

            // Record the decision so the AI can learn from it
            await RecordDecisionForLearning(approvalReq, aiAnalysis, humanDecision);

            return request.CreateResponse(humanDecision);
        }

        throw new NotSupportedException();
    }
}
10.2 Adaptive Intervention Thresholds
The system should be able to learn when human intervention is needed:
internal sealed class AdaptiveHumanInterventionThreshold
{
    private readonly IMLModel _thresholdModel;

    public async Task<bool> ShouldRequestHumanAsync(
        WorkflowContext context,
        AIDecision aiDecision)
    {
        // Collect features
        var features = new
        {
            AIConfidence = aiDecision.Confidence,
            DecisionComplexity = CalculateComplexity(context),
            HistoricalAccuracy = await GetHistoricalAccuracy(context.Category),
            BusinessImpact = CalculateBusinessImpact(context),
            UserSatisfactionTrend = await GetSatisfactionTrend(context.Category)
        };

        // Let the ML model predict whether a human is needed
        var prediction = await _thresholdModel.PredictAsync(features);
        return prediction.ShouldRequestHuman;
    }

    public async Task LearnFromOutcomeAsync(
        WorkflowContext context,
        AIDecision aiDecision,
        bool requestedHuman,
        Outcome actualOutcome)
    {
        // Record training data
        await _thresholdModel.RecordTrainingDataAsync(new
        {
            Context = context,
            AIDecision = aiDecision,
            RequestedHuman = requestedHuman,
            ActualOutcome = actualOutcome,
            WasCorrect = aiDecision.Prediction == actualOutcome
        });
    }
}
10.3 Multimodal Human Intervention
Future human intervention need not be limited to text; it may also include voice, images, and video:
internal sealed class MultiModalHumanInterface
{
    public async Task<ExternalResponse> RequestMultiModalInputAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        var modalityPreference = await GetUserModalityPreference();

        return modalityPreference switch
        {
            InputModality.Voice => await RequestVoiceInputAsync(request, cancellationToken),
            InputModality.Touch => await RequestTouchInputAsync(request, cancellationToken),
            InputModality.Gesture => await RequestGestureInputAsync(request, cancellationToken),
            InputModality.BrainInterface => await RequestBCIInputAsync(request, cancellationToken),
            _ => await RequestTextInputAsync(request, cancellationToken)
        };
    }

    private async Task<ExternalResponse> RequestVoiceInputAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        // Capture input through the voice interface
        var audioStream = await _voiceInterface.StartListeningAsync();
        var transcription = await _speechToText.TranscribeAsync(audioStream);
        var intent = await _nluEngine.ParseAsync(transcription);

        return request.CreateResponse(intent);
    }
}
10.4 Blockchain-Verified Human Decisions
For high-risk decisions, a blockchain can be used to keep a tamper-evident audit log:
internal sealed class BlockchainVerifiedHumanDecision
{
    private readonly IBlockchainClient _blockchain;

    public async Task<ExternalResponse> GetVerifiedDecisionAsync(
        ExternalRequest request,
        CancellationToken cancellationToken)
    {
        var decision = await GetHumanDecisionAsync(request, cancellationToken);

        // Create the decision record
        var record = new DecisionRecord
        {
            RequestId = request.RequestId,
            Timestamp = DateTime.UtcNow,
            DecisionMaker = GetCurrentUserId(),
            Decision = decision,
            Context = request.Data,
            Signature = await SignDecisionAsync(decision)
        };

        // Write it to the blockchain
        var transactionHash = await _blockchain.RecordDecisionAsync(record);

        // Include the blockchain proof in the response
        var verifiedResponse = new VerifiedDecision
        {
            Decision = decision,
            BlockchainProof = new
            {
                TransactionHash = transactionHash,
                BlockNumber = await _blockchain.GetCurrentBlockNumberAsync(),
                Timestamp = record.Timestamp
            }
        };

        return request.CreateResponse(verifiedResponse);
    }
}
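11. Summary and Reflections
11.1 Key Takeaways
A close look at human intervention in Microsoft Agent Framework reveals several key design principles:
1. Type safety first
- RequestPort uses generics for compile-time type checking
- PortableValue passes typed values safely across boundaries
- Strongly typed contracts reduce runtime errors
2. Recoverable state
- SuperStep is the atomic unit of execution
- The checkpoint mechanism allows recovery from any completed step
- DurableProperty manages persisted state automatically
3. A balance of flexibility and ease of use
- RequestPort offers maximum flexibility (programmatic)
- Question Action offers the best ease of use (declarative)
- The two modes complement each other and cover different scenarios
4. Observability built in
- Activity tracing provides the full execution path
- The event stream provides real-time status feedback
- Metrics collection supports performance analysis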
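11.2 Lessons from the Design Philosophy
The human-intervention design in Microsoft Agent Framework offers several lessons:
Lesson 1: The power of abstraction
Treating a human as a "special executor" looks like a small design decision, but it pays off considerably:
- A unified programming model
- Consistent state management
- Seamless checkpoint support
Lesson 2: Clear boundaries
There is an explicit boundary between the workflow engine and the outside world:
- RequestInfoEvent is the only way out
- ExternalResponse is the only way in
- This clear boundary makes the system easy to understand and test
Lesson 3: Fault tolerance is a necessity
Human intervention is inherently unreliable:
- The person may be offline
- The person may enter the wrong thing
- The person may take a long time
The system has to be ready for all of these: timeouts, retries, default values, and fallback strategies.
Lesson 4: Room to evolve
Good design leaves room for the future:
- The generic design of RequestPort supports any type
- The PortableValue abstraction supports passing values across processes
- The event-driven architecture supports asynchronous and distributed operation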
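11.3 Practical Recommendations
Based on the analysis in this article, here are some practical recommendations.
For architects:
- Prefer RequestPort for core business processes
- Define an explicit SLA for human intervention (response time, availability)
- Build thorough monitoring and alerting
- Plan state persistence and disaster recovery up front
For developers:
- Lean on the type system to avoid runtime errors
- Design a timeout and a fallback strategy for every human-intervention point
- Write thorough unit and integration tests
- Use the checkpoint mechanism to guarantee recoverability
For business users:
- Question Action is enough for most standard processes
- Set retry counts and default values sensibly
- Use entity extraction to simplify user input
- Review and tune the conditions that trigger human intervention regularly
11.4 A Final Thought
Human intervention is not a failure of AI; it is AI's humility.
As AI grows more capable, it is easy to fall into the trap of believing AI can do everything. A truly mature AI system should know its own limits and know when human wisdom is needed.
The human-intervention mechanism in Microsoft Agent Framework embodies that humility. It neither shuts humans out nor treats them as a mere backup for the AI. It treats humans as equal participants in the workflow: sometimes decision makers, sometimes supervisors, sometimes creators.
That design philosophy may well be the right direction for collaboration between AI and humans.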