I. Basic usage:
Use subscribeOn() and observeOn() to control threading, and call subscribe() to trigger the network request. The code takes roughly this form:
disposable = api.getData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
Example:
1. Step 1: add the RxJava and Retrofit libraries:
// OkHttp logging interceptor
implementation 'com.squareup.okhttp3:logging-interceptor:3.4.1'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'
implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
2. Step 2: declare the network API as an interface
public interface ZhuangbiApi {
@GET("search")
Observable<List<ZhuangbiImage>> search(@Query("q") String query);
}
3. Step 3: create the implementation of the API interface
public class Network {
private static ZhuangbiApi zhuangbiApi;
private static Converter.Factory gsonConverterFactory = GsonConverterFactory.create();
private static CallAdapter.Factory rxJavaCallAdapterFactory = RxJava2CallAdapterFactory.create();
public static OkHttpClient getOkHttpClient() {
// Create the logging interceptor
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() {
@Override
public void log(String message) {
Log.d("zcb", "OkHttp====Message:" + message);
}
});
// Log level
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
// Customize OkHttp
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
// Add loggingInterceptor to the OkHttp client
httpClientBuilder.addInterceptor(loggingInterceptor);
return httpClientBuilder.build();
}
public static ZhuangbiApi getZhuangbiApi() {
if (zhuangbiApi == null) {
Retrofit retrofit = new Retrofit.Builder()
.client(getOkHttpClient())
.baseUrl("http://www.zhuangbi.info/")
.addConverterFactory(gsonConverterFactory)
.addCallAdapterFactory(rxJavaCallAdapterFactory)
.build();
zhuangbiApi = retrofit.create(ZhuangbiApi.class);
}
return zhuangbiApi;
}
}
4. Step 4: use it
disposable = Network.getZhuangbiApi()
.search(key)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ZhuangbiImage>>() {
@Override
public void accept(@NonNull List<ZhuangbiImage> images) throws Exception {
swipeRefreshLayout.setRefreshing(false);
adapter.setImages(images);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
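The thread switch in the chain above can be hard to picture, so here is a rough plain-JDK analogy. It does not use RxJava: CompletableFuture.supplyAsync stands in for subscribeOn(Schedulers.io()), thenAcceptAsync stands in for observeOn(AndroidSchedulers.mainThread()), and the thread names "io-worker" and "fake-main" are made up for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadSwitchSketch {
    // Produces a value on an I/O pool, consumes it on a single "main" thread,
    // and returns the names of the two threads that were used.
    static String[] run() {
        ExecutorService io = Executors.newCachedThreadPool(r -> new Thread(r, "io-worker"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        try {
            String[] seen = new String[2];
            CompletableFuture
                    .supplyAsync(() -> {          // role of subscribeOn(Schedulers.io())
                        seen[0] = Thread.currentThread().getName();
                        return "response";
                    }, io)
                    .thenAcceptAsync(data -> {    // role of observeOn(mainThread())
                        seen[1] = Thread.currentThread().getName();
                    }, main)
                    .join();
            return seen;
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        // prints "produced on io-worker, consumed on fake-main"
        System.out.println("produced on " + seen[0] + ", consumed on " + seen[1]);
    }
}
```

The point of the analogy is only the split: the request runs off the UI thread, and the result is handed back to a dedicated thread for display.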
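II. Transformation (map):
Some server APIs wrap the returned data in an outer layer of extra information. That information is useful for debugging but is not needed for local display. map() can strip the outer layer and keep only the core data the client uses. (map() can of course serve any other kind of format conversion as well.) The code takes roughly this form: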
disposable = api.getData()
.map(response->response.data)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
Example:
Implement the Function interface and override apply() to do the conversion:
public class GankBeautyResultToItemsMapper implements Function<GankBeautyResult, List<Item>> {
private static GankBeautyResultToItemsMapper INSTANCE = new GankBeautyResultToItemsMapper();
private GankBeautyResultToItemsMapper() {
}
public static GankBeautyResultToItemsMapper getInstance() {
return INSTANCE;
}
@Override
public List<Item> apply(GankBeautyResult gankBeautyResult) {
List<GankBeauty> gankBeauties = gankBeautyResult.beauties;
List<Item> items = new ArrayList<>(gankBeauties.size());
SimpleDateFormat inputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS'Z'");
SimpleDateFormat outputFormat = new SimpleDateFormat("yy/MM/dd HH:mm:ss");
for (GankBeauty gankBeauty : gankBeauties) {
Item item = new Item();
try {
Date date = inputFormat.parse(gankBeauty.createdAt);
item.description = outputFormat.format(date);
} catch (ParseException e) {
e.printStackTrace();
item.description = "unknown date";
}
item.imageUrl = gankBeauty.url;
items.add(item);
}
return items;
}
}
disposable = Network.getGankApi()
.getBeauties(10, page)
.map(GankBeautyResultToItemsMapper.getInstance())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Item>>() {
@Override
public void accept(@NonNull List<Item> items) throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
});
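To try the conversion on its own, without Retrofit or RxJava, the mapping step can be sketched with java.util.function.Function standing in for io.reactivex.functions.Function. The Envelope class and its fields are hypothetical, and the time zone is pinned to UTC here (the mapper above uses the device default) so the result is the same on any machine.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.function.Function;

class EnvelopeMapperSketch {
    // Hypothetical response shape: an outer envelope around the list the UI needs.
    static class Envelope {
        boolean error;
        List<String> createdAt = new ArrayList<>();
    }

    // Strips the envelope and reformats each date, like the mapper above.
    static final Function<Envelope, List<String>> TO_DESCRIPTIONS = envelope -> {
        SimpleDateFormat in = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS'Z'", Locale.US);
        SimpleDateFormat out = new SimpleDateFormat("yy/MM/dd HH:mm:ss", Locale.US);
        in.setTimeZone(TimeZone.getTimeZone("UTC"));   // assumption of this sketch
        out.setTimeZone(TimeZone.getTimeZone("UTC"));
        List<String> result = new ArrayList<>(envelope.createdAt.size());
        for (String raw : envelope.createdAt) {
            try {
                result.add(out.format(in.parse(raw)));
            } catch (ParseException e) {
                result.add("unknown date"); // same fallback as the mapper above
            }
        }
        return result;
    };

    public static void main(String[] args) {
        Envelope envelope = new Envelope();
        envelope.createdAt = Arrays.asList("2016-05-12T12:04:43.857Z", "not a date");
        // prints "[16/05/12 12:04:43, unknown date]"
        System.out.println(TO_DESCRIPTIONS.apply(envelope));
    }
}
```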
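III. Combining (zip):
Sometimes an app needs to call several APIs at the same time and then merge the results into one unified format for output (for example, mixing ads from a third-party ad API into the data List returned by your own platform). Parallel asynchronous work like this is tedious to write by hand, but zip() makes it much simpler. The code takes roughly this form:
disposable = Observable.zip(api.getData(), adApi.getAds(), zipFunc())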
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
Example:
disposable = Observable.zip(Network.getGankApi().getBeauties(200, 1).map(GankBeautyResultToItemsMapper.getInstance()),
Network.getZhuangbiApi().search("裝逼"),
new BiFunction<List<Item>, List<ZhuangbiImage>, List<Item>>() {
@Override
public List<Item> apply(List<Item> gankItems, List<ZhuangbiImage> zhuangbiImages) {
List<Item> items = new ArrayList<Item>();
for (int i = 0; i < gankItems.size() / 2 && i < zhuangbiImages.size(); i++) {
items.add(gankItems.get(i * 2));
items.add(gankItems.get(i * 2 + 1));
Item zhuangbiItem = new Item();
ZhuangbiImage zhuangbiImage = zhuangbiImages.get(i);
zhuangbiItem.description = zhuangbiImage.description;
zhuangbiItem.imageUrl = zhuangbiImage.image_url;
items.add(zhuangbiItem);
}
return items;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Item>>() {
@Override
public void accept(@NonNull List<Item> items) throws Exception {
swipeRefreshLayout.setRefreshing(false);
adapter.setItems(items);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
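The merge loop inside the BiFunction is easy to misread, so the sketch below isolates it as a plain method and runs it on small lists. CompletableFuture.thenCombine is used as a JDK stand-in for zip on two single-result sources; it is not RxJava, and the item names are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ZipInterleaveSketch {
    // Same loop as the BiFunction above: two items from the first list,
    // then one item from the second list.
    static List<String> interleave(List<String> gank, List<String> zhuangbi) {
        List<String> items = new ArrayList<>();
        for (int i = 0; i < gank.size() / 2 && i < zhuangbi.size(); i++) {
            items.add(gank.get(i * 2));
            items.add(gank.get(i * 2 + 1));
            items.add(zhuangbi.get(i));
        }
        return items;
    }

    // thenCombine waits for both futures and merges their results.
    static List<String> loadBoth() {
        CompletableFuture<List<String>> gank =
                CompletableFuture.supplyAsync(() -> Arrays.asList("g0", "g1", "g2", "g3", "g4"));
        CompletableFuture<List<String>> zhuangbi =
                CompletableFuture.supplyAsync(() -> Arrays.asList("z0", "z1", "z2"));
        return gank.thenCombine(zhuangbi, ZipInterleaveSketch::interleave).join();
    }

    public static void main(String[] args) {
        // prints "[g0, g1, z0, g2, g3, z1]"
        System.out.println(loadBoth());
    }
}
```

Note that the loop stops as soon as either list runs out of complete groups, so leftover items (g4 and z2 here) are dropped rather than appended.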
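IV. Token (flatMap):
For security, performance, and similar reasons, many servers have endpoints that only return a correct result when a token is passed in, and the token has to be fetched from another endpoint. Getting the data therefore takes two consecutive requests (① token -> ② target data). flatMap() expresses this chained request in fairly clear code and avoids nested callbacks. The code takes roughly this form: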
disposable = api.getToken()
.flatMap(token->api.getData(token))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
Example:
final FakeApi fakeApi = Network.getFakeApi();
disposable = fakeApi.getFakeToken("fake_auth_code")
.flatMap(new Function<FakeToken, Observable<FakeThing>>() {
@Override
public Observable<FakeThing> apply(FakeToken fakeToken) {
return fakeApi.getFakeData(fakeToken);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FakeThing>() {
@Override
public void accept(FakeThing fakeData) {
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
}
});
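For readers who know the JDK better than RxJava, the closest built-in counterpart to this use of flatMap() is CompletableFuture.thenCompose: the second request is built from the result of the first, and the chain stays flat. The two methods below are hypothetical stand-ins for the token and data endpoints.

```java
import java.util.concurrent.CompletableFuture;

class TokenChainSketch {
    // Hypothetical endpoint 1: exchanges an auth code for a token.
    static CompletableFuture<String> getToken(String authCode) {
        return CompletableFuture.supplyAsync(() -> "token-for-" + authCode);
    }

    // Hypothetical endpoint 2: needs the token to return data.
    static CompletableFuture<String> getData(String token) {
        return CompletableFuture.supplyAsync(() -> "data[" + token + "]");
    }

    // Step 1 feeds step 2 without nesting callbacks.
    static String load(String authCode) {
        return getToken(authCode)
                .thenCompose(TokenChainSketch::getData)
                .join();
    }

    public static void main(String[] args) {
        // prints "data[token-for-fake_auth_code]"
        System.out.println(load("fake_auth_code"));
    }
}
```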
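V. Token, advanced (retryWhen):
Some tokens are not single-use; they can be reused until they time out or are revoked (most tokens work this way). Such tokens are harder to handle: the token has to be stored, and when it turns out to be invalid the app must automatically fetch a new one and then repeat the request that failed because of the invalid token. If many API calls in a project need this self-healing behavior, the traditional callback style leads to very complicated code. With RxJava, retryWhen() handles the problem with little effort. The code takes roughly this form:
disposable = api.getData(token)
.retryWhen(observable ->
observable.flatMap(throwable ->
api.getToken()
.doOnNext(newToken -> updateToken(newToken))))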
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
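Example (the chain starts from Observable.just(1).flatMap(...) so that the cached token is read again each time retryWhen() resubscribes):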
final FakeApi fakeApi = Network.getFakeApi();
disposable = Observable.just(1)
.flatMap(new Function<Object, Observable<FakeThing>>() {
@Override
public Observable<FakeThing> apply(Object o) {
return cachedFakeToken.token == null
? Observable.<FakeThing>error(new NullPointerException("Token is null!"))
: fakeApi.getFakeData(cachedFakeToken);
}
})
.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> apply(Observable<? extends Throwable> observable) {
return observable.flatMap(new Function<Throwable, Observable<?>>() {
@Override
public Observable<?> apply(Throwable throwable) {
if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {
return fakeApi.getFakeToken("fake_auth_code")
.doOnNext(new Consumer<FakeToken>() {
@Override
public void accept(FakeToken fakeToken) {
tokenUpdated = true;
cachedFakeToken.token = fakeToken.token;
cachedFakeToken.expired = fakeToken.expired;
}
});
}
return Observable.error(throwable);
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FakeThing>() {
@Override
public void accept(FakeThing fakeData) {
swipeRefreshLayout.setRefreshing(false);
String token = cachedFakeToken.token;
if (tokenUpdated) {
token += "(" + getString(R.string.updated) + ")";
}
tokenTv.setText(getString(R.string.got_token_and_data, token, fakeData.id, fakeData.name));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
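The control flow that retryWhen() encodes here can be written out as an ordinary loop, which may help when reasoning about it. This is a plain-Java sketch, not RxJava: a token-related failure triggers a refresh and another attempt, while any other exception propagates untouched. The maxRetries cap, the fakeRequest method, and the "fresh"/"stale" token values are assumptions of this sketch; the sample above has no retry limit.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class TokenRetrySketch {
    static String cachedToken;   // null until the first refresh
    static int tokenFetches;     // counts refreshes, for demonstration only

    static <T> T withTokenRetry(Function<String, T> request,
                                Supplier<String> fetchToken,
                                int maxRetries) {
        for (int attempt = 0; ; attempt++) {
            try {
                if (cachedToken == null) {
                    throw new NullPointerException("Token is null!");
                }
                return request.apply(cachedToken);
            } catch (IllegalArgumentException | NullPointerException e) {
                // Token problem: give up after maxRetries, otherwise refresh and loop.
                if (attempt >= maxRetries) {
                    throw e;
                }
                cachedToken = fetchToken.get();
                tokenFetches++;
            }
        }
    }

    // Hypothetical endpoint that rejects every token except "fresh".
    static String fakeRequest(String token) {
        if (!"fresh".equals(token)) {
            throw new IllegalArgumentException("Token expired");
        }
        return "data for " + token;
    }

    public static void main(String[] args) {
        cachedToken = "stale";
        // prints "data for fresh"
        System.out.println(withTokenRetry(TokenRetrySketch::fakeRequest, () -> "fresh", 1));
    }
}
```

A cap of some kind is worth considering in real code as well: if the token endpoint keeps returning tokens that the data endpoint rejects, an uncapped retry repeats indefinitely.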
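VI. Caching (BehaviorSubject):
RxJava has a less commonly used class called Subject. It is both an Observable and an Observer, so it can act as a middleman for passing data along. For example, its subclass BehaviorSubject can be used to build a cache. The code takes roughly this form:
api.getData()
.subscribe(behaviorSubject); // the network data gets cached
behaviorSubject.subscribe(observer); // the previously cached data is delivered straight to observer
Example: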
public Disposable subscribeData(@NonNull Consumer<List<Item>> onNext, @NonNull Consumer<Throwable> onError) {
if (cache == null) {
cache = BehaviorSubject.create();
Observable.create(new ObservableOnSubscribe<List<Item>>() {
@Override
public void subscribe(ObservableEmitter<List<Item>> e) throws Exception {
List<Item> items = Database.getInstance().readItems();
if (items == null) {
setDataSource(DATA_SOURCE_NETWORK);
loadFromNetwork();
} else {
setDataSource(DATA_SOURCE_DISK);
e.onNext(items);
}
}
})
.subscribeOn(Schedulers.io())
.subscribe(cache);
} else {
setDataSource(DATA_SOURCE_MEMORY);
}
return cache.doOnError(new Consumer<Throwable>() {
@Override
public void accept(@io.reactivex.annotations.NonNull Throwable throwable) throws Exception {
cache = null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNext, onError);
}
disposable = Data.getInstance()
.subscribeData(new Consumer<List<Item>>() {
@Override
public void accept(@NonNull List<Item> items) throws Exception {
swipeRefreshLayout.setRefreshing(false);
int loadingTime = (int) (System.currentTimeMillis() - startingTime);
loadingTimeTv.setText(getString(R.string.loading_time_and_source, loadingTime, Data.getInstance().getDataSourceText()));
adapter.setItems(items);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
throwable.printStackTrace();
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
}
});
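What makes BehaviorSubject usable as a cache is one property: a new subscriber immediately receives the most recent value, then any later ones. The class below is a deliberately tiny plain-Java imitation of just that property, written for illustration; it is not RxJava's implementation and it leaves out errors, completion, and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LatestValueCacheSketch<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;
    private boolean hasValue;

    // Stores the value as the new "latest" and forwards it to current subscribers.
    synchronized void onNext(T value) {
        latest = value;
        hasValue = true;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    // Registers the subscriber and replays the latest value, if there is one.
    synchronized void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (hasValue) {
            subscriber.accept(latest);
        }
    }

    public static void main(String[] args) {
        LatestValueCacheSketch<String> cache = new LatestValueCacheSketch<>();
        List<String> log = new ArrayList<>();
        cache.subscribe(v -> log.add("early:" + v)); // nothing cached yet
        cache.onNext("from network");                // reaches the early subscriber
        cache.subscribe(v -> log.add("late:" + v));  // gets the cached value at once
        // prints "[early:from network, late:from network]"
        System.out.println(log);
    }
}
```

In the example above, the second call to subscribeData() plays the role of the late subscriber: the data arrives from memory without touching disk or network.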
For the full code, see Kaige's RxJavaSamples project.