From 29935f79685b1c4a2d164676b54fb55200642ca3 Mon Sep 17 00:00:00 2001 From: Michael Rudolf <rudolf@geo.tu-darmstadt.de> Date: Mon, 12 May 2025 16:28:19 +0200 Subject: [PATCH] HLNUG update - Added more classifications: - Railways are now included - Names of roads are included - Also calculates the distance to roads and railways - Added error estimation for elevation changes --- u4py/analysis/classify.py | 147 ++++++++++++++++++++++++-------------- u4py/analysis/spatial.py | 145 +++++++++++++++++++++++++++++++++++-- u4py/io/tiff.py | 39 ++++++---- u4py/utils/types.py | 10 +++ 4 files changed, 269 insertions(+), 72 deletions(-) diff --git a/u4py/analysis/classify.py b/u4py/analysis/classify.py index 2fcd0a6..c8f96ad 100644 --- a/u4py/analysis/classify.py +++ b/u4py/analysis/classify.py @@ -145,8 +145,9 @@ def classify_shape( # Manual Classification res.update(manual_classification(res, sub_set_hull, group, project)) - # Roads + # Roads and Railways res.update(roads(res, sub_set_hull, osm_path, shp_cfg)) + res.update(railways(res, sub_set_hull, osm_path)) # Buildings res.update(buildings(res, sub_set_hull, osm_path)) @@ -276,6 +277,9 @@ def preallocate_results() -> U4ResDict: "manual_unclear_1": False, "manual_unclear_2": False, "manual_unclear_3": False, + "railways_has": False, + "railways_length": np.nan, + "railways_close": False, "roads_has_motorway": False, "roads_has_primary": False, "roads_has_secondary": False, @@ -286,6 +290,13 @@ def preallocate_results() -> U4ResDict: "roads_primary_length": [], "roads_secondary_length": [], "roads_main_area": np.nan, + "roads_nearest_motorway_name": "", + "roads_nearest_primary_name": "", + "roads_nearest_secondary_name": "", + "roads_nearest_motorway_dist": np.nan, + "roads_nearest_primary_dist": np.nan, + "roads_nearest_secondary_dist": np.nan, + "roads_main_area": np.nan, "roads_main": gp.GeoDataFrame(), "roads_minor_area": np.nan, "roads_minor": gp.GeoDataFrame(), @@ -340,6 +351,7 @@ def preallocate_results() -> U4ResDict: "volumes_moved": np.nan, "volumes_removed": np.nan, "volumes_total": np.nan, + "volumes_error": np.nan, "volumes_polygons_added": np.nan, "volumes_polygons_moved": np.nan, "volumes_polygons_removed": np.nan, @@ -369,48 +381,20 @@ def roads( :rtype: U4ResDict """ - def get_clipped_road_area( - roads: gp.GeoDataFrame, - road_type: str, - clip_reg: gp.GeoDataFrame, - shp_cfg: dict, - ) -> gp.GeoDataFrame: - """Buffers and clips the roads to the area of interest, removing tunnels as well. - - :param roads: The roads dataframe. - :type roads: gp.GeoDataFrame - :param road_type: The list of roads to use. - :type road_type: str - :param clip_reg: The region used for clipping. - :type clip_reg: gp.GeoDataFrame - :param shp_cfg: The shape config containing buffer sizes. - :type shp_cfg: dict - :return: The clipped roads as polygons. - :rtype: gp.GeoDataFrame - """ - roads = roads[roads["tunnel"] == "F"] - road_gdf = gp.GeoDataFrame( - geometry=gp.pd.concat( - [ - roads[roads["fclass"] == rds] - for rds in shp_cfg["fclass"][road_type] - ] - ).buffer(shp_cfg["buffer_dist"][road_type]), - crs=roads.crs, - ) - - return road_gdf.clip(clip_reg) - logging.info("Loading Road Data") roads_data = u4gpkg.load_gpkg_data_region_ogr( sub_set_hull, osm_path, "gis_osm_roads_free_1" ) - + close_roads_data = u4gpkg.load_gpkg_data_region_ogr( + sub_set_hull.buffer(1000), osm_path, "gis_osm_roads_free_1" + ) + if len(close_roads_data) > 0: + close_road_classes = close_roads_data.fclass.to_list() if len(roads_data) > 0: logging.info("Classifying Road Data") # Extract area of main roads - res["roads_main"] = get_clipped_road_area( + res["roads_main"] = u4spatial.get_clipped_road_area( roads_data, "mainroads", sub_set_hull, shp_cfg ) if len(res["roads_main"]) > 0: @@ -422,7 +406,7 @@ def roads( res["roads_main_area"] = 0 # Extract area of minor roads - res["roads_minor"] = get_clipped_road_area( + res["roads_minor"] = u4spatial.get_clipped_road_area( roads_data, "minor_roads", sub_set_hull, shp_cfg ) if len(res["roads_minor"]) > 0: @@ -435,29 +419,78 @@ def roads( road_classes = roads_data.fclass.to_list() if ("motorway" in road_classes) or ("trunk" in road_classes): res["roads_has_motorway"] = True + res["roads_motorway_names"], res["roads_motorway_length"] = ( + u4spatial.road_info(roads_data, "motorway") + ) + elif "motorway" in close_road_classes: + name, dist = u4spatial.get_nearest_road_segment( + sub_set_hull.centroid.iloc[0], close_roads_data, "motorway" + ) + res["roads_nearest_motorway_name"] = name + res["roads_nearest_motorway_dist"] = dist + if "primary" in road_classes: res["roads_has_primary"] = True - for ii, road in roads_data[ - roads_data.fclass == "primary" - ].iterrows(): - if road.ref not in res["roads_primary_names"]: - res["roads_primary_names"].append(road.ref) - res["roads_primary_length"].append(road.geometry.length) - else: - ridx = res["roads_primary_names"].index(road.ref) - res["roads_primary_length"][ridx] += road.geometry.length + res["roads_primary_names"], res["roads_primary_length"] = ( + u4spatial.road_info(roads_data, "primary") + ) + elif "primary" in close_road_classes: + name, dist = u4spatial.get_nearest_road_segment( + sub_set_hull.centroid.iloc[0], close_roads_data, "primary" + ) + res["roads_nearest_primary_name"] = name + res["roads_nearest_primary_dist"] = dist if "secondary" in road_classes: res["roads_has_secondary"] = True - for ii, road in roads_data[ - roads_data.fclass == "secondary" - ].iterrows(): - if road.ref not in res["roads_secondary_names"]: - res["roads_secondary_names"].append(road.ref) - res["roads_secondary_length"].append(road.geometry.length) - else: - ridx = res["roads_secondary_names"].index(road.ref) - res["roads_secondary_length"][ridx] += road.geometry.length + res["roads_secondary_names"], res["roads_secondary_length"] = ( + u4spatial.road_info(roads_data, "secondary") + ) + elif "secondary" in close_road_classes: + name, dist = u4spatial.get_nearest_road_segment( + sub_set_hull.centroid.iloc[0], close_roads_data, "secondary" + ) + res["roads_nearest_secondary_name"] = name + res["roads_nearest_secondary_dist"] = dist + return res + + +def railways( + res: U4ResDict, + sub_set_hull: gp.GeoDataFrame, + osm_path: os.PathLike, +) -> dict: + """Loads the railways from the openstreetmap database and calculates length or distance to closest points. + + :param res: The results dictionary as of now. + :type res: U4ResDict + :param sub_set_hull: The area where to look for railways. + :type sub_set_hull: gp.GeoDataFrame + :param osm_path: The path to the openstreetmap database. + :type osm_path: os.PathLike + :return: An updated version of the results dictionary. + :rtype: U4ResDict + """ + + logging.info("Loading Railways Data") + railways_data = u4gpkg.load_gpkg_data_region_ogr( + sub_set_hull, osm_path, "gis_osm_railways_free_1" + ) + close_rails_data = u4gpkg.load_gpkg_data_region_ogr( + sub_set_hull.buffer(1000), osm_path, "gis_osm_railways_free_1" + ) + logging.info("Classifying Railways Data") + if len(railways_data) > 0: + _, res["railways_length"] = u4spatial.road_info(railways_data, "rail") + if res["railways_length"] > 0: + res["railways_has"] = True + elif len(close_rails_data) > 0: + _, res["railways_length"] = u4spatial.get_nearest_road_segment( + sub_set_hull.centroid.iloc[0], close_rails_data, "rail" + ) + if res["railways_length"] > 0: + res["railways_close"] = True + return res @@ -731,6 +764,12 @@ def volume(geometry: gp.GeoDataFrame, diffplan_path: os.PathLike) -> U4ResDict: res["volumes_moved"] = int( np.round(np.nansum(volumes["volumes_moved"]), -2) ) + if len(volumes["volumes_error"]) > 1: + res["volumes_error"] = int( + np.sqrt(np.nansum([v**2 for v in volumes["volumes_error"]])) + ) + else: + res["volumes_error"] = int(volumes["volumes_error"][0]) return res diff --git a/u4py/analysis/spatial.py b/u4py/analysis/spatial.py index f613aa9..ebf2520 100644 --- a/u4py/analysis/spatial.py +++ b/u4py/analysis/spatial.py @@ -5,7 +5,9 @@ or spatial lookup of features. from __future__ import annotations +import itertools import logging +import operator as op import os import pickle as pkl import re @@ -75,7 +77,7 @@ def reproject_raster( return out_path -def get_cKDTree(file_path: os.PathLike) -> spspatial.cKDTree: +def get_cKDTree(file_path: os.PathLike) -> spspatial.KDTree: """Loads all x and y coordinates from the given file or folder and returns a cKDTree for easy spatial lookup. :param file_path: The path to the folder or file. @@ -1155,7 +1157,7 @@ def vol_added(im_data: np.ndarray) -> np.ndarray: :param im_data: The input raster dem data. :type im_data: np.ndarray - :return: The slope at each pixel in degrees. + :return: The volume added. :rtype: np.ndarray """ return np.nansum(im_data[im_data > 0]) @@ -1166,7 +1168,7 @@ def vol_removed(im_data: np.ndarray) -> np.ndarray: :param im_data: The input raster dem data. :type im_data: np.ndarray - :return: The slope at each pixel in degrees. + :return: The volume removed. :rtype: np.ndarray """ return np.nansum(im_data[im_data < 0]) @@ -1177,12 +1179,23 @@ def vol_moved(im_data: np.ndarray) -> np.ndarray: :param im_data: The input raster dem data. :type im_data: np.ndarray - :return: The slope at each pixel in degrees. + :return: The volume moved. :rtype: np.ndarray """ return np.nansum(np.abs(im_data)) +def vol_error(im_data: np.ndarray) -> np.ndarray: + """Calculates the error of volume estimates inside the area of the numpy array. + + :param im_data: The input raster dem data. + :type im_data: np.ndarray + :return: The error of volume estimates. + :rtype: np.ndarray + """ + return 0.3**2 * len(im_data[np.isfinite(im_data)]) + + def roundness(shapes: gp.GeoDataFrame) -> list: """Calculates the roundness of all polygons in shapes. @@ -1311,3 +1324,127 @@ def feret_diameters(polygon: np.ndarray) -> Tuple[float, float]: maxf = np.max(Ds) return (minf, maxf) + + +def get_clipped_road_area( + roads: gp.GeoDataFrame, + road_type: str, + clip_reg: gp.GeoDataFrame, + shp_cfg: dict, +) -> gp.GeoDataFrame: + """Buffers and clips the roads to the area of interest, removing tunnels as well. + + :param roads: The roads dataframe. + :type roads: gp.GeoDataFrame + :param road_type: The list of roads to use. + :type road_type: str + :param clip_reg: The region used for clipping. + :type clip_reg: gp.GeoDataFrame + :param shp_cfg: The shape config containing buffer sizes. + :type shp_cfg: dict + :return: The clipped roads as polygons. + :rtype: gp.GeoDataFrame + """ + roads = roads[roads["tunnel"] == "F"] + road_gdf = gp.GeoDataFrame( + geometry=gp.pd.concat( + [ + roads[roads["fclass"] == rds] + for rds in shp_cfg["fclass"][road_type] + ] + ).buffer(shp_cfg["buffer_dist"][road_type]), + crs=roads.crs, + ) + + return road_gdf.clip(clip_reg) + + +def road_info( + roads_data: gp.GeoDataFrame, fclass: str +) -> Tuple[list] | Tuple[list, float]: + """ + Calculates the length and names of the features in the geodataframe with + the given fclass. + + :param roads_data: A set of roads or railways + :type roads_data: gp.GeoDataFrame + :param fclass: The feature class to filter for, e.g. "motorway" + :type fclass: str + :return: The road names and road lengths, for railways the name is empty and the length is a single float + :rtype: Tuple[list] | Tuple[list, float] + """ + if hasattr(roads_data, "ref"): + road_names = [] + road_length = [] + for ii, road in roads_data[roads_data.fclass == fclass].iterrows(): + if road.ref not in road_names: + road_names.append(road.ref) + road_length.append(road.geometry.length) + else: + ridx = road_names.index(road.ref) + road_length[ridx] += road.geometry.length + elif hasattr(roads_data, "code"): + road_names = [] + road_length = 0 + for ii, road in roads_data[roads_data.fclass == fclass].iterrows(): + road_length += road.geometry.length + + return road_names, road_length + + +def ckdnearest( + geom_a: shapely.Point, gdf_b: gp.GeoDataFrame, col: list[str] = ["Place"] +) -> gp.GeoDataFrame: + """Finds nearest line features in `gdf_b` to the points in `gdf_a`. + + :param gdf_a: A geodataframe containing points + :type gdf_a: gp.GeoDataFrame + :param gdf_b: A geodataframe containing lines + :type gdf_b: gp.GeoDataFrame + :param col: The column to select, defaults to ["Place"] + :type col: list, optional + :return: _description_ + :rtype: gp.GeoDataFrame + """ + geom_a = (geom_a.x, geom_a.y) + geom_b = [np.array(geom.coords) for geom in gdf_b.geometry.to_list()] + idx_b = tuple( + itertools.chain.from_iterable( + [ + itertools.repeat(i, x) + for i, x in enumerate(list(map(len, geom_b))) + ] + ) + ) + geom_b = np.concatenate(geom_b) + ckd_tree = spspatial.cKDTree(geom_b) + dist, idx = ckd_tree.query([geom_a], k=1) + idx = op.itemgetter(*idx)(idx_b) + return dist, idx + + +def get_nearest_road_segment( + point: shapely.Point, road_data: gp.GeoDataFrame, fclass: str +) -> Tuple[str, int]: + """Gets the closest road segment from `road_data` to the given `point`. Returns the name of the road and its name. + + :param point: The point to use for querying. + :type point: shapely.Point + :param road_data: A set of roads to select from. + :type road_data: gp.GeoDataFrame + :param fclass: The feature class to filter the selection. + :type fclass: str + :return: The name of the road and the distance in metres. + :rtype: Tuple[str, int] + """ + dist, idx = ckdnearest( + point, + road_data[road_data.fclass == fclass], + ) + if hasattr(road_data, "name") and hasattr(road_data, "ref"): + name = road_data[road_data.fclass == fclass].iloc[idx].ref + if not name: + name = road_data[road_data.fclass == fclass].iloc[idx].name + else: + name = "" + return name, int(dist) diff --git a/u4py/io/tiff.py b/u4py/io/tiff.py index ff85139..2f3d268 100644 --- a/u4py/io/tiff.py +++ b/u4py/io/tiff.py @@ -278,7 +278,7 @@ def get_clipped_tiff_list( """ logging.info("Getting clipped tiffs using GPKG shapes and SQL.") base_path = u4files.multi_split(tiff_file_list[0], 2) - ctiff_fol = os.path.join(base_path, f"clipped_tiffs_gpkg") + ctiff_fol = os.path.join(base_path, "clipped_tiffs_gpkg") os.makedirs(ctiff_fol, exist_ok=True) clipped_tiff_list = [] @@ -416,6 +416,7 @@ def calculate_volume_in_shape( volumes_removed = [] volumes_added = [] volumes_moved = [] + volumes_error = [] for geom in shapes.geometry: part = coverage.clip(geom) if len(part) > 0: @@ -423,42 +424,51 @@ def calculate_volume_in_shape( vol_removed = 0 vol_added = 0 vol_moved = 0 + vol_error = 0 flist = part.path.to_list() for fp in flist: v = u4spatial.compute_for_raster_in_geom( fp, geom, np.nansum, shapes.crs ) - if not v is None: + if v is not None: vol += v v = u4spatial.compute_for_raster_in_geom( fp, geom, u4spatial.vol_removed, shapes.crs ) - if not v is None: + if v is not None: vol_removed += v v = u4spatial.compute_for_raster_in_geom( fp, geom, u4spatial.vol_added, shapes.crs ) - if not v is None: + if v is not None: vol_added += v v = u4spatial.compute_for_raster_in_geom( fp, geom, u4spatial.vol_moved, shapes.crs ) - if not v is None: + if v is not None: vol_moved += v + v = u4spatial.compute_for_raster_in_geom( + fp, geom, u4spatial.vol_error, shapes.crs + ) + if v is not None: + vol_error += v else: vol = np.nan vol_removed = np.nan vol_added = np.nan vol_moved = np.nan + vol_error = np.nan volumes.append(vol) volumes_removed.append(vol_removed) volumes_added.append(vol_added) volumes_moved.append(vol_moved) + volumes_error.append(np.sqrt(vol_error)) return { "volumes": volumes, "volumes_removed": volumes_removed, "volumes_added": volumes_added, "volumes_moved": volumes_moved, + "volumes_error": volumes_error, } @@ -514,12 +524,12 @@ def get_terrain( :rtype: os.PathLike """ - processing_keywords = { - "slope": {"slopeFormat": "percent"}, - "aspect": {"zeroForFlat": True}, - "hillshade": {"zFactor": 3}, - "multi_hillshade": {"zFactor": 3, "multiDirectional": True}, - } + # processing_keywords = { + # "slope": {"slopeFormat": "percent"}, + # "aspect": {"zeroForFlat": True}, + # "hillshade": {"zFactor": 3}, + # "multi_hillshade": {"zFactor": 3, "multiDirectional": True}, + # } processing = { "slope": "slope", "aspect": "aspect", @@ -538,9 +548,10 @@ def get_terrain( if not os.path.exists(out_path) or overwrite: logging.debug(f"Creating slope for {file_name}.") - gdal_opts = gdal.DEMProcessingOptions( - **processing_keywords[terrain_feature] - ) # For now the Options seem not to work properly, DEMProcessing does + # gdal_opts = gdal.DEMProcessingOptions( + # **processing_keywords[terrain_feature] + # ) + # For now the Options seem not to work properly, DEMProcessing does # not accept further kwargs, even though its mentioned in the # documentation. gdal.DEMProcessing(out_path, file_path, processing[terrain_feature]) diff --git a/u4py/utils/types.py b/u4py/utils/types.py index b770931..3c7e3c0 100644 --- a/u4py/utils/types.py +++ b/u4py/utils/types.py @@ -124,6 +124,9 @@ class U4ResDict(TypedDict): manual_unclear_1: bool manual_unclear_2: bool manual_unclear_3: bool + railways_has: bool + railways_length: float + railways_close: bool roads_has_motorway: bool roads_has_primary: bool roads_has_secondary: bool @@ -133,6 +136,12 @@ class U4ResDict(TypedDict): roads_motorway_length: list roads_primary_length: list roads_secondary_length: list + roads_nearest_motorway_name: str + roads_nearest_primary_name: str + roads_nearest_secondary_name: str + roads_nearest_motorway_dist: float + roads_nearest_primary_dist: float + roads_nearest_secondary_dist: float roads_main_area: float roads_main: gp.GeoDataFrame roads_minor_area: float @@ -188,6 +197,7 @@ class U4ResDict(TypedDict): volumes_moved: float volumes_removed: float volumes_total: float + volumes_error: float volumes_polygons_added: float volumes_polygons_moved: float volumes_polygons_removed: float -- GitLab