diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index f833ab3fed099a3fa0374143abcc308ed705e223..0000000000000000000000000000000000000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,54 +0,0 @@ -variables: - ErrorActionPreference: STOP - -stages: - - environment - - deploy - - restart - - notify - -Update Environment: - tags: - - wzlmq-apps - stage: environment - only: - - master - script: - - '& cmd /k "C:\Anaconda3\Scripts\activate.bat E:\environments\$env:UUID & C:\Anaconda3\Scripts\conda.exe env update --name=E:\environments\$env:UUID --file .\environment\environment.yml --prune"' - - - -Copy To Container: - tags: - - wzlmq-apps - stage: deploy - only: - - master - script: - - taskkill /FI "WINDOWTITLE eq $env:UUID" - - Remove-Item -Recurse -Force -Path E:\apps\$env:UUID\* - - Copy-Item * -Destination E:\apps\$env:UUID\ -Recurse - # - Copy-Item $env:SECRETS\$env:UUID\config.json -Destination E:\apps\$env:UUID\ - - fsutil hardlink create E:\apps\$env:UUID\config.json $env:SECRETS\$env:UUID\config.json - - -Restart: - only: - - master - tags: - - wzlmq-apps - stage: restart - script: - - taskkill /FI "WINDOWTITLE eq $env:UUID" - - start E:\autostart\$env:UUID.bat - -Notify Rocket.Chat: - only: - - master - tags: - - wzlmq-apps - stage: notify - script: - - "Invoke-WebRequest -URI \"http://localhost:9002/?message=A new version of **MQTT%20Influx%20Bridge** is online!&happy=True\"" - - diff --git a/README.md b/README.md index 13ba11a1cc0cca1379f9c3cafc83543506f4518a..30a55d74c0add725c14b37df61226ef0cf17807a 100644 --- a/README.md +++ b/README.md @@ -1,43 +1,3 @@ -# MQTT +# WZL-MQTT -MQTT Client & Translational layer for InfluxDB - -# Usage -Create your own config.json from the sample file and ADD IT TO YOUR .gitignore to not upload your secrets! - - -# HowTo MQTT2InfluxDB - Workflow - -Preliminaries: - -- Have MQTT publish credentials @device (Acquire @mnt only) -- Ideally for debugging purposes have personal MQTT subscriber credentials (Acquire @mnt only) - - Good tool for debugging: MQTT-Spy -- Have device publishing stuff in mnt/bdn-standardized format Caas EDP (Questions? Sample in "performace_test" in this repo - More Questions? bdn/sdr/mnt) with configurable topic (root topic must be MQTT credential username) under VHost "metrology" -- Use topics according to device model (Again performance_test sample) -- Have Metrology Keycloak Account (bdn/mnt) - - For external DB-Access - - -Server side config: -- Create Database in InfluxDB instance, if not existing (bnd/mnt) - - `influx.exe -port "*port*"` - - `CREATE DATABASE *name*` -- For external DB-Access: - - Add DB read rights to user proxy (bnd/mnt) (did not find posiblity to grant wildcard read to proxy yet?) - - `GRANT READ ON *name* TO *proxy*` -- Add topics to write to database in form "*database*":[*topic-list*] to MQTT2InfluxDB config in E:/secrets/???/config.json (see existing topics there) (bdn/mnt) -- Copy config to MQTT2Influxdb directory -- Restart Translator - -Grafana Dashboard: -- Add datasource to Grafana (sdr, mnt) -- Create & Configure Dashboard (sdr, bdn, mnt) - -External DB Access: -- Have Keycloak access -- Have Keycloak rights for external DB-Access (bdn, mnt) -- Python: - - Use KeykloakInfluxDBClient (https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/digital-mars/keycloak-proxy) -- JS/Browser: - - Sample: https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/digital-mars/non-static/public/bzt \ No newline at end of file +**TODO**: document \ No newline at end of file diff --git a/_logger.py b/_logger.py deleted file mode 100644 index 024232aabb3fab6697471f76574c929c99da273a..0000000000000000000000000000000000000000 --- a/_logger.py +++ /dev/null @@ -1,35 +0,0 @@ -import datetime -import logging -import logging.config - -import sys -sys.argv[0] - -class Logger(object): - def __init__(self, path, level=logging.WARNING): - self.__path = path - self.root_logger = logging.getLogger(None) - self.root_logger.setLevel(level) - - if len(sys.argv[0].split("\\")) > 1: - filename = sys.argv[0].split("\\")[-1] - else: - filename = sys.argv[0].split("/")[-1] - self.file_handler = logging.handlers.RotatingFileHandler(self.__path + "{}.txt".format(filename), maxBytes=20*1024**2, backupCount=3) - self.file_handler.doRollover() - self.file_handler.setLevel(level) - self.console_handler = logging.StreamHandler() - self.console_handler.setLevel(level) - self.root_formatter = logging.Formatter('### %(levelname)-8s %(asctime)s %(name)-40s ###\n%(message)s\n') - self.file_handler.setFormatter(self.root_formatter) - self.console_handler.setFormatter(self.root_formatter) - self.root_logger.addHandler(self.file_handler) - self.root_logger.addHandler(self.console_handler) - - def get(self, name=None): - return logging.getLogger(name) - - def setLoggerLevel(self, level): - self.root_logger.setLevel(level) - self.console_handler.setLevel(level) - self.file_handler.setLevel(level) \ No newline at end of file diff --git a/config_sample.json b/config_sample.json deleted file mode 100644 index aaa0c07e46e8b3c6b1e3bb9dc05cf513133813f8..0000000000000000000000000000000000000000 --- a/config_sample.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "logging_level": 30, - - "DB_URL": "localhost", - "DB_PORT": 8086, - - "DB_USER": null, - "DB_PASSWORD": null, - - "MQTT_DB_TOPIC": { - "MachineToolTrace": - ["H2000-af974dbb-ed15-45b1-89d6-44c5372b1530"], - "DataLake": - ["senselab-6f766d91-1302-485a-b507-b9362dddd41d"], - }, - - "MQTT_USER": "???", - "MQTT_PASSWORD": "???", - - "MQTT_BROKER": "wzl-mbroker01.wzl.rwth-aachen.de", - "MQTT_PORT": 1883, - "MQTT_VHOST": "metrology", - - "NUM_WORKERS": 8, - "MAX_BLOCK_LENGTH": 50 -} \ No newline at end of file diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000000000000000000000000000000000000..951ad4c2035c87738f048efc73cd5f07ac6f7a1c --- /dev/null +++ b/environment.yml @@ -0,0 +1,20 @@ +name: mqtt +channels: + - defaults +dependencies: + - ca-certificates=2020.1.1=0 + - certifi=2020.4.5.2=py37_0 + - openssl=1.1.1g=he774522_0 + - pip=20.1.1=py37_1 + - python=3.7.7=h81c818b_4 + - setuptools=47.3.0=py37_0 + - sqlite=3.31.1=h2a8f88b_1 + - vc=14.1=h0510ff6_4 + - vs2015_runtime=14.16.27012=hf0eaf9b_2 + - wheel=0.34.2=py37_0 + - wincertstore=0.2=py37_0 + - zlib=1.2.11=h62dcd97_4 + - pip: + - paho-mqtt==1.5.0 + - wzl-utilities==1.0.0 +prefix: C:\Anaconda3\envs\mqtt diff --git a/influx_translator.bat b/influx_translator.bat deleted file mode 100644 index a087877944785c4d9f80195d4503304e6bfbd817..0000000000000000000000000000000000000000 --- a/influx_translator.bat +++ /dev/null @@ -1,2 +0,0 @@ -call activate mqtt -python -i .\influx_translator.py \ No newline at end of file diff --git a/influx_translator.py b/influx_translator.py deleted file mode 100644 index 5f1fb69d6688c06da54f5a25e117026acbf03f7e..0000000000000000000000000000000000000000 --- a/influx_translator.py +++ /dev/null @@ -1,287 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Created on Wed Nov 6 09:26:39 2019 - -@author: sdr -""" -import json -import time - -import threading -import queue - -import paho.mqtt.client as mqtt -import influxdb - -# %% CONFIG - -with open("config.json") as json_file: - CONFIG = json.load(json_file) - -from _logger import Logger - -root_logger = Logger("./logs/", level=CONFIG["logging_level"]) -logger = root_logger.get(__name__) - - -# %% InfluxDB Write Thread - - -class InfluxWorkerThread(threading.Thread): - def __init__(self, influx_client, dbs, q, identifier): - threading.Thread.__init__(self) - self.queue = q - self.influx_client = influx_client - self.identifier = identifier - self.dbs = dbs - - def run(self): - while True: - if not self.queue.empty(): - points = {x: [] for x in self.dbs} - try: - for i in range(CONFIG["MAX_BLOCK_LENGTH"]): - point = self.queue.get(timeout=0.01) - points[point[0]].append(point[1]) - except queue.Empty: - logger.debug("Queue Empty") - pass - except Exception as e: - logger.error(e) - for db in points: - if len(points[db]) > 0: - try: - logger.info( - f"{self.identifier} sent queue with length: {len(points[db])} to {db}" - ) - logger.debug(f"Writing\n\t{points[db]}\n\tto DB: {db}") - self.influx_client.write_points(points[db], database=db) - except Exception as e: - logger.error(e) - else: - time.sleep(0.1) - - -# %% MQTT2InfluxDB - - -class MQTT2InfluxDB: - """ - Guess what... - - Steals a lot from mnt - ruuviiinflux (THANKS), but more general data input - """ - - def __init__( - self, topic_list_db, host, port, username, password, influx_client, num_workers - ): - """ - Does, whatever init does. - - Args: - topic_list (TYPE): DESCRIPTION. - host (TYPE): DESCRIPTION. - port (TYPE): DESCRIPTION. - username (TYPE): DESCRIPTION. - password (TYPE): DESCRIPTION. - influx_client (TYPE): DESCRIPTION. - - Returns: - None. - - """ - self.client = mqtt.Client() - self.client.username_pw_set(username, password) - self.client.on_connect = self.on_connect - self.client.on_message = self.on_message - - self.client.connect(host=host, port=port, keepalive=60) - - self.topic_list_db = topic_list_db - print(topic_list_db) - - self.queue = queue.Queue(100000) - - self.influx_client = influx_client - logger.info(f"All DBs: {set([self.topic_list_db[i] for i in self.topic_list_db])}") - - self.threads = [ - InfluxWorkerThread( - influx_client=self.influx_client, - dbs=set([self.topic_list_db[i] for i in self.topic_list_db]), - q=self.queue, - identifier=i, - ) - for i in range(num_workers) - ] - for thread in self.threads: - thread.start() - - self.client.loop_start() - - def on_connect(self, client, userdata, flags, rc): - """ - Connect to user defined topics in topic_list. - - Callback for when the client receives a CONNACK response from the server. - Args: - client (TYPE): paho.mqtt client object. - userdata (TYPE): DESCRIPTION. - flags (TYPE): DESCRIPTION. - rc (TYPE): DESCRIPTION. - - Returns: - None. - - """ - print("Connected with result code " + str(rc)) - - # Subscribing in on_connect() means that if we lose the connection and - # reconnect then subscriptions will be renewed. - for topic in self.topic_list_db: - # print(topic) - client.subscribe(topic + "/#", 1) - - - def on_message(self, client, userdata, msg): - """ - Process message received from the server. - - Args: - client (TYPE): default. - userdata (TYPE): default. - msg (TYPE): MY PRECIOUS DATA. - - Returns: - None. - - """ - starttime = time.perf_counter() - try: - splits = msg.payload.decode("utf-8").split("}{") - for split in splits: - split = str(split) - logger.debug("### TOPIC ### \n" + str(msg.topic)) - logger.debug("### CONTENT ### \n" + split) - if len(splits) > 1: - if split[0] != "{": - split = "".join(["{", split]) - if split[-1] != "}": - split = "".join([split, "}"]) - payload_dict = json.loads(split) # .decode("utf-8")) - # if "nonce" in payload_dict: - # payload_dict["nonce"] = json.loads(payload_dict["nonce"]) - logger.debug("### CONTENT DICT ### \n" + str(payload_dict)) - self.to_database(topic=msg.topic, payload=payload_dict) - except Exception as e: - logger.error(f'{e} - {msg.topic}: {msg.payload.decode("utf-8")}') - # logger.error(msg.topic) - # logger.error(msg.payload.decode("utf-8")) - logger.info(str(1000 * (time.perf_counter() - starttime)) + " ms") - - def to_database(self, topic, payload): - """ - Write stuff received via MQTT to DB. - - Args: - topic (TYPE): topic string in lowercase. - payload (TYPE): payload as dict. - - Returns: - None. - - """ - # Choose time series based on variable (last element in uuid) - topic_list = topic.split("/") - logger.debug("### TOPCI LIST ###\n" + str(topic_list)) - measurement = topic_list[-1][4:] - tags = payload["nonce"] if "nonce" in payload and payload["nonce"] is not None else {} - logger.debug("### TAGS ###\n"+str(tags)) - if type(payload["value"]) == list: - if len(payload["value"]) == 1: - fields = { - measurement.lower(): float(payload["value"][0]) - if type(payload["value"][0]) == int - else payload["value"][0] - } - tags["uuid"] = "/".join(topic_list) - self.queue.put( - [ - self.topic_list_db[topic_list[0]], - { - "measurement": measurement.lower(), - "tags": tags, - "fields": fields, - "time": payload["timestamp"], - }, - ] - ) - - elif len(payload["value"]) > 1: - base_uuid = "/".join(topic_list) - fields = { - measurement.lower() - + "_" - + str(num): (float(value) if type(value) == int else value) - for num, value in enumerate(payload["value"]) - } # (float(value) if type(value)==int else value) - self.queue.put( - [ - self.topic_list_db[topic_list[0]], - { - "measurement": measurement.lower(), - "tags": {"uuid": base_uuid, **tags}, - "fields": fields, - "time": payload["timestamp"], - }, - ] - ) - else: - raise TypeError("Value should be list of length > 0") - else: - raise TypeError("Value should be list") - - -# %% MAIN - -if __name__ == "__main__": - - INFLUX_CLIENT = influxdb.InfluxDBClient( - host=CONFIG["DB_URL"], - port=CONFIG["DB_PORT"], - username=CONFIG["DB_USER"], - password=CONFIG["DB_PASSWORD"], - retries=0, - ) - databases = INFLUX_CLIENT.get_list_database() - for db in CONFIG["MQTT_DB_TOPIC"]: - sensor_db_created = False - for item in databases: - if item["name"] == db: - sensor_db_created = True - if not sensor_db_created: - INFLUX_CLIENT.create_database(db) - print("Database not found. Created new instance.") - else: - print("Database found. Ready for use.") - - # For config storage [db: topics] is much more sensible, - # but for programming the other way around is more useful - # SAME TOPIC IN MULTIPLE DBs IMPOSSIBLE - Double storage also makes no sense - - topics_dbs = {} - for db in CONFIG["MQTT_DB_TOPIC"]: - for topic in CONFIG["MQTT_DB_TOPIC"][db]: - if topic in topics_dbs: - raise ValueError("TOPIC CANNOT BE WRITTEN TO MULTIPLE DBs") - topics_dbs[topic] = db - - TRANSLATOR = MQTT2InfluxDB( - topic_list_db=topics_dbs, - host=CONFIG["MQTT_BROKER"], - port=CONFIG["MQTT_PORT"], - username=CONFIG["MQTT_VHOST"] + ":" + CONFIG["MQTT_USER"], - password=CONFIG["MQTT_PASSWORD"], - influx_client=INFLUX_CLIENT, - num_workers=CONFIG["NUM_WORKERS"], - ) diff --git a/__init__.py b/sample/__init__.py similarity index 100% rename from __init__.py rename to sample/__init__.py diff --git a/publish.py b/sample/publish.py similarity index 72% rename from publish.py rename to sample/publish.py index 0c49e30cccea228868de73aa10b50fbe19d9066f..e62fff87dcb5aea5139d1c1c434ebe7bd7d55d81 100644 --- a/publish.py +++ b/sample/publish.py @@ -1,17 +1,15 @@ -import wzl.mqtt -import time -import logging - +import datetime # For sample data import json -import numpy +import logging +import random +import time import uuid -import datetime ### Ask for settings and individual credentials ### -MQTT_USER = -MQTT_PASSWORD = +MQTT_USER = "" +MQTT_PASSWORD = "" MQTT_BROKER = "127.0.0.1" MQTT_PORT = 1883 @@ -33,12 +31,12 @@ if __name__=="__main__": MQTT.logger.addHandler(console_log) MQTT.logger.setLevel(logging.INFO) - client = MQTT.MQTTPublisher(MQTT_USER) + client = MQTTPublisher(MQTT_USER) client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD) while True: try: - message = json.dumps({"value" : numpy.random.uniform(0, 5,3).tolist(), "timestamp" : datetime.datetime.utcnow().isoformat() + "Z", "covariance": [[2,0,0], [0,2,0], [0,0,0]], "nonce" : str(uuid.uuid4()), "hash" : None, "unit" : "MTR"}) + message = json.dumps({"value" : random.random.uniform(0, 5,3).tolist(), "timestamp" : datetime.datetime.utcnow().isoformat() + "Z", "covariance": [[2,0,0], [0,2,0], [0,0,0]], "nonce" : str(uuid.uuid4()), "hash" : None, "unit" : "MTR"}) client.publish(MQTT_USER + "/channel-001",message.encode("utf-8"), 0) time.sleep(1) except KeyboardInterrupt: diff --git a/receive.py b/sample/receive.py similarity index 100% rename from receive.py rename to sample/receive.py diff --git a/setup.py b/setup.py index ce0e0eefb2cd66cde3069e53d22fa30f93f358cb..a369a8c55a7bf5cd9b7520ecadc016f6de3e8e84 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,13 @@ from setuptools import setup, find_packages -setup(name='wzl-mqtt', - version='1.2.0', +setup(name='WZL-MQTT', + version='1.3.0', url='', author='Benjamin Montavon, Matthias Bodenbenner', author_email='m.bodenbenner@wzl.rwth-aachen.de', description='Small library containing an MQTT publisher and receiver.', - package_dir={'wzl': 'wzl'}, - packages=['wzl.mqtt'], + package_dir={'wzl': 'src'}, + packages=['wzl.utilities,', 'paho-mqtt'], # long_description=open('./README.md').read(), install_requires=['paho-mqtt'], zip_safe=False) diff --git a/wzl/__init__.py b/src/__init__.py similarity index 100% rename from wzl/__init__.py rename to src/__init__.py diff --git a/wzl/mqtt/__init__.py b/src/mqtt/__init__.py similarity index 100% rename from wzl/mqtt/__init__.py rename to src/mqtt/__init__.py diff --git a/wzl/mqtt/client.py b/src/mqtt/client.py similarity index 96% rename from wzl/mqtt/client.py rename to src/mqtt/client.py index 71327e1b028ba4af238f6afdf506eed44dc0f7c3..0b7dcd1f7fe9fd67a29a8585422ea152a2223f77 100644 --- a/wzl/mqtt/client.py +++ b/src/mqtt/client.py @@ -1,24 +1,15 @@ import functools -import logging - -logger = logging.getLogger('MQTT') -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - -import sys -import pathlib +import inspect +import warnings -__module_path__ = pathlib.Path(__file__).resolve().parent -if str(__module_path__) not in sys.path: - sys.path.append(str(__module_path__)) +import paho.mqtt.client as mqtt +from wzl.utilities import root_logger from .exceptions import ConnectionError -from .exceptions import SubscriptionError from .exceptions import PublishError +from .exceptions import SubscriptionError -import warnings -import inspect - -import paho.mqtt.client as mqtt +logger = root_logger.get('MQTTClient') class MQTTClient: diff --git a/wzl/mqtt/exceptions.py b/src/mqtt/exceptions.py similarity index 100% rename from wzl/mqtt/exceptions.py rename to src/mqtt/exceptions.py diff --git a/3d_test.py b/test/3d_test.py similarity index 100% rename from 3d_test.py rename to test/3d_test.py diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/measure_latency.pyw b/test/measure_latency.pyw similarity index 100% rename from measure_latency.pyw rename to test/measure_latency.pyw diff --git a/performance_test.py b/test/performance_test.py similarity index 100% rename from performance_test.py rename to test/performance_test.py diff --git a/receive_create_log.py b/test/receive_create_log.py similarity index 97% rename from receive_create_log.py rename to test/receive_create_log.py index 21858ac7347db53885479fcceb5c98d837caceab..93dbe1aa6c69cec6484ec49cc7c4ad90e7fdfd61 100644 --- a/receive_create_log.py +++ b/test/receive_create_log.py @@ -1,4 +1,4 @@ -from wzl import MQTT +from src import MQTT import time import logging import asyncio @@ -6,8 +6,8 @@ import datetime ### Ask for settings and individual credentials ### -MQTT_USER = -MQTT_PASSWORD = +MQTT_USER = "" +MQTT_PASSWORD = "" # MQTT_BROKER = "127.0.0.1" # MQTT_PORT = 1883