從farbic ledger block中提取對(duì)應(yīng)kafka的offset
如果kafka模式下面的沒(méi)有block都包含對(duì)應(yīng)kafka topic的offset,下面的代碼從一個(gè)block里面提取kafka的offset值。
輸入:blockfile
輸出:kafkaoffset
package main
import (
"os"
"fmt"
"log"
"io/ioutil"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
)
// Get kafka offset of a given block file
// input: path to blockfile
// output: kafka offset
func main() {
if len(os.Args) < 2 {
log.Printf("ERROR: invalid parameter, usage: %s path/to/blockfile\n", os.Args[0])
os.Exit(1)
}
blockBytes, err := ioutil.ReadFile(os.Args[1])
if err != nil {
log.Printf("ERROR: cannot read block file, error: %v\n", err)
os.Exit(1)
}
block := &cb.Block{}
if err = proto.Unmarshal(blockBytes, block); err != nil {
log.Printf("ERROR: invalid block file content, unmarshal error: %v\n", err)
os.Exit(1)
}
metadataValue := block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER]
md := &cb.Metadata{}
err = proto.Unmarshal(metadataValue, md)
if err != nil {
log.Printf("ERROR: invalid metadata value, unmarshal error: %v\n", err)
os.Exit(1)
}
if lastoffset, _, _, err := getOffsets(md.Value); err != nil {
os.Exit(1)
} else {
fmt.Printf("%d\n", lastoffset)
}
}
func getOffsets(metadataValue []byte) (persisted int64, processed int64, resubmitted int64, err error) {
if metadataValue != nil {
kafkaMetadata := &ab.KafkaMetadata{}
if err := proto.Unmarshal(metadataValue, kafkaMetadata); err != nil {
log.Printf("ERROR: invalid kafka metavalue, unmarshal error: %v\n", err)
return 0, 0, 0, err
}
return kafkaMetadata.LastOffsetPersisted,
kafkaMetadata.LastOriginalOffsetProcessed,
kafkaMetadata.LastResubmittedConfigOffset,
nil
}
return sarama.OffsetOldest - 1, int64(0), int64(0), nil // default
}
用法:
$ ./main path/to/config.pb
12203
blockfile的獲取可以使用peer fetch:
$ peer channel fetch newest path/to/config.pb -o ${ORDERERADDR} -c ${CHANNEL}
然后我們用kafka命令行去查看topic的值:
$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <KAFKAHOST>:<PORT> --topic <TOPIC>
<TOPIC>:0:12203
需要注意的是,用kafka命令行去讀取的時(shí)候返回值有時(shí)會(huì)比newest塊里面讀出的值大于幾個(gè)3,例如上述例子可能會(huì)讀出:12206, 12209, 等等,12203 + 3 * N,取決于延遲了多久。
因?yàn)檫@有一個(gè)延遲,orderer不停的往kafka里面寫(xiě)數(shù)據(jù),數(shù)據(jù)大小是3;等我有空的時(shí)候再去研究一下這個(gè)大小是3的數(shù)據(jù)都是些什么內(nèi)容。
再補(bǔ)充一點(diǎn)前面文件怎么編譯:
- fabric-go-sdk是一個(gè)很不成熟的產(chǎn)品,幾乎不能用。
- 所以我們build都是直接用fabric本身的原代碼。
假設(shè)fabric源代碼已經(jīng)下載放到:${GOPATH}/src/github.com/hyperledger/fabric目錄下面。
$ cd ${GOPATH}/src/github.com/hyperledger/fabric
$ vim main.go
$ GOPATH=${GOPATH} go build main.go
$ ./main.go path/to/blockfile