- 消息格式為 應(yīng)用名-IP-小時(shí)正點(diǎn)數(shù)-消息遞增號 MessageId
- 每個(gè)
應(yīng)用 + IP + 整點(diǎn)小時(shí)對應(yīng): 一個(gè)索引文件 和 一個(gè)數(shù)據(jù)文件 - 消息經(jīng)過編碼后,首4字節(jié)為該消息的大小,從文件中讀消息的時(shí)候會(huì)用到這個(gè)特性

寫消息過程
- 獲取
MessageBlock中的MessageTree個(gè)數(shù),進(jìn)行遍歷 - 獲得每個(gè)
MessageTree的index(索引遞增號) 和 每個(gè)MessageTree的size(數(shù)據(jù)大小) - 設(shè)置索引文件的起始位置
索引遞增號*6 - 將該該消息所對應(yīng)block在數(shù)據(jù)文件中的起始地址寫到索引文件(4字節(jié))
- 將該該消息在block中的偏移量寫入索引文件(2字節(jié))
- 將block的內(nèi)容長度寫入數(shù)據(jù)文件
- 將block的內(nèi)容寫入dataFile
// MessageBlockWriter.java
public synchronized void writeBlock(MessageBlock block) throws IOException {
// block中消息條數(shù)
int len = block.getBlockSize();
// block大小
byte[] data = block.getData();
// 用于在遍歷過程中記錄每條消息的偏移量,遍歷完成之后,blockSize等于block的大小
int blockSize = 0;
ByteBuffer buffer = ByteBuffer.allocate(4 + data.length);
buffer.order(ByteOrder.BIG_ENDIAN);
for (int i = 0; i < len; i++) {
// 消息的遞增號
int seq = block.getIndex(i);
// 消息的大小
int size = block.getSize(i);
// m_indexFile.seek(seq * 6L);
// 該消息在索引文件的起始位置 遞增號*6 ,表示每條消息在索引文件中占6個(gè)字節(jié)大小
m_indexChannel.position(seq * 6L);
// m_indexFile.writeInt(m_blockAddress);
// m_indexFile.writeShort(blockSize);
// 用于記錄該消息所對應(yīng)block在數(shù)據(jù)文件中的起始地址
buffer.putInt(m_blockAddress);
// 用于記錄該消息在block中的偏移量
buffer.putShort((short) blockSize);
buffer.flip();
// 寫入索引文件
m_indexChannel.write(buffer);
// 計(jì)算下一條消息在該block中的偏移量
blockSize += size;
buffer.clear();
}
// m_dataFile.writeInt(data.length);
// m_dataFile.write(data);
buffer = ByteBuffer.allocate(4 + data.length);
buffer.order(ByteOrder.BIG_ENDIAN);
// 先在數(shù)據(jù)文件中用4個(gè)字節(jié)記錄 block 的大小
buffer.putInt(data.length);
// 再將block的內(nèi)容寫入數(shù)據(jù)文件
buffer.put(data);
buffer.flip();
m_dataChannel.write(buffer);
// 更新 m_blockAddress 的值,即數(shù)據(jù)文件下一次寫入時(shí)的起始位置
m_blockAddress += data.length + 4;
}
即數(shù)據(jù)文件中的存儲(chǔ)結(jié)構(gòu)為: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】
索引文件的存儲(chǔ)結(jié)構(gòu)為: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】
讀消息過程
對于真正的文件存儲(chǔ),block在這里其實(shí)是一個(gè)抽象的概念; 如果是直接以Message為單位進(jìn)行寫文件,那這個(gè) block 和 索引文件中的block偏移量 就沒有什么意義了。但實(shí)際上消息是以block為單位進(jìn)行寫文件,一個(gè)block最大為64K,而一個(gè)block中又存在多條消息,所以每條消息在它所屬的block中有一個(gè)偏移量
- 根據(jù) 索引遞增號從索引文件讀前4個(gè)字節(jié) 找到block的地址
- 該地址為起始地址,從數(shù)據(jù)文件中讀取一個(gè)int類型數(shù)據(jù)(4個(gè)字節(jié))作為該block的長度
- 根據(jù)該長度讀取整個(gè)block的內(nèi)容到byte數(shù)組
- 根據(jù) 索引遞增號從索引文件讀后2個(gè)字節(jié) 找到該消息在該block中的偏移地址
- 以偏移地址為起始地址,讀取一個(gè)int類型數(shù)據(jù)(4個(gè)字節(jié))作為該消息的大小(為什么讀4字節(jié)?這是在對消息編碼時(shí)決定的,首4字節(jié)表示該消息的大小)
- 根據(jù)偏移地址 和 上一步獲取的int類型數(shù)據(jù)大小 讀取Message
// MessageBlockReader.java
private DataInputStream createDataInputStream(byte[] buf) {
DataInputStream in = null;
try {
in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
} catch (IOException e) {
try {
in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
} catch (IOException ioe) {
Cat.logError(ioe);
}
}
return in;
}
public byte[] readMessage(int index) throws IOException {
int blockAddress = 0;
int blockOffset = 0;
// 索引 在索引文件的起始位置
m_indexFile.seek(index * 6L);
// 讀出4字節(jié),該值代表block在數(shù)據(jù)文件的起始位置
blockAddress = m_indexFile.readInt();
// 讀出2字節(jié) 該值代表Message在block中的偏移量
blockOffset = m_indexFile.readShort() & 0xFFFF;
// 從數(shù)據(jù)文件的 blockAddress 地址開始訪問數(shù)據(jù)
m_dataFile.seek(blockAddress);
// 4字節(jié)里面存的是block塊的長度
byte[] buf = new byte[m_dataFile.readInt()];
// 從數(shù)據(jù)文件中讀取整個(gè)block到buf數(shù)組
m_dataFile.readFully(buf);
DataInputStream in = createDataInputStream(buf);
if (in != null) {
try {
// 跳到block中的偏移量
in.skip(blockOffset);
// 該值代表消息長度
int len = in.readInt();
byte[] data = new byte[len];
// 從block中讀取Message
in.readFully(data);
return data;
} finally {
try {
in.close();
} catch (Exception e) {
// ignore it
}
}
} else {
return null;
}
}
聽說還有V2版本,分 以一級索引和二級索引,可我拉代碼沒看到呀