前言
使用RxJava 已經(jīng)有一年多的時間,從 RxJava 1.0 +Retrofit 2.0 網(wǎng)絡(luò)請求框架,包括常用的 RxBus,RxPermission 等,到后面的RxJava2.0+Retrofit2.0。然而始終沒有認真細致的去歸納總結(jié),基于這個緣由,本文將由淺入深,由Rx的基本用法,到RxJava和Retrofit的封裝,到Rx的源碼分析,做一次系統(tǒng)的學(xué)習(xí)。
本文將分為三個方面闡述:
- RxJava 是什么
- RxJava 基本用法
- RxJava 的用途
一:RxJava的認識
想要用一句話總結(jié)RxJava,或許只有作者的那句話才能精準概括了:RxJava實現(xiàn)了jvm的響應(yīng)式擴展,通過觀察序列來實現(xiàn)異步和基于事件的庫。
如果要加上更細致的闡述那就是:它擴展了觀察者模式,支持數(shù)據(jù)或者事件的序列,添加了一系列的操作符,允許你使用聲明的方式組合序列,同時處理了線程、同步、線程安全、和并發(fā)的問題。
RxJava 依賴代碼
compile "io.reactivex:rxjava:1.2.3"
compile "io.reactivex:rxandroid:1.2.1"
既然是觀察者模式,那么就必然有觀察者和被觀察者,在RxJava中,有兩個類需要搞清楚
Observable :被觀察者,Observer :觀察者。
二者通過 subscribe() 方法來實現(xiàn)訂閱的關(guān)系,這樣一來 Observable 可以在需要的時候發(fā)送事件通知 Observer。
Observable 的創(chuàng)建
//創(chuàng)建一個 Observable (被觀察者)
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hi");
subscriber.onCompleted();
}
});
創(chuàng)建 Observable 的時候,需要傳遞一個 OnSubscribe 對象 ,這個對象的指定了一個回調(diào),當訂閱事件發(fā)生的時候,call 方法里面的內(nèi)容就會被調(diào)用。在回調(diào)的方法中 可以看到 Subscriber 對象,這個對象實現(xiàn)了 Observer 和 Subscription 接口,事實上,它是 Observer(觀察者) 的升級版。
而 Observable 創(chuàng)建的時候做了什么呢?
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
RxJavaHooks 的 onCreate() 返回了一個 OnSubscribe 對象,OnSubscribe 是 Action1 的子類,是一個只有一個 call() 方法的接口。RxJavaHooks 的內(nèi)部,則調(diào)用了 RxJavaHooks 的一個接口 Func1 的 call()方法的回調(diào)。
Observer 的創(chuàng)建
//創(chuàng)建一個 Observer (觀察者)
Observer observer = new Observer() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
};
或者可以這樣寫:
Subscriber subscriber = new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
};
Observer只是一個接口,他們的作用是一樣的,區(qū)別在于 Subscriber 比 Observer 多了兩個方法 onStart() 和 unsubscribe(), 前者是在事件還沒有發(fā)送之前調(diào)用的,可以進行一些準備的動作,后者則是用于取消訂閱,在發(fā)生訂閱行為的時候,被觀察者會持有觀察者的引用,如果在頁面銷毀或者view銷毀或者其他情況下沒有進行解綁,則會發(fā)生內(nèi)存泄漏。
訂閱
//訂閱
observable.subscribe(observer);
這里有個奇怪的現(xiàn)象,你會發(fā)現(xiàn)是被觀察者訂閱了觀察者。之所以這樣實現(xiàn),是為了更好的設(shè)計。
而訂閱行為到底干了什么,為什么要傳遞進去一個接口呢?看下源碼
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
重點看下 observable 的 subscribe 方法,他需要傳遞一個 ObserverSubscriber的對象 ,它Subscriber 的子類,而且提供了一個構(gòu)造函數(shù)來傳遞 observer。
在真正的 subscribe 方法中,我們可以看到這么一句話:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}
subscriber.onStart(); 在 observable 中 調(diào)用了 subscriber 的 onStart()方法,
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 這個方法,其實就是OnSubscribe對call 方法的調(diào)用。
經(jīng)過上面的三個步驟,就完成了一次RxJava的基本使用。然而事實上,不管是 observable 的創(chuàng)建或者是 observer 或者 Subscriber ,再或者綁定的使用,都有很多種方法可以選擇,事實上,要根據(jù)業(yè)務(wù)邏輯或者不同的場景來選擇使用。比如:
Observable<String> just = Observable.just( "are", "u", "ok");
just.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, s);
}
});
通過just 將傳入的參數(shù)依次的發(fā)送出來,這樣,在 Subscriber 的回調(diào)中(onNext),就可以依次的打印出那幾個字符串。
假定有一個數(shù)據(jù)源,我需要依次發(fā)送數(shù)據(jù)源里面的數(shù)據(jù),from方法則會很恰當,就像這樣:
String[] array = new String[]{"are","u","ok"};
Observable<String> observableStr = Observable.from(array);
observableStr.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
同樣的,Subscribe() 方法也可以通過Action來實現(xiàn)某些不完全的定義,比如這樣:
Action1<String> action1 = new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
};
observableStr.subscribe(action1);
Scheduler 調(diào)度器
通過 Schduler 調(diào)度器來指定某段代碼運行在哪個線程之中,因為在日常開發(fā)中,我們總是會將耗時的請求放在后臺,通過回調(diào)在前臺更新ui,基于這些重要的線程控制,Scheduler 提供了一系列的方法來進行線程切換。諸如:
//直接運行,即在當前的線程運行
Schedulers.immediate();
//計算的線程,主要用于事件輪詢,回調(diào)的處理等。不能進行耗時的操作。
Schedulers.computation();
//開啟一個新的線程,并且運行在新線程
Schedulers.newThread();
// 讀寫、網(wǎng)絡(luò)、等耗時操作
Schedulers.io();
//Android 主線程
AndroidSchedulers.mainThread();
Observable 提供了一些方法來指定我們需要的線程:
observableStr.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
subscribeOn() 方法指定了subscribe() 發(fā)生在IO線程,observeOn() 則指定了其回調(diào)發(fā)生在主線程。準確的說前者是指定事件產(chǎn)生的線程,后者是指定事件消費的線程。
變換
所謂變換,即把事件序列 中的對象或者整個序列進行變換,得到我想要的事件或者序列。通過一個簡單的例子就可以看出來轉(zhuǎn)換的強大之處了。
比如說我想加載指定路徑下的ImageView
String mImgPath = "...";
Observable.just(mImgPath)
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
Bitmap bitmap = BitmapFactory.decodeFile(s);
return bitmap;
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageView.setImageBitmap(bitmap);
}
});
map()方法需要一個 Func1 的回調(diào) ,F(xiàn)unc1是一個簡單的接口,和 Action 不同的地方在于,它是有返回值的。這就達到了轉(zhuǎn)換的目的。我們看到,我們發(fā)送事件的只是一個字符串,通過轉(zhuǎn)換,在我們需要Bitmap的時候,在map() 中把字符串做了轉(zhuǎn)換。
除此之外,還有一個轉(zhuǎn)換的方法 flatMap(),與map()方法不同的地方在于,flatMap中返回的是一個 Observable 對象。通過 flatMap 將事件進行了拆分,可以進行更加細致和精準的處理:
假如有這個一個類,在發(fā)起事件的時候傳遞整個類的對象,最后我想要的是String content,這種情況使用flatMap 就更加方便了。
public class DataBean implements Serializable {
public Data data;
public String message;
public int code;
class Data {
public String title;
public Content content;
}
class Content {
public String content;
}
}
final List<DataBean> beanList = new ArrayList<>();
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observable.from(beanList)
.flatMap(new Func1<DataBean, Observable<DataBean.Content>>() {
@Override
public Observable<DataBean.Content> call(DataBean bean) {
return Observable.just(bean.data.content);
}
})
.flatMap(new Func1<DataBean.Content, Observable<String>>() {
@Override
public Observable<String> call(DataBean.Content content) {
return Observable.just(content.content);
}
})
.subscribe(subscriber1);
我們會發(fā)現(xiàn) map 是一對一的關(guān)系,而 flatMap 是一對多的關(guān)系,通過 flatmap 可以實現(xiàn)很強大的功能。
小示例
假如我們需要把一個Bitmap格式的圖片保存為png或者jpg文件 ,假如使用RxJava,結(jié)合上面的線程調(diào)度器,我們該怎么寫呢?直接上代碼吧。
Observable.create(new SaveObservable(bitmap))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SaveSubscriber());
//Observable
private class SaveObservable implements Observable.OnSubscribe<String> {
private Bitmap drawingCache = null;
public SaveObservable(Bitmap drawingCache) {
this.drawingCache = drawingCache;
}
@Override
public void call(Subscriber<? super String> subscriber) {
if (drawingCache == null) {
subscriber.onError(new NullPointerException("獲取圖片失敗"));
} else {
try {
File imageFile = new File(imagePath,imageName);
FileOutputStream outStream = new FileOutputStream(imageFile);
drawingCache.compress(Bitmap.CompressFormat.JPEG, 100, outStream);
subscriber.onNext(Environment.getExternalStorageDirectory() + "/store");
subscriber.onCompleted();
outStream.flush();
outStream.close();
} catch (IOException e) {
subscriber.onError(e);
}
}
}
}
//Subscriber
private class SaveSubscriber extends Subscriber<String> {
@Override
public void onCompleted() {
// Toast.makeText(context, "保存成功", Toast.LENGTH_SHORT).show();
}
@Override
public void onError(Throwable e) {
Log.i(getClass().getSimpleName(), e.toString());
Toast.makeText(context, "保存失敗:" + e.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(String s) {
Toast.makeText(context, "保存至" + s, Toast.LENGTH_SHORT).show();
}
}
這樣以來,我們的代碼看起來非常的清爽,而且邏輯清晰。