Kafka初試

昨天在測試環(huán)境搭建了一套zookeeper+kafka(各一臺)的機器,開始進行kafka的實踐之旅。昨天下班前一直都出現(xiàn)無法發(fā)送無法接收的問題,今天終于搞定了。

zookeeper的安裝

直接從官網(wǎng)下載bin包后,解壓即可

tar -zxvf zookeeper-3.4.9.tar.gz

需要修改的配置有:

  1. 把conf目錄下的zoo_sample.cfg改名為zoo.cfg(并修改dataDir)
  2. 修改bin目錄下的zkEnv.sh腳本中的ZOO_LOG_DIR和ZOO_LOG4J_PROP

啟動zookeeper

bin/zkServer.sh start

Kafka的安裝

由于只使用了一個broker,所以直接解壓包

tar -zxvf kafka_2.11-0.10.2.0.tgz

需要修改的配置為config/server.properties文件,主要修改的有l(wèi)og.dirs和listeners。

listeners=PLAINTEXT://localhost:9092

這里有個坑,server.properties中一定要配置host.name或者listeners,不然會出現(xiàn)無法收發(fā)消息的現(xiàn)象
然后啟動即可

bin/kafka-server-start.sh config/server.properties &

客戶端

安裝完以后需要寫生產(chǎn)者的消費者了,直接用最簡單的方法來寫。

Producer

package producer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Properties props = new Properties();
    props.put("bootstrap.servers","122.20.109.68:9092");
    props.put("acks","1");
    props.put("retries","0");
    props.put("batch.size","16384");
// props.put("linger.ms","1");
// props.put("buffer.memory","33554432");
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//生產(chǎn)者的建立
    KafkaProducer producer = new KafkaProducer<>(props);

    for (int i=0;i<100;i++) {
      System.out.println("seding message "+i);
      ProducerRecord record = new ProducerRecord("testTopic",String.valueOf(i),"this is message"+i);
      producer.send(record, new Callback() {
        public void onCompletion (RecordMetadata metadata, Exception e) {
          if (null != e) {
            e.printStackTrace();
          } else {
            System.out.println(metadata.offset());
          }
        }
      });
    }
    Thread.sleep(100000);
    producer.close();
  }
} 

這里有個坑,如果我直接用producer.send(ProducerRecord)方法,發(fā)完100條以后producer.close(),會導(dǎo)致Kafka無法收到消息,懷疑是異步發(fā)送導(dǎo)致的,需要真的發(fā)送到Kafka以后才能停止Producer,所以我在后面sleep了一下,加上以后就可以正常發(fā)送了。
使用callback是異步發(fā)送,此外還能使用同步發(fā)送,直接在send方法后加上一個get方法就會直接阻塞直到broker返回消息已收到。

producer.send(record).get();

Producer的properties有幾個常用配置:

  • bootstrap.servers:Kafka集群連接串,可以由多個host:port組成
  • acks:broker消息確認的模式,有三種:
    0:不進行消息接收確認,即Client端發(fā)送完成后不會等待Broker的確認
    1:由Leader確認,Leader接收到消息后會立即返回確認信息
    all:集群完整確認,Leader會等待所有in-sync的follower節(jié)點都確認收到消息后,再返回確認信息
    我們可以根據(jù)消息的重要程度,設(shè)置不同的確認模式。默認為1
  • retries:發(fā)送失敗時Producer端的重試次數(shù),默認為0
  • batch.size:當(dāng)同時有大量消息要向同一個分區(qū)發(fā)送時,Producer端會將消息打包后進行批量發(fā)送。如果設(shè)置為0,則每條消息都DuLi發(fā)送。默認為16384字節(jié)
  • linger.ms:發(fā)送消息前等待的毫秒數(shù),與batch.size配合使用。在消息負載不高的情況下,配置linger.ms能夠讓Producer在發(fā)送消息前等待一定時間,以積累更多的消息打包發(fā)送,達到節(jié)省網(wǎng)絡(luò)資源的目的。默認為0
  • key.serializer/value.serializer:消息key/value的序列器Class,根據(jù)key和value的類型決定
  • buffer.memory:消息緩沖池大小。尚未被發(fā)送的消息會保存在Producer的內(nèi)存中,如果消息產(chǎn)生的速度大于消息發(fā)送的速度,那么緩沖池滿后發(fā)送消息的請求會被阻塞。默認33554432字節(jié)(32MB)

Consumer

package consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class Consumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers","122.20.109.68:9092");
    props.put("group.id","test");
    props.put("enable.auto.commit","true");
    props.put("auto.commit.interval.ms","1000");
    props.put("session.timeout.ms","30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("testTopic"));
    while(true) {
      ConsumerRecords records = consumer.poll(1000);
      for (ConsumerRecord record: records) {
        System.out.println("offset "+record.offset()+" Message: "+record.value());
      }
    }
  }
}

Consumer的Properties的常用配置有:

  • bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含義一樣,不再贅述
  • fetch.min.bytes:每次最小拉取的消息大?。╞yte)。Consumer會等待消息積累到一定尺寸后進行批量拉取。默認為1,代表有一條就拉一條
  • max.partition.fetch.bytes:每次從單個分區(qū)中拉取的消息最大尺寸(byte),默認為1M
  • group.id:Consumer的group id,同一個group下的多個Consumer不會拉取到重復(fù)的消息,不同group下的Consumer則會保證拉取到每一條消息。注意,同一個group下的consumer數(shù)量不能超過分區(qū)數(shù)。
  • enable.auto.commit:是否自動提交已拉取消息的offset。提交offset即視為該消息已經(jīng)成功被消費,該組下的Consumer無法再拉取到該消息(除非手動修改offset)。默認為true
  • auto.commit.interval.ms:自動提交offset的間隔毫秒數(shù),默認5000。

參考:http://www.cnblogs.com/edison2012/p/5774207.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,981評論 4 54
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,544評論 19 139
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,583評論 0 34
  • 背景介紹 Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標如下: 以時間復(fù)雜度為O...
    高廣超閱讀 13,051評論 8 167
  • 一、入門1、簡介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,673評論 0 9

友情鏈接更多精彩內(nèi)容