RxJava 2.x入門新姿勢一

引言

經(jīng)過幾年的發(fā)展,響應(yīng)式編程已經(jīng)是很流行了,在Android開發(fā)中的應(yīng)用也非常的廣泛,身為Android開發(fā)者,則是必須掌握的技術(shù)。

正文

網(wǎng)上已經(jīng)有很多很多RxJava相關(guān)的文章,視頻等等教程,但是說實(shí)話對于入門,或者新手來說,確實(shí)不好理解,上來就是各種,觀察者、被觀察者、訂閱、發(fā)布等等概念,一遍看下來直接就暈了,就感覺RxJava很難,難理解,用的時(shí)候也只是依葫蘆畫瓢,暈乎乎的用著,然后就沒有然后了。

這里我都不說那些概念,因?yàn)橹v概念太抽象,難記住,更難理解。我們用另外一個(gè)視角來學(xué)習(xí)。因?yàn)镽xJava 1.x的版本 官方已經(jīng)停止更新了維護(hù)了,沒有學(xué)習(xí)過也沒有關(guān)系,RxJava 2.x是全新的,直接學(xué)習(xí)使用就好了。

首先假設(shè)我們在工廠里上班,工廠都會(huì)有流水線,產(chǎn)品經(jīng)過流水線生產(chǎn)后來訂單了銷售出去。


事件處理模型

這里假設(shè)工廠生產(chǎn)的是一種六邊形的“Jerry帥氣餅干”,上游是生產(chǎn)車間流水線的事件流,下游是訂單產(chǎn)品的銷售消費(fèi)事件流。中間連接上下游關(guān)系的暫且叫做“Jerry帥氣餅干生產(chǎn)消費(fèi)訂單管理系統(tǒng)”(不要臉,名字寫這么長),為了下文方便抒寫且用“生產(chǎn)訂單管理系統(tǒng)”(PCMS)。以上圖上下游對應(yīng)的就是Obsersvable被觀察者也是發(fā)布者,下游對應(yīng)Observer觀察者也是訂閱者。使用RxJava代碼表示上圖就是:

public void test1() {

        // 上游生成產(chǎn)品流水線
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "test1 ====== Observable: ------ onNext: Jerry");
                emitter.onNext("Jerry");
                Log.d(TAG, "test1 ====== Observable: ------ onNext: 就是");
                emitter.onNext("就是");
                Log.d(TAG, "test1 ====== Observable: ------ onNext: 帥");
                emitter.onNext("帥");
                Log.d(TAG, "test1 ====== Observable: ------ onNext: ?。?!");
                emitter.onNext("?。?!");
                Log.d(TAG, "test1 ====== Observable: ------ onComplete");
                emitter.onComplete();
                Log.d(TAG, "test1 ====== Observable: ------ onNext: Jerry帥炸天?。?!");
                emitter.onNext("Jerry帥炸天!??!");
            }
        });

        // 下游訂單產(chǎn)品銷售
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "test1 ====== Observer: onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "test1 ====== Observer: onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "test1 ====== Observer: onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "test1 ====== Observer: onComplete");
            }
        };

        // 連接上下游的訂單管理系統(tǒng)
        observable.subscribe(observer);
    }

上述代碼,上游生產(chǎn)車間流水線就是Observable,下游訂單銷售就是Observer,中間通過“生產(chǎn)訂單管理系統(tǒng)”subscribe來將上下游連接起來。

運(yùn)行后輸出結(jié)果是:


輸出結(jié)果

從輸出結(jié)果來看,當(dāng)上游Observable發(fā)出一個(gè)生產(chǎn)的餅干產(chǎn)品事件,下游訂單銷售的Observer就銷售一個(gè)餅干產(chǎn)品事件,而且當(dāng)上游調(diào)用了onComplete方法后,上游的生產(chǎn)事件還是生產(chǎn)餅干事件(繼續(xù)生產(chǎn)了“Jerry帥炸天”餅干事件),但是下游的訂單銷售卻沒有消費(fèi)掉。也就是事件產(chǎn)生方調(diào)用onComplete方法后,之后的事件還會(huì)繼續(xù)發(fā)送,但是事件接收方就不會(huì)接收了。

我們來看看Observable的subscribe方法的參數(shù):ObservableEmitter,Emitter顧名思義是發(fā)射器的意思,ObservableEmitter接口繼承自Emitter接口:

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

接口定義很簡單,就三個(gè)方法,onNext我們上門已經(jīng)用過了,是用來發(fā)射發(fā)送事件的,onComplete是用來表示事件發(fā)送完了,后面如果有新的事件發(fā)送,下游接收者可以不用處理,onError方法看注釋說是發(fā)送一個(gè)異常事件給下游接收者。到底是不是這樣,我們來試試就曉得了。

public void test2() {

        // 上游生成產(chǎn)品流水線
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "test2 ====== Observable: ------ onNext: Jerry");
                emitter.onNext("Jerry");
                Log.d(TAG, "test2 ====== Observable: ------ onNext: 就是");
                emitter.onNext("就是");
                Log.d(TAG, "test2 ====== Observable: ------ onNext: 帥");
                emitter.onNext("帥");
                Log.d(TAG, "test2 ====== Observable: ------ onNext: !?。?);
                emitter.onNext("?。。?);
                Log.d(TAG, "test2 ====== Observable: ------ onError");
                emitter.onError(new IllegalStateException("Jerry餅干烤焦了,賣出去會(huì)被打!"));
                Log.d(TAG, "test2 ====== Observable: ------ onNext: Jerry帥炸天?。?!");
                emitter.onNext("Jerry帥炸天?。?!");
            }
        });

        // 下游訂單產(chǎn)品銷售
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "test2 ====== Observer: onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "test2 ====== Observer: onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "test2 ====== Observer: onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "test2 ====== Observer: onComplete");
            }
        };

        // 連接上下游的訂單管理系統(tǒng)
        observable.subscribe(observer);
    }

在上游生產(chǎn)餅干的時(shí)候就生產(chǎn)了一個(gè)“Jerry餅干烤焦了,賣出去會(huì)被打!”的錯(cuò)誤餅干事件,下游訂單銷售的onError出錯(cuò)狀態(tài)會(huì)消費(fèi)這個(gè)事件。而上游在出錯(cuò)事件后發(fā)送的“Jerry帥炸天?。?!”餅干事件,同樣也只是把事件發(fā)送了處理,下游訂單銷售并沒有接收處理這個(gè)事件。

運(yùn)行后輸出結(jié)果:


輸出結(jié)果

細(xì)心的小伙伴應(yīng)該會(huì)發(fā)現(xiàn),每次執(zhí)行的時(shí)候都會(huì)先調(diào)用下游的onSubscribe方法,這個(gè)方法里有個(gè)參數(shù)Disposable(用完即可丟棄)意思可以理解成,將上下游的連接切斷,讓上游的生產(chǎn)的餅干不打包放入下游訂單銷售環(huán)節(jié),實(shí)際開發(fā)中是有這種需求的,當(dāng)發(fā)送事件出問題的時(shí)候就需要斷開事件接收處理。不像最近的疫苗事件,一些不要臉的生物疫苗公司把生產(chǎn)不合格的疫苗上市銷售,傷天害理,謀財(cái)害命。下面舉個(gè)例子:

public void test3() {

        // 上游生成產(chǎn)品流水線
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "test3 ====== Observable: ------ onNext: Jerry");
                emitter.onNext("Jerry");
                Log.d(TAG, "test3 ====== Observable: ------ onNext: 就是");
                emitter.onNext("就是");
                Log.d(TAG, "test3 ====== Observable: ------ onNext: 帥");
                emitter.onNext("帥");
                Log.d(TAG, "test3 ====== Observable: ------ onNext: !?。?);
                emitter.onNext("!??!");
                Log.d(TAG, "test3 ====== Observable: ------ onComplete");
                emitter.onComplete();
                Log.d(TAG, "test3 ====== Observable: ------ onNext: Jerry帥炸天?。?!");
                emitter.onNext("Jerry帥炸天?。?!");
            }
        });

        // 下游訂單產(chǎn)品銷售
        Observer<String> observer = new Observer<String>() {

            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "test3 ====== Observer: onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "test3 ====== Observer: onNext: " + value);
                i++;
                // 第一個(gè)事件接收后,就斷開上下游連接
                if (i == 1) {
                    Log.d(TAG, "test3 ====== Observer: start disposable");
                    mDisposable.dispose();
                    Log.d(TAG, "test3 ====== Observer: isDisposable: " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "test3 ====== Observer: onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "test3 ====== Observer: onComplete");
            }
        };

        // 連接上下游的訂單管理系統(tǒng)
        observable.subscribe(observer);
    }

這里我們在下游訂單銷售的onNext方法中,當(dāng)接收完第一個(gè)餅干事件后,就使用mDisposable.dispose()方法將上下游的連接斷開了,斷開后上游后續(xù)生產(chǎn)的餅干事件,下游就接收不到。

運(yùn)行的結(jié)果:


image.png

上圖中,也驗(yàn)證了我們的猜想,當(dāng)使用dispose斷開上下游連接后,下游就無法再繼續(xù)接收事件了。


這一講就先介紹這么多,這樣的方式理解Observable和Observer以及訂閱動(dòng)作subscribe是不是容易多了,希望對你有所幫助,下一講使用RxJava來切換變化餅干事件處理的線程(主線程、子線程)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容