Java并發(fā)-Lock接口與隊(duì)列同步器AQS

Lock接口

與synchronized關(guān)鍵字相比擁有了鎖獲取與釋放的可操作性,可非阻塞的獲取鎖、可中斷的獲取鎖、超時(shí)獲取鎖

標(biāo)準(zhǔn)接口定義

 void lock();
 void lockInterruptibly() throws InterruptedException;//可中斷的阻塞獲取鎖
Condition newCondition();
boolean tryLock();//非阻塞的獲取鎖,馬上返回
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//最大阻塞time時(shí)間,超時(shí)之后返回,可中斷
void unlock();//釋放鎖

標(biāo)準(zhǔn)使用方式

注意不要將獲取鎖的過(guò)程寫(xiě)到try塊里面

Lock lock = new ReentrantLock();
lock.lock();
try{
        //to do
}finally{
        lock.unlock();
}

隊(duì)列同步器AQS(AbstractQueuedSynchronizer)

簡(jiǎn)單的理解是對(duì)于請(qǐng)求獲取鎖的線(xiàn)程,如果當(dāng)前鎖已經(jīng)被別的線(xiàn)程獲取,那么當(dāng)前線(xiàn)程需要到隊(duì)列中排隊(duì)。如果隊(duì)列中的第一個(gè)線(xiàn)程釋放了鎖,那么就會(huì)喚醒排在它后面的一個(gè)線(xiàn)程去獲取鎖(如果非公平鎖,可能會(huì)被插隊(duì))。

核心字段

private volatile int state;//同步狀態(tài)
private transient volatile Node head;//同步隊(duì)列的頭指針
private transient volatile Node tail;//同步隊(duì)列的尾指針

核心方法

int getState();
void setState(int newState);
boolean compareAndSetState(int expect, int update);
boolean isHeldExclusively();

void acquire(long arg);
boolean release(long arg);
void acquireShared(long arg);
boolean releaseShared(long arg);

同步隊(duì)列的Node節(jié)點(diǎn)

這里最好結(jié)合源碼的英文注釋理解

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

獨(dú)占式同步狀態(tài)的獲取源碼分析

acquire方法

首先非阻塞的獲取同步狀態(tài)(tryAcquire方法),如果失敗的話(huà)會(huì)把當(dāng)前線(xiàn)程信息構(gòu)造一個(gè)Node節(jié)點(diǎn)加入到同步隊(duì)列的尾部進(jìn)行排隊(duì)(這里可能會(huì)使線(xiàn)程進(jìn)入waiting狀態(tài))

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire方法

非阻塞的獲取同步狀態(tài),失敗返回false.這個(gè)是子類(lèi)實(shí)現(xiàn)的。(模板方法模式)

addWaiter方法

將當(dāng)前節(jié)點(diǎn)加入到同步隊(duì)列的尾部

private Node addWaiter(Node mode) {
// 這里mode為EXCLUSIVE,也就是 node.nextWaiter==null.
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
//下面if邏輯是嘗試快速插入到隊(duì)列尾部,如果失敗到enq函數(shù)
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//當(dāng)前節(jié)點(diǎn)插入到隊(duì)列尾部
        return node;
    }

enq 方法

當(dāng)前節(jié)點(diǎn)插入到隊(duì)列尾部,保證成功,可能重試多次。采用CAS更改改共享變量的標(biāo)準(zhǔn)寫(xiě)法。

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued方法

對(duì)于剛進(jìn)入隊(duì)列排隊(duì)的節(jié)點(diǎn)或者剛被喚醒的線(xiàn)程(LockSupport.unpark(thread)),會(huì)檢查它前面的那個(gè)節(jié)點(diǎn)是不是head節(jié)點(diǎn),如果是的話(huà)嘗試獲取鎖。如果不是的話(huà)會(huì)判斷當(dāng)前線(xiàn)程是否可以到waiting狀態(tài)(釋放cpu資源,避免盲等),(判斷的依據(jù)主要是當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的waitStatus是否為SIGNAL),如果可以睡眠,就會(huì)睡眠直到被前一個(gè)節(jié)點(diǎn)喚醒。

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn),并且獲取同步狀態(tài)成功,則把當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),然后返回
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
              // 判斷當(dāng)前節(jié)點(diǎn)是否可以睡眠或者把當(dāng)前節(jié)點(diǎn)狀態(tài)改到可睡眠狀態(tài)(可以睡否最重要的依據(jù)是睡后有人喚醒你,否則你就醒不來(lái)了)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire方法

如果暫時(shí)不能獲取同步狀態(tài),線(xiàn)程會(huì)考慮睡一會(huì)(park)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的waitStatus為Node.SIGNAL,則其前驅(qū)節(jié)點(diǎn)在釋放鎖之后就會(huì)喚醒(uppark)當(dāng)前線(xiàn)程,所以當(dāng)前線(xiàn)程可以放心的park
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
//如果前驅(qū)節(jié)點(diǎn)已經(jīng)放棄獲取鎖,更新其前驅(qū)節(jié)點(diǎn)
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //嘗試把前驅(qū)節(jié)點(diǎn)狀態(tài)改為Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt方法

當(dāng)前線(xiàn)程park,到waiting 狀態(tài),線(xiàn)程阻塞于當(dāng)前方法直到被中斷或者unpark

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

獨(dú)占式同步狀態(tài)釋放源碼分析

release方法

釋放同步狀態(tài),喚醒隊(duì)里中的下一個(gè)節(jié)點(diǎn)。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor方法

喚醒隊(duì)列中下一個(gè)節(jié)點(diǎn)去參與鎖的競(jìng)爭(zhēng)

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

共享式獲取同步狀態(tài)源碼分析

acquireShared方法

如果沒(méi)有獲取同步狀態(tài)成功,就會(huì)將當(dāng)前節(jié)點(diǎn)更新加入到同步隊(duì)列。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

doAcquireShared

同步隊(duì)列中獲取獨(dú)占式同步狀態(tài),類(lèi)似于獨(dú)占模式同步狀態(tài)的獲取。

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //如果當(dāng)前節(jié)點(diǎn)獲取同步狀態(tài)成功,會(huì)將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn),
                       //并且傳播到它的下一個(gè)節(jié)點(diǎn)(下一個(gè)節(jié)點(diǎn)嘗試獲取同步狀態(tài))
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate方法

設(shè)置node節(jié)點(diǎn)為head節(jié)點(diǎn),并且嘗試傳播到下一個(gè)節(jié)點(diǎn)(因?yàn)楣蚕砟J降耐綘顟B(tài)是可以有多個(gè)線(xiàn)程同時(shí)獲取的)

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
         //如果下一個(gè)節(jié)點(diǎn)是共享模式的節(jié)點(diǎn),就考慮喚醒下一個(gè)節(jié)點(diǎn)去競(jìng)爭(zhēng)性獲取共享狀態(tài)。
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

共享式釋放同步狀態(tài)源碼分析

releaseShared方法

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容