有小伙伴說看不懂 LiveData、Flow、Channel,跟我走

你的支持對我意義重大!

?? Hi,我是小彭。本文已收錄到 GitHub · Android-NoteBook 中。這里有 Android 進階成長路線筆記 & 博客,有志同道合的朋友,歡迎跟著我一起成長。(聯(lián)系方式 & 入群方式在 GitHub)

背景

  • Kotlin Flow 是基于 Kotlin 協(xié)程基礎能力搭建的一套數(shù)據(jù)流框架,從功能復雜性上看是介于 LiveData 和 RxJava 之間的解決方案。Kotlin Flow 擁有比 LiveData 更豐富的能力,但裁剪了 RxJava 大量復雜的操作符,做得更加精簡。并且在 Kotlin 協(xié)程的加持下,Kotlin Flow 目前是 Google 主推的數(shù)據(jù)流框架。

1. 為什么要使用 Flow?

LiveData、Kotlin Flow 和 RxJava 三者都屬于 可觀察的數(shù)據(jù)容器類,觀察者模式是它們相同的基本設計模式,那么相對于其他兩者,Kotlin Flow 的優(yōu)勢是什么呢?

LiveData 是 androidx 包下的組件,是 Android 生態(tài)中一個的簡單的生命周期感知型容器。簡單即是它的優(yōu)勢,也是它的局限,當然這些局限性不應該算 LiveData 的缺點,因為 LiveData 的設計初衷就是一個簡單的數(shù)據(jù)容器。對于簡單的數(shù)據(jù)流場景,使用 LiveData 完全沒有問題。

  • LiveData 只能在主線程更新數(shù)據(jù): 只能在主線程 setValue,即使 postValue 內(nèi)部也是切換到主線程執(zhí)行;
  • LiveData 數(shù)據(jù)重放問題: 注冊新的訂閱者,會重新收到 LiveData 存儲的數(shù)據(jù),這在有些情況下不符合預期(可以使用自定義的 LiveData 子類 SingleLiveDataUnPeekLiveData 解決,此處不展開);
  • LiveData 不防抖: 重復 setValue 相同的值,訂閱者會收到多次 onChanged() 回調(diào)(可以使用 distinctUntilChanged() 解決,此處不展開);
  • LiveData 不支持背壓: 在數(shù)據(jù)生產(chǎn)速度 > 數(shù)據(jù)消費速度時,LiveData 無法正常處理。比如在子線程大量 postValue 數(shù)據(jù)但主線程消費跟不上時,中間就會有一部分數(shù)據(jù)被忽略。

RxJava 是第三方組織 ReactiveX 開發(fā)的組件,Rx 是一個包括 Java、Go 等語言在內(nèi)的多語言數(shù)據(jù)流框架。功能強大是它的優(yōu)勢,支持大量豐富的操作符,也支持線程切換和背壓。然而 Rx 的學習門檻過高,對開發(fā)反而是一種新的負擔,也會帶來誤用的風險。

Kotlin 是 kotlinx 包下的組件,不是單純 Android 生態(tài)下的產(chǎn)物。那么,F(xiàn)low 的優(yōu)勢在哪里呢?

  • Flow 支持協(xié)程: Flow 基于協(xié)程基礎能力,能夠以結(jié)構(gòu)化并發(fā)的方式生產(chǎn)和消費數(shù)據(jù),能夠?qū)崿F(xiàn)線程切換(依靠協(xié)程的 Dispatcher);
  • Flow 支持背壓: Flow 的子類 SharedFlow 支持配置緩存容量,可以應對數(shù)據(jù)生產(chǎn)速度 > 數(shù)據(jù)消費速度的情況;
  • Flow 支持數(shù)據(jù)重放配置: Flow 的子類 SharedFlow 支持配置重放 replay,能夠自定義對新訂閱者重放數(shù)據(jù)的配置;
  • Flow 相對 RxJava 的學習門檻更低: Flow 的功能更精簡,學習性價比相對更高。不過 Flow 是基于協(xié)程,在協(xié)程會有一些學習成本,但這個應該拆分來看。

當然 Kotlin Flow 也存在一些局限:

  • Flow 不是生命周期感知型組件: Flow 不是 Android 生態(tài)下的產(chǎn)物,自然 Flow 是不會關心組件生命周期。那么我們?nèi)绾未_保訂閱者在監(jiān)聽 Flow 數(shù)據(jù)流時,不會在錯誤的狀態(tài)更新 View 呢?這個問題在下文 第 6 節(jié)再說。

2. 冷數(shù)據(jù)流與熱數(shù)據(jù)流

Kotlin Flow 包含三個實體:數(shù)據(jù)生產(chǎn)方 - (可選的)中介者 - 數(shù)據(jù)使用方。數(shù)據(jù)生產(chǎn)方負責向數(shù)據(jù)流發(fā)射(emit)數(shù)據(jù),而數(shù)據(jù)使用方從數(shù)據(jù)流中消費數(shù)據(jù)。根據(jù)生產(chǎn)方產(chǎn)生數(shù)據(jù)的時機,可以將 Kotlin Flow 分為冷流和熱流兩種:

  • 普通 Flow(冷流): 冷流是不共享的,也沒有緩存機制。冷流只有在訂閱者 collect 數(shù)據(jù)時,才按需執(zhí)行發(fā)射數(shù)據(jù)流的代碼。冷流和訂閱者是一對一的關系,多個訂閱者間的數(shù)據(jù)流是相互獨立的,一旦訂閱者停止監(jiān)聽或者生產(chǎn)代碼結(jié)束,數(shù)據(jù)流就自動關閉。
  • SharedFlow / StateFlow(熱流): 熱流是共享的,有緩存機制的。無論是否有訂閱者 collect 數(shù)據(jù),都可以生產(chǎn)數(shù)據(jù)并且緩存起來。熱流和訂閱者是一對多的關系,多個訂閱者可以共享同一個數(shù)據(jù)流。當一個訂閱者停止監(jiān)聽時,數(shù)據(jù)流不會自動關閉(除非使用 WhileSubscribed 策略,這個在下文再說)。

3. 普通 Flow(冷流)

普通 Flow 是冷流,數(shù)據(jù)是不共享的,也沒有緩存機制。數(shù)據(jù)源會延遲到消費者開始監(jiān)聽時才生產(chǎn)數(shù)據(jù)(如終端操作 collect{}),并且每次訂閱都會創(chuàng)建一個全新的數(shù)據(jù)流。 一旦消費者停止監(jiān)聽或者生產(chǎn)者代碼結(jié)束,F(xiàn)low 會自動關閉。

val coldFlow: Flow<Int> = flow {
    // 生產(chǎn)者代碼
    while(true) {
        // 執(zhí)行計算
        emit(result)
        delay(100)
    }
    // 生產(chǎn)者代碼結(jié)束,流將被關閉
}.collect{ data ->  
}

冷流 Flow 主要的操作如下:

  • 創(chuàng)建數(shù)據(jù)流 flow{}: Flow 構(gòu)造器會創(chuàng)建一個新的數(shù)據(jù)流。flow{} 是 suspend 函數(shù),需要在協(xié)程中執(zhí)行;
  • 發(fā)送數(shù)據(jù) emit(): emit() 將一個新的值發(fā)送到數(shù)據(jù)流中;
  • 終端操作 collect{}: 觸發(fā)數(shù)據(jù)流消費,可以獲取數(shù)據(jù)流中所有的發(fā)出值。Flow 是冷流,數(shù)據(jù)流會延遲到終端操作 collect 才執(zhí)行,并且每次在 Flow 上重復調(diào)用 collect,都會重復執(zhí)行 flow{} 去觸發(fā)發(fā)送數(shù)據(jù)動作(源碼位置:AbstractFlow)。collect 是 suspend 函數(shù),需要在協(xié)程中執(zhí)行。
  • 異常捕獲 catch{}: catch{} 會捕獲數(shù)據(jù)流中發(fā)生的異常;
  • 協(xié)程上下文切換 flowOn(): 更改上流數(shù)據(jù)操作的協(xié)程上下文 CoroutineContext,對下流操作沒有影響。如果有多個 flowOn 運算符,每個 flowOn 只會更改當前位置的上游數(shù)據(jù)流;
  • 狀態(tài)回調(diào) onStart: 在數(shù)據(jù)開始發(fā)送之前觸發(fā),在數(shù)據(jù)生產(chǎn)線程回調(diào);
  • 狀態(tài)回調(diào) onCompletion: 在數(shù)據(jù)發(fā)送結(jié)束之后觸發(fā),在數(shù)據(jù)生產(chǎn)線程回調(diào);
  • 狀態(tài)回調(diào) onEmpty: 在數(shù)據(jù)流為空時觸發(fā)(在數(shù)據(jù)發(fā)送結(jié)束但事實上沒有發(fā)送任何數(shù)據(jù)時),在數(shù)據(jù)生產(chǎn)線程回調(diào)。

普通 Flow 的核心代碼在 AbstractFlow 中,可以看到每次調(diào)用終端操作 collect,collector 代碼塊都會執(zhí)行一次,也就是重新執(zhí)行一次數(shù)據(jù)生產(chǎn)代碼:

AbstractFlow.kt

public abstract class AbstractFlow<T> : Flow<T> {

    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // 1. 對 flow{} 的包裝
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 2. 執(zhí)行 flow{} 代碼塊
            collectSafely(safeCollector)
        } finally {
            // 3. 釋放協(xié)程相關的參數(shù)
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

4. SharedFlow —— 高配版 LiveData

下文要講的 StateFlow 其實是 SharedFlow 的一個子類,所以我們先講 SharedFlow。SharedFlow 和 StateFlow 都屬于熱流,無論是否有訂閱者(collect),都可以生產(chǎn)數(shù)據(jù)并且緩存在內(nèi)部的鏈表中。 它們都有一個可變的版本 MutableSharedFlow 和 MutableStateFlow,這與 LiveData 和 MutableLiveData 類似,對外暴露接口時,應該使用不可變的版本。

4.1 SharedFlow 與 MutableSharedFlow 接口

直接對著接口講不明白,這里先放出這兩個接口方便查看:

public interface SharedFlow<out T> : Flow<T> {
    // 緩存的重放數(shù)據(jù)的快照
    public val replayCache: List<T>
}

public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    
    // 發(fā)射數(shù)據(jù)(注意這是個掛起函數(shù))
    override suspend fun emit(value: T)

    // 嘗試發(fā)射數(shù)據(jù)(如果緩存溢出策略是 SUSPEND,則溢出時不會掛起而是返回 false)
    public fun tryEmit(value: T): Boolean

    // 活躍訂閱者數(shù)量
    public val subscriptionCount: StateFlow<Int>

    // 重置重放緩存,新訂閱者只會收到注冊后新發(fā)射的數(shù)據(jù)
    public fun resetReplayCache()
}

4.2 構(gòu)造一個 ShareFlow

我會把 SharedFlow 理解為一個高配版的 LiveData,這點首先在構(gòu)造函數(shù)就可以體現(xiàn)出來。SharedFlow 的構(gòu)造函數(shù)允許我們配置三個參數(shù):

SharedFlow.kt

public fun <T> MutableSharedFlow(
    // 重放數(shù)據(jù)個數(shù)
    replay: Int = 0,
    // 額外緩存容量
    extraBufferCapacity: Int = 0,
    // 緩存溢出策略
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

public enum class BufferOverflow {
    // 掛起
    SUSPEND,
    // 丟棄最早的一個
    DROP_OLDEST,
    // 丟棄最近的一個
    DROP_LATEST
}
參數(shù) 描述
reply 重放數(shù)據(jù)個數(shù),當新訂閱者時注冊時會重放緩存的 replay 個數(shù)據(jù)
extraBufferCapacity 額外緩存容量,在 replay 之外的額外容量,ShareFlow 的緩存容量 capacity = replay + extraBufferCapacity(實在想不出額外容量有什么用,知道可以告訴我)
onBufferOverflow 緩存溢出策略,即緩存容量 capacity 滿時的處理策略(SUSPEND、DROP_OLDEST、DROP_LAST)

SharedFlow 默認容量 capacity 為 0,重放 replay 為 0,緩存溢出策略是 SUSPEND,意味著發(fā)射數(shù)據(jù)時會直接丟棄數(shù)據(jù)并掛起(閱讀 emit 的源碼),而訂閱者訂不會受到任何數(shù)據(jù)。

為什么我們可以把 SharedFlow 理解為 “高配版” LiveData,拿 SharedFlow 和 LiveData 做個簡單的對比就知道了:

  • 容量問題: LiveData 容量固定為 1 個,而 SharedFlow 容量支持配置 0 個到 多個;
  • 背壓問題: LiveData 無法應對背壓問題,而 SharedFlow 有緩存空間能應對背壓問題;
  • 重放問題: LiveData 固定重放 1 個數(shù)據(jù),而 SharedFlow 支持配置重放 0 個到多個;
  • 線程問題: LiveData 只能在主線程訂閱,而 SharedFlow 支持在任意線程(通過協(xié)程的 Dispatcher)訂閱。

當然 SharedFlow 也并不是完勝,LiveData 能夠處理生命周期安全問題,而 SharedFlow 不行(因為 Flow 本身就不是純 Android 生態(tài)下的組件),不合理的使用會存在不必要的操作和資源浪費,以及在錯誤的狀態(tài)更新 View 的風險。不過別擔心,這個問題可以通過 第 6 節(jié) 的 Lifecycle API 來解決。

4.3 普通 Flow 轉(zhuǎn)換為 ShareFlow

前面提到過,冷流是不共享的,也沒有緩存機制。使用 Flow.shareInFlow.stateIn 可以把冷流轉(zhuǎn)換為熱流,一來可以將數(shù)據(jù)共享給多個訂閱者,二來可以增加緩沖機制。

Share.kt

public fun <T> Flow<T>.shareIn(
    // 協(xié)程作用域范圍
    scope: CoroutineScope,
    // 啟動策略
    started: SharingStarted,
    // 控制數(shù)據(jù)重放的個數(shù)
    replay: Int = 0
): SharedFlow<T> {
  val config = configureSharing(replay)
  val shared = MutableSharedFlow<T>(
      replay = replay,
      extraBufferCapacity = config.extraBufferCapacity,
      onBufferOverflow = config.onBufferOverflow
  )
  @Suppress("UNCHECKED_CAST")
  scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
  return shared.asSharedFlow()
}
public companion object {
    // 熱啟動式:立即開始,并在 scope 指定的作用域結(jié)束時終止
    public val Eagerly: SharingStarted = StartedEagerly()
    // 懶啟動式:在注冊首個訂閱者時開始,并在 scope 指定的作用域結(jié)束時終止
    public val Lazily: SharingStarted = StartedLazily()
 
    public fun WhileSubscribed(
        stopTimeoutMillis: Long = 0,
        replayExpirationMillis: Long = Long.MAX_VALUE
    ): SharingStarted =
        StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
}

sharedIn 的參數(shù) scope 和 replay 不需要過多解釋,主要介紹下 started: SharingStarted 啟動策略,分為三種:

  • Eagerly(熱啟動式): 立即啟動數(shù)據(jù)流,并保持數(shù)據(jù)流(直到 scope 指定的作用域結(jié)束);

  • Lazily(懶啟動式): 在首個訂閱者注冊時啟動,并保持數(shù)據(jù)流(直到 scope 指定的作用域結(jié)束);

  • WhileSubscribed(): 在首個訂閱者注冊時啟動,并保持數(shù)據(jù)流直到在最后一個訂閱者注銷時結(jié)束(或直到 scope 指定的作用域結(jié)束)。通過 WhildSubscribed() 策略能夠在沒有訂閱者的時候及時停止數(shù)據(jù)流,避免引起不必要的資源浪費,例如一直從數(shù)據(jù)庫、傳感器中讀取數(shù)據(jù)。

    whileSubscribed() 還提供了兩個配置參數(shù):

    • stopTimeoutMillis 超時時間(毫秒): 最后一個訂閱者注銷訂閱后,保留數(shù)據(jù)流的超時時間,默認值 0 表示立刻停止。這個參數(shù)能夠幫助防抖,避免訂閱者臨時短時間注銷就馬上關閉數(shù)據(jù)流。例如希望等待 5 秒后沒有訂閱者則停止數(shù)據(jù)流,可以使用 whileSubscribed(5000)。
    • replayExpirationMillis 重放過期時間(毫秒): 停止數(shù)據(jù)流后,保留重放數(shù)據(jù)的超時時間,默認值 Long.MAX_VALUE 表示永久保存(replayExpirationMillis 發(fā)生在停止數(shù)據(jù)流后,說明 replayExpirationMillis 時間是在 stopTimeoutMillis 之后發(fā)生的)。例如希望希望等待 5 秒后停止數(shù)據(jù)流,再等待 5 秒后的數(shù)據(jù)視為無用的陳舊數(shù)據(jù),可以使用 whileSubscribed(5000, 5000)。

5. StateFlow —— LiveData 的替代品

StateFlow 是 SharedFlow 的子接口,可以理解為一個特殊的 SharedFlow。不過它們的繼承關系只是接口上有繼承關系,內(nèi)部的實現(xiàn)類 SharedFlowImplStateFlowImpl 其實是分開的,這里要留個印象就好。

5.1 StateFlow 與 MutableStateFlow 接口

這里先放出這兩個接口方便查看:

public interface StateFlow<out T> : SharedFlow<T> {
    // 當前值
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    // 當前值
    public override var value: T

    // 比較并設置(通過 equals 對比,如果值發(fā)生真實變化返回 true)
    public fun compareAndSet(expect: T, update: T): Boolean
}

5.2 構(gòu)造一個 StateFlow

StateFlow 的構(gòu)造函數(shù)就簡單多了,有且僅有一個必選的參數(shù),代表初始值:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

5.3 特殊的 SharedFlow

StateFlow 是 SharedFlow 的一種特殊配置,MutableStateFlow(initialValue) 這樣一行代碼本質(zhì)上和下面使用 SharedFlow 的方式是完全相同的:

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
  • 有初始值: StateFlow 初始化時必須傳入初始值;
  • 容量為 1: StateFlow 只會保存一個值;
  • 重放為 1: StateFlow 會向新訂閱者重放最新的值;
  • 不支持 resetReplayCache 重置重放緩存: StateFlow 的 resetReplayCache() 方法拋出 UnsupportedOperationException
  • 緩存溢出策略為 DROP_OLDEST: 意味著每次發(fā)射的新數(shù)據(jù)會覆蓋舊數(shù)據(jù);

總的來說,StateFlow 要求傳入初始值,并且僅支持保存一個最新的數(shù)據(jù),會向新訂閱者會重放一次最新值,也不允許重置重放緩存。說 StateFlow 是 LiveData 的替代品一點不為過。除此之外,StateFlow 還額外支持一些特性:

  • 數(shù)據(jù)防抖: 意味著僅在更新值并且發(fā)生變化才會回調(diào),如果更新值沒有變化不會回調(diào) collect,其實就是在發(fā)射數(shù)據(jù)時加了一層攔截:

StateFlow.kt

public override var value: T
    get() = NULL.unbox(_state.value)
    set(value) { updateState(null, value ?: NULL) }

override fun compareAndSet(expect: T, update: T): Boolean =
    updateState(expect ?: NULL, update ?: NULL)

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    var curSequence = 0
    var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
    synchronized(this) {
        val oldState = _state.value
        if (expectedState != null && oldState != expectedState) return false // CAS support
        if (oldState == newState) return true // 如果新值 equals 舊值則攔截, 但 CAS 返回 true
        _state.value = newState
        ...
        return true
    }
}
  • CAS 操作: 原子性的比較與設置操作,只有在舊值與 expect 相同時返回 ture。

5.4 普通 Flow 轉(zhuǎn)換為 StateFlow

跟 SharedFlow 一樣,普通 Flow 也可以轉(zhuǎn)換為 StateFlow:

Share.kt

public fun <T> Flow<T>.stateIn(
    // 共享開始時所在的協(xié)程作用域范圍
    scope: CoroutineScope,
    // 共享開始策略
    started: SharingStarted,
    // 初始值
    initialValue: T
): StateFlow<T> {
    val config = configureSharing(1)
    val state = MutableStateFlow(initialValue)
    scope.launchSharing(config.context, config.upstream, state, started, initialValue)
    return state.asStateFlow()
}

6. 安全地觀察 Flow 數(shù)據(jù)流

前面也提到了,F(xiàn)low 不具備 LiveData 的生命周期感知能力,所以訂閱者在監(jiān)聽 Flow 數(shù)據(jù)流時,會存在生命周期安全的問題。Google 推薦的做法是使用 Lifecycle#repeatOnLifecycle API:

// 從 2.4.0 開始支持 Lifecycle#repeatOnLifecycle API
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.4.1"
  • LifecycleOwner#addRepeatingJob: 在生命周期到達指定狀態(tài)時,自動創(chuàng)建并啟動協(xié)程執(zhí)行代碼塊,在生命周期低于該狀態(tài)時,自動取消協(xié)程。因為 addRepeatingJob 不是掛起函數(shù),所以不遵循結(jié)構(gòu)化并發(fā)的規(guī)則。目前已經(jīng)廢棄,被下面的 repeatOnLifecycle() 替代了(廢棄 addRepeatingJob 的考量見 設計 repeatOnLifecycle API 背后的故事 );
  • Lifecycle#repeatOnLifecycle repeatOnLifecycle 的作用相同,區(qū)別在于它是一個 suspend 函數(shù),需要在協(xié)程中執(zhí)行;
  • Flow#flowWithLifecycle Flow#flowWithLifecycle 的作用相同,內(nèi)部基于 repeatOnLifecycle API。
class LocationActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        lifecycleOwner.addRepeatingJob(Lifecycle.State.STARTED) {
            locationProvider.locationFlow().collect {
                // update UI
            }
        }
    }
}

class LocationActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // repeatOnLifecycle 是 suspends 函數(shù),所以需要在協(xié)程中執(zhí)行
        lifecycleScope.launch {
        // 當 lifecycleScope 的生命周期高于 STARTED 狀態(tài)時,啟動一個新的協(xié)程并執(zhí)行代碼塊
        // 當 lifecycleScope 的生命周期低于 STARTED 狀態(tài)時,取消該協(xié)程
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            // 當前生命周期一定高于 STARTED 狀態(tài),可以安全地從數(shù)據(jù)流中取數(shù)據(jù),并更新 View
            locationProvider.locationFlow().collect {
                // update UI
            }
        }
        // 結(jié)構(gòu)化并發(fā):生命周期處于 DESTROYED 狀態(tài)時,切換回調(diào)用 repeatOnLifecycle 的協(xié)程繼續(xù)執(zhí)行
        }
    }
}

class LocationActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        locationProvider.locationFlow()
            .flowWithLifecycle(this, Lifecycle.State.STARTED)
            .onEach {
                // update UI
            }
            .launchIn(lifecycleScope) 
    }
}

如果不使用 Lifecycle#repeatOnLifecycle API,具體會出現(xiàn)什么問題呢?

  • Activity.lifecycleScope.launch: 立即啟動協(xié)程,并在 Activity 銷毀時取消協(xié)程;
  • Fragment.lifecycleScope.launch: 立即啟動協(xié)程,并在 Fragment 銷毀時取消協(xié)程;
  • Fragment.viewLifecycleOwner.lifecycleScope.launch: 立即啟動協(xié)程,并在 Fragment 中視圖銷毀時取消協(xié)程。

可以看到,這些協(xié)程 API 只有在最后組件 / 視圖銷毀時才會取消協(xié)程,當視圖進入后臺時協(xié)程并不會被取消,F(xiàn)low 會持續(xù)生產(chǎn)數(shù)據(jù),并且會觸發(fā)更新視圖。

  • LifecycleContinueScope.launchWhenX: 在生命周期到達指定狀態(tài)時立即啟動協(xié)程執(zhí)行代碼塊,在生命周期低于該狀態(tài)時掛起(而不是取消)協(xié)程,在生命周期重新高于指定狀態(tài)時,自動恢復該協(xié)程。

可以看到,這些協(xié)程 API 在視圖離開某個狀態(tài)時會掛起協(xié)程,能夠避免更新視圖。但是 Flow 會持續(xù)生產(chǎn)數(shù)據(jù),也會產(chǎn)生一些不必要的操作和資源消耗(CPU 和內(nèi)存)。 雖然可以在視圖進入后臺時手動取消協(xié)程,但很明顯增寫了模板代碼,沒有 repeatOnLifecycle API 來得簡潔。

class LocationActivity : AppCompatActivity() {

    // 協(xié)程控制器
    private var locationUpdatesJob: Job? = null

    override fun onStart() {
        super.onStart()
        locationUpdatesJob = lifecycleScope.launch {
            locationProvider.locationFlow().collect {
                // update UI
            } 
        }
    }

    override fun onStop() {
       // 在視圖進入后臺時取消協(xié)程
        locationUpdatesJob?.cancel()
        super.onStop()
    }
}

回過頭來看,repeatOnLifecycle 是怎么實現(xiàn)生命周期感知的呢?其實很簡單,是通過 Lifecycle#addObserver 來監(jiān)聽生命周期變化:

RepeatOnLifecycle.kt

suspendCancellableCoroutine<Unit> { cont ->
    // Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
    // cancels when it falls below that state.
    val startWorkEvent = Lifecycle.Event.upTo(state)
    val cancelWorkEvent = Lifecycle.Event.downFrom(state)
    val mutex = Mutex()
    observer = LifecycleEventObserver { _, event ->
        if (event == startWorkEvent) {
            // Launch the repeating work preserving the calling context
            launchedJob = this@coroutineScope.launch {
                // Mutex makes invocations run serially,
                // coroutineScope ensures all child coroutines finish
                mutex.withLock {
                    coroutineScope {
                        block()
                    }
                }
            }
            return@LifecycleEventObserver
        }
        if (event == cancelWorkEvent) {
            launchedJob?.cancel()
            launchedJob = null
        }
        if (event == Lifecycle.Event.ON_DESTROY) {
            cont.resume(Unit)
        }
    }
    this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
}

7. Channel 通道

在協(xié)程的基礎能力上使用數(shù)據(jù)流,除了上文提到到 Flow API,還有一個 Channel API。Channel 是 Kotlin 中實現(xiàn)跨協(xié)程數(shù)據(jù)傳輸?shù)臄?shù)據(jù)結(jié)構(gòu),類似于 Java 中的 BlockQueue 阻塞隊列。不同之處在于 BlockQueue 會阻塞線程,而 Channel 是掛起線程。Google 的建議 是優(yōu)先使用 Flow 而不是 Channel,主要原因是 Flow 會更自動地關閉數(shù)據(jù)流,而一旦 Channel 沒有正常關閉,則容易造成資源泄漏。此外,F(xiàn)low 相較于 Channel 提供了更明確的約束和操作符,更靈活。

Channel 主要的操作如下:

  • 創(chuàng)建 Channel: 通過 Channel<Int>(Channel.UNLIMITED) 創(chuàng)建一個 Channel 對象,或者直接使用 produce<Int>{} 創(chuàng)建一個生產(chǎn)者協(xié)程;
  • 關閉 Channel: Channel#close();
  • 發(fā)送數(shù)據(jù): Channel#send() 往 Channel 中發(fā)送一個數(shù)據(jù),在 Channel 容量不足時 send() 操作會掛起,Channel 默認容量 capacity 是 1;
  • 接收數(shù)據(jù): 通過 Channel#receive() 從 Channel 中取出一個數(shù)據(jù),或者直接通過 actor<Int> 創(chuàng)建一個消費者協(xié)程,在 Channel 中數(shù)據(jù)不足時 receive() 操作會掛起。
  • 廣播通道 BroadcastChannel(廢棄,使用 SharedFlow): 普通 Channel 中一個數(shù)據(jù)只會被一個消費端接收,而 BroadcastChannel 允許多個消費端接收。
public fun <E> Channel(

    // 緩沖區(qū)容量,當超出容量時會觸發(fā) onBufferOverflow 拒絕策略
    capacity: Int = RENDEZVOUS,  

    // 緩沖區(qū)溢出策略,默認為掛起,還有 DROP_OLDEST 和 DROP_LATEST
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,

    // 處理元素未能成功送達處理的情況,如訂閱者被取消或者拋異常
    onUndeliveredElement: ((E) -> Unit)? = null

): Channel<E>

8. 淺嘗一下

到這里,LiveData、Flow 和 Channel 我們都講了一遍了,實際場景中怎么使用呢,淺嘗以下。

  • 事件(Event): 事件是一次有效的,新訂閱者不應該收到舊的事件,因此事件數(shù)據(jù)適合用 SharedFlow(replay=0);
  • 狀態(tài)(State): 狀態(tài)是可以恢復的,新訂閱者允許收到舊的狀態(tài)數(shù)據(jù),因此狀態(tài)數(shù)據(jù)適合用 StateFlow。

示例代碼如下,不熟悉 MVI 模式的同學可以移步:Android UI 架構(gòu)演進:從 MVC 到 MVP、MVVM、MVI

BaseViewModel.kt

interface UiState

interface UiEvent

interface UiEffect

abstract class BaseViewModel<State : UiState, Event : UiEvent, Effect : UiEffect> : ViewModel() {

    // 初始狀態(tài)
    private val initialState: State by lazy { createInitialState() }

    // 頁面需要的狀態(tài),對應于 MVI 模式的 ViewState
    private val _uiState = MutableStateFlow<State>(initialState)
    // 對外接口使用不可變版本
    val uiState = _uiState.asStateFlow()

    // 頁面狀態(tài)變更的 “副作用”,類似一次性事件,不需要重放的狀態(tài)變更(例如 Toast)
    private val _effect = MutableSharedFlow<Effect>()
    // 對外接口使用不可變版本
    val effect = _effect.asSharedFlow()

    // 頁面的事件操作,對應于 MVI 模式的 Intent 
    private val _event = MutableSharedFlow<Event>()

    init {
        viewModelScope.launch {
            _event.collect {
                handleEvent(it)
            }
        }
    }

    // 初始狀態(tài)
    protected abstract fun createInitialState(): State

    // 事件處理
    protected abstract fun handleEvent(event: Event)

    /**
     * 事件入口
     */
    fun sendEvent(event: Event) {
        viewModelScope.launch {
            _event.emit(event)
        }
    }

    /**
     * 狀態(tài)變更
     */
    protected fun setState(newState: State) {
        _uiState.value = newState
    }

    /**
     * 副作用
     */
    protected fun setEffect(effect: Effect) {
        _effect.send(effect)
    }
}

參考資料

你的點贊對我意義重大!微信搜索公眾號 [彭旭銳],希望大家可以一起討論技術(shù),找到志同道合的朋友,我們下次見!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容