Android RxJava

1.什么是RxJava(ReactiveX.io鏈?zhǔn)骄幊蹋?/h1>
RXJava是一個響應(yīng)式編程框架,采用觀察者設(shè)計(jì)模式,
觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機(jī)制

概述:一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫

優(yōu)點(diǎn):異步操作很關(guān)鍵的一點(diǎn)是程序的簡潔性,因?yàn)樵谡{(diào)度過程比較復(fù)雜的  
情況下,異步代碼經(jīng)常會既難寫也難被讀懂。Android 創(chuàng)造的AsyncTask和  
Handler,其實(shí)都是為了讓異步代碼更加簡潔。RxJava 的優(yōu)勢也是簡潔,但它
的簡潔的與眾不同之處在于,隨著程序邏輯變得越來越復(fù)雜,它依然能夠保持簡
潔。(函數(shù)風(fēng)格、代碼簡單、異步錯誤處理、輕松使用并發(fā))

2.觀察者模式

被觀察者

/**
 * 被觀察者
 */
public class Watched{

    private List<Watcher > list = new ArrayList<>();

    //注冊觀察者
    @Override
    public void registerWatcher(Watcher watcher) {
        list.add(watcher);
    }

    //移除觀察者
    @Override
    public void unregisterWatcher(Watcher watcher) {
        list.remove(watcher);
    }

    //清空觀察者
    @Override
    public void clearWatcher() {
        list.clear();
    }

    //通知觀察者
    @Override
    public void notifyWathers(String string) {
        for (Watcher watcher: list ) {
            watcher.update(string);
        }
    }
}

觀察者

/**
 * 觀察者
 */
public class Watcher {

    //用于觀察者更新狀態(tài)
    @Override
    public void update(String string) {

        System.out.println(Thread.currentThread().toString() + " : " + string);
    }

}

測試類

/**
 * 測試類
 */
public class MyClass {
    public static void main(String[] args){

        //觀察者
        Watcher watcher1 = new Watcher();
        Watcher watcher2 = new Watcher();
        Watcher watcher3 = new Watcher();

        //被觀察者
        Watched watched = new Watched();

        //被觀察者注冊觀察者
        watched.registerWatcher(watcher1);
        watched.registerWatcher(watcher2);
        watched.registerWatcher(watcher3);

        //通知
        watched.notifyWathers("接收的數(shù)");

        //清空
        watched.clearWatcher();
    }
}

3.基本概念(觀察者模式)

案例:按鈕點(diǎn)擊處理、廣播注冊

通過setOnClickListener()方法,Button持有OnClickListener的引用;當(dāng)用戶
擊時(shí),Button自動調(diào)用OnClickListener的onClick()方法。 
Button——>被觀察者
OnClickListener——>觀察者
setOnClickListener ——>訂閱
onClick ——>事件

RxJava 有3個基本概念:

  1.Observable(可觀察者,即被觀察者)
  2.Observer(觀察者)
  3.subscribe(訂閱)事件。

觀察者模式

Observable 和 Observer 通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而 
Observable 可以在需要的時(shí)候發(fā)出事件來通知 Observer。

普通事件 
     onNext() 接收被觀察者發(fā)送的消息
特殊的事件:
     onCompleted() 事件隊(duì)列完結(jié)
     onError ()    事件隊(duì)列異常

注意:

1)RxJava 不僅把每個事件單獨(dú)處理,還會把它們看做一個隊(duì)列。
2)RxJava 規(guī)定,onNext() 接收被觀察者發(fā)送的消息、可以執(zhí)行多次;當(dāng)不
會再有新的 onNext () 發(fā)出時(shí),需要觸發(fā) onCompleted () 方法作為標(biāo)志。
onError():事件隊(duì)列異常。在事件處理過程中出異常時(shí),onError() 會被觸發(fā),同
時(shí)隊(duì)列自動終止,不允許再有事件發(fā)出。
3)在一個正確運(yùn)行的事件序列中, onCompleted() 和 onError () 有且只有一個,
并且是事件序列中的最后一個。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在隊(duì)列中調(diào)
用了其中一個,就不應(yīng)該再調(diào)用另一個。

3.調(diào)度器

RxJava中調(diào)度器設(shè)置方法

subscribeOn():或者叫做事件產(chǎn)生的線程。 
    指定 subscribe()所發(fā)生的線程,
    即 Observable.OnSubscribe 被激活時(shí)所處的線程。

observeOn():或者叫做事件消費(fèi)的線程。
    指定 Subscriber所運(yùn)行在的線程。

幾種調(diào)度器

在RxJava 中Scheduler——調(diào)度器,相當(dāng)于線程控制器,
RxJava 通過它來指定每一段代碼應(yīng)該運(yùn)行在什么樣的線程。
RxJava 已經(jīng)內(nèi)置了幾個Scheduler,它們已經(jīng)適合大多數(shù)的使用場景:
  1:Schedulers.immediate():直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的Scheduler
  2:Schedulers.newThread():總是啟用新線程,并在新線程執(zhí)行操作。
  3:Schedulers.io():I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler
      行為模式和 newThread()差不多區(qū)別在于 io()的內(nèi)部實(shí)現(xiàn)是是用一個無數(shù)量上限的線 
      程池可以重用空閑的線程,因此多數(shù)情況下 io()比 newThread()更有效率。不要把計(jì)算
      工作放在 io()中可以避免創(chuàng)建不必要的線程。
  4:Schedulers.computation():計(jì)算所使用的 Scheduler這個計(jì)算指的是 CPU密集型計(jì)算,
       即不會被 I/O 等操作限制性能的操作,例如圖形的計(jì)算。這個 Scheduler使用的固定  
       的線程池,大小為 CPU核數(shù)。不要把 I/O 操作放在computation()中,否則 I/O 操作
       的等待時(shí)間會浪費(fèi)CPU。
  5.AndroidSchedulers.mainThread():Android 還有一個專用的
        它指定的操作將在 Android主線程運(yùn)行。有了這幾個 Scheduler,就可以使用
        subscribeOn()和 observeOn()兩個方法來對線程進(jìn)行控制了。 

4.依賴庫

//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 庫
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'//轉(zhuǎn)換器,請求結(jié)果轉(zhuǎn)換成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'//配合Rxjava 使用
implementation 'com.google.code.gson:gson:2.6.2'//Gson 庫

5.簡單使用

public static void baseRx(){
    //創(chuàng)建被觀察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("1111");
            emitter.onNext("2222");
            emitter.onNext("3333");
            emitter.onNext("4444");
            //emitter.onError(new Throwable("abc"));
            //emitter.onComplete();
        }
    });

    //創(chuàng)建觀察者
    Observer<String> observer = new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {//關(guān)閉線程
            Log.e(TAG, "onSubscribe: " );
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "onNext: "+ s );
        }

        @Override
        public void onError(Throwable e) {//失敗
            Log.e(TAG, "onError: "+e.getMessage() );
        }

        @Override
        public void onComplete() {//成功
            Log.e(TAG, "onComplete: " );
        }
    };
    //被觀察者訂閱觀察者
    observable.subscribe(observer);

    //線程切換
    observable
            //被訂閱者在子線程中
            .subscribeOn(Schedulers.io())
            //訂閱者在主線程中
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);

    //觀察中可以重復(fù)指定線程
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())//主
            .observeOn(Schedulers.io())//子
            .observeOn(AndroidSchedulers.mainThread())//主
            .subscribe(observer);
}

6.Android功能使用

private void rxAndroidBean() {

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(MyServer.Url)
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build();

    MyServer myServer = retrofit.create(MyServer.class);

    Observable<Bean> call = myServer.getDate2();

    call.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Bean>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Bean responseBody) {

                    Log.e(TAG, "onNext: "+ responseBody.getRESULT() );
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

7.其他操作符使用

//遍歷輸出
public static void rxFrom(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept: "+integer);
        }
    });
}

//數(shù)組合并輸出
public static void rxJust(){
    Integer[] a = {1,2,3};
    Integer[] b = {9,8,7};
    Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
        @Override
        public void accept(Integer[] integers) throws Exception {
            for (Integer i: integers) {
                Log.e(TAG, "accept: "+i);
            }
        }
    });
}

//范圍輸出
public  static  void rxRange(){
    Observable.range(0,20).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept: "+integer );
        }
    });
}

//過濾輸出
public static void rxFilter(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).filter(new Predicate<Integer>() {
        @Override
        public boolean test(Integer integer) throws Exception {
            if (integer>3){
                return true;
            }
            return false;
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept: "+integer );
        }
    });
}

//定時(shí)器
public static void rxInterval(){

    Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.e(TAG, "accept: "+aLong );
        }
    });
}

//數(shù)組轉(zhuǎn)換
public static void rxMap(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) {

            return integer+"abc";
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            Log.e(TAG, "accept: "+s );
        }
    });
}

//一個對象轉(zhuǎn)換為一組對象
public static void rxFlatMap(){
    Integer[] a = {1,2,3,4,5};
    Observable.fromArray(a).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            String[] strs = new String[3];
            for (int i =0;i<strs.length;i++){
                strs[i] = integer +  strs[i];
            }
            return Observable.fromArray(strs);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(TAG, "accept: "+s );
        }
    });
}

//Observable壓縮合并
public static void rxZip(){
    Integer[] a= {1,2,3};
    Integer[] b={4,5,6};
    Observable<Integer> observableA = Observable.fromArray(a);
    Observable<Integer> observableB = Observable.fromArray(b);

    Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {

        @Override
        public String apply(Integer integer, Integer integer2) throws Exception {
            return integer + ":" + integer2;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(TAG, "accept: "+s );
        }
    });

}

//合并
public static void rxMerge(){
    Integer[] a ={1,2,3};
    String[] b = {"abc","aaa","bbb"};
    char[] c = {'a','b','c'};

    Observable<Integer> A = Observable.fromArray(a);
    Observable<String> B = Observable.fromArray(b);
    Observable<char[]> C = Observable.fromArray(c);

    Observable
            .merge(A,B,C)
            .subscribe(new Consumer<Serializable>() {
                @Override
                public void accept(Serializable serializable) throws Exception {
                    Log.e(TAG, "accept: ."+serializable );
                }
            });
}

8.RxAndroid好處

用途
   是一個實(shí)現(xiàn)異步操作的庫,具有簡潔的鏈?zhǔn)酱a,提供強(qiáng)大的數(shù)據(jù)變換。
優(yōu)勢
   異步好簡單、代碼好簡潔,一個簡單、一個簡潔,這就意味著工作效率。

subscribeOn只能定義一次,除非是在定義doOnSubscribe
observeOn可以定義多次,決定后續(xù)代碼所在的線程

9.RxJava:好處

使用Rxjava的好處在于,我們可以方便的切換方法的執(zhí)行線程,對線程動態(tài)
切換,該過程無需我們自己手動創(chuàng)建和啟動線程。使用Rxjava創(chuàng)建的代碼雖然
出現(xiàn)在同一個線程中,但是我們可以設(shè)置使得不同方法在不同線程中執(zhí)行。上述
功能的實(shí)現(xiàn)主要?dú)w功于RxJava的Scheduler實(shí)現(xiàn),Scheduler 提供了『后臺處
理,前臺回調(diào)』的異步機(jī)制。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容