Java7并發(fā)編程實戰(zhàn)手冊-讀書筆記

Java7并發(fā)編程實戰(zhàn)手冊

線程管理

  • Thread/Runnable/Thread.State
    • 線程的信息獲取和設(shè)置
    • 線程的中斷
    • sleep/yield
    • join
    • daemon
    • UncaughtExceptionHandler
      • Thread#setDefaultUncaughtExceptionHandler
    • ThreadLocal/InheritableThreadLocal
    • ThreadGroup
      • uncaughtException
    • ThreadFactory

線程同步基礎(chǔ)

  • synchronized 同步方法 this
  • synchronized 屬性對象 object
  • 同步代碼塊中使用條件
    • Object#wait/notify/notifyAll
    • 虛假喚醒(while)
  • Lock
    • ReentrantLock
      • try/finally
    • ReadWriteLock
    • 公平性 fair
    • Condition
      • while
      • lock/unlock之間

線程同步輔助類

  • Semaphore
    • 內(nèi)部計數(shù)器
    • acquire/release
    • acquireUninterruptibly
      • 忽略線程中斷且不會拋出任何異常
  • CountDownLatch
    • 內(nèi)部計數(shù)器被初始化之后就不能被再次初始化,唯一能改值的是countDown
  • CylicBarrier
    • BrokenBarrierException
    • reset
  • Phaser
    • 在每一步結(jié)束的位置對線程進行同步,當(dāng)所有的線程都完成了這一步,才允許執(zhí)行下一步
    • Phaser(3) 指定參與階段同步的線程數(shù)是3個
    • 被Phaser類置于休眠的線程不會響應(yīng)中斷事件
      • awaitAdvanceInterruptibly(int phaser),被中斷會拋出InterruptedException
    • onAdavance,階段改變時被自動執(zhí)行
  • Exchanger
    • 只能同步兩個線程
    • exchange調(diào)用后將休眠直到其他的線程到達

線程執(zhí)行器

  • ThreadPoolExecutor/Executors
    • shutdown,當(dāng)執(zhí)行完所有待運行的任務(wù)后,它將結(jié)束執(zhí)行,調(diào)用完畢立即返回,結(jié)合awaitTermination判斷是否線程池是否關(guān)閉
    • awaitTermination(long timeout, TimeUnit unit)
    • 提供很多方法獲取自身狀態(tài)的信息
    • newCachedThreadPool/newFixedThreadPool
  • Future/Callbale
    • submit
    • call() throws Exception
  • ThreadPoolExecutor#invokeAny
    • 返回第一個完成任務(wù)且沒有拋出異常的任務(wù)的執(zhí)行結(jié)果
  • ThreadPoolExecutor#invokeAll
    • 等待所有任務(wù)的完成
    • landon:是否可以用這個方法做場景心跳
         while(!isDone)
         {
          - long startTime = getCurrent()
          - 場景調(diào)度器提交場景任務(wù),等待所有場景任務(wù)執(zhí)行完畢(invokeAll),這樣亦可以保證潛在的順序問題,因為每次都是將當(dāng)前的tick執(zhí)行完畢
           - 而非之前submit,如果一次tick執(zhí)行超過50ms則下次循環(huán)又會向線程池提交任務(wù),會出現(xiàn)同一個任務(wù)多個線程執(zhí)行的潛在情況
          - long endTime = getCurrent()
          - long sleepTime = startTime + TickInterval - endTime
          - if(sleepTime > 0) sleep(sleepTime)
          - else println("busy,oneTick executeTime:" + sleepTime)
         }
      

  • ScheduledThreadPoolExecutor
    • schedule
    • scheduleAtFixedRate
    • scheduleWithFixDelay
  • Future
    • cancel
    • get
  • FutureTask
    • done,允許在執(zhí)行器中的任務(wù)執(zhí)行結(jié)束之后還可以執(zhí)行一些代碼
    • FutureTask<V> implements RunnableFuture<V>
    • FutureTask(Callable<V> callable)
  • CompletionService
    • submit,提交任務(wù)
    • poll/take,獲取任務(wù)已經(jīng)完成的Future對象
    • 即任務(wù)完成后將Future對象放到一個完成的阻塞隊列中
  • RejectedExecutionHandler

Fork/Join框架

  • 分治

    • fork-將一個任務(wù)拆分成更小的多個任務(wù)
    • join-等待子任務(wù)的完成執(zhí)行
    • 工作竊取算法-work-stealing algorithm
  • ForkJoinPool

    • ForkJoinTask
      • RecursiveAction 任務(wù)無返回結(jié)果
      • RecursiveTask 任務(wù)有返回結(jié)果
      • 遞歸

if(problem size > default size)
{
    tasks = divide(task)
    execute(tasks)
 }
 else {resolve problem using another algorithm}

  • Task extends RecursiveAction // 無返回結(jié)果
    • compute

        if(...) // divide
        {
         Task t1 = new Task(...)
         Task t2 = new Task(...)
         invokeAll(t1,t2) // 同步調(diào)用,執(zhí)行創(chuàng)建的多個子任務(wù)
        }
        else {...}
    

    • ForkJoinPool#execute(task) --默認創(chuàng)建一個線程數(shù)等于計算機cpu數(shù)目的線程池
  • 合并任務(wù)的結(jié)果

      if(problem size > default size)
        tasks = divide(task)
        execute(tasks)
        groupResults
        return result
       else
        resolve problem
        return result
    

    • ForkJoinTask#get 等待返回任務(wù)計算結(jié)果
  • 異步運行任務(wù)
    • 同步方式如invokeAll,任務(wù)被掛起,直到任務(wù)被發(fā)送到fork/join線程池中執(zhí)行。該方式允許ForkJoinPool采用工作竊取算法
    • 異步方式如fork時(立即返回),無法使用該算法
    • ForkJoinTask#V join()
    • get和join有區(qū)別
  • 任務(wù)中拋出異常
    • ForkJoinTask#isCompletedAbnormally 檢查主任務(wù)或者子任務(wù)是否拋出了異常
    • getException 獲取異常信息
    • 任務(wù)拋出運行時異常,會影響其父任務(wù)...父任務(wù)..
  • 取消任務(wù)
    • 在任務(wù)開始前可以取消
    • 例:在數(shù)字數(shù)組中尋找一組數(shù)字,拆分為更小的問題,但僅關(guān)心數(shù)字的一次出現(xiàn)。當(dāng)我們找到他時,就會取消其他子任務(wù)
      • 可存儲發(fā)送到線程池中的所有任務(wù),當(dāng)發(fā)現(xiàn)當(dāng)前任務(wù)找到數(shù)字后,取消非當(dāng)前任務(wù)的所有任務(wù)
      • 如果任務(wù)正在運行或者已經(jīng)執(zhí)行結(jié)束,則不能取消,cancel返回false。因此可以嘗試去取消所有的任務(wù)而不用擔(dān)心可能帶來的間接影響

并發(fā)集合

  • ConcurrentLinkedDeque 非阻塞
    • getFirst.../peekFirst.../removeFirst.../pollFirst... - 雙端隊列
  • LinkedBlockingDeque 阻塞
    • put/take/poll... 雙端隊列
  • PriorityBlockingQueue
    • 隊列的元素必須實現(xiàn)Comparable接口
    • 按照排序結(jié)果決定插入元素的位置
  • DelayQueue
    • 元素必須實現(xiàn)Delayed接口
    • public interface Delayed extends Comparable<Delayed>
    • 兩個待實現(xiàn)方法
      • compareTo(Delayed o)
      • getDelay(Timeunit unit)
    • 從隊列取元素時,到期的元素會返回(未來的元素等待到期_延遲時間)
    • landon:是否可以用于游戲服務(wù)器中的計時器如果有多個timer按照到期時間排隊_
  • ConcurrentSkipListMap
    • 根據(jù)鍵值排序所有元素
    • 內(nèi)部機制-Skip List
    • firstEntry/lastEntry/subMap/...
    • landon:可以和TreeMap做比較,一個線程不安全,一個線程安全
  • ThreadLocalRandom
    • current,該方法是一個靜態(tài)方法,如果調(diào)用線程還未關(guān)聯(lián)隨機數(shù)對象,就會初始化一個(localInit)
  • Atomic Variable
    • compareAndSet,這是是最主要的方法
      • 判斷內(nèi)存變量的值是否是expect,如果是說明未被其他線程改過,可以直接用update新值更新
      • 否則說明被其他線程改過,進而可以選擇下一步處理方式
    • AtomicReference#public final boolean compareAndSet(V expect, V update)
    • CAS
    • AtomicIntegerArray -原子數(shù)組
  • LinkedTransferQueue
  • AtomicReference--實現(xiàn)單例

 public class Singleton {   
 
    private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();  
      
    private Singleton (){}   
     
    public static  Singleton getInstance() {            
    for (;;) {            
        Singleton current = INSTANCE.get();                 
        if (current != null) {                
        return current;            
        }            
        current = new Singleton();            
        if (INSTANCE.compareAndSet(null, current)) {                
        return current;           
        }       
    }    
    }
  
}

定制并發(fā)類

  • 定制ThreadPoolExecutor
    • 繼承該類
    • 覆寫_記得調(diào)用super
    • shutdown
    • shutdownNow
      • 輸出如等待執(zhí)行的任務(wù)數(shù)目 getQueue().size...
      • getCompletedTaskCount,獲得已執(zhí)行過的任務(wù)數(shù)
      • getActiveCount,獲得正在執(zhí)行的任務(wù)數(shù)
    • beforeExecute
    • afterExecute
  • 實現(xiàn)基于優(yōu)先級的Executor類
    • ThreadPoolExector參數(shù)傳入PriorityBlockingQueue
    • 如果是fixed兩個線程,那么前2個任務(wù)是被2個線程執(zhí)行的;后面的排隊任務(wù)按照優(yōu)先級順序執(zhí)行
  • 實現(xiàn)ThreadFactory接口生成定制線程
    • 覆寫newThread方法
    • 返回的Thread可以是定制的Thread對象(MyThread extends Thread)
    • 可外部直接調(diào)用Threadfactory#newThread返回線程
  • 在Executor對象中使用ThreadFactory
    • 線程池參數(shù)中指定線程工廠
    • Executors內(nèi)部有一個DefaultThreadFactory
      * Executors$DefaultThreadFactory
  • 定制運行在定時線程池中的任務(wù)
    • 繼承ScheduledThreadPoolExecutor
    • 覆寫protected <V> RunnableScheduledFuture<V> decorateTask(
      * Runnable runnable, RunnableScheduledFuture<V> task)
    • 可自定義一個調(diào)度類extends FutureTask<V> implements RunnableScheduledFuture<V>
      - 參考ScheduledThreadPoolExecutor$ScheduledFutureTask
  • 通過實現(xiàn)ThreadFactory接口為Fork/Join框架生成定制線程
    • ForkJoinPool內(nèi)部實現(xiàn)
      • 一個任務(wù)隊列,存放等待被執(zhí)行的任務(wù)
      • 一個執(zhí)行這些任務(wù)的線程池
      • ForkJoinWorkerThread持有一個ForkJoinPool.WorkQueue workQueue
        * work-stealing mechanics
    • MyWorkerThread extends ForkJoinWorkerThread
      • onStart
      • onTermination
      • 調(diào)用super
    • MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory
      • newThread
  • 定制運行在Fork/Join框架中的任務(wù)
    • MyWorkTask extends ForkJoinTask(Void)
      • getRawResult
      • exec
        • 調(diào)用compute抽象方法
  • 實現(xiàn)定制Lock類
    • ReentrantLock內(nèi)部有一個很重要的類Sync(AQS)
    • abstract static class Sync extends AbstractQueuedSynchronizer
      • static final class FairSync extends Sync
      • static final class NonfairSync extends Sync
    • 自定義實現(xiàn)一個MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
      • tryActuire
      • tryRelease
    • 自定義實現(xiàn)MyLock implements Lock
      • lock
      • unlock
      • tryLock
      • newCondition
  • 實現(xiàn)基于優(yōu)先級的傳輸隊列
    • MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E>
      • tryTransfer
      • transfer
      • hasWaitingConsumer
      • getWaitingConsumerCount
      • take
  • 實現(xiàn)自己的原子對象
    • MyCounter extends AtomicInteger

   for(;;)
     {
      int value = get();
      if(value == 10) return false;
      else
      {
       int newValue = value + 1;
       boolean changed = compareAndSet(value,newValue);
       if(changed) return true;
      }
     }

測試并發(fā)應(yīng)用程序

  • 監(jiān)控Lock接口
    • ReentrantLock內(nèi)部方法都是protected的,所以可以繼承
    • MyLock extends ReentrantLock
      • 調(diào)用Thread getOwner()
      • 調(diào)用Collection<Thread> getQueuedThreads()
      • ...
  • 監(jiān)控Phaser類
    • getPhase
    • getRegisteredParties
    • getArrivedParties
    • ...
  • 監(jiān)控執(zhí)行器框架
    • ThreadPoolExecutor
      • getPoolSize
      • getCorePoolSize
      • getActiveCount
      • getTaskCount
      • ...
  • 監(jiān)控Fork/Join池
    • getPoolSize
    • getParallelism
    • getActiveThreadCount
    • getStealCount
    • ...
  • 輸出高效的日志信息
    • 輸出必要的信息
    • 為消息設(shè)定恰當(dāng)?shù)募墑e
  • 使用FindBugs分析并發(fā)代碼
  • 配置Eclipse調(diào)試并發(fā)代碼
    • 可選擇Default suspend policy for new breakpoints的值為Suspend VM
    • 默認為Suspend Thread
  • 配置NetBeans調(diào)試并發(fā)代碼
  • 使用MultithreadedTC測試并發(fā)代碼
    • MultithreadedTestCase
    • waitForTick
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容