Skip to content
Snippets Groups Projects
Commit de701715 authored by Matthias Stefan Bodenbenner's avatar Matthias Stefan Bodenbenner
Browse files

implemented possibility to publish messages in functions

parent 16524643
No related branches found
No related tags found
No related merge requests found
[![Build](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/badges/master/pipeline.svg)](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/commits/master) [![Build](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/badges/master/pipeline.svg)](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/commits/master)
# Python Unified Device Interface # Python Unified Device Interface
Current stable version: 9.1.2 Current stable version: 9.2.0
## Installation ## Installation
1. Install the WZL-UDI package via pip 1. Install the WZL-UDI package via pip
......
...@@ -4,7 +4,7 @@ with open("README.md", "r", encoding="utf-8") as fh: ...@@ -4,7 +4,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read() long_description = fh.read()
setup(name='wzl-udi', setup(name='wzl-udi',
version='9.1.2', version='9.2.0',
url='https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python', url='https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python',
project_urls={ project_urls={
"Bug Tracker": "https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python/-/issues", "Bug Tracker": "https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python/-/issues",
......
...@@ -10,6 +10,7 @@ from aiohttp import web ...@@ -10,6 +10,7 @@ from aiohttp import web
from aiohttp.web import middleware from aiohttp.web import middleware
from aiohttp.web_request import Request from aiohttp.web_request import Request
from multidict import MultiDict from multidict import MultiDict
from wzl.mqtt import MQTTPublisher
from .error import ServerException from .error import ServerException
from ..soil.component import Component from ..soil.component import Component
...@@ -60,7 +61,7 @@ class HTTPServer(object): ...@@ -60,7 +61,7 @@ class HTTPServer(object):
""" """
def __init__(self, loop: asyncio.AbstractEventLoop, host: str, port: int, model: Component, def __init__(self, loop: asyncio.AbstractEventLoop, host: str, port: int, model: Component,
dataformat: str = 'json', legacy_mode=False, scheduler: StreamScheduler = None): dataformat: str = 'json', legacy_mode=False, scheduler: StreamScheduler = None, publisher: MQTTPublisher = None):
"""Constructor """Constructor
Args: Args:
...@@ -82,6 +83,7 @@ class HTTPServer(object): ...@@ -82,6 +83,7 @@ class HTTPServer(object):
self._dataformat = dataformat self._dataformat = dataformat
self._legacy_mode = legacy_mode self._legacy_mode = legacy_mode
self._scheduler = scheduler self._scheduler = scheduler
self._publisher = publisher
self.app = web.Application(loop=self.loop, middlewares=[cors]) self.app = web.Application(loop=self.loop, middlewares=[cors])
...@@ -223,7 +225,16 @@ class HTTPServer(object): ...@@ -223,7 +225,16 @@ class HTTPServer(object):
if isinstance(item, Function): if isinstance(item, Function):
try: try:
response = await item.invoke(data["arguments"], topic='/'.join(uuids)) if item.publishes:
# generator = item.invoke_generator(data["arguments"])
try:
async for item in item.invoke_generator(data["arguments"]):
self._publisher.publish('/'.join(uuids), json.dumps(item))
response = {}
except StopAsyncIteration:
pass
else:
response = await item.invoke(data["arguments"])
status = 200 status = 200
logger.info('Response: {}'.format(response)) logger.info('Response: {}'.format(response))
except (DeviceException, ServerException, UserException) as e: except (DeviceException, ServerException, UserException) as e:
......
import inspect import inspect
import json
from typing import Any, Dict, List, Union, Callable from typing import Any, Dict, List, Union, Callable
import rdflib import rdflib
...@@ -41,6 +42,10 @@ class Function(Element): ...@@ -41,6 +42,10 @@ class Function(Element):
self._signature = {'arguments': signature_arguments, 'returns': [ret['uuid'] for ret in self._returns]} self._signature = {'arguments': signature_arguments, 'returns': [ret['uuid'] for ret in self._returns]}
# self._mqtt_callback = implementation['mqtt_callback'] if 'mqtt_callback' in implementation.keys() else None # self._mqtt_callback = implementation['mqtt_callback'] if 'mqtt_callback' in implementation.keys() else None
@property
def publishes(self):
return inspect.isasyncgenfunction(self._implementation) or inspect.isgeneratorfunction(self._implementation)
def __getitem__(self, item: Union[str, List[str]], method: int = HTTP_GET) -> Any: def __getitem__(self, item: Union[str, List[str]], method: int = HTTP_GET) -> Any:
if item == "arguments": if item == "arguments":
return self._arguments return self._arguments
...@@ -82,32 +87,8 @@ class Function(Element): ...@@ -82,32 +87,8 @@ class Function(Element):
else: else:
super().__setitem__(key, value) super().__setitem__(key, value)
async def invoke(self, arguments: List[Figure], topic) -> Dict[str, List[Dict[str, Any]]]: def _prepare_invocation_result(self, result: Any) -> Dict[str, List[Dict[str, Any]]]:
returns = {"returns": []} returns = {"returns": []}
args = {}
if self._implementation is None:
raise NotImplementedException(self._uuid, self._name)
for a in arguments:
var = self.__getitem__([a["uuid"]])
Figure.check_all(var.datatype, var.dimension, var.range, a["value"])
args[self._signature['arguments'][a["uuid"]]] = a["value"]
# set up servers
try:
if inspect.iscoroutinefunction(self._implementation):
# if self._mqtt_callback is not None:
# result = await self._implementation(functools.partial(self._mqtt_callback, topic), **args)
# else:
result = await self._implementation(**args)
else:
# if self._mqtt_callback is not None:
# result = self._implementation(functools.partial(self._mqtt_callback, topic), **args)
# else:
result = self._implementation(**args)
except Exception as e:
raise DeviceException(str(e), predecessor=e)
if result is not None: if result is not None:
# if only one element is returned encapsulate result with tuple to make for-loop working # if only one element is returned encapsulate result with tuple to make for-loop working
if len(self._signature['returns']) == 1: if len(self._signature['returns']) == 1:
...@@ -127,9 +108,61 @@ class Function(Element): ...@@ -127,9 +108,61 @@ class Function(Element):
var = var[0] var = var[0]
Figure.check_all(var.datatype, var.dimension, var.range, value) Figure.check_all(var.datatype, var.dimension, var.range, value)
returns['returns'] += [{'uuid': uuid, 'value': value}] returns['returns'] += [{'uuid': uuid, 'value': value}]
return returns return returns
async def invoke_generator(self, arguments: List[Figure]) -> Dict[str, List[Dict[str, Any]]]:
returns = {"returns": []}
args = {}
if self._implementation is None:
raise NotImplementedException(self._uuid, self._name)
for a in arguments:
var = self.__getitem__([a["uuid"]])
Figure.check_all(var.datatype, var.dimension, var.range, a["value"])
args[self._signature['arguments'][a["uuid"]]] = a["value"]
try:
if inspect.isasyncgenfunction(self._implementation):
generator = self._implementation(**args)
while True:
try:
result = await anext(generator)
yield self._prepare_invocation_result(result)
except StopAsyncIteration as e:
raise e
else:
assert inspect.isgeneratorfunction(self._implementation)
generator = self._implementation(**args)
while True:
try:
result = next(generator)
yield self._prepare_invocation_result(result)
except StopIteration as e:
raise e
except Exception as e:
raise DeviceException(str(e), predecessor=e)
async def invoke(self, arguments: List[Figure]) -> Dict[str, List[Dict[str, Any]]]:
args = {}
if self._implementation is None:
raise NotImplementedException(self._uuid, self._name)
for a in arguments:
var = self.__getitem__([a["uuid"]])
Figure.check_all(var.datatype, var.dimension, var.range, a["value"])
args[self._signature['arguments'][a["uuid"]]] = a["value"]
# set up servers
try:
if inspect.iscoroutinefunction(self._implementation):
result = await self._implementation(**args)
else:
result = self._implementation(**args)
except Exception as e:
raise DeviceException(str(e), predecessor=e)
return self._prepare_invocation_result(result)
def serialize(self, keys: List[str], legacy_mode: bool, method: int = HTTP_GET) -> Dict[str, Any]: def serialize(self, keys: List[str], legacy_mode: bool, method: int = HTTP_GET) -> Dict[str, Any]:
if not keys or 'all' in keys: if not keys or 'all' in keys:
keys = ['uuid', 'name', 'description', 'arguments', 'returns', 'ontology'] keys = ['uuid', 'name', 'description', 'arguments', 'returns', 'ontology']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment