Skip to content
Snippets Groups Projects
Commit 51680d82 authored by Matthias Stefan Bodenbenner's avatar Matthias Stefan Bodenbenner
Browse files

added option to specify prefixes for topics

parent 57a025d9
No related branches found
No related tags found
No related merge requests found
Pipeline #53266 passed
# WZL-MQTT # WZL-MQTT
Current stable version: 2.2.0 Current stable version: 2.3.0
## Installation ## Installation
1. Install the WZL-MQTT package via pip 1. Install the WZL-MQTT package via pip
...@@ -71,6 +71,9 @@ If there are any questions contact [Matthias Bodenbenner](mailto:m.bodenbenner@w ...@@ -71,6 +71,9 @@ If there are any questions contact [Matthias Bodenbenner](mailto:m.bodenbenner@w
To obtain credentials for MQTT-Broker of WZL-MQ-MS contact [Mark Sanders](mailto:m.sanders@wzl.rwth-aachen.de) To obtain credentials for MQTT-Broker of WZL-MQ-MS contact [Mark Sanders](mailto:m.sanders@wzl.rwth-aachen.de)
## Changelog ## Changelog
2.3.0
- a prefix can now be defined which is prepended to every published or subscribed topic
2.2.0 2.2.0
- it is possible to connect with more than one client with an identical username to the broker - it is possible to connect with more than one client with an identical username to the broker
......
...@@ -16,7 +16,7 @@ Publishing MQTT Packages ...@@ -16,7 +16,7 @@ Publishing MQTT Packages
MQTT_VHOST = "/" MQTT_VHOST = "/"
# initialize publisher and connect to the broker # initialize publisher and connect to the broker
client = mqtt.MQTTPublisher(MQTT_USER) client = mqtt.MQTTPublisher()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
# create message and publish the message as UTF-8 encoded string # create message and publish the message as UTF-8 encoded string
...@@ -29,28 +29,36 @@ Receiving MQTT Messages ...@@ -29,28 +29,36 @@ Receiving MQTT Messages
.. code-block:: .. code-block::
# username and password required to connect to the broker logger = mqtt.root_logger.get('Receiver')
MQTT_USER = "" MQTT_USER = ""
MQTT_PASSWORD = "" MQTT_PASSWORD = ""
# address, port and virtual host of the broker to connect to
MQTT_BROKER = "127.0.0.1" MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883 MQTT_PORT = 1883
MQTT_VHOST = "/" MQTT_VHOST = "/"
# define callback which will be executed when a message is received ## To connect to the central MQTT-Broker of MQ-MS use the settings below.
## Ask Mark Sanders (sdr) for personal credentials.
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883
# MQTT_VHOST = "metrology"
topic = "#" # set topic to subscribe according to MQTT syntax!
qos = 0 # set QoS according to MQTT specifications!
def print_mqtt_message(topic, message): def print_mqtt_message(topic, message):
print("{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic, message.decode("utf-8"))) logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8")))
# initialize subscriber and connect to the broker if __name__ == "__main__":
client = mqtt.MQTTSubscriber(MQTT_USER) client = mqtt.MQTTSubscriber()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
# register the callback and subscribe topic
client.set_callback("PRINT", print_mqtt_message) client.set_callback("PRINT", print_mqtt_message)
client.subscribe('#') client.subscribe(topic, qos)
# start waiting loop to prevent program from exiting
while True: while True:
try: try:
time.sleep(1) time.sleep(1)
......
...@@ -6,8 +6,6 @@ import uuid ...@@ -6,8 +6,6 @@ import uuid
from wzl import mqtt from wzl import mqtt
### Ask for settings and individual credentials ###
MQTT_USER = "" MQTT_USER = ""
MQTT_PASSWORD = "" MQTT_PASSWORD = ""
...@@ -15,14 +13,12 @@ MQTT_BROKER = "127.0.0.1" ...@@ -15,14 +13,12 @@ MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883 MQTT_PORT = 1883
MQTT_VHOST = "/" MQTT_VHOST = "/"
### to connect to the central MQTT-Broker of MQ-MS use the following settings: ## To connect to the central MQTT-Broker of MQ-MS use the settings below.
## Ask Mark Sanders (sdr) for personal credentials.
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883 # MQTT_PORT = 1883
# MQTT_VHOST = "metrology" # MQTT_VHOST = "metrology"
### Ask for settings and individual credentials ###
if __name__ == "__main__": if __name__ == "__main__":
client = mqtt.MQTTPublisher() client = mqtt.MQTTPublisher()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
...@@ -31,7 +27,7 @@ if __name__ == "__main__": ...@@ -31,7 +27,7 @@ if __name__ == "__main__":
try: try:
message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"})
client.publish("/channel-001", message.encode("utf-8"), 0) client.publish("channel-001", message.encode("utf-8"), 0)
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
break break
...@@ -4,8 +4,6 @@ from wzl import mqtt ...@@ -4,8 +4,6 @@ from wzl import mqtt
logger = mqtt.root_logger.get('Receiver') logger = mqtt.root_logger.get('Receiver')
### Ask for settings and individual credentials ###
MQTT_USER = "" MQTT_USER = ""
MQTT_PASSWORD = "" MQTT_PASSWORD = ""
...@@ -13,19 +11,20 @@ MQTT_BROKER = "127.0.0.1" ...@@ -13,19 +11,20 @@ MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883 MQTT_PORT = 1883
MQTT_VHOST = "/" MQTT_VHOST = "/"
## to connect to the central MQTT-Broker of MQ-MS use the following settings: ## To connect to the central MQTT-Broker of MQ-MS use the settings below.
## Ask Mark Sanders (sdr) for personal credentials.
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883 # MQTT_PORT = 1883
# MQTT_VHOST = "metrology" # MQTT_VHOST = "metrology"
### Ask for settings and individual credentials ###
topic = "#" # set topic to subscribe according to MQTT syntax! topic = "#" # set topic to subscribe according to MQTT syntax!
qos = 0 # set QoS according to MQTT specifications! qos = 0 # set QoS according to MQTT specifications!
def print_mqtt_message(topic, message): def print_mqtt_message(topic, message):
logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8"))) logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8")))
if __name__ == "__main__": if __name__ == "__main__":
client = mqtt.MQTTSubscriber() client = mqtt.MQTTSubscriber()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
...@@ -38,4 +37,3 @@ if __name__=="__main__": ...@@ -38,4 +37,3 @@ if __name__=="__main__":
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
break break
from setuptools import setup, find_packages from setuptools import setup, find_packages
setup(name='wzl-mqtt', setup(name='wzl-mqtt',
version='2.2.0', version='2.3.0',
url='', url='',
author='Benjamin Montavon, Matthias Bodenbenner', author='Benjamin Montavon, Matthias Bodenbenner',
author_email='m.bodenbenner@wzl.rwth-aachen.de', author_email='m.bodenbenner@wzl.rwth-aachen.de',
......
...@@ -27,11 +27,12 @@ class MQTTClient: ...@@ -27,11 +27,12 @@ class MQTTClient:
instances = {} instances = {}
def __init__(self, username: str = None, *args, **kwargs): def __init__(self, username: str = None, prefix: str = None, *args, **kwargs):
""" """
Args: Args:
username: Unique username used as identifier for the client to be created. If None is given, it is created automatically. username: Unique username used as identifier for the client to be created. If None is given, it is created automatically.
prefix: Inserted to the beginning of each topic.
*args: Additional optional arguments for initializing the client as of the paho-mqtt package. *args: Additional optional arguments for initializing the client as of the paho-mqtt package.
**kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package. **kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package.
...@@ -45,6 +46,7 @@ class MQTTClient: ...@@ -45,6 +46,7 @@ class MQTTClient:
MQTTClient.instances[username] = self MQTTClient.instances[username] = self
self.name = username if username is not None else uuid.uuid4() self.name = username if username is not None else uuid.uuid4()
self.prefix = prefix
self._client = mqtt.Client(username, *args, **kwargs) self._client = mqtt.Client(username, *args, **kwargs)
self._client.on_connect = self._on_connect self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect self._client.on_disconnect = self._on_disconnect
...@@ -147,35 +149,18 @@ class MQTTPublisher(MQTTClient): ...@@ -147,35 +149,18 @@ class MQTTPublisher(MQTTClient):
""" """
def __init__(self, username: str = None, *args, **kwargs): def __init__(self, username: str = None, prefix: str = None, *args, **kwargs):
""" """
Args: Args:
username: Unique username used as identifier for the client to be created. If None is given, it is created automatically. username: Unique username used as identifier for the client to be created. If None is given, it is created automatically.
prefix: Inserted to the beginning of each topic.
*args: Additional optional arguments for initializing the client as of the paho-mqtt package. *args: Additional optional arguments for initializing the client as of the paho-mqtt package.
**kwargs: Additional optional keyword-arguments for initializing the client as of the paho-mqtt package. **kwargs: Additional optional keyword-arguments for initializing the client as of the paho-mqtt package.
""" """
MQTTClient.__init__(self, username, *args, **kwargs) MQTTClient.__init__(self, username, prefix, *args, **kwargs)
self._root_topic = username
self._client.on_publish = self._on_publish self._client.on_publish = self._on_publish
def connect(self, broker: str, port: str, username: str, password: str, websocket: bool = False, ssl: bool = False, keep_alive: int = 60):
"""Opens a connection to an MQTT-broker under the given address and post.
Args:
broker: The address (URL) of the MQTT-broker to connect to.
port: The port behind which the broker is running and accessible.
username: The username required for authentication at the broker
password: The password required for authentication at the broker
websocket: If true MQTT messages are published/received over WebSockets. If false, the default transportation over raw TCP is used.
ssl: If true a secured TLS connection is established.
keep_alive: maximum period in seconds allowed between communications with the broker.
If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker.
"""
splitted_username = username.split(':')
self._root_topic = splitted_username[0] if len(splitted_username) < 2 else splitted_username[1]
super().connect(broker, port, username, password, websocket, ssl, keep_alive)
def publish(self, topic: str, message: str, qos: int = 0, retain: bool = False): def publish(self, topic: str, message: str, qos: int = 0, retain: bool = False):
""" Publish the given message under the given topic. """ Publish the given message under the given topic.
...@@ -186,13 +171,13 @@ class MQTTPublisher(MQTTClient): ...@@ -186,13 +171,13 @@ class MQTTPublisher(MQTTClient):
retain: If set to True, the message will be set as the “last known good”/retained message for the topic. retain: If set to True, the message will be set as the “last known good”/retained message for the topic.
""" """
try: try:
if self._root_topic is not None and self._root_topic != "": if self.prefix is not None and self.prefix != "":
self._client.publish(self._root_topic + "/" + topic.strip("/"), message, qos, retain) self._client.publish(self.prefix + "/" + topic.strip("/"), message, qos, retain)
else: else:
self._client.publish(topic.strip("/"), message, qos, retain) self._client.publish(topic.strip("/"), message, qos, retain)
self.logger.debug("MQTT Client {} will publish the following messsage to {}: {}".format(self.name, topic, message)) self.logger.debug("MQTT Client {} will publish the following messsage to {}: {}".format(self.name, topic, message))
except Exception as exception: except Exception as exception:
self.logger.error("MQTT Client {} could not publish to: {}".format(self.name, topic, str(exception))) self.logger.error("MQTT Client {} could not publish to {}: {}".format(self.name, topic, str(exception)))
raise PublishError(str(exception)) raise PublishError(str(exception))
def _on_publish(self, client, userdata, mid): def _on_publish(self, client, userdata, mid):
...@@ -210,15 +195,16 @@ class MQTTSubscriber(MQTTClient): ...@@ -210,15 +195,16 @@ class MQTTSubscriber(MQTTClient):
""" """
def __init__(self, username: str = None, *args, **kwargs): def __init__(self, username: str = None, prefix: str = None, *args, **kwargs):
""" """
Args: Args:
username:Unique username used as identifier for the client to be created. If None is given, it is created automatically. username:Unique username used as identifier for the client to be created. If None is given, it is created automatically.
prefix: Inserted to the beginning of each topic.
*args: Additional optional arguments of the internally used paho-mqtt client. *args: Additional optional arguments of the internally used paho-mqtt client.
**kwargs: Additional optional key word arguments of the internally used paho-mqtt client. **kwargs: Additional optional key word arguments of the internally used paho-mqtt client.
""" """
MQTTClient.__init__(self, username, *args, **kwargs) MQTTClient.__init__(self, username, prefix, *args, **kwargs)
self._client.on_message = self._on_message self._client.on_message = self._on_message
self._client.on_subscribe = self._on_subscribe self._client.on_subscribe = self._on_subscribe
self._client.on_unsubscribe = self._on_unsubscribe self._client.on_unsubscribe = self._on_unsubscribe
...@@ -238,6 +224,7 @@ class MQTTSubscriber(MQTTClient): ...@@ -238,6 +224,7 @@ class MQTTSubscriber(MQTTClient):
SubscriptionError: If topic could not be subscribed successfully. SubscriptionError: If topic could not be subscribed successfully.
""" """
try: try:
topic = self.prefix + "/" + topic if self.prefix is not None and self.prefix != "" else topic
for s in self._subscriptions: for s in self._subscriptions:
if s["topic"] == topic: if s["topic"] == topic:
raise RuntimeError("Topic {} is already subscribed!".format(topic)) raise RuntimeError("Topic {} is already subscribed!".format(topic))
...@@ -258,6 +245,7 @@ class MQTTSubscriber(MQTTClient): ...@@ -258,6 +245,7 @@ class MQTTSubscriber(MQTTClient):
SubscriptionError: If topic could not be unsubscribed successfully. SubscriptionError: If topic could not be unsubscribed successfully.
""" """
try: try:
topic = self.prefix + "/" + topic if self.prefix is not None and self.prefix != "" else topic
n = len(self._subscriptions) n = len(self._subscriptions)
for i in range(n): for i in range(n):
if self._subscriptions[i]["topic"] == topic: if self._subscriptions[i]["topic"] == topic:
......
import datetime
import json
import random
import time
import uuid
from src import mqtt
### Ask for settings and individual credentials ###
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883
MQTT_VHOST = "/"
### to connect to the central MQTT-Broker of MQ-MS use the following settings:
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883
# MQTT_VHOST = "metrology"
### Ask for settings and individual credentials ###
if __name__ == "__main__":
client = mqtt.MQTTPublisher()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
while True:
try:
message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"})
client.publish("channel-001", message.encode("utf-8"), 0)
time.sleep(1)
except KeyboardInterrupt:
break
import time
from src import mqtt
logger = mqtt.root_logger.get('Receiver')
### Ask for settings and individual credentials ###
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883
MQTT_VHOST = "/"
## to connect to the central MQTT-Broker of MQ-MS use the following settings:
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883
# MQTT_VHOST = "metrology"
### Ask for settings and individual credentials ###
topic = "#" # set topic to subscribe according to MQTT syntax!
qos = 0 # set QoS according to MQTT specifications!
def print_mqtt_message(topic, message):
logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8")))
if __name__=="__main__":
client = mqtt.MQTTSubscriber()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD)
client.set_callback("PRINT", print_mqtt_message)
client.subscribe(topic, qos)
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
break
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment