diff --git a/src/psimpy/inference/active_learning.py b/src/psimpy/inference/active_learning.py
index 10114b429c2151b26b2da12199d71ecf6add95cd..bf3735aa82151311a6fc5d696b1687af79717574 100644
--- a/src/psimpy/inference/active_learning.py
+++ b/src/psimpy/inference/active_learning.py
@@ -22,12 +22,9 @@ class ActiveLearning:
         run_sim_obj: RunSimulator,
         prior: Callable,
         likelihood: Callable,
-        n0: int,
-        nt: int,
         lhs_sampler: LHS,
         scalar_gasp: ScalarGaSP,
         scalar_gasp_mean: str = 'constant',
-        prefixes: Union[list[str], None] = None,
         indicator: str = 'entropy',
         optimizer: Callable = optimize.brute,
         args_prior: Union[list, None] = None,
@@ -49,27 +46,22 @@ class ActiveLearning:
            bounds[:, 0] - lower boundaries of each parameter.
            bounds[:, 1] - upper boundaries of each parameter.
        data : numpy array
-           Observed data for parameter calibration. One dimensional array of
-           shape (k,) where k is the number of observed data.
+           Observed data for parameter calibration. 1d array of shape (k,)
+           where k is the number of observed data.
        run_sim_obj : instance of class `RunSimulator`
            It has an attribute `simulator` and two methods to run `simulator`,
            namely `serial_run` and `parrel_run`.
-           For each simulation, `simulator` must return outputs `y` as a one
-           dimensional numpy array of shape (k,), corresponding to `data`.
+           For each simulation, `simulator` must return outputs `y` as a 1d
+           numpy array of shape (k,), corresponding to `data`.
        prior : Callable
            Prior probability density function.
            Call with `prior(x, *args_prior, **kwgs_prior)` and return the
            prior probability density value at x.
        likelihood : Callable
            Likelihood function constructed based on `data` and corresponding
-           outputs `y` of `simulator`.
+           outputs `y` of `simulator` evaluated at `x`.
            Call with `likelihood(y, data, *args_likelihood, *kwgs_likelihood)`
            and return the likelihood value at x.
-       n0 : int
-           Number of initial simulation runs. Used to build an initial emulator.
-       nt : int
-           Number of total simulation runs. `nt-n0` is therefore the number of
-           simulation runs guided by active learning.
        lhs_sampler : instance of class LHS
            Used to draw initial samples of `x`.
        scalar_gasp : instance of class ScalarGaSP
@@ -79,9 +71,6 @@ class ActiveLearning:
            'zero' - mean is set to zero
            'constant' - mean is set to a constant
            'linear' - mean is linear to training input
-       prefixes : list of str, optional
-           Consist of `nt` prefixes. Each of them is used to name corresponding
-           simulation output file.
        indicator : str, optional
            Indicator of uncertainty. 'entropy' or 'variance'.
        optimizer : Callable, optional
@@ -125,8 +114,6 @@ class ActiveLearning:
         self.run_sim_obj = run_sim_obj
         self.prior = prior
         self.likelihood = likelihood
-        self.n0 = n0
-        self.nt = nt
         self.lhs_sampler = lhs_sampler
         self.lhs_sampler.bounds = bounds
@@ -138,14 +125,6 @@ class ActiveLearning:
                 f"unsupported scalar_gasp_mean {scalar_gasp_mean}"
                 f"please choose from 'zero', 'constant', and 'linear'")
         self.scalar_gasp_mean = scalar_gasp_mean
-
-        if prefixes is None:
-            prefixes = [f'sim{i}' for i in range(nt)]
-        elif len(prefixes) != nt:
-            raise ValueError("prefixes must have nt number of items")
-        elif len(set(prefixes)) != nt:
-            raise ValueError("Each item of prefixes must be unique")
-        self.prefixes = prefixes
 
         if indicator not in ["entropy", "variance"]:
             raise NotImplementedError(
@@ -179,79 +158,179 @@ class ActiveLearning:
         self.args_optimizer=() if args_optimizer is None else args_optimizer
         self.kwgs_optimizer={} if kwgs_optimizer is None else kwgs_optimizer
-
-        self.var_inp_val = np.zeros((nt, ndim))
-        self.sim_outputs = np.zeros((nt, len(data)))
-        self.ln_pxl_val = np.zeros(nt)
+
     @beartype
-    def initial_simulation(self, mode: str = 'serial',
-        max_workers: Union[int, None] = None) -> None:
+    def initial_simulation(
+        self,
+        n0: int,
+        prefixes: Union[list[str], None] = None,
+        mode: str = 'serial',
+        max_workers: Union[int, None] = None) -> tuple[np.ndarray, np.ndarray]:
         """
         Run `n0` initial simulations.
 
         Parameters
         ----------
+        n0 : int
+            Number of initial simulation runs.
+        prefixes : list of str, optional
+            Consist of `n0` strings. Each of them is used to name the
+            corresponding simulation output file.
+            If None, 'sim0', 'sim1', ... are used.
         mode : str, optional
            'parallel' - run `n0` simulations in parallel.
            'serial' - run `n0` simulations in serial.
         max_workers : int, optional
             Controls the maximum number of tasks running in parallel.
-            Default is the number of CPUs on the host.
+            Default is the number of CPUs on the host.
+
+        Returns
+        -------
+        init_var_samples : numpy array
+            Variable input samples for `n0` initial simulations.
+            Shape of (n0, ndim).
+        init_sim_outputs : numpy array
+            Outputs of `n0` initial simulations, corresponding to `data`.
+            Shape of (n0, len(data)).
         """
-        self.var_inp_val[0:self.n0, :] = self.lhs_sampler.sample(self.n0)
+        init_var_samples = self.lhs_sampler.sample(n0)
 
         if mode == 'parallel':
-            self.run_sim_obj.parallel_run(
-                self.var_inp_val[0:self.n0, :], self.prefixes[0:self.n0],
+            self.run_sim_obj.parallel_run(init_var_samples, prefixes,
                 append=False, max_workers=max_workers)
         elif mode == 'serial':
-            self.run_sim_obj.serial_run(
-                self.var_inp_val[0:self.n0, :], self.prefixes[0:self.n0],
+            self.run_sim_obj.serial_run(init_var_samples, prefixes,
                 append=False)
         else:
             raise ValueError("mode must be 'parallel' or 'serial'")
+
+        init_sim_outputs = np.array(self.run_sim_obj.outputs)
 
-        self.sim_outputs[0:self.n0, :] = np.array(self.run_sim_obj.outputs)
+        return init_var_samples, init_sim_outputs
 
-    def iterative_emulation(self) -> None:
+    @beartype
+    def iterative_emulation(
+        self,
+        ninit: int,
+        init_var_samples: np.ndarray,
+        init_sim_outputs: np.ndarray,
+        niter: int,
+        iter_prefixes: Union[list[str], None] = None
+        ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
         """
-        Sequentially pick `nt-n0` new input points, run simulations, and
-        build emulators.
+        Sequentially pick `niter` new input points based on `ninit` simulations.
+
+        Parameters
+        ----------
+        ninit : int
+            Number of initial simulations.
+        init_var_samples : numpy array
+            Variable input samples for `ninit` simulations.
+            Shape of (ninit, ndim).
+        init_sim_outputs : numpy array
+            Outputs of `ninit` simulations, corresponding to `data`.
+            Shape of (ninit, len(data)).
+        niter : int
+            Number of iterative simulations.
+        iter_prefixes : list of str, optional
+            Consist of `niter` strings. Each of them is used to name the
+            corresponding iterative simulation output file.
+            If None, 'iter_sim0', 'iter_sim1', ... are used.
+
+        Returns
+        -------
+        var_samples : numpy array
+            Variable input samples of `ninit` simulations and `niter`
+            iterative simulations. 2d array of shape (ninit+niter, ndim).
+        ln_pxl_values : numpy array
+            Natural logarithm values of the product of prior and likelihood
+            at `ninit` and `niter` simulations.
+            1d array of shape (ninit+niter,).
+        sim_outputs : numpy array
+            Outputs of `ninit` and `niter` simulations, corresponding to
+            `data`. 2d array of shape (ninit+niter, len(data)).
         """
-        self.ln_pxl_val[0:self.n0] = [
-            self._compute_ln_pxl(self.var_inp_val[i,:], self.sim_outputs[i,:])
-            for i in range(self.n0)]
+        if init_var_samples.shape != (ninit, self.ndim):
+            raise ValueError("init_var_samples must be of shape (ninit, ndim)")
+
+        if init_sim_outputs.shape != (ninit, len(self.data)):
+            raise ValueError(
+                "init_sim_outputs must be of shape (ninit, len(data))")
+
+        if iter_prefixes is None:
+            iter_prefixes = [f'iter_sim{i}' for i in range(niter)]
+        elif len(iter_prefixes) != niter:
+            raise ValueError("iter_prefixes must have niter number of items")
+        elif len(set(iter_prefixes)) != niter:
+            raise ValueError("Each item of iter_prefixes must be unique")
 
-        for n in range(self.n0, self.nt + 1):
+        ln_pxl_values = [
+            self._compute_ln_pxl(init_var_samples[i,:], init_sim_outputs[i,:])
+            for i in range(ninit)
+        ]
+        var_samples = init_var_samples
+        sim_outputs = init_sim_outputs
+
+        for i in range(niter):
 
-            self._emulate_ln_pxl(n)
-
-            if n == self.nt:
-                break
+            self._emulate_ln_pxl(var_samples, np.array(ln_pxl_values))
 
             opt_res = self.optimizer(
                 self._uncertainty_indicator,
                 *self.args_optimizer,
                 **self.kwgs_optimizer)
 
             if isinstance(opt_res, np.ndarray):
-                new_val = opt_res
+                next_var_sample = opt_res
             elif isinstance(opt_res, optimize.OptimizeResult):
-                new_val = opt_res.x
+                next_var_sample = opt_res.x
             else:
                 raise RuntimeError(
                     "Optimizer must return a 1d numpy array representing the"
                     " solution or a OptimizeResult object having x attribute")
 
-            new_val = new_val.reshape((1, self.ndim))
-
-            self.run_sim_obj.serial_run(
-                new_val, [self.prefixes[n]], append=True)
-            self.sim_outputs[n, :] = self.run_sim_obj.outputs[n]
+            next_var_sample = next_var_sample.reshape((1, self.ndim))
+            var_samples = np.vstack((var_samples, next_var_sample))
+
+            self.run_sim_obj.serial_run(next_var_sample, [iter_prefixes[i]],
+                append=False)
+            next_sim_output = self.run_sim_obj.outputs[0]
+            sim_outputs = np.vstack((sim_outputs, next_sim_output))
+
+            next_ln_pxl_value = self._compute_ln_pxl(
+                next_var_sample.reshape(-1), next_sim_output)
+            ln_pxl_values.append(next_ln_pxl_value)
+
+        ln_pxl_values = np.array(ln_pxl_values)
+
+        return var_samples, ln_pxl_values, sim_outputs
+
+    @beartype
+    def emulate_and_predict_ln_pxl(self, x: np.ndarray, var_samples: np.ndarray,
+        ln_pxl_values: np.ndarray) -> float:
+        """
+        Build a scalar GP emulator for ln_pxl and make a prediction at new x.
+
+        Parameters
+        ----------
+        x : numpy array
+            One variable sample at which ln_pxl is to be approximated. 1d array
+            of shape (ndim,).
+        var_samples : numpy array
+            Samples of variable inputs. 2d array of shape (n, ndim).
+        ln_pxl_values : numpy array
+            Natural logarithm values of the product of prior and likelihood
+            at `var_samples`. 1d array of shape (n,).
+
+        Returns
+        -------
+        A float value which is the emulator-predicted ln_pxl value at `x`.
+        """
+        self._emulate_ln_pxl(var_samples, ln_pxl_values)
+        predict = self._predict_ln_pxl(x)
+
+        return float(predict[:,0])
 
-        self.var_inp_val[n, :] = new_val
-        self.ln_pxl_val[n] = self._compute_ln_pxl(self.var_inp_val[n,:],
-            self.sim_outputs[n,:])
 
     def _compute_ln_pxl(self, x: np.ndarray, y: np.ndarray) -> float:
         """Compute natural logarithm of the product of prior and likelihood.
@@ -259,10 +338,10 @@ class ActiveLearning:
         Parameters
         ----------
         x : numpy array
-            Variable input of `simulator`. One dimension array of length `ndim`.
+            Variable input of `simulator`. 1d array of shape (ndim,).
         y : numpy array
-            Simulation outputs at `x` corresponding to `data`. One dimension
-            array having same length as `data`.
+            Simulation outputs at `x` corresponding to `data`. 1d array of
+            shape (len(data),).
 
         Returns
         -------
@@ -275,41 +354,51 @@ class ActiveLearning:
         pxl_val = prior_val * likelihood_val
 
         return float(np.log(np.maximum(pxl_val, _min_float)))
+
-    def _emulate_ln_pxl(self, n: int) -> None:
+    def _emulate_ln_pxl(self, var_samples: np.ndarray, ln_pxl_values: np.ndarray
+        ) -> None:
         """
-        Train emulator for natural lograthim of prior times likelihood based on
-        `n` simulation runs.
+        Train scalar gasp emulator for natural logarithm of prior times
+        likelihood based on `n` simulations.
 
         Parameters
         ----------
-        n : int
-            Number of simulations
-        """
+        var_samples : numpy array
+            Samples of variable inputs of `n` simulations. 2d array of shape
+            (n, ndim).
+        ln_pxl_values : numpy array
+            Natural logarithm of prior times likelihood values corresponding
+            to `n` simulations. 1d array of shape (n,).
+        """
+        n = len(var_samples)
+
         if self.scalar_gasp_mean == "zero":
             trend = np.zeros((n, 1))
         elif self.scalar_gasp_mean == "constant":
             trend = np.ones((n, 1))
         elif self.scalar_gasp_mean == "linear":
-            trend = np.c_[np.ones(n), self.var_inp_val[0:n,:]]
+            trend = np.c_[np.ones(n), var_samples]
 
-        self.scalar_gasp.train(
-            self.var_inp_var[0:n,:], self.ln_pxl_val[0:n],
-            trend=trend)
+        self.scalar_gasp.train(var_samples, ln_pxl_values, trend)
 
-    def _uncertainty_indicator(self, x: np.array) -> float:
+    def _predict_ln_pxl(self, x: np.ndarray) -> np.ndarray:
         """
-        Indicator of uncertainty.
+        Make prediction of ln_pxl at `x` using the scalar GP emulator.
 
         Parameters
         ----------
         x : numpy array
-            One variable input point, 1d array of length `ndim`.
+            One variable input point, 1d array of shape (ndim,).
 
         Returns
         -------
-        neg_val : float
-            Negative value of uncertainty indicator at `x`.
+        predictions : numpy array
+            Emulator prediction at `x`. Shape (1, 4).
+            predictions[0, 0] - mean
+            predictions[0, 1] - lower95
+            predictions[0, 2] - upper95
+            predictions[0, 3] - sd
         """
         x = x.reshape((1, self.ndim))
 
@@ -320,7 +409,26 @@ class ActiveLearning:
         elif self.scalar_gasp_mean == "linear":
             trend = np.c_[np.ones(1), x]
 
-        predict = self.scalar_gasp.predict(x, testing_trend=trend)
+        predictions = self.scalar_gasp.predict(x, trend)
+
+        return predictions
+
+    def _uncertainty_indicator(self, x: np.ndarray) -> float:
+        """
+        Indicator of uncertainty.
+
+        Parameters
+        ----------
+        x : numpy array
+            One variable input point, 1d array of shape (ndim,).
+
+        Returns
+        -------
+        neg_val : float
+            Negative value of uncertainty indicator at `x`.
+ """ + predict = self._predict_ln_pxl(x) + mean = predict[:, 0] std = predict[:, 3]