
參考:給 Android 開發(fā)者的 RxJava 詳解
GitHub:RxJava、RxAndroid
// Build.gradle配置文件引入
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'
1、RxJava可以做什么
RxJava是一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫??梢暂p松實(shí)現(xiàn)線程切換的方式。平時我們做線程切換的幾種方式:
- Handler
這個應(yīng)該是Android常用的一種方式,利用線程的Handler發(fā)送消息到相應(yīng)的線程Looper里面,那么這個任務(wù)也將會在該線程執(zhí)行。
handler.sendEmptyMessage(EVENT_DOWNLOAD_DATA);
Handler另外的一種發(fā)送消息的方式:
getUiHandler().post(new Runnable() {
@Override
public void run() {
// do something
}
});
- new Thread()
新建一個子線程執(zhí)行耗時操作,需要切換到主線更新UI時
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
// do something
}
});
- AsyncTask
內(nèi)部有個線程池執(zhí)行異常任務(wù)doInBackground(),在UI線程中執(zhí)行任務(wù)前onPreExecute()、更新進(jìn)度onProgressUpdate()、完成任務(wù)onPostExecute()的方法。 - EventBus、otto
比較通用的事件消息總線開源框架,對于普通的線程切換來說,也是比較重的一種方式。 - 另外還有廣播、Service等不大靈活的線程切換方式。
相必,以上的幾種方式使用起來都不是那么的簡潔,業(yè)務(wù)復(fù)雜起來,還會導(dǎo)致邏輯上的不清晰,那么RxJava就可以做到邏輯的簡潔,執(zhí)行異步任務(wù)、線程切換都可以在一條鏈上完成。
2、觀察者模式
觀察者模式的例子:
- Button點(diǎn)擊事件
Button被觀察者,持有引用,OnClickListener觀察者,通過setOnClickListener()注冊/訂閱點(diǎn)擊事件,發(fā)生onClick()事件。 - 設(shè)置監(jiān)聽回調(diào)函數(shù)
比如執(zhí)行選擇會員彈窗Dialog。Dialog被觀察者,持有引用,OnMemberSelectListener觀察者,通過setOnMemberSelectListener()注冊/訂閱選擇會員事件,發(fā)生onMemberSelect(Member member)事件。
3、RxJava觀察者模式實(shí)現(xiàn)步驟
3.1、創(chuàng)建觀察者 Observer
Observable被觀察者 和 Observer觀察者 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系
事件。
onNext()
onCompleted()
onError()
onStart() // 這是 Subscriber 增加的方法。它總是在 subscribe 所發(fā)生的線程被調(diào)用。不太適用于執(zhí)行任務(wù)前的彈窗等UI操作。
unsubscribe() // 這是 Subscriber 所實(shí)現(xiàn)的另一個接口 Subscription 的方法,用于取消訂閱。避免內(nèi)存泄漏。
Observer 接口,Observer 的抽象類:Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
3.2、創(chuàng)建被觀察者 Observable
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
另外還提供了一些方法用來快捷創(chuàng)建事件隊(duì)列
// just()
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// from()
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
3.3、Subscribe (訂閱)
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
觀察者被觀察者換位置:對流式 API 的設(shè)計會造成影響。
在 RxJava 中, Observable 并不是在創(chuàng)建的時候就立即開始發(fā)送事件,而是在它被訂閱的時候。
除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調(diào)
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當(dāng)成一個包裝對象,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)。這樣其實(shí)也可以看做將 onCompleted() 方法作為參數(shù)傳進(jìn)了 subscribe(),相當(dāng)于其他某些語言中的『閉包』。
4、RxJava操作符
變換:就是將事件序列中的對象或整個序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或事件序列
FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
4.1、map()
事件對象的直接變換
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
4.2、flatMap()
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中。 flatMap() 的原理是這樣的:
- 使用傳入的事件對象創(chuàng)建一個 Observable 對象;
- 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件;
- 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。
由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請求。
5、Scheduler 線程切換
- Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費(fèi) CPU。
- AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運(yùn)行。
- subscribeOn():subscribe() 所發(fā)生的線程,即Observable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。
- observeOn():Subscriber 所運(yùn)行在的線程?;蛘呓凶鍪录M(fèi)的線程。
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新線程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 線程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主線程,由 observeOn() 指定
通過 observeOn() 的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換。
不過,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能調(diào)用一次的。
- doOnSubscribe()
與 Subscriber.onStart() 相對應(yīng)的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程。默認(rèn)情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);