Source code for piglot.optimisers.botorch.bayes

"""Main optimiser classes for using BoTorch with piglot"""
from typing import Tuple, List, Type, Dict
from multiprocessing.pool import ThreadPool as Pool
import os
import warnings
import numpy as np
import torch
from scipy.stats import qmc
from gpytorch.mlls import ExactMarginalLogLikelihood
import botorch
from botorch.fit import fit_gpytorch_mll
from botorch.optim import optimize_acqf
from botorch.models import SingleTaskGP
from botorch.models.model import Model
from botorch.models.converter import batched_to_model_list
from botorch.models.transforms import Normalize
from botorch.acquisition import (
    AcquisitionFunction,
    qUpperConfidenceBound,
    qExpectedImprovement,
    qProbabilityOfImprovement,
    qLogExpectedImprovement,
    qNoisyExpectedImprovement,
    qLogNoisyExpectedImprovement,
    qKnowledgeGradient,
)
from botorch.acquisition.objective import GenericMCObjective
from botorch.acquisition.multi_objective import (
    qExpectedHypervolumeImprovement,
    qNoisyExpectedHypervolumeImprovement,
    qHypervolumeKnowledgeGradient,
)
from botorch.utils.multi_objective.box_decompositions.non_dominated import (
    FastNondominatedPartitioning,
)
from botorch.acquisition.multi_objective.logei import (
    qLogExpectedHypervolumeImprovement,
    qLogNoisyExpectedHypervolumeImprovement,
)
from botorch.acquisition.multi_objective.objective import GenericMCMultiOutputObjective
from botorch.sampling import SobolQMCNormalSampler
from piglot.objective import (
    Objective,
    GenericObjective,
    ObjectiveResult,
)
from piglot.optimiser import Optimiser
from piglot.optimisers.botorch.dataset import BayesDataset


AVAILABLE_ACQUISITIONS: Dict[str, Type[AcquisitionFunction]] = {
    # Quasi-Monte Carlo acquisitions
    'qucb': qUpperConfidenceBound,
    'qei': qExpectedImprovement,
    'qlogei': qLogExpectedImprovement,
    'qpi': qProbabilityOfImprovement,
    'qkg': qKnowledgeGradient,
    # Analytical and quasi-Monte Carlo acquisitions for noisy problems
    'qnei': qNoisyExpectedImprovement,
    'qlognei': qLogNoisyExpectedImprovement,
    # Multi-objective acquisitions
    'qehvi': qExpectedHypervolumeImprovement,
    'qnehvi': qNoisyExpectedHypervolumeImprovement,
    'qlogehvi': qLogExpectedHypervolumeImprovement,
    'qlognehvi': qLogNoisyExpectedHypervolumeImprovement,
    'qhvkg': qHypervolumeKnowledgeGradient
}


[docs] def get_default_torch_device() -> str: """Utility to return the default PyTorch device (pre Pytorch v2.3). Returns ------- str Name of the default PyTorch device. """ return str(torch.tensor([0.0]).device)
[docs] def default_acquisition( composite: bool, multi_objective: bool, noisy: bool, q: int, ) -> str: """Return the default acquisition function for the given optimisation problem. Parameters ---------- composite : bool, optional Whether the optimisation problem is a composition. multi_objective : bool, optional Whether the optimisation problem is multi-objective. noisy : bool, optional Whether the optimisation problem is noisy. q : int, optional Number of candidates to generate. Returns ------- str Name of the default acquisition function. """ if multi_objective: return 'qlognehvi' if noisy else 'qlogehvi' if noisy: return 'qlognei' if composite or q > 1: return 'qlogei' return 'qlogei'
[docs] def fit_mll_pytorch_loop(mll: ExactMarginalLogLikelihood, n_iters: int = 100) -> None: """Fit a GP model using a PyTorch optimisation loop. Parameters ---------- mll : ExactMarginalLogLikelihood Marginal log-likelihood to optimise. n_iters : int, optional Number of iterations to optimise for, by default 100 """ mll.train() mll.model.likelihood.train() optimizer = torch.optim.Adam(mll.model.parameters(), lr=0.1) for _ in range(n_iters): optimizer.zero_grad() output = mll.model(mll.model.train_inputs[0]) loss = -torch.mean(mll(output, mll.model.train_targets)) loss.backward() optimizer.step() mll.model.eval() mll.model.likelihood.eval()
[docs] class BayesianBoTorch(Optimiser): """Driver for optimisation using BoTorch.""" def __init__( self, objective: Objective, n_initial: int = None, n_test: int = 0, acquisition: str = None, beta: float = 1.0, noisy: float = False, q: int = 1, seed: int = 1, load_file: str = None, export: str = None, device: str = None, reference_point: List[float] = None, nadir_scale: float = 0.1, skip_initial: bool = False, pca_variance: float = None, num_restarts: int = None, raw_samples: int = None, mc_samples: int = None, batch_size: int = None, num_fantasies: int = None, sequential: bool = False, ) -> None: if not isinstance(objective, GenericObjective): raise RuntimeError("Bayesian optimiser requires a GenericObjective") if bool(noisy) and objective.stochastic: warnings.warn("Noisy setting with stochastic objective - ignoring objective variance") super().__init__('BoTorch', objective) self.objective = objective self.n_initial = n_initial self.acquisition = acquisition self.beta = beta self.noisy = bool(noisy) self.q = q self.seed = seed self.load_file = load_file self.export = export self.n_test = n_test self.device = get_default_torch_device() if device is None else device self.skip_initial = bool(skip_initial) self.partitioning: FastNondominatedPartitioning = None self.adjusted_ref_point = reference_point is None self.ref_point = None if reference_point is None else -torch.tensor(reference_point) self.nadir_scale = nadir_scale self.pca_variance = pca_variance self.num_restarts = num_restarts self.raw_samples = raw_samples self.mc_samples = mc_samples self.batch_size = batch_size self.sequential = bool(sequential) self.num_fantasies = num_fantasies if acquisition is None: self.acquisition = default_acquisition( objective.composition, objective.multi_objective, bool(noisy) or objective.stochastic, self.q, ) else: orig_name = self.acquisition if not self.acquisition.startswith('q'): self.acquisition = 'q' + self.acquisition if self.acquisition not in AVAILABLE_ACQUISITIONS: raise RuntimeError(f"Unkown acquisition function {orig_name}") if self.pca_variance and not (objective.composition or objective.multi_objective): warnings.warn("Ignoring PCA variance for non-composite single-objective problem") self.pca_variance = None elif self.pca_variance is None and objective.composition: self.pca_variance = 1e-6 torch.set_num_threads(1) def _validate_problem(self, objective: Objective) -> None: """Validate the combination of optimiser and objective Parameters ---------- objective : Objective Objective to optimise """ def _build_model(self, dataset: BayesDataset) -> Model: # Transform outcomes and clamp variances to prevent warnings from GPyTorch values, variances = dataset.transform_outcomes(dataset.values, dataset.covariances) variances = torch.clamp_min(variances, 1e-6) # Initialise model instance depending on noise setting model = SingleTaskGP( dataset.params, values, train_Yvar=None if self.noisy else variances, input_transform=Normalize(d=dataset.params.shape[-1]), ) # Fit the GP (in case of trouble, fall back to an Adam-based optimiser) mll = ExactMarginalLogLikelihood(model.likelihood, model) try: fit_gpytorch_mll(mll) except botorch.exceptions.ModelFittingError: warnings.warn('Optimisation of the MLL failed, falling back to PyTorch optimiser') fit_mll_pytorch_loop(mll) # MOBO requires a model list (except when there is only one output) if self.objective.multi_objective and values.shape[-1] > 1: return batched_to_model_list(model) return model def _get_candidates( self, bounds: np.ndarray, dataset: BayesDataset, test_dataset: BayesDataset, ) -> Tuple[np.ndarray, float]: # Build model model = self._build_model(dataset) # Evaluate GP performance with the test dataset cv_error = None if self.n_test > 0: std_test_values, _ = dataset.transform_outcomes( test_dataset.values, test_dataset.covariances, ) with torch.no_grad(): posterior = model.posterior(test_dataset.params) cv_error = (posterior.mean - std_test_values).square().mean().item() # Build the acquisition function acq = self._acq_func(dataset, model) # Optimise acquisition to find next candidate(s) candidates, _ = optimize_acqf( acq, bounds=torch.from_numpy(bounds.T).to(self.device).to(dataset.dtype), q=self.q, num_restarts=self.num_restarts, raw_samples=self.raw_samples, sequential=self.sequential, options={ "sample_around_best": True, "seed": self.seed, "init_batch_limit": self.batch_size, }, ) return candidates.cpu().numpy(), cv_error def _eval_candidates(self, candidates: np.ndarray) -> List[ObjectiveResult]: # Single candidate case if self.q == 1: return [self.objective(candidate) for candidate in candidates] # Multi-candidate: run cases in parallel with Pool(self.q) as pool: results = pool.map(lambda x: self.objective(x, concurrent=True), candidates) return list(results) def _get_random_points( self, n_points: int, n_dim: int, seed: int, bound: np.ndarray, ) -> List[np.ndarray]: points = qmc.Sobol(n_dim, seed=seed).random(n_points) return [point * (bound[:, 1] - bound[:, 0]) + bound[:, 0] for point in points] def _result_to_dataset(self, result: ObjectiveResult) -> Tuple[np.ndarray, np.ndarray]: covariances = ( result.covariances if self.objective.stochastic else np.diag(np.zeros_like(result.values)) ) return result.values, covariances def _composition(self, vals: torch.Tensor, params: torch.Tensor) -> torch.Tensor: # Just negate the untransformed outcomes if no composition is available if not self.objective.composition: return -vals.squeeze(-1) # Otherwise, use the composition function return -self.objective.composition.composition_torch(vals, params) def _update_mo_data(self, dataset: BayesDataset) -> float: y_points = self._composition(dataset.values, dataset.params) # Compute reference point if needed if self.adjusted_ref_point: nadir = torch.min(y_points, dim=0).values self.ref_point = nadir - self.nadir_scale * (torch.max(y_points, dim=0).values - nadir) # Update partitioning and Pareto front self.partitioning = FastNondominatedPartitioning(self.ref_point, Y=y_points) hypervolume = self.partitioning.compute_hypervolume().item() pareto = self.partitioning.pareto_Y # Map each Pareto point to the original parameter space param_indices = [ torch.argmin((y_points - pareto[i, :]).norm(dim=1)).item() for i in range(pareto.shape[0]) ] # Dump the Pareto front to a file with open(os.path.join(self.output_dir, "pareto_front"), 'w', encoding='utf8') as file: # Write header num_obj = pareto.shape[1] file.write('\t'.join([f'{"Objective_" + str(i + 1):>15}' for i in range(num_obj)])) file.write('\t' + '\t'.join([f'{param.name:>15}' for param in self.parameters]) + '\n') # Write each point for i, idx in enumerate(param_indices): file.write('\t'.join([f'{-x.item():>15.8f}' for x in pareto[i, :]]) + '\t') file.write('\t'.join([f'{x.item():>15.8f}' for x in dataset.params[idx, :]]) + '\n') return -np.log(hypervolume) def _acq_func( self, dataset: BayesDataset, model: Model, ) -> AcquisitionFunction: # Default values for multi-restart optimisation sampler = SobolQMCNormalSampler(torch.Size([self.mc_samples]), seed=self.seed) # Find best value for the acquisition best = torch.max(self._composition(dataset.values, dataset.params)).item() # Build composite MC objective def mc_objective(vals: torch.Tensor, X: torch.Tensor) -> torch.Tensor: return self._composition(dataset.untransform_outcomes(vals), X) # Delegate to the correct acquisition function # The arguments for each acquisition are different, so we group them into families acq_class = AVAILABLE_ACQUISITIONS[self.acquisition] if self.acquisition == 'qucb': acq = acq_class( model, self.beta, sampler=sampler, objective=GenericMCObjective(mc_objective), ) elif self.acquisition in ('qei', 'qlogei', 'qpi'): acq = acq_class( model, best, sampler=sampler, objective=GenericMCObjective(mc_objective), ) elif self.acquisition in ('qnei', 'qlognei'): acq = acq_class( model, dataset.params, sampler=sampler, objective=GenericMCObjective(mc_objective), ) elif self.acquisition == 'qkg': acq = acq_class( model, num_fantasies=self.num_fantasies, inner_sampler=sampler, objective=GenericMCObjective(mc_objective), ) # Quasi-Monte Carlo multi-objective acquisitions elif self.acquisition in ('qehvi', 'qlogehvi'): acq = acq_class( model, self.ref_point, self.partitioning, objective=GenericMCMultiOutputObjective(mc_objective), sampler=sampler, ) elif self.acquisition in ('qnehvi', 'qlognehvi'): acq = acq_class( model, self.ref_point, dataset.params, objective=GenericMCMultiOutputObjective(mc_objective), sampler=sampler, ) elif self.acquisition == 'qhvkg': acq = acq_class( model, self.ref_point, num_fantasies=self.num_fantasies, inner_sampler=sampler, objective=GenericMCMultiOutputObjective(mc_objective), ) else: raise RuntimeError(f"Unknown acquisition {self.acquisition}") return acq def _init_dataset(self, n_dim: int, bound: np.ndarray, init_shot: np.ndarray) -> BayesDataset: # Can we load straight from the input file? if self.load_file: return BayesDataset.load(self.load_file) # Evaluate initial shot and use it to infer number of dimensions if not self.skip_initial: init_result = self.objective(init_shot) init_values, init_covariances = self._result_to_dataset(init_result) n_outputs = len(init_values) # If requested, sample some random points before starting (in parallel if possible) random_points = self._get_random_points(self.n_initial, n_dim, self.seed, bound) results = self._eval_candidates(random_points) # Infer number of points to store when skipping initial shot if self.skip_initial: values, covariances = self._result_to_dataset(results[0]) n_outputs = len(values) # Build initial dataset with the initial shot (if available) dataset = BayesDataset( n_dim, n_outputs, # pylint: disable=E0606 export=self.export, device=self.device, pca_variance=self.pca_variance, ) if not self.skip_initial: dataset.push(init_shot, init_values, init_covariances, init_result.scalar_value) # Add random points to the dataset for i, result in enumerate(results): values, covariances = self._result_to_dataset(result) dataset.push(random_points[i], values, covariances, result.scalar_value) return dataset def _get_extra_info(self, cv_error: float, dataset: BayesDataset) -> str: extra = None if cv_error: extra = f'Val. {cv_error:6.4}' if self.objective.multi_objective: extra += f' Num Pareto: {self.partitioning.pareto_Y.shape[0]}' if self.pca_variance: extra += f' Num PCA: {dataset.pca.num_components}' elif self.objective.multi_objective: extra = f'Num Pareto: {self.partitioning.pareto_Y.shape[0]}' if self.pca_variance: extra += f' Num PCA: {dataset.pca.num_components}' elif self.pca_variance: extra = f'Num PCA: {dataset.pca.num_components}' return extra def _optimise( self, n_dim: int, n_iter: int, bound: np.ndarray, init_shot: np.ndarray, ): """ Parameters ---------- func : callable function to optimize n_dim : integer dimension, i.e., number of parameters to optimize n_iter : integer maximum number of iterations bound : array first column corresponding to the lower bound, and second column to the upper bound init_shot : list initial shot for the optimization problem Returns ------- best_value : float best loss function value best_solution : list best parameter solution """ # Initialise heuristic variables self.n_initial = self.n_initial or max(8, 2 * n_dim) self.num_restarts = self.num_restarts or 12 self.raw_samples = self.raw_samples or max(256, 16 * n_dim * n_dim) self.mc_samples = self.mc_samples or 512 self.batch_size = self.batch_size or 128 self.num_fantasies = self.num_fantasies or 16 # Build initial dataset dataset = self._init_dataset(n_dim, bound, init_shot) # Build test dataset (in parallel if possible) test_dataset = BayesDataset(n_dim, dataset.n_outputs, device=self.device) if self.n_test > 0: test_points = self._get_random_points(self.n_test, n_dim, self.seed + 1, bound) test_results = self._eval_candidates(test_points) for i, result in enumerate(test_results): values, covariances = self._result_to_dataset(result) test_dataset.push(test_points[i], values, covariances, result.scalar_value) # Find current best point to return to the driver if self.objective.multi_objective: best_value = self._update_mo_data(dataset) best_params = None else: best_params, best_value = dataset.min() self._progress_check(0, best_value, best_params) # Optimisation loop for i_iter in range(n_iter): # Generate candidates and catch CUDA OOM errors candidates = None while candidates is None: try: candidates, cv_error = self._get_candidates(bound, dataset, test_dataset) except torch.cuda.OutOfMemoryError: if self.batch_size > 1: warnings.warn( f'CUDA out of memory: halving batch size to {self.batch_size // 2}' ) self.batch_size //= 2 else: warnings.warn('CUDA out of memory: falling back to CPU') self.device = 'cpu' torch.set_default_device('cpu') dataset = dataset.to(self.device) test_dataset = test_dataset.to(self.device) if self.objective.multi_objective: self.ref_point = self.ref_point.to(self.device) self._update_mo_data(dataset) # Evaluate candidates (in parallel if possible) results = self._eval_candidates(candidates) # Update dataset values_batch = [] for i, result in enumerate(results): values, covariances = self._result_to_dataset(result) values_batch.append(result.scalar_value) dataset.push(candidates[i, :], values, covariances, result.scalar_value) # Find best observation for this batch if self.objective.multi_objective: best_value = self._update_mo_data(dataset) best_params = None else: best_idx = np.argmin(values_batch) best_value = values_batch[best_idx] best_params = candidates[best_idx, :] # Update progress (with extra data if available) if self._progress_check( i_iter + 1, best_value, best_params, extra_info=self._get_extra_info(cv_error, dataset), ): break # Return optimisation result if self.objective.multi_objective: best_result = self._update_mo_data(dataset) best_params = None else: best_params, best_result = dataset.min() return best_params, best_result