rxjs

介紹

RxJS是一個(gè)異步編程的庫(kù),同時(shí)它通過(guò)observable序列來(lái)實(shí)現(xiàn)基于事件的編程。它提供了一個(gè)核心的類(lèi)型:Observable,幾個(gè)輔助類(lèi)型(Observer,Schedulers,Subjects),受到Array的擴(kuò)展操作(map,filter,reduce,every等等)啟發(fā),允許直接處理異步事件的集合。

ReactiveX結(jié)合了Observer模式、Iterator模式和函數(shù)式編程和集合來(lái)構(gòu)建一個(gè)管理事件序列的理想方式。

在RxJS中管理異步事件的基本概念如下:

  • Observable:代表了一個(gè)調(diào)用未來(lái)值或事件的集合的概念
  • Observer:代表了一個(gè)知道如何監(jiān)聽(tīng)Observable傳遞過(guò)來(lái)的值的回調(diào)集合
  • Subscription:代表了一個(gè)可執(zhí)行的Observable,主要是用于取消執(zhí)行
  • Operators:是一個(gè)純函數(shù),允許處理集合與函數(shù)式編程風(fēng)格的操作,比如map、filter、concat、flatMap等
  • Subject:相當(dāng)于一個(gè)EventEmitter,它的唯一的方法是廣播一個(gè)值或事件給多個(gè)Observer
  • Schedulers:是一個(gè)集中式調(diào)度程序來(lái)控制并發(fā)性,允許我們?cè)趕etTimeout或者requestAnimationFrame上進(jìn)行協(xié)調(diào)計(jì)算

第一個(gè)例子

正常注冊(cè)一個(gè)事件監(jiān)聽(tīng)函數(shù):

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

使用RxJS,你可以創(chuàng)建一個(gè)observable來(lái)代替:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscrible(() => console.log('Clicked!'));

純粹

使得RxJS變得如此強(qiáng)大的原因是它使用了純函數(shù),這意味著你的代碼很少會(huì)發(fā)生錯(cuò)誤。

正常你不會(huì)創(chuàng)建一個(gè)純函數(shù),代碼的其他部分可能擾亂你的狀態(tài)。

var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked $(++count) times`));

RxJS將隔離你的狀態(tài)

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .scan(count => count + 1, 0)
  .subscribe(count => console.log(`Clicked ${count} items`));

scan操作符類(lèi)似于arrays的reduce操作符。它需要一個(gè)回調(diào)函數(shù)作為一個(gè)參數(shù),函數(shù)返回的值將作為下次調(diào)用時(shí)的參數(shù)。

Flow

RxJS有一系列的操作符來(lái)幫你監(jiān)控事件將如何流動(dòng)。

這是一個(gè)每秒最多點(diǎn)擊一次的程序:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate){
    console.log(`Clicked ${++count} times`);
    lastClick = Date.now();
  }
});

使用RxJS:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .scan(count => count + 1, 0)
  .subscribe(count => console.log(`Clicked ${count} times`));

另外的控制符還有:filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged等。

你可以使用你的observables來(lái)轉(zhuǎn)換值。

這是一個(gè)每次點(diǎn)擊添加x坐標(biāo)的程序:

var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate){
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
})

使用Rxjs:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .throttleTime(1000)
  .map(event => event.clientX)
  .scan((count, clientX) => count + clientX, 0)
  .subscribe(count => console.log(count));

另外的producing操作符:pluck、pairwise、sample等

Observable

Observables是一個(gè)延遲Push(關(guān)于Push的概念見(jiàn)后面)操作數(shù)據(jù)的集合。它們遵從下表:

Single Multiple
Pull Function Iterator
Push Promise Observable

舉個(gè)例子。下面是一個(gè)Observable,當(dāng)執(zhí)行subscribed,它將會(huì)立即push 1、 2、 3(同步),然后過(guò)去一秒后push 4

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000); 
});

為了調(diào)用Observable,然后看這些值,我們需要對(duì)這些數(shù)據(jù)進(jìn)行訂閱(subscribe)

var observable = Rx.Observable.create(function (observer){
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  })
});

console.log('just before subscribe');
observerble.subscribe({
  next: x => console.log(`got value` + x),
  error: err => console.error('somthing wrong occurred: ' +err),
  complete: () => console.log('done')
});
console.log('just after subscribe');

執(zhí)行結(jié)果如下:

just before subscribe
got value 1
got value 2
got value 3
just after sbuscribe
got value 4
done

Pull和Push

PullPush是關(guān)于數(shù)據(jù)提供者和數(shù)據(jù)消費(fèi)者交互的兩個(gè)不同的協(xié)議。

什么是Pull?在Pull系統(tǒng)中,當(dāng)Consumer收到Producer的數(shù)據(jù)時(shí),它會(huì)自己判斷是否接收該數(shù)據(jù),Producer自己并不知道數(shù)據(jù)將交給哪個(gè)Consumer。

所有的JavaScript函數(shù)都是一個(gè)Pull系統(tǒng)。函數(shù)是一個(gè)數(shù)據(jù)提供者,調(diào)用函數(shù)的代碼是一個(gè)consuming(消費(fèi)者),它將函數(shù)返回的值"pulling"出來(lái)。

ES2015介紹了generator functions and iterators (function*),它們是另外一種Pull系統(tǒng)。iterator.next() 是Consumer,它從iterator(Producer)中"pulling"出多個(gè)值

Producer Consumer
Pull 被動(dòng):當(dāng)需要時(shí)產(chǎn)生數(shù)據(jù) 主動(dòng):決定是否接收數(shù)據(jù)
Push 主動(dòng):自己決定將數(shù)據(jù)傳給誰(shuí) 被動(dòng):響應(yīng)式接收數(shù)據(jù)

什么是Push?在Push系統(tǒng)中,Producer決定將數(shù)據(jù)發(fā)往哪個(gè)Consumer。Consumer并不知道它自己的值來(lái)自哪個(gè)Producer

Promise是最常見(jiàn)的一個(gè)Push系統(tǒng)。一個(gè)Promise(Producer)分發(fā)一個(gè)結(jié)果值給注冊(cè)的接口(Consumer),與函數(shù)不同的是,Promise當(dāng)遇到值被"push"給callback時(shí),他會(huì)保證它傳遞的對(duì)象是正確的。

RxJS介紹了Observables,它是一個(gè)新的Push系統(tǒng)。Observable是一個(gè)提供多值的Producer,將它們"pushing"給Observers(Consumer)

  • Function:計(jì)算并同步調(diào)用一個(gè)值
  • generator:計(jì)算并同步調(diào)用多個(gè)值
  • Promise:計(jì)算后可能(不可能)返回一個(gè)值
  • Observable:計(jì)算然后同步或異步返回一個(gè)或多個(gè)值

Observable as generalizations of functions

與主流相反,Observable不像EventEmitters,也不像Promise。在某些情況下,Observable的行為可能像EventEmitters,比如當(dāng)使用RxJS的Subjects進(jìn)行多途徑傳播時(shí),但是大部分的情況它們都是不一樣的。

考慮下面的情況:

function foo(){
  console.log('Hello');
  return 42;
}

var x = foo.call();  //  same as foo()
console.log(x);
var y = foo.call();  //  same as foo()
console.log(y)

我們期望出現(xiàn)下面的結(jié)果:

"Hello"
42
"Hello"
42

當(dāng)使用Observables時(shí):

var foo = Rx.Observable.create(function (observer){
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x){
  console.log(x);
});
foo.subscribe(function (y){
  console.log(y);
})

它們有著同樣地輸出:

"Hello"
42
"Hello"
42

之所以出現(xiàn)這種情況是因?yàn)閒unction和Observables都是延遲(lazy)計(jì)算的。如果你不調(diào)用function,console.log('Hello')這段代碼是不會(huì)執(zhí)行的。Observables是同樣的,如果你不執(zhí)行(subscribe)它,代碼也不會(huì)執(zhí)行?!癱alling”和"subscribing"都是一個(gè)獨(dú)立的操作:兩個(gè)function分別導(dǎo)致兩個(gè)結(jié)果,兩個(gè)Observale subscribes trigger也會(huì)分別導(dǎo)致兩個(gè)結(jié)果。這與EventEmitters截然相反,EventEmitters會(huì)共享結(jié)果,并且它執(zhí)行的時(shí)候也不會(huì)顧忌到底是否有subscribers存在,Observables不會(huì)是共享結(jié)果,并且也是延遲執(zhí)行。

Subscribing一個(gè)Observable就像調(diào)用一個(gè)函數(shù)一樣

一些人要求Observables是異步的,這是不正確的??聪旅孢@個(gè)例子:

console.log('before');
console.log(foo.call());
console.log('after');

你將會(huì)看到這樣的輸出:

"before"
"Hello"
42
"after"

使用Observables

console.log('before');
foo.subscribe(function(x) {
  console.log(x);
});
console.log('after');

輸出是:

"before"
"Hello"
42
"after"

這證明了foo的訂閱是一個(gè)完完全全的異步,就像一個(gè)函數(shù)一樣。

Observables可以同步或異步地傳遞一個(gè)值

Observable和function的不同是什么呢?隨之時(shí)間的流逝,Observables可以“返回”多個(gè)值,函數(shù)是不可以的。你不可以這么做:

function foo(){
  console.log('Hello');
  return 42;
  return 100;  //  不會(huì)執(zhí)行到這兒
}

函數(shù)只能返回一次,Observables可以做到返回多次:

var foo = Rx.Observable.create(function (observer){
  console.log('Hello');
  observer.next(42);
  observer.next(100);  //  "return another value"
  observer.next(200);  //  "return" yet another
});

console.log('before');
foo.subscribe(function (x){
  console.log(x);
});
console.log('after');

同步輸出:

"before"
"Hello"
42
100
200
"after"

你也可以異步返回:

var foo = Rx.Observable.create(function (observer){
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300);  //  異步
  }, 1000);
});

console.log('before');
foo.subscribe(function(x){
  console.log(x);
});
console.log('after');

輸出:

"before"
"Hello"
42
100
200
"after"
300

結(jié)論:

  • func.call()表示“同步給我一個(gè)數(shù)據(jù)”
  • observable.subscribe()表示“給我任何數(shù)量的值,同步或者異步”

解析一個(gè)Observable

Observables使用Rx.Observable.create或者一個(gè)構(gòu)造器創(chuàng)建(create),使用Observer來(lái)監(jiān)聽(tīng)(subscribed),執(zhí)行(execute)是通過(guò)投遞一個(gè)next/error/complete來(lái)通知其他的Observer,然后按照各自的意愿(disposed)來(lái)執(zhí)行。在一個(gè)Observable實(shí)例中,這四個(gè)方面都是通過(guò)編碼實(shí)現(xiàn)的,但是這些可能與其他的類(lèi)型相關(guān),比如Obsrever和Subscription。

Observable的核心點(diǎn):

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables

創(chuàng)建一個(gè)Observables

Rx.Observable.create是Observable構(gòu)造器的一個(gè)別名,他需要一個(gè)參數(shù):一個(gè)subscribe函數(shù)

下面的例子創(chuàng)建一個(gè)Observable,它的作用是每秒鐘輸出字符串'hi':

var observable = Rx.Observable.create(function subscrite(observer){
  var id = setInterval(() => {
    observer.next('hi')
  }, 1000);
});

Observables可以使用create創(chuàng)建,但是我們經(jīng)常使用creation operators,諸如from,interval等。

在上面的例子中,subscribe函數(shù)是描述Observable最重要的一部分,讓我們來(lái)看看subscribing是什么意思。

subscribing to Observables

Observable的observable可以被訂閱(subscribed),就像這樣:

observable.subscribe(x => console.log(x));

observable.scribe和Observable.create(function subscribe(observer) {...})中的subscribe有著相同的名字并不是巧合。在庫(kù)中,它們是不同的,但是在實(shí)際的用途中你可以在邏輯上把他們想成相同的。

同樣的Observable被多個(gè)Observers監(jiān)聽(tīng)時(shí),它們是不共享的。

Subscribing一個(gè)Observable像調(diào)用一個(gè)函數(shù)一樣,當(dāng)一個(gè)數(shù)據(jù)被傳遞時(shí)提供一個(gè)回調(diào)

這個(gè)addEventListener/removeEventListener這樣的API完全不一樣。observable.subscribe作為一個(gè)給定的觀察者,在Observable中并沒(méi)有像listener一樣被注冊(cè)。Observable甚至不需要維護(hù)一系列的Observers。

Executing observables

代碼Observable.create(function subscribe(observer) {...})代表了一個(gè)"Observable execution",它將僅僅在每個(gè)Observer的subscribes的延遲計(jì)算中。隨著時(shí)間的推移,將產(chǎn)生多個(gè)結(jié)果,同步或者異步。

Observable可以傳遞的有三種類(lèi)型:

  • "Next" notification:傳遞一個(gè)數(shù)值,諸如Number、String、Object等
  • “Error” notification:傳遞一個(gè)js異常
  • "Complete" notification:什么值都不傳遞

Next notifications是最重要的也是最常見(jiàn)的類(lèi)型:它們表示一個(gè)實(shí)際數(shù)據(jù)被送到Observer。在Observable Execute執(zhí)行期間Error和Complete最多會(huì)發(fā)生一次。

下面的語(yǔ)法是在Observable Grammar or Contract中最好的表達(dá):

next*(error|complete)?

在一個(gè)Observable Execute中,0或多個(gè)Next notifications可能被傳遞。如果有error或者Complete被傳遞,剩下的next將不會(huì)被傳遞。

下面是Observable execute傳遞3個(gè)Next notifications的例子:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})

下面的例子中,Next notification 4不會(huì)被傳遞:

var observable = Rx.Observable.create(function subscribe(observer){
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4);  //  不會(huì)被執(zhí)行
})

用tru/catch代碼快包裹起來(lái)是個(gè)好主意:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // delivers an error if it caught one
  }
});

處理(Disposing)Observable Executions

Observable Executing的個(gè)數(shù)可能是無(wú)限個(gè),Observer中應(yīng)該處理有限個(gè)next,所以我們需要一個(gè)API來(lái)停止execution。因?yàn)閑xecution在每個(gè)Observer中都是獨(dú)立的,一旦Observer完成接收值,它必須有一個(gè)方法來(lái)停止executing。

當(dāng) observable.subscribe 被調(diào)用,Observer將被附加到一個(gè)新創(chuàng)建的Observable execution中,這次調(diào)用將返回一個(gè)對(duì)象,即Subscription:

var subscription = observable.subscribe(x => console.log(x));

Subscription代表了一個(gè)進(jìn)行中的executing,它有一個(gè)最小的API允許你取消execution??梢栽谶@里閱讀更多有關(guān)于 Subscription type here 的東西。使用 subscription.unsubscribe() 你可以取消正在進(jìn)行的execution:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
//  Later:
subscription.unsubscribe();

當(dāng)我們使用create()創(chuàng)建一個(gè)Observable時(shí),我們必須定義execution怎么處理資源。你可以通過(guò)返回一個(gè)自定義的 unsubscribe 函數(shù)來(lái)實(shí)現(xiàn)該步驟。

var observable = Rx.Observable.create(function subscribe(observer){
  var intervalID = setInterval(() => {
    observer.next('hi')
  });

  return function unsubscribe(){
    clearInterval(intervalID);
  }
})

然后這樣來(lái)調(diào)用:

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

Observer

什么是Observer?一個(gè)Observer是Observable傳遞過(guò)來(lái)的數(shù)據(jù)的customer。Observers是一個(gè)簡(jiǎn)單的一些列的回調(diào),next、error、和 complete用來(lái)傳遞數(shù)據(jù)。下面的例子展現(xiàn)了一個(gè)典型的Observer對(duì)象:

var observer = {
  next: x => console.log('Observable got a next value: ' + x),
  error: err => console.log('Observable got and error: ' + err),
  complete: () => console.log('Observable got a complete notification')
};

為了使用Observalbe,提供了一個(gè)subscribe:

observable.subscribe(observer)

你也可以提供部分回調(diào):

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
};

當(dāng)你訂閱(subscribing)一個(gè)Observable時(shí),你也許僅僅只提供一個(gè)函數(shù)作為參數(shù):

observable.subscribe(x => console.log('Observer got a next value: ' + x));

在observable.subscribe的內(nèi)部,他將使用第一個(gè)回調(diào)創(chuàng)建一個(gè)Observer對(duì)象作為一個(gè)next handler。所有的callback類(lèi)型都可能被提供:

observable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);

Subscription

什么是Subscription?一個(gè)Subscription代表了一個(gè)一次性的資源,通常表示的是一個(gè)Observable execution。一個(gè)Subscription有一個(gè)重要的方法,unsubscribe,它不需要參數(shù),僅僅是處理subscription的資源。在之前的RxJS版本中,Subscription被稱(chēng)作"Disposable"。

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();

一個(gè)Subscription實(shí)質(zhì)上是一個(gè)unsubscribe()函數(shù),用來(lái)釋放資源或者取消一個(gè)Observable executions。

Subscriptions也可以放在一起,這樣會(huì)導(dǎo)致使用一個(gè)unsubscribe()將取消多個(gè)Observable executions。你可以這么做:

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

當(dāng)執(zhí)行時(shí),我們將看到如下輸出:

second: 0
first: 0
second: 1
first: 1
second: 2

Subscriptions有一個(gè)remove(otherSubscription)方法,用來(lái)移除關(guān)聯(lián)的Subscirption

Subject

什么是Subject?一個(gè)RxJS Subject是一個(gè)特殊類(lèi)型的Observable,它允許值可以多路廣播給多個(gè)Observers。普通的Observables是單路廣播(每個(gè)subscribed Observer擁有自己獨(dú)立的Observable execution),Subjects是多路廣播。

一個(gè)Subject像一個(gè)Observable,但是可以多路廣播給Observers。Subjects像Eventmitters:它們維持許多注冊(cè)過(guò)的監(jiān)聽(tīng)器。

每個(gè)subject是一個(gè)Observable。給定一個(gè)Subject,你可以通過(guò)提供一個(gè)Observer來(lái)訂閱(subscribe)它,然后開(kāi)始正常的接收值。從Observer的角度來(lái)看,他不能告知Observer的Observable execution到底是來(lái)自一個(gè)不同的單路傳播的Observable,還是來(lái)自Subject。

在Subject的內(nèi)部,subscribe并沒(méi)有調(diào)用一個(gè)新的execute去傳遞數(shù)據(jù)。它只是簡(jiǎn)單的注冊(cè)O(shè)bservers列表中的一個(gè)Observer,類(lèi)似于addListener的使用。

每個(gè)subject是一個(gè)Observer。他是擁有next(v),error(e)和complete()方法的對(duì)象。為了給Subject一個(gè)新值,只需要調(diào)用 next(theValue),他講多路傳播給注冊(cè)過(guò)的Observer。

在下面的例子中,我們?cè)赟ubject中注冊(cè)了兩個(gè)Observers,我們傳遞一些值給Subject:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

輸出:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

因?yàn)镾ubject同時(shí)也是一個(gè)Observer,這意味著你應(yīng)該提供一個(gè)Subject作為Observable的subscribe的參數(shù),像這樣:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject);  // You can subscribe providing a Subject

執(zhí)行如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

在上面的處理中,我們本質(zhì)上僅僅是通過(guò)Subject將一個(gè)單路廣播的Observable execution變?yōu)槎嗦窂V播的。這個(gè)演示展示了Subjects是怎樣將單路廣播變?yōu)槎嗦窂V播的。

這里有幾個(gè)特殊的Subject類(lèi)型:BehaviorSubject、ReplaySubject和AsyncSubject。

Multicasted Observables

一個(gè)"multicasted Observable"的實(shí)現(xiàn)是通過(guò)Subject的多個(gè)訂閱(subscribers)來(lái)實(shí)現(xiàn)的,然而一個(gè)"unicast Observable"僅僅只通知一個(gè)單一的Observer。

在后臺(tái),multicast是這樣操作的:Observers訂閱(subscribe)一個(gè)相關(guān)的Subject,Subject訂閱一個(gè)Observable源。

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerAa: ' + v)
});
muticasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
muticasted.connect();

Reference counting

手動(dòng)的調(diào)用connect()來(lái)處理Subscription是很麻煩的。通常,我們希望當(dāng)?shù)谝粋€(gè)Observer到達(dá)時(shí),能夠自動(dòng)connect,當(dāng)最后一個(gè)Observer被移除時(shí),自動(dòng)取消shared execution。

看看下面這些訂閱發(fā)生時(shí)的列表:

  1. 第一個(gè)Observer訂閱multicasted Observable
  2. multicasted observable連接
  3. next value 0被傳遞給第一個(gè)Observer
  4. 第二個(gè)Observer訂閱multicasted Observable
  5. next value 1被傳遞給第一個(gè)Observer
  6. next value 1被傳遞給第二個(gè)Observer
  7. 第一個(gè)Observer解除監(jiān)聽(tīng)
  8. next value2被傳遞給第二個(gè)Observer
  9. 第二個(gè)Observer解除監(jiān)聽(tīng)
  10. 與multicasted observable連接的Observable解除連接

看下面的代碼:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: v => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscrption1.unscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

如果我們希望避免一遍遍地調(diào)用connect(),我們可以使用ConnectableObservable的refCount()方法(reference counting),它返回一個(gè)Observable來(lái)跟蹤有多少個(gè)訂閱者(subscribers)。當(dāng)訂閱者從0增加到1時(shí),它將自動(dòng)調(diào)用connect(),只有當(dāng)訂閱者從1變?yōu)?時(shí),它才會(huì)disconnect。

看下面的例子:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscrption1, subscription2, subscriptionConnect;


// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v);
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerA: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

執(zhí)行結(jié)果:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount()方法僅存在ConnectableObservable中,它返回一個(gè)Observable,而不是另外的ConnectableObservable。

BehaviorSubject

Subjects的一種變形是BehaviorSubject,它有一個(gè)"the current value" 的概念。它存儲(chǔ)了consumer最后一次執(zhí)行時(shí)的value,每當(dāng)一個(gè)Observer訂閱時(shí),它都會(huì)立即從BehaviorSubject接收一個(gè)"current value"。

例子:

var subject = new Rx.BehaviorSubject(0);  //  0 is the inital value

subject.subscribe({
  next: v => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: v = console.log('observerB: ' + v)
});

subject.next(3);

輸出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

功能和它的名字一樣:

var subject = new Rx.ReplaySubject(3);  // buffer 3 values for new subscribers

subject.subscribe({
  next: v => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: v => console.log('observerB: ' + v)
});

subject.next(5);

輸出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

你還可以指定一個(gè)以毫秒為單位的窗口事時(shí)間,除了buffer size之外,決定記錄的值可以重復(fù)(時(shí)間內(nèi))。

var subject = new Rx.ReplaySubject(100, 500);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: v => console.log('observerB: ' + v)
  });
}, 1000)

下面的輸出中,第二個(gè)Observer在最后500ms內(nèi)得到的數(shù)值為3、 4、 5:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject表示只有最后一個(gè)Observable execution的值會(huì)被發(fā)送給observers,僅僅發(fā)生在執(zhí)行完成時(shí)

var subject = new Rx.AsyncSubject();

subject.subscrbe({
  next: v => console.log('onbserverA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

輸出:

observerA: 5
observerB: 5

AsyncSubject類(lèi)似于一個(gè)last() operator,他等待complete通知來(lái)傳遞一個(gè)唯一的值。

Opeartors

RxJS最有用的一部分就是operators,即使Observable是最基礎(chǔ)的。Operators最基本的要點(diǎn)是允許復(fù)雜的代碼變得簡(jiǎn)單化。

什么是Operators?

Opeartors是Obsrevable的方法,就像map(),filter(),merge()等。當(dāng)它被調(diào)用時(shí),它們并不改變已經(jīng)存在的Observable,而是返回一個(gè)基于第一個(gè)Observable上新的Observable。

一個(gè)Operator本質(zhì)上是一個(gè)純函數(shù),它接收一個(gè)Observable,基于其上返回一個(gè)新的Observable。在下面的例子中,我們創(chuàng)建了一個(gè)自定義的operator方法:

function multiplayByTen(input){
  var output = Rx.Observable.create(function subscribe(observer){
    input.subscribe({
      next: v => observer.next(10 * v),
      error: err => observer.error(err),
      complete: () => observer.complete()
    });
  });
return output;
}

var input = Rx.Observable.from([1, 2, 3 ,4]);
var output = multiplayByTen(input);
output.subscribe(x => console.log(x));

輸出為:

10
20
30
40

注意訂閱(subscribe)的輸出將導(dǎo)致輸入的Observable可觀測(cè)的變化。我們稱(chēng)這個(gè)為"operator subscription chain"。

Instance opeartors versus static operators

什么是instance operator?最常見(jiàn)的情況是當(dāng)你引用一個(gè)opeartors時(shí),我們假定實(shí)現(xiàn)了一個(gè)operator,它是Observable實(shí)例的一個(gè)方法。例如,如果multiplayByTen operator變成一個(gè)官方的operator,它看起來(lái)會(huì)是這樣:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen(){
  var input = this;
  return Rx.subscrible.create(function subscribe(observer){
    input.subccribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
}

Instance operators是一個(gè)實(shí)例運(yùn)算符,我們使用它來(lái)推斷可觀測(cè)的輸入。

注意,input observable不再是一個(gè)函數(shù)參數(shù):

var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();
observable.subscribe(x => console.log(x));

什么是static operator?除了instance operators之外,static operators是直接附加在Observable類(lèi)上的方法。一個(gè)static operator使用內(nèi)部的this進(jìn)行操作,但是并不完全依賴(lài)它的參數(shù)。

static operators是附著在Observable類(lèi)上的純函數(shù),通常用于創(chuàng)建Observable

最常見(jiàn)的static operators類(lèi)型是Create Operators,他不是將一個(gè)Observable改變成另外一個(gè)Observable,它們簡(jiǎn)單的獲得一個(gè)non-Observable參數(shù),比如number,然后create一個(gè)新的Observable。

一個(gè)典型的例子是使用interval函數(shù)。它獲取一個(gè)數(shù)值(不是一個(gè)Observable)作為輸入?yún)?shù),然后輸出一個(gè)Observable:

var observable = Rx.Observable.interval(1000 /* number of milliseconds */)

創(chuàng)建一個(gè)creation operaor的另外一個(gè)例子是create,就是我們之前一直在使用的例子。 all static creation operators here

然而,static operators也許和普通的creation性質(zhì)不同。一些Combination Operator也許是靜態(tài)的,比如merge、combineLatest、concat等。將這些作為靜態(tài)是有意義的,因?yàn)樗鼈儼裮ultiple Observales作為輸入,不只是一個(gè),比如:

var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merged = Rx.Observable.merge(observable1, observable2);

Marble diagrams

為了解釋operators是如何工作的,光是文本解釋是不夠的。許多operators和時(shí)間有關(guān),它們可能會(huì)延遲執(zhí)行,例如,throttle等。圖標(biāo)往往能夠比文字更多表達(dá)清楚。Marble Diagrams能夠可視化的表現(xiàn)出operators是如何工作的,包括輸入的Observable(s),operator和它的參數(shù),以及輸出的Observable

在一個(gè)marble diagram中,隨著時(shí)間的流逝,它會(huì)描述值("marbles")在Observable execution上傳遞。

你可以在下面看到marble diagram的解析:

Paste_Image.png
  • 時(shí)間從左往右流動(dòng),代表input Observable的execution
  • 這些代表Observable傳遞傳來(lái)的值
  • 這個(gè)豎線表示"complete" notification,它表明Observable已經(jīng)成功完成了。
  • 這個(gè)方框表示input Observable的operator(上圖)產(chǎn)生出的output Observable(下圖)。方框內(nèi)的文字表示轉(zhuǎn)變的屬性。
  • 這個(gè)Observable是調(diào)用operator產(chǎn)生的
  • 這個(gè)X代表output Observable發(fā)出的錯(cuò)誤,說(shuō)明因?yàn)槟承┰蚨惓=K止。

在這個(gè)網(wǎng)站的站點(diǎn),我們會(huì)廣泛的使用marble diagrams去解釋operators是如何工作的。它們也許在其他的地方也很有用,比如單元測(cè)試等。

選擇一個(gè)operator

你需要為你的程序選擇一個(gè)適當(dāng)?shù)膐perator嗎?先從下面的列表選擇一個(gè):

  • 我已經(jīng)有了一個(gè)Observable
  • 我想改變每個(gè)傳遞的值
    • 讓它成為一個(gè)固定(constant)的值
      • 你應(yīng)該使用mapTo
    • 通過(guò)公式計(jì)算出來(lái)的值
      • 你應(yīng)該使用map
  • 我想選擇每個(gè)傳遞值的屬性
    • 你應(yīng)該使用pluck
  • 我想查看每個(gè)被傳遞的值,但是不影響它們
    • 你應(yīng)該使用do
  • 我想過(guò)濾某些值
    • 基于一個(gè)自定義的邏輯
      • 你應(yīng)該使用filter

更多內(nèi)容參考官網(wǎng):Choose an operator

operators的分類(lèi)

參考官網(wǎng):Categories of operators

Scheduler

什么是Scheduler?當(dāng)一個(gè)subscription開(kāi)始工作或者notifications被傳遞,scheduler就會(huì)開(kāi)始調(diào)圖。它包含三個(gè)組件。

  • 一個(gè)Scheduler是一個(gè)數(shù)據(jù)結(jié)構(gòu)(data structure)。它知道如何基于優(yōu)先級(jí)或者其它標(biāo)準(zhǔn)進(jìn)行存儲(chǔ),執(zhí)行隊(duì)列任務(wù)
  • 一個(gè)Scheduler是一個(gè)執(zhí)行上下文(execution context)。它表示task在哪個(gè)地方,什么時(shí)候執(zhí)行()
  • 一個(gè)Scheduler是一個(gè)(虛擬(virtual))時(shí)鐘。它基于scheduler上的getter方法now(),提出了一個(gè)"時(shí)間(time)"的概念。任務(wù)被安排在一個(gè)特殊的調(diào)度器中,它會(huì)遵守給它的時(shí)間。

看下面例子中,我們使用之前已經(jīng)寫(xiě)過(guò)的例子,同步傳遞數(shù)值1、2、 3,然后使用observerOn操作符來(lái)指定異步調(diào)度:

var observable = Rx.Observable.create(function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
})
.observerOn(Rx.Scheduler.async);

console.log('just before subscribe');
observable.subscribe({
    next: x => console.log('got value ' + x),
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done')
});
console.log('just after subscribe');

輸出:

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

注意got value這個(gè)語(yǔ)句實(shí)在 just after subscribe只有打印輸出的,這和我們看到的代碼順序不一致。這時(shí)因?yàn)?observerOn(Rx.Scheduler.async)在Observable.create和最后一個(gè)Observer之間引入了一個(gè)代理的Observer。讓我們重新為一些標(biāo)識(shí)符取名,以便讓他們之間有著明顯的差別:

var observable = Rx.Observable.create(function (proxyObserver) {
    proxyObserver.next(1);
    proxyObserver.next(2);
    proxyObserver.next(3);
    proxyObserver.complete();
})
    .observeOn(Rx.Scheduler.async);

var finalObserver = {
    next: x => console.log('got value ' + x),
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done')
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

proxyObserver在observeOn(Rx.Scheduler.async)中創(chuàng)建,它的next(val)方法大概像下面這樣:

var proxyObserver = {
  next: (val) => {
    Rx.Scheduler.async.schedule(
      (x) => finalObserver.next(x),
      0 /* delay */,
      val /* will be the x for the function above */
    );
  },

  // ...
}

這有點(diǎn)兒像setTimeout或者setInterval是異步調(diào)度操作,即使給定的delay為0。按照慣例,在JS中,setTimeout(fn, 0)知道運(yùn)行fn函數(shù)的時(shí)機(jī)最早是下一次循環(huán)隊(duì)列初。這也說(shuō)明了為什么 got value 1是最后運(yùn)行的。

可以給Scheduler的schedule傳遞一個(gè)延時(shí)(delay)參數(shù),它可以讓Scheduler內(nèi)部的時(shí)鐘去延時(shí)到指定時(shí)間。Scheduler的時(shí)鐘和真實(shí)的時(shí)鐘沒(méi)有任何關(guān)系。它更類(lèi)似于延時(shí),而不是運(yùn)行指定的時(shí)間。

Scheduler類(lèi)型

異步Scheduler只是RxJS提供的一種Scheduler。通過(guò)使用Scheduler的靜態(tài)方法可以創(chuàng)建下面的類(lèi)型

Scheduler Purpose
null 不使用Scheduler, notifications將會(huì)被同步和遞歸地交付給Observer。使用這個(gè)來(lái)進(jìn)行常量操作或者尾部遞歸操作
Rx.Scheduler.queue Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations.
Rx.Scheduler.asap Schedules on the micro task queue, which uses the fastest transport mechanism available, either Node.js' process.nextTick() or Web Worker MessageChannel or setTimeout or others. Use this for asynchronous conversions.
Rx.Scheduler.async Schedules work with setInterval. Use this for time-based operations.

使用Schedulers

見(jiàn)Using Schedulers

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容