Skip to content
Snippets Groups Projects
Commit 45c9f3e2 authored by René Ebeling's avatar René Ebeling
Browse files

Refactor point cloud processing: add publishers for safe zones and adjust trajectory stopping logic

parent eb3876c5
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,8 @@ import numpy as np ...@@ -6,6 +6,8 @@ import numpy as np
import trimesh import trimesh
from std_msgs.msg import String from std_msgs.msg import String
import sensor_msgs_py.point_cloud2 as pc2 import sensor_msgs_py.point_cloud2 as pc2
from std_msgs.msg import Header
from sensor_msgs.msg import PointField
class PointCloudProcessor(Node): class PointCloudProcessor(Node):
def __init__(self): def __init__(self):
...@@ -20,13 +22,20 @@ class PointCloudProcessor(Node): ...@@ -20,13 +22,20 @@ class PointCloudProcessor(Node):
self.cancellation_pub = self.create_publisher( self.cancellation_pub = self.create_publisher(
String, "/trajectory_execution_event", 1 String, "/trajectory_execution_event", 1
) )
self.out_zone_pub = self.create_publisher(
PointCloud2,
'/out_safe_zone',
10)
self.in_zone_pub = self.create_publisher(
PointCloud2,
'/in_safe_zone',
10)
self.tf_buffer = Buffer() self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self) self.tf_listener = TransformListener(self.tf_buffer, self)
# Create meshes 5cm larger than the original example # Create meshes 5cm larger than the original example
self.meshes_static = [ self.meshes_static = [
trimesh.creation.cylinder(radius=0.2, height=1.0), # Cylinder for upper_arm_link trimesh.creation.cylinder(radius=0.2, height=0.8), # Cylinder for upper_arm_link
trimesh.creation.cylinder(radius=0.2, height=1.05) # Cylinder for forearm_link trimesh.creation.cylinder(radius=0.2, height=1.05) # Cylinder for forearm_link
] ]
...@@ -44,7 +53,7 @@ class PointCloudProcessor(Node): ...@@ -44,7 +53,7 @@ class PointCloudProcessor(Node):
mesh.apply_transform(translation_matrix_x) mesh.apply_transform(translation_matrix_x)
# Move the upper_arm_link cylinder 10cm in the positive x direction # Move the upper_arm_link cylinder 10cm in the positive x direction
translation_matrix_upper_arm = trimesh.transformations.translation_matrix([0.1, 0, 0]) translation_matrix_upper_arm = trimesh.transformations.translation_matrix([0, 0, 0])
self.meshes_static[0].apply_transform(translation_matrix_upper_arm) self.meshes_static[0].apply_transform(translation_matrix_upper_arm)
# Move the forearm_link cylinder 5cm in the negative x direction # Move the forearm_link cylinder 5cm in the negative x direction
...@@ -57,7 +66,7 @@ class PointCloudProcessor(Node): ...@@ -57,7 +66,7 @@ class PointCloudProcessor(Node):
mesh.apply_transform(translation_matrix_z) mesh.apply_transform(translation_matrix_z)
# Threshold for stopping the trajectory # Threshold for stopping the trajectory
self.point_threshold = 33 # Adjust as needed self.point_threshold = 2 # Adjust as needed
def pointcloud_callback(self, msg): def pointcloud_callback(self, msg):
self.perspective_frame = 'vl53l7cx_link' self.perspective_frame = 'vl53l7cx_link'
...@@ -80,10 +89,13 @@ class PointCloudProcessor(Node): ...@@ -80,10 +89,13 @@ class PointCloudProcessor(Node):
inside_robot = np.logical_or.reduce([mesh.contains(points) for mesh in transformed_meshes]) inside_robot = np.logical_or.reduce([mesh.contains(points) for mesh in transformed_meshes])
points_inside_robot = points[inside_robot] points_inside_robot = points[inside_robot]
self.get_logger().info(f'Number of points inside robot: {len(points_inside_robot)}') self.get_logger().info(f'Number of points to close to robot: {len(points_inside_robot)}')
if len(points_inside_robot) > self.point_threshold: # Publish the point clouds
self.stop_trajectory() self.out_zone_pub.publish(self.create_point_cloud(points[~inside_robot], self.perspective_frame))
self.in_zone_pub.publish(self.create_point_cloud(points_inside_robot, self.perspective_frame))
#if len(points_inside_robot) > self.point_threshold:
# self.stop_trajectory()
def transform_mesh(self, transform, mesh_static): def transform_mesh(self, transform, mesh_static):
translation = transform.transform.translation translation = transform.transform.translation
...@@ -98,10 +110,36 @@ class PointCloudProcessor(Node): ...@@ -98,10 +110,36 @@ class PointCloudProcessor(Node):
return transformed_mesh return transformed_mesh
def stop_trajectory(self): def stop_trajectory(self):
self.get_logger().info("Stopping trajectory execution due to point cloud threshold.")
cancel_string = String() cancel_string = String()
cancel_string.data = "stop" cancel_string.data = "stop"
self.cancellation_pub.publish(cancel_string) self.cancellation_pub.publish(cancel_string)
def create_point_cloud(self, points, parent_frame):
"""
Create a PointCloud2 message from the 3D points.
"""
ros_dtype = PointField.FLOAT32 # ROS 2 data type for floating-point numbers
dtype = np.float32 # NumPy data type for floating-point numbers
itemsize = np.dtype(dtype).itemsize # Size of each data item in bytes
data = points.astype(dtype).tobytes() # Convert points to binary data
fields = [PointField(name=n, offset=i * itemsize, datatype=ros_dtype, count=1)
for i, n in enumerate('xyz')] # Define the fields for x, y, and z
header = Header(frame_id=parent_frame) # Create a header with the parent frame
return PointCloud2(
header=header,
height=1, # Single row of points
width=points.shape[0], # Number of points
is_dense=True, # Allow for invalid points
is_bigendian=False, # Data is in little-endian format
fields=fields, # Fields for x, y, and z
point_step=(itemsize * 3), # Size of each point in bytes
row_step=(itemsize * 3 * points.shape[0]), # Size of each row in bytes
data=data # Binary data for the points
)
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
pointcloud_processor = PointCloudProcessor() pointcloud_processor = PointCloudProcessor()
......
...@@ -10,7 +10,6 @@ import sensor_msgs_py.point_cloud2 as pc2 ...@@ -10,7 +10,6 @@ import sensor_msgs_py.point_cloud2 as pc2
from std_msgs.msg import Header from std_msgs.msg import Header
from scipy.spatial.transform import Rotation as R from scipy.spatial.transform import Rotation as R
import os import os
from geometry_msgs.msg import TransformStamped
class PointCloudMeshChecker(Node): class PointCloudMeshChecker(Node):
def __init__(self): def __init__(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment