diff --git a/README.md b/README.md index 8c2e4d49b6f909823659ad4eee6c5cd5922cf6f1..ca76c2f68f97a23452397af837bc044b5f445433 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/commits/master) # Python Unified Device Interface -Current stable version: 9.1.2 +Current stable version: 9.2.0 ## Installation 1. Install the WZL-UDI package via pip diff --git a/setup.py b/setup.py index ec90b99d533467a6fd8b60f9f796f0ec9a26335d..5441d0f88521051b31c87dbe66cf46ac5100f57e 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ with open("README.md", "r", encoding="utf-8") as fh: long_description = fh.read() setup(name='wzl-udi', - version='9.1.2', + version='9.2.0', url='https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python', project_urls={ "Bug Tracker": "https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python/-/issues", diff --git a/src/http/server.py b/src/http/server.py index c6a3d2e164cc78fc2304a6d56f87f8de64d1a774..b8fb03079cce21802a696b02d3629b9a534d5696 100644 --- a/src/http/server.py +++ b/src/http/server.py @@ -10,6 +10,7 @@ from aiohttp import web from aiohttp.web import middleware from aiohttp.web_request import Request from multidict import MultiDict +from wzl.mqtt import MQTTPublisher from .error import ServerException from ..soil.component import Component @@ -60,7 +61,7 @@ class HTTPServer(object): """ def __init__(self, loop: asyncio.AbstractEventLoop, host: str, port: int, model: Component, - dataformat: str = 'json', legacy_mode=False, scheduler: StreamScheduler = None): + dataformat: str = 'json', legacy_mode=False, scheduler: StreamScheduler = None, publisher: MQTTPublisher = None): """Constructor Args: @@ -82,6 +83,7 @@ class HTTPServer(object): self._dataformat = dataformat self._legacy_mode = legacy_mode self._scheduler = scheduler + self._publisher = publisher self.app = web.Application(loop=self.loop, middlewares=[cors]) @@ -223,7 +225,16 @@ class HTTPServer(object): if isinstance(item, Function): try: - response = await item.invoke(data["arguments"], topic='/'.join(uuids)) + if item.publishes: + # generator = item.invoke_generator(data["arguments"]) + try: + async for item in item.invoke_generator(data["arguments"]): + self._publisher.publish('/'.join(uuids), json.dumps(item)) + response = {} + except StopAsyncIteration: + pass + else: + response = await item.invoke(data["arguments"]) status = 200 logger.info('Response: {}'.format(response)) except (DeviceException, ServerException, UserException) as e: diff --git a/src/soil/function.py b/src/soil/function.py index fd44b02b2f375c22fd32047b4950609befbc15d1..504a5b381455a54869e5df55be2e83403633f357 100644 --- a/src/soil/function.py +++ b/src/soil/function.py @@ -1,4 +1,5 @@ import inspect +import json from typing import Any, Dict, List, Union, Callable import rdflib @@ -41,6 +42,10 @@ class Function(Element): self._signature = {'arguments': signature_arguments, 'returns': [ret['uuid'] for ret in self._returns]} # self._mqtt_callback = implementation['mqtt_callback'] if 'mqtt_callback' in implementation.keys() else None + @property + def publishes(self): + return inspect.isasyncgenfunction(self._implementation) or inspect.isgeneratorfunction(self._implementation) + def __getitem__(self, item: Union[str, List[str]], method: int = HTTP_GET) -> Any: if item == "arguments": return self._arguments @@ -82,32 +87,8 @@ class Function(Element): else: super().__setitem__(key, value) - async def invoke(self, arguments: List[Figure], topic) -> Dict[str, List[Dict[str, Any]]]: + def _prepare_invocation_result(self, result: Any) -> Dict[str, List[Dict[str, Any]]]: returns = {"returns": []} - args = {} - if self._implementation is None: - raise NotImplementedException(self._uuid, self._name) - - for a in arguments: - var = self.__getitem__([a["uuid"]]) - Figure.check_all(var.datatype, var.dimension, var.range, a["value"]) - args[self._signature['arguments'][a["uuid"]]] = a["value"] - - # set up servers - try: - if inspect.iscoroutinefunction(self._implementation): - # if self._mqtt_callback is not None: - # result = await self._implementation(functools.partial(self._mqtt_callback, topic), **args) - # else: - result = await self._implementation(**args) - else: - # if self._mqtt_callback is not None: - # result = self._implementation(functools.partial(self._mqtt_callback, topic), **args) - # else: - result = self._implementation(**args) - except Exception as e: - raise DeviceException(str(e), predecessor=e) - if result is not None: # if only one element is returned encapsulate result with tuple to make for-loop working if len(self._signature['returns']) == 1: @@ -127,9 +108,61 @@ class Function(Element): var = var[0] Figure.check_all(var.datatype, var.dimension, var.range, value) returns['returns'] += [{'uuid': uuid, 'value': value}] - return returns + async def invoke_generator(self, arguments: List[Figure]) -> Dict[str, List[Dict[str, Any]]]: + returns = {"returns": []} + args = {} + if self._implementation is None: + raise NotImplementedException(self._uuid, self._name) + + for a in arguments: + var = self.__getitem__([a["uuid"]]) + Figure.check_all(var.datatype, var.dimension, var.range, a["value"]) + args[self._signature['arguments'][a["uuid"]]] = a["value"] + + try: + if inspect.isasyncgenfunction(self._implementation): + generator = self._implementation(**args) + while True: + try: + result = await anext(generator) + yield self._prepare_invocation_result(result) + except StopAsyncIteration as e: + raise e + else: + assert inspect.isgeneratorfunction(self._implementation) + generator = self._implementation(**args) + while True: + try: + result = next(generator) + yield self._prepare_invocation_result(result) + except StopIteration as e: + raise e + except Exception as e: + raise DeviceException(str(e), predecessor=e) + + async def invoke(self, arguments: List[Figure]) -> Dict[str, List[Dict[str, Any]]]: + args = {} + if self._implementation is None: + raise NotImplementedException(self._uuid, self._name) + + for a in arguments: + var = self.__getitem__([a["uuid"]]) + Figure.check_all(var.datatype, var.dimension, var.range, a["value"]) + args[self._signature['arguments'][a["uuid"]]] = a["value"] + + # set up servers + try: + if inspect.iscoroutinefunction(self._implementation): + result = await self._implementation(**args) + else: + result = self._implementation(**args) + except Exception as e: + raise DeviceException(str(e), predecessor=e) + + return self._prepare_invocation_result(result) + def serialize(self, keys: List[str], legacy_mode: bool, method: int = HTTP_GET) -> Dict[str, Any]: if not keys or 'all' in keys: keys = ['uuid', 'name', 'description', 'arguments', 'returns', 'ontology']