# NOTE: the three lines below are residue from the repository web viewer, not code.
# Select Git revision
# OptiXObjectComponent.cpp
# broker_api.py 9.65 KiB
import asyncio
from uuid import uuid4
from s3i.broker import BrokerAMQP
from s3i import broker_message
from s3i.exception import S3IBMessageError
from basyx.aas.adapter.json.json_serialization import AASToJsonEncoder
from basyx.aas.adapter.json.json_deserialization import AASFromJsonDecoder
from basyx.aas.model import BasicEventElement, ModelReference
import json
import logging
import traceback
import base64
from typing import Iterable
from datetime import datetime
import time
import api
import helpers
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Console handler with a timestamped layout. Only the handler's threshold is
# set (INFO); no level is set on the logger itself in this block.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
ch.setLevel(logging.INFO)
logger.addHandler(ch)
class S3IBServer:
    """Answer S3I-B requests for a model provider.

    Incoming broker messages are handled in :meth:`callback`. Three request
    types are served: ``getValueRequest``, ``setValueRequest`` and
    ``createAttributeRequest``. After a successful ``setValueRequest`` the
    events submodel is scanned and an event message is published for every
    event element that observes the written path.
    """

    # Identifier of the submodel whose elements are scanned for events.
    EVENTS_SUBMODEL_ID = "https://www.company.com/submodels/events"

    def __init__(self, access_token: str, provider: api.ModelProvider, id: str, loop):
        """Create the broker connection object and the JSON (de)serializers.

        :param access_token: token handed to ``BrokerAMQP``.
        :param provider: object offering ``getValue``/``setValue``/``createValue``.
        :param id: identifier of this server; its endpoint is ``"s3ibs://" + id``.
        :param loop: event loop handed to ``BrokerAMQP`` and run by :meth:`run`.
        """
        self.id = id
        self.provider = provider
        self.loop = loop
        self.broker = BrokerAMQP(access_token, "s3ibs://" + self.id, self.callback, loop)
        self.serializer = AASToJsonEncoder()
        self.deserializer = AASFromJsonDecoder()

    @property
    def _endpoint(self) -> str:
        """Endpoint of this server, used as ``replyToEndpoint`` in replies."""
        return "s3ibs://" + self.id

    def _check_for_events(self, path) -> Iterable[BasicEventElement]:
        """Return the event elements whose observed element matches *path*.

        :param path: attribute path that was just written.
        :return: list of matching elements of the events submodel (may be empty).
        """
        submodel_id_encoded = base64.urlsafe_b64encode(
            self.EVENTS_SUBMODEL_ID.encode()
        ).decode()
        events = self.provider.getValue(f"/aas/submodels/{submodel_id_encoded}/submodel")
        # The requested path does not change while iterating; convert it once.
        req_path = helpers.id_short_path_from_path(path)
        triggered_events = []
        for event in events.submodel_element:
            if helpers.id_short_path_from_ref(event.observed) == req_path:
                triggered_events.append(event)
                logger.info(f"[Event] [{event.message_topic}] {event.id_short} was triggered")
        return triggered_events

    def _handle_get_value(self, path, receiver_id, receiver_endpoint, received_msg_id):
        """Read *path* from the provider and send a ``GetValueReply``."""
        value = self.provider.getValue(path)
        reply = broker_message.GetValueReply()
        reply.fillGetValueReply(
            sender=self.id,
            receivers=[receiver_id],
            message_id="s3i:" + str(uuid4()),
            value=self.serializer.encode(value),
            replying_to_msg=received_msg_id,
            reply_to_endpoint=self._endpoint,
        )
        self.broker.send([receiver_endpoint], json.dumps(reply.base_msg))

    def _handle_set_value(self, path, new_value, receiver_id, receiver_endpoint, received_msg_id):
        """Write *new_value* to *path*, acknowledge, then publish triggered events."""
        self.provider.setValue(path, self.deserializer.decode(new_value))
        reply = broker_message.SetValueReply()
        reply.fillSetValueReply(
            sender=self.id,
            receivers=[receiver_id],
            message_id="s3i:" + str(uuid4()),
            replying_to_msg=received_msg_id,
            ok=True,
            reply_to_endpoint=self._endpoint,
        )
        self.broker.send([receiver_endpoint], json.dumps(reply.base_msg))

        # Check whether the written element is being observed.
        triggered_events = self._check_for_events(path)
        if not triggered_events:
            return
        # The content is identical for every event; decode it once.
        content = {path: self.deserializer.decode(new_value)}
        for event in triggered_events:
            evt_msg = broker_message.EventMessage()
            evt_msg.fillEventMessage(
                sender=self.id,
                message_id="s3i:" + str(uuid4()),
                topic=event.message_topic,
                timestamp=round(datetime.now().timestamp()),
                content=content,
            )
            self.broker.publish_event(json.dumps(evt_msg.base_msg), event.message_topic)

    def _handle_create_attribute(self, path, value, receiver_id, receiver_endpoint, received_msg_id):
        """Create *value* at *path* in the provider and send a ``createAttributeReply``."""
        self.provider.createValue(path, self.deserializer.decode(value))
        reply = broker_message.Message()
        reply.base_msg["sender"] = self.id
        reply.base_msg["receivers"] = [receiver_id]
        reply.base_msg["identifier"] = "s3i:" + str(uuid4())
        reply.base_msg["messageType"] = "createAttributeReply"
        reply.base_msg["replyToEndpoint"] = self._endpoint
        reply.base_msg["replyingToMessage"] = received_msg_id
        reply.base_msg["ok"] = True
        self.broker.send([receiver_endpoint], json.dumps(reply.base_msg))

    def callback(self, ch, method, properties, body):
        """Handle one incoming broker message and send the matching reply.

        :param ch: passed by the broker; unused here.
        :param method: passed by the broker; unused here.
        :param properties: passed by the broker; unused here.
        :param body: raw message, wrapped in ``broker_message.Message``.

        Exceptions never leave this method. An ``S3IBMessageError`` is sent
        back to the requester when its endpoint is already known; anything
        else is logged with its traceback.
        """
        # Bound up front so the error handler can tell whether the reply
        # endpoint had been read before the failure happened.
        receiver_endpoint = None
        try:
            msg = broker_message.Message(base_msg=body)
            path = msg.base_msg["attributePath"]
            receiver_endpoint = msg.base_msg["replyToEndpoint"]
            receiver_id = msg.base_msg["sender"]
            received_msg_id = msg.base_msg["identifier"]
            message_type = msg.base_msg["messageType"]
            logger.info(f"[S3I] Received {message_type} from {receiver_id}")

            if message_type == "getValueRequest":
                self._handle_get_value(path, receiver_id, receiver_endpoint, received_msg_id)
            elif message_type == "setValueRequest":
                self._handle_set_value(
                    path, msg.base_msg["newValue"], receiver_id, receiver_endpoint, received_msg_id
                )
            elif message_type == "createAttributeRequest":
                self._handle_create_attribute(
                    path, msg.base_msg["newValue"], receiver_id, receiver_endpoint, received_msg_id
                )
            else:
                logger.warning("[S3I] Ignoring unsupported message type %r", message_type)
        except S3IBMessageError as e:
            if receiver_endpoint is None:
                # The failure happened before the reply endpoint was read, so
                # there is nowhere to send the error message.
                logger.exception("[S3I] Message error before the reply endpoint was known")
            else:
                self.broker.send([receiver_endpoint], json.dumps(e.error_msg))
        except Exception:
            # Top-level boundary of the broker callback: record and carry on.
            logger.exception("[S3I] Unhandled error while processing a message")

    def run(self):
        """Connect the broker and run the event loop forever (blocking)."""
        self.broker.connect()
        self.loop.run_forever()
class S3IBAsyncClient:
    """Asyncio client that sends S3I-B requests and awaits their replies.

    Every outgoing request registers a future in ``self.requests`` under its
    message id; :meth:`callback` resolves that future when a message whose
    ``replyingToMessage`` equals the id arrives. Event subscriptions use the
    same dict, keyed by topic.
    """

    def __init__(self, access_token: str, id: str, loop: asyncio.AbstractEventLoop):
        """Set up the broker connection and start connecting.

        :param access_token: token handed to ``BrokerAMQP``.
        :param id: identifier of this client; its endpoint is ``"s3ibs://" + id``.
        :param loop: event loop used for the broker and for all futures.
        """
        self.id = id
        self.loop = loop
        # Created before the open-callback is registered so the callback
        # always finds the future it has to resolve.
        self.channel_open = self.loop.create_future()
        self.requests: dict[str, asyncio.Future] = {}
        self.deserializer = AASFromJsonDecoder()
        self.serializer = AASToJsonEncoder()
        self.broker = BrokerAMQP(access_token, "s3ibs://" + self.id, self.callback, loop)
        self.broker.add_on_channel_open_callback(self.on_channel_open, True)
        self.broker.create_event_queue()
        self.broker.connect()

    def on_channel_open(self):
        """Mark the channel as open; safe to call more than once."""
        # set_result() on a finished future raises InvalidStateError.
        if not self.channel_open.done():
            self.channel_open.set_result(True)

    def callback(self, ch, method, properties, body):
        """Resolve the pending future that matches an incoming message.

        Event messages resolve the future registered for their topic with the
        event content. Any other message resolves the future registered for
        its ``replyingToMessage`` id with the decoded ``value`` when one is
        present, otherwise with the ``ok`` field.

        :param ch: passed by the broker; unused here.
        :param method: passed by the broker; unused here.
        :param properties: passed by the broker; unused here.
        :param body: raw message, wrapped in ``broker_message.Message``.
        :raises Exception: carrying *body* when an ``S3IBMessageError`` occurs.
        """
        try:
            msg = broker_message.Message(base_msg=body)
            msg_id = msg.base_msg.get("replyingToMessage")
            sender = msg.base_msg["sender"]
            message_type = msg.base_msg["messageType"]
            value = msg.base_msg.get("value")
            ok = msg.base_msg.get("ok")
            logger.info(f"[S3I] Received {message_type} from {sender}")

            if message_type == "eventMessage":
                content = msg.base_msg.get("content")
                topic = msg.base_msg.get("topic")
                timestamp = msg.base_msg.get("timestamp")
                logger.info(f"[Event] [{topic}] [{datetime.fromtimestamp(timestamp)}] {content}")
                key, result = topic, content
            else:
                key = msg_id
                # A falsy value falls back to the "ok" flag, as set/create
                # replies carry no value.
                result = self.deserializer.decode(value) if value else ok

            future = self.requests.get(key)
            if future is None:
                logger.warning("[S3I] No pending request or subscription for %r", key)
            elif future.done():
                # E.g. a further event on a topic whose one-shot handle is
                # already resolved, or a duplicate reply.
                logger.warning("[S3I] Result for %r was already delivered", key)
            else:
                future.set_result(result)
        except S3IBMessageError as e:
            raise Exception(body) from e

    async def _wait_for_channel(self):
        """Suspend until the broker channel has been reported open."""
        if not self.channel_open.done():
            await self.channel_open

    async def _request(self, endpoint: str, msg, msg_id: str):
        """Send *msg* to *endpoint* and return the result of its reply.

        The reply future is registered before sending and removed again once
        the wait ends, so ``self.requests`` does not grow without bound.
        """
        reply = self.loop.create_future()
        self.requests[msg_id] = reply
        try:
            self.broker.send([endpoint], json.dumps(msg.base_msg))
            return await reply
        finally:
            self.requests.pop(msg_id, None)

    async def getValue(self, receiver_id: str, endpoint: str, path: str):
        """Request the value at *path* from *receiver_id* and return the reply result.

        :param receiver_id: identifier of the receiving thing.
        :param endpoint: broker endpoint the request is sent to.
        :param path: attribute path to read.
        """
        await self._wait_for_channel()
        msg = broker_message.GetValueRequest()
        msg_id = "s3i:" + str(uuid4())
        msg.fillGetValueRequest(
            sender=self.id,
            receivers=[receiver_id],
            message_id=msg_id,
            attribute_path=path,
            reply_to_endpoint="s3ibs://" + self.id,
        )
        return await self._request(endpoint, msg, msg_id)

    async def setValue(self, receiver_id: str, endpoint: str, path: str, new_value):
        """Ask *receiver_id* to set *path* to *new_value* and return the reply result.

        :param receiver_id: identifier of the receiving thing.
        :param endpoint: broker endpoint the request is sent to.
        :param path: attribute path to write.
        :param new_value: value to write; encoded with the AAS JSON encoder.
        """
        await self._wait_for_channel()
        msg = broker_message.SetValueRequest()
        msg_id = "s3i:" + str(uuid4())
        msg.fillSetValueRequest(
            sender=self.id,
            receivers=[receiver_id],
            message_id=msg_id,
            attribute_path=path,
            reply_to_endpoint="s3ibs://" + self.id,
            new_value=self.serializer.encode(new_value),
        )
        return await self._request(endpoint, msg, msg_id)

    async def createValue(self, receiver_id: str, endpoint: str, path: str, value):
        """Ask *receiver_id* to create *value* at *path* and return the reply result.

        :param receiver_id: identifier of the receiving thing.
        :param endpoint: broker endpoint the request is sent to.
        :param path: attribute path to create.
        :param value: value to create; encoded with the AAS JSON encoder.
        """
        await self._wait_for_channel()
        msg = broker_message.Message()
        msg_id = "s3i:" + str(uuid4())
        msg.base_msg["sender"] = self.id
        msg.base_msg["receivers"] = [receiver_id]
        msg.base_msg["identifier"] = msg_id
        msg.base_msg["attributePath"] = path
        msg.base_msg["newValue"] = self.serializer.encode(value)
        msg.base_msg["messageType"] = "createAttributeRequest"
        msg.base_msg["replyToEndpoint"] = "s3ibs://" + self.id
        return await self._request(endpoint, msg, msg_id)

    async def subscribeToEvent(self, topic: str):
        """Subscribe to *topic* and return a future for its next event.

        The returned future is one-shot: it is resolved with the content of
        the first event message received for *topic*. Calling this method
        again for the same topic replaces the stored future with a new one.
        """
        await self._wait_for_channel()
        self.broker.subscribe_topic(topic)
        handle = self.loop.create_future()
        self.requests[topic] = handle
        return handle