[Java源碼][并發(fā)J.U.C]---并發(fā)工具類Semaphore

前言

Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。

Semaphore可以用于做流量控制,特別是公用資源有限的應(yīng)用場景,比如一個擁有固定車位的停車場,當(dāng)停車場里面所有位置都已經(jīng)被占滿后,后面的車子只能等待車位,如果此時有1個車子離開此停車場,那么等待的汽車中就可以有1輛汽車進入該停車場,至于進入的規(guī)則,就需要看是Nonfair還是Fair機制了.

本文源代碼: 代碼下載

例子1:了解Semaphore

此例子初始一個大小為3的停車場,然后啟動6輛車去停車場停車,每輛車在停車場的時間由隨機數(shù)產(chǎn)生.

package com.sourcecode.concurrencytools_Semaphore;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    static Random random = new Random();
    static class ParkingLot {
        Semaphore semaphore;
        ParkingLot(int size) {
            semaphore = new Semaphore(size);
        }
        public void park() {
            try {
                semaphore.acquire();
                int waitTime = random.nextInt(10);
                System.out.println(Thread.currentThread().getName() + " parks, it takes " + waitTime + " seconds.");
                TimeUnit.SECONDS.sleep(waitTime);
                System.out.println(Thread.currentThread().getName() + " leaves.");
                semaphore.release();
            } catch (InterruptedException ie) {
                ie.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Car extends Thread {
        ParkingLot parkingLot;
        public Car(ParkingLot parkingLot) {
            this.parkingLot = parkingLot;
        }
        public void run() {
            this.parkingLot.park();
        }
    }

    public static void main(String[] args){
        ParkingLot parking = new ParkingLot(3);
        for(int i = 0 ; i < 6 ; i++){
            new Car(parking).start();
        }
    }
}

結(jié)果如下: 首先thread-1thread-2thread-0進入到停車場,然后后面的車沒辦法再進入到停車場,因為停車場已經(jīng)滿了,當(dāng)thread-0離開停車場后,可以有一輛車子進入到停車場,這個車子是thread-3. 后面的以此類推即可.

Thread-1 parks, it takes 1 seconds.
Thread-2 parks, it takes 3 seconds.
Thread-0 parks, it takes 0 seconds.
Thread-0 leaves.
Thread-3 parks, it takes 6 seconds.
Thread-1 leaves.
Thread-4 parks, it takes 6 seconds.
Thread-2 leaves.
Thread-5 parks, it takes 0 seconds.
Thread-5 leaves.
Thread-3 leaves.
Thread-4 leaves.

實現(xiàn)思路

該類與ReentrantLock實現(xiàn)類似,用到了公平和非公平機制,并且都是用到的共享鎖.
從圖中可以知道Semaphore有個內(nèi)部類Sync,該類繼承于AbstractQueuedSynchronizer(AQS是實現(xiàn)的基礎(chǔ)),并且該Sync有兩種實現(xiàn)類,FairSyncNonfairSync,分別代表公平機制和非公平機制.
公平機制表示的是在等待鎖的過程中如果可以獲得鎖了,先等待的線程必須要先獲得鎖.
ReentrantLock的實現(xiàn)也可以知道Semaphore的方法也是依賴于Sync的實例sync. (所以關(guān)鍵地方還是AQS,是必須要掌握的基礎(chǔ)).

semaphore_framework.png

Sync類及其子類

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        // 初始化當(dāng)前AQS狀態(tài)
        Sync(int permits) {
            setState(permits);
        }
        // 返回當(dāng)前AQS狀態(tài)
        final int getPermits() {
            return getState();
        }

        // 返回不公平機制下獲得共享鎖
        // 返回值小于0       表示沒有獲得鎖
        // 返回值大于等于0    表明獲得鎖
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        // 釋放共享鎖 釋放的個數(shù)releases
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        // 減少許可數(shù)量reductions
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        // 將許可數(shù)量減為0
        // 返回值是減少的數(shù)量
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
// 非公平鎖
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    // 公平鎖
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        // 返回值小于0 表示沒有獲得鎖
        // 返回值大于等于0 表示獲得鎖
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 如果有前驅(qū)節(jié)點 保持公平機制 返回一個負數(shù)
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

其實這些跟重入鎖ReentrantLock差不多,沒什么太多好分析的.

構(gòu)造函數(shù)

    private final Sync sync;
    // 默認為非公平鎖
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    // 根據(jù)fair來判斷構(gòu)造公平鎖還是非公平鎖
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

acquire()方法和release()方法

由于Semaphore中的方法都是通過sync調(diào)用的并且原理類似,因此拿出acquire()release()方法進行分析.

acquire()方法

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
}

作用: 可以響應(yīng)中斷式的獲取共享鎖.

該方法會調(diào)用AQS中的acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

接著會根據(jù)子類Sync重寫的tryAcquireShared(arg)方法來判斷是否獲得鎖,所以如果是非公平鎖則調(diào)用nonfairTryAcquireShared(int acquires),如果是公平鎖就調(diào)用tryAcquireShared(int acquires)方法.

// 返回值小于0 表示沒有獲得鎖
        // 返回值大于等于0 表示獲得鎖
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 如果有前驅(qū)節(jié)點 保持公平機制 返回一個負數(shù)
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

// 返回不公平機制下獲得共享鎖
        // 返回值小于0       表示沒有獲得鎖
        // 返回值大于等于0    表明獲得鎖
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

可以看到公平鎖與非公平鎖的區(qū)別在于增加了一個hasQueuedPredecessors()判斷同步等待隊列中是否有等待的線程.

release()方法

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

作用: 釋放permits個許可
與上面類似,它會先調(diào)用AQS中的releaseShared(int arg),進而調(diào)用Sync中的tryReleaseShared(int arg)方法判斷是否釋放成功.

// AQS
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
//Sync
// 釋放共享鎖 釋放的個數(shù)releases
      protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

可以看到acquire()release()在獲取和釋放信號量的時候沒有像重入鎖一樣去判斷是否是當(dāng)前線程,而是直接加上或者減去狀態(tài)數(shù),可以知道該狀態(tài)值是無狀態(tài)型的,跟線程無關(guān).因此也可以在同一個線程中在獲得信號量后繼續(xù)獲取信號量.

例子2: 測試一個線程中不斷獲取信號量

開啟一個大小為3的信號量,并且使用同一個線程獲得3次.

package com.sourcecode.concurrencytools_Semaphore;

import java.util.concurrent.TimeUnit;
public class SemaphoreTest3 {
    static Semaphore semaphore = new Semaphore(3);
    public static void main(String[] args) throws InterruptedException {
        MyThread thread1 = new MyThread();
        thread1.start();
        TimeUnit.SECONDS.sleep(2);
        semaphore.acquire();
        System.out.println(Thread.currentThread().getName() + " get locks.");
        semaphore.release();
        System.out.println(Thread.currentThread().getName() + " finishes");
    }

    static class MyThread extends Thread {
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " get Locks 1");
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " get Locks 2");
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " get Locks 3");
                TimeUnit.SECONDS.sleep(10);
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " release locks 1");
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " release locks 2");
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " release locks 3");
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

結(jié)果如下: 可以看到線程0獲得了3次并且當(dāng)此線程釋放一個信號量后,主線程就可以去獲取信號量了.

Thread-0 get Locks 1
Thread-0 get Locks 2
Thread-0 get Locks 3
Thread-0 release locks 1
main get locks.
main finishes
Thread-0 release locks 2
Thread-0 release locks 3

例子3: 關(guān)于try...finally...的寫法討論

在網(wǎng)上有可以對于acquire()release使用try...finally...的寫法,有點疑惑便根據(jù)Semaphore源代碼分析和測試了一下.先看下面的例子.

初始化了一個大小為1的信號量,啟動一個線程myThread去獲取信號量,獲取信號量后讓其休眠10s后,再釋放信號量,所以讓另外一個線程myThread2在等待獲取信號量的時候發(fā)生中斷,此時根據(jù)我們上面分析源碼應(yīng)該可以知道acquire()是響應(yīng)中斷的,因此此時會拋出中斷異常(沒有成功獲得鎖),進而此時根據(jù)該寫法會即使沒有獲得信號量也要進入到finally代碼塊中去釋放信號量,進而會使信號量的許可證加1.(因為即使沒有獲得信號量的情況下去調(diào)用release()方法不會報錯,會使許可證加1).

package com.sourcecode.concurrencytools_Semaphore;

import java.util.concurrent.TimeUnit;
public class SemaphoreTest2 {

    static Semaphore semaphore = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        MyThread myThread = new MyThread();
        myThread.start();
        MyThread myThread2 = new MyThread();
        myThread2.start();
        TimeUnit.SECONDS.sleep(2);
        myThread2.interrupt();
    }

    static class MyThread extends Thread {
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName() + " get Semaphore");
            } catch (InterruptedException ie) {
                System.out.println(Thread.currentThread().getName() + " semaphore.acquire, ie:" + ie);
            } finally {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException ie) {
                    System.out.println(Thread.currentThread().getName() + "wait 10s, ie:" + ie);
                }
                System.out.println(Thread.currentThread().getName() + " release Semaphore.");
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " available:" + semaphore.availablePermits());
            }
        }
    }
}

結(jié)果如下: 可以看到Thread-1在沒有獲得信號量的情況下依然調(diào)用了release()方法進而使得信號量許可數(shù)量為2,明顯的錯誤了,因為總的信號量許可證數(shù)量是1.

Thread-0 get Semaphore
Thread-1 semaphore.acquire, ie:java.lang.InterruptedException
Thread-0 release Semaphore.
Thread-0 available:1
Thread-1 release Semaphore.
Thread-1 available:2

正確寫法參考上面的例子1或例子2即可.

參考

1. Java并發(fā)編程的藝術(shù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容