前言
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。
Semaphore可以用于做流量控制,特別是公用資源有限的應(yīng)用場景,比如一個擁有固定車位的停車場,當(dāng)停車場里面所有位置都已經(jīng)被占滿后,后面的車子只能等待車位,如果此時有1個車子離開此停車場,那么等待的汽車中就可以有1輛汽車進入該停車場,至于進入的規(guī)則,就需要看是Nonfair還是Fair機制了.
本文源代碼: 代碼下載
例子1:了解Semaphore
此例子初始一個大小為
3的停車場,然后啟動6輛車去停車場停車,每輛車在停車場的時間由隨機數(shù)產(chǎn)生.
package com.sourcecode.concurrencytools_Semaphore;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
static Random random = new Random();
static class ParkingLot {
Semaphore semaphore;
ParkingLot(int size) {
semaphore = new Semaphore(size);
}
public void park() {
try {
semaphore.acquire();
int waitTime = random.nextInt(10);
System.out.println(Thread.currentThread().getName() + " parks, it takes " + waitTime + " seconds.");
TimeUnit.SECONDS.sleep(waitTime);
System.out.println(Thread.currentThread().getName() + " leaves.");
semaphore.release();
} catch (InterruptedException ie) {
ie.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
static class Car extends Thread {
ParkingLot parkingLot;
public Car(ParkingLot parkingLot) {
this.parkingLot = parkingLot;
}
public void run() {
this.parkingLot.park();
}
}
public static void main(String[] args){
ParkingLot parking = new ParkingLot(3);
for(int i = 0 ; i < 6 ; i++){
new Car(parking).start();
}
}
}
結(jié)果如下: 首先
thread-1和thread-2和thread-0進入到停車場,然后后面的車沒辦法再進入到停車場,因為停車場已經(jīng)滿了,當(dāng)thread-0離開停車場后,可以有一輛車子進入到停車場,這個車子是thread-3. 后面的以此類推即可.
Thread-1 parks, it takes 1 seconds.
Thread-2 parks, it takes 3 seconds.
Thread-0 parks, it takes 0 seconds.
Thread-0 leaves.
Thread-3 parks, it takes 6 seconds.
Thread-1 leaves.
Thread-4 parks, it takes 6 seconds.
Thread-2 leaves.
Thread-5 parks, it takes 0 seconds.
Thread-5 leaves.
Thread-3 leaves.
Thread-4 leaves.
實現(xiàn)思路
該類與
ReentrantLock實現(xiàn)類似,用到了公平和非公平機制,并且都是用到的共享鎖.
從圖中可以知道Semaphore有個內(nèi)部類Sync,該類繼承于AbstractQueuedSynchronizer(AQS是實現(xiàn)的基礎(chǔ)),并且該Sync有兩種實現(xiàn)類,FairSync和NonfairSync,分別代表公平機制和非公平機制.
公平機制表示的是在等待鎖的過程中如果可以獲得鎖了,先等待的線程必須要先獲得鎖.
從ReentrantLock的實現(xiàn)也可以知道Semaphore的方法也是依賴于Sync的實例sync. (所以關(guān)鍵地方還是AQS,是必須要掌握的基礎(chǔ)).
semaphore_framework.png
Sync類及其子類
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 初始化當(dāng)前AQS狀態(tài)
Sync(int permits) {
setState(permits);
}
// 返回當(dāng)前AQS狀態(tài)
final int getPermits() {
return getState();
}
// 返回不公平機制下獲得共享鎖
// 返回值小于0 表示沒有獲得鎖
// 返回值大于等于0 表明獲得鎖
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 釋放共享鎖 釋放的個數(shù)releases
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 減少許可數(shù)量reductions
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 將許可數(shù)量減為0
// 返回值是減少的數(shù)量
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
// 非公平鎖
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 返回值小于0 表示沒有獲得鎖
// 返回值大于等于0 表示獲得鎖
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果有前驅(qū)節(jié)點 保持公平機制 返回一個負數(shù)
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
其實這些跟重入鎖
ReentrantLock差不多,沒什么太多好分析的.
構(gòu)造函數(shù)
private final Sync sync;
// 默認為非公平鎖
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 根據(jù)fair來判斷構(gòu)造公平鎖還是非公平鎖
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire()方法和release()方法
由于
Semaphore中的方法都是通過sync調(diào)用的并且原理類似,因此拿出acquire()和release()方法進行分析.
acquire()方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
作用: 可以響應(yīng)中斷式的獲取共享鎖.
該方法會調(diào)用
AQS中的acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
接著會根據(jù)子類
Sync重寫的tryAcquireShared(arg)方法來判斷是否獲得鎖,所以如果是非公平鎖則調(diào)用nonfairTryAcquireShared(int acquires),如果是公平鎖就調(diào)用tryAcquireShared(int acquires)方法.
// 返回值小于0 表示沒有獲得鎖
// 返回值大于等于0 表示獲得鎖
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果有前驅(qū)節(jié)點 保持公平機制 返回一個負數(shù)
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 返回不公平機制下獲得共享鎖
// 返回值小于0 表示沒有獲得鎖
// 返回值大于等于0 表明獲得鎖
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
可以看到公平鎖與非公平鎖的區(qū)別在于增加了一個
hasQueuedPredecessors()判斷同步等待隊列中是否有等待的線程.
release()方法
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
作用: 釋放
permits個許可
與上面類似,它會先調(diào)用AQS中的releaseShared(int arg),進而調(diào)用Sync中的tryReleaseShared(int arg)方法判斷是否釋放成功.
// AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//Sync
// 釋放共享鎖 釋放的個數(shù)releases
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
可以看到
acquire()和release()在獲取和釋放信號量的時候沒有像重入鎖一樣去判斷是否是當(dāng)前線程,而是直接加上或者減去狀態(tài)數(shù),可以知道該狀態(tài)值是無狀態(tài)型的,跟線程無關(guān).因此也可以在同一個線程中在獲得信號量后繼續(xù)獲取信號量.
例子2: 測試一個線程中不斷獲取信號量
開啟一個大小為
3的信號量,并且使用同一個線程獲得3次.
package com.sourcecode.concurrencytools_Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest3 {
static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) throws InterruptedException {
MyThread thread1 = new MyThread();
thread1.start();
TimeUnit.SECONDS.sleep(2);
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get locks.");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " finishes");
}
static class MyThread extends Thread {
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Locks 1");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Locks 2");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Locks 3");
TimeUnit.SECONDS.sleep(10);
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release locks 1");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release locks 2");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release locks 3");
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
結(jié)果如下: 可以看到線程
0獲得了3次并且當(dāng)此線程釋放一個信號量后,主線程就可以去獲取信號量了.
Thread-0 get Locks 1
Thread-0 get Locks 2
Thread-0 get Locks 3
Thread-0 release locks 1
main get locks.
main finishes
Thread-0 release locks 2
Thread-0 release locks 3
例子3: 關(guān)于try...finally...的寫法討論
在網(wǎng)上有可以對于
acquire()和release使用try...finally...的寫法,有點疑惑便根據(jù)Semaphore源代碼分析和測試了一下.先看下面的例子.
初始化了一個大小為
1的信號量,啟動一個線程myThread去獲取信號量,獲取信號量后讓其休眠10s后,再釋放信號量,所以讓另外一個線程myThread2在等待獲取信號量的時候發(fā)生中斷,此時根據(jù)我們上面分析源碼應(yīng)該可以知道acquire()是響應(yīng)中斷的,因此此時會拋出中斷異常(沒有成功獲得鎖),進而此時根據(jù)該寫法會即使沒有獲得信號量也要進入到finally代碼塊中去釋放信號量,進而會使信號量的許可證加1.(因為即使沒有獲得信號量的情況下去調(diào)用release()方法不會報錯,會使許可證加1).
package com.sourcecode.concurrencytools_Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest2 {
static Semaphore semaphore = new Semaphore(1);
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
MyThread myThread2 = new MyThread();
myThread2.start();
TimeUnit.SECONDS.sleep(2);
myThread2.interrupt();
}
static class MyThread extends Thread {
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get Semaphore");
} catch (InterruptedException ie) {
System.out.println(Thread.currentThread().getName() + " semaphore.acquire, ie:" + ie);
} finally {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ie) {
System.out.println(Thread.currentThread().getName() + "wait 10s, ie:" + ie);
}
System.out.println(Thread.currentThread().getName() + " release Semaphore.");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " available:" + semaphore.availablePermits());
}
}
}
}
結(jié)果如下: 可以看到
Thread-1在沒有獲得信號量的情況下依然調(diào)用了release()方法進而使得信號量許可數(shù)量為2,明顯的錯誤了,因為總的信號量許可證數(shù)量是1.
Thread-0 get Semaphore
Thread-1 semaphore.acquire, ie:java.lang.InterruptedException
Thread-0 release Semaphore.
Thread-0 available:1
Thread-1 release Semaphore.
Thread-1 available:2
正確寫法參考上面的例子
1或例子2即可.
參考
1. Java并發(fā)編程的藝術(shù)
