Commit 49fa2365 authored by Leon Michel Gorißen

Oops ... dataset analysis was actually trajectory interpolation :D

parent cc7534c2
@@ -9,6 +9,7 @@ import coscine
import os
from pathlib import Path
from typing import List
import re

# Configure logging
logger = logging.getLogger('coscine_data_retrieval_logger')  # Create a logger named 'coscine_data_retrieval_logger'

@@ -100,6 +101,7 @@ def get_downloaded_files(root_dir: str = f"./{RESOURCE.name}/") -> List:
        if path.is_file():
            file = str(path).split("/", 1)[1]
            if file != "":
                if "interp" not in file:
                    files.append(file)
    return files
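The new guard in get_downloaded_files skips any file whose relative path contains "interp", so previously generated interpolation outputs are excluded from the returned file list. A minimal illustration with hypothetical file names:

for name in ["traj_001_com.csv", "traj_001_meas.csv", "traj_001_interp_com.csv"]:
    print(name, "kept" if "interp" not in name else "skipped")
# traj_001_com.csv kept
# traj_001_meas.csv kept
# traj_001_interp_com.csv skipped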
# Imports and variables
import sys
from typing import List
import os
sys.path.append("src")
sys.path.append("..")
import numpy as np
import dynamics_learning.preprocessing.panda_limits as panda_limits

def normalize_data(data: np.ndarray, min_vals: List[float], max_vals: List[float], min: float = -1, max: float = 1) -> np.ndarray:
    """
    Normalize the data using given min and max values for each parameter.

    Args:
        data (np.ndarray): The data to normalize.
        min_vals (List[float]): The minimum values for each parameter.
        max_vals (List[float]): The maximum values for each parameter.
        min (float): The new minimum value for normalization.
        max (float): The new maximum value for normalization.

    Returns:
        np.ndarray: The normalized data.
    """
    return (data - min_vals) / (max_vals - min_vals) * (max - min) + min
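# Quick sanity check (hypothetical values, not taken from panda_limits): mapping
# the range [-2.0, 2.0] onto the default target range [-1, 1] sends the endpoints
# and the midpoint to -1, 0 and 1:
#   normalize_data(np.array([-2.0, 0.0, 2.0]), -2.0, 2.0)  # -> array([-1.,  0.,  1.])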
def process_file(directory: str, filename: str) -> None:
    """
    Process a given file by normalizing its data and saving interpolated values.

    Args:
        directory (str): The directory containing the file.
        filename (str): The name of the file to process.
    """
    # Read the command data
    data1 = np.genfromtxt(os.path.join(directory, filename), dtype=float, delimiter=',')
    t_command = data1[:, 0]  # Command times
    motion_params = data1[:, 1:22]  # Command parameters
    # Read the measurement data
    filename2 = filename.replace("com", "meas")
    data2 = np.genfromtxt(os.path.join(directory, filename2), dtype=float, delimiter=',')
    t_meas = data2[:, 0]  # Measurement times
    # Normalize positions, velocities, and accelerations
    for i in range(0, 7):
        motion_params[:, i] = normalize_data(motion_params[:, i], panda_limits.q_lim_min_phys[i], panda_limits.q_lim_max_phys[i])
    for i in range(7, 14):
        motion_params[:, i] = normalize_data(motion_params[:, i], panda_limits.qd_lim_min_phys[i-7], panda_limits.qd_lim_max_phys[i-7])
    for i in range(14, 21):
        motion_params[:, i] = normalize_data(motion_params[:, i], panda_limits.qdd_lim_min_phys[i-14], panda_limits.qdd_lim_max_phys[i-14])
    # Interpolate command values to match measurement times
    motion_params_interpolated = np.empty([t_meas.shape[0], 21])
    for i in range(0, 21):
        motion_params_interpolated[:, i] = np.interp(t_meas, t_command, motion_params[:, i])
    # Combine measurement times with interpolated parameters
    meas = np.hstack([t_meas[:, None], motion_params_interpolated])
    # Save the result to a new file
    file_output = os.path.join(directory, filename.replace("com", "interp_com"))
    np.savetxt(
        file_output,
        meas,
        delimiter=",",
        header="t_com, q1_com, q2_com, q3_com, q4_com, q5_com, q6_com, q7_com, qd1_com, qd2_com, qd3_com, qd4_com, qd5_com, qd6_com, qd7_com, qdd1_com, qdd2_com, qdd3_com, qdd4_com, qdd5_com, qdd6_com, qdd7_com",
    )
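# Note on the resampling in process_file: np.interp evaluates the piecewise-linear
# interpolant through (t_command, value) at the measurement timestamps, so commands
# and measurements end up on one shared time base. Toy example (hypothetical values):
#   np.interp([0.5, 1.5], [0.0, 1.0, 2.0], [0.0, 2.0, 4.0])  # -> array([1., 3.])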
import matplotlib.pyplot as plt
import pandas as pd
import rwth_style
import panda_limits
import os
from scipy.optimize import curve_fit
from sklearn.metrics import r2_score
from rich import print
from rich.progress import track

def gauss(x, *p):
    A, mu, sigma = p
    return A * np.exp(-(x - mu)**2 / (2. * sigma**2))

p0 = [1, 0, 1]
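# p0 is the initial guess handed to curve_fit in the histogram fits below:
# amplitude A = 1, mean mu = 0, standard deviation sigma = 1.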
def analyze():
    for directory in ["dataset_v1", "dataset_v2", "dataset_v3"]:
        for dataset in ["train", "test"]:
            attained_freqs = np.empty([0, 1])
            duration_meas = np.empty(0, np.float32)
            num_samples_meas = np.empty(0, np.float32)
            q_command = np.empty((0, 7), np.float32)
            qd_command = np.empty((0, 7), np.float32)
            qdd_command = np.empty((0, 7), np.float32)
            q_meas = np.empty((0, 7), np.float32)
            qd_meas = np.empty((0, 7), np.float32)
            tau_meas = np.empty((0, 7), np.float32)
            directory_in = "data/{}/{}".format(directory, dataset)
            file_list = list(filter(lambda k: 'meas.csv' in k, sorted(os.listdir(directory_in))))
            for filename in track(file_list):
                data2 = np.genfromtxt(os.path.join(directory_in, filename), dtype=float, delimiter=',')
                t_meas = data2[:, 0]
                time_diffs = np.diff(t_meas)
                attained_freq = np.mean(1 / time_diffs)
                attained_freqs = np.append(attained_freqs, attained_freq)
                num_samples_meas = np.concatenate((num_samples_meas, [data2.shape[0]]))
                duration_meas = np.concatenate((duration_meas, [data2[-1, 0]]), axis=0)
                q_meas = np.concatenate((q_meas, data2[:, 1:8]), axis=0)
                qd_meas = np.concatenate((qd_meas, data2[:, 8:15]), axis=0)
                tau_meas = np.concatenate((tau_meas, data2[:, 15:22]), axis=0)
                filename2 = filename.replace("meas", "com")
                data1 = np.genfromtxt(os.path.join(directory_in, filename2), dtype=float, delimiter=',')
                q_command = np.concatenate((q_command, data1[:, 1:8]), axis=0)
                qd_command = np.concatenate((qd_command, data1[:, 8:15]), axis=0)
                qdd_command = np.concatenate((qdd_command, data1[:, 15:22]), axis=0)
df = pd.DataFrame({"# trajecotries":[duration_meas.size],
"Duration Sum [s]": np.sum(duration_meas),
"Duration Min [s]": np.min(duration_meas),
"Duration Max [s]":np.max(duration_meas),
"Duration Mean [s]":np.mean(duration_meas),
"# Samples Sum":np.sum(num_samples_meas),
"# Samples Min":np.min(num_samples_meas),
"# Samples Max":np.max(num_samples_meas),
"# Samples Mean":np.mean(num_samples_meas)
})
df.index = ['Value']
df1 = df.T
df1.to_csv('data/{}/analysis/trajectory_analysis_{}.csv'.format(directory, dataset), float_format='%.3f')
            # Attained frequency analysis
            freq_mu = np.mean(attained_freqs)
            freq_sigma = np.std(attained_freqs)
            freq_worst_case = np.max(freq_mu - attained_freqs)
            with plt.style.context("default"):
                num_col = 1
                num_row = 1
                fig, axs = plt.subplots(num_row, num_col, figsize=(10, 4))
                count, bins, ignored = axs.hist(attained_freqs, density=True, bins=50, color=rwth_style.blue)
                plt.plot(bins, 1 / (freq_sigma * np.sqrt(2 * np.pi)) *
                         np.exp(-(bins - freq_mu)**2 / (2 * freq_sigma**2)), linewidth=2, color=rwth_style.red)
                axs.grid()
                axs.set_title(r'$\mathrm{Histogram\ of\ Frequency:}\ \mu=%.3f\,\mathrm{Hz},\ \sigma=%.3f\,\mathrm{Hz}$' % (freq_mu, freq_sigma))
                axs.set_ylabel("Probability Density")
                axs.set_xlabel("Frequency [Hz]")
                fig.tight_layout()
                fig.savefig("data/{}/analysis/hist_freq_{}.png".format(directory, dataset))
                plt.close(fig)
            values = [q_command, qd_command, qdd_command, tau_meas]
            names = ["q", "qd", "qdd", "tau"]
            labels_y = ["Position [rad]", "Velocity [rad/s]", "Acceleration [rad/s^2]", "Torque [Nm]"]
            units = ["[rad]", "[rad/s]", "[rad/s^2]", "[Nm]"]
            titles = ["Command Position", "Command Velocity", "Command Acceleration", "Measured Torque"]
            labels = ['Axis 1', 'Axis 2', 'Axis 3', 'Axis 4', 'Axis 5', 'Axis 6', 'Axis 7']
            # Physical limits
            min_limits_phys = [panda_limits.q_lim_min_phys, panda_limits.qd_lim_min_phys, panda_limits.qdd_lim_min_phys, panda_limits.tau_lim_min_phys]
            max_limits_phys = [panda_limits.q_lim_max_phys, panda_limits.qd_lim_max_phys, panda_limits.qdd_lim_max_phys, panda_limits.tau_lim_max_phys]
            # MoveIt limits
            min_limits_moveit = [panda_limits.q_lim_min_moveit, panda_limits.qd_lim_min_moveit, panda_limits.qdd_lim_min_moveit, panda_limits.tau_lim_min_moveit]
            max_limits_moveit = [panda_limits.q_lim_max_moveit, panda_limits.qd_lim_max_moveit, panda_limits.qdd_lim_max_moveit, panda_limits.tau_lim_max_moveit]
            length = len(values)
            for i in range(length):
                value = values[i]
                std = np.std(value, axis=0)
                mean = np.mean(value, axis=0)
                min = np.min(value, axis=0)
                max = np.max(value, axis=0)
                phy_mean = (max_limits_phys[i] + min_limits_phys[i]) / 2
                moveit_mean = (max_limits_moveit[i] + min_limits_moveit[i]) / 2
                cov_max_phys = (max - min) / (max_limits_phys[i] - min_limits_phys[i]) * 100  # Percentage of the physical working range covered (min/max)
                cov_std_phys = (2 * std) / (max_limits_phys[i] - min_limits_phys[i]) * 100  # Percentage covered based on the standard deviation
                cov_max_moveit = (max - min) / (max_limits_moveit[i] - min_limits_moveit[i]) * 100  # Percentage of the MoveIt working range covered (min/max)
                cov_std_moveit = (2 * std) / (max_limits_moveit[i] - min_limits_moveit[i]) * 100  # Percentage covered based on the standard deviation
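                # For intuition (hypothetical numbers): commands sweeping [-1.0, 1.0] rad
                # inside a physical range of [-2.0, 2.0] rad give
                # cov_max_phys = 2.0 / 4.0 * 100 = 50%.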
df = pd.DataFrame({"Att. Mean {}".format(units[i]):mean,
"Phys. Mean {}".format(units[i]):phy_mean,
"MoveIt Mean {}".format(units[i]):phy_mean,
"Att. Min {}".format(units[i]):min,
"Phys. Min {}".format(units[i]):min_limits_phys[i],
"MoveIt Min {}".format(units[i]):min_limits_moveit[i],
"Att. Max {}".format(units[i]):max,
"Phys. Max {}".format(units[i]):max_limits_phys[i],
"MoveIt Max {}".format(units[i]):max_limits_moveit[i],
"Phy. Coverage Min/Max [\\%]":cov_max_phys ,
"MoveIt Coverage Min/Max [\\%]":cov_max_moveit ,
"Att. Std {}".format(units[i]):std,
"Phy. Coverage Std [\\%]":cov_std_phys ,
"MoveIt Coverage Std [\\%]":cov_std_moveit ,
})
df.index = ['Axis_1', 'Axis_2', 'Axis_3', 'Axis_4', 'Axis_5', 'Axis_6', 'Axis_7']
df1 = df.T
df1.to_csv('data/{}/analysis/feature_analysis_{}_{}.csv'.format(directory, dataset, names[i]), float_format='%.3f')
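                # In the CSV just written, rows are the statistics and columns are
                # Axis_1 ... Axis_7 (due to the transpose above).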
                x_pos = np.arange(len(labels))
                with plt.style.context("default"):
                    f4, ax = plt.subplots(figsize=(10, 5))
                    bar1_alpha = 1
                    bar1_hatch = ''
                    plt.rcParams.update({'hatch.color': rwth_style.blue_light})
                    if i == 0 or i == 2:
                        ax.bar(x_pos, max_limits_moveit[i] - min_limits_moveit[i], 0.4, bottom=min_limits_moveit[i], color=rwth_style.blue_light, label='MoveIt Limits', zorder=3)
                        bar1_alpha = 0.5
                        bar1_hatch = '//'
                    bar1 = ax.bar(x_pos, max_limits_phys[i] - min_limits_phys[i], 0.4, bottom=min_limits_phys[i], color=rwth_style.blue_light, alpha=bar1_alpha, hatch=bar1_hatch, label='Physical Limits', zorder=2)
                    ax.scatter(x_pos, (max_limits_phys[i] + min_limits_phys[i]) / 2, c=[rwth_style.blue], s=400, marker='_', label='Physical Mean', zorder=4)
                    errorbar_1 = ax.errorbar(x_pos, mean, std, fmt='o', capsize=5, label='Mean/Std Attained {}'.format(titles[i]), c=rwth_style.blue, zorder=5)
                    scatter_min = ax.scatter(x_pos, min, c=[rwth_style.blue], s=20, marker='x', label='Min/Max Attained {}'.format(titles[i]), zorder=6)
                    ax.scatter(x_pos, max, c=[rwth_style.blue], s=20, marker='x', zorder=6)
                    ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.08), ncol=2)
                    box = ax.get_position()
                    ax.set_position([box.x0, box.y0 + box.height * 0.15, box.width, box.height])
                    ax.set_ylabel(labels_y[i])
                    ax.set_ylim((-ax.get_ylim()[1], ax.get_ylim()[1]))
                    ax.set_xticks(x_pos)
                    ax.set_xticklabels(labels)
                    ax.yaxis.grid(True)
                    ax.set_axisbelow(True)
                    f4.savefig("data/{}/analysis/errorbar_{}_{}.png".format(directory, dataset, names[i]))
                    plt.close(f4)
with plt.style.context("default"):
num_col = 3
num_row = 3
fig, axs = plt.subplots(num_row, num_col, figsize=(20,10))
for k, label in enumerate(labels):
count, bins, ignored = axs[k//num_col, k%num_col].hist(value[:,k], density=True, bins=40, color=rwth_style.blue)
bins = bins[:-1]
try:
coeff, var_matrix = curve_fit(gauss, bins, count, p0=p0, maxfev = 2000)
# Get the fitted curve
hist_fit = gauss(bins, *coeff)
r_squared = r2_score(count, hist_fit)
axs[k//num_col, k%num_col].plot(bins,hist_fit, color=rwth_style.red)
t = axs[k//num_col, k%num_col].text(0.15,0.85, f"mean: {round(coeff[1],3)}\nstd: {round(coeff[2],3)}\nr squared: {round(round(r_squared,4)*100,2)}%", horizontalalignment='center', verticalalignment='center', transform=axs[k//num_col, k%num_col].transAxes)
t.set_bbox(dict(facecolor='white', alpha=0.5, edgecolor='black'))
except Exception as e:
print(e)
axs[k//num_col, k%num_col].grid()
axs[k//num_col, k%num_col].set_title(labels[k])
axs[k//num_col, k%num_col].set_ylabel("Probability Density")
axs[k//num_col, k%num_col].set_xlabel(labels_y[i])
axs[k//num_col, k%num_col].set_xlim([min_limits_phys[i][k],max_limits_phys[i][k]])
fig.delaxes(axs[-1,-1])
fig.delaxes(axs[-1,-2])
fig.tight_layout()
fig.savefig("data/{}/analysis/hist_{}_{}.png".format(directory, dataset, names[i]))
plt.close(fig)
if __name__ == "__main__":
# Add the custom module path to sys.path
sys.path.append("/root/deep-learning-based-robot-dynamics-modelling-in-robot-based-laser-material-processing/src")
# Define the directory containing the data
directory_in = "/root/deep-learning-based-robot-dynamics-modelling-in-robot-based-laser-material-processing/data/dataset_v3/test_2"
# Process all relevant files in the directory
for filename in sorted(os.listdir(directory_in)):
if filename.endswith("com.csv") and "interp" not in filename:
process_file(directory_in, filename)
analize()
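For orientation, analyze() writes one summary table and three figure families per dataset split. From the format strings above, the outputs for dataset_v1/train would be:

data/dataset_v1/analysis/trajectory_analysis_train.csv
data/dataset_v1/analysis/hist_freq_train.png
data/dataset_v1/analysis/feature_analysis_train_q.csv   (likewise for qd, qdd, tau)
data/dataset_v1/analysis/errorbar_train_q.png           (likewise for qd, qdd, tau)
data/dataset_v1/analysis/hist_train_q.png               (likewise for qd, qdd, tau)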
import sys
sys.path.append("/root/deep-learning-based-robot-dynamics-modelling-in-robot-based-laser-material-processing/src")
import tqdm
import numpy as np
from typing import List
import os
import dynamics_learning.preprocessing.panda_limits as panda_limits
from pathlib import Path

def normalize_data(data: np.ndarray, min_vals: List[float], max_vals: List[float], min: float = -1, max: float = 1) -> np.ndarray:
    """
    Normalize the data using given min and max values for each parameter.

    Args:
        data (np.ndarray): The data to normalize.
        min_vals (List[float]): The minimum values for each parameter.
        max_vals (List[float]): The maximum values for each parameter.
        min (float): The new minimum value for normalization.
        max (float): The new maximum value for normalization.

    Returns:
        np.ndarray: The normalized data.
    """
    return (data - min_vals) / (max_vals - min_vals) * (max - min) + min

def process_file(directory: str, filename: str) -> None:
    """
    Process a given file by normalizing its data and saving interpolated values.

    Args:
        directory (str): The directory containing the file.
        filename (str): The name of the file to process.
    """
    # Read the command data
    data1 = np.genfromtxt(os.path.join(directory, filename), dtype=float, delimiter=',')
    t_command = data1[:, 0]  # Command times
    motion_params = data1[:, 1:22]  # Command parameters
    # Read the measurement data
    filename2 = filename.replace("com", "meas")
    data2 = np.genfromtxt(os.path.join(directory, filename2), dtype=float, delimiter=',')
    t_meas = data2[:, 0]  # Measurement times
    # Normalize positions, velocities, and accelerations
    for i in range(0, 7):
        motion_params[:, i] = normalize_data(motion_params[:, i], panda_limits.q_lim_min_phys[i], panda_limits.q_lim_max_phys[i])
    for i in range(7, 14):
        motion_params[:, i] = normalize_data(motion_params[:, i], panda_limits.qd_lim_min_phys[i-7], panda_limits.qd_lim_max_phys[i-7])
    for i in range(14, 21):
        motion_params[:, i] = normalize_data(motion_params[:, i], panda_limits.qdd_lim_min_phys[i-14], panda_limits.qdd_lim_max_phys[i-14])
    # Interpolate command values to match measurement times
    motion_params_interpolated = np.empty([t_meas.shape[0], 21])
    for i in range(0, 21):
        motion_params_interpolated[:, i] = np.interp(t_meas, t_command, motion_params[:, i])
    # Combine measurement times with interpolated parameters
    meas = np.hstack([t_meas[:, None], motion_params_interpolated])
    # Save the result to a new file
    file_output = os.path.join(directory, filename.replace("com", "interp_com"))
    np.savetxt(
        file_output,
        meas,
        delimiter=",",
        header="t_com, q1_com, q2_com, q3_com, q4_com, q5_com, q6_com, q7_com, qd1_com, qd2_com, qd3_com, qd4_com, qd5_com, qd6_com, qd7_com, qdd1_com, qdd2_com, qdd3_com, qdd4_com, qdd5_com, qdd6_com, qdd7_com",
    )

if __name__ == "__main__":
    # Define the directory containing the data
    directory_in = "/app/dynamics_learning/dynamics_learning/Trajectory Data/"
    # Process all relevant files in the directory
    for filename in [file for file in Path(directory_in).rglob('*') if file.is_file()]:
        print(filename)
        if str(filename).endswith("com.csv") and "interp" not in str(filename):
            process_file(directory_in, str(filename))
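For a quick sanity check of the interpolation output, one can reload a written file and confirm that its time base matches the corresponding measurement file. A minimal sketch, using hypothetical file names that follow the "*_com.csv"/"*_meas.csv" convention above (np.genfromtxt skips the "#"-prefixed header line automatically):

import numpy as np

interp = np.genfromtxt("traj_001_interp_com.csv", dtype=float, delimiter=",")
meas = np.genfromtxt("traj_001_meas.csv", dtype=float, delimiter=",")

# Column 0 of the interpolated command file was copied from the measurement
# times, so the two files should agree row for row.
assert interp.shape[0] == meas.shape[0]
assert np.allclose(interp[:, 0], meas[:, 0])
print("commands aligned to", interp.shape[0], "measurement samples")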