Summary
This section covers
the persistence framework of zk
the on-disk structure of the transaction log FileTxnLog
the FileTxnLog source
how LogFormatter deserializes a transaction log
a walkthrough of a transaction log demo
Overall persistence framework
The persistence classes live in the package org.apache.zookeeper.server.persistence, structured as in the diagram below

TxnLog: an interface for reading and writing the transactional log.
FileTxnLog: implements TxnLog, adding the API for accessing the transactional log on disk.
SnapShot: an interface for persistence-layer snapshots.
FileSnap: implements SnapShot; stores, serializes, deserializes and accesses snapshots.
FileTxnSnapLog: wraps TxnLog and SnapShot together.
Util: a utility class providing the APIs needed for persistence.
Two kinds of files
zk mainly stores two kinds of files
snapshot (in-memory snapshot)
log (the transaction log, similar to MySQL's binlog: every operation that modifies data is recorded in the log)
For a definition of the transactional log, see the refer section; in short:
The zk transaction log records transactional operations. Every transactional operation, such as adding or deleting a node, appends one record to the transaction log, which is used to recover data after a zookeeper failure.
The transaction log is introduced below.
Transaction log
During normal operation, for every update, ZK makes sure the transaction log for that update has been written to disk before returning "update succeeded" to the client; only then does the update take effect.
The TxnLog interface
public interface TxnLog {
/**
* roll the current
* log being appended to
* @throws IOException
*/
// Roll to the next log file (note: this is not a rollback)
void rollLog() throws IOException;
/**
* Append a request to the transaction log
* @param hdr the transaction header
* @param r the transaction itself
* returns true iff something appended, otw false
* @throws IOException
*/
// Append a request to the transaction log
boolean append(TxnHeader hdr, Record r) throws IOException;
/**
* Start reading the transaction logs
* from a given zxid
* @param zxid
* @return returns an iterator to read the
* next transaction in the logs.
* @throws IOException
*/
// Read the transaction log starting from the given zxid
TxnIterator read(long zxid) throws IOException;
/**
* the last zxid of the logged transactions.
* @return the last zxid of the logged transactions.
* @throws IOException
*/
// Last zxid of the logged transactions
long getLastLoggedZxid() throws IOException;
/**
* truncate the log to get in sync with the
* leader.
* @param zxid the zxid to truncate at.
* @throws IOException
*/
// Truncate the log after the given zxid
boolean truncate(long zxid) throws IOException;
/**
* the dbid for this transaction log.
* @return the dbid for this transaction log.
* @throws IOException
*/
// Get the dbid of this transaction log
long getDbId() throws IOException;
/**
* commit the transactions and make sure
* they are persisted
* @throws IOException
*/
// Commit the transactions and make sure they are persisted
void commit() throws IOException;
/**
* close the transactions logs
*/
// Close the transaction log
void close() throws IOException;
/**
* an iterating interface for reading
* transaction logs.
*/
// Iterator interface for reading transaction logs
public interface TxnIterator {
/**
* return the transaction header.
* @return return the transaction header.
*/
// Get the transaction header
TxnHeader getHeader();
/**
* return the transaction record.
* @return return the transaction record.
*/
// Get the transaction record
Record getTxn();
/**
* go to the next transaction record.
* @throws IOException
*/
// Advance to the next transaction
boolean next() throws IOException;
/**
* close files and release the
* resources
* @throws IOException
*/
// Close files and release resources
void close() throws IOException;
}
}
Implementation class FileTxnLog
File structure
/**
* The format of a Transactional log is as follows:
* <blockquote><pre>
* LogFile:
* FileHeader TxnList ZeroPad
*
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
*
* TxnList:
* Txn || Txn TxnList
*
* Txn:
* checksum Txnlen TxnHeader Record 0x42
*
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
* len 4bytes
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
* Record:
* See Jute definition file for details on the various record types
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
* </pre></blockquote>
*/
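The 16-byte FileHeader layout above can be exercised in isolation. Below is a minimal illustrative parser using plain java.io, not ZooKeeper's Jute-based BinaryInputArchive (which also reads these fields as big-endian integers); the class and method names here are made up for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Illustrative parser for the FileHeader: magic(4) version(4) dbid(8).
public class FileHeaderDemo {
    // "ZKLG" packed into an int, as in FileTxnLog.TXNLOG_MAGIC
    static final int TXNLOG_MAGIC = ('Z' << 24) | ('K' << 16) | ('L' << 8) | 'G';

    static long[] readHeader(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int magic = in.readInt();
            if (magic != TXNLOG_MAGIC) {
                throw new IllegalArgumentException("Invalid magic number " + magic);
            }
            int version = in.readInt();
            long dbid = in.readLong();
            return new long[] { magic, version, dbid };
        } catch (IOException e) { // cannot happen on an in-memory byte array
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        byte[] header = java.nio.ByteBuffer.allocate(16)
                .putInt(TXNLOG_MAGIC).putInt(2).putLong(0L).array();
        long[] h = readHeader(header);
        System.out.println("version=" + h[1] + " dbid=" + h[2]);
    }
}
```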
Main methods
append
// Append one entry to the transaction log
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr != null) { // the transaction header is not null
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
}
if (logStream==null) { // no log stream yet
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: log." +
Long.toHexString(hdr.getZxid()));
}
// create a new log file, named by the first zxid it will contain
logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
// build the file header from TXNLOG_MAGIC, VERSION and dbId
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");// serialize the header
// Make sure that the magic number is written before padding.
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
}
padFile(fos);// when less than 4KB remains, pad (preallocate) the file in 64MB blocks
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm();// create the checksum algorithm (Adler32)
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");// write the checksum value as a long
Util.writeTxnBytes(oa, buf);// write the serialized txn entry to the OutputArchive, terminated by 0x42 ('B')
return true;
}
return false;
}
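The checksum step in append can be reproduced standalone. A small sketch mirroring the Adler32 usage shared by append and LogFormatter; the class name and entry bytes are illustrative stand-ins for the marshalled TxnHeader + Record:

```java
import java.util.zip.Adler32;
import java.util.zip.Checksum;

// Mirrors the checksum logic in FileTxnLog#append: an Adler32 value is
// computed over the serialized txn bytes and written before the entry,
// so readers (e.g. LogFormatter) can detect corruption.
public class TxnChecksumDemo {
    static long checksumOf(byte[] serializedTxn) {
        Checksum crc = new Adler32();
        crc.update(serializedTxn, 0, serializedTxn.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] entry = "header+record".getBytes(); // stand-in for marshalled bytes
        long stored = checksumOf(entry);
        // On read, recompute over the same bytes and compare, as LogFormatter does:
        System.out.println("match=" + (stored == checksumOf(entry)));
    }
}
```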
getLogFiles
// Find the log file with the largest zxid <= snapshotZxid, plus all subsequent log files
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
List<File> files = Util.sortDataDir(logDirList, "log", true);// extract the zxid from each suffix and sort ascending by zxid
long logZxid = 0;
// Find the log file that starts before or at the same time as the
// zxid of the snapshot
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), "log");
if (fzxid > snapshotZxid) {
continue;
}
// the files
// are sorted with zxid's
if (fzxid > logZxid) {
logZxid = fzxid;
}
}
List<File> v=new ArrayList<File>(5);
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), "log");
if (fzxid < logZxid) {
continue;
}
v.add(f);
}
return v.toArray(new File[0]);
}
getLastLoggedZxid
// Get the last zxid recorded in the log
public long getLastLoggedZxid() {
File[] files = getLogFiles(logDir.listFiles(), 0);
// find the file whose name carries the largest zxid
long maxLog=files.length>0?
Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;
// if a log file is more recent we must scan it to find
// the highest zxid
long zxid = maxLog;
TxnIterator itr = null;
try {
FileTxnLog txn = new FileTxnLog(logDir);
itr = txn.read(maxLog);
while (true) {
if(!itr.next())
break;
TxnHeader hdr = itr.getHeader();// iterate through the file to reach the last txn record
zxid = hdr.getZxid();// take its zxid
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
close(itr);
}
return zxid;
}
commit
// Commit the transaction log to disk
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();// drain the user-space buffer
}
for (FileOutputStream log : streamsToFlush) {
log.flush();// flush to the OS
if (forceSync) {
long startSyncNS = System.nanoTime();
log.getChannel().force(false);
long syncElapsedMS =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely affect operation latency. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();// remove the stream and close it
}
}
truncate
// Discard all transaction log entries with zxids greater than the given zxid
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
itr = new FileTxnIterator(this.logDir, zxid);// position an iterator at the given zxid
PositionInputStream input = itr.inputStream;
if(input == null) {
throw new IOException("No log files found to truncate! This could " +
"happen if you still have snapshots from an old setup or " +
"log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
}
long pos = input.getPosition();
// now, truncate at the current position
RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
raf.setLength(pos);// cut off the tail of the current log (entries with larger zxids)
raf.close();
while (itr.goToNextLog()) {
if (!itr.logFile.delete()) {// delete all subsequent log files
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
close(itr);
}
return true;
}
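The truncation primitive itself is just RandomAccessFile#setLength. A standalone sketch on a temp file (the class, method and file names below are illustrative, not ZooKeeper code):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

// Demonstrates the truncation primitive used by FileTxnLog#truncate:
// cut the current log file at the byte position of the target zxid,
// discarding everything after it.
public class TruncateDemo {
    static long truncateAt(File logFile, long pos) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(logFile, "rw");
        try {
            raf.setLength(pos); // drop all bytes past pos
            return raf.length();
        } finally {
            raf.close();
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("log.", ".demo");
        f.deleteOnExit();
        FileOutputStream fos = new FileOutputStream(f);
        fos.write(new byte[100]); // pretend these are log entries
        fos.close();
        System.out.println("after truncate: " + truncateAt(f, 40) + " bytes");
    }
}
```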
rollLog
Be sure to read the javadoc: this does not mean rolling the log back, it rolls over from the current log file to the next one.
/**
* rollover the current log file to a new one.
* @throws IOException
*/
public synchronized void rollLog() throws IOException {
if (logStream != null) {
this.logStream.flush();
this.logStream = null;
oa = null;
}
}
Transaction log visualization: LogFormatter
Read it alongside org.apache.zookeeper.server.persistence.FileTxnLog#append
Pass the path of a transaction log file as the only argument
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("USAGE: LogFormatter log_file");
System.exit(2);
}
FileInputStream fis = new FileInputStream(args[0]);
BinaryInputArchive logStream = BinaryInputArchive.getArchive(fis);
FileHeader fhdr = new FileHeader();
fhdr.deserialize(logStream, "fileheader");
// deserialize the header and validate it
if (fhdr.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
System.err.println("Invalid magic number for " + args[0]);
System.exit(2);
}
System.out.println("ZooKeeper Transactional Log File with dbid "
+ fhdr.getDbid() + " txnlog format version "
+ fhdr.getVersion());
int count = 0;
while (true) {
long crcValue;
byte[] bytes;
try {
crcValue = logStream.readLong("crcvalue");// read the stored checksum
bytes = logStream.readBuffer("txnEntry");
} catch (EOFException e) {
System.out.println("EOF reached after " + count + " txns.");
return;
}
if (bytes.length == 0) {
// Since we preallocate, we define EOF to be an
// empty transaction
System.out.println("EOF reached after " + count + " txns.");
return;
}
Checksum crc = new Adler32();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) {// compare the recomputed checksum with the stored one
throw new IOException("CRC doesn't match " + crcValue +
" vs " + crc.getValue());
}
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(bytes, hdr);// deserialize the transaction
System.out.println(DateFormat.getDateTimeInstance(DateFormat.SHORT,
DateFormat.LONG).format(new Date(hdr.getTime()))
+ " session 0x"
+ Long.toHexString(hdr.getClientId())
+ " cxid 0x"
+ Long.toHexString(hdr.getCxid())
+ " zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " " + TraceFormatter.op2String(hdr.getType()) + " " + txn);
if (logStream.readByte("EOR") != 'B') {
LOG.error("Last transaction was partial.");
throw new EOFException("Last transaction was partial.");
}
count++;
}
}
Sample LogFormatter output
For the demo posted at http://www.itdecent.cn/p/d1f8b9d6ad57,
parse its transaction log with LogFormatter (clear the transaction log directory beforehand).
The output is:
ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x0 zxid 0x1 createSession 20000
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x2 zxid 0x2 create '/test1,#7a6e6f646531,v{s{31,s{'world,'anyone}}},T,1
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x3 zxid 0x3 create '/test2,#7a6e6f646532,v{s{31,s{'world,'anyone}}},T,2
17-5-24 下午04時15分41秒 session 0x15c398687180000 cxid 0x4 zxid 0x4 create '/test3,#7a6e6f646533,v{s{31,s{'world,'anyone}}},T,3
17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0x9 zxid 0x5 setData '/test2,#7a4e6f64653232,1
17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0xb zxid 0x6 delete '/test2
17-5-24 下午04時15分43秒 session 0x15c398687180000 cxid 0xc zxid 0x7 delete '/test1
17-5-24 下午04時16分04秒 session 0x15c398687180000 cxid 0x0 zxid 0x8 closeSession null
EOF reached after 8 txns.
This is easy to follow alongside FileTxnLog#append.
Gripes
Mismatched tags
When serializing,
org.apache.zookeeper.server.persistence.FileTxnLog#append has
oa.writeLong(crc.getValue(), "txnEntryCRC");// write the checksum value as a long
while when deserializing, the parsing side
org.apache.zookeeper.server.LogFormatter#main has
crcValue = logStream.readLong("crcvalue");
The two tags differ ("txnEntryCRC" vs "crcvalue"), although this does not affect execution!!!
FileTxnLog#getLogFiles is inefficient
The files are already sorted ascending by zxid, so a single pass should suffice
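A hypothetical single-pass rewrite, keeping the original semantics (the newest file at or below snapshotZxid plus everything after it); it works on plain zxid longs instead of Files for brevity:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-pass version of getLogFiles: given zxids already
// sorted ascending, keep the last file whose zxid <= snapshotZxid and
// every file after it.
public class LogFilesDemo {
    static List<Long> logFilesFor(List<Long> sortedZxids, long snapshotZxid) {
        List<Long> result = new ArrayList<Long>();
        for (long zxid : sortedZxids) {
            if (zxid <= snapshotZxid) {
                // This newer file still covers the snapshot, so all
                // previously collected files are obsolete.
                result.clear();
            }
            result.add(zxid);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> zxids = new ArrayList<Long>();
        zxids.add(0x1L); zxids.add(0x10L); zxids.add(0x25L);
        System.out.println(logFilesFor(zxids, 0x20L)); // [16, 37]
    }
}
```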
Thoughts
File suffixes are generated from the zxid:
logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid())));
This makes it convenient to map between files and zxids,
as in the call inside getLastLoggedZxid.
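The suffix convention is easy to replicate. A minimal sketch of what Util.getZxidFromName does (the helper below is illustrative, not the actual Util code):

```java
// Sketch of Util.getZxidFromName: strip the "log." prefix and parse the
// remainder as a hex zxid. Returns -1 when the name does not match,
// following the real utility's convention.
public class ZxidNameDemo {
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length == 2 && parts[0].equals(prefix)) {
            try {
                return Long.parseLong(parts[1], 16);
            } catch (NumberFormatException e) {
                return -1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(zxidFromName("log.1a", "log"));     // 26
        System.out.println(zxidFromName("snapshot.0", "log")); // -1
    }
}
```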
The meaning of rollLog
The function takes no arguments.
Note carefully: it rolls over from the current log to the next one (e.g. when the current log has grown too large).
It does not roll back records within the log; a rollback would have to be told which zxid to roll back to.
For comparison: rollLog sets logStream to null, so the next append creates a new log file logFileWrite and a new stream logStream.
Both commit and rollLog call flush; what is the difference?
This involves FileChannel and NIO.
The call chain for writing to the FileChannel is:
org.apache.zookeeper.server.persistence.FileTxnLog#append
org.apache.zookeeper.server.persistence.FileTxnLog#padFile
org.apache.zookeeper.server.persistence.Util#padLogFile
java.nio.channels.FileChannel#write(java.nio.ByteBuffer, long)
i.e. it uses FileChannel's write method.
In the commit function, this is called:
log.getChannel().force(false);
i.e. java.nio.channels.FileChannel#force
References such as https://java-nio.avenwu.net/java-nio-filechannel.html
explain that:
force writes any data not yet on disk out to the disk.
The OS buffers writes for performance, so data written through a file channel is not guaranteed to reach the disk promptly unless force is called explicitly.
force takes a boolean indicating whether file metadata should be forced out as well.
In other words, only commit actually pushes data to disk; rollLog does not.
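The two levels can be seen in a standalone sketch (class, method and file names are illustrative): flush drains the user-space buffer into the OS page cache, while force(false) asks the OS to push the data (but not metadata) to disk, as commit does.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Sketch of the two-level flush in commit(): BufferedOutputStream#flush
// moves bytes from the Java buffer into the OS page cache; only
// FileChannel#force pushes them to the physical disk. rollLog stops
// after the first step.
public class FlushVsForceDemo {
    static long writeAndForce(File f, byte[] data) throws IOException {
        FileOutputStream fos = new FileOutputStream(f);
        BufferedOutputStream bos = new BufferedOutputStream(fos);
        try {
            bos.write(data);
            bos.flush();                   // user-space buffer -> OS cache (rollLog stops here)
            fos.getChannel().force(false); // OS cache -> disk, data only (commit does this)
        } finally {
            bos.close();
        }
        return f.length();
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("txnlog", ".demo");
        f.deleteOnExit();
        System.out.println("length=" + writeAndForce(f, "txn entry".getBytes()));
    }
}
```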
When does the transaction log call truncate to discard part of the log?
In a cluster, while a learner syncs with the leader, the leader may tell the learner to truncate in order to sync.
The caller is Learner#syncWithLeader, covered later in section 40.
What are zxid and cxid?
cxid is the client-side xid, as in ClientCnxn#getXid
zxid is the server-side xid, as in ZooKeeperServer#getNextZxid
This becomes easier to understand after reading the request-processing source later on.
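As an aside, a zxid is a 64-bit value packing the leader epoch into the high 32 bits and a per-epoch counter into the low 32 bits (see ZxidUtils in the ZooKeeper source), which is why zxids are printed in hex throughout. A sketch of the decomposition:

```java
// Decompose a zxid into its two halves, mirroring what
// org.apache.zookeeper.server.util.ZxidUtils provides.
public class ZxidDemo {
    static long epochOf(long zxid)   { return zxid >>> 32; }
    static long counterOf(long zxid) { return zxid & 0xffffffffL; }

    public static void main(String[] args) {
        long zxid = (5L << 32) | 0x1cL; // epoch 5, counter 0x1c
        System.out.println("epoch=" + epochOf(zxid)
                + " counter=" + counterOf(zxid));
    }
}
```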
Question
What does the flush call in rollLog achieve?
The difference between commit and rollLog was described above.
rollLog calls flush, so what is the net effect? The data has not been forced to disk (otherwise the later commit would be unnecessary).
Has it been written to memory? No FileChannel method is called, but BufferedOutputStream#flush does push the buffered bytes down into the underlying FileOutputStream, i.e. into the OS page cache; it simply stops short of the fsync that commit performs.
refer
http://www.cnblogs.com/leesf456/p/6279956.html
How to view the transaction log FileTxnLog
What is a transactional log
ZooKeeper operations: data files and transaction logs