RxJava(RxKotlin)、RxAndroid 簡單使用

RxJava(RxKotlin)、RxAndroid

ps:文章中涉及到的代碼均使用 Kotlin 實(shí)現(xiàn),即需要導(dǎo)入 RxKotlin,同時(shí)也涉及到了 RxAndroid 相關(guān)內(nèi)容

導(dǎo)入方法:

  1. 在項(xiàng)目的 build.gradle 文件中添加 RxKotlin 的版本信息
buildscript {
    ext.rx_kotlin_version = '1.0.0'
    ext.rx_android_version = '1.2.1'
}
  1. 在 module 的 build.gradle 文件中添加 RxKotlin 以及 RxAndroid 的依賴
dependencies {
    // RxKotlin RxAndroid
    implementation "io.reactivex:rxkotlin:$rx_kotlin_version"
    implementation "io.reactivex:rxandroid:$rx_android_version"
}

1. 一些常用的網(wǎng)站

  1. RxJava文檔
  2. ?RxJava中文文檔
  3. ?RxJava經(jīng)典資料

2. 觀察者模式的四大要素

  1. Observable 被觀察者
  2. Observer 觀察者
  3. subscribe 訂閱
  4. 事件

3. 操作符

3.1 Creating 操作符

create
just
from
range
repeat
interval
defer
empty / never
timer
start
  • create 操作符,直接創(chuàng)建一個(gè) Subscriber 對象
Observable.create<String> {
    it.onNext("Hello Rx!")
    it.onCompleted()
}.subscribe(object : Subscriber<String>() {
    override fun onNext(t: String) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
onNext() --> Hello Rx!
onCompleted()
  • just 操作符將一系列對象逐個(gè)發(fā)射出去,注意集合對象將作為一個(gè)整體進(jìn)行發(fā)射
Observable.just(1, 1.0, "String", true)
        .subscribe(object : Subscriber<Any>() {
            override fun onNext(t: Any) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })

Observable.just(listOf(1, 2, 3, 4, 5))
        .subscribe(object : Subscriber<List<Int>>() {
            override fun onNext(t: List<Int>) {
                t.forEach { println("onNext() --> $it") }
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 1.0
onNext() --> String
onNext() --> true
onCompleted()
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 5
onCompleted()
  • from 操作符可以將集合中的元素逐個(gè)發(fā)射出去
Observable.from(listOf(5, 4, 3, 2, 1, 0))
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 5
onNext() --> 4
onNext() --> 3
onNext() --> 2
onNext() --> 1
onNext() --> 0
onCompleted()
  • range 在一定范圍內(nèi)向觀察者發(fā)射整型數(shù)據(jù),repeat 重復(fù)發(fā)射,默認(rèn)重復(fù)無數(shù)次
Observable.range(1, 3)
        .repeat(2)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 1
onNext() --> 2
onNext() --> 3
onCompleted()
  • interval 定時(shí)向觀察者發(fā)送一個(gè) Long 類型的數(shù)字(逐個(gè)疊加)
Observable.interval(2, 2, TimeUnit.SECONDS)
        .subscribe(object : Subscriber<Long>() {
            override fun onNext(t: Long) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 0
onNext() --> 1
onNext() --> 2
onNext() --> 3
...
  • defer 延遲創(chuàng)建 Observable 對象,只有在調(diào)用 subscribe() 方法時(shí),才會(huì)創(chuàng)建 Observable 對象
var arg = "初始值"
val observable = Observable.defer { Observable.just(arg) }
arg = "再次賦值"
observable.subscribe(object : Subscriber<String>() {
    override fun onNext(t: String) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
onNext() --> 再次賦值
onCompleted()

3.2 Transforming 操作符

map
flatMap
groupBy
buffer
scan
window
  • map
Observable.just(123, 234).map {
       "¥ $it"
   }.subscribe(object : Subscriber<String>() {
       override fun onNext(t: String) {
           println("onNext() --> $t")
       }
       override fun onCompleted() {
           println("onCompleted()")
       }
       override fun onError(e: Throwable?) {
           println("onError()")
       }
   })
onNext() --> ¥ 123
onNext() --> ¥ 234
onCompleted()
  • flatMap
Observable.just(123, 234, 345)
        .flatMap {
            Observable.just("$ $it")
        }.subscribe(object : Subscriber<String>() {
            override fun onNext(t: String) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> $ 123
onNext() --> $ 234
onNext() --> $ 345
onCompleted()
  • groupBy
Observable.just(1, 2, 3, 4, 5, 6)
        .groupBy { it % 2 }
        .subscribe(object : Observer<GroupedObservable<Int, Int>> {
            override fun onError(e: Throwable?) {
            }
            override fun onNext(t: GroupedObservable<Int, Int>) {
                t.subscribe(object : Subscriber<Int>() {
                    override fun onNext(r: Int) {
                        println("group -> ${t.key}, value -> $r")
                    }
                    override fun onCompleted() {
                    }
                    override fun onError(e: Throwable?) {
                    }
                })
            }
            override fun onCompleted() {
            }
        })
group -> 1, value -> 1
group -> 0, value -> 2
group -> 1, value -> 3
group -> 0, value -> 4
group -> 1, value -> 5
group -> 0, value -> 6
  • buffer
Observable.range(0, 7)
        .buffer(3)
        .subscribe(object : Subscriber<List<Int>>() {
            override fun onNext(t: List<Int>) {
                println(t)
            }
            override fun onCompleted() {
            }
            override fun onError(e: Throwable?) {
            }
        })
[0, 1, 2]
[3, 4, 5]
[6]
  • scan
Observable.range(1, 5)
        .scan { sum, num -> sum + num }
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("sum = $t")
            }
            override fun onCompleted() {
            }
            override fun onError(e: Throwable?) {
            }
        })
sum = 1
sum = 3
sum = 6
sum = 10
sum = 15

3.3 Filtering 操作符

debounce // 在一定時(shí)間間隔內(nèi)沒有操作,數(shù)據(jù)才會(huì)發(fā)射給觀察者
distinct // 去重
elementAt // 取指定位置的一個(gè)數(shù)據(jù)
filter // 按照指定的規(guī)則進(jìn)行條件的過濾
first // 取第一個(gè)數(shù)據(jù)
last // 取最后一個(gè)數(shù)據(jù)
ignoreElements // 忽略所有數(shù)據(jù),不向觀察者發(fā)送任何數(shù)據(jù),只回調(diào) onCompleted() 或 onError()
sample // 取樣
skip //  跳過
skipLast // 跳過最后幾項(xiàng)
take
takeLast
  • debounce
Observable.create<Int> {
    for (i in 1..10) {
        try {
            it.onNext(i)
            if (i % 2 == 0) {
                Thread.sleep(1000)
            } else {
                Thread.sleep(2000)
            }
        } catch (e: Exception) {
            it.onError(e)
        }
    }
    it.onCompleted()
}
        .subscribeOn(Schedulers.io())
        .debounce(2, TimeUnit.SECONDS)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 3
onNext() --> 5
onNext() --> 7
onNext() --> 9
onNext() --> 10
onCompleted()
  • distinct
Observable.just(1, 2, 3, 4, 2, 3)
        .distinct()
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()

3.4 Combining 操作符(組合)

zip
merge
startWith
combineLatest
join
switchOnNext
  • zip 用來合并兩個(gè) Observable 發(fā)射的數(shù)據(jù)項(xiàng),根據(jù) Func2() 函數(shù)指定的規(guī)則生成一個(gè)新的 Observable 并發(fā)射出去,當(dāng)其中一個(gè) Observable 發(fā)射數(shù)據(jù)結(jié)束或者出現(xiàn)異常后,另一個(gè) Observable 也將停止發(fā)射數(shù)據(jù)
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.zip(observable1, observable2) { o1, o2 ->
    o1 + o2
}.subscribe(object : Subscriber<Int>() {
    override fun onNext(t: Int) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
onNext() --> 11
onNext() --> 22
onNext() --> 33
onCompleted()
  • merge 將兩個(gè) Observable 發(fā)射的數(shù)據(jù)項(xiàng)按照發(fā)射時(shí)間順序合并成一個(gè) Observable 進(jìn)行發(fā)射
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.merge(observable1, observable2)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 10
onNext() --> 20
onNext() --> 30
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()
  • startWith 用于在一個(gè) Observable 發(fā)射數(shù)據(jù)前插入一個(gè) Observable
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.startWith(observable2)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 10
onNext() --> 20
onNext() --> 30
onCompleted()
  • combineLatest 用于將兩個(gè) Observable 發(fā)射的臨近的數(shù)據(jù)項(xiàng)通過 Func2() 函數(shù)指定的規(guī)則組合成一個(gè)新的 Observable
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.combineLatest(observable1, observable2) { o1, o2 ->
    o1 + o2
}.subscribe(object : Subscriber<Int>() {
    override fun onNext(t: Int) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
onNext() --> 31
onNext() --> 32
onNext() --> 33
onNext() --> 34
onCompleted()
  • join
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.join(observable2,
        { Observable.timer(2, TimeUnit.SECONDS) },
        { Observable.timer(0, TimeUnit.SECONDS) },
        { t1, t2 -> Observable.just(t1 + t2) })
        .subscribe(object : Subscriber<Observable<Int>>() {
            override fun onNext(t: Observable<Int>) {
                t.subscribe(object : Subscriber<Int>() {
                    override fun onNext(data: Int) {
                        println("onNext() --> $data")
                    }
                    override fun onCompleted() {
                        println("onCompleted()")
                    }
                    override fun onError(e: Throwable?) {
                        println("onError()")
                    }
                })
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 11
onCompleted()
onNext() --> 21
onCompleted()
onNext() --> 31
onCompleted()
onNext() --> 12
onCompleted()
onNext() --> 22
onCompleted()
onNext() --> 32
onCompleted()
onNext() --> 13
onCompleted()
onNext() --> 23
onCompleted()
onNext() --> 33
onCompleted()
onNext() --> 14
onCompleted()
onNext() --> 24
onCompleted()
onNext() --> 34
onCompleted()
onCompleted()

4. 線程調(diào)度(結(jié)合 RxAndroid)

  1. Schedulers.io() I/O 線程,執(zhí)行耗時(shí)操作
  2. AndroidSchedulers.mainThread() Android 中的UI線程,執(zhí)行UI更新
  3. subscribeOn() 調(diào)度被觀察者運(yùn)行的線程
  4. observeOn() 調(diào)度觀察者運(yùn)行的線程
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容