R3:适用于 .NET 的新一代响应式扩展库
R3 是 dotnet/reactive(.NET 官方响应式扩展)与 UniRx(适用于 Unity 的响应式扩展)的新一代替代方案,支持多种平台,包括 Unity、Godot、Avalonia、WPF、WinForms、WinUI3、Stride、LogicLooper、MAUI、MonoGame、Blazor 和 Uno。
我拥有超过 10 年的 Rx 使用经验,不仅为游戏引擎实现过自定义 Rx 运行时(UniRx),还为游戏引擎开发过异步运行时(UniTask)。基于这些经验,我认为有必要为 .NET 实现一款全新的响应式扩展库,它既要能体现现代 C# 的特性,又要回归 Rx 的核心价值。
核心设计理念
R3 的设计源于对传统 Rx 局限性的反思,核心理念如下:
- 在 OnError 处中断管道是错误设计:传统 Rx 中,管道遇到异常会触发 OnError 并自动取消订阅,这在事件处理场景中过于严苛,且难以通过 Retry 等操作符优雅恢复。
- IScheduler 是性能瓶颈的根源:传统 Rx 的 IScheduler 抽象层导致性能损耗,且内部实现复杂(如通过 PeriodicTimer 和 IStopwatch 规避问题)。
- 基于帧的操作是游戏引擎的关键需求:传统 Rx 缺乏帧循环适配能力,而游戏引擎(如 Unity、Godot)中,基于帧的事件处理(如每帧更新、延迟 N 帧执行)至关重要。
- 单一异步操作应完全交给 async/await:Rx 应专注于事件流处理,而非替代 async/await 处理单个异步任务(如网络请求、文件读写)。
- 不实现同步 API:同步操作无需 Rx 抽象,避免冗余设计。
- 查询语法(Query Syntax)仅适用于 SQL:C# 的 LINQ 查询语法(如
from x in xs select x
)在 Rx 中使用体验不佳,应优先使用方法链语法。 - 订阅列表是防止内存泄漏的必要手段:需提供类似 “并行调试器” 的订阅跟踪能力,解决 GUI 或游戏等长生命周期应用的内存泄漏问题。
- 背压(Backpressure)交给 IAsyncEnumerable 和 Channels:Rx 无需重复实现背压机制,可复用 .NET 原生组件。
- 分布式处理与查询有更优方案:GraphQL、Kubernetes、Orleans、Akka.NET、gRPC、MagicOnion 等工具更适合分布式场景,Rx 应专注于内存内消息处理(LINQ to Events)。
与传统 Rx 的核心差异
为解决 dotnet/reactive 的不足,R3 对核心接口进行了重构。近年来,Kotlin Flow、Swift Combine 等面向现代语言特性的 Rx 类框架已成为标准,而 C# 也已演进至 C# 12,因此 R3 旨在打造一款与最新 C# 特性对齐的响应式扩展库。
1. 性能优化
性能提升是 R3 重构的核心目标之一,以下是关键优化点:
- 移除 IScheduler 带来的性能提升:传统 Rx 的 IScheduler 导致大量冗余计算,R3 改用 .NET 8 引入的 TimeProvider,性能显著提升(如下图,
Observable.Range(1, 10000).Subscribe()
的执行效率对比)。 - Subject 订阅 / 取消的内存优化:传统 Rx 的 Subject 使用 ImmutableArray 存储订阅者,每次添加 / 删除订阅都会分配新数组,导致内存频繁波动。R3 采用自定义数据结构,避免 ImmutableArray 带来的性能损耗(如下图,
10000 次 subject.Subscribe() + 10000 次 subscription.Dispose()
的内存分配对比)。
2. 核心接口重构
R3 的表面 API 与传统 Rx 保持一致(便于迁移),但内部接口完全重构,不再依赖 IObservable<T>/IObserver<T>
。
2.1 新接口定义
传统 Rx 的 IObservable<T>/IObserver<T>
虽在理论上优雅(与 IEnumerable<T>
对偶),但实际使用中存在诸多限制。R3 改用抽象类实现,确保行为可控:
csharp
using System;
namespace R3
{
/// <summary>
/// 可观察对象抽象类,替代传统 IObservable<T>
/// </summary>
/// <typeparam name="T">事件流中数据的类型</typeparam>
public abstract class Observable<T>
{
/// <summary>
/// 订阅可观察对象,返回可取消订阅的 IDisposable
/// </summary>
/// <param name="observer">观察者对象,用于接收事件</param>
/// <returns>用于取消订阅的 IDisposable 实例</returns>
public abstract IDisposable Subscribe(Observer<T> observer);
}
/// <summary>
/// 观察者抽象类,替代传统 IObserver<T>,并实现 IDisposable 用于资源释放
/// </summary>
/// <typeparam name="T">接收数据的类型</typeparam>
public abstract class Observer<T> : IDisposable
{
/// <summary>
/// 接收正常事件数据
/// </summary>
/// <param name="value">事件数据</param>
public abstract void OnNext(T value);
/// <summary>
/// 接收异常事件(不会中断订阅)
/// 区别于传统 Rx 的 OnError,此处异常不会自动取消订阅,需手动处理
/// </summary>
/// <param name="error">异常对象</param>
public abstract void OnErrorResume(Exception error);
/// <summary>
/// 接收事件流完成通知(成功/失败)
/// 合并传统 Rx 的 OnCompleted 和 OnError,用 Result 表示完成状态
/// </summary>
/// <param name="result">完成结果,包含成功/失败状态</param>
public abstract void OnCompleted(Result result);
/// <summary>
/// 释放观察者占用的资源
/// </summary>
public abstract void Dispose();
}
/// <summary>
/// 事件流完成结果,区分成功/失败状态
/// </summary>
public readonly struct Result
{
/// <summary>
/// 是否为失败状态
/// </summary>
public bool IsFailure { get; }
/// <summary>
/// 失败时的异常对象(成功时为 null)
/// </summary>
public Exception? Exception { get; }
/// <summary>
/// 创建成功状态的 Result
/// </summary>
/// <returns>成功状态的 Result 实例</returns>
public static Result Success() => new Result(false, null);
/// <summary>
/// 创建失败状态的 Result
/// </summary>
/// <param name="exception">失败对应的异常</param>
/// <returns>失败状态的 Result 实例</returns>
public static Result Failure(Exception exception) => new Result(true, exception);
private Result(bool isFailure, Exception? exception)
{
IsFailure = isFailure;
Exception = exception;
}
}
}
2.2 关键差异点
特性 | 传统 Rx | R3 |
---|---|---|
异常处理 | OnError 触发后自动取消订阅 | OnErrorResume 触发后不取消订阅,需手动控制 |
完成通知 | OnCompleted(成功)+ OnError(失败) | OnCompleted (Result) 合并两种状态 |
接口类型 | 接口(IObservable/IObserver) | 抽象类(Observable/Observer) |
订阅跟踪 | 无原生支持 | 内置订阅列表,支持泄漏检测 |
3. 订阅管理与内存泄漏防护
订阅泄漏是长生命周期应用(如 GUI、游戏)的常见问题。R3 通过以下设计解决该问题:
- 观察者与订阅绑定:订阅时,Observer 会自动与目标 Observable 关联,并作为 Subscription 实例(避免传统 Rx 中额外的 IDisposable 分配)。
- 全链路订阅跟踪:Observer 从上游到下游形成可靠的链路,确保 OnCompleted/Dispose 时能释放所有资源。
- ObservableTracker 工具:可启用订阅跟踪功能,查看所有活跃订阅的状态(如创建时间、调用栈),便于定位泄漏。
csharp
using R3;
using System;
// 启用订阅跟踪(默认关闭)
ObservableTracker.EnableTracking = true;
// 启用调用栈捕获(性能损耗较高,调试时使用)
ObservableTracker.EnableStackTrace = true;
// 创建一个示例订阅
using var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
.Where(x => true)
.Take(10000)
.Subscribe();
// 遍历所有活跃订阅,打印状态
ObservableTracker.ForEachActiveTask(trackingState =>
{
Console.WriteLine($"跟踪ID: {trackingState.TrackingId}");
Console.WriteLine($"类型: {trackingState.FormattedType}");
Console.WriteLine($"创建时间: {trackingState.AddTime}");
Console.WriteLine($"调用栈: {trackingState.StackTrace}");
Console.WriteLine("------------------------");
});
核心功能特性
1. TimeProvider 替代 IScheduler
传统 Rx 的 IScheduler 存在性能问题且设计复杂,R3 改用 .NET 8 引入的 TimeProvider 作为时间抽象,支持以下能力:
- 异步时间操作:通过
TimeProvider.CreateTimer()
、TimeProvider.GetTimestamp()
实现高效的时间控制。 - 平台适配:默认使用
TimeProvider.System
(基于线程池),同时提供平台专用实现(如 WPF 的DispatcherTimeProvider
、Unity 的UpdateTimeProvider
)。 - 测试友好:支持
FakeTimeProvider
(来自Microsoft.Extensions.TimeProvider.Testing
),便于单元测试中模拟时间流逝。
1.1 时间相关操作符示例
csharp
using R3;
using System;
// 1. 每隔 1 秒发送一个事件(使用默认 TimeProvider)
var interval = Observable.Interval(TimeSpan.FromSeconds(1));
// 2. 延迟 2 秒后发送事件(指定自定义 TimeProvider,如 WPF 的 DispatcherTimeProvider)
var delay = Observable.Return(42)
.Delay(TimeSpan.FromSeconds(2), new DispatcherTimeProvider());
// 3. 防抖操作(300ms 内无新事件则发送最后一个事件)
var debounce = Observable.FromEvent<EventArgs>(
addHandler: h => button.Click += (s, e) => h(e),
removeHandler: h => button.Click -= (s, e) => h(e)
)
.Debounce(TimeSpan.FromMilliseconds(300), TimeProvider.System);
2. 基于帧的操作(FrameProvider)
游戏引擎和 GUI 应用依赖帧循环,R3 引入 FrameProvider 抽象层,提供与帧相关的操作符,支持以下场景:
- 每帧执行:如
EveryUpdate()
每帧发送一个事件。 - 延迟 N 帧执行:如
DelayFrame(5)
延迟 5 帧后发送事件。 - 帧防抖 / 节流:如
DebounceFrame(10)
10 帧内无新事件则发送。
2.1 帧操作符示例
csharp
using R3;
using UnityEngine; // 以 Unity 为例,其他平台类似
// 1. 每帧发送事件(使用 Unity 的 Update 帧循环)
Observable.EveryUpdate(UnityFrameProvider.Update)
.Subscribe(_ =>
{
// 每帧更新角色位置
player.transform.Translate(Vector3.forward * Time.deltaTime);
});
// 2. 延迟 3 帧后执行(例如:玩家死亡后 3 帧显示游戏结束界面)
player.OnDeathAsObservable()
.DelayFrame(3, UnityFrameProvider.Update)
.Subscribe(_ =>
{
UIManager.ShowGameOver();
});
// 3. 帧防抖(避免按钮快速点击触发多次事件)
button.OnClickAsObservable()
.DebounceFrame(2, UnityFrameProvider.Update)
.Subscribe(_ =>
{
// 处理按钮点击(2 帧内多次点击仅触发一次)
SubmitRequest();
});
// 4. 监听属性变化(每帧检查属性值,无 INotifyPropertyChanged 也可使用)
Observable.EveryValueChanged(this, x => x.PlayerHealth, UnityFrameProvider.Update)
.Subscribe(health =>
{
// 更新生命值显示
healthText.Text = $"HP: {health}";
});
3. Subject 与 ReactiveProperty
R3 提供 5 种 Subject 类型,满足不同事件分发场景,且默认在 Dispose 时触发 OnCompleted,确保订阅自动释放。
Subject 类型 | 用途说明 |
---|---|
Subject<T> | 基础事件分发器,无状态,仅分发订阅后的事件。 |
BehaviorSubject<T> | 持有最新值,新订阅者会立即收到当前值。 |
ReactiveProperty<T> | 继承 BehaviorSubject,支持去重(相同值不触发通知),适合数据绑定。 |
ReplaySubject<T> | 缓存指定数量 / 时间范围内的事件,新订阅者会收到历史事件。 |
ReplayFrameSubject<T> | 基于帧缓存事件(如缓存最近 10 帧的事件),适合游戏帧同步场景。 |
3.1 ReactiveProperty 示例(数据绑定与验证)
csharp
using R3;
using System;
using System.ComponentModel.DataAnnotations;
// 1. 响应式模型定义(以游戏角色为例)
public class Enemy
{
// 生命值(ReactiveProperty 自动去重,值变化时触发通知)
public ReactiveProperty<long> CurrentHp { get; }
// 是否死亡(由 CurrentHp 推导,自动同步状态)
public ReadOnlyReactiveProperty<bool> IsDead { get; }
public Enemy(int initialHp)
{
CurrentHp = new ReactiveProperty<long>(initialHp);
// 从 CurrentHp 派生 IsDead(当生命值 ≤ 0 时为 true)
IsDead = CurrentHp.Select(hp => hp <= 0).ToReadOnlyReactiveProperty();
}
}
// 2. 带验证的 ReactiveProperty(如限制数值范围)
public sealed class ClampedReactiveProperty<T> : ReactiveProperty<T>
where T : IComparable<T>
{
private readonly T _min;
private readonly T _max;
private static readonly IComparer<T> _comparer = Comparer<T>.Default;
// 构造函数:初始化时限制值在 [min, max] 范围内
public ClampedReactiveProperty(T initialValue, T min, T max)
: base(initialValue, EqualityComparer<T>.Default, callOnValueChangeInBaseConstructor: false)
{
_min = min;
_max = max;
// 手动修正初始值(确保符合范围)
OnValueChanging(ref GetValueRef());
}
// 重写值变更前的逻辑,强制限制范围
protected override void OnValueChanging(ref T value)
{
if (_comparer.Compare(value, _min) < 0)
{
value = _min; // 小于最小值时强制设为最小值
}
else if (_comparer.Compare(value, _max) > 0)
{
value = _max; // 大于最大值时强制设为最大值
}
}
}
// 3. XAML 数据绑定(以 WPF 为例)
public class PlayerViewModel
{
// 可写属性(用于绑定输入控件,如 Slider)
public ReactiveProperty<int> Level { get; } = new ReactiveProperty<int>(1);
// 只读属性(用于绑定显示控件,如 TextBlock)
public ReadOnlyReactiveProperty<string> LevelText { get; }
public PlayerViewModel()
{
// 派生 LevelText(格式化为 "等级:X")
LevelText = Level.Select(level => $"等级:{level}").ToReadOnlyReactiveProperty();
}
}
3.2 Subject 自动释放示例
R3 的 Subject 在 Dispose 时会自动触发 OnCompleted,确保所有订阅者收到完成通知并释放资源:
csharp
using R3;
using System;
var subject = new Subject<string>();
// 订阅 Subject
var subscription = subject.Subscribe(
onNext: msg => Console.WriteLine($"收到消息:{msg}"),
onCompleted: result => Console.WriteLine(result.IsFailure
? $"失败:{result.Exception.Message}"
: "成功完成")
);
// 发送消息
subject.OnNext("Hello R3");
// 释放 Subject(会触发 OnCompleted)
subject.Dispose();
// 此时订阅已自动取消,后续发送消息不会被接收
subject.OnNext("This message is ignored");
4. 灵活的 Disposable 管理
R3 提供多种 Disposable 组合方式,满足不同场景的性能与灵活性需求,性能从高到低排序如下:
- `Disposable.Combine(d1, d2,..., d8)
:适用于已知数量(≤8个)的订阅,内部使用字段存储,性能最优。 2.
Disposable.CreateBuilder():适用于动态数量但构建时已知的订阅,Builder 为值类型,无内存分配。 3.
Disposable.Combine(params IDisposable[]):适用于动态数量的订阅,内部使用数组存储。 4.
DisposableBag:适用于动态添加的订阅,轻量级值类型,非线程安全。 5.
CompositeDisposable`:支持动态添加 / 移除,线程安全,但性能最低。
4.1 各 Disposable 用法示例
csharp
using R3;
using System;
using System.Reactive.Disposables;
// 1. Disposable.Combine(已知3个订阅,性能最优)
public class Example1
{
private IDisposable _disposables;
public void Initialize()
{
var d1 = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Update 1"));
var d2 = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Update 2"));
var d3 = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Update 3"));
// 组合3个订阅,Dispose时会释放所有
_disposables = Disposable.Combine(d1, d2, d3);
}
public void Cleanup()
{
_disposables?.Dispose();
}
}
// 2. Disposable.CreateBuilder(动态添加但构建时确定数量)
public class Example2
{
private IDisposable _disposables;
public void Initialize()
{
var builder = Disposable.CreateBuilder();
// 动态添加订阅(数量在构建时确定)
Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Builder 1")).AddTo(ref builder);
Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Builder 2")).AddTo(ref builder);
Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Builder 3")).AddTo(ref builder);
// 构建组合订阅
_disposables = builder.Build();
}
public void Cleanup()
{
_disposables?.Dispose();
}
}
// 3. DisposableBag(动态添加,非线程安全)
public class Example3
{
// DisposableBag 是值类型,无需 new,也不要拷贝
private DisposableBag _disposableBag;
public void Initialize()
{
// 初始添加订阅
Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Bag 1")).AddTo(ref _disposableBag);
Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Bag 2")).AddTo(ref _disposableBag);
}
// 动态添加(如按钮点击时)
public void OnButtonClick()
{
Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Dynamic Bag")).AddTo(ref _disposableBag);
}
public void Cleanup()
{
_disposableBag.Dispose();
}
}
// 4. CompositeDisposable(支持移除,线程安全)
public class Example4
{
private CompositeDisposable _compositeDisposable = new CompositeDisposable();
private IDisposable _dynamicSubscription;
public void Initialize()
{
// 添加固定订阅
_compositeDisposable.Add(Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Composite 1")));
_compositeDisposable.Add(Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Composite 2")));
// 保存动态订阅,后续可移除
_dynamicSubscription = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Dynamic Composite"));
_compositeDisposable.Add(_dynamicSubscription);
}
// 移除动态订阅
public void RemoveDynamicSubscription()
{
_compositeDisposable.Remove(_dynamicSubscription);
}
public void Cleanup()
{
_compositeDisposable.Dispose();
}
}
5. 与 async/await 深度集成
R3 针对 async/await 做了专门优化,支持异步操作符(如 SelectAwait
、WhereAwait
),并提供灵活的异步执行策略(如串行、并行、丢弃、切换)。
5.1 异步操作符示例(避免按钮重复点击)
csharp
using R3;
using System;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;
using UnityEngine.Networking;
// 按钮点击后发起网络请求,使用 AwaitOperation.Drop 避免重复请求
public class AsyncExample : MonoBehaviour
{
public Button RequestButton;
public Text ResultText;
private void Start()
{
RequestButton.OnClickAsObservable()
// 异步转换:发起网络请求
.SelectAwait(async (_, ct) =>
{
// 使用 UnityWebRequest 发起请求,并支持取消
using var request = UnityWebRequest.Get("https://api.example.com/data");
var operation = request.SendWebRequest();
// 绑定取消令牌(当订阅取消或完成时取消请求)
using (ct.Register(() => operation.Abort()))
{
await operation;
}
if (request.result != UnityWebRequest.Result.Success)
{
throw new Exception($"请求失败:{request.error}");
}
return request.downloadHandler.text;
},
// 执行策略:当异步操作未完成时,丢弃新的点击事件
awaitOperation: AwaitOperation.Drop,
// 订阅完成时取消正在进行的异步操作
cancelOnCompleted: true)
// 订阅结果并更新UI
.Subscribe(
result => ResultText.text = $"结果:{result}",
error => ResultText.text = $"错误:{error.Message}"
)
// 将订阅绑定到当前 MonoBehaviour,销毁时自动释放
.AddTo(this);
}
}
5.2 异步执行策略(AwaitOperation)
策略 | 行为说明 |
---|---|
Sequential | 所有事件排队,下一个事件等待前一个异步操作完成。 |
Drop | 异步操作执行期间,丢弃新的事件(如避免重复点击)。 |
Switch | 新事件到来时,取消前一个未完成的异步操作,立即执行新操作(如搜索联想)。 |
Parallel | 所有事件立即执行异步操作,不限制并发数。 |
SequentialParallel | 异步操作并行执行,但结果按事件顺序传递给下一个操作符。 |
ThrottleFirstLast | 异步操作执行期间,仅保留第一个和最后一个事件(如批量处理)。 |
6. 单元测试支持
R3 提供完善的单元测试工具,支持时间 / 帧模拟,结合 LiveList
可轻松验证事件流结果。
6.1 时间模拟测试(使用 FakeTimeProvider)
csharp
using R3;
using Microsoft.Extensions.TimeProvider.Testing;
using Xunit;
using Shouldly;
public class TimeProviderTests
{
[Fact]
public void Timer_Should_Trigger_After_Delay()
{
// 1. 创建虚假时间提供器
var fakeTime = new FakeTimeProvider();
// 2. 创建事件流:5秒后发送一个事件
var timer = Observable.Timer(TimeSpan.FromSeconds(5), fakeTime);
// 3. 将事件流转换为 LiveList,便于断言
var results = timer.ToLiveList();
// 断言:提前4秒,事件未触发
fakeTime.Advance(TimeSpan.FromSeconds(4));
results.AssertIsNotCompleted();
results.AssertEmpty();
// 断言:再提前1秒(总计5秒),事件触发
fakeTime.Advance(TimeSpan.FromSeconds(1));
results.AssertIsCompleted();
results.AssertEqual([Unit.Default]);
}
}
6.2 帧模拟测试(使用 FakeFrameProvider)
csharp
using R3;
using System;
using Xunit;
using Shouldly;
public class FrameProviderTests
{
[Fact]
public void EveryUpdate_Should_Trigger_Per_Frame()
{
// 1. 创建虚假帧提供器
var fakeFrameProvider = new FakeFrameProvider();
var cts = new CancellationTokenSource();
// 2. 创建事件流:每帧发送当前帧计数
var everyUpdate = Observable.EveryUpdate(fakeFrameProvider, cts.Token)
.Select(_ => fakeFrameProvider.GetFrameCount());
var results = everyUpdate.ToLiveList();
// 断言:初始状态无事件
results.AssertEmpty();
// 断言:推进1帧,收到帧计数0
fakeFrameProvider.Advance();
results.AssertEqual([0]);
// 断言:推进3帧,收到帧计数1、2、3
fakeFrameProvider.Advance(3);
results.AssertEqual([0, 1, 2, 3]);
// 断言:取消令牌,事件流完成
cts.Cancel();
results.AssertIsCompleted();
// 断言:继续推进帧,无新事件
fakeFrameProvider.Advance();
results.AssertEqual([0, 1, 2, 3]);
}
}
// 测试辅助扩展方法
public static class LiveListExtensions
{
public static void AssertEqual<T>(this LiveList<T> list, params T[] expected)
{
list.ShouldBe(expected);
}
public static void AssertEmpty<T>(this LiveList<T> list)
{
list.Count.ShouldBe(0);
}
public static void AssertIsCompleted<T>(this LiveList<T> list)
{
list.IsCompleted.ShouldBeTrue();
}
public static void AssertIsNotCompleted<T>(this LiveList<T> list)
{
list.IsCompleted.ShouldBeFalse();
}
}
平台支持
R3 支持多种 .NET 平台,核心库无需额外配置即可使用,平台专用扩展需安装对应 NuGet 包,并替换 TimeProvider
/FrameProvider
以适配平台特性。
1. 平台支持列表
平台 | NuGet 包名称 | 核心适配点 |
---|---|---|
WPF | R3Extensions.WPF | 提供 WpfDispatcherTimeProvider (UI 线程时间)、WpfRenderingFrameProvider (渲染帧)。 |
Avalonia | R3Extensions.Avalonia | 提供 AvaloniaDispatcherTimeProvider 、AvaloniaRenderingFrameProvider (基于渲染事件)。 |
Uno | R3Extensions.Uno | 提供 UnoDispatcherTimeProvider 、UnoRenderingFrameProvider (跨平台 UI 适配)。 |
MAUI | R3Extensions.Maui | 提供 MauiDispatcherTimeProvider 、MauiTickerFrameProvider (基于 Ticker 帧循环)。 |
WinForms | R3Extensions.WinForms | 提供 WinFormsTimeProvider 、WinFormsFrameProvider (基于消息循环)。 |
WinUI3 | R3Extensions.WinUI3 | 提供 WinUI3DispatcherTimeProvider 、WinUI3RenderingFrameProvider 。 |
Unity | R3 + R3.Unity | 提供 UnityTimeProvider (支持 Update/FixedUpdate 等生命周期)、UnityFrameProvider 。 |
Godot | R3 + R3.Godot | 提供 GodotTimeProvider (Process/PhysicsProcess)、GodotFrameProvider 。 |
Stride | R3Extensions.Stride | 提供 StrideTimeProvider 、StrideFrameProvider (游戏引擎帧循环)。 |
MonoGame | R3Extensions.MonoGame | 提供 MonoGameTimeProvider 、MonoGameFrameProvider (基于 Game.Update)。 |
LogicLooper | R3Extensions.LogicLooper | 提供 LogicLooperTimeProvider 、LogicLooperFrameProvider (循环逻辑适配)。 |
Blazor | R3Extensions.Blazor | 适配 Blazor 同步上下文,避免 UI 线程阻塞。 |
2. 平台初始化示例(以 Unity 为例)
Unity 平台需安装两个包:核心 R3
和 Unity 专用扩展 R3.Unity
,步骤如下:
- 通过 NuGetForUnity 安装
R3
(搜索 “R3” 并安装)。 - 引用 Git 地址安装
R3.Unity
:https://github.com/Cysharp/R3.git?path=src/R3.Unity/Assets/R3.Unity
。 - (可选)指定版本:
https://github.com/Cysharp/R3.git?path=src/R3.Unity/Assets/R3.Unity#1.0.0
。
2.1 Unity 平台核心用法
csharp
using R3;
using R3.Unity;
using UnityEngine;
using UnityEngine.UI;
public class UnityExample : MonoBehaviour
{
public Button AttackButton;
public Text HpText;
public Slider HpSlider;
// 响应式属性:角色生命值(初始1000)
private ReactiveProperty<long> _currentHp = new ReactiveProperty<long>(1000);
// 派生属性:是否死亡(生命值 ≤ 0)
private ReadOnlyReactiveProperty<bool> _isDead;
private void Awake()
{
// 初始化派生属性
_isDead = _currentHp.Select(hp => hp <= 0).ToReadOnlyReactiveProperty();
}
private void Start()
{
// 1. 绑定生命值到 UI(自动同步更新)
_currentHp.Subscribe(hp =>
{
HpText.text = $"HP: {hp}";
HpSlider.value = hp;
}).AddTo(this);
// 2. 绑定死亡状态到按钮(死亡后禁用攻击按钮)
_isDead.Subscribe(isDead =>
{
AttackButton.interactable = !isDead;
if (isDead)
{
HpText.text = "已死亡";
}
}).AddTo(this);
// 3. 绑定按钮点击事件(每点击一次减少100生命值)
AttackButton.OnClickAsObservable()
.Subscribe(_ => _currentHp.Value -= 100)
.AddTo(this);
// 4. 每帧检测生命值变化(无 INotifyPropertyChanged 也可使用)
Observable.EveryValueChanged(this, x => x.transform.position, UnityFrameProvider.Update)
.Subscribe(pos => Debug.Log($"当前位置: {pos}"))
.AddTo(this);
// 5. 固定帧执行(如物理逻辑)
Observable.EveryUpdate(UnityFrameProvider.FixedUpdate)
.Subscribe(_ =>
{
// 物理相关逻辑(如碰撞检测)
})
.AddTo(this);
}
}
与传统 Rx 的兼容性
R3 提供与 IObservable<T>
(传统 Rx)的双向转换,便于逐步迁移现有项目。
1. 转换方法
转换方向 | 方法 | 说明 |
---|---|---|
IObservable<T> → R3.Observable<T> | ToObservable() (扩展方法) |
将传统 Rx 的可观察对象转换为 R3 的 Observable<T>。 |
R3.Observable<T> → IObservable<T> | AsSystemObservable() (扩展方法) |
将 R3 的 Observable<T> 转换为传统 Rx 的 IObservable<T>。 |
1.1 转换示例
csharp
using R3;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public class CompatibilityExample
{
public void ConvertFromSystemRx()
{
// 1. 创建传统 Rx 的 Subject
var systemSubject = new Subject<int>();
// 2. 转换为 R3 的 Observable<int>
var r3Observable = systemSubject.ToObservable();
// 3. 使用 R3 的操作符处理
r3Observable
.Where(x => x % 2 == 0)
.Delay(TimeSpan.FromSeconds(1))
.Subscribe(x => Console.WriteLine($"R3 处理结果:{x}"));
// 4. 发送事件(传统 Rx 侧发送,R3 侧接收)
systemSubject.OnNext(1); // 被过滤(奇数)
systemSubject.OnNext(2); // 被 R3 接收并延迟1秒输出
}
public void ConvertToSystemRx()
{
// 1. 创建 R3 的 Observable
var r3Observable = Observable.Interval(TimeSpan.FromSeconds(1));
// 2. 转换为传统 Rx 的 IObservable<Unit>
var systemObservable = r3Observable.AsSystemObservable();
// 3. 使用传统 Rx 的操作符处理
systemObservable
.Take(5)
.Subscribe(
_ => Console.WriteLine("传统 Rx 接收事件"),
() => Console.WriteLine("传统 Rx 事件流完成")
);
}
}
许可证
R3 基于 MIT 许可证