多線程設(shè)計(jì)模式5-single threaded Execution模式(附分布式環(huán)境下的操作)

多線程設(shè)計(jì)模式5-single threaded Execution模式(附分布式環(huán)境下的操作)

single threaded Execution模式主要是用于確保同一時(shí)間內(nèi)只能讓一個(gè)線程執(zhí)行處理,說(shuō)通俗點(diǎn)就是對(duì)synchronized的標(biāo)準(zhǔn)化使用方式,這是比較基礎(chǔ)的,所以我們前面重點(diǎn)介紹下如何保證同一個(gè)Jvm進(jìn)程內(nèi)的多線程同步,后面擴(kuò)展開來(lái),保證多個(gè)Jvm進(jìn)程間多線程同步(分布式環(huán)境)。兩者有很大的相似性。

單jvm進(jìn)程下:

先看下一個(gè)簡(jiǎn)單的例子:

public class Ticket {

  private int counter = 100;

  public  void dec() {
    if(counter>0) {
      System.out.println(Thread.currentThread().getName() + "號(hào)窗口賣出:" + this.counter-- + "號(hào)票");
    }else{
      System.out.println("票已售完");
    }
  }

}

public class StationThread  extends Thread{
  Ticket ticket;

  public StationThread(Ticket ticket) {
    this.ticket = ticket;
  }
  @Override
  public void run() {
    while (true) {
      try{
        Thread.sleep(500);
      }catch(InterruptedException e){
        e.printStackTrace();
      }
      ticket.dec();
    }
  }

}


public class Main {

  public static void main(String[] args) {
    System.out.println("Testing...");
    Ticket ticket = new Ticket();
    new StationThread(ticket).start();
    new StationThread(ticket).start();
    new StationThread(ticket).start();
  }
}

這里打印結(jié)果可知,執(zhí)行明顯是錯(cuò)誤的:

Testing...
Thread-0號(hào)窗口賣出:100號(hào)票
Thread-1號(hào)窗口賣出:99號(hào)票
Thread-2號(hào)窗口賣出:98號(hào)票
Thread-1號(hào)窗口賣出:97號(hào)票
Thread-2號(hào)窗口賣出:97號(hào)票
Thread-0號(hào)窗口賣出:97號(hào)票
Thread-2號(hào)窗口賣出:96號(hào)票
Thread-1號(hào)窗口賣出:96號(hào)票
Thread-0號(hào)窗口賣出:96號(hào)票
......

為什么會(huì)出錯(cuò)呢?因?yàn)門icket不是線程安全的,this.counter—并不是一個(gè)原子性的操作,其中包含了讀取,修改,寫入,多個(gè)線程執(zhí)行的時(shí)候,這些命令會(huì)交錯(cuò)執(zhí)行,導(dǎo)致執(zhí)行結(jié)果與預(yù)期不一致。

接下來(lái),我們改下Ticket的方法:

 public synchronized void dec() {
    if(counter>0) {
      System.out.println(Thread.currentThread().getName() + "號(hào)窗口賣出:" + this.counter-- + "號(hào)票");
    }else{
      System.out.println("票已售完");
    }
  }

添加了synchronized后,執(zhí)行結(jié)果正常:

Testing...
Thread-0號(hào)窗口賣出:100號(hào)票
Thread-1號(hào)窗口賣出:99號(hào)票
Thread-2號(hào)窗口賣出:98號(hào)票
Thread-0號(hào)窗口賣出:97號(hào)票
Thread-1號(hào)窗口賣出:96號(hào)票
Thread-2號(hào)窗口賣出:95號(hào)票
Thread-0號(hào)窗口賣出:94號(hào)票
Thread-1號(hào)窗口賣出:93號(hào)票
Thread-2號(hào)窗口賣出:92號(hào)票
Thread-1號(hào)窗口賣出:91號(hào)票
Thread-2號(hào)窗口賣出:90號(hào)票
Thread-0號(hào)窗口賣出:89號(hào)票
Thread-1號(hào)窗口賣出:88號(hào)票
Thread-0號(hào)窗口賣出:87號(hào)票
Thread-2號(hào)窗口賣出:86號(hào)票
Thread-1號(hào)窗口賣出:85號(hào)票
Thread-2號(hào)窗口賣出:84號(hào)票
Thread-0號(hào)窗口賣出:83號(hào)票
Thread-1號(hào)窗口賣出:82號(hào)票
Thread-2號(hào)窗口賣出:81號(hào)票
Thread-0號(hào)窗口賣出:80號(hào)票
Thread-1號(hào)窗口賣出:79號(hào)票
Thread-2號(hào)窗口賣出:78號(hào)票
Thread-0號(hào)窗口賣出:77號(hào)票
......

synchronized保證了方法只能由一個(gè)線程,防止了由多個(gè)線程交錯(cuò)執(zhí)行的情況。我們知道,編寫線程安全的代碼,核心在于對(duì)狀態(tài)訪問(wèn)操作進(jìn)行管理,特別是共享和可變的狀態(tài),這里Ticket就是一個(gè)共享資源(SharedResource),通過(guò)single thread execution模式,將非安全的方法聲明為synchronized方法,確保同一時(shí)間只被一個(gè)線程訪問(wèn)。

使用時(shí)注意事項(xiàng):

1、死鎖問(wèn)題:

死鎖指多個(gè)線程同時(shí)被阻塞,它們中的一個(gè)或者全部都在等待某個(gè)資源被釋放。由于線程被無(wú)限期地阻塞,因此程序不可能正常終止。就如同一座橋,只能容納一輛車,有兩輛車,相向而行,分到橋的中途,需要占據(jù)對(duì)方的空間才能通過(guò)。因此,就一直處于阻塞狀態(tài)。

死鎖主要是由于多個(gè)線程加鎖順序不一致導(dǎo)致的,可以從這里角度分析,防止死鎖。

2、性能

獲取鎖花費(fèi)時(shí)間,線程沖突會(huì)引起等待,為了提高性能,需要管理好鎖的臨界區(qū),確定同步代碼塊的合理大小。

多jvm進(jìn)程下:

分布式鎖的方式:

一、通過(guò)如下命令在redis中實(shí)現(xiàn)分布式鎖的功能

SET key value NX EX max-lock-time

其中NX是操作模式,表示只在鍵不存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。

EX max-lock-time用于設(shè)置鍵的過(guò)期時(shí)間為max-lock-time秒。

這個(gè)命令連續(xù)兩次執(zhí)行結(jié)果如下:

test-redis:0>SET not-exists-key "value" NX EX 60
OK

test-redis:0>SET not-exists-key "value" NX EX 60
NULL

過(guò)六十秒后再執(zhí)行:

test-redis:0>SET not-exists-key "value" NX EX 60
OK

這樣,當(dāng)一個(gè)線程執(zhí)行命令成功,說(shuō)明key原本不存在,該線程成功得到了鎖;當(dāng)設(shè)置失敗時(shí),說(shuō)明key已經(jīng)存在,該線程搶鎖失敗。

當(dāng)?shù)竭_(dá)過(guò)期時(shí)間,或者key被刪除(del),說(shuō)明鎖被釋放,其他線程可以繼續(xù)執(zhí)行這個(gè)命令來(lái)獲取鎖。

通過(guò)redis實(shí)現(xiàn)分布式鎖有許多實(shí)現(xiàn)細(xì)節(jié)需要注意的:

1、很多人會(huì)通過(guò)setnx代替SET key value NX命令,但前者沒(méi)有設(shè)置過(guò)期時(shí)間的參數(shù),因此設(shè)置key值和設(shè)置過(guò)期時(shí)間便成為一個(gè)復(fù)合操作,不具備原子性,當(dāng)一個(gè)線程設(shè)置了key值,但未設(shè)置過(guò)期時(shí)間,這時(shí)相關(guān)的節(jié)點(diǎn)掛了,但key一直存在,那其他線程就永遠(yuǎn)無(wú)法獲取這個(gè)鎖了(死鎖)。

2、過(guò)期時(shí)間很難設(shè)置,如果設(shè)置短了,假設(shè)獲得鎖的A線程的任務(wù)還沒(méi)執(zhí)行完成,這時(shí)候鎖就被釋放了,其他線程就會(huì)獲得鎖,導(dǎo)致難以預(yù)料的一系列后果。如A線程執(zhí)行完后誤刪了后一個(gè)線程的鎖,共享數(shù)據(jù)被破壞等等。對(duì)此,我們可以通過(guò)開一個(gè)守護(hù)線程,當(dāng)線程任務(wù)未執(zhí)行完成,給鎖續(xù)期。

二、zookeeper實(shí)現(xiàn)分布式鎖

zookeeper分布式鎖,實(shí)現(xiàn)更加完善,封裝更好一點(diǎn),因此,使用更加方便。

首先介紹下zookeeper分布式鎖的實(shí)現(xiàn)原理。

為了構(gòu)建這個(gè)鎖,zookeeper創(chuàng)建一個(gè)持久的znode,它將作為父節(jié)點(diǎn)。試圖獲得鎖的客戶端將在父節(jié)點(diǎn)下面創(chuàng)建順序的、臨時(shí)的子節(jié)點(diǎn)。鎖是由子節(jié)點(diǎn)具有最低的序列號(hào)的客戶端進(jìn)程擁有的。在圖1中,鎖節(jié)點(diǎn)有三個(gè)子節(jié)點(diǎn),而節(jié)點(diǎn)1在這個(gè)時(shí)間點(diǎn)擁有鎖,因?yàn)樗男蛄刑?hào)是最低的。如果客戶端創(chuàng)建的節(jié)點(diǎn)不是最小節(jié)點(diǎn),就獲得該節(jié)點(diǎn)的上一順序節(jié)點(diǎn),并給它注冊(cè)watcher,同時(shí)在這里阻塞,等待監(jiān)聽(tīng)事件的發(fā)生。當(dāng)完成之后,關(guān)閉ZooKeeper連接,進(jìn)而可以引發(fā)監(jiān)聽(tīng)事件,釋放該鎖(在刪除節(jié)點(diǎn)1之后,鎖被釋放), 然后擁有節(jié)點(diǎn)2的客戶端擁有這個(gè)鎖,以此類推。

圖1

zookeeper實(shí)現(xiàn)類似等待隊(duì)列的機(jī)制,大大提升了搶鎖的效率。

另外我們來(lái)看下,zookeeper有沒(méi)有類似redis分布式鎖那樣的問(wèn)題。我們發(fā)現(xiàn)它是不需要設(shè)置過(guò)期時(shí)間的,當(dāng)任務(wù)完成時(shí),客戶端會(huì)刪除節(jié)點(diǎn),進(jìn)而釋放鎖;當(dāng)客戶端掛掉,相應(yīng)的臨時(shí)會(huì)自動(dòng)刪除,鎖被釋放,其下一個(gè)序列的節(jié)點(diǎn)會(huì)收到通知,獲取鎖;當(dāng)連接中斷時(shí)則根據(jù)配置的重試機(jī)制重新連接。

Apache Curator,包含了對(duì)zookeeper分布式鎖的實(shí)現(xiàn),下面是使用代碼,有興趣可以研究下源碼:

1、創(chuàng)建一個(gè)重試策略,然后使用CuratorFrameworkFactory.newClient()來(lái)獲得CuratorFramework的實(shí)例

RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMills, maxRetries);

CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);

client.start();

2、為特定的鎖路徑(lockPath)創(chuàng)建一個(gè)進(jìn)程互斥鎖,獲取鎖,執(zhí)行一些操作,然后釋放鎖。

InterProcessLock lock = new InterProcessMutex(client, lockPath);

if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {

  try {

    // do work while we hold the lock

  } catch (Exception ex) {

    // handle exceptions as appropriate

  } finally {

    lock.release();

  }

} else {

  // we timed out waiting for lock, handle appropriately

}

3、不要忘記關(guān)閉client

client.close();
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容