"""Main optimiser module"""
from __future__ import annotations
from typing import Dict, Any, Tuple, Callable, Optional
import os
import time
from abc import ABC, abstractmethod
import numpy as np
from tqdm import tqdm
from piglot.parameter import ParameterSet
from piglot.utils.assorted import pretty_time
from piglot.objective import Objective, GenericObjective
[docs]
def boundary_check(arg, bounds):
"""Check if the values are within the bounds and correct them if not.
Parameters
----------
arg : array
Values to check.
bounds : array
Lower and upper bounds.
Returns
-------
array
Corrected values.
"""
arg = np.where(arg > bounds[:, 1], bounds[:, 1], arg)
arg = np.where(arg < bounds[:, 0], bounds[:, 0], arg)
return arg
[docs]
def missing_method(name, package):
"""Class generator for missing packages.
Parameters
----------
name : str
Name of the missing method.
package : str
Name of the package to install.
"""
def err_func(name, package):
"""Raise an error for this missing method.
Parameters
----------
name : str
Name of the missing method.
package : str
Name of the package to install.
Raises
------
ImportError
Every time it is called.
"""
raise ImportError(f"{name} is not available. You need to install package {package}!")
return type(f'Missing_{package}', (),
{
'name': name,
'package': package,
'__init__': (lambda *args, **kwargs: err_func(name, package))
})
[docs]
class StoppingCriteria:
"""
Implements different stopping criteria.
Attributes
----------
conv_tol : float
Stop the optimiser if the loss becomes small than this value.
max_iters_no_improv : int
Stop the optimiser if the loss does not improve after this number of iterations in a row.
max_func_calls : int
Stop the optimiser after this number of function calls.
max_timeout : float
Stop the optimiser after this elapsed time (in seconds).
Methods
-------
check_criteria(loss_value, iters_no_improv, func_calls):
check the status of the stopping criteria.
"""
def __init__(
self,
conv_tol: float = None,
max_iters_no_improv: int = None,
max_func_calls: int = None,
max_timeout: float = None,
):
"""
Constructs all the necessary attributes for the stopping criteria.
Parameters
----------
conv_tol : float
Stop the optimiser if the loss becomes small than this value.
max_iters_no_improv : int
Stop the optimiser if the loss does not improve after this number of iterations.
max_func_calls : int
Stop the optimiser after this number of function calls.
max_timeout : float
Stop the optimiser after this elapsed time (in seconds).
"""
self.conv_tol = conv_tol
self.max_iters_no_improv = max_iters_no_improv
self.max_func_calls = max_func_calls
self.max_timeout = max_timeout
[docs]
def check_criteria(
self,
loss_value: float,
iters_no_improv: int,
func_calls: int,
elapsed: float,
) -> bool:
"""
Check the status of the stopping criteria.
Parameters
----------
loss_value : float
Current loss value.
iters_no_improv : int
Current number of iterations without improvement.
func_calls : int
Current number of function calls.
elapsed : float
Elapsed time in seconds.
Returns
-------
bool
Whether any of the criteria are satisfied.
"""
conv = self.conv_tol is not None and loss_value < self.conv_tol
func = self.max_func_calls is not None and func_calls > self.max_func_calls
impr = self.max_iters_no_improv is not None and iters_no_improv > self.max_iters_no_improv
timr = self.max_timeout is not None and elapsed > self.max_timeout
return conv or impr or func or timr
[docs]
@staticmethod
def read(config: Dict[str, Any]) -> StoppingCriteria:
"""Read the stopping criteria from the configuration dictionary.
Parameters
----------
config : Dict[str, Any]
Configuration dictionary.
Returns
-------
StoppingCriteria
Stopping criteria.
"""
def optional_get(key, convert):
return convert(config[key]) if key in config else None
return StoppingCriteria(
conv_tol=optional_get('conv_tol', float),
max_iters_no_improv=optional_get('max_iters_no_improv', int),
max_func_calls=optional_get('max_func_calls', int),
max_timeout=optional_get('max_timeout', float),
)
[docs]
class InvalidOptimiserException(Exception):
"""Exception signaling invalid combination of optimiser and objective function."""
[docs]
class Optimiser(ABC):
"""
Interface for implementing different optimization algorithms.
Methods
-------
_init_optimiser(n_iter, parameters, pbar, loss, stop_criteria):
constructs the attributes for the optimiser.
optimise(loss, n_iter, parameters, stop_criteria = StoppingCriteria()):
initiates optimiser.
_optimise(self, func, n_dim, n_iter, bound, init_shot):
performs the optimization.
_progress_check(self, i_iter, curr_value, curr_solution):
evaluates the optimiser progress.
"""
def __init__(self, name: str, objective: Objective) -> None:
self.name = name
self.objective = objective
self.parameters = None
self.i_iter = None
self.n_iter = None
self.iters_no_improv = None
self.pbar = None
self.stop_criteria = None
self.output_dir = None
self.best_value = None
self.best_solution = None
self.begin_time = None
@abstractmethod
def _validate_problem(self, objective: Objective) -> None:
"""Validate the combination of optimiser and objective.
Parameters
----------
objective : Objective
Objective to optimise.
"""
[docs]
def optimise(
self,
n_iter: int,
parameters: ParameterSet,
output_dir: str,
stop_criteria: StoppingCriteria = StoppingCriteria(),
verbose: bool = True,
) -> Tuple[float, np.ndarray]:
"""
Optimiser for the outside world.
Parameters
----------
objective : Objective
Objective function to optimise.
n_iter : int
Maximum number of iterations.
parameters : ParameterSet
Set of parameters to optimise.
output_dir : str
Whether to write output to the output directory, by default None.
stop_criteria : StoppingCriteria
List of stopping criteria, by default none attributed.
verbose : bool
Whether to output progress status, by default True.
Returns
-------
float
Best observed objective value.
np.ndarray
Observed optimum of the objective.
"""
# Sanity check
self._validate_problem(self.objective)
# Initialise optimiser
self.n_iter = n_iter
self.parameters = parameters
self.stop_criteria = stop_criteria
self.output_dir = output_dir
self.iters_no_improv = 0
# Build initial shot and bounds
n_dim = len(self.parameters)
init_shot = np.array([par.inital_value for par in self.parameters])
bounds = np.array([[par.lbound, par.ubound] for par in self.parameters])
# Build best solution
self.best_value = np.nan
self.best_solution = None
# Prepare history output files
with open(os.path.join(self.output_dir, "history"), 'w', encoding='utf8') as file:
file.write(f'{"Iteration":>10}\t')
file.write(f'{"Time /s":>15}\t')
file.write(f'{"Best Loss":>15}\t')
file.write(f'{"Current Loss":>15}\t')
for par in self.parameters:
file.write(f'{par.name:>15}\t')
file.write('\tOptimiser info')
file.write('\n')
# Prepare optimiser
self.objective.prepare()
self.pbar = tqdm(total=n_iter, desc=self.name) if verbose else None
# Optimise
self.begin_time = time.perf_counter()
self._optimise(n_dim, n_iter, bounds, init_shot)
elapsed = time.perf_counter() - self.begin_time
# Output progress
if verbose:
self.pbar.close()
print(f'Completed {self.i_iter} iterations in {pretty_time(elapsed)}')
print(f'Best loss: {self.best_value:15.8e}')
if self.best_solution is not None:
print('Best parameters')
max_width = max(len(par.name) for par in self.parameters)
for i, par in enumerate(self.parameters):
print(f'- {par.name.rjust(max_width)}: {self.best_solution[i]:>12.6f}')
# Return the best value
return self.best_value, self.best_solution
@abstractmethod
def _optimise(
self,
n_dim: int,
n_iter: int,
bound: np.ndarray,
init_shot: np.ndarray,
) -> Tuple[float, np.ndarray]:
"""
Abstract method for optimising the objective.
Parameters
----------
n_dim : int
Number of parameters to optimise.
n_iter : int
Maximum number of iterations.
bound : np.ndarray
Array where first and second columns correspond to lower and upper bounds, respectively.
init_shot : np.ndarray
Initial shot for the optimisation problem.
Returns
-------
float
Best observed objective value.
np.ndarray
Observed optimum of the objective.
"""
def __update_progress_files(
self,
i_iter: int,
curr_solution: np.ndarray,
curr_value: float,
extra_info: str,
) -> None:
"""Update progress on output files.
Parameters
----------
i_iter : int
Current iteration number.
curr_solution : np.ndarray
Current objective minimiser.
curr_value : float
Current objective value.
extra_info : str
Additional information to pass to user.
"""
elapsed = time.perf_counter() - self.begin_time
skip_pars = curr_solution is None
# Update progress file
with open(os.path.join(self.output_dir, "progress"), 'w', encoding='utf8') as file:
file.write(f'Iteration: {i_iter}\n')
file.write(f'Function calls: {self.objective.func_calls}\n')
file.write(f'Best loss: {self.best_value}\n')
if extra_info is not None:
file.write(f'Optimiser info: {extra_info}\n')
if not skip_pars:
file.write('Best parameters:\n')
for i, par in enumerate(self.parameters):
file.write(f'\t{par.name}: {self.best_solution[i]}\n')
file.write(f'\nElapsed time: {pretty_time(elapsed)}\n')
# Update history file
with open(os.path.join(self.output_dir, "history"), 'a', encoding='utf8') as file:
file.write(f'{i_iter:>10}\t')
file.write(f'{elapsed:>15.8e}\t')
file.write(f'{self.best_value:>15.8e}\t')
file.write(f'{curr_value:>15.8e}\t')
for i, par in enumerate(self.parameters):
file.write('None\t'.rjust(16) if skip_pars else f'{curr_solution[i]:>15.8f}\t')
file.write(f"\t{'-' if extra_info is None else extra_info}")
file.write('\n')
def _progress_check(
self,
i_iter: int,
curr_value: float,
curr_solution: np.ndarray,
extra_info: str = None,
) -> bool:
"""
Report the optimiser progress and check for termination.
Parameters
----------
i_iter : int
Current iteration number.
curr_value : float
Current objective value.
curr_solution : np.ndarray
Current objective minimiser.
extra_info : str
Additional information to pass to user.
Returns
-------
bool
Whether any of the stopping criteria is satisfied.
"""
# Update new value to best value
self.i_iter = i_iter
if curr_value >= self.best_value:
self.iters_no_improv += 1
else:
self.best_value = curr_value
self.best_solution = curr_solution
self.iters_no_improv = 0
# Update progress bar
if self.pbar is not None:
info = f'Loss: {self.best_value:6.4e}' + (f' ({extra_info})' if extra_info else '')
self.pbar.set_postfix_str(info)
if i_iter > 0:
self.pbar.update()
# Update progress in output files
self.__update_progress_files(i_iter, curr_solution, curr_value, extra_info)
# Convergence criterion
return i_iter > self.n_iter or self.stop_criteria.check_criteria(
curr_value,
self.iters_no_improv,
self.objective.func_calls,
time.perf_counter() - self.begin_time,
)
[docs]
class ScalarOptimiser(Optimiser):
"""Base class for scalar optimisers."""
def __init__(self, name: str, objective: Objective) -> None:
super().__init__(name, objective)
self.bounds = None
def _validate_problem(self, objective: Objective) -> None:
"""Validate the combination of optimiser and objective.
Parameters
----------
objective : Objective
Objective to optimise.
Raises
------
InvalidOptimiserException
With an invalid combination of optimiser and objective function.
"""
if not isinstance(objective, GenericObjective):
raise InvalidOptimiserException('Generic objective required for this optimiser')
if objective.composition is not None:
raise InvalidOptimiserException('This optimiser does not support composition')
if objective.stochastic:
raise InvalidOptimiserException('This optimiser does not support stochasticity')
@abstractmethod
def _scalar_optimise(
self,
objective: Callable[[np.ndarray, Optional[bool]], float],
n_dim: int,
n_iter: int,
bound: np.ndarray,
init_shot: np.ndarray,
) -> Tuple[float, np.ndarray]:
"""
Abstract method for optimising the objective.
Parameters
----------
objective : Callable[[np.ndarray], float]
Objective function to optimise.
n_dim : int
Number of parameters to optimise.
n_iter : int
Maximum number of iterations.
bound : np.ndarray
Array where first and second columns correspond to lower and upper bounds, respectively.
init_shot : np.ndarray
Initial shot for the optimisation problem.
Returns
-------
float
Best observed objective value.
np.ndarray
Observed optimum of the objective.
"""
def _norm_params(self, params: np.ndarray) -> np.ndarray:
"""Normalise the parameters.
Parameters
----------
params : np.ndarray
Denormalised parameters.
Returns
-------
np.ndarray
Normalised parameters.
"""
return 2.0 * (params - self.bounds[:, 0]) / (self.bounds[:, 1] - self.bounds[:, 0]) - 1.0
def _denorm_params(self, params: np.ndarray) -> np.ndarray:
"""Denormalise the parameters.
Parameters
----------
params : np.ndarray
Normalised parameters.
Returns
-------
np.ndarray
Denormalised parameters.
"""
return self.bounds[:, 0] + (1.0 + params) * (self.bounds[:, 1] - self.bounds[:, 0]) / 2.0
def _optimise(
self,
n_dim: int,
n_iter: int,
bound: np.ndarray,
init_shot: np.ndarray,
) -> Tuple[float, np.ndarray]:
"""
Abstract method for optimising the objective.
Parameters
----------
objective : Objective
Objective function to optimise.
n_dim : int
Number of parameters to optimise.
n_iter : int
Maximum number of iterations.
bound : np.ndarray
Array where first and second columns correspond to lower and upper bounds, respectively.
init_shot : np.ndarray
Initial shot for the optimisation problem.
Returns
-------
float
Best observed objective value.
np.ndarray
Observed optimum of the objective.
"""
self.bounds = bound
# Optimise the scalarised objective
return self._scalar_optimise(
lambda x, concurrent=False: self.objective(
self._denorm_params(x),
concurrent=concurrent
).values.item(),
n_dim,
n_iter,
np.array([[-1.0, 1.0]]).repeat(n_dim, axis=0),
self._norm_params(init_shot),
)
def _progress_check(
self,
i_iter: int,
curr_value: float,
curr_solution: np.ndarray,
extra_info: str = None,
) -> bool:
"""
Report the optimiser progress and check for termination (with parameter denormalisation).
Parameters
----------
i_iter : int
Current iteration number.
curr_value : float
Current objective value.
curr_solution : np.ndarray
Current objective minimiser.
extra_info : str
Additional information to pass to user.
Returns
-------
bool
Whether any of the stopping criteria is satisfied.
"""
denorm_solution = None if curr_solution is None else self._denorm_params(curr_solution)
super()._progress_check(i_iter, curr_value, denorm_solution, extra_info)