Java中的隊(duì)列同步器AQS

一、AQS概念

1、隊(duì)列同步器是用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,使用一個(gè)int型變量代表同步狀態(tài),通過(guò)內(nèi)置的隊(duì)列來(lái)完成線程的排隊(duì)工作。

2、下面是JDK8文檔中對(duì)于AQS的部分介紹

  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable

  提供一個(gè)框架,用于實(shí)現(xiàn)依賴(lài)先進(jìn)先出(FIFO)等待隊(duì)列的阻塞鎖和相關(guān)同步器(信號(hào)量,事件等)。 該類(lèi)被設(shè)計(jì)為大多數(shù)類(lèi)型的同步器的有用依據(jù),這些同步器依賴(lài)于單個(gè)原子int值來(lái)表示狀

態(tài)。子類(lèi)必須定義改變此狀態(tài)的protected方法,以及根據(jù)該對(duì)象被獲取或釋放來(lái)定義該狀態(tài)的含義。給定這些,這個(gè)類(lèi)中的其他方法執(zhí)行所有排隊(duì)和阻塞機(jī)制。 子類(lèi)可以保持其他狀態(tài)字段,但只以

原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對(duì)于同步。

  此類(lèi)支持默認(rèn)獨(dú)占模式和共享模式。 當(dāng)以獨(dú)占模式獲取時(shí),嘗試通過(guò)其他線程獲取不能成功。 多線程獲取的共享模式可能(但不需要)成功。 除了在機(jī)械意義上,這個(gè)類(lèi)不理解這些差異,當(dāng)共享

模式獲取成功時(shí),下一個(gè)等待線程(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的線程共享相同的FIFO隊(duì)列。 通常,實(shí)現(xiàn)子類(lèi)只支持這些模式之一,但是兩者都可以在

ReadWriteLock中發(fā)揮作用。僅支持獨(dú)占或僅共享模式的子類(lèi)不需要定義支持未使用模式的方法。

總結(jié)來(lái)說(shuō)就是:

①子類(lèi)通過(guò)繼承AQS并實(shí)現(xiàn)其抽象方法來(lái)管理同步狀態(tài),對(duì)于同步狀態(tài)的更改通過(guò)提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來(lái)進(jìn)行操作,因?yàn)槭褂肅AS操作保證同步狀態(tài)的改變是原子的。

②子類(lèi)被推薦定義為自定義同步組件的靜態(tài)內(nèi)部類(lèi),同步器本身并沒(méi)有實(shí)現(xiàn)任何的同步接口,僅僅是定義了若干狀態(tài)獲取和釋放的方法來(lái)提供自定義同步組件的使用。

③同步器既可以支持獨(dú)占式的獲取同步狀態(tài),也可以支持共享式的獲取同步狀態(tài)(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類(lèi)型的同步組件)

3、同步器是實(shí)現(xiàn)鎖的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語(yǔ)義;

回到頂部

二、AQS的接口和實(shí)例

1、同步器的設(shè)計(jì)實(shí)現(xiàn)原理

繼承同步器并且重寫(xiě)指定的方法,然后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并且調(diào)用同步器提供的模板方法(這些模板方法會(huì)調(diào)用重寫(xiě)的方法);而重寫(xiě)指定的方法的時(shí)候,需要使用getState()、setState(int state)、compareAndSetState(int expect, int update)來(lái)訪問(wèn)或者更新同步狀態(tài)。下面是源碼中state變量和三個(gè)方法的定義聲明實(shí)現(xiàn)

1   /**

2? ? ? * .(同步狀態(tài))

3? ? ? */

4? ? private volatile int state;

5

6? ? /**

7? ? ? * (返回當(dāng)前的同步狀態(tài))

8? ? ? * 此操作的內(nèi)存語(yǔ)義為@code volatile read

9? ? ? */

10? ? protected final int getState() {

11? ? ? ? return state;

12? ? }

13

14? ? /**

15? ? ? * (設(shè)置新的同步狀態(tài))

16? ? ? * 此操作的內(nèi)存語(yǔ)義為@code volatile read

17? ? ? */

18? ? protected final void setState(int newState) {

19? ? ? ? state = newState;

20? ? }

21

22? ? /**

23? ? ? * (如果要更新的狀態(tài)和期望的狀態(tài)相同,那就通過(guò)原子的方式更新?tīng)顟B(tài))

24? ? ? * ( 此操作的內(nèi)存語(yǔ)義為@code volatile read 和 write)

25? ? ? * (如果更新的狀態(tài)和期望的狀態(tài)不同就返回false)

26? ? ? */

27? ? protected final boolean compareAndSetState(int expect, int update) {

28? ? ? ? return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

29? ? }



2、下面介紹AQS提供可被重寫(xiě)的方法

1 /**

2? * 獨(dú)占式的獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢(xún)當(dāng)前狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期,然后再進(jìn)行CAS設(shè)置同步狀態(tài)

3? *

4? */

5 protected boolean tryAcquire(int arg) {

6? ? throw new UnsupportedOperationException();

7 }

8

9 /**

10? * 獨(dú)占式的釋放同步狀態(tài),等待獲取同步狀態(tài)的線程可以有機(jī)會(huì)獲取同步狀態(tài)

11? *

12? */

13 protected boolean tryRelease(int arg) {

14? ? throw new UnsupportedOperationException();

15 }

16

17 /**

18? * 嘗試以共享模式獲取。 該方法應(yīng)該查詢(xún)對(duì)象的狀態(tài)是否允許在共享模式下獲取該對(duì)象,如果是這樣,就可以獲取它。 該方法總是由執(zhí)行獲取的線程調(diào)用。

19? * 如果此方法報(bào)告失敗,則獲取方法可能將線程排隊(duì)(如果尚未排隊(duì)),直到被其他線程釋放為止。 獲取失敗時(shí)返回負(fù)值,如果在獲取成共享模式下功但沒(méi)

20? * 有后續(xù)共享模式獲取可以成功,則為零; 并且如果以共享模式獲取成功并且隨后的共享模式獲取可能成功,則為正值,在這種情況下,后續(xù)等待線程必須檢查可用性。

21? */

22 protected int tryAcquireShared(int arg) {

23? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常

24 }

25

26 /**

27? * 嘗試將狀態(tài)設(shè)置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調(diào)用。

28? */

29 protected int tryReleaseShared(int arg) {

30? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常

31 }

32

33 /**

34? * 當(dāng)前同步器是否在獨(dú)占模式下被線程占用,一般該方法表示是否被當(dāng)前線程所獨(dú)占

35? */

36 protected int isHeldExclusively(int arg) {

37? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常

38 }

3、同步器提供的模板方法

在實(shí)現(xiàn)自定義同步組件的時(shí)候,需要重寫(xiě)上面的方法,而下面的模板方法會(huì)調(diào)用上面重寫(xiě)的方法。下面介紹同步器提供的模板方法

1 /**

2? * 以獨(dú)占模式獲取,忽略中斷。 通過(guò)調(diào)用至少一次tryAcquire(int)實(shí)現(xiàn),成功返回。 否則線

3? * 程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,直到成功才調(diào)用tryAcquire(int)

4? */

5 public final void acquire(int arg) {...}

6

7 /**

8? * 以獨(dú)占方式獲得,如果中斷,中止。 通過(guò)首先檢查中斷狀態(tài),然后調(diào)用至少一次

9? * tryAcquire(int) ,成功返回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,調(diào)用

10? * tryAcquire(int)直到成功或線程中斷。

11? */

12 public final void acquireInterruptibly(int arg) throws InterruptedException {...}

13

14 /**

15? * 嘗試以獨(dú)占模式獲取,如果中斷則中止,如果給定的超時(shí)時(shí)間失敗。 首先檢查中斷狀態(tài),然

16? * 后調(diào)用至少一次tryAcquire(int) ,成功返回。 否則,線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻

17? * 塞,調(diào)用tryAcquire(int)直到成功或線程中斷或超時(shí)

18? */

19 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...}

20

21 /**

22? * 以共享模式獲取,忽略中斷。 通過(guò)首次調(diào)用至少一次執(zhí)行 tryAcquireShared(int),成功返

23? * 回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,直到成功調(diào)用tryAcquireShared(int) 。

24? */

25 public final void acquireShared(int arg){...}

26

27 /**

28? * 以共享方式獲取,如果中斷,中止。 首先檢查中斷狀態(tài),然后調(diào)用至少一次

29? * tryAcquireShared(int) ,成功返回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,調(diào)用

30? * tryAcquireShared(int)直到成功或線程中斷。

31? */

32 public final void acquireSharedInterruptibly(int arg) throws InterruptedException{...}

33

34 /**

35? * 嘗試以共享模式獲取,如果中斷則中止,如果給定的時(shí)間超過(guò),則失敗。 通過(guò)首先檢查中斷

36? * 狀態(tài),然后調(diào)用至少一次tryAcquireShared(int) ,成功返回。 否則,線程排隊(duì),可能會(huì)重

37? * 復(fù)阻塞和解除阻塞,調(diào)用tryAcquireShared(int)直到成功或線程中斷或超時(shí)。

38? */

39 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{...}

40

41 /**

42? * 獨(dú)占式的釋放同步狀態(tài),該方法會(huì)在釋放同步狀態(tài)之后,將同步隊(duì)列中的第一個(gè)節(jié)點(diǎn)包含的線程喚醒

43? */

44 public final boolean release(int arg){...}

45

46 /**

47? * 共享式的釋放同步狀態(tài)

48? */

49 public final boolean releaseShared(int arg){...}

50

51 /**

52? * 獲取在等待隊(duì)列上的線程集合

53? */

54 public final Collection<Thread> getQueuedThreads(){...}

回到頂部

三、隊(duì)列同步器的實(shí)現(xiàn)分析

?1、同步隊(duì)列

a)t同步隊(duì)列的實(shí)現(xiàn)原理

AQS內(nèi)部維護(hù)一個(gè)同步隊(duì)列來(lái)完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗的時(shí)候,AQS會(huì)將當(dāng)前線程以及等待狀態(tài)信息構(gòu)造成一個(gè)結(jié)點(diǎn)Node并將其加入同步隊(duì)列中,同時(shí)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)由持有線程釋放的時(shí)候,會(huì)將同步隊(duì)列中的首節(jié)點(diǎn)喚醒使其再次嘗試獲取同步狀態(tài)。同步隊(duì)列中的結(jié)點(diǎn)用來(lái)保存獲取同步狀態(tài)失敗的線程的線程引用、等待狀態(tài)以及前驅(qū)結(jié)點(diǎn)和后繼結(jié)點(diǎn)。下面是Node的屬性分析

1? ? static final class Node {

2? ? ? ? /** 共享模式下構(gòu)造結(jié)點(diǎn) */

3? ? ? ? static final Node SHARED = new Node();

4? ? ? ? /** 獨(dú)占模式下構(gòu)造結(jié)點(diǎn) */

5? ? ? ? static final Node EXCLUSIVE = null;

6

7? ? ? ? /** 用于指示線程已經(jīng)取消的waitStatus值(由于在同步隊(duì)列中等待的線程等待超時(shí)或者發(fā)生中斷,需要從同步隊(duì)列中取消等待,結(jié)點(diǎn)進(jìn)入該狀態(tài)將不會(huì)發(fā)生變化)*/

8? ? ? ? static final int CANCELLED =? 1;

9? ? ? ? /** waitstatus值指示后續(xù)線程需要取消等待(后繼結(jié)點(diǎn)的線程處于等待狀態(tài),而當(dāng)前結(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者CANCELL,將會(huì)通知后繼結(jié)點(diǎn)的線程以運(yùn)行) */

10? ? ? ? static final int SIGNAL? ? = -1;

11? ? ? ? /**waitStatus值表示線程正在等待條件(原本結(jié)點(diǎn)在等待隊(duì)列中,結(jié)點(diǎn)線程等待在Condition上,當(dāng)其他線程對(duì)Condition調(diào)用了signal()方法之后)該結(jié)點(diǎn)會(huì)從

         等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,進(jìn)行同步狀態(tài)的獲取 */

12? ? ? ? static final int CONDITION = -2;

13? ? ? ? /**

14? ? ? ? ? * waitStatus值表示下一個(gè)共享式同步狀態(tài)的獲取應(yīng)該無(wú)條件傳播下去

15? ? ? ? ? */

16? ? ? ? static final int PROPAGATE = -3;

17

18? ? ? ? /**

19? ? ? ? ? * 不同的等到狀態(tài)的int值

20? ? ? ? ? */

21? ? ? ? volatile int waitStatus;

22

23? ? ? ? /**

24? ? ? ? ? * 前驅(qū)結(jié)點(diǎn),當(dāng)結(jié)點(diǎn)加入同步隊(duì)列將會(huì)被設(shè)置前驅(qū)結(jié)點(diǎn)信息

25? ? ? ? ? */

26? ? ? ? volatile Node prev;

27

28? ? ? ? /**

29? ? ? ? ? * 后繼結(jié)點(diǎn)

30? ? ? ? ? */

31? ? ? ? volatile Node next;

32

33? ? ? ? /**

34? ? ? ? ? * 當(dāng)前獲取到同步狀態(tài)的線程

35? ? ? ? ? */

36? ? ? ? volatile Thread thread;

37

38? ? ? ? /**

39? ? ? ? ? * 等待隊(duì)列中的后繼結(jié)點(diǎn),如果當(dāng)前結(jié)點(diǎn)是共享的,那么這個(gè)字段是一個(gè)SHARED常量;也就是說(shuō)結(jié)點(diǎn)類(lèi)型(獨(dú)占和共享)和等待隊(duì)列中的后繼結(jié)點(diǎn)公用一個(gè)字段

40? ? ? ? ? */

41? ? ? ? Node nextWaiter;

42

43? ? ? ? /**

44? ? ? ? ? * 如果是共享模式下等待,那么返回true(因?yàn)樯厦娴腘ode nextWaiter字段在共享模式下是一個(gè)SHARED常量)

45? ? ? ? ? */

46? ? ? ? final boolean isShared() {

47? ? ? ? ? ? return nextWaiter == SHARED;

48? ? ? ? }

49

50? ? ? ? final Node predecessor() throws NullPointerException {

51? ? ? ? ? ? Node p = prev;

52? ? ? ? ? ? if (p == null)

53? ? ? ? ? ? ? ? throw new NullPointerException();

54? ? ? ? ? ? else

55? ? ? ? ? ? ? ? return p;

56? ? ? ? }

57

58? ? ? ? Node() {? ? // 用于建立初始頭結(jié)點(diǎn)或SHARED標(biāo)記

59? ? ? ? }

60

61? ? ? ? Node(Thread thread, Node mode) {? ? // 用于添加到等待隊(duì)列

62? ? ? ? ? ? this.nextWaiter = mode;

63? ? ? ? ? ? this.thread = thread;

64? ? ? ? }

65

66? ? ? ? Node(Thread thread, int waitStatus) { // Used by Condition

67? ? ? ? ? ? this.waitStatus = waitStatus;

68? ? ? ? ? ? this.thread = thread;

69? ? ? ? }

70? ? }

b)同步隊(duì)列示意圖和簡(jiǎn)單分析

①同步隊(duì)列示意圖:當(dāng)一個(gè)線程獲取了同步狀態(tài)后,其他線程不能獲取到該同步狀態(tài),就會(huì)被構(gòu)造稱(chēng)為Node然后添加到同步隊(duì)列之中,這個(gè)添加的過(guò)程基于CAS保證線程安全性。

②同步隊(duì)列遵循先進(jìn)先出(FIFO),首節(jié)點(diǎn)是獲取到同步狀態(tài)的結(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)的時(shí)候?qū)?huì)喚醒后繼結(jié)點(diǎn)(然后后繼結(jié)點(diǎn)就會(huì)變成新的首節(jié)點(diǎn)等待獲取同步狀態(tài))

2、獨(dú)占式同步狀態(tài)的獲取和釋放

①前面說(shuō)過(guò),同步器的acquire()方法會(huì)獲取同步狀態(tài),這個(gè)方法對(duì)不會(huì)響應(yīng)中斷,也就是說(shuō)當(dāng)線程獲取通同步狀態(tài)失敗后會(huì)被構(gòu)造成結(jié)點(diǎn)加入到同步隊(duì)列中,當(dāng)線程被中斷時(shí)不會(huì)從同步隊(duì)列中移除。

1 /**

2? * ①首先調(diào)用tryAcquire方法嘗試獲取同步狀態(tài),如果獲取同步狀態(tài)失敗,就進(jìn)行下面的操作

3? * ②獲取失敗:按照獨(dú)占式的模式構(gòu)造同步結(jié)點(diǎn)并通過(guò)addWaiter方法將結(jié)點(diǎn)添加到同步隊(duì)列的尾部

4? * ③通過(guò)acquireQueue方法自旋獲取同步狀態(tài)。

5? * ④如果獲取不到同步狀態(tài),就阻塞結(jié)點(diǎn)中的線程,而結(jié)點(diǎn)中的線程喚醒主要是通過(guò)前驅(qū)結(jié)點(diǎn)的出隊(duì)或者被中斷來(lái)實(shí)現(xiàn)

6? */

7 public final void acquire(int arg) {

8? ? if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

9? ? ? ? selfInterrupt();

10 }

?②下面是addWaiter、enq和自旋獲取同步狀態(tài)acquireQueue方法的實(shí)現(xiàn)(該方法的主要作用就是將獲取同步狀態(tài)失敗的線程構(gòu)造成結(jié)點(diǎn)然后添加到同步隊(duì)列的隊(duì)尾)

1 private Node addWaiter(Node mode) {

2? ? Node node = new Node(Thread.currentThread(), mode);

3? ? //嘗試直接放在隊(duì)尾

4? ? Node pred = tail; //直接獲取同步器的tail結(jié)點(diǎn)

5? ? if (pred != null) {

6? ? ? ? node.prev = pred;

7? ? ? ? if (compareAndSetTail(pred, node)) {

8? ? ? ? ? ? //隊(duì)尾結(jié)點(diǎn)不為空通過(guò)原子操作將構(gòu)造的結(jié)點(diǎn)置為隊(duì)尾結(jié)點(diǎn)

9? ? ? ? ? ? pred.next = node;

10? ? ? ? ? ? return node;

11? ? ? ? }

12? ? }

13? ? //采用自旋方式保證構(gòu)造的結(jié)點(diǎn)添加到同步隊(duì)列中

14? ? enq(node);

15? ? return node;

16 }

17 private Node enq(final Node node) {

18? ? for (;;) { //死循環(huán)知道添加成功

19? ? ? ? Node t = tail;

20? ? ? ? if (t == null) { // Must initialize

21? ? ? ? ? ? if (compareAndSetHead(new Node()))

22? ? ? ? ? ? ? ? tail = head;

23? ? ? ? } else {

24? ? ? ? ? ? node.prev = t;

25? ? ? ? ? ? //通過(guò)CAS方式將結(jié)點(diǎn)添加到同步隊(duì)列之后才會(huì)返回,否則就會(huì)不斷嘗試添加(這樣實(shí)際上就是在并發(fā)情況下,把向同步隊(duì)列添加Node變得串行化了)

26? ? ? ? ? ? if (compareAndSetTail(t, node)) {

27? ? ? ? ? ? ? ? t.next = node;

28? ? ? ? ? ? ? ? return t;

29? ? ? ? ? ? }

30? ? ? ? }

31? ? }

32 }? ?

33 /**

34? *? ? 通過(guò)tryAcquire()和addWaiter(),表示該線程獲取同步狀態(tài)已經(jīng)失敗,被放入同步? ? ? ?

35? * 隊(duì)列尾部了。線程阻塞等待直到其他線程(前驅(qū)結(jié)點(diǎn)獲得同步裝填或者被中斷)釋放同步狀

36? * 態(tài)后喚醒自己,自己才能獲得。

37? */

38 final boolean acquireQueued(final Node node, int arg) {

39? ? boolean failed = true;

40? ? try {

41? ? ? ? boolean interrupted = false;

42? ? ? ? //線程在死循環(huán)的方式中嘗試獲取同步狀態(tài)

43? ? ? ? for (;;) {

44? ? ? ? ? ? final Node p = node.predecessor(); //獲取前驅(qū)結(jié)點(diǎn)

45? ? ? ? ? ? //只有前驅(qū)接待是頭結(jié)點(diǎn)的時(shí)候才能?chē)L試獲取同步狀態(tài)

46? ? ? ? ? ? if (p == head && tryAcquire(arg)) {

47? ? ? ? ? ? ? ? setHead(node); //獲取到同步狀態(tài)之后,就將自己設(shè)置為頭結(jié)點(diǎn)

48? ? ? ? ? ? ? ? p.next = null; //前驅(qū)結(jié)點(diǎn)已經(jīng)獲得同步狀態(tài)去執(zhí)行自己的程序了,所以需要釋放掉占用的同步隊(duì)列的資源,由JVM回收

49? ? ? ? ? ? ? ? failed = false;

50? ? ? ? ? ? ? ? return interrupted;

51? ? ? ? ? ? }

52? ? ? ? ? ? //如果獲取同步狀態(tài)失敗,應(yīng)該自旋等待繼續(xù)獲取并且校驗(yàn)自己的中斷標(biāo)志位信息

53? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&

54? ? ? ? ? ? ? ? parkAndCheckInterrupt())

55? ? ? ? ? ? ? ? interrupted = true; //如果被中斷,就改變自己的中斷標(biāo)志位狀態(tài)信息

56? ? ? ? }

57? ? } finally {

58? ? ? ? if (failed)

59? ? ? ? ? ? cancelAcquire(node);

60? ? }

61 }

③獨(dú)占式獲取同步狀態(tài)的整個(gè)流程

④獨(dú)占式同步器的釋放:release方法執(zhí)行時(shí),會(huì)喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)線程

public final boolean release(int arg) {

? ? if (tryRelease(arg)) {

? ? ? ? Node h = head;//頭結(jié)點(diǎn)

? ? ? ? //喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)線程

? ? ? ? if (h != null && h.waitStatus != 0)

? ? ? ? ? ? unparkSuccessor(h);

? ? ? ? return true;

? ? }

? ? return false;

}

3、共享式同步狀態(tài)的獲取和釋放?

①共享式獲取和獨(dú)占式獲取最主要的區(qū)別是能否有多個(gè)線程同時(shí)獲取到同步狀態(tài)。如圖所示簡(jiǎn)易描述二者的區(qū)別(共享式訪問(wèn)的時(shí)候,可以允許多個(gè)線程訪問(wèn)資源,但是存在獨(dú)占式訪問(wèn)的時(shí)候,同一時(shí)刻其他的不管是共享還是獨(dú)占都會(huì)被阻塞)

②關(guān)于共享式獲取同步狀態(tài)的方法

1 /**

2? * 此方法是共享模式下線程獲取共享同步狀態(tài)的頂層入口。它會(huì)嘗試去獲取同步狀態(tài),獲取成功則直接返回,

3? * 獲取失敗則進(jìn)入等待隊(duì)列一直嘗試獲取(執(zhí)行doAcquireShared方法體中的內(nèi)容),直到獲取到資源為止(條件就是tryAcquireShared方法返回值大于等于0),整個(gè)過(guò)程忽略中斷

4? */

5 public final void acquireShared(int arg) {

6? ? if (tryAcquireShared(arg) < 0)

7? ? ? ? doAcquireShared(arg);

8 }

9 /**

10? * "自旋"嘗試獲取同步狀態(tài)

11? */

12 private void doAcquireShared(int arg) {

13? ? //首先將該線程包括線程引用、等待狀態(tài)、前驅(qū)結(jié)點(diǎn)和后繼結(jié)點(diǎn)的信息封裝臺(tái)Node中,然后添加到等待隊(duì)列里面(一共享模式添加)

14? ? final Node node = addWaiter(Node.SHARED);

15? ? boolean failed = true;

16? ? try {

17? ? ? ? boolean interrupted = false; //當(dāng)前線程的中斷標(biāo)志

18? ? ? ? for (;;) {

19? ? ? ? ? ? final Node p = node.predecessor(); //獲取前驅(qū)結(jié)點(diǎn)

20? ? ? ? ? ? if (p == head) {

21? ? ? ? ? ? ? ? //當(dāng)前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)的時(shí)候就會(huì)以共享的方式去嘗試獲取同步狀態(tài)

22? ? ? ? ? ? ? ? int r = tryAcquireShared(arg);

23? ? ? ? ? ? ? ? //判斷tryAcquireShared的返回值

24? ? ? ? ? ? ? ? if (r >= 0) {

25? ? ? ? ? ? ? ? ? ? //如果返回值大于等于0,表示獲取同步狀態(tài)成功,就修改當(dāng)前的頭結(jié)點(diǎn)并將信息傳播都后續(xù)的結(jié)點(diǎn)隊(duì)列中

26? ? ? ? ? ? ? ? ? ? setHeadAndPropagate(node, r);

27? ? ? ? ? ? ? ? ? ? p.next = null; // 釋放掉已經(jīng)獲取到同步狀態(tài)的前驅(qū)結(jié)點(diǎn)的資源

28? ? ? ? ? ? ? ? ? ? if (interrupted)

29? ? ? ? ? ? ? ? ? ? ? ? selfInterrupt(); //檢查中斷標(biāo)志

30? ? ? ? ? ? ? ? ? ? failed = false;

31? ? ? ? ? ? ? ? ? ? return;

32? ? ? ? ? ? ? ? }

33? ? ? ? ? ? }

34? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&

35? ? ? ? ? ? ? ? parkAndCheckInterrupt())

36? ? ? ? ? ? ? ? interrupted = true;

37? ? ? ? }

38? ? } finally {

39? ? ? ? if (failed)

40? ? ? ? ? ? cancelAcquire(node);

41? ? }

42 }

根據(jù)源代碼我們可以了解共享式獲取同步狀態(tài)的整個(gè)過(guò)程

首先同步器會(huì)調(diào)用tryAcquireShared方法來(lái)嘗試獲取同步狀態(tài),然后根據(jù)這個(gè)返回值來(lái)判斷是否獲取到同步狀態(tài)(當(dāng)返回值大于等于0可視為獲取到同步狀態(tài));如果第一次獲取失敗的話(huà),就進(jìn)入'自旋'狀態(tài)(執(zhí)行doAcquireShared方法)一直嘗試去獲取同步狀態(tài);在自旋獲取中,如果檢查到當(dāng)前前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)的話(huà),就會(huì)嘗試獲取同步狀態(tài),而一旦獲取成功(tryAcquireShared方法返回值大于等于0)就可以從自旋狀態(tài)退出。

另外,還有一點(diǎn)就是上面說(shuō)到的一個(gè)處于等待隊(duì)列的線程要想開(kāi)始嘗試去獲取同步狀態(tài),需要滿(mǎn)足的條件就是前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn),那么它本身就是整個(gè)隊(duì)列中的第二個(gè)結(jié)點(diǎn)。當(dāng)頭結(jié)點(diǎn)釋放掉所有的臨界資源之后,我們考慮每個(gè)線程運(yùn)行所需資源的不同數(shù)量問(wèn)題,如下圖所示

③共享式同步狀態(tài)的釋放

對(duì)于支持共享式的同步組件(即多個(gè)線程同同時(shí)訪問(wèn)),它們和獨(dú)占式的主要區(qū)別就是tryReleaseShared方法必須確保同步狀態(tài)的釋放是線程安全的(CAS的模式來(lái)釋放同步狀態(tài),因?yàn)榧热皇嵌鄠€(gè)線程能夠訪問(wèn),那么釋放的時(shí)候也會(huì)是多個(gè)線程的,就需要保證釋放時(shí)候的線程安全)

1 /**

2? * 該方法是共享模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。

3? */

4 public final boolean releaseShared(int arg) {

5? ? if (tryReleaseShared(arg)) {

6? ? ? ? doReleaseShared(); //

7? ? ? ? return true;

8? ? }

9? ? return false;

10 }

回到頂部

四、自定義同步組件的實(shí)現(xiàn)

1、共享式鎖的實(shí)現(xiàn)

①、自定義一個(gè)同步組件,可以允許兩個(gè)線程訪問(wèn)(共享式同步組件),超過(guò)兩個(gè)線程就會(huì)被阻塞。

②、既然是共享式同步組件,按照前面所說(shuō)的,組件本身需要使用AQS提供的共享式模板方法acquireShared等;組件的內(nèi)部類(lèi)需要實(shí)現(xiàn)AQS,并且重寫(xiě)關(guān)于共享式獲取同步狀態(tài)的方法(tryAcquireShared()、tryReleaseShared()等共享模式下的方法)。

③、既然是兩個(gè)線程能夠同時(shí)訪問(wèn)的話(huà),那么狀態(tài)數(shù)的取值范圍就是0、1、2了,每當(dāng)一個(gè)線程獲取到同步狀態(tài)的時(shí)候state值減1,反之就會(huì)增加1;當(dāng)state值為0的時(shí)候就會(huì)阻塞其他想要獲取同步狀態(tài)的線程。對(duì)于同步狀態(tài)的更改需要使用CAS來(lái)進(jìn)行保證原子性。

package cn.source.concurrent;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class TestAQS implements Lock{

??private Sync sync = new Sync(2);


??private static class Sync extends AbstractQueuedSynchronizer {


????Sync(int num) {

??????if(num <= 0) {

????????throw new RuntimeException("num需要大于0");

??????}

??????setState(num);

????}

????@Override

????protected int tryAcquireShared(int arg) {

??????for(; ;) {

????????int currentState = getState();

????????int newState = currentState - arg;

????????if(newState < 0 || compareAndSetState(currentState, newState)) {

??????????return newState;

????????}

??????}

????}

????@Override

????protected boolean tryReleaseShared(int arg) {

??????for(; ;) {

????????int currentState = getState();

????????int newState = currentState + arg;

????????if(compareAndSetState(currentState, newState)) {

??????????return true;

????????}

??????}

????}



??}

??@Override

??public void lock() {

????sync.acquireShared(1);

??}

??@Override

??public void unlock() {

????sync.releaseShared(1);

??}

??//......

}

共享式鎖

/**

?* 測(cè)試結(jié)果:輸出的線程名稱(chēng)是成對(duì)的,保證同一時(shí)刻只有兩個(gè)線程能夠獲取到鎖

?*

?*/?????

public class TestLockShare {

??@Test

??public void test() {

????Lock lock = new TestAQS();

????class Worker extends Thread {

??????@Override

??????public void run() {

????????while(true) {

??????????lock.lock();

??????????try {

????????????Thread.sleep(1000);

????????????System.out.println(Thread.currentThread().getName());

????????????Thread.sleep(1000);

??????????} catch (Exception e) {

????????????e.printStackTrace();

??????????} finally {

????????????lock.unlock();

??????????}

????????}

??????}


????}


????for (int i = 0; i < 8; i++) {

??????Worker worker = new Worker();

??????worker.setDaemon(true);

??????worker.start();


????}

????for (int i = 0; i < 8; i++) {

??????try {

????????Thread.sleep(1000);

??????} catch (InterruptedException e) {

????????// TODO Auto-generated catch block

????????e.printStackTrace();

??????}

??????System.out.println();

????}

??}

}

共享式鎖測(cè)試

2、獨(dú)占式鎖的實(shí)現(xiàn)

package cn.source.concurrent;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class Mutex implements Lock{

??private Sync sync = new Sync();


??private static class Sync extends AbstractQueuedSynchronizer {

????/**

?????* 嘗試獲取資源,立即返回。成功則返回true,否則false。

?????*/

????@Override

????protected boolean tryAcquire(int arg) {

??????if(compareAndSetState(0, 1)) {//state為0才設(shè)置為1,不可重入!

????????setExclusiveOwnerThread(Thread.currentThread());//設(shè)置為當(dāng)前線程獨(dú)占資源

????????return true;

??????}

??????return false;

????}

????/**

?????* 嘗試釋放資源,立即返回。成功則為true,否則false。

?????*/

????@Override

????protected boolean tryRelease(int arg) {

??????if(getState() == 0) { //既然來(lái)釋放,那肯定就是已占有狀態(tài)了。只是為了保險(xiǎn),多層判斷!

????????throw new IllegalMonitorStateException();

??????}

??????setExclusiveOwnerThread(null);

??????setState(0);

??????return true;

????}

????@Override

????protected boolean isHeldExclusively() {

??????// 判斷是否鎖定狀態(tài)

??????return getState() == 1;

????}


??}


??@Override

??public void lock() {

????sync.acquire(1);

??}

??@Override

??public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

????return sync.tryAcquire(1);

??}

??@Override

??public void unlock() {

????sync.release(1);

??}

}

獨(dú)占式鎖

public class TestMutex {

??@Test

??public void test() {

????Lock lock = new Mutex();

????class Worker extends Thread {

??????@Override

??????public void run() {

????????while(true) {

??????????lock.lock();

??????????try {

????????????Thread.sleep(1000);

????????????System.out.println(Thread.currentThread().getName());

????????????Thread.sleep(1000);

??????????} catch (Exception e) {

????????????e.printStackTrace();

??????????} finally {

????????????lock.unlock();

??????????}

????????}

??????}


????}


????for (int i = 0; i < 8; i++) {

??????Worker worker = new Worker();

??????worker.setDaemon(true);

??????worker.start();


????}

????for (int i = 0; i < 8; i++) {

??????try {

????????Thread.sleep(1000);

??????} catch (InterruptedException e) {

????????e.printStackTrace();

??????}

??????System.out.println();

????}

??}

}

獨(dú)占式鎖測(cè)試

歡迎工作一到五年的Java工程師朋友們加入Java程序員開(kāi)發(fā): 721575865

群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來(lái)學(xué)習(xí)提升自己,不要再用"沒(méi)有時(shí)間“來(lái)掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來(lái)的自己一個(gè)交代!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 長(zhǎng)著長(zhǎng)著,花兒就忘記了最初的模樣,可她欣然接受每一個(gè)階段的自己,哪怕她發(fā)現(xiàn)自己在同類(lèi)中是如此不同,雖然從同樣的一片...
    文心訪藝閱讀 469評(píng)論 1 1
  • 作為前端,一直以來(lái)都知道HTTP劫持與XSS跨站腳本(Cross-site scripting)、CSRF跨站請(qǐng)求...
    IM魂影閱讀 633評(píng)論 0 1
  • 上河·下河·灌城村 遠(yuǎn)心 (組詩(shī)發(fā)表在《詩(shī)選刊》上半月刊第11、12期合刊) 1.下河 到河北曲陽(yáng)縣下河村了,還有...
    遠(yuǎn)心篤行閱讀 7,139評(píng)論 38 10

友情鏈接更多精彩內(nèi)容