Java并發(fā)編程——信號(hào)量與互斥量

信號(hào)量用于線程同步,互斥量用戶保護(hù)資源的互斥訪問(wèn)。

信號(hào)量與互斥量的區(qū)別

  • 互斥量用于線程的互斥,信號(hào)線用于線程的同步。
  • 互斥量值只能為0/1,信號(hào)量值可以為非負(fù)整數(shù)。信號(hào)量可以實(shí)現(xiàn)多個(gè)同類資源的多線程互斥和同步。
  • 互斥量的加鎖和解鎖必須由同一線程分別對(duì)應(yīng)使用,信號(hào)量可以由一個(gè)線程釋放,另一個(gè)線程得到。

信號(hào)量Semaphore

信號(hào)量是在多線程環(huán)境中,線程間傳遞信號(hào)的一種方式。

簡(jiǎn)單的Semaphore實(shí)現(xiàn)

public class Semaphore {
private boolean signal = false;   //使用signal可以避免信號(hào)丟失
public synchronized void take() {
    this.signal = true;
    this.notify();
}
public synchronized void release() throws InterruptedException{
    while(!this.signal) //使用while避免假喚醒
        wait();
    this.signal = false;
    }
}

使用場(chǎng)景

Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
ReceivingThread receiver = new ReceivingThread(semaphore);
receiver.start();
sender.start();

public class SendingThread {
    Semaphore semaphore = null;
    public SendingThread(Semaphore semaphore){
        this.semaphore = semaphore;
    }
    public void run(){
        while(true){
            //do something, then signal
            this.semaphore.take();
        }
    }
}

public class RecevingThread {
    Semaphore semaphore = null;
    public ReceivingThread(Semaphore semaphore){
        this.semaphore = semaphore;
    }
    public void run(){
        while(true){
        this.semaphore.release();
        //receive signal, then do something...
        }
    }
}

可計(jì)數(shù)的Semaphore

上面提到的Semaphore的簡(jiǎn)單實(shí)現(xiàn)并沒(méi)有計(jì)算通過(guò)調(diào)用take方法所產(chǎn)生信號(hào)的數(shù)量??梢园阉脑斐删哂杏?jì)數(shù)功能的Semaphore。

public class CountingSemaphore {
    private int signals = 0;
    public synchronized void take() {
        this.signals++;
        this.notify();
    }
public synchronized void release() throws InterruptedException{
    while(this.signals == 0) 
        wait();
    this.signals--;
    }
}

有上限的Semaphore

可以將上面的CountingSemaphore改造成一個(gè)信號(hào)數(shù)量有上限的BoundedSemaphore

public class BoundedSemaphore {
    private int signals = 0;
    private int bound   = 0;
    public BoundedSemaphore(int upperBound){
        this.bound = upperBound;
    }
    public synchronized void take() throws InterruptedException{
        while(this.signals == bound) 
            wait();
        this.signals++;
        this.notify();
    }
    public synchronized void release() throws InterruptedException{
        while(this.signals == 0) 
            wait();
        this.signals--;
        this.notify();
    }
}

在BoundedSemaphore中,當(dāng)已經(jīng)產(chǎn)生的信號(hào)數(shù)量達(dá)到了上限,take方法將阻塞新的信號(hào)產(chǎn)生請(qǐng)求,直到某個(gè)線程調(diào)用release方法后,被阻塞于take方法的線程才能傳遞自己的信號(hào)。

Java內(nèi)置的Semaphore

java.util.concurrent包中有Semaphore的實(shí)現(xiàn),可以設(shè)置參數(shù),控制同時(shí)訪問(wèn)的個(gè)數(shù)。
下面的Demo中申明了一個(gè)只有5個(gè)許可的Semaphore,而有20個(gè)線程要訪問(wèn)這個(gè)資源,通過(guò)acquire()和release()獲取和釋放訪問(wèn)許可。

final Semaphore semp = new Semaphore(5);
ExecutorService exec = Executors.newCachedThreadPool();
for (int index = 0; index < 20; index++) {
    final int NO = index;
    Runnable run = new Runnable() {
        public void run() {
            try {
                // 獲取許可
                semp.acquire();
                System.out.println("Accessing: " + NO);
                Thread.sleep((long) (Math.random() * 10000));
                // 訪問(wèn)完后,釋放
                semp.release();
                System.out.println("-----------------" + semp.availablePermits());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    exec.execute(run);
}
exec.shutdown();

互斥量Mutex

互斥量:提供對(duì)資源的獨(dú)占訪問(wèn),只能為0/1,如果某一個(gè)資源同時(shí)只能允許一個(gè)訪問(wèn)者對(duì)其訪問(wèn),可以使用互斥量控制線程對(duì)其訪問(wèn)。

互斥量實(shí)現(xiàn):

public class Mutex {
private boolean isLocked = false;
public synchronized void lock() {
    while(this.isLocked) //使用while可以避免線程 假喚醒
        wait();
    this.isLocked= true;
    }
}
public synchronized void unlock() throws InterruptedException{
    this.isLocked= false;
    this.notify();
    }
}

在Mutex中,我們添加了一個(gè)signal用于保存信號(hào)。

將互斥量當(dāng)作來(lái)使用:

Mutex mutex = new Mutex();
mutex.lock();
...
//臨界區(qū)
mutex.unlock();

互斥量的加鎖和解鎖必須由同一個(gè)線程分別對(duì)應(yīng)使用。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容