鎖的概念
鎖是用來控制多個線程訪問共享資源的方式,一般來說,一個鎖可以防止多個線程同時訪問共享資源(但有些鎖可以允許多個線程并發(fā)的訪問共享資源,如讀寫鎖)。
在JDK1.5之前,Java是通過synchronized關(guān)鍵字實現(xiàn)鎖功能的:隱式地獲取鎖和釋放鎖,但不夠靈活。
在JDK1.5,java.util.concurrent包中新增了Lock接口以及相關(guān)實現(xiàn)類,用來實現(xiàn)鎖功能。它提供了與synchronized關(guān)鍵字類似的同步功能,但功能更強大和靈活:獲取鎖和釋放鎖的可操作性、可中斷地獲取鎖、超時獲取鎖等,見下表:
| 特性 | 描述 |
|---|---|
| 嘗試非阻塞地獲取鎖 | 當前線程嘗試獲取鎖,如果這個時刻鎖沒有被其他線程獲取到,則成功獲取并持有鎖 |
| 能被中斷地獲取鎖 | 獲取到鎖的線程能夠響應(yīng)中斷(而synchronized則不會響應(yīng)中斷操作) |
| 超時獲取鎖 | 在指定的截止時間之前獲取鎖,如果在截止時間到了仍無法獲取鎖,則返回。 |
Lock接口具體的方法及釋義:
public interface Lock {
/**
* 獲取鎖
*
* 如果當前線程無法獲取到鎖(可能其他線程正在持有鎖),則當前線程就會休眠,直到獲取到鎖
*/
void lock();
/**
* 可中斷地獲取鎖
*
* 如果如果當前線程無法獲取到鎖(可能其他線程正在持有鎖),則當前線程就會休眠,
* 直到發(fā)生下面兩種情況的任意一種:
* ①獲取到了鎖
* ②被其他線程中斷
*/
void lockInterruptibly() throws InterruptedException;
/**
* 嘗試非阻塞地獲取鎖
*
* lock()和lockInterruptibly()在獲取不到鎖的時候,都會阻塞當前線程,直到獲取到鎖
* 而該方法則不會阻塞線程,能立即獲取到鎖則返回true,獲取不到則立即返回false
*
* 該方法的常用方式如下:
*
* Lock lock = ...;
* if (lock.tryLock()) {
* try {
* // manipulate protected state
* } finally {
* lock.unlock();
* }
* } else {
* // perform alternative actions
* }}
*
* 這種使用方式,可以保證只在獲取到鎖的時候才去釋放鎖
*/
boolean tryLock();
/**
* 超時獲取鎖
*
* 當前線程在以下三種情況下會返回:
* ①當前線程在超時時間內(nèi)獲取到了鎖,返回true
* ②當前線程在超時時間內(nèi)被中斷,返回false(即該方法可以響應(yīng)其他線程對該線程的中斷操作)
* ③超時時間結(jié)束,沒有獲取到鎖,返回false
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 釋放鎖
*/
void unlock();
/**
* 獲取與該鎖綁定的Condition
*
* 當前線程只有在獲得了鎖,才能調(diào)用Condition的wait()方法(表示我已經(jīng)到了某一條件),
* 調(diào)用Condition的wait()方法之后,當前線程會釋放鎖
*/
Condition newCondition();
}
包java.util.concurrent.locks的類圖

其中:
AbstractOwnableSynchronizer、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer是同步器,是鎖實現(xiàn)相關(guān)的內(nèi)容。
ReentrantLock(重入鎖)和ReentrantReadWriteLock(重入讀寫鎖)是具體的實現(xiàn)類。
LockSupport是一個工具類,提供了基本的線程阻塞和喚醒功能。
Condition是實現(xiàn)線程間實現(xiàn)多條件等待/通知模式用到的。
同步器的實現(xiàn)原理
TODO
重入鎖:ReentrantLock
重入鎖,顧名思義,就是支持重新進入的鎖:即某線程在獲取到鎖之后,可以再次獲取鎖而不會被阻塞。
ReentrantLock類是通過組合自定義同步器來來實現(xiàn)這種重入特性的,除此之外,該類還支持公平地獲取鎖(獲取鎖的順序與請求鎖的順序是相同的,等待時間最長的線程最優(yōu)先獲取到鎖),還支持綁定多個Condition。(synchronized關(guān)鍵字隱式地支持重進入,比如synchronized修飾的遞歸方法,在方法執(zhí)行時,執(zhí)行線程在獲取了鎖之后仍能連續(xù)多次地獲得該鎖,不會出現(xiàn)阻塞自己的情況)。
ReentrantLock內(nèi)部重進入的實現(xiàn)(非公平獲取鎖的情況)代碼如下:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}else if (current == getExclusiveOwnerThread()) {
//如果是當前持有鎖的線程再次獲取鎖,則將同步值進行增加并返回true
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock公平鎖的內(nèi)部實現(xiàn)代碼如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
與非公平獲取鎖的方法nonfairTryAcquire(int acquires)相比,多了一個hasQueuedPredecessors()判斷:同步隊列中當前節(jié)點(當前想要獲取鎖的線程)是否有前驅(qū)節(jié)點,如果該方法返回true,則表示有線程比當前線程更早地請求獲取鎖,因此需要前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖。
公平鎖保證了鎖的獲取按照FIFO原則,而代價是進行大量的線程切換;
非公平鎖雖然可能造成線程“饑餓”(即某線程可能需要等很久才得到鎖),但線程切換極少,可以保證更大的吞吐量。
讀寫鎖:ReentrantReadWriteLock
ReentrantLock在在同一時刻,只允許一個線程進行訪問(無論讀還是寫)。而讀寫鎖是指:在同一時刻,允許多個線程進行讀操作,而寫操作則會阻塞其他所有的線程(無論是讀還是寫,都會被阻塞)。讀寫鎖維護了一對鎖:讀鎖和寫鎖,通過分離讀鎖和寫鎖,使得并發(fā)性能相比一般的排他鎖有了很大的提升。
Java中讀寫鎖的實現(xiàn)類是ReentrantReadWriteLock,它支持:①重進入;②公平性選擇;③鎖降級:寫鎖可以降級為讀鎖,其提供了一些便于外界監(jiān)控其內(nèi)部狀態(tài)的方法,如下:
int getReadLockCount()
返回當前讀鎖被獲取的次數(shù)
注意:該次數(shù)并不等于獲取讀鎖的線程數(shù),
因為同一線程可以連續(xù)獲得多次讀鎖,獲取一次,返回值就加1,
比如,僅一個線程,它連續(xù)獲得了n次讀鎖,那么占據(jù)讀鎖的線程數(shù)是1,但該方法返回n
int getReadHoldCount()
返回當前前程獲取讀鎖的次數(shù)
boolean isWriteLock()
判斷讀鎖是否被獲取
int getWriteHoldCount()
返回當前寫鎖被獲取的次數(shù)
使用舉例:
public class Cache{
//非線程安全的HashMap
private static Map<String, Object> map = new HashMap<>();
//讀寫鎖
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
//讀鎖
private static Lock readLock = reentrantReadWriteLock.readLock();
//寫鎖
private static Lock writeLock = reentrantReadWriteLock.writeLock();
/**
* 獲取key對應(yīng)的value
*
* 使用讀鎖,使得并發(fā)訪問該方法時不會被阻塞
*/
public static final Object get(String key){
readLock.lock();
try{
return map.get(key);
}finally {
readLock.unlock();
}
}
/**
* 設(shè)置key對應(yīng)的value
*
* 當有線程對map進行put操作時,使用寫鎖,阻塞其他線程的讀、寫操作,
* 只有在寫鎖被釋放后,其他讀寫操作才能繼續(xù)
*/
public static Object put(String key, Object value){
writeLock.lock();
try {
return map.put(key, value);
}finally {
writeLock.unlock();
}
}
/**
* 清空map
*
* 當有線程對map進行清空操作時,使用寫鎖,阻塞其他線程的讀、寫操作,
* 只有在寫鎖被釋放后,其他讀寫操作才能繼續(xù)
*/
public static void clear(){
writeLock.lock();
try {
map.clear();
}finally {
writeLock.unlock();
}
}
}
TODO:讀寫鎖的實現(xiàn)原理
LockSupport工具類
LockSupport定義了一組公共靜態(tài)方法,這些方法提供了最基本的線程阻塞和喚醒功能,是構(gòu)建同步組件的基礎(chǔ)工具,它主要有兩類方法:
①以park開頭的方法:阻塞當前線程
②以unpark開頭的方法:喚醒被阻塞的線程
void park()
阻塞當前線程,只有當前線程被中斷或其他線程調(diào)用unpark(Thread thread),才能從park()方法返回
void parkNanos(long nanos)
阻塞當前線程,最長不超過nanos納秒,返回條件在park()的基礎(chǔ)上增加了超時返回
void parkUntil(long deadline)
阻塞當前線程,直到deadline這個時間點(從1970年開始到deadline時間的毫秒數(shù))
void unpark(Thread thread)
喚醒處于阻塞狀態(tài)的thread線程
在JDK1.6中,該類增加了void park(Object blocker)、void parkNanos(Object blocker, long nanos)、void parkUntil(Object blocker, long deadline)方法,相比之前的park方法,多了一個blocker對象,該對象用來標識當前線程在等待的對象(阻塞對象),主要用來問題排查和系統(tǒng)監(jiān)控(對線程dump時,可以提供阻塞對象的信息),可以用來代替原有的park方法。
Condition接口
任意一個Java對象都有一組監(jiān)視器方法(定義在java.lang.Object上):wait()、wait(long timeout)、notify()、notifyAll(),這些方法與sychronized配合使用,可以實現(xiàn)等待/通知模式。
Condition接口也提供了類似的監(jiān)視器方法,與Lock配合使用,可以實現(xiàn)等待/通知模式。
兩者的區(qū)別如下:
| 對比項 | Object Monitor Methods | Condition |
|---|---|---|
| 前置條件 | 獲取對象的鎖 | 調(diào)用Lock.lock()獲取鎖→調(diào)用Lock.newCondition()獲取Condition對象 |
| 調(diào)用方式 | 直接調(diào)用,如object.wait() | 直接調(diào)用,如condition.await() |
| 等待隊列個數(shù) | 1個 | 多個 |
| 當前線程釋放鎖并進入等待狀態(tài) | 支持 | 支持 |
| 當前線程釋放鎖并進入等待狀態(tài),在等待狀態(tài)中不響應(yīng)中斷 | 不支持 | 支持 |
| 當前線程釋放鎖并進入超時等待狀態(tài) | 支持 | 支持 |
| 當前線程釋放鎖并進入等待狀態(tài)到將來的某個時間點 | 不支持 | 支持 |
| 喚醒等待隊列中的一個線程 | 支持 | 支持 |
| 喚醒等待隊列中的全部線程 | 支持 | 支持 |
Condition中的方法如下:(一般會將Condition對象作為成員變量)
說明:當前線程調(diào)用await()方法后,當前線程會釋放鎖并在此等候,當其他線程調(diào)用signal()方法通知當前線程后,當前線程才從await()方法中返回,并且在返回前已經(jīng)獲取了鎖(re-acquire)。
public interface Condition {
/**
* 當前線程進入等待狀態(tài)直到被通知(signalled)或中斷(interrupted)
*
* 如果當前線程從該方法返回,則表明當前線程已經(jīng)獲取了Condition對象所對應(yīng)的鎖
*
* @throws InterruptedException
*/
void await() throws InterruptedException;
/**
* 與await()不同是:該方法對中斷操作不敏感
*
* 如果當前線程在等待的過程中被中斷,當前線程仍會繼續(xù)等待,直到被通知(signalled),
* 但當前線程會保留線程的中斷狀態(tài)值
*
*/
void awaitUninterruptibly();
/**
* 當前線程進入等待狀態(tài),直到被通知或被中斷或超時
*
* 返回值表示剩余時間,
* 如果當前線程在nanosTimeout納秒之前被喚醒,那么返回值就是(nanosTimeout-實際耗時),
* 如果返回值是0或者負數(shù),則表示等待已超時
*
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 該方法等價于awaitNanos(unit.toNanos(time)) > 0
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 當前線程進入等待狀態(tài),直到被通知或被中斷或到達時間點deadline
*
* 如果在沒有到達截止時間就被通知,返回true
* 如果在到了截止時間仍未被通知,返回false
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 喚醒一個等待在Condition上的線程
* 該線程從等待方法返回前必須獲得與Condition相關(guān)聯(lián)的鎖
*/
void signal();
/**
* 喚醒所有等待在Condition上的線程
* 每個線程從等待方法返回前必須獲取Condition相關(guān)聯(lián)的鎖
*/
void signalAll();
}
使用Condition實現(xiàn)一個有界阻塞隊列的例子:當隊列為空時,隊列的獲取操作將會阻塞當前線程,直到隊列中有新增元素;當隊列已滿時,隊列的插入操作就會阻塞插入線程,直到隊列中出現(xiàn)空位。(其實這個例子就是簡化版的ArrayBlockingQueue)
class BoundedBlockingQueue<T> {
//使用數(shù)組維護隊列
private Object[] queue;
//當前數(shù)組中的元素個數(shù)
private int count = 0;
//當前添加元素到數(shù)組的位置
private int addIndex = 0;
//當前移除元素在數(shù)組中的位置
private int removeIndex = 0;
private Lock lock = new ReentrantLock();
private Condition notEmptyCondition = lock.newCondition();
private Condition notFullCondition = lock.newCondition();
private BoundedBlockingQueue() {
}
public BoundedBlockingQueue(int capacity) {
queue = new Object[capacity];
}
public void put(T t) throws InterruptedException {
lock.lock();//獲得鎖,保證內(nèi)部數(shù)組修改的可見性和排他性
try {
//使用while,而非if:防止過早或意外的通知,
//加入當前線程釋放了鎖進入等待狀態(tài),然后其他線程進行了signal,
//則當前線程會從await()方法中返回,再次判斷count == queue.length
//todo:哪些情況下的過早或意外???
while (count == queue.length) {
notFullCondition.await();//釋放鎖,等待隊列不滿,即等待隊列出現(xiàn)空位
}
queue[addIndex] = t;
addIndex++;
if (addIndex == queue.length) {
addIndex = 0;
}
count++;
notEmptyCondition.signal();
} finally {
//確保會釋放鎖
lock.unlock();
}
}
@SuppressWarnings("unchecked")
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmptyCondition.await();//釋放鎖,等待隊列不為空,即等待隊列中至少有一個元素
}
Object x = queue[removeIndex];
removeIndex++;
if (removeIndex == queue.length) {
removeIndex = 0;
}
count--;
notFullCondition.signal();//通知那些等待隊列非空的線程,可以向隊列中插入元素了
return (T) x;
} finally {
//確保會釋放鎖
lock.unlock();
}
}
}
TODO:Condition的實現(xiàn)分析
參考
大部分來自《Java并發(fā)編程的藝術(shù)》,部分參考JDK中的注釋說明。