Skip to content
Snippets Groups Projects
Commit d39a60fd authored by Matthias Bodenbenner's avatar Matthias Bodenbenner
Browse files

improved streaming and bug fixes

parent 4256c92f
No related branches found
No related tags found
No related merge requests found
......@@ -63,6 +63,12 @@ Funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation)
## Recent changes
**8.2.0** - 2023-03-16
- improved FAIRness of streaming
- published data contains metadata now
- bug fixes
- serialization of the complete model
**8.1.1** - 2023-03-15
- bug fix
- resolving the methods of the sensor logic for dynamic components
......
......
from setuptools import setup, find_packages
setup(name='wzl-udi',
version='8.1.1',
version='8.2.0',
url='https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python',
author='Matthias Bodenbenner',
author_email='m.bodenbenner@wzl.mq.rwth-aachen.de',
......
......
......@@ -188,7 +188,7 @@ class Component(Element):
dictionary['measurements'] = list(map(lambda x: x.serialize([], legacy_mode), self._measurements))
dictionary['functions'] = list(map(lambda x: x.serialize(['all'], legacy_mode), self._functions))
dictionary['components'] = list(map(lambda x: x.serialize(['all'], legacy_mode), self._components))
dictionary['parameters'] = list(map(lambda x: x.serialize(['all'], legacy_mode), self._parameters))
dictionary['parameters'] = list(map(lambda x: x.serialize([], legacy_mode), self._parameters))
return dictionary
dictionary = super().serialize(keys, legacy_mode, method)
......
......
import datetime
import json
import traceback
from abc import ABC, abstractmethod
from typing import List, Callable, Any, Union, Dict
from wzl.mqtt.client import MQTTPublisher
from wzl.utilities import root_logger
from . import figure
# from .component import Component
from .event import Event
from ..utils import serialize
......@@ -86,10 +86,23 @@ class Job(ABC):
def stop(self) -> None:
self._next = None
@property
def data(self) -> Dict:
def _retrieve_metadata(self, model: 'Component' = None):
if model is None:
return {}
try:
uuids = self.topic.split('/')
metadata = model.__getitem__(uuids).serialize([], False)
except Exception:
return {}
return metadata
def data(self, model: Dict = None) -> Dict:
try:
return {'uuid': self.topic, 'value': self.value}
data = self._retrieve_metadata(model)
data['uuid'] = self.topic
data['value'] = self.value
data['timestamp'] = figure.serialize_time(datetime.datetime.now())
return data
except Exception as e:
raise JobError()
......@@ -165,8 +178,7 @@ class EventJob(FixedJob):
updated = any(updated)
return updated
@property
def data(self) -> Dict:
def data(self, model: Dict = None) -> Dict:
self._event.trigger(self._last_value)
return self._event.serialize()
......@@ -178,7 +190,7 @@ class StreamScheduler(object):
"""
def __init__(self, loop, schedule: List[Job], publishers: List[MQTTPublisher] = None,
start_immediately: bool = False, dataformat: str = 'json'):
start_immediately: bool = False, dataformat: str = 'json', model: 'Component' = None):
"""Constructor.
Args:
......@@ -195,6 +207,7 @@ class StreamScheduler(object):
self._publishers = publishers if publishers is not None else []
self._running = start_immediately
self._dataformat = dataformat
self._model = model
if start_immediately:
self._update()
......@@ -265,15 +278,16 @@ class StreamScheduler(object):
if job.is_triggered(now):
for publisher in self._publishers:
if self._dataformat == 'json':
message = json.dumps(job.data)
message = json.dumps(job.data(self._model))
elif self._dataformat == 'xml':
message = serialize.to_xml(job.type, job.data)
message = serialize.to_xml(job.type, job.data(self._model))
publisher.publish(job.topic, message, 1)
job.schedule()
next = job.determine_next(next)
except JobError:
logger.error(traceback.format_exc())
job.stop()
pass
# logger.error(traceback.format_exc())
# job.stop()
if next is None:
next = now + datetime.timedelta(seconds=10)
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment