Eino中的面向切片編程實現(xiàn)

在Eino 中的callpacks包為組件的擴展提供回調(diào)(callback)的機制, 它允許用戶在組件執(zhí)行的不同階段注入回調(diào)處理器(handlers), 比如開始, 結(jié)束或者錯誤發(fā)生時, 通過切片注入的方式常用于記錄日志, 監(jiān)控, 和統(tǒng)計分析等功能

示例:

//  func (t *testChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
//      defer func() {
//          if err != nil {
//              callbacks.OnEnd(ctx, err)
//          }
//      }()
//
//      ctx = callbacks.OnStart(ctx, &model.CallbackInput{
//          Messages: input,
//          Tools:    nil,
//          Extra:    nil,
//      })
//
//      // do smt
//
//      ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
//          Message: resp,
//          Extra:   nil,
//      })
//
//      return resp, nil
//  }

定義Handler 接口, 及運行時信息RunInfo 類型

package callbacks

import "context"

type RunInfo struct {
    Name      string
    Type      string
    Component Component
}

type CallbackInput any

type CallbackOutput any

type Handler interface {
    OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
    OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
    OnError(ctx context.Context, info *RunInfo, err error) context.Context
}

Handler 接口定義了組件在運行開始(OnStart), 結(jié)束(OnEnd)以及錯誤發(fā)生時的需要處理的聲明(OnError)

下面通過切面編程實現(xiàn)自動日志的記錄的Handler, loggerCallbacks 實現(xiàn) Handler 接口, 分別定義運行開始(OnStart), 結(jié)束(OnEnd)及錯誤發(fā)生時(OnError)的日志記錄邏輯實現(xiàn):

package callbacks

import (
    "context"
    "github.com/sirupsen/logrus"
)

type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)

// 日志handler 實現(xiàn)
type loggerCallbacks struct{}

func (l *loggerCallbacks) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
    logrus.Infof("name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
    return ctx
}

func (l *loggerCallbacks) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
    logrus.Infof("name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
    return ctx
}

func (l *loggerCallbacks) OnError(ctx context.Context, info *RunInfo, err error) context.Context {
    logrus.Errorf("name: %v, type: %v, component: %v, error: %v", info.Name, info.Type, info.Component, err)
    return ctx
}

使用示例:

package callbacks

import (
    "context"
    "fmt"
    "strconv"
    "testing"
)

func TestAspectInject(t *testing.T) {
    t.Run("log_handler", func(t *testing.T) {
        ctx := context.Background()
        hb := &loggerCallbacks{}
        runInfo := &RunInfo{
            Name:      "test_log",
            Type:      "sdf",
            Component: "ChatTemplate",
        }
        ctx = InitCallbacks(ctx, runInfo, hb)
        ctx = OnStart(ctx, 1)

       //業(yè)務(wù)邏輯處理此處省略......

        ctx = OnEnd(ctx, 2)
        ctx = OnError(ctx, fmt.Errorf("3"))

    })

}

示例輸出結(jié)果如下:


image.png
  1. 示例中, 首先需要 InitCallbacks(ctx, runInfo, hb)函數(shù)來通過初始化回調(diào)的運行時信息(RunInfo),處理器信息 (Handler), 以及上下文信息和Manager, 具體實現(xiàn)如下:
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context {
    mgr, ok := newManager(info, handlers...)
    if ok {
        return ctxWithManager(ctx, mgr)
    }

    return ctxWithManager(ctx, nil)
}



var GlobalHandlers []Handler

func newManager(runInfo *RunInfo, handlers ...Handler) (*manager, bool) {
    if len(handlers)+len(GlobalHandlers) == 0 {
        return nil, false
    }

    hs := make([]Handler, len(GlobalHandlers))
    copy(hs, GlobalHandlers)

    return &manager{
        globalHandlers: hs,
        handlers:       handlers,
        runInfo:        runInfo,
    }, true
}


func ctxWithManager(ctx context.Context, manager *manager) context.Context {
    return context.WithValue(ctx, CtxManagerKey, manager)
}

func managerFromCtx(ctx context.Context) (*manager, bool) {
    v := ctx.Value(CtxManagerKey)
    m, ok := v.(*manager)
    if ok && m != nil {
        n := *m
        return &n, true
    }

    return nil, false
}

manager 主要用于管理處理器, 并通過上下文context 在調(diào)用鏈上進行傳遞, 類型定義如下:

type manager struct {
    globalHandlers []Handler
    handlers       []Handler
    runInfo        *RunInfo
}
  1. 示例中通過 ctx = OnStart(ctx, 1) 來聲明業(yè)務(wù)邏輯處理開始時的切面操作, 并傳入業(yè)務(wù)邏輯數(shù)據(jù), OnStart實現(xiàn)如下:
// OnStart invokes the OnStart logic for the particular context, ensuring that all registered
// handlers are executed in reverse order (compared to add order) when a process begins.
func OnStart[T any](ctx context.Context, input T) context.Context {
    ctx, _ = On(ctx, input, OnStartHandle[T], true)
    return ctx
}


// 遍歷handler,  并調(diào)用handler的OnStart函數(shù)
func OnStartHandle[T any](ctx context.Context, input T,
    runInfo *RunInfo, handlers []Handler) (context.Context, T) {

    for i := len(handlers) - 1; i >= 0; i-- {
        ctx = handlers[i].OnStart(ctx, runInfo, input)
    }

    return ctx, input
}



type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)

func On[T any](ctx context.Context, inOut T, handle Handle[T], start bool) (context.Context, T) {
    mgr, ok := managerFromCtx(ctx)
    if !ok {
        return ctx, inOut
    }
    nMgr := *mgr

    var info *RunInfo
    if start {
        info = nMgr.runInfo
        nMgr.runInfo = nil
        ctx = context.WithValue(ctx, CtxRunInfoKey, info)
    } else {
        if nMgr.runInfo != nil {
            info = nMgr.runInfo
        } else {
            info, _ = ctx.Value(CtxRunInfoKey).(*RunInfo)
        }
    }

    hs := make([]Handler, 0, len(nMgr.handlers)+len(nMgr.globalHandlers))
    for _, handler := range append(nMgr.handlers, nMgr.globalHandlers...) {
        hs = append(hs, handler)
    }

    var out T
    // 從ctx中的manager中獲取處理器, 并調(diào)用handler的OnStart函數(shù)
    ctx, out = handle(ctx, inOut, info, hs)
    return ctxWithManager(ctx, &nMgr), out
}
  1. OnEnd 與 OnError 實現(xiàn)類似:
// OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup
// and finalization when a process ends.
// handlers are executed in normal order (compared to add order).
func OnEnd[T any](ctx context.Context, output T) context.Context {
    ctx, _ = On(ctx, output, OnEndHandle[T], false)

    return ctx
}

// OnError invokes the OnError logic of the particular, notice that error in stream will not represent here.
// handlers are executed in normal order (compared to add order).
func OnError(ctx context.Context, err error) context.Context {
    ctx, _ = On(ctx, err, OnErrorHandle, false)

    return ctx
}


func OnErrorHandle(ctx context.Context, err error,
    runInfo *RunInfo, handlers []Handler) (context.Context, error) {

    for _, handler := range handlers {
        ctx = handler.OnError(ctx, runInfo, err)
    }

    return ctx, err
}

func OnEndHandle[T any](ctx context.Context, output T,
    runInfo *RunInfo, handlers []Handler) (context.Context, T) {

    for _, handler := range handlers {
        ctx = handler.OnEnd(ctx, runInfo, output)
    }

    return ctx, output
}

完整代碼:
https://github.com/lbbwyt/eino_test/blob/master/callbacks/aspect_inject_test.go

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容