8.AQS共享鎖

一、Semaphore實現(xiàn)原理解析

1.1Semaphore實例

image.png
package executors;

import java.util.concurrent.Semaphore;

public class SemphoreTest {
    public static void main(String[] args) {
        //信號量
        Semaphore windows = new Semaphore(3);
        for(int i=0;i<5;i++){
            new Thread(()->{
                try {
                    windows.acquire();
                    System.out.println(Thread.currentThread().getName()+"--開始買票");
                    Thread.sleep(5000l);
                    System.out.println(Thread.currentThread().getName()+"--結束買票");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    windows.release();
                }
            },"買票者"+i).start();
        }

    }
}

1.2Semaphore源碼解析

new Semaphore(3)創(chuàng)建了一個非公平的同步器,并設置同步器維護的state=3

public class Semaphore implements java.io.Serializable {
    //同步器
    private final Sync sync;
    public Semaphore(int permits) {
        //非公平同步器
        sync = new NonfairSync(permits);
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
       Sync(int permits) {
            setState(permits);
       }
    
    }
}

windows.acquire()分析
因為資源設置的是3也就是state=3,有三個資源可以同時被占用;前三個線程可以順利的獲取資源
每次獲取資源state狀態(tài)-1。


image.png

從第四個線程開始由于資源state=0 已經(jīng)沒有資源了所以會進行阻塞


image.png
public class Semaphore implements java.io.Serializable {
    //同步器
    private final Sync sync;
    public Semaphore(int permits) {
        //非公平同步器
        sync = new NonfairSync(permits);
    }
    public void acquire() throws InterruptedException {
        //1.調(diào)用AbstractQueuedSynchronizer.acquireSharedInterruptibly(arg)方法
        sync.acquireSharedInterruptibly(1);
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
      ...
      final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
     ...
     public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //2.調(diào)回NonfairSync.tryAcquireShared減資源及state=state-1
            //state<0說明沒有資源了,這時候需要構建等待隊列,并當前阻塞線程。
            doAcquireSharedInterruptibly(arg);
    }
    
    /*
    1.構建等待隊列,并入隊操作
    2.設置前驅節(jié)點的waitState=-1
    3.阻塞當前節(jié)點的線程。
    */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //入等待隊列Node.SHARED=new Node()---用于建立初始頭或共享標記
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //獲取前驅節(jié)點
                final Node p = node.predecessor();
                //如果前驅節(jié)點是head節(jié)點,嘗試獲取鎖
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //設置頭結點并傳播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&//設置前驅節(jié)點為-1,如果前驅節(jié)點已經(jīng)是-1了返回true
                    parkAndCheckInterrupt())//調(diào)用LockSupport.park(this);阻塞當前線程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private Node addWaiter(Node mode) {
        /*
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        */
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //構建隊列
        enq(node);
        return node;
    }
    /*
    設置前驅節(jié)點為-1,如果前驅節(jié)點已經(jīng)是-1了返回true
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //設置前驅節(jié)-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
}

semaphore.release();

public class Semaphore implements java.io.Serializable {
    //同步器
    private final Sync sync;
    public Semaphore(int permits) {
        //非公平同步器
        sync = new NonfairSync(permits);
    }
    public void release() {
        sync.releaseShared(1);
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
      ...
      protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
      }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
     ...
     public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//設置state狀態(tài)state=state+1
            doReleaseShared();//
            return true;
        }
        return false;
    }
    /*
    
    */
    private void doReleaseShared() {
      
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//喚醒頭結點的第二個節(jié)點
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
   
    private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
            
        Node s = node.next;//頭結點的第二個節(jié)點
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒頭節(jié)的第二個節(jié)點
        if (s != null)
            LockSupport.unpark(s.thread);
    }

}

被喚醒后出隊邏輯

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
     ...
    /*
    線程喚醒后出隊
    */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
       
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                 
                final Node p = node.predecessor();
                if (p == head) {
                    //這里如果獲取鎖成功
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //設置head為當前node節(jié)點,并釋放原有的head節(jié)點
                        //設置后,從tail開始循環(huán)遍歷所有節(jié)點喚醒隊列中的線程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//線程在這里被喚醒,喚醒后再次進入循環(huán)體
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    //釋放原有head節(jié)點,讓當前節(jié)點設置為head
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
   
}
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容