RxJava (RxKotlin) and RxAndroid
PS: All code in this article is written in Kotlin, so RxKotlin must be imported; some RxAndroid content is covered as well.
How to import:
- Add the RxKotlin and RxAndroid version numbers to the project's build.gradle file
buildscript {
    ext.rx_kotlin_version = '1.0.0'
    ext.rx_android_version = '1.2.1'
}
- Add the RxKotlin and RxAndroid dependencies to the module's build.gradle file
dependencies {
    // RxKotlin RxAndroid
    implementation "io.reactivex:rxkotlin:$rx_kotlin_version"
    implementation "io.reactivex:rxandroid:$rx_android_version"
}
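1. Some commonly used websites
2. The four elements of the observer pattern
- Observable: the subject being observed
- Observer: the observer
- subscribe: the subscription that connects the two
- Events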
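As a minimal sketch of how the four elements fit together (assuming the RxJava 1.x `rx.*` packages pulled in by the dependencies above; the function name `collectEvents` is purely illustrative), the following records every event that an Observer receives:

```kotlin
import rx.Observable
import rx.Observer

// Hypothetical helper: wires the four elements together and returns a log of the events.
fun collectEvents(): List<String> {
    val log = mutableListOf<String>()

    // Observable: the source that emits events.
    val observable: Observable<String> = Observable.just("A", "B")

    // Observer: reacts to the three kinds of events.
    val observer = object : Observer<String> {
        override fun onNext(t: String) { log.add("onNext($t)") }
        override fun onCompleted() { log.add("onCompleted()") }
        override fun onError(e: Throwable?) { log.add("onError()") }
    }

    // subscribe: connects the Observer to the Observable; just() then emits synchronously.
    observable.subscribe(observer)
    return log
}

fun main() {
    println(collectEvents()) // [onNext(A), onNext(B), onCompleted()]
}
```

Nothing is emitted until subscribe() is called, which is why the log is only filled after the last step.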
3. Operators
3.1 Creating operators
create
just
from
range
repeat
interval
defer
empty / never
timer
start
- The create operator builds an Observable from scratch: the lambda receives the Subscriber and calls its onNext(), onCompleted(), and onError() methods directly
Observable.create<String> {
    it.onNext("Hello Rx!")
    it.onCompleted()
}.subscribe(object : Subscriber<String>() {
    override fun onNext(t: String) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
Output:
onNext() --> Hello Rx!
onCompleted()
- The just operator emits a series of objects one by one; note that a collection is emitted as a single item
Observable.just(1, 1.0, "String", true)
    .subscribe(object : Subscriber<Any>() {
        override fun onNext(t: Any) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
// The whole list arrives in a single onNext() call.
Observable.just(listOf(1, 2, 3, 4, 5))
    .subscribe(object : Subscriber<List<Int>>() {
        override fun onNext(t: List<Int>) {
            t.forEach { println("onNext() --> $it") }
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output (both snippets):
onNext() --> 1
onNext() --> 1.0
onNext() --> String
onNext() --> true
onCompleted()
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 5
onCompleted()
- The from operator emits the elements of a collection one by one
Observable.from(listOf(5, 4, 3, 2, 1, 0))
    .subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> 5
onNext() --> 4
onNext() --> 3
onNext() --> 2
onNext() --> 1
onNext() --> 0
onCompleted()
- range emits a run of consecutive integers to the observer; its arguments are the start value and the number of items. repeat re-emits the sequence and, when called without an argument, repeats indefinitely
Observable.range(1, 3)
    .repeat(2)
    .subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 1
onNext() --> 2
onNext() --> 3
onCompleted()
- interval periodically emits an increasing Long to the observer, starting from 0; here the first value arrives after an initial delay of 2 seconds and the rest follow every 2 seconds
Observable.interval(2, 2, TimeUnit.SECONDS)
    .subscribe(object : Subscriber<Long>() {
        override fun onNext(t: Long) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> 0
onNext() --> 1
onNext() --> 2
onNext() --> 3
...
- defer postpones creating the Observable: a new Observable is created only when subscribe() is called, so it sees the value arg holds at that moment
var arg = "初始值"
val observable = Observable.defer { Observable.just(arg) }
arg = "再次賦值"
observable.subscribe(object : Subscriber<String>() {
    override fun onNext(t: String) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
Output:
onNext() --> 再次賦值
onCompleted()
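The list above also names empty, timer, and the unbounded form of repeat, none of which is demonstrated. The following is a small sketch of their behavior, assuming RxJava 1.x; the function names are illustrative, and toBlocking() is used only to collect the results synchronously. never is left out because it emits nothing and never terminates, so collecting it would block forever.

```kotlin
import rx.Observable
import java.util.concurrent.TimeUnit

// empty() emits no items and completes immediately.
fun emptyDemo(): List<Int> =
    Observable.empty<Int>().toList().toBlocking().single()

// timer() emits a single 0L after the given delay, then completes.
fun timerDemo(): List<Long> =
    Observable.timer(100, TimeUnit.MILLISECONDS).toList().toBlocking().single()

// repeat() without an argument repeats indefinitely, so take() is used to bound it.
fun repeatDemo(): List<Int> =
    Observable.just(1, 2).repeat().take(5).toList().toBlocking().single()

fun main() {
    println(emptyDemo())  // []
    println(timerDemo())  // [0]
    println(repeatDemo()) // [1, 2, 1, 2, 1]
}
```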
3.2 Transforming operators
map
flatMap
groupBy
buffer
scan
window
map
Observable.just(123, 234).map {
    "¥ $it"
}.subscribe(object : Subscriber<String>() {
    override fun onNext(t: String) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
Output:
onNext() --> ¥ 123
onNext() --> ¥ 234
onCompleted()
flatMap
Observable.just(123, 234, 345)
    .flatMap {
        Observable.just("$ $it")
    }.subscribe(object : Subscriber<String>() {
        override fun onNext(t: String) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> $ 123
onNext() --> $ 234
onNext() --> $ 345
onCompleted()
groupBy
Observable.just(1, 2, 3, 4, 5, 6)
    .groupBy { it % 2 } // key 1 for odd numbers, key 0 for even numbers
    .subscribe(object : Observer<GroupedObservable<Int, Int>> {
        override fun onError(e: Throwable?) {}
        override fun onNext(t: GroupedObservable<Int, Int>) {
            // Each group is itself an Observable and needs its own subscriber.
            t.subscribe(object : Subscriber<Int>() {
                override fun onNext(r: Int) {
                    println("group -> ${t.key}, value -> $r")
                }
                override fun onCompleted() {}
                override fun onError(e: Throwable?) {}
            })
        }
        override fun onCompleted() {}
    })
Output:
group -> 1, value -> 1
group -> 0, value -> 2
group -> 1, value -> 3
group -> 0, value -> 4
group -> 1, value -> 5
group -> 0, value -> 6
buffer
Observable.range(0, 7)
    .buffer(3) // collect items into lists of up to 3
    .subscribe(object : Subscriber<List<Int>>() {
        override fun onNext(t: List<Int>) {
            println(t)
        }
        override fun onCompleted() {}
        override fun onError(e: Throwable?) {}
    })
Output:
[0, 1, 2]
[3, 4, 5]
[6]
scan
Observable.range(1, 5)
    .scan { sum, num -> sum + num } // emits every intermediate running total
    .subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("sum = $t")
        }
        override fun onCompleted() {}
        override fun onError(e: Throwable?) {}
    })
Output:
sum = 1
sum = 3
sum = 6
sum = 10
sum = 15
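window is listed above but not demonstrated. It behaves like buffer, except that each chunk is delivered as an Observable rather than a List. The sketch below assumes RxJava 1.x; the function name `windowDemo` is illustrative, and each window is collapsed into a list only to make the grouping visible.

```kotlin
import rx.Observable

// window(3) splits the stream into Observables of up to 3 items each;
// flatMap + toList() turns every window into a List so it can be inspected.
fun windowDemo(): List<List<Int>> =
    Observable.range(0, 7)
        .window(3)
        .flatMap { it.toList() }
        .toList()
        .toBlocking()
        .single()

fun main() {
    println(windowDemo()) // [[0, 1, 2], [3, 4, 5], [6]]
}
```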
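3.3 Filtering operators
debounce // emits an item only if a given time span passes without another emission
distinct // removes duplicates
elementAt // takes the single item at the given index
filter // keeps only the items that satisfy the given predicate
first // takes the first item
last // takes the last item
ignoreElements // drops all items and emits none to the observer; only onCompleted() or onError() is called
sample // sampling: periodically emits the most recent item
skip // skips the first n items
skipLast // skips the last n items
take // takes the first n items
takeLast // takes the last n items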
debounce
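Observable.create<Int> {
    for (i in 1..10) {
        try {
            it.onNext(i)
            // Even items are followed by a 1 s pause, odd items by a 2 s pause.
            if (i % 2 == 0) {
                Thread.sleep(1000)
            } else {
                Thread.sleep(2000)
            }
        } catch (e: Exception) {
            it.onError(e)
        }
    }
    it.onCompleted()
}
    .subscribeOn(Schedulers.io())
    .debounce(2, TimeUnit.SECONDS) // drop an item if another one follows within 2 s
    .subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output (the pause after each odd item only just reaches the 2-second window, so exact results can vary with timing; the last item, 10, is flushed when the source completes):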
onNext() --> 1
onNext() --> 3
onNext() --> 5
onNext() --> 7
onNext() --> 9
onNext() --> 10
onCompleted()
distinct
Observable.just(1, 2, 3, 4, 2, 3)
    .distinct()
    .subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()
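The remaining filtering operators from the list can be compared side by side. This is a sketch under the assumption of RxJava 1.x; `blockingList` and `filteringDemo` are hypothetical helpers introduced here for illustration, not part of the library.

```kotlin
import rx.Observable

// Hypothetical helper: blocks until the Observable completes and returns its items.
fun <T> Observable<T>.blockingList(): List<T> = toList().toBlocking().single()

// Applies several filtering operators to the same source (1..6).
fun filteringDemo(): Map<String, List<Int>> {
    val source = Observable.range(1, 6) // cold: emits 1..6 to every subscriber
    return linkedMapOf(
        "filter" to source.filter { it % 2 == 0 }.blockingList(),
        "take" to source.take(2).blockingList(),
        "takeLast" to source.takeLast(2).blockingList(),
        "skip" to source.skip(4).blockingList(),
        "skipLast" to source.skipLast(4).blockingList(),
        "elementAt" to source.elementAt(2).blockingList(),
        "first" to source.first().blockingList(),
        "last" to source.last().blockingList(),
        "ignoreElements" to source.ignoreElements().blockingList()
    )
}

fun main() {
    filteringDemo().forEach { (name, items) -> println("$name -> $items") }
    // filter -> [2, 4, 6]
    // take -> [1, 2]
    // takeLast -> [5, 6]
    // skip -> [5, 6]
    // skipLast -> [1, 2]
    // elementAt -> [3]
    // first -> [1]
    // last -> [6]
    // ignoreElements -> []
}
```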
3.4 Combining operators
zip
merge
startWith
combineLatest
join
switchOnNext
- zip combines the items emitted by two Observables pairwise, using the rule given by the Func2 function, and emits the results as a new Observable; once either Observable completes or fails, no further items are taken from the other
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.zip(observable1, observable2) { o1, o2 ->
    o1 + o2
}.subscribe(object : Subscriber<Int>() {
    override fun onNext(t: Int) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
Output:
onNext() --> 11
onNext() --> 22
onNext() --> 33
onCompleted()
- merge combines the items emitted by two Observables into a single Observable, in the order in which they are emitted
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.merge(observable1, observable2)
    .subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> 10
onNext() --> 20
onNext() --> 30
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()
- startWith inserts the items of another Observable before an Observable starts emitting its own
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.startWith(observable2)
    .subscribe(object : Subscriber<Int>() {
        override fun onNext(t: Int) {
            println("onNext() --> $t")
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 10
onNext() --> 20
onNext() --> 30
onCompleted()
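- combineLatest combines the most recent items of two Observables using the rule given by the Func2 function, and emits a result each time either source emits once both have emitted at least one item. Because both sources here emit synchronously, observable1 has already reached its last item (30) before observable2 emits, which explains the results below
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.combineLatest(observable1, observable2) { o1, o2 ->
    o1 + o2
}.subscribe(object : Subscriber<Int>() {
    override fun onNext(t: Int) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
Output: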
onNext() --> 31
onNext() --> 32
onNext() --> 33
onNext() --> 34
onCompleted()
join
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.join(observable2,
    { Observable.timer(2, TimeUnit.SECONDS) }, // how long each left item stays valid
    { Observable.timer(0, TimeUnit.SECONDS) }, // how long each right item stays valid
    { t1, t2 -> Observable.just(t1 + t2) })    // combines every overlapping pair
    .subscribe(object : Subscriber<Observable<Int>>() {
        override fun onNext(t: Observable<Int>) {
            t.subscribe(object : Subscriber<Int>() {
                override fun onNext(data: Int) {
                    println("onNext() --> $data")
                }
                override fun onCompleted() {
                    println("onCompleted()")
                }
                override fun onError(e: Throwable?) {
                    println("onError()")
                }
            })
        }
        override fun onCompleted() {
            println("onCompleted()")
        }
        override fun onError(e: Throwable?) {
            println("onError()")
        }
    })
Output:
onNext() --> 11
onCompleted()
onNext() --> 21
onCompleted()
onNext() --> 31
onCompleted()
onNext() --> 12
onCompleted()
onNext() --> 22
onCompleted()
onNext() --> 32
onCompleted()
onNext() --> 13
onCompleted()
onNext() --> 23
onCompleted()
onNext() --> 33
onCompleted()
onNext() --> 14
onCompleted()
onNext() --> 24
onCompleted()
onNext() --> 34
onCompleted()
onCompleted()
4. Thread scheduling (with RxAndroid)
- Schedulers.io(): I/O threads, for time-consuming operations
- AndroidSchedulers.mainThread(): the Android UI thread, for UI updates
- subscribeOn(): sets the thread on which the Observable (the source) runs
- observeOn(): sets the thread on which the Observer runs
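The effect of subscribeOn() and observeOn() can be made visible by recording thread names. The sketch below assumes RxJava 1.x and runs on a plain JVM, so Schedulers.computation() stands in for AndroidSchedulers.mainThread(); in an Android app the observeOn() call would typically receive AndroidSchedulers.mainThread() instead. The function name `threadNames` is illustrative.

```kotlin
import rx.Observable
import rx.schedulers.Schedulers

// Returns (thread that ran the upstream map, thread that ran the downstream map).
fun threadNames(): Pair<String, String> =
    Observable.just(Unit)
        .subscribeOn(Schedulers.io())        // the source and the operators above observeOn run on an I/O thread
        .map { Thread.currentThread().name }
        .observeOn(Schedulers.computation()) // everything below runs on a computation thread
        .map { upstream -> upstream to Thread.currentThread().name }
        .toBlocking()
        .single()

fun main() {
    val (upstream, downstream) = threadNames()
    println("upstream ran on:   $upstream")
    println("downstream ran on: $downstream")
}
```

The exact thread names depend on the RxJava version, but the two values differ from each other and from the calling thread.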