概述
本文主要針對 Containerd content 服務功能模塊的相關代碼進行分析,如下圖 Containerd 官方架構圖所示:

如同 Containerd 其它服務功能模塊化機制,本文將從 Content 服務相關接口定義、GRPC 和 Service 插件化注冊、Plugin 加載相關過程,最後到服務功能的底層實現邏輯進行逐步分析。
Content Store接口與方法定義
Store 接口繼承了各 content 內容管理相關的接口集合,後面每個接口都將有詳細說明。
!FILENAME content/content.go:136
// Store is the content store interface; it is composed of the four
// content-management interfaces embedded below.
type Store interface {
	Manager       // info lookup and delete management interface
	Provider      // read interface
	IngestManager // write management interface (write status, abort)
	Ingester      // write (ingest) interface
}
Manager 提供了基礎的 content 內容管理方法,如內容元信息獲取、更新、列表查找、刪除。
!FILENAME content/content.go:75
// Manager groups the basic content management methods: info lookup, update,
// walk and delete.
type Manager interface {
	// Info returns the content metadata held in the content store database.
	Info(ctx context.Context, dgst digest.Digest) (Info, error)
	// Update updates the mutable info of a content item, e.g. the labels.* entries.
	Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)
	// Walk iterates over all entries of the content store database, matching
	// them against the given filters.
	Walk(ctx context.Context, fn WalkFunc, filters ...string) error
	// Delete removes the given content from the content store database.
	Delete(ctx context.Context, dgst digest.Digest) error
}
Provider 提供了 content 的讀取接口,返回一個內容讀取器對象 ReaderAt。
!FILENAME content/content.go:35
// Provider is the read interface of the content store; it hands out a
// ReaderAt for a piece of content.
type Provider interface {
	// ReaderAt returns a reader for the content described by desc.
	// The only field of the ocispec.Descriptor that must be set is
	// desc.Digest, the digest of the content.
	ReaderAt(ctx context.Context, desc ocispec.Descriptor) (ReaderAt, error)
}
// ReaderAt combines the standard io.ReaderAt and io.Closer interfaces and
// extends them with size reporting.
type ReaderAt interface {
	io.ReaderAt
	io.Closer
	// Size reports the size as an int64.
	Size() int64
}
IngestManager 寫管理接口(存寫狀態獲取、中止操作)
!FILENAME content/content.go:98
// IngestManager is the write management interface: it reports the status of
// ingest operations and aborts them.
type IngestManager interface {
	// Status returns the status of the ingest operation identified by ref.
	Status(ctx context.Context, ref string) (Status, error)
	// ListStatuses lists all active write operations with their status; the
	// regular expressions given in filters narrow the listed entries.
	ListStatuses(ctx context.Context, filters ...string) ([]Status, error)
	// Abort cancels the operation identified by ref.
	Abort(ctx context.Context, ref string) error
}
Ingester 提供了 content 的存寫接口,返回一個內容寫入器對象 Writer。
!FILENAME content/content.go:44
// Ingester is the write interface of the content store; it hands out a
// Writer for new content.
type Ingester interface {
	// Writer returns a content writer. The opts must carry a ref that
	// uniquely identifies the write activity.
	Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
}
Content GRPC 注冊與 Server 實現
content GRPC 插件注冊:插件 InitFn 最後通過 contentserver.New(cs.(content.Store)) 返回 api.ContentServer,而 GRPC 所依賴的服務插件 "content-service" 的實例化對象作為其唯一傳參,類型則是前面所詳述的 content.Store 接口。
!FILENAME services/content/service.go:27
// init registers the "content" GRPC plugin. The plugin requires the service
// plugins and wraps the "content-service" plugin instance in the GRPC
// content server.
func init() {
	initFn := func(ic *plugin.InitContext) (interface{}, error) {
		// Fetch every registered service plugin.
		servicePlugins, err := ic.GetByType(plugin.ServicePlugin)
		if err != nil {
			return nil, err
		}

		// The content service is keyed by services.ContentService.
		contentPlugin, found := servicePlugins[services.ContentService]
		if !found {
			return nil, errors.New("content store service not found")
		}

		// Instance of the content service plugin.
		cs, err := contentPlugin.Instance()
		if err != nil {
			return nil, err
		}

		// The instance is passed on as a content.Store.
		return contentserver.New(cs.(content.Store)), nil
	}

	plugin.Register(&plugin.Registration{
		Type:     plugin.GRPCPlugin,
		ID:       "content",
		Requires: []plugin.Type{plugin.ServicePlugin},
		InitFn:   initFn,
	})
}
ContentServer is the server API for Content service.
!FILENAME api/services/content/v1/content.pb.go:1230
// ContentServer is the server API for Content service.
//
// List, Read and Write receive a Content_*Server argument instead of
// returning a response value; the remaining methods take a context and a
// request and return a response and an error.
type ContentServer interface {
	Info(context.Context, *InfoRequest) (*InfoResponse, error)
	Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
	List(*ListContentRequest, Content_ListServer) error
	Delete(context.Context, *DeleteContentRequest) (*types.Empty, error)
	Read(*ReadContentRequest, Content_ReadServer) error
	Status(context.Context, *StatusRequest) (*StatusResponse, error)
	ListStatuses(context.Context, *ListStatusesRequest) (*ListStatusesResponse, error)
	Write(Content_WriteServer) error
	Abort(context.Context, *AbortRequest) (*types.Empty, error)
}
New returns the content GRPC server
!FILENAME services/content/contentserver/contentserver.go:50
// New returns the content GRPC server backed by the given content store.
func New(cs content.Store) api.ContentServer {
	srv := service{store: cs}
	return &srv
}
!FILENAME services/content/contentserver/contentserver.go:38
// service is the GRPC content server type returned by New; it holds the
// content store it serves from.
type service struct {
	store content.Store // content.Store interface value
}
上層 Content Server 包裝的 service 類實現了 api.ContentServer 接口,其主要功能是調用底層所注冊的 "content-service" 插件的服務方法,如下讀取 Read() 實現方法的邏輯主要調用了底層的 store.ReaderAt() 來實現 content 讀取。其它剩余的方法(Info、Update、List、Delete、Status、ListStatuses、Write、Abort)實現也類似,將不再一一展開。
!FILENAME services/content/contentserver/contentserver.go:144
// Read validates the requested digest, looks up the content info and opens a
// reader on the underlying store. The rest of the method is elided in this
// excerpt ("//...").
func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error {
	// An invalid digest is reported as an InvalidArgument status.
	if err := req.Digest.Validate(); err != nil {
		return status.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
	}
	// Call the underlying service method s.store.Info().
	oi, err := s.store.Info(session.Context(), req.Digest)
	if err != nil {
		return errdefs.ToGRPC(err)
	}
	// Call the underlying service method s.store.ReaderAt().
	ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest})
	if err != nil {
		return errdefs.ToGRPC(err)
	}
	//...
	return errdefs.ToGRPC(err)
}
Content service 服務注冊與實現
注冊
!FILENAME services/content/store.go:37
// init registers the content service plugin (ID services.ContentService).
// The plugin requires the metadata plugin and builds its store on top of the
// metadata database's content store.
func init() {
	initFn := func(ic *plugin.InitContext) (interface{}, error) {
		// Fetch the initialized instance of the metadata plugin.
		m, err := ic.Get(plugin.MetadataPlugin)
		if err != nil {
			return nil, err
		}

		// The arguments are the part to focus on: m.(*metadata.DB).ContentStore()
		// is the content store held by the metadata database, and ic.Events is
		// handed over as the publisher.
		db := m.(*metadata.DB)
		return newContentStore(db.ContentStore(), ic.Events)
	}

	plugin.Register(&plugin.Registration{
		Type:     plugin.ServicePlugin,    // service plugin type
		ID:       services.ContentService, // content service ID
		Requires: []plugin.Type{plugin.MetadataPlugin},
		InitFn:   initFn,
	})
}
!FILENAME services/content/store.go:56
// newContentStore wraps cs together with the given events publisher in a
// store and returns it as a content.Store. The returned error is always nil.
func newContentStore(cs content.Store, publisher events.Publisher) (content.Store, error) {
	wrapped := new(store)
	wrapped.Store = cs // underlying content store
	wrapped.publisher = publisher
	return wrapped, nil
}
store 類結構定義,實際上是包裝了 content.Store 並增加事件的推送。
!FILENAME services/content/store.go:32
// store embeds a content.Store and adds an events publisher to it.
type store struct {
	content.Store
	publisher events.Publisher
}
元數據庫及內容存儲庫
MetadataPlugin 元數據庫在 containerd server 創建過程中對所有插件進行加載時被指定,同時指定了內容存儲和 snapshotter 實現類對象。
!FILENAME services/server/server.go:304
// LoadPlugins registers plugins with containerd. This is an abridged excerpt:
// the spans marked "//..." are elided, which is why identifiers such as
// shared, cs and snapshotters appear here without a visible definition.
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {
	// load all plugins into containerd
	plugin.Register(
	//...
	plugin.Register(&plugin.Registration{
		Type: plugin.ContentPlugin, // content plugin type
		ID: "content",
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			ic.Meta.Exports["root"] = ic.Root
			return local.NewStore(ic.Root) // plugin init func: creates and returns the content store instance
		},
	})
	//...
	plugin.Register(&plugin.Registration{
		Type: plugin.MetadataPlugin,
		ID: "bolt",
		Requires: []plugin.Type{
			plugin.ContentPlugin,
			plugin.SnapshotPlugin,
		},
		Config: &srvconfig.BoltConfig{
			ContentSharingPolicy: srvconfig.SharingPolicyShared,
		},
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			//...
			path := filepath.Join(ic.Root, "meta.db")
			ic.Meta.Exports["path"] = path
			// Open the bolt DB; the file name is "meta.db".
			db, err := bolt.Open(path, 0644, nil)
			if err != nil {
				return nil, err
			}
			var dbopts []metadata.DBOpt
			if !shared {
				dbopts = append(dbopts, metadata.WithPolicyIsolated)
			}
			// Create the metadata database object; note the three key arguments:
			// db is the boltdb object,
			// cs is the content store object,
			// snapshotters is the snapshot manager object.
			mdb := metadata.NewDB(db, cs.(content.Store), snapshotters, dbopts...)
			if err := mdb.Init(ic.Context); err != nil {
				return nil, err
			}
			return mdb, nil
		},
	})
	//...
}
NewStore 本地內容存儲構建,實際調用 NewLabeledStore,返回 content.Store 接口實現類對象 store{root, ls}。
!FILENAME content/local/store.go:74
// NewStore returns a local content store rooted at root. It delegates to
// NewLabeledStore without a label store.
func NewStore(root string) (content.Store, error) {
	var noLabels LabelStore // zero value, i.e. nil
	return NewLabeledStore(root, noLabels)
}
// NewLabeledStore creates the "ingest" directory under root (including any
// missing parents) and returns a store holding root and ls.
func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
	ingestDir := filepath.Join(root, "ingest")
	err := os.MkdirAll(ingestDir, 0777)
	if err != nil {
		return nil, err
	}

	st := &store{root: root, ls: ls}
	return st, nil
}
下面我們將重點分析 content store 的實現類方法邏輯,本文主要分析 Writer 和 ReaderAt 兩個方法,其它的方法可以查看源碼。
store.Writer() 返回一個配置好的內容 writer 對象,可供內容數據的寫入。
// Writer applies the given writer options and returns a content writer.
// The `ref` option is the unique identifier used to manage the lifecycle of
// the write transaction and must be set; an empty ref yields an
// ErrInvalidArgument error.
func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
	var wOpts content.WriterOpts
	// Apply the writer options.
	for _, opt := range opts {
		if err := opt(&wOpts); err != nil {
			return nil, err
		}
	}
	// The ref option must not be empty.
	if wOpts.Ref == "" {
		return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
	}
	//...
	// NOTE(review): code is elided above; presumably it takes the lock on
	// wOpts.Ref that unlock() releases below. Confirm against the full source.
	// The actual work is done by writer() (detailed below).
	w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
	if err != nil {
		unlock(wOpts.Ref)
		return nil, err
	}
	return w, nil // lock is now held by w.
}
!FILENAME content/local/store.go:511
// writer opens the ingest identified by ref, resuming it when its directory
// already exists and its status can be recovered, and returns a
// content.Writer positioned at the current write offset.
//
// When expected is set and a blob already exists at its blob path, an
// ErrAlreadyExists error is returned. total is the expected size; it is
// written to the "total" file of a fresh ingest only when it is positive.
func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
	if expected != "" {
		// Derive the blob file path from the digest.
		p := s.blobPath(expected)
		if _, err := os.Stat(p); err == nil {
			return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
		}
	}

	// Derive the working paths from ref. Three paths are returned:
	//   path: the ingest directory      $root/ingest/$digest(ref)/
	//   refp: the file named "ref"      $root/ingest/$digest(ref)/ref
	//   data: the data file             $root/ingest/$digest(ref)/data
	path, refp, data := s.ingestPaths(ref)

	var (
		digester  = digest.Canonical.Digester()
		offset    int64
		startedAt time.Time
		updatedAt time.Time
	)

	foundValidIngest := false
	// Make sure the ingest directory exists.
	if err := os.Mkdir(path, 0755); err != nil {
		if !os.IsExist(err) {
			return nil, err
		}
		// The directory already exists: try to recover the previous ref and
		// data status so the write can be resumed.
		status, err := s.resumeStatus(ref, total, digester)
		if err == nil {
			foundValidIngest = true
			updatedAt = status.UpdatedAt
			startedAt = status.StartedAt
			total = status.Total
			offset = status.Offset
		} else {
			logrus.Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error())
		}
	}

	// No resumable ingest: (re)create the bookkeeping files.
	if !foundValidIngest {
		startedAt = time.Now()
		updatedAt = startedAt

		// The ref file holds the ref string itself.
		if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
			return nil, err
		}
		// Start time.
		if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
			return nil, err
		}
		// Update time (equal to the start time for a fresh ingest).
		if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
			return nil, err
		}
		// Expected size.
		if total > 0 {
			if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
				return nil, err
			}
		}
	}

	// Open the data file.
	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return nil, errors.Wrap(err, "failed to open data file")
	}

	// Position the file at the resume offset.
	if _, err := fp.Seek(offset, io.SeekStart); err != nil {
		// Do not leak the handle on failure. The Close error is ignored on
		// purpose: the seek error is the one worth reporting.
		fp.Close()
		return nil, errors.Wrap(err, "could not seek to current write offset")
	}

	// Hand everything over to the writer object.
	return &writer{
		s:         s,
		fp:        fp,
		ref:       ref,
		path:      path,
		offset:    offset,
		total:     total,
		digester:  digester,
		startedAt: startedAt,
		updatedAt: updatedAt,
	}, nil
}
!FILENAME content/local/store.go:480
// resumeStatus recovers the status of the existing ingest identified by ref
// so that the write can be resumed. It checks the stored ref and total
// against the arguments, then feeds the existing data file through digester
// and records the number of bytes copied as status.Offset.
//
// An error is returned when the status cannot be read, the stored ref differs
// from ref, both totals are positive but differ, or the data file cannot be
// opened or read.
func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) {
	// Derive the working paths from ref.
	path, _, data := s.ingestPaths(ref)

	// Read the status of the ingest directory.
	status, err := s.status(path)
	if err != nil {
		return status, errors.Wrap(err, "failed reading status of resume write")
	}

	// The ref stored in the ingest directory must match the requested one.
	// err is nil at this point, and wrapping a nil error yields nil, so a new
	// error has to be created here for the mismatch to be reported.
	if ref != status.Ref {
		return status, errors.Errorf("ref key does not match: %v != %v", ref, status.Ref)
	}

	// Size check: only enforced when both sides carry a positive total.
	if total > 0 && status.Total > 0 && total != status.Total {
		return status, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
	}

	// Open the data file for reading.
	fp, err := os.Open(data)
	if err != nil {
		return status, err
	}
	defer fp.Close()

	// Re-hash the existing data using a pooled copy buffer.
	p := bufPool.Get().(*[]byte)
	defer bufPool.Put(p)

	status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
	return status, err
}
store ReaderAt 返回 blob 的 io.ReaderAt,其代碼實現為標準的文件打開句柄加上文件大小。ocispec.Descriptor(OCI 標準格式描述符)指定了需要讀取的文件 blob 的內容摘要散列值,並在路徑 $root/blobs/$digest 下查找文件。
!FILENAME content/local/store.go:125
// ReaderAt returns a content.ReaderAt for the blob identified by desc.Digest.
// The result pairs the opened file with the size reported by os.Stat. A
// missing blob file is reported as an ErrNotFound error; any other stat or
// open error is returned unchanged.
func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
	// Blob file path derived from the digest.
	blob := s.blobPath(desc.Digest)

	info, statErr := os.Stat(blob)
	switch {
	case statErr == nil:
		// The blob exists; carry on.
	case os.IsNotExist(statErr):
		return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, blob)
	default:
		return nil, statErr
	}

	// Open the file to obtain its handle.
	file, openErr := os.Open(blob)
	switch {
	case openErr == nil:
		// Opened successfully.
	case os.IsNotExist(openErr):
		return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, blob)
	default:
		return nil, openErr
	}

	return sizeReaderAt{size: info.Size(), fp: file}, nil
}
其它剩余的 content store 方法(Info、Update、Walk、Delete、Status、ListStatuses、Abort)實現也類似,將不再一一展開。
附錄
ctr content 命令
Name: "get",
Usage: "get the data for an object",
ArgsUsage: "[<digest>, ...]",
Description: "display the image object",
Name: "ingest",
Usage: "accept content into the store",
ArgsUsage: "[flags] <key>",
Description: "ingest objects into the local content store",
Name: "active",
Usage: "display active transfers",
ArgsUsage: "[flags] [<regexp>]",
Description: "display the ongoing transfers",
Name: "list",
Aliases: []string{"ls"},
Usage: "list all blobs in the store",
ArgsUsage: "[flags]",
Description: "list blobs in the content store",
Name: "label",
Usage: "add labels to content",
ArgsUsage: "<digest> [<label>=<value> ...]",
Description: "labels blobs in the content store",
Name: "edit",
Usage: "edit a blob and return a new digest",
ArgsUsage: "[flags] <digest>",
Description: "edit a blob and return a new digest",
Name: "delete",
Aliases: []string{"del", "remove", "rm"},
Usage: "permanently delete one or more blobs",
ArgsUsage: "[<digest>, ...]",
Description: `Delete one or more blobs permanently. Successfully deleted
blobs are printed to stdout.`,
Name: "fetch-object",
Usage: "retrieve objects from a remote",
ArgsUsage: "[flags] <remote> <object> [<hint>, ...]",
Description: `Fetch objects by identifier from a remote.`,
Flags: commands.RegistryFlags,
Name: "push-object",
Usage: "push an object to a remote",
ArgsUsage: "[flags] <remote> <object> <type>",
Description: `Push objects by identifier to a remote.`,
Name: "fetch",
Usage: "fetch all content for an image into containerd",
ArgsUsage: "[flags] <remote> <object>",
Description: `Fetch an image into containerd.
~~ 本文 END ~~