gobox中的consumer處理框架

我們都會有從異步隊列中消費的需求,今天來說下gobox中的consumer處理框架

consumer處理架構(gòu)圖

gobox-consumer.png

重要的對象

IMessage

定義每條消息

type IMessage interface {
    Body() []byte
}

ConsumerHandleFunc

consumer中從隊列收到每條消息后,調(diào)用這個方法

type ConsumerHandleFunc func(message IMessage) error

IConsumer

定義消費者行為

type IConsumer interface {
    SetHandleFunc(hf ConsumerHandleFunc)
    Start()
    Stop()
}

NewWorkerFunc

每個Worker的構(gòu)造方法

type NewWorkerFunc func() IWorker

IWorker

定義Worker

type IWorker interface {
    SetWorkId(id int)
    SetLogger(logger golog.ILogger)

    Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}

LineProcessFunc

每條消息在Worker中的實際處理方法

type LineProcessFunc func(line []byte) error

BaseWorker

框架提供的一個簡單基礎(chǔ)Worker對象,組合這個對象后,只需要實現(xiàn)LineProcessFunc即可

type BaseWorker struct

Task

Task用于實現(xiàn)consumer的處理框架

使用示例

package main

import (
    "github.com/goinbox/goconsumer"

    "fmt"
    "strconv"
    "time"
)

// 這里實現(xiàn)Worker
type DemoWorker struct {
    *goconsumer.BaseWorker
}

func NewDemoWorker() goconsumer.IWorker {
    worker := &DemoWorker{goconsumer.NewBaseWorker()}
    worker.SetLineProcessFunc(worker.LineProcessFunc)

    return worker
}

func (d *DemoWorker) LineProcessFunc(line []byte) error {
    idStr := strconv.Itoa(d.Id)
    fmt.Println("wid:" + idStr + " process line:" + string(line))

    return nil
}

// 這里實現(xiàn)Message
type DemoMessage struct {
    body []byte
}

func (d *DemoMessage) Body() []byte {
    return d.body
}

// 這里實現(xiàn)一個簡單的Consumer,模擬從隊列中獲得100條消息
type DemoConsumer struct {
    hf goconsumer.ConsumerHandleFunc
}

func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
    d.hf = hf
}

func (d *DemoConsumer) Start() {
    for i := 0; i < 100; i++ {
        str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
        d.hf(&DemoMessage{[]byte(str)})
    }

    time.Sleep(time.Second * 1)
}

func (d *DemoConsumer) Stop() {

}


// 執(zhí)行Task任務(wù),調(diào)用consumer處理框架
func main() {
    task := goconsumer.NewTask("Demo")
    consumer := new(DemoConsumer)

    task.SetConsumer(consumer).
        SetWorker(10, NewDemoWorker).
        Start()
}

歡迎大家使用,使用中有遇到問題隨時反饋,我們會盡快響應(yīng),謝謝!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,591評論 19 139
  • 大數(shù)據(jù)技術(shù)框架 1. 簡介 2. Hadoop框架2.1. Hadoop-MapReduce2.1.1. 簡介:2...
    sunTengSt閱讀 12,392評論 1 77
  • 1.ios高性能編程 (1).內(nèi)層 最小的內(nèi)層平均值和峰值(2).耗電量 高效的算法和數(shù)據(jù)結(jié)構(gòu)(3).初始化時...
    歐辰_OSR閱讀 30,260評論 8 265
  • 從我在父母的脅迫下出生那刻起, 便奮不顧身(其實是身不由己)的扎進了時間的牢籠。 小時候想著追風(fēng), 現(xiàn)在想來,莫不...
    光皂閱讀 420評論 0 1
  • 今日下大雨,但是我的公益剪紙課還是如期舉行。 我的兩個大弟子,志誠和若愚都做了我的志愿者,給小朋友當小老師...
    從爾閱讀 527評論 0 0

友情鏈接更多精彩內(nèi)容