ActiveMQ異步發(fā)送使用及常見誤區(qū)

這段時間一邊在重新研究RocketMQ,一邊開始看Artemis。RocketMQ現(xiàn)在有很多中文資料,但是Artemis的中文資料很少很少,倒是官方文檔不錯。后續(xù)應該會出一篇Artemis和RocketMQ的介紹文章。

今天要記錄的是ActiveMQ的異步發(fā)送,直到最近我研究了kafka以后才知道異步發(fā)送該這么玩……

ActiveMQ異步發(fā)送

ActiveMQ官方說異步發(fā)送是很多模式下默認的傳輸方式,但是在發(fā)送非事物持久化消息的時候默認使用的是同步發(fā)送模式。

The good news is that ActiveMQ sends message in async mode by default in several cases. It is only in cases where the JMS specification required the use of sync sending that we default to sync sending. The cases that we are forced to send in sync mode are when persistent messages are being sent outside of a transaction.

同步發(fā)送時,Producer.send() 方法會被阻塞,直到 broker 發(fā)送一個確認消息給生產(chǎn)者,這個確認消息暗示生產(chǎn)者 broker 已經(jīng)成功地將它發(fā)送的消息路由到目標目的并把消息保存到二級存儲中。

同步發(fā)送持久消息能夠提供更好的可靠性,但這潛在地影響了程序的相應速度,因為在接受到 broker 的確認消息之前應用程序或線程會被阻塞。如果應用程序能夠容忍一些消息的丟失,那么可以使用異步發(fā)送。異步發(fā)送不會在受到 broker 的確認之前一直阻塞 Producer.send 方法。

有幾種方式可以使用異步發(fā)送:

  1. 設置ConnectionFactory時指定使用異步
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
  1. 不在構(gòu)造函數(shù)中指定,而是修改ConnectionFactory的配置
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
  1. 在實例化后的ActiveMQConnection對象中設置異步發(fā)送
((ActiveMQConnection)connection).setUseAsyncSend(true)

發(fā)送端代碼

對于發(fā)送端來說,使用中常常把同步發(fā)送和異步發(fā)送混淆使用。因為producer.send(message)太深入人心。而在異步發(fā)送持久化消息時,這種普通的send方法卻會導致消息的丟失。

異步發(fā)送丟失消息的場景是:生產(chǎn)者設置UseAsyncSend=true,使用producer.send(msg)持續(xù)發(fā)送消息。由于消息不阻塞,生產(chǎn)者會認為所有send的消息均被成功發(fā)送至MQ。如果服務端突然宕機,此時生產(chǎn)者端內(nèi)存中尚未被發(fā)送至MQ的消息都會丟失。

正確的異步發(fā)送方法是需要接收回調(diào)的。來看看AMQ提供的UseCase代碼吧

    private double benchmarkCallbackRate() throws JMSException, InterruptedException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(getName());
        int count = 1000;
        final CountDownLatch messagesSent = new CountDownLatch(count);
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        long start = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            producer.send(session.createTextMessage("Hello"), new AsyncCallback() {
                @Override
                public void onSuccess() {
                    messagesSent.countDown();
                }

                @Override
                public void onException(JMSException exception) {
                    exception.printStackTrace();
                }
            });
        }
        messagesSent.await();
        return 1000.0 * count / (System.currentTimeMillis() - start);
    }

可以看到,producer.send有帶上AsyncCallback的方法。該方法中需要重寫onSuccess方法和onException方法。onSuccess方法就是表示這條消息成功發(fā)送到MQ上,并接收到了MQ持久化后的回調(diào)。onException表示MQ返回一個入隊異常的回執(zhí)。在上面的示例中用的是CountDownLatch類在onSuccess中記錄。主要是因為onSuccess方法中只能引用final對象。

一般來說,可以寫成下面的形式

public void sendMessage(ActiveMQMessage msg, final String msgid) throws JMSException {
     producer.send(msg, new AsyncCallback() {
          @Override
          public void onSuccess() {
              // 使用msgid標識來進行消息發(fā)送成功的處理
              System.out.println(msgid+" has been successfully sent.");
          }
          @Override
          public void onException(JMSException exception) {
              // 使用msgid表示進行消息發(fā)送失敗的處理
              System.out.println(msgid+" fail to send to mq.");
              exception.printStackTrace();
          }
      });
}

同步發(fā)送和異步發(fā)送的區(qū)別就在此,同步發(fā)送等send不阻塞了就表示一定發(fā)送成功了,可是異步發(fā)送需要接收回執(zhí)并由客戶端再判斷一次是否發(fā)送成功。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容