淺入深出,RxJava實(shí)現(xiàn)原理

rxJava就是觀察者模式的變形增強(qiáng),具體怎么變形增強(qiáng)網(wǎng)上的文章有各種講解,各種比喻,最普遍的是上下游的說法,上游的水通過水管流到下游,源碼里確實(shí)有upStream,downStream的命名,不過水是什么,水管是什么,上下游又怎么連接,有時(shí)候交代不清楚,或者生硬造詞,反倒是對(duì)理解其原理增加了難度,我也不知道怎么精準(zhǔn)的用詞,索性從最簡(jiǎn)單的代碼一步步擴(kuò)展來(lái)理解。
從0到1我設(shè)計(jì)不出來(lái),從1到0我分析一下總可以吧,好的,先看最簡(jiǎn)單的寫法如下:

class Observable {
    fun subscribe(observer:Observer) {
         observer.onNext()
    }
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main(args: Array<String>) {
    Observable().subscribe(Observer())
}

rxJava最基礎(chǔ)的形式是這樣的:

  Observable.create(object : ObservableOnSubscribe<Int> {
            override fun subscribe(emitter: ObservableEmitter<Int>) {
                emitter.onNext(10)
            }
        }).subscribe(object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
            }

            override fun onError(e: Throwable) {
            }

            override fun onComplete() {
            }

            override fun onNext(t: Int) {
                println(t)
            }
        })

下面我們朝著這個(gè)目標(biāo)去變形擴(kuò)展,將Observable的構(gòu)造方法私有化,增加create靜態(tài)方法,返回一個(gè)Observable,如下:

class Observable private constructor() {
    
    fun subscribe(observer:Observer) {
         observer.onNext()
    }
    
    companion object {
        fun create() : Observable {
           return Observable()
        }
    }
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main() {
    Observable.create().subscribe(Observer())
}

再近一步,create方法里面包含了一個(gè)匿名的實(shí)現(xiàn)了ObservableOnSubscribe接口的object,同時(shí)它還是一個(gè)Observable,所以O(shè)bservableCreate應(yīng)該繼承Observable,同時(shí)實(shí)現(xiàn)ObservableOnSubscribe接口,或者讓Observable實(shí)現(xiàn)ObservableOnSubscribe接口,ObservableCreate的類繼承Observable后自然也實(shí)現(xiàn)了ObservableOnSubscribe接口。翻看源碼是讓Observable實(shí)現(xiàn)ObservableOnSubscribe接口,所以這里新增一個(gè)ObservableOnSubscribe接口,里面有一個(gè)subscribe方法,把Observable改為抽象類并實(shí)現(xiàn)它,同時(shí)新增一個(gè)ObservableCreate的類繼承Observable。那么既然傳過來(lái)這個(gè)object是個(gè)Observable,create方法是直接return這個(gè)object嗎,好像也行,但是翻看源碼是聲明了成員變量,接住他,為什么多此一舉我們后面再說。修改下ObservableCreate的構(gòu)造方法,聲明一個(gè)變量接住它。

interface ObservableOnSubscribe {
    fun subscribe(observer : Observer)
}

abstract class Observable : ObservableOnSubscribe {
    
    override fun subscribe(observer : Observer) {
         observer.onNext()
    }
    
    companion object {
        fun create(source : ObservableOnSubscribe) : Observable {
           return ObservableCreate(source)
        }
    }
    
}

class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
    
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main() {
    Observable.create(object : ObservableOnSubscribe {
        override fun subscribe(emitter: Observer) {
            
        }
    }).subscribe(Observer())
}

相對(duì)于目標(biāo)代碼,就差在subscribe方法中調(diào)用emitter.onNext(10)了,這里ObservableCreate沒有重寫Observable的subscribe方法,導(dǎo)致抽象類Observable調(diào)用具體方法執(zhí)行了具體的通知操作,好像不太符合面向?qū)ο蟮脑O(shè)計(jì)思想。所以這里增加一個(gè)抽象方法,命名subscribeActual,讓子類ObservableCreate重寫這個(gè)方法,Observable只是起到承上啟下的作用,不做具體的邏輯。

interface ObservableOnSubscribe {
    fun subscribe(observer : Observer)
}

abstract class Observable  : ObservableOnSubscribe {
    
    fun subscribe(observer : Observer) {
         subscribeActual(observer)
    }
    
    abstract fun subscribeActual(observer: Observer)
    
    companion object {
        fun create(source : ObservableOnSubscribe) : Observable {
           return ObservableCreate(source)
        }
    }
    
}

class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
    override fun subscribeActual(observer : Observer) {
        observer.onNext(10)
    }
}

class Observer {
    fun onNext() {
        println("receive msg")
    }
}

fun main() {
    Observable.create(object : ObservableOnSubscribe {
        override fun subscribe(emitter: Observer) {
            
        }
    }).subscribe(Observer())
}

到這我們發(fā)現(xiàn)還是差一步啊,再分析下,這里observer.onNext(10)其實(shí)還是在ObservableCreate的subscribe方法執(zhí)行的,而目標(biāo)代碼其實(shí)要在實(shí)現(xiàn)了ObservableOnSubscribe的那個(gè)object的subscribe中執(zhí)行,在subscribeActual里面調(diào)用source即object的subscribe方法,object的subscribe方法執(zhí)行observer.onNext(10)

class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
    override fun subscribeActual(observer : Observer) {
        source.subscribe(observer)
    }
}

fun main() {
    Observable.create(object : ObservableOnSubscribe {
        override fun subscribe(emitter: Observer) {
            emitter.onNext(10)
        }
    }).subscribe(Observer())
}

是不是差不多了,再給Observable加上泛型,支持發(fā)送任意類型的數(shù)據(jù),把類Observer改為接口,增加剩下的方法

interface ObservableOnSubscribe<T> {
    fun subscribe(observer : Observer<T>)
}

abstract class Observable<T> : ObservableOnSubscribe<T>{
    
    fun subscribe(observer : Observer<T>) {
         subscribeActual(observer)
    }
    
    abstract fun subscribeActual(observer: Observer<T>)
    
    companion object {
        fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
           return ObservableCreate(source)
        }
    }
    
}

class ObservableCreate<T> (private val source:ObservableOnSubscribe<T>) : Observable<T>() {
    override fun subscribeActual(observer : Observer<T>) {
        source.subscribe(observer)
    }
}

interface Observer<T> {
    fun onSubscribe()
    fun onNext(t : T)
    fun onError(t:Throwable)
    fun onComplete()
}

fun main() {
    Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: Observer<Int>) {
            emitter.onNext(10)
        }
    }).subscribe(object: Observer<Int> {
        
        override fun onSubscribe() {
            
        }
        
        override fun onNext(t:Int) {
            println("value:$t")
        }
        
        override fun onError(t:Throwable) {
            
        }
        
        override fun onComplete() {
            
        }
    })
}

這樣第一個(gè)基礎(chǔ)的擴(kuò)展就完成了,那如果實(shí)現(xiàn)一個(gè)操作符呢,這里選取最基礎(chǔ)的map操作符,我們知道m(xù)ap的作用是映射,將輸入的某種類型的值通過map函數(shù)轉(zhuǎn)化為另外一種類型的值,也有可能類型是一樣的,要看這個(gè)map函數(shù)怎么寫了

實(shí)現(xiàn)map操作符

在Observable里面增加一個(gè)map函數(shù)

abstract class Observable<T> : ObservableOnSubscribe<T> {
    
    override fun subscribe(observer : Observer<T>) {
         subscribeActual(observer)
    }
    
    abstract fun subscribeActual(observer: Observer<T>)
    
    companion object {
        fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
           return ObservableCreate(source)
        }
    }
    
    fun <R> map(func: (T) -> R): Observable<R> {
        return ObservableMap<T,R>(this, func)
    }
}

可以看到這里返回了一個(gè)ObservableMap,接收兩個(gè)參數(shù),一個(gè)是調(diào)用該函數(shù)的Observable,一個(gè)是map函數(shù),這里是將T類型轉(zhuǎn)為R類型,看返回值ObservableMap是一個(gè)R類型的Observable,說明這個(gè)操作符將上游發(fā)送的T類型經(jīng)過map轉(zhuǎn)化為R類型后接著向下游發(fā)送數(shù)據(jù),最終到達(dá)Observer,所以O(shè)bserver收到的數(shù)據(jù)為R類型,但是我們的source發(fā)送的數(shù)據(jù)是T類型,所以subscribeActual里面source.subscribe的observer類型是T類型,這里新增一個(gè)MapObserver內(nèi)部類,包裹了Observer<R>,MapObserver的onNext調(diào)用時(shí),執(zhí)行observer.onNext(func(t)),最終發(fā)給下游轉(zhuǎn)化為R類型的數(shù)據(jù)。
新增一個(gè)ObservableMap類

class ObservableMap<T, R>(
    private val source: ObservableOnSubscribe<T>, private val func: (T) -> R
) : Observable<R>() {

    override fun subscribeActual(observer: Observer<R>) {
        source.subscribe(MapObserver(observer, func))
    }

    private class MapObserver<T, R>(private val observer: Observer<R>, private val func: (T) -> R) : Observer<T> {
        
         override fun onSubscribe() {
            observer.onSubscribe()
         }
        
        override fun onNext(t: T) {
            observer.onNext(func(t))
        }

        override fun onError(t:Throwable) {
            observer.onError(t)
        }
        
        override fun onComplete() {
            observer.onComplete()
        }
    }
}

最終調(diào)用如下:

fun main() {
    Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: Observer<Int>) {
            emitter.onNext(10)
        }
    }).map {
        "value:$it"
    }.subscribe(object: Observer<String> {
        
        override fun onSubscribe() {
            
        }
        
        override fun onNext(t:String) {
            println("value:$t")
        }
        
        override fun onError(t:Throwable) {
            
        }
        
        override fun onComplete() {
            
        }
    })
}

到此,map操作符也實(shí)現(xiàn)了,那么再添加其他操作符是不是也是一個(gè)套路呢

實(shí)現(xiàn)線程切換

可以發(fā)現(xiàn)實(shí)現(xiàn)其他操作符也是同一個(gè)套路,下面看下切換線程subscribeOn的實(shí)現(xiàn)過程:
在Observable里面增加一個(gè)subscribeOn函數(shù):

abstract class Observable<T> : ObservableOnSubscribe<T> {

    override fun subscribe(observer: Observer<T>) {
        subscribeActual(observer)
    }

    abstract fun subscribeActual(observer: Observer<T>)


    companion object {
        fun <T> create(source:ObservableOnSubscribe<T>): Observable<T> {
            return ObservableCreate(source)
        }
    }

    fun <R> map(func: (T) -> R): Observable<R> {
        return MapObservable(this, func)
    }

    fun subscribeOn(scheduler: Scheduler): Observable<T> {
        return ObservableSubscribeOn(this, scheduler)
    }
}

可以看到這里返回了一個(gè)ObservableSubscribeOn,接收兩個(gè)參數(shù),一個(gè)是調(diào)用該函數(shù)的Observable,一個(gè)是Scheduler類型的變量,這個(gè)變量就是用于切換線程的。新建ObservableSubscribeOn如下:

class ObservableSubscribeOn<T>(
    private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {

    override fun subscribeActual(observer: Observer<T>) {
        scheduler.scheduleDirect {
            source.subscribe(SubscribeOnObserver(observer))
        }
    }

    private class SubscribeOnObserver<T>(private val observer: Observer<T>) : Observer<T> {

        override fun onNext(t: T) {
            observer.onNext(t)
        }
    }
}

新建Scheduler抽象類,包含scheduleDirect方法,抽象靜態(tài)內(nèi)部類Worker表示工作線程,抽象方法createWorker:

abstract class Scheduler {

    fun scheduleDirect(runnable: Runnable) {
        val worker = createWorker()
        worker.schedule(runnable)
    }

    abstract fun createWorker(): Worker

    abstract class Worker {
        abstract fun schedule(runnable: Runnable)
    }
}

接下來(lái)可以實(shí)現(xiàn)一個(gè)具體的Scheduler,命名IoScheduler:

import java.util.concurrent.Executors

class IoScheduler : Scheduler() {

    companion object {
        private val executor = Executors.newCachedThreadPool()
    }

    override fun createWorker(): Worker {
        return EventLoopWorker()
    }

    private inner class EventLoopWorker : Worker() {
        override fun schedule(runnable: Runnable) {
            executor.submit(runnable)
        }
    }
}

回頭看下所謂線程切換,就是讓訂閱的代碼執(zhí)行在一個(gè)線程里面,如下

 scheduler.scheduleDirect {
        source.subscribe(SubscribeOnObserver(observer))
 }

調(diào)用一下看看呢,這里為了保持和源碼基本一致的寫法,在Scheduler類里面添加一個(gè)靜態(tài)方法,用于生成IoScheduler,如下:

abstract class Scheduler {

    companion object {
        fun io(): IoScheduler {
            return IoScheduler()
        }
    }
    ...  //后面的代碼同上
}

調(diào)用一下:

 Observable.create(object : ObservableOnSubscribe<Int> {
            override fun subscribe(observer: Observer<Int>) {
                observer.onNext(10)
            }
        }).map {
            "value:$it"
        }.subscribeOn(Scheduler.io()).subscribe(object : Observer<String> {
            override fun onNext(t: String) {
                Log.d(tag, "$t,thread:${Thread.currentThread().name}")
            }
        })

結(jié)果如下:

D  value:10,thread:pool-2-thread-1

這里是線程池里面一個(gè)線程的名字,我們也可以直接使用線程Thread類,執(zhí)行Runnable,給Thread設(shè)置一個(gè)名字,修改IoScheduler,線程命名worker-thread看下:

class IoScheduler : Scheduler() {

    override fun createWorker(): Worker {
        return EventLoopWorker()
    }

    private inner class EventLoopWorker : Worker() {
        override fun schedule(runnable: Runnable) {
             val t = Thread(runnable)
             t.name = "worker-thread"
             t.start()
        }
    }
}

執(zhí)行結(jié)果:

 D  value:10,thread:worker-thread

好的,現(xiàn)在已經(jīng)實(shí)現(xiàn)了subscribeOn階段的線程切換,我們知道還有observerOn的線程切換,那又是怎么寫呢?
新增observerOn操作符:

abstract class Observable<T> : ObservableOnSubscribe<T> {
     
    ... // 以上代碼省略

    fun observerOn(scheduler: Scheduler): Observable<T> {
        return ObservableObserverOn(this, scheduler)
    }
}

新建ObservableObserverOn:

class ObservableObserverOn<T>(
    private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {
    override fun subscribeActual(observer: Observer<T>) {
        val w = scheduler.createWorker()
        source.subscribe(ObserverOnObserver(observer, w))
    }

    private class ObserverOnObserver<T>(
        private val observer: Observer<T>, val w: Scheduler.Worker
    ) : Observer<T> {

        override fun onNext(t: T) {
            w.schedule {
                observer.onNext(t)
            }
        }
    }
}

這里和ObservableSubscribeOn不同的是,訂閱過程沒有在新的線程執(zhí)行,而是把發(fā)送消息放到了worker的schedule方法里面,那這時(shí)候我想切換到主線程怎么辦呢,繼續(xù)仿寫,新增AndroidSchedulers,里面返回一個(gè)HandlerScheduler,而這個(gè)類就是借助主線程的handler,把消息發(fā)送到主線程的消息隊(duì)列的:

import android.os.Handler
import android.os.Looper

class AndroidSchedulers {

    companion object {
        private val DEFAULT = 
        HandlerScheduler(Handler(Looper.getMainLooper()))

        fun mainThread(): Scheduler {
            return DEFAULT
        }
    }
}

再看HandlerScheduler怎么寫?

import android.os.Handler
import android.os.Message

class HandlerScheduler(private val handler: Handler) : Scheduler() {

    private class HandlerWorker(private val handler: Handler) : Worker() {
        override fun schedule(runnable: Runnable) {
            handler.sendMessage(Message.obtain(handler, runnable))
        }
    }

    override fun createWorker(): Worker {
        return HandlerWorker(handler)
    }
}

可以看到就是利用handler,把runnable包裝成Message對(duì)象發(fā)送到了主線程的消息隊(duì)列,runnable里面的代碼observer.onNext(t)自然運(yùn)行在主線程,進(jìn)而完成了切換的過程。
調(diào)用一下試試:

Observable.create(object : ObservableOnSubscribe<Int> {
            override fun subscribe(observer: Observer<Int>) {
                observer.onNext(10)
            }
        }).map {
            Log.d(tag, "map in,thread:${Thread.currentThread().name}")
            "value:$it"
     }.subscribeOn(Scheduler.io()).observerOn(AndroidSchedulers.mainThread()).subscribe(object : Observer<String> {
            override fun onNext(t: String) {
                Log.d(tag, "$t,thread:${Thread.currentThread().name}")
            }
        })

這里map操作符在subscribeOn之前,里面的打印應(yīng)該在IoScheduler獲取的線程執(zhí)行,而onNext里面的打印應(yīng)該在主線程執(zhí)行。
結(jié)果:

 D  map in,thread:pool-2-thread-1
 D  value:10,thread:main
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容