"""Module for a simple query optimiser."""
from typing import Tuple
import os
import numpy as np
import torch
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
FastNondominatedPartitioning,
)
from piglot.optimiser import Optimiser, InvalidOptimiserException
from piglot.objective import Objective, GenericObjective
[docs]
class QueryOptimiser(Optimiser):
"""Query optimiser."""
def __init__(
self,
objective: Objective,
param_list_file: str,
reference_point: list[float] = None,
nadir_scale: float = 0.1,
) -> None:
"""Constructor for the Query optimiser class.
Parameters
----------
objective : Objective
Objective to optimise.
param_list_file : str
File containing the list of parameters.
reference_point : list[float], optional
Reference point for multi-objective optimisation, by default None.
nadir_scale : float, optional
Scale factor for the nadir point, by default 0.1.
"""
super().__init__('Query', objective)
self.param_list = np.genfromtxt(param_list_file)
self.reference_point = reference_point
self.nadir_scale = nadir_scale
if len(self.param_list.shape) == 1:
self.param_list = self.param_list.reshape(-1, 1)
def _validate_problem(self, objective: Objective) -> None:
"""Validate the combination of optimiser and objective.
Parameters
----------
objective : Objective
Objective to optimise.
"""
if not isinstance(objective, GenericObjective):
raise InvalidOptimiserException('Generic objective required for this optimiser')
if objective.composition is not None:
raise InvalidOptimiserException('This optimiser does not support composition')
if objective.stochastic:
raise InvalidOptimiserException('This optimiser does not support stochasticity')
[docs]
def update_mo_data(self, parameters: np.ndarray, observations: np.ndarray) -> Tuple[float, int]:
"""Get the partitioning of the observations in multi-objective optimisation.
Parameters
----------
parameters : np.ndarray
Array of parameters.
observations : np.ndarray
Array of observations.
Returns
-------
Tuple[float, int]
Hypervolume and number of non-dominated points.
"""
if self.reference_point is not None:
ref_point = np.array(self.reference_point)
else:
nadir = np.max(observations, axis=0)
ref_point = nadir + self.nadir_scale * (nadir - np.min(observations, axis=0))
partitioning = FastNondominatedPartitioning(
ref_point=torch.from_numpy(-ref_point),
Y=torch.from_numpy(-observations),
)
hypervolume = partitioning.compute_hypervolume().item()
pareto = -partitioning.pareto_Y
# Map each Pareto point to the original parameter space
param_indices = [
torch.argmin((torch.from_numpy(observations) - pareto[i, :]).norm(dim=1)).item()
for i in range(pareto.shape[0])
]
# Dump the Pareto front to a file
with open(os.path.join(self.output_dir, "pareto_front"), 'w', encoding='utf8') as file:
# Write header
num_obj = pareto.shape[1]
file.write('\t'.join([f'{"Objective_" + str(i + 1):>15}' for i in range(num_obj)]))
file.write('\t' + '\t'.join([f'{param.name:>15}' for param in self.parameters]) + '\n')
# Write each point
for i, idx in enumerate(param_indices):
file.write('\t'.join([f'{x.item():>15.8f}' for x in pareto[i, :]]) + '\t')
file.write('\t'.join([f'{x.item():>15.8f}' for x in parameters[idx, :]]) + '\n')
return -np.log(hypervolume)
def _optimise(
self,
n_dim: int,
n_iter: int,
bound: np.ndarray,
init_shot: np.ndarray,
) -> Tuple[float, np.ndarray]:
"""
Optimise the objective.
Parameters
----------
n_dim : int
Number of parameters to optimise.
n_iter : int
Maximum number of iterations.
bound : np.ndarray
Array where first and second columns correspond to lower and upper bounds, respectively.
init_shot : np.ndarray
Initial shot for the optimisation problem.
Returns
-------
float
Best observed objective value.
np.ndarray
Observed optimum of the objective.
"""
# Sanitise input
if n_dim != self.param_list.shape[1]:
raise ValueError('Number of parameters does not match the number of columns.')
if n_iter != self.param_list.shape[0]:
raise ValueError('Number of iterations does not match the number of rows.')
for i in range(n_dim):
if np.any(self.param_list[:, i] < bound[i, 0]):
raise ValueError('Parameter values outside lower bounds.')
if np.any(self.param_list[:, i] > bound[i, 1]):
raise ValueError('Parameter values outside upper bounds.')
# Initial shot
result = self.objective(init_shot)
best_value = result.values if self.objective.multi_objective else result.scalar_value
best_solution = init_shot
# Build observation datasets
param_dataset = np.array([best_solution])
objective_dataset = np.array([best_value])
if len(objective_dataset.shape) == 1:
objective_dataset = objective_dataset.reshape(-1, 1)
# Update progress
if self.objective.multi_objective:
best_value = self.update_mo_data(param_dataset, objective_dataset)
best_solution = None
self._progress_check(0, best_value, best_solution)
# Iterate over all parameter sets
for i, param_set in enumerate(self.param_list):
# Evaluate objective and add to dataset
result = self.objective(param_set)
value = result.values if self.objective.multi_objective else result.scalar_value
param_dataset = np.vstack((param_dataset, param_set))
objective_dataset = np.vstack((objective_dataset, value))
# Update best-observed value
if self.objective.multi_objective:
best_value = self.update_mo_data(param_dataset, objective_dataset)
best_solution = None
elif value < best_value:
best_value = value
best_solution = param_set
self._progress_check(i + 1, best_value, best_solution)
return best_value, best_solution