Commit 59bf8a73 authored by Leon Michel Gorißen

Refactor data retrieval, model I/O, and training logic for improved modularity and flexibility

- Added download_resource_content_into_uuid_folders to support organized data downloads by robot UUID.
- Enhanced the save_model_to_binary_file and load_model_from_binary_file functions with typing and documentation.
- Introduced functions for saving and loading configurations to/from JSON files (see the usage sketch below).
- Updated the training workflow to handle pre-trained models: selectively remove the last layers, freeze the remaining ones, and add new layers.
- Implemented flexible setup for W&B sweeps, including hyperparameter configuration from external data.
- Streamlined upload process by refactoring config saving logic and reducing redundant code.
- Improved logging for data analysis and sweep setup for better traceability.
parent aba59654
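A minimal usage sketch of the new JSON config helpers from dynamics_learning.model_io. The offline run mode, project name, and output path below are illustrative assumptions, not part of the commit:

import wandb

from dynamics_learning.model_io import load_config, save_config

# Sketch only: run offline with a placeholder project name and a local path.
with wandb.init(project="dynamics_learning", mode="offline") as run:
    run.config.update({"learning_rate": 1e-3, "units": 32})
    save_config(run.config, "./hyperparameters.json")  # serializes dict(config.items()) to JSON

config_data = load_config("./hyperparameters.json")  # returns the parsed dict and logs it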
......@@ -29,6 +29,17 @@ RESOURCE = PROJECT.resource(
) # Address the specific resource in the project
def download_resource_content_into_uuid_folders():
files = RESOURCE.files(path="train", recursive=True, with_metadata=True)
for file in track(files):
if file.is_folder:
continue
logger.info(f"File: {file.name}")
robot_uuid = file.metadata_form()["Robot UUID"][0]
Path(f"./Trajectory Data/train/{robot_uuid}").mkdir(parents=True, exist_ok=True)
file.download(f"./Trajectory Data/train/{robot_uuid}/{file.name}")
def download_resource_content(resource: coscine.resource.Resource = RESOURCE) -> Path:
"""Downloads none-existend contents of the resource.
......
......@@ -3,12 +3,25 @@ import io
import h5py
import tensorflow as tf
from pritty_logger import RichLogger
from keras.models import Sequential
import json
import wandb
from typing import Any
logger = RichLogger("dynamics_learning-model_io")
# Save model to a binary file
def save_model_to_binary_file(model, file_path):
def save_model_to_binary_file(model: Sequential, file_path: str) -> None:
"""Save the model to a binary file serialized in HDF5 format. Can be used to upload the model to coscine.
Args:
model (Sequential): Model to be saved.
file_path (str): File path to save the model to. Ends with .h5.
Returns:
None
"""
# Create an in-memory file
with io.BytesIO() as byte_io:
# Save the model to the in-memory file
......@@ -20,10 +33,19 @@ def save_model_to_binary_file(model, file_path):
with open(file_path, "wb") as f:
f.write(byte_io.getvalue())
logger.log(f"Model saved to {file_path}")
return None
# Function to load the model from a binary file
def load_model_from_binary_file(file_path):
def load_model_from_binary_file(file_path: str) -> Sequential:
"""Loads model from a binary file serialized in HDF5 format.
Args:
file_path (str): File path to load the model from. Ends with .h5.
Returns:
Sequential: loaded model
"""
# Read the binary file content
with open(file_path, "rb") as f:
binary_data = f.read()
......@@ -36,3 +58,38 @@ def load_model_from_binary_file(file_path):
logger.log("Model loaded from in-memory file")
return model
def save_config(config: wandb.config, config_path: str) -> None:
"""Save the config to a json file.
Args:
config (wandb.config): wandb.config object to be saved.
config_path (str): File path to save the config to. Ends with .json.
Returns:
None
"""
config_data = json.dumps(dict(config.items()), indent=4)
config_bytes = config_data.encode("utf-8")
with open(config_path, "wb") as file:
file.write(config_bytes)
logger.log(f"Config saved to {config_path}")
return None
def load_config(config_path: str) -> Any:
"""Load the config from a json file.
Args:
config_path (str): File path to load the config from. Ends with .json.
Returns:
Any: Python object of the loaded config
"""
with open(config_path, "rb") as file:
config_bytes = file.read()
config_data = json.loads(config_bytes.decode("utf-8"))
logger.log(f"Config loaded from {config_path}:")
logger.log(config_data)
return config_data
......@@ -91,6 +91,7 @@ def analyze(directory: str) -> Tuple[AttainedDataDict, CommandDataDict]:
tau_meas = np.empty((0, 7), np.float32)
directory_in = f"{directory}"
logger.info(f"Analyzing dataset in {directory_in}")
# Return a list of all meas.csv files in the directory in alphabetical order
file_list = list(
......
......@@ -4,26 +4,35 @@
# Released under MIT License
from pritty_logger import RichLogger
from typing import Any
import wandb
from dynamics_learning.environment import (
SWEEP_ID,
WANDB_ENTITY,
WANDB_NOTES,
WANDB_PROJECT,
)
from dynamics_learning.sweep.sweep_config import sweep_config
from dynamics_learning.sweep.sweep_config import (
sweep_config,
sweep_config_from_hyperparameters,
)
# Create a logger named 'sweep_setup_logger'
logger = RichLogger("sweep_setup_logger")
logger = RichLogger("dynamics_learning-sweep_setup_logger")
def setup_sweep(create_sweep: bool = False) -> tuple[str, dict[str, Any]]:
"""Sets up a Sweep from the given sweep_config.
def setup_sweep():
"""Sets up a Sweep.
Args:
create_sweep (bool, optional): If True, a new sweep is initialized. Defaults to False.
If SWEEP_ID is set in the environment, that sweep is used; otherwise
a new sweep is created using the sweep_config from sweep_config.py.
Returns:
tuple[str, dict[str, Any]]: sweep_id and sweep_config
"""
global SWEEP_ID
if create_sweep:
SWEEP_ID = None
if not SWEEP_ID:
logger.log("Running the sweep with this config:")
logger.log(sweep_config)
......@@ -36,3 +45,71 @@ def setup_sweep():
sweep_config["notes"] = WANDB_NOTES
logger.log(f"Running sweep_id {sweep_id}")
return sweep_id, sweep_config
def setup_sweep_from_hyperparameters(
config_data: Any, create_sweep: bool = False
) -> tuple[str, dict[str, Any]]:
"""Sets up a Sweep from given hyperparameters within a sain range.
Args:
config_data (Any): hyperparameters object.
create_sweep (bool, optional): If True, a new sweep is created. Defaults to False.
Returns:
tuple[str, dict[str, Any]]: sweep_id and sweep_config
"""
global SWEEP_ID
parameters_dict_from_hyperparameters = {
"optimizer": config_data.optimizer,
"clipnorm": {
"distribution": "log_uniform_values",
"min": config_data.clipnorm * 0.5,
"max": config_data.clipnorm * 1.5,
},
"learning_rate": {
"distribution": "log_uniform_values",
"min": config_data.learning_rate * 0.5,
"max": min(config_data.learning_rate * 1.5, 0.9),
},
"window_size": {"value": config_data.window_size},
"batch_size": {"value": config_data.batch_size},
"units": {
"distribution": "int_uniform",
"min": max(1, config_data.units - 10),
"max": min(config_data.units + 10, 100),
},
"dropout": {
"distribution": "log_uniform_values",
"min": config_data.dropout * 0.5,
"max": min(config_data.dropout * 1.5, 1),
},
"layers": {
"distribution": "int_uniform",
"min": max(1, config_data.layers - 10),
"max": min(config_data.layers + 10, 100),
}, # {"value": 10},
"epochs": {"value": config_data.epochs},
}
sweep_config_from_hyperparameters["parameters"] = (
parameters_dict_from_hyperparameters
)
if create_sweep:
SWEEP_ID = None
if not SWEEP_ID:
logger.log("Running the sweep with this config:")
logger.log(sweep_config_from_hyperparameters)
sweep_id = wandb.sweep(
sweep_config_from_hyperparameters,
project=WANDB_PROJECT,
entity=WANDB_ENTITY,
)
else:
sweep_id = SWEEP_ID
sweep_config_from_hyperparameters["sweep_id"] = sweep_id
sweep_config_from_hyperparameters["project"] = WANDB_PROJECT
sweep_config_from_hyperparameters["entity"] = WANDB_ENTITY
sweep_config_from_hyperparameters["notes"] = WANDB_NOTES
logger.log(f"Running sweep_id {sweep_id}")
return sweep_id, sweep_config_from_hyperparameters
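A hedged sketch of how the new helper might be driven from a previously saved config. The SimpleNamespace wrapper and the module path dynamics_learning.sweep.sweep_setup are assumptions, since setup_sweep_from_hyperparameters reads hyperparameters via attribute access:

from types import SimpleNamespace

from dynamics_learning.model_io import load_config
from dynamics_learning.sweep.sweep_setup import setup_sweep_from_hyperparameters  # assumed module path

raw = load_config("./hyperparameters.json")  # plain dict parsed from JSON
config_data = SimpleNamespace(**raw)  # attribute-style access (config_data.learning_rate, ...)
sweep_id, sweep_config = setup_sweep_from_hyperparameters(config_data, create_sweep=True)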
......@@ -27,7 +27,11 @@ parameters_dict = {
"units": {"distribution": "int_uniform", "min": 1, "max": 100},
"dropout": {"distribution": "log_uniform_values", "min": 1e-5, "max": 1},
"layers": {"distribution": "int_uniform", "min": 1, "max": 100}, # {"value": 10},
"epochs": {"value": 100},
"epochs": {"value": 2},
}
sweep_config["parameters"] = parameters_dict
sweep_config_from_hyperparameters = {"method": "bayes"}
sweep_config_from_hyperparameters["metric"] = metric
sweep_config_from_hyperparameters["early_terminate"] = early_terminate
......@@ -2,7 +2,7 @@
# -*- coding:utf-8 -*-
# Copyright Leon Gorissen
# Released under MIT License
import json
# import json
from pathlib import Path
from typing import Dict, Tuple
......@@ -15,6 +15,7 @@ from keras.metrics import Accuracy, KLDivergence, MeanAbsolutePercentageError
from keras.models import Sequential
from pritty_logger import RichLogger
from tensorflow.keras.callbacks import History
from tensorflow.keras.models import Model # , Sequential
import wandb
from dynamics_learning.data_retrieval import (
......@@ -23,7 +24,7 @@ from dynamics_learning.data_retrieval import (
update_existing_file,
)
from dynamics_learning.environment import WANDB_ENTITY, WANDB_NOTES, WANDB_PROJECT
from dynamics_learning.model_io import save_model_to_binary_file
from dynamics_learning.model_io import save_model_to_binary_file, save_config
# Create a logger named 'sweep_setup_logger'
logger = RichLogger("dynamics_learning-model_training")
......@@ -92,7 +93,9 @@ class Metrics(Callback):
def train(
q_qd_qdd_interpolated_command_input: tf.Tensor, tau_attained_input: tf.Tensor
q_qd_qdd_interpolated_command_input: tf.Tensor,
tau_attained_input: tf.Tensor,
model: Sequential = None,
) -> Tuple[Sequential, History]:
"""Train the model.
......@@ -110,6 +113,21 @@ def train(
) as run:
config = wandb.config
if model:
# Take window size from the loaded model's input shape
model_input_shape = model.layers[0].input_shape
window_size = model_input_shape[1]
batch_size = model_input_shape[0]
else:
# Use the window size defined in config if no model is loaded
model_input_shape = (
None,
config.window_size,
q_qd_qdd_interpolated_command_input.shape[-1],
)
window_size = config.window_size
batch_size = config.batch_size
# Split Dataset to train and validation
train_size = int(0.8 * q_qd_qdd_interpolated_command_input.shape[0])
val_size = q_qd_qdd_interpolated_command_input.shape[0] - train_size
......@@ -120,10 +138,10 @@ def train(
_, y_train, _, y_val = tf.split(
tau_attained_input,
[
config.window_size - 1,
train_size - config.window_size + 1,
config.window_size - 1,
val_size - config.window_size + 1,
window_size - 1,
train_size - window_size + 1,
window_size - 1,
val_size - window_size + 1,
],
0,
)
......@@ -131,9 +149,87 @@ def train(
# Sliding Window for input interpolated command joint data
import tensorflow_text as tf_text
x_train = tf_text.sliding_window(data=x_train, width=config.window_size, axis=0)
x_val = tf_text.sliding_window(data=x_val, width=config.window_size, axis=0)
x_train = tf_text.sliding_window(data=x_train, width=window_size, axis=0)
x_val = tf_text.sliding_window(data=x_val, width=window_size, axis=0)
# if a model is passed, use it, else create a new model
if model:
model = model
logger.info(f"model shape is {model.layers[0].input_shape}")
# Define how many layers you want to keep (e.g., exclude last 2 layers)
# Get the number of layers in the model
total_layers = len(model.layers)
# Print the total number of layers
logger.info(f"Total layers in the model: {total_layers}")
# Define how many layers you want to remove
layers_to_remove = 5
# Check if you can remove five layers; if not, adjust to delete fewer layers
layers_to_remove = min(layers_to_remove, total_layers - 2)
# If it's a Sequential model
if isinstance(model, Sequential):
# Remove the specified number of layers
for _ in range(layers_to_remove):
model.pop()
new_model = model
else:
# If it's a Functional model
# Truncate the model to remove the specified number of layers
new_model = Model(
inputs=model.input,
outputs=model.layers[-(layers_to_remove + 1)].output,
)
for layer in new_model.layers:
layer.trainable = False
# Add three LSTM layers, a dropout layer, and a dense layer
model.add(
LSTM(
units=config.units,
return_sequences=True,
input_shape=(x_train.shape[1], x_train.shape[2]),
activation="tanh",
recurrent_activation="sigmoid",
recurrent_dropout=0,
unroll=False,
use_bias=True,
name="LSTM_1",
)
)
model.add(
LSTM(
units=config.units,
return_sequences=True,
input_shape=(x_train.shape[1], x_train.shape[2]),
activation="tanh",
recurrent_activation="sigmoid",
recurrent_dropout=0,
unroll=False,
use_bias=True,
name="LSTM_2",
)
)
model.add(LSTM(units=config.units, return_sequences=False, name="LSTM_3"))
model.add(Dropout(config.dropout))
model.add(Dense(units=y_train.shape[1]))
# Compile the model
opt = optimizers.Adam(
learning_rate=config.learning_rate, clipvalue=config.clipnorm
)
model.compile(
optimizer=opt,
loss=MeanSquaredError(),
metrics=[Accuracy(), KLDivergence(), MeanAbsolutePercentageError()],
run_eagerly=True,
)
model.build(x_train.shape)
else:
model = make_model(config, x_train, y_train)
from wandb.integration.keras import WandbCallback
......@@ -143,7 +239,7 @@ def train(
y_train,
epochs=config.epochs,
shuffle=True,
batch_size=config.batch_size,
batch_size=batch_size,
verbose=2,
validation_data=(x_val, y_val),
callbacks=[
......@@ -172,7 +268,7 @@ def upload(model: Sequential, history: History, config: Dict):
resource = PROJECT.resource("Foundation_Model")
metadataform = resource.metadata_form()
metadataform["Title"] = (
f"{decision_metric}_val_loss" # TODO implement custom coscine application profile for the models
f"{decision_metric}_val_loss" # TODO implement custom coscine application profile for the models based on MITM base profile
)
logger.info(f"The following meta data is saved\n{metadataform}")
_resource_files, resource_objects = get_resource_content(resource, path="models/")
......@@ -189,10 +285,7 @@ def upload(model: Sequential, history: History, config: Dict):
config_local_file_name = (
"/app/dynamics_learning/Foundation_Model/hyperparameters/hyperparameters.json"
)
config_data = json.dumps(dict(config.items()), indent=4)
config_bytes = config_data.encode("utf-8")
with open(config_local_file_name, "wb") as file:
file.write(config_bytes)
save_config(config, config_local_file_name)
logger.info("Checking if a model exists in the resource.")
# if no model exits, upload the current model to coscine
......
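For reference, a condensed, hedged sketch of the freeze-and-extend pattern the new train() logic applies to a loaded Sequential model. The helper name and defaults are illustrative; the actual diff additionally derives the input shape from the loaded model, re-compiles, and builds the extended model before fitting.

from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential

def extend_pretrained(model: Sequential, units: int, dropout: float, n_outputs: int,
                      layers_to_remove: int = 5) -> Sequential:
    # Never strip the model below two layers.
    layers_to_remove = min(layers_to_remove, len(model.layers) - 2)
    for _ in range(layers_to_remove):
        model.pop()  # drop the last layers of the pre-trained model
    for layer in model.layers:
        layer.trainable = False  # freeze the retained layers
    # Append a fresh recurrent head for the new task.
    model.add(LSTM(units=units, return_sequences=True, name="LSTM_1"))
    model.add(LSTM(units=units, return_sequences=True, name="LSTM_2"))
    model.add(LSTM(units=units, return_sequences=False, name="LSTM_3"))
    model.add(Dropout(dropout))
    model.add(Dense(units=n_outputs))
    return model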