RxJava2操作符總結(jié) -- 你想要的都在這里了

對(duì)RxJava的使用還一直停留在與Retrofit一起使用做網(wǎng)絡(luò)請(qǐng)求,其他的還都不了解,所以花了不少時(shí)間整理把RxJava的操作符基本都敲了一遍,熟悉了一遍。參考了一些博客,現(xiàn)在把我整理的分享出來給大家。以下代碼完全在IDEA中編寫,可以直接放到IDEA里跑起來,需要注意的是,我使用的語言是Kotlin,所以如果沒有Kotlin環(huán)境的話需要先配置一下Kotlin的環(huán)境。

首先RxJava2 gradle集成:

implementation "io.reactivex.rxjava2:rxjava:2.2.8"
implementation 'io.reactivex.rxjava2:rxkotlin:2.0.0'

同時(shí)我使用到了RxKotlin,一個(gè)非常不錯(cuò)的RxJava Kotlin擴(kuò)展庫,也是reactivex出品。

2019/5/5 更新compose操作符

好了,廢話不多說,話都在代碼里??
接下來看代碼,有點(diǎn)長哦??

package com.kylec.kc.rxjava

import io.reactivex.Observable
import io.reactivex.ObservableTransformer
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Consumer
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.collections.ArrayList
import kotlin.random.Random

/**
 * 給Android開發(fā)者的RxJava詳解
 * https://gank.io/post/560e15be2dca930e00da1083
 *
 * RxKotlin
 * https://github.com/ReactiveX/RxKotlin/blob/2.x/README.md
 *
 * RxJava系列
 * http://www.itdecent.cn/p/823252f110b0
 *
 * RxJava2看這一篇文章就夠了
 * https://juejin.im/post/5b17560e6fb9a01e2862246f
 *
 *
 * 基礎(chǔ)類:
 * Flowable: 多個(gè)流,響應(yīng)式流和背壓
 * Observable: 多個(gè)流,無背壓   (被觀察者)
 * Single: 只有一個(gè)元素或者錯(cuò)誤的流
 * Completable: 沒有任何元素,只有一個(gè)完成和錯(cuò)誤信號(hào)的流
 * Maybe: 沒有任何元素或者只有一個(gè)元素或者只有一個(gè)錯(cuò)誤的流
 *
 *
 * 調(diào)度器種類:
 * Schedulers.computation()
 * 用于計(jì)算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請(qǐng)使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量
 *
 * Schedulers.from(executor)            使用指定的Executor作為調(diào)度器
 *
 * Schedulers.single()                  該調(diào)度器的線程池只能同時(shí)執(zhí)行一個(gè)線程
 *
 * Schedulers.io()
 * 用于IO密集型任務(wù),如異步阻塞IO操作,這個(gè)調(diào)度器的線程池會(huì)根據(jù)需要增長;
 * 對(duì)于普通的計(jì)算任務(wù),請(qǐng)使用Schedulers.computation();
 * 默認(rèn)是一個(gè)CachedThreadScheduler,很像一個(gè)有線程緩存的新線程調(diào)度器
 *
 * Schedulers.newThread()              為每個(gè)任務(wù)創(chuàng)建一個(gè)新線程
 *
 * Schedulers.trampoline()              當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行。
 *
 * AndroidSchedulers.mainThread()          主線程,UI線程,可以用于更新界面
 *
 *
 * 注意:本例部分結(jié)果未打印 因?yàn)樵谶@里有延時(shí)但線程已掛掉 正常在Android中使用是沒事的
 *
 * Created by KYLE on 2019/4/28 - 19:57
 */
fun main() {
    // ---------------- `Observable` 創(chuàng)建事件序列的方法 ----------------
    println("---------------- `Observable`創(chuàng)建事件序列的方法 ----------------")
    create()
    interval()
    defer()
    emptyNeverError()
    repeat()
    timer()
    from()
    just()
    range()
    println()


    // ---------------- `Observable` 變換操作 ----------------
    println("---------------- `Observable`變換操作 ----------------")
    mapCast()
    flatMap2contactMap()
    flatMapExample()
    flatMapIterable()
    buffer()
    groupBy()
    scan()
    window()
    println()


    // ---------------- `Observable` 過濾操作/條件操作符 ----------------
    println("---------------- `Observable` 過濾操作/條件操作符 ----------------")
    filter()
    element()
    distinct()
    skip()
    take()
    ignoreElements()
    debounce()
    ofType()
    all()
    contains()
    isEmpty()
    defaultIfEmpty()
    amb()
    println()


    // ---------------- `Observable` 組合操作 ----------------
    println("---------------- `Observable` 組合操作 ----------------")
    concat()
    merge()
    startWith()
    zip()
    combineLast()
    reduce()
    collect()
    count()
    println()


    // ------------------- 功能操作符/輔助操作 -------------------
    println("------------------- 功能操作符/輔助操作 -------------------")
    delay()
    doSeries()
    retry()
    subscribeOn()
    observeOn()
    println()


    // ---------------- RxKotlin擴(kuò)展庫 ----------------
    println("---------------- RxKotlin擴(kuò)展庫 ----------------")
    rkExExample()
    println()


    // ------------------- 額外 其他 -------------------
    println("------------------- 額外 其他 -------------------")
    compose()
    println()
}

/**
 * 使用基本`create()`方法創(chuàng)建事件序列
 */
fun create() {
    print("[create]: ")
    Observable.create<String> { emitter ->
        with(emitter) {
            onNext("Hello")
            onNext("Handsome")
            onNext("Kotlin")
            onComplete()
        }
    }.subscribe(object : Observer<String> {
        override fun onSubscribe(d: Disposable) {
            print("onSubscribe ")
        }

        override fun onError(e: Throwable) {}

        override fun onComplete() {
            print(" onComplete ")
        }

        override fun onNext(t: String) {
            print("$t  ")
        }
    })
    println()
}

/**
 * 使用`interval()`方法創(chuàng)建事件序列
 * 間隔發(fā)射
 */
fun interval() {
    print("[interval]: ")

    // 每隔1秒發(fā)送一個(gè)整數(shù) 從0開始 (默認(rèn)執(zhí)行無數(shù)次 使用`take(int)`方法限制執(zhí)行次數(shù))
    val disposable = Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(5)
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()

    /*
        其他重載方法:
        public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
        public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
        public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

        `initialDelay`參數(shù)用來指示開始發(fā)射第一個(gè)整數(shù)的之前要停頓的時(shí)間,時(shí)間的單位與period一樣,都是通過unit參數(shù)來指定的;
        `period`參數(shù)用來表示每個(gè)發(fā)射之間停頓多少時(shí)間;
        `unit`表示時(shí)間的單位,是TimeUnit類型的;
        `scheduler`參數(shù)指定數(shù)據(jù)發(fā)射和等待時(shí)所在的線程。
     */
}

/**
 * 使用`defer()`方法創(chuàng)建事件序列
 *
 * `defer`直到有觀察者訂閱時(shí)才創(chuàng)建Observable,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable
 * `defer`操作符會(huì)一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個(gè)Observable
 */
fun defer() {
    print("[defer]: ")

    val observable =
        Observable.defer { Observable.just(System.currentTimeMillis()) }
    observable.subscribe { print("$it ") }   // 454  訂閱時(shí)才產(chǎn)生了Observable
    print("   ")
    observable.subscribe { print("$it ") }   // 459  訂閱時(shí)才產(chǎn)生了Observable

    println()
}

/**
 * 使用`empty()` `never()` `error()`方法創(chuàng)建事件序列
 *
 * public static <T> Observable<T> `empty()`:創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
 * public static <T> Observable<T> `never()`:創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable
 * public static <T> Observable<T> `error(Throwable exception)`:創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable
 */
fun emptyNeverError() {
    print("[empty]: ")

    Observable.empty<String>().subscribeBy(
        onNext = { print(" next ") },
        onComplete = { print(" complete ") },
        onError = { print(" error ") }
    )
    // `empty()`只會(huì)調(diào)用onComplete方法

    println()

    print("[never]: ")

    Observable.never<String>().subscribeBy(
        onNext = { print(" next ") },
        onComplete = { print(" complete ") },
        onError = { print(" error ") }
    )
    // 什么也不會(huì)做

    println()

    print("[error]: ")

    Observable.error<Exception>(Exception()).subscribeBy(
        onNext = { print(" next ") },
        onComplete = { print(" complete ") },
        onError = { print(" error ") }
    )
    // `error()`只會(huì)調(diào)用onError方法

    println()
}

/**
 * 使用`repeat()`方法創(chuàng)建事件序列
 *
 * 表示指定的序列要發(fā)射多少次
 */
fun repeat() {
    // 重載方法
    // public final Observable<T> repeat(long times)
    // public final Observable<T> repeatUntil(BooleanSupplier stop)
    // public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

    print("[repeat]: ")

    // 不指定次數(shù)即無限次發(fā)送(內(nèi)部調(diào)用有次數(shù)的重載方法并傳入Long.MAX_VALUE)  別執(zhí)行啊 ~ 卡的不行 ~
    // Observable.range(5, 10).repeat().subscribe { print("$it  ") }

    Observable.range(5, 10).repeat(1).subscribe { print("$it  ") }

    println()

    print("[repeatUntil]: ")

    // repeatUntil在滿足指定要求的時(shí)候停止重復(fù)發(fā)送,否則會(huì)一直發(fā)送
    // 這里當(dāng)隨機(jī)產(chǎn)生的數(shù)字`<10`時(shí)停止發(fā)送 否則繼續(xù)  (這里始終為true(即停止重復(fù)) 省的瘋了似的執(zhí)行)
    val numbers = arrayOf(0, 1, 2, 3, 4)
    numbers.toObservable().repeatUntil {
        Random(10).nextInt() < 10
    }.subscribe { print("$it  ") }

    println()
}

/**
 * 使用`timer()`方法創(chuàng)建事件序列
 *
 * 執(zhí)行延時(shí)任務(wù)
 *
 * 創(chuàng)建一個(gè)在給定的時(shí)間段之后返回一個(gè)特殊值的Observable,它在延遲一段給定的時(shí)間后發(fā)射一個(gè)簡單的數(shù)字0
 */
fun timer() {
    print("[timer]: ")

    // 在500毫秒之后輸出一個(gè)數(shù)字0
    val disposable = Observable.timer(500, TimeUnit.MILLISECONDS).subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()
}

/**
 * 使用`from()`方法快捷創(chuàng)建事件隊(duì)列
 *
 * `fromArray`
 * `fromCallable`
 * `fromIterable`  (和上面的fromArray類似 只不過這里是集合罷了)
 *
 * 將傳入的數(shù)組依次發(fā)送出來
 */
fun from() {
    print("[fromArray]: ")

    val names = arrayOf("ha", "hello", "yummy", "kt", "world", "green", "delicious")
    // 注意:使用`*`展開數(shù)組
    val disposable = Observable.fromArray(*names).subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()

    print("[fromCallable]: ")

    // 可以在Callable內(nèi)執(zhí)行一段代碼 并返回一個(gè)值給觀察者
    Observable.fromCallable { 1 }.subscribe { print("$it  ") }

    println()
}

/**
 * 使用`just()`方法快捷創(chuàng)建事件隊(duì)列
 *
 * 將傳入的參數(shù)依次發(fā)送出來(最少1個(gè) 最多10個(gè))
 */
fun just() {
    print("[just]: ")

    val disposable = Observable.just("Just1", "Just2", "Just3")
        // 將會(huì)依次調(diào)用:
        // onNext("Just1");
        // onNext("Just2");
        // onNext("Just3");
        // onCompleted();
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()
}

/**
 * 使用`range()`方法快捷創(chuàng)建事件隊(duì)列
 *
 * 創(chuàng)建一個(gè)序列
 */
fun range() {
    print("[range]: ")

    // 用Observable.range()方法產(chǎn)生一個(gè)序列
    // 用map方法將該整數(shù)序列映射成一個(gè)字符序列
    // 最后將得到的序列輸出來 forEach內(nèi)部也是調(diào)用的 subscribe(Consumer<? super T> onNext)
    val disposable = Observable.range(0, 10)
        .map { item -> "range$item" }
//        .forEach { print("$it  ") }
        .subscribeBy(
            onNext = { print("$it  ") },
            onComplete = { print("range complete !!! ") }
        )
    if (!disposable.isDisposed) disposable.dispose()

    println()
}

/**
 * 使用`map()` `cast()` 做變換操作
 *
 * `map`操作符對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù),然后返回一個(gè)發(fā)射這些結(jié)果的Observable。默認(rèn)不在任何特定的調(diào)度器上執(zhí)行
 *
 * `cast`操作符將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個(gè)指定的類型`(多態(tài))`,然后再發(fā)射數(shù)據(jù),它是map的一個(gè)特殊版本
 */
fun mapCast() {
    print("[map]: ")

    Observable.range(1, 5).map { item -> "to String $item" }.subscribe { print("$it  ") }

    println()

    print("[cast]: ")

    // 將`Date`轉(zhuǎn)換為`Any` (如果前面的Class無法轉(zhuǎn)換成第二個(gè)Class就會(huì)出現(xiàn)ClassCastException)
    Observable.just(Date()).cast(Any::class.java).subscribe { print("$it  ") }

    println()
}

/*
    `map`與`flatMap`的區(qū)別(出自朱凱):
    `map`是在一個(gè) item 被發(fā)射之后,到達(dá) map 處經(jīng)過轉(zhuǎn)換變成另一個(gè) item ,然后繼續(xù)往下走;
    `flapMap`是 item 被發(fā)射之后,到達(dá) flatMap 處經(jīng)過轉(zhuǎn)換變成一個(gè) Observable
    而這個(gè) Observable 并不會(huì)直接被發(fā)射出去,而是會(huì)立即被激活,然后把它發(fā)射出的每個(gè) item 都傳入流中,再繼續(xù)走下去。
 */

/**
 * 使用`flatMap()` `contactMap()` 做變換操作
 *
 * `flatMap`將一個(gè)發(fā)送事件的上游Observable變換為多個(gè)發(fā)送事件的Observables,然后將它們發(fā)射的事件合并后放進(jìn)一個(gè)單獨(dú)的Observable里
 * `flatMap`不保證順序  `contactMap()`保證順序
 */
fun flatMap2contactMap() {
    print("[flatMap]: ")

    /*
        `flatMap()` 的原理是這樣的:
        1. 使用傳入的事件對(duì)象創(chuàng)建一個(gè) Observable 對(duì)象;
        2. 并不發(fā)送這個(gè) Observable, 而是將它激活,于是它開始發(fā)送事件;
        3. 每一個(gè)創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個(gè) Observable ,
        而這個(gè) Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。
        這三個(gè)步驟,把事件拆成了兩級(jí),通過一組新創(chuàng)建的 Observable 將初始的對(duì)象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。
        而這個(gè)『鋪平』就是 flatMap() 所謂的 flat。
     */

    Observable.range(1, 5).flatMap {
        Observable.just("$it to flat")
    }.subscribe { print("$it  ") }

    println()

    print("[contactMap]: ")

    Observable.range(1, 5).concatMap {
        Observable.just("$it to concat")
    }.subscribe { print("$it  ") }

    println()

}

/**
 * `flatMap`挺重要的,再舉一個(gè)例子
 *
 * 依次打印Person集合中每個(gè)元素中Plan的action
 */
fun flatMapExample() {
    print("[flatMapExample]: ")

    arrayListOf(
        Person("KYLE", arrayListOf(Plan(arrayListOf("Study RxJava", "May 1 to go home")))),
        Person("WEN QI", arrayListOf(Plan(arrayListOf("Study Java", "See a Movie")))),
        Person("LU", arrayListOf(Plan(arrayListOf("Study Kotlin", "Play Game")))),
        Person("SUNNY", arrayListOf(Plan(arrayListOf("Study PHP", "Listen to music"))))
    ).toObservable().flatMap {
        Observable.fromIterable(it.planList)
    }.flatMap {
        Observable.fromIterable(it.actionList)
    }.subscribeBy(
        onNext = { print("$it  ") }
    )

    println()
}

/**
 * 使用`flatMapIterable()`做變換操作
 *
 * 將上流的任意一個(gè)元素轉(zhuǎn)換成一個(gè)Iterable對(duì)象
 */
fun flatMapIterable() {
    print("[flatMapIterable]: ")

    Observable.range(1, 5)
        .flatMapIterable { integer ->
            Collections.singletonList("$integer")
        }
        .subscribe { print("$it  ") }

    // [flatMapIterable]: 1  2  3  4  5

    println()
}

/**
 * 使用`buffer()`做變換操作
 *
 * 用于將整個(gè)流進(jìn)行分組
 */
fun buffer() {
    print("[buffer]: ")

    // 生成一個(gè)7個(gè)整數(shù)構(gòu)成的流,然后使用`buffer`之后,這些整數(shù)會(huì)被3個(gè)作為一組進(jìn)行輸出

    // count 是一個(gè)buffer的最大值
    // skip 是指針后移的距離(不定義時(shí)就為count)
    // 例如 1 2 3 4 5 buffer(3) 的結(jié)果為:[1,2,3] [4,5]      (buffer(3)也就是buffer(3,3))
    // 例如 1 2 3 4 5 buffer(3,2) 的結(jié)果為:[1,2,3] [3,4,5] [5]
    Observable.range(1, 5).buffer(3)
        .subscribe {
            print("${Arrays.toString(it.toIntArray())}  ")
        }

    println()
}

/**
 * 使用`groupBy()`做變換操作
 *
 * 用于分組元素(根據(jù)groupBy()方法返回的值進(jìn)行分組)
 * 將發(fā)送的數(shù)據(jù)進(jìn)行分組,每個(gè)分組都會(huì)返回一個(gè)被觀察者。
 */
fun groupBy() {
    print("[groupBy]: ")

    // 使用concat方法先將兩個(gè)Observable拼接成一個(gè)Observable,然后對(duì)其元素進(jìn)行分組。
    // 這里我們的分組依據(jù)是整數(shù)的值,這樣我們將得到一個(gè)Observable<GroupedObservable<Integer, Integer>>類型的Observable。
    // 然后,我們?cè)賹⒌玫降男蛄衅唇映梢粋€(gè),并進(jìn)行訂閱輸出

    Observable.concat(Observable.concat(
        Observable.range(1, 3), Observable.range(1, 4)
    ).groupBy { it }
    ).subscribe { print("groupBy: $it  ") }

    // [groupBy]: groupBy: 1  groupBy: 1  groupBy: 2  groupBy: 2  groupBy: 3  groupBy: 3  groupBy: 4

    println()
}

/**
 * 使用`scan()`做變換操作
 *
 * 將數(shù)據(jù)以一定的邏輯聚合起來
 *
 * scan操作符對(duì)原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。
 * 它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)。
 * 它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列。這個(gè)操作符在某些情況下被叫做`accumulator`
 */
fun scan() {
    print("[scan]: ")

    val disposable = Observable.just(1, 2, 3, 4, 5)
        .scan { t1: Int, t2: Int -> t1 + t2 }
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    // [scan]: 1  3  6  10  15

    println()
}

/**
 * 使用`window()`做變換操作
 *
 * 將事件分組 參數(shù)`count`就是分的組數(shù)
 *
 * `window`和`buffer`類似,但不是發(fā)射來自原始Observable的數(shù)據(jù)包,它發(fā)射的是Observable,
 * 這些Observables中的每一個(gè)都發(fā)射原始Observable數(shù)據(jù)的一個(gè)子集,最后發(fā)射一個(gè)onCompleted通知。
 */
fun window() {
    print("[window]: ")

    Observable.range(1, 10).window(3)
        .subscribeBy(
            onNext = { it.subscribe { int -> print("{${it.hashCode()} : $int} ") } },
            onComplete = { print("onComplete ") }
        )

    println()
}

/**
 * 使用`filter()`做過濾操作
 *
 * 對(duì)源做過濾
 */
fun filter() {
    print("[filter]: ")

    // 過濾掉 <=5 的數(shù)據(jù)源 只有 >5 的數(shù)據(jù)源會(huì)發(fā)送出去
    Observable.range(1, 10).filter { it > 5 }.subscribe { print("$it  ") }

    println()
}

/**
 * 使用`element()`獲取源中指定位置的數(shù)據(jù)
 *
 * `elementAt`  指定位置
 * `firstElement`  第一個(gè)
 * `lastElement`   最后一個(gè)
 */
fun element() {
    print("[elementAt]: ")

    // `elementAt` 在輸入的 index 超出事件序列的總數(shù)就不會(huì)出現(xiàn)任何結(jié)果
    // `elementAtOrError` 則會(huì)報(bào)異常
    // `first...` 和 `last...` 都類似

    // 只有index=0的數(shù)據(jù)源會(huì)被發(fā)射
    Observable.range(1, 10).elementAt(0).subscribe { print("$it  ") }

    println()

    print("[firstElement]: ")

    Observable.range(1, 19).firstElement().subscribe { print("$it  ") }

    println()

    print("[lastElement]: ")

    Observable.range(34, 2).lastElement().subscribe { print("$it  ") }

    println()
}

/**
 * 使用`distinct()`對(duì)源中相同的數(shù)據(jù)進(jìn)行過濾
 */
fun distinct() {
    print("[distinct]: ")

    Observable.just(1, 1, 1, 2, 3, 4, 1, 5, 5, 6)
        .distinct()
        .subscribe { print("$it  ") }

    // [distinct]: 1  2  3  4  5  6

    println()

    print("[distinctUntilChanged]: ")

    Observable.just(1, 1, 1, 2, 3, 4, 1, 5, 5, 6)
        .distinctUntilChanged()
        .subscribe { print("$it  ") }

    // [distinctUntilChanged]: 1  2  3  4  1  5  6

    println()
}

/**
 * 使用`skip()` 過濾掉數(shù)據(jù)的前n項(xiàng)
 *
 * `skip`         過濾掉數(shù)據(jù)的前n項(xiàng) 參數(shù)count代表跳過事件的數(shù)量
 * `skipLast`     與`skip` 功能相反 過濾掉數(shù)據(jù)的后n項(xiàng)
 * `skipUntil`    當(dāng) skipUntil() 中的 Observable 發(fā)送事件了,原來的 Observable 才會(huì)發(fā)送事件給觀察者。
 * `skipWhile`    可以設(shè)置條件,當(dāng)某個(gè)數(shù)據(jù)滿足條件時(shí)不發(fā)送該數(shù)據(jù),反之則發(fā)送。
 */
fun skip() {
    print("[skip]: ")

    Observable.just(1, 2, 3, 4, 5, 6)
        .skip(2)
        .subscribe { print("$it  ") }

    // [skip]: 3  4  5  6

    println()

    print("[skipUntil]: ")

    Observable.just(6)
        .skipUntil<Int> { Observable.just(2).delay(2, TimeUnit.SECONDS).subscribe { print("$it  ") } }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe { print("$it  ") }

    println()

    print("[skipWhile]: ")

    Observable.just(1, 2, 3, 4)
        .skipWhile {
            it < 3
        }.subscribe { print("$it  ") }

    // [skipWhile]: 3  4

    println()
}

/**
 * 使用`take()` 取數(shù)據(jù)的前n項(xiàng)
 *
 * `take`          取數(shù)據(jù)的前n項(xiàng) 參數(shù)count代表取的事件的數(shù)量
 * `takeLast`      與`take`功能相反 取數(shù)據(jù)的后n項(xiàng)
 * `takeUntil`
 * `takeWhile`
 *
 * 與`skip...`對(duì)應(yīng)
 */
fun take() {
    print("[take]: ")

    Observable.range(1, 5).take(2).subscribe { print("$it  ") }

    // [take]: 1  2

    println()
}

/**
 * 使用`ignoreElements()` 過濾所有源Observable產(chǎn)生的結(jié)果
 *
 * 只會(huì)把Observable的`onComplete`和`onError`事件通知給訂閱者
 */
fun ignoreElements() {
    print("[ignoreElements]: ")

    Observable.just(1, 1, 2, 3, 4)
        .ignoreElements()
        .subscribeBy(
            onComplete = { print(" onComplete ") },
            onError = { print(" onError ") }
            // 沒有`onNext`
        )

    println()
}

/**
 * 使用`debounce()` 限制發(fā)射頻率過快
 *
 * 如果兩件事件發(fā)送的時(shí)間間隔小于設(shè)定的時(shí)間間隔則`前一件`事件就不會(huì)發(fā)送給觀察者
 */
fun debounce() {

    print("[debounce]: ")

    Observable.create<Int> { emitter ->
        emitter.onNext(1)
        Thread.sleep(900)
        emitter.onNext(2)
    }.debounce(1, TimeUnit.SECONDS)
        .subscribe { print("$it  ") }

    // 2

    println()
}

/**
 * 使用`ofType()` 過濾不符合該類型事件
 */
fun ofType() {
    print("[ofType]: ")

    Observable.just(1, 2, 3, "k", "Y")
        .ofType(String::class.java)
        .subscribe { print("$it  ") }

    // [ofType]: k  Y

    println()
}

/**
 * `all`
 *
 * 判斷事件序列是否全部滿足某個(gè)事件,如果都滿足則返回 true,反之則返回 false
 */
fun all() {
    print("[all]: ")

    Observable.just(1, 2, 3, 4)
        .all { it < 5 }
        .subscribe(Consumer {
            print("$it  ")
        })

    // [all]: true

    println()
}

/**
 * `contains`
 *
 * 判斷事件序列中是否含有某個(gè)元素,如果有則返回 true,如果沒有則返回 false。
 */
fun contains() {
    print("[contains]: ")

    Observable.just(1, 2, 3, 4)
        .contains(3)
        .subscribe(Consumer {
            print("$it  ")
        })

    // [contains]: true

    println()
}

/**
 * `isEmpty`
 *
 * 判斷事件序列是否為空  是返回true  否返回false
 */
fun isEmpty() {
    print("[isEmpty]: ")

    Observable.create<String> { emitter ->
        emitter.onComplete()
    }.isEmpty
        .subscribe(Consumer {
            print("$it  ")
        })

    // [isEmpty]: true

    println()
}

/**
 * `defaultIfEmpty`
 *
 * 如果觀察者只發(fā)送一個(gè) onComplete() 事件,則可以利用這個(gè)方法發(fā)送一個(gè)值。
 */
fun defaultIfEmpty() {
    print("[defaultIfEmpty]: ")

    Observable.create<Int> { emitter ->
        emitter.onComplete()
    }.defaultIfEmpty(666)
        .subscribe { print("$it  ") }

    // [defaultIfEmpty]: 666

    println()

}

/**
 * `amb`
 *
 * amb() 要傳入一個(gè) Observable 集合,但是只會(huì)發(fā)送最先發(fā)送事件的 Observable 中的事件,其余 Observable 將會(huì)被丟棄。
 */
fun amb() {
    print("[amb]: ")

    val list = ArrayList<Observable<Long>>()
    list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS))
    list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS))

    Observable.amb(list)
        .subscribe { print("$it  ") }

    // [amb]:  6  7  8  9  10

    println()
}

/**
 * 使用`concat()`做組合操作
 *
 * 將多個(gè)Observable拼接起來,但是它會(huì)嚴(yán)格按照傳入的Observable的順序進(jìn)行發(fā)射,一個(gè)Observable沒有發(fā)射完畢之前不會(huì)發(fā)射另一個(gè)Observable里面的數(shù)據(jù)
 *
 * `concat()`方法內(nèi)部還是調(diào)用的`concatArray(source1, source2)`方法,只是在調(diào)用前對(duì)傳入的參數(shù)做了`null`判斷
 *
 * 與 `merge()` 作用基本一樣,只是 `merge()` 是并行發(fā)送事件,而 concat() 串行發(fā)送事件
 */
fun concat() {
    print("[concat]: ")

    val disposable = Observable.concat(Observable.range(1, 5), Observable.range(6, 5))
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    // [concat]: 1  2  3  4  5  6  7  8  9  10

    println()
}

/**
 * 使用`merge` 做組合操作
 *
 * 讓多個(gè)數(shù)據(jù)源的數(shù)據(jù)合并起來進(jìn)行發(fā)射(merge后的數(shù)據(jù)可能會(huì)交錯(cuò)發(fā)射)
 * 與 `concat()` 作用基本一樣,只是 `concat()` 是串行發(fā)送事件,而 merge() 并行發(fā)送事件
 *
 * 內(nèi)部實(shí)際操作為調(diào)用了`fromArray()+flatMap`方法 只是在調(diào)用前對(duì)數(shù)據(jù)源參數(shù)做了`null`判斷
 *
 * 與`mergeError`的比較
 * `mergeError`方法與`merge`方法的表現(xiàn)一致,
 * 只是在處理由`onError`觸發(fā)的錯(cuò)誤的時(shí)候有所不同。
 * `mergeError`方法會(huì)等待所有的數(shù)據(jù)發(fā)射完畢之后才把錯(cuò)誤發(fā)射出來,即使多個(gè)錯(cuò)誤被觸發(fā),該方法也只會(huì)發(fā)射出一個(gè)錯(cuò)誤信息。
 * 而如果使用`merger`方法,那么當(dāng)有錯(cuò)誤被觸發(fā)的時(shí)候,該錯(cuò)誤會(huì)直接被拋出來,并結(jié)束發(fā)射操作
 */
fun merge() {
    print("[merge]: ")

    Observable.merge(Observable.range(1, 5), Observable.range(6, 5))
        .subscribe { print("$it  ") }

    // [merge]: 1  2  3  4  5  6  7  8  9  10

    println()
}

/**
 * 使用`startWith` 做組合操作 在發(fā)送事件之前追加事件
 *
 * `startWith`         追加一個(gè)事件
 * `startWithArray`    追加多個(gè)事件
 *
 * `追加的事件會(huì)先發(fā)出`
 *
 * `startWith`方法可以用來在指定的數(shù)據(jù)源的之前插入幾個(gè)數(shù)據(jù)
 */
fun startWith() {
    print("[startWith]: ")

    Observable.range(5, 3)
        .startWithArray(1, 2, 3, 4)
        .startWith(0).subscribe { print("$it  ") }

    // [startWith]: 0  1  2  3  4  5  6  7

    println()
}

/**
 * 使用`zip` 做組合操作
 *
 * 用來將多個(gè)數(shù)據(jù)項(xiàng)進(jìn)行合并 根據(jù)各個(gè)被觀察者發(fā)送事件的順序一個(gè)個(gè)結(jié)合起來,最終發(fā)送的事件數(shù)量會(huì)與源 Observable 中最少事件的數(shù)量一樣
 * 為什么呢?因?yàn)閿?shù)據(jù)源少的那個(gè) Observable 發(fā)送完成后發(fā)送了 onComplete 方法,所以數(shù)據(jù)源多的那個(gè)就不會(huì)再發(fā)送事件了
 */
fun zip() {
    print("[zip]: ")

    Observable.zip(Observable.range(1, 6), Observable.range(6, 5),
        BiFunction<Int, Int, Int> { t1, t2 -> t1 * t2 })
        .subscribe { print("$it  ") }

    // 1 2 3 4  5 6
    // 6 7 8 9 10
    // 看上面兩行再看結(jié)果很明顯了吧

    // [zip]: 6  14  24  36  50

    println()
}

/**
 * 使用`combineLast` 做組合操作
 *
 * 用第一個(gè)數(shù)據(jù)源的最后一項(xiàng)和第二個(gè)數(shù)據(jù)源的每一項(xiàng)做合并
 */
fun combineLast() {
    print("[combineLast]: ")

    Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5),
        BiFunction<Int, Int, Int> { t1, t2 -> t1 * t2 })
        .subscribe { print("$it  ") }

    // 1 2 3 4  5 6
    // 6 7 8 9 10
    // 看上面兩行再看結(jié)果很明顯了吧

    // [combineLast]: 36  42  48  54  60

    println()
}

/**
 * 使用`reduce` 做組合操作
 *
 * 與 scan() 操作符的作用一樣也是將發(fā)送數(shù)據(jù)以一定邏輯聚合起來,
 * 這兩個(gè)的區(qū)別在于 scan() 每處理一次數(shù)據(jù)就會(huì)將事件發(fā)送給觀察者,而 reduce() 會(huì)將所有數(shù)據(jù)聚合在一起才會(huì)發(fā)送事件給觀察者
 */
fun reduce() {
    print("[reduce]: ")

    Observable.just(0, 1, 2, 3)
        .reduce { t1, t2 -> t1 + t2 }
        .subscribe { print("$it  ") }

    // [reduce]: 6

    println()
}

/**
 * 使用`collect` 做組合操作
 *
 * 將數(shù)據(jù)收集到數(shù)據(jù)結(jié)構(gòu)當(dāng)中
 */
fun collect() {
    print("[collect]: ")

    // `collect`接收兩個(gè)參數(shù) 第一個(gè)是要收集到的數(shù)據(jù)解構(gòu) 第二個(gè)是數(shù)據(jù)到數(shù)據(jù)結(jié)構(gòu)中的操作
    Observable.just(1, 2, 3, 4)
        .collect({ ArrayList<Int>() }, { t1, t2 -> t1.add(t2) })
        .subscribe(Consumer<ArrayList<Int>> {
            print("$it  ")
        })

    // [collect]: [1, 2, 3, 4]

    println()
}

/**
 * 使用`count` 做組合操作
 *
 * 返回被觀察者發(fā)送事件的數(shù)量
 */
fun count() {
    print("[count]: ")

    Observable.just(1, 2, 3)
        .count()
        .subscribe(Consumer {
            print("$it  ")
        })

    // [count]: 3

    println()
}

/**
 * 用于在發(fā)射數(shù)據(jù)之前停頓指定的時(shí)間
 */
fun delay() {
    print("[delay]: ")

    Observable.range(1, 5).delay(1, TimeUnit.SECONDS).subscribe { print("$it  ") }

    println()
}

/**
 * do 系列
 */
fun doSeries() {
    print("[doSeries]: ")
    // `doOnEach`  當(dāng)每個(gè)`onNext`調(diào)用[前]觸發(fā) 并可取出`onNext`發(fā)送的值  但是方法參數(shù)是一個(gè)`Notification<T>`的包裝 可以通過`.value`取出`onNext`的值
    // `doOnNext`  在每個(gè)`onNext`調(diào)用[前]觸發(fā) 并可取出`onNext`發(fā)送的值  方法參數(shù)就是`onNext`的值
    // `doAfterNext`   在每個(gè)`onNext`調(diào)用[后]觸發(fā) 并可取出`onNext`發(fā)送的值  方法參數(shù)就是`onNext`的值
    // `doOnComplete`  在`onComplete`調(diào)用[前]觸發(fā)
    // `doOnError`  在`onError`調(diào)用[前]觸發(fā)
    // `doOnSubscribe`  在`onSubscribe`調(diào)用[前]觸發(fā)
    // `doOnDispose`  在調(diào)用 Disposable 的 dispose() 之[后]回調(diào)該方法
    // `doOnTerminate `  在 onError 或者 onComplete 發(fā)送之[前]回調(diào)
    // `doAfterTerminate `   在onError 或者 onComplete 發(fā)送之[后]回調(diào)  取消訂閱后就不會(huì)回調(diào)
    // `doFinally`   在所有事件發(fā)送完畢之后回調(diào)該方法   即使取消訂閱也會(huì)回調(diào)
    // `onErrorReturn`   當(dāng)接受到一個(gè) onError() 事件之后回調(diào),返回的值會(huì)回調(diào) onNext() 方法,并正常結(jié)束該事件序列
    // `onErrorResumeNext`   當(dāng)接收到 onError() 事件時(shí),返回一個(gè)新的 Observable,并正常結(jié)束事件序列
    // `onExceptionResumeNext`   與 onErrorResumeNext() 作用基本一致,但是這個(gè)方法只能捕捉 Exception

    // Test Code:
    Observable.create<String> { emitter ->
        emitter.onNext("K")
        emitter.onNext("Y")
        emitter.onNext("L")
        emitter.onNext("E")
        emitter.onComplete()
    }.doOnTerminate {
        print("doOnNext: $  ")
    }.subscribeBy(
        onNext = { print("accept: $it  ") },
        onComplete = { print("  onComplete  ") },
        onError = { print("  onError  ") }
    )

    println()
}

/**
 * `retry`
 *
 * 另:`retryUntil` 出現(xiàn)錯(cuò)誤事件之后,可以通過此方法判斷是否繼續(xù)發(fā)送事件 true不重試 false重試
 *
 * 如果出現(xiàn)錯(cuò)誤事件,則會(huì)重新發(fā)送所有事件序列。times 是代表重新發(fā)的次數(shù)。
 */
fun retry() {
    print("[retry]: ")

    Observable.create<String> { emitter ->
        emitter.onNext("K")
        emitter.onError(Exception("404"))
    }
        .retry(2)
        .subscribeBy(
            onNext = { print("accept: $it  ") },
            onComplete = { print("  onComplete  ") },
            onError = { print("  onError  ") }
        )

    // [retry]: accept: K  accept: K  accept: K    onError
    // 重試了2次

    println()
}

/**
 * `subscribeOn`
 *
 * 指定被觀察者的線程,要注意的時(shí),如果多次調(diào)用此方法,只有第一次有效。
 */
fun subscribeOn() {
    print("[subscribeOn]: ")

//    Observable.create<String> { emitter ->
//        emitter.onNext("K")
//        print("current thread: ${Thread.currentThread().name}")
//    }.subscribeOn(Schedulers.computation()).subscribe()

    // current thread: RxComputationThreadPool-2

    println()
}

/**
 * `observeOn`
 *
 * 指定觀察者的線程,每指定一次就會(huì)生效一次。
 */
fun observeOn() {
    print("[observeOn]: ")

//    Observable.create<String> { emitter ->
//        emitter.onNext("K")
//    }.observeOn(Schedulers.io())
//        .subscribe { print("current thread: ${Thread.currentThread().name}") }

    // current thread : RxCachedThreadScheduler -1

    println()
}

/**
 * RxKotlin擴(kuò)展庫的一個(gè)簡單使用
 * 也是RxKotlin官方給出的一個(gè)例子
 *
 * 更多查看:https://github.com/ReactiveX/RxKotlin/blob/2.x/README.md
 */
fun rkExExample() {
    print("[rkExExample]: ")

    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    // 相當(dāng)于是Observable.fromIterable(this) 和上面的fromArray()類似 一個(gè)數(shù)組 一個(gè)集合
    list.toObservable()  // extension function for Iterables
        .filter { it.length > 5 }
        .subscribeBy(   // 對(duì)應(yīng)上面`create`創(chuàng)建方式的最后調(diào)用的subscribe
            onNext = { print("$it  ") },
            onError = { it.printStackTrace() },
            onComplete = { print(" Done! ") }
        )

    // Result:
    // [rkExExample]: Epsilon   Done!
    println()
}


/**
 * `compose`
 * 與`Transformer`連用
 *
 * `compose`操作符和Transformer結(jié)合使用,一方面讓代碼看起來更加簡潔化,另一方面能夠提高代碼的復(fù)用性。
 * RxJava提倡鏈?zhǔn)秸{(diào)用,`compose`能夠防止鏈?zhǔn)奖淮蚱啤? *
 * compose操作于整個(gè)數(shù)據(jù)流中,能夠從數(shù)據(jù)流中得到原始的Observable<T>/Flowable<T>...
 * 當(dāng)創(chuàng)建Observable/Flowable...時(shí),compose操作符會(huì)立即執(zhí)行,而不像其他的操作符需要在onNext()調(diào)用后才執(zhí)行
 */
fun compose() {
    println("[compose]: ")

    Observable.just(1, 2)
        .compose(transformerInt2String())
        .compose(applySchedulers())
        .subscribe {
            print("$it  ")
            if (it == "1") print(" ${Thread.currentThread().name} ")
        }

    println()
}


// 用于`flatMap`舉例子
data class Person(private val name: String, val planList: List<Plan>)

data class Plan(val actionList: List<String>)

// 用于`compose`舉例子
// 將發(fā)射的Int轉(zhuǎn)換為String
fun transformerInt2String() = ObservableTransformer<Int, String> { upstream -> upstream.map { int -> "$int" } }

// 切換線程
fun <T> applySchedulers() =
    ObservableTransformer<T, T> { upstream -> upstream.observeOn(Schedulers.io()).subscribeOn(Schedulers.io()) }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文首發(fā)于“隨手記技術(shù)團(tuán)隊(duì)”公眾號(hào) 大概從2015年開始,RxJava1.0開始快速流行起來,短短兩年時(shí)間,RxJ...
    HolenZhou閱讀 2,217評(píng)論 0 19
  • 請(qǐng)?jiān)试S我借鑒前輩們的東西~~~~ 感激不盡~~~~~ 以下為Android 框架排行榜 么么噠~ Android...
    嗯_(tái)新閱讀 2,505評(píng)論 3 32
  • 注意Rxjava配合Retrofit進(jìn)行網(wǎng)絡(luò)請(qǐng)求進(jìn)行了更新,對(duì)Rxjava生命周期處理更加合理,詳情請(qǐng)看Demo ...
    sweetying閱讀 12,140評(píng)論 3 97
  • 一、Retrofit詳解 ·Retrofit的官網(wǎng)地址為 : http://square.github.io/re...
    余生_d630閱讀 2,083評(píng)論 0 5
  • 人物:M君(已婚已育),H君(已婚未育),W君(大齡單身),三人為大學(xué)同寢,關(guān)系非常好,畢業(yè)多年,依然樂于互相八卦...
    陸拾雜記閱讀 381評(píng)論 2 5

友情鏈接更多精彩內(nèi)容