rxjava

Rxjava到底是什么

一個(gè)詞:異步
一個(gè)可以在java VM上使用可觀測(cè)的序列來(lái)組成異步的、基本事件的程序庫(kù)
一個(gè)實(shí)現(xiàn)異步操作的庫(kù)

RXJava優(yōu)缺點(diǎn)

簡(jiǎn)潔
隨著程序的邏輯變得越來(lái)越復(fù)雜,它依然能夠保持簡(jiǎn)潔。

API介紹和原理解析

1.概念:擴(kuò)展的觀察者模式

RXjava的異步實(shí)現(xiàn),是通過(guò)一種擴(kuò)展的觀察者模式

觀察者模式

觀察者模式的面向需求是:對(duì)象A(觀察者)對(duì)對(duì)象B(被觀察者)的某種變化,高度敏感,需要在B變化的一瞬間做出反應(yīng)。
觀察者模式采用注冊(cè)(register)或者成為訂閱(subscrible)的方式,告訴被觀察者,我需要你的某某狀態(tài),你要在它變化的時(shí)候告訴我。
Android開(kāi)發(fā)中典型的例子就是view的點(diǎn)擊監(jiān)聽(tīng)器OnClickLinstener()。對(duì)設(shè)置onClickListener來(lái)說(shuō),view是被觀察者,OnClickListener是觀察者,二者通過(guò)setOnClickListener完成訂閱關(guān)系。訂閱完成之后,用戶點(diǎn)擊view的瞬間,Android Framework就會(huì)將點(diǎn)擊事件交給已經(jīng)注冊(cè)的onClickListener采取這樣被動(dòng)的觀察方式,既省去了反復(fù)檢索狀態(tài)的資源消耗,也能夠得到最高的反饋速度。

OnClickListener 的模式大致如下圖:

[圖片上傳失敗...(image-2917dc-1512961567324)]

如圖所示,通過(guò)setOnClickListener方法Button持有OnClickListener的引用,當(dāng)用戶點(diǎn)擊Button,自動(dòng)調(diào)用OnclickListener里的onClick方法。
把圖片抽象出來(lái) Button->被觀察者 OnClickListener->觀察者 setOnClickListener->訂閱 onClick->事件。就由專用的觀察者模式(例如只用于監(jiān)聽(tīng)控件點(diǎn)擊)轉(zhuǎn)變成了通用的觀察者模式。

[圖片上傳失敗...(image-ed19bb-1512961567324)]

而 RxJava 作為一個(gè)工具庫(kù),使用的就是通用形式的觀察者模式。

RXJava的觀察者模式

RXjava有4個(gè)感念:

  • Observable 被觀察者
  • Observer 觀察者
  • subscribe 訂閱
  • 事件

Observable和Observer通過(guò)Subscribe方法實(shí)現(xiàn)訂閱,從而Observable可以在需要的時(shí)候發(fā)出事件通知Observer。

與傳統(tǒng)的觀察者模式不同,除了普通事件onNext() (相當(dāng)于onClick/OnEvent),還定義兩個(gè)特殊的事件onCompleted(),onError | completed 完成 完整的|

  • onComplete() 事件隊(duì)列完結(jié)。RXJava不僅把每個(gè)事件單獨(dú)處理,還會(huì)把他們看成一個(gè)隊(duì)列RXJava規(guī)定,如果沒(méi)有新的onNext()方法發(fā)出時(shí),必須出發(fā)onCompleted方法作為完成標(biāo)志。
  • onError() 事件隊(duì)列異常,在事件處理過(guò)程中出現(xiàn)異常,會(huì)觸發(fā)onError(),并且整個(gè)事件終止,不允許在有事件發(fā)出。
  • 在一個(gè)正確運(yùn)行事件序列中,onCompleted,onError有且只有一個(gè)會(huì)被調(diào)用,而且是事件中最后一個(gè)方法,兩個(gè)方法是互斥的。即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)。

RxJava 的觀察者模式大致如下圖:

[圖片上傳失敗...(image-f381dc-1512961567324)]

基本實(shí)現(xiàn)

基于以上的概念, RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):

創(chuàng)建Observer 觀察者

決定著事件觸發(fā)將有怎樣的行為

 Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對(duì) Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不僅基本使用方式一樣,實(shí)質(zhì)上,在RxJava的subscribe過(guò)程中,Observer也總是會(huì)先被轉(zhuǎn)換成Subscriber在使用。所以使用基本功能,選擇Observer或者subscriber都是一樣的。他們的區(qū)別有兩點(diǎn)。

  1. onStart() 這是subscriber新增方法,他會(huì)在subscribe剛開(kāi)始,但是事件還沒(méi)有發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)清零或者重置,這是一個(gè)可選的方法。默認(rèn)實(shí)現(xiàn)為空。但是如果對(duì)工作線程有要求的話(例如彈出一個(gè)對(duì)話框,需要在Ui線程執(zhí)行),就不能使用onStart(),因?yàn)樗偸钦{(diào)用在subscribe所發(fā)生的線程調(diào)用,而不能指定線程。如果指定線程來(lái)做準(zhǔn)備工作,可以使用doOnSubscribe()方法。
  2. unSubscribe 這是Subscriber所實(shí)現(xiàn)的另一個(gè)接口Subscription()方法,用于取消訂閱,在這個(gè)方法調(diào)用后Subscriber將不接受任何事件。一般在調(diào)用之前先使用isUnSubscribed先判斷一下?tīng)顟B(tài),unSubscribe()這個(gè)方法很重要,因?yàn)樵趕ubscribe之后,Observable會(huì)持有Subscriber的引用,這個(gè)引用如果不能及時(shí)被釋放,將有內(nèi)存泄露的風(fēng)險(xiǎn)。所以要保持良好的原則,要在不再使用的時(shí)候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來(lái)解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。

創(chuàng)建Observeable

Observable是被觀察者,決定在什么時(shí)候被觸發(fā),和觸發(fā)什么事件。
RXJava使用create()方法來(lái)創(chuàng)建一個(gè)Observable,并為他設(shè)置事件觸發(fā)規(guī)則。

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

這里傳入OnSubScribe對(duì)象作為參數(shù),OnSubscribe會(huì)被存入染回的Observable對(duì)象中,相當(dāng)于一個(gè)計(jì)劃表,當(dāng)OnSubscribeObservable被訂閱時(shí),OnSubscriable的call方法會(huì)被自動(dòng)調(diào)用,事件序列會(huì)按照設(shè)定依次調(diào)用onNext方法和OnCompleted方法,這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實(shí)現(xiàn)了被觀察者向觀察者的事件傳遞,即觀察者模式

create方法是RXJava中最基本的創(chuàng)造事件序列的方法。RXJava還提供了一些方法用來(lái)快捷創(chuàng)建事件隊(duì)列,例如:

  • just<T...> 將傳入的參數(shù)依次發(fā)送出來(lái)。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
  • from(T...) / from(Iterable<? extends T> 將傳入的數(shù)組 或者 Iterable 拆分成具體對(duì)象后,依次發(fā)送出來(lái)。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價(jià)的。

Subcsribe訂閱

創(chuàng)建了Observable和Observe之后,用subscribe方法將他們鏈接起來(lái),整條鏈子就可以工作了。

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的內(nèi)部實(shí)現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

subscribe做了三件事

  1. 調(diào)用的Subscriber的onStart方法,可選的準(zhǔn)備方法。
  2. 調(diào)用observable中的Call方法。在這里,事件發(fā)送的邏輯開(kāi)始運(yùn)行。在RXjava中Observable不是在創(chuàng)建時(shí)候就立即發(fā)送事件,而是在他訂閱的時(shí)候,即放subscribe執(zhí)行的時(shí)候。
  3. 將傳入的subscribe作為Subscription返回,為了方便unSubscribe。

整個(gè)關(guān)系如下

[圖片上傳失敗...(image-17e6e0-1512961567324

或者

[圖片上傳失敗...(image-7845b8-1512961567324)]

除了subscribe(Observer) 或者 subscribe(subscriable),subscribe還支持不完整定義的回調(diào),RXJava會(huì)自動(dòng)創(chuàng)建出Subscriber

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 來(lái)定義 onNext()
observable.subscribe(onNextAction);
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來(lái)定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來(lái)定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

Action0是RxJava中的一個(gè)接口,它只有一個(gè)方法Call,這個(gè)方法是一個(gè)無(wú)參無(wú)返回值的方法,由于onCompleted也是無(wú)參無(wú)返回值的,因此action可以當(dāng)成一個(gè)包裝對(duì)象,將onCompleted內(nèi)容打包起來(lái)將自己作為一個(gè)參數(shù)傳入subscribe中,以實(shí)現(xiàn)不完整定義的回調(diào)。也可以看做將onCompleted方法傳遞進(jìn)了subscribe,相當(dāng)于某些語(yǔ)言中的閉包。

Action1也是一個(gè)接口,他同樣也只有一個(gè)方法Call(T param),這個(gè)方法無(wú)返回值,但是有一個(gè)參數(shù),與Action0同理,由于onNext onError也是只有一個(gè)單參數(shù),且沒(méi)有返回值,因此Action1可以將OnNext(obj)和onError(error)打包起來(lái)傳入subscribe中,以實(shí)現(xiàn)不完整定義的回調(diào),事實(shí)上,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個(gè) ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無(wú)返回值的方法。

場(chǎng)景事例

打印字符數(shù)組

將字符串?dāng)?shù)組 names 中的所有字符串依次打印出來(lái):

 String[] names = {"馮星","曹操","趙云","馬超"};
        rx.Observable.from(names).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "call: " +s);
            }
        });

由 id 取得圖片并顯示

由指定的一個(gè) drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中,并在出現(xiàn)異常的時(shí)候打印 Toast 報(bào)錯(cuò):

Observable<Drawable> observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                subscriber.onNext(getResources().getDrawable(R.mipmap.water));
                subscriber.onCompleted();
            }
        });
        observable.subscribe(new Subscriber<Drawable>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(Drawable drawable) {
                iv_demo.setImageDrawable(drawable);
            }
        });

正如上面兩個(gè)例子這樣,創(chuàng)建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來(lái),一次 RxJava 的基本使用就完成了。非常簡(jiǎn)單。

[圖片上傳失敗...(image-9074f5-1512961567324)]

在RXjava默認(rèn)規(guī)則里,事件的發(fā)出和消費(fèi)都在同一個(gè)線程里。也就是說(shuō)上面是一個(gè)同步的觀察者模式。

而觀察者模式本身的目的在于 后臺(tái)處理,前臺(tái)調(diào)用 的異步機(jī)制。因此異步對(duì)于 RxJava 是至關(guān)重要的。而要實(shí)現(xiàn)異步,則需要用到 RxJava 的另一個(gè)概念: Scheduler 。

線程控制 -- Scheduler |si gan diu le| 調(diào)度

在不指定線程的情況下,RXjava遵循的是線程不變得原則,

即,在那個(gè)線程調(diào)用Subscribe,就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)的事件,就在那個(gè)線程消費(fèi)事件,

如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。

Schedule的API

在RXjava中,schedule--調(diào)度器,相當(dāng)于線程控制器,RXJava通過(guò)它來(lái)指定每一段代碼應(yīng)該運(yùn)行哪一個(gè)線程。

  • Schedulers.immediate() 運(yùn)行在當(dāng)前線程,相當(dāng)于不指定線程。這是默認(rèn)的schedule。 |ai mi dei rui te| 立即的 立刻的
  • Schedulers.newThread() 總是啟用新線程。并在新線程執(zhí)行操作。
  • Schedulers.io() io操作(讀寫(xiě)文件,讀寫(xiě)數(shù)據(jù)庫(kù),網(wǎng)絡(luò)信息交互等)所使用的schedule。行為模式和newThread差不多。區(qū)別在于Io的內(nèi)部實(shí)現(xiàn)是用一個(gè)無(wú)數(shù)量上線的線程池,可以重復(fù)利用閑置的線程。因此多數(shù)情況下,io要比newThread更有效。不要把計(jì)算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
  • Schedulers.computation() 計(jì)算時(shí)使用的schedule。這個(gè)計(jì)算指的是CPU密集型計(jì)算,即不會(huì)被I/O等操作限制性能的操作,例如圖形計(jì)算。這個(gè)Scheduler使用固定的線程池,大小為CPU的核數(shù),不要把I/O放在computation中,否則I/O操作的等待時(shí)間會(huì)浪費(fèi)CPU
  • AndroidSchedules.mainThread() Android專用線程,他指定的操作將在主線程中運(yùn)行。

subscribeOn() 指定subscribe()所發(fā)生的線程,即Observable.OnSubscribe()被激活時(shí)所發(fā)生的線程?;蛘呓凶鍪录a(chǎn)生的線程。

ObserveOn()指定subscriber所發(fā)生的線程或者叫做事件消費(fèi)的線程。

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

由于subscribeOn(Schedulers.io())的指定,被創(chuàng)建的事件1,2,3,4,將會(huì)在在Io線程發(fā)出。
由于observeOn(AndroidSchedulers.mainThread())的指定,因此subscriber的數(shù)字打印將發(fā)生在主線程。

事實(shí)上,這種在 subscribe() 之前寫(xiě)上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見(jiàn),它適用于多數(shù)的 『后臺(tái)線程取數(shù)據(jù),主線程顯示』的程序策略。

Schedule的原理

下面呢

變換

RxJava 提供了對(duì)事件序列進(jìn)行變換的支持,這是它的核心功能之一

所謂變換,就是將事件序列中的對(duì)象或者整個(gè)序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或者事件序列。

API

map()

Observable.just(R.mipmap.water)
                .map(new Func1<Integer, Bitmap>() {
                    @Override
                    public Bitmap call(Integer s) {
                        return BitmapFactory.decodeResource(getResources(),s);
                    }
                })
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {
                        iv_demo.setImageBitmap(bitmap);
                    }
                });

Func1和Action1非常相似。也是RXJava中的一個(gè)接口。用于包裝有一個(gè)參數(shù)的方法。Func1和Action1的區(qū)別在于,Func1是由返回值得。另外,和 ActionX 一樣, FuncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。

map()方法將參數(shù)中的string對(duì)象轉(zhuǎn)換成BitMap對(duì)象后返回,經(jīng)過(guò)map方法后,事件的參數(shù)也隨之變成的bitmap

  • map():事件對(duì)象的直接變換,是最常見(jiàn)的轉(zhuǎn)換。

[圖片上傳失敗...(image-2a65d2-1512961567324)]

  • flatMap() 這是一個(gè)很有用但非常難理解的變換,因此我決定花多些篇幅來(lái)介紹它。 首先假設(shè)這么一種需求:假設(shè)有一個(gè)數(shù)據(jù)結(jié)構(gòu)『學(xué)生』,現(xiàn)在需要打印出一組學(xué)生的名字。實(shí)現(xiàn)方式很簡(jiǎn)單:
Observable.from(students)
                .map(new Func1<Student, String>() {
                    @Override
                    public String call(Student student) {
                        return student.getName();
                    }
                })
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d(TAG, "call: "+s);
                    }
                });

很簡(jiǎn)單。那么再假設(shè):如果要打印出每個(gè)學(xué)生所需要修的所有課程的名稱呢?(需求的區(qū)別在于,每個(gè)學(xué)生只有一個(gè)名字,但卻有多個(gè)課程。)首先可以這樣實(shí)現(xiàn):

 Observable.from(students)
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        List<Course> courses = student.getCourses();
                        for(Course course : courses){
                            Log.d(TAG, "onNext: "+ course.getName() + "    Student : " +student.getName());
                        }
                    }
                });

依然很簡(jiǎn)單。那么如果我不想在 Subscriber 中使用 for 循環(huán),而是希望 Subscriber 中直接傳入單個(gè)的 Course 對(duì)象呢(這對(duì)于代碼復(fù)用很重要)?用 map() 顯然是不行的,因?yàn)?map() 是一對(duì)一的轉(zhuǎn)化,而我現(xiàn)在的要求是一對(duì)多的轉(zhuǎn)化。那怎么才能把一個(gè) Student 轉(zhuǎn)化成多個(gè) Course 呢? 這時(shí)候就需要flatMap了

Observable.from(students)
                .flatMap(new Func1<Student, Observable<Course>>() {
                    @Override
                    public Observable<Course> call(Student student) {
                        return Observable.from(student.getCourses());
                    }
                }).subscribe(new Action1<Course>() {
            @Override
            public void call(Course course) {
                Log.d(TAG, "call: " + course.getName());
            }
        });

flatMap和map有個(gè)相同點(diǎn):也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對(duì)象。但是需要注意的是flatMap返回的對(duì)象是Observable對(duì)象。并且這個(gè)Observable對(duì)象不是直接發(fā)送到了Subscriber的回調(diào)方法中。

flatMap的原理是這樣的

  1. 使用傳入的事件對(duì)象創(chuàng)建一個(gè)Observable對(duì)象。
  2. 并不發(fā)送這個(gè)Observable對(duì)象,而是將它激活,于是它開(kāi)始發(fā)送事件。
  3. 每一個(gè)創(chuàng)建出來(lái)的Observable發(fā)送的事件,都匯入同一個(gè)Observable對(duì)象,而這個(gè)observable對(duì)象負(fù)責(zé)將這些事件傳入Subscriber對(duì)象。

這三個(gè)步驟吧事件分成了兩級(jí),通過(guò)一組新創(chuàng)建的Observable將初始的對(duì)象鋪平,之后通過(guò)統(tǒng)一路徑分發(fā)下去,而這個(gè)『鋪平』就是 flatMap() 所謂的 flat

flatMap()示意圖

[圖片上傳失敗...(image-a79136-1512961567324)]

擴(kuò)展

由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請(qǐng)求。示例代碼(Retrofit + RxJava):

networkClient.token() // 返回 Observable<String>,在訂閱時(shí)請(qǐng)求 token,并在響應(yīng)后發(fā)送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>,在訂閱時(shí)請(qǐng)求消息列表,并在響應(yīng)后發(fā)送請(qǐng)求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 處理顯示消息列表
            showMessages(messages);
        }
    });

傳統(tǒng)的嵌套請(qǐng)求需要使用嵌套的 Callback 來(lái)實(shí)現(xiàn)。而通過(guò) flatMap() ,可以把嵌套的請(qǐng)求寫(xiě)在一條鏈中,從而保持程序邏輯的清晰。

  • throttleFirst(): 在每次事件觸發(fā)后的一定時(shí)間間隔內(nèi)丟棄新的事件。常用作去抖動(dòng)過(guò)濾,例如按鈕的點(diǎn)擊監(jiān)聽(tīng)器: RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋 .throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms .subscribe(subscriber); 媽媽再也不怕我的用戶手抖點(diǎn)開(kāi)兩個(gè)重復(fù)的界面啦。 |si rou te| 喉嚨 壓制 節(jié)流 減速 窒息

變換的原理 lift()

這些變化雖然功能不一樣,但實(shí)質(zhì)上都是針對(duì)事件的處理在發(fā)送。而在RXjava的內(nèi)部,他們都是基于同一個(gè)基礎(chǔ)的方法變化,lift(Operator)。

// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉(cāng)庫(kù)下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber); // 這個(gè)onSubscribe是原始的OnSubScribe對(duì)象!!
        }
    });
}

這段代碼,它生成了一個(gè)新的Observable并且返回,新創(chuàng)建的Observable中的參數(shù)OnSubscribe的回調(diào)方法call()的實(shí)現(xiàn),和Observable.Subscribe()基本一樣,但是是由區(qū)別的。

不一樣的地方在與OnSubscrvable中call(subscribe)所指代了對(duì)象不同

  • 當(dāng)使用lift方法時(shí),
  1. 假設(shè)有一個(gè)Observable<T>調(diào)用了lift()并創(chuàng)建Observable后,一共有個(gè)Observable。
  2. 同樣的Observable的參數(shù)OnSubscribe,加上之前原始的Observable里面的原始OnSubscribe,也就有了兩個(gè) OnSubscribe;
  3. 然后調(diào)用Observable.create傳入Observable<R>,觸發(fā)onSubscribe的Call方法,也是就override的方法,
  4. 在該方法中 調(diào)用了OnSubscribe.call()方法,注意:這個(gè)OnSubscribe方法是原始的Observable<T>的onSubscribe<T>對(duì)象。他需要傳入一個(gè)Subscriber對(duì)象,這個(gè)對(duì)象是通過(guò)Subscriber newSubscriber = operator.call(subscriber);operator.call()方法生成的新的Subscribe。正是這個(gè)operator對(duì)象將兩個(gè)Subscriber對(duì)象聯(lián)系起來(lái)的。OnSubscribe<T>在執(zhí)行Subscriber<R>.onNext(R r),而這里從T變成R,正好用到了傳到Operator中的參數(shù)Func1<T, R>。

這樣就實(shí)現(xiàn)了 lift() 過(guò)程,有點(diǎn)像一種代理機(jī)制,通過(guò)事件攔截和處理實(shí)現(xiàn)事件序列的變換。

也可以這個(gè)說(shuō):在Observable執(zhí)行了lift(Operator)方法后,會(huì)返回一個(gè)新的Observable,這個(gè)新的Observable會(huì)象一個(gè)代理一樣,負(fù)責(zé)接受原始的Observable發(fā)出的事件,并在處理后發(fā)送給Subscriber

[圖片上傳失敗...(image-8fc12c-1512961567324)]

[圖片上傳失敗...(image-925ff1-1512961567324)]

多次調(diào)用

[圖片上傳失敗...(image-eb30f8-151296156732

舉個(gè)例子

Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
        .map(new Func1<Float, Integer>() {
            @Override
            public Integer call(Float aFloat) {
                return Math.round(aFloat);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return Integer.toBinaryString(integer);
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                log("2 map onNext->" + s);
            }
        });

// outputs
// 2 map onNext->1
// 2 map onNext->1000
// 2 map onNext->11111111111111111111110111101010
// 2 map onNext->110001001

該例子是一個(gè)Float->Integer->String的轉(zhuǎn)換。我們按上面的流程來(lái)分析。

  1. 生成一個(gè)Observable<Float>
  2. 調(diào)用map生成Observable<Integer>
  3. 調(diào)用map生成Observable<String>
  4. subscribe()傳入一個(gè)Subscribe(String),至此流的前半部分全部完成。
  5. 執(zhí)行開(kāi)始,Subscribe<String>發(fā)送事件,先生成一個(gè)Subscrver<Integer>傳給Observable<Integer>(Observable<Integer>.onSubscribe.call())。
  6. Observable<Interger>開(kāi)始發(fā)送事件,同樣生成一個(gè)Subscriber<Float>傳給Observable<Float>(Observable<Float>.onSubscribe.call())。
  7. 真正的發(fā)送事件開(kāi)始,Observable<Float>調(diào)用Subscriber<Float>.onNext(Float)等方法,同時(shí)Subscriber<Integer>.onNext(Integer)被調(diào)用,同時(shí)Subscriber<String>.onNext(String)被調(diào)用,事件發(fā)送完成。

compose對(duì)Observable整體的變換 |com pou si| 構(gòu)成 組成

除了lift方法外,Observable還有一個(gè)變換方法叫 compose(Transformer)它和lift的區(qū)別在于lift是針對(duì)事件項(xiàng)和事件序列,而compose是針對(duì)observable自身進(jìn)行變換。

假設(shè)在程序中有多個(gè) Observable ,并且他們都需要應(yīng)用一組相同的 lift() 變換。

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面這樣,使用 compose() 方法,Observable 可以利用傳入的 Transformer 對(duì)象的 call 方法直接對(duì)自身進(jìn)行處理,也就不必被包在方法的里面了。

Scheduler的API(二)

利用 subscribeOn() 結(jié)合 observeOn() 來(lái)實(shí)現(xiàn)線程控制,讓事件的產(chǎn)生和消費(fèi)發(fā)生在不同的線程。

能不能可以多次切換線程
答案是能。因?yàn)镺bserveOn指定的是

操作符分類

面我按類別把常用操作符分別介紹,其實(shí)很多內(nèi)容都是來(lái)自于ReactiveX的官方網(wǎng)站,英文比較好的朋友可以參考(http://reactivex.io/)。
按照官方的分類,操作符大致分為以下幾種:

Creating Observables(Observable的創(chuàng)建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
Transforming Observables(Observable的轉(zhuǎn)換操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
Filtering Observables(Observable的過(guò)濾操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
Combining Observables(Observable的組合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
Error Handling Operators(Observable的錯(cuò)誤處理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
Conditional and Boolean Operators(Observable的條件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
Mathematical and Aggregate Operators(Observable數(shù)學(xué)運(yùn)算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
其他如observable.toList()、observable.connect()、observable.publish()等等;

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 我從去年開(kāi)始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,763評(píng)論 7 62
  • 前言我從去年開(kāi)始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占導(dǎo)zqq閱讀 9,321評(píng)論 6 151
  • 最近項(xiàng)目里面有用到Rxjava框架,感覺(jué)很強(qiáng)大的巨作,所以在網(wǎng)上搜了很多相關(guān)文章,發(fā)現(xiàn)一片文章很不錯(cuò),今天把這篇文...
    Scus閱讀 6,988評(píng)論 2 50
  • 今天算是近來(lái)最忙的一天,這種忙來(lái)自于平時(shí)的懶散積累導(dǎo)致的,大概鋝了一下,對(duì)于這半年來(lái)做了如下的總結(jié): 禮拜與禱告不...
    尙熏閱讀 545評(píng)論 0 0
  • 這個(gè)老師,英國(guó)人。上面寫(xiě)的??此沁厡?xiě)著年齡48,他問(wèn)我?guī)讱q了,估計(jì)在上面學(xué)習(xí)的都是學(xué)生,我跟他說(shuō)很老了,三十多了...
    不曉得rita閱讀 162評(píng)論 0 0

友情鏈接更多精彩內(nèi)容