我們產(chǎn)品的的業(yè)務(wù)中有那么一個場景,在醫(yī)生關(guān)閉問診的3min后,患者將無法繼續(xù)和醫(yī)生進行對話。我根據(jù)對業(yè)務(wù)的理解,和對技術(shù)實現(xiàn)成本的衡量,決定采用通過DelayQueue的方式來實現(xiàn)的方案。
關(guān)于DelayQueue的相關(guān)內(nèi)容介紹和核心源碼解析已在上一篇DelayQueue之源碼分析說明了。
據(jù)我所知,生活中有如下場景可以用得到DelayQueue:
1、下單后一段時間(業(yè)內(nèi)基本上都是30分鐘)內(nèi)不付款,就自動取消訂單。
2、提交打車申請后,一段時間內(nèi)(比如說30秒)沒有附近的司機接單,就自動發(fā)送發(fā)送給更多的司機。
這類場景都有如下特點:
1、需要有一段時間的延遲,如果僅僅是為了異步執(zhí)行,那么消息隊列顯然是是更優(yōu)的選擇。
2、對執(zhí)行時間的精確度有一定要求,當然異常狀況下,也可以對精確度適當放寬松。比如場景1的訂單取消,規(guī)則設(shè)置為30分鐘不支付就取消,但實際場景中,精確到30分自然是最好結(jié)果,但假如出現(xiàn)故障,那么在可允許的范圍內(nèi)將訂單取消也是可以接受的(比如說在放寬到32分鐘內(nèi))。
3、執(zhí)行是高頻率的。這點需要和第2點結(jié)合起來看,如果僅僅是為了低頻率的定時執(zhí)行,個人認為任務(wù)調(diào)度也是可行的。
綜合來看,如果不需要延遲執(zhí)行,那么推薦用消息隊列;如果對執(zhí)行時間的精確度不那么在意且執(zhí)行頻率不高,那么推薦使用任務(wù)調(diào)度;如果需要延遲執(zhí)行,且執(zhí)行比較高頻,對執(zhí)行時間的精確度有一定要求,可以考慮使用延遲隊列。
以上這些是我們?yōu)楹尾捎肈elayQueue來實現(xiàn)這個業(yè)務(wù)場景的原因。
為了方便使用DelayQueue,我封裝了組件對DelayQueue進行了擴展。
首先我定義了一個類TaskMessage,對Delayed進行了擴展,實現(xiàn)了compareTo和getDelay方法。
如下是TaskMessage類的核心代碼。
public class TaskMessage implements Delayed {
private String body; //消息內(nèi)容
private long executeTime;//執(zhí)行時間
private Function function;//執(zhí)行方式
public TaskMessage(Long delayTime,String body, Function function) {
this.body = body;
this.function = function;
this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
}
@Override
public int compareTo(Delayed delayed) {
TaskMessage msg = (TaskMessage) delayed;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -msg.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
外部調(diào)用只需要TaskMessage m1 = new TaskMessage(delayTime, body, function)就可以生成一個延遲任務(wù)的元素了,內(nèi)部自動就根據(jù)延遲時間計算出這個延遲任務(wù)元素的預(yù)期執(zhí)行時間。
Function是1.8版引入的函數(shù)式接口,主要方法是R apply(T t),功能是將Function對象應(yīng)用到輸入的參數(shù)上,然后返回計算結(jié)果。
那么達到延遲任務(wù)的預(yù)期執(zhí)行時間時,只需要調(diào)用一下function.apply()方法就可以了,不需要關(guān)心apply的具體實現(xiàn)。apply的具體實現(xiàn)方法是在調(diào)用時才明確的。
然后定義一個延遲任務(wù)的執(zhí)行線程類TaskConsumer,實現(xiàn)了Runnable,重寫了run方法。因為延遲任務(wù)的執(zhí)行,必然是需要重新起線程去執(zhí)行的,不能阻礙主線程的操作。
如下是TaskConsumer類的核心代碼。
public class TaskConsumer implements Runnable {
@Override
public void run() {
while (signal) {
try {
TaskMessage take = queue.take();
if (logger.isInfoEnabled()) {
logger.info("處理線程的id為" + threadId + ",消費消息內(nèi)容為:" + take.getBody() + ",預(yù)計執(zhí)行時間為" +
DateFormatUtils.timeStampToString(take.getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
}
take.getFunction().apply(take.getBody());
} catch (InterruptedException e) {
if (logger.isInfoEnabled()) {
logger.info("id為" + threadId + "的處理線程被強制中斷");
}
} catch (Exception e) {
logger.error("taskConsumer error", e);
}
}
if (logger.isInfoEnabled()) {
logger.info("id為" + threadId + "的處理線程已停止");
}
}
}
這個類核心代碼就只有如下兩行。
TaskMessage take = queue.take(); 獲取延遲隊列的隊首元素。前文已經(jīng)解釋過,Queue的take方法會返回隊列的隊首元素,否則就會掛起線程。所以只要有返回值,必然就能獲取到當前需要執(zhí)行的TaskMessage元素。
take.getFunction().apply(take.getBody()); 執(zhí)行延遲任務(wù)元素的apply方法。applay方法是在定義TaskMessage的時候確定的,表明了到達預(yù)期執(zhí)行時間所需要進行的一系列操作,那么此時只需要執(zhí)行對應(yīng)的apply方法就可以了。
最后是加載TaskConsumer的統(tǒng)一管理類TaskManager。
如下是TaskManager類的核心代碼。
public class TaskManager implements ApplicationContextAware,
InitializingBean,DisposableBean{
@Override
public void afterPropertiesSet() throws Exception {
for (int i = 0; i < threadCount; i++) {
TaskConsumer taskConsumer = new TaskConsumer(queue, i);
taskConsumerList.add(taskConsumer);
Thread thread = new Thread(taskConsumer);
threadList.add(thread);
thread.start();
}
}
@Override
public void destroy() throws Exception {
for(int i=0;i<threadList.size();i++){
threadList.get(i).interrupt();
taskConsumerList.get(i).setSignal(Boolean.FALSE);
}
}
}
這個類的作用在于初始化類后,就啟動線程不斷的去獲取延遲任務(wù)。然后在銷毀類后,先中斷消費者線程,然后設(shè)置信號量使得消費者線程的run方法能跳出死循環(huán),從而使得消費線程正常結(jié)束。
最后是如何調(diào)用的示例。很簡單,就只有兩步:
1、生成延遲任務(wù)元素taskMessage
2、將taskMessage添加到延遲隊列中
TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
function -> this.processTask(delayTaskMessage));
DelayQueue<TaskMessage> queue = taskManager.getQueue();
queue.offer(taskMessage);
ok,以上是如何擴展DelayQueue的功能構(gòu)造成高可用的組件的方案,歡迎大家來一起討論。
下一章我準備講一下我們項目中運用DelayQueue的過程中碰到的問題以及如何持久化的方案。