From 9360b7ddf191a3ba0c220a6c25ea37b9c9ea9281 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Leon=20Michel=20Gori=C3=9Fen?= <leon.gorissen@llt.rwth-aachen.de>
Date: Fri, 24 May 2024 12:10:52 +0200
Subject: [PATCH] wip

---
 .../preprocessing/dataset_analysis.py | 346 +++++++++---------
 1 file changed, 174 insertions(+), 172 deletions(-)

diff --git a/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py b/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
index c45524f..7896ca7 100644
--- a/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
+++ b/dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
@@ -31,186 +31,188 @@ def gauss(x: np.ndarray, *p: float) -> np.ndarray:
 # Initial guess for Gaussian parameters
 p0 = [1, 0, 1]
 
-def analyze() -> None:
+def analyze(directory: str, dataset: str) -> None:
     """
-    Analyzes datasets in specified directories, generates statistics, and creates plots.
+    Analyzes the specified dataset in the given directory, generates statistics, and creates plots.
 
-    This function processes datasets, performs statistical analysis, and saves the results
-    and plots for each dataset.
+    Args:
+        directory (str): The directory containing the dataset.
+        dataset (str): The dataset to be analyzed (e.g., "train", "test").
 
     Returns:
         None
     """
+    # Initialize empty arrays for data collection
+    attained_freqs = np.empty([0, 1])
+    duration_meas = np.empty(0, np.float32)
+    num_samples_meas = np.empty(0, np.float32)
+    q_command = np.empty((0, 7), np.float32)
+    qd_command = np.empty((0, 7), np.float32)
+    qdd_command = np.empty((0, 7), np.float32)
+    q_meas = np.empty((0, 7), np.float32)
+    qd_meas = np.empty((0, 7), np.float32)
+    tau_meas = np.empty((0, 7), np.float32)
+
+    directory_in = f"data/{directory}/{dataset}"
+    file_list = list(filter(lambda k: 'meas.csv' in k, sorted(os.listdir(directory_in))))
+
+    for filename in track(file_list):
+        # Read measurement data
+        data2 = np.genfromtxt(os.path.join(directory_in, filename), dtype=float, delimiter=',')
+        t_meas = data2[:, 0]
+        time_diffs = np.diff(t_meas)
+        attained_freq = np.mean(1 / time_diffs)
+        attained_freqs = np.append(attained_freqs, attained_freq)
+        num_samples_meas = np.concatenate((num_samples_meas, [data2.shape[0]]))
+        duration_meas = np.concatenate((duration_meas, [data2[-1, 0]]), axis=0)
+        q_meas = np.concatenate((q_meas, data2[:, 1:8]), axis=0)
+        qd_meas = np.concatenate((qd_meas, data2[:, 8:15]), axis=0)
+        tau_meas = np.concatenate((tau_meas, data2[:, 15:22]), axis=0)
+
+        # Read command data
+        filename2 = filename.replace("meas", "com")
+        data1 = np.genfromtxt(os.path.join(directory_in, filename2), dtype=float, delimiter=',')
+        q_command = np.concatenate((q_command, data1[:, 1:8]), axis=0)
+        qd_command = np.concatenate((qd_command, data1[:, 8:15]), axis=0)
+        qdd_command = np.concatenate((qdd_command, data1[:, 15:22]), axis=0)
+
+    # Create DataFrame for trajectory analysis
+    df = pd.DataFrame({
+        "# trajectories": [duration_meas.size],
+        "Duration Sum [s]": np.sum(duration_meas),
+        "Duration Min [s]": np.min(duration_meas),
+        "Duration Max [s]": np.max(duration_meas),
+        "Duration Mean [s]": np.mean(duration_meas),
+        "# Samples Sum": np.sum(num_samples_meas),
+        "# Samples Min": np.min(num_samples_meas),
+        "# Samples Max": np.max(num_samples_meas),
+        "# Samples Mean": np.mean(num_samples_meas)
+    })
+    df.index = ['Value']
+    df.T.to_csv(f'data/{directory}/analysis/trajectory_analysis_{dataset}.csv', float_format='%.3f')
+
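+    # Note on the section below: each trajectory contributes one attained
+    # frequency, computed as the mean of 1/dt over its consecutive timestamps.
+    # The red curve drawn over the histogram is a normal pdf using the sample
+    # mean and standard deviation, i.e. it assumes roughly Gaussian timing jitter.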
+    # Attained frequency analysis
+    freq_mu = np.mean(attained_freqs)
+    freq_sigma = np.std(attained_freqs)
+    freq_worst_case = np.max(freq_mu - attained_freqs)
+
+    with plt.style.context("default"):
+        fig, ax = plt.subplots(figsize=(10, 4))
+        ax.hist(attained_freqs, density=True, bins=50, color='blue')
+        bins = np.linspace(min(attained_freqs), max(attained_freqs), 50)
+        ax.plot(bins, 1 / (freq_sigma * np.sqrt(2 * np.pi)) * np.exp(-(bins - freq_mu)**2 / (2 * freq_sigma**2)), linewidth=2, color='red')
+        ax.grid()
+        ax.set_title(f'Histogram of Frequency: μ={freq_mu:.3f} Hz, σ={freq_sigma:.3f} Hz')
+        ax.set_ylabel("Probability Density")
+        ax.set_xlabel("Frequency [Hz]")
+        fig.tight_layout()
+        fig.savefig(f"data/{directory}/analysis/hist_freq_{dataset}.png")
+        plt.close(fig)
+
+    # Feature analysis
+    values = [q_command, qd_command, qdd_command, tau_meas]
+    names = ["q", "qd", "qdd", "tau"]
+    labels_y = ["Position [rad]", "Velocity [rad/s]", "Acceleration [rad/s^2]", "Torque [Nm]"]
+    units = ["[rad]", "[rad/s]", "[rad/s^2]", "[Nm]"]
+    titles = ["Command Position", "Command Velocity", "Command Acceleration", "Measured Torque"]
+    labels = ['Axis 1', 'Axis 2', 'Axis 3', 'Axis 4', 'Axis 5', 'Axis 6', 'Axis 7']
+
+    min_limits_phys = [panda_limits.q_lim_min_phys, panda_limits.qd_lim_min_phys, panda_limits.qdd_lim_min_phys, panda_limits.tau_lim_min_phys]
+    max_limits_phys = [panda_limits.q_lim_max_phys, panda_limits.qd_lim_max_phys, panda_limits.qdd_lim_max_phys, panda_limits.tau_lim_max_phys]
+    min_limits_moveit = [panda_limits.q_lim_min_moveit, panda_limits.qd_lim_min_moveit, panda_limits.qdd_lim_min_moveit, panda_limits.tau_lim_min_moveit]
+    max_limits_moveit = [panda_limits.q_lim_max_moveit, panda_limits.qd_lim_max_moveit, panda_limits.qdd_lim_max_moveit, panda_limits.tau_lim_max_moveit]
+
+    for i in range(len(values)):
+        value = values[i]
+        std = np.std(value, axis=0)
+        mean = np.mean(value, axis=0)
+        min_val = np.min(value, axis=0)
+        max_val = np.max(value, axis=0)
+        phy_mean = (max_limits_phys[i] + min_limits_phys[i]) / 2
+        cov_max_phys = (max_val - min_val) / (max_limits_phys[i] - min_limits_phys[i]) * 100
+        cov_std_phys = (2 * std) / (max_limits_phys[i] - min_limits_phys[i]) * 100
+        cov_max_moveit = (max_val - min_val) / (max_limits_moveit[i] - min_limits_moveit[i]) * 100
+        cov_std_moveit = (2 * std) / (max_limits_moveit[i] - min_limits_moveit[i]) * 100
+
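+        # Coverage metrics above, for reference: range coverage compares the
+        # attained span to the limit span, (max - min) / (lim_max - lim_min) * 100,
+        # and std coverage treats a +/-1 sigma band as the attained span,
+        # 2 * std / (lim_max - lim_min) * 100. Values near 100% mean the dataset
+        # exercises nearly the full limit range of that axis.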
+        df = pd.DataFrame({
+            f"Att. Mean {units[i]}": mean,
+            f"Phys. Mean {units[i]}": phy_mean,
+            f"MoveIt Mean {units[i]}": (max_limits_moveit[i] + min_limits_moveit[i]) / 2,
+            f"Att. Min {units[i]}": min_val,
+            f"Phys. Min {units[i]}": min_limits_phys[i],
+            f"MoveIt Min {units[i]}": min_limits_moveit[i],
+            f"Att. Max {units[i]}": max_val,
+            f"Phys. Max {units[i]}": max_limits_phys[i],
+            f"MoveIt Max {units[i]}": max_limits_moveit[i],
+            "Phy. Coverage Min/Max [%]": cov_max_phys,
+            "MoveIt Coverage Min/Max [%]": cov_max_moveit,
+            f"Att. Std {units[i]}": std,
+            "Phy. Coverage Std [%]": cov_std_phys,
+            "MoveIt Coverage Std [%]": cov_std_moveit,
+        })
+        df.index = ['Axis 1', 'Axis 2', 'Axis 3', 'Axis 4', 'Axis 5', 'Axis 6', 'Axis 7']
+        df.T.to_csv(f'data/{directory}/analysis/feature_analysis_{dataset}_{names[i]}.csv', float_format='%.3f')
+
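+        # Plot note: the MoveIt limit bar below is only drawn for position
+        # (i == 0) and acceleration (i == 2); presumably the MoveIt and physical
+        # limits coincide for the other quantities. When both bars are drawn,
+        # the physical bar is hatched and made translucent so the overlap stays
+        # visible.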
+        # Create bar plots for limits and measured values
+        x_pos = np.arange(len(labels))
+        with plt.style.context("default"):
+            fig, ax = plt.subplots(figsize=(10, 5))
+            bar1_alpha = 1
+            bar1_hatch = ''
+            if i == 2 or i == 0:
+                ax.bar(x_pos, max_limits_moveit[i] - min_limits_moveit[i], 0.4, bottom=min_limits_moveit[i], color='lightblue', label='MoveIt Limits', zorder=3)
+                bar1_alpha = 0.5
+                bar1_hatch = '//'
+            ax.bar(x_pos, max_limits_phys[i] - min_limits_phys[i], 0.4, bottom=min_limits_phys[i], color='lightblue', alpha=bar1_alpha, hatch=bar1_hatch, label='Physical Limits', zorder=2)
+            ax.scatter(x_pos, (max_limits_phys[i] + min_limits_phys[i]) / 2, c='blue', s=400, marker='_', label='Physical Mean', zorder=4)
+            ax.errorbar(x_pos, mean, std, fmt='o', capsize=5, label=f'Mean/Std Attained {titles[i]}', c='blue', zorder=5)
+            ax.scatter(x_pos, min_val, c='blue', s=20, marker='x', label=f'Min/Max Attained {titles[i]}', zorder=6)
+            ax.scatter(x_pos, max_val, c='blue', s=20, marker='x', zorder=6)
+            ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.08), ncol=2)
+            box = ax.get_position()
+            ax.set_position([box.x0, box.y0 + box.height * 0.15, box.width, box.height])
+            ax.set_ylabel(labels_y[i])
+            ax.set_ylim((-ax.get_ylim()[1], ax.get_ylim()[1]))
+            ax.set_xticks(x_pos)
+            ax.set_xticklabels(labels)
+            ax.yaxis.grid(True)
+            ax.set_axisbelow(True)
+            fig.savefig(f"data/{directory}/analysis/errorbar_{dataset}_{names[i]}.png")
+            plt.close(fig)
+
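+        # Histogram note: gauss() is fitted per axis with scipy's curve_fit,
+        # using the left bin edges against the bin densities, and r2_score
+        # quantifies how Gaussian the attained distribution is. curve_fit may
+        # fail to converge on strongly non-Gaussian axes, so the try/except
+        # keeps the raw histogram even when no fit can be drawn.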
+        # Create histograms for the values
+        with plt.style.context("default"):
+            fig, axs = plt.subplots(3, 3, figsize=(20, 10))
+            for k, label in enumerate(labels):
+                count, bins, _ = axs[k // 3, k % 3].hist(value[:, k], density=True, bins=40, color='blue')
+                bins = bins[:-1]
+                try:
+                    coeff, _ = curve_fit(gauss, bins, count, maxfev=2000)
+                    hist_fit = gauss(bins, *coeff)
+                    r_squared = r2_score(count, hist_fit)
+                    axs[k // 3, k % 3].plot(bins, hist_fit, color='red')
+                    t = axs[k // 3, k % 3].text(0.15, 0.85, f"mean: {round(coeff[1], 3)}\nstd: {round(coeff[2], 3)}\nr squared: {round(r_squared * 100, 2)}%", horizontalalignment='center', verticalalignment='center', transform=axs[k // 3, k % 3].transAxes)
+                    t.set_bbox(dict(facecolor='white', alpha=0.5, edgecolor='black'))
+                except Exception as e:
+                    print(e)
+                axs[k // 3, k % 3].grid()
+                axs[k // 3, k % 3].set_title(label)
+                axs[k // 3, k % 3].set_ylabel("Probability Density")
+                axs[k // 3, k % 3].set_xlabel(labels_y[i])
+                axs[k // 3, k % 3].set_xlim([min_limits_phys[i][k], max_limits_phys[i][k]])
+
+            fig.delaxes(axs[-1, -1])
+            fig.delaxes(axs[-1, -2])
+            fig.tight_layout()
+            fig.savefig(f"data/{directory}/analysis/hist_{dataset}_{names[i]}.png")
+            plt.close(fig)
+
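+# Usage sketch (paths are assumptions): analyze() can also be imported and run
+# for a single dataset, e.g. analyze("dataset_v1", "train"), provided that
+# data/dataset_v1/train and data/dataset_v1/analysis exist for inputs and outputs.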
+if __name__ == "__main__":
     directories = ["dataset_v1", "dataset_v2", "dataset_v3"]
     datasets = ["train", "test"]
+
+    # TODO: test if this works
     for directory in directories:
         for dataset in datasets:
-            # Initialize empty arrays for data collection
-            attained_freqs = np.empty([0, 1])
-            duration_meas = np.empty(0, np.float32)
-            num_samples_meas = np.empty(0, np.float32)
-            q_command = np.empty((0, 7), np.float32)
-            qd_command = np.empty((0, 7), np.float32)
-            qdd_command = np.empty((0, 7), np.float32)
-            q_meas = np.empty((0, 7), np.float32)
-            qd_meas = np.empty((0, 7), np.float32)
-            tau_meas = np.empty((0, 7), np.float32)
-
-            directory_in = f"data/{directory}/{dataset}"
-            file_list = list(filter(lambda k: 'meas.csv' in k, sorted(os.listdir(directory_in))))
-
-            for filename in track(file_list):
-                # Read measurement data
-                data2 = np.genfromtxt(os.path.join(directory_in, filename), dtype=float, delimiter=',')
-                t_meas = data2[:, 0]
-                time_diffs = np.diff(t_meas)
-                attained_freq = np.mean(1 / time_diffs)
-                attained_freqs = np.append(attained_freqs, attained_freq)
-                num_samples_meas = np.concatenate((num_samples_meas, [data2.shape[0]]))
-                duration_meas = np.concatenate((duration_meas, [data2[-1, 0]]), axis=0)
-                q_meas = np.concatenate((q_meas, data2[:, 1:8]), axis=0)
-                qd_meas = np.concatenate((qd_meas, data2[:, 8:15]), axis=0)
-                tau_meas = np.concatenate((tau_meas, data2[:, 15:22]), axis=0)
-
-                # Read command data
-                filename2 = filename.replace("meas", "com")
-                data1 = np.genfromtxt(os.path.join(directory_in, filename2), dtype=float, delimiter=',')
-                q_command = np.concatenate((q_command, data1[:, 1:8]), axis=0)
-                qd_command = np.concatenate((qd_command, data1[:, 8:15]), axis=0)
-                qdd_command = np.concatenate((qdd_command, data1[:, 15:22]), axis=0)
-
-            # Create DataFrame for trajectory analysis
-            df = pd.DataFrame({
-                "# trajectories": [duration_meas.size],
-                "Duration Sum [s]": np.sum(duration_meas),
-                "Duration Min [s]": np.min(duration_meas),
-                "Duration Max [s]": np.max(duration_meas),
-                "Duration Mean [s]": np.mean(duration_meas),
-                "# Samples Sum": np.sum(num_samples_meas),
-                "# Samples Min": np.min(num_samples_meas),
-                "# Samples Max": np.max(num_samples_meas),
-                "# Samples Mean": np.mean(num_samples_meas)
-            })
-            df.index = ['Value']
-            df.T.to_csv(f'data/{directory}/analysis/trajectory_analysis_{dataset}.csv', float_format='%.3f')
-
-            # Attained frequency analysis
-            freq_mu = np.mean(attained_freqs)
-            freq_sigma = np.std(attained_freqs)
-            freq_worst_case = np.max(freq_mu - attained_freqs)
-
-            with plt.style.context("default"):
-                fig, ax = plt.subplots(figsize=(10, 4))
-                ax.hist(attained_freqs, density=True, bins=50, color='blue')
-                bins = np.linspace(min(attained_freqs), max(attained_freqs), 50)
-                ax.plot(bins, 1 / (freq_sigma * np.sqrt(2 * np.pi)) * np.exp(-(bins - freq_mu)**2 / (2 * freq_sigma**2)), linewidth=2, color='red')
-                ax.grid()
-                ax.set_title(f'Histogram of Frequency: μ={freq_mu:.3f} Hz, σ={freq_sigma:.3f} Hz')
-                ax.set_ylabel("Probability")
-                ax.set_xlabel("Frequency")
-                fig.tight_layout()
-                fig.savefig(f"data/{directory}/analysis/hist_freq_{dataset}.png")
-                plt.close(fig)
-
-            # Feature analysis
-            values = [q_command, qd_command, qdd_command, tau_meas]
-            names = ["q", "qd", "qdd", "tau"]
-            labels_y = ["Position [rad]", "Velocity [rad/s]", "Acceleration [rad/s^2]", "Torque [Nm]"]
-            units = ["[rad]", "[rad/s]", "[rad/s^2]", "[Nm]"]
-            titles = ["Command Position", "Command Velocity", "Command Acceleration", "Measured Torque"]
-            labels = ['Axis 1', 'Axis 2', 'Axis 3', 'Axis 4', 'Axis 5', 'Axis 6', 'Axis 7']
-
-            min_limits_phys = [panda_limits.q_lim_min_phys, panda_limits.qd_lim_min_phys, panda_limits.qdd_lim_min_phys, panda_limits.tau_lim_min_phys]
-            max_limits_phys = [panda_limits.q_lim_max_phys, panda_limits.qd_lim_max_phys, panda_limits.qdd_lim_max_phys, panda_limits.tau_lim_max_phys]
-            min_limits_moveit = [panda_limits.q_lim_min_moveit, panda_limits.qd_lim_min_moveit, panda_limits.qdd_lim_min_moveit, panda_limits.tau_lim_min_moveit]
-            max_limits_moveit = [panda_limits.q_lim_max_moveit, panda_limits.qd_lim_max_moveit, panda_limits.qdd_lim_max_moveit, panda_limits.tau_lim_max_moveit]
-
-            for i in range(len(values)):
-                value = values[i]
-                std = np.std(value, axis=0)
-                mean = np.mean(value, axis=0)
-                min_val = np.min(value, axis=0)
-                max_val = np.max(value, axis=0)
-                phy_mean = (max_limits_phys[i] + min_limits_phys[i]) / 2
-                cov_max_phys = (max_val - min_val) / (max_limits_phys[i] - min_limits_phys[i]) * 100
-                cov_std_phys = (2 * std) / (max_limits_phys[i] - min_limits_phys[i]) * 100
-                cov_max_moveit = (max_val - min_val) / (max_limits_moveit[i] - min_limits_moveit[i]) * 100
-                cov_std_moveit = (2 * std) / (max_limits_moveit[i] - min_limits_moveit[i]) * 100
-
-                df = pd.DataFrame({
-                    f"Att. Mean {units[i]}": mean,
-                    f"Phys. Mean {units[i]}": phy_mean,
-                    f"MoveIt Mean {units[i]}": phy_mean,
-                    f"Att. Min {units[i]}": min_val,
-                    f"Phys. Min {units[i]}": min_limits_phys[i],
-                    f"MoveIt Min {units[i]}": min_limits_moveit[i],
-                    f"Att. Max {units[i]}": max_val,
-                    f"Phys. Max {units[i]}": max_limits_phys[i],
-                    f"MoveIt Max {units[i]}": max_limits_moveit[i],
-                    "Phy. Coverage Min/Max [%]": cov_max_phys,
-                    "MoveIt Coverage Min/Max [%]": cov_max_moveit,
-                    f"Att. Std {units[i]}": std,
-                    "Phy. Coverage Std [%]": cov_std_phys,
-                    "MoveIt Coverage Std [%]": cov_std_moveit,
-                })
-                df.index = ['Axis 1', 'Axis 2', 'Axis 3', 'Axis 4', 'Axis 5', 'Axis 6', 'Axis 7']
-                df.T.to_csv(f'data/{directory}/analysis/feature_analysis_{dataset}_{names[i]}.csv', float_format='%.3f')
-
-                # Create bar plots for limits and measured values
-                x_pos = np.arange(len(labels))
-                with plt.style.context("default"):
-                    fig, ax = plt.subplots(figsize=(10, 5))
-                    bar1_alpha = 1
-                    bar1_hatch = ''
-                    if i == 2 or i == 0:
-                        ax.bar(x_pos, max_limits_moveit[i] - min_limits_moveit[i], 0.4, bottom=min_limits_moveit[i], color='lightblue', label='MoveIt Limits', zorder=3)
-                        bar1_alpha = 0.5
-                        bar1_hatch = '//'
-                    ax.bar(x_pos, max_limits_phys[i] - min_limits_phys[i], 0.4, bottom=min_limits_phys[i], color='lightblue', alpha=bar1_alpha, hatch=bar1_hatch, label='Physical Limits', zorder=2)
-                    ax.scatter(x_pos, (max_limits_phys[i] + min_limits_phys[i]) / 2, c='blue', s=400, marker='_', label='Physical Mean', zorder=4)
-                    ax.errorbar(x_pos, mean, std, fmt='o', capsize=5, label=f'Mean/Std Attained {titles[i]}', c='blue', zorder=5)
-                    ax.scatter(x_pos, min_val, c='blue', s=20, marker='x', label=f'Min/Max Attained {titles[i]}', zorder=6)
-                    ax.scatter(x_pos, max_val, c='blue', s=20, marker='x', zorder=6)
-                    ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.08), ncol=2)
-                    box = ax.get_position()
-                    ax.set_position([box.x0, box.y0 + box.height * 0.15, box.width, box.height])
-                    ax.set_ylabel(labels_y[i])
-                    ax.set_ylim((-ax.get_ylim()[1], ax.get_ylim()[1]))
-                    ax.set_xticks(x_pos)
-                    ax.set_xticklabels(labels)
-                    ax.yaxis.grid(True)
-                    ax.set_axisbelow(True)
-                    fig.savefig(f"data/{directory}/analysis/errorbar_{dataset}_{names[i]}.png")
-                    plt.close(fig)
-
-                # Create histograms for the values
-                with plt.style.context("default"):
-                    fig, axs = plt.subplots(3, 3, figsize=(20, 10))
-                    for k, label in enumerate(labels):
-                        count, bins, _ = axs[k // 3, k % 3].hist(value[:, k], density=True, bins=40, color='blue')
-                        bins = bins[:-1]
-                        try:
-                            coeff, _ = curve_fit(gauss, bins, count, maxfev=2000)
-                            hist_fit = gauss(bins, *coeff)
-                            r_squared = r2_score(count, hist_fit)
-                            axs[k // 3, k % 3].plot(bins, hist_fit, color='red')
-                            t = axs[k // 3, k % 3].text(0.15, 0.85, f"mean: {round(coeff[1], 3)}\nstd: {round(coeff[2], 3)}\nr squared: {round(r_squared * 100, 2)}%", horizontalalignment='center', verticalalignment='center', transform=axs[k // 3, k % 3].transAxes)
-                            t.set_bbox(dict(facecolor='white', alpha=0.5, edgecolor='black'))
-                        except Exception as e:
-                            print(e)
-                        axs[k // 3, k % 3].grid()
-                        axs[k // 3, k % 3].set_title(label)
-                        axs[k // 3, k % 3].set_ylabel("Probability Density")
-                        axs[k // 3, k % 3].set_xlabel(labels_y[i])
-                        axs[k // 3, k % 3].set_xlim([min_limits_phys[i][k], max_limits_phys[i][k]])
-
-                    fig.delaxes(axs[-1, -1])
-                    fig.delaxes(axs[-1, -2])
-                    fig.tight_layout()
-                    fig.savefig(f"data/{directory}/analysis/hist_{dataset}_{names[i]}.png")
-                    plt.close(fig)
-
-
-if __name__ == "__main__":
-    analize()
\ No newline at end of file
+            analyze(directory, dataset)
\ No newline at end of file
-- 
GitLab