diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e1ee071315fcb4698e6b69cb8af7199ab29a1698..3598fcca5106bb4c3b839aed8ea553b9e4fb6e93 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -40,17 +40,17 @@ Copy To Package Registry: script: - '& cmd /k "C:\Anaconda3\Scripts\activate.bat E:\environments\$env:UUID & pip install twine & python -m twine upload -u gitlab-ci-token -p $CI_JOB_TOKEN --repository-url https://git-ce.rwth-aachen.de/api/v4/projects/${CI_PROJECT_ID}/packages/pypi dist/*"' -Deploy Documentation: - stage: deploy - script: - - '& cmd /k "C:\Anaconda3\Scripts\activate.bat E:\environments\$env:UUID & sphinx-build -b html ./doc/source public"' - artifacts: - paths: - - public - only: - - master - tags: - - wzlmq-apps +#Deploy Documentation: +# stage: deploy +# script: +# - '& cmd /k "C:\Anaconda3\Scripts\activate.bat E:\environments\$env:UUID & sphinx-build -b html ./doc/source public"' +# artifacts: +# paths: +# - public +# only: +# - master +# tags: +# - wzlmq-apps Notify Rocket.Chat: only: diff --git a/README.md b/README.md index 2fb415c11cabde3020cfacc7f9fbb044ada88a02..2b8d7e5ea86723b1bb357d2011824f738e7182b3 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # WZL-MQTT -Current stable version: 1.3.0 +Current stable version: 2.0.0 ## Installation 1. Install the *WZL-Utilities* dependency via pip @@ -12,9 +12,69 @@ pip install --extra-index-url https://package-read:gkYP4xrm2PxicUbW1wra@git-ce.r ``` ## Usage -For ussage of the provided MQTT-Publisher and -Subscriber clients, please refer to the *sample* file in the corresponding folder. -If there are any question contact [Matthias Bodenbenner](mailto:m.bodenbenner@wzl.rwth-aachen.de). + +### Publish messages + +```python +# username and password required to connect to the broker +MQTT_USER = "" +MQTT_PASSWORD = "" + +# address, port and virtual host of the broker to connect to +MQTT_BROKER = "127.0.0.1" +MQTT_PORT = 1883 +MQTT_VHOST = "/" + +# initialize publisher and connect to the broker +client = mqtt.MQTTPublisher(MQTT_USER) +client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) + +# create message and publish the message as UTF-8 encoded string +message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) +client.publish(MQTT_USER + "/channel-001", message.encode("utf-8")) +``` + +### Subscribe to topics and receive messages +```python +# username and password required to connect to the broker +MQTT_USER = "" +MQTT_PASSWORD = "" + +# address, port and virtual host of the broker to connect to +MQTT_BROKER = "127.0.0.1" +MQTT_PORT = 1883 +MQTT_VHOST = "/" + +# define callback which will be executed when a message is received +def print_mqtt_message(topic, message): + print("{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic, message.decode("utf-8"))) + +# initialize subscriber and connect to the broker +client = mqtt.MQTTSubscriber(MQTT_USER) +client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) + +# register the callback and subscribe topic +client.set_callback("PRINT", print_mqtt_message) +client.subscribe('#') + +# start waiting loop to prevent program from exiting +while True: + try: + time.sleep(1) + except KeyboardInterrupt: + break + +``` + +Slightly more detailled examples can be found in the *sample* directory. +If there are any questions contact [Matthias Bodenbenner](mailto:m.bodenbenner@wzl.rwth-aachen.de). +To obtain credentials for MQTT-Broker of WZL-MQ-MS contact [Mark Sanders](mailto:m.sanders@wzl.rwth-aachen.de) ## Recent changes +2.0.0 +- renamed the MQTReceiver to MQTTSubscriber for the sake of convenience +- added extensive documentation + 1.3.0 - added wzl-utilities dependency by sourcing out logging functionality \ No newline at end of file diff --git a/doc/source/index.rst b/doc/source/index.rst index adb047619a0911e8bbb8c58fb8e001af92ad5d57..e00aa69d8f81946eec9082d510b617eb0e59eddc 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -6,6 +6,22 @@ Welcome to WZL-MQTT's documentation! ==================================== +Installation +------------ + +1. Install the *WZL-Utilities* dependency via pip + +.. code-block:: bat + + pip install --extra-index-url https://package-read:_UtxUoKFjoHGs9XJusBq@git-ce.rwth-aachen.de/api/v4/projects/2815/packages/pypi/simple wzl-utilities + +2. Install the WZL-MQTT package via pip + +.. code-block:: bat + + pip install --extra-index-url https://package-read:gkYP4xrm2PxicUbW1wra@git-ce.rwth-aachen.de/api/v4/projects/1708/packages/pypi/simple wzl-mqtt + + .. toctree:: :maxdepth: 2 :caption: Contents: diff --git a/doc/source/mqtt.rst b/doc/source/mqtt.rst index 1954b14d2e57731ce321db850afe607426350ddc..5b63c647028578af313bedb0118ff8d70cb82b7f 100644 --- a/doc/source/mqtt.rst +++ b/doc/source/mqtt.rst @@ -1,7 +1,10 @@ -MQTT Package +API-Documentation ============ -MQTT Client Module +Submodules +---------- + +mqtt.client module ------------------ .. automodule:: mqtt.client @@ -9,7 +12,7 @@ MQTT Client Module :undoc-members: :show-inheritance: -MQTT Exception Module +mqtt.exceptions module ---------------------- .. automodule:: mqtt.exceptions @@ -17,3 +20,10 @@ MQTT Exception Module :undoc-members: :show-inheritance: +Module contents +--------------- + +.. automodule:: mqtt + :members: + :undoc-members: + :show-inheritance: diff --git a/doc/source/usage.rst b/doc/source/usage.rst index 39806a19b97e19f0f8998f0fe18a9b1d8215f202..efd59f305d425f2f0ad0dccd6e50dc8ec3b7435c 100644 --- a/doc/source/usage.rst +++ b/doc/source/usage.rst @@ -1,19 +1,59 @@ Usage of the MQTT-Package -============ +========================= -MQTT Client Module ------------------- +Publishing MQTT Packages +------------------------ -.. automodule:: mqtt.client - :members: - :undoc-members: - :show-inheritance: +.. code-block:: -MQTT Exception Module ----------------------- + # username and password required to connect to the broker + MQTT_USER = "" + MQTT_PASSWORD = "" -.. automodule:: mqtt.exceptions - :members: - :undoc-members: - :show-inheritance: + # address, port and virtual host of the broker to connect to + MQTT_BROKER = "127.0.0.1" + MQTT_PORT = 1883 + MQTT_VHOST = "/" + + # initialize publisher and connect to the broker + client = mqtt.MQTTPublisher(MQTT_USER) + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) + + # create message and publish the message as UTF-8 encoded string + message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) + client.publish(MQTT_USER + "/channel-001", message.encode("utf-8")) + +Receiving MQTT Messages +----------------------- + +.. code-block:: + + # username and password required to connect to the broker + MQTT_USER = "" + MQTT_PASSWORD = "" + + # address, port and virtual host of the broker to connect to + MQTT_BROKER = "127.0.0.1" + MQTT_PORT = 1883 + MQTT_VHOST = "/" + + # define callback which will be executed when a message is received + def print_mqtt_message(topic, message): + print("{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic, message.decode("utf-8"))) + + # initialize subscriber and connect to the broker + client = mqtt.MQTTSubscriber(MQTT_USER) + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) + + # register the callback and subscribe topic + client.set_callback("PRINT", print_mqtt_message) + client.subscribe('#') + + # start waiting loop to prevent program from exiting + while True: + try: + time.sleep(1) + except KeyboardInterrupt: + break diff --git a/sample/__init__.py b/sample/__init__.py deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/sample/publish.py b/sample/publish.py index e62fff87dcb5aea5139d1c1c434ebe7bd7d55d81..51c7612fbb2374be0cb000b9c9e61fe8b1f77f2d 100644 --- a/sample/publish.py +++ b/sample/publish.py @@ -1,11 +1,12 @@ import datetime -# For sample data import json import logging import random import time import uuid +from wzl import mqtt + ### Ask for settings and individual credentials ### MQTT_USER = "" @@ -23,22 +24,21 @@ MQTT_VHOST = "/" ### Ask for settings and individual credentials ### - -if __name__=="__main__": +if __name__ == "__main__": console_log = logging.StreamHandler() - console_log.setFormatter(MQTT.formatter) + console_log.setFormatter(mqtt.formatter) - MQTT.logger.addHandler(console_log) - MQTT.logger.setLevel(logging.INFO) + mqtt.logger.addHandler(console_log) + mqtt.logger.setLevel(logging.INFO) - client = MQTTPublisher(MQTT_USER) - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) + client = mqtt.MQTTPublisher(MQTT_USER) + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) while True: try: - message = json.dumps({"value" : random.random.uniform(0, 5,3).tolist(), "timestamp" : datetime.datetime.utcnow().isoformat() + "Z", "covariance": [[2,0,0], [0,2,0], [0,0,0]], "nonce" : str(uuid.uuid4()), "hash" : None, "unit" : "MTR"}) - client.publish(MQTT_USER + "/channel-001",message.encode("utf-8"), 0) + message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) + client.publish(MQTT_USER + "/channel-001", message.encode("utf-8"), 0) time.sleep(1) except KeyboardInterrupt: break - diff --git a/sample/receive.py b/sample/receive.py index 522e07d067ecdc04e2262041a8852775ffceabd1..f0b520043dbc572e7d47824b122fb2eb60555d34 100644 --- a/sample/receive.py +++ b/sample/receive.py @@ -1,13 +1,14 @@ -from wzl import MQTT import time import logging import asyncio import datetime +from wzl import mqtt + ### Ask for settings and individual credentials ### -MQTT_USER = -MQTT_PASSWORD = +MQTT_USER = "" +MQTT_PASSWORD = "" # MQTT_BROKER = "127.0.0.1" # MQTT_PORT = 1883 @@ -28,19 +29,17 @@ def print_mqtt_message(topic, message): if __name__=="__main__": console_log = logging.StreamHandler() - console_log.setFormatter(MQTT.formatter) + console_log.setFormatter(mqtt.formatter) - MQTT.logger.addHandler(console_log) - MQTT.logger.setLevel(logging.INFO) + mqtt.logger.addHandler(console_log) + mqtt.logger.setLevel(logging.INFO) - client = MQTT.MQTTSubscriber(MQTT_USER) + client = mqtt.MQTTSubscriber(MQTT_USER) client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) client.set_callback("PRINT", print_mqtt_message) client.subscribe(topic, qos) - loop = asyncio.get_event_loop() - while True: try: time.sleep(1) diff --git a/setup.py b/setup.py index 3e5ba304c60e44822eb9274f6c59e00b4fdc06d8..e930476b4d7940c511509395410957721ed81089 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages setup(name='wzl-mqtt', - version='2.0', + version='2.0.0', url='', author='Benjamin Montavon, Matthias Bodenbenner', author_email='m.bodenbenner@wzl.rwth-aachen.de', diff --git a/src/mqtt/client.py b/src/mqtt/client.py index 7f4c58d91f5d2ff88c85a1bc5c2bc083e9400291..b8dc3cb62e0bec96b4ce1783ceec958968bc48b7 100644 --- a/src/mqtt/client.py +++ b/src/mqtt/client.py @@ -1,6 +1,7 @@ import functools import inspect import warnings +from typing import Callable import paho.mqtt.client as mqtt from wzl.utilities import root_logger @@ -13,20 +14,29 @@ logger = root_logger.get('MQTTClient') class MQTTClient: - """Client base class. + """Client base class for MQTT Publisher and Subscriber. Provides methods required for connecting to a MQTT-Broker. + + Attributes: + instances: Static attribute of the class, containing all instances of the class. + By using a unique identifier for each instance doubles can be avoided. """ instances = {} - def __init__(self, uuid, *args, **kwargs): - """Constructor. + def __init__(self, uuid: str, *args, **kwargs): + """ + + Args: + uuid: Unique identifier for the client to be created. + *args: Additional optional arguments for initializing the client as of the paho-mqtt package. + **kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package. - :param uuid: Unique identifier for the client to be created. - :param args: Additional arguments for initializing the client as of the paho-mqtt package. - :param kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package. + Raises: + IndexError: If there already exists a client with the given uuid. """ + if uuid in MQTTClient.instances: logger.error("MQTT Client {} already exists!".format(uuid)) raise IndexError("MQTT Client {} already exists!".format(uuid)) @@ -34,30 +44,48 @@ class MQTTClient: MQTTClient.instances[uuid] = self self.name = uuid self._client = mqtt.Client(uuid, *args, **kwargs) - self._client.on_connect = self.__on_connect - self._client.on_disconnect = self.__on_disconnect + self._client.on_connect = self._on_connect + self._client.on_disconnect = self._on_disconnect self._client.loop_start() self._connected = False self.logger = logger - def __del__(self): - try: - self._client.loop_stop() - MQTTClient.instances.pop(self.name) - except: - pass - @classmethod - def get(cls, key): + def get(cls, key: str) -> 'MQTTClient': + """Returns the client with given key, if one exists. + + Args: + key: The unique key identifier of the client to be returned. + + Returns: The client identified by the given key. + + """ return cls.instances[key] @property - def connected(self): + def connected(self) -> bool: + """ Returns the current connection status of the client + + Returns: True, if the client is connected to a broker, false otherwise. + + """ return self._connected - def connect(self, broker, port, username, password, websocket=False, ssl=False, keep_alive=60): + def connect(self, broker: str, port: str, username: str, password: str, websocket: bool = False, ssl: bool = False, keep_alive: int = 60): + """Opens a connection to an MQTT-broker under the given address and post. + + Args: + broker: The address (URL) of the MQTT-broker to connect to. + port: The port behind which the broker is running and accessible. + username: The username required for authentication at the broker + password: The password required for authentication at the broker + websocket: If true MQTT messages are published/received over WebSockets. If false, the default transportation over raw TCP is used. + ssl: If true a secured TLS connection is established. + keep_alive: maximum period in seconds allowed between communications with the broker. + If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker. + """ try: address = broker if ssl: @@ -76,13 +104,16 @@ class MQTTClient: raise ConnectionError(str(exception)) def disconnect(self): + """Disconnects the client and closes the connection to the broker. + + """ try: self._client.disconnect() except Exception as exception: self.logger.error("MQTT Client {} could not disconnect: {}".format(self.name, str(exception))) raise ConnectionError(str(exception)) - def __on_connect(self, client, userdata, flags, rc): + def _on_connect(self, client, userdata, flags, rc): if rc == 0: self.logger.info("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) self._connected = True @@ -90,94 +121,126 @@ class MQTTClient: self.logger.error("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) self._connected = False - def __on_disconnect(self, client, userdata, rc): + def _on_disconnect(self, client, userdata, rc): if rc == 1: self.logger.error( - "MQTT Client {} disconnected with code {} ({}). \n \t There are two possible reasons: \n\t\t 1. There already is a client connected, with the same credentials you are using now. \n\t\t 2. You may tried to use Publisher-Credentials for receiving messages or vise versa. Please contact Benjamin Montavon, to check if your login credentials are correct!".format( - self.name, rc, mqtt.error_string(rc))) + "MQTT Client {} disconnected with code {} ({}). \n \t There are two possible reasons: \n" + "\t\t 1. There already is a client connected, with the same credentials you are using now. \n" + "\t\t 2. You may tried to use Publisher-Credentials for receiving messages or vise versa. " + "Please contact Mark Sanders, to check if your login credentials are correct!".format(self.name, rc, mqtt.error_string(rc))) else: self.logger.info("MQTT Client {} disconnected with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) self._connected = False + def __del__(self): + try: + self._client.loop_stop() + MQTTClient.instances.pop(self.name) + except: + pass + class MQTTPublisher(MQTTClient): - """Publisher class. + """Minimal, simple class for publishing MQTT messages. - Provides methods for publishing messages. """ - def __init__(self, uuid, *args, **kwargs): - """Constructor. + def __init__(self, uuid: str, *args, **kwargs): + """ - :param uuid: Unique identifier for the client to be created. - :param args: Additional arguments for initializing the client as of the paho-mqtt package. - :param kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package. + Args: + uuid: Unique identifier for the client to be created. + *args: Additional optional arguments for initializing the client as of the paho-mqtt package. + **kwargs: Additional optional keyword-arguments for initializing the client as of the paho-mqtt package. """ MQTTClient.__init__(self, uuid, *args, **kwargs) - self._client.on_publish = self.__on_publish + self._client.on_publish = self._on_publish - def __on_publish(self, client, userdata, mid): - self.logger.debug("MQTT Client {} published message {}.".format(self.name, mid)) + def publish(self, topic: str, message: str, qos: int = 0, retain: bool = False): + """ Publish the given message under the given topic. - def publish(self, topic, payload, qos=0, retain=False): + Args: + topic: The topic the message is published under. + message: The message to be published. Ideally encoded as UTF-8 string. + qos: Quality of service level, possible values are 0,1,2. + retain: If set to True, the message will be set as the “last known good”/retained message for the topic. + """ try: - self._client.publish(self.name + "/" + topic.strip("/"), payload, qos, retain) - self.logger.debug("MQTT Client {} will publish the following messsage to {}: {}".format(self.name, topic, payload)) + self._client.publish(self.name + "/" + topic.strip("/"), message, qos, retain) + self.logger.debug("MQTT Client {} will publish the following messsage to {}: {}".format(self.name, topic, message)) except Exception as exception: self.logger.error("MQTT Client {} could not publish to: {}".format(self.name, topic, str(exception))) raise PublishError(str(exception)) - def __shortcut(self, topic, payload, root_topic, qos, retain): - self._client.publish(root_topic + topic, payload, qos, retain) + def _on_publish(self, client, userdata, mid): + self.logger.debug("MQTT Client {} published message {}.".format(self.name, mid)) - def shortcut(self, root_topic, qos, retain): - return functools.partial(self.__shortcut, root_topic=root_topic.strip("/") + "/", qos=qos, retain=retain) + # def __shortcut(self, topic, payload, root_topic, qos, retain): + # self._client.publish(root_topic + topic, payload, qos, retain) + # + # def shortcut(self, root_topic, qos, retain): + # return functools.partial(self.__shortcut, root_topic=root_topic.strip("/") + "/", qos=qos, retain=retain) class MQTTSubscriber(MQTTClient): + """Minimal, simple class for subscribing, receiving and processing MQTT messages. + + """ + + def __init__(self, uuid: str, *args, **kwargs): + """ - def __init__(self, uuid, *args, **kwargs): + Args: + uuid: An unique identifier of the client. + *args: Additional optional arguments of the internally used paho-mqtt client. + **kwargs: Additional optional key word arguments of the internally used paho-mqtt client. + """ MQTTClient.__init__(self, uuid, *args, **kwargs) - self._client.on_message = self.__on_message - self._client.on_subscribe = self.__on_subscribe - self._client.on_unsubscribe = self.__on_unsubscribe - self._client.on_connect = self.__on_connect + self._client.on_message = self._on_message + self._client.on_subscribe = self._on_subscribe + self._client.on_unsubscribe = self._on_unsubscribe + self._client.on_connect = self._on_connect - self.__subscriptions = [] - self.__on_message_callbacks = {} + self._subscriptions = [] + self._on_message_callbacks = {} - def __on_connect(self, client, userdata, flags, rc): - if rc == 0: - self.logger.info("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) - self._connected = True - for s in self.__subscriptions: - self._client.subscribe(s["topic"], s["qos"]) - elif rc == 4: - self.logger.info("MQTT Client {} connect terminated with code {} ({})".format(self.name, rc, mqtt.error_string(rc))) - else: - self.logger.error("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) - self._connected = False + def subscribe(self, topic: str, qos: int = 0): + """Subscribes the given topic. + + Args: + topic: The topic to be subscribed given as string. + qos: Quality of service. Possible values are 0,1,2. - def subscribe(self, topic, qos=0): + Raises: + SubscriptionError: If topic could not be subscribed successfully. + """ try: - for s in self.__subscriptions: + for s in self._subscriptions: if s["topic"] == topic: raise RuntimeError("Topic {} is already subscribed!".format(topic)) - self.__subscriptions.append({"topic": topic, "qos": qos}) + self._subscriptions.append({"topic": topic, "qos": qos}) if self.connected: self._client.subscribe(topic, qos) except Exception as exception: self.logger.error("MQTT Client {} could not subscribe to {}: {}".format(self.name, topic, str(exception))) raise SubscriptionError(str(exception)) - def unsubscribe(self, topic): + def unsubscribe(self, topic: str): + """Unsubscribes to updates of the given topic. + + Args: + topic: The topic which should be unsubscribed given as string. + + Raises: + SubscriptionError: If topic could not be unsubscribed successfully. + """ try: - n = len(self.__subscriptions) + n = len(self._subscriptions) for i in range(n): - if self.__subscriptions[i]["topic"] == topic: + if self._subscriptions[i]["topic"] == topic: if self.connected: self._client.unsubscribe(topic) - self.__subscriptions.pop(i) + self._subscriptions.pop(i) return raise RuntimeError("Topic {} is not subscribed!".format(topic)) @@ -185,19 +248,15 @@ class MQTTSubscriber(MQTTClient): self.logger.error("MQTT Client {} could not unsubscribe from {}: {}".format(self.name, topic, str(exception))) raise SubscriptionError(str(exception)) - def __on_subscribe(self, client, userdata, mid, granted_qos, properties=None): - self.logger.info("MQTT Client {} subscribed with ID {}.".format(self.name, mid)) - - def __on_unsubscribe(self, client, userdata, mid): - self.logger.info("MQTT Client {} unsubscribed with ID {}.".format(self.name, mid)) - - def set_callback(self, key, function): + def set_callback(self, key: str, function: Callable): """Add a callback called at each received message. - :param key: Unique identifier of the callback. - :param function: to be called. + Args: + key: Unique identifier of the callback. + function: to be called when a message is received. The function must expect at two arguments. + First argument is the topic and the second the received message. """ - if key in self.__on_message_callbacks: + if key in self._on_message_callbacks: self.logger.warning("Overwriting callback {}!".format(key)) warnings.warn("Overwriting callback {}!".format(key), RuntimeWarning) @@ -206,14 +265,37 @@ class MQTTSubscriber(MQTTClient): self.logger.warning("Callback {} has insufficient parameters!".format(key)) warnings.warn("Callback {} has insufficient parameters!".format(key)) - self.__on_message_callbacks[key] = function + self._on_message_callbacks[key] = function + + def remove_callback(self, key: str): + """Removes the callback with the given key identifier. - def remove_callback(self, key): - self.__on_message_callbacks.pop(key) + Args: + key: The unique key identifier of the callback. + """ + self._on_message_callbacks.pop(key) + + def _on_connect(self, client, userdata, flags, rc): + if rc == 0: + self.logger.info("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) + self._connected = True + for s in self._subscriptions: + self._client.subscribe(s["topic"], s["qos"]) + elif rc == 4: + self.logger.info("MQTT Client {} connect terminated with code {} ({})".format(self.name, rc, mqtt.error_string(rc))) + else: + self.logger.error("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) + self._connected = False + + def _on_subscribe(self, client, userdata, mid, granted_qos, properties=None): + self.logger.info("MQTT Client {} subscribed with ID {}.".format(self.name, mid)) + + def _on_unsubscribe(self, client, userdata, mid): + self.logger.info("MQTT Client {} unsubscribed with ID {}.".format(self.name, mid)) - def __on_message(self, client, userdata, message): - for key in self.__on_message_callbacks: + def _on_message(self, client, userdata, message): + for key in self._on_message_callbacks: try: - self.__on_message_callbacks[key](message.topic, message.payload) + self._on_message_callbacks[key](message.topic, message.payload) except Exception as exception: self.logger.error("Exception while processing callback for topic {}: {}".format(message.topic, str(exception))) diff --git a/test/3d_test.py b/test/3d_test.py index a87855daf0ce328eaa39153f3dcfcaf39ccb9ba2..ed09203e12f896a4b1d574944ff5de79dc2fc1f2 100644 --- a/test/3d_test.py +++ b/test/3d_test.py @@ -21,7 +21,7 @@ while True: i+=1 client.publish( topic="quatsch"+"/VAR-position", - payload=json.dumps({ + message=json.dumps({ "value": pos, "timestamp": datetime.utcnow().isoformat()+"Z", "covariance": None, diff --git a/test/performance_test.py b/test/performance_test.py index 1098fd4dfa4252cabb716be6ae59928d592c36de..0e85820d892a039e24dabb6f776f15eb13b9af46 100644 --- a/test/performance_test.py +++ b/test/performance_test.py @@ -20,7 +20,7 @@ while True: test += random.random()-0.5 client.publish( topic="quatsch"+"/OBJ-Environment/OBJ-0/VAR-TEST", - payload=json.dumps({ + message=json.dumps({ "value": [float(test)], "timestamp": datetime.utcnow().isoformat()+"Z", "covariance": None,