Tendermint abci 項目主頁
這篇文章以 ABCI 示例 KVStore 應用及默認的 socket 連接為例說明 ABCI 應用的啟動及 abci-cli 客戶端與其交互的過程,以加深開發(fā) ABCI 應用的模式及源碼組織方式的理解。
整體流程說明
-
ABCI 應用服務端:在命令行執(zhí)行
abci-cli kvstore啟動應用后,它會在 46658 端口等待客戶端的 TCP 連接。 -
abci-cli 客戶端:這個客戶端指的是
echo、info和deliverTx等子命令。執(zhí)行這些命令會建立一條與 ABCI 應用服務端的 TCP 連接,并將子命令后面的參數(shù)當作請求消息發(fā)送給應用服務端進行處理(這里的 ABCI 應用與 Tendermint 節(jié)點綁定在一起)。
構(gòu)建命令過程
程序入口在 abci/cmd/abci-cli/main.go 的 Execute 函數(shù)。
這里面做事情有:
構(gòu)建
RootCmd命令,即abci-cli命令及各子命令。-
注冊全局 Flags,主要包括:
- ABCI 應用服務端監(jiān)聽地址
flagAddress,默認為:tcp://0.0.0.0:46658 - abci-cli 客戶端與 ABCI 應用服務端的通信協(xié)議
flagAbci,默認為:socket - 日志等級
flagLogLevel,默認為:debug
- ABCI 應用服務端監(jiān)聽地址
添加 ABCI 應用
kvstore和dummy以及echoCmd、infoCmd、deliverTxCmd和commitCmd等客戶端命令。
RootCmd、kvstore 命令的實現(xiàn)及啟動
RootCmd 主命令邏輯
所有子命令都要添加到 RootCmd 主命令下面。
執(zhí)行 abci-cli 命令只會列出其使用文檔,在執(zhí)行具體子命令時才會執(zhí)行其定義的相應應用邏輯。
這里只需看這段代碼:
// 執(zhí)行主命令時如果 client 為空,會創(chuàng)建 client 并啟動,會根據(jù) flagAbci 參數(shù)來判斷要創(chuàng)建 socket 客戶端
// 還是 RPC 客戶端
if client == nil {
var err error
client, err = abcicli.NewClient(flagAddress, flagAbci, false)
if err != nil {
return err
}
client.SetLogger(logger.With("module", "abci-client"))
// 啟動客戶端,實際執(zhí)行的是 client.OnStart() 函數(shù)
if err := client.Start(); err != nil {
return err
}
}
abci-cli 客戶端
在命令行執(zhí)行 abci-cli echo hello,會與 ABCI 應用服務端建立一條 TCP 連接并將 “abc” 發(fā)送到服務端進行處理,收到應后斷開連接。但這樣比較麻煩,可以使用 abci-cli console 命令可以在交互式命令行中與應用服務端交互。
通過調(diào)用 abcicli.NewClient 函數(shù)來創(chuàng)建客戶端。
返回的是接口 abci/client/Client,這個接口有 socket、grpc 和 local 三種實現(xiàn),但 flag 只可以指定前兩種,默認為 socket。
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
switch transport {
case "socket":
client = NewSocketClient(addr, mustConnect)
case "grpc":
client = NewGRPCClient(addr, mustConnect)
default:
err = fmt.Errorf("Unknown abci transport %s", transport)
}
return
}
abci/client/Client 接口繼承了 tmlibs/common/service/Service 接口,可以啟動、停止和重置??蛻舳撕头斩硕夹枰@些功能,使用時可以通過把 BaseService 作為自定義結(jié)構(gòu)的匿名字段來實現(xiàn)。
首先看 socketClient 函數(shù)的結(jié)構(gòu),它實現(xiàn)了 abci/client/Client 接口,由于 cmn.BaseService 結(jié)構(gòu)是它的匿名字段,也間接實現(xiàn)了 tmlibs/common/service/Service 接口:
type socketClient struct {
// 這個結(jié)構(gòu)實現(xiàn)了 Service 接口,內(nèi)部包含 impl Service字段,即具體實現(xiàn) Service 的結(jié)構(gòu)
cmn.BaseService
// 用來傳遞請求及應答消息的通道,
reqQueue chan *ReqRes
flushTimer *cmn.ThrottleTimer
mustConnect bool
mtx sync.Mutex
addr string
conn net.Conn
err error
// 標準庫中的雙向鏈表,從 reqQueue 讀取到的請求消息都會先放入這個鏈表的尾端
reqSent *list.List
resCb func(*types.Request, *types.Response) // listens to all callbacks
}
創(chuàng)建 socketClient 的函數(shù):
func NewSocketClient(addr string, mustConnect bool) *socketClient {
cli := &socketClient{
reqQueue: make(chan *ReqRes, reqQueueSize),
flushTimer: cmn.NewThrottleTimer("socketClient", flushThrottleMS),
mustConnect: mustConnect,
addr: addr,
reqSent: list.New(),
resCb: nil,
}
// 這里傳入了具體實現(xiàn) Service 的結(jié)構(gòu) cli
cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
return cli
}
在命令行執(zhí)行 abci-cli kvstore 命令時會執(zhí)行 client.Start() 啟動服務,這里用默認的 socket 連接及 kvstore 應用舉例說明。這里執(zhí)行的是 socketClient 結(jié)構(gòu)中匿名字段 cmn.BaseService 的方法:
func (bs *BaseService) Start() error {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
bs.Logger.Error(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
return ErrAlreadyStopped
}
bs.Logger.Info(Fmt("Starting %v", bs.name), "impl", bs.impl)
// 這里實際執(zhí)行上面?zhèn)魅氲?cli(即 socketClient 結(jié)構(gòu)) 的 OnStart 函數(shù)來啟動服務
err := bs.impl.OnStart()
if err != nil {
// revert flag
atomic.StoreUint32(&bs.started, 0)
return err
}
return nil
}
bs.Logger.Debug(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
return ErrAlreadyStarted
}
以上就是客戶端的啟動過程。
ABCI 應用服務端
現(xiàn)在看一下執(zhí)行 abci-cli kvstore 命令都做了什么。
執(zhí)行此命令時,實際執(zhí)行的是 cmdKVStore 函數(shù),啟動了應用服務端,在 tcp://0.0.0.0:46658 監(jiān)聽連接。
啟動服務端:
// 默認創(chuàng)建 socket 服務端
srv, err := server.NewServer(flagAddrD, flagAbci, app)
if err != nil {
return err
}
srv.SetLogger(logger.With("module", "abci-server"))
if err := srv.Start(); err != nil {
return err
}
NewSocketServer 創(chuàng)建服務端:
func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
proto, addr := cmn.ProtocolAndAddress(protoAddr)
s := &SocketServer{
proto: proto,
addr: addr,
listener: nil,
app: app,
conns: make(map[int]net.Conn),
}
// 這里使用的模式與 Client 端一致
s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
return s
}
執(zhí)行 srv.Start() 函數(shù)時,實際執(zhí)行的是 SocketServer 的實現(xiàn),通過 BaseService 結(jié)構(gòu)的 Start 方法調(diào)用:
func (s *SocketServer) OnStart() error {
if err := s.BaseService.OnStart(); err != nil {
return err
}
ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}
s.listener = ln
// 啟動一個協(xié)程來監(jiān)聽連接
go s.acceptConnectionsRoutine()
return nil
}
至此已經(jīng)把 ABCI 應用服務端是如何啟動的說明了,下面的部分會詳細說明請求及應答處理的細節(jié)。
請求及應答處理
這部分以 deliver_tx 命令為例來進行說明。
abci-cli 客戶端
為了方便,這里再看一下 socketClient 的數(shù)據(jù)結(jié)構(gòu):
type socketClient struct {
cmn.BaseService
reqQueue chan *ReqRes
flushTimer *cmn.ThrottleTimer
mustConnect bool
mtx sync.Mutex
addr string
conn net.Conn
err error
// 這里會把請求寫入雙向鏈表 reqSent 的尾端,在 recvResponseRoutine 函數(shù)中接收到應答時會從此鏈表取出
// 第一個請求進行類型比較,如果與應答類型一樣則返回給前端
reqSent *list.List
resCb func(*types.Request, *types.Response) // listens to all callbacks
}
OnStart 函數(shù)所做的就是與服務端建立連接,啟動兩個協(xié)程來處理請求與應答。
先看處理請求的函數(shù) sendRequestsRoutine:
func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
w := bufio.NewWriter(conn)
for {
select {
// 發(fā)送 flush 類型請求的定時器
case <-cli.flushTimer.Ch:
select {
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
default:
// Probably will fill the buffer, or retry later.
}
case <-cli.Quit():
return
case reqres := <-cli.reqQueue:
// 這里會把請求寫入雙向鏈表 reqSent 的尾端
cli.willSendReq(reqres)
// 把請求消息寫入連接緩沖,這時還沒有發(fā)送給服務端
err := types.WriteMessage(reqres.Request, w)
if err != nil {
cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
return
}
// 如果請求是 flush 類型,會把緩沖的請求消息 (包括此 flush 請求) 寫入連接,
// 由 kvstore 服務端接收并處理。
// 有兩種方式發(fā)送 flush 類型的請求:1) 定時器觸發(fā);2) DeliverTxSync 函數(shù)中主動發(fā)送
if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
err = w.Flush()
if err != nil {
cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
return
}
}
}
}
}
現(xiàn)在看處理應答的 recvResponseRoutine 函數(shù):
func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
r := bufio.NewReader(conn) // Buffer reads
for {
var res = &types.Response{}
// 從連接中讀取應答,出錯時會關閉連接并執(zhí)行 flushQueue() 釋放 wg.WaitGroup
err := types.ReadMessage(r, res)
if err != nil {
cli.StopForError(err)
return
}
switch r := res.Value.(type) {
case *types.Response_Exception:
cli.StopForError(errors.New(r.Exception.Error))
return
default:
// 應答處理邏輯在這里
err := cli.didRecvResponse(res)
if err != nil {
cli.StopForError(err)
return
}
}
}
}
func (cli *socketClient) didRecvResponse(res *types.Response) error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// 從雙向鏈表 reqSent 中取出第一個請求
next := cli.reqSent.Front()
if next == nil {
return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
}
reqres := next.Value.(*ReqRes)
// 檢查請求與應答的類型是否匹配
if !resMatchesReq(reqres.Request, res) {
return fmt.Errorf("Unexpected result type %v when response to %v expected",
reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
}
reqres.Response = res // Set response
reqres.Done() // 釋放此請求創(chuàng)建時執(zhí)行的 wg.Add(1)
cli.reqSent.Remove(next) // 從鏈表中刪除第一個請求
// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(res)
}
// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
return nil
}
ABCI 應用服務端
創(chuàng)建應用細節(jié):
先看應用的數(shù)據(jù)結(jié)構(gòu) KVStoreApplication:
// 這個結(jié)構(gòu)只實現(xiàn)了 Info、DeliverTx、CheckTx、Commit 和 Query 方法
type KVStoreApplication struct {
// 這個基礎結(jié)構(gòu)實現(xiàn)了 "tendermint/abci/types/Application" 接口(此項目中基本都是用的這種模式)。
// 這個結(jié)構(gòu)實現(xiàn)的接口的方法中沒有具體應用邏輯,以供開發(fā)者在自己的應用結(jié)構(gòu)中繼承此結(jié)構(gòu)后,可以只實現(xiàn)
// 必須的方法,而無需實現(xiàn)接口的全部方法
types.BaseApplication
state State
}
創(chuàng)建應用:
func NewKVStoreApplication() *KVStoreApplication {
state := loadState(dbm.NewMemDB())
return &KVStoreApplication{state: state}
}
主要看 loadState 函數(shù),它根據(jù)鍵 stateKey 從內(nèi)存存儲 MemDB 結(jié)構(gòu)中獲取對應狀態(tài),因為是初始化,肯定沒有對應值,返回的是一個帶有新建 MemDB (就是一個帶鎖的 map)的 State:
func loadState(db dbm.DB) State {
stateBytes := db.Get(stateKey)
var state State
if len(stateBytes) != 0 {
err := json.Unmarshal(stateBytes, &state)
if err != nil {
panic(err)
}
}
state.db = db
return state
}
服務端處理請求及應答細節(jié):
重點看接受連接的函數(shù):
func (s *SocketServer) acceptConnectionsRoutine() {
for {
// 接受連接,下面這些日志就是命令行啟動 kvstore 后看到的信息
s.Logger.Info("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
s.Logger.Error("Failed to accept connection: " + err.Error())
continue
}
s.Logger.Info("Accepted a new connection")
// 可以接受多條連接并記錄
connID := s.addConn(conn)
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses
// 從連接讀取請求并處理
go s.handleRequests(closeConn, conn, responses)
// 從 'responses' 獲取應答并寫到連接中
go s.handleResponses(closeConn, conn, responses)
// 等待信號來關閉連接
go s.waitForClose(closeConn, connID)
}
}
先看處理請求的 handleRequests 函數(shù):
func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
var count int
var bufReader = bufio.NewReader(conn)
for {
var req = &types.Request{}
// 從連接上讀取請求消息,讀取完畢或出錯后要通知 waitForClose 協(xié)程來關閉連接
err := types.ReadMessage(bufReader, req)
if err != nil {
if err == io.EOF {
closeConn <- err
} else {
closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
}
return
}
s.appMtx.Lock()
count++
// 處理請求時要加鎖,這個函數(shù)會根據(jù)請求的類型調(diào)用具體函數(shù)來處理,
// 比如 types.Request_DeliverTx 類型時就會調(diào)用 KVStoreApplication.DeliverTx 函數(shù)來處理,
// 應答會寫入 responses 通道,以便 handleResponses 函數(shù)處理
s.handleRequest(req, responses)
s.appMtx.Unlock()
}
}
現(xiàn)在看 handleResponses 函數(shù):
func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
var count int
var bufWriter = bufio.NewWriter(conn)
for {
// 從 responses 通道讀取應答并寫入連接。同樣,出錯時要通知 waitForClose 來關閉連接
var res = <-responses
err := types.WriteMessage(res, bufWriter)
if err != nil {
closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
return
}
// flush 類型的應答是哪里來的?
// 與客戶端處理類似,如果是此類型要進行 Flush 處理,把緩沖的數(shù)據(jù)寫入連接
if _, ok := res.Value.(*types.Response_Flush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
return
}
}
count++
}
}
deliver_tx 子命令執(zhí)行過程
現(xiàn)在以此命令為例說明發(fā)起請求及收到應答的整體過程。
- 服務端啟動,啟動兩個協(xié)程,一個處理請求,一個處理應答。
- 在命令行輸入
abci-cli console進入交互模式,創(chuàng)建客戶端并與服務端建立了持久連接。服務端和客戶端各啟動兩個協(xié)程,一個處理請求,一個處理應答。 - 輸入
deliver_tx "abc",客戶端會識別子命令,調(diào)用cmdDeliverTx函數(shù),此函數(shù)在解析到 tx 后進行編碼,隨后會調(diào)用cli.DeliverTxSync(txBytes)(同步的) 函數(shù)。 - 請求消息寫入連接后,服務端讀取到此請求。
- 根據(jù)請求類型,調(diào)用 KVStore 應用的
DeliverTx函數(shù)進行處理。 - 服務端處理完畢后的應答寫入連接,客戶端接收到應答,前端打印顯示到命令行。