Kafka介紹之安裝配置API

Kafka集群安裝步驟:

? ? 1、解壓

?? 2、修改server.properties

???????????? broker.id=1

???????????? zookeeper.connect=weekend05:2181,weekend06:2181,weekend07:2181

? 3、將zookeeper集群啟動

? 4、在每一臺節(jié)點(diǎn)上啟動broker

??????????? bin/kafka-server-start.sh config/server.properties

?? 5、在kafka集群中創(chuàng)建一個topic

?????????? bin/kafka-topics.sh --create --zookeeper zk_hostname:2181 --replication-factor 3 --partitions 1 --topic topic_name

?? 6、用一個producer向某一個topic中寫入消息

????????? bin/kafka-console-producer.sh --broker-list broker_hostname:9092 --topic topic_name

??? 7、用一個comsumer從某一個topic中讀取信息

???????? bin/kafka-console-consumer.sh --zookeeper zk_hostname:2181 --from-beginning --topic topic-name

??? 8、查看一個topic的分區(qū)及副本狀態(tài)信息

???????? bin/kafka-topics.sh --describe --zookeeper zk_hostname:2181 --topic topic_name


Java API

???? Producer端:


?????????????? public class ProducerDemo{

?????????????????????? public static void main(String[] args){

?????????????????????????????? //創(chuàng)建Properties實(shí)例,設(shè)置屬性

????????????????????????????? Properties properties = new Properties();

????????????????????????????? //聲明zk ???????????????????????????

???????????????????????????? properties.put("zk.connect","zk_hostname:2181,....");

?????????????????????????????? //聲明broker ??????????????????????????

????????????????????????????? properties.put("metadata.broker.list","broker_hostname:9092,.....");

????????????????????????????? properties.put("serializer.class","kafka.serializer.StringEncoder");

?????????????????????????????? //創(chuàng)建ProducerConfig配置實(shí)例 ????????????????????????????

????????????????????????????? ProducerConfig config = new ProducerConfig(properties);

?????????????????????????????? //創(chuàng)建Producer實(shí)例 ???????????????????????????

????????????????????????????? Producer<String,String> producer = new Producer<String,String>(config);

????????????????????????????? //發(fā)送消息 ????????????????????????

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? producer.send(new KeyedMessage<String,String>(topic_name,content));

???????????? }

}


Consumer端:




???????? public class ConsumerDemo{

????????????????????? public static void main(String[] args){

???????????????????????????? //創(chuàng)建Properties實(shí)例,設(shè)置屬性

????????????????????????? Properties properties = new Properties();

????????????????????????? properties.put("zookeeper.connect","zk_hostname:2181,....");

????????????????????? ? ? ? ? //如果生產(chǎn)者和消費(fèi)者在同一個組中,則不能訪問同一組Topic內(nèi)的數(shù)據(jù)

????????????????????????? properties.put("group.id","id_name");

?????????????????????????? //聲明ConsumerConnector

??????????????????????? ConsumerConnector consumer =? Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

? ? ? ? ? ? ? ? ? ? ? ? MaptopicCountMap = new HashMap();

??????????????????????? //一次從一個主題中讀取數(shù)據(jù)

?????????????????????? topicCountMap.put(topic, 1);

??????????????????????? topicCountMap.put(“topic_name", 1);

??????????????????????? topicCountMap.put("topic_name1", 1);

? ? ? ? ? ? ? ? ? ? ? ? Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap =

???????????????????????????????????????? consumer.createMessageStreams(topicCountMap);

? ? ? ? ? ? ? ? ? ? ? List<KafkaStream<byte[],byte[]>> streams = consumerMap.get("topic_name");

??????????????????????? for(final KafkaStreamkafkaStream : streams){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? new Thread(new Runnable() {

???????????????????????????????????????????????? @Override

???????????????????????????????????????? public void run() {

??????????????????????????????????????????????? for(MessageAndMetadatamm : kafkaStream){

????????????????????????????????????????????? ? ? ?? ?? String msg = new String(mm.message());

??????????????????????????????????????????? ? ? ? ? ? ?? System.out.println(msg);

?????????????????????????????????????????????????? }

??????????????????????????????????? }

????????? ? ? ? ?? ??? }).start();

??????? ? ? ?? }

???????? }


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,984評論 4 54
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,585評論 0 34
  • ** 今天看了一下kafka官網(wǎng),嘗試著在自己電腦上安裝和配置,然后學(xué)一下官方document。** Introd...
    RainChang閱讀 5,145評論 1 30
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,539評論 1 15

友情鏈接更多精彩內(nèi)容