什么是Coroutines(協(xié)程)

一、什么是Coroutines(協(xié)程)

協(xié)程是很久之前就提出的一個概念,目前支持協(xié)程的語言包括 lua、C#、go等。也包括Android官方開發(fā)語言Kotlin。當(dāng)然網(wǎng)上對此也有很多的爭議,很多說法認(rèn)為Kotlin中的協(xié)程是個偽協(xié)程,沒有實(shí)現(xiàn)go語言的那種協(xié)程特性,而僅僅是對于java線程的一個包裝,本文也認(rèn)同這種觀點(diǎn),因?yàn)樗]有脫離JVM來實(shí)現(xiàn),所以仍然受java線程模型限制。這里只去談?wù)揔otlin協(xié)程的用法和原理,暫時拋開對于協(xié)程概念的不同理解。

kotlinx.coroutines 是由 JetBrains開發(fā)的功能豐富的協(xié)程庫。它涵蓋很多啟用高級協(xié)程的原語,包括 launch、 async 等等。
coroutines通過掛起函數(shù)的概念完成協(xié)程任務(wù)調(diào)度,協(xié)程是輕量級線程,本質(zhì)上是在線程上進(jìn)行任務(wù)調(diào)度。甚至可以粗俗的理解為類似于進(jìn)程和線程的關(guān)系,一個進(jìn)程中可以包括多個線程,而一個線程中可以包括多個協(xié)程。但執(zhí)行上是有區(qū)別的,一個進(jìn)程中可以有多個線程同時并發(fā)執(zhí)行,但是一個線程中的多個協(xié)程本質(zhì)上是順序執(zhí)行的,是應(yīng)用協(xié)程掛起的方式來表現(xiàn)為并發(fā)執(zhí)行。

二、協(xié)程創(chuàng)建

1.協(xié)程的創(chuàng)建主要有三種方式:

1)launch創(chuàng)建。

返回值是Job,Job用來處理協(xié)程的取消等操作。這種創(chuàng)建方式是非阻塞的,創(chuàng)建的協(xié)程并不會阻塞創(chuàng)建協(xié)程的線程,也可以通過Job的join方法阻塞線程,來等待協(xié)程執(zhí)行結(jié)束。如果當(dāng)前創(chuàng)建處沒有協(xié)程上下文信息需要使用GlobalScope調(diào)用launch方法以頂層協(xié)程的方式創(chuàng)建。但是用GlobalScope.launch和直接用launch方式創(chuàng)建有一些區(qū)別,GlobalScope.launch默認(rèn)是開啟新線程來執(zhí)行協(xié)程任務(wù)的,launch是直接在當(dāng)前上下文中的線程執(zhí)行。

     val coroutineJob = GlobalScope.launch {
          Log.d(TAG, "current Thread is ${Thread.currentThread()}")
      }
     Log.d(TAG, "GlobalScope.launch create coroutine")

可以看到輸出的日志順序是先輸出協(xié)程外部的日志,后輸出協(xié)程內(nèi)部的日志,并且協(xié)程內(nèi)部任務(wù)的執(zhí)行是在工作線程。

2020-05-21 15:52:39.137 20964-20964/com.common.coroutines_retrofit_okhttp D/MainActivity: GlobalScope.launch create coroutine
2020-05-21 15:52:39.138 20964-20997/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[DefaultDispatcher-worker-1,5,main]

這里可能會有人有疑問,因?yàn)閰f(xié)程在工作線程執(zhí)行,工作線程本身就不會阻塞主線程,為了進(jìn)一步驗(yàn)證這種方式創(chuàng)建了非阻塞的協(xié)程,在協(xié)程的創(chuàng)建時指定協(xié)程執(zhí)行在主線程。

      val coroutineJob = GlobalScope.launch(Dispatchers.Main) {
          Log.d(TAG, "current Thread is ${Thread.currentThread()}")
      }
      Log.d(TAG, "GlobalScope.launch create coroutine")

可以看到輸出的日志順序仍然和之前一樣,但是協(xié)程執(zhí)行的線程變成了主線程。從這里可以看出協(xié)程并沒有阻塞住主線程的執(zhí)行。

2020-05-21 15:55:59.664 22312-22312/com.common.coroutines_retrofit_okhttp D/MainActivity: GlobalScope.launch create coroutine
2020-05-21 15:55:59.695 22312-22312/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[main,5,main]

2)runBlocking創(chuàng)建。

返回一個指定的類型,類型由協(xié)程任務(wù)的返回值控制,阻塞式創(chuàng)建,這種方式會阻塞住創(chuàng)建協(xié)程的線程,只有協(xié)程執(zhí)行結(jié)束才能繼續(xù)線程的下一步執(zhí)行,默認(rèn)執(zhí)行在創(chuàng)建協(xié)程的線程。

        val coroutine2 = runBlocking {
            Log.d(TAG, "current Thread is ${Thread.currentThread()}")
        }
        Log.d(TAG, "runBlocking create coroutine")

從日志輸出可以看到在協(xié)程執(zhí)行完畢,主線程的日志才進(jìn)行打印。

2020-05-21 15:57:27.927 22781-22781/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[main,5,main]
2020-05-21 15:57:27.927 22781-22781/com.common.coroutines_retrofit_okhttp D/MainActivity: runBlocking create coroutine

為了進(jìn)一步驗(yàn)證阻塞性,指定runBlocking創(chuàng)建的協(xié)程在工作線程執(zhí)行,并且在協(xié)程中模擬一個耗時任務(wù)。

       val coroutine2 = runBlocking(Dispatchers.IO) {
          Log.d(TAG, "current Thread is ${Thread.currentThread()}")
          delay(5000)
      }
      Log.d(TAG, "runBlocking create coroutine")

從日志中可以看到協(xié)程執(zhí)行在工作線程,但是主線程仍然等待5秒,等待協(xié)程執(zhí)行完畢。

2020-05-21 15:58:47.506 23031-23106/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[DefaultDispatcher-worker-1,5,main]
2020-05-21 15:58:52.516 23031-23031/com.common.coroutines_retrofit_okhttp D/MainActivity: runBlocking create coroutine

3)async創(chuàng)建。

返回值是Deferred,非阻塞式創(chuàng)建,很類似launch方式。如果當(dāng)前創(chuàng)建處沒有協(xié)程上下文信息也需要使用GlobalScope調(diào)用async方法創(chuàng)建,GlobalScope.async和直接用async方式創(chuàng)建的區(qū)別和launch是一樣的。主要是特點(diǎn)是處理協(xié)程并發(fā),當(dāng)多個協(xié)程在同一個線程執(zhí)行時,一個協(xié)程掛起了,不會阻塞另一個協(xié)程執(zhí)行。

runBlocking {
      var startTime = System.currentTimeMillis()
      val time = measureTimeMillis {
          val deferred1 = async {
              delay(2000L)
              Log.d(TAG, "deferred1 get result , current thread is ${Thread.currentThread()}")
          }

          val deferred2 = async {
              delay(3000L)
              Log.d(TAG, "deferred2 get result , current thread is ${Thread.currentThread()}")
          }

          Log.d(TAG, "result is ${deferred1.await() + deferred2.await()}")
      }
      Log.d(TAG, "cost time is $time")
      Log.d(TAG, "cost time2 is ${System.currentTimeMillis() - startTime}")

  }

從日志中可以看出兩個協(xié)程執(zhí)行總耗時大概3s中,并不是兩個協(xié)程總體延遲5s,說明在第一個協(xié)程掛起進(jìn)行延時的時候,第二個協(xié)程已開始調(diào)度執(zhí)行。并且兩個協(xié)程都是在runBlocking所在的主線程中執(zhí)行

2020-05-21 16:00:23.534 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: deferred1 get result , current thread is Thread[main,5,main]
2020-05-21 16:00:24.536 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: deferred2 get result , current thread is Thread[main,5,main]
2020-05-21 16:00:24.538 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: result is 150
2020-05-21 16:00:24.539 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: cost time is 3011
2020-05-21 16:00:24.539 23638-23638/com.common.coroutines_retrofit_okhttp D/MainActivity: cost time2 is 3012

2.協(xié)程可以嵌套使用。

父子協(xié)程來執(zhí)行不同的任務(wù)。在協(xié)程的嵌套中子協(xié)程可以省略GlobalScope,直接調(diào)用launch和async就可以進(jìn)行創(chuàng)建,這樣直接共用父協(xié)程的作用域,在父協(xié)程所在的線程執(zhí)行。也可以通過Dispatchers指定作用的線程。GlobalScope其實(shí)是協(xié)程的作用域,協(xié)程的執(zhí)行必須有作用域,這個后面會講解到。這里舉一個最簡單的嵌套的例子。

        runBlocking {
          launch {
              Log.d(TAG, "launch current Thread is ${Thread.currentThread()}")
          }
          Log.d(TAG, "current Thread is ${Thread.currentThread()}")
      }
2020-05-21 16:02:11.161 24076-24076/com.common.coroutines_retrofit_okhttp D/MainActivity: current Thread is Thread[main,5,main]
2020-05-21 16:02:11.162 24076-24076/com.common.coroutines_retrofit_okhttp D/MainActivity: launch current Thread is Thread[main,5,main]

可以看到runBlocking內(nèi)部通過launch又創(chuàng)建了一個協(xié)程,并且launch使用runBlocking的協(xié)程上下文在主線程中執(zhí)行。
協(xié)程嵌套有幾個需要注意的點(diǎn):

1)父協(xié)程取消執(zhí)行的時候,子協(xié)程也會被取消執(zhí)行。

2)父協(xié)程總是會等待子協(xié)程執(zhí)行結(jié)束。

3.掛起函數(shù)

說起協(xié)程就必須講掛起函數(shù)的概念,掛起函數(shù)是實(shí)現(xiàn)協(xié)程機(jī)制的基礎(chǔ),Kotlin中通過suspend關(guān)鍵字聲明掛起函數(shù),掛起函數(shù)只能在協(xié)程中執(zhí)行,或者在別的掛起函數(shù)中執(zhí)行。delay就是一個掛起函數(shù),掛起函數(shù)會掛起當(dāng)前協(xié)程。協(xié)程會等待掛起函數(shù)執(zhí)行完畢再繼續(xù)執(zhí)行其余任務(wù)。

     private suspend fun doWork(){
      Log.d(TAG,"doWork start")
      delay(5000)
      Log.d(TAG,"doWork end")
  }

這里定義一個掛起函數(shù),打印兩行日志,在這兩行日志之間調(diào)用delay掛起函數(shù)掛起協(xié)程5s中。

2020-05-21 16:04:40.022 25119-25119/? D/MainActivity: doWork start
2020-05-21 16:04:45.025 25119-25119/? D/MainActivity: doWork end

三、協(xié)程取消與超時

1.協(xié)程取消。

協(xié)程提供了取消操作,如果一個協(xié)程任務(wù)未執(zhí)行完畢,但是執(zhí)行結(jié)果已經(jīng)不需要了,這時可以調(diào)用cancel函數(shù)取消協(xié)程,也可以調(diào)用cancelAndJoin方法取消協(xié)程并等待任務(wù)結(jié)束,相當(dāng)于調(diào)用cancel然后調(diào)用join。

    runBlocking {
          val job = launch {
              delay(500)
              Log.d(TAG, "launch running Coroutines")
          }
          Log.d(TAG, "waiting launch running")
          job.cancelAndJoin()
          Log.d(TAG, "runBlocking running end")
      }

2.超時處理

協(xié)程在執(zhí)行中可能超過預(yù)期的執(zhí)行時間,這時候就需要取消協(xié)程的執(zhí)行,協(xié)程提供了withTimeout函數(shù)來處理超時的情況,但是withTimeout函數(shù)在超時的時候會拋出異常TimeoutCancellationException,可以選擇捕獲這個異常。協(xié)程也提供了withTimeoutOrNull函數(shù)并返回null來替代拋出異常。

 /**
   * 添加超時處理
   * withTimeout
   */
  fun timeOutCoroutines() = runBlocking {
      withTimeout(1300L) {
          repeat(1000) { i ->
              Log.d(TAG,"I'm sleeping $i ...")
              delay(500L)
          }
      }
  }

四、協(xié)程調(diào)度器與作用域

1.協(xié)程調(diào)度器

協(xié)程上下文包含一個協(xié)程調(diào)度器,即CoroutineDispatcher,它確定了哪些線程或與線程相對應(yīng)的協(xié)程執(zhí)行。協(xié)程調(diào)度器可以將協(xié)程限制在一個特定的線程執(zhí)行,或?qū)⑺峙傻揭粋€線程池,亦或是讓它不受限地運(yùn)行。所有的協(xié)程構(gòu)建器諸如 launch 和 async 接收一個可選的 CoroutineContext 參數(shù),它可以被用來顯式的為一個新協(xié)程或其它上下文元素指定一個調(diào)度器。

  /**
   * 協(xié)程上下文(實(shí)際控制協(xié)程在那個線程執(zhí)行)
   * launch和async都可接收CoroutineContext函數(shù)控制協(xié)程執(zhí)行的線程
   * Dispatchers.Unconfined一種特殊的調(diào)度器(非受限調(diào)度器),運(yùn)行在默認(rèn)的調(diào)度者線程,掛起后恢復(fù)在默認(rèn)的執(zhí)行者kotlinx.coroutines.DefaultExecutor中執(zhí)行
   * Dispatchers.Default 默認(rèn)調(diào)度器,采用后臺共享的線程池(不傳上下文,默認(rèn)采用這種)
   * newSingleThreadContext 單獨(dú)生成一個線程
   * Dispatchers.IO IO線程
   */
  fun coroutineConetxt() = runBlocking {
      launch { // 運(yùn)行在父協(xié)程的上下文中,即 runBlocking 主協(xié)程
          Log.d(TAG, "Im working in thread ${Thread.currentThread().name}")
      }
      launch(Dispatchers.Unconfined) { // 不受限的——將工作在主線程中
          Log.d(TAG, "Unconfined before I'm working in thread ${Thread.currentThread().name}")
          delay(500)
          Log.d(TAG, "Unconfined after I'm working in thread ${Thread.currentThread().name}")
      }
      launch(Dispatchers.Default) { // 將會獲取默認(rèn)調(diào)度器
          Log.d(TAG, "Default I'm working in thread ${Thread.currentThread().name}")
      }
      launch(newSingleThreadContext("MyOwnThread")) { // 將使它獲得一個新的線程
          Log.d(TAG, "newSingleThreadContext  I'm working in thread ${Thread.currentThread().name}")
      }

      launch(Dispatchers.IO) {
          Log.d(TAG, "IO I'm working in thread ${Thread.currentThread().name}")
      }
  }
 
2020-05-21 16:06:32.752 25509-25509/com.common.coroutines_retrofit_okhttp D/MainActivity: Unconfined before I'm working in thread main
2020-05-21 16:06:32.764 25509-25553/com.common.coroutines_retrofit_okhttp D/MainActivity: Default I'm working in thread DefaultDispatcher-worker-1
2020-05-21 16:06:32.766 25509-25555/com.common.coroutines_retrofit_okhttp D/MainActivity: newSingleThreadContext  I'm working in thread MyOwnThread
2020-05-21 16:06:32.766 25509-25553/com.common.coroutines_retrofit_okhttp D/MainActivity: IO I'm working in thread DefaultDispatcher-worker-1
2020-05-21 16:06:32.766 25509-25509/com.common.coroutines_retrofit_okhttp D/MainActivity: Im working in thread main
2020-05-21 16:06:33.255 25509-25552/com.common.coroutines_retrofit_okhttp D/MainActivity: Unconfined after I'm working in thread kotlinx.coroutines.DefaultExecutor

從日志輸出可以看到。

1)launch默認(rèn)在調(diào)用的協(xié)程上下文中執(zhí)行,即runBlocking所在的主線程。

2)Dispatchers.Unconfined在調(diào)用線程啟動以一個協(xié)程,掛起之后再次恢復(fù)執(zhí)行在默認(rèn)的執(zhí)行者kotlinx
.coroutines.DefaultExecutor線程中執(zhí)行。

3)Dispatchers.Default默認(rèn)調(diào)度器,開啟新線程執(zhí)行協(xié)程。

4)Dispatchers.IO創(chuàng)建在IO線程執(zhí)行。

5)newSingleThreadContext創(chuàng)建一個獨(dú)立的線程執(zhí)行。

如果需要在協(xié)程中控制和切換部分任務(wù)執(zhí)行所在的線程,可通過withContext關(guān)鍵字。withContext關(guān)鍵字接收的也是協(xié)程調(diào)度器,由此控制切換任務(wù)所在線程。

  /**
   * withContext 線程切換
   */
  fun switchThread() = runBlocking {
      launch {
          Log.d(TAG, "start in thread ${Thread.currentThread().name}")
          val job = withContext(Dispatchers.IO) {
              delay(5000)
              Log.d(TAG, "I'm working in thread ${Thread.currentThread().name}")
          }
          Log.d(TAG, "end in thread ${Thread.currentThread().name}")
      }

  }
2020-05-21 16:07:55.225 25723-25723/com.common.coroutines_retrofit_okhttp D/MainActivity: start in thread main
2020-05-21 16:08:00.239 25723-25796/com.common.coroutines_retrofit_okhttp D/MainActivity: I'm working in thread DefaultDispatcher-worker-1
2020-05-21 16:08:00.240 25723-25723/com.common.coroutines_retrofit_okhttp D/MainActivity: end in thread main

從日志輸出可以看到withContext將任務(wù)調(diào)度到IO線程執(zhí)行。

協(xié)程作用域

協(xié)程都有自己的作用域(CoroutineScope),協(xié)程調(diào)度器是在協(xié)程作用域上的擴(kuò)展,協(xié)程的執(zhí)行需要由作用域控制。除了由不同的構(gòu)建器提供協(xié)程作用域之外,還可以使用coroutineScope構(gòu)建器聲明自己的作用域。它會創(chuàng)建一個協(xié)程作用域并且在所有已啟動子協(xié)程執(zhí)行完畢之前不會結(jié)束。runBlocking 與 coroutineScope 可能看起來很類似,因?yàn)樗鼈兌紩却鋮f(xié)程體以及所有子協(xié)程結(jié)束。 這兩者的主要區(qū)別在于,runBlocking 方法會阻塞當(dāng)前線程來等待, 而 coroutineScope 只是掛起,會釋放底層線程用于其他用途。 由于存在這點(diǎn)差異,runBlocking 是常規(guī)函數(shù),而 coroutineScope 是掛起函數(shù)。

 /**
   * 協(xié)程作用域 coroutineScope創(chuàng)建協(xié)程作用域
   * runBlocking會等待協(xié)程作用域內(nèi)執(zhí)行結(jié)束
   */
  fun makeCoroutineScope() = runBlocking {
      launch {
          Log.d(TAG, "launch current Thread is ${Thread.currentThread()}")
      }
      coroutineScope {
          // 創(chuàng)建一個協(xié)程作用域
          launch {
              Log.d(TAG, "coroutineScope launch current Thread is ${Thread.currentThread()}")
          }

          Log.d(TAG, "coroutineScope current Thread is ${Thread.currentThread()}")
      }

      Log.d(TAG, "runBlocking current Thread is ${Thread.currentThread()}")
  }

五、應(yīng)用

從以上分析應(yīng)該知道協(xié)程可以用來做什么了,協(xié)程可用來處理異步任務(wù),如網(wǎng)絡(luò)請求、讀寫文件等,可以用編寫同步代碼的方式來完成異步的調(diào)用,省去了各種網(wǎng)絡(luò)、異步的回調(diào)。這里做一個最簡單的網(wǎng)絡(luò)請求的例子,使用Retrofit+Okhttp請求網(wǎng)絡(luò)數(shù)據(jù),然后用Glide加載請求回來的圖片。以前寫網(wǎng)絡(luò)請求的時候往往封裝一套RxJava+Retrofit+Okhttp來處理,這里將RxJava替換成Coroutines(協(xié)程)。

image

主要看請求網(wǎng)絡(luò)相關(guān)的代碼。

class MainViewModel : ViewModel() {
  companion object {
      const val TAG = "MainViewModel"
  }

  private val mainScope = MainScope()

  private val repertory: MainRepository by lazy { MainRepository() }
  var data: MutableLiveData<JsonBean> = MutableLiveData()

  fun getDataFromServer() = mainScope.launch {
      val jsonBeanList = withContext(Dispatchers.IO) {
          Log.d(TAG, "${Thread.currentThread()}")
          repertory.getDataFromServer()
      }
      data.postValue(jsonBeanList)
  }

  override fun onCleared() {
      super.onCleared()
      mainScope.cancel()
  }

}

使用了MainScope來引入?yún)f(xié)程作用域,在這里跟正常使用GlobalScope.launch來創(chuàng)建運(yùn)行在主線程的協(xié)程是一樣的,然后在協(xié)程中通過withContext開啟IO線程執(zhí)行聯(lián)網(wǎng)請求。

class MainRepository {

   suspend fun getDataFromServer() :JsonBean{
      return RetrofitRequest.instance.retrofitService.json()
   }
}
class RetrofitRequest private constructor() {

   private val retrofit: Retrofit by lazy {
       Retrofit.Builder()
               .client(RetrofitUtil.genericClient())
               .addConverterFactory(GsonConverterFactory.create())
               .baseUrl(RetrofitUtil.baseUrl)
               .addCallAdapterFactory(CoroutineCallAdapterFactory())
               .build()
   }
   val retrofitService: RetrofitService by lazy {
       retrofit.create(RetrofitService::class.java)
   }


   companion object {
       val instance: RetrofitRequest by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { RetrofitRequest() }
   }
}
interface RetrofitService {

   @GET(Api.json)
   suspend fun json(): JsonBean

}

這里導(dǎo)入了JakeWharton大神編寫的retrofit2-kotlin-coroutines-adapter適配器來做轉(zhuǎn)換,替換之前的Retrofit轉(zhuǎn)RxJava的適配器。可以看到處理線程切換只需要withContext一行代碼,并且沒有類似CallBack的回調(diào),整體代碼編寫就是同步代碼的方式。之前使用RxJava的時候還需要對RxJava鏈?zhǔn)秸埱筮M(jìn)行一些封裝來完成網(wǎng)絡(luò)請求的CallBack。代碼如下:

fun <T> Observable<T>.parse(success: (T) -> Unit) {
   this.subscribeOn(Schedulers.io())
           .unsubscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread())
           .subscribe(object : Subscriber<T>() {
               override fun onNext(t: T) {
                   success(t)
               }

               override fun onCompleted() {
               }

               override fun onError(e: Throwable?) {
               }
           })
}

創(chuàng)建了一個Observable的擴(kuò)展函數(shù)parse,通過success函數(shù)將網(wǎng)絡(luò)請求結(jié)果回傳到界面層,相比RxJava協(xié)程不需要進(jìn)行添加CallBack。

Demo地址:

Coroutines
https://github.com/24KWYL/Coroutines-Retrofit-Okhttp

RxJava
https://github.com/24KWYL/MVVM

六、總結(jié)

通過協(xié)程可以很方便的處理異步任務(wù),可以用同步的方式處理異步請求,減少回調(diào)代碼。協(xié)程也提供Flow、Channel等操作,類似于RxJava的流式操作。功能上在很多地方可以替換RxJava,也可以實(shí)現(xiàn)RxJava的多種操作符。并且使用上更加簡單。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容