Select Git revision
BP_IntenselectableObject.uasset
-
David Gilbert authoredDavid Gilbert authored
server.py 19.22 KiB
# -*- coding: utf-8 -*-
import asyncio
import functools
import json
import os
import traceback
from typing import Dict, Union
import rdflib
from aiohttp import web
from aiohttp.web import middleware
from aiohttp.web_request import Request
from multidict import MultiDict
from wzl.mqtt import MQTTPublisher
from .error import ServerException
from ..soil.component import Component
from ..soil.element import Element
from ..soil.error import InvokationException, ReadOnlyException, ChildNotFoundException
from ..soil.function import Function
from ..soil.measurement import Measurement
from ..soil.parameter import Parameter
from ..soil.semantics import Semantics, Namespaces
from ..soil.stream import StreamScheduler
from ..soil.variable import Variable
from ..utils import root_logger
from ..utils import serialize
from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET, HTTP_OPTIONS
from ..utils.error import DeviceException, UserException, PathResolutionException
from ..utils.resources import ResourceType
# Module-level logger, used by the CORS middleware and all request handlers in this file.
logger = root_logger.get(__name__)
@middleware
async def cors(request, handler):
    """aiohttp middleware that adds CORS headers to the responses of the server.

    Preflight requests announcing POST or PATCH are answered directly, without invoking the handler.
    All other requests are forwarded to the handler and the 'Access-Control-Allow-Origin' header is
    added to the result. Unexpected exceptions raised by the handler are logged and converted into a
    JSON response with status 500.

    Args:
        request: The incoming aiohttp request.
        handler: The next handler in the middleware chain.

    Returns:
        The response to be sent to the client.

    Raises:
        web.HTTPException: HTTP errors raised by aiohttp or the handler (e.g. 404 for unknown routes)
            are re-raised with the CORS origin header attached, so that their status code is preserved.
    """
    logger.info("CORS Middleware handles request from {}".format(request.url))
    logger.debug('Request Headers: {}'.format(request.headers))

    # Fall back to the wildcard if the client did not send an Origin header.
    origin = request.headers.get('Origin', "*")

    # check if the request is a preflight request
    # NOTE(review): only POST and PATCH preflights are answered here although PUT and DELETE are
    # advertised as allowed methods; preflights for those are passed to the handler - confirm intent.
    if request.headers.get('Access-Control-Request-Method') in ("POST", "PATCH"):
        response = web.Response()
        response.headers.update({'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, PATCH',
                                 'Access-Control-Allow-Origin': origin})
        # 'Access-Control-Request-Headers' is optional, only echo it if the client sent it.
        requested_headers = request.headers.get('Access-Control-Request-Headers')
        if requested_headers is not None:
            response.headers['Access-Control-Allow-Headers'] = requested_headers
        logger.debug('Preflight Response Headers :{}'.format(response.headers))
        return response

    try:
        response = await handler(request)
    except web.HTTPException as e:
        # Keep the original HTTP status (404, 405, ...) instead of masking it as a server error.
        e.headers['Access-Control-Allow-Origin'] = origin
        raise
    except Exception as e:
        logger.error(traceback.format_exc())
        logger.error("[CORS] {} at {}".format(str(e), request.url))
        response = web.json_response({"description": str(e)}, status=500)

    # Deliberately not placed in a `finally` block: returning from `finally` would silently swallow
    # task cancellation and other BaseExceptions.
    response.headers.update({'Access-Control-Allow-Origin': origin})
    logger.debug('Response Headers :{}'.format(response.headers))
    return response
class HTTPServer(object):
    """Provides a web application as of the aiohttp-library.

    The application exposes the SOIL model below the given root component. Handlers for GET, POST,
    DELETE, OPTIONS, PUT and PATCH are registered for two URL layouts each, so that the leading
    '/objects' part of the path is optional.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, host: str, port: int, model: Component,
                 dataformat: str = 'json', legacy_mode=False, scheduler: StreamScheduler = None,
                 publisher: MQTTPublisher = None, profiles_path: str = None):
        """Constructor

        Builds the aiohttp application, registers all routes and starts serving. The call blocks
        as long as the server is running.

        Args:
            loop: The asyncio-event-loop used to execute the server.
            host: Hostname of the server, i.e. a URL such as 'localhost'.
            port: Port the server should run at.
            model: The root component of the SOIL model, should be initialized via Component.load(...)
            dataformat: String specifying the dataformat of the responses of the server, either 'json' (default) or 'xml'.
            legacy_mode: Handed over to the serialization and invocation of the SOIL elements. If true, legacy names
                are used for the datatypes (e.g. "bool" instead of "boolean").
            scheduler: Stream handler. Is required to hand over jobs of streams and events for dynamic components, if created at runtime.
            publisher: MQTT publisher used to publish the items generated by functions which publish their results.
            profiles_path: Directory containing the '.shacl.ttl' files of the metadata profiles served via GET.

        Raises:
            ValueError: If the dataformat is neither 'json' nor 'xml'.
        """
        if dataformat not in ('json', 'xml'):
            raise ValueError('Dataformat must be one of "json" or "xml".')

        self.loop = loop
        self.host = host
        self.port = port
        self.root = model
        self._dataformat = dataformat
        self._legacy_mode = legacy_mode
        self._scheduler = scheduler
        self._publisher = publisher
        self._profiles_path = profiles_path

        self.app = web.Application(loop=self.loop, middlewares=[cors])

        # define two routes for each request to make the 'objects' part optional
        objects_route = r'/objects{uuids:(/' + BASE_UUID_PATTERN + r')*/?}'
        plain_route = r'/{uuids:(' + BASE_UUID_PATTERN + r'($|/))*}'
        router = self.app.router
        registrations = [
            (router.add_get, self.get),
            (router.add_post, self.post),
            (router.add_delete, self.delete),
            (functools.partial(router.add_route, 'OPTIONS'), self.options),
            (router.add_put, self.put),
            (router.add_patch, self.patch),
        ]
        for register, handler in registrations:
            for route in (objects_route, plain_route):
                register(route, handler)

        # run_app blocks until the server is shut down, so the start has to be announced beforehand.
        logger.info('HTTP-Server serving on {}:{}'.format(host, port))
        web.run_app(self.app, host=self.host, port=self.port, loop=loop)

    @staticmethod
    def analyze_request_url(request: Request) -> ResourceType:
        """Determines from the URL of the request which kind of resource is addressed.

        Args:
            request: The incoming request.

        Returns:
            ResourceType.profile or ResourceType.metadata for URLs consisting of the semantic prefix
            followed by a name (profile, if the name contains 'Profile'), ResourceType.element for the
            root path and for paths ending with an element UUID, ResourceType.metadata otherwise.
        """
        assert request.url.parts[0] == '/'
        url_parts = request.url.parts
        # ignore the empty part caused by a trailing slash
        if url_parts[-1] == '':
            url_parts = url_parts[:-1]

        if len(url_parts) == 3 and url_parts[1] == Semantics.prefix:
            return ResourceType.profile if 'Profile' in url_parts[-1] else ResourceType.metadata

        # UUIDs of elements start with a three letter code identifying the type of the element
        if request.url.path == '/' or url_parts[-1][:3] in ('COM', 'FUN', 'PAR', 'MEA', 'ARG', 'RET'):
            return ResourceType.element

        return ResourceType.metadata

    @staticmethod
    def parse_uuids(request: Request):
        """Splits the request URL to extract the FQID of the targeted element of the SOIL-Interface.

        Args:
            request: The incoming request, the 'uuids' entry of its match info is evaluated.

        Returns:
            List of the non-empty, slash-separated parts of the matched path.
        """
        # NOTE(review): the fallback value 'uuids' yields ['uuids'] if the route did not match a
        # 'uuids' group; all routes registered in the constructor define it - confirm intent.
        uuids = request.match_info.get('uuids', 'uuids')
        return [uuid for uuid in uuids.split('/') if uuid]

    @staticmethod
    def _resource_name(request: Request) -> str:
        """Returns the last part of the URL path, ignoring the empty part of a trailing slash."""
        parts = request.url.parts
        return parts[-2] if parts[-1] == '' else parts[-1]

    @staticmethod
    def _log_error(error: Exception) -> Dict:
        """Logs the exception currently being handled and returns the corresponding error body.

        Must be called from within an except-block, as the current traceback is logged.
        """
        logger.error(traceback.format_exc())
        response = {'error': str(error)}
        logger.error('Response: {}'.format(response))
        return response

    def _not_found(self, error: Exception, query: MultiDict):
        """Logs the exception currently being handled and returns a 404 response describing it."""
        return self.prepare_response(HTTPServer._log_error(error), None, status=404, query=query)

    @staticmethod
    def _xml_root(element: Element, status: int) -> str:
        """Returns the name of the XML root tag for the given element and response status."""
        if element is None or not 200 <= status <= 300:
            return 'error'
        tags = ((Component, 'component'), (Function, 'function'),
                (Measurement, 'measurement'), (Parameter, 'parameter'))
        for element_class, tag in tags:
            if isinstance(element, element_class):
                return tag
        return ''

    def _filter_query(self, query: MultiDict):
        """Returns the keys of the query which name an attribute of a SOIL element, in order of occurrence."""
        known_attributes = ('uuid', 'name', 'description', 'datatype', 'range', 'value', 'constant', 'timestamp',
                            'dimension', 'unit', 'covariance', 'label', 'children', 'arguments', 'returns',
                            'ontology')
        return [key for key in query if key in known_attributes]

    def prepare_response(self, body: Union[Dict, rdflib.Graph], element: Element, status: int = 200,
                         query: MultiDict = None, semantic: bool = False):
        """Serializes the body to the requested dataformat and wraps it into a response.

        Args:
            body: Either a plain dictionary or, for semantic responses, an rdflib graph.
            element: The element the response belongs to, used to determine the XML root tag. May be None.
            status: HTTP status code of the response.
            query: Query of the request. The key 'format' ('json', 'xml' or 'turtle') overrides the default
                dataformat of the server, the key 'semantic' ('data', 'metadata' or 'profile') marks the
                response as semantic.
            semantic: If true, the body is serialized as RDF graph.

        Returns:
            The aiohttp response. Non-semantic bodies can not be serialized to Turtle, in this case
            a plain text response with status 400 is returned.
        """
        dataformat = self._dataformat
        if query is not None:
            if query.get('format') in ('json', 'xml', 'turtle'):
                dataformat = query['format']
            if query.get('semantic') in ('data', 'metadata', 'profile'):
                semantic = True

        # Error bodies are plain dictionaries even if a semantic resource has been requested,
        # they must be serialized as such instead of failing on a graph-only code path.
        semantic = semantic and isinstance(body, rdflib.Graph)

        if dataformat == 'json':
            if semantic:
                # rdflib serialization returns a string, so we need to parse it as plain json again to return it properly
                body = json.loads(body.serialize(format='json-ld'))
            return web.json_response(body, status=status)

        if dataformat == 'xml':
            if semantic:
                xml = body.serialize(format='xml')
            else:
                xml = serialize.to_xml(HTTPServer._xml_root(element, status), body)
            return web.Response(text=xml, status=status, content_type='application/xml')

        # remaining dataformat is 'turtle'
        if not semantic:
            return web.Response(text='Can not serialize non semantic information to Turtle', status=400,
                                content_type='text/plain')
        return web.Response(text=body.serialize(format='turtle'), status=status, content_type='text/plain')

    def _load_profile(self, profilename: str) -> rdflib.Graph:
        """Loads the SHACL file of the given metadata profile and adds the license of the profile.

        Raises:
            UserException: If the server has been started without a path to the profiles.
        """
        if self._profiles_path is None:
            raise UserException('Can\'t return requested metadata profile, as no profiles have been created for this sensor service.')

        if len(profilename) > 12 and profilename[-12:-7] == 'Range':
            filename = profilename.replace('RangeProfile', '.shacl.ttl')
        else:
            filename = profilename.replace('Profile', '.shacl.ttl')

        # The name originates from the request URL, so make sure it can not leave the profiles directory.
        profile_path = os.path.join(self._profiles_path, os.path.basename(filename))

        profile = rdflib.Graph()
        profile.parse(profile_path)
        profile.add(
            (rdflib.URIRef(Semantics.namespace[profilename]), Namespaces.dcterms.license, Semantics.profile_license))
        return profile

    async def get(self, request):
        """Handles GET requests for elements, semantic resources and metadata profiles.

        Returns:
            The serialized resource with status 200, status 404 if the element does not exist
            or status 500 if a device, server or user exception occurs.
        """
        logger.info("GET Request from {}".format(request.url))
        logger.debug('Request: {}'.format(request))
        logger.debug('Query Parameters: {}'.format(request.query_string))

        resource_type = HTTPServer.analyze_request_url(request)
        keys = self._filter_query(request.query)
        # Bound up front, as the error handling below needs it even if the lookup itself failed.
        item = None

        try:
            if resource_type == ResourceType.profile:
                response = self._load_profile(HTTPServer._resource_name(request))
            elif resource_type == ResourceType.metadata or resource_type == ResourceType.data:
                semantic_name = HTTPServer._resource_name(request)
                item, resource_type = self.root.resolve_semantic_path(semantic_name)
                recursive = request.query is not None and 'all' in request.query
                response = item.serialize_semantics(resource_type, recursive)
            else:
                assert resource_type == ResourceType.element
                uuids = HTTPServer.parse_uuids(request)
                # the query may request a semantic representation of the element
                if request.query is not None and 'semantic' in request.query and request.query[
                        'semantic'] in ResourceType.semantic_resources:
                    resource_type = ResourceType.from_string(request.query['semantic'])

                try:
                    item = self.root[uuids]
                    if resource_type.is_semantic():
                        recursive = request.query is not None and 'all' in request.query
                        response = item.serialize_semantics(resource_type, recursive)
                    else:
                        response = item.serialize(keys, self._legacy_mode, HTTP_GET)
                except KeyError as e:
                    return self._not_found(e, request.query)

            status = 200
            logger.info('Response: {}'.format(response))
        except (DeviceException, ServerException, UserException) as e:
            response = HTTPServer._log_error(e)
            status = 500

        return self.prepare_response(response, item, status=status, query=request.query,
                                     semantic=resource_type.is_semantic())

    async def post(self, request):
        """Handles POST requests, i.e. invokes the addressed function with the arguments of the body.

        If the function publishes its results, each generated item is published via the publisher using
        the path of the function as topic and an empty body is returned.

        Returns:
            Status 200 with the result of the invocation, status 404 if the element does not exist,
            status 405 if the element is not a function or status 500 if the invocation fails.
        """
        logger.info("POST Request from {}".format(request.url))
        logger.debug('Request: {}'.format(request))
        data = await request.json()
        logger.debug('Body: {}'.format(data))

        uuids = HTTPServer.parse_uuids(request)
        try:
            item = self.root[uuids]
        except KeyError as e:
            return self._not_found(e, request.query)

        if not isinstance(item, Function):
            response, status = {}, 405
            logger.error('Response: {}'.format(response))
            return self.prepare_response(response, item, status=status, query=request.query)

        try:
            if item.publishes:
                topic = '/'.join(uuids)
                try:
                    # The loop variable must not be named `item`, otherwise the function itself is
                    # replaced by the last published value when preparing the response.
                    async for update in item.invoke_generator(data["arguments"], legacy_mode=self._legacy_mode):
                        self._publisher.publish(topic, json.dumps(update))
                except RuntimeError as e:
                    # a RuntimeError caused by StopAsyncIteration is treated as the regular end of the generator
                    if not isinstance(e.__cause__, StopAsyncIteration):
                        raise
                response = {}
            else:
                response = await item.invoke(data["arguments"], legacy_mode=self._legacy_mode)
            status = 200
            logger.info('Response: {}'.format(response))
        except (DeviceException, ServerException, UserException) as e:
            response = HTTPServer._log_error(e)
            status = 500

        return self.prepare_response(response, item, status=status, query=request.query)

    async def delete(self, request):
        """Handles DELETE requests, i.e. removes the addressed child from its parent component.

        Returns:
            Status 200 with an empty body on success, status 404 if the parent or the child does not exist,
            status 405 if the root is addressed or the parent is not a component, status 500 if the
            removal fails.
        """
        logger.info("DELETE Request from {}".format(request.url))
        logger.debug('Request: {}'.format(request))

        uuids = HTTPServer.parse_uuids(request)
        if not uuids:
            # The root element has no parent it could be removed from.
            return self.prepare_response({}, None, status=405, query=request.query)

        try:
            item = self.root[uuids[:-1]]
        except KeyError as e:
            return self._not_found(e, request.query)

        if not isinstance(item, Component):
            return self.prepare_response({}, None, status=405, query=request.query)

        try:
            # removal is executed in the default executor of the loop
            await self.loop.run_in_executor(None, functools.partial(item.remove, uuids[-1]))
            if self._scheduler is not None:
                self._scheduler.remove_jobs('/'.join(uuids))
            response, status = {}, 200
        except ChildNotFoundException as e:
            response = HTTPServer._log_error(e)
            status = 404
        except (DeviceException, ServerException, UserException) as e:
            response = HTTPServer._log_error(e)
            status = 500

        return self.prepare_response(response, item, status=status, query=request.query)

    async def options(self, request):
        """Handles OPTIONS requests, i.e. returns the serialization of the addressed variable for HTTP_OPTIONS.

        Returns:
            Status 200 with the serialized variable, status 404 if the element does not exist
            or status 405 if the element is not a variable.
        """
        logger.info("OPTIONS Request from {}".format(request.url))
        logger.debug('Request: {}'.format(request))
        logger.debug('Query Parameters: {}'.format(request.query_string))
        keys = self._filter_query(request.query)

        try:
            item = self.root[HTTPServer.parse_uuids(request)]
        except KeyError as e:
            return self._not_found(e, request.query)

        if not isinstance(item, Variable):
            return self.prepare_response({}, None, status=405, query=request.query)

        response = item.serialize(keys, self._legacy_mode, HTTP_OPTIONS)
        logger.info('Response: {}'.format(response))
        return self.prepare_response(response, item, query=request.query)

    async def patch(self, request):
        """Handles PATCH requests, i.e. sets the addressed parameter to the 'value' given in the body.

        Returns:
            Status 200 with the result of the setter, status 403 if the parameter is read-only,
            status 404 if the element does not exist, status 405 if the element is not a parameter
            or status 500 if the setter fails.
        """
        logger.info("PATCH Request from {}".format(request.url))
        logger.debug('Request: {}'.format(request))
        data = await request.json()
        logger.debug('Body: {}'.format(data))

        try:
            item = self.root[HTTPServer.parse_uuids(request)]
        except KeyError as e:
            return self._not_found(e, request.query)

        if not isinstance(item, Parameter):
            response, status = {}, 405
            logger.error('Response: {}'.format(response))
            return self.prepare_response(response, item, status=status, query=request.query)

        try:
            # the setter is executed in the default executor of the loop
            response = await self.loop.run_in_executor(None, item.set, data["value"])
            status = 200
            logger.info('Response: {}'.format(response))
        except ReadOnlyException as e:
            response = HTTPServer._log_error(e)
            status = 403
        except InvokationException as e:
            response = HTTPServer._log_error(e)
            status = 500

        return self.prepare_response(response, item, status=status, query=request.query)

    async def put(self, request):
        """Handles PUT requests, i.e. adds a new child to the addressed parent component.

        The body must provide the keys 'class_name' and 'json_file', the keys 'args' and 'kwargs' are
        optional and handed over to the creation of the child as positional and keyword arguments.

        Returns:
            Status 200 with an empty body on success, status 404 if the parent does not exist,
            status 405 if the root is addressed or the parent is not a component, status 500 if the
            creation fails.
        """
        logger.info("PUT Request from {}".format(request.url))
        logger.debug('Request: {}'.format(request))
        data = await request.json()
        logger.debug('Body: {}'.format(data))

        uuids = HTTPServer.parse_uuids(request)
        if not uuids:
            # Without any UUID there is neither a parent nor a name for the new child.
            return self.prepare_response({}, None, status=405, query=request.query)

        try:
            item = self.root[uuids[:-1]]
        except KeyError as e:
            return self._not_found(e, request.query)

        if not isinstance(item, Component):
            return self.prepare_response({}, None, status=405, query=request.query)

        try:
            add_child = functools.partial(item.add, uuids[-1], data['class_name'], data['json_file'],
                                          *data.get('args', []), **data.get('kwargs', {}))
            # the creation is executed in the default executor of the loop
            implementation = await self.loop.run_in_executor(None, add_child)
            if self._scheduler is not None:
                self._scheduler.add_jobs(implementation.streams('/'.join(uuids)))
            response, status = {}, 200
        except (DeviceException, ServerException, UserException) as e:
            response = HTTPServer._log_error(e)
            status = 500

        return self.prepare_response(response, item, status=status, query=request.query)