Containerd Content 服務(wù)

概述

本文主要針對(duì) Containerd content 服務(wù)功能模塊的相關(guān)代碼分析,如下圖 Containerd 官方架構(gòu)圖所示:

containerd-content.png

如同 Containerd 其它服務(wù)功能模塊化機(jī)制,本文將從 Content 服務(wù)相關(guān)接口定義、GRPC 和 Service 插件化注冊(cè)、Plugin 加載相關(guān)過程、最后到服務(wù)功能的底層實(shí)現(xiàn)邏輯進(jìn)行逐步分析 。

Content Store接口與方法定義

Store 接口繼承了各 content 內(nèi)容管理相關(guān)的接口集合,后面每個(gè)接口將都有詳細(xì)說明

!FILENAME content/content.go:136

type Store interface {
    Manager         // 信息查找、刪除管理接口 
    Provider        // 讀取接口 
  IngestManager   // 寫管理接口(寫狀態(tài)、終止)
    Ingester        // 存寫接口
}

Manager 提供了基礎(chǔ)的 content 內(nèi)容管理方法如內(nèi)容元信息獲取、更新、列表查找、刪除

!FILENAME content/content.go:75

type Manager interface {
  // 返回內(nèi)容存儲(chǔ)數(shù)據(jù)庫存放 content 元數(shù)據(jù)信息
    Info(ctx context.Context, dgst digest.Digest) (Info, error)

  // 更新 content 相關(guān)的可變信息項(xiàng),如 labels.* 標(biāo)簽項(xiàng)更新
    Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)

  // 遍歷內(nèi)容存儲(chǔ)數(shù)據(jù)庫的所有項(xiàng)進(jìn)行查找匹配指定的過濾條件項(xiàng)
    Walk(ctx context.Context, fn WalkFunc, filters ...string) error

    // 從內(nèi)容存儲(chǔ)數(shù)據(jù)庫移除指定的 content
    Delete(ctx context.Context, dgst digest.Digest) error
}

Provider 提供了 content 的讀取接口,返回一個(gè)內(nèi)容讀取器對(duì)象 ReaderAt

!FILENAME content/content.go:35

type Provider interface {
    // ocispec.Descriptor 描述符唯一需要指定 desc.Digest 內(nèi)容的摘要散列值 
    ReaderAt(ctx context.Context, dec ocispec.Descriptor) (ReaderAt, error)
}

// 使用標(biāo)準(zhǔn)的 io 接口 io.Closer 、io.ReaderAt,擴(kuò)展大小計(jì)算報(bào)告
type ReaderAt interface {
    io.ReaderAt
    io.Closer
    Size() int64
}

IngestManager 寫管理接口(存寫狀態(tài)獲取、中止操作)

!FILENAME content/content.go:98

type IngestManager interface {
    // 查看指定 ref 引用 Ingest 操作的狀態(tài)信息
    Status(ctx context.Context, ref string) (Status, error)

  // 列出所有活動(dòng)的寫操作與狀態(tài)信息,可通過 filters 提供的正則表達(dá)式來過濾列出項(xiàng) 
    ListStatuses(ctx context.Context, filters ...string) ([]Status, error)

    // 取消操作
    Abort(ctx context.Context, ref string) error
}

Ingester 提供了 content 的存寫接口,返回一個(gè)內(nèi)容寫入器對(duì)象 Writer

!FILENAME content/content.go:44

type Ingester interface {
    //Writer opts 需帶指定 ref 來唯一標(biāo)識(shí)活動(dòng)
    Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
}

Content GRPC 注冊(cè)與 Server 實(shí)現(xiàn)

content GRPC 插件注冊(cè),插件 InitFn 最后 contentserver.New(cs.(content.Store)) 返回 api.ContentServer,而GRPC 所依賴的服務(wù)插件 "content-service" 實(shí)例化對(duì)象作為其唯一傳參,類型則是前面所詳述的 content.Store 接口。

!FILENAME services/content/service.go:27

func init() {
    plugin.Register(&plugin.Registration{
        Type: plugin.GRPCPlugin,
        ID:   "content",
        Requires: []plugin.Type{
            plugin.ServicePlugin,
        },
        InitFn: func(ic *plugin.InitContext) (interface{}, error) {
            plugins, err := ic.GetByType(plugin.ServicePlugin)  //獲取所有服務(wù)插件
            if err != nil {
                return nil, err
            }
            p, ok := plugins[services.ContentService]  // Key 為 "content-service" 服務(wù)插件
            if !ok {
                return nil, errors.New("content store service not found")
            }
            cs, err := p.Instance()     // "content-service" 插件實(shí)例化對(duì)象
            if err != nil {
                return nil, err
            }
      // 傳參 cs.(content.Store) 
            return contentserver.New(cs.(content.Store)), nil
        },
    })
}

ContentServer is the server API for Content service.

!FILENAME api/services/content/v1/content.pb.go:1230

type ContentServer interface {
    Info(context.Context, *InfoRequest) (*InfoResponse, error)
    Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
    List(*ListContentRequest, Content_ListServer) error
    Delete(context.Context, *DeleteContentRequest) (*types.Empty, error)
    Read(*ReadContentRequest, Content_ReadServer) error
    Status(context.Context, *StatusRequest) (*StatusResponse, error)
    ListStatuses(context.Context, *ListStatusesRequest) (*ListStatusesResponse, error)
    Write(Content_WriteServer) error
    Abort(context.Context, *AbortRequest) (*types.Empty, error)
}

New returns the content GRPC server

!FILENAME services/content/contentserver/contentserver.go:50

func New(cs content.Store) api.ContentServer {
    return &service{store: cs}   // service
}

!FILENAME services/content/contentserver/contentserver.go:38

type service struct {
    store content.Store          // content.Store 接口類型
}

上層Content Server 包裝的 service 類實(shí)現(xiàn)了 api.ContentServer 接口,其主要功能是底層所注冊(cè)的 "content-service" 插件的服務(wù)方法,如下讀取 Read() 實(shí)現(xiàn)方法邏輯則主要調(diào)用了底層的 store.ReaderAt() 來實(shí)現(xiàn) content 讀取。其它剩余的方法(Info、Update、List、Delete、Status、ListStatuses、WriteAbort、Abort)實(shí)現(xiàn)也類似將不再一一展開。

!FILENAME services/content/contentserver/contentserver.go:144

func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error {
    if err := req.Digest.Validate(); err != nil {
        return status.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
    }

  // 調(diào)用底層服務(wù)方法 s.store.Info()
    oi, err := s.store.Info(session.Context(), req.Digest)
    if err != nil {
        return errdefs.ToGRPC(err)
    }

  // 調(diào)用底層服務(wù)方法 s.store.ReaderAt() 
    ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest})
    if err != nil {
        return errdefs.ToGRPC(err)
    }
  //...

    return errdefs.ToGRPC(err)
}

Content service 服務(wù)注冊(cè)與實(shí)現(xiàn)

注冊(cè)

!FILENAME services/content/store.go:37

func init() {
    plugin.Register(&plugin.Registration{
        Type: plugin.ServicePlugin,            // 服務(wù)插件類型
        ID:   services.ContentService,         // ID 為"content-service"
        Requires: []plugin.Type{
            plugin.MetadataPlugin,               // 依賴元數(shù)據(jù)插件
        },
        InitFn: func(ic *plugin.InitContext) (interface{}, error) {
            m, err := ic.Get(plugin.MetadataPlugin)  //獲取元數(shù)據(jù)庫注冊(cè)的插件初始化對(duì)象
            if err != nil {
                return nil, err
            }
      // +創(chuàng)建 content.Store 實(shí)例對(duì)象,其輸入的參數(shù)為重點(diǎn)關(guān)注 
      // +m.(metadata.DB).ContentStore() 為元數(shù)據(jù)庫指定的內(nèi)容存儲(chǔ)對(duì)象(后面詳述)
            s, err := newContentStore(m.(*metadata.DB).ContentStore(), ic.Events)
            return s, err
        },
    })
}

!FILENAME services/content/store.go:56

func newContentStore(cs content.Store, publisher events.Publisher) (content.Store, error) {
    return &store{
        Store:     cs,              // 內(nèi)容存儲(chǔ)對(duì)象
        publisher: publisher,
    }, nil
}

store 類結(jié)構(gòu)定義,實(shí)際上包裝了 content.Store 增加事件的推送

!FILENAME services/content/store.go:32

type store struct {
    content.Store
    publisher events.Publisher
}

元數(shù)據(jù)庫及內(nèi)容存儲(chǔ)庫

MetadataPlugin 元數(shù)據(jù)庫在 containerd server 創(chuàng)建過程中對(duì)所有插件進(jìn)行加載時(shí)被指定,同時(shí)指定了內(nèi)容存儲(chǔ)和snapshotter 實(shí)現(xiàn)類對(duì)象

!FILENAME services/server/server.go:304

func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {
    // load all plugins into containerd
    plugin.Register(
    //...
  plugin.Register(&plugin.Registration{
        Type: plugin.ContentPlugin,             // 內(nèi)容插件類型
        ID:   "content",
        InitFn: func(ic *plugin.InitContext) (interface{}, error) {
            ic.Meta.Exports["root"] = ic.Root
            return local.NewStore(ic.Root)       // 插件初始化func,創(chuàng)建與返回內(nèi)容存儲(chǔ)實(shí)例化對(duì)象
        },
    })
    //...
    plugin.Register(&plugin.Registration{
        Type: plugin.MetadataPlugin,
        ID:   "bolt",
        Requires: []plugin.Type{
            plugin.ContentPlugin,
            plugin.SnapshotPlugin,
        },
        Config: &srvconfig.BoltConfig{
            ContentSharingPolicy: srvconfig.SharingPolicyShared,
        },
        InitFn: func(ic *plugin.InitContext) (interface{}, error) {
      //...
      
            path := filepath.Join(ic.Root, "meta.db")
            ic.Meta.Exports["path"] = path

      // 創(chuàng)建bolt DB,文件名為 "meta.db"
            db, err := bolt.Open(path, 0644, nil)
            if err != nil {
                return nil, err
            }

            var dbopts []metadata.DBOpt
            if !shared {
                dbopts = append(dbopts, metadata.WithPolicyIsolated)
            }
      // 創(chuàng)建元數(shù)據(jù)庫對(duì)象,關(guān)注三個(gè)關(guān)鍵的輸入?yún)?shù):
      // db為boltdb對(duì)象
      // cs為內(nèi)容存儲(chǔ)對(duì)象
      // snapshotters為快照管理器對(duì)象
            mdb := metadata.NewDB(db, cs.(content.Store), snapshotters, dbopts...)
            if err := mdb.Init(ic.Context); err != nil {
                return nil, err
            }
            return mdb, nil
        },
    })
    
//...
}

NewStore 本地內(nèi)容存儲(chǔ)構(gòu)建,實(shí)際調(diào)用 NewLabeledStore ,返回 content.Store 接口實(shí)現(xiàn)類對(duì)象 store{root, ls}

!FILENAME content/local/store.go:74

// NewStore returns a local content store
func NewStore(root string) (content.Store, error) {
    return NewLabeledStore(root, nil)
}
func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
    if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil {
        return nil, err
    }

    return &store{
        root: root,
        ls:   ls,
    }, nil
}

下面我們將重點(diǎn)分析 content store 的實(shí)現(xiàn)類方法邏輯,本文主要分析 Writer 和 ReaderAt 兩個(gè)方法,其它的方法可以查看源碼。

store.Writer() 返回一個(gè)配置好的內(nèi)容 writer 對(duì)象可供內(nèi)容數(shù)據(jù)的寫入

// `ref` 參數(shù)用于為寫事務(wù)生命周期管理的唯一標(biāo)識(shí),必須指定 ref
func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
    var wOpts content.WriterOpts
  // 加載 writer 配置選項(xiàng)
    for _, opt := range opts {
        if err := opt(&wOpts); err != nil {
            return nil, err
        }
    }

  // ref 配置選項(xiàng)檢查不能為空
    if wOpts.Ref == "" {
        return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
    }
  
  //...

  // +實(shí)標(biāo)調(diào)用 writer()方法(下面詳述)
    w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
    if err != nil {
        unlock(wOpts.Ref)
        return nil, err
    }

    return w, nil // lock is now held by w.
}

!FILENAME content/local/store.go:511

func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {

    if expected != "" {
    // 通過摘要散列值生成 blob 文件對(duì)象路徑
        p := s.blobPath(expected)
        if _, err := os.Stat(p); err == nil {
            return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
        }
    }
  
  // 基于 ref 定義生成數(shù)據(jù)處理的路徑,返回三個(gè)路徑:
  // path 為整個(gè) ingest 的目錄路徑 $root/ingest/$digest(ref)/
  // refp  以 ref 為名文件路徑 $root/ingest/$digest(ref)/ref
  // data 數(shù)據(jù)文件路徑 $root/ingest/$digest(ref)/data
  
    path, refp, data := s.ingestPaths(ref)

    var (
        digester  = digest.Canonical.Digester()
        offset    int64
        startedAt time.Time
        updatedAt time.Time
    )

    foundValidIngest := false
  // 確保 ingest 目錄被創(chuàng)建
    if err := os.Mkdir(path, 0755); err != nil {
        if !os.IsExist(err) {
            return nil, err
        }
    // 獲取原 ref 及數(shù)據(jù)狀態(tài)信息
        status, err := s.resumeStatus(ref, total, digester)
        if err == nil {
            foundValidIngest = true
            updatedAt = status.UpdatedAt
            startedAt = status.StartedAt
            total = status.Total
            offset = status.Offset
        } else {   
            logrus.Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error())
        }
    }

  // 如果不存在則創(chuàng)建相關(guān)文件
    if !foundValidIngest {
        startedAt = time.Now()
        updatedAt = startedAt

        // ref 文件寫入內(nèi)容為 ref 指定的字符串信息
        if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
            return nil, err
        }

    // 開始時(shí)間
        if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
            return nil, err
        }
    // 更新時(shí)間
        if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
            return nil, err
        }

    // 大小
        if total > 0 {
            if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
                return nil, err
            }
        }
    }
  
  // 打開數(shù)據(jù)文件句柄
    fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
    if err != nil {
        return nil, errors.Wrap(err, "failed to open data file")
    }
 
  // 定位偏移位置
    if _, err := fp.Seek(offset, io.SeekStart); err != nil {
        return nil, errors.Wrap(err, "could not seek to current write offset")
    }

  // 最后返回一個(gè) writer 對(duì)象
    return &writer{
        s:         s,
        fp:        fp,
        ref:       ref,
        path:      path,
        offset:    offset,
        total:     total,
        digester:  digester,
        startedAt: startedAt,
        updatedAt: updatedAt,
    }, nil
}

!FILENAME content/local/store.go:480

func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) {
    path, _, data := s.ingestPaths(ref)   // 基于 ref 定義生成數(shù)據(jù)處理的路徑
    status, err := s.status(path)         // 獲取 ingest 目錄的元狀態(tài)信息
    if err != nil {
        return status, errors.Wrap(err, "failed reading status of resume write")
    }
  // ref 值與 ingest 目錄下檢驗(yàn)是否一致
    if ref != status.Ref {
        return status, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref)
    }

  // 大小檢驗(yàn)
    if total > 0 && status.Total > 0 && total != status.Total {
        return status, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
    }

    // 打開 blob 數(shù)據(jù)文件句柄
    fp, err := os.Open(data)
    if err != nil {
        return status, err
    }

    p := bufPool.Get().(*[]byte)
    status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
    bufPool.Put(p)
    fp.Close()
    return status, err   //返回狀態(tài)信息
}

store ReaderAt 返回 blob 的 io.ReaderAt ,其代碼實(shí)現(xiàn)為標(biāo)準(zhǔn)的文件打開句柄加上文件大小。 ocispec.Descriptor OCI 標(biāo)準(zhǔn)格式描述符指定了需要讀取的文件 blob 內(nèi)容摘要散列值,通過在路徑 $root/blobs/$digest 查找文件。

!FILENAME content/local/store.go:125

// ReaderAt returns an io.ReaderAt for the blob.
func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
    p := s.blobPath(desc.Digest)     // 通過指定的摘要散列值生成 blob 文件對(duì)象路徑
    fi, err := os.Stat(p)
    if err != nil {
        if !os.IsNotExist(err) {
            return nil, err
        }
        return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
    }

    fp, err := os.Open(p)           // 打開文件獲取文件句柄
    if err != nil {
        if !os.IsNotExist(err) {
            return nil, err
        }

        return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
    }

    return sizeReaderAt{size: fi.Size(), fp: fp}, nil  
}

其它剩余的 content store 方法(Info、Update、List、Delete、Status、ListStatuses、WriteAbort、Abort)實(shí)現(xiàn)也類似,將不再一一展開。

附錄

ctr content 命令

Name:        "get",
Usage:       "get the data for an object",
ArgsUsage:   "[<digest>, ...]",
Description: "display the image object",


Name:        "ingest",
Usage:       "accept content into the store",
ArgsUsage:   "[flags] <key>",
Description: "ingest objects into the local content store",


Name:        "active",
Usage:       "display active transfers",
ArgsUsage:   "[flags] [<regexp>]",
Description: "display the ongoing transfers",


Name:        "list",
Aliases:     []string{"ls"},
Usage:       "list all blobs in the store",
ArgsUsage:   "[flags]",
Description: "list blobs in the content store",


Name:        "label",
Usage:       "add labels to content",
ArgsUsage:   "<digest> [<label>=<value> ...]",
Description: "labels blobs in the content store",


Name:        "edit",
Usage:       "edit a blob and return a new digest",
ArgsUsage:   "[flags] <digest>",
Description: "edit a blob and return a new digest",


Name:      "delete",
Aliases:   []string{"del", "remove", "rm"},
Usage:     "permanently delete one or more blobs",
ArgsUsage: "[<digest>, ...]",
Description: `Delete one or more blobs permanently. Successfully deleted
blobs are printed to stdout.`,


Name:        "fetch-object",
Usage:       "retrieve objects from a remote",
ArgsUsage:   "[flags] <remote> <object> [<hint>, ...]",
Description: `Fetch objects by identifier from a remote.`,
Flags:       commands.RegistryFlags,


Name:        "push-object",
Usage:       "push an object to a remote",
ArgsUsage:   "[flags] <remote> <object> <type>",
Description: `Push objects by identifier to a remote.`,


Name:      "fetch",
Usage:     "fetch all content for an image into containerd",
ArgsUsage: "[flags] <remote> <object>",
Description: `Fetch an image into containerd.

~~ 本文 END ~~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容