Skip to content
Snippets Groups Projects
Commit ee0d9b5a authored by Matthias Stefan Bodenbenner's avatar Matthias Stefan Bodenbenner
Browse files

implemented publishing semantic data

parent e6f44405
Branches
Tags
No related merge requests found
...@@ -414,6 +414,9 @@ class Component(Element): ...@@ -414,6 +414,9 @@ class Component(Element):
raise ChildNotFoundException('Could not resolve the semantic path.') raise ChildNotFoundException('Could not resolve the semantic path.')
@property
def semantic_name(self) -> str:
if self._metadata is None:
return ""
subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.ssn.System))
return subject.toPython()
\ No newline at end of file
...@@ -31,4 +31,3 @@ class Datatype(enum.Enum): ...@@ -31,4 +31,3 @@ class Datatype(enum.Enum):
if legacy_mode: if legacy_mode:
return ["bool", "int", "double", "string", "time", "enum"][self.value] return ["bool", "int", "double", "string", "time", "enum"][self.value]
return ["boolean", "int", "float", "string", "time", "enum"][self.value] return ["boolean", "int", "float", "string", "time", "enum"][self.value]
...@@ -6,7 +6,6 @@ from typing import Any, Dict, List ...@@ -6,7 +6,6 @@ from typing import Any, Dict, List
import rdflib import rdflib
from .error import ChildNotFoundException from .error import ChildNotFoundException
from .semantics import Namespaces
from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET
from ..utils.error import SerialisationException from ..utils.error import SerialisationException
...@@ -122,3 +121,7 @@ class Element(ABC): ...@@ -122,3 +121,7 @@ class Element(ABC):
raise ChildNotFoundException('Could not resolve the semantic path.') raise ChildNotFoundException('Could not resolve the semantic path.')
@property
@abstractmethod
def semantic_name(self) -> str:
...
from enum import auto, Flag, IntEnum
import datetime import datetime
from enum import auto, Flag, IntEnum
from .datatype import Datatype from .datatype import Datatype
......
import asyncio import asyncio
import copy
import datetime import datetime
import inspect import inspect
import time import time
...@@ -293,7 +292,8 @@ class Figure(Element, ABC): ...@@ -293,7 +292,8 @@ class Figure(Element, ABC):
if isinstance(value, list): if isinstance(value, list):
blank_node = rdflib.BNode() blank_node = rdflib.BNode()
data_graph.add((blank_node, Namespaces.rdf.rest, Namespaces.rdf.nil)) data_graph.add((blank_node, Namespaces.rdf.rest, Namespaces.rdf.nil))
data_graph.add((blank_node, Namespaces.rdf.first, Figure.serialize_value(data_graph, value[len(value) - 1]))) data_graph.add(
(blank_node, Namespaces.rdf.first, Figure.serialize_value(data_graph, value[len(value) - 1])))
for entry in reversed(value[:-1]): for entry in reversed(value[:-1]):
new_blank_node = rdflib.BNode() new_blank_node = rdflib.BNode()
data_graph.add((new_blank_node, Namespaces.rdf.rest, blank_node)) data_graph.add((new_blank_node, Namespaces.rdf.rest, blank_node))
......
...@@ -198,3 +198,7 @@ class Function(Element): ...@@ -198,3 +198,7 @@ class Function(Element):
def resolve_semantic_path(self, path: List[str]) -> ('Element', str): def resolve_semantic_path(self, path: List[str]) -> ('Element', str):
# This method does nothing intentionally, as we do not have any semantic definition for function # This method does nothing intentionally, as we do not have any semantic definition for function
raise ChildNotFoundException('Could not resolve the semantic path.') raise ChildNotFoundException('Could not resolve the semantic path.')
@property
def semantic_name(self) -> str:
return ""
\ No newline at end of file
...@@ -6,6 +6,7 @@ import rdflib ...@@ -6,6 +6,7 @@ import rdflib
from deprecated import deprecated from deprecated import deprecated
from .datatype import Datatype from .datatype import Datatype
from .element import Element
from .error import ChildNotFoundException from .error import ChildNotFoundException
from .figure import Figure from .figure import Figure
from .semantics import Semantics, Namespaces from .semantics import Semantics, Namespaces
...@@ -222,7 +223,7 @@ class Measurement(Figure): ...@@ -222,7 +223,7 @@ class Measurement(Figure):
raise DeviceException('The provided kind of semantic information cannot be returned.') raise DeviceException('The provided kind of semantic information cannot be returned.')
return result return result
def resolve_semantic_path(self, path: List[str]) -> ('Element', str): def resolve_semantic_path(self, path: List[str]) -> (Element, str):
try: try:
super().resolve_semantic_path(path) super().resolve_semantic_path(path)
except ChildNotFoundException: except ChildNotFoundException:
...@@ -234,3 +235,10 @@ class Measurement(Figure): ...@@ -234,3 +235,10 @@ class Measurement(Figure):
return self, 'metadata' return self, 'metadata'
raise ChildNotFoundException('Could not resolve the semantic path.') raise ChildNotFoundException('Could not resolve the semantic path.')
@property
def semantic_name(self) -> str:
if self._metadata is None:
return ""
subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.sosa.ObservableProperty))
return subject.toPython()
...@@ -166,3 +166,10 @@ class Parameter(Figure): ...@@ -166,3 +166,10 @@ class Parameter(Figure):
return self, 'metadata' return self, 'metadata'
raise ChildNotFoundException('Could not resolve the semantic path.') raise ChildNotFoundException('Could not resolve the semantic path.')
@property
def semantic_name(self) -> str:
if self._metadata is None:
return ""
subject = next(self._metadata.subjects(predicate=Namespaces.rdf.type, object=Namespaces.ssn.Property))
return subject.toPython()
...@@ -2,7 +2,6 @@ import rdflib ...@@ -2,7 +2,6 @@ import rdflib
class Semantics(object): class Semantics(object):
prefix: str = None prefix: str = None
url: str = None url: str = None
namespace: rdflib.Namespace = None namespace: rdflib.Namespace = None
...@@ -14,7 +13,6 @@ class Semantics(object): ...@@ -14,7 +13,6 @@ class Semantics(object):
class Namespaces(object): class Namespaces(object):
m4i = rdflib.Namespace('http://w3id.org/nfdi4ing/metadata4ing#') m4i = rdflib.Namespace('http://w3id.org/nfdi4ing/metadata4ing#')
quantitykind = rdflib.Namespace('http://qudt.org/vocab/quantitykind/') quantitykind = rdflib.Namespace('http://qudt.org/vocab/quantitykind/')
qudt = rdflib.Namespace('http://qudt.org/schema/qudt/') qudt = rdflib.Namespace('http://qudt.org/schema/qudt/')
......
import datetime import datetime
import json import json
import traceback
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Callable, Any, Union, Dict from typing import List, Callable, Any, Union, Dict
...@@ -7,8 +8,11 @@ import rdflib ...@@ -7,8 +8,11 @@ import rdflib
from wzl.mqtt.client import MQTTPublisher from wzl.mqtt.client import MQTTPublisher
from . import figure from . import figure
from .component import Component
# from .component import Component # from .component import Component
from .event import Event from .event import Event
from .figure import Figure
from .semantics import Namespaces
from ..utils import root_logger from ..utils import root_logger
from ..utils import serialize from ..utils import serialize
...@@ -87,7 +91,7 @@ class Job(ABC): ...@@ -87,7 +91,7 @@ class Job(ABC):
def stop(self) -> None: def stop(self) -> None:
self._next = None self._next = None
def _retrieve_metadata(self, model: 'Component' = None): def _retrieve_metadata(self, model: Component = None):
if model is None: if model is None:
return {} return {}
try: try:
...@@ -97,15 +101,15 @@ class Job(ABC): ...@@ -97,15 +101,15 @@ class Job(ABC):
return {} return {}
return metadata return metadata
def _retrieve_semantic_metadata(self, model: 'Component' = None): def _retrieve_semantic_metadata(self, model: Component = None) -> (str, rdflib.Graph):
if model is None: if model is None:
return rdflib.Graph() return "", rdflib.Graph()
try: try:
uuids = self.topic.split('/') uuids = self.topic.split('/')
metadata = model.__getitem__(uuids).serialize_semantics([], 'data') element = model.__getitem__(uuids)
return element.semantic_name, element.serialize_semantics('data')
except Exception: except Exception:
return rdflib.Graph() return "", rdflib.Graph()
return metadata
def data(self, model: Dict = None) -> Dict: def data(self, model: Dict = None) -> Dict:
try: try:
...@@ -117,25 +121,22 @@ class Job(ABC): ...@@ -117,25 +121,22 @@ class Job(ABC):
except Exception as e: except Exception as e:
raise JobError() raise JobError()
def semantic_data(self, model: Dict = None) -> Dict: def semantic_data(self, model: Dict = None) -> (str, rdflib.Graph):
try: try:
data = self._retrieve_semantic_metadata(model) url, data = self._retrieve_semantic_metadata(model)
measurement_subject = list((data.subjects(predicate=Namespaces.rdf.type, object=Namespaces.soil.Measurement)))[0]
# TODO set mqtt topic as identifier # TODO set mqtt topic as identifier
triples = list(self._metadata.triples((None, rdflib.URIRef("http://schema.org/name"), None))) # replace value
assert (len(triples) == 1) data.remove((None, Namespaces.qudt.value, None))
triple = triples[0] data.add((measurement_subject, Namespaces.qudt.value, Figure.serialize_value(data, self.value)))
subject = triple[0]
predicate = rdflib.URIRef("http://qudt.org/schema/qudt/value")
object = rdflib.Literal(self.value)
# TODO creating the Measurement # replace timestamp
data.remove((None, Namespaces.schema.dateCreated, None))
data.add((measurement_subject, Namespaces.schema.dateCreated, rdflib.Literal(datetime.datetime.now())))
# data.add(()) # TODO add the uncertainty/covariance
# data['value'] = self.value return url, data
# data['timestamp'] = figure.serialize_time(datetime.datetime.now())
return data
except Exception as e: except Exception as e:
raise JobError() raise JobError()
...@@ -309,18 +310,31 @@ class StreamScheduler(object): ...@@ -309,18 +310,31 @@ class StreamScheduler(object):
for job in self._schedule: for job in self._schedule:
try: try:
if job.is_triggered(now): if job.is_triggered(now):
# send syntactic data package
for publisher in self._publishers: for publisher in self._publishers:
if self._dataformat == 'json': if self._dataformat == 'json':
message = json.dumps(job.data(self._model)) message = json.dumps(job.data(self._model))
elif self._dataformat == 'xml': elif self._dataformat == 'xml':
message = serialize.to_xml(job.type, job.data(self._model)) message = serialize.to_xml(job.type, job.data(self._model))
publisher.publish(job.topic, message, 1) publisher.publish(job.topic, message, 1)
# try to send semantic data package
try:
url, semantic_data = job.semantic_data(self._model)
if self._dataformat == 'json':
message = semantic_data.serialize(format='json-ld')
elif self._dataformat == 'xml':
message = semantic_data.serialize(format='xml')
publisher.publish(url, message, 1)
except JobError:
pass
job.schedule() job.schedule()
next = job.determine_next(next) next = job.determine_next(next)
except JobError: except JobError:
pass
# logger.error(traceback.format_exc()) # logger.error(traceback.format_exc())
# job.stop() # job.stop()
pass
if next is None: if next is None:
next = now + datetime.timedelta(seconds=10) next = now + datetime.timedelta(seconds=10)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment