昨天在測試環(huán)境搭建了一套zookeeper+kafka(各一臺)的機器,開始進行kafka的實踐之旅。昨天下班前一直都出現(xiàn)無法發(fā)送無法接收的問題,今天終于搞定了。
zookeeper的安裝
直接從官網(wǎng)下載bin包后,解壓即可
tar -zxvf zookeeper-3.4.9.tar.gz
需要修改的配置有:
- 把conf目錄下的zoo_sample.cfg改名為zoo.cfg(并修改dataDir)
- 修改bin目錄下的zkEnv.sh腳本中的ZOO_LOG_DIR和ZOO_LOG4J_PROP
啟動zookeeper
bin/zkServer.sh start
Kafka的安裝
由于只使用了一個broker,所以直接解壓包
tar -zxvf kafka_2.11-0.10.2.0.tgz
需要修改的配置為config/server.properties文件,主要修改的有l(wèi)og.dirs和listeners。
listeners=PLAINTEXT://localhost:9092
這里有個坑,server.properties中一定要配置host.name或者listeners,不然會出現(xiàn)無法收發(fā)消息的現(xiàn)象
然后啟動即可
bin/kafka-server-start.sh config/server.properties &
客戶端
安裝完以后需要寫生產(chǎn)者的消費者了,直接用最簡單的方法來寫。
Producer
package producer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put("bootstrap.servers","122.20.109.68:9092");
props.put("acks","1");
props.put("retries","0");
props.put("batch.size","16384");
// props.put("linger.ms","1");
// props.put("buffer.memory","33554432");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//生產(chǎn)者的建立
KafkaProducer producer = new KafkaProducer<>(props);
for (int i=0;i<100;i++) {
System.out.println("seding message "+i);
ProducerRecord record = new ProducerRecord("testTopic",String.valueOf(i),"this is message"+i);
producer.send(record, new Callback() {
public void onCompletion (RecordMetadata metadata, Exception e) {
if (null != e) {
e.printStackTrace();
} else {
System.out.println(metadata.offset());
}
}
});
}
Thread.sleep(100000);
producer.close();
}
}
這里有個坑,如果我直接用producer.send(ProducerRecord)方法,發(fā)完100條以后producer.close(),會導(dǎo)致Kafka無法收到消息,懷疑是異步發(fā)送導(dǎo)致的,需要真的發(fā)送到Kafka以后才能停止Producer,所以我在后面sleep了一下,加上以后就可以正常發(fā)送了。
使用callback是異步發(fā)送,此外還能使用同步發(fā)送,直接在send方法后加上一個get方法就會直接阻塞直到broker返回消息已收到。
producer.send(record).get();
Producer的properties有幾個常用配置:
- bootstrap.servers:Kafka集群連接串,可以由多個host:port組成
- acks:broker消息確認的模式,有三種:
0:不進行消息接收確認,即Client端發(fā)送完成后不會等待Broker的確認
1:由Leader確認,Leader接收到消息后會立即返回確認信息
all:集群完整確認,Leader會等待所有in-sync的follower節(jié)點都確認收到消息后,再返回確認信息
我們可以根據(jù)消息的重要程度,設(shè)置不同的確認模式。默認為1 - retries:發(fā)送失敗時Producer端的重試次數(shù),默認為0
- batch.size:當(dāng)同時有大量消息要向同一個分區(qū)發(fā)送時,Producer端會將消息打包后進行批量發(fā)送。如果設(shè)置為0,則每條消息都DuLi發(fā)送。默認為16384字節(jié)
- linger.ms:發(fā)送消息前等待的毫秒數(shù),與batch.size配合使用。在消息負載不高的情況下,配置linger.ms能夠讓Producer在發(fā)送消息前等待一定時間,以積累更多的消息打包發(fā)送,達到節(jié)省網(wǎng)絡(luò)資源的目的。默認為0
- key.serializer/value.serializer:消息key/value的序列器Class,根據(jù)key和value的類型決定
- buffer.memory:消息緩沖池大小。尚未被發(fā)送的消息會保存在Producer的內(nèi)存中,如果消息產(chǎn)生的速度大于消息發(fā)送的速度,那么緩沖池滿后發(fā)送消息的請求會被阻塞。默認33554432字節(jié)(32MB)
Consumer
package consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers","122.20.109.68:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put("session.timeout.ms","30000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("testTopic"));
while(true) {
ConsumerRecords records = consumer.poll(1000);
for (ConsumerRecord record: records) {
System.out.println("offset "+record.offset()+" Message: "+record.value());
}
}
}
}
Consumer的Properties的常用配置有:
- bootstrap.servers/key.deserializer/value.deserializer:和Producer端的含義一樣,不再贅述
- fetch.min.bytes:每次最小拉取的消息大?。╞yte)。Consumer會等待消息積累到一定尺寸后進行批量拉取。默認為1,代表有一條就拉一條
- max.partition.fetch.bytes:每次從單個分區(qū)中拉取的消息最大尺寸(byte),默認為1M
- group.id:Consumer的group id,同一個group下的多個Consumer不會拉取到重復(fù)的消息,不同group下的Consumer則會保證拉取到每一條消息。注意,同一個group下的consumer數(shù)量不能超過分區(qū)數(shù)。
- enable.auto.commit:是否自動提交已拉取消息的offset。提交offset即視為該消息已經(jīng)成功被消費,該組下的Consumer無法再拉取到該消息(除非手動修改offset)。默認為true
- auto.commit.interval.ms:自動提交offset的間隔毫秒數(shù),默認5000。