diff --git a/workspaces/COLCON_WS/src/ser_test/ser_test/ser_test_node.py b/workspaces/COLCON_WS/src/ser_test/ser_test/ser_test_node.py index be0400a9d6905f20c610a37d2f1c1b5856b7b8bb..509f38409fd17ec2508f2801b8136d8889ec81d5 100644 --- a/workspaces/COLCON_WS/src/ser_test/ser_test/ser_test_node.py +++ b/workspaces/COLCON_WS/src/ser_test/ser_test/ser_test_node.py @@ -1,192 +1,228 @@ -import serial -import rclpy -import json -import numpy as np -from rclpy.node import Node -import sensor_msgs.msg as sensor_msgs -from std_msgs.msg import Header -angles = [29.923, 22.515, 15.829, 8.964, 1.964, 188.964, 195.829, 202.515, 209.923] # angles of the sensorzone centers in degrees -# Dictionary structure for sensors -sensor_data = { - "sensor0": [[5], [-1]], # the second value determines if the sensor looks down (-1) or up (1) in 60° steps - "sensor1": [[0], [-1]], # the first value determines how far the points rotate around the z-axis in 60° steps - "sensor2": [[1], [-1]], - "sensor3": [[5], [0]], - "sensor4": [[0], [0]], - "sensor5": [[1], [0]], - "sensor6": [[5], [1]], - "sensor7": [[0], [1]], - "sensor8": [[1], [1]], - "sensor9": [[2], [-1]], - "sensor10": [[3], [-1]], - "sensor11": [[4], [-1]], - "sensor12": [[2], [0]], - "sensor13": [[3], [0]], - "sensor14": [[4], [0]], - "sensor15": [[2], [1]], - "sensor16": [[3], [1]], - "sensor17": [[4], [1]], +import serial # For serial communication with the sensor +import rclpy # ROS 2 Python client library +import json # For parsing JSON data from the sensor +import numpy as np # For numerical operations +from rclpy.node import Node # Base class for ROS 2 nodes +from sensor_msgs.msg import PointCloud2, PointField # ROS 2 message types for point clouds +from std_msgs.msg import Header # ROS 2 message type for headers + +# Predefined angles for the sensor's field of view +ANGLES = [29.923, 22.515, 15.829, 8.964, 1.964, 188.964, 195.829, 202.515, 209.923] + +# Mapping of sensor keys to their translation and rotation data +SENSOR_DATA = { + "sensor0": [[5], [1]], "sensor1": [[0], [1]], "sensor2": [[1], [1]], + "sensor3": [[1], [0]], "sensor4": [[0], [0]], "sensor5": [[5], [0]], + "sensor6": [[5], [-1]], "sensor7": [[0], [-1]], "sensor8": [[1], [-1]], + "sensor9": [[2], [-1]], "sensor10": [[3], [-1]], "sensor11": [[4], [-1]], + "sensor12": [[4], [0]], "sensor13": [[3], [0]], "sensor14": [[2], [0]], + "sensor15": [[2], [1]], "sensor16": [[3], [1]], "sensor17": [[4], [1]], } -points = np.empty((0, 0)) -# Define a translation vector to move points 85 mm in the x direction -translation_vector = np.array([85, 0, 0]) # 85 mm = 0.085 meters +# Translation vector for the sensor in meters +TRANSLATION_VECTOR = np.array([0.085, 0, 0]) # 85 mm in meters + class SerialListPublisher(Node): - def __init__(self, serial_port='/dev/ttyACM0', baudrate=1000000): # the Serialport or baudrate may have to be edited when changing host pc - super().__init__('serial_list_publisher') - self.pcd_publisher = self.create_publisher(sensor_msgs.PointCloud2, 'pcd', 10) + """ + A ROS 2 node that reads data from a serial port, processes it, and publishes it as a PointCloud2 message. + """ + def __init__(self, serial_port='/dev/ttyACM0', baudrate=115200): + super().__init__('serial_list_publisher') # Initialize the ROS 2 node with a name + self.declare_parameter('parent_frame', 'vl53l7cx_link') # Declare a parameter for the parent frame + self.parent_frame = self.get_parameter('parent_frame').get_parameter_value().string_value # Get the parameter value + + # Create a publisher for PointCloud2 messages + self.pcd_publisher = self.create_publisher(PointCloud2, 'pcd', 10) + + # Open the serial port for communication self.serial_port = serial.Serial(serial_port, baudrate, timeout=None) - self.get_logger().info(f"Connected to {serial_port} at {baudrate} baud") + self.get_logger().info(f"Connected to {serial_port} at {baudrate} baud") # Log the connection details + + # Start reading data from the serial port and publishing it self.read_serial_and_publish() def read_serial_and_publish(self): + """ + Continuously read data from the serial port, process it, and publish it as a PointCloud2 message. + """ try: - while rclpy.ok(): - line = self.serial_port.readline().decode('utf-8').strip() # Read a line from the serial port - try: - data = json.loads(line) # Interpret arrays from JSON String - except json.JSONDecodeError as e: # If the JSON String is not valid, we log the error and skip this line - self.get_logger().error(f"JSON decode error: {e} | Line: {line}") - continue - if not isinstance(data, dict): # Check if the parsed JSON is a dictionary - self.get_logger().error(f"Unexpected JSON format: {data}") + while rclpy.ok(): # Keep running while ROS 2 is active + line = self.serial_port.readline().decode('utf-8').strip() # Read a line from the serial port + data = self.parse_json(line) # Parse the JSON data + if not data: # Skip if the data is invalid continue - points_all=[] # Create an empty List for the Arrays of points from the Sensors - for key, array in data.items(): # a for-Loop that goes through the arrays of data from each sensor - points_all.append(self.create_pcd_from_distance(np.array(array).reshape(8,8), key)) # in this line we create an 8x8 np.array from the sensor data list, then we feed it into the the create_plc_from_distance() with the angle we want the points to be rotated around the z-axis - self.points = np.vstack(points_all) # stacking the sensor 64x3 arrays to one (count*64)x3 vertically - self.pcd = self.point_cloud(self.points, 'vl53l7cx_link') # converting the np.array to pointcloud data with the parent_frame. The parent_frame has to be edited in the future to suit the use-case - self.pcd_publisher.publish(self.pcd) # publishing the pointcloud2 topic - + # Process the data for each sensor and create a point cloud + points_all = [self.create_pcd_from_distance(np.array(array).reshape(8, 8), key) + for key, array in data.items()] + points = np.vstack(points_all) # Combine all points into a single array + points = self.distance_filter(points, 2) # Apply a distance filter to the points + pcd = self.create_point_cloud(points, self.parent_frame) # Create a PointCloud2 message + self.pcd_publisher.publish(pcd) # Publish the point cloud except serial.SerialException as e: - self.get_logger().error(f"Serial error: {e}") + self.get_logger().error(f"Serial error: {e}") # Log serial communication errors except KeyboardInterrupt: - self.get_logger().info("Shutting down...") + self.get_logger().info("Shutting down...") # Log when the node is stopped by the user finally: - self.serial_port.close() - + self.serial_port.close() # Close the serial port when done + + def parse_json(self, line): + """ + Parse a JSON string from the serial port. + """ + try: + data = json.loads(line) # Parse the JSON string + if not isinstance(data, dict): # Ensure the data is a dictionary + self.get_logger().info(f"Unexpected JSON format: {data}") + return None + return data + except json.JSONDecodeError as e: + self.get_logger().warn(f"JSON decode error: {e} | Line: {line}") # Log JSON parsing errors + return None + def create_pcd_from_distance(self, distance, sensor_key): + """ + Create a point cloud from distance data for a specific sensor. + """ + x, y, z = self.calculate_coordinates(distance) # Calculate the 3D coordinates + points_local = np.hstack((x, y, z)) / 1000 # Convert coordinates to meters - #declaring the lists for the koordinates components - x_collum = [] - z_collum = [] - y_collum = [] - - # Create a 3D point cloud from the distance data - for j in range(len(distance[:][0])): - for i in range(len(distance[0])): - x = (distance[i][j]) - x_collum.append(x) - - # Calculate x and y using trigonometry - z = x * np.sin(np.radians(angles[j])) - y = x * np.sin(np.radians(angles[i])) - # Append the calculated coordinates to the lists - z_collum.append(z) - y_collum.append(y) - - # Convert the lists to numpy arrays and reshape them - z_collum = np.array(z_collum).reshape(64,1).astype(int) # Convert list to numpy array - y_collum = np.array(y_collum).reshape(64,1).astype(int) # Convert list to numpy array - x_collum = np.array(x_collum).reshape(64,1).astype(int) # Convert list to numpy array - - # Apply the translation vector to the points - x_collum = x_collum + translation_vector[0] # Adding the translation vector to the x_collum - y_collum = y_collum + translation_vector[1] # Adding the translation vector to the y_collum - z_collum = z_collum + translation_vector[2] # Adding the translation vector to the z_collum - - # Apply the rotation matrix to the points - # Retrieve the rotation step from the sensor_data dictionary - rotation_angle_y = sensor_data[sensor_key][0][0] * np.radians(60) - - # Define the rotation matrix for the calculated angle around the y-axis - rotation_matrix_dynamic_y = np.array([ - [np.cos(rotation_angle_y), 0, np.sin(rotation_angle_y)], - [0, 1, 0], - [-np.sin(rotation_angle_y), 0, np.cos(rotation_angle_y)] - ]) - - # Apply the dynamic rotation matrix around the y-axis - self.points = np.dot(np.hstack((x_collum, y_collum, z_collum)) / 1000, rotation_matrix_dynamic_y.T) + # Get the yaw and pitch angles for the sensor + yaw_deg, pitch_deg = self.get_sensor_angles(sensor_key) + points_local = self.apply_transformations(points_local, yaw_deg, pitch_deg) # Apply transformations + return points_local + + def calculate_coordinates(self, distance): + """ + Calculate the 3D coordinates from distance data. + """ + x, y, z = [], [], [] + for j in range(distance.shape[1]): # Iterate over columns + for i in range(distance.shape[0]): # Iterate over rows + dist = distance[i, j] # Get the distance value + z.append(dist * np.sin(np.radians(ANGLES[j]))) # Calculate z-coordinate + y.append(dist * np.sin(np.radians(ANGLES[i]))) # Calculate y-coordinate + x.append(dist) # x-coordinate is the distance itself + return np.array(x).reshape(-1, 1), np.array(y).reshape(-1, 1), np.array(z).reshape(-1, 1) + + def get_sensor_angles(self, sensor_key): + """ + Get the yaw and pitch angles for a specific sensor. + """ + yaw_deg = SENSOR_DATA[sensor_key][1][0] * 60 # Calculate yaw angle + pitch_deg = SENSOR_DATA[sensor_key][0][0] * 60 # Calculate pitch angle + return yaw_deg, pitch_deg - # Define the rotation angle in radians based on the second column from the sensor_data dictionary - theta = sensor_data[sensor_key][1][0] * np.radians(60) # Multiply by 60 degrees step + def apply_transformations(self, points, yaw_deg, pitch_deg): + """ + Apply rotation and translation transformations to the points. + """ + yaw_rad, pitch_rad = np.radians([yaw_deg, pitch_deg]) # Convert angles to radians - self.get_logger().info(f"Sensor key: {sensor_key}, Rotation angle: {np.degrees(theta)} degrees") + # Rotation matrices for x, y, and z axes + Rx = self.rotation_matrix_x(-np.pi / 2) + Ry = self.rotation_matrix_y(yaw_rad) + Rz = self.rotation_matrix_z(pitch_rad) - # Define the rotation matrix around the z-axis - rotation_matrix = np.array([ - [np.cos(theta), -np.sin(theta), 0], - [np.sin(theta), np.cos(theta), 0], - [0, 0, 1] - ]) + # Apply the rotations + points = (Rx @ points.T).T + points = (Ry @ points.T).T + points = (Rz @ points.T).T - # Apply the rotation matrix around the z-axis - self.points = np.dot(self.points, rotation_matrix.T) # Creating a 64x3 array out of the z_collum, y_collum and x_collum - return self.points + # Calculate the translation vector + translation = self.calculate_translation(yaw_rad, pitch_rad) + return points + translation # Apply the translation - def point_cloud(self, points, parent_frame): - """ Creates a point cloud message. - Args: - points: Nx3 array of xyz positions. - parent_frame: frame in which the point cloud is defined - Returns: - sensor_msgs/PointCloud2 message + @staticmethod + def rotation_matrix_x(angle): + """ + Create a rotation matrix for rotation around the x-axis. + """ + return np.array([ + [1, 0, 0], + [0, np.cos(angle), -np.sin(angle)], + [0, np.sin(angle), np.cos(angle)] + ]) - Code source: - https://gist.github.com/pgorczak/5c717baa44479fa064eb8d33ea4587e0 + @staticmethod + def rotation_matrix_y(angle): + """ + Create a rotation matrix for rotation around the y-axis. + """ + return np.array([ + [np.cos(angle), 0, np.sin(angle)], + [0, 1, 0], + [-np.sin(angle), 0, np.cos(angle)] + ]) - References: - http://docs.ros.org/melodic/api/sensor_msgs/html/msg/PointCloud2.html - http://docs.ros.org/melodic/api/sensor_msgs/html/msg/PointField.html - http://docs.ros.org/melodic/api/std_msgs/html/msg/Header.html + @staticmethod + def rotation_matrix_z(angle): + """ + Create a rotation matrix for rotation around the z-axis. + """ + return np.array([ + [np.cos(angle), -np.sin(angle), 0], + [np.sin(angle), np.cos(angle), 0], + [0, 0, 1] + ]) + def calculate_translation(self, yaw_rad, pitch_rad): + """ + Calculate the translation vector based on yaw and pitch angles. """ - # In a PointCloud2 message, the point cloud is stored as an byte - # array. In order to unpack it, we also include some parameters - # which desribes the size of each individual point. - ros_dtype = sensor_msgs.PointField.FLOAT32 - dtype = np.float32 - itemsize = np.dtype(dtype).itemsize # A 32-bit float takes 4 bytes. + tx = TRANSLATION_VECTOR[0] * np.cos(yaw_rad) * np.cos(pitch_rad) # x-component of translation + ty = TRANSLATION_VECTOR[0] * np.sin(yaw_rad) * np.cos(pitch_rad) # y-component of translation + tz = TRANSLATION_VECTOR[0] * np.sin(pitch_rad) # z-component of translation + return np.array([tx, ty, tz]) - data = points.astype(dtype).tobytes() + def distance_filter(self, points, threshold): + """ + Filter points based on a distance threshold. + """ + mask = np.linalg.norm(points, axis=1) < threshold + return points[mask] # Return only the points within the threshold - # The fields specify what the bytes represents. The first 4 bytes - # represents the x-coordinate, the next 4 the y-coordinate, etc. - fields = [sensor_msgs.PointField( - name=n, offset=i*itemsize, datatype=ros_dtype, count=1) - for i, n in enumerate('xyz')] + def create_point_cloud(self, points, parent_frame): + """ + Create a PointCloud2 message from the 3D points. + """ + ros_dtype = PointField.FLOAT32 # ROS 2 data type for floating-point numbers + dtype = np.float32 # NumPy data type for floating-point numbers + itemsize = np.dtype(dtype).itemsize # Size of each data item in bytes - # The PointCloud2 message also has a header which specifies which - # coordinate frame it is represented in. - header = Header(frame_id=parent_frame) + data = points.astype(dtype).tobytes() # Convert points to binary data + fields = [PointField(name=n, offset=i * itemsize, datatype=ros_dtype, count=1) + for i, n in enumerate('xyz')] # Define the fields for x, y, and z - return sensor_msgs.PointCloud2( + header = Header(frame_id=parent_frame) # Create a header with the parent frame + return PointCloud2( header=header, - height=1, - width=points.shape[0], - is_dense=False, - is_bigendian=False, - fields=fields, - point_step=(itemsize * 3), # Every point consists of three float32s. - row_step=(itemsize * 3 * points.shape[0]), - data=data + height=1, # Single row of points + width=points.shape[0], # Number of points + is_dense=False, # Allow for invalid points + is_bigendian=False, # Data is in little-endian format + fields=fields, # Fields for x, y, and z + point_step=(itemsize * 3), # Size of each point in bytes + row_step=(itemsize * 3 * points.shape[0]), # Size of each row in bytes + data=data # Binary data for the points ) def main(args=None): - rclpy.init(args=args) - node = SerialListPublisher(serial_port='/dev/ttyACM0', baudrate= 115200) + """ + Main function to initialize and run the ROS 2 node. + """ + rclpy.init(args=args) # Initialize the ROS 2 Python client library + node = SerialListPublisher(serial_port='/dev/ttyACM0', baudrate=115200) # Create the node try: - rclpy.spin(node) + rclpy.spin(node) # Keep the node running except KeyboardInterrupt: - node.get_logger().info("Node stopped by user.") + node.get_logger().info("Node stopped by user.") # Log when the node is stopped finally: - node.destroy_node() - rclpy.shutdown() + node.destroy_node() # Destroy the node + rclpy.shutdown() # Shut down the ROS 2 client library if __name__ == '__main__': - main() + main() # Run the main function