Skip to content
Snippets Groups Projects
Commit c2b89bbb authored by Leon Michel Gorißen's avatar Leon Michel Gorißen
Browse files

chore: Remove obsolete scripts from panda_autodynamics package

- No longer needed as all in panda_autodynamics_v1.py

- Deleted : This script was responsible for data listening and saving but is no longer needed.
- Deleted : This script handled motion execution but is now redundant.
- Deleted : This script provided motion planning functionalities which have been deprecated.
- Deleted : This script was part of an older version of the Panda Autodynamics package and has been superseded by more recent implementations.
- Deleted : This script was used for task control but is no longer in use.

These removals clean up the repository and remove deprecated or unused code.
parent 971cdc8e
Branches
Tags
No related merge requests found
#!/usr/bin/env python3
# Data Listening and Saving Node
import rospy
import numpy as np
from franka_msgs.msg import FrankaState
from moveit_msgs.msg import DisplayTrajectory
def state_listener_callback(data):
# Implementation remains the same
pass
def trajectory_listener_callback(data):
# Implementation remains the same
pass
def main():
rospy.init_node("data_listening_saving_node", anonymous=True)
rospy.Subscriber("/franka_state_controller/franka_states", FrankaState, state_listener_callback)
rospy.Subscriber("/move_group/display_planned_path", DisplayTrajectory, trajectory_listener_callback)
# Data saving functionality here
# Access the parameter, defaulting to False if not set
is_test_mode = rospy.get_param('/is_test_mode', False)
if is_test_mode:
# Do something for test mode
pass
else:
# Normal operation
pass
if __name__ == "__main__":
try:
main()
except rospy.ROSInterruptException:
pass
except KeyboardInterrupt:
# Handle any additional cleanup here
print("Shutdown requested...exiting")
finally:
# Optional: additional cleanup actions
print("Performing final cleanup actions before node shutdown.")
#!/usr/bin/env python3
# Motion Execution Node
import sys
import moveit_commander
import rospy
def go_to_home(move_group):
# Implementation remains the same
pass
def main():
moveit_commander.roscpp_initialize(sys.argv)
rospy.init_node("motion_execution_node", anonymous=True)
move_group = moveit_commander.MoveGroupCommander("panda_arm")
# Execution functionality here
if __name__ == "__main__":
try:
main()
except rospy.ROSInterruptException:
pass
except KeyboardInterrupt:
# Handle any additional cleanup here
print("Shutdown requested...exiting")
finally:
# Optional: additional cleanup actions
print("Performing final cleanup actions before node shutdown.")
\ No newline at end of file
#!/usr/bin/env python3
# Motion Planning Node
import sys
import moveit_commander
import rospy
from geometry_msgs.msg import PoseStamped
import numpy as np
import copy
from argparse import ArgumentParser
import yaml
yaml.Dumper.ignore_aliases = lambda *args: True
def plan_cartesian_iso9283_path(group: moveit_commander.MoveGroupCommander) -> "tuple[moveit_commander.RobotTrajectory,float]":
"""Plans a Cartesian path following the ISO 9283 standard for robot manipulators.
This function generates a sequence of waypoints to form a complex path that includes
straight lines and circular arcs, typical for testing or calibrating robotic arms. The
path is designed based on the ISO 9283 standard, which specifies performance criteria
such as accuracy, repeatability, and path following performance for manipulators.
Args:
group (moveit_commander.MoveGroupCommander): The MoveIt move group commander associated with the robot.
Returns:
tuple[moveit_commander.RobotTrajectory,float]: A tuple containing the RobotTrajectory object
detailing the computed path, and a float representing the fraction of the path
that was successfully planned.
"""
waypoints = []
factor = 0.001 # Scale factor to convert units to meters (assuming input is in millimeters).
start_pose = group.get_current_pose().pose # Starting position of the robot's end-effector.
# Define waypoints for the path, including straight lines and circular motions.
# Each waypoint modifies the position of the robot's end-effector in 3D space.
# Point 11
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 70
temp_pose.position.y += factor * 70
waypoints.append(temp_pose)
# Point 12
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 140
temp_pose.position.y += factor * 0
waypoints.append(temp_pose)
# Point 13
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 210
temp_pose.position.y += factor * 0
waypoints.append(temp_pose)
# Point 14
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 210
temp_pose.position.y += factor * 70
waypoints.append(temp_pose)
# Circular movement (Points 15 to 17) - First arc.
# Circular Move 15-17
r = 80
for theta in range(0,180):
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * (280 + r * np.sin(theta * np.pi / 180))
temp_pose.position.y += factor * (-10 + r * np.cos(theta * np.pi / 180))
waypoints.append(temp_pose)
# Circular movement (Points 18 to 22) - Second arc.
# Circular Move 18-22
r = 20
for theta in range(0,360):
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * (140 - r * np.sin(theta * np.pi / 180))
temp_pose.position.y += factor * (-110 + r * np.cos(theta * np.pi / 180))
waypoints.append(temp_pose)
# Point 23
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 70
temp_pose.position.y += factor * -90
waypoints.append(temp_pose)
# Point 24
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 0
temp_pose.position.y += factor * -130
waypoints.append(temp_pose)
# Point 25
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 70
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 26
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 140
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 27
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 140
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 28
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 150
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 29
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 150
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 30
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 160
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 31
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 160
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 32
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 170
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 33
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 170
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 34
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 260
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 35
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 360
temp_pose.position.y += factor * -70
waypoints.append(temp_pose)
# Point 36
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 260
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# We want the Cartesian path to be interpolated at a resolution of 1 cm
# which is why we will specify 0.01 as the eef_step in Cartesian
# translation. We will disable the jump threshold by setting it to 0.0 disabling:
(plan, fraction) = group.compute_cartesian_path(
waypoints, # waypoints to follow
0.01, # eef_step
0.0 # jump_threshold
)
return plan, fraction
def create_scene(scene: moveit_commander.planning_scene_interface.PlanningSceneInterface, planning_frame: str) -> None:
"""Create moveit commander planning scene
Creates a scene with table and supply channel at the panda.
Args:
scene (moveit_commander.planning_scene_interface.PlanningSceneInterface): the scene where to add the items
planning_frame (str): used to specify the pose
"""
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = 0.55
p.pose.position.y = 0.0
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_1", p, size=(0.9, 0.9, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = -0.65
p.pose.position.y = 0.0
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_2", p, size=(0.9, 0.9, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = -0.05
p.pose.position.y = 0.3
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_3", p, size=(0.3, 0.3, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = -0.05
p.pose.position.y = -0.3
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_4", p, size=(0.3, 0.3, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = 0.0
p.pose.position.y = -0.5
p.pose.position.z = 1.0
p.pose.orientation.x = 1 / np.sqrt(2)
p.pose.orientation.y = 0
p.pose.orientation.z = 0
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("utils", p, size=(3, 0.4, 0.2))
return
def setup_parser() -> ArgumentParser:
parser = ArgumentParser(
prog="panda_autodynamics",
description="Automatically generate random movement and save trajectory data.",
epilog="(c) Schneider, 2022, https://git-ce.rwth-aachen.de/llt_dpp/all/franka_wwl_demonstrator",
)
parser.add_argument("--count", type=int, default=2, help="The number of trajectories to try to plan. This does not necessarily match the number of executed trajectories.")
parser.add_argument("--test", type=bool, default=False, help="If true, test data is generated. ISO9283 optional testpath from appendix 5 is planned with fixed velocity and acceleration in cartesian space. Data is saved in ./test/")
return parser
def main():
# Argparser Setup
parser = setup_parser()
args = parser.parse_args()
if (args.test):
rospy.set_param('/is_test_mode', True)
# Initialize Node
rospy.init_node("motion_planning_node", anonymous=True)
rospy.sleep(0.5)
# Initialize MoveIt
moveit_commander.roscpp_initialize(sys.argv)
robot = moveit_commander.RobotCommander()
scene = moveit_commander.PlanningSceneInterface()
move_group = moveit_commander.MoveGroupCommander("panda_arm")
rospy.sleep(0.5)
# Create Planning Scene
create_scene(scene, robot.get_plannning_frame())
rospy.sleep(0.5)
# Load Controller Parameters
with open(
"/opt/ros/noetic/share/franka_control/config/default_controllers.yaml", "r"
) as file_open:
default_controller = yaml.load(file_open, Loader=yaml.Loader)
frequency = default_controller["franka_state_controller"]["publish_rate"]
rospy.loginfo("Franka state publish rate is set to {} Hz".format(frequency))
# Why such a low publish rate?
# Load default config
with open(
"/opt/ros/noetic/share/panda_moveit_config/config/joint_limits.yaml", "r"
) as file_open:
global_joint_limits = yaml.load(file_open, Loader=yaml.Loader)
global_qd_factor = global_joint_limits["default_velocity_scaling_factor"]
global_qdd_factor = global_joint_limits["default_acceleration_scaling_factor"]
rospy.loginfo(
f"Franka default_velocity_scaling_factor is set to {global_qd_factor}")
rospy.loginfo(
f"Franka default_acceleration_scaling_factor is set to {global_qdd_factor}")
move_group.set_goal_position_tolerance(0.01) # in meters
move_group.set_goal_orientation_tolerance(0.1) # in radians
for iter_traj in range(0, args.count):
if (args.test):
global_qd_factor = 0.25
global_qdd_factor = 0.1
move_group.set_max_velocity_scaling_factor(global_qd_factor)
move_group.set_max_acceleration_scaling_factor(global_qdd_factor)
move_group.set_pose_target([-0.18, 0.6, 0.2, -np.pi, 0, 0])
move_group.go(wait=True)
plan,_ = plan_cartesian_iso9283_path(move_group)
# TODO This needs to be moved to motion_execution
rospy.loginfo(f"""Path No.: {iter_traj}
Velocity factor: {global_qd_factor}
Acceleration factor: {global_qdd_factor}""")
success = move_group.execute(plan, wait=True)
rospy.loginfo("Move group returns: " + str(success))
else:
# numpy doc for np.random.uniform:
# Samples are uniformly distributed over the half-open
# interval [low, high) (includes low, but excludes high).
# But the script need the interval (0, 1].
# Multiply with -1 to get right interval.
qd_factor = -round(np.random.uniform(-0.5, -0.1), 2)
move_group.set_max_velocity_scaling_factor(qd_factor)
qdd_factor = -round(np.random.uniform(-0.5, -0.1), 2)
move_group.set_max_acceleration_scaling_factor(qdd_factor)
target_joint_values = move_group.get_random_joint_values()
rospy.loginfo(f"""Path No.: {iter_traj}
Velocity factor: {global_qd_factor}
Acceleration factor: {global_qdd_factor}""")
success = move_group.go(target_joint_values, wait=True)
rospy.loginfo("Move group returns: " + str(success))
if __name__ == "__main__":
try:
main()
except rospy.ROSInterruptException:
pass
except KeyboardInterrupt:
# Handle any additional cleanup here
print("Shutdown requested...exiting")
finally:
# Optional: additional cleanup actions
print("Performing final cleanup actions before node shutdown.")
#!/app/venv38/bin/python
# -*- coding:utf-8 -*-
"""Panda Autodynamics package."""
# TODO DOCSTRING DOCUMENTATION
import struct
import sys
from argparse import ArgumentParser
from datetime import datetime
from pathlib import Path
import moveit_commander
import moveit_msgs.msg
import numpy as np
import rospy
import yaml
from franka_msgs.msg import FrankaState
from geometry_msgs.msg import PoseStamped
#from pyModbusTCP.client import ModbusClient
import copy
from control_msgs.msg import FollowJointTrajectoryGoal
yaml.Dumper.ignore_aliases = lambda *args: True
def plan_cartesian_path(group):
waypoints = []
factor = 0.001
start_pose = group.get_current_pose().pose
# Point 11
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 70
temp_pose.position.y += factor * 70
waypoints.append(temp_pose)
# Point 12
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 140
temp_pose.position.y += factor * 0
waypoints.append(temp_pose)
# Point 13
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 210
temp_pose.position.y += factor * 0
waypoints.append(temp_pose)
# Point 14
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 210
temp_pose.position.y += factor * 70
waypoints.append(temp_pose)
# Circular Move 15-17
r = 80
for theta in range(0,180):
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * (280 + r * np.sin(theta * np.pi / 180))
temp_pose.position.y += factor * (-10 + r * np.cos(theta * np.pi / 180))
waypoints.append(temp_pose)
# Circular Move 18-22
r = 20
for theta in range(0,360):
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * (140 - r * np.sin(theta * np.pi / 180))
temp_pose.position.y += factor * (-110 + r * np.cos(theta * np.pi / 180))
waypoints.append(temp_pose)
# Point 23
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 70
temp_pose.position.y += factor * -90
waypoints.append(temp_pose)
# Point 24
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 0
temp_pose.position.y += factor * -130
waypoints.append(temp_pose)
# Point 25
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 70
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 26
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 140
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 27
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 140
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 28
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 150
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 29
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 150
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 30
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 160
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 31
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 160
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 32
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 170
temp_pose.position.y += factor * -160
waypoints.append(temp_pose)
# Point 33
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 170
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 34
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 260
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# Point 35
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 360
temp_pose.position.y += factor * -70
waypoints.append(temp_pose)
# Point 36
temp_pose = copy.deepcopy(start_pose)
temp_pose.position.x += factor * 260
temp_pose.position.y += factor * -170
waypoints.append(temp_pose)
# We want the Cartesian path to be interpolated at a resolution of 1 cm
# which is why we will specify 0.01 as the eef_step in Cartesian
# translation. We will disable the jump threshold by setting it to 0.0 disabling:
(plan, fraction) = group.compute_cartesian_path(
waypoints, # waypoints to follow
0.01, # eef_step
0.0) # jump_threshold
return plan, fraction
def create_scene(scene: moveit_commander.planning_scene_interface.PlanningSceneInterface, planning_frame: str) -> None:
"""Create moveit commander planning scene
Creates a scene with table and supply channel at the panda.
Args:
scene (moveit_commander.planning_scene_interface.PlanningSceneInterface): the scene where to add the items
planning_frame (str): used to specify the pose
"""
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = 0.55
p.pose.position.y = 0.0
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_1", p, size=(0.9, 0.9, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = -0.65
p.pose.position.y = 0.0
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_2", p, size=(0.9, 0.9, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = -0.05
p.pose.position.y = 0.3
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_3", p, size=(0.3, 0.3, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = -0.05
p.pose.position.y = -0.3
p.pose.position.z = -0.5
p.pose.orientation.x = 0
p.pose.orientation.y = 0
p.pose.orientation.z = 1 / np.sqrt(2)
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("table_4", p, size=(0.3, 0.3, 1.0))
p = PoseStamped()
p.header.frame_id = planning_frame
p.pose.position.x = 0.0
p.pose.position.y = -0.5
p.pose.position.z = 1.0
p.pose.orientation.x = 1 / np.sqrt(2)
p.pose.orientation.y = 0
p.pose.orientation.z = 0
p.pose.orientation.w = 1 / np.sqrt(2)
scene.add_box("utils", p, size=(3, 0.4, 0.2))
return
def go_to_home(move_group:moveit_commander.move_group.MoveGroupCommander) -> None:
"""Robot will move to home position
Args:
move_group (moveit_commander.move_group.MoveGroupCommander): _description_
"""
move_group.set_named_target("ready")
rospy.logwarn(
"Robot moves to Home Position. Please wait."
)
move_group.set_max_velocity_scaling_factor(0.25)
move_group.set_max_acceleration_scaling_factor(0.25)
move_group.go(wait=True)
move_group.stop()
return
def state_listener_callback(data):
global t_meas, q_meas, qd_meas, tau_meas, U_l1n, I1, P1
t_meas = np.append(t_meas, data.time)
q_meas = np.vstack((q_meas, data.q))
qd_meas = np.vstack((qd_meas, data.dq))
tau_meas = np.vstack((tau_meas, data.tau_J))
# U_l1n_in = powermeter.read_input_registers(reg_addr=1, reg_nb=2)
# I1_in = powermeter.read_input_registers(reg_addr=13, reg_nb=2)
# P1_in = powermeter.read_input_registers(reg_addr=25, reg_nb=2)
# if U_l1n_in:
# try:
# U_l1n_formatted = struct.unpack(
# "!f",
# bytes.fromhex(
# "{0:02x}".format(U_l1n_in[0]) + "{0:02x}".format(U_l1n_in[1])
# ),
# )[0]
# except Exception:
# U_l1n_formatted = -1
# if I1_in:
# try:
# I1_formatted = struct.unpack(
# "!f",
# bytes.fromhex("{0:02x}".format(I1_in[0]) + "{0:02x}".format(I1_in[1])),
# )[0]
# except Exception:
# I1_formatted = -1
# if P1_in:
# try:
# P1_formatted = struct.unpack(
# "!f",
# bytes.fromhex("{0:02x}".format(P1_in[0]) + "{0:02x}".format(P1_in[1])),
# )[0]
# except Exception:
# P1_formatted = -1
U_l1n = np.append(U_l1n, 0)
I1 = np.append(I1, 0)
P1 = np.append(P1, 0)
def trajectory_listener_callback(data):
global t_com, q_com, qd_com, qdd_com
data = data.trajectory[0].joint_trajectory.points
t_com = np.empty((0, 1))
q_com = np.empty((0, 7))
qd_com = np.empty((0, 7))
qdd_com = np.empty((0, 7))
for i in range(0, len(data)):
t_com = np.append(
t_com,
data[i].time_from_start.secs + data[i].time_from_start.nsecs / 1000000000,
)
q_com = np.vstack([q_com, data[i].positions])
qd_com = np.vstack([qd_com, data[i].velocities])
qdd_com = np.vstack([qdd_com, data[i].accelerations])
def main():
global t_meas, q_meas, qd_meas, tau_meas
global t_com, q_com, qd_com, qdd_com
global U_l1n, I1, P1, powermeter
global state_listener, trajectory_listener
try:
# Argparser for max count
parser = ArgumentParser(
prog="panda_autodynamics",
description="Automatically generate random movement and save trajectory data.",
epilog="(c) Schneider, 2022, https://git-ce.rwth-aachen.de/llt_dpp/all/franka_wwl_demonstrator",
)
parser.add_argument("--count", type=int, default=2, help="The number of trajectories to try to plan. This does not necessarily match the number of executed trajectories.")
parser.add_argument("--test", type=bool, default=False, help="If true, test data is generated. ISO9283 optional testpath from appendix 5 is planned with fixed velocity and acceleration in cartesian space. Data is saved in ./test/")
args = parser.parse_args()
# TODO implement reseting error message here
# First initialize `moveit_commander`_ and a `rospy`_ node:
moveit_commander.roscpp_initialize(sys.argv)
rospy.init_node("panda_auto_dynamics_v2", anonymous=True)
rospy.sleep(2)
# Instantiate a `RobotCommander`_ object.
# Provides information such as the robot's
# kinematic model and the robot's current joint states
scene = moveit_commander.PlanningSceneInterface()
robot = moveit_commander.RobotCommander()
# Instantiate a `MoveGroupCommander`_ object.
# This object is an interface to a planning group (group of joints).
move_group = moveit_commander.MoveGroupCommander("panda_arm")
display_trajectory_publisher = rospy.Publisher('/move_group/display_planned_path',
moveit_msgs.msg.DisplayTrajectory,
queue_size=20)
# Add collision object namely the table
# (coordinates didnt fit well since otherwise it leads to problems)
rospy.sleep(2)
create_scene(scene, robot.get_planning_frame())
rospy.sleep(2)
# TCP auto connect on modbus request, close after it
#powermeter = ModbusClient(host="169.254.64.30", auto_open=True, auto_close=True)
# Read out Publisher Frequency of Franka State Publisher
with open(
"/opt/ros/noetic/share/franka_control/config/default_controllers.yaml", "r"
) as file_open:
default_controller = yaml.load(file_open, Loader=yaml.Loader)
frequency = default_controller["franka_state_controller"]["publish_rate"]
rospy.loginfo("Franka state publish rate is set to {} Hz".format(frequency))
# Read out Velocity and acceleration global limit factors
with open(
"/opt/ros/noetic/share/panda_moveit_config/config/joint_limits.yaml", "r"
) as file_open:
global_joint_limits = yaml.load(file_open, Loader=yaml.Loader)
global_qd_factor = global_joint_limits["default_velocity_scaling_factor"]
global_qdd_factor = global_joint_limits["default_acceleration_scaling_factor"]
rospy.loginfo(
"Franka default_velocity_scaling_factor is set to {}".format(global_qd_factor)
)
rospy.loginfo(
"Franka default_acceleration_scaling_factor is set to {}".format(
global_qdd_factor
)
)
move_group.set_goal_position_tolerance(0.01) # in meters
move_group.set_goal_orientation_tolerance(0.1) # in radians
for iter_traj in range(0, args.count):
if (args.test):
move_group.set_max_velocity_scaling_factor(0.25)
move_group.set_max_acceleration_scaling_factor(0.1)
move_group.set_pose_target([-0.18, 0.6, 0.2, -np.pi, 0, 0])
move_group.go(wait=True)
now = datetime.now()
t_meas = np.empty((0, 1))
q_meas = np.empty((0, 7))
qd_meas = np.empty((0, 7))
tau_meas = np.empty((0, 7))
t_com = np.empty((0, 1))
q_com = np.empty((0, 7))
qd_com = np.empty((0, 7))
qdd_com = np.empty((0, 7))
U_l1n = np.empty((0, 1))
I1 = np.empty((0, 1))
P1 = np.empty((0, 1))
trajectory_listener = rospy.Subscriber(
"/move_group/display_planned_path",
moveit_msgs.msg.DisplayTrajectory,
trajectory_listener_callback,
)
# numpy doc for np.random.uniform:
# Samples are uniformly distributed over the half-open
# interval [low, high) (includes low, but excludes high).
# But the script need the interval (0, 1].
# Multiply with -1 to get right interval.
qd_factor = -round(np.random.uniform(-0.5, -0.1), 2)
move_group.set_max_velocity_scaling_factor(qd_factor)
qdd_factor = -round(np.random.uniform(-0.5, -0.1), 2)
move_group.set_max_acceleration_scaling_factor(qdd_factor)
#rospy.logwarn(
#"Path No.: {}\t Velocity factor: {}\t Acceleration factor: {}".format(
#iter_traj, move_group.get_max_velocity_scaling_factor(), move_group.get_max_acceleration_scaling_factor()
#)
#)
rospy.logwarn(f"{move_group.__dict__}")
# How to change publisher rate?
# /opt/ros/noetic/share/franka_control/config/default_controllers.yaml
# under franka_state_controller -> publish_rate
state_listener = rospy.Subscriber(
"/franka_state_controller/franka_states",
FrankaState,
state_listener_callback,
)
if (args.test):
plan,fraction = plan_cartesian_path(move_group)
success = move_group.execute(plan, wait=True)
else:
# Start movement in joint space
target_joint_values = move_group.get_random_joint_values()
success = move_group.go(target_joint_values, wait=True)
rospy.logwarn("Move group returns: " + str(success))
# Do not listen to publisher anymore
state_listener.unregister()
trajectory_listener.unregister()
# Save data to yaml file for database
if (success):
# rospy.loginfo("Saving Data... Please wait")
try:
t_meas = t_meas - t_meas[0]
meas = np.concatenate(
(
t_meas[:, None],
q_meas,
qd_meas,
tau_meas,
U_l1n[:, None],
I1[:, None],
P1[:, None],
),
axis=1,
)
if (args.test):
save_at = "test"
else:
save_at = "train"
save_path = Path(f"/app/{save_at}/")
save_path = save_path.expanduser()
save_path.mkdir(parents=True, exist_ok=True)
filename = f"{now.strftime('%Y%m%d_%H%M%S')}_meas.csv"
result_file = save_path / filename
# "/app/{}/{}_meas.csv".format(
# now.strftime("%Y%m%d_%H%M%S"), save_at
#)
np.savetxt(
result_file,
meas,
delimiter=",",
header="t_meas, q1_meas, q2_meas, q3_meas, q4_meas, q5_meas, q6_meas, q7_meas, qd1_meas, qd2_meas, qd3_meas, qd4_meas, qd5_meas, qd6_meas, qd7_meas, tau1_meas, tau2_meas, tau3_meas, tau4_meas, tau5_meas, tau6_meas, tau7_meas, U, I, P",
)
rospy.loginfo("saved measured values")
com = np.concatenate((t_com[:, None], q_com, qd_com, qdd_com), axis=1)
rospy.loginfo("read command values")
filename = f"{now.strftime('%Y%m%d_%H%M%S')}_com.csv"
result_file = save_path / filename
np.savetxt(
result_file,
com,
delimiter=",",
header="t_com, q1_com, q2_com, q3_com, q4_com, q5_com, q6_com, q7_com, qd1_com, qd2_com, qd3_com, qd4_com, qd5_com, qd6_com, qd7_com, qdd1_com, qdd2_com, qdd3_com, qdd4_com, qdd5_com, qdd6_com, qdd7_com",
)
rospy.loginfo("Measurement complete. Success!")
except Exception:
rospy.loginfo("Error with saving data!")
else:
rospy.loginfo("Planning failed. Next try!")
# TODO implement tcp client to send csv files
except KeyboardInterrupt:
rospy.loginfo("Shutting down")
if 'move_group' in globals():
move_group.stop()
if 'state_listener' in globals():
state_listener.unregister()
if 'trajectory_listener' in globals():
trajectory_listener.unregister()
moveit_commander.roscpp_shutdown()
moveit_commander.os._exit(0)
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment