diff --git a/README.md b/README.md index 559d85d0cb8d912f9a76294bb728e94174800388..8748f4513e716cd9e2a119bfdfc71d9aaf1c6daa 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,11 @@ # WZL-MQTT [](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/digital-mars/mqtt/badges/master) -Current stable version: 2.3.0 +Current stable version: 2.4.0 ## Installation +Requires at least Python 3.6 + 1. Install the WZL-MQTT package via pip ``` pip install --extra-index-url https://package-read:gkYP4xrm2PxicUbW1wra@git-ce.rwth-aachen.de/api/v4/projects/1708/packages/pypi/simple wzl-mqtt @@ -25,7 +27,7 @@ MQTT_VHOST = "/" # initialize publisher and connect to the broker client = mqtt.MQTTPublisher() -client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) +client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST) # create message and publish the message as UTF-8 encoded string message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", @@ -45,7 +47,7 @@ MQTT_PORT = 1883 MQTT_VHOST = "/" # initialize logger -logger = mqtt.root_logger.get('Receiver') # change 'Receiver' to any string you like +logger = mqtt.Logger.get('Receiver') # change 'Receiver' to any string you like # define callback which will be executed when a message is received def print_mqtt_message(topic, message): @@ -53,7 +55,7 @@ def print_mqtt_message(topic, message): # initialize subscriber and connect to the broker client = mqtt.MQTTSubscriber() -client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) +client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST) # register the callback and subscribe topic client.set_callback("PRINT", print_mqtt_message) @@ -77,6 +79,12 @@ To obtain credentials for MQTT-Broker of WZL-MQ-MS contact [Mark Sanders](mailto For more detailed explanation and full API documentation, view [https://iot.wzl-mq.rwth-aachen.de/documentation/libs/mqtt/](https://iot.wzl-mq.rwth-aachen.de/documentation/libs/mqtt/) ## Changelog +2.4.0 +- added vhost parameter to connect function of client +- changed default logging behaviour + - now by default logging is only written to console + - if logging should also create log-files, the logger must be explicitly initialized + 2.3.0 - a prefix can now be defined which is prepended to every published or subscribed topic diff --git a/doc/source/mqtt.rst b/doc/source/mqtt.rst index 30ffdec29565a5f9edce30089d4c7961e720174c..ba6fd4de1f95425050a42005c7bea6a9855be1b8 100644 --- a/doc/source/mqtt.rst +++ b/doc/source/mqtt.rst @@ -20,6 +20,14 @@ mqtt.exceptions module :undoc-members: :show-inheritance: +mqtt.logger module +------------------ + +.. automodule:: mqtt.logger + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- @@ -27,4 +35,3 @@ Module contents :members: :undoc-members: :show-inheritance: - diff --git a/doc/source/usage.rst b/doc/source/usage.rst index 3e2869929a1529563a18d067c02c35a6077525d0..c9ccf66f5cdc2f0ff8c3f5a7ed3bf920c92d3a43 100644 --- a/doc/source/usage.rst +++ b/doc/source/usage.rst @@ -13,11 +13,10 @@ Publishing MQTT Packages # address, port and virtual host of the broker to connect to MQTT_BROKER = "127.0.0.1" MQTT_PORT = 1883 - MQTT_VHOST = "/" # initialize publisher and connect to the broker client = mqtt.MQTTPublisher() - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD) # create message and publish the message as UTF-8 encoded string message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", @@ -39,9 +38,10 @@ Receiving MQTT Messages MQTT_VHOST = "/" ## To connect to the central MQTT-Broker of MQ-MS use the settings below. - ## Ask Mark Sanders (sdr) for personal credentials. - # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" - # MQTT_PORT = 1883 + ## Ask Mark Sanders (sdr) or Matthias Bodenbenner (bdn) for personal credentials. + ## Set the "ssl" flag of the connect function to "True". + # MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de" + # MQTT_PORT = 8883 # MQTT_VHOST = "metrology" topic = "#" # set topic to subscribe according to MQTT syntax! @@ -54,7 +54,7 @@ Receiving MQTT Messages if __name__ == "__main__": client = mqtt.MQTTSubscriber() - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST) client.set_callback("PRINT", print_mqtt_message) client.subscribe(topic, qos) diff --git a/private_testing/publish.py b/private_testing/publish.py new file mode 100644 index 0000000000000000000000000000000000000000..7842d210a4f3506b36f150e4e3648414c5302301 --- /dev/null +++ b/private_testing/publish.py @@ -0,0 +1,39 @@ +import datetime +import json +import random +import time +import uuid + +from src.mqtt import Logger +from src import mqtt + +### Ask for settings and individual credentials ### + +MQTT_USER = "bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e" +MQTT_PASSWORD = "azesR4Q8~M7UBKh<7S~d\"NN-|)i9:Le[" + +MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de" +MQTT_PORT = 8883 +MQTT_VHOST = "metrology" + +### to connect to the central MQTT-Broker of MQ-MS use the following settings: +# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" +# MQTT_PORT = 1883 +# MQTT_VHOST = "metrology" + +### Ask for settings and individual credentials ### + + +if __name__ == "__main__": + + client = mqtt.MQTTPublisher(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e") + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST, ssl=True) + + while True: + try: + message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", + "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) + client.publish("channel-001", message.encode("utf-8"), 2) + time.sleep(1) + except KeyboardInterrupt: + break diff --git a/private_testing/receive.py b/private_testing/receive.py new file mode 100644 index 0000000000000000000000000000000000000000..d08d8ae8dfaf0eda23ee7044f886b3bd2ca55015 --- /dev/null +++ b/private_testing/receive.py @@ -0,0 +1,42 @@ +import time + +from src.mqtt import Logger +from src import mqtt + +### Ask for settings and individual credentials ### + +MQTT_USER = "bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e" +MQTT_PASSWORD = "azesR4Q8~M7UBKh<7S~d\"NN-|)i9:Le[" + +MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de" +MQTT_PORT = 8883 +MQTT_VHOST = "metrology" + +## to connect to the central MQTT-Broker of MQ-MS use the following settings: +# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" +# MQTT_PORT = 1883 +# MQTT_VHOST = "metrology" + +### Ask for settings and individual credentials ### + +topic = "#" # set topic to subscribe according to MQTT syntax! +qos = 0 # set QoS according to MQTT specifications! + +logger = Logger() + +def print_mqtt_message(topic, message): + logger.get('Receiver').info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8"))) + +if __name__=="__main__": + client = mqtt.MQTTSubscriber(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e", logger=logger) + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST) + + client.set_callback("PRINT", print_mqtt_message) + # client.subscribe(topic, qos) + + while True: + try: + time.sleep(1) + except KeyboardInterrupt: + break + diff --git a/setup.py b/setup.py index 140d57dc607cb390634917ac8b8a5885349a62a6..7ce55fa730211a04711ff3e4894359da551fcf55 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import setup, find_packages setup(name='wzl-mqtt', - version='2.3.0', + version='2.4.0', url='', author='Benjamin Montavon, Matthias Bodenbenner', author_email='m.bodenbenner@wzl.rwth-aachen.de', diff --git a/src/mqtt/__init__.py b/src/mqtt/__init__.py index 3e09dfc7fa8b1db7544d449cd51c98f5abc7afbe..5919f59a911e0c66ab90c828762186d0dac38366 100644 --- a/src/mqtt/__init__.py +++ b/src/mqtt/__init__.py @@ -6,4 +6,4 @@ from .exceptions import PublishError from .client import MQTTPublisher from .client import MQTTSubscriber -from ._logger import root_logger +from .logger import Logger diff --git a/src/mqtt/_logger.py b/src/mqtt/_logger.py deleted file mode 100644 index 1ca112c5eba703d370d1884711ebf5365bb042cf..0000000000000000000000000000000000000000 --- a/src/mqtt/_logger.py +++ /dev/null @@ -1,62 +0,0 @@ -import datetime -import logging -import logging.config -import os -import sys - - -class Logger(object): - def __init__(self, path, level=None): - self._path = path - self.root_logger = logging.getLogger(None) - self.root_logger.setLevel(logging.DEBUG) - - if len(sys.argv[0].split("\\")) > 1: - filename = sys.argv[0].split("\\")[-1] - else: - filename = sys.argv[0].split("/")[-1] - - self.file_handler = logging.handlers.RotatingFileHandler( - os.path.join(self._path, "{}_{}.txt".format(filename, datetime.datetime.now().isoformat()).replace(':', '-')), maxBytes=20 * 1024 ** 2, - backupCount=3) - self.file_handler.doRollover() - if level is None: - self.file_handler.setLevel(logging.DEBUG) - else: - self.file_handler.setLevel(level) - self.console_handler = logging.StreamHandler() - if level is None: - self.console_handler.setLevel(logging.INFO) - else: - self.console_handler.setLevel(level) - self.root_formatter = logging.Formatter('### %(levelname)-8s %(asctime)s %(name)-40s ###\n%(message)s\n') - self.file_handler.setFormatter(self.root_formatter) - self.console_handler.setFormatter(self.root_formatter) - self.root_logger.addHandler(self.file_handler) - self.root_logger.addHandler(self.console_handler) - - def get(self, name=None): - return logging.getLogger(name) - - def set_logging_level(self, level, target=''): - if target == 'FileHandler': - self.file_handler.setLevel(level) - elif target == 'StreamHandler': - self.console_handler.setLevel(level) - else: - self.console_handler.setLevel(level) - self.file_handler.setLevel(level) - - -path = './logs' -for aid, arg in enumerate(sys.argv): - if arg == '--logging-path': - if aid == len(sys.argv) - 1: - print('ERROR: No path argument given!') - else: - path = sys.argv[aid + 1] - break - -if not os.path.isdir(path): - os.makedirs(path) -root_logger = Logger(path) diff --git a/src/mqtt/client.py b/src/mqtt/client.py index 46da7cd36e0ca18168c861e23d03d97d2d82e20e..a17815c1586a3dd17d4e8b95e563d103b2173e7f 100644 --- a/src/mqtt/client.py +++ b/src/mqtt/client.py @@ -1,19 +1,15 @@ -import functools import inspect +import uuid import warnings -from typing import Callable +from typing import Callable, Union import paho.mqtt.client as mqtt -import uuid - -from ._logger import root_logger +from .logger import Logger from .exceptions import ConnectionError from .exceptions import PublishError from .exceptions import SubscriptionError -logger = root_logger.get('MQTTClient') - class MQTTClient: """Client base class for MQTT Publisher and Subscriber. @@ -27,12 +23,14 @@ class MQTTClient: instances = {} - def __init__(self, username: str = None, prefix: str = None, *args, **kwargs): + def __init__(self, username: str = None, prefix: str = None, + logger: Logger = None, *args, **kwargs): """ Args: username: Unique username used as identifier for the client to be created. If None is given, it is created automatically. prefix: Inserted to the beginning of each topic. + logger: Logger for creating a log of activities to console/terminal and/or file. *args: Additional optional arguments for initializing the client as of the paho-mqtt package. **kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package. @@ -40,8 +38,12 @@ class MQTTClient: IndexError: If there already exists a client with the given uuid. """ + self._logger = logger.get( + 'MQTTClient') if logger is not None else Logger().get('MQTTClient') + if username in MQTTClient.instances: - logger.error("MQTT Client {} already exists!".format(username)) + self._logger.error( + "MQTT Client {} already exists!".format(username)) raise IndexError("MQTT Client {} already exists!".format(username)) MQTTClient.instances[username] = self @@ -54,7 +56,6 @@ class MQTTClient: self._client.loop_start() self._connected = False - self.logger = logger @classmethod def get(cls, username: str) -> 'MQTTClient': @@ -77,14 +78,17 @@ class MQTTClient: """ return self._connected - def connect(self, broker: str, port: str, username: str, password: str, websocket: bool = False, ssl: bool = False, keep_alive: int = 60): + def connect(self, broker: str, port: Union[str, int], username: str, + password: str, vhost: str = '', websocket: bool = False, + ssl: bool = False, keep_alive: int = 60): """Opens a connection to an MQTT-broker under the given address and post. Args: broker: The address (URL) of the MQTT-broker to connect to. port: The port behind which the broker is running and accessible. - username: The username required for authentication at the broker - password: The password required for authentication at the broker + username: The username required for authentication at the broker. + password: The password required for authentication at the broker. + vhost: Virtual host to connect to at the MQTT-Broker. websocket: If true MQTT messages are published/received over WebSockets. If false, the default transportation over raw TCP is used. ssl: If true a secured TLS connection is established. keep_alive: maximum period in seconds allowed between communications with the broker. @@ -100,11 +104,17 @@ class MQTTClient: address = fields[0] path = "/".join(fields[1:]) self._client.ws_set_options(path=path) - self._client.username_pw_set(username, password) + + if vhost != '' and vhost != '/': + self._client.username_pw_set(f'{vhost}:{username}', password) + else: + self._client.username_pw_set(username, password) self._client.connect(address, port, keep_alive) except Exception as exception: - self.logger.error("MQTT Client {} could not connect to {}:{} : {}".format(self.name, broker, port, str(exception))) + self._logger.error( + "MQTT Client {} could not connect to {}:{} : {}".format( + self.name, broker, port, str(exception))) raise ConnectionError(str(exception)) def disconnect(self): @@ -114,26 +124,35 @@ class MQTTClient: try: self._client.disconnect() except Exception as exception: - self.logger.error("MQTT Client {} could not disconnect: {}".format(self.name, str(exception))) + self._logger.error( + "MQTT Client {} could not disconnect: {}".format(self.name, + str(exception))) raise ConnectionError(str(exception)) def _on_connect(self, client, userdata, flags, rc): if rc == 0: - self.logger.info("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) + self._logger.info( + "MQTT Client {} connect terminated with code {} ({}).".format( + self.name, rc, mqtt.error_string(rc))) self._connected = True else: - self.logger.error("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) + self._logger.error( + "MQTT Client {} connect terminated with code {} ({}).".format( + self.name, rc, mqtt.error_string(rc))) self._connected = False def _on_disconnect(self, client, userdata, rc): if rc == 1: - self.logger.error( + self._logger.error( "MQTT Client {} disconnected with code {} ({}). \n \t There are two possible reasons: \n" - "\t\t 1. There already is a client connected, with the same credentials you are using now. \n" - "\t\t 2. You may tried to use Publisher-Credentials for receiving messages or vise versa. " - "Please contact Mark Sanders, to check if your login credentials are correct!".format(self.name, rc, mqtt.error_string(rc))) + "\t\t 1. You may tried to use Publisher-Credentials for receiving messages or vise versa.\n" + "\t\t 2. You tried to publish to or subscribe a topic which you are not allowed to do. \n" + "Please contact Mark Sanders or Matthias Bodenbenner, to check if your login credentials are correct!".format( + self.name, rc, mqtt.error_string(rc))) else: - self.logger.info("MQTT Client {} disconnected with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) + self._logger.info( + "MQTT Client {} disconnected with code {} ({}).".format( + self.name, rc, mqtt.error_string(rc))) self._connected = False def __del__(self): @@ -149,19 +168,22 @@ class MQTTPublisher(MQTTClient): """ - def __init__(self, username: str = None, prefix: str = None, *args, **kwargs): + def __init__(self, username: str = None, prefix: str = None, + logger: Logger = None, *args, **kwargs): """ Args: username: Unique username used as identifier for the client to be created. If None is given, it is created automatically. prefix: Inserted to the beginning of each topic. + logger: Logger for creating a log of activities to console/terminal and/or file. *args: Additional optional arguments for initializing the client as of the paho-mqtt package. **kwargs: Additional optional keyword-arguments for initializing the client as of the paho-mqtt package. """ - MQTTClient.__init__(self, username, prefix, *args, **kwargs) + MQTTClient.__init__(self, username, prefix, logger, *args, **kwargs) self._client.on_publish = self._on_publish - def publish(self, topic: str, message: str, qos: int = 0, retain: bool = False): + def publish(self, topic: str, message: str, qos: int = 0, + retain: bool = False): """ Publish the given message under the given topic. Args: @@ -172,16 +194,23 @@ class MQTTPublisher(MQTTClient): """ try: if self.prefix is not None and self.prefix != "": - self._client.publish(self.prefix + "/" + topic.strip("/"), message, qos, retain) + self._client.publish(self.prefix + "/" + topic.strip("/"), + message, qos, retain) else: self._client.publish(topic.strip("/"), message, qos, retain) - self.logger.debug("MQTT Client {} will publish the following messsage to {}: {}".format(self.name, topic, message)) + self._logger.debug( + "MQTT Client {} will publish the following messsage to {}: {}".format( + self.name, topic, message)) except Exception as exception: - self.logger.error("MQTT Client {} could not publish to {}: {}".format(self.name, topic, str(exception))) + self._logger.error( + "MQTT Client {} could not publish to {}: {}".format(self.name, + topic, + str(exception))) raise PublishError(str(exception)) def _on_publish(self, client, userdata, mid): - self.logger.debug("MQTT Client {} published message {}.".format(self.name, mid)) + self._logger.debug( + "MQTT Client {} published message {}.".format(self.name, mid)) # def __shortcut(self, topic, payload, root_topic, qos, retain): # self._client.publish(root_topic + topic, payload, qos, retain) @@ -195,16 +224,18 @@ class MQTTSubscriber(MQTTClient): """ - def __init__(self, username: str = None, prefix: str = None, *args, **kwargs): + def __init__(self, username: str = None, prefix: str = None, + logger: Logger = None, *args, **kwargs): """ Args: username:Unique username used as identifier for the client to be created. If None is given, it is created automatically. prefix: Inserted to the beginning of each topic. + logger: Logger for creating a log of activities to console/terminal and/or file. *args: Additional optional arguments of the internally used paho-mqtt client. **kwargs: Additional optional key word arguments of the internally used paho-mqtt client. """ - MQTTClient.__init__(self, username, prefix, *args, **kwargs) + MQTTClient.__init__(self, username, prefix, logger, *args, **kwargs) self._client.on_message = self._on_message self._client.on_subscribe = self._on_subscribe self._client.on_unsubscribe = self._on_unsubscribe @@ -224,15 +255,19 @@ class MQTTSubscriber(MQTTClient): SubscriptionError: If topic could not be subscribed successfully. """ try: - topic = self.prefix + "/" + topic if self.prefix is not None and self.prefix != "" else topic + topic = f'{self.prefix}/{topic}' if self.prefix is not None and self.prefix != "" else topic for s in self._subscriptions: if s["topic"] == topic: - raise RuntimeError("Topic {} is already subscribed!".format(topic)) + raise RuntimeError( + "Topic {} is already subscribed!".format(topic)) self._subscriptions.append({"topic": topic, "qos": qos}) if self.connected: self._client.subscribe(topic, qos) except Exception as exception: - self.logger.error("MQTT Client {} could not subscribe to {}: {}".format(self.name, topic, str(exception))) + self._logger.error( + "MQTT Client {} could not subscribe to {}: {}".format(self.name, + topic, + str(exception))) raise SubscriptionError(str(exception)) def unsubscribe(self, topic: str): @@ -256,7 +291,9 @@ class MQTTSubscriber(MQTTClient): raise RuntimeError("Topic {} is not subscribed!".format(topic)) except Exception as exception: - self.logger.error("MQTT Client {} could not unsubscribe from {}: {}".format(self.name, topic, str(exception))) + self._logger.error( + "MQTT Client {} could not unsubscribe from {}: {}".format( + self.name, topic, str(exception))) raise SubscriptionError(str(exception)) def set_callback(self, key: str, function: Callable): @@ -268,13 +305,16 @@ class MQTTSubscriber(MQTTClient): First argument is the topic and the second the received message. """ if key in self._on_message_callbacks: - self.logger.warning("Overwriting callback {}!".format(key)) - warnings.warn("Overwriting callback {}!".format(key), RuntimeWarning) + self._logger.warning("Overwriting callback {}!".format(key)) + warnings.warn("Overwriting callback {}!".format(key), + RuntimeWarning) signature = inspect.signature(function) if len(signature.parameters) < 2: - self.logger.warning("Callback {} has insufficient parameters!".format(key)) - warnings.warn("Callback {} has insufficient parameters!".format(key)) + self._logger.warning( + "Callback {} has insufficient parameters!".format(key)) + warnings.warn( + "Callback {} has insufficient parameters!".format(key)) self._on_message_callbacks[key] = function @@ -288,25 +328,36 @@ class MQTTSubscriber(MQTTClient): def _on_connect(self, client, userdata, flags, rc): if rc == 0: - self.logger.info("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) + self._logger.info( + "MQTT Client {} connect terminated with code {} ({}).".format( + self.name, rc, mqtt.error_string(rc))) self._connected = True for s in self._subscriptions: self._client.subscribe(s["topic"], s["qos"]) elif rc == 4: - self.logger.info("MQTT Client {} connect terminated with code {} ({})".format(self.name, rc, mqtt.error_string(rc))) + self._logger.info( + "MQTT Client {} connect terminated with code {} ({})".format( + self.name, rc, mqtt.error_string(rc))) else: - self.logger.error("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc))) + self._logger.error( + "MQTT Client {} connect terminated with code {} ({}).".format( + self.name, rc, mqtt.error_string(rc))) self._connected = False - def _on_subscribe(self, client, userdata, mid, granted_qos, properties=None): - self.logger.info("MQTT Client {} subscribed with ID {}.".format(self.name, mid)) + def _on_subscribe(self, client, userdata, mid, granted_qos, + properties=None): + self._logger.info( + "MQTT Client {} subscribed with ID {}.".format(self.name, mid)) def _on_unsubscribe(self, client, userdata, mid): - self.logger.info("MQTT Client {} unsubscribed with ID {}.".format(self.name, mid)) + self._logger.info( + "MQTT Client {} unsubscribed with ID {}.".format(self.name, mid)) def _on_message(self, client, userdata, message): for key in self._on_message_callbacks: try: self._on_message_callbacks[key](message.topic, message.payload) except Exception as exception: - self.logger.error("Exception while processing callback for topic {}: {}".format(message.topic, str(exception))) + self._logger.error( + "Exception while processing callback for topic {}: {}".format( + message.topic, str(exception))) diff --git a/src/mqtt/logger.py b/src/mqtt/logger.py new file mode 100644 index 0000000000000000000000000000000000000000..f924e93aa18be48f7f5baa981b8cf14e1edbe1bd --- /dev/null +++ b/src/mqtt/logger.py @@ -0,0 +1,86 @@ +import datetime +import logging +import logging.config +import os + + +class Logger(object): + + def __init__(self, filename: str = None, path: str = None, + level: int = None): + """Creates a basic logger for console/terminal and optional file output. + + Args: + filename: Name of the file the output should be logged to, file format can be ommited. + Current timestamp and file format is automatically appened to the given filename. + If no, filename is given, the output is written to console/terminal only. + path: Optional path to the logging file. If omitted, file is created at the current working directory. + level: Specifies which information should be logged as specified by the python logging module. + If not set, default console level is INFO and default file level is DEBUG. + """ + if filename is not None: + self._filename = filename.split("\\")[-1] if len( + filename.split("\\")) > 1 else filename.split("/")[-1] + else: + self._filename = filename + self._path = path if path is not None else '' + self.root_logger = logging.getLogger(None) + self.root_logger.setLevel(logging.DEBUG) + + self.root_formatter = logging.Formatter( + '### %(levelname)-8s %(asctime)s %(name)-40s ###\n%(message)s\n') + + # initialize FileHandler + if self._filename is not None: + self.file_handler = logging.handlers.RotatingFileHandler( + os.path.join(self._path, "{}_{}.txt".format(filename, + datetime.datetime.now().isoformat()).replace( + ':', '-')), maxBytes=20 * 1024 ** 2, + backupCount=3) + self.file_handler.doRollover() + if level is None: + self.file_handler.setLevel(logging.DEBUG) + else: + self.file_handler.setLevel(level) + self.file_handler.setFormatter(self.root_formatter) + self.root_logger.addHandler(self.file_handler) + + # initialize StreamHandler (Console Output) + self.console_handler = logging.StreamHandler() + if level is None: + self.console_handler.setLevel(logging.INFO) + else: + self.console_handler.setLevel(level) + self.console_handler.setFormatter(self.root_formatter) + self.root_logger.addHandler(self.console_handler) + + def get(self, name: str = None) -> logging.Logger: + """Returns the basic python logging utility. + + Args: + name: Name to identify the logger. If no name is given, the RootLogger is returned. + + Returns: A basic python logger with the given name. + + """ + return logging.getLogger(name) + + def set_logging_level(self, level: int, target: str = '') -> None: + """Sets the logging level. + + Args: + level: Specifies which information should be logged as specified by the python logging module. + target: If specified as "FileHandler" the minimum file output is set to the given level. + If specified as "StreamHandler" the minimum console/terminal output is set to the given level. + If omitted, the level is set for both handlers. + + """ + if target == 'FileHandler': + if self._filename is not None: + self.file_handler.setLevel(level) + elif target == 'StreamHandler': + self.console_handler.setLevel(level) + else: + self.console_handler.setLevel(level) + if self._filename is not None: + self.file_handler.setLevel(level) diff --git a/test/measure_latency.pyw b/test/measure_latency.pyw index 59e24dd77e77e49306d01591bebfc1547459902f..5ae236ffc2193f7398b38bda14bb8a71dd8b2a60 100644 --- a/test/measure_latency.pyw +++ b/test/measure_latency.pyw @@ -13,11 +13,11 @@ from src import mqtt ### Ask for settings and individual credentials ### -MQTT_PUBLISHER_USER = "bdn-7f5e7ac0-b517-429f-91ca-e719366405fe" -MQTT_PUBLISHER_PASSWORD = "tK+VuJUAOmP3v8YTMOagydjonOYLFpLSYCJGPMLNzd4=" +MQTT_PUBLISHER_USER = "" +MQTT_PUBLISHER_PASSWORD = "" -MQTT_RECEIVER_USER = "bdn-5845b5e7-3d6c-45ee-a7f1-ceebd0f99089" -MQTT_RECEIVER_PASSWORD = "x0KHMYV8Ypcp23mTvoCG1UcMZv9c7JXMWuk2kSni/wc=" +MQTT_RECEIVER_USER = "" +MQTT_RECEIVER_PASSWORD = "" ### to connect to the central MQTT-Broker of MQ-MS use the following settings: MQTT_BROKER = "localhost" # wzl-mbroker01.wzl.rwth-aachen.de" @@ -26,7 +26,7 @@ MQTT_VHOST = "" # "metrology" ### Ask for settings and individual credentials ### -topic = "bdn-7f5e7ac0-b517-429f-91ca-e719366405fe/#" # set topic to subscribe according to MQTT syntax! +topic = "#" # set topic to subscribe according to MQTT syntax! qos = 0 # set QoS according to MQTT specifications! counter = 0 diff --git a/test/publish.py b/test/publish.py index 2ad3d756c495180af3c55855b37a9c891002cf7f..926458ec830a94b2c04c2a244d85bb4d9673a1fb 100644 --- a/test/publish.py +++ b/test/publish.py @@ -11,9 +11,9 @@ from src import mqtt MQTT_USER = "" MQTT_PASSWORD = "" -MQTT_BROKER = "127.0.0.1" -MQTT_PORT = 1883 -MQTT_VHOST = "/" +MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de" +MQTT_PORT = 8883 +MQTT_VHOST = "metrology" ### to connect to the central MQTT-Broker of MQ-MS use the following settings: # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" @@ -24,14 +24,14 @@ MQTT_VHOST = "/" if __name__ == "__main__": - client = mqtt.MQTTPublisher() - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) + client = mqtt.MQTTPublisher(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e") + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST, ssl=True) while True: try: message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z", "covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"}) - client.publish("channel-001", message.encode("utf-8"), 0) - time.sleep(1) + client.publish("channel-001", message.encode("utf-8"), 2) + # time.sleep(0.01) except KeyboardInterrupt: break diff --git a/test/receive.py b/test/receive.py index 374beb5dfe39cff7c729eaedf5beca9c209e57ac..ed7a604ec69f67ca89bd7a44800499c42cc37054 100644 --- a/test/receive.py +++ b/test/receive.py @@ -9,9 +9,9 @@ logger = mqtt.root_logger.get('Receiver') MQTT_USER = "" MQTT_PASSWORD = "" -MQTT_BROKER = "127.0.0.1" -MQTT_PORT = 1883 -MQTT_VHOST = "/" +MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de" +MQTT_PORT = 8883 +MQTT_VHOST = "metrology" ## to connect to the central MQTT-Broker of MQ-MS use the following settings: # MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" @@ -27,11 +27,11 @@ def print_mqtt_message(topic, message): logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8"))) if __name__=="__main__": - client = mqtt.MQTTSubscriber() - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) + client = mqtt.MQTTSubscriber(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e") + client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, vhost=MQTT_VHOST, MQTT_PASSWORD) client.set_callback("PRINT", print_mqtt_message) - client.subscribe(topic, qos) + # client.subscribe(topic, qos) while True: try: diff --git a/test/receive_create_log.py b/test/receive_create_log.py deleted file mode 100644 index 17e0d4f3ee3659d1d85d75aa4bb3ea52ae8fe547..0000000000000000000000000000000000000000 --- a/test/receive_create_log.py +++ /dev/null @@ -1,72 +0,0 @@ -from src import MQTT -import time -import logging -import asyncio -import datetime - -### Ask for settings and individual credentials ### - -MQTT_USER = "" -MQTT_PASSWORD = "" - -# MQTT_BROKER = "127.0.0.1" -# MQTT_PORT = 1883 -# MQTT_VHOST = "/" - -# to connect to the central MQTT-Broker of MQ-MS use the following settings: -MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" -MQTT_PORT = 1883 -MQTT_VHOST = "metrology" - -### Ask for settings and individual credentials ### - -topic = "#" # set topic to subscribe according to MQTT syntax! -qos = 0 # set QoS according to MQTT specifications! - -lasertracker_counter = 0 -border = [6e5, 12e5, 18e5, 24e5, 30e5, 36e5, 42e5, 48e5, 54e5, 60e5] - - -def print_mqtt_message(topic, message): - global lasertracker_counter - line = "{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic, - message.decode("utf-8")) - with open('./out.txt', 'a') as outfile: - if 'radian-e7ccdff2-764d-42b2-91af-4dd52f9187e6' in topic: - if lasertracker_counter < border[0] or border[1] < lasertracker_counter < border[2] \ - or border[5] < lasertracker_counter < border[6]: - if lasertracker_counter % 10 == 0: - outfile.write(line) - elif border[0] < lasertracker_counter < border[1] or border[4] < lasertracker_counter < border[5] \ - or lasertracker_counter > border[9]: - return - elif border[2] < lasertracker_counter < border[3] or border[6] < lasertracker_counter < border[7]: - outfile.write(line) - elif border[3] < lasertracker_counter < border[4] or border[7] < lasertracker_counter < border[8]: - if lasertracker_counter % 100 == 0: - outfile.write(line) - lasertracker_counter += 1 - else: - outfile.write(line) - - -if __name__ == "__main__": - console_log = logging.StreamHandler() - console_log.setFormatter(MQTT.formatter) - - MQTT.logger.addHandler(console_log) - MQTT.logger.setLevel(logging.INFO) - - client = MQTT.MQTTSubscriber(MQTT_USER) - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) - - client.set_callback("PRINT", print_mqtt_message) - client.subscribe(topic, qos) - - loop = asyncio.get_event_loop() - - while True: - try: - time.sleep(0.001) - except KeyboardInterrupt: - break diff --git a/test/receive_test.py b/test/receive_test.py deleted file mode 100644 index 781aceb934eb0f5a960db9d60da3237a9047241d..0000000000000000000000000000000000000000 --- a/test/receive_test.py +++ /dev/null @@ -1,51 +0,0 @@ -from src import mqtt -import time -import logging -import asyncio -import datetime - -### Ask for settings and individual credentials ### - -MQTT_USER = "" -MQTT_PASSWORD = "" - -MQTT_BROKER = "127.0.0.1" -MQTT_PORT = 1883 -MQTT_VHOST = "/" - -# # to connect to the central mqtt-Broker of MQ-MS use the following settings: -# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de" -# MQTT_PORT = 1883 -# MQTT_VHOST = "metrology" - -### Ask for settings and individual credentials ### - -topic = "#" # set topic to subscribe according to mqtt syntax! -qos = 0 # set QoS according to mqtt specifications! - - -def print_mqtt_message(topic, message): - print("{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic, - message.decode("utf-8"))) - - -if __name__ == "__main__": - console_log = logging.StreamHandler() - console_log.setFormatter(mqtt.formatter) - - mqtt.logger.addHandler(console_log) - mqtt.logger.setLevel(logging.INFO) - - client = mqtt.MQTTSubscriber(MQTT_USER) - client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD) - - client.set_callback("PRINT", print_mqtt_message) - client.subscribe(topic, qos) - - loop = asyncio.get_event_loop() - - while True: - try: - time.sleep(0.001) - except KeyboardInterrupt: - break