該包的結(jié)構(gòu)如圖

其中有java6新加入的一個類LockSupport,這個類已經(jīng)在自旋鎖在高并發(fā)的用處,實現(xiàn)各種鎖的最后提到過,也是基于Unsafe類的實現(xiàn) 主要的兩個方法是
public static void unpark(Thread thread);
public static void park(Object blocker) {
public static void parkNanos(Object blocker, long nanos) ;
public static void parkUntil(Object blocker, long deadline) ;
public static Object getBlocker(Thread t) ;
public static void park() ;
public static void parkNanos(long nanos);
public static void parkUntil(long deadline);
基本是基于
unsafe.putObject
unsafe.unpark
unsafe.park
來實現(xiàn)的。由于已經(jīng)分析過。所以這里不再說了。
這個包下的類 具體的實現(xiàn)類只有ReentrantLock和ReentrantReadWriteLock(除去單獨講的LockSupport),其余的都是抽象類和接口,做一下歸類
抽象類 AbstractOwnableSynchronizer 、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer
接口 Lock、ReadWriteLock、Condition
抽象類AbstractOwnableSynchronizer 該類是主要定義讓線程以獨占方式擁有同步器,此類為創(chuàng)建鎖和相關(guān)同步器提供了基礎(chǔ),類本身不管理或使用此信息 很簡單的兩個方法setExclusiveOwnerThread(Thread t)設(shè)置當(dāng)前擁有獨占訪問的線程 和getExclusiveOwnerThread() 返回由 setExclusiveOwnerThread最后設(shè)置的線程;如果從未設(shè)置,則返回 null。
抽象類AbstractQueuedSynchronizer 繼承自AbstractOwnableSynchronizer該類為實現(xiàn)依賴于先進先出 (FIFO) 等待隊列的阻塞鎖和相關(guān)同步器(信號量、事件,等等)提供一個框架 和是MCSLock的擴展。該類有個屬性 private volatile int state; 是The synchronization state. 同步的狀態(tài)表示。擁有正常的set和get方法后,還有個compareAndSetState方法,是基于unsafe類的compareAndSwapInt來實現(xiàn)的,由此類實現(xiàn)同步。
相似的AbstractQueuedLongSynchronizer類的屬性定義是private volatile long state;可以看出是LONG型的屬性值。調(diào)用的是unsafe.compareAndSwapLong,所以二者的區(qū)別就基本知道了,其它沒啥區(qū)別。所以我們只要繼續(xù)分析AbstractQueuedLongSynchronizer類即可
該類有Node 內(nèi)部類和ConditionObject 內(nèi)部類,其中Node類定義了兩種模式的node屬性,
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
一種是獨占的一種是共享的。還定義了四種等待狀態(tài)
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
- 節(jié)點等待狀態(tài)被取消掉了 值為1;
- 節(jié)點等待狀態(tài)標(biāo)示 等待獲得鎖的線程去unparking 通知 值為-1;
- 節(jié)點等待狀態(tài) 為處于條件等待狀態(tài) 值為-2;
- 節(jié)點等待狀態(tài)為節(jié)點狀態(tài)需要向后傳播,一般是釋放共享需要傳播給其他節(jié)點,一般是頭節(jié)點在doReleaseShared 去確保傳播繼續(xù)下去即使其他操作已經(jīng)介入了。值為-3;
- 還有值為0的情況 這個表明處于非上述四種狀態(tài)。
這里還有volatile Node prev;和volatile Node next; 隊列的前后節(jié)點
這里AbstractQueuedSynchronizer也有 private transient volatile Node tail;和 private transient volatile Node head; 這個是首尾節(jié)點 不要混淆
結(jié)構(gòu)圖大致為

由前面MCSLock的知識知道
volatile Thread thread; 是標(biāo)示是否為當(dāng)前線程獲得鎖的這里還有個
Node nextWaiter; 這個屬性要結(jié)合Condition去理解,該節(jié)點關(guān)聯(lián)的是等待某條件上的下個節(jié)點。因為 條件隊列只有是獨占模式下才可以被訪問到。再先來看一下內(nèi)部類ConditionObject 該類實現(xiàn)了Condition接口。
該對象內(nèi)部也維護了一個隊列,
private transient Node firstWaiter;
private transient Node lastWaiter;
Condition主要是為了在J.U.C框架中提供和Java傳統(tǒng)的監(jiān)視器風(fēng)格的wait,notify和notifyAll方法類似的功能。
JDK官方的解釋是
Condition將 Object監(jiān)視器方法(wait、notify和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現(xiàn)組合使用,為每個對象提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized方法和語句的使用,Condition替代了 Object 監(jiān)視器方法的使用。條件(也稱為條件隊列 或條件變量)為線程提供了一個含義,以便在某個狀態(tài)條件現(xiàn)在可能為 true 的另一個線程通知它之前,一直掛起該線程(即讓其“等待”)。因為訪問此共享狀態(tài)信息發(fā)生在不同的線程中,所以它必須受保護,因此要將某種形式的鎖與該條件相關(guān)聯(lián)。等待提供一個條件的主要屬性是:以原子方式 釋放相關(guān)的鎖,并掛起當(dāng)前線程,就像 Object.wait做的那樣。Condition實例實質(zhì)上被綁定到一個鎖上。要為特定 Lock 實例獲得 Condition實例,請使用其 newCondition()方法。
提供了
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
方法,await開頭的都是和await方法一致,只不過添加了寫條件而已,await會在當(dāng)前l(fā)ock的隊列中持有鎖的線程上釋放鎖資源,并新建Condition節(jié)點加到Condition隊列尾部,阻塞當(dāng)前線程。
signal()和signalAll()就是將Condition的頭節(jié)點移動到Lock等待節(jié)點尾部,讓其等待再次獲取鎖。
主要看await方法和signal方法是怎么實現(xiàn)的即可
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
可以看到這里借助了LockSupport.park(this);方法。其他的方法主要是做各種檢測 這里就不說了
signal方法
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
這里調(diào)用了doSignal方法
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
看一下while循環(huán)調(diào)用到的方法
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
可以看出還是借助了unsafe和 LockSupport.unpark方法實現(xiàn)鎖釋放。
看到一篇不錯的博文對Condition和AbstractQueuedSynchronizer的闡述,這里借助下他的圖
以下是AQS隊列和Condition隊列的出入結(jié)點的示意圖,可以通過這幾張圖看出線程結(jié)點在兩個隊列中的出入關(guān)系和條件。
I.初始化狀態(tài):AQS等待隊列有3個Node,Condition隊列有1個Node(也有可能1個都沒有)

II.節(jié)點1執(zhí)行Condition.await()
1.將head后移
2.釋放節(jié)點1的鎖并從AQS等待隊列中移除
3.將節(jié)點1加入到Condition的等待隊列中
4.更新lastWaiter為節(jié)點1

III.節(jié)點2執(zhí)行signal()操作
5.將firstWaiter后移
6.將節(jié)點4移出Condition隊列
7.將節(jié)點4加入到AQS的等待隊列中去
8.更新AQS的等待隊列的tail

最后給出官方的Condition例子
package com.alibaba.otter.canal.common;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
再看下AbstractQueuedSynchronizer 留給子類去實現(xiàn)的方法,看一下官方文檔的說法
為了將此類用作同步器的基礎(chǔ),需要適當(dāng)?shù)刂匦露x以下方法,這是通過使用 getState setState和/或 compareAndSetState方法來檢查和/或修改同步狀態(tài)來實現(xiàn)的
- tryAcquire(int)
- tryRelease(int)
- tryAcquireShared(int)
- tryReleaseShared(int)
- isHeldExclusively()
默認(rèn)情況下,每個方法都拋出 UnsupportedOperationException。這些方法的實現(xiàn)在內(nèi)部必須是線程安全的,通常應(yīng)該很短并且不被阻塞。定義這些方法是使用此類的 唯一 受支持的方式。其他所有方法都被聲明為 final
,因為它們無法是各不相同的。
也就是說子類需要自己實現(xiàn)這些方法去構(gòu)造共享鎖和獨占鎖,怎么實現(xiàn)呢?就是利用getState 、setState、和compareAndSetState這幾個AbstractQueuedSynchronizer中已經(jīng)實現(xiàn)好的方法。
我們看下兩個實現(xiàn)類ReentrantLock和ReentrantReadWriteLock,這算是比較典型的兩個例子了。一個是可重入鎖,是獨占鎖的方式,ReentrantReadWriteLock是共享鎖的實現(xiàn)。完美
ReentrantLock中還實現(xiàn)了公平鎖Sync和非公平鎖NonfairSync
都是借助繼承了AbstractQueuedSynchronizer的內(nèi)部類Sync來實現(xiàn) 看到實現(xiàn)了tryAcquire和tryRelease方法
其中獨占方式的判斷
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
這樣我們就可以簡單的利用ReentrantLock的lock和unlock實現(xiàn)鎖啦
再看下ReentrantReadWriteLock 這個類 這個類實現(xiàn)了ReadWriteLock 有兩種鎖readerLock和writerLock
看下內(nèi)部類ReadLock 主要的方法都是共享的
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
共享鎖時通過計數(shù)的方式實現(xiàn)的,最大可以到
static final int SHARED_SHIFT = 16;
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
可以看到這里還涉及到了
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = Thread.currentThread().getId();
}
再看下
主要的方法都是獨占的
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
這二者都借助
abstract static class Sync extends AbstractQueuedSynchronizer
還是出自AbstractQueuedSynchronizer
這種讀寫鎖,讀共享,寫?yīng)氄伎梢越档玩i的粒度 重入方面其內(nèi)部的WriteLock可以獲取ReadLock,但是反過來ReadLock想要獲得WriteLock則永遠都不要想。 WriteLock可以降級為ReadLock,順序是:先獲得WriteLock再獲得ReadLock,然后釋放WriteLock,這時候線程將保持Readlock的持有。反過來ReadLock想要升級為WriteLock則不可能.ReadLock可以被多個線程持有并且在作用時排斥任何的WriteLock,而WriteLock則是完全的互斥。這一特性最為重要,因為對于高讀取頻率而相對較低寫入的數(shù)據(jù)結(jié)構(gòu),使用此類鎖同步機制則可以提高并發(fā)量.不管是ReadLock還是WriteLock都支持Interrupt,語義與ReentrantLock一致。WriteLock支持Condition并且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常,這種鎖的用處很多,緩存方面應(yīng)該更明顯 在網(wǎng)上找到的個demo 這里附上
package com.alibaba.otter.canal.common;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockDemo {
private Map<String, Object> map = new HashMap<String, Object>();//緩存器
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public static void main(String[] args) {
}
public Object get(String id){
Object value = null;
rwl.readLock().lock();//首先開啟讀鎖,從緩存中去取
try{
value = map.get(id);
if(value == null){ //如果緩存中沒有釋放讀鎖,上寫鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try{
if(value == null){
value = "aaa"; //此時可以去數(shù)據(jù)庫中查找,這里簡單的模擬一下
}
}finally{
rwl.writeLock().unlock(); //釋放寫鎖
}
rwl.readLock().lock(); //然后再上讀鎖
}
}finally{
rwl.readLock().unlock(); //最后釋放讀鎖
}
return value;
}
}