Source code for GC_Khalil_Gorpinich2012

import numpy as np
import networkx as nx
import random
import sys
import os
import logging
import warnings
import pandas as pd
import math

import GC_utils 

import GridCalEngine.api as gce  # For interfacing with the GridCal API
from GridCalEngine.IO.file_handler import FileOpen, FileSave

warnings.filterwarnings('ignore')  # Ignore warnings during execution

[docs] class BPSO: """Binary Particle Swarm Optimization (BPSO) class for optimizing network configurations. Attributes: net: An object representing the network to be optimized. NumCandidates: Number of candidate solutions to generate. NumOffLines: Number of lines that can be taken out of service. order: Optional parameter for ordering. flagUsed: Flag indicating whether to use certain properties. shuffled: Flag indicating whether to shuffle candidates. verbose_logging: Logging level for debug messages. """
[docs] def __init__(self, grid, NumCandidates=10, TieLines=None, fitness_ratio=1, loss_factor = 0.08, verbose_logging=logging.WARNING): """Initializes the BPSO with the given parameters. Args: net: Network object for optimization. NumCandidates: Number of candidate solutions. order: Order for configurations, if any. flagUsed: Whether to use specific properties. shuffled: Whether to shuffle candidates. verbose_logging: Logging level for debug messages. """ # PSO parameters self.grid = grid self.NumCandidates : int = NumCandidates self.TieLines : list = TieLines self.gbest : list = [] self.fgbest : int = 0 self.NumPF=0 self.totalLoad = abs(sum([self.grid.loads[i].P for i in range(len(self.grid.loads))])) self.loss_factor = loss_factor self.fitness_ratio = fitness_ratio # self.NumOffLines : int = self.net.NumLines - self.net.NumBuses + 1 # Calculate out-of-service lines GC_utils.NetworkReconfiguration(self.grid, all=True, selected_configuration=None, value_all=True, value_configuration=False) self.NetworkLoops = GC_utils.SearchLoopsLines(self.grid, flagUsed=True, shuffle=False) #following the paper, it should be shuffle=True, but not works properly self.NumOffLines: int = len(self.NetworkLoops) self.ConvergenceList : list = [] # Set logging level for the BPSO class self.logger = logging.getLogger('bpso.py') # Create a logger object for debug messages self.logger.setLevel(verbose_logging) # Set the logging level for debug messages # configurate the algorithm self.config() self.logger.info(f"candidates: {self.NetworkLoops}")
def __FitnessCalculation(self, config=None): self.NumPF+=1 if config: GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration= config, value_configuration=False) fitness = GC_utils.GC_FitnessCalculation(self.grid, self.totalLoad, loss_factor=self.loss_factor, split_factor=self.fitness_ratio) return fitness def __powerflow(self, config=None): self.NumPF+=1 resultPF,old_losses = GC_utils.GC_PowerFlow(self.grid, config=config) return resultPF,old_losses
[docs] def RandomCandidateGenerator(self, n=1): #the Jakus paper proposed to create the candidates by generating MST with random weights RandomCandidates = [] idx =0 while idx < n: candidate = [random.choice(sublist) for sublist in self.NetworkLoops] #res, loss = GC_utils.GC_PowerFlow(self.grid, config=candidate) #res, loss = self.__powerflow(config= candidate) loss = self.__FitnessCalculation(config = candidate) radiality,_,_ = GC_utils.CheckRadialConnectedNetwork(self.grid) logging.getLogger('bpso.py').debug(f"Candidate generator (random){idx}/{n} : {loss,radiality} -- {candidate}") if radiality and loss < np.inf: RandomCandidates.append(candidate) logging.getLogger('bpso.py').debug(f"Candidate generator:{loss},{radiality} -- {candidate}") idx+=1 return RandomCandidates
[docs] def DefineActionSpace(self): """Defines the action space for the optimization based on network configurations.""" # Generate the list of loops self.ActionSpaceSize = self.NumOffLines # Set action space size based on out-of-service lines
[docs] def config(self): """Configures the BPSO instance by defining the action space and initializing parameters.""" self.DefineActionSpace() # Define action space based on the current network configuration # Create initial list of candidates and log their fitness self.fpbest = np.full(self.NumCandidates, np.inf) # Initialize personal best fitness values self.fitness = np.full(self.NumCandidates, np.inf) # Initialize fitness values for all candidates self.pbest = [] self.CreateCandidateList() for idx, candidate in enumerate(self.Candidates): self.logger.info(f"candidate[{idx}]:{candidate} : {self.fitness[idx]}") idx_best = np.argmin(self.fitness) self.gbest = self.Candidates[idx_best].copy() # Initialize global best candidate as the first candidate self.fgbest = self.fpbest[idx_best] # Initialize global best fitness as the one for the first candidate self.r1 = np.random.rand(self.NumCandidates, self.ActionSpaceSize) # Random values for local search self.r2 = np.random.rand(self.NumCandidates, self.ActionSpaceSize) # Random values for global search
[docs] def getConvergeceList(self): return self.ConvergenceList
[docs] def SortCandidates(self, Candidates, Loops): logging.getLogger('bpso.py').debug(f"SortCandidates") #logging.getLogger('bpso.py').debug(f"Loops to match {Loops}") SortedCandidates = [] for candidate in Candidates: #print("######## Candidate ", candidate) sorted_candidate = [] persistence = False occurrences = [[1 if num in sublist else 0 for sublist in Loops] for num in candidate] occurrences_array = np.array(occurrences) #logging.getLogger('bpso.py').debug(f"Candidate {candidate} occurences_array:{occurrences_array}") while not persistence: persistence = True column_summary = occurrences_array.sum(axis=0) #logging.getLogger('bpso.py').debug(f"occurences_array:{occurrences_array}") #logging.getLogger('bpso.py').debug(f"column_summary {column_summary}") for idx, element in enumerate(column_summary): if element == 1: persistence = False idx_candidate = np.where(occurrences_array[:, idx] == 1)[0] sorted_candidate.append(candidate[idx_candidate[0]]) occurrences_array[:, idx] = 0 occurrences_array[idx_candidate[0], :] = 0 row_summary = occurrences_array.sum(axis=1) #logging.getLogger('bpso.py').debug(f"occurences_array:{occurrences_array}") #logging.getLogger('bpso.py').debug(f"sorted candidate: {sorted_candidate}") #logging.getLogger('bpso.py').debug(f"row_summary {row_summary}") for idx, element in enumerate(row_summary): if element == 1: persistence = False idx_candidate = np.where(occurrences_array[idx, :] == 1)[0][0] #logging.getLogger('bpso.py').debug(f"Candidate:{int(candidate[idx])} row:{idx} - {occurrences_array[idx, :]} found:{idx_candidate}") logging.getLogger('bpso.py').debug(f"idx_candidate:{idx_candidate}") sorted_candidate[idx_candidate[0]] = candidate[idx] occurrences_array[idx, :] = 0 occurrences_array[:, idx_candidate] = 0 #print("sorted candidate:", sorted_candidate) if persistence: #print("persistence") for idx, element in enumerate(column_summary): if element: persistence = False idx_candidate = np.where(occurrences_array[:, idx] == 1)[0] #print(f"Candidate:{int(candidate[idx_candidate[0]])} column:{idx} - {occurrences_array[:, idx]} found:{idx_candidate}") sorted_candidate[idx] = candidate[idx_candidate[0]] occurrences_array[:, idx] = 0 occurrences_array[idx_candidate[0], :] = 0 break #logging.getLogger('bpso.py').debug(f"FINAL sorted candidate: {sorted_candidate}") SortedCandidates.append(list(sorted_candidate)) return SortedCandidates
[docs] def CalculateFitness(self, configuration): """Calculates the fitness of a candidate configuration. Args: candidate: List of selected configurations to evaluate. Returns: A tuple containing the fitness value and a boolean indicating if the configuration is valid. """ GC_utils.NetworkReconfiguration(self.grid, all=True, value_all=True, selected_configuration=list(configuration), value_configuration=False) radialconnected, _, _ = GC_utils.CheckRadialConnectedNetwork(self.grid) if radialconnected: #_,losses = GC_utils.GC_PowerFlow(self.grid) # Compute power flow for the network #_, losses = self.__powerflow() losses = self.__FitnessCalculation() fitness_value = losses logging.getLogger('network.py').debug( f"The configuration is complaint with the radiality constraint and has a fitness={fitness_value}") return (fitness_value, True) logging.getLogger('network.py').debug(f"The configuration does not comply with the radiality constraint") return (np.inf, False) # Return infinite fitness for invalid configurations
[docs] def UpdateCandidateFitness(self, candidate_idx): """Updates the fitness of a specific candidate and updates personal and global bests if needed. Args: candidate_idx: Index of the candidate to update. Returns: A boolean indicating if the candidate configuration is valid. """ fitness_Value, result = self.CalculateFitness(self.Candidates[candidate_idx]) self.fitness[candidate_idx] = fitness_Value # Update the fitness of the candidate if result: # Updating personal best (pbest) if self.fitness[candidate_idx] < self.fpbest[candidate_idx]: self.pbest[candidate_idx] = self.Candidates[candidate_idx].copy() self.fpbest[candidate_idx] = self.fitness[candidate_idx].copy() # Updating global best (gbest) if self.fitness[candidate_idx] < self.fgbest: self.gbest = self.Candidates[candidate_idx].copy() self.fgbest = self.fitness[candidate_idx].copy() logging.getLogger('bpso.py').debug(f"\t\t new gbest= {self.gbest} with fitness={self.fgbest}") else: self.logger.debug( f" bad valid candidate {candidate_idx} -- {self.Candidates[candidate_idx]} -- {self.fitness[candidate_idx]} ") return result
[docs] def CreateCandidateList(self): """Creates a list of candidates and evaluates their fitness.""" self.logger.debug("create candidates") self.Candidates = np.zeros((self.NumCandidates, self.ActionSpaceSize)) # Initialize candidate configurations if len(self.TieLines) > self.NumCandidates: self.TieLines = self.TieLines[:self.NumCandidates] # Add, as first candidates, the ones passed as "TieLines" argument to the algorithm # Generate candidates until the specified number is reached self.Candidates : list = self.RandomCandidateGenerator(n=self.NumCandidates-len(self.TieLines)) self.Candidates.extend(self.TieLines) self.logger.debug(f"TieLines: {self.TieLines}") self.logger.debug(f"Candidates: {self.Candidates}") #convert and adapt the Candidates list #self.Candidates = np.array(self.Candidates) # Convert list to numpy array for efficient processing #self.Candidates = [[int(element) for element in sublist] for sublist in self.Candidates] self.Candidates = self.SortCandidates(self.Candidates, self.NetworkLoops) self.pbest = self.Candidates.copy() #initialize the particular best to the initial value of the candidates for idx_candidates, candidate in enumerate(self.Candidates): self.UpdateCandidateFitness(idx_candidates) self.fpbest = self.fitness.copy()
#for candidate in self.Candidates: # self.logger.debug(candidate)
[docs] def CalculateNewPosition(self, candidate_idx, c1, c2, w): """Calculates the new position of a candidate based on its velocity and best known positions. Args: candidate_idx: Index of the candidate whose position is being updated. c1: Cognitive coefficient for personal best influence. c2: Social coefficient for global best influence. w: Inertia weight for velocity. Returns: A tuple of the new candidate position and the updated velocity. """ newValue = self.Candidates[candidate_idx].copy() # Copy the current candidate position oldValue = self.Candidates[candidate_idx].copy() # Copy for reference during updates for j in range(self.ActionSpaceSize): old_value = oldValue[j] gbest_idx = self.NetworkLoops[j].index(self.gbest[j]) # Index of the global best in the action space #self.logger.debug(f"\taa {candidate_idx},{j}") #self.logger.debug(f"\tbb {self.pbest[candidate_idx][j]}") #self.logger.debug(f"\tcc {self.NetworkLoops[j]}") pbest_idx = self.NetworkLoops[j].index(self.pbest[candidate_idx][j]) # Index of the personal best cand_idx = self.NetworkLoops[j].index(oldValue[j]) # Index of the current candidate # Calculate distances for velocity updates global_distance = gbest_idx - cand_idx local_distance = pbest_idx - cand_idx # Calculate corrections based on distances and randomness global_correction = c2 * self.r2[candidate_idx][j] * global_distance local_correction = c1 * self.r1[candidate_idx][j] * local_distance prev_velocity = self.velocity[candidate_idx][j] # Store the previous velocity self.velocity[candidate_idx][j] = w * self.velocity[candidate_idx][j] + local_correction + global_correction # Ensure velocity is within specified limits self.velocity[candidate_idx][j] = min(max(self.velocity[candidate_idx][j], self.vmin), self.vmax) # Randomly alter velocity if it hasn't changed if abs(self.velocity[candidate_idx][j]) == abs(prev_velocity): self.velocity[candidate_idx][j] = random.random() * self.velocity[candidate_idx][j] # Updating particles' coordinate scope = len(np.nonzero(self.NetworkLoops[j])[0]) - 1 # Scope of available lines on the loop # Sigmoid function to map the velocity to a new position sigmoid = 1 / (1 + math.exp( -self.beta * self.velocity[candidate_idx][j])) - 0.5 # Sigmoid function centered at 0 newPosition = cand_idx + int(np.floor(scope * sigmoid)) # Calculate new position based on sigmoid newPosition = min(max(newPosition, 0), scope - 1) # Clamp new position within bounds newValue[j] = self.NetworkLoops[j][newPosition] # Update new value for the candidate # Log details of the update process self.logger.debug( f"\tCandidate[{candidate_idx},{j}] - loc.dist.//global.dist ({self.pbest[candidate_idx][j]}/{pbest_idx}-{oldValue[j]}/{cand_idx})={local_distance} // {self.gbest[j]}/{gbest_idx}-{oldValue[j]}/{cand_idx})={global_distance} \t\t w//local correction//global correction//wnew//scope//sigmoid//newPosition//old//new value ( {w:.3f} {local_correction:.4f} {global_correction:.4f} {self.velocity[candidate_idx][j]:.4f} {scope} {sigmoid} {newPosition} {old_value} {newValue[j]}") return newValue, self.velocity[candidate_idx] # Return the new position and updated velocity
[docs] def UpdateCandidateNewPosition(self, candidate_idx, w, c1, c2): """Updates the position of a candidate based on the BPSO algorithm. Args: candidate_idx: Index of the candidate to update. w: Inertia weight for velocity calculation. c1: Cognitive coefficient for personal best influence. c2: Social coefficient for global best influence. """ self.logger.debug( f"\tCandidate[{candidate_idx}]: {self.Candidates[candidate_idx]} Pbest {self.pbest[candidate_idx]} Gbest {self.gbest} -self.velocity={self.velocity[candidate_idx]}") # Calculate new position and velocity for the candidate old_candidate = self.Candidates[candidate_idx].copy() self.Candidates[candidate_idx], self.velocity[candidate_idx] = self.CalculateNewPosition(candidate_idx, c1, c2, w) # Update fitness for the new candidate position if self.UpdateCandidateFitness(candidate_idx): self.logger.debug(f"\t\tnew Candidate[{candidate_idx}]: {self.Candidates[candidate_idx]}") else : self.logger.debug(f"\t\tnew Candidate[{candidate_idx}]: {self.Candidates[candidate_idx]} is not valid, so we dismiss the changes which creates the invalid solution") self.Candidates[candidate_idx] = old_candidate self.UpdateCandidateFitness(candidate_idx) #update gbest and fgbest, pbest, fpbest self.logger.debug(f"\t\t\t keep the previuos Candidate[{candidate_idx}]: {self.Candidates[candidate_idx]} ")
# @jit(target_backend='cuda')
[docs] def Solve(self, wmax=0.9, wmin=0.5, beta=1, maxiter=30, vmax=4, c1=2, c2=2): """Solves the optimization problem using the BPSO algorithm. Args: wmax: Maximum inertia weight. wmin: Minimum inertia weight. beta: Parameter for the sigmoid function. maxiter: Maximum number of iterations for the optimization. vmax: Maximum velocity for the particles. Returns: The best global configuration found during optimization. """ # Main loops self.logger.info("Start solving by BPSO") self.wmax = wmax self.wmin = wmin self.beta = beta self.vmax = vmax self.vmin = -vmax # Set minimum velocity to the negative of max velocity self.maxiter = maxiter self.c1 = c1 self.c2 = c2 self.velocity = np.random.uniform(self.vmin, self.vmax, size=(self.NumCandidates, self.NumOffLines)) # Initialize velocities self.ConvergenceList = [] iter = 0 # Iterate for a maximum number of iterations while iter < self.maxiter: iter += 1 self.logger.debug( f"Iter {iter} \n \t Candidates {self.Candidates} \n\t iter{iter} gbest[{self.gbest}]{self.fgbest} ") w = self.wmax - (self.wmax - self.wmin) * iter / self.maxiter # Calculate current inertia weight # self.c1 = 2 # Cognitive coefficient (could be randomized for variability) # self.c2 = 2 # Social coefficient (could be randomized for variability) for loop in self.NetworkLoops: self.logger.debug(f"loop : {loop}") # Update the position for each candidate for candidate_idx in range(self.NumCandidates): self.UpdateCandidateNewPosition(candidate_idx, w, self.c1, self.c2) self.ConvergenceList.append((self.gbest, self.fgbest)) self.logger.debug( f" iter{iter} gbest[{self.gbest}]{self.fgbest} candidate[0]:{self.fitness[1]}-{self.Candidates[1]}") return self.gbest # Return the best global candidate found
if __name__ == '__main__': print('Algorithms to find the optimal distribution network configuration') logging.basicConfig( level=logging.ERROR, # Set the log level to DEBUG format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Set the log format datefmt='%Y-%m-%d %H:%M:%S' # Set the date format ) case=3 # Create a network object if case==1: gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case33.gridcal").open() TieLinesName=['line 32','line 33','line 34','line 35','line 36'] if case==2: gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case69.gridcal").open() TieLinesName = ['line 57','line 10','line 69','line 14','line 19'] if case==3: gridGC = FileOpen("D:\\15_Thesis-code\\DistributionNetwork_libraries\\NetworkExamples\\gridcal\\case118.m").open() TieLinesName = ['75_77_1', '69_75_1', '77_80_1', '80_97_1', '94_95_1', '92_94_1', '105_106_1', '100_103_1', '100_104_1', '103_110_1', '103_104_1', '92_102_1', '80_99_1', '80_98_1', '92_93_1', '89_90_1', '85_88_1', '82_83_1', '83_84_1', '68_81_1', '62_66_1', '60_61_1', '64_65_1', '59_60_1', '63_64_1', '54_55_1', '55_56_1', '54_56_1', '49_51_1', '51_52_1', '49_50_1', '49_54_1', '49_69_1', '45_46_1', '46_47_1', '34_37_1', '37_39_1', '40_42_1', '40_41_1', '15_19_1', '15_17_1', '27_32_1', '23_25_1', '17_31_1', '17_113_1', '27_28_1', '23_24_1', '24_72_1', '19_20_1', '4_5_1', '8_30_1', '3_5_1', '12_14_1', '12_16_1', '5_6_1', '1_2_1', '17_18_1', '34_36_1', '47_69_1', '77_78_1', '70_74_1', '69_70_1'] if case==4: import pandapower as pp import simbench as sb import GC_PandaPowerImporter sb_code1 = "1-HVMV-urban-2.203-0-no_sw" gridPP = sb.get_simbench_net(sb_code1) gridPP.switch.drop([232,234,236,238,240, 242,244,246], inplace=True) gridPP.trafo.drop([1,3,4], inplace=True) gridPP.line.drop(set([123,226,139,140,151,161,166,170,173,178,180,186,187,188,208,223,225,123,226,227,232,228,229,230,231,227,232,233]), inplace=True) gridPP.ext_grid.at[0,'name']="grid_ext" gridPP.line['in_service'] = True pp.runpp(gridPP) gridGC = GC_PandaPowerImporter.PP2GC(gridPP) TieLinesName=['1_2_1', '1_24_1', '1_36_1', '1_47_1', '51_52_1', '1_60_1', '1_74_1', '1_85_1', '117_181_1', '171_117_1', '117_125_1', '127_164_1', '121_188_1', '146_147_1', '171_181_1', '116_196_1', '116_154_1'] TieLinesID=GC_utils.GC_Line_Name2idtag_array(gridGC, TieLinesName) _, loss = GC_utils.GC_PowerFlow(gridGC, config=TieLinesID) radiality = GC_utils.CheckRadialConnectedNetwork(gridGC) print(f"Original network:{loss}, radiality:{radiality} is {GC_utils.GC_Line_idtag2name_array(gridGC,TieLinesID)}" ) for hyper in [1]: # Create an MSTgreedy object khalil = BPSO(gridGC, TieLines=[], NumCandidates=16, fitness_ratio=1, loss_factor=0.08, verbose_logging=logging.ERROR) # Solve the Minimum Spanning Tree problem disabled_lines = khalil.Solve(wmax=0.9, wmin=0.5, beta=1, maxiter=20, vmax=4, c1=2, c2=2) # Print the list of disabled line indices _,loss = GC_utils.GC_PowerFlow(gridGC, config=disabled_lines) radiality = GC_utils.CheckRadialConnectedNetwork(gridGC) print(f"{hyper} : The new optimal configuration losses:{loss}, radiality:{radiality}, numPF:{khalil.NumPF} ") #is {GC_utils.GC_Line_idtag2name_array(gridGC, disabled_lines)}" )