四、RocketMQ-Producer的Send方法

一、概述

RocketMQ的producer默認(rèn)有兩個,一個是DefaultMQProducer,另一個是TransactionMQProducer,本文只對send方法做一個總結(jié),其他的細(xì)節(jié)在其他章節(jié)介紹

二、DefaultMQProducer

一共定義了17種send方法,從4.x版本,事務(wù)消息被放到了TransactionMQProducer中,所以有15個send方法,這15個方法中,又有兩個異步帶超時時間的send方法被廢棄了,所以有效的send方法有13個:

1、同步發(fā)送

/**
     * 同步發(fā)送模式. 只有消息被成功接收并且被固化完成后才會收到反饋。
     * 內(nèi)置有重發(fā)機(jī)制, producer將會重試
     * {@link #retryTimesWhenSendFailed,default=2} 次 ,然后才會報錯. 
     * 因此,有一定的概率向broker發(fā)送重復(fù)的消息
     * 使用者有責(zé)任去解決潛在的重復(fù)數(shù)據(jù)造成的影響
     * @param msg 待發(fā)送數(shù)據(jù)
     * @return {@link SendResult} 實體,來通知發(fā)送者發(fā)送狀態(tài)等信息, 比如消息的ID
     * {@link SendStatus} 指明 broker 存儲/復(fù)制 的狀態(tài), 發(fā)送到了哪個隊列等等
     * @throws MQClientException 客戶端異常
     * @throws RemotingException 網(wǎng)絡(luò)連接異常
     * @throws MQBrokerException broker異常
     * @throws InterruptedException 發(fā)送線程中斷異常
     */
    @Override
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }

2、同步發(fā)送,帶超時時間

/**
     * 與 {@link #send(Message)} 相同,只不過多了超時時間的指定.
     *
     * @param msg 待發(fā)送消息
     * @param timeout 發(fā)送超時時間
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     * @throws MQClientException 客戶端異常
     * @throws RemotingException 網(wǎng)絡(luò)連接異常
     * @throws MQBrokerException broker異常
     * @throws InterruptedException 發(fā)送線程中斷異常
     */
    @Override
    public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, timeout);
    }

3、異步發(fā)送

/**
     * 異步發(fā)送消息
     * 消息發(fā)送后,立即返回。broker處理返程后, 觸發(fā)sendCallback回調(diào)方法
     * 與上面一樣,在給出發(fā)送失敗標(biāo)志前,會嘗試2次,所以開發(fā)者要處理重復(fù)發(fā)送帶來的問題
     * @param msg 待發(fā)送消息
     * @param sendCallback 回調(diào)函數(shù)
     * @throws MQClientException 客戶端異常
     * @throws RemotingException 網(wǎng)絡(luò)異常
     * @throws InterruptedException 發(fā)送線程中斷異常
     */
    @Override
    public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }

4、異步發(fā)送,帶超時時間

@Override
    public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
    }

5、單向發(fā)送,不等待broker回饋

/**
     * 發(fā)送方法不會等待broker的反饋,只會一直發(fā)
     * 所以有很高的吞吐量,但是有一定概率丟失消息
     *
     * @param msg 待發(fā)送消息
     * @throws MQClientException 客戶端異常
     * @throws RemotingException 網(wǎng)絡(luò)異常
     * @throws InterruptedException 發(fā)送線程中斷異常
     */
    @Override
    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg);
    }

6、同步發(fā)送,指定隊列

/**
     * 同步發(fā)送,指定隊列
     * @param msg 待發(fā)送消息
     * @param mq 指定的消息隊列
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     */
    @Override
    public SendResult send(Message msg, MessageQueue mq)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq);
    }

7、同步發(fā)送,指定隊列,并附帶超時時間

@Override
    public SendResult send(Message msg, MessageQueue mq, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, mq, timeout);
    }

8、異步發(fā)送,指定隊列

@Override
    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
    }

9、異步發(fā)送,指定隊列,附帶超時時間

這個在4.4.0版本被設(shè)置為廢棄,后續(xù)版本會給出

/**
  * 因為在處超時異常存在問題,所以廢棄
 */
@Override
    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
    }

10、單向發(fā)送,指定隊列

@Override
    public void sendOneway(Message msg,
        MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg, mq);
    }

11、同步發(fā)送,指定隊列選擇策略

官方的有序消息的DEMO就是基于隊列選擇器做的,讓一些列有序的消息(相同ID)發(fā)送到同一個隊列

/**
     * 指定隊列選擇策略MessageQueueSelector 
     *
     * @param msg 待發(fā)送消息
     * @param selector 隊列選擇器
     * @param arg 配合隊列選擇器選擇隊列的參數(shù),一般可以是業(yè)務(wù)參數(shù)(ID等)
     * @return {@link SendResult} 同上
     * {@link SendStatus} 同上
     */
@Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg);
    }

12、同步發(fā)送消息,指定隊列選擇策略,并附帶超時時間

@Override
    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
    }

13、異步發(fā)送消息,指定隊列選擇策略

@Override
    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
    }

14、異步發(fā)送消息,指定隊列選擇策略,并附帶超時時間

這個方法在4.4.0版本廢棄,后續(xù)提供

@Override
    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
    }

15、單向發(fā)送,指定隊列選擇策略

 @Override
    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
        throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
    }

三、TransactionMQProducer

1、發(fā)送事務(wù)消息

@Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評論 2 11
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,663評論 1 32
  • 最全的iOS面試題及答案 iOS面試小貼士 ———————————————回答好下面的足夠了-----------...
    zweic閱讀 2,803評論 0 73
  • 每個人的想法不同 , RocketMQ 介紹的時候就說 是阿里從他們使用的上 解耦出來 近一步簡化 便捷的 目...
    樓亭樵客閱讀 459評論 0 0
  • 1資金分成若干分,不斷地投項目,上線了漲不動就賣,破發(fā)也賣。時間耗不起。資金必須滾動,對項目要做到拔掉無情。 2熊...
    于海濤_290e閱讀 219評論 0 0

友情鏈接更多精彩內(nèi)容