【JAVA RxJava 2.x】
為什么要學(xué) RxJava?
RxJava 最大的優(yōu)點也是簡潔,但它不止是簡潔,而且是** 隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡潔 **。
咳咳,要例子,猛戳這里:給 Android 開發(fā)者的 RxJava 詳解
什么是響應(yīng)式編程
響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。
數(shù)據(jù)流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合并為一條新的流。
響應(yīng)式編程的一個關(guān)鍵概念是事件。事件可以被等待,可以觸發(fā)過程,也可以觸發(fā)其它事件。事件是唯一的以合適的方式將我們的現(xiàn)實世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的,當(dāng)我們的天氣app從服務(wù)端獲取到新的天氣數(shù)據(jù)后,我們需要更新app上展示天氣信息的UI;汽車上的車道偏移系統(tǒng)探測到車輛偏移了正常路線就會提醒駕駛者糾正,就是是響應(yīng)事件。
響應(yīng)式編程最通用的一個場景是UI:我們的移動App必須做出對網(wǎng)絡(luò)調(diào)用、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。
簡介
RxJava 2.x 已經(jīng)按照 Reactive-Streams specification 規(guī)范完全的重寫了,maven也被放在了io.reactivex.rxjava2:rxjava:2.x.y 下,所以 RxJava 2.x 獨立于 RxJava 1.x 而存在,而隨后官方宣布的將在一段時間后終止對 RxJava 1.x 的維護(hù),所以對于熟悉 RxJava 1.x 的老司機自然可以直接看一下 2.x 的文檔和異同就能輕松上手了。
而對于不熟悉的年輕司機,此處有案例:https://github.com/nanchen2251/RxJava2Examples
你只需要在 build.gradle 中加上:compile 'io.reactivex.rxjava2:rxjava:2.1.1'(2.1.1為寫此文章時的最新版本)
接口變化
RxJava 2.x 擁有了新的特性,其依賴于4個基礎(chǔ)接口,它們分別是
- Publisher
- Subscriber
- Subscription
- Processor
其中最核心的莫過于 Publisher 和 Subscriber。
Publisher 可以發(fā)出一系列的事件,而 Subscriber 負(fù)責(zé)和處理這些事件。
其中用的比較多的自然是 Publisher 的 Flowable,它支持背壓。關(guān)于背壓給個簡潔的定義就是:
背壓是指在異步場景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略。
簡而言之,背壓是流速控制的一種策略。有興趣的可以看一下官方對于背壓的講解。
可以明顯地發(fā)現(xiàn),RxJava 2.x 最大的改動就是對于 backpressure 的處理,為此將原來的 Observable 拆分成了新的 Observable 和 Flowable,同時其他相關(guān)部分也同時進(jìn)行了拆分。
觀察者模式
RxJava 以觀察者模式為骨架,在 2.0 中依舊如此。
不過此次更新中,出現(xiàn)了兩種觀察者模式:
-
Observable( 被觀察者 ) /Observer( 觀察者 ) -
Flowable(被觀察者)/Subscriber(觀察者)

在 RxJava 2.x 中,Observable 用于訂閱 Observer,不再支持背壓(1.x 中可以使用背壓策略),而 Flowable 用于訂閱 Subscriber , 是支持背壓(Backpressure)的。
Observable
RxJava 三部曲
第一步:初始化 Observable
第二步:初始化 Observer
第三步:建立訂閱關(guān)系

Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable emit 1" + "\n");
e.onNext(1);
Log.e(TAG, "Observable emit 2" + "\n");
e.onNext(2);
Log.e(TAG, "Observable emit 3" + "\n");
e.onNext(3);
e.onComplete();
Log.e(TAG, "Observable emit 4" + "\n" );
e.onNext(4);
}
}).subscribe(new Observer<Integer>() { // 第三步:訂閱
// 第二步:初始化Observer
private int I;
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
mDisposable = d;
}
@Override
public void onNext(@NonNull Integer integer) {
I++;
if (i == 2) {
// 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件
mDisposable.dispose();
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete" + "\n" );
}
});
與 1.x區(qū)別
RxJava 2.x 與 1.x 還是存在著一些區(qū)別的:
- 首先,創(chuàng)建 Observable 時,回調(diào)的是
ObservableEmitter,字面意思即發(fā)射器,并且直接 throws Exception。 - 其次,在創(chuàng)建的 Observer 中,也多了一個回調(diào)方法:
onSubscribe,傳遞參數(shù)為Disposable,Disposable相當(dāng)于 RxJava 1.x 中的Subscription, 用于解除訂閱??梢钥吹绞纠a中,在 i 自增到 2 的時候,訂閱關(guān)系被切斷。
07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false
07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1
07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1
07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2
07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2
07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true
07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3
07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
簡化訂閱
當(dāng)然,我們的 RxJava 2.x 也為我們保留了簡化訂閱方法,我們可以根據(jù)需求,進(jìn)行相應(yīng)的簡化訂閱,只不過傳入對象改為了 Consumer。
Consumer 即消費者,用于接收單個值,
BiConsumer 則是接收兩個值,
Function 用于變換對象,
Predicate 用于判斷。
這些接口命名大多參照了 Java 8。
線程調(diào)度
關(guān)于線程切換這點,RxJava 1.x 和 RxJava 2.x 的實現(xiàn)思路是一樣的。
subscribeOn
同 RxJava 1.x 一樣,subscribeOn 用于指定 subscribe() 時所發(fā)生的線程,從源碼角度可以看出,內(nèi)部線程調(diào)度是通過 ObservableSubscribeOn來實現(xiàn)的。
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn 的核心源碼在 subscribeActual 方法中,通過代理的方式使用 SubscribeOnObserver 包裝 Observer 后,設(shè)置 Disposable 來將 subscribe 切換到 Scheduler 線程中。
observeOn
observeOn 方法用于指定下游 Observer 回調(diào)發(fā)生的線程。
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
線程切換注意
RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應(yīng)手,但其中也有些需要注意的地方。
- 簡單地說,
subscribeOn()指定的就是發(fā)射事件的線程,observerOn指定的就是訂閱者接收事件的線程。 - 多次指定發(fā)射事件的線程只有第一次指定的有效,也就是說多次調(diào)用
subscribeOn()只有第一次的有效,其余的會被忽略。 - 但多次指定訂閱者接收線程是可以的,也就是說每調(diào)用一次
observerOn(),下游的線程就會切換一次。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
}
});
輸出:
07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
實例代碼中,分別用 Schedulers.newThread() 和 Schedulers.io() 對發(fā)射線程進(jìn)行切換,并采用 observeOn(AndroidSchedulers.mainThread() 和 Schedulers.io() 進(jìn)行了接收線程的切換??梢钥吹捷敵鲋邪l(fā)射線程僅僅響應(yīng)了第一個 newThread,但每調(diào)用一次 observeOn() ,線程便會切換一次,因此如果我們有類似的需求時,便知道如何處理了。
內(nèi)置線程
RxJava 中,已經(jīng)內(nèi)置了很多線程選項供我們選擇,例如有:
-
Schedulers.io()代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作; -
Schedulers.computation()代表CPU計算密集型的操作, 例如需要大量計算的操作; -
Schedulers.newThread()代表一個常規(guī)的新線程; -
AndroidSchedulers.mainThread()代表Android的主線程
這些內(nèi)置的 Scheduler 已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應(yīng)該使用內(nèi)置的這些選項,而 RxJava 內(nèi)部使用的是線程池來維護(hù)這些線程,所以效率也比較高。
操作符
關(guān)于操作符,在官方文檔中已經(jīng)做了非常完善的講解,并且筆者前面的系列教程中也著重講解了絕大多數(shù)的操作符作用,這里受于篇幅限制,就不多做贅述,只挑選幾個進(jìn)行實際情景的講解。
map

map 操作符可以將一個 Observable 對象通過某種關(guān)系轉(zhuǎn)換為另一個Observable 對象。
在 2.x 中和 1.x 中作用幾乎一致,不同點在于:2.x 將 1.x 中的 Func1 和 Func2 改為了 Function 和 BiFunction。
采用 map 操作符進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)解析
想必大家都知道,很多時候我們在使用 RxJava 的時候總是和 Retrofit 進(jìn)行結(jié)合使用,而為了方便演示,這里我們就暫且采用 OkHttp3 進(jìn)行演示,配合 map,doOnNext ,線程切換進(jìn)行簡單的網(wǎng)絡(luò)請求:
1)通過 Observable.create() 方法,調(diào)用 OkHttp 網(wǎng)絡(luò)請求;
2)通過 map 操作符集合 gson,將 Response 轉(zhuǎn)換為 bean 類;
3)通過 doOnNext() 方法,解析 bean 中的數(shù)據(jù),并進(jìn)行數(shù)據(jù)庫存儲等操作;
4)調(diào)度線程,在子線程中進(jìn)行耗時操作任務(wù),在主線程中更新 UI ;
5)通過 subscribe(),根據(jù)請求成功或者失敗來更新 UI 。
Observable
.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
})
.map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "成功:" + data.toString() + "\n");
},new Consumer<Throwable>()
{
@Override
public void accept (@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "失?。? + throwable.getMessage() + "\n");
}
});
concat

concat 可以做到不交錯的發(fā)射兩個甚至多個 Observable 的發(fā)射事件,并且只有前一個 Observable 終止(onComplete) 后才會訂閱下一個 Observable。
采用 concat 操作符先讀取緩存再通過網(wǎng)絡(luò)請求獲取數(shù)據(jù)
在實際應(yīng)用中,很多時候(對數(shù)據(jù)操作不敏感時)都需要我們先讀取緩存的數(shù)據(jù),如果緩存沒有數(shù)據(jù),再通過網(wǎng)絡(luò)請求獲取,隨后在主線程更新我們的UI。
concat 操作符簡直就是為我們這種需求量身定做。
利用 concat 的必須調(diào)用 onComplete 后才能訂閱下一個 Observable 的特性,我們就可以先讀取緩存數(shù)據(jù),倘若獲取到的緩存數(shù)據(jù)不是我們想要的,再調(diào)用 onComplete() 以執(zhí)行獲取網(wǎng)絡(luò)數(shù)據(jù)的 Observable,如果緩存數(shù)據(jù)能應(yīng)我們所需,則直接調(diào)用 onNext(),防止過度的網(wǎng)絡(luò)請求,浪費用戶的流量。
Observable<FoodList> cache =
Observable
.create(new ObservableOnSubscribe<FoodList>() {
@Override
public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
Log.e(TAG, "create當(dāng)前線程:" + Thread.currentThread().getName());
FoodList data = CacheManager.getInstance().getFoodListData();
// 在操作符 concat 中,只有調(diào)用 onComplete 之后才會執(zhí)行下一個 Observable
if (data != null) { // 如果緩存數(shù)據(jù)不為空,則直接讀取緩存數(shù)據(jù),而不讀取網(wǎng)絡(luò)數(shù)據(jù)
isFromNet = false;
Log.e(TAG, "\nsubscribe: 讀取緩存數(shù)據(jù):");
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取緩存數(shù)據(jù):\n");
}
});
e.onNext(data);
} else {
isFromNet = true;
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):\n");
}
});
Log.e(TAG, "\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):");
e.onComplete();
}
}
});
Observable<FoodList> network =
Rx2AndroidNetworking
.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 10 + "")
.build()
.getObjectObservable(FoodList.class);
// 兩個 Observable 的泛型應(yīng)當(dāng)保持一致
Observable
.concat(cache, network)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList tngouBeen) throws Exception {
Log.e(TAG, "subscribe 成功:" + Thread.currentThread().getName());
if (isFromNet) {
mRxOperatorsText.append("accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n");
Log.e(TAG, "accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n" + tngouBeen.toString());
CacheManager.getInstance().setFoodListData(tngouBeen);
}
mRxOperatorsText.append("accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString() + "\n");
Log.e(TAG, "accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失敗:" + Thread.currentThread().getName());
Log.e(TAG, "accept: 讀取數(shù)據(jù)失敗:" + throwable.getMessage());
mRxOperatorsText.append("accept: 讀取數(shù)據(jù)失?。? + throwable.getMessage() + "\n");
}
});
有時候我們的緩存可能還會分為 memory 和 disk ,實際上都差不多,無非是多寫點 Observable ,然后通過 concat 合并即可。
flatMap 多個網(wǎng)絡(luò)請求依次依賴
想必這種情況也在實際情況中比比皆是,例如用戶注冊成功后需要自動登錄,我們只需要先通過注冊接口注冊用戶信息,注冊成功后馬上調(diào)用登錄接口進(jìn)行自動登錄即可。
我們的 flatMap 恰好解決了這種應(yīng)用場景,flatMap 操作符可以將一個發(fā)射數(shù)據(jù)的 Observable 變換為多個 Observables ,然后將它們發(fā)射的數(shù)據(jù)合并后放到一個單獨的 Observable,利用這個特性,我們很輕松地達(dá)到了我們的需求。
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 1 + "")
.build()
.getObjectObservable(FoodList.class) // 發(fā)起獲取食品列表的請求,并解析到FootList
.subscribeOn(Schedulers.io()) // 在io線程進(jìn)行網(wǎng)絡(luò)請求
.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請求結(jié)果
.doOnNext(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList foodList) throws Exception {
// 先根據(jù)獲取食品列表的響應(yīng)結(jié)果做一些操作
Log.e(TAG, "accept: doOnNext :" + foodList.toString());
mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
}
})
.observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請求
.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
@Override
public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
.addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
.build()
.getObjectObservable(FoodDetail.class);
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodDetail>() {
@Override
public void accept(@NonNull FoodDetail foodDetail) throws Exception {
Log.e(TAG, "accept: success :" + foodDetail.toString());
mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: error :" + throwable.getMessage());
mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
}
});
zip 多個接口數(shù)據(jù)共同更新 UI
在實際應(yīng)用中,我們極有可能會在一個頁面顯示的數(shù)據(jù)來源于多個接口,這時候我們的 zip 操作符為我們排憂解難。
zip 操作符可以將多個 Observable 的數(shù)據(jù)結(jié)合為一個數(shù)據(jù)源再發(fā)射出去。
Observable<MobileAddress> observable1 =
Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class);
Observable<CategoryResult> observable2 =
Network.getGankApi()
.getCategoryData("Android", 1, 1);
Observable
.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
@Override
public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
return "合并后的數(shù)據(jù)為:手機歸屬地:" + mobileAddress.getResult().getMobilearea() + "人名:" + categoryResult.results.get(0).who;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 失?。? + throwable + "\n");
}
});
interval 心跳間隔任務(wù)
想必即時通訊等需要輪訓(xùn)的任務(wù)在如今的 APP 中已是很常見,而 RxJava 2.x 的 interval 操作符可謂完美地解決了我們的疑惑。
這里就簡單的意思一下輪訓(xùn)。
private Disposable mDisposable;
@Override
protected void doSomething() {
mDisposable =
Flowable
.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : " + aLong);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: 設(shè)置文本 :" + aLong);
mRxOperatorsText.append("accept: 設(shè)置文本 :" + aLong + "\n");
}
});
}
/**
* 銷毀時停止心跳
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null) {
mDisposable.dispose();
}
}
RxJava 1.x 如何平滑升級到 RxJava 2.x?
由于 RxJava 2.x 變化較大無法直接升級,幸運的是,官方為我們提供了 RxJava2Interrop 這個庫,可以方便地把 RxJava 1.x 升級到 RxJava 2.x,或者將 RxJava 2.x 轉(zhuǎn)回到 RxJava 1.x。
與RxJava 1.x的差異
給Android開發(fā)者的 RxJava 詳解:該文詳細(xì)地為大家講解了 RxJava 的優(yōu)勢、原理以及使用方式和適用情景,一定被眾多的 Android 開發(fā)者視為神器??上?,文章歷史比較久遠(yuǎn),基本都是講解的 RxJava 1.x了。
因為 RxJava 2.x 是按照 Reactive-Streams specification 規(guī)范完全的重寫的,完全獨立于 RxJava 1.x 而存在,它改變了以往 RxJava 的用法。
Nulls
這是一個很大的變化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允許我們在發(fā)射事件的時候傳入 null 值的,但現(xiàn)在我們的 2.x 不支持了。
傳入null會產(chǎn)生 NullPointerException 。這意味著 Observable<Void> 不再發(fā)射任何值,而是正常結(jié)束或者拋出空指針。
Flowable
在 RxJava 1.x 中關(guān)于介紹 backpressure 部分有一個小小的遺憾,那就是沒有用一個單獨的類,而是使用 Observable 。而在 2.x 中 Observable 不支持背壓了,將用一個全新的 Flowable 來支持背壓。
背壓:大概就是指在異步場景中,被觀察者發(fā)送事件的速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略。在差距太大的時候,我們的內(nèi)存會猛增,直到OOM。而我們的 Flowable 一定意義上可以解決這樣的問題,但其實并不能完全解決,這個后面可能會提到。
Single/Completable/Maybe
其實這三者都差不多,Single 顧名思義,只能發(fā)送一個事件,和 Observable接受可變參數(shù)完全不同。而 Completable 側(cè)重于觀察結(jié)果,而 Maybe 是上面兩種的結(jié)合體。也就是說,當(dāng)你只想要某個事件的結(jié)果(true or false)的時候,你可以使用這種觀察者模式。
線程調(diào)度相關(guān)
這一塊基本沒什么改動,但細(xì)心的小伙伴一定會發(fā)現(xiàn),RxJava 2.x 中已經(jīng)沒有了 Schedulers.immediate() 這個線程環(huán)境,還有 Schedulers.test()。
Function相關(guān)
熟悉 1.x 的小伙伴一定都知道,我們在1.x 中是有 Func1,Func2.....FuncN的,但 2.x 中將它們移除,而采用 Function 替換了 Func1,采用 BiFunction 替換了 Func 2..N。并且,它們都增加了 throws Exception,也就是說,媽媽再也不用擔(dān)心我們做某些操作還需要 try-catch 了。
其他操作符相關(guān)
如 Func1...N 的變化,現(xiàn)在同樣用 Consumer 和 BiConsumer 對 Action1 和 Action2 進(jìn)行了替換。后面的 Action 都被替換了,只保留了 ActionN。
官方對比
下面從官方截圖展示 2.x 相對 1.x 的改動細(xì)節(jié),僅供參考。








引用:
文章鏈接:
這可能是最好的RxJava 2.x 教程(完結(jié)版)
這可能是最好的RxJava 2.x 入門教程(一)
這可能是最好的RxJava 2.x 入門教程(二)
這可能是最好的RxJava 2.x 入門教程(三)
這可能是最好的RxJava 2.x 入門教程(四)
這可能是最好的RxJava 2.x 入門教程(五)
GitHub 同步代碼:https://github.com/nanchen2251/RxJava2Examples,主要包含基本的代碼封裝,RxJava 2.x所有操作符應(yīng)用場景介紹和實際應(yīng)用場景,后期除了RxJava可能還會增添其他東西。