本文作為,其他文章鏈接使用,單獨參考意義不大。
Master-Worker框架如下,首先實現(xiàn)的Master線程,主要用作分配任務(wù),和返回結(jié)果集。
/**
* Created by Joker on 2015/3/9.
*/
public class Master {
//任務(wù)隊列
protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
//線程隊列
protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
//子任務(wù)處理結(jié)果集
protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
public Master(Worker worker, int workerCount) {
worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
if (workerCount > 5)
workerCount = 5;
/*根據(jù)workerCount,創(chuàng)建指定數(shù)量的工作線程Worker*/
for (int i = 0; i < workerCount; i++) {
threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
}
}
/**
* 檢查子任務(wù)是否全部執(zhí)行完畢
*/
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
/**
* 提交任務(wù)
*/
public void submit(Object object) {
workQueue.add(object);
}
/**
* 獲取子任務(wù)結(jié)果集
*
* @return 結(jié)果集
*/
public Map<String, Object> getResultMap() {
return resultMap;
}
/**
* 執(zhí)行所有工作線程worker,處理任務(wù)。
*/
public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}
Worker作為子線程,實現(xiàn)在Handle方法中實現(xiàn)具體業(yè)務(wù)邏輯,并加處理結(jié)果放到結(jié)果集中。
/**
* Created by Joker on 2015/3/9.
*/
public class Worker implements Runnable {
//子任務(wù)隊列,用于過去子任務(wù)
protected Queue<Object> workQueue;
//結(jié)果集
protected Map<String, Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
/**
* 執(zhí)行具體業(yè)務(wù)邏輯
* 模擬立方和計算
*
* @param object
* @return
*/
public Object handle(Object object) {
Integer i = (Integer) object;
return i * i * i;
}
@Override
public void run() {
while (true) {
//獲取任務(wù)
Object work = workQueue.poll();
if (work == null)
break;
resultMap.put(Integer.toString(work.hashCode()), this.handle(work));
}
}
}
需要調(diào)用的地方,進行立方和計算。
/**
* 計算立方和1~100的立方和
*/
public void executeCubic() {
//創(chuàng)建三個Worker線程,執(zhí)行任務(wù)
Master master = new Master(new Worker(), 3);
for (int i = 0; i < 100; i++) {
master.submit(i);
}
//執(zhí)行任務(wù)
master.execute();
//計算結(jié)果,初始化
int result = 0;
//計算結(jié)果集
Map<String, Object> resultMap = master.getResultMap();
while (resultMap.size() > 0 || !master.isComplete()) {
String key = null;
Integer i = null;
for (String k : resultMap.keySet()) {
key = k;
break;
}
if (key != null) {
i = (Integer) resultMap.get(key);
//從結(jié)果集中移除倍計算過的key
resultMap.remove(key);
}
if (i != null) {
result += i;
}
}
}
提交100個任務(wù)后由3個Worker線程進行計算,Master并不等待所有Worker線程執(zhí)行完畢,就開始訪問子結(jié)果集,進行相加計算,直到子結(jié)果集中的所有數(shù)據(jù)處理完畢,并且3個活躍Worker線程全部停止,才給出最終的立方總和。result最終結(jié)果便是1~100立方和。