遺傳算法實踐(六) 最小費用流問題

最小費用流問題描述

在網(wǎng)絡(luò)模型中,除了考慮網(wǎng)絡(luò)上各邊的容量以外,通常還需要考慮各邊上流動的費用。最小費用流問題就是在給定網(wǎng)絡(luò)模型中各節(jié)點的需求量和供應(yīng)量的情況下,如何分配流量和路徑,使得費用達到最小的問題。

在由m個節(jié)點的點集合Vn個邊的邊集合A組成的圖G=(V,A)中,當(dāng)邊(i,j)上的最大容量u_{ij}和費用c_{ij}給定時,最小費用流問題的數(shù)學(xué)模型可以描述為:
min\ z=\sum_{i=1}^{m}\sum_{j=1}^{m}c_{ij}x_{ij} \\s.t.\ \sum_{j\in Suc(i)}x_{ij}-\sum_{k\in Pre(i)}x_{ki}=\left\{ \begin{array} qq,\ i=1\\ 0,\ i=2,3,...,m-1\\ -q,\ i=m \end{array} \right. \\0 \le x_{ij} \le u_{ij}, \ i,j=1,2,...,m
其中x_{ij}代表從節(jié)點i到節(jié)點j的流量,Suc(i)Pre(i)分別為節(jié)點i的后續(xù)節(jié)點集和前續(xù)節(jié)點集,\sum_{k\in Pre(i)}x_{ki}表示從其他節(jié)點進入到節(jié)點i的總流量,而\sum_{k\in Suc(i)}x_{ij}表示從節(jié)點i流出的流量。

求解最小費用流問題的代表性算法有Successive shortest path algorithm,Primal-dual法和Out of Kilter法等。

最小費用流問題示例

一個最小費用流問題的示例問題如圖所示:

MinimumCostFlow_description

遺傳算法求解最小費用流問題

個體編碼

此處依然采用優(yōu)先級編碼,對各個節(jié)點給予優(yōu)先級;在生成路徑時根據(jù)優(yōu)先級挑選路徑下一步的節(jié)點。

解碼

同最大流量問題的解碼。

其他遺傳算法操作

  • 評價函數(shù):解碼過程中得到的各邊流量和代價的乘積之和
  • 育種選擇:binary錦標(biāo)賽選擇
  • 變異算法:交叉-有序交叉(OX),突變-亂序突變(Shuffle Index)
  • 環(huán)境選擇:子代完全替換父代(無精英保留)

代碼示例

完整代碼如下:

## 環(huán)境設(shè)定
import numpy as np
import matplotlib.pyplot as plt
from deap import base, tools, creator, algorithms
import random

params = {
    'font.family': 'serif',
    'figure.dpi': 300,
    'savefig.dpi': 300,
    'font.size': 12,
    'legend.fontsize': 'small'
}
plt.rcParams.update(params)

import copy
# --------------------
# 問題定義
creator.create('FitnessMax', base.Fitness, weights=(-1.0,))  # 最小化問題
creator.create('Individual', list, fitness=creator.FitnessMax)

# 個體編碼
toolbox = base.Toolbox()
geneLength = 25
toolbox.register('perm', np.random.permutation, geneLength)
toolbox.register('individual', tools.initIterate,
                 creator.Individual, toolbox.perm)

# 評價函數(shù)
# 類似鏈表,存儲每個節(jié)點的可行路徑,用于解碼
nodeDict = {'1': [2,3,4,5,6], '2': [7,8], '3': [7,8,9], '4': [8,9,10], '5': [9,10,11], 
            '6': [10,11], '7': [13], '8': [12,14], '9': [13,15], '10': [14,16],
            '11':[15], '12':[17], '13':[17,18], '14':[18,19], '15':[19,20],
            '16':[20,21], '17':[18,22], '18':[19,22,23], '19':[20,23,24],
            '20':[23,24], '21':[24], '22':[25], '23':[25], '24':[25]
           }

def genPath(ind, nodeDict=nodeDict):
    '''輸入一個優(yōu)先度序列之后,返回一條從節(jié)點1到節(jié)點25的可行路徑 '''
    path = [1]
    endNode = len(ind)
    while not path[-1] == endNode:
        curNode = path[-1]  # 當(dāng)前所在節(jié)點
        if nodeDict[str(curNode)]:  # 當(dāng)前節(jié)點指向的下一個節(jié)點不為空時,到達下一個節(jié)點
            toBeSelected = nodeDict[str(curNode)]  # 獲取可以到達的下一個節(jié)點列表
        else:
            return path
        priority = np.asarray(ind)[np.asarray(
            toBeSelected)-1]  # 獲取優(yōu)先級,注意列表的index是0-9
        nextNode = toBeSelected[np.argmax(priority)]
        path.append(nextNode)
    return path


# 存儲每條邊的剩余容量,用于計算路徑流量和更新節(jié)點鏈表
capacityDict = {'1,2': 20, '1,3': 20, '1,4': 20, '1,5': 20, '1,6': 20,
                '2,7': 10, '2,8': 8,
               '3,7': 6, '3,8': 5, '3,9': 4, 
                '4,8': 5, '4,9': 8, '4,10': 10,
                '5,9': 10, '5,10': 4, '5,11': 10,
                '6,10': 10, '6,11': 10,
                '7,13': 15,
                '8,12': 15, '8,14': 15,
                '9,13': 15, '9,15': 15,
                '10,14': 15, '10,16': 15,
                '11,15': 15,
                '12,17': 20,
                '13,17': 20, '13,18': 20,
                '14,18': 20, '14,19': 20,
                '15,19': 20, '15,20': 20,
                '16,20': 20, '16,21': 20,
                '17,18': 8, '17,22': 25,
                '18,19': 8, '18,22': 25, '18,23': 20,
                '19,20': 8, '19,23': 10, '19,24': 25,
                '20,23': 10, '20,24': 25,
                '21,24': 25,
                '22,25': 30, '23,25': 30, '24,25':30}

# 存儲每條邊的代價
costDict = {'1,2': 10, '1,3': 13, '1,4': 32, '1,5': 135, '1,6': 631,
                '2,7': 10, '2,8': 13,
               '3,7': 10, '3,8': 15, '3,9': 33, 
                '4,8': 4, '4,9': 15, '4,10': 35,
                '5,9': 3, '5,10': 13, '5,11': 33,
                '6,10': 7, '6,11': 7,
                '7,13': 10,
                '8,12': 4, '8,14': 9,
                '9,13': 11, '9,15': 12,
                '10,14': 9, '10,16': 14,
                '11,15': 5,
                '12,17': 8,
                '13,17': 6, '13,18': 7,
                '14,18': 7, '14,19': 7,
                '15,19': 5, '15,20': 14,
                '16,20': 4, '16,21': 14,
                '17,18': 11, '17,22': 11,
                '18,19': 5, '18,22': 8, '18,23': 34,
                '19,20': 3, '19,23': 10, '19,24': 35,
                '20,23': 3, '20,24': 14,
                '21,24': 12,
                '22,25': 10, '23,25': 2, '24,25':3}

def traceCapacity(path, capacityDict):
    ''' 獲取給定path的最大流量,更新各邊容量 '''
    pathEdge = list(zip(path[::1], path[1::1]))
    keys = []
    edgeCapacity = []
    for edge in pathEdge:
        key = str(edge[0]) + ',' + str(edge[1])
        keys.append(key)  # 保存edge對應(yīng)的key
        edgeCapacity.append(capacityDict[key])  # 該邊對應(yīng)的剩余容量
    pathFlow = min(edgeCapacity)  # 路徑上的最大流量
    # 更新各邊的剩余容量
    for key in keys:
        capacityDict[key] -= pathFlow  # 注意這里是原位修改
    return pathFlow

def updateNodeDict(capacityDict, nodeDict):
    ''' 對剩余流量為0的節(jié)點,刪除節(jié)點指向;對于鏈表指向為空的節(jié)點,由于沒有下一步可以移動的方向,
    從其他所有節(jié)點的指向中刪除該節(jié)點
    '''
    for edge, capacity in capacityDict.items():
        if capacity == 0:
            key, toBeDel = str(edge).split(',')  # 用來索引節(jié)點字典的key,和需要刪除的節(jié)點toBeDel
            if int(toBeDel) in nodeDict[key]:
                nodeDict[key].remove(int(toBeDel))
    delList = []
    for node, nextNode in nodeDict.items():
        if not nextNode:  # 如果鏈表指向為空的節(jié)點,從其他所有節(jié)點的指向中刪除該節(jié)點
            delList.append(node)
    for delNode in delList:
        for node, nextNode in nodeDict.items():
            if delNode in nextNode:
                nodeDict[node].remove(delNode)

def calCost(path, pathFlow, costDict):
    '''計算給定路徑的成本'''
    pathEdge = list(zip(path[::1], path[1::1]))
    keys = []
    edgeCost = []
    for edge in pathEdge:
        key = str(edge[0]) + ',' + str(edge[1])
        keys.append(key)  # 保存edge對應(yīng)的key
        edgeCost.append(costDict[key])  # 該邊對應(yīng)的cost 
    pathCost = sum([eachEdgeCost*pathFlow for eachEdgeCost in edgeCost])
    return pathCost

def evaluate(ind, outputPaths=False):
    '''評價函數(shù)'''
    # 初始化所需變量
    nodeDictCopy = copy.deepcopy(nodeDict)  # 淺復(fù)制
    capacityDictCopy = copy.deepcopy(capacityDict)
    paths = []
    pathFlows = []
    overallCost = 0
    givenFlow = 70 # 需要運送的流量
    eps = 1e-5
    # 開始循環(huán)
    while nodeDictCopy['1'] and (abs(givenFlow) > eps):
        path = genPath(ind, nodeDictCopy)  # 生成路徑
        # 當(dāng)路徑無法抵達終點,說明經(jīng)過這個節(jié)點已經(jīng)無法往下走,從所有其他節(jié)點的指向中刪除該節(jié)點
        if path[-1] != geneLength:
             for node, nextNode in nodeDictCopy.items():
                 if path[-1] in nextNode:
                     nodeDictCopy[node].remove(path[-1])
             continue
        paths.append(path) # 保存路徑
        pathFlow = traceCapacity(path, capacityDictCopy) # 計算路徑最大流量
        if givenFlow < pathFlow: # 當(dāng)剩余流量不能填滿該路徑的最大流量時,將所有剩余流量分配給該路徑
            pathFlow = givenFlow            
        pathFlows.append(pathFlow) # 保存路徑的流量
        givenFlow -= pathFlow # 更新需要運送的剩余流量
        updateNodeDict(capacityDictCopy, nodeDictCopy) # 更新節(jié)點鏈表
        # 計算路徑上的cost
        pathCost = calCost(path, pathFlow, costDict)
        overallCost += pathCost        
    if outputPaths:
        return overallCost, paths, pathFlows
    return overallCost,
toolbox.register('evaluate', evaluate)

# 迭代數(shù)據(jù)
stats = tools.Statistics(key=lambda ind:ind.fitness.values)
stats.register('min', np.min)
stats.register('avg', np.mean)
stats.register('std', np.std)

# 生成初始族群
toolbox.popSize = 100
toolbox.register('population', tools.initRepeat, list, toolbox.individual)
pop = toolbox.population(toolbox.popSize)

# 注冊工具
toolbox.register('select', tools.selTournament, tournsize=2)
toolbox.register('mate', tools.cxOrdered)
toolbox.register('mutate', tools.mutShuffleIndexes, indpb=0.5)

# --------------------
# 遺傳算法參數(shù)
toolbox.ngen = 200
toolbox.cxpb = 0.8
toolbox.mutpb = 0.05

# 遺傳算法主程序部分
hallOfFame = tools.HallOfFame(maxsize=1)
pop,logbook = algorithms.eaSimple(pop, toolbox, cxpb=toolbox.cxpb, mutpb=toolbox.mutpb,
                   ngen = toolbox.ngen, stats=stats, halloffame=hallOfFame, verbose=True)

結(jié)果如下:

# 輸出結(jié)果
from pprint import pprint
bestInd = hallOfFame.items[0]
overallCost, paths, pathFlows = eval(bestInd, outputPaths=True)
print('最優(yōu)解路徑為: ')
pprint(paths)
print('各路徑上的流量為:'+str(pathFlows))
print('最小費用為: '+str(overallCost))

# 可視化迭代過程
minFit = logbook.select('min')
avgFit = logbook.select('avg')
plt.plot(minFit, 'b-', label='Minimum Fitness')
plt.plot(avgFit, 'r-', label='Average Fitness')
plt.xlabel('# Gen')
plt.ylabel('Fitness')
plt.legend(loc='best')

## 結(jié)果:
#最優(yōu)解路徑為: 
#[[1, 2, 7, 13, 17, 22, 25],
# [1, 2, 8, 14, 19, 20, 23, 25],
# [1, 3, 7, 13, 17, 22, 25],
# [1, 3, 9, 13, 17, 22, 25],
# [1, 3, 8, 14, 19, 23, 25],
# [1, 4, 9, 13, 17, 22, 25],
# [1, 4, 9, 13, 18, 22, 25],
# [1, 4, 8, 14, 19, 23, 25],
# [1, 4, 8, 12, 17, 22, 25],
# [1, 4, 10, 16, 20, 23, 25],
# [1, 4, 10, 16, 20, 24, 25],
# [1, 5, 9, 13, 18, 19, 23, 25],
# [1, 5, 9, 15, 20, 24, 25],
# [1, 5, 11, 15, 20, 24, 25]]
#各路徑上的流量為:[10, 8, 5, 4, 5, 1, 7, 2, 3, 2, 5, 3, 7, 8]
#最小費用為: 6971

迭代過程的可視化如下:
MinimumCostProblem_iterations

得到的最優(yōu)解可視化如下:

MinimumCostFlow_rsl
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容