diff --git a/u4py/analysis/classify.py b/u4py/analysis/classify.py index b4fec9b02cebd08269f394138526203b178bda48..2fcd0a616d94c9c73a3c80c665a32aba546785ca 100644 --- a/u4py/analysis/classify.py +++ b/u4py/analysis/classify.py @@ -16,6 +16,7 @@ import u4py.analysis.spatial as u4spatial import u4py.io.gpkg as u4gpkg import u4py.io.tiff as u4tiff import u4py.plotting.plots as u4plots +from u4py.utils.types import U4ResDict def classify_shape( @@ -29,7 +30,7 @@ def classify_shape( save_shapes: bool = False, save_fig: bool = False, save_report: bool = False, -) -> dict: +) -> U4ResDict: """Classifies a subset of shapes in `shp_gdf` based on the group. To include the surroundings of the shapes, a buffered hull around all shapes is used. @@ -55,7 +56,7 @@ def classify_shape( :param save_report: Create a text report containing the results, defaults to False :type save_report: bool, optional :return: The results as a dictionary for further processing - :rtype: dict + :rtype: U4ResDict As of now the following classifications are included: @@ -184,6 +185,9 @@ def classify_shape( ) res.update(hydrogeology(res, sub_set_hull, out_folder=cache_path)) res.update(topsoil(res, sub_set_hull, out_folder=cache_path)) + res.update( + structural_area(res, sub_set_hull, out_folder=cache_path) + ) # Geohazard res.update(landslides(res, sub_set_hull, shp_path=hlnug_path)) @@ -206,11 +210,11 @@ def classify_shape( return dict() -def preallocate_results() -> dict: +def preallocate_results() -> U4ResDict: """Preallocates all possible results for unified output and easier generation of master geodataframe. :return: A dictionary containing all keys but with empty values. - :rtype: dict + :rtype: U4ResDict """ res = { "additional_areas": np.nan, @@ -238,6 +242,8 @@ def preallocate_results() -> dict: "geology_area": [], "geology_percent": [], "geology_units": [], + "geology_mapnum": [], + "geology_mapname": [], "geometry": gp.GeoDataFrame(), "group": np.nan, "hydro_area": [], @@ -270,7 +276,15 @@ def preallocate_results() -> dict: "manual_unclear_1": False, "manual_unclear_2": False, "manual_unclear_3": False, - "roads_has_motorway": np.nan, + "roads_has_motorway": False, + "roads_has_primary": False, + "roads_has_secondary": False, + "roads_motorway_names": [], + "roads_primary_names": [], + "roads_secondary_names": [], + "roads_motorway_length": [], + "roads_primary_length": [], + "roads_secondary_length": [], "roads_main_area": np.nan, "roads_main": gp.GeoDataFrame(), "roads_minor_area": np.nan, @@ -282,6 +296,9 @@ def preallocate_results() -> dict: "shape_ellipse_theta": [], "shape_flattening": [], "shape_roundness": [], + "shape_width": [], + "shape_breadth": [], + "shape_aspect": [], "slope_hull_mean_14": [], "slope_hull_mean_19": [], "slope_hull_mean_21": [], @@ -304,6 +321,7 @@ def preallocate_results() -> dict: "subsidence_percent": [], "subsidence_total": np.nan, "subsidence_units": [], + "structural_region": [], "timeseries_annual_cosine": np.nan, "timeseries_annual_max_amplitude": np.nan, "timeseries_annual_max_time": np.nan, @@ -332,7 +350,7 @@ def preallocate_results() -> dict: def roads( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, osm_path: os.PathLike, shp_cfg: dict, @@ -340,7 +358,7 @@ def roads( """Loads the roads from the openstreetmap database and returns the area of minor and major roads. Also establishes if a motorway is present in the area. :param res: The results dictionary as of now. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The area where to look for roads. :type sub_set_hull: gp.GeoDataFrame :param osm_path: The path to the openstreetmap database. @@ -348,7 +366,7 @@ def roads( :param shp_cfg: The shape config containing buffer sizes. :type shp_cfg: dict :return: An updated version of the results dictionary. - :rtype: dict + :rtype: U4ResDict """ def get_clipped_road_area( @@ -390,8 +408,6 @@ def roads( if len(roads_data) > 0: logging.info("Classifying Road Data") - # Look for motorways - res["roads_has_motorway"] = "motorway" in roads_data.fclass.to_list() # Extract area of main roads res["roads_main"] = get_clipped_road_area( @@ -415,22 +431,49 @@ def roads( ) res["additional_areas"] += res["roads_minor_area"] + # Look for road classes + road_classes = roads_data.fclass.to_list() + if ("motorway" in road_classes) or ("trunk" in road_classes): + res["roads_has_motorway"] = True + if "primary" in road_classes: + res["roads_has_primary"] = True + for ii, road in roads_data[ + roads_data.fclass == "primary" + ].iterrows(): + if road.ref not in res["roads_primary_names"]: + res["roads_primary_names"].append(road.ref) + res["roads_primary_length"].append(road.geometry.length) + else: + ridx = res["roads_primary_names"].index(road.ref) + res["roads_primary_length"][ridx] += road.geometry.length + + if "secondary" in road_classes: + res["roads_has_secondary"] = True + for ii, road in roads_data[ + roads_data.fclass == "secondary" + ].iterrows(): + if road.ref not in res["roads_secondary_names"]: + res["roads_secondary_names"].append(road.ref) + res["roads_secondary_length"].append(road.geometry.length) + else: + ridx = res["roads_secondary_names"].index(road.ref) + res["roads_secondary_length"][ridx] += road.geometry.length return res def buildings( - res: dict, sub_set_hull: gp.GeoDataFrame, osm_path: os.PathLike -) -> dict: + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, osm_path: os.PathLike +) -> U4ResDict: """Calculates the area that is covered by buildings :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param osm_path: The path to the osm dataset. :type osm_path: os.PathLike :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Loading Building Data") res["buildings"] = u4gpkg.load_gpkg_data_region_ogr( @@ -445,15 +488,15 @@ def buildings( def rivers_water( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, osm_path: os.PathLike, shp_cfg: dict, -) -> dict: +) -> U4ResDict: """Calculates the area that is covered by water :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param osm_path: The path to the osm dataset. @@ -461,7 +504,7 @@ def rivers_water( :param shp_cfg: The shape config to calculate the buffers for the rivers. :type shp_cfg: dict :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Loading River Data") rivers_data = u4gpkg.load_gpkg_data_region_ogr( @@ -491,19 +534,19 @@ def rivers_water( def landuse( - res: dict, sub_set_hull: gp.GeoDataFrame, osm_path: os.PathLike -) -> dict: + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, osm_path: os.PathLike +) -> U4ResDict: """Calculates the area of each landuse including some other landuses from previous results, e.g. roads or water. :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param osm_path: The path to the osm dataset. :type osm_path: os.PathLike :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Loading Landuse Data") landuse_data = u4gpkg.load_gpkg_data_region_ogr( @@ -579,7 +622,7 @@ def slope( dem_path_14: os.PathLike, dem_path_19: os.PathLike, dem_path_21: os.PathLike, -) -> dict: +) -> U4ResDict: """Calculates the average slope in the area :param sub_set_hull: The hull of the area. @@ -639,27 +682,33 @@ def slope( return res -def shape(sub_set: gp.GeoDataFrame) -> dict: +def shape(sub_set: gp.GeoDataFrame) -> U4ResDict: """Calculates shape parameters for all shapes in the geodataframe. :param sub_set: The geodataframe with the polygons of the group. :type sub_set: gp.GeoDataFrame :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Computing shape parameters") roundness = u4spatial.roundness(sub_set) aa, bb, tt, flattn = u4spatial.flattening(sub_set) + min_f, max_f = u4spatial.feret_diameters( + sub_set.convex_hull.geometry.get_coordinates().to_numpy() + ) res = dict() res["shape_roundness"] = roundness res["shape_ellipse_a"] = aa res["shape_ellipse_b"] = bb res["shape_ellipse_theta"] = tt res["shape_flattening"] = flattn + res["shape_width"] = max_f + res["shape_breadth"] = min_f + res["shape_aspect"] = max_f / min_f return res -def volume(geometry: gp.GeoDataFrame, diffplan_path: os.PathLike) -> dict: +def volume(geometry: gp.GeoDataFrame, diffplan_path: os.PathLike) -> U4ResDict: """Calculates the some volumetric quantities for the input geometry. :param geometry: The geometry where to calculate the volume. @@ -667,7 +716,7 @@ def volume(geometry: gp.GeoDataFrame, diffplan_path: os.PathLike) -> dict: :param diffplan_path: The path to the folder where the diff plan tiffs are located. :type diffplan_path: os.PathLike :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Computing volumes in shapes") volumes = u4tiff.calculate_volume_in_shape(geometry, diffplan_path) @@ -686,15 +735,15 @@ def volume(geometry: gp.GeoDataFrame, diffplan_path: os.PathLike) -> dict: def geology( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, out_folder: os.PathLike = "", use_internal: bool = True, -) -> dict: +) -> U4ResDict: """Gets the geological units and their spatial extend in the area. :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param out_folder: The path to a gpkg file containing the data (not implemented yet), defaults to "" @@ -702,7 +751,7 @@ def geology( :param use_internal: Try internal web server first, defaults to True. :type use_internal: os.PathLike, optional :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Looking for geology data") if use_internal: @@ -716,7 +765,21 @@ def geology( region_crs=sub_set_hull.crs, out_folder=out_folder, suffix=f"{res['group']:05}", - ).clip(sub_set_hull) + ) + if geology_data.empty: + logging.debug("Retrying to get geological data.") + geology_data = u4web.query_internal( + "gk25-hessen:GK25_f_GK3", + region=[ + bounds.minx, + bounds.miny, + bounds.maxx, + bounds.maxy, + ], + region_crs=sub_set_hull.crs, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) if len(geology_data) < 1: logging.info("Got empty response, try HLNUG") raise NotImplementedError @@ -729,7 +792,32 @@ def geology( region=sub_set_hull, out_folder=out_folder, suffix=f"{res['group']:05}", - ).clip(sub_set_hull) + ) + if geology_data.empty: + logging.debug("Retrying to get geological data.") + geology_data = u4web.query_hlnug( + "geologie/gk25/MapServer", + "Geologie (Kartiereinheiten)", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) + geology_metadata = u4web.query_hlnug( + "geologie/gk25/MapServer", + "GK25 Blattschnitt", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) + if geology_metadata.empty: + logging.debug("Retrying to get geological metadata.") + geology_metadata = u4web.query_hlnug( + "geologie/gk25/MapServer", + "GK25 Blattschnitt", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) else: unit_name = "GEOLOGISCHE_EINHEIT" petro_name = "PETROGRAPHIE" @@ -739,9 +827,35 @@ def geology( region=sub_set_hull, out_folder=out_folder, suffix=f"{res['group']:05}", - ).clip(sub_set_hull) + ) + if geology_data.empty: + logging.debug("Retrying to get geological data.") + geology_data = u4web.query_hlnug( + "geologie/gk25/MapServer", + "Geologie (Kartiereinheiten)", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) + geology_metadata = u4web.query_hlnug( + "geologie/gk25/MapServer", + "GK25 Blattschnitt", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) + if geology_metadata.empty: + logging.debug("Retrying to get geological metadata.") + geology_metadata = u4web.query_hlnug( + "geologie/gk25/MapServer", + "GK25 Blattschnitt", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) if len(geology_data) > 0: + geology_data = geology_data.clip(sub_set_hull) # Replace empty unit names with petrographic description unit_list = geology_data[unit_name] if None in unit_list: @@ -764,26 +878,28 @@ def geology( round((area / res["area"]) * 100, 1) for area in res["geology_area"] ] + res["geology_mapnum"] = list(np.unique(geology_metadata.GK25_NUMMER)) + res["geology_mapname"] = list(np.unique(geology_metadata.GK25_NAME)) else: logging.info(f"No geology data found for group {res['group']:05}.") return res def hydrogeology( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, out_folder: os.PathLike = "", -) -> dict: +) -> U4ResDict: """Gets the hydraulic conductivity and their spatial extend in the area. :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param out_folder: The path to a gpkg file containing the data (not implemented yet), defaults to "" :type out_folder: os.PathLike, optional :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Querying HLNUG for hydro_units_data") hydro_units_data = u4web.query_hlnug( @@ -792,9 +908,19 @@ def hydrogeology( region=sub_set_hull, out_folder=out_folder, suffix=f"{res['group']:05}", - ).clip(sub_set_hull) + ) + if hydro_units_data.empty: + logging.debug("Retrying to get hydrological information") + hydro_units_data = u4web.query_hlnug( + "geologie/huek200/MapServer", + "Hydrogeologische Einheiten", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) if len(hydro_units_data) > 0: + hydro_units_data = hydro_units_data.clip(sub_set_hull) logging.info("Classifying hydrogeology.") res["hydro_units"], res["hydro_area"] = u4spatial.area_per_feature( hydro_units_data, "L_CH_TXT" @@ -810,11 +936,11 @@ def hydrogeology( def landslides( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, use_online: bool = False, shp_path: os.PathLike = "", -) -> dict: +) -> U4ResDict: """Gets the landslide prone units and their spatial extend in the area as well as the number of known landslides in the area. @@ -827,7 +953,7 @@ def landslides( :param shp_path: The path to a gpkg file containing the data, defaults to "" :type shp_path: os.PathLike, optional :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ if use_online: logging.info("Querying HLNUG for landslide_data") @@ -876,7 +1002,7 @@ def rockfall( sub_set_hull: gp.GeoDataFrame, use_online: bool = False, shp_path: os.PathLike = "", -) -> dict: +) -> U4ResDict: """Gets the number of known rockfalls in the area. :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame @@ -885,7 +1011,7 @@ def rockfall( :param shp_path: The path to a gpkg file containing the data, defaults to "" :type shp_path: os.PathLike, optional :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ if use_online: logging.info("Querying HLNUG for rockfall_points") @@ -914,15 +1040,15 @@ def rockfall( def subsidence( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, use_online: bool = False, shp_path: os.PathLike = "", -) -> dict: +) -> U4ResDict: """Gets the subsidence prone units and their spatial extend in the area. :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param use_online: Whether to query the online webservice of the HLNUG, defaults to False @@ -930,7 +1056,7 @@ def subsidence( :param shp_path: The path to a gpkg file containing the data, defaults to "" :type shp_path: os.PathLike, optional :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ if use_online: logging.info("Querying HLNUG for subsidence_data") @@ -960,16 +1086,16 @@ def subsidence( def karst( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, use_online: bool = False, shp_path: os.PathLike = "", -) -> dict: +) -> U4ResDict: """Gets the known karst risk and the spatial extend in the area as well as the number of known sinkholes in the area. :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param use_online: Whether to query the online webservice of the HLNUG, defaults to False @@ -977,7 +1103,7 @@ def karst( :param shp_path: The path to a gpkg file containing the data, defaults to "" :type shp_path: os.PathLike, optional :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ if use_online: logging.info("Querying HLNUG for karst_data") @@ -1020,20 +1146,20 @@ def karst( def topsoil( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, out_folder: os.PathLike = "", -) -> dict: +) -> U4ResDict: """Gets the composition of the topsoil and its spatial extent in the area. :param res: The results dictionary including some previous results. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the area. :type sub_set_hull: gp.GeoDataFrame :param out_folder: The path to a gpkg file containing the data (not implemented yet), defaults to "" :type out_folder: os.PathLike, optional :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Looking for topsoil_data") topsoil_data = u4web.query_hlnug( @@ -1058,7 +1184,9 @@ def topsoil( return res -def psi_data(sub_set_hull: gp.GeoDataFrame, psi_path: os.PathLike) -> dict: +def psi_data( + sub_set_hull: gp.GeoDataFrame, psi_path: os.PathLike +) -> U4ResDict: """Gets the PSI Data in the region and does a time series inversion. :param sub_set_hull: The region to select the data. @@ -1066,7 +1194,7 @@ def psi_data(sub_set_hull: gp.GeoDataFrame, psi_path: os.PathLike) -> dict: :param psi_path: The path to the file containing the PSI data. :type psi_path: os.PathLike :return: The results dictionary with the classified data appended. - :rtype: dict + :rtype: U4ResDict """ logging.info("Loading PSI Data") data = u4gpkg.load_gpkg_data_region(sub_set_hull, psi_path, "vertikal") @@ -1100,7 +1228,7 @@ def psi_data(sub_set_hull: gp.GeoDataFrame, psi_path: os.PathLike) -> dict: def write_shape( - res: dict, + res: U4ResDict, sub_set: gp.GeoDataFrame, group: str, project: configparser.ConfigParser, @@ -1108,7 +1236,7 @@ def write_shape( """Writes the shapes including their individual results to a shapefile. :param res: The results dictionary. - :type res: dict + :type res: U4ResDict :param sub_set: The shapes for which the results where compiled. :type sub_set: gp.GeoDataFrame :param group: The name of the group. @@ -1128,11 +1256,13 @@ def write_shape( gdf.to_file(os.path.join(shape_folder, f"{group}.shp")) -def write_report(res: dict, group: str, project: configparser.ConfigParser): +def write_report( + res: U4ResDict, group: str, project: configparser.ConfigParser +): """Saves the results for the specified group to a text file. :param res: The results dictionary. - :type res: dict + :type res: U4ResDict :param group: The name of the group. :type group: str :param project: The project containing paths. @@ -1154,7 +1284,7 @@ def write_report(res: dict, group: str, project: configparser.ConfigParser): def write_fig( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, group: str, project: configparser.ConfigParser, @@ -1162,7 +1292,7 @@ def write_fig( """Creates several plots for each site, including statistics. :param res: The results dictionary. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the region. :type sub_set_hull: gp.GeoDataFrame :param group: The name of the group. @@ -1184,11 +1314,11 @@ def write_fig( def manual_classification( - res: dict, + res: U4ResDict, sub_set_hull: gp.GeoDataFrame, group: str, project: configparser.ConfigParser, -) -> dict: +) -> U4ResDict: """ Loads the manually defined classification from a shape-file and checks if the hull overlaps with any of the points. This is necessary because in @@ -1196,7 +1326,7 @@ def manual_classification( association is persisting. :param res: The results dictionary. - :type res: dict + :type res: U4ResDict :param sub_set_hull: The hull of the region. :type sub_set_hull: gp.GeoDataFrame :param group: The name of the group. @@ -1204,7 +1334,7 @@ def manual_classification( :param project: The project containing paths. :type project: configparser.ConfigParser :return: The results dictionary with the results appended. - :rtype: dict + :rtype: U4ResDict """ man_path = os.path.join( @@ -1251,7 +1381,7 @@ def manual_classification( # Extract three most common classes for ii, cla in enumerate(unique_classes): if ii < 3: - res[f"manual_class_{ii+1}"] = cla + res[f"manual_class_{ii + 1}"] = cla elif len(candidates) > 1: logging.info( f"Multiple classifications found for group {group:05}" @@ -1267,3 +1397,41 @@ def manual_classification( logging.info(f"No manual classification found for group {group:05}") return res + + +def structural_area( + res: U4ResDict, + sub_set_hull: gp.GeoDataFrame, + out_folder: os.PathLike = "", +) -> U4ResDict: + """Gets the geological structural region of the area. + + :param res: The results dictionary including some previous results. + :type res: U4ResDict + :param sub_set_hull: The hull of the area. + :type sub_set_hull: gp.GeoDataFrame + :param out_folder: The path to a gpkg file containing the data (not implemented yet), defaults to "" + :type out_folder: os.PathLike, optional + :return: The results dictionary with the classified data appended. + :rtype: U4ResDict + """ + + logging.info("Looking for structural data") + structural_data = u4web.query_hlnug( + "geologie/guek300_ogc/MapServer", + "strukturraeume", + region=sub_set_hull, + out_folder=out_folder, + suffix=f"{res['group']:05}", + ) + + if len(structural_data) > 0: + structural_data = structural_data.clip(sub_set_hull) + logging.info("Classifying structural area") + structural_areas = structural_data.EBENE3_TXT.to_list() + res["structural_region"] = structural_areas + else: + logging.info( + f"No structural area data found for group {res['group']:05}." + ) + return res diff --git a/u4py/analysis/spatial.py b/u4py/analysis/spatial.py index 0c175eed4d94af798386b21f7a315976c67ad417..f613aa9b0a577f2c339ee66f00c9760002944b60 100644 --- a/u4py/analysis/spatial.py +++ b/u4py/analysis/spatial.py @@ -637,7 +637,7 @@ def contour_shapes( data["geometry"].append(pgon) if ii < len(levels) - 1: data["polygon_levels"].append( - f"{levels[ii]} - {levels[ii+1]}" + f"{levels[ii]} - {levels[ii + 1]}" ) else: data["polygon_levels"].append(f">{levels[ii]}") @@ -1283,3 +1283,31 @@ def shape_in_tiff(shp: shapely.Polygon, src: rasterio.DatasetReader) -> bool: or (shp_bnd[3] >= rst_bnd.bottom and shp_bnd[3] <= rst_bnd.top) ) return result + + +def feret_diameters(polygon: np.ndarray) -> Tuple[float, float]: + """ + Calculates the minimum and maximum Feret diameters of a convex polygon + based on the rotating caliper method + + :param polygon: A numpy array of (x, y) coordinates + :type polygon: np.ndarray + :return: The minimum and maximum Feret diameter + :rtype: Tuple[float, float] + """ + if np.all(polygon[0] == polygon[-1]): + polygon = polygon[:-1] + length = len(polygon) + Ds = np.empty(length) + + for i in range(length): + p1 = polygon[i] + p2 = polygon[(i + 1) % length] + + ds = np.abs(np.cross(p2 - p1, p1 - polygon) / np.linalg.norm(p2 - p1)) + + Ds[i] = np.max(ds) + minf = np.min(Ds) + maxf = np.max(Ds) + + return (minf, maxf) diff --git a/u4py/scripts/gis_workflows/Classify_HLNUG_Data.py b/u4py/scripts/gis_workflows/Classify_HLNUG_Data.py deleted file mode 100644 index 4eed2cd93997156e0ea89807fbe445ca03e43b03..0000000000000000000000000000000000000000 --- a/u4py/scripts/gis_workflows/Classify_HLNUG_Data.py +++ /dev/null @@ -1,138 +0,0 @@ -""" -Uses the internal shape file of HLNUG to automatically create maps and reports -of the known mass movements database of HLNUG -""" - -import os -from multiprocessing import Pool -from pathlib import Path - -import geopandas as gp -import matplotlib.lines as mlines -import matplotlib.patches as mpatches -import matplotlib.patheffects as pe -import matplotlib.pyplot as plt -import numpy as np -from tqdm import tqdm - -import u4py.plotting.axes as u4ax -import u4py.plotting.formatting as u4pltfmt -import u4py.plotting.plots as u4plots -import u4py.utils.config as u4config - - -def main(): - shp_path = Path( - "~/Documents/umwelt4/Places/HLNUG Daten/SHP/RD_Rutschungen_gesamt.shp" - ).expanduser() - output_path = Path( - "~/Documents/umwelt4/INSAR_plots/HLNUG_Plots" - ).expanduser() - os.makedirs(output_path, exist_ok=True) - data = gp.read_file(shp_path) - args = [(data, ii, output_path) for ii in data.AMT_NR_.to_list()] - for arg in tqdm(args): - topo_plot(*arg) - # with Pool(u4config.cpu_count) as p: - # list( - # tqdm( - # p.imap_unordered(plot_wrapper, args), - # total=len(args), - # desc="Generating HLNUG Plots", - # leave=False, - # ) - # ) - - -def plot_wrapper(args: tuple): - try: - topo_plot(*args) - except TypeError: - print(f"Error for ID: {args[1]}") - - -def topo_plot(data: gp.GeoDataFrame, num: int, output_path: os.PathLike): - polygon = data[data.AMT_NR_ == num] - cx = polygon.centroid.x.values[0] - cy = polygon.centroid.y.values[0] - - fig, ax = plt.subplots(figsize=(16 / 2.54, 11.3 / 2.54), dpi=300) - polygon.plot( - ax=ax, facecolor="None", edgecolor="C0", linewidth=2, zorder=3 - ) - # buffer_size = 3 * np.sqrt(polygon.area) - buffer_size = 750 - polygon.buffer(buffer_size).plot(ax=ax, facecolor="None", edgecolor="None") - ax.annotate( - f"{num}", - (cx, cy), - horizontalalignment="center", - verticalalignment="center", - # fontsize="xx-large", - fontweight="bold", - path_effects=[pe.withStroke(linewidth=2, foreground="C0")], - color="w", - ) - - ax.set_position([0, 0, 1, 1]) - plt.axis("equal") - ax.legend( - handles=[ - mpatches.Patch( - facecolor="None", - edgecolor="C0", - linewidth=2, - label="Rutschung aus DGM", - ), - mpatches.Patch( - # alpha=0.33, - edgecolor="k", - facecolor="None", - linestyle=":", - # linewidth=0.5, - label="weitere Rutschungen", - ), - mlines.Line2D( - [], - [], - linewidth=2, - color="orange", - path_effects=[ - pe.Stroke(linewidth=3, foreground="sienna"), - pe.Normal(), - ], - label="Landesstraße", - ), - ], - ) - plt.draw() - xlim = ax.get_xlim() - ylim = ax.get_ylim() - data[data.AMT_NR_ != num].plot( - ax=ax, - # alpha=0.33, - edgecolor="k", - facecolor="None", - linestyle=":", - # linewidth=0.5, - zorder=2, - ) - ax.set_xlim(xlim) - ax.set_ylim(ylim) - u4pltfmt.add_scalebar(ax, width=0) - u4ax.add_basemap(ax=ax) - - # Use HVBG WMS -> currently not working - # u4ax.add_web_map_service( - # ax=ax, - # crs=str(polygon.crs), - # # wms_url="https://ows.terrestris.de/osm/service", - # # layer="OSM-WMS", - # ) - fig.savefig(os.path.join(output_path, f"topo_{num:04}.png")) - # fig.savefig(os.path.join(output_path, f"topo_{num}.pdf")) - plt.close(fig) - - -if __name__ == "__main__": - main() diff --git a/u4py/scripts/gis_workflows/Classify_Shapes.py b/u4py/scripts/gis_workflows/Classify_Shapes.py index 9f2de68e95363dd79263b397d1875dcf2713004a..f715f4641e839766270086da2c6a97668700d3eb 100644 --- a/u4py/scripts/gis_workflows/Classify_Shapes.py +++ b/u4py/scripts/gis_workflows/Classify_Shapes.py @@ -14,6 +14,7 @@ from tqdm import tqdm import u4py.analysis.classify as u4class import u4py.analysis.processing as u4proc +import u4py.utils.cmd_args as u4args import u4py.utils.config as u4config import u4py.utils.projects as u4proj @@ -21,8 +22,16 @@ import u4py.utils.projects as u4proj def main(): + args = u4args.load() + if args.input: + proj_path = args.input + else: + proj_path = ( + "/home/rudolf/Documents/umwelt4/Classify_ShapesHLNUG.u4project" + ) + project = u4proj.get_project( - proj_path="/home/rudolf/Documents/umwelt4/Classify_ShapesHLNUG.u4project", + proj_path=proj_path, required=[ "base_path", "psi_path", @@ -34,30 +43,29 @@ def main(): interactive=False, ) shp_cfg = u4config.get_shape_config() - use_parallel = True - use_online = True - use_internal = False # Setting up paths shp_file = os.path.join( project["paths"]["sites_path"], "thresholded_contours_all_shapes.gpkg", ) + if not (os.path.exists(shp_file)): shp_file = os.path.join( project["paths"]["sites_path"], "RD_Rutschungen_gesamt.shp" ) # Getting Data - # sub_region = create_test_region() - # shp_gdf = u4gpkg.load_gpkg_data_region_ogr(sub_region, shp_file) shp_gdf = gp.read_file(shp_file).to_crs("EPSG:32632") - # shp_gdf = shp_gdf[:100] if "groups" in shp_gdf.keys(): unique_groups = np.unique(shp_gdf.groups) else: - unique_groups = np.unique(shp_gdf.AMT_NR_) - # unique_groups = [9] + logging.info( + "The dataset is the HLNUG Landslide database, extracting only the Landslides" + ) + shp_gdf = shp_gdf[shp_gdf.OBJEKT == "Rutschung"] + shp_gdf["groups"] = shp_gdf.AMT_NR_ + unique_groups = np.unique(shp_gdf.groups) kwargs = [ { "shp_gdf": shp_gdf, @@ -65,13 +73,13 @@ def main(): "buffer_size": 5, "shp_cfg": shp_cfg, "project": project, - "use_online": use_online, - "use_internal": use_internal, + "use_online": project.getboolean("config", "use_online"), + "use_internal": project.getboolean("config", "use_internal"), "save_report": True, } for group in unique_groups ] - if use_parallel: + if project.getboolean("config", "use_parallel"): main_list = u4proc.batch_mapping( kwargs, classifier_wrapper, "Classifying Groups" ) diff --git a/u4py/utils/cmd_args.py b/u4py/utils/cmd_args.py index 933cbf05e5d115fae490c6aca3af8d787c1cd39e..6a3c6f304cfb0e93e03a6eb07bb3910bf88ac535 100644 --- a/u4py/utils/cmd_args.py +++ b/u4py/utils/cmd_args.py @@ -8,9 +8,12 @@ import argparse import logging import u4py.utils.config as u4config +import u4py.utils.types as u4types -def load(module_descript: str = "No description given."): +def load( + module_descript: str = "No description given.", +) -> u4types.U4Namespace: """Loads the commandline parser and sets everything in the config for later use. :param module_descript: The description of the module, defaults to "No description given." diff --git a/u4py/utils/types.py b/u4py/utils/types.py index 1df5e32a4bef0c9aa55722be9863d9575404c2bd..b770931a7db495b48a7307e9d0cb0af7b260d2b3 100644 --- a/u4py/utils/types.py +++ b/u4py/utils/types.py @@ -3,10 +3,16 @@ Contains types for better type hints in Python """ import os +from argparse import Namespace +from configparser import ConfigParser from typing import TypedDict +import geopandas as gp + class U4PathsConfig(TypedDict): + """Paths for the u4py project""" + base_path: os.PathLike ext_path: os.PathLike places_path: os.PathLike @@ -24,22 +30,175 @@ class U4PathsConfig(TypedDict): psiew_path: os.PathLike results_path: os.PathLike + class U4Config(TypedDict): + """Options for u4py""" + overwrite: bool use_filtered: bool use_parallel: bool + use_online: bool + use_internal: bool generate_plots: bool overwrite_plots: bool generate_document: bool single_report: bool is_hlnug: bool + class U4Metadata(TypedDict): + """Some additional data needed for the scripts""" + report_title: str report_subtitle: str report_suffix: str -class U4Project(TypedDict): + +class U4Project(ConfigParser): + """A u4py project configuration""" + paths: U4PathsConfig config: U4Config - metadata: U4Metadata \ No newline at end of file + metadata: U4Metadata + + +class U4ResDict(TypedDict): + """A dictionary with results of classifications""" + + additional_areas: float + area: float + aspect_hull_mean_14: list + aspect_hull_mean_19: list + aspect_hull_mean_21: list + aspect_hull_median_14: list + aspect_hull_median_19: list + aspect_hull_median_21: list + aspect_hull_std_14: list + aspect_hull_std_19: list + aspect_hull_std_21: list + aspect_polygons_mean_14: list + aspect_polygons_mean_19: list + aspect_polygons_mean_21: list + aspect_polygons_median_14: list + aspect_polygons_median_19: list + aspect_polygons_median_21: list + aspect_polygons_std_14: list + aspect_polygons_std_19: list + aspect_polygons_std_21: list + buildings_area: float + buildings: gp.GeoDataFrame + geology_area: list + geology_percent: list + geology_units: list + geology_mapnum: list + geology_mapname: list + geometry: gp.GeoDataFrame + group: float + hydro_area: list + hydro_percent: list + hydro_units: list + karst_area: list + karst_num_1km: float + karst_num_inside: float + karst_percent: list + karst_total: float + karst_units: list + landslide_area: list + landslide_percent: list + landslide_total: float + landslide_units: list + landslides_num_1km: float + landslides_num_inside: float + landuse_area: list + landuse_major: str + landuse_names: list + landuse_percent: list + landuse_total: float + manual_class_1: str + manual_class_2: str + manual_class_3: str + manual_comment: str + manual_group: float + manual_known: bool + manual_research: bool + manual_unclear_1: bool + manual_unclear_2: bool + manual_unclear_3: bool + roads_has_motorway: bool + roads_has_primary: bool + roads_has_secondary: bool + roads_motorway_names: list + roads_primary_names: list + roads_secondary_names: list + roads_motorway_length: list + roads_primary_length: list + roads_secondary_length: list + roads_main_area: float + roads_main: gp.GeoDataFrame + roads_minor_area: float + roads_minor: gp.GeoDataFrame + rockfall_num_1km: float + rockfall_num_inside: float + shape_ellipse_a: list + shape_ellipse_b: list + shape_ellipse_theta: list + shape_flattening: list + shape_roundness: list + shape_width: list + shape_breadth: list + shape_aspect: list + slope_hull_mean_14: list + slope_hull_mean_19: list + slope_hull_mean_21: list + slope_hull_median_14: list + slope_hull_median_19: list + slope_hull_median_21: list + slope_hull_std_14: list + slope_hull_std_19: list + slope_hull_std_21: list + slope_polygons_mean_14: list + slope_polygons_mean_19: list + slope_polygons_mean_21: list + slope_polygons_median_14: list + slope_polygons_median_19: list + slope_polygons_median_21: list + slope_polygons_std_14: list + slope_polygons_std_19: list + slope_polygons_std_21: list + subsidence_area: list + subsidence_percent: list + subsidence_total: float + subsidence_units: list + structural_region: list + timeseries_annual_cosine: float + timeseries_annual_max_amplitude: float + timeseries_annual_max_time: float + timeseries_annual_sine: float + timeseries_linear: float + timeseries_num_psi: float + timeseries_offset: float + timeseries_semiannual_cosine: float + timeseries_semiannual_max_amplitude: float + timeseries_semiannual_max_time: float + timeseries_semiannual_sine: float + topsoil_area: float + topsoil_percent: float + topsoil_units: float + volumes_added: float + volumes_moved: float + volumes_removed: float + volumes_total: float + volumes_polygons_added: float + volumes_polygons_moved: float + volumes_polygons_removed: float + volumes_polygons_total: float + water_area: float + well_number: int + + +class U4Namespace(Namespace): + """Commandline arguments for U4Py""" + + input: os.PathLike + overwrite: bool + cpus: int