diff --git a/dynamics_learning/dynamics_learning/data_retrieval/__init__.py b/dynamics_learning/dynamics_learning/data_retrieval/__init__.py
index e01a8cf777e81b88e42740f1af6c1c247c5ad938..3cf38bfdff501772e99815bb6955f882c7f3a597 100644
--- a/dynamics_learning/dynamics_learning/data_retrieval/__init__.py
+++ b/dynamics_learning/dynamics_learning/data_retrieval/__init__.py
@@ -29,6 +29,17 @@ RESOURCE = PROJECT.resource(
 )  # Address the specific resource in the project
 
 
+def download_resource_content_into_uuid_folders() -> None:
+    files = RESOURCE.files(path="train", recursive=True, with_metadata=True)
+    for file in track(files):
+        if file.is_folder:
+            continue
+        logger.info(f"File: {file.name}")
+        robot_uuid = file.metadata_form()["Robot UUID"][0]
+        Path(f"./Trajectory Data/train/{robot_uuid}").mkdir(parents=True, exist_ok=True)
+        file.download(f"./Trajectory Data/train/{robot_uuid}/{file.name}")
+
+
 def download_resource_content(resource: coscine.resource.Resource = RESOURCE) -> Path:
     """Downloads none-existend contents of the resource.
 
diff --git a/dynamics_learning/dynamics_learning/model_io/__init__.py b/dynamics_learning/dynamics_learning/model_io/__init__.py
index 55024f6f0fb89c9be218c6afca1d5ad4a0cd99d7..413f9bea0ae9bb5268fb103b780d2fc1a8c881d3 100644
--- a/dynamics_learning/dynamics_learning/model_io/__init__.py
+++ b/dynamics_learning/dynamics_learning/model_io/__init__.py
@@ -3,12 +3,25 @@ import io
 import h5py
 import tensorflow as tf
 from pritty_logger import RichLogger
+from keras.models import Sequential
+import json
+import wandb
+from typing import Any
 
 logger = RichLogger("dynamics_learning-model_io")
 
 
 # Save model to a binary file
-def save_model_to_binary_file(model, file_path):
+def save_model_to_binary_file(model: Sequential, file_path: str) -> None:
+    """Save the model to a binary file serialized in HDF5 format. Can be used to upload the model to Coscine.
+
+    Args:
+        model (Sequential): Model to be saved.
+        file_path (str): File path to save the model to. Ends with .h5.
+
+    Returns:
+        None
+    """
     # Create an in-memory file
     with io.BytesIO() as byte_io:
         # Save the model to the in-memory file
@@ -20,10 +33,19 @@ def save_model_to_binary_file(model, file_path):
         with open(file_path, "wb") as f:
             f.write(byte_io.getvalue())
     logger.log(f"Model saved to {file_path}")
+    return None
 
 
 # Function to load the model from a binary file
-def load_model_from_binary_file(file_path):
+def load_model_from_binary_file(file_path: str) -> Sequential:
+    """Load a model from a binary file serialized in HDF5 format.
+
+    Args:
+        file_path (str): File path to load the model from. Ends with .h5.
+
+    Returns:
+        Sequential: The loaded model.
+    """
     # Read the binary file content
     with open(file_path, "rb") as f:
         binary_data = f.read()
@@ -36,3 +58,38 @@ def load_model_from_binary_file(file_path):
     logger.log("Model loaded from in-memory file")
 
     return model
+
+
+def save_config(config: wandb.config, config_path: str) -> None:
+    """Save the config to a JSON file.
+
+    Args:
+        config (wandb.config): wandb.config object to be saved.
+        config_path (str): File path to save the config to. Ends with .json.
+
+    Returns:
+        None
+    """
+    config_data = json.dumps(dict(config.items()), indent=4)
+    config_bytes = config_data.encode("utf-8")
+    with open(config_path, "wb") as file:
+        file.write(config_bytes)
+    logger.log(f"Config saved to {config_path}")
+    return None
+
+
+def load_config(config_path: str) -> Any:
+    """Load the config from a JSON file.
+
+    Args:
+        config_path (str): File path to load the config from. Ends with .json.
+
+    Returns:
+        Any: Python object of the loaded config.
+    """
+    with open(config_path, "rb") as file:
+        config_bytes = file.read()
+    config_data = json.loads(config_bytes.decode("utf-8"))
+    logger.log(f"Config loaded from {config_path}:")
+    logger.log(config_data)
+    return config_data
diff --git a/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py b/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
index ce5ee90fbf7bc7865f8290e9b634d961fde4ce66..e26511710e5213a597eedec205a77d4a315e95a0 100644
--- a/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
+++ b/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
@@ -53,7 +53,7 @@ class CommandDataDict(TypedDict):
 
 def analyze(directory: str) -> Tuple[AttainedDataDict, CommandDataDict]:
     """
-    Analyzes the specified dataset in the given directory, generates statistics, creates plotsand concat all files into one in alphabetical order.
+    Analyzes the specified dataset in the given directory, generates statistics, creates plots, and concatenates all files into one in alphabetical order.
 
     Args:
         directory (str): The directory containing the dataset.
@@ -91,6 +91,7 @@ def analyze(directory: str) -> Tuple[AttainedDataDict, CommandDataDict]:
     tau_meas = np.empty((0, 7), np.float32)
 
     directory_in = f"{directory}"
+    logger.info(f"Analyzing dataset in {directory_in}")
 
     # Return a list of all meas.csv files in the directory in alphabetical order
     file_list = list(
diff --git a/dynamics_learning/dynamics_learning/sweep/setup.py b/dynamics_learning/dynamics_learning/sweep/setup.py
index 192059ee36887aad61745529031789ac2044089e..a72c7ad8a4cb78793319bcefc797780ff4774a69 100644
--- a/dynamics_learning/dynamics_learning/sweep/setup.py
+++ b/dynamics_learning/dynamics_learning/sweep/setup.py
@@ -4,26 +4,35 @@
 # Released under MIT License
 
 from pritty_logger import RichLogger
+from typing import Any
 
 import wandb
 
 from dynamics_learning.environment import (
-    SWEEP_ID,
     WANDB_ENTITY,
     WANDB_NOTES,
     WANDB_PROJECT,
 )
-from dynamics_learning.sweep.sweep_config import sweep_config
+from dynamics_learning.sweep.sweep_config import (
+    sweep_config,
+    sweep_config_from_hyperparameters,
+)
 
 # Create a logger named 'sweep_setup_logger'
-logger = RichLogger("sweep_setup_logger")
+logger = RichLogger("dynamics_learning-sweep_setup_logger")
+
+def setup_sweep(create_sweep: bool = False) -> tuple[str, dict[str, Any]]:
+    """Sets up a Sweep from the given sweep_config. If SWEEP_ID is set and create_sweep is False, that sweep is reused; otherwise a new sweep is created.
 
-def setup_sweep():
-    """Sets up a Sweep.
+    Args:
+        create_sweep (bool, optional): If True, a new sweep is initialized. Defaults to False.
 
-    If SWEEP_ID is set in environment, this sweep will be used. Else
-    a new sweep will be created using the sweep_config from sweep_config.py.
+    Returns:
+        tuple[str, dict[str, Any]]: sweep_id and sweep_config
     """
+    global SWEEP_ID
+    if create_sweep:
+        SWEEP_ID = None
     if not SWEEP_ID:
         logger.log("Running the sweep with this config:")
         logger.log(sweep_config)
@@ -36,3 +45,71 @@ def setup_sweep():
     sweep_config["notes"] = WANDB_NOTES
     logger.log(f"Running sweep_id {sweep_id}")
     return sweep_id, sweep_config
+
+
+def setup_sweep_from_hyperparameters(
+    config_data: Any, create_sweep: bool = False
+) -> tuple[str, dict[str, Any]]:
+    """Sets up a Sweep whose search space is a sane range around the given hyperparameters.
+
+    Args:
+        config_data (Any): Object exposing the base hyperparameters as attributes.
+        create_sweep (bool, optional): If True, a new sweep is created. Defaults to False.
+
+    Returns:
+        tuple[str, dict[str, Any]]: sweep_id and sweep_config
+    """
+    global SWEEP_ID
+    parameters_dict_from_hyperparameters = {
+        "optimizer": config_data.optimizer,
+        "clipnorm": {
+            "distribution": "log_uniform_values",
+            "min": config_data.clipnorm * 0.5,
+            "max": config_data.clipnorm * 1.5,
+        },
+        "learning_rate": {
+            "distribution": "log_uniform_values",
+            "min": config_data.learning_rate * 0.5,
+            "max": min(config_data.learning_rate * 1.5, 0.9),
+        },
+        "window_size": {"value": config_data.window_size},
+        "batch_size": {"value": config_data.batch_size},
+        "units": {
+            "distribution": "int_uniform",
+            "min": max(1, config_data.units - 10),
+            "max": min(config_data.units + 10, 100),
+        },
+        "dropout": {
+            "distribution": "log_uniform_values",
+            "min": config_data.dropout * 0.5,
+            "max": min(config_data.dropout * 1.5, 1),
+        },
+        "layers": {
+            "distribution": "int_uniform",
+            "min": max(1, config_data.layers - 10),
+            "max": min(config_data.layers + 10, 100),
+        },  # {"value": 10},
+        "epochs": {"value": config_data.epochs},
+    }
+    sweep_config_from_hyperparameters["parameters"] = (
+        parameters_dict_from_hyperparameters
+    )
+
+    if create_sweep:
+        SWEEP_ID = None
+    if not SWEEP_ID:
+        logger.log("Running the sweep with this config:")
+        logger.log(sweep_config_from_hyperparameters)
+        sweep_id = wandb.sweep(
+            sweep_config_from_hyperparameters,
+            project=WANDB_PROJECT,
+            entity=WANDB_ENTITY,
+        )
+    else:
+        sweep_id = SWEEP_ID
+    sweep_config_from_hyperparameters["sweep_id"] = sweep_id
+    sweep_config_from_hyperparameters["project"] = WANDB_PROJECT
+    sweep_config_from_hyperparameters["entity"] = WANDB_ENTITY
+    sweep_config_from_hyperparameters["notes"] = WANDB_NOTES
+    logger.log(f"Running sweep_id {sweep_id}")
+    return sweep_id, sweep_config_from_hyperparameters
diff --git a/dynamics_learning/dynamics_learning/sweep/sweep_config.py b/dynamics_learning/dynamics_learning/sweep/sweep_config.py
index 64f472afa22675c30bdc1860859e2ca5a8c94e1c..b913243f20b3388f11ad60069e3ba69a79f533ed 100644
--- a/dynamics_learning/dynamics_learning/sweep/sweep_config.py
+++ b/dynamics_learning/dynamics_learning/sweep/sweep_config.py
@@ -27,7 +27,11 @@ parameters_dict = {
     "units": {"distribution": "int_uniform", "min": 1, "max": 100},
     "dropout": {"distribution": "log_uniform_values", "min": 1e-5, "max": 1},
     "layers": {"distribution": "int_uniform", "min": 1, "max": 100},  # {"value": 10},
-    "epochs": {"value": 100},
+    "epochs": {"value": 2},
 }
 
 sweep_config["parameters"] = parameters_dict
+
+sweep_config_from_hyperparameters = {"method": "bayes"}
+sweep_config_from_hyperparameters["metric"] = metric
+sweep_config_from_hyperparameters["early_terminate"] = early_terminate
diff --git a/dynamics_learning/dynamics_learning/training/__init__.py b/dynamics_learning/dynamics_learning/training/__init__.py
index 9c51d7423df4b5c79efb2b2b60dcc092e3cd3fc5..18505eb88b520125e82889840175e30b46a50d02 100644
--- a/dynamics_learning/dynamics_learning/training/__init__.py
+++ b/dynamics_learning/dynamics_learning/training/__init__.py
@@ -2,7 +2,7 @@
 # -*- coding:utf-8 -*-
 # Copyright Leon Gorissen
 # Released under MIT License
-import json
+# import json
 from pathlib import Path
 from typing import Dict, Tuple
 
@@ -15,6 +15,7 @@ from keras.metrics import Accuracy, KLDivergence, MeanAbsolutePercentageError
 from keras.models import Sequential
 from pritty_logger import RichLogger
 from tensorflow.keras.callbacks import History
+from tensorflow.keras.models import Model  # , Sequential
 
 import wandb
 from dynamics_learning.data_retrieval import (
@@ -23,7 +24,7 @@
     update_existing_file,
 )
 from dynamics_learning.environment import WANDB_ENTITY, WANDB_NOTES, WANDB_PROJECT
-from dynamics_learning.model_io import save_model_to_binary_file
+from dynamics_learning.model_io import save_model_to_binary_file, save_config
 
 # Create a logger named 'sweep_setup_logger'
 logger = RichLogger("dynamics_learning-model_training")
@@ -92,7 +93,9 @@ class Metrics(Callback):
 
 
 def train(
-    q_qd_qdd_interpolated_command_input: tf.Tensor, tau_attained_input: tf.Tensor
+    q_qd_qdd_interpolated_command_input: tf.Tensor,
+    tau_attained_input: tf.Tensor,
+    model: Sequential = None,
 ) -> Tuple[Sequential, History]:
     """Train the model.
 
@@ -110,6 +113,21 @@ def train(
     ) as run:
         config = wandb.config
 
+        if model:
+            # Take window size from the loaded model's input shape
+            model_input_shape = model.layers[0].input_shape
+            window_size = model_input_shape[1]
+            batch_size = model_input_shape[0]
+        else:
+            # Use the window size defined in config if no model is loaded
+            model_input_shape = (
+                None,
+                config.window_size,
+                q_qd_qdd_interpolated_command_input.shape[-1],
+            )
+            window_size = config.window_size
+            batch_size = config.batch_size
+
         # Split Dataset to train and validation
         train_size = int(0.8 * q_qd_qdd_interpolated_command_input.shape[0])
         val_size = q_qd_qdd_interpolated_command_input.shape[0] - train_size
@@ -120,10 +138,10 @@ def train(
         _, y_train, _, y_val = tf.split(
             tau_attained_input,
             [
-                config.window_size - 1,
-                train_size - config.window_size + 1,
-                config.window_size - 1,
-                val_size - config.window_size + 1,
+                window_size - 1,
+                train_size - window_size + 1,
+                window_size - 1,
+                val_size - window_size + 1,
             ],
             0,
         )
@@ -131,10 +149,88 @@ def train(
         # Sliding Window for input interpolated command joint data
         import tensorflow_text as tf_text
 
-        x_train = tf_text.sliding_window(data=x_train, width=config.window_size, axis=0)
-        x_val = tf_text.sliding_window(data=x_val, width=config.window_size, axis=0)
+        x_train = tf_text.sliding_window(data=x_train, width=window_size, axis=0)
+        x_val = tf_text.sliding_window(data=x_val, width=window_size, axis=0)
+
+        # If a model is passed, reuse and extend it; otherwise create a new model
+        if model:
+            # The loaded model is modified in place below
+            logger.info(f"model shape is {model.layers[0].input_shape}")
+            # Determine how many trailing layers to strip from the loaded model
+            # Get the number of layers in the model
+            total_layers = len(model.layers)
+
+            # Log the total number of layers
+            logger.info(f"Total layers in the model: {total_layers}")
+
+            # Define how many layers you want to remove
+            layers_to_remove = 5
+
+            # Remove at most total_layers - 2 layers so that at least two layers remain
+            layers_to_remove = min(layers_to_remove, total_layers - 2)
+
+            # If it's a Sequential model
+            if isinstance(model, Sequential):
+                # Remove the specified number of layers
+                for _ in range(layers_to_remove):
+                    model.pop()
+                new_model = model
+            else:
+                # If it's a Functional model
+                # Truncate the model to remove the specified number of layers
+                new_model = Model(
+                    inputs=model.input,
+                    outputs=model.layers[-(layers_to_remove + 1)].output,
+                )
+
+            for layer in new_model.layers:
+                layer.trainable = False
+
+            # Add three LSTM layers, a dropout layer, and a dense layer
+            model.add(
+                LSTM(
+                    units=config.units,
+                    return_sequences=True,
+                    input_shape=(x_train.shape[1], x_train.shape[2]),
+                    activation="tanh",
+                    recurrent_activation="sigmoid",
+                    recurrent_dropout=0,
+                    unroll=False,
+                    use_bias=True,
+                    name="LSTM_1",
+                )
+            )
+            model.add(
+                LSTM(
+                    units=config.units,
+                    return_sequences=True,
+                    input_shape=(x_train.shape[1], x_train.shape[2]),
+                    activation="tanh",
+                    recurrent_activation="sigmoid",
+                    recurrent_dropout=0,
+                    unroll=False,
+                    use_bias=True,
+                    name="LSTM_2",
+                )
+            )
+            model.add(LSTM(units=config.units, return_sequences=False, name="LSTM_3"))
+            model.add(Dropout(config.dropout))
+            model.add(Dense(units=y_train.shape[1]))
+
+            # Compile the model
+            opt = optimizers.Adam(
+                learning_rate=config.learning_rate, clipvalue=config.clipnorm
+            )
+            model.compile(
+                optimizer=opt,
+                loss=MeanSquaredError(),
+                metrics=[Accuracy(), KLDivergence(), MeanAbsolutePercentageError()],
+                run_eagerly=True,
+            )
+            model.build(x_train.shape)
 
-        model = make_model(config, x_train, y_train)
+        else:
+            model = make_model(config, x_train, y_train)
 
         from wandb.integration.keras import WandbCallback
 
@@ -143,7 +239,7 @@ def train(
             y_train,
             epochs=config.epochs,
             shuffle=True,
-            batch_size=config.batch_size,
+            batch_size=batch_size,
             verbose=2,
             validation_data=(x_val, y_val),
             callbacks=[
@@ -172,7 +268,7 @@ def upload(model: Sequential, history: History, config: Dict):
     resource = PROJECT.resource("Foundation_Model")
     metadataform = resource.metadata_form()
     metadataform["Title"] = (
-        f"{decision_metric}_val_loss"  # TODO implement custom coscine application profile for the models
+        f"{decision_metric}_val_loss"  # TODO implement custom coscine application profile for the models based on MITM base profile
     )
     logger.info(f"The following meta data is saved\n{metadataform}")
     _resource_files, resource_objects = get_resource_content(resource, path="models/")
@@ -189,10 +285,7 @@ def upload(model: Sequential, history: History, config: Dict):
     config_local_file_name = (
         "/app/dynamics_learning/Foundation_Model/hyperparameters/hyperparameters.json"
     )
-    config_data = json.dumps(dict(config.items()), indent=4)
-    config_bytes = config_data.encode("utf-8")
-    with open(config_local_file_name, "wb") as file:
-        file.write(config_bytes)
+    save_config(config, config_local_file_name)
 
     logger.info("Checking if a model exists in the resource.")
     # if no model exits, upload the current model to coscine
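
Usage sketch (not part of the patch): the new helpers are intended to work together by loading a previously exported foundation model and its hyperparameters, deriving a narrowed sweep from those hyperparameters, and fine-tuning the loaded model through train(..., model=...). Everything below that is not defined in the patch is an assumption: the file paths, the placeholder tensors x and y (sized for a 7-DoF robot), the SimpleNamespace wrapper (load_config returns a plain dict while setup_sweep_from_hyperparameters reads attributes), and the way the wandb agent is launched.

    from functools import partial
    from types import SimpleNamespace

    import tensorflow as tf
    import wandb

    from dynamics_learning.model_io import load_config, load_model_from_binary_file
    from dynamics_learning.sweep.setup import setup_sweep_from_hyperparameters
    from dynamics_learning.training import train

    # Illustrative paths; adjust to the actual resource layout.
    model = load_model_from_binary_file("Foundation_Model/model.h5")
    hyperparameters = load_config("Foundation_Model/hyperparameters/hyperparameters.json")

    # load_config() returns a dict; wrap it so the sweep setup can read attributes.
    config_data = SimpleNamespace(**hyperparameters)
    sweep_id, sweep_config = setup_sweep_from_hyperparameters(config_data, create_sweep=True)

    # Placeholder data: interpolated command joint data (q, qd, qdd per joint) and
    # attained torques; in the real pipeline these come from the preprocessing step.
    x = tf.zeros((1000, 21), tf.float32)
    y = tf.zeros((1000, 7), tf.float32)

    # Fine-tune the loaded model inside the sweep.
    wandb.agent(
        sweep_id,
        function=partial(train, x, y, model=model),
        entity=sweep_config["entity"],
        project=sweep_config["project"],
        count=1,
    )

Launching train through wandb.agent matters because train() reads its remaining hyperparameters from wandb.config inside the sweep run.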