RxJava in Practice for Android Mobile Development (Part 2)

Reactive programming is a programming model that builds relationships between tasks out of asynchrony and data streams. Both exist to express those relationships: asynchrony separates tasks that are unrelated, and data streams connect tasks that are related.

Copyright notice: this article comes from the blog of 門心叼龍 and is original content. Please credit the source when reposting.
https://blog.csdn.net/geduo_83/article/details/89740000

1. A simple network request

// Requires OkHttp 3, Gson, RxJava 2, RxAndroid and java.io.IOException on the import list.
Observable
        .create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Request.Builder builder = new Request.Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                // Blocking call; subscribeOn(Schedulers.io()) below moves it off the main thread
                Response response = call.execute();
                e.onNext(response);
                e.onComplete(); // only one response is emitted, so signal completion
            }
        })
        .map(new Function<Response, MobileAddress>() {
            @Override
            public MobileAddress apply(@NonNull Response response) throws Exception {
                Log.e(TAG, "map thread: " + Thread.currentThread().getName() + "\n");
                if (response.isSuccessful()) {
                    ResponseBody body = response.body();
                    if (body != null) {
                        Log.e(TAG, "map: before conversion: " + body);
                        return new Gson().fromJson(body.string(), MobileAddress.class);
                    }
                }
                // RxJava 2 does not allow null items, so report the failure instead of returning null
                throw new IOException("Request failed, HTTP code: " + response.code());
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(new Consumer<MobileAddress>() {
            @Override
            public void accept(@NonNull MobileAddress s) throws Exception {
                Log.e(TAG, "doOnNext thread: " + Thread.currentThread().getName() + "\n");
                mRxOperatorsText.append("\ndoOnNext thread: " + Thread.currentThread().getName() + "\n");
                Log.e(TAG, "doOnNext: saved successfully: " + s.toString() + "\n");
                mRxOperatorsText.append("doOnNext: saved successfully: " + s.toString() + "\n");
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()) // redundant here: the stream is already on the main thread
        .subscribe(new Consumer<MobileAddress>() {
            @Override
            public void accept(@NonNull MobileAddress data) throws Exception {
                Log.e(TAG, "subscribe thread: " + Thread.currentThread().getName() + "\n");
                mRxOperatorsText.append("\nsubscribe thread: " + Thread.currentThread().getName() + "\n");
                Log.e(TAG, "Success: " + data.toString() + "\n");
                mRxOperatorsText.append("Success: " + data.toString() + "\n");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Log.e(TAG, "subscribe thread: " + Thread.currentThread().getName() + "\n");
                mRxOperatorsText.append("\nsubscribe thread: " + Thread.currentThread().getName() + "\n");
                Log.e(TAG, "Failure: " + throwable.getMessage() + "\n");
                mRxOperatorsText.append("Failure: " + throwable.getMessage() + "\n");
            }
        });
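
The article does not show the MobileAddress class that Gson fills in. Below is a minimal sketch of what it might look like. Only getResult() and getMobilearea() appear in the article (in section 4); the field names, the nested Result class, the setters and the toString() format are assumptions made for illustration. By default Gson maps JSON keys onto fields with the same name, so the fields are named after the getters.

```java
/** Hypothetical response model; only getResult() and getMobilearea() appear in the article. */
class MobileAddress {
    private Result result; // assumed to map a JSON object named "result"

    public Result getResult() { return result; }

    public void setResult(Result result) { this.result = result; }

    @Override
    public String toString() {
        return "MobileAddress{result=" + result + "}";
    }

    /** Assumed shape of the nested "result" object. */
    static class Result {
        private String mobilearea; // assumed JSON field holding the number's home region

        public String getMobilearea() { return mobilearea; }

        public void setMobilearea(String mobilearea) { this.mobilearea = mobilearea; }

        @Override
        public String toString() {
            return "Result{mobilearea='" + mobilearea + "'}";
        }
    }
}
```

Overriding toString() matters in the snippet above because the log lines print the object directly.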

2. Read the cache first; if the cache is empty, fetch the data over the network, then update the UI

Observable<FoodList> cache = Observable
        .create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create current thread: " + Thread.currentThread().getName());
                FoodList data = CacheManager.getInstance().getFoodListData();

                // With concat, the next Observable is subscribed only after onComplete is called
                if (data != null) { // Cache hit: emit the cached data and skip the network request
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: reading cached data:");
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: reading cached data:\n");
                        }
                    });
                    e.onNext(data);
                    // onComplete is deliberately not called here, so the stream stays open until it is disposed
                } else { // Cache miss: complete so that concat moves on to the network Observable
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: reading network data:\n");
                        }
                    });
                    Log.e(TAG, "\nsubscribe: reading network data:");
                    e.onComplete();
                }
            }
        });

Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
        .addQueryParameter("rows", 10 + "")
        .build()
        .getObjectObservable(FoodList.class);

// The two Observables must have the same generic type

Observable
        .concat(cache, network)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<FoodList>() {
            @Override
            public void accept(@NonNull FoodList tngouBeen) throws Exception {
                Log.e(TAG, "subscribe success: " + Thread.currentThread().getName());
                if (isFromNet) {
                    mRxOperatorsText.append("accept: data fetched from network, updating cache: \n");
                    Log.e(TAG, "accept: data fetched from network, updating cache: \n" + tngouBeen.toString());
                    CacheManager.getInstance().setFoodListData(tngouBeen);
                }

                mRxOperatorsText.append("accept: data read successfully: " + tngouBeen.toString() + "\n");
                Log.e(TAG, "accept: data read successfully: " + tngouBeen.toString());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Log.e(TAG, "subscribe failure: " + Thread.currentThread().getName());
                Log.e(TAG, "accept: failed to read data: " + throwable.getMessage());
                mRxOperatorsText.append("accept: failed to read data: " + throwable.getMessage() + "\n");
            }
        });
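
The CacheManager used above is not shown in the article. One possible shape is sketched below: an in-memory singleton that holds the most recent FoodList. Everything in it other than the three method names (getInstance, getFoodListData, setFoodListData) is an assumption, and the FoodList class here is only a placeholder for the article's real model.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Placeholder for the article's FoodList model; the real class is not shown in the article. */
class FoodList {
    final String payload;

    FoodList(String payload) { this.payload = payload; }
}

/** Hypothetical in-memory cache: a singleton holding the most recent FoodList. */
final class CacheManager {
    private static final CacheManager INSTANCE = new CacheManager();

    // The cache is read on the io thread and written on the main thread,
    // so an AtomicReference keeps the value visible across threads.
    private final AtomicReference<FoodList> foodListData = new AtomicReference<>();

    private CacheManager() { }

    static CacheManager getInstance() { return INSTANCE; }

    /** Returns the cached list, or null when nothing has been cached yet. */
    FoodList getFoodListData() { return foodListData.get(); }

    void setFoodListData(FoodList data) { foodListData.set(data); }
}
```

Returning null on a miss matches the `data != null` check in the cache Observable. A real app would more likely persist the data to disk or a database so that it survives a process restart.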

3. Chained network requests, each depending on the previous one

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
        .addQueryParameter("rows", 1 + "")
        .build()
        .getObjectObservable(FoodList.class) // Request the food list and parse it into FoodList
        .subscribeOn(Schedulers.io())        // Make the network request on the io thread
        .observeOn(AndroidSchedulers.mainThread()) // Handle the food list result on the main thread
        .doOnNext(new Consumer<FoodList>() {
            @Override
            public void accept(@NonNull FoodList foodList) throws Exception {
                // First do some work with the food list response
                Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                mRxOperatorsText.append("accept: doOnNext :" + foodList.toString() + "\n");
            }
        })
        .observeOn(Schedulers.io()) // Switch back to the io thread for the food detail request
        .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
            @Override
            public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                    return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                            .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                            .build()
                            .getObjectObservable(FoodDetail.class);
                }
                // flatMap must not return null in RxJava 2; signal the problem explicitly instead
                return Observable.error(new IllegalStateException("Food list is empty"));
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<FoodDetail>() {
            @Override
            public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                Log.e(TAG, "accept: success :" + foodDetail.toString());
                mRxOperatorsText.append("accept: success :" + foodDetail.toString() + "\n");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Log.e(TAG, "accept: error :" + throwable.getMessage());
                mRxOperatorsText.append("accept: error :" + throwable.getMessage() + "\n");
            }
        });
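
For readers who know the JDK better than RxJava, the dependency expressed by flatMap above resembles CompletableFuture.thenCompose: the second request cannot start until the first has produced a value. The sketch below is only an analogy, not RxJava. The two fetch methods and their return values are hypothetical stand-ins for the list and detail requests.

```java
import java.util.concurrent.CompletableFuture;

final class DependentRequestsSketch {
    // Hypothetical stand-in for the list request: resolves to the id of the first food item
    static CompletableFuture<Integer> fetchFirstFoodId() {
        return CompletableFuture.supplyAsync(() -> 42);
    }

    // Hypothetical stand-in for the detail request, which needs the id from the first call
    static CompletableFuture<String> fetchFoodDetail(int id) {
        return CompletableFuture.supplyAsync(() -> "detail for food #" + id);
    }

    // thenCompose starts the second request only once the first result is available
    static CompletableFuture<String> loadDetail() {
        return fetchFirstFoodId().thenCompose(DependentRequestsSketch::fetchFoodDetail);
    }

    public static void main(String[] args) {
        System.out.println(loadDetail().join()); // prints "detail for food #42"
    }
}
```

Unlike an Observable, a CompletableFuture carries exactly one value and has no schedulers to switch between, which is why the RxJava version needs the observeOn calls.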

4. Combining data from several endpoints to update the UI

Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
        .build()
        .getObjectObservable(MobileAddress.class);

Observable<CategoryResult> observable2 = Network.getGankApi()
        .getCategoryData("Android", 1, 1);

// zip waits for one item from each source, then combines the pair into a single result
Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                return "Combined data: phone number region: " + mobileAddress.getResult().getMobilearea()
                        + ", name: " + categoryResult.results.get(0).who;
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept: success: " + s + "\n");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                Log.e(TAG, "accept: failure: " + throwable + "\n");
            }
        });
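
As a rough JDK-only comparison, combining two independent single-value results the way zip does above looks like CompletableFuture.thenCombine. This is an analogy rather than RxJava code. The two fetch methods and the values they return are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class CombineResultsSketch {
    // Hypothetical stand-in for the phone number lookup
    static CompletableFuture<String> fetchRegion() {
        return CompletableFuture.supplyAsync(() -> "Beijing");
    }

    // Hypothetical stand-in for the category request
    static CompletableFuture<String> fetchAuthor() {
        return CompletableFuture.supplyAsync(() -> "Alice");
    }

    // Both requests run independently; the combiner runs once both have finished
    static CompletableFuture<String> loadCombined() {
        return fetchRegion().thenCombine(fetchAuthor(),
                (region, author) -> "region: " + region + ", name: " + author);
    }

    public static void main(String[] args) {
        System.out.println(loadCombined().join()); // prints "region: Beijing, name: Alice"
    }
}
```

The difference is that zip also works on streams with many items, pairing the first item of each source, then the second, and so on.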

5. A heartbeat implemented as an interval task

private Disposable mDisposable;

@Override
protected void doSomething() {
    // interval emits 0, 1, 2, ... once per second, on the computation scheduler by default
    mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    Log.e(TAG, "accept: doOnNext : " + aLong);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(@NonNull Long aLong) throws Exception {
                    Log.e(TAG, "accept: set text: " + aLong);
                    mRxOperatorsText.append("accept: set text: " + aLong + "\n");
                }
            });
}

/**
 * Stop the heartbeat when the Activity is destroyed
 */
@Override
protected void onDestroy() {
    super.onDestroy();
    if (mDisposable != null) {
        mDisposable.dispose();
    }
}