rxJava就是觀察者模式的變形增強(qiáng),具體怎么變形增強(qiáng)網(wǎng)上的文章有各種講解,各種比喻,最普遍的是上下游的說法,上游的水通過水管流到下游,源碼里確實(shí)有upStream,downStream的命名,不過水是什么,水管是什么,上下游又怎么連接,有時(shí)候交代不清楚,或者生硬造詞,反倒是對(duì)理解其原理增加了難度,我也不知道怎么精準(zhǔn)的用詞,索性從最簡(jiǎn)單的代碼一步步擴(kuò)展來(lái)理解。
從0到1我設(shè)計(jì)不出來(lái),從1到0我分析一下總可以吧,好的,先看最簡(jiǎn)單的寫法如下:
class Observable {
fun subscribe(observer:Observer) {
observer.onNext()
}
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main(args: Array<String>) {
Observable().subscribe(Observer())
}
rxJava最基礎(chǔ)的形式是這樣的:
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: ObservableEmitter<Int>) {
emitter.onNext(10)
}
}).subscribe(object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
override fun onNext(t: Int) {
println(t)
}
})
下面我們朝著這個(gè)目標(biāo)去變形擴(kuò)展,將Observable的構(gòu)造方法私有化,增加create靜態(tài)方法,返回一個(gè)Observable,如下:
class Observable private constructor() {
fun subscribe(observer:Observer) {
observer.onNext()
}
companion object {
fun create() : Observable {
return Observable()
}
}
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main() {
Observable.create().subscribe(Observer())
}
再近一步,create方法里面包含了一個(gè)匿名的實(shí)現(xiàn)了ObservableOnSubscribe接口的object,同時(shí)它還是一個(gè)Observable,所以O(shè)bservableCreate應(yīng)該繼承Observable,同時(shí)實(shí)現(xiàn)ObservableOnSubscribe接口,或者讓Observable實(shí)現(xiàn)ObservableOnSubscribe接口,ObservableCreate的類繼承Observable后自然也實(shí)現(xiàn)了ObservableOnSubscribe接口。翻看源碼是讓Observable實(shí)現(xiàn)ObservableOnSubscribe接口,所以這里新增一個(gè)ObservableOnSubscribe接口,里面有一個(gè)subscribe方法,把Observable改為抽象類并實(shí)現(xiàn)它,同時(shí)新增一個(gè)ObservableCreate的類繼承Observable。那么既然傳過來(lái)這個(gè)object是個(gè)Observable,create方法是直接return這個(gè)object嗎,好像也行,但是翻看源碼是聲明了成員變量,接住他,為什么多此一舉我們后面再說。修改下ObservableCreate的構(gòu)造方法,聲明一個(gè)變量接住它。
interface ObservableOnSubscribe {
fun subscribe(observer : Observer)
}
abstract class Observable : ObservableOnSubscribe {
override fun subscribe(observer : Observer) {
observer.onNext()
}
companion object {
fun create(source : ObservableOnSubscribe) : Observable {
return ObservableCreate(source)
}
}
}
class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main() {
Observable.create(object : ObservableOnSubscribe {
override fun subscribe(emitter: Observer) {
}
}).subscribe(Observer())
}
相對(duì)于目標(biāo)代碼,就差在subscribe方法中調(diào)用emitter.onNext(10)了,這里ObservableCreate沒有重寫Observable的subscribe方法,導(dǎo)致抽象類Observable調(diào)用具體方法執(zhí)行了具體的通知操作,好像不太符合面向?qū)ο蟮脑O(shè)計(jì)思想。所以這里增加一個(gè)抽象方法,命名subscribeActual,讓子類ObservableCreate重寫這個(gè)方法,Observable只是起到承上啟下的作用,不做具體的邏輯。
interface ObservableOnSubscribe {
fun subscribe(observer : Observer)
}
abstract class Observable : ObservableOnSubscribe {
fun subscribe(observer : Observer) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer)
companion object {
fun create(source : ObservableOnSubscribe) : Observable {
return ObservableCreate(source)
}
}
}
class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
override fun subscribeActual(observer : Observer) {
observer.onNext(10)
}
}
class Observer {
fun onNext() {
println("receive msg")
}
}
fun main() {
Observable.create(object : ObservableOnSubscribe {
override fun subscribe(emitter: Observer) {
}
}).subscribe(Observer())
}
到這我們發(fā)現(xiàn)還是差一步啊,再分析下,這里observer.onNext(10)其實(shí)還是在ObservableCreate的subscribe方法執(zhí)行的,而目標(biāo)代碼其實(shí)要在實(shí)現(xiàn)了ObservableOnSubscribe的那個(gè)object的subscribe中執(zhí)行,在subscribeActual里面調(diào)用source即object的subscribe方法,object的subscribe方法執(zhí)行observer.onNext(10)
class ObservableCreate (private val source:ObservableOnSubscribe) : Observable() {
override fun subscribeActual(observer : Observer) {
source.subscribe(observer)
}
}
fun main() {
Observable.create(object : ObservableOnSubscribe {
override fun subscribe(emitter: Observer) {
emitter.onNext(10)
}
}).subscribe(Observer())
}
是不是差不多了,再給Observable加上泛型,支持發(fā)送任意類型的數(shù)據(jù),把類Observer改為接口,增加剩下的方法
interface ObservableOnSubscribe<T> {
fun subscribe(observer : Observer<T>)
}
abstract class Observable<T> : ObservableOnSubscribe<T>{
fun subscribe(observer : Observer<T>) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer<T>)
companion object {
fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
return ObservableCreate(source)
}
}
}
class ObservableCreate<T> (private val source:ObservableOnSubscribe<T>) : Observable<T>() {
override fun subscribeActual(observer : Observer<T>) {
source.subscribe(observer)
}
}
interface Observer<T> {
fun onSubscribe()
fun onNext(t : T)
fun onError(t:Throwable)
fun onComplete()
}
fun main() {
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: Observer<Int>) {
emitter.onNext(10)
}
}).subscribe(object: Observer<Int> {
override fun onSubscribe() {
}
override fun onNext(t:Int) {
println("value:$t")
}
override fun onError(t:Throwable) {
}
override fun onComplete() {
}
})
}
這樣第一個(gè)基礎(chǔ)的擴(kuò)展就完成了,那如果實(shí)現(xiàn)一個(gè)操作符呢,這里選取最基礎(chǔ)的map操作符,我們知道m(xù)ap的作用是映射,將輸入的某種類型的值通過map函數(shù)轉(zhuǎn)化為另外一種類型的值,也有可能類型是一樣的,要看這個(gè)map函數(shù)怎么寫了
實(shí)現(xiàn)map操作符
在Observable里面增加一個(gè)map函數(shù)
abstract class Observable<T> : ObservableOnSubscribe<T> {
override fun subscribe(observer : Observer<T>) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer<T>)
companion object {
fun <T> create(source : ObservableOnSubscribe<T>) : Observable<T> {
return ObservableCreate(source)
}
}
fun <R> map(func: (T) -> R): Observable<R> {
return ObservableMap<T,R>(this, func)
}
}
可以看到這里返回了一個(gè)ObservableMap,接收兩個(gè)參數(shù),一個(gè)是調(diào)用該函數(shù)的Observable,一個(gè)是map函數(shù),這里是將T類型轉(zhuǎn)為R類型,看返回值ObservableMap是一個(gè)R類型的Observable,說明這個(gè)操作符將上游發(fā)送的T類型經(jīng)過map轉(zhuǎn)化為R類型后接著向下游發(fā)送數(shù)據(jù),最終到達(dá)Observer,所以O(shè)bserver收到的數(shù)據(jù)為R類型,但是我們的source發(fā)送的數(shù)據(jù)是T類型,所以subscribeActual里面source.subscribe的observer類型是T類型,這里新增一個(gè)MapObserver內(nèi)部類,包裹了Observer<R>,MapObserver的onNext調(diào)用時(shí),執(zhí)行observer.onNext(func(t)),最終發(fā)給下游轉(zhuǎn)化為R類型的數(shù)據(jù)。
新增一個(gè)ObservableMap類
class ObservableMap<T, R>(
private val source: ObservableOnSubscribe<T>, private val func: (T) -> R
) : Observable<R>() {
override fun subscribeActual(observer: Observer<R>) {
source.subscribe(MapObserver(observer, func))
}
private class MapObserver<T, R>(private val observer: Observer<R>, private val func: (T) -> R) : Observer<T> {
override fun onSubscribe() {
observer.onSubscribe()
}
override fun onNext(t: T) {
observer.onNext(func(t))
}
override fun onError(t:Throwable) {
observer.onError(t)
}
override fun onComplete() {
observer.onComplete()
}
}
}
最終調(diào)用如下:
fun main() {
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: Observer<Int>) {
emitter.onNext(10)
}
}).map {
"value:$it"
}.subscribe(object: Observer<String> {
override fun onSubscribe() {
}
override fun onNext(t:String) {
println("value:$t")
}
override fun onError(t:Throwable) {
}
override fun onComplete() {
}
})
}
到此,map操作符也實(shí)現(xiàn)了,那么再添加其他操作符是不是也是一個(gè)套路呢
實(shí)現(xiàn)線程切換
可以發(fā)現(xiàn)實(shí)現(xiàn)其他操作符也是同一個(gè)套路,下面看下切換線程subscribeOn的實(shí)現(xiàn)過程:
在Observable里面增加一個(gè)subscribeOn函數(shù):
abstract class Observable<T> : ObservableOnSubscribe<T> {
override fun subscribe(observer: Observer<T>) {
subscribeActual(observer)
}
abstract fun subscribeActual(observer: Observer<T>)
companion object {
fun <T> create(source:ObservableOnSubscribe<T>): Observable<T> {
return ObservableCreate(source)
}
}
fun <R> map(func: (T) -> R): Observable<R> {
return MapObservable(this, func)
}
fun subscribeOn(scheduler: Scheduler): Observable<T> {
return ObservableSubscribeOn(this, scheduler)
}
}
可以看到這里返回了一個(gè)ObservableSubscribeOn,接收兩個(gè)參數(shù),一個(gè)是調(diào)用該函數(shù)的Observable,一個(gè)是Scheduler類型的變量,這個(gè)變量就是用于切換線程的。新建ObservableSubscribeOn如下:
class ObservableSubscribeOn<T>(
private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {
override fun subscribeActual(observer: Observer<T>) {
scheduler.scheduleDirect {
source.subscribe(SubscribeOnObserver(observer))
}
}
private class SubscribeOnObserver<T>(private val observer: Observer<T>) : Observer<T> {
override fun onNext(t: T) {
observer.onNext(t)
}
}
}
新建Scheduler抽象類,包含scheduleDirect方法,抽象靜態(tài)內(nèi)部類Worker表示工作線程,抽象方法createWorker:
abstract class Scheduler {
fun scheduleDirect(runnable: Runnable) {
val worker = createWorker()
worker.schedule(runnable)
}
abstract fun createWorker(): Worker
abstract class Worker {
abstract fun schedule(runnable: Runnable)
}
}
接下來(lái)可以實(shí)現(xiàn)一個(gè)具體的Scheduler,命名IoScheduler:
import java.util.concurrent.Executors
class IoScheduler : Scheduler() {
companion object {
private val executor = Executors.newCachedThreadPool()
}
override fun createWorker(): Worker {
return EventLoopWorker()
}
private inner class EventLoopWorker : Worker() {
override fun schedule(runnable: Runnable) {
executor.submit(runnable)
}
}
}
回頭看下所謂線程切換,就是讓訂閱的代碼執(zhí)行在一個(gè)線程里面,如下
scheduler.scheduleDirect {
source.subscribe(SubscribeOnObserver(observer))
}
調(diào)用一下看看呢,這里為了保持和源碼基本一致的寫法,在Scheduler類里面添加一個(gè)靜態(tài)方法,用于生成IoScheduler,如下:
abstract class Scheduler {
companion object {
fun io(): IoScheduler {
return IoScheduler()
}
}
... //后面的代碼同上
}
調(diào)用一下:
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(observer: Observer<Int>) {
observer.onNext(10)
}
}).map {
"value:$it"
}.subscribeOn(Scheduler.io()).subscribe(object : Observer<String> {
override fun onNext(t: String) {
Log.d(tag, "$t,thread:${Thread.currentThread().name}")
}
})
結(jié)果如下:
D value:10,thread:pool-2-thread-1
這里是線程池里面一個(gè)線程的名字,我們也可以直接使用線程Thread類,執(zhí)行Runnable,給Thread設(shè)置一個(gè)名字,修改IoScheduler,線程命名worker-thread看下:
class IoScheduler : Scheduler() {
override fun createWorker(): Worker {
return EventLoopWorker()
}
private inner class EventLoopWorker : Worker() {
override fun schedule(runnable: Runnable) {
val t = Thread(runnable)
t.name = "worker-thread"
t.start()
}
}
}
執(zhí)行結(jié)果:
D value:10,thread:worker-thread
好的,現(xiàn)在已經(jīng)實(shí)現(xiàn)了subscribeOn階段的線程切換,我們知道還有observerOn的線程切換,那又是怎么寫呢?
新增observerOn操作符:
abstract class Observable<T> : ObservableOnSubscribe<T> {
... // 以上代碼省略
fun observerOn(scheduler: Scheduler): Observable<T> {
return ObservableObserverOn(this, scheduler)
}
}
新建ObservableObserverOn:
class ObservableObserverOn<T>(
private val source: ObservableOnSubscribe<T>, private val scheduler: Scheduler
) : Observable<T>() {
override fun subscribeActual(observer: Observer<T>) {
val w = scheduler.createWorker()
source.subscribe(ObserverOnObserver(observer, w))
}
private class ObserverOnObserver<T>(
private val observer: Observer<T>, val w: Scheduler.Worker
) : Observer<T> {
override fun onNext(t: T) {
w.schedule {
observer.onNext(t)
}
}
}
}
這里和ObservableSubscribeOn不同的是,訂閱過程沒有在新的線程執(zhí)行,而是把發(fā)送消息放到了worker的schedule方法里面,那這時(shí)候我想切換到主線程怎么辦呢,繼續(xù)仿寫,新增AndroidSchedulers,里面返回一個(gè)HandlerScheduler,而這個(gè)類就是借助主線程的handler,把消息發(fā)送到主線程的消息隊(duì)列的:
import android.os.Handler
import android.os.Looper
class AndroidSchedulers {
companion object {
private val DEFAULT =
HandlerScheduler(Handler(Looper.getMainLooper()))
fun mainThread(): Scheduler {
return DEFAULT
}
}
}
再看HandlerScheduler怎么寫?
import android.os.Handler
import android.os.Message
class HandlerScheduler(private val handler: Handler) : Scheduler() {
private class HandlerWorker(private val handler: Handler) : Worker() {
override fun schedule(runnable: Runnable) {
handler.sendMessage(Message.obtain(handler, runnable))
}
}
override fun createWorker(): Worker {
return HandlerWorker(handler)
}
}
可以看到就是利用handler,把runnable包裝成Message對(duì)象發(fā)送到了主線程的消息隊(duì)列,runnable里面的代碼observer.onNext(t)自然運(yùn)行在主線程,進(jìn)而完成了切換的過程。
調(diào)用一下試試:
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(observer: Observer<Int>) {
observer.onNext(10)
}
}).map {
Log.d(tag, "map in,thread:${Thread.currentThread().name}")
"value:$it"
}.subscribeOn(Scheduler.io()).observerOn(AndroidSchedulers.mainThread()).subscribe(object : Observer<String> {
override fun onNext(t: String) {
Log.d(tag, "$t,thread:${Thread.currentThread().name}")
}
})
這里map操作符在subscribeOn之前,里面的打印應(yīng)該在IoScheduler獲取的線程執(zhí)行,而onNext里面的打印應(yīng)該在主線程執(zhí)行。
結(jié)果:
D map in,thread:pool-2-thread-1
D value:10,thread:main