# kotlin channel 入門

kotlin channel 入門

前言

最近項目中對 kotlin 的使用比較多。不得不說 kotlin 確實可以極大的提高 android 的開發(fā)效率,有許多之前得用 java 寫非常多、非常啰嗦的樣板代碼的 case,用 kotlin 卻可以幾行搞定,四兩撥千斤,同時邏輯表達也更加清晰。而 kotlin 對于 java 而言,最大的不同莫過于協(xié)程了。習慣了 kotlin 的協(xié)程,可能再也不想使用 java 的 handler + postDelay 了。因此,在這里,本人準備對 kotlin 協(xié)程中一些比較難以上手的點,進行說明和分析。這篇文章,將會帶大家一起學習一下 kotlin 協(xié)程中 channel 的使用。

channel 概述

kotlin 中,我們常用 defer 來進行協(xié)助之間單個值的傳遞。比如,我們可能會寫如下代碼:

val deferred = GlobalScope.async {
  // do something,
  "this is a result"
}
deferred.await()

用來等待一個異步協(xié)程的結(jié)果。在結(jié)果返回之前,當前協(xié)程掛起。那么,如果我們想獲取一系列的結(jié)果,應該怎么辦呢?注意,這里的一系列的結(jié)果,不是說我們需要一個 list,而是說,我們想第一次 await() 的時候,得到一個值,然后再次 await() 的時候,還能獲取到值。就像從一個隊列里面不斷的取出新的元素一樣。

這個時候我們就可以使用 channel 了。channel 非常類似于一個 java 中非常常見的概念 BlockingQueue 。只不過,BlockingQueue 使用可以阻塞的 put 方法,而 channel 使用可以掛起的 send 方法;BlockingQueue 使用可以阻塞的 take 方法,而 channel 使用可以掛起的 receive 方法。所以,如果什么時候我們對于 channel 的理解產(chǎn)生了困惑,可以簡單的把相關的內(nèi)容類比到 BlockingQueue 中,來幫助我們進行理解。

channel 的用法

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
}
repeat(5) { println(channel.receive()) }
println("Done!")

簡單說明一下上面的代碼:我們有一個 channel ,我們會從這個 channel 中 receive 5 次。這五次一次獲取到從 1 到 5 一共五個數(shù)字。

這個簡單的代碼片段,其實蘊含了非常重要的程序執(zhí)行流程:

我們假設,根據(jù)代碼的書寫順序,先執(zhí)行到了 channel.send(1) 。根據(jù)上面闡述的內(nèi)容,send 因為是一個掛起的方法,第一次只會執(zhí)行 1,并把 1 放入到 channel 中。然后,receive 方法獲取到 1。這個時候在 repeat(5) 的循環(huán)中,再次執(zhí)行到 receive 的時候,因為 channel 中已經(jīng)沒有數(shù)了,所以 receive 會掛起。之后,協(xié)程會通過調(diào)度算法,讓 channel.send(2 * 2) 執(zhí)行,并讓 channel.send(3 * 3) 掛起。再之后,channel.receive() 在經(jīng)過調(diào)度之后,得到執(zhí)行,獲取到剛才 channel.send(2 * 2) 的結(jié)果,也就是 4 。以此類推。

  1. channel.send(1)
  2. 發(fā)送方掛起
  3. channel.receive(1)
  4. 接收方掛起
  5. channel.send(4)
  6. 發(fā)送方掛起
  7. channel.receive(4)
  8. 接收方掛起

。。。

channel 的關閉和遍歷

channelqueue 的一個不同的點就是,channel 是可以關閉的。close 這個動作,底層其實是給 channel 發(fā)送了一個消息。官方管這個東西叫 close token。因為 channel 在接收到 close 消息的時候,會立刻停止在這個 channel 上的遍歷的工作,所以 kotlin 會保證在 close 被調(diào)用之前已經(jīng)在 channel 中的消息被 received。

kotlin 為我們提供了一個簡單的 channel 的遍歷方法,也就是 for 循環(huán),使用方法如下:

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")

channel 的流水線模式

流水線模式的使用場景如下:一個協(xié)程不斷的生產(chǎn)新的消息,其他協(xié)程不斷的處理這些消息,并且在這個過程中可能返回新的結(jié)果。跟我們說的函數(shù)式編程中的 map(映射) 非常類似。

這個模式可以讓我們很輕松的寫出一些簡潔而邏輯清晰的代碼,比如,下面代碼展示了如何生成素數(shù)的邏輯:

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

fun main() {
  var cur = numbersFrom(2)
    repeat(10) {
    val prime = cur.receive()
    println(prime)
    cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

這樣,執(zhí)行 main() 方法之后,就會輸出前 10 個素數(shù)。這里的原理也很簡單。如果我們不考慮 kotlin 的語法問題,但從計算機的角度解決生成素數(shù)的問題,一種解法是,我們需要用一個 list 來存儲已經(jīng)找到的素數(shù),然后,在對 n 進行自增的過程中,遍歷所有已經(jīng)找到的素數(shù) list,如果所有的素數(shù)都不能整除 n,那么這個 n 就是新的素數(shù)。

pipeline 模式寫出的代碼,原理跟上面闡述的一樣。只是上文所說的 list 被封裝在了一層一層的 filter 中,最終執(zhí)行的過程中,對于一個 n ,需要通過所有的 filter,這跟上文說的遍歷所有已經(jīng)找到的素數(shù)列表的效果是一致的。

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 

扇入和扇出

扇入:多對一,多個 channel 作為生產(chǎn)者,一個 channel 作為消費者。

扇出:一對多,一個 channel 作為生產(chǎn)者,多個 channel 作為消費者。

雖然概念有不同,但是,寫法上,跟一對一的 channel 是一樣的。

緩沖 channel

channel 默認的 capacity 是 1。這也就是我們上文說的,send 方法在第二次會掛起,因為中間沒有 receive 來消費這個消息。直到有 receive 消費了上一個消息之后,剛才掛起的 send 才能恢復執(zhí)行。當然,我們可以通過設置參數(shù)讓這個 capacity 的值不為 1,比如4。那么,跟上面的分析是一樣的,send 會執(zhí)行四次,然后在第五次的時候掛起,直到有 receive 把消息給消費掉了之后,之前掛起的 send 才能繼續(xù)恢復執(zhí)行。

channel 的公平性

channel 是公平的。所以,他會嚴格的按照 first-in first-out 的順序來執(zhí)行。一個比較好的例子,就是模擬打乒乓球:

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

// prints
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

這里的公平性就體現(xiàn)在,ping 協(xié)程是先啟動的,所以理應獲得到 ball。但是 ping 在通過調(diào)用 sendball 送還給 channel 之后,又在循環(huán)的下一輪立刻請求獲取 ball。但是,因為 pongreceive 是比 ping 的下一個 receive 先調(diào)用的,(是在上一個 ping 的后面調(diào)用的),所以是 pong 得到 ball 而不是 ping。

Ticker channels

channel 還有一種比較常用的用法,就是用來實現(xiàn)令牌系統(tǒng)。比如,我們現(xiàn)在的需求,是每 100 ms 產(chǎn)生一個令牌,那么我在 51ms 來取,肯定是獲取不到的。但是我在 101ms 的時候來取,是可以獲取到的??紤]一種情況,令牌沒有得到及時的消費,比如,就是前 150ms 都沒有消費,那么第 151ms 來的消費者是可以立刻獲取到令牌的。但是,第 152ms 來的消費者是不能獲取到令牌的。但是,第 201ms 過來的消費者是可以獲取到令牌的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 這是第一次主動寫簡書,也是時隔多年再次提筆(上一次大概是高中作文?大學小論文?總之是別人要求的思想),這次是主動的...
    對立人閱讀 1,226評論 3 3
  • 許久許久之前,我在和朋友吹牛扯淡時信誓旦旦地表示: 在未來,微信一定會取消發(fā)表純文字朋友圈內(nèi)容的功能。 依據(jù)如下:...
    云寒閱讀 1,364評論 0 2
  • NO.1 人與人之間的關系往往不是因為某些具體的原因而斷絕。不,即使表面上有某種原因,其實是因為彼此的心已經(jīng)不在一...
    聚字成書閱讀 548評論 0 1
  • 敬愛的李老師,智慧的班主任,親愛的躍友們: 大家好!我是來自文登奧沃斯教育的王春葉,是黃櫟媛的人,今天是我日精進行...
    王春葉閱讀 144評論 0 0

友情鏈接更多精彩內(nèi)容