基于rxjava2 & kotlin實現(xiàn)的RxBus

這是一個基于rxjava2實現(xiàn)的事件總線類,可以方便的使用關鍵字來發(fā)送和訂閱信息。

代碼

class RxBus {
    companion object {
        private val _instance: RxBus by lazy { RxBus() }
        val instance get() = _instance
    }

    data class RxMsg(val action: String, val event: Any)

    private val bus: FlowableProcessor<RxMsg> by lazy { PublishProcessor.create<RxMsg>().toSerialized() }
    private val map = mutableMapOf<Any, MutableMap<String, MutableList<Disposable>>>()

    fun <T : Any> post(a: String, o: T) = SerializedSubscriber(bus).onNext(RxMsg(a, o))

    fun <T> flowable(clazz: Class<T>,
                     action: String,
                     scheduler: Scheduler = AndroidSchedulers.mainThread()): Flowable<T> {
        return bus.ofType(RxMsg::class.java).filter {
            it.action == action && clazz.isInstance(it.event)
        }.map { clazz.cast(it.event) }.observeOn(scheduler)
    }

    inline fun <reified T> flowable(action: String,
                                    scheduler: Scheduler = AndroidSchedulers.mainThread()): Flowable<T> =
            flowable(T::class.java, action, scheduler)

    fun <T> subscribe(clazz: Class<T>,
                      target: Any,
                      action: String,
                      scheduler: Scheduler = AndroidSchedulers.mainThread(),
                      call: (T) -> Unit): Disposable =
            flowable(clazz, action, scheduler).subscribe { call(it) }.also { obs ->
                map.getOrPut(target, { mutableMapOf() }).getOrPut(action, { mutableListOf() }).add(obs)
            }

    inline fun <reified T> subscribe(target: Any,
                                     action: String,
                                     scheduler: Scheduler = AndroidSchedulers.mainThread(),
                                     noinline call: (T) -> Unit): Disposable =
            subscribe(T::class.java, target, action, scheduler, call)

    fun unsubscribe(target: Any, action: String? = null) {
        map[target]?.let {
            if (action != null) it.remove(action)?.onEach { it.dispose() }
            else it.onEach { it.value.forEach { it.dispose() } }.clear()
            if (it.isEmpty()) map.remove(target)
        }
    }
}

使用

  • 在指定對象上按關鍵字訂閱, 可以按指定關鍵字(action)和類型(String)接收數(shù)據(jù)。
RxBus.instance.subscribe<String>(this, "action") { println(it) }
  • 按關鍵字取消訂閱
RxBus.instance.unsubscribe(this, "action")
  • 取消指定對象上的全部訂閱
RxBus.instance.unsubscribe(this)
  • 發(fā)送消息,會自動發(fā)送指定類型數(shù)據(jù)給訂閱者。
RxBus.instance.post("action", "msg")
  • 也可不依賴于一個對象,這樣需要手工調(diào)用 dispose() 取消訂閱,或者使用RxLifecycle管理訂閱。
val disposable = RxBus.instance.flowable<String>("action").observeOn(AndroidSchedulers.mainThread()).subscribe {
    println(it)
}
disposable.dispose()

示例

override fun onCreate(state: Bundle?) {
    super.onCreate(state)
    setContentView(R.layout.activity_main)
    RxBus.instance.subscribe<String>(this, "click") { 
        Toast.makeText(this, it, Toast.LENGTH_SHORT).show()
    }
    findViewById<Button>(R.id.button).setOnClickListener {
        RxBus.instance.post("click", "test")
    }
}
override fun onDestroy() {
    super.onDestroy()
    RxBus.instance.unsubscribe(this, "day_night")
}

應用 https://github.com/yueeng/meitu

aitaotu
meitulu
meituri
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容