概念
RxJS 是 Reactive Extensions for JavaScript 的縮寫,起源于 Reactive Extensions,是一個(gè)基于可觀測數(shù)據(jù)流 Stream 結(jié)合觀察者模式和迭代器模式的一種異步編程的應(yīng)用庫。RxJS 是 Reactive Extensions 在 JavaScript 上的實(shí)現(xiàn)。
注意!它跟React沒啥關(guān)系,筆者最初眼花把它看成了React.js的縮寫(恥辱?。。。。?/p>
對于陌生的技術(shù)而言,我們一般的思路莫過于,打開百度(google),搜索,然后查看官方文檔,或者從零散的博客當(dāng)中,去找尋能夠理解這項(xiàng)技術(shù)的信息。但在很多時(shí)候,僅從一些只言片語中,的確也很難真正了解到一門技術(shù)的來龍去脈。
本文將從學(xué)習(xí)的角度來解析這項(xiàng)技術(shù)具備的價(jià)值以及能給我們現(xiàn)有項(xiàng)目中帶來的好處。
背景
從開發(fā)者角度來看,對于任何一項(xiàng)技術(shù)而言,我們經(jīng)常會去談?wù)摰模^于以下幾點(diǎn):
- 應(yīng)用場景?
- 如何落地?
- 上手難易程度如何?
- 為什么需要它?它解決了什么問題?
針對以上問題,我們可以由淺入深的來刨析一下RxJS的相關(guān)理念。
應(yīng)用場景?
假設(shè)我們有這樣一個(gè)需求:
我們上傳一個(gè)大文件之后,需要實(shí)時(shí)監(jiān)聽他的進(jìn)度,并且待進(jìn)度進(jìn)行到 100 的時(shí)候停止監(jiān)聽。
對于一般的做法我們可以采用短輪詢的方式來實(shí)現(xiàn),在對于異步請求的封裝的時(shí)候,如果我們采用Promise的方式,那么我們一般的做法就可以采用編寫一個(gè)用于輪詢的方法,獲取返回值進(jìn)行處理,如果進(jìn)度沒有完成則延遲一定時(shí)間再次調(diào)用該方法,同時(shí)在出現(xiàn)錯(cuò)誤的時(shí)候需要捕獲錯(cuò)誤并處理。
顯然,這樣的處理方式無疑在一定程度上給開發(fā)者帶來了一定開發(fā)和維護(hù)成本,因?yàn)檫@個(gè)過程更像是我們在觀察一個(gè)事件,這個(gè)事件會多次觸發(fā)并讓我感知到,不僅如此還要具備取消訂閱的能力,Promise在處理這種事情時(shí)的方式其實(shí)并不友好,而RxJS對于異步數(shù)據(jù)流的管理就更加符合這種范式。
引用尤大的話:
我個(gè)人傾向于在適合 Rx 的地方用 Rx,但是不強(qiáng)求 Rx for everything。比較合適的例子就是比如多個(gè)服務(wù)端實(shí)時(shí)消息流,通過 Rx 進(jìn)行高階處理,最后到 view 層就是很清晰的一個(gè) Observable,但是 view 層本身處理用戶事件依然可以沿用現(xiàn)有的范式。
如何落地?
針對現(xiàn)有項(xiàng)目來說,如何與實(shí)際結(jié)合并保證原有項(xiàng)目的穩(wěn)定性也的確是我們應(yīng)該優(yōu)先考慮的問題,畢竟任何一項(xiàng)技術(shù)如果無法落地實(shí)踐,那么必然給我們帶來的收益是比較有限的。
這里如果你是一名使用Angular的開發(fā)者,或許你應(yīng)該知道Angular中深度集成了Rxjs,只要你使用Angular框架,你就不可避免的會接觸到 RxJs 相關(guān)的知識。
在一些需要對事件進(jìn)行更為精確控制的場景下,比如我們想要監(jiān)聽點(diǎn)擊事件 (click event),但點(diǎn)擊三次之后不再監(jiān)聽。
那么這個(gè)時(shí)候引入RxJS進(jìn)行功能開發(fā)是十分便利而有效的,讓我們能省去對事件的監(jiān)聽并且記錄點(diǎn)擊的狀態(tài),以及需要處理取消監(jiān)聽的一些邏輯上的心理負(fù)擔(dān)。
你也可以選擇為你的大型項(xiàng)目引入RxJS進(jìn)行數(shù)據(jù)流的統(tǒng)一管理規(guī)范,當(dāng)然也不要給本不適合RxJS理念的場景強(qiáng)加使用,這樣實(shí)際帶來的效果可能并不明顯。
上手難易程度如何?
如果你是一名具備一定開發(fā)經(jīng)驗(yàn)的JavaScript開發(fā)者,那么幾分鐘或許你就能將RxJS應(yīng)用到一些簡單的實(shí)踐中了。
為什么需要它?它解決了什么問題?
如果你是一名使用JavaScript的開發(fā)者,在面對眾多的事件處理,以及復(fù)雜的數(shù)據(jù)解析轉(zhuǎn)化時(shí),是否常常容易寫出十分低效的代碼或者是臃腫的判斷以及大量臟邏輯語句?
不僅如此,在JavaScript的世界里,就眾多處理異步事件的場景中來看,“麻煩” 兩個(gè)字似乎經(jīng)常容易被提起,我們可以先從JS的異步事件的處理方式發(fā)展史中來細(xì)細(xì)品味RxJS帶來的價(jià)值。
回調(diào)函數(shù)時(shí)代(callback)
使用場景:
- 事件回調(diào)
-
Ajax請求 Node API-
setTimeout、setInterval等異步事件回調(diào)
在上述場景中,我們最開始的處理方式就是在函數(shù)調(diào)用時(shí)傳入一個(gè)回調(diào)函數(shù),在同步或者異步事件完成之后,執(zhí)行該回調(diào)函數(shù)??梢哉f在大部分簡單場景下,采用回調(diào)函數(shù)的寫法無疑是很方便的,比如我們熟知的幾個(gè)高階函數(shù):
forEachmapfilter
[1, 2, 3].forEach(function (item, index) {
console.log(item, index);
})
他們的使用方式只需要我們傳入一個(gè)回調(diào)函數(shù)即可完成對一組數(shù)據(jù)的批量處理,很方便也很清晰明了。
但在一些復(fù)雜業(yè)務(wù)的處理中,我們?nèi)绻匀槐植粧仐壊环艞壍南敕B強(qiáng)的使用回調(diào)函數(shù)的方式就可能會出現(xiàn)下面的情況:
fs.readFile('a.txt', 'utf-8', function(err, data) {
fs.readFile('b.txt', 'utf-8', function(err, data1) {
fs.readFile('c.txt', 'utf-8', function(err, data2) {
})
})
})
當(dāng)然作為編寫者來說,你可能覺得說這個(gè)很清晰啊,沒啥不好的。但是如果再復(fù)雜點(diǎn)呢,如果調(diào)用的函數(shù)都不一樣呢,如果每一個(gè)回調(diào)里面的內(nèi)容都十分復(fù)雜呢。短期內(nèi)自己可能清楚為什么這么寫,目的是什么,但是過了一個(gè)月、三個(gè)月、一年后,你確定在眾多業(yè)務(wù)代碼中你還能找回當(dāng)初的本心嗎?
你會不會迫不及待的查找提交記錄,這是哪個(gè)憨批寫的,跟shit......,臥槽怎么是我寫的。
這時(shí)候,面對眾多開發(fā)者苦不堪言的回調(diào)地域,終于還是有人出來造福人類了......
Promise 時(shí)代
Promise最初是由社區(qū)提出(畢竟作為每天與奇奇怪怪的業(yè)務(wù)代碼打交道的我們來說,一直用回調(diào)頂不住了啊),后來官方正式在ES6中將其加入語言標(biāo)準(zhǔn),并進(jìn)行了統(tǒng)一規(guī)范,讓我們能夠原生就能new一個(gè)Promise。
就優(yōu)勢而言,Promise帶來了與回調(diào)函數(shù)不一樣的編碼方式,它采用鏈?zhǔn)秸{(diào)用,將數(shù)據(jù)一層一層往后拋,并且能夠進(jìn)行統(tǒng)一的異常捕獲,不像使用回調(diào)函數(shù)就直接炸了,還得在眾多的代碼中一個(gè)個(gè)try catch。
話不多說,看碼!
function readData(filePath) {
return new Promise((resolve, reject) => {
fs.readFile(filePath, 'utf-8', (err, data) => {
if (err) reject(err);
resolve(data);
})
});
}
readData('a.txt').then(res => {
return readData('b.txt');
}).then(res => {
return readData('c.txt');
}).then(res => {
return readData('d.txt');
}).catch(err => {
console.log(err);
})
對比一下,這種寫法會不會就更加符合我們正常的思維邏輯了,這種順序下,讓人看上去十分舒暢,也更利于代碼的維護(hù)。
優(yōu)點(diǎn):
狀態(tài)改變就不會再變,任何時(shí)候都能得到相同的結(jié)果
將異步事件的處理流程化,寫法更方便
缺點(diǎn):
無法取消
錯(cuò)誤無法被
try catch(但是可以使用.catch方式)當(dāng)處于
pending狀態(tài)時(shí)無法得知現(xiàn)在處在什么階段
雖然Promise的出現(xiàn)在一定程度上提高了我們處理異步事件的效率,但是在需要與一些同步事件的進(jìn)行混合處理時(shí)往往我們還需要面臨一些并不太友好的代碼遷移,我們需要把原本放置在外層的代碼移到Promise的內(nèi)部才能保證某異步事件完成之后再進(jìn)行繼續(xù)執(zhí)行。
Generator 函數(shù)
ES6 新引入了 Generator 函數(shù),可以通過 yield 關(guān)鍵字,把函數(shù)的執(zhí)行流掛起,為改變執(zhí)行流程提供了可能,從而為異步編程提供解決方案。形式上也是一個(gè)普通函數(shù),但有幾個(gè)顯著的特征:
function關(guān)鍵字與函數(shù)名之間有一個(gè)星號 "*" (推薦緊挨著function關(guān)鍵字)函數(shù)體內(nèi)使用
yield· 表達(dá)式,定義不同的內(nèi)部狀態(tài) (可以有多個(gè)yield`)直接調(diào)用
Generator函數(shù)并不會執(zhí)行,也不會返回運(yùn)行結(jié)果,而是返回一個(gè)遍歷器對象(Iterator Object)依次調(diào)用遍歷器對象的
next方法,遍歷Generator函數(shù)內(nèi)部的每一個(gè)狀態(tài)
function* read(){
let a= yield '666';
console.log(a);
let b = yield 'ass';
console.log(b);
return 2
}
let it = read();
console.log(it.next());
console.log(it.next());
console.log(it.next());
console.log(it.next());
這種模式的寫法我們可以自由的控制函數(shù)的執(zhí)行機(jī)制,在需要的時(shí)候再讓函數(shù)執(zhí)行,但是對于日常項(xiàng)目中來說,這種寫法也是不夠友好的,無法給與使用者最直觀的感受。
async / await
相信在經(jīng)過許多面試題的洗禮后,大家或多或少應(yīng)該也知道這玩意其實(shí)就是一個(gè)語法糖,內(nèi)部就是把Generator函數(shù)與自動(dòng)執(zhí)行器co進(jìn)行了結(jié)合,讓我們能以同步的方式編寫異步代碼,十分暢快。
有一說一,這玩意著實(shí)好用,要不是要考慮兼容性,真就想大面積使用這種方式。
再來看看用它編寫的代碼有多快樂:
async readFileData() {
const data = await Promise.all([
'異步事件一',
'異步事件二',
'異步事件三'
]);
console.log(data);
}
直接把它當(dāng)作同步方式來寫,完全不要考慮把一堆代碼復(fù)制粘貼的一個(gè)其他異步函數(shù)內(nèi)部,屬實(shí)簡潔明了。
RxJS
它在使用方式上,跟Promise有點(diǎn)像,但在能力上比Promise強(qiáng)大多了,不僅僅能夠以流的形式對數(shù)據(jù)進(jìn)行控制,還內(nèi)置許許多多的內(nèi)置工具方法讓我們能十分方便的處理各種數(shù)據(jù)層面的操作,讓我們的代碼如絲一般順滑。
優(yōu)勢:
- 代碼量的大幅度減少
- 代碼可讀性的提高
- 很好的處理異步
- 事件管理、調(diào)度引擎
- 十分豐富的操作符
- 聲明式的編程風(fēng)格
function readData(filePath) {
return new Observable((observer) => {
fs.readFile(filePath, 'utf-8', (err, data) => {
if (err) observer.error(err);
observer.next(data);
})
});
}
Rx.Observable
.forkJoin(readData('a.txt'), readData('b.txt'), readData('c.txt'))
.subscribe(data => console.log(data));
這里展示的僅僅是RxJS能表達(dá)能量的冰山一角,對于這種場景的處理辦法還有多種方式。RxJS 擅長處理異步數(shù)據(jù)流,而且具有豐富的庫函數(shù)。對于RxJS而言,他能將任意的Dom事件,或者是Promise轉(zhuǎn)換成observables。
前置知識點(diǎn)
在正式進(jìn)入RxJS的世界之前,我們首先需要明確和了解幾個(gè)概念:
- 響應(yīng)式編程(
Reactive Programming) - 流(
Stream) - 觀察者模式(發(fā)布訂閱)
- 迭代器模式
響應(yīng)式編程(Reactive Programming)
響應(yīng)式編程(Reactive Programming),它是一種基于事件的模型。在上面的異步編程模式中,我們描述了兩種獲得上一個(gè)任務(wù)執(zhí)行結(jié)果的方式,一個(gè)就是主動(dòng)輪訓(xùn),我們把它稱為 Proactive 方式。另一個(gè)就是被動(dòng)接收反饋,我們稱為 Reactive。簡單來說,在 Reactive 方式中,上一個(gè)任務(wù)的結(jié)果的反饋就是一個(gè)事件,這個(gè)事件的到來將會觸發(fā)下一個(gè)任務(wù)的執(zhí)行。
響應(yīng)式編程的思路大概如下:你可以用包括 Click 和 Hover 事件在內(nèi)的任何東西創(chuàng)建 Data stream(也稱 “流”,后續(xù)章節(jié)詳述)。Stream 廉價(jià)且常見,任何東西都可以是一個(gè) Stream:變量、用戶輸入、屬性、Cache、數(shù)據(jù)結(jié)構(gòu)等等。舉個(gè)例子,想像一下你的 Twitter feed 就像是 Click events 那樣的 Data stream,你可以監(jiān)聽它并相應(yīng)的作出響應(yīng)。
結(jié)合實(shí)際,如果你使用過Vue,必然能夠第一時(shí)間想到,Vue的設(shè)計(jì)理念不也是一種響應(yīng)式編程范式么,我們在編寫代碼的過程中,只需要關(guān)注數(shù)據(jù)的變化,不必手動(dòng)去操作視圖改變,這種Dom層的修改將隨著相關(guān)數(shù)據(jù)的改變而自動(dòng)改變并重新渲染。
流(Stream)
流作為概念應(yīng)該是語言無關(guān)的。文件IO流,Unix系統(tǒng)標(biāo)準(zhǔn)輸入輸出流,標(biāo)準(zhǔn)錯(cuò)誤流 (stdin, stdout, stderr),還有一開始提到的 TCP 流,還有一些 Web 后臺技術(shù)(如Nodejs)對HTTP請求 / 響應(yīng)流的抽象,都可以見到流的概念。
作為響應(yīng)式編程的核心,流的本質(zhì)是一個(gè)按時(shí)間順序排列的進(jìn)行中事件的序列集合。
對于一流或多個(gè)流來說,我們可以對他們進(jìn)行轉(zhuǎn)化,合并等操作,生成一個(gè)新的流,在這個(gè)過程中,流是不可改變的,也就是只會在原來的基礎(chǔ)返回一個(gè)新的stream。
觀察者模式
在眾多設(shè)計(jì)模式中,觀察者模式可以說是在很多場景下都有著比較明顯的作用。
觀察者模式是一種行為設(shè)計(jì)模式, 允許你定義一種訂閱機(jī)制, 可在對象事件發(fā)生時(shí)通知多個(gè) “觀察” 該對象的其他對象。
用實(shí)際的例子來理解,就比如你訂了一個(gè)銀行卡余額變化短信通知的服務(wù),那么這個(gè)時(shí)候,每次只要你轉(zhuǎn)賬或者是購買商品在使用這張銀行卡消費(fèi)之后,銀行的系統(tǒng)就會給你推送一條短信,通知你消費(fèi)了多少多少錢,這種其實(shí)就是一種觀察者模式,又稱發(fā)布 - 訂閱模式。
在這個(gè)過程中,銀行卡余額就是被觀察的對象,而用戶就是觀察者。
優(yōu)點(diǎn):
- 降低了目標(biāo)與觀察者之間的耦合關(guān)系,兩者之間是抽象耦合關(guān)系。
- 符合依賴倒置原則。
- 目標(biāo)與觀察者之間建立了一套觸發(fā)機(jī)制。
- 支持廣播通信
不足:
- 目標(biāo)與觀察者之間的依賴關(guān)系并沒有完全解除,而且有可能出現(xiàn)循環(huán)引用。
- 當(dāng)觀察者對象很多時(shí),通知的發(fā)布會花費(fèi)很多時(shí)間,影響程序的效率。
迭代器模式
迭代器(Iterator)模式又叫游標(biāo)(Sursor)模式,在面向?qū)ο缶幊汤?,迭代器模式是一種設(shè)計(jì)模式,是一種最簡單也最常見的設(shè)計(jì)模式。迭代器模式可以把迭代的過程從從業(yè)務(wù)邏輯中分離出來,它可以讓用戶透過特定的接口巡訪容器中的每一個(gè)元素而不用了解底層的實(shí)現(xiàn)。
const iterable = [1, 2, 3];
const iterator = iterable[Symbol.iterator]();
iterator.next();
iterator.next();
iterator.next();
iterator.next();
作為前端開發(fā)者來說,我們最常遇到的部署了iterator接口的數(shù)據(jù)結(jié)構(gòu)不乏有:Map、Set、Array、類數(shù)組等等,我們在使用他們的過程中,均能使用同一個(gè)接口訪問每個(gè)元素就是運(yùn)用了迭代器模式。
Iterator作用:
- 為各種數(shù)據(jù)結(jié)構(gòu),提供一個(gè)統(tǒng)一的、簡便的訪問接口;
- 使得數(shù)據(jù)結(jié)構(gòu)的成員能夠按某種次序排列;
- 為新的遍歷語法
for...of實(shí)現(xiàn)循環(huán)遍歷
在許多文章中,有人會喜歡把迭代器和遍歷器混在一起進(jìn)行概念解析,其實(shí)他們表達(dá)的含義是一致的,或者可以說(迭代器等于遍歷器)。
Observable
表示一個(gè)概念,這個(gè)概念是一個(gè)可調(diào)用的未來值或事件的集合。它能被多個(gè)observer訂閱,每個(gè)訂閱關(guān)系相互獨(dú)立、互不影響。
舉個(gè)栗子:
假設(shè)你訂閱了一個(gè)博客或者是推送文章的服務(wù)號(微信公眾號之類的),之后只要公眾號更新了新的內(nèi)容,那么該公眾號就會把新的文章推送給你,在這段關(guān)系中,這個(gè)公眾號就是一個(gè)Observable,用來產(chǎn)生數(shù)據(jù)的數(shù)據(jù)源。
相信看完上面的描述,你應(yīng)該對Observable是個(gè)什么東西有了一定的了解了,那么這就好辦了,下面我們來看看在RxJS中如何創(chuàng)建一個(gè)Observable。
const Rx = require('rxjs/Rx')
const myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
我們可以調(diào)用Observable.create方法來創(chuàng)建一個(gè)Observable,這個(gè)方法接受一個(gè)函數(shù)作為參數(shù),這個(gè)函數(shù)叫做 producer 函數(shù), 用來生成 Observable 的值。這個(gè)函數(shù)的入?yún)⑹?observer,在函數(shù)內(nèi)部通過調(diào)用 observer.next() 便可生成有一系列值的一個(gè) Observable。
我們先不應(yīng)理會observer是個(gè)什么東西,從創(chuàng)建一個(gè)Observable的方式來看,其實(shí)也就是調(diào)用一個(gè)API的事,十分簡單,這樣一個(gè)簡單的Observable對象就創(chuàng)建出來了。
Observer
一個(gè)回調(diào)函數(shù)的集合,它知道如何去監(jiān)聽由Observable提供的值。Observer在信號流中是一個(gè)觀察者(哨兵)的角色,它負(fù)責(zé)觀察任務(wù)執(zhí)行的狀態(tài)并向流中發(fā)射信號。
這里我們簡單實(shí)現(xiàn)一下內(nèi)部的構(gòu)造:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
}
在RxJS中,Observer是可選的。在next、error 和 complete處理邏輯部分缺失的情況下,Observable仍然能正常運(yùn)行,為包含的特定通知類型的處理邏輯會被自動(dòng)忽略。
比如我們可以這樣定義:
const observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
}
}
它依舊是可以正常的運(yùn)行。
那么它又是怎么來配合我們在實(shí)際戰(zhàn)斗中使用的呢:
const myObservable = Rx.Observable.create((observer) => {
observer.next('111')
setTimeout(() => {
observer.next('777')
}, 3000)
})
myObservable.subscribe((text) => console.log(text));
這里直接使用subscribe方法讓一個(gè)observer訂閱一個(gè)Observable,我們可以看看這個(gè)subscribe的函數(shù)定義來看看怎么實(shí)現(xiàn)訂閱的:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
源碼是用ts寫的,代碼即文檔,十分清晰,這里筆者給大家解讀一下,我們從入?yún)砜?,從左至右依次?code>next、error,complete,且是可選的,我們可以自己選擇性的傳入相關(guān)回調(diào),從這里也就印證了我們上面所說next、error 和 complete處理邏輯部分缺失的情況下仍可以正常運(yùn)行,因?yàn)樗麄兌际强蛇x的。
Subscription 與 Subject
Subscription
Subscription就是表示Observable的執(zhí)行,可以被清理。這個(gè)對象最常用的方法就是unsubscribe方法,它不需要任何參數(shù),只是用來清理由Subscription占用的資源。同時(shí),它還有add方法可以使我們?nèi)∠鄠€(gè)訂閱。
const myObservable = Rx.Observable.create(observer => {
observer.next('foo');
setTimeout(() => observer.next('bar'), 1000);
});
const subscription = myObservable.subscribe(x => console.log(x));
subscription.unsubscribe();
Subject (主體)
它是一個(gè)代理對象,既是一個(gè) Observable 又是一個(gè) Observer,它可以同時(shí)接受 Observable 發(fā)射出的數(shù)據(jù),也可以向訂閱了它的 observer 發(fā)射數(shù)據(jù),同時(shí),Subject 會對內(nèi)部的 observers 清單進(jìn)行多播 (multicast)
Subjects 是將任意 Observable 執(zhí)行共享給多個(gè)觀察者的唯一方式
這個(gè)時(shí)候眼尖的讀者會發(fā)現(xiàn),這里產(chǎn)生了一個(gè)新概念——多播。
- 那么多播又是什么呢?
- 有了多播是不是還有單播?
- 他們的區(qū)別又是什么呢?
接下來就讓筆者給大家好好分析這兩個(gè)概念吧。
單播
普通的Observable 是單播的,那么什么是單播呢?
單播的意思是,每個(gè)普通的 Observables 實(shí)例都只能被一個(gè)觀察者訂閱,當(dāng)它被其他觀察者訂閱的時(shí)候會產(chǎn)生一個(gè)新的實(shí)例。也就是普通 Observables 被不同的觀察者訂閱的時(shí)候,會有多個(gè)實(shí)例,不管觀察者是從何時(shí)開始訂閱,每個(gè)實(shí)例都是從頭開始把值發(fā)給對應(yīng)的觀察者。
const Rx = require('rxjs/Rx')
const source = Rx.Observable.interval(1000).take(3);
source.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
source.subscribe((value) => console.log('B ' + value))
}, 1000)
看到陌生的調(diào)用不要慌,后面會進(jìn)行詳細(xì)解析,這里的source你可以理解為就是一個(gè)每隔一秒發(fā)送一個(gè)從 0 開始遞增整數(shù)的Observable就行了,且只會發(fā)送三次(take操作符其實(shí)也就是限定拿多少個(gè)數(shù)就不在發(fā)送數(shù)據(jù)了。)。
從這里我們可以看出兩個(gè)不同觀察者訂閱了同一個(gè)源(source),一個(gè)是直接訂閱,另一個(gè)延時(shí)一秒之后再訂閱。
從打印的結(jié)果來看,A從 0 開始每隔一秒打印一個(gè)遞增的數(shù),而B延時(shí)了一秒,然后再從 0 開始打印,由此可見,A與B的執(zhí)行是完全分開的,也就是每次訂閱都創(chuàng)建了一個(gè)新的實(shí)例。
在許多場景下,我們可能會希望B能夠不從最初始開始接受數(shù)據(jù),而是接受在訂閱的那一刻開始接受當(dāng)前正在發(fā)送的數(shù)據(jù),這就需要用到多播能力了。
多播
那么如果實(shí)現(xiàn)多播能力呢,也就是實(shí)現(xiàn)我們不論什么時(shí)候訂閱只會接收到實(shí)時(shí)的數(shù)據(jù)的功能。
可能這個(gè)時(shí)候會有小伙伴跳出來了,直接給個(gè)中間人來訂閱這個(gè)源,然后將數(shù)據(jù)轉(zhuǎn)發(fā)給A和B不就行了?
const source = Rx.Observable.interval(1000).take(3);
const subject = {
observers: [],
subscribe(target) {
this.observers.push(target);
},
next: function(value) {
this.observers.forEach((next) => next(value))
}
}
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
先分析一下代碼,A和B的訂閱和單播里代碼并無差別,唯一變化的是他們訂閱的對象由source變成了subject,然后再看看這個(gè)subject包含了什么,這里做了一些簡化,移除了error、complete這樣的處理函數(shù),只保留了next,然后內(nèi)部含有一個(gè)observers數(shù)組,這里包含了所有的訂閱者,暴露一個(gè)subscribe用于觀察者對其進(jìn)行訂閱。
在使用過程中,讓這個(gè)中間商subject來訂閱source,這樣便做到了統(tǒng)一管理,以及保證數(shù)據(jù)的實(shí)時(shí)性,因?yàn)楸举|(zhì)上對于source來說只有一個(gè)訂閱者。
這里主要是方便理解,簡易實(shí)現(xiàn)了RxJS中的Subject的實(shí)例,這里的中間人可以直接換成RxJS的Subject類實(shí)例,效果是一樣的
const source = Rx.Observable.interval(1000).take(3);
const subject = new Rx.Subject();
source.subscribe(subject);
subject.subscribe((value) => console.log('A ' + value))
setTimeout(() => {
subject.subscribe((value) => console.log('B ' + value))
}, 1000)
同樣先來看看打印的結(jié)果是否符合預(yù)期,首先A的打印結(jié)果并無變化,B首次打印的數(shù)字現(xiàn)在是從 1 開始了,也就當(dāng)前正在傳輸?shù)臄?shù)據(jù),這下滿足了我們需要獲取實(shí)時(shí)數(shù)據(jù)的需求了。
不同于單播訂閱者總是需要從頭開始獲取數(shù)據(jù),多播模式能夠保證數(shù)據(jù)的實(shí)時(shí)性。
除了以上這些,RxJS還提供了Subject的三個(gè)變體:
BehaviorSubjectReplaySubjectAsyncSubject
BehaviorSubject
BehaviorSubject 是一種在有新的訂閱時(shí)會額外發(fā)出最近一次發(fā)出的值的Subject。
同樣我們結(jié)合現(xiàn)實(shí)場景來進(jìn)行理解,假設(shè)有我們需要使用它來維護(hù)一個(gè)狀態(tài),在它變化之后給所有重新訂閱的人都能發(fā)送一個(gè)當(dāng)前狀態(tài)的數(shù)據(jù),這就好比我們要實(shí)現(xiàn)一個(gè)計(jì)算屬性,我們只關(guān)心該計(jì)算屬性最終的狀態(tài),而不關(guān)心過程中變化的數(shù),那么又該怎么處理呢?
我們知道普通的Subject只會在當(dāng)前有新數(shù)據(jù)的時(shí)候發(fā)送當(dāng)前的數(shù)據(jù),而發(fā)送完畢之后就不會再發(fā)送已發(fā)送過的數(shù)據(jù),那么這個(gè)時(shí)候我們就可以引入BehaviorSubject來進(jìn)行終態(tài)維護(hù)了,因?yàn)橛嗛喠嗽搶ο蟮挠^察者在訂閱的同時(shí)能夠收到該對象發(fā)送的最近一次的值,這樣就能滿足我們上述的需求了。
然后再結(jié)合代碼來分析這種Subject應(yīng)用的場景:
const subject = new Rx.Subject();
subject.subscribe((value) => console.log('A:' + value))
subject.next(1);
subject.next(2);
setTimeout(() => {
subject.subscribe((value) => console.log('B:' + value));
}, 1000)
首先演示的是采用普通Subject來作為訂閱的對象,然后觀察者A在實(shí)例對象subject調(diào)用next發(fā)送新的值之前訂閱的,然后觀察者是延時(shí)一秒之后訂閱的,所以A接受數(shù)據(jù)正常,那么這個(gè)時(shí)候由于B在數(shù)據(jù)發(fā)送的時(shí)候還沒訂閱,所以它并沒有收到數(shù)據(jù)。
那么我們再來看看采用BehaviorSubject實(shí)現(xiàn)的效果:
const subject = new Rx.BehaviorSubject(0);
subject.subscribe((value: number) => console.log('A:' + value))
subject.next(1);
subject.next(2);
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
}, 1000)
同樣從打印的結(jié)果來看,與普通Subject的區(qū)別在于,在訂閱的同時(shí)源對象就發(fā)送了最近一次改變的值(如果沒改變則發(fā)送初始值),這個(gè)時(shí)候我們的B也如愿獲取到了最新的狀態(tài)。
這里在實(shí)例化BehaviorSubject的時(shí)候需要傳入一個(gè)初始值。
ReplaySubject
在理解了BehaviorSubject之后再來理解ReplaySubject就比較輕松了,ReplaySubject會保存所有值,然后回放給新的訂閱者,同時(shí)它提供了入?yún)⒂糜诳刂浦胤胖档臄?shù)量(默認(rèn)重放所有)。
什么?還不理解?看碼:
const subject = new Rx.ReplaySubject(2);
subject.next(0);
subject.next(1);
subject.next(2);
subject.subscribe((value: number) => console.log('A:' + value))
subject.next(3);
subject.next(4);
setTimeout(() => {
subject.subscribe((value: number) => console.log('B:' + value))
}, 1000)
我們先從構(gòu)造函數(shù)傳參來看,BehaviorSubject與ReplaySubject都需要傳入一個(gè)參數(shù),對BehaviorSubject來說是初始值,而對于ReplaySubject來說就是重放先前多少次的值,如果不傳入重放次數(shù),那么它將重放所有發(fā)射過的值。
從結(jié)果上看,如果你不傳入確定的重放次數(shù),那么實(shí)現(xiàn)的效果與之前介紹的單播效果幾乎沒有差別。
所以我們再分析代碼可以知道在訂閱的那一刻,觀察者們就能收到源對象前多少次發(fā)送的值。
AsyncSubject
AsyncSubject 只有當(dāng) Observable 執(zhí)行完成時(shí) (執(zhí)行complete()),它才會將執(zhí)行的最后一個(gè)值發(fā)送給觀察者,如果因異常而終止,AsyncSubject將不會釋放任何數(shù)據(jù),但是會向Observer傳遞一個(gè)異常通知。
AsyncSubject一般用的比較少,更多的還是使用前面三種。
const subject = new Rx.AsyncSubject();
subject.next(1);
subject.subscribe(res => {
console.log('A:' + res);
});
subject.next(2);
subject.subscribe(res => {
console.log('B:' + res);
});
subject.next(3);
subject.subscribe(res => {
console.log('C:' + res);
});
subject.complete();
subject.next(4);
從打印結(jié)果來看其實(shí)已經(jīng)很好理解了,也就是說對于所有的觀察者們來說,源對象只會在所有數(shù)據(jù)發(fā)送完畢也就是調(diào)用complete方法之后才會把最后一個(gè)數(shù)據(jù)返回給觀察者們。
這就好比小說里經(jīng)常有的,當(dāng)你要放技能的時(shí)候,先要打一套起手式,打完之后才會放出你的大招。
Cold-Observables 與 Hot-Observables
Cold Observables
Cold Observables 只有被 observers 訂閱的時(shí)候,才會開始產(chǎn)生值。是單播的,有多少個(gè)訂閱就會生成多少個(gè)訂閱實(shí)例,每個(gè)訂閱都是從第一個(gè)產(chǎn)生的值開始接收值,所以每個(gè)訂閱接收到的值都是一樣的。
如果大家想要參考Cold Observables相關(guān)代碼,直接看前面的單播示例就行了。
正如單播描述的能力,不管觀察者們什么時(shí)候開始訂閱,源對象都會從初始值開始把所有的數(shù)都發(fā)給該觀察者。
Hot Observables
Hot Observables 不管有沒有被訂閱都會產(chǎn)生值。是多播的,多個(gè)訂閱共享同一個(gè)實(shí)例,是從訂閱開始接受到值,每個(gè)訂閱接收到的值是不同的,取決于它們是從什么時(shí)候開始訂閱。
這里有幾種場景,我們可以逐一分析一下便于理解:
“加熱”
首先可以忽略代碼中出現(xiàn)的陌生的函數(shù),后面會細(xì)說。
const source = Rx.Observable.of(1, 2).publish();
source.connect();
source.subscribe((value) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value) => console.log('B:' + value));
}, 1000);
這里首先用Rx的操作符of創(chuàng)建了一個(gè)Observable,并且后面跟上了一個(gè)publish函數(shù),在創(chuàng)建完之后調(diào)用connect函數(shù)進(jìn)行開始數(shù)據(jù)發(fā)送。
最終代碼的執(zhí)行結(jié)果就是沒有任何數(shù)據(jù)打印出來,分析一下原因其實(shí)也比較好理解,由于開啟數(shù)據(jù)發(fā)送的時(shí)候還沒有訂閱,并且這是一個(gè)Hot Observables,它是不會理會你是否有沒有訂閱它,開啟之后就會直接發(fā)送數(shù)據(jù),所以A和B都沒有接收到數(shù)據(jù)。
當(dāng)然你這里如果把connect方法放到最后,那么最終的結(jié)果就是A接收到了,B還是接不到,因?yàn)?code>A在開啟發(fā)數(shù)據(jù)之前就訂閱了,而B還要等一秒。
更直觀的場景
正如上述多播所描述的,其實(shí)我們更多想看到的現(xiàn)象是能夠A和B兩個(gè)觀察者能夠都有接收到數(shù)據(jù),然后觀察數(shù)據(jù)的差別,這樣會方便理解。
這里直接換一個(gè)發(fā)射源:
const source = Rx.Observable.interval(1000).take(3).publish();
source.subscribe((value: number) => console.log('A:' + value));
setTimeout(() => {
source.subscribe((value: number) => console.log('B:' + value));
}, 3000);
source.connect();
這里我們利用interval配合take操作符每秒發(fā)射一個(gè)遞增的數(shù),最多三個(gè),然后這個(gè)時(shí)候的打印結(jié)果就更清晰了,A正常接收到了三個(gè)數(shù),B三秒之后才訂閱,所以只接收到了最后一個(gè)數(shù) 2,這種方式就是上述多播所描述的并無一二。
兩者對比
-
Cold Observables:舉個(gè)栗子會比較好理解一點(diǎn):比如我們上 B 站看番,更新了新番,我們不論什么時(shí)候去看,都能從頭開始看到完整的劇集,與其他人看不看毫無關(guān)聯(lián),互不干擾。 -
Hot Observables:這就好比我們上 B 站看直播,直播開始之后就直接開始播放了,不管是否有沒有訂閱者,也就是說如果你沒有一開始就訂閱它,那么你過一段時(shí)候后再去看,是不知道前面直播的內(nèi)容的。
上述代碼中出現(xiàn)的操作符解析
在創(chuàng)建Hot Observables時(shí)我們用到了publish與connect函數(shù)的結(jié)合,其實(shí)調(diào)用了publish操作符之后返回的結(jié)果是一個(gè)ConnectableObservable,然后該對象上提供了connect方法讓我們控制發(fā)送數(shù)據(jù)的時(shí)間。
-
publish:這個(gè)操作符把正常的Observable(Cold Observables)轉(zhuǎn)換成ConnectableObservable。 -
ConnectableObservable:ConnectableObservable是多播的共享Observable,可以同時(shí)被多個(gè)observers共享訂閱,它是Hot Observables。ConnectableObservable是訂閱者和真正的源頭Observables(上面例子中的interval,每隔一秒發(fā)送一個(gè)值,就是源頭Observables)的中間人,ConnectableObservable從源頭Observables接收到值然后再把值轉(zhuǎn)發(fā)給訂閱者。 -
connect():ConnectableObservable并不會主動(dòng)發(fā)送值,它有個(gè)connect方法,通過調(diào)用connect方法,可以啟動(dòng)共享ConnectableObservable發(fā)送值。當(dāng)我們調(diào)用ConnectableObservable.prototype.connect方法,不管有沒有被訂閱,都會發(fā)送值。訂閱者共享同一個(gè)實(shí)例,訂閱者接收到的值取決于它們何時(shí)開始訂閱。
其實(shí)這種手動(dòng)控制的方式還挺麻煩的,有沒有什么更加方便的操作方式呢,比如監(jiān)聽到有訂閱者訂閱了才開始發(fā)送數(shù)據(jù),一旦所有訂閱者都取消了,就停止發(fā)送數(shù)據(jù)?其實(shí)也是有的,讓我們看看引用計(jì)數(shù)(refCount):
引用計(jì)數(shù)
這里主要用到了publish結(jié)合refCount實(shí)現(xiàn)一個(gè) “自動(dòng)擋” 的效果。
const source = Rx.Observable.interval(1000).take(3).publish().refCount();
setTimeout(() => {
source.subscribe(data => { console.log("A:" + data) });
setTimeout(() => {
source.subscribe(data => { console.log("B:" + data) });
}, 1000);
}, 2000);
我們透過結(jié)果看本質(zhì),能夠很輕松的發(fā)現(xiàn),只有當(dāng)A訂閱的時(shí)候才開始發(fā)送數(shù)據(jù)(A拿到的數(shù)據(jù)是從 0 開始的),并且當(dāng)B訂閱時(shí),也是只能獲取到當(dāng)前發(fā)送的數(shù)據(jù),而不能獲取到之前的數(shù)據(jù)。
不僅如此,這種 “自動(dòng)擋” 當(dāng)所有訂閱者都取消訂閱的時(shí)候它就會停止再發(fā)送數(shù)據(jù)了。
Schedulers(調(diào)度器)
用來控制并發(fā)并且是中央集權(quán)的調(diào)度員,允許我們在發(fā)生計(jì)算時(shí)進(jìn)行協(xié)調(diào),例如 setTimeout 或 requestAnimationFrame 或其他。
- 調(diào)度器是一種數(shù)據(jù)結(jié)構(gòu)。 它知道如何根據(jù)優(yōu)先級或其他標(biāo)準(zhǔn)來存儲任務(wù)和將任務(wù)進(jìn)行排序。
- 調(diào)度器是執(zhí)行上下文。 它表示在何時(shí)何地執(zhí)行任務(wù) (舉例來說,立即的,或另一種回調(diào)函數(shù)機(jī)制 (比如
setTimeout或process.nextTick),或動(dòng)畫幀)。 - 調(diào)度器有一個(gè) (虛擬的) 時(shí)鐘。 調(diào)度器功能通過它的
getter方法now()提供了 “時(shí)間” 的概念。在具體調(diào)度器上安排的任務(wù)將嚴(yán)格遵循該時(shí)鐘所表示的時(shí)間。
學(xué)到這相信大家也已經(jīng)或多或少對RxJS有一定了解了,不知道大家有沒有發(fā)現(xiàn)一個(gè)疑問,前面所展示的代碼示例中有同步也有異步,而筆者卻沒有顯示的控制他們的執(zhí)行,他們的這套執(zhí)行機(jī)制到底是什么呢?
其實(shí)他們的內(nèi)部的調(diào)度就是靠的Schedulers來控制數(shù)據(jù)發(fā)送的時(shí)機(jī),許多操作符會預(yù)設(shè)不同的Scheduler,所以我們不需要進(jìn)行特殊處理他們就能良好的進(jìn)行同步或異步運(yùn)行。
const source = Rx.Observable.create(function (observer: any) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('訂閱前');
source.observeOn(Rx.Scheduler.async)
.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('訂閱后');
從打印結(jié)果上來看,數(shù)據(jù)的發(fā)送時(shí)機(jī)的確已經(jīng)由同步變成了異步,如果不進(jìn)行調(diào)度方式修改,那么 “訂閱后” 的打印應(yīng)該是在數(shù)據(jù)發(fā)送完畢之后才會執(zhí)行的。
看完示例之后我們再來研究這個(gè)調(diào)度器能做哪幾種調(diào)度:
queueasapasyncanimationFrame
queue
將每個(gè)下一個(gè)任務(wù)放在隊(duì)列中,而不是立即執(zhí)行
queue 延遲使用調(diào)度程序時(shí),其行為與 async 調(diào)度程序相同。
當(dāng)沒有延遲使用時(shí),它將同步安排給定的任務(wù) - 在安排好任務(wù)后立即執(zhí)行。但是,當(dāng)遞歸調(diào)用時(shí)(即在已調(diào)度的任務(wù)內(nèi)部),將使用隊(duì)列調(diào)度程序調(diào)度另一個(gè)任務(wù),而不是立即執(zhí)行,該任務(wù)將被放入隊(duì)列并等待當(dāng)前任務(wù)完成。
這意味著,當(dāng)您使用 queue 調(diào)度程序執(zhí)行任務(wù)時(shí),您確定它會在該調(diào)度程序調(diào)度的其他任何任務(wù)開始之前結(jié)束。
這個(gè)同步與我們平常理解的同步可能不太一樣,筆者當(dāng)時(shí)也都困惑了一會。
還是用一個(gè)官方的例子來講解這種調(diào)度方式是怎么理解吧:
import { queueScheduler } from 'rxjs';
queueScheduler.schedule(() => {
queueScheduler.schedule(() => console.log('second'));
console.log('first');
});
我們無需關(guān)注陌生的函數(shù)調(diào)用,我們這里著重于看這種調(diào)度方式與平常的同步調(diào)度的區(qū)別。
首先我們調(diào)用queueScheduler的schedule方法開始執(zhí)行,然后函數(shù)內(nèi)部又同樣再以同樣的方式調(diào)用(這里也可以改成遞歸,不過這里用這個(gè)示例去理解可能會好一點(diǎn)),并且傳入一個(gè)函數(shù),打印second。
然后繼續(xù)看下面的語句,一個(gè)普通的console.log('first'),然后我們再來看看打印結(jié)果:
是不是有點(diǎn)神奇,如果沒看明白為啥的,可以再回頭看看前面queue對于遞歸執(zhí)行的處理方式。也就是說如果遞歸調(diào)用,它內(nèi)部會維護(hù)一個(gè)隊(duì)列,然后等待先加入隊(duì)列的任務(wù)先執(zhí)行完成(也就是上面的console.log('first')執(zhí)行完才會執(zhí)行console.log('second'),因?yàn)?code>console.log('second')這個(gè)任務(wù)是后加入該隊(duì)列的)。
asap
內(nèi)部基于Promise實(shí)現(xiàn)(Node端采用process.nextTick),他會使用可用的最快的異步傳輸機(jī)制,如果不支持Promise或process.nextTick或者Web Worker的 MessageChannel也可能會調(diào)用setTimeout方式進(jìn)行調(diào)度。
async
與asap方式很像,只不過內(nèi)部采用setInterval 進(jìn)行調(diào)度,大多用于基于時(shí)間的操作符。
animationFrame
從名字看其實(shí)相信大家已經(jīng)就能略知一二了,內(nèi)部基于requestAnimationFrame來實(shí)現(xiàn)調(diào)度,所以執(zhí)行的時(shí)機(jī)將與window.requestAnimationFrame保持一致,適用于需要頻繁渲染或操作動(dòng)畫的場景。
Operators
Operator 概念
采用函數(shù)式編程風(fēng)格的純函數(shù) (pure function),使用像 map、filter、concat、flatMap 等這樣的操作符來處理集合。也正因?yàn)樗募兒瘮?shù)定義,所以我們可以知道調(diào)用任意的操作符時(shí)都不會改變已存在的Observable實(shí)例,而是會在原有的基礎(chǔ)上返回一個(gè)新的Observable。
盡管 RxJS 的根基是 Observable,但最有用的還是它的操作符。操作符是允許復(fù)雜的異步代碼以聲明式的方式進(jìn)行輕松組合的基礎(chǔ)代碼單元。