深入淺出Zookeeper源碼(二):存儲技術(shù)

本文首發(fā)于泊浮目的簡書:http://www.itdecent.cn/u/204b8aaab8ba

版本 日期 備注
1.0 2020.3.12 文章首發(fā)
1.0.1 2020.3.16 改進(jìn)部分大小寫問題及形容方式
1.0.2 2020.3.21 改進(jìn)可能會引起錯誤理解的部分
1.0.3 2020.3.29 修改標(biāo)題
1.0.4 2020.4.18 改進(jìn)小結(jié)部分
1.0.5 2020.6.26 更新部分部分解釋,改進(jìn)注釋風(fēng)格
1.0.6 2020.7.6 增加部分詳細(xì)解釋
1.1 2021.6.22 標(biāo)題從深入淺出Zookeeper(二):存儲技術(shù)變更為深入淺出Zookeeper源碼(二):存儲技術(shù)

前言

在上篇文章中,我們簡單提到了Zookeeper的幾個核心點(diǎn)。在這篇文章中,我們就來探索其存儲技術(shù)。在開始前,讀者可以考慮思考下列問題:

  • Zookeeper的數(shù)據(jù)存儲是如何實現(xiàn)的?
  • Zookeeper進(jìn)行一次寫操作的時候,會發(fā)生什么?
  • 當(dāng)一個Zookeeper新加入現(xiàn)有集群時,如何同步現(xiàn)集群中的數(shù)據(jù)?

抱著問題,我們進(jìn)入下面的內(nèi)容。

Zookeper本地存儲模型

眾所周知,Zookeeper不擅長大量數(shù)據(jù)的讀寫,因為:

  1. 本質(zhì)上就是一個內(nèi)存里的字典。
  2. 持久化節(jié)點(diǎn)的寫入由于WAL會導(dǎo)致刷盤,過大的數(shù)據(jù)會引起額外的seek。
  3. 同樣的,在zk啟動時,所有的數(shù)據(jù)會從WAL的日志中讀出。如果過大,也會導(dǎo)致啟動時間較長。

而內(nèi)存中的數(shù)據(jù),也被稱為ZkDatabase(Zk的內(nèi)存數(shù)據(jù)庫),由它來負(fù)責(zé)管理Zk的會話DataTree存儲和事務(wù)日志,它也會定時向磁盤dump快照數(shù)據(jù),在Zk啟動時,也會通過事務(wù)日志和快照數(shù)據(jù)來恢復(fù)內(nèi)存中的數(shù)據(jù)。

既然Zk的數(shù)據(jù)是在內(nèi)存里的,那么它是如何解決數(shù)據(jù)持久化問題的呢?上一段我們已經(jīng)提到了:即通過事務(wù)日志——WAL,在每次寫請求前,都會根據(jù)目前的zxid來寫log,將請求先記錄到日志中。

接下來,我們來談?wù)刉AL的優(yōu)化措施。

WAL的優(yōu)化

WAL優(yōu)化方案1:Group Commit

一般的WAL中每次寫完END都要調(diào)用一次耗時的sync API,這其實是會影響到系統(tǒng)的性能。為了解決這個問題,我們可以一次提交多個數(shù)據(jù)寫入——只在最后一個數(shù)據(jù)寫入的END日志之后,才調(diào)用sync API。like this:

  • without group commit: BEGIN Data1 END Sync BEGIN Data2 END Sync BEGIN Data3 END Sync
  • with group commit: BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync

凡事都有代價,這可能會引起數(shù)據(jù)一致性相關(guān)的問題。

WAL優(yōu)化方案2:File Padding

在往 WAL 里面追加日志的時候,如果當(dāng)前的文件 block 不能保存新添加的日志,就要為文件分配新的 block,這要更新文件 inode 里面的信息(例如 size)。如果我們使用的是 HHD 的話,就要先 seek 到 inode 所在的位置,然后回到新添加 block 的位置進(jìn)行日志追加,這些都是發(fā)生在寫事務(wù)日志時,這會明顯拖慢系統(tǒng)的性能。

為了減少這些 seek,我們可以預(yù)先為 WAL 分配 block。例如 ZooKeeper 當(dāng)檢測到當(dāng)前事務(wù)日志文件不足4KB時,就會填充0使該文件到64MB(這里0僅僅作為填充位)。并新建一個64MB的文件。

所以這也是Zookeeper不擅長讀寫大數(shù)據(jù)的原因之一,這會引起大量的block分配。

WAL優(yōu)化方案3:Snapshot

如果我們使用一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)加 WAL 的存儲方案,WAL 就會一直增長。這樣在存儲系統(tǒng)啟動的時候,就要讀取大量的 WAL 日志數(shù)據(jù)來重建內(nèi)存數(shù)據(jù)??煺湛梢越鉀Q這個問題。

除了解決啟動時間過長的問題之外,快照還可以減少存儲空間的使用。WAL 的多個日志條目有可能是對同一個數(shù)據(jù)的改動,通過快照,就可以只保留最新的數(shù)據(jù)改動(Merge)。

Zk的確采用了這個方案來做優(yōu)化。還帶來的一個好處是:在一個節(jié)點(diǎn)加入時,可以用最新的Snapshot傳過去便于同步數(shù)據(jù)。

源碼解析

本節(jié)內(nèi)容都以3.5.7版本為例

核心接口和類

  • TxnLog:接口類型,提供讀寫事務(wù)日志的API。
  • FileTxnLog:基于文件的TxnLog實現(xiàn)。
  • Snapshot:快照接口類型,提供序列化、反序列化、訪問快照API。
  • FileSnapshot:基于文件的Snapshot實現(xiàn)。
  • FileTxnSnapLog:TxnLog和Snapshot的封裝
  • DataTree:Zookeeper的內(nèi)存數(shù)據(jù)結(jié)構(gòu),ZNode構(gòu)成的樹。
  • DataNode:表示一個ZNode。

TxnLog

TxnLog是我們前面提到的事務(wù)日志。那么接下來我們就來看它的相關(guān)源碼。

先看注釋:

package org.apache.zookeeper.server.persistence;

import ...

/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
public class FileTxnLog implements TxnLog, Closeable {

在注釋中,我們可以看到一個FileLog由三部分組成:

  • FileHeader
  • TxnList
  • ZerdPad

關(guān)于FileHeader,可以理解其為一個標(biāo)示符。TxnList則為主要內(nèi)容。ZeroPad是一個終結(jié)符。

TxnLog.append

我們來看看最典型的append方法,可以將其理解WAL過程中的核心方法:

    /**
     * append an entry to the transaction log
     * @param hdr the header of the transaction
     * @param txn the transaction part of the entry
     * returns true iff something appended, otw false
     */
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr == null) { //為null意味著這是一個讀請求,直接返回
            return false;
        }
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        if (logStream==null) { //為空的話則new一個Stream
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }

           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");   //寫file header
           // Make sure that the magic number is written before padding.
           logStream.flush();      // zxid必須比日志先落盤
           filePadding.setCurrentSize(fos.getChannel().position());
           streamsToFlush.add(fos); //加入需要Flush的隊列
        }
        filePadding.padFile(fos.getChannel());   //確定是否要擴(kuò)容。每次64m擴(kuò)容
        byte[] buf = Util.marshallTxnEntry(hdr, txn);  //序列化寫入
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        Checksum crc = makeChecksumAlgorithm();   //生成butyArray的checkSum
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");//寫入日志里
        Util.writeTxnBytes(oa, buf);
        return true;
    }

這里有個zxid(ZooKeeper Transaction Id),有點(diǎn)像MySQL的GTID。每次對Zookeeper的狀態(tài)的改變都會產(chǎn)生一個zxid,zxid是全局有序的,如果zxid1小于zxid2,則zxid1在zxid2之前發(fā)生。

簡單分析一下寫入過程:

  1. 確定要寫的事務(wù)日志:當(dāng)Zk啟動完成或日志寫滿時,會與日志文件斷開連接。這個時候會根據(jù)zxid創(chuàng)建一個日志。
  2. 是否需要預(yù)分配:如果檢測到當(dāng)前日志剩余空間不足4KB時
  3. 事務(wù)序列化
  4. 為每個事務(wù)生成一個Checksum,目的是為了校驗數(shù)據(jù)的完整性和一致性。
  5. 寫入文件,不過是寫在Buffer里,并未落盤。
  6. 落盤。根據(jù)用戶配置來決定是否強(qiáng)制落盤。

TxnLog.commit

這個方法被調(diào)用的時機(jī)大致有:

  • 服務(wù)端比較閑的時候去調(diào)用
  • 到請求數(shù)量超出1000時,調(diào)用。之前提到過GroupCommit,其實就是在這個時候調(diào)用的。
  • zk的shutdown鉤子被調(diào)用時,調(diào)用
    /**
     * commit the logs. make sure that everything hits the
     * disk
     */
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);//對應(yīng)fdataSync

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    if(serverStats != null) {
                        serverStats.incrementFsyncThresholdExceedCount();
                    }
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();
        }
    }

代碼非常的簡單。如果logStream還有,那就先刷下去。然后遍歷待flush的隊列(是個鏈表,用來保持操作順序),同時還會關(guān)注寫入的時間,如果過長,則會打一個Warn的日志。

DataTree和DataNode

DataTree是Zk的內(nèi)存數(shù)據(jù)結(jié)構(gòu)——就是我們之前說到的MTable。它以樹狀結(jié)構(gòu)來組織DataNode。

這么聽起來可能有點(diǎn)云里霧里,不妨直接看一下DataNode的相關(guān)代碼。

public class DataNode implements Record {
    /** the data for this datanode */
    byte data[];

    /**
     * the acl map long for this datanode. the datatree has the map
     */
    Long acl;

    /**
     * the stat for this node that is persisted to disk.
     */
    public StatPersisted stat;

    /**
     * the list of children for this node. note that the list of children string
     * does not contain the parent path -- just the last part of the path. This
     * should be synchronized on except deserializing (for speed up issues).
     */
    private Set<String> children = null;
.....
}

如果用過ZkClient的小伙伴,可能非常熟悉。這就是我們根據(jù)一個path獲取數(shù)據(jù)時返回的相關(guān)屬性——這就是用來描述存儲數(shù)據(jù)的一個類。注意,DataNode還會維護(hù)它的Children。

簡單了解DataNode后,我們來看一下DataTree。為了避免干擾,我們選出最關(guān)鍵的成員變量:

public class DataTree {
    private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

    /**
     * This hashtable provides a fast lookup to the datanodes. The tree is the
     * source of truth and is where all the locking occurs
     */
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

    private final WatchManager dataWatches = new WatchManager();

    private final WatchManager childWatches = new WatchManager();

    /**
     * This hashtable lists the paths of the ephemeral nodes of a session.
     */
    private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>();
    .......
}

我們可以看到,DataTree本質(zhì)上是通過一個ConcurrentHashMap來存儲DataNode的(臨時節(jié)點(diǎn)也是)。保存的是 DataNode 的 path 到 DataNode 的映射。

那為什么要保存兩個狀態(tài)呢?這得看調(diào)用它們被調(diào)用的場景:

  • 一般CRUD ZNode的請求都是走ConcurrentHashMap的
  • 序列化DataTree的時候會從Root節(jié)點(diǎn)開始遍歷所有節(jié)點(diǎn)

如果需要獲取所有節(jié)點(diǎn)的信息,顯然遍歷樹會比一個個從ConcurrentHashMap 拿快。

接下來看一下序列化的相關(guān)代碼:

DataNode的序列化方法

    /**
     * this method uses a stringbuilder to create a new path for children. This
     * is faster than string appends ( str1 + str2).
     *
     * @param oa
     *            OutputArchive to write to.
     * @param path
     *            a string builder.
     * @throws IOException
     * @throws InterruptedException
     */
    void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
        String pathString = path.toString();
        DataNode node = getNode(pathString);
        if (node == null) {
            return;
        }
        String children[] = null;
        DataNode nodeCopy;
        synchronized (node) {
            StatPersisted statCopy = new StatPersisted();
            copyStatPersisted(node.stat, statCopy);
            //we do not need to make a copy of node.data because the contents
            //are never changed
            nodeCopy = new DataNode(node.data, node.acl, statCopy);
            Set<String> childs = node.getChildren();
            children = childs.toArray(new String[childs.size()]);
        }
        serializeNodeData(oa, pathString, nodeCopy);
        path.append('/');
        int off = path.length();
        for (String child : children) {
            // since this is single buffer being resused
            // we need
            // to truncate the previous bytes of string.
            path.delete(off, Integer.MAX_VALUE);
            path.append(child);
            serializeNode(oa, path);
        }
    }

可以看到,的確是通過DataNode的Children來遍歷所有節(jié)點(diǎn)。

DataNode的反序列化方法

接下來看一下反序列化的代碼:

    public void deserialize(InputArchive ia, String tag) throws IOException {
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        String path = ia.readString("path");
        while (!"/".equals(path)) {
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                root = node;
            } else {
                String parentPath = path.substring(0, lastSlash);
                DataNode parent = nodes.get(parentPath);
                if (parent == null) {
                    throw new IOException("Invalid Datatree, unable to find " +
                            "parent " + parentPath + " of path " + path);
                }
                parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                EphemeralType ephemeralType = EphemeralType.get(eowner);
                if (ephemeralType == EphemeralType.CONTAINER) {
                    containers.add(path);
                } else if (ephemeralType == EphemeralType.TTL) {
                    ttls.add(path);
                } else if (eowner != 0) {
                    HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {
                        list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        nodes.put("/", root);
        // we are done with deserializing the
        // the datatree
        // update the quotas - create path trie
        // and also update the stat nodes
        setupQuota();

        aclCache.purgeUnused();
    }

因為序列化的時候是前序遍歷。所以反序列化時是先反序列化父親節(jié)點(diǎn),再反序列化孩子節(jié)點(diǎn)。

Snapshot

那么DataTree在什么情況下會序列化呢?在這里就要提到快照了。

前面提到過:如果我們使用一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)加 WAL 的存儲方案,WAL 就會一直增長。這樣在存儲系統(tǒng)啟動的時候,就要讀取大量的 WAL 日志數(shù)據(jù)來重建內(nèi)存數(shù)據(jù)??煺湛梢越鉀Q這個問題。

除了減少WAL日志,Snapshot還會在Zk全量同步時被用到——當(dāng)一個全新的ZkServer(這個一般叫Learner)被加入集群時,Leader服務(wù)器會將本機(jī)上的數(shù)據(jù)全量同步給新來的ZkServer。

序列化

接下來看一下代碼入口:

    /**
     * serialize the datatree and session into the file snapshot
     * @param dt the datatree to be serialized
     * @param sessions the sessions to be serialized
     * @param snapShot the file to store snapshot into
     */
    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
            throws IOException {
        if (!close) {
            try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
                 CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
                //CheckedOutputStream cout = new CheckedOutputStream()
                OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
                FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                serialize(dt, sessions, oa, header);
                long val = crcOut.getChecksum().getValue();
                oa.writeLong(val, "val");
                oa.writeString("/", "path");
                sessOS.flush();
            }
        } else {
            throw new IOException("FileSnap has already been closed");
        }
    }

JavaIO的基礎(chǔ)知識在這不再介紹,有興趣的人可以自行查閱資料或看 從一段代碼談起——淺談JavaIO接口。

本質(zhì)就是創(chuàng)建文件,并調(diào)用DataTree的序列化方法,DataTree的序列化其實就是遍歷DataNode去序列化,最后將這些序列化的內(nèi)容寫入文件。

反序列化

    /**
     * deserialize a data tree from the most recent snapshot
     * @return the zxid of the snapshot
     */
    public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
            snap = snapList.get(i);
            LOG.info("Reading snapshot " + snap);
            try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

簡單來說,先讀取Snapshot文件們。并反序列化它們,組成DataTree。

小結(jié)

在本文中,筆者和大家一起學(xué)習(xí)了Zk的底層存儲技術(shù)。在此處,我們做個簡單的回顧:

  • zk的數(shù)據(jù)主要維護(hù)在內(nèi)存中。在寫入內(nèi)存前,會做WAL,同時也會定期的做快照持久化到磁盤
  • WAL的常見優(yōu)化手段有三種:Group Commit、File Padding、Snapshot

另外,Zk中序列化技術(shù)用的是Apache Jute——本質(zhì)上調(diào)用了JavaDataOutput和Input,較為簡單。故沒在本文中展開。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容