nsq源碼解讀之nsqd

nsqd是一個守護進程,用來接收和轉(zhuǎn)發(fā)消息。和前文提到的nsqdlookup類似,它同樣使用go-svc來管理進程。而在啟動服務(wù)的時候,不僅支持tcp和http,還支持https。本文主要分析nsqd源碼中值得借鑒的點。

1. 加載數(shù)據(jù)文件

在nsqd的start和stop函數(shù)中,程序除了讀取配置以后,還涉及數(shù)據(jù)文件的加載和寫入。數(shù)據(jù)文件中主要包含當(dāng)前nsqd的topic和channel信息。

  • 數(shù)據(jù)是以json格式存放的,格式定義為meta。包含topic和channel的信息,以及是否paused
  • 數(shù)據(jù)存在兩個文件中,一個代表當(dāng)前的,另一個帶ID的是用來作回滾的(從注釋上看是這樣的,但是這個文件可以是符號鏈接,具體應(yīng)用場景未知)

有以下幾點是可以學(xué)習(xí)的:

  • 使用atomic原子操作來處理一些線程間共享的數(shù)據(jù),避免使用鎖,可以簡化代碼,降低開銷
  • 寫文件的時候,可以先寫一個tmp文件,寫成功了再rename為正式的問題。
  • 寫文件時,將文件fsync落盤用sync()
  • windows建立symlink需要管理員權(quán)限,所以需要重新寫一份,而linux可以通過os.Symlink直接建立符號連接,不用寫兩份文件。
// nsqd/nsqd.go
type meta struct {
    Topics []struct {
        Name     string `json:"name"`
        Paused   bool   `json:"paused"`
        Channels []struct {
            Name   string `json:"name"`
            Paused bool   `json:"paused"`
        } `json:"channels"`
    } `json:"topics"`
}

func writeSyncFile(fn string, data []byte) error {
    f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
    if err != nil {
        return err
    }

    _, err = f.Write(data)
    if err == nil {
        err = f.Sync()
    }
    f.Close()
    return err
}

func (n *NSQD) LoadMetadata() error {
    atomic.StoreInt32(&n.isLoading, 1)
    defer atomic.StoreInt32(&n.isLoading, 0)

    fn := newMetadataFile(n.getOpts())
    // old metadata filename with ID, maintained in parallel to enable roll-back
    fnID := oldMetadataFile(n.getOpts())

    // ......
    // 此處省略代碼為讀取文件的過程
    // ......

    var m meta
    err = json.Unmarshal(data, &m)


    // ......
    // 此處省略代碼為恢復(fù)數(shù)據(jù)過程
    // ......

    return nil
}

func (n *NSQD) PersistMetadata() error {

    // ......
    // 此處省略代碼為獲取數(shù)據(jù)過程
    // ......
    
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

    err = writeSyncFile(tmpFileName, data)
    if err != nil {
        return err
    }
    err = os.Rename(tmpFileName, fileName)
    if err != nil {
        return err
    }
    // technically should fsync DataPath here

    stat, err := os.Lstat(fileNameID)
    if err == nil && stat.Mode()&os.ModeSymlink != 0 {
        return nil
    }

    // if no symlink (yet), race condition:
    // crash right here may cause next startup to see metadata conflict and abort

    tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

    if runtime.GOOS != "windows" {
        err = os.Symlink(fileName, tmpFileNameID)
    } else {
        // on Windows need Administrator privs to Symlink
        // instead write copy every time
        err = writeSyncFile(tmpFileNameID, data)
    }
    if err != nil {
        return err
    }

    err = os.Rename(tmpFileNameID, fileNameID)
    if err != nil {
        return err
    }
    // technically should fsync DataPath here

    return nil
}
2. 在用flag模塊讀取命令行配置時,可以使用flag.Var,讀取指定類型的配置。
type Value interface {
    String() string
    Set(string) error
}

func (f *FlagSet) Var(value Value, name string, usage string)

Var方法使用指定的名字、使用信息注冊一個flag。該flag的類型和值由第一個參數(shù)表示,該參數(shù)應(yīng)實現(xiàn)了Value接口。例如:

// nsqd/nsqd.go
type tlsMinVersionOption uint16

func (t *tlsMinVersionOption) Set(s string) error {
    s = strings.ToLower(s)
    switch s {
    case "":
        return nil
    case "ssl3.0":
        *t = tls.VersionSSL30
    case "tls1.0":
        *t = tls.VersionTLS10
    case "tls1.1":
        *t = tls.VersionTLS11
    case "tls1.2":
        *t = tls.VersionTLS12
    default:
        return fmt.Errorf("unknown tlsVersionOption %q", s)
    }
    return nil
}

func (t *tlsMinVersionOption) Get() interface{} { return uint16(*t) }

func (t *tlsMinVersionOption) String() string {
    return strconv.FormatInt(int64(*t), 10)
}

......
......

    tlsRequired := tlsRequiredOption(opts.TLSRequired)
    tlsMinVersion := tlsMinVersionOption(opts.TLSMinVersion)
3. 可以使用一個chan來結(jié)束一個線程。原理是close chan之后,chan會輸出該類型的零值。
// 結(jié)束的時候
// nsqd/nsqd.go
func (n *NSQD) Exit() {
    ......
    close(n.exitChan)
    ......
}

func (n *NSQD) queueScanLoop() {
    ......
    for {
        select {
        ......
        case <-n.exitChan:
            goto exit
        ......
        }
    }
    ......
exit:
    n.logf("QUEUESCAN: closing")
    ......
}

// nsqd/lookup.go
func (n *NSQD) lookupLoop() {
    ......
    for {
        select {
        ......
        case <-n.exitChan:
            goto exit
        ......
        }
    }
    ......
}
4. 最關(guān)鍵的topic的創(chuàng)建和消息的發(fā)布,將在接下來的文章中詳細(xì)分析。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,659評論 19 139
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,539評論 1 15
  • 前言 在微服務(wù)架構(gòu)的系統(tǒng)中,我們通常會使用輕量級的消息代理來構(gòu)建一個共用的消息主題讓系統(tǒng)中所有微服務(wù)實例都連接上來...
    Chandler_玨瑜閱讀 6,787評論 2 39
  • 新員工的學(xué)習(xí)思路 一個人,首先要確立自己的人生觀,價值觀,世界觀。三...
    ajin1973閱讀 265評論 0 0
  • 在落日下 持劍的手是金黃色 長劍一挑 擊破 落雪紛紛 淚無痕 人無魂 酒后風(fēng)采 仗劍多幾分 山之巔 手脫劍 腳步如...
    紅與黑1830閱讀 320評論 0 2

友情鏈接更多精彩內(nèi)容