Skip to content
Snippets Groups Projects
Commit 0a422767 authored by Mateus Harano's avatar Mateus Harano
Browse files

Populated with code

parent c823f96c
Branches
Tags
No related merge requests found
Showing
with 3234 additions and 61 deletions
.python-version
*venv*
*__pycache__
*__runtime_stats__*
*.log
data/
# Mac storage system
*.DS_Store
\ No newline at end of file
LICENSE 0 → 100644
MIT License
Copyright (c) 2023 Production Metrology and Quality Management | RWTH Aachen University
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
This diff is collapsed.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTIBILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from .utils import init_logging, running_in_blender
if 'SmoPa3D' not in logging.Logger.manager.loggerDict.keys():
init_logging('DEBUG')
bl_info = {
"name" : "FDM Simulator",
"author" : "MateusHarano",
"description" : "This addon simulates the printing process of an FDM 3D printer.",
"blender" : (2, 80, 0),
"version" : (0, 0, 0),
"location" : "",
"warning" : "",
"category" : "Generic"
}
if running_in_blender:
from . import auto_load
auto_load.init()
def register():
auto_load.register()
def unregister():
auto_load.unregister()
import os
import bpy
import sys
import typing
import inspect
import pkgutil
import importlib
from pathlib import Path
from .utils import running_in_blender
__all__ = (
"init",
"register",
"unregister",
)
if running_in_blender:
blender_version = bpy.app.version
modules = None
ordered_classes = None
def init():
ensure_site_packages([("scipy", "scipy"), ("open3d", "open3d"), ("tqdm", "tqdm"), ("memory_profiler", "memory_profiler"), ("seaborn", "seaborn"), ("skimage", "scikit-image"), ("sklearn", "scikit-learn"), ("pandas", "pandas"), ("matplotlib", "matplotlib"), ("peakutils", "PeakUtils")])
global modules
global ordered_classes
modules = get_all_submodules(Path(__file__).parent)
ordered_classes = get_ordered_classes_to_register(modules)
def register():
for cls in ordered_classes:
bpy.utils.register_class(cls)
for module in modules:
if module.__name__ == __name__:
continue
if hasattr(module, "register"):
module.register()
def unregister():
for cls in reversed(ordered_classes):
bpy.utils.unregister_class(cls)
for module in modules:
if module.__name__ == __name__:
continue
if hasattr(module, "unregister"):
module.unregister()
# Import modules
#################################################
def get_all_submodules(directory):
return list(iter_submodules(directory, directory.name))
def iter_submodules(path, package_name):
for name in sorted(iter_submodule_names(path)):
yield importlib.import_module("." + name, package_name)
def iter_submodule_names(path, root=""):
for _, module_name, is_package in pkgutil.iter_modules([str(path)]):
if is_package:
sub_path = path / module_name
sub_root = root + module_name + "."
yield from iter_submodule_names(sub_path, sub_root)
else:
yield root + module_name
# Find classes to register
#################################################
def get_ordered_classes_to_register(modules):
return toposort(get_register_deps_dict(modules))
def get_register_deps_dict(modules):
my_classes = set(iter_my_classes(modules))
my_classes_by_idname = {cls.bl_idname : cls for cls in my_classes if hasattr(cls, "bl_idname")}
deps_dict = {}
for cls in my_classes:
deps_dict[cls] = set(iter_my_register_deps(cls, my_classes, my_classes_by_idname))
return deps_dict
def iter_my_register_deps(cls, my_classes, my_classes_by_idname):
yield from iter_my_deps_from_annotations(cls, my_classes)
yield from iter_my_deps_from_parent_id(cls, my_classes_by_idname)
def iter_my_deps_from_annotations(cls, my_classes):
for value in typing.get_type_hints(cls, {}, {}).values():
dependency = get_dependency_from_annotation(value)
if dependency is not None:
if dependency in my_classes:
yield dependency
def get_dependency_from_annotation(value):
if blender_version >= (2, 93):
if isinstance(value, bpy.props._PropertyDeferred):
return value.keywords.get("type")
else:
if isinstance(value, tuple) and len(value) == 2:
if value[0] in (bpy.props.PointerProperty, bpy.props.CollectionProperty):
return value[1]["type"]
return None
def iter_my_deps_from_parent_id(cls, my_classes_by_idname):
if bpy.types.Panel in cls.__bases__:
parent_idname = getattr(cls, "bl_parent_id", None)
if parent_idname is not None:
parent_cls = my_classes_by_idname.get(parent_idname)
if parent_cls is not None:
yield parent_cls
def iter_my_classes(modules):
base_types = get_register_base_types()
for cls in get_classes_in_modules(modules):
if any(base in base_types for base in cls.__bases__):
if not getattr(cls, "is_registered", False):
yield cls
def get_classes_in_modules(modules):
classes = set()
for module in modules:
for cls in iter_classes_in_module(module):
classes.add(cls)
return classes
def iter_classes_in_module(module):
for value in module.__dict__.values():
if inspect.isclass(value):
yield value
def get_register_base_types():
return set(getattr(bpy.types, name) for name in [
"Panel", "Operator", "PropertyGroup",
"AddonPreferences", "Header", "Menu",
"Node", "NodeSocket", "NodeTree",
"UIList", "RenderEngine",
"Gizmo", "GizmoGroup",
])
# Find order to register to solve dependencies
#################################################
def toposort(deps_dict):
sorted_list = []
sorted_values = set()
while len(deps_dict) > 0:
unsorted = []
for value, deps in deps_dict.items():
if len(deps) == 0:
sorted_list.append(value)
sorted_values.add(value)
else:
unsorted.append(value)
deps_dict = {value : deps_dict[value] - sorted_values for value in unsorted}
return sorted_list
def ensure_site_packages(packages):
""" `packages`: list of tuples (<import name>, <pip name>) """
if not packages:
return
import site
import importlib
import importlib.util
user_site_packages = site.getusersitepackages()
if not user_site_packages in sys.path:
sys.path.append(user_site_packages)
modules_to_install = [module[1] for module in packages if not importlib.util.find_spec(module[0])]
if not modules_to_install:
return
if bpy.app.version < (2,91,0):
python_binary = bpy.app.binary_path_python
else:
python_binary = sys.executable
import subprocess
subprocess.run([python_binary, '-m', 'ensurepip'], check=True)
subprocess.run([python_binary, '-m', 'pip', 'install', *modules_to_install, "--user"], check=True)
importlib.invalidate_caches()
\ No newline at end of file
import os
import numpy as np
from skimage.measure import marching_cubes
import logging
log = logging.getLogger("SmoPa3D")
from .gcode.parser import GcodeCommand
from .node import Node, join_nodes, assign_values
from .geometry import width_model
class Command:
"""Represents a command from the gcode file"""
def __init__(self, network, gcode:GcodeCommand, id:int) -> None:
self.network = network
self.gcode = gcode
self.id = id
self.nodes:list[Node] = []
self._vertices:np.ndarray = None
self._faces:np.ndarray = None
self.vertices_filepath:str = None
self.faces_filepath:str = None
# Update network to include this command
self.network.commands[id] = self
# Calculate distribution of nodes along the command
self.start = np.array((self.gcode.last_position['x'], self.gcode.last_position['y'], self.gcode.last_position['z']))
self.end = np.array((self.gcode.x if self.gcode.x is not None else self.gcode.last_position['x'], self.gcode.y if self.gcode.y is not None else self.gcode.last_position['y'], self.gcode.z if self.gcode.z is not None else self.gcode.last_position['z']))
self.trajectory_length = np.linalg.norm(self.end - self.start)
# Calculate properties
self.e = self.gcode.e
self.feedrate = (self.e - self.gcode.last_ocurrence('e')) / self.trajectory_length
self.speed = self.gcode.f if self.gcode.f is not None else self.gcode.last_ocurrence('f')
nominal_width = width_model(self.network.temperature, self.feedrate, self.speed)
self.node_distance = nominal_width * self.network.node_distance
self.qtd_nodes = round(self.trajectory_length / self.node_distance) + 1
self.nodes_coords = np.linspace(self.start, self.end, self.qtd_nodes)
if (self.start[0], self.start[1], self.start[2]) in self.network.coords: # Remove coincident starts and endings
self.nodes_coords = self.nodes_coords[1:]
self.qtd_nodes -= 1
if self.qtd_nodes <= 0: return
# volume_distribution = volume_profile(self.qtd_nodes, self.trajectory_length, self.speed) * self.total_extruded_volume
volume_distribution = np.ones(self.qtd_nodes)/self.qtd_nodes * self.total_extruded_volume
# Assign layer
self.layer = self.network.get_layer(self.start[2])
# Initialize nodes
for coord, volume in zip(self.nodes_coords, volume_distribution):
Node(self.network, self, self.layer, coord[0], coord[1], coord[2], volume)
@property
def total_extruded_volume(self,
constant:float = 0.0025726112346777796,
feedrate_multiplier:float = 1.812969202377806
) -> float:
"""Volume, in mm^3, of the filament deposited in the node
@param constant: constant value to calculate the area of the profile. Value retrieved experimentally
@param feedrate_multiplier: multiplier value to calculate the area of the profile. Value retrieved experimentally"""
area = constant + feedrate_multiplier * self.feedrate
volume = area * self.trajectory_length * self.network.extrusion_multiplier
# volume = self.feedrate * self.trajectory_length * np.pi * 1.75 ** 2 / 4 * self.network.extrusion_multiplier
return volume
@property
def vertices(self) -> np.ndarray:
"""Vertices of the mesh. If the vertices are not saved, returns None"""
if self._vertices is None:
if self.vertices_filepath is not None:
return np.load(self.vertices_filepath)
else:
return None
else:
return self._vertices
@property
def faces(self) -> np.ndarray:
"""Vertices of the mesh. If the vertices are not saved, returns None"""
if self._faces is None:
if self.faces_filepath is not None:
return np.load(self.faces_filepath)
else:
return None
else:
return self._faces
def volume_profile(
qtd_nodes:int,
distance:float=100,
nozzle_speed:float=1000,
jerk:float=20,
acceleration:float=500
):
times = []
umax = min(nozzle_speed, np.sqrt(jerk ** 2 + acceleration * distance))
s_ramp = (umax ** 2 - jerk ** 2) / (2 * acceleration)
for s in np.linspace(0, distance, qtd_nodes+1)[1:]:
if s <= s_ramp:
roots = np.roots((acceleration/2, jerk, -s))
roots = min(roots[roots > 0].real)
times.append(roots)
elif s <= (distance - s_ramp):
times.append((umax - jerk) / acceleration + (s - s_ramp) / umax)
else:
roots = np.roots((-acceleration/2, umax, (distance - s_ramp - s)))
roots = min(roots[roots > 0].real)
times.append((umax - jerk) / acceleration + (distance - 2*s_ramp)/umax + roots)
total_time = times[-1]
discretization = [times[i] - times[i-1] if i > 0 else times[i] for i in range(len(times))]
return (discretization / total_time).astype(float)
def calculate_command_mesh(command:Command) -> tuple[str, str]:
"""Calculate the mesh of the command. Returns the saved path of the vertices and faces"""
if command.qtd_nodes <= 0:
return None, None
elif command.qtd_nodes == 1:
if command.nodes[0].pointcloud is None: return None, None
joined_pcl = command.nodes[0].pointcloud
command.nodes[0].wipe()
else: # If there are more than one node, join the pointclouds into one collection of points (shape = (n, 3) [x, y, z])
joined_pcl = np.vstack([join_nodes(command.nodes[0], node) for node in command.nodes]).astype(np.int64)
for node in command.nodes:
node.wipe()
if joined_pcl is None or len(joined_pcl) == 0:
return None, None
# Shift the pointcloud to remove the negative values
shift = joined_pcl.min(0) - [1, 1, 1]
joined_pcl -= shift
volume_size = (joined_pcl.max(0) + [5,5,5])[[2,1,0]]
# Populate the voxel with the pointcloud
voxel = assign_values(np.zeros(volume_size, dtype=bool), joined_pcl)
# Calculate the mesh
vertices, faces, _, _ = marching_cubes(voxel)
# Positioning in the environment
vertices = (vertices.astype(np.float32)[:, ::-1] + shift - command.network.env.node_grid_size // 2 - 1) * command.network.env.resolution + command.nodes[0].coord
# Save the mesh to disk so it does not occupy memory
vert_path = os.path.join(command.network.saving_path, "commands", f"{command.id}", "vertices.npy")
face_path = os.path.join(command.network.saving_path, "commands", f"{command.id}", "faces.npy")
os.makedirs(os.path.dirname(vert_path), exist_ok=True)
os.makedirs(os.path.dirname(face_path), exist_ok=True)
np.save(vert_path, vertices)
np.save(face_path, faces)
del vertices, faces, joined_pcl
return vert_path, face_path
class Layer:
"""Represents a layer of the print"""
def __init__(self, network, z:float) -> None:
self.network = network
self.z = z
self.commands:list[Command] = []
self.nodes:list[Node] = []
def wipe_memory(self) -> None:
"""Delete the nodes from the memory"""
for node in self.nodes:
node.wipe()
log.info(f'Layer {self.z} wiped')
# -*- coding: utf-8 -*-
__all__ = ["factory", "runtime", "delay"]
import os
import time
import random
import functools
from memory_profiler import memory_usage
import matplotlib.pyplot as plt
import logging
log = logging.getLogger('SmoPa3D')
def factory(**default_opts):
"""
Factory function to create decorators for tasks's run methods. Default options for the decorator
function can be given in *default_opts*. The returned decorator can be used with or without
actual invocation. Example:
.. code-block:: python
@factory(digits=2)
def runtime(fn, opts, task, *args, **kwargs):
t0 = time.time()
try:
return fn(task, *args, **kwargs)
finally:
t1 = time.time()
diff = round(t1 - t0, opts["digits"])
print("runtime:")
print(diff)
...
class MyTask():
@runtime
def run(self):
...
# or
@runtime(digits=3):
def run(self):
...
"""
def wrapper(decorator):
@functools.wraps(decorator)
def wrapper(fn=None, **opts):
_opts = default_opts.copy()
_opts.update(opts)
def wrapper(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
return decorator(fn, _opts, *args, **kwargs)
return wrapper
return wrapper if fn is None else wrapper(fn)
return wrapper
return wrapper
@factory(digits=2)
def runtime(fn, opts, *args, **kwargs):
"""
Decorator for inspecting a methods performance with a precision of *digits=2*.
"""
t0 = time.time()
try:
return fn(*args, **kwargs)
finally:
t1 = time.time()
if (t1-t0) < 2:
diff = round(1000*(t1 - t0), opts["digits"])
log.debug(f"{fn.__name__} runtime: {diff} ms")
else:
diff = round((t1 - t0), opts["digits"])
log.debug(f"{fn.__name__} runtime: {diff} s")
@factory(t=5, stddev=0, pdf="gauss")
def delay(fn, opts, *args, **kwargs):
""" delay(t=5, stddev=0., pdf="gauss")
Wraps a bound method of a task and delays its execution by *t* seconds.
"""
if opts["stddev"] <= 0:
t = opts["t"]
elif opts["pdf"] == "gauss":
t = random.gauss(opts["t"], opts["stddev"])
elif opts["pdf"] == "uniform":
t = random.uniform(opts["t"], opts["stddev"])
else:
raise ValueError(
"unknown delay decorator pdf '{}'".format(opts["pdf"]))
time.sleep(t)
return fn(*args, **kwargs)
class Tracker:
def __init__(self) -> None:
self.runtime:list[float] = []
# self.memory:list[float] = []
def plot_results(self, function_name:str, path:str='./__runtime_stats__'):
os.makedirs(path, exist_ok=True)
fig, axs = plt.subplots(1, 1, layout='constrained')
# Plot runtime
axs.plot(self.runtime)
axs.set_ylabel('Runtime [ms]')
axs.set_xlabel('Iterations')
axs.set_title('Runtime')
# # Plot memory
# axs[1].plot(self.memory)
# axs[1].set_ylabel('Memory [MiB]')
# axs[1].set_xlabel('Iterations')
# axs[1].set_title('Memory')
print(f'Code statistics saved in {os.path.abspath(path)}')
fig.savefig(f"{path}/{function_name}.png")
@factory()
def track(fn, opts, *args, **kwargs):
"""Keep track of the runtime of a function over time"""
if f"{fn.__name__}_tracker" not in globals().keys():
globals()[f"{fn.__name__}_tracker"] = Tracker()
tracker:Tracker = globals()[f"{fn.__name__}_tracker"]
t0 = time.time()
try:
# memory = max(memory_usage((fn, [*args], {**kwargs})))
return fn(*args, **kwargs)
finally:
t1 = time.time()
tracker.runtime.append(round((t1-t0)*1000, 1))
# if memory is not None: tracker.memory.append(memory)
def save_tracking_stats():
"""Save the statistics in charts"""
to_delete = []
for varname, value in globals().items():
if isinstance(value, Tracker):
value.plot_results(function_name=varname)
to_delete.append(varname)
for varname in to_delete:
del globals()[varname]
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
@author: grh
Project: SmoPa3D
Description: This code is used for the manipulation of the gcode in order to generate slices
"""
import pandas as pd
from random import sample, randint
import re
import numpy as np
from scipy.interpolate import interp1d
import os
#===============================================================================
## Generating of gcode slices in order to send them to the printer
#===============================================================================
def convertGcode2Slices(rel_path = "/data/printjobs/test_part.gcode"):
with open(os.path.join(os.environ['ROOT'], rel_path), "r") as gcode:
data = gcode.read() ### read in the G-Code
##Get the layer height
layerheight_start = re.search(";Layer height: ", data).end()
layerheight_end = re.search(";MINX:", data).start()
layerheight = float(data[layerheight_start:layerheight_end])
##Get the number of Layers
numberOfLayers_start = re.search(";LAYER_COUNT:", data).end()
numberOfLayers_end = re.search(";LAYER:0", data).start()
numberOfLayers = int(data[numberOfLayers_start:numberOfLayers_end])
numberOfLayers = numberOfLayers - 1 ## subtracting -1 in order to get the real number of layers (the counting starts at 0 -> for example if 169 layers are displayed, the last layer ist layer 168)
##Get YMin und YMax for to select the measuring area
#Getting YMin
numberOfLayers_start = re.search(";MINY:", data).end()
numberOfLayers_end = re.search(";MINZ:", data).start()
YMin = float(data[numberOfLayers_start:numberOfLayers_end])
#Getting YMax
numberOfLayers_start = re.search(";MAXY:", data).end()
numberOfLayers_end = re.search(";MAXZ:", data).start()
YMax = float(data[numberOfLayers_start:numberOfLayers_end])
##Seperate the first Layer from the G-Code
numberOfLayers_end = re.search(";LAYER:1", data).start()
slices = np.array(str(data[:numberOfLayers_end]))
i = 1 ## starting the counter at 1 because the 0 layer was already seperated
while i <= numberOfLayers:
## Select start and end Layer (code is searching for ";LAYER:i" and "LAYER:i+1" in order to seperate each layer)
startLayer = re.search(";LAYER:" + str(i), data).start() ## getting the number of the character of the current layer in order to extract the layer
## Seperate the other Layers from the G-Code
if i < numberOfLayers:
endLayer = re.search(";LAYER:" + str(i+1), data).start() ## getting the number of the character of the next layer in order to extract the layer
slices = np.append(slices, (str(data[startLayer:endLayer]))) ## seperating the layer and appending it to the numpy array
## Seperate the last Layer from the G-Code
else:
slices = np.append(slices, (str(data[startLayer:]))) ## seperating the layer and appending it to the numpy array
i += 1
return numberOfLayers, layerheight, YMin, YMax, slices
#===============================================================================
## Generating of gcode with randomly induced defects (pores)
#===============================================================================
def convertGcode2SlicesWithDefects(rel_path = "/data/printjobs/test_part.gcode", output_path = "/data/printjobs/test_part_defective.gcode"):
with open(os.path.join(os.environ['ROOT'], rel_path), "r") as gcode:
data = gcode.read() ### read in the G-Code
##Get the layer height
layerheight_start = re.search(";Layer height: ", data).end()
layerheight_end = re.search(";MINX:", data).start()
layerheight = float(data[layerheight_start:layerheight_end])
##Get the number of Layers
numberOfLayers_start = re.search(";LAYER_COUNT:", data).end()
numberOfLayers_end = re.search(";LAYER:0", data).start()
numberOfLayers = int(data[numberOfLayers_start:numberOfLayers_end])
numberOfLayers = numberOfLayers - 1 ## subtracting -1 in order to get the real number of layers (the counting starts at 0 -> for example if 169 layers are displayed, the last layer ist layer 168)
##Get YMin und YMax for to select the measuring area
#Getting YMin
numberOfLayers_start = re.search(";MINY:", data).end()
numberOfLayers_end = re.search(";MINZ:", data).start()
YMin = float(data[numberOfLayers_start:numberOfLayers_end])
#Getting YMax
numberOfLayers_start = re.search(";MAXY:", data).end()
numberOfLayers_end = re.search(";MAXZ:", data).start()
YMax = float(data[numberOfLayers_start:numberOfLayers_end])
with open(os.path.join(os.environ['ROOT'], rel_path), "r") as gcode:
## Convert gcode into a Pandas Dataframe in order to process it
lines = gcode.readlines()
lines = [line.strip() for line in lines]
df = pd.Series(lines)
## get the locations in the gcode, which contain those strings in order to seperate the code regarding the type which is printed
locations = df.loc[df.str.contains(";MESH:NONMESH|;TYPE:SKIN|;TYPE:FILL|;TYPE:WALL-INNER|;TYPE:WALL-OUTER", case= False)].index.values
i = 0
## sets the resolution/size of the defects in mm
resolution = 1
## go through each location. If its SKIN oder FILL, defects will be generated
while i < (len(locations)-1):
if df[locations[i]] == ";TYPE:SKIN" or df[locations[i]] == ";TYPE:FILL":
## get each for each command block the commands in which material is extruded
position_commands = df[(locations[i]+2):locations[i+1]].loc[df.str.contains("G1", case= False)].index.values
position_commands = df[position_commands].loc[df.str.contains("X", case= False)].index.values
position_commands = df[position_commands].loc[df.str.contains("E", case= False)].index.values
## creates random numbers in range of the len of the position commands in order get a random entry from position commands
len_commands = len(position_commands)
random_number_array = sample(range(0, len_commands), int(len_commands/20))
random_number_array.sort(reverse=True)
## see if there are any random numbers created
if random_number_array != []:
## go throug each printing section (FILL or SKIN) and create defects
j = 0
while j < int(len_commands/20):
## Gets the x-position, y-position and e-position in order to get the starting point of the considered command
## Get X
X_start = re.search("X", df[position_commands[random_number_array][j]]).end()
X_end = re.search("Y", df[position_commands[random_number_array][j]]).start()
x = float(df[position_commands[random_number_array][j]][X_start:X_end])
## Get Y
Y_start = re.search("Y", df[position_commands[random_number_array][j]]).end()
Y_end = re.search("E", df[position_commands[random_number_array][j]]).start()
y = float(df[position_commands[random_number_array][j]][Y_start:Y_end])
## Get E
E_start = re.search("E", df[position_commands[random_number_array][j]]).end()
e = float(df[position_commands[random_number_array][j]][E_start:])
## Create a coordinate variable
coordinate = np.array((x,y))
## Gets the previous x-position and y-position in order to get the starting point of the previous command
## Get X
X_start = re.search("X", df[position_commands[random_number_array][j]-1]).end()
X_end = re.search("Y", df[position_commands[random_number_array][j]-1]).start()
prev_x = float(df[position_commands[random_number_array][j]-1][X_start:X_end])
## Get Y
Y_start = re.search("Y", df[position_commands[random_number_array][j]-1]).end()
if re.search("E", df[position_commands[random_number_array][j]-1]) is not None:
Y_end = re.search("E", df[position_commands[random_number_array][j]-1]).start()
prev_y = float(df[position_commands[random_number_array][j]-1][Y_start:Y_end])
else:
prev_y = float(df[position_commands[random_number_array][j]-1][Y_start:])
## Create a coordinate variable
prev_coordinates = np.array((prev_x,prev_y))
## gets the total distance between the considered commands ("coordinates" and "prev_coordinates")
dist_total = np.linalg.norm(coordinate-prev_coordinates)
## if the total distance is lower than 2mm the command will be executed but no filament is extruded -> the defect is then max 2 mm long
if dist_total < 2:
df[position_commands[random_number_array][j]] = "G1 X" + str(x) + " Y" + str(y) + " \nG92 E" + str(e)
## if the total distance is higher than 2mm the "line" will be interupted on a random position and a defect will be inserted (nearly 1 mm long)
elif dist_total > 2:
## Interpolation of the previous coordinates (from the previous command) and the current coordinates (from the considered command)
prev_e = e - (dist_total * 0.033) ## 0.033 is the distance the E-axis is moving per mm
increment = int(dist_total/resolution) ## total number of points which are created in the interpolated array
x_interplt=[prev_x,x]
y_interplt=[prev_y,y]
f=interp1d(x_interplt,y_interplt)
x_coordinates = np.linspace(prev_coordinates[0],coordinate[0],increment)
y_coordinates = f(x_coordinates)
## creates a random number in order to select on position in the created "line" of the interpolation
random_number = randint(1, (len(x_coordinates)-1))
## gets the coordinate before the "defect"
coordinate_before_defect = np.array((x_coordinates[random_number-1], y_coordinates[random_number-1]))
## gets the coordinate after the "defect"
coordinate_after_defect = np.array((x_coordinates[random_number], y_coordinates[random_number]))
## gets the distance between the previous coordinate and the coordinate before the "defect" in order to calculate the extrusion
dist_1 = np.linalg.norm(prev_coordinates-coordinate_before_defect)
e_value_1 = round(prev_e + (0.033 * dist_1),5)
## gets the distance between the the coordinate before the "defect" and the coordinate after the "defect" in order to calculate the "extrusion"
dist_2 = np.linalg.norm(coordinate_before_defect-coordinate_after_defect)
e_value_2 = round(e_value_1 + (0.033 * dist_2),5)
## if the random number is 1 and therefore the defect is just in the beginning of the line, the new code is generated
## The defect in a 3 mm line looks like this |x--| (with - as printed normal and x as not printed and therefore a defect)
if random_number == 1:
df[position_commands[random_number_array][j]] = "G0 X" + str(coordinate_after_defect[0]) + " Y" + str(coordinate_after_defect[1]) + " \nG92 E" + str(e_value_2) + " \n" + df[position_commands[random_number_array][j]]
## The defect in a 3 mm line looks like this |--x| (with - as printed normal and x as not printed and therefore a defect)
elif random_number == (len(x_coordinates)-1):
df[position_commands[random_number_array][j]] = "G1 X" + str(coordinate_before_defect[0]) + " Y" + str(coordinate_before_defect[1]) + " E" + str(e_value_1) + " \nG0 X" + str(x) + " Y" + str(y) + " \nG92 E" + str(e)
## The defect in a 3 mm line looks like this |-x-| (with - as printed normal and x as not printed and therefore a defect)
else:
df[position_commands[random_number_array][j]] = "G1 X" + str(coordinate_before_defect[0]) + " Y" + str(coordinate_before_defect[1]) + " E" + str(e_value_1) + " \nG0 X" + str(coordinate_after_defect[0]) + " Y" + str(coordinate_after_defect[1]) + " \nG92 E" + str(e_value_2) + " \n" + df[position_commands[random_number_array][j]]
j +=1
i += 1
## Convert Pandas Dataframe back into str in order to be processed as gcode
newlines = df.values.tolist()
newgcode = "\n".join(newlines)
##Seperate the first Layer from the G-Code
numberOfLayers_end = re.search(";LAYER:1", newgcode).start()
slices = np.array(str(newgcode[:numberOfLayers_end]))
i = 1 ## starting the counter at 1 because the 0 layer was already seperated
while i <= numberOfLayers:
## Select start and end Layer (code is searching for ";LAYER:i" and "LAYER:i+1" in order to seperate each layer)
startLayer = re.search(";LAYER:" + str(i), newgcode).start() ## getting the number of the character of the current layer in order to extract the layer
## Seperate the other Layers from the G-Code
if i < numberOfLayers:
endLayer = re.search(";LAYER:" + str(i+1), newgcode).start() ## getting the number of the character of the next layer in order to extract the layer
slices = np.append(slices, (str(newgcode[startLayer:endLayer]))) ## seperating the layer and appending it to the numpy array
## Seperate the last Layer from the G-Code
else:
slices = np.append(slices, (str(newgcode[startLayer:]))) ## seperating the layer and appending it to the numpy array
i += 1
#file = open(os.path.join(os.environ['ROOT'],utput_path), "w")
#file.write(newgcode)
#file.close
return numberOfLayers, layerheight, YMin, YMax, slices
if __name__ == '__main__':
while True:
try:
convertGcode2SlicesWithDefects()
except AttributeError:
continue
break
\ No newline at end of file
# TODO:
# 1. Functions for each of the defects
# a. ABC
# b. Over extrusion
# c. Under extrusion
# d. Void
# e. Geometric deviation
from abc import ABC, abstractmethod
from .parser import Command, Gcode
class Defect(ABC):
def __init__(self, incidence_ratio:float, intensity:float) -> None:
"""Parameters:
-----
* @param incidence_ratio ∈ [0, 1]: percentage of the gcode commands upon which the defect may incide\n
* @param intensity ∈ [0, 1]: percentage of the intensity of the defect"""
super().__init__()
assert 0 <= incidence_ratio <= 1, "incidence_ratio should be a float between 0 and 1"
assert 0 <= intensity <= 1, "incidence_ratio should be a float between 0 and 1"
self.incidence_ratio = incidence_ratio
self.intensity = intensity
@abstractmethod
def apply(self, command:Command) -> None:
pass
class OverExtrusion(Defect):
def apply(self, command:Command) -> None:
if not (type(command) == float or type(command) == float): return
last_coords = command.get_last_position()
original_extrusion = command.e - last_coords['e']
command.e += original_extrusion * self.intensity
if __name__ == '__main__':
gcode = Gcode('data/defects_scans/single_line_cross/no_defect/CE3PRO_single_line_cross.gcode')
defects = [OverExtrusion(1, 1)]
gcode.apply_defects(defects)
gcode.save('./source/gcode/gcode_with_defects.gcode')
\ No newline at end of file
"""
@author: grh, grh_mh
Project: SmoPa3D
Description: This code is used to generate a nominal out of the gcode given by a slicer.
"""
from scipy.spatial.transform import Rotation as R
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import logging
from itertools import repeat
import multiprocessing
log = logging.getLogger('SmoPa3D')
def gcode2dataframe(gcode_file_path:str):
"""Reads a gcode file and transforms it into a pandas dataframe, each row representing a command.\n
Dataframe format:\n
``` bash
+-------+---------+---------+---------+-----------+---------+-------+
| index | X-Value | Y-Value | Z-Value | Extrusion | Command | Layer |
+-------+---------+---------+---------+-----------+---------+-------+
| int | float | float | float | float | int | int |
"""
with open(gcode_file_path, "r") as gcode:
data = gcode.read() ### read in the G-Code
numberOfLayers_start = re.search(";LAYER_COUNT:", data).end()
numberOfLayers_end = re.search(";LAYER:0", data).start()
numberOfLayers = int(data[numberOfLayers_start:numberOfLayers_end])
lines = [line.strip() for line in data.split('\n')]
data_size = len(lines)
df = pd.Series(lines)
gcode_values = np.zeros(((data_size),6))
amount_extracted_values = 0
amount_extracted_values_start_layer = 0
for i in range(numberOfLayers):
### Selection of the i-th layer and separation of the code from the entire code block
startLayer = (";LAYER:" + str(i))
if i < (numberOfLayers-1):
startLayerplusone = (";LAYER:" + str(i+1))
layer_start = df[df==str(startLayer)].index.values[0]
layer_end = df[df==str(startLayerplusone)].index.values[0]
layer = df[(layer_start+1):layer_end]
elif i == (numberOfLayers-1):
layer = df[(layer_start+1):]
### Separation of only G-code where extrude and process
layer = layer.drop(layer.loc[layer.str.contains(";", case= False)].index.values)
layer = layer.drop(layer.loc[layer.str.contains("M20", case= False)].index.values)
layer = layer.drop(layer.loc[layer.str.contains("Z", case= False)].index.values) ## Skipping commands in which Z changes
layer = layer.drop(layer.loc[~layer.str.contains("Y", case= False)].index.values)
layer = layer.drop(layer.loc[~layer.str.contains("X", case= False)].index.values)
for (j, position) in enumerate(layer):
## Extraction of position in X
X_start = re.search("X", position).end()
X_end = re.search("Y", position).start()
X_position = float(position[X_start:X_end])
## Extraction of position in Y
Y_start = re.search("Y", position).end()
if re.search("E", position) is not None:
Y_end = re.search("E", position).start()
Y_position = float(position[Y_start:Y_end])
else:
Y_position = float(position[Y_start:])
## Extraction of position in Z
Z_position = 0.2 + (i*0.2)
## Extraction of feedrate
if (re.search("E", position) is None) != True:
Extrusion_start = re.search("E", position).end()
Extrusion = float(position[Extrusion_start:])
else: Extrusion= float(0.00)
## Assignment of layer count to the values
numberOfLayer = i
## Assignment of the command number to the values
numberOfCommand = j
extracted_value = np.array((X_position,Y_position,Z_position, Extrusion, numberOfCommand, numberOfLayer))
gcode_values[amount_extracted_values: (amount_extracted_values + 1)] = extracted_value
amount_extracted_values += 1
gcode_values = gcode_values[:amount_extracted_values]
df2 = pd.DataFrame(gcode_values)
df2 = df2.set_axis(("X-Value", "Y-Value", "Z-Value", "Extrusion", "Command", "Layer"), axis='columns')
return df2
def calculate_profile_dimensions(method:str, D:float=0.4, V_over_U:float=1.58, Rn:float=1, gap:float=0.2, alpha:float=1.505) -> tuple[float, float]:
"""Calculates height and width of the strand. Available options:\n
* hebda
* xu
* comminal
References:\n
* [COMMINAL et al., 2018](https://doi.org/10.1016/j.addma.2017.12.013)\n
* [Hebda et al., 2019](http://hdl.handle.net/10454/16895)\n
* [Xu et al., 2022](https://hal-mines-paristech.archives-ouvertes.fr/hal-03766358)"""
if method == 'hebda':
W = D * alpha * np.sqrt(1/V_over_U)
H = D / alpha * np.sqrt(1 / V_over_U)
elif method == 'xu':
W = D * (1 - Rn/D + np.sqrt((Rn/D - 1)**2 + np.pi * D / (gap * V_over_U) * (Rn/D - 0.5)))
H = D**2 / (V_over_U * W)
elif method == 'comminal':
W = np.pi / 4 * 1 / V_over_U * D**2 / gap + gap * (1 - np.pi / 4)
H = gap
else:
log.warning('Calculation method for profile dimensions do not exist. Possible options are: hebda, xu and comminal')
W = 0
H = 0
return W, H
def draw_profile(geometry:str='ellipse', width:float=0.374109, height:float=0.266511, resolution:float=0.07) -> pd.DataFrame:
"""Creates a pinned ellipse profile in 2D to simulate the extrusion profile. It is made in Y = 0 plane.\n
Available format options:\n
* oblong
* ellipse
* pinned ellipse
* ellipse oblong
References:\n
* [COMMINAL et al., 2018](https://doi.org/10.1016/j.addma.2017.12.013)\n
* [Hebda et al., 2019](http://hdl.handle.net/10454/16895)\n
* [Xu et al., 2022](https://hal-mines-paristech.archives-ouvertes.fr/hal-03766358)"""
df = pd.DataFrame(columns=['x', 'y', 'z'])
z_vector = np.linspace(0, height, int(height / resolution) + 2)
# Draw contours right side
if geometry == 'oblong':
for z in z_vector:
angle = np.arcsin((z - height / 2) * 2/ height)
x = np.cos(angle) * height / 2 + (width - height) / 2
new_row = pd.DataFrame([[x, 0, z]], columns=['x', 'y', 'z'])
df = pd.concat([df, new_row])
elif geometry == 'ellipse':
for z in z_vector:
angle = np.arcsin((z - height / 2) * 2 / height)
x = np.cos(angle) * width / 2
new_row = pd.DataFrame([[x, 0, z]], columns=['x', 'y', 'z'])
df = pd.concat([df, new_row])
elif geometry == 'pinned ellipse':
x = 0
for z in z_vector[::-1]:
if x <= 0.95 * width / 2: # Upper side (ellipse)
angle = np.arcsin((z - height / 2) * 2 / height)
x = np.cos(angle) * width / 2
new_row = pd.DataFrame([[x, 0, z]], columns=['x', 'y', 'z'])
df = pd.concat([df, new_row])
else: # Lower side (rectangle)
new_row = pd.DataFrame([[x, 0, z]], columns=['x', 'y', 'z'])
df = pd.concat([df, new_row])
elif geometry == 'ellipse oblong':
for z in z_vector:
if z >= height / 2: # Upper side (ellipse)
angle = np.arcsin((z - height / 2) * 2 / height)
x = np.cos(angle) * width / 2
new_row = pd.DataFrame([[x, 0, z]], columns=['x', 'y', 'z'])
df = pd.concat([df, new_row])
else: # Lower side (oblong)
angle = np.arcsin((z - height / 2) * 2 / height)
x = np.cos(angle) * height/ 2 + (width - height) / 2
new_row = pd.DataFrame([[x, 0, z]], columns=['x', 'y', 'z'])
df = pd.concat([df, new_row])
# Mirror to the left side
df = pd.concat([df, df * [-1, 1, 1]])
# Fill up
for z in z_vector:
x = df[df.z == z].x.to_list()
x_vector = np.linspace(min(x), max(x), int((max(x) - min(x)) / resolution))[1:-1]
z_row = np.ones_like(x_vector) * z
new_row = new_row = pd.DataFrame.from_dict({'x':x_vector, 'y':np.zeros_like(x_vector), 'z':z_row})
df = pd.concat([df, new_row])
return df
def draw_rounded_ending(profile:pd.DataFrame, steps:int=7):
"""Draw a rounded ending by revoluting half of the profile in its Z axis"""
ending = pd.DataFrame(columns=profile.columns)
one_side_profile = profile[profile.x < 0]
for angle in np.linspace(0, np.pi, steps)[1:-1]:
r = R.from_euler('z', angle, False)
rotated = one_side_profile.to_numpy() @ r.as_matrix()
df_rotated = pd.DataFrame(rotated, columns=profile.columns)
ending = pd.concat([ending, df_rotated])
return ending
def show_profile(profile:pd.DataFrame, axis:tuple=['x', 'z']):
"""Plot the profile to check it graphically"""
fig, ax = plt.subplots()
ax.plot(profile[axis[0]], profile[axis[1]], 'ko')
ax.set_box_aspect(1)
ax.set_title('profile')
plt.gca().set_aspect('equal')
plt.show()
def draw_tubular_point_cloud(pt0:tuple[float], pt1:tuple[float], profile:pd.DataFrame, resolution:float):
"""Given two coordinates and a profile, returns a DataFrame representing a tubular point cloud between them.\n
Args:\n
- points = [x, y, z]\n
- profile = pd.DataFrame(columns=['x', 'y', 'z'])
"""
x0, y0, z0 = pt0
x1, y1, z1 = pt1
angle = -np.arctan2(y1 - y0, x1 - x0)
r = R.from_euler('z', angle + np.pi/2, False).as_matrix()
rotated = profile.to_numpy() @ r
dist = np.linalg.norm(np.array(pt1) - np.array(pt0))
segments = int(dist/resolution) + 2
X = np.repeat(np.linspace(x0, x1, num=segments), len(rotated))
Y = np.repeat(np.linspace(y0, y1, num=segments), len(rotated))
Z = np.array([z0 for i in range(len(X))])
pcl = np.array([X, Y, Z]).transpose() + np.tile(rotated, (segments, 1))
df_pcl = pd.DataFrame(pcl, columns=['X', 'Y', 'Z'])
# Add rounded endings to both sides of tubular point cloud
ending_part = draw_rounded_ending(profile)
ending = pd.DataFrame(ending_part.to_numpy() @ r + np.array(pt1), columns=df_pcl.columns)
opposite_r = R.from_euler('z', angle - np.pi/2, False).as_matrix()
beginning = pd.DataFrame(ending_part.to_numpy() @ opposite_r + np.array(pt0), columns=df_pcl.columns)
df_pcl = pd.concat([df_pcl, beginning, ending])
return df_pcl
def build_command(index:int, commands:pd.DataFrame, profile:pd.DataFrame, resolution:float):
"""Builds one line of gcode command"""
start_row = commands.iloc[index - 1] if index > 0 else commands.iloc[0]
start_point = [start_row['X-Value'], start_row['Y-Value'], start_row['Z-Value']]
end_point = [commands.iloc[index]['X-Value'], commands.iloc[index]['Y-Value'], commands.iloc[index]['Z-Value']]
pcl = draw_tubular_point_cloud(start_point, end_point, profile, resolution)
return pcl
def build_part(commands:pd.DataFrame, profile:pd.DataFrame, resolution:float, processes:int=2):
"""Builds every line in gcode commands. Number of processes can be changed, but with caution."""
with multiprocessing.Pool(processes) as pool:
df_pcl = pd.concat(pool.starmap(build_command, zip(commands[commands.Extrusion > 0].index, repeat(commands), repeat(profile), repeat(resolution))))
return df_pcl
# if __name__ == '__main__':
# import generate_pointcloud as gp
# import utils
# import os
# start = time.time()
# gcode_path = utils.open_or_download('63847798504e7d63865e5769')
# df = gcode2dataframe(gcode_path, create_pcl_for_each_layer=False)
# resol = 0.07
# W, H = calculate_profile_dimensions('hebda')
# profile = draw_profile('oblong', W, H, resolution=resol)
# pcl = build_part(df, profile, resol)
# path_to_pcl = gp.generate_pcl(pcl, path="files/", name = "00_nominal_part", LLS ="")
# log.info("I took {:.1f} s to generate realistic point cloud.".format(time.time() - start))
\ No newline at end of file
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Optional, TYPE_CHECKING
import os, random
import logging
log = logging.getLogger("SmoPa3D")
if TYPE_CHECKING:
from .defects import Defect
@dataclass
class GcodeCommand:
gcode: Gcode
id: int
command_line: Optional[str] = None
m: Optional[int] = None
s: Optional[int] = None
g: Optional[int] = None
f: Optional[float] = None
x: Optional[float] = None
y: Optional[float] = None
z: Optional[float] = None
e: Optional[float] = None
def __post_init__(self) -> None:
assert sum([self.command_line is not None, self.g is not None, self.m is not None]) == 1, "Either command_line or command must be input to Command class"
self.command_list = ['M', 'S', 'G', 'F', 'X', 'Y', 'Z', 'E']
if self.command_line[-1] == '\n': self.command_line = self.command_line[:-1]
if self.command_line is not None:
for arg in self.command_line.split(' '):
if arg[0] == ';': break
if arg[0] in self.command_list:
self.__dict__[arg[0].lower()] = float(arg[1:]) if len(arg[1:]) > 0 else ''
def __str__(self) -> str:
if not self.is_transformable: return self.command_line
parameters = []
for arg in self.command_list:
value = self.__dict__[arg.lower()]
if value is None:
continue
elif type(value) == str:
parameters.append(value)
else:
parameters.append(arg + str(round(value, 5)))
return ' '.join(parameters)
def format(self) -> str:
return self.__str__()
@property
def last_position(self, parameters_to_search:list[str]=['x', 'y', 'z', 'e']) -> dict:
"""Locate the last ocurrence of coordinates x, y, z, e in parent gcode. If no previous ocurrence is found, returns `None`\n
```
return {x: float, y: float, z: float, e: float}
```"""
coords = {param: None for param in parameters_to_search}
for i in range(self.id-1, 0, -1):
if all([pos is not None for pos in coords.values()]):
break
for coord in coords.keys():
if self.gcode.commands[i].__dict__[coord] is not None and self.gcode.commands[i].__dict__[coord] != '' and coords[coord] is None:
coords[coord] = self.gcode.commands[i].__dict__[coord]
return coords
def last_ocurrence(self, parameter:str) -> Optional[float]:
"""Locate the last ocurrence of the parameter in parent gcode. If no previous ocurrence is found, returns `None`"""
if parameter.upper() not in self.command_list:
log.warning(f'No previous ocurrence of parameter {parameter} not found in command list')
return None
for i in range(self.id-1, 0, -1):
if self.gcode.commands[i].__dict__[parameter] is not None and self.gcode.commands[i].__dict__[parameter] != '':
return self.gcode.commands[i].__dict__[parameter]
@property
def is_transformable(self) -> bool:
"""Check if command is transformable, i.e., if it:\n
* is not part of printer setup
* is not part of printer unset
* is not a comment
* is not M command
* has moved
* has extruded
"""
is_setup = self.id <= self.gcode.setup_end
is_unset = self.id >= self.gcode.unset_start
is_comment = self.command_line[0] == ';'
is_m_command = self.m is not None
has_moved = sum([self.x is not None, self.y is not None, self.z is not None]) > 0
has_extruded = self.e is not None
return not any([is_setup, is_unset, is_comment, is_m_command, not has_moved, not has_extruded])
def __setattr__(self, __name: str, __value: Any) -> None:
self.__dict__[__name] = __value
class Gcode:
def __init__(self, path:str) -> None:
assert os.path.exists(path), f"File path {os.path.abspath(path)} does not exist"
self.path = path
# Read gcode
with open(path, 'r') as gcodefile:
gcode = gcodefile.readlines()
self.gcode = gcode
self.setup_end = next(i for (i, row) in enumerate(self.gcode) if ';LAYER_COUNT' in row)
self.unset_start = next(i for i in range(len(self.gcode)-1, 0, -1) if ';TIME_ELAPSED' in self.gcode[i])
self.commands = [GcodeCommand(self, command_line=command, id=i) for (i, command) in enumerate(gcode) if len(command.replace('\n', '')) > 0]
def apply_defects(self, defectset:list[Defect], overlap:bool=False) -> None:
"""Transform gcode by applying randomly the set of defects. Together with the transformed gcode, it outputs a label in json format indicating the coordinates of each of the synthetized defects.\n
Parameters:
-----
* @param overlap: if false, each command can contain a maximum of 1 defect. If True, more than one defect can happen in each command\n"""
# if not overlap: assert if sum is equal or less than 1
transformable = [command for command in self.commands if command.is_transformable]
total_commands = len(transformable)
for defect in defectset:
sample = random.sample(transformable, int(defect.incidence_ratio * total_commands))
for command in sample:
if not overlap:
transformable.remove(command)
command = defect.apply(command)
def save(self, path:str=None) -> None:
if path is None:
path = self.path.replace('.gcode', '_with_defects.gcode')
with open(path, 'w') as fle:
for command in self.commands:
fle.write(command.format() + '\n')
\ No newline at end of file
import numpy as np
import open3d as o3d
from scipy.spatial import KDTree
from scipy.spatial.transform import Rotation as R
import logging
log = logging.getLogger("SmoPa3D")
def apply_revolution(f_of_z, height:int, width:int) -> np.ndarray:
"""Create a solid by revolutioning a function of z over the z-axis.
@param f_of_z: function that returns a float by given z value (in grid units)
@param height: length in z of the volume in grid units
@param width: length in x and y of the volume in grid units"""
if width % 2 == 0: width += 1 # Volume must be uneven
max_x = width // 2 + 1
solid = np.zeros((height, max_x, width), dtype=int) # z, y, x
for z in range(height):
for y in range(max_x):
if f_of_z(z) < y:
break
x_surface = round(np.sqrt(f_of_z(z) ** 2 - y ** 2))
x_array = [1 if x < x_surface else 0 for x in range(max_x)]
solid[z, y] = np.array([*x_array[::-1], *x_array[1:]])
solid = np.hstack((solid[:, ::-1], solid[:, 1:]))
return solid
def place_ellipsoid(space:np.ndarray, w:int, l:int, h:int, direction:np.ndarray, center:np.ndarray) -> np.ndarray:
"""Function to place an ellipsoid in a 3D space
@param space: 3D numpy array (environment)
@param w: width of the ellipsoid
@param l: length of the ellipsoid
@param h: height of the ellipsoid
@param direction: direction of the ellipsoid
@param center: center of the ellipsoid (z, y, x)"""
# # Create a rotation matrix from the direction vector
# enclosed_space = np.zeros((h, max(w, l), max(w, l)))
h += 1
direction = direction / np.linalg.norm(direction)
r = R.from_euler('z', np.arctan2(direction[1], direction[0]), degrees=False)
r = R.from_matrix(r.as_matrix()[::-1,::-1].T)
# Create a grid of points
z,y,x = np.meshgrid(np.arange(space.shape[0]), np.arange(space.shape[1]), np.arange(space.shape[2]), indexing='ij')
# Get the points relative to the center of the space
points = np.vstack((z.ravel(), y.ravel(), x.ravel())) - center.reshape(-1, 1)
# Rotate the points by the rotation matrix
points = r.apply(points.T).T
# Ellipsoid equation
inside = (points[0, :] / h) ** 2 + (points[1, :] / w) ** 2 + (points[2, :] / l) ** 2 < 0.98
# Cilinder equation
# inside = ((points[0, :] / h) ** 2 + (points[1, :] / w) ** 2 <= 1) * (abs(points[2, :] / l) <= 1)
# Set the value of the points inside the ellipsoid to 1
space.ravel()[inside] = 1
return space
def calculate_shell(voxel:np.ndarray) -> np.ndarray:
"""Remove the inner points of the geometry"""
dz, dy, dx = np.gradient(voxel)
grads = np.absolute(dz) + np.absolute(dy) + np.absolute(dx)
shell = grads * voxel
shell = np.where(shell != 0)
hull = np.array(shell[::-1]).transpose()
return hull
def calculate_mesh(pointcloud:np.ndarray, simplify_factor:int=32, resolution:float=0.02) -> tuple[np.ndarray]:
"""Calculate and simplify mesh by a given voxel. Returns vertices coordinates and a list of faces, each one given by a list of indexes of the vertices that make part of the face."""
basis = KDTree(pointcloud)
radius = resolution * (np.sqrt(3)+0.1)
neighbours_list = basis.query_ball_point(pointcloud, r=radius, p=2, return_length=False)
del basis
gradients = np.zeros((len(pointcloud), 3))
for i, neighbours in enumerate(neighbours_list):
if len(neighbours) >=27: continue
mean = np.mean(pointcloud[neighbours], axis=0)
gradients[i] = mean - pointcloud[i]
norm = np.linalg.norm(gradients[i])
gradients[i] = gradients[i] / norm if norm > 0 else np.zeros(3)
xyz = np.reshape(pointcloud[gradients.any(1)], (-1, 3))
normals = np.reshape(gradients[gradients.any(1)] * -1, (-1, 3))
del gradients, pointcloud
if len(normals) == 0: return np.zeros((0, 3)), np.zeros((0, 3))
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz)
pcd.normals = o3d.utility.Vector3dVector(normals)
mesh, densities = o3d.geometry.TriangleMesh.create_from_point_cloud_poisson(pcd, depth=9, scale=2, n_threads=1)
# vertices_to_remove = densities < np.quantile(densities, 0.01)
# mesh.remove_vertices_by_mask(vertices_to_remove)
# Simplify mesh
if simplify_factor > 0:
voxel_size = max(mesh.get_max_bound() - mesh.get_min_bound()) / simplify_factor
mesh = mesh.simplify_vertex_clustering(
voxel_size=voxel_size,
contraction=o3d.geometry.SimplificationContraction.Average)
vertices = np.asarray(mesh.vertices)
faces = np.asarray(mesh.triangles)
pcd.clear()
mesh.clear()
densities.clear()
return vertices, faces
def reconstruct_pointcloud_mesh(pointcloud:np.ndarray, simplify_factor:int=32) -> tuple[np.ndarray]:
xyz = np.reshape(pointcloud, (-1, 3))
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz)
pcd.estimate_normals()
last_normals = np.asarray(pcd.normals)
pcd.normals = o3d.utility.Vector3dVector(np.absolute(last_normals))
radii = [0.16, 0.32]
mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_ball_pivoting(
pcd, o3d.utility.DoubleVector(radii))
# Simplify mesh
if simplify_factor > 0:
voxel_size = max(mesh.get_max_bound() - mesh.get_min_bound()) / simplify_factor
mesh = mesh.simplify_vertex_clustering(
voxel_size=voxel_size,
contraction=o3d.geometry.SimplificationContraction.Average)
vertices = np.asarray(mesh.vertices)
faces = np.asarray(mesh.triangles)
pcd.clear()
mesh.clear()
return vertices, faces
def shift_array(array:np.ndarray, shift_vector:np.ndarray):
if any([shift > array.shape[i] for (i, shift) in enumerate(shift_vector)]): return np.zeros_like(array)
H, W, D = array.shape
dx, dy, dz = shift_vector
dx, dy, dz = int(dx), int(dy), int(dz)
augmented_volume = np.zeros((H+abs(dz), W+abs(dy), D+abs(dx)))
augmented_volume[max(0, dz):H+max(0, dz), max(0, dy):W+max(0, dy), max(0, dx):D+max(0, dx)] = array
shifted_array = augmented_volume[max(0, -dz):H+max(0, -dz), max(0, -dy):W+max(0, -dy), max(0, -dx):D+max(0, -dx)]
return shifted_array
def visualize(voxel:np.array) -> None:
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(projection='3d')
x, y, z = calculate_shell(voxel).transpose()
ax.scatter3D(x, y, z, c=z, cmap='Greens')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.show()
def width_model(temperature:float, feedrate:float, speed:float) -> float:
"""Returns the width of the filament in mm for a given temperature, speed and feedrate. Formulation is based on experimental values."""
root = np.cbrt(feedrate)
return (
-1.2411217231463934
+ 0.004062025031923957 * temperature
- 0.0001331731552701792 * speed
+ 2.984109335460154 * root
)
def height_model(area:float, width:float) -> float:
"""Returns the height of the filament in mm for a given area and width.
Calculation based on the area of the ellipse."""
return 2 * area / (np.pi * width / 2)
\ No newline at end of file
from __future__ import annotations
import bpy
from mathutils import Vector
import numpy as np
import logging
import itertools
from tqdm import tqdm
from tqdm.contrib.itertools import product
import concurrent.futures
from . import main
log = logging.getLogger('SmoPa3D')
class LaserLineScannerOperator(bpy.types.Operator):
bl_idname = "fdm_simulator.scan"
bl_label = "Simulate Laser Line Scanner"
def execute(self, context):
pcl = scan(draw=False, x_resolution=0.05, y_resolution=0.05, y_range=[40, 60])
np.save("virtual_scan.npy", pcl)
return {'FINISHED'}
def scan(
x_range: tuple[float, float] = (95.306, 97.306),
x_resolution: float = 0.02,
y_range: tuple[float, float] = (-10, 10),
y_resolution: float = 1,
angle_range: tuple[float, float] = (-0.243, 0.243),
z: float = 130,
draw: bool = False
) -> np.ndarray:
"""Simulate the laser line scanner. Returns the point cloud of the scan in the format `np.array([[x0, y0, z0], ..., [xn, yn, zn]])`.\n
Arguments:
@param x_range: The limits of the laser line scanner source in the x axis
@param x_resolution: The resolution of the x axis. Resolution of the lls is 0.003 ~ 0.05 mm according to the datasheet
@param y_range: The range of the y axis
@param y_resolution: The resolution of the y axis. Resolution of the encoder is
@param angle_range: The range of the angle of the lls. Default value assessed experimentally
@param z: The height of the lls. Default value assessed experimentally
@param draw: Whether to draw the beams and the point cloud in the scene
"""
bpy.context.view_layer.update()
width = 2 * z * np.tan(max(angle_range)) + abs(x_range[1] - x_range[0])
x_divisions = round(width / x_resolution)
x_range = np.linspace(*x_range, x_divisions)
y_range = np.arange(*y_range, y_resolution)
angle_range = np.linspace(*angle_range, x_divisions)
log.info("Casting beams")
beams = [Beam((x, y, z), (angle, 0, -1)) for (x, angle), y in product(zip(x_range, angle_range), y_range)]
with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(lambda beam: beam.cast(), beams)
if draw:
log.info("Drawing beams")
for beam in tqdm(Beam._instances):
beam.draw()
pointcloud = np.zeros((len(Beam._instances), 3))
i = 0
for beam in Beam._instances:
if beam.is_hit and beam.is_read:
pointcloud[i] = beam.hit_position
i += 1
pointcloud = pointcloud[:i]
del Beam._instances[:]
return pointcloud
class Beam:
"""Class for handling the beam of the laser line scanner."""
id_iter = itertools.count() # Iterator for assigning unique ids to beams
_instances:list[Beam] = [] # List of all instances of this class
def __init__(self, origin:tuple[float, float, float], direction:tuple[float, float, float]) -> None:
self.origin = origin
self.direction = direction
self.id = next(self.id_iter)
self.detector_offset = (0, -65, 0) # The offset of the detector from the origin of the beam
self.detector_position = Vector(self.origin) + Vector(self.detector_offset)
Beam._instances.append(self)
def __getitem__(self, id:int) -> Beam:
return self._instances[id]
def cast(self) -> tuple[bool, tuple[float, float, float]]:
"""Cast the beam and return the result."""
reading = bpy.context.scene.ray_cast(bpy.context.view_layer.depsgraph, Vector(self.origin), Vector(self.direction))
self.is_hit, self.hit_position, self.hit_normal, self.hit_index, self.hit_object, self.hit_matrix = reading
# Check if the beam can be seen by the detector
if self.is_hit:
detector_direction = self.detector_position - Vector(self.hit_position)
detector_direction.normalize()
reading = bpy.context.scene.ray_cast(bpy.context.view_layer.depsgraph, self.hit_position + Vector((0, 0, 0.001)), detector_direction)
self.is_read = not reading[0] # If the beam is blocked by an object
if not self.is_read:
self.detector_position = reading[1]
return reading
def draw(self) -> None:
"""Draw the path of the beam and the point where it hits."""
if not self.is_hit: return
color = green() if self.is_read else red()
if self.is_hit:
# Draw an icosphere where the beam hits
hit_pointer = reference_icosphere()
hit_pointer.location = self.hit_position
hit_pointer.active_material = color
main.group(hit_pointer, f"Laser beams/{self.id}")
# Draw the beam path
emission_beam = main.add_bezier(self.origin, self.hit_position)
emission_curve = emission_beam.data
emission_curve.dimensions = '3D'
emission_curve.bevel_depth = 0.02
emission_curve.bevel_resolution = 3
emission_curve.materials.append(color)
emission_curve.name = "Beam path"
main.group(emission_beam, f"Laser beams/{self.id}")
reading_beam = main.add_bezier(self.hit_position, self.detector_position)
reading_curve = reading_beam.data
reading_curve.dimensions = '3D'
reading_curve.bevel_depth = 0.02
reading_curve.bevel_resolution = 3
reading_curve.materials.append(color)
reading_curve.name = "Beam path"
main.group(reading_beam, f"Laser beams/{self.id}")
def reference_icosphere() -> bpy.types.Object:
"""Create a reference icosphere and return it."""
if "Reference Icosphere" in bpy.data.objects:
icosphere = bpy.data.objects["Reference Icosphere"]
icosphere = icosphere.copy()
return icosphere
bpy.ops.mesh.primitive_ico_sphere_add(radius=0.02, enter_editmode=False, location=(0, 0, 0))
icosphere = bpy.context.object
icosphere.data.name = "Reference Icosphere"
try:
bpy.context.scene.collection.children["Collection"].objects.unlink(icosphere)
except:
pass
icosphere = icosphere.copy()
return icosphere
def red():
material = bpy.data.materials.new("Red")
material.diffuse_color = (1, 0, 0, 1)
return material
def green():
material = bpy.data.materials.new("Green")
material.diffuse_color = (0, 1, 0, 1)
return material
\ No newline at end of file
import os
import open3d as o3d
import numpy as np
from scipy.spatial import KDTree
from scipy.spatial.transform import Rotation as R
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.contrib.itertools import product
sns.set()
from .pointcloud_processing.process_pointclouds import layerize_pointclouds
from .pointcloud_processing.calibration import prepare_dataset, execute_global_registration, draw_registration_result
def get_samples(translation:np.ndarray=np.array((0, 0, 0)), rotation:float=30, plot: bool = True):
square = np.zeros((10, 10))
square[3:7, 3:7] = 1
straight = np.array(np.where(square == 1))
straight = np.vstack((straight, np.zeros((1, len(straight[0])))))
rotation_matrix = R.as_matrix(R.from_euler('z', rotation, degrees=True))
transformed = (rotation_matrix @ straight)
transformed = transformed + translation.reshape(3, 1)
if plot:
plot_squares(straight, transformed)
return straight.T, transformed.T
def get_one_missplaced(deviation:np.ndarray=np.array([-1, 0, 0]), deviation_index:tuple[float, float, float]=[0, 0, 0], plot:bool=True):
straight, transformed = get_samples(rotation=0, plot=False)
transformed[deviation_index] += deviation
if plot:
plot_squares(straight, transformed)
return straight, transformed
def plot_squares(pcd1:np.ndarray, pcd2:np.ndarray):
plt.plot(pcd1[:, 0], pcd1[:, 1], 'o')
plt.plot(pcd2[:, 0], pcd2[:, 1], 'o')
plt.legend(['straight', 'transformed'])
def plot3d(pcls:list[np.ndarray], alpha:float=0.4, size_multiplier:float=1, figsize:tuple[int]=(10, 10)) -> plt.Figure:
"""Plots a list of point clouds in 3D"""
fig = plt.figure(figsize=figsize)
fig.tight_layout()
ax = fig.add_subplot(projection='3d')
if not isinstance(pcls, list):
pcls = [pcls]
try:
s = size_multiplier * 0.06/(pcls[0][:, 0].max()-pcls[0][:, 0].min())
except:
s = size_multiplier * 0.06
for pcl in pcls:
if len(pcl) == 0:
continue
x, y, z = pcl[:, 0], pcl[:, 1], pcl[:, 2]
ax.set_box_aspect((np.ptp(x), np.ptp(y), np.ptp(z)))
ax.scatter(x, y, z, s=s, marker='o', alpha=alpha)
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
return fig
def RMSE(reference:np.ndarray, target:np.ndarray, workers:int=-1) -> float:
octree = KDTree(reference)
distances, indices = octree.query(target, k=1, p=2, workers=workers)
rmse = np.sqrt(np.mean(distances**2))
return rmse
def calculate_accuracy(reference:np.ndarray, target:np.ndarray, threshold:float, workers:int=-1) -> tuple[float, np.ndarray]:
"""Calculates the accuracy of the target point cloud compared to the reference point cloud.
If reference and target are swept, metric is called 'completeness'.
Returns the accuracy and the indices of the inaccurate points, i.e. the points of target that do not match any of the reference."""
octree = KDTree(reference)
distances, indices = octree.query(target, k=1, distance_upper_bound=threshold, p=2, workers=workers)
accuracy = np.array(distances <= threshold, dtype=int).sum() / len(distances)
return accuracy, np.array(range(len(target)))[distances > threshold]
def f1_score(reference:np.ndarray, target:np.ndarray, threshold:float, workers:int=-1, plot_results:bool=False) -> tuple[float, np.ndarray, np.ndarray]:
"""Calculates the F1 score between two pointclouds. Reference and target are interchangeable.
Returns the score, the indices of the inaccurate points of the reference and the indices of the uncomplete points of the target."""
accuracy, inaccurate_ids = calculate_accuracy(reference, target, threshold, workers=workers)
completeness, uncomplete_ids = calculate_accuracy(target, reference, threshold, workers=workers)
score = 2 * (accuracy * completeness) / (accuracy + completeness)
if plot_results:
inaccurate = target[inaccurate_ids]
accurate = target[np.setdiff1d(np.arange(len(target)), inaccurate_ids)]
uncomplete = reference[uncomplete_ids]
complete = reference[np.setdiff1d(np.arange(len(reference)), uncomplete_ids)]
matched = np.concatenate([accurate, complete], axis=0)
plot3d([matched, inaccurate, uncomplete])
return score, inaccurate_ids, uncomplete_ids
def EMD(reference:np.ndarray, target:np.ndarray, workers:int=-1) -> float:
octree = KDTree(target)
distances, indices = octree.query(reference, k=1, p=2, workers=workers)
return distances.sum()
def average_squared_distance(reference:np.ndarray, target:np.ndarray, neighbours:int=1, workers:int=-1) -> np.ndarray:
"""Calculates the average squared distance to neighbours of all target's points to the reference"""
octree = KDTree(reference)
distances, indices = octree.query(target, k=neighbours, p=2, workers=workers)
return distances**2 / neighbours
def k_chamf(reference:np.ndarray, target:np.ndarray, neighbours:int=1, workers:float=-1) -> float:
"""Calculates the k-Nearest Chamfer distance between two point clouds. Reference and target are interchangeable."""
first_term = average_squared_distance(reference, target, neighbours, workers=workers)
first_term = np.mean(first_term)
second_term = average_squared_distance(target, reference, neighbours, workers=workers)
second_term = np.mean(second_term)
return first_term + second_term
def load_pointcloud(path:str, bed_path:str, roi:list[list[float]]=[[None, None], [None, None], [None, None]], intensity_threshold:int=60, plot:bool=True) -> np.ndarray:
"""Load the pointcloud from npy file, and layerize it by comparing with its bed.
@param path: the path to the npy file of the measurement point cloud
@param bed: the path to the npy file of the bed point cloud
@param roi: region that will be used. The rest of the part is cut off. Leave it as `None` not to define a limit.
Format: [[x0, x1], [y0, y1], [z0, z1]]
@param intensity_threshold: the threshold of the intensity of the point cloud. Points with intensity below this threshold will be removed.
@param plot: plot the point cloud using matplotlib if requested"""
pcl = np.load(path)
bed = np.load(bed_path)[:, :3]
pcl = pcl[pcl[:, 3] > intensity_threshold]
pcl = pcl[:, :3]
_, pcl = layerize_pointclouds([bed, pcl]) # Layerize pointclouds
pcl[:, 2] = -pcl[:, 2] + max(pcl[:, 2]) # Invert z axis
pcl = pcl[pcl[:, 2] < 100] # Removing outliers
for i in range(3):
if roi[i][0]is not None: pcl = pcl[:, i] > roi[i][0]
if roi[i][1]is not None: pcl = pcl[:, i] < roi[i][1]
if plot:
plot3d(pcl)
return pcl
def icp(source:np.ndarray, target:np.ndarray, voxel_size:float=0.5) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Performs Iteractive Closest Point registration on two point clouds.
The source point cloud is transformed to match the target point cloud.
Returns the transformed source point cloud, the target point cloud and the transformation matrix.
"""
threshold = voxel_size * 0.4
pcd1 = o3d.geometry.PointCloud()
pcd1.points = o3d.utility.Vector3dVector(np.reshape(source, (-1, 3)))
pcd2 = o3d.geometry.PointCloud()
pcd2.points = o3d.utility.Vector3dVector(np.reshape(target, (-1, 3)))
source_pcl, target_pcl, source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(voxel_size, source=pcd1, target=pcd2)
result_ransac = execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size)
result_icp = o3d.pipelines.registration.registration_icp(
source_pcl, target_pcl, threshold, result_ransac.transformation,
o3d.pipelines.registration.TransformationEstimationPointToPoint(),
o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=5000))
transformation = result_icp.transformation
return apply_transformation(source, transformation), target, transformation
def apply_transformation(source:np.ndarray, transformation:np.ndarray) -> np.ndarray:
"""Applies a transformation matrix to a point cloud"""
source = np.hstack((source, np.ones((len(source), 1))))
source = transformation @ source.T
source = source.T
return source[:, :3]
def remove_noise(pointcloud:np.ndarray, threshold:float=0.5, workers:int=-1) -> np.ndarray:
"""Removes noise from a point cloud. Returns the pointcloud without noise and log the deletion ratio, i.e. the percentage of points that were removed."""
basis = KDTree(pointcloud)
distances, indices = basis.query(pointcloud, k=[2], p=2, distance_upper_bound=threshold, workers=workers)
not_noise = distances[:, 0] < threshold
remove_ratio = 1-not_noise.sum()/len(pointcloud)
print(f"Removed {remove_ratio*100:.2f}% of the pointcloud")
return pointcloud[not_noise]
def load_dataset(path="../data/metrics/measurements") -> list[tuple[str, str]]:
files = []
for filename in os.listdir(path):
if not filename.endswith(".npy") or filename.startswith("bed") or int(filename.split(".")[0][-1]) > 1:
continue
run = filename.split("_")[1]
bed = os.path.join(path, f"bed_before_printing_{run}_0.npy")
files.append((bed, os.path.join(path, filename)))
return files
def evaluate_metrics(dataset:list[tuple[str, str]], results_path:str="../data/metrics/results") -> None:
"""Evaluates the metrics on a dataset of point clouds."""
os.makedirs(results_path, exist_ok=True)
with open(os.path.join(results_path, "results.csv"), "w") as f:
f.write("target,source,rmse,f1,emd,chamfer\n")
for s, t in product(dataset, dataset):
s_file = os.path.split(s[1])[-1]
t_file = os.path.split(t[1])[-1]
if s == t:
continue
elif 'printing_0' in s_file or 'printing_0' in t_file: # Ignore the first measurement
continue
elif 'measurement' not in s_file and 'measurement' not in t_file: # Only compare scenarios to as-is
continue
source = load_pointcloud(s[1], s[0], plot=False)
target = load_pointcloud(t[1], t[0], plot=False)
source = remove_noise(source, 0.2)
target = remove_noise(target, 0.2)
source, _, _ = icp(source, target)
rmse = RMSE(source, target)
f1, _, _ = f1_score(source, target, 0.15, plot_results=False)
emd = EMD(source, target)
chamfer = k_chamf(source, target)
with open(os.path.join(results_path, "results.csv"), "a") as f:
f.write(",".join((
os.path.split(t[1])[-1],
os.path.split(s[1])[-1],
str(round(rmse, 4)),
str(round(f1, 4)),
str(round(emd, 4)),
str(round(chamfer, 4))
)))
f.write("\n")
del source, target
\ No newline at end of file
import os
import pickle
import numpy as np
from scipy.spatial import KDTree
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor as Pool
import logging
log = logging.getLogger("SmoPa3D")
from .gcode.parser import Gcode
from .node import Node, Environment
from .command import Command, Layer, calculate_command_mesh
from .decorators import runtime, track
class Network:
"""Class to create and manage Nodes from the given gcode
@param gcode_path: path to the gcode file
@param resolution: the concentration of the pointcloud per mm
@param node_size: the lenght, in mm, of the pointcloud around the node
@param filament_thickness: the diameter, in mm, of the filament used in the printing simulation
"""
def __init__(self, gcode_path:str, resolution:float, node_size:float, node_distance:float=0.2, extrusion_multiplier:float=1, saving_path:str='data/simulation/pickle') -> None:
log.info('Creating network...')
self.saving_path = saving_path
os.makedirs(self.saving_path, exist_ok=True)
self.gcode = Gcode(path=gcode_path)
self.env = Environment(resolution=resolution, node_size=node_size)
self.node_distance = node_distance
self.extrusion_multiplier = extrusion_multiplier
self.nodes:list[Node] = []
self.commands:dict[int, Command] = {}
self.layers:dict[float, Layer] = {}
self.coords:list[tuple[float, float, float]] = []
# Get layer height from gcode
self.layer_height = None
for command in self.gcode.gcode[:self.gcode.setup_end]:
if ';Layer height: ' in command:
self.layer_height = float(command.split(';Layer height: ')[1])
break
if self.layer_height is None:
log.warning('Layer height not found in gcode. Using 0.2 mm as default')
self.layer_height = 0.2
self.create_network()
@runtime
def create_network(self) -> None:
"""Create the nodes from the gcode commands"""
empty_commands = 0
for (command_id, gcode_command) in enumerate(self.gcode.commands):
if gcode_command.m == 109: # Set extruder temperature
self.temperature = gcode_command.s
if not gcode_command.is_transformable: continue # Only considers commands that extrude filament
# Create command and nodes
command = Command(self, gcode_command, id=command_id)
if command.qtd_nodes == 0:
if empty_commands < 15: log.warning(f'Command {command.id} has no nodes')
empty_commands += 1
continue
self.tree = KDTree(self.coords)
log.warning(f'{empty_commands} ({empty_commands/len(self.commands)*100:.2f}%) commands have no nodes, and will not be simulated')
def get_layer(self, z:float) -> Layer:
"""Get the layer of the given z coordinate"""
if z not in self.layers:
self.layers[z] = Layer(self, z)
return self.layers[z]
@runtime
def simulate_printer(self, node_limit:int=-1, workers:int=-1, optmize_memory:bool=True) -> None:
"""Run the calculations of each node
@param node_limit: limit the number of nodes to be calculated. If -1, all nodes will be calculated
@param workers: number of processes to be used in the calculation. If -1, all available cores will be used
@param optmize_memory: if True, the nodes will be saved after each iteration, and the previous layers will be deleted from memory"""
log.info('Simulating printer...')
current_layer = self.nodes[0].layer
for (i, node) in enumerate(tqdm(self.nodes[:node_limit])):
obstacles = self.calculate_obstacles(node, workers=workers)
node.iterate_volume(obstacles)
node.save()
if node.layer != current_layer:
if optmize_memory:
threshold_in_use = node.layer.z - self.layer_height * 3 # Delete layers that are not in the search radius of the calculate_obstacles function
layers_to_wipe = [layer for layer in self.layers.values() if layer.z < threshold_in_use]
for layer in layers_to_wipe:
layer.wipe_memory()
current_layer = node.layer
@track
def calculate_obstacles(self, node:Node, workers:int) -> np.ndarray:
search_radius = self.env.node_size / np.sqrt(3)
query = self.tree.query_ball_point(node.coord, search_radius, p=2, workers=workers)
# Restrict teh search in the z axis to range between the node and 2 layers below
neighbours = [self.nodes[x] for x in query if self.nodes[x].z >= node.z - 2 * self.layer_height and self.nodes[x].z <= node.z]
return node.obstacles(neighbour_nodes=neighbours) # TODO: processes = workers
def calculate_meshes(self, processes:int=None) -> None:
"""Calculate the meshes of each command. Pass processes as 0 not to use multiprocessing"""
log.info('Calculating meshes...')
if processes == 0:
for command in tqdm(self.commands.values()):
command.vertices_filepath, command.faces_filepath = calculate_command_mesh(command)
else:
with Pool(processes) as pool:
meshes = list(tqdm(pool.map(calculate_command_mesh, (list(self.commands.values()))), total=len(self.commands)))
log.info('Assigning meshes to commands...')
for command, (vertices_path, faces_path) in zip(self.commands.values(), meshes):
command.vertices_filepath = vertices_path
command.faces_filepath = faces_path
def save(self, filename:str="network") -> None:
"""Save the network in a pickle file"""
if filename[-4:] != ".pkl": filename += ".pkl"
try:
with open(os.path.join(self.saving_path, filename), "wb") as fle:
pickle.dump(self, fle)
log.info(f'Network saved in {os.path.join(self.saving_path, filename)}')
except Exception as e:
log.warning(f'Could not save network due to {e}')
def load_network(path:str="data/simulation/pickle/network.pkl") -> Network:
"""Load the network from a pickle file"""
with open(path, "rb") as fle:
return pickle.load(fle)
if __name__ == '__main__':
net = Network('test/sample.gcode', 0.01, 2)
net.simulate_printer()
\ No newline at end of file
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .network import Network
from .command import Command, Layer
from dataclasses import dataclass
import os
import numpy as np
import logging
from concurrent.futures import ThreadPoolExecutor as Pool
from functools import partial
from . import geometry as geo
log = logging.getLogger('SmoPa3D')
@dataclass
class Environment:
resolution: float # Size, in mm, of the edge of the cubic grid unit
node_size: float # Size, in mm, of the edge of the cubic volume around the node
# Node grid size must be uneven
def __post_init__(self) -> None:
# Create the volume
self.node_grid_size = round(self.node_size / self.resolution)
if self.node_grid_size % 2 == 0: self.node_grid_size += 1 # Node grid size must be uneven
self.volume = np.zeros((self.node_grid_size, self.node_grid_size, self.node_grid_size))
self.calculate_nozzle()
def calculate_nozzle(self, D:float=0.4, angle:float=0.716, height:float=2.5) -> np.array:
"""Create the 3D model of the nozzle in the environment grid.
----
Arguments
----
@param D: Diameter of nozzle, in mm
@param angle: angle of the trunk of cone of the nozzle, in rad"""
z0 = self.node_grid_size // 2 # Tip of the nozzle is always the center of the volume
height = height / self.resolution
radius = D / (2 * self.resolution)
B = 1 / np.tan(angle)
A = radius - z0 * B
final_diameter = A + (height + z0) * B
def nozzle_2d(z:int) -> float:
"""Build the nozzle in 2D.
@param z: coordinate in grid units"""
if z < z0:
return 0
elif z < z0 + height:
return A + z * B
else:
return final_diameter
revolutionized = geo.apply_revolution(nozzle_2d, self.node_grid_size, self.node_grid_size).astype(int)
self.nozzle = np.where(revolutionized > 0)
self.nozzle = np.array(self.nozzle[::-1]).transpose()
return self.nozzle
def bed(self, node_z:float) -> np.array:
"""Create the 3D model of the printing bed in the environment grid placing the bed according to the
position in z of the node.
----
Arguments
----
@param node_z: position, in mm, between the bottom of the nozzle in the node (that corresponds to the
top of the printed filament)"""
z0 = self.node_grid_size//2 - 1 - round(node_z/self.resolution)
bed = np.zeros_like(self.volume)
if z0 > 0:
bed[:z0] = 1
return bed
class Node:
def __init__(self, network:Network, command:Command, layer:Layer, x:float, y:float, z:float, filament_volume:float) -> None:
"""@param index: index of the node in the network
@param x: Coordinate position of the center of the node
@param y: Coordinate position of the center of the node
@param z: Coordinate position of the center of the node
@param filament_volume: Volume, in mm^3, of the filament deposited in the node"""
self.network = network
self.command = command
self.x = x
self.y = y
self.z = z
self.filament_volume = filament_volume
self._placed_filament = None # Voxel reprensenting the volume occupied by the node. It is None until iterate_volume() is computed
self._pointcloud = None # Pointcloud of the filament, in (x, y, z) coordinates. It is None until iterate_volume() is computed
self.active:bool = True # Set as False when the node is saved and wiped from memory
self.simulated:bool = False # Set as True when the node is simulated
# Update Network to include this node
self.network.nodes.append(self)
self.index = len(self.network.nodes) - 1
self.network.coords.append((self.x, self.y, self.z))
# Update Command to include this node
self.command.nodes.append(self)
# Update Layer to include this node
self.layer = layer
self.layer.nodes.append(self)
@property
def coord(self) -> np.ndarray:
"""(x, y, z)"""
return np.array((self.x, self.y, self.z))
def __repr__(self) -> str:
return f"({self.x}, {self.y}, {self.z})"
def save(self) -> None:
"""Save the node data in npy files"""
if self.active:
self.saving_path = os.path.join(self.network.saving_path, 'nodes', f'{self.index}')
os.makedirs(self.saving_path, exist_ok=True)
np.save(os.path.join(self.saving_path, 'placed_filament.npy'), self.placed_filament)
np.save(os.path.join(self.saving_path, 'pointcloud.npy'), self.pointcloud)
else:
log.warn('Node is already inactive. Cannot save it.')
def load(self) -> None:
"""Load the node from the npy files"""
if self.saving_path is None:
log.warning('Node was not saved before or saving path could not be found. Cannot load it.')
return
self._placed_filament = np.load(os.path.join(self.saving_path, 'placed_filament.npy'), allow_pickle=True)
self._pointcloud = np.load(os.path.join(self.saving_path, 'pointcloud.npy'), allow_pickle=True)
def wipe(self) -> None:
"""Delete the node from the memory, but keeps it in the network to be loaded back again if necessary"""
if not self.active: return
del self._placed_filament
del self._pointcloud
self.active = False
@property
def placed_filament(self) -> np.ndarray:
"""Voxel reprensenting the volume occupied by the node. If node is inactive, load it from the npy files"""
if not self.active:
self.load()
return self._placed_filament
@property
def pointcloud(self) -> np.ndarray:
"""Pointcloud of the filament, in (x, y, z) coordinates. If node is inactive, load it from the npy files"""
if not self.active:
self.load()
return self._pointcloud
def backpropagate_feedrate(self, volume:float=None, constant:float=0.0025726112346777796, feedrate_multiplier:float=1.812969202377806) -> float:
"""Calculate the feedrate that would be necessary to deposit the given volume of filament
@param volume: filament volume, in mm^3
@param constant: constant value to calculate the area of the profile. Value retrieved experimentally
@param feedrate_multiplier: multiplier value to calculate the area of the profile. Value retrieved experimentally"""
area = volume / self.command.trajectory_length * self.command.qtd_nodes / self.network.extrusion_multiplier
return (area - constant) / feedrate_multiplier
# return volume / (self.command.trajectory_length / self.command.qtd_nodes * np.pi * 1.75 ** 2 / 4 * self.network.extrusion_multiplier)
def draw_revolutionized_profile(self, ground:int, volume:float=None) -> np.ndarray:
"""Draw a drop of the profile with the given volume.
@param ground: z coordinate of the ground level
@param volume: filament volume, in mm^3"""
if volume is None:
volume = self.filament_volume
volume_multiplier = volume / self.filament_volume
self.applied_feedrate = self.backpropagate_feedrate(volume)
self.width = geo.width_model(self.network.temperature, self.applied_feedrate, self.command.speed)
self.length = 1.5 * np.cbrt(volume_multiplier) * self.command.trajectory_length / self.command.qtd_nodes
self.height = 6 * volume / (np.pi * self.width * self.length)
h = round(self.height / self.network.env.resolution / 2)
w = round(self.width / self.network.env.resolution / 2)
l = round(self.length / self.network.env.resolution / 2)
grid = self.network.env.volume.copy()
if any(np.array(grid.shape) < np.array((w, l, h))):
log.warning(f'Node size is too small for node {self.index}. It was cut to fit in the simulation.')
center = np.array(grid.shape)//2 - 1
# if self.height > self.network.layer_height:
# center[0] = ground + (center[0] - ground)//2
# else:
# center[0] = ground + h
center[0] = center[0] - round(self.network.layer_height / self.network.env.resolution / 2)
geo.place_ellipsoid(grid, w, l, h, self.command.end - self.command.start, center=center)
return grid
def iterate_volume(self, obstacles:np.ndarray, increment_offset:float=0.1, precision:float=0.1, max_iterations:int=30) -> np.ndarray:
"""Calculates the volume that results from the intersection between the deposited filament of the node
and the environment given by obstacles. Then expands the volume by the interference plus an increment_offset
and calculates the volume interference again, until the interference volume reaches a value as low as the
precision, or it gets to the max iterations.
@param obstacles: an array representing the interacting environment, given by the obstacles method
@param increment_offset: the extra volume, in percentage, that is increased during each iteration
to get faster to the result
@param precision: the acceptable error, in percentage, between the nominal_volume and the actual volume got from the iterations
@param max_iterations: the maximum size of the loop that is conducted to get to the final volume"""
precision = self.filament_volume * precision # Convert the units to mm^3
virtual_volume = self.filament_volume # In the end of the iterations: virtual volume = self.filament_volume + intersection_volume
# ground_level = np.where(obstacles[:obstacles.shape[0]//2, obstacles.shape[1]//2, obstacles.shape[2]//2])[0].max() # Get the ground level of the obstacles
for it in range(max_iterations):
if virtual_volume < 0:
log.warning(f'Negative filament volume in node {self.index}. It is ignored in the simulation.')
virtual_volume = 0
return None
drawn_profile = self.draw_revolutionized_profile(ground=0, volume=virtual_volume)
molded_profile = 1*((drawn_profile - obstacles) > 0)
molded_volume = molded_profile.sum() * self.network.env.resolution**3
diff = self.filament_volume - molded_volume
if abs(diff) <= precision:
break
else:
virtual_volume += diff * (1 + increment_offset)
if it == max_iterations - 1:
log.warning(f'Node {self.index} did not converge. Difference of {diff*10**6:.0f} μm^3 ({(diff/self.filament_volume)*100:.2f}% difference) between nominal and actual volume.')
self._placed_filament = molded_profile.astype(int)
self._pointcloud = np.where(self._placed_filament > 0)
self._pointcloud = np.array(self._pointcloud[::-1]).transpose()
self.simulated = True
return self._placed_filament
def obstacles(self, neighbour_nodes:list[Node], processes:int=None) -> np.ndarray:
"""Places all obstacles in the grid, including the nozzle, the bed (if applicable) and the adjacent nodes that already have
their volumes calculated.
@param neighbour_nodes: list of Node objects that must be considered in the surroundings of the node"""
if processes is not None and processes == 0:
neighbours_pts = np.empty((0, 3))
for nbr in neighbour_nodes:
neighbours_pts = np.concatenate([neighbours_pts, place_obstacle(self, nbr)])
else:
with Pool(max_workers=processes) as pool:
self_place_obstacle = partial(place_obstacle, self)
neighbours_pts = np.concatenate(list(pool.map(self_place_obstacle, neighbour_nodes)))
filter_in_boundaries = np.all((neighbours_pts >= 0) & (neighbours_pts < self.network.env.node_grid_size), axis=1)
neighbours = assign_values(self.network.env.volume.copy(), neighbours_pts[filter_in_boundaries])
return (self.network.env.bed(self.z) + neighbours) > 0
def place_obstacle(main_node:Node, obstacle_node:Node) -> np.ndarray:
neighbours_pts = np.empty((0, 3))
shift_vector = np.around((obstacle_node.coord - main_node.coord)/main_node.network.env.resolution)
if any(np.abs(shift_vector) > main_node.network.env.node_grid_size): return np.empty((0, 3)) # Ignore nodes that are too far away (more than the node grid size)
if obstacle_node.z == main_node.z and (obstacle_node in main_node.command.nodes or obstacle_node.placed_filament is None): # Add the nozzle of the next and last nodes even if they have not been placed yet
new_neighbour = obstacle_node.network.env.nozzle + shift_vector
neighbours_pts = np.concatenate([neighbours_pts, new_neighbour])
if not obstacle_node.placed_filament is None: # if the filament has been placed, add it to the neighbours_pts
new_neighbour = obstacle_node.pointcloud + shift_vector
neighbours_pts = np.concatenate([neighbours_pts, new_neighbour])
return neighbours_pts
def assign_values(volume, neighbours_pts) -> np.ndarray:
"""Assigns 1 to the neighbours_pts in the volume array.\n
Works exactly as `volume[neighbours_pts] = 1` but more efficient."""
ravel = np.ravel_multi_index(neighbours_pts[:, [2, 1, 0]].astype(int).T, volume.shape)
np.put(volume, ravel, 1)
return volume
def join_nodes(main_node:Node, obstacle_node:Node):
"""Join the pointclouds of two nodes, placing the obstacle_node in the position relative to the main_node"""
if obstacle_node.placed_filament is None: return np.empty((0, 3))
shift_vector = np.around((obstacle_node.coord - main_node.coord)/main_node.network.env.resolution)
new_neighbour = obstacle_node.pointcloud + shift_vector
return new_neighbour
\ No newline at end of file
import random
import numpy as np
from .utils import running_in_blender
if running_in_blender:
import bpy
import bmesh
class PointCloud:
"""Class for handling pointclouds."""
def __init__(self, path:str) -> None:
self.pcl:np.ndarray = np.load(path)
self.correct_z_axis()
def correct_z_axis(self) -> float:
"""Invert z axis, then get the height of the bed from the pointcloud and move pointcloud so that the bed is at z=0."""
self.pcl[:, 2] = -self.pcl[:, 2]
z_values = self.pcl[:, 2].copy()
z_values.sort()
mode = z_values[int(len(z_values)*0.4):int(len(z_values)*0.6)]
bed_z = mode.mean() - 5* mode.std()
self.pcl[:, 2] = self.pcl[:, 2] - bed_z
return bed_z
def crop_ROI(self, x:tuple[float, float]=(None, None), y:tuple[float, float]=(None, None), z:tuple[float, float]=(None, None)) -> np.ndarray:
"""Crop the pointcloud to the region of interest."""
if x[0] is not None: self.pcl = self.pcl[self.pcl[:, 0] > x[0]]
if x[1] is not None: self.pcl = self.pcl[self.pcl[:, 0] < x[1]]
if y[0] is not None: self.pcl = self.pcl[self.pcl[:, 1] > y[0]]
if y[1] is not None: self.pcl = self.pcl[self.pcl[:, 1] < y[1]]
if z[0] is not None: self.pcl = self.pcl[self.pcl[:, 2] > z[0]]
if z[1] is not None: self.pcl = self.pcl[self.pcl[:, 2] < z[1]]
return self.pcl
def downsample(self, select_rate:float=0.05) -> np.ndarray:
# Decrease amount of points
sample = random.sample(range(len(self.pcl)), int(len(self.pcl) * select_rate))
self.pcl = self.pcl[sample]
return self.pcl
def place_pointcloud(self, name:str, parent:bpy.types.Object, move:tuple[float, float, float]=None) -> bpy.types.Object:
"""Plot pointcloud given by x, y and z data in blender as points"""
mesh_data = bpy.data.meshes.new(name)
bm = bmesh.new()
for v in self.pcl[:, :3]:
bm.verts.new(v)
bm.to_mesh(mesh_data)
mesh_obj = bpy.data.objects.new(mesh_data.name, mesh_data)
parent.parent = mesh_obj
parent.parent_type = 'OBJECT'
# Move pointcloud
if move is not None:
mesh_obj.location = move
# Visuals
mesh_obj.display.show_shadows = False
mesh_obj.show_all_edges = False
mesh_obj.show_instancer_for_viewport = False
return mesh_obj
\ No newline at end of file
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment