1. 前言
轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
在上文 [istio源碼分析][pilot] pilot之configController (mcp client) 分析了
pilot中的configController的作用, 本文將分析pilot中ServiceController的作用.
2. ServiceController
看一下
serviceController是如何初始化的.
// pilot/cmd/pilot-discovery/main.go
var (
serverArgs = bootstrap.PilotArgs{
CtrlZOptions: ctrlz.DefaultOptions(),
KeepaliveOptions: keepalive.DefaultOption(),
}
...
)
...
discoveryServer, err := bootstrap.NewServer(serverArgs)
...
func init() {
discoveryCmd.PersistentFlags().StringSliceVar(&serverArgs.Service.Registries, "registries",
[]string{string(serviceregistry.KubernetesRegistry)},
fmt.Sprintf("Comma separated list of platform service registries to read from (choose one or more from {%s, %s, %s, %s})",
serviceregistry.KubernetesRegistry, serviceregistry.ConsulRegistry, serviceregistry.MCPRegistry, serviceregistry.MockRegistry))
...
}
// pilot/pkg/bootstrap/server.go
func NewServer(args PilotArgs) (*Server, error) {
...
if err := s.initServiceControllers(&args); err != nil {
return nil, fmt.Errorf("service controllers: %v", err)
}
...
}
可以看到是調(diào)用
initServiceControllers方法來初始化s.ServiceController, 并且傳入了參數(shù)args(來源自serverArgs).
2.1 initServiceControllers
// pilot/pkg/bootstrap/server.go
func (s *Server) initServiceControllers(args *PilotArgs) error {
// 創(chuàng)建一個aggregate controller
serviceControllers := aggregate.NewController()
registered := make(map[serviceregistry.ServiceRegistry]bool)
for _, r := range args.Service.Registries {
serviceRegistry := serviceregistry.ServiceRegistry(r)
if _, exists := registered[serviceRegistry]; exists {
log.Warnf("%s registry specified multiple times.", r)
continue
}
registered[serviceRegistry] = true
log.Infof("Adding %s registry adapter", serviceRegistry)
switch serviceRegistry {
case serviceregistry.MockRegistry:
...
case serviceregistry.KubernetesRegistry:
if err := s.createK8sServiceControllers(serviceControllers, args); err != nil {
return err
}
case serviceregistry.ConsulRegistry:
...
case serviceregistry.MCPRegistry:
...
}
}
// 利用configController創(chuàng)建serviceEntryStore
serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore)
// 關(guān)于ServiceEntry的registry
serviceEntryRegistry := aggregate.Registry{
Name: "ServiceEntries",
Controller: serviceEntryStore,
ServiceDiscovery: serviceEntryStore,
}
serviceControllers.AddRegistry(serviceEntryRegistry)
// serviceControllers是一些aggregate.Registry的集合
s.ServiceController = serviceControllers
// 運(yùn)行ServiceController
s.addStartFunc(func(stop <-chan struct{}) error {
go s.ServiceController.Run(stop)
return nil
})
return nil
}
1. 初始化一個
aggregate.controller.
// pilot/pkg/serviceregistry/aggregate/controller.go
type Controller struct {
registries []Registry
storeLock sync.RWMutex
}
func NewController() *Controller {
return &Controller{
registries: []Registry{},
}
}
可以看到
aggregate.controller主要是一些Registry的集合.
2. 從最初的
init方法可以知道args.Service.Registries在運(yùn)行的時候沒有指定的話默認(rèn)是只有serviceregistry.KubernetesRegistry.
func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) {
clusterID := string(serviceregistry.KubernetesRegistry)
log.Infof("Primary Cluster name: %s", clusterID)
args.Config.ControllerOptions.ClusterID = clusterID
// 創(chuàng)建一個controller2.Controller
kubectl := controller2.NewController(s.kubeClient, args.Config.ControllerOptions)
s.kubeRegistry = kubectl
// 組裝成Registry加入到serviceControllers中
serviceControllers.AddRegistry(
aggregate.Registry{
Name: serviceregistry.KubernetesRegistry,
ClusterID: clusterID,
ServiceDiscovery: kubectl,
Controller: kubectl,
})
return
}
創(chuàng)建了一個
controller2.Controller(后面會詳細(xì)分析該Controller), 然后組裝成一個Registry加入到serviceControllers中.
3. 利用
configController創(chuàng)建serviceEntryStore, 并組裝成一個Registry加入到serviceControllers中.
3. ServiceRegistry
從
initServiceControllers中可以到所有的類涉及到pilog/pkg/serviceregistry.
.
├── aggregate // 所有controller的集合
├── consul // consul
├── external // serviceEntry使用
├── kube // k8s環(huán)境
├── memory // 測試使用
└── platform.go // 常量
由于
k8s比較有典型性, 所以這里先分析k8s環(huán)境, 然后別的部分就會比較好理解.
先看一下
Controller的定義:
// pilot/pkg/model/controller.go
type Controller interface {
// AppendServiceHandler notifies about changes to the service catalog.
AppendServiceHandler(f func(*Service, Event)) error
// AppendInstanceHandler notifies about changes to the service instances
// for a service.
AppendInstanceHandler(f func(*ServiceInstance, Event)) error
// Run until a signal is received
Run(stop <-chan struct{})
}
總共三個方法:
1.Run運(yùn)行該controller.
2.AppendServiceHandler和AppendInstanceHandler動態(tài)為該controller添加handler.
3.1 kube
所以就從
Controller實現(xiàn)這三個方法的角度分析.
// pilot/pkg/serviceregistry/kube/controller/controller.go
func NewController(client kubernetes.Interface, options Options) *Controller {
log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
options.WatchedNamespace, options.ResyncPeriod)
// Queue requires a time duration for a retry delay after a handler error
out := &Controller{
domainSuffix: options.DomainSuffix,
client: client,
queue: kube.NewQueue(1 * time.Second),
ClusterID: options.ClusterID,
XDSUpdater: options.XDSUpdater,
servicesMap: make(map[host.Name]*model.Service),
externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
}
sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
svcInformer := sharedInformers.Core().V1().Services().Informer()
out.services = out.createCacheHandler(svcInformer, "Services")
epInformer := sharedInformers.Core().V1().Endpoints().Informer()
out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")
nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
out.nodes = out.createCacheHandler(nodeInformer, "Nodes")
podInformer := sharedInformers.Core().V1().Pods().Informer()
out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
return out
}
可以看到
Controller主要對Service,Endpoint,Node和Pod這幾個資源的初始化.
3.1.1 createCacheHandler
另外通過
createCacheHandler和createEDSCacheHandler創(chuàng)建了cacheHandler.
// pilot/pkg/serviceregistry/kube/controller/controller.go
func (c *Controller) createCacheHandler(informer cache.SharedIndexInformer, otype string) cacheHandler {
handler := &kube.ChainHandler{Funcs: []kube.Handler{c.notify}}
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
incrementEvent(otype, "add")
c.queue.Push(kube.Task{Handler: handler.Apply, Obj: obj, Event: model.EventAdd})
},
...
})
return cacheHandler{informer: informer, handler: handler}
}
可以看到這里使用了一個
queue來做緩存帶, 將對某個資源的Add update delete事件組裝成一個kube.Task放入到c.queue中, 另外c.queue取出來之后進(jìn)行操作. 那怎么操作呢? 看一下ChainHandler是什么結(jié)構(gòu).
// pilot/pkg/serviceregistry/kube/controller/controller.go
type cacheHandler struct {
informer cache.SharedIndexInformer
handler *ChainHandler
}
// pilot/pkg/serviceregistry/kube/queue.go
type ChainHandler struct {
Funcs []Handler
}
// Apply is the handler function
func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error {
for _, f := range ch.Funcs {
if err := f(obj, event); err != nil {
return err
}
}
return nil
}
// Append a handler as the last handler in the chain
func (ch *ChainHandler) Append(h Handler) {
ch.Funcs = append(ch.Funcs, h)
}
func (q *queueImpl) Run(stop <-chan struct{}) {
...
for {
...
var item Task
item, q.queue = q.queue[0], q.queue[1:]
q.cond.L.Unlock()
if err := item.Handler(item.Obj, item.Event); err != nil {
...
}
}
}
所以從
c.queue里面出來之后, 會執(zhí)行ChainHandler.Apply方法, 然后可以看到Apply方法中是執(zhí)行一系列通過Append方法注冊在該ChainHandler中的Handler, 從createCacheHandler中可以知道初始化的時候就已經(jīng)放了一個c.notify這個Handler.
所以接下來分析外部如何可以動態(tài)的添加
handler, 那controller的兩個方法AppendServiceHandler和AppendInstanceHandler就是這個作用.
3.1.2 AppendServiceHandler 和 AppendInstanceHandler
func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error {
c.services.handler.Append(func(obj interface{}, event model.Event) error {
svc, ok := obj.(*v1.Service)
...
// 將k8s.Service轉(zhuǎn)化為istio.Service
svcConv := kube.ConvertService(*svc, c.domainSuffix, c.ClusterID)
// 生成istio.ServiceInstance
instances := kube.ExternalNameServiceInstances(*svc, svcConv)
switch event {
case model.EventDelete:
c.Lock()
delete(c.servicesMap, svcConv.Hostname)
delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
c.Unlock()
default:
c.Lock()
// fmt.Sprintf("%s.%s.svc.%s", name, namespace, domainSuffix) 為 key
// istio.Service 為 value
c.servicesMap[svcConv.Hostname] = svcConv
if instances == nil {
delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
} else {
c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
}
c.Unlock()
}
// 通過c.XDSUpdater更新
c.XDSUpdater.SvcUpdate(c.ClusterID, hostname, ports, portsByNum)
// 調(diào)用傳入的handler
f(svcConv, event)
return nil
})
return nil
}
這里注意幾個重點:
1. 將k8s.Service轉(zhuǎn)化為 名為svcConv的isito.Service.
2.controller.servicesMap相當(dāng)于本地的緩存保存著istio.service.
3. 如果該Service類型是ExternalName, 需要獲得其instances并保存到本地緩存到externalNameSvcInstanceMap.
4. 通過XDSUpdater來告知下游更新數(shù)據(jù).
5. 調(diào)用傳入的handler方法.
6. 注意這個是增加在c.services中.
func (c *Controller) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
if c.endpoints.handler == nil {
return nil
}
c.endpoints.handler.Append(func(obj interface{}, event model.Event) error {
ep, ok := obj.(*v1.Endpoints)
...
c.updateEDS(ep, event)
return nil
})
return nil
}
func (c *Controller) updateEDS(ep *v1.Endpoints, event model.Event) {
hostname := kube.ServiceHostname(ep.Name, ep.Namespace, c.domainSuffix)
mixerEnabled := c.Env != nil && c.Env.Mesh != nil && (c.Env.Mesh.MixerCheckServer != "" || c.Env.Mesh.MixerReportServer != "")
endpoints := make([]*model.IstioEndpoint, 0)
if event != model.EventDelete {
for _, ss := range ep.Subsets {
for _, ea := range ss.Addresses {
pod := c.pods.getPodByIP(ea.IP)
...
var labels map[string]string
locality, sa, uid := "", "", ""
if pod != nil {
locality = c.GetPodLocality(pod)
sa = kube.SecureNamingSAN(pod)
if mixerEnabled {
uid = fmt.Sprintf("kubernetes://%s.%s", pod.Name, pod.Namespace)
}
labels = map[string]string(configKube.ConvertLabels(pod.ObjectMeta))
}
// 組裝成istio.endpoint
for _, port := range ss.Ports {
endpoints = append(endpoints, &model.IstioEndpoint{
Address: ea.IP,
EndpointPort: uint32(port.Port),
ServicePortName: port.Name,
Labels: labels,
UID: uid,
ServiceAccount: sa,
Network: c.endpointNetwork(ea.IP),
Locality: locality,
Attributes: model.ServiceAttributes{Name: ep.Name, Namespace: ep.Namespace},
})
}
}
}
}
...
_ = c.XDSUpdater.EDSUpdate(c.ClusterID, string(hostname), ep.Namespace, endpoints)
}
1. 將當(dāng)前的
endpoint按照Subsets,Addresses和Ports組裝成多個istioEndpoint, 也就是endpoints.
2. 通過c.XDSUpdater.EDSUpdate來告知下游更新數(shù)據(jù).
3.1.3 Run
很常規(guī), 啟動
pod,Service,node和endpoint的informer的Run方法.
k8s.png
3.2 external
// pilot/pkg/bootstrap/server.go
serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore)
// pilot/pkg/serviceregistry/external/servicediscovery.go
func NewServiceDiscovery(callbacks model.ConfigStoreCache, store model.IstioConfigStore) *ServiceEntryStore {
c := &ServiceEntryStore{
serviceHandlers: make([]serviceHandler, 0),
instanceHandlers: make([]instanceHandler, 0),
store: store,
ip2instance: map[string][]*model.ServiceInstance{},
instances: map[host.Name]map[string][]*model.ServiceInstance{},
updateNeeded: true,
}
if callbacks != nil {
callbacks.RegisterEventHandler(model.ServiceEntry.Type, func(config model.Config, event model.Event) {
// Recomputing the index here is too expensive.
c.changeMutex.Lock()
c.lastChange = time.Now()
c.updateNeeded = true
c.changeMutex.Unlock()
// 從config轉(zhuǎn)化成model.Service(istio.Service)
services := convertServices(config)
for _, handler := range c.serviceHandlers {
for _, service := range services {
go handler(service, event)
}
}
// 從config轉(zhuǎn)化成model.ServiceInstance
instances := convertInstances(config)
for _, handler := range c.instanceHandlers {
for _, instance := range instances {
go handler(instance, event)
}
}
})
}
return c
}
1. 可以看到
callbacks就是s.configController, 從 [istio源碼分析][pilot] pilot之configController (mcp client) 知道可以向其注冊handler, 可以當(dāng)類型是ServiceEntry的時候就是通過handler進(jìn)行處理的.
查看
handlers和Run方法
func (d *ServiceEntryStore) AppendServiceHandler(f func(*model.Service, model.Event)) error {
d.serviceHandlers = append(d.serviceHandlers, f)
return nil
}
func (d *ServiceEntryStore) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
d.instanceHandlers = append(d.instanceHandlers, f)
return nil
}
func (d *ServiceEntryStore) Run(stop <-chan struct{}) {}
可以看到
Run方法并沒有做什么, 這是因為mcp client在接收到mcp server(galley)的response后會將數(shù)據(jù)交由s.configController處理, 當(dāng)類型是serviceEntry的時候會調(diào)用其注冊的handler, 也就是NewServiceDiscovery方法中的callbacks.RegisterEventHandler.
3.3 aggregate
aggregate本質(zhì)就是一些controller的集合.
4. 參考
1.
istio 1.3.6源碼
