Skip to content
Snippets Groups Projects
Commit 05e0700c authored by Matthias Bodenbenner's avatar Matthias Bodenbenner
Browse files

removed influx-brigde related code

parent 3d325d90
Branches
Tags
No related merge requests found
variables:
ErrorActionPreference: STOP
stages:
- environment
- deploy
- restart
- notify
Update Environment:
tags:
- wzlmq-apps
stage: environment
only:
- master
script:
- '& cmd /k "C:\Anaconda3\Scripts\activate.bat E:\environments\$env:UUID & C:\Anaconda3\Scripts\conda.exe env update --name=E:\environments\$env:UUID --file .\environment\environment.yml --prune"'
Copy To Container:
tags:
- wzlmq-apps
stage: deploy
only:
- master
script:
- taskkill /FI "WINDOWTITLE eq $env:UUID"
- Remove-Item -Recurse -Force -Path E:\apps\$env:UUID\*
- Copy-Item * -Destination E:\apps\$env:UUID\ -Recurse
# - Copy-Item $env:SECRETS\$env:UUID\config.json -Destination E:\apps\$env:UUID\
- fsutil hardlink create E:\apps\$env:UUID\config.json $env:SECRETS\$env:UUID\config.json
Restart:
only:
- master
tags:
- wzlmq-apps
stage: restart
script:
- taskkill /FI "WINDOWTITLE eq $env:UUID"
- start E:\autostart\$env:UUID.bat
Notify Rocket.Chat:
only:
- master
tags:
- wzlmq-apps
stage: notify
script:
- "Invoke-WebRequest -URI \"http://localhost:9002/?message=A new version of **MQTT%20Influx%20Bridge** is online!&happy=True\""
# MQTT
# WZL-MQTT
MQTT Client & Translational layer for InfluxDB
# Usage
Create your own config.json from the sample file and ADD IT TO YOUR .gitignore to not upload your secrets!
# HowTo MQTT2InfluxDB - Workflow
Preliminaries:
- Have MQTT publish credentials @device (Acquire @mnt only)
- Ideally for debugging purposes have personal MQTT subscriber credentials (Acquire @mnt only)
- Good tool for debugging: MQTT-Spy
- Have device publishing stuff in mnt/bdn-standardized format Caas EDP (Questions? Sample in "performace_test" in this repo - More Questions? bdn/sdr/mnt) with configurable topic (root topic must be MQTT credential username) under VHost "metrology"
- Use topics according to device model (Again performance_test sample)
- Have Metrology Keycloak Account (bdn/mnt)
- For external DB-Access
Server side config:
- Create Database in InfluxDB instance, if not existing (bnd/mnt)
- `influx.exe -port "*port*"`
- `CREATE DATABASE *name*`
- For external DB-Access:
- Add DB read rights to user proxy (bnd/mnt) (did not find posiblity to grant wildcard read to proxy yet?)
- `GRANT READ ON *name* TO *proxy*`
- Add topics to write to database in form "*database*":[*topic-list*] to MQTT2InfluxDB config in E:/secrets/???/config.json (see existing topics there) (bdn/mnt)
- Copy config to MQTT2Influxdb directory
- Restart Translator
Grafana Dashboard:
- Add datasource to Grafana (sdr, mnt)
- Create & Configure Dashboard (sdr, bdn, mnt)
External DB Access:
- Have Keycloak access
- Have Keycloak rights for external DB-Access (bdn, mnt)
- Python:
- Use KeykloakInfluxDBClient (https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/digital-mars/keycloak-proxy)
- JS/Browser:
- Sample: https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/digital-mars/non-static/public/bzt
\ No newline at end of file
**TODO**: document
\ No newline at end of file
import datetime
import logging
import logging.config
import sys
sys.argv[0]
class Logger(object):
def __init__(self, path, level=logging.WARNING):
self.__path = path
self.root_logger = logging.getLogger(None)
self.root_logger.setLevel(level)
if len(sys.argv[0].split("\\")) > 1:
filename = sys.argv[0].split("\\")[-1]
else:
filename = sys.argv[0].split("/")[-1]
self.file_handler = logging.handlers.RotatingFileHandler(self.__path + "{}.txt".format(filename), maxBytes=20*1024**2, backupCount=3)
self.file_handler.doRollover()
self.file_handler.setLevel(level)
self.console_handler = logging.StreamHandler()
self.console_handler.setLevel(level)
self.root_formatter = logging.Formatter('### %(levelname)-8s %(asctime)s %(name)-40s ###\n%(message)s\n')
self.file_handler.setFormatter(self.root_formatter)
self.console_handler.setFormatter(self.root_formatter)
self.root_logger.addHandler(self.file_handler)
self.root_logger.addHandler(self.console_handler)
def get(self, name=None):
return logging.getLogger(name)
def setLoggerLevel(self, level):
self.root_logger.setLevel(level)
self.console_handler.setLevel(level)
self.file_handler.setLevel(level)
\ No newline at end of file
{
"logging_level": 30,
"DB_URL": "localhost",
"DB_PORT": 8086,
"DB_USER": null,
"DB_PASSWORD": null,
"MQTT_DB_TOPIC": {
"MachineToolTrace":
["H2000-af974dbb-ed15-45b1-89d6-44c5372b1530"],
"DataLake":
["senselab-6f766d91-1302-485a-b507-b9362dddd41d"],
},
"MQTT_USER": "???",
"MQTT_PASSWORD": "???",
"MQTT_BROKER": "wzl-mbroker01.wzl.rwth-aachen.de",
"MQTT_PORT": 1883,
"MQTT_VHOST": "metrology",
"NUM_WORKERS": 8,
"MAX_BLOCK_LENGTH": 50
}
\ No newline at end of file
name: mqtt
channels:
- defaults
dependencies:
- ca-certificates=2020.1.1=0
- certifi=2020.4.5.2=py37_0
- openssl=1.1.1g=he774522_0
- pip=20.1.1=py37_1
- python=3.7.7=h81c818b_4
- setuptools=47.3.0=py37_0
- sqlite=3.31.1=h2a8f88b_1
- vc=14.1=h0510ff6_4
- vs2015_runtime=14.16.27012=hf0eaf9b_2
- wheel=0.34.2=py37_0
- wincertstore=0.2=py37_0
- zlib=1.2.11=h62dcd97_4
- pip:
- paho-mqtt==1.5.0
- wzl-utilities==1.0.0
prefix: C:\Anaconda3\envs\mqtt
call activate mqtt
python -i .\influx_translator.py
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
Created on Wed Nov 6 09:26:39 2019
@author: sdr
"""
import json
import time
import threading
import queue
import paho.mqtt.client as mqtt
import influxdb
# %% CONFIG
with open("config.json") as json_file:
CONFIG = json.load(json_file)
from _logger import Logger
root_logger = Logger("./logs/", level=CONFIG["logging_level"])
logger = root_logger.get(__name__)
# %% InfluxDB Write Thread
class InfluxWorkerThread(threading.Thread):
def __init__(self, influx_client, dbs, q, identifier):
threading.Thread.__init__(self)
self.queue = q
self.influx_client = influx_client
self.identifier = identifier
self.dbs = dbs
def run(self):
while True:
if not self.queue.empty():
points = {x: [] for x in self.dbs}
try:
for i in range(CONFIG["MAX_BLOCK_LENGTH"]):
point = self.queue.get(timeout=0.01)
points[point[0]].append(point[1])
except queue.Empty:
logger.debug("Queue Empty")
pass
except Exception as e:
logger.error(e)
for db in points:
if len(points[db]) > 0:
try:
logger.info(
f"{self.identifier} sent queue with length: {len(points[db])} to {db}"
)
logger.debug(f"Writing\n\t{points[db]}\n\tto DB: {db}")
self.influx_client.write_points(points[db], database=db)
except Exception as e:
logger.error(e)
else:
time.sleep(0.1)
# %% MQTT2InfluxDB
class MQTT2InfluxDB:
"""
Guess what...
Steals a lot from mnt - ruuviiinflux (THANKS), but more general data input
"""
def __init__(
self, topic_list_db, host, port, username, password, influx_client, num_workers
):
"""
Does, whatever init does.
Args:
topic_list (TYPE): DESCRIPTION.
host (TYPE): DESCRIPTION.
port (TYPE): DESCRIPTION.
username (TYPE): DESCRIPTION.
password (TYPE): DESCRIPTION.
influx_client (TYPE): DESCRIPTION.
Returns:
None.
"""
self.client = mqtt.Client()
self.client.username_pw_set(username, password)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(host=host, port=port, keepalive=60)
self.topic_list_db = topic_list_db
print(topic_list_db)
self.queue = queue.Queue(100000)
self.influx_client = influx_client
logger.info(f"All DBs: {set([self.topic_list_db[i] for i in self.topic_list_db])}")
self.threads = [
InfluxWorkerThread(
influx_client=self.influx_client,
dbs=set([self.topic_list_db[i] for i in self.topic_list_db]),
q=self.queue,
identifier=i,
)
for i in range(num_workers)
]
for thread in self.threads:
thread.start()
self.client.loop_start()
def on_connect(self, client, userdata, flags, rc):
"""
Connect to user defined topics in topic_list.
Callback for when the client receives a CONNACK response from the server.
Args:
client (TYPE): paho.mqtt client object.
userdata (TYPE): DESCRIPTION.
flags (TYPE): DESCRIPTION.
rc (TYPE): DESCRIPTION.
Returns:
None.
"""
print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
for topic in self.topic_list_db:
# print(topic)
client.subscribe(topic + "/#", 1)
def on_message(self, client, userdata, msg):
"""
Process message received from the server.
Args:
client (TYPE): default.
userdata (TYPE): default.
msg (TYPE): MY PRECIOUS DATA.
Returns:
None.
"""
starttime = time.perf_counter()
try:
splits = msg.payload.decode("utf-8").split("}{")
for split in splits:
split = str(split)
logger.debug("### TOPIC ### \n" + str(msg.topic))
logger.debug("### CONTENT ### \n" + split)
if len(splits) > 1:
if split[0] != "{":
split = "".join(["{", split])
if split[-1] != "}":
split = "".join([split, "}"])
payload_dict = json.loads(split) # .decode("utf-8"))
# if "nonce" in payload_dict:
# payload_dict["nonce"] = json.loads(payload_dict["nonce"])
logger.debug("### CONTENT DICT ### \n" + str(payload_dict))
self.to_database(topic=msg.topic, payload=payload_dict)
except Exception as e:
logger.error(f'{e} - {msg.topic}: {msg.payload.decode("utf-8")}')
# logger.error(msg.topic)
# logger.error(msg.payload.decode("utf-8"))
logger.info(str(1000 * (time.perf_counter() - starttime)) + " ms")
def to_database(self, topic, payload):
"""
Write stuff received via MQTT to DB.
Args:
topic (TYPE): topic string in lowercase.
payload (TYPE): payload as dict.
Returns:
None.
"""
# Choose time series based on variable (last element in uuid)
topic_list = topic.split("/")
logger.debug("### TOPCI LIST ###\n" + str(topic_list))
measurement = topic_list[-1][4:]
tags = payload["nonce"] if "nonce" in payload and payload["nonce"] is not None else {}
logger.debug("### TAGS ###\n"+str(tags))
if type(payload["value"]) == list:
if len(payload["value"]) == 1:
fields = {
measurement.lower(): float(payload["value"][0])
if type(payload["value"][0]) == int
else payload["value"][0]
}
tags["uuid"] = "/".join(topic_list)
self.queue.put(
[
self.topic_list_db[topic_list[0]],
{
"measurement": measurement.lower(),
"tags": tags,
"fields": fields,
"time": payload["timestamp"],
},
]
)
elif len(payload["value"]) > 1:
base_uuid = "/".join(topic_list)
fields = {
measurement.lower()
+ "_"
+ str(num): (float(value) if type(value) == int else value)
for num, value in enumerate(payload["value"])
} # (float(value) if type(value)==int else value)
self.queue.put(
[
self.topic_list_db[topic_list[0]],
{
"measurement": measurement.lower(),
"tags": {"uuid": base_uuid, **tags},
"fields": fields,
"time": payload["timestamp"],
},
]
)
else:
raise TypeError("Value should be list of length > 0")
else:
raise TypeError("Value should be list")
# %% MAIN
if __name__ == "__main__":
INFLUX_CLIENT = influxdb.InfluxDBClient(
host=CONFIG["DB_URL"],
port=CONFIG["DB_PORT"],
username=CONFIG["DB_USER"],
password=CONFIG["DB_PASSWORD"],
retries=0,
)
databases = INFLUX_CLIENT.get_list_database()
for db in CONFIG["MQTT_DB_TOPIC"]:
sensor_db_created = False
for item in databases:
if item["name"] == db:
sensor_db_created = True
if not sensor_db_created:
INFLUX_CLIENT.create_database(db)
print("Database not found. Created new instance.")
else:
print("Database found. Ready for use.")
# For config storage [db: topics] is much more sensible,
# but for programming the other way around is more useful
# SAME TOPIC IN MULTIPLE DBs IMPOSSIBLE - Double storage also makes no sense
topics_dbs = {}
for db in CONFIG["MQTT_DB_TOPIC"]:
for topic in CONFIG["MQTT_DB_TOPIC"][db]:
if topic in topics_dbs:
raise ValueError("TOPIC CANNOT BE WRITTEN TO MULTIPLE DBs")
topics_dbs[topic] = db
TRANSLATOR = MQTT2InfluxDB(
topic_list_db=topics_dbs,
host=CONFIG["MQTT_BROKER"],
port=CONFIG["MQTT_PORT"],
username=CONFIG["MQTT_VHOST"] + ":" + CONFIG["MQTT_USER"],
password=CONFIG["MQTT_PASSWORD"],
influx_client=INFLUX_CLIENT,
num_workers=CONFIG["NUM_WORKERS"],
)
File moved
import wzl.mqtt
import time
import logging
import datetime
# For sample data
import json
import numpy
import logging
import random
import time
import uuid
import datetime
### Ask for settings and individual credentials ###
MQTT_USER =
MQTT_PASSWORD =
MQTT_USER = ""
MQTT_PASSWORD = ""
MQTT_BROKER = "127.0.0.1"
MQTT_PORT = 1883
......@@ -33,12 +31,12 @@ if __name__=="__main__":
MQTT.logger.addHandler(console_log)
MQTT.logger.setLevel(logging.INFO)
client = MQTT.MQTTPublisher(MQTT_USER)
client = MQTTPublisher(MQTT_USER)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD)
while True:
try:
message = json.dumps({"value" : numpy.random.uniform(0, 5,3).tolist(), "timestamp" : datetime.datetime.utcnow().isoformat() + "Z", "covariance": [[2,0,0], [0,2,0], [0,0,0]], "nonce" : str(uuid.uuid4()), "hash" : None, "unit" : "MTR"})
message = json.dumps({"value" : random.random.uniform(0, 5,3).tolist(), "timestamp" : datetime.datetime.utcnow().isoformat() + "Z", "covariance": [[2,0,0], [0,2,0], [0,0,0]], "nonce" : str(uuid.uuid4()), "hash" : None, "unit" : "MTR"})
client.publish(MQTT_USER + "/channel-001",message.encode("utf-8"), 0)
time.sleep(1)
except KeyboardInterrupt:
......
File moved
from setuptools import setup, find_packages
setup(name='wzl-mqtt',
version='1.2.0',
setup(name='WZL-MQTT',
version='1.3.0',
url='',
author='Benjamin Montavon, Matthias Bodenbenner',
author_email='m.bodenbenner@wzl.rwth-aachen.de',
description='Small library containing an MQTT publisher and receiver.',
package_dir={'wzl': 'wzl'},
packages=['wzl.mqtt'],
package_dir={'wzl': 'src'},
packages=['wzl.utilities,', 'paho-mqtt'],
# long_description=open('./README.md').read(),
install_requires=['paho-mqtt'],
zip_safe=False)
File moved
File moved
import functools
import logging
logger = logging.getLogger('MQTT')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import sys
import pathlib
import inspect
import warnings
__module_path__ = pathlib.Path(__file__).resolve().parent
if str(__module_path__) not in sys.path:
sys.path.append(str(__module_path__))
import paho.mqtt.client as mqtt
from wzl.utilities import root_logger
from .exceptions import ConnectionError
from .exceptions import SubscriptionError
from .exceptions import PublishError
from .exceptions import SubscriptionError
import warnings
import inspect
import paho.mqtt.client as mqtt
logger = root_logger.get('MQTTClient')
class MQTTClient:
......
File moved
File moved
File moved
File moved
from wzl import MQTT
from src import MQTT
import time
import logging
import asyncio
......@@ -6,8 +6,8 @@ import datetime
### Ask for settings and individual credentials ###
MQTT_USER =
MQTT_PASSWORD =
MQTT_USER = ""
MQTT_PASSWORD = ""
# MQTT_BROKER = "127.0.0.1"
# MQTT_PORT = 1883
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment