利用zookeeper實現(xiàn)緩存更新功能

  最近在用了堆外緩存之后,由于存在著緩存不更新的風險,有時候需要對緩存進行處理。該域用的是公司開發(fā)服務(wù)平臺框架,有點類似Tomcat運用。
  費勁腦汁,暫時想到兩種方案:
  1. 開發(fā)一個專門用于運維的接口,每次需要運維時,指定ip進行(若不指定ip,由于有多臺部署機器,請求路由到完全不可知的機器上)

  2. 采用公司分布式配置依賴最多的,功能也相對強大的zookeeper框架。運用zk節(jié)點的內(nèi)容變化時的及時通知機制。

兩種方案的比較

優(yōu)點 缺點
方案一 實施起來簡單 沒有挑戰(zhàn)性,需要知道所有ip,其次ip改變就比較難維護了
方案二 修改節(jié)點值,客戶端能及時監(jiān)聽到,從而能做出相應(yīng)的操作,比如在本例中可以進行緩存的更新,而不需要知道客戶端的ip 需要有操作zk的操作權(quán)限

經(jīng)過上面的比較,決定采用方法二(假定已申請到zk的操作權(quán)限)

動手前準備

1.下載zookeeper-3.3.6,解壓后看到

image

2. 進入conf目錄下,新建一個zoo.cfg文件,并寫入

# The number of milliseconds of each tick  心跳間隔 毫秒每次
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting anacknowledgement
syncLimit=5
# the directory where the snapshot isstored.  //鏡像數(shù)據(jù)位置
dataDir=E:\\develop-tools\\zookeeper-3.3.6\\data
#日志位置
dataLogDir=E:\\develop-tools\\zookeeper-3.3.6\\logs
# the port at which the clients willconnect  客戶端連接的端口
clientPort=2181

server.0=127.0.0.1:8880:7770
#server.1=127.0.0.1:8881:7771
#server.2=127.0.0.1:8882:7772

3. 雙擊bin目錄下的 zkServer.cmd,即可在Windows環(huán)境下啟動一個zk服務(wù)端。(zkCli的用法,大家可以自行學習)

開始動手

為了代碼優(yōu)雅,由于ZooKeeper原生客戶端的各類操作方法比較繁瑣,用戶體驗不好,下面我們用Curator+Spring實現(xiàn)demo:

1.封裝獲取zooKeeper客戶端連接的工廠類:ZookeeperFactory

package com.vip.fcs.ps.zk.demo;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * 獲取zookeeper客戶端鏈接工廠類
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

    private String zkHosts;
    // session超時
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // 共享一個zk鏈接
    private boolean singleton = true;

    // 全局path前綴,常用來區(qū)分不同的應(yīng)用
    private String namespace;

    private final static String ROOT = "vip";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        if (StringUtils.isEmpty(namespace)) {
            namespace = ROOT;
        } else {
            namespace = ROOT + "/" + namespace;
        }
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    }

    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout,
            String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
                .canBeReadOnly(true).namespace(namespace)
                .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)).defaultData(null).build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

2.zookeeper的操作類:ZkHandler

package com.vip.fcs.ps.zk.demo;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.InitializingBean;

/**
 * 結(jié)合spring,封裝了zk的相關(guān)操作
 */
public class ZkHandler implements InitializingBean {
    private CuratorFramework zkClient;
    private String service = "ps";
    private String version = "1.0";

    private TreeCache treeCache;

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        // 如果zk尚未啟動,則啟動
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildTreeCache(zkClient, getServicePath());
        // 開始監(jiān)聽
        treeCache.start();

    }

    /**
     * 監(jiān)聽的節(jié)點
     * @return
     */
    private String getServicePath() {
        return "/" + service + "/" + version;
    }

    private void buildTreeCache(final CuratorFramework zkClient, String path) {
        // 設(shè)置節(jié)點的cache
        treeCache = new TreeCache(zkClient, path);
        // 設(shè)置監(jiān)聽器和處理過程
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                String log = ("NODE_ADDED : " + data.getPath() + " --->DATA:" + new String(data.getData()));
                if (data != null) {
                    switch (event.getType()) {
                    case NODE_ADDED:
                        System.out.println(log);
                        // do sth
                        break;
                    case NODE_REMOVED:
                        System.out.println(log);
                        // do sth
                        break;
                    case NODE_UPDATED:
                        System.out.println(log);
                        // do sth
                        break;
                    default:
                        break;
                    }
                } else {
                    System.out.println("data is NULL" + event.getType());
                }
            }
        });
    }

    public void createPersistentNode() throws Exception {
        if (zkClient.checkExists().forPath(getServicePath()) == null) {
            zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)
                    .forPath(getServicePath());
        }
    }

    public void changeNodeData(String nodeData) throws Exception {
        if (zkClient.checkExists().forPath(getServicePath()) != null) {
            zkClient.setData().forPath(getServicePath(), nodeData.getBytes());
        }
    }

}
  1. 下面我們再創(chuàng)建3個客戶端(2個watcher+一個用來改變節(jié)點值的client)

3.1 ZkWatcherOne

package com.vip.fcs.ps.zk.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ZkWatcherOne {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
        while (true) {
            Thread.sleep(2000L);
        }
    }
}

3.2 ZkWatcherTwo

package com.vip.fcs.ps.zk.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ZkWatcherTwo {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
        while (true) {
            Thread.sleep(2000L);
        }
    }
}

3.3 ZkSetClient

package com.vip.fcs.ps.zk.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ZkSetClient {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
        zkHandler.createPersistentNode();
        for (int i = 0; i < 10; i++) {
            zkHandler.changeNodeData("time" + i);
            Thread.sleep(2000L);
        }
    }
}

4.開始運行相關(guān)代碼

4.1 啟動ZkWatcherOne, 控制臺打印日志

NODE_ADDED : /ps/1.0 --->DATA:

4.2 啟動ZkWatcherTwo, 控制臺打印日志

NODE_ADDED : /ps/1.0 --->DATA:

4.3 這時候啟動ZkSetClient,在ZkWatcherOne和ZkWatcherTwo的控制臺最終打印出如下的日志

image

由此可見,當一個節(jié)點值改變時,其他節(jié)點都能接收到監(jiān)聽事件,并能夠做出相應(yīng)的操作。在監(jiān)聽事件中,我們可以對本機緩存進行更新,由此達到了我們的目的。
4.4 相關(guān)配置文件 applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"
       default-lazy-init="false">

    <!-- zookeeper -->
    <bean id="zookeeper" class="com.vip.fcs.ps.zk.demo.ZookeeperFactory"
          destroy-method="close">
        <property name="zkHosts"
                  value="127.0.0.1:2181"/>
        <property name="namespace" value="ps.api"/>
        <property name="connectionTimeout" value="3000"/>
        <property name="sessionTimeout" value="3000"/>
        <property name="singleton" value="true"/>
    </bean>

    <!-- zookeeper -->
    <bean id="zkHandler" class="com.vip.fcs.ps.zk.demo.ZkHandler">
        <property name="zkClient"
                  ref="zookeeper"/>
    </bean>
</beans>

4.4 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <groupId>com.fcs.ps.demo</groupId>
   <artifactId>zk-demo</artifactId>
   <version>1.0-SNAPSHOT</version>
   <properties>
       <spring.version>4.0.7.RELEASE</spring.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>commons-pool</groupId>
           <artifactId>commons-pool</artifactId>
           <version>1.6</version>
       </dependency>
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-context</artifactId>
           <version>4.0.9.RELEASE</version>
       </dependency>

       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.4.6</version>
       </dependency>
       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.4.9</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-framework</artifactId>
           <version>4.0.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-test</artifactId>
           <version>4.0.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
           <version>4.0.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-x-discovery</artifactId>
           <version>4.0.0</version>
       </dependency>

   </dependencies>

</project>

拓展 :zookeeper如此強大,我們進一步了解一下其優(yōu)缺點,已經(jīng)相關(guān)的應(yīng)用場景。

zookeeper 特點及優(yōu)點 :

1. zookeeper是一個精簡的文件系統(tǒng)。這點它和hadoop有點像,但是zookeeper這個文件系統(tǒng)是管理小文件的,而hadoop是管理超大文件的。

2. zookeeper提供了豐富的“構(gòu)件”,這些構(gòu)件可以實現(xiàn)很多協(xié)調(diào)數(shù)據(jù)結(jié)構(gòu)和協(xié)議的操作。例如:分布式隊列、分布式鎖以及一組同級節(jié)點的“領(lǐng)導者選舉”算法。

3. zookeeper是高可用的,它本身的穩(wěn)定性是相當之好,分布式集群完全可以依賴zookeeper集群的管理,利用zookeeper避免分布式系統(tǒng)的單點故障的問題。

4. zookeeper采用了松耦合的交互模式。這點在zookeeper提供分布式鎖上表現(xiàn)最為明顯,zookeeper可以被用作一個約會機制,讓參入的進程不在了解其他進程的(或網(wǎng)絡(luò))的情況下能夠彼此發(fā)現(xiàn)并進行交互,參入的各方甚至不必同時存在,只要在zookeeper留下一條消息,在該進程結(jié)束后,另外一個進程還可以讀取這條信息,從而解耦了各個節(jié)點之間的關(guān)系。

5. zookeeper為集群提供了一個共享存儲庫,集群可以從這里集中讀寫共享的信息,避免了每個節(jié)點的共享操作編程,減輕了分布式系統(tǒng)的開發(fā)難度。

6. zookeeper的設(shè)計采用的是觀察者的設(shè)計模式,zookeeper主要是負責存儲和管理大家關(guān)心的數(shù)據(jù),然后接受觀察者的注冊,一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper 就將負責通知已經(jīng)在 Zookeeper 上注冊的那些觀察者做出相應(yīng)的反應(yīng),從而實現(xiàn)集群中類似 Master/Slave 管理模式。

個人認為,zookeeper最有價值的東西也許是內(nèi)容變化能夠及時通知到監(jiān)聽者,結(jié)合上面zookeeper的特點,目前zookeeper的使用場景有:

1.數(shù)據(jù)發(fā)布與訂閱(配置中心)(適用于)

2.負載均衡

3.命名服務(wù)(Naming Service)(服務(wù)注冊)

4.分布式通知/協(xié)調(diào)

5.集群管理與Master選舉(hbase用到)

6.分布式鎖

7.分布式隊列

但是zookeeper也有很多缺點:

  1. zookeeper不是為高可用性設(shè)計的

  2. zookeeper的選舉過程速度很慢。網(wǎng)絡(luò)實際上常常是會出現(xiàn)隔離等不完整狀態(tài)的,而zookeeper對那種情況非常敏感。一旦出現(xiàn)網(wǎng)絡(luò)隔離,zookeeper就要發(fā)起選舉流程。zookeeper的選舉流程通常耗時30到120秒,期間zookeeper由于沒有master,都是不可用的。對于網(wǎng)絡(luò)里面偶爾出現(xiàn)的,比如半秒一秒的網(wǎng)絡(luò)隔離,zookeeper會由于選舉過程,而把不可用時間放大幾十倍

  3. zookeeper的性能是有限的

  4. zookeeper的權(quán)限控制非常薄弱

zookeeper其實就是一個高可用的服務(wù)協(xié)調(diào)框架,不能作為存儲,每次寫入都必須集群n/2+1的集群寫入完成之后才算完成,性能自己就可以想象了,而且其每個節(jié)點只能存儲1M的數(shù)據(jù),同時其節(jié)點目錄也不宜過多。

有興趣的童鞋,可以參考深入研究zookeeper,充分利用起優(yōu)點,開發(fā)出更多的應(yīng)用場景來。

        作者:王文雅     唯品會java后端開發(fā)工程師
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容