實(shí)現(xiàn)簡單的 RxKotlin (下)

其他的一些操作符...

doOnNext ( doOnError doOnCompleted)

//Observable.kt
fun doOnNext(action: (t: T) -> Unit): Observable<T> {
    return create(OnSubscribeDoOnNext(this, action))
}

//OnSubscribeDoOnNext.kt
class OnSubscribeDoOnNext<T>(private var source: Observable<T>, private var action: (T) -> Unit) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        source.subscribe(object : Subscriber<T>(){
            override fun onCompleted() {
                subscriber.onCompleted()
            }

            override fun onError(t: Throwable) {
                subscriber.onError(t)
            }

            override fun onNext(t: T) {
                action(t)
                subscriber.onNext(t)
            }
        })
    }
}

compose

fun <R> compose(transformer: (t : Observable<T>) -> Observable<R>): Observable<R> {
    return transformer(this)
}

from(array: Array<T>)

//Observable.kt
companion object {

    fun <T> from(array: Array<T>): Observable<T> {
        return create(OnSubscribeFromArray(array))
    }

}

//OnSubscribeFromArray
class OnSubscribeFromArray<T>(private var array : Array<T>) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        try {
            for (t in array) {
                if (!subscriber.isUnsubscribed())
                    subscriber.onNext(t)
            }
        }catch (e: Exception) {
            subscriber.onError(e)
        }
        subscriber.onCompleted()
    }

}

merge

//Observable.kt
companion object {

    fun <T> merge(source : Observable<Observable<T>>): Observable<T> {
        return source.lift(OperatorMerge())
    }

    fun <T> merge(array: Array<Observable<T>>): Observable<T> {
        return merge(from(array))
    }
}

//OperatorMerge.kt
class OperatorMerge<T> : Observable.Operator<T, Observable<T>> {

    override fun call(subscriber: Subscriber<T>): Subscriber<Observable<T>> {
        return MergeSubscriber(subscriber)
    }

    private class MergeSubscriber<T>(private var actual : Subscriber<T>) : Subscriber<Observable<T>>() {

        override fun onCompleted() {
        }

        override fun onError(t: Throwable) {
        }

        override fun onNext(t: Observable<T>) {
            t.subscribe(actual)
        }

    }
}

flatMap

fun <R> flatMap(func: (t: T) -> Observable<R>): Observable<R>{
    return merge(map(func))
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善閱讀 3,655評(píng)論 0 0
  • 看了許多講解RxJava的文章,有些文章講解的內(nèi)容是基于第一個(gè)版本的,有些文章的講解是通過比較常用的一些API和基...
    開發(fā)者如是說閱讀 41,015評(píng)論 0 52
  • 注:本系列文章主要用于博主個(gè)人學(xué)習(xí)記錄,本文末尾附上了一些較好的文章提供學(xué)習(xí)。轉(zhuǎn)載請(qǐng)附 原文鏈接RxJava學(xué)習(xí)系...
    黑丫山上小旋風(fēng)閱讀 2,255評(píng)論 1 5
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    無求_95dd閱讀 3,514評(píng)論 0 21
  • 今天第一次玩這個(gè),看見很多人都是積極的過著自己人生,看書、寫作、塑立自己的目標(biāo),寫著一路路的難堪或者喜悅。我沒有什...
    胡畔_閱讀 438評(píng)論 0 1

友情鏈接更多精彩內(nèi)容