分布式鎖主要用于在分布式環(huán)境中保證數(shù)據(jù)的一致性。
包括跨進(jìn)程、跨機(jī)器、跨網(wǎng)絡(luò)導(dǎo)致共享資源不一致的問題。
分布式鎖的實(shí)現(xiàn)思路

image.png
這種實(shí)現(xiàn)會(huì)有一個(gè)缺點(diǎn),即當(dāng)有很多進(jìn)程在等待鎖的時(shí)候,在釋放鎖的時(shí)候會(huì)有很多進(jìn)程來(lái)爭(zhēng)奪鎖,這種現(xiàn)象稱為“驚群效應(yīng)”
分布式鎖優(yōu)化后的實(shí)現(xiàn)思路

image.png
分布式鎖實(shí)現(xiàn)類 ImproveLock
public class ImproveLock implements Lock {
private static Logger logger = LoggerFactory.getLogger(ImproveLock.class);
private static final String ZOOKEEPER_IP_PORT = "10.143.143.185:6182";
private static final String LOCK_PATH = "/LOCK";
private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 10000, 10000, new SerializableSerializer());
private CountDownLatch cdl;
private String beforePath;// 當(dāng)前請(qǐng)求的節(jié)點(diǎn)前一個(gè)節(jié)點(diǎn)
private String currentPath;// 當(dāng)前請(qǐng)求的節(jié)點(diǎn)
// 判斷有沒有LOCK目錄,沒有則創(chuàng)建
public ImproveLock() {
if (!this.client.exists(LOCK_PATH)) {
this.client.createPersistent(LOCK_PATH);
}
}
public boolean tryLock() {
// 如果currentPath為空則為第一次嘗試加鎖,第一次加鎖賦值currentPath
if (currentPath == null || currentPath.length() <= 0) {
// 創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)
currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
System.out.println("---------------------------->" + currentPath);
}
// 獲取所有臨時(shí)節(jié)點(diǎn)并排序,臨時(shí)節(jié)點(diǎn)名稱為自增長(zhǎng)的字符串如:0000000400
List<String> childrens = this.client.getChildren(LOCK_PATH);
Collections.sort(childrens);
if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {// 如果當(dāng)前節(jié)點(diǎn)在所有節(jié)點(diǎn)中排名第一則獲取鎖成功
return true;
} else {// 如果當(dāng)前節(jié)點(diǎn)在所有節(jié)點(diǎn)中排名中不是排名第一,則獲取前面的節(jié)點(diǎn)名稱,并賦值給beforePath
int wz = Collections.binarySearch(childrens, currentPath.substring(6));
beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
}
return false;
}
public void unlock() {
// 刪除當(dāng)前臨時(shí)節(jié)點(diǎn)
client.delete(currentPath);
}
public void lock() {
if (!tryLock()) {
waitForLock();
lock();
} else {
logger.info(Thread.currentThread().getName() + " 獲得分布式鎖!");
}
}
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
logger.info(Thread.currentThread().getName() + ":捕獲到DataDelete事件!---------------------------");
if (cdl != null) {
cdl.countDown();
}
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 給排在前面的的節(jié)點(diǎn)增加數(shù)據(jù)刪除的watcher
this.client.subscribeDataChanges(beforePath, listener);
if (this.client.exists(beforePath)) {
cdl = new CountDownLatch(1);
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.client.unsubscribeDataChanges(beforePath, listener);
}
// ==========================================
public void lockInterruptibly() throws InterruptedException {
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
public Condition newCondition() {
return null;
}
}
自增長(zhǎng)序列 OrderCodeGenerator
public class OrderCodeGenerator {
// 自增長(zhǎng)序列
private static int i = 0;
// 按照“年-月-日-小時(shí)-分鐘-秒-自增長(zhǎng)序列”的規(guī)則生成訂單編號(hào)
public String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
return sdf.format(now) + ++i;
}
public static void main(String[] args) {
OrderCodeGenerator ong = new OrderCodeGenerator();
for (int i = 0; i < 10; i++) {
System.out.println(ong.getOrderCode());
}
}
}
用10個(gè)線程去訪問 OrderServiceImpl
public class OrderServiceImpl implements Runnable {
private static OrderCodeGenerator ong = new OrderCodeGenerator();
private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
// 同時(shí)并發(fā)的線程數(shù)
private static final int NUM = 10;
// 按照線程數(shù)初始化倒計(jì)數(shù)器,倒計(jì)數(shù)器
private static CountDownLatch cdl = new CountDownLatch(NUM);
// private static Lock lock = new ReentrantLock();
private Lock lock = new ImproveLock();
// 創(chuàng)建訂單接口
public void createOrder() {
String orderCode = null;
lock.lock();
try {
// 獲取訂單編號(hào)
orderCode = ong.getOrderCode();
} catch (Exception e) {
// TODO: handle exception
} finally {
lock.unlock();
}
// ……業(yè)務(wù)代碼,此處省略100行代碼
logger.info("insert into DB使用id:=======================>" + orderCode);
}
@Override
public void run() {
try {
// 等待其他線程初始化
cdl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 創(chuàng)建訂單
createOrder();
}
public static void main(String[] args) {
for (int i = 1; i <= NUM; i++) {
// 按照線程數(shù)迭代實(shí)例化線程
new Thread(new OrderServiceImpl()).start();
// 創(chuàng)建一個(gè)線程,倒計(jì)數(shù)器減1
cdl.countDown();
}
}
}