Skip to content
Snippets Groups Projects
Commit c80bd202 authored by Matthias Stefan Bodenbenner's avatar Matthias Stefan Bodenbenner
Browse files

changed behaviour of logger, added vhost option to connect function

parent 1e14fe84
No related branches found
No related tags found
No related merge requests found
Pipeline #79220 passed
# WZL-MQTT
[![pipeline status](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/digital-mars/mqtt/badges/master/pipeline.svg)](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/digital-mars/mqtt/badges/master)
Current stable version: 2.3.0
Current stable version: 2.4.0
## Installation
Requires at least Python 3.6
1. Install the WZL-MQTT package via pip
```
pip install --extra-index-url https://package-read:gkYP4xrm2PxicUbW1wra@git-ce.rwth-aachen.de/api/v4/projects/1708/packages/pypi/simple wzl-mqtt
......@@ -25,7 +27,7 @@ MQTT_VHOST = "/"
# initialize publisher and connect to the broker
client = mqtt.MQTTPublisher()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST)
# create message and publish the message as UTF-8 encoded string
message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
......@@ -45,7 +47,7 @@ MQTT_PORT = 1883
MQTT_VHOST = "/"
# initialize logger
logger = mqtt.root_logger.get('Receiver') # change 'Receiver' to any string you like
logger = mqtt.Logger.get('Receiver') # change 'Receiver' to any string you like
# define callback which will be executed when a message is received
def print_mqtt_message(topic, message):
......@@ -53,7 +55,7 @@ def print_mqtt_message(topic, message):
# initialize subscriber and connect to the broker
client = mqtt.MQTTSubscriber()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST)
# register the callback and subscribe topic
client.set_callback("PRINT", print_mqtt_message)
......@@ -77,6 +79,12 @@ To obtain credentials for MQTT-Broker of WZL-MQ-MS contact [Mark Sanders](mailto
For more detailed explanation and full API documentation, view [https://iot.wzl-mq.rwth-aachen.de/documentation/libs/mqtt/](https://iot.wzl-mq.rwth-aachen.de/documentation/libs/mqtt/)
## Changelog
2.4.0
- added vhost parameter to connect function of client
- changed default logging behaviour
- now by default logging is only written to console
- if logging should also create log-files, the logger must be explicitly initialized
2.3.0
- a prefix can now be defined which is prepended to every published or subscribed topic
......
......@@ -20,6 +20,14 @@ mqtt.exceptions module
:undoc-members:
:show-inheritance:
mqtt.logger module
------------------
.. automodule:: mqtt.logger
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
......@@ -27,4 +35,3 @@ Module contents
:members:
:undoc-members:
:show-inheritance:
......@@ -13,11 +13,10 @@ Publishing MQTT Packages
# address, port and virtual host of the broker to connect to
MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883
MQTT_VHOST = "/"
# initialize publisher and connect to the broker
client = mqtt.MQTTPublisher()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD)
# create message and publish the message as UTF-8 encoded string
message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
......@@ -39,9 +38,10 @@ Receiving MQTT Messages
MQTT_VHOST = "/"
## To connect to the central MQTT-Broker of MQ-MS use the settings below.
## Ask Mark Sanders (sdr) for personal credentials.
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883
## Ask Mark Sanders (sdr) or Matthias Bodenbenner (bdn) for personal credentials.
## Set the "ssl" flag of the connect function to "True".
# MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de"
# MQTT_PORT = 8883
# MQTT_VHOST = "metrology"
topic = "#" # set topic to subscribe according to MQTT syntax!
......@@ -54,7 +54,7 @@ Receiving MQTT Messages
if __name__ == "__main__":
client = mqtt.MQTTSubscriber()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST)
client.set_callback("PRINT", print_mqtt_message)
client.subscribe(topic, qos)
......
import datetime
import json
import random
import time
import uuid
from src.mqtt import Logger
from src import mqtt
### Ask for settings and individual credentials ###
MQTT_USER = "bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e"
MQTT_PASSWORD = "azesR4Q8~M7UBKh<7S~d\"NN-|)i9:Le["
MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de"
MQTT_PORT = 8883
MQTT_VHOST = "metrology"
### to connect to the central MQTT-Broker of MQ-MS use the following settings:
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883
# MQTT_VHOST = "metrology"
### Ask for settings and individual credentials ###
if __name__ == "__main__":
client = mqtt.MQTTPublisher(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e")
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST, ssl=True)
while True:
try:
message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"})
client.publish("channel-001", message.encode("utf-8"), 2)
time.sleep(1)
except KeyboardInterrupt:
break
from src import mqtt
import time
import logging
import asyncio
import datetime
from src.mqtt import Logger
from src import mqtt
### Ask for settings and individual credentials ###
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_USER = "bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e"
MQTT_PASSWORD = "azesR4Q8~M7UBKh<7S~d\"NN-|)i9:Le["
MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883
MQTT_VHOST = "/"
MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de"
MQTT_PORT = 8883
MQTT_VHOST = "metrology"
# # to connect to the central mqtt-Broker of MQ-MS use the following settings:
## to connect to the central MQTT-Broker of MQ-MS use the following settings:
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
# MQTT_PORT = 1883
# MQTT_VHOST = "metrology"
### Ask for settings and individual credentials ###
topic = "#" # set topic to subscribe according to mqtt syntax!
qos = 0 # set QoS according to mqtt specifications!
topic = "#" # set topic to subscribe according to MQTT syntax!
qos = 0 # set QoS according to MQTT specifications!
logger = Logger()
def print_mqtt_message(topic, message):
print("{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic,
message.decode("utf-8")))
logger.get('Receiver').info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8")))
if __name__=="__main__":
console_log = logging.StreamHandler()
console_log.setFormatter(mqtt.formatter)
mqtt.logger.addHandler(console_log)
mqtt.logger.setLevel(logging.INFO)
client = mqtt.MQTTSubscriber(MQTT_USER)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
client = mqtt.MQTTSubscriber(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e", logger=logger)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST)
client.set_callback("PRINT", print_mqtt_message)
client.subscribe(topic, qos)
loop = asyncio.get_event_loop()
# client.subscribe(topic, qos)
while True:
try:
time.sleep(0.001)
time.sleep(1)
except KeyboardInterrupt:
break
from setuptools import setup, find_packages
setup(name='wzl-mqtt',
version='2.3.0',
version='2.4.0',
url='',
author='Benjamin Montavon, Matthias Bodenbenner',
author_email='m.bodenbenner@wzl.rwth-aachen.de',
......
......@@ -6,4 +6,4 @@ from .exceptions import PublishError
from .client import MQTTPublisher
from .client import MQTTSubscriber
from ._logger import root_logger
from .logger import Logger
import datetime
import logging
import logging.config
import os
import sys
class Logger(object):
def __init__(self, path, level=None):
self._path = path
self.root_logger = logging.getLogger(None)
self.root_logger.setLevel(logging.DEBUG)
if len(sys.argv[0].split("\\")) > 1:
filename = sys.argv[0].split("\\")[-1]
else:
filename = sys.argv[0].split("/")[-1]
self.file_handler = logging.handlers.RotatingFileHandler(
os.path.join(self._path, "{}_{}.txt".format(filename, datetime.datetime.now().isoformat()).replace(':', '-')), maxBytes=20 * 1024 ** 2,
backupCount=3)
self.file_handler.doRollover()
if level is None:
self.file_handler.setLevel(logging.DEBUG)
else:
self.file_handler.setLevel(level)
self.console_handler = logging.StreamHandler()
if level is None:
self.console_handler.setLevel(logging.INFO)
else:
self.console_handler.setLevel(level)
self.root_formatter = logging.Formatter('### %(levelname)-8s %(asctime)s %(name)-40s ###\n%(message)s\n')
self.file_handler.setFormatter(self.root_formatter)
self.console_handler.setFormatter(self.root_formatter)
self.root_logger.addHandler(self.file_handler)
self.root_logger.addHandler(self.console_handler)
def get(self, name=None):
return logging.getLogger(name)
def set_logging_level(self, level, target=''):
if target == 'FileHandler':
self.file_handler.setLevel(level)
elif target == 'StreamHandler':
self.console_handler.setLevel(level)
else:
self.console_handler.setLevel(level)
self.file_handler.setLevel(level)
path = './logs'
for aid, arg in enumerate(sys.argv):
if arg == '--logging-path':
if aid == len(sys.argv) - 1:
print('ERROR: No path argument given!')
else:
path = sys.argv[aid + 1]
break
if not os.path.isdir(path):
os.makedirs(path)
root_logger = Logger(path)
import functools
import inspect
import uuid
import warnings
from typing import Callable
from typing import Callable, Union
import paho.mqtt.client as mqtt
import uuid
from ._logger import root_logger
from .logger import Logger
from .exceptions import ConnectionError
from .exceptions import PublishError
from .exceptions import SubscriptionError
logger = root_logger.get('MQTTClient')
class MQTTClient:
"""Client base class for MQTT Publisher and Subscriber.
......@@ -27,12 +23,14 @@ class MQTTClient:
instances = {}
def __init__(self, username: str = None, prefix: str = None, *args, **kwargs):
def __init__(self, username: str = None, prefix: str = None,
logger: Logger = None, *args, **kwargs):
"""
Args:
username: Unique username used as identifier for the client to be created. If None is given, it is created automatically.
prefix: Inserted to the beginning of each topic.
logger: Logger for creating a log of activities to console/terminal and/or file.
*args: Additional optional arguments for initializing the client as of the paho-mqtt package.
**kwargs: Additional keyword-arguments for initializing the client as of the paho-mqtt package.
......@@ -40,8 +38,12 @@ class MQTTClient:
IndexError: If there already exists a client with the given uuid.
"""
self._logger = logger.get(
'MQTTClient') if logger is not None else Logger().get('MQTTClient')
if username in MQTTClient.instances:
logger.error("MQTT Client {} already exists!".format(username))
self._logger.error(
"MQTT Client {} already exists!".format(username))
raise IndexError("MQTT Client {} already exists!".format(username))
MQTTClient.instances[username] = self
......@@ -54,7 +56,6 @@ class MQTTClient:
self._client.loop_start()
self._connected = False
self.logger = logger
@classmethod
def get(cls, username: str) -> 'MQTTClient':
......@@ -77,14 +78,17 @@ class MQTTClient:
"""
return self._connected
def connect(self, broker: str, port: str, username: str, password: str, websocket: bool = False, ssl: bool = False, keep_alive: int = 60):
def connect(self, broker: str, port: Union[str, int], username: str,
password: str, vhost: str = '', websocket: bool = False,
ssl: bool = False, keep_alive: int = 60):
"""Opens a connection to an MQTT-broker under the given address and post.
Args:
broker: The address (URL) of the MQTT-broker to connect to.
port: The port behind which the broker is running and accessible.
username: The username required for authentication at the broker
password: The password required for authentication at the broker
username: The username required for authentication at the broker.
password: The password required for authentication at the broker.
vhost: Virtual host to connect to at the MQTT-Broker.
websocket: If true MQTT messages are published/received over WebSockets. If false, the default transportation over raw TCP is used.
ssl: If true a secured TLS connection is established.
keep_alive: maximum period in seconds allowed between communications with the broker.
......@@ -100,11 +104,17 @@ class MQTTClient:
address = fields[0]
path = "/".join(fields[1:])
self._client.ws_set_options(path=path)
if vhost != '' and vhost != '/':
self._client.username_pw_set(f'{vhost}:{username}', password)
else:
self._client.username_pw_set(username, password)
self._client.connect(address, port, keep_alive)
except Exception as exception:
self.logger.error("MQTT Client {} could not connect to {}:{} : {}".format(self.name, broker, port, str(exception)))
self._logger.error(
"MQTT Client {} could not connect to {}:{} : {}".format(
self.name, broker, port, str(exception)))
raise ConnectionError(str(exception))
def disconnect(self):
......@@ -114,26 +124,35 @@ class MQTTClient:
try:
self._client.disconnect()
except Exception as exception:
self.logger.error("MQTT Client {} could not disconnect: {}".format(self.name, str(exception)))
self._logger.error(
"MQTT Client {} could not disconnect: {}".format(self.name,
str(exception)))
raise ConnectionError(str(exception))
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.logger.info("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc)))
self._logger.info(
"MQTT Client {} connect terminated with code {} ({}).".format(
self.name, rc, mqtt.error_string(rc)))
self._connected = True
else:
self.logger.error("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc)))
self._logger.error(
"MQTT Client {} connect terminated with code {} ({}).".format(
self.name, rc, mqtt.error_string(rc)))
self._connected = False
def _on_disconnect(self, client, userdata, rc):
if rc == 1:
self.logger.error(
self._logger.error(
"MQTT Client {} disconnected with code {} ({}). \n \t There are two possible reasons: \n"
"\t\t 1. There already is a client connected, with the same credentials you are using now. \n"
"\t\t 2. You may tried to use Publisher-Credentials for receiving messages or vise versa. "
"Please contact Mark Sanders, to check if your login credentials are correct!".format(self.name, rc, mqtt.error_string(rc)))
"\t\t 1. You may tried to use Publisher-Credentials for receiving messages or vise versa.\n"
"\t\t 2. You tried to publish to or subscribe a topic which you are not allowed to do. \n"
"Please contact Mark Sanders or Matthias Bodenbenner, to check if your login credentials are correct!".format(
self.name, rc, mqtt.error_string(rc)))
else:
self.logger.info("MQTT Client {} disconnected with code {} ({}).".format(self.name, rc, mqtt.error_string(rc)))
self._logger.info(
"MQTT Client {} disconnected with code {} ({}).".format(
self.name, rc, mqtt.error_string(rc)))
self._connected = False
def __del__(self):
......@@ -149,19 +168,22 @@ class MQTTPublisher(MQTTClient):
"""
def __init__(self, username: str = None, prefix: str = None, *args, **kwargs):
def __init__(self, username: str = None, prefix: str = None,
logger: Logger = None, *args, **kwargs):
"""
Args:
username: Unique username used as identifier for the client to be created. If None is given, it is created automatically.
prefix: Inserted to the beginning of each topic.
logger: Logger for creating a log of activities to console/terminal and/or file.
*args: Additional optional arguments for initializing the client as of the paho-mqtt package.
**kwargs: Additional optional keyword-arguments for initializing the client as of the paho-mqtt package.
"""
MQTTClient.__init__(self, username, prefix, *args, **kwargs)
MQTTClient.__init__(self, username, prefix, logger, *args, **kwargs)
self._client.on_publish = self._on_publish
def publish(self, topic: str, message: str, qos: int = 0, retain: bool = False):
def publish(self, topic: str, message: str, qos: int = 0,
retain: bool = False):
""" Publish the given message under the given topic.
Args:
......@@ -172,16 +194,23 @@ class MQTTPublisher(MQTTClient):
"""
try:
if self.prefix is not None and self.prefix != "":
self._client.publish(self.prefix + "/" + topic.strip("/"), message, qos, retain)
self._client.publish(self.prefix + "/" + topic.strip("/"),
message, qos, retain)
else:
self._client.publish(topic.strip("/"), message, qos, retain)
self.logger.debug("MQTT Client {} will publish the following messsage to {}: {}".format(self.name, topic, message))
self._logger.debug(
"MQTT Client {} will publish the following messsage to {}: {}".format(
self.name, topic, message))
except Exception as exception:
self.logger.error("MQTT Client {} could not publish to {}: {}".format(self.name, topic, str(exception)))
self._logger.error(
"MQTT Client {} could not publish to {}: {}".format(self.name,
topic,
str(exception)))
raise PublishError(str(exception))
def _on_publish(self, client, userdata, mid):
self.logger.debug("MQTT Client {} published message {}.".format(self.name, mid))
self._logger.debug(
"MQTT Client {} published message {}.".format(self.name, mid))
# def __shortcut(self, topic, payload, root_topic, qos, retain):
# self._client.publish(root_topic + topic, payload, qos, retain)
......@@ -195,16 +224,18 @@ class MQTTSubscriber(MQTTClient):
"""
def __init__(self, username: str = None, prefix: str = None, *args, **kwargs):
def __init__(self, username: str = None, prefix: str = None,
logger: Logger = None, *args, **kwargs):
"""
Args:
username:Unique username used as identifier for the client to be created. If None is given, it is created automatically.
prefix: Inserted to the beginning of each topic.
logger: Logger for creating a log of activities to console/terminal and/or file.
*args: Additional optional arguments of the internally used paho-mqtt client.
**kwargs: Additional optional key word arguments of the internally used paho-mqtt client.
"""
MQTTClient.__init__(self, username, prefix, *args, **kwargs)
MQTTClient.__init__(self, username, prefix, logger, *args, **kwargs)
self._client.on_message = self._on_message
self._client.on_subscribe = self._on_subscribe
self._client.on_unsubscribe = self._on_unsubscribe
......@@ -224,15 +255,19 @@ class MQTTSubscriber(MQTTClient):
SubscriptionError: If topic could not be subscribed successfully.
"""
try:
topic = self.prefix + "/" + topic if self.prefix is not None and self.prefix != "" else topic
topic = f'{self.prefix}/{topic}' if self.prefix is not None and self.prefix != "" else topic
for s in self._subscriptions:
if s["topic"] == topic:
raise RuntimeError("Topic {} is already subscribed!".format(topic))
raise RuntimeError(
"Topic {} is already subscribed!".format(topic))
self._subscriptions.append({"topic": topic, "qos": qos})
if self.connected:
self._client.subscribe(topic, qos)
except Exception as exception:
self.logger.error("MQTT Client {} could not subscribe to {}: {}".format(self.name, topic, str(exception)))
self._logger.error(
"MQTT Client {} could not subscribe to {}: {}".format(self.name,
topic,
str(exception)))
raise SubscriptionError(str(exception))
def unsubscribe(self, topic: str):
......@@ -256,7 +291,9 @@ class MQTTSubscriber(MQTTClient):
raise RuntimeError("Topic {} is not subscribed!".format(topic))
except Exception as exception:
self.logger.error("MQTT Client {} could not unsubscribe from {}: {}".format(self.name, topic, str(exception)))
self._logger.error(
"MQTT Client {} could not unsubscribe from {}: {}".format(
self.name, topic, str(exception)))
raise SubscriptionError(str(exception))
def set_callback(self, key: str, function: Callable):
......@@ -268,13 +305,16 @@ class MQTTSubscriber(MQTTClient):
First argument is the topic and the second the received message.
"""
if key in self._on_message_callbacks:
self.logger.warning("Overwriting callback {}!".format(key))
warnings.warn("Overwriting callback {}!".format(key), RuntimeWarning)
self._logger.warning("Overwriting callback {}!".format(key))
warnings.warn("Overwriting callback {}!".format(key),
RuntimeWarning)
signature = inspect.signature(function)
if len(signature.parameters) < 2:
self.logger.warning("Callback {} has insufficient parameters!".format(key))
warnings.warn("Callback {} has insufficient parameters!".format(key))
self._logger.warning(
"Callback {} has insufficient parameters!".format(key))
warnings.warn(
"Callback {} has insufficient parameters!".format(key))
self._on_message_callbacks[key] = function
......@@ -288,25 +328,36 @@ class MQTTSubscriber(MQTTClient):
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.logger.info("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc)))
self._logger.info(
"MQTT Client {} connect terminated with code {} ({}).".format(
self.name, rc, mqtt.error_string(rc)))
self._connected = True
for s in self._subscriptions:
self._client.subscribe(s["topic"], s["qos"])
elif rc == 4:
self.logger.info("MQTT Client {} connect terminated with code {} ({})".format(self.name, rc, mqtt.error_string(rc)))
self._logger.info(
"MQTT Client {} connect terminated with code {} ({})".format(
self.name, rc, mqtt.error_string(rc)))
else:
self.logger.error("MQTT Client {} connect terminated with code {} ({}).".format(self.name, rc, mqtt.error_string(rc)))
self._logger.error(
"MQTT Client {} connect terminated with code {} ({}).".format(
self.name, rc, mqtt.error_string(rc)))
self._connected = False
def _on_subscribe(self, client, userdata, mid, granted_qos, properties=None):
self.logger.info("MQTT Client {} subscribed with ID {}.".format(self.name, mid))
def _on_subscribe(self, client, userdata, mid, granted_qos,
properties=None):
self._logger.info(
"MQTT Client {} subscribed with ID {}.".format(self.name, mid))
def _on_unsubscribe(self, client, userdata, mid):
self.logger.info("MQTT Client {} unsubscribed with ID {}.".format(self.name, mid))
self._logger.info(
"MQTT Client {} unsubscribed with ID {}.".format(self.name, mid))
def _on_message(self, client, userdata, message):
for key in self._on_message_callbacks:
try:
self._on_message_callbacks[key](message.topic, message.payload)
except Exception as exception:
self.logger.error("Exception while processing callback for topic {}: {}".format(message.topic, str(exception)))
self._logger.error(
"Exception while processing callback for topic {}: {}".format(
message.topic, str(exception)))
import datetime
import logging
import logging.config
import os
class Logger(object):
def __init__(self, filename: str = None, path: str = None,
level: int = None):
"""Creates a basic logger for console/terminal and optional file output.
Args:
filename: Name of the file the output should be logged to, file format can be ommited.
Current timestamp and file format is automatically appened to the given filename.
If no, filename is given, the output is written to console/terminal only.
path: Optional path to the logging file. If omitted, file is created at the current working directory.
level: Specifies which information should be logged as specified by the python logging module.
If not set, default console level is INFO and default file level is DEBUG.
"""
if filename is not None:
self._filename = filename.split("\\")[-1] if len(
filename.split("\\")) > 1 else filename.split("/")[-1]
else:
self._filename = filename
self._path = path if path is not None else ''
self.root_logger = logging.getLogger(None)
self.root_logger.setLevel(logging.DEBUG)
self.root_formatter = logging.Formatter(
'### %(levelname)-8s %(asctime)s %(name)-40s ###\n%(message)s\n')
# initialize FileHandler
if self._filename is not None:
self.file_handler = logging.handlers.RotatingFileHandler(
os.path.join(self._path, "{}_{}.txt".format(filename,
datetime.datetime.now().isoformat()).replace(
':', '-')), maxBytes=20 * 1024 ** 2,
backupCount=3)
self.file_handler.doRollover()
if level is None:
self.file_handler.setLevel(logging.DEBUG)
else:
self.file_handler.setLevel(level)
self.file_handler.setFormatter(self.root_formatter)
self.root_logger.addHandler(self.file_handler)
# initialize StreamHandler (Console Output)
self.console_handler = logging.StreamHandler()
if level is None:
self.console_handler.setLevel(logging.INFO)
else:
self.console_handler.setLevel(level)
self.console_handler.setFormatter(self.root_formatter)
self.root_logger.addHandler(self.console_handler)
def get(self, name: str = None) -> logging.Logger:
"""Returns the basic python logging utility.
Args:
name: Name to identify the logger. If no name is given, the RootLogger is returned.
Returns: A basic python logger with the given name.
"""
return logging.getLogger(name)
def set_logging_level(self, level: int, target: str = '') -> None:
"""Sets the logging level.
Args:
level: Specifies which information should be logged as specified by the python logging module.
target: If specified as "FileHandler" the minimum file output is set to the given level.
If specified as "StreamHandler" the minimum console/terminal output is set to the given level.
If omitted, the level is set for both handlers.
"""
if target == 'FileHandler':
if self._filename is not None:
self.file_handler.setLevel(level)
elif target == 'StreamHandler':
self.console_handler.setLevel(level)
else:
self.console_handler.setLevel(level)
if self._filename is not None:
self.file_handler.setLevel(level)
......@@ -13,11 +13,11 @@ from src import mqtt
### Ask for settings and individual credentials ###
MQTT_PUBLISHER_USER = "bdn-7f5e7ac0-b517-429f-91ca-e719366405fe"
MQTT_PUBLISHER_PASSWORD = "tK+VuJUAOmP3v8YTMOagydjonOYLFpLSYCJGPMLNzd4="
MQTT_PUBLISHER_USER = ""
MQTT_PUBLISHER_PASSWORD = ""
MQTT_RECEIVER_USER = "bdn-5845b5e7-3d6c-45ee-a7f1-ceebd0f99089"
MQTT_RECEIVER_PASSWORD = "x0KHMYV8Ypcp23mTvoCG1UcMZv9c7JXMWuk2kSni/wc="
MQTT_RECEIVER_USER = ""
MQTT_RECEIVER_PASSWORD = ""
### to connect to the central MQTT-Broker of MQ-MS use the following settings:
MQTT_BROKER = "localhost" # wzl-mbroker01.wzl.rwth-aachen.de"
......@@ -26,7 +26,7 @@ MQTT_VHOST = "" # "metrology"
### Ask for settings and individual credentials ###
topic = "bdn-7f5e7ac0-b517-429f-91ca-e719366405fe/#" # set topic to subscribe according to MQTT syntax!
topic = "#" # set topic to subscribe according to MQTT syntax!
qos = 0 # set QoS according to MQTT specifications!
counter = 0
......
......@@ -11,9 +11,9 @@ from src import mqtt
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883
MQTT_VHOST = "/"
MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de"
MQTT_PORT = 8883
MQTT_VHOST = "metrology"
### to connect to the central MQTT-Broker of MQ-MS use the following settings:
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
......@@ -24,14 +24,14 @@ MQTT_VHOST = "/"
if __name__ == "__main__":
client = mqtt.MQTTPublisher()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
client = mqtt.MQTTPublisher(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e")
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_USER, MQTT_PASSWORD, vhost=MQTT_VHOST, ssl=True)
while True:
try:
message = json.dumps({"value": [random.uniform(0, 5) for i in range(3)], "timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"covariance": [[2, 0, 0], [0, 2, 0], [0, 0, 0]], "nonce": str(uuid.uuid4()), "hash": None, "unit": "MTR"})
client.publish("channel-001", message.encode("utf-8"), 0)
time.sleep(1)
client.publish("channel-001", message.encode("utf-8"), 2)
# time.sleep(0.01)
except KeyboardInterrupt:
break
......@@ -9,9 +9,9 @@ logger = mqtt.root_logger.get('Receiver')
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883
MQTT_VHOST = "/"
MQTT_BROKER = "mqtt.wzl-mq.rwth-aachen.de"
MQTT_PORT = 8883
MQTT_VHOST = "metrology"
## to connect to the central MQTT-Broker of MQ-MS use the following settings:
# MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
......@@ -27,11 +27,11 @@ def print_mqtt_message(topic, message):
logger.info("### {} ###\r\n{}\r\n".format(topic, message.decode("utf-8")))
if __name__=="__main__":
client = mqtt.MQTTSubscriber()
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD)
client = mqtt.MQTTSubscriber(prefix="bdn-212d8419-c75c-471f-8ea5-f3a5c19ac42e")
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, vhost=MQTT_VHOST, MQTT_PASSWORD)
client.set_callback("PRINT", print_mqtt_message)
client.subscribe(topic, qos)
# client.subscribe(topic, qos)
while True:
try:
......
from src import MQTT
import time
import logging
import asyncio
import datetime
### Ask for settings and individual credentials ###
MQTT_USER = ""
MQTT_PASSWORD = ""
# MQTT_BROKER = "127.0.0.1"
# MQTT_PORT = 1883
# MQTT_VHOST = "/"
# to connect to the central MQTT-Broker of MQ-MS use the following settings:
MQTT_BROKER = "wzl-mbroker01.wzl.rwth-aachen.de"
MQTT_PORT = 1883
MQTT_VHOST = "metrology"
### Ask for settings and individual credentials ###
topic = "#" # set topic to subscribe according to MQTT syntax!
qos = 0 # set QoS according to MQTT specifications!
lasertracker_counter = 0
border = [6e5, 12e5, 18e5, 24e5, 30e5, 36e5, 42e5, 48e5, 54e5, 60e5]
def print_mqtt_message(topic, message):
global lasertracker_counter
line = "{}\r\n### {} ###\r\n{}\r\n".format(datetime.datetime.utcnow().isoformat() + "Z", topic,
message.decode("utf-8"))
with open('./out.txt', 'a') as outfile:
if 'radian-e7ccdff2-764d-42b2-91af-4dd52f9187e6' in topic:
if lasertracker_counter < border[0] or border[1] < lasertracker_counter < border[2] \
or border[5] < lasertracker_counter < border[6]:
if lasertracker_counter % 10 == 0:
outfile.write(line)
elif border[0] < lasertracker_counter < border[1] or border[4] < lasertracker_counter < border[5] \
or lasertracker_counter > border[9]:
return
elif border[2] < lasertracker_counter < border[3] or border[6] < lasertracker_counter < border[7]:
outfile.write(line)
elif border[3] < lasertracker_counter < border[4] or border[7] < lasertracker_counter < border[8]:
if lasertracker_counter % 100 == 0:
outfile.write(line)
lasertracker_counter += 1
else:
outfile.write(line)
if __name__ == "__main__":
console_log = logging.StreamHandler()
console_log.setFormatter(MQTT.formatter)
MQTT.logger.addHandler(console_log)
MQTT.logger.setLevel(logging.INFO)
client = MQTT.MQTTSubscriber(MQTT_USER)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":" + MQTT_USER, MQTT_PASSWORD)
client.set_callback("PRINT", print_mqtt_message)
client.subscribe(topic, qos)
loop = asyncio.get_event_loop()
while True:
try:
time.sleep(0.001)
except KeyboardInterrupt:
break
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment