Skip to content
Snippets Groups Projects
Commit bd1d0fc8 authored by Matthias Stefan Bodenbenner's avatar Matthias Stefan Bodenbenner
Browse files

9.0.0 - finished publishing and providing semantic data

parent ee0d9b5a
Branches semantic
Tags 9.0.0
No related merge requests found
[![Build](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/badges/master/pipeline.svg)](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/commits/master)
# Python Unified Device Interface
Current stable version: 8.2.5
Current stable version: 9.0.0
## Installation
1. Install the WZL-UDI package via pip
......@@ -48,9 +48,16 @@ https://doi.org/10.1117/12.2527461
The authors acknowledge funding from the LaVA project (Large Volume Applications, contract 17IND03 of the European Metrology Programme for Innovation and Research EMPIR). The EMPIR initiative is co-funded by the European Union’s Horizon 2020 research and innovation programme and the EMPIR Participating States.
Funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation) under Germany's Excellence Strategy – EXC-2023 Internet of Production – 390621612.
Funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation) under Project-ID 432233186 -- AIMS.
## Recent changes
**9.0.0** - 2024-01-10
- added semantic features
- the device can return profiles, metadata and data defined and structured according to semantic web standards using RDF and SHACL
- changed signature of StreamScheduler
- instead of a list of publishers, only one publisher is allowed no
**8.2.5** - 2023-04-17
- relaxed required versions of dependencies to avoid conflicts
......
......@@ -7,4 +7,4 @@ rdflib==7.0.0
sphinx==3.5.2
sphinx-rtd-theme==1.0.0
strict-rfc3339==0.7
wzl-mqtt~=2.5.3
\ No newline at end of file
wzl-mqtt~=2.6.0
\ No newline at end of file
......@@ -4,7 +4,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
setup(name='wzl-udi',
version='8.3.0',
version='9.0.0',
url='https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python',
project_urls={
"Bug Tracker": "https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python/-/issues",
......
import datetime
import json
import traceback
from abc import ABC, abstractmethod
from typing import List, Callable, Any, Union, Dict
import rdflib
from wzl.mqtt.client import MQTTPublisher
from wzl.mqtt.exceptions import ClientNotFoundError
from . import figure
from .component import Component
# from .component import Component
from .event import Event
from .figure import Figure
from .semantics import Namespaces
......@@ -111,7 +110,7 @@ class Job(ABC):
except Exception:
return "", rdflib.Graph()
def data(self, model: Dict = None) -> Dict:
def data(self, model: Component = None) -> Dict:
try:
data = self._retrieve_metadata(model)
data['uuid'] = self.topic
......@@ -121,11 +120,11 @@ class Job(ABC):
except Exception as e:
raise JobError()
def semantic_data(self, model: Dict = None) -> (str, rdflib.Graph):
def semantic_data(self, model: Component = None) -> (str, rdflib.Graph):
try:
url, data = self._retrieve_semantic_metadata(model)
measurement_subject = list((data.subjects(predicate=Namespaces.rdf.type, object=Namespaces.soil.Measurement)))[0]
# TODO set mqtt topic as identifier
measurement_subject = \
list((data.subjects(predicate=Namespaces.rdf.type, object=Namespaces.soil.Measurement)))[0]
# replace value
data.remove((None, Namespaces.qudt.value, None))
......@@ -223,7 +222,7 @@ class StreamScheduler(object):
Periodically, checks the status of scheduled jobs. If a job is triggered, it publishes a message via all publishers handed to the scheduler.
"""
def __init__(self, loop, schedule: List[Job], publishers: List[MQTTPublisher] = None,
def __init__(self, loop, schedule: List[Job], publisher: MQTTPublisher = None,
start_immediately: bool = False, dataformat: str = 'json', model: 'Component' = None):
"""Constructor.
......@@ -237,11 +236,11 @@ class StreamScheduler(object):
raise ValueError('Dataformat must be one of "json" or "xml".')
self._loop = loop
self._schedule = schedule
self._publishers = publishers if publishers is not None else []
self._running = start_immediately
self._dataformat = dataformat
self._model = model
self._schedule: List[Job] = schedule
self._publisher: MQTTPublisher = publisher if publisher is not None else []
self._running: bool = start_immediately
self._dataformat: str = dataformat
self._model: Component = model
if start_immediately:
self._update()
......@@ -269,31 +268,6 @@ class StreamScheduler(object):
for job in jobs_to_remove:
self._schedule.remove(job)
def add_publisher(self, publisher: MQTTPublisher) -> None:
"""Adds the given publisher to the list of publishers.
Args:
publisher: MQTT publisher to be added to the list of publishers.
"""
exists = False
for pub in self._publishers:
if pub.name == publisher.name:
exists = True
break
if not exists:
self._publishers += [publisher]
def remove_publisher(self, name: str) -> None:
"""Removes the publisher having the given name form the list publishers.
Args:
name: Name of the publisher to be removed.
"""
index = [i for i, p in enumerate(self._publishers) if p.name == name]
assert (len(index) < 2)
if len(index) == 1:
del self._publishers[index[0]]
def _update(self) -> None:
"""Processes all scheduled jobs.
......@@ -311,12 +285,15 @@ class StreamScheduler(object):
try:
if job.is_triggered(now):
# send syntactic data package
for publisher in self._publishers:
if self._dataformat == 'json':
message = json.dumps(job.data(self._model))
elif self._dataformat == 'xml':
message = serialize.to_xml(job.type, job.data(self._model))
publisher.publish(job.topic, message, 1)
try:
self._publisher.get('tier1').publish(job.topic, message, 1)
except ClientNotFoundError:
self._publisher.publish(job.topic, message, 1)
# try to send semantic data package
try:
......@@ -325,7 +302,12 @@ class StreamScheduler(object):
message = semantic_data.serialize(format='json-ld')
elif self._dataformat == 'xml':
message = semantic_data.serialize(format='xml')
publisher.publish(url, message, 1)
try:
self._publisher.get('tier2').publish(url, message, 1)
except ClientNotFoundError:
self._publisher.publish(url, message, 1)
except JobError:
pass
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment