RxJava(2.0)-你可能需要知道這些

作為一個Android開發(fā)從業(yè)者,當(dāng)你處理異步任務(wù)時,如果還在使用著Handler+Thread,那么你可能需要了解下RxJava這個優(yōu)秀的開源框架;當(dāng)然如果你正在跳槽面試,RxJava也是經(jīng)常被問到的框架。
關(guān)于介紹RxJava的文章也非常多,但是很多文章基于的版本還是1.0.X,而本博文就基于2.0版本對RxJava進行一個簡單的介紹和分析,也算是拋磚引玉吧。
本博文基于RxJava 2.0.0版本進行分析講解。
參考:拋物線大神《給 Android 開發(fā)者的 RxJava 詳解》

RxJava是什么?

簡單的歸納為兩個字:異步。

歸納畢竟是歸納,不能完全表明RxJava的概念,那么我們來看GitHub上給出的解釋:
a library for composing asynchronous and event-based programs by using observable sequences.

我用我蹩腳的CET-6水平給大家翻譯下,大概就是這個意思:
一個使用可觀測序列來組成異步的、基于事件的程序的庫。

這對于剛接觸的童鞋們可能不太容易理解,RxJava的核心還是異步,其他的定語都是基于其之上,有了這個思維和認識,再去學(xué)習(xí)RxJava也能更容易接受和理解其設(shè)計。


為什么要使用RxJava?

我寫溜溜的[AsyncTask / Handler / Thread/ ... ],干嘛要使用這個奇怪的RxJava???

還能為什么?簡潔唄。

異步操作的很重要的一點就是保持程序和代碼的簡潔性,Android內(nèi)部提供的AsyncTask以及Handlder+Thread都是為了解決異步代碼編寫繁瑣問題,從而使編寫異步代碼更加簡潔。在保持代碼和程序簡潔這個目的上,RxJava倒是更加的努力和方便,它的優(yōu)點是隨著程序邏輯變得越來越復(fù)雜,它仍然可以保持簡潔、優(yōu)雅。

口說無憑,我們來分析下面這樣一個例子。

圖片展示可能是我們每個Android開發(fā)者都要面對的問題,假設(shè)在我們的Activity上存在一個ListView,并且我們提供了一個addImage方法來任意添加待顯示的圖片?,F(xiàn)在需要將某個目錄下所有的png圖片都加載并顯示在ListView中,由于讀取和解析圖片是一個耗時過程,因此我們需要將這個過程放在后臺執(zhí)行;而圖片的顯示則必須放在主線程(UI線程)中。

那么在沒有使用RxJava時,我們怎么編寫這段代碼呢?

    new Thread() {
            @Override
            public void run() {
                super.run();
                for (File folder : folders) {
                    File[] files = folder.listFiles();
                    for (File file : files) {
                        if (file.getName().endsWith(".png")) {
                            final Bitmap bitmap = getBitmapFromFile(file);
                            ((MainActivity) context).runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    imageList.add(bitmap);
                                    imageListAdatper.notifyDataSetChanged();
                                }

                            });

                        }
                    }
                }
            }
        }.start();

沒有對比,就沒有傷害,如果我們使用RxJava的話,是如何實現(xiàn)的呢?


Observable.fromArray(folders)
                .flatMap(new Function<File, ObservableSource<File>>() {
                    @Override
                    public ObservableSource<File> apply(File file) throws Exception {
                        return Observable.fromArray(file.listFiles());
                    }
                })
                .filter(new Predicate<File>() {
                    @Override
                    public boolean test(File file) throws Exception {
                        return file.getName().endsWith(".png");
                    }
                })
                .map(new Function<File, Bitmap>() {

                    @Override
                    public Bitmap apply(File file) throws Exception {
                        return getBitmapFromFile(file);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        imageList.add(bitmap);
                        imageListAdatper.notifyDataSetChanged();
                    }
                });

這代碼變簡潔了嗎?這代碼量也沒減少啊,而且這一大堆代碼都是什么意思?。客耆床欢 ?/p>

各位看官,你先消消氣,我們講的簡潔是:邏輯上的簡潔,并不是單純的代碼減少(說實話,我們其實更關(guān)注這個)。

仔細看下這段代碼,之前的if..else呢?之前的那么多循環(huán)呢?好像都不見了,完全是從上到下的一條鏈?zhǔn)秸{(diào)用,而且沒有嵌套(你是不是也討厭好多層的嵌套,反正我是),現(xiàn)在看起來是不是邏輯更加清楚了呢。

此時RxJava的優(yōu)勢還不能完全體現(xiàn)出來,而且看到這么多陌生的函數(shù),你也一定有點不知其解,那么我們就帶著疑惑接著往下看。

API

雖然我知道你有很強的理解和學(xué)習(xí)能力,但是我還是決定要對RxJava的一些常用的API進行介紹和說明,以便你能更順暢的閱讀全文。

1.觀察者模式

RxJava的異步實現(xiàn),是通過一種擴展的觀察者模式來實現(xiàn)的。

我們來看下什么是觀察者模式?

觀察者模式(有時又被稱為發(fā)布(publish )-訂閱(Subscribe)模式、模型-視圖(View)模式、源-收聽者(Listener)模式或從屬者模式)是軟件設(shè)計模式的一種。在此種模式中,一個目標(biāo)物件管理所有相依于它的觀察者物件,并且在它本身的狀態(tài)改變時主動發(fā)出通知。這通常透過呼叫各觀察者所提供的方法來實現(xiàn)。此種模式通常被用來實現(xiàn)事件處理系統(tǒng)。

這是百度給出的解釋,我們在日常編碼中使用的點擊事件的處理就采用了觀察者模式。

clkBtn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Toast.makeText(MainActivity.this, "The button was clicked", Toast.LENGTH_LONG).show();

            }
        });

在典型的Click事件處理中,Button就是被觀察者,而我們設(shè)置的OnClickListener就是觀察者,在我們點擊Button時,OnClickListener的onClick方法就會被回調(diào)。

2. RxJava的觀察者模式

2.1 幾個對象

我們先來了解下RxJava給我們提供的幾個常用的對象。

  • FLowable與Observable

在2.0版本中被觀察者新的實現(xiàn)叫做Flowable, 同時舊的Observable也保留了。因為在 RxJava1.x 中,有很多事件不被能正確的背壓,從而拋出MissingBackpressureException。

舉個簡單的例子,在 RxJava1.x 中的 observeOn, 因為是切換了消費者的線程,因此內(nèi)部實現(xiàn)用隊列存儲事件。在 Android 中默認的 buffersize 大小是16,因此當(dāng)消費比生產(chǎn)慢時, 隊列中的數(shù)目積累到超過16個,就會拋出MissingBackpressureException, 初學(xué)者很難明白為什么會這樣,使得學(xué)習(xí)曲線異常得陡峭。

而在 2.0 中,Observable 不再支持背壓,而Flowable 支持非阻塞式的背壓。并且規(guī)范要求,所有的操作符強制支持背壓。

幸運的是,F(xiàn)lowable 中的操作符大多與舊有的 Observable 類似。

  • Observer與Subscriber

Observer就是我們前面提到的觀察者,與Observable組合使用。

Subscriber也被成為訂閱者,一般與Flowable組合使用。

因為Observable不再支持背壓,因此如果我們使用RxJava2.0版本,F(xiàn)lowable可能是你的不二人選。

基于以上的分析,本文以下的示例將采用Flowable進行說明和講解。

2.2 回調(diào)

為什么稱RxJava采用了擴展的觀察者模式呢?我們知道傳統(tǒng)的觀察者回調(diào)接口中只有一個update方法,那么RxJava呢?它可不止一個,讓我們來看下Subscriber的定義。

public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#request(long)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
     * 
     * @param t the element signaled
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     *
     * @param t the throwable signaled
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     */
    public void onComplete();
}

RxJava的觀察者接口中提供了onSubscribe、onNext、onError、onComplete四個回調(diào)方法,而傳統(tǒng)的觀察者模式中只有update一個回調(diào)方法,這也是稱之為擴展的觀察者模式的一部分原因。
下面我們來分析下Subscriber接口中幾個方法:

  1. onSubscribe
    這個方法是2.0之后才有的方法,主要是給觀察者提供了一個終止事件接收的機會(當(dāng)然我們也可以做一些預(yù)處理),它也會首先被調(diào)用。
    要終止接收事件,可以調(diào)用Subscription的cancel方法。

  2. onNext
    我們可以將其理解為傳統(tǒng)觀察者模式回調(diào)接口中的update方法,它可能會被調(diào)用多次。它的調(diào)用順序在onSubscribe之后。

  3. onError
    在事件處理過程中出異常時,onError會被觸發(fā),同時事件隊列自動終止,不會再有事件發(fā)出。

  4. onComplete
    在事件隊列傳遞完畢后,該方法會被調(diào)用。
    在一個正確運行的事件序列中, onComplete()和onError()有且只有一個,并且是事件序列中的最后一個。
    需要注意的是,onComplete() 和 onError() 二者也是互斥的,即在隊列中調(diào)用了其中一個,就不應(yīng)該再調(diào)用另一個。

在一個正確的事件序列中,onError與onComplete互斥且唯一。

相比于傳統(tǒng)的觀察者模式,RxJava使用的擴展觀察者模式好像變得復(fù)雜了,但是從另一方面來講它也更加的豐富了,把更多的主動權(quán)和機會交給了使用者。

3. 實戰(zhàn)

看了那么多的概念,是不是覺得有點枯燥和乏味呢,那我們就開始動手使用RxJava來體驗一下吧。

3.1 引用

怎么在我們的項目中使用RxJava和RxAndroid呢?

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.3'

3.2 實例

  1. 1.0 方式
        //定義被觀察者
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("Hello");
                e.onNext("World");
                e.onNext("!");
                //注意在此調(diào)用onComplete方法結(jié)束事件的處理
                e.onComplete();
            }
        });


        // 定義觀察者
        Observer<String> observer = new Observer<String>() {

            // 該方法會在onNext方法之前調(diào)用
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe->11111");

                // d.dispose();
            }

            @Override
            public void onNext(String value) {
                System.out.println(value);

            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete->222222");
            }
        };

        // 訂閱
        observable.subscribe(observer);
        

  1. 2.0方式
        //創(chuàng)建Flowable對象
        Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
            @Override
            public void subscribe(@NonNull FlowableEmitter e) throws Exception {
                e.onNext("Hello");
                e.onNext("World");
                e.onNext("!");
                //注意在此調(diào)用onComplete方法結(jié)束事件的處理
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);

        // 定義觀察者
        Subscriber subsrciber= new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe->11111");
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete->222222");
            }
        };

        // 訂閱
        flowable.subscribe(subsrciber);


  1. 訂閱

訂閱這句代碼看起來好奇怪,主要是subscribe()這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系。這讓人讀起來有點別扭,不過如果把 API 設(shè)計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設(shè)計就造成影響了,比較起來明顯是得不償失的。

  1. 運行結(jié)果

分別運行上面的兩段代碼,運行效果相同,如下所示:

onSubscribe->11111
Hello
World
!
onComplete->222222

這可能是最簡單的RxJava使用示例了。

3.3 創(chuàng)建被觀察者

在上面的示例中,我們采用了Observable.create方法來創(chuàng)建被觀察者,并且在subscribe方法中完成了事件的傳遞。
RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列,我們一起來看一下。

  1. just(T...)

將傳遞的參數(shù),依次發(fā)送出去。

Flowable.just("Hello", "World", "!")
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();

這句代碼的效果與上面示例中的效果相同。

  1. from(T[]) / from(Iterable<? extends T>)
    將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。
String[] values = new String[]{"Hello", "Wrold", "!"};

Flowable observable = Flowable.fromArray(values);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();

上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create() 的例子是等價的。

3.4 靈活的事件回調(diào)定義

RxJava支持定義不完整的事件回調(diào)定義,就是我們可以拋棄Subscriber的定義,而只選擇定義其中的一部分回調(diào)。
看下代碼可能會更明了。

        String[] values = new String[]{"Hello", "Wrold", "!"};
        Consumer onNext = new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                System.out.println("onNext:" + s);
            }
        };

        Consumer<? super Throwable> onError = new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                throwable.printStackTrace();
            }
        };

        Action onComplete = new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onComplete");
            }
        };

        // 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
        Flowable.fromArray(values)
                .subscribe(onNext);

        // 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
        Flowable.fromArray(values)
                .subscribe(onNext, onError);

        // 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
        Flowable.fromArray(values)
                .subscribe(onNext, onError, onComplete);

是不是很靈活?嗯,是的。

3.5 Schedulers

在 RxJava的默認規(guī)則中,事件的發(fā)出和消費都是在同一個線程的,在哪個線程調(diào)用subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。
也就是說,如果只用上面的方法,實現(xiàn)出來的只是一個同步的觀察者模式。
觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制,因此異步對于RxJava 是至關(guān)重要的。
而要實現(xiàn)異步,則需要用到 RxJava 的另一個概念: Schedulers(調(diào)度器) 。

  1. API

在RxJava中,Scheduler相當(dāng)于線程控制器,RxJava通過它來指定每一段代碼應(yīng)該運行在什么樣的線程。

RxJava已經(jīng)內(nèi)置了一些調(diào)度器,主要有以下幾個:

  • Schedulers.immediate(): 直接在當(dāng)前線程運行,相當(dāng)于不指定線程,這是默認的Scheduler。
  • Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
  • Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  • AndroidSchedulers.mainThread():它指定的操作將在 Android 主線程運行,屬于Android專用的調(diào)度器。

有了這幾個Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。

  • subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即Flowable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。

  • observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程。

     Flowble.just(1, 2, 3)
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(new Consumer<Integer>() {
                  @Override
                  public void accept(Integer integer) throws Exception {
                      System.out.println("The receive num is :" + integer);
                  }
              }
    

上面這段代碼中,由于subscribeOn(Schedulers.io()) 的指定,被創(chuàng)建的事件的內(nèi)容 1、2、3 將會在 IO線程發(fā)出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber數(shù)字的打印將發(fā)生在主線程。

事實上,這種在 subscribe() 之前寫上兩句subscribeOn(Scheduler.io()) 和observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用于多數(shù)的 『后臺線程取數(shù)據(jù),主線程顯示』的程序策略。

4.變換

RxJava提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉(zhuǎn)換成不同的事件或事件序列。

在開發(fā)中我們經(jīng)常碰到這樣的場景:從本地讀取并加載圖片。也就是說我們通常的入?yún)⑹且粋€文件路徑,而我們想要得到的是一個BitMap對象,那么如果使用RxJava我們該如何優(yōu)雅的實現(xiàn)呢?

    final String filePath = "/images/logo.png";

        Flowble.just(filePath)
                .map(new Function<String, Bitmap>() {

                    @Override
                    public Bitmap apply(@NonNull String s) throws Exception {
                        return getBitmapFromFile(new File(filePath));
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        showBitmap(bitmap);
                    }
                });

就問你優(yōu)雅不優(yōu)雅?牛逼不牛逼?

可以看到,map()方法將參數(shù)中的String對象轉(zhuǎn)換成一個Bitmap對象后返回,而在經(jīng)過map()方法后,事件的參數(shù)類型也由 String轉(zhuǎn)為了Bitmap。這種直接變換對象并返回的,是最常見的也最容易理解的變換。

那么常用的事件變換有那些呢?

1. map

事件對象的直接變換,具體功能上面已經(jīng)介紹過,它是RxJava 最常用的變換。
在上面的例子中我們可以看到,map方法將參數(shù)中的 String對象變換為一個 Bitmap對象后返回,而在經(jīng)過 map方法后,事件的參數(shù)類型也由String變?yōu)榱?Bitmap。這種直接變換對象并返回的,是最常見的也最容易理解的變換。

2. flatMap

flatMap和map有共同點,都是將一個對象轉(zhuǎn)換為另一個對象,不同的是map只是一對一的轉(zhuǎn)換,而flatMap可以是一對多的轉(zhuǎn)換,并且是轉(zhuǎn)換為另外一個Flowable對象!
示例如下:

        ArrayList<String[]> list = new ArrayList<>();
        String[] words1 = {"Hello,", "I am", "China!"};
        String[] words2 = {"Hello,", "I am", "Beijing!"};
        list.add(words1);
        list.add(words2);
        Flowable.fromIterable(list)
                .flatMap(new Function<String[], Publisher<String>>() {
                    @Override
                    public Publisher<String> apply(@NonNull String[] strings) throws Exception {
                        return Flowable.fromArray(strings);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("Consumer->accept:"+s);

                    }
                });

運行結(jié)果如下所示:

Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:China!
Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:Beijing!

flatMap的轉(zhuǎn)換可以分解為三個過程:

  1. 根據(jù)傳入的事件生成一個Publisher對象(其實也可以理解為Flowable)。
  2. 激活該Flowable對象發(fā)送事件,而不是直接發(fā)送該Flowable對象。
  3. 同一個Flowable對象發(fā)送的事件都會匯總到Flowable后,F(xiàn)lowable負責(zé)將事件統(tǒng)一傳遞給subsrciber。

3. lift

我們可以將該方法視為map與flatMap的底層調(diào)用實現(xiàn),其目的就是定義我們自己的Operator來完成變換。
lift方法接收一個FlowableOperator的參數(shù),這個FlowableOperator就是定義我們自己的轉(zhuǎn)換操作。
這樣解釋起來可能有些不太明了,下面我們舉兩個簡單的例子來看下怎么使用lift實現(xiàn)map和flatMap的效果。

  • map的lift寫法
    Flowable.just(filePath)
                .lift(new FlowableOperator<Bitmap, String>() {
                    @Override
                    public Subscriber<? super String> apply(@NonNull final Subscriber<? super Bitmap> observer) throws Exception {
                        return new Subscriber<String>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                observer.onSubscribe(s);
                            }

                            @Override
                            public void onNext(String s) {
                                observer.onNext(getBitmapFromFile(new File(s)));

                            }

                            @Override
                            public void onError(Throwable t) {
                                observer.onError(t);
                            }

                            @Override
                            public void onComplete() {
                                observer.onComplete();

                            }
                        };
                    }
                })
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        showBitmap(bitmap);
                    }
                });
  • flatMap的lift寫法
Flowable.fromIterable(list)
                .lift(new FlowableOperator<String, String[]>() {

                    @Override
                    public Subscriber<? super String[]> apply(@NonNull final Subscriber<? super String> observer) throws Exception {
                        return new Subscriber<String[]>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                observer.onSubscribe(s);
                            }

                            @Override
                            public void onNext(String[] strings) {
                                Flowable.fromArray(strings)
                                        .subscribe(new Consumer<String>() {
                                            @Override
                                            public void accept(String s) throws Exception {
                                                observer.onNext(s);
                                            }
                                        });
                            }

                            @Override
                            public void onError(Throwable t) {
                                observer.onError(t);

                            }

                            @Override
                            public void onComplete() {
                                observer.onComplete();
                            }
                        };
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("accept ->"+s);
                    }
                });

4. range

該方法比較簡單,用于產(chǎn)生int和long型數(shù)字。


    Flowable.range(1,5)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                })

輸出1 2 3 4 5五個數(shù)字。

5. merge

主要用戶合并對象,示例如下:

ArrayList<String> list1 = new ArrayList<>();
        list1.add("1");
        list1.add("2");
        list1.add("3");
        ArrayList<String> list2 = new ArrayList<>();
        list2.add("4");
        list2.add("5");
        list2.add("6");

        Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

輸出1 2 3 4 5 6。

6. compose

調(diào)解轉(zhuǎn)換的作用,示例如下:

    Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))

                .compose(new FlowableTransformer<String, Integer>() {

                    @Override
                    public Publisher<Integer> apply(@NonNull Flowable<String> upstream) {

                        return upstream.map(new Function<String, Integer>() {
                            @Override
                            public Integer apply(@NonNull String s) throws Exception {
                                return Integer.parseInt(s);
                            }
                        });
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer s) throws Exception {
                        System.out.println(s);
                    }
                });


輸出1 2 3 4 5 6 六個數(shù)字。

7. compose與lift的區(qū)別

兩者都實現(xiàn)了變換的功能,但是變換的內(nèi)容和對象卻不相同。

  • lift實現(xiàn)的是對事件和事件序列的變換。
  • compose實現(xiàn)的是Flowable本身的變換。

5 總結(jié)

至此,我們對RxJava的使用分析告一段落,作為一個牛逼的異步框架,如果能正確的引入到我們的項目中來一定能提高我們效率,降低后期我們的維護成本。
祝各位工作愉快。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,763評論 7 62
  • 轉(zhuǎn)一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong閱讀 1,030評論 0 2
  • 引入依賴: implementation 'io.reactivex.rxjava2:rxandroid:2.0....
    為夢想戰(zhàn)斗閱讀 1,430評論 0 0
  • 前言我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占導(dǎo)zqq閱讀 9,321評論 6 151
  • 十七歲那年,我和班里另外兩名男生幸運地考入了德惠第一中學(xué),也非常榮幸地同時分到了重點班。從此,開始了艱苦的高中...
    秋水伊人8244閱讀 206評論 0 0

友情鏈接更多精彩內(nèi)容