實(shí)現(xiàn)簡(jiǎn)單的 RxKotlin (中)

線程切換的操作在 Rx 里面非常常用,主要有 subscribeOn observeOn 他們都需要一個(gè) Scheduler 參數(shù),很明顯這個(gè)是接口 可以實(shí)現(xiàn)各種 調(diào)度器。根據(jù)這個(gè)我們可以這樣寫(xiě)...

//Scheduler.kt
abstract class Scheduler {

    abstract fun createWorker(): Worker

    abstract class Worker {
        abstract fun schedule(action: () -> Unit)
    }
}

Worker 是實(shí)際工作的,有個(gè)抽象方法 schedule 需要我們?nèi)?shí)現(xiàn)
Executor 是 Java 里面線程池的執(zhí)行器, Executors 里面有各種已經(jīng)實(shí)現(xiàn)的線程池,至此 我們可以來(lái)實(shí)現(xiàn)一個(gè)ExecutorScheduler

//ExecutorScheduler.kt
class ExecutorScheduler(private var executor: Executor) : Scheduler() {

    override fun createWorker(): Worker {
        return WorkerImpl(executor)
    }

    private class WorkerImpl(private var executor: Executor) : Worker() {

        override fun schedule(action: () -> Unit) {
            executor.execute {
                action()
            }
        }
    }
}

參考 rxjava 建立一個(gè) Schedulers 存放 Scheduler,這樣就直接可以調(diào)用 Schedulers.io()

class Schedulers {
    companion object {

        private val io = ExecutorScheduler(Executors.newSingleThreadExecutor())
       
        fun io(): Scheduler {
            return io
        }
    }
}

實(shí)現(xiàn) Android 特有 的 AndroidScheduler,主要利用 Looper

//LooperScheduler.kt
class LooperScheduler(looper: Looper) : Scheduler() {

    private  var handler : Handler = Handler(looper)

    override fun createWorker(): Worker = LooperSchedulerWorker(handler)

    private class LooperSchedulerWorker(private var handler : Handler) : Worker() {
        override fun schedule(action: () -> Unit) {
            handler.post {
                action()
            }
        }
    }

}
//AndroidSchedulers.kt
class AndroidSchedulers {

    companion object {

        private val mainThread = LooperScheduler(Looper.getMainLooper())

        fun mainThread(): Scheduler {
            return mainThread
        }
    }
}

實(shí)現(xiàn) subscribeOn observeOn

//Observable.kt
fun <R> lift(operator: Operator<R, T>): Observable<R> {
    return create(OnSubscribeLift(onSubscribe!!, operator))
}

fun subscribeOn(scheduler: Scheduler): Observable<T> {
    return create(OperatorSubscribeOn(this, scheduler))
}

fun observeOn(scheduler: Scheduler): Observable<T> {
    return lift(OperatorObserveOn(scheduler))
}

interface Operator<T, R> {
    fun call(subscriber: Subscriber<T>) : Subscriber<R>
}

//OnSubscribeLift.kt
class OnSubscribeLift<T, R>(private var parent: Observable.OnSubscribe<T>, private var operator: Observable.Operator<R, T>) : Observable.OnSubscribe<R>{

    override fun call(subscriber: Subscriber<R>) {
        try {
            val st = operator.call(subscriber)
            st.onStart()
            parent.call(st)
        }catch (e: Exception) {
            subscriber.onError(e)
        }
    }
}

在 rxjava 里面 subscribeOn 只有調(diào)用的第一次起作用,而 observeOn 則是看最近的那次調(diào)用。
原因在于 subscribeOn 調(diào)度的是執(zhí)行 OnSubscribe 因此多次調(diào)用最上游的還是 在 第一次的線程那里執(zhí)行的,除非更改下游 觀察者所在的線程 也就是 observeOn。如果不明白的,可以多看這部分代碼或者 rxjava 的源碼加深理解。

//OperatorSubscribeOn.kt
class OperatorSubscribeOn<T>(private var source: Observable<T> , private var scheduler: Scheduler) : Observable.OnSubscribe<T>{

    override fun call(subscriber: Subscriber<T>) {
        val worker = scheduler.createWorker()
        worker.schedule {
            source.subscribe(SubscribeOnSubscriber(subscriber))
        }
    }

    class SubscribeOnSubscriber<T>(private var actual: Subscriber<T>): Subscriber<T>() {
        override fun onCompleted() {
            actual.onCompleted()
        }

        override fun onError(t: Throwable) {
            actual.onError(t)
        }

        override fun onNext(t: T) {
            actual.onNext(t)
        }
    }
}

//OperatorObserveOn.kt
class  OperatorObserveOn<T>(private var scheduler: Scheduler): Observable.Operator<T, T> {

    override fun call(subscriber: Subscriber<T>): Subscriber<T> {
        val worker = scheduler.createWorker()
        return object : Subscriber<T>(){
            override fun onCompleted() {
                worker.schedule {
                    subscriber.onCompleted()
                }
            }

            override fun onError(t: Throwable) {
                worker.schedule {
                    subscriber.onError(t)
                }
            }

            override fun onNext(t: T) {
                worker.schedule {
                    subscriber.onNext(t)
                }
            }
        }
    }
}
//Test.kt
Observable.just("1", "2", "3")
                .subscribeOn(Schedulers.io())
                .map {
                    it.toInt() + 1
                }
                .filter {
                    it != 1
                }
                .map {
                    it
                }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Subscriber<Int>() {
                    override fun onCompleted() {

                    }

                    override fun onError(t: Throwable) {
                    }

                    override fun onNext(t: Int) {
                        System.out.println(t)
                    }
                })
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容