JUC-AQS入門

1. 簡介

AQS是AbstractQueuedSynchronizer的簡寫,即隊列同步器。它是構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等)。

2. 工作原理

AQS通過內(nèi)置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當(dāng)前線程獲取同步狀態(tài)失?。ㄦi)時,AQS則會將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成一個節(jié)點(Node)并將其加入同步隊列,同時會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時,則會把節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。

aqs.png
  • 使用Node實現(xiàn)FIFO隊列,可以用于構(gòu)建鎖或者其它同步裝置的基礎(chǔ)框架。
  • 利用了一個int類型表示狀態(tài)。
  • 使用方法是繼承,子類通過繼承并通過實現(xiàn)它的方法管理其狀態(tài){ acquire 和release }的方法操縱狀態(tài)。
  • 可以同時實現(xiàn)排它鎖和共享鎖模式(獨占、共享)。

3.AQS組件

3.1 CountDownLatch

CountDownLatch是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始值為線程的數(shù)量。每當(dāng)一個線程完成了自己的任務(wù)后,計數(shù)器的值就會減1。當(dāng)計數(shù)器值到達(dá)0時,它表示所有的線程已經(jīng)完成了任務(wù),然后在閉鎖上等待的線程就可以恢復(fù)執(zhí)行任務(wù)。


CountDownLatch.png
@Slf4j
public class CountDownLatchExample1 {
    private final static int threadCount = 200;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        //countDownLatch.await(100,TimeUnit.MILLSECONDS),設(shè)定等候時間
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
3.2 Semaphore

Semaphore負(fù)責(zé)協(xié)調(diào)各個線程,以保證它們能夠正確、合理的使用公共資源,也是操作系統(tǒng)中用于控制進(jìn)程同步互斥的量。Semaphore是一種計數(shù)信號量,用于管理一組資源,內(nèi)部是基于AQS的共享模式。它相當(dāng)于給線程規(guī)定一個量從而控制允許活動的線程數(shù)。

Semaphore主要方法:

//構(gòu)造方法,創(chuàng)建具有給定許可數(shù)的計數(shù)信號量并設(shè)置為非公平信號量
Semaphore(int permits)
//構(gòu)造方法,當(dāng)fair等于true時,創(chuàng)建具有給定許可數(shù)的計數(shù)信號量并設(shè)置為公平信號量
Semaphore(int permits,boolean fair)
//從此信號量獲取一個許可前線程將一直阻塞。相當(dāng)于一輛車占了一個車位
void acquire()
//從此信號量獲取給定數(shù)目許可,在提供這些許可前一直將線程阻塞。比如n=2,就相當(dāng)于一輛車占了兩個車位
void acquire(int n)
//嘗試獲取許可,停車場有車位就進(jìn)入,沒有就走
tryAcquire()
//釋放一個許可,將其返回給信號量。就如同車開走返回一個車位
void release()
//釋放n個許可
void release(int n)
//當(dāng)前可用的許可數(shù)
int availablePermits()

下面一起看看如何使用Semaphore

@Slf4j
public class SemaphoreExample1 {
    private final static int threadCount = 20;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //semaphore.acquire(3); 獲取多個許可
                    //semaphore.tryAcquire() 嘗試獲取許可
                    //semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS) 嘗試等待獲取許可
                    semaphore.acquire(); // 獲取一個許可
                    test(threadNum);
                    semaphore.release(); // 釋放一個許可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
3.3 CyclicBarrier

CyclicBarrier是一個同步的輔助類,允許一組線程相互之間等待,達(dá)到一個共同點,再繼續(xù)執(zhí)行。


CyclicBarrier.png
@Slf4j
public class CyclicBarrierExample3 {
    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        //此處代碼在barrier滿足條件時優(yōu)先執(zhí)行
        log.info("callback is running");
    });
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }
    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
3.4 ReetrantLock

,JDK6.0版本之后synchronized 得到了大量的優(yōu)化,二者性能也不分伯仲,但是重入鎖是可以完全替代synchronized關(guān)鍵字的。除此之外, ReetrantLock又自帶一系列高逼格BUFF:可中斷響應(yīng)、鎖申請等待限時、公平鎖。另外可以結(jié)合Condition來使用,使其更是逼格滿滿。

ReentrantLock與Synchronized區(qū)別

  • 可重入性(都可重入)
  • 鎖的實現(xiàn):Synchronized是依賴于JVM實現(xiàn)的,而ReentrantLock是依賴于程序?qū)崿F(xiàn)。
  • 性能區(qū)別:在JDK5.0版本之前, ReetrantLock的性能遠(yuǎn)遠(yuǎn)好于synchronized關(guān)鍵字,但是隨著鎖的不斷優(yōu)化(自旋鎖、輕量級鎖、偏向鎖),兩者性能也差不太多。在兩者都能滿足需求的情況,更推薦使用synchronized,簡單。
  • 功能區(qū)別:Synchronized的使用便于ReentrantLock,并且它是由編譯器是保證鎖的加鎖和釋放的,而ReentrantLock是由我們自己控制的;第二點鎖定粒度與靈活度,明顯ReentrantLock優(yōu)于Synchronized。
  • ReentrantLock獨有功能:1,可指定公平鎖或非公平鎖。2,提供了一個Condition類,可以分組喚醒需要喚醒的線程。3,提供能夠中斷等待鎖的線程的機(jī)制,lock.lockInterruptibly()。

Condition的使用:Condition可以非常靈活的操作線程的喚醒,下面是一個線程等待與喚醒的例子,其中用1234序號標(biāo)出了日志輸出順序

public static void main(String[] args) {
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();//創(chuàng)建condition
    //線程1
    new Thread(() -> {
        try {
            reentrantLock.lock();
            log.info("wait signal"); // 1
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("get signal"); // 4
        reentrantLock.unlock();
    }).start();
    //線程2
    new Thread(() -> {
        reentrantLock.lock();
        log.info("get lock"); // 2
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        condition.signalAll();//發(fā)送信號
        log.info("send signal"); // 3
        reentrantLock.unlock();
    }).start();
}
3.5 ReentrantReadWriteLock讀寫鎖

在沒有任何讀寫鎖的時候才可以取得寫入鎖(悲觀讀取,容易寫線程饑餓),也就是說如果一直存在讀操作,那么寫鎖一直在等待沒有讀的情況出現(xiàn),這樣我的寫鎖就永遠(yuǎn)也獲取不到,就會造成等待獲取寫鎖的線程饑餓。 平時使用的場景并不多。

public class LockExample3 {
    private final Map<String, Data> map = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }
    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }
    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
    class Data {}
}
3.6 StampedLock

StampedLock是Java8引入的一種新的鎖機(jī)制,可以認(rèn)為它是讀寫鎖的一個改進(jìn)版本,讀寫鎖雖然分離了讀和寫的功能,使得讀與讀之間可以完全并發(fā),但是讀和寫之間依然是沖突的,讀鎖會完全阻塞寫鎖,它使用的依然是悲觀鎖策略。如果有大量的讀線程,他也有可能引起寫線程的饑餓。而StampedLock則提供了一種樂觀的讀策略,這種樂觀策略的鎖非常類似于無鎖的操作,使得樂觀鎖完全不會阻塞寫線程。

@Slf4j
@ThreadSafe
public class LockExample5 {
    // 請求總數(shù)
    public static int clientTotal = 5000;
    // 同時并發(fā)執(zhí)行的線程數(shù)
    public static int threadTotal = 200;
    public static int count = 0;
    private final static StampedLock lock = new StampedLock();
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }
    private static void add() {
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }
    }
}
3.7 Callable、Future、FutureTask

線程的創(chuàng)建方式中有兩種,一種是實現(xiàn)Runnable接口,另一種是繼承Thread,但是這兩種方式都有個缺點,那就是在任務(wù)執(zhí)行完成之后無法獲取返回結(jié)果,于是就有了Callable接口,F(xiàn)uture接口與FutureTask類的配合取得返回的結(jié)果。

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

該接口聲明了一個名稱為call()的方法,同時這個方法可以有返回值V,也可以拋出異常。
下面看下Future接口定義

public interface Future<V> {
    //如果任務(wù)還沒開始,執(zhí)行cancel(...)方法將返回false;如果任務(wù)已經(jīng)啟動,執(zhí)    
   //行cancel(true)方法將以中斷執(zhí)行此任務(wù)線程的方式來試圖停止任務(wù),如果停 
   //止成功,返回true;當(dāng)任務(wù)已經(jīng)啟動,執(zhí)行cancel(false)方法將不會對正在執(zhí) 
   //行的任務(wù)線程產(chǎn)生影響(讓線程正常執(zhí)行到完成),此時返回false;當(dāng)任務(wù)已經(jīng)/ 
   //完成,執(zhí)行cancel(...)方法將返回false。mayInterruptRunning參數(shù)表示是否中 
   //斷執(zhí)行中的線
    boolean cancel(boolean mayInterruptIfRunning);
    //如果任務(wù)完成前被取消,則返回true
    boolean isCancelled();
    //如果任務(wù)執(zhí)行結(jié)束,無論是正常結(jié)束或是中途取消還是發(fā)生異常,都返回true
    boolean isDone();
    //獲取異步執(zhí)行的結(jié)果,如果沒有結(jié)果可用,此方法會阻塞直到異步計算完成
    V get() throws InterruptedException, ExecutionException;
    //獲取異步執(zhí)行結(jié)果,如果沒有結(jié)果可用,此方法會阻塞,但是會有時間限      
    //制,如果阻塞時間超過設(shè)定的timeout時間,該方法將拋出異常
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

通過方法我們可知Future提供了3種功能:(1)能夠中斷執(zhí)行中的任務(wù)(2)判斷任務(wù)是否執(zhí)行完成(3)獲取任務(wù)執(zhí)行完成后的結(jié)果。
下面我們再看下FutureTask的定義

public class FutureTask<V> implements RunnableFuture<V> {
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

FutureTask除了實現(xiàn)了Future接口外還實現(xiàn)了Runnable接口,
通過上面的介紹,我們對Callable,F(xiàn)uture,F(xiàn)utureTask都有了比較清晰的了解了,那么它們到底有什么用呢?我們前面說過通過這樣的方式去創(chuàng)建線程的話,最大的好處就是能夠返回結(jié)果,加入有這樣的場景,我們現(xiàn)在需要計算一個數(shù)據(jù),而這個數(shù)據(jù)的計算比較耗時,而我們后面的程序也要用到這個數(shù)據(jù)結(jié)果,那么這個時Callable豈不是最好的選擇?我們可以開設(shè)一個線程去執(zhí)行計算,而主線程繼續(xù)做其他事,而后面需要使用到這個數(shù)據(jù)時,我們再使用Future獲取不就可以了嗎?下面我們就來編寫一個這樣的實例。

@Slf4j
public class FutureExample {
    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        String result = future.get();
        log.info("result:{}", result);
    }
}

下面我們用FutureTask再次實現(xiàn)類似功能

@Slf4j
public class FutureTaskExample {

    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });
        new Thread(futureTask).start();
        log.info("do something in main");
        Thread.sleep(1000);
        String result = futureTask.get();
        log.info("result:{}", result);
    }
}
3.8 Fork/Join框架

Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務(wù)的框架, 是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。此處我們不做過多的說明,其原理是基于工作竊取算法,指某個線程從其他隊列里竊取任務(wù)來執(zhí)行。下面我們以一個demo來介紹如何使用

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任務(wù)足夠小就計算任務(wù)
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務(wù)大于閾值,就分裂成兩個子任務(wù)計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 執(zhí)行子任務(wù)
            leftTask.fork();
            rightTask.fork();

            // 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任務(wù)
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一個計算任務(wù),計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //執(zhí)行一個任務(wù)
        Future<Integer> result = forkjoinPool.submit(task);
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}
3.9 Queue

在并發(fā)隊列上JDK提供了兩套實現(xiàn),一個是以ConcurrentLinkedQueue為代表的高性能隊列,一個是以BlockingQueue接口為代表的阻塞隊列,無論哪種都集成自Queue。


類結(jié)構(gòu).png

ConcurrentLinkedQueue:是一個適用于高并發(fā)場景下的隊列,通過無鎖的方式,實現(xiàn)了高并發(fā)狀態(tài)下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一個基于鏈接節(jié)點的無界線程安全隊列。該隊列的元素遵循先進(jìn)先出的原則。

BlockingQueue接口:

  • ArrayBlockingQueue:基于數(shù)組的阻塞隊列實現(xiàn),在ArrayBlockingQueue內(nèi)部,維護(hù)了一個定長數(shù)組,以便緩存隊列中的數(shù)據(jù)對象,其內(nèi)部沒有實現(xiàn)讀寫分離,也就意味著生產(chǎn)和消費不能完全并行,長度是需要定義的,可以指定先進(jìn)先出或者先進(jìn)后出,也叫有界隊列。
  • LinkedBlockingQueue:基于鏈表的阻塞隊列,同ArrayBlockingQueue類似,其內(nèi)部也維持一個數(shù)據(jù)緩沖隊列(由列表構(gòu)成),LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),是因為內(nèi)部實現(xiàn)采用分離鎖(讀寫分離兩個鎖),從而實現(xiàn)生產(chǎn)者和消費者操作完全并發(fā),是一個無界隊列。
  • SynchronousQueue:一種沒有緩沖的隊列,生產(chǎn)者生產(chǎn)的數(shù)據(jù)直接會被消費者獲取并消費。
  • PriorityBlockingQueue:基于優(yōu)先級的阻塞隊列(優(yōu)先級的判斷通過構(gòu)造函數(shù)傳入的Compator對象來決定,也就是說傳入隊列的對象必須實現(xiàn)Comparable接口),在實現(xiàn)PriorityBlockingQueue時,內(nèi)部控制線程同步得鎖采用的是公平鎖,它是一個無界的隊列。
  • DelayQueue:帶有延遲時間的Queue,其中的元素只有當(dāng)其指定的延遲時間到了,才能夠從隊列中獲取到該元素,DelayQueue中的元素必須實現(xiàn)Delayed接口,DelayQueue是一個沒有大小限制的隊列,應(yīng)用場景很多,比如對緩存超時的數(shù)據(jù)進(jìn)行移除、任務(wù)超時處理、空閑鏈接的關(guān)閉等等。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、并發(fā) 進(jìn)程:每個進(jìn)程都擁有自己的一套變量 線程:線程之間共享數(shù)據(jù) 1.線程 Java中為多線程任務(wù)提供了很多的...
    SeanMa閱讀 2,813評論 0 11
  • 第1章并發(fā)編程的挑戰(zhàn) 第2章Java并發(fā)機(jī)制的底層實現(xiàn)原理 1. volatile——下文討論 2. synchr...
    小張同學(xué)_loveZY閱讀 1,432評論 0 1
  • 這周小詩 這周閱讀狀態(tài)不佳我會堅持的 這周故事(洛克)有一只小鳥叫洛克它生下來就是一只翅膀,就是因為洛克只有一支翅...
    peter則斌閱讀 394評論 1 1
  • 樓高月小,步行回家。 #賣糖葫蘆的不在#
    仇志飛閱讀 175評論 0 0
  • 當(dāng)初寫作就想寫點能讓自己有回憶或者能幫助讀者的東西,但是隨著自己不斷的寫下去、慢慢發(fā)現(xiàn)自己就是為了應(yīng)付交作業(yè)...
    阿耀的故事閱讀 420評論 0 1

友情鏈接更多精彩內(nèi)容