RxJava 操作符第一波

上一篇文章RxJava造輪子初步的了解了rxjava的簡單原理,更深入的可能還是要去查看源碼了,后續(xù)會(huì)再研究。這個(gè)階段的目標(biāo)是了解使用常用的操作符。

創(chuàng)建操作符

create

含義:使用一個(gè)函數(shù)從頭開始創(chuàng)建observable
實(shí)現(xiàn):

  • create()函數(shù)中傳入一個(gè)接受observer觀察者的函數(shù)Observable.Onsubscrib()

  • Observable.Onsubscriber()函數(shù)中只有一個(gè)call(Subscriber)方法

  • 在call()方法中調(diào)用subscriber.onNext()、onError()、onComplete()

      /**
               * 創(chuàng)建一個(gè)observable
               * 注意: 在create中函數(shù)調(diào)用發(fā)送消息時(shí)候,檢查,是否有觀察者,沒有不發(fā)送消息,減少資源消耗
               * observer.isUnSubscribed()
               */
              fun create() {
                  Observable.create(Observable.OnSubscribe<String> { t ->
                      if (!(t?.isUnsubscribed ?: true)) {
                          try {
                              for (i in 1..10) {
                                  t?.onNext("" + i)
                              }
                              t?.onCompleted()
                          } catch (e: Exception) {
                              e.printStackTrace()
                              t?.onError(e)
                          }
    
                      }
                  })
                          .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
              }
    
from

含義: 將數(shù)組或者iterable、future轉(zhuǎn)化為一個(gè)observable
實(shí)現(xiàn): 產(chǎn)生的observable將iterable、數(shù)組中的每個(gè)item數(shù)據(jù)發(fā)送出去

    /**
             * from 將數(shù)組或者對象,生成新的observable發(fā)送出去
             */
            fun from(){
                Observable.from(arrayOf(1,2,3,4,5))
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())

            }
interval

含義:間接或者無限期的發(fā)送數(shù)據(jù)

    /**
             * Interval:
             *      固定時(shí)間發(fā)送數(shù)據(jù)
             *      初始值為0
             *      無線遞增發(fā)送數(shù)據(jù)
             */
            fun interval(){
                Observable.interval(1,TimeUnit.SECONDS)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
just

含義:發(fā)送單個(gè)或者多個(gè)對象(對多發(fā)送10個(gè)對象)

    /**
             * just
             *      發(fā)送單個(gè)對象
             *      參數(shù)可選,1-10
             *      按照參數(shù)列表發(fā)送數(shù)據(jù)
             */
            fun just(){
                Observable.just(1,2,3,4,5,6,7,8,9,10)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
range

含義: 發(fā)送指定范圍內(nèi)數(shù)據(jù)的observable
如果找不到需要發(fā)送的數(shù)據(jù),如,起始位負(fù)數(shù),發(fā)送數(shù)據(jù)個(gè)數(shù)不足,拋出異常

    /**
             * range: 發(fā)送整數(shù)范圍內(nèi)的有序序列
             *      第一個(gè)參數(shù):整數(shù)的起始數(shù)
             *      第二個(gè)參數(shù):一共要發(fā)送幾個(gè)數(shù)據(jù)
             */
            fun range(){
                Observable.range(3,3)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
repeat

含義:重復(fù)發(fā)送observable中數(shù)據(jù)

    /**
             * repeat
             *      重復(fù)發(fā)送源數(shù)據(jù)源,重復(fù)次數(shù)可設(shè)置
             *      repeat() 表示無限循環(huán)
             *      repeat(5):表示循環(huán)5次
             *
             * 其他循環(huán)操作符
             *      滿足條件循環(huán)
             *      repeatWhen()
             *      doWhile()
             *      whileDo()
             */
            fun repeat(){
                Observable.just(1)
                        .repeat(5)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
timer

含義:在一段時(shí)間之后發(fā)送數(shù)據(jù)

    /**
             * timer
             *      在一定延時(shí)后發(fā)送一條數(shù)據(jù)
             */
            fun timer(){
                Observable.timer(1,TimeUnit.SECONDS)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }

變換操作符

buffer

含義: 可以將一段時(shí)間或者將count個(gè)數(shù)據(jù)打包發(fā)送
: 可以實(shí)現(xiàn)backpress 背壓操作,將快速產(chǎn)生很多數(shù)據(jù),緩存打包發(fā)送

    /**
             * buffer
             *      buffer(3)  三個(gè)為一體,打包發(fā)送
             *      buffer(3,4)
             *              第一個(gè)參數(shù)為count:幾個(gè)數(shù)據(jù)作為一個(gè)打包
             *              第二個(gè)參數(shù)為跳躍:跳躍第一個(gè)值
             */
            fun buffer() {
                Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
    //                    .buffer(3)
                        .buffer(3, 4)
                        .subscribe(Action1<List<Int>> {
                            integers ->
                            integers.forEach {
                                integer ->
                                Log.e(TAG, "" + integer)
                            }

                            Log.e(TAG, "------------------------------------")

                        }, RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
flatMap

含義:
將observable發(fā)送的每一項(xiàng)數(shù)據(jù)進(jìn)行變換操作,轉(zhuǎn)化為多個(gè)observables,再使用merge()將所有observables合并
因?yàn)槭莔erge()合并發(fā)送,所以發(fā)送的順序不是有序的

    /**
             * flatMap
             *      * 合并所有產(chǎn)生的observables,產(chǎn)生的自己數(shù)據(jù)列不能保證順序
             *      1.通過一個(gè)指定的函數(shù)將源數(shù)據(jù)源變化為其他數(shù)據(jù)
             *      2.新建一個(gè)observable發(fā)送變化后的數(shù)據(jù)源
             *      3.merge合并所有產(chǎn)生的observable->放入新的observable一起發(fā)送出去
             *      4.發(fā)送的順序是無序的
             * flatMap(function1,maxCount)
             *      1.第二個(gè)參數(shù):從源數(shù)據(jù)最大同時(shí)訂閱個(gè)數(shù),當(dāng)達(dá)到最大限制,會(huì)等待其中一個(gè)終止在訂閱
             */
            fun flatMap() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
    //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                        .flatMap({
                            t ->
                            var list = arrayListOf(1, 2, 3)
                            Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                        }, 3)
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
concatMap

含義:和flatMap類似,都是將原始o(jì)bservable發(fā)送的每一項(xiàng)數(shù)據(jù)進(jìn)行轉(zhuǎn)化,不同的是,concatMap是按照順序連接每一個(gè)發(fā)送數(shù)據(jù)

  • 就是當(dāng)前一個(gè)數(shù)據(jù)源結(jié)束之后接著下一個(gè)事件的發(fā)送

      /**
               * concatMap
               *      * 按照次序連接生成的observables,然后產(chǎn)生自己的數(shù)據(jù)列
               *      1.和flatMap操作符類似
               *      2.不同的是,嚴(yán)格按照源數(shù)據(jù)的順序發(fā)送數(shù)據(jù)源
               */
              fun concatMap() {
                  Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
      //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                          .concatMap({
                              t ->
                              var list = arrayListOf(t, t, t)
                              Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                          })
                          .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
              }
    
switchMap

含義: 只按最后發(fā)送過來的事件為準(zhǔn),永遠(yuǎn)只監(jiān)聽最后一個(gè)事件

     /**
             * switchMap
             *      只監(jiān)聽當(dāng)前的數(shù)據(jù)
             */
            fun switchMap() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .switchMap({
                            t ->
                            var list = arrayListOf(t, t, t)
                            Observable.from(list).delay(100, TimeUnit.MILLISECONDS)
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
map

含義: 將observable中發(fā)送的源數(shù)據(jù),轉(zhuǎn)化為另一種數(shù)據(jù)

    /**
             * map
             *      1.根據(jù)你指定的函數(shù)將源數(shù)據(jù)源轉(zhuǎn)化為另一種類型
             */
            fun map() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .map {
                            number ->
                            "" + number
                        }
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }

輔助操作符

delay

含義: 1.第一種是對整個(gè)observable的延時(shí),在一段時(shí)間之后再發(fā)送數(shù)據(jù)
2.第二章是對observable中的每一項(xiàng)數(shù)據(jù)發(fā)送之前延時(shí)

    /**
             * delay
             *      延時(shí)一段指定的時(shí)間,再發(fā)送observable數(shù)據(jù)
             *      * 整體發(fā)射時(shí)間延長
             * delay(observable)
             *      *: 發(fā)射的每一項(xiàng)都會(huì)延時(shí)
             *      *: 每一項(xiàng)數(shù)據(jù)都默認(rèn)使用這個(gè)bservable的定時(shí)器
             *
             * delaySubscription(long,timeunit)
             *      *: 延時(shí)訂閱原始的observable
             *      *: 整體的延時(shí)訂閱
             *
             */
            fun delay() {
                Observable.just(1, 2, 3)
    //                    .delay(1,TimeUnit.SECONDS)
    //                    .delay { t ->
    //                        Observable.create<Int> {
    //                            subscriber->
    //                            Thread.sleep(1000)
    //                            subscriber.onNext(t)
    //                            subscriber.onCompleted()
    //                        }
    //                    }
    //                    .delaySubscription(1,TimeUnit.SECONDS)
                        .delaySubscription(object : Func0<Observable<Int>> {
                            override fun call(): Observable<Int> {
                                return Observable.create<Int> {
                                    subscriber ->
                                    Thread.sleep(1000)
                                    subscriber.onNext(1)
                                    subscriber.onCompleted()
                                }
                            }
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
timestamp

含義:為observable中每一個(gè)數(shù)據(jù)源包裝一個(gè)時(shí)間戳,返回Timestamped<T>類型
其中t.timestampMillis獲取發(fā)送這條數(shù)據(jù)的時(shí)間戳
t.value獲取原始發(fā)送數(shù)據(jù)

    /**
             * timestamp
             */
            fun timestamp(){
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .timestamp()
                        .subscribe(object :Action1<Timestamped<Int>>{
                            override fun call(t: Timestamped<Int>?) {
                                Log.e(RxUtil.TAG,""+t?.timestampMillis+"value-"+t?.value)
                            }

                        })
            }
doEtch生命周期

含義: observable的整個(gè)生命周期,在發(fā)送之前調(diào)用一下事件

    /**
             * doEatch
             *      *: 在observable的對于生命周期之前的時(shí)候調(diào)用對應(yīng)代碼
             *  doOnNext:在subscriber->onNext之前調(diào)用
             *  doOnError:在onError->之前調(diào)用
             *  doOnCompleted: 在onComplete->之前調(diào)用
             *  doOnTerminate: observable終止的時(shí)候調(diào)用(無論是否正常終止)
             */
            fun doEatch() {
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
    //                    .doOnNext { Log.e(RxUtil.TAG,"doOnNext-onNext") }
    //                    .doOnTerminate{Log.e(RxUtil.TAG,"doOnTerminate-doOnTerminate")}
    //                    .finallyDo{Log.e(RxUtil.TAG,"finallyDo-finallyDo")}
                        .doOnEach(object : Observer<Int> {
                            override fun onNext(t: Int?) {
                                Log.e(RxUtil.TAG, "doEatch-onNext")
                            }

                            override fun onError(e: Throwable?) {
                                Log.e(RxUtil.TAG, "doEatch-onError")
                            }

                            override fun onCompleted() {
                                Log.e(RxUtil.TAG, "doEatch-onCompleted")
                            }
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
timeout

含義: 在一段時(shí)間之后沒有發(fā)送數(shù)據(jù),有兩種處理
1. 直接拋出異常
2. 運(yùn)行默認(rèn)的observable數(shù)據(jù)
可以結(jié)合onErrorReturn進(jìn)行錯(cuò)誤處理

    /**
             * timeout
             *      *: 超過一段時(shí)間沒有發(fā)送數(shù)據(jù),拋出異常
             *  timeout(long,timeunit,observable)
             *      *: 超過一段時(shí)間,執(zhí)行默認(rèn)的observable
             */
            fun timeout() {
                Observable.create<Int> {
                    subscriber ->
                    Thread.sleep(2000)
                    subscriber.onNext(1)
                    subscriber.onCompleted()
                }
    //                    .timeout(1, TimeUnit.SECONDS)
                        .timeout(1, TimeUnit.SECONDS, Observable.create {
                            subscriber ->
                            subscriber.onNext(-1)
                            subscriber.onCompleted()
                        })
                        .subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
            }
toList

含義:將observable發(fā)送的所有數(shù)據(jù)結(jié)果包裝一個(gè)list集合中,一起發(fā)出去
toSortedList()可以對生成的數(shù)據(jù)進(jìn)行排序
toSortedList(func2())這個(gè)自定義實(shí)現(xiàn)排序還有問題

    /**
             * toList
             *      *: 讓observable將多項(xiàng)數(shù)據(jù)組合成一個(gè)list數(shù)據(jù)返回
             *  toSortedList
             *      *: 可以排序,默認(rèn)自然順序
             */
            fun toList(){
                Observable.from(arrayOf(1, 3, 2, 5, 4, 8, 7, 6))
    //                    .flatMap { t -> Observable.just("flatmap-change==="+t) }
                        .concatMap({
                            t ->
                            var list = arrayListOf(3, 1, 2)
                            Observable.from(list)
                        })
    //                    .toList()
                        .toSortedList()
    //                    .toSortedList(object :Func2<Int,Int,Int>{
    //                        override fun call(t1: Int?, t2: Int?): Int {
    //                            return t2?.toInt()?:0
    //                        }
    //                    })
                        .subscribe(object :Action1<List<Int>>{
                            override fun call(t: List<Int>?) {
                                t?.forEach {
                                    Log.e(RxUtil.TAG,"toList-"+t)
                                }
                            }

                        }, Action1<Throwable> {
                            error->
                            Log.e(RxUtil.TAG,"toList-"+error.toString())
                        })
            }
toMap

含義: 將observable中所有數(shù)據(jù)結(jié)果合并到map中,一起發(fā)送出去

    /**
             * toMap
             *      *: 將原始所有數(shù)據(jù)合并到map中,發(fā)送這個(gè)map
             *
             */
            fun toMap(){
                Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
                        .toMap(object :Func1<Int,Int>{
                            override fun call(t: Int): Int {
                                return 10*t
                            }
                        },object :Func1<Int,String>{
                            override fun call(t: Int?): String {
                                return ""+t
                            }

                        })
                        .subscribe(object :Action1<Map<Int,String>>{
                            override fun call(map: Map<Int,String>?) {
                                map?.forEach {
                                    (key,value)->
                                    Log.e(RxUtil.TAG, "toMap-key-$key-value-$value")
                                }
                            }

                        }, Action1<Throwable> {
                            error->
                            Log.e(RxUtil.TAG,"toMap-"+error.toString())
                        })
            }

本期的操作符暫時(shí)這么多啊,后面還有第二波哦

附上github地址

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容