當(dāng)我們想通過多條線程處理activemq中的消息,直覺上會使用固定大小線程池去處理,然而這種方式并不妥當(dāng),這么做我們只是將消息從activemq轉(zhuǎn)移到線程池的阻塞隊(duì)列之中,當(dāng)線程池開始工作,activemq中的消息快速被消費(fèi)完畢,而消息所代表的任務(wù)卻并未真正被處理, 他們被堆積在處理程序的內(nèi)存中,并陸續(xù)由線程中的線程處理。這會產(chǎn)生副作用,此時(shí)當(dāng)處理程序因?yàn)槟撤N原因而崩潰,這些待處理的任務(wù)都將丟失。
如何實(shí)現(xiàn)既能通過多個(gè)線程處理任務(wù),又能保證未完成的任務(wù)的安全性,此時(shí) SynchronousQueue 就有了用武之地。
我們可以把SynchronousQueue 當(dāng)作長度為1的阻塞隊(duì)列,當(dāng)隊(duì)列被塞入一個(gè)元素,假如這個(gè)元素未被消費(fèi)掉,那么后續(xù)的塞入操作將被阻塞。我們可以利用它的這個(gè)特性,把它當(dāng)作是activemq與處理線程之間的緩沖層。在 SynchronousQueue 的一端,我們從activemq中讀取一個(gè)元素,并將它put進(jìn)SynchronousQueue 。在另一端,多條線程分別從 SynchronousQueue 中 take 元素進(jìn)行處理,只有當(dāng) SynchronousQueue 中不存在任何元素,也就是線程們將當(dāng)前的任務(wù)都處理完畢,還有一端的從activemq中提取消息的操作才能執(zhí)行,反之則將被阻塞。 通過這種方式,我們便能保證任務(wù)不丟失的同時(shí)又能通過多線程處理它們。示例代碼如下
初始化一個(gè) SynchronousQueue
private SynchronousQueue<ActiveMQObjectMessage> synchronousQueue = new SynchronousQueue<>();
從activemq中將消息轉(zhuǎn)移至synchronousQueue,一次轉(zhuǎn)移一條,如果上一條未被處理,下一條不能繼續(xù)
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
.DEFAULT_PASSWORD, brokerUrl);
connection = factory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(dest);
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
try {
Message message = consumer.receive();
if (message instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) message;
synchronousQueue.put(activeMQObjectMessage);
} else {
if (message != null) {
message.acknowledge();
logger.error("消息格式錯(cuò)誤,msg={}",message.toString());
}
}
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
開啟多條線程同時(shí)處理消息
Runnable task = () -> {
while (true) {
try {
ActiveMQObjectMessage activeMQObjectMessage = synchronousQueue.take();
//消費(fèi)消息,處理成功后確認(rèn)
boolean complete = handle(msg);
if (complete) {
activeMQObjectMessage.acknowledge();
}
} catch ( JMSException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < threads; i++) {
Thread thread = new Thread(task);
thread.setName("log-task-" + i);
thread.start();
}