Rxjava

什么是RxJava(ReactiveX.io鏈式編程)

  • 定義:一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫。
    總結(jié):RxJava 是一個 基于事件流、實現(xiàn)異步操作的庫
    理解:RXJava是一個響應(yīng)式編程框架 ,采用觀察者設(shè)計模式,觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制

  • 優(yōu)點:
    由于 RxJava是基于事件流的鏈式調(diào)用,所以使得 RxJava:

邏輯簡潔
實現(xiàn)優(yōu)雅
使用簡單

  • 作用:
    實現(xiàn)異步操作

類似于 Android中的 AsyncTask 、Handler作用

RxJava 有3個基本概念及原理

1.Observable(被觀察者)
2.Observer(觀察者)
3.subscribe(訂閱)事件。

注意

1)RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。
2)RxJava 規(guī)定,onNext() 接收被觀察者發(fā)送的消息、可以執(zhí)行多次;當不會再有新的 onNext () 發(fā)出時,需要觸發(fā) onCompleted () 方法作為標志。onError():事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發(fā),同時隊列自動終止,不允許再有事件發(fā)出。
3)在一個正確運行的事件序列中, onCompleted() 和 onError () 有且只有一個,并且是事件序列中的最后一個。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在隊列中調(diào)用了其中一個,就不應(yīng)該再調(diào)用另一個。

依賴庫

//RxJava
    implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 庫
    implementation 'com.squareup.retrofit2:converter-gson:2.5.0'//轉(zhuǎn)換器,請求結(jié)果轉(zhuǎn)換成Model
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'//配合Rxjava 使用

簡單使用

public static void baseRx(){
    //1.創(chuàng)建被觀察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("1111");
            emitter.onNext("2222");
            emitter.onNext("3333");
            emitter.onNext("4444");
            //emitter.onError(new Throwable("abc"));
            //emitter.onComplete();
        }
    });

    //2.創(chuàng)建觀察者
    Observer<String> observer = new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {//關(guān)閉線程
            Log.e(TAG, "onSubscribe: " );
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "onNext: "+ s );
        }

        @Override
        public void onError(Throwable e) {//失敗
            Log.e(TAG, "onError: "+e.getMessage() );
        }

        @Override
        public void onComplete() {//成功
            Log.e(TAG, "onComplete: " );
        }
    };
    //3.被觀察者訂閱觀察者
    observable.subscribe(observer);

    //線程切換
    observable
            //被訂閱者在子線程中
            .subscribeOn(Schedulers.io())
            //訂閱者在主線程中
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);

    //觀察中可以重復(fù)指定線程
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())//主
            .observeOn(Schedulers.io())//子
            .observeOn(AndroidSchedulers.mainThread())//主
            .subscribe(observer);
}

Android功能使用

   final Retrofit homeRetrofit = new Retrofit.Builder()
                .baseUrl(ApiServer.homeUrl)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();

        final ApiServer server = homeRetrofit.create(ApiServer.class);

        final Observable<HomeBean> home = server.getHome("" + count);

        home.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<HomeBean>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(HomeBean homeBean) {
                        List<HomeBean.ResultsBean> results = homeBean.getResults();
                        homeList.addAll(results);
                        srl.finishRefresh();
                        srl.finishLoadMore();
                        adapter.notifyDataSetChanged();
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("TAG", "onError()" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {

                    }
                });

其他操作符使用( 查看操作符)

  • 創(chuàng)建操作符
    //遍歷輸出
    public static void rxFrom(){
        Integer[] a = {1,2,3,4,5};
        // Observable.fromArray(1,2,3,4)
        //Observable.fromArray("a","b","c")
        Observable.fromArray(a).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: "+integer);
            }
        });
    }

    //數(shù)組合并輸出
    public static void rxJust(){
        Integer[] a = {1,2,3};
        Integer[] b = {9,8,7};
        Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
            @Override
            public void accept(Integer[] integers) throws Exception {
                for (Integer i: integers) {
                    Log.e(TAG, "accept: "+i);
                }
            }
        });
    }

    //范圍輸出
    public  static  void rxRange(){
        Observable.range(0,20).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: "+integer );
            }
        });
    }

  //定時器
    public static void rxInterval(){
        Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "accept: "+aLong );
            }
        });
    }
    //閃屏
    private void rxjavaInterval() {
        final Long time = 5L;
        subscribe = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e("TAG", "倒計時:" + aLong);
                        if (aLong < time && !subscribe.isDisposed()) {
                            tv.setText("記錄改變生活" + (time - aLong - 1));
                        } else {
                            Intent intent = new Intent(WelcomActivity.this, MainActivity.class);
                            startActivity(intent);
                            finish();
                        }
                    }
                });
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        subscribe.dispose();
        subscribe = null;
    }
  • 過濾操作符
    //過濾輸出
    public static void rxFilter(){
        Integer[] a = {1,2,3,4,5};
        Observable.fromArray(a).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                if (integer>3){
                    return true;
                }
                return false;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: "+integer );
            }
        });
    }
  • 變換操作符
①Map:通過指定一個Fun函數(shù)將Observeble轉(zhuǎn)換成一個新的Observable對象并發(fā)射,觀察者收到新的observable處理。

    public static void rxMap(){
        Integer[] a = {1,2,3,4,5};
        Observable.fromArray(a).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) {
                return integer+"abc";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                Log.e(TAG, "accept: "+s );
            }
        });
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容