Android框架——RxJava(一)概述與基本使用

RxJava(一)概述與基本使用

RxJava學(xué)習(xí)系列:

  • RxJava(一)概述與基本使用
  • [RxJava(二)創(chuàng)建操作符]

RxJava是近年來非常火熱且復(fù)雜的Android框架,本文基于RxJava 1.2.9來對(duì)其進(jìn)行分析。

使用RxJava需要在build.gradle中修改如下代碼:

dependencies {
    ...
    compile 'io.reactivex:rxjava:1.2.9' //需要添加的代碼
    compile 'io.reactivex:rxandroid:1.2.1' //rxandroid的依賴,基于rxjava的擴(kuò)展庫

}

一、RxJava概述

RxJava是函數(shù)響應(yīng)式編程在Java語言上的實(shí)現(xiàn),在了解RxJava之前我們先來簡(jiǎn)單學(xué)習(xí)下什么是函數(shù)響應(yīng)式編程。

函數(shù)響應(yīng)式編程

函數(shù)響應(yīng)式編程是函數(shù)式編程和響應(yīng)式編程這兩種編程范式的結(jié)合。

函數(shù)式編程(Functional Programming)是通過函數(shù)的調(diào)用與組合來處理數(shù)據(jù),獲取計(jì)算結(jié)果的一種編程范式。
在函數(shù)式編程中,函數(shù)是"第一等公民",即與其他數(shù)據(jù)類型地位相同:

  • 可以賦值給其他變量,因?yàn)樵谠诤瘮?shù)式編程中,只用"表達(dá)式",即每一步都是一個(gè)運(yùn)算過程,都有返回值,函數(shù)也必須都有返回值
  • 也可以作為參數(shù)傳遞給其他函數(shù),比如閉包作為參數(shù)傳遞。

函數(shù)式編程的優(yōu)點(diǎn):

  • 簡(jiǎn)潔易懂,通過函數(shù)的鏈?zhǔn)秸{(diào)用使代碼可讀性更強(qiáng)
  • 只依賴輸入的特性,每一個(gè)函數(shù)都可以看做一個(gè)獨(dú)立的單元,使代碼更易管理
  • 由于函數(shù)不修改變量,所以不需要考慮死鎖的問題,易于并發(fā)編程

響應(yīng)式編程(Reactive Programming)是一種面向數(shù)據(jù)流和變化傳播的編程范式。
在響應(yīng)式編程中,任何的事件都看做是數(shù)據(jù)流的形式。上游發(fā)射數(shù)據(jù)流,下游監(jiān)聽數(shù)據(jù)流,在傳遞的過程中,對(duì)數(shù)據(jù)流進(jìn)行過濾,轉(zhuǎn)變,合并,去重等處理,當(dāng)下游接受到數(shù)據(jù)流時(shí),對(duì)其做出響應(yīng)。
在界面顯示中,將要顯示的數(shù)據(jù)源(從網(wǎng)絡(luò)請(qǐng)求,數(shù)據(jù)庫查詢中得到)以數(shù)據(jù)流的形式,通過一系列的流轉(zhuǎn)過程(對(duì)數(shù)據(jù)進(jìn)行處理,后臺(tái)線程發(fā)送到UI線程),交給界面,界面在對(duì)數(shù)據(jù)做出相應(yīng)的響應(yīng)。在界面的交互中,也是如此,將用戶輸入的點(diǎn)擊,觸摸等事件以數(shù)據(jù)流的形式經(jīng)過層層傳遞交給對(duì)應(yīng)的窗口,在到對(duì)應(yīng)的控件,控件監(jiān)聽到相應(yīng)的事件后,響應(yīng)用戶的行為。

函數(shù)響應(yīng)式編程(Functional reactive programming)是通過函數(shù)塊(map,reduce,filter)來處理異步數(shù)據(jù)流的一種編程范式。
在RxJava中,函數(shù)響應(yīng)式編程通過設(shè)置一個(gè)可觀察對(duì)象(Observable)和觀察者(Observer/Subscriber),Subscriber監(jiān)聽Observable的事件,Observable以數(shù)據(jù)流的形式發(fā)送事件,再通過一系列函數(shù)的鏈?zhǔn)秸{(diào)用(map/flatmap/filter等)對(duì)數(shù)據(jù)流進(jìn)行轉(zhuǎn)變和通過線程調(diào)度器(Scheduler)對(duì)數(shù)據(jù)流進(jìn)行并發(fā)處理,最后由Subscriber接受到事件后對(duì)事件作出響應(yīng)。

RxJava的定義

了解了函數(shù)響應(yīng)式編程后,我們就能理解RxJava的定義了。

RxJava是響應(yīng)式擴(kuò)展的JVM實(shí)現(xiàn):一個(gè)使用可觀察序列組成異步的,基于事件的程序的庫。

總的來說,RxJava就是一個(gè)實(shí)現(xiàn)異步操作的庫,相比于AsyncTask/Handler等異步操作的機(jī)制來說,RxJava的優(yōu)點(diǎn)在于其使用了函數(shù)式的編程范式,使用函數(shù)的鏈?zhǔn)秸{(diào)用對(duì)數(shù)據(jù)流的發(fā)送,流轉(zhuǎn),接受,響應(yīng)進(jìn)行處理,使得代碼異常簡(jiǎn)潔明了,再加上JDK8中支持的lambda表達(dá)式可以使代碼變的更為簡(jiǎn)化。

二、觀察者模式

RxJava的實(shí)現(xiàn)使用了觀察者模式,我們就來講一下觀察者模式。

什么是觀察者模式

觀察者模式的機(jī)制是存在一個(gè)被觀察者和觀察者,觀察者對(duì)被觀察者的某種特征進(jìn)行監(jiān)聽,當(dāng)這種特征發(fā)生變化時(shí),觀察者立刻做出反應(yīng)。例如,在監(jiān)獄里,獄警是觀察者,犯人是被觀察者,獄警對(duì)犯人的打開牢門的行為進(jìn)行了監(jiān)聽,當(dāng)犯人打開牢門時(shí),獄警需要立刻沖上去把犯人制服。這就是觀察者模式,獄警需要時(shí)刻緊盯犯人的行為,觀察其是否有打開牢門的行為。

編程中的觀察者模式

程序中的觀察者模式則有略微不同,觀察者不需要時(shí)刻去緊盯被觀察者,而是去訂閱(Subscribe)或注冊(cè)(Register)觀察者感興趣的行為,當(dāng)被觀察者發(fā)生這種行為時(shí),讓被觀察者去通知他(在上例中就是獄警告訴犯人,你要開牢門的時(shí)候要提醒我)。

通常的模式是這樣的:Observable內(nèi)部有一個(gè)成員變量mObserver,通過訂閱函數(shù)setObserver來設(shè)置訂閱者,建立訂閱關(guān)系,同時(shí)在被觀察的事件發(fā)生時(shí)調(diào)用通知函數(shù)notifyObserver來通知mObserver去調(diào)用它的響應(yīng)函數(shù)。

點(diǎn)擊事件中的觀察者模式

Android中有很多觀察者的例子,比如注冊(cè)監(jiān)聽事件就是一個(gè)很典型的例子。被觀察者View調(diào)用setOnClickListener來給自己設(shè)置一個(gè)觀察者OnClickListener,約定好OnClickListener訂閱了View的點(diǎn)擊事件。當(dāng)View被點(diǎn)擊時(shí),會(huì)調(diào)用performClick方法去通知OnClickListener,調(diào)用OnClickListener.onClick方法,即OnClickListener對(duì)點(diǎn)擊事件作出響應(yīng)。

View&OnClickListener.png

三、RxJava的基本使用

RxJava中基本的實(shí)現(xiàn)為創(chuàng)建觀察者,創(chuàng)建被觀察者,建立訂閱關(guān)系

第一步:創(chuàng)建Subscriber/Observer

首先是創(chuàng)建一個(gè)觀察者Subscriber/Observer,其中Observer是一個(gè)接口,其中包含onNext(T t),onCompleted()onError(Throwable e),三個(gè)抽象方法,而Subscriber是一個(gè)抽象類,實(shí)現(xiàn)了Observer接口,上述三個(gè)方法的實(shí)現(xiàn)由其子類來實(shí)現(xiàn),在使用上兩者區(qū)別不大,在Observable.subscribe方法中如果傳入Observer,會(huì)將其封裝成Subscriber來進(jìn)行訂閱的。Subscriber提供了更多的功能,在之后再進(jìn)行講解。

創(chuàng)建Subscriber

Subscriber<String> subscriber = new Subscriber<>() {
    @Override
    public void onNext(String s) {
        Log.d("TAG", "onNext: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d("TAG", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.d("TAG", "onError");
    }
};

RxJava中將事件看做是一個(gè)隊(duì)列,關(guān)于這三個(gè)事件回調(diào)方法作用如下:

  • onNext(T t):不斷的處理下一個(gè)事件,直至隊(duì)尾
  • onCompleted():當(dāng)不在有新的事件時(shí),調(diào)用這個(gè)方法作為結(jié)束標(biāo)志,這個(gè)方法會(huì)在最后一個(gè)onNext調(diào)用之后調(diào)用
  • onError(Throwable e):當(dāng)事件隊(duì)列出現(xiàn)異常時(shí),會(huì)觸發(fā)這個(gè)方法,并終止事件隊(duì)列,不在處理新的事件。

onErroronCompleted在一個(gè)處理流里面只會(huì)調(diào)用一個(gè)。

第二步:創(chuàng)建Observable

調(diào)用Observable.create(OnSubcribe)來創(chuàng)建一個(gè)Observable,OnSubscirbe繼承了Action1<T>接口,Action1<T>接口中只包含一個(gè)方法void call(T t)。在RxJava源碼中有很多ActionX的接口,X表示接口的泛型個(gè)數(shù)和call方法的參數(shù)個(gè)數(shù),call中每個(gè)參數(shù)與泛型類型一一對(duì)應(yīng)。

我們先來看OnSubscribe的代碼:

/**
     * Invoked when Observable.subscribe is called.
     * @param <T> the output value type
     */
    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }

從注釋可以看出在建立訂閱關(guān)系Observable.subscribe調(diào)用后會(huì)去調(diào)用OnSubscribe.call方法,所以call中就應(yīng)該實(shí)現(xiàn)Subscriber對(duì)事件作出的響應(yīng)的處理邏輯。

創(chuàng)建Observable:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscribe<? super String> subscriber) {
        //subcricber對(duì)事件的響應(yīng)邏輯
        subscriber.onNext("first");
        subscriber.onNext("second");
        subscriber.onNext("third");
        subscriber.onCompleted();
    }
} );

除了通過Observable.create來創(chuàng)建Observable還可以通過just(T... t),from(T[] t),from(Iterable<? extends T> iterable)來進(jìn)行創(chuàng)建Observable`

just傳入的是多個(gè)事件參數(shù):

Observable observable = Observable.just("first", "second", "third");

from傳入的是一個(gè)事件數(shù)組:

String[] s = {"first", "second", "third"};
Observable observable = Observable.from(s);

上述兩種方式都和第一種方式?jīng)]有區(qū)別,justfrom都在其內(nèi)部對(duì)事件回調(diào)函數(shù)進(jìn)行了調(diào)用

第三步:建立訂閱關(guān)系

通過Observable.subscribe(subscriber)來進(jìn)行訂閱關(guān)系的建立

observable.subscribe(subscirber);

subscribe返回的不是Observable,是Subscription接口,后序系列詳解這個(gè)接口

除了這種方式,RxJava還提供了訂閱不完整定義的事件回調(diào)函數(shù),在subscribe方法中直接傳入事件回調(diào)函數(shù):

Subscription subscribe(final Action1<? super T> onNext)
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) 
Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {

前面已經(jīng)講過了ActionX接口的含義,這里subscribe的三個(gè)重載就是可以不用麻煩的創(chuàng)建Subscriber,而是直接去實(shí)現(xiàn)回調(diào)方法就行了。三個(gè)回調(diào)方法與重載參數(shù)的順序是確定的,看上述方法參數(shù)的形參名就知道了。

以第三個(gè)方法為例,實(shí)現(xiàn)跟之前創(chuàng)建Subscriber一樣的效果:

observable.subscribe(new Action1<String>() {
  @Override
  public void call(String s) {
    System.out.println("onNext: " + s);
  }
}, new Action1<Throwable>() {
  @Override
  public void call(Throwable throwable) {
    System.out.println("onError");
  }
}, new Action0() {
  @Override
  public void call() {
    System.out.println("onCompleted");
  }
});

可以根據(jù)方法的重載(第一種和第二種情況),不用將三個(gè)方法都實(shí)現(xiàn),這樣未實(shí)現(xiàn)的方法就默認(rèn)為什么操作都不執(zhí)行。

通過以上方式,我們就實(shí)現(xiàn)了對(duì)RxJava的基本使用,RxJava使用函數(shù)式編程的思想,支持鏈?zhǔn)秸{(diào)用,合起來代碼如下:

Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
          //subcricber對(duì)事件的響應(yīng)邏輯
          subscriber.onNext("first");
          subscriber.onNext("second");
          subscriber.onNext("third");
          subscriber.onCompleted();
        }
      }).subscribe(new Subscriber<String>() {
        @Override
        public void onNext(String s) {
          System.out.println("onNext: " + s);
        }

        @Override
        public void onCompleted() {
          System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {
          System.out.println("onError");
        }
      });

運(yùn)行結(jié)果如下:

onNext: first
onNext: second
onNext: third
onCompleted

整個(gè)的調(diào)用流程如下:

Observable&&Subscriber.png

首先通過Observable.create(OnSubscribe)來創(chuàng)建一個(gè)Observable對(duì)象,(當(dāng)然你也可以通過just或者from的方式來創(chuàng)建,這樣就不需要OnSubscribe了,在內(nèi)部會(huì)自動(dòng)去調(diào)用Subscriber的事件回調(diào)方法)。然后調(diào)用Observable.subcribe(Subscriber)傳入一個(gè)實(shí)現(xiàn)了Subscriber的實(shí)現(xiàn)類,這個(gè)實(shí)現(xiàn)類里面實(shí)現(xiàn)了onNextonCompleted,onError三個(gè)方法。
Observable.subcribe(Subscriber)的內(nèi)部會(huì)去調(diào)用OnSubscribe.call()這個(gè)方法,而我們?cè)趧?chuàng)建Observable時(shí)傳入的OnSubscribe的匿名內(nèi)部類中實(shí)現(xiàn)了call(Subscriber<? super String> subscriber)方法,去調(diào)用傳入進(jìn)來的subscriber的事件回調(diào)的三個(gè)函數(shù),來對(duì)事件作出響應(yīng)。

由此可以發(fā)現(xiàn),RxJava中的觀察者模式與一般的不同點(diǎn)在于:

  • RxJava中的事件是一個(gè)虛擬的事件,不是來源于外部注入的事件,而是是在創(chuàng)建被觀察者的時(shí)候就已經(jīng)將事件給注入進(jìn)去了。
  • RxJava中被觀察者是在與觀察者建立訂閱關(guān)系的同時(shí),內(nèi)部調(diào)用OnSubscribecall方法去通知觀察者對(duì)事件作出響應(yīng)的。

四、RxJava的線程調(diào)度

前面已經(jīng)講述了RxJava的基本使用,但是你看了之后可能發(fā)現(xiàn)這個(gè)RxJava不是多此一舉,整些花里胡哨的,直接調(diào)用幾個(gè)方法去對(duì)這些事件進(jìn)行處理不就行了,為什么還要整那些觀察者,被觀察者干嘛呢。剛開始我學(xué)習(xí)RxJava的時(shí)候也是這樣覺得的,就覺得這個(gè)東西沒什么用,還把自己搞的暈頭轉(zhuǎn)向的,但是后來學(xué)習(xí)了RxJava的異步處理之后才知道這樣做的好處。其實(shí)前面也有將到RxJava的核心思想和優(yōu)勢(shì),就是在于對(duì)數(shù)據(jù)流的轉(zhuǎn)換和對(duì)異步操作的處理。所以這里我們就來先講一下RxJava中的線程調(diào)度。

Scheduler

RxJava中提供了一個(gè)Scheduler——線程調(diào)度器的機(jī)制,其作用是指定RxJava中的每個(gè)過程在什么線程中運(yùn)行,所以現(xiàn)在你可能就明白了一個(gè)很簡(jiǎn)單的事件響應(yīng)為什么要分成多個(gè)過程來處理了,其原因就是為了利于線程的調(diào)度。

Schedulers中定義了幾個(gè)常用的Scheduler(幾個(gè)靜態(tài)方法的返回值):

  • Schedulers.immediate():默認(rèn)的Scheduler,直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程
  • Schedulers.newThread():?jiǎn)?dòng)一個(gè)新線程,在新線程中執(zhí)行任務(wù)
  • Scheduler.io():用于IO異步阻塞操作的Scheduler,內(nèi)部是用無上限的線程池實(shí)現(xiàn)的,可以重用空閑的線程,效率比newThread要高。不要把計(jì)算操作放在這個(gè)Scheduler中。
  • Scheduler.computation():用于事件循環(huán),回調(diào)以及其他計(jì)算工作的Scheduler,內(nèi)部是一個(gè)固定大小為CPU核數(shù)的線程池,不能把IO阻塞操作放在這個(gè)線程中
  • Schedulers.from(executor):使用指定的executor作為Scheduler
  • AndroidSchedulers.mainThread():RxAndroid中添加的Scheduler,表示在Android主線程中運(yùn)行(RxAndroid是一個(gè)基于RxJava的擴(kuò)展庫,兼容了Android的一些特性)

現(xiàn)在我們就可以去設(shè)置每個(gè)過程中的Scheduler來指定每個(gè)過程的執(zhí)行線程了。在RxJava中通過subcsribeOn()observeOn()來設(shè)置執(zhí)行線程,subscribeOn指定Observable.OnSubscribe調(diào)用call()方法時(shí)所在的線程,即事件產(chǎn)生的線程,observeOn()指定Subscriber事件回調(diào)的執(zhí)行線程,即事件響應(yīng)的線程。

那么現(xiàn)在我們就來寫一個(gè)從IO線程讀取圖片的例子:

Observable.create(new Observable.OnSubscribe<Bitmap>() {

  @Override
  public void call(Subscriber<? super Bitmap> subscriber) {
    Bitmap bitmap = BitmapFactory.decodeFile(picPath);
    subscriber.onNext(bitmap);
    subscriber.onCompleted();
  }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Bitmap>() {
  @Override
  public void onCompleted() {
    Toast.makeText(RxJavaActivity.this, "圖片加載成功", Toast.LENGTH_LONG).show();
  }

  @Override
  public void onError(Throwable e) {

  }

  @Override
  public void onNext(Bitmap bitmap) {
    mIvLocalImage.setImageBitmap(bitmap);
  }
});

上述代碼就實(shí)現(xiàn)了在IO線程中去讀取SD卡上的圖片,然后在主線程中顯示出來。將上述代碼中完整的subscriber實(shí)現(xiàn)變?yōu)椴煌暾氖录卣{(diào)函數(shù),并且使用lambda表達(dá)式,代碼如下:

Observable.<Bitmap>create(subscriber -> {
              Bitmap bitmap = BitmapFactory.decodeFile(picPath);
              subscriber.onNext(bitmap);
              subscriber.onCompleted();
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                bitmap -> mIvLocalImage.setImageBitmap(bitmap),
                e -> {},
                () -> Toast.makeText(RxJavaActivity.this, "圖片加載成功", Toast.LENGTH_LONG).show()
            );

RxJava配合Lambda表達(dá)式實(shí)現(xiàn)異步操作,是不是使用起來特別方便簡(jiǎn)潔!

ps:以上就是關(guān)于RxJava的概述以及RxJava的基本使用了,關(guān)于RxJava的后續(xù)學(xué)習(xí)我會(huì)在后面的文章中進(jìn)行講解的,敬請(qǐng)關(guān)注!

[RxJava(二)創(chuàng)建操作符]:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容