給Android開發(fā)者的RxJava詳解:
- 與setOnClickListener的區(qū)別是:
setOnClickListener()訂閱以后,不是馬上回調(diào),而是在邏輯處理到位了該回調(diào)了再回調(diào)。而subscribe()是把計劃表的call()邏輯和回調(diào)邏輯提前寫好,在訂閱的一瞬間直接全發(fā)出去。 -
onStart()方法是在subscribe()里發(fā)出去的,因此只能和訂閱發(fā)生在同一線程,如果要切線程的話使用doOnSubscribe()。 - 在一個正確的事件處理順序中應(yīng)該在最后調(diào)用
onComplete()或onError()來標志事件序列結(jié)束。使用不完整回調(diào)其實也是可以的。 - 在不再使用時要調(diào)用
unsubscribe()來解除訂閱,因為Observable會持有Observer的引用,如果是Activity會引起內(nèi)存泄漏。 - Observer是一個接口,Subscriber才是實現(xiàn)類,最終Observer也是轉(zhuǎn)為Subscriber使用的。
- 默認是在哪個線程調(diào)用
subscribe(),就在哪個線程處理call()方法。但一般call()邏輯都是耗時的應(yīng)該在子線程執(zhí)行然后切回主線程。 - 事件產(chǎn)生和消費:
? subscribeOn(): 指定subscribe()所發(fā)生的線程,即訂閱和call()方法邏輯執(zhí)行的線程,或者叫做事件產(chǎn)生的線程。
? observeOn(): 指定Subscriber所運行在的線程,即onNext()回調(diào)所處的線程,或者叫做事件消費的線程。 - 變換,就是將事件序列中的對象或整個序列進行包裝、加工處理,轉(zhuǎn)換成不同的事件或事件序列重新發(fā)出(順序是不變的)。
-
flatMap()中使用傳入的事件對象創(chuàng)建一個Observable對象,并不發(fā)送這個Observable,而是將它激活,于是它開始發(fā)送事件。每一個創(chuàng)建出來的Observable發(fā)送的事件,都被匯入原始的總Observable,它負責(zé)將這些事件統(tǒng)一交給Subscriber的回調(diào)方法。
這就可以做嵌套異步邏輯,最原始的異步邏輯是依賴于很多其他異步邏輯的,可以等其他的都執(zhí)行完了自己再一并發(fā)送最終的回調(diào)。 - 各種變換雖然功能各有不同,但實質(zhì)上都是針對事件序列的處理和再發(fā)送。
- 變換的原理:
? 多種變換中,最下層的Subscriber會有一個最終類型,然后往上訂閱上層Observable,上層Observable知道下層Subscriber的類型但不知道調(diào)用順序;
? 因此上層Observable會再生成一個新的Subscriber,并在該Subscriber回調(diào)中發(fā)送下層Subscriber的回調(diào),這個新的Subscriber會發(fā)給上層用于訂閱上層的順序;
? 直到訂閱到最上層的Observable時,Subscriber已經(jīng)是最初的類型了,完了最上層的Observable是知道發(fā)送順序的,開始往下發(fā)送事件;
? 每一層收到事件時,將收到的類型通過本層func.call()轉(zhuǎn)為下層需要的類型,然后通過下層的Subscriber再發(fā)送出去,最終回調(diào)到最底層的Subscriber,類型也是最終的類型了。 - 如果需要對很多Observable做一組相同的
lift()變換時,可以將這組變換業(yè)務(wù)封裝到Observable.Transformer的call()方法里,然后給其余Observable.compose(liftAll)使用。 - 線程控制:
?observeOn()可以多次調(diào)用,每次調(diào)用后影響它之后的map()處理邏輯,因為事件消費是往下發(fā)的,每次切線程后,后面的onNext()線程就被改了;
?subscribeOn()其實也能多次調(diào)用,只不過最終以第一次的為準,因為事件產(chǎn)生是往上訂閱的,最終最上面的那次調(diào)用會決定最終的訂閱call()所發(fā)生的線程;
? 由于onStart()是在subscribe()方法里調(diào)用的,因此它改不了線程了只能跟subscribe()在同一線程;
? 而doOnSubscribe()是在Observable里調(diào)用的,因為事件產(chǎn)生是往上訂閱的,因此它的call()發(fā)生的線程處于離它最近的下面的subscribeOn()指定的線程。
? 還有一個doOnNext()方法會在onNext()發(fā)送之前調(diào)用,在此先不要observeOn(),執(zhí)行完一些預(yù)備邏輯后再observeOn(),用于發(fā)事件前處理一些前置邏輯(就是名字顧名思義點而已)。
創(chuàng)建Observable:
- 完整創(chuàng)建:
? create():完整創(chuàng)建1個被觀察者對象。 - 快速創(chuàng)建:
? just():快速創(chuàng)建1個被觀察者對象,最多10個事件。
? fromArray():快速創(chuàng)建1個被觀察者對象,數(shù)組,大于10個事件。
? fromIterable():快速創(chuàng)建1個被觀察者對象,list,大于10個事件。
? empty():僅發(fā)送Complete事件,直接通知完成 。
? error():僅發(fā)送Error事件,直接通知異常 。
? never():不發(fā)送任何事件。 - 延遲創(chuàng)建:
? defer():直到有觀察者訂閱時,才動態(tài)創(chuàng)建被觀察者對象和發(fā)送事件。
? timer():延遲指定時間后,發(fā)送1個數(shù)值0,Long類型。
? interval():每隔指定時間就發(fā)送事件,從0開始、無限遞增1的整數(shù)序列。
? intervalRange():每隔指定時間就發(fā)送事件,可指定發(fā)送的數(shù)據(jù)的數(shù)量,從0開始、無限遞增1的整數(shù)序列。
? range():連續(xù)發(fā)送1個事件序列,可指定范圍從0開始,無限遞增1的整數(shù)序列,無延遲發(fā)送事件。
? rangeLong():類似于range(),區(qū)別在于該方法支持數(shù)據(jù)類型為Long。
變換操作符:
- 常用變換符:
? Map():對被觀察者發(fā)送的每個事件都通過指定的函數(shù)處理,從而變換成另外一種事件。
? FlatMap():將被觀察者發(fā)送的事件序列進行拆分和單獨轉(zhuǎn)換,再合并成一個新的事件序列,整個一起發(fā)出去,事件序列可能會亂。
? ConcatMap():與FlatMap()的區(qū)別在于重新合并生成的事件序列與舊的序列順序是一致的,不會亂序。
? Buffer():定期從被觀察者需要發(fā)送的總事件序列中獲取一定數(shù)量的事件放到緩存區(qū)中,然后發(fā)送緩沖區(qū)的數(shù)據(jù),直到遍歷完為止。
組合/合并操作符:
- 組合多個被觀察者:
? concat()/concatArray():組合多個Observable一起發(fā)送數(shù)據(jù),合并后按發(fā)送順序串行執(zhí)行(一個一個發(fā)),concat()組合被觀察者數(shù)量<=4個,concatArray()可>4個。
? merge()/mergeArray():組合多個Observable一起發(fā)送數(shù)據(jù),合并后按時間線并行執(zhí)行(兩個兩個發(fā)),merge()組合Observable數(shù)量<=4個,mergeArray()可>4個
? concatDelayError()/ mergeDelayError():任一個Observable出錯以后先緩存起來,等每個被觀察者發(fā)送完事件后再回調(diào)onError()。
concat()/merge()與zip()區(qū)別:
? concat()/ merge()是每個Observable的事件都會發(fā)出來,然后可以在Subscriber的onNext()中處理(多次被調(diào)用),完了在onComplete()中處理最終邏輯。
? zip()可以將多個Observable發(fā)出的事件匯總起來再做一次統(tǒng)一的邏輯處理,然后在Subscriber的onNext()中處理的就是最終邏輯了(一次調(diào)用)。
- 合并多個事件:
? Zip():合并多個Observable發(fā)送的事件,生成一個新的事件序列(處理新的業(yè)務(wù)邏輯)并發(fā)送,多個Observable中以數(shù)量最少的那個為準。
? combineLatest():當(dāng)兩個Observable中的任何一個發(fā)送了數(shù)據(jù)后,將先發(fā)送了數(shù)據(jù)的Observable的最新(最后)一個數(shù)據(jù)與另外一個Observable發(fā)送的每個數(shù)據(jù)結(jié)合,按時間合并發(fā)送。
? combineLatestDelayError():可處理onError()情況。
? reduce():把被觀察者需要發(fā)送的事件聚合成1個事件并發(fā)送(階乘)。
? collect():將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個數(shù)據(jù)結(jié)構(gòu)里再發(fā)送。
zip()與combineLatest()的區(qū)別:
? zip()按個數(shù)合并,即1對1合并。
? combineLatest()按時間合并,即在同一個時間點上的合并。
- 發(fā)送事件前追加發(fā)送事件:
? startWith()/startWithArray():在一個被觀察者發(fā)送事件前,追加發(fā)送一些數(shù)據(jù)再發(fā)送,越晚追加的越先發(fā)送。
? count():只統(tǒng)計被觀察者發(fā)送事件的數(shù)量。
幾個操作符的區(qū)別:
- merge():可以合并多個Observable的輸出,它們的數(shù)據(jù)可能會交錯發(fā)射(concat可以保持順序)。如果某個原始Observable出現(xiàn)onError,merge后的Observable就會onError。
- mergeDelayError():原始observable出現(xiàn)onError時,錯誤通知會被保留,直到所有數(shù)據(jù)發(fā)射完畢后才執(zhí)行onError。
如果有多個原始observable出現(xiàn)了onError,這些onError通知會被合并成一個CompositeException ,保留在它的 List<Throwable> exceptions異常列表里。如果只有一個原始observable出現(xiàn)了onError,則會直接使用這個onError通知,而不會生成CompositeException。
有坑:
如果先mergeDelayError再指定線程的話,mergeDelayError沒有起到延遲通知onError的作用,第一個observable出現(xiàn)錯誤的時候,整個合并的observable也onError了,第二個observable無法輸出。但是如果改成每個observable單獨subscribeOn和observeOn,然后再mergeDelayError,就正常進行了。
RxJava一些使用場景:
- Scheduler線程切換:這種場景經(jīng)常會在“后臺線程取數(shù)據(jù),主線程展示”的模式中看見。
- 使用debounce做textSearch:用簡單的話講就是當(dāng)N個結(jié)點發(fā)生的時間太靠近(即發(fā)生的時間差小于設(shè)定的值T),debounce就會自動過濾掉前N-1個結(jié)點。
- Retrofit結(jié)合RxJava做網(wǎng)絡(luò)請求框架:RxJava和Retrofit如何結(jié)合來實現(xiàn)更簡潔的代碼。
- RxJava代替EventBus進行數(shù)據(jù)傳遞(RxBus):RxBus并不是一個庫,而是一種模式,是使用了RxJava的思想來達到EventBus的數(shù)據(jù)傳遞效果。
- 使用combineLatest合并最近N個結(jié)點:注冊的時候所有輸入信息(郵箱、密碼、電話號碼等)合法才點亮注冊按鈕。
- 使用merge合并兩個數(shù)據(jù)源:一組數(shù)據(jù)來自網(wǎng)絡(luò),一組數(shù)據(jù)來自文件,需要合并兩組數(shù)據(jù)一起展示。
- 使用concat和first做緩存:依次檢查memory、disk和network中是否存在數(shù)據(jù),任何一步一旦發(fā)現(xiàn)數(shù)據(jù)后面的操作都不執(zhí)行(取第一次的有效事件)。
- 使用timer做定時操作:2秒后輸出日志“hello world”,然后結(jié)束。
- 使用interval做周期性操作:每隔2秒輸出日志“helloworld”。
- 使用throttleFirst防止按鈕重復(fù)點擊:debounce也能達到同樣的效果。
- 使用schedulePeriodically做輪詢請求:Schedulers.newThread().createWorker().schedulePeriodically(new Action0())。
- RxJava進行數(shù)組/list的遍歷:Observable.from(names).subscribe(new Action1<String>())。
- 解決嵌套回調(diào)(callback hell)問題:NetworkService.getToken().flatMap().subscribe()。
- 響應(yīng)式的界面:比如勾選了某個checkbox,馬上執(zhí)行操作,自動更新對應(yīng)的preference。
以上部分內(nèi)容摘自:
《給 Android 開發(fā)者的 RxJava 詳解》——扔物線
Carson_Ho的簡書文章