Spring-Kafka(四)—— KafkaTemplate發(fā)送消息及結(jié)果回調(diào)

在前幾章中,我們使用KafkaTemplate.send(String data)這個方法發(fā)送消息到Kafka中,顯然這個方法并不能滿足我們系統(tǒng)的需求,那我們需要查看一下KafkaTemplate所實現(xiàn)的接口,看看還提供了什么方法。當(dāng)我們發(fā)送消息到Kafka后,我們又怎么去確認(rèn)消息是否發(fā)送成功呢?這就涉及到KafkaTemplate的發(fā)送回調(diào)方法了。接下來我們開始正式講解。

查看發(fā)送接口

首先我們Ctrl+鼠標(biāo)左鍵進(jìn)入KafkaTemplate的源代碼中查看一下,可以看到有關(guān)發(fā)送的接口如下。這里的參數(shù)還是比較簡單的,值得一提的事,方法中有個Long類型的時間戳(timestamp)參數(shù),這是Kafka0.10版本提供的新功能,主要用來使用時間索引進(jìn)行查詢數(shù)據(jù)以及日志切分清除策略。還有一個ProducerRecord參數(shù),這個類其實就是整合了topic、partition、data等數(shù)據(jù)的消費實體類。


稍微提一下這些參數(shù)都是什么意思吧:
topic:這里填寫的是Topic的名字
partition:這里填寫的是分區(qū)的id,其實也是就第幾個分區(qū),id從0開始。表示指定發(fā)送到該分區(qū)中
timestamp:時間戳,一般默認(rèn)當(dāng)前時間戳
key:消息的鍵
data:消息的數(shù)據(jù)
ProducerRecord:消息對應(yīng)的封裝類,包含上述字段
Message<?>:Spring自帶的Message封裝類,包含消息及消息頭

ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);


使用sendDefault發(fā)送消息

首先在KafkaConfiguration編寫一個帶有默認(rèn)Topic參數(shù)的KafkaTemplate,同時為另外一個KafkaTemplate加上@Primary注解,@Primary注解的意思是在擁有多個同類型的Bean時優(yōu)先使用該Bean,到時候方便我們使用@Autowired注解自動注入。

    //這個是我們之前編寫的KafkaTemplate代碼,加入@Primary注解
    @Bean
    @Primary
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
        return template;
    }

    @Bean("defaultKafkaTemplate")
    public KafkaTemplate<Integer, String> defaultKafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
        template.setDefaultTopic("topic.quick.default");
        return template;
    }

接著編寫測試方法,可以看到我們這里調(diào)用的是sendDefault方法,而且并沒有在方法參數(shù)上添加topicName,這是因為我們在聲明defaultKafkaTemplate這個Bean的時候添加了這行代碼 template.setDefaultTopic("topic.quick.default"),只要調(diào)用sendDefault方法,kafkaTemplate會自動把消息發(fā)送到名為"topic.quick.default"的Topic中。

    @Resource
    private KafkaTemplate defaultKafkaTemplate;

    @Test
    public void testDefaultKafkaTemplate() {
        defaultKafkaTemplate.sendDefault("I`m send msg to default topic");
    }
測試結(jié)果

這里也順便測試一下其他幾個吧。

    @Test
    public void testTemplateSend() {
        //發(fā)送帶有時間戳的消息
        kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");

        //使用ProducerRecord發(fā)送消息
        ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
        kafkaTemplate.send(record);

        //使用Message發(fā)送消息
        Map map = new HashMap();
        map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
        map.put(KafkaHeaders.PARTITION_ID, 0);
        map.put(KafkaHeaders.MESSAGE_KEY, 0);
        GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
        kafkaTemplate.send(message);
    }


消息結(jié)果回調(diào)

一般來說我們都會去獲取KafkaTemplate發(fā)送消息的結(jié)果去判斷消息是否發(fā)送成功,如果消息發(fā)送失敗,則會重新發(fā)送或者執(zhí)行對應(yīng)的業(yè)務(wù)邏輯。所以這里我們?nèi)崿F(xiàn)這個功能。

KafkaSendResultHandler

第一步還是編寫一個消息結(jié)果回調(diào)類KafkaSendResultHandler。當(dāng)我們使用KafkaTemplate發(fā)送消息成功的時候回調(diào)用OnSuccess方法,發(fā)送失敗則會調(diào)用onError方法。

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send error : " + producerRecord.toString());
    }
}



接下來就使用KafkaSendResultHandler實現(xiàn)消息發(fā)送結(jié)果回調(diào),這里為什么又要休眠,稍后進(jìn)行講解

    @Autowired
    private KafkaSendResultHandler producerListener;

    @Test
    public void testProducerListen() throws InterruptedException {
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.send("topic.quick.demo", "test producer listen");
        Thread.sleep(1000);
    }



運行測試方法,我們可以看到控制臺輸出的日志如下

2018-09-08 15:51:39.975  INFO 10268 --- [ad | producer-1] c.v.k.handler.KafkaSendResultHandler     : Message send success : ProducerRecord(topic=topic.quick.demo, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=test producer listen, timestamp=null)

KafkaTemplate異步發(fā)送消息

上文提及了發(fā)送消息的時候需要休眠一下,否則發(fā)送時間較長的時候會導(dǎo)致進(jìn)程提前關(guān)閉導(dǎo)致無法調(diào)用回調(diào)時間。主要是因為KafkaTemplate發(fā)送消息是采取異步方式發(fā)送的,我們可以看下KafkaTemplate的源代碼


這是我們剛才調(diào)用的發(fā)送消息方法,可以看到KafkaTemplate會使用ProducerRecord把我們傳遞進(jìn)來的參數(shù)再一次封裝,最后調(diào)用doSend方法發(fā)送消息到Kafka中

send(String topic, V data)
    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
        return this.doSend(producerRecord);
    }


ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord)

doSend方法先是檢測是否開啟事務(wù),緊接著使用SettableListenableFuture發(fā)送消息,然后判斷是否啟動自動沖洗數(shù)據(jù)到Kafka中,我們再接著看看SettableListenableFuture實現(xiàn)了什么接口

    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        }

        final Producer<K, V> producer = this.getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }

        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
                        }

                        if (KafkaTemplate.this.logger.isTraceEnabled()) {
                            KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
                        }
                    } else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord, exception);
                        }

                        if (KafkaTemplate.this.logger.isDebugEnabled()) {
                            KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);
                        }
                    }
                } finally {
                    if (!KafkaTemplate.this.transactional) {
                        KafkaTemplate.this.closeProducer(producer, false);
                    }

                }

            }
        });
        if (this.autoFlush) {
            this.flush();
        }

        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }

        return future;
    }



可以看到SettableListenableFuture實現(xiàn)了ListenableFuture接口,ListenableFuture則實現(xiàn)了Future接口,F(xiàn)uture是Java自帶的實現(xiàn)異步編程的接口,支持返回值的異步,而我們使用Thread或者Runnable都是不帶返回值的。

public class SettableListenableFuture<T> implements ListenableFuture<T>
public interface ListenableFuture<T> extends Future<T> 


KafkaTemplate同步發(fā)送消息

KafkaTemplate異步發(fā)送消息大大的提升了生產(chǎn)者的并發(fā)能力,但某些場景下我們并不需要異步發(fā)送消息,這個時候我們可以采取同步發(fā)送方式,實現(xiàn)也是非常簡單的,我們只需要在send方法后面調(diào)用get方法即可。Future模式中,我們采取異步執(zhí)行事件,等到需要返回值得時候我們再調(diào)用get方法獲取future的返回值

    @Test
    public void testSyncSend() throws ExecutionException, InterruptedException {
        kafkaTemplate.send("topic.quick.demo", "test sync send message").get();
    }



get方法還有一個比較有意思的重載方法,get(long timeout, TimeUnit unit),當(dāng)send方法耗時大于get方法所設(shè)定的參數(shù)時會拋出一個超時異常,但需要注意,這里僅拋出異常,消息還是會發(fā)送成功的。這里的測試方法設(shè)置send耗時必須小于 一微秒(那必須得失敗呀,嘿嘿嘿),運行后我們可以看到拋出的異常,但也發(fā)現(xiàn)消息能發(fā)送成功并被監(jiān)聽器接收了。那這功能有什么作用呢,如果還沒有接觸過SQL慢查詢可以去了解一下,使用該方法作為SQL慢查詢記錄的條件。

    @Test
    public void testTimeOut() throws ExecutionException, InterruptedException, TimeoutException {
        kafkaTemplate.send("topic.quick.demo", "test send message timeout").get(1,TimeUnit.MICROSECONDS);
    }
2018-09-08 16:36:09.110  INFO 7724 --- [     demo-0-C-1] com.viu.kafka.listen.DemoListener        : demo receive : test send message timeout

java.util.concurrent.TimeoutException


更多文章請關(guān)注該 Spring-Kafka史上最強入門教程 專題

博主常駐地~ http://blog.seasedge.cn/archives/15.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong閱讀 22,955評論 1 92
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,628評論 19 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評論 2 11
  • 到底好不好用呢…… 找個圖片
    時了個光閱讀 286評論 0 0
  • 也許以前是從來沒有在外面過年的經(jīng)歷,又或許是以前的我從來就沒有在意,然而今年的春節(jié)卻讓我深深地感受到了一個普通的農(nóng)...
    小楊林閱讀 331評論 0 0

友情鏈接更多精彩內(nèi)容