diff --git a/dynamics_learning/analysis.py b/dynamics_learning/analysis.py deleted file mode 100644 index 39a6f97caf3831d7ae0b630f14a5eb76f9d6cdce..0000000000000000000000000000000000000000 --- a/dynamics_learning/analysis.py +++ /dev/null @@ -1,297 +0,0 @@ -#%% -from pathlib import Path -import numpy as np -import pandas as pd -import matplotlib.pyplot as plt -from pritty_logger import RichLogger -from rich.progress import track -from scipy.optimize import curve_fit -from sklearn.metrics import r2_score - -# Physical joint limits -q_lim_max_phys = ( - np.array([2.7973, 1.6628, 2.7973, -0.1698, 2.7973, 3.6525, 2.7973]) + 0.1 -) -q_lim_min_phys = ( - np.array([-2.7973, -1.6628, -2.7973, -2.9718, -2.7973, 0.1175, -2.7973]) - 0.1 -) -qd_lim_max_phys = np.array([2.175, 2.175, 2.175, 2.175, 2.61, 2.61, 2.61]) -qd_lim_min_phys = -1 * qd_lim_max_phys -qdd_lim_max_phys = np.array([15, 7.5, 10, 12.5, 15, 20, 20]) -qdd_lim_min_phys = -1 * qdd_lim_max_phys -tau_lim_max_phys = np.array([87, 87, 87, 87, 12, 12, 12]) -tau_lim_min_phys = -1 * tau_lim_max_phys - -physical_limits = { - "q_lim_max_phys": q_lim_max_phys, - "q_lim_min_phys": q_lim_min_phys, - "qd_lim_max_phys": qd_lim_max_phys, - "qd_lim_min_phys": qd_lim_min_phys, - "qdd_lim_max_phys": qdd_lim_max_phys, - "qdd_lim_min_phys": qdd_lim_min_phys, - "tau_lim_max_phys": tau_lim_max_phys, - "tau_lim_min_phys": tau_lim_min_phys, -} - -# Moveit limits -# /opt/ros/noetic/share/franka_description/robots/panda/joint_limits.yaml -# /opt/ros/noetic/share/panda_moveit_config/config/joint_limits.yaml -q_lim_max_moveit = q_lim_max_phys - 0.1 -q_lim_min_moveit = q_lim_min_phys + 0.1 -qd_lim_max_moveit = qd_lim_max_phys -qd_lim_min_moveit = qd_lim_min_phys -qdd_lim_max_moveit = np.array([3.75, 1.875, 2.5, 3.125, 3.75, 5, 5]) -qdd_lim_min_moveit = -1 * qdd_lim_max_moveit -tau_lim_max_moveit = np.array([87, 87, 87, 87, 12, 12, 12]) -tau_lim_min_moveit = -1 * tau_lim_max_phys - -moveit_limits = { - "q_lim_max_moveit": q_lim_max_moveit, - "q_lim_min_moveit": q_lim_min_moveit, - "qd_lim_max_moveit": qd_lim_max_moveit, - "qd_lim_min_moveit": qd_lim_min_moveit, - "qdd_lim_max_moveit": qdd_lim_max_moveit, - "qdd_lim_min_moveit": qdd_lim_min_moveit, - "tau_lim_max_moveit": tau_lim_max_moveit, - "tau_lim_min_moveit": tau_lim_min_moveit, -} - -import csv -def has_more_than_one_line(file_path: str) -> bool: - """Check if the file has more than one line (excluding the header). - - Args: - file_path (str): The path to the file. - - Returns: - bool: True if the file has more than one line (excluding the header), False otherwise. - """ - # Check if the file has more than one line (excluding the header) - with open(file_path, 'r') as file: - reader = csv.reader(file) - # Skip the header - next(reader) - # Check if there are more lines after the header - line_count = sum(1 for _ in reader) - return line_count > 1 - -logger = RichLogger("dynamics_learning-dataset_analysis_logger") - - -def get_file_list(directory: Path, suffix: str) -> list: - """Retrieve a list of files in a directory with a given suffix.""" - return list(sorted([f for f in directory.iterdir() if f.suffix == ".csv" and suffix in f.name])) - - -def validate_and_read_file(file_path: Path) -> np.ndarray: - """Validate that a file has more than one line and read it into a NumPy array.""" - if not has_more_than_one_line(file_path): - logger.warn(f"Skipping {file_path.name} due to insufficient data.") - return np.empty((0,)) - return np.genfromtxt(file_path, dtype=float, delimiter=",") - - -def extract_measurement_data(data: np.ndarray) -> dict: - """Extract measurement-related data from the array.""" - if data.size == 0: - return {} - time_diffs = np.diff(data[1:, 0]) - attained_freq = np.mean(1 / time_diffs) - return { - "t_meas": data[1:, 0].reshape(-1, 1), - "freq": attained_freq, - "duration": data[-1, 0], - "q_meas": data[1:, 1:8], - "qd_meas": data[1:, 8:15], - "tau_meas": data[1:, 15:22], - } - - -def extract_command_data(data: np.ndarray) -> dict: - """Extract command-related data from the array.""" - if data.size == 0: - return {} - return { - "t_command": data[1:, 0:1], - "q_command": data[1:, 1:8], - "qd_command": data[1:, 8:15], - "qdd_command": data[1:, 15:22], - } - - -def analyze_file_pair(meas_file: Path, com_file: Path) -> dict: - """Analyze a pair of measurement and command files.""" - meas_data = validate_and_read_file(meas_file) - com_data = validate_and_read_file(com_file) - - if meas_data.size == 0 or com_data.size == 0: - return {} - - return { - **extract_measurement_data(meas_data), - **extract_command_data(com_data), - } - - -def analyze_trajectories(directory: Path) -> list: - """Analyze all file pairs in the directory.""" - file_list = get_file_list(directory, "meas") - logger.info(f"Found {len(file_list)} measurement files.") - - results = [] - for meas_file in track(file_list, description="Analyzing trajectories..."): - com_file = directory / meas_file.name.replace("meas", "com") - if not com_file.exists(): - logger.warn(f"Missing command file for {meas_file.name}. Skipping...") - continue - result = analyze_file_pair(meas_file, com_file) - if result: - results.append(result) - return results - - -def save_statistics(results: list, output_path: Path): - """Save statistical summaries of the results to a CSV file.""" - durations = [res["duration"] for res in results] - stats = { - "# trajectories": len(durations), - "Duration Sum [s]": np.sum(durations), - "Duration Min [s]": np.min(durations), - "Duration Max [s]": np.max(durations), - "Duration Mean [s]": np.mean(durations), - } - df = pd.DataFrame([stats]) - df.to_csv(output_path, float_format="%.3f") - logger.info(f"Statistics saved to {output_path}") - - -def plot_histogram(data: list, output_path: Path, title: str, xlabel: str): - """Plot and save a histogram for the given data.""" - plt.hist(data, bins=30, density=True, edgecolor="black") - plt.title(title) - plt.xlabel(xlabel) - plt.ylabel("Frequency") - plt.grid(True) - plt.savefig(output_path) - logger.info(f"Histogram saved to {output_path}") - plt.close() - - -def plot_frequency_histogram(results: list, output_path: Path): - """Plot histogram for attained frequencies.""" - frequencies = [res["freq"] for res in results] - plot_histogram(frequencies, output_path, "Frequency Histogram", "Frequency [Hz]") - - -def main(directory: str): - """Main function to run the analysis.""" - dir_path = Path(directory) - output_dir = dir_path / "analysis" - output_dir.mkdir(parents=True, exist_ok=True) - - logger.info(f"Analyzing directory: {dir_path}") - results = analyze_trajectories(dir_path) - - if not results: - logger.warn("No valid trajectories found.") - return - - save_statistics(results, output_dir / "training_trajectories_statistics.csv") - #plot_frequency_histogram(results, output_dir / "frequency_histogram.png") - #plot_histogram(results, output_dir / "training_trajectories_histogram.png", "TITLE", "x") - return results - -def get_qs(results): - qs = {} - for i in range(7): - axis_values = [] - length_per_trajectory = [] - for trajectory in results: - q_commands = np.array(trajectory["q_meas"]) - q_asa = q_commands[:,i] - length_per_trajectory.append(len(q_asa)) - axis_values.append(q_asa) - qs[str(i+1)] = np.hstack(axis_values) - avg_points_per_traj = np.mean(length_per_trajectory) - total_points_per_traj = sum(length_per_trajectory) - return qs, avg_points_per_traj, total_points_per_traj, len(results) - -if __name__ == "__main__": - print("Running analysis...") - #%% -results = main(directory="./f2e72889-c140-4397-809f-fba1b892f17a") -# %% -llt_qs, llt_avgs, llt_totals, llt_num_tras = get_qs(results) - -# %% -results = main(directory="./c9ff52e1-1733-4829-a209-ebd1586a8697") -ita_qs, ita_avgs, ita_totals, ita_num_tras = get_qs(results) -#%% -results = main(directory="./2e60a671-dcc3-4a36-9734-a239c899b57d") -wzl_qs, wzl_avgs, wzl_totals, wzl_num_tras = get_qs(results) - -# %% -print(f""" -\tLLT\tITA\tWZL\tSUM -#TRA\t{llt_num_tras}\t{ita_num_tras}\t{wzl_num_tras}\t{llt_num_tras+ita_num_tras+wzl_num_tras} -AVG\t{int(llt_avgs)}\t{int(ita_avgs)}\t{int(wzl_avgs)}\t{int((llt_avgs+ita_avgs+wzl_avgs)/3)} -TOTAL\t{llt_totals}\t{ita_totals}\t{wzl_totals}\t{llt_totals+ita_totals+wzl_totals} -""") - -# %% -import matplotlib.pyplot as plt -import numpy as np - -plt.rcParams.update({ - 'font.size': 14, # Adjust font size globally - 'axes.titlesize': 14, # Title font size - 'axes.labelsize': 14, # X and Y label font size - 'xtick.labelsize': 14, # X-axis tick labels - 'ytick.labelsize': 14, # Y-axis tick labels - 'legend.fontsize': 14 # Legend font size -}) - -# Example dictionaries (replace with your actual data) -dataset1 = llt_qs -dataset2 = ita_qs -dataset3 = wzl_qs - -# Store datasets in a list -datasets = [dataset1, dataset2, dataset3] -labels = ["Dataset LLT", "Dataset ITA", "Dataset WZL"] -colors = ["blue", "orange", "green"] - -# Create the subplots: 3 rows, 3 columns -fig, axes = plt.subplots(3, 3, figsize=(15, 10)) - -# Plot histograms for each key in the dictionaries -for i in range(7): - row, col = divmod(i, 3) # Determine row and column - for dataset, label, color in zip(datasets, labels, colors): - axes[row, col].hist(dataset[str(i + 1)], bins=30, alpha=1, label=label, color=color, density=True, histtype="step",linewidth=2) - axes[row, col].set_title(f"Axis {i+1}") - axes[row, col].set_xlabel("Joint Position in rad") - axes[row, col].set_ylabel("Density") - axes[row, col].grid(True) - -# Remove the last two subplots (unused space) -for i in range(7, 9): - row, col = divmod(i, 3) - axes[row, col].axis("off") - -# Add the legend in the last subplot space -axes[2, 2].legend( - handles=[plt.Line2D([0], [0], color=color, lw=2) for color in colors], - labels=labels, - loc="center", - fontsize="large" -) -axes[2, 2].axis("off") # Remove axes for the legend space - -# Adjust spacing -fig.tight_layout() - -#plt.show() -plt.savefig("joint_positions_histogram.pdf", format="pdf") - -# %% diff --git a/dynamics_learning/test.py b/dynamics_learning/test.py deleted file mode 100644 index eaaf30fafa7efd60b6d5931ef95386a62f6ed45d..0000000000000000000000000000000000000000 --- a/dynamics_learning/test.py +++ /dev/null @@ -1,89 +0,0 @@ -# %% -from pathlib import Path -# from dynamics_learning.data_retrieval import download_resource_content_into_uuid_folders - - -import coscine -import coscine.resource -from pritty_logger import RichLogger -from rich.progress import track - - -from dynamics_learning.environment import COSCINE_API_TOKEN - -logger = RichLogger("dynamics_learning-coscine_data_retrieval") - -CLIENT = coscine.ApiClient( - COSCINE_API_TOKEN, timeout=120, retries=5, verbose=False, enable_caching=True -) -PROJECT = CLIENT.project( - "IoP Ws A.III FER WWL Demo" -) # Connect to the specified Coscine project -# print(PROJECT) - -# logger.log( -# f"Connecting to Coscine Project\n{PROJECT}" -# ) # Log the connection to the project -RESOURCE = PROJECT.resource( - "Trajectory Data" -) # Address the specific resource in the project - - -def download_resource_content_into_uuid_folders(): - """Download the resource content into folders named after the robot UUID. Keeps only 50 newest trajectories per robot.""" - files = RESOURCE.files(path="train", recursive=True, with_metadata=True) - logger.info(f"Attempting to download {len(files)} files:") # \n{files} - for file in track(files): - if file.is_folder: - continue - logger.info(f"File: {file.name}") - try: - robot_uuid = file.metadata_form()["Robot UUID"][0] - Path(f"/app/dynamics_learning/Pretrained_Model_Trajectory_Data/train/{robot_uuid}").mkdir(parents=True, exist_ok=True) - file.download(f"/app/dynamics_learning/Pretrained_Model_Trajectory_Data/train/{robot_uuid}/{file.name}") - except IndexError: - logger.info(f"No Robot UUID found for file {file.name}.") - continue - # logger.info(f"Keeping only 50 trajectories per robot.") - # delete_files(50, robot_uuid) - - -def delete_files(num_trajectories_to_keep: int, robot_uuid: str) -> None: - """Delete files from the training data directory. - - Files are sorted by date and the newest files are kept. - - Args: - num_trajectories_to_keep (int): Number of trajectories to keep. - robot_uuid (str): Robot UUID. - - Returns: - None: This function does not return anything. - """ - files = [ - str(file) - for file in Path( - f"/app/dynamics_learning/Trajectory Data/train/{robot_uuid}" - ).iterdir() - if file.is_file() and str(file).endswith("meas.csv") - ] - files.sort(reverse=True) - for file in files[num_trajectories_to_keep:]: - Path(file).unlink() - try: - file = file.replace("meas.csv", "com.csv") - Path(file).unlink() - except FileNotFoundError: - # logger.info("No com.csv file found.") - pass - try: - file = file.replace("com.csv", "interp_com.csv") - Path(file).unlink() - except FileNotFoundError: - # logger.info("No interp_com.csv file found.") - pass - return None - - -if __name__ == "__main__": - download_resource_content_into_uuid_folders()