golang kafka小試消息隊(duì)列

Kafka 安裝配置、更多資料請(qǐng)參考其官網(wǎng)。

啟動(dòng) kafka server

在這之前需要啟動(dòng) zookeeper 做服務(wù)治理(單機(jī))。

$ bin/zkServer.sh status conf/zoo_sample.cfg

如提示權(quán)限限制加上 sudo 。

啟動(dòng) kafka server

$ bin/kafka-server-start.sh config/server.properties

啟動(dòng)消息隊(duì)列(本部分僅為測(cè)試 server)

新建 Topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

$ bin/kafka-topics.sh --list --zookeeper localhost:2181 (test)

1. 啟動(dòng) Producer

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

2. 啟動(dòng) Consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

此時(shí)在 Producer 端發(fā)送消息,在 Consumer 就會(huì)顯示,如下圖所示。

(上圖中 Consumer 多出了好幾個(gè)消息是我截圖之前測(cè)試發(fā)出的)


Action

本文使用 sarama 庫(kù)作為 kafka 的 go API。sarama 庫(kù)沒(méi)有給出很具體的文檔,可以參考其源碼。

Producer

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true

    addr := []string{"localhost:9092"}

    producer, err := sarama.NewSyncProducer(addr, config)
    if err != nil {
        panic(err)
    }

    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic:     "hello",
        Partition: int32(-1),
        Key:       sarama.StringEncoder("key"),
    }

    var value string
    for {
        _, err := fmt.Scanf("%s", &value)
        if err != nil {
            break
        }
        msg.Value = sarama.ByteEncoder(value)
        fmt.Println(value)

        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
            fmt.Println("Send message Fail")
        }
        fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
    }
}

Consumer

package main

import (
    "fmt"
    "sync"
    "github.com/Shopify/sarama"
)

var (
    wg  sync.WaitGroup
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }

    partitionList, err := consumer.Partitions("hello")
    if err != nil {
        panic(err)
    }

    for partition := range partitionList {
        pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

        defer pc.AsyncClose()

        wg.Add(1)

        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }

        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

結(jié)果如下:

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評(píng)論 19 139
  • 背景介紹 Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O...
    高廣超閱讀 13,042評(píng)論 8 167
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,888評(píng)論 13 425
  • 我傾向于用PC寫文章,因?yàn)檫@樣能專注于屏幕,而不是鍵盤,手機(jī)就不同了,要看鍵盤。好吧,我承認(rèn)這是我手機(jī)沒(méi)有練習(xí)過(guò)兩...
    郭青耀閱讀 121評(píng)論 0 0
  • 2018-03-18 【指繪】 [背景建筑線稿是素材] 打了耳洞后用新軟件畫的第一幅圖 (軟件名:Medibang...
    木子明婳閱讀 263評(píng)論 0 1

友情鏈接更多精彩內(nèi)容