R3:适用于 .NET 的新一代响应式扩展库,事件订阅流

发布于:2025-09-02 ⋅ 阅读:(13) ⋅ 点赞:(0)

R3:适用于 .NET 的新一代响应式扩展库

R3 是 dotnet/reactive(.NET 官方响应式扩展)与 UniRx(适用于 Unity 的响应式扩展)的新一代替代方案,支持多种平台,包括 Unity、Godot、Avalonia、WPF、WinForms、WinUI3、Stride、LogicLooper、MAUI、MonoGame、Blazor 和 Uno。

我拥有超过 10 年的 Rx 使用经验,不仅为游戏引擎实现过自定义 Rx 运行时(UniRx),还为游戏引擎开发过异步运行时(UniTask)。基于这些经验,我认为有必要为 .NET 实现一款全新的响应式扩展库,它既要能体现现代 C# 的特性,又要回归 Rx 的核心价值。

核心设计理念

R3 的设计源于对传统 Rx 局限性的反思,核心理念如下:

  1. 在 OnError 处中断管道是错误设计:传统 Rx 中,管道遇到异常会触发 OnError 并自动取消订阅,这在事件处理场景中过于严苛,且难以通过 Retry 等操作符优雅恢复。
  2. IScheduler 是性能瓶颈的根源:传统 Rx 的 IScheduler 抽象层导致性能损耗,且内部实现复杂(如通过 PeriodicTimer 和 IStopwatch 规避问题)。
  3. 基于帧的操作是游戏引擎的关键需求:传统 Rx 缺乏帧循环适配能力,而游戏引擎(如 Unity、Godot)中,基于帧的事件处理(如每帧更新、延迟 N 帧执行)至关重要。
  4. 单一异步操作应完全交给 async/await:Rx 应专注于事件流处理,而非替代 async/await 处理单个异步任务(如网络请求、文件读写)。
  5. 不实现同步 API:同步操作无需 Rx 抽象,避免冗余设计。
  6. 查询语法(Query Syntax)仅适用于 SQL:C# 的 LINQ 查询语法(如 from x in xs select x)在 Rx 中使用体验不佳,应优先使用方法链语法。
  7. 订阅列表是防止内存泄漏的必要手段:需提供类似 “并行调试器” 的订阅跟踪能力,解决 GUI 或游戏等长生命周期应用的内存泄漏问题。
  8. 背压(Backpressure)交给 IAsyncEnumerable 和 Channels:Rx 无需重复实现背压机制,可复用 .NET 原生组件。
  9. 分布式处理与查询有更优方案:GraphQL、Kubernetes、Orleans、Akka.NET、gRPC、MagicOnion 等工具更适合分布式场景,Rx 应专注于内存内消息处理(LINQ to Events)

与传统 Rx 的核心差异

为解决 dotnet/reactive 的不足,R3 对核心接口进行了重构。近年来,Kotlin Flow、Swift Combine 等面向现代语言特性的 Rx 类框架已成为标准,而 C# 也已演进至 C# 12,因此 R3 旨在打造一款与最新 C# 特性对齐的响应式扩展库。

1. 性能优化

性能提升是 R3 重构的核心目标之一,以下是关键优化点:

  • 移除 IScheduler 带来的性能提升:传统 Rx 的 IScheduler 导致大量冗余计算,R3 改用 .NET 8 引入的 TimeProvider,性能显著提升(如下图,Observable.Range(1, 10000).Subscribe() 的执行效率对比)。
  • Subject 订阅 / 取消的内存优化:传统 Rx 的 Subject 使用 ImmutableArray 存储订阅者,每次添加 / 删除订阅都会分配新数组,导致内存频繁波动。R3 采用自定义数据结构,避免 ImmutableArray 带来的性能损耗(如下图,10000 次 subject.Subscribe() + 10000 次 subscription.Dispose() 的内存分配对比)。

2. 核心接口重构

R3 的表面 API 与传统 Rx 保持一致(便于迁移),但内部接口完全重构,不再依赖 IObservable<T>/IObserver<T>

2.1 新接口定义

传统 Rx 的 IObservable<T>/IObserver<T> 虽在理论上优雅(与 IEnumerable<T> 对偶),但实际使用中存在诸多限制。R3 改用抽象类实现,确保行为可控:

csharp

using System;

namespace R3
{
    /// <summary>
    /// 可观察对象抽象类,替代传统 IObservable<T>
    /// </summary>
    /// <typeparam name="T">事件流中数据的类型</typeparam>
    public abstract class Observable<T>
    {
        /// <summary>
        /// 订阅可观察对象,返回可取消订阅的 IDisposable
        /// </summary>
        /// <param name="observer">观察者对象,用于接收事件</param>
        /// <returns>用于取消订阅的 IDisposable 实例</returns>
        public abstract IDisposable Subscribe(Observer<T> observer);
    }

    /// <summary>
    /// 观察者抽象类,替代传统 IObserver<T>,并实现 IDisposable 用于资源释放
    /// </summary>
    /// <typeparam name="T">接收数据的类型</typeparam>
    public abstract class Observer<T> : IDisposable
    {
        /// <summary>
        /// 接收正常事件数据
        /// </summary>
        /// <param name="value">事件数据</param>
        public abstract void OnNext(T value);

        /// <summary>
        /// 接收异常事件(不会中断订阅)
        /// 区别于传统 Rx 的 OnError,此处异常不会自动取消订阅,需手动处理
        /// </summary>
        /// <param name="error">异常对象</param>
        public abstract void OnErrorResume(Exception error);

        /// <summary>
        /// 接收事件流完成通知(成功/失败)
        /// 合并传统 Rx 的 OnCompleted 和 OnError,用 Result 表示完成状态
        /// </summary>
        /// <param name="result">完成结果,包含成功/失败状态</param>
        public abstract void OnCompleted(Result result);

        /// <summary>
        /// 释放观察者占用的资源
        /// </summary>
        public abstract void Dispose();
    }

    /// <summary>
    /// 事件流完成结果,区分成功/失败状态
    /// </summary>
    public readonly struct Result
    {
        /// <summary>
        /// 是否为失败状态
        /// </summary>
        public bool IsFailure { get; }

        /// <summary>
        /// 失败时的异常对象(成功时为 null)
        /// </summary>
        public Exception? Exception { get; }

        /// <summary>
        /// 创建成功状态的 Result
        /// </summary>
        /// <returns>成功状态的 Result 实例</returns>
        public static Result Success() => new Result(false, null);

        /// <summary>
        /// 创建失败状态的 Result
        /// </summary>
        /// <param name="exception">失败对应的异常</param>
        /// <returns>失败状态的 Result 实例</returns>
        public static Result Failure(Exception exception) => new Result(true, exception);

        private Result(bool isFailure, Exception? exception)
        {
            IsFailure = isFailure;
            Exception = exception;
        }
    }
}
2.2 关键差异点
特性 传统 Rx R3
异常处理 OnError 触发后自动取消订阅 OnErrorResume 触发后不取消订阅,需手动控制
完成通知 OnCompleted(成功)+ OnError(失败) OnCompleted (Result) 合并两种状态
接口类型 接口(IObservable/IObserver) 抽象类(Observable/Observer)
订阅跟踪 无原生支持 内置订阅列表,支持泄漏检测

3. 订阅管理与内存泄漏防护

订阅泄漏是长生命周期应用(如 GUI、游戏)的常见问题。R3 通过以下设计解决该问题:

  • 观察者与订阅绑定:订阅时,Observer 会自动与目标 Observable 关联,并作为 Subscription 实例(避免传统 Rx 中额外的 IDisposable 分配)。
  • 全链路订阅跟踪:Observer 从上游到下游形成可靠的链路,确保 OnCompleted/Dispose 时能释放所有资源。
  • ObservableTracker 工具:可启用订阅跟踪功能,查看所有活跃订阅的状态(如创建时间、调用栈),便于定位泄漏。

csharp

using R3;
using System;

// 启用订阅跟踪(默认关闭)
ObservableTracker.EnableTracking = true;
// 启用调用栈捕获(性能损耗较高,调试时使用)
ObservableTracker.EnableStackTrace = true;

// 创建一个示例订阅
using var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
    .Where(x => true)
    .Take(10000)
    .Subscribe();

// 遍历所有活跃订阅,打印状态
ObservableTracker.ForEachActiveTask(trackingState =>
{
    Console.WriteLine($"跟踪ID: {trackingState.TrackingId}");
    Console.WriteLine($"类型: {trackingState.FormattedType}");
    Console.WriteLine($"创建时间: {trackingState.AddTime}");
    Console.WriteLine($"调用栈: {trackingState.StackTrace}");
    Console.WriteLine("------------------------");
});

核心功能特性

1. TimeProvider 替代 IScheduler

传统 Rx 的 IScheduler 存在性能问题且设计复杂,R3 改用 .NET 8 引入的 TimeProvider 作为时间抽象,支持以下能力:

  • 异步时间操作:通过 TimeProvider.CreateTimer()TimeProvider.GetTimestamp() 实现高效的时间控制。
  • 平台适配:默认使用 TimeProvider.System(基于线程池),同时提供平台专用实现(如 WPF 的 DispatcherTimeProvider、Unity 的 UpdateTimeProvider)。
  • 测试友好:支持 FakeTimeProvider(来自 Microsoft.Extensions.TimeProvider.Testing),便于单元测试中模拟时间流逝。
1.1 时间相关操作符示例

csharp

using R3;
using System;

// 1. 每隔 1 秒发送一个事件(使用默认 TimeProvider)
var interval = Observable.Interval(TimeSpan.FromSeconds(1));

// 2. 延迟 2 秒后发送事件(指定自定义 TimeProvider,如 WPF 的 DispatcherTimeProvider)
var delay = Observable.Return(42)
    .Delay(TimeSpan.FromSeconds(2), new DispatcherTimeProvider());

// 3. 防抖操作(300ms 内无新事件则发送最后一个事件)
var debounce = Observable.FromEvent<EventArgs>(
        addHandler: h => button.Click += (s, e) => h(e),
        removeHandler: h => button.Click -= (s, e) => h(e)
    )
    .Debounce(TimeSpan.FromMilliseconds(300), TimeProvider.System);

2. 基于帧的操作(FrameProvider)

游戏引擎和 GUI 应用依赖帧循环,R3 引入 FrameProvider 抽象层,提供与帧相关的操作符,支持以下场景:

  • 每帧执行:如 EveryUpdate() 每帧发送一个事件。
  • 延迟 N 帧执行:如 DelayFrame(5) 延迟 5 帧后发送事件。
  • 帧防抖 / 节流:如 DebounceFrame(10) 10 帧内无新事件则发送。
2.1 帧操作符示例

csharp

using R3;
using UnityEngine; // 以 Unity 为例,其他平台类似

// 1. 每帧发送事件(使用 Unity 的 Update 帧循环)
Observable.EveryUpdate(UnityFrameProvider.Update)
    .Subscribe(_ =>
    {
        // 每帧更新角色位置
        player.transform.Translate(Vector3.forward * Time.deltaTime);
    });

// 2. 延迟 3 帧后执行(例如:玩家死亡后 3 帧显示游戏结束界面)
player.OnDeathAsObservable()
    .DelayFrame(3, UnityFrameProvider.Update)
    .Subscribe(_ =>
    {
        UIManager.ShowGameOver();
    });

// 3. 帧防抖(避免按钮快速点击触发多次事件)
button.OnClickAsObservable()
    .DebounceFrame(2, UnityFrameProvider.Update)
    .Subscribe(_ =>
    {
        // 处理按钮点击(2 帧内多次点击仅触发一次)
        SubmitRequest();
    });

// 4. 监听属性变化(每帧检查属性值,无 INotifyPropertyChanged 也可使用)
Observable.EveryValueChanged(this, x => x.PlayerHealth, UnityFrameProvider.Update)
    .Subscribe(health =>
    {
        // 更新生命值显示
        healthText.Text = $"HP: {health}";
    });

3. Subject 与 ReactiveProperty

R3 提供 5 种 Subject 类型,满足不同事件分发场景,且默认在 Dispose 时触发 OnCompleted,确保订阅自动释放。

Subject 类型 用途说明
Subject<T> 基础事件分发器,无状态,仅分发订阅后的事件。
BehaviorSubject<T> 持有最新值,新订阅者会立即收到当前值。
ReactiveProperty<T> 继承 BehaviorSubject,支持去重(相同值不触发通知),适合数据绑定。
ReplaySubject<T> 缓存指定数量 / 时间范围内的事件,新订阅者会收到历史事件。
ReplayFrameSubject<T> 基于帧缓存事件(如缓存最近 10 帧的事件),适合游戏帧同步场景。
3.1 ReactiveProperty 示例(数据绑定与验证)

csharp

using R3;
using System;
using System.ComponentModel.DataAnnotations;

// 1. 响应式模型定义(以游戏角色为例)
public class Enemy
{
    // 生命值(ReactiveProperty 自动去重,值变化时触发通知)
    public ReactiveProperty<long> CurrentHp { get; }

    // 是否死亡(由 CurrentHp 推导,自动同步状态)
    public ReadOnlyReactiveProperty<bool> IsDead { get; }

    public Enemy(int initialHp)
    {
        CurrentHp = new ReactiveProperty<long>(initialHp);
        // 从 CurrentHp 派生 IsDead(当生命值 ≤ 0 时为 true)
        IsDead = CurrentHp.Select(hp => hp <= 0).ToReadOnlyReactiveProperty();
    }
}

// 2. 带验证的 ReactiveProperty(如限制数值范围)
public sealed class ClampedReactiveProperty<T> : ReactiveProperty<T> 
    where T : IComparable<T>
{
    private readonly T _min;
    private readonly T _max;
    private static readonly IComparer<T> _comparer = Comparer<T>.Default;

    // 构造函数:初始化时限制值在 [min, max] 范围内
    public ClampedReactiveProperty(T initialValue, T min, T max)
        : base(initialValue, EqualityComparer<T>.Default, callOnValueChangeInBaseConstructor: false)
    {
        _min = min;
        _max = max;
        // 手动修正初始值(确保符合范围)
        OnValueChanging(ref GetValueRef());
    }

    // 重写值变更前的逻辑,强制限制范围
    protected override void OnValueChanging(ref T value)
    {
        if (_comparer.Compare(value, _min) < 0)
        {
            value = _min; // 小于最小值时强制设为最小值
        }
        else if (_comparer.Compare(value, _max) > 0)
        {
            value = _max; // 大于最大值时强制设为最大值
        }
    }
}

// 3. XAML 数据绑定(以 WPF 为例)
public class PlayerViewModel
{
    // 可写属性(用于绑定输入控件,如 Slider)
    public ReactiveProperty<int> Level { get; } = new ReactiveProperty<int>(1);

    // 只读属性(用于绑定显示控件,如 TextBlock)
    public ReadOnlyReactiveProperty<string> LevelText { get; }

    public PlayerViewModel()
    {
        // 派生 LevelText(格式化为 "等级:X")
        LevelText = Level.Select(level => $"等级:{level}").ToReadOnlyReactiveProperty();
    }
}
3.2 Subject 自动释放示例

R3 的 Subject 在 Dispose 时会自动触发 OnCompleted,确保所有订阅者收到完成通知并释放资源:

csharp

using R3;
using System;

var subject = new Subject<string>();

// 订阅 Subject
var subscription = subject.Subscribe(
    onNext: msg => Console.WriteLine($"收到消息:{msg}"),
    onCompleted: result => Console.WriteLine(result.IsFailure 
        ? $"失败:{result.Exception.Message}" 
        : "成功完成")
);

// 发送消息
subject.OnNext("Hello R3");

// 释放 Subject(会触发 OnCompleted)
subject.Dispose();

// 此时订阅已自动取消,后续发送消息不会被接收
subject.OnNext("This message is ignored");

4. 灵活的 Disposable 管理

R3 提供多种 Disposable 组合方式,满足不同场景的性能与灵活性需求,性能从高到低排序如下:

  1. `Disposable.Combine(d1, d2,..., d8):适用于已知数量(≤8个)的订阅,内部使用字段存储,性能最优。 2. Disposable.CreateBuilder():适用于动态数量但构建时已知的订阅,Builder 为值类型,无内存分配。 3. Disposable.Combine(params IDisposable[]):适用于动态数量的订阅,内部使用数组存储。 4. DisposableBag:适用于动态添加的订阅,轻量级值类型,非线程安全。 5. CompositeDisposable`:支持动态添加 / 移除,线程安全,但性能最低。
4.1 各 Disposable 用法示例

csharp

using R3;
using System;
using System.Reactive.Disposables;

// 1. Disposable.Combine(已知3个订阅,性能最优)
public class Example1
{
    private IDisposable _disposables;

    public void Initialize()
    {
        var d1 = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Update 1"));
        var d2 = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Update 2"));
        var d3 = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Update 3"));
        
        // 组合3个订阅,Dispose时会释放所有
        _disposables = Disposable.Combine(d1, d2, d3);
    }

    public void Cleanup()
    {
        _disposables?.Dispose();
    }
}

// 2. Disposable.CreateBuilder(动态添加但构建时确定数量)
public class Example2
{
    private IDisposable _disposables;

    public void Initialize()
    {
        var builder = Disposable.CreateBuilder();
        
        // 动态添加订阅(数量在构建时确定)
        Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Builder 1")).AddTo(ref builder);
        Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Builder 2")).AddTo(ref builder);
        Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Builder 3")).AddTo(ref builder);
        
        // 构建组合订阅
        _disposables = builder.Build();
    }

    public void Cleanup()
    {
        _disposables?.Dispose();
    }
}

// 3. DisposableBag(动态添加,非线程安全)
public class Example3
{
    // DisposableBag 是值类型,无需 new,也不要拷贝
    private DisposableBag _disposableBag;

    public void Initialize()
    {
        // 初始添加订阅
        Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Bag 1")).AddTo(ref _disposableBag);
        Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Bag 2")).AddTo(ref _disposableBag);
    }

    // 动态添加(如按钮点击时)
    public void OnButtonClick()
    {
        Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Dynamic Bag")).AddTo(ref _disposableBag);
    }

    public void Cleanup()
    {
        _disposableBag.Dispose();
    }
}

// 4. CompositeDisposable(支持移除,线程安全)
public class Example4
{
    private CompositeDisposable _compositeDisposable = new CompositeDisposable();
    private IDisposable _dynamicSubscription;

    public void Initialize()
    {
        // 添加固定订阅
        _compositeDisposable.Add(Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Composite 1")));
        _compositeDisposable.Add(Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Composite 2")));
        
        // 保存动态订阅,后续可移除
        _dynamicSubscription = Observable.EveryUpdate().Subscribe(_ => Console.WriteLine("Dynamic Composite"));
        _compositeDisposable.Add(_dynamicSubscription);
    }

    // 移除动态订阅
    public void RemoveDynamicSubscription()
    {
        _compositeDisposable.Remove(_dynamicSubscription);
    }

    public void Cleanup()
    {
        _compositeDisposable.Dispose();
    }
}

5. 与 async/await 深度集成

R3 针对 async/await 做了专门优化,支持异步操作符(如 SelectAwaitWhereAwait),并提供灵活的异步执行策略(如串行、并行、丢弃、切换)。

5.1 异步操作符示例(避免按钮重复点击)

csharp

using R3;
using System;
using System.Threading;
using System.Threading.Tasks;
using UnityEngine;
using UnityEngine.Networking;

// 按钮点击后发起网络请求,使用 AwaitOperation.Drop 避免重复请求
public class AsyncExample : MonoBehaviour
{
    public Button RequestButton;
    public Text ResultText;

    private void Start()
    {
        RequestButton.OnClickAsObservable()
            // 异步转换:发起网络请求
            .SelectAwait(async (_, ct) =>
            {
                // 使用 UnityWebRequest 发起请求,并支持取消
                using var request = UnityWebRequest.Get("https://api.example.com/data");
                var operation = request.SendWebRequest();
                
                // 绑定取消令牌(当订阅取消或完成时取消请求)
                using (ct.Register(() => operation.Abort()))
                {
                    await operation;
                }

                if (request.result != UnityWebRequest.Result.Success)
                {
                    throw new Exception($"请求失败:{request.error}");
                }

                return request.downloadHandler.text;
            }, 
            // 执行策略:当异步操作未完成时,丢弃新的点击事件
            awaitOperation: AwaitOperation.Drop,
            // 订阅完成时取消正在进行的异步操作
            cancelOnCompleted: true)
            // 订阅结果并更新UI
            .Subscribe(
                result => ResultText.text = $"结果:{result}",
                error => ResultText.text = $"错误:{error.Message}"
            )
            // 将订阅绑定到当前 MonoBehaviour,销毁时自动释放
            .AddTo(this);
    }
}
5.2 异步执行策略(AwaitOperation)
策略 行为说明
Sequential 所有事件排队,下一个事件等待前一个异步操作完成。
Drop 异步操作执行期间,丢弃新的事件(如避免重复点击)。
Switch 新事件到来时,取消前一个未完成的异步操作,立即执行新操作(如搜索联想)。
Parallel 所有事件立即执行异步操作,不限制并发数。
SequentialParallel 异步操作并行执行,但结果按事件顺序传递给下一个操作符。
ThrottleFirstLast 异步操作执行期间,仅保留第一个和最后一个事件(如批量处理)。

6. 单元测试支持

R3 提供完善的单元测试工具,支持时间 / 帧模拟,结合 LiveList 可轻松验证事件流结果。

6.1 时间模拟测试(使用 FakeTimeProvider)

csharp

using R3;
using Microsoft.Extensions.TimeProvider.Testing;
using Xunit;
using Shouldly;

public class TimeProviderTests
{
    [Fact]
    public void Timer_Should_Trigger_After_Delay()
    {
        // 1. 创建虚假时间提供器
        var fakeTime = new FakeTimeProvider();
        // 2. 创建事件流:5秒后发送一个事件
        var timer = Observable.Timer(TimeSpan.FromSeconds(5), fakeTime);
        // 3. 将事件流转换为 LiveList,便于断言
        var results = timer.ToLiveList();

        // 断言:提前4秒,事件未触发
        fakeTime.Advance(TimeSpan.FromSeconds(4));
        results.AssertIsNotCompleted();
        results.AssertEmpty();

        // 断言:再提前1秒(总计5秒),事件触发
        fakeTime.Advance(TimeSpan.FromSeconds(1));
        results.AssertIsCompleted();
        results.AssertEqual([Unit.Default]);
    }
}
6.2 帧模拟测试(使用 FakeFrameProvider)

csharp

using R3;
using System;
using Xunit;
using Shouldly;

public class FrameProviderTests
{
    [Fact]
    public void EveryUpdate_Should_Trigger_Per_Frame()
    {
        // 1. 创建虚假帧提供器
        var fakeFrameProvider = new FakeFrameProvider();
        var cts = new CancellationTokenSource();

        // 2. 创建事件流:每帧发送当前帧计数
        var everyUpdate = Observable.EveryUpdate(fakeFrameProvider, cts.Token)
            .Select(_ => fakeFrameProvider.GetFrameCount());
        var results = everyUpdate.ToLiveList();

        // 断言:初始状态无事件
        results.AssertEmpty();

        // 断言:推进1帧,收到帧计数0
        fakeFrameProvider.Advance();
        results.AssertEqual([0]);

        // 断言:推进3帧,收到帧计数1、2、3
        fakeFrameProvider.Advance(3);
        results.AssertEqual([0, 1, 2, 3]);

        // 断言:取消令牌,事件流完成
        cts.Cancel();
        results.AssertIsCompleted();

        // 断言:继续推进帧,无新事件
        fakeFrameProvider.Advance();
        results.AssertEqual([0, 1, 2, 3]);
    }
}

// 测试辅助扩展方法
public static class LiveListExtensions
{
    public static void AssertEqual<T>(this LiveList<T> list, params T[] expected)
    {
        list.ShouldBe(expected);
    }

    public static void AssertEmpty<T>(this LiveList<T> list)
    {
        list.Count.ShouldBe(0);
    }

    public static void AssertIsCompleted<T>(this LiveList<T> list)
    {
        list.IsCompleted.ShouldBeTrue();
    }

    public static void AssertIsNotCompleted<T>(this LiveList<T> list)
    {
        list.IsCompleted.ShouldBeFalse();
    }
}

平台支持

R3 支持多种 .NET 平台,核心库无需额外配置即可使用,平台专用扩展需安装对应 NuGet 包,并替换 TimeProvider/FrameProvider 以适配平台特性。

1. 平台支持列表

平台 NuGet 包名称 核心适配点
WPF R3Extensions.WPF 提供 WpfDispatcherTimeProvider(UI 线程时间)、WpfRenderingFrameProvider(渲染帧)。
Avalonia R3Extensions.Avalonia 提供 AvaloniaDispatcherTimeProviderAvaloniaRenderingFrameProvider(基于渲染事件)。
Uno R3Extensions.Uno 提供 UnoDispatcherTimeProviderUnoRenderingFrameProvider(跨平台 UI 适配)。
MAUI R3Extensions.Maui 提供 MauiDispatcherTimeProviderMauiTickerFrameProvider(基于 Ticker 帧循环)。
WinForms R3Extensions.WinForms 提供 WinFormsTimeProviderWinFormsFrameProvider(基于消息循环)。
WinUI3 R3Extensions.WinUI3 提供 WinUI3DispatcherTimeProviderWinUI3RenderingFrameProvider
Unity R3 + R3.Unity 提供 UnityTimeProvider(支持 Update/FixedUpdate 等生命周期)、UnityFrameProvider
Godot R3 + R3.Godot 提供 GodotTimeProvider(Process/PhysicsProcess)、GodotFrameProvider
Stride R3Extensions.Stride 提供 StrideTimeProviderStrideFrameProvider(游戏引擎帧循环)。
MonoGame R3Extensions.MonoGame 提供 MonoGameTimeProviderMonoGameFrameProvider(基于 Game.Update)。
LogicLooper R3Extensions.LogicLooper 提供 LogicLooperTimeProviderLogicLooperFrameProvider(循环逻辑适配)。
Blazor R3Extensions.Blazor 适配 Blazor 同步上下文,避免 UI 线程阻塞。

2. 平台初始化示例(以 Unity 为例)

Unity 平台需安装两个包:核心 R3 和 Unity 专用扩展 R3.Unity,步骤如下:

  1. 通过 NuGetForUnity 安装 R3(搜索 “R3” 并安装)。
  2. 引用 Git 地址安装 R3.Unityhttps://github.com/Cysharp/R3.git?path=src/R3.Unity/Assets/R3.Unity
  3. (可选)指定版本:https://github.com/Cysharp/R3.git?path=src/R3.Unity/Assets/R3.Unity#1.0.0
2.1 Unity 平台核心用法

csharp

using R3;
using R3.Unity;
using UnityEngine;
using UnityEngine.UI;

public class UnityExample : MonoBehaviour
{
    public Button AttackButton;
    public Text HpText;
    public Slider HpSlider;

    // 响应式属性:角色生命值(初始1000)
    private ReactiveProperty<long> _currentHp = new ReactiveProperty<long>(1000);
    // 派生属性:是否死亡(生命值 ≤ 0)
    private ReadOnlyReactiveProperty<bool> _isDead;

    private void Awake()
    {
        // 初始化派生属性
        _isDead = _currentHp.Select(hp => hp <= 0).ToReadOnlyReactiveProperty();
    }

    private void Start()
    {
        // 1. 绑定生命值到 UI(自动同步更新)
        _currentHp.Subscribe(hp =>
        {
            HpText.text = $"HP: {hp}";
            HpSlider.value = hp;
        }).AddTo(this);

        // 2. 绑定死亡状态到按钮(死亡后禁用攻击按钮)
        _isDead.Subscribe(isDead =>
        {
            AttackButton.interactable = !isDead;
            if (isDead)
            {
                HpText.text = "已死亡";
            }
        }).AddTo(this);

        // 3. 绑定按钮点击事件(每点击一次减少100生命值)
        AttackButton.OnClickAsObservable()
            .Subscribe(_ => _currentHp.Value -= 100)
            .AddTo(this);

        // 4. 每帧检测生命值变化(无 INotifyPropertyChanged 也可使用)
        Observable.EveryValueChanged(this, x => x.transform.position, UnityFrameProvider.Update)
            .Subscribe(pos => Debug.Log($"当前位置: {pos}"))
            .AddTo(this);

        // 5. 固定帧执行(如物理逻辑)
        Observable.EveryUpdate(UnityFrameProvider.FixedUpdate)
            .Subscribe(_ =>
            {
                // 物理相关逻辑(如碰撞检测)
            })
            .AddTo(this);
    }
}

与传统 Rx 的兼容性

R3 提供与 IObservable<T>(传统 Rx)的双向转换,便于逐步迁移现有项目。

1. 转换方法

转换方向 方法 说明
IObservable<T> → R3.Observable<T> ToObservable()(扩展方法) 将传统 Rx 的可观察对象转换为 R3 的 Observable<T>。
R3.Observable<T> → IObservable<T> AsSystemObservable()(扩展方法) 将 R3 的 Observable<T> 转换为传统 Rx 的 IObservable<T>。
1.1 转换示例

csharp

using R3;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class CompatibilityExample
{
    public void ConvertFromSystemRx()
    {
        // 1. 创建传统 Rx 的 Subject
        var systemSubject = new Subject<int>();

        // 2. 转换为 R3 的 Observable<int>
        var r3Observable = systemSubject.ToObservable();

        // 3. 使用 R3 的操作符处理
        r3Observable
            .Where(x => x % 2 == 0)
            .Delay(TimeSpan.FromSeconds(1))
            .Subscribe(x => Console.WriteLine($"R3 处理结果:{x}"));

        // 4. 发送事件(传统 Rx 侧发送,R3 侧接收)
        systemSubject.OnNext(1); // 被过滤(奇数)
        systemSubject.OnNext(2); // 被 R3 接收并延迟1秒输出
    }

    public void ConvertToSystemRx()
    {
        // 1. 创建 R3 的 Observable
        var r3Observable = Observable.Interval(TimeSpan.FromSeconds(1));

        // 2. 转换为传统 Rx 的 IObservable<Unit>
        var systemObservable = r3Observable.AsSystemObservable();

        // 3. 使用传统 Rx 的操作符处理
        systemObservable
            .Take(5)
            .Subscribe(
                _ => Console.WriteLine("传统 Rx 接收事件"),
                () => Console.WriteLine("传统 Rx 事件流完成")
            );
    }
}

许可证

R3 基于 MIT 许可证


网站公告

今日签到

点亮在社区的每一天
去签到