etcd-raft Source Code Analysis 2: Communication Between Servers

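In etcd's raft implementation, message passing between servers does not follow a simple request-response model. Instead it separates reading from writing: two links are established between every pair of servers, and from each server's point of view one link is dedicated to sending data and the other to receiving it. In the code, data is sent through streamWriter and received through streamReader. In other words, a message received through streamReader is not answered on the same link; once it has been processed, the response is sent to the remote server through streamWriter.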
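The read/write separation described above can be sketched with two one-way channels. This is only an illustration under simplifying assumptions: the `link` and `server` types and the `exchange` helper are hypothetical names, and buffered channels stand in for the long-lived HTTP connections that etcd actually uses.

```go
package main

import "fmt"

// link is a hypothetical one-way connection between two servers.
type link chan string

// server owns one link for receiving and a separate one for sending.
type server struct {
	name string
	in   <-chan string // read side (the role streamReader plays)
	out  chan<- string // write side (the role streamWriter plays)
}

// serveOne reads one message from the read link and sends the
// response on the separate write link.
func (s *server) serveOne() {
	msg := <-s.in
	s.out <- s.name + " ack: " + msg
}

// exchange wires two servers together with two one-way links and
// returns the response that server a receives for its request.
func exchange(request string) string {
	aToB, bToA := make(link, 1), make(link, 1)
	a := &server{name: "a", in: bToA, out: aToB}
	b := &server{name: "b", in: aToB, out: bToA}

	a.out <- request // a sends on its write link
	b.serveOne()     // b reads on its read link and replies on its write link
	return <-a.in    // a picks the reply up on its own read link
}

func main() {
	fmt.Println(exchange("heartbeat")) // prints "b ack: heartbeat"
}
```

The request and the response never share a link: each travels on a channel that only ever carries traffic in one direction.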

Every server, whether it is a leader, candidate, or follower, maintains a collection of peers. Each peer corresponds to one server in the cluster and handles the data exchange with that server.

The data exchange between servers is shown in the following diagram:

[Figure: data exchange between servers]

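When a server needs to send data to another server, it only has to find the peer that corresponds to that server and push the data onto the msgc channel of the peer's streamWriter. The streamWriter listens on msgc and sends whatever arrives to the remote server. The streamReader, in turn, runs a loop in its own goroutine that reads data sent by the remote server; whenever it receives a message, it forwards it to the peer's p.propc or p.recvc channel. The peer listens on these two channels and writes the messages into the node's n.propc or n.recvc channel, so the node only has to listen on those two channels and process what arrives. This is the overall flow of data exchange between servers in etcd's raft implementation.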
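The sending half of this flow (look up the peer, push onto its msgc, let a writer goroutine drain the channel) might look roughly like the sketch below. The `transport`, `peer`, `send`, and `writeLoop` names are simplified stand-ins rather than the rafthttp types, and the `write` callback replaces the real network write.

```go
package main

import "fmt"

// peer is a simplified stand-in that owns the outgoing message queue.
type peer struct {
	msgc chan string
}

// transport maps a server ID to the peer that represents it.
type transport struct {
	peers map[uint64]*peer
}

// send looks up the peer for the target server and queues the message.
// It reports whether the message was queued.
func (t *transport) send(to uint64, msg string) bool {
	p, ok := t.peers[to]
	if !ok {
		return false // no peer for this server ID
	}
	select {
	case p.msgc <- msg:
		return true
	default:
		return false // queue is full, give up instead of blocking
	}
}

// writeLoop drains msgc and hands each message to write. It closes
// done once msgc has been closed and fully drained.
func (p *peer) writeLoop(write func(string), done chan<- struct{}) {
	for m := range p.msgc {
		write(m)
	}
	close(done)
}

// demo sends two messages to peer 2 and returns what the writer saw.
func demo() []string {
	p := &peer{msgc: make(chan string, 4)}
	t := &transport{peers: map[uint64]*peer{2: p}}

	var sent []string
	done := make(chan struct{})
	go p.writeLoop(func(m string) { sent = append(sent, m) }, done)

	t.send(2, "MsgApp")
	t.send(2, "MsgHeartbeat")
	close(p.msgc)
	<-done // wait until the writer goroutine has finished
	return sent
}

func main() {
	fmt.Println(demo()) // prints "[MsgApp MsgHeartbeat]"
}
```

The caller never touches the connection itself; it only needs the peer's channel, which keeps the sending path decoupled from the state of the underlying link.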

Each server creates a raftNode and starts a goroutine that runs raftNode's serveRaft method. The code of this method is as follows:

func (rc *raftNode) serveRaft() {
url, err := url.Parse(rc.peers[rc.id-1])
if err != nil {
    log.Fatalf("raftexample: Failed parsing URL (%v)", err)
}

ln, err := newStoppableListener(url.Host, rc.httpstopc)
if err != nil {
    log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
}

err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
select {
case <-rc.httpstopc:
default:
    log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
}
close(rc.httpdonec)
}

This method sets up an HTTP server that listens for connections from other servers. Its handler is rc.transport.Handler(), whose code is shown below:

func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}
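To see how a ServeMux registered this way dispatches requests, here is a small sketch that uses only the standard library. The prefix values `/raft` and `/raft/stream` are assumptions about what RaftPrefix and RaftStreamPrefix contain, and `tag`, `newMux`, and `route` are hypothetical helpers that return a label instead of doing real work.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Assumed values for the prefixes; check rafthttp for the real constants.
const (
	raftPrefix       = "/raft"
	raftStreamPrefix = "/raft/stream"
)

// tag returns a handler that writes its own name as the response body.
func tag(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newMux mirrors the registration pattern: an exact path for the
// pipeline handler and a subtree (trailing slash) for the stream handler.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle(raftPrefix, tag("pipeline"))
	mux.Handle(raftStreamPrefix+"/", tag("stream"))
	return mux
}

// route sends a GET request for target through h and returns the body.
func route(h http.Handler, target string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux()
	fmt.Println(route(mux, "/raft"))                  // prints "pipeline"
	fmt.Println(route(mux, "/raft/stream/message/1")) // prints "stream"
}
```

The trailing slash matters: a pattern that ends in `/` matches the whole subtree, which is what lets the stream handler receive paths that carry the stream type and the sender ID.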

Let's focus on streamHandler, which serves the streams over which heartbeats, votes, log appends, and similar requests are sent between servers. Its ServeHTTP method is:

func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
    w.Header().Set("Allow", "GET")
    http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
    return
}

w.Header().Set("X-Server-Version", version.Version)
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
    http.Error(w, err.Error(), http.StatusPreconditionFailed)
    return
}

var t streamType
switch path.Dir(r.URL.Path) {
case streamTypeMsgAppV2.endpoint():
    t = streamTypeMsgAppV2
case streamTypeMessage.endpoint():
    t = streamTypeMessage
default:
    plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
    http.Error(w, "invalid path", http.StatusNotFound)
    return
}

fromStr := path.Base(r.URL.Path)
from, err := types.IDFromString(fromStr)
if err != nil {
    plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
    http.Error(w, "invalid from", http.StatusNotFound)
    return
}
if h.r.IsIDRemoved(uint64(from)) {
    plog.Warningf("rejected the stream from peer %s since it was removed", from)
    http.Error(w, "removed member", http.StatusGone)
    return
}
p := h.peerGetter.Get(from)
if p == nil {
    // This may happen in following cases:
    // 1. user starts a remote peer that belongs to a different cluster
    // with the same cluster ID.
    // 2. local etcd falls behind of the cluster, and cannot recognize
    // the members that joined after its current progress.
    if urls := r.Header.Get("X-PeerURLs"); urls != "" {
        h.tr.AddRemote(from, strings.Split(urls, ","))
    }
    plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
    http.Error(w, "error sender not found", http.StatusNotFound)
    return
}

wto := h.id.String()
if gto := r.Header.Get("X-Raft-To"); gto != wto {
    plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
    http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
    return
}

w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()

c := newCloseNotifier()
conn := &outgoingConn{
    t:       t,
    Writer:  w,
    Flusher: w.(http.Flusher),
    Closer:  c,
}
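// Once a connection from the remote side is accepted, attach it to the writer of the local encoder,
// so that the local encoder and the remote decoder can work together.
// Every node actively dials the other nodes; after connecting, it reads from that connection in a loop
// through its own decoder, i.e. the node uses this decoder to read data sent by the other nodes.
// When a node accepts a connection request from another node, it attaches that connection to its own
// encoder and uses this encoder to send data to the other node.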
p.attachOutgoingConn(conn)
<-c.closeNotify()
}
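The handler derives both the stream type and the sender from the request path, using path.Dir for the former and path.Base for the latter. The sketch below reproduces that split with the standard library only. The prefix `/raft/stream`, the endpoint names `msgapp` and `message`, and the hexadecimal ID encoding are assumptions about rafthttp and types.IDFromString; `parseStreamPath` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

// streamPrefix is an assumed value for RaftStreamPrefix.
const streamPrefix = "/raft/stream"

// parseStreamPath splits a request path of the form
// <prefix>/<kind>/<sender-id> into the stream kind and the sender ID.
func parseStreamPath(p string) (kind string, from uint64, err error) {
	// path.Dir strips the last element, leaving the stream endpoint.
	switch path.Dir(p) {
	case path.Join(streamPrefix, "msgapp"):
		kind = "msgapp"
	case path.Join(streamPrefix, "message"):
		kind = "message"
	default:
		return "", 0, fmt.Errorf("unexpected stream path %q", p)
	}
	// path.Base is the last element, assumed to be a hexadecimal ID.
	from, err = strconv.ParseUint(path.Base(p), 16, 64)
	if err != nil {
		return "", 0, fmt.Errorf("invalid sender ID in %q: %w", p, err)
	}
	return kind, from, nil
}

func main() {
	kind, from, err := parseStreamPath("/raft/stream/message/1f")
	fmt.Println(kind, from, err) // prints "message 31 <nil>"
}
```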

Once a connection request from another server has been received and the connection established, the core of the handling logic is this line:

p.attachOutgoingConn(conn)

Its implementation is:

func (p *peer) attachOutgoingConn(conn *outgoingConn) {
var ok bool
switch conn.t {
case streamTypeMsgAppV2:
    ok = p.msgAppV2Writer.attach(conn)
case streamTypeMessage:
    ok = p.writer.attach(conn)
default:
    plog.Panicf("unhandled stream type %s", conn.t)
}
if !ok {
    conn.Close()
}
}

This calls streamWriter's attach method:

func (cw *streamWriter) attach(conn *outgoingConn) bool {
select {
case cw.connc <- conn:
    return true
case <-cw.done:
    return false
}
}
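The select in attach hands the connection over unless the writer has already stopped. A minimal sketch of that pattern follows, with hypothetical `conn` and `writer` types. The connc channel of the running writer is buffered here only so that the demo needs no receiving goroutine.

```go
package main

import "fmt"

// conn is a placeholder for an accepted outgoing connection.
type conn struct{ id int }

// writer holds the handoff channel and a channel closed on shutdown.
type writer struct {
	connc chan *conn
	done  chan struct{}
}

// attach hands c to the writer goroutine. It returns false if the
// writer has been stopped, so the caller knows it must close c itself.
func (w *writer) attach(c *conn) bool {
	select {
	case w.connc <- c:
		return true
	case <-w.done:
		return false
	}
}

func main() {
	// A running writer: the handoff succeeds.
	running := &writer{connc: make(chan *conn, 1), done: make(chan struct{})}
	fmt.Println(running.attach(&conn{id: 1})) // prints "true"

	// A stopped writer: nobody receives on connc and done is closed.
	stopped := &writer{connc: make(chan *conn), done: make(chan struct{})}
	close(stopped.done)
	fmt.Println(stopped.attach(&conn{id: 2})) // prints "false"
}
```

Without the done case, a handler that tries to attach a connection after the writer has exited would block forever on the send.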

The connection ends up being written to the cw.connc channel. Here is the branch of streamWriter's goroutine that listens on that channel:

case conn := <-cw.connc:
        cw.mu.Lock()
        closed := cw.closeUnlocked()
        t = conn.t
        switch conn.t {
        case streamTypeMsgAppV2:
            enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
        case streamTypeMessage:
            enc = &messageEncoder{w: conn.Writer}
        default:
            plog.Panicf("unhandled stream type %s", conn.t)
        }
        flusher = conn.Flusher
        unflushed = 0
        cw.status.activate()
        cw.closer = conn.Closer
        cw.working = true
        cw.mu.Unlock()

        if closed {
            plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
        }
        plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
        heartbeatc, msgc = tickc.C, cw.msgc

When data arrives on cw.connc, the goroutine takes it (it is a connection to some other server) and wraps conn.Writer in an encoder, which is then used to send outgoing data.
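To make the idea of wrapping a writer in an encoder concrete, the sketch below frames each payload with an 8-byte big-endian length followed by the raw bytes. The framing is an assumption chosen for illustration and is not guaranteed to match etcd's messageEncoder byte for byte; `frameEncoder` and `frame` are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// frameEncoder writes length-prefixed payloads to the wrapped writer.
type frameEncoder struct{ w io.Writer }

// encode writes the payload length as a big-endian uint64, then the payload.
func (e *frameEncoder) encode(payload []byte) error {
	if err := binary.Write(e.w, binary.BigEndian, uint64(len(payload))); err != nil {
		return err
	}
	_, err := e.w.Write(payload)
	return err
}

// frame encodes one payload into an in-memory buffer and returns the bytes.
func frame(payload []byte) []byte {
	var buf bytes.Buffer
	enc := &frameEncoder{w: &buf}
	if err := enc.encode(payload); err != nil {
		panic(err) // writes to a bytes.Buffer are not expected to fail
	}
	return buf.Bytes()
}

func main() {
	fmt.Printf("% x\n", frame([]byte("hi"))) // prints "00 00 00 00 00 00 00 02 68 69"
}
```

Because the encoder only depends on io.Writer, the same code works whether the destination is a buffer, as here, or the http.ResponseWriter of a long-lived stream.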

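The above covers how a server listens for connections. Next, let's look at how a server establishes connections to other servers.
The startRaft goroutine contains the following code: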

rc.transport = &rafthttp.Transport{
    ID:          types.ID(rc.id),
    ClusterID:   0x1000,
    Raft:        rc,
    ServerStats: ss,
    LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
    ErrorC:      make(chan error),
}

rc.transport.Start()
for i := range rc.peers {
    if i+1 != rc.id {
        rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
    }
}

rc.transport.AddPeer calls startPeer, which creates a streamReader and starts a goroutine that runs the following method:

func (cr *streamReader) run() {
t := cr.typ
plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
for {
    // establish a connection to the remote side
    rc, err := cr.dial(t)
    if err != nil {
        if err != errUnsupportedStreamType {
            cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
        }
    } else {
        cr.status.activate()
        plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
        // read and process the data sent by the remote side in a loop
        err := cr.decodeLoop(rc, t)
        plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
        switch {
        // all data is read out
        case err == io.EOF:
        // connection is closed by the remote
        case transport.IsClosedConnError(err):
        default:
            cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
        }
    }
    select {
    // Wait 100ms to create a new stream, so it doesn't bring too much
    // overhead when retry.
    case <-time.After(100 * time.Millisecond):
    case <-cr.stopc:
        plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
        close(cr.done)
        return
    }
}
}
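The structure of run (try, wait a fixed delay, try again, exit as soon as a stop channel fires) can be isolated into a small sketch. `retryLoop` and `demo` are hypothetical helpers, and the `work` callback stands in for the dial-then-decode step.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryLoop calls work repeatedly, pausing for delay between calls,
// until stopc is closed. It returns how many calls were made and how
// many of them returned an error.
func retryLoop(work func() error, delay time.Duration, stopc <-chan struct{}) (attempts, failures int) {
	for {
		attempts++
		if err := work(); err != nil {
			failures++
		}
		select {
		case <-time.After(delay): // wait before the next attempt
		case <-stopc:
			return attempts, failures
		}
	}
}

// demo fails twice, then succeeds on the third call and requests a stop.
func demo() (attempts, failures int) {
	stopc := make(chan struct{})
	calls := 0
	work := func() error {
		calls++
		if calls == 3 {
			close(stopc)
			return nil
		}
		return errors.New("dial failed")
	}
	return retryLoop(work, 10*time.Millisecond, stopc)
}

func main() {
	fmt.Println(demo()) // prints "3 2"
}
```

Waiting on the timer and the stop channel in the same select means shutdown never has to sit out a full retry delay.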

The connection to the remote side is established by rc, err := cr.dial(t), and err := cr.decodeLoop(rc, t) then reads data from that connection in a loop:

func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
var dec decoder
cr.mu.Lock()
switch t {
case streamTypeMsgAppV2:
    dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
case streamTypeMessage:
    dec = &messageDecoder{r: rc}
default:
    plog.Panicf("unhandled stream type %s", t)
}
select {
case <-cr.stopc:
    cr.mu.Unlock()
    if err := rc.Close(); err != nil {
        return err
    }
    return io.EOF
default:
    cr.closer = rc
}
cr.mu.Unlock()
for {
    m, err := dec.decode()
    if err != nil {
        cr.mu.Lock()
        cr.close()
        cr.mu.Unlock()
        return err
    }
    receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

    cr.mu.Lock()
    paused := cr.paused
    cr.mu.Unlock()

    if paused {
        continue
    }

    if isLinkHeartbeatMessage(&m) {
        // raft is not interested in link layer
        // heartbeat message, so we should ignore
        // it.
        continue
    }

    recvc := cr.recvc
    if m.Type == raftpb.MsgProp {
        recvc = cr.propc
    }
    select {
    case recvc <- m:
    default:
        if cr.status.isActive() {
            plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
        }
        plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
        recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
    }
}
}

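This creates a decoder and, in a for loop, repeatedly calls m, err := dec.decode() to read the data sent by the remote side and write it to the cr.recvc or cr.propc channel. If the target channel's buffer is full, the message is dropped rather than blocking the loop.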
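The last step of the loop, picking a channel by message type and dropping the message when that channel is full, is sketched below. The `message` type and its `proposal` flag are simplified stand-ins for raftpb.Message and the MsgProp check, and `deliver` is a hypothetical helper.

```go
package main

import "fmt"

// message is a simplified stand-in for a raft message.
type message struct {
	proposal bool // true for proposals, false for everything else
	body     string
}

// deliver routes m to propc (proposals) or recvc (all other messages)
// without blocking. It reports false when the message was dropped
// because the target channel's buffer was full.
func deliver(m message, recvc, propc chan<- message) bool {
	target := recvc
	if m.proposal {
		target = propc
	}
	select {
	case target <- m:
		return true
	default:
		return false // buffer full: drop instead of stalling the reader
	}
}

func main() {
	recvc := make(chan message, 1)
	propc := make(chan message, 1)

	fmt.Println(deliver(message{body: "append"}, recvc, propc))           // prints "true"
	fmt.Println(deliver(message{body: "vote"}, recvc, propc))             // prints "false", recvc is full
	fmt.Println(deliver(message{proposal: true, body: "put"}, recvc, propc)) // prints "true"
}
```

Dropping is acceptable here because raft is designed to tolerate lost messages, whereas a blocked reader would stall every message behind it on the same stream.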
