Skip to content
Snippets Groups Projects
Commit dd096055 authored by Leon Michel Gorißen's avatar Leon Michel Gorißen
Browse files

chore: remove further explorative coding files

parent 28df8589
No related branches found
No related tags found
No related merge requests found
#%%
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pritty_logger import RichLogger
from rich.progress import track
from scipy.optimize import curve_fit
from sklearn.metrics import r2_score
# Physical joint limits
q_lim_max_phys = (
np.array([2.7973, 1.6628, 2.7973, -0.1698, 2.7973, 3.6525, 2.7973]) + 0.1
)
q_lim_min_phys = (
np.array([-2.7973, -1.6628, -2.7973, -2.9718, -2.7973, 0.1175, -2.7973]) - 0.1
)
qd_lim_max_phys = np.array([2.175, 2.175, 2.175, 2.175, 2.61, 2.61, 2.61])
qd_lim_min_phys = -1 * qd_lim_max_phys
qdd_lim_max_phys = np.array([15, 7.5, 10, 12.5, 15, 20, 20])
qdd_lim_min_phys = -1 * qdd_lim_max_phys
tau_lim_max_phys = np.array([87, 87, 87, 87, 12, 12, 12])
tau_lim_min_phys = -1 * tau_lim_max_phys
physical_limits = {
"q_lim_max_phys": q_lim_max_phys,
"q_lim_min_phys": q_lim_min_phys,
"qd_lim_max_phys": qd_lim_max_phys,
"qd_lim_min_phys": qd_lim_min_phys,
"qdd_lim_max_phys": qdd_lim_max_phys,
"qdd_lim_min_phys": qdd_lim_min_phys,
"tau_lim_max_phys": tau_lim_max_phys,
"tau_lim_min_phys": tau_lim_min_phys,
}
# Moveit limits
# /opt/ros/noetic/share/franka_description/robots/panda/joint_limits.yaml
# /opt/ros/noetic/share/panda_moveit_config/config/joint_limits.yaml
q_lim_max_moveit = q_lim_max_phys - 0.1
q_lim_min_moveit = q_lim_min_phys + 0.1
qd_lim_max_moveit = qd_lim_max_phys
qd_lim_min_moveit = qd_lim_min_phys
qdd_lim_max_moveit = np.array([3.75, 1.875, 2.5, 3.125, 3.75, 5, 5])
qdd_lim_min_moveit = -1 * qdd_lim_max_moveit
tau_lim_max_moveit = np.array([87, 87, 87, 87, 12, 12, 12])
tau_lim_min_moveit = -1 * tau_lim_max_phys
moveit_limits = {
"q_lim_max_moveit": q_lim_max_moveit,
"q_lim_min_moveit": q_lim_min_moveit,
"qd_lim_max_moveit": qd_lim_max_moveit,
"qd_lim_min_moveit": qd_lim_min_moveit,
"qdd_lim_max_moveit": qdd_lim_max_moveit,
"qdd_lim_min_moveit": qdd_lim_min_moveit,
"tau_lim_max_moveit": tau_lim_max_moveit,
"tau_lim_min_moveit": tau_lim_min_moveit,
}
import csv
def has_more_than_one_line(file_path: str) -> bool:
"""Check if the file has more than one line (excluding the header).
Args:
file_path (str): The path to the file.
Returns:
bool: True if the file has more than one line (excluding the header), False otherwise.
"""
# Check if the file has more than one line (excluding the header)
with open(file_path, 'r') as file:
reader = csv.reader(file)
# Skip the header
next(reader)
# Check if there are more lines after the header
line_count = sum(1 for _ in reader)
return line_count > 1
logger = RichLogger("dynamics_learning-dataset_analysis_logger")
def get_file_list(directory: Path, suffix: str) -> list:
"""Retrieve a list of files in a directory with a given suffix."""
return list(sorted([f for f in directory.iterdir() if f.suffix == ".csv" and suffix in f.name]))
def validate_and_read_file(file_path: Path) -> np.ndarray:
"""Validate that a file has more than one line and read it into a NumPy array."""
if not has_more_than_one_line(file_path):
logger.warn(f"Skipping {file_path.name} due to insufficient data.")
return np.empty((0,))
return np.genfromtxt(file_path, dtype=float, delimiter=",")
def extract_measurement_data(data: np.ndarray) -> dict:
"""Extract measurement-related data from the array."""
if data.size == 0:
return {}
time_diffs = np.diff(data[1:, 0])
attained_freq = np.mean(1 / time_diffs)
return {
"t_meas": data[1:, 0].reshape(-1, 1),
"freq": attained_freq,
"duration": data[-1, 0],
"q_meas": data[1:, 1:8],
"qd_meas": data[1:, 8:15],
"tau_meas": data[1:, 15:22],
}
def extract_command_data(data: np.ndarray) -> dict:
"""Extract command-related data from the array."""
if data.size == 0:
return {}
return {
"t_command": data[1:, 0:1],
"q_command": data[1:, 1:8],
"qd_command": data[1:, 8:15],
"qdd_command": data[1:, 15:22],
}
def analyze_file_pair(meas_file: Path, com_file: Path) -> dict:
"""Analyze a pair of measurement and command files."""
meas_data = validate_and_read_file(meas_file)
com_data = validate_and_read_file(com_file)
if meas_data.size == 0 or com_data.size == 0:
return {}
return {
**extract_measurement_data(meas_data),
**extract_command_data(com_data),
}
def analyze_trajectories(directory: Path) -> list:
"""Analyze all file pairs in the directory."""
file_list = get_file_list(directory, "meas")
logger.info(f"Found {len(file_list)} measurement files.")
results = []
for meas_file in track(file_list, description="Analyzing trajectories..."):
com_file = directory / meas_file.name.replace("meas", "com")
if not com_file.exists():
logger.warn(f"Missing command file for {meas_file.name}. Skipping...")
continue
result = analyze_file_pair(meas_file, com_file)
if result:
results.append(result)
return results
def save_statistics(results: list, output_path: Path):
"""Save statistical summaries of the results to a CSV file."""
durations = [res["duration"] for res in results]
stats = {
"# trajectories": len(durations),
"Duration Sum [s]": np.sum(durations),
"Duration Min [s]": np.min(durations),
"Duration Max [s]": np.max(durations),
"Duration Mean [s]": np.mean(durations),
}
df = pd.DataFrame([stats])
df.to_csv(output_path, float_format="%.3f")
logger.info(f"Statistics saved to {output_path}")
def plot_histogram(data: list, output_path: Path, title: str, xlabel: str):
"""Plot and save a histogram for the given data."""
plt.hist(data, bins=30, density=True, edgecolor="black")
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel("Frequency")
plt.grid(True)
plt.savefig(output_path)
logger.info(f"Histogram saved to {output_path}")
plt.close()
def plot_frequency_histogram(results: list, output_path: Path):
"""Plot histogram for attained frequencies."""
frequencies = [res["freq"] for res in results]
plot_histogram(frequencies, output_path, "Frequency Histogram", "Frequency [Hz]")
def main(directory: str):
"""Main function to run the analysis."""
dir_path = Path(directory)
output_dir = dir_path / "analysis"
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Analyzing directory: {dir_path}")
results = analyze_trajectories(dir_path)
if not results:
logger.warn("No valid trajectories found.")
return
save_statistics(results, output_dir / "training_trajectories_statistics.csv")
#plot_frequency_histogram(results, output_dir / "frequency_histogram.png")
#plot_histogram(results, output_dir / "training_trajectories_histogram.png", "TITLE", "x")
return results
def get_qs(results):
qs = {}
for i in range(7):
axis_values = []
length_per_trajectory = []
for trajectory in results:
q_commands = np.array(trajectory["q_meas"])
q_asa = q_commands[:,i]
length_per_trajectory.append(len(q_asa))
axis_values.append(q_asa)
qs[str(i+1)] = np.hstack(axis_values)
avg_points_per_traj = np.mean(length_per_trajectory)
total_points_per_traj = sum(length_per_trajectory)
return qs, avg_points_per_traj, total_points_per_traj, len(results)
if __name__ == "__main__":
print("Running analysis...")
#%%
results = main(directory="./f2e72889-c140-4397-809f-fba1b892f17a")
# %%
llt_qs, llt_avgs, llt_totals, llt_num_tras = get_qs(results)
# %%
results = main(directory="./c9ff52e1-1733-4829-a209-ebd1586a8697")
ita_qs, ita_avgs, ita_totals, ita_num_tras = get_qs(results)
#%%
results = main(directory="./2e60a671-dcc3-4a36-9734-a239c899b57d")
wzl_qs, wzl_avgs, wzl_totals, wzl_num_tras = get_qs(results)
# %%
print(f"""
\tLLT\tITA\tWZL\tSUM
#TRA\t{llt_num_tras}\t{ita_num_tras}\t{wzl_num_tras}\t{llt_num_tras+ita_num_tras+wzl_num_tras}
AVG\t{int(llt_avgs)}\t{int(ita_avgs)}\t{int(wzl_avgs)}\t{int((llt_avgs+ita_avgs+wzl_avgs)/3)}
TOTAL\t{llt_totals}\t{ita_totals}\t{wzl_totals}\t{llt_totals+ita_totals+wzl_totals}
""")
# %%
import matplotlib.pyplot as plt
import numpy as np
plt.rcParams.update({
'font.size': 14, # Adjust font size globally
'axes.titlesize': 14, # Title font size
'axes.labelsize': 14, # X and Y label font size
'xtick.labelsize': 14, # X-axis tick labels
'ytick.labelsize': 14, # Y-axis tick labels
'legend.fontsize': 14 # Legend font size
})
# Example dictionaries (replace with your actual data)
dataset1 = llt_qs
dataset2 = ita_qs
dataset3 = wzl_qs
# Store datasets in a list
datasets = [dataset1, dataset2, dataset3]
labels = ["Dataset LLT", "Dataset ITA", "Dataset WZL"]
colors = ["blue", "orange", "green"]
# Create the subplots: 3 rows, 3 columns
fig, axes = plt.subplots(3, 3, figsize=(15, 10))
# Plot histograms for each key in the dictionaries
for i in range(7):
row, col = divmod(i, 3) # Determine row and column
for dataset, label, color in zip(datasets, labels, colors):
axes[row, col].hist(dataset[str(i + 1)], bins=30, alpha=1, label=label, color=color, density=True, histtype="step",linewidth=2)
axes[row, col].set_title(f"Axis {i+1}")
axes[row, col].set_xlabel("Joint Position in rad")
axes[row, col].set_ylabel("Density")
axes[row, col].grid(True)
# Remove the last two subplots (unused space)
for i in range(7, 9):
row, col = divmod(i, 3)
axes[row, col].axis("off")
# Add the legend in the last subplot space
axes[2, 2].legend(
handles=[plt.Line2D([0], [0], color=color, lw=2) for color in colors],
labels=labels,
loc="center",
fontsize="large"
)
axes[2, 2].axis("off") # Remove axes for the legend space
# Adjust spacing
fig.tight_layout()
#plt.show()
plt.savefig("joint_positions_histogram.pdf", format="pdf")
# %%
# %%
from pathlib import Path
# from dynamics_learning.data_retrieval import download_resource_content_into_uuid_folders
import coscine
import coscine.resource
from pritty_logger import RichLogger
from rich.progress import track
from dynamics_learning.environment import COSCINE_API_TOKEN
logger = RichLogger("dynamics_learning-coscine_data_retrieval")
CLIENT = coscine.ApiClient(
COSCINE_API_TOKEN, timeout=120, retries=5, verbose=False, enable_caching=True
)
PROJECT = CLIENT.project(
"IoP Ws A.III FER WWL Demo"
) # Connect to the specified Coscine project
# print(PROJECT)
# logger.log(
# f"Connecting to Coscine Project\n{PROJECT}"
# ) # Log the connection to the project
RESOURCE = PROJECT.resource(
"Trajectory Data"
) # Address the specific resource in the project
def download_resource_content_into_uuid_folders():
"""Download the resource content into folders named after the robot UUID. Keeps only 50 newest trajectories per robot."""
files = RESOURCE.files(path="train", recursive=True, with_metadata=True)
logger.info(f"Attempting to download {len(files)} files:") # \n{files}
for file in track(files):
if file.is_folder:
continue
logger.info(f"File: {file.name}")
try:
robot_uuid = file.metadata_form()["Robot UUID"][0]
Path(f"/app/dynamics_learning/Pretrained_Model_Trajectory_Data/train/{robot_uuid}").mkdir(parents=True, exist_ok=True)
file.download(f"/app/dynamics_learning/Pretrained_Model_Trajectory_Data/train/{robot_uuid}/{file.name}")
except IndexError:
logger.info(f"No Robot UUID found for file {file.name}.")
continue
# logger.info(f"Keeping only 50 trajectories per robot.")
# delete_files(50, robot_uuid)
def delete_files(num_trajectories_to_keep: int, robot_uuid: str) -> None:
"""Delete files from the training data directory.
Files are sorted by date and the newest files are kept.
Args:
num_trajectories_to_keep (int): Number of trajectories to keep.
robot_uuid (str): Robot UUID.
Returns:
None: This function does not return anything.
"""
files = [
str(file)
for file in Path(
f"/app/dynamics_learning/Trajectory Data/train/{robot_uuid}"
).iterdir()
if file.is_file() and str(file).endswith("meas.csv")
]
files.sort(reverse=True)
for file in files[num_trajectories_to_keep:]:
Path(file).unlink()
try:
file = file.replace("meas.csv", "com.csv")
Path(file).unlink()
except FileNotFoundError:
# logger.info("No com.csv file found.")
pass
try:
file = file.replace("com.csv", "interp_com.csv")
Path(file).unlink()
except FileNotFoundError:
# logger.info("No interp_com.csv file found.")
pass
return None
if __name__ == "__main__":
download_resource_content_into_uuid_folders()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment