Skip to content
Snippets Groups Projects
Commit db812499 authored by Matthias Stefan Bodenbenner's avatar Matthias Stefan Bodenbenner
Browse files

Merge branch 'master' into develop

parents 3b919632 5b29389a
No related branches found
No related tags found
No related merge requests found
Pipeline #60121 passed
Showing
with 799 additions and 16 deletions
# Python Unified Device Interface
Current stable version: 5.2.5
Current stable version: 6.0.2
## Installation
1. Install the *WZL-Utilities* dependency via pip
......@@ -34,14 +34,6 @@ Setup your server as follows:
sensors = [Sensor(macs[i]) for i in range(3)]
implementation = Environment(sensors)
# configure MQTT publisher
mqtt = MQTTPublisher(MQTT_USER)
mqtt.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD)
# configure message publishing
message_schedule = docstring_parser.parse_docstrings_for_mqtt(implementation)
message_scheduler = MessageScheduluer(loop, message_schedule, [mqtt])
# configure HTTP server
mapping = src.test.devices.mapping.mapping(implementation)
model = Component.load('./src/test/devices/environmental_sensor/Environment.json', mapping)
......@@ -61,10 +53,19 @@ Setup your server as follows:
2. If you are not using MQTT or HTTP leave out the respective lines.
## Recent changes
5.2.5
6.0.2 | 5.2.5
- bug fix
- fixed parsing of parameters and variables/ measurements of type "time" for higher dimensions
6.0.1
- bug fix
- fixed parsing of parameters and measurements of type "time" for higher dimensions
6.0.0
- renamed Object to Component and Variable to Measurement. UUID now starts with COM MEA respectively
- marked Object and Variable as deprecated
- marked docstring parsing as deprecated due to its error-prone behaviour
5.2.4
- bug fix
- variables and parameters of type 'enum' and 'time' are now returned correctly
......
from setuptools import setup, find_packages
setup(name='wzl-udi',
version='5.2.5',
version='6.0.2',
url='',
author='Matthias Bodenbenner',
author_email='m.bodenbenner@wzl.rwth-aachen.de',
......
......@@ -7,7 +7,7 @@ import datetime
import nest_asyncio
import strict_rfc3339 as rfc3339
import time
from typing import Any, List, Callable
from typing import Any, List, Callable, Union
from wzl.utilities import root_logger
nest_asyncio.apply()
......@@ -46,13 +46,14 @@ class Figure(Element, ABC):
if getter is not None and not callable(getter):
raise TypeError("{}: The getter of the Figure must be callable!".format(uuid))
self._datatype = datatype
if datatype == 'time' and value is not None and value != "":
self._dimension = dimension
self._range = range
if datatype == 'time':
self._value = parse_time(value)
else:
self._value = value
self._dimension = dimension
self._getter = getter
self._range = range
@property
def datatype(self):
......
{
"components": [
{
"components": [
{
"components": [],
"functions": [
{
"arguments": [
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [],
"value": 0,
"unit": "C81",
"uuid": "ARG-Azimuth",
"name": "Azimuth",
"description": "Azimuth angle the laser tracker head is jogged."
},
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [],
"value": 0,
"unit": "C81",
"uuid": "ARG-Elevation",
"name": "Elevation",
"description": "Elevation angle the laser tracker head is jogged."
}
],
"returns": [],
"uuid": "FUN-Jog",
"name": "Jog",
"description": "Jogs the tracker head by the given angles for the azimuth and elevation."
},
{
"arguments": [
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [
3
],
"value": [
0,
0,
0
],
"unit": "MTR",
"uuid": "ARG-Position",
"name": "Position",
"description": "Position to which the laser tracker should point."
}
],
"returns": [],
"uuid": "FUN-PointTo",
"name": "PointTo",
"description": "Moves the tracker head so that the laser points to the specified position."
}
],
"measurements": [
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [
3
],
"value": [
0,
0,
0
],
"unit": "MTR",
"uuid": "MEA-Position",
"name": "Position",
"description": "Most recently dispatched measured position."
},
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [
4
],
"value": [
0,
0,
0,
0
],
"unit": "NONE",
"uuid": "MEA-Quaternion",
"name": "Quaternion",
"description": "Most recently dispatched measured orientation as quaternion, if available."
},
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [],
"value": 0,
"unit": "C81",
"uuid": "MEA-Azimuth",
"name": "Azimuth",
"description": "Current position of azimuth rotation encoder in Radian."
},
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [],
"value": 0,
"unit": "C81",
"uuid": "MEA-Elevation",
"name": "Elevation",
"description": "Current position of elevation rotation encoder."
},
{
"range": [
null,
null
],
"datatype": "double",
"dimension": [],
"value": 0,
"unit": "MTR",
"uuid": "MEA-Distance",
"name": "Distance",
"description": "Measured distance to the currently activate target."
},
{
"range": [
"False",
"True"
],
"datatype": "bool",
"dimension": [
2,
3
],
"value": [
[
true,
true,
true
],
[
true,
true,
true
]
],
"unit": "NONE",
"uuid": "MEA-Online",
"name": "Online",
"description": "Some description..."
}
],
"parameters": [
{
"constant": true,
"range": [
"OK",
"WARNING",
"ERROR",
"MAINTENANCE"
],
"datatype": "enum",
"dimension": [],
"value": "ERROR",
"unit": "NONE",
"uuid": "PAR-State",
"name": "State",
"description": "Reflects the current state of the target. If logged in: OK. If not stable: WARNING. If lost: ERROR."
},
{
"constant": true,
"range": [
0,
null
],
"datatype": "string",
"dimension": [],
"value": "",
"unit": "NONE",
"uuid": "PAR-Calibration",
"name": "Calibration",
"description": "Unique identifier that can be used to retrieve a corresponding calibration certificate."
},
{
"constant": false,
"range": [
null,
null
],
"datatype": "int",
"dimension": [],
"value": 0,
"unit": "NONE",
"uuid": "PAR-Interval",
"name": "Interval",
"description": "Some description..."
}
],
"uuid": "COM-Base",
"name": "Base",
"description": "Represents a base station in a distributed system. "
}
],
"functions": [],
"measurements": [],
"parameters": [],
"uuid": "COM-BaseStations",
"name": "Base Stations",
"description": "Object acting as a list of base stations of the metrology system. "
},
{
"components": [
{
"uuid": "COM-Home-Target",
"name": "Home Target",
"description": "Represents an individual mobile entity ",
"parameters": [
{
"uuid": "PAR-State",
"value": "ERROR"
},
{
"uuid": "PAR-Mode",
"value": "Continuous"
},
{
"uuid": "PAR-Type",
"value": "SMR"
},
{
"uuid": "PAR-Calibration",
"value": ""
}
],
"file": "./MobileEntities/Target.json"
}
],
"functions": [],
"measurements": [],
"parameters": [],
"uuid": "COM-MobileEntities",
"name": "Mobile Entities",
"description": "Object acting as a list of mobile entities in the metrology system."
}
],
"functions": [
{
"arguments": [],
"returns": [],
"uuid": "FUN-Reset",
"name": "Reset",
"description": "Resets the device into the state like directly after start-up."
},
{
"arguments": [],
"returns": [],
"uuid": "FUN-Shutdown",
"name": "Shutdown",
"description": "Gracefully shutdown the device."
}
],
"measurements": [],
"parameters": [
{
"constant": false,
"range": [
"OK",
"WARNING",
"ERROR",
"MAINTENANCE"
],
"datatype": "enum",
"dimension": [],
"value": "OK",
"unit": "NONE",
"uuid": "PAR-State",
"name": "State",
"description": "The current state of the device."
},
{
"constant": true,
"range": [
null,
null
],
"datatype": "string",
"dimension": [],
"value": "Laboratory for Machine Tools and Production Engineering WZL of RWTH Aachen",
"unit": "NONE",
"uuid": "PAR-Manufacturer",
"name": "Manufacturer",
"description": "Name of manufacturing company."
},
{
"constant": true,
"range": [
0,
null
],
"datatype": "int",
"dimension": [],
"value": 1,
"unit": "NONE",
"uuid": "PAR-Version",
"name": "Version",
"description": "Incremental API-Version."
},
{
"constant": true,
"range": [
null,
null
],
"datatype": "time",
"dimension": [
2
],
"value": [
"2021-05-03T15:30:00.0000Z",
"2021-05-03T15:30:00.0000Z"
],
"unit": "NONE",
"uuid": "PAR-Time",
"name": "Time",
"description": "Current system time."
}
],
"uuid": "COM-Lasertracker",
"name": "Lasertracker",
"description": "Active coordinate measurement device based on laser interferometry for Large-Scale metrology applications."
}
\ No newline at end of file
{"components": [], "functions": [{"arguments": [], "returns": [], "uuid": "FUN-Reset", "name": "Reset", "description": "Starts the search routine around the current direction."}, {"arguments": [], "returns": [], "uuid": "FUN-Trigger", "name": "Trigger", "description": "Trigger count measurements and set the resulting label to nonce. This function is only allowed in triggered acquisition mode."}], "measurements": [{"range": [null, null], "datatype": "double", "dimension": [3], "value": [0, 0, 0], "unit": "MTR", "uuid": "MEA-Position", "name": "Position", "description": "Most recently dispatched measured position."}, {"range": [null, null], "datatype": "double", "dimension": [4], "value": [0, 0, 0, 0], "unit": "NONE", "uuid": "MEA-Quaternion", "name": "Quaternion", "description": "Most recently dispatched measured orientation as quaternion, if available."}], "parameters": [{"constant": true, "range": ["OK", "WARNING", "ERROR", "MAINTENANCE"], "datatype": "enum", "dimension": [], "value": "ERROR", "unit": "NONE", "uuid": "PAR-State", "name": "State", "description": "Reflects the current state of the target. If logged in: OK. If not stable: WARNING. If lost: ERROR."}, {"constant": true, "range": ["Continuous", "Triggered", "External", "Idle"], "datatype": "enum", "dimension": [], "value": "Continuous", "unit": "NONE", "uuid": "PAR-Mode", "name": "Mode", "description": "Current state of the entity. In CONTINUOUS mode, values are dispatched as fast as possible. In TRIGGERED mode, values are only dispatches after a software trigger. In EXTERNAL mode, values are dispatched in accordance to an external trigger, e.g. probe or TTL. IDLE means the entity is currently not used."}, {"constant": true, "range": [null, null], "datatype": "string", "dimension": [], "value": "SMR", "unit": "NONE", "uuid": "PAR-Type", "name": "Type", "description": "System specific identifier of the target Type, e.g. SMR or Active SMR."}, {"constant": true, "range": [0, null], "datatype": "string", "dimension": [], "value": "", "unit": "NONE", "uuid": "PAR-Calibration", "name": "Calibration", "description": "Unique identifier that can be used to retrieve a corresponding calibration certificate."}, {"constant": false, "range": ["False", "True"], "datatype": "bool", "dimension": [], "value": false, "unit": "NONE", "uuid": "PAR-Locked", "name": "Locked", "description": "Some description..."}], "uuid": "COM-Target", "name": "Target", "description": "Represents an individual mobile entity "}
\ No newline at end of file
MQTT_USERNAME = ''
MQTT_PASSWORD = ''
# MQTT_BROKER = 'wzl-mbroker01.wzl.rwth-aachen.de'
# MQTT_PORT = 1883
# MQTT_VHOST = "metrology"
MQTT_BROKER = 'localhost'
MQTT_PORT = 1883
MQTT_VHOST = ""
from typing import Dict
from typing import List
from device.enums import StateEnum
class COMBase(object):
def __init__(self, device):
self._device = device
self._mea_position = [0, 0, 0]
self._mea_quaternion = [0, 0, 0, 0]
self._mea_azimuth = 0
self._mea_elevation = 0
self._mea_distance = 0
self._mea_online = [[True, True, True], [True, True, True]]
self._par_state = StateEnum.ERROR
self._par_calibration = ""
self._par_interval = 0
# TODO implement
def get_mea_position(self) -> List[float]:
# TODO implement
return self._mea_position
def get_mea_quaternion(self) -> List[float]:
# TODO implement
return self._mea_quaternion
def get_mea_azimuth(self) -> float:
# TODO implement
return self._mea_azimuth
def get_mea_elevation(self) -> float:
# TODO implement
return self._mea_elevation
def get_mea_distance(self) -> float:
# TODO implement
return self._mea_distance
def get_mea_online(self) -> List[List[bool]]:
# TODO implement
return self._mea_online
def get_par_state(self) -> StateEnum:
# TODO implement
return self._par_state
def get_par_calibration(self) -> str:
# TODO implement
return self._par_calibration
def get_par_interval(self) -> int:
# TODO implement
return self._par_interval
def set_par_interval(self, par_interval: int):
# TODO implement
self._par_interval = par_interval
def fun_jog(self, arg_azimuth: float = 0, arg_elevation: float = 0):
# TODO implement
pass
def fun_pointto(self, arg_position: List[float] = [0, 0, 0]):
arg_position = [0, 0, 0] if arg_position is None else arg_position
# TODO implement
pass
\ No newline at end of file
from typing import Dict
from device.com_base import COMBase
class COMBaseStations(object):
def __init__(self, device):
self._device = device
self._com_base = COMBase(device)
# TODO implement
from typing import Dict
from device.com_basestations import COMBaseStations
from device.com_mobileentities import COMMobileEntities
import datetime
from typing import List
from device.enums import StateEnum
class COMLasertracker(object):
def __init__(self, device):
self._device = device
self._par_state = StateEnum.OK
self._par_manufacturer = "Laboratory for Machine Tools and Production Engineering WZL of RWTH Aachen"
self._par_version = 1
self._par_time = [datetime.datetime.fromtimestamp(1620048600.0), datetime.datetime.fromtimestamp(1620048600.0)]
self._com_basestations = COMBaseStations(device)
self._com_mobileentities = COMMobileEntities(device)
# TODO implement
def get_par_state(self) -> StateEnum:
# TODO implement
return self._par_state
def set_par_state(self, par_state: StateEnum):
# TODO implement
self._par_state = par_state
def get_par_manufacturer(self) -> str:
# TODO implement
return self._par_manufacturer
def get_par_version(self) -> int:
# TODO implement
return self._par_version
def get_par_time(self) -> List['datetime']:
# TODO implement
return self._par_time
def fun_reset(self):
# TODO implement
pass
def fun_shutdown(self):
# TODO implement
pass
\ No newline at end of file
from typing import Dict
from device.com_target import COMTarget
from device.enums import StateEnum, ModeEnum
class COMMobileEntities(object):
def __init__(self, device):
self._device = device
self._com_target = {}
self._com_target["COM-Home-Target"] = COMTarget(device, StateEnum.ERROR, ModeEnum.Continuous, "SMR", "")
# TODO implement
def add(self, uuid: str, child: COMTarget):
self._com_target[uuid] = child
def remove(self, uuid: str):
del self._com_target[uuid]
from typing import Dict
from typing import List
from device.enums import StateEnum, ModeEnum
class COMTarget(object):
def __init__(self, device, par_state: StateEnum = StateEnum.ERROR, par_mode: ModeEnum = ModeEnum.Continuous, par_type: str = "SMR", par_calibration: str = "", par_locked: bool = False):
self._device = device
self._mea_position = [0, 0, 0]
self._mea_quaternion = [0, 0, 0, 0]
self._par_state = par_state
self._par_mode = par_mode
self._par_type = par_type
self._par_calibration = par_calibration
self._par_locked = par_locked
# TODO implement
def get_mea_position(self) -> List[float]:
# TODO implement
return self._mea_position
def get_mea_quaternion(self) -> List[float]:
# TODO implement
return self._mea_quaternion
def get_par_state(self) -> StateEnum:
# TODO implement
return self._par_state
def get_par_mode(self) -> ModeEnum:
# TODO implement
return self._par_mode
def get_par_type(self) -> str:
# TODO implement
return self._par_type
def get_par_calibration(self) -> str:
# TODO implement
return self._par_calibration
def get_par_locked(self) -> bool:
# TODO implement
return self._par_locked
def set_par_locked(self, par_locked: bool):
# TODO implement
self._par_locked = par_locked
def fun_reset(self):
# TODO implement
pass
def fun_trigger(self):
# TODO implement
pass
\ No newline at end of file
class Device(object):
def __init__(self):
# TODO implement
pass
def __del__(self):
# TODO implement
pass
\ No newline at end of file
from enum import Enum
class StateEnum(Enum):
OK = 0
WARNING = 1
ERROR = 2
MAINTENANCE = 3
def __str__(self):
return ["OK", "WARNING", "ERROR", "MAINTENANCE"][self.value]
@classmethod
def from_string(cls, name):
return {"OK": StateEnum.OK, "WARNING": StateEnum.WARNING, "ERROR": StateEnum.ERROR, "MAINTENANCE": StateEnum.MAINTENANCE}[name]
class ModeEnum(Enum):
Continuous = 0
Triggered = 1
External = 2
Idle = 3
def __str__(self):
return ["Continuous", "Triggered", "External", "Idle"][self.value]
@classmethod
def from_string(cls, name):
return {"Continuous": ModeEnum.Continuous, "Triggered": ModeEnum.Triggered, "External": ModeEnum.External, "Idle": ModeEnum.Idle}[name]
# -*- coding: utf-8 -*-
import asyncio
import datetime
import sys
from concurrent.futures import ThreadPoolExecutor
from devices.lasertracker.const import MQTT_USERNAME, MQTT_PASSWORD, MQTT_BROKER, MQTT_VHOST, MQTT_PORT
from wzl.utilities import root_logger
from wzl.mqtt import MQTTPublisher
from src.soil.component import Component
from src.http.server import HTTPServer
from src.soil.event import Event, EventSeverity, EventTrigger
from src.soil.stream import FixedJob, ConfigurableJob, EventJob, UpdateJob, MessageScheduler, EventScheduler, UpdateScheduler
from devices.lasertracker.device.enums import StateEnum, ModeEnum
from devices.lasertracker.device.com_lasertracker import COMLasertracker
sys.setswitchinterval(0.0005)
def start(com_lasertracker: COMLasertracker):
# server settings
address = '127.0.0.1'
port = '8000'
main_logger = root_logger.get(__name__)
# set up servers
executor = ThreadPoolExecutor(max_workers=100)
loop = asyncio.get_event_loop()
loop.set_default_executor(executor)
# configure mqtt
mqtt = MQTTPublisher(MQTT_USERNAME)
mqtt.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USERNAME, MQTT_PASSWORD)
# configure messages
message_schedule = []
update_schedule = []
message_schedule += [
FixedJob("COM-Lasertracker/COM-BaseStations/COM-Base/MEA-Position", 1, com_lasertracker._com_basestations._com_base.get_mea_position)]
for child0_uuid in com_lasertracker._com_mobileentities._com_target:
message_schedule += [FixedJob("COM-Lasertracker/COM-MobileEntities/{}/MEA-Position".format(child0_uuid), 1,
com_lasertracker._com_mobileentities._com_target[child0_uuid].get_mea_position)]
message_scheduler = MessageScheduler(loop, message_schedule, [mqtt])
for child0_uuid in com_lasertracker._com_mobileentities._com_target:
update_schedule += [UpdateJob("COM-Lasertracker/COM-MobileEntities/{}/MEA-Quaternion".format(child0_uuid),
com_lasertracker._com_mobileentities._com_target[child0_uuid].get_mea_quaternion)]
update_scheduler = UpdateScheduler(loop, update_schedule, [mqtt])
# configure events
event_schedule = []
event_schedule += [
EventJob("COM-Lasertracker/COM-BaseStations/COM-Base/MEA-Distance", 10, com_lasertracker._com_basestations._com_base.get_mea_distance,
Event(EventSeverity.WARNING, EventTrigger.LARGER, "double", 50, "The target is distance is large. Uncertainty might be high."))]
event_schedule += [
EventJob("COM-Lasertracker/COM-BaseStations/COM-Base/PAR-State", 10, com_lasertracker._com_basestations._com_base.get_par_state,
Event(EventSeverity.ERROR, EventTrigger.EQUALS, "enum", StateEnum.ERROR, "An error occured!"))]
for child0_uuid in com_lasertracker._com_mobileentities._com_target:
event_schedule += [EventJob("COM-Lasertracker/COM-MobileEntities/{}/PAR-Locked".format(child0_uuid), 10,
com_lasertracker._com_mobileentities._com_target[child0_uuid].get_par_locked,
Event(EventSeverity.WARNING, EventTrigger.EQUALS, "bool", False, "Target is not locked!"))]
event_scheduler = EventScheduler(loop, event_schedule, [mqtt])
# configure model
mapping = {}
mapping['COM-Lasertracker'] = {'add': None, 'remove': None}
submapping = mapping['COM-Lasertracker']
submapping['PAR-State'] = {'getter': com_lasertracker.get_par_state, 'setter': com_lasertracker.set_par_state}
submapping['PAR-Manufacturer'] = {'getter': com_lasertracker.get_par_manufacturer, 'setter': None}
submapping['PAR-Version'] = {'getter': com_lasertracker.get_par_version, 'setter': None}
submapping['PAR-Time'] = {'getter': com_lasertracker.get_par_time, 'setter': None}
submapping['FUN-Reset'] = {'method': com_lasertracker.fun_reset, 'signature': {'arguments': {}, 'returns': []}}
submapping['FUN-Shutdown'] = {'method': com_lasertracker.fun_shutdown, 'signature': {'arguments': {}, 'returns': []}}
mapping['COM-Lasertracker']['COM-BaseStations'] = {'add': None, 'remove': None}
submapping = mapping['COM-Lasertracker']['COM-BaseStations']
mapping['COM-Lasertracker']['COM-BaseStations']['COM-Base'] = {'add': None, 'remove': None}
submapping = mapping['COM-Lasertracker']['COM-BaseStations']['COM-Base']
submapping['MEA-Position'] = com_lasertracker._com_basestations._com_base.get_mea_position
submapping['MEA-Quaternion'] = com_lasertracker._com_basestations._com_base.get_mea_quaternion
submapping['MEA-Azimuth'] = com_lasertracker._com_basestations._com_base.get_mea_azimuth
submapping['MEA-Elevation'] = com_lasertracker._com_basestations._com_base.get_mea_elevation
submapping['MEA-Distance'] = com_lasertracker._com_basestations._com_base.get_mea_distance
submapping['MEA-Online'] = com_lasertracker._com_basestations._com_base.get_mea_online
submapping['PAR-State'] = {'getter': com_lasertracker._com_basestations._com_base.get_par_state, 'setter': None}
submapping['PAR-Calibration'] = {'getter': com_lasertracker._com_basestations._com_base.get_par_calibration, 'setter': None}
submapping['PAR-Interval'] = {'getter': com_lasertracker._com_basestations._com_base.get_par_interval,
'setter': com_lasertracker._com_basestations._com_base.set_par_interval}
submapping['FUN-Jog'] = {'method': com_lasertracker._com_basestations._com_base.fun_jog,
'signature': {'arguments': {'ARG-Azimuth': 'arg_azimuth', 'ARG-Elevation': 'arg_elevation'}, 'returns': []}}
submapping['FUN-PointTo'] = {'method': com_lasertracker._com_basestations._com_base.fun_pointto,
'signature': {'arguments': {'ARG-Position': 'arg_position'}, 'returns': []}}
mapping['COM-Lasertracker']['COM-MobileEntities'] = {'add': com_lasertracker._com_mobileentities.add,
'remove': com_lasertracker._com_mobileentities.remove}
submapping = mapping['COM-Lasertracker']['COM-MobileEntities']
for child0_uuid in com_lasertracker._com_mobileentities._com_target:
mapping['COM-Lasertracker']['COM-MobileEntities'][child0_uuid] = {'add': None, 'remove': None}
submapping = mapping['COM-Lasertracker']['COM-MobileEntities'][child0_uuid]
submapping['MEA-Position'] = com_lasertracker._com_mobileentities._com_target[child0_uuid].get_mea_position
submapping['MEA-Quaternion'] = com_lasertracker._com_mobileentities._com_target[child0_uuid].get_mea_quaternion
submapping['PAR-State'] = {'getter': com_lasertracker._com_mobileentities._com_target[child0_uuid].get_par_state, 'setter': None}
submapping['PAR-Mode'] = {'getter': com_lasertracker._com_mobileentities._com_target[child0_uuid].get_par_mode, 'setter': None}
submapping['PAR-Type'] = {'getter': com_lasertracker._com_mobileentities._com_target[child0_uuid].get_par_type, 'setter': None}
submapping['PAR-Calibration'] = {'getter': com_lasertracker._com_mobileentities._com_target[child0_uuid].get_par_calibration, 'setter': None}
submapping['PAR-Locked'] = {'getter': com_lasertracker._com_mobileentities._com_target[child0_uuid].get_par_locked,
'setter': com_lasertracker._com_mobileentities._com_target[child0_uuid].set_par_locked}
submapping['FUN-Reset'] = {'method': com_lasertracker._com_mobileentities._com_target[child0_uuid].fun_reset,
'signature': {'arguments': {}, 'returns': []}}
submapping['FUN-Trigger'] = {'method': com_lasertracker._com_mobileentities._com_target[child0_uuid].fun_trigger,
'signature': {'arguments': {}, 'returns': []}}
model = Component.load('./Lasertracker.json', mapping['COM-Lasertracker'])
http = HTTPServer(loop, address, port, model)
# start servers
main_logger.info("Starting main asynchronous loop")
try:
loop.run_forever()
except:
pass
finally:
loop.close()
import datetime
import random
from device.enums import StateEnum
from device.com_lasertracker import COMLasertracker
class Lasertracker(COMLasertracker):
def __init__(self, device):
COMLasertracker.__init__(self, device)
def get_par_state(self):
if random.random() < 0.5:
self._par_state = StateEnum.OK
else:
self._par_state = StateEnum.WARNING
return self._par_state
def get_par_time(self):
# self._par_time = datetime.datetime.now()
return self._par_time
def fun_reset(self):
# TODO implement
pass
def fun_shutdown(self):
# TODO implement
pass
# -*- coding: utf-8 -*-
from device.device import Device
from device.start import start
from lasertracker import Lasertracker
if __name__ == "__main__":
device = Device()
lasertracker = Lasertracker(device)
start(lasertracker)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment