import numpy as np
import networkx as nx
import random
import sys
import os
import logging
import warnings
import pandas as pd
import math
import GC_utils
import GC_Baran1989
import GridCalEngine.api as gce # For interfacing with the GridCal API
from GridCalEngine.IO.file_handler import FileOpen, FileSave
warnings.filterwarnings('ignore') # Ignore warnings during execution
## based on Jakus 2020
#network define (case33, case16,....)
#define algorithm settings(Niter, Npop, ...)
#population size: = 14
#mutation probability: = 2%
#number of individuals in the initial population generated using SBEA: = 4
#number of elite individuals: = 1.
#generate the initial population
#SBEA
#run the Baran algorithm, based on fitness (powerflow/losses/voltage drop) on each branch and obtain a radial configuration
#randomly suffle the the obtained configuration
#modify the configuration by the objective function defined (power losses, voltage drop)
#Random Kruskal
#generate random weights for the branches/lines
#find the Kruskal's MST for this configuration
#generate Npop-Nsbea configurations
#Merge the populations generated by SBEA and Random Kruskal
#Loop for Niter
#Calculate the fitness function for each individual in the generated population
#Detection of Nel elite individuals
#Selection of the individuals for the crossover process
#offspring generation through crossover of selected pairs of individuals
#offspring mutation
#replacement of worst individuals obtained after crossover and mutation with the elite individuals
#final power flow for the obtained topology
[docs]
class Jakus2020:
"""Implements the Jakus 2020 algorithm for distribution network reconfiguration (DNR).
This class utilizes a genetic algorithms to optimize the distribution network
reconfiguration by minimizing fitness (power losses/voltage drop) and ensuring radial connectivity.
Attributes:
net (DistributionNetwork): The distribution network instance.
PopulationSize (int): Size of the population for the genetic algorithm.
TieLines (list): List of Tie lines Lists. [previously was a single Tie Lines list for supplying Baran algorithm]
MutationProbability (float): Probability of mutation during the genetic process.
#discarted : PopulationSize_SBEA (int): Population size for the SBEA. #this parameter is not in use, รง
the SBEA candidates will be generated externally and added in the TieLines parameter
ElitePopulation (int): Number of elite individuals to retain.
Niter (int): Number of iterations for the genetic algorithm.
NumParentPairs (int): Number of parent pairs for crossover.
OffspringSize (int): Size of the offspring population.
Candidates (list): List of candidate solutions.
verbose (int): Logging level for verbose output.
"""
[docs]
def __init__(self, grid=None, TieLines=None, PopulationSize=10, MutationProbability=0.02,
ElitePopulation=4, Niter=10, fitness_ratio=1, loss_factor = 2, verbose_logging=logging.WARNING):
"""Initializes the Jakus2020 class.
Args:
net (DistributionNetwork): The distribution network instance.
TieLines (list): List of tie lines in the network.
PopulationSize (int): Size of the population for the genetic algorithm.
MutationProbability (float): Probability of mutation during the genetic process.
PopulationSize_SBEA (int): Population size for the SBEA.
ElitePopulation (int): Number of elite individuals to retain.
Niter (int): Number of iterations for the genetic algorithm.
verbose_logging (int): Logging level for verbose output.
"""
self.grid = grid
self.PopulationSize = PopulationSize
self.TieLines = TieLines
self.MutationProbability : float = MutationProbability
self.PopulationSize_SBEA :int = len(TieLines)
self.ElitePopulation : int = ElitePopulation
self.Niter : int = Niter
self.OffspringSize = self.PopulationSize - self.ElitePopulation
self.NumParentPairs = round(self.OffspringSize / 2)
self.Candidates : list = []
self.verbose = verbose_logging
self.ConvergenceList : list = []
logging.getLogger('jakus2020.py').setLevel(self.verbose) # Set logging level
logging.getLogger('jakus2020.py').debug("Created instance of GA")
self.NumPF=0
self.totalLoad = abs(sum([self.grid.loads[i].P for i in range(len(self.grid.loads))]))
self.loss_factor = loss_factor
self.fitness_ratio = fitness_ratio
def __FitnessCalculation(self, config=None):
self.NumPF+=1
if config:
GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration= config, value_configuration=False)
fitness = GC_utils.GC_FitnessCalculation(self.grid, self.totalLoad, loss_factor=self.loss_factor, split_factor=self.fitness_ratio)
return fitness
[docs]
def SortCandidates(self, Candidates, Loops):
SortedCandidates = []
for candidate in Candidates:
#print("######## Candidate ", candidate)
sorted_candidate = np.zeros(len(candidate))
persistence = False
occurrences = [[1 if num in sublist else 0 for sublist in Loops] for num in candidate]
occurrences_array = np.array(occurrences)
while not persistence:
persistence = True
column_summary = occurrences_array.sum(axis=0)
#print(occurrences_array)
#print("column_summary", column_summary)
for idx, element in enumerate(column_summary):
if element == 1:
persistence = False
idx_candidate = np.where(occurrences_array[:, idx] == 1)[0]
#print(f"Candidate:{int(candidate[idx_candidate[0]])} column:{idx} - {occurrences_array[:, idx]} found:{idx_candidate}")
sorted_candidate[idx] = int(candidate[idx_candidate[0]])
occurrences_array[:, idx] = 0
occurrences_array[idx_candidate[0], :] = 0
row_summary = occurrences_array.sum(axis=1)
#print(occurrences_array)
#print("sorted candidate:", sorted_candidate)
#print("row_summary", row_summary)
for idx, element in enumerate(row_summary):
if element == 1:
persistence = False
idx_candidate = np.where(occurrences_array[idx, :] == 1)[0]
#print(f"Candidate:{int(candidate[idx])} row:{idx} - {occurrences_array[idx, :]} found:{idx_candidate}")
sorted_candidate[idx_candidate[0]] = int(candidate[idx])
occurrences_array[idx, :] = 0
occurrences_array[:, idx_candidate[0]] = 0
#print("sorted candidate:", sorted_candidate)
if persistence:
#print("persistence")
for idx, element in enumerate(column_summary):
if element:
persistence = False
idx_candidate = np.where(occurrences_array[:, idx] == 1)[0]
#print(f"Candidate:{int(candidate[idx_candidate[0]])} column:{idx} - {occurrences_array[:, idx]} found:{idx_candidate}")
sorted_candidate[idx] = int(candidate[idx_candidate[0]])
occurrences_array[:, idx] = 0
occurrences_array[idx_candidate[0], :] = 0
break
#print("sorted candidate:", sorted_candidate)
SortedCandidates.append(list(sorted_candidate))
return SortedCandidates
[docs]
def RandomCandidateGenerator(self, n=1):
#the Jakus paper proposed to create the candidates by generating MST with random weights
Candidates = []
GC_utils.NetworkReconfiguration(self.grid, all=True, selected_configuration=None, value_all=True, value_configuration=False)
NetworkLoops = GC_utils.SearchLoopsLines(self.grid, flagUsed=True)
for idx in range(n):
candidate = self._RandomKruskalCandidate()
Candidates.append(candidate)
return Candidates
def _RandomKruskalCandidate(self):
"""Generates a new candidate solution using Kruskal's algorithm. (from Jakus GA algorithm)
Arguments:
type = 'CurrentKruskal' (default): generates a MST based on the network current with all tielines closed
'RandomKruskal' : generates a MST based on random weights in all the edges
'PSO' : generates a random ST, using a set of Loops without repeated edges
Loops = the network loops to be considered for PSO case
variation = (real) the weight to apply on the randomization of the edge currents, which allow to generate different valid candidate solutions
Returns:
A candidate configuration
"""
logging.getLogger('DistributionNetwork.py').debug("_RandomCandidate: generate a new candidate with Kruskal")
GC_utils.NetworkReconfiguration(self.grid, all=True, selected_configuration=None, value_all=True, value_configuration=False)
self.graph = GC_utils.GC2Graph(self.grid)
for idx, (u, v, attributes) in enumerate(self.graph.edges(data=True)):
attributes['weight'] = 1 / (random.random())
MinSpanningTree = nx.minimum_spanning_tree(self.graph, weight='weight', algorithm="kruskal")
GC_utils.Graph2GC(MinSpanningTree, self.grid)
outofservice = GC_utils.LinesOutofService(self.grid)
fitness,_ = self.CalculateFitness(configuration=outofservice)
return (outofservice,fitness)
[docs]
def BaranCandidateGenerator(self):
# Create an MSTgreedy object
baran = GC_Baran1989.Baran1989(self.grid, TieLines=self.TieLines,verbose_logging=logging.ERROR)
# Solve the Minimum Spanning Tree problem
disabled_lines = baran.Solve()
fitness,_ = self.CalculateFitness(configuration=disabled_lines)
return (disabled_lines,fitness)
[docs]
def InitialPopulationGeneration(self):
"""Generates the initial population combining SBEA and Kruskal's algorithm.
This method combines candidates generated from SBEA and Kruskal's algorithms
and sorts them based on their fitness values.
"""
logging.getLogger('jakus2020.py').debug("InitialPopulationGeneration: generate the initial population")
fitness, radial = self.CalculateFitness(configuration=self.TieLines)
logging.getLogger('jakus2020.py').debug(f"Initial TieLines Candidate (fitness:{fitness}) : {self.TieLines}")
self.Candidates.append((self.TieLines,fitness))
self.Candidates.append(self.BaranCandidateGenerator())
self.Candidates.extend(self.RandomCandidateGenerator(n=self.PopulationSize))
logging.getLogger('jakus2020.py').debug(f"InitialPopulationGeneration len: {len(self.Candidates)}")
[docs]
def CalculateFitness(self, configuration):
"""Calculates the fitness of a candidate configuration.
Args:
candidate: List of selected configurations to evaluate.
Returns:
A tuple containing the fitness value and a boolean indicating if the configuration is valid.
"""
GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True,
selected_configuration=list(configuration),
value_configuration=False)
radialconnected, _, _ = GC_utils.CheckRadialConnectedNetwork(self.grid)
if radialconnected:
losses = self.__FitnessCalculation()
fitness_value = losses ####change fitness
logging.getLogger('network.py').debug(
f"The configuration is complaint with the radiality constraint and has a fitness={fitness_value}")
return (fitness_value, True)
logging.getLogger('network.py').debug(f"The configuration does not comply with the radiality constraint")
return (np.inf, False) # Return infinite fitness for invalid configurations
[docs]
def ParentsSelection(self):
"""Generates offspring candidates from the current population.
The method calculates the fitness for each candidate in the current population
and selects parent pairs based on their fitness values.
Returns:
list: A list of parent pairs for generating offspring.
"""
logging.getLogger('jakus2020.py').debug("OffspringGeneration")
Fitnesses = []
Fitnesses = [t[1] for t in self.Candidates]
TotalFitnesses = sum(Fitnesses)
IndividualProbability = [x / TotalFitnesses for x in Fitnesses]
logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration - fitness: {Fitnesses} ")
CumulativeProbability = np.cumsum(IndividualProbability)
if math.isnan(CumulativeProbability[0]):
logging.getLogger('jakus2020.py').debug(f"ERRORRRRRRRRRRRRRRRRR")
logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3: {TotalFitnesses} // {IndividualProbability} ")
# logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3a: idx: {idx} // lenCumulative: {lenCumulative} CumulativeProbability={CumulativeProbability} len(ParentPairs)={len(ParentPairs)}")
return
ParentPairs = []
idx = 0
lenCumulative = len(CumulativeProbability)
logging.getLogger('jakus2020.py').debug("OffspringGeneration")
logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3: {len(ParentPairs)} // {self.NumParentPairs} // {lenCumulative}")
while len(ParentPairs) < self.NumParentPairs:
r = random.random()
if idx < lenCumulative:
if (r > CumulativeProbability[idx - 1]) & (r < CumulativeProbability[idx]):
ParentPairs.append((self.Candidates[idx], self.Candidates[idx - 1]))
idx += 1
#logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3a: idx: {idx} // lenCumulative: {lenCumulative} CumulativeProbability={CumulativeProbability} len(ParentPairs)={len(ParentPairs)}")
else:
idx = 0
return ParentPairs
[docs]
def OffspringCrossOver(self, Parents):
"""Performs crossover between parent candidates to generate offspring.
Args:
Parents (list): A list containing two parent candidate configurations.
Returns:
list: The candidate configuration of the offspring generated.
"""
#logging.getLogger('jakus2020.py').debug("OffspringCrossOver")
GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=Parents[0][0], value_configuration=False)
b1= np.array([1 if line.active else 0 for line in self.grid.lines])
GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=Parents[1][0], value_configuration=False)
b2= np.array([1 if line.active else 0 for line in self.grid.lines])
parents_and = np.array([1 if b1[i]==1 and b2[i]==1 else 0 for i in range(len(self.grid.lines))])
parents_nand = np.array([1 if b1[i]==0 and b2[i]==0 else 0 for i in range(len(self.grid.lines))])
GC_utils.NetworkReconfiguration(grid=self.grid, all=True, value_all=True)
graph=GC_utils.GC2Graph(self.grid)
for idx,line in enumerate(self.grid.lines):
if parents_nand[idx]==1:
try:
graph.remove_edge(line.bus_from.idtag,line.bus_to.idtag)
except:
pass
elif parents_and[idx]==1:
graph.edges[line.bus_from.idtag,line.bus_to.idtag]['weight']=0
else:
graph.edges[line.bus_from.idtag,line.bus_to.idtag]['weight']=random.random()*10
graph2 = nx.minimum_spanning_tree(graph, weight="weight")
self.grid = GC_utils.Graph2GC(graph2, grid=self.grid)
candidate = GC_utils.LinesOutofService(self.grid)
radiality,_,_ = GC_utils.CheckRadialConnectedNetwork(self.grid)
if radiality==False: #if the new candidate is not radiality compliant, it is substituted by one of the parents
logging.getLogger('jakus2020.py').info(f"BAD CROSSOVER P1,P2= {Parents[0][0]},{Parents[1][0]} child={candidate}")
candidate = Parents[0][0]
logging.getLogger('jakus2020.py').debug(f"Crossover P1,P2= {Parents[0][0]},{Parents[1][0]} child={candidate}")
return candidate
[docs]
def MutationProcess(self, candidate):
"""Processes mutation on a given candidate configuration.
If a mutation occurs, a random disabled line is activated and a new
minimum spanning tree is generated based on the modified configuration.
Args:
candidate (list): A candidate configuration of the network.
Returns:
list: The modified candidate configuration after mutation.
"""
logging.getLogger('jakus2020.py').debug("MutationProcess")
candidateIN = candidate.copy()
mutation_idx = random.random()
if (mutation_idx < self.MutationProbability):
ActivatedLine = random.choice(candidate)
candidate.remove(ActivatedLine)
GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=candidate, value_configuration=False)
graph = GC_utils.GC2Graph(self.grid)
for _, (_, _, attributes) in enumerate(graph.edges(data=True)):
attributes['weight'] = random.random() * 100.
MinSpanningTree = nx.minimum_spanning_tree(graph, weight='weight', algorithm='kruskal')
self.grid = GC_utils.Graph2GC(MinSpanningTree, self.grid)
candidate = GC_utils.LinesOutofService(self.grid)
logging.getLogger('jakus2020.py').debug(f"MutationProcess:{candidateIN}->{candidate}")
return candidate
[docs]
def Solve(self):
"""Solves the distribution network reconfiguration problem.
This method runs the genetic algorithm for a specified number of iterations,
generating offspring and selecting the best candidates from the population.
Returns:
list: The best candidate configuration found after the iterations.
"""
logging.getLogger('jakus2020.py').info("Solve")
self.InitialPopulationGeneration()
logging.getLogger('jakus2020.py').debug(f"---------------------------------------New iter={[candidate[1] for candidate in self.Candidates]}")
logging.getLogger('jakus2020.py').debug(f"---------------------------------------New iter={[candidate[0] for candidate in self.Candidates]}")
self.ConvergenceList = []
# Loop for Niter
for iter in range(self.Niter):
logging.getLogger('jakus2020.py').debug(f"---------------------------------------New iter={iter}")
# Offspring generation
self.Candidates = sorted(self.Candidates, key=lambda x: x[1], reverse=True)
ParentPairs = self.ParentsSelection()
newCandidates = []
for Parents in ParentPairs:
newCandidateA = self.OffspringCrossOver(Parents)
newCandidateB = self.OffspringCrossOver(Parents)
# Mutation process from the two offsprings
newCandidateA = self.MutationProcess(newCandidateA)
newCandidateB = self.MutationProcess(newCandidateB)
FitnessA, _ = self.CalculateFitness(newCandidateA)
FitnessB, _ = self.CalculateFitness(newCandidateB)
newCandidates.append((newCandidateA, FitnessA))
newCandidates.append((newCandidateB, FitnessB))
# Elitism: Detecting elite individuals from the new candidates
self.Candidates = sorted(self.Candidates, key=lambda x: x[1], reverse=False) ####change fitness True)
del self.Candidates[-(self.OffspringSize):] # Remove worst candidates
#newCandidates = sorted(newCandidates, key=lambda x: x[1], reverse=True)
self.Candidates = self.Candidates + newCandidates
self.Candidates = sorted(self.Candidates, key=lambda x: x[1], reverse=False) ####change fitness True)
logging.getLogger('jakus2020.py').debug(f"END OF ITER ={iter} with best candidate {self.Candidates}")
self.ConvergenceList.append(self.Candidates)
logging.getLogger('jakus2020.py').info(f"ITER {iter}")
return self.Candidates[0][0] # Return the best candidate configuration
if __name__ == '__main__':
print('Algorithms to find the optimal distribution network configuration')
logging.basicConfig(
level=logging.ERROR, # Set the log level to DEBUG
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Set the log format
datefmt='%Y-%m-%d %H:%M:%S' # Set the date format
)
case=1
# Create a network object
if case==1:
gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case33.gridcal").open()
TieLinesName=['line 32','line 33','line 34','line 35','line 36']
if case==2:
gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case69.gridcal").open()
TieLinesName = ['line 57','line 10','line 69','line 14','line 19']
if case==3:
gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case118.m").open()
TieLinesName = ['75_77_1', '69_75_1', '77_80_1', '80_97_1', '94_95_1', '92_94_1', '105_106_1', '100_103_1', '100_104_1', '103_110_1', '103_104_1', '92_102_1', '80_99_1', '80_98_1', '92_93_1', '89_90_1', '85_88_1', '82_83_1', '83_84_1', '68_81_1', '62_66_1', '60_61_1', '64_65_1', '59_60_1', '63_64_1', '54_55_1', '55_56_1', '54_56_1', '49_51_1', '51_52_1', '49_50_1', '49_54_1', '49_69_1', '45_46_1', '46_47_1', '34_37_1', '37_39_1', '40_42_1', '40_41_1', '15_19_1', '15_17_1', '27_32_1', '23_25_1', '17_31_1', '17_113_1', '27_28_1', '23_24_1', '24_72_1', '19_20_1', '4_5_1', '8_30_1', '3_5_1', '12_14_1', '12_16_1', '5_6_1', '1_2_1', '17_18_1', '34_36_1', '47_69_1', '77_78_1', '70_74_1', '69_70_1']
if case==4:
import pandapower as pp
import simbench as sb
import GC_PandaPowerImporter
sb_code1 = "1-HVMV-urban-2.203-0-no_sw"
gridPP = sb.get_simbench_net(sb_code1)
gridPP.switch.drop([232,234,236,238,240, 242,244,246], inplace=True)
gridPP.trafo.drop([1,3,4], inplace=True)
gridPP.line.drop(set([123,226,139,140,151,161,166,170,173,178,180,186,187,188,208,223,225,123,226,227,232,228,229,230,231,227,232,233]), inplace=True)
gridPP.ext_grid.at[0,'name']="grid_ext"
gridPP.line['in_service'] = True
pp.runpp(gridPP)
gridGC = GC_PandaPowerImporter.PP2GC(gridPP)
TieLinesName=['1_2_1', '1_24_1', '1_36_1', '1_47_1', '51_52_1', '1_60_1', '1_74_1', '1_85_1', '117_181_1', '171_117_1', '117_125_1', '127_164_1', '121_188_1', '146_147_1', '171_181_1', '116_196_1', '116_154_1']
TieLinesID=GC_utils.GC_Line_Name2idtag_array(gridGC, TieLinesName)
_, loss = GC_utils.GC_PowerFlow(gridGC, config=TieLinesID)
radiality = GC_utils.CheckRadialConnectedNetwork(gridGC)
print(f"Original network:{loss}, radiality:{radiality} is {GC_utils.GC_Line_idtag2name_array(gridGC,TieLinesID)}" )
hypers =[1]
for hyper in hypers:
# Create an MSTgreedy object
jakus = Jakus2020(gridGC,Niter=32, TieLines=TieLinesID, PopulationSize=32, ElitePopulation=4, MutationProbability=0.4, fitness_ratio=1, loss_factor=2, verbose_logging=logging.ERROR)
# Solve the Minimum Spanning Tree problem
disabled_lines = jakus.Solve()
# Print the list of disabled line indices
GC_utils.NetworkReconfiguration(gridGC,all=True, value_all=True, selected_configuration=jakus.Candidates[0][0], value_configuration=False)
_,loss = GC_utils.GC_PowerFlow(gridGC)
radiality = GC_utils.CheckRadialConnectedNetwork(gridGC)
print(f"{hyper} The new optimal configuration losses:{loss}, radiality:{radiality}, numPF:{jakus.NumPF}") # is {GC_utils.GC_Line_idtag2name_array(gridGC, disabled_lines)}" )
GC_utils.NetworkReconfiguration(gridGC,all=True, value_all=True, selected_configuration=disabled_lines, value_configuration=False)
_,loss = GC_utils.GC_PowerFlow(gridGC)
radiality = GC_utils.CheckRadialConnectedNetwork(gridGC)
print(f"{hyper} The new optimal configuration losses:{loss}, radiality:{radiality}, numPF:{jakus.NumPF}") # is {GC_utils.GC_Line_idtag2name_array(gridGC, disabled_lines)}" )