kotlin<第十一篇>:Channel-通道

(1)基本用法

Channel實(shí)際上是一個(gè)并發(fā)安全的隊(duì)列,它可以用來連接協(xié)程,實(shí)現(xiàn)不同協(xié)程的通信。
生產(chǎn)者/消費(fèi)者模式 (send - channel - receive)

Channel的基本用法如下:

runBlocking {

    val channel = Channel<Int>()
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        var i = 0
        while(true) {
            delay(1000)
            channel.send(++i)
            println("send $i")
        }
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        while(true) {
            val element = channel.receive()
            println("receive $element")
        }
    }

    joinAll(producer, consumer)

}
(2)Channel的容量

Channel實(shí)際上就是一個(gè)隊(duì)列,隊(duì)列中一定存在緩沖區(qū),那么一旦這個(gè)緩沖區(qū)滿了,
并且也一直沒有人調(diào)用receive取走數(shù)據(jù),send就需要掛起。
故意讓接收端的節(jié)奏放慢,發(fā)現(xiàn)send總是會(huì)掛起,直到receive之后才會(huì)繼續(xù)往下執(zhí)行。

Channel的默認(rèn)大小為0。

(3)迭代Channel

Channel本身確實(shí)像序列,所以我們?cè)谧x取的時(shí)候可以直接獲取一個(gè)Channel的iterator。

runBlocking {

    val channel = Channel<Int>(Channel.UNLIMITED)
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        for (x in 1..5) {
            println("send ${x * x}")
            channel.send(x * x)
        }
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        val iterator = channel.iterator()
        while(iterator.hasNext()) {
            val element = iterator.next()
            println("receive $element")
            delay(1000)
        }
    }

    joinAll(producer, consumer)

}

消費(fèi)者代碼也可以改成:

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive $element")
            delay(1000)
        }
    }
(4)produce與actor

構(gòu)造生產(chǎn)者與消費(fèi)者的便捷方法。
我們可以通過produce方法啟動(dòng)一個(gè)生產(chǎn)者協(xié)程,并返回一個(gè)ReceiveChannel,其他協(xié)程就可以用這個(gè)Channel來接收數(shù)據(jù)了。
反過來,我們可以用actor啟動(dòng)一個(gè)消費(fèi)者協(xié)程。

produce演示:

runBlocking {

    val receiveChannel = GlobalScope.produce<Int> {
        repeat(100) {
            delay(1000)
            send(it)
        }
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        for (element in receiveChannel) {
            println("receive $element")
        }
    }
    consumer.join()
}

actor演示:

runBlocking {

    // 消費(fèi)者
    val sendChannel = GlobalScope.actor<Int> {
        while (true) {
            val element = receive()
            println("receive $element")
        }
    }

    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        repeat(100) {
            sendChannel.send(it)
            delay(1000)
        }
    }
    producer.join()
}
(5)Channel關(guān)閉

produce和actor返回的Channel都會(huì)隨著對(duì)應(yīng)的協(xié)程執(zhí)行完畢而關(guān)閉,也正是這樣,Channel才被稱為 熱數(shù)據(jù)流。
對(duì)于一個(gè)Channel,如果我們調(diào)用了它的close方法,它會(huì)立即停止接收新元素,也就是說這時(shí)它的 isClosedForSend 會(huì)立即返回true,而由于Channel緩沖區(qū)的存在,這時(shí)可能還有一些元素沒有處理完,因此要等所有的元素都讀取之后 isClosedForReceive 才會(huì)返回true。

Channel的生命周期最好由主導(dǎo)方來維護(hù),建議 由主導(dǎo)的一方實(shí)現(xiàn)關(guān)閉。

runBlocking {

    val channel = Channel<Int>(3)
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        List(3) {
           channel.send(it)
           println("send $it")
        }
        channel.close()
        println("producer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
    }

    // 消費(fèi)者
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive:$element")
            delay(1000)
        }
        println("consumer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
    }
    joinAll(producer, consumer)
}
(6)BroadcastChannel

正常情況下,一個(gè) 發(fā)送者 對(duì)應(yīng)著一個(gè) 接收者。
使用 BroadcastChannel 可以存在多個(gè)接收者。

runBlocking {

    val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    // 生產(chǎn)者
    val producer = GlobalScope.launch {
        List(3) {
            delay(1000)
            broadcastChannel.send(it)
        }
        broadcastChannel.close()
    }

    List(3) { index->
        GlobalScope.launch {
            val receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                delay(1000)
                println("[#${index}] receive:$element")
            }
        }
    }.joinAll()

    producer.join()
}

BroadcastChannel<Int>(Channel.BUFFERED) 可以改成 Channel<Int>().broadcast(Channel.BUFFERED)。

(7)await多路復(fù)用

什么是多路復(fù)用?
數(shù)據(jù)通信系統(tǒng)或計(jì)算機(jī)網(wǎng)絡(luò)系統(tǒng)中,傳輸媒體的帶寬或容量往往會(huì)大于傳輸單一信號(hào)的需求,為了有效地利用通信線路,希望 一個(gè)信道同時(shí)傳輸多路信號(hào),這就是所謂多路復(fù)用技術(shù)。

復(fù)用多個(gè)await?
兩個(gè)API分別從網(wǎng)絡(luò)和本地緩存 獲取數(shù)據(jù),期望哪個(gè)先返回就先用哪個(gè)做展示。

request->server->response--
                         ----Select -> Response
request->server->response--
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {

        GlobalScope.launch {
            val localRequest = getUserForLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            // select 選擇執(zhí)行
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
        }.join()

    }
}

select:誰先返回,就選擇誰。

(8)復(fù)用多個(gè)Channel
fun main() {
    runBlocking {

        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[0].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)

    }
}
(9)SelectClause

我們?cè)趺粗滥男┦录梢员籹elect呢? 其實(shí)所有能夠被select的事件都是selectClauseN 類型,包括:

  • selectClause0:對(duì)應(yīng)事件沒有返回值,例如join沒有返回值,那么onJoin就是SelectClauseN類型。使用時(shí),onJoin的參數(shù)是一個(gè)無參函數(shù)。
  • selectClause1:對(duì)應(yīng)事件有返回值,例如:onAwait和onReceive都是此類情況。
  • selectClause2:對(duì)應(yīng)事件有返回值,此外還需要一個(gè)額外的參數(shù),例如Channel.onSend有兩個(gè)參數(shù),第一個(gè)是具體的數(shù)據(jù),第二個(gè)參數(shù)是發(fā)送成功的回調(diào)參數(shù)。

-> 如果我們想要確認(rèn)掛起函數(shù)是否支持select,只需要查看其是否存在對(duì)應(yīng)的SelectClauseN類型可回調(diào)即可。

selectClause0舉例:

fun main() {
    runBlocking {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }
        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }
        select<Unit> {
            job1.onJoin { println("job1 onJoin") }
            job2.onJoin { println("job2 onJoin") }
        }
    }
}

selectClause1舉例:

data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {

        GlobalScope.launch {
            val localRequest = getUserForLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            // select 選擇執(zhí)行
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
        }.join()

    }
}
fun main() {
    runBlocking {

        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[0].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)

    }
}

selectClause2舉例:

fun main() {
    runBlocking {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit> {
                launch {
                    delay(10)
                    channels[1].onSend(200) { sentChannel->
                        println("sent on $sentChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sentChannel->
                        println("sent on $sentChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
    }
}

[完...]

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容