百萬架構(gòu)師第四十八課:并發(fā)編程的原理(三)|JavaGuide

原文鏈接
JavaGuide

并發(fā)編程的原理

目標(biāo):

  • Lock 的使用
  • AQS 原理分析
  • Condition
  • CountDownLatch 、 Semaphore
  • 線程池分析

J.U.C = java.util.concurrent

Lock 的使用

  • volatile 去解決可見性問題,防止指令重排序
  • synchronized 是保證 可見性,有序性,原子性的一種手段

這是 JVM 層面提供的關(guān)鍵字。

 **JDK** 層次有一個(gè) `java.util.concurrent` 的工具包,屬于 `并發(fā)` 在 JDK 層次的一種手段。這個(gè)手段也是對我們多線程在操作系統(tǒng)情況下一些控制的保證線程安全的方式。

同步鎖

我們知道,鎖是用來控制多個(gè)線程訪問共享資源的方式,一般來說,一個(gè)鎖能夠防止多個(gè)線程同時(shí)訪問共享資源,在 Lock 接口出現(xiàn)之前,JAVA 應(yīng)用程序只能依靠 `synchronized` 關(guān)鍵字來實(shí)現(xiàn)同步鎖的功能,在 JAVA5 以后,增加了 JUC 的并發(fā)包且提供了 `Lock` 接口用來實(shí)現(xiàn)鎖的功能,它提供了與 `synchroinzed` 關(guān)鍵字類似的同步功能,只是它比 `synchronized` 更靈活,能夠顯示的獲取和釋放鎖。

Lock的初步使用

 `Lock` 是一個(gè)接口,核心的兩個(gè)方法 Lock 和 unlock ,它有很多的實(shí)現(xiàn),比如 `ReentrantLock` 、 `ReentrantReadWriteLock`;
public interface Lock {
    void lock();
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
JavaGuide_并發(fā)編程_原理3_ReentrantReadWriteLock.jpg
JavaGuide_并發(fā)編程_原理3_ReentrantReadWriteLock類圖.png

fair 公平

ReentrantLock

重入鎖,表示支持重新進(jìn)入的鎖,也就是說,如果當(dāng)前線程 t1 通過調(diào)用 `#lock` 方法獲取了鎖之后,再次調(diào)用lock,是不會(huì)再阻塞去獲取鎖的,直接增加重試次數(shù)就行了。
public class AtomicDemo {
    private static int count=0;
    static Lock lock=new ReentrantLock();

    public static void inc(){
        lock.lock();
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        count++;
        lock.unlock();
    }
    
    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<1000;i++){
            new Thread(()->{AtomicDemo.inc();}).start();;
        }
        Thread.sleep(3000);
    }
}

ReentrantReadWriteLock

我們以前理解的鎖,基本都是排他鎖,也就是這些鎖在同一時(shí)刻只允許一個(gè)線程進(jìn)行訪問,而讀寫所在同一時(shí)刻可以允許多個(gè)線程訪問,但是在寫線程訪問時(shí),所有的讀線程和其他寫線程都會(huì)被阻塞。讀寫鎖維護(hù)了一對鎖,一個(gè)讀鎖、一個(gè)寫鎖;一般情況下,讀寫鎖的性能都會(huì)比排它鎖好,因?yàn)榇蠖鄶?shù)場景 **讀是多于寫的** 。在讀多于寫的情況下,讀寫鎖能夠提供比排它鎖更好的并發(fā)性和吞吐量。
public class RWLockDemo {
    // 排他鎖
    // 共享鎖,在同一時(shí)刻可以有多個(gè)線程獲得鎖
    // 讀鎖, 寫鎖
    static Map<String, Object> cacheMap = new HashMap<>();
    // 重入讀寫鎖
    static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock read = readWriteLock.readLock();    // 讀鎖
    static Lock write = readWriteLock.writeLock();  // 寫鎖

    // 緩存的更新和讀取的時(shí)候
    public static final Object get(String key) {
        read.lock(); // 讀取的時(shí)候加上讀鎖
        out.println("開始讀取數(shù)據(jù)");
        try {
            return cacheMap.get(key);
        } finally {
            read.unlock();
        }
    }

    public static final Object set(String key, Object value) {
        write.lock();  // 每一次寫數(shù)據(jù),都需要先加上寫鎖
        out.println("開始寫數(shù)據(jù)");
        try {
            return cacheMap.put(key, value);
        } finally {
            write.unlock();
        }
    }
}
在這個(gè)案例中,通過hashmap來模擬了一個(gè)內(nèi)存緩存,然后使用讀寫鎖來保證這個(gè)內(nèi)存緩存的線程安全性。當(dāng)執(zhí)行讀操作的時(shí)候,需要獲取讀鎖,在并發(fā)訪問的時(shí)候,讀鎖不會(huì)被阻塞,因?yàn)樽x操作不會(huì)影響執(zhí)行結(jié)果。
在執(zhí)行寫操作時(shí),線程必須要獲取寫鎖,當(dāng)已經(jīng)有線程持有寫鎖的情況下,當(dāng)前線程會(huì)被阻塞,只有當(dāng)寫鎖釋放以后,其他讀寫操作才能繼續(xù)執(zhí)行。使用讀寫鎖提升讀操作的并發(fā)性。以保證每次寫操作對所有的讀寫操作的可見性。
  • 讀鎖與讀鎖可以共享
  • 讀鎖與寫鎖不可以共享(排他)
  • 寫鎖與寫鎖不可以共享(排他)

Locksynchronized 的簡單對比

通過我們對 `Lock` 的使用以及對 `synchronized` 的了解,基本上可以對比出這兩種鎖的區(qū)別了。因?yàn)檫@個(gè)也是在面試過程中比較常見的問題。
  • 從層次上,一個(gè)是關(guān)鍵字、一個(gè)是類, 這是最直觀的差異
  • 從使用上, Lock 具備更大的靈活性,可以控制鎖的釋放和獲取; 而 synchronized 的鎖的釋放是被動(dòng)的,當(dāng)出現(xiàn)異常或者同步代碼塊執(zhí)行完以后,才會(huì)釋放鎖
  • Lock 可以判斷鎖的狀態(tài)、而 synchronized 無法做到

Lock 可以實(shí)現(xiàn)公平鎖、非公平鎖; 而 synchronized 只有非公平鎖

AQS

 `Lock` 之所以能實(shí)現(xiàn)線程安全的鎖,主要的核心是 **AQS** ( `AbstractQueuedSynchronizer`  ) ,  `AbstractQueuedSynchronizer` 提供了一個(gè) **FIFO** 隊(duì)列,可以看作是一個(gè)用來實(shí)現(xiàn)鎖以及其他需要同步功能的框架。這里簡稱該類為 **AQS** 。 **AQS** 的使用依靠繼承來完成,子類通過繼承 **AQS** 并實(shí)現(xiàn)所需的方法來管理同步狀態(tài)。例如常見的 `ReentrantLock` ,`CountDownLatch` 等 **AQS** 的兩種功能。

從使用上來說, AQS 的功能可以分為兩種:獨(dú)占和共享。

  • 獨(dú)占鎖模式下,每次只能有一個(gè)線程持有鎖,比如前面給大家演示的 ReentrantLock 就是以獨(dú)占方式實(shí)現(xiàn)的互斥鎖
  • 共享鎖模式下,允許多個(gè)線程同時(shí)獲取鎖,并發(fā)訪問共享資源,比如 ReentrantReadWriteLock 。
    很顯然,獨(dú)占鎖是一種悲觀保守的加鎖策略,它限制了 讀/讀 沖突,如果某個(gè)只讀線程獲取鎖,則其他讀線程都只能等待,這種情況下就限制了不必要的并發(fā)性,因?yàn)樽x操作并不會(huì)影響數(shù)據(jù)的一致性。共享鎖則是一種樂觀鎖,它放寬了加鎖策略,允許多個(gè)執(zhí)行讀操作的線程同時(shí)訪問共享資源。
public class LockDemo {
    static Lock lock = new ReentrantLock();// 有公平重入鎖和非公平重入鎖

    private static int count = 0;

    public static void incr(){
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.lock(); // 獲得鎖
        count ++;
        lock.unlock();
    }
}
通過一個(gè)重入鎖的方式實(shí)現(xiàn)一個(gè)鎖的等待。JVM 層面是如何讓一個(gè)鎖等待的。

 `lock.lock` 用到了 **CXQ** 以及我們的 `EntryList` ,通過隊(duì)列的方式,讓線程等待,等待之前,它用到了 **CAS** ,通過自旋的方式去嘗試獲得鎖。如果在指定時(shí)間內(nèi)獲得鎖失敗的話,它會(huì)去 `park`,最后當(dāng)我們這個(gè)鎖被釋放的時(shí)候,他會(huì)從 `EntryList` 里邊取出一個(gè)線程。再次去爭奪鎖。取出線程,喚醒一個(gè)線程 叫做 `unpark` 。

synchronized 來到了......

AQS的內(nèi)部實(shí)現(xiàn)

同步器依賴內(nèi)部的同步隊(duì)列(一個(gè) **FIFO雙向隊(duì)列** )來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時(shí),同步器會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成為一個(gè)節(jié)點(diǎn)( `Node` )并將其加入同步隊(duì)列,同時(shí)會(huì)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),會(huì)把首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。

Node 的主要屬性如下

static final class Node {
    int waitStatus; //表示節(jié)點(diǎn)的狀態(tài),包含cancelled(取消);condition 表示節(jié)點(diǎn)在等待condition也就是在condition隊(duì)列中
    Node prev; //前繼節(jié)點(diǎn)
    Node next; //后繼節(jié)點(diǎn)
    Node nextWaiter; //存儲(chǔ)在condition隊(duì)列中的后繼節(jié)點(diǎn)
    Thread thread; //當(dāng)前線程
}
 **AQS** 類底層的數(shù)據(jù)結(jié)構(gòu)是使用雙向鏈表,是隊(duì)列的一種實(shí)現(xiàn)。包括一個(gè) head 節(jié)點(diǎn)和一個(gè) tail 節(jié)點(diǎn),分別表示頭結(jié)點(diǎn)和尾節(jié)點(diǎn),其中頭結(jié)點(diǎn)不存儲(chǔ) Thread ,僅保存 next 結(jié)點(diǎn)的引用。
JavaGuide_并發(fā)編程_原理3_節(jié)點(diǎn).png
JavaGuide_并發(fā)編程_原理3_AQS節(jié)點(diǎn).png
當(dāng)一個(gè)線程成功地獲取了同步狀態(tài)(或者鎖),其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點(diǎn)并加入到同步隊(duì)列中,而這個(gè)加入隊(duì)列的過程必須要保證線程安全,因此同步器提供了一個(gè)基于 CAS 的設(shè)置尾節(jié)點(diǎn)的方法: `compareAndSetTail ( Node expect , Nodeupdate )` ,它需要傳遞當(dāng)前線程“認(rèn)為”的尾節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn),只有設(shè)置成功后,當(dāng)前節(jié)點(diǎn)才正式與之前的尾節(jié)點(diǎn)建立關(guān)聯(lián)。
JavaGuide_并發(fā)編程_原理3_AQS節(jié)點(diǎn)狀態(tài).png
同步隊(duì)列遵循 **FIFO** ,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會(huì)喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)。
JavaGuide_并發(fā)編程_原理3_AQS節(jié)點(diǎn)狀態(tài)_頭節(jié)點(diǎn).png
設(shè)置首節(jié)點(diǎn)是通過獲取同步狀態(tài)成功的線程來完成的,由于只有一個(gè)線程能夠成功獲取到同步狀態(tài),因此設(shè)置頭節(jié)點(diǎn)的方法并不需要使用CAS來保證,它只需要將首節(jié)點(diǎn)設(shè)置成為原首節(jié)點(diǎn)的后繼節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的 `next` 引用即可

compareAndSet

JavaGuide_并發(fā)編程_原理3_AQS節(jié)點(diǎn)_CAS.png

java.util.concurrent.locks.AbstractQueuedSynchronizer

 **AQS** 中,除了本身的鏈表結(jié)構(gòu)以外,還有一個(gè)很關(guān)鍵的功能,就是 **CAS** ,這個(gè)是保證在多線程并發(fā)的情況下保證線程安全的前提下去把線程加入到 **AQS** 中的方法,可以簡單理解為樂觀鎖
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
這個(gè)方法里面,首先,用到了 `unsafe` 類,(Unsafe類是在 `sun.misc` 包下,不屬于 JAVA 標(biāo)準(zhǔn)。但是很多 **Java** 的基礎(chǔ)類庫,包括一些被廣泛使用的高性能開發(fā)庫都是基于 `Unsafe` 類開發(fā)的,比如 Netty 、 Hadoop 、 Kafka 等; Unsafe 可認(rèn)為是 JAVA 中留下的后門,提供了一些低層次操作,如直接內(nèi)存訪問、線程調(diào)度等) 。然后調(diào)用了 `#compareAndSwapObject` 這個(gè)方法。
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4,
                                                 Object var5);
這個(gè)是一個(gè) `native` 方法,
第一個(gè)參數(shù)為需要改變的對象,第二個(gè)為偏移量(即之前求出來的 headOffset 的值),第三個(gè)參數(shù)為期待的值,第四個(gè)為更新后的值
整個(gè)方法的作用是如果當(dāng)前時(shí)刻的值等于預(yù)期值 var4 相等,則更新為新的期望值 var5,如果更新成功,則返回 **true** ,否則返回 **false** ;
這里傳入了一個(gè) headOffset ,這個(gè) headOffset 是什么呢?在下面的代碼中,通過 `unsafe.objectFieldOffset`  
JavaGuide_并發(fā)編程_原理3_AQS節(jié)點(diǎn)_headoffset.png

然后通過反射獲取了 AQS 類中的成員變量,并且這個(gè)成員變量被 volatile 修飾的

JavaGuide_并發(fā)編程_原理3_AQS節(jié)點(diǎn)_head.png

unsafe.objectFieldOffset

 `headOffset` 這個(gè)是指類中相應(yīng)字段在該類的偏移量,在這里具體即是指 head 這個(gè)字段在 **AQS** 類的內(nèi)存中相對于該類首地址的偏移量。

一個(gè) JAVA 對象可以看成是一段內(nèi)存,每個(gè)字段都得按照一定的順序放在這段內(nèi)存里,通過這個(gè)方法可以準(zhǔn)確地告訴你某個(gè)字段相對于對象的起始內(nèi)存地址的字節(jié)偏移。用于在后面的 `compareAndSwapObject` 中,去根據(jù)偏移量找到對象在內(nèi)存中的具體位置。

這個(gè)方法在 `unsafe.cpp` 文件中,代碼如下:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject
obj, jlong offset, jobject e_h, jobject x_h))
    UnsafeWrapper("Unsafe_CompareAndSwapObject");
oop x = JNIHandles::resolve(x_h); // 新值
oop e = JNIHandles::resolve(e_h); // 預(yù)期值
oop p = JNIHandles::resolve(obj);
HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);// 在內(nèi)存中的具體位置
oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);// 調(diào)用了另一個(gè)方法,實(shí)際上就是通過cas操作來替換內(nèi)存中的值是否成功
jboolean success = (res == e); // 如果返回的res等于e,則判定滿足compare條件(說明res應(yīng)該為內(nèi)存中的當(dāng)前值),但實(shí)際上會(huì)有ABA的問題
if (success) // success為true時(shí),說明此時(shí)已經(jīng)交換成功(調(diào)用的是最底層的cmpxchg指令)
    update_barrier_set((void*)addr, x); // 每次Reference類型數(shù)據(jù)寫操作時(shí),都會(huì)產(chǎn)生一個(gè)WriteBarrier暫時(shí)中斷操作,配合垃圾收集器
return success;
UNSAFE_END

所以其實(shí) #compareAndSet 這個(gè)方法,最終調(diào)用的是 unsafe 類的 #compareAndSwap ,這個(gè)指令會(huì)對內(nèi)存中的共享數(shù)據(jù)做原子的讀寫操作。

  1. 首先, cpu會(huì)把內(nèi)存中將要被更改的數(shù)據(jù)與期望值作比較
  2. 然后,當(dāng)兩個(gè)值相等時(shí),cpu才會(huì)將內(nèi)存中的對象替換為新的值。否則,不做變更操作。
  3. 最后,返回操作執(zhí)行結(jié)果

很顯然,這是一種樂觀鎖的實(shí)現(xiàn)思路。

ReentrantLock的實(shí)現(xiàn)原理分析

之所以叫重入鎖是因?yàn)橥粋€(gè)線程如果已經(jīng)獲得了鎖,那么后續(xù)該線程調(diào)用lock方法時(shí)不需要再次獲取鎖,也就是不會(huì)阻塞;重入鎖提供了兩種實(shí)現(xiàn),一種是非公平的重入鎖,另一種是公平的重入鎖。怎么理解公平和非公平呢?

如果在絕對時(shí)間上,先對鎖進(jìn)行獲取的請求一定先被滿足獲得鎖,那么這個(gè)鎖就是公平鎖,反之,就是不公平的。簡單來說公平鎖就是等待時(shí)間最長的線程最優(yōu)先獲取鎖。

默認(rèn)的情況下就是非公平鎖。

非公平鎖的實(shí)現(xiàn)流程時(shí)序圖

JavaGuide_并發(fā)編程_原理3_非公平鎖的實(shí)現(xiàn)流程.png

源碼分析

ReentrantLock.lock

public void lock() {
    sync.lock();
}
這個(gè)是獲取鎖的入口,調(diào)用了 `sync.lock` ;  `sync` 是一個(gè)實(shí)現(xiàn)了 **AQS** 的抽象類,這個(gè)類的主要作用是用來實(shí)現(xiàn)同步控制的,并且 sync 有兩個(gè)實(shí)現(xiàn),一個(gè)是 `NonfairSync` (非公平鎖)、另一個(gè)是 `FailSync` (公平鎖); 我們先來分析一下非公平鎖的實(shí)現(xiàn)

NonfairSync.lock

final void lock() {
    if (compareAndSetState(0, 1)) //這是跟公平鎖的主要區(qū)別,一上來就試探鎖是否空閑,如果可以插隊(duì),則設(shè)置獲得鎖的線程為當(dāng)前線程
        //exclusiveOwnerThread屬性是AQS從父類AbstractOwnableSynchronizer中繼承的屬性,用來保存當(dāng)前占用同步狀態(tài)的線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1); //嘗試去獲取鎖
}
 `compareAndSetState`,這個(gè)方法在前面提到過了,再簡單講解一下,通過 **CAS** 算法去改變 state 的值,而這個(gè) state 是什么呢? 在 **AQS** 中存在一個(gè)變量 state ,對于`ReentrantLock` 來說,如果 `state = 0` 表示無鎖狀態(tài)、如果 `state > 0` 表示有鎖狀態(tài)。 ( state 在不同的鎖里邊表達(dá)的意思是不一樣的 ) 

所以在這里,是表示當(dāng)前的 state 如果等于 0 ,則替換為 1 ,如果替換成功表示獲取鎖成功了由于 `ReentrantLock` 是可重入鎖,所以持有鎖的線程可以多次加鎖,經(jīng)過判斷加鎖線程就是當(dāng)前持有鎖的線程時(shí)

(即 exclusiveOwnerThread==Thread.currentThread() ),即可加鎖,每次加鎖都會(huì)將 state 的值 +1 , state 等于幾,就代表當(dāng)前持有鎖的線程加了幾次鎖;

解鎖時(shí)每解一次鎖就會(huì)將 state 減 1 , state 減到 0 后,鎖就被釋放掉,這時(shí)其他線程可以加鎖;

AbstractQueuedSynchronizer.acquire

如果 **CAS** 操作未能成功,說明 `state` 已經(jīng)不為 0 ,此時(shí)繼續(xù) `acquire(1)` 操作, `acquire` 是AQS中的方法 當(dāng)多個(gè)線程同時(shí)進(jìn)入這個(gè)方法時(shí),首先通過 **CAS** 去修改 **state** 的狀態(tài),如果修改成功表示競爭鎖成功,競爭失敗的, `tryAcquire` 會(huì)返回  `false` 
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這個(gè)方法的主要作用是

  • 嘗試獲取獨(dú)占鎖,獲取成功則返回,否則
  • 自旋獲取鎖,并且判斷中斷標(biāo)識,如果中斷標(biāo)識為 true ,則設(shè)置線程中斷
  • addWaiter方法把當(dāng)前線程封裝成Node,并添加到隊(duì)列的尾部

NonfairSync.tryAcquire

 `tryAcquire` 方法嘗試獲取鎖,如果成功就返回,如果不成功,則把當(dāng)前線程和等待狀態(tài)信息構(gòu)造成一個(gè)Node節(jié)點(diǎn),并將結(jié)點(diǎn)放入同步隊(duì)列的尾部。然后為同步隊(duì)列中的當(dāng)前節(jié)點(diǎn)循環(huán)等待獲取鎖,直到成功。
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

nofairTryAcquire

這里可以看出非公平鎖的含義,即獲取鎖并不會(huì)嚴(yán)格根據(jù)爭用鎖的先后順序決定。這里的實(shí)現(xiàn)邏輯類似 `synchroized` 關(guān)鍵字的偏向鎖的做法,即可重入而不用進(jìn)一步進(jìn)行鎖的競爭,也解釋了 `ReentrantLock` 中 `Reentrant` 的意義。
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); //獲取當(dāng)前的狀態(tài),前面講過,默認(rèn)情況下是0表示無鎖狀態(tài)
    if (c == 0) {
        if (compareAndSetState(0, acquires)) { //通過cas來改變state狀態(tài)的值,如果更新成功,表示獲取鎖成功,這個(gè)操作外部方法lock()就做過一次,這里再做只是為了再嘗試一次,盡量以最簡單的方式獲取鎖。
                setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//如果當(dāng)前線程等于獲取鎖的線程,表示重入,直接累加重入次數(shù)
            int nextc = c + acquires;
        if (nextc < 0) // overflow 如果這個(gè)狀態(tài)值越界,拋出異常;如果沒有越界,則設(shè)置后返回true
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //如果狀態(tài)不為0,且當(dāng)前線程不是owner,則返回false。
        return false; //獲取鎖失敗,返回false
}

addWaiter

當(dāng)前鎖如果已經(jīng)被其他線程鎖持有,那么當(dāng)前線程來去請求鎖的時(shí)候,會(huì)進(jìn)入這個(gè)方法,這個(gè)方法主要是把當(dāng)前線程封裝成 node ,添加到 **AQS** 的鏈表中
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode); //創(chuàng)建一個(gè)獨(dú)占的Node節(jié)點(diǎn),mode為排他模式
    // 嘗試快速入隊(duì),如果失敗則降級至full enq
    Node pred = tail; // tail是AQS中表示同步隊(duì)列隊(duì)尾的屬性,剛開始為null,所以進(jìn)行enq(node)方法
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) { // 防止有其他線程修改tail,使用CAS進(jìn)行修改,如果失敗則降級至full enq
            pred.next = node; // 如果成功之后舊的tail的next指針再指向新的tail,成為雙向鏈表
            return node;
        }
    }
    enq(node); // 如果隊(duì)列為null或者CAS設(shè)置新的tail失敗
    return node;
}

enq

enq就是通過自旋操作把當(dāng)前節(jié)點(diǎn)加入到隊(duì)列中

private Node enq(final Node node) {
    for (;;) { //無效的循環(huán),為什么采用for(;;),是因?yàn)樗鼒?zhí)行的指令少,不占用寄存器
        Node t = tail;// 此時(shí)head, tail都為null
        if (t == null) { // Must initialize// 如果tail為null則說明隊(duì)列首次使用,需要進(jìn)行初始化
            if (compareAndSetHead(new Node()))// 設(shè)置頭節(jié)點(diǎn),如果失敗則存在競爭,留至下一輪循環(huán)
                tail = head; // 用CAS的方式創(chuàng)建一個(gè)空的Node作為頭結(jié)點(diǎn),因?yàn)榇藭r(shí)隊(duì)列中只一個(gè)頭結(jié)點(diǎn),所以tail也指向head,第一次循環(huán)執(zhí)行結(jié)束
        } else {
            //進(jìn)行第二次循環(huán)時(shí),tail不為null,進(jìn)入else區(qū)域。將當(dāng)前線程的Node結(jié)點(diǎn)的prev指向tail,然后使用CAS將tail指向Node
                //這部分代碼和addWaiter代碼一樣,將當(dāng)前節(jié)點(diǎn)添加到隊(duì)列
                node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node; //t此時(shí)指向tail,所以可以CAS成功,將tail重新指向CNode。此時(shí)t為更新前的tail的值,即指向空的頭結(jié)點(diǎn),t.next=node,就將頭結(jié)點(diǎn)的后續(xù)結(jié)點(diǎn)指向Node,返回頭結(jié)點(diǎn)。
                    return t;
            }
        }
    }
}

代碼運(yùn)行到這里, AQS 隊(duì)列的結(jié)構(gòu)就是這樣一個(gè)表現(xiàn)。

JavaGuide_并發(fā)編程_原理3_AQS隊(duì)列狀態(tài).png

acquireQueued

 `addWaiter` 返回了插入的節(jié)點(diǎn),作為 `acquireQueued` 方法的入?yún)?,這個(gè)方法主要用于爭搶鎖。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();// 獲取prev節(jié)點(diǎn),若為null即刻拋出NullPointException
            if (p == head && tryAcquire(arg)) {// 如果前驅(qū)為head才有資格進(jìn)行鎖的搶奪
                setHead(node); // 獲取鎖成功后就不需要再進(jìn)行同步操作了,獲取鎖成功的線程作為新的head節(jié)點(diǎn)
                //凡是head節(jié)點(diǎn),head.thread與head.prev永遠(yuǎn)為null,但是head.next不為null
                p.next = null; // help GC
                failed = false; //獲取鎖成功
                return interrupted;
            }
            //如果獲取鎖失敗,則根據(jù)節(jié)點(diǎn)的waitStatus決定是否需要掛起線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())// 若前面為true,則執(zhí)行掛起,待下次喚醒的時(shí)候檢測中斷的標(biāo)志
                interrupted = true;
        }
    } finally {
        if (failed) // 如果拋出異常則取消鎖的獲取,進(jìn)行出隊(duì)(sync queue)操作
            cancelAcquire(node);
    }
}
原來的 head 節(jié)點(diǎn)釋放鎖以后,會(huì)從隊(duì)列中移除,原來 head 節(jié)點(diǎn)的 next 節(jié)點(diǎn)會(huì)成為 head 節(jié)點(diǎn)
JavaGuide_并發(fā)編程_原理3_AQS隊(duì)列狀態(tài)_頭節(jié)點(diǎn)釋放.png

shouldParkAfterFailedAcquire

從上面的分析可以看出,只有隊(duì)列的第二個(gè)節(jié)點(diǎn)可以有機(jī)會(huì)爭用鎖,如果成功獲取鎖,則此節(jié)點(diǎn)晉升為頭節(jié)點(diǎn)。對于第三個(gè)及以后的節(jié)點(diǎn), `if (p == head)` 條件不成立,首先進(jìn)行 `shouldParkAfterFailedAcquire(p, node)` 操作。

 `#shouldParkAfterFailedAcquire` 方法是判斷一個(gè)爭用鎖的線程是否應(yīng)該被阻塞。它首先判斷一個(gè)節(jié)點(diǎn)的前置節(jié)點(diǎn)的狀態(tài)是否為 `Node.SIGNAL` ,如果是,是說明此節(jié)點(diǎn)已經(jīng)將狀態(tài)設(shè)置-如果鎖釋放,則應(yīng)當(dāng)通知它,所以它可以安全地阻塞了,返回 `true` 。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 前繼節(jié)點(diǎn)的狀態(tài)
    if (ws == Node.SIGNAL)//如果是SIGNAL狀態(tài),意味著當(dāng)前線程需要被unpark喚醒
        return true;
    // 如果前節(jié)點(diǎn)的狀態(tài)大于0,即為CANCELLED狀態(tài)時(shí),則會(huì)從前節(jié)點(diǎn)開始逐步循環(huán)找到一個(gè)沒有被“CANCELLED”節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)的前節(jié)點(diǎn),返回false。在下次循環(huán)執(zhí)行shouldParkAfterFailedAcquire時(shí),返回true。這個(gè)操作實(shí)際是把隊(duì)列中CANCELLED的節(jié)點(diǎn)剔除掉。
    if (ws > 0) {// 如果前繼節(jié)點(diǎn)是“取消”狀態(tài),則設(shè)置 “當(dāng)前節(jié)點(diǎn)”的 “當(dāng)前前繼節(jié)點(diǎn)” 為 “‘原前繼節(jié)點(diǎn)'的前繼節(jié)點(diǎn)”。
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // 如果前繼節(jié)點(diǎn)為“0”或者“共享鎖”狀態(tài),則設(shè)置前繼節(jié)點(diǎn)為SIGNAL狀態(tài)。
        /*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

解讀:假如有t1,t2兩個(gè)線程都加入到了鏈表中
img
如果head節(jié)點(diǎn)位置的線程一直持有鎖,那么t1和t2就是掛起狀態(tài),而HEAD以及Thread1的awaitStatus都是 SIGNAL ,在多次嘗試獲取鎖失敗以后,就會(huì)通過下面的方法進(jìn)行掛起(這個(gè)地方就是避免了驚群效應(yīng),每個(gè)節(jié)點(diǎn)只需要關(guān)心上一個(gè)節(jié)點(diǎn)的狀態(tài)即可)

JavaGuide_并發(fā)編程_原理3_節(jié)點(diǎn)狀態(tài)的流轉(zhuǎn).png
  • SIGNAL:值為 -1 ,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)將要或者已經(jīng)被阻塞,在當(dāng)前節(jié)點(diǎn)釋放的時(shí)候需要 unpark 后繼節(jié)點(diǎn);
  • CONDITION:值為 -2 ,表示當(dāng)前節(jié)點(diǎn)在等待 condition ,即在 condition 隊(duì)列中;
  • PROPAGATE:值為 -3 ,表示 releaseShared 需要被傳遞給后續(xù)節(jié)點(diǎn)(僅在共享模式下使用);

parkAndCheckInterrupt

如果 shouldParkAfterFailedAcquire 返回了 true ,則執(zhí)行:“ parkAndCheckInterrupt() ”方法,它是通過 LockSupport.park(this)將當(dāng)前線程掛起到WATING狀態(tài),它需要等待一個(gè)中斷、unpark方法來喚醒它,通過這樣一種FIFO的機(jī)制的等待,來實(shí)現(xiàn)了Lock的操作
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);// LockSupport提供park()和unpark()方法實(shí)現(xiàn)阻塞線程和解除線程阻塞
    return Thread.interrupted();
}

ReentrantLock.unlock

加鎖的過程分析完以后,再來分析一下釋放鎖的過程,調(diào)用 release 方法,這個(gè)方法里面做兩件事。

  1. 釋放鎖 ;
  2. 喚醒 park 的線程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease

這個(gè)動(dòng)作可以認(rèn)為就是一個(gè)設(shè)置鎖狀態(tài)的操作,而且是將狀態(tài)減掉傳入的參數(shù)值(參數(shù)是 1 ),如果結(jié)果狀態(tài)為 0 ,就將排它鎖的 Owner 設(shè)置為 null ,以使得其他的線程有機(jī)會(huì)進(jìn)行執(zhí)行。 在排它鎖中,加鎖的時(shí)候狀態(tài)會(huì)增加 1 (當(dāng)然可以自己修改這個(gè)值),在解鎖的時(shí)候減掉 1 ,同一個(gè)鎖,在可以重入后,可能會(huì)被疊加為 2、3、4這些值,只有 `unlock()` 的次數(shù)與 `lock()` 的次數(shù)對應(yīng)才會(huì)將 Owner 線程設(shè)置為空,而且也只有這種情況下才會(huì)返回 true  。
protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 這里是將鎖的數(shù)量減1
    if (Thread.currentThread() != getExclusiveOwnerThread())// 如果釋放的線程和獲取鎖的線程不是同一個(gè),拋出非法監(jiān)視器狀態(tài)異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 由于重入的關(guān)系,不是每次釋放鎖c都等于0,
        // 直到最后一次釋放鎖時(shí),才會(huì)把當(dāng)前線程釋放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free; 
}

LockSupport

 `LockSupport` 類是 Java6 引入的一個(gè)類,提供了基本的線程同步原語。LockSupport 實(shí)際上是調(diào)用了 Unsafe 類里的函數(shù),歸結(jié)到 Unsafe 里,只有兩個(gè)函數(shù):
public native void unpark(Thread jthread);

public native void park(boolean isAbsolute, long time);

 `unpark` 函數(shù)為線程提供“許可(permit)”,線程調(diào)用park函數(shù)則等待“許可”。這個(gè)有點(diǎn)像信號量,但是這個(gè)“許可”是不能疊加的,“許可”是一次性的。

permit相當(dāng)于0/1的開關(guān),默認(rèn)是0,調(diào)用一次 unpark 就加 1 變成了 1 .調(diào)用一次 park 會(huì)消費(fèi) permit ,又會(huì)變成 0 。 如果再調(diào)用一次 park 會(huì)阻塞,因?yàn)?permit 已經(jīng)是 0 了。直到 permit 變成 1 .這時(shí)調(diào)用 unpark 會(huì)把permit 設(shè)置為 1 .每個(gè)線程都有一個(gè)相關(guān)的 permit , permit 最多只有一個(gè),重復(fù)調(diào)用unpark不會(huì)累積。

在使用 LockSupport 之前,我們對線程做同步,只能使用 wait 和 notify ,但是 wait 和 notify 其實(shí)不是很靈活,并且耦合性很高,調(diào)用notify必須要確保某個(gè)線程處于 wait 狀態(tài),而 park/unpark 模型真正解耦了線程之間的同步,先后順序沒有沒有直接關(guān)聯(lián),同時(shí)線程之間不再需要一個(gè)Object或者其他變量來存儲(chǔ)狀態(tài),不再需要關(guān)心對方的狀態(tài)。

總結(jié)

分析了獨(dú)占式同步狀態(tài)獲取和釋放過程后,做個(gè)簡單的總結(jié):在獲取同步狀態(tài)時(shí),同步器維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會(huì)被加入到隊(duì)列中并在隊(duì)列中進(jìn)行自旋;移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時(shí),同步器調(diào)用 `tryRelease(int arg)` 方法釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。

公平鎖和非公平鎖的區(qū)別

鎖的公平性是相對于獲取鎖的順序而言的,如果是一個(gè)公平鎖,那么鎖的獲取順序就應(yīng)該符合請求的絕對時(shí)間順序,也就是 **FIFO** 。 在上面分析的例子來說,只要 **CAS** 設(shè)置同步狀態(tài)成功,則表示當(dāng)前線程獲取了鎖,而公平鎖則不一樣,差異點(diǎn)有兩個(gè)。

FairSync.tryAcquire

final void** lock() {
    acquire(1);
}

非公平鎖在獲取鎖的時(shí)候,會(huì)先通過CAS進(jìn)行搶占,而公平鎖則不會(huì)

FairSync.tryAcquire

protected final boolean* tryAcquire(int acquires) {
    final Thread current = Thread.currentThread*();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
這個(gè)方法與 `nonfairTryAcquire(int acquires)` 比較,不同的地方在于判斷條件多了 `hasQueuedPredecessors()` 方法,也就是加入了[同步隊(duì)列中當(dāng)前節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)]的判斷,如果該方法返回 `true` ,則表示有線程比當(dāng)前線程更早地請求獲取鎖,因此需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖。

Condition

通過前面的課程學(xué)習(xí),我們知道任意一個(gè)Java對象,都擁有一組監(jiān)視器方法(定義在 `java.lang.Object`上),主要包括 `wait()` 、 `notify()` 以及 `notifyAll()` 方法,這些方法與同步關(guān)鍵字配合,可以實(shí)現(xiàn)等待/通知模式  `J.U.C` 包提供了 `Condition` 來對鎖進(jìn)行精準(zhǔn)控制, `Condition` 是一個(gè)多線程協(xié)調(diào)通信的工具類,可以讓某些線程一起等待某個(gè)條件( `Condition` ),只有滿足條件時(shí),線程才會(huì)被喚醒。

condition使用案例

ConditionWait
@RequiredArgsConstructor
public class ConditionWait extends Thread {
    private final Lock lock;
    private final Condition condition;

    @Override
    public void run() {
        lock.lock();
        try {
            System.out.println("【" + Thread.currentThread().getName() + "】開始執(zhí)行 condition.await()");
            condition.await();  //
            System.out.println("【" + Thread.currentThread().getName() + "】執(zhí)行結(jié)束 condition.await()");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
ConditionSignal
@RequiredArgsConstructor
public class ConditionNotify extends Thread {
    private final Lock lock;
    private final Condition condition;

    @Override
    public void run() {
        lock.lock();
        try {
            System.out.println("【" + Thread.currentThread().getName() + "】開始執(zhí)行 condition.signal()");
            condition.signal();  // signal 和 signalAll
            System.out.println("【" + Thread.currentThread().getName() + "】執(zhí)行結(jié)束 condition.signal()");
        } finally {
            lock.unlock();
        }
    }
}
通過這個(gè)案例簡單實(shí)現(xiàn)了 `wait` 和 `notify` 的功能,當(dāng)調(diào)用 `await` 方法后,當(dāng)前線程會(huì)釋放鎖并等待,而其他線程調(diào)用 `condition` 對象的 `signal` 或者 `signalall` 方法通知被阻塞的線程,然后自己執(zhí)行 `unlock` 釋放鎖,被喚醒的線程獲得之前的鎖繼續(xù)執(zhí)行,最后釋放鎖。

所以,condition 中兩個(gè)最重要的方法,一個(gè)是 await ,一個(gè)是 signal 方法

  • await : 把當(dāng)前線程阻塞掛起
  • signal : 喚醒阻塞的線程
public class ConnditionDemo {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        ConditionWait conditionWait = new ConditionWait(lock, condition);
        conditionWait.start();
        ConditionNotify conditionNotify = new ConditionNotify(lock, condition);
        conditionNotify.start();
    }
}
【Thread-0】開始執(zhí)行 condition.await()
【Thread-1】開始執(zhí)行 condition.signal()
【Thread-1】執(zhí)行結(jié)束 condition.signal()
【Thread-0】執(zhí)行結(jié)束 condition.await()

await 方法

調(diào)用 `Condition` 的 `await()` 方法(或者以 `await` 開頭的方法),會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài)。當(dāng)從 `await()`方法返回時(shí),當(dāng)前線程一定獲取了 `Condition` 相關(guān)聯(lián)的鎖。
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //創(chuàng)建一個(gè)新的節(jié)點(diǎn),節(jié)點(diǎn)狀態(tài)為condition,采用的數(shù)據(jù)結(jié)構(gòu)仍然是鏈表
    int savedState = fullyRelease(node); //釋放當(dāng)前的鎖,得到鎖的狀態(tài),并喚醒AQS隊(duì)列中的一個(gè)線程
    int interruptMode = 0;
    //如果當(dāng)前節(jié)點(diǎn)沒有在同步隊(duì)列上,即還沒有被signal,則將當(dāng)前線程阻塞
    //isOnSyncQueue 判斷當(dāng)前 node 狀態(tài),如果是 CONDITION 狀態(tài),或者不在隊(duì)列上了,就繼續(xù)阻塞,還在隊(duì)列上且不是 CONDITION 狀態(tài)了,就結(jié)束循環(huán)和阻塞
    while (!isOnSyncQueue(node)) {//第一次判斷的是false,因?yàn)榍懊嬉呀?jīng)釋放鎖了
        LockSupport.park(this); // 第一次總是 park 自己,開始阻塞等待
        // 線程判斷自己在等待過程中是否被中斷了,如果沒有中斷,則再次循環(huán),會(huì)在 isOnSyncQueue 中判斷自己是否在隊(duì)列上.
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 當(dāng)這個(gè)線程醒來,會(huì)嘗試拿鎖, 當(dāng) acquireQueued 返回 false 就是拿到鎖了.
    // interruptMode != THROW_IE -> 表示這個(gè)線程沒有成功將 node 入隊(duì),但 signal 執(zhí)行了 enq 方法讓其入隊(duì)了.
    // 將這個(gè)變量設(shè)置成 REINTERRUPT.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 如果 node 的下一個(gè)等待者不是 null, 則進(jìn)行清理,清理 Condition 隊(duì)列上的節(jié)點(diǎn).
    // 如果是 null ,就沒有什么好清理的了.
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 如果線程被中斷了,需要拋出異常.或者什么都不做
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal

調(diào)用 `Condition` 的 `signal()` 方法,將會(huì)喚醒在等待隊(duì)列中等待時(shí)間最長的節(jié)點(diǎn)(首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移到同步隊(duì)列中
public final void signal() {
    if (!isHeldExclusively()) //先判斷當(dāng)前線程是否獲得了鎖
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; // 拿到 Condition 隊(duì)列上第一個(gè)節(jié)點(diǎn)
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)// 如果第一個(gè)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是 null,那么, 最后一個(gè)節(jié)點(diǎn)也是 null.
            lastWaiter = null; // 將 next 節(jié)點(diǎn)設(shè)置成 null
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
該方法先是 CAS 修改了節(jié)點(diǎn)狀態(tài),如果成功,就將這個(gè)節(jié)點(diǎn)放到 AQS 隊(duì)列中,然后喚醒這個(gè)節(jié)點(diǎn)上的線程。此時(shí),那個(gè)節(jié)點(diǎn)就會(huì)在 await 方法中蘇醒。
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果上一個(gè)節(jié)點(diǎn)的狀態(tài)被取消了, 或者嘗試設(shè)置上一個(gè)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 失敗了(SIGNAL 表示: 他的
    next 節(jié)點(diǎn)需要停止阻塞),
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); // 喚醒輸入節(jié)點(diǎn)上的線程.
    return true;
}

讀寫鎖與 synchronized 在只讀的線程中需要耗費(fèi)大量的時(shí)間的時(shí)候的性能對比,本質(zhì)在于當(dāng)讀操作時(shí),不進(jìn)行阻塞

// [readWrite] 9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:665
// [synchronized]  9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:19007
public class TestperformanceDemo {
    static Integer demoInteger = 0;
    // 重入讀寫鎖
    static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock read = readWriteLock.readLock();    // 讀鎖
    static Lock write = readWriteLock.writeLock();  // 寫鎖


    public static void main(String[] args) {
        int countSum = 10000;
        add1000Seconds(countSum);
        add1000synchronized(countSum);
        // [readWrite] 9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:665
        // [synchronized]  9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:19007
    }

    /***
     * 第二種方式
     */
    public static void add1000Seconds(int countSum) {
        demoInteger = 0;
        ExecutorService executorService = Executors.newFixedThreadPool(countSum);
        long start = System.currentTimeMillis();
        CompletionService completionService = new ExecutorCompletionService(executorService);
        for (int i = 0; i < countSum; i++) {
            int finalI = i;
            completionService.submit(() -> {
                if (finalI == 0) {
                     write.lock();
                    try {
                        demoInteger++;
                    } finally {
                        write.unlock();
                    }
                } else {
                    read.lock();
                    try {
                        Integer value = demoInteger;
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        read.unlock();
                    }
                }

            }, null);
        }
        int count = 0;
        while (count < countSum) { // 等待任務(wù)完成全部
            if (completionService.poll() != null) {
                count++;
            }
        }
        long end = System.currentTimeMillis();

        System.out.println("[readWrite] " + (countSum - 1) + "次讀操作{每次讀操作 睡眠 1 ms},"
                           + 1 + "次 \" +1\" 操作以后, 結(jié)果為:" + demoInteger
                           + "[耗時(shí)]:" + (end - start));
    }


    /***
     * synchronized
     */
    public static void add1000synchronized(int countSum) {
        demoInteger = 0;
        ExecutorService executorService = Executors.newFixedThreadPool(countSum);
        long start = System.currentTimeMillis();
        CompletionService completionService = new ExecutorCompletionService(executorService);
        for (int i = 0; i < countSum; i++) {
            int finalI = i;
            completionService.submit(() -> {
                synchronized (RWLockDemo.class) {
                    if (finalI == 0) {
                        demoInteger++;
                    } else {
                        try {
                            Thread.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Integer value = demoInteger;
                    }
                }
            }, null);
        }
        int count = 0;
        while (count < countSum) { // 等待任務(wù)完成全部
            if (completionService.poll() != null) {
                count++;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("[synchronized]  " + (countSum - 1) + "次讀操作{每次讀操作 睡眠 1 ms},"
                           + 1 + "次 \" +1\" 操作以后, 結(jié)果為:" + demoInteger
                           + "[耗時(shí)]:" + (end - start));
    }
}

JavaGuide

來源于: https://javaguide.net

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容