響應(yīng)式編程是以異步和數(shù)據(jù)流來構(gòu)建事務(wù)關(guān)系的編程模型,異步和數(shù)據(jù)流是以構(gòu)建事務(wù)關(guān)系而存在,異步是為了區(qū)分無關(guān)的事務(wù),數(shù)據(jù)流是為了聯(lián)系有關(guān)的事務(wù)
版權(quán)聲明:本文出自門心叼龍的博客,屬于原創(chuàng)內(nèi)容,轉(zhuǎn)載請(qǐng)注明出處
https://blog.csdn.net/geduo_83/article/details/89740000
1.實(shí)現(xiàn)簡(jiǎn)單的網(wǎng)絡(luò)請(qǐng)求
Observable
.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
Builder builder = new Builder()
.url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.get();
Request request = builder.build();
Call call = new OkHttpClient().newCall(request);
Response response = call.execute();
e.onNext(response);
}
})
.map(new Function<Response, MobileAddress>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
Log.e(TAG, "map 線程:" + Thread.currentThread().getName() + "\n");
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext 線程:" + Thread.currentThread().getName() + "\n");
mRxOperatorsText.append("\ndoOnNext 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n");
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress data) throws Exception {
Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "成功:" + data.toString() + "\n");
mRxOperatorsText.append("成功:" + data.toString() + "\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 線程:" + Thread.currentThread().getName() + "\n");
mRxOperatorsText.append("\nsubscribe 線程:" + Thread.currentThread().getName() + "\n");
Log.e(TAG, "失?。? + throwable.getMessage() + "\n");
mRxOperatorsText.append("失敗:" + throwable.getMessage() + "\n");
}
});
2.先讀取緩存,如果緩存沒數(shù)據(jù)再通過網(wǎng)絡(luò)請(qǐng)求獲取數(shù)據(jù)后更新UI
Observable<FoodList> cache = Observable
.create(new ObservableOnSubscribe<FoodList>() {
@Override
public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
Log.e(TAG, "create當(dāng)前線程:"+Thread.currentThread().getName() );
FoodList data = CacheManager.getInstance().getFoodListData();
// 在操作符 concat 中,只有調(diào)用 onComplete 之后才會(huì)執(zhí)行下一個(gè) Observable
if (data != null){ // 如果緩存數(shù)據(jù)不為空,則直接讀取緩存數(shù)據(jù),而不讀取網(wǎng)絡(luò)數(shù)據(jù)
isFromNet = false;
Log.e(TAG, "\nsubscribe: 讀取緩存數(shù)據(jù):" );
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取緩存數(shù)據(jù):\n");
}
});
e.onNext(data);
}else {
isFromNet = true;
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):\n");
}
});
Log.e(TAG, "\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):" );
e.onComplete();
}
}
});
Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows",10+"")
.build()
.getObjectObservable(FoodList.class);
// 兩個(gè) Observable 的泛型應(yīng)當(dāng)保持一致
Observable
.concat(cache,network)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList tngouBeen) throws Exception {
Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
if (isFromNet){
mRxOperatorsText.append("accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n");
Log.e(TAG, "accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n"+tngouBeen.toString() );
CacheManager.getInstance().setFoodListData(tngouBeen);
}
mRxOperatorsText.append("accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString()+"\n");
Log.e(TAG, "accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
Log.e(TAG, "accept: 讀取數(shù)據(jù)失?。?+throwable.getMessage() );
mRxOperatorsText.append("accept: 讀取數(shù)據(jù)失?。?+throwable.getMessage()+"\n");
}
});
3.多個(gè)網(wǎng)絡(luò)請(qǐng)求依次依賴
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 1 + "")
.build()
.getObjectObservable(FoodList.class) // 發(fā)起獲取食品列表的請(qǐng)求,并解析到FootList
.subscribeOn(Schedulers.io()) // 在io線程進(jìn)行網(wǎng)絡(luò)請(qǐng)求
.observeOn(AndroidSchedulers.mainThread()) // 在主線程處理獲取食品列表的請(qǐng)求結(jié)果
.doOnNext(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList foodList) throws Exception {
// 先根據(jù)獲取食品列表的響應(yīng)結(jié)果做一些操作
Log.e(TAG, "accept: doOnNext :" + foodList.toString());
mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
}
})
.observeOn(Schedulers.io()) // 回到 io 線程去處理獲取食品詳情的請(qǐng)求
.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
@Override
public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
.addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
.build()
.getObjectObservable(FoodDetail.class);
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodDetail>() {
@Override
public void accept(@NonNull FoodDetail foodDetail) throws Exception {
Log.e(TAG, "accept: success :" + foodDetail.toString());
mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: error :" + throwable.getMessage());
mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
}
});
4.結(jié)合多個(gè)接口的數(shù)據(jù)更新UI
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class);
Observable<CategoryResult> observable2 = Network.getGankApi()
.getCategoryData("Android",1,1);
Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
@Override
public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
return "合并后的數(shù)據(jù)為:手機(jī)歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 失?。? + throwable+"\n");
}
});
5.間隔任務(wù)實(shí)現(xiàn)心跳
private Disposable mDisposable;
@Override
protected void doSomething() {
mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : "+aLong );
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: 設(shè)置文本 :"+aLong );
mRxOperatorsText.append("accept: 設(shè)置文本 :"+aLong +"\n");
}
});
}
/**
* 銷毀時(shí)停止心跳
*/
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null){
mDisposable.dispose();
}
}