Source code for GC_Jakus2020

import numpy as np
import networkx as nx
import random
import sys
import os
import logging
import warnings
import pandas as pd
import math

import GC_utils 
import GC_Baran1989

import GridCalEngine.api as gce  # For interfacing with the GridCal API
from GridCalEngine.IO.file_handler import FileOpen, FileSave

warnings.filterwarnings('ignore')  # Ignore warnings during execution

## based on Jakus 2020

#network define (case33, case16,....)
#define algorithm settings(Niter, Npop, ...)
    #population size: = 14
    #mutation probability: = 2%
    #number of individuals in the initial population generated using SBEA: = 4
    #number of elite individuals: = 1.

#generate the initial population
    #SBEA
        #run the Baran algorithm, based on fitness (powerflow/losses/voltage drop) on each branch and obtain a radial configuration
        #randomly suffle the the obtained configuration 
        #modify the configuration by the objective function defined (power losses, voltage drop)
        
    #Random Kruskal
        #generate random weights for the branches/lines
        #find the Kruskal's MST for this configuration
        #generate Npop-Nsbea configurations

#Merge the populations generated by SBEA and Random Kruskal

#Loop for Niter
    #Calculate the fitness function for each individual in the generated population
    #Detection of Nel elite individuals
    #Selection of the individuals for the crossover process
    #offspring generation through crossover of selected pairs of individuals
    #offspring mutation
    #replacement of worst individuals obtained after crossover and mutation with the elite individuals

#final power flow for the obtained topology

[docs] class Jakus2020: """Implements the Jakus 2020 algorithm for distribution network reconfiguration (DNR). This class utilizes a genetic algorithms to optimize the distribution network reconfiguration by minimizing fitness (power losses/voltage drop) and ensuring radial connectivity. Attributes: net (DistributionNetwork): The distribution network instance. PopulationSize (int): Size of the population for the genetic algorithm. TieLines (list): List of Tie lines Lists. [previously was a single Tie Lines list for supplying Baran algorithm] MutationProbability (float): Probability of mutation during the genetic process. #discarted : PopulationSize_SBEA (int): Population size for the SBEA. #this parameter is not in use, รง the SBEA candidates will be generated externally and added in the TieLines parameter ElitePopulation (int): Number of elite individuals to retain. Niter (int): Number of iterations for the genetic algorithm. NumParentPairs (int): Number of parent pairs for crossover. OffspringSize (int): Size of the offspring population. Candidates (list): List of candidate solutions. verbose (int): Logging level for verbose output. """
[docs] def __init__(self, grid=None, TieLines=None, PopulationSize=10, MutationProbability=0.02, ElitePopulation=4, Niter=10, fitness_ratio=1, loss_factor = 2, verbose_logging=logging.WARNING): """Initializes the Jakus2020 class. Args: net (DistributionNetwork): The distribution network instance. TieLines (list): List of tie lines in the network. PopulationSize (int): Size of the population for the genetic algorithm. MutationProbability (float): Probability of mutation during the genetic process. PopulationSize_SBEA (int): Population size for the SBEA. ElitePopulation (int): Number of elite individuals to retain. Niter (int): Number of iterations for the genetic algorithm. verbose_logging (int): Logging level for verbose output. """ self.grid = grid self.PopulationSize = PopulationSize self.TieLines = TieLines self.MutationProbability : float = MutationProbability self.PopulationSize_SBEA :int = len(TieLines) self.ElitePopulation : int = ElitePopulation self.Niter : int = Niter self.OffspringSize = self.PopulationSize - self.ElitePopulation self.NumParentPairs = round(self.OffspringSize / 2) self.Candidates : list = [] self.verbose = verbose_logging self.ConvergenceList : list = [] logging.getLogger('jakus2020.py').setLevel(self.verbose) # Set logging level logging.getLogger('jakus2020.py').debug("Created instance of GA") self.NumPF=0 self.totalLoad = abs(sum([self.grid.loads[i].P for i in range(len(self.grid.loads))])) self.loss_factor = loss_factor self.fitness_ratio = fitness_ratio
def __FitnessCalculation(self, config=None): self.NumPF+=1 if config: GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration= config, value_configuration=False) fitness = GC_utils.GC_FitnessCalculation(self.grid, self.totalLoad, loss_factor=self.loss_factor, split_factor=self.fitness_ratio) return fitness
[docs] def SortCandidates(self, Candidates, Loops): SortedCandidates = [] for candidate in Candidates: #print("######## Candidate ", candidate) sorted_candidate = np.zeros(len(candidate)) persistence = False occurrences = [[1 if num in sublist else 0 for sublist in Loops] for num in candidate] occurrences_array = np.array(occurrences) while not persistence: persistence = True column_summary = occurrences_array.sum(axis=0) #print(occurrences_array) #print("column_summary", column_summary) for idx, element in enumerate(column_summary): if element == 1: persistence = False idx_candidate = np.where(occurrences_array[:, idx] == 1)[0] #print(f"Candidate:{int(candidate[idx_candidate[0]])} column:{idx} - {occurrences_array[:, idx]} found:{idx_candidate}") sorted_candidate[idx] = int(candidate[idx_candidate[0]]) occurrences_array[:, idx] = 0 occurrences_array[idx_candidate[0], :] = 0 row_summary = occurrences_array.sum(axis=1) #print(occurrences_array) #print("sorted candidate:", sorted_candidate) #print("row_summary", row_summary) for idx, element in enumerate(row_summary): if element == 1: persistence = False idx_candidate = np.where(occurrences_array[idx, :] == 1)[0] #print(f"Candidate:{int(candidate[idx])} row:{idx} - {occurrences_array[idx, :]} found:{idx_candidate}") sorted_candidate[idx_candidate[0]] = int(candidate[idx]) occurrences_array[idx, :] = 0 occurrences_array[:, idx_candidate[0]] = 0 #print("sorted candidate:", sorted_candidate) if persistence: #print("persistence") for idx, element in enumerate(column_summary): if element: persistence = False idx_candidate = np.where(occurrences_array[:, idx] == 1)[0] #print(f"Candidate:{int(candidate[idx_candidate[0]])} column:{idx} - {occurrences_array[:, idx]} found:{idx_candidate}") sorted_candidate[idx] = int(candidate[idx_candidate[0]]) occurrences_array[:, idx] = 0 occurrences_array[idx_candidate[0], :] = 0 break #print("sorted candidate:", sorted_candidate) SortedCandidates.append(list(sorted_candidate)) return SortedCandidates
[docs] def RandomCandidateGenerator(self, n=1): #the Jakus paper proposed to create the candidates by generating MST with random weights Candidates = [] GC_utils.NetworkReconfiguration(self.grid, all=True, selected_configuration=None, value_all=True, value_configuration=False) NetworkLoops = GC_utils.SearchLoopsLines(self.grid, flagUsed=True) for idx in range(n): candidate = self._RandomKruskalCandidate() Candidates.append(candidate) return Candidates
def _RandomKruskalCandidate(self): """Generates a new candidate solution using Kruskal's algorithm. (from Jakus GA algorithm) Arguments: type = 'CurrentKruskal' (default): generates a MST based on the network current with all tielines closed 'RandomKruskal' : generates a MST based on random weights in all the edges 'PSO' : generates a random ST, using a set of Loops without repeated edges Loops = the network loops to be considered for PSO case variation = (real) the weight to apply on the randomization of the edge currents, which allow to generate different valid candidate solutions Returns: A candidate configuration """ logging.getLogger('DistributionNetwork.py').debug("_RandomCandidate: generate a new candidate with Kruskal") GC_utils.NetworkReconfiguration(self.grid, all=True, selected_configuration=None, value_all=True, value_configuration=False) self.graph = GC_utils.GC2Graph(self.grid) for idx, (u, v, attributes) in enumerate(self.graph.edges(data=True)): attributes['weight'] = 1 / (random.random()) MinSpanningTree = nx.minimum_spanning_tree(self.graph, weight='weight', algorithm="kruskal") GC_utils.Graph2GC(MinSpanningTree, self.grid) outofservice = GC_utils.LinesOutofService(self.grid) fitness,_ = self.CalculateFitness(configuration=outofservice) return (outofservice,fitness)
[docs] def BaranCandidateGenerator(self): # Create an MSTgreedy object baran = GC_Baran1989.Baran1989(self.grid, TieLines=self.TieLines,verbose_logging=logging.ERROR) # Solve the Minimum Spanning Tree problem disabled_lines = baran.Solve() fitness,_ = self.CalculateFitness(configuration=disabled_lines) return (disabled_lines,fitness)
[docs] def InitialPopulationGeneration(self): """Generates the initial population combining SBEA and Kruskal's algorithm. This method combines candidates generated from SBEA and Kruskal's algorithms and sorts them based on their fitness values. """ logging.getLogger('jakus2020.py').debug("InitialPopulationGeneration: generate the initial population") fitness, radial = self.CalculateFitness(configuration=self.TieLines) logging.getLogger('jakus2020.py').debug(f"Initial TieLines Candidate (fitness:{fitness}) : {self.TieLines}") self.Candidates.append((self.TieLines,fitness)) self.Candidates.append(self.BaranCandidateGenerator()) self.Candidates.extend(self.RandomCandidateGenerator(n=self.PopulationSize)) logging.getLogger('jakus2020.py').debug(f"InitialPopulationGeneration len: {len(self.Candidates)}")
[docs] def CalculateFitness(self, configuration): """Calculates the fitness of a candidate configuration. Args: candidate: List of selected configurations to evaluate. Returns: A tuple containing the fitness value and a boolean indicating if the configuration is valid. """ GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=list(configuration), value_configuration=False) radialconnected, _, _ = GC_utils.CheckRadialConnectedNetwork(self.grid) if radialconnected: losses = self.__FitnessCalculation() fitness_value = losses ####change fitness logging.getLogger('network.py').debug( f"The configuration is complaint with the radiality constraint and has a fitness={fitness_value}") return (fitness_value, True) logging.getLogger('network.py').debug(f"The configuration does not comply with the radiality constraint") return (np.inf, False) # Return infinite fitness for invalid configurations
[docs] def ParentsSelection(self): """Generates offspring candidates from the current population. The method calculates the fitness for each candidate in the current population and selects parent pairs based on their fitness values. Returns: list: A list of parent pairs for generating offspring. """ logging.getLogger('jakus2020.py').debug("OffspringGeneration") Fitnesses = [] Fitnesses = [t[1] for t in self.Candidates] TotalFitnesses = sum(Fitnesses) IndividualProbability = [x / TotalFitnesses for x in Fitnesses] logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration - fitness: {Fitnesses} ") CumulativeProbability = np.cumsum(IndividualProbability) if math.isnan(CumulativeProbability[0]): logging.getLogger('jakus2020.py').debug(f"ERRORRRRRRRRRRRRRRRRR") logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3: {TotalFitnesses} // {IndividualProbability} ") # logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3a: idx: {idx} // lenCumulative: {lenCumulative} CumulativeProbability={CumulativeProbability} len(ParentPairs)={len(ParentPairs)}") return ParentPairs = [] idx = 0 lenCumulative = len(CumulativeProbability) logging.getLogger('jakus2020.py').debug("OffspringGeneration") logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3: {len(ParentPairs)} // {self.NumParentPairs} // {lenCumulative}") while len(ParentPairs) < self.NumParentPairs: r = random.random() if idx < lenCumulative: if (r > CumulativeProbability[idx - 1]) & (r < CumulativeProbability[idx]): ParentPairs.append((self.Candidates[idx], self.Candidates[idx - 1])) idx += 1 #logging.getLogger('jakus2020.py').debug(f"*************** OffspringGeneration3a: idx: {idx} // lenCumulative: {lenCumulative} CumulativeProbability={CumulativeProbability} len(ParentPairs)={len(ParentPairs)}") else: idx = 0 return ParentPairs
[docs] def OffspringCrossOver(self, Parents): """Performs crossover between parent candidates to generate offspring. Args: Parents (list): A list containing two parent candidate configurations. Returns: list: The candidate configuration of the offspring generated. """ #logging.getLogger('jakus2020.py').debug("OffspringCrossOver") GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=Parents[0][0], value_configuration=False) b1= np.array([1 if line.active else 0 for line in self.grid.lines]) GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=Parents[1][0], value_configuration=False) b2= np.array([1 if line.active else 0 for line in self.grid.lines]) parents_and = np.array([1 if b1[i]==1 and b2[i]==1 else 0 for i in range(len(self.grid.lines))]) parents_nand = np.array([1 if b1[i]==0 and b2[i]==0 else 0 for i in range(len(self.grid.lines))]) GC_utils.NetworkReconfiguration(grid=self.grid, all=True, value_all=True) graph=GC_utils.GC2Graph(self.grid) for idx,line in enumerate(self.grid.lines): if parents_nand[idx]==1: try: graph.remove_edge(line.bus_from.idtag,line.bus_to.idtag) except: pass elif parents_and[idx]==1: graph.edges[line.bus_from.idtag,line.bus_to.idtag]['weight']=0 else: graph.edges[line.bus_from.idtag,line.bus_to.idtag]['weight']=random.random()*10 graph2 = nx.minimum_spanning_tree(graph, weight="weight") self.grid = GC_utils.Graph2GC(graph2, grid=self.grid) candidate = GC_utils.LinesOutofService(self.grid) radiality,_,_ = GC_utils.CheckRadialConnectedNetwork(self.grid) if radiality==False: #if the new candidate is not radiality compliant, it is substituted by one of the parents logging.getLogger('jakus2020.py').info(f"BAD CROSSOVER P1,P2= {Parents[0][0]},{Parents[1][0]} child={candidate}") candidate = Parents[0][0] logging.getLogger('jakus2020.py').debug(f"Crossover P1,P2= {Parents[0][0]},{Parents[1][0]} child={candidate}") return candidate
[docs] def MutationProcess(self, candidate): """Processes mutation on a given candidate configuration. If a mutation occurs, a random disabled line is activated and a new minimum spanning tree is generated based on the modified configuration. Args: candidate (list): A candidate configuration of the network. Returns: list: The modified candidate configuration after mutation. """ logging.getLogger('jakus2020.py').debug("MutationProcess") candidateIN = candidate.copy() mutation_idx = random.random() if (mutation_idx < self.MutationProbability): ActivatedLine = random.choice(candidate) candidate.remove(ActivatedLine) GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=candidate, value_configuration=False) graph = GC_utils.GC2Graph(self.grid) for _, (_, _, attributes) in enumerate(graph.edges(data=True)): attributes['weight'] = random.random() * 100. MinSpanningTree = nx.minimum_spanning_tree(graph, weight='weight', algorithm='kruskal') self.grid = GC_utils.Graph2GC(MinSpanningTree, self.grid) candidate = GC_utils.LinesOutofService(self.grid) logging.getLogger('jakus2020.py').debug(f"MutationProcess:{candidateIN}->{candidate}") return candidate
[docs] def Solve(self): """Solves the distribution network reconfiguration problem. This method runs the genetic algorithm for a specified number of iterations, generating offspring and selecting the best candidates from the population. Returns: list: The best candidate configuration found after the iterations. """ logging.getLogger('jakus2020.py').info("Solve") self.InitialPopulationGeneration() logging.getLogger('jakus2020.py').debug(f"---------------------------------------New iter={[candidate[1] for candidate in self.Candidates]}") logging.getLogger('jakus2020.py').debug(f"---------------------------------------New iter={[candidate[0] for candidate in self.Candidates]}") self.ConvergenceList = [] # Loop for Niter for iter in range(self.Niter): logging.getLogger('jakus2020.py').debug(f"---------------------------------------New iter={iter}") # Offspring generation self.Candidates = sorted(self.Candidates, key=lambda x: x[1], reverse=True) ParentPairs = self.ParentsSelection() newCandidates = [] for Parents in ParentPairs: newCandidateA = self.OffspringCrossOver(Parents) newCandidateB = self.OffspringCrossOver(Parents) # Mutation process from the two offsprings newCandidateA = self.MutationProcess(newCandidateA) newCandidateB = self.MutationProcess(newCandidateB) FitnessA, _ = self.CalculateFitness(newCandidateA) FitnessB, _ = self.CalculateFitness(newCandidateB) newCandidates.append((newCandidateA, FitnessA)) newCandidates.append((newCandidateB, FitnessB)) # Elitism: Detecting elite individuals from the new candidates self.Candidates = sorted(self.Candidates, key=lambda x: x[1], reverse=False) ####change fitness True) del self.Candidates[-(self.OffspringSize):] # Remove worst candidates #newCandidates = sorted(newCandidates, key=lambda x: x[1], reverse=True) self.Candidates = self.Candidates + newCandidates self.Candidates = sorted(self.Candidates, key=lambda x: x[1], reverse=False) ####change fitness True) logging.getLogger('jakus2020.py').debug(f"END OF ITER ={iter} with best candidate {self.Candidates}") self.ConvergenceList.append(self.Candidates) logging.getLogger('jakus2020.py').info(f"ITER {iter}") return self.Candidates[0][0] # Return the best candidate configuration
if __name__ == '__main__': print('Algorithms to find the optimal distribution network configuration') logging.basicConfig( level=logging.ERROR, # Set the log level to DEBUG format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Set the log format datefmt='%Y-%m-%d %H:%M:%S' # Set the date format ) case=1 # Create a network object if case==1: gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case33.gridcal").open() TieLinesName=['line 32','line 33','line 34','line 35','line 36'] if case==2: gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case69.gridcal").open() TieLinesName = ['line 57','line 10','line 69','line 14','line 19'] if case==3: gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case118.m").open() TieLinesName = ['75_77_1', '69_75_1', '77_80_1', '80_97_1', '94_95_1', '92_94_1', '105_106_1', '100_103_1', '100_104_1', '103_110_1', '103_104_1', '92_102_1', '80_99_1', '80_98_1', '92_93_1', '89_90_1', '85_88_1', '82_83_1', '83_84_1', '68_81_1', '62_66_1', '60_61_1', '64_65_1', '59_60_1', '63_64_1', '54_55_1', '55_56_1', '54_56_1', '49_51_1', '51_52_1', '49_50_1', '49_54_1', '49_69_1', '45_46_1', '46_47_1', '34_37_1', '37_39_1', '40_42_1', '40_41_1', '15_19_1', '15_17_1', '27_32_1', '23_25_1', '17_31_1', '17_113_1', '27_28_1', '23_24_1', '24_72_1', '19_20_1', '4_5_1', '8_30_1', '3_5_1', '12_14_1', '12_16_1', '5_6_1', '1_2_1', '17_18_1', '34_36_1', '47_69_1', '77_78_1', '70_74_1', '69_70_1'] if case==4: import pandapower as pp import simbench as sb import GC_PandaPowerImporter sb_code1 = "1-HVMV-urban-2.203-0-no_sw" gridPP = sb.get_simbench_net(sb_code1) gridPP.switch.drop([232,234,236,238,240, 242,244,246], inplace=True) gridPP.trafo.drop([1,3,4], inplace=True) gridPP.line.drop(set([123,226,139,140,151,161,166,170,173,178,180,186,187,188,208,223,225,123,226,227,232,228,229,230,231,227,232,233]), inplace=True) gridPP.ext_grid.at[0,'name']="grid_ext" gridPP.line['in_service'] = True pp.runpp(gridPP) gridGC = GC_PandaPowerImporter.PP2GC(gridPP) TieLinesName=['1_2_1', '1_24_1', '1_36_1', '1_47_1', '51_52_1', '1_60_1', '1_74_1', '1_85_1', '117_181_1', '171_117_1', '117_125_1', '127_164_1', '121_188_1', '146_147_1', '171_181_1', '116_196_1', '116_154_1'] TieLinesID=GC_utils.GC_Line_Name2idtag_array(gridGC, TieLinesName) _, loss = GC_utils.GC_PowerFlow(gridGC, config=TieLinesID) radiality = GC_utils.CheckRadialConnectedNetwork(gridGC) print(f"Original network:{loss}, radiality:{radiality} is {GC_utils.GC_Line_idtag2name_array(gridGC,TieLinesID)}" ) hypers =[1] for hyper in hypers: # Create an MSTgreedy object jakus = Jakus2020(gridGC,Niter=32, TieLines=TieLinesID, PopulationSize=32, ElitePopulation=4, MutationProbability=0.4, fitness_ratio=1, loss_factor=2, verbose_logging=logging.ERROR) # Solve the Minimum Spanning Tree problem disabled_lines = jakus.Solve() # Print the list of disabled line indices GC_utils.NetworkReconfiguration(gridGC,all=True, value_all=True, selected_configuration=jakus.Candidates[0][0], value_configuration=False) _,loss = GC_utils.GC_PowerFlow(gridGC) radiality = GC_utils.CheckRadialConnectedNetwork(gridGC) print(f"{hyper} The new optimal configuration losses:{loss}, radiality:{radiality}, numPF:{jakus.NumPF}") # is {GC_utils.GC_Line_idtag2name_array(gridGC, disabled_lines)}" ) GC_utils.NetworkReconfiguration(gridGC,all=True, value_all=True, selected_configuration=disabled_lines, value_configuration=False) _,loss = GC_utils.GC_PowerFlow(gridGC) radiality = GC_utils.CheckRadialConnectedNetwork(gridGC) print(f"{hyper} The new optimal configuration losses:{loss}, radiality:{radiality}, numPF:{jakus.NumPF}") # is {GC_utils.GC_Line_idtag2name_array(gridGC, disabled_lines)}" )