diff --git a/u4py/analysis/inversion.py b/u4py/analysis/inversion.py
index c4e247575e0ed6c9000fc35f6c5c2ff7b881bdc1..3680314745aa860335bc00a19f52c7a8979f8dd0 100644
--- a/u4py/analysis/inversion.py
+++ b/u4py/analysis/inversion.py
@@ -340,8 +340,9 @@ def _clean_inputs(data: dict):
     :type data: dict
     """
     logging.info("Cleaning inputs.")
+    ignore_list = ["inversion_results", "time"]
     for kk in data.keys():
-        if isinstance(data[kk], np.ndarray) and kk != "inversion_results":
+        if isinstance(data[kk], np.ndarray) and kk not in ignore_list:
             ind = np.isfinite(data[kk])
             for ll in data.keys():
                 if (
@@ -631,7 +632,7 @@ def _set_g_antenna(
         g_hat = np.zeros_like(time)
         g_hat[time > tat] = 1
         g_AT.append(g_hat)
-        parameter_list.append(f"antenna offset no. {ii+1}")
+        parameter_list.append(f"antenna offset no. {ii + 1}")

     return g_AT, parameter_list

@@ -670,7 +671,7 @@ def _set_g_earthquakes(
         g_heq = np.zeros_like(time)
         g_heq[time > t_EQ[ii]] = g_heq[time > t_EQ[ii]] + 1
         g_EQ.append(g_heq)
-        parameter_list.append(f"earthquake no. {ii+1}")
+        parameter_list.append(f"earthquake no. {ii + 1}")

     return g_EQ, parameter_list

@@ -704,7 +705,7 @@ def _set_g_postseismic(
         iB = time > te
         g_psm[iB] = g_psm[iB] + np.log(1 + (time[iB] - te))
         g_POSTSM.append(g_psm)
-        parameter_list.append(f"postseismic no. {ii+1}")
+        parameter_list.append(f"postseismic no. {ii + 1}")

     return g_POSTSM, parameter_list

@@ -735,7 +736,7 @@ def _set_g_extraction(
         t_off = t_start + 0.5 * duration
         gex = gex + _expit(time, k=k, t_off=t_off)
         g_EX.append(gex)
-        parameter_list.append(f"water extraction no. {ii+1}")
+        parameter_list.append(f"water extraction no. {ii + 1}")

     return g_EX, parameter_list

diff --git a/u4py/analysis/processing.py b/u4py/analysis/processing.py
index 5353cc6a5479889b33ca0a8bdd325064adf23fa1..0bf44b9f72b25bf405e43f4b3a74cab575463a7a 100644
--- a/u4py/analysis/processing.py
+++ b/u4py/analysis/processing.py
@@ -168,7 +168,6 @@ def inversion_map_worker(data: dict) -> Tuple[Tuple, Tuple]:
     Returns the data as a tuple of (`x`, `y`, `results`) for each direction.
     In case the inversion somehow goes wrong, (None, None) is returned.
     """
-
     try:
         matrix_ori, data, _, parameters_list = u4invert.invert_time_series(
             data
@@ -182,8 +181,7 @@ def inversion_map_worker(data: dict) -> Tuple[Tuple, Tuple]:
             (data["xmid"], data["ymid"], matrix),
         )
         return results
-    except:
-        logging.info("Inversion failed")
+    except RuntimeError:
         return (None, None)


@@ -191,6 +189,7 @@ def batch_mapping(
     fnc_args: Iterable,
     fnc: Callable,
     desc: str = "",
+    total: int = 0,
 ) -> list:
     """Maps the function over a pool of workers.

@@ -203,7 +202,17 @@
+    :param total: Total number of jobs for the progress bar, needed when
+        `fnc_args` has no `len()` (e.g. a generator), defaults to 0.
+    :type total: int, optional
     :return: The results as a list.
     :rtype: list
     """
-    ncpu = min(len(fnc_args), u4config.cpu_count)
+    try:
+        nargs = len(fnc_args)
+        ncpu = min(nargs, u4config.cpu_count)
+    except TypeError:
+        ncpu = u4config.cpu_count
+        nargs = None
+    if not total:
+        total = nargs

     logging.info(f"Starting parallel pool with {ncpu} threads.")
     if desc:
@@ -212,7 +221,7 @@
         results = list(
             tqdm(
                 p.imap_unordered(fnc, fnc_args),
-                total=len(fnc_args),
+                total=total,
                 desc=desc,
                 leave=False,
             )
diff --git a/u4py/scripts/data_analysis/Batch_Invert_GPKG.py b/u4py/scripts/data_analysis/Batch_Invert_GPKG.py
index ad98ca60a4552996439008454fc1c96c656ec476..d86d63192e8e346cd6ed33ebbcf6c6d6e1cefd37 100644
--- a/u4py/scripts/data_analysis/Batch_Invert_GPKG.py
+++ b/u4py/scripts/data_analysis/Batch_Invert_GPKG.py
@@ -10,14 +10,15 @@ from pathlib import Path

 import geopandas as gp
 import numpy as np
+import scipy.spatial as spspat
 from tqdm import tqdm

 import u4py.analysis.processing as u4proc
 import u4py.analysis.spatial as u4spatial
 import u4py.io.sql as u4sql
 import u4py.utils.cmd_args as u4args
-
-# import u4py.utils.config as u4config
+import u4py.utils.config as u4config
+import u4py.utils.convert as u4convert
 import u4py.utils.projects as u4proj

 # u4config.start_logger()
@@ -45,7 +46,7 @@ def main():
         proj_path=proj_path,
     )
     merged_data = merge_data(project)
-    # # All points
+    # All points
     # if not os.path.exists(project["paths"]["output_path"]) or overwrite:
     #     extracts = u4proc.get_extracts(merged_data)
     #     invert_extracts(extracts, project["paths"]["output_path"])
@@ -73,64 +74,60 @@
     # Get x and y extend for rastering
     minx = np.floor(bounds.minx[0] / raster_size) * raster_size
     maxx = np.ceil(bounds.maxx[0] / raster_size) * raster_size
-    x_rng = np.arange(minx, maxx, raster_size)
     miny = np.floor(bounds.miny[0] / raster_size) * raster_size
     maxy = np.ceil(bounds.maxy[0] / raster_size) * raster_size
-    y_rng = np.arange(miny, maxy, raster_size)
-
-    # Get indices where to put data for each point
-    ii_x = [
-        np.argwhere(x_rng >= x).squeeze()[0]
-        for x in tqdm(
-            merged_data["vertikal"]["x"],
-            desc="Getting x indices",
-            leave=False,
-        )
-    ]
-    ii_y = [
-        np.argwhere(y_rng >= y).squeeze()[0]
-        for y in tqdm(
-            merged_data["vertikal"]["y"],
-            desc="Getting y indices",
-            leave=False,
-        )
-    ]
-    indices = np.array([(ix, iy) for ix, iy in zip(ii_x, ii_y)])
-    uqidc = np.unique(indices, axis=0)
-    extracts = [
-        {
-            "x": x_rng[ii_x[ii]],
-            "y": y_rng[ii_y[ii]],
-            "time": merged_data["vertikal"]["time"],
-            "ps_id": [],
-            "timeseries_v": [],
-            "timeseries_ew": [],
-        }
-        for ii in tqdm(
-            range(len(ii_x)),
-            desc="Preallocating extracts",
-            leave=False,
+
+    # Build KDTree and queries for fast lookup
+    x, y = np.mgrid[minx:maxx:raster_size, miny:maxy:raster_size]
+    tree = spspat.KDTree(np.c_[x.ravel(), y.ravel()])
+    queries = [
+        (xx, yy)
+        for xx, yy in zip(
+            merged_data["vertikal"]["x"], merged_data["vertikal"]["y"]
         )
     ]
-    for ii, (tsv, tsew, psid) in tqdm(
-        enumerate(
-            zip(
-                merged_data["vertikal"]["timeseries"],
-                merged_data["Ost_West"]["timeseries"],
-                merged_data["vertikal"]["ps_id"],
-            )
-        ),
-        desc="Getting extracts",
-        leave=False,
-        total=len(merged_data["vertikal"]["x"]),
+
+    # Lookup indices of grid where to put the data
+    dd, idx = tree.query(queries, workers=u4config.cpu_count)
+
+    logging.info("Preallocating lists")
+    ts_v = [list() for _ in range(x.size)]
+    ts_ew = [list() for _ in range(x.size)]
+    ps_id = [list() for _ in range(x.size)]
+    time = [list() for _ in range(x.size)]
+    xs = [0 for _ in range(x.size)]
+    ys = [0 for _ in range(x.size)]
+    x_flat = np.array(x).flat
+    y_flat = np.array(y).flat
+
+    # Assemble data lists
+    for ii, jj in tqdm(
+        enumerate(idx), total=len(idx), desc="Assemble data lists"
     ):
-        uqiis = np.nonzero(np.all(uqidc == indices[ii], axis=1))[0]
-        if len(uqiis) > 1:
-            logging.info("Error, to many indices found?")
-        else:
-            extracts[uqiis[0]]["timeseries_v"].append(tsv)
-            extracts[uqiis[0]]["timeseries_ew"].append(tsew)
-            extracts[uqiis[0]]["ps_id"].append(psid)
+        ts_v[jj].append(merged_data["vertikal"]["timeseries"][ii])
+        ts_ew[jj].append(merged_data["Ost_West"]["timeseries"][ii])
+        ps_id[jj].append(merged_data["vertikal"]["ps_id"][ii])
+        time[jj].append(merged_data["vertikal"]["time"])
+        xs[jj] = x_flat[jj]
+        ys[jj] = y_flat[jj]
+
+    # Extracting data
+    extracts = []
+    for ii in tqdm(
+        range(len(ts_v)), total=len(ts_v), desc="Extracting data"
+    ):
+        if len(ts_v[ii]) > 0:
+            extracts.append(
+                u4convert.reformat_gpkg(
+                    xs[ii],
+                    ys[ii],
+                    0,
+                    ps_id[ii],
+                    np.hstack(time[ii]),
+                    np.hstack(ts_v[ii]),
+                    np.hstack(ts_ew[ii]),
+                )
+            )

     invert_extracts(extracts, output_path)
diff --git a/u4py/scripts/data_analysis/Convert_Inversion_Results_TIFF.py b/u4py/scripts/data_analysis/Convert_Inversion_Results_TIFF.py
index d91bac255b5a957eef8e5c69d422562859002cad..41e36fb4e548b36f0fae80f87fc8ed9498a11476 100644
--- a/u4py/scripts/data_analysis/Convert_Inversion_Results_TIFF.py
+++ b/u4py/scripts/data_analysis/Convert_Inversion_Results_TIFF.py
@@ -14,6 +14,7 @@ the following data as geotiffs:

 import logging
 import os
+from pathlib import Path

 import numpy as np
 from tqdm import tqdm
@@ -23,46 +24,106 @@
 import u4py.io.files as u4files
 import u4py.io.psi as u4psi
 import u4py.io.tiff as u4tiff
 import u4py.plotting.preparation as u4plotprep
-import u4py.utils.config as u4config
+
+# import u4py.utils.config as u4config
 import u4py.utils.projects as u4proj

-u4config.start_logger()
+# u4config.start_logger()


 def main():
     project = u4proj.get_project(
-        required=[
-            "results_path",
-            "output_path",
-        ],
+        required=["results_path"],
         interactive=False,
-        proj_path="/home/rudolf/Documents/umwelt4/Pkl_to_Tiff_BBD2023.u4project",
+        proj_path=Path(
+            "~/Documents/umwelt4/Pkl_to_Tiff_EGMS.u4project"
+        ).expanduser(),
     )
-    chunk_size = 50

     logging.info("Setting up paths")
-    is_normal = False
-    _, fname = os.path.split(project["paths"]["results_path"])
-    fname, _ = os.path.splitext(fname)
-    output_dir = os.path.join(
-        project["paths"]["output_path"], fname + "_tiffs"
-    )
-    os.makedirs(output_dir, exist_ok=True)
+    # Getting list of all pkl files in folder
+    fnames = [
+        fp
+        for fp in os.listdir(project["paths"]["results_path"])
+        if fp.endswith(".pkl")
+    ]
+    # Passed as `is_normal` below: EGMS stacks are read via get_all_pickle_data
+    is_egms = False
+    chunk_sizes = []
+    for fn in fnames:
+        if "vertikal_inversion_results" in fn:
+            chunk_sizes.append(100)
+        elif "5000m" in fn:
+            chunk_sizes.append(5000)
+        elif "2500m" in fn:
+            chunk_sizes.append(2500)
+        elif "250m" in fn:
+            chunk_sizes.append(250)
+        elif "1000m" in fn:
+            chunk_sizes.append(1000)
+        elif "500m" in fn:
+            chunk_sizes.append(500)
+        else:
+            raise ValueError("Could not get chunksize from filename.")
+
+    os.makedirs(project["paths"]["results_path"], exist_ok=True)
+    fail_list = []
+    for fname, chunk_size in tqdm(
+        zip(fnames, chunk_sizes), desc="Converting files", total=len(fnames)
+    ):
+        failed = convert_pkl_to_tiff(
+            fname, project["paths"]["results_path"], is_egms, chunk_size
+        )
+        if failed:
+            fail_list.append(failed)
+    if fail_list:
+        print(" ### Script failed for: ### ")
+        for fl in fail_list:
+            print(fl)
+
+
+def convert_pkl_to_tiff(
+    full_name: str,
+    output_dir: os.PathLike,
+    is_normal: bool,
+    chunk_size: int,
+):
+    pkl_path = os.path.join(output_dir, full_name)
+    fname = full_name[full_name.index("EGMS") : full_name.index("EGMS") + 14]

     logging.info("Loading and rearranging data for further analysis")
     if is_normal:
-        data = u4psi.get_pickled_inversion_results(
-            project["paths"]["results_path"]
-        )
-        converted_data, chunk_size = u4plotprep.convert_results_for_grid(
-            data[0], chunk_size=chunk_size
-        )
+        data = u4psi.get_pickled_inversion_results(pkl_path)
+        if data[0] and data[1]:
+            converted_data, chunk_size = u4plotprep.convert_results_for_grid(
+                data[0], chunk_size=chunk_size
+            )
+        else:
+            converted_data = []
     else:
-        data = u4files.get_all_pickle_data(project["paths"]["results_path"])
-        converted_data, chunk_size = u4plotprep.convert_results_for_grid(
-            data[1], chunk_size=chunk_size
+        data = u4files.get_all_pickle_data(pkl_path)
+        if data[0] and data[1]:
+            converted_data, chunk_size = u4plotprep.convert_results_for_grid(
+                data[1], chunk_size=chunk_size
+            )
+        else:
+            converted_data = []
+
+    if len(converted_data) > 0:
+        create_grids(
+            converted_data, chunk_size, output_dir, fname, crs="EPSG:3035"
         )
+    else:
+        return full_name
+
+def create_grids(
+    converted_data: list,
+    chunk_size: int,
+    output_dir: os.PathLike,
+    fname: str,
+    crs: str,
+):
     logging.info("Creating numpy arrays from data")
     grids, extend = u4plotprep.make_gridded_data(converted_data, chunk_size)
     grid_names = [
@@ -90,7 +151,7 @@
                 output_dir,
                 f"{name}_{chunk_size}_{fname}.tif",
             ),
-            crs="EPSG:32632",
+            crs=crs,
             compress="lzw",
         )

@@ -109,11 +170,11 @@
                 output_dir,
                 f"abs_{name}_{chunk_size}_{fname}.tif",
             ),
-            crs="EPSG:32632",
+            crs=crs,
             compress="lzw",
         )

-    logging.info(f"Maximum of annual")
+    logging.info("Maximum of annual")
     max_vals, max_time = u4other.find_maximum_sines(
         grids[1], grids[2], interval=365
     )
@@ -124,7 +185,7 @@
             output_dir,
             f"max_vals_annual_{chunk_size}_{fname}.tif",
         ),
-        crs="EPSG:32632",
+        crs=crs,
         compress="lzw",
     )
     u4tiff.ndarray_to_geotiff(
@@ -134,11 +195,11 @@
             output_dir,
             f"max_time_annual_{chunk_size}_{fname}.tif",
         ),
-        crs="EPSG:32632",
+        crs=crs,
        compress="lzw",
     )

-    logging.info(f"Maximum of semiannual")
+    logging.info("Maximum of semiannual")
     max_vals, max_time = u4other.find_maximum_sines(
         grids[3], grids[4], interval=365 / 2
     )
@@ -149,7 +210,7 @@
             output_dir,
             f"max_vals_semiannual_{chunk_size}_{fname}.tif",
         ),
-        crs="EPSG:32632",
+        crs=crs,
         compress="lzw",
     )
     u4tiff.ndarray_to_geotiff(
@@ -159,7 +220,7 @@
             output_dir,
             f"max_time_semiannual_{chunk_size}_{fname}.tif",
         ),
-        crs="EPSG:32632",
+        crs=crs,
         compress="lzw",
     )
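
Note on the `batch_mapping` change in u4py/analysis/processing.py: it lets generators flow into the worker pool by attempting `len()` and falling back to a caller-supplied `total` for the progress bar. A minimal, self-contained sketch of that pattern (the names `batch_map`, `square`, and `MAX_WORKERS` are illustrative, not part of u4py):

```python
from multiprocessing import get_context

from tqdm import tqdm

MAX_WORKERS = 4  # stand-in for u4config.cpu_count


def square(x: int) -> int:
    """Toy worker; must be module-level so a "spawn" context can pickle it."""
    return x * x


def batch_map(fnc, fnc_args, total: int = 0) -> list:
    """Map fnc over a pool, accepting iterables that have no len()."""
    try:
        nargs = len(fnc_args)  # works for lists and tuples
        ncpu = min(nargs, MAX_WORKERS)
    except TypeError:  # generators raise TypeError on len()
        nargs = None
        ncpu = MAX_WORKERS
    if not total:
        total = nargs  # tqdm treats total=None as "unknown length"
    with get_context("spawn").Pool(ncpu) as p:
        return list(tqdm(p.imap_unordered(fnc, fnc_args), total=total))


if __name__ == "__main__":
    gen = (ii for ii in range(100))  # no len(), so pass total explicitly
    print(sum(batch_map(square, gen, total=100)))
```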
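Note on the rewrite in Batch_Invert_GPKG.py: it replaces the per-point `np.argwhere` scans (one pass over the grid per point) with a single nearest-node query against a `scipy.spatial.KDTree` built over the raster nodes. A standalone sketch of the same binning idea; the coordinates here are synthetic, purely for illustration:

```python
import numpy as np
from scipy.spatial import KDTree

raster_size = 100.0
minx, maxx, miny, maxy = 0.0, 1000.0, 0.0, 500.0

# Grid nodes, flattened into an (n_nodes, 2) coordinate array
x, y = np.mgrid[minx:maxx:raster_size, miny:maxy:raster_size]
tree = KDTree(np.c_[x.ravel(), y.ravel()])

# Synthetic scatter points standing in for the PS coordinates
rng = np.random.default_rng(0)
points = np.c_[rng.uniform(minx, maxx, 1000), rng.uniform(miny, maxy, 1000)]

# One vectorised query instead of one np.argwhere scan per point;
# idx[ii] is the flat index of the grid node nearest to point ii
dist, idx = tree.query(points, workers=-1)

# Group point indices by the cell they were snapped to
cells = {}
for ii, jj in enumerate(idx):
    cells.setdefault(int(jj), []).append(ii)
print(f"{len(cells)} of {x.size} grid nodes received at least one point")
```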
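Note on the chunk-size detection in Convert_Inversion_Results_TIFF.py: it matches filename substrings with an if/elif chain. A hypothetical table-driven alternative (`CHUNK_SIZES` and `chunk_size_from_filename` are illustrative names, not part of u4py); iteration order matters, since "2500m" must be tested before "500m" and "250m":

```python
# Order matters: "2500m" would otherwise be shadowed by its
# substrings "500m" and "250m".
CHUNK_SIZES = {
    "vertikal_inversion_results": 100,
    "5000m": 5000,
    "2500m": 2500,
    "1000m": 1000,
    "500m": 500,
    "250m": 250,
}


def chunk_size_from_filename(fname: str) -> int:
    for key, size in CHUNK_SIZES.items():  # dicts preserve insertion order
        if key in fname:
            return size
    raise ValueError(f"Could not get chunksize from filename: {fname}")


assert chunk_size_from_filename("EGMS_L3_2500m_U.pkl") == 2500
```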