Java7并發(fā)編程實戰(zhàn)手冊
線程管理
- Thread/Runnable/Thread.State
- 線程的信息獲取和設(shè)置
- 線程的中斷
- sleep/yield
- join
- daemon
- UncaughtExceptionHandler
- Thread#setDefaultUncaughtExceptionHandler
- ThreadLocal/InheritableThreadLocal
- ThreadGroup
- uncaughtException
- ThreadFactory
線程同步基礎(chǔ)
- synchronized 同步方法 this
- synchronized 屬性對象 object
- 同步代碼塊中使用條件
- Object#wait/notify/notifyAll
- 虛假喚醒(while)
- Lock
- ReentrantLock
- try/finally
- ReadWriteLock
- 公平性 fair
- Condition
- while
- lock/unlock之間
- ReentrantLock
線程同步輔助類
- Semaphore
- 內(nèi)部計數(shù)器
- acquire/release
- acquireUninterruptibly
- 忽略線程中斷且不會拋出任何異常
- CountDownLatch
- 內(nèi)部計數(shù)器被初始化之后就不能被再次初始化,唯一能改值的是countDown
- CylicBarrier
- BrokenBarrierException
- reset
- Phaser
- 在每一步結(jié)束的位置對線程進行同步,當(dāng)所有的線程都完成了這一步,才允許執(zhí)行下一步
- Phaser(3) 指定參與階段同步的線程數(shù)是3個
- 被Phaser類置于休眠的線程不會響應(yīng)中斷事件
- awaitAdvanceInterruptibly(int phaser),被中斷會拋出InterruptedException
- onAdavance,階段改變時被自動執(zhí)行
- Exchanger
- 只能同步兩個線程
- exchange調(diào)用后將休眠直到其他的線程到達
線程執(zhí)行器
- ThreadPoolExecutor/Executors
- shutdown,當(dāng)執(zhí)行完所有待運行的任務(wù)后,它將結(jié)束執(zhí)行,調(diào)用完畢立即返回,結(jié)合awaitTermination判斷是否線程池是否關(guān)閉
- awaitTermination(long timeout, TimeUnit unit)
- 提供很多方法獲取自身狀態(tài)的信息
- newCachedThreadPool/newFixedThreadPool
- Future/Callbale
- submit
- call() throws Exception
- ThreadPoolExecutor#invokeAny
- 返回第一個完成任務(wù)且沒有拋出異常的任務(wù)的執(zhí)行結(jié)果
- ThreadPoolExecutor#invokeAll
- 等待所有任務(wù)的完成
- landon:是否可以用這個方法做場景心跳
while(!isDone) { - long startTime = getCurrent() - 場景調(diào)度器提交場景任務(wù),等待所有場景任務(wù)執(zhí)行完畢(invokeAll),這樣亦可以保證潛在的順序問題,因為每次都是將當(dāng)前的tick執(zhí)行完畢 - 而非之前submit,如果一次tick執(zhí)行超過50ms則下次循環(huán)又會向線程池提交任務(wù),會出現(xiàn)同一個任務(wù)多個線程執(zhí)行的潛在情況 - long endTime = getCurrent() - long sleepTime = startTime + TickInterval - endTime - if(sleepTime > 0) sleep(sleepTime) - else println("busy,oneTick executeTime:" + sleepTime) }
- ScheduledThreadPoolExecutor
- schedule
- scheduleAtFixedRate
- scheduleWithFixDelay
- Future
- cancel
- get
- FutureTask
- done,允許在執(zhí)行器中的任務(wù)執(zhí)行結(jié)束之后還可以執(zhí)行一些代碼
- FutureTask<V> implements RunnableFuture<V>
- FutureTask(Callable<V> callable)
- CompletionService
- submit,提交任務(wù)
- poll/take,獲取任務(wù)已經(jīng)完成的Future對象
- 即任務(wù)完成后將Future對象放到一個完成的阻塞隊列中
- RejectedExecutionHandler
Fork/Join框架
-
分治
- fork-將一個任務(wù)拆分成更小的多個任務(wù)
- join-等待子任務(wù)的完成執(zhí)行
- 工作竊取算法-work-stealing algorithm
-
ForkJoinPool
- ForkJoinTask
- RecursiveAction 任務(wù)無返回結(jié)果
- RecursiveTask 任務(wù)有返回結(jié)果
- 遞歸
- ForkJoinTask
if(problem size > default size)
{
tasks = divide(task)
execute(tasks)
}
else {resolve problem using another algorithm}
- Task extends RecursiveAction // 無返回結(jié)果
- compute
if(...) // divide { Task t1 = new Task(...) Task t2 = new Task(...) invokeAll(t1,t2) // 同步調(diào)用,執(zhí)行創(chuàng)建的多個子任務(wù) } else {...}
- ForkJoinPool#execute(task) --默認創(chuàng)建一個線程數(shù)等于計算機cpu數(shù)目的線程池
- 合并任務(wù)的結(jié)果
if(problem size > default size) tasks = divide(task) execute(tasks) groupResults return result else resolve problem return result
- ForkJoinTask#get 等待返回任務(wù)計算結(jié)果
- 異步運行任務(wù)
- 同步方式如invokeAll,任務(wù)被掛起,直到任務(wù)被發(fā)送到fork/join線程池中執(zhí)行。該方式允許ForkJoinPool采用工作竊取算法
- 異步方式如fork時(立即返回),無法使用該算法
- ForkJoinTask#V join()
- get和join有區(qū)別
- 任務(wù)中拋出異常
- ForkJoinTask#isCompletedAbnormally 檢查主任務(wù)或者子任務(wù)是否拋出了異常
- getException 獲取異常信息
- 任務(wù)拋出運行時異常,會影響其父任務(wù)...父任務(wù)..
- 取消任務(wù)
- 在任務(wù)開始前可以取消
- 例:在數(shù)字數(shù)組中尋找一組數(shù)字,拆分為更小的問題,但僅關(guān)心數(shù)字的一次出現(xiàn)。當(dāng)我們找到他時,就會取消其他子任務(wù)
- 可存儲發(fā)送到線程池中的所有任務(wù),當(dāng)發(fā)現(xiàn)當(dāng)前任務(wù)找到數(shù)字后,取消非當(dāng)前任務(wù)的所有任務(wù)
- 如果任務(wù)正在運行或者已經(jīng)執(zhí)行結(jié)束,則不能取消,cancel返回false。因此可以嘗試去取消所有的任務(wù)而不用擔(dān)心可能帶來的間接影響
并發(fā)集合
- ConcurrentLinkedDeque 非阻塞
- getFirst.../peekFirst.../removeFirst.../pollFirst... - 雙端隊列
- LinkedBlockingDeque 阻塞
- put/take/poll... 雙端隊列
- PriorityBlockingQueue
- 隊列的元素必須實現(xiàn)Comparable接口
- 按照排序結(jié)果決定插入元素的位置
- DelayQueue
- 元素必須實現(xiàn)Delayed接口
- public interface Delayed extends Comparable<Delayed>
- 兩個待實現(xiàn)方法
- compareTo(Delayed o)
- getDelay(Timeunit unit)
- 從隊列取元素時,到期的元素會返回(未來的元素等待到期_延遲時間)
- landon:是否可以用于游戲服務(wù)器中的計時器如果有多個timer按照到期時間排隊_
- ConcurrentSkipListMap
- 根據(jù)鍵值排序所有元素
- 內(nèi)部機制-Skip List
- firstEntry/lastEntry/subMap/...
- landon:可以和TreeMap做比較,一個線程不安全,一個線程安全
- ThreadLocalRandom
- current,該方法是一個靜態(tài)方法,如果調(diào)用線程還未關(guān)聯(lián)隨機數(shù)對象,就會初始化一個(localInit)
- Atomic Variable
- compareAndSet,這是是最主要的方法
- 判斷內(nèi)存變量的值是否是expect,如果是說明未被其他線程改過,可以直接用update新值更新
- 否則說明被其他線程改過,進而可以選擇下一步處理方式
- AtomicReference#public final boolean compareAndSet(V expect, V update)
- CAS
- AtomicIntegerArray -原子數(shù)組
- compareAndSet,這是是最主要的方法
- LinkedTransferQueue
- AtomicReference--實現(xiàn)單例
public class Singleton {
private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();
private Singleton (){}
public static Singleton getInstance() {
for (;;) {
Singleton current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Singleton();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}
}
定制并發(fā)類
- 定制ThreadPoolExecutor
- 繼承該類
- 覆寫_記得調(diào)用super
- shutdown
- shutdownNow
- 輸出如等待執(zhí)行的任務(wù)數(shù)目 getQueue().size...
- getCompletedTaskCount,獲得已執(zhí)行過的任務(wù)數(shù)
- getActiveCount,獲得正在執(zhí)行的任務(wù)數(shù)
- beforeExecute
- afterExecute
- 實現(xiàn)基于優(yōu)先級的Executor類
- ThreadPoolExector參數(shù)傳入PriorityBlockingQueue
- 如果是fixed兩個線程,那么前2個任務(wù)是被2個線程執(zhí)行的;后面的排隊任務(wù)按照優(yōu)先級順序執(zhí)行
- 實現(xiàn)ThreadFactory接口生成定制線程
- 覆寫newThread方法
- 返回的Thread可以是定制的Thread對象(MyThread extends Thread)
- 可外部直接調(diào)用Threadfactory#newThread返回線程
- 在Executor對象中使用ThreadFactory
- 線程池參數(shù)中指定線程工廠
- Executors內(nèi)部有一個DefaultThreadFactory
* Executors$DefaultThreadFactory
- 定制運行在定時線程池中的任務(wù)
- 繼承ScheduledThreadPoolExecutor
- 覆寫protected <V> RunnableScheduledFuture<V> decorateTask(
* Runnable runnable, RunnableScheduledFuture<V> task) - 可自定義一個調(diào)度類extends FutureTask<V> implements RunnableScheduledFuture<V>
- 參考ScheduledThreadPoolExecutor$ScheduledFutureTask
- 通過實現(xiàn)ThreadFactory接口為Fork/Join框架生成定制線程
- ForkJoinPool內(nèi)部實現(xiàn)
- 一個任務(wù)隊列,存放等待被執(zhí)行的任務(wù)
- 一個執(zhí)行這些任務(wù)的線程池
- ForkJoinWorkerThread持有一個ForkJoinPool.WorkQueue workQueue
* work-stealing mechanics
- MyWorkerThread extends ForkJoinWorkerThread
- onStart
- onTermination
- 調(diào)用super
- MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory
- newThread
- ForkJoinPool內(nèi)部實現(xiàn)
- 定制運行在Fork/Join框架中的任務(wù)
- MyWorkTask extends ForkJoinTask(Void)
- getRawResult
- exec
- 調(diào)用compute抽象方法
- MyWorkTask extends ForkJoinTask(Void)
- 實現(xiàn)定制Lock類
- ReentrantLock內(nèi)部有一個很重要的類Sync(AQS)
- abstract static class Sync extends AbstractQueuedSynchronizer
- static final class FairSync extends Sync
- static final class NonfairSync extends Sync
- 自定義實現(xiàn)一個MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
- tryActuire
- tryRelease
- 自定義實現(xiàn)MyLock implements Lock
- lock
- unlock
- tryLock
- newCondition
- 實現(xiàn)基于優(yōu)先級的傳輸隊列
- MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E>
- tryTransfer
- transfer
- hasWaitingConsumer
- getWaitingConsumerCount
- take
- MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E>
- 實現(xiàn)自己的原子對象
- MyCounter extends AtomicInteger
for(;;)
{
int value = get();
if(value == 10) return false;
else
{
int newValue = value + 1;
boolean changed = compareAndSet(value,newValue);
if(changed) return true;
}
}
測試并發(fā)應(yīng)用程序
- 監(jiān)控Lock接口
- ReentrantLock內(nèi)部方法都是protected的,所以可以繼承
- MyLock extends ReentrantLock
- 調(diào)用Thread getOwner()
- 調(diào)用Collection<Thread> getQueuedThreads()
- ...
- 監(jiān)控Phaser類
- getPhase
- getRegisteredParties
- getArrivedParties
- ...
- 監(jiān)控執(zhí)行器框架
- ThreadPoolExecutor
- getPoolSize
- getCorePoolSize
- getActiveCount
- getTaskCount
- ...
- ThreadPoolExecutor
- 監(jiān)控Fork/Join池
- getPoolSize
- getParallelism
- getActiveThreadCount
- getStealCount
- ...
- 輸出高效的日志信息
- 輸出必要的信息
- 為消息設(shè)定恰當(dāng)?shù)募墑e
- 使用FindBugs分析并發(fā)代碼
- 配置Eclipse調(diào)試并發(fā)代碼
- 可選擇Default suspend policy for new breakpoints的值為Suspend VM
- 默認為Suspend Thread
- 配置NetBeans調(diào)試并發(fā)代碼
- 使用MultithreadedTC測試并發(fā)代碼
- MultithreadedTestCase
- waitForTick