"""module for reading and aggregating data from res1d files. tailored for structure_analysis."""
import typing
import pathlib
from warnings import warn
import mikeio1d as mi
import mikeio1d.res1d as mir
import numpy as np
import numpy.typing as npt
import pandas as pd
from tqdm import tqdm
from entfrachten.analysis.drainage_sysytem_data import DrainageSystemData
from entfrachten.analysis import static_analysis, unit_factors as uf
def read_sim_data(
r1d_folder: pathlib.Path,
sd: DrainageSystemData,
storage_struct: pd.DataFrame,
sub_storage: pd.DataFrame,
events: list,
st_mass_balance_storages: list[str] = []
) -> dict[str, pd.DataFrame]:
"""
Read and sum up the data from the simulation results in r1d_folder.
    Arguments
    r1d_folder: Folder where .res1d files are read from.
    sd: Data on the drainage system, see drainage_sysytem_data.py.
    storage_struct: DataFrame with information on storages.
    sub_storage: DataFrame with information on sub_storages.
    events: List of events; the res1d files for each event are found by inserting the event name into a hard-coded pattern.
    st_mass_balance_storages: If ST data is read, these storages use a mass balance instead of the concentration from their outflow_reaches.
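    Returns
    A dict mapping dataset names ("inflow", "outflow", "overflow", "inflow_catchments",
    "storage_level", "sub_storage_level", "storage_afs", "sub_storage_afs", "reach_agg")
    to DataFrames. The time series frames are indexed by (place, event, datetime).
    Example (illustrative sketch; the inputs and the event name are placeholders):
        data = read_sim_data(pathlib.Path("results"), sd, storage_struct, sub_storage, ["T1"])
        inflow = data["inflow"]  # columns: discharge, afs_transport, afs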
"""
warn("Patterns to find files are currently hard coded, adjust variables named *_path inside this file to match your project.")
dataset_reaches: dict[str, pd.Series] = _create_reach_series(storage_struct)
# For the inflow, we also need to add the flow from catchments that connect into the storage space.
storage_catchments = {}
storage_struct["in_out_volume"] = np.nan
for s in storage_struct.index:
nodes_s, reaches_s = static_analysis.in_out_reach_subset(
sd, dataset_reaches["inflow"][[s]], dataset_reaches["outflow"][[s]]
)
storage_catchments[s] = sd.catchment_df.query(
"reach in @reaches_s or node in @nodes_s"
)["id"].to_list()
r1d_catchment_filter = [c for l in storage_catchments.values() for c in l]
# max velocity has same index as reach_df, since we only need one value per reach
reach_agg = pd.DataFrame(
index=sd.reach_df.index, columns=["max_velocity"], data=np.nan
)
    # collect data parts
df_parts = {k: [] for k in dataset_reaches.keys()}
df_parts["inflow_catchments"] = []
    # also collect parts for water levels, later used to calculate unused storage capacities
df_parts["storage_level"] = []
df_parts["sub_storage_level"] = []
# collect parts for afs at nodes
df_parts["storage_afs"] = []
df_parts["sub_storage_afs"] = []
# collect all reaches and nodes for filtering
r1d_reach_filter = list(set(str(r) for r in pd.concat(dataset_reaches)))
r1d_node_filter = storage_struct["node"].to_list() + sub_storage["node"].to_list()
for event in tqdm(events):
print("in loop for event "+str(event))
# open files
# first check if ST or AD model
# Check if the ST model file exists
conc_path_st = r1d_folder / f"{event}BaseDefault_Network_ST.res1d"
conc_path_ad = r1d_folder / f"{event}BaseDefault_Network_AD.res1d"
# Automatically switches to reading st-data if the file is found. Maybe add a function parameter?
st_mode = conc_path_st.exists()
if st_mode:
conc_path = conc_path_st
print("In analysis of ST-module")
# we will later use those reaches to calculate the concentration at the corresponding storage,
# since the model for st does not output node concentration
sub_storage_out_reaches = sub_storage["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
sub_storage_in_reaches = sub_storage["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
storage_out_reaches = storage_struct["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
storage_in_reaches = storage_struct["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
all_storage_out_reaches = pd.concat([sub_storage_out_reaches,storage_out_reaches])
all_storage_in_reaches = pd.concat([sub_storage_in_reaches,storage_in_reaches])
# add to filter, so that the values are actually read later.
r1d_reach_filter = list(set(r1d_reach_filter).union(all_storage_out_reaches).union(all_storage_in_reaches))
else:
conc_path = conc_path_ad
# now read hydraulic information
hd_path = r1d_folder / f"{event}BaseDefault_Network_HD.res1d"
conc_catch_path = (
r1d_folder / f"{event}BaseDefault_Catchment_discharge_quality.res1d"
)
hd_catch_path = r1d_folder / f"{event}BaseDefault_Catchment_discharge.res1d"
        # check if data is available
if (
conc_path.exists()
and hd_path.exists()
and conc_catch_path.exists()
and hd_catch_path.exists()
):
print(f"found data for {event}")
else:
print(f"No data for {event}")
continue
res_conc = mir.Res1D(
str(conc_path), reaches=r1d_reach_filter, nodes=r1d_node_filter
)
# we need velocities for all reaches, therefore do not limit the reaches to r1d_reach_filter
res_hd = mir.Res1D(
str(hd_path),
reaches=sd.reach_df.index.to_list(),
nodes=r1d_node_filter,
)
res_catch_hd = mir.Res1D(str(hd_catch_path), catchments=r1d_catchment_filter)
res_catch_conc = mir.Res1D(
str(conc_catch_path), catchments=r1d_catchment_filter
)
        time_idx = (
            res_conc.time_index.intersection(res_hd.time_index)
            .intersection(res_catch_hd.time_index)
            .intersection(res_catch_conc.time_index)
        )
        if len(time_idx) == 0:
            warn(f"Found data for {event}, but the time indices of the files do not intersect")
continue
# read values
# where to read what
if st_mode:
col_res1d_quantity = [
("afs_transport", res_conc, "MassTransportTotal"),
("discharge", res_hd, "Discharge"),
]
else:
col_res1d_quantity = [
("afs_transport", res_conc, "AFSTransport"),
("discharge", res_hd, "Discharge"),
]
reach_start_end = {"outflow": "end", "inflow": "start", "overflow": "start"}
for dataset_name in dataset_reaches.keys():
for s in storage_struct.index:
reaches_to_read = dataset_reaches[dataset_name].loc[
dataset_reaches[dataset_name].index == s
]
if len(reaches_to_read) >= 1:
part_df = read_reaches_sum(
reaches_to_read,
col_res1d_quantity,
reach_start_end[dataset_name],
)
part_df = part_df.assign(event=event, place=s)
df_parts[dataset_name].append(part_df)
        # read catchment values; this neglects the ST results by always using the AD catchment results
catch_col_r1d_quantity = [
("afs_transport", res_catch_conc, "AFSLoadCD"),
("discharge", res_catch_hd, "CatchmentDischarge"),
]
for s in storage_struct.index:
if len(storage_catchments[s]) >= 1:
part_df = read_catchments_sum(
storage_catchments[s], catch_col_r1d_quantity
)
part_df = part_df.assign(event=event, place=s)
df_parts["inflow_catchments"].append(part_df)
        # read water level for nodes and match it to the index of storage_struct/sub_storage
for sdf, part_name in [
(storage_struct, "storage_level"),
(sub_storage, "sub_storage_level"),
]:
water_level_part = res_hd.read(
[
mir.QueryDataNode(name=node, quantity="WaterLevel")
for node in sdf["node"]
]
)
water_level_part = water_level_part.loc[
:, "WaterLevel:" + sdf["node"]
].copy()
water_level_part.index.name = "datetime"
water_level_part.columns = sdf.index.rename("place")
water_level_part = (
water_level_part.stack().rename("waterlevel").reset_index()
)
water_level_part["event"] = event
df_parts[part_name].append(water_level_part)
        # read afs for nodes and match it to the index of storage_struct/sub_storage
for sdf, part_name in [
(storage_struct, "storage_afs"),
(sub_storage, "sub_storage_afs"),
]:
            # for ST, take the concentration from the start of the out_reaches, or use a mass balance for storages listed in st_mass_balance_storages
if st_mode:
for s in sdf.index:
                    # for two sub-storages the data from out_reaches was not usable, so a mass balance is used instead
if s in st_mass_balance_storages:
                        # use a mass balance based on the inflow reaches instead of the outflow concentration
in_reaches_s = all_storage_in_reaches.loc[
all_storage_in_reaches.index == s
]
assert len(in_reaches_s) >= 1, f"No inflow reaches for storage {s}, required to read concentration from ST data."
                        # col_res1d_quantity is the same as for the other reaches; read at the start to be near the node
                        # this reads transport and discharge
afs_part = read_reaches_sum(
in_reaches_s,
col_res1d_quantity,
"start",
)
                        # concentration from a mass balance: cumulative transport / cumulative discharge,
                        # converted from kg/m^3 to mg/l (factor 1000)
                        afs_part["afs"] = (
                            afs_part["afs_transport"].cumsum()
                            / afs_part["discharge"].cumsum()
                            * 1000
                        ).clip(lower=0)
                        # keep only datetime and concentration
                        afs_part = afs_part[["datetime", "afs"]]
# add event and place
afs_part = afs_part.assign(event=event, place=s)
df_parts[part_name].append(afs_part)
else:
out_reaches_s = all_storage_out_reaches.loc[
all_storage_out_reaches.index == s
]
assert len(out_reaches_s) >= 1, f"No outflow reaches for storage {s}, required to read concentration from ST data."
                        # col_res1d_quantity is the same as for the other reaches; read at the start to be near the node
                        # this reads transport and discharge
afs_part = read_reaches_sum(
out_reaches_s,
col_res1d_quantity,
"start",
)
                        # calculate concentration: transport / discharge, converted from kg/m^3 to mg/l (factor 1000)
                        afs_part["afs"] = (
                            afs_part["afs_transport"] / afs_part["discharge"] * 1000
                        ).clip(lower=0)
                        # keep only datetime and concentration
                        afs_part = afs_part[["datetime", "afs"]]
# add event and place
afs_part = afs_part.assign(event=event, place=s)
df_parts[part_name].append(afs_part)
# code for AD results
else:
afs_part = res_conc.read(
[mir.QueryDataNode(name=node, quantity="AFS") for node in sdf["node"]]
)
afs_part = afs_part.loc[:, "AFS:" + sdf["node"]].copy()
afs_part.index.name = "datetime"
afs_part.columns = sdf.index.rename("place")
afs_part = afs_part.stack().rename("afs").reset_index()
afs_part["event"] = event
df_parts[part_name].append(afs_part)
# read max velocity for event and update current overall maximum
for r in res_hd.reaches:
event_max_v = reach_max_velocity(r, res_hd, sd)
reach_agg.loc[r, "max_velocity"] = np.nanmax(
[reach_agg.loc[r, "max_velocity"], event_max_v]
)
# turn parts with time series into dataset
dataset = _parts_to_dataset(df_parts)
# add reach_agg
dataset["reach_agg"] = reach_agg
return dataset
def _create_reach_series(storage_df):
"""Extract information on which reaches need to be read from storage_df."""
reach_series = {}
reach_series["outflow"] = (
storage_df["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
)
reach_series["inflow"] = (
storage_df["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
)
reach_series["overflow"] = storage_df["weir_reach"].dropna()
return reach_series
def read_reaches_sum(
    reaches: typing.Iterable[str],
col_r1d_quantity: list[tuple[str, mi.Res1D, str]],
end_start: typing.Literal["start", "end"],
):
"""Read and calculate sum of quantities from reaches. Since it just adds values, do not use this for concentration!"""
result_df = pd.DataFrame()
for col, r1d, quantity in col_r1d_quantity:
# get_reach_end_values returns an array
reach_end_arrays: list[npt.NDArray] = []
for r in reaches:
assert quantity in r1d.quantities
assert r in r1d.reaches
if end_start == "end":
reach_end_arrays.append(r1d.get_reach_end_values(r, quantity))
elif end_start == "start":
reach_end_arrays.append(r1d.get_reach_start_values(r, quantity))
        if len(reach_end_arrays) > 0:
            result_df[col] = pd.Series(sum(reach_end_arrays), index=r1d.time_index)
        else:
            result_df[col] = pd.Series(np.nan, index=r1d.time_index)
result_df.index.name = "datetime"
result_df = result_df.reset_index()
return result_df
def read_catchments_sum(
catchments: list[str], col_r1d_quantity: list[tuple[str, mi.Res1D, str]]
):
"""Read and calculate sum of quantities from catchments. Since it just adds values, do not use this for concentration!"""
result_df = pd.DataFrame(index=col_r1d_quantity[0][1].time_index)
for col, r1d, quantity in col_r1d_quantity:
for c in catchments:
assert quantity in r1d.quantities, f"{quantity} not in {r1d.file_path}"
assert c in r1d.catchments
col_result_df = r1d.read(
[mir.QueryDataCatchment(name=c, quantity=quantity) for c in catchments]
)
if len(col_result_df.columns) != len(catchments):
warn("count of results columns does not match catchment count")
print(catchments, col_result_df)
result_df[col] = col_result_df.sum(axis="columns")
result_df.index.name = "datetime"
result_df = result_df.reset_index()
return result_df
def reach_max_velocity(reach, res_hd, sd: DrainageSystemData) -> float:
"""Calculate maximum velocity for reach. If velocity is stored in the result files, use that instead! """
# Assume that max water level, discharge and velocity coincide.
# Since water level and discharges are calculated at different chainages,
# using max is probably more accurate than using the exact same time where one of the values is maximal.
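    # The maximum velocity is then approximated as Q_max / A(h_max), where A is the
    # cross-section area evaluated at chainage 0.0 of the reach.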
q = res_hd.reaches[reach][1].Discharge.read().max(axis=None)
h = res_hd.reaches[reach][0].WaterLevel.read().max(axis=None)
try:
_, (a,) = sd.xs_area_at(reach, h, [0.0])
return q / a
except Exception as ex:
warn(str(ex))
return np.nan
def _parts_to_dataset(df_parts):
dataset = {}
for dn in df_parts.keys():
dataset[dn] = pd.concat(df_parts[dn], ignore_index=True).set_index(
["place", "event", "datetime"]
)
dataset[dn].columns.name = "property"
dataset["inflow"] = dataset["inflow"].add(
dataset["inflow_catchments"], fill_value=0
)
    # Calculate concentration where it was not read. At many points, parts need to be summed up,
    # but concentration would need to be a weighted average; therefore it is simpler to
    # calculate it from transport and discharge.
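    # i.e. afs [mg/l] = afs_transport / discharge * uf.KGPM3_TO_MGPL
    # (assuming transport in kg/s and discharge in m^3/s, so that the ratio is a concentration in kg/m^3)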
for n, df in dataset.items():
if (
"discharge" in df.columns
and "afs_transport" in df.columns
and not "afs" in df.columns
):
print("calculating afs for ", n)
df["afs"] = (df["afs_transport"] / df["discharge"]).fillna(
0
) * uf.KGPM3_TO_MGPL
# Clip is needed to avoid negative concentrations.
# Negative concentrations can appear when directly calculating concentration from discharge and transport,
# and sometimes even in the concentration values read from a res1d, so this is unlikely to be an error in our reading procedure.
for n, df in dataset.items():
if "afs" in df.columns:
df["afs"] = df["afs"].clip(lower=0)
return dataset