工作需要。寫了一個基于ZK的分布式鎖,記錄一下:
原理
zk能保證集群上的路徑同一時刻只有一個客戶端來創(chuàng)建。因此,通過在集群上順序創(chuàng)建和刪除臨時路徑,在實現(xiàn)分布式鎖的獲取和釋放。
代碼
zk上有一個客戶端框架Curator已經(jīng)對分布式互斥鎖進行了封裝,幾乎是開箱即用:
- 封裝一下框架的初始化
public class ZKCuratorManager {
private static InterProcessMutex lock;
private static CuratorFramework cf;
private static String zkAddr = "*.*.*.*:2181";
private static String lockPath = "/distribute-lock";
static {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
cf = CuratorFrameworkFactory.builder()
.connectString(zkAddr)
.sessionTimeoutMs(2000)
.retryPolicy(retryPolicy)
.build();
cf.start();
}
public static InterProcessMutex getLock() {
lock = new InterProcessMutex(cf, lockPath);
return lock;
}
}
- 封裝一下工具類
public class ZKCuratorLockUtil {
/**
* 從配置類中獲取分布式鎖對象
*/
private static InterProcessMutex lock = ZKCuratorManager.getLock();
/**
* 加鎖
*
* @return
*/
public static boolean acquire() {
try {
lock.acquire();
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
/**
* 鎖的釋放
*/
public static void release() {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
-
測試
- 所有線程啟動后sleep5秒鐘
- 用CyclicBarrier使四個線程同時嘗試獲取鎖
- 結(jié)果應該是四個線程依次獲取-釋放鎖
public class ZkLockTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for (int i = 0; i < N; i++) {
new WriterTest(barrier).start();
}
System.out.println("END");
}
static class WriterTest extends Thread {
private CyclicBarrier cyclicBarrier;
public WriterTest(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + "正在寫入數(shù)據(jù)...");
try {
//以睡眠來模擬寫入數(shù)據(jù)操作
Thread.sleep(5000);
System.out.println("線程" + Thread.currentThread().getName() + "寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("所有線程寫入完畢,繼續(xù)處理其他任務...");
//加鎖
ZKCuratorLockUtil.acquire();
System.out.println("線程" + Thread.currentThread().getName() + "獲得分布式鎖");
try {
Thread.sleep(2000);
ZKCuratorLockUtil.release();
System.out.println("線程" + Thread.currentThread().getName() + "釋放分布式鎖");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("END");
}
}
}
- 測試結(jié)果
線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-1正在寫入數(shù)據(jù)...
END
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-3正在寫入數(shù)據(jù)...
線程Thread-1寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-2寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-0寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-3寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
所有線程寫入完畢,繼續(xù)處理其他任務...
所有線程寫入完畢,繼續(xù)處理其他任務...
所有線程寫入完畢,繼續(xù)處理其他任務...
所有線程寫入完畢,繼續(xù)處理其他任務...
線程Thread-3獲得分布式鎖
線程Thread-3釋放分布式鎖
END
線程Thread-1獲得分布式鎖
線程Thread-1釋放分布式鎖
END
線程Thread-2獲得分布式鎖
線程Thread-2釋放分布式鎖
END
線程Thread-0獲得分布式鎖
線程Thread-0釋放分布式鎖
END