RxJava2.0之旅(一)---簡(jiǎn)介及基本使用

1 前言

Rxjava由于其基于事件流的鏈?zhǔn)秸{(diào)用、邏輯簡(jiǎn)潔 & 使用簡(jiǎn)單的特點(diǎn),深受各大 Android開(kāi)發(fā)者的歡迎。
RxJava github地址

944365-4c1c1eb44ffe01e5.png

1 定義

  • RxJava 在 GitHub 的介紹:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻譯:RxJava 是一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的、基于事件的程序的庫(kù)
  • 總結(jié):RxJava 是一個(gè) 基于事件流、實(shí)現(xiàn)異步操作的庫(kù)

2 作用

  • 實(shí)現(xiàn)異步操作
  • 類似于 Android中的 AsyncTask 、Handler作用

3 特點(diǎn)

由于 RxJava的使用方式是:基于事件流的鏈?zhǔn)秸{(diào)用,所以使得 RxJava:

  • 邏輯簡(jiǎn)潔
  • 實(shí)現(xiàn)優(yōu)雅
  • 使用簡(jiǎn)單

更重要的是,隨著程序邏輯的復(fù)雜性提高,它依然能夠保持簡(jiǎn)潔 & 優(yōu)雅

4 原理

4.1 生活例子引入

  • 顧客到飯店吃飯
顧客到飯店吃飯.png

944365-07f12da4616b2b68.png

4.2 Rxjava原理介紹

  • Rxjava原理 基于.一種擴(kuò)展的觀察者模式
  • Rxjava的擴(kuò)展觀察者模式中有4個(gè)角色
角色 作用 類比
被觀察者(Observable) 產(chǎn)生事件 顧客
觀察者(Observer) 接收事件,并給出響應(yīng)動(dòng)作 廚房
訂閱(Subscribe) 連接 被觀察者 & 觀察者 服務(wù)員
事件(Event) 被觀察者 & 觀察者 溝通的載體 菜式

具體原理
請(qǐng)結(jié)合上述 顧客到飯店吃飯 的生活例子理解:

944365-5b6e7c8a3bb55f39.png

944365-fc3b7eb5a0ad28d0.png

  • 即RxJava原理可總結(jié)為:被觀察者 (Observable) 通過(guò) 訂閱(Subscribe)按順序發(fā)送事件 給觀察者 (Observer), 觀察者(Observer) 按順序接收事件 & 作出對(duì)應(yīng)的響應(yīng)動(dòng)作。具體如下圖:
944365-98ec92df0a4d7e0b.png

至此,RxJava原理講解完畢。

5 基本使用

  • 1.分步驟實(shí)現(xiàn):該方法主要為了深入說(shuō)明Rxjava的原理 & 使用,主要用于演示說(shuō)明
  • 2.基于事件流的鏈?zhǔn)秸{(diào)用:主要用于實(shí)際使用

5.1 方式1:分步驟實(shí)現(xiàn)

5.1.1 使用步驟

944365-779b0832b164e116.png

5.1.2 步驟詳解

//步驟1:創(chuàng)建被觀察者 (Observable )& 生產(chǎn)事件
Observable<Integer> observable = createObservable1();
//步驟2:創(chuàng)建觀察者 (Observer )并 定義響應(yīng)事件的行為
Observer<Integer> observer = createObserver1();
//步驟3:通過(guò)訂閱(Subscribe)連接觀察者和被觀察者
observable.subscribe(observer);
// 或者 observable.subscribe(subscriber);

步驟1:創(chuàng)建被觀察者 (Observable )& 生產(chǎn)事件

  • 即 顧客入飯店 - 坐下餐桌 - 點(diǎn)菜
  • 具體實(shí)現(xiàn)
    create()--just(T...)--fromXX()

create()

private Observable<Integer> createObservable1() {
    // 1. 創(chuàng)建被觀察者 Observable 對(duì)象

    // create() 是 RxJava 最基本的創(chuàng)造事件序列的方法
    // 此處傳入了一個(gè) OnSubscribe 對(duì)象參數(shù)
    // 當(dāng) Observable 被訂閱時(shí),OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,即事件序列就會(huì)依照設(shè)定依次被觸發(fā)
    // 即觀察者會(huì)依次調(diào)用對(duì)應(yīng)事件的復(fù)寫(xiě)方法從而響應(yīng)事件
    // 從而實(shí)現(xiàn)被觀察者調(diào)用了觀察者的回調(diào)方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式
    Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

        // 2. 在復(fù)寫(xiě)的subscribe()里定義需要發(fā)送的事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // 通過(guò) ObservableEmitter類對(duì)象產(chǎn)生事件并通知觀察者
            // ObservableEmitter類介紹
            // a. 定義:事件發(fā)射器
            // b. 作用:定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    return integerObservable;
}

just(T...)

//擴(kuò)展:RxJava 提供了其他方法用于 創(chuàng)建被觀察者對(duì)象Observable
// 方法1:just(T...):直接將傳入的參數(shù)依次發(fā)送出來(lái)
private Observable createObservable2() {
    Observable observable = Observable.just("A", "B", "C");
    return observable;

from(T[])

//擴(kuò)展:RxJava 提供了其他方法用于 創(chuàng)建被觀察者對(duì)象Observable
// 方法2:from(T[])  將傳入的數(shù)組 / Iterable 拆分成具體對(duì)象后,依次發(fā)送出來(lái)
private Observable createObservable3() {
    String[] words = {"A", "B", "C"};
    Observable observable = Observable.fromArray(words);
    return observable;
}

步驟2:創(chuàng)建觀察者 (Observer )并 定義響應(yīng)事件的行為

  • 即 開(kāi)廚房 - 確定對(duì)應(yīng)菜式
  • 發(fā)生的事件類型包括:Next事件、Complete事件 & Error事件。具體如下:

944365-8cb0da34f94b0c73.png

具體實(shí)現(xiàn)

  • 方式1:采用Observer 接口
  • 方式2:采用Subscriber 抽象類
    2種方法的區(qū)別
  • 相同點(diǎn):二者基本使用方式完全一致(實(shí)質(zhì)上,在RxJava的 subscribe 過(guò)程中,Observer總是會(huì)先被轉(zhuǎn)換成Subscriber再使用)
  • 不同點(diǎn):Subscriber抽象類對(duì) Observer 接口進(jìn)行了擴(kuò)展,新增了兩個(gè)方法:
    1. onStart():在還未響應(yīng)事件前調(diào)用,用于做一些初始化工作
    1. unsubscribe():用于取消訂閱。在該方法被調(diào)用后,觀察者將不再接收 & 響應(yīng)事件
  • 調(diào)用該方法前,先使用 isUnsubscribed() 判斷狀態(tài),確定被觀察者Observable是否還持有觀察者Subscriber的引用,如果引用不能及時(shí)釋放,就會(huì)出現(xiàn)內(nèi)存泄露

采用Observer 接口

private Observer<Integer> createObserver1() {
    //方式1:采用Observer 接口
    // 1. 創(chuàng)建觀察者 (Observer )對(duì)象
    Observer<Integer> observer = new Observer<Integer>() {
        // 2. 創(chuàng)建對(duì)象時(shí)通過(guò)對(duì)應(yīng)復(fù)寫(xiě)對(duì)應(yīng)事件方法 從而 響應(yīng)對(duì)應(yīng)事件

        // 觀察者接收事件前,默認(rèn)最先調(diào)用復(fù)寫(xiě) onSubscribe()
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "開(kāi)始采用subscribe連接");
        }

        // 當(dāng)被觀察者生產(chǎn)Next事件 & 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "對(duì)Next事件作出響應(yīng)" + integer);
        }

        // 當(dāng)被觀察者生產(chǎn)Error事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
        }

        // 當(dāng)被觀察者生產(chǎn)Complete事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
        @Override
        public void onComplete() {
            Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
        }
    };
    return observer;
}

方式2:采用Subscriber 抽象類

private Subscriber<Integer> createObserver2() {
    //方式2:采用Subscriber 抽象類
    // 說(shuō)明:Subscriber類 = RxJava 內(nèi)置的一個(gè)實(shí)現(xiàn)了 Observer 的抽象類,
    // 對(duì) Observer 接口進(jìn)行了擴(kuò)展\

    // 1. 創(chuàng)建觀察者 (Observer )對(duì)象
    Subscriber<Integer> subscriber = new Subscriber<Integer>() {
        // 2. 創(chuàng)建對(duì)象時(shí)通過(guò)對(duì)應(yīng)復(fù)寫(xiě)對(duì)應(yīng)事件方法 從而 響應(yīng)對(duì)應(yīng)事件

        // 觀察者接收事件前,默認(rèn)最先調(diào)用復(fù)寫(xiě) onSubscribe()
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "開(kāi)始采用subscribe連接");
        }

        // 當(dāng)被觀察者生產(chǎn)Next事件 & 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "對(duì)Next事件作出響應(yīng)" + integer);
        }

        // 當(dāng)被觀察者生產(chǎn)Error事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
        }

        // 當(dāng)被觀察者生產(chǎn)Complete事件& 觀察者接收到時(shí),會(huì)調(diào)用該復(fù)寫(xiě)方法 進(jìn)行響應(yīng)
        @Override
        public void onComplete() {
            Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
        }
    };
    return subscriber;
}

步驟3:通過(guò)訂閱(Subscribe)連接觀察者和被觀察者

  • 即 顧客找到服務(wù)員 - 點(diǎn)菜 - 服務(wù)員下單到>廚房 - 廚房烹調(diào)
  • 具體實(shí)現(xiàn)
observable.subscribe(observer);
 // 或者 observable.subscribe(subscriber);

5.2 方式2:優(yōu)雅的實(shí)現(xiàn)方法 - 基于事件流的鏈?zhǔn)秸{(diào)用

  • 上述的實(shí)現(xiàn)方式是為了說(shuō)明Rxjava的原理 & 使用
  • 在實(shí)際應(yīng)用中,會(huì)將上述步驟&代碼連在一起,從而更加簡(jiǎn)潔、更加優(yōu)雅,即所謂的 RxJava基于事件流的鏈?zhǔn)秸{(diào)用.
private void chainCall1() {
    // RxJava的鏈?zhǔn)讲僮?    // 1. 創(chuàng)建被觀察者 & 生產(chǎn)事件
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        // 2. 通過(guò)通過(guò)訂閱(subscribe)連接觀察者和被觀察者
        // 3. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "開(kāi)始采用subscribe連接");
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "對(duì)Next事件" + integer + "作出響應(yīng)");
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
        }
    });
    //注:整體方法調(diào)用順序:觀察者.onSubscribe()> 被觀察者.subscribe()> 觀察者.onNext()>觀察者.onComplete()
}

RxJava 2.x 提供了多個(gè)函數(shù)式接口 ,用于實(shí)現(xiàn)簡(jiǎn)便式的觀察者模式。

944365-abda1c2bef8681f3.png

    private void chainCall2() {
        Observable.just("hello").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

6 額外說(shuō)明

6.1 觀察者 Observer的subscribe()具備多個(gè)重載的方法

public final Disposable subscribe() {}
    // 表示觀察者不對(duì)被觀察者發(fā)送的事件作出任何響應(yīng)(但被觀察者還是可以繼續(xù)發(fā)送事件)

    public final Disposable subscribe(Consumer<? super T> onNext) {}
    // 表示觀察者只對(duì)被觀察者發(fā)送的Next事件作出響應(yīng)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    // 表示觀察者只對(duì)被觀察者發(fā)送的Next事件 & Error事件作出響應(yīng)

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    // 表示觀察者只對(duì)被觀察者發(fā)送的Next事件、Error事件 & Complete事件作出響應(yīng)

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    // 表示觀察者只對(duì)被觀察者發(fā)送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出響應(yīng)

    public final void subscribe(Observer<? super T> observer) {}
    // 表示觀察者對(duì)被觀察者發(fā)送的任何事件都作出響應(yīng)

6.2 可采用 Disposable.dispose() 切斷觀察者 與 被觀察者 之間的連接

即觀察者 無(wú)法繼續(xù) 接收 被觀察者的事件,但被觀察者還是可以繼續(xù)發(fā)送事件

// 主要在觀察者 Observer中 實(shí)現(xiàn)
        Observer<Integer> observer = new Observer<Integer>() {
            // 1. 定義Disposable類變量
            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開(kāi)始采用subscribe連接");
                // 2. 對(duì)Disposable類變量賦值
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "對(duì)Next事件"+ value +"作出響應(yīng)"  );
                if (value == 2) {
                    // 設(shè)置在接收到第二個(gè)事件后切斷觀察者和被觀察者的連接
                    mDisposable.dispose();
                    Log.d(TAG, "已經(jīng)切斷了連接:" + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
            }
        };

參考

http://www.itdecent.cn/p/a406b94f3188

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容