import os
import linecache
import numpy as np
from scipy.interpolate import interp2d
from typing import Union
from beartype import beartype
[docs]class Ravaflow24Mixture:
@beartype
def __init__(
self, dir_sim: str, time_step: Union[float, int] = 10,
time_end: Union[float, int] = 300, cfl: float = 0.4,
time_step_length: float = 0.001, conversion_control: str = '0',
curvature_control: str = '1', surface_control: str = '0',
entrainment_control: str = '0', shear_velocity_coef: float = 0.05,
basal_friction_diff: float = 0.0, stopping_control: str = '0',
stopping_threshold: float = 0.0, friction_control: str = '0') -> None:
"""
`r.avaflow 2.4 Mixture model` (Voellmy-type shallow flow model).
Parameters
----------
dir_sim : str
Directory to save output files generated by `r.avaflow`.
time_step : float or int
Time step for simulation, in seconds.
time_end : float or int
End time for simulation, in seconds.
cfl : float
CFL criterion, a value being equal or smaller than 0.5.
time_step_length : float
If ``cfl`` is not applicable, ``time_step_length`` is used.
In seconds. Recommended range is around :math:`0.1` to :math:`0.5`.
conversion_control : str
`'0'`: no conversion of flow heights to flow depths.
`'1'`: conversion is applied.
curvature_control : str
`'0'`: curvature is neglected.
`'1'`: curvature is considered in the decelerating source terms.
`'2'`: curvature is considered in all relevant terms.
surface_control : str
`'0'`: no balancing of forces.
`'1'`: apply balancing of forces.
entrainment_control : str
`'0'`: no entrainment.
`'1'`: the entrainment coefficient is multiplied with flow momentum.
`'2'`: simplified entrainment and deposition model is used.
`'3'`: combination of `'1'` for entrainmnet and `'2'` for deposition.
`'4'`: acceleration-deceleration entrainment and deposition model.
shear_velocity_coef : float
Only used with ``entrainment_control`` being `'2'` or `'3'`, range
:math:`[0,1]`.
basal_friction_diff : float
Difference between the basal friction angles of the basal surface
and the flow, in degrees. Only used with ``entrainment_control``
being `'2'` or `'3'`.
stopping_control : str
`'0'`: no stopping control is used.
`'1'`: stop if the flow kinetic energy is equal or smaller than a
given threshold.
`'2'`: stop if the flow momentum is equal or smaller than a given
threshold.
`'3'`: stop if the dynamic flow pressure of all raster cells is
euqal or smaller than a given threshold.
stopping_threshold : float
Threshold value for ``stopping_control``.
If ``stopping_control`` is `'1'` or `'2'`, ``stopping_threshold``
has to be given as the maximum value reached during the flow.
If ``stopping_control`` is `'3'`, the pressure threshold has to be
specified.
friction_control : str, optional
`'0'`: no dynamic adaptation of friction parameters.
`'1'`: dynamic adaptation of friction parameters (ignored for the
mixture model).
"""
if not os.path.isdir(dir_sim):
raise ValueError(f"{dir_sim} does not exist or is not a directory")
self.dir_sim = dir_sim
self.time_step = time_step
self.time_end = time_end
self.cfl = cfl
self.time_step_length = time_step_length
if conversion_control not in ['0','1']:
raise ValueError("convresion_control must be '0' or '1'")
self.conversion_control = conversion_control
if curvature_control not in ['0','1','2']:
raise ValueError("curvature_control must be '0' or '1' or '2'")
self.curvature_control = curvature_control
if surface_control not in ['0','1']:
raise ValueError("surface_control must be '0' or '1'")
self.surface_control = surface_control
if entrainment_control not in ['0','1','2','3','4']:
raise ValueError(
"entrainment_control must be '0', '1', '2', '3', or '4'")
self.entrainment_control = entrainment_control
self.shear_velocity_coef = shear_velocity_coef
self.basal_friction_diff = basal_friction_diff
if stopping_control not in ['0','1','2','3']:
raise ValueError("stopping_control must be '0', '1', '2', or '3'")
self.stopping_control = stopping_control
self.stopping_threshold = stopping_threshold
if friction_control not in ['0','1']:
raise ValueError("friction_control must be '0' or '1'")
self.friction_control = friction_control
[docs] @beartype
def preprocess(
self, prefix: str, elevation: str, hrelease: str,
cellsize: Union[float, int, np.floating, np.integer] = 20,
hrelease_ratio: Union[float, np.integer] = 1.0,
internal_friction: Union[float, int, np.floating, np.integer] = 35,
basal_friction: Union[float, int, np.floating, np.integer] = 20,
turbulent_friction: Union[float, int, np.floating, np.integer] = 3,
entrainment_coef: Union[float, np.floating] = -7.0,
EPSG: Union[str, None] = None) -> tuple[str, str]:
"""
Preprocess simulation input data simulation.
Parameters
----------
prefix : str
Prefix required by `r.avaflow` to name output files.
elevation : str
Name of elevation raster file (including its path).
The file format should be supported by `GDAL`.
Its unit is in meters.
hrelease : str
Name of release height raster file (including its path).
The file format should be supported by `GDAL`.
Its unit is in meters.
cellsize : float or int, optional
Cell size in meters to be used for simulation.
hrelease_ratio : float, optional
A positive value to multiple ``hrelease`` in order to control
the release volume.
internal_friction : float or int, optional
Internal friction angle, in degrees, range :math:`[0,90)`.
basal_friction : float or int, optional
Basal friction angle, in degrees, range :math:`[0,90)`.
turbulent_friction : float or int, optional
Logarithm with base :math:`10` of the turbulent friction, in
:math:`m/s^2`.
entrainment_coef : float, optional
Logarithm with base :math:`10` of the entrainment coefficient,
except for :math:`0` meaning no entrainment.
EPSG : str, optional
`EPSG` (European Petroleum Survey Group) code to create
`GRASS Location <https://grass.osgeo.org/grass82/manuals/grass_database.html>`_.
If `None`, ``elevation`` must be a georeferenced file which has
metadata to create the `GRASS Location`.
Returns
-------
grass_location : str
Name of the `GRASS Location` (including path).
sh_file : str
Name of the shell file (including path), which will be called by
`GRASS <https://grass.osgeo.org/>`_ to run the simulation.
"""
sh_file = os.path.join(self.dir_sim, f'{prefix}_shell.sh')
grass_location = os.path.join(self.dir_sim, f'{prefix}_glocation')
results_dir = os.path.join(self.dir_sim, f'{prefix}_results')
if os.path.exists(sh_file) or os.path.exists(grass_location) or \
os.path.exists(results_dir):
raise ValueError(
f"File(s) with prefix={prefix} already exists in {self.dir_sim}."
f" Move or delete conflicting files, or use another prefix.")
if not os.path.exists(elevation):
raise ValueError(f"{elevation} does not exist")
if not os.path.exists(hrelease):
raise ValueError(f"{hrelease} does not exist")
# create grass location
if EPSG is None:
os.system(f"grass -e -c {elevation} {grass_location}")
else:
os.system(f"grass -e -c EPSG:{EPSG} {grass_location}")
# create shell file
with open(sh_file, 'w') as sh:
sh.write("# import elevation raster \n")
sh.write(f"r.in.gdal -o input={elevation} output=elev --overwrite")
sh.write("\n\n")
sh.write("# set region based on elev \n")
sh.write("g.region raster=elev \n\n")
sh.write('# import hrelease raster \n')
sh.write(
f"r.in.gdal -o input={hrelease} output=raw_hrel --overwrite")
sh.write("\n\n")
sh.write(
f"r.mapcalc \"hrel = raw_hrel*{hrelease_ratio}\" --overwrite")
sh.write("\n\n")
sh.write(
f"r.avaflow -e -a prefix={prefix} cellsize={cellsize} phases=x "
f"elevation=elev hrelease=hrel "
f"friction={internal_friction},{basal_friction},"
f"{turbulent_friction} "
f"basal={entrainment_coef},{self.shear_velocity_coef},"
f"{self.basal_friction_diff},{self.stopping_threshold} "
f"controls={self.conversion_control},{self.curvature_control},"
f"{self.surface_control},{self.entrainment_control},"
f"{self.stopping_control},{self.friction_control} "
f"time={self.time_step},{self.time_end} "
f"cfl={self.cfl},{self.time_step_length}")
return grass_location, sh_file
[docs] @beartype
def run(self, grass_location: str, sh_file: str) -> None:
"""
Run simulation.
Parameters
----------
grass_location : str
Name of the `GRASS Location` (including path).
sh_file : str
Name of the shell file (including path).
"""
if not os.path.exists(grass_location):
raise ValueError(f"{grass_location} does not exist")
if not os.path.exists(os.path.join(grass_location, "PERMANENT")):
raise ValueError(f"{grass_location} is not a grass location")
if not os.path.exists(sh_file):
raise ValueError(f"{sh_file} does not exist")
grass_mapset = os.path.join(grass_location, "PERMANENT", "")
os.chdir(self.dir_sim)
os.system(f"grass {grass_mapset} --exec sh {sh_file}")