Skip to content
Snippets Groups Projects
Commit 64f0d9a1 authored by Nicholas Book's avatar Nicholas Book
Browse files

added ravaflow3G module and updated dependencies

parent 147c8b5c
No related branches found
No related tags found
No related merge requests found
...@@ -5,76 +5,77 @@ ...@@ -5,76 +5,77 @@
## Description ## Description
`PSimPy` (Predictive and probabilistic simulation with Python) implements `PSimPy` (Predictive and probabilistic simulation with Python) implements
a Gaussian process emulation-based framework that enables systematically and a Gaussian process emulation-based framework that enables systematic and
efficiently inverstigating uncertainties associated with physics-based models efficient investigation of uncertainties associated with physics-based models
(i.e. simulators). (i.e. simulators).
## Installation ## Installation
`PSimPy` is a pure Python package and can be easily installed using `pip`. All `PSimPy` is a pure Python package and can be easily installed using `pip`. All
Python-related dependencies are automatically taken care of. It should be noted Python-related dependencies are automatically taken care of. It should be noted
that some modules of `PSimPy` rely on / take advantage of non-Python package and that some modules of `PSimPy` rely on / take advantage of non-Python packages and
software. More specifically, the emulator module `robustgasp.py` relies on the R software. More specifically, the emulator module `robustgasp.py` relies on the R
package `RobustGaSP`; the simulator module `ravaflow24.py` relies on the open package `RobustGaSP`; the simulator module `ravaflow3G.py` relies on the open
source software `r.avaflow 2.4`. If you want to use these modules or any other source software `r.avaflow 3G` (which runs in [GRASS GIS](https://grass.osgeo.org/)). If you want to use these or any other
modules relying on these modules, corresponding non-Python dependencies need to dependent modules, corresponding non-Python dependencies need to
be installed. be installed.
If the simulator module `ravaflow.py` is desired, you may follow the official If the simulator module `ravaflow3G.py` is desired, you may follow the official
documentation of `r.avaflow 2.4` under https://www.landslidemodels.org/r.avaflow/ documentation of `r.avaflow 3G` under https://www.landslidemodels.org/r.avaflow/
to install it. The installation of the R package `RobustGaSP` is covered in to install it. Only the installation of the R package `RobustGaSP` is covered in
following steps. following steps.
We recommond you to install `PSimPy` in a virtual environment such as a `conda` We recommond you to install `PSimPy` in a virtual environment such as a `conda`
environment. You may want to first install `Anaconda` or `Miniconda` if you environment. You may want to first install `Anaconda` or `Miniconda` if you
haven't. The steps afterwards are as follows: haven't. The steps afterwards are as follows:
1. Create a conda environment with Python 3.9: 1. Create a conda environment with Python:
```bash ```bash
conda create --name your_env_name python=3.9 $ conda create --name your_env_name python
``` ```
2. Install `R` if you don't have it on your machine (if you have `R`, you can 2. Install `R` if you don't have it on your machine (if you have `R`, you can
skip this step; alternatively, you can still follow this step to install `R` in skip this step; alternatively, you can still follow this step to install `R` in
the conda environment): the conda environment):
```bash ```bash
conda activate your_env_name $ conda activate your_env_name
conda install -c conda-forge r-base=3.6 $ conda install -c conda-forge r-base
``` ```
3. Install the R package `RobustGaSP` in the R terminal: 3. Install the R package `RobustGaSP` in the R terminal:
```bash ```bash
R $ R
install.packages("RobustGaSP",repos="https://cran.r-project.org",version="0.6.4") ...
> install.packages("RobustGaSP",repos="https://cran.r-project.org",version="0.6.4")
``` ```
Try if it is successfully installed by See if it is successfully installed with
```bash ```bash
library("RobustGaSP") > library("RobustGaSP")
``` ```
4. Configure the environment variable `R_HOME` so that `rpy2` knows where to find 4. Configure the environment variable `R_HOME` so that `rpy2` knows where to find
`R` packages. You can find your `R_HOME` by typing the following command in the `R` packages. You can find your `R_HOME` by typing the following command in the
R terminal: R terminal:
```bash ```bash
R.home() > R.home()
q() > q()
``` ```
It is a path like ".../lib/R". Set the environment variable `R_HOME` in your It is a path like `".../lib/R"`. Set the environment variable `R_HOME` in your
conda environment by conda environment with
```bash ```bash
conda env config vars set R_HOME=your_R_HOME $ conda env config vars set R_HOME=your_R_HOME
``` ```
Afterwards reactivate your conda environment to make the change take effect by Afterwards reactivate your conda environment to make the change take effect by
```bash ```bash
conda deactivate $ conda deactivate
conda activate your_env_name $ conda activate your_env_name
``` ```
5. Install `PSimPy` using `pip` in your conda environment by 5. Install `PSimPy` using `pip` in your conda environment by
```bash ```bash
conda install -c conda-forge pip $ conda install -c conda-forge pip
pip install psimpy $ python -m pip install psimpy
``` ```
Now you should have `PSimPy` and its dependencies successfully installed in your Now you should have `PSimPy` and its dependencies successfully installed in your
...@@ -84,10 +85,10 @@ If you would like to use it with Jupyter Notebook (iPython Notebook), there is ...@@ -84,10 +85,10 @@ If you would like to use it with Jupyter Notebook (iPython Notebook), there is
one extra step needed to set your conda environment on your Notebook: one extra step needed to set your conda environment on your Notebook:
6. Install `ipykernel` and install a kernel that points to your conda environment 6. Install `ipykernel` and install a kernel that points to your conda environment
by with
```bash ```bash
conda install -c conda-forge ipykernel $ conda install -c conda-forge ipykernel
python -m ipykernel install --user --name=your_env_name $ python -m ipykernel install --user --name=your_env_name
``` ```
Now you can start your Notebook, change the kernel to your conda environment, and Now you can start your Notebook, change the kernel to your conda environment, and
...@@ -100,11 +101,10 @@ including the API and theory (or reference) of each module. ...@@ -100,11 +101,10 @@ including the API and theory (or reference) of each module.
## Usage ## Usage
Usage examples are provided by the `Example Gallery` at https://mbd.pages.git-ce.rwth-aachen.de/psimpy. Usage examples are provided by the **Example Gallery** at https://mbd.pages.git-ce.rwth-aachen.de/psimpy.
## License ## License
`PSimPy` was created by Hu Zhao at the Chair of Methods for Model-based `PSimPy` was created by Hu Zhao at the Chair of Methods for Model-based
Development in Computational Engineering. It is licensed under the terms of Development in Computational Engineering (RWTH Aachen University, Germany). It is licensed under the terms of the MIT license.
the MIT license. \ No newline at end of file
\ No newline at end of file
This diff is collapsed.
[tool.poetry] [tool.poetry]
name = "psimpy" name = "psimpy"
version = "0.1.2" version = "0.1.3"
description = "Predictive and probabilistic simulation tools." description = "Predictive and probabilistic simulation tools."
authors = ["Hu Zhao"] authors = ["Hu Zhao"]
license = "MIT" license = "MIT"
...@@ -9,15 +9,15 @@ homepage = "https://git-ce.rwth-aachen.de/mbd/psimpy" ...@@ -9,15 +9,15 @@ homepage = "https://git-ce.rwth-aachen.de/mbd/psimpy"
keywords = ["emulator","simulator","inference", "sensitivity","uncertainty"] keywords = ["emulator","simulator","inference", "sensitivity","uncertainty"]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.9,<3.11" python = ">=3.9"
numpy = "^1.22.3" numpy = ">1.22.3"
scipy = "^1.8.0" scipy = ">1.8.0"
SALib = "^1.4.5" SALib = ">1.4.5"
rpy2 = "^3.5.1" rpy2 = "=3.5.12" # 3.5.13 seems to be broken
beartype = "^0.11.0" beartype = ">0.11.0"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
pytest = "^7.1.1" pytest = ">7.1.1"
pytest-cov = "^3.0.0" pytest-cov = "^3.0.0"
pytest-order = "^1.0.1" pytest-order = "^1.0.1"
matplotlib = "^3.6.1" matplotlib = "^3.6.1"
......
...@@ -7,25 +7,34 @@ from typing import Union ...@@ -7,25 +7,34 @@ from typing import Union
from beartype import beartype from beartype import beartype
rpy2.robjects.numpy2ri.activate() rpy2.robjects.numpy2ri.activate()
RobustGaSP = importr('RobustGaSP') RobustGaSP = importr("RobustGaSP")
# fetch @ # fetch @
r_base = importr("base") r_base = importr("base")
r_at = r_base.__dict__["@"] r_at = r_base.__dict__["@"]
class RobustGaSPBase:
class RobustGaSPBase:
@beartype @beartype
def __init__( def __init__(
self, ndim: int, zero_mean: str = 'No', nugget: float = 0.0, self,
ndim: int,
zero_mean: str = "No",
nugget: float = 0.0,
nugget_est: bool = False, nugget_est: bool = False,
range_par: Union[np.ndarray, NALogicalType] = NA, range_par: Union[np.ndarray, NALogicalType] = NA,
method: str ='post_mode', prior_choice: str = 'ref_approx', method: str = "post_mode",
a: float = 0.2, b: Union[float, None] = None, prior_choice: str = "ref_approx",
kernel_type: str = 'matern_5_2', a: float = 0.2,
isotropic: bool = False, optimization: str = 'lbfgs', b: Union[float, None] = None,
alpha: Union[np.ndarray, None] = None, lower_bound: bool = True, kernel_type: str = "matern_5_2",
max_eval: Union[int, None] = None, num_initial_values: int = 2) -> None: isotropic: bool = False,
optimization: str = "lbfgs",
alpha: Union[np.ndarray, None] = None,
lower_bound: bool = True,
max_eval: Union[int, None] = None,
num_initial_values: int = 2,
) -> None:
""" """
Set up basic parameters of the emulator. Set up basic parameters of the emulator.
...@@ -102,7 +111,7 @@ class RobustGaSPBase: ...@@ -102,7 +111,7 @@ class RobustGaSPBase:
""" """
self.ndim = ndim self.ndim = ndim
if zero_mean not in ['No', 'Yes']: if zero_mean not in ["No", "Yes"]:
raise ValueError("zero_mean can be either 'No' or 'Yes'.") raise ValueError("zero_mean can be either 'No' or 'Yes'.")
self.zero_mean = zero_mean self.zero_mean = zero_mean
...@@ -111,35 +120,34 @@ class RobustGaSPBase: ...@@ -111,35 +120,34 @@ class RobustGaSPBase:
if range_par is not NA: if range_par is not NA:
if range_par.ndim != 1 or len(range_par) != ndim: if range_par.ndim != 1 or len(range_par) != ndim:
raise ValueError( raise ValueError("range_par must be 1d numpy array of length ndim")
"range_par must be 1d numpy array of length ndim")
self.range_par = range_par self.range_par = range_par
if method not in ['post_mode', 'mmle', 'mle']: if method not in ["post_mode", "mmle", "mle"]:
raise ValueError( raise ValueError("method can only be 'post_mode', 'mmle', or 'mle'.")
"method can only be 'post_mode', 'mmle', or 'mle'.")
self.method = method self.method = method
if prior_choice not in ['ref_xi', 'ref_gamma', 'ref_approx']: if prior_choice not in ["ref_xi", "ref_gamma", "ref_approx"]:
raise ValueError( raise ValueError(
"prior_choice can only be 'ref_xi', 'ref_gamma'," "prior_choice can only be 'ref_xi', 'ref_gamma'," " or 'ref_approx'."
" or 'ref_approx'.") )
self.prior_choice = prior_choice self.prior_choice = prior_choice
self.a = float(a) self.a = float(a)
self.b = b self.b = b
if kernel_type not in ['matern_3_2', 'matern_5_2', 'pow_exp']: if kernel_type not in ["matern_3_2", "matern_5_2", "pow_exp"]:
raise ValueError( raise ValueError(
"kernel_type can only be 'matern_3_2', 'matern_5_2'," "kernel_type can only be 'matern_3_2', 'matern_5_2'," " or 'pow_exp'."
" or 'pow_exp'.") )
self.kernel_type = kernel_type self.kernel_type = kernel_type
self.isotropic = isotropic self.isotropic = isotropic
if optimization not in ['lbfgs', 'nelder-mead', 'brent']: if optimization not in ["lbfgs", "nelder-mead", "brent"]:
raise ValueError( raise ValueError(
"optimization can only be 'lbfgs', 'nelder-mead', or 'brent'.") "optimization can only be 'lbfgs', 'nelder-mead', or 'brent'."
)
self.optimization = optimization self.optimization = optimization
if alpha is None: if alpha is None:
...@@ -158,8 +166,9 @@ class RobustGaSPBase: ...@@ -158,8 +166,9 @@ class RobustGaSPBase:
self.emulator = None self.emulator = None
def _preprocess_design_response_trend(self, design: np.ndarray, def _preprocess_design_response_trend(
response: np.ndarray, trend: np.ndarray) -> None: self, design: np.ndarray, response: np.ndarray, trend: np.ndarray
) -> None:
""" """
Preprocess design, response, and trend before train an emulator. Preprocess design, response, and trend before train an emulator.
...@@ -197,8 +206,7 @@ class RobustGaSPBase: ...@@ -197,8 +206,7 @@ class RobustGaSPBase:
elif response.ndim == 1: elif response.ndim == 1:
response = np.reshape(response, (len(response), 1)) response = np.reshape(response, (len(response), 1))
if response.shape[0] != design.shape[0]: if response.shape[0] != design.shape[0]:
raise ValueError( raise ValueError("response must have same number of points as design")
"response must have same number of points as design")
self.response = response.astype(np.float32) self.response = response.astype(np.float32)
if trend is None: if trend is None:
...@@ -211,17 +219,19 @@ class RobustGaSPBase: ...@@ -211,17 +219,19 @@ class RobustGaSPBase:
raise ValueError("trend must have same numer of points as design") raise ValueError("trend must have same numer of points as design")
self.trend = trend.astype(np.float32) self.trend = trend.astype(np.float32)
if self.zero_mean == 'Yes': if self.zero_mean == "Yes":
self.trend = np.zeros((design.shape[0], 1), dtype=np.float32) self.trend = np.zeros((design.shape[0], 1), dtype=np.float32)
if self.b is None: if self.b is None:
self._b = (1 / design.shape[0])**(1 / design.shape[1]) * \ self._b = (1 / design.shape[0]) ** (1 / design.shape[1]) * (
(self.a + design.shape[1]) self.a + design.shape[1]
)
else: else:
self._b = self.b self._b = self.b
def _preprocess_testing_input_trend(self, testing_input: np.ndarray, def _preprocess_testing_input_trend(
testing_trend: np.ndarray) -> None: self, testing_input: np.ndarray, testing_trend: np.ndarray
) -> None:
""" """
Preprocess testing_input and testing_trend before make prediction or Preprocess testing_input and testing_trend before make prediction or
draw samples. draw samples.
...@@ -246,8 +256,7 @@ class RobustGaSPBase: ...@@ -246,8 +256,7 @@ class RobustGaSPBase:
testing_input = np.reshape(testing_input, (len(testing_input), 1)) testing_input = np.reshape(testing_input, (len(testing_input), 1))
if testing_input.shape[1] != self.ndim: if testing_input.shape[1] != self.ndim:
raise ValueError( raise ValueError("testing_input must match the input dimension ndim")
"testing_input must match the input dimension ndim")
self.testing_input = testing_input.astype(np.float32) self.testing_input = testing_input.astype(np.float32)
...@@ -264,18 +273,24 @@ class RobustGaSPBase: ...@@ -264,18 +273,24 @@ class RobustGaSPBase:
if testing_trend.shape[0] != testing_input.shape[0]: if testing_trend.shape[0] != testing_input.shape[0]:
raise ValueError( raise ValueError(
"testing_trend must have same numer of points as testing_input") "testing_trend must have same numer of points as testing_input"
)
if testing_trend.shape[1] != self.trend.shape[1]: if testing_trend.shape[1] != self.trend.shape[1]:
raise ValueError( raise ValueError(
"testing_trend must have the same basis as the trend used" "testing_trend must have the same basis as the trend used"
" for emulator training.") " for emulator training."
)
class ScalarGaSP(RobustGaSPBase):
class ScalarGaSP(RobustGaSPBase):
@beartype @beartype
def train(self, design: np.ndarray, response: np.ndarray, def train(
trend: Union[np.ndarray, None] = None) -> None: self,
design: np.ndarray,
response: np.ndarray,
trend: Union[np.ndarray, None] = None,
) -> None:
""" """
Train a scalar GP given training data. Train a scalar GP given training data.
...@@ -318,11 +333,13 @@ class ScalarGaSP(RobustGaSPBase): ...@@ -318,11 +333,13 @@ class ScalarGaSP(RobustGaSPBase):
alpha=self.alpha, alpha=self.alpha,
lower_bound=self.lower_bound, lower_bound=self.lower_bound,
max_eval=self.max_eval, max_eval=self.max_eval,
num_initial_values=self.num_initial_values) num_initial_values=self.num_initial_values,
)
@beartype @beartype
def predict(self, testing_input: np.ndarray, def predict(
testing_trend: Union[np.ndarray, None] = None) -> np.ndarray: self, testing_input: np.ndarray, testing_trend: Union[np.ndarray, None] = None
) -> np.ndarray:
""" """
Make prediction using the trained scalar GP emulator. Make prediction using the trained scalar GP emulator.
...@@ -358,14 +375,19 @@ class ScalarGaSP(RobustGaSPBase): ...@@ -358,14 +375,19 @@ class ScalarGaSP(RobustGaSPBase):
self.testing_input, self.testing_input,
testing_trend=self.testing_trend, testing_trend=self.testing_trend,
interval_data=True, interval_data=True,
outasS3=True) outasS3=True,
)
predictions = np.transpose(np.array(r_predictions)) predictions = np.transpose(np.array(r_predictions))
return predictions return predictions
@beartype @beartype
def sample(self, testing_input: np.ndarray, nsamples: int = 1, def sample(
testing_trend: Union[np.ndarray, None] = None) -> np.ndarray: self,
testing_input: np.ndarray,
nsamples: int = 1,
testing_trend: Union[np.ndarray, None] = None,
) -> np.ndarray:
""" """
Draw samples using the trained scalar GP emulator. Draw samples using the trained scalar GP emulator.
...@@ -399,7 +421,8 @@ class ScalarGaSP(RobustGaSPBase): ...@@ -399,7 +421,8 @@ class ScalarGaSP(RobustGaSPBase):
self.emulator, self.emulator,
self.testing_input, self.testing_input,
num_sample=nsamples, num_sample=nsamples,
testing_trend=self.testing_trend) testing_trend=self.testing_trend,
)
samples = np.array(r_samples) samples = np.array(r_samples)
return samples return samples
...@@ -412,8 +435,8 @@ class ScalarGaSP(RobustGaSPBase): ...@@ -412,8 +435,8 @@ class ScalarGaSP(RobustGaSPBase):
------- -------
validation : numpy array validation : numpy array
Shape :code:`(n, 2)`. Shape :code:`(n, 2)`.
validation[:, 0] - leave one out fitted values, validation[:, 0] - leave-one-out fitted values
validation[:, 1] - stand deviation of each prediction validation[:, 1] - standard deviation of each prediction
""" """
if self.emulator is None: if self.emulator is None:
raise RuntimeError("Emulator has not been trained!") raise RuntimeError("Emulator has not been trained!")
...@@ -425,10 +448,13 @@ class ScalarGaSP(RobustGaSPBase): ...@@ -425,10 +448,13 @@ class ScalarGaSP(RobustGaSPBase):
class PPGaSP(RobustGaSPBase): class PPGaSP(RobustGaSPBase):
@beartype @beartype
def train(self, design: np.ndarray, response: np.ndarray, def train(
trend: Union[np.ndarray, None] = None) -> None: self,
design: np.ndarray,
response: np.ndarray,
trend: Union[np.ndarray, None] = None,
) -> None:
""" """
Train a parallel partial GP given training data. Train a parallel partial GP given training data.
...@@ -453,7 +479,8 @@ class PPGaSP(RobustGaSPBase): ...@@ -453,7 +479,8 @@ class PPGaSP(RobustGaSPBase):
if self.response.shape[1] == 1: if self.response.shape[1] == 1:
raise RuntimeError( raise RuntimeError(
"PPGaSP should be used for models with vector output." "PPGaSP should be used for models with vector output."
" For scalar output model, please use ScalarGaSP.") " For scalar output model, please use ScalarGaSP."
)
self.emulator = RobustGaSP.ppgasp( self.emulator = RobustGaSP.ppgasp(
design=self.design, design=self.design,
...@@ -473,11 +500,13 @@ class PPGaSP(RobustGaSPBase): ...@@ -473,11 +500,13 @@ class PPGaSP(RobustGaSPBase):
alpha=self.alpha, alpha=self.alpha,
lower_bound=self.lower_bound, lower_bound=self.lower_bound,
max_eval=self.max_eval, max_eval=self.max_eval,
num_initial_values=self.num_initial_values) num_initial_values=self.num_initial_values,
)
@beartype @beartype
def predict(self, testing_input: np.ndarray, def predict(
testing_trend: Union[np.ndarray, None] = None) -> np.ndarray: self, testing_input: np.ndarray, testing_trend: Union[np.ndarray, None] = None
) -> np.ndarray:
""" """
Make prediction using the trained parallel partial GP emulator. Make prediction using the trained parallel partial GP emulator.
...@@ -514,7 +543,8 @@ class PPGaSP(RobustGaSPBase): ...@@ -514,7 +543,8 @@ class PPGaSP(RobustGaSPBase):
self.testing_input, self.testing_input,
testing_trend=self.testing_trend, testing_trend=self.testing_trend,
interval_data=True, interval_data=True,
outasS3=True) outasS3=True,
)
ntest = self.testing_input.shape[0] ntest = self.testing_input.shape[0]
nresponse = self.response.shape[1] nresponse = self.response.shape[1]
...@@ -526,8 +556,12 @@ class PPGaSP(RobustGaSPBase): ...@@ -526,8 +556,12 @@ class PPGaSP(RobustGaSPBase):
return predictions return predictions
@beartype @beartype
def sample(self, testing_input: np.ndarray, nsamples: int = 1, def sample(
testing_trend: Union[np.ndarray, None] = None) -> np.ndarray: self,
testing_input: np.ndarray,
nsamples: int = 1,
testing_trend: Union[np.ndarray, None] = None,
) -> np.ndarray:
""" """
Draw samples using the trained parallel partial GaSP emulator. Draw samples using the trained parallel partial GaSP emulator.
...@@ -586,17 +620,22 @@ class PPGaSP(RobustGaSPBase): ...@@ -586,17 +620,22 @@ class PPGaSP(RobustGaSPBase):
alpha=self.alpha, alpha=self.alpha,
lower_bound=self.lower_bound, lower_bound=self.lower_bound,
max_eval=self.max_eval, max_eval=self.max_eval,
num_initial_values=self.num_initial_values) num_initial_values=self.num_initial_values,
)
# predictions and samples of y1 at testing_input using rgasp_emulator # predictions and samples of y1 at testing_input using rgasp_emulator
r_predictions_y1 = RobustGaSP.predict_rgasp( r_predictions_y1 = RobustGaSP.predict_rgasp(
rgasp_emulator, self.testing_input, testing_trend=self.testing_trend) rgasp_emulator, self.testing_input, testing_trend=self.testing_trend
)
predictions_y1 = np.transpose(np.array(r_predictions_y1)) predictions_y1 = np.transpose(np.array(r_predictions_y1))
predictions_mean_y1 = np.reshape(predictions_y1[:, 0], (ntest, 1)) predictions_mean_y1 = np.reshape(predictions_y1[:, 0], (ntest, 1))
r_samples_y1 = RobustGaSP.Sample( r_samples_y1 = RobustGaSP.Sample(
rgasp_emulator, self.testing_input, num_sample=nsamples, rgasp_emulator,
testing_trend=self.testing_trend) self.testing_input,
num_sample=nsamples,
testing_trend=self.testing_trend,
)
samples_y1 = np.array(r_samples_y1) samples_y1 = np.array(r_samples_y1)
# use predictions_mean and samples of y1 to calculate LW term # use predictions_mean and samples of y1 to calculate LW term
...@@ -608,7 +647,9 @@ class PPGaSP(RobustGaSPBase): ...@@ -608,7 +647,9 @@ class PPGaSP(RobustGaSPBase):
samples = np.zeros((ntest, nresponse, nsamples)) samples = np.zeros((ntest, nresponse, nsamples))
for i in range(nresponse): for i in range(nresponse):
samples[:,i,:] = np.reshape(predictions_mean[:, i], (ntest, 1)) + \ samples[:, i, :] = (
np.sqrt(sigma2_hat[i]) * LW_term np.reshape(predictions_mean[:, i], (ntest, 1))
+ np.sqrt(sigma2_hat[i]) * LW_term
)
return samples return samples
...@@ -4,18 +4,28 @@ import numpy as np ...@@ -4,18 +4,28 @@ import numpy as np
from scipy.interpolate import interp2d from scipy.interpolate import interp2d
from typing import Union from typing import Union
from beartype import beartype from beartype import beartype
import warnings
class Ravaflow24Mixture:
class Ravaflow24Mixture:
@beartype @beartype
def __init__( def __init__(
self, dir_sim: str, time_step: Union[float, int] = 10, self,
time_end: Union[float, int] = 300, cfl: float = 0.4, dir_sim: str,
time_step_length: float = 0.001, conversion_control: str = '0', time_step: Union[float, int] = 10,
curvature_control: str = '1', surface_control: str = '0', time_end: Union[float, int] = 300,
entrainment_control: str = '0', shear_velocity_coef: float = 0.05, cfl: float = 0.4,
basal_friction_diff: float = 0.0, stopping_control: str = '0', time_step_length: float = 0.001,
stopping_threshold: float = 0.0, friction_control: str = '0') -> None: conversion_control: str = "0",
curvature_control: str = "1",
surface_control: str = "0",
entrainment_control: str = "0",
shear_velocity_coef: float = 0.05,
basal_friction_diff: float = 0.0,
stopping_control: str = "0",
stopping_threshold: float = 0.0,
friction_control: str = "0",
) -> None:
""" """
`r.avaflow 2.4 Mixture model` (Voellmy-type shallow flow model). `r.avaflow 2.4 Mixture model` (Voellmy-type shallow flow model).
...@@ -75,6 +85,11 @@ class Ravaflow24Mixture: ...@@ -75,6 +85,11 @@ class Ravaflow24Mixture:
mixture model). mixture model).
""" """
warnings.warn(
"The ravaflow24 module will be deprecated on 19.01.2024. Please use ravaflow3G module.",
FutureWarning,
)
if not os.path.isdir(dir_sim): if not os.path.isdir(dir_sim):
raise ValueError(f"{dir_sim} does not exist or is not a directory") raise ValueError(f"{dir_sim} does not exist or is not a directory")
self.dir_sim = dir_sim self.dir_sim = dir_sim
...@@ -84,46 +99,49 @@ class Ravaflow24Mixture: ...@@ -84,46 +99,49 @@ class Ravaflow24Mixture:
self.cfl = cfl self.cfl = cfl
self.time_step_length = time_step_length self.time_step_length = time_step_length
if conversion_control not in ['0','1']: if conversion_control not in ["0", "1"]:
raise ValueError("convresion_control must be '0' or '1'") raise ValueError("convresion_control must be '0' or '1'")
self.conversion_control = conversion_control self.conversion_control = conversion_control
if curvature_control not in ['0','1','2']: if curvature_control not in ["0", "1", "2"]:
raise ValueError("curvature_control must be '0' or '1' or '2'") raise ValueError("curvature_control must be '0' or '1' or '2'")
self.curvature_control = curvature_control self.curvature_control = curvature_control
if surface_control not in ['0','1']: if surface_control not in ["0", "1"]:
raise ValueError("surface_control must be '0' or '1'") raise ValueError("surface_control must be '0' or '1'")
self.surface_control = surface_control self.surface_control = surface_control
if entrainment_control not in ['0','1','2','3','4']: if entrainment_control not in ["0", "1", "2", "3", "4"]:
raise ValueError( raise ValueError("entrainment_control must be '0', '1', '2', '3', or '4'")
"entrainment_control must be '0', '1', '2', '3', or '4'")
self.entrainment_control = entrainment_control self.entrainment_control = entrainment_control
self.shear_velocity_coef = shear_velocity_coef self.shear_velocity_coef = shear_velocity_coef
self.basal_friction_diff = basal_friction_diff self.basal_friction_diff = basal_friction_diff
if stopping_control not in ['0','1','2','3']: if stopping_control not in ["0", "1", "2", "3"]:
raise ValueError("stopping_control must be '0', '1', '2', or '3'") raise ValueError("stopping_control must be '0', '1', '2', or '3'")
self.stopping_control = stopping_control self.stopping_control = stopping_control
self.stopping_threshold = stopping_threshold self.stopping_threshold = stopping_threshold
if friction_control not in ['0','1']: if friction_control not in ["0", "1"]:
raise ValueError("friction_control must be '0' or '1'") raise ValueError("friction_control must be '0' or '1'")
self.friction_control = friction_control self.friction_control = friction_control
@beartype @beartype
def preprocess( def preprocess(
self, prefix: str, elevation: str, hrelease: str, self,
prefix: str,
elevation: str,
hrelease: str,
cellsize: Union[float, int, np.floating, np.integer] = 20, cellsize: Union[float, int, np.floating, np.integer] = 20,
hrelease_ratio: Union[float, np.integer] = 1.0, hrelease_ratio: Union[float, np.integer] = 1.0,
internal_friction: Union[float, int, np.floating, np.integer] = 35, internal_friction: Union[float, int, np.floating, np.integer] = 35,
basal_friction: Union[float, int, np.floating, np.integer] = 20, basal_friction: Union[float, int, np.floating, np.integer] = 20,
turbulent_friction: Union[float, int, np.floating, np.integer] = 3, turbulent_friction: Union[float, int, np.floating, np.integer] = 3,
entrainment_coef: Union[float, np.floating] = -7.0, entrainment_coef: Union[float, np.floating] = -7.0,
EPSG: Union[str, None] = None) -> tuple[str, str]: EPSG: Union[str, None] = None,
) -> tuple[str, str]:
""" """
Preprocess simulation input data simulation. Preprocess simulation input data simulation.
...@@ -168,15 +186,24 @@ class Ravaflow24Mixture: ...@@ -168,15 +186,24 @@ class Ravaflow24Mixture:
Name of the shell file (including path), which will be called by Name of the shell file (including path), which will be called by
`GRASS <https://grass.osgeo.org/>`_ to run the simulation. `GRASS <https://grass.osgeo.org/>`_ to run the simulation.
""" """
sh_file = os.path.join(self.dir_sim, f'{prefix}_shell.sh') warnings.warn(
grass_location = os.path.join(self.dir_sim, f'{prefix}_glocation') "The ravaflow24 module will be deprecated on 19.01.2024. Please use ravaflow3G module.",
results_dir = os.path.join(self.dir_sim, f'{prefix}_results') FutureWarning,
)
if os.path.exists(sh_file) or os.path.exists(grass_location) or \
os.path.exists(results_dir): sh_file = os.path.join(self.dir_sim, f"{prefix}_shell.sh")
grass_location = os.path.join(self.dir_sim, f"{prefix}_glocation")
results_dir = os.path.join(self.dir_sim, f"{prefix}_results")
if (
os.path.exists(sh_file)
or os.path.exists(grass_location)
or os.path.exists(results_dir)
):
raise ValueError( raise ValueError(
f"File(s) with prefix={prefix} already exists in {self.dir_sim}." f"File(s) with prefix={prefix} already exists in {self.dir_sim}."
f" Move or delete conflicting files, or use another prefix.") f" Move or delete conflicting files, or use another prefix."
)
if not os.path.exists(elevation): if not os.path.exists(elevation):
raise ValueError(f"{elevation} does not exist") raise ValueError(f"{elevation} does not exist")
...@@ -191,8 +218,7 @@ class Ravaflow24Mixture: ...@@ -191,8 +218,7 @@ class Ravaflow24Mixture:
os.system(f"grass -e -c EPSG:{EPSG} {grass_location}") os.system(f"grass -e -c EPSG:{EPSG} {grass_location}")
# create shell file # create shell file
with open(sh_file, 'w') as sh: with open(sh_file, "w") as sh:
sh.write("# import elevation raster \n") sh.write("# import elevation raster \n")
sh.write(f"r.in.gdal -o input={elevation} output=elev --overwrite") sh.write(f"r.in.gdal -o input={elevation} output=elev --overwrite")
sh.write("\n\n") sh.write("\n\n")
...@@ -200,13 +226,11 @@ class Ravaflow24Mixture: ...@@ -200,13 +226,11 @@ class Ravaflow24Mixture:
sh.write("# set region based on elev \n") sh.write("# set region based on elev \n")
sh.write("g.region raster=elev \n\n") sh.write("g.region raster=elev \n\n")
sh.write('# import hrelease raster \n') sh.write("# import hrelease raster \n")
sh.write( sh.write(f"r.in.gdal -o input={hrelease} output=raw_hrel --overwrite")
f"r.in.gdal -o input={hrelease} output=raw_hrel --overwrite")
sh.write("\n\n") sh.write("\n\n")
sh.write( sh.write(f'r.mapcalc "hrel = raw_hrel*{hrelease_ratio}" --overwrite')
f"r.mapcalc \"hrel = raw_hrel*{hrelease_ratio}\" --overwrite")
sh.write("\n\n") sh.write("\n\n")
sh.write( sh.write(
...@@ -220,7 +244,8 @@ class Ravaflow24Mixture: ...@@ -220,7 +244,8 @@ class Ravaflow24Mixture:
f"{self.surface_control},{self.entrainment_control}," f"{self.surface_control},{self.entrainment_control},"
f"{self.stopping_control},{self.friction_control} " f"{self.stopping_control},{self.friction_control} "
f"time={self.time_step},{self.time_end} " f"time={self.time_step},{self.time_end} "
f"cfl={self.cfl},{self.time_step_length}") f"cfl={self.cfl},{self.time_step_length}"
)
return grass_location, sh_file return grass_location, sh_file
...@@ -236,6 +261,10 @@ class Ravaflow24Mixture: ...@@ -236,6 +261,10 @@ class Ravaflow24Mixture:
sh_file : str sh_file : str
Name of the shell file (including path). Name of the shell file (including path).
""" """
warnings.warn(
"The ravaflow24 module will be deprecated on 19.01.2024. Please use ravaflow3G module.",
FutureWarning,
)
if not os.path.exists(grass_location): if not os.path.exists(grass_location):
raise ValueError(f"{grass_location} does not exist") raise ValueError(f"{grass_location} does not exist")
if not os.path.exists(os.path.join(grass_location, "PERMANENT")): if not os.path.exists(os.path.join(grass_location, "PERMANENT")):
...@@ -249,9 +278,9 @@ class Ravaflow24Mixture: ...@@ -249,9 +278,9 @@ class Ravaflow24Mixture:
os.chdir(self.dir_sim) os.chdir(self.dir_sim)
os.system(f"grass {grass_mapset} --exec sh {sh_file}") os.system(f"grass {grass_mapset} --exec sh {sh_file}")
def extract_impact_area(
def extract_impact_area(self, prefix: str, qoi: str = 'h', self, prefix: str, qoi: str = "h", threshold: Union[float, int] = 0.5
threshold: Union[float, int] = 0.5) -> float: ) -> float:
""" """
Extract impact area defined by a given quantity of interest and its Extract impact area defined by a given quantity of interest and its
threshold. threshold.
...@@ -275,23 +304,34 @@ class Ravaflow24Mixture: ...@@ -275,23 +304,34 @@ class Ravaflow24Mixture:
impact_area : float impact_area : float
A scalar value representing the overall impact area. A scalar value representing the overall impact area.
""" """
if qoi not in ['h','v','p','t']: warnings.warn(
"The ravaflow24 module will be deprecated on 19.01.2024. Please use ravaflow3G module.",
FutureWarning,
)
if qoi not in ["h", "v", "p", "t"]:
raise ValueError("qoi must be 'h', 'v', 'p', or 't'") raise ValueError("qoi must be 'h', 'v', 'p', or 't'")
qoi_max_raster_asc = os.path.join(self.dir_sim, f'{prefix}_results', qoi_max_raster_asc = os.path.join(
f'{prefix}_ascii', f'{prefix}_{qoi}flow_max.asc') self.dir_sim,
f"{prefix}_results",
f"{prefix}_ascii",
f"{prefix}_{qoi}flow_max.asc",
)
qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6) qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6)
cellsize = linecache.getline(qoi_max_raster_asc, 5) cellsize = linecache.getline(qoi_max_raster_asc, 5)
cellsize = float(cellsize.split()[-1].strip()) cellsize = float(cellsize.split()[-1].strip())
impact_area = np.sum(np.where(qoi_max_raster>threshold,1,0)) * \ impact_area = (
(cellsize)**2 np.sum(np.where(qoi_max_raster > threshold, 1, 0)) * (cellsize) ** 2
)
return float(impact_area) return float(impact_area)
def extract_qoi_max(self, prefix: str, qoi: str, def extract_qoi_max(
aggregate: bool = True) -> Union[float, np.ndarray]: self, prefix: str, qoi: str, aggregate: bool = True
) -> Union[float, np.ndarray]:
""" """
Extract the maximum value(s) of a quantity of interest. Extract the maximum value(s) of a quantity of interest.
...@@ -315,11 +355,20 @@ class Ravaflow24Mixture: ...@@ -315,11 +355,20 @@ class Ravaflow24Mixture:
qoi_max : float or numpy array qoi_max : float or numpy array
Maximum value(s) of ``qoi`` in one simulation. Maximum value(s) of ``qoi`` in one simulation.
""" """
if qoi not in ['h','v','p','t']: warnings.warn(
"The ravaflow24 module will be deprecated on 19.01.2024. Please use ravaflow3G module.",
FutureWarning,
)
if qoi not in ["h", "v", "p", "t"]:
raise ValueError("qoi must be 'h', 'v', 'p', or 't'") raise ValueError("qoi must be 'h', 'v', 'p', or 't'")
qoi_max_raster_asc = os.path.join(self.dir_sim, f'{prefix}_results', qoi_max_raster_asc = os.path.join(
f'{prefix}_ascii', f'{prefix}_{qoi}flow_max.asc') self.dir_sim,
f"{prefix}_results",
f"{prefix}_ascii",
f"{prefix}_{qoi}flow_max.asc",
)
qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6) qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6)
if aggregate: if aggregate:
...@@ -327,9 +376,7 @@ class Ravaflow24Mixture: ...@@ -327,9 +376,7 @@ class Ravaflow24Mixture:
else: else:
return np.rot90(np.transpose(qoi_max_raster)) return np.rot90(np.transpose(qoi_max_raster))
def extract_qoi_max_loc(self, prefix: str, loc: np.ndarray, qoi: str) -> np.ndarray:
def extract_qoi_max_loc(self, prefix: str, loc: np.ndarray,
qoi: str) -> np.ndarray:
""" """
Extract maximum value(s) of a quantity of interest at specific Extract maximum value(s) of a quantity of interest at specific
location(s). location(s).
...@@ -356,16 +403,26 @@ class Ravaflow24Mixture: ...@@ -356,16 +403,26 @@ class Ravaflow24Mixture:
Consist of maximum value(s) of ``qoi`` at each location. Consist of maximum value(s) of ``qoi`` at each location.
Shape of :code:`(nloc,)`. Shape of :code:`(nloc,)`.
""" """
warnings.warn(
"The ravaflow24 module will be deprecated on 19.01.2024. Please use ravaflow3G module.",
FutureWarning,
)
if not (loc.ndim == 2 and loc.shape[1] == 2): if not (loc.ndim == 2 and loc.shape[1] == 2):
raise ValueError( raise ValueError(
"loc must be a 2d numpy array with first axis corresponding to" "loc must be a 2d numpy array with first axis corresponding to"
" the number of locations and second axis to x and y coords") " the number of locations and second axis to x and y coords"
)
if qoi not in ['h','v','p','t']: if qoi not in ["h", "v", "p", "t"]:
raise ValueError("qoi must be 'h', 'v', 'p', or 't'") raise ValueError("qoi must be 'h', 'v', 'p', or 't'")
qoi_max_raster_asc = os.path.join(self.dir_sim, f'{prefix}_results', qoi_max_raster_asc = os.path.join(
f'{prefix}_ascii', f'{prefix}_{qoi}flow_max.asc') self.dir_sim,
f"{prefix}_results",
f"{prefix}_ascii",
f"{prefix}_{qoi}flow_max.asc",
)
header = [linecache.getline(qoi_max_raster_asc, i) for i in range(1, 6)] header = [linecache.getline(qoi_max_raster_asc, i) for i in range(1, 6)]
header_values = [float(h.split()[-1].strip()) for h in header] header_values = [float(h.split()[-1].strip()) for h in header]
...@@ -376,17 +433,24 @@ class Ravaflow24Mixture: ...@@ -376,17 +433,24 @@ class Ravaflow24Mixture:
x = np.arange(xll, xll + (cellsize * ncols), cellsize) x = np.arange(xll, xll + (cellsize * ncols), cellsize)
y = np.arange(yll, yll + (cellsize * nrows), cellsize) y = np.arange(yll, yll + (cellsize * nrows), cellsize)
if not all([loc[i,0]>=x[0] and loc[i,0]<=x[-1] and loc[i,1]>=y[0] and \ if not all(
loc[i,1]<=y[-1] for i in range(len(loc))]): [
loc[i, 0] >= x[0]
and loc[i, 0] <= x[-1]
and loc[i, 1] >= y[0]
and loc[i, 1] <= y[-1]
for i in range(len(loc))
]
):
raise ValueError( raise ValueError(
"Coordinates in loc must be within the boundary of topography") "Coordinates in loc must be within the boundary of topography"
)
qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6) qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6)
qoi_max_raster = np.rot90(np.transpose(qoi_max_raster)) qoi_max_raster = np.rot90(np.transpose(qoi_max_raster))
f_qoi_max = interp2d(x, y, qoi_max_raster) f_qoi_max = interp2d(x, y, qoi_max_raster)
qoi_max_loc = [float(f_qoi_max(loc[i,0], loc[i,1])) \ qoi_max_loc = [float(f_qoi_max(loc[i, 0], loc[i, 1])) for i in range(len(loc))]
for i in range(len(loc))]
return np.array(qoi_max_loc) return np.array(qoi_max_loc)
import os
import linecache
import numpy as np
from scipy.interpolate import interp2d
from typing import Union
from beartype import beartype
class Ravaflow3GMixture:
@beartype
def __init__(
self,
dir_sim: str,
time_step: Union[float, int] = 10,
time_end: Union[float, int] = 300,
cfl: float = 0.4,
time_step_length: float = 0.001,
diffusion_control: str = "0",
curvature_control: str = "1",
surface_control: str = "0",
entrainment_control: str = "0",
shear_velocity_coef: float = 0.05,
basal_friction_diff: float = 0.0,
stopping_control: str = "0",
stopping_threshold: float = 0.0,
friction_control: str = "0",
non_hydro_control: str = "0",
hydrograph_control: str = "2",
deceleration_control: str = "0",
) -> None:
"""
`r.avaflow 3G Mixture model` (Voellmy-type shallow flow model).
Parameters
----------
dir_sim : str
Directory to save output files generated by `r.avaflow`.
time_step : float or int
Time step for simulation, in seconds.
time_end : float or int
End time for simulation, in seconds.
cfl : float
CFL criterion, a value being equal or smaller than 0.5.
time_step_length : float
If ``cfl`` is not applicable, ``time_step_length`` is used.
In seconds. Recommended range is around :math:`0.1` to :math:`0.5`.
diffusion_control : str
`'0'`: no diffusion control (default)
`'1'`: experimental diffusion control is applied
curvature_control : str
`'0'`: curvature is neglected (default).
`'1'`: curvature is considered in the decelerating source terms (default).
`'2'`: curvature is considered in all relevant terms.
surface_control : str
`'0'`: no balancing of forces (default).
`'1'`: apply balancing of forces.
entrainment_control : str
`'0'`: no entrainment (default).
`'1'`: the entrainment coefficient is multiplied with flow momentum.
`'2'`: simplified entrainment and deposition model is used.
`'3'`: combination of `'1'` for entrainmnet and `'2'` for deposition.
`'4'`: acceleration-deceleration entrainment and deposition model.
shear_velocity_coef : float
Only used with ``entrainment_control`` being `'2'` or `'3'`, range
:math:`[0,1]`.
basal_friction_diff : float
Difference between the basal friction angles of the basal surface
and the flow, in degrees. Only used with ``entrainment_control``
being `'2'` or `'3'`.
stopping_control : str
`'0'`: no stopping control is used (default).
`'1'`: stop if the flow kinetic energy is equal or smaller than a
given threshold.
`'2'`: stop if the flow momentum is equal or smaller than a given
threshold.
`'3'`: stop if the dynamic flow pressure of all raster cells is
euqal or smaller than a given threshold.
stopping_threshold : float
Threshold value for ``stopping_control``.
If ``stopping_control`` is `'1'` or `'2'`, ``stopping_threshold``
has to be given as the maximum value reached during the flow.
If ``stopping_control`` is `'3'`, the pressure threshold has to be
specified.
friction_control : str, optional
`'0'`: no dynamic adaptation of friction parameters (default).
`'1'`: dynamic adaptation of friction parameters (ignored for the
mixture model).
non_hydro_control: str
`'0'`: Non-hydrostatic effects are excluded (default).
`'1'`: Only dispersion is considered.
`'2'`: Only enhanced gravity is considered.
`'3'`: Both dispersion and enhanced gravity are considered.
hydrograph_control: str
`'0'`: Impose hydrograph on the flow.
`'1'`: Reset flow at the hydrograph profile before imposing the hydrograph.
`'2'`: Impose the entire material added through the hydrograph on the centre of the hydrograph profile (default).
deceleration_control: str
`'0'`: Suppress change of direction induced by frictional or viscous forces (default).
`'1'`: Do not suppress the change of direction.
"""
if not os.path.isdir(dir_sim):
raise ValueError(f"{dir_sim} does not exist or is not a directory")
self.dir_sim = dir_sim
self.time_step = time_step
self.time_end = time_end
self.cfl = cfl
self.time_step_length = time_step_length
if diffusion_control not in ["0", "1"]:
raise ValueError("diffusion_control must be '0' or '1'")
self.diffusion_control = diffusion_control
if curvature_control not in ["0", "1", "2"]:
raise ValueError("curvature_control must be '0' or '1' or '2'")
self.curvature_control = curvature_control
if surface_control not in ["0", "1"]:
raise ValueError("surface_control must be '0' or '1'")
self.surface_control = surface_control
if entrainment_control not in ["0", "1", "2", "3", "4"]:
raise ValueError("entrainment_control must be '0', '1', '2', '3', or '4'")
self.entrainment_control = entrainment_control
self.shear_velocity_coef = shear_velocity_coef
self.basal_friction_diff = basal_friction_diff
if stopping_control not in ["0", "1", "2", "3"]:
raise ValueError("stopping_control must be '0', '1', '2', or '3'")
self.stopping_control = stopping_control
self.stopping_threshold = stopping_threshold
if friction_control not in ["0", "1"]:
raise ValueError("friction_control must be '0' or '1'")
self.friction_control = friction_control
if non_hydro_control not in ["0", "1", "2", "3"]:
raise ValueError("non_hydro_control must be '0', '1', '2', or '3'")
self.non_hydro_control = non_hydro_control
if hydrograph_control not in ["0", "1", "2"]:
raise ValueError("hydrograph_control must be '0', '1', or '2'")
self.hydrograph_control = hydrograph_control
if deceleration_control not in ["0", "1"]:
raise ValueError("deceleration_control must be '0' or '1'")
self.deceleration_control = deceleration_control
@beartype
def preprocess(
self,
prefix: str,
elevation: str,
hrelease: str,
cellsize: Union[float, int, np.floating, np.integer] = 20,
hrelease_ratio: Union[float, np.integer] = 1.0,
internal_friction: Union[float, int, np.floating, np.integer] = 35,
basal_friction: Union[float, int, np.floating, np.integer] = 20,
turbulent_friction: Union[float, int, np.floating, np.integer] = 3,
entrainment_coef: Union[float, np.floating] = -7.0,
EPSG: Union[str, None] = None,
) -> tuple[str, str]:
"""
Preprocess simulation input data simulation.
Parameters
----------
prefix : str
Prefix required by `r.avaflow` to name output files.
elevation : str
Name of elevation raster file (including its path).
The file format should be supported by `GDAL`.
Its unit is in meters.
hrelease : str
Name of release height raster file (including its path).
The file format should be supported by `GDAL`.
Its unit is in meters.
cellsize : float or int, optional
Cell size in meters to be used for simulation.
hrelease_ratio : float, optional
A positive value to multiple ``hrelease`` in order to control
the release volume.
internal_friction : float or int, optional
Internal friction angle, in degrees, range :math:`[0,90)`.
basal_friction : float or int, optional
Basal friction angle, in degrees, range :math:`[0,90)`.
turbulent_friction : float or int, optional
Logarithm with base :math:`10` of the turbulent friction, in
:math:`m/s^2`.
entrainment_coef : float, optional
Logarithm with base :math:`10` of the entrainment coefficient,
except for :math:`0` meaning no entrainment.
EPSG : str, optional
`EPSG` (European Petroleum Survey Group) code to create
`GRASS Location <https://grass.osgeo.org/grass82/manuals/grass_database.html>`_.
If `None`, ``elevation`` must be a georeferenced file which has
metadata to create the `GRASS Location`.
Returns
-------
grass_location : str
Name of the `GRASS Location` (including path).
sh_file : str
Name of the shell file (including path), which will be called by
`GRASS <https://grass.osgeo.org/>`_ to run the simulation.
"""
sh_file = os.path.join(self.dir_sim, f"{prefix}_shell.sh")
grass_location = os.path.join(self.dir_sim, f"{prefix}_glocation")
results_dir = os.path.join(self.dir_sim, f"{prefix}_results")
if (
os.path.exists(sh_file)
or os.path.exists(grass_location)
or os.path.exists(results_dir)
):
raise ValueError(
f"File(s) with prefix={prefix} already exists in {self.dir_sim}."
f" Move or delete conflicting files, or use another prefix."
)
if not os.path.exists(elevation):
raise ValueError(f"{elevation} does not exist")
if not os.path.exists(hrelease):
raise ValueError(f"{hrelease} does not exist")
# create grass location
if EPSG is None:
os.system(f"grass -e -c {elevation} {grass_location}")
else:
os.system(f"grass -e -c EPSG:{EPSG} {grass_location}")
# create shell file
with open(sh_file, "w") as sh:
sh.write("# import elevation raster \n")
sh.write(f"r.in.gdal -o input={elevation} output=elev --overwrite")
sh.write("\n\n")
sh.write("# set region based on elev \n")
sh.write("g.region raster=elev \n\n")
sh.write("# import hrelease raster \n")
sh.write(f"r.in.gdal -o input={hrelease} output=raw_hrel --overwrite")
sh.write("\n\n")
sh.write(f'r.mapcalc "hrel = raw_hrel*{hrelease_ratio}" --overwrite')
sh.write("\n\n")
sh.write(
f"r.avaflow -e -a prefix={prefix} cellsize={cellsize} phases=x "
f"elevation=elev hrelease=hrel "
f"friction={internal_friction},{basal_friction},"
f"{turbulent_friction} "
f"basal={entrainment_coef},{self.shear_velocity_coef},"
f"{self.basal_friction_diff},{self.stopping_threshold} "
f"controls=0,{self.diffusion_control},{self.curvature_control},"
f"{self.surface_control},{self.entrainment_control},"
f"{self.stopping_control},{self.friction_control},{self.non_hydro_control},"
f"{self.hydrograph_control},{self.deceleration_control} "
f"time={self.time_step},{self.time_end} "
f"cfl={self.cfl},{self.time_step_length}"
)
return grass_location, sh_file
@beartype
def run(self, grass_location: str, sh_file: str) -> None:
"""
Run simulation.
Parameters
----------
grass_location : str
Name of the `GRASS Location` (including path).
sh_file : str
Name of the shell file (including path).
"""
if not os.path.exists(grass_location):
raise ValueError(f"{grass_location} does not exist")
if not os.path.exists(os.path.join(grass_location, "PERMANENT")):
raise ValueError(f"{grass_location} is not a grass location")
if not os.path.exists(sh_file):
raise ValueError(f"{sh_file} does not exist")
grass_mapset = os.path.join(grass_location, "PERMANENT", "")
os.chdir(self.dir_sim)
os.system(f"grass {grass_mapset} --exec sh {sh_file}")
def extract_impact_area(
self, prefix: str, qoi: str = "h", threshold: Union[float, int] = 0.5
) -> float:
"""
Extract impact area defined by a given quantity of interest and its
threshold.
Parameters
----------
prefix : str
Prefix used by `r.avaflow` to name output files.
qoi : str
Quantity of interest to determine the impact area.
`'h'`: maximum flow height, in meters.
`'v'`: maximum flow velocity, in :math:`m/s`.
`'p'`: maximum flow pressure, in :math:`Pa`.
`'t'`: maximum flow kinetic energy, in :math:`J`.
threshold : float or int
Threshold of ``qoi`` to determine impact area. Areas where ``qoi``
is larger than ``threshold`` are regarded as impar area.
Returns
-------
impact_area : float
A scalar value representing the overall impact area.
"""
if qoi not in ["h", "v", "p", "t"]:
raise ValueError("qoi must be 'h', 'v', 'p', or 't'")
qoi_max_raster_asc = os.path.join(
self.dir_sim,
f"{prefix}_results",
f"{prefix}_ascii",
f"{prefix}_{qoi}flow_max.asc",
)
qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6)
cellsize = linecache.getline(qoi_max_raster_asc, 5)
cellsize = float(cellsize.split()[-1].strip())
impact_area = (
np.sum(np.where(qoi_max_raster > threshold, 1, 0)) * (cellsize) ** 2
)
return float(impact_area)
def extract_qoi_max(
self, prefix: str, qoi: str, aggregate: bool = True
) -> Union[float, np.ndarray]:
"""
Extract the maximum value(s) of a quantity of interest.
Parameters
----------
prefix : str
Prefix used by `r.avaflow` to name output files.
qoi : str
Quantity of interest.
`'h'`: maximum flow height, in meters.
`'v'`: maximum flow velocity, in :math:`m/s`.
`'p'`: maximum flow pressure, in :math:`Pa`.
`'t'`: maximum flow kinetic energy, in :math:`J`.
aggregate : bool
If `True`, returns the overall maximum value over all spatio-tempo
grids. If `False`, returns the maximum values over all time steps at
each spatio location, namely a raster of maximum values.
Returns
-------
qoi_max : float or numpy array
Maximum value(s) of ``qoi`` in one simulation.
"""
if qoi not in ["h", "v", "p", "t"]:
raise ValueError("qoi must be 'h', 'v', 'p', or 't'")
qoi_max_raster_asc = os.path.join(
self.dir_sim,
f"{prefix}_results",
f"{prefix}_ascii",
f"{prefix}_{qoi}flow_max.asc",
)
qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6)
if aggregate:
return float(np.max(qoi_max_raster))
else:
return np.rot90(np.transpose(qoi_max_raster))
def extract_qoi_max_loc(self, prefix: str, loc: np.ndarray, qoi: str) -> np.ndarray:
"""
Extract maximum value(s) of a quantity of interest at specific
location(s).
Parameters
----------
prefix : str
Prefix used by `r.avaflow` to name output files.
loc : numpy array
Coordinates of interested locations. Shape :code:`(nloc, 2)`, where
``nloc`` is the number of interested locations.
:code:`loc[:,0]` corresponds to `x` coordinates and
:code:`loc[:,1]` to `y` coordinates.
qoi : str
Quantity of interest.
`'h'`: maximum flow height, in meters.
`'v'`: maximum flow velocity, in :math:`m/s`.
`'p'`: maximum flow pressure, in :math:`Pa`.
`'t'`: maximum flow kinetic energy, in :math:`J`.
Returns
-------
qoi_max_loc : numpy array
Consist of maximum value(s) of ``qoi`` at each location.
Shape of :code:`(nloc,)`.
"""
if not (loc.ndim == 2 and loc.shape[1] == 2):
raise ValueError(
"loc must be a 2d numpy array with first axis corresponding to"
" the number of locations and second axis to x and y coords"
)
if qoi not in ["h", "v", "p", "t"]:
raise ValueError("qoi must be 'h', 'v', 'p', or 't'")
qoi_max_raster_asc = os.path.join(
self.dir_sim,
f"{prefix}_results",
f"{prefix}_ascii",
f"{prefix}_{qoi}flow_max.asc",
)
header = [linecache.getline(qoi_max_raster_asc, i) for i in range(1, 6)]
header_values = [float(h.split()[-1].strip()) for h in header]
ncols, nrows, xll, yll, cellsize = header_values
ncols = int(ncols)
nrows = int(nrows)
x = np.arange(xll, xll + (cellsize * ncols), cellsize)
y = np.arange(yll, yll + (cellsize * nrows), cellsize)
if not all(
[
loc[i, 0] >= x[0]
and loc[i, 0] <= x[-1]
and loc[i, 1] >= y[0]
and loc[i, 1] <= y[-1]
for i in range(len(loc))
]
):
raise ValueError(
"Coordinates in loc must be within the boundary of topography"
)
qoi_max_raster = np.loadtxt(qoi_max_raster_asc, skiprows=6)
qoi_max_raster = np.rot90(np.transpose(qoi_max_raster))
f_qoi_max = interp2d(x, y, qoi_max_raster)
qoi_max_loc = [float(f_qoi_max(loc[i, 0], loc[i, 1])) for i in range(len(loc))]
return np.array(qoi_max_loc)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment