2020-11-20

RxJS快速入门

内容导航

目录
  • 内容导航
  • RxJS是什么
  • RxJS的主要成员
    • Observable (可观察对象)
      • 创建 Observable
      • 订阅 Observables
      • 执行 Observables
      • 清理 Observable 执行
    • Observer (观察者)
    • Subscription (订阅)
    • Subject (主体)
      • 多播的 Observables
      • BehaviorSubject
      • ReplaySubject
      • AsyncSubject
    • Scheduler (调度器)
      • 调度器类型
    • Pipeable(操作符)
      • 常用的操作符
      • 创建操作符
      • 连接创建操作符
      • 转换操作符
      • 过滤操作符
      • 组合操作符
      • 多播操作符
      • 错误处理操作符
      • 工具操作符
      • 条件和布尔操作符
      • 数学和聚合操作符

RxJS是什么

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

可以把 RxJS 当做是用来处理事件的 Lodash 。

ReactiveX 结合了 观察者模式、迭代器模式 和 使用集合的函数式编程,以满足以一种理想方式来管理事件序列所需要的一切。

RxJS的主要成员

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

Observable (可观察对象)

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。Observables 是多个值的惰性推送集合。

  • of():用于创建简单的Observable,该Observable只发出给定的参数,在发送完这些参数后发出完成通知.
  • from():从一个数组、类数组对象、promise、迭代器对象或者类Observable对象创建一个Observable.
  • fromEvent(),:把event转换成Observable.
  • range():在指定起始值返回指定数量数字.
  • interval():基于给定时间间隔发出数字序列。返回一个发出无限自增的序列整数,可以选择固定的时间间隔进行发送。
  • timer():创建一个Observable,该Observable在初始延时之后开始发送并且在每个时间周期后发出自增的数字

创建 Observable

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';import { map } from 'rxjs/operators';	const Observable1 = new Observable(subscriber => { try{   subscriber.next(1);   subscriber.next(2);   subscriber.next(3);   setTimeout(() => {   subscriber.next(4);   subscriber.complete();   }, 1000);  } catch (err) {  	subscriber.error(err);	//传递一个错误对象,如果捕捉到异常的话。 	} }); const Observable2 = from([  { name: 'Dave', age: 34, salary: 2000 },  { name: 'Nick', age: 37, salary: 32000 },  { name: 'Howie', age: 40, salary: 26000 },  { name: 'Brian', age: 40, salary: 30000 },  { name: 'Kevin', age: 47, salary: 24000 }, ]);	const Observable3 = of("Dave","Nick");//把所有参数组合到数组,逐个提供给消费者	const Observable4 = range(1,10);	const Observable5 = interval(3000);//从零开始每3000毫秒自增并提供给消费者	const Observable6 = timer(3000,1000);//等待3000毫秒后,从零开始每1000毫秒自增并提供给消费者

订阅 Observables

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。

当调用了 observable.subscribe ,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription (订阅):

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';	const observable1 = range(1,10); observable1.subscribe(   num => {   console.log(num);   },   err => console.log(err),   () => console.log("Streaming is over.")  );

执行 Observables

Observable 执行可以传递三种类型的值:

  • "Next" 通知: 发送一个值,比如数字、字符串、对象,等等。
  • "Error" 通知: 发送一个 JavaScript 错误 或 异常。
  • "Complete" 通知: 不再发送任何值。

"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';import { map } from 'rxjs/operators';	const observable = new Observable(subscriber => { try{   subscriber.next(1);   subscriber.next(2);   subscriber.next(3);   subscriber.complete();   subscriber.next(4); // 因为违反规约,所以不会发送  } catch (err) {  	subscriber.error(err);	//传递一个错误对象,如果捕捉到异常的话。 	} });

清理 Observable 执行

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源

当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs'; const observable = new Observable(subscriber => {  let intervalID = setInterval(() => {  subscriber.next('hi');  }, 1000);  // 提供取消和清理 interval 资源的方法  return function unsubscribe() {  clearInterval(intervalID);  }; }); let subscription = observable.subscribe(x => console.log(x)); subscription.unsubscribe();

Observer (观察者)

观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:nexterrorcomplete 。下面的示例是一个典型的观察者对象:

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。

observable.subscribe( next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'));

Subscription (订阅)

Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。在上一个版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理对象)。

Subscription 基本上只有一个 unsubscribe() 函数,这个函数用来释放资源或去取消 Observable 执行。

import { Observable,of,from,fromEvent,range,interval } from 'rxjs';	var observable1 = interval(1000); var subscription1 = observable1.subscribe(x => console.log(x)); // 稍后: // 这会取消正在进行中的 Observable 执行 // Observable 执行是通过使用观察者调用 subscribe 方法启动的 subscription1.unsubscribe(); var observable2 = interval(400); var observable3 = interval(300); var subscription2 = observable2.subscribe(x => console.log('first: ' + x)); var childSubscription = observable3.subscribe(x => console.log('second: ' + x)); subscription2.add(childSubscription); setTimeout(() => {  // subscription 和 childSubscription 都会取消订阅  subscription2.unsubscribe(); }, 1000);

Subject (主体)

RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。

还有一些特殊类型的 Subject:BehaviorSubjectReplaySubjectAsyncSubject

每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。

在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。

每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)error(e)complete() 。要给 Subject 提供新值,只要调用 next(theValue),它会将值多播给已注册监听该 Subject 的观察者们。

import { Subject,from } from 'rxjs';	//我们为 Subject 添加了两个观察者,然后给 Subject 提供一些值 	var subject1 = new Subject(); subject1.subscribe({  next: (v) => console.log('observerA: ' + v) }); subject1.subscribe({  next: (v) => console.log('observerB: ' + v) }); subject1.next(1); subject1.next(2); //因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法 var subject2 =new Subject(); subject2.subscribe({  next: (v) => console.log('observerA: ' + v) }); subject2.subscribe({  next: (v) => console.log('observerB: ' + v) }); var observable = from([1, 2, 3]); observable.subscribe(subject2); // 你可以提供一个 Subject 进行订阅

多播的 Observables

"多播 Observable" 通过 Subject 来发送通知,这个 Subject 可能有多个订阅者,然而普通的 "单播 Observable" 只发送通知给单个观察者。

多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。

在底层,这就是 multicast 操作符的工作原理:观察者订阅一个基础的 Subject,然后 Subject 订阅源 Observable 。

import { Subject } from 'rxjs/internal/Subject';import { take, multicast } from 'rxjs/operators'; 	 const source = timer(1000, 2500).pipe(take(5)); const subject = new Subject(); subject.subscribe({  next: (v) => console.log('observerC: ' + v) }); subject.subscribe({  next: (v) => console.log('observerD: ' + v) }); const multicasted = source.pipe(multicast(subject)); multicasted.subscribe({  next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({  next: (v) => console.log('observerB: ' + v) });	source.subscribe(subject);

BehaviorSubject

Subject 的其中一个变体就是 BehaviorSubject,它有一个"当前值"的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到"当前值"。

BehaviorSubjects 适合用来表示"随时间推移的值"。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 。

import { BehaviorSubject } from 'rxjs';	//BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。	const subject = new BehaviorSubject(0); // 0是初始值 subject.subscribe({  next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.subscribe({  next: (v) => console.log('observerB: ' + v) }); subject.next(3);

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。

ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。

import { ReplaySubject } from 'rxjs';	const subject = new ReplaySubject(3); // 为新的订阅者缓冲最后3个值 subject.subscribe({  next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({  next: (v) => console.log('observerB: ' + v) }); subject.next(5);	//我们缓存数量100,但 window time 参数只设置了120毫秒 const subject = new ReplaySubject(100, 120 /* windowTime */); subject.subscribe({  next: (v) => console.log('observerA: ' + v) }); let i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => {  subject.subscribe({  next: (v) => console.log('observerB: ' + v)  }); }, 1000);	

AsyncSubject

AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

AsyncSubject 和 last() 操作符类似,因为它也是等待 complete 通知,以发送一个单个值。

import { AsyncSubject } from 'rxjs'; const subject = new AsyncSubject(); subject.subscribe({  next: (v) => console.log('observerA: ' + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({  next: (v) => console.log('observerB: ' + v) }); subject.next(5); subject.complete();

Scheduler (调度器)

调度器控制着何时启动 subscription 和何时发送通知。它由三部分组成:

  • 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
  • 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
  • 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了"时间"的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。

调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。

import { asyncScheduler, Observable } from 'rxjs';	//我们使用普通的 Observable ,它同步地发出值`1`、`2`、`3`,并使用操作符 `observeOn` 来指定 `async` 调度器发送这些值。	const observable = new Observable(subscriber => {  subscriber.next(1);  subscriber.next(2);  subscriber.next(3);  subscriber.complete(); })  .pipe(  observeOn(asyncScheduler)  ); console.log('just before subscribe'); observable.subscribe({  next: x => console.log('got value ' + x),  error: err => console.error('something wrong occurred: ' + err),  complete: () => console.log('done'), }); console.log('just after subscribe'); //你会发现"just after subscribe"在"got value..."之前就出现了 //just before subscribe //just after subscribe //got value 1 //got value 2 //got value 3 //done 

调度器类型

async 调度器是 RxJS 提供的内置调度器中的一个。可以通过使用 Scheduler 对象的静态属性创建并返回其中的每种类型的调度器。

调度器目的
null不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
queueScheduler当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
asapScheduler微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。
asyncScheduler使用 setInterval 的调度。用于基于时间的操作符。
animationFrameScheduler计划将在下一次浏览器内容重新绘制之前发生的任务。 可用于创建流畅的浏览器动画。

Pipeable(操作符)

操作符就是函数,管道操作符本质上是一个纯函数,它将一个Observable作为输入并生成另一个Observable作为输出。订阅输出Observable也将订阅输入Observable。 操作符有两种:

管道操作符是一个将Observable作为其输入并返回另一个Observable的函数。这是一个纯粹的操作:以前的Observable保持不变。

  1. 管道操作符是可以使用语法observableInstance.pipe(operator())传递给Observable的类型。 这些包括filter()mergeMap()。 调用时,它们不会更改现有的Observable实例。 相反,它们返回一个新的Observable,其订阅逻辑基于第一个Observable。

  2. 创建运算符是另一种运算符,可以称为独立函数来创建新的Observable。例如:of(1,2,3)创建一个observable ,该对象将依次发射1、2和3。创建运算符将在后面的部分中详细讨论。

obs.pipe( op1(), op2(), op3(), op3(),)

常用的操作符

finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>:

返回原始Observable,但在Observable完成或发生错误终止时将调用指定的函数。

创建操作符

  • ajax
  • bindCallback
  • bindNodeCallback
  • defer
  • empty
  • from
  • fromEvent
  • fromEventPattern
  • generate
  • interval
  • of
  • range
  • throwError
  • timer
  • iif

连接创建操作符

These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.

  • combineLatest
  • concat
  • forkJoin
  • merge
  • partition
  • race
  • zip

转换操作符

  • buffer
  • bufferCount
  • bufferTime
  • bufferToggle
  • bufferWhen
  • concatMap
  • concatMapTo
  • exhaust
  • exhaustMap
  • expand
  • groupBy
  • map
  • mapTo
  • mergeMap
  • mergeMapTo
  • mergeScan
  • pairwise
  • partition
  • pluck
  • scan
  • switchMap
  • switchMapTo
  • window
  • windowCount
  • windowTime
  • windowToggle
  • windowWhen

过滤操作符

  • audit
  • auditTime
  • debounce
  • debounceTime
  • distinct
  • distinctKey
  • distinctUntilChanged
  • distinctUntilKeyChanged
  • elementAt
  • filter
  • first
  • ignoreElements
  • last
  • sample
  • sampleTime
  • single
  • skip
  • skipLast
  • skipUntil
  • skipWhile
  • take
  • takeLast
  • takeUntil
  • takeWhile
  • throttle
  • throttleTime

组合操作符

Also see the Join Creation Operators section above.

  • combineAll
  • concatAll
  • exhaust
  • mergeAll
  • startWith
  • withLatestFrom

多播操作符

  • multicast
  • publish
  • publishBehavior
  • publishLast
  • publishReplay
  • share

错误处理操作符

  • catchError
  • retry
  • retryWhen

工具操作符

  • tap
  • delay
  • delayWhen
  • dematerialize
  • materialize
  • observeOn
  • subscribeOn
  • timeInterval
  • timestamp
  • timeout
  • timeoutWith
  • toArray

条件和布尔操作符

  • defaultIfEmpty
  • every
  • find
  • findIndex
  • isEmpty

数学和聚合操作符

  • count
  • max
  • min
  • reduce








原文转载:http://www.shaoqun.com/a/490661.html

蜜芽宝贝官网:https://www.ikjzd.com/w/1320

淘粉8:https://www.ikjzd.com/w/1725.html

topia:https://www.ikjzd.com/w/2741


内容导航目录内容导航RxJS是什么RxJS的主要成员Observable(可观察对象)创建Observable订阅Observables执行Observables清理Observable执行Observer(观察者)Subscription(订阅)Subject(主体)多播的ObservablesBehaviorSubjectReplaySubjectAsyncSubjectScheduler(调
国际标准书号:国际标准书号
c88是什么:c88是什么
日本站注册指导2018:日本站注册指导2018
深圳到厦门有直达的火车或者高铁?要多久?:深圳到厦门有直达的火车或者高铁?要多久?
南风古灶在哪里?:南风古灶在哪里?

No comments:

Post a Comment