這可能是最好的RxJava 2.x 入門(mén)教程(五)

這可能是最好的 RxJava 2.x入門(mén)教程系列專(zhuān)欄
文章鏈接:
這可能是最好的 RxJava 2.x 入門(mén)教程(完結(jié)版)【重磅推出】
這可能是最好的 RxJava 2.x 入門(mén)教程(一)
這可能是最好的 RxJava 2.x 入門(mén)教程(二)
這可能是最好的 RxJava 2.x 入門(mén)教程(三)
這可能是最好的 RxJava 2.x 入門(mén)教程(四)
這可能是最好的 RxJava 2.x 入門(mén)教程(五)
GitHub 代碼同步更新:https://github.com/nanchen2251/RxJava2Examples
為了滿(mǎn)足大家的饑渴難耐,GitHub 將同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x 所有操作符應(yīng)用場(chǎng)景介紹和實(shí)際應(yīng)用場(chǎng)景,后期除了 RxJava 可能還會(huì)增添其他東西,總之,GitHub 上的 Demo 專(zhuān)為大家傾心打造。傳送門(mén):https://github.com/nanchen2251/RxJava2Examples

前言

終于如愿來(lái)到讓我小伙伴們亢奮的 RxJava 2 使用場(chǎng)景舉例了,前面幾章中我們講解完了 RxJava 1.x 到 RxJava 2.x 的異同以及 RxJava 2.x 的各種操作符使用,如有疑問(wèn),歡迎點(diǎn)擊上方的鏈接進(jìn)入你想要的環(huán)節(jié)。

正題

簡(jiǎn)單的網(wǎng)絡(luò)請(qǐng)求

想必大家都知道,很多時(shí)候我們?cè)谑褂?RxJava 的時(shí)候總是和 Retrofit 進(jìn)行結(jié)合使用,而為了方便演示,這里我們就暫且采用 OkHttp3 進(jìn)行演示,配合 mapdoOnNext ,線(xiàn)程切換進(jìn)行簡(jiǎn)單的網(wǎng)絡(luò)請(qǐng)求:

  • 1)通過(guò) Observable.create() 方法,調(diào)用 OkHttp 網(wǎng)絡(luò)請(qǐng)求;
  • 2)通過(guò) map 操作符集合 gson,將 Response 轉(zhuǎn)換為 bean 類(lèi);
  • 3)通過(guò) doOnNext() 方法,解析 bean 中的數(shù)據(jù),并進(jìn)行數(shù)據(jù)庫(kù)存儲(chǔ)等操作;
  • 4)調(diào)度線(xiàn)程,在子線(xiàn)程中進(jìn)行耗時(shí)操作任務(wù),在主線(xiàn)程中更新 UI ;
  • 5)通過(guò) subscribe(),根據(jù)請(qǐng)求成功或者失敗來(lái)更新 UI 。
Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Builder builder = new Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, MobileAddress>() {
                    @Override
                    public MobileAddress apply(@NonNull Response response) throws Exception {

                        Log.e(TAG, "map 線(xiàn)程:" + Thread.currentThread().getName() + "\n");
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
                                return new Gson().fromJson(body.string(), MobileAddress.class);
                            }
                        }
                        return null;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress s) throws Exception {
                        Log.e(TAG, "doOnNext 線(xiàn)程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\ndoOnNext 線(xiàn)程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
                        mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n");

                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "subscribe 線(xiàn)程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\nsubscribe 線(xiàn)程:" + Thread.currentThread().getName() + "\n");
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                        mRxOperatorsText.append("成功:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 線(xiàn)程:" + Thread.currentThread().getName() + "\n");
                        mRxOperatorsText.append("\nsubscribe 線(xiàn)程:" + Thread.currentThread().getName() + "\n");

                        Log.e(TAG, "失?。? + throwable.getMessage() + "\n");
                        mRxOperatorsText.append("失敗:" + throwable.getMessage() + "\n");
                    }
                });

為了方便,我們后面的講解大部分采用開(kāi)源的 Rx2AndroidNetworking 來(lái)處理,數(shù)據(jù)來(lái)源于天狗網(wǎng)等多個(gè)公共API接口。

mRxOperatorsText.append("RxNetworkActivity\n");
        Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class)
                .observeOn(AndroidSchedulers.mainThread()) // 為doOnNext() 指定在主線(xiàn)程,否則報(bào)錯(cuò)
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "doOnNext:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\ndoOnNext:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG,"doOnNext:"+data.toString()+"\n");
                        mRxOperatorsText.append("doOnNext:"+data.toString()+"\n");
                    }
                })
                .map(new Function<MobileAddress, ResultBean>() {
                    @Override
                    public ResultBean apply(@NonNull MobileAddress mobileAddress) throws Exception {
                        Log.e(TAG, "\n" );
                        mRxOperatorsText.append("\n");
                        Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("map:"+Thread.currentThread().getName()+"\n" );
                        return mobileAddress.getResult();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ResultBean>() {
                    @Override
                    public void accept(@NonNull ResultBean data) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\nsubscribe 成功:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                        mRxOperatorsText.append("成功:" + data.toString() + "\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName()+"\n" );
                        mRxOperatorsText.append("\nsubscribe 失敗:"+Thread.currentThread().getName()+"\n" );
                        Log.e(TAG, "失?。?+ throwable.getMessage()+"\n" );
                        mRxOperatorsText.append("失敗:"+ throwable.getMessage()+"\n");
                    }
                });

先讀取緩存,如果緩存沒(méi)數(shù)據(jù)再通過(guò)網(wǎng)絡(luò)請(qǐng)求獲取數(shù)據(jù)后更新UI

想必在實(shí)際應(yīng)用中,很多時(shí)候(對(duì)數(shù)據(jù)操作不敏感時(shí))都需要我們先讀取緩存的數(shù)據(jù),如果緩存沒(méi)有數(shù)據(jù),再通過(guò)網(wǎng)絡(luò)請(qǐng)求獲取,隨后在主線(xiàn)程更新我們的 UI。

concat 操作符簡(jiǎn)直就是為我們這種需求量身定做。

concat 可以做到不交錯(cuò)的發(fā)射兩個(gè)甚至多個(gè) Observable 的發(fā)射事件,并且只有前一個(gè) Observable 終止( onComplete() ) 后才會(huì)定義下一個(gè) Observable。

利用這個(gè)特性,我們就可以先讀取緩存數(shù)據(jù),倘若獲取到的緩存數(shù)據(jù)不是我們想要的,再調(diào)用 onComplete() 以執(zhí)行獲取網(wǎng)絡(luò)數(shù)據(jù)的 Observable,如果緩存數(shù)據(jù)能應(yīng)我們所需,則直接調(diào)用 onNext() ,防止過(guò)度的網(wǎng)絡(luò)請(qǐng)求,浪費(fèi)用戶(hù)的流量。

Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create當(dāng)前線(xiàn)程:"+Thread.currentThread().getName() );
                FoodList data = CacheManager.getInstance().getFoodListData();

                // 在操作符 concat 中,只有調(diào)用 onComplete 之后才會(huì)執(zhí)行下一個(gè) Observable
                if (data != null){ // 如果緩存數(shù)據(jù)不為空,則直接讀取緩存數(shù)據(jù),而不讀取網(wǎng)絡(luò)數(shù)據(jù)
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: 讀取緩存數(shù)據(jù):" );
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取緩存數(shù)據(jù):\n");
                        }
                    });

                    e.onNext(data);
                }else {
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):\n");
                        }
                    });
                    Log.e(TAG, "\nsubscribe: 讀取網(wǎng)絡(luò)數(shù)據(jù):" );
                    e.onComplete();
                }


            }
        });

        Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows",10+"")
                .build()
                .getObjectObservable(FoodList.class);


        // 兩個(gè) Observable 的泛型應(yīng)當(dāng)保持一致

        Observable.concat(cache,network)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList tngouBeen) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                        if (isFromNet){
                            mRxOperatorsText.append("accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n");
                            Log.e(TAG, "accept : 網(wǎng)絡(luò)獲取數(shù)據(jù)設(shè)置緩存: \n"+tngouBeen.toString() );
                            CacheManager.getInstance().setFoodListData(tngouBeen);
                        }

                        mRxOperatorsText.append("accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString()+"\n");
                        Log.e(TAG, "accept: 讀取數(shù)據(jù)成功:" + tngouBeen.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
                        Log.e(TAG, "accept: 讀取數(shù)據(jù)失?。?+throwable.getMessage() );
                        mRxOperatorsText.append("accept: 讀取數(shù)據(jù)失敗:"+throwable.getMessage()+"\n");
                    }
                });

有時(shí)候我們的緩存可能還會(huì)分為 memory 和 disk ,實(shí)際上都差不多,無(wú)非是多寫(xiě)點(diǎn) Observable ,然后通過(guò) concat 合并即可。

多個(gè)網(wǎng)絡(luò)請(qǐng)求依次依賴(lài)

想必這種情況也在實(shí)際情況中比比皆是,例如用戶(hù)注冊(cè)成功后需要自動(dòng)登錄,我們只需要先通過(guò)注冊(cè)接口注冊(cè)用戶(hù)信息,注冊(cè)成功后馬上調(diào)用登錄接口進(jìn)行自動(dòng)登錄即可。

我們的 flatMap 恰好解決了這種應(yīng)用場(chǎng)景,flatMap 操作符可以將一個(gè)發(fā)射數(shù)據(jù)的 Observable 變換為多個(gè) Observables ,然后將它們發(fā)射的數(shù)據(jù)合并后放到一個(gè)單獨(dú)的 Observable,利用這個(gè)特性,我們很輕松地達(dá)到了我們的需求。

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 1 + "")
                .build()
                .getObjectObservable(FoodList.class) // 發(fā)起獲取食品列表的請(qǐng)求,并解析到FootList
                .subscribeOn(Schedulers.io())        // 在io線(xiàn)程進(jìn)行網(wǎng)絡(luò)請(qǐng)求
                .observeOn(AndroidSchedulers.mainThread()) // 在主線(xiàn)程處理獲取食品列表的請(qǐng)求結(jié)果
                .doOnNext(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        // 先根據(jù)獲取食品列表的響應(yīng)結(jié)果做一些操作
                        Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                        mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                    }
                })
                .observeOn(Schedulers.io()) // 回到 io 線(xiàn)程去處理獲取食品詳情的請(qǐng)求
                .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                    @Override
                    public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                        if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                            return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                    .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                    .build()
                                    .getObjectObservable(FoodDetail.class);
                        }
                        return null;

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodDetail>() {
                    @Override
                    public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                        Log.e(TAG, "accept: success :" + foodDetail.toString());
                        mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error :" + throwable.getMessage());
                        mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                    }
                });

結(jié)合多個(gè)接口的數(shù)據(jù)更新 UI

在實(shí)際應(yīng)用中,我們極有可能會(huì)在一個(gè)頁(yè)面顯示的數(shù)據(jù)來(lái)源于多個(gè)接口,這時(shí)候我們的 zip 操作符為我們排憂(yōu)解難。

zip 操作符可以將多個(gè) Observable 的數(shù)據(jù)結(jié)合為一個(gè)數(shù)據(jù)源再發(fā)射出去。

Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class);

        Observable<CategoryResult> observable2 = Network.getGankApi()
                .getCategoryData("Android",1,1);

        Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                return "合并后的數(shù)據(jù)為:手機(jī)歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 失?。? + throwable+"\n");
                    }
                });

間隔任務(wù)實(shí)現(xiàn)心跳

想必即時(shí)通訊等需要輪訓(xùn)的任務(wù)在如今的 APP 中已是很常見(jiàn),而 RxJava 2.x 的 interval 操作符可謂完美地解決了我們的疑惑。

這里就簡(jiǎn)單的意思一下輪訓(xùn)。

private Disposable mDisposable;
    @Override
    protected void doSomething() {
        mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: doOnNext : "+aLong );
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: 設(shè)置文本 :"+aLong );
                        mRxOperatorsText.append("accept: 設(shè)置文本 :"+aLong +"\n");
                    }
                });
    }

    /**
     * 銷(xiāo)毀時(shí)停止心跳
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null){
            mDisposable.dispose();
        }
    }

后記

姑且先講到這里吧,喜歡的小伙伴別忘了關(guān)注和點(diǎn)贊哦~ https://github.com/nanchen2251/RxJava2Examples

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 這可能是最好的 RxJava 2.x 入門(mén)教程系列專(zhuān)欄文章鏈接:這可能是最好的RxJava 2.x 入門(mén)教程(一)...
    nanchen2251閱讀 379,104評(píng)論 125 1,039
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,094評(píng)論 25 709
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,987評(píng)論 0 10
  • 在記憶中寫(xiě)日記都是讀書(shū)時(shí)要交功課的事,總會(huì)苦思良久盡量湊字,想不到現(xiàn)在又要來(lái)寫(xiě),應(yīng)該怎么寫(xiě)寫(xiě)些什么心里確實(shí)有點(diǎn)兒...
    奔跑的赤兔馬閱讀 251評(píng)論 0 0
  • 現(xiàn)在很多軟件是收費(fèi)的,對(duì)于像我等窮屌絲來(lái)說(shuō),自然是想去找個(gè)破解的軟件來(lái)替代,這個(gè)時(shí)候遇到xxx.app文件已損壞,...
    JackerooChu閱讀 3,622評(píng)論 1 10

友情鏈接更多精彩內(nèi)容