下面展示的例子是kafka的客戶(hù)端的使用,包含了發(fā)送端的同步發(fā)送消息和異步發(fā)送消息的使用,以及接收端的消費(fèi)消息的使用,以及自定分區(qū)的使用
1.環(huán)境的搭建
需要配置kafka的集群環(huán)境: 可以參考http://www.itdecent.cn/p/d39ade36f606
需要依賴(lài)kafka的客戶(hù)端的jar,maven的依賴(lài)如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
2.kafka的發(fā)送端的同步發(fā)送和異步發(fā)送
這里可以參考kafkaProducer的api : http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
在代碼執(zhí)行之前,我默認(rèn)在kafka自己創(chuàng)建了三個(gè)分區(qū)的mytopic,并且副本為1
bin/kafka-topics.sh --zookeeper 192.168.44.129:2181 --partitions 3 --replication-factor 1 --create --topic my-topic
代碼入下:
/**
* @Project: kafka
* @description: kafka的producer的同步發(fā)送和異步發(fā)送
* @author: sunkang
* @create: 2018-12-16 21:24
* @ModificationHistory who when What
**/
public class KafkaProducerDemo extends Thread {
private final KafkaProducer<Integer,String> producer;
private String topic;
//是否為異步發(fā)送
private boolean async;
public KafkaProducerDemo(String topic,boolean async){
Properties properties = new Properties();
//bootstrap.servers kafka的集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
//client.id 客戶(hù)端id
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
//acks =-1,表示集群中的所有成員都需要確認(rèn)
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
//發(fā)送到同一分區(qū),批量發(fā)送數(shù)據(jù)包的大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
//兩次發(fā)送的時(shí)間間隔內(nèi),把所有的request進(jìn)行聚合在發(fā)送
properties.put(ProducerConfig.LINGER_MS_CONFIG,10);
//發(fā)送的消息的key的序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer");
//發(fā)送消息的value的序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
this.async = async;
}
public void run(){
int num = 0;
while (true){
String message = "message_"+num;
System.out.println("begin send message"+ message);
//異步發(fā)送
if(async){
producer.send(new ProducerRecord<Integer, String>(topic, num, message), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
e.printStackTrace();
}else{
System.out.println("async-offset:"+ recordMetadata.offset()+"partition:"+recordMetadata.partition());
}
}
});
}else{ //同步發(fā)送
Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<Integer, String>(topic,num,message));
try {
RecordMetadata recordMetadata = recordMetadataFuture.get();
System.out.println("sync-offset:"+ recordMetadata.offset()+"partition:"+ recordMetadata.partition());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
num++;
try {//間隔一秒之后在發(fā)送
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
//異步發(fā)送
new KafkaProducerDemo("my-topic",true).start();
}
}
輸出如下:默認(rèn)是有三個(gè)分區(qū)的,可以看到消息存儲(chǔ)的分區(qū)都不一樣,實(shí)現(xiàn)了消息的分片的作用
begin send messagemessage_0
async-offset:56->partition:1
begin send messagemessage_1
async-offset:59->partition:0
begin send messagemessage_2
async-offset:57->partition:2
3.kafka的消費(fèi)端的消費(fèi)
消費(fèi)端的例子可以參考官網(wǎng)KafkaConsumer的api: http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
- 消費(fèi)端的自動(dòng)偏移量提交和手動(dòng)偏移提交
/**
* @Project: kafka
* @description:消費(fèi)端的自動(dòng)偏移量提交和手動(dòng)偏移提交
* @author: sunkang
* @create: 2018-12-16 21:34
* @ModificationHistory who when What
**/
public class KafkaConsumerDemo extends Thread{
private final KafkaConsumer kafkaConsumer;
private final boolean autoOffesetCommit;
public KafkaConsumerDemo(String topic, boolean autoOffesetCommitt) {
Properties properties=new Properties();
//服務(wù)地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
this.autoOffesetCommit = autoOffesetCommitt;
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoOffesetCommit == true? "true":"false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
kafkaConsumer=new KafkaConsumer(properties);
kafkaConsumer.subscribe(Collections.singletonList(topic));
}
public void run(){
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
//一次性批量確認(rèn)
final int minBatchSize = 10;
while (true){
if(this.autoOffesetCommit){//自動(dòng)偏移提交
//從broker拉取消息
ConsumerRecords<Integer,String> consumerRecords= kafkaConsumer.poll(100);
for(ConsumerRecord record : consumerRecords ){
System.out.println("message receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition()); }
}else{//需要手動(dòng)偏移量控制
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("message receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//當(dāng)接收消息需要處理的進(jìn)行邏輯處理的時(shí)候,需要手動(dòng)偏移量控制,比如當(dāng)消息插入數(shù)據(jù)庫(kù)完全成功的時(shí)候, 才認(rèn)為消息完全消費(fèi)了
// insertIntoDb(buffer);
kafkaConsumer.commitSync();
buffer.clear();
}
}
}
}
public static void main(String[] args) {
new KafkaConsumerDemo("my-topic",true).start();
}
}
4.kafka自定義分區(qū)
自定義分區(qū)策略是根據(jù)消息的key來(lái)映射具體的分區(qū),需要實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口
/**
* 自定義分區(qū)策略
*/
public class MyPartition implements Partitioner {
private Random random=new Random();
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//獲得分區(qū)列表
List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic);
int partitionNum=0;
if(key==null){
partitionNum=random.nextInt(partitionInfos.size()); //隨機(jī)分區(qū)
}else{
partitionNum=Math.abs((key.hashCode())%partitionInfos.size());
}
System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum);
return partitionNum; //指定發(fā)送的分區(qū)值
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
如果要指定自己實(shí)現(xiàn)的自定義分區(qū)策略,需要增加partitioner.class的配置屬性
Properties properties=new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.partion.MyPartition");
producer=new KafkaProducer<Integer, String>(properties);
5.配置信息分析
發(fā)送端的可選配置信息分析
acks
acks 配置表示 producer 發(fā)送消息到 broker 上以后的確認(rèn)值。有三個(gè)可選項(xiàng) :
0:表示 producer 不需要等待 broker 的消息確認(rèn)。這個(gè)選項(xiàng)時(shí)延最小但同時(shí)風(fēng)險(xiǎn)最大(因?yàn)楫?dāng) server 宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)。
1:表示 producer 只需要獲得 kafka 集群中的 leader 節(jié)點(diǎn)確認(rèn)即可,這個(gè)選擇時(shí)延較小同時(shí)確保了 leader 節(jié)點(diǎn)確認(rèn)接收成功
all(-1):需要 ISR 中所有的 Replica 給予接收確認(rèn),速度最慢,安全性最
但是由于 ISR 可能會(huì)縮小到僅包含一個(gè) Replica,所以設(shè)置參數(shù)為 all 并不能一定避免數(shù)據(jù)丟失
batch.size
生產(chǎn)者發(fā)送多個(gè)消息到 broker 上的同一個(gè)分區(qū)時(shí),為了減少網(wǎng)絡(luò)請(qǐng)求帶來(lái)的性能開(kāi)銷(xiāo),通過(guò)批量的方式來(lái)提交消息,可以通過(guò)這個(gè)參數(shù)來(lái)控制批量提交的字節(jié)數(shù)大小,默認(rèn)大小是 16384byte,也就是 16kb,意味著當(dāng)一批消息大小達(dá)到指定的 batch.size 的時(shí)候會(huì)統(tǒng)一發(fā)送
linger.ms
Producer 默認(rèn)會(huì)把兩次發(fā)送時(shí)間間隔內(nèi)收集到的所有 Requests 進(jìn)行一次聚合然后再發(fā)送,以此提高吞吐量,而 linger.ms 就是為每次發(fā)送到 broker 的請(qǐng)求,增加一些 delay,以此來(lái)聚合更多的Message 請(qǐng)求。 這個(gè)有點(diǎn)想 TCP 里面的Nagle 算法,在 TCP 協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送,采用了Nagle 算法,也就是基于小包的等停協(xié)議
batch.size 和 linger.ms 這兩個(gè)參數(shù)是 kafka 性能優(yōu)化的關(guān)鍵參數(shù),很多同學(xué)會(huì)發(fā)現(xiàn) batch.size 和 linger.ms 這兩者的作用是一樣的,如果兩個(gè)都配置了,那么怎么工作的呢?實(shí)際上,當(dāng)二者都配置的時(shí)候,只要滿(mǎn)足其中一個(gè)要求,就會(huì)發(fā)送請(qǐng)求到 broker 上
max.request.size
設(shè)置請(qǐng)求的數(shù)據(jù)的最大字節(jié)數(shù),為了防止發(fā)生較大的數(shù)據(jù)包影響到吞吐量,默認(rèn)值為 1MB
消費(fèi)端的可選配置分析
group.id
consumer group 是 kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個(gè)公共的 ID,即 group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)訂閱主題(subscribed topics)的所有分區(qū)(partition)。當(dāng)然,每個(gè)分區(qū)只能由同一個(gè)消費(fèi)
如下圖所示,分別有三個(gè)消費(fèi)者,屬于兩個(gè)不同的 group,那么對(duì)于 firstTopic 這個(gè) topic 來(lái)說(shuō),這兩個(gè)組的消費(fèi)者都能同時(shí)消費(fèi)這個(gè) topic 中的消息,對(duì)于此事的架構(gòu)來(lái)說(shuō),這個(gè) firstTopic 就類(lèi)似于 ActiveMQ 中的 topic 概念。
如最下圖所示,如果 3 個(gè)消費(fèi)者都屬于同一個(gè)group,那么此事 firstTopic 就是一個(gè) Queue 的概念

enable.auto.commit
消費(fèi)者消費(fèi)消息以后自動(dòng)提交,只有當(dāng)消息提交以后,該消息才不會(huì)被再次接收到,還可以配合 auto.commit.interval.ms 控制自動(dòng)提交的頻率。
當(dāng)然,我們也可以通過(guò) consumer.commitSync()的方式實(shí)現(xiàn)手動(dòng)提交
auto.offset.reset
這個(gè)參數(shù)是針對(duì)新的 groupid 中的消費(fèi)者而言的,當(dāng)有新 groupid 的消費(fèi)者來(lái)消費(fèi)指定的 topic 時(shí),對(duì)于該參數(shù)的配置,會(huì)有不同的語(yǔ)義
auto.offset.reset=latest 情況下,新的消費(fèi)者將會(huì)從其他消費(fèi)者最后消費(fèi)的offset 處開(kāi)始消費(fèi) Topic 下的消息
auto.offset.reset= earliest 情況下,新的消費(fèi)者會(huì)從該 topic 最早的消息開(kāi)始消費(fèi)
auto.offset.reset=none 情況下,新的消費(fèi)者加入以后,由于之前不存在offset,則會(huì)直接拋出異常。
max.poll.records
此設(shè)置限制每次調(diào)用 poll 返回的消息數(shù),這樣可以更容易的預(yù)測(cè)每次 poll 間隔要處理的最大值。通過(guò)調(diào)整此值,可以減少 poll 間隔
6.與spring-kafka集成
依賴(lài)的maven的配置如下
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.4.RELEASE</version>
</dependency>
發(fā)送端的producerKafka.xml的spring配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
<entry key="client.id" value="sping-kafka-producer"/>
<entry key="acks" value="-1"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg ref="producerProperties"/>
</bean>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
</bean>
</beans>
消費(fèi)端的配置consumerKafka.xml的配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
<entry key="group.id" value="registryConsumer"/>
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg ref="consumerProperties"/>
</bean>
<bean id="registryListener" class="com.kafka.spring.RegistryListener"/>
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="my-topic"/>
<property name="messageListener" ref="registryListener"/>
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
</beans>
發(fā)送端的啟動(dòng)代碼
/**
* @Project: kafka
* @description: 通過(guò)KafkaTemplate進(jìn)行發(fā)送消息
* @author: sunkang
* @create: 2018-12-23 18:52
* @ModificationHistory who when What
**/
public class SpringKafkaProducerDemo {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:producerKafka.xml");
KafkaTemplate kafkaTemplate = context.getBean("kafkaTemplate", KafkaTemplate.class);
kafkaTemplate.send("my-topic",1,"message_1");
kafkaTemplate.send("my-topic",2,"message_2");
}
}
消費(fèi)端的啟動(dòng)代碼
/**
* @Project: kafka
* @description: 消費(fèi)端的啟動(dòng)代碼
* @author: sunkang
* @create: 2018-12-23 18:51
* @ModificationHistory who when What
**/
public class SpringKafkaConsumerDemo {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("consumerKafka.xml");
}
}
//設(shè)置消息監(jiān)聽(tīng)類(lèi)
class RegistryListener implements MessageListener<Integer,String> {
@Override
public void onMessage(ConsumerRecord<Integer, String> integerStringConsumerRecord) {
System.out.println("收到了消息");
System.out.println("key:"+integerStringConsumerRecord.key()+"->value:"+integerStringConsumerRecord.value());
}
}