從源碼看ShardedListAndWatch特性

背景

當(dāng)前控制器新增實(shí)例后要不就是只有單一可以執(zhí)行要不就是多個(gè)分片執(zhí)行(但是客戶端還是會(huì)收到全量事件)
k8s 1.36新增了ShardedListAndWatch這個(gè)特性來(lái)優(yōu)化這個(gè)問(wèn)題,1.36當(dāng)前是alpha階段默認(rèn)關(guān)閉

簡(jiǎn)單介紹

ShardedListAndWatch這個(gè)特性讓event分片在apiserver側(cè)執(zhí)行,客戶端不需要收到全量的event

源碼

staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go中

list過(guò)濾
func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, options *metainternalversion.ListOptions) (runtime.Object, error) {
    ...
    如果開(kāi)啟了ShardedListAndWatch特性,且指定了ShardSelector,解析并設(shè)置設(shè)置ShardSelector
    if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) && options.ShardSelector != "" {
        sel, err := sharding.Parse(options.ShardSelector)
        if err != nil {
            return nil, fmt.Errorf("invalid shard selector: %w", err)
        }
        p.ShardSelector = sel
    }
    ...
    err := e.Storage.GetList(ctx, e.KeyRootFunc(ctx), storageOpts, list)

}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中

獲取list

func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
    ...
    等待緩存足夠新然后獲取list
    resp, indexUsed, err := c.watchCache.WaitUntilFreshAndGetList(ctx, preparedKey, opts)
    if err != nil {
        return err
    }
    ...
    for i, obj := range resp.Items {
        elem, ok := obj.(*store.Element)
        if !ok {
            return fmt.Errorf("non *store.Element returned from storage: %v", obj)
        }
        shardMatch := true
        if utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
            var err error
            shardMatch, err = opts.Predicate.MatchesSharding(elem.Object)
            if err != nil {
                return fmt.Errorf("shard matching failed: %w", err)
            }
        }
        ...
        如果滿足分片匹配且滿足labels和fields過(guò)濾,則添加到selectedObjects
        if shardMatch && opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
            selectedObjects = append(selectedObjects, elem.Object)
            lastSelectedObjectKey = elem.Key
        }
        ...
    }
    ...
}

staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go中

匹配分片
func (s *SelectionPredicate) MatchesSharding(obj runtime.Object) (bool, error) {
    判斷是否開(kāi)啟ShardedListAndWatch特性
    if !utilfeature.DefaultFeatureGate.Enabled(features.ShardedListAndWatch) {
        return true, nil
    }
    判斷是否指定了ShardSelector且非空
    if s.ShardSelector != nil && !s.ShardSelector.Empty() {
        return s.ShardSelector.Matches(obj)
    }
    返回匹配成功
    return true, nil
}

staging/src/k8s.io/apiserver/pkg/sharding/parser.go

解析ShardSelector表達(dá)式
func Parse(expr string) (apisharding.Selector, error) {
    ...
    創(chuàng)建解析器
    p, err := celparser.NewParser(celparser.Macros())
    if err != nil {
        return nil, fmt.Errorf("failed to create CEL parser: %w", err)
    }
    解析ShardSelector表達(dá)式
    parsed, errs := p.Parse(common.NewTextSource(expr))
    if errs != nil && len(errs.GetErrors()) > 0 {
        return nil, fmt.Errorf("CEL parse error: %s", errs.GetErrors()[0].Message)
    }
    遍歷ShardSelector表達(dá)式
    reqs, err := walkExpr(parsed.Expr())
    if err != nil {
        return nil, err
    }
    ...
    構(gòu)建ShardSelector對(duì)象
    return apisharding.NewSelector(reqs...), nil
}

func walkExpr(e ast.Expr) ([]apisharding.ShardRangeRequirement, error) {
        ...
        ||函數(shù)
        if fn == operators.LogicalOr {
            ...
            return append(left, right...), nil

        }
        ...
        shardRange函數(shù)
        if fn == "shardRange" {
            ...
            解析shardRange函數(shù)調(diào)用
            req, err := parseShardRangeCall(call)
            if err != nil {
                return nil, err
            }
            return []apisharding.ShardRangeRequirement{req}, nil
            ...
        }
        ...

}

解析shardRange函數(shù)調(diào)用
func parseShardRangeCall(call ast.CallExpr) (apisharding.ShardRangeRequirement, error) {
    ...
    獲取參數(shù)
    args := call.Args()
    if len(args) != 3 {
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() requires exactly 3 arguments, got %d", len(args))
    }
    獲取第一個(gè)參數(shù)作為fieldPath
    fieldPath, err := extractFieldPath(args[0])
    if err != nil {
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("shardRange() first argument: %w", err)
    }

    fieldpath只支持object.metadata.uid和object.metadata.namespace
    switch fieldPath {
    case "object.metadata.uid", "object.metadata.namespace":
        // ok
    default:
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("unsupported field path %q; supported: object.metadata.uid, object.metadata.namespace", fieldPath)
    }

    獲取第二個(gè)參數(shù)作為hexStart
    hexStart, err := extractHexLiteral(args[1], "hexStart")
    if err != nil {
        return apisharding.ShardRangeRequirement{}, err
    }

    獲取第三個(gè)參數(shù)作為hexEnd
    hexEnd, err := extractHexLiteral(args[2], "hexEnd")
    if err != nil {
        return apisharding.ShardRangeRequirement{}, err
    }
    hexStart必須小雨hexEnd
    if !apisharding.HexLess(hexStart, hexEnd) {
        return apisharding.ShardRangeRequirement{}, fmt.Errorf("shard range start %s must be less than end %s", hexStart, hexEnd)
    }

    return apisharding.ShardRangeRequirement{
        Key:   fieldPath,
        Start: hexStart,
        End:   hexEnd,
    }, nil
    ...
}

staging/src/k8s.io/apimachinery/pkg/sharding/selector.go中

構(gòu)建ShardSelector對(duì)象
func NewSelector(reqs ...ShardRangeRequirement) Selector {
    if len(reqs) == 0 {
        return Everything()
    }
    return &shardSelector{
        requirements: reqs,
    }
}


匹配
func (s *shardSelector) Matches(obj runtime.Object) (bool, error) {
    ...
    獲取field
    value, err := ResolveFieldValue(obj, key)
    if err != nil {
        return false, err
    }
    計(jì)算hash
    hash := "0x" + HashField(value)

    如果hash在start和end之間,則成功匹配
    for _, req := range s.requirements {
        if !HexLess(hash, req.Start) && HexLess(hash, req.End) {
            return true, nil
        }
    }
    否則不匹配
    return false, nil
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容