Cat消息存儲(chǔ)

  1. 消息格式為 應(yīng)用名-IP-小時(shí)正點(diǎn)數(shù)-消息遞增號 MessageId
  2. 每個(gè) 應(yīng)用 + IP + 整點(diǎn)小時(shí) 對應(yīng): 一個(gè)索引文件 和 一個(gè)數(shù)據(jù)文件
  3. 消息經(jīng)過編碼后,首4字節(jié)為該消息的大小,從文件中讀消息的時(shí)候會(huì)用到這個(gè)特性

寫消息過程

  1. 獲取MessageBlock中的MessageTree個(gè)數(shù),進(jìn)行遍歷
  2. 獲得每個(gè)MessageTree的index(索引遞增號) 和 每個(gè)MessageTree的size(數(shù)據(jù)大小)
  3. 設(shè)置索引文件的起始位置 索引遞增號*6
  4. 將該該消息所對應(yīng)block在數(shù)據(jù)文件中的起始地址寫到索引文件(4字節(jié))
  5. 將該該消息在block中的偏移量寫入索引文件(2字節(jié))
  6. 將block的內(nèi)容長度寫入數(shù)據(jù)文件
  7. 將block的內(nèi)容寫入dataFile
// MessageBlockWriter.java
public synchronized void writeBlock(MessageBlock block) throws IOException {
    // block中消息條數(shù)
    int len = block.getBlockSize();
    // block大小
    byte[] data = block.getData();

    // 用于在遍歷過程中記錄每條消息的偏移量,遍歷完成之后,blockSize等于block的大小
    int blockSize = 0;

    ByteBuffer buffer = ByteBuffer.allocate(4 + data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);

    for (int i = 0; i < len; i++) {
        // 消息的遞增號
        int seq = block.getIndex(i);
        // 消息的大小
        int size = block.getSize(i);

        // m_indexFile.seek(seq * 6L);
        // 該消息在索引文件的起始位置 遞增號*6 ,表示每條消息在索引文件中占6個(gè)字節(jié)大小
        m_indexChannel.position(seq * 6L);

        // m_indexFile.writeInt(m_blockAddress);
        // m_indexFile.writeShort(blockSize);
        // 用于記錄該消息所對應(yīng)block在數(shù)據(jù)文件中的起始地址
        buffer.putInt(m_blockAddress);
        // 用于記錄該消息在block中的偏移量
        buffer.putShort((short) blockSize);
        buffer.flip();
        // 寫入索引文件
        m_indexChannel.write(buffer);

        // 計(jì)算下一條消息在該block中的偏移量
        blockSize += size;

        buffer.clear();
    }

    // m_dataFile.writeInt(data.length);
    // m_dataFile.write(data);
    buffer = ByteBuffer.allocate(4 + data.length);
    buffer.order(ByteOrder.BIG_ENDIAN);
    // 先在數(shù)據(jù)文件中用4個(gè)字節(jié)記錄 block 的大小
    buffer.putInt(data.length);
    // 再將block的內(nèi)容寫入數(shù)據(jù)文件
    buffer.put(data);
    buffer.flip();
    m_dataChannel.write(buffer);

    // 更新 m_blockAddress 的值,即數(shù)據(jù)文件下一次寫入時(shí)的起始位置
    m_blockAddress += data.length + 4;
}

即數(shù)據(jù)文件中的存儲(chǔ)結(jié)構(gòu)為: 【blockSize(4byte)->blockData】=>【blockSize(4byte)->blockData】

索引文件的存儲(chǔ)結(jié)構(gòu)為: 【blackAddr(4byte)->messageOffsetInBlock(2byte)】 => 【blackAddr(4byte)->messageOffsetInBlock(2byte)】

讀消息過程

對于真正的文件存儲(chǔ),block在這里其實(shí)是一個(gè)抽象的概念; 如果是直接以Message為單位進(jìn)行寫文件,那這個(gè) block 和 索引文件中的block偏移量 就沒有什么意義了。但實(shí)際上消息是以block為單位進(jìn)行寫文件,一個(gè)block最大為64K,而一個(gè)block中又存在多條消息,所以每條消息在它所屬的block中有一個(gè)偏移量

  1. 根據(jù) 索引遞增號從索引文件讀前4個(gè)字節(jié) 找到block的地址
  2. 該地址為起始地址,從數(shù)據(jù)文件中讀取一個(gè)int類型數(shù)據(jù)(4個(gè)字節(jié))作為該block的長度
  3. 根據(jù)該長度讀取整個(gè)block的內(nèi)容到byte數(shù)組
  4. 根據(jù) 索引遞增號從索引文件讀后2個(gè)字節(jié) 找到該消息在該block中的偏移地址
  5. 以偏移地址為起始地址,讀取一個(gè)int類型數(shù)據(jù)(4個(gè)字節(jié))作為該消息的大小(為什么讀4字節(jié)?這是在對消息編碼時(shí)決定的,首4字節(jié)表示該消息的大小)
  6. 根據(jù)偏移地址 和 上一步獲取的int類型數(shù)據(jù)大小 讀取Message
// MessageBlockReader.java
private DataInputStream createDataInputStream(byte[] buf) {
    DataInputStream in = null;

    try {
        in = new DataInputStream(new SnappyInputStream(new ByteArrayInputStream(buf)));
    } catch (IOException e) {
        try {
            in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(buf)));
        } catch (IOException ioe) {
            Cat.logError(ioe);
        }
    }
    return in;
}

public byte[] readMessage(int index) throws IOException {
    int blockAddress = 0;
    int blockOffset = 0;

    // 索引 在索引文件的起始位置
    m_indexFile.seek(index * 6L);

    // 讀出4字節(jié),該值代表block在數(shù)據(jù)文件的起始位置
    blockAddress = m_indexFile.readInt();
    // 讀出2字節(jié) 該值代表Message在block中的偏移量
    blockOffset = m_indexFile.readShort() & 0xFFFF;

    // 從數(shù)據(jù)文件的 blockAddress 地址開始訪問數(shù)據(jù)
    m_dataFile.seek(blockAddress);
    // 4字節(jié)里面存的是block塊的長度
    byte[] buf = new byte[m_dataFile.readInt()];
    // 從數(shù)據(jù)文件中讀取整個(gè)block到buf數(shù)組
    m_dataFile.readFully(buf);

    DataInputStream in = createDataInputStream(buf);

    if (in != null) {
        try {
            // 跳到block中的偏移量
            in.skip(blockOffset);
            
            // 該值代表消息長度
            int len = in.readInt();

            byte[] data = new byte[len];
            
            // 從block中讀取Message
            in.readFully(data);
            return data;
        } finally {
            try {
                in.close();
            } catch (Exception e) {
                // ignore it
            }
        }
    } else {
        return null;
    }
}

聽說還有V2版本,分 以一級索引和二級索引,可我拉代碼沒看到呀

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容