Commit c44d5541 authored by Hu Zhao

feat: add annotation, use beartype, correct prefixes, improve optimizer check

parent 1a7fcaff
import numpy as np
import sys
from scipy import optimize
from psimpy.simulator.run_simulator import RunSimulator
from psimpy.sampler.latin import LHS
from psimpy.emulator.robustgasp import ScalarGaSP
from psimpy.utility.util_funcs import check_bounds
from typing import Union
from beartype.typing import Callable
from beartype import beartype
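
# tiny positive floor (about 1e-307) passed to np.log to avoid evaluating log(0)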
_min_float = 10**(sys.float_info.min_10_exp)

class ActiveLearning:

    @beartype
    def __init__(
        self,
        ndim: int,
        bounds: np.ndarray,
        data: np.ndarray,
        run_sim_obj: RunSimulator,
        prior: Callable,
        likelihood: Callable,
        n0: int,
        nt: int,
        lhs_sampler: LHS,
        scalar_gasp: ScalarGaSP,
        scalar_gasp_mean: str = 'constant',
        prefixes: Union[list[str], None] = None,
        indicator: str = 'entropy',
        optimizer: Callable = optimize.brute,
        args_prior: Union[list, None] = None,
        kwgs_prior: Union[dict, None] = None,
        args_likelihood: Union[list, None] = None,
        kwgs_likelihood: Union[dict, None] = None,
        args_optimizer: Union[list, None] = None,
        kwgs_optimizer: Union[dict, None] = None) -> None:
""" """
Active learning to contruct a scalar GP emulator for natural logarithm Active learning to contruct a scalar GP emulator for natural logarithm
of the product of prior and likelihood. of the product of prior and likelihood (i.e. unnormalized posterior).
Parameters Parameters
---------- ----------
ndim : int ndim : int
Parameter dimension. Dimension of parameter `x`.
run_sim_obj : instance of class RunSimulator
It has an attribute `simulator` and two methods to run `simulator`,
namely `serial_run` and `parrel_run`.
For each simulation, `simulator` should return outputs as a
one dimensional numpy array, corresponding to `data`.
data : numpy array
Observed data for parameter calibration. One dimensional array.
likelihood : callable
Likelihood function.
Call with `likelihood(y, data, *args_likelihood, *kwgs_likelihood)`.
`y` is a one dimensional numpy array, representing outputs of a
simulation corresponding to obversed `data`, namely returned outputs
of `simulator` at `x`.
prior : callable
Prior probability density distribution.
Call with `prior(x, *args_prior, **kwgs_prior)`.
bounds : numpy array bounds : numpy array
Upper and lower boundaries of each parameter. Shape (ndim, 2). Upper and lower boundaries of each parameter. Shape (ndim, 2).
`bounds[:, 0]` - lower boundaries of each parameter. bounds[:, 0] - lower boundaries of each parameter.
`bounds[:, 1]` - upper boundaries of each parameter. bounds[:, 1] - upper boundaries of each parameter.
data : numpy array
Observed data for parameter calibration. One dimensional array of
shape (k,) where k is the number of observed data.
run_sim_obj : instance of class `RunSimulator`
It has an attribute `simulator` and two methods to run `simulator`,
namely `serial_run` and `parrel_run`.
For each simulation, `simulator` must return outputs `y` as a one
dimensional numpy array of shape (k,), corresponding to `data`.
prior : Callable
Prior probability density function.
Call with `prior(x, *args_prior, **kwgs_prior)` and return the prior
probability density value at x.
likelihood : Callable
Likelihood function constructed based on `data` and corresponding
outputs `y` of `simulator`.
Call with `likelihood(y, data, *args_likelihood, *kwgs_likelihood)`
and return the likelihood value at x.
n0 : int n0 : int
Initial number of simulation runs. Number of initial simulation runs. Used to build an initial emulator.
nt : int nt : int
Total number of simulation runs. Number of total simulation runs. `nt-n0` is therefore the number of
simulation runs guided by active learning.
lhs_sampler : instance of class LHS lhs_sampler : instance of class LHS
Used to draw initial samples of variable input parameters. Used to draw initial samples of `x`.
scalar_gasp : instance of class ScalarGaSP scalar_gasp : instance of class ScalarGaSP
Used to emulate natural logarithm of the product of prior and Used to build emulators and make predictions.
likelihood.
indicator : str
Indicator of uncertainty. 'entropy' or 'variance'.
prefixes : list of str, optional
Consist of `nt` prefixes. Each of them is used to name
corresponding simulation output file.
scalar_gasp_mean : str, optional scalar_gasp_mean : str, optional
Mean function of scalar gasp emulator. Mean function of `scalar_gasp` emulator.
'zero' - mean is set to zero 'zero' - mean is set to zero
'constant' - mean is set to a constant, determined during training 'constant' - mean is set to a constant
'linear' - mean is linear to training input. 'linear' - mean is linear to training input
optimizer : callable, optional prefixes : list of str, optional
Consist of `nt` prefixes. Each of them is used to name corresponding
simulation output file.
indicator : str, optional
Indicator of uncertainty. 'entropy' or 'variance'.
optimizer : Callable, optional
Find input point that minimizes the uncertainty indicator at each Find input point that minimizes the uncertainty indicator at each
iteration step. iteration step.
Call with `optimizer(func, *args_optimizer, **kwgs_optimizer)`. Call with `optimizer(func, *args_optimizer, **kwgs_optimizer)`.
@@ -70,44 +94,37 @@ class ActiveLearning:
            has the attribute x denoting the solution array.
            By default it is set to the brute force method of scipy, namely
            `scipy.optimize.brute`.
        args_prior : list, optional
            Positional arguments for `prior`.
        kwgs_prior : dict, optional
            Keyword arguments for `prior`.
        args_likelihood : list, optional
            Positional arguments for `likelihood`.
        kwgs_likelihood : dict, optional
            Keyword arguments for `likelihood`.
        args_optimizer : list, optional
            Positional arguments for `optimizer`.
        kwgs_optimizer : dict, optional
            Keyword arguments for `optimizer`.
        """
        if ndim != len(run_sim_obj.var_inp_parameter):
            raise RuntimeError("ndim and run_sim_obj are incompatible")
        if ndim != lhs_sampler.ndim:
            raise RuntimeError("ndim and lhs_sampler are incompatible")
        if ndim != scalar_gasp.ndim:
            raise RuntimeError("ndim and scalar_gasp are incompatible")
        self.ndim = ndim
        check_bounds(ndim, bounds)
        self.bounds = bounds
        if data.ndim != 1:
            raise ValueError("data must be a one dimensional numpy array")
        self.data = data
        self.run_sim_obj = run_sim_obj
        self.prior = prior
        self.likelihood = likelihood
        self.n0 = n0
        self.nt = nt
@@ -115,37 +132,61 @@ class ActiveLearning:
        self.lhs_sampler.bounds = bounds
        self.scalar_gasp = scalar_gasp
        if scalar_gasp_mean not in ["zero", "constant", "linear"]:
            raise NotImplementedError(
                f"unsupported scalar_gasp_mean {scalar_gasp_mean}, "
                "please choose from 'zero', 'constant', and 'linear'")
        self.scalar_gasp_mean = scalar_gasp_mean
        if prefixes is None:
            prefixes = [f'sim{i}' for i in range(nt)]
        elif len(prefixes) != nt:
            raise ValueError("prefixes must have nt number of items")
        elif len(set(prefixes)) != nt:
            raise ValueError("Each item of prefixes must be unique")
        self.prefixes = prefixes
        if indicator not in ["entropy", "variance"]:
            raise NotImplementedError(
                f"unsupported indicator {indicator}. Please choose from"
                f" 'entropy' and 'variance'")
        self.indicator = indicator
        if optimizer is optimize.brute:
            if args_optimizer is None:
                ranges = tuple(tuple(bounds[i]) for i in range(ndim))
                args_optimizer = [ranges]
            if kwgs_optimizer is not None:
                allowed_keys = {"ranges", "args", "Ns", "full_output",
                                "finish", "disp", "workers"}
                if not set(kwgs_optimizer.keys()).issubset(allowed_keys):
                    raise ValueError(
                        "unsupported keyword(s) in kwgs_optimizer"
                        " for optimize.brute")
                if "ranges" in kwgs_optimizer.keys():
                    del kwgs_optimizer["ranges"]
                if "args" in kwgs_optimizer.keys():
                    del kwgs_optimizer["args"]
                if "full_output" in kwgs_optimizer.keys():
                    kwgs_optimizer["full_output"] = 0
        self.optimizer = optimizer
        self.args_prior = () if args_prior is None else args_prior
        self.kwgs_prior = {} if kwgs_prior is None else kwgs_prior
        self.args_likelihood = () if args_likelihood is None else args_likelihood
        self.kwgs_likelihood = {} if kwgs_likelihood is None else kwgs_likelihood
        self.args_optimizer = () if args_optimizer is None else args_optimizer
        self.kwgs_optimizer = {} if kwgs_optimizer is None else kwgs_optimizer
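        # storage for all nt design points, their simulation outputs, and the
        # corresponding ln(prior*likelihood) values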
        self.var_inp_val = np.zeros((nt, ndim))
        self.sim_outputs = np.zeros((nt, len(data)))
        self.ln_pxl_val = np.zeros(nt)

    @beartype
    def initial_simulation(self, mode: str = 'serial',
                           max_workers: Union[int, None] = None) -> None:
        """
        Run `n0` initial simulations.
@@ -168,16 +209,18 @@ class ActiveLearning:
            self.run_sim_obj.serial_run(
                self.var_inp_val[0:self.n0, :], self.prefixes[0:self.n0],
                append=False)
        else:
            raise ValueError("mode must be 'parallel' or 'serial'")
        self.sim_outputs[0:self.n0, :] = np.array(self.run_sim_obj.outputs)

    def iterative_emulation(self) -> None:
        """
        Sequentially pick `nt - n0` new input points, run simulations, and
        build emulators.
        """
        self.ln_pxl_val[0:self.n0] = [
            self._compute_ln_pxl(self.var_inp_val[i,:], self.sim_outputs[i,:])
            for i in range(self.n0)]
        for n in range(self.n0, self.nt + 1):
@@ -187,14 +230,16 @@ class ActiveLearning:
            if n == self.nt:
                break
            opt_res = self.optimizer(
                self._uncertainty_indicator,
                *self.args_optimizer,
                **self.kwgs_optimizer)
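            # the optimizer may return the solution directly as a 1d array
            # (e.g. scipy.optimize.brute with full_output=0) or wrapped in an
            # OptimizeResult object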
            if isinstance(opt_res, np.ndarray):
                new_val = opt_res
            elif isinstance(opt_res, optimize.OptimizeResult):
                new_val = opt_res.x
            else:
                raise RuntimeError(
                    "Optimizer must return a 1d numpy array representing the"
                    " solution or an OptimizeResult object having x attribute")
            new_val = new_val.reshape((1, self.ndim))
@@ -208,7 +253,7 @@ class ActiveLearning:
            self.ln_pxl_val[n] = self._compute_ln_pxl(self.var_inp_val[n,:],
                self.sim_outputs[n,:])

    def _compute_ln_pxl(self, x: np.ndarray, y: np.ndarray) -> float:
        """Compute natural logarithm of the product of prior and likelihood.

        Parameters
@@ -224,13 +269,14 @@ class ActiveLearning:
        ln_pxl_val : float
            Natural logarithm of the product of prior and likelihood at `x`.
        """
        prior_val = self.prior(x, *self.args_prior, **self.kwgs_prior)
        likelihood_val = self.likelihood(y, self.data, *self.args_likelihood,
                                         **self.kwgs_likelihood)
        pxl_val = prior_val * likelihood_val
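        # floor the product at _min_float so np.log never receives zero when
        # the prior or likelihood underflows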
        return float(np.log(np.maximum(pxl_val, _min_float)))

    def _emulate_ln_pxl(self, n: int) -> None:
        """
        Train emulator for natural logarithm of prior times likelihood based
        on `n` simulation runs.
@@ -251,7 +297,7 @@ class ActiveLearning:
            self.var_inp_val[0:n,:], self.ln_pxl_val[0:n],
            trend=trend)

    def _uncertainty_indicator(self, x: np.ndarray) -> float:
        """
        Indicator of uncertainty.
@@ -282,7 +328,7 @@ class ActiveLearning:
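            # entropy indicator: the emulator predicts ln(pxl) as Gaussian with
            # parameters (mean, std), so pxl is lognormal with differential
            # entropy mean + 0.5*ln(2*pi*e*std**2); negated because the
            # optimizer minimizes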
            neg_val = -(mean + 0.5*np.log(2*np.pi*np.e*std**2))
        elif self.indicator == "variance":
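            # variance indicator: for the same lognormal pxl,
            # ln Var[pxl] = 2*mean + std**2 + ln(exp(std**2) - 1); the
            # exp(std**2) - 1 term is floored at _min_float to avoid log(0)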
            neg_val = -(2*mean + std**2) - np.log(
                np.maximum(np.exp(std**2)-1, _min_float)
            )
        return float(neg_val)
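
A minimal usage sketch of the workflow above is given below, assuming a one-dimensional toy problem. The toy `simulator`, `prior`, and `likelihood` are illustrative placeholders, and the commented-out constructor calls for `LHS`, `ScalarGaSP`, and `RunSimulator` are assumptions that should be checked against the psimpy documentation; only the `ActiveLearning` call itself follows the signature defined in this file.

# Minimal usage sketch (illustrative toy problem)
import numpy as np
from scipy import stats

k = 5                                           # number of observations
rng = np.random.default_rng(0)
data = rng.normal(loc=0.5, scale=0.1, size=k)   # stand-in observed data
bounds = np.array([[0.0, 1.0]])                 # ndim = 1

def simulator(x1):
    # hypothetical simulator: k identical outputs for parameter x1
    return np.full(k, x1)

def prior(x):
    # uniform prior density on [0, 1]
    return float(stats.uniform.pdf(x[0], loc=0.0, scale=1.0))

def likelihood(y, data):
    # Gaussian likelihood of the observations given simulator outputs y
    return float(np.exp(-0.5 * np.sum(((y - data) / 0.1)**2)))

# The constructor arguments below are assumptions; check the psimpy docs.
# lhs_sampler = LHS(ndim=1, bounds=bounds)
# scalar_gasp = ScalarGaSP(ndim=1)
# run_sim_obj = RunSimulator(simulator, var_inp_parameter=['x1'])
#
# active_learner = ActiveLearning(
#     ndim=1, bounds=bounds, data=data, run_sim_obj=run_sim_obj,
#     prior=prior, likelihood=likelihood, n0=20, nt=60,
#     lhs_sampler=lhs_sampler, scalar_gasp=scalar_gasp, indicator='entropy')
# active_learner.initial_simulation(mode='serial')
# active_learner.iterative_emulation()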