一、Semaphore實現(xiàn)原理解析
1.1Semaphore實例

image.png
package executors;
import java.util.concurrent.Semaphore;
public class SemphoreTest {
public static void main(String[] args) {
//信號量
Semaphore windows = new Semaphore(3);
for(int i=0;i<5;i++){
new Thread(()->{
try {
windows.acquire();
System.out.println(Thread.currentThread().getName()+"--開始買票");
Thread.sleep(5000l);
System.out.println(Thread.currentThread().getName()+"--結束買票");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
windows.release();
}
},"買票者"+i).start();
}
}
}
1.2Semaphore源碼解析
new Semaphore(3)創(chuàng)建了一個非公平的同步器,并設置同步器維護的state=3
public class Semaphore implements java.io.Serializable {
//同步器
private final Sync sync;
public Semaphore(int permits) {
//非公平同步器
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}
}
windows.acquire()分析
因為資源設置的是3也就是state=3,有三個資源可以同時被占用;前三個線程可以順利的獲取資源
每次獲取資源state狀態(tài)-1。

image.png
從第四個線程開始由于資源state=0 已經(jīng)沒有資源了所以會進行阻塞

image.png
public class Semaphore implements java.io.Serializable {
//同步器
private final Sync sync;
public Semaphore(int permits) {
//非公平同步器
sync = new NonfairSync(permits);
}
public void acquire() throws InterruptedException {
//1.調(diào)用AbstractQueuedSynchronizer.acquireSharedInterruptibly(arg)方法
sync.acquireSharedInterruptibly(1);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //2.調(diào)回NonfairSync.tryAcquireShared減資源及state=state-1
//state<0說明沒有資源了,這時候需要構建等待隊列,并當前阻塞線程。
doAcquireSharedInterruptibly(arg);
}
/*
1.構建等待隊列,并入隊操作
2.設置前驅節(jié)點的waitState=-1
3.阻塞當前節(jié)點的線程。
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//入等待隊列Node.SHARED=new Node()---用于建立初始頭或共享標記
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲取前驅節(jié)點
final Node p = node.predecessor();
//如果前驅節(jié)點是head節(jié)點,嘗試獲取鎖
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//設置頭結點并傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&//設置前驅節(jié)點為-1,如果前驅節(jié)點已經(jīng)是-1了返回true
parkAndCheckInterrupt())//調(diào)用LockSupport.park(this);阻塞當前線程
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
/*
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
*/
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//構建隊列
enq(node);
return node;
}
/*
設置前驅節(jié)點為-1,如果前驅節(jié)點已經(jīng)是-1了返回true
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//設置前驅節(jié)-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
semaphore.release();
public class Semaphore implements java.io.Serializable {
//同步器
private final Sync sync;
public Semaphore(int permits) {
//非公平同步器
sync = new NonfairSync(permits);
}
public void release() {
sync.releaseShared(1);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//設置state狀態(tài)state=state+1
doReleaseShared();//
return true;
}
return false;
}
/*
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//喚醒頭結點的第二個節(jié)點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//頭結點的第二個節(jié)點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//喚醒頭節(jié)的第二個節(jié)點
if (s != null)
LockSupport.unpark(s.thread);
}
}
被喚醒后出隊邏輯
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
/*
線程喚醒后出隊
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//這里如果獲取鎖成功
int r = tryAcquireShared(arg);
if (r >= 0) {
//設置head為當前node節(jié)點,并釋放原有的head節(jié)點
//設置后,從tail開始循環(huán)遍歷所有節(jié)點喚醒隊列中的線程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//線程在這里被喚醒,喚醒后再次進入循環(huán)體
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
//釋放原有head節(jié)點,讓當前節(jié)點設置為head
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
}