聊一聊線程池和Kotlin協(xié)程

目前很多開(kāi)發(fā)組都用上協(xié)程來(lái)處理異步任務(wù)了,但是有的地方協(xié)程提供的原生API還是不足以應(yīng)付,比方說(shuō)一些SDK提供了傳入Executor的接口(以便復(fù)用調(diào)用者的線程池來(lái)執(zhí)行異步任務(wù)),這時(shí)候可以用這JDK提供的線程池,或者封裝一下協(xié)程也可以滿足需求。

協(xié)程提供了Dispatchers.DefaultDispatchers.IO 分別用于 計(jì)算密集型 任務(wù)和 IO密集型 任務(wù),類似于RxJava的 Schedulers.computation()Schedulers.io()
但兩者有所差異,比如RxJava的 Schedulers.io() 不做并發(fā)限制,而 Dispatchers.io() 做了并發(fā)限制:

It defaults to the limit of 64 threads or the number of cores (whichever is larger)

考慮到當(dāng)前移動(dòng)設(shè)備的CPU核心數(shù)都不超過(guò)64,所以可以認(rèn)為協(xié)程的 Dispatchers.IO 的最大并發(fā)為64。
Dispatchers.Default 的并發(fā)限制為:

By default, the maximal level of parallelism used by this dispatcher is equal to the number of CPU cores, but is at least two

考慮到目前Android設(shè)備核心數(shù)都在2個(gè)以上,所以可以認(rèn)為 Dispatchers.Default 的最大并發(fā)為 CPU cores。
Dispatchers.DefaultDispatchers.IO 是共享協(xié)程自己的線程池的,二者可以復(fù)用線程。
不過(guò)目前這兩個(gè)Dispatchers 并未完全滿足項(xiàng)目中的需求,有時(shí)我們需要一些自定義的并發(fā)限制,其中最常見(jiàn)的是串行。

RxJava有Schedulers.single() ,但這個(gè)Schedulers.single()和AsyncTask的SERAIL_EXECOTOR一樣,是全局串行,不同的任務(wù)處在同一個(gè)串行隊(duì)列,會(huì)相互堵塞,因而可能會(huì)引發(fā)問(wèn)題。

或許也是因?yàn)檫@個(gè)原因,kotlin協(xié)程沒(méi)有定義“Dispatchers.Single"。
對(duì)于需要串行的場(chǎng)景,可以這樣實(shí)現(xiàn):

val coroutineContext: CoroutineContext =
    Executors.newSingleThreadExecutor().asCoroutineDispatcher()

這樣可以實(shí)現(xiàn)局部的串行,但和協(xié)程的線程池是相互獨(dú)立的,不能復(fù)用線程。
線程池的好處:

  1. 提高響應(yīng)速度:任務(wù)到達(dá)時(shí),無(wú)需等待線程創(chuàng)建即可立即執(zhí)行。
  2. 降低資源消耗:通過(guò)池化技術(shù)重復(fù)利用已創(chuàng)建的線程,降低線程創(chuàng)建和銷毀造成的損耗。
  3. 提高線程的可管理性:線程是稀缺資源,如果無(wú)限制創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)因?yàn)榫€程的不合理分布導(dǎo)致資源調(diào)度失衡,降低系統(tǒng)的穩(wěn)定性。使用線程池可以進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)和監(jiān)控。

然彼此獨(dú)立創(chuàng)建線程池的話,會(huì)大打折扣。
如何既復(fù)用協(xié)程的線程池,又自主控制并發(fā)呢?
一個(gè)辦法就是套隊(duì)列來(lái)控制并發(fā),然后還是任務(wù)還是執(zhí)行在線程池之上。
AsyncTask 就是這樣實(shí)現(xiàn)的:

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

用SerialExecutor的execute的任務(wù)會(huì)先進(jìn)入隊(duì)列,當(dāng)mActive為空時(shí)從隊(duì)列獲取任務(wù)賦值給mActive然后通過(guò)線程池 THREAD_POOL_EXECUTOR執(zhí)行。
當(dāng)然AsyncTask 的SerialExecutor是全局唯一的,所以會(huì)有上面提到的各種任務(wù)相互堵塞的問(wèn)題。可以通過(guò)創(chuàng)建不同是的SerialExecutor實(shí)例來(lái)達(dá)到各業(yè)務(wù)各自串行。

在Kotlin環(huán)境下,我們可以利用協(xié)程和Channel來(lái)實(shí)現(xiàn):

fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
    CoroutineScope(Dispatchers.Unconfined).launch {
        send(0)
        CoroutineScope(Dispatchers.IO).launch {
            block()
            receive()
        }
    }
}

// 使用方法
private val serialChannel = Channel<Any>(1)
serialChannel.runBlock {
    // do somthing
}

添加Log編寫測(cè)試如下:

private val a = AtomicInteger(0)
private val b = AtomicInteger(0)
fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
    CoroutineScope(Dispatchers.Unconfined).launch {
        Log.d("MyTag", "before send " + a.getAndIncrement() + getTime())
        send(0)
        Log.i("MyTag", "after send " + b.getAndIncrement() + getTime())
        CoroutineScope(Dispatchers.Default).launch {
            block()
            receive()
        }
    }
}

private fun test() {
    // 并發(fā)限制為1,串行執(zhí)行任務(wù)
    val channel = Channel<Any>(1)
    val t1 = System.currentTimeMillis()
    repeat(4) { x ->
        channel.runBlock {
            Thread.sleep(1000L)
            Log.w("MyTag", "$x done job" + getTime())
        }
    }

    CoroutineScope(Dispatchers.Default).launch {
        while (!channel.isEmpty) {
            delay(200)
        }
        val t2 = System.currentTimeMillis()
        Log.d("MyTag", "Jobs all done, use time:" + (t2 - t1))
    }
}

執(zhí)行結(jié)果:

第一個(gè)任務(wù)可以順利通過(guò)send(), 而隨后的任務(wù)被suspend, 直到前面的任務(wù)執(zhí)行完(執(zhí)行block),調(diào)用recevie(), 然后下一個(gè)任務(wù)通過(guò)send() ……依此類推。
最終,消耗4s完成任務(wù)。

如果Channel的參數(shù)改成2,則能有兩個(gè)任務(wù)可以通過(guò)send() :

最終,消耗2s完成任務(wù)。

關(guān)于參數(shù)可以參考Channel的構(gòu)造函數(shù):

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayChannel(capacity)
    }

在前面的實(shí)現(xiàn)中, 我們關(guān)注UNLIMITED, BUFFERED 以及 capacity > 0 的情況即可:

  • UNLIMITED: 不做限制;
  • BUFFERED: 并發(fā)數(shù)由 kotlin "kotlinx.coroutines.channels.defaultBuffer"決定,目前測(cè)試得到8;
  • capacity > 0, 則并發(fā)數(shù)由 capacity 決定;
  • 特別地,當(dāng)capacity = 1,為串行調(diào)度。

不過(guò),[Dispatchers.IO] 本身有并發(fā)限制(目前版本是64),
所有對(duì)于 Channel.UNLIMITED 和 capacity > 64 的情況,和capacity=64的情況是相同的。
我們可以為不同的業(yè)務(wù)創(chuàng)建不同的Channel實(shí)例,從而各自控制并發(fā)且最終在協(xié)程的線程池上執(zhí)行任務(wù)。
簡(jiǎn)要示意圖如下:

為了簡(jiǎn)化,我們假設(shè)Dispatchers的并發(fā)限制為4。

  • 不同Channel有各自的buffer, 當(dāng)任務(wù)小于capacity時(shí)進(jìn)入buffer, 大于capacity時(shí)新任務(wù)被suspend。
  • Dispatchers 不斷地執(zhí)行任務(wù)然后調(diào)用receive(), 上面的實(shí)現(xiàn)中,receive并非要取什么信息,僅僅是讓channel空出buffer, 好讓被suspend的任務(wù)可以通過(guò)send()然后進(jìn)入Dispatchers的調(diào)度。
  • 極端情況下(進(jìn)入Disptachers的任務(wù)大于并發(fā)限制時(shí)),任務(wù)進(jìn)入Dispatchers也不會(huì)被立即執(zhí)行,這個(gè)設(shè)定可以避免開(kāi)啟的線程太多而陷于線程上下文頻繁切換的困境。

通過(guò)Channel可以實(shí)現(xiàn)并發(fā)的控制,但是日常開(kāi)發(fā)中有的地方并不是簡(jiǎn)單地執(zhí)行個(gè)任務(wù),而是需要一個(gè)ExecutorService或者Executor。
我們可以通過(guò)Channel封裝一下:

fun Channel<Any>.runBlock(block: suspend CoroutineScope.() -> Unit) {
    CoroutineScope(Dispatchers.Unconfined).launch {
        send(0)
        CoroutineScope(Dispatchers.IO).launch {
            block()
            receive()
        }
    }
}


class ChannelExecutor(capacity: Int) : Executor {
    private val channel = Channel<Any>(capacity)

    override fun execute(command: Runnable) {
        channel.runBlock {
            command.run()
        }
    }
}


class ChannelExecutorService(capacity: Int) : AbstractExecutorService() {
    private val channel = Channel<Any>(capacity)

    override fun execute(command: Runnable) {
        channel.runBlock {
            command.run()
        }
    }

    fun isEmpty(): Boolean {
        return channel.isEmpty || channel.isClosedForReceive
    }

    override fun shutdown() {
        channel.close()
    }

    override fun shutdownNow(): MutableList<Runnable> {
        shutdown()
        return mutableListOf()
    }

    @ExperimentalCoroutinesApi
    override fun isShutdown(): Boolean {
        return channel.isClosedForSend
    }

    @ExperimentalCoroutinesApi
    override fun isTerminated(): Boolean {
        return channel.isClosedForReceive
    }

    override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
        var millis = unit.toMillis(timeout)
        while (!isTerminated && millis > 0) {
            try {
                Thread.sleep(200L)
                millis -= 200L
            } catch (ignore: Exception) {
            }
        }
        return isTerminated
    }
}

需要簡(jiǎn)單地控制并發(fā)的地方,直接定義Channel然后調(diào)用runBlock即可;
需要Executor的地方,可創(chuàng)建ChannelExecutor來(lái)執(zhí)行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容