From c44d5541b748fb1bf6faf0271d9d83ab768efb86 Mon Sep 17 00:00:00 2001
From: Hu Zhao <zhao@mbd.rwth-aachen.de>
Date: Thu, 27 Oct 2022 23:49:23 +0200
Subject: [PATCH] feat: add annotation, use beartype, correct prefixes, improve optimizer check

---
 src/psimpy/inference/active_learning.py | 236 ++++++++++++++----------
 1 file changed, 141 insertions(+), 95 deletions(-)

diff --git a/src/psimpy/inference/active_learning.py b/src/psimpy/inference/active_learning.py
index 507358f..10114b4 100644
--- a/src/psimpy/inference/active_learning.py
+++ b/src/psimpy/inference/active_learning.py
@@ -1,66 +1,90 @@
 import numpy as np
 import sys
 from scipy import optimize
+from psimpy.simulator.run_simulator import RunSimulator
+from psimpy.sampler.latin import LHS
+from psimpy.emulator.robustgasp import ScalarGaSP
+from psimpy.utility.util_funcs import check_bounds
+from typing import Union
+from beartype.typing import Callable
+from beartype import beartype
 
-min_10_exp = sys.float_info.min_10_exp
+_min_float = 10**(sys.float_info.min_10_exp)
 
 class ActiveLearning:
 
+    @beartype
     def __init__(
-        self, ndim, run_sim_obj, data, likelihood, prior, bounds, n0, nt,
-        lhs_sampler, scalar_gasp, indicator, prefixes=None,
-        scalar_gasp_mean='constant', optimizer=optimize.brute,
-        args_likelihood=None, kwgs_likelihood=None,
-        args_prior=None, kwgs_prior=None,
-        args_optimizer=None,kwgs_optimizer=None):
+        self,
+        ndim: int,
+        bounds: np.ndarray,
+        data: np.ndarray,
+        run_sim_obj: RunSimulator,
+        prior: Callable,
+        likelihood: Callable,
+        n0: int,
+        nt: int,
+        lhs_sampler: LHS,
+        scalar_gasp: ScalarGaSP,
+        scalar_gasp_mean: str = 'constant',
+        prefixes: Union[list[str], None] = None,
+        indicator: str = 'entropy',
+        optimizer: Callable = optimize.brute,
+        args_prior: Union[list, None] = None,
+        kwgs_prior: Union[dict, None] = None,
+        args_likelihood: Union[list, None] = None,
+        kwgs_likelihood: Union[dict, None] = None,
+        args_optimizer: Union[list, None] = None,
+        kwgs_optimizer: Union[dict, None] = None) -> None:
         """
         Active learning to construct a scalar GP emulator for natural logarithm
-        of the product of prior and likelihood.
+        of the product of prior and likelihood (i.e. unnormalized posterior).
 
         Parameters
         ----------
         ndim : int
-            Parameter dimension.
-        run_sim_obj : instance of class RunSimulator
-            It has an attribute `simulator` and two methods to run `simulator`,
-            namely `serial_run` and `parrel_run`.
-            For each simulation, `simulator` should return outputs as a
-            one dimensional numpy array, corresponding to `data`.
-        data : numpy array
-            Observed data for parameter calibration. One dimensional array.
-        likelihood : callable
-            Likelihood function.
-            Call with `likelihood(y, data, *args_likelihood, *kwgs_likelihood)`.
-            `y` is a one dimensional numpy array, representing outputs of a
-            simulation corresponding to obversed `data`, namely returned outputs
-            of `simulator` at `x`.
-        prior : callable
-            Prior probability density distribution.
-            Call with `prior(x, *args_prior, **kwgs_prior)`.
+            Dimension of parameter `x`.
         bounds : numpy array
             Upper and lower boundaries of each parameter. Shape (ndim, 2).
-            `bounds[:, 0]` - lower boundaries of each parameter.
-            `bounds[:, 1]` - upper boundaries of each parameter.
+            bounds[:, 0] - lower boundaries of each parameter.
+            bounds[:, 1] - upper boundaries of each parameter.
+        data : numpy array
+            Observed data for parameter calibration. One dimensional array of
+            shape (k,) where k is the number of observed data.
+        run_sim_obj : instance of class `RunSimulator`
+            It has an attribute `simulator` and two methods to run `simulator`,
+            namely `serial_run` and `parallel_run`.
+            For each simulation, `simulator` must return outputs `y` as a one
+            dimensional numpy array of shape (k,), corresponding to `data`.
+        prior : Callable
+            Prior probability density function.
+            Call with `prior(x, *args_prior, **kwgs_prior)` and return the prior
+            probability density value at `x`.
+        likelihood : Callable
+            Likelihood function constructed based on `data` and corresponding
+            outputs `y` of `simulator`.
+            Call with `likelihood(y, data, *args_likelihood, **kwgs_likelihood)`
+            and return the likelihood value at `x`.
         n0 : int
-            Initial number of simulation runs.
+            Number of initial simulation runs. Used to build an initial emulator.
         nt : int
-            Total number of simulation runs.
+            Number of total simulation runs. `nt-n0` is therefore the number of
+            simulation runs guided by active learning.
         lhs_sampler : instance of class LHS
-            Used to draw initial samples of variable input parameters.
+            Used to draw initial samples of `x`.
         scalar_gasp : instance of class ScalarGaSP
-            Used to emulate natural logarithm of the product of prior and
-            likelihood.
-        indicator : str
-            Indicator of uncertainty. 'entropy' or 'variance'.
-        prefixes : list of str, optional
-            Consist of `nt` prefixes. Each of them is used to name
-            corresponding simulation output file.
+            Used to build emulators and make predictions.
         scalar_gasp_mean : str, optional
-            Mean function of scalar gasp emulator.
+            Mean function of `scalar_gasp` emulator.
             'zero' - mean is set to zero
-            'constant' - mean is set to a constant, determined during training
-            'linear' - mean is linear to training input.
-        optimizer : callable, optional
+            'constant' - mean is set to a constant
+            'linear' - mean is a linear function of training input
+        prefixes : list of str, optional
+            Consists of `nt` prefixes. Each of them is used to name the
+            corresponding simulation output file.
+        indicator : str, optional
+            Indicator of uncertainty. 'entropy' or 'variance'.
+        optimizer : Callable, optional
             Find input point that minimizes the uncertainty indicator at each
             iteration step.
             Call with `optimizer(func, *args_optimizer, **kwgs_optimizer)`.
             It should return either the solution as a one dimensional numpy
            array, or an object (e.g. `scipy.optimize.OptimizeResult`) which
             has the attribute x denoting the solution array.
             By default it is set to the brute force method of scipy, namely
             `scipy.optimize.brute`.
@@ -70,82 +94,99 @@ class ActiveLearning:
-        args_likelihood : list, optional
-            Positional arguments for `likelihood`.
-        kwgs_likelihood : dict, optional
-            Keyword arguments for `likelihood`.
         args_prior : list, optional
             Positional arguments for `prior`.
         kwgs_prior : dict, optional
             Keyword arguments for `prior`.
+        args_likelihood : list, optional
+            Positional arguments for `likelihood`.
+        kwgs_likelihood : dict, optional
+            Keyword arguments for `likelihood`.
         args_optimizer : list, optional
             Positional arguments for `optimizer`.
         kwgs_optimizer : dict, optional
             Keyword arguments for `optimizer`.
""" + if ndim != len(run_sim_obj.var_inp_parameter): + raise RuntimeError("ndim and run_sim_obj are incompatible") if ndim != lhs_sampler.ndim: raise RuntimeError("ndim and lhs_sampler are incompatible") - - if ndim != len(run_sim_obj.var_inp_name): - raise RuntimeError("ndim and run_sim_obj are incompatible") + if ndim != scalar_gasp.ndim: + raise RuntimeError("ndim and scalar_gasp are incompatible") + self.ndim = ndim + + check_bounds(ndim, bounds) + self.bounds = bounds if data.ndim != 1: raise ValueError("data must be a one dimensional numpy array") - - if bounds.ndim != 2: - raise ValueError("bounds must be a two dimensional numpy array") - - if scalar_gasp_mean not in ["zero","constant","linear"]: - raise NotImplementedError( - f"unsupported scalar_gasp_mean {scalar_gasp_mean}") - - if indicator not in ["entropy", "variance"]: - raise NotImplementedError(f"unsupported indicator {indicator}") + self.data = data - self.ndim = ndim self.run_sim_obj = run_sim_obj - self.data = data - self.likelihood = likelihood self.prior = prior - self.bounds = bounds + self.likelihood = likelihood self.n0 = n0 self.nt = nt self.lhs_sampler = lhs_sampler self.lhs_sampler.bounds = bounds - + self.scalar_gasp = scalar_gasp - self.indicator = indicator - if prefixes is None: - self.prefixes = [f'sim{i}' for i in range(nt)] - + if scalar_gasp_mean not in ["zero","constant","linear"]: + raise NotImplementedError( + f"unsupported scalar_gasp_mean {scalar_gasp_mean}" + f"please choose from 'zero', 'constant', and 'linear'") self.scalar_gasp_mean = scalar_gasp_mean - self.optimizer = optimizer + if prefixes is None: + prefixes = [f'sim{i}' for i in range(nt)] + elif len(prefixes) != nt: + raise ValueError("prefixes must have nt number of items") + elif len(set(prefixes)) != nt: + raise ValueError("Each item of prefixes must be unique") + self.prefixes = prefixes + + if indicator not in ["entropy", "variance"]: + raise NotImplementedError( + f"unsupported indicator {indicator}. 
+                " 'entropy' and 'variance'")
+        self.indicator = indicator
+
         if optimizer is optimize.brute:
-            if args_optimizer is None:
-                ranges = tuple(tuple(bounds[i]) for i in range(ndim))
-                args_optimizer = tuple([ranges])
+            ranges = tuple(tuple(bounds[i]) for i in range(ndim))
+            args_optimizer = [ranges]
             if kwgs_optimizer is not None:
-                if "full_output" in kwgs_optimizer.keys():
-                    kwgs_optimizer["full_output"] = 0
+                allowed_keys = {"ranges", "args", "Ns", "full_output", "finish",
+                                "disp", "workers"}
+                if not set(kwgs_optimizer.keys()).issubset(allowed_keys):
+                    raise ValueError(
+                        "unsupported keyword(s) in kwgs_optimizer"
+                        " for optimize.brute")
                 if "ranges" in kwgs_optimizer.keys():
                     del kwgs_optimizer["ranges"]
+                if "args" in kwgs_optimizer.keys():
+                    del kwgs_optimizer["args"]
+                if "full_output" in kwgs_optimizer.keys():
+                    kwgs_optimizer["full_output"] = 0
+        self.optimizer = optimizer
 
         self.args_prior = () if args_prior is None else args_prior
-        self.args_likelihood = () if args_likelihood is None else args_likelihood
-        self.args_optimizer=() if args_optimizer is None else args_optimizer
         self.kwgs_prior = {} if kwgs_prior is None else kwgs_prior
+
+        self.args_likelihood = () if args_likelihood is None else args_likelihood
         self.kwgs_likelihood = {} if kwgs_likelihood is None else kwgs_likelihood
+
+        self.args_optimizer = () if args_optimizer is None else args_optimizer
         self.kwgs_optimizer={} if kwgs_optimizer is None else kwgs_optimizer
 
         self.var_inp_val = np.zeros((nt, ndim))
         self.sim_outputs = np.zeros((nt, len(data)))
         self.ln_pxl_val = np.zeros(nt)
 
-    def initial_simulation(self, mode='parallel', max_workers=None):
+    @beartype
+    def initial_simulation(self, mode: str = 'serial',
+                           max_workers: Union[int, None] = None) -> None:
         """
         Run `n0` initial simulations.
 
@@ -168,17 +209,19 @@ class ActiveLearning:
             self.run_sim_obj.serial_run(
                 self.var_inp_val[0:self.n0, :], self.prefixes[0:self.n0],
                 append=False)
+        else:
+            raise ValueError("mode must be 'parallel' or 'serial'")
 
         self.sim_outputs[0:self.n0, :] = np.array(self.run_sim_obj.outputs)
 
-    def iterative_emulation(self):
+    def iterative_emulation(self) -> None:
         """
-        Sequentially pick `nt - n0` new input points, run simulations,
-        and build emulators.
+        Sequentially pick `nt-n0` new input points, run simulations, and
+        build emulators.
""" - self.ln_pxl_val[0:self.n0] = \ - [self._compute_ln_pxl(self.var_inp_val[i,:], self.sim_outputs[i,:]) - for i in range(self.n0)] + self.ln_pxl_val[0:self.n0] = [ + self._compute_ln_pxl(self.var_inp_val[i,:], self.sim_outputs[i,:]) + for i in range(self.n0)] for n in range(self.n0, self.nt + 1): @@ -187,14 +230,16 @@ class ActiveLearning: if n == self.nt: break - opt_res = self.optimizer(self._uncertainty_indicator, - *self.args_optimizer, **self.kwgs_optimizer) + opt_res = self.optimizer( + self._uncertainty_indicator, + *self.args_optimizer, + **self.kwgs_optimizer) if isinstance(opt_res, np.ndarray): new_val = opt_res elif isinstance(opt_res, optimize.OptimizeResult): new_val = opt_res.x else: - raise TypeError( + raise RuntimeError( "Optimizer must return a 1d numpy array representing the" " solution or a OptimizeResult object having x attribute") new_val = new_val.reshape((1, self.ndim)) @@ -207,8 +252,8 @@ class ActiveLearning: self.var_inp_val[n, :] = new_val self.ln_pxl_val[n] = self._compute_ln_pxl(self.var_inp_val[n,:], self.sim_outputs[n,:]) - - def _compute_ln_pxl(self, x, y): + + def _compute_ln_pxl(self, x: np.ndarray, y: np.ndarray) -> float: """Compute natural logarithm of the product of prior and likelihood. Parameters @@ -224,13 +269,14 @@ class ActiveLearning: ln_pxl_val : float Natural logarithm of the product of prior and likelihood at `x`. """ - pxl = self.prior(x, *self.args_prior, **self.kwgs_prior) * \ - self.likelihood(y, self.data, *self.args_likelihood, - **self.kwgs_likelihood) + prior_val = self.prior(x, *self.args_prior, **self.kwgs_prior) + likelihood_val = self.likelihood(y, self.data, *self.args_likelihood, + **self.kwgs_likelihood) + pxl_val = prior_val * likelihood_val - return np.log(np.maximum(pxl, 10**(min_10_exp))) + return float(np.log(np.maximum(pxl_val, _min_float))) - def _emulate_ln_pxl(self, n): + def _emulate_ln_pxl(self, n: int) -> None: """ Train emulator for natural lograthim of prior times likelihood based on `n` simulation runs. @@ -251,7 +297,7 @@ class ActiveLearning: self.var_inp_var[0:n,:], self.ln_pxl_val[0:n], trend=trend) - def _uncertainty_indicator(self, x): + def _uncertainty_indicator(self, x: np.array) -> float: """ Indicator of uncertainty. @@ -282,7 +328,7 @@ class ActiveLearning: neg_val = -(mean + 0.5*np.log(2*np.pi*np.e*std**2)) elif self.indicator == "variance": neg_val = -(2*mean + std**2) - np.log( - np.maximum(np.exp(std**2)-1, 10**(min_10_exp)) + np.maximum(np.exp(std**2)-1, _min_float) ) return float(neg_val) \ No newline at end of file -- GitLab