注:關鍵的代碼的部分會有注釋。
AQS介紹,主要是看屬性,方法遇到了再看
內部類 node,用于維護獲取鎖的線程的雙向隊列,詳細的可以具體看注釋,這里就不一一介紹了
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
.....
//獲取鎖隊列隊頭
private transient volatile Node head;
//獲取鎖隊列隊尾
private transient volatile Node tail;
//加鎖標志位,主要是通過cas操作,算是委托給他判斷是否加鎖成功
private volatile int state;
//在aqs父類中,當前持有鎖的線程
private transient Thread exclusiveOwnerThread;
......
ReentrantLock的具體屬性
//Sync 繼承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer{...}
//
private final Sync sync;
//公平實現(xiàn)
static final class FairSync extends Sync {...}
//非公平實現(xiàn)
static final class NonfairSync extends Sync {...}
加鎖過程
lock方法解析
public void lock() {
sync.lock();
}
sync.lock() 方法有兩個實現(xiàn),一個是公平鎖的實現(xiàn),另一個是非公平鎖的實現(xiàn),如下圖
1.png
我們先看非公平鎖的實現(xiàn)的加鎖方式(NonfairSync),代碼如下
final void lock() {
//如果cas修改值state值成功,則加鎖成功
if (compareAndSetState(0, 1))
//設置當前持有鎖的線程為當前線程
setExclusiveOwnerThread(Thread.currentThread());
else
//不成功就在重新獲取鎖,過程較為復雜
acquire(1);
}
acquire(1)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法可分為三個部分,!tryAcquire(arg)、addWaiter(Node.EXCLUSIVE)、 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),一一分析具體干了什么事。
!tryAcquire(arg),可以跟蹤代碼先調用NonfairSync.tryAcquire()方法,調用Sync.nonfairTryAcquire()
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//獲取當前線程
final Thread current = Thread.currentThread();
//獲取state的值
int c = getState();
//等于0那么現(xiàn)在無線程持有鎖
if (c == 0) {
//cas獲取鎖
if (compareAndSetState(0, acquires)) {
//設置當前線程持有鎖
setExclusiveOwnerThread(current);
return true;
}
}
//如果持有鎖的線程和當前線程為同一線程(重入),state累加
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
若以上方法返回true,那么加鎖成功,則!tryAcquire(arg)為false,不會執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)這部分代碼。若以上代碼返回false,加鎖失敗,則!tryAcquire(arg)為true,則會執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),現(xiàn)在先假設加鎖失敗。
addWaiter(Node.EXCLUSIVE),索取鎖線程入隊
private Node addWaiter(Node mode) {
//新建node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//取隊尾
Node pred = tail;
//如果隊尾不為空
if (pred != null) {
//取當線程的的節(jié)點的前節(jié)點為對位
node.prev = pred;
//cas設置隊尾的后節(jié)點為當前線程的節(jié)點
if (compareAndSetTail(pred, node)) {
//倒數(shù)第二節(jié)點的下個節(jié)點指向當前線程的節(jié)點
pred.next = node;
return node;
}
}
//尾節(jié)點不為空,入隊
enq(node);
return node;
}
//入隊相關代碼
private Node enq(final Node node) {
for (;;) {
//取隊尾
Node t = tail;
//初始化隊列
if (t == null) { // Must initialize
//設置頭節(jié)點
if (compareAndSetHead(new Node()))
//隊首隊尾都指向 上一行的new Node()
tail = head;
} else {
//當前線程節(jié)點的前節(jié)點為隊尾節(jié)點
node.prev = t;
//cas操作設置隊尾為當前線程節(jié)點
if (compareAndSetTail(t, node)) {
隊尾(現(xiàn)在倒數(shù)第二個節(jié)點)后節(jié)點為當前線程節(jié)點
t.next = node;
return t;
}
}
}
}
到這里入隊的相關操作就完成分析了,開始下一個方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的分析。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg),從隊列面獲取線程來持有鎖。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當前線程節(jié)點的前節(jié)點
final Node p = node.predecessor();
//如果前節(jié)點為隊首(這里明白這個雙向隊列的第一個節(jié)點為空),且獲取鎖成功(NonfairSync.tryAcquire())
if (p == head && tryAcquire(arg)) {
setHead(node);
//如果不設置為null,可能會引起oom,引用一直存在
p.next = null; // help GC
failed = false;
return interrupted;
}
//判斷是否需要掛機線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//設置當前節(jié)點為頭節(jié)點,清空了線程和前節(jié)點
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
//判斷是否需要掛起線程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//node 類中有對應描述,代表現(xiàn)需要被unpraking
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//節(jié)點被取消,跳過 、重試
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//設置parking標志
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//park線程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
以上就是ReentrantLock 的加鎖過程。后面解析ReentrantLock 的解鎖過程
unlock
public void unlock() {
//調用sync的release
sync.release(1);
}
public final boolean release(int arg) {
//如果解鎖成功
if (tryRelease(arg)) {
//取頭節(jié)點
Node h = head;
//如果頭節(jié)點不為空且waitStatus 不為CANCELLED、SIGNAL 、CONDITION 、PROPAGATE
if (h != null && h.waitStatus != 0)
//解鎖線程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//獲取狀態(tài)并減1
int c = getState() - releases;
//判斷持有鎖的線程是不是當前線程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//解鎖成功
if (c == 0) {
free = true;
//清空線程
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//unpark線程節(jié)點
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//ws<0 即ws為CANCELLED 、SIGNAL 、CONDITION 、PROPAGATE 中一種
if (ws < 0)
//ws設置為0,讓下一個線程節(jié)點嘗試獲取鎖
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
公平鎖與非公平鎖區(qū)別在于tryAcquire的時候,會檢查是不是已經(jīng)有線程在排隊了,如果有那么直接入隊尾,不在嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//判斷是隊列是否已有線程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//h != t 初始化的時候,(s = h.next) == null頭節(jié)點的下個節(jié)點為空(第一個節(jié)點為”空“),s.thread != Thread.currentThread()頭節(jié)點的下個節(jié)點與當前線程不為同一線程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
綜上就是ReentrantLock的加鎖和解鎖過程,現(xiàn)在對自旋有一點感覺了,enq(...)和 acquireQueued(...)中循環(huán)都是自選的體現(xiàn)。