讀書筆記:淺談Java的鎖

在我們一個多線程程序中,同步是實現(xiàn)對一個方法或者模塊進(jìn)行獨(dú)占式訪問的方法,那么如何進(jìn)行同步的操作呢?首先我們就會想到最常使用的方式就是synchronize同步塊,但是在一個頻繁發(fā)生操作的程序中如果一直使用synchronize同步塊,那么就會大大影響程序的性能,并且同步塊也有其使用的限制語境,所以我們必須尋找其他更靈活,性能更棒的同步方式。假如我們想進(jìn)行在一個時間點(diǎn)內(nèi)修改一個變量的值,我們可以使用volatile來修飾這個變量而不是synchronize,這樣就會大大增加對變量獨(dú)占操作的性能,另外我們也可以使用顯式鎖的方式來增加同步操作的靈活性,下面我們就來了解了解Java中的鎖。
讓我們來想一下synchronize的語義是什么:

1. 進(jìn)入同步塊
2. 對同步塊中的內(nèi)容進(jìn)行單線程操作,阻塞其它線程
3. 跳出同步塊,讓其它的線程獲取鎖并執(zhí)行同步塊中的內(nèi)容,即重復(fù)上面的步驟1和2

我們可以使用顯式的鎖來達(dá)到同樣的目的,這個顯式的鎖就是Lock接口,讓我們來看下面一個例子:

        Lock lock = new ReentrantLock();
        lock.lock();
        try{
            //do something
        }finally{
            lock.unlock();
        }

上面就是一個顯式鎖的簡單例子,這種方式比同步塊來的更加的靈活,注意上面的代碼,我們獲取鎖的語句寫在了try-finally的外面,這是因為,假如我們把語句寫到里面,假如我們獲取鎖的過程出現(xiàn)了異常,我們就會不必要地釋放鎖。我們來看下面一個例子

package com.fan.ThreadDemo;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
    
    //靜態(tài)內(nèi)部類,用以實現(xiàn)對同步狀態(tài)的基本操作
    private static class Syn extends AbstractQueuedSynchronizer{
        private static final long serialVersionUID = -8152719729798280428L;

        protected boolean isHeldExclusively(){
            return getState() == 1;//判斷鎖有沒有被獲取
        } 
        
        public boolean tryAcquire(int acquire){
            if(compareAndSetState(0, 1))//嘗試使用CAS來對同步狀態(tài)進(jìn)行原子操作
            {
                setExclusiveOwnerThread(Thread.currentThread());//將當(dāng)前線程設(shè)為獲取鎖的線程
                return true;
            }
            return false;
        }
        
        public boolean tryRealease(){
            Thread currentThread = Thread.currentThread();
            if(getState() == 0)
                throw new IllegalMonitorStateException();
            if(currentThread == getExclusiveOwnerThread()){
                setExclusiveOwnerThread(null);//釋放鎖
                setState(0);
                return true;
            }
            return false;
        }
        
        Condition newcondition(){return new ConditionObject();}
    } 
    
    private final Syn syn = new Syn();
    @Override
    public void lock() {syn.acquire(1);}

    @Override
    public void lockInterruptibly() throws InterruptedException {syn.acquireInterruptibly(1);}
    
    @Override
    public boolean tryLock() {return syn.tryAcquire(1);}

    @Override
    public boolean tryLock(long time, TimeUnit unit)
            throws InterruptedException {return syn.tryAcquireNanos(1, unit.toNanos(time));}

    @Override
    public void unlock() {syn.release(1);}

    @Override
    public Condition newCondition() {return syn.newcondition();}
    
    public boolean hasQueuedThreads(){return syn.hasQueuedThreads();}
}

上例是一個實現(xiàn)對一個鎖的進(jìn)行線程互斥獲取的類,在Mutex的靜態(tài)內(nèi)部類Syn中,tryAcquire使用判斷CAS(CAS:利用原子操作的方式來進(jìn)行判斷并比較)操作是否成功的方式來設(shè)置當(dāng)前線程為獲取鎖的線程,tryRealease中則采用采用將同步狀態(tài)設(shè)為0,獲取鎖的線程設(shè)為空的方式來釋放鎖

AQS

顯式鎖和同步塊的語義是一樣的,讓我們再看一下鎖的語義的第二句,其中的阻塞其它線程中被阻塞的線程是個什么狀態(tài),等鎖被釋放后,被阻塞的鎖獲取鎖采用的是什么策略呢,這些與一個同步組件的基礎(chǔ)框架---AbstractQueueSynchronizer(AQS:隊列同步器)有關(guān),隊列同步器使用了一個int型的變量來表示同步狀態(tài),通過內(nèi)置的FIFO的同步隊列來完成資源獲取線程的排隊工作,那么AQS是如何實現(xiàn)同步操作的呢,我們看下圖

一個同步隊列

當(dāng)線程在獲取鎖的過程中被阻塞之后,這個線程會被包裝成一個Node節(jié)點(diǎn)并被添加到如上圖所示的一個線程同步隊列之中
同步隊列的節(jié)點(diǎn)的加入

假如現(xiàn)在有多個線程同時被阻塞,那么各個線程所獲取的尾節(jié)點(diǎn)就有可能相同,這樣,線程就不能被正確地添加到同步隊列中去了,我們可以使用compareAndSetTail(Node expect,Node update)來對加入同步隊列的線程進(jìn)行安全地插入
同步隊列節(jié)點(diǎn)的刪除

假如獲取鎖的線程現(xiàn)在釋放了鎖,并且同步隊列的頭結(jié)點(diǎn)獲取了鎖,那么同步隊列的頭結(jié)點(diǎn)的prevnext會被設(shè)為null,同步器的head指向第二個節(jié)點(diǎn),并改變第二個節(jié)點(diǎn)的狀態(tài)。
我們看上面的例子中的lock()方法,這個方法的實現(xiàn)就是AQS的一個使用,它的代碼如下

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上面的例子的邏輯是這樣的,假如獲取鎖失敗則采用阻塞的方式加入等待的同步隊列,我們再來看一下acquireQueued(addWaiter(Node.EXCLUSIVE), arg))的實現(xiàn)。
addWaiter(Node node):

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

上面的代碼就是講當(dāng)前的線程打包為Node對象并插入同步隊列,我們看一下上面的enq方法,這是為了保證當(dāng)前的線程一定要被插入到同步隊列中,好了,現(xiàn)在線程被插入到了同步隊列,我們下面將插入的節(jié)點(diǎn)交給acquireQueued(Node node,int arg)處理,acquireQueued(Node node,int arg)的實現(xiàn)如下所示

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

可以看到,這個方法會根據(jù)自身的前驅(qū)節(jié)點(diǎn)是不是頭結(jié)點(diǎn)來決定要不要進(jìn)行獲取鎖的操作,加入不是,則進(jìn)入一個休眠期,當(dāng)獲取鎖成功后,則將其next設(shè)為null并喚醒下一個節(jié)點(diǎn)。以上就是根據(jù)阻塞獲取鎖的方式來分析了一下AQS的實現(xiàn)原理

公平鎖和非公平鎖

公平鎖是指希望獲取鎖的線程可以按照請求的順序進(jìn)行獲取鎖,而非公平鎖則是獲取鎖的過程是一個競爭的過程,非公平鎖的效率在大量的獲取鎖的程序中效率要遠(yuǎn)高于公平鎖,但是容易出現(xiàn)獲取鎖時間長的線程無法獲取鎖的現(xiàn)象,我們把這種情況稱為‘饑餓’,ReetrantLock鎖可以使用其構(gòu)造參數(shù)來確定這個鎖是公平的還是非公平的

new ReetrantLock(true);//公平鎖
new ReetrantLock(false);//非公平鎖

重入鎖(ReetrantLock)

重入鎖是一種重復(fù)獲取的鎖,我們在使用這樣的鎖時,可以對其進(jìn)行重復(fù)的加鎖。我們看一下非公平重入鎖的獲取實現(xiàn)

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可以看到,非公平鎖并沒有把獲取鎖失敗的線程加入同步隊列中去,再看一下它重入的過程,就是只有已經(jīng)獲取鎖的線程才可以繼續(xù)獲取鎖,并更新狀態(tài),一般是自增地增加狀態(tài)
再看一下非公平鎖的釋放過程

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

可以看到也是一個不停地自減鎖的狀態(tài),直到狀態(tài)為0時將鎖設(shè)為null以便后面的線程可以接著獲取這個鎖
既然我們把非公平鎖的實現(xiàn)貼上來了,那我們不妨也將公平鎖的實現(xiàn)也來分析一下

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

可以看到和非公平鎖唯一的區(qū)別就是多了一個! hasQueuedPredecessors()這樣的判斷,這樣就可以保證每次都只是頭結(jié)點(diǎn)獲取鎖了

讀寫鎖

我們之前用來同步的資源是獨(dú)占式訪問的,但是我們有一些資源的訪問方式有兩種,即讀和寫,而且有時候讀取的次數(shù)遠(yuǎn)比寫入的次數(shù)多,假如我們在每次讀的時候都要加鎖,而讀的時候數(shù)據(jù)并沒有發(fā)生變化,那么很明顯就會出現(xiàn)讀訪問效率低下的問題,假如我們在任何時候都不加鎖,那么資源的修改和讀取的安全性就得不到保證,為此,我們引入了讀寫鎖的概念,讀寫鎖的語義如下

1. 在寫鎖被當(dāng)前線程獲取的時候,任何鎖都不能被其它線程獲取
2. 在讀鎖被獲取的情況下,任何線程都可以繼續(xù)獲取讀鎖,但是寫鎖不能被獲取
3. 當(dāng)讀寫鎖在讀模式的鎖狀態(tài)時,如果有另外的線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨后的讀模式鎖的請求,這樣可以避免讀模式鎖長期占用,而等待的寫模式鎖請求則長期阻塞

很明顯,讀寫鎖很適合大量讀訪問和少量寫訪問的程序。
那么讀寫鎖是怎么實現(xiàn)的呢,讀寫鎖的狀態(tài)是32位的整型數(shù)字,假設(shè)用State表示。前16位表示讀,后16位表示寫,當(dāng)獲取寫鎖時,State的低位自增1,即執(zhí)行這樣的操作:State + 1,當(dāng)讀鎖被獲取時,執(zhí)行這樣的操作:State+(1<<16),具體的我們看下面的代碼

寫鎖的獲?。?/h6>
protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

從上面的代碼我們可以看到,當(dāng)只有讀鎖的情況下,是不能獲取寫鎖的,但是寫鎖可以進(jìn)行重入

讀鎖的獲取:
final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

從上面的代碼來看,它是這樣工作的

.首先判斷寫鎖是否被獲取,假如被獲取,而且獲取的線程不是當(dāng)前線程,那么讀鎖將不能被獲取
.不能獲取超過設(shè)限數(shù)目的鎖
.利用HoldCounter對象來存儲當(dāng)前線程獲取的讀鎖數(shù)目,假如獲取讀鎖成功,那么當(dāng)前的HoldCounter里面的counter成員變量自增1
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容