實現(xiàn)簡單的 RxKotlin (下)

其他的一些操作符...

doOnNext ( doOnError doOnCompleted)

//Observable.kt
fun doOnNext(action: (t: T) -> Unit): Observable<T> {
    return create(OnSubscribeDoOnNext(this, action))
}

//OnSubscribeDoOnNext.kt
class OnSubscribeDoOnNext<T>(private var source: Observable<T>, private var action: (T) -> Unit) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        source.subscribe(object : Subscriber<T>(){
            override fun onCompleted() {
                subscriber.onCompleted()
            }

            override fun onError(t: Throwable) {
                subscriber.onError(t)
            }

            override fun onNext(t: T) {
                action(t)
                subscriber.onNext(t)
            }
        })
    }
}

compose

fun <R> compose(transformer: (t : Observable<T>) -> Observable<R>): Observable<R> {
    return transformer(this)
}

from(array: Array<T>)

//Observable.kt
companion object {

    fun <T> from(array: Array<T>): Observable<T> {
        return create(OnSubscribeFromArray(array))
    }

}

//OnSubscribeFromArray
class OnSubscribeFromArray<T>(private var array : Array<T>) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        try {
            for (t in array) {
                if (!subscriber.isUnsubscribed())
                    subscriber.onNext(t)
            }
        }catch (e: Exception) {
            subscriber.onError(e)
        }
        subscriber.onCompleted()
    }

}

merge

//Observable.kt
companion object {

    fun <T> merge(source : Observable<Observable<T>>): Observable<T> {
        return source.lift(OperatorMerge())
    }

    fun <T> merge(array: Array<Observable<T>>): Observable<T> {
        return merge(from(array))
    }
}

//OperatorMerge.kt
class OperatorMerge<T> : Observable.Operator<T, Observable<T>> {

    override fun call(subscriber: Subscriber<T>): Subscriber<Observable<T>> {
        return MergeSubscriber(subscriber)
    }

    private class MergeSubscriber<T>(private var actual : Subscriber<T>) : Subscriber<Observable<T>>() {

        override fun onCompleted() {
        }

        override fun onError(t: Throwable) {
        }

        override fun onNext(t: Observable<T>) {
            t.subscribe(actual)
        }

    }
}

flatMap

fun <R> flatMap(func: (t: T) -> Observable<R>): Observable<R>{
    return merge(map(func))
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善閱讀 3,646評論 0 0
  • 看了許多講解RxJava的文章,有些文章講解的內容是基于第一個版本的,有些文章的講解是通過比較常用的一些API和基...
    開發(fā)者如是說閱讀 41,013評論 0 52
  • 注:本系列文章主要用于博主個人學習記錄,本文末尾附上了一些較好的文章提供學習。轉載請附 原文鏈接RxJava學習系...
    黑丫山上小旋風閱讀 2,246評論 1 5
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    無求_95dd閱讀 3,482評論 0 21
  • 今天第一次玩這個,看見很多人都是積極的過著自己人生,看書、寫作、塑立自己的目標,寫著一路路的難堪或者喜悅。我沒有什...
    胡畔_閱讀 431評論 0 1

友情鏈接更多精彩內容