利用SynchronousQueue多線程處理ActiveMQ消息

當(dāng)我們想通過多條線程處理activemq中的消息,直覺上會使用固定大小線程池去處理,然而這種方式并不妥當(dāng),這么做我們只是將消息從activemq轉(zhuǎn)移到線程池的阻塞隊(duì)列之中,當(dāng)線程池開始工作,activemq中的消息快速被消費(fèi)完畢,而消息所代表的任務(wù)卻并未真正被處理, 他們被堆積在處理程序的內(nèi)存中,并陸續(xù)由線程中的線程處理。這會產(chǎn)生副作用,此時(shí)當(dāng)處理程序因?yàn)槟撤N原因而崩潰,這些待處理的任務(wù)都將丟失。

如何實(shí)現(xiàn)既能通過多個(gè)線程處理任務(wù),又能保證未完成的任務(wù)的安全性,此時(shí) SynchronousQueue 就有了用武之地。

我們可以把SynchronousQueue 當(dāng)作長度為1的阻塞隊(duì)列,當(dāng)隊(duì)列被塞入一個(gè)元素,假如這個(gè)元素未被消費(fèi)掉,那么后續(xù)的塞入操作將被阻塞。我們可以利用它的這個(gè)特性,把它當(dāng)作是activemq與處理線程之間的緩沖層。在 SynchronousQueue 的一端,我們從activemq中讀取一個(gè)元素,并將它put進(jìn)SynchronousQueue 。在另一端,多條線程分別從 SynchronousQueue 中 take 元素進(jìn)行處理,只有當(dāng) SynchronousQueue 中不存在任何元素,也就是線程們將當(dāng)前的任務(wù)都處理完畢,還有一端的從activemq中提取消息的操作才能執(zhí)行,反之則將被阻塞。 通過這種方式,我們便能保證任務(wù)不丟失的同時(shí)又能通過多線程處理它們。示例代碼如下

初始化一個(gè) SynchronousQueue

private SynchronousQueue<ActiveMQObjectMessage> synchronousQueue = new SynchronousQueue<>();

從activemq中將消息轉(zhuǎn)移至synchronousQueue,一次轉(zhuǎn)移一條,如果上一條未被處理,下一條不能繼續(xù)

ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
                    .DEFAULT_PASSWORD, brokerUrl);
connection = factory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(dest);
MessageConsumer consumer = session.createConsumer(destination);

while (true) {
  try {
    Message message = consumer.receive();
    if (message instanceof ActiveMQObjectMessage) {
      ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) message;
      synchronousQueue.put(activeMQObjectMessage);
    } else {
      if (message != null) {
        message.acknowledge();
        logger.error("消息格式錯(cuò)誤,msg={}",message.toString());
      }
    }
  } catch (JMSException | InterruptedException e) {
    e.printStackTrace();
  }
}

開啟多條線程同時(shí)處理消息

  Runnable task = () -> {
            while (true) {
                try {
                    ActiveMQObjectMessage activeMQObjectMessage = synchronousQueue.take();
                      //消費(fèi)消息,處理成功后確認(rèn)
                       boolean complete = handle(msg);
                        if (complete) {
                            activeMQObjectMessage.acknowledge();
                        }
                } catch ( JMSException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(task);
            thread.setName("log-task-" + i);
            thread.start();
        }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容