java ReentrantLock 與 AQS 加解鎖過程

注:關鍵的代碼的部分會有注釋。

AQS介紹,主要是看屬性,方法遇到了再看

內部類 node,用于維護獲取鎖的線程的雙向隊列,詳細的可以具體看注釋,這里就不一一介紹了
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
     .....
    //獲取鎖隊列隊頭
    private transient volatile Node head;
    //獲取鎖隊列隊尾
    private transient volatile Node tail;
    //加鎖標志位,主要是通過cas操作,算是委托給他判斷是否加鎖成功
    private volatile int state;
    //在aqs父類中,當前持有鎖的線程
    private transient Thread exclusiveOwnerThread;
    ......

ReentrantLock的具體屬性

 //Sync 繼承自AQS
 abstract static class Sync extends AbstractQueuedSynchronizer{...}
 //
private final Sync sync;
//公平實現(xiàn)
static final class FairSync extends Sync {...}
//非公平實現(xiàn)
static final class NonfairSync extends Sync {...}

加鎖過程

lock方法解析
public void lock() {
        sync.lock();
    }

sync.lock() 方法有兩個實現(xiàn),一個是公平鎖的實現(xiàn),另一個是非公平鎖的實現(xiàn),如下圖
1.png

我們先看非公平鎖的實現(xiàn)的加鎖方式(NonfairSync),代碼如下

 final void lock() {
            //如果cas修改值state值成功,則加鎖成功
            if (compareAndSetState(0, 1))
                //設置當前持有鎖的線程為當前線程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //不成功就在重新獲取鎖,過程較為復雜
                acquire(1);
        }

acquire(1)

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

此方法可分為三個部分,!tryAcquire(arg)、addWaiter(Node.EXCLUSIVE)、 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),一一分析具體干了什么事。

!tryAcquire(arg),可以跟蹤代碼先調用NonfairSync.tryAcquire()方法,調用Sync.nonfairTryAcquire()
 protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {  
            //獲取當前線程
            final Thread current = Thread.currentThread();
             //獲取state的值
            int c = getState();
            //等于0那么現(xiàn)在無線程持有鎖
            if (c == 0) {
                //cas獲取鎖
                if (compareAndSetState(0, acquires)) {
                    //設置當前線程持有鎖
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果持有鎖的線程和當前線程為同一線程(重入),state累加
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

若以上方法返回true,那么加鎖成功,則!tryAcquire(arg)為false,不會執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)這部分代碼。若以上代碼返回false,加鎖失敗,則!tryAcquire(arg)為true,則會執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),現(xiàn)在先假設加鎖失敗。

addWaiter(Node.EXCLUSIVE),索取鎖線程入隊
  private Node addWaiter(Node mode) {
        //新建node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //取隊尾
        Node pred = tail;
        //如果隊尾不為空
        if (pred != null) {
            //取當線程的的節(jié)點的前節(jié)點為對位
            node.prev = pred;
            //cas設置隊尾的后節(jié)點為當前線程的節(jié)點
            if (compareAndSetTail(pred, node)) {
                //倒數(shù)第二節(jié)點的下個節(jié)點指向當前線程的節(jié)點
                pred.next = node;
                return node;
            }
        }
        //尾節(jié)點不為空,入隊
        enq(node);
        return node;
    }
    //入隊相關代碼
    private Node enq(final Node node) {
        for (;;) {
            //取隊尾
            Node t = tail;
            //初始化隊列
            if (t == null) { // Must initialize
                //設置頭節(jié)點
                if (compareAndSetHead(new Node()))
                    //隊首隊尾都指向 上一行的new Node()
                    tail = head;
            } else {
                //當前線程節(jié)點的前節(jié)點為隊尾節(jié)點
                node.prev = t;
                //cas操作設置隊尾為當前線程節(jié)點
                if (compareAndSetTail(t, node)) {
                    隊尾(現(xiàn)在倒數(shù)第二個節(jié)點)后節(jié)點為當前線程節(jié)點
                    t.next = node;
                    return t;
                }
            }
        }
    }

到這里入隊的相關操作就完成分析了,開始下一個方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的分析。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg),從隊列面獲取線程來持有鎖。
  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //獲取當前線程節(jié)點的前節(jié)點
                final Node p = node.predecessor();
                //如果前節(jié)點為隊首(這里明白這個雙向隊列的第一個節(jié)點為空),且獲取鎖成功(NonfairSync.tryAcquire()) 
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    //如果不設置為null,可能會引起oom,引用一直存在
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //判斷是否需要掛機線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    //設置當前節(jié)點為頭節(jié)點,清空了線程和前節(jié)點
     private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
  
    //判斷是否需要掛起線程
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //node 類中有對應描述,代表現(xiàn)需要被unpraking
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //節(jié)點被取消,跳過 、重試
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //設置parking標志
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    //park線程
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

以上就是ReentrantLock 的加鎖過程。后面解析ReentrantLock 的解鎖過程

unlock

  public void unlock() {
        //調用sync的release
        sync.release(1);
    }
  
   public final boolean release(int arg) {
        //如果解鎖成功
        if (tryRelease(arg)) {
            //取頭節(jié)點
            Node h = head;
            //如果頭節(jié)點不為空且waitStatus 不為CANCELLED、SIGNAL    、CONDITION 、PROPAGATE 
            if (h != null && h.waitStatus != 0)
                //解鎖線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

   protected final boolean tryRelease(int releases) {
            //獲取狀態(tài)并減1
            int c = getState() - releases;
             //判斷持有鎖的線程是不是當前線程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //解鎖成功
            if (c == 0) {
                free = true;
                //清空線程
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

      //unpark線程節(jié)點
       private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //ws<0 即ws為CANCELLED 、SIGNAL    、CONDITION 、PROPAGATE 中一種
        if (ws < 0)
            //ws設置為0,讓下一個線程節(jié)點嘗試獲取鎖
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

公平鎖與非公平鎖區(qū)別在于tryAcquire的時候,會檢查是不是已經(jīng)有線程在排隊了,如果有那么直接入隊尾,不在嘗試獲取鎖

 protected final boolean tryAcquire(int acquires) {
           final Thread current = Thread.currentThread();
           int c = getState();
           if (c == 0) {
               if (!hasQueuedPredecessors() &&
                   compareAndSetState(0, acquires)) {
                   setExclusiveOwnerThread(current);
                   return true;
               }
           }
           else if (current == getExclusiveOwnerThread()) {
               int nextc = c + acquires;
               if (nextc < 0)
                   throw new Error("Maximum lock count exceeded");
               setState(nextc);
               return true;
           }
           return false;
       }
       //判斷是隊列是否已有線程
       public final boolean hasQueuedPredecessors() {
       // The correctness of this depends on head being initialized
       // before tail and on head.next being accurate if the current
       // thread is first in queue.
       Node t = tail; // Read fields in reverse initialization order
       Node h = head;
       Node s;
       //h != t  初始化的時候,(s = h.next) == null頭節(jié)點的下個節(jié)點為空(第一個節(jié)點為”空“),s.thread != Thread.currentThread()頭節(jié)點的下個節(jié)點與當前線程不為同一線程
       return h != t &&
           ((s = h.next) == null || s.thread != Thread.currentThread());
   }

綜上就是ReentrantLock的加鎖和解鎖過程,現(xiàn)在對自旋有一點感覺了,enq(...)和 acquireQueued(...)中循環(huán)都是自選的體現(xiàn)。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容