類信息
- 內(nèi)部抽象類:Sync 繼承 AbstractQueuedSynchronizer
- 內(nèi)部類:非公平鎖:NonfairSync 繼承 Sync
加鎖過程
lock():CAS修改同步狀態(tài) 成功則設(shè)置exclusiveOwnerThread為當(dāng)前線程,然后執(zhí)行業(yè)務(wù),失敗則調(diào)用acquire(1);
final void lock() {
//設(shè)置AbstractQueuedSynchronizer的state為1
if (compareAndSetState(0, 1))
//成功后設(shè)置AbstractOwnableSynchronizer的thread為當(dāng)前線程
setExclusiveOwnerThread(Thread.currentThread());
else
//失敗時(shí)線程阻塞
acquire(1);
}
acquire(1): AbstractQueuedSynchronizer中的方法,執(zhí)行tryAcquire(1),acquireQueued(Node, 1),addWaiter(Node.EXCLUSIVE),接下來依次看這三個(gè)方法;
public final void acquire(int arg) {
//嘗試獲取鎖,如果獲取成功則已,不成功則加入等待隊(duì)列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(1):執(zhí)行nonfairTryAcquire(acquires),即Sync類的方法nonfairTryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire(acquires):非公平鎖獲取
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//再次獲取同步狀態(tài),如果是0則獲取鎖執(zhí)行
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
/**
* 如果是當(dāng)前線程是已經(jīng)獲取鎖的線程
* 讓某個(gè)線程可以多次調(diào)用同一個(gè)ReentrantLock,每調(diào)用一次給state+1,
* 由于某個(gè)線程已經(jīng)持有了鎖,所以這里不會(huì)有競爭,
* 因此不需要利用CAS設(shè)置state(相當(dāng)于一個(gè)偏向鎖)。從這段代碼可以看到,nextc每次加1,
* 當(dāng)nextc<0的時(shí)候拋出error,那么同一個(gè)鎖最多能重入Integer.MAX_VALUE次,也就是2147483647。
*/
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter(Node.EXCLUSIVE):AbstractQueuedSynchronizer中的方法,為當(dāng)前線程和指定模式創(chuàng)建并擴(kuò)充一個(gè)等待隊(duì)列。Node.EXCLUSIVE標(biāo)記表示節(jié)點(diǎn)正在獨(dú)占模式下等待
private Node addWaiter(Node mode) {
//為當(dāng)前線程創(chuàng)建一個(gè)等待節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//將需要等待的線程加入到等待隊(duì)列,如果沒有隊(duì)列則新建執(zhí)行enq
//查看等待隊(duì)列的尾部節(jié)點(diǎn)是否為空
Node pred = tail;
if (pred != null) {
//設(shè)置當(dāng)前線程的節(jié)點(diǎn)為尾節(jié)點(diǎn),擴(kuò)充隊(duì)列,并返回當(dāng)前線程節(jié)點(diǎn)
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq(node):判斷尾節(jié)點(diǎn)是否為空,為空則初始化頭尾節(jié)點(diǎn)(如果尾節(jié)點(diǎn)為空,則頭節(jié)點(diǎn)一定為空,頭節(jié)點(diǎn)為懶加載,只有隊(duì)列中插入第一個(gè)節(jié)點(diǎn)時(shí)才初始化),否則插入新節(jié)點(diǎn)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(Node, 1):加入隊(duì)列的節(jié)點(diǎn)再次嘗試獲取鎖,否則進(jìn)入阻塞狀態(tài)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//再次嘗試獲取鎖
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//獲取鎖 ---> 清除節(jié)點(diǎn)信息,設(shè)置為隊(duì)列頭結(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果獲取不到鎖
// 1、shouldParkAfterFailedAcquire:判斷是否進(jìn)入阻塞,進(jìn)入等待
// 2、parkAndCheckInterrupt 進(jìn)入等待,中斷線程
// 條件滿足時(shí),修改中斷狀態(tài),再次進(jìn)入循環(huán)獲取鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(p, node):判斷是否進(jìn)入阻塞或者進(jìn)入等待,通過判斷當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)狀態(tài),如果是SIGNAL表示自己被阻塞,返回true;如果小于0,則表示前驅(qū)節(jié)點(diǎn)為取消狀態(tài),跳過,直到鏈接到不是取消狀態(tài)的節(jié)點(diǎn),返回false;如果兩種都不符合,則通過CAS修改前驅(qū)節(jié)點(diǎn)狀態(tài)為Node.SIGNAL,返回false.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅(qū)節(jié)點(diǎn)的等待狀態(tài)
int ws = pred.waitStatus;
//判斷前驅(qū)節(jié)點(diǎn)狀態(tài)
if (ws == Node.SIGNAL)
/*
* 此節(jié)點(diǎn)已設(shè)置狀態(tài),要求釋放信號(hào),因此可以安全停放。
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//跳過狀態(tài)值為1的Node節(jié)點(diǎn)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
* 更新狀態(tài)
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt():
//禁用當(dāng)前線程進(jìn)入等待狀態(tài)并中斷線程本身
private final boolean parkAndCheckInterrupt() {
//除非許可證可用,否則禁用當(dāng)前線程以進(jìn)行線程調(diào)度。
LockSupport.park(this);
return Thread.interrupted();
}
cancelAcquire(node):如果發(fā)生異常,取消正在進(jìn)行的Node獲取鎖的嘗試
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
//跳過取消的前驅(qū)節(jié)點(diǎn)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//設(shè)置此節(jié)點(diǎn)的狀態(tài)為CANCELLED,代表取消狀態(tài)
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
//如果node是tail,更新tail為pred,并使pred.next指向null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
//如果node既不是tail,又不是head的后繼節(jié)點(diǎn)
//則將node的前繼節(jié)點(diǎn)的waitStatus置為SIGNAL
//并使node的前繼節(jié)點(diǎn)指向node的后繼節(jié)點(diǎn)(相當(dāng)于將node從隊(duì)列中刪掉了)
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果node是head的后繼節(jié)點(diǎn),則直接喚醒node的后繼節(jié)點(diǎn)
//喚醒后繼節(jié)點(diǎn)線程后,在做出隊(duì)操作時(shí)將node節(jié)點(diǎn)刪除
unparkSuccessor(node);
}
node.next = node; // help GC
}
}