Skip to content
Snippets Groups Projects
Select Git revision
  • c06bd998dd75ab178f81445e77178f3ce10f617c
  • 5.4 default protected
  • 5.5
  • dev/5.5
  • dev/5.4
  • dev/5.3_downgrade
  • feature/experimenttime_hack
  • 5.3 protected
  • _IntenSelect5.3
  • IntenSelect5.3
  • 4.27 protected
  • 4.26 protected
  • 5.0 protected
  • 4.22 protected
  • 4.21 protected
  • UE5.4-2024.1
  • UE5.4-2024.1-rc1
  • UE5.3-2023.1-rc3
  • UE5.3-2023.1-rc2
  • UE5.3-2023.1-rc
20 results

BP_IntenselectableObject.uasset

Blame
  • server.py 19.22 KiB
    # -*- coding: utf-8 -*-
    import asyncio
    import functools
    import json
    import os
    import traceback
    from typing import Dict, Union
    
    import rdflib
    from aiohttp import web
    from aiohttp.web import middleware
    from aiohttp.web_request import Request
    from multidict import MultiDict
    from wzl.mqtt import MQTTPublisher
    
    from .error import ServerException
    from ..soil.component import Component
    from ..soil.element import Element
    from ..soil.error import InvokationException, ReadOnlyException, ChildNotFoundException
    from ..soil.function import Function
    from ..soil.measurement import Measurement
    from ..soil.parameter import Parameter
    from ..soil.semantics import Semantics, Namespaces
    from ..soil.stream import StreamScheduler
    from ..soil.variable import Variable
    from ..utils import root_logger
    from ..utils import serialize
    from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET, HTTP_OPTIONS
    from ..utils.error import DeviceException, UserException, PathResolutionException
    from ..utils.resources import ResourceType
    
    # Module-level logger, obtained from the project's root_logger helper under this module's name.
    logger = root_logger.get(__name__)
    
    
    @middleware
    async def cors(request, handler):
        """aiohttp middleware adding CORS headers to every response.

        Preflight requests (identified by the 'Access-Control-Request-Method' header)
        are answered directly without invoking the handler. For all other requests the
        handler is invoked and the 'Access-Control-Allow-Origin' header is attached to
        its response. Unexpected exceptions raised by the handler are converted to a
        JSON response with status 500.

        Args:
            request: The incoming aiohttp request.
            handler: The next handler in the middleware chain.

        Returns:
            The (possibly newly created) response carrying the CORS headers.

        Raises:
            web.HTTPException: HTTP errors signalled by aiohttp (e.g. 404 for unknown
                routes) are re-raised with the CORS header attached, so that their
                original status code is preserved.
        """
        logger.info("CORS Middleware handles request from {}".format(request.url))
        logger.debug('Request Headers: {}'.format(request.headers))
        # fall back to the wildcard if the client did not send an Origin header
        origin = request.headers.get('Origin', "*")

        # check if the request is a preflight request; all non-simple methods served
        # by this application must be answered, otherwise browsers block PUT and DELETE
        if request.headers.get('Access-Control-Request-Method') in ("POST", "PATCH", "PUT", "DELETE"):
            response = web.Response()
            response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, PATCH'
            # the header is optional, a preflight without it must not fail
            requested_headers = request.headers.get('Access-Control-Request-Headers')
            if requested_headers is not None:
                response.headers['Access-Control-Allow-Headers'] = requested_headers
            response.headers['Access-Control-Allow-Origin'] = origin
            logger.debug('Preflight Response Headers :{}'.format(response.headers))
            return response

        try:
            response = await handler(request)
        except web.HTTPException as e:
            # aiohttp reports results such as 404/405 via exceptions; keep their status
            # code instead of masking them as internal server errors
            e.headers['Access-Control-Allow-Origin'] = origin
            raise
        except Exception as e:
            logger.error(traceback.format_exc())
            response = web.json_response({"description": str(e)}, status=500)
            logger.error(["[CORS] {} at {}".format(str(e), request.url)])

        # deliberately not inside a 'finally' block: returning from 'finally' would
        # silently swallow cancellation and other BaseExceptions
        response.headers['Access-Control-Allow-Origin'] = origin
        logger.debug('Response Headers :{}'.format(response.headers))
        return response
    
    
    class HTTPServer(object):
        """Provides a web application as of the aiohttp-library.

        The application exposes the SOIL model handed to the constructor via the HTTP
        methods GET, POST, DELETE, OPTIONS, PUT and PATCH. Elements are addressed by
        the chain of their UUIDs in the URL path, optionally prefixed by '/objects'.
        """

        # query keys which select attributes of an element for serialization
        _QUERYABLE_ATTRIBUTES = ('uuid', 'name', 'description', 'datatype', 'range', 'value', 'constant',
                                 'timestamp', 'dimension', 'unit', 'covariance', 'label', 'children',
                                 'arguments', 'returns', 'ontology')

        # prefixes of the last URL part which identify an element of the SOIL model
        _ELEMENT_PREFIXES = ('COM', 'FUN', 'PAR', 'MEA', 'ARG', 'RET')

        def __init__(self, loop: asyncio.AbstractEventLoop, host: str, port: int, model: Component,
                     dataformat: str = 'json', legacy_mode=False, scheduler: StreamScheduler = None,
                     publisher: MQTTPublisher = None, profiles_path: str = None):
            """Constructor

            Registers all routes and starts serving via web.run_app(...), i.e. the call
            only returns once the server has been stopped.

            Args:
                loop: The asyncio-event-loop used to execute the server.
                host: Hostname of the server, i.e. a URL such as 'localhost'.
                port: Port the server should run at.
                model: The root component of the SOIL model, should be initialized via Component.load(...)
                dataformat: String specifying the dataformat of the responses of the server, either 'json' (default) or 'xml'.
                legacy_mode: If true, legacy datatype names are used for serialization (e.g. "bool" instead of "boolean").
                scheduler: Stream handler. Is required to hand over jobs of streams and events for dynamic components, if created at runtime.
                publisher: MQTT publisher used to publish the results yielded by publishing functions invoked via POST.
                profiles_path: Directory containing the '.shacl.ttl' files of the metadata profiles served via GET.

            Raises:
                ValueError: If dataformat is neither 'json' nor 'xml'.
            """
            if dataformat not in ['json', 'xml']:
                raise ValueError('Dataformat must be one of "json" or "xml".')

            self.loop = loop
            self.host = host
            self.port = port
            self.root = model
            self._dataformat = dataformat
            self._legacy_mode = legacy_mode
            self._scheduler = scheduler
            self._publisher = publisher
            self._profiles_path = profiles_path

            self.app = web.Application(loop=self.loop, middlewares=[cors])

            # define two routes for each request to make the 'objects' part optional
            objects_route = r'/objects{uuids:(/' + BASE_UUID_PATTERN + r')*/?}'
            plain_route = r'/{uuids:(' + BASE_UUID_PATTERN + r'($|/))*}'
            router = self.app.router
            registrations = (
                (router.add_get, self.get),
                (router.add_post, self.post),
                (router.add_delete, self.delete),
                (functools.partial(router.add_route, 'OPTIONS'), self.options),
                (router.add_put, self.put),
                (router.add_patch, self.patch),
            )
            for register, handler in registrations:
                register(objects_route, handler)
                register(plain_route, handler)

            # log before starting, as run_app blocks until the server is shut down
            logger.info('HTTP-Server serving on {}:{}'.format(host, port))
            web.run_app(self.app, host=self.host, port=self.port, loop=loop)

        @staticmethod
        def analyze_request_url(request: Request) -> ResourceType:
            """Derives the type of the requested resource from the URL of the request.

            Args:
                request: The incoming request.

            Returns:
                ResourceType.profile or ResourceType.metadata for two-part paths starting with
                the semantic prefix, ResourceType.element for the root path and for paths ending
                with an element UUID, ResourceType.metadata otherwise.
            """
            assert request.url.parts[0] == '/'

            url_parts = request.url.parts
            # ignore the empty part caused by a trailing slash
            if url_parts[-1] == '':
                url_parts = url_parts[:-1]

            if len(url_parts) == 3 and url_parts[1] == Semantics.prefix:
                return ResourceType.profile if 'Profile' in url_parts[-1] else ResourceType.metadata

            if request.url.path == '/' or url_parts[-1].startswith(HTTPServer._ELEMENT_PREFIXES):
                return ResourceType.element

            return ResourceType.metadata

        @staticmethod
        def parse_uuids(request: Request):
            """Splits the request URL to extract the FQID of the targeted element of the SOIL-Interface.

            Args:
                request: The incoming request, whose route provides the 'uuids' match.

            Returns:
                List of the non-empty UUIDs contained in the path, in order of appearance.
            """
            uuids = request.match_info.get('uuids', 'uuids')
            return [uuid for uuid in uuids.split('/') if uuid != '']

        @staticmethod
        def _last_path_segment(request: Request) -> str:
            """Returns the last non-empty part of the request path, ignoring one trailing slash."""
            parts = request.url.parts
            return parts[-2] if parts[-1] == '' else parts[-1]

        @staticmethod
        def _xml_root(element: Element, status: int) -> str:
            """Returns the name of the XML root node for the given element and response status."""
            if element is None or not 200 <= status < 300:
                return 'error'
            root_names = ((Component, 'component'), (Function, 'function'), (Measurement, 'measurement'),
                          (Parameter, 'parameter'))
            for element_type, root in root_names:
                if isinstance(element, element_type):
                    return root
            return ''

        def _filter_query(self, query: MultiDict):
            """Returns all keys of the query which name a serializable attribute of an element.

            Args:
                query: Query parameters of the request.

            Returns:
                List of the matching keys in order of their appearance in the query.
            """
            return [key for key in query if key in self._QUERYABLE_ATTRIBUTES]

        def _error_response(self, error: Exception, status: int, request: Request):
            """Logs the exception currently being handled and wraps its message in an error response.

            Must be called from within the except-block handling `error`, as the logged
            traceback is taken from the exception currently being handled.

            Args:
                error: The handled exception.
                status: HTTP status code of the response.
                request: The request which caused the error, provides the query for the response format.

            Returns:
                Response with the body {'error': <message of error>}.
            """
            logger.error(traceback.format_exc())
            response = {'error': str(error)}
            logger.error('Response: {}'.format(response))
            return self.prepare_response(response, None, status=status, query=request.query)

        def prepare_response(self, body: Union[Dict, rdflib.Graph], element: Element, status: int = 200,
                             query: MultiDict = None, semantic: bool = False):
            """Serializes the body to the requested dataformat and wraps it in a response.

            Args:
                body: The payload, either a dictionary or, for semantic resources, an RDF graph.
                element: The element the response is about, determines the root node of XML responses.
                    Ignored for non-successful status codes.
                status: HTTP status code of the response.
                query: Query parameters of the request. The key 'format' ('json', 'xml' or 'turtle')
                    overrides the default dataformat, the key 'semantic' ('data', 'metadata' or
                    'profile') marks the request as semantic.
                semantic: If true, the body is expected to be an RDF graph.

            Returns:
                The response. Requests for Turtle serialization of non-semantic bodies are answered
                with status 400.
            """
            dataformat = self._dataformat
            if query is not None:
                if query.get('format') in ('json', 'xml', 'turtle'):
                    dataformat = query['format']
                if query.get('semantic') in ('data', 'metadata', 'profile'):
                    semantic = True

            # bodies of error responses are plain dictionaries even for semantic requests,
            # so only actual graphs are serialized semantically
            semantic = semantic and isinstance(body, rdflib.Graph)

            if dataformat == 'json':
                if semantic:
                    # rdflib serialization returns a string, so we need to parse it as plain json again to return it properly
                    body = json.loads(body.serialize(format='json-ld'))
                return web.json_response(body, status=status)

            if dataformat == 'xml':
                if semantic:
                    xml = body.serialize(format='xml')
                else:
                    xml = serialize.to_xml(HTTPServer._xml_root(element, status), body)
                return web.Response(text=xml, status=status, content_type='application/xml')

            if dataformat == 'turtle':
                if not semantic:
                    return web.Response(text='Can not serialize non semantic information to Turtle', status=400,
                                        content_type='text/plain')
                return web.Response(text=body.serialize(format='turtle'), status=status, content_type='text/plain')

        async def get(self, request):
            """Handles GET requests for elements, semantic (meta)data and metadata profiles.

            Args:
                request: The incoming request.

            Returns:
                Response with the serialized resource, status 404 if the addressed element or
                profile does not exist, status 500 if serving the resource failed.
            """
            logger.info("GET Request from {}".format(request.url))
            logger.debug('Request: {}'.format(request))
            logger.debug('Query Parameters: {}'.format(request.query_string))

            resource_type = HTTPServer.analyze_request_url(request)
            keys = self._filter_query(request.query)
            # profiles are not bound to an element of the model
            item = None

            try:
                if resource_type == ResourceType.profile:
                    if self._profiles_path is None:
                        raise UserException('Can\'t return requested metadata profile, as no profiles have been created for this sensor service.')

                    profilename = HTTPServer._last_path_segment(request)

                    if len(profilename) > 12 and profilename[-12:-7] == 'Range':
                        filename = profilename.replace('RangeProfile', '.shacl.ttl')
                    else:
                        filename = profilename.replace('Profile', '.shacl.ttl')

                    profile_path = os.path.join(self._profiles_path, filename)
                    if not os.path.isfile(profile_path):
                        response = {'error': 'Profile "{}" does not exist.'.format(profilename)}
                        logger.error('Response: {}'.format(response))
                        return self.prepare_response(response, None, status=404, query=request.query)

                    response = rdflib.Graph()
                    response.parse(profile_path)
                    response.add(
                        (rdflib.URIRef(Semantics.namespace[profilename]), Namespaces.dcterms.license, Semantics.profile_license))

                elif resource_type == ResourceType.metadata or resource_type == ResourceType.data:
                    semantic_name = HTTPServer._last_path_segment(request)

                    item, resource_type = self.root.resolve_semantic_path(semantic_name)
                    recursive = request.query is not None and 'all' in request.query
                    response = item.serialize_semantics(resource_type, recursive)
                else:
                    assert resource_type == ResourceType.element
                    uuids = HTTPServer.parse_uuids(request)

                    if request.query is not None and 'semantic' in request.query and request.query[
                        'semantic'] in ResourceType.semantic_resources:
                        resource_type = ResourceType.from_string(request.query['semantic'])

                    try:
                        item = self.root[uuids]

                        if resource_type.is_semantic():
                            recursive = request.query is not None and 'all' in request.query
                            response = item.serialize_semantics(resource_type, recursive)
                        else:
                            response = item.serialize(keys, self._legacy_mode, HTTP_GET)
                    except KeyError as e:
                        return self._error_response(e, 404, request)

                logger.info('Response: {}'.format(response))
            except (DeviceException, ServerException, UserException) as e:
                return self._error_response(e, 500, request)

            return self.prepare_response(response, item, status=200, query=request.query,
                                         semantic=resource_type.is_semantic())

        async def post(self, request):
            """Handles POST requests, i.e. invokes the addressed function.

            The JSON body of the request must provide the key 'arguments'. The results of
            publishing functions are published via the MQTT publisher instead of being returned.

            Args:
                request: The incoming request.

            Returns:
                Response with the result of the invocation, status 404 if the addressed element
                does not exist, status 405 if it is not a function, status 500 if the invocation failed.
            """
            logger.info("POST Request from {}".format(request.url))
            logger.debug('Request: {}'.format(request))
            data = await request.json()
            logger.debug('Body: {}'.format(data))
            uuids = HTTPServer.parse_uuids(request)

            try:
                item = self.root[uuids]
            except KeyError as e:
                return self._error_response(e, 404, request)

            if not isinstance(item, Function):
                response = {}
                logger.error('Response: {}'.format(response))
                return self.prepare_response(response, item, status=405, query=request.query)

            try:
                if item.publishes:
                    if self._publisher is None:
                        # fail before invoking the function instead of on the first yielded result
                        response = {'error': 'Function publishes its results, but no publisher has been configured.'}
                        logger.error('Response: {}'.format(response))
                        return self.prepare_response(response, None, status=500, query=request.query)

                    topic = '/'.join(uuids)
                    try:
                        # do not reuse the name 'item', the function is still needed for the response
                        async for message in item.invoke_generator(data["arguments"], legacy_mode=self._legacy_mode):
                            self._publisher.publish(topic, json.dumps(message))
                    except RuntimeError as e:
                        # a RuntimeError caused by StopAsyncIteration marks the regular end of the generator
                        if not isinstance(e.__cause__, StopAsyncIteration):
                            raise
                    response = {}
                else:
                    response = await item.invoke(data["arguments"], legacy_mode=self._legacy_mode)
                logger.info('Response: {}'.format(response))
            except (DeviceException, ServerException, UserException) as e:
                return self._error_response(e, 500, request)

            return self.prepare_response(response, item, status=200, query=request.query)

        async def delete(self, request):
            """Handles DELETE requests, i.e. removes the addressed child from its parent component.

            Args:
                request: The incoming request.

            Returns:
                Empty response on success, status 404 if the parent or the child does not exist,
                status 405 if the parent is not a component or no child is addressed, status 500
                if the removal failed.
            """
            logger.info("DELETE Request from {}".format(request.url))
            logger.debug('Request: {}'.format(request))
            uuids = HTTPServer.parse_uuids(request)

            if not uuids:
                # the path does not address a child which could be removed
                return self.prepare_response({}, None, status=405, query=request.query)

            try:
                item = self.root[uuids[:-1]]
            except KeyError as e:
                return self._error_response(e, 404, request)

            if not isinstance(item, Component):
                return self.prepare_response({}, None, status=405, query=request.query)

            try:
                await self.loop.run_in_executor(None, functools.partial(item.remove, uuids[-1]))

                if self._scheduler is not None:
                    self._scheduler.remove_jobs('/'.join(uuids))
                response = {}
                logger.info('Response: {}'.format(response))
            except ChildNotFoundException as e:
                return self._error_response(e, 404, request)
            except (DeviceException, ServerException, UserException) as e:
                return self._error_response(e, 500, request)

            return self.prepare_response(response, item, status=200, query=request.query)

        async def options(self, request):
            """Handles OPTIONS requests, i.e. returns the OPTIONS-serialization of the addressed variable.

            Args:
                request: The incoming request.

            Returns:
                Response with the serialized variable, status 404 if the addressed element does
                not exist, status 405 if it is not a variable.
            """
            logger.info("OPTIONS Request from {}".format(request.url))
            logger.debug('Request: {}'.format(request))
            logger.debug('Query Parameters: {}'.format(request.query_string))
            keys = self._filter_query(request.query)

            try:
                item = self.root[HTTPServer.parse_uuids(request)]
            except KeyError as e:
                return self._error_response(e, 404, request)

            if not isinstance(item, Variable):
                return self.prepare_response({}, None, status=405, query=request.query)

            response = item.serialize(keys, self._legacy_mode, HTTP_OPTIONS)
            logger.info('Response: {}'.format(response))
            return self.prepare_response(response, item, query=request.query)

        async def patch(self, request):
            """Handles PATCH requests, i.e. sets the value of the addressed parameter.

            The JSON body of the request must provide the key 'value'.

            Args:
                request: The incoming request.

            Returns:
                Response with the result of setting the value, status 403 if the parameter is
                read-only, status 404 if the addressed element does not exist, status 405 if it
                is not a parameter, status 500 if setting the value failed.
            """
            logger.info("PATCH Request from {}".format(request.url))
            logger.debug('Request: {}'.format(request))
            data = await request.json()
            logger.debug('Body: {}'.format(data))

            try:
                item = self.root[HTTPServer.parse_uuids(request)]
            except KeyError as e:
                return self._error_response(e, 404, request)

            if not isinstance(item, Parameter):
                response = {}
                logger.error('Response: {}'.format(response))
                return self.prepare_response(response, item, status=405, query=request.query)

            try:
                # setting the value might block, hence it is executed in the default executor
                response = await self.loop.run_in_executor(None, item.set, data["value"])
                logger.info('Response: {}'.format(response))
            except ReadOnlyException as e:
                return self._error_response(e, 403, request)
            except InvokationException as e:
                return self._error_response(e, 500, request)

            return self.prepare_response(response, item, status=200, query=request.query)

        async def put(self, request):
            """Handles PUT requests, i.e. adds a new child to the addressed parent component.

            The JSON body of the request must provide the keys 'class_name', 'json_file',
            'args' and 'kwargs', which are handed over to the add-method of the parent.

            Args:
                request: The incoming request.

            Returns:
                Empty response on success, status 404 if the parent does not exist, status 405
                if the parent is not a component or no child is addressed, status 500 if adding
                the child failed.
            """
            logger.info("PUT Request from {}".format(request.url))
            logger.debug('Request: {}'.format(request))
            data = await request.json()
            logger.debug('Body: {}'.format(data))
            uuids = HTTPServer.parse_uuids(request)

            if not uuids:
                # the path does not name the child to be created
                return self.prepare_response({}, None, status=405, query=request.query)

            try:
                item = self.root[uuids[:-1]]
            except KeyError as e:
                return self._error_response(e, 404, request)

            if not isinstance(item, Component):
                return self.prepare_response({}, None, status=405, query=request.query)

            try:
                add_child = functools.partial(item.add, uuids[-1], data['class_name'], data['json_file'],
                                              *data['args'], **data['kwargs'])
                implementation = await self.loop.run_in_executor(None, add_child)
                if self._scheduler is not None:
                    self._scheduler.add_jobs(implementation.streams('/'.join(uuids)))
                response = {}
                logger.info('Response: {}'.format(response))
            except (DeviceException, ServerException, UserException) as e:
                return self._error_response(e, 500, request)

            return self.prepare_response(response, item, status=200, query=request.query)