Skip to content
Snippets Groups Projects
Commit 46a7af96 authored by Rene Ebeling's avatar Rene Ebeling
Browse files

added the UR_Gazebo_sim again

parent 3f386e50
No related branches found
No related tags found
No related merge requests found
Showing with 973 additions and 0 deletions
cmake_minimum_required(VERSION 3.5)
project(ur_simulation_gazebo)
find_package(ament_cmake REQUIRED)
install(DIRECTORY config launch
DESTINATION share/${PROJECT_NAME}
)
if(BUILD_TESTING)
find_package(launch_testing_ament_cmake)
find_package(ament_cmake_pytest REQUIRED)
add_launch_test(test/test_gazebo.py
TIMEOUT
180
)
endif()
ament_package()
controller_manager:
ros__parameters:
update_rate: 100 # Hz
joint_state_broadcaster:
type: joint_state_broadcaster/JointStateBroadcaster
io_and_status_controller:
type: ur_controllers/GPIOController
speed_scaling_state_broadcaster:
type: ur_controllers/SpeedScalingStateBroadcaster
force_torque_sensor_broadcaster:
type: ur_controllers/ForceTorqueStateBroadcaster
joint_trajectory_controller:
type: joint_trajectory_controller/JointTrajectoryController
scaled_joint_trajectory_controller:
type: ur_controllers/ScaledJointTrajectoryController
forward_velocity_controller:
type: velocity_controllers/JointGroupVelocityController
forward_position_controller:
type: position_controllers/JointGroupPositionController
speed_scaling_state_broadcaster:
ros__parameters:
state_publish_rate: 100.0
force_torque_sensor_broadcaster:
ros__parameters:
sensor_name: tcp_fts_sensor
state_interface_names:
- force.x
- force.y
- force.z
- torque.x
- torque.y
- torque.z
frame_id: tool0
topic_name: ft_data
joint_trajectory_controller:
ros__parameters:
joints:
- shoulder_pan_joint
- shoulder_lift_joint
- elbow_joint
- wrist_1_joint
- wrist_2_joint
- wrist_3_joint
command_interfaces:
- position
state_interfaces:
- position
- velocity
state_publish_rate: 100.0
action_monitor_rate: 20.0
allow_partial_joints_goal: false
constraints:
stopped_velocity_tolerance: 0.2
goal_time: 0.0
shoulder_pan_joint: { trajectory: 0.2, goal: 0.1 }
shoulder_lift_joint: { trajectory: 0.2, goal: 0.1 }
elbow_joint: { trajectory: 0.2, goal: 0.1 }
wrist_1_joint: { trajectory: 0.2, goal: 0.1 }
wrist_2_joint: { trajectory: 0.2, goal: 0.1 }
wrist_3_joint: { trajectory: 0.2, goal: 0.1 }
scaled_joint_trajectory_controller:
ros__parameters:
joints:
- shoulder_pan_joint
- shoulder_lift_joint
- elbow_joint
- wrist_1_joint
- wrist_2_joint
- wrist_3_joint
command_interfaces:
- position
state_interfaces:
- position
- velocity
state_publish_rate: 100.0
action_monitor_rate: 20.0
allow_partial_joints_goal: false
constraints:
stopped_velocity_tolerance: 0.2
goal_time: 0.0
shoulder_pan_joint: { trajectory: 0.2, goal: 0.1 }
shoulder_lift_joint: { trajectory: 0.2, goal: 0.1 }
elbow_joint: { trajectory: 0.2, goal: 0.1 }
wrist_1_joint: { trajectory: 0.2, goal: 0.1 }
wrist_2_joint: { trajectory: 0.2, goal: 0.1 }
wrist_3_joint: { trajectory: 0.2, goal: 0.1 }
forward_velocity_controller:
ros__parameters:
joints:
- shoulder_pan_joint
- shoulder_lift_joint
- elbow_joint
- wrist_1_joint
- wrist_2_joint
- wrist_3_joint
interface_name: velocity
forward_position_controller:
ros__parameters:
joints:
- shoulder_pan_joint
- shoulder_lift_joint
- elbow_joint
- wrist_1_joint
- wrist_2_joint
- wrist_3_joint
# Copyright (c) 2021 Stogl Robotics Consulting UG (haftungsbeschränkt)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the {copyright_holder} nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Denis Stogl
from launch import LaunchDescription
from launch.actions import (
DeclareLaunchArgument,
IncludeLaunchDescription,
OpaqueFunction,
RegisterEventHandler,
)
from launch.conditions import IfCondition, UnlessCondition
from launch.event_handlers import OnProcessExit
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import Command, FindExecutable, LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
def launch_setup(context, *args, **kwargs):
# Initialize Arguments
ur_type = LaunchConfiguration("ur_type")
safety_limits = LaunchConfiguration("safety_limits")
safety_pos_margin = LaunchConfiguration("safety_pos_margin")
safety_k_position = LaunchConfiguration("safety_k_position")
# General arguments
runtime_config_package = LaunchConfiguration("runtime_config_package")
controllers_file = LaunchConfiguration("controllers_file")
initial_positions_file = LaunchConfiguration("initial_positions_file")
description_package = LaunchConfiguration("description_package")
description_file = LaunchConfiguration("description_file")
description_file = LaunchConfiguration("description_file")
prefix = LaunchConfiguration("prefix")
start_joint_controller = LaunchConfiguration("start_joint_controller")
initial_joint_controller = LaunchConfiguration("initial_joint_controller")
launch_rviz = LaunchConfiguration("launch_rviz")
gazebo_gui = LaunchConfiguration("gazebo_gui")
initial_joint_controllers = PathJoinSubstitution(
[FindPackageShare(runtime_config_package), "config", controllers_file]
)
initial_positions_file_abs = PathJoinSubstitution(
[FindPackageShare(runtime_config_package), "config", initial_positions_file]
)
rviz_config_file = PathJoinSubstitution(
[FindPackageShare(description_package), "rviz", "view_robot.rviz"]
)
robot_description_content = Command(
[
PathJoinSubstitution([FindExecutable(name="xacro")]),
" ",
PathJoinSubstitution(
[FindPackageShare(description_package), "urdf", description_file]
),
" ",
"safety_limits:=",
safety_limits,
" ",
"safety_pos_margin:=",
safety_pos_margin,
" ",
"safety_k_position:=",
safety_k_position,
" ",
"name:=",
"ur",
" ",
"ur_type:=",
ur_type,
" ",
"prefix:=",
prefix,
" ",
"sim_gazebo:=true",
" ",
"simulation_controllers:=",
initial_joint_controllers,
" ",
"initial_positions_file:=",
initial_positions_file_abs,
]
)
robot_description = {"robot_description": robot_description_content}
robot_state_publisher_node = Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[{"use_sim_time": True}, robot_description],
)
rviz_node = Node(
package="rviz2",
executable="rviz2",
name="rviz2",
output="log",
arguments=["-d", rviz_config_file],
condition=IfCondition(launch_rviz),
)
joint_state_broadcaster_spawner = Node(
package="controller_manager",
executable="spawner",
arguments=["joint_state_broadcaster", "--controller-manager", "/controller_manager"],
)
# Delay rviz start after `joint_state_broadcaster`
delay_rviz_after_joint_state_broadcaster_spawner = RegisterEventHandler(
event_handler=OnProcessExit(
target_action=joint_state_broadcaster_spawner,
on_exit=[rviz_node],
),
condition=IfCondition(launch_rviz),
)
# There may be other controllers of the joints, but this is the initially-started one
initial_joint_controller_spawner_started = Node(
package="controller_manager",
executable="spawner",
arguments=[initial_joint_controller, "-c", "/controller_manager"],
condition=IfCondition(start_joint_controller),
)
initial_joint_controller_spawner_stopped = Node(
package="controller_manager",
executable="spawner",
arguments=[initial_joint_controller, "-c", "/controller_manager", "--stopped"],
condition=UnlessCondition(start_joint_controller),
)
# Gazebo nodes
gazebo = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[FindPackageShare("gazebo_ros"), "/launch", "/gazebo.launch.py"]
),
launch_arguments={
"gui": gazebo_gui,
}.items(),
)
# Spawn robot
gazebo_spawn_robot = Node(
package="gazebo_ros",
executable="spawn_entity.py",
name="spawn_ur",
arguments=["-entity", "ur", "-topic", "robot_description"],
output="screen",
)
nodes_to_start = [
robot_state_publisher_node,
joint_state_broadcaster_spawner,
delay_rviz_after_joint_state_broadcaster_spawner,
initial_joint_controller_spawner_stopped,
initial_joint_controller_spawner_started,
gazebo,
gazebo_spawn_robot,
]
return nodes_to_start
def generate_launch_description():
declared_arguments = []
# UR specific arguments
declared_arguments.append(
DeclareLaunchArgument(
"ur_type",
description="Type/series of used UR robot.",
choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e", "ur20", "ur30"],
default_value="ur5e",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"safety_limits",
default_value="true",
description="Enables the safety limits controller if true.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"safety_pos_margin",
default_value="0.15",
description="The margin to lower and upper limits in the safety controller.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"safety_k_position",
default_value="20",
description="k-position factor in the safety controller.",
)
)
# General arguments
declared_arguments.append(
DeclareLaunchArgument(
"runtime_config_package",
default_value="ur_simulation_gazebo",
description='Package with the controller\'s configuration in "config" folder. \
Usually the argument is not set, it enables use of a custom setup.',
)
)
declared_arguments.append(
DeclareLaunchArgument(
"controllers_file",
default_value="ur_controllers.yaml",
description="YAML file with the controllers configuration.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"initial_positions_file",
default_value=PathJoinSubstitution(
[
FindPackageShare("ur_description"),
"config",
"initial_positions.yaml",
]
),
description="YAML file (absolute path) with the robot's initial joint positions.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"description_package",
default_value="ur_description",
description="Description package with robot URDF/XACRO files. Usually the argument \
is not set, it enables use of a custom description.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"description_file",
default_value="ur.urdf.xacro",
description="URDF/XACRO description file with the robot.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"prefix",
default_value='""',
description="Prefix of the joint names, useful for \
multi-robot setup. If changed than also joint names in the controllers' configuration \
have to be updated.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"start_joint_controller",
default_value="true",
description="Enable headless mode for robot control",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"initial_joint_controller",
default_value="joint_trajectory_controller",
description="Robot controller to start.",
)
)
declared_arguments.append(
DeclareLaunchArgument("launch_rviz", default_value="true", description="Launch RViz?")
)
declared_arguments.append(
DeclareLaunchArgument(
"gazebo_gui", default_value="true", description="Start gazebo with GUI?"
)
)
return LaunchDescription(declared_arguments + [OpaqueFunction(function=launch_setup)])
# Copyright (c) 2022 Stogl Robotics Consulting UG (haftungsbeschränkt)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the {copyright_holder} nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Denis Stogl
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, OpaqueFunction
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration
from launch_ros.substitutions import FindPackageShare
def launch_setup(context, *args, **kwargs):
# Initialize Arguments
ur_type = LaunchConfiguration("ur_type")
safety_limits = LaunchConfiguration("safety_limits")
# General arguments
runtime_config_package = LaunchConfiguration("runtime_config_package")
controllers_file = LaunchConfiguration("controllers_file")
description_package = LaunchConfiguration("description_package")
description_file = LaunchConfiguration("description_file")
moveit_config_package = LaunchConfiguration("moveit_config_package")
moveit_config_file = LaunchConfiguration("moveit_config_file")
prefix = LaunchConfiguration("prefix")
ur_control_launch = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[FindPackageShare("ur_simulation_gazebo"), "/launch", "/ur_sim_control.launch.py"]
),
launch_arguments={
"ur_type": ur_type,
"safety_limits": safety_limits,
"runtime_config_package": runtime_config_package,
"controllers_file": controllers_file,
"description_package": description_package,
"description_file": description_file,
"prefix": prefix,
"launch_rviz": "false",
}.items(),
)
ur_moveit_launch = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[FindPackageShare("ur_moveit_config"), "/launch", "/ur_moveit.launch.py"]
),
launch_arguments={
"ur_type": ur_type,
"safety_limits": safety_limits,
"description_package": description_package,
"description_file": description_file,
"moveit_config_package": moveit_config_package,
"moveit_config_file": moveit_config_file,
"prefix": prefix,
"use_sim_time": "true",
"launch_rviz": "true",
"use_fake_hardware": "true", # to change moveit default controller to joint_trajectory_controller
}.items(),
)
nodes_to_launch = [
ur_control_launch,
ur_moveit_launch,
]
return nodes_to_launch
def generate_launch_description():
declared_arguments = []
# UR specific arguments
declared_arguments.append(
DeclareLaunchArgument(
"ur_type",
description="Type/series of used UR robot.",
choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e", "ur20", "ur30"],
default_value="ur5e",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"safety_limits",
default_value="true",
description="Enables the safety limits controller if true.",
)
)
# General arguments
declared_arguments.append(
DeclareLaunchArgument(
"runtime_config_package",
default_value="ur_simulation_gazebo",
description='Package with the controller\'s configuration in "config" folder. \
Usually the argument is not set, it enables use of a custom setup.',
)
)
declared_arguments.append(
DeclareLaunchArgument(
"controllers_file",
default_value="ur_controllers.yaml",
description="YAML file with the controllers configuration.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"description_package",
default_value="ur_description",
description="Description package with robot URDF/XACRO files. Usually the argument \
is not set, it enables use of a custom description.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"description_file",
default_value="ur.urdf.xacro",
description="URDF/XACRO description file with the robot.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"moveit_config_package",
default_value="ur_moveit_config",
description="MoveIt config package with robot SRDF/XACRO files. Usually the argument \
is not set, it enables use of a custom moveit config.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"moveit_config_file",
default_value="ur.srdf.xacro",
description="MoveIt SRDF/XACRO description file with the robot.",
)
)
declared_arguments.append(
DeclareLaunchArgument(
"prefix",
default_value='""',
description="Prefix of the joint names, useful for \
multi-robot setup. If changed than also joint names in the controllers' configuration \
have to be updated.",
)
)
return LaunchDescription(declared_arguments + [OpaqueFunction(function=launch_setup)])
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>ur_simulation_gazebo</name>
<version>0.0.0</version>
<description>Example and configuration files for Gazebo Classic simulation of UR manipulators.</description>
<author>Lovro Ivanov</author>
<maintainer email="denis@stoglrobotics.de">Denis Stogl</maintainer>
<license>BSD-3-Clause</license>
<url type="bugtracker">https://github.com/UniversalRobots/Universal_Robots_ROS2_Gazebo_Simulation/issues</url>
<url type="repository">https://github.com/UniversalRobots/Universal_Robots_ROS2_Gazebo_Simulation</url>
<buildtool_depend>ament_cmake</buildtool_depend>
<exec_depend>controller_manager</exec_depend>
<exec_depend>gazebo_ros2_control</exec_depend>
<exec_depend>gazebo_ros</exec_depend>
<exec_depend>joint_state_broadcaster</exec_depend>
<exec_depend>launch</exec_depend>
<exec_depend>launch_ros</exec_depend>
<exec_depend>robot_state_publisher</exec_depend>
<exec_depend>rviz2</exec_depend>
<exec_depend>ur_controllers</exec_depend>
<exec_depend>ur_description</exec_depend>
<exec_depend>ur_moveit_config</exec_depend>
<exec_depend>urdf</exec_depend>
<exec_depend>xacro</exec_depend>
<test_depend>ament_cmake_pytest</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>launch_testing_ros</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
# Copyright 2024, FZI Forschungszentrum Informatik
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the {copyright_holder} nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import logging
import time
import rclpy
from rclpy.action import ActionClient
from controller_manager_msgs.srv import ListControllers
TIMEOUT_WAIT_SERVICE = 10
TIMEOUT_WAIT_SERVICE_INITIAL = 120
TIMEOUT_WAIT_ACTION = 20
def _wait_for_service(node, srv_name, srv_type, timeout):
client = node.create_client(srv_type, srv_name)
logging.info("Waiting for service '%s' with timeout %fs...", srv_name, timeout)
if client.wait_for_service(timeout) is False:
raise Exception(f"Could not reach service '{srv_name}' within timeout of {timeout}")
logging.info(" Successfully connected to service '%s'", srv_name)
return client
def _wait_for_action(node, action_name, action_type, timeout):
client = ActionClient(node, action_type, action_name)
logging.info("Waiting for action server '%s' with timeout %fs...", action_name, timeout)
if client.wait_for_server(timeout) is False:
raise Exception(
f"Could not reach action server '{action_name}' within timeout of {timeout}"
)
logging.info(" Successfully connected to action server '%s'", action_name)
return client
def wait_for_controller(
node, controller_name, timeout, cm_list_service="/controller_manager/list_controllers"
):
logging.info("Waiting for controller '%s' with timeout %fs...", controller_name, timeout)
client = _wait_for_service(node, cm_list_service, ListControllers, TIMEOUT_WAIT_SERVICE)
controller_active = False
start_time = node.get_clock().now()
while not controller_active and (
node.get_clock().now() - start_time < rclpy.time.Duration(seconds=timeout)
):
result = _call_service(node, client, ListControllers.Request())
for controller in result.controller:
if controller.name == controller_name:
controller_active = controller.state == "active"
if controller_active:
logging.info("Controller '%s' is active.", controller_name)
return True
time.sleep(1)
raise Exception(
f"Could not find active controller '{controller_name}' within timeout of {timeout}"
)
def _call_service(node, client, request):
logging.info("Calling service client '%s' with request '%s'", client.srv_name, request)
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)
if future.result() is not None:
logging.info(" Received result: %s", future.result())
return future.result()
raise Exception(f"Error while calling service '{client.srv_name}': {future.exception()}")
class _ServiceInterface:
def __init__(
self, node, initial_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, timeout=TIMEOUT_WAIT_SERVICE
):
self.__node = node
self.__service_clients = {
srv_name: (
_wait_for_service(self.__node, srv_name, srv_type, initial_timeout),
srv_type,
)
for srv_name, srv_type in self.__initial_services.items()
}
self.__service_clients.update(
{
srv_name: (_wait_for_service(self.__node, srv_name, srv_type, timeout), srv_type)
for srv_name, srv_type in self.__services.items()
}
)
def __init_subclass__(mcs, namespace="", initial_services={}, services={}, **kwargs):
super().__init_subclass__(**kwargs)
mcs.__initial_services = {
namespace + "/" + srv_name: srv_type for srv_name, srv_type in initial_services.items()
}
mcs.__services = {
namespace + "/" + srv_name: srv_type for srv_name, srv_type in services.items()
}
for srv_name, srv_type in list(initial_services.items()) + list(services.items()):
full_srv_name = namespace + "/" + srv_name
setattr(
mcs,
srv_name,
lambda s, full_srv_name=full_srv_name, *args, **kwargs: _call_service(
s.__node,
s.__service_clients[full_srv_name][0],
s.__service_clients[full_srv_name][1].Request(*args, **kwargs),
),
)
class ActionInterface:
def __init__(self, node, action_name, action_type, timeout=TIMEOUT_WAIT_ACTION):
self.__node = node
self.__action_name = action_name
self.__action_type = action_type
self.__action_client = _wait_for_action(node, action_name, action_type, timeout)
def send_goal(self, *args, **kwargs):
goal = self.__action_type.Goal(*args, **kwargs)
logging.info("Sending goal to action server '%s': %s", self.__action_name, goal)
future = self.__action_client.send_goal_async(goal)
rclpy.spin_until_future_complete(self.__node, future)
if future.result() is not None:
logging.info(" Received result: %s", future.result())
return future.result()
pass
def get_result(self, goal_handle, timeout):
future_res = goal_handle.get_result_async()
logging.info(
"Waiting for action result from '%s' with timeout %fs", self.__action_name, timeout
)
rclpy.spin_until_future_complete(self.__node, future_res, timeout_sec=timeout)
if future_res.result() is not None:
logging.info(" Received result: %s", future_res.result().result)
return future_res.result().result
else:
raise Exception(
f"Exception while calling action '{self.__action_name}': {future_res.exception()}"
)
#!/usr/bin/env python
# Copyright 2024, FZI Forschungszentrum Informatik
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the {copyright_holder} nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import logging
import os
import pytest
import sys
import time
import unittest
import rclpy
from rclpy.node import Node
from launch import LaunchDescription
from launch.actions import (
IncludeLaunchDescription,
)
from launch.substitutions import PathJoinSubstitution
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch_ros.substitutions import FindPackageShare
from launch_testing.actions import ReadyToTest
import launch_testing
from builtin_interfaces.msg import Duration
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
sys.path.append(os.path.dirname(__file__))
from test_common import ActionInterface, wait_for_controller # noqa: E402
TIMEOUT_EXECUTE_TRAJECTORY = 30
ROBOT_JOINTS = [
"elbow_joint",
"shoulder_lift_joint",
"shoulder_pan_joint",
"wrist_1_joint",
"wrist_2_joint",
"wrist_3_joint",
]
# TODO: Add tf_prefix parametrization
@pytest.mark.launch_test
@launch_testing.parametrize(
"ur_type",
["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e", "ur20", "ur30"],
)
def generate_test_description(ur_type):
gazebo = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
PathJoinSubstitution(
[FindPackageShare("ur_simulation_gazebo"), "launch", "ur_sim_control.launch.py"]
)
),
launch_arguments={
"ur_type": ur_type,
"launch_rviz": "false",
"start_joint_controller": "true",
"launch_rviz": "false",
"gazebo_gui": "false",
}.items(),
)
return LaunchDescription([ReadyToTest(), gazebo])
class GazeboTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Initialize the ROS context
rclpy.init()
cls.node = Node("ur_gazebo_test")
time.sleep(1)
cls.init_robot(cls)
@classmethod
def tearDownClass(cls):
# Shutdown the ROS context
cls.node.destroy_node()
rclpy.shutdown()
def init_robot(self):
wait_for_controller(self.node, "joint_trajectory_controller", 30)
self._follow_joint_trajectory = ActionInterface(
self.node,
"/joint_trajectory_controller/follow_joint_trajectory",
FollowJointTrajectory,
)
def test_trajectory(self, ur_type):
"""Test robot movement."""
# Construct test trajectory
test_trajectory = [
(Duration(sec=6, nanosec=0), [-0.1 for j in ROBOT_JOINTS]),
(Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]),
(Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]),
]
trajectory = JointTrajectory(
# joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS],
joint_names=ROBOT_JOINTS,
points=[
JointTrajectoryPoint(positions=test_pos, time_from_start=test_time)
for (test_time, test_pos) in test_trajectory
],
)
# Sending trajectory goal
logging.info("Sending simple goal")
goal_handle = self._follow_joint_trajectory.send_goal(trajectory=trajectory)
self.assertTrue(goal_handle.accepted)
# Verify execution
result = self._follow_joint_trajectory.get_result(goal_handle, TIMEOUT_EXECUTE_TRAJECTORY)
self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment