先上結(jié)論:
當(dāng)客戶端連上groupcache時(shí),能做的只有g(shù)et獲取數(shù)據(jù),如果本地有所需要的數(shù)據(jù),則直接返回,如果沒(méi)有,則通過(guò)一致性哈希函數(shù)判斷這個(gè)key所對(duì)應(yīng)的peer,然后通過(guò)http從這個(gè)peer上獲取數(shù)據(jù);如果這個(gè)peer上有需要的數(shù)據(jù),則通過(guò)http回復(fù)給之前的那個(gè)groupcache;groupcache收到之后,保存在本地hotCache中,并返回給客戶端;如果peer上也沒(méi)有所需要的數(shù)據(jù),則groupcache從數(shù)據(jù)源(數(shù)據(jù)庫(kù)或者文件)獲取數(shù)據(jù),并將數(shù)據(jù)保存在本地mainCache,并返回給客戶端
以上結(jié)論對(duì)應(yīng)的源碼部分出現(xiàn)在groupcache.go中,暫時(shí)不作分析,先來(lái)看看peers.go中的內(nèi)容。
1.NoPeers
peers.go默認(rèn)實(shí)現(xiàn)了一個(gè)沒(méi)有peer的結(jié)構(gòu),也就是在groupcache 源碼系列四 Sink ByteView里那個(gè)簡(jiǎn)單的例子所運(yùn)行的情況。
// Context is an opaque value passed through calls to the
// ProtoGetter. It may be nil if your ProtoGetter implementation does
// not require a context.
type Context interface{}
// ProtoGetter is the interface that must be implemented by a peer.
type ProtoGetter interface {
Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error
}
// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
// PickPeer returns the peer that owns the specific key
// and true to indicate that a remote peer was nominated.
// It returns nil, false if the key owner is the current peer.
PickPeer(key string) (peer ProtoGetter, ok bool)
}
// NoPeers is an implementation of PeerPicker that never finds a peer.
type NoPeers struct{}
func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }
所有的peer結(jié)構(gòu)都要實(shí)現(xiàn)PeerPicker接口,即給定一個(gè)字符串,返回一個(gè)ProtoGetter,ok bool。在后面還會(huì)看到,如果是有peer的情況,用的是HTTPPool來(lái)代替NoPeers。
2.func getPeers
接下來(lái)提供了一個(gè)get方法,會(huì)根據(jù)groupName返回這種peer結(jié)構(gòu),當(dāng)然它們的共同點(diǎn)是一樣的,也就是返回值類型為PeerPicker接口
func getPeers(groupName string) PeerPicker {
if portPicker == nil {
return NoPeers{}
}
pk := portPicker(groupName)
if pk == nil {
pk = NoPeers{}
}
return pk
}
這里portPicker是一個(gè)func類型,允許自定義:
var (
portPicker func(groupName string) PeerPicker
)
// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPicker(fn func() PeerPicker) {
if portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
portPicker = func(_ string) PeerPicker { return fn() }
}
// RegisterPerGroupPeerPicker registers the peer initialization function,
// which takes the groupName, to be used in choosing a PeerPicker.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker) {
if portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
portPicker = fn
}
也就是說(shuō)只要一個(gè)func返回PeerPicker,就能自定義了。在http.go里可以搜索到這種使用方式:
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
...
p := &HTTPPool{
self: self,
httpGetters: make(map[string]*httpGetter),
}
...
if p.opts.Replicas == 0 {
p.opts.Replicas = defaultReplicas
}
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
RegisterPeerPicker(func() PeerPicker { return p })
return p
}
3.HTTPPool
上面的NewHTTPPoolOpts中,RegisterPeerPicker注冊(cè)的函數(shù)里,直接返回了p,是個(gè)HTTPPool類型,那么可以確定HTTPPool必然實(shí)現(xiàn)了PeerPicker接口,也就是PickPeer方法,很容易得到驗(yàn)證:
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.peers.IsEmpty() {
return nil, false
}
if peer := p.peers.Get(key); peer != p.self {
return p.httpGetters[peer], true
}
return nil, false
}
所以p.peers是什么呢,p.httpGetters又是什么?接著找:
peers *consistenthash.Map
簡(jiǎn)單來(lái)說(shuō)這是個(gè)映射,實(shí)現(xiàn)了哈希一致,具體參考groupcache 源碼系列一 consistent hash一致性哈希算法,提供Get,Add方法。順便也能明白上面提到的Replicas,是哈希一致用到的虛擬節(jié)點(diǎn)數(shù)。
func (m *Map) Get(key string) string {
func (m *Map) Add(keys ...string) {
Get已經(jīng)看到在哪里用了,那么Add呢?
// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
}
}
在這個(gè)Set方法中,還對(duì)peers和httpGetters做了關(guān)聯(lián),就和func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {里看到的一樣,也是通過(guò)映射,用peer這個(gè)字符串作為key。比如
peers_addrs = []string{"http://127.0.0.1:8001",
"http://127.0.0.1:8002", "http://127.0.0.1:8003"}
peers.Set(peers_addrs...)
這樣當(dāng)我們查一個(gè)值時(shí),如果需要找相應(yīng)的peer字符串,就把這個(gè)值扔到consistenthash.Map里哈希一下,找到相應(yīng)的節(jié)點(diǎn),返回那個(gè)節(jié)點(diǎn)對(duì)應(yīng)的字符串,比如"http://127.0.0.1:8001",然后再拿這個(gè)串,找到相應(yīng)的httpGetter。
4.httpGetter
下面肯定是要去看httpGetter是怎么回事了,
type httpGetter struct {
transport func(Context) http.RoundTripper
baseURL string
}
var bufferPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
這里可以參考Golang http.RoundTripper和Golang sync.Pool 和 偽共享false share。
然后就是實(shí)現(xiàn)了peers.go里面提到的ProtoGetter接口,也就是說(shuō)httpGetter其實(shí)就是一個(gè)ProtoGetter,它可以幫助我們?cè)趐eer之間使用protobuf來(lái)處理數(shù)據(jù)。
func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(in.GetKey()),
)
req, err := http.NewRequest("GET", u, nil)
if err != nil {
return err
}
tr := http.DefaultTransport
if h.transport != nil {
tr = h.transport(context)
}
res, err := tr.RoundTrip(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("server returned: %v", res.Status)
}
b := bufferPool.Get().(*bytes.Buffer)
b.Reset()
defer bufferPool.Put(b)
_, err = io.Copy(b, res.Body)
if err != nil {
return fmt.Errorf("reading response body: %v", err)
}
err = proto.Unmarshal(b.Bytes(), out)
if err != nil {
return fmt.Errorf("decoding response body: %v", err)
}
return nil
}
通過(guò)fmt.Sprintf參數(shù),用/把group和key分割開,然后做了一個(gè)Get請(qǐng)求,然后再用proto.Unmarshal把數(shù)據(jù)放到out * pb.GetResponse里面。這個(gè)方法在哪里調(diào)用呢,在groupcache.go里面有:
func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
}
res := &pb.GetResponse{}
err := peer.Get(ctx, req, res)
if err != nil {
return ByteView{}, err
}
value := ByteView{b: res.Value}
// TODO(bradfitz): use res.MinuteQps or something smart to
// conditionally populate hotCache. For now just do it some
// percentage of the time.
if rand.Intn(10) == 0 {
g.populateCache(key, value, &g.hotCache)
}
return value, nil
}
構(gòu)造了GetRequest,GetResponse結(jié)構(gòu),然后用傳入的ProtoGetter執(zhí)行了上面說(shuō)的Get請(qǐng)求。
5.對(duì)peer請(qǐng)求的處理
對(duì)peer的請(qǐng)求確實(shí)是發(fā)出來(lái)了,但是peer作為一個(gè)http服務(wù)器,應(yīng)該怎么處理這個(gè)請(qǐng)求呢?在http.go里很容易找到:
func NewHTTPPool(self string) *HTTPPool {
p := NewHTTPPoolOpts(self, nil)
http.Handle(p.opts.BasePath, p)
return p
}
這里基礎(chǔ)知識(shí)可以參考Golang http.Handler接口,根據(jù)func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {,很顯然HTTPPool還要實(shí)現(xiàn)http.Handler接口,也就是ServeHTTP(w ResponseWriter, r *Request):
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Parse request.
if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
fmt.Println("parts:",parts)
groupName := parts[0]
key := parts[1]
// Fetch the value for this group/key.
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
var ctx Context
if p.Context != nil {
ctx = p.Context(r)
}
group.Stats.ServerRequests.Add(1)
var value []byte
err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Write the value to the response body as a proto message.
body, err := proto.Marshal(&pb.GetResponse{Value: value})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
w.Write(body)
}
前面說(shuō)到包裝Get請(qǐng)求時(shí),用/把groupname和key組合到一起,這里當(dāng)然是要先拆分開,再解析。剩下的事情就是從相應(yīng)的group里找到數(shù)據(jù),用proto包裝好,扔出去。
6.group
上面部分代碼是groupcache.go中的,也涉及到了Group結(jié)構(gòu)。下一篇重點(diǎn)講述Group。