Skip to content
Snippets Groups Projects
Commit 72ff0c11 authored by René Ebeling's avatar René Ebeling
Browse files

Refactor sensor data processing and enhance point cloud generation with...

Refactor sensor data processing and enhance point cloud generation with improved transformations and error handling
parent 9ef2af6f
Branches
No related tags found
No related merge requests found
import serial import serial # For serial communication with the sensor
import rclpy import rclpy # ROS 2 Python client library
import json import json # For parsing JSON data from the sensor
import numpy as np import numpy as np # For numerical operations
from rclpy.node import Node from rclpy.node import Node # Base class for ROS 2 nodes
import sensor_msgs.msg as sensor_msgs from sensor_msgs.msg import PointCloud2, PointField # ROS 2 message types for point clouds
from std_msgs.msg import Header from std_msgs.msg import Header # ROS 2 message type for headers
angles = [29.923, 22.515, 15.829, 8.964, 1.964, 188.964, 195.829, 202.515, 209.923] # angles of the sensorzone centers in degrees
# Dictionary structure for sensors # Predefined angles for the sensor's field of view
sensor_data = { ANGLES = [29.923, 22.515, 15.829, 8.964, 1.964, 188.964, 195.829, 202.515, 209.923]
"sensor0": [[5], [-1]], # the second value determines if the sensor looks down (-1) or up (1) in 60° steps
"sensor1": [[0], [-1]], # the first value determines how far the points rotate around the z-axis in 60° steps # Mapping of sensor keys to their translation and rotation data
"sensor2": [[1], [-1]], SENSOR_DATA = {
"sensor3": [[5], [0]], "sensor0": [[5], [1]], "sensor1": [[0], [1]], "sensor2": [[1], [1]],
"sensor4": [[0], [0]], "sensor3": [[1], [0]], "sensor4": [[0], [0]], "sensor5": [[5], [0]],
"sensor5": [[1], [0]], "sensor6": [[5], [-1]], "sensor7": [[0], [-1]], "sensor8": [[1], [-1]],
"sensor6": [[5], [1]], "sensor9": [[2], [-1]], "sensor10": [[3], [-1]], "sensor11": [[4], [-1]],
"sensor7": [[0], [1]], "sensor12": [[4], [0]], "sensor13": [[3], [0]], "sensor14": [[2], [0]],
"sensor8": [[1], [1]], "sensor15": [[2], [1]], "sensor16": [[3], [1]], "sensor17": [[4], [1]],
"sensor9": [[2], [-1]],
"sensor10": [[3], [-1]],
"sensor11": [[4], [-1]],
"sensor12": [[2], [0]],
"sensor13": [[3], [0]],
"sensor14": [[4], [0]],
"sensor15": [[2], [1]],
"sensor16": [[3], [1]],
"sensor17": [[4], [1]],
} }
points = np.empty((0, 0))
# Define a translation vector to move points 85 mm in the x direction # Translation vector for the sensor in meters
translation_vector = np.array([85, 0, 0]) # 85 mm = 0.085 meters TRANSLATION_VECTOR = np.array([0.085, 0, 0]) # 85 mm in meters
class SerialListPublisher(Node): class SerialListPublisher(Node):
def __init__(self, serial_port='/dev/ttyACM0', baudrate=1000000): # the Serialport or baudrate may have to be edited when changing host pc """
super().__init__('serial_list_publisher') A ROS 2 node that reads data from a serial port, processes it, and publishes it as a PointCloud2 message.
self.pcd_publisher = self.create_publisher(sensor_msgs.PointCloud2, 'pcd', 10) """
def __init__(self, serial_port='/dev/ttyACM0', baudrate=115200):
super().__init__('serial_list_publisher') # Initialize the ROS 2 node with a name
self.declare_parameter('parent_frame', 'vl53l7cx_link') # Declare a parameter for the parent frame
self.parent_frame = self.get_parameter('parent_frame').get_parameter_value().string_value # Get the parameter value
# Create a publisher for PointCloud2 messages
self.pcd_publisher = self.create_publisher(PointCloud2, 'pcd', 10)
# Open the serial port for communication
self.serial_port = serial.Serial(serial_port, baudrate, timeout=None) self.serial_port = serial.Serial(serial_port, baudrate, timeout=None)
self.get_logger().info(f"Connected to {serial_port} at {baudrate} baud") self.get_logger().info(f"Connected to {serial_port} at {baudrate} baud") # Log the connection details
# Start reading data from the serial port and publishing it
self.read_serial_and_publish() self.read_serial_and_publish()
def read_serial_and_publish(self): def read_serial_and_publish(self):
"""
Continuously read data from the serial port, process it, and publish it as a PointCloud2 message.
"""
try: try:
while rclpy.ok(): while rclpy.ok(): # Keep running while ROS 2 is active
line = self.serial_port.readline().decode('utf-8').strip() # Read a line from the serial port line = self.serial_port.readline().decode('utf-8').strip() # Read a line from the serial port
try: data = self.parse_json(line) # Parse the JSON data
data = json.loads(line) # Interpret arrays from JSON String if not data: # Skip if the data is invalid
except json.JSONDecodeError as e: # If the JSON String is not valid, we log the error and skip this line
self.get_logger().error(f"JSON decode error: {e} | Line: {line}")
continue continue
if not isinstance(data, dict): # Check if the parsed JSON is a dictionary
self.get_logger().error(f"Unexpected JSON format: {data}")
continue
points_all=[] # Create an empty List for the Arrays of points from the Sensors
for key, array in data.items(): # a for-Loop that goes through the arrays of data from each sensor
points_all.append(self.create_pcd_from_distance(np.array(array).reshape(8,8), key)) # in this line we create an 8x8 np.array from the sensor data list, then we feed it into the the create_plc_from_distance() with the angle we want the points to be rotated around the z-axis
self.points = np.vstack(points_all) # stacking the sensor 64x3 arrays to one (count*64)x3 vertically
self.pcd = self.point_cloud(self.points, 'vl53l7cx_link') # converting the np.array to pointcloud data with the parent_frame. The parent_frame has to be edited in the future to suit the use-case
self.pcd_publisher.publish(self.pcd) # publishing the pointcloud2 topic
# Process the data for each sensor and create a point cloud
points_all = [self.create_pcd_from_distance(np.array(array).reshape(8, 8), key)
for key, array in data.items()]
points = np.vstack(points_all) # Combine all points into a single array
points = self.distance_filter(points, 2) # Apply a distance filter to the points
pcd = self.create_point_cloud(points, self.parent_frame) # Create a PointCloud2 message
self.pcd_publisher.publish(pcd) # Publish the point cloud
except serial.SerialException as e: except serial.SerialException as e:
self.get_logger().error(f"Serial error: {e}") self.get_logger().error(f"Serial error: {e}") # Log serial communication errors
except KeyboardInterrupt: except KeyboardInterrupt:
self.get_logger().info("Shutting down...") self.get_logger().info("Shutting down...") # Log when the node is stopped by the user
finally: finally:
self.serial_port.close() self.serial_port.close() # Close the serial port when done
def parse_json(self, line):
"""
Parse a JSON string from the serial port.
"""
try:
data = json.loads(line) # Parse the JSON string
if not isinstance(data, dict): # Ensure the data is a dictionary
self.get_logger().info(f"Unexpected JSON format: {data}")
return None
return data
except json.JSONDecodeError as e:
self.get_logger().warn(f"JSON decode error: {e} | Line: {line}") # Log JSON parsing errors
return None
def create_pcd_from_distance(self, distance, sensor_key): def create_pcd_from_distance(self, distance, sensor_key):
"""
Create a point cloud from distance data for a specific sensor.
"""
x, y, z = self.calculate_coordinates(distance) # Calculate the 3D coordinates
points_local = np.hstack((x, y, z)) / 1000 # Convert coordinates to meters
#declaring the lists for the koordinates components # Get the yaw and pitch angles for the sensor
x_collum = [] yaw_deg, pitch_deg = self.get_sensor_angles(sensor_key)
z_collum = [] points_local = self.apply_transformations(points_local, yaw_deg, pitch_deg) # Apply transformations
y_collum = [] return points_local
# Create a 3D point cloud from the distance data
for j in range(len(distance[:][0])):
for i in range(len(distance[0])):
x = (distance[i][j])
x_collum.append(x)
# Calculate x and y using trigonometry
z = x * np.sin(np.radians(angles[j]))
y = x * np.sin(np.radians(angles[i]))
# Append the calculated coordinates to the lists
z_collum.append(z)
y_collum.append(y)
# Convert the lists to numpy arrays and reshape them
z_collum = np.array(z_collum).reshape(64,1).astype(int) # Convert list to numpy array
y_collum = np.array(y_collum).reshape(64,1).astype(int) # Convert list to numpy array
x_collum = np.array(x_collum).reshape(64,1).astype(int) # Convert list to numpy array
# Apply the translation vector to the points
x_collum = x_collum + translation_vector[0] # Adding the translation vector to the x_collum
y_collum = y_collum + translation_vector[1] # Adding the translation vector to the y_collum
z_collum = z_collum + translation_vector[2] # Adding the translation vector to the z_collum
# Apply the rotation matrix to the points
# Retrieve the rotation step from the sensor_data dictionary
rotation_angle_y = sensor_data[sensor_key][0][0] * np.radians(60)
# Define the rotation matrix for the calculated angle around the y-axis
rotation_matrix_dynamic_y = np.array([
[np.cos(rotation_angle_y), 0, np.sin(rotation_angle_y)],
[0, 1, 0],
[-np.sin(rotation_angle_y), 0, np.cos(rotation_angle_y)]
])
# Apply the dynamic rotation matrix around the y-axis def calculate_coordinates(self, distance):
self.points = np.dot(np.hstack((x_collum, y_collum, z_collum)) / 1000, rotation_matrix_dynamic_y.T) """
Calculate the 3D coordinates from distance data.
"""
x, y, z = [], [], []
for j in range(distance.shape[1]): # Iterate over columns
for i in range(distance.shape[0]): # Iterate over rows
dist = distance[i, j] # Get the distance value
z.append(dist * np.sin(np.radians(ANGLES[j]))) # Calculate z-coordinate
y.append(dist * np.sin(np.radians(ANGLES[i]))) # Calculate y-coordinate
x.append(dist) # x-coordinate is the distance itself
return np.array(x).reshape(-1, 1), np.array(y).reshape(-1, 1), np.array(z).reshape(-1, 1)
def get_sensor_angles(self, sensor_key):
"""
Get the yaw and pitch angles for a specific sensor.
"""
yaw_deg = SENSOR_DATA[sensor_key][1][0] * 60 # Calculate yaw angle
pitch_deg = SENSOR_DATA[sensor_key][0][0] * 60 # Calculate pitch angle
return yaw_deg, pitch_deg
# Define the rotation angle in radians based on the second column from the sensor_data dictionary def apply_transformations(self, points, yaw_deg, pitch_deg):
theta = sensor_data[sensor_key][1][0] * np.radians(60) # Multiply by 60 degrees step """
Apply rotation and translation transformations to the points.
"""
yaw_rad, pitch_rad = np.radians([yaw_deg, pitch_deg]) # Convert angles to radians
self.get_logger().info(f"Sensor key: {sensor_key}, Rotation angle: {np.degrees(theta)} degrees") # Rotation matrices for x, y, and z axes
Rx = self.rotation_matrix_x(-np.pi / 2)
Ry = self.rotation_matrix_y(yaw_rad)
Rz = self.rotation_matrix_z(pitch_rad)
# Define the rotation matrix around the z-axis # Apply the rotations
rotation_matrix = np.array([ points = (Rx @ points.T).T
[np.cos(theta), -np.sin(theta), 0], points = (Ry @ points.T).T
[np.sin(theta), np.cos(theta), 0], points = (Rz @ points.T).T
[0, 0, 1]
])
# Apply the rotation matrix around the z-axis # Calculate the translation vector
self.points = np.dot(self.points, rotation_matrix.T) # Creating a 64x3 array out of the z_collum, y_collum and x_collum translation = self.calculate_translation(yaw_rad, pitch_rad)
return self.points return points + translation # Apply the translation
def point_cloud(self, points, parent_frame): @staticmethod
""" Creates a point cloud message. def rotation_matrix_x(angle):
Args: """
points: Nx3 array of xyz positions. Create a rotation matrix for rotation around the x-axis.
parent_frame: frame in which the point cloud is defined """
Returns: return np.array([
sensor_msgs/PointCloud2 message [1, 0, 0],
[0, np.cos(angle), -np.sin(angle)],
[0, np.sin(angle), np.cos(angle)]
])
Code source: @staticmethod
https://gist.github.com/pgorczak/5c717baa44479fa064eb8d33ea4587e0 def rotation_matrix_y(angle):
"""
Create a rotation matrix for rotation around the y-axis.
"""
return np.array([
[np.cos(angle), 0, np.sin(angle)],
[0, 1, 0],
[-np.sin(angle), 0, np.cos(angle)]
])
References: @staticmethod
http://docs.ros.org/melodic/api/sensor_msgs/html/msg/PointCloud2.html def rotation_matrix_z(angle):
http://docs.ros.org/melodic/api/sensor_msgs/html/msg/PointField.html """
http://docs.ros.org/melodic/api/std_msgs/html/msg/Header.html Create a rotation matrix for rotation around the z-axis.
"""
return np.array([
[np.cos(angle), -np.sin(angle), 0],
[np.sin(angle), np.cos(angle), 0],
[0, 0, 1]
])
def calculate_translation(self, yaw_rad, pitch_rad):
"""
Calculate the translation vector based on yaw and pitch angles.
""" """
# In a PointCloud2 message, the point cloud is stored as an byte tx = TRANSLATION_VECTOR[0] * np.cos(yaw_rad) * np.cos(pitch_rad) # x-component of translation
# array. In order to unpack it, we also include some parameters ty = TRANSLATION_VECTOR[0] * np.sin(yaw_rad) * np.cos(pitch_rad) # y-component of translation
# which desribes the size of each individual point. tz = TRANSLATION_VECTOR[0] * np.sin(pitch_rad) # z-component of translation
ros_dtype = sensor_msgs.PointField.FLOAT32 return np.array([tx, ty, tz])
dtype = np.float32
itemsize = np.dtype(dtype).itemsize # A 32-bit float takes 4 bytes.
data = points.astype(dtype).tobytes() def distance_filter(self, points, threshold):
"""
Filter points based on a distance threshold.
"""
mask = np.linalg.norm(points, axis=1) < threshold
return points[mask] # Return only the points within the threshold
# The fields specify what the bytes represents. The first 4 bytes def create_point_cloud(self, points, parent_frame):
# represents the x-coordinate, the next 4 the y-coordinate, etc. """
fields = [sensor_msgs.PointField( Create a PointCloud2 message from the 3D points.
name=n, offset=i*itemsize, datatype=ros_dtype, count=1) """
for i, n in enumerate('xyz')] ros_dtype = PointField.FLOAT32 # ROS 2 data type for floating-point numbers
dtype = np.float32 # NumPy data type for floating-point numbers
itemsize = np.dtype(dtype).itemsize # Size of each data item in bytes
# The PointCloud2 message also has a header which specifies which data = points.astype(dtype).tobytes() # Convert points to binary data
# coordinate frame it is represented in. fields = [PointField(name=n, offset=i * itemsize, datatype=ros_dtype, count=1)
header = Header(frame_id=parent_frame) for i, n in enumerate('xyz')] # Define the fields for x, y, and z
return sensor_msgs.PointCloud2( header = Header(frame_id=parent_frame) # Create a header with the parent frame
return PointCloud2(
header=header, header=header,
height=1, height=1, # Single row of points
width=points.shape[0], width=points.shape[0], # Number of points
is_dense=False, is_dense=False, # Allow for invalid points
is_bigendian=False, is_bigendian=False, # Data is in little-endian format
fields=fields, fields=fields, # Fields for x, y, and z
point_step=(itemsize * 3), # Every point consists of three float32s. point_step=(itemsize * 3), # Size of each point in bytes
row_step=(itemsize * 3 * points.shape[0]), row_step=(itemsize * 3 * points.shape[0]), # Size of each row in bytes
data=data data=data # Binary data for the points
) )
def main(args=None): def main(args=None):
rclpy.init(args=args) """
node = SerialListPublisher(serial_port='/dev/ttyACM0', baudrate= 115200) Main function to initialize and run the ROS 2 node.
"""
rclpy.init(args=args) # Initialize the ROS 2 Python client library
node = SerialListPublisher(serial_port='/dev/ttyACM0', baudrate=115200) # Create the node
try: try:
rclpy.spin(node) rclpy.spin(node) # Keep the node running
except KeyboardInterrupt: except KeyboardInterrupt:
node.get_logger().info("Node stopped by user.") node.get_logger().info("Node stopped by user.") # Log when the node is stopped
finally: finally:
node.destroy_node() node.destroy_node() # Destroy the node
rclpy.shutdown() rclpy.shutdown() # Shut down the ROS 2 client library
if __name__ == '__main__': if __name__ == '__main__':
main() main() # Run the main function
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment