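Metadata Management in Apache BookKeeper

Metadata

There are two main kinds of metadata:

  • The list of available Bookies
  1. Used to track which Bookies are currently available
  • Ledger metadata
    The operations on it are:
  1. createLedger: creates a new Ledger, which has a unique ID and a current Version (with zk as the store, this version is the znode's dataVersion);
  2. removeLedgerMetadata: removes a Ledger. The caller must supply the locally held Version, which is checked against the Version in the metadata storage; the removal is allowed only if the two match;
  3. readLedgerMetadata: reads a Ledger's metadata and also refreshes the locally held Version of that metadata;
  4. writeLedgerMetadata: updates a Ledger's metadata. As with removal, the caller must supply the locally held Version, and the update is allowed only if it matches the Version in the metadata storage;
  5. asyncProcessLedgers: iterates over all current Ledgers and applies a given processing function to each of them;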
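
The version checks on remove and write amount to compare-and-set. The following is a minimal in-memory sketch of that idea, not BookKeeper code: the class VersionedStoreSketch and its methods are hypothetical names, and a ConcurrentHashMap stands in for the metadata storage.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; this is not a BookKeeper class.
final class VersionedStoreSketch {
    // A metadata value together with the version it was written at.
    static final class Versioned {
        final String value;
        final long version;

        Versioned(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<Long, Versioned> ledgers = new ConcurrentHashMap<>();

    // createLedger: succeeds only if the id is unused; the first version is 0.
    boolean create(long ledgerId, String metadata) {
        return ledgers.putIfAbsent(ledgerId, new Versioned(metadata, 0L)) == null;
    }

    // readLedgerMetadata: returns the value with its current version, or null if absent.
    Versioned read(long ledgerId) {
        return ledgers.get(ledgerId);
    }

    // writeLedgerMetadata: applied only if expectedVersion matches the stored version.
    boolean write(long ledgerId, String metadata, long expectedVersion) {
        Versioned current = ledgers.get(ledgerId);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        // replace() is atomic and fails if another writer swapped the entry meanwhile.
        return ledgers.replace(ledgerId, current, new Versioned(metadata, expectedVersion + 1));
    }

    // removeLedgerMetadata: the same version check guards the delete.
    boolean remove(long ledgerId, long expectedVersion) {
        Versioned current = ledgers.get(ledgerId);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        return ledgers.remove(ledgerId, current);
    }
}
```

A caller that loses a race gets false back, re-reads the metadata to pick up the new version, and decides whether to retry.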

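Choosing a Metadata Storage

  • It must first satisfy the following requirements:
  1. Support for CAS (check-and-set) operations, such as the Version comparison required before the remove and update operations described above;
  2. Optimized for sequential writes;
  3. Optimized for scan operations;
  • Suitable metadata stores at present include ZooKeeper and etcd; if the number of ledgers is extremely large, HBase can also be used;
  • Apache BookKeeper currently uses the ZooKeeper implementation by default;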

Implementation of Metadata Operations

MetadataBookieDriver

In Apache BookKeeper, all bookie-side metadata operations are encapsulated in one abstract interface, MetadataBookieDriver:

public interface MetadataBookieDriver extends AutoCloseable {
    // Initialize this driver
    MetadataBookieDriver initialize(ServerConfiguration conf,
                                    RegistrationListener listener,
                                    StatsLogger statsLogger)
        throws MetadataException;

    // The scheme under which this driver is registered, e.g. "zk"
    String getScheme();

    // RegistrationManager handles registering the Bookie with the metadata storage
    RegistrationManager getRegistrationManager();

    // Factory used to create LedgerManager instances
    LedgerManagerFactory getLedgerManagerFactory()
        throws MetadataException;

    LayoutManager getLayoutManager();

    @Override
    void close();
}
MetadataDrivers

Manages all registered MetadataBookieDriver implementations

  • All driver information is kept in private static final ConcurrentMap<String, MetadataBookieDriverInfo> bookieDrivers, where the key is the scheme and the value is a MetadataBookieDriverInfo, defined as follows:
static class MetadataBookieDriverInfo {
        final Class<? extends MetadataBookieDriver> driverClass;
        final String driverClassName;

        MetadataBookieDriverInfo(Class<? extends MetadataBookieDriver> driverClass) {
            this.driverClass = driverClass;
            this.driverClassName = driverClass.getName();
        }
    }

Using Java reflection, the corresponding MetadataBookieDriver object can be instantiated from driverClass;

  • By default the map contains org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver, whose scheme is zk
  • Obtaining a MetadataBookieDriver
public static MetadataBookieDriver getBookieDriver(URI uri) {
        // for zk, the uri looks like: zk+hierarchical://10.1.1.1:2181;10.1.1.2:2181;10.1.1.3:2181/ledgers
        String scheme = uri.getScheme();
        scheme = scheme.toLowerCase();
        // "zk+hierarchical" splits into ["zk", "hierarchical"]; the first part names the driver
        String[] schemeParts = StringUtils.split(scheme, '+');
        String driverScheme = schemeParts[0];

        if (!initialized) {
            initialize();
        }

        // look up by the driver part ("zk"), not by the full scheme
        MetadataBookieDriverInfo driverInfo = bookieDrivers.get(driverScheme);
        if (null == driverInfo) {
            throw new IllegalArgumentException("Unknown backend " + driverScheme);
        }
        // instantiate the driver via Java reflection
        return ReflectionUtils.newInstance(driverInfo.driverClass);
    }
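
The lookup above combines three small steps: take the part of the URI scheme before '+', find the class registered for it, and instantiate that class reflectively. Below is a self-contained sketch of the same pattern using only the JDK. DriverSketch, ZkDriverSketch and DriverRegistrySketch are hypothetical stand-ins for illustration and are not BookKeeper classes.

```java
import java.net.URI;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the MetadataBookieDriver interface.
interface DriverSketch {
    String getScheme();
}

// Hypothetical stand-in for the ZooKeeper-backed driver.
final class ZkDriverSketch implements DriverSketch {
    @Override
    public String getScheme() {
        return "zk";
    }
}

final class DriverRegistrySketch {
    // scheme -> driver class, mirroring the bookieDrivers map described in the text
    private static final ConcurrentMap<String, Class<? extends DriverSketch>> DRIVERS =
            new ConcurrentHashMap<>();

    static {
        DRIVERS.put("zk", ZkDriverSketch.class);
    }

    // Only the part of the scheme before '+' names the driver.
    static String driverScheme(URI uri) {
        String scheme = uri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Invalid metadata service uri: " + uri);
        }
        return scheme.toLowerCase(Locale.ROOT).split("\\+")[0];
    }

    static DriverSketch newDriver(URI uri) {
        String scheme = driverScheme(uri);
        Class<? extends DriverSketch> driverClass = DRIVERS.get(scheme);
        if (driverClass == null) {
            throw new IllegalArgumentException("Unknown backend " + scheme);
        }
        try {
            // Reflection: requires an accessible no-argument constructor.
            return driverClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + driverClass.getName(), e);
        }
    }
}
```

Keeping only the class in the map, rather than a shared instance, means every caller gets a fresh driver that it can initialize and close independently.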
ZKMetadataBookieDriver implementation
  • getRegistrationManager: returns a ZKRegistrationManager, created lazily on first use
 if (null == regManager) {
            regManager = new ZKRegistrationManager(
                serverConf,
                zk,
                listener
            );
        }
        return regManager;
  • initialize: mainly delegates to the initialize method of its parent class ZKMetadataDriverBase
    Its main job is to create the ZooKeeper client object used to talk to zk and the ZkLayoutManager object
    protected void initialize(AbstractConfiguration<?> conf,
                              StatsLogger statsLogger,
                              RetryPolicy zkRetryPolicy,
                              Optional<Object> optionalCtx) throws MetadataException {
        this.conf = conf;
        this.acls = ZkUtils.getACLs(conf);

        if (optionalCtx.isPresent()
         ... 
        } else {
            final String metadataServiceUriStr;
            try {
                metadataServiceUriStr = conf.getMetadataServiceUri();
            } catch (ConfigurationException e) {
                throw new MetadataException(
                    Code.INVALID_METADATA_SERVICE_URI, e);
            }
            ...
            final String zkServers = getZKServersFromServiceUri(metadataServiceUri);
            try {
                this.zk = ZooKeeperClient.newBuilder()
                    .connectString(zkServers)
                    .sessionTimeoutMs(conf.getZkTimeout())
                    .operationRetryPolicy(zkRetryPolicy)
                    .requestRateLimit(conf.getZkRequestRateLimit())
                    .statsLogger(statsLogger)
                    .build();

                if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
                    try {
                        zk.create(bookieReadonlyRegistrationPath,
                            EMPTY_BYTE_ARRAY,
                            acls,
                            CreateMode.PERSISTENT);
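                    } catch (KeeperException.NodeExistsException e) {
                        // the node was just created by another client; nothing to do
                    } catch (KeeperException.NoNodeException e) {
                        // the parent node is missing, i.e. the cluster is not initialized yet
                    }
                }
            } catch (IOException | KeeperException e) {
                throw new MetadataException(
                    Code.METADATA_SERVICE_ERROR,
                    "Failed to create zookeeper client to " + zkServers, e);
            }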
            this.ownZKHandle = true;
        }

        // once created the zookeeper client, create the layout manager and registration client
        this.layoutManager = new ZkLayoutManager(
            zk,
            ledgersRootPath,
            acls);
    }
  • getLedgerManagerFactory: inherited unchanged from the parent class ZKMetadataDriverBase; returns a LedgerManagerFactory object, which is used to create the LedgerManager
 public synchronized LedgerManagerFactory getLedgerManagerFactory()
            throws MetadataException {
        if (null == lmFactory) {
            try {
                lmFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
                    conf,
                    layoutManager);
            } catch (IOException e) {
                throw new MetadataException(
                    Code.METADATA_SERVICE_ERROR, "Failed to initialized ledger manager factory", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            }
        }
        return lmFactory;
    }
ZKRegistrationManager
  • Mainly used to register the current bookie on zk as an ephemeral node, to unregister it, and to write and read its cookie
  • Bookie registration, doRegisterBookie: creates an ephemeral node on zk
 private void doRegisterBookie(String regPath) throws BookieException {
        // ZK ephemeral node for this Bookie.
        try {
            if (!checkRegNodeAndWaitExpired(regPath)) {
                // Create the ZK ephemeral node for this Bookie.
                zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
                zkRegManagerInitialized = true;
            }
        } catch (KeeperException ke) {
            throw new MetadataStoreException(ke);
        } catch (InterruptedException ie) {
            throw new MetadataStoreException(ie);
        } catch (IOException e) {
            throw new MetadataStoreException(e);
        }
    }
  • Writing the cookie, writeCookie: each bookie's cookie is written to a node such as /ledgers/cookies/10.209.1.1:3181, with content of the form:
4  ----  layout version of this cookie
bookieHost: "10.209.240.36:3181"
journalDir: "/data/bookkeeper/journal"
ledgerDirs: "1\t/data/bookkeeper/ledger"
instanceId: "eb314bf8-885e-4c60-803d-32fd7858d790" ---- id of the current cluster
  • Initializing a new cluster, initNewCluster:
    create the /ledgers node
    create the /ledgers/available node
    create the /ledgers/available/readonly node
    create the /ledgers/INSTANCEID node
    create a new LedgerManagerFactory
public boolean initNewCluster() throws Exception {
        String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
        String instanceIdPath = ledgersRootPath + "/" + INSTANCEID;

        boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);

        if (ledgerRootExists) {
            return false;
        }

        List<Op> multiOps = Lists.newArrayListWithExpectedSize(4);
        // Create ledgers root node
        multiOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));

        // create available bookies node
        multiOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));

        // create readonly bookies node
        multiOps.add(Op.create(
            bookieReadonlyRegistrationPath,
            EMPTY_BYTE_ARRAY,
            zkAcls,
            CreateMode.PERSISTENT));

        // create INSTANCEID
        String instanceId = UUID.randomUUID().toString();
        multiOps.add(Op.create(instanceIdPath, instanceId.getBytes(UTF_8),
                zkAcls, CreateMode.PERSISTENT));

        // execute the multi ops
        // multi() groups the operations on several nodes into one atomic transaction: either all succeed or all fail
        zk.multi(multiOps);

        // creates the new layout and stores in zookeeper
        // if the /ledgers/LAYOUT znode holds no data yet and layoutManager is not null, the call below writes the new layout to /ledgers/LAYOUT
        AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
        return true;
    }
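
The all-or-nothing behaviour of the multi call can be illustrated without a ZooKeeper server. The sketch below is a hypothetical in-memory model, not the ZooKeeper API: AtomicCreateSketch validates a batch of creates against a scratch copy and commits only if every create is valid. It assumes absolute paths that start with '/'.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a znode tree; not the ZooKeeper API.
final class AtomicCreateSketch {
    private final Map<String, String> nodes = new TreeMap<>();

    // Creates all paths or none. Each create is checked against a scratch
    // copy, and the copy is committed only if the whole batch is valid.
    // Assumes every path is absolute (starts with '/').
    synchronized boolean createAll(List<String> paths) {
        Map<String, String> scratch = new TreeMap<>(nodes);
        for (String path : paths) {
            String parent = path.substring(0, path.lastIndexOf('/'));
            boolean parentMissing = !parent.isEmpty() && !scratch.containsKey(parent);
            if (parentMissing || scratch.containsKey(path)) {
                return false; // abort: nothing has been applied to `nodes`
            }
            scratch.put(path, "");
        }
        nodes.clear();
        nodes.putAll(scratch);
        return true;
    }

    synchronized boolean exists(String path) {
        return nodes.containsKey(path);
    }
}
```

Because later creates are checked against the scratch copy, a batch may create a parent and its children together, which is what the four-node batch in initNewCluster relies on.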
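LedgerManagerFactory
  • As noted earlier, the most important metadata stored on zk is the ledger information. The number of ledgers may be small or very large, so keeping all of them on zk calls for a sensible organization. There are currently three layouts:
  1. Flat Ledger Layout: all ledger metadata is stored under one single znode (for example /ledgers); each ledger node is named with an "L" prefix followed by its id, for example "/ledgers/L001". The drawback is that with too many ledgers, fetching them all through zk's getChildren call produces a response larger than zk's maximum packet size, so the call fails;
  2. Hierarchical ledger manager: hierarchical storage. A zk EPHEMERAL_SEQUENTIAL znode is first used to generate a globally unique ledger id. An id generated this way has 10 digits, for example 0000000001; it is split into two parent levels plus a leaf, /ledgers/00/0000/L0001, and that znode stores the corresponding ledger metadata;
  3. LongHierarchical ledger manager: the ledger id above is 31 bits wide, while this one is 63 bits wide; on zk it is represented as /ledgers/000/0000/0000/0000/L0001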
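
The two hierarchical path shapes can be reproduced with plain string slicing. This is a sketch under stated assumptions: LedgerPathSketch and its method names are hypothetical, the 2-4-4 and 3-4-4-4-4 digit splits are read off the path examples in the list, and ledger ids are assumed to be non-negative.

```java
// Hypothetical helper; digit splits follow the path examples in the text.
final class LedgerPathSketch {
    // 31-bit ids: 10 zero-padded decimal digits split as 2 / 4 / L+4.
    static String hierarchicalPath(String root, long ledgerId) {
        String id = String.format("%010d", ledgerId);
        return root + "/" + id.substring(0, 2)
                + "/" + id.substring(2, 6)
                + "/L" + id.substring(6);
    }

    // 63-bit ids: 19 zero-padded decimal digits split as 3 / 4 / 4 / 4 / L+4.
    static String longHierarchicalPath(String root, long ledgerId) {
        String id = String.format("%019d", ledgerId);
        return root + "/" + id.substring(0, 3)
                + "/" + id.substring(3, 7)
                + "/" + id.substring(7, 11)
                + "/" + id.substring(11, 15)
                + "/L" + id.substring(15);
    }
}
```

With four digits per level, no znode has more than 10,000 children per level, which keeps each getChildren response small.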
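  • LedgerManagerFactory: creates the LedgerManager; its class hierarchy is shown below
[Figure: ledger-factory-classes1.png (LedgerManagerFactory class hierarchy)]
  1. format: deletes all ledger metadata on zk, deletes the /ledgers/LAYOUT data, and writes new layout information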
public void format(AbstractConfiguration<?> conf, LayoutManager layoutManager)
            throws InterruptedException, KeeperException, IOException {
        try (AbstractZkLedgerManager ledgerManager = (AbstractZkLedgerManager) newLedgerManager()) {
            String ledgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
            List<String> children = zk.getChildren(ledgersRootPath, false);
            for (String child : children) {
                // with the hierarchical layout, ledger metadata lives under znodes such as /ledgers/00;
                // the code below deletes all of it
                if (!AbstractZkLedgerManager.isSpecialZnode(child) && ledgerManager.isLedgerParentNode(child)) {
                    ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
                }
            }
        }

        Class<? extends LedgerManagerFactory> factoryClass;
        try {
            factoryClass = conf.getLedgerManagerFactoryClass();
        } catch (ConfigurationException e) {
            throw new IOException("Failed to get ledger manager factory class from configuration : ", e);
        }

        // delete /ledgers/LAYOUT on zk
        layoutManager.deleteLedgerLayout();
        // Create new layout information again.
        // write the new layout to /ledgers/LAYOUT on zk
        createNewLMFactory(conf, layoutManager, factoryClass);
    }
  2. validateAndNukeExistingCluster: removes all of the cluster's nodes from zk
  3. newLedgerIdGenerator: returns a ledger id generator:
public LedgerIdGenerator newLedgerIdGenerator() {
        List<ACL> zkAcls = ZkUtils.getACLs(conf);
        String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
        ZkLedgerIdGenerator subIdGenerator = new ZkLedgerIdGenerator(zk, zkLedgersRootPath,
                LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
        return new LongZkLedgerIdGenerator(zk, zkLedgersRootPath, LongHierarchicalLedgerManager.IDGEN_ZNODE,
                subIdGenerator, zkAcls);
    }

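It can generate both 31-bit and 63-bit ids, which looks sufficient for now. The implementation is not covered here; see the source for details. Both generators rely on zk EPHEMERAL_SEQUENTIAL znodes;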
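
One detail worth spelling out: ZooKeeper appends a zero-padded, monotonically increasing counter to the name of every sequential znode, so an id can be read straight off the path that the create call returns. The sketch below shows only that parsing step. SequentialIdSketch is a hypothetical helper, and the "/ledgers/idgen/ID-" prefix in the usage example is an assumed node name chosen for illustration.

```java
// Hypothetical helper: extracts the counter that ZooKeeper appends to the
// name of a sequential znode.
final class SequentialIdSketch {
    static long idFromSequentialPath(String createdPath, String prefix) {
        int start = createdPath.lastIndexOf(prefix);
        if (start < 0) {
            throw new IllegalArgumentException("Unexpected znode name: " + createdPath);
        }
        // Everything after the prefix is the zero-padded counter.
        return Long.parseLong(createdPath.substring(start + prefix.length()));
    }

    public static void main(String[] args) {
        // Assumed example path, as a sequential create might return it.
        System.out.println(idFromSequentialPath("/ledgers/idgen/ID-0000000012", "ID-"));
    }
}
```

Since the node is ephemeral, it disappears with the session and only the counter value is kept, so the id-generation znodes do not accumulate.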

  4. newLedgerManager: creates the LedgerManager object
public LedgerManager newLedgerManager() {
        return new HierarchicalLedgerManager(conf, zk);
    }
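LedgerManager
  • First, the class hierarchy and the interface methods that must be implemented
[Figure: ledgermanager-hi.png (LedgerManager class hierarchy)]
  • createLedgerMetadata: asynchronously creates a new Ledger and returns a CompletableFuture<...>;
    when the metadata format version is greater than 2, a cToken must be added to the ledger metadata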
    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId,
                                                                             LedgerMetadata inputMetadata) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        /*
         * When the metadata format version is greater than 2, a cToken is added to the ledger metadata
         */
        final long cToken = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
        final LedgerMetadata metadata;
        if (inputMetadata.getMetadataFormatVersion() > LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2) {
            metadata = LedgerMetadataBuilder.from(inputMetadata).withCToken(cToken).build();
        } else {
            metadata = inputMetadata;
        }
        String ledgerPath = getLedgerPath(ledgerId);

        // scb is the callback invoked once the zk operation completes
        StringCallback scb = new StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                if (rc == Code.OK.intValue()) {
                    // the ledger was created successfully
                    promise.complete(new Versioned<>(metadata, new LongVersion(0)));
                } else if (rc == Code.NODEEXISTS.intValue()) {
                    // the ledger node being created already exists
                    if (metadata.getMetadataFormatVersion() > 2) {
                        // read the metadata of the existing ledger
                        CompletableFuture<Versioned<LedgerMetadata>> readFuture = readLedgerMetadata(ledgerId);
                        readFuture.handle((readMetadata, exception) -> {
                            if (exception == null) {
                                // compare cTokens to tell whether the node was written by this very operation
                                if (readMetadata.getValue().getCToken() == cToken) {
                                    FutureUtils.complete(promise, new Versioned<>(metadata, new LongVersion(0)));
                                } else {
                                    promise.completeExceptionally(new BKException.BKLedgerExistException());
                                }
                            } else if (exception instanceof KeeperException.NoNodeException) {
                                promise.completeExceptionally(new BKException.BKLedgerExistException());
                            } else {
                                promise.completeExceptionally(new BKException.ZKException());
                            }
                            return null;
                        });
                    } else {
                        promise.completeExceptionally(new BKException.BKLedgerExistException());
                    }
                } else {
                    promise.completeExceptionally(new BKException.ZKException());
                }
            }
        };
        final byte[] data;
        try {
            data = serDe.serialize(metadata);
        } catch (IOException ioe) {
            promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
            return promise;
        }

        List<ACL> zkAcls = ZkUtils.getACLs(conf);
        // asynchronously create the ledger node; missing parent nodes are created recursively
        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, data, zkAcls,
                                              CreateMode.PERSISTENT, scb, null);
        return promise;
    }
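
The cToken check is easier to see in isolation. The sketch below is a hypothetical in-memory model, not BookKeeper code. It assumes the scenario the check guards against is a create that the store applied but whose acknowledgement was lost, so a retry sees "node exists"; the loseFirstAck flag simulates that case.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical in-memory model of the cToken check; not BookKeeper code.
final class CTokenCreateSketch {
    enum Result { CREATED, ALREADY_EXISTS }

    // ledgerId -> cToken stored alongside the metadata
    private final ConcurrentMap<Long, Long> store = new ConcurrentHashMap<>();

    // One create attempt; returns false if the node already exists.
    private boolean tryCreate(long ledgerId, long cToken) {
        return store.putIfAbsent(ledgerId, cToken) == null;
    }

    // loseFirstAck simulates a first attempt that the store applies but
    // whose acknowledgement never reaches the client, forcing a retry.
    Result create(long ledgerId, boolean loseFirstAck) {
        long cToken = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
        if (loseFirstAck) {
            tryCreate(ledgerId, cToken); // applied, but the client never learns of it
        }
        if (tryCreate(ledgerId, cToken)) {
            return Result.CREATED;
        }
        // Node exists: read it back and compare tokens to tell our own
        // earlier attempt apart from a ledger created by someone else.
        Long stored = store.get(ledgerId);
        return (stored != null && stored == cToken) ? Result.CREATED : Result.ALREADY_EXISTS;
    }
}
```

Without the token, the retried create in the lost-acknowledgement case would be reported as a conflict even though the ledger belongs to the caller.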
  • removeLedgerMetadata: asynchronously deletes a ledger's metadata; besides the ledger id, the caller must also supply the znode's data version, which is passed to zk.delete
  • readLedgerMetadata: asynchronously reads a ledger's metadata
protected CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId, Watcher watcher) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                if (rc == KeeperException.Code.NONODE.intValue()) {
                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
                    return;
                }
                if (rc != KeeperException.Code.OK.intValue()) {
                    promise.completeExceptionally(new BKException.ZKException());
                    return;
                }
                if (stat == null) {
                    promise.completeExceptionally(new BKException.ZKException());
                    return;
                }

                try {
                    // build the versioned LedgerMetadata from the znode data and its Stat
                    LongVersion version = new LongVersion(stat.getVersion());
                    LedgerMetadata metadata = serDe.parseConfig(data, Optional.of(stat.getCtime()));
                    promise.complete(new Versioned<>(metadata, version));
                } catch (Throwable t) {
                    promise.completeExceptionally(new BKException.ZKException());
                }
            }
        }, null);
        return promise;
    }
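
The read path follows a common pattern: a callback-style API is bridged to a CompletableFuture by completing a promise inside the callback. Below is a minimal JDK-only sketch of that bridge. The AsyncStore and DataCallback interfaces are hypothetical stand-ins for ZooKeeper's asynchronous getData, and the result codes are illustrative constants.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical callback API standing in for ZooKeeper's asynchronous getData.
final class CallbackToFutureSketch {
    interface DataCallback {
        void processResult(int rc, byte[] data, int version);
    }

    interface AsyncStore {
        void getData(String path, DataCallback callback);
    }

    // Illustrative result codes for this sketch.
    static final int OK = 0;
    static final int NO_NODE = -101;

    static final class Versioned {
        final String value;
        final int version;

        Versioned(String value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    // Every callback branch completes the promise exactly once, so the
    // caller's future can never be left hanging.
    static CompletableFuture<Versioned> read(AsyncStore store, String path) {
        CompletableFuture<Versioned> promise = new CompletableFuture<>();
        store.getData(path, (rc, data, version) -> {
            if (rc == NO_NODE) {
                promise.completeExceptionally(new IllegalStateException("no such ledger: " + path));
            } else if (rc != OK) {
                promise.completeExceptionally(new IllegalStateException("store error, rc=" + rc));
            } else {
                promise.complete(new Versioned(new String(data, StandardCharsets.UTF_8), version));
            }
        });
        return promise;
    }
}
```

Returning the version together with the value is what allows a later write or remove to pass it back for the version check.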