Skip to content
Snippets Groups Projects
Select Git revision
  • 0a1bf3d37e385367f7ef55db3c04b6182e29d432
  • master default protected
  • b24_tutorial
  • 2.4.1
  • 1.3.0
  • 1.2.0
6 results

influx_translator.py

Blame
  • influx_translator.py 9.21 KiB
    # -*- coding: utf-8 -*-
    """
    Created on Wed Nov  6 09:26:39 2019
    
    @author: sdr
    """
    import json
    import time
    
    import threading
    import queue
    
    import paho.mqtt.client as mqtt
    import influxdb
    
    # %% CONFIG
    
    # Load the runtime configuration once at import time.
    # JSON text is UTF-8 by specification, so the encoding is stated explicitly
    # instead of depending on the platform's default encoding.
    with open("config.json", encoding="utf-8") as json_file:
        CONFIG = json.load(json_file)

    # Imported only after the configuration was read (original statement order kept).
    from _logger import Logger

    # Project logger, constructed with "./logs/" and the level taken from the
    # configuration key "logging_level"; `logger` is the logger for this module.
    root_logger = Logger("./logs/", level=CONFIG["logging_level"])
    logger = root_logger.get(__name__)
    
    
    # %% InfluxDB Write Thread
    
    
    class InfluxWorkerThread(threading.Thread):
        """
        Worker thread that moves points from a shared queue into InfluxDB.

        Every queue item is indexed as ``item[0]`` (target database name) and
        ``item[1]`` (the point passed on to ``write_points``). Points are
        gathered into per-database batches and every non-empty batch is written
        with a single ``write_points`` call.

        Args:
            influx_client: Client object providing
                ``write_points(points, database=...)``.
            dbs: Iterable of the database names a batch may contain.
            q: Queue the points are taken from.
            identifier: Value used to tell the workers apart in log messages.
        """

        def __init__(self, influx_client, dbs, q, identifier):
            super().__init__()
            self.queue = q
            self.influx_client = influx_client
            self.identifier = identifier
            self.dbs = dbs

        def run(self):
            """Collect and write batches forever; this loop never returns."""
            while True:
                batch = self._collect_batch(CONFIG["MAX_BLOCK_LENGTH"])
                self._write_batch(batch)

        def _collect_batch(self, max_length):
            """
            Take up to ``max_length`` points from the queue, grouped by database.

            Blocks until at least one point is available, so an idle worker does
            not have to poll the queue. Further points are only waited for
            briefly (10 ms each); the batch is closed as soon as the queue runs
            dry. Any other error (for example a point addressed to a database
            that is not in ``self.dbs``) is logged and also closes the batch;
            what was collected up to then is still returned.

            Args:
                max_length: Upper bound for the number of points in the batch.
                    At least one point is always taken.

            Returns:
                dict: Database name -> list of points (possibly empty lists).
            """
            batch = {db: [] for db in self.dbs}
            try:
                item = self.queue.get()
                batch[item[0]].append(item[1])
                for _ in range(max_length - 1):
                    item = self.queue.get(timeout=0.01)
                    batch[item[0]].append(item[1])
            except queue.Empty:
                logger.debug("Queue Empty")
            except Exception as e:
                logger.error(e)
            return batch

        def _write_batch(self, batch):
            """
            Write every non-empty per-database list of ``batch`` to InfluxDB.

            A failing write is logged and does not stop the remaining databases
            from being written. The "sent" message is only logged after
            ``write_points`` returned, so it is never emitted for a failed write.

            Args:
                batch: Database name -> list of points, as returned by
                    ``_collect_batch``.
            """
            for db, db_points in batch.items():
                if not db_points:
                    continue
                try:
                    logger.debug(f"Writing\n\t{db_points}\n\tto DB: {db}")
                    self.influx_client.write_points(db_points, database=db)
                    logger.info(
                        f"{self.identifier} sent queue with length: {len(db_points)} to {db}"
                    )
                except Exception as e:
                    logger.error(e)
    
    
    # %% MQTT2InfluxDB
    
    
    class MQTT2InfluxDB:
        """
        Forward MQTT messages to InfluxDB.

        Subscribes to the configured topics, decodes every received message as
        JSON, turns each JSON document into one InfluxDB point and puts that
        point on a queue which is emptied by ``InfluxWorkerThread`` workers.

        Steals a lot from mnt - ruuviiinflux (THANKS), but more general data input
        """

        def __init__(
            self, topic_list_db, host, port, username, password, influx_client, num_workers
        ):
            """
            Connect to the broker, start the worker threads and the MQTT loop.

            Args:
                topic_list_db (dict): Maps a topic to the name of the database
                    its data is written to. Each key is subscribed to with a
                    ``/#`` suffix.
                host: MQTT broker host, passed to ``client.connect``.
                port: MQTT broker port, passed to ``client.connect``.
                username: MQTT user name.
                password: MQTT password.
                influx_client: Client object handed to every worker thread.
                num_workers (int): Number of worker threads to start.

            Returns:
                None.

            """
            self.client = mqtt.Client()
            self.client.username_pw_set(username, password)
            self.client.on_connect = self.on_connect
            self.client.on_message = self.on_message

            self.client.connect(host=host, port=port, keepalive=60)

            self.topic_list_db = topic_list_db
            print(topic_list_db)

            # Bounded queue between the MQTT callback and the worker threads.
            self.queue = queue.Queue(100000)

            self.influx_client = influx_client
            all_dbs = set(self.topic_list_db.values())
            logger.info(f"All DBs: {all_dbs}")

            self.threads = [
                InfluxWorkerThread(
                    influx_client=self.influx_client,
                    dbs=set(all_dbs),  # every worker gets its own set object
                    q=self.queue,
                    identifier=i,
                )
                for i in range(num_workers)
            ]
            for thread in self.threads:
                thread.start()

            # Started last, after all attributes used by the callbacks are set.
            self.client.loop_start()

        def on_connect(self, client, userdata, flags, rc):
            """
            Subscribe to every configured topic (including all sub-topics).

            Callback for when the client receives a CONNACK response from the server.

            Args:
                client: paho.mqtt client object.
                userdata: Unused.
                flags: Unused.
                rc: Connection result code; only printed.

            Returns:
                None.

            """
            print("Connected with result code " + str(rc))

            # Subscribing in on_connect() means that if we lose the connection and
            # reconnect then subscriptions will be renewed.
            for topic in self.topic_list_db:
                client.subscribe(topic + "/#", 1)

        def on_message(self, client, userdata, msg):
            """
            Decode a received message and queue one point per JSON document.

            The payload may hold several JSON documents written directly one
            after another (``{...}{...}``). They are decoded one by one, so
            documents that end in a nested object or contain ``}{`` inside a
            string are handled correctly. Documents decoded before a malformed
            part are still queued. Every error is logged together with topic
            and payload; no exception leaves this callback.

            Args:
                client: Unused.
                userdata: Unused.
                msg: Received message; ``msg.topic`` and ``msg.payload`` are read.

            Returns:
                None.

            """
            starttime = time.perf_counter()
            try:
                text = msg.payload.decode("utf-8")
                logger.debug("### TOPIC ### \n" + str(msg.topic))
                logger.debug("### CONTENT ### \n" + text)
                for payload_dict in self._iter_json_objects(text):
                    logger.debug("### CONTENT DICT ### \n" + str(payload_dict))
                    self.to_database(topic=msg.topic, payload=payload_dict)
            except Exception as e:
                # errors="replace": a payload that is not valid UTF-8 must not
                # raise a second time while the first error is being reported.
                logger.error(
                    f'{e} - {msg.topic}: {msg.payload.decode("utf-8", errors="replace")}'
                )
            logger.info(str(1000 * (time.perf_counter() - starttime)) + " ms")

        def to_database(self, topic, payload):
            """
            Convert one decoded message to a point and put it on the write queue.

            The target database is looked up in ``topic_list_db`` by the first
            level of ``topic``.

            Args:
                topic (str): Full MQTT topic of the message.
                payload (dict): Decoded message with the keys ``"value"``
                    (non-empty list), ``"timestamp"`` and optionally ``"nonce"``
                    (mapping of additional tags, or None).

            Returns:
                None.

            Raises:
                TypeError: If ``payload["value"]`` is not a non-empty list.
                KeyError: If a required key is missing or the first topic level
                    is not configured.

            """
            topic_list = topic.split("/")
            logger.debug("### TOPCI LIST ###\n" + str(topic_list))
            point = self._build_point(topic, payload)
            logger.debug("### TAGS ###\n" + str(point["tags"]))
            self.queue.put([self.topic_list_db[topic_list[0]], point])

        @staticmethod
        def _iter_json_objects(text):
            """Yield each JSON document in ``text``; raise ``json.JSONDecodeError`` if there is none or one is malformed."""
            decoder = json.JSONDecoder()
            end = len(text)
            pos = 0
            decoded_any = False
            while True:
                # raw_decode() does not skip leading whitespace itself.
                while pos < end and text[pos] in " \t\n\r":
                    pos += 1
                if decoded_any and pos >= end:
                    return
                obj, pos = decoder.raw_decode(text, pos)
                decoded_any = True
                yield obj

        @staticmethod
        def _as_field(value):
            """Return ints as floats and every other value unchanged."""
            # Exact type check on purpose: isinstance() would also match bool
            # and turn True/False into 1.0/0.0.
            return float(value) if type(value) is int else value

        @staticmethod
        def _build_point(topic, payload):
            """
            Build the InfluxDB point (measurement, tags, fields, time) for a message.

            The measurement is the last topic level without its first four
            characters, in lower case. A single value is stored in a field named
            like the measurement; several values are stored as
            ``<measurement>_<index>``. The tags are a copy of the nonce plus
            ``"uuid"`` set to the full topic; the topic always wins over a
            ``"uuid"`` entry of the nonce, and the payload is not modified.
            """
            measurement = topic.split("/")[-1][4:].lower()
            nonce = payload["nonce"] if "nonce" in payload and payload["nonce"] is not None else {}
            tags = {**nonce, "uuid": topic}

            values = payload["value"]
            if not isinstance(values, list):
                raise TypeError("Value should be list")
            if not values:
                raise TypeError("Value should be list of length > 0")

            if len(values) == 1:
                fields = {measurement: MQTT2InfluxDB._as_field(values[0])}
            else:
                fields = {
                    measurement + "_" + str(num): MQTT2InfluxDB._as_field(value)
                    for num, value in enumerate(values)
                }

            return {
                "measurement": measurement,
                "tags": tags,
                "fields": fields,
                "time": payload["timestamp"],
            }
    
    
    # %% MAIN
    
    if __name__ == "__main__":
        # Client for the InfluxDB server named in the configuration.
        INFLUX_CLIENT = influxdb.InfluxDBClient(
            host=CONFIG["DB_URL"],
            port=CONFIG["DB_PORT"],
            username=CONFIG["DB_USER"],
            password=CONFIG["DB_PASSWORD"],
            retries=0,
        )

        # Create every configured database that the server does not list yet.
        databases = INFLUX_CLIENT.get_list_database()
        for db in CONFIG["MQTT_DB_TOPIC"]:
            if any(item["name"] == db for item in databases):
                print("Database found. Ready for use.")
            else:
                INFLUX_CLIENT.create_database(db)
                print("Database not found. Created new instance.")

        # The configuration stores {db: [topics]}, which is the sensible layout
        # for a config file; the translator needs the inverse {topic: db}.
        # A topic can belong to one database only - storing the same data
        # twice would make no sense.
        topics_dbs = {}
        for db, topics in CONFIG["MQTT_DB_TOPIC"].items():
            for topic in topics:
                if topic in topics_dbs:
                    raise ValueError("TOPIC CANNOT BE WRITTEN TO MULTIPLE DBs")
                topics_dbs[topic] = db

        # The broker user name is built as "<vhost>:<user>".
        mqtt_username = ":".join([CONFIG["MQTT_VHOST"], CONFIG["MQTT_USER"]])

        TRANSLATOR = MQTT2InfluxDB(
            topic_list_db=topics_dbs,
            host=CONFIG["MQTT_BROKER"],
            port=CONFIG["MQTT_PORT"],
            username=mqtt_username,
            password=CONFIG["MQTT_PASSWORD"],
            influx_client=INFLUX_CLIENT,
            num_workers=CONFIG["NUM_WORKERS"],
        )