*文章已授權(quán)微信公眾號 guolin_blog (郭霖)獨家發(fā)布
拖拖踏踏的第三篇文章,我又來造輪子了,一直糾結(jié)要不要寫這個主題的文章,總感覺的自己駕馭不了RxJava這么高深的東西。本篇可能比較多的是個人的理解。
------------- 2018-05-21更新--------------
升級為Retrofit2.0+RxJava2 的版本,項目結(jié)構(gòu)做了一些修改。項目地址https://github.com/Hemumu/Template
前言
Retrofit 和RxJava已經(jīng)出來很久了,很多前輩寫了很多不錯的文章,在此不得不感謝這些前輩無私奉獻(xiàn)的開源精神,能讓我們站在巨人的肩膀上望得更遠(yuǎn)。對于 RxJava 不是很了解的同學(xué)推薦你們看扔物線大神的這篇文章給 Android 開發(fā)者的 RxJava 詳解一遍看不懂就看第二遍。Retrofit的使用可以參考Android Retrofit 2.0使用
本文內(nèi)容是基于Retrofit + RxJava做的一些巧妙的封裝。參考了很多文章加入了一些自己的理解,請多指教。源碼地址https://github.com/Hemumu/RxSample
先放出build.gradle
compile 'io.reactivex:rxjava:1.1.0'
compile 'io.reactivex:rxandroid:1.1.0'
compile 'com.squareup.retrofit2:retrofit:2.0.0-beta4'
compile 'com.squareup.retrofit2:converter-gson:2.0.0-beta4'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.0-beta4'
本文是基于RxJava1.1.0和Retrofit 2.0.0-beta4來進(jìn)行的。
初始化 Retrofit
新建類Api,此類就是初始化Retrofit,提供一個靜態(tài)方法初始化Retrofit非常簡單.
private static ApiService SERVICE;
/**
* 請求超時時間
*/
private static final int DEFAULT_TIMEOUT = 10000;
public static ApiService getDefault() {
if (SERVICE == null) {
//手動創(chuàng)建一個OkHttpClient并設(shè)置超時時間
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
httpClientBuilder.connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
/**
* 對所有請求添加請求頭
*/
httpClientBuilder.addInterceptor(new Interceptor() {
@Override
public okhttp3.Response intercept(Chain chain) throws IOException {
Request request = chain.request();
okhttp3.Response originalResponse = chain.proceed(request);
return originalResponse.newBuilder().header("key1", "value1").addHeader("key2", "value2").build();
}
});
SERVICE = new Retrofit.Builder()
.client(httpClientBuilder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(Url.BASE_URL)
.build().create(ApiService.class);
}
return SERVICE;
}
提供一個靜態(tài)方法初始化Retrofit,手動創(chuàng)建了OkHttpClient設(shè)置了請求的超時時間。并在OkHttp的攔截器中增加了請求頭。注意這里是為所有的請求添加了請求頭,你可以單獨的給請求增加請求頭,例如
@Headers("apikey:b86c2269fe6588bbe3b41924bb2f2da2")
@GET("/student/login")
Observable<HttpResult> login(@Query("phone") String phone, @Query("password") String psw);
和Retrofit初始化不同的地方就在我們添加了這兩句話
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
service的定義也從這樣
@GET("/student/login")
Call<HttpResult> getTopMovie(@Query("start") int start, @Query("count") int count);
變成了
@GET("/student/login")
Observable<HttpResult> login(@Query("phone") String phone, @Query("password") String psw);
返回值變成了Observable,這個Observable不就是RxJava的可觀察者(即被觀察者)么。
封裝服務(wù)器請求以及返回數(shù)據(jù)
用戶在使用任何一個網(wǎng)絡(luò)框架都只關(guān)系請求的返回和錯誤信息,所以對請求的返回和請求要做一個細(xì)致的封裝。
我們一般請求的返回都是像下面這樣
{
"code":"200",
"message":"Return Successd!",
"data":{
"name":"張三"
"age":3
}
}
如果你們的服務(wù)器返回不是這樣的格式那你就只有坐下來請他喝茶,跟他好好說(把他頭摁進(jìn)顯示器)了。大不了就獻(xiàn)出你的菊花吧!

對于這樣的數(shù)據(jù)我們肯定要對code做出一些判斷,不同的code對應(yīng)不同的錯誤信息。所以我們新建一個HttpResult類,對應(yīng)上面的數(shù)據(jù)結(jié)構(gòu)。
public class HttpResult<T> {
private int code;
private String message;
private T data;
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
這算是所有實體的一個基類,data可以為任何數(shù)據(jù)類型。
我們要對所以返回結(jié)果進(jìn)行預(yù)處理,新建一個RxHelper,預(yù)處理無非就是對code進(jìn)行判斷和解析,不同的錯誤返回不同的錯誤信息,這還不簡單。Rxjava的map操作符不是輕松解決
Api.getDefault().login("name","psw")
.map(new HttpResultFunc<UserEntity>());
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
private class HttpResultFunc<T> implements Func1<HttpResult<T>, T> {
@Override
public T call(HttpResult<T> httpResult) {
Log.e("error", httpResult.getData().toString() + "");
if (httpResult.getCode() != 0) {
throw new ApiException(httpResult.getCode());
}
return httpResult.getData();
}
}
喲,這不是輕松愉快 so seay么!對code進(jìn)行了判斷,code為0就做對應(yīng)更新UI或者其他后續(xù)操作,不等于0就拋出異常,在ApiException中隊code做處理,根據(jù)message字段進(jìn)行提示用戶
private static String getApiExceptionMessage(int code){
switch (code) {
case USER_NOT_EXIST:
message = "該用戶不存在";
break;
case WRONG_PASSWORD:
message = "密碼錯誤";
break;
default:
message = "未知錯誤";
}
return message;
}
撒花!??!

然而。。。RxJava永遠(yuǎn)比你想象的強大。RxJava中那么多操作符看到我身體不適,有個操作符compose。因為我們在每一個請求中都會處理code以及一些重用一些操作符,比如用observeOn和subscribeOn來切換線程。RxJava提供了一種解決方案:Transformer(轉(zhuǎn)換器),一般情況下就是通過使用操作符Observable.compose()來實現(xiàn)。具體可以參考避免打斷鏈?zhǔn)浇Y(jié)構(gòu):使用.compose( )操作符
新建一個RxHelper對結(jié)果進(jìn)行預(yù)處理,代碼
public class RxHelper {
/**
* 對結(jié)果進(jìn)行預(yù)處理
*
* @param <T>
* @return
*/
public static <T> Observable.Transformer<HttpResult<T>, T> handleResult() {
return new Observable.Transformer<HttpResult<T>, T>() {
@Override
public Observable<T> call(Observable<HttpResult<T>> tObservable) {
return tObservable.flatMap(new Func1<HttpResult<T>, Observable<T>>() {
@Override
public Observable<T> call(HttpResult<T> result) {
LogUtils.e(result.getCode()+"");
if (result.getCode() == 0) {
return createData(result.getData());
} else {
return Observable.error(new ApiException(result.getCode()));
}
}
}).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* 創(chuàng)建成功的數(shù)據(jù)
*
* @param data
* @param <T>
* @return
*/
private static <T> Observable<T> createData(final T data) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
subscriber.onNext(data);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
}
Transformer實際上就是一個Func1<Observable<T>, Observable<R>>,換言之就是:可以通過它將一種類型的Observable轉(zhuǎn)換成另一種類型的Observable,和調(diào)用一系列的內(nèi)聯(lián)操作符是一模一樣的。這里我們首先使用flatMap操作符把Obserable<HttpResult<T>>,轉(zhuǎn)換成為Observable<T>在內(nèi)部對code進(jìn)行了預(yù)處理。如果成功則把結(jié)果Observable<T>發(fā)射給訂閱者。反之則把code交給ApiException并返回一個異常,ApiException中我們對code進(jìn)行相應(yīng)的處理并返回對應(yīng)的錯誤信息
public class ApiException extends RuntimeException{
public static final int USER_NOT_EXIST = 100;
public static final int WRONG_PASSWORD = 101;
private static String message;
public ApiException(int resultCode) {
this(getApiExceptionMessage(resultCode));
}
public ApiException(String detailMessage) {
super(detailMessage);
}
@Override
public String getMessage() {
return message;
}
/**
* 由于服務(wù)器傳遞過來的錯誤信息直接給用戶看的話,用戶未必能夠理解
* 需要根據(jù)錯誤碼對錯誤信息進(jìn)行一個轉(zhuǎn)換,在顯示給用戶
* @param code
* @return
*/
private static String getApiExceptionMessage(int code){
switch (code) {
case USER_NOT_EXIST:
message = "該用戶不存在";
break;
case WRONG_PASSWORD:
message = "密碼錯誤";
break;
default:
message = "未知錯誤";
}
return message;
}
}
最后調(diào)用了頻繁使用的subscribeOn()和observeOn()以及unsubscribeOn()。
處理ProgressDialog
在Rxjava中我們什么時候來顯示Dialog呢。一開始覺得是放在Subscriber<T>的onStart中。onStart可以用作流程開始前的初始化。然而 onStart()由于在 subscribe()發(fā)生時就被調(diào)用了,因此不能指定線程,而是只能執(zhí)行在 subscribe()被調(diào)用時的線程。所以onStart并不能保證永遠(yuǎn)在主線程運行。
怎么辦呢?

千萬不要小看了RxJava,與 onStart()相對應(yīng)的有一個方法 doOnSubscribe(),它和 onStart()同樣是在subscribe()調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程。默認(rèn)情況下, doOnSubscribe()執(zhí)行在 subscribe()發(fā)生的線程;而如果在 doOnSubscribe()之后有 subscribeOn()的話,它將執(zhí)行在離它最近的subscribeOn()所指定的線程。可以看到在RxHelper中看到我們調(diào)用了兩次subscribeOn,最后一個調(diào)用也就是離doOnSubscribe()最近的一次subscribeOn是指定的AndroidSchedulers.mainThread()也就是主線程。這樣我們就就能保證它永遠(yuǎn)都在主線運行了。這里不得不感概RxJava的強大。
這里我們自定義一個類ProgressSubscriber繼承Subscriber<T>
public abstract class ProgressSubscriber<T> extends Subscriber<T> implements ProgressCancelListener{
private SimpleLoadDialog dialogHandler;
public ProgressSubscriber(Context context) {
dialogHandler = new SimpleLoadDialog(context,this,true);
}
@Override
public void onCompleted() {
dismissProgressDialog();
}
/**
* 顯示Dialog
*/
public void showProgressDialog(){
if (dialogHandler != null) {
dialogHandler.obtainMessage(SimpleLoadDialog.SHOW_PROGRESS_DIALOG).sendToTarget();
}
}
@Override
public void onNext(T t) {
_onNext(t);
}
/**
* 隱藏Dialog
*/
private void dismissProgressDialog(){
if (dialogHandler != null) {
dialogHandler.obtainMessage(SimpleLoadDialog.DISMISS_PROGRESS_DIALOG).sendToTarget();
dialogHandler=null;
}
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
if (false) { //這里自行替換判斷網(wǎng)絡(luò)的代碼
_onError("網(wǎng)絡(luò)不可用");
} else if (e instanceof ApiException) {
_onError(e.getMessage());
} else {
_onError("請求失敗,請稍后再試...");
}
dismissProgressDialog();
}
@Override
public void onCancelProgress() {
if (!this.isUnsubscribed()) {
this.unsubscribe();
}
}
protected abstract void _onNext(T t);
protected abstract void _onError(String message);
}
初始化ProgressSubscriber新建了一個我們自己定義的ProgressDialog并且傳入一個自定義接口ProgressCancelListener。此接口是在SimpleLoadDialog消失onCancel的時候回調(diào)的。用于終止網(wǎng)絡(luò)請求。
load.setOnCancelListener(new DialogInterface.OnCancelListener() {
@Override
public void onCancel(DialogInterface dialog) {
mProgressCancelListener.onCancelProgress();
}
});
ProgressSubscriber其他就很簡單了,在onCompleted()和onError()的時候取消Dialog。需要的時候調(diào)用showProgressDialog即可。
處理數(shù)據(jù)緩存
服務(wù)器返回的數(shù)據(jù)我們肯定要做緩存,所以我們需要一個RetrofitCache類來做緩存處理。
public class RetrofitCache {
/**
* @param cacheKey 緩存的Key
* @param fromNetwork
* @param isSave 是否緩存
* @param forceRefresh 是否強制刷新
* @param <T>
* @return
*/
public static <T> Observable<T> load(final String cacheKey,
Observable<T> fromNetwork,
boolean isSave, boolean forceRefresh) {
Observable<T> fromCache = Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
T cache = (T) Hawk.get(cacheKey);
if (cache != null) {
subscriber.onNext(cache);
} else {
subscriber.onCompleted();
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
//是否緩存
if (isSave) {
/**
* 這里的fromNetwork 不需要指定Schedule,在handleRequest中已經(jīng)變換了
*/
fromNetwork = fromNetwork.map(new Func1<T, T>() {
@Override
public T call(T result) {
Hawk.put(cacheKey, result);
return result;
}
});
}
//強制刷新
if (forceRefresh) {
return fromNetwork;
} else {
return Observable.concat(fromCache, fromNetwork).first();
}
}
}
幾個參數(shù)注釋上面已經(jīng)寫得很清楚了,不需要過多的解釋。這里我們先取了一個Observable<T>對象fromCache,里面的操作很簡單,去緩存里面找個key對應(yīng)的緩存,如果有就發(fā)射數(shù)據(jù)。在fromNetwork里面做的操作僅僅是緩存數(shù)據(jù)這一操作。最后判斷如果強制刷新就直接返回fromNetwork反之用Observable.concat()做一個合并。concat操作符將多個Observable結(jié)合成一個Observable并發(fā)射數(shù)據(jù)。這里又用了first()。fromCache和fromNetwork任何一步一旦發(fā)射數(shù)據(jù)后面的操作都不執(zhí)行。
最后我們新建一個HttpUtil用來返回用戶關(guān)心的數(shù)據(jù),緩存,顯示Dialog在這里面進(jìn)行。
public class HttpUtil{
/**
* 構(gòu)造方法私有
*/
private HttpUtil() {
}
/**
* 在訪問HttpUtil時創(chuàng)建單例
*/
private static class SingletonHolder {
private static final HttpUtil INSTANCE = new HttpUtil();
}
/**
* 獲取單例
*/
public static HttpUtil getInstance() {
return SingletonHolder.INSTANCE;
}
//添加線程管理并訂閱
public void toSubscribe(Observable ob, final ProgressSubscriber subscriber,String cacheKey,boolean isSave, boolean forceRefresh) {
//數(shù)據(jù)預(yù)處理
Observable.Transformer<HttpResult<Object>, Object> result = RxHelper.handleResult();
//重用操作符
Observable observable = ob.compose(result)
.doOnSubscribe(new Action0() {
@Override
public void call() {
//顯示Dialog和一些其他操作
subscriber.showProgressDialog();
}
});
//緩存
RetrofitCache.load(cacheKey,observable,isSave,forceRefresh).subscribe(subscriber);
}
Activity生命周期管理
基本的網(wǎng)絡(luò)請求都是向服務(wù)器請求數(shù)據(jù),客戶端拿到數(shù)據(jù)后更新UI。但也不排除意外情況,比如請求回數(shù)據(jù)途中Activity已經(jīng)不在了,這個時候就應(yīng)該取消網(wǎng)絡(luò)請求。
要實現(xiàn)上面的功能其實很簡單,兩部分
- 隨時監(jiān)聽Activity(Fragment)的生命周期并對外發(fā)射出去; 在我們的網(wǎng)絡(luò)請求中,接收生命周期
- 并進(jìn)行判斷,如果該生命周期是自己綁定的,如Destory,那么就斷開數(shù)據(jù)向下傳遞的過程
實現(xiàn)以上功能需要用到Rxjava的Subject的子類PublishSubject
在你的BaseActivity中添加如下代碼
public class BaseActivity extends AppCompatActivity {
public final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject = PublishSubject.create();
@Override
protected void onCreate(Bundle savedInstanceState) {
lifecycleSubject.onNext(ActivityLifeCycleEvent.CREATE);
super.onCreate(savedInstanceState);
}
@Override
protected void onPause() {
lifecycleSubject.onNext(ActivityLifeCycleEvent.PAUSE);
super.onPause();
}
@Override
protected void onStop() {
lifecycleSubject.onNext(ActivityLifeCycleEvent.STOP);
super.onStop();
}
@Override
protected void onDestroy() {
super.onDestroy();
lifecycleSubject.onNext(ActivityLifeCycleEvent.DESTROY);
}
這樣的話,我們把所有生命周期事件都傳給了PublishSubject了,或者說PublishSubject已經(jīng)接收到了并能夠?qū)ν獍l(fā)射各種生命周期事件的能力了。
現(xiàn)在我們要讓網(wǎng)絡(luò)請求的時候去監(jiān)聽這個PublishSubject,在收到相應(yīng)的生命周期后取消網(wǎng)絡(luò)請求,這又用到了我們神奇的compose(),我們需要修改handleResult代碼如下
public static <T> Observable.Transformer<HttpResult<T>, T> handleResult(final ActivityLifeCycleEvent event,final PublishSubject<ActivityLifeCycleEvent> lifecycleSubject) {
return new Observable.Transformer<HttpResult<T>, T>() {
@Override
public Observable<T> call(Observable<HttpResult<T>> tObservable) {
Observable<ActivityLifeCycleEvent> compareLifecycleObservable =
lifecycleSubject.takeFirst(new Func1<ActivityLifeCycleEvent, Boolean>() {
@Override
public Boolean call(ActivityLifeCycleEvent activityLifeCycleEvent) {
return activityLifeCycleEvent.equals(event);
}
});
return tObservable.flatMap(new Func1<HttpResult<T>, Observable<T>>() {
@Override
public Observable<T> call(HttpResult<T> result) {
if (result.getCount() != 0) {
return createData(result.getSubjects());
} else {
return Observable.error(new ApiException(result.getCount()));
}
}
}) .takeUntil(compareLifecycleObservable).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}
};
}
調(diào)用的時候增加了兩個參數(shù)一個是ActivityLifeCycleEvent 其實就是一些枚舉表示Activity的生命周期
public enum ActivityLifeCycleEvent {
CREATE,
START,
RESUME,
PAUSE,
STOP,
DESTROY
}
另外一個參數(shù)就是我們在BaseActivity添加的PublishSubject,這里用到了takeUntil()它的作用是監(jiān)聽我們創(chuàng)建的compareLifecycleObservable,compareLifecycleObservable中就是判斷了如果當(dāng)前生命周期和Activity一樣就發(fā)射數(shù)據(jù),一旦compareLifecycleObservable 對外發(fā)射了數(shù)據(jù),就自動把當(dāng)前的Observable(也就是網(wǎng)絡(luò)請求的Observable)停掉。
當(dāng)然有個庫是專門針對這種情況的,叫RxLifecycle,不過要繼承他自己的RxActivity,當(dāng)然這個庫不只是針對網(wǎng)絡(luò)請求,其他所有的Rxjava都可以。有需要的可以去看看。
最后新建一個ApiService存放我們的請求
public interface ApiService {
@GET("/student/mobileRegister")
Observable<HttpResult<UserEntity>> login(@Query("phone") String phone, @Query("password") String psw);
}
使用
使用起來就超級簡單了
/**
*
*
// ┏┓ ┏┓
//┏┛┻━━━┛┻┓
//┃ ┃
//┃ ━ ┃
//┃ ┳┛ ┗┳ ┃
//┃ ┃
//┃ ┻ ┃
//┃ ┃
//┗━┓ ┏━┛
// ┃ ┃ 神獸保佑
// ┃ ┃ 阿彌陀佛
// ┃ ┗━━━┓
// ┃ ┣┓
// ┃ ┏┛
// ┗┓┓┏━┳┓┏┛
// ┃┫┫ ┃┫┫
// ┗┻┛ ┗┻┛
//
*/
//獲取豆瓣電影TOP 100
Observable ob = Api.getDefault().getTopMovie(0, 100);
HttpUtil.getInstance().toSubscribe(ob, new ProgressSubscriber<List<Subject>>(this) {
@Override
protected void _onError(String message) {
Toast.makeText(MainActivity.this, message, Toast.LENGTH_LONG).show();
}
@Override
protected void _onNext(List<Subject> list) {
}
}, "cacheKey", ActivityLifeCycleEvent.PAUSE, lifecycleSubject, false, false);
具體很多東西都可以在使用的時候具體修改,比如緩存我用的Hawk。Dialog是我自己定義的一個SimpleLoadDialog。源碼已經(jīng)給出請多指教!
-------------更新--------------
評論區(qū)有人提出對于Activity生命周期的管理,個人疏忽大意,特地來加上。
END!
Thanks
Rx處理服務(wù)器請求、緩存的完美封裝
給 Android 開發(fā)者的 RxJava 詳解
RxJava 與 Retrofit 結(jié)合的最佳實踐
可能是東半球最全的RxJava使用場景小結(jié)
帶你學(xué)開源項目:RxLifecycle - 當(dāng)Activity被destory時自動暫停網(wǎng)絡(luò)請求