std_of_time_series
The snippet can be accessed without any authentication.
Authored by
Khoa Nguyen
Edited
std_of_time_series.py 1.31 KiB
import statistics
from influxdb_client_3 import SYNCHRONOUS, Point
from influxdb_client_3.write_client.client import influxdb_client
def calculate_std(time_series):
    """ Function to calculate the sample standard deviation of a time series
    :param time_series: iterable of numeric values (list, tuple, generator, ...)
    :return: sample standard deviation, or None if fewer than two values are given
    """
    # Materialize once so that iterables without len() (e.g. generators)
    # are supported and the data is not consumed before stdev() sees it.
    values = list(time_series)
    # statistics.stdev raises StatisticsError for fewer than two data points;
    # report "not computable" as None instead.
    if len(values) < 2:
        return None
    return statistics.stdev(values)
def std_deviation_to_influxdb(time_serie_data, url, token, org, bucket_name):
    """ Function to calculate standard deviation from time series and store it in influxdb
    :param time_serie_data: given time series data
    :param url: url of influxdb server
    :param token: token of influxdb client
    :param org: name of influxdb organization
    :param bucket_name: bucket where data will be stored
    :return: None
    """
    measurement_name = 'std_of_time_series'
    field_name = 'std_deviation'

    # calculate std first: if it cannot be computed (fewer than two samples)
    # there is nothing meaningful to store, so no connection is opened at all
    std_deviation = calculate_std(time_serie_data)
    if std_deviation is None:
        return

    # create data point
    std_deviation_point = (
        Point(measurement_name)
        .field(field_name, std_deviation)
    )

    write_client = influxdb_client.InfluxDBClient(
        url=url,
        token=token,
        org=org
    )
    try:
        # store data point to corresponding bucket
        write_api = write_client.write_api(write_options=SYNCHRONOUS)
        write_api.write(bucket=bucket_name,
                        org=org,
                        record=std_deviation_point)
    finally:
        # always release the underlying connection, even if the write fails
        write_client.close()
Please register or sign in to comment