干掉RxJava系列--2. 手寫FlowBus替代RxBus/EventBus/LiveDataBus

LiveData的不足

  • LiveData 是一個(gè)專用于 Android 的具備自主生命周期感知能力的可觀察的數(shù)據(jù)存儲(chǔ)器類,被有意簡(jiǎn)化設(shè)計(jì),這使得開發(fā)者很容易上手,但其不足有如下兩點(diǎn):
  1. LiveData只能在主線程更新數(shù)據(jù)(postValue底層也是切換到主線程的,而且可能會(huì)有丟數(shù)據(jù)的問題);
  2. LiveData操作符不夠強(qiáng)大, 對(duì)于較為復(fù)雜的交互數(shù)據(jù)流場(chǎng)景,建議使用 RxJava 或 Flow;
  3. LiveData與Android平臺(tái)緊密相連,雖然LiveData在表現(xiàn)層中運(yùn)行良好,但它并不適合領(lǐng)域?qū)?,因?yàn)轭I(lǐng)域?qū)幼詈檬仟?dú)立于平臺(tái)的;
  • LiveData 對(duì)于 Java 開發(fā)者、初學(xué)者或是一些簡(jiǎn)單場(chǎng)景而言仍是可行的解決方案。對(duì)于MVVM架構(gòu)而言,View和ViewModel之間可以通過LiveData交互(看了下面就知道其實(shí)也可以用StateFlow), ViewModel和Repository之間就可以通過Flow交互;

RxJava的不足

  • RxJava還是相當(dāng)強(qiáng)大的,基于事件流的鏈?zhǔn)秸{(diào)用,進(jìn)行耗時(shí)任務(wù),線程切換,是一個(gè)很好的異步操作庫(kù), 但是對(duì)于Android開發(fā)來說其也有一些不足之處
  1. 強(qiáng)大意味著復(fù)雜,其繁多的操作符簡(jiǎn)直是初學(xué)者的噩夢(mèng);
  2. 它是非官方的,google自然也就不會(huì)花大力氣去推廣和優(yōu)化;
  3. 為項(xiàng)目的包體積帶來了額外的增加;

Flow

  • Flow 是一種 "冷流"(Cold Stream)。"冷流" 是一種數(shù)據(jù)源,該類數(shù)據(jù)源的生產(chǎn)者會(huì)在每個(gè)監(jiān)聽者開始消費(fèi)事件的時(shí)候執(zhí)行(即不消費(fèi)則不生產(chǎn)數(shù)據(jù),而LiveData的發(fā)送端并不依賴于接收端),從而在每個(gè)訂閱上創(chuàng)建新的數(shù)據(jù)流(有多個(gè)訂閱者的時(shí)候,他們各自的事件是獨(dú)立的)。一旦消費(fèi)者停止監(jiān)聽或者生產(chǎn)者的阻塞結(jié)束,數(shù)據(jù)流將會(huì)被自動(dòng)關(guān)閉。
  • Flow 是 Kotlin 協(xié)程與響應(yīng)式編程模型結(jié)合的產(chǎn)物,支持線程切換、背壓,通過協(xié)程取消功能提供自動(dòng)清理功能,因此傾向于執(zhí)行一些重型任務(wù)。
  • 使用 take, first, toList 等操作符可以簡(jiǎn)化 Flow 的相關(guān)代碼測(cè)試。
  • Flow本身并不了解Android的生命周期,也不提供Android生命周期狀態(tài)變化時(shí)收集器的自動(dòng)暫停和恢復(fù),可以使用LifecycleCoroutineScope的擴(kuò)展,如 launchWhenStarted來啟動(dòng)coroutine來收集我們的Flow--這些收集器將自動(dòng)暫停,并與組件的Lifecycle同步恢復(fù)。
  • 相較于 Channel,F(xiàn)low 末端操作符 會(huì)觸發(fā)數(shù)據(jù)流的執(zhí)行,同時(shí)會(huì)根據(jù)生產(chǎn)者一側(cè)流操作來決定是成功完成操作還是拋出異常,因此 Flows 會(huì)自動(dòng)地關(guān)閉數(shù)據(jù)流,不會(huì)在生產(chǎn)者一側(cè)泄漏資源;而一旦 Channel 沒有正確關(guān)閉,生產(chǎn)者可能不會(huì)清理大型資源,因此 Channels 更容易造成資源泄漏。

Flow的一些常用操作符

//        val flow = flowOf(1,2,3,4,5)
//        val flow: Flow<Int> = flow {
//            List(20) {
//                emit(it)//發(fā)送數(shù)據(jù)
//                delay(300)
//            }
//        }
val flow = (1..10).asFlow()
lifecycleScope.launch {
    flow.flowOn(Dispatchers.IO)//設(shè)定它運(yùn)行時(shí)所使用的調(diào)度器,設(shè)置的調(diào)度器只對(duì)它之前的操作有影響
        .onStart { log("onStart") }
        .flowOn(Dispatchers.Main)
        .onEach {
            log("onEach:$it")
            delay(300)
        }
        .filter {//過濾
            it % 2 == 0
        }
        .map {//變換
            log("map:$it*$it")
            it * it
        }
        .transform<Int,String> {
            "num=$it"
//                    emit("num1=$it")
//                    emit("num2=$it")
        }
        .flowOn(Dispatchers.IO)
        .onCompletion {//訂閱流的完成,執(zhí)行在流完成時(shí)的邏輯
            log("onCompletion: $it")
        }
        .catch {//捕獲 Flow 的異常,catch 函數(shù)只能捕獲它的上游的異常
            log("catch: $it")
        }
        .flowOn(Dispatchers.Main)
        .collect {//消費(fèi)Flow
            log("collect1_1: $it")
        }
    //Flow 可以被重復(fù)消費(fèi)
    flow.collect { log("collect1_2: $it") }
    //除了可以在 collect 處消費(fèi) Flow 的元素以外,還可以通過 onEach 來做到這一點(diǎn)。
    // 這樣消費(fèi)的具體操作就不需要與末端操作符放到一起,collect 函數(shù)可以放到其他任意位置調(diào)用
    flow.onEach {
        log("onEach2:$it")
    }
    withContext(Dispatchers.IO) {
        delay(1000)
        flow.collect()
    }
    //除了使用子協(xié)程執(zhí)行上流外,我們還可以使用launchIn函數(shù)來讓Flow使用全新的協(xié)程上下文
    flow.onEach {
        log("onEach2:$it")
    }.launchIn(CoroutineScope(Dispatchers.IO))
        .join()//主線程等待這個(gè)協(xié)程執(zhí)行結(jié)束

Flow的取消

lifecycleScope.launch(Dispatchers.IO) {
    val flow2 = (1..10).asFlow().onEach { delay(1000) }
    val job: Job = lifecycleScope.launch {
        log("lifecycleScope.launch")
        flow2.flowOn(Dispatchers.IO)//設(shè)定它運(yùn)行時(shí)所使用的調(diào)度器
            .collect {//消費(fèi)Flow
                log("flow2:$it")
            }
    }
    delay(2000)
    job.cancelAndJoin()
}

Flow 的背壓

  • 只要是響應(yīng)式編程,就一定會(huì)有背壓?jiǎn)栴},我們先來看看背壓究竟是什么。
  • 背壓?jiǎn)栴}在生產(chǎn)者的生產(chǎn)速率高于消費(fèi)者的處理速率的情況下出現(xiàn)。為了保證數(shù)據(jù)不丟失,我們也會(huì)考慮添加緩存來緩解問題:
//為 Flow 添加緩沖
flow {
    List(5) {
        emit(it)
    }
}.buffer().collect {
    log("flow buffer collect:$it")
}
  • 也可以為 buffer 指定一個(gè)容量。不過,如果我們只是單純地添加緩存,而不是從根本上解決問題就始終會(huì)造成數(shù)據(jù)積壓。
  • 問題產(chǎn)生的根本原因是生產(chǎn)和消費(fèi)速率的不匹配,除直接優(yōu)化消費(fèi)者的性能以外,我們也可以采取一些取舍的手段。
  • 第一種是 conflate。與 Channel 的 Conflate 模式一致,新數(shù)據(jù)會(huì)覆蓋老數(shù)據(jù),
flow {
    List(10) {
        emit(it)
    }
}
.conflate()
.collect { value ->
    log("flow conflate Collecting $value")
    delay(100)
    log("$value collected flow conflate ")
}
  • 第二種是 collectLatest。顧名思義,只處理最新的數(shù)據(jù),這看上去似乎與 conflate 沒有區(qū)別,其實(shí)區(qū)別大了:它并不會(huì)直接用新數(shù)據(jù)覆蓋老數(shù)據(jù),而是每一個(gè)都會(huì)被處理,只不過如果前一個(gè)還沒被處理完后一個(gè)就來了的話,處理前一個(gè)數(shù)據(jù)的邏輯就會(huì)被取消除 collectLatest 之外還有 mapLatest、flatMapLatest 等等,都是這個(gè)作用。
flow {
    List(10) {
        emit(it)
    }
}.collectLatest { value ->
    log("flow collectLatest Collecting $value")
    delay(100)
    log("$value collected flow collectLatest ")
}

使用更為安全的方式收集 Android UI 數(shù)據(jù)流

  • 在 Android 開發(fā)中,請(qǐng)使用 LifecycleOwner.addRepeatingJob、suspend Lifecycle.repeatOnLifecycle 或 Flow.flowWithLifecycle 從 UI 層安全地收集數(shù)據(jù)流。(lifecycle-runtime-ktx:2.4.+ 庫(kù)中所提供的)
lifecycleScope.launch {
    delay(500)
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        flow.collect { log("collect2: $it") }
    }
}
lifecycleScope.launchWhenStarted {
    delay(1000)
    flow.collect { log("collect3: $it") }
}
lifecycleScope.launch {
    delay(1500)
    flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
        .collect { log("collect4: $it") }
}

SharedFlow

  • 冷流和訂閱者只能是一對(duì)一的關(guān)系,當(dāng)我們要實(shí)現(xiàn)一個(gè)流,多個(gè)訂閱者的需求時(shí),就需要熱流了,SharedFlow就是一種熱流
  • 其構(gòu)造函數(shù)如下
public fun <T> MutableSharedFlow(
    replay: Int = 0,//當(dāng)新的訂閱者Collect時(shí),發(fā)送幾個(gè)已經(jīng)發(fā)送過的數(shù)據(jù)給它,默認(rèn)為0,即默認(rèn)新訂閱者不會(huì)獲取以前的數(shù)據(jù)
    extraBufferCapacity: Int = 0,//表示減去replay,MutableSharedFlow還緩存多少數(shù)據(jù),默認(rèn)為0
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND//表示緩存策略,即緩沖區(qū)滿了之后Flow如何處理
    //BufferOverflow.SUSPEND 策略,也就是掛起策略, 默認(rèn)為掛起
    //BufferOverflow.DROP_OLDEST: 丟棄舊數(shù)據(jù)
    //BufferOverflow.DROP_LATEST: 丟棄最新的數(shù)據(jù)
)
  • 簡(jiǎn)單使用如下
val sharedFlow = MutableSharedFlow<String>()
lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    sharedFlow.emit("aaa")
    delay(1000)
    sharedFlow.emit("bbb")
    delay(1000)
    sharedFlow.emit("ccc")
}
lifecycleScope.launch {
    delay(500)
    sharedFlow.collect { log("collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    sharedFlow.collect { log("collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    sharedFlow.collect { log("collect3:$it") }
}
lifecycleScope.launch {
    delay(3500)
    sharedFlow.collect { log("collect4:$it") }
}
  • 將冷流Flow轉(zhuǎn)化為SharedFlow
lifecycleScope.launch {
    (1..5).asFlow().shareIn(
        //1. 共享開始時(shí)所在的協(xié)程作用域范圍
        scope = lifecycleScope,
        //2. 控制共享的開始和結(jié)束的策略
        // started = SharingStarted.Lazily,//當(dāng)首個(gè)訂閱者出現(xiàn)時(shí)開始,在scope指定的作用域被結(jié)束時(shí)終止
        // started = SharingStarted.Eagerly,//立即開始,而在scope指定的作用域被結(jié)束時(shí)終止
        //對(duì)于那些只執(zhí)行一次的操作,您可以使用Lazily或者Eagerly。然而,如果您需要觀察其他的流,就應(yīng)該使用WhileSubscribed來實(shí)現(xiàn)細(xì)微但又重要的優(yōu)化工作
        //WhileSubscribed策略會(huì)在沒有收集器的情況下取消上游數(shù)據(jù)流
        started = SharingStarted.WhileSubscribed(
            500,//stopTimeoutMillis 控制一個(gè)以毫秒為單位的延遲值,指的是最后一個(gè)訂閱者結(jié)束訂閱與停止上游流的時(shí)間差。默認(rèn)值是 0(比如當(dāng)用戶旋轉(zhuǎn)設(shè)備時(shí),原來的視圖會(huì)先被銷毀,然后數(shù)秒鐘內(nèi)重建)
            Long.MAX_VALUE//replayExpirationMillis表示數(shù)據(jù)重播的過時(shí)時(shí)間,如果用戶離開應(yīng)用太久,此時(shí)您不想讓用戶看到陳舊的數(shù)據(jù),你可以用到這個(gè)參數(shù)
        ),
        //3. 狀態(tài)流的重播個(gè)數(shù)
        replay = 0
    ).collect { log("shareIn.collect:$it") }
}

StateFlow

  • StateFlow繼承于SharedFlow,是SharedFlow的一個(gè)特殊變種
  • 構(gòu)造函數(shù)如下,只需要傳入一個(gè)默認(rèn)值
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
  • StateFlow本質(zhì)上是一個(gè)replay為1,并且沒有緩沖區(qū)的SharedFlow,因此第一次訂閱時(shí)會(huì)先獲得默認(rèn)值
  • StateFlow僅在值已更新,并且值發(fā)生了變化時(shí)才會(huì)返回,即如果更新后的值沒有變化,也不會(huì)回調(diào)Collect方法,這點(diǎn)與LiveData不同
  • StateFlow 與 LiveData是最接近的,因?yàn)椋?/li>
1. 它始終是有值的。
2. 它的值是唯一的。
3. 它允許被多個(gè)觀察者共用 (因此是共享的數(shù)據(jù)流)。
4. 它永遠(yuǎn)只會(huì)把最新的值重現(xiàn)給訂閱者,這與活躍觀察者的數(shù)量是無關(guān)的。
  • 簡(jiǎn)單使用
log("StateFlow 默認(rèn)值:111")
val stateFlow = MutableStateFlow("111")

lifecycleScope.launch {
    delay(500)
    stateFlow.collect { log("StateFlow collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    stateFlow.collect { log("StateFlow collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    stateFlow.collect { log("StateFlow collect3:$it") }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(5000)
    log("StateFlow re emit:111")
    stateFlow.emit("111")
    delay(1000)
    log("StateFlow emit:222")
    stateFlow.emit("222")
}
  • 普通流Flow轉(zhuǎn)化成StateFlow
val stateFlow2: StateFlow<Int> = flow {
    List(10) {
        delay(300)
        emit(it)
    }
}.stateIn(
    scope = lifecycleScope,
    started = WhileSubscribed(5000),//等待5秒后仍然沒有訂閱者存在就終止協(xié)程
    initialValue = 666//默認(rèn)值
)
lifecycleScope.launchWhenStarted {//STARTED狀態(tài)時(shí)會(huì)開始收集流,并且在RESUMED狀態(tài)時(shí)保持收集,進(jìn)入STOPPED狀態(tài)時(shí)結(jié)束收集過程
    stateFlow2.collect { log("StateFlow shareIn.collect:$it") }

}

StateFlow與SharedFlow 的區(qū)別

  1. SharedFlow配置更為靈活,支持配置replay,緩沖區(qū)大小等,StateFlow是SharedFlow的特化版本,replay固定為1,緩沖區(qū)大小默認(rèn)為0;
  2. StateFlow與LiveData類似,支持通過myFlow.value獲取當(dāng)前狀態(tài),如果有這個(gè)需求,必須使用StateFlow;
  3. SharedFlow支持發(fā)出和收集重復(fù)值,而StateFlow當(dāng)value重復(fù)時(shí),不會(huì)回調(diào)collect;
  4. 對(duì)于新的訂閱者,StateFlow只會(huì)重播當(dāng)前最新值,SharedFlow可配置重播元素個(gè)數(shù)(默認(rèn)為0,即不重播);

基于SharedFlow封裝FlowBus

創(chuàng)建消息類EventMessage

class EventMessage {
    /**
     * 消息的key
     */
    var key: Int

    /**
     * 消息的主體message
     */
    var message: Any? = null
    private var messageMap: HashMap<String, Any?>? = null

    constructor(key: Int, message: Any?) {
        this.key = key
        this.message = message
    }

    constructor(key: Int) {
        this.key = key
    }

    fun put(key: String, message: Any?) {
        if (messageMap == null) {
            messageMap = HashMap<String, Any?>()
        }
        messageMap?.set(key, message)
    }

    operator fun <T> get(key: String?): T? {
        if (messageMap != null) {
            try {
                return messageMap!![key] as T?
            } catch (e: ClassCastException) {
                e.printStackTrace()
            }
        }
        return null
    }
}

創(chuàng)建FlowBus

class FlowBus : ViewModel() {
    companion object {
        val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
    }

    //正常事件
    private val events = mutableMapOf<String, Event<*>>()

    //粘性事件
    private val stickyEvents = mutableMapOf<String, Event<*>>()

    fun with(key: String, isSticky: Boolean = false): Event<Any> {
        return with(key, Any::class.java, isSticky)
    }

    fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
        return with(eventType.name, eventType, isSticky)
    }

    @Synchronized
    fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
        val flows = if (isSticky) stickyEvents else events
        if (!flows.containsKey(key)) {
            flows[key] = Event<T>(key, isSticky)
        }
        return flows[key] as Event<T>
    }


    class Event<T>(private val key: String, isSticky: Boolean) {

        // private mutable shared flow
        private val _events = MutableSharedFlow<T>(
            replay = if (isSticky) 1 else 0,
            extraBufferCapacity = Int.MAX_VALUE
        )

        // publicly exposed as read-only shared flow
        val events = _events.asSharedFlow()

        /**
         * need main thread execute
         */
        fun observeEvent(
            lifecycleOwner: LifecycleOwner,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
            minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
            action: (t: T) -> Unit
        ) {
            lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
                override fun onDestroy(owner: LifecycleOwner) {
                    super.onDestroy(owner)
                    LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
                    val subscriptCount = _events.subscriptionCount.value
                    if (subscriptCount <= 0)
                        instance.events.remove(key)
                }
            })
            lifecycleOwner.lifecycleScope.launch(dispatcher) {
                lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
                    events.collect {
                        try {
                            action(it)
                        } catch (e: Exception) {
                            LjyLogUtil.d("ker=$key , error=${e.message}")
                        }
                    }
                }
            }
        }

        /**
         * send value
         */
        suspend fun setValue(
            event: T,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
        ) {
            withContext(dispatcher) {
                _events.emit(event)
            }

        }
    }
}

使用FlowBus

FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.IO) {
    withContext(Dispatchers.Main) {//不創(chuàng)建新的協(xié)程,指定協(xié)程上運(yùn)行代碼塊,可以切換線程
        FlowBus.instance.with(EventMessage::class.java)
            .observeEvent(this@EventBusActivity) {
                LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
            }
    }
}
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.Main) {

    val event = EventMessage(111)
    LjyLogUtil.d(
        "FlowBus:send1_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
    delay(2000)
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(101))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(102))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(103))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(104))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(105))
}
lifecycleScope.launch(Dispatchers.IO) {
    delay(4000)
    val event = EventMessage(222, "bbb")
    LjyLogUtil.d(
        "FlowBus:send2_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
lifecycleScope.launch(Dispatchers.Default) {
    delay(6000)
    withContext(Dispatchers.Main) {
        val event = EventMessage(333, "ccc")
        event.put("key1", 123)
        event.put("key2", "abc")
        LjyLogUtil.d(
            "FlowBus:send3_${Thread.currentThread().name}_${
                GsonUtils.toJson(
                    event
                )
            }"
        )
        FlowBus.instance.with(EventMessage::class.java).setValue(event)
    }
}

進(jìn)一步優(yōu)化

  • 利用擴(kuò)展函數(shù),ViewModelStoreOwner,及預(yù)傳EventMessage::class.javas是當(dāng)前項(xiàng)目中的使用更加簡(jiǎn)單
//利用擴(kuò)展函數(shù)
fun LifecycleOwner.observeEvent(
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    isSticky: Boolean = false,
    action: (t: EventMessage) -> Unit
) {
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .with(EventMessage::class.java, isSticky = isSticky)
        .observeEvent(this@observeEvent, dispatcher, minActiveState, action)
}

fun postValue(
    event: EventMessage,
    delayTimeMillis: Long = 0,
    isSticky: Boolean = false,
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
) {
    LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .viewModelScope
        .launch(dispatcher) {
            delay(delayTimeMillis)
            ApplicationScopeViewModelProvider
                .getApplicationScopeViewModel(FlowBus::class.java)
                .with(EventMessage::class.java, isSticky = isSticky)
                .setValue(event)
        }
}

private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {

    private val eventViewModelStore: ViewModelStore = ViewModelStore()

    override fun getViewModelStore(): ViewModelStore {
        return eventViewModelStore
    }

    private val mApplicationProvider: ViewModelProvider by lazy {
        ViewModelProvider(
            ApplicationScopeViewModelProvider,
            ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
        )
    }

    fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
        return mApplicationProvider[modelClass]
    }

}

object FlowBusInitializer {
    lateinit var application: Application
    //在Application中初始化
    fun init(application: Application) {
        FlowBusInitializer.application = application
    }
}
  • 使用
lifecycleScope.launch(Dispatchers.IO) {
    observeEvent {
        LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
    observeEvent(Dispatchers.IO) {
        LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }

    observeEvent(Dispatchers.Main) {
        LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    postValue(EventMessage(100))
    postValue(EventMessage(101), 1000)
    postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
    val event3 = EventMessage(103, "ccc")
    event3.put("key1", 123)
    event3.put("key2", "abc")
    postValue(event3, 2000, dispatcher = Dispatchers.Main)
}

參考

我是今陽(yáng),如果想要進(jìn)階和了解更多的干貨,歡迎關(guān)注微信公眾號(hào) “今陽(yáng)說” 接收我的最新文章

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容