RxJava 變換操作符

ReactiveX 系列文章目錄


buffer

間隔固定個數(shù)緩存

public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier)

// buffer(count, count, bufferSupplier)
public final <U extends Collection<? super T>> Observable<U> buffer(int count, Callable<U> bufferSupplier)
// buffer(count, skip, ArrayListSupplier.<T>asCallable()) 第三個參數(shù)的 call 回調(diào)返回一個空列表
public final Observable<List<T>> buffer(int count, int skip)
// buffer(count, count)
public final Observable<List<T>> buffer(int count)

按照規(guī)定大小緩存,每次取 count 個數(shù),取完一次跳過 skip 個數(shù),將每次取的數(shù)據(jù)合并到一個列表里。

Observable.just(1,2,3,4,5,6,7,8)
       .buffer(4, 2)
       .subscribe({Log.e("RX", "onNext $t")})

看下日志:

onNext [1, 2, 3, 4]
onNext [3, 4, 5, 6]
onNext [5, 6, 7, 8]
onNext [7, 8]

每次取出 4 個數(shù)緩存,然后跳過 2 個數(shù)再緩存并且發(fā)射這一次的列表。

val list = mutableListOf<Int>()
Observable.just(1,2,3,4,5,6,7,8)
           .buffer(4, 2, { list })
           .subscribe(observer)

日志:

onNext [1, 2, 3, 3]
onNext [1, 2, 3, 3, 4]
onNext [1, 2, 3, 3, 4, 5]
onNext [1, 2, 3, 3, 4, 5, 7]

這就奇怪了,與想象不同,看兩個參數(shù)的 buffer 方法,也是創(chuàng)建了 ArrayListSupplier.<T>asCallable() 調(diào)用三個參數(shù)的方法,最后發(fā)現(xiàn)兩者的不同在于 ArrayListSupplier.<T>asCallable() 里的 call() 每次調(diào)用都 new 了一個新的 ArrayList,而我上面的代碼用的是同一個 ArrayList。

將上述代碼修改成

Observable.just(1,2,3,4,5,6,7,8)
           .buffer(4, 2, { mutableListOf<Int>() })
           .subscribe(observer)

此時看日志就和想象中的一樣了。

源碼分析

進源碼發(fā)現(xiàn),當 count 和 skip 不相等時,會先創(chuàng)建 BufferSkipObserver 來接收 Observable 發(fā)射的數(shù)據(jù),也就相當于先攔截下,在這個 Observer 里做緩存,然后將緩存后的數(shù)據(jù)發(fā)送給用戶訂閱的那個 Observer,所以主要邏輯看 BufferSkipObserver 的 onNext 方法。

@Override
public void onNext(T t) {
    if (index++ % skip == 0) {
        U b;

        try {
            b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
        } catch (Throwable e) {
            buffers.clear();
            s.dispose();
            actual.onError(e);
            return;
        }

        buffers.offer(b);
    }

    Iterator<U> it = buffers.iterator();
    while (it.hasNext()) {
        U b = it.next();
        b.add(t);
        if (count <= b.size()) {
            it.remove();

            actual.onNext(b);
        }
    }
}

buffers 是 ArrayDeque<U>,它是一個雙端隊列,內(nèi)部存儲的 U 是一個集合,就是我們調(diào) buffer 方法第三個參數(shù)的 call 方法提供的集合。

首先在這個例子里,發(fā)射的數(shù)據(jù)是 1-8,索引也就是源碼里的 index 是 0-7,skip 是 2,這樣 index % skip 的結(jié)果就分別是 0,1,0,1,0,1,0,1,而我們看到,當這個取余結(jié)果為 0 的時候,就通過 Callable 的 call 方法返回一個列表,并且把它加入到 buffers 這個隊列的末尾。

  1. 首先數(shù)據(jù) 1,會取出一個 ArrayList 加入 buffers,然后取 buffers 的迭代器,因為只有一個元素,循環(huán)一次,取出里面的 list,把 1 添加進去,結(jié)果是這樣的


    buffer1.png
  2. 然后數(shù)據(jù) 2,index % skip 不為 0,不會向 buffers 里添加,迭代取出里面的 list,添加 2,此時是

    buffer2.png
  3. 數(shù)據(jù) 3,會又通過 bufferSupplier.call() 取出一個 list,而我代碼不是創(chuàng)建一個新的 list,而是依然用原來的 list,這樣 buffers 先變成了

    buffer3.png

    就是 buffers 里有了兩個值,都是指向同一個 list 的指針,然后遍歷 buffers,第一次遍歷取出的是 list,添加了 3,第二次遍歷取出的還是這個 list,又要添加 3,因此這個 list 里面有了兩個 3

    buffer4.png

    在第二次遍歷中,此時 list 的長度為 4,等于 count,于是會刪除 buffers 這個隊列的第一個元素,并把這個 list 的內(nèi)容發(fā)射出去,于是自己定義的觀察者就先收到了 1,2,3,3。

    結(jié)果變成了這樣

    buffer5.png
  4. 數(shù)據(jù) 4,索引 3 和 skip 取余不為 0,直接在 list 上加了 4,并且列表長度為 5,大于 count,buffers 刪除頭部元素并發(fā)射出去,此時 buffers 里面空了,什么都沒有

    buffer6.png
  5. 數(shù)據(jù) 5,又在 buffers 上加個一個元素,也是指向 list 的指針,由于 list 用的同一個,所以每次添加數(shù)據(jù)都會立刻大于 count,立刻發(fā)射數(shù)據(jù),所以 1,2,3,3,4,5 也被發(fā)射了出去,并且 buffers 剛添加了又被 remove 掉,里面又空了

  6. 再看數(shù)據(jù) 6,index % skip 為 1,所以 buffers 不會添加數(shù)據(jù),而上一次 buffers 已經(jīng)空了,所以這次迭代后發(fā)現(xiàn) hasNext 是 false,于是 6 這個數(shù)據(jù)就失蹤了

  7. 到了 7,才會 buffers 中又添加一個指向 list 的指針,并添加了 7 發(fā)射出去,于是最終 buffers 里是空的,外面的觀察者收到了 1,2,3,3,4,5,7

間隔固定時間緩存

public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler)
// 上面兩個方法調(diào)用這個方法
public final <U extends Collection<? super T>> Observable<U> buffer(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Callable<U> bufferSupplier) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timeskip, unit, scheduler, bufferSupplier, Integer.MAX_VALUE, false));
}

public final Observable<List<T>> buffer(long timespan, TimeUnit unit) {
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, int count)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler)
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count)
// 上面四個方法最后都調(diào)用這個方法
public final <U extends Collection<? super T>> Observable<U> buffer(
            long timespan, TimeUnit unit,
            Scheduler scheduler, int count,
            Callable<U> bufferSupplier,
            boolean restartTimerOnMaxSize) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize));
}

最后都是內(nèi)部創(chuàng)建 ObservableBufferTimed 來實現(xiàn)。

Observable.just(1,2,3,4,5,6,7,8)
        .map {
            Thread.sleep(100)
            it
        }
        .buffer(150, 200, TimeUnit.MILLISECONDS)
        .subscribe({Log.e("RX", "onNext $t")})

每隔 150ms 取一次,跳過 200ms,包含那 150ms 在內(nèi),結(jié)果是:

onNext [1]
onNext [2, 3]
onNext [4, 5]
onNext [6, 7]
onNext [8]

window

和 buffer 類似,但不是變成一個個列表發(fā)射,而是多個 Observable,每個 Observable 發(fā)射一個子集。

public final Observable<Observable<T>> window(long count)
public final Observable<Observable<T>> window(long count, long skip)
public final Observable<Observable<T>> window(long count, long skip, int bufferSize)

public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize) 

public final Observable<Observable<T>> window(long timespan, TimeUnit unit) 
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, long count)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, long count, boolean restart)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler, long count)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler, long count, boolean restart)
public final Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler, long count, boolean restart, int bufferSize)

public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary, int bufferSize)
public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)
public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary, int bufferSize)

public final <U, V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator, Function<? super U, ? extends ObservableSource<V>> closingIndicator)
public final <U, V> Observable<Observable<T>> window(ObservableSource<U> openingIndicator, Function<? super U, ? extends ObservableSource<V>> closingIndicator, int bufferSize)
val ob = Observable
    .intervalRange(0,10,0,100,TimeUnit.MILLISECONDS)
    .take(10)

// 一個Observable開始發(fā)射數(shù)據(jù)
// 0
// 1
// 一個Observable開始發(fā)射數(shù)據(jù)
// 2
// 2
// 3
// 一個Observable開始發(fā)射數(shù)據(jù)
// 4
// 4
// 5
// 一個Observable開始發(fā)射數(shù)據(jù)
// 6
// 6
// 7
// 一個Observable開始發(fā)射數(shù)據(jù)
// 8
// 8
// 9
ob.window(3,2).subscribe {
it.doOnSubscribe { Log.e("RX", "一個Observable開始發(fā)射數(shù)據(jù)") }
        .subscribe { Log.e("RX", "$it") }
}

// 一個Observable開始發(fā)射數(shù)據(jù)
// 0
// 1
// 2
// 一個Observable開始發(fā)射數(shù)據(jù)
// 4
// 5
// 6
// 7
// 一個Observable開始發(fā)射數(shù)據(jù)
// 8
// 9
ob.window(300,400, TimeUnit.MILLISECONDS).subscribe {
    it.doOnSubscribe { Log.e("RX", "一個Observable開始發(fā)射數(shù)據(jù)") }
            .subscribe { Log.e("RX", "$it") }
}

// 參數(shù)的 Observable 發(fā)射前的數(shù)據(jù)
// 0
// 1
// 2
ob.window(Observable.timer(300, TimeUnit.MILLISECONDS)).subscribe {
    it.subscribe { Log.e("RX", "$it") }
}

// 屁用沒有
ob.window(Observable.timer(300, TimeUnit.MILLISECONDS)
            , Function<Long, Observable<Long>>{ Observable.timer(200, TimeUnit.MILLISECONDS)} )
    .subscribe {
        it.subscribe { Log.e("RX", "$it") }
    }

blockingIterable

返回一個迭代器

val ob = Observable.just(1,2,3,4,5,6)
val iterable = ob.blockingIterable()
iterable.forEach {Log.e("RX", "$it")}

blockingForEach

阻塞直到 forEach 都執(zhí)行完畢了,才會發(fā)射給觀察者。

val ob = Observable.just(1,2,3,4,5,6)
ob.blockingForEach {
    Thread.sleep(1000)
    Log.e("RX", "$it")
}
ob.subscribe({ Log.e("RX", "收到 $it") })
05-12 23:33:40.729 3651-3651/pot.ner347.androiddemo E/RX: 1
05-12 23:33:41.730 3651-3651/pot.ner347.androiddemo E/RX: 2
05-12 23:33:42.731 3651-3651/pot.ner347.androiddemo E/RX: 3
05-12 23:33:43.732 3651-3651/pot.ner347.androiddemo E/RX: 4
05-12 23:33:44.733 3651-3651/pot.ner347.androiddemo E/RX: 5
05-12 23:33:45.735 3651-3651/pot.ner347.androiddemo E/RX: 6
05-12 23:33:45.755 3651-3651/pot.ner347.androiddemo E/RX: 收到 1
    收到 2
    收到 3
    收到 4
    收到 5
    收到 6

blockingLatest

最近發(fā)射的數(shù)據(jù)加入到 Iterator 并返回。

險些沒把人搞死,幾個小時,各種方法試了,得到的 Iterator 一直是空的,注釋說的又不清不楚的。太浪費時間了,感覺不值,但又不想漏過去。

直到搜到一篇文章(https://www.safaribooksonline.com/library/view/learning-rxjava/9781787120426/deb25b82-ddaf-4099-8b96-7484ec152a3b.xhtml

val source = Observable.interval(1, TimeUnit.MICROSECONDS).take(1000)
val iterable = source.blockingLatest()
for (i in iterable) {
    Log.e("RX", "$i")
}

1 秒中發(fā)射一次,而 iterable 依次往后迭代。

val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
    (1..100).forEach {
        emitter.onNext(it)
        Log.e("RX", "onNext $it")
        Thread.sleep(100)
    }
})

val iterator = ob.blockingLatest()
for (i in iterator) {
    Log.e("RX", "$i")
}

可為毛上面的代碼就不行呢,iterator 里只有最后一個 100,真是氣死了。那 interval 和我這 create 有什么區(qū)別,不都是 1 秒發(fā)一次嘛,看 interval 源碼,看到里面是創(chuàng)建了 worker 線程,然后定時執(zhí)行的,所以 create 這個也放到子線程去,居然可以了

val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
    (1..100).forEach {
        emitter.onNext(it)
        Log.e("RX", "onNext $it")
        Thread.sleep(100)
    }
}).subscribeOn(Schedulers.newThread())

val iterator = ob.blockingLatest()
for (i in iterator) {
    Log.e("RX", "$i")
}

blockingMostRecent

val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
    (1..20).forEach {
        emitter.onNext(it)
        Log.e("RX", "onNext $it")
        Thread.sleep(100)
    }
}).subscribeOn(Schedulers.newThread())
val iterator = ob.blockingMostRecent(-10)
for (i in iterator) {
    Log.e("RX", "$i")
}

從日志看,blockingLatest 迭代器每次取出的是最近發(fā)射的那一個,而 blockingMostRecent 返回同樣的數(shù)據(jù)多次,比如

...
onNext 1
1
1
1
1
1
1
1
1
1
1
1
2
2
3
onNext 3
3
3
4
onNext 4
4
4
onNext 5
5
5
6
6
onNext 6
6
6
...

它會重新確認最近的值,即使已經(jīng)確認過,直到下一個值被發(fā)射才停止。如果沒有值發(fā)射,會用參數(shù)的默認值。

blockingNext

返回一個阻塞的 Iterator,直到發(fā)射一個數(shù)據(jù)才返回這個數(shù)據(jù)。

val ob = Observable.create(ObservableOnSubscribe<Int> { emitter ->
    (1..20).forEach {
        emitter.onNext(it)
        Log.e("RX", "onNext $it")
        Thread.sleep(100)
    }
}).subscribeOn(Schedulers.newThread())
val iterator = ob.blockingNext()
for (i in iterator) {
    Log.e("RX", "$i")
}

blockingSubscribe

val ob = Observable.create(ObservableOnSubscribe<Int> {
    it.onNext(1)
    it.onNext(2)
//                it.onNext(1/0)
    it.onNext(3)
    it.onComplete()
})

ob.blockingSubscribe( {
    Log.e("RX", "$it")
}, {Log.e("RX", "exception")}, {Log.e("RX", "complete")})

搞出這些方法到底有什么用嗎?當前線程給出回調(diào),知道執(zhí)行了什么嗎?又能怎樣?

還有個無參的構(gòu)造方法:

ignoring any values and rethrowing any exception

這意思是忽略掉發(fā)射的數(shù)據(jù),只拋出異常。如果發(fā)射數(shù)據(jù),就不管,當發(fā)生異常時,拋出來,也沒覺得有什么特別的意義,就用 Observer 也行啊。

cast

將發(fā)射的源數(shù)據(jù)都強制轉(zhuǎn)換成另一種類型。只能是父類轉(zhuǎn)為子類。內(nèi)部調(diào)用了 map。

由于 Kotlin 的類型推斷,demo 里沒想到例子來表現(xiàn)用它前后的區(qū)別。

map

對發(fā)射的每個事件的數(shù)據(jù)執(zhí)行一個函數(shù)進行轉(zhuǎn)換,將轉(zhuǎn)換后的數(shù)據(jù)發(fā)給觀察者。

Observable.just(1,2,3)
           .map({ "number is $it" })
           .subscribe(observerStr)

map 的參數(shù)是一個 Function 接口,泛型 T 是輸入?yún)?shù)類型,R 是轉(zhuǎn)換后返回的類型

public interface Function<T, R> {
  /**
   * Apply some calculation to the input value and return some other value.
   * @param t the input value
   * @return the output value
   * @throws Exception on error
   */
  R apply(@NonNull T t) throws Exception;
}

flatMap

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize)
// 下面四個方法和上面的是一類
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int maxConcurrency)

// flatMap(ObservableInternalHelper.flatMapWithCombiner(mapper, combiner), delayErrors, maxConcurrency, bufferSize)
public final <U, R> Observable<R> flatMap(final Function<? super T, ? extends ObservableSource<? extends U>> mapper,
          final BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors, int maxConcurrency, int bufferSize)
// 下面四個方法和上面的是一類      
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
          BiFunction<? super T, ? super U, ? extends R> resultSelector)
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
           BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors)
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
       BiFunction<? super T, ? super U, ? extends R> combiner, boolean delayErrors, int maxConcurrency)
public final <U, R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends U>> mapper,
          BiFunction<? super T, ? super U, ? extends R> combiner, int maxConcurrency)

// 內(nèi)部經(jīng)歷了 Notification 的轉(zhuǎn)換,無論 onComplete,onError 都會作為一個數(shù)據(jù)通過 onNext 發(fā)射出去
public final <R> Observable<R> flatMap(
          Function<? super T, ? extends ObservableSource<? extends R>> onNextMapper,
          Function<? super Throwable, ? extends ObservableSource<? extends R>> onErrorMapper,
          Callable<? extends ObservableSource<? extends R>> onCompleteSupplier)
public final <R> Observable<R> flatMap(
           Function<? super T, ? extends ObservableSource<? extends R>> onNextMapper,
           Function<Throwable, ? extends ObservableSource<? extends R>> onErrorMapper,
           Callable<? extends ObservableSource<? extends R>> onCompleteSupplier,
           int maxConcurrency)
val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
        .flatMap({
            Observable.fromIterable(it)
        }).subscribe(observerInt) // 收到 5 次 onNext

// 會收到
// -1-:[1, 2, 3]
// -2-:[1, 2, 3]
// -3-:[1, 2, 3]
// -4-:[4, 5]
// -5-:[4, 5]
Observable.just(list1, list2)
        .flatMap( Function<List<Int>, ObservableSource<String>>{
            // 將 int 轉(zhuǎn)成一個字符串
            Observable.fromIterable(it.map { "-$it-" })
        }, BiFunction<List<Int>, String, MutableMap<String, List<Int>>> { // 這個 combiner 參數(shù)就是將源和轉(zhuǎn)變后的組合返回一個新結(jié)構(gòu)
          t1, t2 ->
            mutableMapOf(t2 to t1)
        }).subscribe({
            it.map { Log.e("RX", "${it.key}:${it.value}") }
        })

// 會收到
// -1-
// -2-
// -3-
// -4-
// -5-
// complete onNext 收到
// onComplete
Observable.just(list1, list2)
       .flatMap({ Observable.fromIterable(it.map { "-$it-" }) },
               { Observable.just( "${it.message}" ) },
               { Observable.just("complete") })
       .subscribe(observerStr)

flatMapIterable

map 將一個數(shù)據(jù)轉(zhuǎn)成另一種,是一對一的,flatMap 是將一個數(shù)據(jù)變成一個 Observable,內(nèi)部可能發(fā)射多次,可以看成一對多,flatMapIterable 是將一個數(shù)據(jù)變成一個 Iterable,也可以認為是一對多。

Observable.just(1,10)
          .flatMapIterable { listOf(it, it+1, it+2) }
          .subscribe(observerInt)

依次發(fā)射 1,2,3,10,11,12。效果等價的 flatMap 寫法

Observable.just(1,10)
          .flatMap { Observable.fromIterable(listOf(it, it+1, it+2)) }
          .subscribe(observerInt)
Observable.just(1,10)
            .flatMapIterable( Function<Int, List<Int>>{ listOf(it, it+1, it+2) },
                    BiFunction<Int, Int, String> { t1, t2 -> "$t1:$t2"})
            .subscribe(observerStr)

發(fā)射 1:1,1:2,1:3,10:10,10:11,10:12。

flatMapCompletable

val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
        .flatMapCompletable { t ->
            CompletableSource { cs ->
                Log.e("RX", "cs $t")
                cs.onComplete()
            }
        }.subscribe(object : CompletableObserver {
            override fun onComplete() {
                Log.e("RX", "onComplete")
            }

            override fun onSubscribe(d: Disposable) {}

            override fun onError(e: Throwable) {
                Log.e("RX", "onError")
            }
        })

看源碼是每次 onNext 發(fā)射一個值,先加到一個隊列里保存起來。內(nèi)部先攔下來,調(diào)用 flatMapCompletable 參數(shù)那個 Function 的 apply 方法獲得一個 CompletableSource 對象,然后又調(diào)用它的 subscribe 方法。

必須調(diào)用 cs.onComplete(),內(nèi)部的一個布爾值 active 為 false,進入 for 循環(huán)才能繼續(xù)下一次執(zhí)行。

每發(fā)一次都通過一個 onComplete 標記,最終外層的 Observer 收到一個 onComplete 事件。

flatMapMaybe

將源 Observable 發(fā)射的值執(zhí)行 map 操作放到 Maybe 中發(fā)射。

val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
        .flatMapMaybe { t ->
            Maybe.create(MaybeOnSubscribe<Int> { emitter ->
                t.map { it * 2 }.forEach {
                    emitter.onSuccess(it)
                }
            })
        }.subscribe(object : Observer<Int> {
            override fun onComplete() { textView.text = "${textView.text}\n onComplete" }
            override fun onSubscribe(d: Disposable) {}
            override fun onNext(t: Int) { textView.text = "${textView.text}\n $t" }
            override fun onError(e: Throwable) {}
        })

只收到了 2 和 8,因為 list 在 Maybe 中被展開然后發(fā)射,但由于 Maybe 是 Single 和 Completable,所以第一個列表發(fā)了 2 就結(jié)束,然后第二個列表發(fā)了 8 就結(jié)束。

如果在 Maybe 里這樣:

if (list1.size == 3) emitter.onComplete()
t.map { it * 2 }.forEach {
  emitter.onSuccess(it)
}

只發(fā)了一個 onComplete 就結(jié)束了,并未收到 8。

flatMapSingle

val list1 = listOf(1,2,3)
val list2 = listOf(4,5)
Observable.fromArray(list1, list2)
        .flatMapSingle { t ->
            Single.create(SingleOnSubscribe<Int> { emitter ->
                t.map { it * 2 }.forEach {
                    emitter.onSuccess(it)
                }
            })
        }.subscribe(object : Observer<Int> {
            override fun onComplete() { textView.text = "${textView.text}\n onComplete" }
            override fun onSubscribe(d: Disposable) {}
            override fun onNext(t: Int) { textView.text = "${textView.text}\n $t" }
            override fun onError(e: Throwable) {}
        })

收到 2 和 8 和 onComplete。

switchMap/switchMapDelayError/switchMapSingle/switchMapSingleDelayError

和 flatMap 類似,區(qū)別是當源 Observable 發(fā)射一個新的數(shù)據(jù)項時,如果舊數(shù)據(jù)項訂閱還未完成,就取消舊訂閱數(shù)據(jù)和停止監(jiān)視那個數(shù)據(jù)項產(chǎn)生的 Observable,多線程情況使用。

比如源 Observable 產(chǎn)生 A、B、C 三個結(jié)果,通過 switchMap 的映射規(guī)則,映射后從 ObservableA 發(fā)射 A1、A2,ObservableB 發(fā)射 B1、B2,ObservableC 發(fā)射 C1、C2。當在產(chǎn)生 B1 的同時,C1 已經(jīng)產(chǎn)生了,這樣就會忽略 B1 且不再監(jiān)視 ObservableB。

concatMap/concatMapEager

concatMap 用法和 flatMap 一樣,只是它能保證發(fā)射是有序的。

val list1 = listOf("a","b","c")
val list2 = listOf("e","f","g")
val list3 = listOf(list1, list2)
Observable.fromIterable(list3)
    .concatMap({
        val list4 = it.map { "--$it--" }
        Observable.fromIterable(list4)
    }).subscribe(observerStr)

concatMapEager 是并發(fā)處理內(nèi)部 Observable,但是會引起更多的內(nèi)存占用。關(guān)于 flatMap、concatMap、concatMapEager 的對比可以參考這篇文章:https://www.javacodegeeks.com/2017/08/flatmap-vs-concatmap-vs-concatmapeager-rxjava-faq.html

concatMapDelayError/concatMapEagerDelayError

concatMapDelayError 和 concatMapEagerDelayError 是推遲 error 事件,這兩者最后有個 tillTheEnd 的布爾型參數(shù),默認為 true。

先看 concatMapDelayError,關(guān)于 tillTheEnd 的注釋

if true, all errors from the outer and inner ObservableSource sources are delayed until the end,
if false, an error from the main source is signalled when the current ObservableSource source terminates

但測試結(jié)果與想象的并不一樣。

val list1 = listOf("a","b","c")
val list2 = listOf("e","f","g")

Observable.create(ObservableOnSubscribe<List<String>> {
            it.onNext(list1)
            it.onError(Throwable())
            it.onNext(list2)
        }).concatMapDelayError({list ->
            Observable.fromIterable(list)
        }, 10, true).subscribe(observerStr)

外面的 Observable 發(fā)射了 onError,無論 tillTheEnd 是 true 還是 false,observerStr 都收到了 list1 的三個字符串,然后收到一個 onError。

Observable.just(list1, list2).concatMapDelayError({list ->
  Observable.create(ObservableOnSubscribe<String> { emitter ->
      (0..list.count()).forEach {
          if (2 == it) emitter.onError(Throwable()) else emitter.onNext(list[it])
      }
  })
}, 10, false).subscribe(observerStr)

外面的 Observable 正常發(fā)射,里面發(fā)了 onError,無論 tillTheEnd 為 true 還是 false,都是直接崩潰。

concatMapEagerDelayError 也是一樣的現(xiàn)象。

concatMapIterable

Observable.just(1, 10)
      .concatMapIterable { mutableListOf(it+1,it+2,it+3,it+4) }.subscribe(observerInt)

一個變多個。

concatMapCompletable

Observable.just(1,2,3,4,5)
        .concatMapCompletable { t ->
            CompletableSource { cs ->
                Log.e("RX", "cs $t")
                cs.onComplete()
            }
        }
        .subscribe(object : CompletableObserver {
            override fun onComplete() {
                Log.e("RX", "onComplete")
            }

            override fun onSubscribe(d: Disposable) {}

            override fun onError(e: Throwable) {
                Log.e("RX", "onError")
            }
        })

和 flatMapCompletable 基本一個意思。

forEach

public final Disposable forEach(Consumer<? super T> onNext) {
   return subscribe(onNext);
}

對每一個發(fā)射的數(shù)據(jù)執(zhí)行一個操作,其實就是用參數(shù)去訂閱。

forEachWhile

Observable.just(1,2,3)
          .forEachWhile { // 如果返回 false,內(nèi)部會調(diào)用 onComplete
              Log.e("RX", "$it")
              it < 2
          }

// 加了 onError 或 onComplete 時的回調(diào)
// 到 3 時就 false,會執(zhí)行一個 onComplete 所以還會收到 complete
Observable.just(1,2,3,4)
          .forEachWhile ( Predicate<Int>{
              Log.e("RX", "$it")
              it < 3
          }, Consumer<Throwable>{ Log.e("RX", "error msg = ${it.message}")},
                  Action { Log.e("RX", "complete")})

groupBy

將一個 Observable 分拆為一些 GroupedObservable 集合,每一個 GroupedObservable 發(fā)射原始 Observable 的一個子序列。

五個重載方法,可以分成兩類,一種只有 keySelector,一種同時又 keySelector 和 valueSelector。其實只有 keySelector 的內(nèi)部使用了一個默認的 valueSelector,就是原樣返回傳給它的 value。

哪個數(shù)據(jù)項由哪一個 GroupedObservable 發(fā)射,由 group 參數(shù) Function 的 apply 方法決定,在 apply 里進行分組,給每項數(shù)據(jù)指定一個 Key,Key 相同的數(shù)據(jù)會被同一個 GroupedObservable 發(fā)射。

Observable.fromArray("a", "ab", "b", "c", "abc", "bc")
            .groupBy(object : Function<String, Int> {
        override fun apply(t: String): Int = t.length
    }).subscribe(object: Consumer<GroupedObservable<Int, String>> {
        override fun accept(t: GroupedObservable<Int, String>) {
            if (t.key != 2) {
                t.subscribe { Log.e("RX", "${t.key},$it") }
            }
        }
    })

為了便于理解泛型的對應(yīng)關(guān)系,沒有完全用 Lambda 寫。上面的例子中,Observable 本來要發(fā)射 6 個字符串,在 groupBy 方法中,t.length 相同的被劃分為一組,所以 a,b,c 一組,ab,bc 一組,abc 一組,會有 3 個 GroupedObservable,在 subscribe 中,對 GroupedObservable 的 key 進行判斷,長度為 2 的這一組不接收,Log 顯示:

1,a
1,b
1,c
3,abc
Observable.fromArray("a", "ab", "b", "c", "abc", "bc")
        .groupBy(
                Function<String, Int> { t -> t.length },
                Function<String, String> { t -> "o $t o" })
        .subscribe { t ->
            t.subscribe { Log.e("RX", "${t.key},$it") }
        }

第二個參數(shù)就是對值做一個變換。

1,o a o
2,o ab o
1,o b o
1,o c o
3,o abc o
2,o bc o

safeSubscribe

看源碼內(nèi)部捕獲了回調(diào)方法的異常。

val observer = object : Observer<Int> {
     override fun onComplete() {}
     override fun onSubscribe(d: Disposable) {}
     override fun onNext(t: Int) { t/0 } // 產(chǎn)生異常
     override fun onError(e: Throwable) {}
}
// Observable.just(1).subscribe(observer)  // 崩潰
Observable.just(1).safeSubscribe(observer) // 內(nèi)部捕獲了

scan/scanWith

和 reduce 類似,也是一種累計運算,區(qū)別在于 reduce 計算完畢后發(fā)射一個結(jié)果,而 scan 是在累計的過程中,每次都發(fā)射。

// 作為觀察者,收到 3 次事件,值分別是 1,2,6。
Observable.just(1, 2, 3)
        .scan({ t1, t2 -> t1 * t2 }).subscribe {
            textView.text = "${textView.text}\n $it"
        }

// 有初始值,收到了 4 次事件,分別為 10,10,20,60
Observable.just(1, 2, 3)
        .scan(10, { t1, t2 -> t1 * t2 }).subscribe {
            textView.text = "${textView.text}\n $it"
        }

// 其實上一種內(nèi)部也是調(diào)用的這一種
Observable.just(1, 2, 3)
        .scanWith({ 10 }, { t1, t2 -> t1 * t2 }).subscribe {
            textView.text = "${textView.text}\n $it"
        }

sorted

排序后發(fā)射。源 Observable 發(fā)射的數(shù)據(jù)必須是實現(xiàn)了 Comparable 的,即是可以比較的。

// 默認按自然順序排序,收到 1,3,5,9
Observable.just(5,3,9,1).sorted().subscribe(observerInt)

// 使用自定義的排序算法,逆序,收到 9,5,3,1
Observable.just(5,3,9,1).sorted { o1, o2 -> o2-o1 }.subscribe(observerInt)

to

通過 Function 將一個 Observable 變成另一個。

// onNext 2
// onNext 4
// onComplete
Observable.just(1,2).to { it.map {it * 2} }.subscribe(observerInt)

toList

public final Single<List<T>> toList() {
    return toList(16);
}
// 第二個參數(shù)指定列表的初始長度
public final Single<List<T>> toList(final int capacityHint)

// 通過參數(shù)提供一個列表
public final <U extends Collection<? super T>> Single<U> toList(Callable<U> collectionSupplier) {

把發(fā)射的數(shù)據(jù)組合成列表在一起發(fā)射。

// 發(fā)射一次,內(nèi)容是一個列表 [2,4,1,3,5]
Observable.just(2,4,1,3,5).toList()
    .subscribe(Consumer<List<Int>> {
        textView.text = "${textView.text}\n ${it.toList()}"
    })
   
// 參數(shù)提供一個列表,里面已經(jīng)有一個值 10
// 收到的是 [10,2,4,1,3,5
Observable.just(2,4,1,3,5)
    .toList{ mutableListOf(10)}
    .subscribe(Consumer<List<Int>> {
        textView.text = "${textView.text}\n ${it.toList()}"
    })

toSortedList

public final Single<List<T>> toSortedList()
public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)
public final Single<List<T>> toSortedList(final Comparator<? super T> comparator, int capacityHint)
public final Single<List<T>> toSortedList(int capacityHint)

相比 toList,發(fā)射的列表是排好序的。

// 只發(fā)射一次 onNext([1,2,3,4,5])
Observable.just(2,4,1,3,5).toSortedList()
    .subscribe(Consumer<List<Int>> {
        textView.text = "${textView.text}\n ${it.toList()}"
    })
    
// 只發(fā)射一次 onNext([5,4,3,2,1])   
Observable.just(2,4,1,3,5).toSortedList { t1, t2 -> t2-t1}
    .subscribe(Consumer<List<Int>> {
        textView.text = "${textView.text}\n ${it.toList()}"
    })

toMap

// 用 keySelector 提供的數(shù)據(jù)作為 key,源 Observable 發(fā)射的數(shù)據(jù)作為 value,封裝成 Map 發(fā)射
public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector) 

// 分別使用 keySelector 和 valueSelector 作為 key 和 value
public final <K, V> Single<Map<K, V>> toMap(final Function<? super T, ? extends K> keySelector, final Function<? super T, ? extends V> valueSelector)

// 最后一個參數(shù)提供 Map,默認的是 HashMap,逆序
public final <K, V> Single<Map<K, V>> toMap(final Function<? super T, ? extends K> keySelector, final Function<? super T, ? extends V> valueSelector, Callable<? extends Map<K, V>> mapSupplier)
val ob = Observable.just(2,2,4,4,1)

val consumer = Consumer<Map<String, Int>> {
    for (map in it) {
        textView.text = "${textView.text}\n key:${map.key} value:${map.value}"
    }
}

// key 相同,value 會覆蓋,逆序輸出
// key:-4- value:4
// key:-2- value:2
// key:-1- value:1
ob.toMap({ "-$it-" }).subscribe(consumer)
        
// key:-4- value:28
// key:-2- value:0
// key:-1- value:7      
ob.toMap({ "-$it-" }, {it*Random().nextInt(10)}).subscribe(consumer)


// 使用自己傳進去的 Map,里面有了一對數(shù)據(jù),內(nèi)部是 LinkedMap
// key:a value:100
// key:-2- value:8
// key:-4- value:32
// key:-1- value:6      
ob.toMap({ "-$it-" }, {it*2}, { mutableMapOf("a" to 100) }).subscribe(consumer)

toMultimap

和 toMap 類似,只是這里相同 key 的 value 不會覆蓋,而是將多個數(shù)據(jù)組成一個 List 作為 value。構(gòu)造方法參數(shù)和 toMap 類似,只是多一個重載方法,最后一個參數(shù)提供用作 value 的那個 List。

public final <K, V> Single<Map<K, Collection<V>>> toMultimap(
        final Function<? super T, ? extends K> keySelector,
        final Function<? super T, ? extends V> valueSelector,
        final Callable<? extends Map<K, Collection<V>>> mapSupplier,
        final Function<? super K, ? extends Collection<? super V>> collectionFactory)
val ob = Observable.just(2,2,4,4,1)
val consumer = Consumer<Map<String, Collection<Int>>> {
    for (map in it) {
        textView.text = "${textView.text}\n key:${map.key} value:${map.value}"
    }
}

// key 相同的 value 會進入一個列表
// key:-4- value:[4,4]
// key:-2- value:[2,2]
// key:-1- value:[1]
ob.toMultimap { "-$it-" }.subscribe(consumer)

// key:-4- value:[4,16]
// key:-2- value:[0,18]
// key:-1- value:[2]
ob.toMultimap({ "-$it-" }, {it*Random().nextInt(10)}).subscribe(consumer)

toFlowable

public final Flowable<T> toFlowable(BackpressureStrategy strategy)

參數(shù)指定背壓策略。

toFuture

如果 Observable 沒有發(fā)射值,拋出 NoSuchElementException 異常,只有發(fā)射一個值時,可以通過 Future 來獲取,如果發(fā)射兩個以上的值,拋出 IllegalArgumentException,注釋是這么說的,可是寫代碼測試 toFuture 并沒有異常,只是這個 Future 里面沒數(shù)據(jù),get 方法時會產(chǎn)出 IndexOutOfBoundsException。

try {
//                val i = Observable.empty<Int>().toFuture().get()
//                val i = Observable.just(1).toFuture().get()
    val i = Observable.just(1, 2).toFuture().get()
    textView.text = "${textView.text}\n future $i"
} catch (e: Exception) {
    textView.text = "${textView.text}\n ${e.message}"
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個ObservableEm...
    rkua閱讀 1,955評論 0 1
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,981評論 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    測天測地測空氣閱讀 683評論 0 1
  • 當你有一個需要訂閱的Observable,并且希望轉(zhuǎn)換結(jié)果的時候(切記,響應(yīng)式編程中一切皆流)。即將涉及到obse...
    王小賤_ww閱讀 685評論 0 1
  • 注:只包含標準包中的操作符,用于個人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,366評論 2 8

友情鏈接更多精彩內(nèi)容