Skip to content
Snippets Groups Projects
Select Git revision
  • 52cccc39a65a23723882324c48921e1093fe4401
  • master default protected
  • feature/refactor
  • 4.24
  • develop
  • Rendering
  • temp-optix-6
7 results

OptiXObjectComponent.cpp

Blame
  • broker_api.py 9.65 KiB
    import asyncio
    from uuid import uuid4
    from s3i.broker import BrokerAMQP
    from s3i import broker_message
    from s3i.exception import S3IBMessageError
    from basyx.aas.adapter.json.json_serialization import AASToJsonEncoder
    from basyx.aas.adapter.json.json_deserialization import AASFromJsonDecoder
    from basyx.aas.model import BasicEventElement, ModelReference
    import json
    import logging
    import traceback
    import base64
    from typing import Iterable
    from datetime import datetime
    import time
    
    import api
    import helpers
    
    # Module-wide logger; every class in this file logs through it.
    logger = logging.getLogger(__name__)

    # Timestamp, logger name and severity precede each message.
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Dedicated stream handler that lets INFO and above through.
    # NOTE(review): only the handler level is set here; the logger's own level
    # is left untouched, so whether INFO records actually reach this handler
    # depends on logging configuration done elsewhere -- confirm.
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    ch.setLevel(logging.INFO)

    logger.addHandler(ch)
    
    
    class S3IBServer:
        """Answer S3I-B value requests for a model provider over the broker.

        Incoming messages are processed in :meth:`callback`. The message types
        ``getValueRequest``, ``setValueRequest`` and ``createAttributeRequest``
        are forwarded to the provider and answered on the sender's reply
        endpoint. After a ``setValueRequest`` an event message is published for
        every event element that observes the changed path.
        """

        def __init__(self, access_token: str, provider: api.ModelProvider, id: str, loop):
            """Store the collaborators and create the broker object.

            Args:
                access_token: Token handed to ``BrokerAMQP``.
                provider: Object whose ``getValue``/``setValue``/``createValue``
                    methods serve the requests.
                id: Identifier of this server; also used to build its
                    ``s3ibs://`` endpoint.
                loop: Event loop passed to the broker and driven by :meth:`run`.
            """
            self.id = id
            self.provider = provider
            self.loop = loop
            self.broker = BrokerAMQP(access_token, self._endpoint, self.callback, loop)
            self.serializer = AASToJsonEncoder()
            self.deserializer = AASFromJsonDecoder()

        @property
        def _endpoint(self) -> str:
            """Return this server's own ``s3ibs://`` endpoint."""
            return "s3ibs://" + self.id

        @staticmethod
        def _new_message_id() -> str:
            """Return a fresh ``s3i:``-prefixed message identifier."""
            return "s3i:" + str(uuid4())

        def _send(self, receiver_endpoint, base_msg) -> None:
            """Send ``base_msg`` as JSON to ``receiver_endpoint``."""
            self.broker.send([receiver_endpoint], json.dumps(base_msg))

        def _check_for_events(self, path) -> Iterable[BasicEventElement]:
            """Return the event elements whose observed element matches ``path``.

            The events are read from the submodel with the hard-coded id
            ``https://www.company.com/submodels/events``; each match is logged.

            Args:
                path: Attribute path of the request that was just processed.

            Returns:
                A list with every matching event element (possibly empty).
            """
            submodel_id = "https://www.company.com/submodels/events"
            submodel_id_encoded = base64.urlsafe_b64encode(submodel_id.encode()).decode()
            events = self.provider.getValue(f"/aas/submodels/{submodel_id_encoded}/submodel")

            # The requested path is the same for every event, so resolve it once.
            req_path = helpers.id_short_path_from_path(path)

            triggered_events = []
            for event in events.submodel_element:
                if helpers.id_short_path_from_ref(event.observed) != req_path:
                    continue
                triggered_events.append(event)
                logger.info(f"[Event] [{event.message_topic}] {event.id_short} was triggered")

            return triggered_events

        def _handle_get_value(self, path, receiver_id, receiver_endpoint, received_msg_id) -> None:
            """Read ``path`` from the provider and send a ``GetValueReply``."""
            value = self.provider.getValue(path)
            reply = broker_message.GetValueReply()
            reply.fillGetValueReply(
                sender=self.id,
                receivers=[receiver_id],
                message_id=self._new_message_id(),
                value=self.serializer.encode(value),
                replying_to_msg=received_msg_id,
                reply_to_endpoint=self._endpoint,
            )
            self._send(receiver_endpoint, reply.base_msg)

        def _handle_set_value(self, request, path, receiver_id, receiver_endpoint, received_msg_id) -> None:
            """Write ``newValue`` to the provider, reply, then publish triggered events."""
            new_value = request["newValue"]
            self.provider.setValue(path, self.deserializer.decode(new_value))
            reply = broker_message.SetValueReply()
            reply.fillSetValueReply(
                sender=self.id,
                receivers=[receiver_id],
                message_id=self._new_message_id(),
                replying_to_msg=received_msg_id,
                ok=True,
                reply_to_endpoint=self._endpoint,
            )
            self._send(receiver_endpoint, reply.base_msg)

            # The reply is sent first; observers of the element are notified afterwards.
            for event in self._check_for_events(path):
                evt_msg = broker_message.EventMessage()
                evt_msg.fillEventMessage(
                    sender=self.id,
                    message_id=self._new_message_id(),
                    topic=event.message_topic,
                    timestamp=round(datetime.now().timestamp()),
                    # NOTE(review): the content carries the decoded value, so
                    # json.dumps below only succeeds if that value is plain
                    # JSON-serializable data -- confirm for model objects.
                    content={path: self.deserializer.decode(new_value)},
                )
                self.broker.publish_event(json.dumps(evt_msg.base_msg), event.message_topic)

        def _handle_create_attribute(self, request, path, receiver_id, receiver_endpoint, received_msg_id) -> None:
            """Create ``newValue`` at ``path`` in the provider and send a ``createAttributeReply``."""
            value = request["newValue"]
            self.provider.createValue(path, self.deserializer.decode(value))
            reply = broker_message.Message()
            reply.base_msg["sender"] = self.id
            reply.base_msg["receivers"] = [receiver_id]
            reply.base_msg["identifier"] = self._new_message_id()
            reply.base_msg["messageType"] = "createAttributeReply"
            reply.base_msg["replyToEndpoint"] = self._endpoint
            reply.base_msg["replyingToMessage"] = received_msg_id
            reply.base_msg["ok"] = True
            self._send(receiver_endpoint, reply.base_msg)

        def callback(self, ch, method, properties, body):
            """Handle one incoming broker message.

            Args:
                ch, method, properties: Delivery arguments supplied by the broker;
                    not used here.
                body: Raw message body, parsed with ``broker_message.Message``.

            An ``S3IBMessageError`` is answered with its ``error_msg`` when the
            reply endpoint is already known. Any other exception is logged and
            swallowed so that it does not propagate to the caller.
            """
            # Bound up front so the error handler can tell whether the reply
            # endpoint was read before the failure happened.
            receiver_endpoint = None
            try:
                msg = broker_message.Message(base_msg=body)
                request = msg.base_msg
                path = request["attributePath"]
                receiver_endpoint = request["replyToEndpoint"]
                receiver_id = request["sender"]
                received_msg_id = request["identifier"]
                message_type = request["messageType"]

                logger.info(f"[S3I] Received {message_type} from {receiver_id}")

                if message_type == "getValueRequest":
                    self._handle_get_value(path, receiver_id, receiver_endpoint, received_msg_id)
                elif message_type == "setValueRequest":
                    self._handle_set_value(request, path, receiver_id, receiver_endpoint, received_msg_id)
                elif message_type == "createAttributeRequest":
                    self._handle_create_attribute(request, path, receiver_id, receiver_endpoint, received_msg_id)
                else:
                    logger.warning("[S3I] Ignoring unsupported message type %s", message_type)
            except S3IBMessageError as e:
                if receiver_endpoint is None:
                    # Nowhere to send the error message to; record it instead.
                    logger.error("[S3I] Message rejected before its reply endpoint was known: %s", e)
                    return
                self._send(receiver_endpoint, e.error_msg)
            except Exception:
                logger.exception("[S3I] Unexpected error while handling a broker message")

        def run(self):
            """Connect the broker and run the event loop until it is stopped."""
            self.broker.connect()
            self.loop.run_forever()
    
    
    class S3IBAsyncClient:
        """Asyncio client that sends S3I-B requests and awaits the matching replies.

        Pending requests live in ``self.requests`` as futures keyed by the
        identifier of the request message. Event subscriptions are kept in the
        same mapping, keyed by topic.
        """

        def __init__(self, access_token: str, id: str, loop: asyncio.AbstractEventLoop):
            """Create the broker object, register the callbacks and connect.

            Args:
                access_token: Token handed to ``BrokerAMQP``.
                id: Identifier of this client; also used to build its
                    ``s3ibs://`` endpoint.
                loop: Event loop used for the broker and for all futures.
            """
            self.id = id
            self.loop = loop
            # State touched by the broker callbacks is created before the broker
            # is wired up, so a callback can never see a half-built client.
            self.channel_open = self.loop.create_future()
            self.requests: dict[str, asyncio.Future] = {}
            self.deserializer = AASFromJsonDecoder()
            self.serializer = AASToJsonEncoder()
            self.broker = BrokerAMQP(access_token, "s3ibs://" + self.id, self.callback, loop)
            self.broker.add_on_channel_open_callback(self.on_channel_open, True)
            self.broker.create_event_queue()
            self.broker.connect()

        def on_channel_open(self):
            """Mark the channel as open; repeated notifications are ignored."""
            if not self.channel_open.done():
                self.channel_open.set_result(True)

        def _complete(self, key, result) -> bool:
            """Resolve the future registered under ``key`` with ``result``.

            Returns:
                ``True`` if a pending future was resolved; ``False`` if no future
                is registered under ``key`` or it is already done (resolved or
                cancelled).
            """
            pending = self.requests.get(key)
            if pending is None or pending.done():
                return False
            pending.set_result(result)
            return True

        def callback(self, ch, method, properties, body):
            """Route one incoming broker message to the future waiting for it.

            Event messages resolve the future registered for their topic with the
            event content. Every other message resolves the future registered for
            its ``replyingToMessage`` id with the decoded ``value`` if one is
            present, otherwise with the ``ok`` field. Messages nobody is waiting
            for are logged and dropped.

            Args:
                ch, method, properties: Delivery arguments supplied by the broker;
                    not used here.
                body: Raw message body, parsed with ``broker_message.Message``.

            Raises:
                Exception: Carrying ``body`` when an ``S3IBMessageError`` occurs.
            """
            try:
                msg = broker_message.Message(base_msg=body)
                payload = msg.base_msg
                msg_id = payload.get("replyingToMessage")
                sender = payload["sender"]
                message_type = payload["messageType"]
                value = payload.get("value")
                ok = payload.get("ok")

                logger.info(f"[S3I] Received {message_type} from {sender}")

                if message_type == "eventMessage":
                    content = payload.get("content")
                    topic = payload.get("topic")
                    timestamp = payload.get("timestamp")
                    # A missing timestamp must not keep the event from being delivered.
                    received_at = datetime.fromtimestamp(timestamp) if timestamp is not None else None
                    logger.info(f"[Event] [{topic}] [{received_at}] {content}")
                    key, result = topic, content
                else:
                    key = msg_id
                    result = self.deserializer.decode(value) if value else ok

                if not self._complete(key, result):
                    logger.warning("[S3I] No pending future for %s (%s); message dropped", key, message_type)

            except S3IBMessageError as e:
                raise Exception(body) from e

        async def _wait_for_channel(self) -> None:
            """Suspend until the broker has reported the channel as open."""
            if not self.channel_open.done():
                await self.channel_open

        async def _request(self, endpoint: str, msg_id: str, base_msg):
            """Send ``base_msg`` to ``endpoint`` and return the result of its reply.

            The reply future is registered before the message is sent and is
            removed again once the wait ends, whether it succeeded, failed or was
            cancelled, so ``self.requests`` does not accumulate finished entries.
            """
            reply = self.loop.create_future()
            self.requests[msg_id] = reply
            try:
                self.broker.send([endpoint], json.dumps(base_msg))
                return await reply
            finally:
                self.requests.pop(msg_id, None)

        async def getValue(self, receiver_id: str, endpoint: str, path: str):
            """Request the value at ``path`` from ``receiver_id`` and return the reply result.

            Args:
                receiver_id: Identifier of the receiving party.
                endpoint: Broker endpoint the request is sent to.
                path: Attribute path to read.
            """
            await self._wait_for_channel()
            msg_id = "s3i:" + str(uuid4())
            msg = broker_message.GetValueRequest()
            msg.fillGetValueRequest(
                sender=self.id,
                receivers=[receiver_id],
                message_id=msg_id,
                attribute_path=path,
                reply_to_endpoint="s3ibs://" + self.id,
            )
            return await self._request(endpoint, msg_id, msg.base_msg)

        async def setValue(self, receiver_id: str, endpoint: str, path: str, new_value):
            """Ask ``receiver_id`` to set ``path`` to ``new_value`` and return the reply result.

            Args:
                receiver_id: Identifier of the receiving party.
                endpoint: Broker endpoint the request is sent to.
                path: Attribute path to write.
                new_value: Value to set; encoded with the client's serializer.
            """
            await self._wait_for_channel()
            msg_id = "s3i:" + str(uuid4())
            msg = broker_message.SetValueRequest()
            msg.fillSetValueRequest(
                sender=self.id,
                receivers=[receiver_id],
                message_id=msg_id,
                attribute_path=path,
                reply_to_endpoint="s3ibs://" + self.id,
                new_value=self.serializer.encode(new_value),
            )
            return await self._request(endpoint, msg_id, msg.base_msg)

        async def createValue(self, receiver_id: str, endpoint: str, path: str, value):
            """Ask ``receiver_id`` to create ``value`` at ``path`` and return the reply result.

            Args:
                receiver_id: Identifier of the receiving party.
                endpoint: Broker endpoint the request is sent to.
                path: Attribute path to create.
                value: Value to create; encoded with the client's serializer.
            """
            await self._wait_for_channel()
            msg_id = "s3i:" + str(uuid4())
            msg = broker_message.Message()
            msg.base_msg["sender"] = self.id
            msg.base_msg["receivers"] = [receiver_id]
            msg.base_msg["identifier"] = msg_id
            msg.base_msg["attributePath"] = path
            msg.base_msg["newValue"] = self.serializer.encode(value)
            msg.base_msg["messageType"] = "createAttributeRequest"
            msg.base_msg["replyToEndpoint"] = "s3ibs://" + self.id
            return await self._request(endpoint, msg_id, msg.base_msg)

        async def subscribeToEvent(self, topic: str):
            """Subscribe to ``topic`` and return a future for its next event.

            The returned future is resolved with the content of the first event
            message received for ``topic``; later events on the same topic do not
            change it. Subscribing again replaces the stored future.
            """
            await self._wait_for_channel()
            self.broker.subscribe_topic(topic)
            handle = self.loop.create_future()
            self.requests[topic] = handle
            return handle