其它文章
1、RxJava之一——一次性學會使用RxJava RxJava簡單的使用和使用它的好處
2、RxJava之二——Single和Subject 與Observable舉足輕重的類,雖然用的少,但應該知道
3、RxJava之三——RxJava 2.0 全部操作符示例
4、RxJava之四—— Lift()詳解 想要了解Operators,Lift()一定要學習
5、RxJava之五—— observeOn()與subscribeOn()的詳解Scheduler線程切換的原理
6、RxJava之六——RxBus 通過RxJava來替換EventBus
RxJava介紹文檔
1、介紹
作為初學者我們不去看定義,太苦澀難懂了。我們需要記住的是RxJava就是一個實現(xiàn)異步操作的庫。學習RxJava,我們需要把握兩點:觀察者模式和異步。
異步就是多線程、線程切換這些東西。我們先來了解一下觀察者模式。
1.1 觀察者模式
觀察者模式,最重要的就是Observable(被觀察者,事件源)和Observer(觀察者),這也是RxJava中最核心的。
觀察者模式定義了一種一對多的依賴關系,讓多個觀察者對象同時監(jiān)聽某一個主題對象。這個主題對象在狀態(tài)上發(fā)生變化時,會通知所有觀察者對象,讓它們能夠自動更新自己。
我們就不看這些冷冰冰的文字定義了,舉個例子來理解下:
我們按下開關,燈亮了。
在這個事件中:燈作為Observer觀察者,開關作為Observable被觀察者,燈透過電線來觀察開關的狀態(tài)來并做出相應的處理。
一個開關可以控制多個燈,這就是一對多的關系。換句話說:就是一個開關被多個燈觀察,當開關的狀態(tài)發(fā)生改變的時候,每個燈都可以做出自己的處理。
這就是觀察者模式。
1.2 幫助理解RxJava
- 開關(被觀察者)
作為事件的產(chǎn)生方(生產(chǎn)“開”和“關”這兩個事件),是主動的,是整個開燈事理流程的起點。
- 臺燈(觀察者)
作為事件的處理方(處理“燈亮”和“燈滅”這兩個事件),是被動的,是整個開燈事件流程的終點。
- 事件傳遞
在起點和終點之間,即事件傳遞的過程中事件是可以被加工,過濾,轉(zhuǎn)換,合并等等方式處理的。
2、RxJava事件處理流程
2.1 說明
RxJava也是基于觀察者模式來組建自己的程序邏輯的,就是構建被觀察者(Observable),觀察者(Observer),然后建立二者的訂閱(subscribe)關系(就像那根電線,連接起臺燈和開關)實現(xiàn)觀察,在事件傳遞過程中還可以對事件做各種處理。
2.2 創(chuàng)建被觀察者
2.2.1創(chuàng)建開關類的正規(guī)寫法
產(chǎn)生了五個事件,分別是:開,關,開,開,結束。
Observable switcher= Observable.create(newObservable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("on");
subscriber.onNext("off");
subscriber.onNext("on");
subscriber.onNext("on");
subscriber.onCompleted();
}
});
2.2.2 創(chuàng)建開關類的簡便寫法
Observable switcher=Observable.just("On","Off","On","On");
或:
String [] kk={"On","Off","On","On"};
Observable switcher=Observable.from(kk);
2.3 創(chuàng)建觀察者
2.3.1 Observer接口實現(xiàn)
Observer<String> light= new Observer<String>() {
@Override
public void onNext(String s) {
//處理傳過來的onNext事件
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
//被觀察者的onCompleted()方法會走這里,結束觀察
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
//出現(xiàn)錯誤會調(diào)用這個方法
Log.d(tag, "Error!");
}
};
2.3.2 Subscriber實現(xiàn)
除了 Observer 接口之外,RxJava 還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類-Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:
Subscriber<String> light= new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
2.3.3 非正式寫法
使用Action1。Action1是一個單純的人畜無害的接口,和Observer沒有啥關系,只不過它可以當做觀察者來使,專門處理onNext 事件,這是一種為了簡便偷懶的寫法。當然還有Action0,Action2,Action3...,0,1,2,3分別表示call()這個方法能接受幾個參數(shù)。初次接觸為了方便理解,不便使用這種寫法。
Action1 light = new Action1<String>(){
@Override
public void call(String s) {
Log.d(tag, ""+s);
}
}
2.3.4 Observer和Subscriber 實現(xiàn)的區(qū)別
在 RxJava 的subscribe訂閱過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點:
1. onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準備工作,例如數(shù)據(jù)的清零或重置。這是一個可選方法,默認情況下它的實現(xiàn)為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行),onStart() 就不適用了,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe()方法。
2. unsubscribe(): 這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調(diào)用后,Subscriber 將不再接收事件。一般在這個方法調(diào)用前,可以使用isUnsubscribed()先判斷一下狀態(tài)。unsubscribe()這個方法很重要,因為在 subscribe()之后, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內(nèi)存泄露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop()等方法中)調(diào)用 unsubscribe() 來解除引用關系,以避免內(nèi)存泄露的發(fā)生。
2.4 訂閱
現(xiàn)在已經(jīng)創(chuàng)建了觀察者和被觀察者,但是兩者還沒有聯(lián)系起來。使用下面的代碼讓兩者關聯(lián)起來:
switcher.subscribe(light);
這里需要解釋一下,開關是被觀察者,燈是觀察者,那怎么會是被觀察者訂閱觀察者呢?不應該是觀察者去訂閱被觀察者嗎?
燈訂閱開關,觀察者去訂閱被觀察者,在邏輯上是這樣的。但是,為了保證流式API調(diào)用風格(同一個調(diào)用主體一路調(diào)用下來,一氣呵成),就變成了開關訂閱燈,因為被觀察者(開關)產(chǎn)生事件,是事件的起點,那么開頭就是用被觀察者(Observable)作為唯一的調(diào)用主體,一路調(diào)用下去。
也可以這么理解,就是背后的真實的邏輯依然是燈訂閱了開關,但是在表面上,我們讓開關“假裝”訂閱了燈,以便于保持流式API調(diào)用風格不變。
2.5 流程

流程:由被觀察者產(chǎn)生事件,子傳遞過程中進行變化加工,最后到達觀察者,被觀察者處理,觀察者和被觀察者之間用訂閱關聯(lián)。結合流程圖的相關代碼:

當調(diào)用訂閱操作(即調(diào)用Observable.subscribe()方法)的時候,被觀察者才真正開始發(fā)出事件。
3、RxJava操作符
這才是RxJava最吸引人的地方
3.1 變換
RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數(shù)人說RxJava 真是太好用了的最大原因。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉(zhuǎn)換成不同的事件或事件序列。
3.1.1 Map操作
需求:被觀察者產(chǎn)生的事件中只有圖片文件路徑;,但是在觀察者這里只想要bitmap,那么就需要類型變換,在事件的傳遞過程中將圖片地址轉(zhuǎn)換成bitmap對象。
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數(shù)類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
showBitmap(bitmap);
}
});
這是操作符的力量,但是我們只使用了RxJava 的一個要點,這顯然是不夠的,上面的代碼存在缺陷,就是讀取文件,創(chuàng)建bitmap可能是一個耗時操作,那么就應該在子線程中執(zhí)行,主線程應該僅僅做展示。線程的切換一般是比較麻煩的,但是在Rxjava中,是非常方便的,這里就要使用到第四節(jié)要講到的RxJava 線程控制了。
改進后的代碼:
Observable.just("images/logo.png") // 輸入類型 String
.subscribeOn(Schedulers.newThread()) // 指定被觀察者執(zhí)行的線程環(huán)境
.observeOn(Schedulers.io()) // 指定接下來的操作發(fā)生在IO線程
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數(shù)類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.observeOn(AndroidSchedulers.mainThread()) // 切換到主線程
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
showBitmap(bitmap);
}
});
由上面的代碼可以看到,使用操作符將事件處理逐步分解,通過線程調(diào)度為每一步設置不同的線程環(huán)境,完全解決了線程切換的煩惱??梢哉f線程調(diào)度+操作符,才真正展現(xiàn)了RxJava無與倫比的魅力。
重要:map()中事件對象是直接變換的,具體功能上面已經(jīng)介紹過,它是 RxJava 最常用的變換,示意圖如下:
(被傳遞的就是事件對象,由圓變方,即是傳遞過程中的變換。)

3.1.2 FlatMap操作
1 需求:打印出一組學生中每個學生選修的課程名。
你是不是想到了,先循環(huán)讀出所有學生的數(shù)據(jù),再循環(huán)查出每個學生的課程并打印出來。在沒有接觸RxJava之前這種想法是沒有錯的,但是太繁瑣了,這里面涉及到兩層循環(huán)。
現(xiàn)在使用RxJava的flatMap()將輕松解決這一需求。
Student[] students = ...;
//創(chuàng)建觀察者,傳入課程對象
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students) //創(chuàng)建被觀察者,獲取所有學生
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber); //被觀察者訂閱觀察者
2 原理:
flatMap() 的原理是這樣的:
①使用傳入的事件對象創(chuàng)建一個 Observable 對象;
②并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件;
③每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。這三個步驟,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。
3.示意圖:

即將多維數(shù)組轉(zhuǎn)換成一位數(shù)組,如上面的兩層列表轉(zhuǎn)換成一層列表。
4.補充
由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡請求(Retrofit + RxJava)。
3.2 操作符的大致分類
Creating Observables(Observable的創(chuàng)建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
Transforming Observables(Observable的轉(zhuǎn)換操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
Filtering Observables(Observable的過濾操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
Combining Observables(Observable的組合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
Error Handling Operators(Observable的錯誤處理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
Conditional and Boolean Operators(Observable的條件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
Mathematical and Aggregate Operators(Observable數(shù)學運算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
其他如observable.toList()、observable.connect()、observable.publish()等等;
3.3 操作符的使用
具體使用請參考官方文檔,這里也有一個Demo可以參考:
https://github.com/ladingwu/ApplicationDemo,此demo我已經(jīng)調(diào)試運行后放在該文檔同目錄下了。
4、RxJava線程控制
4.1 說明
第二節(jié)主要講了RxJava的操作流程,其實主要就是觀察者模式,現(xiàn)在就來看RxJava另一個要點:異步。
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。
RxJava的線程環(huán)境:

4.2 代碼中使用
在3.1.1Map操作符時,已經(jīng)提到了線程調(diào)度,其在代碼中的使用如下:
//new Observable.just()執(zhí)行在新線程
Observable.create(new Observable.just(getFilePath())
//指定在新線程中創(chuàng)建被觀察者
.subscribeOn(Schedulers.newThread())
//將接下來執(zhí)行的線程環(huán)境指定為io線程
.observeOn(Schedulers.io())
//map就處在io線程
.map(mMapOperater)
//將后面執(zhí)行的線程環(huán)境切換為主線程,
//但是這一句依然執(zhí)行在io線程
.observeOn(AndroidSchedulers.mainThread())
//指定線程無效,但這句代碼本身執(zhí)行在主線程
.subscribeOn(Schedulers.io())
//執(zhí)行在主線程
.subscribe(mSubscriber);
4.3 總結
實際上線程調(diào)度只有subscribeOn()和observeOn()兩個方法。對于初學者,只需要掌握兩點:
subscribeOn()它指示Observable在一個指定的調(diào)度器上創(chuàng)建(只作用于被觀察者創(chuàng)建階段)。只能指定一次,如果指定多次則以第一次為準
observeOn()指定在事件傳遞(加工變換)和最終被處理(觀察者)的發(fā)生在哪一個調(diào)度器??芍付ǘ啻?,每次指定完都在下一步生效。
5、相關鏈接
5.1 github地址
RxJava:https://github.com/ReactiveX/RxJava>
RxAndroid:https://github.com/ReactiveX/RxAndroid
5.2 參考技術blog地址
給 Android 開發(fā)者的 RxJava 詳解:
http://gank.io/post/560e15be2dca930e00da1083
關于RxJava最友好的文章:
http://www.itdecent.cn/p/6fd8640046f1
關于RxJava最友好的文章(進階):