Kotlin StateFlow、SharedFlow、Channel

StateFlowSharedFlow 是 Flow API,允許數(shù)據(jù)流以最優(yōu)方式發(fā)出狀態(tài)更新并向多個使用方發(fā)出值。

StateFlow

官方文檔解釋:StateFlow 是一個狀態(tài)容器式可觀察數(shù)據(jù)流,可以向其收集器發(fā)出當前狀態(tài)更新和新狀態(tài)更新。還可通過其 value 屬性讀取當前狀態(tài)值。如需更新狀態(tài)并將其發(fā)送到數(shù)據(jù)流,請為 MutableStateFlow 類的 value 屬性分配一個新值。

StateFlow有兩種類型: StateFlow 和 MutableStateFlow :

public interface StateFlow<out T> : SharedFlow<T> {
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T

    public fun compareAndSet(expect: T, update: T): Boolean
}

狀態(tài)由其值表示。任何對值的更新都會反饋新值到所有流的接收器中。

StateFlow 基本使用
class StateFlow {

    private val _state = MutableStateFlow<String>("unKnown")
    val state: kotlinx.coroutines.flow.StateFlow<String> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            var res = getApi()
            _state.value = res
        }
    }

    /**
     * 進行網(wǎng)絡Api請求
     */
    private suspend fun getApi() = withContext(Dispatchers.IO) {
        delay(2000) //模擬耗時請求
        "result"
    }

}

    private fun stateFlowTest() = runBlocking {
        val stateFlow = StateFlow()
        stateFlow.getApi(this) //開始獲取結果

        launch(Dispatchers.IO) {
            stateFlow.state.collect {
                Log.i("minfo", "${Thread.currentThread().name} + $it")
            }
        }

        launch(Dispatchers.IO) {
            stateFlow.state.collect {
                Log.i("minfo", "${Thread.currentThread().name} + $it")

            }
        }
    }

打印結果:

 I  DefaultDispatcher-worker-1 + unKnown
 I  DefaultDispatcher-worker-2 + unKnown
// 等待兩秒
 I  DefaultDispatcher-worker-1 + result
 I  DefaultDispatcher-worker-1 + result

StateFlow 的使用方式與 LiveData 類似。
MutableStateFlow 是可變類型的,即可以改變 value 的值。 StateFlow 則是只讀的。這與 LiveData、MutableLiveData一樣。

為什么使用 StateFlow

我們知道 LiveData 有如下特點:

1.只能在主線程更新數(shù)據(jù),即使在子線程通過 postValue()方法,最終也是將值 post 到主線程調用的 setValue()
2.LiveData 是不防抖的
3.LiveData 的 transformation 是在主線程工作
4.LiveData 需要正確處理 “粘性事件” 問題。
鑒于此,使用 StateFlow 可以輕松解決上述場景。

class StateFlow {

    private val _state = MutableStateFlow<String>("unKnown")
    val state: kotlinx.coroutines.flow.StateFlow<String> get() = _state

    fun getApi2(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello, coroutine"
        }
    }

    fun getApi3(scope: CoroutineScope) {
        scope.launch {
            delay(2000)
            _state.value = "hello, kotlin"
        }
    }
}

    fun stateFlowTest2() = runBlocking<Unit> {
        val stateFlow: StateFlow = StateFlow()

        stateFlow.getApi2(this) // 開始獲取結果
        delay(1000)
        stateFlow.getApi3(this) // 開始獲取結果

        val job1 = launch(Dispatchers.IO) {
            delay(8000)
            stateFlow.state.collect {
                Log.i("minfo", "${Thread.currentThread().name} + $it")
            }
        }
        val job2 = launch(Dispatchers.IO) {
            delay(8000)
            stateFlow.state.collect {
                Log.i("minfo", "${Thread.currentThread().name} + $it")
            }
        }

        // 避免任務泄漏,手動取消
        delay(10000)
        job1.cancel()
        job2.cancel()
    }

現(xiàn)在的場景是,先請求 getApi1(), 一秒之后再次請求 getApi2(), 這樣 stateFlow 的值加上初始值,一共被賦值過 3 次。確保,三次賦值都完成后,我們再收集 StateFlow 中的數(shù)據(jù)。
打印結果:

 DefaultDispatcher-worker-1 + hello, kotlin
 DefaultDispatcher-worker-2 + hello, kotlin

結果顯示了,StateFlow 只會將最新的數(shù)據(jù)發(fā)射給訂閱者。對比 LiveData, LiveData 內部有 version 的概念,對于注冊的訂閱者,會根據(jù) version 進行判斷,將歷史數(shù)據(jù)發(fā)送給訂閱者。即所謂的“粘性”。而SharedFlow可以做到此種粘性。

如需將任何數(shù)據(jù)流轉換為 StateFlow,請使用stateIn中間運算符。

StateFlow、Flow 和 LiveData

StateFlow 和LiveData具有相似之處。兩者都是可觀察的數(shù)據(jù)容器類,并且在應用架構中使用時,兩者都遵循相似模式。

但請注意,StateFlow 和LiveData的行為確實有所不同:

  • StateFlow 需要將初始狀態(tài)傳遞給構造函數(shù),而 LiveData 不需要。
  • 當 View 進入 STOPPED 狀態(tài)時,LiveData.observe() 會自動取消注冊使用方,而從 StateFlow 或任何其他數(shù)據(jù)流收集數(shù)據(jù)的操作并不會自動停止。如需實現(xiàn)相同的行為,您需要從 Lifecycle.repeatOnLifecycle 塊收集數(shù)據(jù)流。
SharedFlow

SharedFlow 也有兩種類型:SharedFlow 和 MutableSharedFlow。
使用 sharedIn 方法可以將 Flow 轉換為 SharedFlow。

public fun <T> MutableSharedFlow(
   replay: Int,   // 當新的訂閱者Collect時,發(fā)送幾個已經(jīng)發(fā)送過的數(shù)據(jù)給它
   extraBufferCapacity: Int = 0,  // 減去replay,MutableSharedFlow還緩存多少數(shù)據(jù)
   onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND  // 緩存溢出時的處理策略,三種 丟掉最新值、丟掉最舊值和掛起
): MutableSharedFlow<T>
class SharedFlow {

    private val _state = MutableSharedFlow<Int>(replay = 3,
        extraBufferCapacity = 2)
    val state: MutableSharedFlow<Int> get() = _state

    fun getApi(scope: CoroutineScope) {
        scope.launch {
            for (i in 0..20) {
                delay(200)
                _state.emit(i)
                Log.i("minfo", "send data $i")
            }
        }
    }

    fun main() = runBlocking {
        getApi(this)

        val job = launch(Dispatchers.IO) {
            delay(3000)
            state.collect {
                Log.i("minfo", "collect  $it")
            }
        }
        delay(5000)
        job.cancel()
    }
}

打印結果:

send data 0
send data 1
send data 2
send data 3
send data 4
send data 5
send data 6
send data 7
send data 8
send data 9
send data 10
send data 11
send data 12
send data 13
collect  11
collect  12
collect  13
send data 14
collect  14
send data 15
collect  15
send data 16
collect  16
send data 17
collect  17
send data 18
collect  18
send data 19
collect  19
send data 20
collect  20

分析一下該結果:
SharedFlow 每 200ms 發(fā)射一次數(shù)據(jù),總共發(fā)射 21 個數(shù)據(jù)出來,耗時大約 4s。
SharedFlow 的 replay 設置為 3, extraBufferCapacity 設置為2, 即 SharedFlow 的緩存為 5 。緩存溢出的處理策略是默認掛起的。
訂閱者是在 3s 之后開始收集數(shù)據(jù)的。此時應該已經(jīng)發(fā)射了 14 個數(shù)據(jù),即 0-13, SharedFlow 的緩存為 5, 緩存的數(shù)據(jù)為 9-13, 但是,只給訂閱者發(fā)送 3 個舊數(shù)據(jù),即訂閱者收集到的值是從 11 開始的。

StateFlow 和 SharedFlow 的使用場景

StateFlow 的命名已經(jīng)說明了適用場景, StateFlow 只會向訂閱者發(fā)射最新的值,適用于對狀態(tài)的監(jiān)聽。
SharedFlow 可以配置對歷史發(fā)射的數(shù)據(jù)進行訂閱,適合用來處理對于事件的監(jiān)聽。

Channel

Flow底層使用的Channel機制實現(xiàn),StateFlow、SharedFlow都是一對多的關系,如果上游發(fā)送者與下游UI層的訂閱者是一對一的關系,可以使用Channel來實現(xiàn),Channel默認是粘性的。

Channel使用特點:
每個消息只有一個訂閱者可以收到,用于一對一的通信。

Channel使用示例:

//viewModel中
private val _loadingChannel = Channel<Boolean>()
val loadingFlow = _loadingChannel.receiveAsFlow()

private suspend fun loadStart() {
    _loadingChannel.send(true)
}

private suspend fun loadFinish() {
    _loadingChannel.send(false)
}

//UI層接收Loading信息
 mViewModel.loadingFlow.flowWithLifecycle2(this, Lifecycle.State.STARTED) { isShow ->
     mStatusViewUtil.showLoadingView(isShow)
 }

參考:
https://developer.android.com/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn

https://www.cnblogs.com/joy99/p/15805955.html#13-%E5%88%9B%E5%BB%BA%E5%B8%B8%E8%A7%84-flow-%E7%9A%84%E5%B8%B8%E7%94%A8%E6%96%B9%E5%BC%8F

用到demo:

https://github.com/running-libo/FlowUse

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容