RxJava的詳細(xì)介紹

一.項(xiàng)目引入依賴

在開始之前,首先給出GitHub上的鏈接

Github
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid

引入依賴

compile 'io.reactivex:rxjava:1.0.14' 
compile 'io.reactivex:rxandroid:1.0.1' 

二.RxJava是什么

RxJava是一個(gè)在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫。

RxJava本質(zhì)上是一個(gè)實(shí)現(xiàn)異步操作的庫

三.RxJava的優(yōu)勢

簡潔,在程序邏輯變得越來越復(fù)雜時(shí),可以保持程序的簡潔。

四.API 介紹和原理

觀察者模式

RxJava 的異步實(shí)現(xiàn),是通過一種擴(kuò)展的觀察者模式來實(shí)現(xiàn)的。

觀察者設(shè)計(jì)模式:對象間的一種一對多的依賴關(guān)系,以便一個(gè)對象的狀態(tài)發(fā)生變化時(shí),所有依賴于它的對象都得到通知并自動(dòng)刷新。

RxJava 的觀察者模式

RxJava 有四個(gè)基本概念:Observable (被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。被觀察者和觀察者通過 subscribe() 方法實(shí)現(xiàn)訂閱關(guān)系,從而被觀察者可以在需要的時(shí)候發(fā)出事件來通知觀察者。

RxJava 的事件回調(diào)方法 onNext()、onCompleted() 和 onError()。

  • onNext():RxJava把每個(gè)事件單獨(dú)處理,當(dāng)一個(gè)事件隊(duì)列完成后,會(huì)調(diào)用onNext()方法。
  • onCompleted(): 事件隊(duì)列結(jié)束,當(dāng)不會(huì)再有新的 onNext() 發(fā)出時(shí),需要觸發(fā) onCompleted() 方法作為完成標(biāo)志。
  • onError(): 事件隊(duì)列異常。在事件處理過程中出異常時(shí),onError() 會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出。

注意:onCompleted() 和 onError() 二者也是互斥的,即在隊(duì)列中調(diào)用了其中一個(gè),就不應(yīng)該再調(diào)用另一個(gè)。

基本實(shí)現(xiàn)

RxJava 的基本實(shí)現(xiàn)主要有三點(diǎn):

  • 創(chuàng)建 Observer(觀察者)
Observer<String> observer = new Observer<String>()
{
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext方法執(zhí)行了: " + s);
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "Completed方法執(zhí)行了");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError方法執(zhí)行了!");
            }
 };

除了 Observer 接口之外,RxJava 還內(nèi)置了一個(gè)實(shí)現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進(jìn)行了一些擴(kuò)展,但他們的基本使用方式是完全一樣的

Subscriber<String> subscriber = new Subscriber<String>()
{
            @Override
            public void onCompleted()
            {
                Log.i(TAG, "Completed方法執(zhí)行了");
            }
            @Override
            public void onError(Throwable e)
            {
                Log.i(TAG, "onError方法執(zhí)行了!");
            }
            @Override
            public void onNext(String s)
            {
                Log.i(TAG, "onNext方法執(zhí)行了: " + s);
            }
};

兩者的使用方式是一樣的,實(shí)質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會(huì)先被轉(zhuǎn)換成一個(gè) Subscriber 再使用,它們的區(qū)別主要有兩點(diǎn):

第一:onStart(),這是 Subscriber 增加的方法。它會(huì)在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置。它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準(zhǔn)備工作,可以使用 doOnSubscribe() 方法。

第二:unsubscribe(),這是 Subscriber 所實(shí)現(xiàn)的另一個(gè)接口 Subscription 的方法,用于取消訂閱。在這個(gè)方法被調(diào)用后,Subscriber 將不再接收事件。一般在這個(gè)方法調(diào)用前,可以使用 isUnsubscribed() 先判斷一下狀態(tài),可以在onPause() onStop() 等方法中調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。

  • 創(chuàng)建Observable(被觀察者)
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
{
            @Override
            public void call(Subscriber<? super String> subscriber)
            {
                subscriber.onNext("Hello");
                subscriber.onNext("Rxjava");
                subscriber.onNext("RxAndroid");
                subscriber.onCompleted();
            }
});

可以看到,這里傳入了一個(gè) OnSubscribe 對象作為參數(shù)。OnSubscribe 會(huì)被存儲(chǔ)在返回的 Observable 對象中,它的作用相當(dāng)于一個(gè)計(jì)劃表,當(dāng) Observable 被訂閱的時(shí)候,OnSubscribe 的 call() 方法會(huì)自動(dòng)被調(diào)用,事件序列就會(huì)依照設(shè)定依次觸發(fā)。

create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法。 RxJava 還提供了一些快捷方法用來創(chuàng)建事件隊(duì)列

just(T...): 將傳入的參數(shù)依次發(fā)送出來。

Observable observable = Observable.just("Hello", "Rxjava", "RxAndroid");

from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。

String[] strings = {"Hello", "Rxjava", "RxAndroid"};
Observable observable = Observable.from(strings);
  • Subscribe (訂閱)
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

基本實(shí)現(xiàn)的完整代碼:


        //觀察者Observer
        Observer<String> observer = new Observer<String>()
        {
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext方法執(zhí)行了: " + s);
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "Completed方法執(zhí)行了");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError方法執(zhí)行了!");
            }
        };
        

        //被觀察者
        Observable observable = Observable.create(new Observable.OnSubscribe<String>()
        {
            @Override
            public void call(Subscriber<? super String> subscriber)
            {
                subscriber.onNext("Hello");
                subscriber.onNext("Rxjava");
                subscriber.onNext("RxAndroid");
                subscriber.onCompleted();
            }
        });

        //被觀察者訂閱觀察者
        observable.subscribe(observer);
       

或者這樣寫

        Subscriber<String> subscriber = new Subscriber<String>()
        {
            @Override
            public void onCompleted()
            {
                Log.i(TAG, "Completed方法執(zhí)行了");
            }
            @Override
            public void onError(Throwable e)
            {
                Log.i(TAG, "onError方法執(zhí)行了!");
            }
            @Override
            public void onNext(String s)
            {
                Log.i(TAG, "onNext方法執(zhí)行了: " + s);
            }
        };


        //被觀察者
        Observable observable = Observable.create(new Observable.OnSubscribe<String>()
        {
            @Override
            public void call(Subscriber<? super String> subscriber)
            {
                subscriber.onNext("Hello");
                subscriber.onNext("Rxjava");
                subscriber.onNext("RxAndroid");
                subscriber.onCompleted();
            }
        });

        //被觀察者訂閱觀察者
        observable.subscribe(subscriber);

在或者這樣寫

        //觀察者Observer
        Observer<String> observer = new Observer<String>()
        {
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext方法執(zhí)行了: " + s);
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "Completed方法執(zhí)行了");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError方法執(zhí)行了!");
            }
        };

        Observable observable = Observable.just("Hello","Rxjava","RxAndroid");
        //String[] strings = {"Hello","Rxjava","RxAndroid"};
        //Observable observable = Observable.from(strings);
        //被觀察者訂閱觀察者
        observable.subscribe(observer);

執(zhí)行結(jié)果:

01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了: Hello
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了: Rxjava
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了: RxAndroid
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: Completed方法執(zhí)行了

自定義回調(diào)

  • 被觀察者(Observable)
 Observable observable = Observable.just("Hello","Rxjava","RxAndroid");
  • 自定義回調(diào)的觀察者
        //自定義回調(diào)
        Action1<String> nextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, "自定義回調(diào)----onNext方法執(zhí)行了: " + s);
            }
        };
        Action1<Throwable> errorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Log.i(TAG, "自定義回調(diào)----onError方法執(zhí)行了!");
            }
        };
        Action0 completeAction = new Action0() {
            @Override
            public void call() {
                Log.i(TAG, "自定義回調(diào)----Completed方法執(zhí)行了");
            }
        };
        
  • 訂閱(subscribe)
//observable.subscribe(nextAction);
//observable.subscribe(nextAction,errorAction);
observable.subscribe(nextAction,errorAction,completeAction);

注意: Action1 和Action0 是 RxJava 的接口,Action1表示有一個(gè)返回值,Action0表示沒有返回值。

執(zhí)行結(jié)果:

01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----onNext方法執(zhí)行了: Hello
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----onNext方法執(zhí)行了: Rxjava
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----onNext方法執(zhí)行了: RxAndroid
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定義回調(diào)----Completed方法執(zhí)行了

舉個(gè)例子a:打印數(shù)組中的元素

        String[] names = {"周杰倫","周星馳","周潤發(fā)"};
        //利用自定義回調(diào)
        Observable.from(names).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, "姓名:=="+s);
            }
        });

執(zhí)行結(jié)果:

01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周杰倫
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周星馳
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周潤發(fā)

舉個(gè)例子b:加載顯示圖片

        final ImageView img = (ImageView) findViewById(R.id.img);
        //被觀察者
        Observable observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                Drawable drawable = getResources().getDrawable(R.mipmap.my);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            }
        });

        //觀察者
        Observer<Drawable> observer=  new Observer<Drawable>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted方法執(zhí)行了");
            }
            @Override
            public void onError(Throwable e) {
                Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
            }
            @Override
            public void onNext(Drawable drawable) {
                Log.d(TAG, "onNext方法執(zhí)行了");
                img.setImageDrawable(drawable);
                Log.d(TAG, "圖片加載完成了!");
            }
        };
        //被觀察者訂閱觀察者
        observable.subscribe(observer);

執(zhí)行結(jié)果:

01-16 14:09:06.752 10355-10355/com.zhoujian.rxjava D/MainActivity: onNext方法執(zhí)行了
01-16 14:09:06.752 10355-10355/com.zhoujian.rxjava D/MainActivity: 圖片加載完成了!
01-16 14:09:06.753 10355-10355/com.zhoujian.rxjava D/MainActivity: onCompleted方法執(zhí)行了

Snip20170116_1.png

Scheduler(線程控制)

在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個(gè)線程調(diào)用 subscribe(),就在哪個(gè)線程生產(chǎn)事件;在哪個(gè)線程生產(chǎn)事件,就在哪個(gè)線程消費(fèi)事件。如果需要切換線程,就需要用到 Scheduler。

  • Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行,相當(dāng)于不指定線程。這是默認(rèn)的 Scheduler。
  • Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
  • Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。
  • Schedulers.computation(): 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O 等操作限制性能的操作,例如圖形的計(jì)算
  • Android 有一個(gè)專用AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運(yùn)行。

使用 subscribeOn() 和 observeOn() 兩個(gè)方法來對線程進(jìn)行控制。
subscribeOn(): 指定事件產(chǎn)生所在線程。
observeOn(): 指定事件消費(fèi)所在的線程。

還是以加載顯示圖片的例子來說明

        final ImageView img = (ImageView) findViewById(R.id.img);
        Observable.create(new Observable.OnSubscribe<Drawable>() {
            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                Drawable drawable = getResources().getDrawable(R.mipmap.my);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io())//上面的call方法發(fā)生在io線程
                . observeOn(AndroidSchedulers.mainThread())//下面顯示圖片發(fā)生在主線程
                . subscribe(new Observer<Drawable>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted方法執(zhí)行了");
                    }
                    @Override
                    public void onError(Throwable e) {
                        Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
                    }
                    @Override
                    public void onNext(Drawable drawable) {
                        Log.d(TAG, "onNext方法執(zhí)行了");
                        img.setImageDrawable(drawable);
                        Log.d(TAG, "圖片加載完成了!");
                    }
                });

說明:被觀察者的代碼運(yùn)行在io線程中,觀察者的代碼運(yùn)行在主線程中,符合Android的在子線程中獲取數(shù)據(jù),在主線程中顯示數(shù)據(jù),更新界面。

變換

變換,就是將事件序列中的對象或整個(gè)序列進(jìn)行加工處理,轉(zhuǎn)換成不同的事件或事件序列。

  • 變換之map():一對一的轉(zhuǎn)換

例子a:把String類型轉(zhuǎn)換成Integer

        //被觀察者
        Observable.just("100")
         //變換----map
        .map(new Func1<String, Integer>()
        {
                @Override
                public Integer call(String s)
                {
                     return Integer.valueOf(s);
                }
        })
         //自定義觀察者
        .subscribe(new Action1<Integer>()
        {
                 @Override
                 public void call(Integer integer)
                 {
                     Log.d(TAG, "String轉(zhuǎn)換成Integer完成了,integer的值為:"+integer.intValue());
                 }
        });

執(zhí)行結(jié)果:

01-16 14:56:59.117 10355-10355/com.zhoujian.rxjava D/MainActivity: String轉(zhuǎn)換成Integer完成了,integer的值為:100

Func1 和 Action1 非常相似,也是 RxJava 的一個(gè)接口,用于包裝含有一個(gè)參數(shù)的方法。 Func1 和 Action 的區(qū)別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。

可以看到,map() 方法將參數(shù)中的 String 對象轉(zhuǎn)換成一個(gè) Integer 對象后返回,而在經(jīng)過 map() 方法后,事件的參數(shù)類型也由 String 轉(zhuǎn)為了 Integer。這種直接變換對象并返回的,是最常見的也最容易理解的變換。

例子b:把JavaBean對象轉(zhuǎn)換成String類型輸出

首先封裝一個(gè)演員Actor的JavaBean

package com.zhoujian.rxjava.bean;

/**
 * Created by zhoujian on 2016/12/29.
 */

public class Actor
{
    private String name;
    private String  sex;

    public Actor(String name, String sex)
    {
        this.name = name;
        this.sex = sex;
    }

    public String getName()
    {
        return name;
    }

    public void setName(String name)
    {
        this.name = name;
    }

    public String getSex()
    {
        return sex;
    }

    public void setSex(String sex)
    {
        this.sex = sex;
    }

    @Override
    public String toString()
    {
        return "Actor{" +  "name='" + name + '\'' + ", sex='" + sex + '\'' + '}';
    }
}

再封裝一個(gè)電影Movie的JavaBean,一個(gè)電影中有多個(gè)演員

package com.zhoujian.rxjava.bean;

import java.util.List;

/**
 * Created by zhoujian on 2016/12/29.
 */

public class Movie
{

    private String name;
    private int id;
    private String data;
    private List<Actor> mactorList;

    public Movie(String data, int id, List<Actor> mactorList, String name)
    {
        this.data = data;
        this.id = id;
        this.mactorList = mactorList;
        this.name = name;
    }


    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public List<Actor> getMactorList() {
        return mactorList;
    }

    public void setMactorList(List<Actor> mactorList) {
        this.mactorList = mactorList;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }


    @Override
    public String toString() {
        return "Movie{" +  "data='" + data + '\'' + ", name='" + name + '\'' +", id=" + id + ", mactorList=" + mactorList + '}';
    }
}

準(zhǔn)備數(shù)據(jù)

        Actor actor1 = new Actor("周星馳", "男");
        Actor actor2 = new Actor("張柏芝", "女");
        ArrayList<Actor> movie1List = new ArrayList<Actor>();
        movie1List.add(actor1);
        movie1List.add(actor2);
        Movie movie1 = new Movie("1998-10-14", 1, movie1List, "喜劇之王");
        Actor actor3 = new Actor("羅志祥", "男");
        Actor actor4 = new Actor("張雨綺", "女");
        ArrayList<Actor> movie2List = new ArrayList<Actor>();
        movie2List.add(actor3);
        movie2List.add(actor4);
        Movie movie2 = new Movie("2016-05-01", 2, movie2List, "美人魚");
        Movie[] movies = {movie1, movie2};

把JavaBean對象轉(zhuǎn)換成String類型輸出

        //定義被觀察者
        Observable observable=  Observable.from(movies).map(new Func1<Movie, String>() {
            @Override
            public String call(Movie movie) {
                return movie.toString();
            }
        });

        //定義觀察者
        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "Completed方法執(zhí)行了");
            }
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError方法執(zhí)行了");
            }
            @Override
            public void onNext(String s) {

                Log.i(TAG, "onNext方法執(zhí)行了");
                Log.i(TAG,"s==="+s);
            }
        };

        //被觀察者訂閱觀察者
        observable.subscribe(observer);

執(zhí)行結(jié)果:

01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: s===Movie{data='1998-10-14', name='喜劇之王', id=1, mactorList=[Actor{name='周星馳', sex='男'}, Actor{name='張柏芝', sex='女'}]}
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: s===Movie{data='2016-05-01', name='美人魚', id=2, mactorList=[Actor{name='羅志祥', sex='男'}, Actor{name='張雨綺', sex='女'}]}
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: Completed方法執(zhí)行了

  • 變換之flatMap():一對多的轉(zhuǎn)換

還是上述電影的例子,如果把一個(gè)電影轉(zhuǎn)換成多個(gè)演員輸出,這就要用到flatMap(),一對多的轉(zhuǎn)換。


        Observer<Actor> observer = new Observer<Actor>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "Completed方法執(zhí)行了");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError方法執(zhí)行了");
            }
            @Override
            public void onNext(Actor actor) {
                Log.i(TAG, "onNext方法執(zhí)行了");
                Log.i(TAG, "actor===" + actor.toString());
            }
        };

        Observable.from(movies).flatMap(new Func1<Movie, Observable<Actor>>() {
            @Override
            public Observable<Actor> call(Movie movie) {
                return Observable.from(movie.getMactorList());
            }
        }).subscribe(observer);


執(zhí)行結(jié)果:


01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='周星馳', sex='男'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='張柏芝', sex='女'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='羅志祥', sex='男'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法執(zhí)行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='張雨綺', sex='女'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: Completed方法執(zhí)行了

從上面的代碼可以看出, flatMap() 和 map() 有一個(gè)相同點(diǎn):它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個(gè)對象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個(gè) Observable 對象,并且這個(gè) Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中。

flatMap() 的原理:

第一,使用傳入的事件對象創(chuàng)建一個(gè) Observable 對象。
第二, 并不發(fā)送這個(gè) Observable, 而是將它激活,于是它開始發(fā)送事件。
第三,每一個(gè)創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個(gè) Observable ,而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。

變換的原理:lift()

這些變換雖然功能各有不同,但實(shí)質(zhì)上都是針對事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部,它們是基于同一個(gè)基礎(chǔ)的變換方法: lift(Operator)


public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

Observable 執(zhí)行了 lift(Operator) 方法之后,會(huì)返回一個(gè)新的 Observable,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber。像一種代理機(jī)制,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換。

compose對 Observable 整體的變換

除了 lift() 之外, Observable 還有一個(gè)變換方法叫做 compose(Transformer)。它和 lift() 的區(qū)別在于, lift() 是針對事件項(xiàng)和事件序列的,而 compose() 是針對 Observable 自身進(jìn)行變換。


public class LiftTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()  
    }
}
...
Transformer liftAll = new LiftTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);

線程的自由控制

還是原來的例子,String轉(zhuǎn)Integer

   Observable.just("100")//主線程中執(zhí)行,由subscribeOn指定
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .map(new Func1<String, Integer>()//IO 線程,由 observeOn() 指定
        {
                @Override
                public Integer call(String s)
                {
                     return Integer.valueOf(s);
                }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Integer>()// Android 主線程,由 observeOn() 指定
        {
                 @Override
                 public void call(Integer integer)
                 {
                     Log.d(TAG, "String轉(zhuǎn)換成Integer完成了,integer的值為:"+integer.intValue());
                 }
        });

有關(guān)更加詳細(xì)的介紹,請參考:

給 Android 開發(fā)者的 RxJava 詳解

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容