多線程設(shè)計(jì)模式5-single threaded Execution模式(附分布式環(huán)境下的操作)
single threaded Execution模式主要是用于確保同一時(shí)間內(nèi)只能讓一個(gè)線程執(zhí)行處理,說(shuō)通俗點(diǎn)就是對(duì)synchronized的標(biāo)準(zhǔn)化使用方式,這是比較基礎(chǔ)的,所以我們前面重點(diǎn)介紹下如何保證同一個(gè)Jvm進(jìn)程內(nèi)的多線程同步,后面擴(kuò)展開來(lái),保證多個(gè)Jvm進(jìn)程間多線程同步(分布式環(huán)境)。兩者有很大的相似性。
單jvm進(jìn)程下:
先看下一個(gè)簡(jiǎn)單的例子:
public class Ticket {
private int counter = 100;
public void dec() {
if(counter>0) {
System.out.println(Thread.currentThread().getName() + "號(hào)窗口賣出:" + this.counter-- + "號(hào)票");
}else{
System.out.println("票已售完");
}
}
}
public class StationThread extends Thread{
Ticket ticket;
public StationThread(Ticket ticket) {
this.ticket = ticket;
}
@Override
public void run() {
while (true) {
try{
Thread.sleep(500);
}catch(InterruptedException e){
e.printStackTrace();
}
ticket.dec();
}
}
}
public class Main {
public static void main(String[] args) {
System.out.println("Testing...");
Ticket ticket = new Ticket();
new StationThread(ticket).start();
new StationThread(ticket).start();
new StationThread(ticket).start();
}
}
這里打印結(jié)果可知,執(zhí)行明顯是錯(cuò)誤的:
Testing...
Thread-0號(hào)窗口賣出:100號(hào)票
Thread-1號(hào)窗口賣出:99號(hào)票
Thread-2號(hào)窗口賣出:98號(hào)票
Thread-1號(hào)窗口賣出:97號(hào)票
Thread-2號(hào)窗口賣出:97號(hào)票
Thread-0號(hào)窗口賣出:97號(hào)票
Thread-2號(hào)窗口賣出:96號(hào)票
Thread-1號(hào)窗口賣出:96號(hào)票
Thread-0號(hào)窗口賣出:96號(hào)票
......
為什么會(huì)出錯(cuò)呢?因?yàn)門icket不是線程安全的,this.counter—并不是一個(gè)原子性的操作,其中包含了讀取,修改,寫入,多個(gè)線程執(zhí)行的時(shí)候,這些命令會(huì)交錯(cuò)執(zhí)行,導(dǎo)致執(zhí)行結(jié)果與預(yù)期不一致。
接下來(lái),我們改下Ticket的方法:
public synchronized void dec() {
if(counter>0) {
System.out.println(Thread.currentThread().getName() + "號(hào)窗口賣出:" + this.counter-- + "號(hào)票");
}else{
System.out.println("票已售完");
}
}
添加了synchronized后,執(zhí)行結(jié)果正常:
Testing...
Thread-0號(hào)窗口賣出:100號(hào)票
Thread-1號(hào)窗口賣出:99號(hào)票
Thread-2號(hào)窗口賣出:98號(hào)票
Thread-0號(hào)窗口賣出:97號(hào)票
Thread-1號(hào)窗口賣出:96號(hào)票
Thread-2號(hào)窗口賣出:95號(hào)票
Thread-0號(hào)窗口賣出:94號(hào)票
Thread-1號(hào)窗口賣出:93號(hào)票
Thread-2號(hào)窗口賣出:92號(hào)票
Thread-1號(hào)窗口賣出:91號(hào)票
Thread-2號(hào)窗口賣出:90號(hào)票
Thread-0號(hào)窗口賣出:89號(hào)票
Thread-1號(hào)窗口賣出:88號(hào)票
Thread-0號(hào)窗口賣出:87號(hào)票
Thread-2號(hào)窗口賣出:86號(hào)票
Thread-1號(hào)窗口賣出:85號(hào)票
Thread-2號(hào)窗口賣出:84號(hào)票
Thread-0號(hào)窗口賣出:83號(hào)票
Thread-1號(hào)窗口賣出:82號(hào)票
Thread-2號(hào)窗口賣出:81號(hào)票
Thread-0號(hào)窗口賣出:80號(hào)票
Thread-1號(hào)窗口賣出:79號(hào)票
Thread-2號(hào)窗口賣出:78號(hào)票
Thread-0號(hào)窗口賣出:77號(hào)票
......
synchronized保證了方法只能由一個(gè)線程,防止了由多個(gè)線程交錯(cuò)執(zhí)行的情況。我們知道,編寫線程安全的代碼,核心在于對(duì)狀態(tài)訪問(wèn)操作進(jìn)行管理,特別是共享和可變的狀態(tài),這里Ticket就是一個(gè)共享資源(SharedResource),通過(guò)single thread execution模式,將非安全的方法聲明為synchronized方法,確保同一時(shí)間只被一個(gè)線程訪問(wèn)。
使用時(shí)注意事項(xiàng):
1、死鎖問(wèn)題:
死鎖指多個(gè)線程同時(shí)被阻塞,它們中的一個(gè)或者全部都在等待某個(gè)資源被釋放。由于線程被無(wú)限期地阻塞,因此程序不可能正常終止。就如同一座橋,只能容納一輛車,有兩輛車,相向而行,分到橋的中途,需要占據(jù)對(duì)方的空間才能通過(guò)。因此,就一直處于阻塞狀態(tài)。
死鎖主要是由于多個(gè)線程加鎖順序不一致導(dǎo)致的,可以從這里角度分析,防止死鎖。
2、性能
獲取鎖花費(fèi)時(shí)間,線程沖突會(huì)引起等待,為了提高性能,需要管理好鎖的臨界區(qū),確定同步代碼塊的合理大小。
多jvm進(jìn)程下:
分布式鎖的方式:
一、通過(guò)如下命令在redis中實(shí)現(xiàn)分布式鎖的功能
SET key value NX EX max-lock-time
其中NX是操作模式,表示只在鍵不存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。
EX max-lock-time用于設(shè)置鍵的過(guò)期時(shí)間為max-lock-time秒。
這個(gè)命令連續(xù)兩次執(zhí)行結(jié)果如下:
test-redis:0>SET not-exists-key "value" NX EX 60
OK
test-redis:0>SET not-exists-key "value" NX EX 60
NULL
過(guò)六十秒后再執(zhí)行:
test-redis:0>SET not-exists-key "value" NX EX 60
OK
這樣,當(dāng)一個(gè)線程執(zhí)行命令成功,說(shuō)明key原本不存在,該線程成功得到了鎖;當(dāng)設(shè)置失敗時(shí),說(shuō)明key已經(jīng)存在,該線程搶鎖失敗。
當(dāng)?shù)竭_(dá)過(guò)期時(shí)間,或者key被刪除(del),說(shuō)明鎖被釋放,其他線程可以繼續(xù)執(zhí)行這個(gè)命令來(lái)獲取鎖。
通過(guò)redis實(shí)現(xiàn)分布式鎖有許多實(shí)現(xiàn)細(xì)節(jié)需要注意的:
1、很多人會(huì)通過(guò)setnx代替SET key value NX命令,但前者沒(méi)有設(shè)置過(guò)期時(shí)間的參數(shù),因此設(shè)置key值和設(shè)置過(guò)期時(shí)間便成為一個(gè)復(fù)合操作,不具備原子性,當(dāng)一個(gè)線程設(shè)置了key值,但未設(shè)置過(guò)期時(shí)間,這時(shí)相關(guān)的節(jié)點(diǎn)掛了,但key一直存在,那其他線程就永遠(yuǎn)無(wú)法獲取這個(gè)鎖了(死鎖)。
2、過(guò)期時(shí)間很難設(shè)置,如果設(shè)置短了,假設(shè)獲得鎖的A線程的任務(wù)還沒(méi)執(zhí)行完成,這時(shí)候鎖就被釋放了,其他線程就會(huì)獲得鎖,導(dǎo)致難以預(yù)料的一系列后果。如A線程執(zhí)行完后誤刪了后一個(gè)線程的鎖,共享數(shù)據(jù)被破壞等等。對(duì)此,我們可以通過(guò)開一個(gè)守護(hù)線程,當(dāng)線程任務(wù)未執(zhí)行完成,給鎖續(xù)期。
二、zookeeper實(shí)現(xiàn)分布式鎖
zookeeper分布式鎖,實(shí)現(xiàn)更加完善,封裝更好一點(diǎn),因此,使用更加方便。
首先介紹下zookeeper分布式鎖的實(shí)現(xiàn)原理。
為了構(gòu)建這個(gè)鎖,zookeeper創(chuàng)建一個(gè)持久的znode,它將作為父節(jié)點(diǎn)。試圖獲得鎖的客戶端將在父節(jié)點(diǎn)下面創(chuàng)建順序的、臨時(shí)的子節(jié)點(diǎn)。鎖是由子節(jié)點(diǎn)具有最低的序列號(hào)的客戶端進(jìn)程擁有的。在圖1中,鎖節(jié)點(diǎn)有三個(gè)子節(jié)點(diǎn),而節(jié)點(diǎn)1在這個(gè)時(shí)間點(diǎn)擁有鎖,因?yàn)樗男蛄刑?hào)是最低的。如果客戶端創(chuàng)建的節(jié)點(diǎn)不是最小節(jié)點(diǎn),就獲得該節(jié)點(diǎn)的上一順序節(jié)點(diǎn),并給它注冊(cè)watcher,同時(shí)在這里阻塞,等待監(jiān)聽(tīng)事件的發(fā)生。當(dāng)完成之后,關(guān)閉ZooKeeper連接,進(jìn)而可以引發(fā)監(jiān)聽(tīng)事件,釋放該鎖(在刪除節(jié)點(diǎn)1之后,鎖被釋放), 然后擁有節(jié)點(diǎn)2的客戶端擁有這個(gè)鎖,以此類推。

zookeeper實(shí)現(xiàn)類似等待隊(duì)列的機(jī)制,大大提升了搶鎖的效率。
另外我們來(lái)看下,zookeeper有沒(méi)有類似redis分布式鎖那樣的問(wèn)題。我們發(fā)現(xiàn)它是不需要設(shè)置過(guò)期時(shí)間的,當(dāng)任務(wù)完成時(shí),客戶端會(huì)刪除節(jié)點(diǎn),進(jìn)而釋放鎖;當(dāng)客戶端掛掉,相應(yīng)的臨時(shí)會(huì)自動(dòng)刪除,鎖被釋放,其下一個(gè)序列的節(jié)點(diǎn)會(huì)收到通知,獲取鎖;當(dāng)連接中斷時(shí)則根據(jù)配置的重試機(jī)制重新連接。
Apache Curator,包含了對(duì)zookeeper分布式鎖的實(shí)現(xiàn),下面是使用代碼,有興趣可以研究下源碼:
1、創(chuàng)建一個(gè)重試策略,然后使用CuratorFrameworkFactory.newClient()來(lái)獲得CuratorFramework的實(shí)例
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMills, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
client.start();
2、為特定的鎖路徑(lockPath)創(chuàng)建一個(gè)進(jìn)程互斥鎖,獲取鎖,執(zhí)行一些操作,然后釋放鎖。
InterProcessLock lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {
try {
// do work while we hold the lock
} catch (Exception ex) {
// handle exceptions as appropriate
} finally {
lock.release();
}
} else {
// we timed out waiting for lock, handle appropriately
}
3、不要忘記關(guān)閉client
client.close();