我們都會有從異步隊列中消費的需求,今天來說下gobox中的consumer處理框架
consumer處理架構(gòu)圖

gobox-consumer.png
重要的對象
IMessage
定義每條消息
type IMessage interface {
Body() []byte
}
ConsumerHandleFunc
consumer中從隊列收到每條消息后,調(diào)用這個方法
type ConsumerHandleFunc func(message IMessage) error
IConsumer
定義消費者行為
type IConsumer interface {
SetHandleFunc(hf ConsumerHandleFunc)
Start()
Stop()
}
NewWorkerFunc
每個Worker的構(gòu)造方法
type NewWorkerFunc func() IWorker
IWorker
定義Worker
type IWorker interface {
SetWorkId(id int)
SetLogger(logger golog.ILogger)
Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}
LineProcessFunc
每條消息在Worker中的實際處理方法
type LineProcessFunc func(line []byte) error
BaseWorker
框架提供的一個簡單基礎(chǔ)Worker對象,組合這個對象后,只需要實現(xiàn)LineProcessFunc即可
type BaseWorker struct
Task
Task用于實現(xiàn)consumer的處理框架
使用示例
package main
import (
"github.com/goinbox/goconsumer"
"fmt"
"strconv"
"time"
)
// 這里實現(xiàn)Worker
type DemoWorker struct {
*goconsumer.BaseWorker
}
func NewDemoWorker() goconsumer.IWorker {
worker := &DemoWorker{goconsumer.NewBaseWorker()}
worker.SetLineProcessFunc(worker.LineProcessFunc)
return worker
}
func (d *DemoWorker) LineProcessFunc(line []byte) error {
idStr := strconv.Itoa(d.Id)
fmt.Println("wid:" + idStr + " process line:" + string(line))
return nil
}
// 這里實現(xiàn)Message
type DemoMessage struct {
body []byte
}
func (d *DemoMessage) Body() []byte {
return d.body
}
// 這里實現(xiàn)一個簡單的Consumer,模擬從隊列中獲得100條消息
type DemoConsumer struct {
hf goconsumer.ConsumerHandleFunc
}
func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
d.hf = hf
}
func (d *DemoConsumer) Start() {
for i := 0; i < 100; i++ {
str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
d.hf(&DemoMessage{[]byte(str)})
}
time.Sleep(time.Second * 1)
}
func (d *DemoConsumer) Stop() {
}
// 執(zhí)行Task任務(wù),調(diào)用consumer處理框架
func main() {
task := goconsumer.NewTask("Demo")
consumer := new(DemoConsumer)
task.SetConsumer(consumer).
SetWorker(10, NewDemoWorker).
Start()
}
歡迎大家使用,使用中有遇到問題隨時反饋,我們會盡快響應(yīng),謝謝!