| 版本 | 日期 | 備注 |
|---|---|---|
| 1.0 | 2020.3.12 | 文章首發(fā) |
| 1.0.1 | 2020.3.16 | 改進(jìn)部分大小寫問題及形容方式 |
| 1.0.2 | 2020.3.21 | 改進(jìn)可能會引起錯誤理解的部分 |
| 1.0.3 | 2020.3.29 | 修改標(biāo)題 |
| 1.0.4 | 2020.4.18 | 改進(jìn)小結(jié)部分 |
| 1.0.5 | 2020.6.26 | 更新部分部分解釋,改進(jìn)注釋風(fēng)格 |
| 1.0.6 | 2020.7.6 | 增加部分詳細(xì)解釋 |
| 1.1 | 2021.6.22 | 標(biāo)題從深入淺出Zookeeper(二):存儲技術(shù)變更為深入淺出Zookeeper源碼(二):存儲技術(shù)
|
前言
在上篇文章中,我們簡單提到了Zookeeper的幾個核心點(diǎn)。在這篇文章中,我們就來探索其存儲技術(shù)。在開始前,讀者可以考慮思考下列問題:
- Zookeeper的數(shù)據(jù)存儲是如何實現(xiàn)的?
- Zookeeper進(jìn)行一次寫操作的時候,會發(fā)生什么?
- 當(dāng)一個Zookeeper新加入現(xiàn)有集群時,如何同步現(xiàn)集群中的數(shù)據(jù)?
抱著問題,我們進(jìn)入下面的內(nèi)容。
Zookeper本地存儲模型
眾所周知,Zookeeper不擅長大量數(shù)據(jù)的讀寫,因為:
- 本質(zhì)上就是一個內(nèi)存里的字典。
- 持久化節(jié)點(diǎn)的寫入由于WAL會導(dǎo)致刷盤,過大的數(shù)據(jù)會引起額外的
seek。 - 同樣的,在zk啟動時,所有的數(shù)據(jù)會從WAL的日志中讀出。如果過大,也會導(dǎo)致啟動時間較長。
而內(nèi)存中的數(shù)據(jù),也被稱為ZkDatabase(Zk的內(nèi)存數(shù)據(jù)庫),由它來負(fù)責(zé)管理Zk的會話DataTree存儲和事務(wù)日志,它也會定時向磁盤dump快照數(shù)據(jù),在Zk啟動時,也會通過事務(wù)日志和快照數(shù)據(jù)來恢復(fù)內(nèi)存中的數(shù)據(jù)。
既然Zk的數(shù)據(jù)是在內(nèi)存里的,那么它是如何解決數(shù)據(jù)持久化問題的呢?上一段我們已經(jīng)提到了:即通過事務(wù)日志——WAL,在每次寫請求前,都會根據(jù)目前的zxid來寫log,將請求先記錄到日志中。
接下來,我們來談?wù)刉AL的優(yōu)化措施。
WAL的優(yōu)化
WAL優(yōu)化方案1:Group Commit
一般的WAL中每次寫完END都要調(diào)用一次耗時的sync API,這其實是會影響到系統(tǒng)的性能。為了解決這個問題,我們可以一次提交多個數(shù)據(jù)寫入——只在最后一個數(shù)據(jù)寫入的END日志之后,才調(diào)用sync API。like this:
- without group commit:
BEGIN Data1 END SyncBEGIN Data2 END SyncBEGIN Data3 END Sync - with group commit:
BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync
凡事都有代價,這可能會引起數(shù)據(jù)一致性相關(guān)的問題。
WAL優(yōu)化方案2:File Padding
在往 WAL 里面追加日志的時候,如果當(dāng)前的文件 block 不能保存新添加的日志,就要為文件分配新的 block,這要更新文件 inode 里面的信息(例如 size)。如果我們使用的是 HHD 的話,就要先 seek 到 inode 所在的位置,然后回到新添加 block 的位置進(jìn)行日志追加,這些都是發(fā)生在寫事務(wù)日志時,這會明顯拖慢系統(tǒng)的性能。
為了減少這些 seek,我們可以預(yù)先為 WAL 分配 block。例如 ZooKeeper 當(dāng)檢測到當(dāng)前事務(wù)日志文件不足4KB時,就會填充0使該文件到64MB(這里0僅僅作為填充位)。并新建一個64MB的文件。
所以這也是Zookeeper不擅長讀寫大數(shù)據(jù)的原因之一,這會引起大量的block分配。
WAL優(yōu)化方案3:Snapshot
如果我們使用一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)加 WAL 的存儲方案,WAL 就會一直增長。這樣在存儲系統(tǒng)啟動的時候,就要讀取大量的 WAL 日志數(shù)據(jù)來重建內(nèi)存數(shù)據(jù)??煺湛梢越鉀Q這個問題。
除了解決啟動時間過長的問題之外,快照還可以減少存儲空間的使用。WAL 的多個日志條目有可能是對同一個數(shù)據(jù)的改動,通過快照,就可以只保留最新的數(shù)據(jù)改動(Merge)。
Zk的確采用了這個方案來做優(yōu)化。還帶來的一個好處是:在一個節(jié)點(diǎn)加入時,可以用最新的Snapshot傳過去便于同步數(shù)據(jù)。
源碼解析
本節(jié)內(nèi)容都以3.5.7版本為例
核心接口和類
- TxnLog:接口類型,提供讀寫事務(wù)日志的API。
- FileTxnLog:基于文件的TxnLog實現(xiàn)。
- Snapshot:快照接口類型,提供序列化、反序列化、訪問快照API。
- FileSnapshot:基于文件的Snapshot實現(xiàn)。
- FileTxnSnapLog:TxnLog和Snapshot的封裝
- DataTree:Zookeeper的內(nèi)存數(shù)據(jù)結(jié)構(gòu),ZNode構(gòu)成的樹。
- DataNode:表示一個ZNode。
TxnLog
TxnLog是我們前面提到的事務(wù)日志。那么接下來我們就來看它的相關(guān)源碼。
先看注釋:
package org.apache.zookeeper.server.persistence;
import ...
/**
* This class implements the TxnLog interface. It provides api's
* to access the txnlogs and add entries to it.
* <p>
* The format of a Transactional log is as follows:
* <blockquote><pre>
* LogFile:
* FileHeader TxnList ZeroPad
*
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
*
* TxnList:
* Txn || Txn TxnList
*
* Txn:
* checksum Txnlen TxnHeader Record 0x42
*
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
* len 4bytes
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
* Record:
* See Jute definition file for details on the various record types
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
* </pre></blockquote>
*/
public class FileTxnLog implements TxnLog, Closeable {
在注釋中,我們可以看到一個FileLog由三部分組成:
- FileHeader
- TxnList
- ZerdPad
關(guān)于FileHeader,可以理解其為一個標(biāo)示符。TxnList則為主要內(nèi)容。ZeroPad是一個終結(jié)符。
TxnLog.append
我們來看看最典型的append方法,可以將其理解WAL過程中的核心方法:
/**
* append an entry to the transaction log
* @param hdr the header of the transaction
* @param txn the transaction part of the entry
* returns true iff something appended, otw false
*/
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr == null) { //為null意味著這是一個讀請求,直接返回
return false;
}
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
if (logStream==null) { //為空的話則new一個Stream
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader"); //寫file header
// Make sure that the magic number is written before padding.
logStream.flush(); // zxid必須比日志先落盤
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos); //加入需要Flush的隊列
}
filePadding.padFile(fos.getChannel()); //確定是否要擴(kuò)容。每次64m擴(kuò)容
byte[] buf = Util.marshallTxnEntry(hdr, txn); //序列化寫入
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm(); //生成butyArray的checkSum
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");//寫入日志里
Util.writeTxnBytes(oa, buf);
return true;
}
這里有個zxid(ZooKeeper Transaction Id),有點(diǎn)像MySQL的GTID。每次對Zookeeper的狀態(tài)的改變都會產(chǎn)生一個zxid,zxid是全局有序的,如果zxid1小于zxid2,則zxid1在zxid2之前發(fā)生。
簡單分析一下寫入過程:
- 確定要寫的事務(wù)日志:當(dāng)Zk啟動完成或日志寫滿時,會與日志文件斷開連接。這個時候會根據(jù)zxid創(chuàng)建一個日志。
- 是否需要預(yù)分配:如果檢測到當(dāng)前日志剩余空間不足4KB時
- 事務(wù)序列化
- 為每個事務(wù)生成一個Checksum,目的是為了校驗數(shù)據(jù)的完整性和一致性。
- 寫入文件,不過是寫在Buffer里,并未落盤。
- 落盤。根據(jù)用戶配置來決定是否強(qiáng)制落盤。
TxnLog.commit
這個方法被調(diào)用的時機(jī)大致有:
- 服務(wù)端比較閑的時候去調(diào)用
- 到請求數(shù)量超出1000時,調(diào)用。之前提到過GroupCommit,其實就是在這個時候調(diào)用的。
- zk的shutdown鉤子被調(diào)用時,調(diào)用
/**
* commit the logs. make sure that everything hits the
* disk
*/
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);//對應(yīng)fdataSync
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
if(serverStats != null) {
serverStats.incrementFsyncThresholdExceedCount();
}
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "File size is " + channel.size() + " bytes. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
代碼非常的簡單。如果logStream還有,那就先刷下去。然后遍歷待flush的隊列(是個鏈表,用來保持操作順序),同時還會關(guān)注寫入的時間,如果過長,則會打一個Warn的日志。
DataTree和DataNode
DataTree是Zk的內(nèi)存數(shù)據(jù)結(jié)構(gòu)——就是我們之前說到的MTable。它以樹狀結(jié)構(gòu)來組織DataNode。
這么聽起來可能有點(diǎn)云里霧里,不妨直接看一下DataNode的相關(guān)代碼。
public class DataNode implements Record {
/** the data for this datanode */
byte data[];
/**
* the acl map long for this datanode. the datatree has the map
*/
Long acl;
/**
* the stat for this node that is persisted to disk.
*/
public StatPersisted stat;
/**
* the list of children for this node. note that the list of children string
* does not contain the parent path -- just the last part of the path. This
* should be synchronized on except deserializing (for speed up issues).
*/
private Set<String> children = null;
.....
}
如果用過ZkClient的小伙伴,可能非常熟悉。這就是我們根據(jù)一個path獲取數(shù)據(jù)時返回的相關(guān)屬性——這就是用來描述存儲數(shù)據(jù)的一個類。注意,DataNode還會維護(hù)它的Children。
簡單了解DataNode后,我們來看一下DataTree。為了避免干擾,我們選出最關(guān)鍵的成員變量:
public class DataTree {
private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);
/**
* This hashtable provides a fast lookup to the datanodes. The tree is the
* source of truth and is where all the locking occurs
*/
private final ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
/**
* This hashtable lists the paths of the ephemeral nodes of a session.
*/
private final Map<Long, HashSet<String>> ephemerals =
new ConcurrentHashMap<Long, HashSet<String>>();
.......
}
我們可以看到,DataTree本質(zhì)上是通過一個ConcurrentHashMap來存儲DataNode的(臨時節(jié)點(diǎn)也是)。保存的是 DataNode 的 path 到 DataNode 的映射。
那為什么要保存兩個狀態(tài)呢?這得看調(diào)用它們被調(diào)用的場景:
- 一般CRUD ZNode的請求都是走ConcurrentHashMap的
- 序列化DataTree的時候會從Root節(jié)點(diǎn)開始遍歷所有節(jié)點(diǎn)
如果需要獲取所有節(jié)點(diǎn)的信息,顯然遍歷樹會比一個個從ConcurrentHashMap 拿快。
接下來看一下序列化的相關(guān)代碼:
DataNode的序列化方法
/**
* this method uses a stringbuilder to create a new path for children. This
* is faster than string appends ( str1 + str2).
*
* @param oa
* OutputArchive to write to.
* @param path
* a string builder.
* @throws IOException
* @throws InterruptedException
*/
void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
String pathString = path.toString();
DataNode node = getNode(pathString);
if (node == null) {
return;
}
String children[] = null;
DataNode nodeCopy;
synchronized (node) {
StatPersisted statCopy = new StatPersisted();
copyStatPersisted(node.stat, statCopy);
//we do not need to make a copy of node.data because the contents
//are never changed
nodeCopy = new DataNode(node.data, node.acl, statCopy);
Set<String> childs = node.getChildren();
children = childs.toArray(new String[childs.size()]);
}
serializeNodeData(oa, pathString, nodeCopy);
path.append('/');
int off = path.length();
for (String child : children) {
// since this is single buffer being resused
// we need
// to truncate the previous bytes of string.
path.delete(off, Integer.MAX_VALUE);
path.append(child);
serializeNode(oa, path);
}
}
可以看到,的確是通過DataNode的Children來遍歷所有節(jié)點(diǎn)。
DataNode的反序列化方法
接下來看一下反序列化的代碼:
public void deserialize(InputArchive ia, String tag) throws IOException {
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
String path = ia.readString("path");
while (!"/".equals(path)) {
DataNode node = new DataNode();
ia.readRecord(node, "node");
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
root = node;
} else {
String parentPath = path.substring(0, lastSlash);
DataNode parent = nodes.get(parentPath);
if (parent == null) {
throw new IOException("Invalid Datatree, unable to find " +
"parent " + parentPath + " of path " + path);
}
parent.addChild(path.substring(lastSlash + 1));
long eowner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(eowner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (eowner != 0) {
HashSet<String> list = ephemerals.get(eowner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(eowner, list);
}
list.add(path);
}
}
path = ia.readString("path");
}
nodes.put("/", root);
// we are done with deserializing the
// the datatree
// update the quotas - create path trie
// and also update the stat nodes
setupQuota();
aclCache.purgeUnused();
}
因為序列化的時候是前序遍歷。所以反序列化時是先反序列化父親節(jié)點(diǎn),再反序列化孩子節(jié)點(diǎn)。
Snapshot
那么DataTree在什么情況下會序列化呢?在這里就要提到快照了。
前面提到過:如果我們使用一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)加 WAL 的存儲方案,WAL 就會一直增長。這樣在存儲系統(tǒng)啟動的時候,就要讀取大量的 WAL 日志數(shù)據(jù)來重建內(nèi)存數(shù)據(jù)??煺湛梢越鉀Q這個問題。
除了減少WAL日志,Snapshot還會在Zk全量同步時被用到——當(dāng)一個全新的ZkServer(這個一般叫Learner)被加入集群時,Leader服務(wù)器會將本機(jī)上的數(shù)據(jù)全量同步給新來的ZkServer。
序列化
接下來看一下代碼入口:
/**
* serialize the datatree and session into the file snapshot
* @param dt the datatree to be serialized
* @param sessions the sessions to be serialized
* @param snapShot the file to store snapshot into
*/
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
throws IOException {
if (!close) {
try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
//CheckedOutputStream cout = new CheckedOutputStream()
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
serialize(dt, sessions, oa, header);
long val = crcOut.getChecksum().getValue();
oa.writeLong(val, "val");
oa.writeString("/", "path");
sessOS.flush();
}
} else {
throw new IOException("FileSnap has already been closed");
}
}
JavaIO的基礎(chǔ)知識在這不再介紹,有興趣的人可以自行查閱資料或看 從一段代碼談起——淺談JavaIO接口。
本質(zhì)就是創(chuàng)建文件,并調(diào)用DataTree的序列化方法,DataTree的序列化其實就是遍歷DataNode去序列化,最后將這些序列化的內(nèi)容寫入文件。
反序列化
/**
* deserialize a data tree from the most recent snapshot
* @return the zxid of the snapshot
*/
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
snap = snapList.get(i);
LOG.info("Reading snapshot " + snap);
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
deserialize(dt, sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch (IOException e) {
LOG.warn("problem reading snap file " + snap, e);
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}
簡單來說,先讀取Snapshot文件們。并反序列化它們,組成DataTree。
小結(jié)
在本文中,筆者和大家一起學(xué)習(xí)了Zk的底層存儲技術(shù)。在此處,我們做個簡單的回顧:
- zk的數(shù)據(jù)主要維護(hù)在內(nèi)存中。在寫入內(nèi)存前,會做WAL,同時也會定期的做快照持久化到磁盤
- WAL的常見優(yōu)化手段有三種:Group Commit、File Padding、Snapshot
另外,Zk中序列化技術(shù)用的是Apache Jute——本質(zhì)上調(diào)用了JavaDataOutput和Input,較為簡單。故沒在本文中展開。