在前幾章中,我們使用KafkaTemplate.send(String data)這個方法發(fā)送消息到Kafka中,顯然這個方法并不能滿足我們系統(tǒng)的需求,那我們需要查看一下KafkaTemplate所實現(xiàn)的接口,看看還提供了什么方法。當(dāng)我們發(fā)送消息到Kafka后,我們又怎么去確認(rèn)消息是否發(fā)送成功呢?這就涉及到KafkaTemplate的發(fā)送回調(diào)方法了。接下來我們開始正式講解。
查看發(fā)送接口
首先我們Ctrl+鼠標(biāo)左鍵進(jìn)入KafkaTemplate的源代碼中查看一下,可以看到有關(guān)發(fā)送的接口如下。這里的參數(shù)還是比較簡單的,值得一提的事,方法中有個Long類型的時間戳(timestamp)參數(shù),這是Kafka0.10版本提供的新功能,主要用來使用時間索引進(jìn)行查詢數(shù)據(jù)以及日志切分清除策略。還有一個ProducerRecord參數(shù),這個類其實就是整合了topic、partition、data等數(shù)據(jù)的消費實體類。
稍微提一下這些參數(shù)都是什么意思吧:
topic:這里填寫的是Topic的名字
partition:這里填寫的是分區(qū)的id,其實也是就第幾個分區(qū),id從0開始。表示指定發(fā)送到該分區(qū)中
timestamp:時間戳,一般默認(rèn)當(dāng)前時間戳
key:消息的鍵
data:消息的數(shù)據(jù)
ProducerRecord:消息對應(yīng)的封裝類,包含上述字段
Message<?>:Spring自帶的Message封裝類,包含消息及消息頭
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
使用sendDefault發(fā)送消息
首先在KafkaConfiguration編寫一個帶有默認(rèn)Topic參數(shù)的KafkaTemplate,同時為另外一個KafkaTemplate加上@Primary注解,@Primary注解的意思是在擁有多個同類型的Bean時優(yōu)先使用該Bean,到時候方便我們使用@Autowired注解自動注入。
//這個是我們之前編寫的KafkaTemplate代碼,加入@Primary注解
@Bean
@Primary
public KafkaTemplate<Integer, String> kafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
return template;
}
@Bean("defaultKafkaTemplate")
public KafkaTemplate<Integer, String> defaultKafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
template.setDefaultTopic("topic.quick.default");
return template;
}
接著編寫測試方法,可以看到我們這里調(diào)用的是sendDefault方法,而且并沒有在方法參數(shù)上添加topicName,這是因為我們在聲明defaultKafkaTemplate這個Bean的時候添加了這行代碼 template.setDefaultTopic("topic.quick.default"),只要調(diào)用sendDefault方法,kafkaTemplate會自動把消息發(fā)送到名為"topic.quick.default"的Topic中。
@Resource
private KafkaTemplate defaultKafkaTemplate;
@Test
public void testDefaultKafkaTemplate() {
defaultKafkaTemplate.sendDefault("I`m send msg to default topic");
}

這里也順便測試一下其他幾個吧。
@Test
public void testTemplateSend() {
//發(fā)送帶有時間戳的消息
kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");
//使用ProducerRecord發(fā)送消息
ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
kafkaTemplate.send(record);
//使用Message發(fā)送消息
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);
}
消息結(jié)果回調(diào)
一般來說我們都會去獲取KafkaTemplate發(fā)送消息的結(jié)果去判斷消息是否發(fā)送成功,如果消息發(fā)送失敗,則會重新發(fā)送或者執(zhí)行對應(yīng)的業(yè)務(wù)邏輯。所以這里我們?nèi)崿F(xiàn)這個功能。
KafkaSendResultHandler
第一步還是編寫一個消息結(jié)果回調(diào)類KafkaSendResultHandler。當(dāng)我們使用KafkaTemplate發(fā)送消息成功的時候回調(diào)用OnSuccess方法,發(fā)送失敗則會調(diào)用onError方法。
@Component
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Message send success : " + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send error : " + producerRecord.toString());
}
}
接下來就使用KafkaSendResultHandler實現(xiàn)消息發(fā)送結(jié)果回調(diào),這里為什么又要休眠,稍后進(jìn)行講解
@Autowired
private KafkaSendResultHandler producerListener;
@Test
public void testProducerListen() throws InterruptedException {
kafkaTemplate.setProducerListener(producerListener);
kafkaTemplate.send("topic.quick.demo", "test producer listen");
Thread.sleep(1000);
}
運行測試方法,我們可以看到控制臺輸出的日志如下
2018-09-08 15:51:39.975 INFO 10268 --- [ad | producer-1] c.v.k.handler.KafkaSendResultHandler : Message send success : ProducerRecord(topic=topic.quick.demo, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=test producer listen, timestamp=null)
KafkaTemplate異步發(fā)送消息
上文提及了發(fā)送消息的時候需要休眠一下,否則發(fā)送時間較長的時候會導(dǎo)致進(jìn)程提前關(guān)閉導(dǎo)致無法調(diào)用回調(diào)時間。主要是因為KafkaTemplate發(fā)送消息是采取異步方式發(fā)送的,我們可以看下KafkaTemplate的源代碼
這是我們剛才調(diào)用的發(fā)送消息方法,可以看到KafkaTemplate會使用ProducerRecord把我們傳遞進(jìn)來的參數(shù)再一次封裝,最后調(diào)用doSend方法發(fā)送消息到Kafka中
send(String topic, V data)
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
return this.doSend(producerRecord);
}
ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord)
doSend方法先是檢測是否開啟事務(wù),緊接著使用SettableListenableFuture發(fā)送消息,然后判斷是否啟動自動沖洗數(shù)據(jù)到Kafka中,我們再接著看看SettableListenableFuture實現(xiàn)了什么接口
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
if (this.transactional) {
Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
}
final Producer<K, V> producer = this.getTheProducer();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sending: " + producerRecord);
}
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
try {
if (exception == null) {
future.set(new SendResult(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
}
if (KafkaTemplate.this.logger.isTraceEnabled()) {
KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
}
} else {
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(producerRecord, exception);
}
if (KafkaTemplate.this.logger.isDebugEnabled()) {
KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);
}
}
} finally {
if (!KafkaTemplate.this.transactional) {
KafkaTemplate.this.closeProducer(producer, false);
}
}
}
});
if (this.autoFlush) {
this.flush();
}
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sent: " + producerRecord);
}
return future;
}
可以看到SettableListenableFuture實現(xiàn)了ListenableFuture接口,ListenableFuture則實現(xiàn)了Future接口,F(xiàn)uture是Java自帶的實現(xiàn)異步編程的接口,支持返回值的異步,而我們使用Thread或者Runnable都是不帶返回值的。
public class SettableListenableFuture<T> implements ListenableFuture<T>
public interface ListenableFuture<T> extends Future<T>
KafkaTemplate同步發(fā)送消息
KafkaTemplate異步發(fā)送消息大大的提升了生產(chǎn)者的并發(fā)能力,但某些場景下我們并不需要異步發(fā)送消息,這個時候我們可以采取同步發(fā)送方式,實現(xiàn)也是非常簡單的,我們只需要在send方法后面調(diào)用get方法即可。Future模式中,我們采取異步執(zhí)行事件,等到需要返回值得時候我們再調(diào)用get方法獲取future的返回值
@Test
public void testSyncSend() throws ExecutionException, InterruptedException {
kafkaTemplate.send("topic.quick.demo", "test sync send message").get();
}
get方法還有一個比較有意思的重載方法,get(long timeout, TimeUnit unit),當(dāng)send方法耗時大于get方法所設(shè)定的參數(shù)時會拋出一個超時異常,但需要注意,這里僅拋出異常,消息還是會發(fā)送成功的。這里的測試方法設(shè)置send耗時必須小于 一微秒(那必須得失敗呀,嘿嘿嘿),運行后我們可以看到拋出的異常,但也發(fā)現(xiàn)消息能發(fā)送成功并被監(jiān)聽器接收了。那這功能有什么作用呢,如果還沒有接觸過SQL慢查詢可以去了解一下,使用該方法作為SQL慢查詢記錄的條件。
@Test
public void testTimeOut() throws ExecutionException, InterruptedException, TimeoutException {
kafkaTemplate.send("topic.quick.demo", "test send message timeout").get(1,TimeUnit.MICROSECONDS);
}
2018-09-08 16:36:09.110 INFO 7724 --- [ demo-0-C-1] com.viu.kafka.listen.DemoListener : demo receive : test send message timeout
java.util.concurrent.TimeoutException