RxJava 3.X來襲,請做好準(zhǔn)備~

看到此文,你應(yīng)該是你的技術(shù)圈第一個(gè)感知到RxJava 3.0.0-RC0來襲的大牛...

本文已經(jīng)更新在掘金,請不要重讀浪費(fèi)時(shí)間哦

前言

每個(gè)Android開發(fā)者,都是愛RxJava的,簡潔線程切換和多網(wǎng)絡(luò)請求合并,再配合Retrofit,簡直是APP開發(fā)的福音。不知不覺,RxJava一路走來,已經(jīng)更新到第三大版本了。不像RxJava 2對RxJava 1那么殘忍,RxJava 3對RxJava 2的兼容性還是挺好的,目前并沒有做出很大的更改。RxJava2到2020年12月31號不再提供支持,錯(cuò)誤的會(huì)同時(shí)在2.x和3.x修復(fù),但新功能只會(huì)在3.x上添加。

同時(shí),希望通過本文,能知道垃圾箱顏色分類。

作為嘗鮮,趕緊品嘗吧。

image

主要變化

主要特點(diǎn)

  • 單一依賴:Reactive-Streams
  • 繼續(xù)支持Java 6+和Android 2.3+
  • 修復(fù)了API錯(cuò)誤和RxJava 2的許多限制
  • 旨在替代RxJava 2,具有相對較少的二進(jìn)制不兼容更改
  • 提供Java 8 lambda友好的API
  • 關(guān)于并發(fā)源的不同意見
  • 異步或同步執(zhí)行
  • 參數(shù)化并發(fā)的虛擬時(shí)間和調(diào)度程序
  • 為測試schedulers,consumers和plugin hooks提供測試和診斷支持

與RxJava 2的主要區(qū)別是:

  • 將eagerTruncate添加到replay運(yùn)算符,以便head節(jié)點(diǎn)將在截?cái)鄷r(shí)丟失它保留的項(xiàng)引用
  • 新增 X.fromSupplier()
  • 使用 Scheduler 添加 concatMap,保證 mapper 函數(shù)的運(yùn)行位置
  • 新增 startWithItem 和 startWithIterable
  • ConnectableFlowable/ConnetableFlowable 重新設(shè)計(jì)
  • 將 as() 并入 to()
  • 更改 Maybe.defaultIfEmpty() 以返回 Single
  • 用 Supplier 代替 Callable
  • 將一些實(shí)驗(yàn)操作符推廣到標(biāo)準(zhǔn)
  • 從某些主題/處理器中刪除 getValues()
  • 刪除 replay(Scheduler) 及其重載
  • 刪除 dematerialize()
  • 刪除 startWith(T|Iterable)
  • 刪除 as()
  • 刪除 Maybe.toSingle(T)
  • 刪除 Flowable.subscribe(4 args)
  • 刪除 Observable.subscribe(4 args)
  • 刪除 Single.toCompletable()
  • 刪除 Completable.blockingGet()

到這里就結(jié)束了,想知道的都知道了。

image

入門

1、添加依賴

implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC0"

不好意思哦,還沒看到RxAndroid出3.0,這就很尷尬了...

2、一些概念

2.1、上流、下流

在RxJava,數(shù)據(jù)以流的方式組織。也就是說,Rxjava包括一個(gè)源的數(shù)據(jù)流,數(shù)據(jù)流后跟著消費(fèi)者的零個(gè)到多個(gè)消費(fèi)數(shù)據(jù)流步驟。

source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer)

在上文代碼中,對于operator2來說,在它前面叫做上流,在它后面的叫做下流。憋住,別笑,真的是下流來的。

2.2、流的對象

在RxJava的文檔中,emission, emits, item, event, signal, data and message都被認(rèn)為在數(shù)據(jù)流中被傳遞的數(shù)據(jù)對象。

2.3、背壓(Backpressure)

當(dāng)數(shù)據(jù)流通過異步的步驟執(zhí)行時(shí),這些步驟的執(zhí)行速度可能不一致。也就是說上流數(shù)據(jù)發(fā)送太快,下流沒有足夠的能力去處理。為了避免這種情況,一般要么緩存上流的數(shù)據(jù),要么拋棄數(shù)據(jù)。但這種處理方式,有時(shí)會(huì)帶來很大的問題。為此,RxJava帶來了backpressure的概念。背壓是一種流量的控制步驟,在不知道上流還有多少數(shù)據(jù)的情形下控制內(nèi)存的使用,表示它們還能處理多少數(shù)據(jù)。

支持背壓的有Flowable類,不支持背壓的有Observable,Single, Maybe and Completable類。

2.4 線程調(diào)度器(Schedulers)

對于我們Android開發(fā)來說,最喜歡的就是它簡潔切換線程的操作。RxJava通過調(diào)度器來方便線程的切換。

  • Schedulers.computation(): 適合運(yùn)行在密集計(jì)算的操作,大多數(shù)異步操作符使用該調(diào)度器。
  • Schedulers.io():適合運(yùn)行I/0和阻塞操作.
  • Schedulers.single():適合需要單一線程的操作
  • Schedulers.trampoline(): 適合需要順序運(yùn)行的操作

在不同平臺(tái)還有不同的調(diào)度器,例如Android的主線程:AndroidSchedulers.mainThread()

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

2.5 基類

在 RxJava 3 可以發(fā)現(xiàn)有以下幾個(gè)基類(跟RxJava 2是一致的吧):

  • io.reactivex.Flowable:發(fā)送0個(gè)N個(gè)的數(shù)據(jù),支持Reactive-Streams和背壓
  • io.reactivex.Observable:發(fā)送0個(gè)N個(gè)的數(shù)據(jù),不支持背壓,
  • io.reactivex.Single:只能發(fā)送單個(gè)數(shù)據(jù)或者一個(gè)錯(cuò)誤
  • io.reactivex.Completable:沒有發(fā)送任何數(shù)據(jù),但只處理 onComplete 和 onError 事件。
  • io.reactivex.Maybe:能夠發(fā)射0或者1個(gè)數(shù)據(jù),要么成功,要么失敗。

不建議再往下看了,建議點(diǎn)贊或收藏...

下文關(guān)于操作符內(nèi)容太多了

等需要了,再來查閱

下班時(shí)間還是好好護(hù)發(fā)吧

My GitHub

image

操作符

實(shí)用操作符

1、ObserveOn

指定觀察者的線程,例如在Android訪問網(wǎng)絡(luò)后,數(shù)據(jù)需要主線程消費(fèi),那么將觀察者的線程切換到主線就需要ObserveOn操作符。每次指定一次都會(huì)生效。

2、subscribeOn

指定被觀察者的線程,即數(shù)據(jù)源發(fā)生的線程。例如在Android訪問網(wǎng)絡(luò)時(shí),需要將線程切換到子線程。多次指定只有第一次有效。

3、doOnEach

數(shù)據(jù)源(Observable)每發(fā)送一次數(shù)據(jù),就調(diào)用一次。

4、doOnNext

數(shù)據(jù)源每次調(diào)用onNext() 之前都會(huì)先回調(diào)該方法。

5、doOnError

數(shù)據(jù)源每次調(diào)用onError() 之前會(huì)回調(diào)該方法。

6、doOnComplete

數(shù)據(jù)源每次調(diào)用onComplete() 之前會(huì)回調(diào)該方法

7、doOnSubscribe

數(shù)據(jù)源每次調(diào)用onSubscribe() 之后會(huì)回調(diào)該方法

8、doOnDispose

數(shù)據(jù)源每次調(diào)用dispose() 之后會(huì)回調(diào)該方法

其他的見官網(wǎng)吧,不難

實(shí)用操作符

image

對數(shù)據(jù)源過濾操作符

主要講對數(shù)據(jù)源進(jìn)行選擇和過濾的常用操作符

1、skip(跳過)

可以作用于Flowable,Observable,表示源發(fā)射數(shù)據(jù)前,跳過多少個(gè)。例如下面跳過前四個(gè):

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skip(4)
    .subscribe(System.out::print);

打印結(jié)果:5678910

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.skipLast(4)
    .subscribe(System.out::print);
    
打印結(jié)果:1 2 3 4 5 6

skipLast(n)操作表示從流的尾部跳過n個(gè)元素。

2、debounce(去抖動(dòng))

可作用于Flowable,Observable。在Android開發(fā),通常為了防止用戶重復(fù)點(diǎn)擊而設(shè)置標(biāo)記位,而通過RxJava的debounce操作符可以有效達(dá)到該效果。在規(guī)定時(shí)間內(nèi),用戶重復(fù)點(diǎn)擊只有最后一次有效,

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(1_500);
    emitter.onNext("B");

    Thread.sleep(500);
    emitter.onNext("C");

    Thread.sleep(250);
    emitter.onNext("D");

    Thread.sleep(2_000);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .debounce(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.println("onComplete"));

打印:A D E onComplete

上文代碼中,數(shù)據(jù)源以一定的時(shí)間間隔發(fā)送A,B,C,D,E。操作符debounce的時(shí)間設(shè)為1秒,發(fā)送A后1.5秒并沒有發(fā)射其他數(shù)據(jù),所以A能成功發(fā)射。發(fā)射B后,在1秒之內(nèi),又發(fā)射了C和D,在D之后的2秒才發(fā)射E,所有B、C都失效,只有D有效;而E之后已經(jīng)沒有其他數(shù)據(jù)流了,所有E有效。

image

3、distinct(去重)

可作用于Flowable,Observable,去掉數(shù)據(jù)源重復(fù)的數(shù)據(jù)。

Observable.just(2, 3, 4, 4, 2, 1)
        .distinct()
        .subscribe(System.out::println);

// 打印:2 3 4 2 1
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
        .distinctUntilChanged()
        .subscribe(System.out::print);
//打?。? 2 1 2 3 4

distinctUntilChanged()去掉相鄰重復(fù)數(shù)據(jù)。

4、elementAt(獲取指定位置元素)

可作用于Flowable,Observable,從數(shù)據(jù)源獲取指定位置的元素,從0開始。

 Observable.just(2,4,3,1,5,8)
        .elementAt(0)
        .subscribe(integer -> 
         Log.d("TAG","elmentAt->"+integer));
打?。?

Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);

element.subscribe(
    name -> System.out.println("onSuccess will not be printed!"),
    error -> System.out.println("onError: " + error));
打?。簅nSuccess will not be printed!

elementAtOrError:指定元素的位置超過數(shù)據(jù)長度,則發(fā)射異常。

5、filter(過濾)

可作用于 Flowable,Observable,Maybe,Single。在filter中返回表示發(fā)射該元素,返回false表示過濾該數(shù)據(jù)。

Observable.just(1, 2, 3, 4, 5, 6)
        .filter(x -> x % 2 == 0)
        .subscribe(System.out::print);
打印:2 4 6

6、first(第一個(gè))

作用于 Flowable,Observable。發(fā)射數(shù)據(jù)源第一個(gè)數(shù)據(jù),如果沒有則發(fā)送默認(rèn)值。

Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(System.out::println);
打?。篈

Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
        element -> System.out.println("onSuccess will not be printed!"),
        error -> System.out.println("onError: " + error));
打?。簅nError: java.util.NoSuchElementException

和firstElement的區(qū)別是first返回的是Single,而firstElement返回Maybe。firstOrError在沒有數(shù)據(jù)會(huì)返回異常。

7、last(最后一個(gè))

last、lastElement、lastOrError與fist、firstElement、firstOrError相對應(yīng)。

Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C

Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> last = source.lastElement();
last.subscribe(System.out::println);
//打印:C

Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
        element -> System.out.println("onSuccess will not be printed!"),
        error -> System.out.println("onError: " + error));
// 打?。簅nError: java.util.NoSuchElementException

8、ignoreElements & ignoreElement(忽略元素)

ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single。兩者都是忽略掉數(shù)據(jù),返回完成或者錯(cuò)誤時(shí)間。

Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(() -> System.out.println("Done!"))
        .blockingAwait();
// 1秒后打?。篋onde!

Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(() -> System.out.println("Done!"))
        .blockingAwait();
// 五秒后打印:Done!

9、ofType(過濾掉類型)

作用于Flowable、Observable、Maybe、過濾掉類型。

Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe((Integer x) -> System.out.print(x+" "));
//打印:1 3 7

10、sample

作用于Flowable、Observable,在一個(gè)周期內(nèi)發(fā)射最新的數(shù)據(jù)。

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(500);
    emitter.onNext("B");

    Thread.sleep(200);
    emitter.onNext("C");

    Thread.sleep(800);
    emitter.onNext("D");

    Thread.sleep(600);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .sample(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.print("onComplete"));
                
// 打?。?C D onComplete

與debounce的區(qū)別是,sample是以時(shí)間為周期的發(fā)射,一秒又一秒內(nèi)的最新數(shù)據(jù)。而debounce是最后一個(gè)有效數(shù)據(jù)開始。

image

11、throttleFirst & throttleLast & throttleWithTimeout

作用于Flowable、Observable。throttleLast與smaple一致,而throttleFirst是指定周期內(nèi)第一個(gè)數(shù)據(jù)。throttleWithTimeout與debounce一致。

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(500);
    emitter.onNext("B");

    Thread.sleep(200);
    emitter.onNext("C");

    Thread.sleep(800);
    emitter.onNext("D");

    Thread.sleep(600);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .throttleFirst(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.print(" onComplete"));
//打印:A D onComplete

source.subscribeOn(Schedulers.io())
        .throttleLast(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.print(item+" "),
                Throwable::printStackTrace,
                () -> System.out.print(" onComplete"));

// 打印:C D onComplete

12、throttleLatest

之所以拿出來單獨(dú)說,我看不懂官網(wǎng)的解釋。然后看別人的文章:throttleFirst+throttleLast的組合?開玩笑的吧。個(gè)人理解是:如果源的第一個(gè)數(shù)據(jù)總會(huì)被發(fā)射,然后開始周期計(jì)時(shí),此時(shí)的效果就會(huì)跟throttleLast一致。

Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("A");

            Thread.sleep(500);
            emitter.onNext("B");

            Thread.sleep(200);
            emitter.onNext("C");

            Thread.sleep(200);
            emitter.onNext("D");

            Thread.sleep(400);
            emitter.onNext("E");
            
            Thread.sleep(400);
            emitter.onNext("F");
            
            Thread.sleep(400);
            emitter.onNext("G");
            
            Thread.sleep(2000);
            emitter.onComplete();
        });
        source.subscribeOn(Schedulers.io())
        .throttleLatest(1, TimeUnit.SECONDS)
        .blockingSubscribe(
            item -> Log.e("RxJava",item),
                 Throwable::printStackTrace,
            () -> Log.e("RxJava","finished"));

打印結(jié)果:

image

13、take & takeLast

作用于Flowable、Observable,take發(fā)射前n個(gè)元素;takeLast發(fā)射后n個(gè)元素。

Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.take(4)
    .subscribe(System.out::print);
//打印:1 2 3 4

source.takeLast(4)
    .subscribe(System.out::println);
//打印:7 8 9 10

14、timeout(超時(shí))

作用于Flowable、Observable、Maybe、Single、Completabl。后一個(gè)數(shù)據(jù)發(fā)射未在前一個(gè)元素發(fā)射后規(guī)定時(shí)間內(nèi)發(fā)射則返回超時(shí)異常。

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(800);
    emitter.onNext("B");

    Thread.sleep(400);
    emitter.onNext("C");

    Thread.sleep(1200);
    emitter.onNext("D");
    emitter.onComplete();
});

source.timeout(1, TimeUnit.SECONDS)
        .subscribe(
                item -> System.out.println("onNext: " + item),
                error -> System.out.println("onError: " + error),
                () -> System.out.println("onComplete will not be printed!"));
// 打印:
// onNext: A
// onNext: B
// onNext: C
// onError: java.util.concurrent.TimeoutException: 
            The source did not signal an event for 1 seconds 
            and has been terminated.
image

連接操作符

通過連接操作符,將多個(gè)被觀察數(shù)據(jù)(數(shù)據(jù)源)連接在一起。

1、startWith

可作用于Flowable、Observable。將指定數(shù)據(jù)源合并在另外數(shù)據(jù)源的開頭。

Observable<String> names = Observable.just("Spock", "McCoy");
Observable<String> otherNames = Observable.just("Git", "Code","8");
names.startWith(otherNames).subscribe(item -> Log.d(TAG,item));

//打?。?RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Spock
RxJava: McCo

2、merge

可作用所有數(shù)據(jù)源類型,用于合并多個(gè)數(shù)據(jù)源到一個(gè)數(shù)據(jù)源。

Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");

Observable.merge(names,otherNames).subscribe(name -> Log.d(TAG,name));

//也可以是
//names.mergeWith(otherNames).subscribe(name -> Log.d(TAG,name));

//打?。?RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8

merge在合并數(shù)據(jù)源時(shí),如果一個(gè)合并發(fā)生異常后會(huì)立即調(diào)用觀察者的onError方法,并停止合并??赏ㄟ^mergeDelayError操作符,將發(fā)生的異常留到最后處理。

Observable<String> names = Observable.just("Hello", "world"); 
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable<String> error = Observable.error(    
                            new NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
    name -> Log.d(TAG,name), e->Log.d(TAG,e.getMessage()));
    
//打?。?RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Error!

3、zip

可作用于Flowable、Observable、Maybe、Single。將多個(gè)數(shù)據(jù)源的數(shù)據(jù)一個(gè)一個(gè)的合并在一起哇。當(dāng)其中一個(gè)數(shù)據(jù)源發(fā)射完事件之后,若其他數(shù)據(jù)源還有數(shù)據(jù)未發(fā)射完畢,也會(huì)停止。

Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code", "8");
names.zipWith(otherNames, (first, last) -> first + "-" + last)
       .subscribe(item -> Log.d(TAG, item));

//打?。?RxJava: Hello-Git
RxJava: world-Code

4、combineLatest

可作用于Flowable, Observable。在結(jié)合不同數(shù)據(jù)源時(shí),發(fā)射速度快的數(shù)據(jù)源最新item與較慢的相結(jié)合。
如下時(shí)間線,Observable-1發(fā)射速率快,發(fā)射了65,Observable-2才發(fā)射了C, 那么兩者結(jié)合就是C5。


image

5、switchOnNext

一個(gè)發(fā)射多個(gè)小數(shù)據(jù)源的數(shù)據(jù)源,這些小數(shù)據(jù)源發(fā)射數(shù)據(jù)的時(shí)間發(fā)生重復(fù)時(shí),取最新的數(shù)據(jù)源。

image
image

變換操作符

變化數(shù)據(jù)源的數(shù)據(jù),并轉(zhuǎn)化為新的數(shù)據(jù)源。

1、buffer

作用于Flowable、Observable。指將數(shù)據(jù)源拆解含有長度為n的list的多個(gè)數(shù)據(jù)源,不夠n的成為一個(gè)數(shù)據(jù)源。

Observable.range(0, 10)
    .buffer(4)
    .subscribe((List<Integer> buffer) -> System.out.println(buffer));

// 打印:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]

2、cast

作用于Flowable、Observable、Maybe、Single。將數(shù)據(jù)元素轉(zhuǎn)型成其他類型,轉(zhuǎn)型失敗會(huì)拋出異常。

Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);

numbers.filter((Number x) -> Integer.class.isInstance(x))
    .cast(Integer.class)
    .subscribe((Integer x) -> System.out.println(x));
// prints:
// 1
// 7
// 12
// 5

3、concatMap

作用于Flowable、Observable、Maybe。將數(shù)據(jù)源的元素作用于指定函數(shù)后,將函數(shù)的返回值有序的存在新的數(shù)據(jù)源。

Observable.range(0, 5)
    .concatMap(i -> {
        long delay = Math.round(Math.random() * 2);

        return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
    })
    .blockingSubscribe(System.out::print);

// prints 01234

4、concatMapDelayError

與concatMap作用相同,只是將過程發(fā)送的所有錯(cuò)誤延遲到最后處理。

Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
    .concatMapDelayError(x -> {
        if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!"));
        else return Observable.just(x, x * x);
    })
    .blockingSubscribe(
        x -> System.out.println("onNext: " + x),
        error -> System.out.println("onError: " + error.getMessage()));

// prints:
// onNext: 2
// onNext: 4
// onNext: 3
// onNext: 9
// onError: Something went wrong!

5、concatMapCompletable

作用于Flowable、Observable。與contactMap類似,不過應(yīng)用于函數(shù)后,返回的是CompletableSource。訂閱一次并在所有CompletableSource對象完成時(shí)返回一個(gè)Completable對象。

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
    return Completable.timer(x, TimeUnit.SECONDS)
        .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
    });

completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
    .blockingAwait();

// prints:
// Info: Processing of item "2" completed
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Info: Processing of all items completed

6、concatMapCompletableDelayError

與concatMapCompletable作用相同,只是將過程發(fā)送的所有錯(cuò)誤延遲到最后處理。

Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
    if (x.equals(2)) {
        return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
    } else {
        return Completable.timer(1, TimeUnit.SECONDS)
            .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
    }
});

completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
    .onErrorComplete()
    .blockingAwait();

// prints:
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Error: Processing of item "2" failed!

ContactMap

8、flatMap

作用于Flowable、Observable、Maybe、Single。與contactMap類似,只是contactMap的數(shù)據(jù)發(fā)射是有序的,而flatMap是無序的。

Observable.just("A", "B", "C")
    .flatMap(a -> {
        return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
                .map(b -> '(' + a + ", " + b + ')');
    })
    .blockingSubscribe(System.out::println);

// prints (not necessarily in this order):
// (A, 1)
// (C, 1)
// (B, 1)
// (A, 2)
// (C, 2)
// (B, 2)
// (A, 3)
// (C, 3)
// (B, 3)

9、flatMapXXX 和 contactMapXXX

太多了,減少篇幅,大家感興趣自己查閱官網(wǎng)吧。功能與flatMap和contactMap類似。

10、flattenAsFlowable & flattenAsObservable

作用于Maybe、Single,將其轉(zhuǎn)化為Flowable,或Observable。

Single<Double> source = Single.just(2.0);
Flowable<Double> flowable = source.flattenAsFlowable(x -> {
    return List.of(x, Math.pow(x, 2), Math.pow(x, 3));
});

flowable.subscribe(x -> System.out.println("onNext: " + x));

// prints:
// onNext: 2.0
// onNext: 4.0
// onNext: 8.0

11、groupBy

作用于Flowable、Observable。根據(jù)一定的規(guī)則對數(shù)據(jù)源進(jìn)行分組。

Observable<String> animals = Observable.just(
    "Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");

animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
    .concatMapSingle(Observable::toList)
    .subscribe(System.out::println);

// prints:
// [TIGER, TURTLE]
// [ELEPHANT]
// [CAT, CHAMELEON]
// [FROG, FISH, FLAMINGO]

12、scan

作用于Flowable、Observable。對數(shù)據(jù)進(jìn)行相關(guān)聯(lián)操作,例如聚合等。

Observable.just(5, 3, 8, 1, 7)
    .scan(0, (partialSum, x) -> partialSum + x)
    .subscribe(System.out::println);

// prints:
// 0
// 5
// 8
// 16
// 17
// 24

13、window

對數(shù)據(jù)源發(fā)射出來的數(shù)據(jù)進(jìn)行收集,按照指定的數(shù)量進(jìn)行分組,以組的形式重新發(fā)射。

Observable.range(1, 4)
    // Create windows containing at most 2 items, and skip 3 items before starting a new window.
    .window(2)
    .flatMapSingle(window -> {
        return window.map(String::valueOf)
                .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
    })
    .subscribe(System.out::println);

// prints:
// [1, 2]
// [3, 4]
image

錯(cuò)誤處理操作符

1、onErrorReturn

作用于Flowable、Observable、Maybe、Single。但調(diào)用數(shù)據(jù)源的onError函數(shù)后會(huì)回到該函數(shù),可對錯(cuò)誤進(jìn)行處理,然后返回值,會(huì)調(diào)用觀察者onNext()繼續(xù)執(zhí)行,執(zhí)行完調(diào)用onComplete()函數(shù)結(jié)束所有事件的發(fā)射。

Single.just("2A")
    .map(v -> Integer.parseInt(v, 10))
    .onErrorReturn(error -> {
        if (error instanceof NumberFormatException) return 0;
        else throw new IllegalArgumentException();
    })
    .subscribe(
        System.out::println,
        error -> System.err.println("onError should not be printed!"));

// prints 0

2、onErrorReturnItem

與onErrorReturn類似,onErrorReturnItem不對錯(cuò)誤進(jìn)行處理,直接返回一個(gè)值。

Single.just("2A")
    .map(v -> Integer.parseInt(v, 10))
    .onErrorReturnItem(0)
    .subscribe(
        System.out::println,
        error -> System.err.println("onError should not be printed!"));

// prints 0

3、onExceptionResumeNext

可作用于Flowable、Observable、Maybe。onErrorReturn發(fā)生異常時(shí),回調(diào)onComplete()函數(shù)后不再往下執(zhí)行,而onExceptionResumeNext則是要在處理異常的時(shí)候返回一個(gè)數(shù)據(jù)源,然后繼續(xù)執(zhí)行,如果返回null,則調(diào)用觀察者的onError()函數(shù)。

Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
            e.onNext(4);
        })
                .onErrorResumeNext(throwable -> {
                    Log.d(TAG, "onErrorResumeNext ");
                    return Observable.just(4);
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe ");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete ");
                    }
                });

結(jié)果:

image

onExceptionResumeNext操作符也是類似的,只是捕獲Exception。

4、retry

可作用于所有的數(shù)據(jù)源,當(dāng)發(fā)生錯(cuò)誤時(shí),數(shù)據(jù)源重復(fù)發(fā)射item,直到?jīng)]有異?;蛘哌_(dá)到所指定的次數(shù)。

boolean first=true;

Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);

            if (first){
                first=false;
                e.onError(new NullPointerException());

            }
            
        })
                .retry(9)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe ");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext " + integer);

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete ");
                    }
                });

結(jié)果:

image

5、retryUntil

作用于Flowable、Observable、Maybe。與retry類似,但發(fā)生異常時(shí),返回值是false表示繼續(xù)執(zhí)行(重復(fù)發(fā)射數(shù)據(jù)),true不再執(zhí)行,但會(huì)調(diào)用onError方法。

 Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onError(new NullPointerException());
            e.onNext(3);
            e.onComplete();
        })
                .retryUntil(() -> true)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe ");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext " + integer);

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError ");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete ");
                    }
                });

結(jié)果:


image

retryWhen與此類似,但其判斷標(biāo)準(zhǔn)不是BooleanSupplier對象的getAsBoolean()函數(shù)的返回值。而是返回的 Observable或Flowable是否會(huì)發(fā)射異常事件。

總結(jié)

太多操作符太累了,看得心好累。還是根據(jù)實(shí)際開發(fā)需要查閱文檔才是正確的姿勢。本文只是RxJava冰山一角,更多請參閱官網(wǎng)。同時(shí)不建議立馬在項(xiàng)目上實(shí)踐,給它點(diǎn)時(shí)間報(bào)bug。

參閱官網(wǎng)

好東西要分享

如果你看到了這,點(diǎn)個(gè)贊,收下我雙膝。如果文章有誤,幫忙指正,謝謝大佬們。

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容