Source code for piglot.optimisers.pso

"""PSO optimiser module."""
from typing import Tuple, Callable, Optional
import logging
from collections import deque
import numpy as np
try:
    from pyswarms.single.global_best import GlobalBestPSO
    from pyswarms.backend.operators import compute_pbest
except ImportError:
    # Show a nice exception when this package is used
    from piglot.optimiser import missing_method
    GlobalBestPSO = missing_method("PSO", "pyswarms")
from piglot.objective import Objective
from piglot.optimiser import ScalarOptimiser


[docs] class GlobalBestPSOMod(GlobalBestPSO):
[docs] def optimize(self, optimiser, objective_func, iters, n_processes=None, verbose=False, **kwargs): """Optimize the swarm for a number of iterations Performs the optimization to evaluate the objective function :code:`f` for a number of iterations :code:`iter.` Parameters ---------- objective_func : callable objective function to be evaluated iters : int number of iterations n_processes : int number of processes to use for parallel particle evaluation (default: None = no parallelization) verbose : bool enable or disable the logs and progress bar (default: True = enable logs) kwargs : dict arguments for the objective function Returns ------- tuple the global best cost and the global best position. """ # Apply verbosity if verbose: log_level = logging.INFO else: log_level = logging.NOTSET # Populate memory of the handlers self.bh.memory = self.swarm.position self.vh.memory = self.swarm.position # Setup Pool of processes for parallel evaluation pool = None if n_processes is None else mp.Pool(n_processes) self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf) ftol_history = deque(maxlen=self.ftol_iter) for i in self.rep.pbar(iters, self.name) if verbose else range(iters): # Compute cost for current position and personal best # fmt: off #print(self.swarm) self.swarm.current_cost = self.compute_objective_function(self.swarm, objective_func, pool=pool, **kwargs) self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm) # Set best_cost_yet_found for ftol best_cost_yet_found = self.swarm.best_cost self.swarm.best_pos, self.swarm.best_cost = self.top.compute_gbest(self.swarm) # fmt: on if verbose: self.rep.hook(best_cost=self.swarm.best_cost) # Save to history hist = self.ToHistory( best_cost=self.swarm.best_cost, mean_pbest_cost=np.mean(self.swarm.pbest_cost), mean_neighbor_cost=self.swarm.best_cost, position=self.swarm.position, velocity=self.swarm.velocity, ) self._populate_history(hist) # Verify stop criteria based on the relative acceptable cost ftol relative_measure = self.ftol * (1 + np.abs(best_cost_yet_found)) delta = ( np.abs(self.swarm.best_cost - best_cost_yet_found) < relative_measure ) if i < self.ftol_iter: ftol_history.append(delta) else: ftol_history.append(delta) if all(ftol_history): break # Perform options update self.swarm.options = self.oh( self.options, iternow=i, itermax=iters ) # Perform velocity and position updates self.swarm.velocity = self.top.compute_velocity( self.swarm, self.velocity_clamp, self.vh, self.bounds ) self.swarm.position = self.top.compute_position( self.swarm, self.bounds, self.bh ) if optimiser._progress_check(i+1, self.swarm.best_cost, self.swarm.pbest_pos[self.swarm.pbest_cost.argmin()]): break # Obtain the final best_cost and the final best_position final_best_cost = self.swarm.best_cost.copy() final_best_pos = self.swarm.pbest_pos[ self.swarm.pbest_cost.argmin() ].copy() # Close Pool of Processes if n_processes is not None: pool.close() return (final_best_cost, final_best_pos)
[docs] def compute_objective_function(self, swarm, objective_func, pool=None, **kwargs): """Evaluate particles using the objective function This method evaluates each particle in the swarm according to the objective function passed. If a pool is passed, then the evaluation of the particles is done in parallel using multiple processes. Parameters ---------- swarm : pyswarms.backend.swarms.Swarm a Swarm instance objective_func : function objective function to be evaluated pool: multiprocessing.Pool multiprocessing.Pool to be used for parallel particle evaluation kwargs : dict arguments for the objective function Returns ------- numpy.ndarray Cost-matrix for the given swarm """ func_value = [] if pool is None: for i in swarm.position: func_value.append(objective_func(i, **kwargs)) return func_value else: raise NotImplementedError("Parallel solving not implemented!")
# results = pool.map( # partial(objective_func, **kwargs), # np.array_split(swarm.position, pool._processes), # ) # return np.concatenate(results)
[docs] class PSO(ScalarOptimiser): """ PSO optimiser. Documentation: https://pyswarms.readthedocs.io/en/latest/_modules/pyswarms/single/global_best.html#GlobalBestPSO Attributes ---------- n_part : int number of particles in the swarm. options : dict with keys :code:`{'c1', 'c2', 'w'}` a dictionary containing the parameters for the specific optimization technique. * c1 : float cognitive parameter * c2 : float social parameter * w : float inertia parameter oh_strategy : dict, optional, default=None(constant options) a dict of update strategies for each option. bh_strategy : str a strategy for the handling of out-of-bounds particles. velocity_clamp : tuple, optional a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. vh_strategy : str a strategy for the handling of the velocity of out-of-bounds particles. center : list (default is :code:`None`) an array of size :code:`dimensions` ftol_iter : int number of iterations over which the relative error in objective_func(best_pos) is acceptable for convergence. Default is :code:`1` n_processes : int number of processes to use for parallel particle evaluation (default: None = no parallelization) Methods ------- _optimise(self, func, n_dim, n_iter, bound, init_shot): Solves the optimization problem """ def __init__(self, objective: Objective, n_part, options, oh_strategy=None, bh_strategy='periodic', velocity_clamp=None, vh_strategy='unmodified', center=1.0, ftol_iter=1, n_processes=None): """ Constructs all the necessary attributes for the PSO optimiser Parameters ---------- objective : Objective Objective function to optimise. n_part : int number of particles in the swarm. options : dict with keys :code:`{'c1', 'c2', 'w'}` a dictionary containing the parameters for the specific optimization technique. * c1 : float cognitive parameter * c2 : float social parameter * w : float inertia parameter oh_strategy : dict, optional, default=None(constant options) a dict of update strategies for each option. bh_strategy : str a strategy for the handling of out-of-bounds particles. velocity_clamp : tuple, optional a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. vh_strategy : str a strategy for the handling of the velocity of out-of-bounds particles. center : list (default is :code:`None`) an array of size :code:`dimensions` ftol_iter : int number of iterations over which the relative error in objective_func(best_pos) is acceptable for convergence. Default is :code:`1` n_processes : int number of processes to use for parallel particle evaluation (default: None = no parallelization) """ super().__init__('PSO', objective) self.n_part = n_part self.options = options self.oh_strategy = oh_strategy self.bh_strategy = bh_strategy self.velocity_clamp = velocity_clamp self.vh_strategy = vh_strategy self.center = center self.ftol_iter = ftol_iter self.n_processes = n_processes self.rng = np.random.default_rng(1) def _scalar_optimise( self, objective: Callable[[np.ndarray, Optional[bool]], float], n_dim: int, n_iter: int, bound: np.ndarray, init_shot: np.ndarray, ) -> Tuple[float, np.ndarray]: """ Abstract method for optimising the objective. Parameters ---------- objective : Callable[[np.ndarray], float] Objective function to optimise. n_dim : int Number of parameters to optimise. n_iter : int Maximum number of iterations. bound : np.ndarray Array where first and second columns correspond to lower and upper bounds, respectively. init_shot : np.ndarray Initial shot for the optimisation problem. Returns ------- float Best observed objective value. np.ndarray Observed optimum of the objective. """ if bound is not None: new_bound = tuple(map(tuple, np.stack((bound[:, 0], bound[:, 1])))) population = ((bound[:, 1] - bound[:, 0]) * self.rng.random(size=(self.n_part, n_dim)) + bound[:, 0]) population[0, :] = init_shot model = GlobalBestPSOMod(self.n_part, n_dim, self.options, new_bound, self.oh_strategy, self.bh_strategy, self.velocity_clamp, self.vh_strategy, self.center, -np.inf, self.ftol_iter, population) x, new_value = model.optimize(self, objective, n_iter, self.n_processes, False) return x, new_value