WorkflowSim中的HEFT調(diào)度算法代碼解讀

WorkflowSim中的HEFT算法代碼解讀

HEFT(異構(gòu)計(jì)算環(huán)境下最早完成時(shí)間算法)

WorkflowSim是一個(gè)用于模擬工作流調(diào)度的集成仿真平臺(tái),內(nèi)置了部分基礎(chǔ)的工作流調(diào)度算法,今天就為大家介紹最為基礎(chǔ)的HEFT算法。

1.算法思路

HEFT算法是一種基礎(chǔ)的靜態(tài)調(diào)度算法,假設(shè)工作流DAG中所有任務(wù)的數(shù)據(jù)大小、任務(wù)間通信傳輸數(shù)據(jù)大小、計(jì)算環(huán)境下各虛擬機(jī)的計(jì)算性能和平均帶寬等信息均為已知。
HEFT算法的思路很簡(jiǎn)單,就是將所有任務(wù)都安排在能夠使它最早完成的虛擬機(jī)上執(zhí)行。
那么,它的步驟則是根據(jù)DAG中的依賴關(guān)系,在調(diào)度之前分別計(jì)算所有任務(wù)在各個(gè)虛擬機(jī)上的最早完成時(shí)間,然后通過(guò)比較得到最小的最早完成時(shí)間和能夠滿足該條件的虛擬機(jī),將任務(wù)和該虛擬機(jī)進(jìn)行一個(gè)匹配并對(duì)應(yīng)相應(yīng)的占用時(shí)段,在所有任務(wù)和虛擬機(jī)完成匹配后,再將整個(gè)工作流的任務(wù)依此調(diào)度至對(duì)應(yīng)的虛擬機(jī)上進(jìn)行運(yùn)行。

2.代碼

WorkflowSim的HEFT算法就在souce源文件夾下的planning包中。
直接上代碼:

package org.workflowsim.planning;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.cloudbus.cloudsim.Consts;
import org.cloudbus.cloudsim.Log;
import org.workflowsim.CondorVM;
import org.workflowsim.FileItem;
import org.workflowsim.Task;
import org.workflowsim.utils.Parameters;

/**
 * The HEFT planning algorithm.
 */
public class HEFTPlanningAlgorithm extends BasePlanningAlgorithm {

    private Map<Task, Map<CondorVM, Double>> computationCosts;
    private Map<Task, Map<Task, Double>> transferCosts;
    private Map<Task, Double> rank;
    private Map<CondorVM, List<Event>> schedules;
    private Map<Task, Double> earliestFinishTimes;
    private double averageBandwidth;

    private class Event {

        public double start;
        public double finish;

        public Event(double start, double finish) {
            this.start = start;
            this.finish = finish;
        }
    }

    private class TaskRank implements Comparable<TaskRank> {

        public Task task;
        public Double rank;

        public TaskRank(Task task, Double rank) {
            this.task = task;
            this.rank = rank;
        }

        @Override
        public int compareTo(TaskRank o) {
            return o.rank.compareTo(rank);
        }
    }

    public HEFTPlanningAlgorithm() {
        computationCosts = new HashMap<>();
        transferCosts = new HashMap<>();
        rank = new HashMap<>();
        earliestFinishTimes = new HashMap<>();
        schedules = new HashMap<>();
    }

    /**
     * The main function
     */
    @Override
    public void run() {
        Log.printLine("HEFT planner running with " + getTaskList().size()
                + " tasks.");

        averageBandwidth = calculateAverageBandwidth();

        for (Object vmObject : getVmList()) {
            CondorVM vm = (CondorVM) vmObject;
            schedules.put(vm, new ArrayList<>());
        }

        // Prioritization phase
        calculateComputationCosts();
        calculateTransferCosts();
        calculateRanks();

        // Selection phase
        allocateTasks();
    }

    /**
     * Calculates the average available bandwidth among all VMs in Mbit/s
     *
     * @return Average available bandwidth in Mbit/s
     */
    //計(jì)算平均帶寬
    private double calculateAverageBandwidth() {
        double avg = 0.0;
        for (Object vmObject : getVmList()) {
            CondorVM vm = (CondorVM) vmObject;
            avg += vm.getBw();
        }
        return avg / getVmList().size();
    }

    /**
     * Populates the computationCosts field with the time in seconds to compute
     * a task in a vm.
     */
    //計(jì)算計(jì)算成本
    private void calculateComputationCosts() {
        for (Task task : getTaskList()) {
            Map<CondorVM, Double> costsVm = new HashMap<>();
            for (Object vmObject : getVmList()) {
                CondorVM vm = (CondorVM) vmObject;
                if (vm.getNumberOfPes() < task.getNumberOfPes()) {
                    costsVm.put(vm, Double.MAX_VALUE);
                } else {
                    costsVm.put(vm,
                            task.getCloudletTotalLength() / vm.getMips());
                }
            }
            computationCosts.put(task, costsVm);
        }
    }

    /**
     * Populates the transferCosts map with the time in seconds to transfer all
     * files from each parent to each child
     */
    //計(jì)算傳輸成本
    private void calculateTransferCosts() {
        // Initializing the matrix
        for (Task task1 : getTaskList()) {//對(duì)于所有任務(wù),構(gòu)建TransferCosts映射<任務(wù),映射<任務(wù),double>>并放入其中
            Map<Task, Double> taskTransferCosts = new HashMap<>();
            for (Task task2 : getTaskList()) {
                taskTransferCosts.put(task2, 0.0);
            }
            transferCosts.put(task1, taskTransferCosts);
        }

        // Calculating the actual values
        //對(duì)于每個(gè)任務(wù),把他們對(duì)應(yīng)的每個(gè)子任務(wù)與其之間的計(jì)算的傳輸成本加入映射中——調(diào)用有參數(shù)類(lèi)calculateTransferCost
        for (Task parent : getTaskList()) {
            for (Task child : parent.getChildList()) {
                transferCosts.get(parent).put(child,
                        calculateTransferCost(parent, child));
            }
        }
    }

    /**
     * Accounts the time in seconds necessary to transfer all files described
     * between parent and child
     *
     * @param parent
     * @param child
     * @return Transfer cost in seconds
     */
    private double calculateTransferCost(Task parent, Task child) {
        List<FileItem> parentFiles = parent.getFileList();
        List<FileItem> childFiles = child.getFileList();

        double acc = 0.0;

        for (FileItem parentFile : parentFiles) {
            if (parentFile.getType() != Parameters.FileType.OUTPUT) {//如果父類(lèi)文件類(lèi)型與要求不符,則跳過(guò)此次循環(huán)
                continue;
            }

            for (FileItem childFile : childFiles) {
                //若子類(lèi)文件類(lèi)型符合要求并且名字與父類(lèi)文件相同
                if (childFile.getType() == Parameters.FileType.INPUT
                        && childFile.getName().equals(parentFile.getName())) {
                    acc += childFile.getSize();///acc加上子類(lèi)文件的大小
                    break;
                }
            }
        }

        //file Size is in Bytes, acc in MB
        acc = acc / Consts.MILLION;
        // acc in MB, averageBandwidth in Mb/s
        return acc * 8 / averageBandwidth;
    }

    /**
     * Invokes calculateRank for each task to be scheduled
     */
    private void calculateRanks() {
        for (Task task : getTaskList()) {
            calculateRank(task);
        }
    }

    /**
     * Populates rank.get(task) with the rank of task as defined in the HEFT
     * paper.
     *
     * @param task The task have the rank calculates
     * @return The rank
     */
    private double calculateRank(Task task) {
        if (rank.containsKey(task)) {//若映射中有task鍵,則返回它的值
            return rank.get(task);
        }
        //計(jì)算computationCosts映射中任務(wù)的平均成本
        double averageComputationCost = 0.0;

        for (Double cost : computationCosts.get(task).values()) {
            averageComputationCost += cost;
        }

        averageComputationCost /= computationCosts.get(task).size();

        double max = 0.0;
        for (Task child : task.getChildList()) {
            //對(duì)于task的所有子類(lèi),子類(lèi)成本=傳輸成本中對(duì)應(yīng)task.child的值+child計(jì)算的優(yōu)先級(jí)的值
            double childCost = transferCosts.get(task).get(child)
                    + calculateRank(child);
            max = Math.max(max, childCost);//不斷更新max值,得到子類(lèi)中最大的子類(lèi)成本
        }

        rank.put(task, averageComputationCost + max);//rank映射中放入task和(平均計(jì)算成本+最大子類(lèi)成本)的鍵值對(duì)

        return rank.get(task);//返回對(duì)應(yīng)的task的值
    }

    /**
     * Allocates all tasks to be scheduled in non-ascending order of schedule.
     */
    private void allocateTasks() {
        List<TaskRank> taskRank = new ArrayList<>();
        for (Task task : rank.keySet()) {//對(duì)于映射rank的鍵集合,在taskRank加入新的TaskRank,放入task和task在rank中對(duì)應(yīng)的值
            taskRank.add(new TaskRank(task, rank.get(task)));
        }

        // Sorting in non-ascending order of rank
        Collections.sort(taskRank);//將taskRank列表中元素以非升序排列,然后依此調(diào)度
        for (TaskRank rank : taskRank) {
            allocateTask(rank.task);
        }

    }

    /**
     * Schedules the task given in one of the VMs minimizing the earliest finish
     * time
     *
     * @param task The task to be scheduled
     * @pre All parent tasks are already scheduled
     */
    private void allocateTask(Task task) {
        CondorVM chosenVM = null;//選中的vm預(yù)設(shè)為空
        double earliestFinishTime = Double.MAX_VALUE;//預(yù)設(shè)值最早完成時(shí)間為最大值
        double bestReadyTime = 0.0;
        double finishTime;//聲明完成時(shí)間

        for (Object vmObject : getVmList()) {//對(duì)虛擬機(jī)列表中所有虛擬機(jī)遍歷
            CondorVM vm = (CondorVM) vmObject;//把虛擬機(jī)強(qiáng)制轉(zhuǎn)換為CondorVM類(lèi)型的vm
            double minReadyTime = 0.0;//預(yù)設(shè)最小等待時(shí)間

            for (Task parent : task.getParentList()) {
                double readyTime = earliestFinishTimes.get(parent);
                if (parent.getVmId() != vm.getId()) {//如果當(dāng)前任務(wù)和父任務(wù)不在一個(gè)虛擬機(jī)上面執(zhí)行,則等待時(shí)間要加上傳輸成本
                    readyTime += transferCosts.get(parent).get(task);
                    System.out.println(parent.getCloudletId()+"從虛擬機(jī)"+parent.getVmId()+"到虛擬機(jī)"+vm.getId()+"的任務(wù)"+task.getCloudletId()+"的傳輸時(shí)常"+transferCosts.get(parent).get(task));
                    System.out.println(task.getCloudletId()+"就緒時(shí)間"+readyTime);
                }
                minReadyTime = Math.max(minReadyTime, readyTime);//更新最小等待時(shí)間,通過(guò)遍歷取到全部最小等待時(shí)間和等待時(shí)間的最大值
            }

            finishTime = findFinishTime(task, vm, minReadyTime, false);//調(diào)用findFinishTime來(lái)計(jì)算完成時(shí)間

            if (finishTime < earliestFinishTime) {//若完成時(shí)間小于最早完成時(shí)間,就把當(dāng)前的一些數(shù)值賦值給最佳的
                bestReadyTime = minReadyTime;
                earliestFinishTime = finishTime;//把當(dāng)前完成時(shí)間賦值給最早完成時(shí)間
                chosenVM = vm;
            }
        }

        findFinishTime(task, chosenVM, bestReadyTime, true);
        earliestFinishTimes.put(task, earliestFinishTime);//把當(dāng)前任務(wù)和最早完成時(shí)間的鍵值對(duì)放入映射中

        task.setVmId(chosenVM.getId());
    }

    /**
     * Finds the best time slot available to minimize the finish time of the
     * given task in the vm with the constraint of not scheduling it before
     * readyTime. If occupySlot is true, reserves the time slot in the schedule.
     * 限制為不能在準(zhǔn)備時(shí)間之前開(kāi)始調(diào)度,若ocupuSlot存放的為true,即已占用時(shí)隙,則在schedu中保存這個(gè)時(shí)隙
     * @param task The task to have the time slot reserved
     * @param vm The vm that will execute the task
     * @param readyTime The first moment that the task is available to be
     * scheduled
     * @param occupySlot If true, reserves the time slot in the schedule.
     * @return The minimal finish time of the task in the vmn
     */
    private double findFinishTime(Task task, CondorVM vm, double readyTime,
            boolean occupySlot) {
        List<Event> sched = schedules.get(vm);//schedules中對(duì)應(yīng)vm的列表
        double computationCost = computationCosts.get(task).get(vm);
        double start, finish;
        int pos;//pos

        if (sched.isEmpty()) {
            if (occupySlot) {
                //若vm對(duì)應(yīng)的列表為空,且有要預(yù)留的時(shí)間段,則把這段時(shí)間加入列表中
                sched.add(new Event(readyTime, readyTime + computationCost));
            }
            return readyTime + computationCost;//返回預(yù)留時(shí)間段的結(jié)束時(shí)間
        }

        if (sched.size() == 1) {//如果vm列表中已經(jīng)有一個(gè)時(shí)間段
            if (readyTime >= sched.get(0).finish) {//準(zhǔn)備時(shí)間大于已有時(shí)間段的結(jié)束時(shí)間
                pos = 1;//位置賦值1
                start = readyTime;
            } else if (readyTime + computationCost <= sched.get(0).start) {
                //否則若時(shí)間段的結(jié)束時(shí)間小于已有時(shí)間段的開(kāi)始時(shí)間
                pos = 0;//位置賦值1
                start = readyTime;
            } else {//若時(shí)間段和已有事件的時(shí)間段有重合
                pos = 1;//則放到已有事件之后
                start = sched.get(0).finish;//并且開(kāi)始時(shí)間設(shè)為已有事件段的結(jié)束時(shí)間
            }

            if (occupySlot) {//若要預(yù)留
                sched.add(pos, new Event(start, start + computationCost));
            }
            return start + computationCost;
        }

        // Trivial case: Start after the latest task scheduled
        //一般情況:在最后一個(gè)調(diào)度
        //開(kāi)始時(shí)間設(shè)為準(zhǔn)備時(shí)間和原先列表中最后一個(gè)事件結(jié)束時(shí)間中的最大值
        start = Math.max(readyTime, sched.get(sched.size() - 1).finish);
        finish = start + computationCost;
        int i = sched.size() - 1;//i為列表中最后一個(gè)事件的位置
        int j = sched.size() - 2;//j為列表中倒數(shù)第二個(gè)事件的位置
        pos = i + 1;//pos為該時(shí)間段要插入的位置
        while (j >= 0) {//當(dāng)列表中已經(jīng)存在事件
            Event current = sched.get(i);//最后一個(gè)事件記為current
            Event previous = sched.get(j);//倒數(shù)第二個(gè)記為previous

            if (readyTime > previous.finish) {
                if (readyTime + computationCost <= current.start) {
                    //若該時(shí)間段恰好在最后一個(gè)和倒數(shù)第二個(gè)時(shí)間段的中間
                    start = readyTime;
                    finish = readyTime + computationCost;
                    pos = i;
                }

                break;
            }
            if (previous.finish + computationCost <= current.start) {
                //若從倒數(shù)第二個(gè)事件結(jié)束時(shí)間恰好開(kāi)始,在最有一個(gè)事件的開(kāi)始前能完成
                start = previous.finish;
                finish = previous.finish + computationCost;
                pos = i;
            }
            i--;//將位置向前挪動(dòng)進(jìn)行判斷
            j--;
        }

        if (readyTime + computationCost <= sched.get(0).start) {
            //當(dāng)該時(shí)間段預(yù)設(shè)的完成時(shí)間在第一個(gè)事件開(kāi)始之前
            pos = 0;
            start = readyTime;

            if (occupySlot) {
                //若預(yù)置設(shè)定為是,則插入第一個(gè)位置
                sched.add(pos, new Event(start, start + computationCost));
            }
            return start + computationCost;
        }
        if (occupySlot) {
            sched.add(pos, new Event(start, finish));
        }
        return finish;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容