    # simulation_reading.py
    """module for reading and aggregating data from res1d files. tailored for structure_analysis."""
    
    import typing
    import pathlib
    from warnings import warn
    
    import mikeio1d as mi
    import mikeio1d.res1d as mir
    
    import numpy as np
    import numpy.typing as npt
    import pandas as pd
    from tqdm import tqdm
    
    from entfrachten.analysis.drainage_sysytem_data import DrainageSystemData
    from entfrachten.analysis import static_analysis, unit_factors as uf
    
    
    def read_sim_data(
        r1d_folder: pathlib.Path,
        sd: DrainageSystemData,
        storage_struct: pd.DataFrame,
        sub_storage: pd.DataFrame,
        events: list,
        st_mass_balance_storages: typing.Sequence[str] = (),
    ) -> dict[str, pd.DataFrame]:
        """
        Read and sum up the data from the simulation results in r1d_folder.
        Arguments
        r1d_folder: Folder where .res1d files are read from.
        sd: Data on the drainage system, see drainage_sysytem_data.py.
        storage_struct: Dataframe with information on storages.
        sub_storage: Dataframe with information on sub_storages.
        events: list of events, res1d files for each event are found by inserting the event into a hard coded pattern.
        st_mass_balance_storages: if st data is read, these storages use a mass balance instead of concentration from outflow_reaches
        """
        warn("Patterns to find files are currently hard coded, adjust variables named *_path inside this file to match your project.")
        dataset_reaches: dict[str, pd.Series] = _create_reach_series(storage_struct)
        # For the inflow, we also need to add the flow from catchments that connect into the storage space.
        storage_catchments = {}
        storage_struct["in_out_volume"] = np.nan
        for s in storage_struct.index:
            nodes_s, reaches_s = static_analysis.in_out_reach_subset(
                sd, dataset_reaches["inflow"][[s]], dataset_reaches["outflow"][[s]]
            )
            storage_catchments[s] = sd.catchment_df.query(
                "reach in @reaches_s or node in @nodes_s"
            )["id"].to_list()
        r1d_catchment_filter = [c for l in storage_catchments.values() for c in l]
        # max velocity has same index as reach_df, since we only need one value per reach
        reach_agg = pd.DataFrame(
            index=sd.reach_df.index, columns=["max_velocity"], data=np.nan
        )
        # collect data parts
        df_parts = {k: [] for k in dataset_reaches.keys()}
        df_parts["inflow_catchments"] = []
        # also collect parts for waterlevels, later use to calculate unused storage capacities
        df_parts["storage_level"] = []
        df_parts["sub_storage_level"] = []
        # collect parts for afs at nodes
        df_parts["storage_afs"] = []
        df_parts["sub_storage_afs"] = []
        # collect all reaches and nodes for filtering
        r1d_reach_filter = list(set(str(r) for r in pd.concat(dataset_reaches)))
        r1d_node_filter = storage_struct["node"].to_list() + sub_storage["node"].to_list()
        for event in tqdm(events):
            print("in loop for event "+str(event))
            # open files: first check whether the ST or the AD model was run,
            # by testing if the ST result file exists
            conc_path_st = r1d_folder / f"{event}BaseDefault_Network_ST.res1d"
            conc_path_ad = r1d_folder / f"{event}BaseDefault_Network_AD.res1d"
            # Automatically switches to reading st-data if the file is found. Maybe add a function parameter?
            st_mode = conc_path_st.exists()
            if st_mode:
                conc_path = conc_path_st
                print("In analysis of ST-module")
                # we will later use these reaches to calculate the concentration at the corresponding storage,
                # since the ST model does not output node concentrations
                sub_storage_out_reaches = sub_storage["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
                sub_storage_in_reaches = sub_storage["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
                storage_out_reaches = storage_struct["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
                storage_in_reaches = storage_struct["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
                all_storage_out_reaches = pd.concat([sub_storage_out_reaches, storage_out_reaches])
                all_storage_in_reaches = pd.concat([sub_storage_in_reaches, storage_in_reaches])
                # add to filter, so that the values are actually read later.
                r1d_reach_filter = list(set(r1d_reach_filter).union(all_storage_out_reaches).union(all_storage_in_reaches))            
            else:
                conc_path = conc_path_ad
            # build paths for the hydraulic and the catchment result files
            hd_path = r1d_folder / f"{event}BaseDefault_Network_HD.res1d"
            conc_catch_path = (
                r1d_folder / f"{event}BaseDefault_Catchment_discharge_quality.res1d"
            )
            hd_catch_path = r1d_folder / f"{event}BaseDefault_Catchment_discharge.res1d"
            # check if all required files are available
            if (
                conc_path.exists()
                and hd_path.exists()
                and conc_catch_path.exists()
                and hd_catch_path.exists()
            ):
                print(f"found data for {event}")
            else:
                print(f"No data for {event}")
                continue
            res_conc = mir.Res1D(
                str(conc_path), reaches=r1d_reach_filter, nodes=r1d_node_filter
            )
            # we need velocities for all reaches, therefore do not limit the reaches to r1d_reach_filter
            res_hd = mir.Res1D(
                str(hd_path),
                reaches=sd.reach_df.index.to_list(),
                nodes=r1d_node_filter,
            )
            res_catch_hd = mir.Res1D(str(hd_catch_path), catchments=r1d_catchment_filter)
            res_catch_conc = mir.Res1D(
                str(conc_catch_path), catchments=r1d_catchment_filter
            )
            time_idx = (
                res_conc.time_index.intersection(res_hd.time_index)
                .intersection(res_catch_hd.time_index)
                .intersection(res_catch_conc.time_index)
            )
            if len(time_idx) == 0:
                warn(f"Data for {event} exists, but the times of the files do not intersect")
                continue
            # read values
            # where to read what
            if st_mode:
                col_res1d_quantity = [
                    ("afs_transport", res_conc, "MassTransportTotal"),
                    ("discharge", res_hd, "Discharge"),
                ]
            else:
                col_res1d_quantity = [
                    ("afs_transport", res_conc, "AFSTransport"),
                    ("discharge", res_hd, "Discharge"),
                ]
            reach_start_end = {"outflow": "end", "inflow": "start", "overflow": "start"}
            for dataset_name in dataset_reaches.keys():
                for s in storage_struct.index:
                    reaches_to_read = dataset_reaches[dataset_name].loc[
                        dataset_reaches[dataset_name].index == s
                    ]
                    if len(reaches_to_read) >= 1:
                        part_df = read_reaches_sum(
                            reaches_to_read,
                            col_res1d_quantity,
                            reach_start_end[dataset_name],
                        )
                        part_df = part_df.assign(event=event, place=s)
                        df_parts[dataset_name].append(part_df)
            # read catchment values; this always uses the AD results and therefore neglects the ST results
            catch_col_r1d_quantity = [
                ("afs_transport", res_catch_conc, "AFSLoadCD"),
                ("discharge", res_catch_hd, "CatchmentDischarge"),
            ]     
    
            for s in storage_struct.index:
                if len(storage_catchments[s]) >= 1:
                    part_df = read_catchments_sum(
                        storage_catchments[s], catch_col_r1d_quantity
                    )
                    part_df = part_df.assign(event=event, place=s)
                    df_parts["inflow_catchments"].append(part_df)
    
            # read water level for nodes and match it to the index of storage_struct/sub_storage
            for sdf, part_name in [
                (storage_struct, "storage_level"),
                (sub_storage, "sub_storage_level"),
            ]:
                water_level_part = res_hd.read(
                    [
                        mir.QueryDataNode(name=node, quantity="WaterLevel")
                        for node in sdf["node"]
                    ]
                )
                water_level_part = water_level_part.loc[
                    :, "WaterLevel:" + sdf["node"]
                ].copy()
                water_level_part.index.name = "datetime"
                water_level_part.columns = sdf.index.rename("place")
                water_level_part = (
                    water_level_part.stack().rename("waterlevel").reset_index()
                )
                water_level_part["event"] = event
                df_parts[part_name].append(water_level_part)
            # read afs for nodes and match it to the index of storage_struct/sub_storage
            for sdf, part_name in [
                (storage_struct, "storage_afs"),
                (sub_storage, "sub_storage_afs"),
            ]:
                # for ST, take the concentration from the start of the out_reaches,
                # or use a mass balance for storages listed in st_mass_balance_storages
                if st_mode:
                    for s in sdf.index:
                        # for two sub-storages the data from out_reaches was not usable,
                        # so a mass balance is used instead
                        if s in st_mass_balance_storages:
                            # use a mass balance instead of reading the concentration from out_reaches
                            in_reaches_s = all_storage_in_reaches.loc[
                                all_storage_in_reaches.index == s
                            ]
                            assert len(in_reaches_s) >= 1, f"No inflow reaches for storage {s}, required to read concentration from ST data."
                            # col_res1d_quantity is the same as for other reaches; read at the start to be near the node.
                            # this reads transport and discharge
                            afs_part = read_reaches_sum(
                                in_reaches_s,
                                col_res1d_quantity,
                                "start",
                            )
                            # calculate concentration as the ratio of cumulated transport to cumulated discharge,
                            # converted from kg/m^3 to mg/l
                            afs_part["afs"] = (
                                afs_part["afs_transport"].cumsum()
                                / afs_part["discharge"].cumsum()
                                * uf.KGPM3_TO_MGPL
                            ).clip(lower=0)
                            # keep only datetime and concentration
                            afs_part = afs_part[["datetime", "afs"]]
                            # add event and place
                            afs_part = afs_part.assign(event=event, place=s)
                            df_parts[part_name].append(afs_part) 
                        else:    
                            out_reaches_s = all_storage_out_reaches.loc[
                                all_storage_out_reaches.index == s
                            ]
                            assert len(out_reaches_s) >= 1, f"No outflow reaches for storage {s}, required to read concentration from ST data."
                            # col_res1d_quantity is the same as for other reaches; read at the start to be near the node.
                            # this reads transport and discharge
                            afs_part = read_reaches_sum(
                                out_reaches_s,
                                col_res1d_quantity,
                                "start",
                            )
                            # calculate concentration from transport and discharge, converted from kg/m^3 to mg/l
                            afs_part["afs"] = (
                                afs_part["afs_transport"]
                                / afs_part["discharge"]
                                * uf.KGPM3_TO_MGPL
                            ).clip(lower=0)
                            # keep only datetime and concentration
                            afs_part = afs_part[["datetime", "afs"]]
                            # add event and place
                            afs_part = afs_part.assign(event=event, place=s)
                            df_parts[part_name].append(afs_part)         
                # code for AD results
                else:
                    afs_part = res_conc.read(
                        [mir.QueryDataNode(name=node, quantity="AFS") for node in sdf["node"]]
                    )
                    afs_part = afs_part.loc[:, "AFS:" + sdf["node"]].copy()
                    afs_part.index.name = "datetime"
                    afs_part.columns = sdf.index.rename("place")
                    afs_part = afs_part.stack().rename("afs").reset_index()
                    afs_part["event"] = event
                    
                    df_parts[part_name].append(afs_part)
            # read max velocity for event and update current overall maximum
            for r in res_hd.reaches:
                event_max_v = reach_max_velocity(r, res_hd, sd)
                reach_agg.loc[r, "max_velocity"] = np.nanmax(
                    [reach_agg.loc[r, "max_velocity"], event_max_v]
                )
        # turn parts with time series into dataset
        dataset = _parts_to_dataset(df_parts)
        # add reach_agg
        dataset["reach_agg"] = reach_agg
        return dataset
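
    # Usage sketch (hypothetical paths and frames; storage_struct and sub_storage must
    # provide the columns used above, e.g. "node", "inflow_reaches", "outflow_reaches", "weir_reach"):
    #   sd = DrainageSystemData(...)  # constructed elsewhere in entfrachten
    #   data = read_sim_data(pathlib.Path("results"), sd, storage_struct, sub_storage, ["E1", "E2"])
    #   data["inflow"]  # DataFrame indexed by (place, event, datetime)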
    
    
    def _create_reach_series(storage_df):
        """Extract information on which reaches need to be read from storage_df."""
        reach_series = {}
        reach_series["outflow"] = (
            storage_df["outflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
        )
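        # The split/stack/droplevel chain turns one comma-separated string per storage
        # into one row per reach, keeping the storage as index. Sketch with made-up names:
        #   "outflow_reaches" = {"A": "r1,r2", "B": "r3"}  ->  index [A, A, B], values [r1, r2, r3]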
        reach_series["inflow"] = (
            storage_df["inflow_reaches"].str.split(",", expand=True).stack().droplevel(-1)
        )
        reach_series["overflow"] = storage_df["weir_reach"].dropna()
        return reach_series
    
    
    def read_reaches_sum(
        reaches: typing.Iterable[str],
        col_r1d_quantity: list[tuple[str, mi.Res1D, str]],
        end_start: typing.Literal["start", "end"],
    ):
        """Read and calculate sum of quantities from reaches. Since it just adds values, do not use this for concentration!"""
        result_df = pd.DataFrame()
        for col, r1d, quantity in col_r1d_quantity:
            # get_reach_end_values returns an array
            reach_end_arrays: list[npt.NDArray] = []
            for r in reaches:
                assert quantity in r1d.quantities
                assert r in r1d.reaches
                if end_start == "end":
                    reach_end_arrays.append(r1d.get_reach_end_values(r, quantity))
                elif end_start == "start":
                    reach_end_arrays.append(r1d.get_reach_start_values(r, quantity))
            if len(reach_end_arrays) > 0:
                result_df[col] = pd.Series(sum(reach_end_arrays), index=r1d.time_index)
            else:
                result_df[col] = pd.Series(np.nan, index=r1d.time_index)
        result_df.index.name = "datetime"
        result_df = result_df.reset_index()
        return result_df
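
    # Usage sketch (hypothetical reach names; res_hd is an open mir.Res1D):
    #   df = read_reaches_sum(["reach_a", "reach_b"], [("discharge", res_hd, "Discharge")], "end")
    #   -> columns "datetime" and "discharge", where discharge is the sum over both reach ends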
    
    
    def read_catchments_sum(
        catchments: list[str], col_r1d_quantity: list[tuple[str, mi.Res1D, str]]
    ):
        """Read and calculate sum of quantities from catchments. Since it just adds values, do not use this for concentration!"""
        result_df = pd.DataFrame(index=col_r1d_quantity[0][1].time_index)
        for col, r1d, quantity in col_r1d_quantity:
            for c in catchments:
                assert quantity in r1d.quantities, f"{quantity} not in {r1d.file_path}"
                assert c in r1d.catchments
            col_result_df = r1d.read(
                [mir.QueryDataCatchment(name=c, quantity=quantity) for c in catchments]
            )
            if len(col_result_df.columns) != len(catchments):
                warn("count of results columns does not match catchment count")
                print(catchments, col_result_df)
            result_df[col] = col_result_df.sum(axis="columns")
        result_df.index.name = "datetime"
        result_df = result_df.reset_index()
        return result_df
    
    
    def reach_max_velocity(reach, res_hd, sd: DrainageSystemData) -> float:
        """Calculate maximum velocity for reach. If velocity is stored in the result files, use that instead! """
        # Assume that max water level, discharge and velocity coincide.
        # Since water level and discharges are calculated at different chainages,
        # using max is probably more accurate than using the exact same time where one of the values is maximal.
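        # Worked example: with a peak discharge Q = 0.5 m^3/s and a flow area
        # A(h) = 0.25 m^2 at the peak water level, the estimate is v = Q / A = 2 m/s.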
        q = res_hd.reaches[reach][1].Discharge.read().max(axis=None)
        h = res_hd.reaches[reach][0].WaterLevel.read().max(axis=None)
        try:
            _, (a,) = sd.xs_area_at(reach, h, [0.0])
            return q / a
        except Exception as ex:
            warn(str(ex))
            return np.nan
    
    
    def _parts_to_dataset(df_parts):
        dataset = {}
        for dn in df_parts.keys():
            dataset[dn] = pd.concat(df_parts[dn], ignore_index=True).set_index(
                ["place", "event", "datetime"]
            )
            dataset[dn].columns.name = "property"
        dataset["inflow"] = dataset["inflow"].add(
            dataset["inflow_catchments"], fill_value=0
        )
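        # .add with fill_value=0 aligns both frames on (place, event, datetime);
        # rows present in only one of the two frames are kept instead of becoming NaN.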
        # Calculate concentration where it was not read. At many points, parts need to be summed up,
        # but concentration would require a weighted average,
        # so it is simpler to calculate it from transport and discharge.
        for n, df in dataset.items():
            if (
                "discharge" in df.columns
                and "afs_transport" in df.columns
                and not "afs" in df.columns
            ):
                print("calculating afs for ", n)
                df["afs"] = (df["afs_transport"] / df["discharge"]).fillna(
                    0
                ) * uf.KGPM3_TO_MGPL
        # Clip is needed to avoid negative concentrations.
        # Negative concentrations can appear when directly calculating concentration from discharge and transport,
        # and sometimes even in the concentration values read from a res1d, so this is unlikely to be an error in our reading procedure.
        for n, df in dataset.items():
            if "afs" in df.columns:
                df["afs"] = df["afs"].clip(lower=0)
        return dataset