最近項(xiàng)目中遇到了一個(gè)特殊的問題,需要使用異步的方式完成某個(gè)小任務(wù),還要求異步任務(wù)完成之后返回的結(jié)果集,由發(fā)起任務(wù)的線程繼續(xù)去處理。這樣的狀況用回調(diào)的方式去處理,需要切換線程,可能不太合適。筆者想到這個(gè)問題應(yīng)該比較類似于操作系統(tǒng)里面的生產(chǎn)者消費(fèi)者問題,就決定借鑒一下生產(chǎn)者消費(fèi)者問題的解決思路。線程A和線程B共享同一塊內(nèi)存區(qū),線程A缺少執(zhí)行鎖需要的資源時(shí),向線程B發(fā)送消息,讓線程B去獲取資源,發(fā)完消息之后,線程A進(jìn)入阻塞狀態(tài),線程B得到消息之后,從阻塞態(tài)變?yōu)閳?zhí)行狀態(tài),執(zhí)行完成之后,將結(jié)果集放入兩個(gè)線程共享的內(nèi)存區(qū),然后向線程A發(fā)送消息,讓其繼續(xù)執(zhí)行,若此時(shí)線程B沒有其他任務(wù)需要執(zhí)行,便進(jìn)入阻塞狀態(tài)。
一、Object.wait()
線程執(zhí)行執(zhí)行實(shí)例對(duì)象的wait()方法之前,必須獲取到該對(duì)象的鎖,執(zhí)行到實(shí)例對(duì)象的wait()方法之后,會(huì)釋放該對(duì)象的鎖,然后進(jìn)入阻塞狀態(tài)。線程A就是扮演的這個(gè)角色。示例代碼如下:
synchronized (obj) {
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
*** 二、obj.notify()***
線程執(zhí)行實(shí)例對(duì)象的notify()方法之前,也需要獲取到該對(duì)象的鎖,執(zhí)行完該對(duì)象的notify()方法,并且釋放鎖之后,會(huì)喚醒因?yàn)樵搶?duì)象而阻塞的線程之一,具體釋放哪個(gè)線程就要看線程調(diào)度怎么安排了。線程B就是扮演的這個(gè)角色。示例代碼如下:
synchronized (obj) {
obj.notify();
}
三、趟坑第一步
考慮到我這個(gè)項(xiàng)目很可能經(jīng)常會(huì)遇到這種控制線程執(zhí)行狀態(tài)的問題,所以我決定創(chuàng)建一個(gè)能管理所有線程的類Manager,讓它維持一個(gè)標(biāo)志位對(duì)象的Map,這樣以后遇到類似的問題,就不用再麻煩了。我們創(chuàng)建一個(gè)MonitorObject類,用它來產(chǎn)生標(biāo)志位對(duì)象,直接用Object類也完全可以。因?yàn)槲覀儾恍枰裁磳傩?,只要wait()和notify()方法可以用就行。
public class Manager {
public static Map<String, Object> map = new HashMap<String, Object>();
public static void create(String key) {
if (map.get(key) == null) {
Object monitorObject = new Object();
map.put(key, monitorObject);
}
}
public static void doWait(String key) {
create(key);
Object obj = map.get(key);
if (obj == null) return;
synchronized (obj) {
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void doNotify(String key) {
Object obj = map.get(key);
if (obj == null) return;
synchronized (obj) {
obj.notify();
}
}
}
四、線程調(diào)用
線程A在調(diào)起線程B執(zhí)行異步任務(wù)之后,使用約定好的一個(gè)作為標(biāo)志的key值,調(diào)用Manager的doWait()方法,線程B執(zhí)行完任務(wù)之后,調(diào)用Manager的doNotify()方法。示例如下:
class ThreadA extends Thread {
public void run() {
/*
喚起線程B執(zhí)行異步任務(wù)
*/
//線程A執(zhí)行完doWait()方法后進(jìn)入阻塞狀態(tài)
Manager.doWait("missonFlag");
/*
線程A喚醒之后的操作
*/
}
}
class ThreadB extends Thread {
public void run() {
/*
執(zhí)行子任務(wù)
*/
Manager.doNotify("missonFlag");
}
}
五、發(fā)現(xiàn)問題后進(jìn)行改造
上面是趟坑的第一步,寫這段程序的時(shí)候,我剛畢業(yè)入職不到兩個(gè)月,當(dāng)時(shí)還感覺自己棒棒噠,這邏輯看上去完美無(wú)瑕。確實(shí),最開始用的時(shí)候,確實(shí)還算正常,直到有幾次發(fā)現(xiàn)線程A有時(shí)候喚醒不了,一開始還以為是線程B異步任務(wù)執(zhí)行時(shí)遇到了網(wǎng)絡(luò)問題,最后一步步的調(diào)試才發(fā)現(xiàn),原來有的時(shí)候,doNotify()方法執(zhí)行在了doWait()方法之前。這樣的情況下,線程B執(zhí)行doNotify()方法的時(shí)候,由于線程A還沒有被阻塞,所以不會(huì)有任何作用。因?yàn)閚otify()方法的作用是在因?yàn)樵搶?shí)例對(duì)象而阻塞的線程中,喚醒一個(gè)線程去執(zhí)行,線程A并不符合條件。但是線程A還是會(huì)順序執(zhí)行doWait()方法,這樣線程A就被阻塞了,而且也不會(huì)受到喚醒它的消息,它就一直死等了,可憐的線程A!于是乎,我把Manager類進(jìn)行了改造。
現(xiàn)在發(fā)現(xiàn)只用Object是不行了,必須添加一個(gè)屬性才能行,需要添加一個(gè)計(jì)數(shù)器,用來記錄等待的狀態(tài),執(zhí)行 obj.wait(),則計(jì)數(shù)器加1,執(zhí)行obj.notify()方法,計(jì)數(shù)器減1。其中還出現(xiàn)過一個(gè)小插曲,就是doNotify()方法執(zhí)行在了doWait()方法之前的時(shí)候,MonitorObject的實(shí)例也沒有被創(chuàng)建,所以把create()方法也單獨(dú)拿了出來,在線程A喚起異步任務(wù)之前調(diào)用。
class MonitorObject {
public int count = 0;
}
public class Manager {
public static Map<String, MonitorObject> map = new HashMap<String, MonitorObject>();
public static void create(String key) {
if (map.get(key) == null) {
MonitorObject monitorObject = new MonitorObject();
map.put(key, monitorObject);
}
}
public static void doWait(String key) {
MonitorObject obj = map.get(key);
if (obj == null) return;
synchronized (obj) {
try {
if (obj.count < 0){
obj.count++;
return;
}
obj.count++;
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void doNotify(String key) {
MonitorObject obj = map.get(key);
if (obj == null) return;
synchronized (obj) {
obj.count--;
obj.notify();
}
}
}
六、改造之后使用Manager
改造之后,線程A在使用Manager之前,必須先創(chuàng)建MonitorObject對(duì)象,這一步是為了保證不管doWait()和doNotify()哪個(gè)先執(zhí)行,計(jì)數(shù)器的作用能正常發(fā)揮。改造之后的Manager仍然不能控制doWait()和doNotify()哪個(gè)先執(zhí)行,但是如果doNotify()先于doWait()執(zhí)行,在執(zhí)行doWait()操作時(shí),線程不會(huì)阻塞,只會(huì)把MonitorObject的計(jì)數(shù)器做自增操作。因?yàn)閐oNotify()先執(zhí)行,說明異步任務(wù)已經(jīng)完成,線程A已經(jīng)無(wú)需等待了。
示例代碼如下:
class ThreadA extends Thread {
public void run() {
Manager.create("missonFlag");
/*
喚起線程B執(zhí)行異步任務(wù)
*/
//線程A執(zhí)行完doWait()方法后進(jìn)入阻塞狀態(tài)
Manager.doWait("missonFlag");
/*
線程A喚醒之后的操作
*/
}
}
class ThreadB extends Thread {
public void run() {
/*
執(zhí)行子任務(wù)
*/
Manager.doNotify("missonFlag");
}
}