github原文地址
原創(chuàng)翻譯,轉(zhuǎn)載請保留或注明出處:http://www.itdecent.cn/p/afd359a976e2
通道(Channels)
延遲值提供了一個在協(xié)程間傳輸單個值的便捷方式。Channel 提供了一種傳輸數(shù)據(jù)流的方式。
channel基礎(chǔ)
Channel 是與 BlockingQueue很相似的概念。一個關(guān)鍵的區(qū)別是它具有一個掛起 send 代替阻塞的put 操作,同時具有一個掛起 receive 代替阻塞的take 操作。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
獲取完整代碼 here
輸出結(jié)果:
1
4
9
16
25
Done!
關(guān)閉和channel迭代
與隊列不同,可以通過關(guān)閉channel來指示出不再有元素到來。在接收端,通過常規(guī)的 for 循環(huán)來接收channel里的元素是很方便的。
從概念上來說,一個 close 調(diào)用就像往channel里發(fā)送一個特殊的關(guān)閉token。一旦受到此關(guān)閉token,迭代就會停止,這樣可以保證收到關(guān)閉命令前,先前所有發(fā)送的元素都可以被接收到。
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
獲取完整代碼 here
構(gòu)建channel生產(chǎn)者
協(xié)程生成一系列元素的模式很常見。這是“生產(chǎn)者-消費者模式”的一部分,這種模式經(jīng)常出現(xiàn)在并發(fā)代碼中。你可以將這樣一個生產(chǎn)者抽象為一個用channel作為參數(shù)的函數(shù),但這與常規(guī)意義上的“結(jié)果必須從函數(shù)返回”相違背。
在生產(chǎn)者端有一種便利的方式來做這件事:使用名為 produce 的協(xié)程構(gòu)建器。而且消費者端可以用擴展函數(shù) consumeEach 代替for循環(huán)。
fun produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
獲取完整代碼 here
流水線(Pipelines)
流水線(pipeline)是一種協(xié)程生成可能是無限的數(shù)據(jù)流的模式:
fun produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
與此同時其他的協(xié)程消費這個流,做一些處理,并產(chǎn)生一些其他結(jié)果。在以下示例中數(shù)值只是被平方:
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
主函數(shù)啟動并連接整個pipeline:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
squares.cancel() // need to cancel these coroutines in a larger app
numbers.cancel()
}
獲取完整代碼 here
我們不必在此示例應(yīng)用中取消這些協(xié)程,因為 coroutines are like daemon threads 協(xié)程就像守護線程,但是在一個更大型的應(yīng)用中,如果我們不再需要它,那就需要停止我們的pipeline。又或者,我們可以將pipeline協(xié)程作為 children of a main coroutine 主協(xié)程的子協(xié)程運行,如下所示:
使用pipeline生成素數(shù)
讓我們通過一個使用協(xié)程pipeline生成素數(shù)的例子將pipeline帶到極端。我們從一個無限的數(shù)值序列開始,這次我們引入一個顯示的 context 參數(shù)并將其傳給 produce 構(gòu)建器,所以調(diào)用者可以控制我們協(xié)程的運行位置:
fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
以下的pipeline stage過濾傳入的數(shù)值流,刪除所有可被給定素數(shù)整除的數(shù)值:
fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) {
for (x in numbers) if (x % prime != 0) send(x)
}
現(xiàn)在我們開始構(gòu)建一個從2起的數(shù)值流pipeline,從當(dāng)前channel獲取素數(shù),然后為每個找到的素數(shù)啟動新的pipeline stage:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
以下示例打印前十個素數(shù),在主線程的上下文中運行整個pipeline。由于所有協(xié)程都是作為主 runBlocking 協(xié)程的子協(xié)程在其 coroutineContext 上下文中啟動的,因為我們不必保留一份明確的清單記錄我們所啟動的所有協(xié)程。我們使用 cancelChildren 擴展函數(shù)來取消所有子協(xié)程。
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(coroutineContext, 2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(coroutineContext, cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
獲取完整代碼 here
這段代碼輸出如下:
2
3
5
7
11
13
17
19
23
29
注意一點,你可以使用標(biāo)注庫中的 buildIterator 協(xié)程構(gòu)建器來構(gòu)建相同的pipeline。用 buildIterator 替代 produce ,yield 替代 send ,next 替代receive ,Iterator 替代 ReceiveChannel ,并去除上下文。同時也不需要使用 runBlocking 。然而如上所示,pipeline使用channel的好處是,如果在 CommonPool 上下文中運行它,實際上可以使用多個cpu核心。
無論如何,這是一種極不切實際的尋找素數(shù)的方法。實際上,pipeline確實涉及一些其他的掛起調(diào)用(比如對遠(yuǎn)程服務(wù)的異步調(diào)用),并且這些pipeline不能使用 buildSeqeunce/buildIterator 構(gòu)建,因為它們不允許任意掛起。這與完全的異步的 produce 有所不同。
扇出(Fan-out)
多個協(xié)程可以從同一個channel接收數(shù)據(jù),在它們之間分配工作。讓我們從一個定時生產(chǎn)整型(每秒10個數(shù))的生產(chǎn)者協(xié)程開始:
fun produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
接著我們可以有幾個處理協(xié)程。在這個示例中,它們只是打印它們的id和接收數(shù)值:
fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
現(xiàn)在我們啟動5個處理器,讓它們工作近乎1秒鐘,看看會發(fā)生什么:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
獲取完整代碼 here
輸出會和下面這個很類似,盡管接收每個特定整型的處理器id有可能不同:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
注意,取消生產(chǎn)者協(xié)程會關(guān)閉他的channel,從而最終終止處理器協(xié)程正在channel上執(zhí)行的迭代。
另外,請注意我們在 launchProcessor 代碼中如何使用for循環(huán)顯式迭代channel來運行扇出。與consumeEach 不同,這種for循環(huán)模式可以非常安全地在多協(xié)程環(huán)境中使用。如果其中一個處理器協(xié)程崩潰,其他處理器協(xié)程仍然會處理這條channel,而通過 consumeEach 編寫的處理器總是消費(取消)底層channel,無論是正常還是異常地終止。
扇入(Fan-in)
多個協(xié)程也許會發(fā)送數(shù)據(jù)到同一個channel。例如,我們有一個字符串channel和一個掛起函數(shù),這個函數(shù)以指定的延遲重復(fù)發(fā)送一個特定的字符串到這個channel:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
現(xiàn)在,讓我們看看如果啟動一組發(fā)送字符串的協(xié)程會發(fā)生什么(這個例子中,我們在主線程的上下文中,將它們作為主協(xié)程的子協(xié)程進行啟動):
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo", 200L) }
launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
獲取完整代碼 here
輸出如下:
foo
foo
BAR!
foo
foo
BAR!
緩沖channel
到目前為止展示的channel都沒有緩沖區(qū)。當(dāng)發(fā)送方和接收方相遇時,無緩沖的channel傳輸數(shù)據(jù)(也稱為會和)。如果先調(diào)用send,則將其掛起直到receive被調(diào)用。反之亦然。
Channel() 工廠方法和 produce 構(gòu)建器都使用可選的容量參數(shù) capacity 來指定緩沖區(qū)大小。緩沖區(qū)允許發(fā)送發(fā)在掛起之前發(fā)送多個元素,類似于具有指定容量的 BlockingQueue ,當(dāng)緩沖區(qū)塞滿時阻塞。
看看下面代碼的行為:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch(coroutineContext) { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
獲取完整代碼 here
它使用容量為4的緩沖channel打印了5次 "sending":
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
前4個元素被加入到了緩沖區(qū)內(nèi),然后發(fā)送者在嘗試發(fā)送第5個時掛起了。
Ticker channels
Ticker channel 是一種特殊的會和channel,每次自這個channel上次的消費后、經(jīng)過給定延遲時間后生產(chǎn) Unit 。雖然它看起來似乎沒有用,但它是一種對于創(chuàng)建復(fù)雜的基于時間的 produce pipelines 和 operators 來說有用的構(gòu)建塊,這些 produce pipelines 和 operators 可以進行窗口化和其他一些時間相關(guān)的處理。Ticker channel 可以在 select 中用來執(zhí)行 "on tick" 動作。
使用工廠方法 ticker 來創(chuàng)建這類channel。為了表明不再需要其他元素,使用 ReceiveChannel.cancel 方法。
現(xiàn)在我來看一下在實踐中它是如何運作的:
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
獲取完整代碼 here
打印如下行:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
請注意,ticker 知道可能的消費者暫停,并且默認(rèn)情況下,如果發(fā)生了暫停,則延遲下個生產(chǎn)的元素,嘗試維持生產(chǎn)元素的固定比例。
可選地,可以指定一個值為 TickerMode.FIXED_DELAY 的 mode 參數(shù),來維持元素間的固定延遲。
Channels是公平的
對于從多個協(xié)程發(fā)起的發(fā)送和接收調(diào)用,channel對它們的調(diào)用順序是公平,以先進先出為序。例如第一個調(diào)用 receive 的協(xié)程獲得元素。在之后的示例中,兩個"ping","pong"協(xié)程從共享的"table"channel中接收"ball"對象。
data class Ball(var hits: Int)
fun main(args: Array<String>) = runBlocking<Unit> {
val table = Channel<Ball>() // a shared table
launch(coroutineContext) { player("ping", table) }
launch(coroutineContext) { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
獲取完整代碼 here
"ping"協(xié)程先啟動,所以它第一個接收到ball。即使"ping"協(xié)程在發(fā)送ball回table后,立即再次開始接收ball,ball還是被"pong"協(xié)程接收到了,因為它已經(jīng)在等待接收了:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
請注意,由于正在使用的執(zhí)行者的特性,有時channel可能會產(chǎn)生看起來不公平的執(zhí)行結(jié)果。詳情參閱 this issue 。