import statistics from influxdb_client_3 import SYNCHRONOUS, Point from influxdb_client_3.write_client.client import influxdb_client def calculate_std(time_series): if len(time_series) < 2: return None std_deviation = statistics.stdev(time_series) return std_deviation def std_deviation_to_influxdb(time_serie_data, url, token, org, bucket_name): """ Function to calculate standard deviation from time series and store it in influxdb :param time_serie_data: given time series data :param url: url of influxdb server :param token: token of influxdb client :param org: name of influxdb organization :param bucket_name: bucket where data will be stored :return: None """ write_client = influxdb_client.InfluxDBClient( url=url, token=token, org=org ) # calculate std and create data point measurement_name = 'std_of_time_series' field_name = 'std_deviation' std_deviation = calculate_std(time_serie_data) std_deviation_point = ( Point(measurement_name) .field(field_name, std_deviation) ) # store data point to corresponding bucket write_api = write_client.write_api(write_options=SYNCHRONOUS) write_api.write(bucket=bucket_name, org=org, record=std_deviation_point)