Select Git revision
performance_test.py
-
Matthias Stefan Bodenbenner authoredMatthias Stefan Bodenbenner authored
performance_test.py 828 B
import src.mqtt as mqtt
import json
from datetime import datetime
import random
import time
MQTT_USER = "test"
MQTT_PASSWORD = "test"
MQTT_BROKER = "localhost"#"134.130.225.37"#
MQTT_PORT = 1883
MQTT_VHOST = "/"
client = mqtt.client.MQTTPublisher(MQTT_USER)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_VHOST + ":"+ MQTT_USER, MQTT_PASSWORD)
test=0
i=0
while True:
i+=1
test += random.random()-0.5
client.publish(
topic="quatsch"+"/OBJ-Environment/OBJ-0/VAR-TEST",
message=json.dumps({
"value": [float(test)],
"timestamp": datetime.utcnow().isoformat()+"Z",
"covariance": None,
"nonce": {},
"hash": None,
"unit": "P1"
}),
qos=0,
)
print(i)
time.sleep(1)