一、Rxjava
1.簡介
Rxjava 是一個實(shí)現(xiàn)異步操作的庫
優(yōu)勢: 使邏輯變得簡潔,而不是代碼量得增減,當(dāng)邏輯變得復(fù)雜時(shí),RxJava能保證從上到下仍然是鏈?zhǔn)秸{(diào)用,沒有任何嵌套。
github :
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
2.概覽
Rxjava基本原理就是觀察者(ObServer)通過(Subscribe)訂閱被觀察者(Observable)
RxJava有四個基本概念:Observable(被觀察者),Observer(觀察者),subscribe(訂閱),事件。Observable和Observer通過subscribe達(dá)成訂閱關(guān)系,從而 Observable可以在需要的時(shí)候發(fā)出事件來通知 Observer。
RxJava定義了三個事件回調(diào)方法:
(1) onNext():相當(dāng)于onClick(),每個單獨(dú)的事件處理回調(diào)方法
(2) onCompleted(): 事件隊(duì)列完結(jié)。RxJava不僅把每個事件單獨(dú)處理,還會把它看成一個事件隊(duì)列。當(dāng)不再有新的onNext()發(fā)出時(shí),需要觸發(fā)onCompleted()作為結(jié)束標(biāo)志。
(3) onError(): 事件隊(duì)列異常,在事件處理過程中出異常時(shí),onError()
會被觸發(fā),同時(shí)隊(duì)列自動終止,不允許再有事件發(fā)出。
在一個正確運(yùn)行的事件序列中, onCompleted()和 onError()有且只有一個,并且是事件序列中的最后一個。
在 RxJava 2.x 中,Observable 用于訂閱 Observer,不再支持背壓(1.x 中可以使用背壓策略),而 Flowable 用于訂閱 Subscriber , 是支持背壓(Backpressure)的。
Observable ( 被觀察者 ) / Observer ( 觀察者 )
Flowable (被觀察者)/ Subscriber (觀察者)
二、分析
從過程開始
2.1 創(chuàng)建Observer
Observer 即觀察者,它決定事件觸發(fā)的時(shí)候?qū)⒂性鯓拥男袨椤?梢酝ㄟ^創(chuàng)建Observer對象和創(chuàng)建Subscriber對象來實(shí)現(xiàn)觀察者。
2.1.1 創(chuàng)建Observer對象:
Observer<String> observer=new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
2.1.2 創(chuàng)建Subscriber對象:
Subscriber<String> subscriber=new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Subscriber實(shí)現(xiàn)了 Observer,并對 Observer 接口進(jìn)行了一些擴(kuò)展,實(shí)質(zhì)上,在 RxJava 的 subscribe 過程中,Observer也總是會先被轉(zhuǎn)換成一個 Subscriber再使用。
//轉(zhuǎn)換成Subscriber
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
2.2 創(chuàng)建Observable
2.2.1 使用 create方法創(chuàng)建: observable 即被觀察者,它決定什么時(shí)候觸發(fā)事件以及觸發(fā)怎樣的事件。RxJava使用create方法來創(chuàng)建一個Observable,其中OnSubcribe對象作為參數(shù)傳入。當(dāng)Observable被訂閱時(shí),OnSubcribe對象的call()方法會被自動調(diào)用,事件會按照設(shè)定依次觸發(fā)。
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列
2.2.2 使用just(T...) 創(chuàng)建: 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
2..2.3 使用from(T[]) /from(Iterable<? extends T>)創(chuàng)建:將傳入的數(shù)組或者Iterable拆分成具體的對象后,依次發(fā)送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
2.3 Subcribe(訂閱)
創(chuàng)建了 Observable 和 Observer之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來。
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(邏輯實(shí)現(xiàn)):
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
主要做了三件事:
(1)調(diào)用 subscriber.onStart();
(2)調(diào)用onSubscribe.call(subscriber);
(3)將傳入的 Subscriber 作為 Subscription返回。
2.4RxJava不完整定義的回調(diào)(Action0,Action1)。
Action0: 是一個接口,只有一個call()方法,并且該方法無參無返回值。
public interface Action0 extends Action {
void call();
}
4.1.2 Action1: 是一個接口,只有一個call()方法,并且該方法有一個參數(shù)返回值。
public interface Action1<T> extends Action {
void call(T t);
}
2.5線程控制(Scheduler)
2.5.1 Scheduler API
RxJava 遵循的是線程不變的原則:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費(fèi)事件。
切換線程需要通過Scheduler,RxJava內(nèi)置的Schedulers。
Schedulers.immediate():直接在當(dāng)前線程運(yùn)行。
Schedulers.newThread(): 啟動新線程,并在新線程執(zhí)行操作。
Schedulers.io(): I/O操作使用的Scheduler。
Schedulers.computation():計(jì)算所使用的Scheduler,CPU密集型計(jì)算,即不會被I/O操作限制性能的操作,不要把I/O操作放在這個computation()中,否則這個I/O操作的等待時(shí)間會浪費(fèi)CPU。
AndroidSchedulers.mainThread():使 操作執(zhí)行在Android的主線程。
subscribeOn(): 指定subscribe發(fā)生的線程,即事件生產(chǎn)線程。
observeOn(): 指定Subscriber所運(yùn)行的線程,即事件消費(fèi)線程。
2.5.2線程控制:Scheduler (二)
observeOn(): 可以多次調(diào)用;指定的是它之后的操作所在的線程。observeOn()指定的是Subscriber的線程,而這個Subscriber并不一定是subscribe()中的Subscriber,而是observeOn()執(zhí)行時(shí),當(dāng)前Obserable所對應(yīng)的Subscriber。
subscribeOn(): subscribeOn()的位置放在哪里都可以,但它是只能調(diào)用一次的。
subscribeOn()和observeOn()的原理
subscribeOn 的線程切換發(fā)生在OnSubcribe中,即在它通知上一級 OnSubscribe 時(shí),這時(shí)事件還沒有開始發(fā)送,因此subscribeOn()的線程控制可以從事件發(fā)出的開端就造成影響;
observeOn() 的線程切換則發(fā)生在它內(nèi)建的 Subscriber中,即發(fā)生在它即將給下一級 Subscriber 發(fā)送事件時(shí),因此 observeOn()控制的是它后面的線程。
doOnSubscribe()
默認(rèn)情況下doOnSubscribe()執(zhí)行在subscribe()線程;但是如果doOnSubscribe()后面有subscribeOn()的話,它將執(zhí)行在離它最近的 subscribeOn()所指定的線程。
doOnSubscribe()主要是在事件流程之前進(jìn)行一些初始化工作,和Subscriber的 onStart()方法左右類似1;不同的是onStart()方法不能指定線程,它執(zhí)行在 subscribe()被調(diào)用時(shí)的線程,這會對onStart()中代碼有要求。
三、其他
1、RxJava和RxAndroid的區(qū)別: RxAndroid是特別針對Android這個平臺增加了更新UI的調(diào)度器,其余的類都是相同的。
- Subscriber和Observer的區(qū)別
onStart(): 這個是Subscriber新增的方法,用于subscribe剛開始,事件還沒發(fā)送之前,可以用于做一些準(zhǔn)備工作。例如事件清零。onStart方法總是在subscribe的線程發(fā)生。
unSubscrible(): 這個是Subscribe實(shí)現(xiàn)的Subscription接口中的方法,用于取消訂閱,因?yàn)閟ubscribe后, Observable會持有一個Subscriber的引用,這個引用如果不能及時(shí)被釋放,將有內(nèi)存泄露的風(fēng)險(xiǎn)。所以最好保持一個原則:要在不再使用的時(shí)候盡快在合適的地方(例如 onPause() onStop()等方法中)調(diào)用unsubscribe()來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。