WorkflowSim中的HEFT算法代碼解讀
HEFT(Heterogeneous Earliest Finish Time,異構計算環境下最早完成時間算法)
WorkflowSim是一個用于模擬工作流調度的集成仿真平臺,內置了部分基礎的工作流調度算法,今天就為大家介紹最為基礎的HEFT算法。
1.算法思路
HEFT算法是一種基礎的靜態調度算法,假設工作流DAG中所有任務的計算量(指令長度)、任務間通信傳輸的數據大小、計算環境下各虛擬機的計算性能和平均帶寬等信息均為已知。
HEFT算法的思路很簡單,就是將所有任務都安排在能夠使它最早完成的虛擬機上執行。
那么,它的步驟分為兩個階段。第一階段(優先級排序):根據DAG中的依賴關系,自底向上計算每個任務的向上排序值(upward rank)$\mathrm{rank}_u(n_i) = \overline{w_i} + \max_{n_j \in \mathrm{succ}(n_i)} \left( \overline{c_{i,j}} + \mathrm{rank}_u(n_j) \right)$,其中 $\overline{w_i}$ 為任務 $n_i$ 在各虛擬機上的平均計算時間,$\overline{c_{i,j}}$ 為 $n_i$ 到子任務 $n_j$ 的傳輸時間(沒有子任務時 $\max$ 項取 0)。然後按該值從大到小排列任務;在計算時間為正時,父任務總是排在子任務之前。第二階段(虛擬機選擇):按上述順序依次計算每個任務在各個虛擬機上的最早完成時間,通過比較得到最小的最早完成時間和能夠滿足該條件的虛擬機,將任務和該虛擬機進行匹配並佔用相應的時段(允許插入到該虛擬機已有調度的空閑間隙中)。在所有任務和虛擬機完成匹配後,再將整個工作流的任務依次調度至對應的虛擬機上運行。
2.代碼
WorkflowSim的HEFT算法就在source源文件夾下的planning包(org.workflowsim.planning)中。
直接上代碼:
package org.workflowsim.planning;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.cloudbus.cloudsim.Consts;
import org.cloudbus.cloudsim.Log;
import org.workflowsim.CondorVM;
import org.workflowsim.FileItem;
import org.workflowsim.Task;
import org.workflowsim.utils.Parameters;
/**
 * The HEFT (Heterogeneous Earliest Finish Time) planning algorithm.
 *
 * <p>Planning runs in two phases:
 * <ol>
 * <li>Prioritization: compute every task's computation cost on each VM, the
 * transfer cost from each parent to each child, and from those the task's
 * upward rank (average computation cost plus the largest
 * "transfer cost + rank" over its children).</li>
 * <li>Selection: visit tasks in non-ascending rank order and assign each one to
 * the VM on which it finishes earliest, possibly inserting it into an idle gap
 * of that VM's schedule. The choice is recorded via {@code task.setVmId}.</li>
 * </ol>
 */
public class HEFTPlanningAlgorithm extends BasePlanningAlgorithm {

    /**
     * Per task, the time in seconds to run it on each VM. {@link Double#MAX_VALUE}
     * marks a VM that has fewer PEs than the task requires.
     */
    private Map<Task, Map<CondorVM, Double>> computationCosts;
    /**
     * Per task, the time in seconds to transfer its output files to every other
     * task (0.0 for tasks that are not its children).
     */
    private Map<Task, Map<Task, Double>> transferCosts;
    /** Memoized upward rank of each task. */
    private Map<Task, Double> rank;
    /** Per VM, the time slots already reserved on it, in chronological order. */
    private Map<CondorVM, List<Event>> schedules;
    /** Finish time of every task that has already been allocated. */
    private Map<Task, Double> earliestFinishTimes;
    /** Mean of {@code getBw()} over all VMs; used for every transfer estimate. */
    private double averageBandwidth;

    /** A time slot [start, finish] reserved on a VM's schedule. */
    private static class Event {

        public double start;
        public double finish;

        public Event(double start, double finish) {
            this.start = start;
            this.finish = finish;
        }
    }

    /** A task paired with its rank; the natural order is by rank, highest first. */
    private static class TaskRank implements Comparable<TaskRank> {

        public Task task;
        public Double rank;

        public TaskRank(Task task, Double rank) {
            this.task = task;
            this.rank = rank;
        }

        @Override
        public int compareTo(TaskRank o) {
            // Reversed comparison so that sorting yields non-ascending rank.
            return o.rank.compareTo(rank);
        }
    }

    public HEFTPlanningAlgorithm() {
        computationCosts = new HashMap<>();
        transferCosts = new HashMap<>();
        rank = new HashMap<>();
        earliestFinishTimes = new HashMap<>();
        schedules = new HashMap<>();
    }

    /**
     * The main function. It creates an empty schedule per VM, computes costs and
     * ranks, and then assigns every task to a VM.
     */
    @Override
    public void run() {
        Log.printLine("HEFT planner running with " + getTaskList().size()
                + " tasks.");

        averageBandwidth = calculateAverageBandwidth();

        for (Object vmObject : getVmList()) {
            CondorVM vm = (CondorVM) vmObject;
            schedules.put(vm, new ArrayList<>());
        }

        // Prioritization phase
        calculateComputationCosts();
        calculateTransferCosts();
        calculateRanks();

        // Selection phase
        allocateTasks();
    }

    /**
     * Calculates the average available bandwidth among all VMs in Mbit/s.
     *
     * @return Average of {@code getBw()} over all VMs (NaN if there are no VMs)
     */
    private double calculateAverageBandwidth() {
        double avg = 0.0;
        for (Object vmObject : getVmList()) {
            CondorVM vm = (CondorVM) vmObject;
            avg += vm.getBw();
        }
        return avg / getVmList().size();
    }

    /**
     * Populates the computationCosts field with the time in seconds to compute
     * a task in a vm: total cloudlet length divided by the VM's MIPS, or
     * {@link Double#MAX_VALUE} when the VM has fewer PEs than the task needs.
     */
    private void calculateComputationCosts() {
        for (Task task : getTaskList()) {
            Map<CondorVM, Double> costsVm = new HashMap<>();
            for (Object vmObject : getVmList()) {
                CondorVM vm = (CondorVM) vmObject;
                if (vm.getNumberOfPes() < task.getNumberOfPes()) {
                    costsVm.put(vm, Double.MAX_VALUE);
                } else {
                    costsVm.put(vm,
                            task.getCloudletTotalLength() / vm.getMips());
                }
            }
            computationCosts.put(task, costsVm);
        }
    }

    /**
     * Populates the transferCosts map with the time in seconds to transfer all
     * files from each parent to each child. Every (task, task) pair is first
     * initialised to 0.0, and then only the actual parent-to-child edges are filled in.
     */
    private void calculateTransferCosts() {
        // Initializing the matrix with zeros for every pair of tasks
        for (Task task1 : getTaskList()) {
            Map<Task, Double> taskTransferCosts = new HashMap<>();
            for (Task task2 : getTaskList()) {
                taskTransferCosts.put(task2, 0.0);
            }
            transferCosts.put(task1, taskTransferCosts);
        }

        // Calculating the actual values for each parent -> child edge
        for (Task parent : getTaskList()) {
            for (Task child : parent.getChildList()) {
                transferCosts.get(parent).put(child,
                        calculateTransferCost(parent, child));
            }
        }
    }

    /**
     * Accounts the time in seconds necessary to transfer all files described
     * between parent and child. A file counts when it is an OUTPUT of the
     * parent and an INPUT of the child with the same name; the child's
     * declared size is used.
     *
     * @param parent the producing task
     * @param child the consuming task
     * @return Transfer cost in seconds, estimated with the average bandwidth
     */
    private double calculateTransferCost(Task parent, Task child) {
        List<FileItem> parentFiles = parent.getFileList();
        List<FileItem> childFiles = child.getFileList();

        double acc = 0.0;

        for (FileItem parentFile : parentFiles) {
            // Only files the parent produces can be transferred to the child.
            if (parentFile.getType() != Parameters.FileType.OUTPUT) {
                continue;
            }

            for (FileItem childFile : childFiles) {
                if (childFile.getType() == Parameters.FileType.INPUT
                        && childFile.getName().equals(parentFile.getName())) {
                    acc += childFile.getSize();
                    break;
                }
            }
        }

        //file Size is in Bytes, acc in MB
        acc = acc / Consts.MILLION;
        // acc in MB, averageBandwidth in Mb/s
        return acc * 8 / averageBandwidth;
    }

    /**
     * Invokes calculateRank for each task to be scheduled.
     */
    private void calculateRanks() {
        for (Task task : getTaskList()) {
            calculateRank(task);
        }
    }

    /**
     * Populates rank.get(task) with the upward rank of task as defined in the
     * HEFT paper: the task's average computation cost plus the maximum, over its
     * children, of (transfer cost to the child + the child's rank). Results are
     * memoized in {@code rank}.
     *
     * @param task The task whose rank is calculated
     * @return The rank
     */
    private double calculateRank(Task task) {
        if (rank.containsKey(task)) {
            return rank.get(task);
        }

        // Average only over VMs that can actually run the task. Incompatible VMs
        // carry the Double.MAX_VALUE sentinel. Summing two of those overflows to
        // Infinity, which gives a parent and its child the same (infinite) rank
        // and lets the selection phase try to place the child before its parent.
        // Even one sentinel would dwarf every real cost.
        double totalComputationCost = 0.0;
        int runnableVms = 0;
        for (Double cost : computationCosts.get(task).values()) {
            if (cost < Double.MAX_VALUE) {
                totalComputationCost += cost;
                runnableVms++;
            }
        }
        double averageComputationCost =
                runnableVms == 0 ? 0.0 : totalComputationCost / runnableVms;

        double max = 0.0;
        for (Task child : task.getChildList()) {
            double childCost = transferCosts.get(task).get(child)
                    + calculateRank(child);
            max = Math.max(max, childCost);
        }

        rank.put(task, averageComputationCost + max);

        return rank.get(task);
    }

    /**
     * Allocates all tasks to be scheduled in non-ascending order of rank.
     */
    private void allocateTasks() {
        List<TaskRank> taskRank = new ArrayList<>(rank.size());
        for (Map.Entry<Task, Double> entry : rank.entrySet()) {
            taskRank.add(new TaskRank(entry.getKey(), entry.getValue()));
        }

        // Sorting in non-ascending order of rank
        Collections.sort(taskRank);
        for (TaskRank ranked : taskRank) {
            allocateTask(ranked.task);
        }
    }

    /**
     * Schedules the task given in one of the VMs minimizing the earliest finish
     * time, then reserves the slot and records the VM id on the task.
     *
     * @param task The task to be scheduled
     * @pre All parent tasks are already scheduled
     * @throws IllegalStateException if a parent has not been scheduled yet, or if
     *         no VM yields a finish time below {@link Double#MAX_VALUE} (e.g. no
     *         VM has enough PEs, or the VM list is empty)
     */
    private void allocateTask(Task task) {
        CondorVM chosenVM = null;
        double earliestFinishTime = Double.MAX_VALUE;
        double bestReadyTime = 0.0;

        for (Object vmObject : getVmList()) {
            CondorVM vm = (CondorVM) vmObject;

            // The task can start on this VM only after every parent has finished.
            // For parents on another VM, their output must also have been transferred.
            double minReadyTime = 0.0;
            for (Task parent : task.getParentList()) {
                Double parentFinish = earliestFinishTimes.get(parent);
                if (parentFinish == null) {
                    throw new IllegalStateException("HEFT: parent task "
                            + parent.getCloudletId() + " of task "
                            + task.getCloudletId()
                            + " has not been scheduled yet");
                }
                double readyTime = parentFinish;
                if (parent.getVmId() != vm.getId()) {
                    readyTime += transferCosts.get(parent).get(task);
                }
                minReadyTime = Math.max(minReadyTime, readyTime);
            }

            double finishTime = findFinishTime(task, vm, minReadyTime, false);
            if (finishTime < earliestFinishTime) {
                bestReadyTime = minReadyTime;
                earliestFinishTime = finishTime;
                chosenVM = vm;
            }
        }

        if (chosenVM == null) {
            throw new IllegalStateException("HEFT: no VM can execute task "
                    + task.getCloudletId());
        }

        findFinishTime(task, chosenVM, bestReadyTime, true);
        earliestFinishTimes.put(task, earliestFinishTime);

        task.setVmId(chosenVM.getId());
    }

    /**
     * Finds the best time slot available to minimize the finish time of the
     * given task in the vm with the constraint of not scheduling it before
     * readyTime. The search considers, in order of preference: the slot
     * before the first reserved event, idle gaps between reserved events, and
     * the slot after the last reserved event. If occupySlot is true, reserves
     * the chosen time slot in the schedule.
     *
     * @param task The task to have the time slot reserved
     * @param vm The vm that will execute the task
     * @param readyTime The first moment that the task is available to be
     * scheduled
     * @param occupySlot If true, reserves the time slot in the schedule.
     * @return The minimal finish time of the task in the vm
     */
    private double findFinishTime(Task task, CondorVM vm, double readyTime,
            boolean occupySlot) {
        List<Event> sched = schedules.get(vm);
        double computationCost = computationCosts.get(task).get(vm);
        double start, finish;
        // Index in sched at which the new event would be inserted
        int pos;

        if (sched.isEmpty()) {
            // Empty schedule: start as soon as the task is ready.
            if (occupySlot) {
                sched.add(new Event(readyTime, readyTime + computationCost));
            }
            return readyTime + computationCost;
        }

        if (sched.size() == 1) {
            if (readyTime >= sched.get(0).finish) {
                // Ready after the only event ends: append at readyTime.
                pos = 1;
                start = readyTime;
            } else if (readyTime + computationCost <= sched.get(0).start) {
                // Fits entirely before the only event: insert at the front.
                pos = 0;
                start = readyTime;
            } else {
                // Overlaps the only event: start right after it finishes.
                pos = 1;
                start = sched.get(0).finish;
            }

            if (occupySlot) {
                sched.add(pos, new Event(start, start + computationCost));
            }
            return start + computationCost;
        }

        // Trivial case: Start after the latest task scheduled
        start = Math.max(readyTime, sched.get(sched.size() - 1).finish);
        finish = start + computationCost;
        // Walk adjacent (previous, current) event pairs from the end of the
        // schedule towards the front, remembering the earliest gap that fits.
        int i = sched.size() - 1;
        int j = sched.size() - 2;
        pos = i + 1;
        while (j >= 0) {
            Event current = sched.get(i);
            Event previous = sched.get(j);

            if (readyTime > previous.finish) {
                // Gaps further to the front end before readyTime, so this is
                // the last gap worth checking.
                if (readyTime + computationCost <= current.start) {
                    start = readyTime;
                    finish = readyTime + computationCost;
                    pos = i;
                }
                break;
            }
            if (previous.finish + computationCost <= current.start) {
                // Starting when previous ends still finishes before current starts.
                start = previous.finish;
                finish = previous.finish + computationCost;
                pos = i;
            }
            i--;
            j--;
        }

        if (readyTime + computationCost <= sched.get(0).start) {
            // Fits entirely before the first reserved event.
            pos = 0;
            start = readyTime;

            if (occupySlot) {
                sched.add(pos, new Event(start, start + computationCost));
            }
            return start + computationCost;
        }
        if (occupySlot) {
            sched.add(pos, new Event(start, finish));
        }
        return finish;
    }
}