【Koltin Flow(五)】SharedFlow及StateFlow

目錄

【Koltin Flow(一)】五種創(chuàng)建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中間操作符(一)
【Koltin Flow(三)】Flow操作符之中間操作符(二)
【Koltin Flow(三)】Flow操作符之中間操作符(三)
【Koltin Flow(四)】Flow背壓
【Koltin Flow(五)】SharedFlow及StateFlow

SharedFlow

簡(jiǎn)介

相對(duì)于Flow而言,SharedFlow為熱流,也就是說(shuō)不管有無(wú)接收者,都會(huì)發(fā)送值。

一、基本使用

代碼如下:
            val sharedFlow = MutableSharedFlow<Int>()
            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG,"SharedFlow $it")
                }
            }

            delay(10)
            sharedFlow.emit(1)
            sharedFlow.emit(100)
            sharedFlow.emit(100)
日志如下:
2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 1
2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
2022-08-03 10:16:53.367 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
分析
  • 日志可以看出 基本使用的時(shí)候和flow本身沒(méi)多大區(qū)別,發(fā)送-接收。
  • 可以看出上面有個(gè)delay(10),如果沒(méi)有則可能會(huì)接收不到值,因?yàn)镾haredFlow為熱流,不管有無(wú)接收者,emit都會(huì)直接發(fā)送值。

二、設(shè)置訂閱重發(fā)

我們看SharedFlow構(gòu)造的第一個(gè)參數(shù)replay,此參數(shù)用來(lái)設(shè)置訂閱重發(fā)的個(gè)數(shù)。
代碼如下:
            val sharedFlow = MutableSharedFlow<Int>(
                replay = 2
            )

            sharedFlow.emit(1)
            sharedFlow.emit(100)
            sharedFlow.emit(100)
            //注釋1  在此處加上重發(fā)緩存打印
            //Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
            delay(100)

            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG,"SharedFlow $it")
                }
            }
             //注釋2 重置緩存
            //delay(100)
            //sharedFlow.resetReplayCache()
            //Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
            //launch {
              //  sharedFlow.collect {
                //    Log.d(TAG.TAG,"SharedFlow 2 $it")
                //}
            //}
日志如下:
//注釋1 處的打印
//2022-08-03 10:23:13.621 5106-5131/edu.test.demo D/Test-TAG: SharedFlow [100, 100]
2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
//注釋2下的打印
//2022-08-03 10:26:54.917 5233-5260/edu.test.demo D/Test-TAG: SharedFlow []
分析:
  • 可以看到,雖然是發(fā)送在前,接收在后,但還是收到了兩個(gè)值。因?yàn)閞eplay重發(fā)的值設(shè)置為2,可以這樣看起來(lái)不直觀,這樣,我們修改下代碼,放開(kāi)注釋1出的打印,則日志會(huì)多出來(lái)【注釋1 處的打印】部分的內(nèi)容,可以很清晰的看出緩存了后兩個(gè)值。
  • 當(dāng)然此緩存可以重置,也就是清空之前的緩存,放開(kāi)注釋2處的代碼,則可以看出resetReplayCache,之后則清空了緩存,后面的collect接收不到緩存的值。

三、其他兩個(gè)參數(shù),可參考背壓部分的內(nèi)容

四、shareIn操作符

shareIn操作符是將冷流flow轉(zhuǎn)換為熱流SharedFlow,主要參數(shù)有三個(gè)
1、第一個(gè)為作用域。
2、策略,分為三種Eagerly(立即發(fā)送)、Lazily(有第一個(gè)訂閱者之后發(fā)送)、
WhileSubscribed()(在第一個(gè)訂閱者出現(xiàn)之后開(kāi)始、在最后一個(gè)訂閱者、消失后結(jié)束),可配置二外的參數(shù):
stopTimeoutMillis 為最后一個(gè)訂閱者小時(shí)候保留的時(shí)長(zhǎng),單位ms,默認(rèn)為0。
replayExpirationMillis 為最后一個(gè)訂閱者消失后,緩存保留的時(shí)長(zhǎng),單位ms,默認(rèn)為L(zhǎng)ong.MAX_VALUE。
3、緩存重發(fā)的個(gè)數(shù)。
第一種策略
代碼如下:
     val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Eagerly,0)
            delay(100)
            sharedFlow.collect {
                Log.d(TAG.TAG,"shareIn $it")
            }
日志沒(méi)有
分析:
  • 我們發(fā)現(xiàn)沒(méi)有日志,原因在哪里呢,因?yàn)檗D(zhuǎn)換成熱流之后策略為Eagerly,立即開(kāi)始發(fā)送,但是100ms之后才有collect,同時(shí)replay的個(gè)數(shù)為0,所以接收不到值,如果我們將replay個(gè)數(shù)改為2,則可以接收到4和5,和上面的訂閱重發(fā)是一樣的。
第二種策略
代碼如下:
    val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Lazily, 2)
            delay(100)
            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG, "shareIn $it")
                }
            }
            delay(100)
            launch {
                sharedFlow.collect {
                    Log.d(TAG.TAG, "shareIn 2 $it")
                }
            }
日志如下:
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 1
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 2
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 3
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 4
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 5
2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 4
2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 5
分析:
  • 可以看出,第一個(gè)collect能接收到全部值,那是因?yàn)長(zhǎng)azily是在第一個(gè)訂閱者出現(xiàn)后才發(fā)送值的,但是第二個(gè)collect卻只接收到了緩存的兩個(gè)值,那是因?yàn)長(zhǎng)azily只管第一個(gè)collect,不管后續(xù)的collect。
第三種策略
1. 采用默認(rèn)參數(shù)
代碼如下:
  var time  = 0L
            time = System.currentTimeMillis()
            val sharedFlow = (1..100).asFlow().onStart {
                Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
            }.onCompletion {
                Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
            }.onEach {
                delay(1000)
            }.shareIn(this, SharingStarted.WhileSubscribed(), 0)

            delay(1000)
            launch {
                Log.d(TAG.TAG, "1 shareIn ${sharedFlow.first()}")
                Log.d(TAG.TAG,"1 接收到第一個(gè)值 ${System.currentTimeMillis() - time}")
            }

            delay(3000)
            launch {
                Log.d(TAG.TAG, "2 shareIn ${sharedFlow.first()}")
                Log.d(TAG.TAG,"2 接收到第一個(gè)值 ${System.currentTimeMillis() - time}")
            }
日志如下:
2022-08-03 14:00:32.842 8905-8930/edu.test.demo D/Test-TAG: onStart 1029
2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 shareIn 1
2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 接收到第一個(gè)值 2031
2022-08-03 14:00:33.845 8905-8931/edu.test.demo D/Test-TAG: onCompletion 2032
2022-08-03 14:00:35.839 8905-8931/edu.test.demo D/Test-TAG: onStart 4026
2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 shareIn 1
2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 接收到第一個(gè)值 5028
2022-08-03 14:00:36.841 8905-8930/edu.test.demo D/Test-TAG: onCompletion 5028
分析:
  • 可以看出 在first之后第一個(gè)接收者消失,所以執(zhí)行了onCompletion,也就是flow結(jié)束了,而且接收到第一個(gè)值和onCompletion的時(shí)間基本是一致的。
  • 在4000ms之后第二個(gè)接收者出現(xiàn),重新執(zhí)行了onStart ,并且在first之后也執(zhí)行了onCompletion結(jié)束了。
2.進(jìn)行相關(guān)的參數(shù)配置
代碼如下:
             var time  = 0L
            time = System.currentTimeMillis()
            val sharedFlow = (1..100).asFlow().onStart {
                Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
            }.onCompletion {
                Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
            }.onEach {
                delay(1000)
            }.shareIn(this, SharingStarted.WhileSubscribed(
                stopTimeoutMillis = 500,
                replayExpirationMillis = 2000
            ), replay = 5)

            delay(1*1000)
            launch {
                Log.d(TAG.TAG, "1 shareIn ${sharedFlow.take(5).toList()}")
                Log.d(TAG.TAG,"1 接收到值 ${System.currentTimeMillis() - time}")
            }

            delay(10*1000)
            launch {
                Log.d(TAG.TAG, "2 shareIn ${sharedFlow.take(10).toList()}")
                Log.d(TAG.TAG,"2 接收到值 ${System.currentTimeMillis() - time}")
            }
日志如下( stopTimeoutMillis = 500,replayExpirationMillis = 2000,replay = 5):
2022-08-03 14:21:15.245 9591-9616/edu.test.demo D/Test-TAG: onStart 1030
2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 接收到值 6037
2022-08-03 14:21:20.753 9591-9616/edu.test.demo D/Test-TAG: onCompletion 6538
2022-08-03 14:21:25.243 9591-9622/edu.test.demo D/Test-TAG: onStart 11028
2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 接收到值 21038
2022-08-03 14:21:35.755 9591-9617/edu.test.demo D/Test-TAG: onCompletion 21540
分析:
  • 可以看出,在設(shè)置了 stopTimeoutMillis = 500之后,接收到值得時(shí)間和onCompletion的時(shí)間基本差了500ms,其實(shí)就是就是延時(shí)結(jié)束。
  • 在設(shè)置了replayExpirationMillis = 2000之后,第二次開(kāi)始和接收到值的時(shí)間基本就是發(fā)送10個(gè)值得時(shí)間,具體值為1-10,那是因?yàn)榫彺娴闹狄呀?jīng)失效了,此時(shí)這個(gè)replay = 5沒(méi)有多大意義,因?yàn)榈较麓蔚臅r(shí)候值已經(jīng)失效了。
日志如下( stopTimeoutMillis = 500,replayExpirationMillis = Long.MAX_VALUE,replay = 5):
2022-08-03 14:47:30.144 9697-9724/edu.test.demo D/Test-TAG: onStart 1030
2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 接收到值 6040
2022-08-03 14:47:35.655 9697-9723/edu.test.demo D/Test-TAG: onCompletion 6541
2022-08-03 14:47:40.141 9697-9726/edu.test.demo D/Test-TAG: onStart 11027
2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 接收到值 16035
2022-08-03 14:47:45.652 9697-9726/edu.test.demo D/Test-TAG: onCompletion 16538
分析:
  • 首先看區(qū)別,第一個(gè)接收者基本沒(méi)變化。
  • 第二個(gè)接收者有兩點(diǎn)變化,接收到的值變了,不再是1-10,而是兩個(gè)1-5,而且第二個(gè)onStart和接收到值的時(shí)間也變了,不再是10個(gè)值的時(shí)間,而是五個(gè)值的時(shí)間,原因主要如下:第一replayExpirationMillis值設(shè)置為 Long.MAX_VALUE(這也是其默認(rèn)值)之后,第二個(gè)接收者出現(xiàn)時(shí),緩存并未失效,所以出現(xiàn)了前面的12345(因?yàn)閞eplay=5,緩存前五個(gè)),但是take(10)不夠了,所以又出現(xiàn)了onStart,再接收五個(gè),也就是后面的12345,時(shí)間也就是接收五個(gè)的時(shí)間。
  • 如果將第二個(gè)接收者的take(10)變成take(5),緩存就直接夠了,就不會(huì)出現(xiàn)第二個(gè)onStart。
  • 當(dāng)然這也是因?yàn)榍懊媸莟ake(5),如果改成take(3),后面也就會(huì)出現(xiàn)onStart,數(shù)據(jù)變成12312,那是因?yàn)殡m然緩存池為5,但是只緩存了三個(gè)值,還需要兩個(gè)。

StateFlow

簡(jiǎn)介

和SharedFlow一樣,StateFlow也是熱流,但是區(qū)別在于狀態(tài)的保存,保存了最新的值,也就是新的接收者會(huì)收到最新的值,
和設(shè)置了replay = 1的SharedFlow比較類(lèi)似。

簡(jiǎn)單使用

代碼如下:
            val stateFlow = MutableStateFlow(0)
            launch {
                stateFlow.collect{
                    Log.d(TAG.TAG,"stateFlow 1 collect $it")
                }
            }
            delay(1000)
            stateFlow.value = 10
            delay(1000)
            stateFlow.value = 10
            delay(1000)
            stateFlow.value = 11
            launch {
                stateFlow.collect{
                    Log.d(TAG.TAG,"stateFlow 2 collect $it")
                }
            }

日志如下:
2022-08-03 15:43:14.618 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 0
2022-08-03 15:43:15.623 11669-11704/edu.test.demo D/Test-TAG: stateFlow 1 collect 10
2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 11
2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 2 collect 11
分析:
  • 可以看到接收者1剛開(kāi)始收到了初始值1,那是因?yàn)镾tateFlow在每個(gè)接收者出現(xiàn)時(shí)都會(huì)接收到最新的值。
  • 接收者1后面又接收到了10,那是動(dòng)態(tài)更新StateFlow的value值觸發(fā)的,但是注意10發(fā)送了兩次,但是只接收到了一次,說(shuō)明StateFlow是天然防抖的,連續(xù)發(fā)送兩次同樣的值,只會(huì)接收一次,后面又接收到了11和第一個(gè)10效果一致,是value值觸發(fā)的。
  • 接收者2接收到11的道理和接收者1接收到1的道理是一樣的。

stateIn操作符

stateIn操作符是將冷流flow轉(zhuǎn)換為熱流StateFlow,主要參數(shù)有三個(gè)
1、第一個(gè)為作用域。
2、策略,分為三種Eagerly(立即發(fā)送)、Lazily(有第一個(gè)訂閱者之后發(fā)送)、
WhileSubscribed()(在第一個(gè)訂閱者出現(xiàn)之后開(kāi)始、在最后一個(gè)訂閱者、消失后結(jié)束),可配置二外的參數(shù):
stopTimeoutMillis 為最后一個(gè)訂閱者小時(shí)候保留的時(shí)長(zhǎng),單位ms,默認(rèn)為0。
replayExpirationMillis 為最后一個(gè)訂閱者消失后,緩存保留的時(shí)長(zhǎng),單位ms,默認(rèn)為L(zhǎng)ong.MAX_VALUE。
3、StateFlow的初始值。
策略基本和shareIn是一致的,只是replay固定為1,另外會(huì)有一個(gè)初始值。
分析一種,其他的和shareIn類(lèi)比即可,策略為WhileSubscribed()。
代碼如下:
             var time = 0L
            time = System.currentTimeMillis()
            val stateFlow = (1..10).asFlow().onStart {
                Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
            }.onCompletion {
                Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
            }.onEach {
                delay(1000)
            }.stateIn(this, SharingStarted.WhileSubscribed(),0)

            launch {
                Log.d(TAG.TAG, "stateFlow 1 collect ${stateFlow.take(5).toList()}")
                Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
            }
            delay(10*1000)
            launch {
                Log.d(TAG.TAG, "stateFlow 2 collect ${stateFlow.take(5).toList()}")
                Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
            }
日志如下:
2022-08-03 15:33:35.204 11353-11379/edu.test.demo D/Test-TAG: onStart 70
2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: stateFlow 1 collect [0, 1, 2, 3, 4]
2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: 接收到值 4101
2022-08-03 15:33:39.221 11353-11378/edu.test.demo D/Test-TAG: onCompletion 4102
2022-08-03 15:33:45.145 11353-11378/edu.test.demo D/Test-TAG: onStart 10027
2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: stateFlow 2 collect [4, 1, 2, 3, 4]
2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: 接收到值 14033
2022-08-03 15:33:49.151 11353-11380/edu.test.demo D/Test-TAG: onCompletion 14033
分析:
  • 可以看到接收者1接收到的值為01234,因?yàn)橛幸粋€(gè)初始值為0,占了一個(gè)位置,開(kāi)始到接收到值的時(shí)間也基本是個(gè)值的時(shí)間,接收到之后也打印了onCompletion,和接收到值的時(shí)間基本一致。
  • 第二個(gè)接收者,接收到的值為41234,因?yàn)镾tateFlow緩存了一個(gè)最新值4,再接收四個(gè)新值,時(shí)間和接收者1類(lèi)似。

總結(jié)

  • 本篇主要介紹了SharedFlow和StateFlow的基本使用、以及參數(shù)設(shè)置相關(guān)內(nèi)容。
  • 本篇也終點(diǎn)介紹了shareIn操作符的使用,以及各種策略參數(shù)的設(shè)置,stateIn類(lèi)比shareIn理解。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容