使用NodeJS連接Kafka(Kafka-node組件)

通常好產(chǎn)品可以理解為設(shè)計(jì)的原理很奇妙,能夠健壯地執(zhí)行,使用者實(shí)戰(zhàn)也不用花太多精力學(xué)習(xí)。
Kafka是一款簡單又實(shí)用的產(chǎn)品。知識的深入通常都是實(shí)戰(zhàn)中遇到問題累積,沉綻后得到提升。好的產(chǎn)品設(shè)計(jì)的初衷通常是讓使用者可以快速上手,馬上投入運(yùn)行。 至于是否真要深入,那就看使用者意愿了。 畢竟市面上成熟東西實(shí)在太多了,有足夠的精力更應(yīng)該多挖掘其他有前景的產(chǎn)品。
雖然這樣說,在使用前Kafka該懂的原理還是必須得了解的。 尤其是Partitions分區(qū)(簡單點(diǎn),就是多條隊(duì)列分支)。可謂不懂就不知道如何高性能應(yīng)用。
讀了網(wǎng)上很多的相關(guān)介紹,這一篇個人認(rèn)為其圖解是最通俗易懂的:漫畫:圖解 Kafka,看本篇就足夠啦
建議沒有掌握其原理的使用者可以先閱讀了解一二。

使用docker-compose 部署Kafka

由于kafka運(yùn)行依賴zookeeper,除了安裝kafka外,還得先在系統(tǒng)上安裝zookeeper。
經(jīng)過網(wǎng)上的各種介紹和試錯。最簡單無腦的安裝方式就是使用docker compose。
創(chuàng)建好docker-compose.yml 文件

version: '3.9'


services:
  zookeeper: 
    image: wurstmeister/zookeeper  ## 鏡像
    container_name: zookeeper  ## 容器名稱
    ports:
      - "2181:2181"   ## 對外暴露的端口號
    volumes:   
      - /etc/localtime:/etc/localtime  ## 掛載時(shí)區(qū)(kafka鏡像和宿主機(jī)器之間時(shí)間保持一致)
  kafka:
    image: wurstmeister/kafka ## 鏡像
    container_name: kafka ## 容器名稱
    ports:
      - "9092:9092" ## 對外暴露的端口號
    environment: ## 環(huán)境變量
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 #按實(shí)際應(yīng)該是寫入宿主IP
      KAFKA_CREATE_TOPICS: "test:1:1"
    depends_on: ## 依賴情況
      - zookeeper

構(gòu)建容器即可:

cd [當(dāng)前存放docker-compose.yml的目錄]
docker-compose up -d

成功后,查看容器信息

% docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED          STATUS          PORTS                                                                   NAMES
bb4a4fc60f84   wurstmeister/kafka       "start-kafka.sh"         58 minutes ago   Up 58 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                               kafka
b10413ec436e   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   59 minutes ago   Up 59 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp   zookeeper

測試驗(yàn)證

進(jìn)入容器

docker exec -it kafka bash

1、創(chuàng)建 一個topic
參數(shù)說明:
--create --topic test 指示了要創(chuàng)建topic名字叫 test
--zookeeper zookeeper:2181 指示了 zookeeper 服務(wù)的地址

$ kafka-topics.sh --create --topic test --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1

2、查看 topic 詳情

$ kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test Topic: test    TopicId: Pu_5qECuTR-Gfi7Br9qCvA PartitionCount: 1   ReplicationFactor: 1    Configs: 
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

3、獲得topic的列表

$ kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
test

4、消息生產(chǎn)(即發(fā)送消息)

$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
#可以發(fā)送消息
>hello
>hello world

5、消費(fèi)消息(即指接收消息方)

$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
#可以開始接收到 producer.sh 發(fā)送的消息。

``

NodeJS連接Kafka(使用Kafka-node組件)

建立一個NodeJS項(xiàng)目(這里略過),下一步就是安裝依賴

npm i kafka-node

第一個步驟就是連接kafka服務(wù)的代碼

#引入組件
const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})

第二步,可以通過代碼的方式創(chuàng)建topic

#創(chuàng)建兩個topic
#其中主題為topic-for-1 擁有1個分區(qū)數(shù)
#主為 topic-for-8 擁有4個分區(qū)數(shù)

function createTopics(){
        const topicsToCreate = [{
            topic: 'topic-for-1',
            partitions: 1,     //分區(qū)數(shù)量
            replicationFactor: 1        //副本數(shù)量
          },
          {
            topic: 'topic-for-8',
            partitions: 4,           //分區(qū)數(shù)量
            replicationFactor: 1,     //副本數(shù)量
            // Optional set of config entries
            configEntries: [
              {
                name: 'compression.type',
                value: 'gzip'
              },
              {
                name: 'min.compaction.lag.ms',
                value: '50'
              }
            ]            
          }];

          client.createTopics(topicsToCreate, (error, result) => {
                console.log(result);
                console.log(error);
          });
    }

第三步,先編寫消費(fèi)消息(即指接收消息方)的代碼段
這個組件會提供兩種的類(kafka.Consumer 和kafka.ConsumerGroup)給使用者使用。通常情況下,使用Consumer即可。 ConsumerGroup封裝了kafka.KafkaClient和kafka.Consumer,并且提供了更多可控的參數(shù)。

function createConsumer(callback){
        //消費(fèi)者
        let self=this;
        let consumer = new kafka.Consumer(this.client,
        [
             { topic: 'topic-for-1', partition: 0 ,groupId:"my_group"},
             { topic: 'topic-for-8', partition: 1 }

#注:監(jiān)聽兩個topic,分別名為topic-for-1,topic-for-8。其中
#監(jiān)聽topic-for-1的0號分區(qū),設(shè)置組名為 my_group。
#即如果還有機(jī)器加入,并且組名也為my_group時(shí),則代表同屬同一個組。
         ],{
            autoCommit:true    //為true,代表自動提交offset
        });
        consumer.on('message',function(message){
            console.log('consumer receive message:')
            console.log(message);    
            callback(message);       
        });

        consumer.on('error',function(err){
            console.log('consumer err:')
            console.log(err);
        });
        return consumer;
    }

第四步,可以建立另一個項(xiàng)目來作為構(gòu)建消息生產(chǎn),或者同一個項(xiàng)目下使用setTimeout之類函數(shù)做演示。
組件提供兩種消息生產(chǎn)(即發(fā)送消息)的類:kafka.HighLevelProducer和kafka.Producer。用法是一致,其中kafka.HighLevelProducer為高可用消息生產(chǎn)。

function createHighLevelProducter(callback){
        const producer=new kafka.HighLevelProducer(client);
        producer.on('ready',function(){
             console.log('kafka producer is connected and ready');
             callback(producer);
        });
        producer.on('error',function(err){
             console.log('kafka producer is error');
        });
}

最后,封裝一系列的函數(shù)后, 如果只在一個終端上,我們就可以編寫一小段代碼來做測試。

const kafka = require('kafka-node');
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'})

#創(chuàng)建topic
createTopics();

#啟動消息消費(fèi)

createConsumer((message)=>{
      console.log('event callback receive message:');
      console.log(message);
});

createHighLevelProducter((producter)=>{
  
    #對主題:topic-for-1的第1個分區(qū)發(fā)送消息,對主題:topic-for-8的第2個分區(qū)發(fā)送消息。
    setTimeout(()=>{
           const record={
            topic:'topic-for-1',
            message:"Hello topic1!",
            key:'topic1-1',
            attributes:1,
            partition:0
          },
          {
            topic:'topic-for-8',
            message:"Hello topic8。",
            key:'topic8-1',
            attributes:1,
            partition:1
          };
          producter.send(record, (err, data) => {
                if (err) {
                  console.log(`producer send err: ${err}`)
                }
                console.log(data);
            });              
    },500);
});

完成后可以觀察具體的數(shù)據(jù)結(jié)果。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容