嘔心瀝血:RxJava2.x創(chuàng)建操作符

RxJava的基本流程以及線程切換可以參考之前的文章

http://www.itdecent.cn/p/2adaea7237c4

1、序言

RxJava除了擁有邏輯簡潔的事件流鏈?zhǔn)秸{(diào)用,使用簡單外其豐富的操作符基本可以滿足日常開發(fā)中的各種實(shí)現(xiàn)邏輯

Rx的基本操作符分類

RxJava操作符分類.jpg

下面會逐一講解每一類操作符的使用

2、創(chuàng)建操作符

RxJava創(chuàng)建操作符.jpg

2.1、基本創(chuàng)建操作符

create作為RxJava最基本的創(chuàng)建操作,用來完整的創(chuàng)建一個(gè)被觀察者Observable對象

通過create創(chuàng)建一個(gè)被觀察Observable對象

 Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                //重寫subscribe 寫入實(shí)際的代碼邏輯
                if (!e.isDisposed()) {
                    e.onNext("RxText");
                }
            }
        });

創(chuàng)建觀察者Observer對象

Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                LogUtils.showLog("message == " + (String) o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

通過subscribe進(jìn)行關(guān)聯(lián)

observable.subscribe(observer);

在實(shí)際使用時(shí)一般鏈?zhǔn)秸{(diào)用

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("test");
                }
            }
        })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        LogUtils.showLog("s == " + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                       
                    }

                    @Override
                    public void onComplete() {
                      
                    }
                });

2.2、快速創(chuàng)建操作符

使用場景,快速的創(chuàng)建被觀察者并進(jìn)行數(shù)據(jù)發(fā)送

1、just()

作用:快速創(chuàng)建1個(gè)被觀察者對象Observable

注意:just只能傳入最多10個(gè)數(shù)據(jù)

Observable.just(1, 2, 3, 4, 5)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.showLog("integer == " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

輸出:

D/hzfTag1204: integer == 1
    integer == 2
    integer == 3
    integer == 4
    integer == 5

2、fromArray()

作用:快速創(chuàng)建一個(gè)被觀察者,直接發(fā)送傳入的數(shù)組數(shù)據(jù),當(dāng)發(fā)送數(shù)據(jù)大于10時(shí)可以考慮采用fromArray

 int[] arrays = {1, 2, 3, 4, 5};
        Observable.fromArray(arrays)
                .subscribe(new Observer<int[]>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(int[] ints) {
                         //遍歷數(shù)組并輸出
                        for (int num : ints) {
                            LogUtils.showLog("num == " + num);
                        }
                       
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

3、 fromIterable

作用:快速創(chuàng)建一個(gè)Observable并將集合當(dāng)中的數(shù)據(jù)發(fā)送

fromIterable的使用與fromArray一致,數(shù)據(jù)由fromArray的數(shù)組改為集合,不做具體的贅述了

BUT!!! 我在做測試的時(shí)候發(fā)現(xiàn)了一個(gè)好玩兒的事情.....

fromArray中也可以傳一個(gè)list;但是fromIterable不能傳數(shù)組

fromIterable不能傳數(shù)組根據(jù)代碼很明顯,其對參數(shù)做了限制

fromIterable參數(shù).png

但fromArray沒有做限制,當(dāng)我用以下代碼操作時(shí)可以正確拿到list中的數(shù)據(jù)

        List list = new ArrayList();
        list.add("1");
        list.add("2");
        list.add("3");
        Observable.fromArray(list)
                .subscribe(new Observer<List>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List ints) {
                        LogUtils.showLog("ints == "+ints.get(2));
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.showLog("exception == "+e.getMessage().toString());
                    }

                    @Override
                    public void onComplete() {

                    }
                });



fromArray傳集合.png

通過斷點(diǎn)分析源碼fromArray當(dāng)中的邏輯,當(dāng)fromArray傳入一個(gè)數(shù)組,會走到item.length == 1這個(gè)判斷當(dāng)中,并且最終走的是just操作符


fromArray源碼.png

所以通過fromArray傳入集合就相當(dāng)于是just(list);但是為什么會走到items.length == 1這個(gè)判斷當(dāng)中?
這里面涉及到Java可變參數(shù)的概念,fromArray后面參數(shù)是T...意思是參數(shù)不確定,可以傳多個(gè)參數(shù),傳入幾個(gè)參數(shù)這個(gè)items的length就是多少,所以fromArray不論傳的是list還是array,只要傳的是一個(gè)參數(shù),最終都相當(dāng)于通過just將數(shù)據(jù)發(fā)送出去了(相當(dāng)于把對象通過just發(fā)出去);當(dāng)fromArray中的參數(shù)大于1時(shí),會將參數(shù)封裝成為一個(gè)T[]數(shù)組,再將數(shù)組中的每一個(gè)元素逐一發(fā)送。


ObservableFromArray.png

4、never
作用:只會調(diào)用onSubscribe方法,不會回調(diào)onNext onError onComplete等回調(diào)方法;通過源碼可以看出,內(nèi)部的subscribeActual方法只是調(diào)用了onSubscribe,并沒有執(zhí)行其他的回調(diào)方法

Rxjava never操作符.png

5、empty
作用:當(dāng)訂閱后,被觀察者只會發(fā)送一個(gè)onComplete事件,最終Observer的回調(diào)只有onSubscribe和onComplete會執(zhí)行

6、error
作用:訂閱后僅發(fā)送Error事件,error的參數(shù)可以自定義異常發(fā)送給onError

2.3、延遲創(chuàng)建操作符

延遲創(chuàng)建操作符的需求場景:
(1)當(dāng)經(jīng)過n秒后,執(zhí)行操作x
(2)每經(jīng)過n秒,周期的執(zhí)行操作x
延遲創(chuàng)建操作符的分類:


RxJava延遲操作符.jpg

1、timer
作用:快速創(chuàng)建一個(gè)Observable,并指定一段時(shí)間后發(fā)送onNext(0)事件;onNext的參數(shù)為long類型,默認(rèn)數(shù)值為0

final long startTime = System.currentTimeMillis();
        Observable.timer(5,TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog((System.currentTimeMillis() - startTime) + " ms后接收到了數(shù)據(jù) " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

12-05 10:07:11.351 12187-12207/com.hzf.test.myapplication D/hzfTag1205: 5005 ms后接收到了數(shù)據(jù) 0

2、defer
作用:等到實(shí)際subscribe訂閱時(shí)才會創(chuàng)建一個(gè)Observable;可以保證Observable的數(shù)據(jù)在訂閱時(shí)是最新的數(shù)據(jù)

 Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just(test1);
            }
        });

        test1 = "test222";

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                LogUtils.showLog("s == "+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

12-05 10:22:00.299 12308-12308/com.hzf.test.myapplication D/hzfTag1205: s == test222

通過defer的源碼可以出來被觀察者的創(chuàng)建是在subscribeActual訂閱時(shí)
(ObservableDefer)


RxJava defer訂閱.png

3、interval
作用:快速創(chuàng)建一個(gè)被觀察者Observable對象,每隔指定時(shí)間發(fā)送一個(gè)事件
interval的參數(shù)最多可用的為4個(gè)參數(shù)
參數(shù)1:初始延遲發(fā)送事件的時(shí)間
參數(shù)2:間隔發(fā)送事件的時(shí)間
參數(shù)3:TimeUnit指定的時(shí)間類型
參數(shù)4:Scheduler,可以手動(dòng)創(chuàng)建一個(gè)worker指定interval的運(yùn)行線程(如果不手動(dòng)選擇第四個(gè)參數(shù),默認(rèn)interval發(fā)生在子線程)

Observable.interval(3, 5, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog("aLong == "+aLong+",當(dāng)前線程為 "+Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

運(yùn)行結(jié)果為:
12-05 11:17:32.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,當(dāng)前線程為 RxComputationThreadPool-1
12-05 11:17:37.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,當(dāng)前線程為 RxComputationThreadPool-1
12-05 11:17:42.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,當(dāng)前線程為 RxComputationThreadPool-1
12-05 11:17:47.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,當(dāng)前線程為 RxComputationThreadPool-1

也可以指定調(diào)度器,例如:

Observable.interval(3, 5, TimeUnit.SECONDS, new Scheduler() {
            @Override
            public Worker createWorker() {
                return AndroidSchedulers.mainThread().createWorker();
            }
        })
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog("aLong == "+aLong+",當(dāng)前線程為 "+Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

12-05 11:20:19.386 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,當(dāng)前線程為 main
12-05 11:20:24.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,當(dāng)前線程為 main
12-05 11:20:29.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,當(dāng)前線程為 main
12-05 11:20:34.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,當(dāng)前線程為 main

如果運(yùn)用線程操作符的話,經(jīng)過我的實(shí)驗(yàn),當(dāng)調(diào)用subscribeOn時(shí)是不起作用的,實(shí)際發(fā)生事件的線程依然是子線程或者指定的調(diào)度器;而調(diào)用observeOn時(shí),onNext接收事件的線程即為observeOn所指定的線程。

2、intervalRange
作用:快讀創(chuàng)建一個(gè)被觀察者對象,每隔指定時(shí)間發(fā)送數(shù)據(jù),但是與interval不同的是intervalRange可以指定發(fā)送的數(shù)據(jù)數(shù)量

參數(shù)1:起始的事件序號
參數(shù)2:事件數(shù)量
參數(shù)3:第1次事件延遲發(fā)送的時(shí)間
參數(shù)4:事件間的間隔時(shí)間
參數(shù)5:時(shí)間單位
參數(shù)6:Scheduler

Observable.intervalRange(3, 3, 3, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog("aLong == " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

運(yùn)行結(jié)果:
12-05 11:30:08.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 3
12-05 11:30:10.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 4
12-05 11:30:12.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 5

2、range
作用:快速創(chuàng)建一個(gè)被觀察者,并連續(xù)發(fā)送一個(gè)事件序列,可指定范圍。功能與intervalRange類似,但實(shí)現(xiàn)的功能會相對簡單一些。

參數(shù)1:起始數(shù)據(jù)
參數(shù)2:發(fā)送多少條數(shù)據(jù)

//從5開始發(fā)送 發(fā)送10個(gè)數(shù)據(jù)
        Observable.range(5, 10)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.showLog("integer == " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

2、rangeLong
作用:與range類似,但支持long類型

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容