1. zk分布式鎖原理
(參考:https://blog.csdn.net/xuefeng0707/article/details/80588855)
zk使用臨時(shí)順序節(jié)點(diǎn)+Watch機(jī)制實(shí)現(xiàn)分布式鎖:
- 加鎖:創(chuàng)建臨時(shí)順序節(jié)點(diǎn),如果該節(jié)點(diǎn)是最小節(jié)點(diǎn)則獲取到鎖,否則對(duì)上一個(gè)臨時(shí)節(jié)點(diǎn)注冊(cè)監(jiān)聽。
- 解鎖:刪除節(jié)點(diǎn)。
下面我們?cè)敿?xì)分析下加鎖和解鎖的過程,暫不考慮可重入性和鎖超時(shí),參考了curator源碼。
1.1 加鎖
過程1:創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
舉例子說明吧,在下面將要用到的例子中,我們會(huì)在“/lock/test/testZkLock/”目錄下創(chuàng)建臨時(shí)順序子節(jié)點(diǎn),子節(jié)點(diǎn)的名字不需要我們?nèi)ッ?,curator框架會(huì)自動(dòng)命名為“l(fā)ock-”,創(chuàng)建完臨時(shí)順序節(jié)點(diǎn)之后我們能看到這樣的結(jié)構(gòu):

如果是多個(gè)進(jìn)程或者線程競爭鎖資源,就會(huì)在“/lock/test/testZkLock/”目錄下依次創(chuàng)建臨時(shí)順序節(jié)點(diǎn),在一次運(yùn)行中我們啟動(dòng)3個(gè)線程,看到創(chuàng)建了3個(gè)臨時(shí)節(jié)點(diǎn):

過程2:競爭鎖
在這個(gè)過程中會(huì)判斷當(dāng)前線程創(chuàng)建的臨時(shí)順序節(jié)點(diǎn)是不是序號(hào)最小的那個(gè),如果是則獲取到鎖、繼續(xù)執(zhí)行后面的任務(wù),否則的話對(duì)比當(dāng)前序號(hào)小的節(jié)點(diǎn)注冊(cè)監(jiān)聽。舉例說明,假設(shè)在某一瞬間,某一共享資源下創(chuàng)建的臨時(shí)節(jié)點(diǎn)是這樣的:
lock_001
lock_002
lock_003
那么在加鎖時(shí),
lock_001的線程可以獲取到鎖,
lock_002在lock_001上注冊(cè)Watch監(jiān)聽,然后wait()進(jìn)入等待,
同理lock_003在lock_002上注冊(cè)監(jiān)聽。


curator源碼:

1.2 解鎖
不考慮可重入的話,解鎖直接刪除臨時(shí)節(jié)點(diǎn):
client.delete().guaranteed().forPath(ourPath);
當(dāng)該臨時(shí)節(jié)點(diǎn)被刪除時(shí),注冊(cè)在該節(jié)點(diǎn)上的Watcher會(huì)監(jiān)聽到zk事件,然后執(zhí)行process()方法:

notifyFromWatcher()其實(shí)就一行代碼:

仍然舉上面的例子,假設(shè)目前臨時(shí)有序節(jié)點(diǎn)為:lock_001、lock_002、lock_003
當(dāng)lock_001因?yàn)獒尫沛i而被刪除的時(shí)候,會(huì)觸發(fā)zk通知機(jī)制,那么lock_002節(jié)點(diǎn)會(huì)收到事件通知,在收到通知的時(shí)候執(zhí)行“notifyAll();”,通知當(dāng)前JVM進(jìn)程中所有處于wait狀態(tài)的線程起來搶鎖。
分布式環(huán)境下有多個(gè)進(jìn)程,為什么只通知本進(jìn)程的就可以?
假設(shè):
節(jié)點(diǎn)2監(jiān)聽節(jié)點(diǎn)1是在進(jìn)程A中進(jìn)行的;
節(jié)點(diǎn)3監(jiān)聽節(jié)點(diǎn)2是在進(jìn)程B中進(jìn)行的;
當(dāng)節(jié)點(diǎn)1刪除的時(shí)候,通知到進(jìn)程A,節(jié)點(diǎn)2就能收到;
而節(jié)點(diǎn)2刪除的時(shí)候,通知到進(jìn)程B,節(jié)點(diǎn)3就能收到。
意思是誰注冊(cè)的監(jiān)聽就通知誰。
2. 使用Curator實(shí)現(xiàn)分布式鎖
Curator是Netfix公司開源的一套ZooKeeper客戶端框架,對(duì)zk底層的連接、監(jiān)聽等進(jìn)行了良好的封裝,并且還提供了分布式鎖API,因此我們不必自己實(shí)現(xiàn)上述復(fù)雜的理論,直接使用curator框架即可。
Curator分布式鎖是一種可重入鎖,實(shí)現(xiàn)了分布式的AQS,使用ConcurrentMap實(shí)現(xiàn)了一個(gè)類似ThreadLocal的功能,把線程(Thread.currentThread())作為key,鎖作為value,在加鎖時(shí),如果在key中找到該線程,就對(duì)value加1,解鎖時(shí)減1,減到0的時(shí)候刪除臨時(shí)節(jié)點(diǎn)、移除map中的該線程。
首先定義鎖接口:
package utils.distributed.lock;
public interface DistributedLock {
/**
* 阻塞式鎖
* @return
*/
void lock();
/**
* 非阻塞式鎖
* @return
*/
boolean tryLock();
/**
* 帶超時(shí)時(shí)間的阻塞式鎖
* @param timeout
* @return
*/
boolean tryLock(long timeout);
/**
* 解鎖
*/
void unLock() throws Exception;
/**
* 釋放資源
*/
void shutdown();
}
然后實(shí)現(xiàn)一個(gè)zk鎖:
package utils.distributed.lock;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* zk分布式鎖
*/
public class ZKLock implements DistributedLock {
private CuratorFramework client;
private InterProcessMutex lock;
public ZKLock(String host, String bizType, String lockKey) {
client = CuratorFrameworkFactory.newClient(host,
new ExponentialBackoffRetry(ZKLockConstant.BASE_SLEEP_TIME_MS, ZKLockConstant.MAX_RETRIES));
client.start();
String path = ZKLockConstant.ZK_SEPERATOR + StringUtils.join(Arrays.asList(ZKLockConstant.ZK_LOCK_BASE_PREFIX, bizType, lockKey), ZKLockConstant.ZK_SEPERATOR);
lock = new InterProcessMutex(client, path);
}
public void lock() {
try {
lock.acquire();
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean tryLock() {
return tryLock(0);
}
public boolean tryLock(long timeout) {
try {
return lock.acquire(timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public void unLock() throws Exception {
lock.release();
}
public void shutdown() {
client.close();
}
}
3. 測試
3.1 測試lock
package mytest.distributed.lock;
import utils.distributed.lock.ZKLock;
import java.util.concurrent.CountDownLatch;
public class ZkLockTest {
public static void main(String[] args) {
String zkHost = "192.168.160.128:2181";
String bizType = "test";
String lockKey = "testZkLock";
ZKLock zkLock = new ZKLock(zkHost, bizType, lockKey);
// 啟動(dòng)3個(gè)線程模擬分布式鎖競爭
CountDownLatch conutDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
testZkLock(zkLock);
conutDownLatch.countDown();
}).start();
}
try {
conutDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
zkLock.shutdown();
}
private static void testZkLock(ZKLock zkLock) {
System.out.println("######## 開始加鎖,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
zkLock.lock();
try {
System.out.println("######## 加鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
try {
zkLock.unLock();
System.out.println("######## 解鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
} catch (Exception e) {
System.out.println("######## 解鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
}
}
}
}
打印結(jié)果:

全部成功了。
3.2 測試tryLock()
復(fù)用上面的main方法,將testZkLock換成testZkLockTryLock:
private static void testZkLockTryLock(ZKLock zkLock) {
System.out.println("######## 開始加鎖,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
if (!zkLock.tryLock()) {
System.out.println("######## 加鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
return;
}
try {
System.out.println("######## 加鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
} finally {
try {
zkLock.unLock();
System.out.println("######## 解鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
} catch (Exception e) {
System.out.println("######## 解鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
}
}
}
打印結(jié)果:

因?yàn)橹粐L試一次,所以有失敗的情況發(fā)生。
3.3 測試tryLock(timeout)
private static void testZkLockTryLockTimeOut(ZKLock zkLock) {
System.out.println("######## 開始加鎖,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
if (!zkLock.tryLock(3000)) {
System.out.println("######## 加鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
return;
}
try {
System.out.println("######## 加鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
try {
zkLock.unLock();
System.out.println("######## 解鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
} catch (Exception e) {
System.out.println("######## 解鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
}
}
}
打印結(jié)果:

線程18獲取鎖之后要持有5s才會(huì)釋放,而線程16、17加鎖等待時(shí)間只有3s,因此會(huì)超時(shí)。
4. 分布式鎖注解
實(shí)現(xiàn)一個(gè)基于注解的分布式鎖:
4.1 定義分布式鎖注解
package utils.distributed.annotation;
import java.lang.annotation.*;
/**
* ZK分布式鎖注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ZkLock {
/**
* zk地址
* @return
*/
String zkHost();
/**
* 業(yè)務(wù)類型
*/
String bizType();
/**
* 鎖名稱
*
* @return
*/
String lockKey();
/**
* 超時(shí)時(shí)間
* @return
*/
long timeout();
}
4.2 定義分布式鎖切面
package utils.distributed.advice;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import utils.distributed.annotation.ZkLock;
import utils.distributed.lock.ZKLock;
/**
* @Description zk分布式鎖切面
* @Author lilong
* @Date 2019-04-08 13:50
*/
@Component
@Aspect
public class ZkLockAspectAdvice {
@Around(value = "@annotation(utils.distributed.annotation.ZkLock) && @annotation(zkLock)")
public Object process(ProceedingJoinPoint pjp, ZkLock zkLock) throws Throwable {
ZKLock lock = new ZKLock(zkLock.zkHost(), zkLock.bizType(), zkLock.lockKey());
boolean acquired = false;
try {
acquired = lock.tryLock(zkLock.timeout());
if (acquired) {
return pjp.proceed();
} else {
System.out.println("######## 加鎖失敗,線程信息:" + Thread.currentThread().getId() + "-"
+ Thread.currentThread().getName());
return null;
}
} finally {
if (acquired) {
lock.unLock();
}
}
}
}
4.3 使用注解
@Override
@ZkLock(zkHost = "192.168.160.128:2181", bizType = "test", lockKey = "queryKeyValue", timeout = 3000)
public KeyValueJsonPO queryKeyValue(String bizType, String key) {
return keyValueJsonPOMapper.queryKeyValue(bizType, key);
}