groupcache 源碼系列五 peers.go http.go

先上結(jié)論:

當(dāng)客戶端連上groupcache時(shí),能做的只有g(shù)et獲取數(shù)據(jù),如果本地有所需要的數(shù)據(jù),則直接返回,如果沒(méi)有,則通過(guò)一致性哈希函數(shù)判斷這個(gè)key所對(duì)應(yīng)的peer,然后通過(guò)http從這個(gè)peer上獲取數(shù)據(jù);如果這個(gè)peer上有需要的數(shù)據(jù),則通過(guò)http回復(fù)給之前的那個(gè)groupcache;groupcache收到之后,保存在本地hotCache中,并返回給客戶端;如果peer上也沒(méi)有所需要的數(shù)據(jù),則groupcache從數(shù)據(jù)源(數(shù)據(jù)庫(kù)或者文件)獲取數(shù)據(jù),并將數(shù)據(jù)保存在本地mainCache,并返回給客戶端

以上結(jié)論對(duì)應(yīng)的源碼部分出現(xiàn)在groupcache.go中,暫時(shí)不作分析,先來(lái)看看peers.go中的內(nèi)容。

1.NoPeers

peers.go默認(rèn)實(shí)現(xiàn)了一個(gè)沒(méi)有peer的結(jié)構(gòu),也就是在groupcache 源碼系列四 Sink ByteView里那個(gè)簡(jiǎn)單的例子所運(yùn)行的情況。

// Context is an opaque value passed through calls to the
// ProtoGetter. It may be nil if your ProtoGetter implementation does
// not require a context.
type Context interface{}

// ProtoGetter is the interface that must be implemented by a peer.
type ProtoGetter interface {
    Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error
}

// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
    // PickPeer returns the peer that owns the specific key
    // and true to indicate that a remote peer was nominated.
    // It returns nil, false if the key owner is the current peer.
    PickPeer(key string) (peer ProtoGetter, ok bool)
}

// NoPeers is an implementation of PeerPicker that never finds a peer.
type NoPeers struct{}

func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }

所有的peer結(jié)構(gòu)都要實(shí)現(xiàn)PeerPicker接口,即給定一個(gè)字符串,返回一個(gè)ProtoGetter,ok bool。在后面還會(huì)看到,如果是有peer的情況,用的是HTTPPool來(lái)代替NoPeers。

2.func getPeers

接下來(lái)提供了一個(gè)get方法,會(huì)根據(jù)groupName返回這種peer結(jié)構(gòu),當(dāng)然它們的共同點(diǎn)是一樣的,也就是返回值類型為PeerPicker接口

func getPeers(groupName string) PeerPicker {
    if portPicker == nil {
        return NoPeers{}
    }
    pk := portPicker(groupName)
    if pk == nil {
        pk = NoPeers{}
    }
    return pk
}

這里portPicker是一個(gè)func類型,允許自定義:

var (
    portPicker func(groupName string) PeerPicker
)

// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPicker(fn func() PeerPicker) {
    if portPicker != nil {
        panic("RegisterPeerPicker called more than once")
    }
    portPicker = func(_ string) PeerPicker { return fn() }
}

// RegisterPerGroupPeerPicker registers the peer initialization function,
// which takes the groupName, to be used in choosing a PeerPicker.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker) {
    if portPicker != nil {
        panic("RegisterPeerPicker called more than once")
    }
    portPicker = fn
}

也就是說(shuō)只要一個(gè)func返回PeerPicker,就能自定義了。在http.go里可以搜索到這種使用方式:

func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
    ...

    p := &HTTPPool{
        self:        self,
        httpGetters: make(map[string]*httpGetter),
    }
    ...
    if p.opts.Replicas == 0 {
        p.opts.Replicas = defaultReplicas
    }
    p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)

    RegisterPeerPicker(func() PeerPicker { return p })
    return p
}
3.HTTPPool

上面的NewHTTPPoolOpts中,RegisterPeerPicker注冊(cè)的函數(shù)里,直接返回了p,是個(gè)HTTPPool類型,那么可以確定HTTPPool必然實(shí)現(xiàn)了PeerPicker接口,也就是PickPeer方法,很容易得到驗(yàn)證:

func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.peers.IsEmpty() {
        return nil, false
    }
    if peer := p.peers.Get(key); peer != p.self {
        return p.httpGetters[peer], true
    }
    return nil, false
}

所以p.peers是什么呢,p.httpGetters又是什么?接著找:

peers       *consistenthash.Map

簡(jiǎn)單來(lái)說(shuō)這是個(gè)映射,實(shí)現(xiàn)了哈希一致,具體參考groupcache 源碼系列一 consistent hash一致性哈希算法,提供Get,Add方法。順便也能明白上面提到的Replicas,是哈希一致用到的虛擬節(jié)點(diǎn)數(shù)。

func (m *Map) Get(key string) string {
func (m *Map) Add(keys ...string) {

Get已經(jīng)看到在哪里用了,那么Add呢?

// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
    p.peers.Add(peers...)
    p.httpGetters = make(map[string]*httpGetter, len(peers))
    for _, peer := range peers {
        p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
    }
}

在這個(gè)Set方法中,還對(duì)peers和httpGetters做了關(guān)聯(lián),就和func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {里看到的一樣,也是通過(guò)映射,用peer這個(gè)字符串作為key。比如

peers_addrs = []string{"http://127.0.0.1:8001", 
"http://127.0.0.1:8002", "http://127.0.0.1:8003"}
peers.Set(peers_addrs...)

這樣當(dāng)我們查一個(gè)值時(shí),如果需要找相應(yīng)的peer字符串,就把這個(gè)值扔到consistenthash.Map里哈希一下,找到相應(yīng)的節(jié)點(diǎn),返回那個(gè)節(jié)點(diǎn)對(duì)應(yīng)的字符串,比如"http://127.0.0.1:8001",然后再拿這個(gè)串,找到相應(yīng)的httpGetter。

4.httpGetter

下面肯定是要去看httpGetter是怎么回事了,

type httpGetter struct {
    transport func(Context) http.RoundTripper
    baseURL   string
}

var bufferPool = sync.Pool{
    New: func() interface{} { return new(bytes.Buffer) },
}

這里可以參考Golang http.RoundTripperGolang sync.Pool 和 偽共享false share。

然后就是實(shí)現(xiàn)了peers.go里面提到的ProtoGetter接口,也就是說(shuō)httpGetter其實(shí)就是一個(gè)ProtoGetter,它可以幫助我們?cè)趐eer之間使用protobuf來(lái)處理數(shù)據(jù)。

func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
    u := fmt.Sprintf(
        "%v%v/%v",
        h.baseURL,
        url.QueryEscape(in.GetGroup()),
        url.QueryEscape(in.GetKey()),
    )
    req, err := http.NewRequest("GET", u, nil)
    if err != nil {
        return err
    }
    tr := http.DefaultTransport
    if h.transport != nil {
        tr = h.transport(context)
    }
    res, err := tr.RoundTrip(req)
    if err != nil {
        return err
    }
    defer res.Body.Close()
    if res.StatusCode != http.StatusOK {
        return fmt.Errorf("server returned: %v", res.Status)
    }
    b := bufferPool.Get().(*bytes.Buffer)
    b.Reset()
    defer bufferPool.Put(b)
    _, err = io.Copy(b, res.Body)
    if err != nil {
        return fmt.Errorf("reading response body: %v", err)
    }
    err = proto.Unmarshal(b.Bytes(), out)
    if err != nil {
        return fmt.Errorf("decoding response body: %v", err)
    }
    return nil
}

通過(guò)fmt.Sprintf參數(shù),用/把group和key分割開,然后做了一個(gè)Get請(qǐng)求,然后再用proto.Unmarshal把數(shù)據(jù)放到out * pb.GetResponse里面。這個(gè)方法在哪里調(diào)用呢,在groupcache.go里面有:

func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
    req := &pb.GetRequest{
        Group: &g.name,
        Key:   &key,
    }
    res := &pb.GetResponse{}
    err := peer.Get(ctx, req, res)
    if err != nil {
        return ByteView{}, err
    }
    value := ByteView{b: res.Value}
    // TODO(bradfitz): use res.MinuteQps or something smart to
    // conditionally populate hotCache.  For now just do it some
    // percentage of the time.
    if rand.Intn(10) == 0 {
        g.populateCache(key, value, &g.hotCache)
    }
    return value, nil
}

構(gòu)造了GetRequest,GetResponse結(jié)構(gòu),然后用傳入的ProtoGetter執(zhí)行了上面說(shuō)的Get請(qǐng)求。

5.對(duì)peer請(qǐng)求的處理

對(duì)peer的請(qǐng)求確實(shí)是發(fā)出來(lái)了,但是peer作為一個(gè)http服務(wù)器,應(yīng)該怎么處理這個(gè)請(qǐng)求呢?在http.go里很容易找到:

func NewHTTPPool(self string) *HTTPPool {
    p := NewHTTPPoolOpts(self, nil)
    http.Handle(p.opts.BasePath, p)
    return p
}

這里基礎(chǔ)知識(shí)可以參考Golang http.Handler接口,根據(jù)func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {,很顯然HTTPPool還要實(shí)現(xiàn)http.Handler接口,也就是ServeHTTP(w ResponseWriter, r *Request):

func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Parse request.
    if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
        panic("HTTPPool serving unexpected path: " + r.URL.Path)
    }
    parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
    if len(parts) != 2 {
        http.Error(w, "bad request", http.StatusBadRequest)
        return
    }
    fmt.Println("parts:",parts)
    groupName := parts[0]
    key := parts[1]

    // Fetch the value for this group/key.
    group := GetGroup(groupName)
    if group == nil {
        http.Error(w, "no such group: "+groupName, http.StatusNotFound)
        return
    }
    var ctx Context
    if p.Context != nil {
        ctx = p.Context(r)
    }

    group.Stats.ServerRequests.Add(1)
    var value []byte
    err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    // Write the value to the response body as a proto message.
    body, err := proto.Marshal(&pb.GetResponse{Value: value})
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "application/x-protobuf")
    w.Write(body)
}

前面說(shuō)到包裝Get請(qǐng)求時(shí),用/把groupname和key組合到一起,這里當(dāng)然是要先拆分開,再解析。剩下的事情就是從相應(yīng)的group里找到數(shù)據(jù),用proto包裝好,扔出去。

6.group
上面部分代碼是groupcache.go中的,也涉及到了Group結(jié)構(gòu)。下一篇重點(diǎn)講述Group。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容