NSQ 源碼學(xué)習(xí)筆記(一)

首先我們來看一下Nsq的組織結(jié)構(gòu):

  • nsqd:接收,分發(fā)隊(duì)列信息的守護(hù)進(jìn)程,可以單獨(dú)部署,也可以集群化運(yùn)行
  • nsqlookupd:管理nsqd節(jié)點(diǎn),服務(wù)發(fā)現(xiàn)
  • nsqadmin:nsq的可視化管理工具

NSQ的拓補(bǔ)圖

@拓?fù)鋱D | center

NSQ中Topic和channel的關(guān)系

Topic會(huì)將消息發(fā)送到每個(gè)訂閱者(channel)
channel的讀消費(fèi)類似負(fù)載均衡,會(huì)均勻的投遞到各個(gè)消費(fèi)端

@Topic和channel的關(guān)系 | center

三個(gè)模塊中nsqd模塊最為重要,我們從這個(gè)模塊開始學(xué)習(xí)它的源碼

入口函數(shù)

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
    _, err := toml.DecodeFile(configFile, &cfg)
    if err != nil {
        log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
    }
}
cfg.Validate()

opts := nsqd.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
if err != nil {
    log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
<-signalChan
nsqd.Exit()
  1. 首先用 signal.Notify 阻塞系統(tǒng)的 killctrl+c 信號(hào),讓進(jìn)程可以處于deamon的狀態(tài)運(yùn)行
  2. 按優(yōu)先級(jí)合并配置文件:命令行 > 配置文件 > 默認(rèn)值
  3. nsqd.LoadMetadata 讀取dat文件,加載topic和channel信息,并同步運(yùn)行和停止的狀態(tài)
  4. 將進(jìn)程的運(yùn)行狀態(tài)(topic和channel信息)持久化到dat文件中
  5. 執(zhí)行 nsqd.Main 直到捕捉退出信號(hào)

nsqd.Main 的代碼位于 nsqd/nsqd.go

NSQD主函數(shù)(TCP監(jiān)聽)

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener

    ctx := &context{n}

    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
    })
    ...
}

??NSQD首先啟動(dòng)了tcp監(jiān)聽模型,為了保證通用性,在 protocol 包中封裝了TCPServer,需要傳入 Listener, TCPHandler, Logger 對(duì)象。所有的TCP監(jiān)聽均可以用這個(gè)模式來創(chuàng)建監(jiān)聽,只要傳入對(duì)應(yīng)的 ListenerTCPHandler ,那么Listener在Accept到Connect的時(shí)候,將其交給對(duì)應(yīng)TCPHandler.Handle(clientConn) 執(zhí)行。

TCPHandler 的Interface實(shí)現(xiàn)

package protocol

type TCPHandler interface {
    Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            break
        }

        // 啟動(dòng)Goroutine 去執(zhí)行Handle函數(shù)
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

??這里體現(xiàn)了Go在實(shí)現(xiàn)Interface的便捷之處,不需要顯示的聲明實(shí)現(xiàn)了某個(gè)Interface,只需要完全的實(shí)現(xiàn)Interface中定義的方法,那么就會(huì)默認(rèn)該類型實(shí)現(xiàn)了接口。在這里不同的Handler,只要實(shí)現(xiàn)了Handle(net.Conn),就可以被當(dāng)做TCPHandler對(duì)象傳入。在代碼中的體現(xiàn)是:
??執(zhí)行Handle函數(shù)時(shí)是啟動(dòng)一個(gè)Goroutine來執(zhí)行的,這里其實(shí)是per connect per goroutine,由于Golang的特性,Goroutine在執(zhí)行時(shí)的調(diào)度模式是epoll模式,可以很好的利用系統(tǒng)的多核資源。

main函數(shù)中TCPServer的實(shí)現(xiàn)

type tcpServer struct {
    ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

    // 客戶端應(yīng)該初始化本身通過發(fā)送一個(gè)4字節(jié)序列表示協(xié)議的版本,
    // 這樣將允許我們優(yōu)雅地升級(jí)兼容協(xié)議
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
        return
    }
    protocolMagic := string(buf)

    p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx} // V2版本的協(xié)議操作
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}

??源碼中標(biāo)記了需要在通訊時(shí)預(yù)留4個(gè)字節(jié)的版本號(hào)信息,用來兼容協(xié)議的升級(jí)。如果未來有協(xié)議升級(jí),只需要在protocolMagic中添加新的case分支就可以了。

NSQD主函數(shù)(HTTP/HTTPS監(jiān)聽)

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpsListener = httpsListener
        n.Unlock()
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
        })
    }
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.httpListener = httpListener
    n.Unlock()
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
    })

??這里不論是http還是https的監(jiān)聽,httpsServerhttpServer作為Handler對(duì)象,均在內(nèi)部聲明了路由規(guī)則,不同的請求定義了不同的操作,最后通過http_api.Serve()綁定端口監(jiān)聽

NSQD默認(rèn)自啟的操作

    n.waitGroup.Wrap(func() { n.queueScanLoop() }) // 循環(huán)消息分發(fā)
    n.waitGroup.Wrap(func() { n.idPump() }) // 生產(chǎn)唯一消息id的一個(gè)隊(duì)列
    n.waitGroup.Wrap(func() { n.lookupLoop() }) // 如果nsqd有變化,同步nsqlookup
    if n.getOpts().StatsdAddress != "" {
        // 定時(shí)將nsqd的狀態(tài)以短連接的方式發(fā)送至一個(gè)狀態(tài)監(jiān)護(hù)進(jìn)程.包括了nsqd的應(yīng)用資源信息,以及nsqd上topic的信息
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }

??啟動(dòng)監(jiān)聽后,除了通過監(jiān)聽啟動(dòng)的操作外,NSQD還有一些類似守護(hù)進(jìn)程的操作會(huì)一直運(yùn)行,包括:

  • 循環(huán)消息分發(fā)
  • 生產(chǎn)唯一消息ID
  • nsqlookup的狀態(tài)同步
  • 狀態(tài)監(jiān)控
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,628評(píng)論 19 139
  • 轉(zhuǎn)載自http://blog.csdn.net/qq295445028/article/details/79930...
    WebSSO閱讀 3,157評(píng)論 0 3
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會(huì)使用輕量級(jí)的消息代理來構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都連接上來...
    Chandler_玨瑜閱讀 6,787評(píng)論 2 39
  • https://nodejs.org/api/documentation.html 工具模塊 Assert 測試 ...
    KeKeMars閱讀 6,609評(píng)論 0 6
  • 記得讀大學(xué)的時(shí)候因?yàn)樯狭艘淮坞娪靶蕾p的選修課,知道了中國第五代,第六代導(dǎo)演,知道了蒙太奇,當(dāng)時(shí)覺得說要把世界上的經(jīng)...
    生命是一次饋贈(zèng)的旅行閱讀 1,242評(píng)論 0 0

友情鏈接更多精彩內(nèi)容