Zookeeper實(shí)現(xiàn)分布式鎖

1. zk分布式鎖原理

(參考:https://blog.csdn.net/xuefeng0707/article/details/80588855

zk使用臨時(shí)順序節(jié)點(diǎn)+Watch機(jī)制實(shí)現(xiàn)分布式鎖:

  • 加鎖:創(chuàng)建臨時(shí)順序節(jié)點(diǎn),如果該節(jié)點(diǎn)是最小節(jié)點(diǎn)則獲取到鎖,否則對(duì)上一個(gè)臨時(shí)節(jié)點(diǎn)注冊(cè)監(jiān)聽。
  • 解鎖:刪除節(jié)點(diǎn)。

下面我們?cè)敿?xì)分析下加鎖和解鎖的過程,暫不考慮可重入性和鎖超時(shí),參考了curator源碼。

1.1 加鎖

過程1:創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
舉例子說明吧,在下面將要用到的例子中,我們會(huì)在“/lock/test/testZkLock/”目錄下創(chuàng)建臨時(shí)順序子節(jié)點(diǎn),子節(jié)點(diǎn)的名字不需要我們?nèi)ッ?,curator框架會(huì)自動(dòng)命名為“l(fā)ock-”,創(chuàng)建完臨時(shí)順序節(jié)點(diǎn)之后我們能看到這樣的結(jié)構(gòu):

image.png

如果是多個(gè)進(jìn)程或者線程競爭鎖資源,就會(huì)在“/lock/test/testZkLock/”目錄下依次創(chuàng)建臨時(shí)順序節(jié)點(diǎn),在一次運(yùn)行中我們啟動(dòng)3個(gè)線程,看到創(chuàng)建了3個(gè)臨時(shí)節(jié)點(diǎn):


image.png

過程2:競爭鎖
在這個(gè)過程中會(huì)判斷當(dāng)前線程創(chuàng)建的臨時(shí)順序節(jié)點(diǎn)是不是序號(hào)最小的那個(gè),如果是則獲取到鎖、繼續(xù)執(zhí)行后面的任務(wù),否則的話對(duì)比當(dāng)前序號(hào)小的節(jié)點(diǎn)注冊(cè)監(jiān)聽。舉例說明,假設(shè)在某一瞬間,某一共享資源下創(chuàng)建的臨時(shí)節(jié)點(diǎn)是這樣的:

lock_001
lock_002
lock_003

那么在加鎖時(shí),
lock_001的線程可以獲取到鎖,
lock_002在lock_001上注冊(cè)Watch監(jiān)聽,然后wait()進(jìn)入等待,
同理lock_003在lock_002上注冊(cè)監(jiān)聽。

原理圖出處

image.png

image.png

curator源碼:

image.png

1.2 解鎖

不考慮可重入的話,解鎖直接刪除臨時(shí)節(jié)點(diǎn):

client.delete().guaranteed().forPath(ourPath);

當(dāng)該臨時(shí)節(jié)點(diǎn)被刪除時(shí),注冊(cè)在該節(jié)點(diǎn)上的Watcher會(huì)監(jiān)聽到zk事件,然后執(zhí)行process()方法:


image.png

notifyFromWatcher()其實(shí)就一行代碼:


image.png

仍然舉上面的例子,假設(shè)目前臨時(shí)有序節(jié)點(diǎn)為:lock_001、lock_002、lock_003

當(dāng)lock_001因?yàn)獒尫沛i而被刪除的時(shí)候,會(huì)觸發(fā)zk通知機(jī)制,那么lock_002節(jié)點(diǎn)會(huì)收到事件通知,在收到通知的時(shí)候執(zhí)行“notifyAll();”,通知當(dāng)前JVM進(jìn)程中所有處于wait狀態(tài)的線程起來搶鎖。

分布式環(huán)境下有多個(gè)進(jìn)程,為什么只通知本進(jìn)程的就可以?

假設(shè):
節(jié)點(diǎn)2監(jiān)聽節(jié)點(diǎn)1是在進(jìn)程A中進(jìn)行的;
節(jié)點(diǎn)3監(jiān)聽節(jié)點(diǎn)2是在進(jìn)程B中進(jìn)行的;
當(dāng)節(jié)點(diǎn)1刪除的時(shí)候,通知進(jìn)程A節(jié)點(diǎn)2就能收到;
節(jié)點(diǎn)2刪除的時(shí)候,通知進(jìn)程B,節(jié)點(diǎn)3就能收到。
意思是注冊(cè)的監(jiān)聽就通知。

2. 使用Curator實(shí)現(xiàn)分布式鎖

Curator是Netfix公司開源的一套ZooKeeper客戶端框架,對(duì)zk底層的連接、監(jiān)聽等進(jìn)行了良好的封裝,并且還提供了分布式鎖API,因此我們不必自己實(shí)現(xiàn)上述復(fù)雜的理論,直接使用curator框架即可。

Curator分布式鎖是一種可重入鎖,實(shí)現(xiàn)了分布式的AQS,使用ConcurrentMap實(shí)現(xiàn)了一個(gè)類似ThreadLocal的功能,把線程(Thread.currentThread())作為key,鎖作為value,在加鎖時(shí),如果在key中找到該線程,就對(duì)value加1,解鎖時(shí)減1,減到0的時(shí)候刪除臨時(shí)節(jié)點(diǎn)、移除map中的該線程。

首先定義鎖接口:

package utils.distributed.lock;
 
public interface DistributedLock {
    /**
     * 阻塞式鎖
     * @return
     */
    void lock();
 
    /**
     * 非阻塞式鎖
     * @return
     */
    boolean tryLock();
 
    /**
     * 帶超時(shí)時(shí)間的阻塞式鎖
     * @param timeout
     * @return
     */
    boolean tryLock(long timeout);
 
    /**
     * 解鎖
     */
    void unLock() throws Exception;
 
    /**
     * 釋放資源
     */
    void shutdown();
}

然后實(shí)現(xiàn)一個(gè)zk鎖:

package utils.distributed.lock;
 
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
 
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
 
/**
 * zk分布式鎖
 */
public class ZKLock implements DistributedLock {
    private CuratorFramework client;
 
    private InterProcessMutex lock;
 
    public ZKLock(String host, String bizType, String lockKey) {
        client = CuratorFrameworkFactory.newClient(host,
                new ExponentialBackoffRetry(ZKLockConstant.BASE_SLEEP_TIME_MS, ZKLockConstant.MAX_RETRIES));
        client.start();
 
        String path = ZKLockConstant.ZK_SEPERATOR + StringUtils.join(Arrays.asList(ZKLockConstant.ZK_LOCK_BASE_PREFIX, bizType, lockKey), ZKLockConstant.ZK_SEPERATOR);
        lock = new InterProcessMutex(client, path);
    }
 
    public void lock() {
        try {
            lock.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public boolean tryLock() {
        return tryLock(0);
    }
 
    public boolean tryLock(long timeout) {
        try {
            return lock.acquire(timeout, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
 
 
    public void unLock() throws Exception {
        lock.release();
    }
 
    public void shutdown() {
        client.close();
    }
}

3. 測試

3.1 測試lock
package mytest.distributed.lock;
 
import utils.distributed.lock.ZKLock;
 
import java.util.concurrent.CountDownLatch;
 
public class ZkLockTest {
    public static void main(String[] args) {
        String zkHost = "192.168.160.128:2181";
        String bizType = "test";
        String lockKey = "testZkLock";
        ZKLock zkLock = new ZKLock(zkHost, bizType, lockKey);
 
        // 啟動(dòng)3個(gè)線程模擬分布式鎖競爭
        CountDownLatch conutDownLatch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                testZkLock(zkLock);
                conutDownLatch.countDown();
            }).start();
        }
 
        try {
            conutDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        zkLock.shutdown();
    }
 
    private static void testZkLock(ZKLock zkLock) {
        System.out.println("######## 開始加鎖,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        zkLock.lock();
 
        try {
            System.out.println("######## 加鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } finally {
            try {
                zkLock.unLock();
                System.out.println("######## 解鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
            } catch (Exception e) {
                System.out.println("######## 解鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
            }
        }
    }
}

打印結(jié)果:


image.png

全部成功了。

3.2 測試tryLock()

復(fù)用上面的main方法,將testZkLock換成testZkLockTryLock

private static void testZkLockTryLock(ZKLock zkLock) {
    System.out.println("######## 開始加鎖,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
 
    if (!zkLock.tryLock()) {
        System.out.println("######## 加鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        return;
    }
 
    try {
        System.out.println("######## 加鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
    } finally {
        try {
            zkLock.unLock();
            System.out.println("######## 解鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        } catch (Exception e) {
            System.out.println("######## 解鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        }
    }
}

打印結(jié)果:


image.png

因?yàn)橹粐L試一次,所以有失敗的情況發(fā)生。

3.3 測試tryLock(timeout)
private static void testZkLockTryLockTimeOut(ZKLock zkLock) {
    System.out.println("######## 開始加鎖,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
 
    if (!zkLock.tryLock(3000)) {
        System.out.println("######## 加鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        return;
    }
 
    try {
        System.out.println("######## 加鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } finally {
        try {
            zkLock.unLock();
            System.out.println("######## 解鎖成功,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        } catch (Exception e) {
            System.out.println("######## 解鎖失敗,線程信息:" + Thread.currentThread().getId() + "-" + Thread.currentThread().getName());
        }
    }
}

打印結(jié)果:


image.png

線程18獲取鎖之后要持有5s才會(huì)釋放,而線程16、17加鎖等待時(shí)間只有3s,因此會(huì)超時(shí)。

4. 分布式鎖注解

實(shí)現(xiàn)一個(gè)基于注解的分布式鎖:

4.1 定義分布式鎖注解
package utils.distributed.annotation;
 
import java.lang.annotation.*;
 
/**
 * ZK分布式鎖注解
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ZkLock {
    /**
     * zk地址
     * @return
     */
    String zkHost();
 
    /**
     * 業(yè)務(wù)類型
     */
    String bizType();
 
    /**
     * 鎖名稱
     *
     * @return
     */
    String lockKey();
 
    /**
     * 超時(shí)時(shí)間
     * @return
     */
    long timeout();
}

4.2 定義分布式鎖切面

package utils.distributed.advice;
 
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import utils.distributed.annotation.ZkLock;
import utils.distributed.lock.ZKLock;
 
/**
 * @Description zk分布式鎖切面
 * @Author lilong
 * @Date 2019-04-08 13:50
 */
@Component
@Aspect
public class ZkLockAspectAdvice {
    @Around(value = "@annotation(utils.distributed.annotation.ZkLock) && @annotation(zkLock)")
    public Object process(ProceedingJoinPoint pjp, ZkLock zkLock) throws Throwable {
        ZKLock lock = new ZKLock(zkLock.zkHost(), zkLock.bizType(), zkLock.lockKey());
 
        boolean acquired = false;
        try {
            acquired = lock.tryLock(zkLock.timeout());
            if (acquired) {
                return pjp.proceed();
            } else {
                System.out.println("######## 加鎖失敗,線程信息:" + Thread.currentThread().getId() + "-"
                        + Thread.currentThread().getName());
                return null;
            }
        } finally {
            if (acquired) {
                lock.unLock();
            }
        }
    }
}
4.3 使用注解
@Override
@ZkLock(zkHost = "192.168.160.128:2181", bizType = "test", lockKey = "queryKeyValue", timeout = 3000)
public KeyValueJsonPO queryKeyValue(String bizType, String key) {
    return keyValueJsonPOMapper.queryKeyValue(bizType, key);
}

https://blog.csdn.net/u010266988/article/details/89074155

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容