RxJava2.0的使用詳解
1,初識RxJava
RxJava就是一種用Java語言實現(xiàn)的響應(yīng)式編程,來創(chuàng)建基于事件的異步程序
RxJava是一個基于事件訂閱的異步執(zhí)行的一個類庫,目前比較火的一些技術(shù)框架!
1.1使用前所添加的依賴(build.gradle):
implementation 'io.reactivex.rxjava2:rxjava:2.1.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
1.2作用:
RxJava的目的就是異步。
RxJava的特點就是可以非常簡便的實現(xiàn)異步調(diào)用,可以在邏輯復(fù)雜的代碼邏輯中以比較輕易的方式實現(xiàn)異步調(diào)用。隨著邏輯的復(fù)雜,需求的更改,代碼可依然能保持極強的閱讀性
1.3概念:
RxJava是利用觀察者模式來實現(xiàn)一些列的操作,所以對于觀察者模式中的觀察者,被觀察者,以及訂閱、事件需要有一個了解.
Observable:在觀察者模式中稱為“被觀察者”;
Observer:觀察者模式中的“觀察者”,可接收Observable發(fā)送的數(shù)據(jù);
subscribe:訂閱,觀察者與被觀察者,通過Observable的subscribe()方法進行訂閱;
Subscriber:也是一種觀察者,在2.0中 它與Observer沒什么實質(zhì)的區(qū)別,不同的是 Subscriber要與Flowable(也是一種被觀察者)聯(lián)合使用,該部分 內(nèi)容是2.0新增的,后續(xù)文章再介紹。Obsesrver用于訂閱Observable,而Subscriber用于訂閱Flowable.
1.4觀察者模式的理解:
A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應(yīng).
在程序的觀察者模式,觀察者不需要時刻盯著被觀察者,而是采用注冊或者稱為訂閱的方式,告訴被觀察者:我需要你的某某狀態(tài),你要在它變化的時候通知我!
RxJava 有四個基本概念:
Observable (被觀察者)、
Observer (觀察者)、
subscribe (訂閱)
Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關(guān)系,從而 Observable 可以在完成某些操作,獲得一些結(jié)果后,回調(diào)觸發(fā)事件,即 發(fā)出事件來通知 Observer。
關(guān)于回調(diào),如果理解則可以跳過這一段,如果不理解,在RxJava中可以簡單的理解為:為了方便Observable和Observer交互,在Observable中,將 Observer對象傳入,在完成某些操作后調(diào)用Observer對象的方法,此時將觸發(fā)Observer中具體實現(xiàn)的對應(yīng)方法。
注意:Observer是個接口,Observable是個類。
RxJava中定義的事件方法:
onNext(),普通事件,按照隊列依次進行處理.
onComplete(),事件隊列完結(jié)時調(diào)用該方法
onError(),事件處理過程中出現(xiàn)異常時,onError()觸發(fā),同時隊列終止,不再有事件發(fā)出.
onSubscribe(),RxJava 2.0 中新增的,傳遞參數(shù)為Disposable,可用于切斷接收事件
讓Observable (被觀察者)開啟子線程執(zhí)行耗操作,完成耗時操作后,觸發(fā)回調(diào),通知Observer (觀察者)進行主線程UI更新
2,簡單使用步驟:
步驟:
創(chuàng)建數(shù)據(jù)發(fā)射源,上游Observable
創(chuàng)建數(shù)據(jù)接收處,下游Observer
數(shù)據(jù)源關(guān)聯(lián)接收處,上游銜接下游!
3,Observable
數(shù)據(jù)發(fā)射源,可觀察的,被觀察的,
Observable有兩種形式啟動形式:
1熱啟動Observable任何時候都會發(fā)送消息,即使沒有任何觀察者監(jiān)聽它。
2冷啟動Observable只有在至少有一個訂閱者的時候才會發(fā)送消息
Observable的幾種創(chuàng)建方式:
01,just()方式
使用just( ),將創(chuàng)建一個Observable并自動調(diào)用onNext( )發(fā)射數(shù)據(jù)。
也就是通過just( )方式 直接觸發(fā)onNext(),just中傳遞的參數(shù)將直接在Observer的onNext()方法中接收到。02,fromIterable()方式
使用fromIterable(),遍歷集合,發(fā)送每個item.多次自動調(diào)用onNext()方法,每次傳入一個item.
注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的實現(xiàn)類都可以作為Iterable對象直接傳入fromIterable() 方法。03,defer()方式
當(dāng)觀察者訂閱時,才創(chuàng)建Observable,并且針對每個觀察者創(chuàng)建都是一個新的Observable.
通過Callable中的回調(diào)方法call(),決定使用以何種方式來創(chuàng)建這個Observable對象,當(dāng)訂閱后,發(fā)送事件.04,interval( )方式
創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable,可用作定時器。按照固定時間間隔來調(diào)用onNext()方法。05,timer( )方式
通過此種方式創(chuàng)建一個Observable,它在一個給定的延遲后發(fā)射一個特殊的值,即表示延遲指定時間后,調(diào)用onNext()方法。06,range( )方式,range(x,y)
創(chuàng)建一個發(fā)射特定整數(shù)序列的Observable,第一個參數(shù)x為起始值,第二個y為發(fā)送的個數(shù),如果y為0則不發(fā)送,y為負(fù)數(shù)則拋異常。
range(1,5)
上述表示發(fā)射1到5的數(shù)。即調(diào)用5次Next()方法,依次傳入1-5數(shù)字。07,repeat( )方式
創(chuàng)建一個Observable,該Observable的事件可以重復(fù)調(diào)用。
部分方法介紹:
表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)
Disposable subscribe()
表示下游只關(guān)心onNext事件,其他不管
Disposable subscribe(Consumer<? super T> onNext)
表示下游只關(guān)心onNext事件,onError事件
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
表示只關(guān)心onNext事件,onError事件,onComplete事件
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
表示處理所有事件
subscribe(Observer<? super T> observer)
ObservableEmitter
Emitter是發(fā)射器的意思,就是用來發(fā)出事件的,它可以發(fā)出三種類型的事件
- 通過調(diào)用onNext(T value),發(fā)出next事件
- 通過調(diào)用onComplete(),發(fā)出complete事件
- 通過調(diào)用onError(Throwable error),發(fā)出error事件
注意事項: - onComplete和onError唯一并且互斥
- 發(fā)送多個onComplete, 第一個onComplete接收到,就不再接收了.
- 發(fā)送多個onError, 則收到第二個onError事件會導(dǎo)致程序會崩潰.
- 不可以隨意亂七八糟發(fā)射事件,需要滿足一定的規(guī)則:
- 上游可以發(fā)送無限個onNext, 下游也可以接收無限個onNext.
- 當(dāng)上游發(fā)送了一個onComplete后, 上游onComplete之后的事件將會繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.
- 上游發(fā)送了一個onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
- 上游可以不發(fā)送onComplete或onError.
- 最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError
Disposable
一次性,它理解成兩根管道之間的一個機關(guān), 當(dāng)調(diào)用它的dispose()方法時, 它就會將兩根管道切斷, 從而導(dǎo)致下游收不到事件.
在RxJava中,用它來切斷Observer(觀察者)與Observable(被觀察者)之間的連接,當(dāng)調(diào)用它的dispose()方法時, 它就會將Observer(觀察者)與Observable(被觀察者)之間的連接切斷, 從而導(dǎo)致Observer(觀察者)收不到事件。
注意:
調(diào)用dispose()并不會導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游會繼續(xù)發(fā)送剩余的事件
我們讓上游依次發(fā)送1,2,3,complete,4,在下游收到第二個事件之后, 切斷水管, 看看運行結(jié)果
Disposable的對象通過觀察者獲得,具體分為兩種方式
1,Observer接口
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//此方法接收到Disposable的實例!
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
通過創(chuàng)建Observer(觀察者)接口,重寫onSubscribe方法,當(dāng)訂閱后,建立與Observable(被觀察者)的聯(lián)系后,在onSubscribe(Disposable d)方法中便可以獲得Disposable對象。
2.Consumer等其他函數(shù)式接口
Disposable disposable = Observable.just("你好").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
當(dāng)調(diào)用Observable的subscribe()方法后直接返回一個Disposable 對象
6,線程控制——Scheduler
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。
Schedulers.immediate():
直接在當(dāng)前線程運行,相當(dāng)于不指定線程。這是默認(rèn)的Scheduler。
Schedulers.newThread():
總是啟用新線程,并在新線程執(zhí)行操作。
Schedulers.io(): I/O
操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的Scheduler。行為模式和newThread()差不多,區(qū)別在于io()的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下io()比newThread()更有效率。不要把計算工作放在io()中,可以避免創(chuàng)建不必要的線程。
Schedulers.computation():
計算所使用的Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個Scheduler使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在computation()中,否則 I/O 操作的等待時間會浪費 CPU。
AndroidSchedulers.mainThread(),
Android專用線程,指定操作在主線程運行。
如何切換線程呢?RxJava中提供了兩個方法:
subscribeOn() 和 observeOn() ,
兩者的不同點在于:
subscribeOn():
指定subscribe()訂閱所發(fā)生的線程,或者叫做事件產(chǎn)生的線程。
observeOn():
指定Observer所運行在的線程,即onNext()執(zhí)行的線程?;蛘呓凶鍪录M的線程。
7,以Consumer為例,我們可以實現(xiàn)簡便式的觀察者模式
Observable.just("hello").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
其中Consumer中的accept()方法接收一個來自O(shè)bservable的單個值。Consumer就是一個觀察者。其他函數(shù)式接口可以類似應(yīng)用
8,RxJava中的操作符
01,操作符就是用于在Observable和最終的Observer之間,通過轉(zhuǎn)換Observable為其他觀察者對象的過程,修改發(fā)出的事件,
最終將最簡潔的數(shù)據(jù)傳遞給Observer對象.
每次調(diào)用一次操作符,就進行一次被觀察者對象的改變,同時將需要傳遞的數(shù)據(jù)進行轉(zhuǎn)變,最終Observer對象獲得想要的數(shù)據(jù)。
以網(wǎng)絡(luò)加載為例,我們通過Observable開啟子線程,進行一些網(wǎng)絡(luò)請求獲取數(shù)據(jù)的操作,獲得到網(wǎng)絡(luò)數(shù)據(jù)后,然后通過操作符進行轉(zhuǎn)換,獲得我們想要的形式的數(shù)據(jù),然后傳遞給Observer對象
02,比較常用的操作符:
- map()操作符
map()操作符,就是把原來的Observable對象轉(zhuǎn)換成另一個Observable對象,同時將傳輸?shù)臄?shù)據(jù)進行一些靈活的操作,方便Observer獲得想要的數(shù)據(jù)形式。
舉例:
Observable<Integer> observable = Observable
.just("hello")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.length();
}
});
- flatMap()操作符
flatMap()對于數(shù)據(jù)的轉(zhuǎn)換比map()更加徹底,如果發(fā)送的數(shù)據(jù)是集合,flatmap()重新生成一個Observable對象,并把數(shù)據(jù)轉(zhuǎn)換成Observer想 要的數(shù)據(jù)形式。它可以返回任何它想返回的Observable對象。
舉例:
Observable.just(list)
.flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
});
- filter()操作符
filter()操作符根據(jù)它的test()方法中,根據(jù)自己想過濾的數(shù)據(jù)加入相應(yīng)的邏輯判斷,返回true則表示數(shù)據(jù)滿足條件,返回false則表示數(shù)據(jù)需要被過濾。
最后過濾出的數(shù)據(jù)將加入到新的Observable對象中,方便傳遞給Observer想要的數(shù)據(jù)形式。
舉例:
Observable
.just(list)
.flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).filter(new Predicate<Object>() {
@Override
public boolean test(Object s) throws Exception {
String newStr = (String) s;
if (newStr.charAt(5) - '0' > 5) {
return true;
}
return false;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println((String)o);
}
});
- take()操作符
輸出最多指定數(shù)量的結(jié)果.(接收指定數(shù)量的結(jié)果)
舉例:
Observable.just(new ArrayList<String>(){
{
for (int i = 0; i < 8; i++) {
add("data"+i);
}
}
}).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
DemonstrateUtil.showLogResult(s.toString());
}
});
- doOnNext()
允許我們在每次輸出一個元素之前做一些額外的事情
舉例:
Observable.just(new ArrayList<String>(){
{
for (int i = 0; i < 6; i++) {
add("data"+i);
}
}
}).flatMap(new Function<List<String>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).take(5).doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
DemonstrateUtil.showLogResult("額外的準(zhǔn)備工作!");
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object s) throws Exception {
DemonstrateUtil.showLogResult(s.toString());
}
});
8,Flowable的理解
Flowable是一個被觀察者,與Subscriber(觀察者)配合使用,解決Backpressure問題
Backpressure(背壓)。所謂背壓,即生產(chǎn)者的速度大于消費者的速度帶來的問題。
什么情況下才會產(chǎn)生Backpressure問題?
- 1.如果生產(chǎn)者和消費者在一個線程的情況下,無論生產(chǎn)者的生產(chǎn)速度有多快,每生產(chǎn)一個事件都會通知消費者,等待消費者消費完畢,再生產(chǎn)下一個事件。
所以在這種情況下,根本不存在Backpressure問題。即同步情況下,Backpressure問題不存在。 - 2.如果生產(chǎn)者和消費者不在同一線程的情況下,如果生產(chǎn)者的速度大于消費者的速度,就會產(chǎn)生Backpressure問題。
即異步情況下,Backpressure問題才會存在。
現(xiàn)象演示說明:
被觀察者是事件的生產(chǎn)者,觀察者是事件的消費者.假如生產(chǎn)者無限生成事件,而消費者以很緩慢的節(jié)奏來消費事件,會造成事件無限堆積,形成背壓,最后造成OOM!
Flowable悠然而生,專門用來處理這類問題。
Flowable是為了應(yīng)對Backpressure而產(chǎn)生的。Flowable是一個被觀察者,
與Subscriber(觀察者)配合使用,解決Backpressure問題。
注意:處理Backpressure的策略僅僅是處理Subscriber接收事件的方式,并不影響Flowable發(fā)送事件的方法。
即使采用了處理Backpressure的策略,F(xiàn)lowable原來以什么樣的速度產(chǎn)生事件,現(xiàn)在還是什么樣的速度不會變化,主要處理的是Subscriber接收事件的方式。
處理Backpressure問題的策略,或者來解決Backpressure問題
BackpressureStrategy.ERROR
如果緩存池溢出,就會立刻拋出MissingBackpressureException異常
request()用來向生產(chǎn)者申請可以消費的事件數(shù)量,這樣我們便可以根據(jù)本身的消費能力進行消費事件.
雖然并不限制向request()方法中傳入任意數(shù)字,但是如果消費者并沒有這么多的消費能力,依舊會造成資源浪費,最后產(chǎn)生OOM
at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:33)
在異步調(diào)用時,RxJava中有個緩存池,用來緩存消費者處理不了暫時緩存下來的數(shù)據(jù),緩存池的默認(rèn)大小為128,即只能緩存128個事件。
無論request()中傳入的數(shù)字比128大或小,緩存池中在剛開始都會存入128個事件。
當(dāng)然如果本身并沒有這么多事件需要發(fā)送,則不會存128個事件。
應(yīng)用舉例:
BackpressureStrategy.BUFFER
是把RxJava中默認(rèn)的只能存128個事件的緩存池?fù)Q成一個大的緩存池,支持存更多的數(shù)據(jù).
消費者通過request()即使傳入一個很大的數(shù)字,生產(chǎn)者也會生產(chǎn)事件,并將處理不了的事件緩存.
注意:
這種方式任然比較消耗內(nèi)存,除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會產(chǎn)生OOM。
BUFFER要慎用
BackpressureStrategy.DROP
顧名思義,當(dāng)消費者處理不了事件,就丟棄!
例如,當(dāng)數(shù)據(jù)源創(chuàng)建了200個事件,先不進行消費臨時進行緩存實際緩存128個,我們第一次申請消費了100個,再次申請消費100個,
那么實際只消費了128個,而其余的72個被丟棄了!BackpressureStrategy.LATEST
LATEST與DROP功能基本一致,當(dāng)消費者處理不了事件,就丟棄!
唯一的區(qū)別就是LATEST總能使消費者能夠接收到生產(chǎn)者產(chǎn)生的最后一個事件。
例如,當(dāng)數(shù)據(jù)源創(chuàng)建了200個事件,先不進行消費臨時進行緩存,我們第一次申請消費了100個,再次申請消費100個,
那么實際只消費了129個,而其余的71個被丟棄了,但是第200個(最后一個)會被消費.
-
BackpressureStrategy.MISSING
生產(chǎn)的事件沒有進行緩存和丟棄,下游接收到的事件必須進行消費或者處理!在RxJava中會經(jīng)常遇到一種情況就是被觀察者發(fā)送消息十分迅速以至于觀察者不能及時的響應(yīng)這些消息
舉例:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
System.out.println(integer);
}
});
被觀察者是事件的生產(chǎn)者,觀察者是事件的消費者。上述例子中可以看出生產(chǎn)者無限生成事件,而消費者每2秒才能消費一個事件,這會造成事件無限堆積,最后造成OOM。
Flowable就是由此產(chǎn)生,專門用來處理這類問題


參考資料:
Github上RxJava的項目地址:
https://github.com/ReactiveX/RxJava
技術(shù)文檔Api:
http://reactivex.io/RxJava/javadoc/
RxAndroid,用于 Android 開發(fā):
https://github.com/ReactiveX/RxAndroid
簡書博客推薦:
http://www.itdecent.cn/p/ba61c047c230