萬字長文!從底層開始帶你了解并發(fā)編程,徹底幫你搞懂java鎖!

線程是否要鎖住同步資源

  • 鎖住 悲觀鎖
  • 不鎖住 樂觀鎖

鎖住同步資源失敗 線程是否要阻塞

  • 阻塞
  • 不阻塞自旋鎖,適應性自旋鎖

多個線程競爭同步資源的流程細節(jié)有沒有區(qū)別

  • 不鎖住資源,多個線程只有一個能修改資源成功,其它線程會重試無鎖
  • 同一個線程執(zhí)行同步資源時自動獲取資源偏向鎖
  • 多個線程競爭同步資源時,沒有獲取資源的線程自旋等待鎖釋放 輕量級鎖
  • 多個線程競爭同步資源時,沒有獲取資源的線程阻塞等待喚醒 重量級鎖

4.多個線程競爭鎖時是否要排隊

  • 排隊公平鎖
  • 先嘗試插隊,插隊失敗在排隊非公平鎖

一個線程的多個流程能不能獲取同一把鎖

  • 能 可重入鎖
  • 不能非可重入鎖

多個線程能不能共享一把鎖

  • 能 共享
  • 不能排他鎖

悲觀鎖與樂觀鎖

悲觀鎖與樂觀鎖時一種廣義的概念,體現(xiàn)的是看待線程同步的不同角度。

悲觀鎖

悲觀鎖認為自己在使用數據的時候一定有別的線程來修改數據,在獲取數據的時候會先加鎖,確保數據不會被別的線程修改。
鎖實現(xiàn):synchronized 接口Lock的實現(xiàn)類
適用場景:寫操作多,先加鎖可以保證寫操作時數據正確。

樂觀鎖

樂觀鎖認為自己在使用數據時不會有別的線程修改數據,所以不會添加鎖,只是在更新數據的時候去判斷之前有沒有別的線程更新了這個數據。
鎖實現(xiàn):CAS算法,例如AtomicInteger類的原子自增時通過CAS自旋實現(xiàn)。
適用場景:讀操作較多,不加鎖的特點能夠使其讀操作的性能大幅度提升。
樂觀鎖的執(zhí)行流程:
線程A獲取到數據以后直接操作,操作完數據以后準備更新同步資源,更新之前會先判斷內存中同步資源是否被更新:
1.如果沒有被更新,更新內存中同步資源的值。
2.如果同步資源被其他線程更新,根據實現(xiàn)方法執(zhí)行不同的操做(報錯or重試)。

CAS算法

全名:Compare And Swap(比較并交換)
無鎖算法:基于硬件原語實現(xiàn),在不使用鎖(沒有線程被阻塞)的情況下實現(xiàn)多線程之間的變量同步。
jdk中的實現(xiàn):java.util.concurrent包中的原子類就是通過CAS來實現(xiàn)了樂觀鎖。
算法涉及到的三個操作數:

需要讀寫的內存值V
進行比較的值A
要寫入的新值的B

CAS存在的問題

1.ABA問題
線程1準備用CAS將變量的值由A替換為B,在此之前,線程2將變量的值由A替換為C,又由C替換為A,然后線程1執(zhí)行CAS時發(fā)現(xiàn)變量的值仍然為A,所以CAS成功。但實際上這時的現(xiàn)場已經和最初不同了,盡管CAS成功,但可能存在潛藏的問題。
舉例:一個小偷,把別人家的錢偷了之后又還了回來,還是原來的錢嗎,你老婆出軌之后又回來,還是原來的老婆嗎?ABA問題也一樣,如果不好好解決就會帶來大量的問題。最常見的就是資金問題,也就是別人如果挪用了你的錢,在你發(fā)現(xiàn)之前又還了回來。但是別人卻已經觸犯了法律。
但是jdk已經解決了這個問題。
想追下源碼來著,但是一追發(fā)現(xiàn)直接到c了。

2.循環(huán)時間長開銷大
3.只能保證一個共享變量的原子操作

Unsafe

AtomicInteger

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

從這里可見原子類的方法調用了unsafe類的方法
unsafe

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

再往底層的話,實際加上就到了C。
最終的實現(xiàn)就是 cmpxchg = cas修改變量值

lock cmpxchg 指令

硬件:
lock指令在執(zhí)行后面指令的時候鎖定一個北橋信號(不采用鎖總線的方式)

自旋鎖

是指當一個線程在獲取鎖的時候,如果鎖已經被其他線程獲取,那么該線程將循環(huán)等待,然后不斷的判斷鎖是否能夠被成功獲取,自旋直到獲取到鎖才會退出循環(huán)。

自旋存在的意義與使用場景

阻塞與喚醒線程需要操作系統(tǒng)切換CPU狀態(tài),需要消耗一定時間。
同步代碼塊邏輯簡單,執(zhí)行時間很短。
自適應自旋鎖假定不同線程持有同一個鎖對象的時間基本相當,競爭程度趨于穩(wěn)定,因此,可以根據上一次自旋的時間與結果調整下一次自旋的時間。
JDK>=1.7自旋鎖的參數被取消,虛擬機不再支持由用戶配置自旋鎖,自旋鎖總是會執(zhí)行,自旋鎖次數也是由虛擬機自動調整。


鎖升級

鎖升級流程

jdk6的時候對鎖進行了很多優(yōu)化,其中就有了鎖的升級過程。

1.偏向鎖:只有一個線程進入臨界區(qū),適用于只有一個線程訪問同步塊的場景
2.輕量級鎖:多線程為競爭或者競爭不激烈,適用于追求響應時間,同步塊執(zhí)行速度非常快。
3.重量級鎖:多線程競爭,適用于追求吞吐量,同步塊執(zhí)行速度較長。

JVM對象加鎖的原理

對象的內存結構?
對象頭:比如hash碼,對象所屬的年代,對象鎖,鎖狀態(tài)標志,偏向鎖(線程)ID,偏向時間,數組長度(數組對象)等。
實例數據:創(chuàng)建對象時,對象中的成員變量,方法等。
對齊填充:就為了湊夠8的倍數。


利用一個插件,對比對象上鎖前后的差異:

    <dependencies>
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.9</version>
        </dependency>
    </dependencies>

    public static void main(String[] args) {
        Object o = new Object();
        System.out.println(ClassLayout.parseInstance(o).toPrintable());
        System.out.println("===============================================");
        synchronized (o){
            System.out.println(ClassLayout.parseInstance(o).toPrintable());
        }
    }

實例對象是怎樣存儲的?

對象實例存儲在堆空間,對象的元數據存在元空間,對象引用存在??臻g。

鎖消除

/**
 * @author yhd
 * @createtime 2020/9/8 20:40
 */
public class Demo2 {
    public static void main(String[] args) {
        StringBuffer buffer = new StringBuffer();
        buffer.append("a").append("b");
        System.out.println(ClassLayout.parseInstance(buffer).toPrintable());
    }
}

我們都知道StringBuffer是線程安全的,因為他的關鍵方法都加了synchronized,但是,從打印結果可以看出,鎖被消除了。因為buffer這個引用只會在main方法中使用,不可能被其他線程引用(因為是局部變量,棧私有),所以buffer是不可能共享的資源,JVM會自動消除StringBuffer對象內部的鎖。

鎖粗化

/**
 * @author yhd
 * @createtime 2020/9/8 20:48
 */
public class Demo3 {
    public static void main(String[] args) {
        int i=0;
        StringBuffer buffer = new StringBuffer();
        while (i<100){
            buffer.append(i);
            i++;
        }
        System.out.println(buffer.toString());
        System.out.println(ClassLayout.parseInstance(buffer).toPrintable());
    }
}

JVM會檢測到這樣一連串的操作都對同一個對象加鎖(while 循環(huán)內 100 次執(zhí)行 append,沒有鎖粗化的就要進行 100 次加鎖/解鎖),此時 JVM 就會將加鎖的范圍粗化到這一連串的操作的外部(比如 while 虛幻體外),使得這一連串操作只需要加一次鎖即可。

AQS

研究了AQS一天,終于找到了他的入口,接下來看我的想法:

多線程操做共享數據問題

/**
 * @author yhd
 * @createtime 2020/9/8 8:11
 */
public class Demo1 {
    public static int m=0;

    public static void main(String[] args)throws Exception {
        Thread []threads=new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new Thread(()->{
                for (int j = 0; j < 100; j++) {
                    m++;
                }
            });
        }
        for (Thread t :threads) t.start();
        for (Thread t :threads) t.join();//線程順序結束
        System.out.println(m);
    }
}

毫無疑問,這段代碼是存在線程安全問題的,只要了解一點并發(fā)編程,都是可以看出來的。那么我們可以怎么來解決呢?

使用synchronized來解決

/**
 * @author yhd
 * @createtime 2020/9/8 8:32
 */
public class Demo2 {

    public static int m=0;

    public static void main(String[] args)throws Exception {
        Thread []threads=new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new Thread(()->{
                synchronized (Demo2.class) {
                    for (int j = 0; j < 100; j++) {
                        m++;
                    }
                }
            });
        }
        for (Thread t :threads) t.start();
        for (Thread t :threads) t.join();//線程順序結束
        System.out.println(m);
    }

這樣解決了線程安全的問題,但是同時也存在一個問題,synchronized是操作系統(tǒng)層面的方法,也就是需要jvm和操做系統(tǒng)之間進行一個切換(用戶態(tài)和內核態(tài)的切換),這樣實際上是影響效率的。另一種解決辦法:

使用ReentrantLock來解決

/**
 * @author yhd
 * @createtime 2020/9/8 8:41
 */
public class Demo3 {

    public static int m=0;
    public static Lock lock=new ReentrantLock();
    public static void main(String[] args)throws Exception {
        Thread []threads=new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new Thread(()->{

                try {
                    lock.lock();
                    for (int j = 0; j < 100; j++) {
                            m++;
                    }
                } finally {
                    lock.unlock();
                }
            });
        }
        for (Thread t :threads) t.start();
        for (Thread t :threads) t.join();//線程順序結束
        System.out.println(m);
    }
}

那么這種方式的底層是怎么做的呢?接下來追源碼。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

這個sync又是啥呢?接著追

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

它實際上是ReentrantLock的一個內部類繼承了Sync,而他里面的方法實際上也是調用了Sync的方法。這樣目標就明確了,我們可以看一下Sync
這個Sync的源碼:

abstract static class Sync extends AbstractQueuedSynchronizer

由此可知,實際上ReentrantLock是利用AbstractQueuedSynchronizer也就是AQS來實現(xiàn)的。

AbstractQueuedSynchronizer概述

這個類的內部有一個內部類Node

static final class Node {
    static final Node SHARED = new Node();
    volatile Node prev;//前驅指針
    volatile Node next;//后繼指針
    volatile Thread thread;//當前線程
    private transient volatile Node head;//頭節(jié)點
    private transient volatile Node tail;//尾節(jié)點
    private volatile int state;//鎖狀態(tài),加鎖成功則為1,重入+1 解鎖則為0
    .....
}

看到這里就想到了LinkedHashMap,實際上也就是類似于維持了一個雙向的鏈表,每個節(jié)點都是一個線程。

它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這里volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:

    getState()
    setState()
    compareAndSetState()

AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)。
  不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現(xiàn)好了。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:

`isHeldExclusively()`:該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。
`tryAcquire(int)`:獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
`tryRelease(int)`:獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
`tryAcquireShared(int)`:共享方式。嘗試獲取資源。負數表示失?。?表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
`tryReleaseShared(int)`:共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結點返回true,否則返回false。

ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程lock()時,會調用tryAcquire()獨占該鎖并將state+1。此后,其他線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證state是能回到零態(tài)的。

再以CountDownLatch以例,任務分為N個子線程去執(zhí)行,state也初始化為N(注意N要與線程個數一致)。這N個子線程是并行執(zhí)行的,每個子線程執(zhí)行完后countDown()一次,state會CAS減1。等到所有子線程都執(zhí)行完后(即state=0),會unpark()主調用線程,然后主調用線程就會從await()函數返回,繼續(xù)后余動作。

一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現(xiàn)tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現(xiàn)獨占和共享兩種方式,如ReentrantReadWriteLock

源碼解讀與執(zhí)行流程分析

獨占鎖

acquire(int)==
此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅只限于lock()。獲取到資源后,線程就可以去執(zhí)行其臨界區(qū)代碼了。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

函數流程如下:

tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式;
acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
tryAcquire(int)
此方法嘗試去獲取獨占資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義,還是那句話,當然不僅僅只限于tryLock()。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

一開始還傻乎乎的想為啥直接拋異常了,后來才反應過來,這不是給自定義的方法么?AQS這里只定義了一個接口,具體資源的獲取交由自定義同步器去實現(xiàn)了。
addWaiter(Node)
此方法用于將當前線程加入到等待隊列的隊尾,并返回當前線程所在的結點。

    private Node addWaiter(Node mode) {
     //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Node結點是對每一個訪問同步代碼的線程的封裝,其包含了需要同步的線程本身以及線程的狀態(tài),如是否被阻塞,是否等待喚醒,是否已經被取消等。變量waitStatus則表示當前被封裝成Node結點的等待狀態(tài),共有4種取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

CANCELLED:值為1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態(tài),進入該狀態(tài)后的結點將不會再變化。

SIGNAL:值為-1,被標識為該等待喚醒狀態(tài)的后繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該后繼結點的線程執(zhí)行。說白了,就是處于喚醒狀態(tài),只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態(tài)的后繼結點的線程執(zhí)行。

CONDITION:值為-2,與Condition相關,該標識的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態(tài)的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。

PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態(tài)標識結點的線程處于可運行狀態(tài)。

0狀態(tài):值為0,代表初始化狀態(tài)。

AQS在判斷狀態(tài)時,通過用waitStatus>0表示取消狀態(tài),而waitStatus<0表示有效狀態(tài)。

== enq(Node)==
此方法用于將node加入隊尾。

    private Node enq(final Node node) {
        for (;;) {//自旋
            Node t = tail;
            if (t == null) { // Must initialize
            // 隊列為空,創(chuàng)建一個空的標志結點作為head結點,并將tail也指向它
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//正常放入隊列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

cas自旋volatile變量
acquireQueued(Node, int)
通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。聰明的你立刻應該能想到該線程下一步該干什么了吧:進入等待狀態(tài)休息,直到其他線程徹底釋放資源后喚醒自己,自己再拿到資源,然后就可以去干自己想干的事了。沒錯,就是這樣!是不是跟醫(yī)院排隊拿號有點相似~~acquireQueued()就是干這件事:在等待隊列中排隊拿號(中間沒其它事干可以休息),直到拿到號后再返回。這個函數非常關鍵,還是上源碼吧:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//標記是否已經拿到鎖
        try {
            boolean interrupted = false;//標記等待過程中是否被中斷過
            for (;;) {
                final Node p = node.predecessor();//拿到前驅節(jié)點
                //如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    //拿到資源后,將head指向該結點。所以head所指的標桿結點,就是當前獲取到資源的那個結點或null。
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果自己可以休息了,就進入waiting狀態(tài),直到被unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標記為true
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于檢查狀態(tài),看看自己是否真的可以去休息了。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//拿到前驅的狀態(tài)
        if (ws == Node.SIGNAL)
            //如果已經告訴前驅拿完號后通知自己一下,那就可以安心休息了
            return true;
        if (ws > 0) {
           /*
           * 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態(tài),并排在它的后邊。
           * 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鏈,稍后就會被保安大叔趕走了(GC回收)!
          */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果前驅正常,那就把前驅的狀態(tài)設置成SIGNAL,告訴它拿完號后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢!
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

整個流程中,如果前驅結點的狀態(tài)不是SIGNAL,那么自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。
parkAndCheckInterrupt()
如果線程找好安全休息點后,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀態(tài)。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
``

Thread.interrupted()會清除當前線程的中斷標記位。 
==整個獲取鎖的流程:==
1.如果嘗試獲取鎖成功,直接返回。
2.沒成功,先加入到等待隊列尾部,標記為獨占模式。
3.嘗試這獲取一次鎖后,如果還是獲取不到就去休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
4.如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
**這也就是ReentrantLock.lock()的流程**
![](https://img-blog.csdnimg.cn/20200909091109420.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NTU5NjAyMg==,size_16,color_FFFFFF,t_70#pic_center)
==release(int)==
此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。

```java
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
                //喚醒等待隊列里的下一個線程
            return true;
        }
        return false;
    }

它是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自定義同步器在設計tryRelease()的時候要明確這一點?。?/strong>
tryRelease(int)
此方法嘗試去釋放指定量的資源。

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

還是需要AQS的實現(xiàn)類自己去寫。
unparkSuccessor(Node)
此方法用于喚醒等待隊列中下一個線程。

    private void unparkSuccessor(Node node) {
        //這里,node一般為當前線程所在的結點。
        int ws = node.waitStatus;
        if (ws < 0)
            //置零當前線程所在的結點狀態(tài),允許失敗。
            compareAndSetWaitStatus(node, ws, 0);

        //找到下一個需要喚醒的結點s
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//如果為空或已取消
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)//從這里可以看出,<=0的結點,都是還有效的結點。
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒
    }

用unpark()喚醒等待隊列中最前邊的那個未放棄線程。
釋放鎖的流程
1.釋放指定鎖的資源并返回結果。
2.如果成功釋放,就喚醒等待隊列中最前邊的那個未放棄線程。
3.如果沒成功,返回false。

共享鎖

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

此方法用于將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源后才返回。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//成功
                        setHeadAndPropagate(node, r);//將head指向自己,還有剩余資源可以再喚醒之后的線程
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

跟獨占模式比,這里只有線程是head.next時(“老二”),才會去嘗試獲取資源,有剩余的話還會喚醒之后的隊友。那么問題就來了,假如老大用完后釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,還是不讓?答案是否定的!老二會繼續(xù)park()等待其他線程釋放資源,也更不會去喚醒老三和老四了。獨占模式,同一時刻只有一個線程去執(zhí)行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執(zhí)行的,現(xiàn)在因為老二的資源需求量大,而把后面量小的老三和老四也都卡住了。當然,這并不是問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但降低了并發(fā))。
== setHeadAndPropagate(Node, int)==

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

== doReleaseShared()==

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

自定義鎖

不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現(xiàn)好了。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:

    isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。
    tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
    tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
    tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失??;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
    tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結點返回true,否則返回false。

自定義一個簡單的鎖

/**
 * @author yhd
 * @createtime 2020/9/8 9:44
 */
public class MLock implements Lock {
    private AbstractQueuedSynchronizer sync=new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }

    //自定義一個獨占鎖
    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            assert arg == 1;
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

}

Demo測試:

/**
 * @author yhd
 * @createtime 2020/9/8 9:36
 */
public class Demo6 {
    public static int m = 0;
    public static Lock lock = new MLock();

    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[100];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {

                try {
                    lock.lock();
                    for (int j = 0; j < 100; j++) {
                        m++;
                    }
                } finally {
                    lock.unlock();
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();//線程順序結束
        System.out.println(m);
    }

}

最后

歡迎大家關注我的公眾號:前程有光,金九銀十跳槽面試季,為大家整理了1000多道BATZ面試題!將近500多頁pdf文檔的Java面試題資料放在里面,助你圓夢BATZ!文章都會在里面更新,整理的資料也會放在里面。謝謝你的觀看!

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容