Kotlin Flow 背壓和線程切換竟然如此相似

前言

上篇分析了Kotlin Flow原理,大部分操作符實(shí)現(xiàn)比較簡(jiǎn)單,相較而言背壓和線程切換比較復(fù)雜,遺憾的是,縱觀網(wǎng)上大部分文章,關(guān)于Flow背壓和協(xié)程切換這塊的原理說(shuō)得比較少,語(yǔ)焉不詳,鑒于此,本篇重點(diǎn)分析兩者的原理及使用。
通過(guò)本篇文章,你將了解到:

  1. 什么是背壓?
  2. 如何處理背壓?
  3. Flow buffer的原理
  4. Flow 線程切換的使用
  5. Flow 線程切換的原理

1. 什么是背壓?

先看自然界的水流:


image.png

為了充分利用水資源,人類(lèi)建立了大壩,以大壩為分界點(diǎn)將水流分為上游和下游。

當(dāng)上游的流速大于下游的流速,日積月累,最終導(dǎo)致大壩溢出,此種現(xiàn)象稱(chēng)為背壓的出現(xiàn)

而對(duì)于Kotlin里的Flow,也有上游(生產(chǎn)者)、下游(消費(fèi)者)的概念,如:

    suspend fun testBuffer1() {
        var flow = flow {
            //生產(chǎn)者
            (1..3).forEach {
                println("emit $it")
                emit(it)
            }
        }

        flow.collect {
            //消費(fèi)者
            println("collect:$it")
        }
    }

通過(guò)collect操作符觸發(fā)了流,從生產(chǎn)者生產(chǎn)數(shù)據(jù)(flow閉包),到消費(fèi)者接收并處理數(shù)據(jù)(collect閉包),這就完成了流從上游到下游的一次流動(dòng)過(guò)程。

2. 如何處理背壓?

模擬一個(gè)生產(chǎn)者消費(fèi)者速度不一致的場(chǎng)景:

    suspend fun testBuffer3() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it")
                emit(it)
            }
        }

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

計(jì)算流從生產(chǎn)到消費(fèi)的整個(gè)時(shí)間:


image.png

生產(chǎn)者的速度比消費(fèi)者的速度快,而它倆都是在同一個(gè)線程里順序執(zhí)行的,生產(chǎn)者必須等待消費(fèi)者消費(fèi)完畢后才會(huì)進(jìn)行下一次生產(chǎn)。
因此,整個(gè)流的耗時(shí)=生產(chǎn)者耗時(shí)(3 * 1000ms)+消費(fèi)者耗時(shí)(3 * 2000ms)=9s。

顯而易見(jiàn),消費(fèi)者影響了生產(chǎn)者的速度,這種情況下該怎么優(yōu)化呢?
最簡(jiǎn)單的解決方案:

生產(chǎn)者和消費(fèi)者分別在不同的線程執(zhí)行

如:

    suspend fun testBuffer4() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.flowOn(Dispatchers.IO)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

添加了flowOn()函數(shù),它的存在使得它前面的代碼在指定的線程里執(zhí)行,如flow閉包了的代碼都在IO線程執(zhí)行,也就是生產(chǎn)者在IO線程執(zhí)行。
而消費(fèi)者在當(dāng)前線程執(zhí)行,因此兩者無(wú)需相互等待,節(jié)省了總時(shí)間:


image.png

確實(shí)是減少了時(shí)間,提升了效率。但我們知道開(kāi)啟線程代價(jià)還是挺大的,既然都在協(xié)程里運(yùn)行了,能否借助協(xié)程的特性:協(xié)程掛起不阻塞線程 來(lái)完成此事呢?
此時(shí),Buffer出場(chǎng)了,先看看它是如何表演的:

    suspend fun testBuffer5() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.buffer(5)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

這次沒(méi)有使用flowOn,取而代之的是buffer。
運(yùn)行結(jié)果如下:


image.png

可以看出,生產(chǎn)者消費(fèi)者都是在同一線程執(zhí)行,但總耗時(shí)卻和不在同一線程運(yùn)行時(shí)相差無(wú)幾。
那么它是如何做到的呢?這就得從buffer的源碼說(shuō)起。

3. Flow buffer的原理

無(wú)buffer

先看看沒(méi)有buffer時(shí)的耗時(shí):

    suspend fun testBuffer3() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it")
                emit(it)
            }
        }

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }
image.png

從collect開(kāi)始,依次執(zhí)行flow閉包,通過(guò)emit調(diào)用到collect閉包,因?yàn)閒low閉包里包含了幾次emit,因此整個(gè)流程會(huì)有幾次發(fā)射。
如上圖,從步驟1到步驟8,因?yàn)槭窃谕粋€(gè)線程里,因此是串行執(zhí)行的,整個(gè)流的耗時(shí)即為生產(chǎn)者到消費(fèi)者(步驟1~步驟8)的耗時(shí)。

有buffer

在沒(méi)看源碼之前,我們先猜測(cè)一下它的流程:


image.png

每次emit都發(fā)送到buffer里,然后立刻回來(lái)繼續(xù)發(fā)送,如此一來(lái)生產(chǎn)者沒(méi)有被消費(fèi)者的速度拖累。
而消費(fèi)者會(huì)檢測(cè)Buffer里是否有數(shù)據(jù),有則取出來(lái)。

根據(jù)之前的經(jīng)驗(yàn)我們知道:collect調(diào)用到emit最后到buffer是線性調(diào)用的,放入buffer后繼續(xù)循環(huán)emit,那么問(wèn)題來(lái)了:

是誰(shuí)觸發(fā)了collect閉包的調(diào)用呢?

接下來(lái)深入源碼,探究答案。

buffer源碼流程分析

創(chuàng)建Flow

public fun <T> Flow<T>.buffer(capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    var capacity = capacity//buffer容量
    var onBufferOverflow = onBufferOverflow//buffer滿(mǎn)之后的處理策略
    if (capacity == Channel.CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        //走else 分支,構(gòu)造ChannelFlowOperatorImpl
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

buffer 返回Flow實(shí)例,其間涉及幾個(gè)重要的類(lèi)和函數(shù):


image.png

調(diào)用collect
當(dāng)調(diào)用Flow.collect時(shí):

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

構(gòu)造了匿名內(nèi)部類(lèi)FlowCollector,并實(shí)現(xiàn)了emit方法,它的實(shí)現(xiàn)為collect的閉包。

調(diào)用ChannelFlowOperatorImpl.collect最終會(huì)調(diào)用ChannelFlow.collect:

    override suspend fun collect(collector: FlowCollector<T>): Unit =
        coroutineScope {
            collector.emitAll(produceImpl(this))
        }

    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

produceImpl 創(chuàng)建了Channel,內(nèi)部開(kāi)啟了協(xié)程,返回ReceiveChannel。

再來(lái)看emitAll函數(shù):

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        while (true) {
            //掛起等待Channel數(shù)據(jù)
            val result = run { channel.receiveCatching() }
            if (result.isClosed) {
                //Channel關(guān)閉后才會(huì)退出循環(huán)
                result.exceptionOrNull()?.let { throw it }
                break // returns normally when result.closeCause == null
            }
            //發(fā)送數(shù)據(jù)
            emit(result.getOrThrow())
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        if (consume) channel.cancelConsumed(cause)
    }
}

Channel此時(shí)并沒(méi)有數(shù)據(jù),因此協(xié)程會(huì)掛起等待。

Channel發(fā)送
Channel什么時(shí)候有數(shù)據(jù)呢?當(dāng)然是在調(diào)用了Channel.send()函數(shù)后。
前面提到過(guò)collect之后開(kāi)啟了協(xié)程:

  public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

  internal val collectToFun: suspend (ProducerScope<T>) -> Unit
        get() = { collectTo(it) }

  protected override suspend fun collectTo(scope: ProducerScope<T>) =
        flowCollect(SendingCollector(scope))

此時(shí)傳入的參數(shù)為:collectToFun,最后構(gòu)造了:

public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    override suspend fun emit(value: T): Unit = channel.send(value)
}

當(dāng)協(xié)程得到執(zhí)行時(shí),會(huì)調(diào)用collectToFun-->collectTo(it)-->flowCollect(SendingCollector(scope)),最終調(diào)用到:

#ChannelFlowOperatorImpl
    override suspend fun flowCollect(collector: FlowCollector<T>) =
        flow.collect(collector)

而該flow為最開(kāi)始的flow,collector為SendingCollector。
flow.collect后會(huì)調(diào)用到flow的閉包,進(jìn)而調(diào)用到emit函數(shù):

    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        //...
        completion = uCont
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

emitFun本質(zhì)上會(huì)調(diào)用collector里的emit函數(shù),而此時(shí)的collector即為SendingCollector,最后調(diào)用channel.send(value)

如此一來(lái),Channel就將數(shù)據(jù)發(fā)送出去了,此時(shí)channel.receiveCatching()被喚醒,接下來(lái)執(zhí)行emit(result.getOrThrow()),這函數(shù)最后會(huì)流轉(zhuǎn)到最初始的collect的閉包里。
上面的分析即為生產(chǎn)者到消費(fèi)者的流轉(zhuǎn)過(guò)程,單看源碼可能比較亂,看圖解惑:

image.png

紅色部分和綠色部分分別為不同的協(xié)程,它倆的關(guān)聯(lián)點(diǎn)即是藍(lán)色部分。

Flow buffer的本質(zhì)上是利用了Channel進(jìn)行數(shù)據(jù)的發(fā)送和接收

buffer為啥能提升效率

前面分析過(guò)無(wú)buffer時(shí)生產(chǎn)者消費(fèi)者的流程圖,作為對(duì)比,我們也將加入buffer后生產(chǎn)者消費(fèi)者的流程圖。


image.png

還是以相同的demo,闡述其流程:

  1. 生產(chǎn)者掛起1s,當(dāng)1s結(jié)束后調(diào)用emit發(fā)射數(shù)據(jù),此時(shí)數(shù)據(jù)放入buffer里,生產(chǎn)者調(diào)用delay繼續(xù)掛起
  2. 此時(shí)消費(fèi)者被喚醒,然后掛起 2s等待
  3. 第2s到來(lái)之時(shí),生產(chǎn)者調(diào)用emit發(fā)送數(shù)據(jù)到buffer里,繼續(xù)掛起
  4. 第2s到來(lái)之時(shí),消費(fèi)者結(jié)束掛起,消費(fèi)數(shù)據(jù),然后繼續(xù)掛起2s
  5. 第3s到來(lái)之時(shí),生產(chǎn)者繼續(xù)生產(chǎn)數(shù)據(jù),而后生產(chǎn)者退出生產(chǎn)
  6. 第5s到來(lái)之時(shí),消費(fèi)者掛起結(jié)束,消費(fèi)數(shù)據(jù),然后繼續(xù)掛起2s
  7. 第7s到來(lái)之時(shí),消費(fèi)者掛起結(jié)束,消費(fèi)結(jié)束,此時(shí)因?yàn)閏hannel里已經(jīng)沒(méi)有數(shù)據(jù)了,退出循環(huán),最終消費(fèi)者退出

由此可見(jiàn),總共花費(fèi)了7s。

image.png

ps:協(xié)程調(diào)度時(shí)機(jī)不同,打印順序可能略有差異,但總體耗時(shí)不變。

至此,我們找到了buffer能夠提高效率的原因:

生產(chǎn)者、消費(fèi)者運(yùn)行在不同的協(xié)程,掛起操作不阻塞對(duì)方

拋出一個(gè)比較有意思的問(wèn)題:以下代碼加buffer之后效率會(huì)有提升嗎?

    suspend fun testBuffer6() {
        var flow = flow {
            (1..3).forEach {
                println("emit $it")
                emit(it)
            }
        }
        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

在未實(shí)驗(yàn)之前,如果你已經(jīng)有答案,恭喜你已經(jīng)弄懂了buffer的本質(zhì)。

4. Flow 線程切換的使用

    suspend fun testBuffer4() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.flowOn(Dispatchers.IO)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

flowOn(Dispatchers.IO)表示其之前的操作符(函數(shù))都在IO線程執(zhí)行,如這里的意思是flow閉包里的代碼在IO線程執(zhí)行。
而其之后的操作符(函數(shù))在當(dāng)前的線程執(zhí)行。
通常用在子線程里獲取網(wǎng)絡(luò)數(shù)據(jù)(flow閉包),然后再collect閉包里(主線程)更新UI。

5. Flow 線程切換的原理

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

看到這你可能已經(jīng)有答案了:這不就和buffer一樣的方式嗎?
但仔細(xì)看,此處多了個(gè)上下文:CoroutineContext。
CoroutineContext的作用就是用來(lái)決定協(xié)程運(yùn)行在哪個(gè)線程。

前面分析的buffer時(shí),我們的協(xié)程的作用域是runBlocking,即使生產(chǎn)者、消費(fèi)者在不同的協(xié)程,但是它們始終在同一個(gè)線程里執(zhí)行。
而使用了flowOn指定線程,此時(shí)生產(chǎn)者、消費(fèi)者在不同的線程運(yùn)行協(xié)程。
因此,只要弄懂了buffer原理,flowOn原理自然而然就懂了。


image.png

以上為Flow背壓和線程切換的全部?jī)?nèi)容,下篇將分析Flow的熱流。
本文基于Kotlin 1.5.3,文中完整Demo請(qǐng)點(diǎn)擊

您若喜歡,請(qǐng)點(diǎn)贊、關(guān)注、收藏,您的鼓勵(lì)是我前進(jìn)的動(dòng)力

持續(xù)更新中,和我一起步步為營(yíng)系統(tǒng)、深入學(xué)習(xí)Android/Kotlin

1、Android各種Context的前世今生
2、Android DecorView 必知必會(huì)
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分發(fā)全套服務(wù)
6、Android invalidate/postInvalidate/requestLayout 徹底厘清
7、Android Window 如何確定大小/onMeasure()多次執(zhí)行原因
8、Android事件驅(qū)動(dòng)Handler-Message-Looper解析
9、Android 鍵盤(pán)一招搞定
10、Android 各種坐標(biāo)徹底明了
11、Android Activity/Window/View 的background
12、Android Activity創(chuàng)建到View的顯示過(guò)
13、Android IPC 系列
14、Android 存儲(chǔ)系列
15、Java 并發(fā)系列不再疑惑
16、Java 線程池系列
17、Android Jetpack 前置基礎(chǔ)系列
18、Android Jetpack 易懂易學(xué)系列
19、Kotlin 輕松入門(mén)系列
20、Kotlin 協(xié)程系列全面解讀

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容