Skip to content
Snippets Groups Projects

std_of_time_series

  • Clone with SSH
  • Clone with HTTPS
  • Embed
  • Share
    The snippet can be accessed without any authentication.
    Authored by Khoa Nguyen
    Edited
    std_of_time_series.py 1.31 KiB
    import statistics
    
    from influxdb_client_3 import SYNCHRONOUS, Point
    from influxdb_client_3.write_client.client import influxdb_client
    
    
    def calculate_std(time_series):
        if len(time_series) < 2:
            return None
        std_deviation = statistics.stdev(time_series)
        return std_deviation
    
    
    def std_deviation_to_influxdb(time_serie_data, url, token, org, bucket_name):
        """ Function to calculate standard deviation from time series and store it in influxdb
    
        :param time_serie_data: given time series data
        :param url: url of influxdb server
        :param token: token of influxdb client
        :param org: name of influxdb organization
        :param bucket_name: bucket where data will be stored
        :return: None
        """
        write_client = influxdb_client.InfluxDBClient(
            url=url,
            token=token,
            org=org
        )
    
        # calculate std and create data point
        measurement_name = 'std_of_time_series'
        field_name = 'std_deviation'
        std_deviation = calculate_std(time_serie_data)
        std_deviation_point = (
            Point(measurement_name)
            .field(field_name, std_deviation)
        )
    
        # store data point to corresponding bucket
        write_api = write_client.write_api(write_options=SYNCHRONOUS)
        write_api.write(bucket=bucket_name,
                        org=org,
                        record=std_deviation_point)
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment