当前位置: 首页 > news >正文

响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理

响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!

响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法

响应式编程入门教程第三节:ReactiveCommand 与 UI 交互

响应式编程入门教程第四节:响应式集合与数据绑定

响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程

响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理

经过前面几篇的学习,我们已经对 ReactivePropertyReactiveCommandReactiveCollection 有了扎实的理解,也掌握了如何管理订阅生命周期和封装 Unity 事件。现在,我们将进入响应式编程真正展现其强大威力的地方:高级操作符的组合与转换

在实际项目中,我们面对的逻辑往往是复杂的:需要同时监听多个数据源、将不同的事件流合并、将异步操作串联起来,并确保整个系统在面对错误时依然健壮。UniRx 提供了一系列强大的操作符来优雅地解决这些问题,让我们能够以声明式的方式构建复杂的逻辑。


1. 组合操作符:将多个流合并为一个

很多时候,一个业务逻辑的触发条件或数据来源依赖于多个独立的事件流。组合操作符允许我们将这些独立的流合并成一个新的流,其发射的值基于原始流的特定组合。

1.1 CombineLatest:取最新值组合

CombineLatest 操作符会等待所有源 Observable 都至少发射一个值,然后每当其中任何一个源发射新值时,它就会将所有源的最新值组合起来并发射出去。

应用场景: UI 表单验证(多个输入字段都合法时按钮才可用)、多个游戏状态(玩家在线且有足够的金币)都满足时触发某个行为。

using UnityEngine;
using UnityEngine.UI;
using UniRx;public class LoginPanel : MonoBehaviour
{public InputField usernameInput;public InputField passwordInput;public Button loginButton;void Start(){// 用户名长度是否合法 (至少3个字符)var isUsernameValid = usernameInput.OnValueChangedAsObservable().Select(username => username.Length >= 3).Publish().RefCount(); // 使用 Publish().RefCount() 避免重复订阅// 密码长度是否合法 (至少6个字符)var isPasswordValid = passwordInput.OnValueChangedAsObservable().Select(password => password.Length >= 6).Publish().RefCount();// 组合两个流:当用户名和密码都合法时,登录按钮才可用isUsernameValid.CombineLatest(isPasswordValid, (isUserOk, isPassOk) => isUserOk && isPassOk).Subscribe(canLogin =>{loginButton.interactable = canLogin;Debug.Log($"登录按钮状态: {(canLogin ? "可用" : "禁用")}");}).AddTo(this);// 订阅登录按钮点击事件loginButton.OnClickAsObservable().Subscribe(_ => Debug.Log("尝试登录...")).AddTo(this);// 初始化按钮状态 (如果输入框初始值不满足条件,按钮应禁用)loginButton.interactable = false;}
}

LoginPanel 示例中,CombineLatest 实时监听用户名和密码输入框的有效性。只要其中一个输入框内容改变,并且两个输入框都满足了长度要求,loginButton 就会变为可点击状态。

1.2 Merge:将多个流的事件合并到一个流中

Merge 操作符会合并来自多个 Observable 的事件,并按照事件发生的时间顺序将它们发射到单个 Observable 中。

应用场景: 同时监听多个 UI 按钮的点击事件、统一处理来自不同服务器的实时消息、合并不同来源的日志信息。

using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System.Collections.Generic;public class EventAggregator : MonoBehaviour
{public Button buttonA;public Button buttonB;public Button buttonC;public Text statusText;void Start(){// 将多个按钮的点击事件合并到一个流中var clicks = Observable.Merge(buttonA.OnClickAsObservable().Select(_ => "按钮A 被点击"),buttonB.OnClickAsObservable().Select(_ => "按钮B 被点击"),buttonC.OnClickAsObservable().Select(_ => "按钮C 被点击"));clicks.Subscribe(message =>{statusText.text = message;Debug.Log(message);}).AddTo(this);}
}

通过 Merge,无论哪个按钮被点击,statusText 都会更新为对应的消息。这比分别订阅每个按钮并写重复逻辑要简洁得多。

1.3 Zip:按索引配对组合

Zip 操作符会按顺序从每个源 Observable 中取一个值,然后将它们配对组合成一个新的值并发射出去。它会等待所有源都发射一个值后才开始组合,并且会以最少事件的那个源为准。

应用场景: 同步处理两个或多个需要按顺序对应的数据流(例如,动画完成与某个数据更新同步)、实现分步操作的同步。

using UnityEngine;
using UniRx;
using System;public class ZipExample : MonoBehaviour
{void Start(){// 模拟两个异步操作,分别返回数字和字母var numbers = Observable.Interval(TimeSpan.FromSeconds(0.5f)).Take(3) // 0, 1, 2.Select(x => (char)('0' + x));var letters = Observable.Interval(TimeSpan.FromSeconds(0.7f)).Take(3) // a, b, c.Select(x => (char)('a' + x));// 将数字和字母流按顺序配对numbers.Zip(letters, (num, letter) => $"({num}, {letter})").Subscribe(result => Debug.Log($"Zip 结果: {result}"),() => Debug.Log("Zip 完成")).AddTo(this);// 预期输出:// (0, a) - 0.7s 时 number[0] 和 letter[0] 都到了// (1, b) - 1.4s 时 number[1] 和 letter[1] 都到了// (2, c) - 2.1s 时 number[2] 和 letter[2] 都到了}
}

ZipExample 中,Zip 会等待 numbersletters 都准备好各自的第一个值后才组合。因此,尽管 numbers 产生值的速度更快,但 Zip 的输出速度受限于较慢的 letters 流。


2. 转换与过滤操作符:重塑事件流

这些操作符允许我们对 Observable 发射的事件进行转换、过滤或聚合,以满足特定的业务需求。

2.1 SelectMany:处理嵌套的 Observable (扁平化操作)

SelectMany (FlatMap) 是一个非常重要的操作符,用于处理这样的场景:当一个 Observable 发射一个值时,你需要基于这个值去创建并订阅另一个 Observable,然后将这个“内部 Observable”所发射的所有值扁平化到主 Observable 流中。

应用场景:

  • 串联异步操作: 例如,用户点击按钮 -> 发送网络请求 -> 收到数据后进行本地存储。
  • 数据查询: 根据一个 ID 查询详细信息,然后根据详细信息再查询相关联的其他数据。
  • 游戏流程: 技能施放成功 -> 播放特效动画 -> 动画播放完毕后造成伤害。
using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System;
using System.Threading.Tasks;public class AsyncChain : MonoBehaviour
{public Button startChainButton;public Text statusText;void Start(){startChainButton.OnClickAsObservable().SelectMany(_ => PerformStep1("用户数据")) // 步骤1:模拟加载用户数据.SelectMany(userData => PerformStep2(userData + " -> 物品数据")) // 步骤2:根据用户数据加载物品数据.SelectMany(itemData => PerformStep3(itemData + " -> 最终结果")) // 步骤3:根据物品数据处理最终结果.Subscribe(finalResult =>{statusText.text = $"流程完成: {finalResult}";Debug.Log($"整个异步链完成: {finalResult}");},ex =>{statusText.text = $"流程出错: {ex.Message}";Debug.LogError($"异步链中发生错误: {ex.Message}");},() => Debug.Log("异步链流完成") // 正常完成回调).AddTo(this);}// 模拟第一个异步步骤private IObservable<string> PerformStep1(string input){return Observable.FromAsync(async () =>{Debug.Log($"步骤1开始: {input}");await Task.Delay(TimeSpan.FromSeconds(1));if (UnityEngine.Random.value < 0.2f) throw new Exception("步骤1模拟错误!"); // 模拟错误Debug.Log("步骤1完成");return "处理后的用户数据";});}// 模拟第二个异步步骤private IObservable<string> PerformStep2(string input){return Observable.FromAsync(async () =>{Debug.Log($"步骤2开始: {input}");await Task.Delay(TimeSpan.FromSeconds(1.5f));Debug.Log("步骤2完成");return "处理后的物品数据";});}// 模拟第三个异步步骤private IObservable<string> PerformStep3(string input){return Observable.FromAsync(async () =>{Debug.Log($"步骤3开始: {input}");await Task.Delay(TimeSpan.FromSeconds(0.8f));Debug.Log("步骤3完成");return "最终结果数据";});}
}

在这个例子中,SelectMany 优雅地将三个独立的异步操作串联起来。前一个操作的输出作为后一个操作的输入。如果其中任何一个步骤发生错误,整个流会向下游传播错误,我们可以统一处理。

2.2 Throttle / ThrottleFirst / Debounce:控制事件频率

这些操作符用于限制事件流的发射频率,在处理高频事件时非常有用。

  • Throttle(TimeSpan) 在指定的时间窗口内,只发射最后一次事件。适用于搜索框输入(用户停止输入一段时间后才触发搜索)。
  • ThrottleFirst(TimeSpan) 在指定的时间窗口内,只发射第一次事件。适用于防止按钮重复点击。
  • Debounce(TimeSpan)Throttle 类似,但在时间窗口内,如果又收到新事件,则会重置计时器。
using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System;public class FrequencyControl : MonoBehaviour
{public Button spamButton;public Text statusText;void Start(){// 防止快速双击:0.5秒内只响应第一次点击spamButton.OnClickAsObservable().ThrottleFirst(TimeSpan.FromSeconds(0.5f)).Subscribe(_ =>{statusText.text = "按钮被点击 (ThrottleFirst)";Debug.Log("按钮被点击 (ThrottleFirst)");}).AddTo(this);// 假设有一个持续的输入流(例如鼠标移动事件)// 每0.2秒最多处理一次Observable.EveryUpdate().Where(_ => Input.GetMouseButton(0)) // 鼠标左键按下时.Throttle(TimeSpan.FromSeconds(0.2f)) // 每0.2秒只处理一次.Subscribe(_ =>{Debug.Log("鼠标拖拽事件 (Throttle)");}).AddTo(this);}
}
2.3 WhereSelect:过滤与映射
  • Where(predicate) 过滤流中的事件,只让满足特定条件的事件通过。
  • Select(selector) 将流中的每个事件映射(转换)成一个新的值。

这两个操作符非常基础和常用,用于构建更精确的事件流。


3. 错误处理:构建健壮的响应式系统

在复杂的响应式流中,错误是不可避免的。网络请求可能失败、文件加载可能不存在、某个内部计算可能抛出异常。如果不妥善处理,一个错误可能会终止整个流,导致应用程序崩溃或进入不可预测的状态。

UniRx 提供了一系列操作符来优雅地处理错误,让我们的响应式系统更加健壮。

3.1 Catch:捕获并替换错误流

Catch 操作符用于捕获上游流中的错误,并在错误发生时,用另一个 Observable 替换 整个流。

using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class ErrorHandlingCatch : MonoBehaviour
{void Start(){var source = Observable.FromAsync(async () =>{Debug.Log("开始模拟可能出错的操作...");await Task.Delay(TimeSpan.FromSeconds(1));if (UnityEngine.Random.value < 0.5f) // 50% 概率出错{throw new Exception("模拟操作失败!");}return "操作成功!";});source.Catch((Exception ex) =>{Debug.LogError($"错误被 Catch 捕获: {ex.Message}");// 当错误发生时,返回一个新的 Observable 流来替代return Observable.Return("从错误中恢复,返回默认值。");}).Subscribe(result => Debug.Log($"最终结果: {result}"),error => Debug.LogError($"Subscribe 接收到未被 Catch 处理的错误: {error.Message}"),() => Debug.Log("流完成")).AddTo(this);}
}

在这个例子中,如果 source 流发生错误,Catch 会捕获它,并发射 Observable.Return("从错误中恢复..."),然后流会正常完成。订阅者只会收到这个恢复值,而不会收到错误通知。

3.2 OnErrorResumeNext:捕获并切换到下一个流

OnErrorResumeNext 操作符在源 Observable 遇到错误时,会立即切换到你指定的下一个 Observable。这与 Catch 类似,但它更强调在错误发生后“继续”使用另一个完整的流。

using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class ErrorHandlingResumeNext : MonoBehaviour
{void Start(){var mainOperation = Observable.FromAsync(async () =>{Debug.Log("主操作开始...");await Task.Delay(TimeSpan.FromSeconds(1));if (UnityEngine.Random.value < 0.5f) // 50% 概率出错{throw new Exception("主操作失败!");}return "主操作成功数据";});var fallbackOperation = Observable.FromAsync(async () =>{Debug.Log("备用操作开始...");await Task.Delay(TimeSpan.FromSeconds(0.5f));return "备用操作成功数据";});mainOperation.OnErrorResumeNext(fallbackOperation) // 如果 mainOperation 出错,则切换到 fallbackOperation.Subscribe(data => Debug.Log($"最终接收到数据: {data}"),error => Debug.LogError($"Subscribe 接收到未处理的错误: {error.Message}"),() => Debug.Log("流完成")).AddTo(this);}
}

这里,如果 mainOperation 失败,流会无缝切换到 fallbackOperation。订阅者会收到 fallbackOperation 发出的值,然后流正常完成。

3.3 Retry / RetryWhen:重试机制
  • Retry() 当源 Observable 发生错误时,无条件地重新订阅源 Observable。可以指定重试次数 Retry(count)
  • RetryWhen(selector) 提供更复杂的重试逻辑。它接收一个错误流,你可以根据错误类型或重试次数,决定是立即重试、延迟重试,还是最终抛出错误。

应用场景: 网络请求的自动重试、临时性故障的恢复。

using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class ErrorHandlingRetry : MonoBehaviour
{private int _attemptCount = 0;void Start(){var unstableOperation = Observable.FromAsync(async () =>{_attemptCount++;Debug.Log($"尝试执行操作 (第 {_attemptCount} 次)...");await Task.Delay(TimeSpan.FromSeconds(1));if (_attemptCount < 3) // 前2次模拟失败{throw new Exception("操作不稳定,请重试!");}Debug.Log("操作终于成功了!");return "成功数据";});// 无条件重试3次unstableOperation.Retry(3) // 会尝试最多3次(第一次执行+2次重试).Subscribe(data => Debug.Log($"最终成功接收到数据: {data}"),error => Debug.LogError($"操作在重试后仍然失败: {error.Message}"),() => Debug.Log("流完成")).AddTo(this);// --- 复杂重试逻辑示例 (RetryWhen) ---// 模拟一个只有在特定条件满足时才重试的操作// 例如,只重试网络错误,且延迟重试/*_attemptCount = 0; // 重置计数器var smartRetryOperation = Observable.FromAsync(async () =>{_attemptCount++;Debug.Log($"智能重试操作 (第 {_attemptCount} 次)...");await Task.Delay(TimeSpan.FromSeconds(0.5f));if (_attemptCount < 2){throw new InvalidOperationException("模拟业务逻辑错误,不重试!"); // 不想重试的错误}if (_attemptCount < 4){throw new Exception("模拟网络错误,需要重试!"); // 想重试的错误}return "智能重试成功";});smartRetryOperation.RetryWhen(errors =>errors.SelectMany(error =>{if (error is InvalidOperationException){// 如果是业务逻辑错误,则直接抛出,不重试return Observable.Throw<long>(error);}// 对于其他错误,延迟2秒后重试Debug.Log($"检测到错误,将在2秒后重试: {error.Message}");return Observable.Timer(TimeSpan.FromSeconds(2));})).Subscribe(data => Debug.Log($"智能重试最终成功: {data}"),error => Debug.LogError($"智能重试最终失败: {error.Message}"),() => Debug.Log("智能重试流完成")).AddTo(this);*/}
}

RetryRetryWhen 是构建弹性系统的关键。RetryWhen 尤其强大,它允许你根据错误的类型、重试的次数等,定制化重试策略,甚至可以实现指数退避重试(Exponential Backoff)。


4. 调度器 (Scheduler):线程管理与上下文切换

在响应式编程中,操作符的执行上下文(线程)是一个重要的概念。在 Unity 中,大部分 UI 操作和游戏逻辑都必须在主线程上执行。UniRx 的调度器 (Scheduler) 负责管理 Observable 序列的执行,包括订阅、事件发送和操作符的执行。

  • Scheduler.MainThread UniRx 默认的调度器,确保所有事件都在 Unity 主线程上发布。这是你大部分时间都会使用的调度器,因为它避免了跨线程访问 Unity API 的问题。
  • Scheduler.ThreadPool / Scheduler.CurrentThread 用于在后台线程执行耗时操作,避免阻塞主线程。

应用场景: 当你有一个非常耗时的计算,或者需要从网络线程切换回主线程更新 UI 时。

using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System;
using System.Threading.Tasks;public class SchedulerExample : MonoBehaviour
{public Button startComputeButton;public Text resultText;void Start(){startComputeButton.OnClickAsObservable().SelectMany(_ => Observable.FromAsync(() => DoHeavyComputation())) // 耗时操作.ObserveOn(Scheduler.MainThread) // 将结果调度回主线程.Subscribe(result =>{resultText.text = $"计算结果: {result}";Debug.Log($"计算完成,在主线程更新UI: {result}");},ex => Debug.LogError($"计算失败: {ex.Message}")).AddTo(this);}// 模拟一个耗时的后台计算private async UniTask<int> DoHeavyComputation(){Debug.Log($"开始耗时计算,当前线程ID: {System.Threading.Thread.CurrentThread.ManagedThreadId}");await Task.Delay(TimeSpan.FromSeconds(3)); // 模拟耗时int sum = 0;for (int i = 0; i < 100000000; i++) // 模拟CPU密集型计算{sum += i;}Debug.Log($"耗时计算完成,当前线程ID: {System.Threading.Thread.CurrentThread.ManagedThreadId}");return sum;}
}

在这个例子中:

  1. OnClickAsObservable 在主线程触发。
  2. SelectMany 内部的 DoHeavyComputation 使用 UniTask,它默认在线程池执行,不会阻塞主线程。
  3. ObserveOn(Scheduler.MainThread) 是关键:它确保 Subscribe 中的代码(更新 resultText)总是在 Unity 主线程上执行,避免跨线程访问 UI 组件的错误。

重要提示: 除非你明确知道自己在做什么,否则请始终使用 ObserveOn(Scheduler.MainThread) 在异步操作完成后切换回主线程来更新 Unity UI 或访问其他 Unity API。


5. 总结与展望

本篇教程深入讲解了响应式编程中的核心高级操作符:

  • 组合操作符 (CombineLatest, Merge, Zip): 如何将多个独立的事件流合并为一个。
  • 转换与过滤操作符 (SelectMany, Throttle, Where, Select): 如何重塑事件流以满足复杂的业务逻辑和性能需求。
  • 错误处理 (Catch, OnErrorResumeNext, Retry, RetryWhen): 如何构建能够从错误中恢复的健壮系统。
  • 调度器 (Scheduler.MainThread): 如何在 Unity 中安全地进行线程切换,以避免阻塞主线程和跨线程访问问题。

掌握这些操作符是构建复杂、高性能、可维护的响应式应用程序的关键。它们让你能够以一种声明式、模块化的方式来表达复杂的异步和事件驱动逻辑,大大降低了代码的耦合度和维护成本。

在下一篇教程中,我们将把这些响应式编程的理念和工具提升到架构层面,探讨 响应式架构与 MVVM (Model-View-ViewModel) 模式在 Unity 中的应用。这将帮助你构建出更加清晰、可测试和可扩展的 Unity 应用程序。

响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!

响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法

响应式编程入门教程第三节:ReactiveCommand 与 UI 交互

响应式编程入门教程第四节:响应式集合与数据绑定

响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程

响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理

http://www.dtcms.com/a/285870.html

相关文章:

  • 【怜渠客】简单实现手机云控Windows电脑锁屏
  • MySQL中的锁有哪些
  • 【软件重构】如何避免意外冗余
  • 一文入门深度学习(以医学图像分割为例)
  • 【机器学习深度学习】LoRA 与 QLoRA:大模型高效微调的进阶指南
  • 【华为机试】70. 爬楼梯
  • 快速安装GitLab指南
  • mix-blend-mode的了解使用
  • git 介绍与使用教程
  • LP-MSPM0G3507学习--05管脚中断
  • 如何 ASP.NET Core 中使用 WebSocket
  • HTTP性能优化实战技术文章大纲
  • final 使用
  • 实现库存显示和状态按钮的Question
  • 数据统计模块后端架构解析:从Controller到SQL的ECharts数据对接实践
  • Spring介绍以及IOC和AOP的实现
  • 某邮生活旋转验证码逆向
  • Ansible + Shell 服务器巡检脚本
  • 如何使用Python将HTML格式的文本转换为Markdown格式?
  • UDP 协议下一发一收通信程序的实现与解析
  • Python - 数据分析三剑客之Pandas
  • docker--容器自启动
  • Vue导出Html为Word中包含图片在Microsoft Word显示异常问题
  • Python MP3 归一化器和长度分割器实用工具开发指南
  • 曼哈顿自注意力MaSA,基于曼哈顿距离的显式空间先验,以线性计算复杂度高效建模全局与局部空间关系,提升视觉任务的性能。
  • Java: 反射机制的 ParameterizedType(参数化类型)
  • WEB弹性设计
  • 使用 C++ 和 OpenCV 进行表面划痕检测
  • jQuery最新js文件下载教程
  • Django母婴商城项目实践(五)