kotlin channel 入門
前言
最近項目中對 kotlin 的使用比較多。不得不說 kotlin 確實可以極大的提高 android 的開發(fā)效率,有許多之前得用 java 寫非常多、非常啰嗦的樣板代碼的 case,用 kotlin 卻可以幾行搞定,四兩撥千斤,同時邏輯表達也更加清晰。而 kotlin 對于 java 而言,最大的不同莫過于協(xié)程了。習慣了 kotlin 的協(xié)程,可能再也不想使用 java 的 handler + postDelay 了。因此,在這里,本人準備對 kotlin 協(xié)程中一些比較難以上手的點,進行說明和分析。這篇文章,將會帶大家一起學習一下 kotlin 協(xié)程中 channel 的使用。
channel 概述
kotlin 中,我們常用 defer 來進行協(xié)助之間單個值的傳遞。比如,我們可能會寫如下代碼:
val deferred = GlobalScope.async {
// do something,
"this is a result"
}
deferred.await()
用來等待一個異步協(xié)程的結(jié)果。在結(jié)果返回之前,當前協(xié)程掛起。那么,如果我們想獲取一系列的結(jié)果,應該怎么辦呢?注意,這里的一系列的結(jié)果,不是說我們需要一個 list,而是說,我們想第一次 await() 的時候,得到一個值,然后再次 await() 的時候,還能獲取到值。就像從一個隊列里面不斷的取出新的元素一樣。
這個時候我們就可以使用 channel 了。channel 非常類似于一個 java 中非常常見的概念 BlockingQueue 。只不過,BlockingQueue 使用可以阻塞的 put 方法,而 channel 使用可以掛起的 send 方法;BlockingQueue 使用可以阻塞的 take 方法,而 channel 使用可以掛起的 receive 方法。所以,如果什么時候我們對于 channel 的理解產(chǎn)生了困惑,可以簡單的把相關的內(nèi)容類比到 BlockingQueue 中,來幫助我們進行理解。
channel 的用法
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
}
repeat(5) { println(channel.receive()) }
println("Done!")
簡單說明一下上面的代碼:我們有一個 channel ,我們會從這個 channel 中 receive 5 次。這五次一次獲取到從 1 到 5 一共五個數(shù)字。
這個簡單的代碼片段,其實蘊含了非常重要的程序執(zhí)行流程:
我們假設,根據(jù)代碼的書寫順序,先執(zhí)行到了 channel.send(1) 。根據(jù)上面闡述的內(nèi)容,send 因為是一個掛起的方法,第一次只會執(zhí)行 1,并把 1 放入到 channel 中。然后,receive 方法獲取到 1。這個時候在 repeat(5) 的循環(huán)中,再次執(zhí)行到 receive 的時候,因為 channel 中已經(jīng)沒有數(shù)了,所以 receive 會掛起。之后,協(xié)程會通過調(diào)度算法,讓 channel.send(2 * 2) 執(zhí)行,并讓 channel.send(3 * 3) 掛起。再之后,channel.receive() 在經(jīng)過調(diào)度之后,得到執(zhí)行,獲取到剛才 channel.send(2 * 2) 的結(jié)果,也就是 4 。以此類推。
- channel.send(1)
- 發(fā)送方掛起
- channel.receive(1)
- 接收方掛起
- channel.send(4)
- 發(fā)送方掛起
- channel.receive(4)
- 接收方掛起
。。。
channel 的關閉和遍歷
channel 跟 queue 的一個不同的點就是,channel 是可以關閉的。close 這個動作,底層其實是給 channel 發(fā)送了一個消息。官方管這個東西叫 close token。因為 channel 在接收到 close 消息的時候,會立刻停止在這個 channel 上的遍歷的工作,所以 kotlin 會保證在 close 被調(diào)用之前已經(jīng)在 channel 中的消息被 received。
kotlin 為我們提供了一個簡單的 channel 的遍歷方法,也就是 for 循環(huán),使用方法如下:
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
channel 的流水線模式
流水線模式的使用場景如下:一個協(xié)程不斷的生產(chǎn)新的消息,其他協(xié)程不斷的處理這些消息,并且在這個過程中可能返回新的結(jié)果。跟我們說的函數(shù)式編程中的 map(映射) 非常類似。
這個模式可以讓我們很輕松的寫出一些簡潔而邏輯清晰的代碼,比如,下面代碼展示了如何生成素數(shù)的邏輯:
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
fun main() {
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
這樣,執(zhí)行 main() 方法之后,就會輸出前 10 個素數(shù)。這里的原理也很簡單。如果我們不考慮 kotlin 的語法問題,但從計算機的角度解決生成素數(shù)的問題,一種解法是,我們需要用一個 list 來存儲已經(jīng)找到的素數(shù),然后,在對 n 進行自增的過程中,遍歷所有已經(jīng)找到的素數(shù) list,如果所有的素數(shù)都不能整除 n,那么這個 n 就是新的素數(shù)。
用 pipeline 模式寫出的代碼,原理跟上面闡述的一樣。只是上文所說的 list 被封裝在了一層一層的 filter 中,最終執(zhí)行的過程中,對于一個 n ,需要通過所有的 filter,這跟上文說的遍歷所有已經(jīng)找到的素數(shù)列表的效果是一致的。
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
扇入和扇出
扇入:多對一,多個 channel 作為生產(chǎn)者,一個 channel 作為消費者。
扇出:一對多,一個 channel 作為生產(chǎn)者,多個 channel 作為消費者。
雖然概念有不同,但是,寫法上,跟一對一的 channel 是一樣的。
緩沖 channel
channel 默認的 capacity 是 1。這也就是我們上文說的,send 方法在第二次會掛起,因為中間沒有 receive 來消費這個消息。直到有 receive 消費了上一個消息之后,剛才掛起的 send 才能恢復執(zhí)行。當然,我們可以通過設置參數(shù)讓這個 capacity 的值不為 1,比如4。那么,跟上面的分析是一樣的,send 會執(zhí)行四次,然后在第五次的時候掛起,直到有 receive 把消息給消費掉了之后,之前掛起的 send 才能繼續(xù)恢復執(zhí)行。
channel 的公平性
channel 是公平的。所以,他會嚴格的按照 first-in first-out 的順序來執(zhí)行。一個比較好的例子,就是模擬打乒乓球:
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
// prints
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
這里的公平性就體現(xiàn)在,ping 協(xié)程是先啟動的,所以理應獲得到 ball。但是 ping 在通過調(diào)用 send 把 ball 送還給 channel 之后,又在循環(huán)的下一輪立刻請求獲取 ball。但是,因為 pong 的 receive 是比 ping 的下一個 receive 先調(diào)用的,(是在上一個 ping 的后面調(diào)用的),所以是 pong 得到 ball 而不是 ping。
Ticker channels
channel 還有一種比較常用的用法,就是用來實現(xiàn)令牌系統(tǒng)。比如,我們現(xiàn)在的需求,是每 100 ms 產(chǎn)生一個令牌,那么我在 51ms 來取,肯定是獲取不到的。但是我在 101ms 的時候來取,是可以獲取到的??紤]一種情況,令牌沒有得到及時的消費,比如,就是前 150ms 都沒有消費,那么第 151ms 來的消費者是可以立刻獲取到令牌的。但是,第 152ms 來的消費者是不能獲取到令牌的。但是,第 201ms 過來的消費者是可以獲取到令牌的。