7、zk典型應(yīng)用場(chǎng)景之分布式鎖

分布式鎖:分布式鎖是控制分布式系統(tǒng)之間同步訪問(wèn)共享資源的一種方式,如果不同的系統(tǒng)化或者同一個(gè)系統(tǒng)的不同主機(jī)之間共享一個(gè)或者一組資源,那么在訪問(wèn)這些資源的時(shí)候,往往需要通過(guò)一些互斥手段防止彼此之間的干擾,以保證一致性,這種情況就需要使用分布式鎖。

1、排他鎖實(shí)現(xiàn)

排他鎖:核心是如何保證當(dāng)前有且僅有一個(gè)事務(wù)獲取到鎖,并且鎖釋放之后,所有正在等待獲取鎖的事務(wù)都能夠被通知到。
定義和獲取鎖:在zk中,通過(guò)一個(gè)數(shù)據(jù)節(jié)點(diǎn)來(lái)表示一個(gè)鎖(因?yàn)樵趜k中數(shù)據(jù)節(jié)點(diǎn)是唯一的),zk會(huì)保證在所有客戶端中,最終只有一個(gè)客戶端能夠創(chuàng)建一個(gè)節(jié)點(diǎn)成功,最終只有一個(gè)客戶端能夠創(chuàng)建成功,那么久可以認(rèn)為這個(gè)客戶端獲得了鎖。同時(shí)沒(méi)有獲取到鎖的客戶端就需要到相應(yīng)節(jié)點(diǎn)上注冊(cè)一個(gè)子節(jié)點(diǎn)變更的watcher監(jiān)聽(tīng),以便實(shí)時(shí)的監(jiān)聽(tīng)lock節(jié)點(diǎn)的變更情況。
釋放鎖:由于定義鎖的節(jié)點(diǎn)是一個(gè)臨時(shí)節(jié)點(diǎn),因此存在兩種情況釋放鎖1、當(dāng)前獲取鎖的客戶端機(jī)器發(fā)生宕機(jī)了。zk就會(huì)把該臨時(shí)節(jié)點(diǎn)刪除2、正常執(zhí)行完業(yè)務(wù)邏輯,客戶端會(huì)主動(dòng)將自己創(chuàng)建的臨時(shí)節(jié)點(diǎn)刪除。而無(wú)論什么情況下面刪除了節(jié)點(diǎn),zk都會(huì)通知所有在該節(jié)點(diǎn)上注冊(cè)了子節(jié)點(diǎn)變更watcher將的客戶端。這些客戶端在接收到通知后,會(huì)再次重新發(fā)起分布式鎖獲取,即重復(fù)“獲取鎖過(guò)程”。
(獲取鎖和釋放鎖流程)

2、共享鎖實(shí)現(xiàn)

共享鎖:稱讀鎖。如果事務(wù)T對(duì)數(shù)據(jù)對(duì)象O加了共享鎖,那么當(dāng)前的事務(wù)只能對(duì)O進(jìn)行讀取操作,其他事務(wù)也只能對(duì)這個(gè)數(shù)據(jù)對(duì)象加共享鎖-直到該數(shù)據(jù)對(duì)象上的所有共享鎖都被釋放。
定義鎖:同樣是使用zk上的數(shù)據(jù)節(jié)點(diǎn)來(lái)表示一個(gè)鎖,是一個(gè)類似“/share_lock/[hostname]-請(qǐng)求類型-序號(hào)”的臨時(shí)順序節(jié)點(diǎn)。
獲取鎖:在需要獲取共享鎖時(shí)候,所有客戶端都會(huì)到/share_lock這個(gè)節(jié)點(diǎn)下面創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn),如果當(dāng)前是讀請(qǐng)求,那么就是例如/share_lock/192.168.0.1-R-0000000001的節(jié)點(diǎn);如果是寫(xiě)請(qǐng)求,那么就會(huì)創(chuàng)建例如/share_lock/192.168.0.1-W-0000000001的節(jié)點(diǎn)。
判斷讀寫(xiě)順序:不同事務(wù)都可以同時(shí)對(duì)一個(gè)數(shù)據(jù)對(duì)象進(jìn)行讀寫(xiě)操作,而更新操作必須在當(dāng)前沒(méi)有任何事務(wù)進(jìn)行讀寫(xiě)操作的情況下面進(jìn)行?;谶@個(gè)原則可以通過(guò)下面四個(gè)步驟來(lái)確定分布式讀寫(xiě)順序:

1、客戶端調(diào)用create方法創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)
2、客戶端調(diào)用getChildren接口來(lái)獲取所有已經(jīng)創(chuàng)建的子節(jié)點(diǎn)列表(注意這里不注冊(cè)任何的watcher)
3、如果無(wú)法獲取共享鎖,那么就調(diào)用exist來(lái)對(duì)比比自己小的那個(gè)節(jié)點(diǎn)注冊(cè)watcher。
對(duì)于讀請(qǐng)求:向比自己序號(hào)小的最后一個(gè)寫(xiě)請(qǐng)求節(jié)點(diǎn)注冊(cè)watcher監(jiān)聽(tīng)
對(duì)于寫(xiě)請(qǐng)求:向比自己序號(hào)小的最后一個(gè)節(jié)點(diǎn)注冊(cè)watcher監(jiān)聽(tīng)
4、等待watcher通知,繼續(xù)進(jìn)入步驟2

釋放鎖:和排他鎖一樣。

3、使用curator實(shí)現(xiàn)分布式鎖案例
  • 以一個(gè)“流水號(hào)生成“的場(chǎng)景為例”:
    沒(méi)加鎖時(shí)候:
public class LockTest {
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    latch.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss|SSS");
                String orderNo = format.format(new Date());
                System.out.println("生成的訂單號(hào):" + orderNo);
            }).start();
        }
        latch.countDown();
    }
}
結(jié)果:
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|160
生成的訂單號(hào):21:33:22|161
生成的訂單號(hào):21:33:22|161

加鎖情況:

public class LockTest {
    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    private static String orderNo = "1";

    public static void main(String[] args) {
        client.start();
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    latch.await();
                    //加鎖
                    mutex.acquire();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss|SSS");
                String orderNo = format.format(new Date());
                System.out.println("生成的訂單號(hào):" + orderNo);
                try {
                    mutex.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        latch.countDown();
    }
}
4、深入理解curator實(shí)現(xiàn)的分布式鎖
curator分布式鎖分類

InterProcessMutex:分布式可重入排它鎖
InterProcessSemaphoreMutex:分布式排它鎖
InterProcessReadWriteLock:分布式讀寫(xiě)鎖
InterProcessMultiLock:將多個(gè)鎖作為單個(gè)實(shí)體管理的容器

  • 這里重點(diǎn)說(shuō)明InterProcessMutex,InterProcessMutex鎖是一種可重入鎖,采用介紹的共享鎖實(shí)現(xiàn)原理實(shí)現(xiàn)的。

(1)實(shí)現(xiàn)流程

加鎖和解鎖流程圖

1、A創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
2、判斷是否加鎖(是否是第一個(gè)臨時(shí)子節(jié)點(diǎn)),此時(shí)A加鎖成功
3、B創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
4、判斷是否加鎖成功,此時(shí)B加鎖失?。ㄒ?yàn)锳獲取到了鎖)
5、B加鎖失敗后監(jiān)聽(tīng)上一個(gè)節(jié)點(diǎn)的變化(也就是監(jiān)聽(tīng)A節(jié)點(diǎn),加上相應(yīng)的watcher)
6、A完成邏輯,釋放鎖,刪除對(duì)應(yīng)臨時(shí)順序節(jié)點(diǎn)
7、zk通知客戶端B的監(jiān)聽(tīng)器,A客戶端的臨時(shí)節(jié)點(diǎn)刪除了
8、客戶端B再次嘗試去獲取鎖,此時(shí)B字節(jié)的順序是最小的,獲取鎖成功(同時(shí)如果有C重復(fù)步驟4)

(2)源碼解讀

1、InterProcessMutex構(gòu)造函數(shù)

 private final String basePath;
    private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
    private static final String LOCK_NAME = "lock-";
 InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
      //1、創(chuàng)建數(shù)據(jù)容器
        this.threadData = Maps.newConcurrentMap();
    //2、驗(yàn)證傳入父路徑的合法性
        this.basePath = PathUtils.validatePath(path);
   //3、初始化LockInternals(LockInternals是加鎖和解鎖的實(shí)現(xiàn)類)
        this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
...
public class LockInternals {
    private final CuratorFramework client;
//臨時(shí)順序子節(jié)點(diǎn)
    private final String path;
//父節(jié)點(diǎn)
    private final String basePath;
    private final LockInternalsDriver driver;
    private final String lockName;
    private volatile int maxLeases;
....

2、加鎖

InterProcessMutex.class

//入?yún)榭眨{(diào)用該方法后,會(huì)一直堵塞,直到搶奪到鎖資源,或者zookeeper連接中斷后,上拋異常。
 public void acquire() throws Exception {
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }
//入?yún)魅氤瑫r(shí)時(shí)間以及單位,搶奪時(shí),如果出現(xiàn)堵塞,會(huì)在超過(guò)該時(shí)間后,返回false。
public boolean acquire(long time, TimeUnit unit) throws Exception {
        return this.internalLock(time, unit);
    }

3、鎖的可重入性

private boolean internalLock(long time, TimeUnit unit) throws Exception {
//每個(gè)InterProcessMutex實(shí)例,都會(huì)持有一個(gè)ConcurrentMap類型的threadData對(duì)象,
//以線程對(duì)象作為Key,以LockData作為Value值。
//通過(guò)判斷當(dāng)前線程threadData是否有值,如果有,則表示線程可以重入
//該鎖,于是將lockData的lockCount進(jìn)行累加;如果沒(méi)有,則進(jìn)行鎖的搶奪。
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
//internals.attemptLock方法返回lockPath!=null時(shí),表明了該線程已經(jīng)成功
//持有了這把鎖,于是乎LockData對(duì)象被new了出來(lái),并存放到threadData中。
            if (lockPath != null) {
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;
            }
        }
    }

4、搶奪鎖

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
//正常情況下,這個(gè)循環(huán)會(huì)在下一次結(jié)束。但是當(dāng)出現(xiàn)NoNodeException
//異常時(shí),會(huì)根據(jù)zookeeper客戶端的重試策略,進(jìn)行有限次數(shù)的重新獲取鎖。
        while(!isDone) {
            isDone = true;

            try {
//創(chuàng)建這個(gè)鎖,即在zookeeper的指定路徑上,創(chuàng)建一個(gè)臨時(shí)序列節(jié)點(diǎn)。
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
//判斷是否獲取到鎖
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }

        return hasTheLock ? ourPath : null;
    }

  private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
//添加watcher
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }
//如果你一開(kāi)始使用無(wú)參的acquire方法,那么此處的循環(huán)可能就是一個(gè)死
//循環(huán)。當(dāng)zookeeper客戶端啟動(dòng)時(shí),并且當(dāng)前線程還沒(méi)有成功獲取到鎖時(shí),就會(huì)開(kāi)始新的一輪循環(huán)。
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
//,就是獲取到所有子節(jié)點(diǎn)列表,并且從小到大根據(jù)節(jié)點(diǎn)名稱后10位數(shù)字進(jìn)行排序。
                List<String> children = this.getSortedChildren();
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
//判斷是否可以持有鎖,
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
//前面字節(jié)點(diǎn)的path
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
 //首先添加一個(gè)watcher監(jiān)聽(tīng),而監(jiān)聽(tīng)的地址正是上面一步返回的pathToWatch進(jìn)行basePath + "/" 拼接以后的地址。                           ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if (millisToWait == null) {
//wait(millisToWait)。線程交出cpu的占用,進(jìn)入等待狀態(tài),等到被喚醒。
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait > 0L) {
                                    this.wait(millisToWait);
                                } else {
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                            ;
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                this.deleteOurPath(ourPath);
            }

        }

        return haveTheLock;
    }
//判斷是否可以持有鎖,判斷規(guī)則:當(dāng)前創(chuàng)建的節(jié)點(diǎn)是否在上一步獲取到的子節(jié)點(diǎn)列表的首位。
//如果是,說(shuō)明可以持有鎖,那么getsTheLock = true,封裝進(jìn)PredicateResults返回。
//如果不是,說(shuō)明有其他線程早已先持有了鎖,那么getsTheLock = false,此處還需要獲取到自己前一個(gè)臨時(shí)節(jié)點(diǎn)的名稱pathToWatch。
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        boolean getsTheLock = ourIndex < maxLeases;
        String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
5、釋放鎖
 public void release() throws Exception {
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
        } else {
//減少重入鎖的計(jì)數(shù),直到變成0。
            int newLockCount = lockData.lockCount.decrementAndGet();
            if (newLockCount <= 0) {
                if (newLockCount < 0) {
                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
                } else {
                    try {
//釋放鎖,即移除移除Watchers & 刪除創(chuàng)建的節(jié)點(diǎn)
                        this.internals.releaseLock(lockData.lockPath);
                    } finally {
//從threadData中,刪除自己線程的緩存
                        this.threadData.remove(currentThread);
                    }

                }
            }
        }
    }

參考資料:
《從Paxos到Zookeeper 分布式一致性原理與實(shí)踐》》
http://www.itdecent.cn/p/6618471f6e75
https://juejin.im/post/5c01532ef265da61362232ed

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 此文知識(shí)來(lái)自于:《從Paxos到Zookeeper分布式一致性原理與實(shí)踐》第六章 集群管理(子節(jié)點(diǎn)) Master...
    李文文丶閱讀 706評(píng)論 0 1
  • 微信原文: 利用Zookeeper實(shí)現(xiàn) - 分布式鎖 博客原文:利用Zookeeper實(shí)現(xiàn) - 分布式鎖 在許...
    小旋鋒的簡(jiǎn)書(shū)閱讀 12,590評(píng)論 3 44
  • 胡塞孤煙漫,寒沙郁目迎。 古原君不適,長(zhǎng)恨意難呈。 解卻成名器,隱于丘野坪。 風(fēng)吹弓半響,月照草全傾。 良久無(wú)實(shí)至...
    瀟寒月閱讀 393評(píng)論 13 16
  • 一樹(shù)榴花分外明,芙蓉出水濯泥清。 驕陽(yáng)正午聽(tīng)風(fēng)寂,偶聞蟬鳴三兩聲。
    蘭貭冰心閱讀 575評(píng)論 3 16
  • 昨夜的一場(chǎng)雪,給整個(gè)世界都披上了一層銀裝。這銀裝素裹的世界好美啊。那些攝影愛(ài)好者們紛紛曬出了他們的大作,讓朋友們分...
    李永吉閱讀 289評(píng)論 0 0

友情鏈接更多精彩內(nèi)容