在我們一個多線程程序中,同步是實現(xiàn)對一個方法或者模塊進(jìn)行獨(dú)占式訪問的方法,那么如何進(jìn)行同步的操作呢?首先我們就會想到最常使用的方式就是synchronize同步塊,但是在一個頻繁發(fā)生操作的程序中如果一直使用synchronize同步塊,那么就會大大影響程序的性能,并且同步塊也有其使用的限制語境,所以我們必須尋找其他更靈活,性能更棒的同步方式。假如我們想進(jìn)行在一個時間點(diǎn)內(nèi)修改一個變量的值,我們可以使用volatile來修飾這個變量而不是synchronize,這樣就會大大增加對變量獨(dú)占操作的性能,另外我們也可以使用顯式鎖的方式來增加同步操作的靈活性,下面我們就來了解了解Java中的鎖。
讓我們來想一下synchronize的語義是什么:
1. 進(jìn)入同步塊
2. 對同步塊中的內(nèi)容進(jìn)行單線程操作,阻塞其它線程
3. 跳出同步塊,讓其它的線程獲取鎖并執(zhí)行同步塊中的內(nèi)容,即重復(fù)上面的步驟1和2
我們可以使用顯式的鎖來達(dá)到同樣的目的,這個顯式的鎖就是Lock接口,讓我們來看下面一個例子:
Lock lock = new ReentrantLock();
lock.lock();
try{
//do something
}finally{
lock.unlock();
}
上面就是一個顯式鎖的簡單例子,這種方式比同步塊來的更加的靈活,注意上面的代碼,我們獲取鎖的語句寫在了try-finally的外面,這是因為,假如我們把語句寫到里面,假如我們獲取鎖的過程出現(xiàn)了異常,我們就會不必要地釋放鎖。我們來看下面一個例子
package com.fan.ThreadDemo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
//靜態(tài)內(nèi)部類,用以實現(xiàn)對同步狀態(tài)的基本操作
private static class Syn extends AbstractQueuedSynchronizer{
private static final long serialVersionUID = -8152719729798280428L;
protected boolean isHeldExclusively(){
return getState() == 1;//判斷鎖有沒有被獲取
}
public boolean tryAcquire(int acquire){
if(compareAndSetState(0, 1))//嘗試使用CAS來對同步狀態(tài)進(jìn)行原子操作
{
setExclusiveOwnerThread(Thread.currentThread());//將當(dāng)前線程設(shè)為獲取鎖的線程
return true;
}
return false;
}
public boolean tryRealease(){
Thread currentThread = Thread.currentThread();
if(getState() == 0)
throw new IllegalMonitorStateException();
if(currentThread == getExclusiveOwnerThread()){
setExclusiveOwnerThread(null);//釋放鎖
setState(0);
return true;
}
return false;
}
Condition newcondition(){return new ConditionObject();}
}
private final Syn syn = new Syn();
@Override
public void lock() {syn.acquire(1);}
@Override
public void lockInterruptibly() throws InterruptedException {syn.acquireInterruptibly(1);}
@Override
public boolean tryLock() {return syn.tryAcquire(1);}
@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {return syn.tryAcquireNanos(1, unit.toNanos(time));}
@Override
public void unlock() {syn.release(1);}
@Override
public Condition newCondition() {return syn.newcondition();}
public boolean hasQueuedThreads(){return syn.hasQueuedThreads();}
}
上例是一個實現(xiàn)對一個鎖的進(jìn)行線程互斥獲取的類,在Mutex的靜態(tài)內(nèi)部類Syn中,tryAcquire使用判斷CAS(CAS:利用原子操作的方式來進(jìn)行判斷并比較)操作是否成功的方式來設(shè)置當(dāng)前線程為獲取鎖的線程,tryRealease中則采用采用將同步狀態(tài)設(shè)為0,獲取鎖的線程設(shè)為空的方式來釋放鎖
AQS
顯式鎖和同步塊的語義是一樣的,讓我們再看一下鎖的語義的第二句,其中的阻塞其它線程中被阻塞的線程是個什么狀態(tài),等鎖被釋放后,被阻塞的鎖獲取鎖采用的是什么策略呢,這些與一個同步組件的基礎(chǔ)框架---AbstractQueueSynchronizer(AQS:隊列同步器)有關(guān),隊列同步器使用了一個int型的變量來表示同步狀態(tài),通過內(nèi)置的FIFO的同步隊列來完成資源獲取線程的排隊工作,那么AQS是如何實現(xiàn)同步操作的呢,我們看下圖

當(dāng)線程在獲取鎖的過程中被阻塞之后,這個線程會被包裝成一個Node節(jié)點(diǎn)并被添加到如上圖所示的一個線程同步隊列之中

假如現(xiàn)在有多個線程同時被阻塞,那么各個線程所獲取的尾節(jié)點(diǎn)就有可能相同,這樣,線程就不能被正確地添加到同步隊列中去了,我們可以使用
compareAndSetTail(Node expect,Node update)來對加入同步隊列的線程進(jìn)行安全地插入
假如獲取鎖的線程現(xiàn)在釋放了鎖,并且同步隊列的頭結(jié)點(diǎn)獲取了鎖,那么同步隊列的頭結(jié)點(diǎn)的
prev和next會被設(shè)為null,同步器的head指向第二個節(jié)點(diǎn),并改變第二個節(jié)點(diǎn)的狀態(tài)。我們看上面的例子中的
lock()方法,這個方法的實現(xiàn)就是AQS的一個使用,它的代碼如下
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面的例子的邏輯是這樣的,假如獲取鎖失敗則采用阻塞的方式加入等待的同步隊列,我們再來看一下acquireQueued(addWaiter(Node.EXCLUSIVE), arg))的實現(xiàn)。
addWaiter(Node node):
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上面的代碼就是講當(dāng)前的線程打包為Node對象并插入同步隊列,我們看一下上面的enq方法,這是為了保證當(dāng)前的線程一定要被插入到同步隊列中,好了,現(xiàn)在線程被插入到了同步隊列,我們下面將插入的節(jié)點(diǎn)交給acquireQueued(Node node,int arg)處理,acquireQueued(Node node,int arg)的實現(xiàn)如下所示
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到,這個方法會根據(jù)自身的前驅(qū)節(jié)點(diǎn)是不是頭結(jié)點(diǎn)來決定要不要進(jìn)行獲取鎖的操作,加入不是,則進(jìn)入一個休眠期,當(dāng)獲取鎖成功后,則將其next設(shè)為null并喚醒下一個節(jié)點(diǎn)。以上就是根據(jù)阻塞獲取鎖的方式來分析了一下AQS的實現(xiàn)原理
公平鎖和非公平鎖
公平鎖是指希望獲取鎖的線程可以按照請求的順序進(jìn)行獲取鎖,而非公平鎖則是獲取鎖的過程是一個競爭的過程,非公平鎖的效率在大量的獲取鎖的程序中效率要遠(yuǎn)高于公平鎖,但是容易出現(xiàn)獲取鎖時間長的線程無法獲取鎖的現(xiàn)象,我們把這種情況稱為‘饑餓’,ReetrantLock鎖可以使用其構(gòu)造參數(shù)來確定這個鎖是公平的還是非公平的
new ReetrantLock(true);//公平鎖
new ReetrantLock(false);//非公平鎖
重入鎖(ReetrantLock)
重入鎖是一種重復(fù)獲取的鎖,我們在使用這樣的鎖時,可以對其進(jìn)行重復(fù)的加鎖。我們看一下非公平重入鎖的獲取實現(xiàn)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到,非公平鎖并沒有把獲取鎖失敗的線程加入同步隊列中去,再看一下它重入的過程,就是只有已經(jīng)獲取鎖的線程才可以繼續(xù)獲取鎖,并更新狀態(tài),一般是自增地增加狀態(tài)
再看一下非公平鎖的釋放過程
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可以看到也是一個不停地自減鎖的狀態(tài),直到狀態(tài)為0時將鎖設(shè)為null以便后面的線程可以接著獲取這個鎖
既然我們把非公平鎖的實現(xiàn)貼上來了,那我們不妨也將公平鎖的實現(xiàn)也來分析一下
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
可以看到和非公平鎖唯一的區(qū)別就是多了一個! hasQueuedPredecessors()這樣的判斷,這樣就可以保證每次都只是頭結(jié)點(diǎn)獲取鎖了
讀寫鎖
我們之前用來同步的資源是獨(dú)占式訪問的,但是我們有一些資源的訪問方式有兩種,即讀和寫,而且有時候讀取的次數(shù)遠(yuǎn)比寫入的次數(shù)多,假如我們在每次讀的時候都要加鎖,而讀的時候數(shù)據(jù)并沒有發(fā)生變化,那么很明顯就會出現(xiàn)讀訪問效率低下的問題,假如我們在任何時候都不加鎖,那么資源的修改和讀取的安全性就得不到保證,為此,我們引入了讀寫鎖的概念,讀寫鎖的語義如下
1. 在寫鎖被當(dāng)前線程獲取的時候,任何鎖都不能被其它線程獲取
2. 在讀鎖被獲取的情況下,任何線程都可以繼續(xù)獲取讀鎖,但是寫鎖不能被獲取
3. 當(dāng)讀寫鎖在讀模式的鎖狀態(tài)時,如果有另外的線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨后的讀模式鎖的請求,這樣可以避免讀模式鎖長期占用,而等待的寫模式鎖請求則長期阻塞
很明顯,讀寫鎖很適合大量讀訪問和少量寫訪問的程序。
那么讀寫鎖是怎么實現(xiàn)的呢,讀寫鎖的狀態(tài)是32位的整型數(shù)字,假設(shè)用State表示。前16位表示讀,后16位表示寫,當(dāng)獲取寫鎖時,State的低位自增1,即執(zhí)行這樣的操作:State + 1,當(dāng)讀鎖被獲取時,執(zhí)行這樣的操作:State+(1<<16),具體的我們看下面的代碼
寫鎖的獲?。?/h6>
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
從上面的代碼我們可以看到,當(dāng)只有讀鎖的情況下,是不能獲取寫鎖的,但是寫鎖可以進(jìn)行重入
讀鎖的獲取:
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
從上面的代碼來看,它是這樣工作的
.首先判斷寫鎖是否被獲取,假如被獲取,而且獲取的線程不是當(dāng)前線程,那么讀鎖將不能被獲取
.不能獲取超過設(shè)限數(shù)目的鎖
.利用HoldCounter對象來存儲當(dāng)前線程獲取的讀鎖數(shù)目,假如獲取讀鎖成功,那么當(dāng)前的HoldCounter里面的counter成員變量自增1