在Eino 中的callpacks包為組件的擴展提供回調(diào)(callback)的機制, 它允許用戶在組件執(zhí)行的不同階段注入回調(diào)處理器(handlers), 比如開始, 結(jié)束或者錯誤發(fā)生時, 通過切片注入的方式常用于記錄日志, 監(jiān)控, 和統(tǒng)計分析等功能
示例:
// func (t *testChatModel) Generate(ctx context.Context, input []*schema.Message, opts ...model.Option) (resp *schema.Message, err error) {
// defer func() {
// if err != nil {
// callbacks.OnEnd(ctx, err)
// }
// }()
//
// ctx = callbacks.OnStart(ctx, &model.CallbackInput{
// Messages: input,
// Tools: nil,
// Extra: nil,
// })
//
// // do smt
//
// ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
// Message: resp,
// Extra: nil,
// })
//
// return resp, nil
// }
定義Handler 接口, 及運行時信息RunInfo 類型
package callbacks
import "context"
type RunInfo struct {
Name string
Type string
Component Component
}
type CallbackInput any
type CallbackOutput any
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
}
Handler 接口定義了組件在運行開始(OnStart), 結(jié)束(OnEnd)以及錯誤發(fā)生時的需要處理的聲明(OnError)
下面通過切面編程實現(xiàn)自動日志的記錄的Handler, loggerCallbacks 實現(xiàn) Handler 接口, 分別定義運行開始(OnStart), 結(jié)束(OnEnd)及錯誤發(fā)生時(OnError)的日志記錄邏輯實現(xiàn):
package callbacks
import (
"context"
"github.com/sirupsen/logrus"
)
type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)
// 日志handler 實現(xiàn)
type loggerCallbacks struct{}
func (l *loggerCallbacks) OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
logrus.Infof("name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
return ctx
}
func (l *loggerCallbacks) OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
logrus.Infof("name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
return ctx
}
func (l *loggerCallbacks) OnError(ctx context.Context, info *RunInfo, err error) context.Context {
logrus.Errorf("name: %v, type: %v, component: %v, error: %v", info.Name, info.Type, info.Component, err)
return ctx
}
使用示例:
package callbacks
import (
"context"
"fmt"
"strconv"
"testing"
)
func TestAspectInject(t *testing.T) {
t.Run("log_handler", func(t *testing.T) {
ctx := context.Background()
hb := &loggerCallbacks{}
runInfo := &RunInfo{
Name: "test_log",
Type: "sdf",
Component: "ChatTemplate",
}
ctx = InitCallbacks(ctx, runInfo, hb)
ctx = OnStart(ctx, 1)
//業(yè)務(wù)邏輯處理此處省略......
ctx = OnEnd(ctx, 2)
ctx = OnError(ctx, fmt.Errorf("3"))
})
}
示例輸出結(jié)果如下:

image.png
- 示例中, 首先需要 InitCallbacks(ctx, runInfo, hb)函數(shù)來通過初始化回調(diào)的運行時信息(RunInfo),處理器信息 (Handler), 以及上下文信息和Manager, 具體實現(xiàn)如下:
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context {
mgr, ok := newManager(info, handlers...)
if ok {
return ctxWithManager(ctx, mgr)
}
return ctxWithManager(ctx, nil)
}
var GlobalHandlers []Handler
func newManager(runInfo *RunInfo, handlers ...Handler) (*manager, bool) {
if len(handlers)+len(GlobalHandlers) == 0 {
return nil, false
}
hs := make([]Handler, len(GlobalHandlers))
copy(hs, GlobalHandlers)
return &manager{
globalHandlers: hs,
handlers: handlers,
runInfo: runInfo,
}, true
}
func ctxWithManager(ctx context.Context, manager *manager) context.Context {
return context.WithValue(ctx, CtxManagerKey, manager)
}
func managerFromCtx(ctx context.Context) (*manager, bool) {
v := ctx.Value(CtxManagerKey)
m, ok := v.(*manager)
if ok && m != nil {
n := *m
return &n, true
}
return nil, false
}
manager 主要用于管理處理器, 并通過上下文context 在調(diào)用鏈上進行傳遞, 類型定義如下:
type manager struct {
globalHandlers []Handler
handlers []Handler
runInfo *RunInfo
}
- 示例中通過 ctx = OnStart(ctx, 1) 來聲明業(yè)務(wù)邏輯處理開始時的切面操作, 并傳入業(yè)務(wù)邏輯數(shù)據(jù), OnStart實現(xiàn)如下:
// OnStart invokes the OnStart logic for the particular context, ensuring that all registered
// handlers are executed in reverse order (compared to add order) when a process begins.
func OnStart[T any](ctx context.Context, input T) context.Context {
ctx, _ = On(ctx, input, OnStartHandle[T], true)
return ctx
}
// 遍歷handler, 并調(diào)用handler的OnStart函數(shù)
func OnStartHandle[T any](ctx context.Context, input T,
runInfo *RunInfo, handlers []Handler) (context.Context, T) {
for i := len(handlers) - 1; i >= 0; i-- {
ctx = handlers[i].OnStart(ctx, runInfo, input)
}
return ctx, input
}
type Handle[T any] func(context.Context, T, *RunInfo, []Handler) (context.Context, T)
func On[T any](ctx context.Context, inOut T, handle Handle[T], start bool) (context.Context, T) {
mgr, ok := managerFromCtx(ctx)
if !ok {
return ctx, inOut
}
nMgr := *mgr
var info *RunInfo
if start {
info = nMgr.runInfo
nMgr.runInfo = nil
ctx = context.WithValue(ctx, CtxRunInfoKey, info)
} else {
if nMgr.runInfo != nil {
info = nMgr.runInfo
} else {
info, _ = ctx.Value(CtxRunInfoKey).(*RunInfo)
}
}
hs := make([]Handler, 0, len(nMgr.handlers)+len(nMgr.globalHandlers))
for _, handler := range append(nMgr.handlers, nMgr.globalHandlers...) {
hs = append(hs, handler)
}
var out T
// 從ctx中的manager中獲取處理器, 并調(diào)用handler的OnStart函數(shù)
ctx, out = handle(ctx, inOut, info, hs)
return ctxWithManager(ctx, &nMgr), out
}
- OnEnd 與 OnError 實現(xiàn)類似:
// OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup
// and finalization when a process ends.
// handlers are executed in normal order (compared to add order).
func OnEnd[T any](ctx context.Context, output T) context.Context {
ctx, _ = On(ctx, output, OnEndHandle[T], false)
return ctx
}
// OnError invokes the OnError logic of the particular, notice that error in stream will not represent here.
// handlers are executed in normal order (compared to add order).
func OnError(ctx context.Context, err error) context.Context {
ctx, _ = On(ctx, err, OnErrorHandle, false)
return ctx
}
func OnErrorHandle(ctx context.Context, err error,
runInfo *RunInfo, handlers []Handler) (context.Context, error) {
for _, handler := range handlers {
ctx = handler.OnError(ctx, runInfo, err)
}
return ctx, err
}
func OnEndHandle[T any](ctx context.Context, output T,
runInfo *RunInfo, handlers []Handler) (context.Context, T) {
for _, handler := range handlers {
ctx = handler.OnEnd(ctx, runInfo, output)
}
return ctx, output
}
完整代碼:
https://github.com/lbbwyt/eino_test/blob/master/callbacks/aspect_inject_test.go