diff --git a/src/soil/component.py b/src/soil/component.py index 869f3896f90eac723e34e38c4fc56f178ca20069..b80b9519ba53b1f173ed731d4aaa03ba8e131de0 100644 --- a/src/soil/component.py +++ b/src/soil/component.py @@ -414,6 +414,9 @@ class Component(Element): raise ChildNotFoundException('Could not resolve the semantic path.') - - - + @property + def semantic_name(self) -> str: + if self._metadata is None: + return "" + subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.ssn.System)) + return subject.toPython() \ No newline at end of file diff --git a/src/soil/datatype.py b/src/soil/datatype.py index 2ff9e5d363200ec42ed62d98640ac0a34eb54973..d9d50785061299d60e6058e9030ccd244d495b3f 100644 --- a/src/soil/datatype.py +++ b/src/soil/datatype.py @@ -31,4 +31,3 @@ class Datatype(enum.Enum): if legacy_mode: return ["bool", "int", "double", "string", "time", "enum"][self.value] return ["boolean", "int", "float", "string", "time", "enum"][self.value] - diff --git a/src/soil/element.py b/src/soil/element.py index 3fb465723d1f8edcf29df793c0cbe6eb80870d84..c7e332d409b2a48db4cddf891ce8dd939fede3f6 100644 --- a/src/soil/element.py +++ b/src/soil/element.py @@ -6,7 +6,6 @@ from typing import Any, Dict, List import rdflib from .error import ChildNotFoundException -from .semantics import Namespaces from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET from ..utils.error import SerialisationException @@ -122,3 +121,7 @@ class Element(ABC): raise ChildNotFoundException('Could not resolve the semantic path.') + @property + @abstractmethod + def semantic_name(self) -> str: + ... diff --git a/src/soil/event.py b/src/soil/event.py index 00feaa713b29d8828f52f2fa3366427129426bb3..e2100a2810a421b0357f6b3165a5835268564606 100644 --- a/src/soil/event.py +++ b/src/soil/event.py @@ -1,6 +1,5 @@ -from enum import auto, Flag, IntEnum - import datetime +from enum import auto, Flag, IntEnum from .datatype import Datatype diff --git a/src/soil/figure.py b/src/soil/figure.py index d94c2b3c89ebc718df5625a5f05c27af2a4b94d8..c76c0432e5224af169949b816b5d39fbb32c77c3 100644 --- a/src/soil/figure.py +++ b/src/soil/figure.py @@ -1,5 +1,4 @@ import asyncio -import copy import datetime import inspect import time @@ -293,7 +292,8 @@ class Figure(Element, ABC): if isinstance(value, list): blank_node = rdflib.BNode() data_graph.add((blank_node, Namespaces.rdf.rest, Namespaces.rdf.nil)) - data_graph.add((blank_node, Namespaces.rdf.first, Figure.serialize_value(data_graph, value[len(value) - 1]))) + data_graph.add( + (blank_node, Namespaces.rdf.first, Figure.serialize_value(data_graph, value[len(value) - 1]))) for entry in reversed(value[:-1]): new_blank_node = rdflib.BNode() data_graph.add((new_blank_node, Namespaces.rdf.rest, blank_node)) diff --git a/src/soil/function.py b/src/soil/function.py index c9aad47d8710442369b6b3191473cc6f786a58f6..b531f8c88e86d0d7c8a6bb5379be01b24750ec47 100644 --- a/src/soil/function.py +++ b/src/soil/function.py @@ -198,3 +198,7 @@ class Function(Element): def resolve_semantic_path(self, path: List[str]) -> ('Element', str): # This method does nothing intentionally, as we do not have any semantic definition for function raise ChildNotFoundException('Could not resolve the semantic path.') + + @property + def semantic_name(self) -> str: + return "" \ No newline at end of file diff --git a/src/soil/measurement.py b/src/soil/measurement.py index a6aabb757c8d656e99a3a43c545210aeb9f9ea9d..063431cf5dbe4797006a21e9d4cf7e46e645e028 100644 --- a/src/soil/measurement.py +++ b/src/soil/measurement.py @@ -6,6 +6,7 @@ import rdflib from deprecated import deprecated from .datatype import Datatype +from .element import Element from .error import ChildNotFoundException from .figure import Figure from .semantics import Semantics, Namespaces @@ -222,7 +223,7 @@ class Measurement(Figure): raise DeviceException('The provided kind of semantic information cannot be returned.') return result - def resolve_semantic_path(self, path: List[str]) -> ('Element', str): + def resolve_semantic_path(self, path: List[str]) -> (Element, str): try: super().resolve_semantic_path(path) except ChildNotFoundException: @@ -234,3 +235,10 @@ class Measurement(Figure): return self, 'metadata' raise ChildNotFoundException('Could not resolve the semantic path.') + + @property + def semantic_name(self) -> str: + if self._metadata is None: + return "" + subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.sosa.ObservableProperty)) + return subject.toPython() diff --git a/src/soil/parameter.py b/src/soil/parameter.py index a10e7ab079249d1c2b6c09ea83810680766952c0..5c794b3a64c278cbe11258a759842ad8f9304e15 100644 --- a/src/soil/parameter.py +++ b/src/soil/parameter.py @@ -165,4 +165,11 @@ class Parameter(Figure): if path[0] == triples[0][0].split('/')[-1] and len(path) == 1: return self, 'metadata' - raise ChildNotFoundException('Could not resolve the semantic path.') \ No newline at end of file + raise ChildNotFoundException('Could not resolve the semantic path.') + + @property + def semantic_name(self) -> str: + if self._metadata is None: + return "" + subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.ssn.Property)) + return subject.toPython() diff --git a/src/soil/semantics.py b/src/soil/semantics.py index 1f3baeb74b67e98fb09e97739655e2432447931b..00a59c6d053a81e880a847a83009167bfbad3e1a 100644 --- a/src/soil/semantics.py +++ b/src/soil/semantics.py @@ -2,7 +2,6 @@ import rdflib class Semantics(object): - prefix: str = None url: str = None namespace: rdflib.Namespace = None @@ -14,7 +13,6 @@ class Semantics(object): class Namespaces(object): - m4i = rdflib.Namespace('http://w3id.org/nfdi4ing/metadata4ing#') quantitykind = rdflib.Namespace('http://qudt.org/vocab/quantitykind/') qudt = rdflib.Namespace('http://qudt.org/schema/qudt/') diff --git a/src/soil/stream.py b/src/soil/stream.py index 2cba19d9de418745a2c43136a34ecf09daa610a5..d7795113924c3c48415afa19e29512ade0eac1a7 100644 --- a/src/soil/stream.py +++ b/src/soil/stream.py @@ -1,5 +1,6 @@ import datetime import json +import traceback from abc import ABC, abstractmethod from typing import List, Callable, Any, Union, Dict @@ -7,8 +8,11 @@ import rdflib from wzl.mqtt.client import MQTTPublisher from . import figure +from .component import Component # from .component import Component from .event import Event +from .figure import Figure +from .semantics import Namespaces from ..utils import root_logger from ..utils import serialize @@ -87,7 +91,7 @@ class Job(ABC): def stop(self) -> None: self._next = None - def _retrieve_metadata(self, model: 'Component' = None): + def _retrieve_metadata(self, model: Component = None): if model is None: return {} try: @@ -97,15 +101,15 @@ class Job(ABC): return {} return metadata - def _retrieve_semantic_metadata(self, model: 'Component' = None): + def _retrieve_semantic_metadata(self, model: Component = None) -> (str, rdflib.Graph): if model is None: - return rdflib.Graph() + return "", rdflib.Graph() try: uuids = self.topic.split('/') - metadata = model.__getitem__(uuids).serialize_semantics([], 'data') + element = model.__getitem__(uuids) + return element.semantic_name, element.serialize_semantics('data') except Exception: - return rdflib.Graph() - return metadata + return "", rdflib.Graph() def data(self, model: Dict = None) -> Dict: try: @@ -117,25 +121,22 @@ class Job(ABC): except Exception as e: raise JobError() - def semantic_data(self, model: Dict = None) -> Dict: + def semantic_data(self, model: Dict = None) -> (str, rdflib.Graph): try: - data = self._retrieve_semantic_metadata(model) + url, data = self._retrieve_semantic_metadata(model) + measurement_subject = list((data.subjects(predicate=Namespaces.rdf.type, object=Namespaces.soil.Measurement)))[0] # TODO set mqtt topic as identifier - triples = list(self._metadata.triples((None, rdflib.URIRef("http://schema.org/name"), None))) - assert (len(triples) == 1) - triple = triples[0] - - subject = triple[0] - predicate = rdflib.URIRef("http://qudt.org/schema/qudt/value") - object = rdflib.Literal(self.value) + # replace value + data.remove((None, Namespaces.qudt.value, None)) + data.add((measurement_subject, Namespaces.qudt.value, Figure.serialize_value(data, self.value))) - # TODO creating the Measurement + # replace timestamp + data.remove((None, Namespaces.schema.dateCreated, None)) + data.add((measurement_subject, Namespaces.schema.dateCreated, rdflib.Literal(datetime.datetime.now()))) - # data.add(()) - # data['value'] = self.value - # data['timestamp'] = figure.serialize_time(datetime.datetime.now()) - return data + # TODO add the uncertainty/covariance + return url, data except Exception as e: raise JobError() @@ -309,18 +310,31 @@ class StreamScheduler(object): for job in self._schedule: try: if job.is_triggered(now): + # send syntactic data package for publisher in self._publishers: if self._dataformat == 'json': message = json.dumps(job.data(self._model)) elif self._dataformat == 'xml': message = serialize.to_xml(job.type, job.data(self._model)) publisher.publish(job.topic, message, 1) + + # try to send semantic data package + try: + url, semantic_data = job.semantic_data(self._model) + if self._dataformat == 'json': + message = semantic_data.serialize(format='json-ld') + elif self._dataformat == 'xml': + message = semantic_data.serialize(format='xml') + publisher.publish(url, message, 1) + except JobError: + pass + job.schedule() next = job.determine_next(next) except JobError: - pass # logger.error(traceback.format_exc()) # job.stop() + pass if next is None: next = now + datetime.timedelta(seconds=10)