協(xié)程: 協(xié)程最核心的點就是,函數(shù)或者一段程序能夠被掛起,稍后再在掛起的位置恢復(fù)。掛起的函數(shù)要在協(xié)程里面執(zhí)行。
特點: 1、更加輕量級,占用資源更少。
2、避免“回調(diào)地獄”,增加代碼可讀性。
3、協(xié)程的掛起不阻塞線程。
1、協(xié)程的掛起與恢復(fù)
suspend- 掛起函數(shù)不會阻塞當(dāng)前線程。當(dāng)一個掛起函數(shù)被調(diào)用時,它會暫停協(xié)程的執(zhí)行,但不會阻塞底層線程。這意味著即使你在主線程上調(diào)用掛起函數(shù),也不會使主線程陷入等待狀態(tài)。
resume -用于讓已暫停的協(xié)程從其暫停處繼續(xù)執(zhí)行。
掛起函數(shù)只能在協(xié)程體內(nèi)或其他掛起函數(shù)內(nèi)調(diào)用
tv?.setOnClickListener {
GlobalScope.launch(Dispatchers.Main) {
//掛起,點擊N次響應(yīng)N次
delay(4000)
Log.i("jia", "-----")
}
//點擊一次無效因為阻塞了, 點擊N次也是響應(yīng)一次而且會崩潰
Thread.sleep(4000)
Log.i("jia", "=====")
}
阻塞: 等98號技師,98號在忙,中間不干其他的,就一直等直到她結(jié)束
掛起: 等98號技師,98號在忙,前臺記下來,先看電影或者干其他的,結(jié)束后直接服務(wù)你
2、調(diào)度器
所有協(xié)程必須在調(diào)度器中運行,即使它們在主線程上運行也是如此。
Dispatchers.Main Android 上的主線程
用來處理UI交互和一些輕量級任務(wù):調(diào)用suspend函數(shù)、調(diào)用UI函教、更新LiveData
Dispatchers.lO 非主線程
轉(zhuǎn)為磁盤和網(wǎng)絡(luò)IO進行優(yōu)化,用來處理數(shù)據(jù)庫、文件、讀寫網(wǎng)絡(luò)處理
Dispatchers.Default 非主線程
專為CPU密集型任務(wù)進行了憂化:數(shù)組排序、JSON數(shù)據(jù)解析
3、協(xié)程作用域
定義協(xié)程必須指定其CoroutineScope,它會跟蹤所有協(xié)程,同樣它還可以取消由它所啟動的所有協(xié)程
常用的相關(guān)API有:
GlobalScope,生命周期是process級別的,即使Activity或Fragment已經(jīng)被銷毀,協(xié)程仍然在執(zhí)行。
MainScope,在Activity中使用,可以在onDestroy()中取消協(xié)程。默認運行在主線程。
viewModelScope,只能在ViewModel中使用,自動管理協(xié)程的生命周期,綁定ViewModel的生命周期。
lifecycleScope,只能在Activity、Fragment中使用,自動管理協(xié)程的生命周期,確保在生命周期結(jié)束時取消協(xié)程。會綁定Activity和Fragment的生命周期。
CoroutineScope:適用于需要手動管理協(xié)程生命周期的場景,可以在 Activity 或 Fragment 中使用,但需要手動取消協(xié)程。
SupervisorScope :子協(xié)程獨立運行,適用于多個協(xié)程同時運行的場景。
4、協(xié)程的啟動模式
DEFAULT:協(xié)程創(chuàng)建后,立即開始調(diào)度,在調(diào)度前如果協(xié)程被取消,其將直接進入取消響應(yīng)的狀態(tài)。
類似滴滴打車,立即開始調(diào)度不一定立馬有車,需要等待車來,但是可以在車來之前取消。
ATOMIC:協(xié)程創(chuàng)建后,立即開始調(diào)度,協(xié)程執(zhí)行到第一個掛起點之前不響應(yīng)取消。
LAZY:只有協(xié)程被需要時,包括主動調(diào)用協(xié)程的start、join或者await等函數(shù)時才會開始調(diào)度,如果調(diào)度前就被取消,那么該協(xié)程將直接進入異常結(jié)束狀態(tài)。join await 都是掛起函數(shù),不會阻塞主線程。
UNDISPATCHED:協(xié)程創(chuàng)建后立即在當(dāng)前函數(shù)調(diào)用棧中執(zhí)行,直到遇到第一個真正掛起的點。
//如何使用Dispatchers.IO調(diào)度器,執(zhí)行仍然在主線程就是用Coroutinestart.UNDISPATCHED
val job =GlobalScope.async(context=Dispatchers.IO,start=Coroutinestart.UNDISPATCHED){
println("thread:"+Thread.currentThread().name)
}
5、協(xié)程的作用域構(gòu)建器 coroutineScope與runBlocking
runBlocking是常規(guī)函數(shù),而coroutineScope是掛起函數(shù)。
它們都會等待其協(xié)程體以及所有子協(xié)程結(jié)束,主要區(qū)別在于runBlocking方法會阻塞當(dāng)前線程來等待,而coroutineScope只是掛起,會釋放底層線程用于其他用途。
coroutineScope與supervisorScope
coroutineScope:一個協(xié)程失敗了,所有其他兄弟協(xié)程也會被取消。
supervisorScope:一個協(xié)程失敗了,不會影響其他兄弟協(xié)程。
suspend fun 將main() = runBlocking {
coroutineScope { //這里換成supervisorScope,子協(xié)程1就會執(zhí)行
val job1 = launch {
delay(5000)
println("子協(xié)程1")
}
val job2 = launch {
delay(1000)
println("子協(xié)程2")
throw Exception() //子協(xié)程1不會執(zhí)行
}
}
}
6、Job對象
對于每一個創(chuàng)建的協(xié)程((通過launch或者async,會返回一個Job實例,該實例是協(xié)程的唯一標示,并且負責(zé)管理協(xié)程的生命周期。
一個任務(wù)可以包含一系列狀態(tài):新創(chuàng)建(New)、活躍(Active),完成中(Completing)、已完成(Completed),(取消中(Cancelling)和已取消(Cancelled)雖然我們無法直接訪問這些狀態(tài),但是我們可以訪問Job的屬性: isActive、isCancelled和iscompleted。
job.join
會掛起調(diào)用該函數(shù)的協(xié)程,直到目標協(xié)程執(zhí)行完畢。這樣可以確保當(dāng)前協(xié)程不會繼續(xù)執(zhí)行后續(xù)的代碼,直到與Job關(guān)聯(lián)的協(xié)程已經(jīng)結(jié)束。join函數(shù)是非阻塞的,即使它會掛起當(dāng)前協(xié)程,它不會阻塞底層線程。因此,它在并發(fā)編程中非常有用,可以用來控制協(xié)程的執(zhí)行順序,確保某些任務(wù)在其他任務(wù)之前完成。
job.cancel() 的作用是請求取消與該 Job 對象關(guān)聯(lián)的協(xié)程,但是不會立馬生效 。
具體來說,當(dāng)你調(diào)用 job.cancel() 時,會向協(xié)程發(fā)送一個取消信號,請求它停止執(zhí)行。然而,這并不意味著協(xié)程會立即終止,因為協(xié)程的取消是協(xié)作性的。換句話說,協(xié)程必須在合適的時機檢查取消狀態(tài)并進行響應(yīng)。通常協(xié)程會通過掛起函數(shù)(如 delay 或 yield)來檢測是否被取消,并在取消時拋出一個 CancellationException。
7、CPU密集型任務(wù)取消
isActive是一個可以被使用在CoroutineScope中的擴展屬性,檢查Job是否處于活躍狀態(tài)。
ensureActive(),如果job處于非活躍狀態(tài),這個方法會立即拋出異常。
yield函數(shù)會檢查所在協(xié)程的狀態(tài),如果已經(jīng)取消,則拋出CancellationException予以響應(yīng)。此外,它還會嘗試出讓線程的執(zhí)行權(quán),給其他協(xié)程提供執(zhí)行機會。
處于取消中狀態(tài)的協(xié)程不能夠掛起(運行不能取消的代碼),當(dāng)協(xié)程被取消后需要調(diào)用掛起函數(shù),我們需要將清理任務(wù)的代碼放置于 NonCancellable CoroutineContext 中
fun yield() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5&& isActive) {//增加isActive狀態(tài) 來取消任務(wù)
if (System.currentTimeMillis() >= nextPrintTime) {
// ensureActive()//下面這兩種方式都可以取消任務(wù)
// yield()
println("job:I'm sleeping ${i++}。。。1")
nextPrintTime += 500
}
}
try {
delay(10000)
println("job:I'm sleeping")
}finally {
println("job:I'm finally")
withContext(NonCancellable){
//NonCancellable使用后,即使上述任務(wù)取消了,也不影響下面的執(zhí)行
delay(1300)
println("job:I'm back")
}
}
}
delay(1300)
println("main:I'm tired of waiting!")
job.cancelAndJoin()
println("main:Now Ican guit.")
}
8、超時任務(wù)
很多情況下取消一個協(xié)程的理由是它有可能超時。
withTimeoutOrNull 通過返回null來進行超時操作,從而替代拋出一個異常。
9、SupervisorJob
使用SupervisorJob時,一個子協(xié)程的運行失敗不會影響到其他子協(xié)程。Supervisorjob不會傳播異常給它的父級,它會讓子協(xié)程自己處理異常。
這種需求常見于在作用域內(nèi)定義作業(yè)的UI組件,如果任何一個U的子作業(yè)執(zhí)行失敗了,它并不總是有必要取消整個UI組件,但是如果UI組件被銷毀了,由于它的結(jié)果不再被需要了,它就有必要使所有的子作業(yè)執(zhí)行失敗。
supervisorScope:當(dāng)作業(yè)自身執(zhí)行失敗的時候,所有子作業(yè)都會被取消。
10、異常捕獲
使用CoroutineExceptionHandler對協(xié)程的異常進行捕獲。以下的條件被滿足時,異常就會被捕獲:。
時機:異常是被自動拋出異常的協(xié)程所拋出的(使用launch,而不是 async 時);
位置:在CoroutineScope的CoroutineContext中或在一個根協(xié)程(CoroutineScope 或者 supervisorScope 的直接子協(xié)程)中。
當(dāng)子協(xié)程被取消時,不會取消它的父協(xié)程,
如果一個協(xié)程遇到了CancellationException以外的異常,它將使用該異常取消它的父協(xié)程。當(dāng)父協(xié)程的所有子協(xié)程都結(jié)束后,異常才會被父協(xié)程處理。
fun catchHandle() = runBlocking<Unit> {
val handler= CoroutineExceptionHandler { coroutineContext, exception ->
println("catch"+exception)
}
val job=GlobalScope.launch(handler) {
throw AssertionError() //會被捕獲
launch(handler) {
throw AssertionError() //此內(nèi)部協(xié)程的方式不會被捕獲
}
}
val job2=GlobalScope.async(handler) {
throw ArithmeticException()//不會被捕獲
}
job.join()
job2.await()
}
常見異常捕獲方式
val handler= CoroutineExceptionHandler { coroutineContext, exception ->
println("catch"+exception)
}
tv?.setOnClickListener {
GlobalScope.launch(handler) {
Log.i("jia", "onclick")
"abc".substring(10)//必閃退 ,但是加了異常捕獲,就不會閃退了,還會打印閃退信息
}
}
11、Flow 與其他方式的區(qū)別
名為flow的Flow類型構(gòu)建器函數(shù)。
flow{.. }構(gòu)建塊中的代碼可以掛起。
函數(shù)simpleFlow不再標有suspend修飾符。
流使用emit函數(shù)發(fā)射值。
流使用collect函數(shù)收集值。
//返回多個值,是異步的
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)//假裝在一些重要的事情
emit(i)//發(fā)射,產(chǎn)生一個元素
}
}
fun collectFlow()= runBlocking<Unit> {
simpleFlow().collect { value ->
println(value)
}
}
12、冷流
冷流:顧名思義冷啟動,臨陣磨槍
①、Flow是一種類似于序列的冷流,flow構(gòu)建器中的代碼直到流被收集的時候才運行。
fun collectFlow()= runBlocking<Unit> {
val flow =simpleFlow()
printIn("Calling collect...")
flow.collect{value ->printIn(value)}
println("Calling collect again...")
flow.collect{value ->printIn(value)}
}
②、流的每次單獨收集都是按順序執(zhí)行的,除非使用特殊操作符.
從上游到下游每個過渡操作符都會處理每個發(fā)射出的值,然后再交給末端操作符.
下列打印結(jié)果:
Collect string 2
Collect string 4
fun collectFlow() = runBlocking<Unit> {
//過濾偶數(shù)并將其映射到字符串
(1..5).asFlow().filter {
it % 2 == 0
}.map {
"string $it"
}.collect {
printIn("Collect $it")
}
}
③、flowOf構(gòu)建器定義了一個發(fā)射固定值集的流。
使用.asFlow()擴展函數(shù),可以將各種集合與序列轉(zhuǎn)換為流。
fun collectFlow() = runBlocking<Unit> {
flowOf("one", "two", "three")
.onEach { delay(1000) }
.collect { value -> printIn(value) }
(1..3).asFlow().collect { value ->
println(value)
}
}
④、流的收集總是在調(diào)用協(xié)程的上下文中發(fā)生,流的該屬性稱為上下文保存。
flow.{...}構(gòu)建器中的代碼必須遵循上下文保存屬性,并且不允許從其他上下文中發(fā)射(emit)。
flowOn操作符,該函數(shù)用于更改流發(fā)射的上下文。
fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)//假裝在一些重要的事情
emit(i)//發(fā)射,產(chǎn)生一個元素
}
}.flowOn(Dispatchers.Default)
⑤、使用launchIn替換collect我們可以在單獨的協(xié)程中啟動流的收集。
onEach 它允許你在 Flow 的每個元素被收集時執(zhí)行一個副作用(side effect)。簡單來說,它可以在每個元素通過數(shù)據(jù)流時對其應(yīng)用某種處理,但不會改變這些元素本身或影響流的正常傳播。
onEach 提供了一種優(yōu)雅的方式來為 Flow 添加額外的行為,而不必直接修改流的內(nèi)容或其傳播方式。
//事件源
fun events() = (1..3)
.asFlow()
.onEach { delay(1000) }
.flowOn(Dispatchers.Default)
fun collectFlow5() = runBlocking<Unit> {
events().onEach { event -> println("Event: $event ${Thread.currentThread().name})") }
// .collect()
.launchIn(CoroutineScope(Dispatchers.IO))
}
⑥、為方便起見,流構(gòu)建器對每個發(fā)射值執(zhí)行附加的 ensureActive 檢測以進行取消的這意味著從 flow{..}發(fā)出的繁忙循環(huán)是可以取消的。
出于性能原因,大多數(shù)其他流操作不會自行執(zhí)行其他取消檢測,在協(xié)程處于繁忙循環(huán)的情況下,必須明確檢測是否取消。
通過cancellable操作符來執(zhí)行此操作。
⑦、背壓:背壓(Backpressure)是指當(dāng)生產(chǎn)者(數(shù)據(jù)生成方)生成數(shù)據(jù)的速度快于消費者(數(shù)據(jù)處理方)處理數(shù)據(jù)的速度時,系統(tǒng)需要采取的一種機制來處理這種不匹配的情況。背壓機制允許生產(chǎn)者暫時停止生成數(shù)據(jù),直到消費者能夠跟上處理速度,從而避免數(shù)據(jù)丟失或系統(tǒng)崩潰。
buffer(),并發(fā)運行流中發(fā)射元素的代碼。
conflate(),合并發(fā)射項,不對每個值進行處理。
collectLatest(),取消并重新發(fā)射最后一個值。
當(dāng)必須更改CoroutineDispatcher時,flowOn操作符使用了相同的緩沖機制,但是buffer函數(shù)顯式地請求緩沖而不改變執(zhí)行上下文。
fun collectFlow6() = runBlocking<Unit> {
simpleFlow()
.buffer(50)//并發(fā)運行流中發(fā)射元素的代碼。
.conflate()//合并發(fā)射項,不對每個值進行處理。
.collect { value ->
delay(3000)
println(value)
}
}
⑧、轉(zhuǎn)換轉(zhuǎn)換符
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
fun simpleFlow() = flow<Int> {
(1..3).asFlow()
.map { performRequest(it) }
.collect { value ->
println(value)
}
(1..3).asFlow()
.transform {
emit("Making request $it")
emit(performRequest(it))
}
.collect { value ->
println(value)
}
}
限長操作符 take(2),表示最多接受兩條數(shù)據(jù)。
組合操作符:就像Kotlin標準庫中的Sequence,zip擴展函數(shù)一樣,流擁有一個zip操作符用于組合兩個流中的相關(guān)值。
fun collectFlow() = runBlocking<Unit> {
val nubs=(1..3).asFlow()
val strs= flowOf("one","Two","Three")
nubs.zip(strs){
a,b->"$a ->$b"
}.collect{
println(it)
}
}
1、Channel實際上是一個并發(fā)安全的隊列,它可以用來連接協(xié)程,實現(xiàn)不同協(xié)程的
通信。channel默認容量1。
fun ceshiChannel() = runBlocking<Unit> {
//生產(chǎn)者
val channel = Channel<Int>()
val producer = GlobalScope.launch {
var i = 0
while (true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
//消費者
val consumer = GlobalScope.launch {
while (true) {
val element=channel.receive()
println("receive $element")
}
}
joinAll(producer,consumer)
}
②迭代Channel
Channel本身確實像序列,所以我們在讀取的時候可以直接獲取一個Channel的iterator.
fun ceshiChannel2() = runBlocking<Unit> {
//生產(chǎn)者
val channel = Channel<Int>()
val producer = GlobalScope.launch {
for (x in 1..5) {
channel.send(x)
println("send $x")
}
}
//消費者
val consumer = GlobalScope.launch {
val iterator =channel.iterator()
while (iterator.hasNext()) {
val element=channel.receive()
println("receive $element")
delay(2000)
}
//也或者可以以下寫法
for(element in channel){
println("receive $element")
delay(2000)
}
}
joinAll(producer,consumer)
}
③、produce與actor 構(gòu)造生產(chǎn)者與消費者的便捷方法
我們可以通過produce方法啟動一個生產(chǎn)者協(xié)程,并返回一個ReceiveChannel,其他協(xié)程就可以用這個Channel來接收數(shù)據(jù)了。反過來,我們可以用actor啟動一個消費者協(xié)程
fun ceshiChannel3() = runBlocking<Unit> {
//生產(chǎn)者
val receiveChannel:ReceiveChannel<Int> = GlobalScope.produce {
repeat(100){
delay(1000)
send(it)
}
}
//消費者
val consumer = GlobalScope.launch {
for(element in receiveChannel){
println("receive $element")
}
}
joinAll(consumer)
}
fun ceshiChannel4() = runBlocking<Unit> {
//消費者
val sendChannel:SendChannel<Int> = GlobalScope.actor {
while (true){
val element=receive()
println(element)
}
}
//生產(chǎn)者
val producer = GlobalScope.launch {
for (x in 1..5) {
sendChannel.send(x)
}
}
producer.join()
}
④、Channel的關(guān)閉
produce和actor返回的Channel都會隨著對應(yīng)的協(xié)程執(zhí)行完畢而關(guān)閉,也正是這樣,Channel才被稱為熱數(shù)據(jù)流。
對于一個Channel,如果我們調(diào)用了它的close方法,它會立即停止接收新元素,也就是說這時它的isClosedforSend會立即返回true。而由于Channel緩沖區(qū)的存在,這時候可能還有一些元素沒有被處理完,因此要等所有的元素都被讀取之后isClosedForReceive才會返回true。
Channel的生命周期最好由主導(dǎo)方來維護,建該由主導(dǎo)的一方實現(xiàn)關(guān)閉。
⑤、BroadcastChannel
前面提到,發(fā)送端和接收端在Channel中存在一對多的情形,從數(shù)據(jù)處理本身來講,雖然有多個接收端,但是同一個元素只會被一個接收端讀到。廣播則不然多個接收端不存在互斥行為。
fun ceshiChannel5() = runBlocking<Unit> {
//消費者 也可以通過下面的broadcast進行轉(zhuǎn)換
val broadcastChannel= BroadcastChannel<Int>(Channel.BUFFERED)
// val channel = Channel<Int>()
// val broadcastChannel=channel.broadcast(3)
val producer = GlobalScope.launch {
List(3){
delay(1000)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
//消費者
List(3){index->
GlobalScope.launch {
val receiveChannel=broadcastChannel.openSubscription()
for (x in receiveChannel) {
println(x)
}
}
}.joinAll()
}
⑥、await多路復(fù)用
兩個API分別從網(wǎng)絡(luò)和本地緩存獲取數(shù)據(jù),期望哪個先返回就先用哪個做展示。
⑦、多個Channel復(fù)用
跟await類似,會接收到最快的那個channel消息。