Lock接口
與synchronized關(guān)鍵字相比擁有了鎖獲取與釋放的可操作性,可非阻塞的獲取鎖、可中斷的獲取鎖、超時(shí)獲取鎖。
標(biāo)準(zhǔn)接口定義
void lock();
void lockInterruptibly() throws InterruptedException;//可中斷的阻塞獲取鎖
Condition newCondition();
boolean tryLock();//非阻塞的獲取鎖,馬上返回
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//最大阻塞time時(shí)間,超時(shí)之后返回,可中斷
void unlock();//釋放鎖
標(biāo)準(zhǔn)使用方式
注意不要將獲取鎖的過(guò)程寫(xiě)到try塊里面
Lock lock = new ReentrantLock();
lock.lock();
try{
//to do
}finally{
lock.unlock();
}
隊(duì)列同步器AQS(AbstractQueuedSynchronizer)
簡(jiǎn)單的理解是對(duì)于請(qǐng)求獲取鎖的線(xiàn)程,如果當(dāng)前鎖已經(jīng)被別的線(xiàn)程獲取,那么當(dāng)前線(xiàn)程需要到隊(duì)列中排隊(duì)。如果隊(duì)列中的第一個(gè)線(xiàn)程釋放了鎖,那么就會(huì)喚醒排在它后面的一個(gè)線(xiàn)程去獲取鎖(如果非公平鎖,可能會(huì)被插隊(duì))。
核心字段
private volatile int state;//同步狀態(tài)
private transient volatile Node head;//同步隊(duì)列的頭指針
private transient volatile Node tail;//同步隊(duì)列的尾指針
核心方法
int getState();
void setState(int newState);
boolean compareAndSetState(int expect, int update);
boolean isHeldExclusively();
void acquire(long arg);
boolean release(long arg);
void acquireShared(long arg);
boolean releaseShared(long arg);
同步隊(duì)列的Node節(jié)點(diǎn)
這里最好結(jié)合源碼的英文注釋理解
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
獨(dú)占式同步狀態(tài)的獲取源碼分析
acquire方法
首先非阻塞的獲取同步狀態(tài)(tryAcquire方法),如果失敗的話(huà)會(huì)把當(dāng)前線(xiàn)程信息構(gòu)造一個(gè)Node節(jié)點(diǎn)加入到同步隊(duì)列的尾部進(jìn)行排隊(duì)(這里可能會(huì)使線(xiàn)程進(jìn)入waiting狀態(tài))
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法
非阻塞的獲取同步狀態(tài),失敗返回false.這個(gè)是子類(lèi)實(shí)現(xiàn)的。(模板方法模式)
addWaiter方法
將當(dāng)前節(jié)點(diǎn)加入到同步隊(duì)列的尾部
private Node addWaiter(Node mode) {
// 這里mode為EXCLUSIVE,也就是 node.nextWaiter==null.
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//下面if邏輯是嘗試快速插入到隊(duì)列尾部,如果失敗到enq函數(shù)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//當(dāng)前節(jié)點(diǎn)插入到隊(duì)列尾部
return node;
}
enq 方法
當(dāng)前節(jié)點(diǎn)插入到隊(duì)列尾部,保證成功,可能重試多次。采用CAS更改改共享變量的標(biāo)準(zhǔn)寫(xiě)法。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
對(duì)于剛進(jìn)入隊(duì)列排隊(duì)的節(jié)點(diǎn)或者剛被喚醒的線(xiàn)程(LockSupport.unpark(thread)),會(huì)檢查它前面的那個(gè)節(jié)點(diǎn)是不是head節(jié)點(diǎn),如果是的話(huà)嘗試獲取鎖。如果不是的話(huà)會(huì)判斷當(dāng)前線(xiàn)程是否可以到waiting狀態(tài)(釋放cpu資源,避免盲等),(判斷的依據(jù)主要是當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的waitStatus是否為SIGNAL),如果可以睡眠,就會(huì)睡眠直到被前一個(gè)節(jié)點(diǎn)喚醒。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn),并且獲取同步狀態(tài)成功,則把當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),然后返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判斷當(dāng)前節(jié)點(diǎn)是否可以睡眠或者把當(dāng)前節(jié)點(diǎn)狀態(tài)改到可睡眠狀態(tài)(可以睡否最重要的依據(jù)是睡后有人喚醒你,否則你就醒不來(lái)了)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire方法
如果暫時(shí)不能獲取同步狀態(tài),線(xiàn)程會(huì)考慮睡一會(huì)(park)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的waitStatus為Node.SIGNAL,則其前驅(qū)節(jié)點(diǎn)在釋放鎖之后就會(huì)喚醒(uppark)當(dāng)前線(xiàn)程,所以當(dāng)前線(xiàn)程可以放心的park
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果前驅(qū)節(jié)點(diǎn)已經(jīng)放棄獲取鎖,更新其前驅(qū)節(jié)點(diǎn)
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//嘗試把前驅(qū)節(jié)點(diǎn)狀態(tài)改為Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt方法
當(dāng)前線(xiàn)程park,到waiting 狀態(tài),線(xiàn)程阻塞于當(dāng)前方法直到被中斷或者unpark
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
獨(dú)占式同步狀態(tài)釋放源碼分析
release方法
釋放同步狀態(tài),喚醒隊(duì)里中的下一個(gè)節(jié)點(diǎn)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor方法
喚醒隊(duì)列中下一個(gè)節(jié)點(diǎn)去參與鎖的競(jìng)爭(zhēng)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
共享式獲取同步狀態(tài)源碼分析
acquireShared方法
如果沒(méi)有獲取同步狀態(tài)成功,就會(huì)將當(dāng)前節(jié)點(diǎn)更新加入到同步隊(duì)列。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared
同步隊(duì)列中獲取獨(dú)占式同步狀態(tài),類(lèi)似于獨(dú)占模式同步狀態(tài)的獲取。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//如果當(dāng)前節(jié)點(diǎn)獲取同步狀態(tài)成功,會(huì)將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn),
//并且傳播到它的下一個(gè)節(jié)點(diǎn)(下一個(gè)節(jié)點(diǎn)嘗試獲取同步狀態(tài))
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate方法
設(shè)置node節(jié)點(diǎn)為head節(jié)點(diǎn),并且嘗試傳播到下一個(gè)節(jié)點(diǎn)(因?yàn)楣蚕砟J降耐綘顟B(tài)是可以有多個(gè)線(xiàn)程同時(shí)獲取的)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果下一個(gè)節(jié)點(diǎn)是共享模式的節(jié)點(diǎn),就考慮喚醒下一個(gè)節(jié)點(diǎn)去競(jìng)爭(zhēng)性獲取共享狀態(tài)。
if (s == null || s.isShared())
doReleaseShared();
}
}
共享式釋放同步狀態(tài)源碼分析
releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}