Tendermint ABCI 應用 KVStore 源碼詳解

Tendermint abci 項目主頁

這篇文章以 ABCI 示例 KVStore 應用及默認的 socket 連接為例說明 ABCI 應用的啟動及 abci-cli 客戶端與其交互的過程,以加深開發(fā) ABCI 應用的模式及源碼組織方式的理解。

整體流程說明

  1. ABCI 應用服務端:在命令行執(zhí)行 abci-cli kvstore 啟動應用后,它會在 46658 端口等待客戶端的 TCP 連接。
  2. abci-cli 客戶端:這個客戶端指的是 echoinfodeliverTx 等子命令。執(zhí)行這些命令會建立一條與 ABCI 應用服務端的 TCP 連接,并將子命令后面的參數(shù)當作請求消息發(fā)送給應用服務端進行處理(這里的 ABCI 應用與 Tendermint 節(jié)點綁定在一起)。

構(gòu)建命令過程

程序入口在 abci/cmd/abci-cli/main.goExecute 函數(shù)。

這里面做事情有:

  • 構(gòu)建 RootCmd 命令,即 abci-cli 命令及各子命令。

  • 注冊全局 Flags,主要包括:

    • ABCI 應用服務端監(jiān)聽地址 flagAddress ,默認為:tcp://0.0.0.0:46658
    • abci-cli 客戶端與 ABCI 應用服務端的通信協(xié)議 flagAbci,默認為:socket
    • 日志等級 flagLogLevel,默認為:debug
  • 添加 ABCI 應用 kvstoredummy 以及 echoCmd、infoCmd、deliverTxCmdcommitCmd 等客戶端命令。

RootCmd、kvstore 命令的實現(xiàn)及啟動

RootCmd 主命令邏輯

所有子命令都要添加到 RootCmd 主命令下面。

執(zhí)行 abci-cli 命令只會列出其使用文檔,在執(zhí)行具體子命令時才會執(zhí)行其定義的相應應用邏輯。

這里只需看這段代碼:

// 執(zhí)行主命令時如果 client 為空,會創(chuàng)建 client 并啟動,會根據(jù) flagAbci 參數(shù)來判斷要創(chuàng)建 socket 客戶端
// 還是 RPC 客戶端
if client == nil {
            var err error
            client, err = abcicli.NewClient(flagAddress, flagAbci, false)
            if err != nil {
                return err
            }
            client.SetLogger(logger.With("module", "abci-client"))
    
            // 啟動客戶端,實際執(zhí)行的是 client.OnStart() 函數(shù)
            if err := client.Start(); err != nil {
                return err
            }
        }

abci-cli 客戶端

在命令行執(zhí)行 abci-cli echo hello,會與 ABCI 應用服務端建立一條 TCP 連接并將 “abc” 發(fā)送到服務端進行處理,收到應后斷開連接。但這樣比較麻煩,可以使用 abci-cli console 命令可以在交互式命令行中與應用服務端交互。

通過調(diào)用 abcicli.NewClient 函數(shù)來創(chuàng)建客戶端。

返回的是接口 abci/client/Client,這個接口有 socketgrpclocal 三種實現(xiàn),但 flag 只可以指定前兩種,默認為 socket。

func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
  switch transport {
  case "socket":
      client = NewSocketClient(addr, mustConnect)
  case "grpc":
      client = NewGRPCClient(addr, mustConnect)
  default:
      err = fmt.Errorf("Unknown abci transport %s", transport)
  }
  return
}

abci/client/Client 接口繼承了 tmlibs/common/service/Service 接口,可以啟動、停止和重置??蛻舳撕头斩硕夹枰@些功能,使用時可以通過把 BaseService 作為自定義結(jié)構(gòu)的匿名字段來實現(xiàn)。

首先看 socketClient 函數(shù)的結(jié)構(gòu),它實現(xiàn)了 abci/client/Client 接口,由于 cmn.BaseService 結(jié)構(gòu)是它的匿名字段,也間接實現(xiàn)了 tmlibs/common/service/Service 接口:

type socketClient struct {
    // 這個結(jié)構(gòu)實現(xiàn)了 Service 接口,內(nèi)部包含 impl Service字段,即具體實現(xiàn) Service 的結(jié)構(gòu)
    cmn.BaseService

    // 用來傳遞請求及應答消息的通道,
    reqQueue    chan *ReqRes
    flushTimer  *cmn.ThrottleTimer
    mustConnect bool

    mtx     sync.Mutex
    addr    string
    conn    net.Conn
    err     error
    
    // 標準庫中的雙向鏈表,從 reqQueue 讀取到的請求消息都會先放入這個鏈表的尾端
    reqSent *list.List
    resCb   func(*types.Request, *types.Response) // listens to all callbacks
}

創(chuàng)建 socketClient 的函數(shù):

func NewSocketClient(addr string, mustConnect bool) *socketClient {
    cli := &socketClient{
        reqQueue:    make(chan *ReqRes, reqQueueSize),
        flushTimer:  cmn.NewThrottleTimer("socketClient", flushThrottleMS),
        mustConnect: mustConnect,

        addr:    addr,
        reqSent: list.New(),
        resCb:   nil,
    }
    
    // 這里傳入了具體實現(xiàn) Service 的結(jié)構(gòu) cli
    cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
    return cli
}

在命令行執(zhí)行 abci-cli kvstore 命令時會執(zhí)行 client.Start() 啟動服務,這里用默認的 socket 連接及 kvstore 應用舉例說明。這里執(zhí)行的是 socketClient 結(jié)構(gòu)中匿名字段 cmn.BaseService 的方法:

func (bs *BaseService) Start() error {
    if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
        if atomic.LoadUint32(&bs.stopped) == 1 {
            bs.Logger.Error(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
            return ErrAlreadyStopped
        }
        bs.Logger.Info(Fmt("Starting %v", bs.name), "impl", bs.impl)
        
        // 這里實際執(zhí)行上面?zhèn)魅氲?cli(即 socketClient 結(jié)構(gòu)) 的 OnStart 函數(shù)來啟動服務
        err := bs.impl.OnStart()
        if err != nil {
            // revert flag
            atomic.StoreUint32(&bs.started, 0)
            return err
        }
        return nil
    }
    bs.Logger.Debug(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
    return ErrAlreadyStarted
}

以上就是客戶端的啟動過程。

ABCI 應用服務端

現(xiàn)在看一下執(zhí)行 abci-cli kvstore 命令都做了什么。

執(zhí)行此命令時,實際執(zhí)行的是 cmdKVStore 函數(shù),啟動了應用服務端,在 tcp://0.0.0.0:46658 監(jiān)聽連接。

啟動服務端:

// 默認創(chuàng)建 socket 服務端
srv, err := server.NewServer(flagAddrD, flagAbci, app)
    if err != nil {
        return err
    }
    srv.SetLogger(logger.With("module", "abci-server"))
    if err := srv.Start(); err != nil {
        return err
    }

NewSocketServer 創(chuàng)建服務端:

func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
    proto, addr := cmn.ProtocolAndAddress(protoAddr)
    s := &SocketServer{
        proto:    proto,
        addr:     addr,
        listener: nil,
        app:      app,
        conns:    make(map[int]net.Conn),
    }
    // 這里使用的模式與 Client 端一致
    s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
    return s
}

執(zhí)行 srv.Start() 函數(shù)時,實際執(zhí)行的是 SocketServer 的實現(xiàn),通過 BaseService 結(jié)構(gòu)的 Start 方法調(diào)用:

func (s *SocketServer) OnStart() error {
    if err := s.BaseService.OnStart(); err != nil {
        return err
    }
    ln, err := net.Listen(s.proto, s.addr)
    if err != nil {
        return err
    }
    s.listener = ln
    // 啟動一個協(xié)程來監(jiān)聽連接
    go s.acceptConnectionsRoutine()
    return nil
}

至此已經(jīng)把 ABCI 應用服務端是如何啟動的說明了,下面的部分會詳細說明請求及應答處理的細節(jié)。

請求及應答處理

這部分以 deliver_tx 命令為例來進行說明。

abci-cli 客戶端

為了方便,這里再看一下 socketClient 的數(shù)據(jù)結(jié)構(gòu):

type socketClient struct {
    cmn.BaseService

    reqQueue    chan *ReqRes
    flushTimer  *cmn.ThrottleTimer
    mustConnect bool

    mtx     sync.Mutex
    addr    string
    conn    net.Conn
    err     error
    
    // 這里會把請求寫入雙向鏈表 reqSent 的尾端,在 recvResponseRoutine 函數(shù)中接收到應答時會從此鏈表取出
    // 第一個請求進行類型比較,如果與應答類型一樣則返回給前端
    reqSent *list.List
    resCb   func(*types.Request, *types.Response) // listens to all callbacks

}

OnStart 函數(shù)所做的就是與服務端建立連接,啟動兩個協(xié)程來處理請求與應答。

先看處理請求的函數(shù) sendRequestsRoutine

func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
    w := bufio.NewWriter(conn)
    for {
        select {
            // 發(fā)送 flush 類型請求的定時器
        case <-cli.flushTimer.Ch:
            select {
            case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
            default:
                // Probably will fill the buffer, or retry later.
            }
        case <-cli.Quit():
            return
        case reqres := <-cli.reqQueue:
             // 這里會把請求寫入雙向鏈表 reqSent 的尾端
            cli.willSendReq(reqres)
             // 把請求消息寫入連接緩沖,這時還沒有發(fā)送給服務端
            err := types.WriteMessage(reqres.Request, w)
            if err != nil {
                cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
                return
            }
             // 如果請求是 flush 類型,會把緩沖的請求消息 (包括此 flush 請求) 寫入連接,
             // 由 kvstore 服務端接收并處理。
             // 有兩種方式發(fā)送 flush 類型的請求:1) 定時器觸發(fā);2) DeliverTxSync 函數(shù)中主動發(fā)送
            if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
                err = w.Flush()
                if err != nil {
                    cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
                    return
                }
            }
        }
    }
}

現(xiàn)在看處理應答的 recvResponseRoutine 函數(shù):

func (cli *socketClient) recvResponseRoutine(conn net.Conn) {

    r := bufio.NewReader(conn) // Buffer reads
    for {
        var res = &types.Response{}
        // 從連接中讀取應答,出錯時會關閉連接并執(zhí)行 flushQueue() 釋放 wg.WaitGroup
        err := types.ReadMessage(r, res)
        if err != nil {
            cli.StopForError(err)
            return
        }
        switch r := res.Value.(type) {
        case *types.Response_Exception:
            cli.StopForError(errors.New(r.Exception.Error))
            return
        default:
             // 應答處理邏輯在這里
            err := cli.didRecvResponse(res)
            if err != nil {
                cli.StopForError(err)
                return
            }
        }
    }
}

func (cli *socketClient) didRecvResponse(res *types.Response) error {
    cli.mtx.Lock()
    defer cli.mtx.Unlock()

    // 從雙向鏈表 reqSent 中取出第一個請求
    next := cli.reqSent.Front()
    if next == nil {
        return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
    }
    reqres := next.Value.(*ReqRes)
     // 檢查請求與應答的類型是否匹配
    if !resMatchesReq(reqres.Request, res) {
        return fmt.Errorf("Unexpected result type %v when response to %v expected",
            reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
    }

    reqres.Response = res    // Set response
    reqres.Done()            // 釋放此請求創(chuàng)建時執(zhí)行的 wg.Add(1)
    cli.reqSent.Remove(next) // 從鏈表中刪除第一個請求

    // Notify reqRes listener if set
    if cb := reqres.GetCallback(); cb != nil {
        cb(res)
    }

    // Notify client listener if set
    if cli.resCb != nil {
        cli.resCb(reqres.Request, res)
    }

    return nil
}

ABCI 應用服務端

創(chuàng)建應用細節(jié)

先看應用的數(shù)據(jù)結(jié)構(gòu) KVStoreApplication

// 這個結(jié)構(gòu)只實現(xiàn)了 Info、DeliverTx、CheckTx、Commit 和 Query 方法
type KVStoreApplication struct {
    // 這個基礎結(jié)構(gòu)實現(xiàn)了 "tendermint/abci/types/Application" 接口(此項目中基本都是用的這種模式)。
    // 這個結(jié)構(gòu)實現(xiàn)的接口的方法中沒有具體應用邏輯,以供開發(fā)者在自己的應用結(jié)構(gòu)中繼承此結(jié)構(gòu)后,可以只實現(xiàn)
    // 必須的方法,而無需實現(xiàn)接口的全部方法
    types.BaseApplication

    state State
}

創(chuàng)建應用:

func NewKVStoreApplication() *KVStoreApplication {
    state := loadState(dbm.NewMemDB())
    return &KVStoreApplication{state: state}
}

主要看 loadState 函數(shù),它根據(jù)鍵 stateKey 從內(nèi)存存儲 MemDB 結(jié)構(gòu)中獲取對應狀態(tài),因為是初始化,肯定沒有對應值,返回的是一個帶有新建 MemDB (就是一個帶鎖的 map)的 State

func loadState(db dbm.DB) State {
    stateBytes := db.Get(stateKey)
    var state State
    if len(stateBytes) != 0 {
        err := json.Unmarshal(stateBytes, &state)
        if err != nil {
            panic(err)
        }
    }
    state.db = db
    return state
}

服務端處理請求及應答細節(jié)

重點看接受連接的函數(shù):

func (s *SocketServer) acceptConnectionsRoutine() {
    for {
        // 接受連接,下面這些日志就是命令行啟動 kvstore 后看到的信息
        s.Logger.Info("Waiting for new connection...")
        conn, err := s.listener.Accept()
        if err != nil {
            if !s.IsRunning() {
                return // Ignore error from listener closing.
            }
            s.Logger.Error("Failed to accept connection: " + err.Error())
            continue
        }

        s.Logger.Info("Accepted a new connection")

        // 可以接受多條連接并記錄
        connID := s.addConn(conn)

        closeConn := make(chan error, 2)              // Push to signal connection closed
        responses := make(chan *types.Response, 1000) // A channel to buffer responses

         // 從連接讀取請求并處理
        go s.handleRequests(closeConn, conn, responses)
        
         // 從 'responses' 獲取應答并寫到連接中
        go s.handleResponses(closeConn, conn, responses)

        // 等待信號來關閉連接
        go s.waitForClose(closeConn, connID)
    }
}

先看處理請求的 handleRequests 函數(shù):

func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
    var count int
    var bufReader = bufio.NewReader(conn)
    
    for {
        var req = &types.Request{}
        // 從連接上讀取請求消息,讀取完畢或出錯后要通知 waitForClose 協(xié)程來關閉連接
        err := types.ReadMessage(bufReader, req)
        if err != nil {
            if err == io.EOF {
                closeConn <- err
            } else {
                closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
            }
            return
        }
        s.appMtx.Lock()
        count++
        // 處理請求時要加鎖,這個函數(shù)會根據(jù)請求的類型調(diào)用具體函數(shù)來處理,
        // 比如 types.Request_DeliverTx 類型時就會調(diào)用 KVStoreApplication.DeliverTx 函數(shù)來處理,
        // 應答會寫入 responses 通道,以便 handleResponses 函數(shù)處理
        s.handleRequest(req, responses)
        s.appMtx.Unlock()
    }
}

現(xiàn)在看 handleResponses 函數(shù):

func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
    var count int
    var bufWriter = bufio.NewWriter(conn)
    for {
        // 從 responses 通道讀取應答并寫入連接。同樣,出錯時要通知 waitForClose 來關閉連接
        var res = <-responses
        err := types.WriteMessage(res, bufWriter)
        if err != nil {
            closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
            return
        }
        
         // flush 類型的應答是哪里來的?
         // 與客戶端處理類似,如果是此類型要進行 Flush 處理,把緩沖的數(shù)據(jù)寫入連接
        if _, ok := res.Value.(*types.Response_Flush); ok {
            err = bufWriter.Flush()
            if err != nil {
                closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
                return
            }
        }
        count++
    }
}

deliver_tx 子命令執(zhí)行過程

現(xiàn)在以此命令為例說明發(fā)起請求及收到應答的整體過程。

  1. 服務端啟動,啟動兩個協(xié)程,一個處理請求,一個處理應答。
  2. 在命令行輸入 abci-cli console 進入交互模式,創(chuàng)建客戶端并與服務端建立了持久連接。服務端和客戶端各啟動兩個協(xié)程,一個處理請求,一個處理應答。
  3. 輸入 deliver_tx "abc",客戶端會識別子命令,調(diào)用 cmdDeliverTx 函數(shù),此函數(shù)在解析到 tx 后進行編碼,隨后會調(diào)用 cli.DeliverTxSync(txBytes) (同步的) 函數(shù)。
  4. 請求消息寫入連接后,服務端讀取到此請求。
  5. 根據(jù)請求類型,調(diào)用 KVStore 應用的 DeliverTx 函數(shù)進行處理。
  6. 服務端處理完畢后的應答寫入連接,客戶端接收到應答,前端打印顯示到命令行。
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,568評論 19 139
  • 國家電網(wǎng)公司企業(yè)標準(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 12,423評論 6 13
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong閱讀 22,942評論 1 92
  • 如果我首先看到了關于阿爾瓦倫加438天海上漂流的新聞,腦中一定會構(gòu)建出一個具有悲劇色彩而最終大團圓結(jié)局的傳奇故事。...
    陸衷閱讀 448評論 0 0
  • 好的東西習慣了也不覺得它有多好,而糟的東西習慣了依舊知道它有多糟,這大概就是人類的矯情之處。
    王宇宙她姐玉娘閱讀 122評論 0 0

友情鏈接更多精彩內(nèi)容