kafka node 使用

KafkaClient:

概念:能夠直接連接Kafka brokers的client
初始化:const client = new kafka.KafkaClient({kafkaHost: '10.3.100.196:9092'}); 沒有填寫kafkaHost默認(rèn)是:localhost:9092

Producer

初始化:Producer(KafkaClient, [options], [customPartitioner])
例:

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    client = new kafka.KafkaClient(),
    producer = new Producer(client);

發(fā)送消息:
send(payloads, cb)

payloads:數(shù)組,item形如json
{
   topic: 'topicName',
   messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
   key: 'theKey', // string or buffer, only needed when using keyed partitioner
   partition: 0, // default 0
   attributes: 2, // default: 0
   timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10+)
}
cb: 處理成功或失敗的回調(diào)函數(shù)

創(chuàng)建Topic
createTopics(topics, cb)

HighLevelProducer

HighLevelProducer(KafkaClient, [options], [customPartitioner])
send(payloads, cb)
createTopics(topics, async, cb)

ProducerStream

ProducerStream (options)

案例:
使用Transform去更新數(shù)據(jù)

Consumer

Consumer(client, payloads, options)
on('error', function (err) {})
on('offsetOutOfRange', function (err) {})

addTopics(topics, cb, fromOffset)
removeTopics(topics, cb)
commit(cb)
setOffset(topic, partition, offset)
pause()
resume()
pauseTopics(topics)
resumeTopics(topics)
close(force, cb)

ConsumerStream

ConsumerStream(client, payloads, options)

ConsumerGroup

ConsumerGroup(options, topics)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容