# -*- coding: utf-8 -*-
"""
Created on Wed Nov 6 09:26:39 2019
@author: sdr
"""
import json
import time
import threading
import queue
import paho.mqtt.client as mqtt
import influxdb
# %% CONFIG
with open("config.json") as json_file:
CONFIG = json.load(json_file)
from _logger import Logger
root_logger = Logger("./logs/", level=CONFIG["logging_level"])
logger = root_logger.get(__name__)
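
# The config.json keys below are exactly the ones this script reads; the following is
# only an illustrative sketch (all values, hosts, and credentials are placeholders,
# not taken from any real deployment):
#
#   {
#       "logging_level": "INFO",
#       "MAX_BLOCK_LENGTH": 1000,
#       "NUM_WORKERS": 2,
#       "DB_URL": "localhost",
#       "DB_PORT": 8086,
#       "DB_USER": "influx_user",
#       "DB_PASSWORD": "influx_password",
#       "MQTT_BROKER": "localhost",
#       "MQTT_PORT": 1883,
#       "MQTT_VHOST": "vhost",
#       "MQTT_USER": "mqtt_user",
#       "MQTT_PASSWORD": "mqtt_password",
#       "MQTT_DB_TOPIC": {"sensor_db": ["sensors", "machines"]}
#   }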
# %% InfluxDB Write Thread
class InfluxWorkerThread(threading.Thread):
    """Worker thread that drains the shared queue and writes batches of points to InfluxDB."""

    def __init__(self, influx_client, dbs, q, identifier):
threading.Thread.__init__(self)
self.queue = q
self.influx_client = influx_client
self.identifier = identifier
self.dbs = dbs
    def run(self):
        while True:
            if not self.queue.empty():
                points = {x: [] for x in self.dbs}
                try:
                    # Drain up to MAX_BLOCK_LENGTH items from the queue into per-DB batches.
                    for i in range(CONFIG["MAX_BLOCK_LENGTH"]):
                        point = self.queue.get(timeout=0.01)
                        points[point[0]].append(point[1])
                except queue.Empty:
                    logger.debug("Queue empty")
                except Exception as e:
                    logger.error(e)
for db in points:
if len(points[db]) > 0:
try:
logger.info(
f"{self.identifier} sent queue with length: {len(points[db])} to {db}"
)
logger.debug(f"Writing\n\t{points[db]}\n\tto DB: {db}")
self.influx_client.write_points(points[db], database=db)
except Exception as e:
logger.error(e)
else:
time.sleep(0.1)
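
# Queue items are [database_name, point_dict] pairs as produced by
# MQTT2InfluxDB.to_database() below. A sketch of what one such item might look like
# (topic, measurement, and values are purely illustrative):
#
#   [
#       "sensor_db",
#       {
#           "measurement": "temperature",
#           "tags": {"uuid": "sensors/room1/0001temperature"},
#           "fields": {"temperature": 21.5},
#           "time": "<timestamp as delivered in the payload>",
#       },
#   ]
#
# Each worker drains up to MAX_BLOCK_LENGTH of these per pass and writes the point
# dicts batched per database via influx_client.write_points(points, database=db).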
# %% MQTT2InfluxDB
class MQTT2InfluxDB:
"""
Guess what...
Steals a lot from mnt - ruuviiinflux (THANKS), but more general data input
"""
def __init__(
self, topic_list_db, host, port, username, password, influx_client, num_workers
):
"""
Does, whatever init does.
Args:
topic_list (TYPE): DESCRIPTION.
host (TYPE): DESCRIPTION.
port (TYPE): DESCRIPTION.
username (TYPE): DESCRIPTION.
password (TYPE): DESCRIPTION.
influx_client (TYPE): DESCRIPTION.
Returns:
None.
"""
self.client = mqtt.Client()
self.client.username_pw_set(username, password)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(host=host, port=port, keepalive=60)
self.topic_list_db = topic_list_db
        logger.debug(f"Topic -> DB mapping: {topic_list_db}")
self.queue = queue.Queue(100000)
self.influx_client = influx_client
logger.info(f"All DBs: {set([self.topic_list_db[i] for i in self.topic_list_db])}")
self.threads = [
InfluxWorkerThread(
influx_client=self.influx_client,
                dbs=set(self.topic_list_db.values()),
q=self.queue,
identifier=i,
)
for i in range(num_workers)
]
for thread in self.threads:
thread.start()
self.client.loop_start()
def on_connect(self, client, userdata, flags, rc):
"""
        Subscribe to the user-defined topics in topic_list_db.

        Callback for when the client receives a CONNACK response from the server.

        Args:
            client: paho.mqtt client object.
            userdata: private user data as set in Client(); unused here.
            flags: response flags sent by the broker.
            rc (int): connection result code (0 means success).

        Returns:
            None.
        """
print("Connected with result code " + str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
for topic in self.topic_list_db:
# print(topic)
client.subscribe(topic + "/#", 1)
def on_message(self, client, userdata, msg):
"""
        Process a message received from the broker.

        Args:
            client: paho.mqtt client object.
            userdata: private user data; unused here.
            msg: received MQTT message carrying the JSON payload.

        Returns:
            None.
        """
starttime = time.perf_counter()
try:
            # The payload may contain several concatenated JSON objects ("{...}{...}");
            # split them and re-add the braces lost by the split below.
            splits = msg.payload.decode("utf-8").split("}{")
for split in splits:
split = str(split)
logger.debug("### TOPIC ### \n" + str(msg.topic))
logger.debug("### CONTENT ### \n" + split)
if len(splits) > 1:
if split[0] != "{":
split = "".join(["{", split])
if split[-1] != "}":
split = "".join([split, "}"])
                payload_dict = json.loads(split)
# if "nonce" in payload_dict:
# payload_dict["nonce"] = json.loads(payload_dict["nonce"])
logger.debug("### CONTENT DICT ### \n" + str(payload_dict))
self.to_database(topic=msg.topic, payload=payload_dict)
except Exception as e:
logger.error(f'{e} - {msg.topic}: {msg.payload.decode("utf-8")}')
# logger.error(msg.topic)
# logger.error(msg.payload.decode("utf-8"))
        logger.info(f"on_message took {1000 * (time.perf_counter() - starttime):.2f} ms")
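
    # to_database() below assumes payloads of roughly this shape: a JSON object with a
    # "value" list, a "timestamp", and an optional "nonce" dict of extra tags.
    # An illustrative example (all values made up):
    #
    #   {"value": [21.5], "timestamp": 1573031199, "nonce": {"room": "lab1"}}
    #
    # Several such objects may arrive concatenated ("{...}{...}"), which is why the
    # payload is split on "}{" above and the braces are re-added before json.loads().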
def to_database(self, topic, payload):
"""
        Write a payload received via MQTT to its configured InfluxDB database.

        Args:
            topic (str): MQTT topic string in lowercase.
            payload (dict): decoded JSON payload.
Returns:
None.
"""
# Choose time series based on variable (last element in uuid)
topic_list = topic.split("/")
logger.debug("### TOPCI LIST ###\n" + str(topic_list))
measurement = topic_list[-1][4:]
tags = payload["nonce"] if "nonce" in payload and payload["nonce"] is not None else {}
logger.debug("### TAGS ###\n"+str(tags))
if type(payload["value"]) == list:
if len(payload["value"]) == 1:
fields = {
measurement.lower(): float(payload["value"][0])
if type(payload["value"][0]) == int
else payload["value"][0]
}
tags["uuid"] = "/".join(topic_list)
self.queue.put(
[
self.topic_list_db[topic_list[0]],
{
"measurement": measurement.lower(),
"tags": tags,
"fields": fields,
"time": payload["timestamp"],
},
]
)
elif len(payload["value"]) > 1:
base_uuid = "/".join(topic_list)
fields = {
measurement.lower()
+ "_"
+ str(num): (float(value) if type(value) == int else value)
for num, value in enumerate(payload["value"])
                }
self.queue.put(
[
self.topic_list_db[topic_list[0]],
{
"measurement": measurement.lower(),
"tags": {"uuid": base_uuid, **tags},
"fields": fields,
"time": payload["timestamp"],
},
]
)
else:
raise TypeError("Value should be list of length > 0")
else:
raise TypeError("Value should be list")
# %% MAIN
if __name__ == "__main__":
INFLUX_CLIENT = influxdb.InfluxDBClient(
host=CONFIG["DB_URL"],
port=CONFIG["DB_PORT"],
username=CONFIG["DB_USER"],
password=CONFIG["DB_PASSWORD"],
retries=0,
)
databases = INFLUX_CLIENT.get_list_database()
    for db in CONFIG["MQTT_DB_TOPIC"]:
        if any(item["name"] == db for item in databases):
            print(f"Database '{db}' found. Ready for use.")
        else:
            INFLUX_CLIENT.create_database(db)
            print(f"Database '{db}' not found. Created new instance.")
    # In the config, [db: topics] is the more sensible layout for storage,
    # but at runtime the inverted mapping (topic -> db) is more useful.
    # The same topic cannot be written to multiple DBs - duplicate storage makes no sense.
topics_dbs = {}
for db in CONFIG["MQTT_DB_TOPIC"]:
for topic in CONFIG["MQTT_DB_TOPIC"][db]:
if topic in topics_dbs:
raise ValueError("TOPIC CANNOT BE WRITTEN TO MULTIPLE DBs")
topics_dbs[topic] = db
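    # For example (db and topic names purely illustrative), a config entry
    #   "MQTT_DB_TOPIC": {"sensor_db": ["sensors", "machines"]}
    # is inverted here into
    #   topics_dbs == {"sensors": "sensor_db", "machines": "sensor_db"}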
TRANSLATOR = MQTT2InfluxDB(
topic_list_db=topics_dbs,
host=CONFIG["MQTT_BROKER"],
port=CONFIG["MQTT_PORT"],
        username=CONFIG["MQTT_VHOST"] + ":" + CONFIG["MQTT_USER"],  # "vhost:user" form, as used e.g. by RabbitMQ's MQTT plugin
password=CONFIG["MQTT_PASSWORD"],
influx_client=INFLUX_CLIENT,
num_workers=CONFIG["NUM_WORKERS"],
)