"""Module containing optimisation objective primites"""
from __future__ import annotations
import os
import os.path
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, TypeVar, Tuple
from threading import Lock
from dataclasses import dataclass
import numpy as np
import torch
import pandas as pd
from matplotlib.figure import Figure
from piglot.parameter import ParameterSet
T = TypeVar('T', bound='Objective')
[docs]
class Composition(ABC):
"""Abstract class for defining composition functionals with gradients"""
[docs]
def composition(self, inner: np.ndarray, params: np.ndarray) -> np.ndarray:
"""Abstract method for computing the outer function of the composition
Parameters
----------
inner : np.ndarray
Return value from the inner function
params : np.ndarray
Parameters for the given responses
Returns
-------
np.ndarray
Composition result
"""
result = self.composition_torch(torch.from_numpy(inner), torch.from_numpy(params))
return result.numpy(force=True)
[docs]
@abstractmethod
def composition_torch(self, inner: torch.Tensor, params: torch.Tensor) -> torch.Tensor:
"""Abstract method for computing the outer function of the composition with gradients
Parameters
----------
inner : torch.Tensor
Return value from the inner function
params : torch.Tensor
Parameters for the given responses
Returns
-------
torch.Tensor
Composition result
"""
[docs]
class DynamicPlotter(ABC):
"""Abstract class for dynamically-updating plots"""
[docs]
@abstractmethod
def update(self) -> None:
"""Update the plot with the most recent data"""
[docs]
@dataclass
class ObjectiveResult:
"""Container for objective results."""
params: np.ndarray
values: np.ndarray
obj_values: np.ndarray
covariances: Optional[np.ndarray] = None
obj_variances: Optional[np.ndarray] = None
scalar_value: Optional[float] = None
scalar_variance: Optional[float] = None
[docs]
class IndividualObjective(ABC):
"""Base class for individual objectives for generic optimisation problems."""
def __init__(
self,
maximise: bool = False,
weight: float = 1.0,
bounds: Tuple[float, float] = None,
) -> None:
self.maximise = maximise
self.weight = float(weight)
self.bounds = None
if bounds is not None:
self.bounds = tuple(float(b) for b in bounds)
if self.bounds[0] > self.bounds[1]:
raise ValueError(f"Invalid bounds {self.bounds}.")
if self.maximise:
self.bounds = (-self.bounds[1], -self.bounds[0])
[docs]
class Scalarisation(ABC):
"""Base class for scalarisations."""
def __init__(self, objectives: List[IndividualObjective]) -> None:
self.objectives = objectives
self.weights = torch.tensor([obj.weight for obj in objectives], dtype=torch.float64)
self.bounds = (
torch.tensor([obj.bounds for obj in objectives], dtype=torch.float64)
if all(obj.bounds is not None for obj in objectives)
else None
)
[docs]
def scalarise(
self,
values: np.ndarray,
variances: Optional[np.ndarray] = None,
) -> Tuple[float, Optional[float]]:
"""Scalarise a set of objectives.
Parameters
----------
values : np.ndarray
Mean objective values.
variances : Optional[np.ndarray]
Optional variances of the objectives.
Returns
-------
Tuple[float, Optional[float]]
Mean and variance of the scalarised objective.
"""
torch_mean, torch_var = self.scalarise_torch(
torch.from_numpy(values),
torch.from_numpy(variances) if variances is not None else None,
)
if torch_var is None:
return torch_mean.numpy(force=True), None
return torch_mean.item(), torch_var.item()
[docs]
@abstractmethod
def scalarise_torch(
self,
values: torch.Tensor,
variances: Optional[torch.Tensor] = None,
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""Scalarise a set of objectives with gradients.
Parameters
----------
values : torch.Tensor
Mean objective values.
variances : Optional[torch.Tensor]
Optional variances of the objectives.
Returns
-------
Tuple[torch.Tensor, Optional[torch.Tensor]]
Mean and variance of the scalarised objective.
"""
[docs]
class Objective(ABC):
"""Abstract class for optimisation objectives"""
[docs]
@abstractmethod
def __call__(self, *args: Any, **kwds: Any) -> ObjectiveResult:
"""Objective computation for the outside world"""
[docs]
@abstractmethod
def prepare(self) -> None:
"""Generic method to prepare output files before optimising the problem"""
[docs]
@classmethod
@abstractmethod
def read(
cls: Type[T],
config: Dict[str, Any],
parameters: ParameterSet,
output_dir: str,
) -> T:
"""Read the objective from a configuration dictionary.
Parameters
----------
config : Dict[str, Any]
Terms from the configuration dictionary.
parameters : ParameterSet
Set of parameters for this problem.
output_dir : str
Path to the output directory.
Returns
-------
Objective
Objective function to optimise.
"""
[docs]
def plot_case(self, case_hash: str, options: Dict[str, Any] = None) -> List[Figure]:
"""Plot a given function call given the parameter hash
Parameters
----------
case_hash : str, optional
Parameter hash for the case to plot
options : Dict[str, Any], optional
Options to pass to the plotting function, by default None
Returns
-------
List[Figure]
List of figures with the plot
"""
raise NotImplementedError("Single case plotting not implemented for this objective")
[docs]
def plot_best(self) -> List[Figure]:
"""Plot the current best case
Returns
-------
List[Figure]
List of figures with the plot
"""
raise NotImplementedError("Best case plotting not implemented for this objective")
[docs]
def plot_current(self) -> List[DynamicPlotter]:
"""Plot the currently running function call
Returns
-------
List[DynamicPlotter]
List of instances of a updatable plots
"""
raise NotImplementedError("Current case plotting not implemented for this objective")
[docs]
def get_history(self) -> Dict[str, Dict[str, Any]]:
"""Get the objective history
Returns
-------
Dict[str, Dict[str, Any]]
Dictionary of objective history
"""
raise NotImplementedError("Objective history not implemented for this objective")
[docs]
class GenericObjective(Objective):
"""Class for generic objectives."""
def __init__(
self,
parameters: ParameterSet,
stochastic: bool = False,
composition: Composition = None,
scalarisation: Scalarisation = None,
num_objectives: int = 1,
multi_objective: bool = False,
output_dir: str = None,
) -> None:
super().__init__()
self.parameters = parameters
self.output_dir = output_dir
self.stochastic = stochastic
self.scalarisation = scalarisation
self.composition = composition
self.num_objectives = num_objectives
self.multi_objective = multi_objective
self.func_calls = 0
self.begin_time = time.perf_counter()
self.__mutex = Lock()
self.func_calls_file = os.path.join(output_dir, "func_calls") if output_dir else None
[docs]
def prepare(self) -> None:
"""Prepare output directories for the optimsation."""
super().prepare()
if self.output_dir:
# Build header for function calls file
with open(os.path.join(self.func_calls_file), 'w', encoding='utf8') as file:
file.write(f'{"Start Time /s":>15}\t{"Run Time /s":>15}')
# When we have multiple objectives, write each one
if self.num_objectives > 1:
for i in range(self.num_objectives):
file.write(f'\t{"Objective_" + str(i + 1):>15}')
if self.stochastic:
file.write(f'\t{"Variance_" + str(i + 1):>15}')
# But we also write the scalarised objective if possible
if not self.multi_objective:
file.write(f'\t{"Objective":>15}')
if self.stochastic:
file.write(f'\t{"Variance":>15}')
for param in self.parameters:
file.write(f"\t{param.name:>15}")
file.write(f'\t{"Hash":>64}\n')
@abstractmethod
def _objective(self, params: np.ndarray, concurrent: bool = False) -> ObjectiveResult:
"""Abstract method for objective computation.
Parameters
----------
params : np.ndarray
Set of parameters to evaluate the objective for.
concurrent : bool, optional
Whether this call may be concurrent to others, by default False.
Returns
-------
ObjectiveResult
Objective result.
"""
[docs]
def __call__(self, params: np.ndarray, concurrent: bool = False) -> ObjectiveResult:
"""Objective computation for the outside world. Also handles output file writing.
Parameters
----------
params : np.ndarray
Set of parameters to evaluate the objective for.
concurrent : bool, optional
Whether this call may be concurrent to others, by default False.
Returns
-------
ObjectiveResult
Objective result.
"""
# Evaluate objective
self.func_calls += 1
begin_time = time.perf_counter()
result = self._objective(params, concurrent=concurrent)
end_time = time.perf_counter()
# Update function call history file
if self.output_dir:
with self.__mutex:
with open(os.path.join(self.func_calls_file), 'a', encoding='utf8') as file:
file.write(f'{begin_time - self.begin_time:>15.8e}\t')
file.write(f'{end_time - begin_time:>15.8e}\t')
# Write out each objective value
if self.num_objectives > 1:
if self.stochastic:
for val, var in zip(result.obj_values, result.obj_variances):
file.write(f'{val:>15.8e}\t{var:>15.8e}\t')
else:
for val in result.obj_values:
file.write(f'{val:>15.8e}\t')
# Write out the scalarised objective value
if not self.multi_objective:
file.write(f'{result.scalar_value:>15.8e}\t')
if self.stochastic:
file.write(f'{result.scalar_variance:>15.8e}\t')
for val in params:
file.write(f"{val:>15.6f}\t")
file.write(f'{self.parameters.hash(params)}\n')
return result
[docs]
def plot_best(self) -> List[Figure]:
"""Plot the current best case.
Returns
-------
List[Figure]
List of figures with the plot.
"""
# Build the objective list
objective_list = ["Objective"]
if self.multi_objective:
objective_list = [f"Objective_{i + 1}" for i in range(self.num_objectives)]
# Plot the best case for each objective
figures = []
for i, objective in enumerate(objective_list):
# Find hash associated with the best case
df = pd.read_table(self.func_calls_file)
df.columns = df.columns.str.strip()
min_series = df.iloc[df[objective].idxmin()]
call_hash = str(min_series["Hash"])
# Use the single case plotting utility
options = {'append_title': f'Best {objective} eval'} if self.multi_objective else None
figures += self.plot_case(call_hash, options=options)
# Also display the best case
print(f"Best run{' (' + objective + ')'}:")
print(min_series.drop(objective_list + ["Hash"]))
print(f"Hash: {call_hash}")
print(f"{objective}: {min_series[objective]:15.8e}\n")
return figures
[docs]
def get_history(self) -> Dict[str, Dict[str, Any]]:
"""Get the objective history.
Returns
-------
Dict[str, Dict[str, Any]]
Dictionary of objective history.
"""
df = pd.read_table(self.func_calls_file)
df.columns = df.columns.str.strip()
x_axis = df["Start Time /s"] + df["Run Time /s"]
params = df[[param.name for param in self.parameters]]
param_hash = df["Hash"].to_list()
# Multi-objective case
if self.multi_objective:
values = [df[f"Objective_{i + 1}"] for i in range(self.num_objectives)]
if self.stochastic:
variances = [df[f"Variance_{i + 1}"] for i in range(self.num_objectives)]
return_dict = {}
for i in range(self.num_objectives):
result = {
"time": x_axis.to_numpy(),
"values": values[i],
"params": params.to_numpy(),
"hashes": param_hash,
}
if self.stochastic:
result["variances"] = variances[i]
return_dict[f"Objective_{i + 1}"] = result
return return_dict
# Single objective case
result = {
"time": x_axis.to_numpy(),
"values": df["Objective"].to_numpy(),
"params": params.to_numpy(),
"hashes": param_hash,
}
if self.stochastic:
result["variances"] = df["Variance"].to_numpy()
return {"Objective": result}