nsq源碼解讀之nsqlookupd

NSQ由3個(gè)進(jìn)程組成:

  • nsqd: 接收,維護(hù)隊(duì)列和分發(fā)消息給客戶端的daemon進(jìn)程
  • nsqlookupd: 管理拓?fù)湫畔⒉⑻峁┳罱K一致性的發(fā)現(xiàn)服務(wù)
  • nsqadmin: 用于實(shí)時(shí)監(jiān)控集群運(yùn)行并提供管理命令的管理網(wǎng)站平臺(tái)。
    我們先從nsqlookupd開始。

1. 程序入口

nsqlookup的入口函數(shù)在apps/nsqlookupd/nsqlookupd.go這個(gè)文件中。

//apps/nsqlookupd/nsqlookupd.go
func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
}

這里用到了github.com/judwhite/go-svc/svc管理進(jìn)程。實(shí)際工作中調(diào)用的是Init,Start,Stop三個(gè)函數(shù)。

  • Init函數(shù)判斷了當(dāng)前的操作系統(tǒng)環(huán)境,如果是windwos系統(tǒng)的話,就會(huì)將修改工作目錄。可以參考https://github.com/judwhite/go-svc首頁(yè)的例子。
  • Start函數(shù)實(shí)現(xiàn)了主體功能,接下來會(huì)具體分析。
  • Stop函數(shù)接受外界的signal,如果收到syscall.SIGINT和syscall.SIGTERM信號(hào),就會(huì)被執(zhí)行。

2. Stop函數(shù)

先易后難,先解讀一下Stop函數(shù)。Stop函數(shù)調(diào)用Exit函數(shù),關(guān)閉了tcp服務(wù)和http服務(wù),然后等兩個(gè)服務(wù)關(guān)閉之后,程序結(jié)束。“等兩個(gè)服務(wù)關(guān)閉”這個(gè)動(dòng)作涉及到goroutine同步,nsq通過WaitGroup(參考Goroutine同步)實(shí)現(xiàn)。

//nsqlookupd/nsqlookupd.go
func (l *NSQLookupd) Exit() {
    if l.tcpListener != nil {
        l.tcpListener.Close()
    }

    if l.httpListener != nil {
        l.httpListener.Close()
    }
    l.waitGroup.Wait()
}

//internal/util/wait_group_wrapper.go
func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

其中cb函數(shù)以tcp服務(wù)為例,當(dāng)間接檢測(cè)到tcp已經(jīng)close時(shí),退出for循環(huán),cb執(zhí)行結(jié)束,waitGroup計(jì)數(shù)器減一。
這里通過error的值判斷tcpListener是否關(guān)閉的方式,值得關(guān)注一下。

//internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            break
        }
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

3. Start函數(shù)

Start函數(shù)實(shí)現(xiàn)了主要的功能。首先是讀配置,然后初始化nsqlookupd,最后啟動(dòng)了tcp服務(wù)和http服務(wù)。
其中NSQLookupd.DB中維護(hù)了所有的消息的生產(chǎn)者信息。

3.1 tcp服務(wù)

tcp協(xié)議格式: 4字節(jié)的size,4字節(jié)的協(xié)議版本號(hào)(V1),之后的都是數(shù)據(jù)。

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size      frame ID     data

tcp解包和處理的部分代碼為nsqlookupd/tcp.go和nsqlookupd/lookup_protocol_v1.go。需要注意的是,producer與nsqlookupd維持了一個(gè)長(zhǎng)連接。tcp頭域的8個(gè)字節(jié)只有第一次連接時(shí)才會(huì)發(fā)送。
其中IOLoop中這幾行代碼,會(huì)持續(xù)的從tcp連接中讀取數(shù)據(jù)包。

//nsqlookupd/lookup_protocol_v1.go
client := NewClientV1(conn)
reader := bufio.NewReader(client)
for {
    line, err = reader.ReadString('\n')
......

tcp服務(wù)支持4種操作PING,IDENTIFY,REGISTER,UNREGISTER。
PING用來維持連接,IDENTIFY用來nsqlookupd和producer之間交換身份信息和端口配置信息,REGISTER和UNREGISTER分別是注冊(cè)和刪除producer(通過NSQLookupd.DB)

3.2 http服務(wù)

http服務(wù)支持一系列接口。
有兩點(diǎn)比較有趣:

  1. nsq實(shí)現(xiàn)了一個(gè)裝飾器decorator,是的,效果和python里的裝飾器一樣!使用如下:
//nsqlookupd/http.go
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

Decorator實(shí)現(xiàn)方式如下:

//internal/http_api/api_response.go
type Decorator func(APIHandler) APIHandler
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
    decorated := f
    for _, decorate := range ds {
        decorated = decorate(decorated)
    }
    return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
        decorated(w, req, ps)
    }
}
  1. 有個(gè)接口叫"/topic/tombstone",tombstone是什么意思呢?字面上是墓碑的意思。在這里的意思,引用官網(wǎng)的一段話:

However, it gets a bit more complicated when a topic is no longer produced on a subset of nodes. Because of the way consumers query nsqlookupd and connect to all producers you enter into race conditions with attempting to remove the information from the cluster and consumers discovering that node and reconnecting (thus pushing updates that the topic is still produced on that node). The solution in these cases is to use “tombstones”. A tombstone in nsqlookupd context is producer specific and lasts for a configurable --tombstone-lifetime time. During that window the producer will not be listed in /lookup queries, allowing the node to delete the topic, propagate that information to nsqlookupd (which then removes the tombstoned producer), and prevent any consumer from re-discovering that node.

如果要下掉某個(gè)topic的部分節(jié)點(diǎn),因?yàn)橄M(fèi)者會(huì)查詢nsqlookup然后去連所有的生產(chǎn)者,會(huì)產(chǎn)生一個(gè)問題:一方面,nsqlookupd會(huì)去刪除集群中相關(guān)的信息,另一方面在下掉這部分生產(chǎn)者之后,消費(fèi)者不會(huì)立刻更新生產(chǎn)者的信息,還是會(huì)繼續(xù)重新連接生產(chǎn)者,這會(huì)促使生產(chǎn)者繼續(xù)生產(chǎn)。解決的辦法就是使用"tombstones"。生產(chǎn)者會(huì)存在tombstone-lifetime的時(shí)間。在那個(gè)時(shí)間窗口里面,消費(fèi)者去/lookup的時(shí)候,看不到這個(gè)生產(chǎn)者,允許這個(gè)生產(chǎn)者節(jié)點(diǎn)刪除這個(gè)topic,同時(shí)將這個(gè)信息傳給nsqlookupd,然后刪除被tombstoned的節(jié)點(diǎn),阻止消費(fèi)者重連這個(gè)生產(chǎn)者節(jié)點(diǎn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評(píng)論 19 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,888評(píng)論 13 425
  • 前言 許多家長(zhǎng)在小孩還沒有踏進(jìn)校門前 就已經(jīng)開始為孩子做各種學(xué)前教育 教他們認(rèn)字、背古詩(shī)、學(xué)兒歌 當(dāng)然也會(huì)教他們抽...
    葡萄科技閱讀 322評(píng)論 0 0
  • 最近天氣熱了,阿焱也越來越和我不對(duì)胃口了,他老是挑我的刺,還總叫我收拾房間打掃客廳,這么熱的天,打掃完又是滿身臭汗...
    候麥閱讀 9,305評(píng)論 0 2
  • 20161121問題解析請(qǐng)點(diǎn)擊今日問題下方的“【Java每日一題】20161122”查看(問題解析在公眾號(hào)首發(fā),公...
    weknow閱讀 177評(píng)論 0 4

友情鏈接更多精彩內(nèi)容