diff --git a/README.md b/README.md index 5bc9ecfc968d30053d291d3d76d0efe69cfa3378..1e9b86d6a992ef4eb8f79ee042e84c3ba634321f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # WZL-MQTT -Current stable version: 2.2.0 +Current stable version: 2.3.0 ## Installation 1. Install the WZL-MQTT package via pip @@ -71,6 +71,9 @@ If there are any questions contact [Matthias Bodenbenner](mailto:m.bodenbenner@w To obtain credentials for MQTT-Broker of WZL-MQ-MS contact [Mark Sanders](mailto:m.sanders@wzl.rwth-aachen.de) ## Changelog +2.3.0 +- a prefix can now be defined which is prepended to every published or subscribed topic + 2.2.0 - it is possible to connect with more than one client with an identical username to the broker diff --git a/doc/source/usage.rst b/doc/source/usage.rst index efd59f305d425f2f0ad0dccd6e50dc8ec3b7435c..3e2869929a1529563a18d067c02c35a6077525d0 100644 --- a/doc/source/usage.rst +++ b/doc/source/usage.rst @@ -16,7 +16,7 @@ Publishing MQTT Packages MQTT_VHOST = "/" # initialize publisher and connect to the broker - client = mqtt.MQTTPublisher(MQTT_USER) + client = mqtt.MQTTPublisher() client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) # create message and publish the message as UTF-8 encoded string @@ -29,31 +29,39 @@ Receiving MQTT Messages .. code-block:: - # username and password required to connect to the broker + logger = mqtt.root_logger.get('Receiver') + MQTT_USER = "" MQTT_PASSWORD = "" - # address, port and virtual host of the broker to connect to MQTT_BROKER = "127.0.0.1" MQTT_PORT = 1883 MQTT_VHOST = "/" - # define callback which will be executed when a message is received + ## To connect to the central MQTT-Broker of MQ-MS use the settings below. + ## Ask Mark Sanders (sdr) for personal credentials. + # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" + # MQTT_PORT = 1883 + # MQTT_VHOST = "metrology" + + topic = "#" # set topic to subscribe according to MQTT syntax! + qos = 0 # set QoS according to MQTT specifications! + + def print_mqtt_message(topic, message): - print("{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic, message.decode("utf-8"))) + logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8"))) + - # initialize subscriber and connect to the broker - client = mqtt.MQTTSubscriber(MQTT_USER) - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) + if __name__ == "__main__": + client = mqtt.MQTTSubscriber() + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) - # register the callback and subscribe topic - client.set_callback("PRINT", print_mqtt_message) - client.subscribe('#') + client.set_callback("PRINT", print_mqtt_message) + client.subscribe(topic, qos) - # start waiting loop to prevent program from exiting - while True: - try: - time.sleep(1) - except KeyboardInterrupt: + while True: + try: + time.sleep(1) + except KeyboardInterrupt: break diff --git a/sample/publish.py b/sample/publish.py index 67d3b91262acee9af5561e98b337dfdd58c7a17d..60bed0fe43e0834cec3c6fa73d2c3413c4073203 100644 --- a/sample/publish.py +++ b/sample/publish.py @@ -6,8 +6,6 @@ import uuid from wzl import mqtt -### Ask for settings and individual credentials ### - MQTT_USER = "" MQTT_PASSWORD = "" @@ -15,14 +13,12 @@ MQTT_BROKER = "127.0.0.1" MQTT_PORT = 1883 MQTT_VHOST = "/" -### to connect to the central MQTT-Broker of MQ-MS use the following settings: +## To connect to the central MQTT-Broker of MQ-MS use the settings below. +## Ask Mark Sanders (sdr) for personal credentials. # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" # MQTT_PORT = 1883 # MQTT_VHOST = "metrology" -### Ask for settings and individual credentials ### - - if __name__ == "__main__": client = mqtt.MQTTPublisher() client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) @@ -31,7 +27,7 @@ if __name__ == "__main__": try: message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) - client.publish("/channel-001", message.encode("utf-8"), 0) + client.publish("channel-001", message.encode("utf-8"), 0) time.sleep(1) except KeyboardInterrupt: break diff --git a/sample/receive.py b/sample/receive.py index 4e5fb3f1443356e4063d0cdac9d336d8c05d8e96..1315312004be614751601ad3ffc9de5c08f4080f 100644 --- a/sample/receive.py +++ b/sample/receive.py @@ -4,8 +4,6 @@ from wzl import mqtt logger = mqtt.root_logger.get('Receiver') -### Ask for settings and individual credentials ### - MQTT_USER = "" MQTT_PASSWORD = "" @@ -13,22 +11,23 @@ MQTT_BROKER = "127.0.0.1" MQTT_PORT = 1883 MQTT_VHOST = "/" -## to connect to the central MQTT-Broker of MQ-MS use the following settings: +## To connect to the central MQTT-Broker of MQ-MS use the settings below. +## Ask Mark Sanders (sdr) for personal credentials. # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" # MQTT_PORT = 1883 # MQTT_VHOST = "metrology" -### Ask for settings and individual credentials ### +topic = "#" # set topic to subscribe according to MQTT syntax! +qos = 0 # set QoS according to MQTT specifications! -topic = "#" # set topic to subscribe according to MQTT syntax! -qos = 0 # set QoS according to MQTT specifications! def print_mqtt_message(topic, message): logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8"))) -if __name__=="__main__": + +if __name__ == "__main__": client = mqtt.MQTTSubscriber() - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) client.set_callback("PRINT", print_mqtt_message) client.subscribe(topic, qos) @@ -38,4 +37,3 @@ if __name__=="__main__": time.sleep(1) except KeyboardInterrupt: break - diff --git a/setup.py b/setup.py index a324f98a82e7f9331fb03ec37ace2d2c98823710..140d57dc607cb390634917ac8b8a5885349a62a6 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages setup(name='wzl-mqtt', - version='2.2.0', + version='2.3.0', url='', author='Benjamin Montavon, Matthias Bodenbenner', author_email='m.bodenbenner@wzl.rwth-aachen.de', diff --git a/src/mqtt/client.py b/src/mqtt/client.py index 3f540f62c94581ce6843ce754c103a84403ed53f..46da7cd36e0ca18168c861e23d03d97d2d82e20e 100644 --- a/src/mqtt/client.py +++ b/src/mqtt/client.py @@ -27,11 +27,12 @@ class MQTTClient: instances = {} - def __init__(self, username: str = None, *args, **kwargs): + def __init__(self, username: str = None, prefix: str = None, *args, **kwargs): """ Args: username: Unique username used as identifier for the client to be created. If None is given, it is created automatically. + prefix: Inserted to the beginning of each topic. *args: Additional optional arguments for initializing the client as of the paho-mqtt package. **kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package. @@ -45,6 +46,7 @@ class MQTTClient: MQTTClient.instances[username] = self self.name = username if username is not None else uuid.uuid4() + self.prefix = prefix self._client = mqtt.Client(username, *args, **kwargs) self._client.on_connect = self._on_connect self._client.on_disconnect = self._on_disconnect @@ -147,35 +149,18 @@ class MQTTPublisher(MQTTClient): """ - def __init__(self, username: str = None, *args, **kwargs): + def __init__(self, username: str = None, prefix: str = None, *args, **kwargs): """ Args: username: Unique username used as identifier for the client to be created. If None is given, it is created automatically. + prefix: Inserted to the beginning of each topic. *args: Additional optional arguments for initializing the client as of the paho-mqtt package. **kwargs: Additional optional keyword-arguments for initializing the client as of the paho-mqtt package. """ - MQTTClient.__init__(self, username, *args, **kwargs) - self._root_topic = username + MQTTClient.__init__(self, username, prefix, *args, **kwargs) self._client.on_publish = self._on_publish - def connect(self, broker: str, port: str, username: str, password: str, websocket: bool = False, ssl: bool = False, keep_alive: int = 60): - """Opens a connection to an MQTT-broker under the given address and post. - - Args: - broker: The address (URL) of the MQTT-broker to connect to. - port: The port behind which the broker is running and accessible. - username: The username required for authentication at the broker - password: The password required for authentication at the broker - websocket: If true MQTT messages are published/received over WebSockets. If false, the default transportation over raw TCP is used. - ssl: If true a secured TLS connection is established. - keep_alive: maximum period in seconds allowed between communications with the broker. - If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker. - """ - splitted_username = username.split(':') - self._root_topic = splitted_username[0] if len(splitted_username) < 2 else splitted_username[1] - super().connect(broker, port, username, password, websocket, ssl, keep_alive) - def publish(self, topic: str, message: str, qos: int = 0, retain: bool = False): """ Publish the given message under the given topic. @@ -186,13 +171,13 @@ class MQTTPublisher(MQTTClient): retain: If set to True, the message will be set as the “last known good”/retained message for the topic. """ try: - if self._root_topic is not None and self._root_topic != "": - self._client.publish(self._root_topic + "/" + topic.strip("/"), message, qos, retain) + if self.prefix is not None and self.prefix != "": + self._client.publish(self.prefix + "/" + topic.strip("/"), message, qos, retain) else: self._client.publish(topic.strip("/"), message, qos, retain) self.logger.debug("MQTT Client {} will publish the following messsage to {}: {}".format(self.name, topic, message)) except Exception as exception: - self.logger.error("MQTT Client {} could not publish to: {}".format(self.name, topic, str(exception))) + self.logger.error("MQTT Client {} could not publish to {}: {}".format(self.name, topic, str(exception))) raise PublishError(str(exception)) def _on_publish(self, client, userdata, mid): @@ -210,15 +195,16 @@ class MQTTSubscriber(MQTTClient): """ - def __init__(self, username: str = None, *args, **kwargs): + def __init__(self, username: str = None, prefix: str = None, *args, **kwargs): """ Args: username:Unique username used as identifier for the client to be created. If None is given, it is created automatically. + prefix: Inserted to the beginning of each topic. *args: Additional optional arguments of the internally used paho-mqtt client. **kwargs: Additional optional key word arguments of the internally used paho-mqtt client. """ - MQTTClient.__init__(self, username, *args, **kwargs) + MQTTClient.__init__(self, username, prefix, *args, **kwargs) self._client.on_message = self._on_message self._client.on_subscribe = self._on_subscribe self._client.on_unsubscribe = self._on_unsubscribe @@ -238,6 +224,7 @@ class MQTTSubscriber(MQTTClient): SubscriptionError: If topic could not be subscribed successfully. """ try: + topic = self.prefix + "/" + topic if self.prefix is not None and self.prefix != "" else topic for s in self._subscriptions: if s["topic"] == topic: raise RuntimeError("Topic {} is already subscribed!".format(topic)) @@ -258,6 +245,7 @@ class MQTTSubscriber(MQTTClient): SubscriptionError: If topic could not be unsubscribed successfully. """ try: + topic = self.prefix + "/" + topic if self.prefix is not None and self.prefix != "" else topic n = len(self._subscriptions) for i in range(n): if self._subscriptions[i]["topic"] == topic: diff --git a/test/publish.py b/test/publish.py new file mode 100644 index 0000000000000000000000000000000000000000..2ad3d756c495180af3c55855b37a9c891002cf7f --- /dev/null +++ b/test/publish.py @@ -0,0 +1,37 @@ +import datetime +import json +import random +import time +import uuid + +from src import mqtt + +### Ask for settings and individual credentials ### + +MQTT_USER = "" +MQTT_PASSWORD = "" + +MQTT_BROKER = "127.0.0.1" +MQTT_PORT = 1883 +MQTT_VHOST = "/" + +### to connect to the central MQTT-Broker of MQ-MS use the following settings: +# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" +# MQTT_PORT = 1883 +# MQTT_VHOST = "metrology" + +### Ask for settings and individual credentials ### + + +if __name__ == "__main__": + client = mqtt.MQTTPublisher() + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) + + while True: + try: + message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) + client.publish("channel-001", message.encode("utf-8"), 0) + time.sleep(1) + except KeyboardInterrupt: + break diff --git a/test/receive.py b/test/receive.py new file mode 100644 index 0000000000000000000000000000000000000000..374beb5dfe39cff7c729eaedf5beca9c209e57ac --- /dev/null +++ b/test/receive.py @@ -0,0 +1,41 @@ +import time + +from src import mqtt + +logger = mqtt.root_logger.get('Receiver') + +### Ask for settings and individual credentials ### + +MQTT_USER = "" +MQTT_PASSWORD = "" + +MQTT_BROKER = "127.0.0.1" +MQTT_PORT = 1883 +MQTT_VHOST = "/" + +## to connect to the central MQTT-Broker of MQ-MS use the following settings: +# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" +# MQTT_PORT = 1883 +# MQTT_VHOST = "metrology" + +### Ask for settings and individual credentials ### + +topic = "#" # set topic to subscribe according to MQTT syntax! +qos = 0 # set QoS according to MQTT specifications! + +def print_mqtt_message(topic, message): + logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8"))) + +if __name__=="__main__": + client = mqtt.MQTTSubscriber() + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) + + client.set_callback("PRINT", print_mqtt_message) + client.subscribe(topic, qos) + + while True: + try: + time.sleep(1) + except KeyboardInterrupt: + break +