Skip to content
Snippets Groups Projects
Commit 3b4b4757 authored by Nicholas Book's avatar Nicholas Book
Browse files

updated ravaflow-related tests

parent c5950906
Branches
No related tags found
No related merge requests found
...@@ -126,3 +126,6 @@ cython_debug/ ...@@ -126,3 +126,6 @@ cython_debug/
# auto_examples # auto_examples
docs/source/auto_examples/ docs/source/auto_examples/
tests/temp_ravaflow_results/
tests/temp_run_ravaflow/
\ No newline at end of file
import os import os
import pytest import pytest
import numpy as np import numpy as np
from psimpy.simulator import Ravaflow24Mixture from psimpy.simulator import Ravaflow3GMixture
@pytest.mark.parametrize( @pytest.mark.parametrize(
"dir_sim, conversion_control, curvature_control, surface_control, \ "dir_sim, diffusion_control, curvature_control, surface_control, \
entrainment_control, stopping_control", entrainment_control, stopping_control",
[ [
(os.path.abspath(os.path.join(__file__, '../not_exist_dir')), (
'0', '0', '0', '0', '0'), os.path.abspath(os.path.join(__file__, "../not_exist_dir")),
(os.path.abspath(os.path.join(__file__, '../data/synthetic_rel.tif')), "0",
'0', '0', '0', '0', '0'), "0",
(os.path.abspath(os.path.join(__file__, '../data')), "0",
'2', '0', '0', '0', '0'), "0",
(os.path.abspath(os.path.join(__file__, '../data')), "0",
'0', '3', '0', '0', '0'), ),
(os.path.abspath(os.path.join(__file__, '../data')), (
'0', '0', '2', '0', '0'), os.path.abspath(os.path.join(__file__, "../data/synthetic_rel.tif")),
(os.path.abspath(os.path.join(__file__, '../data')), "0",
'0', '0', '0', '5', '0'), "0",
(os.path.abspath(os.path.join(__file__, '../data')), "0",
'0', '0', '0', '0', '4') "0",
] "0",
),
(os.path.abspath(os.path.join(__file__, "../data")), "2", "0", "0", "0", "0"),
(os.path.abspath(os.path.join(__file__, "../data")), "0", "3", "0", "0", "0"),
(os.path.abspath(os.path.join(__file__, "../data")), "0", "0", "2", "0", "0"),
(os.path.abspath(os.path.join(__file__, "../data")), "0", "0", "0", "5", "0"),
(os.path.abspath(os.path.join(__file__, "../data")), "0", "0", "0", "0", "4"),
],
) )
def test_ravaflow24_mixture_init_ValueError(dir_sim, conversion_control, def test_ravaflow24_mixture_init_ValueError(
curvature_control, surface_control, entrainment_control, stopping_control): dir_sim,
diffusion_control,
curvature_control,
surface_control,
entrainment_control,
stopping_control,
):
with pytest.raises(ValueError): with pytest.raises(ValueError):
_ = Ravaflow24Mixture( _ = Ravaflow3GMixture(
dir_sim=dir_sim, dir_sim=dir_sim,
conversion_control=conversion_control, diffusion_control=diffusion_control,
curvature_control=curvature_control, curvature_control=curvature_control,
surface_control=surface_control, surface_control=surface_control,
entrainment_control=entrainment_control, entrainment_control=entrainment_control,
stopping_control=stopping_control) stopping_control=stopping_control,
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"prefix, elevation, hrelease", "prefix, elevation, hrelease",
[ [
('sim', (
os.path.abspath(os.path.join(__file__, '../data/synthetic_topo.tif')), "sim",
os.path.abspath(os.path.join(__file__, '../data/synthetic_rel.tif')) os.path.abspath(os.path.join(__file__, "../data/synthetic_topo.tif")),
os.path.abspath(os.path.join(__file__, "../data/synthetic_rel.tif")),
), ),
('sim_new', (
os.path.abspath(os.path.join(__file__, '../data/syn_topo.tif')), "sim_new",
os.path.abspath(os.path.join(__file__, '../data/synthetic_rel.tif')) os.path.abspath(os.path.join(__file__, "../data/syn_topo.tif")),
os.path.abspath(os.path.join(__file__, "../data/synthetic_rel.tif")),
), ),
('sim_new', (
os.path.abspath(os.path.join(__file__, '../data/synthetic_topo.tif')), "sim_new",
os.path.abspath(os.path.join(__file__, '../data/syn_rel.tif')) os.path.abspath(os.path.join(__file__, "../data/synthetic_topo.tif")),
) os.path.abspath(os.path.join(__file__, "../data/syn_rel.tif")),
] ),
],
) )
def test_ravaflow24_mixture_preprocess_ValueError(prefix, elevation, hrelease): def test_ravaflow24_mixture_preprocess_ValueError(prefix, elevation, hrelease):
dir_test = os.path.abspath(os.path.join(__file__, '../')) dir_test = os.path.abspath(os.path.join(__file__, "../"))
os.chdir(dir_test) os.chdir(dir_test)
if not os.path.exists('temp_ravaflow_1'): if not os.path.exists("temp_ravaflow_1"):
os.mkdir('temp_ravaflow_1') os.mkdir("temp_ravaflow_1")
os.chdir('temp_ravaflow_1') os.chdir("temp_ravaflow_1")
dir_sim = os.path.join(dir_test, 'temp_ravaflow_1') dir_sim = os.path.join(dir_test, "temp_ravaflow_1")
if not os.path.exists('sim_results'): if not os.path.exists("sim_results"):
os.mkdir('sim_results') os.mkdir("sim_results")
rflow24_mixture = Ravaflow24Mixture(dir_sim=dir_sim) ravaflow3G_mixture = Ravaflow3GMixture(dir_sim=dir_sim)
with pytest.raises(ValueError): with pytest.raises(ValueError):
rflow24_mixture.preprocess( ravaflow3G_mixture.preprocess(
prefix=prefix, elevation=elevation, hrelease=hrelease) prefix=prefix, elevation=elevation, hrelease=hrelease
)
def test_ravaflow24_mixture_run_and_extract_output(): def test_ravaflow24_mixture_run_and_extract_output():
dir_test = os.path.abspath(os.path.join(__file__, '../')) dir_test = os.path.abspath(os.path.join(__file__, "../"))
os.chdir(dir_test) os.chdir(dir_test)
if not os.path.exists('temp_ravaflow_2'): if not os.path.exists("temp_ravaflow_2"):
os.mkdir('temp_ravaflow_2') os.mkdir("temp_ravaflow_2")
dir_sim = os.path.join(dir_test, 'temp_ravaflow_2') dir_sim = os.path.join(dir_test, "temp_ravaflow_2")
rflow24_mixture = Ravaflow24Mixture(dir_sim=dir_sim, time_end=100) ravaflow3G_mixture = Ravaflow3GMixture(dir_sim=dir_sim, time_end=100)
elevation = os.path.join(dir_test, 'data/synthetic_topo.tif') elevation = os.path.join(dir_test, "data/synthetic_topo.tif")
hrelease = os.path.join(dir_test, 'data/synthetic_rel.tif') hrelease = os.path.join(dir_test, "data/synthetic_rel.tif")
prefix = 'test' prefix = "test"
grass_location, sh_file = rflow24_mixture.preprocess( grass_location, sh_file = ravaflow3G_mixture.preprocess(
prefix=prefix, elevation=elevation, hrelease=hrelease, EPSG='2326') prefix=prefix, elevation=elevation, hrelease=hrelease, EPSG="2326"
)
rflow24_mixture.run(grass_location, sh_file) ravaflow3G_mixture.run(grass_location, sh_file)
assert os.path.exists(os.path.join(dir_sim, f'{prefix}_results')) assert os.path.exists(os.path.join(dir_sim, f"{prefix}_results"))
impact_area = rflow24_mixture.extract_impact_area(prefix) impact_area = ravaflow3G_mixture.extract_impact_area(prefix)
print(f"impact_area: {impact_area}") print(f"impact_area: {impact_area}")
assert isinstance(impact_area, float) assert isinstance(impact_area, float)
assert impact_area > 0 assert impact_area > 0
overall_max_velocity = rflow24_mixture.extract_qoi_max(prefix, 'v') overall_max_velocity = ravaflow3G_mixture.extract_qoi_max(prefix, "v")
print(f"overall_max_velocity: {overall_max_velocity}") print(f"overall_max_velocity: {overall_max_velocity}")
assert isinstance(overall_max_velocity, float) assert isinstance(overall_max_velocity, float)
assert overall_max_velocity > 0 assert overall_max_velocity > 0
raster_max_velocity = rflow24_mixture.extract_qoi_max(prefix, 'v', raster_max_velocity = ravaflow3G_mixture.extract_qoi_max(
aggregate=False) prefix, "v", aggregate=False
)
assert isinstance(raster_max_velocity, np.ndarray) assert isinstance(raster_max_velocity, np.ndarray)
assert np.max(raster_max_velocity) == overall_max_velocity assert np.max(raster_max_velocity) == overall_max_velocity
loc = np.array([[500, 2000], [600, 2000], [700, 2500]]) loc = np.array([[500, 2000], [600, 2000], [700, 2500]])
loc_max_pressure = rflow24_mixture.extract_qoi_max_loc(prefix, loc, 'p') loc_max_pressure = ravaflow3G_mixture.extract_qoi_max_loc(prefix, loc, "p")
print(f"loc_max_pressure: {loc_max_pressure}") print(f"loc_max_pressure: {loc_max_pressure}")
assert isinstance(loc_max_pressure, np.ndarray) assert isinstance(loc_max_pressure, np.ndarray)
assert loc_max_pressure.ndim == 1 assert loc_max_pressure.ndim == 1
assert len(loc_max_pressure) == len(loc) assert len(loc_max_pressure) == len(loc)
loc = np.array([[500, 2000]]) loc = np.array([[500, 2000]])
loc_max_energy = rflow24_mixture.extract_qoi_max_loc(prefix, loc, 't') loc_max_energy = ravaflow3G_mixture.extract_qoi_max_loc(prefix, loc, "t")
print(f"loc_max_energy: {loc_max_energy}") print(f"loc_max_energy: {loc_max_energy}")
assert isinstance(loc_max_energy, np.ndarray) assert isinstance(loc_max_energy, np.ndarray)
assert loc_max_energy.ndim == 1 assert loc_max_energy.ndim == 1
......
...@@ -3,39 +3,44 @@ import time ...@@ -3,39 +3,44 @@ import time
import itertools import itertools
import numpy as np import numpy as np
from psimpy.simulator import RunSimulator from psimpy.simulator import RunSimulator
from psimpy.simulator import Ravaflow24Mixture from psimpy.simulator import Ravaflow3GMixture
dir_test = os.path.abspath(os.path.join(__file__, '../')) dir_test = os.path.abspath(os.path.join(__file__, "../"))
os.chdir(dir_test) os.chdir(dir_test)
if not os.path.exists('temp_run_ravaflow'): if not os.path.exists("temp_run_ravaflow"):
os.mkdir('temp_run_ravaflow') os.mkdir("temp_run_ravaflow")
dir_out = os.path.join(dir_test, 'temp_run_ravaflow') dir_out = os.path.join(dir_test, "temp_run_ravaflow")
if not os.path.exists('temp_ravaflow_results'): if not os.path.exists("temp_ravaflow_results"):
os.mkdir('temp_ravaflow_results') os.mkdir("temp_ravaflow_results")
dir_sim = os.path.join(dir_test, 'temp_ravaflow_results') dir_sim = os.path.join(dir_test, "temp_ravaflow_results")
elevation = os.path.join(dir_test, 'data/synthetic_topo.tif') elevation = os.path.join(dir_test, "data/synthetic_topo.tif")
hrelease = os.path.join(dir_test, 'data/synthetic_rel.tif') hrelease = os.path.join(dir_test, "data/synthetic_rel.tif")
loc = np.array([[500, 2000], [600, 2000], [700, 2500]]) loc = np.array([[500, 2000], [600, 2000], [700, 2500]])
rflow24_mixture = Ravaflow24Mixture(dir_sim=dir_sim, time_end=100) ravaflow3G_mixture = Ravaflow3GMixture(dir_sim=dir_sim, time_end=100)
# define simulator
def simulator(prefix, elevation, hrelease, basal_friction,
turbulent_friction, EPSG, qoi, loc):
grass_location, sh_file = rflow24_mixture.preprocess( # define simulator
prefix=prefix, elevation=elevation, hrelease=hrelease, def simulator(
prefix, elevation, hrelease, basal_friction, turbulent_friction, EPSG, qoi, loc
):
grass_location, sh_file = ravaflow3G_mixture.preprocess(
prefix=prefix,
elevation=elevation,
hrelease=hrelease,
basal_friction=basal_friction, basal_friction=basal_friction,
turbulent_friction=turbulent_friction, EPSG=EPSG) turbulent_friction=turbulent_friction,
EPSG=EPSG,
)
rflow24_mixture.run(grass_location, sh_file) ravaflow3G_mixture.run(grass_location, sh_file)
impact_area = rflow24_mixture.extract_impact_area(prefix) impact_area = ravaflow3G_mixture.extract_impact_area(prefix)
overall_max_qoi = rflow24_mixture.extract_qoi_max(prefix, qoi) overall_max_qoi = ravaflow3G_mixture.extract_qoi_max(prefix, qoi)
loc_max_qoi = rflow24_mixture.extract_qoi_max_loc(prefix, loc, qoi) loc_max_qoi = ravaflow3G_mixture.extract_qoi_max_loc(prefix, loc, qoi)
output = np.zeros(len(loc_max_qoi) + 2) output = np.zeros(len(loc_max_qoi) + 2)
output[0] = impact_area output[0] = impact_area
...@@ -44,28 +49,39 @@ def simulator(prefix, elevation, hrelease, basal_friction, ...@@ -44,28 +49,39 @@ def simulator(prefix, elevation, hrelease, basal_friction,
return output return output
def test_run_ravaflow24():
var_inp_parameter = ['basal_friction', 'turbulent_friction']
fix_inp = {'elevation': elevation, 'hrelease': hrelease, 'qoi': 'v',
'loc': loc, 'EPSG': '2326'}
o_parameter = 'prefix'
run_ravaflow24 = RunSimulator( def test_run_ravaflow3G():
simulator=simulator, var_inp_parameter=var_inp_parameter, var_inp_parameter = ["basal_friction", "turbulent_friction"]
fix_inp=fix_inp, o_parameter=o_parameter, dir_out=dir_out, fix_inp = {
save_out=True) "elevation": elevation,
"hrelease": hrelease,
"qoi": "v",
"loc": loc,
"EPSG": "2326",
}
o_parameter = "prefix"
run_ravaflow3G = RunSimulator(
simulator=simulator,
var_inp_parameter=var_inp_parameter,
fix_inp=fix_inp,
o_parameter=o_parameter,
dir_out=dir_out,
save_out=True,
)
basal_friction = [20] basal_friction = [20]
turbulent_friction = [3, 4] turbulent_friction = [3, 4]
var_samples = np.array( var_samples = np.array(
[x for x in itertools.product(basal_friction, turbulent_friction)]) [x for x in itertools.product(basal_friction, turbulent_friction)]
)
serial_prefixes = ["serial" + str(i) for i in range(len(var_samples))] serial_prefixes = ["serial" + str(i) for i in range(len(var_samples))]
start = time.time() start = time.time()
run_ravaflow24.serial_run(var_samples=var_samples, prefixes=serial_prefixes) run_ravaflow3G.serial_run(var_samples=var_samples, prefixes=serial_prefixes)
serial_time = time.time() - start serial_time = time.time() - start
serial_output = run_ravaflow24.outputs serial_output = run_ravaflow3G.outputs
print(f"serial_output: {serial_output}") print(f"serial_output: {serial_output}")
assert len(serial_output) == len(var_samples) assert len(serial_output) == len(var_samples)
...@@ -76,12 +92,13 @@ def test_run_ravaflow24(): ...@@ -76,12 +92,13 @@ def test_run_ravaflow24():
parallel_prefixes = ["parallel" + str(i) for i in range(len(var_samples))] parallel_prefixes = ["parallel" + str(i) for i in range(len(var_samples))]
start = time.time() start = time.time()
run_ravaflow24.parallel_run(var_samples, prefixes=parallel_prefixes, run_ravaflow3G.parallel_run(
max_workers=2, append=True) var_samples, prefixes=parallel_prefixes, max_workers=2, append=True
)
parallel_time = time.time() - start parallel_time = time.time() - start
assert len(run_ravaflow24.outputs) == 2 * len(var_samples) assert len(run_ravaflow3G.outputs) == 2 * len(var_samples)
parallel_output = run_ravaflow24.outputs[len(var_samples):] parallel_output = run_ravaflow3G.outputs[len(var_samples) :]
print(f"parallel_output: {parallel_output}") print(f"parallel_output: {parallel_output}")
assert len(parallel_output) == len(var_samples) assert len(parallel_output) == len(var_samples)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment