Skip to content
Snippets Groups Projects
Commit 16e3bf8f authored by Hu Zhao's avatar Hu Zhao
Browse files

test: implement test for active learning process

parent b01f7006
No related branches found
No related tags found
No related merge requests found
import pytest import pytest
import numpy as np import numpy as np
from scipy import optimize
from psimpy.inference.active_learning import ActiveLearning from psimpy.inference.active_learning import ActiveLearning
from psimpy.simulator.run_simulator import RunSimulator from psimpy.simulator.run_simulator import RunSimulator
from psimpy.simulator.mass_point_model import MassPointModel from psimpy.simulator.mass_point_model import MassPointModel
from psimpy.sampler.latin import LHS from psimpy.sampler.latin import LHS
from psimpy.sampler.saltelli import Saltelli from psimpy.sampler.saltelli import Saltelli
from psimpy.emulator.robustgasp import ScalarGaSP, PPGaSP from psimpy.emulator.robustgasp import ScalarGaSP, PPGaSP
from psimpy.inference.bayes_inference import GridEstimation
from psimpy.inference.bayes_inference import MetropolisHastingsEstimation
from psimpy.sampler.metropolis_hastings import MetropolisHastings
from scipy.stats import uniform, norm from scipy.stats import uniform, norm
from scipy import optimize from scipy import optimize
from beartype.roar import BeartypeCallHintParamViolation from beartype.roar import BeartypeCallHintParamViolation
import matplotlib.pyplot as plt
import os
dir_test = os.path.abspath(os.path.join(__file__, '../'))
@pytest.mark.parametrize( @pytest.mark.parametrize(
"run_sim_obj, prior, likelihood, lhs_sampler, scalar_gasp, optimizer", "run_sim_obj, prior, likelihood, lhs_sampler, scalar_gasp, optimizer",
...@@ -63,13 +71,13 @@ def test_ActiveLearning_init_RuntimeError(run_sim_obj, lhs_sampler, ...@@ -63,13 +71,13 @@ def test_ActiveLearning_init_RuntimeError(run_sim_obj, lhs_sampler,
@pytest.mark.parametrize( @pytest.mark.parametrize(
"scalar_gasp_mean, indicator", "scalar_gasp_trend, indicator",
[ [
('cubic', 'entropy'), ('cubic', 'entropy'),
('linear', 'divergence') ('linear', 'divergence')
] ]
) )
def test_ActiveLearning_init_NotImplementedError(scalar_gasp_mean, indicator): def test_ActiveLearning_init_NotImplementedError(scalar_gasp_trend, indicator):
ndim = 1 ndim = 1
bounds = np.array([[0,1]]) bounds = np.array([[0,1]])
data = np.array([1,2,3]) data = np.array([1,2,3])
...@@ -80,7 +88,7 @@ def test_ActiveLearning_init_NotImplementedError(scalar_gasp_mean, indicator): ...@@ -80,7 +88,7 @@ def test_ActiveLearning_init_NotImplementedError(scalar_gasp_mean, indicator):
likelihood = norm.pdf likelihood = norm.pdf
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
_ = ActiveLearning(ndim, bounds, data, run_sim_obj, prior, likelihood, _ = ActiveLearning(ndim, bounds, data, run_sim_obj, prior, likelihood,
lhs_sampler, scalar_gasp, scalar_gasp_mean=scalar_gasp_mean, lhs_sampler, scalar_gasp, scalar_gasp_trend=scalar_gasp_trend,
indicator=indicator) indicator=indicator)
def test_ActiveLearning_init_ValueError(): def test_ActiveLearning_init_ValueError():
...@@ -96,3 +104,159 @@ def test_ActiveLearning_init_ValueError(): ...@@ -96,3 +104,159 @@ def test_ActiveLearning_init_ValueError():
with pytest.raises(ValueError): with pytest.raises(ValueError):
_ = ActiveLearning(ndim, bounds, data, run_sim_obj, prior, likelihood, _ = ActiveLearning(ndim, bounds, data, run_sim_obj, prior, likelihood,
lhs_sampler, scalar_gasp, kwgs_optimizer=kwgs_optimizer) lhs_sampler, scalar_gasp, kwgs_optimizer=kwgs_optimizer)
def f(x1, x2, dir_sim, output_name):
"""Set simulator as y=x for Rosenbrock function."""
np.savetxt(os.path.join(dir_sim, f'{output_name}.txt'), np.array([x1,x2]))
return np.array([x1,x2])
@pytest.fixture
def active_learner():
"""Create an instance of ActiveLearning."""
ndim = 2
bounds = np.array([[-5,5],[-5,5]])
data = np.array([1])
if not os.path.exists(os.path.join(dir_test, 'temp_active_learning')):
os.chdir(dir_test)
os.mkdir('temp_active_learning')
os.chdir('temp_active_learning')
os.mkdir('simulator_internal_outputs')
os.mkdir('run_simulator_outputs')
fix_inp = {'dir_sim':
os.path.join(dir_test,'temp_active_learning/simulator_internal_outputs')}
run_Rosenbrock = RunSimulator(
f,
var_inp_parameter=['x1','x2'],
fix_inp=fix_inp,
o_parameter = 'output_name',
dir_out=os.path.join(dir_test,
'temp_active_learning/run_simulator_outputs'),
save_out=True)
def prior(x):
"""Uniform prior U[-5,5]x[-5,5]"""
return 1/100
def likelihood(y, data):
"""Rosenbrock function."""
return np.exp(-(y[0]-data)**2/100 - (y[0]**2-y[1])**2)
seed = 2
lhs_sampler = LHS(ndim=ndim, bounds=bounds, seed=seed)
scalar_gasp = ScalarGaSP(ndim=ndim)
return ActiveLearning(ndim, bounds, data, run_Rosenbrock, prior, likelihood,
lhs_sampler, scalar_gasp)
def test_ActiveLearning_brute_entropy(active_learner):
test_name = 'brute_entropy_grid_estimation'
active_learner.indicator = 'entropy'
active_learner.optimizer = optimize.brute
active_learner.kwgs_optimizer = {'Ns':50}
n0 = 40
prefixes = [f'init_{test_name}_sim{i}' for i in range(n0)]
init_var_samples, init_sim_outputs = active_learner.initial_simulation(
n0, prefixes, mode='parallel', max_workers=4)
assert init_var_samples.shape == (n0, active_learner.ndim)
assert init_sim_outputs.shape[0] == n0
niter = 60
iter_prefixes = [f'iter_{test_name}_sim{i}' for i in range(niter)]
var_samples, _, _ = active_learner.iterative_emulation(n0, init_var_samples,
init_sim_outputs, niter=niter, iter_prefixes=iter_prefixes)
assert var_samples.shape == (n0+niter, active_learner.ndim)
grid_estimator = GridEstimation(
ndim=active_learner.ndim,
bounds=active_learner.bounds,
ln_pxl=active_learner.approx_ln_pxl)
nbins = [50, 50]
posterior, x_ndim = grid_estimator.run(nbins)
# plot
fig, ax = plt.subplots(1,1,figsize=(6,4))
ax.scatter(init_var_samples[:,0], init_var_samples[:,1], s=10, c='r',
marker='o', zorder=1, alpha=0.8)
ax.scatter(var_samples[n0:,0], var_samples[n0:,1], s=15, c='k', marker='+',
zorder=2, alpha=0.8)
posterior = np.where(posterior < 1e-10, None, posterior)
contour = ax.contour(x_ndim[0], x_ndim[1], np.transpose(posterior),
levels=10, zorder=0)
plt.colorbar(contour, ax=ax)
ax.set_xlim(-5, 5)
ax.set_ylim(-5, 5)
ax.set_title(
'Active learning \n prior U[-5,5]x[-5,5] \n simulator y=x \n'
'likelihood Rosenbrock \n'+
f'{test_name}')
ax.set_xlabel('x1')
ax.set_ylabel('x2')
png_file = os.path.join(dir_test,
f'temp_active_learning/active_learning_{test_name}.png')
plt.savefig(png_file, bbox_inches='tight')
assert os.path.exists(png_file)
def test_ActiveLearning_brute_variance(active_learner):
test_name = 'brute_variance_grid_estimation'
active_learner.indicator = 'variance'
active_learner.optimizer = optimize.brute
active_learner.kwgs_optimizer = {'Ns':50}
n0 = 40
prefixes = [f'init_{test_name}_sim{i}' for i in range(n0)]
init_var_samples, init_sim_outputs = active_learner.initial_simulation(
n0, prefixes, mode='serial')
assert init_var_samples.shape == (n0, active_learner.ndim)
assert init_sim_outputs.shape[0] == n0
niter = 60
iter_prefixes = [f'iter_{test_name}_sim{i}' for i in range(niter)]
var_samples, _, _ = active_learner.iterative_emulation(n0, init_var_samples,
init_sim_outputs, niter=niter, iter_prefixes=iter_prefixes)
assert var_samples.shape == (n0+niter, active_learner.ndim)
grid_estimator = GridEstimation(
ndim=active_learner.ndim,
bounds=active_learner.bounds,
ln_pxl=active_learner.approx_ln_pxl)
nbins = [50, 50]
posterior, x_ndim = grid_estimator.run(nbins)
# plot
fig, ax = plt.subplots(1,1,figsize=(6,4))
ax.scatter(init_var_samples[:,0], init_var_samples[:,1], s=10, c='r',
marker='o', zorder=1, alpha=0.8)
ax.scatter(var_samples[n0:,0], var_samples[n0:,1], s=15, c='k', marker='+',
zorder=2, alpha=0.8)
posterior = np.where(posterior < 1e-10, None, posterior)
contour = ax.contour(x_ndim[0], x_ndim[1], np.transpose(posterior),
levels=10, zorder=0)
plt.colorbar(contour, ax=ax)
ax.set_xlim(-5, 5)
ax.set_ylim(-5, 5)
ax.set_title(
'Active learning \n prior U[-5,5]x[-5,5] \n simulator y=x \n'
'likelihood Rosenbrock \n '+
f'{test_name}')
ax.set_xlabel('x1')
ax.set_ylabel('x2')
png_file = os.path.join(dir_test,
f'temp_active_learning/active_learning_{test_name}.png')
plt.savefig(png_file, bbox_inches='tight')
assert os.path.exists(png_file)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment