RxJava——入門篇

一、ReactiveX簡介

在學(xué)習(xí)RxJava前首先需要了解ReactiveX,因為RxJava是ReactiveX的一種Java的實現(xiàn)形式。

ReactiveX的官網(wǎng)地址為:

http://reactivex.io/

ReactiveX官網(wǎng)對于自身的介紹是:

An APIforasynchronous programmingwithobservable streams

實質(zhì)上我們可以對其解讀為三部分:

ReactiveX的解讀

①An API: 首先它是個編程接口,不同語言提供不同實現(xiàn)。例如Java中的RxJava。

②For asynchronous programming: 在異步編程設(shè)計中使用。例如開啟子線程處理耗時的網(wǎng)絡(luò)請求。

③With observable streams: 基于可觀察的事件流。例如觀察者模式中觀察者對被觀察者的監(jiān)聽。

而ReactiveX結(jié)合了如下三部分內(nèi)容:

觀察者模式,即定義對象間一種一對多的依賴關(guān)系,當(dāng)一個對象改變狀態(tài)時,則所有依賴它的對象都會被改變。

Iterator模式,即迭代流式編程模式。

函數(shù)式編程模式,即提供一系列函數(shù)樣式的方法供快速開發(fā)。

Reactive的模式圖如下:

圖1.1 ReactiveX的模式圖

二、RxJava的使用

1、RxJava的優(yōu)勢

在Android的SDK中,給開發(fā)者提供的用于異步操作的原生內(nèi)容有AsyncTask和Handler。對于簡單的異步請求來說,使用Android原生的AsyncTask和Handler即可滿足需求,但是對于復(fù)雜的業(yè)務(wù)邏輯而言,依然使用AsyncTask和Handler會導(dǎo)致代碼結(jié)構(gòu)混亂,代碼的可讀性非常差。

但是RxJava的異步操作是基于觀察者模式實現(xiàn)的,在越來越復(fù)雜的業(yè)務(wù)邏輯中,RxJava依舊可以保持簡潔

2、RxJava的配置

首先,在Android Studio中配置Module的build.gradle,在這里我們使用的版本是1.2版本,并且導(dǎo)入RxAndroid,輔助RxJava完成線程調(diào)度:

implementation"io.reactivex:rxjava:1.2.0"implementation"io.reactivex:rxandroid:1.2.0"

然后,RxJava基于觀察者設(shè)計模式,其中的關(guān)鍵性三個步驟如下:

(1)Observable被觀察者

Observable被觀察者創(chuàng)建的代碼如下:

Observable observable = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){? ? ? ? ? ? ? ? subscriber.onNext("Alex");? ? ? ? ? ? ? ? subscriber.onCompleted();? ? ? ? ? ? }? ? ? ? });

在這里,要強調(diào)的是Observable被觀察者是類類型,其中有諸多方法,我們關(guān)注其構(gòu)造函數(shù)與創(chuàng)建Observable對象的方法,查看如下圖對應(yīng)的視圖結(jié)構(gòu):

圖2.2.1 Observable被觀察者對象的視圖結(jié)構(gòu)

查看源碼:

protectedObservable(OnSubscribe f){this.onSubscribe = f;? ? ? ? }publicinterfaceOnSubscribeextendsAction1>{? ? ? ? ? ? ? ? }

publicstaticObservablecreate(OnSubscribe f){returnnewObservable(RxJavaHooks.onCreate(f));? ? ? ? }publicstaticObservablecreate(SyncOnSubscribe syncOnSubscribe){returncreate((OnSubscribe)syncOnSubscribe);? ? ? ? }publicstaticObservablecreate(AsyncOnSubscribe asyncOnSubscribe){returncreate((OnSubscribe)asyncOnSubscribe);? ? ? ? }

通過源碼分析,可知Observable提供了create()方法來獲取Observable實例對象。

此外,除了基本的創(chuàng)建的方法,Observable還提供了便捷的創(chuàng)建Observable序列的兩種方式,代碼如下:

第一種,會將參數(shù)逐個發(fā)送

Observable observable1 = Observable.just("Alex","Payne");

第二種,會將數(shù)組元素逐個轉(zhuǎn)換完畢后逐個發(fā)送

String[] observableArr = {"Alex","Payne"};? ? ? ? Observable observable2 = Observable.from(observableArr);

其中Observable.just()方法會調(diào)用from()方法,詳情可查看源碼。

(2)Observer觀察者

Observer觀察者創(chuàng)建的代碼如下:

Observer observer =newObserver() {@OverridepublicvoidonCompleted(){? ? ? ? ? ? ? ? Log.e(TAG,"onCompleted");? ? ? ? ? ? }@OverridepublicvoidonError(Throwable e){? ? ? ? ? ? ? ? Log.e(TAG,"onError,Error Info is:"+ e.getMessage());? ? ? ? ? ? }@OverridepublicvoidonNext(String s){? ? ? ? ? ? ? ? Log.e(TAG, s);? ? ? ? ? ? }? ? ? ? };

Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下圖所示Observer的視圖結(jié)構(gòu):

圖2.2.2 Observer觀察者對象的視圖結(jié)構(gòu)

那么在RxJava中,Observer有其接口實現(xiàn)類對象Subscriber,它們的使用onNext、onCompleted、onError方法是一樣的,但是Subscriber對于Observer接口進(jìn)行了拓展,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用,代碼如下:

Subscriber subscriber =newSubscriber() {@OverridepublicvoidonStart(){? ? ? ? ? ? ? ? Log.e(TAG,"onStart");? ? ? ? ? ? }@OverridepublicvoidonCompleted(){? ? ? ? ? ? ? ? Log.e(TAG,"onCompleted");? ? ? ? ? ? }@OverridepublicvoidonError(Throwable e){? ? ? ? ? ? ? ? Log.e(TAG,"onError,Error Info is:"+ e.getMessage());? ? ? ? ? ? }@OverridepublicvoidonNext(String s){? ? ? ? ? ? ? ? Log.e(TAG, s);? ? ? ? ? ? }? ? ? ? };

其中,onStart()方法會在事件未發(fā)送前被調(diào)用,可以用于訂閱關(guān)系建立前的準(zhǔn)備工作,例如將數(shù)據(jù)清空或者重置,在Subscriber中默認(rèn)是空實現(xiàn),我們可以在該方法中調(diào)用自己的業(yè)務(wù)邏輯代碼。在如下的視圖結(jié)構(gòu)中我們可以看到Subscriber的拓展內(nèi)容,重點是add()、unsubscribe()方法以及名為subscription的Subscription隊列。

圖2.2.3 Subscriber對象視圖結(jié)構(gòu)

(3)Subscribe訂閱關(guān)系

Observable與observer形成訂閱關(guān)系代碼如下:

observable.subscribe(observer);//或者observable.subscribe(subscriber);

那么我們以observable.subscribe(observer)為例在這里繼續(xù)查看源碼,查看subscribe()方法到底做了什么:

圖2.3.1 Observable調(diào)用Subscribe將Observer轉(zhuǎn)換為Subscriber對象

Observer轉(zhuǎn)換為Subscriber對象在這里得到印證。

在之后的內(nèi)容中統(tǒng)一以Subscriber作為訂閱觀察者對象

繼續(xù)深入,我們可以看到訂閱關(guān)系中的關(guān)鍵步驟(僅核心代碼):

subscriber.onStart();RxJavaHooks.onObservableStart(observable,observable.onSubscribe).call(subscriber);returnRxJavaHooks.onObservableReturn(subscriber);

在這里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等價于OnSubscribe.call(subscriber),見下圖2.3.2:

圖2.3.2 RxJavaHooks.onObservableStart()轉(zhuǎn)換為OnSubscribe

在return RxJavaHooks.onObservableReturn(subscriber)這里等價于return subscription,見下圖2.3.3:

圖2.3.3 RxJavaHooks.onObservableReturn()轉(zhuǎn)換為Subscrition

可以看到,subscriber() 做了3件事:

調(diào)用 Subscriber.onStart() 。該方法用于在訂閱關(guān)系建立之前的準(zhǔn)備。

調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的內(nèi)部接口,而事件發(fā)送的邏輯在這里開始運行。從這也可以看出,在 RxJava 中當(dāng) subscribe() 方法執(zhí)行的時候訂閱關(guān)系確立,Observable 開始發(fā)送事件。

將傳入的 Subscriber 作為 Subscription 返回。這是為了方便后續(xù)調(diào)用unsubscribe()。

三、RxJava的不完整回調(diào)

1、不完整回調(diào)的代碼示例

Observable observable = Observable.just("Alex","Payne");? ? ? ? Action1 onNextAction =newAction1() {@Overridepublicvoidcall(String s){? ? ? ? ? ? ? ? Log.e(TAG, s);? ? ? ? ? ? }? ? ? ? };? ? ? ? Action1 onErrorAction =newAction1() {@Overridepublicvoidcall(Throwable throwable){? ? ? ? ? ? ? ? Log.e(TAG,"onError,Error Info is:"+ throwable.getMessage());? ? ? ? ? ? }? ? ? ? };? ? ? ? Action0 onCompletedAction =newAction0() {@Overridepublicvoidcall(){? ? ? ? ? ? ? ? Log.e(TAG,"onCompleted");? ? ? ? ? ? }? ? ? ? };// 根據(jù)onNextAction 來定義 onNext()observable.subscribe(onNextAction);// 根據(jù)onNextAction 來定義 onNext()、根據(jù)onErrorAction 來定義 onError()observable.subscribe(onNextAction, onErrorAction);// 根據(jù)onNextAction 來定義 onNext()、根據(jù)onErrorAction 來定義 onError()、onCompletedAction 來定義 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

2、不完整回調(diào)的原理分析

在這里我們可以看到:

Action0無參數(shù)泛型無返回值類型,而Subscriber中的onCompleted()方法也沒有參數(shù)泛型

Action1有1個參數(shù)泛型無返回值類型,onNextAction設(shè)置的參數(shù)泛型為String,而Subscriber中的onNext()方法參數(shù)泛型也是String(和本文中觀察者對象中的OnNext方法對比)

Action1有1個參數(shù)泛型無返回值類型,onErrorAction設(shè)置的參數(shù)泛型為Throwable,而Subscriber中的onError()方法參數(shù)泛型也是Throwable

那么,我們來查看observable.subscribe(onNextAction)的源碼,在這里, Action1可以被當(dāng)成一個包裝對象,將onNext()方法進(jìn)行包裝作為不完整的回調(diào)傳入到observable.subscribe()中

圖3.2.1 傳入的onNextAction最終被包裝成ActionSubscriber

我們來看看Action1有何玄機,Action1的源碼如下圖所示:

圖3.2.2? Action1接口源碼

實質(zhì)上,這種根據(jù)參數(shù)泛型的個數(shù)且無返回值類型的包裝在RxJava中有多種如下圖所示的體現(xiàn),例如Action0的參數(shù)個數(shù)為0,Action1的參數(shù)個數(shù)為1以此類推:

圖3.2.3 根據(jù)參數(shù)泛型的個數(shù)且無返回值類型的包裝

四、RxJava的線程切換

1、Scheduler線程調(diào)度器

如果不指定線程,默認(rèn)是在調(diào)用subscribe方法的線程上進(jìn)行回調(diào),那么如果子線程中調(diào)用subscibe方法,而想在主線程中進(jìn)行UI更新,則會拋出異常。當(dāng)然了RxJava已經(jīng)幫我們考慮到了這一點,所以提供了Scheduler線程調(diào)度器幫助我們進(jìn)行線程之間的切換。

實質(zhì)上,Scheduler線程調(diào)度器和RxJava的操作符有緊密關(guān)聯(lián),我將在下一篇文章中進(jìn)行詳細(xì)介紹。

RxJava內(nèi)置了如下所示幾個的線程調(diào)度器:

Schedulers.immediate():在當(dāng)前線程中執(zhí)行

Schedulers.newThread():啟動新線程,在新線程中進(jìn)行操作

Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。

Schedulers.computation():計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

Schedulers.trampoline():會將任務(wù)按添加順序依次放入當(dāng)前線程中等待執(zhí)行。線程一次只執(zhí)行一個任務(wù),其余任務(wù)排隊等待,一個任務(wù)都執(zhí)行完成后再開始下一個任務(wù)的執(zhí)行。

此外RxJava還提供了用于測試的調(diào)度器Schedulers.test() 及 可自定義Scheduler—-Schedulers.form() 。

RxAndroid并且其為我們提供了AndroidSchedulers.mainThread()進(jìn)行主線程的回調(diào)

2、線程控制

調(diào)用Observable對象的subscribeOn()、observeOn()方法即可完成線程控制。

subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。

observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程。

Observable.just("Alex","Payne")? ? ? ? ? .subscribeOn(Schedulers.io())//指定 subscribe() 所發(fā)生的線程.unsubscribeOn(Schedulers.io())//事件發(fā)送完畢后,及時取消發(fā)送.observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 所運行在的線程.subscribe(newAction1() {@Overridepublicvoidcall(String s){? ? ? ? ? ? ? ? ? ? Log.e(TAG, s);? ? ? ? ? ? ? ? }? ? ? ? ? ? });

五、總結(jié)

本文主要介紹了RxJava的由來、使用步驟、部分內(nèi)容的原理解析。在下篇文章中我會詳細(xì)介紹RxJava的操作符。希望本文對你在學(xué)習(xí)RxJava的路上有所啟發(fā)。

作者:Alex_Payne

鏈接:http://www.itdecent.cn/p/2d3d7c77dc92

來源:簡書

簡書著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請聯(lián)系作者獲得授權(quán)并注明出處。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、ReactiveX簡介 在學(xué)習(xí)RxJava前首先需要了解ReactiveX,因為RxJava是Reactive...
    測天測地測空氣閱讀 351評論 0 1
  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,740評論 7 62
  • 前言我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占導(dǎo)zqq閱讀 9,298評論 6 151
  • 上一篇文章簡單的介紹了下VR以及在安卓開發(fā)中簡單的使用VR,但局限性也很明顯就是只能看一些簡單的全景圖和VR視頻,...
    暗塵隨碼去閱讀 4,391評論 4 8
  • 春風(fēng)十里,不如你。 你曾經(jīng)許下的誓言,你還記得嗎 你曾經(jīng)得今心思想要追到的人,你還記得嗎 你曾經(jīng)為了那個人,付出所...
    遠(yuǎn)不及她閱讀 236評論 0 0

友情鏈接更多精彩內(nèi)容