java.util.concurrent.locks包下的鎖實現(xiàn)分析

該包的結(jié)構(gòu)如圖

java.util.concurrent.locks包結(jié)構(gòu)

其中有java6新加入的一個類LockSupport,這個類已經(jīng)在自旋鎖在高并發(fā)的用處,實現(xiàn)各種鎖的最后提到過,也是基于Unsafe類的實現(xiàn) 主要的兩個方法是

public static void unpark(Thread thread);
public static void park(Object blocker) {
public static void parkNanos(Object blocker, long nanos) ;
public static void parkUntil(Object blocker, long deadline) ;
public static Object getBlocker(Thread t) ;
public static void park() ;
public static void parkNanos(long nanos);
public static void parkUntil(long deadline);

基本是基于

unsafe.putObject
unsafe.unpark
unsafe.park

來實現(xiàn)的。由于已經(jīng)分析過。所以這里不再說了。
這個包下的類 具體的實現(xiàn)類只有ReentrantLock和ReentrantReadWriteLock(除去單獨講的LockSupport),其余的都是抽象類和接口,做一下歸類
抽象類 AbstractOwnableSynchronizer 、AbstractQueuedLongSynchronizer、AbstractQueuedSynchronizer
接口 Lock、ReadWriteLock、Condition
抽象類AbstractOwnableSynchronizer 該類是主要定義讓線程以獨占方式擁有同步器,此類為創(chuàng)建鎖和相關(guān)同步器提供了基礎(chǔ),類本身不管理或使用此信息 很簡單的兩個方法setExclusiveOwnerThread(Thread t)設(shè)置當(dāng)前擁有獨占訪問的線程 和getExclusiveOwnerThread() 返回由 setExclusiveOwnerThread最后設(shè)置的線程;如果從未設(shè)置,則返回 null。
抽象類AbstractQueuedSynchronizer 繼承自AbstractOwnableSynchronizer該類為實現(xiàn)依賴于先進先出 (FIFO) 等待隊列的阻塞鎖和相關(guān)同步器(信號量、事件,等等)提供一個框架 和是MCSLock的擴展。該類有個屬性 private volatile int state;The synchronization state. 同步的狀態(tài)表示。擁有正常的set和get方法后,還有個compareAndSetState方法,是基于unsafe類的compareAndSwapInt來實現(xiàn)的,由此類實現(xiàn)同步。
相似的AbstractQueuedLongSynchronizer類的屬性定義是private volatile long state;可以看出是LONG型的屬性值。調(diào)用的是unsafe.compareAndSwapLong,所以二者的區(qū)別就基本知道了,其它沒啥區(qū)別。所以我們只要繼續(xù)分析AbstractQueuedLongSynchronizer類即可
該類有Node 內(nèi)部類和ConditionObject 內(nèi)部類,其中Node類定義了兩種模式的node屬性,

 /** Marker to indicate a node is waiting in shared mode */
 static final Node SHARED = new Node();
 /** Marker to indicate a node is waiting in exclusive mode */
 static final Node EXCLUSIVE = null;

一種是獨占的一種是共享的。還定義了四種等待狀態(tài)

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
  1. 節(jié)點等待狀態(tài)被取消掉了 值為1;
  2. 節(jié)點等待狀態(tài)標(biāo)示 等待獲得鎖的線程去unparking 通知 值為-1;
  3. 節(jié)點等待狀態(tài) 為處于條件等待狀態(tài) 值為-2;
  4. 節(jié)點等待狀態(tài)為節(jié)點狀態(tài)需要向后傳播,一般是釋放共享需要傳播給其他節(jié)點,一般是頭節(jié)點在doReleaseShared 去確保傳播繼續(xù)下去即使其他操作已經(jīng)介入了。值為-3;
  5. 還有值為0的情況 這個表明處于非上述四種狀態(tài)。

這里還有volatile Node prev;和volatile Node next; 隊列的前后節(jié)點
這里AbstractQueuedSynchronizer也有 private transient volatile Node tail;和 private transient volatile Node head; 這個是首尾節(jié)點 不要混淆
結(jié)構(gòu)圖大致為

node各種屬性結(jié)構(gòu)圖

由前面MCSLock的知識知道volatile Thread thread; 是標(biāo)示是否為當(dāng)前線程獲得鎖的
這里還有個 Node nextWaiter; 這個屬性要結(jié)合Condition去理解,該節(jié)點關(guān)聯(lián)的是等待某條件上的下個節(jié)點。因為 條件隊列只有是獨占模式下才可以被訪問到。
再先來看一下內(nèi)部類ConditionObject 該類實現(xiàn)了Condition接口。
該對象內(nèi)部也維護了一個隊列,

private transient Node firstWaiter;
private transient Node lastWaiter;

Condition主要是為了在J.U.C框架中提供和Java傳統(tǒng)的監(jiān)視器風(fēng)格的wait,notify和notifyAll方法類似的功能。
JDK官方的解釋是

Condition將 Object監(jiān)視器方法(wait、notify和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現(xiàn)組合使用,為每個對象提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized方法和語句的使用,Condition替代了 Object 監(jiān)視器方法的使用。條件(也稱為條件隊列條件變量)為線程提供了一個含義,以便在某個狀態(tài)條件現(xiàn)在可能為 true 的另一個線程通知它之前,一直掛起該線程(即讓其“等待”)。因為訪問此共享狀態(tài)信息發(fā)生在不同的線程中,所以它必須受保護,因此要將某種形式的鎖與該條件相關(guān)聯(lián)。等待提供一個條件的主要屬性是:以原子方式 釋放相關(guān)的鎖,并掛起當(dāng)前線程,就像 Object.wait做的那樣。Condition實例實質(zhì)上被綁定到一個鎖上。要為特定 Lock 實例獲得 Condition實例,請使用其 newCondition()方法。

提供了

void await() throws InterruptedException; 
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();

方法,await開頭的都是和await方法一致,只不過添加了寫條件而已,await會在當(dāng)前l(fā)ock的隊列中持有鎖的線程上釋放鎖資源,并新建Condition節(jié)點加到Condition隊列尾部,阻塞當(dāng)前線程。
signal()和signalAll()就是將Condition的頭節(jié)點移動到Lock等待節(jié)點尾部,讓其等待再次獲取鎖。
主要看await方法和signal方法是怎么實現(xiàn)的即可

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

可以看到這里借助了LockSupport.park(this);方法。其他的方法主要是做各種檢測 這里就不說了

signal方法

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

這里調(diào)用了doSignal方法

   private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

看一下while循環(huán)調(diào)用到的方法

 final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

可以看出還是借助了unsafe和 LockSupport.unpark方法實現(xiàn)鎖釋放。
看到一篇不錯的博文ConditionAbstractQueuedSynchronizer的闡述,這里借助下他的圖
以下是AQS隊列和Condition隊列的出入結(jié)點的示意圖,可以通過這幾張圖看出線程結(jié)點在兩個隊列中的出入關(guān)系和條件。
I.初始化狀態(tài):AQS等待隊列有3個Node,Condition隊列有1個Node(也有可能1個都沒有)

初始化狀態(tài)

II.節(jié)點1執(zhí)行Condition.await()
1.將head后移
2.釋放節(jié)點1的鎖并從AQS等待隊列中移除
3.將節(jié)點1加入到Condition的等待隊列中
4.更新lastWaiter為節(jié)點1

節(jié)點1執(zhí)行Condition.await()

III.節(jié)點2執(zhí)行signal()操作
5.將firstWaiter后移
6.將節(jié)點4移出Condition隊列
7.將節(jié)點4加入到AQS的等待隊列中去
8.更新AQS的等待隊列的tail

節(jié)點2執(zhí)行signal()

最后給出官方的Condition例子

package com.alibaba.otter.canal.common;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length)
                putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length)
                takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

再看下AbstractQueuedSynchronizer 留給子類去實現(xiàn)的方法,看一下官方文檔的說法

為了將此類用作同步器的基礎(chǔ),需要適當(dāng)?shù)刂匦露x以下方法,這是通過使用 getState setState和/或 compareAndSetState方法來檢查和/或修改同步狀態(tài)來實現(xiàn)的

  1. tryAcquire(int)
  2. tryRelease(int)
  3. tryAcquireShared(int)
  4. tryReleaseShared(int)
  5. isHeldExclusively()

默認(rèn)情況下,每個方法都拋出 UnsupportedOperationException。這些方法的實現(xiàn)在內(nèi)部必須是線程安全的,通常應(yīng)該很短并且不被阻塞。定義這些方法是使用此類的 唯一 受支持的方式。其他所有方法都被聲明為 final
,因為它們無法是各不相同的。

也就是說子類需要自己實現(xiàn)這些方法去構(gòu)造共享鎖和獨占鎖,怎么實現(xiàn)呢?就是利用getState 、setState、和compareAndSetState這幾個AbstractQueuedSynchronizer中已經(jīng)實現(xiàn)好的方法。

我們看下兩個實現(xiàn)類ReentrantLockReentrantReadWriteLock,這算是比較典型的兩個例子了。一個是可重入鎖,是獨占鎖的方式,ReentrantReadWriteLock是共享鎖的實現(xiàn)。完美
ReentrantLock中還實現(xiàn)了公平鎖Sync和非公平鎖NonfairSync

都是借助繼承了AbstractQueuedSynchronizer的內(nèi)部類Sync來實現(xiàn) 看到實現(xiàn)了tryAcquire和tryRelease方法
其中獨占方式的判斷

 protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

這樣我們就可以簡單的利用ReentrantLock的lock和unlock實現(xiàn)鎖啦

再看下ReentrantReadWriteLock 這個類 這個類實現(xiàn)了ReadWriteLock 有兩種鎖readerLock和writerLock

看下內(nèi)部類ReadLock 主要的方法都是共享的

 public void lock() {
       sync.acquireShared(1);
 }
 public  void unlock() {
            sync.releaseShared(1);
        }

共享鎖時通過計數(shù)的方式實現(xiàn)的,最大可以到

 static final int SHARED_SHIFT   = 16;
 static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
 final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

可以看到這里還涉及到了

  /**
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = Thread.currentThread().getId();
        }

再看下
主要的方法都是獨占的

   public void lock() {
            sync.acquire(1);
        }
 public void unlock() {
            sync.release(1);
        }

這二者都借助

 abstract static class Sync extends AbstractQueuedSynchronizer

還是出自AbstractQueuedSynchronizer

這種讀寫鎖,讀共享,寫?yīng)氄伎梢越档玩i的粒度 重入方面其內(nèi)部的WriteLock可以獲取ReadLock,但是反過來ReadLock想要獲得WriteLock則永遠都不要想。 WriteLock可以降級為ReadLock,順序是:先獲得WriteLock再獲得ReadLock,然后釋放WriteLock,這時候線程將保持Readlock的持有。反過來ReadLock想要升級為WriteLock則不可能.ReadLock可以被多個線程持有并且在作用時排斥任何的WriteLock,而WriteLock則是完全的互斥。這一特性最為重要,因為對于高讀取頻率而相對較低寫入的數(shù)據(jù)結(jié)構(gòu),使用此類鎖同步機制則可以提高并發(fā)量.不管是ReadLock還是WriteLock都支持Interrupt,語義與ReentrantLock一致。WriteLock支持Condition并且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常,這種鎖的用處很多,緩存方面應(yīng)該更明顯 在網(wǎng)上找到的個demo 這里附上

package com.alibaba.otter.canal.common;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {
     private Map<String, Object> map = new HashMap<String, Object>();//緩存器
        private ReadWriteLock rwl = new ReentrantReadWriteLock();
        public static void main(String[] args) {
            
        }
        public Object get(String id){
            Object value = null;
            rwl.readLock().lock();//首先開啟讀鎖,從緩存中去取
            try{
                value = map.get(id); 
                if(value == null){  //如果緩存中沒有釋放讀鎖,上寫鎖
                    rwl.readLock().unlock();
                    rwl.writeLock().lock();
                    try{
                        if(value == null){
                            value = "aaa";  //此時可以去數(shù)據(jù)庫中查找,這里簡單的模擬一下
                        }
                    }finally{
                        rwl.writeLock().unlock(); //釋放寫鎖
                    }
                    rwl.readLock().lock(); //然后再上讀鎖
                }
            }finally{
                rwl.readLock().unlock(); //最后釋放讀鎖
            }
            return value;
        }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容