Commit fa135844 authored by Hu Zhao

feat: move n0, nt and prefixes, add new methods

parent 6e796da0
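
The change is easiest to see from the caller's side: `n0`, `nt`, and `prefixes` leave the constructor, and the two public methods take the run counts as arguments and return their results instead of writing them to instance state. A minimal before/after sketch, assuming placeholder objects (`lhs`, `gasp`, `run_sim`, `prior_fun`, `likelihood_fun`, `bounds`, `data` are illustrative, not part of this commit):

    # before: run counts and prefixes fixed at construction time
    # al = ActiveLearning(ndim, bounds, data, run_sim, prior_fun, likelihood_fun,
    #                     n0=40, nt=100, lhs_sampler=lhs, scalar_gasp=gasp)
    # al.initial_simulation(mode='serial')
    # al.iterative_emulation()

    # after: run counts are per-call arguments; results are returned, not stored
    al = ActiveLearning(ndim, bounds, data, run_sim, prior_fun, likelihood_fun,
                        lhs_sampler=lhs, scalar_gasp=gasp)
    init_var_samples, init_sim_outputs = al.initial_simulation(n0=40)
    var_samples, ln_pxl_values, sim_outputs = al.iterative_emulation(
        ninit=40, init_var_samples=init_var_samples,
        init_sim_outputs=init_sim_outputs, niter=60)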
@@ -22,12 +22,9 @@ class ActiveLearning:
         run_sim_obj: RunSimulator,
         prior: Callable,
         likelihood: Callable,
-        n0: int,
-        nt: int,
         lhs_sampler: LHS,
         scalar_gasp: ScalarGaSP,
         scalar_gasp_mean: str = 'constant',
-        prefixes: Union[list[str], None] = None,
         indicator: str = 'entropy',
         optimizer: Callable = optimize.brute,
         args_prior: Union[list, None] = None,
@@ -49,27 +46,22 @@ class ActiveLearning:
             bounds[:, 0] - lower boundaries of each parameter.
             bounds[:, 1] - upper boundaries of each parameter.
         data : numpy array
-            Observed data for parameter calibration. One dimensional array of
-            shape (k,) where k is the number of observed data.
+            Observed data for parameter calibration. 1d array of shape (k,)
+            where k is the number of observed data.
         run_sim_obj : instance of class `RunSimulator`
             It has an attribute `simulator` and two methods to run `simulator`,
             namely `serial_run` and `parallel_run`.
-            For each simulation, `simulator` must return outputs `y` as a one
-            dimensional numpy array of shape (k,), corresponding to `data`.
+            For each simulation, `simulator` must return outputs `y` as a 1d
+            numpy array of shape (k,), corresponding to `data`.
         prior : Callable
             Prior probability density function.
             Call with `prior(x, *args_prior, **kwgs_prior)` and return the prior
             probability density value at x.
         likelihood : Callable
             Likelihood function constructed based on `data` and corresponding
-            outputs `y` of `simulator`.
+            outputs `y` of `simulator` evaluated at `x`.
             Call with `likelihood(y, data, *args_likelihood, **kwgs_likelihood)`
             and return the likelihood value at x.
-        n0 : int
-            Number of initial simulation runs. Used to build an initial emulator.
-        nt : int
-            Number of total simulation runs. `nt-n0` is therefore the number of
-            simulation runs guided by active learning.
         lhs_sampler : instance of class LHS
             Used to draw initial samples of `x`.
         scalar_gasp : instance of class ScalarGaSP
@@ -79,9 +71,6 @@ class ActiveLearning:
             'zero' - mean is set to zero
             'constant' - mean is set to a constant
             'linear' - mean is linear to training input
-        prefixes : list of str, optional
-            Consist of `nt` prefixes. Each of them is used to name corresponding
-            simulation output file.
         indicator : str, optional
             Indicator of uncertainty. 'entropy' or 'variance'.
         optimizer : Callable, optional
@@ -125,8 +114,6 @@ class ActiveLearning:
         self.run_sim_obj = run_sim_obj
         self.prior = prior
         self.likelihood = likelihood
-        self.n0 = n0
-        self.nt = nt
         self.lhs_sampler = lhs_sampler
         self.lhs_sampler.bounds = bounds
@@ -139,14 +126,6 @@ class ActiveLearning:
                 f"please choose from 'zero', 'constant', and 'linear'")
         self.scalar_gasp_mean = scalar_gasp_mean

-        if prefixes is None:
-            prefixes = [f'sim{i}' for i in range(nt)]
-        elif len(prefixes) != nt:
-            raise ValueError("prefixes must have nt number of items")
-        elif len(set(prefixes)) != nt:
-            raise ValueError("Each item of prefixes must be unique")
-        self.prefixes = prefixes
-
         if indicator not in ["entropy", "variance"]:
             raise NotImplementedError(
                 f"unsupported indicator {indicator}. Please choose from"
@@ -180,78 +159,178 @@ class ActiveLearning:
         self.args_optimizer=() if args_optimizer is None else args_optimizer
         self.kwgs_optimizer={} if kwgs_optimizer is None else kwgs_optimizer

-        self.var_inp_val = np.zeros((nt, ndim))
-        self.sim_outputs = np.zeros((nt, len(data)))
-        self.ln_pxl_val = np.zeros(nt)
-
     @beartype
-    def initial_simulation(self, mode: str = 'serial',
-                           max_workers: Union[int, None] = None) -> None:
+    def initial_simulation(
+        self,
+        n0: int,
+        prefixes: Union[list[str], None] = None,
+        mode: str = 'serial',
+        max_workers: Union[int, None] = None) -> tuple[np.ndarray, np.ndarray]:
         """
         Run `n0` initial simulations.

         Parameters
         ----------
+        n0 : int
+            Number of initial simulation runs.
+        prefixes : list of str, optional
+            Consists of `n0` strings. Each of them is used to name the
+            corresponding simulation output file.
+            If None, 'sim0', 'sim1', ... are used.
         mode : str, optional
             'parallel' - run `n0` simulations in parallel.
             'serial' - run `n0` simulations in serial.
         max_workers : int, optional
             Controls the maximum number of tasks running in parallel.
             Default is the number of CPUs on the host.
+
+        Returns
+        -------
+        init_var_samples : numpy array
+            Variable input samples for `n0` initial simulations.
+            Shape of (n0, ndim).
+        init_sim_outputs : numpy array
+            Outputs of `n0` initial simulations, corresponding to `data`.
+            Shape of (n0, len(data)).
         """
-        self.var_inp_val[0:self.n0, :] = self.lhs_sampler.sample(self.n0)
+        init_var_samples = self.lhs_sampler.sample(n0)
+
         if mode == 'parallel':
-            self.run_sim_obj.parallel_run(
-                self.var_inp_val[0:self.n0, :], self.prefixes[0:self.n0],
-                append=False, max_workers=max_workers)
+            self.run_sim_obj.parallel_run(init_var_samples, prefixes,
+                                          append=False, max_workers=max_workers)
         elif mode == 'serial':
-            self.run_sim_obj.serial_run(
-                self.var_inp_val[0:self.n0, :], self.prefixes[0:self.n0],
-                append=False)
+            self.run_sim_obj.serial_run(init_var_samples, prefixes,
+                                        append=False)
         else:
             raise ValueError("mode must be 'parallel' or 'serial'")

-        self.sim_outputs[0:self.n0, :] = np.array(self.run_sim_obj.outputs)
+        init_sim_outputs = np.array(self.run_sim_obj.outputs)
+
+        return init_var_samples, init_sim_outputs
-    def iterative_emulation(self) -> None:
+    @beartype
+    def iterative_emulation(
+        self,
+        ninit: int,
+        init_var_samples: np.ndarray,
+        init_sim_outputs: np.ndarray,
+        niter: int,
+        iter_prefixes: Union[list[str], None] = None
+    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
         """
-        Sequentially pick `nt-n0` new input points, run simulations, and
-        build emulators.
+        Sequentially pick `niter` new input points based on `ninit` simulations.
+
+        Parameters
+        ----------
+        ninit : int
+            Number of initial simulations.
+        init_var_samples : numpy array
+            Variable input samples for `ninit` simulations.
+            Shape of (ninit, ndim).
+        init_sim_outputs : numpy array
+            Outputs of `ninit` simulations, corresponding to `data`.
+            Shape of (ninit, len(data)).
+        niter : int
+            Number of iterative simulations.
+        iter_prefixes : list of str, optional
+            Consists of `niter` strings. Each of them is used to name the
+            corresponding iterative simulation output file.
+            If None, 'iter_sim0', 'iter_sim1', ... are used.
+
+        Returns
+        -------
+        var_samples : numpy array
+            Variable input samples of `ninit` simulations and `niter`
+            iterative simulations. 2d array of shape (ninit+niter, ndim).
+        ln_pxl_values : numpy array
+            Natural logarithm values of the product of prior and likelihood
+            at `ninit` and `niter` simulations.
+            1d array of shape (ninit+niter,).
+        sim_outputs : numpy array
+            Outputs of `ninit` and `niter` simulations, corresponding to
+            `data`. 2d array of shape (ninit+niter, len(data)).
         """
-        self.ln_pxl_val[0:self.n0] = [
-            self._compute_ln_pxl(self.var_inp_val[i,:], self.sim_outputs[i,:])
-            for i in range(self.n0)]
-
-        for n in range(self.n0, self.nt + 1):
-
-            self._emulate_ln_pxl(n)
-
-            if n == self.nt:
-                break
+        if init_var_samples.shape != (ninit, self.ndim):
+            raise ValueError("init_var_samples must be of shape (ninit, ndim)")
+
+        if init_sim_outputs.shape != (ninit, len(self.data)):
+            raise ValueError(
+                "init_sim_outputs must be of shape (ninit, len(data))")
+
+        if iter_prefixes is None:
+            iter_prefixes = [f'iter_sim{i}' for i in range(niter)]
+        elif len(iter_prefixes) != niter:
+            raise ValueError("iter_prefixes must have niter number of items")
+        elif len(set(iter_prefixes)) != niter:
+            raise ValueError("Each item of iter_prefixes must be unique")
+
+        ln_pxl_values = [
+            self._compute_ln_pxl(init_var_samples[i, :], init_sim_outputs[i, :])
+            for i in range(ninit)
+        ]
+        var_samples = init_var_samples
+        sim_outputs = init_sim_outputs
+
+        for i in range(niter):
+            self._emulate_ln_pxl(var_samples, np.array(ln_pxl_values))

             opt_res = self.optimizer(
                 self._uncertainty_indicator,
                 *self.args_optimizer,
                 **self.kwgs_optimizer)

             if isinstance(opt_res, np.ndarray):
-                new_val = opt_res
+                next_var_sample = opt_res
             elif isinstance(opt_res, optimize.OptimizeResult):
-                new_val = opt_res.x
+                next_var_sample = opt_res.x
             else:
                 raise RuntimeError(
                     "Optimizer must return a 1d numpy array representing the"
                     " solution or an OptimizeResult object having x attribute")

-            new_val = new_val.reshape((1, self.ndim))
-            self.run_sim_obj.serial_run(
-                new_val, [self.prefixes[n]], append=True)
-
-            self.sim_outputs[n, :] = self.run_sim_obj.outputs[n]
-            self.var_inp_val[n, :] = new_val
-            self.ln_pxl_val[n] = self._compute_ln_pxl(self.var_inp_val[n,:],
-                                                      self.sim_outputs[n,:])
+            next_var_sample = next_var_sample.reshape((1, self.ndim))
+            var_samples = np.vstack((var_samples, next_var_sample))
+
+            self.run_sim_obj.serial_run(next_var_sample, [iter_prefixes[i]],
+                                        append=False)
+            next_sim_output = self.run_sim_obj.outputs[0]
+            sim_outputs = np.vstack((sim_outputs, next_sim_output))
+
+            next_ln_pxl_value = self._compute_ln_pxl(
+                next_var_sample.reshape(-1), next_sim_output)
+            ln_pxl_values.append(next_ln_pxl_value)
+
+        ln_pxl_values = np.array(ln_pxl_values)
+
+        return var_samples, ln_pxl_values, sim_outputs
+
+    @beartype
+    def emulate_and_predict_ln_pxl(self, x: np.ndarray, var_samples: np.ndarray,
+                                   ln_pxl_values: np.ndarray) -> float:
+        """
+        Build a scalar GaSP emulator for ln_pxl and make a prediction at new x.
+
+        Parameters
+        ----------
+        x : numpy array
+            One variable sample at which ln_pxl is to be approximated.
+            1d array of shape (ndim,).
+        var_samples : numpy array
+            Samples of variable inputs. 2d array of shape (n, ndim).
+        ln_pxl_values : numpy array
+            Natural logarithm values of the product of prior and likelihood
+            at `var_samples`. 1d array of shape (n,).
+
+        Returns
+        -------
+        A float value which is the emulator-predicted ln_pxl value at x.
+        """
+        self._emulate_ln_pxl(var_samples, ln_pxl_values)
+        predict = self._predict_ln_pxl(x)
+
+        return float(predict[:, 0])

     def _compute_ln_pxl(self, x: np.ndarray, y: np.ndarray) -> float:
         """Compute natural logarithm of the product of prior and likelihood.
@@ -259,10 +338,10 @@ class ActiveLearning:
         Parameters
         ----------
         x : numpy array
-            Variable input of `simulator`. One dimension array of length `ndim`.
+            Variable input of `simulator`. 1d array of shape (ndim,).
         y : numpy array
-            Simulation outputs at `x` corresponding to `data`. One dimension
-            array having same length as `data`.
+            Simulation outputs at `x` corresponding to `data`. 1d array of
+            shape (len(data),).

         Returns
         -------
@@ -276,40 +355,50 @@ class ActiveLearning:
         return float(np.log(np.maximum(pxl_val, _min_float)))

-    def _emulate_ln_pxl(self, n: int) -> None:
+    def _emulate_ln_pxl(self, var_samples: np.ndarray,
+                        ln_pxl_values: np.ndarray) -> None:
         """
-        Train emulator for natural logarithm of prior times likelihood based on
-        `n` simulation runs.
+        Train the scalar GaSP emulator for the natural logarithm of prior
+        times likelihood based on `n` simulations.

         Parameters
         ----------
-        n : int
-            Number of simulations
+        var_samples : numpy array
+            Samples of variable inputs of `n` simulations. 2d array of shape
+            (n, ndim).
+        ln_pxl_values : numpy array
+            Natural logarithm of prior times likelihood values corresponding
+            to `n` simulations. 1d array of shape (n,).
         """
+        n = len(var_samples)
+
         if self.scalar_gasp_mean == "zero":
             trend = np.zeros((n, 1))
         elif self.scalar_gasp_mean == "constant":
             trend = np.ones((n, 1))
         elif self.scalar_gasp_mean == "linear":
-            trend = np.c_[np.ones(n), self.var_inp_val[0:n,:]]
+            trend = np.c_[np.ones(n), var_samples]

-        self.scalar_gasp.train(
-            self.var_inp_var[0:n,:], self.ln_pxl_val[0:n],
-            trend=trend)
+        self.scalar_gasp.train(var_samples, ln_pxl_values, trend)

-    def _uncertainty_indicator(self, x: np.array) -> float:
+    def _predict_ln_pxl(self, x: np.ndarray) -> np.ndarray:
         """
-        Indicator of uncertainty.
+        Make a prediction of ln_pxl at `x` using the scalar GaSP emulator.

         Parameters
         ----------
         x : numpy array
-            One variable input point, 1d array of length `ndim`.
+            One variable input point, 1d array of shape (ndim,).

         Returns
         -------
-        neg_val : float
-            Negative value of uncertainty indicator at `x`.
+        predictions : numpy array
+            Emulator prediction at `x`. Shape (1, 4).
+            predictions[0, 0] - mean
+            predictions[0, 1] - low95
+            predictions[0, 2] - upper95
+            predictions[0, 3] - sd
         """
         x = x.reshape((1, self.ndim))
@@ -320,7 +409,26 @@ class ActiveLearning:
         elif self.scalar_gasp_mean == "linear":
             trend = np.c_[np.ones(1), x]

-        predict = self.scalar_gasp.predict(x, testing_trend=trend)
+        predictions = self.scalar_gasp.predict(x, trend)
+
+        return predictions
+
+    def _uncertainty_indicator(self, x: np.ndarray) -> float:
+        """
+        Indicator of uncertainty.
+
+        Parameters
+        ----------
+        x : numpy array
+            One variable input point, 1d array of shape (ndim,).
+
+        Returns
+        -------
+        neg_val : float
+            Negative value of uncertainty indicator at `x`.
+        """
+        predict = self._predict_ln_pxl(x)

         mean = predict[:, 0]
         std = predict[:, 3]
...
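
For reference, the default `optimizer=optimize.brute` satisfies the contract checked in `iterative_emulation`: with `finish=None` it returns the best grid point as a 1d numpy array. A hedged configuration sketch (the `ranges` values are placeholders, passed through the existing `kwgs_optimizer` attribute):

    from scipy import optimize

    # grid search over the parameter bounds; brute(func, ranges, Ns=...,
    # finish=None) returns a 1d ndarray, matching the isinstance check above
    al.kwgs_optimizer = {'ranges': tuple(map(tuple, bounds)), 'Ns': 50,
                         'finish': None}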