diff --git a/demos/demo2/api.py b/demos/demo2/api.py index bc5cd8a6a9549143c95f9c344d0773d63c3a1024..fb0071c6194b2f1195b1848db7e005d8cb80432f 100644 --- a/demos/demo2/api.py +++ b/demos/demo2/api.py @@ -1,5 +1,14 @@ from typing import Set from basyx.aas import model +import base64 + +from urllib.parse import unquote + + +AAS = "aas" +SUBMODELS = "submodels" +SUBMODEL = "submodel" +SUBMODELELEMENTS = "submodelElements" class AASAPI: @@ -40,17 +49,11 @@ class SubmodelAPI: return list(self.submodel.submodel_element) def get_submodel_element_by_path(self, idshort_path: str): - path_elements = idshort_path.split(".") - submodel_element = self.submodel.get_referable(path_elements.pop(0)) - for idshort in path_elements: - submodel_element = submodel_element.get_referable(idshort) + submodel_element = self._get_se_by_path(idshort_path) return submodel_element def set_submodel_element_value_by_path(self, idshort_path: str, value): - path_elements = idshort_path.split(".") - submodel_element = self.submodel.get_referable(path_elements.pop(0)) - for idshort in path_elements: - submodel_element = submodel_element.get_referable(idshort) + submodel_element = self._get_se_by_path(idshort_path) submodel_element.value = value def delete_submodel_element_by_path(self, idshort_path: str): @@ -59,28 +62,43 @@ class SubmodelAPI: last_idshort = path_elements.pop(-1) for idshort in path_elements: submodel_element = submodel_element.get_referable(idshort) - submodel_element.remove_referable(last_idshort) + if isinstance(submodel_element, model.UniqueIdShortNamespace): + submodel_element.remove_referable(last_idshort) def post_submodel_element(self, submodel_element: model.SubmodelElement): self.submodel.submodel_element.add(submodel_element) def post_submodel_element_by_path(self, idshort_path: str, submodel_element: model.SubmodelElement): + se = self._get_se_by_path(idshort_path) + if isinstance(se, (model.SubmodelElementCollection, model.SubmodelElementList)): + se.value.add(submodel_element) + + def _get_se_by_path(self, idshort_path: str): + idshort_path = unquote(idshort_path) path_elements = idshort_path.split(".") - se = self.submodel.get_referable(path_elements.pop(0)) - for idshort in path_elements: - se = se.get_referable(idshort) - se.value.add(submodel_element) + if "[" in path_elements[0]: + submodel_element = self.submodel + else: + submodel_element = self.submodel.get_referable(path_elements.pop(0)) + for i, idshort in enumerate(path_elements): + if "[" in idshort: + sub_elements = idshort.replace("[", "]").split("]") + se_list: model.SubmodelElementList = submodel_element.get_referable(sub_elements[0]) + submodel_element = se_list.value[int(sub_elements[1])] + else: + submodel_element = submodel_element.get_referable(idshort) + return submodel_element class ModelProvider: - AAS = "aas" - SUBMODELS = "submodels" - SUBMODEL = "submodel" - SUBMODELELEMENTS = "submodelElements" + """ + """ def __init__(self, aas: model.AssetAdministrationShell, submodels: Set[model.Submodel]): self.aas_api = AASAPI(aas) - self.submodel_api = {x.id_short: SubmodelAPI(x) for x in submodels} + self.submodel_api = {x.id: SubmodelAPI(x) for x in submodels} + self._id_to_type = {base64.urlsafe_b64encode(x.id.encode()).decode(): x.__class__.__name__ for x in submodels} + def process_path(self, path: str): path_elems = path.split("/") @@ -91,17 +109,17 @@ class ModelProvider: def getValue(self, path: str): path_elems = self.process_path(path) - if path_elems and path_elems[0] == self.AAS: + if path_elems and path_elems[0] == AAS: if len(path_elems) == 1: return self.aas_api.get_aas() - if path_elems[1] == self.SUBMODELS: + if path_elems[1] == SUBMODELS: if len(path_elems) == 2: return self.aas_api.get_all_submodel_references() - submodel_id = path_elems[2] - if path_elems[3] == self.SUBMODEL: + submodel_id = base64.urlsafe_b64decode(path_elems[2]).decode() + if path_elems[3] == SUBMODEL: if len(path_elems) == 4: return self.submodel_api[submodel_id].get_submodel() - if path_elems[4] == self.SUBMODELELEMENTS: + if path_elems[4] == SUBMODELELEMENTS: if len(path_elems) == 5: return self.submodel_api[submodel_id].get_all_submodel_elements() else: @@ -109,13 +127,13 @@ class ModelProvider: def createValue(self, path: str, value): path_elems = self.process_path(path) - if path_elems and path_elems[0] == self.AAS: - if path_elems[1] == self.SUBMODELS: + if path_elems and path_elems[0] == AAS: + if path_elems[1] == SUBMODELS: if len(path_elems) == 2: return self.aas_api.post_submodel_reference(value) - submodel_id = path_elems[2] - if path_elems[3] == self.SUBMODEL: - if path_elems[4] == self.SUBMODELELEMENTS: + submodel_id = base64.urlsafe_b64decode(path_elems[2]).decode() + if path_elems[3] == SUBMODEL: + if path_elems[4] == SUBMODELELEMENTS: if len(path_elems) == 5: return self.submodel_api[submodel_id].post_submodel_element(value) else: @@ -123,21 +141,21 @@ class ModelProvider: def deleteValue(self, path: str): path_elems = self.process_path(path) - if path_elems and path_elems[0] == self.AAS: - if path_elems[1] == self.SUBMODELS: - submodel_id = path_elems[2] + if path_elems and path_elems[0] == AAS: + if path_elems[1] == SUBMODELS: + submodel_id = base64.urlsafe_b64decode(path_elems[2]).decode() if len(path_elems) == 3: return self.aas_api.delete_submodel_reference(submodel_id) - if path_elems[3] == self.SUBMODEL: - if path_elems[4] == self.SUBMODELELEMENTS: + if path_elems[3] == SUBMODEL: + if path_elems[4] == SUBMODELELEMENTS: return self.submodel_api[submodel_id].delete_submodel_element_by_path(path_elems[5]) def setValue(self, path: str, value): path_elems = self.process_path(path) - if path_elems and path_elems[0] == self.AAS: - if path_elems[1] == self.SUBMODELS: - submodel_id = path_elems[2] - if path_elems[3] == self.SUBMODEL: + if path_elems and path_elems[0] == AAS: + if path_elems[1] == SUBMODELS: + submodel_id = base64.urlsafe_b64decode(path_elems[2]).decode() + if path_elems[3] == SUBMODEL: if len(path_elems) == 4: # create submodel if it does not exist, may be different from standard if not self.submodel_api.get(submodel_id): @@ -146,5 +164,5 @@ class ModelProvider: else: self.submodel_api[submodel_id].put_submodel(value) return - if path_elems[4] == self.SUBMODELELEMENTS: + if path_elems[4] == SUBMODELELEMENTS: return self.submodel_api[submodel_id].set_submodel_element_value_by_path(path_elems[5], value) diff --git a/demos/demo2/broker_api.py b/demos/demo2/broker_api.py index 35c1d0349dd331c9597e07d0cecb5693edef743c..38c0c674785baf63c3065fd38b6f37d7bc2fe62a 100644 --- a/demos/demo2/broker_api.py +++ b/demos/demo2/broker_api.py @@ -5,11 +5,17 @@ from s3i import broker_message from s3i.exception import S3IBMessageError from basyx.aas.adapter.json.json_serialization import AASToJsonEncoder from basyx.aas.adapter.json.json_deserialization import AASFromJsonDecoder +from basyx.aas.model import BasicEventElement, ModelReference import json import logging import traceback +import base64 +from typing import Iterable +from datetime import datetime +import time import api +import helpers logger = logging.getLogger(__name__) ch = logging.StreamHandler() @@ -28,6 +34,22 @@ class S3IBServer: self.serializer = AASToJsonEncoder() self.deserializer = AASFromJsonDecoder() + def _check_for_events(self, path) -> Iterable[BasicEventElement]: + triggered_events = [] + submodel_id = "https://www.company.com/submodels/events" + submodel_id_encoded = base64.urlsafe_b64encode(submodel_id.encode()).decode() + events = self.provider.getValue(f"/aas/submodels/{submodel_id_encoded}/submodel") + + for event in events.submodel_element: + event: BasicEventElement = event + obs_path = helpers.id_short_path_from_ref(event.observed) + req_path = helpers.id_short_path_from_path(path) + if obs_path == req_path: + triggered_events.append(event) + logger.info(f"[Event] [{event.message_topic}] {event.id_short} was triggered") + + return triggered_events + def callback(self, ch, method, properties, body): try: msg = broker_message.Message(base_msg=body) @@ -66,6 +88,20 @@ class S3IBServer: ) self.broker.send([receiver_endpoint], json.dumps(reply.base_msg)) + # check if element is being observed + triggered_events = self._check_for_events(path) + for event in triggered_events: + evt_msg = broker_message.EventMessage() + evt_msg.fillEventMessage( + sender=self.id, + message_id="s3i:" + str(uuid4()), + topic=event.message_topic, + timestamp= round(datetime.now().timestamp()), + content={path: self.deserializer.decode(new_value)} + ) + self.broker.publish_event(json.dumps(evt_msg.base_msg), event.message_topic) + + elif message_type == "createAttributeRequest": value = msg.base_msg["newValue"] self.provider.createValue(path, self.deserializer.decode(value)) @@ -96,8 +132,9 @@ class S3IBAsyncClient: self.loop = loop self.broker = BrokerAMQP(access_token, "s3ibs://" + self.id, self.callback, loop) self.broker.add_on_channel_open_callback(self.on_channel_open, True) + self.broker.create_event_queue() self.channel_open = self.loop.create_future() - self.requests = {} + self.requests: dict[str, asyncio.Future] = {} self.deserializer = AASFromJsonDecoder() self.serializer = AASToJsonEncoder() self.broker.connect() @@ -108,7 +145,7 @@ class S3IBAsyncClient: def callback(self, ch, method, properties, body): try: msg = broker_message.Message(base_msg=body) - msg_id = msg.base_msg["replyingToMessage"] + msg_id = msg.base_msg.get("replyingToMessage") sender = msg.base_msg["sender"] message_type = msg.base_msg["messageType"] value = msg.base_msg.get("value") @@ -116,11 +153,19 @@ class S3IBAsyncClient: logger.info(f"[S3I] Received {message_type} from {sender}") - future: asyncio.Future = self.requests[msg_id] - if value: - future.set_result(self.deserializer.decode(value)) + if message_type == "eventMessage": + content = msg.base_msg.get("content") + topic = msg.base_msg.get("topic") + timestamp = msg.base_msg.get("timestamp") + logger.info(f"[Event] [{topic}] [{datetime.fromtimestamp(timestamp)}] {content}") + self.requests[topic].set_result(content) else: - future.set_result(ok) + future: asyncio.Future = self.requests[msg_id] + if value: + future.set_result(self.deserializer.decode(value)) + else: + future.set_result(ok) + except S3IBMessageError as e: raise Exception(body) @@ -184,3 +229,11 @@ class S3IBAsyncClient: self.requests[msg_id] = reply await reply return reply.result() + + async def subscribeToEvent(self, topic: str): + if not self.channel_open.done(): + await self.channel_open + self.broker.subscribe_topic(topic) + handle = self.loop.create_future() + self.requests[topic] = handle + return handle \ No newline at end of file diff --git a/demos/demo2/demo2_dzwald.py b/demos/demo2/demo2_dzwald.py index 81e990d5cf0ef118056dfffad51fb62faf6afb54..3fdece4d938d8a3c9de501f57aeb1d2af55bdca3 100644 --- a/demos/demo2/demo2_dzwald.py +++ b/demos/demo2/demo2_dzwald.py @@ -1,5 +1,5 @@ import os, sys, inspect -from typing import List +from typing import List, Set import basyx from basyx.aas import model @@ -8,6 +8,8 @@ from s3i import IdentityProvider, TokenType, Directory import asyncio import logging import datetime +import base64 +from urllib.parse import quote import api import broker_api @@ -18,15 +20,15 @@ parentdir = os.path.dirname(currentdir) sys.path.insert(0, parentdir) sys.path.insert(0, os.path.dirname(parentdir)) -from model import enums, models +from model import enums, models, configs # print info logs to console logger = logging.getLogger("broker_api") logger.setLevel(logging.INFO) # Thing ID and secret in the S3I-Identityprovider -dzwald_id = "s3i:6b1c90d0-0234-4ce7-9094-06ec2259dd5c" -dzwald_secret = "nIcLGAkBM0zbmU4uHTbqkAc53ZoSOQQ3" +dzwald_id = "s3i:bc30c279-02c5-4918-a2ad-761e927214dd" +dzwald_secret = "syhdDLoC9HSov8nuw2IK6YixTK9072wy" forestmanager_hmi_id = "s3i:e8ef672c-109b-4c36-8999-f4ababa0bffc" forstify_hmi_id = "s3i:8a8ee1ab-63d2-42ea-92d1-1ae682a55e7a" @@ -345,6 +347,21 @@ def create_aas(): waldweg = create_waldweg() zu_faellende_baeume = create_zu_faellende_baeume() + events = model.Submodel( + id_="https://www.company.com/submodels/events", + id_short="Events" + ) + + event = model.BasicEventElement( + id_short="Auftragsstatus_Updated", + observed=model.ModelReference.from_referable(arbeitsauftrag.get_referable(configs.AUFTRAGSSTATUS)), + direction=model.Direction.OUTPUT, + state=model.StateOfEvent.ON, + message_topic="topic123" + ) + + events.add_referable(event) + submodels = [ arbeitsauftrag, beobachtung, @@ -354,6 +371,7 @@ def create_aas(): verkaufslos, waldweg, zu_faellende_baeume, + events ] aas_dz_wald = models.DZWald( dzwald_id="https://www.company.com/dz_wald/1", @@ -391,13 +409,13 @@ def main(): # view helpers.py for a better insight # function call is commented because it needs to be done a single time s3i_dir = Directory("https://dir.s3i.vswf.dev/api/2/", access_token) - # helpers.grant_entry_read_permissions(s3i_dir, dzwald_id, [forestmanager_hmi_id, forstify_hmi_id]) + helpers.grant_entry_read_permissions(s3i_dir, dzwald_id, [forestmanager_hmi_id, forstify_hmi_id]) # create AAS aas, submodels = create_aas() # update S3I-Directory entry to include a short description of your AAS - helpers.add_aas_to_dir_entry(aas, s3i_dir, dzwald_id) + # helpers.add_aas_to_dir_entry(aas, s3i_dir, dzwald_id) # wrap them into a ModelProvider to expose an Interface of type 2 provider = api.ModelProvider(aas, submodels) @@ -407,8 +425,19 @@ def main(): # corrosponding to GetAllSubmodelReferences: print(provider.getValue("/aas/submodels")) + # the following will retrieve the submodel identified by + # https://www.company.com/holzliste/1 + submodel_id = "https://www.company.com/holzliste/1" + submodel_id_encoded = base64.urlsafe_b64encode(submodel_id.encode()).decode() + id_short_path_encoded = quote("Preismatrix[0].Preis") + preis: model.Property = provider.getValue( + f"/aas/submodels/{submodel_id_encoded}/submodel/submodelElements/{id_short_path_encoded}" + ) + print(preis) + # now create a server instance that will translate incoming S3I-B messages # to methods exposed by the ModelProvider + loop = asyncio.get_event_loop() server = broker_api.S3IBServer(access_token, provider, dzwald_id, loop) @@ -418,5 +447,6 @@ def main(): loop.stop() + if __name__ == "__main__": main() diff --git a/demos/demo2/demo2_forestmanager.py b/demos/demo2/demo2_forestmanager.py index fb731df7c94c0d57a3349aafb87dba97ee798a16..6bbc5d997ff8540e9bd4244cf8c83a8d59664c55 100644 --- a/demos/demo2/demo2_forestmanager.py +++ b/demos/demo2/demo2_forestmanager.py @@ -3,6 +3,7 @@ from s3i import IdentityProvider, TokenType, Directory import asyncio import logging import os, sys, inspect +import base64 import broker_api @@ -26,7 +27,7 @@ forestmanager_hmi_secret = "cfs2YMj2bzPIS3kDWpHYFyUKzCorAQuV" # The id of forstify's HMI and dz_wald is known beforehand forstify_hmi_id = "s3i:8a8ee1ab-63d2-42ea-92d1-1ae682a55e7a" -dzwald_id = "s3i:6b1c90d0-0234-4ce7-9094-06ec2259dd5c" +dzwald_id = "s3i:bc30c279-02c5-4918-a2ad-761e927214dd" def authenticate(): @@ -50,17 +51,28 @@ async def main(): dzwald_endpoint = s3ib_endpoints[0] # use async client to access AAS and its submodels through their REST API asynchronously - client = broker_api.S3IBAsyncClient(access_token, forestmanager_hmi_id, loop) + client = broker_api.S3IBAsyncClient(access_token, forestmanager_hmi_id, loop) # Speichern einer Beobachtung beobachtung_submodel = create_beobachtung() + beobachtung_id_encoded = base64.urlsafe_b64encode(beobachtung_submodel.id.encode()).decode() task0 = client.setValue(dzwald_id, dzwald_endpoint, - "/aas/submodels/Beobachtung/submodel", beobachtung_submodel) + f"/aas/submodels/{beobachtung_id_encoded}/submodel", beobachtung_submodel) + + # listen on event "Auftragsstatus_Updated" + events_submodel_id = "https://www.company.com/submodels/events" + events_submodel_id_encoded = base64.urlsafe_b64encode(events_submodel_id.encode()).decode() + print(events_submodel_id_encoded) + event: model.BasicEventElement = await client.getValue(dzwald_id, dzwald_endpoint, + f"/aas/submodels/{events_submodel_id_encoded} \ + /submodel/submodelElements/Auftragsstatus_Updated") + auftragsstatus_updated = await client.subscribeToEvent(event.message_topic) # Speichern eines Waldweges waldweg_submodel = create_waldweg() + waldweg_id_encoded = base64.urlsafe_b64encode(waldweg_submodel.id.encode()).decode() task1 = client.setValue(dzwald_id, dzwald_endpoint, - "/aas/submodels/Waldweg/submodel", waldweg_submodel) + f"/aas/submodels/{waldweg_id_encoded}/submodel", waldweg_submodel) # values of the ok-element in the S3I-B messages are returned (reply0, reply1) = await asyncio.gather(task0, task1) @@ -71,18 +83,23 @@ async def main(): # Speichern des Arbeitsauftrags arbeitsauftrag_submodel = create_arbeitsauftrag() + arbeitsauftrag_id = "https://www.company.com/submodels/arbeitsauftrag" + arbeitsauftrag_id_encoded = base64.urlsafe_b64encode(arbeitsauftrag_id.encode()).decode() + reply2 = await client.setValue(dzwald_id, dzwald_endpoint, - "/aas/submodels/Arbeitsauftrag/submodel", + f"/aas/submodels/{arbeitsauftrag_id_encoded}/submodel", arbeitsauftrag_submodel) print(f"Response to add arbeitsauftrag: {reply2}") + # event = client.subscribeToEvent("update_arbeitsauftrag") + # TODO: SOLL-IST comparison # Update Arbeitsauftrag (Status) reply3 = await client.setValue( dzwald_id, dzwald_endpoint, - "/aas/submodels/Arbeitsauftrag/submodel/submodelElements/Auftragsstatus", + f"/aas/submodels/{arbeitsauftrag_id_encoded}/submodel/submodelElements/Auftragsstatus", f"{enums.Auftragsstatus.Gesendet.value}" ) print(f"Response to update auftragsstatus: {reply3}") @@ -91,10 +108,14 @@ async def main(): auftragsstatus: model.Property = await client.getValue( dzwald_id, dzwald_endpoint, - "/aas/submodels/Arbeitsauftrag/submodel/submodelElements/Auftragsstatus" + f"/aas/submodels/{arbeitsauftrag_id_encoded}/submodel/submodelElements/Auftragsstatus" ) + print(auftragsstatus) print( f"{auftragsstatus.id_short}: {enums.Auftragsstatus(int(auftragsstatus.value))}") + + await asyncio.wait([auftragsstatus_updated]) + if __name__ == "__main__": diff --git a/demos/demo2/helpers.py b/demos/demo2/helpers.py index 71b5e163da546b8c26535098e9cabd37cf227b66..a012abaee2f094df040a427113274842a1e60ab0 100644 --- a/demos/demo2/helpers.py +++ b/demos/demo2/helpers.py @@ -1,6 +1,11 @@ +import base64 +from urllib.parse import unquote + from basyx.aas import model from s3i import Directory +from api import SUBMODELS, SUBMODELELEMENTS + def add_aas_to_dir_entry(aas: model.AssetAdministrationShell, dir: Directory, thing_id: str): # get thing entry from directory @@ -44,3 +49,31 @@ def grant_entry_read_permissions(dir: Directory, thing_id, receivers: list): policy['entries']['observer']['subjects'][f"nginx:{receiver}"] = {'type': 'nginx basic auth client'} dir.updatePolicyIDBased(thing_id, policy) return + + +def id_short_path_from_ref(ref: model.ModelReference): + arr = [] + for i, key in enumerate(ref.key): + if key.value.isnumeric(): + arr[i-1] = f"{arr[i-1]}[{key.value}]" + elif key.type == model.KeyTypes.SUBMODEL: + arr.append(base64.urlsafe_b64encode(key.value.encode()).decode()) + else: + arr.append(key.value) + return ".".join(arr) + + +def id_short_path_from_path(path: str): + arr = path.split("/") + try: + submodel_id_index = arr.index(SUBMODELS) + 1 + id_short_path_index = arr.index(SUBMODELELEMENTS) + 1 + except: + return "" + return f"{arr[submodel_id_index]}.{unquote(arr[id_short_path_index])}" + + + + + + diff --git a/docker-compose.yml b/docker-compose.yml index c166c8ceb0352ba468aae2e23264fa0d1ff318ce..c53bbd8b3db81b8c72d9c04e80828c2719aa7d93 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,6 +2,11 @@ version: '3.8' services: dzwald: - image: basys4forestry_dzwald #TODO: Change + image: registry.git-ce.rwth-aachen.de/acplt/basys4forestry:latest container_name: dzwald + networks: + - my-network + +networks: + my-network: