diff --git a/README.md b/README.md index a1884378f11cc4c4501ec9d3831f207369c091c2..ab8bcae307b52dc2bc6e1ee0f777173713b7b1d8 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/commits/master) # Python Unified Device Interface -Current stable version: 8.2.5 +Current stable version: 9.0.0 ## Installation 1. Install the WZL-UDI package via pip @@ -48,9 +48,16 @@ https://doi.org/10.1117/12.2527461 The authors acknowledge funding from the LaVA project (Large Volume Applications, contract 17IND03 of the European Metrology Programme for Innovation and Research EMPIR). The EMPIR initiative is co-funded by the European Union’s Horizon 2020 research and innovation programme and the EMPIR Participating States. Funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation) under Germany's Excellence Strategy – EXC-2023 Internet of Production – 390621612. +Funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation) under Project-ID 432233186 -- AIMS. ## Recent changes +**9.0.0** - 2024-01-10 + - added semantic features + - the device can return profiles, metadata and data defined and structured according to semantic web standards using RDF and SHACL + - changed signature of StreamScheduler + - instead of a list of publishers, only one publisher is allowed no + **8.2.5** - 2023-04-17 - relaxed required versions of dependencies to avoid conflicts diff --git a/requirements.txt b/requirements.txt index 345d0acecfdaa2ac3dc4ae8f7cadbfc426d80f43..042289c3677d361567cbd9fec5c4b022b474ba22 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,4 @@ rdflib==7.0.0 sphinx==3.5.2 sphinx-rtd-theme==1.0.0 strict-rfc3339==0.7 -wzl-mqtt~=2.5.3 \ No newline at end of file +wzl-mqtt~=2.6.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 96dc1e2d6bbbf930b9ddd13c788e5291d7bb7327..c019b33b380c662a58cf79946294701534df8286 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ with open("README.md", "r", encoding="utf-8") as fh: long_description = fh.read() setup(name='wzl-udi', - version='8.3.0', + version='9.0.0', url='https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python', project_urls={ "Bug Tracker": "https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python/-/issues", diff --git a/src/soil/stream.py b/src/soil/stream.py index d7795113924c3c48415afa19e29512ade0eac1a7..8675e012641d34a450f1679d5370e1b4e7dd8147 100644 --- a/src/soil/stream.py +++ b/src/soil/stream.py @@ -1,15 +1,14 @@ import datetime import json -import traceback from abc import ABC, abstractmethod from typing import List, Callable, Any, Union, Dict import rdflib from wzl.mqtt.client import MQTTPublisher +from wzl.mqtt.exceptions import ClientNotFoundError from . import figure from .component import Component -# from .component import Component from .event import Event from .figure import Figure from .semantics import Namespaces @@ -111,7 +110,7 @@ class Job(ABC): except Exception: return "", rdflib.Graph() - def data(self, model: Dict = None) -> Dict: + def data(self, model: Component = None) -> Dict: try: data = self._retrieve_metadata(model) data['uuid'] = self.topic @@ -121,11 +120,11 @@ class Job(ABC): except Exception as e: raise JobError() - def semantic_data(self, model: Dict = None) -> (str, rdflib.Graph): + def semantic_data(self, model: Component = None) -> (str, rdflib.Graph): try: url, data = self._retrieve_semantic_metadata(model) - measurement_subject = list((data.subjects(predicate=Namespaces.rdf.type, object=Namespaces.soil.Measurement)))[0] - # TODO set mqtt topic as identifier + measurement_subject = \ + list((data.subjects(predicate=Namespaces.rdf.type, object=Namespaces.soil.Measurement)))[0] # replace value data.remove((None, Namespaces.qudt.value, None)) @@ -223,7 +222,7 @@ class StreamScheduler(object): Periodically, checks the status of scheduled jobs. If a job is triggered, it publishes a message via all publishers handed to the scheduler. """ - def __init__(self, loop, schedule: List[Job], publishers: List[MQTTPublisher] = None, + def __init__(self, loop, schedule: List[Job], publisher: MQTTPublisher = None, start_immediately: bool = False, dataformat: str = 'json', model: 'Component' = None): """Constructor. @@ -237,11 +236,11 @@ class StreamScheduler(object): raise ValueError('Dataformat must be one of "json" or "xml".') self._loop = loop - self._schedule = schedule - self._publishers = publishers if publishers is not None else [] - self._running = start_immediately - self._dataformat = dataformat - self._model = model + self._schedule: List[Job] = schedule + self._publisher: MQTTPublisher = publisher if publisher is not None else [] + self._running: bool = start_immediately + self._dataformat: str = dataformat + self._model: Component = model if start_immediately: self._update() @@ -269,31 +268,6 @@ class StreamScheduler(object): for job in jobs_to_remove: self._schedule.remove(job) - def add_publisher(self, publisher: MQTTPublisher) -> None: - """Adds the given publisher to the list of publishers. - - Args: - publisher: MQTT publisher to be added to the list of publishers. - """ - exists = False - for pub in self._publishers: - if pub.name == publisher.name: - exists = True - break - if not exists: - self._publishers += [publisher] - - def remove_publisher(self, name: str) -> None: - """Removes the publisher having the given name form the list publishers. - - Args: - name: Name of the publisher to be removed. - """ - index = [i for i, p in enumerate(self._publishers) if p.name == name] - assert (len(index) < 2) - if len(index) == 1: - del self._publishers[index[0]] - def _update(self) -> None: """Processes all scheduled jobs. @@ -311,25 +285,33 @@ class StreamScheduler(object): try: if job.is_triggered(now): # send syntactic data package - for publisher in self._publishers: + if self._dataformat == 'json': + message = json.dumps(job.data(self._model)) + elif self._dataformat == 'xml': + message = serialize.to_xml(job.type, job.data(self._model)) + + try: + self._publisher.get('tier1').publish(job.topic, message, 1) + except ClientNotFoundError: + self._publisher.publish(job.topic, message, 1) + + # try to send semantic data package + try: + url, semantic_data = job.semantic_data(self._model) if self._dataformat == 'json': - message = json.dumps(job.data(self._model)) + message = semantic_data.serialize(format='json-ld') elif self._dataformat == 'xml': - message = serialize.to_xml(job.type, job.data(self._model)) - publisher.publish(job.topic, message, 1) + message = semantic_data.serialize(format='xml') - # try to send semantic data package try: - url, semantic_data = job.semantic_data(self._model) - if self._dataformat == 'json': - message = semantic_data.serialize(format='json-ld') - elif self._dataformat == 'xml': - message = semantic_data.serialize(format='xml') - publisher.publish(url, message, 1) - except JobError: - pass - - job.schedule() + self._publisher.get('tier2').publish(url, message, 1) + except ClientNotFoundError: + self._publisher.publish(url, message, 1) + + except JobError: + pass + + job.schedule() next = job.determine_next(next) except JobError: # logger.error(traceback.format_exc())