响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理
响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!
响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法
响应式编程入门教程第三节:ReactiveCommand 与 UI 交互
响应式编程入门教程第四节:响应式集合与数据绑定
响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程
响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理
经过前面几篇的学习,我们已经对 ReactiveProperty
、ReactiveCommand
和 ReactiveCollection
有了扎实的理解,也掌握了如何管理订阅生命周期和封装 Unity 事件。现在,我们将进入响应式编程真正展现其强大威力的地方:高级操作符的组合与转换。
在实际项目中,我们面对的逻辑往往是复杂的:需要同时监听多个数据源、将不同的事件流合并、将异步操作串联起来,并确保整个系统在面对错误时依然健壮。UniRx 提供了一系列强大的操作符来优雅地解决这些问题,让我们能够以声明式的方式构建复杂的逻辑。
1. 组合操作符:将多个流合并为一个
很多时候,一个业务逻辑的触发条件或数据来源依赖于多个独立的事件流。组合操作符允许我们将这些独立的流合并成一个新的流,其发射的值基于原始流的特定组合。
1.1 CombineLatest
:取最新值组合
CombineLatest
操作符会等待所有源 Observable 都至少发射一个值,然后每当其中任何一个源发射新值时,它就会将所有源的最新值组合起来并发射出去。
应用场景: UI 表单验证(多个输入字段都合法时按钮才可用)、多个游戏状态(玩家在线且有足够的金币)都满足时触发某个行为。
using UnityEngine;
using UnityEngine.UI;
using UniRx;public class LoginPanel : MonoBehaviour
{public InputField usernameInput;public InputField passwordInput;public Button loginButton;void Start(){// 用户名长度是否合法 (至少3个字符)var isUsernameValid = usernameInput.OnValueChangedAsObservable().Select(username => username.Length >= 3).Publish().RefCount(); // 使用 Publish().RefCount() 避免重复订阅// 密码长度是否合法 (至少6个字符)var isPasswordValid = passwordInput.OnValueChangedAsObservable().Select(password => password.Length >= 6).Publish().RefCount();// 组合两个流:当用户名和密码都合法时,登录按钮才可用isUsernameValid.CombineLatest(isPasswordValid, (isUserOk, isPassOk) => isUserOk && isPassOk).Subscribe(canLogin =>{loginButton.interactable = canLogin;Debug.Log($"登录按钮状态: {(canLogin ? "可用" : "禁用")}");}).AddTo(this);// 订阅登录按钮点击事件loginButton.OnClickAsObservable().Subscribe(_ => Debug.Log("尝试登录...")).AddTo(this);// 初始化按钮状态 (如果输入框初始值不满足条件,按钮应禁用)loginButton.interactable = false;}
}
在 LoginPanel
示例中,CombineLatest
实时监听用户名和密码输入框的有效性。只要其中一个输入框内容改变,并且两个输入框都满足了长度要求,loginButton
就会变为可点击状态。
1.2 Merge
:将多个流的事件合并到一个流中
Merge
操作符会合并来自多个 Observable 的事件,并按照事件发生的时间顺序将它们发射到单个 Observable 中。
应用场景: 同时监听多个 UI 按钮的点击事件、统一处理来自不同服务器的实时消息、合并不同来源的日志信息。
using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System.Collections.Generic;public class EventAggregator : MonoBehaviour
{public Button buttonA;public Button buttonB;public Button buttonC;public Text statusText;void Start(){// 将多个按钮的点击事件合并到一个流中var clicks = Observable.Merge(buttonA.OnClickAsObservable().Select(_ => "按钮A 被点击"),buttonB.OnClickAsObservable().Select(_ => "按钮B 被点击"),buttonC.OnClickAsObservable().Select(_ => "按钮C 被点击"));clicks.Subscribe(message =>{statusText.text = message;Debug.Log(message);}).AddTo(this);}
}
通过 Merge
,无论哪个按钮被点击,statusText
都会更新为对应的消息。这比分别订阅每个按钮并写重复逻辑要简洁得多。
1.3 Zip
:按索引配对组合
Zip
操作符会按顺序从每个源 Observable 中取一个值,然后将它们配对组合成一个新的值并发射出去。它会等待所有源都发射一个值后才开始组合,并且会以最少事件的那个源为准。
应用场景: 同步处理两个或多个需要按顺序对应的数据流(例如,动画完成与某个数据更新同步)、实现分步操作的同步。
using UnityEngine;
using UniRx;
using System;public class ZipExample : MonoBehaviour
{void Start(){// 模拟两个异步操作,分别返回数字和字母var numbers = Observable.Interval(TimeSpan.FromSeconds(0.5f)).Take(3) // 0, 1, 2.Select(x => (char)('0' + x));var letters = Observable.Interval(TimeSpan.FromSeconds(0.7f)).Take(3) // a, b, c.Select(x => (char)('a' + x));// 将数字和字母流按顺序配对numbers.Zip(letters, (num, letter) => $"({num}, {letter})").Subscribe(result => Debug.Log($"Zip 结果: {result}"),() => Debug.Log("Zip 完成")).AddTo(this);// 预期输出:// (0, a) - 0.7s 时 number[0] 和 letter[0] 都到了// (1, b) - 1.4s 时 number[1] 和 letter[1] 都到了// (2, c) - 2.1s 时 number[2] 和 letter[2] 都到了}
}
在 ZipExample
中,Zip
会等待 numbers
和 letters
都准备好各自的第一个值后才组合。因此,尽管 numbers
产生值的速度更快,但 Zip
的输出速度受限于较慢的 letters
流。
2. 转换与过滤操作符:重塑事件流
这些操作符允许我们对 Observable 发射的事件进行转换、过滤或聚合,以满足特定的业务需求。
2.1 SelectMany
:处理嵌套的 Observable (扁平化操作)
SelectMany
(FlatMap) 是一个非常重要的操作符,用于处理这样的场景:当一个 Observable 发射一个值时,你需要基于这个值去创建并订阅另一个 Observable,然后将这个“内部 Observable”所发射的所有值扁平化到主 Observable 流中。
应用场景:
- 串联异步操作: 例如,用户点击按钮 -> 发送网络请求 -> 收到数据后进行本地存储。
- 数据查询: 根据一个 ID 查询详细信息,然后根据详细信息再查询相关联的其他数据。
- 游戏流程: 技能施放成功 -> 播放特效动画 -> 动画播放完毕后造成伤害。
using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System;
using System.Threading.Tasks;public class AsyncChain : MonoBehaviour
{public Button startChainButton;public Text statusText;void Start(){startChainButton.OnClickAsObservable().SelectMany(_ => PerformStep1("用户数据")) // 步骤1:模拟加载用户数据.SelectMany(userData => PerformStep2(userData + " -> 物品数据")) // 步骤2:根据用户数据加载物品数据.SelectMany(itemData => PerformStep3(itemData + " -> 最终结果")) // 步骤3:根据物品数据处理最终结果.Subscribe(finalResult =>{statusText.text = $"流程完成: {finalResult}";Debug.Log($"整个异步链完成: {finalResult}");},ex =>{statusText.text = $"流程出错: {ex.Message}";Debug.LogError($"异步链中发生错误: {ex.Message}");},() => Debug.Log("异步链流完成") // 正常完成回调).AddTo(this);}// 模拟第一个异步步骤private IObservable<string> PerformStep1(string input){return Observable.FromAsync(async () =>{Debug.Log($"步骤1开始: {input}");await Task.Delay(TimeSpan.FromSeconds(1));if (UnityEngine.Random.value < 0.2f) throw new Exception("步骤1模拟错误!"); // 模拟错误Debug.Log("步骤1完成");return "处理后的用户数据";});}// 模拟第二个异步步骤private IObservable<string> PerformStep2(string input){return Observable.FromAsync(async () =>{Debug.Log($"步骤2开始: {input}");await Task.Delay(TimeSpan.FromSeconds(1.5f));Debug.Log("步骤2完成");return "处理后的物品数据";});}// 模拟第三个异步步骤private IObservable<string> PerformStep3(string input){return Observable.FromAsync(async () =>{Debug.Log($"步骤3开始: {input}");await Task.Delay(TimeSpan.FromSeconds(0.8f));Debug.Log("步骤3完成");return "最终结果数据";});}
}
在这个例子中,SelectMany
优雅地将三个独立的异步操作串联起来。前一个操作的输出作为后一个操作的输入。如果其中任何一个步骤发生错误,整个流会向下游传播错误,我们可以统一处理。
2.2 Throttle
/ ThrottleFirst
/ Debounce
:控制事件频率
这些操作符用于限制事件流的发射频率,在处理高频事件时非常有用。
Throttle(TimeSpan)
: 在指定的时间窗口内,只发射最后一次事件。适用于搜索框输入(用户停止输入一段时间后才触发搜索)。ThrottleFirst(TimeSpan)
: 在指定的时间窗口内,只发射第一次事件。适用于防止按钮重复点击。Debounce(TimeSpan)
: 与Throttle
类似,但在时间窗口内,如果又收到新事件,则会重置计时器。
using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System;public class FrequencyControl : MonoBehaviour
{public Button spamButton;public Text statusText;void Start(){// 防止快速双击:0.5秒内只响应第一次点击spamButton.OnClickAsObservable().ThrottleFirst(TimeSpan.FromSeconds(0.5f)).Subscribe(_ =>{statusText.text = "按钮被点击 (ThrottleFirst)";Debug.Log("按钮被点击 (ThrottleFirst)");}).AddTo(this);// 假设有一个持续的输入流(例如鼠标移动事件)// 每0.2秒最多处理一次Observable.EveryUpdate().Where(_ => Input.GetMouseButton(0)) // 鼠标左键按下时.Throttle(TimeSpan.FromSeconds(0.2f)) // 每0.2秒只处理一次.Subscribe(_ =>{Debug.Log("鼠标拖拽事件 (Throttle)");}).AddTo(this);}
}
2.3 Where
和 Select
:过滤与映射
Where(predicate)
: 过滤流中的事件,只让满足特定条件的事件通过。Select(selector)
: 将流中的每个事件映射(转换)成一个新的值。
这两个操作符非常基础和常用,用于构建更精确的事件流。
3. 错误处理:构建健壮的响应式系统
在复杂的响应式流中,错误是不可避免的。网络请求可能失败、文件加载可能不存在、某个内部计算可能抛出异常。如果不妥善处理,一个错误可能会终止整个流,导致应用程序崩溃或进入不可预测的状态。
UniRx 提供了一系列操作符来优雅地处理错误,让我们的响应式系统更加健壮。
3.1 Catch
:捕获并替换错误流
Catch
操作符用于捕获上游流中的错误,并在错误发生时,用另一个 Observable 替换 整个流。
using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class ErrorHandlingCatch : MonoBehaviour
{void Start(){var source = Observable.FromAsync(async () =>{Debug.Log("开始模拟可能出错的操作...");await Task.Delay(TimeSpan.FromSeconds(1));if (UnityEngine.Random.value < 0.5f) // 50% 概率出错{throw new Exception("模拟操作失败!");}return "操作成功!";});source.Catch((Exception ex) =>{Debug.LogError($"错误被 Catch 捕获: {ex.Message}");// 当错误发生时,返回一个新的 Observable 流来替代return Observable.Return("从错误中恢复,返回默认值。");}).Subscribe(result => Debug.Log($"最终结果: {result}"),error => Debug.LogError($"Subscribe 接收到未被 Catch 处理的错误: {error.Message}"),() => Debug.Log("流完成")).AddTo(this);}
}
在这个例子中,如果 source
流发生错误,Catch
会捕获它,并发射 Observable.Return("从错误中恢复...")
,然后流会正常完成。订阅者只会收到这个恢复值,而不会收到错误通知。
3.2 OnErrorResumeNext
:捕获并切换到下一个流
OnErrorResumeNext
操作符在源 Observable 遇到错误时,会立即切换到你指定的下一个 Observable。这与 Catch
类似,但它更强调在错误发生后“继续”使用另一个完整的流。
using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class ErrorHandlingResumeNext : MonoBehaviour
{void Start(){var mainOperation = Observable.FromAsync(async () =>{Debug.Log("主操作开始...");await Task.Delay(TimeSpan.FromSeconds(1));if (UnityEngine.Random.value < 0.5f) // 50% 概率出错{throw new Exception("主操作失败!");}return "主操作成功数据";});var fallbackOperation = Observable.FromAsync(async () =>{Debug.Log("备用操作开始...");await Task.Delay(TimeSpan.FromSeconds(0.5f));return "备用操作成功数据";});mainOperation.OnErrorResumeNext(fallbackOperation) // 如果 mainOperation 出错,则切换到 fallbackOperation.Subscribe(data => Debug.Log($"最终接收到数据: {data}"),error => Debug.LogError($"Subscribe 接收到未处理的错误: {error.Message}"),() => Debug.Log("流完成")).AddTo(this);}
}
这里,如果 mainOperation
失败,流会无缝切换到 fallbackOperation
。订阅者会收到 fallbackOperation
发出的值,然后流正常完成。
3.3 Retry
/ RetryWhen
:重试机制
Retry()
: 当源 Observable 发生错误时,无条件地重新订阅源 Observable。可以指定重试次数Retry(count)
。RetryWhen(selector)
: 提供更复杂的重试逻辑。它接收一个错误流,你可以根据错误类型或重试次数,决定是立即重试、延迟重试,还是最终抛出错误。
应用场景: 网络请求的自动重试、临时性故障的恢复。
using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;public class ErrorHandlingRetry : MonoBehaviour
{private int _attemptCount = 0;void Start(){var unstableOperation = Observable.FromAsync(async () =>{_attemptCount++;Debug.Log($"尝试执行操作 (第 {_attemptCount} 次)...");await Task.Delay(TimeSpan.FromSeconds(1));if (_attemptCount < 3) // 前2次模拟失败{throw new Exception("操作不稳定,请重试!");}Debug.Log("操作终于成功了!");return "成功数据";});// 无条件重试3次unstableOperation.Retry(3) // 会尝试最多3次(第一次执行+2次重试).Subscribe(data => Debug.Log($"最终成功接收到数据: {data}"),error => Debug.LogError($"操作在重试后仍然失败: {error.Message}"),() => Debug.Log("流完成")).AddTo(this);// --- 复杂重试逻辑示例 (RetryWhen) ---// 模拟一个只有在特定条件满足时才重试的操作// 例如,只重试网络错误,且延迟重试/*_attemptCount = 0; // 重置计数器var smartRetryOperation = Observable.FromAsync(async () =>{_attemptCount++;Debug.Log($"智能重试操作 (第 {_attemptCount} 次)...");await Task.Delay(TimeSpan.FromSeconds(0.5f));if (_attemptCount < 2){throw new InvalidOperationException("模拟业务逻辑错误,不重试!"); // 不想重试的错误}if (_attemptCount < 4){throw new Exception("模拟网络错误,需要重试!"); // 想重试的错误}return "智能重试成功";});smartRetryOperation.RetryWhen(errors =>errors.SelectMany(error =>{if (error is InvalidOperationException){// 如果是业务逻辑错误,则直接抛出,不重试return Observable.Throw<long>(error);}// 对于其他错误,延迟2秒后重试Debug.Log($"检测到错误,将在2秒后重试: {error.Message}");return Observable.Timer(TimeSpan.FromSeconds(2));})).Subscribe(data => Debug.Log($"智能重试最终成功: {data}"),error => Debug.LogError($"智能重试最终失败: {error.Message}"),() => Debug.Log("智能重试流完成")).AddTo(this);*/}
}
Retry
和 RetryWhen
是构建弹性系统的关键。RetryWhen
尤其强大,它允许你根据错误的类型、重试的次数等,定制化重试策略,甚至可以实现指数退避重试(Exponential Backoff)。
4. 调度器 (Scheduler):线程管理与上下文切换
在响应式编程中,操作符的执行上下文(线程)是一个重要的概念。在 Unity 中,大部分 UI 操作和游戏逻辑都必须在主线程上执行。UniRx 的调度器 (Scheduler
) 负责管理 Observable 序列的执行,包括订阅、事件发送和操作符的执行。
Scheduler.MainThread
: UniRx 默认的调度器,确保所有事件都在 Unity 主线程上发布。这是你大部分时间都会使用的调度器,因为它避免了跨线程访问 Unity API 的问题。Scheduler.ThreadPool
/Scheduler.CurrentThread
: 用于在后台线程执行耗时操作,避免阻塞主线程。
应用场景: 当你有一个非常耗时的计算,或者需要从网络线程切换回主线程更新 UI 时。
using UnityEngine;
using UnityEngine.UI;
using UniRx;
using System;
using System.Threading.Tasks;public class SchedulerExample : MonoBehaviour
{public Button startComputeButton;public Text resultText;void Start(){startComputeButton.OnClickAsObservable().SelectMany(_ => Observable.FromAsync(() => DoHeavyComputation())) // 耗时操作.ObserveOn(Scheduler.MainThread) // 将结果调度回主线程.Subscribe(result =>{resultText.text = $"计算结果: {result}";Debug.Log($"计算完成,在主线程更新UI: {result}");},ex => Debug.LogError($"计算失败: {ex.Message}")).AddTo(this);}// 模拟一个耗时的后台计算private async UniTask<int> DoHeavyComputation(){Debug.Log($"开始耗时计算,当前线程ID: {System.Threading.Thread.CurrentThread.ManagedThreadId}");await Task.Delay(TimeSpan.FromSeconds(3)); // 模拟耗时int sum = 0;for (int i = 0; i < 100000000; i++) // 模拟CPU密集型计算{sum += i;}Debug.Log($"耗时计算完成,当前线程ID: {System.Threading.Thread.CurrentThread.ManagedThreadId}");return sum;}
}
在这个例子中:
OnClickAsObservable
在主线程触发。SelectMany
内部的DoHeavyComputation
使用UniTask
,它默认在线程池执行,不会阻塞主线程。ObserveOn(Scheduler.MainThread)
是关键:它确保Subscribe
中的代码(更新resultText
)总是在 Unity 主线程上执行,避免跨线程访问 UI 组件的错误。
重要提示: 除非你明确知道自己在做什么,否则请始终使用 ObserveOn(Scheduler.MainThread)
在异步操作完成后切换回主线程来更新 Unity UI 或访问其他 Unity API。
5. 总结与展望
本篇教程深入讲解了响应式编程中的核心高级操作符:
- 组合操作符 (
CombineLatest
,Merge
,Zip
): 如何将多个独立的事件流合并为一个。 - 转换与过滤操作符 (
SelectMany
,Throttle
,Where
,Select
): 如何重塑事件流以满足复杂的业务逻辑和性能需求。 - 错误处理 (
Catch
,OnErrorResumeNext
,Retry
,RetryWhen
): 如何构建能够从错误中恢复的健壮系统。 - 调度器 (
Scheduler.MainThread
): 如何在 Unity 中安全地进行线程切换,以避免阻塞主线程和跨线程访问问题。
掌握这些操作符是构建复杂、高性能、可维护的响应式应用程序的关键。它们让你能够以一种声明式、模块化的方式来表达复杂的异步和事件驱动逻辑,大大降低了代码的耦合度和维护成本。
在下一篇教程中,我们将把这些响应式编程的理念和工具提升到架构层面,探讨 响应式架构与 MVVM (Model-View-ViewModel) 模式在 Unity 中的应用。这将帮助你构建出更加清晰、可测试和可扩展的 Unity 应用程序。
响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!
响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法
响应式编程入门教程第三节:ReactiveCommand 与 UI 交互
响应式编程入门教程第四节:响应式集合与数据绑定
响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程
响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理