Zookeeper實現(xiàn)分布式鎖

前幾天分析了一下三種分布式鎖的實現(xiàn),但是沒有利用zookeeper實現(xiàn)一個分布式鎖,因為感覺基于Zookeeper實現(xiàn)分布式鎖還是稍微復(fù)雜的,同時也需要使用Watcher機制,所以就單獨搞一篇Zookeeper實現(xiàn)的分布式鎖。

首先,第一種實現(xiàn)。我們可以利用Zookeeper不能重復(fù)創(chuàng)建一個節(jié)點的特性來實現(xiàn)一個分布式鎖,這看起來和redis實現(xiàn)分布式鎖很像。但是也是有差異的,后面會詳細分析。
主要流程圖如下:

Paste_Image.png

上面的流程很簡單:

  1. 查看目標(biāo)Node是否已經(jīng)創(chuàng)建,已經(jīng)創(chuàng)建,那么等待鎖。
  2. 如果未創(chuàng)建,創(chuàng)建一個瞬時Node,表示已經(jīng)占有鎖。
  3. 如果創(chuàng)建失敗,那么證明鎖已經(jīng)被其他線程占有了,那么同樣等待鎖。
  4. 當(dāng)釋放鎖,或者當(dāng)前Session超時的時候,節(jié)點被刪除,喚醒之前等待鎖的線程去爭搶鎖。

上面是一個完整的流程,簡單的代碼實現(xiàn)如下:

package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;

/**
 * Zookeepr實現(xiàn)分布式鎖
 */
public class LockTest {

    private String zkQurom = "localhost:2181";

    private String lockNameSpace = "/mylock";

    private String nodeString = lockNameSpace + "/test1";

    private Lock mainLock;

    private ZooKeeper zk;

    public LockTest(){
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event "+watchedEvent);
                    if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                        System.out.println("connection is established...");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    private void ensureRootPath() throws InterruptedException {
        try {
            if (zk.exists(lockNameSpace,true)==null){
                zk.create(lockNameSpace,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    private void watchNode(String nodeString, final Thread thread) throws InterruptedException {
        try {
            zk.exists(nodeString, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println( "==" + watchedEvent.toString());
                    if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                        System.out.println("Threre is a Thread released Lock==============");
                        thread.interrupt();
                    }
                    try {
                        zk.exists(nodeString,new Watcher() {
                            @Override
                            public void process(WatchedEvent watchedEvent) {
                                System.out.println( "==" + watchedEvent.toString());
                                if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                                    System.out.println("Threre is a Thread released Lock==============");
                                    thread.interrupt();
                                }
                                try {
                                    zk.exists(nodeString,true);
                                } catch (KeeperException e) {
                                    e.printStackTrace();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }

                        });
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            });
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * 獲取鎖
     * @return
     * @throws InterruptedException
     */
    public boolean lock() throws InterruptedException {
        String path = null;
        ensureRootPath();
        watchNode(nodeString,Thread.currentThread());
        while (true) {
            try {
                path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            } catch (KeeperException e) {
                System.out.println(Thread.currentThread().getName() + "  getting Lock but can not get");
                try {
                    Thread.sleep(5000);
                }catch (InterruptedException ex){
                    System.out.println("thread is notify");
                }
            }
            if (!Strings.nullToEmpty(path).trim().isEmpty()) {
                System.out.println(Thread.currentThread().getName() + "  get Lock...");
                return true;
            }
        }
    }

    /**
     * 釋放鎖
     */
    public void unlock(){
        try {
            zk.delete(nodeString,-1);
            System.out.println("Thread.currentThread().getName() +  release Lock...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0;i<4;i++){
            service.execute(()-> {
                LockTest test = new LockTest();
                try {
                    test.lock();
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                test.unlock();
            });
        }
        service.shutdown();
    }
}

代碼比較糙,但是大致的實現(xiàn)思路和上述一致,這里需要注意:

  1. 因為使用的是原生的Zookeeper API實現(xiàn),Watch需要重復(fù)的設(shè)置,所以代碼復(fù)雜的些。
  2. 喚醒直接用的Thread.interupt這樣其實控制流程其實是不好的。

其實上面的實現(xiàn)有優(yōu)點也有缺點:
優(yōu)點:
實現(xiàn)比較簡單,有通知機制,能提供較快的響應(yīng),有點類似reentrantlock的思想,對于節(jié)點刪除失敗的場景由Session超時保證節(jié)點能夠刪除掉。
缺點:
重量級,同時在大量鎖的情況下會有“驚群”的問題。

“驚群”就是在一個節(jié)點刪除的時候,大量對這個節(jié)點的刪除動作有訂閱Watcher的線程會進行回調(diào),這對Zk集群是十分不利的。所以需要避免這種現(xiàn)象的發(fā)生。

解決“驚群”:

為了解決“驚群“問題,我們需要放棄訂閱一個節(jié)點的策略,那么怎么做呢?

  1. 我們將鎖抽象成目錄,多個線程在此目錄下創(chuàng)建瞬時的順序節(jié)點,因為Zk會為我們保證節(jié)點的順序性,所以可以利用節(jié)點的順序進行鎖的判斷。
  2. 首先創(chuàng)建順序節(jié)點,然后獲取當(dāng)前目錄下最小的節(jié)點,判斷最小節(jié)點是不是當(dāng)前節(jié)點,如果是那么獲取鎖成功,如果不是那么獲取鎖失敗。
  3. 獲取鎖失敗的節(jié)點獲取當(dāng)前節(jié)點上一個順序節(jié)點,對此節(jié)點注冊監(jiān)聽,當(dāng)節(jié)點刪除的時候通知當(dāng)前節(jié)點。
  4. 當(dāng)unlock的時候刪除節(jié)點之后會通知下一個節(jié)點。

上面的實現(xiàn)和reentrantlock的公平鎖實現(xiàn)還是比較類似的,下面是簡單的實現(xiàn):

package com.codertom.params.engine;

import com.google.common.base.Strings;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by zhiming on 2017-02-05.
 */
public class FairLockTest {

    private String zkQurom = "localhost:2181";

    private String lockName = "/mylock";

    private String lockZnode = null;

    private ZooKeeper zk;

    public FairLockTest(){
        try {
            zk = new ZooKeeper(zkQurom, 6000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("Receive event "+watchedEvent);
                    if(Event.KeeperState.SyncConnected == watchedEvent.getState())
                        System.out.println("connection is established...");
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    private void ensureRootPath(){
        try {
            if (zk.exists(lockName,true)==null){
                zk.create(lockName,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 獲取鎖
     * @return
     * @throws InterruptedException
     */
    public void lock(){
        String path = null;
        ensureRootPath();
            try {
                path = zk.create(lockName+"/mylock_", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                lockZnode = path;
                List<String> minPath = zk.getChildren(lockName,false);
                System.out.println(minPath);
                Collections.sort(minPath);
                System.out.println(minPath.get(0)+" and path "+path);
                if (!Strings.nullToEmpty(path).trim().isEmpty()&&!Strings.nullToEmpty(minPath.get(0)).trim().isEmpty()&&path.equals(lockName+"/"+minPath.get(0))) {
                    System.out.println(Thread.currentThread().getName() + "  get Lock...");
                    return;
                }
                String watchNode = null;
                for (int i=minPath.size()-1;i>=0;i--){
                    if(minPath.get(i).compareTo(path.substring(path.lastIndexOf("/") + 1))<0){
                        watchNode = minPath.get(i);
                        break;
                    }
                }

                if (watchNode!=null){
                    final String watchNodeTmp = watchNode;
                    final Thread thread = Thread.currentThread();
                    Stat stat = zk.exists(lockName + "/" + watchNodeTmp,new Watcher() {
                        @Override
                        public void process(WatchedEvent watchedEvent) {
                            if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                                thread.interrupt();
                            }
                            try {
                                zk.exists(lockName + "/" + watchNodeTmp,true);
                            } catch (KeeperException e) {
                                e.printStackTrace();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }

                    });
                    if(stat != null){
                        System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + lockName + "/" + watchNode);
                    }
                }
                try {
                    Thread.sleep(1000000000);
                }catch (InterruptedException ex){
                    System.out.println(Thread.currentThread().getName() + " notify");
                    System.out.println(Thread.currentThread().getName() + "  get Lock...");
                    return;
                }

            } catch (Exception e) {
               e.printStackTrace();
            }
    }

    /**
     * 釋放鎖
     */
    public void unlock(){
        try {
            System.out.println(Thread.currentThread().getName() +  "release Lock...");
            zk.delete(lockZnode,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }



    public static void main(String args[]) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0;i<4;i++){
            service.execute(()-> {
                FairLockTest test = new FairLockTest();
                try {
                    test.lock();
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                test.unlock();
            });
        }
        service.shutdown();
    }

}

同樣上面的程序也有幾點需要注意:

  1. Zookeeper的API沒有提供直接的獲取上一個節(jié)點或者最小節(jié)點的API需要我們自己實現(xiàn)。
  2. 使用了interrupt做線程的喚醒,這樣不科學(xué),因為不想將JVM的lock引進來所以沒有用countdownlatch來做流程控制。
  3. Watch也是要重新設(shè)置的,這里使用了Watch的復(fù)用,所以代碼簡單些。

其實上面的實現(xiàn)還是很復(fù)雜的,因為你需要反復(fù)的去關(guān)注Watcher,實現(xiàn)一個Demo可以,做一個生產(chǎn)環(huán)境可用的Lock并不容易。因為你的代碼bug在生產(chǎn)環(huán)境上會引起很嚴(yán)重的bug。

其實對于Zookeeper的一些常用功能是有一些成熟的包實現(xiàn)的,像Curator。Curator的確是足夠牛逼,不僅封裝了Zookeeper的常用API,也包裝了很多常用Case的實現(xiàn)。但是它的編程風(fēng)格其實還是吧比較難以接受的。

可以用Curator輕易的實現(xiàn)一個分布式鎖:

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

是的就這么簡單,一個直接拿過來可用的輪子。

基于Zookeeper的分布式鎖就說完了。基于Zookeeper實現(xiàn)分布式鎖,其實是不常用的。雖然它實現(xiàn)鎖十分優(yōu)雅,但編程復(fù)雜,同時還要單獨維護一套Zookeeper集群,頻繁的Watch對Zookeeper集群的壓力還是蠻大的,如果不是原有的項目以來Zookeeper,同時鎖的量級比較小的話,還是不用為妙。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容