上一篇文章RxJava造輪子初步的了解了rxjava的簡單原理,更深入的可能還是要去查看源碼了,后續(xù)會(huì)再研究。這個(gè)階段的目標(biāo)是了解使用常用的操作符。
創(chuàng)建操作符
create
含義:使用一個(gè)函數(shù)從頭開始創(chuàng)建observable
實(shí)現(xiàn):
create()函數(shù)中傳入一個(gè)接受observer觀察者的函數(shù)Observable.Onsubscrib()
Observable.Onsubscriber()函數(shù)中只有一個(gè)call(Subscriber)方法
-
在call()方法中調(diào)用subscriber.onNext()、onError()、onComplete()
/** * 創(chuàng)建一個(gè)observable * 注意: 在create中函數(shù)調(diào)用發(fā)送消息時(shí)候,檢查,是否有觀察者,沒有不發(fā)送消息,減少資源消耗 * observer.isUnSubscribed() */ fun create() { Observable.create(Observable.OnSubscribe<String> { t -> if (!(t?.isUnsubscribed ?: true)) { try { for (i in 1..10) { t?.onNext("" + i) } t?.onCompleted() } catch (e: Exception) { e.printStackTrace() t?.onError(e) } } }) .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0()) }
from
含義: 將數(shù)組或者iterable、future轉(zhuǎn)化為一個(gè)observable
實(shí)現(xiàn): 產(chǎn)生的observable將iterable、數(shù)組中的每個(gè)item數(shù)據(jù)發(fā)送出去
/**
* from 將數(shù)組或者對象,生成新的observable發(fā)送出去
*/
fun from(){
Observable.from(arrayOf(1,2,3,4,5))
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
interval
含義:間接或者無限期的發(fā)送數(shù)據(jù)
/**
* Interval:
* 固定時(shí)間發(fā)送數(shù)據(jù)
* 初始值為0
* 無線遞增發(fā)送數(shù)據(jù)
*/
fun interval(){
Observable.interval(1,TimeUnit.SECONDS)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
just
含義:發(fā)送單個(gè)或者多個(gè)對象(對多發(fā)送10個(gè)對象)
/**
* just
* 發(fā)送單個(gè)對象
* 參數(shù)可選,1-10
* 按照參數(shù)列表發(fā)送數(shù)據(jù)
*/
fun just(){
Observable.just(1,2,3,4,5,6,7,8,9,10)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
range
含義: 發(fā)送指定范圍內(nèi)數(shù)據(jù)的observable
如果找不到需要發(fā)送的數(shù)據(jù),如,起始位負(fù)數(shù),發(fā)送數(shù)據(jù)個(gè)數(shù)不足,拋出異常
/**
* range: 發(fā)送整數(shù)范圍內(nèi)的有序序列
* 第一個(gè)參數(shù):整數(shù)的起始數(shù)
* 第二個(gè)參數(shù):一共要發(fā)送幾個(gè)數(shù)據(jù)
*/
fun range(){
Observable.range(3,3)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
repeat
含義:重復(fù)發(fā)送observable中數(shù)據(jù)
/**
* repeat
* 重復(fù)發(fā)送源數(shù)據(jù)源,重復(fù)次數(shù)可設(shè)置
* repeat() 表示無限循環(huán)
* repeat(5):表示循環(huán)5次
*
* 其他循環(huán)操作符
* 滿足條件循環(huán)
* repeatWhen()
* doWhile()
* whileDo()
*/
fun repeat(){
Observable.just(1)
.repeat(5)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
timer
含義:在一段時(shí)間之后發(fā)送數(shù)據(jù)
/**
* timer
* 在一定延時(shí)后發(fā)送一條數(shù)據(jù)
*/
fun timer(){
Observable.timer(1,TimeUnit.SECONDS)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
變換操作符
buffer
含義: 可以將一段時(shí)間或者將count個(gè)數(shù)據(jù)打包發(fā)送
: 可以實(shí)現(xiàn)backpress 背壓操作,將快速產(chǎn)生很多數(shù)據(jù),緩存打包發(fā)送
/**
* buffer
* buffer(3) 三個(gè)為一體,打包發(fā)送
* buffer(3,4)
* 第一個(gè)參數(shù)為count:幾個(gè)數(shù)據(jù)作為一個(gè)打包
* 第二個(gè)參數(shù)為跳躍:跳躍第一個(gè)值
*/
fun buffer() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
// .buffer(3)
.buffer(3, 4)
.subscribe(Action1<List<Int>> {
integers ->
integers.forEach {
integer ->
Log.e(TAG, "" + integer)
}
Log.e(TAG, "------------------------------------")
}, RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
flatMap
含義:
將observable發(fā)送的每一項(xiàng)數(shù)據(jù)進(jìn)行變換操作,轉(zhuǎn)化為多個(gè)observables,再使用merge()將所有observables合并
因?yàn)槭莔erge()合并發(fā)送,所以發(fā)送的順序不是有序的
/**
* flatMap
* * 合并所有產(chǎn)生的observables,產(chǎn)生的自己數(shù)據(jù)列不能保證順序
* 1.通過一個(gè)指定的函數(shù)將源數(shù)據(jù)源變化為其他數(shù)據(jù)
* 2.新建一個(gè)observable發(fā)送變化后的數(shù)據(jù)源
* 3.merge合并所有產(chǎn)生的observable->放入新的observable一起發(fā)送出去
* 4.發(fā)送的順序是無序的
* flatMap(function1,maxCount)
* 1.第二個(gè)參數(shù):從源數(shù)據(jù)最大同時(shí)訂閱個(gè)數(shù),當(dāng)達(dá)到最大限制,會(huì)等待其中一個(gè)終止在訂閱
*/
fun flatMap() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
// .flatMap { t -> Observable.just("flatmap-change==="+t) }
.flatMap({
t ->
var list = arrayListOf(1, 2, 3)
Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
}, 3)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
concatMap
含義:和flatMap類似,都是將原始o(jì)bservable發(fā)送的每一項(xiàng)數(shù)據(jù)進(jìn)行轉(zhuǎn)化,不同的是,concatMap是按照順序連接每一個(gè)發(fā)送數(shù)據(jù)
-
就是當(dāng)前一個(gè)數(shù)據(jù)源結(jié)束之后接著下一個(gè)事件的發(fā)送
/** * concatMap * * 按照次序連接生成的observables,然后產(chǎn)生自己的數(shù)據(jù)列 * 1.和flatMap操作符類似 * 2.不同的是,嚴(yán)格按照源數(shù)據(jù)的順序發(fā)送數(shù)據(jù)源 */ fun concatMap() { Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8)) // .flatMap { t -> Observable.just("flatmap-change==="+t) } .concatMap({ t -> var list = arrayListOf(t, t, t) Observable.from(list).delay(100, TimeUnit.MILLISECONDS) }) .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0()) }
switchMap
含義: 只按最后發(fā)送過來的事件為準(zhǔn),永遠(yuǎn)只監(jiān)聽最后一個(gè)事件
/**
* switchMap
* 只監(jiān)聽當(dāng)前的數(shù)據(jù)
*/
fun switchMap() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.switchMap({
t ->
var list = arrayListOf(t, t, t)
Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
map
含義: 將observable中發(fā)送的源數(shù)據(jù),轉(zhuǎn)化為另一種數(shù)據(jù)
/**
* map
* 1.根據(jù)你指定的函數(shù)將源數(shù)據(jù)源轉(zhuǎn)化為另一種類型
*/
fun map() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.map {
number ->
"" + number
}
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
輔助操作符
delay
含義: 1.第一種是對整個(gè)observable的延時(shí),在一段時(shí)間之后再發(fā)送數(shù)據(jù)
2.第二章是對observable中的每一項(xiàng)數(shù)據(jù)發(fā)送之前延時(shí)
/**
* delay
* 延時(shí)一段指定的時(shí)間,再發(fā)送observable數(shù)據(jù)
* * 整體發(fā)射時(shí)間延長
* delay(observable)
* *: 發(fā)射的每一項(xiàng)都會(huì)延時(shí)
* *: 每一項(xiàng)數(shù)據(jù)都默認(rèn)使用這個(gè)bservable的定時(shí)器
*
* delaySubscription(long,timeunit)
* *: 延時(shí)訂閱原始的observable
* *: 整體的延時(shí)訂閱
*
*/
fun delay() {
Observable.just(1, 2, 3)
// .delay(1,TimeUnit.SECONDS)
// .delay { t ->
// Observable.create<Int> {
// subscriber->
// Thread.sleep(1000)
// subscriber.onNext(t)
// subscriber.onCompleted()
// }
// }
// .delaySubscription(1,TimeUnit.SECONDS)
.delaySubscription(object : Func0<Observable<Int>> {
override fun call(): Observable<Int> {
return Observable.create<Int> {
subscriber ->
Thread.sleep(1000)
subscriber.onNext(1)
subscriber.onCompleted()
}
}
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
timestamp
含義:為observable中每一個(gè)數(shù)據(jù)源包裝一個(gè)時(shí)間戳,返回Timestamped<T>類型
其中t.timestampMillis獲取發(fā)送這條數(shù)據(jù)的時(shí)間戳
t.value獲取原始發(fā)送數(shù)據(jù)
/**
* timestamp
*/
fun timestamp(){
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.timestamp()
.subscribe(object :Action1<Timestamped<Int>>{
override fun call(t: Timestamped<Int>?) {
Log.e(RxUtil.TAG,""+t?.timestampMillis+"value-"+t?.value)
}
})
}
doEtch生命周期
含義: observable的整個(gè)生命周期,在發(fā)送之前調(diào)用一下事件
/**
* doEatch
* *: 在observable的對于生命周期之前的時(shí)候調(diào)用對應(yīng)代碼
* doOnNext:在subscriber->onNext之前調(diào)用
* doOnError:在onError->之前調(diào)用
* doOnCompleted: 在onComplete->之前調(diào)用
* doOnTerminate: observable終止的時(shí)候調(diào)用(無論是否正常終止)
*/
fun doEatch() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
// .doOnNext { Log.e(RxUtil.TAG,"doOnNext-onNext") }
// .doOnTerminate{Log.e(RxUtil.TAG,"doOnTerminate-doOnTerminate")}
// .finallyDo{Log.e(RxUtil.TAG,"finallyDo-finallyDo")}
.doOnEach(object : Observer<Int> {
override fun onNext(t: Int?) {
Log.e(RxUtil.TAG, "doEatch-onNext")
}
override fun onError(e: Throwable?) {
Log.e(RxUtil.TAG, "doEatch-onError")
}
override fun onCompleted() {
Log.e(RxUtil.TAG, "doEatch-onCompleted")
}
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
timeout
含義: 在一段時(shí)間之后沒有發(fā)送數(shù)據(jù),有兩種處理
1. 直接拋出異常
2. 運(yùn)行默認(rèn)的observable數(shù)據(jù)
可以結(jié)合onErrorReturn進(jìn)行錯(cuò)誤處理
/**
* timeout
* *: 超過一段時(shí)間沒有發(fā)送數(shù)據(jù),拋出異常
* timeout(long,timeunit,observable)
* *: 超過一段時(shí)間,執(zhí)行默認(rèn)的observable
*/
fun timeout() {
Observable.create<Int> {
subscriber ->
Thread.sleep(2000)
subscriber.onNext(1)
subscriber.onCompleted()
}
// .timeout(1, TimeUnit.SECONDS)
.timeout(1, TimeUnit.SECONDS, Observable.create {
subscriber ->
subscriber.onNext(-1)
subscriber.onCompleted()
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
toList
含義:將observable發(fā)送的所有數(shù)據(jù)結(jié)果包裝一個(gè)list集合中,一起發(fā)出去
toSortedList()可以對生成的數(shù)據(jù)進(jìn)行排序
toSortedList(func2())這個(gè)自定義實(shí)現(xiàn)排序還有問題
/**
* toList
* *: 讓observable將多項(xiàng)數(shù)據(jù)組合成一個(gè)list數(shù)據(jù)返回
* toSortedList
* *: 可以排序,默認(rèn)自然順序
*/
fun toList(){
Observable.from(arrayOf(1, 3, 2, 5, 4, 8, 7, 6))
// .flatMap { t -> Observable.just("flatmap-change==="+t) }
.concatMap({
t ->
var list = arrayListOf(3, 1, 2)
Observable.from(list)
})
// .toList()
.toSortedList()
// .toSortedList(object :Func2<Int,Int,Int>{
// override fun call(t1: Int?, t2: Int?): Int {
// return t2?.toInt()?:0
// }
// })
.subscribe(object :Action1<List<Int>>{
override fun call(t: List<Int>?) {
t?.forEach {
Log.e(RxUtil.TAG,"toList-"+t)
}
}
}, Action1<Throwable> {
error->
Log.e(RxUtil.TAG,"toList-"+error.toString())
})
}
toMap
含義: 將observable中所有數(shù)據(jù)結(jié)果合并到map中,一起發(fā)送出去
/**
* toMap
* *: 將原始所有數(shù)據(jù)合并到map中,發(fā)送這個(gè)map
*
*/
fun toMap(){
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.toMap(object :Func1<Int,Int>{
override fun call(t: Int): Int {
return 10*t
}
},object :Func1<Int,String>{
override fun call(t: Int?): String {
return ""+t
}
})
.subscribe(object :Action1<Map<Int,String>>{
override fun call(map: Map<Int,String>?) {
map?.forEach {
(key,value)->
Log.e(RxUtil.TAG, "toMap-key-$key-value-$value")
}
}
}, Action1<Throwable> {
error->
Log.e(RxUtil.TAG,"toMap-"+error.toString())
})
}
本期的操作符暫時(shí)這么多啊,后面還有第二波哦
附上github地址