From ee0d9b5a53e8958c0932e3764f21a22644dc9a76 Mon Sep 17 00:00:00 2001 From: Matthias Bodenbenner <m.bodenbenner@wzl-mq.rwth-aachen.de> Date: Tue, 9 Jan 2024 08:04:50 +0100 Subject: [PATCH] implemented publishing semantic data --- src/soil/component.py | 9 ++++--- src/soil/datatype.py | 1 - src/soil/element.py | 5 +++- src/soil/event.py | 3 +-- src/soil/figure.py | 4 +-- src/soil/function.py | 4 +++ src/soil/measurement.py | 10 +++++++- src/soil/parameter.py | 9 ++++++- src/soil/semantics.py | 2 -- src/soil/stream.py | 56 +++++++++++++++++++++++++---------------- 10 files changed, 69 insertions(+), 34 deletions(-) diff --git a/src/soil/component.py b/src/soil/component.py index 869f389..b80b951 100644 --- a/src/soil/component.py +++ b/src/soil/component.py @@ -414,6 +414,9 @@ class Component(Element): raise ChildNotFoundException('Could not resolve the semantic path.') - - - + @property + def semantic_name(self) -> str: + if self._metadata is None: + return "" + subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.ssn.System)) + return subject.toPython() \ No newline at end of file diff --git a/src/soil/datatype.py b/src/soil/datatype.py index 2ff9e5d..d9d5078 100644 --- a/src/soil/datatype.py +++ b/src/soil/datatype.py @@ -31,4 +31,3 @@ class Datatype(enum.Enum): if legacy_mode: return ["bool", "int", "double", "string", "time", "enum"][self.value] return ["boolean", "int", "float", "string", "time", "enum"][self.value] - diff --git a/src/soil/element.py b/src/soil/element.py index 3fb4657..c7e332d 100644 --- a/src/soil/element.py +++ b/src/soil/element.py @@ -6,7 +6,6 @@ from typing import Any, Dict, List import rdflib from .error import ChildNotFoundException -from .semantics import Namespaces from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET from ..utils.error import SerialisationException @@ -122,3 +121,7 @@ class Element(ABC): raise ChildNotFoundException('Could not resolve the semantic path.') + @property + @abstractmethod + def semantic_name(self) -> str: + ... diff --git a/src/soil/event.py b/src/soil/event.py index 00feaa7..e2100a2 100644 --- a/src/soil/event.py +++ b/src/soil/event.py @@ -1,6 +1,5 @@ -from enum import auto, Flag, IntEnum - import datetime +from enum import auto, Flag, IntEnum from .datatype import Datatype diff --git a/src/soil/figure.py b/src/soil/figure.py index d94c2b3..c76c043 100644 --- a/src/soil/figure.py +++ b/src/soil/figure.py @@ -1,5 +1,4 @@ import asyncio -import copy import datetime import inspect import time @@ -293,7 +292,8 @@ class Figure(Element, ABC): if isinstance(value, list): blank_node = rdflib.BNode() data_graph.add((blank_node, Namespaces.rdf.rest, Namespaces.rdf.nil)) - data_graph.add((blank_node, Namespaces.rdf.first, Figure.serialize_value(data_graph, value[len(value) - 1]))) + data_graph.add( + (blank_node, Namespaces.rdf.first, Figure.serialize_value(data_graph, value[len(value) - 1]))) for entry in reversed(value[:-1]): new_blank_node = rdflib.BNode() data_graph.add((new_blank_node, Namespaces.rdf.rest, blank_node)) diff --git a/src/soil/function.py b/src/soil/function.py index c9aad47..b531f8c 100644 --- a/src/soil/function.py +++ b/src/soil/function.py @@ -198,3 +198,7 @@ class Function(Element): def resolve_semantic_path(self, path: List[str]) -> ('Element', str): # This method does nothing intentionally, as we do not have any semantic definition for function raise ChildNotFoundException('Could not resolve the semantic path.') + + @property + def semantic_name(self) -> str: + return "" \ No newline at end of file diff --git a/src/soil/measurement.py b/src/soil/measurement.py index a6aabb7..063431c 100644 --- a/src/soil/measurement.py +++ b/src/soil/measurement.py @@ -6,6 +6,7 @@ import rdflib from deprecated import deprecated from .datatype import Datatype +from .element import Element from .error import ChildNotFoundException from .figure import Figure from .semantics import Semantics, Namespaces @@ -222,7 +223,7 @@ class Measurement(Figure): raise DeviceException('The provided kind of semantic information cannot be returned.') return result - def resolve_semantic_path(self, path: List[str]) -> ('Element', str): + def resolve_semantic_path(self, path: List[str]) -> (Element, str): try: super().resolve_semantic_path(path) except ChildNotFoundException: @@ -234,3 +235,10 @@ class Measurement(Figure): return self, 'metadata' raise ChildNotFoundException('Could not resolve the semantic path.') + + @property + def semantic_name(self) -> str: + if self._metadata is None: + return "" + subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.sosa.ObservableProperty)) + return subject.toPython() diff --git a/src/soil/parameter.py b/src/soil/parameter.py index a10e7ab..5c794b3 100644 --- a/src/soil/parameter.py +++ b/src/soil/parameter.py @@ -165,4 +165,11 @@ class Parameter(Figure): if path[0] == triples[0][0].split('/')[-1] and len(path) == 1: return self, 'metadata' - raise ChildNotFoundException('Could not resolve the semantic path.') \ No newline at end of file + raise ChildNotFoundException('Could not resolve the semantic path.') + + @property + def semantic_name(self) -> str: + if self._metadata is None: + return "" + subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.ssn.Property)) + return subject.toPython() diff --git a/src/soil/semantics.py b/src/soil/semantics.py index 1f3baeb..00a59c6 100644 --- a/src/soil/semantics.py +++ b/src/soil/semantics.py @@ -2,7 +2,6 @@ import rdflib class Semantics(object): - prefix: str = None url: str = None namespace: rdflib.Namespace = None @@ -14,7 +13,6 @@ class Semantics(object): class Namespaces(object): - m4i = rdflib.Namespace('http://w3id.org/nfdi4ing/metadata4ing#') quantitykind = rdflib.Namespace('http://qudt.org/vocab/quantitykind/') qudt = rdflib.Namespace('http://qudt.org/schema/qudt/') diff --git a/src/soil/stream.py b/src/soil/stream.py index 2cba19d..d779511 100644 --- a/src/soil/stream.py +++ b/src/soil/stream.py @@ -1,5 +1,6 @@ import datetime import json +import traceback from abc import ABC, abstractmethod from typing import List, Callable, Any, Union, Dict @@ -7,8 +8,11 @@ import rdflib from wzl.mqtt.client import MQTTPublisher from . import figure +from .component import Component # from .component import Component from .event import Event +from .figure import Figure +from .semantics import Namespaces from ..utils import root_logger from ..utils import serialize @@ -87,7 +91,7 @@ class Job(ABC): def stop(self) -> None: self._next = None - def _retrieve_metadata(self, model: 'Component' = None): + def _retrieve_metadata(self, model: Component = None): if model is None: return {} try: @@ -97,15 +101,15 @@ class Job(ABC): return {} return metadata - def _retrieve_semantic_metadata(self, model: 'Component' = None): + def _retrieve_semantic_metadata(self, model: Component = None) -> (str, rdflib.Graph): if model is None: - return rdflib.Graph() + return "", rdflib.Graph() try: uuids = self.topic.split('/') - metadata = model.__getitem__(uuids).serialize_semantics([], 'data') + element = model.__getitem__(uuids) + return element.semantic_name, element.serialize_semantics('data') except Exception: - return rdflib.Graph() - return metadata + return "", rdflib.Graph() def data(self, model: Dict = None) -> Dict: try: @@ -117,25 +121,22 @@ class Job(ABC): except Exception as e: raise JobError() - def semantic_data(self, model: Dict = None) -> Dict: + def semantic_data(self, model: Dict = None) -> (str, rdflib.Graph): try: - data = self._retrieve_semantic_metadata(model) + url, data = self._retrieve_semantic_metadata(model) + measurement_subject = list((data.subjects(predicate=Namespaces.rdf.type, object=Namespaces.soil.Measurement)))[0] # TODO set mqtt topic as identifier - triples = list(self._metadata.triples((None, rdflib.URIRef("http://schema.org/name"), None))) - assert (len(triples) == 1) - triple = triples[0] - - subject = triple[0] - predicate = rdflib.URIRef("http://qudt.org/schema/qudt/value") - object = rdflib.Literal(self.value) + # replace value + data.remove((None, Namespaces.qudt.value, None)) + data.add((measurement_subject, Namespaces.qudt.value, Figure.serialize_value(data, self.value))) - # TODO creating the Measurement + # replace timestamp + data.remove((None, Namespaces.schema.dateCreated, None)) + data.add((measurement_subject, Namespaces.schema.dateCreated, rdflib.Literal(datetime.datetime.now()))) - # data.add(()) - # data['value'] = self.value - # data['timestamp'] = figure.serialize_time(datetime.datetime.now()) - return data + # TODO add the uncertainty/covariance + return url, data except Exception as e: raise JobError() @@ -309,18 +310,31 @@ class StreamScheduler(object): for job in self._schedule: try: if job.is_triggered(now): + # send syntactic data package for publisher in self._publishers: if self._dataformat == 'json': message = json.dumps(job.data(self._model)) elif self._dataformat == 'xml': message = serialize.to_xml(job.type, job.data(self._model)) publisher.publish(job.topic, message, 1) + + # try to send semantic data package + try: + url, semantic_data = job.semantic_data(self._model) + if self._dataformat == 'json': + message = semantic_data.serialize(format='json-ld') + elif self._dataformat == 'xml': + message = semantic_data.serialize(format='xml') + publisher.publish(url, message, 1) + except JobError: + pass + job.schedule() next = job.determine_next(next) except JobError: - pass # logger.error(traceback.format_exc()) # job.stop() + pass if next is None: next = now + datetime.timedelta(seconds=10) -- GitLab