From 45740f1db16993c251fe426b20fe6376755c1aa9 Mon Sep 17 00:00:00 2001 From: Sebastian Kerger <sebastian.kerger@rwth-aachen.de> Date: Wed, 5 Mar 2025 10:32:40 +0100 Subject: [PATCH] Added py_package. environment and gitignore --- .gitignore | 192 +++ env.yml | 37 + py_package/pyproject.toml | 3 + py_package/src/entfrachten/__init__.py | 0 .../src/entfrachten/analysis/__init__.py | 0 .../analysis/cross_section_info_loader.py | 191 +++ .../analysis/drainage_system_graph.py | 122 ++ .../analysis/drainage_sysytem_data.py | 315 +++++ .../entfrachten/analysis/dynamic_analysis.py | 320 +++++ .../src/entfrachten/analysis/sewer_map.py | 257 ++++ .../analysis/simulation_reading.py | 361 +++++ .../entfrachten/analysis/static_analysis.py | 53 + .../src/entfrachten/analysis/unit_factors.py | 12 + py_package/src/entfrachten/optim/__init__.py | 0 .../src/entfrachten/optim/loss_functions.py | 80 ++ .../src/entfrachten/optim/optim_dumper.py | 82 ++ .../src/entfrachten/optim/simulation.py | 262 ++++ .../src/entfrachten/optim/simulation_dummy.py | 91 ++ .../src/entfrachten/plotting/struct_plot.py | 1163 +++++++++++++++++ 19 files changed, 3541 insertions(+) create mode 100644 .gitignore create mode 100644 env.yml create mode 100644 py_package/pyproject.toml create mode 100644 py_package/src/entfrachten/__init__.py create mode 100644 py_package/src/entfrachten/analysis/__init__.py create mode 100644 py_package/src/entfrachten/analysis/cross_section_info_loader.py create mode 100644 py_package/src/entfrachten/analysis/drainage_system_graph.py create mode 100644 py_package/src/entfrachten/analysis/drainage_sysytem_data.py create mode 100644 py_package/src/entfrachten/analysis/dynamic_analysis.py create mode 100644 py_package/src/entfrachten/analysis/sewer_map.py create mode 100644 py_package/src/entfrachten/analysis/simulation_reading.py create mode 100644 py_package/src/entfrachten/analysis/static_analysis.py create mode 100644 py_package/src/entfrachten/analysis/unit_factors.py create mode 100644 py_package/src/entfrachten/optim/__init__.py create mode 100644 py_package/src/entfrachten/optim/loss_functions.py create mode 100644 py_package/src/entfrachten/optim/optim_dumper.py create mode 100644 py_package/src/entfrachten/optim/simulation.py create mode 100644 py_package/src/entfrachten/optim/simulation_dummy.py create mode 100644 py_package/src/entfrachten/plotting/struct_plot.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ef0f007 --- /dev/null +++ b/.gitignore @@ -0,0 +1,192 @@ +# Project specific + +# files generated by mike+ +*.log +*.res1d +*_ErrorLog*.html +*_Summary*.html +# Mike+ models and data +*model*/ +!model_reading/ +# output files +tmp*/ +out*/ +*.pickle + + + +# For Python files, copied from https://github.com/github/gitignore/blob/main/Python.gitignore +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc \ No newline at end of file diff --git a/env.yml b/env.yml new file mode 100644 index 0000000..7c19179 --- /dev/null +++ b/env.yml @@ -0,0 +1,37 @@ +name: entfrachten +channels: + - conda-forge + - defaults +dependencies: + - python=3.9 + - dill + - matplotlib + - numpy=1.26.3 + - pandas=2.2.2 + - geopandas=1.0.1 + - folium + - mapclassify + - xarray=2024.1.1 + - openpyxl + - plotly + - seaborn + - importnb + - papermill=2.5.0 + - scipy + - tqdm + - ipywidgets + - sqlalchemy + - psycopg2 + - networkx=3.2.1 + - ipykernel + - pythonnet=3.0.3 + - pip + - pip: + - mikeio==2.1.2 + - mikeio1d==0.9.1 + - https://github.com/DHI/mikepluspy/archive/refs/tags/v2024.0-latest.zip + - bayes-optim==0.3.0 + - hydroeval + - nevergrad==1.0.5 + # installs content of py_package in edit mode, so that changes to it are reflected without reinstalling + - -e py_package diff --git a/py_package/pyproject.toml b/py_package/pyproject.toml new file mode 100644 index 0000000..92b87d0 --- /dev/null +++ b/py_package/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools>=62.0"] +build-backend = "setuptools.build_meta" \ No newline at end of file diff --git a/py_package/src/entfrachten/__init__.py b/py_package/src/entfrachten/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/py_package/src/entfrachten/analysis/__init__.py b/py_package/src/entfrachten/analysis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/py_package/src/entfrachten/analysis/cross_section_info_loader.py b/py_package/src/entfrachten/analysis/cross_section_info_loader.py new file mode 100644 index 0000000..c533f7b --- /dev/null +++ b/py_package/src/entfrachten/analysis/cross_section_info_loader.py @@ -0,0 +1,191 @@ +"""Script to load reach CrossSection Information. +# The program can be executed like so: 'python -m entfrachten.analysis.cross_section_info_loader some_folder/some_model.sqlite -a "\\.\pipe\some_pipe"' +Where some_model is the mike model you would like to read from. +# """ + +def start_listener(): + """Start the listener. Note that this also executes imports which may prevent you from using mikeio1d.""" + # imports are inside the function because they have side effects + import argparse + from multiprocessing.connection import Listener + from warnings import warn + + import numpy as np + import scipy.integrate + + # mike api loading, its a c# api, clr.AddReference is used for loading + # first import clr + import clr + + # use AddReference to add a reference to the dll (in c# this would be a using directive). + # DHI.Mike.Install needs some additional parameters, I have copied those from the python examples from DHI. + # see https://github.com/DHI/Mike1D-SDK-examples/blob/master/scripts/ResultDataExtract.py + clr.AddReference( + "DHI.Mike.Install, Version=1.0.0.0, Culture=neutral, PublicKeyToken=c513450b5d0bf0bf" + ) + # after using AddReference, we can import the c# libraries like python modules. 
+ from DHI.Mike.Install import MikeImport, MikeProducts + + MikeImport.SetupLatest(MikeProducts.MikePlus) + clr.AddReference("DHI.Mike1D.Generic") + from DHI.Mike1D.Generic import ( + Connection, + Diagnostics, + Location, + LocationSpan, + XYZLocationSpan, + ZLocation, + ) + + clr.AddReference("DHI.Mike1D.Mike1DDataAccess") + from DHI.Mike1D.Mike1DDataAccess import Mike1DBridge + + clr.AddReference("DHI.Mike1D.NetworkDataAccess") + from DHI.Mike1D.NetworkDataAccess import IReach + + clr.AddReference("DHI.Mike1D.CrossSectionModule") + from DHI.Mike1D.CrossSectionModule import ICrossSection, CrossSectionData, ICrossSection + + # ArgumentParser helps to add command line arguments. + # Just use .add_argument and it takes care of parsing the command line input. + parser = argparse.ArgumentParser("cross_section_info_loader") + parser.add_argument("model_path", help="path of mike model", type=str) + parser.add_argument( + "-a", + "--address", + default=R"\\.\pipe\xs_pipe", + help="address of listener, client needs to use the same", + type=str, + ) + # this is where the parser does the actual parsing, raises an exception and prints help info if it fails. + args = parser.parse_args() + + # we do not have topos, but have to pass an argument, which needs to be an empty string. + NULL_TOPO = "" + + # Load the model data. + print("loading file ", args.model_path) + # Many methods from the mike1d api need this diagnostics object as an input. Probably used for error messages. + diagnostics = Diagnostics("diagnostics") + # the connection tells the api where to read the model data from + connection = Connection.Create(args.model_path) + # May need a simulation id if there are multiple scenarios: + # connection.Options.Add("simulationId", "397") + # The bridge helps to read the model data from any supported format + mike1DBridge = Mike1DBridge() + mike1DData = mike1DBridge.Open(connection, diagnostics) + # reaches are contained as list, put into dict with name as key + reach_dict: dict[str, IReach] = { + r.LocationSpan.ID: r for r in mike1DData.Network.Reaches + } + # this is the part of the data we need + csData = mike1DData.CrossSections + + # use the Listener to communicate with another process. + address = args.address + if address.startswith("(") and address.endswith(")"): + print("trying to parse port from {address}") + port = int(address.split(",")[-1].removesuffix(")")) + print(f"set port to {port}") + address = ("localhost", port) + print(f"set address to {address}") + print(f"starting server at {address} with authkey b'reach_reader'") + listener = Listener(address, authkey=b"reach_reader") + print(f"started server at {address} with authkey b'reach_reader'") + conn = listener.accept() + print("connection accepted from", listener.last_accepted) + should_run = True + while should_run: + try: + msg = conn.recv() + # otherwise assume its a reach id + if isinstance(msg, tuple): + fkt = msg[0] + if fkt == "full_xs_area": + if not len(msg) == 2: + raise ValueError( + "Could not parse. To get full cross section area for a reach send ('full_xs_area',reach)." + ) + reach = msg[1] + # first argument is the reach name. Second is topo_id, which is not used in our case + cross_section = csData.FindGlobalCrossSection(reach, NULL_TOPO) + # need to call initialize before the cross section can give us values. 
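# --- Editor's note: illustrative client sketch, not part of the patch above. ---
# The listener accepts tuples such as ("full_xs_area", reach) and answers with the
# computed value, or with an Exception on failure. A minimal client could look like
# this; the port 6050 and the reach name "Link_1" are placeholders and the listener
# must have been started with a matching "-a (localhost, 6050)" argument.
from multiprocessing.connection import Client

def query_full_xs_area(reach_id: str, port: int = 6050) -> float:
    with Client(("localhost", port), authkey=b"reach_reader") as conn:
        conn.send(("full_xs_area", reach_id))
        reply = conn.recv()
        conn.send("close")  # tells the listener to shut down; omit this to keep it running
    if isinstance(reply, Exception):
        raise reply
    return reply

# print(query_full_xs_area("Link_1"))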
+ cross_section.Initialize() + # send data back to client + conn.send( + cross_section.GetArea( + cross_section.BottomLevel + cross_section.Height + ) + ) + elif fkt == "xs_areas_at": + if not len(msg) == 4: + raise ValueError( + "Could not parse. To get cross section area for a reach at specific waterlevel and chainages send ('xs_areas_at', reach, water_level, chainage_list)." + ) + reach = msg[1] + reach_span: XYZLocationSpan = reach_dict[reach].LocationSpan + water_level = float(msg[2]) + chainages = np.array(msg[3]).clip(0, reach_span.EndChainage) + cross_areas = np.full_like(chainages, np.nan) + for i, c in enumerate(chainages): + # GetCrossSection does interpolation, this should take care of changing reach heights or cross_section shapes. + # This requires a location with Z-Value, which we can get from the LocationSpan of the IReach object. + loc = reach_span.CreateLocation(c) + cross_section = csData.GetCrossSection(loc, NULL_TOPO) + if not cross_section is None: + # need to call initialize before the cross section can give us values. + cross_section.Initialize() + # might raise an exception if the water level is too high, clip. + effective_water_level = np.clip( + water_level, + cross_section.BottomLevel, + cross_section.BottomLevel + cross_section.Height, + ) + cross_areas[i] = cross_section.GetArea(effective_water_level) + conn.send((chainages, cross_areas)) + elif fkt == "volume_from_waterlevels": + if not len(msg) == 4: + raise ValueError( + "Could not parse. To get volume in a reach from water levels send ('volume_from_waterlevels', reach, chainages ,water_levels)." + ) + reach = msg[1] + chainages = msg[2] + water_levels = msg[3] + if not len(chainages) == len(water_levels): + raise ValueError( + "chainages and water levels need to have same length" + ) + reach_span: XYZLocationSpan = reach_dict[reach].LocationSpan + + cross_areas = np.full_like(chainages, np.nan) + for i, c in enumerate(chainages): + # GetCrossSection does interpolation, this should take care of changing reach heights or cross_section shapes. + # This requires a location with Z-Value, which we can get from the LocationSpan of the IReach object. + loc = reach_span.CreateLocation(c) + cross_section = csData.GetCrossSection(loc, NULL_TOPO) + if not cross_section is None: + # need to call initialize before the cross section can give us values. + cross_section.Initialize() + # might raise an exception if the water level is too high, clip. + effective_water_level = np.clip( + water_levels[i], + cross_section.BottomLevel, + cross_section.BottomLevel + cross_section.Height, + ) + cross_areas[i] = cross_section.GetArea(effective_water_level) + volume = scipy.integrate.trapezoid(y=cross_areas, x=chainages) + conn.send(volume) + # close if msg is "close" + if msg == "close": + print("closing now") + should_run = False + except Exception as e: + print("Exception occurred.") + warn(str(e)) + # send a new Exception made with just the string, this avoids errors where the original exception cannot be pickled + conn.send(Exception(str(e))) + conn.close() + listener.close() + +if __name__ == "__main__": + start_listener() diff --git a/py_package/src/entfrachten/analysis/drainage_system_graph.py b/py_package/src/entfrachten/analysis/drainage_system_graph.py new file mode 100644 index 0000000..62e5087 --- /dev/null +++ b/py_package/src/entfrachten/analysis/drainage_system_graph.py @@ -0,0 +1,122 @@ +"""Module for graph problems on the sewer network. 
Names are the same as in the mike model: +Nodes may be manholes or basins or pipe junctions. +Any connection between the nodes are called reaches, reaches are usually pipes but may also be weirs or pumps. +So reaches are what would be called edges in graph theory.""" + +import collections +import typing +import pandas as pd + +class PathNotFoundException(Exception): + pass + +class DrainageSystemGraph: + def __init__(self,reach_df:pd.DataFrame) -> None: + """reach_df: index is the name of the reach, needs columns 'start' and 'end' which define which nodes the reach connects.""" + self.reach_df = reach_df + + def find_upstream_nodes_reaches(self,start:str, stop_list:typing.Iterable[str]) -> typing.Tuple[set[str],set[str]]: + """finds all nodes and reaches upstream of start, where the connection does not go through a node from stop_list. + Return a set of nodes and a set of reaches.""" + # set is better at membership tests + stop_set = set(stop_list) + q = collections.deque([start]) + found_nodes = {start} + found_reaches = set() + while len(q) > 0: + current_node = q.popleft() + # query for reaches that end at current_node. + # checks for q or stop_list has to be done on the node, + # because the reaches have to be added, even if the node should not be added to the list, + reaches = self.reach_df.query( + "end == @current_node " + ) + for reach_name in reaches.index: + found_reaches.add(reach_name) + for next_node in reaches["start"]: + if not next_node in stop_set or next_node in found_nodes: + found_nodes.add(next_node) + q.append(next_node) + return found_nodes, found_reaches + + + + def find_shortest_path_reach(self,start_reach:str, end_reach:str): + """Find the shortest path (in number of links!) from start to end. + Return the path as a list of reaches""" + # Standard breadth first search. + # Since we only care about reaches, those are treated as the nodes of the graph, + # the edges are reach-pairs where start and end match. + q = collections.deque([start_reach]) + explored = {start_reach} + parents = {} + while len(q) > 0: + current_reach = q.popleft() + if current_reach == end_reach: + break + # Note that this variable is used in query() + current_end_node = self.reach_df.loc[current_reach, "end"] + for next_reach in self.reach_df.query("start==@current_end_node").index: + if not next_reach in explored: + explored.add(next_reach) + parents[next_reach] = current_reach + q.append(next_reach) + if current_reach != end_reach: + raise PathNotFoundException(f"No path found from {start} to {end}") + path = [current_reach] + while path[-1] != start_reach: + path.append(parents[path[-1]]) + return list(reversed(path)) + + + def find_shortest_path(self, start:str, end:str): + """Find the shortest path (in number of links!) from start to end. + Return the path as a list of reaches""" + # Standard breadth first search. + q = collections.deque([start]) + # set of explored nodes, those are not visited again + explored = {start} + # for each visited node, save the reach from which the node was visited. 
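# --- Editor's note: illustrative usage sketch, not part of the patch above. ---
# DrainageSystemGraph only needs a DataFrame indexed by reach name with "start" and
# "end" node columns, so it can be tried on a toy network; the node and reach names
# below are made up.
import pandas as pd
from entfrachten.analysis.drainage_system_graph import DrainageSystemGraph

toy_reaches = pd.DataFrame(
    {"start": ["A", "B", "C"], "end": ["B", "C", "D"]},
    index=pd.Index(["r1", "r2", "r3"], name="name"),
)
graph = DrainageSystemGraph(toy_reaches)
print(graph.find_shortest_path("A", "D"))          # ['r1', 'r2', 'r3']
print(graph.find_upstream_nodes_reaches("D", []))  # ({'A', 'B', 'C', 'D'}, {'r1', 'r2', 'r3'})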
+ # this is later used to follow the path back + to_parent = {} + while len(q) > 0: + current_node = q.popleft() + if current_node == end: + break + for reach, next_node in self.reach_df.query("start==@current_node")["end"].items(): + if not next_node in explored: + explored.add(next_node) + to_parent[next_node] = reach + q.append(next_node) + if current_node != end: + raise PathNotFoundException(f"No path found from {start} to {end}") + reach = to_parent[current_node] + reach_start = self.reach_df.loc[reach,"start"] + path = [reach] + while reach_start != start: + reach = to_parent[reach_start] + reach_start = self.reach_df.loc[reach,"start"] + path.append(reach) + return list(reversed(path)) + + def find_next_upstream_nodes(self,subset:typing.Iterable[str]) -> typing.Dict[str,set[str]]: + "For each node in subset find the other nodes from subset that can be reached upstream, without going over another node from subset." + # set is better for member checks + subset = set(subset) + next_subset_nodes = {} + for s in subset: + next_subset_nodes[s] = set() + q = collections.deque([s]) + explored = {s} + while len(q) > 0: + # Note that this variable is used in query() + current_node = q.popleft() + for next_node in self.reach_df.query("end==@current_node and not start in @explored")["start"]: + explored.add(next_node) + if next_node in subset: + # add to next_structures. + # do not add to q, so that exploration stops here. + next_subset_nodes[s].add(next_node) + else: + q.append(next_node) + return next_subset_nodes \ No newline at end of file diff --git a/py_package/src/entfrachten/analysis/drainage_sysytem_data.py b/py_package/src/entfrachten/analysis/drainage_sysytem_data.py new file mode 100644 index 0000000..f4d5433 --- /dev/null +++ b/py_package/src/entfrachten/analysis/drainage_sysytem_data.py @@ -0,0 +1,315 @@ +from warnings import warn +import subprocess +from multiprocessing.connection import Client, Connection +from time import sleep +import random + +import sqlalchemy as sa +import pandas as pd +import numpy as np +import numpy.typing as npt +import scipy +import mikeio1d as mi + + +class DrainageSystemData: + """Object for static drainage system data. Takes care of reading and holding data. Also some calculations.""" + + # Volume is checked to be <=VOLUME_CHECK_TOLERANCE_FACTOR*full_volume, + # to make sure that the values are not completely wrong. The tolerance is used because it may be slightly off. + VOLUME_CHECK_TOLERANCE_FACTOR = 1.1 + + def __init__(self, model_sql_path: str, r1d_path: str) -> None: + """Load data. + Args: + model_sql_path (str): path to the sql of the mike model. + r1d_path (str): path to a result file. + """ + # model data is saved as a sqlite, which we can read from + self.model_db = sa.create_engine("sqlite:///" + model_sql_path) + # Nodes and reaches are read from the res1d file. + # A reach in this file can be any connection between nodes, including weirs, pumps, etc. Therefore this is simpler than the model sqlite. 
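# --- Editor's note: illustrative sketch, not part of the patch above. ---
# DrainageSystemData is constructed from the model sqlite plus any network result
# file; both paths below are placeholders for a real Mike+ model.
from entfrachten.analysis.drainage_sysytem_data import DrainageSystemData

sd = DrainageSystemData(
    model_sql_path="model/some_model.sqlite",
    r1d_path="out/some_eventBaseDefault_Network_HD.res1d",
)
print(sd.node_df[["type", "ground_level", "bottom_level", "full_volume"]].head())
print(sd.reach_df[["start", "end", "length", "full_volume"]].head())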
+ # for reference on reading the data see: + # https://github.com/DHI/mikeio1d/discussions/53#discussion-5231633 + self.nwr = mi.Res1D(r1d_path) + self.xs_reader = _connect_xs_reader(model_sql_path) + self.catchment_df = _read_catch(self.model_db) + self.node_df = _read_nodes(self.nwr, self.model_db) + # this is later needed to get the start and end nodes of reaches, where the r1d only contains an index as a number + self._r1d_node = self.node_df.reset_index()[["r1d_index", "name"]].set_index( + "r1d_index" + )["name"] + self.manhole_df = _read_manhole(self.model_db) + self.basin_df, self.basin_geometry = _read_basin(self.model_db) + self.node_df["full_volume"] = 0.0 + self.node_df["full_volume"].update(self.manhole_df["full_volume"]) + self.node_df["full_volume"].update(self.basin_df["full_volume"]) + self.reach_df = _read_reach(self.nwr, self._r1d_node, self.xs_reader) + self.weir_df = _read_weir(self.model_db) + + def calc_node_volume(self, node:str, water_level:float): + """calculate volume for a node at specified water level + + Args: + node (str): name of the node + water_level (float): water_level (m), zero point is not the node bottom, but a general one + + Returns: + float: volume (m³) + """ + node_type = self.node_df.loc[node, "type"] + if node_type == "Manhole": + effective_water_level = np.clip( + water_level, + self.manhole_df.loc[node, "bottom_level"], + self.manhole_df.loc[node, "ground_level"], + ) + volume = ( + (effective_water_level - self.manhole_df.loc[node, "bottom_level"]) + * (self.manhole_df.loc[node, "diameter"] / 2) ** 2 + * np.pi + ) + assert ( + volume + <= self.VOLUME_CHECK_TOLERANCE_FACTOR + * self.manhole_df.loc[node, "full_volume"] + ), f"Volume for node {node} too large. Got {volume} expected<={self.manhole_df.loc[node,'full_volume']}" + elif node_type == "Basin": + bg = self.basin_geometry.query("node_name==@node") + volume = (bg["h"].clip(upper=water_level).diff() * bg["surface_area"]).sum() + assert ( + volume + <= self.VOLUME_CHECK_TOLERANCE_FACTOR + * self.basin_df.loc[node, "full_volume"] + ), f"Volume for node {node} too large. Got {volume} expected<={self.basin_df.loc[node,'full_volume']}" + else: + volume = 0 + return volume + + def calc_reach_volume(self, reach, water_level): + """calculate volume for a reach at specified water level. Assume that the water is flat. + + Args: + reach (str): name of the reach + water_level (float): water_level (m), zero point is not the reah bottom, but a general one + + Returns: + float: volume (m³) + """ + # get volume by numericaly integration the cross sections over the reach length + STEP_SIZE = 1.0 + reach_length = self.reach_df.loc[reach, "length"] + chainages_desired = np.append( + np.arange(0, reach_length, STEP_SIZE), [reach_length] + ) + self.xs_reader.send(("xs_areas_at", reach, water_level, chainages_desired)) + rsp = self.xs_reader.recv() + if isinstance(rsp, Exception): + warn(f"Could not read volume for {reach}. {rsp}") + return 0 + else: + chainages, areas = np.array(rsp) + area_finite = np.isfinite(areas) + volume = scipy.integrate.trapezoid( + y=areas[area_finite], x=chainages[area_finite] + ) + assert ( + volume + <= self.VOLUME_CHECK_TOLERANCE_FACTOR + * self.reach_df.loc[reach, "full_volume"] + ), f"Volume for reach {reach} too large. 
Got {volume} expected<={self.reach_df.loc[reach,'full_volume']}" + return volume + + def xs_area_at( + self, reach: str, waterlevel: float, chainages: list[float] + ) -> tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]: + self.xs_reader.send(('xs_areas_at', reach, waterlevel, chainages)) + rsp = self.xs_reader.recv() + if isinstance(rsp,Exception): + raise rsp + else: + return rsp + + +def _connect_xs_reader(model_sql_path: str): + """Conect to xs_reader""" + # For data on pipe cross sections, we need to use the mike1d api. + # This does not seem to work when also importing mike1d. + # Therefore use a multiprocessing.Listener, which runs in its own process with its own imports, and reads the file. + + # address for connection to reader. Getting a free port is somewhat complicated, but just using a random one should usually work. + reader_address = ("localhost", random.randint(6000, 6100)) + auth_key = b"reach_reader" + # Start the listener in its own console window. + # normal + # subprocess.Popen( + # [ + # "python", + # "-m", + # "entfrachten.analysis.cross_section_info_loader", + # model_sql_path, + # "-a", + # str(reader_address), + # ], + # creationflags=subprocess.CREATE_NEW_CONSOLE, + # ) + # for keeping cmd open if an error occurs use this, only for windows + subprocess.Popen( + f'cmd /k "python -m entfrachten.analysis.cross_section_info_loader "{model_sql_path}" -a "{reader_address}""', creationflags=subprocess.CREATE_NEW_CONSOLE + ) + # Listener may take some time to start, try to connect until it is accepted + for n_try in range(20): + try: + # try to connect to server + xs_reader = Client(reader_address, authkey=auth_key) + break + except (ConnectionRefusedError, FileNotFoundError) as e: + # give server some time to start + sleep(3) + else: + raise Exception( + "Could not connect to listener. Check printed messages and that connection detail match up with cross_section_info_loader.py" + ) + return xs_reader + + +def _read_catch(model_db: sa.Engine): + # get catchment data from sqlite + # msm_catchment contains some basic data. msm_catchcon defines where catchments are connected to. + # note that area unit is m² + with model_db.begin() as con: + catchment_df = pd.read_sql( + """ + SELECT catchid as id, nodeid as node, linkid as reach, area, ModelAImpArea as impervious_ratio , persons + FROM msm_catchcon LEFT JOIN msm_catchment ON msm_catchcon.catchid=msm_catchment.muid + WHERE enabled=1""", + con, + ) + catchment_df["impervious_area"] = ( + catchment_df["area"] * catchment_df["impervious_ratio"] + ) + return catchment_df + + +def _read_nodes(nwr: mi.Res1D, model_db: sa.Engine) -> pd.DataFrame: + # for accessing data see: https://github.com/DHI/mikeio1d/discussions/53 + node_records = [] + for i, node in enumerate(list(nwr.data.Nodes)): + node_records.append( + { + "name": node.ID, + "r1d_index": i, + "type": str(node.GetType()).split(".")[-1].removeprefix("Res1D"), + "x": node.XCoordinate, + "y": node.YCoordinate, + } + ) + node_df = pd.DataFrame.from_records(node_records) + node_df = node_df.set_index("name") + # some data is not in the res1d, use sqlite + with model_db.begin() as con: + node_sqlite = pd.read_sql( + """ + SELECT muid as name, groundlevel as ground_level, invertlevel as bottom_level + FROM msm_Node""", + con, + ).set_index("name") + node_df = node_df.join(node_sqlite) + return node_df + + +def _read_manhole(model_db: sa.Engine) -> pd.DataFrame: + # read data in manholes. Those are denoted by typeno 1. 
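# --- Editor's note: worked example with made-up numbers, not part of the patch above. ---
# A manhole is treated as a cylinder between invert (bottom) level and ground level,
# which is also the shape calc_node_volume assumes before clipping the water level.
import numpy as np

ground_level, bottom_level, diameter = 12.0, 10.0, 1.0  # m (hypothetical)
full_volume = (ground_level - bottom_level) * (diameter / 2) ** 2 * np.pi
print(f"{full_volume:.2f} m3")  # ~1.57 m3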
+ with model_db.begin() as con: + manhole_df = pd.read_sql( + """ + SELECT muid as node_name, groundlevel as ground_level, invertlevel as bottom_level, diameter + FROM msm_Node + WHERE typeno=1""", + con, + ).set_index("node_name") + manhole_df["full_volume"] = ( + (manhole_df["ground_level"] - manhole_df["bottom_level"]) + * (manhole_df["diameter"] / 2) ** 2 + * np.pi + ) + return manhole_df + + +def _read_basin(model_db: sa.Engine) -> tuple[pd.DataFrame, pd.DataFrame]: + # read basin data + # typeno=2 denotes basins. The geometry is saved in an extra table + with model_db.begin() as con: + basin_df = pd.read_sql( + """ + SELECT msm_node.muid as node_name, groundlevel as ground_level, invertlevel as bottom_level + FROM msm_node + WHERE msm_node.typeno=2""", + con, + ).set_index("node_name") + # the geometry is saved in an extra table. This table is very generic, therefore columns are called value1, value2 etc. + # The important columns for us are value1, which contains the height, and value3, which contains the surface area at the height of the same row. + # Which column is which can be seen from the tooltips in Mike+. + basin_geometry = pd.read_sql( + """ + SELECT msm_node.muid as node_name, msm_node.geometryid as geometry_id, ms_tabd.value1 as h, ms_tabd.value3 as surface_area + FROM msm_node LEFT JOIN ms_tabd ON msm_node.geometryid=ms_tabd.tabid + WHERE msm_node.typeno=2 + ORDER BY node_name,h""", + con, + ).set_index("node_name") + basin_df["full_volume"] = ( + ( + basin_geometry["h"].groupby(level="node_name").diff() + * basin_geometry["surface_area"] + ) + .groupby(level="node_name") + .sum() + ) + basin_df["geometry_id"] = ( + basin_geometry["geometry_id"].groupby(level="node_name").first() + ) + return basin_df, basin_geometry + + +def _read_reach( + nwr: mi.Res1D, r1d_node: pd.Series, xs_reader: Connection +) -> pd.DataFrame: + reach_records = [] + for reach in list(nwr.data.Reaches): + reach_records.append( + { + "name": reach.Name, + "start": r1d_node[reach.StartNodeIndex], + "end": r1d_node[reach.EndNodeIndex], + "length": reach.Length, + } + ) + reach_df = pd.DataFrame.from_records(reach_records).set_index("name") + # add type from name, everything other than link has a prefix "{type}:" + has_colon = reach_df.index.to_series().str.contains(":") + reach_df.loc[~has_colon, "type"] = "Link" + reach_df.loc[has_colon, "type"] = reach_df.index.to_series().str.split( + ":", expand=True + )[0] + assert reach_df.index.is_unique + # add cross_section area of reach + for reach in reach_df.index: + xs_reader.send(("full_xs_area", reach)) + a = xs_reader.recv() + if not isinstance(a, float): + a = 0 + reach_df.loc[reach, "xs_area"] = a + reach_df.loc[reach, "full_volume"] = reach_df.loc[reach, "length"] * a + return reach_df + + +def _read_weir(model_db: sa.Engine) -> pd.DataFrame: + with model_db.begin() as con: + weir_df = pd.read_sql( + """ + SELECT muid as id, 'Weir:'||muid as reach, fromnodeid as start, tonodeid as end, crestlevel as crest_level + FROM msm_weir;""", + con, + ).set_index("id") + return weir_df diff --git a/py_package/src/entfrachten/analysis/dynamic_analysis.py b/py_package/src/entfrachten/analysis/dynamic_analysis.py new file mode 100644 index 0000000..b8ab632 --- /dev/null +++ b/py_package/src/entfrachten/analysis/dynamic_analysis.py @@ -0,0 +1,320 @@ +"""Module with helper methods for dynamic analysis, i.e data that changes with time, like flow and concentrations. 
""" +import typing + +import pandas as pd +import numpy as np +import scipy.signal + +from entfrachten.analysis import unit_factors as uf + +_REDUCTION_FACTOR = 0.75 +ids = pd.IndexSlice +# Parts that are considered for how water volume and tss mass are distributed. +dist_agg_parts = [ + "inflow", + "storage", + "overflow", + "outflow", +] + +# size of slices for volume sections +slice_mm = 1 + +def _calc_timestep_seconds(s:pd.Series): + """Return series with timestep in seconds. Assume that the index levels are [place, event, datetime].""" + return ( + s.index.to_frame()["datetime"] + .groupby(["place", "event"]) + .diff() + .bfill() + .dt.total_seconds() + ) + + +def calc_raindepth(volume: pd.Series, sealed_area: float) -> pd.Series: + """Convert volume to corresponding accumulated rain depth""" + # Calculate the rainfall depth + # m³/m2=m, multiply with 1000 to get mm + return volume / sealed_area / _REDUCTION_FACTOR * 1000 + + +def calc_volume_from_raindepth(rain_depth: pd.Series, sealed_area: float) -> pd.Series: + """Convert volume to corresponding accumulated rain depth""" + # Calculate the rainfall depth + # m³/m2=m, multiply with 1000 to get mm + return rain_depth * sealed_area * _REDUCTION_FACTOR / 1000 + + +def volume_sections_from_raindepth( + volume: pd.Series, sealed_area: float +) -> pd.Series: + """Make sections based on volume converted to rain depth and returns the upper bound for volume(m³) in the section for each entry in volume. + This is intended to make comparable volume sections for all storages.""" + rain_depth = calc_raindepth(volume, sealed_area) + rain_slice = np.floor(rain_depth / slice_mm) + rain_slice_end = rain_slice * slice_mm + slice_mm + volume_ubound = calc_volume_from_raindepth(rain_slice_end, sealed_area) + return volume_ubound + + +def sections_from_raindepth( + volume: pd.Series, sealed_area: float +) -> pd.Series: + """Make sections based on volume converted to rain depth and returns the upper bound for rain depth (mm) in the section for each entry in volume. + This is intended to make comparable volume sections for all storages.""" + rain_depth = calc_raindepth(volume, sealed_area) + rain_slice = np.floor(rain_depth / slice_mm) + rain_slice_end = rain_slice * slice_mm + slice_mm + return rain_slice_end + + +def volume_dist( + inflow: pd.Series, + outflow: pd.Series, + overflow: pd.Series, + storage_volume: pd.Series, + sub_storage_volume: pd.Series, + sub_attribute_to: typing.Mapping, +): + """Calculate accumulated volume for all parts. Substorages get extra columns in the output.""" + # inflow ---> storage -----> outflow + # ==weir== + # | + # V + # overflow + # timestep in seconds + timestep_sec = _calc_timestep_seconds(inflow) + # Ignore negative flow, we want to group by incoming volume. Water going backwards (i.e out) is ignored. + in_volume = inflow.clip(lower=0).groupby(["place", "event"]).cumsum() * timestep_sec + out_volume = outflow.groupby(["place", "event"]).cumsum() * timestep_sec + overflow = overflow.reindex_like(inflow).fillna(0) + over_volume = overflow.groupby(["place", "event"]).cumsum() * timestep_sec + # dist storage, use attribute_to as place and put each actual place into a new column. + # This is not efficient, but makes it simpler to arrange the fitting data points. 
+ sub_storage_volume = ( + sub_storage_volume.copy() + .rename("volume") + .reset_index() + .rename(columns={"place": "storage_part"}) + ) + sub_storage_volume["place"] = sub_storage_volume["storage_part"].map( + sub_attribute_to + ) + sub_storage_volume = ( + sub_storage_volume.set_index(["storage_part", "place", "event", "datetime"]) + .unstack("storage_part") + .droplevel(0, axis="columns") + ) + assert ( + len( + sub_storage_volume.columns.intersection( + ["inflow", "storage", "overflow", "outflow"] + ) + ) + == 0 + ) + volume_dist = pd.DataFrame( + { + "inflow": in_volume, + "storage": storage_volume, + "overflow": over_volume, + "outflow": out_volume, + } + | {c: sub_storage_volume[c] for c in sub_storage_volume.columns}, + index=in_volume.index, + ) + volume_dist.columns.name = "part" + return volume_dist + + +def load_dist( + inflow_transport: pd.Series, + outflow_transport: pd.Series, + overflow_transport: pd.Series, + storage_volume: pd.Series, + storage_conc: pd.Series, + sub_storage_volume: pd.Series, + sub_storage_conc: pd.Series, + sub_attribute_to: typing.Mapping, +) -> pd.DataFrame: + """Calculate accumulated load for all parts. Substorages get extra columns in the output.""" + timestep_sec = _calc_timestep_seconds(inflow_transport) + # calculate these parts from transport accumulation + in_load = inflow_transport.groupby(["place", "event"]).cumsum() * timestep_sec + out_load = outflow_transport.groupby(["place", "event"]).cumsum() * timestep_sec + overflow_transport = overflow_transport.reindex_like(outflow_transport).fillna(0) + overflow_load = overflow_transport.groupby(["place", "event"]).cumsum() * timestep_sec + + # load in storages is calculated from concentration and volume + storage_load = storage_volume * uf.MGPL_TO_KGPM3 * storage_conc + # like for the unused storage analysis volume, we need to handle the fact that + # some storages are made up of multiple parts, which are defined in storage_dist + sub_storage = sub_storage_volume * uf.MGPL_TO_KGPM3 * sub_storage_conc + # now merge l_s_sp according to dist_storage[attribute_to] into l_s + # same procedure as for unused storages + # reform attribute_to to have a values for each row of dist_storage_load + long_attribute_to = sub_storage.index.get_level_values("place").map(sub_attribute_to) + # 'as_index=False' is not allowed on series. It is also not needed here, since this operation seems to leave the index as is. 
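# --- Editor's note: worked unit example, not part of the patch above. ---
# The storage load is volume times concentration; uf.MGPL_TO_KGPM3 is presumably 1e-3,
# since 1 mg/l = 1 g/m3 = 1e-3 kg/m3. The numbers below are made up.
volume_m3 = 500.0        # stored water volume
conc_mg_per_l = 120.0    # TSS concentration
MGPL_TO_KGPM3 = 1e-3     # assumed value of uf.MGPL_TO_KGPM3
load_kg = volume_m3 * MGPL_TO_KGPM3 * conc_mg_per_l
print(load_kg)  # 60.0 kg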
+ for storage, dist_storage_load_part in sub_storage.groupby( + long_attribute_to + ): + # index slice of volume to insert data into l_s + to_slice = ids[storage, :, :] + # sum loads up, per event and datetime pair, i.e sum over the places + l_sum = dist_storage_load_part.groupby(["event", "datetime"]).sum() + # add attribute_to as index level (by misusing concat) and reorder to match l_s + l_sum = pd.concat([l_sum], keys=[storage], names=["place"]) + l_sum = l_sum.reorder_levels(storage_load.index.names) + storage_load.loc[to_slice] = l_sum + # now that l_s_sp was aggregated into l_s, change its form to also make the detailed values available, same as for volume + sub_storage = ( + sub_storage.rename("load").reset_index().rename(columns={"place": "storage_part"}) + ) + sub_storage["place"] = sub_storage["storage_part"].map(sub_attribute_to) + sub_storage = ( + sub_storage.set_index(["storage_part", "place", "event", "datetime"]) + .unstack("storage_part") + .droplevel(0, axis="columns") + ) + load_dist = pd.DataFrame( + { + "inflow": in_load, + "storage": storage_load, + "overflow": overflow_load, + "outflow": out_load, + } + | {c: sub_storage[c] for c in sub_storage.columns}, + index=in_load.index, + ) + load_dist.columns.name = "part" + return load_dist + +def volume_load_dist_delta(volume_dist,load_dist,sealed_area_m2): + """Calculates changes over volume sections from accumulated load and volume.""" + v_in = volume_dist["inflow"] + # note that this function assigns a section to every value in the input + volume_sections = pd.Series(index=v_in.index, dtype=v_in.dtype) + section_sizes = pd.Series(index=sealed_area_m2.index, dtype=float, data=None) + # the area in m2 is later needed to convert volume to a corresponding rain depth + + for p in volume_sections.index.get_level_values("place").unique(): + # using p instead of [p] would make the assignment fail, probably because the version with [p] removes p from the index + sections_p = volume_sections_from_raindepth(v_in.loc[[p], :, :], sealed_area_m2[p]) + volume_sections.loc[[p], :, :] = sections_p + section_sizes[p] = sections_p.iloc[0] + assert (volume_sections > 0).all() + # check where the volume sections are not monotonic increasing, i.e incoming volume decreases enough to be in a smaller section + is_increasing = volume_sections.groupby(["place","event"]).is_monotonic_increasing + if ~is_increasing.sum()>0: + print("volume sections not monotonic increasing:") + print((~is_increasing).groupby("place").sum()) + # get indices where v_in is nearest to the section value per place, event and section. + section_delta = (volume_sections - v_in).abs() + section_idx = ( + pd.DataFrame( + { + "delta": section_delta, + "volume_section": volume_sections, + } + ) + .groupby(["place", "event", "volume_section"]) + .idxmin()["delta"] + ) + # first get the accumulated values + # some values (like storage) may no start at 0, correct that + load_dist_acc = load_dist-load_dist.groupby(["place","event"]).transform("first") + volume_dist_acc = volume_dist-volume_dist.groupby(["place","event"]).transform("first") + load_dist_acc = load_dist_acc.loc[section_idx, :] + volume_dist_acc = volume_dist_acc.loc[section_idx, :] + # section_idx values are where the values are found, the index contains the volume sections + load_dist_acc.index = section_idx.index + volume_dist_acc.index = section_idx.index + # now convert to difference + # diff is nan for first value. 
here we insert the original value, since we can assume 0 as first value + load_dist_d = load_dist_acc.groupby(["place", "event"]).diff().fillna(load_dist_acc) + volume_dist_d = ( + volume_dist_acc.groupby(["place", "event"]).diff().fillna(volume_dist_acc) + ) + # compare intended section size to the actual changes in volume + # this is expected to not always match because the timesteps may not match up with desired volume changes. + section_sizes_each_section = volume_dist_d.index.get_level_values("place").map(section_sizes) + in_section_ratio = volume_dist_d["inflow"]/section_sizes_each_section + print("actual inflow change/desired section size") + print(in_section_ratio.describe()) + # correct values + volume_dist_d=volume_dist_d.div(in_section_ratio,axis="index") + load_dist_d=load_dist_d.div(in_section_ratio,axis="index") + return volume_dist_d, load_dist_d + +def consecutive_true_size(bool_series: pd.Series): + """Calculate the maximum size of consecutive true values in bool_series. + Size is calculated from the differences of index-values, i.e for a single true value it will be zero. + For int index size is int, for Timestamp size is timedelta, etc. + Can be used for calculating event or gap durations. + """ + prev_true = bool_series.shift(1, fill_value=False) + next_true = bool_series.shift(-1, fill_value=False) + first_true = bool_series & (~prev_true) + last_true = bool_series & (~next_true) + + times = bool_series.index.get_level_values("datetime") + first_true_pos = pd.Series( + data=pd.NaT, index=bool_series.index, + ) + last_true_pos = pd.Series( + data=pd.NaT, index=bool_series.index, + ) + + first_true_pos[first_true] = times[first_true] + last_true_pos[last_true] = times[last_true] + + first_true_pos = first_true_pos.ffill() + last_true_pos = last_true_pos.bfill() + + true_size = last_true_pos - first_true_pos + # set size to nan where bool_series is false + true_size[~bool_series] = None + return true_size + +def calc_lag(a:pd.Series,b:pd.Series,window_steps:int,vmin): + """Calculate the lag from window wise cross correlation. + vmin is used to ignore parts where a and b are lower than vmin, there nan is returned + Returns: The lag in number of timesteps""" + assert a.index.equals(b.index) + a_numpy = a.to_numpy().astype("double") + b_numpy = b.to_numpy().astype("double") + # set the maximum amount of lag in number of timesteps + # set size of the window as distance from the middle to the edges. + win_pad = window_steps + # save lag for each point in a series. + lag = pd.Series( + data=np.nan, + index=a.index, + name="lag", + dtype=float, + ) + # calculate lag for each position, if one of the values is larger than vmin + for t in range(len(lag)): + if (a_numpy[t] >= vmin) or (b_numpy[t] >= vmin): + # Start and end of windwow, make sure indices are inside the valid range. + win_start = max(0, t - win_pad) + win_end = min(t + win_pad, len(a_numpy) - 1) + # extract window data + res_part = a_numpy[win_start : win_end + 1] + ref_part = b_numpy[win_start : win_end + 1] + # use scyipy.signal to calculate correlation + correlation_mode: str = "same" + correlation = scipy.signal.correlate( + res_part, ref_part, mode=correlation_mode + ) + # signal.correlation_lags tells us, how the index of the correlation result matches a lag in the original data. + # For our cases the lags shoudl just be and array of the from [-n,...,0,...,n], but this method handles edge cases + # and different correlation_modes for us. 
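# --- Editor's note: minimal self-contained check of the correlation idea, not part of the patch above. ---
# scipy.signal.correlate together with scipy.signal.correlation_lags recovers a known
# shift between two signals; calc_lag applies the same computation window by window.
import numpy as np
import scipy.signal

a = np.array([0, 0, 0, 1, 2, 1, 0, 0, 0], dtype=float)
b = np.roll(a, 2)  # b is a delayed by 2 steps
corr = scipy.signal.correlate(a, b, mode="same")
lags = scipy.signal.correlation_lags(a.size, b.size, mode="same")
print(lags[np.argmax(corr)])  # -2: with this sign convention, b trails a by 2 steps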
+ all_lags = scipy.signal.correlation_lags( + res_part.size, ref_part.size, mode=correlation_mode + ) + # use is_lag_valid to only search lags smaller than the limit for the best result. + is_lag_valid = np.abs(all_lags) <= window_steps + idx_lag = all_lags[is_lag_valid][np.argmax(correlation[is_lag_valid])] + lag.iloc[t] = idx_lag + return lag \ No newline at end of file diff --git a/py_package/src/entfrachten/analysis/sewer_map.py b/py_package/src/entfrachten/analysis/sewer_map.py new file mode 100644 index 0000000..b0301bf --- /dev/null +++ b/py_package/src/entfrachten/analysis/sewer_map.py @@ -0,0 +1,257 @@ +"""Module to make folium maps of the drainage system.""" +# TODO less copy paste +import collections +from warnings import warn +import numpy as np +import pandas as pd +import geopandas as gpd +import shapely +import shapely.affinity as saf +import folium +import branca +import matplotlib as mpl + + +from tqdm import tqdm + +from entfrachten.analysis.drainage_sysytem_data import DrainageSystemData +from entfrachten.analysis.drainage_system_graph import DrainageSystemGraph + + +# colormap for volume +_invalid_value_color = "#999999" +_volume_over_color = "#000000" +_volume_cmap = mpl.colormaps.get_cmap("plasma_r").copy() +_volume_cmap.set_bad(_invalid_value_color) +_volume_cmap.set_under(_invalid_value_color) +_volume_cmap.set_over(_volume_over_color) + + + +def _map_df(sd:DrainageSystemData) -> gpd.GeoDataFrame: + """Prepares a GeoDataframe for use as a map.""" + # to_geopandas translates the network to a geopandas dataframe. + # This is very handy for visualization. + map_df = sd.nwr.network.to_geopandas() + map_df = map_df.merge(sd.node_df[["type"]],left_on="name",right_on="name",how="left") + is_reach = map_df["group"]=="Reach" + has_colon = map_df["name"].str.contains(":") + map_df.loc[is_reach&~has_colon,"type"]="Link" + map_df.loc[is_reach&has_colon,"type"]=map_df["name"].str.split(":",expand=True)[0] + return map_df # type: ignore + +def type_map(sd:DrainageSystemData,storage_df:pd.DataFrame): + """Draw a map of the network colored by the type of the element.""" + color_map = { + "Manhole":"#0000ff", + "Basin":"#00ff00", + "SewerJunction":"#888888", + "Outlet":"#ff0000", + "Link":"#0000ff", + "Weir":"#ff0000", + "Valve":"#8888ff", + "Pump":"#ffff00", + "Orifice":"#000000", + } + map_df=_map_df(sd) + map_df["color"]=map_df["type"].map(color_map) + # draw interesting parts larger + interesting = ((map_df["type"]!="Link")&(map_df["type"]!="Manhole")&(map_df["type"]!="SewerJunction"))|map_df["name"].isin(storage_df["node"]) + m = map_df.loc[~interesting,:].explore(color=map_df.loc[~interesting,"color"],marker_kwds={"radius":1.5}) + map_df.loc[interesting,:].explore(color=map_df.loc[interesting,"color"],marker_kwds={"radius":6},m=m) + _add_storage_markers(m, map_df[map_df["group"] == "Node"], storage_df) + return m + + +def _add_storage_markers(m,node_gdf,storage_df): + """Add a marker for storage nodes.""" + node_gdf=node_gdf.reset_index() + node_icon = folium.Icon(icon_color="green", icon="tint") + storage_map_df = node_gdf[ + node_gdf["name"].isin(storage_df["node"]) + ] + for s, n in storage_df["node"].items(): + storage_map_df.loc[node_gdf["name"] == n, "storage"] = s + storage_map_df.explore( + m=m, marker_type="marker", marker_kwds={"icon": node_icon, "radius": 4} + ) + +def partition_map(sd:DrainageSystemData,storage_df:pd.DataFrame,graph:DrainageSystemGraph): + """Draw the partition of network that is used to sum up data.""" + neutral_color = "#AAAAAA" + mix_color = 
"#000000" + map_df = _map_df(sd) + map_df["color"] = neutral_color + color_cycle = [ + "#FBB3C1", + "#E7C095", + "#B9CF8E", + "#81D7B4", + "#79D4E1", + "#B6C5F8", + "#ECB5EB", + "#A86371", + "#95703F", + "#6A7F36", + "#158764", + "#008591", + "#6576A8", + "#9C639B", + ] + map_df["storages"] = "{" + node_storage = storage_df.reset_index()[["node","name"]].set_index("node")["name"] + if len(storage_df)<=len(color_cycle): + warn("Not enough colors to show all storages!") + for n, c in tqdm(list(zip(storage_df["node"], color_cycle))): + s = node_storage[n] + next_up_nodes, next_up_reaches = graph.find_upstream_nodes_reaches( + n, storage_df["node"] + ) + n_match = (map_df["name"].isin(next_up_nodes) & (map_df["group"] == "Node")) | ( + map_df["name"].isin(next_up_reaches) & (map_df["group"] == "Reach") + ) + map_df.loc[n_match, "storages"] += "," + s + is_colored = map_df["color"] != neutral_color + map_df.loc[n_match, "color"] = c + map_df.loc[n_match & is_colored, "color"] = mix_color + map_df["storages"] += "}" + is_storage = map_df["name"].isin(storage_df["node"]) & (map_df["group"] == "Node") + map_df_fat = map_df.loc[is_storage] + map_df_thin = map_df.loc[~is_storage] + m = map_df_thin.explore( + color=map_df_thin["color"], marker_kwds={"radius": 1.5}, tiles="Cartodb Positron" + ) + map_df_fat.explore(color=map_df_fat["color"], marker_kwds={"radius": 6}, m=m) + _add_storage_markers(m,map_df,storage_df) + return m + + +def _vol_to_col(v, vmax): + """Map volume to color.""" + assert np.ndim(v) == 0, f"{v} not scalar" + if v <= 0: + return _invalid_value_color + rgba = _volume_cmap(v / vmax) + return mpl.colors.rgb2hex(rgba) + + +def volume_map(sd:DrainageSystemData,storage_df): + """Make a map that shows reach cross section and node volume.""" + # convert values to this crs so that coordinates are in meters + metric_crs = "EPSG:25834" + reach_max_width = 10 + node_max_radius = 20 + normalization_quantile = 0.99 + max_xs = sd.reach_df["xs_area"].quantile(normalization_quantile) + node_gdf = sd.nwr.nodes.to_geopandas().to_crs("EPSG:25834").set_index("name") + node_gdf = node_gdf.drop(columns="group").join( + sd.node_df.drop(columns=["x", "y", "r1d_index"]) + ) + # make a version of node_gdf where the area is proportional to the volume + node_vol_gdf = node_gdf.copy() + node_vol_gdf["full_volume"] = 0.0 + min_radius = 0.5 + node_vol_gdf["map_radius"] = min_radius + for n in node_vol_gdf.index: + node_vol_gdf.loc[n, "full_volume"] = sd.calc_node_volume( + n, node_vol_gdf.loc[n, "ground_level"] + ) + max_node_vol = node_vol_gdf["full_volume"].quantile(normalization_quantile) + for n in node_vol_gdf.index: + # a==v <=> r**2==v <=> r == v**0.5 + t = node_vol_gdf.loc[n, "full_volume"] + node_vol_gdf.loc[n, "map_radius"] = max( + (node_max_radius * t / max_node_vol) ** 0.5, min_radius + ) + + node_vol_gdf.geometry = node_vol_gdf.geometry.buffer( + node_vol_gdf["map_radius"], resolution=8 + ) + reach_vol_gdf = gpd.GeoDataFrame( + data=sd.reach_df, geometry=[None] * len(sd.reach_df), crs=metric_crs + ) + for r in reach_vol_gdf.index: + # make a triangle where one point is at end and one side at start + # area should be proportional to volume + # a~v <=> w*l ~ xs*l <=> w~xs + s = node_gdf.geometry.loc[reach_vol_gdf.loc[r, "start"]] + e = node_gdf.geometry.loc[reach_vol_gdf.loc[r, "end"]] + width = reach_max_width * sd.reach_df.loc[r, "xs_area"] / max_xs + scale = width / shapely.distance(s, e) + e_scaled = saf.scale(e, xfact=scale, yfact=scale, origin=s) + s1 = saf.rotate(e_scaled, 90, origin=s) + s2 
= saf.rotate(e_scaled, -90, origin=s) + triangle = shapely.Polygon([s1, s2, e]) + reach_vol_gdf.loc[r, "geometry"] = triangle + # add colors + max_col_vol = pd.concat( + [node_vol_gdf["full_volume"], reach_vol_gdf["full_volume"]] + ).quantile(normalization_quantile) + node_vol_gdf["color"] = node_vol_gdf["full_volume"].apply(_vol_to_col, vmax=max_col_vol) + reach_vol_gdf["color"] = reach_vol_gdf["full_volume"].apply( + _vol_to_col, vmax=max_col_vol + ) + cm_pad = 0.1 + cm_samples = np.linspace(-cm_pad * max_col_vol, (1 + cm_pad) * max_col_vol, 400) + map_colormap = branca.colormap.LinearColormap( + [_vol_to_col(v, vmax=max_col_vol) for v in cm_samples], + vmin=cm_samples[0], + vmax=cm_samples[-1], + caption="Volume(m³)\n Area is proportional to volume, but not comparable for reaches and nodes", + ) + volume_map = node_vol_gdf.explore( + color=node_vol_gdf["color"], tiles="Cartodb Positron", popup=True, max_zoom=24 + ) + reach_vol_gdf.explore(color=reach_vol_gdf["color"], m=volume_map, popup=True) + _add_storage_markers(volume_map, node_gdf, storage_df) + map_colormap.add_to(volume_map) + return volume_map + + +def _time_to_col(t, vmax): + """Map time to color""" + time_cm = mpl.colormaps.get_cmap("viridis") + if vmax == 0 or vmax == pd.Timedelta("0s"): + return "#999999" + assert np.ndim(t) == 0, f"{t} not scalar" + rgba = time_cm(t / vmax) + return mpl.colors.rgb2hex(rgba) + + +def make_path_time_map(sd:DrainageSystemData, path, max_time=None): + """Draw flowtime on map, normalized with max from path. Reaches in Path are drawn thicker.""" + map_df_diff = _map_df(sd).set_index("name") + map_df_diff["length"] = sd.reach_df["length"] + map_df_diff["max_velocity"] = sd.reach_df["max_velocity"] + map_df_diff["flow_time(min)"] = sd.reach_df["flow_time"] / pd.Timedelta(minutes=1) + map_df_diff["flow_time_norm"] = ( + sd.reach_df["flow_time"] / pd.Timedelta(seconds=1) + ) / sd.reach_df["length"] + if max_time == None: + max_time = map_df_diff.loc[path, "flow_time_norm"].max() + map_df_diff["color"] = map_df_diff["flow_time_norm"].apply( + _time_to_col, vmax=max_time + ) + map_df_path = map_df_diff.loc[path].reset_index() + map_df_rest = map_df_diff.loc[map_df_diff.index.difference(path)].reset_index() + m = map_df_rest.explore( + color=map_df_rest["color"], + tiles="Cartodb Positron", + popup=True, + max_zoom=24, + style_kwds=dict(weight=2, opacity=0.7, fillOpacity=0.6), + marker_kwds=dict(radius=1, stroke=False, fill_opacity=0.2), + ) + map_df_path.explore( + color=map_df_path["color"], popup=True, style_kwds=dict(weight=5), m=m + ) + # add a colormap legend + cm_samples = np.linspace(0, max_time, 100) + map_colormap = branca.colormap.LinearColormap( + [_time_to_col(t, vmax=max_time) for t in cm_samples], + vmin=cm_samples[0], + vmax=cm_samples[-1], + caption="normalized flow time (s/m)", + ) + map_colormap.add_to(m) + return m diff --git a/py_package/src/entfrachten/analysis/simulation_reading.py b/py_package/src/entfrachten/analysis/simulation_reading.py new file mode 100644 index 0000000..500684b --- /dev/null +++ b/py_package/src/entfrachten/analysis/simulation_reading.py @@ -0,0 +1,361 @@ +"""module for reading and aggregating data from res1d files. 
tailored for structure_analysis.""" + +import typing +import pathlib +from warnings import warn + +import mikeio1d as mi +import mikeio1d.res1d as mir + +import numpy as np +import numpy.typing as npt +import pandas as pd +from tqdm import tqdm + +from entfrachten.analysis.drainage_sysytem_data import DrainageSystemData +from entfrachten.analysis import static_analysis, unit_factors as uf + + +def read_sim_data( + r1d_folder: pathlib.Path, + sd: DrainageSystemData, + storage_struct: pd.DataFrame, + sub_storage: pd.DataFrame, + events: list, + st_mass_balance_storages: list[str] = [] +) -> dict[str, pd.DataFrame]: + """ + Read and sum up the data from the simulation results in r1d_folder. + Arguments + r1d_folder: Folder where .res1d files are read from. + sd: Data on the drainage system, see drainage_sysytem_data.py. + storage_struct: Dataframe with information on storages. + sub_storage: Dataframe with information on sub_storages. + events: list of events, res1d files for each event are found by inserting the event into a hard coded pattern. + st_mass_balance_storages: if st data is read, these storages use a mass balance instead of concentration from outflow_reaches + """ + dataset_reaches: dict[str, pd.Series] = _create_reach_series(storage_struct) + # For the inflow, we also need to add the flow from catchments that connect into the storage space. + storage_catchments = {} + storage_struct["in_out_volume"] = np.nan + for s in storage_struct.index: + nodes_s, reaches_s = static_analysis.in_out_reach_subset( + sd, dataset_reaches["inflow"][[s]], dataset_reaches["outflow"][[s]] + ) + storage_catchments[s] = sd.catchment_df.query( + "reach in @reaches_s or node in @nodes_s" + )["id"].to_list() + r1d_catchment_filter = [c for l in storage_catchments.values() for c in l] + # max velocity has same index as reach_df, since we only need one value per reach + reach_agg = pd.DataFrame( + index=sd.reach_df.index, columns=["max_velocity"], data=np.nan + ) + # # collect data parts + df_parts = {k: [] for k in dataset_reaches.keys()} + df_parts["inflow_catchments"] = [] + # also collect parts for waterlevels, later use to calculate unused storage capacities + df_parts["storage_level"] = [] + df_parts["sub_storage_level"] = [] + # collect parts for afs at nodes + df_parts["storage_afs"] = [] + df_parts["sub_storage_afs"] = [] + # collect all reaches and nodes for filtering + r1d_reach_filter = list(set(str(r) for r in pd.concat(dataset_reaches))) + r1d_node_filter = storage_struct["node"].to_list() + sub_storage["node"].to_list() + for event in tqdm(events): + print("in loop for event "+str(event)) + # open files + # first check if ST or AD model + # Check if the ST model file exists + conc_path_st = r1d_folder / f"{event}BaseDefault_Network_ST.res1d" + conc_path_ad = r1d_folder / f"{event}BaseDefault_Network_AD.res1d" + # Automatically switches to reading st-data if the file is found. Maybe add a function parameter? 
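# --- Editor's note: illustrative sketch, not part of the patch above. ---
# For every event the reader expects result files following the hard-coded naming
# pattern used below; the folder "out" and the event name "E01" are placeholders.
import pathlib

r1d_folder = pathlib.Path("out")
event = "E01"
expected_files = [
    r1d_folder / f"{event}BaseDefault_Network_ST.res1d",  # ST quality results (optional, switches on ST mode)
    r1d_folder / f"{event}BaseDefault_Network_AD.res1d",  # AD quality results (used if no ST file)
    r1d_folder / f"{event}BaseDefault_Network_HD.res1d",  # hydraulic results
    r1d_folder / f"{event}BaseDefault_Catchment_discharge_quality.res1d",
    r1d_folder / f"{event}BaseDefault_Catchment_discharge.res1d",
]
print(*expected_files, sep="\n")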
+ st_mode = conc_path_st.exists() + if st_mode: + conc_path = conc_path_st + print("In analysis of ST-module") + # we will later use those reaches to calculate the concentration at the corresponding storage, + # since the model for st does not output node concentration + sub_storage_out_reaches = sub_storage["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1) + sub_storage_in_reaches = sub_storage["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1) + storage_out_reaches = storage_struct["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1) + storage_in_reaches = storage_struct["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1) + all_storage_out_reaches = pd.concat([sub_storage_out_reaches,storage_out_reaches]) + all_storage_in_reaches = pd.concat([sub_storage_in_reaches,storage_in_reaches]) + # add to filter, so that the values are actually read later. + r1d_reach_filter = list(set(r1d_reach_filter).union(all_storage_out_reaches).union(all_storage_in_reaches)) + else: + conc_path = conc_path_ad + # now read hydraulic information + hd_path = r1d_folder / f"{event}BaseDefault_Network_HD.res1d" + conc_catch_path = ( + r1d_folder / f"{event}BaseDefault_Catchment_discharge_quality.res1d" + ) + hd_catch_path = r1d_folder / f"{event}BaseDefault_Catchment_discharge.res1d" + #check if data is available + if ( + conc_path.exists() + and hd_path.exists() + and conc_catch_path.exists() + and hd_catch_path.exists() + ): + print(f"found data for {event}") + else: + print(f"No data for {event}") + continue + res_conc = mir.Res1D( + str(conc_path), reaches=r1d_reach_filter, nodes=r1d_node_filter + ) + # we need velocities for all reaches, therefore do not limit the reaches to r1d_reach_filter + res_hd = mir.Res1D( + str(hd_path), + reaches=sd.reach_df.index.to_list(), + nodes=r1d_node_filter, + ) + res_catch_hd = mir.Res1D(str(hd_catch_path), catchments=r1d_catchment_filter) + res_catch_conc = mir.Res1D( + str(conc_catch_path), catchments=r1d_catchment_filter + ) + time_idx = res_conc.time_index.intersection(res_hd.time_index).intersection(res_catch_hd.time_index).intersection(res_catch_conc.time_index) + if len(time_idx)==0: + warn(f"Data for {event}, but times of files do not intersect") + continue + # read values + # where to read what + if st_mode: + col_res1d_quantity = [ + ("afs_transport", res_conc, "MassTransportTotal"), + ("discharge", res_hd, "Discharge"), + ] + else: + col_res1d_quantity = [ + ("afs_transport", res_conc, "AFSTransport"), + ("discharge", res_hd, "Discharge"), + ] + reach_start_end = {"outflow": "end", "inflow": "start", "overflow": "start"} + for dataset_name in dataset_reaches.keys(): + for s in storage_struct.index: + reaches_to_read = dataset_reaches[dataset_name].loc[ + dataset_reaches[dataset_name].index == s + ] + if len(reaches_to_read) >= 1: + part_df = read_reaches_sum( + reaches_to_read, + col_res1d_quantity, + reach_start_end[dataset_name], + ) + part_df = part_df.assign(event=event, place=s) + df_parts[dataset_name].append(part_df) + # read catchment values neglects the ST results by always using the AD results + catch_col_r1d_quantity = [ + ("afs_transport", res_catch_conc, "AFSLoadCD"), + ("discharge", res_catch_hd, "CatchmentDischarge"), + ] + + for s in storage_struct.index: + if len(storage_catchments[s]) >= 1: + part_df = read_catchments_sum( + storage_catchments[s], catch_col_r1d_quantity + ) + part_df = part_df.assign(event=event, place=s) + df_parts["inflow_catchments"].append(part_df) + + # 
read waterlevel for nodes and match it to the index of storage_df/special_storage + for sdf, part_name in [ + (storage_struct, "storage_level"), + (sub_storage, "sub_storage_level"), + ]: + water_level_part = res_hd.read( + [ + mir.QueryDataNode(name=node, quantity="WaterLevel") + for node in sdf["node"] + ] + ) + water_level_part = water_level_part.loc[ + :, "WaterLevel:" + sdf["node"] + ].copy() + water_level_part.index.name = "datetime" + water_level_part.columns = sdf.index.rename("place") + water_level_part = ( + water_level_part.stack().rename("waterlevel").reset_index() + ) + water_level_part["event"] = event + df_parts[part_name].append(water_level_part) + # read afs for nodes and match it to the index of storage_df/special_storage + for sdf, part_name in [ + (storage_struct, "storage_afs"), + (sub_storage, "sub_storage_afs"), + ]: + # for ST take concentration from the start of the out_reaches or if in side-line use a mass balance + if st_mode: + for s in sdf.index: + # for two sub storages the data from out_reaches was not usable, so a mass balance was used. + # + if s in st_mass_balance_storages: + # change up for sub_storage to use a mass balance + in_reaches_s = all_storage_in_reaches.loc[ + all_storage_in_reaches.index == s + ] + assert len(in_reaches_s) >= 1, f"No inflow reaches for storage {s}, required to read concentration from ST data." + # col_res1d_quantity is same as for other reaches. read at start to be near the node + # this reads tranport and discharge + afs_part = read_reaches_sum( + in_reaches_s, + col_res1d_quantity, + "start", + ) + # calculate concentration + afs_part["afs"]=(afs_part["afs_transport"].cumsum()/afs_part["discharge"].cumsum()*1000).clip(lower=0) + # discard discharge and concentration + afs_part=afs_part[["datetime","afs"]] + # add event and place + afs_part = afs_part.assign(event=event, place=s) + df_parts[part_name].append(afs_part) + else: + out_reaches_s = all_storage_out_reaches.loc[ + all_storage_out_reaches.index == s + ] + assert len(out_reaches_s) >= 1, f"No outflow reaches for storage {s}, required to read concentration from ST data." + # col_res1d_quantity is same as for other reaches. 
read at start to be near the node + # this reads tranport and discharge + afs_part = read_reaches_sum( + out_reaches_s, + col_res1d_quantity, + "start", + ) + # calculate concentration + afs_part["afs"]=(afs_part["afs_transport"]/afs_part["discharge"]*1000).clip(lower=0) + # discard discharge and concentration + afs_part=afs_part[["datetime","afs"]] + # add event and place + afs_part = afs_part.assign(event=event, place=s) + df_parts[part_name].append(afs_part) + # code for AD results + else: + afs_part = res_conc.read( + [mir.QueryDataNode(name=node, quantity="AFS") for node in sdf["node"]] + ) + afs_part = afs_part.loc[:, "AFS:" + sdf["node"]].copy() + afs_part.index.name = "datetime" + afs_part.columns = sdf.index.rename("place") + afs_part = afs_part.stack().rename("afs").reset_index() + afs_part["event"] = event + + df_parts[part_name].append(afs_part) + # read max velocity for event and update current overall maximum + for r in res_hd.reaches: + event_max_v = reach_max_velocity(r, res_hd, sd) + reach_agg.loc[r, "max_velocity"] = np.nanmax( + [reach_agg.loc[r, "max_velocity"], event_max_v] + ) + # turn parts with time series into dataset + dataset = _parts_to_dataset(df_parts) + # add reach_agg + dataset["reach_agg"] = reach_agg + return dataset + + +def _create_reach_series(storage_df): + """Extract information on which reaches need to be read from storage_df.""" + reach_series = {} + reach_series["outflow"] = ( + storage_df["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1) + ) + reach_series["inflow"] = ( + storage_df["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1) + ) + reach_series["overflow"] = storage_df["weir_reach"].dropna() + return reach_series + + +def read_reaches_sum( + reaches: list[str], + col_r1d_quantity: list[tuple[str, mi.Res1D, str]], + end_start: typing.Literal["start", "end"], +): + """Read and calculate sum of quantities from reaches. Since it just adds values, do not use this for concentration!""" + result_df = pd.DataFrame() + for col, r1d, quantity in col_r1d_quantity: + # get_reach_end_values returns an array + reach_end_arrays: list[npt.NDArray] = [] + for r in reaches: + assert quantity in r1d.quantities + assert r in r1d.reaches + if end_start == "end": + reach_end_arrays.append(r1d.get_reach_end_values(r, quantity)) + elif end_start == "start": + reach_end_arrays.append(r1d.get_reach_start_values(r, quantity)) + if len(reach_end_arrays) > 0: + result_df[col] = pd.Series(sum(reach_end_arrays),index=r1d.time_index) + else: + result_df[col] = pd.Series(np.nan,index=r1d.time_index) + result_df.index.name = "datetime" + result_df = result_df.reset_index() + return result_df + + +def read_catchments_sum( + catchments: list[str], col_r1d_quantity: list[tuple[str, mi.Res1D, str]] +): + """Read and calculate sum of quantities from catchments. 
Since it just adds values, do not use this for concentration!""" + result_df = pd.DataFrame(index=col_r1d_quantity[0][1].time_index) + for col, r1d, quantity in col_r1d_quantity: + for c in catchments: + assert quantity in r1d.quantities, f"{quantity} not in {r1d.file_path}" + assert c in r1d.catchments + col_result_df = r1d.read( + [mir.QueryDataCatchment(name=c, quantity=quantity) for c in catchments] + ) + if len(col_result_df.columns) != len(catchments): + warn("count of results columns does not match catchment count") + print(catchments, col_result_df) + result_df[col] = col_result_df.sum(axis="columns") + result_df.index.name = "datetime" + result_df = result_df.reset_index() + return result_df + + +def reach_max_velocity(reach, res_hd, sd: DrainageSystemData) -> float: + """Calculate maximum velocity for reach. If velocity is stored in the result files, use that instead! """ + # Assume that max water level, discharge and velocity coincide. + # Since water level and discharges are calculated at different chainages, + # using max is probably more accurate than using the exact same time where one of the values is maximal. + q = res_hd.reaches[reach][1].Discharge.read().max(axis=None) + h = res_hd.reaches[reach][0].WaterLevel.read().max(axis=None) + try: + _, (a,) = sd.xs_area_at(reach, h, [0.0]) + return q / a + except Exception as ex: + warn(str(ex)) + return np.nan + + +def _parts_to_dataset(df_parts): + dataset = {} + for dn in df_parts.keys(): + dataset[dn] = pd.concat(df_parts[dn], ignore_index=True).set_index( + ["place", "event", "datetime"] + ) + dataset[dn].columns.name = "property" + dataset["inflow"] = dataset["inflow"].add( + dataset["inflow_catchments"], fill_value=0 + ) + # calculate concentration, where it was not read. at many points parts need to be summed up, but concentration would need to be a weighted average. + # therefore it is simpler to calculate it from transport and discharge. + for n, df in dataset.items(): + if ( + "discharge" in df.columns + and "afs_transport" in df.columns + and not "afs" in df.columns + ): + print("calculating afs for ", n) + df["afs"] = (df["afs_transport"] / df["discharge"]).fillna( + 0 + ) * uf.KGPM3_TO_MGPL + # Clip is needed to avoid negative concentrations. + # Negative concentrations can appear when directly calculating concentration from discharge and transport, + # and sometimes even in the concentration values read from a res1d, so this is unlikely to be an error in our reading procedure. + for n, df in dataset.items(): + if "afs" in df.columns: + df["afs"] = df["afs"].clip(lower=0) + return dataset diff --git a/py_package/src/entfrachten/analysis/static_analysis.py b/py_package/src/entfrachten/analysis/static_analysis.py new file mode 100644 index 0000000..3179a5f --- /dev/null +++ b/py_package/src/entfrachten/analysis/static_analysis.py @@ -0,0 +1,53 @@ +"""Module for analysis of static network data.""" + +from warnings import warn + +import networkx as nx + +from entfrachten.analysis.drainage_sysytem_data import DrainageSystemData + +def in_out_reach_subset(sd:DrainageSystemData,in_reaches:list[str],out_reaches:list[str]) -> tuple[list[str],list[str]]: + """Find all nodes and reaches between in_reaches and out_reaches.""" + # we want to look at everything between the in/out reaches. + # the nodes where inflow starts are not included, so we take end here instead of start. 
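+    # Hedged toy example of the intended result (names are illustrative only):
+    # for a chain of reaches A->B, B->C, C->D with in_reaches=["A->B"] and
+    # out_reaches=["C->D"], the returned subset is the nodes {B, C} and the reaches
+    # {"A->B", "B->C", "C->D"}; anything upstream of B or downstream of C is excluded.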
+ in_nodes = sd.reach_df.loc[in_reaches, "end"] + out_nodes = sd.reach_df.loc[out_reaches, "start"] + # make a graph where in and out reaches are removed, the nodes connected to in or out are then the subgraph we need. + cut_reaches = sd.reach_df.copy() + cut_reaches = cut_reaches.loc[sd.reach_df.index.difference(in_reaches).difference(out_reaches)] + # also remove weirs, since we should not count parts where the water overflows. + cut_reaches = cut_reaches.query("type != 'Weir'") + cut_graph = nx.from_pandas_edgelist( + cut_reaches.reset_index(), + source="start", + target="end", + edge_attr=True, + create_using=nx.DiGraph, + ) + # Add in/out nodes to make sure that the next calculations run without error. + # They may be missing if in/outreaches and weirs are the only reaches that connect there. + cut_graph.add_nodes_from(in_nodes) + cut_graph.add_nodes_from(out_nodes) + + in_downstream = set() + for ni in in_nodes: + in_downstream.update(nx.descendants(cut_graph, ni)) + in_downstream.add(ni) + out_upstream = set() + for no in out_nodes: + out_upstream.update(nx.ancestors(cut_graph, no)) + out_upstream.add(no) + for r, ni in in_nodes.items(): + if not ni in out_upstream: + warn(f"in_node {ni} (in_reach {r}) not connected to out") + for r, no in out_nodes.items(): + if not no in in_downstream: + warn(f"out_node {no} (out_reach {r}) not connected to in") + nodes = list(in_downstream | out_upstream) + subgraph = cut_graph.subgraph(nodes) + reaches = set() + reaches.update(in_reaches) + reaches.update(out_reaches) + reaches.update(nx.get_edge_attributes(subgraph, "name").values()) + reaches = list(reaches) + return nodes, reaches \ No newline at end of file diff --git a/py_package/src/entfrachten/analysis/unit_factors.py b/py_package/src/entfrachten/analysis/unit_factors.py new file mode 100644 index 0000000..651e668 --- /dev/null +++ b/py_package/src/entfrachten/analysis/unit_factors.py @@ -0,0 +1,12 @@ +"""Collection of factors for unit conversion. Usage: new_value = old_value * OLD_UNIT_TO_NEW_UNIT. +A number behind a unit stands for an exponent. _P_ is used instead of division (short for 'per'). +For example m²/s is written as M2_P_S.""" +HA_TO_M2 = 100**2 +M2_TO_HA = 1/HA_TO_M2 +M3_P_HA_TO_MM = 0.1 +M3_P_S_P_HA_TO_MM_P_H = 360 +# (mg/l)*(m³)=(10**-6kg/10**-3m³)*(m³)=10**-3kg +MGPL_TO_KGPM3 = 10**-3 +KGPM3_TO_MGPL = 1/MGPL_TO_KGPM3 +CM_TO_INCH = 1/2.54 +M3_TO_L = 1000 diff --git a/py_package/src/entfrachten/optim/__init__.py b/py_package/src/entfrachten/optim/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/py_package/src/entfrachten/optim/loss_functions.py b/py_package/src/entfrachten/optim/loss_functions.py new file mode 100644 index 0000000..bcbc31f --- /dev/null +++ b/py_package/src/entfrachten/optim/loss_functions.py @@ -0,0 +1,80 @@ +"""This module defines multiple loss functions which may be used to compare the simulation results to the reference. +Note that nevergrad performs minimization, therefore smaller values need to indicate a better result.""" +from warnings import warn + +import pandas as pd +import numpy as np +import hydroeval as hde + +def rmse(ref: pd.Series, res: pd.Series) -> float: + """Root Mean Square Error. 
+ No weighting or averaging over labels is applied.""" + if not ref.index.equals(res.index): + raise ValueError("Indices are not equal, cannot compare") + return ((ref - res) ** 2).mean(axis=None) ** 0.5 + + +def _vfp(ref: pd.Series, res: pd.Series, divisor_on_zero) -> float: + """Difference of total volumes as a percentage.""" + if not ref.index.equals(res.index): + raise ValueError("Indices are not equal, cannot compare") + # Total reference Volume + ref_volume = ref.sum() + # Total result Volume + res_volume = res.sum() + dif = abs((res_volume - ref_volume)) + # if reference volume is 0 use the replacement value this is more useful for optimization than nan or zero. + if ref_volume == 0: + ref_volume = divisor_on_zero + # Convert to percent + return 100 * dif / ref_volume + +def weighted_vfp_factory(result_spec,reference): + """From result spec and reference create a function that applies vfp to each label + and then takes the weighted mean. + """ + # for vfp: replacement value for the divisor when ref_volume is zero. + # it may be better manually define one for each label. + divisor_on_zero = reference.groupby(level="Label").sum().mean() + def weighted_vfp(ref: pd.Series, res: pd.Series) -> float: + """Applies vfp to each label and then takes the weighted mean.""" + labels = result_spec.index.get_level_values("Label") + # also put timestep into weights, since vfp does not account for time passed + weights = ( + result_spec["EventWeight"].to_numpy() + * pd.to_timedelta(result_spec["Timestep"]).dt.total_seconds().to_numpy() + ) + per_label = np.array( + [_vfp(ref.xs(l, level="Label"), res.xs(l, level="Label"),divisor_on_zero) for l in labels] + ) + nan_parts = np.isnan(weights) | np.isnan(per_label) + if np.any(nan_parts): + warn("weighted_vfp: some parts are nan, skipping those") + return np.average(per_label[~nan_parts], weights=weights[~nan_parts]) + return weighted_vfp + + +# note: if your objective function should be maximized, use the negative objective function as loss function +def mean_negative_bounded_nse(ref: pd.Series, res: pd.Series) -> float: + """ + Mean over labels of negative nse_c2m. range [-1,1]. + nse_c2m is very similar to NSE, but bounded to range[-1,1], which is better for optimization. + Negation is applied so that better values are smaller. + Args: + actual (pandas.DataFrame): DataFrame containing actual values. + forecast (pandas.DataFrame): DataFrame containing forecasted values. + Returns: + float: Cost of the solution. + """ + if not ref.index.equals(res.index): + raise ValueError("Indices are not equal, cannot compare") + loss_parts = [] + for label in ref.index.unique("Label"): + ref_part = ref.xs(label, level="Label") + res_part = res.xs(label, level="Label") + if ref_part.isna().any() or res_part.isna().any(): + warn(f"Label {label} ref or res has nan values") + ref_part = ref_part.interpolate().ffill().bfill().fillna(0) + res_part = res_part.interpolate().ffill().bfill().fillna(0) + loss_parts.append(-hde.nse_c2m(res_part.to_numpy(), ref_part.to_numpy())) + return np.mean(loss_parts) \ No newline at end of file diff --git a/py_package/src/entfrachten/optim/optim_dumper.py b/py_package/src/entfrachten/optim/optim_dumper.py new file mode 100644 index 0000000..5cb6e2f --- /dev/null +++ b/py_package/src/entfrachten/optim/optim_dumper.py @@ -0,0 +1,82 @@ + +from pathlib import Path +from warnings import warn +import dill + + +class OptimDumper: + """Class for dumping and loading the optimization state of a nevergrad optimizer. 
+ New files are created for each step, the old files are kept as a back up. + For a large optimization budget dump should not be called every step to reduce storage use. + """ + + _fin_format = "{0}.fin" + + def __init__(self, dump_folder): + """folder_name: folder (relative to temp_folder) where dumps are stored.""" + self.dump_folder = Path(dump_folder) + self.dump_folder.mkdir(exist_ok=True) + + # the following methods define the path where the file for a variable and step should be stored. + + def _get_path_opt(self, step): + return self.dump_folder / f"{step}.opt.dill" + + def _get_path_params(self, step): + return self.dump_folder / f"{step}.par.dill" + + def _get_path_losses(self, step): + return self.dump_folder / f"{step}.los.dill" + + def _get_path_fin(self, step): + return self.dump_folder / self._fin_format.format(step) + + def get_last_step(self) -> int: + """Returns the last step for which a dump was finished.""" + # {n}.fin file is used to mark successful dump, where n is the optimization step. + # The .fin file ensures that we can easily find tha last valid dump, even if the next dump was interrupted. + fin_steps = [ + int(f.stem) for f in self.dump_folder.glob(self._fin_format.format("*")) + ] + if len(fin_steps) == 0: + return -1 + return max(fin_steps) + + def load(self): + """Load the last optimizer state. + Returns: optimizer, list of asked parameters and list of losses.""" + last_step = self.get_last_step() + if last_step < 0: + raise DumpNotFoundException(f"No dump was finished for {self.dump_folder}") + with open(self._get_path_opt(last_step), "rb") as fo: + optimizer = dill.load(fo) + if optimizer.num_tell != (last_step + 1): + warn("optimizer.num_tell does not match detected last step") + print( + f"loaded optimizer from {self._get_path_opt(last_step)}. executed iterations: {optimizer.num_tell}" + ) + with open(self._get_path_params(last_step), "rb") as fp: + params = dill.load(fp) + if optimizer.num_tell != len(params): + warn("length of params does not match") + with open(self._get_path_losses(last_step), "rb") as fl: + losses = dill.load(fl) + if optimizer.num_tell != len(losses): + warn("length of losses does not match") + return optimizer, params, losses + + def dump(self, optimizer, params, losses): + """Save optimization variables, which cannot be easily recreated, to disk.""" + step = optimizer.num_tell - 1 + with open(self._get_path_opt(step), "wb") as f: + dill.dump(optimizer, f) + with open(self._get_path_params(step), "wb") as f: + dill.dump(params, f) + with open(self._get_path_losses(step), "wb") as f: + dill.dump(losses, f) + # create file to mark end of step. only the name matters, thus do not write anything into the file. + open(self._get_path_fin(step), "w").close() + + +class DumpNotFoundException(Exception): + pass diff --git a/py_package/src/entfrachten/optim/simulation.py b/py_package/src/entfrachten/optim/simulation.py new file mode 100644 index 0000000..cd21a59 --- /dev/null +++ b/py_package/src/entfrachten/optim/simulation.py @@ -0,0 +1,262 @@ +"""Provides a class for setting parameters and running a simulation with mikepluspy. 
+""" + +import os +import typing +from contextlib import redirect_stdout +from warnings import warn +from pathlib import Path +import threading +from multiprocessing.pool import ThreadPool + +import pandas as pd +from mikeplus import DataTableAccess +from mikeplus.engines import Engine1D +from mikeio1d.res1d import Res1D + +# delete result files(as specified by event config) before a sim, assures that no old file is reread in the case that a simulation does not run +DELETE_RESFILE_BEFORE_SIM = True +# For dates the dta returns a .NET Datetime, which pandas cannot convert. Use ToString first. +DOTNET_DT_ISO_FORMAT = "yyyy-MM-ddTHH:mm:ss" + + +class SimulationRunner: + def __init__( + self, + db_path: typing.Union[str, Path], + result_folder: typing.Union[str, Path], + param_spec: pd.DataFrame, + result_spec: pd.DataFrame, + parallel_limit: int, + adjust_end_time: bool = True, + ) -> None: + self.result_folder = Path(result_folder) + self.param_spec = param_spec + if not { + "ResultFile", + "ResultReach", + "ResultNode", + "ResultColumn", + "Timestep", + "Start", + "End", + }.issubset(result_spec.columns): + raise ValueError( + "result_spec needs columns: ResultFile,ResultReach,ResultNode,ResultColumn,Timestep,Start,End" + ) + if not ( + result_spec["Start"].dt.tz is None and result_spec["End"].dt.tz is None + ): + raise ValueError( + "Timezone for Start and End has to be None. Use dt.tz_localize(None) to drop timezone." + ) + + # Preparations for running the simulation + # open the MIKE+ database using MIKE+Py. + self.result_spec = result_spec + self.dta = DataTableAccess(db_path) + self.dta.open_database() + assert self.dta.is_database_open() + # Dict Engine1D objects that will be used to run MIKE 1D simulations, one per thread + self.engines = {} + + def init_engine(): + self.engines[threading.current_thread()] = Engine1D(self.dta.datatables) + print(f"initialized engine for {threading.current_thread()}") + + # sim_executor is used to run multiple simulations in parallel. + # Since the actual simulation runs in its own process anyway, it makes no sense to use multiple python processes. + # Threadpool from multiprocessing is used, because it has a rather simple interface. + self.sim_executor = ThreadPool( + processes=parallel_limit, initializer=init_engine + ) + if adjust_end_time: + self._adjust_computation_end() + self.sim_longest_first = self._get_sim_ordered_by_duration() + + def _adjust_computation_end(self): + """Adjust computation times so that nothing unnecessary is computed""" + sim_group = self.result_spec.reset_index().groupby("SimulationName") + # these define the time span we need to read from + read_end = sim_group["End"].max() + sim_pad = pd.to_timedelta("5min") + for sim in read_end.index: + + # Adjust end. Use string to set value, since a Pandas datetime cannot be converted. + end_string = (read_end[sim] + sim_pad).isoformat() + self.dta.set_value("msm_Project", sim, "ComputationEnd", end_string) + # make sure the result matches + assert ( + self.dta.get_field_values("msm_Project", sim, ["ComputationEnd"])[ + 0 + ].ToString(DOTNET_DT_ISO_FORMAT) + == end_string + ) + + def _get_sim_ordered_by_duration(self): + """Make a list of simulations where the longest comes first. 
+ Assuming that longer simulations use more computation time, + this prevents a case where a long simulation runs at the end and takes up only on thread, + while other threads are idle.""" + sim_group = self.result_spec.reset_index().groupby("SimulationName") + # start of reading time, checked later + read_start = sim_group["Start"].min() + duration = pd.Series(data=pd.to_timedelta("NaT"), index=read_start.index) + for sim in duration.index: + # read start + computation_start = pd.to_datetime( + self.dta.get_field_values("msm_Project", sim, ["ComputationBegin"])[ + 0 + ].ToString(DOTNET_DT_ISO_FORMAT) + ) + # check start time + if computation_start > read_start[sim]: + warn( + f"Simulation {sim} start too late. Got {computation_start} need {read_start[sim]}" + ) + # read end + computation_end = pd.to_datetime( + self.dta.get_field_values("msm_Project", sim, ["ComputationEnd"])[ + 0 + ].ToString(DOTNET_DT_ISO_FORMAT) + ) + duration[sim] = computation_end - computation_start + return duration.sort_values(ascending=False).index.to_list() + + def close(self): + self.dta.close_database() + self.sim_executor.close() + + def _single_sim_run(self, sim_name: str): + """Runs a single simulation for event ev. Returns a string message on successful finish. + Intended for usage with ThreadPool.map(). Use map to complete all simulations and read results after that + """ + # also measure the time the simulation takes + start_time = pd.Timestamp.now() + print(f"Running simulation {sim_name} in {threading.current_thread()}.") + self.engines[threading.current_thread()].run(sim_name) + end_time = pd.Timestamp.now() + return f"Completed sim for {sim_name},took {end_time-start_time},in {threading.current_thread()}" + + # the ** means that simulation run may be called like so: + # result = simulation_run(key1=p1,key2=p2, ...) + # but kwparams will contain a dict of form {"key1":p1,"key2":p2,...} + # if you called the simulation like simulation_run(key1=p1,key2=p2, ...), + # key1, key2, ... would need to be valid python names + # but with simulation_run(**{"key1":p1,"key2":p2,...}) key1, key2 could be any string. + # Because the optimizer uses the second form, we can use the index from param_specs as a key, + # even though the index may not be a valid python name. + def simulation_run(self, **kwparams): + """Takes parameters as arguments, runs simulations as defined by events with them + and returns the results as a series with MultiIndex[label, time]. + + ParamSetError is raised if a parameter could not be set correctly, + this exception indicates an error in the parameter configuration and should not be ignored. + + Note that kwparams needs keys that match the index of DataFrame self.param_specs. + """ + + if not set(kwparams.keys()) == set(self.param_spec.index): + raise ValueError("Parameters need to match self.param_specs.") + # set all parameters to the values defined by params. + # param_specs defines where the parameter is located in the mike database + for k in kwparams.keys(): + tab = self.param_spec.loc[k, "Table"] + col = self.param_spec.loc[k, "Column"] + muid = self.param_spec.loc[k, "Muid"] + val = kwparams[k] + self.dta.set_value(tab, muid, col, val) + try: + val_get = self.dta.get_field_values(tab, muid, [col])[0] + except Exception as ex: + raise ParamSetError( + f"Exception while trying to confirm parameter value. tab: {tab} col: {col} muid: {muid}" + ) from ex + if not val_get == val: + raise ParamSetError( + f"""Value not correctly set. + set {val}, got {val_get}. 
+ tab: {tab} + col: {col} + muid: {muid}""" + ) + if DELETE_RESFILE_BEFORE_SIM: + for l in self.result_spec.index: + self._delete_result_file(l) + # Now run each simulation on its own thread. Map only returns when all simulations are finished. + # console output is suppressed, because mikepluspy print far too much stuff for many runs. + with open(os.devnull,mode="r+") as null_stream: + with redirect_stdout(null_stream): + finish_msgs = self.sim_executor.map( + self._single_sim_run, self.sim_longest_first + ) + print(finish_msgs) + for l in self.result_spec.index: + assert self._exists_result_file(l), "Result file not found." + return self.read_results() + + def read_results(self) -> pd.Series: + """Reads all results according to event table. Reads each file at most once + returns: + a pandas Series with (label,Datetime) as index. + """ + # collect results for each label in a dictionary + result_dict = {} + # go over all required files and read all results for that file in one go + for result_file, events_file in self.result_spec.groupby("ResultFile"): + # collect necessary reaches and nodes + reaches = set(events_file["ResultReach"]) - {""} + nodes = set(events_file["ResultNode"]) - {""} + file_path: Path = self.result_folder / result_file + params:dict[str,typing.Any] = { + "file_path": str(file_path.absolute()), + } + # these parameters would make the reading faster, but currently do not work: the results are wrong. + #if len(reaches) > 0: + # params["reaches"] = list(reaches) + #if len(nodes) > 0: + # params["nodes"] = list(nodes) + file_results = Res1D(**params).read_all() + + for label in events_file.index: + column = events_file.at[label, "ResultColumn"] + timestep = events_file.at[label, "Timestep"] + start = events_file.at[label, "Start"] + end = events_file.at[label, "End"] + series = file_results[column] + # Reindex to be comparable to results. + # There should not be many missing values, otherwise issue a warning. + # If you wand to use resampling on irregular spaced times, note that linear interpolation with pandas first throws away all + # values where the index does not match to the frequency. + full_idx = pd.date_range(start, end, freq=timestep) + if len(full_idx.intersection(series.index)) < 0.95 * len(full_idx): + actual_timestep = series.index.to_series().diff().mean() + warn( + f"too few values in result for {label} in file '{file_path}'.\n" + + f"\tWant {start} to {end}, with timestep {timestep}.\n" + + f"\tGot {series.index.min()} to {series.index.max()}, with timestep {actual_timestep}." + + "Timestep of result files may not match expectation ({timestep})." + ) + print(series) + series = series.reindex(full_idx) + series.index.name = "Datetime" + result_dict[label] = series + return pd.concat(result_dict, names=["Label"]) + + def _delete_result_file(self, label: str): + """Deletes the file holding the result for label. + Useful to make sure that no old values are read.""" + file_path: Path = self.result_folder / self.result_spec.at[label, "ResultFile"] + file_path.unlink(True) + + def _exists_result_file(self, label: str): + """Check if the file holding the result for label exists.""" + file_path: Path = self.result_folder / self.result_spec.at[label, "ResultFile"] + return file_path.exists() + + +class ParamSetError(Exception): + """Class that marks an Error where Parameters were not correctly set.""" + + # class only needed to distinguish Error, no special behavior needed. 
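+    # Hedged usage sketch (illustrative only; `runner` and `candidate_params` are
+    # placeholders for a SimulationRunner instance and a parameter dict matching
+    # param_spec.index):
+    #     try:
+    #         results = runner.simulation_run(**candidate_params)
+    #     except ParamSetError:
+    #         raise  # indicates a broken parameter configuration, do not silently continue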
+ pass diff --git a/py_package/src/entfrachten/optim/simulation_dummy.py b/py_package/src/entfrachten/optim/simulation_dummy.py new file mode 100644 index 0000000..6ac2aaf --- /dev/null +++ b/py_package/src/entfrachten/optim/simulation_dummy.py @@ -0,0 +1,91 @@ +"""Dummy simulation intended for testing. +""" + +from warnings import warn + +import numpy as np +import pandas as pd + + + +class SimulationDummy: + """Dummy for Simulation testing. + Only for testing if the program works, do not expect realistic behavior!""" + def __init__( + self, + reference:pd.Series, + param_spec:pd.DataFrame, + result_spec:pd.DataFrame, + seed=1234, + ) -> None: + self.param_spec = param_spec + if not { + "ResultFile", + "ResultReach", + "ResultNode", + "ResultColumn", + "Timestep", + "Start", + "End", + }.issubset(result_spec.columns): + raise ValueError( + "result_spec needs columns: ResultFile,ResultReach,ResultNode,ResultColumn,Timestep,Start,End" + ) + if not ( + result_spec["Start"].dt.tz is None and result_spec["End"].dt.tz is None + ): + raise ValueError( + "Timezone for Start and End has to be None. Use dt.tz_localize(None) to drop timezone." + ) + self.reference = reference + rng = np.random.default_rng(seed=seed) + # these parameters present ideal values, where the reference is returned + self.ideal_params = pd.Series(index=param_spec.index,data=rng.uniform(param_spec["Min"],param_spec["Max"])) + # these values + self.deviations = pd.DataFrame(index=reference.index,columns=param_spec.index,data=0.0) + for p in param_spec.index: + for l in reference.index.get_level_values("Label").unique(): + sn = slice(None) + ref_part = reference.loc[(l,sn)] + amp = rng.uniform(0,ref_part.max()) + center = rng.uniform(0,1) + x_stretch = rng.uniform(1,20) + self.deviations.loc[(l,sn),p]=_make_bell_array(len(ref_part),center,amp,x_stretch) + + def close(self): + pass + + + # the ** means that simulation run may be called like so: + # result = simulation_run(key1=p1,key2=p2, ...) + # but kwparams will contain a dict of form {"key1":p1,"key2":p2,...} + # if you called the simulation like simulation_run(key1=p1,key2=p2, ...), + # key1, key2, ... would need to be valid python names + # but with simulation_run(**{"key1":p1,"key2":p2,...}) key1, key2 could be any string. + # Because the optimizer uses the second form, we can use the index from param_specs as a key, + # even though the index may not be a valid python name. + def simulation_run(self, **kwparams): + """ Dummy simulation. + Takes parameters as arguments and returns the + dummy results as a series with MultiIndex[label, time]. + Note that kwparams needs keys that match the index of DataFrame self.param_specs.""" + + if not set(kwparams.keys()) == set(self.param_spec.index): + raise ValueError("parameters need to match self.param_specs.") + # squared normalized difference from ideal parameters is used to multiply the deviation + dev_factor = ((pd.Series(kwparams)-self.ideal_params)/(self.param_spec["Max"]-self.param_spec["Min"]))**2 + return (self.deviations*dev_factor).sum(axis="columns")+self.reference.copy() + + + def read_results(self) -> pd.Series: + """Reads all results according to event table. Reads each file at most once + returns: + a pandas Series with (label,Datetime) as index. 
+ """ + return self.simulation_run(**self.param_spec["Default"]) + + +def _make_bell_array(length:int, center:float, amp:float, x_stretch:float): + """Makes a numpy array which represents a bell curve""" + lin_01 = np.linspace(0, 1, num=length) + return amp / (1+((lin_01[..., np.newaxis] - center) * x_stretch) ** 2) \ No newline at end of file diff --git a/py_package/src/entfrachten/plotting/struct_plot.py b/py_package/src/entfrachten/plotting/struct_plot.py new file mode 100644 index 0000000..18fac72 --- /dev/null +++ b/py_package/src/entfrachten/plotting/struct_plot.py @@ -0,0 +1,1163 @@ +"""Plots for structure analysis.""" + +import typing +import numpy as np +import pathlib +import pandas as pd +import scipy.stats +import matplotlib as mpl +import matplotlib.pyplot as plt +import seaborn as sns +from tqdm import tqdm + +from entfrachten.analysis import ( + drainage_system_graph, + drainage_sysytem_data, + unit_factors as uf, + dynamic_analysis as da, + sewer_map, +) + +# Helper for more concise indexing +ids = pd.IndexSlice + +fig_height = 8 * uf.CM_TO_INCH +font_size = 9 + +# common arguments for boxplots +box_kwargs = dict( + whis=(10.0, 90.0), + showmeans=True, + meanline=True, + meanprops=dict(linestyle=":", linewidth=1.2, color="black"), +) + + +def adjust_mpl_settings(): + """Adjusts matplotlib settings. Should be called before plotting.""" + mpl.rcParams.update( + { + "font.size": font_size, # Set font size + "font.family": "Arial", # and font family + "figure.figsize": [fig_height, fig_height], + } + ) + + +def dist_line( + df_dist: pd.DataFrame, out_folder: pathlib.Path, quantity: str, unit: str +): + """Draws line plots for all places and events. Uses 'part' level for color. Intended for analysis of inflow,outflow,overflow and storage""" + quantity_label = f"{quantity} ({unit})" + max_events_per_plot = 16 + line_folder = out_folder / "lines/" + line_folder.mkdir(exist_ok=True) + for p in tqdm(df_dist.index.get_level_values("place").unique()): + df_dist_p = df_dist.xs(p, level="place") + events_p = sorted(df_dist_p.index.get_level_values("event").unique()) + event_chunks = [ + events_p[i : i + max_events_per_plot] + for i in range(0, len(events_p), max_events_per_plot) + ] + for ec in tqdm(event_chunks): + e_min = min(ec) + e_max = max(ec) + title = f"{quantity} at {p} events {e_min}-{e_max}" + filename = f"{p.replace(' ','')}_lines_{quantity.replace(' ','_')}_{e_min}-{e_max}.svg" + plot_df = ( + df_dist_p.query("event in @ec") + .stack() + .rename(quantity_label) + .reset_index() + ) + fg = ( + sns.FacetGrid( + plot_df.reset_index(), + col="event", + col_wrap=4, + hue="part", + palette=_dist_palette(df_dist_p), + sharex=False, + sharey=False, + height=fig_height, + aspect=2.7, + ) + .map(sns.lineplot, "datetime", quantity_label, alpha=0.6) + .add_legend() + ) + fg.figure.suptitle(title, y=1.04) + path = line_folder / filename + print(f"saving to {path}") + fg.savefig(path) + + +def dist_box_max( + df_dist: pd.DataFrame, out_folder: pathlib.Path, quantity: str, unit: str +): + """One Plot per place. Draws box plots for of the max values per event. Uses 'part' level for color. 
+ Intended for analysis of inflow,outflow,overflow and storage""" + # boxplots per place of max values + for p, df_dist_p in df_dist.groupby("place"): + assert isinstance(p, str) + # drop columns with only nan, this should be the distribute_storage columns for places where they do not apply + df_dist_p = df_dist_p.dropna(how="all", axis="columns") + title = f"max {quantity} parts at {p}, absolute" + filename = f"{p.replace(' ','')}_{quantity.replace(' ','_')}_max_box.svg" + plot_df = ( + df_dist_p.groupby("event") + .max() + .stack() + .rename(f"{quantity} ({unit})") + .reset_index() + ) + fg = ( + sns.FacetGrid( + plot_df.reset_index(), + hue="part", + palette=_dist_palette(df_dist), + sharex=False, + sharey=False, + height=fig_height, + aspect=1.5, + ) + .map( + sns.boxplot, + "part", + f"{quantity} ({unit})", + **box_kwargs, + order=list(df_dist_p.columns), + ) + .add_legend() + ) + plt.suptitle(title, y=1.04) + plt.show() + path = out_folder / filename + # print(f"saving to {path}") + fg.savefig(path) + + +def dist_in_volume_section( + df_dist_d: pd.DataFrame, + sealed_area_m2: pd.Series, + quantity: str, + unit: str, + out_folder: pathlib.Path, +): + """One plot per place. Draws box plots and line plots of the median for of the values per volume section. Uses 'part' level for color. + Intended for analysis of inflow,outflow,overflow and storage.""" + + box_kw = box_kwargs | dict(fliersize=2, native_scale=True) + _dist_in_volume_section_impl( + f"{quantity}_dist_box", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + True, + sns.boxplot, + box_kw, + ) + _dist_in_volume_section_impl( + f"{quantity}_dist_box", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + False, + sns.boxplot, + box_kw, + ) + line_kw = dict( + estimator="median", + errorbar=("pi", 50), + marker=".", + markersize=6, + err_style="band", + ) + _dist_in_volume_section_impl( + f"{quantity}_dist_quartline", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + True, + sns.lineplot, + line_kw, + ) + _dist_in_volume_section_impl( + f"{quantity}_dist_quartline", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + False, + sns.lineplot, + line_kw, + ) + + +def _dist_in_volume_section_impl( + plot_name, + df_dist_d: pd.DataFrame, + sealed_area_m2: pd.Series, + quantity: str, + unit: str, + out_folder: pathlib.Path, + filter_overflow: bool, + plot_fkt: typing.Callable, + fkt_kwargs: dict, +): + """implementation for dist_in_volume_section. 
The plot function is just an argument, + most of the code is for preparing the data and is the same for line or box plots.""" + for p in tqdm(df_dist_d.index.get_level_values("place").unique()): + df_dist_p = df_dist_d.xs(p, level="place") + df_dist_p = df_dist_p.dropna(how="all", axis="columns") + section_size = ( + df_dist_p.index.get_level_values("volume_section") + .to_series() + .diff() + .median() + ) + title = f"{p}: {quantity} change over incoming volume sections({section_size:.2f} m³)" + filename = f"{p.replace(' ','')}_{plot_name}" + if filter_overflow: + # filter out events where no overflow occurs + events_p = sorted(df_dist_p.index.get_level_values("event").unique()) + events_p = [ + e for e in events_p if df_dist_p.loc[ids[e, :], "overflow"].max() > 0 + ] + if len(events_p) == 0: + print("no events with overflow for " + p) + continue + df_dist_p = df_dist_p.query("event in @events_p") + title += ", only overflow events" + filename += "_overflows_only" + filename += ".svg" + # stack dataframe such that a volume_in exists for every row while other parts are stackes on top of each + parts = list(df_dist_p.columns) + df_dist_p = df_dist_p.reset_index() + rain_depth = da.calc_raindepth(df_dist_p["volume_section"], sealed_area_m2[p]) + plot_df = pd.concat( + [ + pd.DataFrame( + { + "volume section(mm)": rain_depth, + f"Δ {quantity} ({unit})": df_dist_p[part], + "part": part, + } + ) + for part in parts + ] + ) + fg = sns.FacetGrid( + data=plot_df, + col="part", + col_order=parts, + hue="part", + palette=_dist_palette(df_dist_d), + hue_order=parts, + sharey=False, + sharex=False, + height=fig_height, + aspect=1, + ) + fg.map_dataframe( + plot_fkt, x="volume section(mm)", y=f"Δ {quantity} ({unit})", **fkt_kwargs + ) + # adjust ylim for substorages to be equal + sub_parts = df_dist_p.columns.difference(da.dist_agg_parts) + sub_min = df_dist_p["storage"].min(axis=None) + sub_max = df_dist_p["storage"].max(axis=None) + pad = 0.05 + sub_ylim = (sub_min, sub_max) # previously had *subrange for both limits + for part, ax in fg.axes_dict.items(): + if part in sub_parts: + ax.set_ylim(sub_ylim) + fg.add_legend() + plt.suptitle(title, y=1.04) + path = out_folder / filename + # print(f"saving to {path}") + fg.savefig(path) + +def dist_in_volume_section_SST( + df_dist_d: pd.DataFrame, + sealed_area_m2: pd.Series, + quantity: str, + unit: str, + out_folder: pathlib.Path, +): + """One plot per place. Draws box plots and line plots of the median for of the values per volume section. Uses 'part' level for color. 
+ Intended for analysis of inflow,outflow,overflow and storage.""" + + box_kw = box_kwargs | dict(fliersize=2, native_scale=True) + _dist_in_volume_section_impl_SST( + f"{quantity}_dist_box", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + True, + sns.boxplot, + box_kw, + ) + _dist_in_volume_section_impl_SST( + f"{quantity}_dist_box", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + False, + sns.boxplot, + box_kw, + ) + line_kw = dict( + estimator="median", + errorbar=("pi", 50), + marker=".", + markersize=6, + err_style="band", + ) + _dist_in_volume_section_impl_SST( + f"{quantity}_dist_quartline", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + True, + sns.lineplot, + line_kw, + ) + _dist_in_volume_section_impl_SST( + f"{quantity}_dist_quartline", + df_dist_d, + sealed_area_m2, + quantity, + unit, + out_folder, + False, + sns.lineplot, + line_kw, + ) + + + +def _dist_in_volume_section_impl_SST( + plot_name, + df_dist_d: pd.DataFrame, + sealed_area_m2: pd.Series, + quantity: str, + unit: str, + out_folder: pathlib.Path, + filter_overflow: bool, + plot_fkt: typing.Callable, + fkt_kwargs: dict, +): + """implementation for dist_in_volume_section. The plot function is just an argument, + most of the code is for preparing the data and is the same for line or box plots.""" + for p in tqdm(df_dist_d.index.get_level_values("place").unique()): + df_dist_p = df_dist_d.xs(p, level="place") + df_dist_p = df_dist_p.dropna(how="all", axis="columns") + section_size = ( + df_dist_p.index.get_level_values("volume_section") + .to_series() + .diff() + .median() + ) + title = f"{p}: {quantity} change over incoming volume sections({section_size:.2f} m³)" + filename = f"{p.replace(' ','')}_{plot_name}" + if filter_overflow: + # filter out events where no overflow occurs + events_p = sorted(df_dist_p.index.get_level_values("event").unique()) + events_p = [ + e for e in events_p if df_dist_p.loc[ids[e, :], "overflow"].max() > 0 + ] + if len(events_p) == 0: + print("no events with overflow for " + p) + continue + df_dist_p = df_dist_p.query("event in @events_p") + title += ", only overflow events" + filename += "_overflows_only" + filename += "publication.svg" + + # Drop the 'overflow' column here + df_dist_p = df_dist_p.drop(columns=["overflow"], errors='ignore') + + # stack dataframe such that a volume_in exists for every row while other parts are stacked on top of each + parts = list(df_dist_p.columns) + df_dist_p = df_dist_p.reset_index() + rain_depth = da.calc_raindepth(df_dist_p["volume_section"], sealed_area_m2[p]) + plot_df = pd.concat( + [ + pd.DataFrame( + { + "volume section(mm)": rain_depth, + f"Δ {quantity} ({unit})": df_dist_p[part], + "part": part, + } + ) + for part in parts + ] + ) + fg = sns.FacetGrid( + data=plot_df, + col="part", + col_order=parts, + hue="part", + palette=_dist_palette(df_dist_d), + hue_order=parts, + sharey=False, + sharex=False, + height=fig_height, + aspect=1, + col_wrap=3, # Adjust the number of columns (3) to arrange the plots in 2 rows + ) + fg.map_dataframe( + plot_fkt, x="volume section(mm)", y=f"Δ {quantity} ({unit})", **fkt_kwargs + ) + # adjust ylim for substorages to be equal + sub_parts = df_dist_p.columns.difference(da.dist_agg_parts) + sub_min = df_dist_p["storage"].min(axis=None) + sub_max = df_dist_p["storage"].max(axis=None) + sub_ylim = (sub_min, sub_max) # previously had *subrange for both limits + for part, ax in fg.axes_dict.items(): + if part in sub_parts: + ax.set_ylim(sub_ylim) + 
fg.set_xlabels("volume section (mm)") + #fg.add_legend() + # plt.suptitle(title, y=1.04) + path = out_folder / filename + # print(f"saving to {path}") + fg.savefig(path) + + + +def _dist_palette(df) -> dict[str, str]: + """color palette for plotting the parts""" + part_colors = { + "sum": "#AAAA00", + "inflow": "#6A6A6A", + "storage": "#328529", + "overflow": "#BF375A", + "outflow": "#3F63D0", + } + dist_distributed_cols = df.columns.difference(part_colors.keys()) + for c in dist_distributed_cols: + part_colors[c] = part_colors["storage"] + return part_colors + + +def diff_plot1( + up_storages: dict[str, set[str]], + overflow_filtered: pd.DataFrame, + storage_afs: pd.DataFrame, + timestep: pd.Timedelta, + diff_afs_thresh: tuple[float,float], # changed to tuple for perc and abs deviation + out_folder: pathlib.Path, +): + """First version of plot showing differences in concentration from up to downstream.""" + # note that overflow_filtered contains ony the times where an overflow occurs, getting the timestep from it may not work. + for down, all_ups in up_storages.items(): + for up in all_ups: + if not (down in overflow_filtered.index.get_level_values("place")): + print("no overflow data", down) + continue + overflow_down = overflow_filtered.xs(down, level="place") + overflow_idx_down = overflow_down.index + if len(overflow_idx_down) == 0: + print("no overflow for", down) + storage_up_afs = storage_afs.xs(up, level="place").loc[ + overflow_idx_down, "afs" + ] + storage_down_afs = storage_afs.xs(down, level="place").loc[ + overflow_idx_down, "afs" + ] + storage_diff_afs = storage_down_afs - storage_up_afs + storage_diff_afs_rel = 1-(storage_up_afs/storage_down_afs) + diff_label = f"{down}\n-{up}" + # load that would not be in a overflow if, in times where the concentration upstream is lower, + # the overflow would happen upstream instead of downstream. + is_up_lower = storage_up_afs < storage_down_afs + load_current = ( + overflow_down["afs_transport"].sum() * timestep.total_seconds() + ) + load_diff = ( + ( + storage_diff_afs[is_up_lower] + * overflow_down.loc[is_up_lower, "discharge"] + ).sum() + * uf.MGPL_TO_KGPM3 + * timestep.total_seconds() + ) + overflow_duration = len(overflow_idx_down) * timestep + # calculate afs load diff with threshold value + up_significant_lower = (storage_diff_afs >= diff_afs_thresh[0]) & (storage_diff_afs_rel >= diff_afs_thresh[1]) + load_diff_signifcant = ( + ( + storage_diff_afs[up_significant_lower] + * overflow_down.loc[up_significant_lower, "discharge"] + ).sum() + * uf.MGPL_TO_KGPM3 + * timestep.total_seconds() + ) + overflow_duration = len(overflow_idx_down) * timestep + # time duration where the concentration difference is greater than some threshold. 
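+            # Hedged numeric example: with diff_afs_thresh = (50, 0.2), a timestep only
+            # counts as significantly lower upstream if the downstream TSS exceeds the
+            # upstream TSS by at least 50 mg/l and by at least 20 % of the downstream
+            # concentration (storage_diff_afs_rel = 1 - up/down).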
+ diff_greater_duration = (up_significant_lower).sum() * timestep + + duration_text = f"""total overflow duration at {down}: {overflow_duration} + tss concentration difference >= {diff_afs_thresh[0]} & >= {diff_afs_thresh[1]*100}% : {diff_greater_duration}({round(100*diff_greater_duration/overflow_duration)}%) + Potential overflow load difference: {load_diff:.2f}kg ({round(100*load_diff/load_current)}% of {load_current:.2f}kg) + tss concentration difference >= {diff_afs_thresh[0]} & >= {diff_afs_thresh[1]*100}% : {load_diff_signifcant:.2f}kg ({round(100*load_diff_signifcant/load_current)}% of {load_current:.2f}kg))""" + + print("making plot for ", up, "->", down) + plot_df = pd.concat( + [ + pd.DataFrame( + { + "TSS (mg/l)": storage_up_afs, + "part": "inflow", + "place": up, + } + ), + pd.DataFrame( + { + "TSS (mg/l)": storage_diff_afs, + "part": "inflow", + "place": diff_label, + } + ), + pd.DataFrame( + { + "TSS (mg/l)": storage_down_afs, + "part": "inflow", + "place": down, + } + ), + ] + ).reset_index() + fig, ax = plt.subplots(figsize=(fig_height, fig_height)) + ax.axhline(0, color="gray") + sns.boxplot( + plot_df, + x="place", + y="TSS (mg/l)", + hue="place", + fliersize=2, + order=[up, diff_label, down], + **box_kwargs, + ax=ax, + ) + ax.set(xlabel=None) + fig.text( + -0.1, + -0.07, + duration_text, + horizontalalignment="left", + verticalalignment="top", + fontsize="medium", + ) + fig.suptitle(f"TSS concentration during overflow at {down}", y=1.04) + fig.savefig(out_folder / f"diff_conc_{up}-{down}.svg", bbox_inches="tight") + fig, ax = plt.subplots(figsize=(fig_height, fig_height)) + ax.axhline(0, color="gray") + sns.violinplot( + plot_df, + x="place", + hue="place", + y="TSS (mg/l)", + inner=None, + order=[up, diff_label, down], + ax=ax, + ) + ax.set(xlabel=None) + plt.suptitle(f"TSS concentration during overflow at {down}", y=1.04) + fig.text( + -0.1, + -0.07, + duration_text, + horizontalalignment="left", + verticalalignment="top", + fontsize="medium", + ) + fig.savefig( + out_folder / f"diff_conc_violin{up}-{down}.svg", bbox_inches="tight" + ) + + +def diff_plot2( + up_storages: dict[str, set[str]], + overflow_filtered: pd.DataFrame, + inflow: pd.DataFrame, + outflow: pd.DataFrame, + storage_afs: pd.DataFrame, + storage_struct: pd.DataFrame, + graph: drainage_system_graph.DrainageSystemGraph, + uds: drainage_sysytem_data.DrainageSystemData, + timestep: pd.Timedelta, + diff_afs_thresh: tuple[float,float], #changed to tuple + out_folder: pathlib.Path, +): + """Second version of plot showing differences in concentration from up to downstream. + Accounts for difference in flow times and shows potential for overflow load reduction. 
+ Also creates line plots for some events to show the time shift.""" + for down, all_ups in up_storages.items(): + for up in all_ups: + up_node = storage_struct.loc[up, "node"] + down_node = storage_struct.loc[down, "node"] + path = graph.find_shortest_path(up_node, down_node) + sewer_map.make_path_time_map(uds, path).save( + out_folder / f"diff_conc2_{up}-{down}_flowtime.html" + ) + flow_len = uds.reach_df.loc[path, "length"].sum() + flow_time = uds.reach_df.loc[path, "flow_time"].sum() + example_lines_shifted(down, up, flow_time, inflow, out_folder) + if not (down in overflow_filtered.index.get_level_values("place")): + print("no overflow data", down) + continue + overflow_down = overflow_filtered.xs(down, level="place") + overflow_idx_down = overflow_down.index + if len(overflow_idx_down) == 0: + print("no overflow for", down) + # we need to account for flow time. + # water that could overflow at up would only get to down after flow_time has passed + # therefore we need to get the values from up at t-flow_time. + # to operate on the index values, first convert to dataframe + overflow_idx_up = overflow_idx_down.to_frame() + # we need to round flow_time to match the timestep of our data + overflow_idx_up["datetime"] -= flow_time.round(timestep) + overflow_idx_up = pd.MultiIndex.from_frame(overflow_idx_up) + storage_up_afs = storage_afs.xs(up, level="place").reindex(overflow_idx_up)[ + "afs" + ] + # overwrite index, otherwise pandas would match the original times. + storage_up_afs.index = overflow_idx_down + storage_down_afs = storage_afs.xs(down, level="place").loc[ + overflow_idx_down, "afs" + ] + storage_diff_afs = storage_down_afs - storage_up_afs + storage_diff_afs_rel = 1-(storage_up_afs/storage_down_afs) + diff_label = f"{down}\n-{up}" + # load that would not be in a overflow if, in times where the concentration upstream is lower, + # the overflow would happen upstream instead of downstream. + is_up_lower = storage_up_afs < storage_down_afs + load_current = ( + overflow_down["afs_transport"].sum() * timestep.total_seconds() + ) + # the overflow that is shifted to up cannot be greater than the flow that comes from there + # TODO: flow_up ändern zum maximalen aufgezeichneten Abfluss + flow_up = outflow.xs(up, level="place").reindex(overflow_idx_up)[ + "discharge" + ] + flow_up.index = overflow_idx_down + flow_shiftable = overflow_down["discharge"].clip(upper=flow_up) + + load_diff = ( + (storage_diff_afs[is_up_lower] * flow_shiftable.loc[is_up_lower]).sum() + * uf.MGPL_TO_KGPM3 + * timestep.total_seconds() + ) + overflow_duration = len(overflow_idx_down) * timestep + # calculate afs load diff with threshold value + up_significant_lower = (storage_diff_afs >= diff_afs_thresh[0]) & (storage_diff_afs_rel >= diff_afs_thresh[1]) + load_diff_signifcant = ( + ( + storage_diff_afs[up_significant_lower] + * flow_shiftable.loc[up_significant_lower] + ).sum() + * uf.MGPL_TO_KGPM3 + * timestep.total_seconds() + ) + overflow_duration = len(overflow_idx_down) * timestep + # time duration where the concentration difference is greater than some threshold. 
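+            # Hedged note: load_diff sums every timestep where the upstream TSS is lower at
+            # all, while load_diff_signifcant only sums timesteps clearing both thresholds,
+            # so load_diff_signifcant <= load_diff (assuming non-negative thresholds and
+            # discharges).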
+ diff_greater_duration = up_significant_lower.sum() * timestep + note_text = f"""total overflow duration at {down}: {overflow_duration} + tss concentration difference >= {diff_afs_thresh[0]} & >= {diff_afs_thresh[1]*100}% : {diff_greater_duration}({round(100*diff_greater_duration/overflow_duration)}%) + Potential overflow load difference: {load_diff:.2f}kg ({round(100*load_diff/load_current)}% of {load_current:.2f}kg) + tss concentration difference >= {diff_afs_thresh[0]} & >= {diff_afs_thresh[1]*100}% : {load_diff_signifcant:.2f}kg ({round(100*load_diff_signifcant/load_current)}% of {load_current:.2f}kg) + flow time: {flow_time}s, flow length: {flow_len:.2f}m, missing upstream data:{round(100*storage_up_afs.isna().mean())}%""" + print("making plot for ", up, "->", down) + plot_df = pd.concat( + [ + pd.DataFrame( + { + "TSS (mg/l)": storage_up_afs, + "place": up, + } + ), + pd.DataFrame( + { + "TSS (mg/l)": storage_diff_afs, + "place": diff_label, + } + ), + pd.DataFrame( + { + "TSS (mg/l)": storage_down_afs, + "place": down, + } + ), + ] + ).reset_index() + fig, (ax, ax_zoom) = plt.subplots( + ncols=2, + figsize=(fig_height * 2.5, fig_height), + gridspec_kw=dict(wspace=0.24), + ) + ax.axhline(0, color="gray") + sns.boxplot( + plot_df, + x="place", + y="TSS (mg/l)", + hue="place", + fliersize=2, + order=[up, diff_label, down], + **box_kwargs, + ax=ax, + ) + ax.set(xlabel=None) + ax.axhline(0, color="gray") + sns.boxplot( + plot_df, + x="place", + y="TSS (mg/l)", + hue="place", + showfliers=False, + order=[up, diff_label, down], + **box_kwargs, + ax=ax_zoom, + ) + ax_zoom.set(xlabel=None) + ax_zoom.axhline(0, color="gray") + fig.text( + 0.2, + -0.07, + note_text, + horizontalalignment="left", + verticalalignment="top", + fontsize="medium", + ) + fig.suptitle(f"TSS concentration during overflow at {down}", y=1.04) + fig.savefig(out_folder / f"diff_conc2_{up}-{down}.svg", bbox_inches="tight") + plt.close(fig) + + #create a new graph for publication + anonymize_dict = { + "SKU 0901": "CSO1", "SKU 0216": "CSO2", "SKU 0222": "CSO3", + "SKU 0225": "CSO4", "SKU 0228": "CSO5", "RUE 0139": "CSO6", + "MWE 0145": "CSO7", "SKU 0148": "CSO8", "SKU 0147": "CSO9", + "SKU 0133": "CSO10", "RKB 0337": "SSO1", "RRB 0301": "SST1" + } + # Rename up and down before using them + up_anon = anonymize_dict.get(up, up) # Default to original value if not found + down_anon = anonymize_dict.get(down, down) + + # Create the label using renamed values + diff_label2 = f"{down_anon}\n-{up_anon}" + + # Create the DataFrame with renamed values + plot_df2 = pd.concat( + [ + pd.DataFrame({"TSS (mg/l)": storage_up_afs, "place": up_anon}), + pd.DataFrame({"TSS (mg/l)": storage_diff_afs, "place": diff_label2}), + pd.DataFrame({"TSS (mg/l)": storage_down_afs, "place": down_anon}), + ] + ).reset_index(drop=True) + + fig, ax = plt.subplots( + ncols=1, + figsize=(fig_height, fig_height), + gridspec_kw=dict(wspace=0.24), + ) + ax.set(xlabel=None) + ax.axhline(0, color="gray") + sns.boxplot( + plot_df2, + x="place", + y="TSS (mg/l)", + #hue="place", + showfliers=False, + order=[up_anon, diff_label2, down_anon], + **box_kwargs, + ax=ax, + ) + fig.savefig(out_folder / f"diff_conc2_{up}-{down}_publication.svg", bbox_inches="tight") + plt.close(fig) + + # Create a dictionary for storing data of the printed text to an excel file + data_dict = { + "up":up, + "down": down, + "flow_time (h)": flow_time.total_seconds()/60/60, + "flow_length_m": flow_len, + "overflow_duration (h)": overflow_duration.total_seconds()/60/60, + "threshold absolute": 
+
+
+def example_lines_shifted(down, up, up_shift, inflow, out_folder):
+    """Creates line plots for some events to show the time shift."""
+    example_events = [363, 505, 588, 320, 490, 563]
+    discharge_down = inflow.query("place==@down and event in @example_events")[
+        "discharge"
+    ].reset_index()
+    discharge_up = inflow.query("place==@up and event in @example_events")[
+        "discharge"
+    ].reset_index()
+    discharge_up_shifted = discharge_up.copy()
+    discharge_up_shifted["datetime"] = discharge_up_shifted["datetime"] + up_shift
+    discharge_up["place"] += "_original"
+    discharge_up_shifted["place"] += "_shifted"
+    plot_df = pd.concat(
+        [discharge_down, discharge_up, discharge_up_shifted], ignore_index=True
+    )
+    fg = sns.FacetGrid(
+        plot_df,
+        col="event",
+        col_wrap=2,
+        hue="place",
+        palette={
+            down: "#467AC7",
+            up + "_original": "#89AC87",
+            up + "_shifted": "#298C0A",
+        },
+        height=fig_height,
+        aspect=2,
+        sharex=False,
+        sharey=False,
+    )
+    fg.map_dataframe(sns.lineplot, x="datetime", y="discharge")
+    fg.add_legend()
+    fg.figure.text(1.05, 0.8, str(up_shift))
+    fg.savefig(out_folder / f"diff_conc2_{up}-{down}_shift_lines.svg")
+
+
+def ff_line(ff_df: pd.DataFrame, out_folder: pathlib.Path):
+    """Draw accumulated mass vs volume plots as line plots.
+    All event lines are colored by event and overlap in a single axes (one figure per place)."""
+    for p, ff_df_p in tqdm(ff_df.groupby("place")):
+        plot_df = (
+            ff_df_p.copy()
+            .reset_index()
+            .rename(
+                columns={"volume_ratio": "volume ratio", "load_ratio": "load ratio"}
+            )
+        )
+        fg = sns.FacetGrid(
+            plot_df.reset_index(),
+            hue="event",
+            palette="rocket",
+            sharex=False,
+            sharey=False,
+            height=fig_height,
+            aspect=1,
+        ).map(sns.lineplot, "volume ratio", "load ratio", alpha=0.7)
+        for ax in fg.axes_dict.values():
+            ax.plot([0, 1], [0, 1], "--", color="gray", linewidth=1.0)
+        #plt.suptitle(f"{p}: accumulated load vs volume", y=1.04)
+        fg.savefig(out_folder / f"{p.replace(' ','')}_ff_plot.svg")
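+
+
+# Illustrative sketch only: ff_line and the functions below expect ff_df to be
+# indexed by place/event with precomputed "volume_ratio" and "load_ratio"
+# columns. Assuming per-timestep "timestep_volume" and "timestep_load" columns
+# (both used further down), the ratios could be derived as follows; this
+# helper is a placeholder and is not called anywhere in this module.
+def _add_ratio_columns(ff_df: pd.DataFrame) -> pd.DataFrame:
+    out = ff_df.copy()
+    g = out.groupby(level=["place", "event"])
+    # cumulative share of each event's total volume / load at every timestep
+    out["volume_ratio"] = g["timestep_volume"].cumsum() / g["timestep_volume"].transform("sum")
+    out["load_ratio"] = g["timestep_load"].cumsum() / g["timestep_load"].transform("sum")
+    return out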
+
+
+def ff_box_leutnant2016(ff_df: pd.DataFrame, out_folder: pathlib.Path):
+    """Box plot graph according to Leutnant 2016.
+    Like ff_line but with boxplots instead of lines."""
+    for p, ff_df_p in tqdm(ff_df.groupby("place")):
+        volume_ratios = np.linspace(0.1, 1, 10)
+        # for each volume ratio r: per place/event, the index of the row whose volume ratio is closest to r
+        volume_ratio_idx = pd.concat(
+            {
+                r: (ff_df_p["volume_ratio"] - r)
+                .dropna()
+                .abs()
+                .groupby(level=["place", "event"])
+                .idxmin()
+                for r in volume_ratios
+            },
+            names=["volume_ratio"],
+        )
+        plot_df = (
+            pd.DataFrame(
+                index=volume_ratio_idx.index,
+                data={
+                    "load_ratio": ff_df_p.loc[volume_ratio_idx, "load_ratio"].to_numpy()
+                },
+            )
+            .reset_index()
+            .rename(
+                columns={"volume_ratio": "volume ratio", "load_ratio": "load ratio"}
+            )
+        )
+        plt.figure(figsize=(fig_height * 1.2, fig_height))
+        ax = sns.boxplot(
+            plot_df,
+            x="volume ratio",
+            y="load ratio",
+        )
+        ax.xaxis.set_major_formatter(mpl.ticker.FormatStrFormatter("%.1f"))
+        plt.suptitle(p, y=1.04)
+        ax.get_figure().savefig(out_folder / f"{p.replace(' ','')}ff_leutnant2016.svg")
+
+
+def ff_bachmann_2020(
+    ff_df: pd.DataFrame, sealed_area_m2: pd.Series, out_folder: pathlib.Path
+):
+    """Box plot graph according to Bachmann 2020. Here the volume axis is absolute."""
+    max_mff = ff_df["mff"].quantile(0.95)
+    pad = 0.05
+    ylim = (-pad * max_mff, (1 + pad) * max_mff)
+    for p, ff_df_p in tqdm(ff_df.groupby("place")):
+        # MFF_Vol graph according to Bachmann 2020
+        vol_sections = sorted(
+            da.volume_sections_from_raindepth(
+                ff_df_p["volume_cum"], sealed_area_m2[p]
+            ).unique()
+        )
+        vol_section_idx = {}
+        for t in vol_sections:
+            # per event: index of the row whose cumulative volume is closest to the section bound t
+            idx = (ff_df_p["volume_cum"] - t).abs().groupby(level=["event"]).idxmin()
+            # only keep events that actually reach the volume t
+            valid = ff_df_p["timestep_volume"].groupby(level=["event"]).sum() > t
+            vol_section_idx[t] = idx[valid]
+        vol_section_idx = pd.concat(vol_section_idx, names=["volume section"])
+        plot_df = pd.DataFrame(index=vol_section_idx.index).reset_index()
+        plot_df["mass first flush ratio"] = ff_df_p.loc[vol_section_idx, "mff"].values
+        fg_box = sns.FacetGrid(
+            plot_df, height=fig_height, aspect=1.6, sharex=True, sharey=False, ylim=ylim
+        )
+        fg_box.map_dataframe(
+            sns.boxplot,
+            x="volume section",
+            y="mass first flush ratio",
+            **box_kwargs,
+            native_scale=True,
+        )
+        for ax in fg_box.axes_dict.values():
+            ax.tick_params(axis="x", rotation=30)
+            ax.grid(True, axis="y")
+        section_size = da.calc_volume_from_raindepth(da.slice_mm, sealed_area_m2[p])
+        plt.suptitle(
+            f"mff at volume sections ({section_size} m³), storage = {p}", y=1.04
+        )
+        fg_box.savefig(out_folder / f"{p.replace(' ','')}ff_bachmann2020_box.svg")
+        fg_line = sns.FacetGrid(
+            plot_df, height=fig_height, aspect=1.6, sharex=True, sharey=False, ylim=ylim
+        )
+        fg_line.map_dataframe(
+            sns.lineplot,
+            x="volume section",
+            y="mass first flush ratio",
+            estimator="mean",
+            errorbar=("pi", 50),
+            marker=".",
+            markersize=6,
+            err_style="band",
+        )
+        for ax in fg_line.axes_dict.values():
+            ax.tick_params(axis="x", rotation=30)
+            ax.hlines(
+                y=1,
+                xmin=0,
+                xmax=vol_sections[-1],
+                color="black",
+                linestyles="dashed",
+                linewidth=0.5,
+            )
+            ax.grid(True, axis="y")
+        plt.suptitle(
+            f"mff at volume sections ({section_size} m³), storage = {p}", y=1.04
+        )
+        fg_line.savefig(out_folder / f"{p.replace(' ','')}ff_bachmann2020_line.svg")
+
+
+def ff_bach_2010(
+    ff_df: pd.DataFrame, sealed_area_m2: pd.Series, out_folder: pathlib.Path, slice_size: float = 1
+):
+    """
+    see:
+    Peter M. Bach, David T. McCarthy, Ana Deletic,
+    Redefining the stormwater first flush phenomenon,
+    Water Research,
+    Volume 44, Issue 8,
+    2010,
+    Pages 2487-2498,
+    ISSN 0043-1354,
+    https://doi.org/10.1016/j.watres.2010.01.022.
+    (https://www.sciencedirect.com/science/article/pii/S0043135410000564)
+    """
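+    # Expected input (inferred from the code below, stated as an assumption, not
+    # enforced): ff_df is indexed by place/event and provides "volume_cum",
+    # "afs", "discharge" and "timestep_load" columns; sealed_area_m2 maps each
+    # place to its sealed area in m². Worked example for the slicing with
+    # slice_size = 1: a timestep with a cumulative runoff depth of 2.7 mm gets
+    # slice_num = floor(2.7 / 1) = 2, i.e. it belongs to the slice covering 2-3 mm.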
+    # In the process of creating the plots some data is collected; it is returned as a DataFrame.
+    ff_info = pd.DataFrame()
+    for storage, ff_part in ff_df.groupby("place"):
+        df = ff_part.copy()
+        # Calculate the rainfall depth
+        df["rainfall_depth"] = da.calc_raindepth(df["volume_cum"], sealed_area_m2[storage])
+        # note that this is the number of the slice, starting from 0, not the upper bound of the slice
+        df["slice_num"] = np.floor(df["rainfall_depth"] / slice_size).astype(int)
+        # mean concentration for a slice should be flow-weighted: sum(afs*discharge*dt)/sum(discharge*dt).
+        # since the timestep is constant, we can leave out dt. Do not use transport here, otherwise units would need to be converted.
+        df["afs_x_discharge"] = df["afs"] * df["discharge"]
+        sg = df.groupby(["event", "slice_num"])
+        slice_stats = pd.DataFrame(
+            {
+                "slice_size_max": sg["rainfall_depth"].max(),
+                "mean_conc": sg["afs_x_discharge"].sum() / sg["discharge"].sum(),
+            }
+        ).reset_index()
+        # filter out slices with fewer than 5 values
+        slice_stats["ev_count"] = slice_stats.groupby("slice_num")["event"].transform("count")
+        slice_stats = slice_stats.query("ev_count >= 5").copy()
+        # merge slices that are similar
+        slice_stats["original_slice_num"] = slice_stats["slice_num"]
+        max_slice_num = slice_stats["slice_num"].max()
+        # go over all slices and, if a slice is similar to the next one, reassign its slice_num to the next slice num.
+        # note that the slices are selected by the merged slice_num instead of original_slice_num,
+        # so that next_means is compared against the whole merged group.
+        for i in range(max_slice_num):
+            i_mask = slice_stats["slice_num"] == i
+            i_means = slice_stats.loc[i_mask, "mean_conc"].to_numpy()
+            next_means = slice_stats.loc[slice_stats["slice_num"] == (i + 1), "mean_conc"].to_numpy()
+            # Perform the Wilcoxon rank-sum test
+            _, p_value = scipy.stats.ranksums(
+                i_means, next_means
+            )
+            # If the p-value is greater than 0.05, the slices are not significantly different, so merge them
+            if p_value > 0.05:
+                slice_stats.loc[i_mask, "slice_num"] = i + 1
+        # slice_stats is mostly accessed by slice num, set that as index
+        slice_stats = slice_stats.set_index("slice_num")
+        # after the merge slice_num is not a simple range, use slice_nums to iterate over all values
+        slice_nums = np.array(sorted(slice_stats.index.unique()))
+        # plot
+        fig, ax = plt.subplots()
+        # Upper bounds of the slices, used for x-ticks
+        slice_ubounds = slice_nums * slice_size + slice_size
+        # Widths of the slices and the corresponding box plots.
+        # Each width is the difference to the previous upper bound; the first entry just uses its own upper bound.
+        widths = slice_ubounds - np.roll(slice_ubounds, 1)
+        widths[0] = slice_ubounds[0]
+        # x-positions of the plots, each boxplot is horizontally centered around this position.
+        # These positions are also used for extra notes.
+        positions = slice_ubounds - widths / 2
+        # the values used in the boxplot, a list with one array per boxplot
+        boxplot_data = []
+        for i in slice_nums:
+            boxplot_data.append(slice_stats.loc[[i], "mean_conc"].to_numpy())
+        # subtract some padding from the boxplot widths, so that they have a gap.
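+        # Worked example with illustrative values: for slice_size = 1 and merged
+        # slice_nums = [0, 2, 3], slice_ubounds = [1, 3, 4], widths = [1, 2, 1]
+        # and positions = [0.5, 2.0, 3.5], so each box spans exactly its merged
+        # slice on the runoff-depth axis and is centered within it.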
+        wpad = 0.05
+        ax.boxplot(
+            boxplot_data,
+            positions=positions,
+            widths=widths - wpad * 2,
+            boxprops=dict(facecolor="#65b22e"),
+            flierprops=dict(
+                marker="o",
+                markersize=1,
+                markerfacecolor="none",
+                markeredgecolor="black",
+            ),
+            medianprops=dict(color="black"),
+            **box_kwargs,
+            patch_artist=True,
+        )
+        ax.set_xticks(slice_ubounds)
+        ax.set_xticklabels([str(s) for s in slice_ubounds])
+        # Set axis limits
+        ax.set_xlim(-0.5, max(slice_ubounds) + 0.5)
+        ax.set_ylim(-10, 1000)
+
+        # Add the number of values in a slice above each boxplot
+        for i in range(len(slice_nums)):
+            ax.text(
+                positions[i],
+                1.007,
+                f"n={len(boxplot_data[i])}",
+                transform=ax.get_xaxis_transform(),
+                horizontalalignment="center",
+                size="small",
+                weight="semibold",
+                color="black",
+                rotation=45,
+            )
+
+        # Set axis labels
+        ax.set_ylabel("TSS Concentration (mg/l)")
+        ax.set_xlabel("Cumulative runoff depth (mm)")
+
+        # Reduce the margins so that the labels fit
+        fig.tight_layout()
+        #fig.suptitle(storage, y=1.06)
+        # Save the plot
+        file_path = (
+            out_folder
+            / f"{storage.replace(' ','')}_bachmann_runoffdepth_conc_1mm_combinedslices_old.svg"
+        )
+        fig.savefig(file_path, format="svg", bbox_inches="tight")
+
+        # plot load
+        df["rainfall_depth_ceil"] = (
+            np.ceil(df["rainfall_depth"] / slice_size) * slice_size
+        )
+        max_x = max(slice_ubounds) + 0.5
+        df_cut = df.query("rainfall_depth<=@max_x")
+
+        # using the mean of the accumulated load over each event is confusing, because this mean is not monotonically increasing.
+        # instead we sum up the changes in load for each rainfall_depth_ceil section per event
+        # and then take the mean of the change over the events.
+        # finally the changes are accumulated with cumsum().
+        load_mean = (
+            df_cut.groupby(["event", "rainfall_depth_ceil"])["timestep_load"]
+            .sum()
+            .groupby("rainfall_depth_ceil")
+            .mean()
+            .cumsum()
+            .to_numpy()
+        )
+        load_depth_slices = df_cut["rainfall_depth_ceil"].unique()
+        # add the origin (0, 0) so the line starts at zero
+        load_depth_slices = np.concatenate([[0], load_depth_slices])
+        load_mean = np.concatenate([[0], load_mean])
+        ax_load = ax.twinx()
+        ax_load.plot(load_depth_slices, load_mean, color="black", alpha=0.6)
+        ax_load.set_ylim(-0.01 * load_mean.max(), 1.01 * load_mean.max())
+        ax_load.set_ylabel("cumulated mean TSS load (kg)")
+        file_path_load = (
+            out_folder
+            / f"{storage.replace(' ','')}_bachmann_runoffdepth_conc_1mm_combinedslices_load.svg"
+        )
+        fig.savefig(file_path_load, format="svg", bbox_inches="tight")
+
+        # Extract data for the first and last slices
+        first_conc = slice_stats.loc[[slice_nums[0]], "mean_conc"]
+        last_conc = slice_stats.loc[[slice_nums[-1]], "mean_conc"]
+        if len(slice_nums) >= 3:
+            before_last_data = slice_stats.loc[[slice_nums[-2]]]
+        else:
+            before_last_data = pd.DataFrame(index=[0], columns=slice_stats.columns, data=np.nan)
+        # Perform the Wilcoxon rank-sum test between the first and the last slice
+        _, p_value = scipy.stats.ranksums(first_conc.to_numpy(), last_conc.to_numpy())
+        ff_info.loc[storage, "bach_init_conc"] = first_conc.median()
+        ff_info.loc[storage, "bach_init_conc_mean"] = first_conc.mean()
+        ff_info.loc[storage, "bach_bg_conc"] = last_conc.median()
+        ff_info.loc[storage, "bach_ff_strength"] = p_value
+        ff_info.loc[storage, "bach_before_bg_conc"] = before_last_data["mean_conc"].median()
+        ff_info.loc[storage, "bach_max_depth"] = slice_stats["slice_size_max"].max()
+        ff_info.loc[storage, "bach_before_last_depth"] = before_last_data[
+            "slice_size_max"
+        ].max()
+    return ff_info
--
GitLab