一、AQS概念
1、隊(duì)列同步器是用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,使用一個(gè)int型變量代表同步狀態(tài),通過(guò)內(nèi)置的隊(duì)列來(lái)完成線程的排隊(duì)工作。
2、下面是JDK8文檔中對(duì)于AQS的部分介紹
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable
提供一個(gè)框架,用于實(shí)現(xiàn)依賴(lài)先進(jìn)先出(FIFO)等待隊(duì)列的阻塞鎖和相關(guān)同步器(信號(hào)量,事件等)。 該類(lèi)被設(shè)計(jì)為大多數(shù)類(lèi)型的同步器的有用依據(jù),這些同步器依賴(lài)于單個(gè)原子int值來(lái)表示狀
態(tài)。子類(lèi)必須定義改變此狀態(tài)的protected方法,以及根據(jù)該對(duì)象被獲取或釋放來(lái)定義該狀態(tài)的含義。給定這些,這個(gè)類(lèi)中的其他方法執(zhí)行所有排隊(duì)和阻塞機(jī)制。 子類(lèi)可以保持其他狀態(tài)字段,但只以
原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對(duì)于同步。
此類(lèi)支持默認(rèn)獨(dú)占模式和共享模式。 當(dāng)以獨(dú)占模式獲取時(shí),嘗試通過(guò)其他線程獲取不能成功。 多線程獲取的共享模式可能(但不需要)成功。 除了在機(jī)械意義上,這個(gè)類(lèi)不理解這些差異,當(dāng)共享
模式獲取成功時(shí),下一個(gè)等待線程(如果存在)也必須確定它是否也可以獲取。 在不同模式下等待的線程共享相同的FIFO隊(duì)列。 通常,實(shí)現(xiàn)子類(lèi)只支持這些模式之一,但是兩者都可以在
ReadWriteLock中發(fā)揮作用。僅支持獨(dú)占或僅共享模式的子類(lèi)不需要定義支持未使用模式的方法。
總結(jié)來(lái)說(shuō)就是:
①子類(lèi)通過(guò)繼承AQS并實(shí)現(xiàn)其抽象方法來(lái)管理同步狀態(tài),對(duì)于同步狀態(tài)的更改通過(guò)提供的getState()、setState(int state)、compareAndSetState(int expect, int update)來(lái)進(jìn)行操作,因?yàn)槭褂肅AS操作保證同步狀態(tài)的改變是原子的。
②子類(lèi)被推薦定義為自定義同步組件的靜態(tài)內(nèi)部類(lèi),同步器本身并沒(méi)有實(shí)現(xiàn)任何的同步接口,僅僅是定義了若干狀態(tài)獲取和釋放的方法來(lái)提供自定義同步組件的使用。
③同步器既可以支持獨(dú)占式的獲取同步狀態(tài),也可以支持共享式的獲取同步狀態(tài)(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等不同類(lèi)型的同步組件)
3、同步器是實(shí)現(xiàn)鎖的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語(yǔ)義;
回到頂部
二、AQS的接口和實(shí)例
1、同步器的設(shè)計(jì)實(shí)現(xiàn)原理
繼承同步器并且重寫(xiě)指定的方法,然后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并且調(diào)用同步器提供的模板方法(這些模板方法會(huì)調(diào)用重寫(xiě)的方法);而重寫(xiě)指定的方法的時(shí)候,需要使用getState()、setState(int state)、compareAndSetState(int expect, int update)來(lái)訪問(wèn)或者更新同步狀態(tài)。下面是源碼中state變量和三個(gè)方法的定義聲明實(shí)現(xiàn)
1 /**
2? ? ? * .(同步狀態(tài))
3? ? ? */
4? ? private volatile int state;
5
6? ? /**
7? ? ? * (返回當(dāng)前的同步狀態(tài))
8? ? ? * 此操作的內(nèi)存語(yǔ)義為@code volatile read
9? ? ? */
10? ? protected final int getState() {
11? ? ? ? return state;
12? ? }
13
14? ? /**
15? ? ? * (設(shè)置新的同步狀態(tài))
16? ? ? * 此操作的內(nèi)存語(yǔ)義為@code volatile read
17? ? ? */
18? ? protected final void setState(int newState) {
19? ? ? ? state = newState;
20? ? }
21
22? ? /**
23? ? ? * (如果要更新的狀態(tài)和期望的狀態(tài)相同,那就通過(guò)原子的方式更新?tīng)顟B(tài))
24? ? ? * ( 此操作的內(nèi)存語(yǔ)義為@code volatile read 和 write)
25? ? ? * (如果更新的狀態(tài)和期望的狀態(tài)不同就返回false)
26? ? ? */
27? ? protected final boolean compareAndSetState(int expect, int update) {
28? ? ? ? return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
29? ? }
2、下面介紹AQS提供可被重寫(xiě)的方法
1 /**
2? * 獨(dú)占式的獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢(xún)當(dāng)前狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期,然后再進(jìn)行CAS設(shè)置同步狀態(tài)
3? *
4? */
5 protected boolean tryAcquire(int arg) {
6? ? throw new UnsupportedOperationException();
7 }
8
9 /**
10? * 獨(dú)占式的釋放同步狀態(tài),等待獲取同步狀態(tài)的線程可以有機(jī)會(huì)獲取同步狀態(tài)
11? *
12? */
13 protected boolean tryRelease(int arg) {
14? ? throw new UnsupportedOperationException();
15 }
16
17 /**
18? * 嘗試以共享模式獲取。 該方法應(yīng)該查詢(xún)對(duì)象的狀態(tài)是否允許在共享模式下獲取該對(duì)象,如果是這樣,就可以獲取它。 該方法總是由執(zhí)行獲取的線程調(diào)用。
19? * 如果此方法報(bào)告失敗,則獲取方法可能將線程排隊(duì)(如果尚未排隊(duì)),直到被其他線程釋放為止。 獲取失敗時(shí)返回負(fù)值,如果在獲取成共享模式下功但沒(méi)
20? * 有后續(xù)共享模式獲取可以成功,則為零; 并且如果以共享模式獲取成功并且隨后的共享模式獲取可能成功,則為正值,在這種情況下,后續(xù)等待線程必須檢查可用性。
21? */
22 protected int tryAcquireShared(int arg) {
23? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常
24 }
25
26 /**
27? * 嘗試將狀態(tài)設(shè)置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調(diào)用。
28? */
29 protected int tryReleaseShared(int arg) {
30? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常
31 }
32
33 /**
34? * 當(dāng)前同步器是否在獨(dú)占模式下被線程占用,一般該方法表示是否被當(dāng)前線程所獨(dú)占
35? */
36 protected int isHeldExclusively(int arg) {
37? ? throw new UnsupportedOperationException(); //如果不支持共享模式 ,會(huì)拋出該異常
38 }
3、同步器提供的模板方法
在實(shí)現(xiàn)自定義同步組件的時(shí)候,需要重寫(xiě)上面的方法,而下面的模板方法會(huì)調(diào)用上面重寫(xiě)的方法。下面介紹同步器提供的模板方法
1 /**
2? * 以獨(dú)占模式獲取,忽略中斷。 通過(guò)調(diào)用至少一次tryAcquire(int)實(shí)現(xiàn),成功返回。 否則線
3? * 程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,直到成功才調(diào)用tryAcquire(int)
4? */
5 public final void acquire(int arg) {...}
6
7 /**
8? * 以獨(dú)占方式獲得,如果中斷,中止。 通過(guò)首先檢查中斷狀態(tài),然后調(diào)用至少一次
9? * tryAcquire(int) ,成功返回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,調(diào)用
10? * tryAcquire(int)直到成功或線程中斷。
11? */
12 public final void acquireInterruptibly(int arg) throws InterruptedException {...}
13
14 /**
15? * 嘗試以獨(dú)占模式獲取,如果中斷則中止,如果給定的超時(shí)時(shí)間失敗。 首先檢查中斷狀態(tài),然
16? * 后調(diào)用至少一次tryAcquire(int) ,成功返回。 否則,線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻
17? * 塞,調(diào)用tryAcquire(int)直到成功或線程中斷或超時(shí)
18? */
19 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {...}
20
21 /**
22? * 以共享模式獲取,忽略中斷。 通過(guò)首次調(diào)用至少一次執(zhí)行 tryAcquireShared(int),成功返
23? * 回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,直到成功調(diào)用tryAcquireShared(int) 。
24? */
25 public final void acquireShared(int arg){...}
26
27 /**
28? * 以共享方式獲取,如果中斷,中止。 首先檢查中斷狀態(tài),然后調(diào)用至少一次
29? * tryAcquireShared(int) ,成功返回。 否則線程排隊(duì),可能會(huì)重復(fù)阻塞和解除阻塞,調(diào)用
30? * tryAcquireShared(int)直到成功或線程中斷。
31? */
32 public final void acquireSharedInterruptibly(int arg) throws InterruptedException{...}
33
34 /**
35? * 嘗試以共享模式獲取,如果中斷則中止,如果給定的時(shí)間超過(guò),則失敗。 通過(guò)首先檢查中斷
36? * 狀態(tài),然后調(diào)用至少一次tryAcquireShared(int) ,成功返回。 否則,線程排隊(duì),可能會(huì)重
37? * 復(fù)阻塞和解除阻塞,調(diào)用tryAcquireShared(int)直到成功或線程中斷或超時(shí)。
38? */
39 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{...}
40
41 /**
42? * 獨(dú)占式的釋放同步狀態(tài),該方法會(huì)在釋放同步狀態(tài)之后,將同步隊(duì)列中的第一個(gè)節(jié)點(diǎn)包含的線程喚醒
43? */
44 public final boolean release(int arg){...}
45
46 /**
47? * 共享式的釋放同步狀態(tài)
48? */
49 public final boolean releaseShared(int arg){...}
50
51 /**
52? * 獲取在等待隊(duì)列上的線程集合
53? */
54 public final Collection<Thread> getQueuedThreads(){...}
回到頂部
三、隊(duì)列同步器的實(shí)現(xiàn)分析
?1、同步隊(duì)列
a)t同步隊(duì)列的實(shí)現(xiàn)原理
AQS內(nèi)部維護(hù)一個(gè)同步隊(duì)列來(lái)完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗的時(shí)候,AQS會(huì)將當(dāng)前線程以及等待狀態(tài)信息構(gòu)造成一個(gè)結(jié)點(diǎn)Node并將其加入同步隊(duì)列中,同時(shí)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)由持有線程釋放的時(shí)候,會(huì)將同步隊(duì)列中的首節(jié)點(diǎn)喚醒使其再次嘗試獲取同步狀態(tài)。同步隊(duì)列中的結(jié)點(diǎn)用來(lái)保存獲取同步狀態(tài)失敗的線程的線程引用、等待狀態(tài)以及前驅(qū)結(jié)點(diǎn)和后繼結(jié)點(diǎn)。下面是Node的屬性分析
1? ? static final class Node {
2? ? ? ? /** 共享模式下構(gòu)造結(jié)點(diǎn) */
3? ? ? ? static final Node SHARED = new Node();
4? ? ? ? /** 獨(dú)占模式下構(gòu)造結(jié)點(diǎn) */
5? ? ? ? static final Node EXCLUSIVE = null;
6
7? ? ? ? /** 用于指示線程已經(jīng)取消的waitStatus值(由于在同步隊(duì)列中等待的線程等待超時(shí)或者發(fā)生中斷,需要從同步隊(duì)列中取消等待,結(jié)點(diǎn)進(jìn)入該狀態(tài)將不會(huì)發(fā)生變化)*/
8? ? ? ? static final int CANCELLED =? 1;
9? ? ? ? /** waitstatus值指示后續(xù)線程需要取消等待(后繼結(jié)點(diǎn)的線程處于等待狀態(tài),而當(dāng)前結(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者CANCELL,將會(huì)通知后繼結(jié)點(diǎn)的線程以運(yùn)行) */
10? ? ? ? static final int SIGNAL? ? = -1;
11? ? ? ? /**waitStatus值表示線程正在等待條件(原本結(jié)點(diǎn)在等待隊(duì)列中,結(jié)點(diǎn)線程等待在Condition上,當(dāng)其他線程對(duì)Condition調(diào)用了signal()方法之后)該結(jié)點(diǎn)會(huì)從
等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,進(jìn)行同步狀態(tài)的獲取 */
12? ? ? ? static final int CONDITION = -2;
13? ? ? ? /**
14? ? ? ? ? * waitStatus值表示下一個(gè)共享式同步狀態(tài)的獲取應(yīng)該無(wú)條件傳播下去
15? ? ? ? ? */
16? ? ? ? static final int PROPAGATE = -3;
17
18? ? ? ? /**
19? ? ? ? ? * 不同的等到狀態(tài)的int值
20? ? ? ? ? */
21? ? ? ? volatile int waitStatus;
22
23? ? ? ? /**
24? ? ? ? ? * 前驅(qū)結(jié)點(diǎn),當(dāng)結(jié)點(diǎn)加入同步隊(duì)列將會(huì)被設(shè)置前驅(qū)結(jié)點(diǎn)信息
25? ? ? ? ? */
26? ? ? ? volatile Node prev;
27
28? ? ? ? /**
29? ? ? ? ? * 后繼結(jié)點(diǎn)
30? ? ? ? ? */
31? ? ? ? volatile Node next;
32
33? ? ? ? /**
34? ? ? ? ? * 當(dāng)前獲取到同步狀態(tài)的線程
35? ? ? ? ? */
36? ? ? ? volatile Thread thread;
37
38? ? ? ? /**
39? ? ? ? ? * 等待隊(duì)列中的后繼結(jié)點(diǎn),如果當(dāng)前結(jié)點(diǎn)是共享的,那么這個(gè)字段是一個(gè)SHARED常量;也就是說(shuō)結(jié)點(diǎn)類(lèi)型(獨(dú)占和共享)和等待隊(duì)列中的后繼結(jié)點(diǎn)公用一個(gè)字段
40? ? ? ? ? */
41? ? ? ? Node nextWaiter;
42
43? ? ? ? /**
44? ? ? ? ? * 如果是共享模式下等待,那么返回true(因?yàn)樯厦娴腘ode nextWaiter字段在共享模式下是一個(gè)SHARED常量)
45? ? ? ? ? */
46? ? ? ? final boolean isShared() {
47? ? ? ? ? ? return nextWaiter == SHARED;
48? ? ? ? }
49
50? ? ? ? final Node predecessor() throws NullPointerException {
51? ? ? ? ? ? Node p = prev;
52? ? ? ? ? ? if (p == null)
53? ? ? ? ? ? ? ? throw new NullPointerException();
54? ? ? ? ? ? else
55? ? ? ? ? ? ? ? return p;
56? ? ? ? }
57
58? ? ? ? Node() {? ? // 用于建立初始頭結(jié)點(diǎn)或SHARED標(biāo)記
59? ? ? ? }
60
61? ? ? ? Node(Thread thread, Node mode) {? ? // 用于添加到等待隊(duì)列
62? ? ? ? ? ? this.nextWaiter = mode;
63? ? ? ? ? ? this.thread = thread;
64? ? ? ? }
65
66? ? ? ? Node(Thread thread, int waitStatus) { // Used by Condition
67? ? ? ? ? ? this.waitStatus = waitStatus;
68? ? ? ? ? ? this.thread = thread;
69? ? ? ? }
70? ? }
b)同步隊(duì)列示意圖和簡(jiǎn)單分析
①同步隊(duì)列示意圖:當(dāng)一個(gè)線程獲取了同步狀態(tài)后,其他線程不能獲取到該同步狀態(tài),就會(huì)被構(gòu)造稱(chēng)為Node然后添加到同步隊(duì)列之中,這個(gè)添加的過(guò)程基于CAS保證線程安全性。
②同步隊(duì)列遵循先進(jìn)先出(FIFO),首節(jié)點(diǎn)是獲取到同步狀態(tài)的結(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)的時(shí)候?qū)?huì)喚醒后繼結(jié)點(diǎn)(然后后繼結(jié)點(diǎn)就會(huì)變成新的首節(jié)點(diǎn)等待獲取同步狀態(tài))
2、獨(dú)占式同步狀態(tài)的獲取和釋放
①前面說(shuō)過(guò),同步器的acquire()方法會(huì)獲取同步狀態(tài),這個(gè)方法對(duì)不會(huì)響應(yīng)中斷,也就是說(shuō)當(dāng)線程獲取通同步狀態(tài)失敗后會(huì)被構(gòu)造成結(jié)點(diǎn)加入到同步隊(duì)列中,當(dāng)線程被中斷時(shí)不會(huì)從同步隊(duì)列中移除。
1 /**
2? * ①首先調(diào)用tryAcquire方法嘗試獲取同步狀態(tài),如果獲取同步狀態(tài)失敗,就進(jìn)行下面的操作
3? * ②獲取失敗:按照獨(dú)占式的模式構(gòu)造同步結(jié)點(diǎn)并通過(guò)addWaiter方法將結(jié)點(diǎn)添加到同步隊(duì)列的尾部
4? * ③通過(guò)acquireQueue方法自旋獲取同步狀態(tài)。
5? * ④如果獲取不到同步狀態(tài),就阻塞結(jié)點(diǎn)中的線程,而結(jié)點(diǎn)中的線程喚醒主要是通過(guò)前驅(qū)結(jié)點(diǎn)的出隊(duì)或者被中斷來(lái)實(shí)現(xiàn)
6? */
7 public final void acquire(int arg) {
8? ? if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
9? ? ? ? selfInterrupt();
10 }
?②下面是addWaiter、enq和自旋獲取同步狀態(tài)acquireQueue方法的實(shí)現(xiàn)(該方法的主要作用就是將獲取同步狀態(tài)失敗的線程構(gòu)造成結(jié)點(diǎn)然后添加到同步隊(duì)列的隊(duì)尾)
1 private Node addWaiter(Node mode) {
2? ? Node node = new Node(Thread.currentThread(), mode);
3? ? //嘗試直接放在隊(duì)尾
4? ? Node pred = tail; //直接獲取同步器的tail結(jié)點(diǎn)
5? ? if (pred != null) {
6? ? ? ? node.prev = pred;
7? ? ? ? if (compareAndSetTail(pred, node)) {
8? ? ? ? ? ? //隊(duì)尾結(jié)點(diǎn)不為空通過(guò)原子操作將構(gòu)造的結(jié)點(diǎn)置為隊(duì)尾結(jié)點(diǎn)
9? ? ? ? ? ? pred.next = node;
10? ? ? ? ? ? return node;
11? ? ? ? }
12? ? }
13? ? //采用自旋方式保證構(gòu)造的結(jié)點(diǎn)添加到同步隊(duì)列中
14? ? enq(node);
15? ? return node;
16 }
17 private Node enq(final Node node) {
18? ? for (;;) { //死循環(huán)知道添加成功
19? ? ? ? Node t = tail;
20? ? ? ? if (t == null) { // Must initialize
21? ? ? ? ? ? if (compareAndSetHead(new Node()))
22? ? ? ? ? ? ? ? tail = head;
23? ? ? ? } else {
24? ? ? ? ? ? node.prev = t;
25? ? ? ? ? ? //通過(guò)CAS方式將結(jié)點(diǎn)添加到同步隊(duì)列之后才會(huì)返回,否則就會(huì)不斷嘗試添加(這樣實(shí)際上就是在并發(fā)情況下,把向同步隊(duì)列添加Node變得串行化了)
26? ? ? ? ? ? if (compareAndSetTail(t, node)) {
27? ? ? ? ? ? ? ? t.next = node;
28? ? ? ? ? ? ? ? return t;
29? ? ? ? ? ? }
30? ? ? ? }
31? ? }
32 }? ?
33 /**
34? *? ? 通過(guò)tryAcquire()和addWaiter(),表示該線程獲取同步狀態(tài)已經(jīng)失敗,被放入同步? ? ? ?
35? * 隊(duì)列尾部了。線程阻塞等待直到其他線程(前驅(qū)結(jié)點(diǎn)獲得同步裝填或者被中斷)釋放同步狀
36? * 態(tài)后喚醒自己,自己才能獲得。
37? */
38 final boolean acquireQueued(final Node node, int arg) {
39? ? boolean failed = true;
40? ? try {
41? ? ? ? boolean interrupted = false;
42? ? ? ? //線程在死循環(huán)的方式中嘗試獲取同步狀態(tài)
43? ? ? ? for (;;) {
44? ? ? ? ? ? final Node p = node.predecessor(); //獲取前驅(qū)結(jié)點(diǎn)
45? ? ? ? ? ? //只有前驅(qū)接待是頭結(jié)點(diǎn)的時(shí)候才能?chē)L試獲取同步狀態(tài)
46? ? ? ? ? ? if (p == head && tryAcquire(arg)) {
47? ? ? ? ? ? ? ? setHead(node); //獲取到同步狀態(tài)之后,就將自己設(shè)置為頭結(jié)點(diǎn)
48? ? ? ? ? ? ? ? p.next = null; //前驅(qū)結(jié)點(diǎn)已經(jīng)獲得同步狀態(tài)去執(zhí)行自己的程序了,所以需要釋放掉占用的同步隊(duì)列的資源,由JVM回收
49? ? ? ? ? ? ? ? failed = false;
50? ? ? ? ? ? ? ? return interrupted;
51? ? ? ? ? ? }
52? ? ? ? ? ? //如果獲取同步狀態(tài)失敗,應(yīng)該自旋等待繼續(xù)獲取并且校驗(yàn)自己的中斷標(biāo)志位信息
53? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&
54? ? ? ? ? ? ? ? parkAndCheckInterrupt())
55? ? ? ? ? ? ? ? interrupted = true; //如果被中斷,就改變自己的中斷標(biāo)志位狀態(tài)信息
56? ? ? ? }
57? ? } finally {
58? ? ? ? if (failed)
59? ? ? ? ? ? cancelAcquire(node);
60? ? }
61 }
③獨(dú)占式獲取同步狀態(tài)的整個(gè)流程
④獨(dú)占式同步器的釋放:release方法執(zhí)行時(shí),會(huì)喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)線程
public final boolean release(int arg) {
? ? if (tryRelease(arg)) {
? ? ? ? Node h = head;//頭結(jié)點(diǎn)
? ? ? ? //喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)線程
? ? ? ? if (h != null && h.waitStatus != 0)
? ? ? ? ? ? unparkSuccessor(h);
? ? ? ? return true;
? ? }
? ? return false;
}
3、共享式同步狀態(tài)的獲取和釋放?
①共享式獲取和獨(dú)占式獲取最主要的區(qū)別是能否有多個(gè)線程同時(shí)獲取到同步狀態(tài)。如圖所示簡(jiǎn)易描述二者的區(qū)別(共享式訪問(wèn)的時(shí)候,可以允許多個(gè)線程訪問(wèn)資源,但是存在獨(dú)占式訪問(wèn)的時(shí)候,同一時(shí)刻其他的不管是共享還是獨(dú)占都會(huì)被阻塞)
②關(guān)于共享式獲取同步狀態(tài)的方法
1 /**
2? * 此方法是共享模式下線程獲取共享同步狀態(tài)的頂層入口。它會(huì)嘗試去獲取同步狀態(tài),獲取成功則直接返回,
3? * 獲取失敗則進(jìn)入等待隊(duì)列一直嘗試獲取(執(zhí)行doAcquireShared方法體中的內(nèi)容),直到獲取到資源為止(條件就是tryAcquireShared方法返回值大于等于0),整個(gè)過(guò)程忽略中斷
4? */
5 public final void acquireShared(int arg) {
6? ? if (tryAcquireShared(arg) < 0)
7? ? ? ? doAcquireShared(arg);
8 }
9 /**
10? * "自旋"嘗試獲取同步狀態(tài)
11? */
12 private void doAcquireShared(int arg) {
13? ? //首先將該線程包括線程引用、等待狀態(tài)、前驅(qū)結(jié)點(diǎn)和后繼結(jié)點(diǎn)的信息封裝臺(tái)Node中,然后添加到等待隊(duì)列里面(一共享模式添加)
14? ? final Node node = addWaiter(Node.SHARED);
15? ? boolean failed = true;
16? ? try {
17? ? ? ? boolean interrupted = false; //當(dāng)前線程的中斷標(biāo)志
18? ? ? ? for (;;) {
19? ? ? ? ? ? final Node p = node.predecessor(); //獲取前驅(qū)結(jié)點(diǎn)
20? ? ? ? ? ? if (p == head) {
21? ? ? ? ? ? ? ? //當(dāng)前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)的時(shí)候就會(huì)以共享的方式去嘗試獲取同步狀態(tài)
22? ? ? ? ? ? ? ? int r = tryAcquireShared(arg);
23? ? ? ? ? ? ? ? //判斷tryAcquireShared的返回值
24? ? ? ? ? ? ? ? if (r >= 0) {
25? ? ? ? ? ? ? ? ? ? //如果返回值大于等于0,表示獲取同步狀態(tài)成功,就修改當(dāng)前的頭結(jié)點(diǎn)并將信息傳播都后續(xù)的結(jié)點(diǎn)隊(duì)列中
26? ? ? ? ? ? ? ? ? ? setHeadAndPropagate(node, r);
27? ? ? ? ? ? ? ? ? ? p.next = null; // 釋放掉已經(jīng)獲取到同步狀態(tài)的前驅(qū)結(jié)點(diǎn)的資源
28? ? ? ? ? ? ? ? ? ? if (interrupted)
29? ? ? ? ? ? ? ? ? ? ? ? selfInterrupt(); //檢查中斷標(biāo)志
30? ? ? ? ? ? ? ? ? ? failed = false;
31? ? ? ? ? ? ? ? ? ? return;
32? ? ? ? ? ? ? ? }
33? ? ? ? ? ? }
34? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&
35? ? ? ? ? ? ? ? parkAndCheckInterrupt())
36? ? ? ? ? ? ? ? interrupted = true;
37? ? ? ? }
38? ? } finally {
39? ? ? ? if (failed)
40? ? ? ? ? ? cancelAcquire(node);
41? ? }
42 }
根據(jù)源代碼我們可以了解共享式獲取同步狀態(tài)的整個(gè)過(guò)程
首先同步器會(huì)調(diào)用tryAcquireShared方法來(lái)嘗試獲取同步狀態(tài),然后根據(jù)這個(gè)返回值來(lái)判斷是否獲取到同步狀態(tài)(當(dāng)返回值大于等于0可視為獲取到同步狀態(tài));如果第一次獲取失敗的話(huà),就進(jìn)入'自旋'狀態(tài)(執(zhí)行doAcquireShared方法)一直嘗試去獲取同步狀態(tài);在自旋獲取中,如果檢查到當(dāng)前前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn)的話(huà),就會(huì)嘗試獲取同步狀態(tài),而一旦獲取成功(tryAcquireShared方法返回值大于等于0)就可以從自旋狀態(tài)退出。
另外,還有一點(diǎn)就是上面說(shuō)到的一個(gè)處于等待隊(duì)列的線程要想開(kāi)始嘗試去獲取同步狀態(tài),需要滿(mǎn)足的條件就是前驅(qū)結(jié)點(diǎn)是頭結(jié)點(diǎn),那么它本身就是整個(gè)隊(duì)列中的第二個(gè)結(jié)點(diǎn)。當(dāng)頭結(jié)點(diǎn)釋放掉所有的臨界資源之后,我們考慮每個(gè)線程運(yùn)行所需資源的不同數(shù)量問(wèn)題,如下圖所示
③共享式同步狀態(tài)的釋放
對(duì)于支持共享式的同步組件(即多個(gè)線程同同時(shí)訪問(wèn)),它們和獨(dú)占式的主要區(qū)別就是tryReleaseShared方法必須確保同步狀態(tài)的釋放是線程安全的(CAS的模式來(lái)釋放同步狀態(tài),因?yàn)榧热皇嵌鄠€(gè)線程能夠訪問(wèn),那么釋放的時(shí)候也會(huì)是多個(gè)線程的,就需要保證釋放時(shí)候的線程安全)
1 /**
2? * 該方法是共享模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。
3? */
4 public final boolean releaseShared(int arg) {
5? ? if (tryReleaseShared(arg)) {
6? ? ? ? doReleaseShared(); //
7? ? ? ? return true;
8? ? }
9? ? return false;
10 }
回到頂部
四、自定義同步組件的實(shí)現(xiàn)
1、共享式鎖的實(shí)現(xiàn)
①、自定義一個(gè)同步組件,可以允許兩個(gè)線程訪問(wèn)(共享式同步組件),超過(guò)兩個(gè)線程就會(huì)被阻塞。
②、既然是共享式同步組件,按照前面所說(shuō)的,組件本身需要使用AQS提供的共享式模板方法acquireShared等;組件的內(nèi)部類(lèi)需要實(shí)現(xiàn)AQS,并且重寫(xiě)關(guān)于共享式獲取同步狀態(tài)的方法(tryAcquireShared()、tryReleaseShared()等共享模式下的方法)。
③、既然是兩個(gè)線程能夠同時(shí)訪問(wèn)的話(huà),那么狀態(tài)數(shù)的取值范圍就是0、1、2了,每當(dāng)一個(gè)線程獲取到同步狀態(tài)的時(shí)候state值減1,反之就會(huì)增加1;當(dāng)state值為0的時(shí)候就會(huì)阻塞其他想要獲取同步狀態(tài)的線程。對(duì)于同步狀態(tài)的更改需要使用CAS來(lái)進(jìn)行保證原子性。
package cn.source.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class TestAQS implements Lock{
??private Sync sync = new Sync(2);
??private static class Sync extends AbstractQueuedSynchronizer {
????Sync(int num) {
??????if(num <= 0) {
????????throw new RuntimeException("num需要大于0");
??????}
??????setState(num);
????}
????@Override
????protected int tryAcquireShared(int arg) {
??????for(; ;) {
????????int currentState = getState();
????????int newState = currentState - arg;
????????if(newState < 0 || compareAndSetState(currentState, newState)) {
??????????return newState;
????????}
??????}
????}
????@Override
????protected boolean tryReleaseShared(int arg) {
??????for(; ;) {
????????int currentState = getState();
????????int newState = currentState + arg;
????????if(compareAndSetState(currentState, newState)) {
??????????return true;
????????}
??????}
????}
??}
??@Override
??public void lock() {
????sync.acquireShared(1);
??}
??@Override
??public void unlock() {
????sync.releaseShared(1);
??}
??//......
}
共享式鎖
/**
?* 測(cè)試結(jié)果:輸出的線程名稱(chēng)是成對(duì)的,保證同一時(shí)刻只有兩個(gè)線程能夠獲取到鎖
?*
?*/?????
public class TestLockShare {
??@Test
??public void test() {
????Lock lock = new TestAQS();
????class Worker extends Thread {
??????@Override
??????public void run() {
????????while(true) {
??????????lock.lock();
??????????try {
????????????Thread.sleep(1000);
????????????System.out.println(Thread.currentThread().getName());
????????????Thread.sleep(1000);
??????????} catch (Exception e) {
????????????e.printStackTrace();
??????????} finally {
????????????lock.unlock();
??????????}
????????}
??????}
????}
????for (int i = 0; i < 8; i++) {
??????Worker worker = new Worker();
??????worker.setDaemon(true);
??????worker.start();
????}
????for (int i = 0; i < 8; i++) {
??????try {
????????Thread.sleep(1000);
??????} catch (InterruptedException e) {
????????// TODO Auto-generated catch block
????????e.printStackTrace();
??????}
??????System.out.println();
????}
??}
}
共享式鎖測(cè)試
2、獨(dú)占式鎖的實(shí)現(xiàn)
package cn.source.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock{
??private Sync sync = new Sync();
??private static class Sync extends AbstractQueuedSynchronizer {
????/**
?????* 嘗試獲取資源,立即返回。成功則返回true,否則false。
?????*/
????@Override
????protected boolean tryAcquire(int arg) {
??????if(compareAndSetState(0, 1)) {//state為0才設(shè)置為1,不可重入!
????????setExclusiveOwnerThread(Thread.currentThread());//設(shè)置為當(dāng)前線程獨(dú)占資源
????????return true;
??????}
??????return false;
????}
????/**
?????* 嘗試釋放資源,立即返回。成功則為true,否則false。
?????*/
????@Override
????protected boolean tryRelease(int arg) {
??????if(getState() == 0) { //既然來(lái)釋放,那肯定就是已占有狀態(tài)了。只是為了保險(xiǎn),多層判斷!
????????throw new IllegalMonitorStateException();
??????}
??????setExclusiveOwnerThread(null);
??????setState(0);
??????return true;
????}
????@Override
????protected boolean isHeldExclusively() {
??????// 判斷是否鎖定狀態(tài)
??????return getState() == 1;
????}
??}
??@Override
??public void lock() {
????sync.acquire(1);
??}
??@Override
??public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
????return sync.tryAcquire(1);
??}
??@Override
??public void unlock() {
????sync.release(1);
??}
}
獨(dú)占式鎖
public class TestMutex {
??@Test
??public void test() {
????Lock lock = new Mutex();
????class Worker extends Thread {
??????@Override
??????public void run() {
????????while(true) {
??????????lock.lock();
??????????try {
????????????Thread.sleep(1000);
????????????System.out.println(Thread.currentThread().getName());
????????????Thread.sleep(1000);
??????????} catch (Exception e) {
????????????e.printStackTrace();
??????????} finally {
????????????lock.unlock();
??????????}
????????}
??????}
????}
????for (int i = 0; i < 8; i++) {
??????Worker worker = new Worker();
??????worker.setDaemon(true);
??????worker.start();
????}
????for (int i = 0; i < 8; i++) {
??????try {
????????Thread.sleep(1000);
??????} catch (InterruptedException e) {
????????e.printStackTrace();
??????}
??????System.out.println();
????}
??}
}
獨(dú)占式鎖測(cè)試
歡迎工作一到五年的Java工程師朋友們加入Java程序員開(kāi)發(fā): 721575865
群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來(lái)學(xué)習(xí)提升自己,不要再用"沒(méi)有時(shí)間“來(lái)掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來(lái)的自己一個(gè)交代!