Java多線程--JDK并發(fā)包(2)
線程池
在使用線程池后,創(chuàng)建線程變成了從線程池里獲得空閑線程,關(guān)閉線程變成了將線程歸壞給線程池。
JDK有一套Executor框架,大概包括Executor、ExecutorService、AbstractExeccutorService、ThreadPoolExecutor、Executors等成員,位于java.util.concurrent包下。它們之間的關(guān)系如下:
Executor是頂層的接口,ExecutorService接口繼承了它,AbstrctExecutorService繼承了ExecutorService,ThreadPoolExecutor繼承了AbstrctExecutorService。如果用<——表示繼承,<--表示實(shí)現(xiàn)接口,它們的關(guān)系可表示如下:
Executor(接口) <—— ExecutorService(接口) <-- AbstrctExecutorService(抽象類) <—— ThreadPoolExecutor(類)
Executors是單獨(dú)的一個(gè)類,可以看成是“線程池工廠”,它有很多靜態(tài)方法,比如:
- newFixedThreadPool(int nThread)
- newSingleThreadExecutor()
- newCachedThreadPool()
- newSingleThreadScheduledExecutor()
- newScheduledThreadPool(int corePoolSize)
newFixedThreadPool該方法返回一個(gè)固定線程數(shù)的線程池。當(dāng)有新任務(wù)提交時(shí),如果線程池中有空閑線程就立即執(zhí)行,否則會(huì)進(jìn)入任務(wù)隊(duì)列中,等到有空閑線程了才能執(zhí)行。
newSingleThreadExecutor,該方法返回只有一個(gè)線程的線程池,處理策略和上面一樣。實(shí)際上就是上面的參數(shù)指定為1而已。
newCachedThreadPool該方法返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程數(shù)的線程池,任務(wù)提交后,如果有空閑線程可以復(fù)用,則優(yōu)先復(fù)用。若線程池中的線程全部在工作,而此時(shí)有新任務(wù),則會(huì)創(chuàng)建新的線程來(lái)處理任務(wù),所有線程執(zhí)行完后會(huì)將線程歸還給線程池。
newScheduledThreadPool返回一個(gè)ScheduledExecutorService對(duì)象,可以有計(jì)劃地執(zhí)行任務(wù),比如在某個(gè)延時(shí)之后開始執(zhí)行,或者周期性地執(zhí)行某個(gè)任務(wù)??梢灾付ň€程數(shù)量。
newSingleThreadScheduledExecutor實(shí)現(xiàn)了和上面一樣的功能,不過(guò)線程池的大小為1。
ScheduledExecutorService有三個(gè)方法可以有計(jì)劃地執(zhí)行任務(wù)。如:
-
schedule(Runnable command, long delay, TimeUnit unit);該方法可以在給定的延時(shí)后,執(zhí)行一個(gè)任務(wù); -
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);該方法以任務(wù)開始執(zhí)行的時(shí)間為initialDelay,加上周期period,就是下一個(gè)任務(wù)開始執(zhí)行的時(shí)間,以此類推,因此這個(gè)方法任務(wù)調(diào)度的頻率是一定的; -
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);該方法表示每執(zhí)行完一個(gè)任務(wù),延遲delay的時(shí)間后,開始執(zhí)行下一個(gè)任務(wù),initialDelay還是表示任務(wù)開始的初始時(shí)延,上一個(gè)任務(wù)結(jié)束的時(shí)間點(diǎn)與下一個(gè)任務(wù)開始的時(shí)間點(diǎn)之差是固定的,固定為delay。
即使單個(gè)任務(wù)的執(zhí)行時(shí)間超過(guò)調(diào)度周期,scheduleAtFixedRate也不會(huì)讓多個(gè)任務(wù)堆疊,比如任務(wù)執(zhí)行需要8s,而調(diào)度周期是2s,調(diào)度第二個(gè)任務(wù)時(shí),第一個(gè)還沒(méi)執(zhí)行完,因此為了避免任務(wù)堆疊,此時(shí)調(diào)度周期會(huì)變成8s;而采用scheduleWithFixedDelay,兩個(gè)任務(wù)之間的實(shí)際間隔會(huì)變成10s,8s的執(zhí)行+2s的delay。
線程池的內(nèi)部實(shí)現(xiàn)
- newFixedThreadPool(int nThread)
- newSingleThreadExecutor()
- newCachedThreadPool()
這三個(gè)內(nèi)部都是通過(guò)返回ThreadPoolExecutor產(chǎn)生線程池的。所以我們來(lái)重點(diǎn)關(guān)注它的構(gòu)造方法。
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize表示線程池中的線程數(shù);
- maximumPoolSize表示線程池中的最大線程數(shù);
- keepAliveTime表示當(dāng)線程數(shù)超過(guò)corePoolSize時(shí),多余的空閑線程的存活時(shí)間;
- unit是keepAliveTime的單位
- workQueue任務(wù)隊(duì)列,保存那些已經(jīng)提交但還沒(méi)有開始執(zhí)行的任務(wù)(在等待空閑線程);
- threadFactory,線程工廠,可自定義,一般默認(rèn);
- handler拒絕策略,當(dāng)任務(wù)多得來(lái)不及處理時(shí),如何拒絕任務(wù)。
workQueue是一個(gè)BlockingQueue接口的對(duì)象,存放的是Runnable對(duì)象。根據(jù)功能的不同,ThreadPoolExecutor中可以使用以下幾種BlockingQueue
- 直接提交的隊(duì)列:對(duì)應(yīng)SynchronousQueue對(duì)象,它沒(méi)有容量,每一個(gè)插入都要等待一個(gè)相應(yīng)的刪除操作;每一個(gè)刪除操作都要等待對(duì)應(yīng)的插入操作。使用該對(duì)象,提交的任務(wù)不會(huì)被真實(shí)保存,而總是將任務(wù)交給線程執(zhí)行。如果沒(méi)有空閑線程就創(chuàng)建新線程,如果線程數(shù)已經(jīng)達(dá)到最大值,就執(zhí)行拒絕策略。
- 有界的任務(wù)隊(duì)列:使用ArrayBlockingQueue實(shí)現(xiàn)。當(dāng)有任務(wù)提交時(shí),判斷線程池中當(dāng)前的實(shí)際線程數(shù),如果小于corePoolSize,則優(yōu)先創(chuàng)建新線程;若大于corePoolSize,就將任務(wù)加入到等待隊(duì)列中;若此時(shí)等待隊(duì)列也滿,創(chuàng)建新線程;若實(shí)際線程已經(jīng)達(dá)到maxPoolSize,就開始執(zhí)行拒絕策略??梢钥闯鲇薪绲娜蝿?wù)隊(duì)列只有在任務(wù)隊(duì)列滿時(shí),才會(huì)創(chuàng)建新線程,通常情況下實(shí)際線程數(shù)可以穩(wěn)定在corePoolSize。
- 無(wú)界的任務(wù)隊(duì)列:使用LinkedBlockingQueue實(shí)現(xiàn)。和上面ArrayBlockingQueue相比,區(qū)別在于,任務(wù)隊(duì)列沒(méi)有大小限制,當(dāng)實(shí)際線程數(shù)超過(guò)corePoolSize時(shí),直接進(jìn)入任務(wù)隊(duì)列。
- 優(yōu)先任務(wù)隊(duì)列:使用PriorityBlockingQueue實(shí)現(xiàn)。前面的幾種都是按照先進(jìn)先出的順序來(lái)處理任務(wù),而該對(duì)象實(shí)現(xiàn)的任務(wù)隊(duì)列可根據(jù)任務(wù)自身的優(yōu)先級(jí)順序執(zhí)行。
newFixedThreadPool因?yàn)樗腸orePoolSize和maxPoolSize大小一樣,固定大小的線程不存在當(dāng)實(shí)際線程數(shù)超過(guò)corePoolSize時(shí)要新增線程的可能,所以它使用了LinkedBlockingQueue,當(dāng)有新任務(wù)且實(shí)際線程數(shù)已經(jīng)達(dá)到最大時(shí),會(huì)直接進(jìn)入等待隊(duì)列。
newSingleThreadExecutor是newFixedThreadPool的一種特殊情況,即取corePoolSize和maxPoolSize都為1
而newCachedThreadPool的corePoolSize為0,maxPoolSize為Integer.MAX_VALUE,任務(wù)隊(duì)列使用SynchronousQueue直接提交,新任務(wù)提交后,若有空閑線程就直接用,若沒(méi)有就進(jìn)入等待隊(duì)列——但是這是個(gè)直接提交的隊(duì)列,所有會(huì)新增線程執(zhí)行該任務(wù)!由于corePoolSize為0,所以任務(wù)執(zhí)行完畢后60s(構(gòu)造函數(shù)指定)就會(huì)被回收。
拒絕策略
當(dāng)實(shí)際線程數(shù)超過(guò)maxPoolSize時(shí),該采取什么樣的策略?
- AbortPolicy:丟棄任務(wù)并拋出異常;
- CallerRunPolicy:該任務(wù)被線程池拒絕,由調(diào)用execute方法的線程執(zhí)行該任務(wù);
- DiscardOldestPolicy:丟棄最老的一個(gè),也就是馬上要執(zhí)行的一個(gè)任務(wù);
- DiscardPolicy:默默丟棄被拒絕的任務(wù),體現(xiàn)在代碼中就是什么也不做。
下面看看CallerRunPolicy怎么拒絕的
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
DiscardOldestPolicy是這樣做的
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 最老的一個(gè)請(qǐng)求在隊(duì)列頭部
e.execute(r);
}
}
線程的創(chuàng)建--線程工廠
ThreadFactory只有一個(gè)方法Thread newThread(Runnable r);,線程池中的線程就是由它創(chuàng)建的。
Fork/Join框架
fork也就是分支、分叉的意思,可以將大任務(wù)分解成小任務(wù);join表示等待的意思,必須等待fork后的小任務(wù)執(zhí)行完畢,得到執(zhí)行后的部分結(jié)果,才能將部分結(jié)果合并成最終結(jié)果。
比如計(jì)算1到10000的和,就可以分成10個(gè)分支,每個(gè)分支計(jì)算一千個(gè)數(shù)的和,得到一個(gè)部分和,等待這10個(gè)部分和的結(jié)果都計(jì)算完畢,最后將其全部合并,得到最終的結(jié)果。
通常一個(gè)物理線程需要處理多個(gè)邏輯任務(wù),所以每一個(gè)線程都有一個(gè)任務(wù)隊(duì)列。若線程A的任務(wù)都執(zhí)行完了,B還有很多任務(wù)沒(méi)執(zhí)行,此時(shí)A會(huì)“幫助”B執(zhí)行它的任務(wù),A幫助B執(zhí)行B的任務(wù)時(shí),從隊(duì)列的尾部拿數(shù)據(jù);而B自己執(zhí)行任務(wù)時(shí)從隊(duì)列頭部拿數(shù)據(jù),這就像是兩個(gè)指針一個(gè)往左移動(dòng)一個(gè)往右移動(dòng),避免了A、B之間對(duì)數(shù)據(jù)的競(jìng)爭(zhēng)。
JDK中有ForkJoinPool,該接口有個(gè)方法public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
ForkJoinTask支持fork()和join()方法,它有兩個(gè)重要的子類,沒(méi)有返回值的RecursiveAction和有返回值的RecursiveTask,它們都有個(gè)方法compute(),在這個(gè)方法中進(jìn)行主要的計(jì)算。對(duì)于RecursiveAction來(lái)說(shuō)簽名是void,而對(duì)于RecursiveTask來(lái)說(shuō)有返回值所以簽名是<T>
JDK并發(fā)容器
- ConcurrentHashMap:高效的并發(fā)HashMap,可看作線程安全的HashMap;
- CopyOnWriteArrayList:讀-讀,讀-寫不會(huì)阻塞,只有在寫-寫下會(huì)進(jìn)行同步。在讀多寫少的場(chǎng)合,性能很好;
- ConcurrentLinkedQueue:高效的并發(fā)隊(duì)列,鏈表實(shí)現(xiàn),使用了CAS操作(Compare and Swap),可看作線程安全的LinkedList;
- BlockingQueue:接口,實(shí)現(xiàn)了Queue;數(shù)組實(shí)現(xiàn)的ArrayBlockingQueue和鏈表實(shí)現(xiàn)的LinkedBlockingQueue實(shí)現(xiàn)了這個(gè)接口。
- ConcurrentSkipListMap:使用跳表的數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)的Map。
CopyOnWriteArrayList原理
CopyOnWriteArrayList的原理主要是:讀的時(shí)候正常讀,寫-寫需要同步,所以在寫之前要使用Lock,然后為了讀-寫不阻塞,CopyOnWriteArrayList在寫入操作時(shí),先將原數(shù)組復(fù)制一份,然后在新數(shù)組末尾追加要添加的值,添加成功后再用新數(shù)組覆蓋舊數(shù)組。
JDK中的該類的add方法是這樣實(shí)現(xiàn)的:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 保證寫-寫阻塞,故進(jìn)行同步
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 關(guān)鍵!寫入之前先賦值一個(gè)副本
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 新數(shù)組的末尾添加
newElements[len] = e;
// 新數(shù)組覆蓋舊數(shù)組
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
而數(shù)組的定義是這樣的:
private transient volatile Object[] array;
注意有volatile關(guān)鍵字,說(shuō)明當(dāng)寫數(shù)據(jù)的線程修改數(shù)組后,其他讀取線程能立即“察覺(jué)”到。
BlockingQueue原理
BlockingQueue可以在并發(fā)環(huán)境下高效傳輸數(shù)據(jù),本質(zhì)上還是一個(gè)隊(duì)列,數(shù)據(jù)從隊(duì)列尾部入,從隊(duì)列頭部出。隊(duì)列都有的offer()和pull()就不說(shuō)了,沒(méi)什么特別的。BlockingQueue還有put()和take()方法,正是這兩個(gè)方法實(shí)現(xiàn)了阻塞。
以ArrayBlockingQueue來(lái)說(shuō):當(dāng)隊(duì)列為空時(shí),take()方法會(huì)等待,直到隊(duì)列不為空;當(dāng)隊(duì)列滿時(shí),put()方法會(huì)等待,直到隊(duì)列有空閑位置。這是怎么實(shí)現(xiàn)的呢?來(lái)看代碼
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
首先讀和寫都用的同一個(gè)鎖lock,因此任何時(shí)候讀和寫只能有一個(gè)在執(zhí)行。然后是條件notNull,等待非滿,以便put;notEmpty等待非空,以便take。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 關(guān)鍵,若隊(duì)列滿了,就等待
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 關(guān)鍵!一旦插入了數(shù)據(jù),隊(duì)列就不是非空了,于是喚醒在notEmpty上等待的線程(通知其他線程可以進(jìn)行take啦)
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 關(guān)鍵!若隊(duì)列為空,等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 關(guān)鍵!有元素出列了,等待在notFull上的線程可以被喚醒,可以進(jìn)行put操作了
notFull.signal();
return x;
}
LinkedBlockingQueue和ArrayBlockingQueue原理大同小異,不過(guò)LinkedBlockingQueue讀和寫分別用一把鎖,因此讀和寫可以同時(shí)進(jìn)行。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
跳表
ConcurrentSkipMap使用跳表實(shí)現(xiàn)。是一種可以進(jìn)行快速查找的數(shù)據(jù)結(jié)構(gòu),時(shí)間復(fù)雜度是$O(lg n)$
跳表形象點(diǎn)說(shuō)像個(gè)“直角三角形一樣的金字塔”,每一層都是一條鏈表,最底層的鏈表包含了Map中的所有數(shù)據(jù),每上一層都是下面一層的子集,越到上面結(jié)點(diǎn)越少。層與層之間通過(guò)值相同的元素鏈接起來(lái),因此結(jié)點(diǎn)除了有指向本層的下一個(gè)結(jié)點(diǎn)的right,還有指向下層中具有相同值的元素的down(實(shí)際上通過(guò)數(shù)據(jù)結(jié)構(gòu)Index表示)。另外,跳表中所有鏈表的元素都是排序的。
查找時(shí),先從頂層開始查找,如果找到就結(jié)束了;否則當(dāng)發(fā)現(xiàn)查找的值大于當(dāng)前層的最大值(鏈表末尾),就會(huì)“跳到”下一層鏈表接著向前查找,查找朝著下面和右面兩個(gè)方向進(jìn)行,有點(diǎn)像“下臺(tái)階”...
by @sunhaiyu
2108.4.26