ZK實現(xiàn)的分布式鎖

工作需要。寫了一個基于ZK的分布式鎖,記錄一下:

原理

zk能保證集群上的路徑同一時刻只有一個客戶端來創(chuàng)建。因此,通過在集群上順序創(chuàng)建和刪除臨時路徑,在實現(xiàn)分布式鎖的獲取和釋放。

代碼

zk上有一個客戶端框架Curator已經(jīng)對分布式互斥鎖進行了封裝,幾乎是開箱即用:

  • 封裝一下框架的初始化
public class ZKCuratorManager {
    private static InterProcessMutex lock;
    private static CuratorFramework cf;
    private static String zkAddr = "*.*.*.*:2181";
    private static String lockPath = "/distribute-lock";

    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        cf = CuratorFrameworkFactory.builder()
                .connectString(zkAddr)
                .sessionTimeoutMs(2000)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();
    }

    public static InterProcessMutex getLock() {
        lock = new InterProcessMutex(cf, lockPath);
        return lock;
    }
}
  • 封裝一下工具類
public class ZKCuratorLockUtil {

    /**
     * 從配置類中獲取分布式鎖對象
     */
    private static InterProcessMutex lock = ZKCuratorManager.getLock();

    /**
     * 加鎖
     *
     * @return
     */
    public static boolean acquire() {
        try {
            lock.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

    /**
     * 鎖的釋放
     */
    public static void release() {
        try {
            lock.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 測試
    • 所有線程啟動后sleep5秒鐘
    • 用CyclicBarrier使四個線程同時嘗試獲取鎖
    • 結(jié)果應該是四個線程依次獲取-釋放鎖
public class ZkLockTest {

    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier = new CyclicBarrier(N);
        for (int i = 0; i < N; i++) {

            new WriterTest(barrier).start();
        }

        System.out.println("END");
    }

    static class WriterTest extends Thread {
        private CyclicBarrier cyclicBarrier;

        public WriterTest(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("線程" + Thread.currentThread().getName() + "正在寫入數(shù)據(jù)...");
            try {
                //以睡眠來模擬寫入數(shù)據(jù)操作
                Thread.sleep(5000);
                System.out.println("線程" + Thread.currentThread().getName() + "寫入數(shù)據(jù)完畢,等待其他線程寫入完畢");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("所有線程寫入完畢,繼續(xù)處理其他任務...");
            //加鎖
            ZKCuratorLockUtil.acquire();
            System.out.println("線程" + Thread.currentThread().getName() + "獲得分布式鎖");
            try {
                Thread.sleep(2000);
                ZKCuratorLockUtil.release();
                System.out.println("線程" + Thread.currentThread().getName() + "釋放分布式鎖");
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("END");
        }
    }
}
  • 測試結(jié)果
線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-1正在寫入數(shù)據(jù)...
END
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-3正在寫入數(shù)據(jù)...
線程Thread-1寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-2寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-0寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
線程Thread-3寫入數(shù)據(jù)完畢,等待其他線程寫入完畢
所有線程寫入完畢,繼續(xù)處理其他任務...
所有線程寫入完畢,繼續(xù)處理其他任務...
所有線程寫入完畢,繼續(xù)處理其他任務...
所有線程寫入完畢,繼續(xù)處理其他任務...
線程Thread-3獲得分布式鎖
線程Thread-3釋放分布式鎖
END
線程Thread-1獲得分布式鎖
線程Thread-1釋放分布式鎖
END
線程Thread-2獲得分布式鎖
線程Thread-2釋放分布式鎖
END
線程Thread-0獲得分布式鎖
線程Thread-0釋放分布式鎖
END
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容