1. 優雅的退出消費者程序
package com.bonc.rdpe.kafka110.consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
/**
 * Demonstrates how to shut a Kafka consumer down gracefully.
 *
 * <p>The poll loop runs on the main thread. A JVM shutdown hook calls
 * {@code consumer.wakeup()}, which makes the pending (or next) {@code poll()}
 * throw {@link WakeupException}. The main thread then leaves the loop,
 * commits its offsets synchronously and closes the consumer, while the hook
 * waits for the main thread to finish.
 *
 * @author YangYunhe
 * @date 2018-07-17 11:05:39
 */
public class QuitConsumer {

    /**
     * Subscribes to the demo topic, prints the coordinates of every record
     * received, and exits cleanly when the JVM is asked to shut down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group005");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("dev3-yangyunhe-topic001"));

        final Thread mainThread = Thread.currentThread();

        /*
         * Leaving the poll loop requires another thread to call consumer.wakeup().
         * wakeup() aborts poll() by making it throw WakeupException; that exception
         * needs no handling because it is only a signal to break out of the loop.
         * wakeup() is the only consumer method that is safe to call from a different
         * thread. Because the loop runs on the main thread, a shutdown hook is a
         * convenient place to call it.
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup();
                try {
                    // Let the main thread finish so it can commit offsets and close the consumer.
                    mainThread.join();
                } catch (InterruptedException e) {
                    // Preserve the interrupt status instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition()
                            + ", offset = " + record.offset());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ignored) {
            // Expected during shutdown; nothing to handle.
        } finally {
            // Closing the consumer before the thread exits matters: it tells the group
            // coordinator that this member is leaving, so a rebalance starts right away
            // instead of after the session timeout. The nested finally guarantees that
            // close() still runs when the final synchronous commit fails.
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
                System.out.println("Closed consumer and we are done");
            }
        }
    }
}
2. 多線程消費者
KafkaConsumer是非線程安全的,多線程需要處理好線程同步,多線程的實現方式有多種,這里介紹一種:每個線程各自實例化一個KafkaConsumer對象,這種方式的缺點是:當這些線程屬于同一個消費組時,線程的數量受限于分區數,當消費者線程的數量大于分區數時,就有一部分消費線程一直處于空閑狀態。
多線程消費者的線程實現類代碼如下:
package com.bonc.rdpe.kafka110.thread;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
 * Worker used by the multi-threaded consumer example: each instance owns its
 * own {@link KafkaConsumer} and runs a poll loop until {@link #shutdown()} is
 * called.
 *
 * @author YangYunhe
 * @date 2018-07-17 10:48:45
 */
public class ConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    /**
     * Creates the worker and its dedicated consumer.
     *
     * @param id      number printed in front of every record this worker handles
     * @param groupId consumer group the underlying consumer joins
     * @param topics  topics subscribed to when {@link #run()} starts
     */
    public ConsumerLoop(int id, String groupId, List<String> topics) {
        this.id = id;
        this.topics = topics;
        this.consumer = new KafkaConsumer<>(consumerConfig(groupId));
    }

    /** Builds the consumer configuration for the given group. */
    private static Properties consumerConfig(String groupId) {
        String stringDeserializer = StringDeserializer.class.getName();
        Properties config = new Properties();
        config.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        config.put("group.id", groupId);
        config.put("auto.offset.reset", "earliest");
        config.put("key.deserializer", stringDeserializer);
        config.put("value.deserializer", stringDeserializer);
        return config;
    }

    /**
     * Subscribes to the configured topics and prints every record received
     * until a {@link WakeupException} interrupts the loop; the consumer is
     * closed on the way out.
     */
    @Override
    public void run() {
        try {
            consumer.subscribe(topics);
            for (;;) {
                ConsumerRecords<String, String> batch = consumer.poll(Long.MAX_VALUE);
                batch.forEach(this::printRecord);
            }
        } catch (WakeupException ignored) {
            // Raised by shutdown(); simply fall through to close the consumer.
        } finally {
            consumer.close();
        }
    }

    /** Prints one record as "id: {partition, offset, value}". */
    private void printRecord(ConsumerRecord<String, String> rec) {
        Map<String, Object> fields = new HashMap<>();
        fields.put("partition", rec.partition());
        fields.put("offset", rec.offset());
        fields.put("value", rec.value());
        System.out.println(this.id + ": " + fields);
    }

    /** Requests termination of the poll loop by waking up the consumer. */
    public void shutdown() {
        consumer.wakeup();
    }
}
多線程消費者主程序代碼如下:
package com.bonc.rdpe.kafka110.consumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.bonc.rdpe.kafka110.thread.ConsumerLoop;
/**
 * Entry point of the multi-threaded consumer example: starts a fixed number
 * of {@link ConsumerLoop} workers on a thread pool and stops them from a JVM
 * shutdown hook.
 *
 * @author YangYunhe
 * @date 2018-07-17 10:39:25
 */
public class MultiThreadConsumer {

    /** Maximum time, in milliseconds, the shutdown hook waits for the workers to stop. */
    private static final long SHUTDOWN_TIMEOUT_MS = 5000;

    /**
     * Starts the workers and registers the shutdown hook.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int numConsumers = 3;
        String groupId = "dev3-yangyunhe-topic001-group004";
        List<String> topics = Arrays.asList("dev3-yangyunhe-topic001");

        final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        final List<ConsumerLoop> consumers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Ask every worker to leave its poll loop, then wait for the pool to drain.
                for (ConsumerLoop consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    // awaitTermination returns false on timeout; report it rather than
                    // exiting silently with workers that are still running.
                    if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                        System.err.println("Consumer threads did not terminate within "
                                + SHUTDOWN_TIMEOUT_MS + " ms");
                    }
                } catch (InterruptedException e) {
                    // Preserve the interrupt status instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
運行結果:
2: {partition=2, offset=1216, value=...}
......
1: {partition=1, offset=1329, value=...}
......
0: {partition=0, offset=1292, value=...}
......
3. 獨立消費者
有時候你可能只需要一個消費者從一個主題的所有分區或者某個特定的分區讀取數據。這個時候就不需要消費者群組和再均衡了,只需要把主題或者分區分配給消費者,然后開始讀取消息并提交偏移量。如果是這樣的話,就不需要訂閱主題,取而代之的是為自己分配分區。一個消費者可以訂閱主題(并加入消費者群組),或者為自己分配分區,但不能同時做這兩件事情。以下是獨立消費者的示例代碼:
package com.bonc.rdpe.kafka110.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
/**
 * Standalone consumer example: instead of subscribing to a topic as part of a
 * consumer group, the consumer assigns itself every partition of the topic
 * and reads from them directly.
 *
 * @author YangYunhe
 * @date 2018-07-17 12:44:50
 */
public class AloneConsumer {

    /** Topic whose partitions are assigned to the consumer. */
    private static final String TOPIC = "dev3-yangyunhe-topic001";

    /**
     * Assigns all partitions of the topic to a single consumer and prints the
     * partition and offset of every record received.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        // A standalone consumer does not need a consumer group.
        // NOTE(review): the commit calls below may require a group id on newer
        // client versions — confirm against the client version in use.
        // props.put("group.id", "dev3-yangyunhe-topic001-group003");
        props.put("auto.offset.reset", "earliest");
        // Offsets are committed manually below, so auto-commit is switched off.
        // The property name is "enable.auto.commit"; "auto.commit.offset" is not
        // a consumer setting and left auto-commit enabled.
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            /*
             * consumer.partitionsFor(topic) returns the topic's partition metadata.
             * It does not notice partitions that are added or changed later, so
             * either call it periodically or restart the program when the
             * partition count changes.
             */
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
            if (partitionInfos == null || partitionInfos.isEmpty()) {
                System.err.println("No partitions found for topic " + TOPIC);
                return; // the outer finally still closes the consumer
            }

            List<TopicPartition> partitions = new ArrayList<>(partitionInfos.size());
            for (PartitionInfo partition : partitionInfos) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            consumer.assign(partitions);

            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                    }
                    consumer.commitAsync();
                }
            } finally {
                // Final synchronous commit once the loop ends for any reason.
                consumer.commitSync();
            }
        } finally {
            // Always release the consumer: also when no partitions were found,
            // when metadata lookup fails, or when the final commit throws.
            consumer.close();
        }
    }
}