Kafka是一個分布式流媒體平臺。發(fā)布和訂閱記錄流,類似于消息隊列或企業(yè)消息傳遞系統(tǒng)。以容錯持久的方式存儲記錄流。處理記錄發(fā)生的流。本文講述如何使用java API 操作kafka集群
主要內(nèi)容:
- 1.消息生產(chǎn)者
- 2.消息消費者
- 3.測試
1.消息生產(chǎn)者
1.1.引入依賴
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
1.2.編寫生產(chǎn)者
public class KafkaProducerClient {
public static void main(String[] args) {
//1、準備配置文件 參考:ProducerConfig.java
Properties props = new Properties();
// kakfa 服務(wù)列表
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
/**
* 當生產(chǎn)者將ack設(shè)置為“全部”(或“-1”)時,min.insync.replicas指定必須確認寫入被認為成功的最小副本數(shù)。
* 如果這個最小值不能滿足,那么生產(chǎn)者將會引發(fā)一個異常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
* 當一起使用時,min.insync.replicas和acks允許您執(zhí)行更大的耐久性保證。
* 一個典型的情況是創(chuàng)建一個復(fù)制因子為3的主題,將min.insync.replicas設(shè)置為2,并使用“全部”選項來產(chǎn)生。
* 這將確保生產(chǎn)者如果大多數(shù)副本沒有收到寫入引發(fā)異常。
*/
props.put("acks", "all");
/**
* 設(shè)置一個大于零的值,將導(dǎo)致客戶端重新發(fā)送任何失敗的記錄
*/
props.put("retries", 0);
/**
* 只要有多個記錄被發(fā)送到同一個分區(qū),生產(chǎn)者就會嘗試將記錄一起分成更少的請求。
* 這有助于客戶端和服務(wù)器的性能。該配置以字節(jié)為單位控制默認的批量大小。
*/
props.put("batch.size", 16384);
/**
* 在某些情況下,即使在中等負載下,客戶端也可能希望減少請求的數(shù)量。
* 這個設(shè)置通過添加少量的人工延遲來實現(xiàn)這一點,即不是立即發(fā)出一個記錄,
* 而是等待達到給定延遲的記錄,以允許發(fā)送其他記錄,以便發(fā)送可以一起批量發(fā)送
*/
props.put("linger.ms", 1);
/**
* 生產(chǎn)者可用于緩沖等待發(fā)送到服務(wù)器的記錄的總字節(jié)數(shù)。
* 如果記錄的發(fā)送速度比發(fā)送給服務(wù)器的速度快,那么生產(chǎn)者將會阻塞,max.block.ms之后會拋出異常。
* 這個設(shè)置應(yīng)該大致對應(yīng)于生產(chǎn)者將使用的總內(nèi)存,但不是硬性限制,
* 因為不是所有生產(chǎn)者使用的內(nèi)存都用于緩沖。
* 一些額外的內(nèi)存將被用于壓縮(如果壓縮被啟用)以及用于維護正在進行的請求。
*/
props.put("buffer.memory", 33554432);
// key的序列化類型
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value的序列化類型
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、創(chuàng)建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
for (int i = 0; i < 100; i++) {
//3、發(fā)送數(shù)據(jù)
kafkaProducer.send(new ProducerRecord("TEST_JAVA", "key"+i, "value" + i));
}
kafkaProducer.flush();
kafkaProducer.close();
}
}
2.消息消費者
2.1.引入依賴
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
2.2.編寫消費者
public class KafkaConsumerClient {
public static void main(String[] args) {
//1、準備配置文件 參考:ConsumerConfig.scala
Properties props = new Properties();
// kakfa 服務(wù)列表
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
//一個字符串用來指示一組consumer所在的組
props.put("group.id", "test");
// 如果true,consumer定期地往zookeeper寫入每個分區(qū)的offset
props.put("enable.auto.commit", "true");
// 往zookeeper上寫offset的頻率
props.put("auto.commit.interval.ms", "1000");
// key的反序列化類型
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的反序列化類型
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2、創(chuàng)建KafkaConsumer
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(props);
// 3、訂閱數(shù)據(jù),這里的topic可以是多個
kafkaConsumer.subscribe(Arrays.asList("TEST_JAVA"));
// 4、獲取數(shù)據(jù)
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
}
}
}
}
3.測試
3.1.啟動zookeeper
運行一鍵啟動腳本:startzk.sh
3.2.啟動kafka
參考:Kafka之集群安裝
運行一鍵啟動腳本:startkafka.sh
3.3.啟動消費者
直接run KafkaConsumerClient.main 就會一直阻塞等待消息

3.4.啟動生產(chǎn)者
直接run KafkaProducerClient.main 就會往名為TEST_JAVA的Topic發(fā)送100條消息
可以看到消費者已經(jīng)消費了所有的數(shù)據(jù)

image.png
3.5.查看topics
kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

其中TEST_JAVA就是我們剛才默認建的Topic