metrics.py
    import os
    import open3d as o3d
    import numpy as np
    from scipy.spatial import KDTree
    from scipy.spatial.transform import Rotation as R
    import matplotlib.pyplot as plt
    import seaborn as sns
    from tqdm.contrib.itertools import product
    sns.set()
    
    from .pointcloud_processing.process_pointclouds import layerize_pointclouds
    from .pointcloud_processing.calibration import prepare_dataset, execute_global_registration, draw_registration_result
    
    def get_samples(translation:np.ndarray=np.array((0, 0, 0)), rotation:float=30, plot:bool=True):
        """Generates a synthetic flat square point cloud and a rotated, translated copy of it.
        Returns both as (N, 3) arrays."""
        square = np.zeros((10, 10))
        square[3:7, 3:7] = 1
        straight = np.array(np.where(square == 1))
        straight = np.vstack((straight, np.zeros((1, len(straight[0])))))  # Append a zero z coordinate

        rotation_matrix = R.from_euler('z', rotation, degrees=True).as_matrix()
        transformed = rotation_matrix @ straight
        transformed = transformed + translation.reshape(3, 1)

        if plot:
            plot_squares(straight.T, transformed.T)
    
        return straight.T, transformed.T
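
    # Usage sketch (illustrative): generate the two synthetic squares without plotting.
    # Both returned arrays have shape (16, 3); the second is the rotated and translated copy.
    #   straight, transformed = get_samples(translation=np.array((2.0, 1.0, 0.0)), rotation=45, plot=False)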
    
    def get_one_missplaced(deviation:np.ndarray=np.array([-1, 0, 0]), deviation_index:int=0, plot:bool=True):
        """Generates two identical squares and displaces a single point of the second one by `deviation`."""
        straight, transformed = get_samples(rotation=0, plot=False)
        transformed[deviation_index] += deviation
        if plot:
            plot_squares(straight, transformed)
        return straight, transformed
    
    def plot_squares(pcd1:np.ndarray, pcd2:np.ndarray):
        """Plots the x-y projection of two (N, 3) point clouds."""
        plt.plot(pcd1[:, 0], pcd1[:, 1], 'o')
        plt.plot(pcd2[:, 0], pcd2[:, 1], 'o')
        plt.legend(['straight', 'transformed'])
    
    
    def plot3d(pcls:list[np.ndarray], alpha:float=0.4, size_multiplier:float=1, figsize:tuple[int, int]=(10, 10)) -> plt.Figure:
        """Plots a list of point clouds in a single 3D scatter figure."""
        fig = plt.figure(figsize=figsize)
        fig.tight_layout()
        ax = fig.add_subplot(projection='3d')
        if not isinstance(pcls, list):
            pcls = [pcls]
        try:
            # Scale the marker size with the extent of the first cloud
            s = size_multiplier * 0.06/(pcls[0][:, 0].max()-pcls[0][:, 0].min())
        except (IndexError, ValueError, ZeroDivisionError):
            s = size_multiplier * 0.06
        for pcl in pcls:
            if len(pcl) == 0:
                continue
            x, y, z = pcl[:, 0], pcl[:, 1], pcl[:, 2]
            ax.set_box_aspect((np.ptp(x), np.ptp(y), np.ptp(z)))
            ax.scatter(x, y, z, s=s, marker='o', alpha=alpha)
            ax.set_xlabel('X')
            ax.set_ylabel('Y')
            ax.set_zlabel('Z')
        return fig
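
    # Usage sketch (illustrative; `reference` and `target` are hypothetical (N, 3) arrays):
    #   fig = plot3d([reference, target], alpha=0.2, size_multiplier=2)
    #   plt.show()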
    
    def RMSE(reference:np.ndarray, target:np.ndarray, workers:int=-1) -> float:
        """Root mean squared distance from each target point to its nearest neighbour in the reference."""
        tree = KDTree(reference)
        distances, _ = tree.query(target, k=1, p=2, workers=workers)
        rmse = np.sqrt(np.mean(distances**2))
        return rmse
    
    def calculate_accuracy(reference:np.ndarray, target:np.ndarray, threshold:float, workers:int=-1) -> tuple[float, np.ndarray]:
        """Calculates the accuracy of the target point cloud compared to the reference point cloud.
        If reference and target are swapped, the metric is called 'completeness'.
        Returns the accuracy and the indices of the inaccurate points, i.e. the points of the target that do not match any point of the reference."""
        tree = KDTree(reference)
        distances, _ = tree.query(target, k=1, distance_upper_bound=threshold, p=2, workers=workers)
        accuracy = np.count_nonzero(distances <= threshold) / len(distances)
        return accuracy, np.arange(len(target))[distances > threshold]
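
    # In formula form (same computation as above): accuracy = |{t in target : d(t, reference) <= threshold}| / |target|,
    # where d is the Euclidean distance to the nearest reference point. Illustrative call, with hypothetical arrays:
    #   acc, inaccurate_ids = calculate_accuracy(reference, target, threshold=0.15)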
    
    def f1_score(reference:np.ndarray, target:np.ndarray, threshold:float, workers:int=-1, plot_results:bool=False) -> tuple[float, np.ndarray, np.ndarray]:
        """Calculates the F1 score between two point clouds. Reference and target are interchangeable.
        Returns the score, the indices of the inaccurate points of the target and the indices of the incomplete points of the reference."""
        accuracy, inaccurate_ids = calculate_accuracy(reference, target, threshold, workers=workers)
        completeness, uncomplete_ids = calculate_accuracy(target, reference, threshold, workers=workers)
        score = 2 * (accuracy * completeness) / (accuracy + completeness) if (accuracy + completeness) > 0 else 0.0
        if plot_results:
            inaccurate = target[inaccurate_ids]
            accurate = target[np.setdiff1d(np.arange(len(target)), inaccurate_ids)]
            uncomplete = reference[uncomplete_ids]
            complete = reference[np.setdiff1d(np.arange(len(reference)), uncomplete_ids)]
            matched = np.concatenate([accurate, complete], axis=0)
            plot3d([matched, inaccurate, uncomplete])
        return score, inaccurate_ids, uncomplete_ids
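
    # Worked example of the harmonic mean above: accuracy 0.9 and completeness 0.8 give
    # F1 = 2 * 0.9 * 0.8 / (0.9 + 0.8) ~= 0.847. Illustrative call, with hypothetical arrays:
    #   score, inaccurate_ids, incomplete_ids = f1_score(reference, target, threshold=0.15, plot_results=True)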
    
    def EMD(reference:np.ndarray, target:np.ndarray, workers:int=-1) -> float:
        """Approximates the Earth Mover's Distance as the sum of nearest-neighbour distances from the reference to the target."""
        tree = KDTree(target)
        distances, _ = tree.query(reference, k=1, p=2, workers=workers)
        return distances.sum()
    
    def average_squared_distance(reference:np.ndarray, target:np.ndarray, neighbours:int=1, workers:int=-1) -> np.ndarray:
        """Calculates, for each target point, the average squared distance to its `neighbours` nearest neighbours in the reference."""
        tree = KDTree(reference)
        distances, _ = tree.query(target, k=neighbours, p=2, workers=workers)
        return np.mean(np.reshape(distances, (len(target), -1))**2, axis=1)
    
    def k_chamf(reference:np.ndarray, target:np.ndarray, neighbours:int=1, workers:int=-1) -> float:
        """Calculates the k-nearest-neighbour Chamfer distance between two point clouds. Reference and target are interchangeable."""
        first_term = average_squared_distance(reference, target, neighbours, workers=workers)
        first_term = np.mean(first_term)
        second_term = average_squared_distance(target, reference, neighbours, workers=workers)
        second_term = np.mean(second_term)
        return first_term + second_term
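
    # For neighbours=1 this is the standard Chamfer distance:
    #   CD(A, B) = mean_{a in A} min_{b in B} ||a - b||^2 + mean_{b in B} min_{a in A} ||a - b||^2
    # Illustrative call, with hypothetical arrays:
    #   cd = k_chamf(reference, target, neighbours=1)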
    
    def load_pointcloud(path:str, bed_path:str, roi:list[list[float]]=[[None, None], [None, None], [None, None]], intensity_threshold:int=60, plot:bool=True) -> np.ndarray:
        """Loads a point cloud from an npy file and layerizes it by comparing it with its bed.
        @param path: the path to the npy file of the measurement point cloud
        @param bed_path: the path to the npy file of the bed point cloud
        @param roi: region of interest that is kept; the rest of the part is cut off. Leave an entry as `None` not to define that limit.
        Format: [[x0, x1], [y0, y1], [z0, z1]]
        @param intensity_threshold: intensity threshold of the point cloud. Points with intensity below this threshold are removed.
        @param plot: plot the point cloud using matplotlib if requested"""
        pcl = np.load(path)
        bed = np.load(bed_path)[:, :3]

        pcl = pcl[pcl[:, 3] > intensity_threshold]
        pcl = pcl[:, :3]
        _, pcl = layerize_pointclouds([bed, pcl])  # Layerize pointclouds
        pcl[:, 2] = -pcl[:, 2] + max(pcl[:, 2])  # Invert z axis
        pcl = pcl[pcl[:, 2] < 100]  # Removing outliers
        for i in range(3):  # Crop to the region of interest
            if roi[i][0] is not None: pcl = pcl[pcl[:, i] > roi[i][0]]
            if roi[i][1] is not None: pcl = pcl[pcl[:, i] < roi[i][1]]
    
        if plot:
            plot3d(pcl)
        
        return pcl
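
    # Usage sketch (illustrative; the file names below are hypothetical, only the bed naming pattern
    # follows load_dataset further down):
    #   pcl = load_pointcloud("../data/metrics/measurements/measurement_run1_1.npy",
    #                         "../data/metrics/measurements/bed_before_printing_run1_0.npy",
    #                         roi=[[None, None], [None, None], [None, 50]], plot=False)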
    
    def icp(source:np.ndarray, target:np.ndarray, voxel_size:float=0.5) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Performs Iteractive Closest Point registration on two point clouds.
        The source point cloud is transformed to match the target point cloud.
        Returns the transformed source point cloud, the target point cloud and the transformation matrix.
        """
        threshold = voxel_size * 0.4
    
        pcd1 = o3d.geometry.PointCloud()
        pcd1.points = o3d.utility.Vector3dVector(np.reshape(source, (-1, 3)))
        pcd2 = o3d.geometry.PointCloud()
        pcd2.points = o3d.utility.Vector3dVector(np.reshape(target, (-1, 3)))
    
        source_pcl, target_pcl, source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(voxel_size, source=pcd1, target=pcd2)
        result_ransac = execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size)
    
        result_icp = o3d.pipelines.registration.registration_icp(
            source_pcl, target_pcl, threshold, result_ransac.transformation,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(),
            o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=5000))
        
        transformation = result_icp.transformation
        return apply_transformation(source, transformation), target, transformation
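
    # Usage sketch (illustrative; `source` and `target` are hypothetical (N, 3) arrays):
    #   aligned, _, T = icp(source, target, voxel_size=0.5)
    #   print(RMSE(aligned, target))  # expected to drop compared to RMSE(source, target) if registration succeeded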
    
    def apply_transformation(source:np.ndarray, transformation:np.ndarray) -> np.ndarray:
        """Applies a transformation matrix to a point cloud"""
        source = np.hstack((source, np.ones((len(source), 1))))
        source = transformation @ source.T
        source = source.T
        return source[:, :3]
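
    # Illustrative: a pure translation of +10 along x in homogeneous form, applied to a hypothetical (N, 3) array `points`:
    #   T = np.eye(4); T[:3, 3] = [10.0, 0.0, 0.0]
    #   shifted = apply_transformation(points, T)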
    
    def remove_noise(pointcloud:np.ndarray, threshold:float=0.5, workers:int=-1) -> np.ndarray:
        """Removes noise from a point cloud. Returns the point cloud without noise and logs the deletion ratio, i.e. the percentage of points that were removed."""
        tree = KDTree(pointcloud)
        # k=[2] queries only the second nearest neighbour, i.e. the closest point other than the point itself
        distances, _ = tree.query(pointcloud, k=[2], p=2, distance_upper_bound=threshold, workers=workers)
        not_noise = distances[:, 0] < threshold
        remove_ratio = 1-not_noise.sum()/len(pointcloud)
        print(f"Removed {remove_ratio*100:.2f}% of the pointcloud")
        return pointcloud[not_noise]
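
    # Usage sketch (illustrative): drop points whose closest other point is farther than 0.2
    # (the same threshold evaluate_metrics uses below):
    #   denoised = remove_noise(pcl, threshold=0.2)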
    
    def load_dataset(path="../data/metrics/measurements") -> list[tuple[str, str]]:
        """Pairs each measurement file in `path` with its corresponding bed scan.
        Returns a list of (bed_path, measurement_path) tuples."""
        files = []
        for filename in os.listdir(path):
            if not filename.endswith(".npy") or filename.startswith("bed") or int(filename.split(".")[0][-1]) > 1:
                continue
            run = filename.split("_")[1]
            bed = os.path.join(path, f"bed_before_printing_{run}_0.npy")
            files.append((bed, os.path.join(path, filename)))
        return files
    
    def evaluate_metrics(dataset:list[tuple[str, str]], results_path:str="../data/metrics/results") -> None:
        """Evaluates the metrics on a dataset of point clouds."""
        os.makedirs(results_path, exist_ok=True)
        with open(os.path.join(results_path, "results.csv"), "w") as f:
            f.write("target,source,rmse,f1,emd,chamfer\n")
        for s, t in product(dataset, dataset):
            s_file = os.path.split(s[1])[-1]
            t_file = os.path.split(t[1])[-1]
            if s == t:
                continue
            elif 'printing_0' in s_file or 'printing_0' in t_file:  # Ignore the first measurement
                continue
            elif 'measurement' not in s_file and 'measurement' not in t_file:  # Only compare scenarios to as-is
                continue
    
            source = load_pointcloud(s[1], s[0], plot=False)
            target = load_pointcloud(t[1], t[0], plot=False)
    
            source = remove_noise(source, 0.2)
            target = remove_noise(target, 0.2)
    
            source, _, _ = icp(source, target)
            
            rmse = RMSE(source, target)
            f1, _, _ = f1_score(source, target, 0.15, plot_results=False)
            emd = EMD(source, target)
            chamfer = k_chamf(source, target)
    
            with open(os.path.join(results_path, "results.csv"), "a") as f:
                f.write(",".join((
                    os.path.split(t[1])[-1],
                    os.path.split(s[1])[-1],
                    str(round(rmse, 4)),
                    str(round(f1, 4)),
                    str(round(emd, 4)),
                    str(round(chamfer, 4))
                    )))
                f.write("\n")
    
            del source, target
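
    # Minimal end-to-end sketch, assuming the default dataset layout under ../data/metrics/
    # (the default paths of load_dataset and evaluate_metrics). Since this module uses relative
    # imports, run it as part of its package, e.g. `python -m <package>.metrics`.
    if __name__ == "__main__":
        dataset = load_dataset()
        evaluate_metrics(dataset)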