Source code for psimpy.simulator.mass_point_model

import os
import math
import linecache
import numpy as np
from scipy.integrate import ode
from scipy.interpolate import interp2d
from typing import Union
from beartype import beartype

[docs]class MassPointModel: """Simulate the movement of a masspoint on a topography.""" @staticmethod def _preprocess(elevation, x0, y0): """ Preprocess input data. Parameters ---------- elevation : str Name of the elevation raster file, in ESRI ascii format, including the path. x0 : float or int x coordinate of initial position. y0 : float or int y coordinate of initial position. Returns ------- fgrad_x : Callable Call with :code:`fgrad_x(x, y)`. Return first order partial derivative of ``elevation`` with respect to `x` at location `(x, y)`. fgrad_y : callable Call with :code:`fgrad_y(x, y)`. Return first order partial derivative of ``elevation`` with respect to `y` at location `(x, y)`. fgrad_xx : Callable Call with :code:`fgrad_xx(x, y)`. Return second order partial derivative of ``elevation`` with respect to `x` at location `(x, y)`. fgrad_yy : Callable Call with :code:`fgrad_yy(x, y)`. Return second order partial derivative of ``elevation`` with respect to `y` at location `(x, y)`. fgrad_xy : Callable Call with :code:`fgrad_xy(x ,y)`. Return second order partial derivative of ``elevation`` with respect to `x` and `y` at location `(x, y)`. """ if not os.path.exists(elevation): raise ValueError(f"{elevation} does not exist") header = [linecache.getline(elevation, i) for i in range(1,6)] header_values = [float(h.split()[-1].strip()) for h in header] ncols, nrows, xll, yll, cellsize = header_values ncols = int(ncols) nrows = int(nrows) x_values = np.arange(xll, xll+(cellsize*ncols), cellsize) y_values = np.arange(yll, yll+(cellsize*nrows), cellsize) if not all([x0>x_values[0], x0<x_values[-1], y0>y_values[0], y0<y_values[-1]]): raise ValueError( "x0 and y0 must be within the boundary of elevation") z_values = np.loadtxt(elevation, skiprows=6) z_values = np.rot90(np.transpose(z_values)) dzdx = np.gradient(z_values, x_values, axis=1) dzdy = np.gradient(z_values, y_values, axis=0) dzdxx = np.gradient(dzdx, x_values, axis=1) dzdyy = np.gradient(dzdy, y_values, axis=0) dzdxy = np.gradient(dzdx, y_values, axis=0) fgrad_x = interp2d(x_values, y_values, dzdx) fgrad_y = interp2d(x_values, y_values, dzdy) fgrad_xx = interp2d(x_values, y_values, dzdxx) fgrad_yy = interp2d(x_values, y_values, dzdyy) fgrad_xy = interp2d(x_values, y_values, dzdxy) return fgrad_x, fgrad_y, fgrad_xx, fgrad_yy, fgrad_xy @staticmethod def _mass_point_model( t, alpha, coulomb_friction, turbulent_friction, fgrad_x, fgrad_y, fgrad_xx, fgrad_yy, fgrad_xy, g, curvature): """ Define mass point model (3d) which serves as the callable function :code:`f(t,alpha,*f_args)` in :class:`scipy.integrate.ode(f)`. Parameters: ----------- t : float or int Time in seconds. alpha : list Contains `x`, `y`, `ux`, `uy` at time `t`, i.e., `[x(t), y(t), ux(t), uy(t)]`. coulomb_friction : float Dry Coulomb friction coefficient. turbulent_friction : float or int Turbulent friction coefficient, in :math:`m/s^2`. fgrad_x : Callable Call with :code:`fgrad_x(x, y)`. Return first order partial derivative of ``elevation`` with respect to `x` at location `(x, y)`. fgrad_y : Callable Call with :code:`fgrad_y(x, y)`. Return first order partial derivative of ``elevation`` with respect to `y` at location `(x, y)`. fgrad_xx : Callable Call with :code:`fgrad_xx(x, y)`. Return second order partial derivative of ``elevation`` with respect to `x` at location `(x, y)`. fgrad_yy : Callable Call with :code:`fgrad_yy(x, y)`. Return second order partial derivative of ``elevation`` with respect to `y` at location `(x, y)`. fgrad_xy : Callable Call with :code:`fgrad_xy(x ,y)`. Return second order partial derivative of ``elevation`` with respect to `x` and `y` at location `(x, y)`. g : float or int Gravity acceleration, in :math:`m/s^2`. curvature : bool If `True`, take the curvature effect into account. Return: ------- dalphadt : numpy array Right hand side of the mass point model. """ x, y, ux, uy = alpha[0], alpha[1], alpha[2], alpha[3] gradx, grady, gradxx, gradyy, gradxy = fgrad_x(x,y), fgrad_y(x,y), \ fgrad_xx(x,y), fgrad_yy(x,y), fgrad_xy(x,y) sqrt_x = math.sqrt(1 + gradx**2) sqrt_y = math.sqrt(1 + grady**2) sqrt_xy = math.sqrt(1 + gradx**2 + grady**2) gx = gradx * (-g) / sqrt_x gy = grady * (-g) / sqrt_y gn = g / sqrt_xy if curvature: kx = gradxx / (1 + gradx**2) / sqrt_xy kxy = gradxy / sqrt_x / sqrt_y / sqrt_xy ky = gradyy / (1 + grady**2) / sqrt_xy uku = kx*ux**2 + 2*kxy*ux*uy + ky*uy**2 else: uku = 0 u = math.sqrt(ux**2 + uy**2 + 2*ux*uy*gradx*grady/sqrt_x/sqrt_y) dalphadt = np.zeros(len(alpha)) dalphadt[0] = ux / sqrt_x dalphadt[1] = uy / sqrt_y if u != 0: friction = coulomb_friction*(gn+uku) + g*u**2/turbulent_friction dalphadt[2] = gx - (ux/u)*friction dalphadt[3] = gy - (uy/u)*friction else: friction = coulomb_friction*gn dalphadt[2] = gx + friction*gradx*(1+gradx**2+grady**2)/sqrt_x/ \ math.sqrt(gradx**2+grady**2+(gradx**2+grady**2)**2) dalphadt[3] = gy + friction*grady*(1+gradx**2+grady**2)/sqrt_y/ \ math.sqrt(gradx**2+grady**2+(gradx**2+grady**2)**2) return dalphadt
[docs] @beartype def run( self, elevation: str, coulomb_friction: float, turbulent_friction: Union[float, int], x0: Union[float, int], y0: Union[float, int], ux0: Union[float, int] = 0, uy0: Union[float, int] = 0, dt: Union[float, int] = 1, tend: Union[float, int] = 300, t0: Union[float, int] = 0, g: Union[float, int] = 9.8, atol: float = 1e-6, rtol: float = 1e-6, curvature: bool = False) -> np.ndarray: """ Solve the mass point model using :class:`scipy.integrate.ode` solver given required input data. Parameters ---------- elevation : str Name of the elevation raster file, in ESRI ascii format, including the path. coulomb_friction : float Dry Coulomb friction coefficient. turbulent_friction : float or int Turbulent friction coefficient, in :math:`m/s^2`. x0 : float or int `x` coordinate of initial position. y0 : float or int `y` coordinate of initial position. ux0 : float or int Initial velocity in `x` direction. uy0 : float or int Initial velocity in `y` direction. dt : float or int Time step in seconds. tend : float or int End time in seconds. t0 : float or int Initial time. g : float or int Gravity acceleration, in :math:`m/s^2`. atol : float Absolute tolerance for solution. rtol : float Relative tolerance for solution. curvature : bool If `True`, take the curvature effect into account. Returns ------- output: numpy array Time history of the mass point's location and velocity. 2d :class:`numpy.ndarray`. Each row corresponds to a time step and each column corresponds to a quantity. In total :math:`6` columns, namely :code:`output.shape[1]=6`. More specifically: :code:`output[:,0]` are the time steps, :code:`output[:,1]` are the `x` coordinates, :code:`output[:,2]` are the `y` coordinates, :code:`output[:,3]` are velocity values in `x` direction, :code:`output[:,4]` are velocity values in `y` direction, :code:`output[:,5]` are total velocity values. """ fgrad_x, fgrad_y, fgrad_xx, fgrad_yy, fgrad_xy = \ self._preprocess(elevation, x0, y0) s = ode(self._mass_point_model).set_integrator('dopri5', method='bdf', atol=atol, rtol=rtol) s.set_f_params(coulomb_friction, turbulent_friction, fgrad_x, fgrad_y, fgrad_xx,fgrad_yy,fgrad_xy, g, curvature) alpha0 = [x0, y0, ux0, uy0] s.set_initial_value(alpha0, t0) t_all = [t0] alpha_all = [alpha0] while s.successful() and s.t < tend: s.integrate(s.t + dt) t_all.append(s.t) alpha_all.append(s.y) t_all = np.array(t_all) alpha_all = np.array(alpha_all) u = np.zeros(len(t_all)) for i in range(len(t_all)): gradx = fgrad_x(alpha_all[i,0], alpha_all[i,1]) grady = fgrad_y(alpha_all[i,0], alpha_all[i,1]) ux = alpha_all[i,2] uy = alpha_all[i,3] u[i] = math.sqrt(ux**2 + uy**2 + 2*ux*uy*gradx*grady/ math.sqrt(1+gradx**2)/math.sqrt(1+grady**2)) t_all = np.reshape(t_all,(len(t_all),-1)) u = np.reshape(u,(len(u),-1)) output = np.concatenate((t_all, alpha_all, u), axis=1) return output