nsqd是一個守護進程,用來接收和轉(zhuǎn)發(fā)消息。和前文提到的nsqdlookup類似,它同樣使用go-svc來管理進程。而在啟動服務(wù)的時候,不僅支持tcp和http,還支持https。本文主要分析nsqd源碼中值得借鑒的點。
1. 加載數(shù)據(jù)文件
在nsqd的start和stop函數(shù)中,程序除了讀取配置以后,還涉及數(shù)據(jù)文件的加載和寫入。數(shù)據(jù)文件中主要包含當(dāng)前nsqd的topic和channel信息。
- 數(shù)據(jù)是以json格式存放的,格式定義為meta。包含topic和channel的信息,以及是否paused
- 數(shù)據(jù)存在兩個文件中,一個代表當(dāng)前的,另一個帶ID的是用來作回滾的(從注釋上看是這樣的,但是這個文件可以是符號鏈接,具體應(yīng)用場景未知)
有以下幾點是可以學(xué)習(xí)的:
- 使用atomic原子操作來處理一些線程間共享的數(shù)據(jù),避免使用鎖,可以簡化代碼,降低開銷
- 寫文件的時候,可以先寫一個tmp文件,寫成功了再rename為正式的問題。
- 寫文件時,將文件fsync落盤用sync()
- windows建立symlink需要管理員權(quán)限,所以需要重新寫一份,而linux可以通過os.Symlink直接建立符號連接,不用寫兩份文件。
// nsqd/nsqd.go
type meta struct {
Topics []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
Channels []struct {
Name string `json:"name"`
Paused bool `json:"paused"`
} `json:"channels"`
} `json:"topics"`
}
func writeSyncFile(fn string, data []byte) error {
f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return err
}
_, err = f.Write(data)
if err == nil {
err = f.Sync()
}
f.Close()
return err
}
func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
fn := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := oldMetadataFile(n.getOpts())
// ......
// 此處省略代碼為讀取文件的過程
// ......
var m meta
err = json.Unmarshal(data, &m)
// ......
// 此處省略代碼為恢復(fù)數(shù)據(jù)過程
// ......
return nil
}
func (n *NSQD) PersistMetadata() error {
// ......
// 此處省略代碼為獲取數(shù)據(jù)過程
// ......
tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
err = writeSyncFile(tmpFileName, data)
if err != nil {
return err
}
err = os.Rename(tmpFileName, fileName)
if err != nil {
return err
}
// technically should fsync DataPath here
stat, err := os.Lstat(fileNameID)
if err == nil && stat.Mode()&os.ModeSymlink != 0 {
return nil
}
// if no symlink (yet), race condition:
// crash right here may cause next startup to see metadata conflict and abort
tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())
if runtime.GOOS != "windows" {
err = os.Symlink(fileName, tmpFileNameID)
} else {
// on Windows need Administrator privs to Symlink
// instead write copy every time
err = writeSyncFile(tmpFileNameID, data)
}
if err != nil {
return err
}
err = os.Rename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
// technically should fsync DataPath here
return nil
}
2. 在用flag模塊讀取命令行配置時,可以使用flag.Var,讀取指定類型的配置。
type Value interface {
String() string
Set(string) error
}
func (f *FlagSet) Var(value Value, name string, usage string)
Var方法使用指定的名字、使用信息注冊一個flag。該flag的類型和值由第一個參數(shù)表示,該參數(shù)應(yīng)實現(xiàn)了Value接口。例如:
// nsqd/nsqd.go
type tlsMinVersionOption uint16
func (t *tlsMinVersionOption) Set(s string) error {
s = strings.ToLower(s)
switch s {
case "":
return nil
case "ssl3.0":
*t = tls.VersionSSL30
case "tls1.0":
*t = tls.VersionTLS10
case "tls1.1":
*t = tls.VersionTLS11
case "tls1.2":
*t = tls.VersionTLS12
default:
return fmt.Errorf("unknown tlsVersionOption %q", s)
}
return nil
}
func (t *tlsMinVersionOption) Get() interface{} { return uint16(*t) }
func (t *tlsMinVersionOption) String() string {
return strconv.FormatInt(int64(*t), 10)
}
......
......
tlsRequired := tlsRequiredOption(opts.TLSRequired)
tlsMinVersion := tlsMinVersionOption(opts.TLSMinVersion)
3. 可以使用一個chan來結(jié)束一個線程。原理是close chan之后,chan會輸出該類型的零值。
// 結(jié)束的時候
// nsqd/nsqd.go
func (n *NSQD) Exit() {
......
close(n.exitChan)
......
}
func (n *NSQD) queueScanLoop() {
......
for {
select {
......
case <-n.exitChan:
goto exit
......
}
}
......
exit:
n.logf("QUEUESCAN: closing")
......
}
// nsqd/lookup.go
func (n *NSQD) lookupLoop() {
......
for {
select {
......
case <-n.exitChan:
goto exit
......
}
}
......
}