從farbic ledger block中提取對(duì)應(yīng)kafka的offset

從farbic ledger block中提取對(duì)應(yīng)kafka的offset

如果kafka模式下面的沒(méi)有block都包含對(duì)應(yīng)kafka topic的offset,下面的代碼從一個(gè)block里面提取kafka的offset值。

輸入:blockfile
輸出:kafkaoffset

package main

import (
    "os"
    "fmt"
    "log"
    "io/ioutil"
    "github.com/Shopify/sarama"
    "github.com/golang/protobuf/proto"

 cb "github.com/hyperledger/fabric/protos/common"
 ab "github.com/hyperledger/fabric/protos/orderer"
)

// Get kafka offset of a given block file
// input: path to blockfile
// output: kafka offset
func main() {
    if len(os.Args) < 2 {
        log.Printf("ERROR: invalid parameter, usage: %s path/to/blockfile\n", os.Args[0])
        os.Exit(1)
    }

    blockBytes, err := ioutil.ReadFile(os.Args[1])
    if err != nil {
        log.Printf("ERROR: cannot read block file, error: %v\n", err)
        os.Exit(1)
    }

    block := &cb.Block{}
    if err = proto.Unmarshal(blockBytes, block); err != nil {
        log.Printf("ERROR: invalid block file content, unmarshal error: %v\n", err)
        os.Exit(1)
    }
    metadataValue := block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER]

    md := &cb.Metadata{}
    err = proto.Unmarshal(metadataValue, md)
    if err != nil {
        log.Printf("ERROR: invalid metadata value, unmarshal error: %v\n", err)
        os.Exit(1)
    }

    if lastoffset, _, _, err := getOffsets(md.Value); err != nil {
        os.Exit(1)
    } else {
        fmt.Printf("%d\n", lastoffset)
    }
}

func getOffsets(metadataValue []byte) (persisted int64, processed int64, resubmitted int64, err error) {
    if metadataValue != nil {
        kafkaMetadata := &ab.KafkaMetadata{}
        if err := proto.Unmarshal(metadataValue, kafkaMetadata); err != nil {
            log.Printf("ERROR: invalid kafka metavalue, unmarshal error: %v\n", err)
            return 0, 0, 0, err
        }
        return kafkaMetadata.LastOffsetPersisted,
            kafkaMetadata.LastOriginalOffsetProcessed,
            kafkaMetadata.LastResubmittedConfigOffset,
            nil
    }
    return sarama.OffsetOldest - 1, int64(0), int64(0), nil // default
}

用法:

$ ./main path/to/config.pb 
12203

blockfile的獲取可以使用peer fetch:

$ peer channel fetch newest path/to/config.pb -o ${ORDERERADDR} -c ${CHANNEL}

然后我們用kafka命令行去查看topic的值:

$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <KAFKAHOST>:<PORT> --topic <TOPIC>
<TOPIC>:0:12203

需要注意的是,用kafka命令行去讀取的時(shí)候返回值有時(shí)會(huì)比newest塊里面讀出的值大于幾個(gè)3,例如上述例子可能會(huì)讀出:12206, 12209, 等等,12203 + 3 * N,取決于延遲了多久。
因?yàn)檫@有一個(gè)延遲,orderer不停的往kafka里面寫(xiě)數(shù)據(jù),數(shù)據(jù)大小是3;等我有空的時(shí)候再去研究一下這個(gè)大小是3的數(shù)據(jù)都是些什么內(nèi)容。

再補(bǔ)充一點(diǎn)前面文件怎么編譯:

  1. fabric-go-sdk是一個(gè)很不成熟的產(chǎn)品,幾乎不能用。
  2. 所以我們build都是直接用fabric本身的原代碼。

假設(shè)fabric源代碼已經(jīng)下載放到:${GOPATH}/src/github.com/hyperledger/fabric目錄下面。

$ cd ${GOPATH}/src/github.com/hyperledger/fabric
$ vim main.go
$ GOPATH=${GOPATH} go build main.go 
$ ./main.go path/to/blockfile
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容