Java多線程--JDK并發(fā)包(2)

Java多線程--JDK并發(fā)包(2)

線程池

在使用線程池后,創(chuàng)建線程變成了從線程池里獲得空閑線程,關(guān)閉線程變成了將線程歸壞給線程池。

JDK有一套Executor框架,大概包括Executor、ExecutorService、AbstractExeccutorService、ThreadPoolExecutor、Executors等成員,位于java.util.concurrent包下。它們之間的關(guān)系如下:

Executor是頂層的接口,ExecutorService接口繼承了它,AbstrctExecutorService繼承了ExecutorService,ThreadPoolExecutor繼承了AbstrctExecutorService。如果用<——表示繼承,<--表示實(shí)現(xiàn)接口,它們的關(guān)系可表示如下:

Executor(接口) <—— ExecutorService(接口) <-- AbstrctExecutorService(抽象類) <—— ThreadPoolExecutor(類)

Executors是單獨(dú)的一個(gè)類,可以看成是“線程池工廠”,它有很多靜態(tài)方法,比如:

  • newFixedThreadPool(int nThread)
  • newSingleThreadExecutor()
  • newCachedThreadPool()
  • newSingleThreadScheduledExecutor()
  • newScheduledThreadPool(int corePoolSize)

newFixedThreadPool該方法返回一個(gè)固定線程數(shù)的線程池。當(dāng)有新任務(wù)提交時(shí),如果線程池中有空閑線程就立即執(zhí)行,否則會(huì)進(jìn)入任務(wù)隊(duì)列中,等到有空閑線程了才能執(zhí)行。

newSingleThreadExecutor,該方法返回只有一個(gè)線程的線程池,處理策略和上面一樣。實(shí)際上就是上面的參數(shù)指定為1而已。

newCachedThreadPool該方法返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程數(shù)的線程池,任務(wù)提交后,如果有空閑線程可以復(fù)用,則優(yōu)先復(fù)用。若線程池中的線程全部在工作,而此時(shí)有新任務(wù),則會(huì)創(chuàng)建新的線程來(lái)處理任務(wù),所有線程執(zhí)行完后會(huì)將線程歸還給線程池。

newScheduledThreadPool返回一個(gè)ScheduledExecutorService對(duì)象,可以有計(jì)劃地執(zhí)行任務(wù),比如在某個(gè)延時(shí)之后開始執(zhí)行,或者周期性地執(zhí)行某個(gè)任務(wù)??梢灾付ň€程數(shù)量。

newSingleThreadScheduledExecutor實(shí)現(xiàn)了和上面一樣的功能,不過(guò)線程池的大小為1。

ScheduledExecutorService有三個(gè)方法可以有計(jì)劃地執(zhí)行任務(wù)。如:

  • schedule(Runnable command, long delay, TimeUnit unit);該方法可以在給定的延時(shí)后,執(zhí)行一個(gè)任務(wù);
  • scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);該方法以任務(wù)開始執(zhí)行的時(shí)間為initialDelay,加上周期period,就是下一個(gè)任務(wù)開始執(zhí)行的時(shí)間,以此類推,因此這個(gè)方法任務(wù)調(diào)度的頻率是一定的;
  • scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);該方法表示每執(zhí)行完一個(gè)任務(wù),延遲delay的時(shí)間后,開始執(zhí)行下一個(gè)任務(wù),initialDelay還是表示任務(wù)開始的初始時(shí)延,上一個(gè)任務(wù)結(jié)束的時(shí)間點(diǎn)與下一個(gè)任務(wù)開始的時(shí)間點(diǎn)之差是固定的,固定為delay。

即使單個(gè)任務(wù)的執(zhí)行時(shí)間超過(guò)調(diào)度周期,scheduleAtFixedRate也不會(huì)讓多個(gè)任務(wù)堆疊,比如任務(wù)執(zhí)行需要8s,而調(diào)度周期是2s,調(diào)度第二個(gè)任務(wù)時(shí),第一個(gè)還沒(méi)執(zhí)行完,因此為了避免任務(wù)堆疊,此時(shí)調(diào)度周期會(huì)變成8s;而采用scheduleWithFixedDelay,兩個(gè)任務(wù)之間的實(shí)際間隔會(huì)變成10s,8s的執(zhí)行+2s的delay。

線程池的內(nèi)部實(shí)現(xiàn)

  • newFixedThreadPool(int nThread)
  • newSingleThreadExecutor()
  • newCachedThreadPool()

這三個(gè)內(nèi)部都是通過(guò)返回ThreadPoolExecutor產(chǎn)生線程池的。所以我們來(lái)重點(diǎn)關(guān)注它的構(gòu)造方法。

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)

  • corePoolSize表示線程池中的線程數(shù);
  • maximumPoolSize表示線程池中的最大線程數(shù);
  • keepAliveTime表示當(dāng)線程數(shù)超過(guò)corePoolSize時(shí),多余的空閑線程的存活時(shí)間;
  • unit是keepAliveTime的單位
  • workQueue任務(wù)隊(duì)列,保存那些已經(jīng)提交但還沒(méi)有開始執(zhí)行的任務(wù)(在等待空閑線程);
  • threadFactory,線程工廠,可自定義,一般默認(rèn);
  • handler拒絕策略,當(dāng)任務(wù)多得來(lái)不及處理時(shí),如何拒絕任務(wù)。

workQueue是一個(gè)BlockingQueue接口的對(duì)象,存放的是Runnable對(duì)象。根據(jù)功能的不同,ThreadPoolExecutor中可以使用以下幾種BlockingQueue

  • 直接提交的隊(duì)列:對(duì)應(yīng)SynchronousQueue對(duì)象,它沒(méi)有容量,每一個(gè)插入都要等待一個(gè)相應(yīng)的刪除操作;每一個(gè)刪除操作都要等待對(duì)應(yīng)的插入操作。使用該對(duì)象,提交的任務(wù)不會(huì)被真實(shí)保存,而總是將任務(wù)交給線程執(zhí)行。如果沒(méi)有空閑線程就創(chuàng)建新線程,如果線程數(shù)已經(jīng)達(dá)到最大值,就執(zhí)行拒絕策略。
  • 有界的任務(wù)隊(duì)列:使用ArrayBlockingQueue實(shí)現(xiàn)。當(dāng)有任務(wù)提交時(shí),判斷線程池中當(dāng)前的實(shí)際線程數(shù),如果小于corePoolSize,則優(yōu)先創(chuàng)建新線程;若大于corePoolSize,就將任務(wù)加入到等待隊(duì)列中;若此時(shí)等待隊(duì)列也滿,創(chuàng)建新線程;若實(shí)際線程已經(jīng)達(dá)到maxPoolSize,就開始執(zhí)行拒絕策略??梢钥闯鲇薪绲娜蝿?wù)隊(duì)列只有在任務(wù)隊(duì)列滿時(shí),才會(huì)創(chuàng)建新線程,通常情況下實(shí)際線程數(shù)可以穩(wěn)定在corePoolSize。
  • 無(wú)界的任務(wù)隊(duì)列:使用LinkedBlockingQueue實(shí)現(xiàn)。和上面ArrayBlockingQueue相比,區(qū)別在于,任務(wù)隊(duì)列沒(méi)有大小限制,當(dāng)實(shí)際線程數(shù)超過(guò)corePoolSize時(shí),直接進(jìn)入任務(wù)隊(duì)列。
  • 優(yōu)先任務(wù)隊(duì)列:使用PriorityBlockingQueue實(shí)現(xiàn)。前面的幾種都是按照先進(jìn)先出的順序來(lái)處理任務(wù),而該對(duì)象實(shí)現(xiàn)的任務(wù)隊(duì)列可根據(jù)任務(wù)自身的優(yōu)先級(jí)順序執(zhí)行。

newFixedThreadPool因?yàn)樗腸orePoolSize和maxPoolSize大小一樣,固定大小的線程不存在當(dāng)實(shí)際線程數(shù)超過(guò)corePoolSize時(shí)要新增線程的可能,所以它使用了LinkedBlockingQueue,當(dāng)有新任務(wù)且實(shí)際線程數(shù)已經(jīng)達(dá)到最大時(shí),會(huì)直接進(jìn)入等待隊(duì)列。

newSingleThreadExecutor是newFixedThreadPool的一種特殊情況,即取corePoolSize和maxPoolSize都為1

而newCachedThreadPool的corePoolSize為0,maxPoolSize為Integer.MAX_VALUE,任務(wù)隊(duì)列使用SynchronousQueue直接提交,新任務(wù)提交后,若有空閑線程就直接用,若沒(méi)有就進(jìn)入等待隊(duì)列——但是這是個(gè)直接提交的隊(duì)列,所有會(huì)新增線程執(zhí)行該任務(wù)!由于corePoolSize為0,所以任務(wù)執(zhí)行完畢后60s(構(gòu)造函數(shù)指定)就會(huì)被回收。

拒絕策略

當(dāng)實(shí)際線程數(shù)超過(guò)maxPoolSize時(shí),該采取什么樣的策略?

  • AbortPolicy:丟棄任務(wù)并拋出異常;
  • CallerRunPolicy:該任務(wù)被線程池拒絕,由調(diào)用execute方法的線程執(zhí)行該任務(wù);
  • DiscardOldestPolicy:丟棄最老的一個(gè),也就是馬上要執(zhí)行的一個(gè)任務(wù);
  • DiscardPolicy:默默丟棄被拒絕的任務(wù),體現(xiàn)在代碼中就是什么也不做。

下面看看CallerRunPolicy怎么拒絕的

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }

DiscardOldestPolicy是這樣做的

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll(); // 最老的一個(gè)請(qǐng)求在隊(duì)列頭部
            e.execute(r);
        }
    }

線程的創(chuàng)建--線程工廠

ThreadFactory只有一個(gè)方法Thread newThread(Runnable r);,線程池中的線程就是由它創(chuàng)建的。

Fork/Join框架

fork也就是分支、分叉的意思,可以將大任務(wù)分解成小任務(wù);join表示等待的意思,必須等待fork后的小任務(wù)執(zhí)行完畢,得到執(zhí)行后的部分結(jié)果,才能將部分結(jié)果合并成最終結(jié)果。

比如計(jì)算1到10000的和,就可以分成10個(gè)分支,每個(gè)分支計(jì)算一千個(gè)數(shù)的和,得到一個(gè)部分和,等待這10個(gè)部分和的結(jié)果都計(jì)算完畢,最后將其全部合并,得到最終的結(jié)果。

通常一個(gè)物理線程需要處理多個(gè)邏輯任務(wù),所以每一個(gè)線程都有一個(gè)任務(wù)隊(duì)列。若線程A的任務(wù)都執(zhí)行完了,B還有很多任務(wù)沒(méi)執(zhí)行,此時(shí)A會(huì)“幫助”B執(zhí)行它的任務(wù),A幫助B執(zhí)行B的任務(wù)時(shí),從隊(duì)列的尾部拿數(shù)據(jù);而B自己執(zhí)行任務(wù)時(shí)從隊(duì)列頭部拿數(shù)據(jù),這就像是兩個(gè)指針一個(gè)往左移動(dòng)一個(gè)往右移動(dòng),避免了A、B之間對(duì)數(shù)據(jù)的競(jìng)爭(zhēng)。

JDK中有ForkJoinPool,該接口有個(gè)方法public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

ForkJoinTask支持fork()join()方法,它有兩個(gè)重要的子類,沒(méi)有返回值的RecursiveAction和有返回值的RecursiveTask,它們都有個(gè)方法compute(),在這個(gè)方法中進(jìn)行主要的計(jì)算。對(duì)于RecursiveAction來(lái)說(shuō)簽名是void,而對(duì)于RecursiveTask來(lái)說(shuō)有返回值所以簽名是<T>

JDK并發(fā)容器

  • ConcurrentHashMap:高效的并發(fā)HashMap,可看作線程安全的HashMap;
  • CopyOnWriteArrayList:讀-讀,讀-寫不會(huì)阻塞,只有在寫-寫下會(huì)進(jìn)行同步。在讀多寫少的場(chǎng)合,性能很好;
  • ConcurrentLinkedQueue:高效的并發(fā)隊(duì)列,鏈表實(shí)現(xiàn),使用了CAS操作(Compare and Swap),可看作線程安全的LinkedList;
  • BlockingQueue:接口,實(shí)現(xiàn)了Queue;數(shù)組實(shí)現(xiàn)的ArrayBlockingQueue和鏈表實(shí)現(xiàn)的LinkedBlockingQueue實(shí)現(xiàn)了這個(gè)接口。
  • ConcurrentSkipListMap:使用跳表的數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)的Map。

CopyOnWriteArrayList原理

CopyOnWriteArrayList的原理主要是:讀的時(shí)候正常讀,寫-寫需要同步,所以在寫之前要使用Lock,然后為了讀-寫不阻塞,CopyOnWriteArrayList在寫入操作時(shí),先將原數(shù)組復(fù)制一份,然后在新數(shù)組末尾追加要添加的值,添加成功后再用新數(shù)組覆蓋舊數(shù)組

JDK中的該類的add方法是這樣實(shí)現(xiàn)的:

public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    // 保證寫-寫阻塞,故進(jìn)行同步
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 關(guān)鍵!寫入之前先賦值一個(gè)副本
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 新數(shù)組的末尾添加
        newElements[len] = e;
        // 新數(shù)組覆蓋舊數(shù)組
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

而數(shù)組的定義是這樣的:

 private transient volatile Object[] array;

注意有volatile關(guān)鍵字,說(shuō)明當(dāng)寫數(shù)據(jù)的線程修改數(shù)組后,其他讀取線程能立即“察覺(jué)”到。

BlockingQueue原理

BlockingQueue可以在并發(fā)環(huán)境下高效傳輸數(shù)據(jù),本質(zhì)上還是一個(gè)隊(duì)列,數(shù)據(jù)從隊(duì)列尾部入,從隊(duì)列頭部出。隊(duì)列都有的offer()pull()就不說(shuō)了,沒(méi)什么特別的。BlockingQueue還有put()take()方法,正是這兩個(gè)方法實(shí)現(xiàn)了阻塞。

以ArrayBlockingQueue來(lái)說(shuō):當(dāng)隊(duì)列為空時(shí),take()方法會(huì)等待,直到隊(duì)列不為空;當(dāng)隊(duì)列滿時(shí),put()方法會(huì)等待,直到隊(duì)列有空閑位置。這是怎么實(shí)現(xiàn)的呢?來(lái)看代碼

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

首先讀和寫都用的同一個(gè)鎖lock,因此任何時(shí)候讀和寫只能有一個(gè)在執(zhí)行。然后是條件notNull,等待非滿,以便put;notEmpty等待非空,以便take。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 關(guān)鍵,若隊(duì)列滿了,就等待
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 關(guān)鍵!一旦插入了數(shù)據(jù),隊(duì)列就不是非空了,于是喚醒在notEmpty上等待的線程(通知其他線程可以進(jìn)行take啦)
    notEmpty.signal();
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 關(guān)鍵!若隊(duì)列為空,等待
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 關(guān)鍵!有元素出列了,等待在notFull上的線程可以被喚醒,可以進(jìn)行put操作了
    notFull.signal();
    return x;
}

LinkedBlockingQueue和ArrayBlockingQueue原理大同小異,不過(guò)LinkedBlockingQueue讀和寫分別用一把鎖,因此讀和寫可以同時(shí)進(jìn)行。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

跳表

ConcurrentSkipMap使用跳表實(shí)現(xiàn)。是一種可以進(jìn)行快速查找的數(shù)據(jù)結(jié)構(gòu),時(shí)間復(fù)雜度是$O(lg n)$

跳表形象點(diǎn)說(shuō)像個(gè)“直角三角形一樣的金字塔”,每一層都是一條鏈表,最底層的鏈表包含了Map中的所有數(shù)據(jù),每上一層都是下面一層的子集,越到上面結(jié)點(diǎn)越少。層與層之間通過(guò)值相同的元素鏈接起來(lái),因此結(jié)點(diǎn)除了有指向本層的下一個(gè)結(jié)點(diǎn)的right,還有指向下層中具有相同值的元素的down(實(shí)際上通過(guò)數(shù)據(jù)結(jié)構(gòu)Index表示)。另外,跳表中所有鏈表的元素都是排序的

查找時(shí),先從頂層開始查找,如果找到就結(jié)束了;否則當(dāng)發(fā)現(xiàn)查找的值大于當(dāng)前層的最大值(鏈表末尾),就會(huì)“跳到”下一層鏈表接著向前查找,查找朝著下面和右面兩個(gè)方向進(jìn)行,有點(diǎn)像“下臺(tái)階”...


by @sunhaiyu

2108.4.26

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 為什么使用線程池 當(dāng)我們?cè)谑褂镁€程時(shí),如果每次需要一個(gè)線程時(shí)都去創(chuàng)建一個(gè)線程,這樣實(shí)現(xiàn)起來(lái)很簡(jiǎn)單,但是會(huì)有一個(gè)問(wèn)題...
    閩越布衣閱讀 4,423評(píng)論 10 45
  • Java中對(duì)線程池提供了很好的支持,有了線程池,我們就不需要自已再去創(chuàng)建線程。如果并發(fā)的線程數(shù)量很多,并且每個(gè)線程...
    sunny4handsome閱讀 930評(píng)論 0 2
  • 【JAVA 線程】 線程 進(jìn)程:是一個(gè)正在執(zhí)行中的程序。每一個(gè)進(jìn)程執(zhí)行都有一個(gè)執(zhí)行順序。該順序是一個(gè)執(zhí)行路徑,或者...
    Rtia閱讀 2,894評(píng)論 2 20
  • 18號(hào) 葉春平 (簡(jiǎn)書:葉老巫) 圖、文:葉老巫 2016年11月16日注冊(cè)簡(jiǎn)書,開始日更。偶遇樊老師的文章,他的...
    葉兩步閱讀 562評(píng)論 16 39
  • “因?yàn)槲矣鲆?jiàn)你像一場(chǎng)虛擬的游戲 我認(rèn)識(shí)你也只是網(wǎng)路上一段訊息” ──林俊杰 《精靈》 “就算楊洋在屏幕里不動(dòng),老娘...
    我叫如歌閱讀 455評(píng)論 0 0

友情鏈接更多精彩內(nèi)容