Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
D
Demonstrating Data to Knowledge Pipelines
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Leon Michel Gorißen
Demonstrating Data to Knowledge Pipelines
Commits
fac4b3f0
Commit
fac4b3f0
authored
1 year ago
by
Leon Michel Gorißen
Browse files
Options
Downloads
Patches
Plain Diff
Chore: Document
parent
c2b89bbb
No related branches found
No related tags found
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
+68
-183
68 additions, 183 deletions
...rning/dynamics_learning/preprocessing/dataset_analysis.py
with
68 additions
and
183 deletions
dynamics_learning/dynamics_learning/preprocessing/dataset_analysis.py
+
68
−
183
View file @
fac4b3f0
#%%
# Import and variables
import
sys
sys
.
path
.
append
(
"
src
"
)
sys
.
path
.
append
(
"
..
"
)
from
typing
import
List
,
Tuple
import
os
import
numpy
as
np
import
matplotlib.pyplot
as
plt
import
pandas
as
pd
import
rwth_style
import
panda_limits
import
os
from
scipy.optimize
import
curve_fit
from
sklearn.metrics
import
r2_score
from
rich
import
print
from
rich.progress
import
track
def
gauss
(
x
,
*
p
):
A
,
mu
,
sigma
=
p
return
A
*
np
.
exp
(
-
(
x
-
mu
)
**
2
/
(
2.
*
sigma
**
2
))
p0
=
[
1
,
0
,
1
]
# %%
for
directory
in
[
"
dataset_v1
"
,
"
dataset_v2
"
,
"
dataset_v3
"
]:
for
dataset
in
[
"
train
"
,
"
test
"
]:
attained_freqs
=
np
.
empty
([
0
,
1
])
duration_meas
=
np
.
empty
(
0
,
np
.
float32
)
num_samples_meas
=
np
.
empty
(
0
,
np
.
float32
)
q_command
=
np
.
empty
((
0
,
7
),
np
.
float32
)
qd_command
=
np
.
empty
((
0
,
7
),
np
.
float32
)
qdd_command
=
np
.
empty
((
0
,
7
),
np
.
float32
)
q_meas
=
np
.
empty
((
0
,
7
),
np
.
float32
)
qd_meas
=
np
.
empty
((
0
,
7
),
np
.
float32
)
tau_meas
=
np
.
empty
((
0
,
7
),
np
.
float32
)
directory_in
=
"
data/{}/{}
"
.
format
(
directory
,
dataset
)
file_list
=
list
(
filter
(
lambda
k
:
'
meas.csv
'
in
k
,
sorted
(
os
.
listdir
(
directory_in
))))
for
filename
in
track
(
file_list
):
data2
=
np
.
genfromtxt
(
os
.
path
.
join
(
directory_in
,
filename
),
dtype
=
float
,
delimiter
=
'
,
'
)
t_meas
=
data2
[:,
0
]
time_diffs
=
np
.
diff
(
t_meas
)
attained_freq
=
np
.
mean
(
1
/
time_diffs
)
attained_freqs
=
np
.
append
(
attained_freqs
,
attained_freq
)
num_samples_meas
=
np
.
concatenate
((
num_samples_meas
,
[
data2
.
shape
[
0
]]))
duration_meas
=
np
.
concatenate
((
duration_meas
,
[
data2
[
-
1
,
0
]]),
axis
=
0
)
q_meas
=
np
.
concatenate
((
q_meas
,
data2
[:,
1
:
8
]),
axis
=
0
)
qd_meas
=
np
.
concatenate
((
qd_meas
,
data2
[:,
8
:
15
]),
axis
=
0
)
tau_meas
=
np
.
concatenate
((
tau_meas
,
data2
[:,
15
:
22
]),
axis
=
0
)
filename2
=
filename
.
replace
(
"
meas
"
,
"
com
"
)
data1
=
np
.
genfromtxt
(
os
.
path
.
join
(
directory_in
,
filename2
),
dtype
=
float
,
delimiter
=
'
,
'
)
q_command
=
np
.
concatenate
((
q_command
,
data1
[:,
1
:
8
]),
axis
=
0
)
qd_command
=
np
.
concatenate
((
qd_command
,
data1
[:,
8
:
15
]),
axis
=
0
)
qdd_command
=
np
.
concatenate
((
qdd_command
,
data1
[:,
15
:
22
]),
axis
=
0
)
df
=
pd
.
DataFrame
({
"
# trajecotries
"
:[
duration_meas
.
size
],
"
Duration Sum [s]
"
:
np
.
sum
(
duration_meas
),
"
Duration Min [s]
"
:
np
.
min
(
duration_meas
),
"
Duration Max [s]
"
:
np
.
max
(
duration_meas
),
"
Duration Mean [s]
"
:
np
.
mean
(
duration_meas
),
"
# Samples Sum
"
:
np
.
sum
(
num_samples_meas
),
"
# Samples Min
"
:
np
.
min
(
num_samples_meas
),
"
# Samples Max
"
:
np
.
max
(
num_samples_meas
),
"
# Samples Mean
"
:
np
.
mean
(
num_samples_meas
)
})
df
.
index
=
[
'
Value
'
]
df1
=
df
.
T
df1
.
to_csv
(
'
data/{}/analysis/trajectory_analysis_{}.csv
'
.
format
(
directory
,
dataset
),
float_format
=
'
%.3f
'
)
# Attained frequency analysis
freq_mu
=
np
.
mean
(
attained_freqs
)
freq_sigma
=
np
.
std
(
attained_freqs
)
freq_worst_case
=
np
.
max
(
freq_mu
-
attained_freqs
)
with
plt
.
style
.
context
(
"
default
"
):
num_col
=
1
num_row
=
1
fig
,
axs
=
plt
.
subplots
(
num_row
,
num_col
,
figsize
=
(
10
,
4
))
count
,
bins
,
ignored
=
axs
.
hist
(
attained_freqs
,
density
=
True
,
bins
=
50
,
color
=
rwth_style
.
blue
)
plt
.
plot
(
bins
,
1
/
(
freq_sigma
*
np
.
sqrt
(
2
*
np
.
pi
))
*
np
.
exp
(
-
(
bins
-
freq_mu
)
**
2
/
(
2
*
freq_sigma
**
2
)
),
linewidth
=
2
,
color
=
rwth_style
.
red
)
axs
.
grid
()
axs
.
set_title
(
r
'
$\mathrm{Histogram\ of\ Frequency:}\ \mu=%.3f Hz,\ \sigma=%.3f$ Hz
'
%
(
freq_mu
,
freq_sigma
))
axs
.
set_ylabel
(
"
Probability
"
)
axs
.
set_xlabel
(
"
Frequency
"
)
fig
.
tight_layout
()
fig
.
savefig
(
"
data/{}/analysis/hist_freq_{}.png
"
.
format
(
directory
,
dataset
))
plt
.
close
(
fig
)
values
=
[
q_command
,
qd_command
,
qdd_command
,
tau_meas
]
names
=
[
"
q
"
,
"
qd
"
,
"
qdd
"
,
"
tau
"
]
labels_y
=
[
"
Position [rad]
"
,
"
Velocity [rad-s]
"
,
"
Acceleration [rad-s^2]
"
,
"
Torque [Nm]
"
]
units
=
[
"
[rad]
"
,
"
[rad-s]
"
,
"
[rad-s^2]
"
,
"
[Nm]
"
]
titles
=
[
"
Command Position
"
,
"
Command Velocity
"
,
"
Command Acceleration
"
,
"
Measured Torque
"
]
labels
=
[
'
Axis 1
'
,
'
Axis 2
'
,
'
Axis 3
'
,
'
Axis 4
'
,
'
Axis 5
'
,
'
Axis 6
'
,
'
Axis 7
'
]
# phyiscal limits
min_limits_phys
=
[
panda_limits
.
q_lim_min_phys
,
panda_limits
.
qd_lim_min_phys
,
panda_limits
.
qdd_lim_min_phys
,
panda_limits
.
tau_lim_min_phys
]
max_limits_phys
=
[
panda_limits
.
q_lim_max_phys
,
panda_limits
.
qd_lim_max_phys
,
panda_limits
.
qdd_lim_max_phys
,
panda_limits
.
tau_lim_max_phys
]
# Moveit limits
min_limits_moveit
=
[
panda_limits
.
q_lim_min_moveit
,
panda_limits
.
qd_lim_min_moveit
,
panda_limits
.
qdd_lim_min_moveit
,
panda_limits
.
tau_lim_min_moveit
]
max_limits_moveit
=
[
panda_limits
.
q_lim_max_moveit
,
panda_limits
.
qd_lim_max_moveit
,
panda_limits
.
qdd_lim_max_moveit
,
panda_limits
.
tau_lim_max_moveit
]
length
=
len
(
values
)
for
i
in
range
(
length
):
value
=
values
[
i
]
std
=
np
.
std
(
value
,
axis
=
0
)
mean
=
np
.
mean
(
value
,
axis
=
0
)
min
=
np
.
min
(
value
,
axis
=
0
)
max
=
np
.
max
(
value
,
axis
=
0
)
phy_mean
=
(
max_limits_phys
[
i
]
+
min_limits_phys
[
i
])
/
2
cov_max_phys
=
(
max
-
min
)
/
(
max_limits_phys
[
i
]
-
min_limits_phys
[
i
])
*
100
# Percentage of used working space
cov_std_phys
=
(
2
*
std
)
/
(
max_limits_phys
[
i
]
-
min_limits_phys
[
i
])
*
100
# Percentage of used working space based on standard deviaton
cov_max_moveit
=
(
max
-
min
)
/
(
max_limits_moveit
[
i
]
-
min_limits_moveit
[
i
])
*
100
# Percentage of used working space
cov_std_moveit
=
(
2
*
std
)
/
(
max_limits_moveit
[
i
]
-
min_limits_moveit
[
i
])
*
100
# Percentage of used working space based on standard deviaton
df
=
pd
.
DataFrame
({
"
Att. Mean {}
"
.
format
(
units
[
i
]):
mean
,
"
Phys. Mean {}
"
.
format
(
units
[
i
]):
phy_mean
,
"
MoveIt Mean {}
"
.
format
(
units
[
i
]):
phy_mean
,
"
Att. Min {}
"
.
format
(
units
[
i
]):
min
,
"
Phys. Min {}
"
.
format
(
units
[
i
]):
min_limits_phys
[
i
],
"
MoveIt Min {}
"
.
format
(
units
[
i
]):
min_limits_moveit
[
i
],
"
Att. Max {}
"
.
format
(
units
[
i
]):
max
,
"
Phys. Max {}
"
.
format
(
units
[
i
]):
max_limits_phys
[
i
],
"
MoveIt Max {}
"
.
format
(
units
[
i
]):
max_limits_moveit
[
i
],
"
Phy. Coverage Min/Max [
\\
%]
"
:
cov_max_phys
,
"
MoveIt Coverage Min/Max [
\\
%]
"
:
cov_max_moveit
,
"
Att. Std {}
"
.
format
(
units
[
i
]):
std
,
"
Phy. Coverage Std [
\\
%]
"
:
cov_std_phys
,
"
MoveIt Coverage Std [
\\
%]
"
:
cov_std_moveit
,
})
df
.
index
=
[
'
Axis_1
'
,
'
Axis_2
'
,
'
Axis_3
'
,
'
Axis_4
'
,
'
Axis_5
'
,
'
Axis_6
'
,
'
Axis_7
'
]
df1
=
df
.
T
df1
.
to_csv
(
'
data/{}/analysis/feature_analysis_{}_{}.csv
'
.
format
(
directory
,
dataset
,
names
[
i
]),
float_format
=
'
%.3f
'
)
x_pos
=
np
.
arange
(
len
(
labels
))
with
plt
.
style
.
context
(
"
default
"
):
f4
,
ax
=
plt
.
subplots
(
figsize
=
(
10
,
5
))
bar1_alpha
=
1
bar1_hatch
=
''
plt
.
rcParams
.
update
({
'
hatch.color
'
:
rwth_style
.
blue_light
})
if
(
i
==
2
or
i
==
0
):
ax
.
bar
(
x_pos
,
max_limits_moveit
[
i
]
-
min_limits_moveit
[
i
],
0.4
,
bottom
=
min_limits_moveit
[
i
],
color
=
rwth_style
.
blue_light
,
label
=
'
MoveIt Limits
'
,
zorder
=
3
)
bar1_alpha
=
0.5
bar1_hatch
=
'
//
'
bar1
=
ax
.
bar
(
x_pos
,
max_limits_phys
[
i
]
-
min_limits_phys
[
i
],
0.4
,
bottom
=
min_limits_phys
[
i
],
color
=
rwth_style
.
blue_light
,
alpha
=
bar1_alpha
,
hatch
=
bar1_hatch
,
label
=
'
Physical Limits
'
,
zorder
=
2
)
ax
.
scatter
(
x_pos
,
(
max_limits_phys
[
i
]
+
min_limits_phys
[
i
])
/
2
,
c
=
[
rwth_style
.
blue
],
s
=
400
,
marker
=
'
_
'
,
label
=
'
Physical Mean
'
,
zorder
=
4
)
errorbar_1
=
ax
.
errorbar
(
x_pos
,
mean
,
std
,
fmt
=
'
o
'
,
capsize
=
5
,
label
=
'
Mean/Std Attained {}
'
.
format
(
titles
[
i
]),
c
=
rwth_style
.
blue
,
zorder
=
5
)
scatter_min
=
ax
.
scatter
(
x_pos
,
min
,
c
=
[
rwth_style
.
blue
],
s
=
20
,
marker
=
'
x
'
,
label
=
'
Min/Max Attained {}
'
.
format
(
titles
[
i
]),
zorder
=
6
)
ax
.
scatter
(
x_pos
,
max
,
c
=
[
rwth_style
.
blue
],
s
=
20
,
marker
=
'
x
'
,
zorder
=
6
)
ax
.
legend
(
loc
=
'
upper center
'
,
bbox_to_anchor
=
(
0.5
,
-
0.08
),
ncol
=
2
)
box
=
ax
.
get_position
()
ax
.
set_position
([
box
.
x0
,
box
.
y0
+
box
.
height
*
0.15
,
box
.
width
,
box
.
height
])
ax
.
set_ylabel
(
labels_y
[
i
])
ax
.
set_ylim
((
-
ax
.
get_ylim
()[
1
],
ax
.
get_ylim
()[
1
]))
ax
.
set_xticks
(
x_pos
)
ax
.
set_xticklabels
(
labels
)
ax
.
yaxis
.
grid
(
True
)
ax
.
set_axisbelow
(
True
)
f4
.
savefig
(
"
data/{}/analysis/errorbar_{}_{}.png
"
.
format
(
directory
,
dataset
,
names
[
i
]))
plt
.
close
(
f4
)
with
plt
.
style
.
context
(
"
default
"
):
num_col
=
3
num_row
=
3
fig
,
axs
=
plt
.
subplots
(
num_row
,
num_col
,
figsize
=
(
20
,
10
))
for
k
,
label
in
enumerate
(
labels
):
count
,
bins
,
ignored
=
axs
[
k
//
num_col
,
k
%
num_col
].
hist
(
value
[:,
k
],
density
=
True
,
bins
=
40
,
color
=
rwth_style
.
blue
)
bins
=
bins
[:
-
1
]
try
:
coeff
,
var_matrix
=
curve_fit
(
gauss
,
bins
,
count
,
p0
=
p0
,
maxfev
=
2000
)
# Get the fitted curve
hist_fit
=
gauss
(
bins
,
*
coeff
)
r_squared
=
r2_score
(
count
,
hist_fit
)
axs
[
k
//
num_col
,
k
%
num_col
].
plot
(
bins
,
hist_fit
,
color
=
rwth_style
.
red
)
t
=
axs
[
k
//
num_col
,
k
%
num_col
].
text
(
0.15
,
0.85
,
f
"
mean:
{
round
(
coeff
[
1
],
3
)
}
\n
std:
{
round
(
coeff
[
2
],
3
)
}
\n
r squared:
{
round
(
round
(
r_squared
,
4
)
*
100
,
2
)
}
%
"
,
horizontalalignment
=
'
center
'
,
verticalalignment
=
'
center
'
,
transform
=
axs
[
k
//
num_col
,
k
%
num_col
].
transAxes
)
t
.
set_bbox
(
dict
(
facecolor
=
'
white
'
,
alpha
=
0.5
,
edgecolor
=
'
black
'
))
except
Exception
as
e
:
print
(
e
)
axs
[
k
//
num_col
,
k
%
num_col
].
grid
()
axs
[
k
//
num_col
,
k
%
num_col
].
set_title
(
labels
[
k
])
axs
[
k
//
num_col
,
k
%
num_col
].
set_ylabel
(
"
Probability Density
"
)
axs
[
k
//
num_col
,
k
%
num_col
].
set_xlabel
(
labels_y
[
i
])
axs
[
k
//
num_col
,
k
%
num_col
].
set_xlim
([
min_limits_phys
[
i
][
k
],
max_limits_phys
[
i
][
k
]])
fig
.
delaxes
(
axs
[
-
1
,
-
1
])
fig
.
delaxes
(
axs
[
-
1
,
-
2
])
fig
.
tight_layout
()
fig
.
savefig
(
"
data/{}/analysis/hist_{}_{}.png
"
.
format
(
directory
,
dataset
,
names
[
i
]))
plt
.
close
(
fig
)
import
tqdm
# Add the custom module path to sys.path
sys
.
path
.
append
(
"
/root/deep-learning-based-robot-dynamics-modelling-in-robot-based-laser-material-processing/src
"
)
# Define the directory containing the data
directory_in
=
"
/root/deep-learning-based-robot-dynamics-modelling-in-robot-based-laser-material-processing/data/dataset_v3/test_2
"
def
normalize_data
(
data
:
np
.
ndarray
,
min_vals
:
List
[
float
],
max_vals
:
List
[
float
],
min
:
float
=
-
1
,
max
:
float
=
1
)
->
np
.
ndarray
:
"""
Normalize the data using given min and max values for each parameter.
Args:
data (np.ndarray): The data to normalize.
min_vals (List[float]): The minimum values for each parameter.
max_vals (List[float]): The maximum values for each parameter.
min (float): The new minimum value for normalization.
max (float): The new maximum value for normalization.
Returns:
np.ndarray: The normalized data.
"""
return
(
data
-
min_vals
)
/
(
max_vals
-
min_vals
)
*
(
max
-
min
)
+
min
def
process_file
(
directory
:
str
,
filename
:
str
)
->
None
:
"""
Process a given file by normalizing its data and saving interpolated values.
Args:
directory (str): The directory containing the file.
filename (str): The name of the file to process.
"""
# Read the command data
data1
=
np
.
genfromtxt
(
os
.
path
.
join
(
directory
,
filename
),
dtype
=
float
,
delimiter
=
'
,
'
)
t_command
=
data1
[:,
0
]
# Command times
motion_params
=
data1
[:,
1
:
22
]
# Command parameters
# Read the measurement data
filename2
=
filename
.
replace
(
"
com
"
,
"
meas
"
)
data2
=
np
.
genfromtxt
(
os
.
path
.
join
(
directory
,
filename2
),
dtype
=
float
,
delimiter
=
'
,
'
)
t_meas
=
data2
[:,
0
]
# Measurement times
# Normalize positions, velocities, and accelerations
for
i
in
range
(
0
,
7
):
motion_params
[:,
i
]
=
normalize_data
(
motion_params
[:,
i
],
panda_limits
.
q_lim_min_phys
[
i
],
panda_limits
.
q_lim_max_phys
[
i
])
for
i
in
range
(
7
,
14
):
motion_params
[:,
i
]
=
normalize_data
(
motion_params
[:,
i
],
panda_limits
.
qd_lim_min_phys
[
i
-
7
],
panda_limits
.
qd_lim_max_phys
[
i
-
7
])
for
i
in
range
(
14
,
21
):
motion_params
[:,
i
]
=
normalize_data
(
motion_params
[:,
i
],
panda_limits
.
qdd_lim_min_phys
[
i
-
14
],
panda_limits
.
qdd_lim_max_phys
[
i
-
14
])
# Interpolate command values to match measurement times
motion_params_interpolated
=
np
.
empty
([
t_meas
.
shape
[
0
],
21
])
for
i
in
range
(
0
,
21
):
motion_params_interpolated
[:,
i
]
=
np
.
interp
(
t_meas
,
t_command
,
motion_params
[:,
i
])
# Combine measurement times with interpolated parameters
meas
=
np
.
hstack
([
t_meas
[:,
None
],
motion_params_interpolated
])
# Save the result to a new file
file_output
=
os
.
path
.
join
(
directory
,
filename
.
replace
(
"
com
"
,
"
interp_com
"
))
np
.
savetxt
(
file_output
,
meas
,
delimiter
=
"
,
"
,
header
=
"
t_com, q1_com, q2_com, q3_com, q4_com, q5_com, q6_com, q7_com, qd1_com, qd2_com, qd3_com, qd4_com, qd5_com, qd6_com, qd7_com, qdd1_com, qdd2_com, qdd3_com, qdd4_com, qdd5_com, qdd6_com, qdd7_com
"
,
)
# Process all relevant files in the directory
for
filename
in
sorted
(
os
.
listdir
(
directory_in
)):
if
filename
.
endswith
(
"
com.csv
"
)
and
"
interp
"
not
in
filename
:
process_file
(
directory_in
,
filename
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment