ReentrantLock 實(shí)現(xiàn)原理

image
使用 `synchronize` 來做同步處理時(shí),鎖的獲取和釋放都是隱式的,實(shí)現(xiàn)的原理是通過編譯后加上不同的機(jī)器指令來實(shí)現(xiàn)。

ReentrantLock 就是一個(gè)普通的類,它是基于 AQS(AbstractQueuedSynchronizer)來實(shí)現(xiàn)的。

是一個(gè)重入鎖:一個(gè)線程獲得了鎖之后仍然可以反復(fù)的加鎖,不會(huì)出現(xiàn)自己阻塞自己的情況。

AQSJava 并發(fā)包里實(shí)現(xiàn)鎖、同步的一個(gè)重要的基礎(chǔ)框架。

鎖類型

ReentrantLock 分為公平鎖非公平鎖,可以通過構(gòu)造方法來指定具體類型:

    //默認(rèn)非公平鎖
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    //公平鎖
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

默認(rèn)一般使用非公平鎖,它的效率和吞吐量都比公平鎖高的多(后面會(huì)分析具體原因)。

獲取鎖

通常的使用方式如下:

    private ReentrantLock lock = new ReentrantLock();
    public void run() {
        lock.lock();
        try {
            //do bussiness
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

公平鎖獲取鎖

首先看下獲取鎖的過程:

    public void lock() {
        sync.lock();
    }

可以看到是使用 sync的方法,而這個(gè)方法是一個(gè)抽象方法,具體是由其子類(FairSync)來實(shí)現(xiàn)的,以下是公平鎖的實(shí)現(xiàn):

        final void lock() {
            acquire(1);
        }
        
        //AbstractQueuedSynchronizer 中的 acquire()
        public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
        }

第一步是嘗試獲取鎖(tryAcquire(arg)),這個(gè)也是由其子類實(shí)現(xiàn):

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

首先會(huì)判斷 AQS 中的 state 是否等于 0,0 表示目前沒有其他線程獲得鎖,當(dāng)前線程就可以嘗試獲取鎖。

注意:嘗試之前會(huì)利用 hasQueuedPredecessors() 方法來判斷 AQS 的隊(duì)列中中是否有其他線程,如果有則不會(huì)嘗試獲取鎖(這是公平鎖特有的情況)。

如果隊(duì)列中沒有線程就利用 CAS 來將 AQS 中的 state 修改為1,也就是獲取鎖,獲取成功則將當(dāng)前線程置為獲得鎖的獨(dú)占線程(setExclusiveOwnerThread(current))。

如果 state 大于 0 時(shí),說明鎖已經(jīng)被獲取了,則需要判斷獲取鎖的線程是否為當(dāng)前線程(ReentrantLock 支持重入),是則需要將 state + 1,并將值更新。

寫入隊(duì)列

如果 tryAcquire(arg) 獲取鎖失敗,則需要用 addWaiter(Node.EXCLUSIVE) 將當(dāng)前線程寫入隊(duì)列中。

寫入之前需要將當(dāng)前線程包裝為一個(gè) Node 對(duì)象(addWaiter(Node.EXCLUSIVE))。

AQS 中的隊(duì)列是由 Node 節(jié)點(diǎn)組成的雙向鏈表實(shí)現(xiàn)的。

包裝代碼:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

首先判斷隊(duì)列是否為空,不為空時(shí)則將封裝好的 Node 利用 CAS 寫入隊(duì)尾,如果出現(xiàn)并發(fā)寫入失敗就需要調(diào)用 enq(node); 來寫入了。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

這個(gè)處理邏輯就相當(dāng)于自旋加上 CAS 保證一定能寫入隊(duì)列。

掛起等待線程

寫入隊(duì)列之后需要將當(dāng)前線程掛起(利用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

首先會(huì)根據(jù) node.predecessor() 獲取到上一個(gè)節(jié)點(diǎn)是否為頭節(jié)點(diǎn),如果是則嘗試獲取一次鎖,獲取成功就萬事大吉了。

如果不是頭節(jié)點(diǎn),或者獲取鎖失敗,則會(huì)根據(jù)上一個(gè)節(jié)點(diǎn)的 waitStatus 狀態(tài)來處理(shouldParkAfterFailedAcquire(p, node))。

waitStatus 用于記錄當(dāng)前節(jié)點(diǎn)的狀態(tài),如節(jié)點(diǎn)取消、節(jié)點(diǎn)等待等。

shouldParkAfterFailedAcquire(p, node) 返回當(dāng)前線程是否需要掛起,如果需要?jiǎng)t調(diào)用 parkAndCheckInterrupt()

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

他是利用 LockSupportpart 方法來掛起當(dāng)前線程的,直到被喚醒。

非公平鎖獲取鎖

公平鎖與非公平鎖的差異主要在獲取鎖:

公平鎖就相當(dāng)于買票,后來的人需要排到隊(duì)尾依次買票,不能插隊(duì)。

而非公平鎖則沒有這些規(guī)則,是搶占模式,每來一個(gè)人不會(huì)去管隊(duì)列如何,直接嘗試獲取鎖。

非公平鎖:

        final void lock() {
            //直接嘗試獲取鎖
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

公平鎖:

        final void lock() {
            acquire(1);
        }

還要一個(gè)重要的區(qū)別是在嘗試獲取鎖時(shí)tryAcquire(arg),非公平鎖是不需要判斷隊(duì)列中是否還有其他線程,也是直接嘗試獲取鎖:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //沒有 !hasQueuedPredecessors() 判斷
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

釋放鎖

公平鎖和非公平鎖的釋放流程都是一樣的:

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                   //喚醒被掛起的線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    //嘗試釋放鎖
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }        

首先會(huì)判斷當(dāng)前線程是否為獲得鎖的線程,由于是重入鎖所以需要將 state 減到 0 才認(rèn)為完全釋放鎖。

釋放之后需要調(diào)用 unparkSuccessor(h) 來喚醒被掛起的線程。

總結(jié)

由于公平鎖需要關(guān)心隊(duì)列的情況,得按照隊(duì)列里的先后順序來獲取鎖(會(huì)造成大量的線程上下文切換),而非公平鎖則沒有這個(gè)限制。

所以也就能解釋非公平鎖的效率會(huì)被公平鎖更高。

號(hào)外

最近在總結(jié)一些 Java 相關(guān)的知識(shí)點(diǎn),感興趣的朋友可以一起維護(hù)。

地址: https://github.com/crossoverJie/Java-Interview

weixin
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容