响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!
响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法
响应式编程入门教程第三节:ReactiveCommand 与 UI 交互
响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程
响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理
响应式编程入门教程第七节:响应式架构与 MVVM 模式在 Unity 中的应用
[响应式编程入门教程第九节:UniRx 高级特性与自定义](https://blog.csdn.net/qq_46348216/article/details/149413198?fromshare=blogdetail&sharetype=blogdetail&sharerId=149413198&sharerefer=PC&sharesource=qq_46348216&shar
恭喜你坚持到了这个系列的最后一篇!在前八篇中,我们从 ReactiveProperty
的基础概念开始,逐步深入到 ReactiveCommand
、ReactiveCollection
、生命周期管理、高级操作符,直到响应式架构的 MVVM 模式和性能优化。现在,我们将探索 UniRx 库更深层次的奥秘:它的内部运作机制,以及如何在必要时自定义行为或创建自己的操作符。
理解这些高级特性,不仅能让你更好地利用 UniRx,还能帮助你调试更复杂的问题,甚至为库贡献代码。
1. 深入理解 IObservable<T>
和 IObserver<T>
UniRx 的核心是 Rx (Reactive Extensions) 的实现,它基于两个核心接口:
IObservable<T>
(可观察对象): 表示一个可以发出零个或多个通知(数据、错误、完成)的生产者。- 它只有一个方法:
IDisposable Subscribe(IObserver<T> observer)
。当你调用Subscribe
时,你就成为了一个观察者,并开始接收来自IObservable
的通知。同时,Subscribe
返回一个IDisposable
,用于取消订阅。
- 它只有一个方法:
IObserver<T>
(观察者): 表示一个接收来自IObservable
通知的使用者。- 它有三个核心方法:
void OnNext(T value)
:当IObservable
发出一个新值时调用。void OnError(Exception error)
:当IObservable
遇到错误时调用。错误会终止序列。void OnCompleted()
:当IObservable
完成时调用。完成也会终止序列。
- 它有三个核心方法:
所有的操作符(如 Where
, Select
, Merge
等)都是基于这两个接口构建的。当你链式调用操作符时,实际上是在构建一个观察者链条,数据流从上游向下游传递。
示例:手动实现一个简单的 Observable
这部分通常不需要在日常开发中手动编写,但有助于理解原理。
using System;
using UniRx;
using UnityEngine;
// 自定义一个简单的 Observable,每秒发出一个递增的数字
public class MySimpleObservable : IObservable<int>
{
private class MyDisposable : IDisposable
{
private Action _onDispose;
public MyDisposable(Action onDispose)
{
_onDispose = onDispose;
}
public void Dispose()
{
_onDispose?.Invoke();
_onDispose = null;
}
}
public IDisposable Subscribe(IObserver<int> observer)
{
Debug.Log("MySimpleObservable: 新的观察者订阅了。");
int count = 0;
// 模拟一个每秒递增的异步操作
IDisposable timerDisposable = Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(
_ =>
{
observer.OnNext(count++); // 发送下一个值
},
observer.OnError, // 传递错误
observer.OnCompleted // 传递完成信号
);
// 返回一个 IDisposable,当调用 Dispose 时,取消内部的计时器订阅
return new MyDisposable(() =>
{
Debug.Log("MySimpleObservable: 订阅被取消。");
timerDisposable.Dispose();
});
}
}
public class ObservableUnderstanding : MonoBehaviour
{
void Start()
{
// 订阅我们自定义的 Observable
MySimpleObservable myObservable = new MySimpleObservable();
IDisposable subscription = myObservable.Subscribe(new DebugObserver()); // 使用自定义观察者
// 也可以使用 UniRx 的扩展方法
// IDisposable subscription = myObservable.Subscribe(
// x => Debug.Log($"接收到值: {x}"),
// ex => Debug.LogError($"接收到错误: {ex.Message}"),
// () => Debug.Log("流已完成")
// ).AddTo(this); // 确保订阅被清理
Observable.Timer(TimeSpan.FromSeconds(5))
.Subscribe(_ =>
{
Debug.Log("5秒后手动取消订阅...");
subscription.Dispose(); // 手动取消订阅
}).AddTo(this);
}
}
// 一个简单的 IObserver 实现,用于打印接收到的事件
public class DebugObserver : IObserver<int>
{
public void OnNext(int value)
{
Debug.Log($"[DebugObserver] OnNext: {value}");
}
public void OnError(Exception error)
{
Debug.LogError($"[DebugObserver] OnError: {error.Message}");
}
public void OnCompleted()
{
Debug.Log("[DebugObserver] OnCompleted");
}
}
这个例子展示了 IObservable
如何接受一个 IObserver
并返回 IDisposable
。当 IDisposable
被 Dispose
时,它会清理内部资源(这里是停止了 Observable.Interval
的计时器),从而停止向观察者发送通知。
2. Hot Observable vs. Cold Observable
这是响应式编程中一个重要的概念:
Cold Observable (冷可观察对象):
- 每次有新的观察者订阅时,它都会重新开始执行其“生产”逻辑。就像一个点播电影,每个观众观看时电影都会从头播放。
- 示例:
Observable.Range
,Observable.Create
,Observable.FromAsync
,Observable.Start
等。 - 优点: 简单直观,每个订阅者都拥有独立的执行上下文。
- 缺点: 如果生产逻辑耗时或资源密集,多次订阅会造成重复计算和资源浪费。
Hot Observable (热可观察对象):
- 无论是否有观察者订阅,它都会一直在生产事件。它在订阅发生之前就开始发出事件,并且事件会在所有订阅者之间共享。就像一个直播电视节目,无论你何时打开电视,你都从当前播放的地方开始看,并且所有观众看的是同一个流。
- 示例:
ReactiveProperty
,ReactiveCollection
,Subject
,AsyncSubject
,BehaviorSubject
,ReplaySubject
。 - 优点: 适用于共享资源、避免重复计算、处理实时事件。
- 缺点: 如果订阅者在流开始后才订阅,会错过之前已经发出的事件(除非使用
ReplaySubject
或BehaviorSubject
)。
将 Cold Observable 转换为 Hot Observable:
你可以使用 Publish()
和 Share()
等操作符将 Cold Observable 转换为 Hot Observable。
Publish()
+Connect()
:Publish()
会返回一个IConnectableObservable<T>
。当你调用Connect()
方法时,上游的 Cold Observable 才真正开始执行。Publish().RefCount()
: 这是更常用的模式。RefCount()
会确保当第一个观察者订阅时,上游的 Observable 开始执行;当所有观察者都取消订阅时,上游的 Observable 停止执行。这非常适合那些需要在有消费者时才激活的共享资源。Share()
:Share()
是Publish().RefCount()
的一个简写版本,行为相似。
using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;
public class HotVsCold : MonoBehaviour
{
void Start()
{
// --- Cold Observable 示例 ---
// 每次订阅都会重新执行 FromAsync 内部的 Task
var coldObservable = Observable.FromAsync(async () =>
{
Debug.Log("Cold Observable: 开始耗时操作...");
await Task.Delay(TimeSpan.FromSeconds(1));
Debug.Log("Cold Observable: 耗时操作完成。");
return "Cold Data";
});
Debug.Log("订阅 Cold Observable 1");
coldObservable.Subscribe(x => Debug.Log($"Cold 1: {x}")).AddTo(this);
Debug.Log("订阅 Cold Observable 2 (会再次执行耗时操作)");
coldObservable.Subscribe(x => Debug.Log($"Cold 2: {x}")).AddTo(this);
// --- Hot Observable (Publish().RefCount()) 示例 ---
// 上游的耗时操作只执行一次,结果被共享
var hotObservable = Observable.FromAsync(async () =>
{
Debug.Log("Hot Observable: 开始耗时操作 (只会执行一次)...");
await Task.Delay(TimeSpan.FromSeconds(1));
Debug.Log("Hot Observable: 耗时操作完成。");
return "Hot Data";
})
.Publish() // 将 Cold 转换为 ConnectableObservable
.RefCount(); // 当有订阅者时连接,无订阅者时断开
Debug.Log("订阅 Hot Observable 1");
hotObservable.Subscribe(x => Debug.Log($"Hot 1: {x}")).AddTo(this);
Observable.Timer(TimeSpan.FromSeconds(0.5f)) // 延迟0.5秒订阅
.Subscribe(_ =>
{
Debug.Log("订阅 Hot Observable 2 (会共享结果)");
hotObservable.Subscribe(x => Debug.Log($"Hot 2: {x}")).AddTo(this);
}).AddTo(this);
// 注意:Hot Observable 2 会在流已经开始后才订阅,因此可能会错过流开始时发出的某些事件
// 对于这种情况,可以使用 ReplaySubject 或 BehaviorSubject
}
}
理解 Hot 和 Cold Observable 对于设计高效的响应式系统至关重要,尤其是在处理共享资源或避免重复计算时。
3. Subject 类型:兼具 Observable 和 Observer 的特性
Subject
是一个特殊类型,它既是 IObservable
又是 IObserver
。这意味着你可以:
- 像
IObserver
一样向它发送值(OnNext
)、错误(OnError
)或完成信号(OnCompleted
)。 - 像
IObservable
一样被订阅,从而接收它发出的事件。
UniRx 提供了几种不同类型的 Subject
,每种都有其特定行为:
Subject<T>
: 最基本的Subject
。它只会向订阅后的观察者发送事件。如果观察者在事件发生前订阅,它会错过这些事件。- 应用场景: 实现事件总线 (Event Bus)、手动触发事件。
AsyncSubject<T>
: 只会发出它在完成前发送的最后一个值。当它完成时,它会将这个最终值发射给所有当前和未来的观察者。- 应用场景: 只需要异步操作的最终结果,例如一个只执行一次的资源加载,所有等待者都只关心加载成功后的最终资源。
BehaviorSubject<T>
: 当有新的观察者订阅时,会立即发出它最近的一个值,然后才发出后续的值。在创建时需要提供一个默认值。- 应用场景: 缓存最新状态,例如当前游戏分数、玩家当前位置,新加入的 UI 模块可以立即获取到最新状态。
ReplaySubject<T>
: 会向所有新订阅的观察者重放所有或指定数量的过去事件。可以设置缓冲区大小或时间窗口。- 应用场景: 记录事件历史,例如聊天记录、操作日志,新加入的观察者需要看到过去的事件。
using UnityEngine;
using UniRx;
using System;
using System.Threading.Tasks;
public class SubjectTypes : MonoBehaviour
{
void Start()
{
// --- Subject<T> ---
Debug.Log("--- Subject<int> 示例 ---");
var subject = new Subject<int>();
subject.OnNext(100); // 订阅前发出,会被错过
subject.Subscribe(x => Debug.Log($"Subject Sub1: {x}")).AddTo(this);
subject.OnNext(1);
subject.OnNext(2);
subject.Subscribe(x => Debug.Log($"Subject Sub2: {x}")).AddTo(this); // 在此之后订阅
subject.OnNext(3);
subject.OnCompleted(); // 完成流
// --- BehaviorSubject<T> ---
Debug.Log("--- BehaviorSubject<string> 示例 ---");
var behaviorSubject = new BehaviorSubject<string>("初始状态");
behaviorSubject.Subscribe(s => Debug.Log($"Behavior Sub1: {s}")).AddTo(this); // 立即收到"初始状态"
behaviorSubject.OnNext("状态A");
behaviorSubject.OnNext("状态B");
behaviorSubject.Subscribe(s => Debug.Log($"Behavior Sub2: {s}")).AddTo(this); // 立即收到"状态B"
behaviorSubject.OnNext("状态C");
behaviorSubject.OnCompleted();
// --- ReplaySubject<T> (缓冲区大小为2) ---
Debug.Log("--- ReplaySubject<float> 示例 (缓冲区2) ---");
var replaySubject = new ReplaySubject<float>(2); // 缓存最近2个值
replaySubject.OnNext(10.1f);
replaySubject.OnNext(10.2f);
replaySubject.OnNext(10.3f); // 10.1f 会被移除
replaySubject.Subscribe(f => Debug.Log($"Replay Sub1: {f}")).AddTo(this); // 收到10.2f, 10.3f
replaySubject.OnNext(10.4f);
replaySubject.Subscribe(f => Debug.Log($"Replay Sub2: {f}")).AddTo(this); // 收到10.3f, 10.4f
replaySubject.OnCompleted();
// --- AsyncSubject<T> ---
Debug.Log("--- AsyncSubject<bool> 示例 ---");
var asyncSubject = new AsyncSubject<bool>();
asyncSubject.OnNext(false); // 不是最后一个值
asyncSubject.Subscribe(b => Debug.Log($"Async Sub1: {b}")).AddTo(this); // 不会立即收到
asyncSubject.OnNext(true); // 最后一个值
asyncSubject.OnCompleted(); // 完成时才发出最后一个值
asyncSubject.Subscribe(b => Debug.Log($"Async Sub2: {b}")).AddTo(this); // 也会收到最后一个值 (true)
}
}
选择正确的 Subject
类型对于管理事件流和数据状态至关重要。
4. 自定义操作符 (Custom Operators)
虽然 UniRx 提供了丰富的操作符,但在某些特定情况下,你可能需要创建自己的操作符来封装复杂的、可复用的逻辑。自定义操作符本质上是返回 IObservable<TResult>
的扩展方法。
创建自定义操作符的步骤:
- 定义一个静态类,包含你的扩展方法。
- 扩展方法通常接收一个
IObservable<TSource>
作为this
参数。 - 在方法内部,使用
Observable.Create<TResult>
来创建一个新的 Observable。 - 在
Observable.Create
的 Lambda 表达式中,实现你的订阅逻辑:订阅上游 Observable,并根据你的业务逻辑向下游观察者发送数据 (observer.OnNext
)、错误 (observer.OnError
) 或完成信号 (observer.OnCompleted
)。 - 返回一个
IDisposable
,用于清理你在自定义操作符内部创建的所有订阅。
示例:ThrottleWithFirst
(在 UniRx 中可能已有类似功能,这里仅作演示)
假设我们想创建一个操作符,它在指定时间窗口内只发射第一个值,并且在窗口结束后,如果还有后续事件,则重新开始计时。这类似于 ThrottleFirst
,但更灵活。
using UnityEngine;
using UniRx;
using System;
using System.Threading; // For CancellationTokenSource
public static class CustomRxOperators
{
// 扩展方法,可以链式调用
public static IObservable<TSource> ThrottleWithFirst<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
// Observable.Create 用于构建自定义操作符
return Observable.Create<TSource>(observer =>
{
// 用于控制时间窗口的 CancellationTokenSource
var cts = new CancellationTokenSource();
bool isThrottling = false; // 是否处于节流状态
object gate = new object(); // 线程锁
// 订阅上游 Observable
var disposable = source.Subscribe(
value =>
{
lock (gate)
{
if (!isThrottling)
{
// 如果不在节流状态,发射第一个值
observer.OnNext(value);
isThrottling = true;
// 开启一个计时器,在 dueTime 后解除节流状态
Observable.Timer(dueTime)
.Subscribe(_ =>
{
lock (gate)
{
isThrottling = false;
}
}, cts.Token); // 使用 CancellationToken 确保计时器在外部取消订阅时停止
}
}
},
observer.OnError, // 错误直接传递给下游
observer.OnCompleted // 完成信号直接传递给下游
);
// 返回一个 IDisposable,用于清理内部资源
return new CompositeDisposable(disposable, Disposable.Create(() => cts.Cancel()));
});
}
}
public class CustomOperatorExample : MonoBehaviour
{
void Start()
{
// 模拟一个高频的鼠标点击流
Observable.EveryUpdate()
.Where(_ => Input.GetMouseButtonDown(0))
.ThrottleWithFirst(TimeSpan.FromSeconds(1)) // 应用自定义操作符
.Subscribe(_ =>
{
Debug.Log($"鼠标点击 (节流): {Time.time}");
})
.AddTo(this);
}
}
自定义操作符让你能够将复杂的响应式模式封装成简洁、可复用的单元,提升代码的可读性和维护性。然而,编写自定义操作符需要对 Rx 的内部机制有深入理解,并且要特别注意线程安全和资源管理(确保 IDisposable
的正确返回和清理)。
5. UniRx 的 Triggers
命名空间
我们在前面的教程中已经多次使用 UniRx.Triggers
命名空间下的扩展方法(如 OnClickAsObservable
, UpdateAsObservable
, OnTriggerEnterAsObservable
)。这些扩展方法将 Unity MonoBehaviour 的生命周期回调和事件转换为 IObservable
流,极大地简化了 Unity 和响应式编程的集成。
这些 AsObservable()
方法的实现原理通常是在内部为每个 MonoBehaviour 添加一个隐藏的组件(或利用 ObservableStateMachineTrigger
等),来捕获对应的 Unity 事件,然后通过 Subject
发射给订阅者。这使得你的代码可以完全脱离传统的 Unity 回调,以声明式的方式处理所有事件。
6. 单元测试与响应式代码
由于 ViewModel 及其内部的响应式逻辑是纯 C# 代码,这使得它们非常适合进行单元测试。你可以使用任何 .NET 单元测试框架(如 NUnit)来测试你的 ViewModel。
关键在于:
- 模拟依赖: 使用 Mocking 框架(如 Moq)来模拟 Model 层或任何外部依赖。
- 控制时间: 对于涉及到时间的操作符(如
Delay
,Interval
,Throttle
),需要使用TestScheduler
来精确控制时间流逝,以便进行确定性测试。
示例(概念性代码,需要 NUnit 和 UniRx.Tests 包):
/*
using NUnit.Framework;
using UniRx;
using UniRx.Async; // 用于测试异步操作
using UniRx.Diagnostics; // 用于测试 Scheduler
using System;
using System.Threading.Tasks;
// 假设这是我们要测试的 ViewModel
public class MySimpleViewModel
{
public ReactiveProperty<int> Counter { get; private set; } = new ReactiveProperty<int>(0);
public ReactiveCommand IncrementCommand { get; private set; }
public MySimpleViewModel()
{
IncrementCommand = Counter
.Select(c => c < 5) // 只能增加到5
.ToReactiveCommand();
IncrementCommand.Subscribe(_ => Counter.Value++);
}
}
[TestFixture]
public class MySimpleViewModelTests
{
// 测试计数器是否正确递增
[Test]
public void CounterIncrementsCorrectly()
{
var viewModel = new MySimpleViewModel();
Assert.AreEqual(0, viewModel.Counter.Value);
viewModel.IncrementCommand.Execute();
Assert.AreEqual(1, viewModel.Counter.Value);
viewModel.IncrementCommand.Execute();
Assert.AreEqual(2, viewModel.Counter.Value);
}
// 测试命令的可执行性
[Test]
public void IncrementCommandCanExecute()
{
var viewModel = new MySimpleViewModel();
// 初始时可执行
Assert.IsTrue(viewModel.IncrementCommand.CanExecute.Value);
// 连续执行5次
for (int i = 0; i < 5; i++)
{
viewModel.IncrementCommand.Execute();
}
// 此时 Counter 应该为 5,命令不可执行
Assert.AreEqual(5, viewModel.Counter.Value);
Assert.IsFalse(viewModel.IncrementCommand.CanExecute.Value);
// 再次尝试执行,Counter 不会改变
viewModel.IncrementCommand.Execute();
Assert.AreEqual(5, viewModel.Counter.Value);
}
// 针对异步操作的测试,需要 TestScheduler
[Test]
public async Task AsyncOperationCompletes()
{
var testScheduler = new TestScheduler();
var asyncData = Observable.FromAsync(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1)); // 模拟异步延迟
return "Loaded Data";
}).ObserveOn(testScheduler); // 在 TestScheduler 上观察
string result = null;
asyncData.Subscribe(data => result = data);
Assert.IsNull(result); // 此时还没有数据
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks); // 推进半秒
Assert.IsNull(result); // 还没有完成
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks); // 再推进半秒,总共1秒
Assert.AreEqual("Loaded Data", result); // 此时数据应该已加载
}
}
*/
单元测试是保证复杂系统质量的基石。响应式编程的声明式特性和对副作用的限制,使得其核心逻辑(通常在 ViewModel 中)比传统命令式代码更容易进行单元测试。
7. 总结与最终展望
至此,我们已经完成了 UniRx 响应式编程的全面入门到深入教程。我们从最基础的 ReactiveProperty
开始,逐步构建起了对整个 UniRx 生态的理解:
- 数据绑定与 UI 交互:
ReactiveProperty
和ReactiveCommand
。 - 集合管理:
ReactiveCollection
。 - 生命周期与资源: 订阅管理、异步操作封装。
- 复杂逻辑构建:
Combine
,Merge
,SelectMany
等高级操作符。 - 健壮性: 错误处理和调度器。
- 架构: MVVM 模式。
- 性能: Profiler 与优化策略。
- 底层与扩展:
IObservable
/IObserver
原理、Hot/Cold Observable、Subject 类型、自定义操作符和单元测试。
UniRx 提供了一个优雅且强大的范式,用于处理 Unity 游戏开发中常见的异步、事件驱动和状态管理问题。它能帮助你编写出更具声明性、可测试性、可维护性和可扩展性的代码。
响应式编程的学习曲线可能相对陡峭,但一旦你掌握了它的思维模式,你会发现它能极大地提升你的开发效率和代码质量。持续实践、查阅文档、阅读优秀的开源项目代码,是巩固和提升响应式编程技能的最佳途径。
希望这个系列的教程能为你打开响应式编程的大门,并帮助你在 Unity 开发中敲定坚实基础!祝你编程顺利!
响应式编程入门教程第一节:揭秘 UniRx 核心 - ReactiveProperty - 让你的数据动起来!
响应式编程入门教程第二节:构建 ObservableProperty<T> — 封装 ReactiveProperty 的高级用法
响应式编程入门教程第三节:ReactiveCommand 与 UI 交互
响应式编程入门教程第五节:Unity 生命周期与资源管理中的响应式编程
响应式编程入门教程第六节:进阶?Combine、Merge、SelectMany 与错误处理