(1)基本用法
Channel實(shí)際上是一個(gè)并發(fā)安全的隊(duì)列,它可以用來連接協(xié)程,實(shí)現(xiàn)不同協(xié)程的通信。
生產(chǎn)者/消費(fèi)者模式 (send - channel - receive)
Channel的基本用法如下:
runBlocking {
val channel = Channel<Int>()
// 生產(chǎn)者
val producer = GlobalScope.launch {
var i = 0
while(true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
while(true) {
val element = channel.receive()
println("receive $element")
}
}
joinAll(producer, consumer)
}
(2)Channel的容量
Channel實(shí)際上就是一個(gè)隊(duì)列,隊(duì)列中一定存在緩沖區(qū),那么一旦這個(gè)緩沖區(qū)滿了,
并且也一直沒有人調(diào)用receive取走數(shù)據(jù),send就需要掛起。
故意讓接收端的節(jié)奏放慢,發(fā)現(xiàn)send總是會(huì)掛起,直到receive之后才會(huì)繼續(xù)往下執(zhí)行。
Channel的默認(rèn)大小為0。
(3)迭代Channel
Channel本身確實(shí)像序列,所以我們?cè)谧x取的時(shí)候可以直接獲取一個(gè)Channel的iterator。
runBlocking {
val channel = Channel<Int>(Channel.UNLIMITED)
// 生產(chǎn)者
val producer = GlobalScope.launch {
for (x in 1..5) {
println("send ${x * x}")
channel.send(x * x)
}
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while(iterator.hasNext()) {
val element = iterator.next()
println("receive $element")
delay(1000)
}
}
joinAll(producer, consumer)
}
消費(fèi)者代碼也可以改成:
// 消費(fèi)者
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive $element")
delay(1000)
}
}
(4)produce與actor
構(gòu)造生產(chǎn)者與消費(fèi)者的便捷方法。
我們可以通過produce方法啟動(dòng)一個(gè)生產(chǎn)者協(xié)程,并返回一個(gè)ReceiveChannel,其他協(xié)程就可以用這個(gè)Channel來接收數(shù)據(jù)了。
反過來,我們可以用actor啟動(dòng)一個(gè)消費(fèi)者協(xié)程。
produce演示:
runBlocking {
val receiveChannel = GlobalScope.produce<Int> {
repeat(100) {
delay(1000)
send(it)
}
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
for (element in receiveChannel) {
println("receive $element")
}
}
consumer.join()
}
actor演示:
runBlocking {
// 消費(fèi)者
val sendChannel = GlobalScope.actor<Int> {
while (true) {
val element = receive()
println("receive $element")
}
}
// 生產(chǎn)者
val producer = GlobalScope.launch {
repeat(100) {
sendChannel.send(it)
delay(1000)
}
}
producer.join()
}
(5)Channel關(guān)閉
produce和actor返回的Channel都會(huì)隨著對(duì)應(yīng)的協(xié)程執(zhí)行完畢而關(guān)閉,也正是這樣,Channel才被稱為 熱數(shù)據(jù)流。
對(duì)于一個(gè)Channel,如果我們調(diào)用了它的close方法,它會(huì)立即停止接收新元素,也就是說這時(shí)它的 isClosedForSend 會(huì)立即返回true,而由于Channel緩沖區(qū)的存在,這時(shí)可能還有一些元素沒有處理完,因此要等所有的元素都讀取之后 isClosedForReceive 才會(huì)返回true。
Channel的生命周期最好由主導(dǎo)方來維護(hù),建議 由主導(dǎo)的一方實(shí)現(xiàn)關(guān)閉。
runBlocking {
val channel = Channel<Int>(3)
// 生產(chǎn)者
val producer = GlobalScope.launch {
List(3) {
channel.send(it)
println("send $it")
}
channel.close()
println("producer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
}
// 消費(fèi)者
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive:$element")
delay(1000)
}
println("consumer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
}
joinAll(producer, consumer)
}
(6)BroadcastChannel
正常情況下,一個(gè) 發(fā)送者 對(duì)應(yīng)著一個(gè) 接收者。
使用 BroadcastChannel 可以存在多個(gè)接收者。
runBlocking {
val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
// 生產(chǎn)者
val producer = GlobalScope.launch {
List(3) {
delay(1000)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3) { index->
GlobalScope.launch {
val receiveChannel = broadcastChannel.openSubscription()
for (element in receiveChannel) {
delay(1000)
println("[#${index}] receive:$element")
}
}
}.joinAll()
producer.join()
}
BroadcastChannel<Int>(Channel.BUFFERED) 可以改成 Channel<Int>().broadcast(Channel.BUFFERED)。
(7)await多路復(fù)用
什么是多路復(fù)用?
數(shù)據(jù)通信系統(tǒng)或計(jì)算機(jī)網(wǎng)絡(luò)系統(tǒng)中,傳輸媒體的帶寬或容量往往會(huì)大于傳輸單一信號(hào)的需求,為了有效地利用通信線路,希望 一個(gè)信道同時(shí)傳輸多路信號(hào),這就是所謂多路復(fù)用技術(shù)。
復(fù)用多個(gè)await?
兩個(gè)API分別從網(wǎng)絡(luò)和本地緩存 獲取數(shù)據(jù),期望哪個(gè)先返回就先用哪個(gè)做展示。
request->server->response--
----Select -> Response
request->server->response--
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
delay(1000)
User(name)
}
suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
delay(100)
User(name)
}
fun main() {
runBlocking {
GlobalScope.launch {
val localRequest = getUserForLocal("xxx")
val remoteRequest = getUserFromRemote("yyy")
// select 選擇執(zhí)行
val userResponse = select<Response<User>> {
localRequest.onAwait { Response(it, true) }
remoteRequest.onAwait { Response(it, false) }
}
println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
}.join()
}
}
select:誰先返回,就選擇誰。
(8)復(fù)用多個(gè)Channel
fun main() {
runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(100)
channels[0].send(200)
}
GlobalScope.launch {
delay(50)
channels[0].send(100)
}
val result = select<Int?> {
channels.forEach { channel ->
channel.onReceive { it }
}
}
println(result)
}
}
(9)SelectClause
我們?cè)趺粗滥男┦录梢员籹elect呢? 其實(shí)所有能夠被select的事件都是selectClauseN 類型,包括:
- selectClause0:對(duì)應(yīng)事件沒有返回值,例如join沒有返回值,那么onJoin就是SelectClauseN類型。使用時(shí),onJoin的參數(shù)是一個(gè)無參函數(shù)。
- selectClause1:對(duì)應(yīng)事件有返回值,例如:onAwait和onReceive都是此類情況。
- selectClause2:對(duì)應(yīng)事件有返回值,此外還需要一個(gè)額外的參數(shù),例如Channel.onSend有兩個(gè)參數(shù),第一個(gè)是具體的數(shù)據(jù),第二個(gè)參數(shù)是發(fā)送成功的回調(diào)參數(shù)。
-> 如果我們想要確認(rèn)掛起函數(shù)是否支持select,只需要查看其是否存在對(duì)應(yīng)的SelectClauseN類型可回調(diào)即可。
selectClause0舉例:
fun main() {
runBlocking {
val job1 = GlobalScope.launch {
delay(100)
println("job 1")
}
val job2 = GlobalScope.launch {
delay(10)
println("job 2")
}
select<Unit> {
job1.onJoin { println("job1 onJoin") }
job2.onJoin { println("job2 onJoin") }
}
}
}
selectClause1舉例:
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
delay(1000)
User(name)
}
suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
delay(100)
User(name)
}
fun main() {
runBlocking {
GlobalScope.launch {
val localRequest = getUserForLocal("xxx")
val remoteRequest = getUserFromRemote("yyy")
// select 選擇執(zhí)行
val userResponse = select<Response<User>> {
localRequest.onAwait { Response(it, true) }
remoteRequest.onAwait { Response(it, false) }
}
println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
}.join()
}
}
fun main() {
runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(100)
channels[0].send(200)
}
GlobalScope.launch {
delay(50)
channels[0].send(100)
}
val result = select<Int?> {
channels.forEach { channel ->
channel.onReceive { it }
}
}
println(result)
}
}
selectClause2舉例:
fun main() {
runBlocking {
val channels = listOf(Channel<Int>(), Channel<Int>())
println(channels)
launch(Dispatchers.IO) {
select<Unit> {
launch {
delay(10)
channels[1].onSend(200) { sentChannel->
println("sent on $sentChannel")
}
}
launch {
delay(100)
channels[0].onSend(100) { sentChannel->
println("sent on $sentChannel")
}
}
}
}
GlobalScope.launch {
println(channels[0].receive())
}
GlobalScope.launch {
println(channels[1].receive())
}
}
}
[完...]