RxJava——入門篇

一、ReactiveX簡介

在學習RxJava前首先需要了解ReactiveX,因為RxJava是ReactiveX的一種Java的實現(xiàn)形式。

  • ReactiveX的官網(wǎng)地址為:
 http://reactivex.io/

ReactiveX官網(wǎng)對于自身的介紹是:

An API for asynchronous programming with observable streams

實質(zhì)上我們可以對其解讀為三部分:

ReactiveX的解讀
①An API: 首先它是個編程接口,不同語言提供不同實現(xiàn)。例如Java中的RxJava。
②For asynchronous programming: 在異步編程設計中使用。例如開啟子線程處理耗時的網(wǎng)絡請求。
③With observable streams: 基于可觀察的事件流。例如觀察者模式中觀察者對被觀察者的監(jiān)聽。

而ReactiveX結(jié)合了如下三部分內(nèi)容:

  1. 觀察者模式,即定義對象間一種一對多的依賴關系,當一個對象改變狀態(tài)時,則所有依賴它的對象都會被改變。
  2. Iterator模式,即迭代流式編程模式。
  3. 函數(shù)式編程模式,即提供一系列函數(shù)樣式的方法供快速開發(fā)。

Reactive的模式圖如下:

圖1.1 ReactiveX的模式圖

二、RxJava的使用

1、RxJava的優(yōu)勢

在Android的SDK中,給開發(fā)者提供的用于異步操作的原生內(nèi)容有AsyncTask和Handler。對于簡單的異步請求來說,使用Android原生的AsyncTask和Handler即可滿足需求,但是對于復雜的業(yè)務邏輯而言,依然使用AsyncTask和Handler會導致代碼結(jié)構(gòu)混亂,代碼的可讀性非常差。
但是RxJava的異步操作是基于觀察者模式實現(xiàn)的,在越來越復雜的業(yè)務邏輯中,RxJava依舊可以保持簡潔

2、RxJava的配置

首先,在Android Studio中配置Module的build.gradle,在這里我們使用的版本是1.2版本,并且導入RxAndroid,輔助RxJava完成線程調(diào)度:

        implementation "io.reactivex:rxjava:1.2.0"
        implementation "io.reactivex:rxandroid:1.2.0"

然后,RxJava基于觀察者設計模式,其中的關鍵性三個步驟如下:

(1)Observable被觀察者

Observable被觀察者創(chuàng)建的代碼如下:

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Alex");
                subscriber.onCompleted();
            }
        });

在這里,要強調(diào)的是Observable被觀察者是類類型,其中有諸多方法,我們關注其構(gòu)造函數(shù)與創(chuàng)建Observable對象的方法,查看如下圖對應的視圖結(jié)構(gòu):
圖2.2.1 Observable被觀察者對象的視圖結(jié)構(gòu)

查看源碼:

        protected Observable(OnSubscribe<T> f) {
           this.onSubscribe = f;
        }

        public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        
        }
        public static <T> Observable<T> create(OnSubscribe<T> f) {
           return new Observable<T>(RxJavaHooks.onCreate(f));
        }
  
        public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
           return create((OnSubscribe<T>)syncOnSubscribe);
        }

        public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
           return create((OnSubscribe<T>)asyncOnSubscribe);
        }

通過源碼分析,可知Observable提供了create()方法來獲取Observable實例對象。
此外,除了基本的創(chuàng)建的方法,Observable還提供了便捷的創(chuàng)建Observable序列的兩種方式,代碼如下:

  • 第一種,會將參數(shù)逐個發(fā)送
        Observable<String> observable1 = Observable.just("Alex","Payne");
  • 第二種,會將數(shù)組元素逐個轉(zhuǎn)換完畢后逐個發(fā)送
        String[] observableArr = {"Alex", "Payne"};
        Observable<String> observable2 = Observable.from(observableArr);

其中Observable.just()方法會調(diào)用from()方法,詳情可查看源碼。

(2)Observer觀察者

Observer觀察者創(chuàng)建的代碼如下:

        Observer<String> observer = new Observer<String>() {

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError,Error Info is:" + e.getMessage());
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, s);
            }
        };

Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下圖所示Observer的視圖結(jié)構(gòu):

圖2.2.2 Observer觀察者對象的視圖結(jié)構(gòu)

那么在RxJava中,Observer有其接口實現(xiàn)類對象Subscriber,它們的使用onNext、onCompleted、onError方法是一樣的,但是Subscriber對于Observer接口進行了拓展,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用,代碼如下:

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onStart() {
                Log.e(TAG, "onStart");
            }

            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError,Error Info is:" + e.getMessage());

            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, s);
            }
        };

其中,onStart()方法會在事件未發(fā)送前被調(diào)用,可以用于訂閱關系建立前的準備工作,例如將數(shù)據(jù)清空或者重置,在Subscriber中默認是空實現(xiàn),我們可以在該方法中調(diào)用自己的業(yè)務邏輯代碼。在如下的視圖結(jié)構(gòu)中我們可以看到Subscriber的拓展內(nèi)容,重點是add()、unsubscribe()方法以及名為subscription的Subscription隊列。

圖2.2.3 Subscriber對象視圖結(jié)構(gòu)

(3)Subscribe訂閱關系

Observable與observer形成訂閱關系代碼如下:

            observable.subscribe(observer);
            //或者
            observable.subscribe(subscriber);

那么我們以observable.subscribe(observer)為例在這里繼續(xù)查看源碼,查看subscribe()方法到底做了什么:


圖2.3.1 Observable調(diào)用Subscribe將Observer轉(zhuǎn)換為Subscriber對象

Observer轉(zhuǎn)換為Subscriber對象在這里得到印證。

  • 在之后的內(nèi)容中統(tǒng)一以Subscriber作為訂閱觀察者對象

繼續(xù)深入,我們可以看到訂閱關系中的關鍵步驟(僅核心代碼):

            subscriber.onStart();

            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

            return RxJavaHooks.onObservableReturn(subscriber);

在這里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等價于OnSubscribe.call(subscriber),見下圖2.3.2:

圖2.3.2 RxJavaHooks.onObservableStart()轉(zhuǎn)換為OnSubscribe
在return RxJavaHooks.onObservableReturn(subscriber)這里等價于return subscription,見下圖2.3.3:
圖2.3.3 RxJavaHooks.onObservableReturn()轉(zhuǎn)換為Subscrition

  • 可以看到,subscriber() 做了3件事:
  1. 調(diào)用 Subscriber.onStart() 。該方法用于在訂閱關系建立之前的準備。
  2. 調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的內(nèi)部接口,而事件發(fā)送的邏輯在這里開始運行。從這也可以看出,在 RxJava 中當 subscribe() 方法執(zhí)行的時候訂閱關系確立,Observable 開始發(fā)送事件。
  3. 將傳入的 Subscriber 作為 Subscription 返回。這是為了方便后續(xù)調(diào)用unsubscribe()。

三、RxJava的不完整回調(diào)

1、不完整回調(diào)的代碼示例

        Observable<String> observable = Observable.just("Alex", "Payne");
        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e(TAG, s);
            }
        };
        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Log.e(TAG, "onError,Error Info is:" + throwable.getMessage());
            }
        };
        Action0 onCompletedAction = new Action0() {
            @Override
            public void call() {
                Log.e(TAG, "onCompleted");
            }
        };

        // 根據(jù)onNextAction 來定義 onNext()
        observable.subscribe(onNextAction);
        // 根據(jù)onNextAction 來定義 onNext()、根據(jù)onErrorAction 來定義 onError()
        observable.subscribe(onNextAction, onErrorAction);
        // 根據(jù)onNextAction 來定義 onNext()、根據(jù)onErrorAction 來定義 onError()、onCompletedAction 來定義 onCompleted()
        observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

2、不完整回調(diào)的原理分析

在這里我們可以看到:

Action0無參數(shù)泛型無返回值類型,而Subscriber中的onCompleted()方法也沒有參數(shù)泛型
Action1有1個參數(shù)泛型無返回值類型 ,onNextAction設置的參數(shù)泛型為String,而Subscriber中的onNext()方法參數(shù)泛型也是String(和本文中觀察者對象中的OnNext方法對比)
Action1有1個參數(shù)泛型無返回值類型,onErrorAction設置的參數(shù)泛型為Throwable,而Subscriber中的onError()方法參數(shù)泛型也是Throwable

那么,我們來查看observable.subscribe(onNextAction)的源碼,在這里, Action1可以被當成一個包裝對象,將onNext()方法進行包裝作為不完整的回調(diào)傳入到observable.subscribe()中

圖3.2.1 傳入的onNextAction最終被包裝成ActionSubscriber

我們來看看Action1有何玄機,Action1的源碼如下圖所示:

圖3.2.2 Action1接口源碼
實質(zhì)上,這種根據(jù)參數(shù)泛型的個數(shù)且無返回值類型的包裝在RxJava中有多種如下圖所示的體現(xiàn),例如Action0的參數(shù)個數(shù)為0,Action1的參數(shù)個數(shù)為1以此類推:
圖3.2.3 根據(jù)參數(shù)泛型的個數(shù)且無返回值類型的包裝

四、RxJava的線程切換

1、Scheduler線程調(diào)度器

如果不指定線程,默認是在調(diào)用subscribe方法的線程上進行回調(diào),那么如果子線程中調(diào)用subscibe方法,而想在主線程中進行UI更新,則會拋出異常。當然了RxJava已經(jīng)幫我們考慮到了這一點,所以提供了Scheduler線程調(diào)度器幫助我們進行線程之間的切換。
實質(zhì)上,Scheduler線程調(diào)度器和RxJava的操作符有緊密關聯(lián),我將在下一篇文章中進行詳細介紹。

  • RxJava內(nèi)置了如下所示幾個的線程調(diào)度器:
  1. Schedulers.immediate():在當前線程中執(zhí)行
  2. Schedulers.newThread():啟動新線程,在新線程中進行操作
  3. Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
  4. Schedulers.computation():計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  5. Schedulers.trampoline():會將任務按添加順序依次放入當前線程中等待執(zhí)行。線程一次只執(zhí)行一個任務,其余任務排隊等待,一個任務都執(zhí)行完成后再開始下一個任務的執(zhí)行。
  • 此外RxJava還提供了用于測試的調(diào)度器Schedulers.test() 及 可自定義Scheduler—-Schedulers.form() 。

RxAndroid并且其為我們提供了AndroidSchedulers.mainThread()進行主線程的回調(diào)

2、線程控制

調(diào)用Observable對象的subscribeOn()、observeOn()方法即可完成線程控制。

  • subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產(chǎn)生的線程。
  • observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程。
            Observable.just("Alex", "Payne")
           .subscribeOn(Schedulers.io())//指定 subscribe() 所發(fā)生的線程
           .unsubscribeOn(Schedulers.io())//事件發(fā)送完畢后,及時取消發(fā)送
           .observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 所運行在的線程
           .subscribe(new Action1<String>() {
               @Override
                 public void call(String s) {
                     Log.e(TAG, s);
                }
            });

五、總結(jié)

本文主要介紹了RxJava的由來、使用步驟、部分內(nèi)容的原理解析。在下篇文章中我會詳細介紹RxJava的操作符。希望本文對你在學習RxJava的路上有所啟發(fā)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容