Nestjs框架: RxJS 核心方法实践与错误处理详解

发布于:2025-07-20 ⋅ 阅读:(13) ⋅ 点赞:(0)

RxJS 核心方法实践与错误处理详解

  • 围绕 RxJS 的核心方法进行实操
  • 涵盖 observable 的构建、定时器的使用、错误处理机制(如 retrycatchError)、以及 deferlastValueFrom 的使用场景

1 )RxJS 基础实践:Timer 的使用

  • 我们首先在项目中使用 RxJS 的 Observabletimer 方法来进行实际操作

  • 由于项目中已通过 nestjs 依赖引入了 RxJS,因此无需额外安装

  • 我们新建一个名为 index.mjs 的文件,并在其中导入以下内容:

    import { Observable, timer } from 'rxjs';
    
    const exampleTimer = timer(2000); // 2秒后触发
    
    exampleTimer.subscribe({
      next: (value) => console.log('timer emitted', value),
      complete: () => console.log('timer complete')
    });
    
  • 这段代码会在两秒后输出 timer emitted 0,并在完成后打印 timer complete

  • timer 操作符支持两种参数模式,不同模式下的值发射逻辑如下

    参数形式 功能描述 发射值序列示例
    timer(delay: number) 仅延迟执行:延迟 delay 毫秒后发射第一个值,随后完成。 [0](2秒后发射 0,立即触发 complete)
    timer(delay: number, period: number) 延迟+周期执行:延迟 delay 毫秒后发射第一个值,之后每 period 毫秒发射下一个值(无限序列,除非主动取消订阅)。 [0, 1, 2, 3…](2秒后发射 0,之后每1秒发射1、2、3…)
  • 关键结论:无论是否设置周期,timer 发射的第一个值始终为 0,后续值按“当前发射次数-1”的规则递增(即第1次发射0,第2次发射1,以此类推)

  • 执行流程解析:

    • 创建 Observable:timer(2000) 创建一个 Observable,设置“延迟2秒后发射值”
    • 订阅与等待:调用 subscribe 后,Observable 开始计时,2秒后执行发射逻辑
    • 发射第一个值:由于未设置周期参数,timer 仅发射一次值 0,触发 next 回调,故 value 为 0
    • 完成 Observable:发射值后立即触发 complete 回调,整个流程结束
  • 若修改代码为带周期的 timer(如 timer(2000, 1000)),则输出结果将变为:

    // 代码:timer(2000, 1000) 
    // 输出顺序: 
    // 2秒后:timeremitted 0 
    // 3秒后:timeremitted 1(2+1秒) 
    // 4秒后:timeremitted 2(3+1秒) 
    // ...(无限递增,直至取消订阅) 
    
  • timer 操作符的设计初衷是提供“基于时间的序列发射能力”,其发射的 value 本质是“计时周期的索引”:

    • 当仅用于延迟执行时(单参数),索引仅为 0(表示“第一个周期”)
    • 当用于周期性任务时(双参数),索引从 0 开始递增,标识“第 n 个周期”
  • 因此,用户代码中 value 为 0 是 timer 操作符的默认行为,与延迟时间无关,仅由其内部值生成逻辑决定

2 )错误处理:Retry 与 CatchError 的配合使用

接下来我们演示如何使用 RxJS 提供的 错误处理 API,包括 throwErrorretrycatchError

import { Observable, retry, catchError, throwError } from 'rxjs';

let count = 0;
// 我们定义一个 observable,主动抛出错误
const errorObservable = new Observable((subscriber) => {
	console.log('retry: ', count ++);
    subscriber.error(new Error('this is an error'));
});

errorObservable.pipe(
  retry(3),
  catchError((err) => {
    console.log('caught error:', err.message);
    return throwError(() => new Error('Error after retries'));
  })
).subscribe({
  error: (error) => console.log('subscribe error final:', error?.message ?? error)
});

输出结果

retry:  0
retry:  1
retry:  2
retry:  3
caught error: this is an error
subscribe error final: Error after retries

通过 pipe 方法结合 retry(3)catchError 实现重试三次并捕获最终错误:

  • 在测试过程中,“retry”被执行了4次,这是因为第一次执行也算作一次尝试,之后才进行三次重试
  • 这一机制非常适用于网络请求或数据库连接等需要自动重试的场景

3 )延迟创建:使用 Defer 与 LastValueFrom 获取最终值

  • 我们将使用 deferlastValueFrom 来演示延迟创建 observable 并获取其最后一个值

    import { Observable, defer, lastValueFrom } from 'rxjs';
     
    const deferredObservable = defer(() => {
      console.log('observable created');
      return new Observable((subscriber) => {
        subscriber.next('hello1');
        subscriber.next('hello2');
        subscriber.next('hello3');
        subscriber.complete();
      });
    });
    
    async function getDeferredValue() {
      const result = await lastValueFrom(deferredObservable);
      console.log('deferred value:', result);
    }
    
    getDeferredValue();
    
  • 运行后,控制台将输出:

    observable created
    deferred value: hello3
    
  • lastValueFrom 的作用是将 observable 转换为 promise,并返回其最后一个值

  • 它会自动订阅该 observable 并等待其完成

4 ) 结合 Timer 实现延迟输出

  • 示例 让 hello3 在两秒后输出
    import { Observable, defer, lastValueFrom, timer } from 'rxjs';
    	 
    const deferredObservable = defer(() => {
      console.log('observable created');
      return new Observable((subscriber) => {
        subscriber.next('hello1');
        subscriber.next('hello2');
        timer(2000).subscribe(() => {
    	  subscriber.next('hello 3');
    	  subscriber.complete();
    	});
      });
    });
    
    async function getDeferredValue() {
      const result = await lastValueFrom(deferredObservable);
      console.log('deferred value:', result);
    }
    
    getDeferredValue();
    
  • 运行后,observable created 会立即输出,而 deferred value: hello 3 将在两秒后打印

总结

  • 通过以上示例,实现了 RxJS 的几个核心方法:
      1. Timer:用于定时触发 observable 的响应
      1. Retry 与 CatchError:实现错误自动重试及最终错误捕获
      1. Defer 与 LastValueFrom:延迟创建 observable 并获取其最终值
  • 这些方法在实际开发中具有广泛的应用价值
  • 例如在处理网络请求、数据库连接、异步任务调度等场景时
  • 能够极大地提升代码的健壮性与可维护性
  • 建议结合官方 RxJS 文档,进一步深入理解这些操作符的原理与使用技巧

网站公告

今日签到

点亮在社区的每一天
去签到