背景
當(dāng)前控制器新增實(shí)例后要不就是只有單一可以執(zhí)行要不就是多個(gè)分片執(zhí)行(但是客戶端還是會(huì)收到全量事件)
k8s 1.36新增了ShardedListAndWatch這個(gè)特性來(lái)優(yōu)化這個(gè)問(wèn)題,1.36當(dāng)前是alpha階段默認(rèn)關(guān)閉
簡(jiǎn)單介紹
ShardedListAndWatch這個(gè)特性讓event分片在apiserver側(cè)執(zhí)行,客戶端不需要收到全量的event
源碼
staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go中
list過(guò)濾
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
...
如果開(kāi)啟了ShardedListAndWatch特性,且指定了ShardSelector,解析并設(shè)置設(shè)置ShardSelector
if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) && options.ShardSelector != "" {
sel, err := sharding.Parse(options.ShardSelector)
if err != nil {
return nil, fmt.Errorf("invalid shard selector: %w", err)
}
p.ShardSelector = sel
}
...
err := e.Storage.GetList(ctx, e.KeyRootFunc(ctx), storageOpts, list)
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中
獲取list
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
...
等待緩存足夠新然后獲取list
resp, indexUsed, err := c.watchCache.WaitUntilFreshAndGetList(ctx, preparedKey, opts)
if err != nil {
return err
}
...
for i, obj := range resp.Items {
elem, ok := obj.(*store.Element)
if !ok {
return fmt.Errorf("non *store.Element returned from storage: %v", obj)
}
shardMatch := true
if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
var err error
shardMatch, err = opts.Predicate.MatchesSharding(elem.Object)
if err != nil {
return fmt.Errorf("shard matching failed: %w", err)
}
}
...
如果滿足分片匹配且滿足labels和fields過(guò)濾,則添加到selectedObjects
if shardMatch && opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
...
}
...
}
staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go中
匹配分片
func (s *SelectionPredicate) MatchesSharding(obj runtime.Object) (bool, error) {
判斷是否開(kāi)啟ShardedListAndWatch特性
if !utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
return true, nil
}
判斷是否指定了ShardSelector且非空
if s.ShardSelector != nil && !s.ShardSelector.Empty() {
return s.ShardSelector.Matches(obj)
}
返回匹配成功
return true, nil
}
staging/src/k8s.io/apiserver/pkg/sharding/parser.go
解析ShardSelector表達(dá)式
func Parse(expr string) (apisharding.Selector, error) {
...
創(chuàng)建解析器
p, err := celparser.NewParser(celparser.Macros())
if err != nil {
return nil, fmt.Errorf("failed to create CEL parser: %w", err)
}
解析ShardSelector表達(dá)式
parsed, errs := p.Parse(common.NewTextSource(expr))
if errs != nil && len(errs.GetErrors()) > 0 {
return nil, fmt.Errorf("CEL parse error: %s", errs.GetErrors()[0].Message)
}
遍歷ShardSelector表達(dá)式
reqs, err := walkExpr(parsed.Expr())
if err != nil {
return nil, err
}
...
構(gòu)建ShardSelector對(duì)象
return apisharding.NewSelector(reqs...), nil
}
func walkExpr(e ast.Expr) ([]apisharding.ShardRangeRequirement, error) {
...
||函數(shù)
if fn == operators.LogicalOr {
...
return append(left, right...), nil
}
...
shardRange函數(shù)
if fn == "shardRange" {
...
解析shardRange函數(shù)調(diào)用
req, err := parseShardRangeCall(call)
if err != nil {
return nil, err
}
return []apisharding.ShardRangeRequirement{req}, nil
...
}
...
}
解析shardRange函數(shù)調(diào)用
func parseShardRangeCall(call ast.CallExpr) (apisharding.ShardRangeRequirement, error) {
...
獲取參數(shù)
args := call.Args()
if len(args) != 3 {
return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() requires exactly 3 arguments, got %d", len(args))
}
獲取第一個(gè)參數(shù)作為fieldPath
fieldPath, err := extractFieldPath(args[0])
if err != nil {
return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() first argument: %w", err)
}
fieldpath只支持object.metadata.uid和object.metadata.namespace
switch fieldPath {
case "object.metadata.uid", "object.metadata.namespace":
// ok
default:
return apisharding.ShardRangeRequirement{}, fmt.Errorf("unsupported field path %q; supported: object.metadata.uid, object.metadata.namespace", fieldPath)
}
獲取第二個(gè)參數(shù)作為hexStart
hexStart, err := extractHexLiteral(args[1], "hexStart")
if err != nil {
return apisharding.ShardRangeRequirement{}, err
}
獲取第三個(gè)參數(shù)作為hexEnd
hexEnd, err := extractHexLiteral(args[2], "hexEnd")
if err != nil {
return apisharding.ShardRangeRequirement{}, err
}
hexStart必須小雨hexEnd
if !apisharding.HexLess(hexStart, hexEnd) {
return apisharding.ShardRangeRequirement{}, fmt.Errorf("shard range start %s must be less than end %s", hexStart, hexEnd)
}
return apisharding.ShardRangeRequirement{
Key: fieldPath,
Start: hexStart,
End: hexEnd,
}, nil
...
}
staging/src/k8s.io/apimachinery/pkg/sharding/selector.go中
構(gòu)建ShardSelector對(duì)象
func NewSelector(reqs ...ShardRangeRequirement) Selector {
if len(reqs) == 0 {
return Everything()
}
return &shardSelector{
requirements: reqs,
}
}
匹配
func (s *shardSelector) Matches(obj runtime.Object) (bool, error) {
...
獲取field
value, err := ResolveFieldValue(obj, key)
if err != nil {
return false, err
}
計(jì)算hash
hash := "0x" + HashField(value)
如果hash在start和end之間,則成功匹配
for _, req := range s.requirements {
if !HexLess(hash, req.Start) && HexLess(hash, req.End) {
return true, nil
}
}
否則不匹配
return false, nil
}