介紹
RxJS是一個(gè)異步編程的庫(kù),同時(shí)它通過(guò)observable序列來(lái)實(shí)現(xiàn)基于事件的編程。它提供了一個(gè)核心的類(lèi)型:Observable,幾個(gè)輔助類(lèi)型(Observer,Schedulers,Subjects),受到Array的擴(kuò)展操作(map,filter,reduce,every等等)啟發(fā),允許直接處理異步事件的集合。
ReactiveX結(jié)合了Observer模式、Iterator模式和函數(shù)式編程和集合來(lái)構(gòu)建一個(gè)管理事件序列的理想方式。
在RxJS中管理異步事件的基本概念如下:
- Observable:代表了一個(gè)調(diào)用未來(lái)值或事件的集合的概念
- Observer:代表了一個(gè)知道如何監(jiān)聽(tīng)Observable傳遞過(guò)來(lái)的值的回調(diào)集合
- Subscription:代表了一個(gè)可執(zhí)行的Observable,主要是用于取消執(zhí)行
- Operators:是一個(gè)純函數(shù),允許處理集合與函數(shù)式編程風(fēng)格的操作,比如map、filter、concat、flatMap等
- Subject:相當(dāng)于一個(gè)EventEmitter,它的唯一的方法是廣播一個(gè)值或事件給多個(gè)Observer
- Schedulers:是一個(gè)集中式調(diào)度程序來(lái)控制并發(fā)性,允許我們?cè)趕etTimeout或者requestAnimationFrame上進(jìn)行協(xié)調(diào)計(jì)算
第一個(gè)例子
正常注冊(cè)一個(gè)事件監(jiān)聽(tīng)函數(shù):
var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));
使用RxJS,你可以創(chuàng)建一個(gè)observable來(lái)代替:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.subscrible(() => console.log('Clicked!'));
純粹
使得RxJS變得如此強(qiáng)大的原因是它使用了純函數(shù),這意味著你的代碼很少會(huì)發(fā)生錯(cuò)誤。
正常你不會(huì)創(chuàng)建一個(gè)純函數(shù),代碼的其他部分可能擾亂你的狀態(tài)。
var count = 0;
var button = document.querySelector('button');
button.addEventListener('click', () => console.log(`Clicked $(++count) times`));
RxJS將隔離你的狀態(tài)
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} items`));
scan操作符類(lèi)似于arrays的reduce操作符。它需要一個(gè)回調(diào)函數(shù)作為一個(gè)參數(shù),函數(shù)返回的值將作為下次調(diào)用時(shí)的參數(shù)。
Flow
RxJS有一系列的操作符來(lái)幫你監(jiān)控事件將如何流動(dòng)。
這是一個(gè)每秒最多點(diǎn)擊一次的程序:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', () => {
if (Date.now() - lastClick >= rate){
console.log(`Clicked ${++count} times`);
lastClick = Date.now();
}
});
使用RxJS:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.scan(count => count + 1, 0)
.subscribe(count => console.log(`Clicked ${count} times`));
另外的控制符還有:filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged等。
值
你可以使用你的observables來(lái)轉(zhuǎn)換值。
這是一個(gè)每次點(diǎn)擊添加x坐標(biāo)的程序:
var count = 0;
var rate = 1000;
var lastClick = Date.now() - rate;
var button = document.querySelector('button');
button.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate){
count += event.clientX;
console.log(count);
lastClick = Date.now();
}
})
使用Rxjs:
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.throttleTime(1000)
.map(event => event.clientX)
.scan((count, clientX) => count + clientX, 0)
.subscribe(count => console.log(count));
另外的producing操作符:pluck、pairwise、sample等
Observable
Observables是一個(gè)延遲Push(關(guān)于Push的概念見(jiàn)后面)操作數(shù)據(jù)的集合。它們遵從下表:
| Single | Multiple | |
|---|---|---|
| Pull | Function | Iterator |
| Push | Promise | Observable |
舉個(gè)例子。下面是一個(gè)Observable,當(dāng)執(zhí)行subscribed,它將會(huì)立即push 1、 2、 3(同步),然后過(guò)去一秒后push 4
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
為了調(diào)用Observable,然后看這些值,我們需要對(duì)這些數(shù)據(jù)進(jìn)行訂閱(subscribe)
var observable = Rx.Observable.create(function (observer){
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
})
});
console.log('just before subscribe');
observerble.subscribe({
next: x => console.log(`got value` + x),
error: err => console.error('somthing wrong occurred: ' +err),
complete: () => console.log('done')
});
console.log('just after subscribe');
執(zhí)行結(jié)果如下:
just before subscribe
got value 1
got value 2
got value 3
just after sbuscribe
got value 4
done
Pull和Push
Pull和Push是關(guān)于數(shù)據(jù)提供者和數(shù)據(jù)消費(fèi)者交互的兩個(gè)不同的協(xié)議。
什么是Pull?在Pull系統(tǒng)中,當(dāng)Consumer收到Producer的數(shù)據(jù)時(shí),它會(huì)自己判斷是否接收該數(shù)據(jù),Producer自己并不知道數(shù)據(jù)將交給哪個(gè)Consumer。
所有的JavaScript函數(shù)都是一個(gè)Pull系統(tǒng)。函數(shù)是一個(gè)數(shù)據(jù)提供者,調(diào)用函數(shù)的代碼是一個(gè)consuming(消費(fèi)者),它將函數(shù)返回的值"pulling"出來(lái)。
ES2015介紹了generator functions and iterators (function*),它們是另外一種Pull系統(tǒng)。iterator.next() 是Consumer,它從iterator(Producer)中"pulling"出多個(gè)值
| Producer | Consumer | |
|---|---|---|
| Pull | 被動(dòng):當(dāng)需要時(shí)產(chǎn)生數(shù)據(jù) | 主動(dòng):決定是否接收數(shù)據(jù) |
| Push | 主動(dòng):自己決定將數(shù)據(jù)傳給誰(shuí) | 被動(dòng):響應(yīng)式接收數(shù)據(jù) |
什么是Push?在Push系統(tǒng)中,Producer決定將數(shù)據(jù)發(fā)往哪個(gè)Consumer。Consumer并不知道它自己的值來(lái)自哪個(gè)Producer
Promise是最常見(jiàn)的一個(gè)Push系統(tǒng)。一個(gè)Promise(Producer)分發(fā)一個(gè)結(jié)果值給注冊(cè)的接口(Consumer),與函數(shù)不同的是,Promise當(dāng)遇到值被"push"給callback時(shí),他會(huì)保證它傳遞的對(duì)象是正確的。
RxJS介紹了Observables,它是一個(gè)新的Push系統(tǒng)。Observable是一個(gè)提供多值的Producer,將它們"pushing"給Observers(Consumer)
- Function:計(jì)算并同步調(diào)用一個(gè)值
- generator:計(jì)算并同步調(diào)用多個(gè)值
- Promise:計(jì)算后可能(不可能)返回一個(gè)值
- Observable:計(jì)算然后同步或異步返回一個(gè)或多個(gè)值
Observable as generalizations of functions
與主流相反,Observable不像EventEmitters,也不像Promise。在某些情況下,Observable的行為可能像EventEmitters,比如當(dāng)使用RxJS的Subjects進(jìn)行多途徑傳播時(shí),但是大部分的情況它們都是不一樣的。
考慮下面的情況:
function foo(){
console.log('Hello');
return 42;
}
var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y)
我們期望出現(xiàn)下面的結(jié)果:
"Hello"
42
"Hello"
42
當(dāng)使用Observables時(shí):
var foo = Rx.Observable.create(function (observer){
console.log('Hello');
observer.next(42);
});
foo.subscribe(function (x){
console.log(x);
});
foo.subscribe(function (y){
console.log(y);
})
它們有著同樣地輸出:
"Hello"
42
"Hello"
42
之所以出現(xiàn)這種情況是因?yàn)閒unction和Observables都是延遲(lazy)計(jì)算的。如果你不調(diào)用function,console.log('Hello')這段代碼是不會(huì)執(zhí)行的。Observables是同樣的,如果你不執(zhí)行(subscribe)它,代碼也不會(huì)執(zhí)行?!癱alling”和"subscribing"都是一個(gè)獨(dú)立的操作:兩個(gè)function分別導(dǎo)致兩個(gè)結(jié)果,兩個(gè)Observale subscribes trigger也會(huì)分別導(dǎo)致兩個(gè)結(jié)果。這與EventEmitters截然相反,EventEmitters會(huì)共享結(jié)果,并且它執(zhí)行的時(shí)候也不會(huì)顧忌到底是否有subscribers存在,Observables不會(huì)是共享結(jié)果,并且也是延遲執(zhí)行。
Subscribing一個(gè)Observable就像調(diào)用一個(gè)函數(shù)一樣
一些人要求Observables是異步的,這是不正確的??聪旅孢@個(gè)例子:
console.log('before');
console.log(foo.call());
console.log('after');
你將會(huì)看到這樣的輸出:
"before"
"Hello"
42
"after"
使用Observables
console.log('before');
foo.subscribe(function(x) {
console.log(x);
});
console.log('after');
輸出是:
"before"
"Hello"
42
"after"
這證明了foo的訂閱是一個(gè)完完全全的異步,就像一個(gè)函數(shù)一樣。
Observables可以同步或異步地傳遞一個(gè)值
Observable和function的不同是什么呢?隨之時(shí)間的流逝,Observables可以“返回”多個(gè)值,函數(shù)是不可以的。你不可以這么做:
function foo(){
console.log('Hello');
return 42;
return 100; // 不會(huì)執(zhí)行到這兒
}
函數(shù)只能返回一次,Observables可以做到返回多次:
var foo = Rx.Observable.create(function (observer){
console.log('Hello');
observer.next(42);
observer.next(100); // "return another value"
observer.next(200); // "return" yet another
});
console.log('before');
foo.subscribe(function (x){
console.log(x);
});
console.log('after');
同步輸出:
"before"
"Hello"
42
100
200
"after"
你也可以異步返回:
var foo = Rx.Observable.create(function (observer){
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // 異步
}, 1000);
});
console.log('before');
foo.subscribe(function(x){
console.log(x);
});
console.log('after');
輸出:
"before"
"Hello"
42
100
200
"after"
300
結(jié)論:
- func.call()表示“同步給我一個(gè)數(shù)據(jù)”
- observable.subscribe()表示“給我任何數(shù)量的值,同步或者異步”
解析一個(gè)Observable
Observables使用Rx.Observable.create或者一個(gè)構(gòu)造器創(chuàng)建(create),使用Observer來(lái)監(jiān)聽(tīng)(subscribed),執(zhí)行(execute)是通過(guò)投遞一個(gè)next/error/complete來(lái)通知其他的Observer,然后按照各自的意愿(disposed)來(lái)執(zhí)行。在一個(gè)Observable實(shí)例中,這四個(gè)方面都是通過(guò)編碼實(shí)現(xiàn)的,但是這些可能與其他的類(lèi)型相關(guān),比如Obsrever和Subscription。
Observable的核心點(diǎn):
- Creating Observables
- Subscribing to Observables
- Executing the Observable
- Disposing Observables
創(chuàng)建一個(gè)Observables
Rx.Observable.create是Observable構(gòu)造器的一個(gè)別名,他需要一個(gè)參數(shù):一個(gè)subscribe函數(shù)
下面的例子創(chuàng)建一個(gè)Observable,它的作用是每秒鐘輸出字符串'hi':
var observable = Rx.Observable.create(function subscrite(observer){
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});
Observables可以使用create創(chuàng)建,但是我們經(jīng)常使用creation operators,諸如from,interval等。
在上面的例子中,subscribe函數(shù)是描述Observable最重要的一部分,讓我們來(lái)看看subscribing是什么意思。
subscribing to Observables
Observable的observable可以被訂閱(subscribed),就像這樣:
observable.subscribe(x => console.log(x));
observable.scribe和Observable.create(function subscribe(observer) {...})中的subscribe有著相同的名字并不是巧合。在庫(kù)中,它們是不同的,但是在實(shí)際的用途中你可以在邏輯上把他們想成相同的。
同樣的Observable被多個(gè)Observers監(jiān)聽(tīng)時(shí),它們是不共享的。
Subscribing一個(gè)Observable像調(diào)用一個(gè)函數(shù)一樣,當(dāng)一個(gè)數(shù)據(jù)被傳遞時(shí)提供一個(gè)回調(diào)
這個(gè)addEventListener/removeEventListener這樣的API完全不一樣。observable.subscribe作為一個(gè)給定的觀察者,在Observable中并沒(méi)有像listener一樣被注冊(cè)。Observable甚至不需要維護(hù)一系列的Observers。
Executing observables
代碼Observable.create(function subscribe(observer) {...})代表了一個(gè)"Observable execution",它將僅僅在每個(gè)Observer的subscribes的延遲計(jì)算中。隨著時(shí)間的推移,將產(chǎn)生多個(gè)結(jié)果,同步或者異步。
Observable可以傳遞的有三種類(lèi)型:
- "Next" notification:傳遞一個(gè)數(shù)值,諸如Number、String、Object等
- “Error” notification:傳遞一個(gè)js異常
- "Complete" notification:什么值都不傳遞
Next notifications是最重要的也是最常見(jiàn)的類(lèi)型:它們表示一個(gè)實(shí)際數(shù)據(jù)被送到Observer。在Observable Execute執(zhí)行期間Error和Complete最多會(huì)發(fā)生一次。
下面的語(yǔ)法是在Observable Grammar or Contract中最好的表達(dá):
next*(error|complete)?
在一個(gè)Observable Execute中,0或多個(gè)Next notifications可能被傳遞。如果有error或者Complete被傳遞,剩下的next將不會(huì)被傳遞。
下面是Observable execute傳遞3個(gè)Next notifications的例子:
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
下面的例子中,Next notification 4不會(huì)被傳遞:
var observable = Rx.Observable.create(function subscribe(observer){
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); // 不會(huì)被執(zhí)行
})
用tru/catch代碼快包裹起來(lái)是個(gè)好主意:
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // delivers an error if it caught one
}
});
處理(Disposing)Observable Executions
Observable Executing的個(gè)數(shù)可能是無(wú)限個(gè),Observer中應(yīng)該處理有限個(gè)next,所以我們需要一個(gè)API來(lái)停止execution。因?yàn)閑xecution在每個(gè)Observer中都是獨(dú)立的,一旦Observer完成接收值,它必須有一個(gè)方法來(lái)停止executing。
當(dāng) observable.subscribe 被調(diào)用,Observer將被附加到一個(gè)新創(chuàng)建的Observable execution中,這次調(diào)用將返回一個(gè)對(duì)象,即Subscription:
var subscription = observable.subscribe(x => console.log(x));
Subscription代表了一個(gè)進(jìn)行中的executing,它有一個(gè)最小的API允許你取消execution??梢栽谶@里閱讀更多有關(guān)于 Subscription type here 的東西。使用 subscription.unsubscribe() 你可以取消正在進(jìn)行的execution:
var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();
當(dāng)我們使用create()創(chuàng)建一個(gè)Observable時(shí),我們必須定義execution怎么處理資源。你可以通過(guò)返回一個(gè)自定義的 unsubscribe 函數(shù)來(lái)實(shí)現(xiàn)該步驟。
var observable = Rx.Observable.create(function subscribe(observer){
var intervalID = setInterval(() => {
observer.next('hi')
});
return function unsubscribe(){
clearInterval(intervalID);
}
})
然后這樣來(lái)調(diào)用:
function subscribe(observer) {
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalID);
};
}
var unsubscribe = subscribe({next: (x) => console.log(x)});
// Later:
unsubscribe(); // dispose the resources
Observer
什么是Observer?一個(gè)Observer是Observable傳遞過(guò)來(lái)的數(shù)據(jù)的customer。Observers是一個(gè)簡(jiǎn)單的一些列的回調(diào),next、error、和 complete用來(lái)傳遞數(shù)據(jù)。下面的例子展現(xiàn)了一個(gè)典型的Observer對(duì)象:
var observer = {
next: x => console.log('Observable got a next value: ' + x),
error: err => console.log('Observable got and error: ' + err),
complete: () => console.log('Observable got a complete notification')
};
為了使用Observalbe,提供了一個(gè)subscribe:
observable.subscribe(observer)
你也可以提供部分回調(diào):
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
};
當(dāng)你訂閱(subscribing)一個(gè)Observable時(shí),你也許僅僅只提供一個(gè)函數(shù)作為參數(shù):
observable.subscribe(x => console.log('Observer got a next value: ' + x));
在observable.subscribe的內(nèi)部,他將使用第一個(gè)回調(diào)創(chuàng)建一個(gè)Observer對(duì)象作為一個(gè)next handler。所有的callback類(lèi)型都可能被提供:
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
Subscription
什么是Subscription?一個(gè)Subscription代表了一個(gè)一次性的資源,通常表示的是一個(gè)Observable execution。一個(gè)Subscription有一個(gè)重要的方法,unsubscribe,它不需要參數(shù),僅僅是處理subscription的資源。在之前的RxJS版本中,Subscription被稱(chēng)作"Disposable"。
var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();
一個(gè)Subscription實(shí)質(zhì)上是一個(gè)unsubscribe()函數(shù),用來(lái)釋放資源或者取消一個(gè)Observable executions。
Subscriptions也可以放在一起,這樣會(huì)導(dǎo)致使用一個(gè)unsubscribe()將取消多個(gè)Observable executions。你可以這么做:
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);
當(dāng)執(zhí)行時(shí),我們將看到如下輸出:
second: 0
first: 0
second: 1
first: 1
second: 2
Subscriptions有一個(gè)remove(otherSubscription)方法,用來(lái)移除關(guān)聯(lián)的Subscirption
Subject
什么是Subject?一個(gè)RxJS Subject是一個(gè)特殊類(lèi)型的Observable,它允許值可以多路廣播給多個(gè)Observers。普通的Observables是單路廣播(每個(gè)subscribed Observer擁有自己獨(dú)立的Observable execution),Subjects是多路廣播。
一個(gè)Subject像一個(gè)Observable,但是可以多路廣播給Observers。Subjects像Eventmitters:它們維持許多注冊(cè)過(guò)的監(jiān)聽(tīng)器。
每個(gè)subject是一個(gè)Observable。給定一個(gè)Subject,你可以通過(guò)提供一個(gè)Observer來(lái)訂閱(subscribe)它,然后開(kāi)始正常的接收值。從Observer的角度來(lái)看,他不能告知Observer的Observable execution到底是來(lái)自一個(gè)不同的單路傳播的Observable,還是來(lái)自Subject。
在Subject的內(nèi)部,subscribe并沒(méi)有調(diào)用一個(gè)新的execute去傳遞數(shù)據(jù)。它只是簡(jiǎn)單的注冊(cè)O(shè)bservers列表中的一個(gè)Observer,類(lèi)似于addListener的使用。
每個(gè)subject是一個(gè)Observer。他是擁有next(v),error(e)和complete()方法的對(duì)象。為了給Subject一個(gè)新值,只需要調(diào)用 next(theValue),他講多路傳播給注冊(cè)過(guò)的Observer。
在下面的例子中,我們?cè)赟ubject中注冊(cè)了兩個(gè)Observers,我們傳遞一些值給Subject:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
輸出:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
因?yàn)镾ubject同時(shí)也是一個(gè)Observer,這意味著你應(yīng)該提供一個(gè)Subject作為Observable的subscribe的參數(shù),像這樣:
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // You can subscribe providing a Subject
執(zhí)行如下:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
在上面的處理中,我們本質(zhì)上僅僅是通過(guò)Subject將一個(gè)單路廣播的Observable execution變?yōu)槎嗦窂V播的。這個(gè)演示展示了Subjects是怎樣將單路廣播變?yōu)槎嗦窂V播的。
這里有幾個(gè)特殊的Subject類(lèi)型:BehaviorSubject、ReplaySubject和AsyncSubject。
Multicasted Observables
一個(gè)"multicasted Observable"的實(shí)現(xiàn)是通過(guò)Subject的多個(gè)訂閱(subscribers)來(lái)實(shí)現(xiàn)的,然而一個(gè)"unicast Observable"僅僅只通知一個(gè)單一的Observer。
在后臺(tái),multicast是這樣操作的:Observers訂閱(subscribe)一個(gè)相關(guān)的Subject,Subject訂閱一個(gè)Observable源。
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log('observerAa: ' + v)
});
muticasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// This is, under the hood, `source.subscribe(subject)`:
muticasted.connect();
Reference counting
手動(dòng)的調(diào)用connect()來(lái)處理Subscription是很麻煩的。通常,我們希望當(dāng)?shù)谝粋€(gè)Observer到達(dá)時(shí),能夠自動(dòng)connect,當(dāng)最后一個(gè)Observer被移除時(shí),自動(dòng)取消shared execution。
看看下面這些訂閱發(fā)生時(shí)的列表:
- 第一個(gè)Observer訂閱multicasted Observable
- multicasted observable連接
- next value 0被傳遞給第一個(gè)Observer
- 第二個(gè)Observer訂閱multicasted Observable
- next value 1被傳遞給第一個(gè)Observer
- next value 1被傳遞給第二個(gè)Observer
- 第一個(gè)Observer解除監(jiān)聽(tīng)
- next value2被傳遞給第二個(gè)Observer
- 第二個(gè)Observer解除監(jiān)聽(tīng)
- 與multicasted observable連接的Observable解除連接
看下面的代碼:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: v => console.log('observerB: ' + v)
});
}, 600);
setTimeout(() => {
subscrption1.unscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);
如果我們希望避免一遍遍地調(diào)用connect(),我們可以使用ConnectableObservable的refCount()方法(reference counting),它返回一個(gè)Observable來(lái)跟蹤有多少個(gè)訂閱者(subscribers)。當(dāng)訂閱者從0增加到1時(shí),它將自動(dòng)調(diào)用connect(),只有當(dāng)訂閱者從1變?yōu)?時(shí),它才會(huì)disconnect。
看下面的例子:
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscrption1, subscription2, subscriptionConnect;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v);
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
執(zhí)行結(jié)果:
observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
refCount()方法僅存在ConnectableObservable中,它返回一個(gè)Observable,而不是另外的ConnectableObservable。
BehaviorSubject
Subjects的一種變形是BehaviorSubject,它有一個(gè)"the current value" 的概念。它存儲(chǔ)了consumer最后一次執(zhí)行時(shí)的value,每當(dāng)一個(gè)Observer訂閱時(shí),它都會(huì)立即從BehaviorSubject接收一個(gè)"current value"。
例子:
var subject = new Rx.BehaviorSubject(0); // 0 is the inital value
subject.subscribe({
next: v => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: v = console.log('observerB: ' + v)
});
subject.next(3);
輸出:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
ReplaySubject
功能和它的名字一樣:
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
next: v => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: v => console.log('observerB: ' + v)
});
subject.next(5);
輸出:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
你還可以指定一個(gè)以毫秒為單位的窗口事時(shí)間,除了buffer size之外,決定記錄的值可以重復(fù)(時(shí)間內(nèi))。
var subject = new Rx.ReplaySubject(100, 500);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
var i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: v => console.log('observerB: ' + v)
});
}, 1000)
下面的輸出中,第二個(gè)Observer在最后500ms內(nèi)得到的數(shù)值為3、 4、 5:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
AsyncSubject
AsyncSubject表示只有最后一個(gè)Observable execution的值會(huì)被發(fā)送給observers,僅僅發(fā)生在執(zhí)行完成時(shí)
var subject = new Rx.AsyncSubject();
subject.subscrbe({
next: v => console.log('onbserverA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
輸出:
observerA: 5
observerB: 5
AsyncSubject類(lèi)似于一個(gè)last() operator,他等待complete通知來(lái)傳遞一個(gè)唯一的值。
Opeartors
RxJS最有用的一部分就是operators,即使Observable是最基礎(chǔ)的。Operators最基本的要點(diǎn)是允許復(fù)雜的代碼變得簡(jiǎn)單化。
什么是Operators?
Opeartors是Obsrevable的方法,就像map(),filter(),merge()等。當(dāng)它被調(diào)用時(shí),它們并不改變已經(jīng)存在的Observable,而是返回一個(gè)基于第一個(gè)Observable上新的Observable。
一個(gè)Operator本質(zhì)上是一個(gè)純函數(shù),它接收一個(gè)Observable,基于其上返回一個(gè)新的Observable。在下面的例子中,我們創(chuàng)建了一個(gè)自定義的operator方法:
function multiplayByTen(input){
var output = Rx.Observable.create(function subscribe(observer){
input.subscribe({
next: v => observer.next(10 * v),
error: err => observer.error(err),
complete: () => observer.complete()
});
});
return output;
}
var input = Rx.Observable.from([1, 2, 3 ,4]);
var output = multiplayByTen(input);
output.subscribe(x => console.log(x));
輸出為:
10
20
30
40
注意訂閱(subscribe)的輸出將導(dǎo)致輸入的Observable可觀測(cè)的變化。我們稱(chēng)這個(gè)為"operator subscription chain"。
Instance opeartors versus static operators
什么是instance operator?最常見(jiàn)的情況是當(dāng)你引用一個(gè)opeartors時(shí),我們假定實(shí)現(xiàn)了一個(gè)operator,它是Observable實(shí)例的一個(gè)方法。例如,如果multiplayByTen operator變成一個(gè)官方的operator,它看起來(lái)會(huì)是這樣:
Rx.Observable.prototype.multiplyByTen = function multiplyByTen(){
var input = this;
return Rx.subscrible.create(function subscribe(observer){
input.subccribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
}
Instance operators是一個(gè)實(shí)例運(yùn)算符,我們使用它來(lái)推斷可觀測(cè)的輸入。
注意,input observable不再是一個(gè)函數(shù)參數(shù):
var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();
observable.subscribe(x => console.log(x));
什么是static operator?除了instance operators之外,static operators是直接附加在Observable類(lèi)上的方法。一個(gè)static operator使用內(nèi)部的this進(jìn)行操作,但是并不完全依賴(lài)它的參數(shù)。
static operators是附著在Observable類(lèi)上的純函數(shù),通常用于創(chuàng)建Observable
最常見(jiàn)的static operators類(lèi)型是Create Operators,他不是將一個(gè)Observable改變成另外一個(gè)Observable,它們簡(jiǎn)單的獲得一個(gè)non-Observable參數(shù),比如number,然后create一個(gè)新的Observable。
一個(gè)典型的例子是使用interval函數(shù)。它獲取一個(gè)數(shù)值(不是一個(gè)Observable)作為輸入?yún)?shù),然后輸出一個(gè)Observable:
var observable = Rx.Observable.interval(1000 /* number of milliseconds */)
創(chuàng)建一個(gè)creation operaor的另外一個(gè)例子是create,就是我們之前一直在使用的例子。 all static creation operators here
然而,static operators也許和普通的creation性質(zhì)不同。一些Combination Operator也許是靜態(tài)的,比如merge、combineLatest、concat等。將這些作為靜態(tài)是有意義的,因?yàn)樗鼈儼裮ultiple Observales作為輸入,不只是一個(gè),比如:
var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);
var merged = Rx.Observable.merge(observable1, observable2);
Marble diagrams
為了解釋operators是如何工作的,光是文本解釋是不夠的。許多operators和時(shí)間有關(guān),它們可能會(huì)延遲執(zhí)行,例如,throttle等。圖標(biāo)往往能夠比文字更多表達(dá)清楚。Marble Diagrams能夠可視化的表現(xiàn)出operators是如何工作的,包括輸入的Observable(s),operator和它的參數(shù),以及輸出的Observable
在一個(gè)marble diagram中,隨著時(shí)間的流逝,它會(huì)描述值("marbles")在Observable execution上傳遞。
你可以在下面看到marble diagram的解析:

- 時(shí)間從左往右流動(dòng),代表input Observable的execution
- 這些代表Observable傳遞傳來(lái)的值
- 這個(gè)豎線表示"complete" notification,它表明Observable已經(jīng)成功完成了。
- 這個(gè)方框表示input Observable的operator(上圖)產(chǎn)生出的output Observable(下圖)。方框內(nèi)的文字表示轉(zhuǎn)變的屬性。
- 這個(gè)Observable是調(diào)用operator產(chǎn)生的
- 這個(gè)X代表output Observable發(fā)出的錯(cuò)誤,說(shuō)明因?yàn)槟承┰蚨惓=K止。
在這個(gè)網(wǎng)站的站點(diǎn),我們會(huì)廣泛的使用marble diagrams去解釋operators是如何工作的。它們也許在其他的地方也很有用,比如單元測(cè)試等。
選擇一個(gè)operator
你需要為你的程序選擇一個(gè)適當(dāng)?shù)膐perator嗎?先從下面的列表選擇一個(gè):
- 我已經(jīng)有了一個(gè)Observable
- 我想改變每個(gè)傳遞的值
- 我想選擇每個(gè)傳遞值的屬性
- 你應(yīng)該使用pluck
- 我想查看每個(gè)被傳遞的值,但是不影響它們
- 你應(yīng)該使用do
- 我想過(guò)濾某些值
- 基于一個(gè)自定義的邏輯
- 你應(yīng)該使用filter
- 基于一個(gè)自定義的邏輯
更多內(nèi)容參考官網(wǎng):Choose an operator
operators的分類(lèi)
參考官網(wǎng):Categories of operators
Scheduler
什么是Scheduler?當(dāng)一個(gè)subscription開(kāi)始工作或者notifications被傳遞,scheduler就會(huì)開(kāi)始調(diào)圖。它包含三個(gè)組件。
- 一個(gè)Scheduler是一個(gè)數(shù)據(jù)結(jié)構(gòu)(data structure)。它知道如何基于優(yōu)先級(jí)或者其它標(biāo)準(zhǔn)進(jìn)行存儲(chǔ),執(zhí)行隊(duì)列任務(wù)
- 一個(gè)Scheduler是一個(gè)執(zhí)行上下文(execution context)。它表示task在哪個(gè)地方,什么時(shí)候執(zhí)行()
- 一個(gè)Scheduler是一個(gè)(虛擬(virtual))時(shí)鐘。它基于scheduler上的getter方法now(),提出了一個(gè)"時(shí)間(time)"的概念。任務(wù)被安排在一個(gè)特殊的調(diào)度器中,它會(huì)遵守給它的時(shí)間。
看下面例子中,我們使用之前已經(jīng)寫(xiě)過(guò)的例子,同步傳遞數(shù)值1、2、 3,然后使用observerOn操作符來(lái)指定異步調(diào)度:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
.observerOn(Rx.Scheduler.async);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
});
console.log('just after subscribe');
輸出:
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
注意got value這個(gè)語(yǔ)句實(shí)在 just after subscribe只有打印輸出的,這和我們看到的代碼順序不一致。這時(shí)因?yàn)?observerOn(Rx.Scheduler.async)在Observable.create和最后一個(gè)Observer之間引入了一個(gè)代理的Observer。讓我們重新為一些標(biāo)識(shí)符取名,以便讓他們之間有著明顯的差別:
var observable = Rx.Observable.create(function (proxyObserver) {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
})
.observeOn(Rx.Scheduler.async);
var finalObserver = {
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done')
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
proxyObserver在observeOn(Rx.Scheduler.async)中創(chuàng)建,它的next(val)方法大概像下面這樣:
var proxyObserver = {
next: (val) => {
Rx.Scheduler.async.schedule(
(x) => finalObserver.next(x),
0 /* delay */,
val /* will be the x for the function above */
);
},
// ...
}
這有點(diǎn)兒像setTimeout或者setInterval是異步調(diào)度操作,即使給定的delay為0。按照慣例,在JS中,setTimeout(fn, 0)知道運(yùn)行fn函數(shù)的時(shí)機(jī)最早是下一次循環(huán)隊(duì)列初。這也說(shuō)明了為什么 got value 1是最后運(yùn)行的。
可以給Scheduler的schedule傳遞一個(gè)延時(shí)(delay)參數(shù),它可以讓Scheduler內(nèi)部的時(shí)鐘去延時(shí)到指定時(shí)間。Scheduler的時(shí)鐘和真實(shí)的時(shí)鐘沒(méi)有任何關(guān)系。它更類(lèi)似于延時(shí),而不是運(yùn)行指定的時(shí)間。
Scheduler類(lèi)型
異步Scheduler只是RxJS提供的一種Scheduler。通過(guò)使用Scheduler的靜態(tài)方法可以創(chuàng)建下面的類(lèi)型
| Scheduler | Purpose |
|---|---|
| null | 不使用Scheduler, notifications將會(huì)被同步和遞歸地交付給Observer。使用這個(gè)來(lái)進(jìn)行常量操作或者尾部遞歸操作 |
| Rx.Scheduler.queue | Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations. |
| Rx.Scheduler.asap | Schedules on the micro task queue, which uses the fastest transport mechanism available, either Node.js' process.nextTick() or Web Worker MessageChannel or setTimeout or others. Use this for asynchronous conversions. |
| Rx.Scheduler.async | Schedules work with setInterval. Use this for time-based operations. |
使用Schedulers
見(jiàn)Using Schedulers