Source code for piglot.objectives.response_objective

"""Module for generic response-based objectives."""
from __future__ import annotations
from typing import Dict, Any, List, Optional, Tuple, Type, TypeVar
from abc import ABC, abstractmethod
import warnings
import numpy as np
import torch
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from matplotlib.figure import Figure
from piglot.parameter import ParameterSet
from piglot.solver.solver import Solver, OutputResult
from piglot.objective import (
    Composition,
    GenericObjective,
    ObjectiveResult,
    DynamicPlotter,
    IndividualObjective,
)
from piglot.utils.reductions import Reduction, NegateReduction
from piglot.utils.response_transformer import ResponseTransformer
from piglot.utils.composition.responses import FlattenUtility, ConcatUtility
from piglot.utils.scalarisations import Scalarisation, SumScalarisation


T = TypeVar('T', bound='ResponseSingleObjective')


[docs] class DynamicResponsePlotter(DynamicPlotter): """Dynamic plotter for response-based objectives.""" def __init__( self, figures: List[Figure], solver: Solver, mapping: Dict[Line2D, str], transformers: Dict[str, ResponseTransformer], ) -> None: self.figures = figures self.solver = solver self.mapping = mapping self.transformers = transformers
[docs] def update(self) -> None: """Update the plot with new results.""" try: result = self.solver.get_current_response() except (FileNotFoundError, IndexError): return # Update the lines for line, name in self.mapping.items(): if name not in result: continue response = result[name] if name in self.transformers: response = self.transformers[name].transform(response) line.set_xdata(response.get_time()) line.set_ydata(response.get_data()) # Redraw the plot for fig in self.figures: for ax in fig.axes: ax.relim() ax.autoscale_view() fig.canvas.draw() fig.canvas.flush_events()
[docs] class ResponseSingleObjective(IndividualObjective, ABC): """Base class for generic response-based objectives.""" def __init__( self, name: str, prediction: List[str], quantity: Reduction, maximise: bool = False, weight: float = 1.0, bounds: Optional[Tuple[float, float]] = None, flatten_utility: Optional[FlattenUtility] = None, prediction_transform: Optional[ResponseTransformer] = None, ) -> None: super().__init__(maximise=maximise, weight=weight, bounds=bounds) self.name = name self.prediction = prediction self.quantity = NegateReduction(quantity) if maximise else quantity self.flatten_utility = flatten_utility self.prediction_transform = prediction_transform def _extract_responses(self, raw_results: Dict[str, OutputResult]) -> List[OutputResult]: """Extract responses of interest from the results and compute any required transformation. Parameters ---------- raw_results : Dict[str, OutputResult] Raw responses from the solver Returns ------- List[OutputResult] List of transformed results. """ results = [raw_results[name] for name in self.prediction] if self.prediction_transform is None: return results return [self.prediction_transform.transform(result) for result in results] @staticmethod def _expand_params(time: torch.Tensor, params: torch.Tensor) -> torch.Tensor: """Expand the set of parameters to match the time grid. Parameters ---------- time : torch.Tensor Time grid for the responses. params : torch.Tensor Parameters for the given responses. Returns ------- torch.Tensor Expanded parameter values. """ # Nothing to do when shapes are consistent if len(params.shape) == len(time.shape): return params # Expand the parameters along the first dimensions return params.expand(*(list(time.shape[:-1]) + [params.shape[-1]]))
[docs] def evaluate( self, params: np.ndarray, raw_results: Dict[str, OutputResult], ) -> Tuple[float, float]: """Evaluate objective value for the given results. Parameters ---------- params : np.ndarray Parameter values for this evaluation. raw_results : Dict[str, OutputResult] Raw responses from the solver. Returns ------- Tuple[float, float] Mean and variance of the objective. """ values = [ self.quantity.reduce(result.time, result.data, params) for result in self._extract_responses(raw_results) ] # Only compute the variance if we have more than one response # TODO: add different stochastic models return ( np.mean(values), np.var(values) / len(values) if len(values) > 1 else 0.0, )
[docs] def latent_space(self, raw_results: Dict[str, OutputResult]) -> Tuple[np.ndarray, np.ndarray]: """Compute latent space representation of the given results (for composite objectives). Parameters ---------- raw_results : Dict[str, OutputResult] Raw responses from the solver. Returns ------- Tuple[np.ndarray, np.ndarray] Mean and variance of the latent space representation. """ latent_space = np.array([ self.flatten_utility.flatten(result.time, result.data) for result in self._extract_responses(raw_results) ]) # Only compute the covariance if we have more than one response # TODO: add different stochastic models covariance = ( np.cov(latent_space.T) / latent_space.shape[0] if latent_space.shape[0] > 1 else np.zeros((latent_space.shape[1], latent_space.shape[1])) ) return np.mean(latent_space, axis=0), covariance
[docs] def evaluate_from_latent_space( self, params: torch.Tensor, latent_responses: torch.Tensor, ) -> torch.Tensor: """Evaluate the objective value(s) from the latent space representation. Parameters ---------- params : torch.Tensor Parameter values for these results. latent_responses : torch.Tensor Response(s) in the latent space representation. Returns ------- torch.Tensor Objective value(s) for the response(s). """ time, data = self.flatten_utility.unflatten_torch(latent_responses) return self.quantity.reduce_torch(time, data, self._expand_params(time, params))
[docs] @abstractmethod def plot(self, axis: plt.Axes, raw_results: Dict[str, OutputResult]) -> Dict[Line2D, str]: """Plot the response for this objective. Parameters ---------- axis : plt.Axes Axis to plot the response on. raw_results : Dict[str, OutputResult] Raw responses from the solver. Returns ------- Dict[Line2D, str] Mapping of lines to response names (for dynamically updating plots). """
[docs] @classmethod @abstractmethod def read(cls: Type[T], name: str, config: Dict[str, Any], output_dir: str) -> T: """Read the objective spec from the configuration dictionary. Parameters ---------- name : str Name of the objective. config : Dict[str, Any] Configuration dictionary. output_dir: str Output directory. Returns ------- ResponseSingleObjective Single objective to use. """
[docs] class ResponseComposition(Composition, ABC): """Generic class for compositions to use for response-based objectives.""" def __init__( self, objectives: List[ResponseSingleObjective], scalarisation: Optional[Scalarisation] = None, ) -> None: super().__init__() self.objectives = objectives self.scalarisation = scalarisation self.concat = ConcatUtility([obj.flatten_utility.length() for obj in self.objectives])
[docs] @abstractmethod def composition_torch(self, inner: torch.Tensor, params: torch.Tensor) -> torch.Tensor: """Compute the composition for all objectives. Parameters ---------- inner : torch.Tensor Return value from the inner function. params : torch.Tensor Paratemers for the given responses. Returns ------- torch.Tensor Composition results. """
[docs] @abstractmethod def get_latent_space( self, params: np.ndarray, raw_responses: Dict[str, OutputResult], ) -> Tuple[np.ndarray, np.ndarray]: """Compute the latent space representation of the given results. Parameters ---------- params : np.ndarray Parameter values for these results. raw_responses : Dict[str, OutputResult] Raw responses from the solver. Returns ------- Tuple[np.ndarray, np.ndarray] Latent space representation of the results: mean and covariance. """
[docs] class FullComposition(ResponseComposition): """Container for the outer composition of composite response-based objectives."""
[docs] def composition_torch(self, inner: torch.Tensor, params: torch.Tensor) -> torch.Tensor: """Compute the composition for all objectives. Parameters ---------- inner : torch.Tensor Return value from the inner function. params : torch.Tensor Paratemers for the given responses. Returns ------- torch.Tensor Composition results. """ # Split the inner responses and compute the objective values latent_responses = self.concat.split_torch(inner) objectives = torch.stack([ objective.evaluate_from_latent_space(params, latent_response) for latent_response, objective in zip(latent_responses, self.objectives) ], dim=-1) # Scalarise the objectives if necessary if self.scalarisation is not None: objectives, _ = self.scalarisation.scalarise_torch(objectives) return objectives
[docs] def get_latent_space( self, params: np.ndarray, raw_responses: Dict[str, OutputResult], ) -> Tuple[np.ndarray, np.ndarray]: """Compute the latent space representation of the given results. Parameters ---------- params : np.ndarray Parameter values for these results. raw_responses : Dict[str, OutputResult] Raw responses from the solver. Returns ------- Tuple[np.ndarray, np.ndarray] Latent space representation of the results: mean and covariance. """ latent_space = [ objective.latent_space(raw_responses) for objective in self.objectives ] return ( self.concat.concat([mean for mean, _ in latent_space]), self.concat.concat_covar([var for _, var in latent_space]), )
[docs] class ScalarisationComposition(ResponseComposition): """Composition for scalarisation of non-composite response objectives."""
[docs] def composition_torch(self, inner: torch.Tensor, params: torch.Tensor) -> torch.Tensor: """Compute the composition for all objectives. Parameters ---------- inner : torch.Tensor Return value from the inner function. params : torch.Tensor Paratemers for the given responses. Returns ------- torch.Tensor Composition results. """ return self.scalarisation.scalarise_torch(inner)[0]
[docs] def get_latent_space( self, params: np.ndarray, raw_responses: Dict[str, OutputResult], ) -> Tuple[np.ndarray, np.ndarray]: """Compute the latent space representation of the given results. Parameters ---------- params : np.ndarray Parameter values for these results. raw_responses : Dict[str, OutputResult] Raw responses from the solver. Returns ------- Tuple[np.ndarray, np.ndarray] Latent space representation of the results: mean and covariance. """ objectives = [ objective.evaluate(params, raw_responses) for objective in self.objectives ] return ( self.concat.concat([np.array([mean]) for mean, _ in objectives]), self.concat.concat_covar([np.array([var]) for _, var in objectives]), )
[docs] class ResponseObjective(GenericObjective): """Objective for generic response-based objectives.""" def __init__( self, parameters: ParameterSet, solver: Solver, objectives: List[ResponseSingleObjective], output_dir: str, scalarisation: Scalarisation = None, stochastic: bool = False, composite: bool = False, full_composite: bool = True, transformers: Dict[str, ResponseTransformer] = None, ) -> None: # Sanitise the scalarisation if scalarisation is None: # Everything fine if we just have a single objective: use a sum scalarisation if len(objectives) == 1: scalarisation = SumScalarisation(objectives) elif composite and not full_composite: raise ValueError('Multi-objective composite problems require full composition') # Get the type of composition to use composite_type = FullComposition if full_composite else ScalarisationComposition super().__init__( parameters, stochastic=stochastic, composition=composite_type(objectives, scalarisation) if composite else None, scalarisation=None if composite else scalarisation, num_objectives=len(objectives), multi_objective=len(objectives) > 1 and scalarisation is None, output_dir=output_dir, ) self.composition: Optional[ResponseComposition] = self.composition self.solver = solver self.objectives = objectives self.transformers = transformers if transformers is not None else {} # Sanitise predictions for objective in self.objectives: for name in objective.prediction: if name not in self.solver.get_output_fields(): raise ValueError(f'Undefined prediction {name}')
[docs] def prepare(self) -> None: """Prepare the objective for optimisation.""" super().prepare() self.solver.prepare()
[docs] def postproc_responses(self, responses: Dict[str, OutputResult]) -> Dict[str, OutputResult]: """Post-process the responses from the solver. Parameters ---------- responses : Dict[str, OutputResult] Raw responses from the solver. Returns ------- Dict[str, OutputResult] Post-processed responses. """ # Sanitise responses empty_responses = [name for name, result in responses.items() if len(result.time) == 0] if len(empty_responses) > 0: warnings.warn( f'Solver call returned empty responses for the output fields {empty_responses}. ' 'Please validate the solver output. Sanitising to zero responses.', RuntimeWarning, ) for name in empty_responses: responses[name] = OutputResult(np.zeros(1), np.zeros(1)) # Transform responses for name, transformer in self.transformers.items(): if name in responses: responses[name] = transformer.transform(responses[name]) return responses
def _objective(self, params: np.ndarray, concurrent: bool = False) -> ObjectiveResult: """Objective computation for design objectives. Parameters ---------- params : np.ndarray Set of parameters to evaluate the objective for. concurrent : bool, optional Whether this call may be concurrent to others, by default False. Returns ------- ObjectiveResult Objective result. """ raw_responses = self.solver.solve(params, concurrent) # Sanitise and post-process the responses raw_responses = self.postproc_responses(raw_responses) # Compute the objective value and variance from each response objective results = [objective.evaluate(params, raw_responses) for objective in self.objectives] obj_values = np.array([mean for mean, _ in results]) obj_variances = np.array([var for _, var in results]) # Under single-objective, compute the scalar objective value scalar_value, scalar_variance = None, None if not self.multi_objective: scalarisation = self.scalarisation or self.composition.scalarisation scalar_value, scalar_variance = scalarisation.scalarise(obj_values, obj_variances) # Get the values to return to the optimiser. Three scenarios: # (i) under composition, return the latent space # (ii) non-composite multi-objective, return the objective values # (iii) non-composite single-objective, return the scalarised objective if self.composition is not None: optim_values, optim_covar = self.composition.get_latent_space(params, raw_responses) elif self.multi_objective: optim_values, optim_covar = obj_values, np.diag(obj_variances) else: optim_values, optim_covar = np.array([scalar_value]), np.array([[scalar_variance]]) # Return the objective result: only return the variances if we are stochastic return ObjectiveResult( params, optim_values, obj_values, scalar_value=scalar_value, covariances=optim_covar if self.stochastic else None, obj_variances=obj_variances if self.stochastic else None, scalar_variance=scalar_variance if self.stochastic else None, )
[docs] def plot_case(self, case_hash: str, options: Dict[str, Any] = None) -> List[Figure]: """Plot a given function call given the parameter hash Parameters ---------- case_hash : str, optional Parameter hash for the case to plot options : Dict[str, Any], optional Options to pass to the plotting function, by default None Returns ------- List[Figure] List of figures with the plot """ append_title = '' if options is not None and 'append_title' in options: append_title = f' ({options["append_title"]})' # Load all responses and post-process them responses = self.postproc_responses(self.solver.get_output_response(case_hash)) # Extract the parameters params = self.solver.get_case_params(case_hash) if options is not None and 'params' in options: append_title += f' - {params}' # Plot each target figures = [] for objective in self.objectives: fig, axis = plt.subplots() objective.plot(axis, responses) axis.set_title(objective.name + append_title) axis.grid() axis.legend() figures.append(fig) return figures
[docs] def plot_current(self) -> List[DynamicPlotter]: """Plot the currently running function call Returns ------- List[DynamicPlotter] List of instances of a updatable plots """ # Get current solver data responses = self.postproc_responses(self.solver.get_current_response()) # Plot each objective figures: List[Figure] = [] mapping: Dict[Line2D, str] = {} for objective in self.objectives: fig, axis = plt.subplots() line, = objective.plot(axis, responses) axis.set_title(objective.name) axis.legend() # Store the line and figure mapping[line] = objective.name figures.append(fig) # Show the plot plt.show() for fig in figures: fig.canvas.draw() fig.canvas.flush_events() return [DynamicResponsePlotter(figures, self.solver, mapping, self.transformers)]