Skip to content
Snippets Groups Projects
Select Git revision
  • 9c1e02e3a30ec6030adf89b2a773beb3152a5cc8
  • develop default protected
  • 5.5
  • 5.1
  • master protected
  • deprecated/4-22
6 results

UniversalLogging.cpp

Blame
  • utils.py 7.61 KiB
    import atexit
    import csv
    import logging
    import os
    import sys
    import time
    import traceback
    from datetime import datetime
    from functools import wraps
    from multiprocessing import Process, Queue
    
    import numpy as np
    import pandas as pd
    import psutil
    import pytest
    import ujson
    
    from experiment_impact_tracker.data_utils import *
    from experiment_impact_tracker.data_utils import load_data_into_frame
    from experiment_impact_tracker.emissions.constants import PUE
    
    # Prefer a monotonic clock for measuring intervals (immune to wall-clock
    # adjustments); fall back to time.time where time.monotonic is missing.
    _timer = getattr(time, "monotonic", time.time)

    # Module-level logger, named after this module per logging convention.
    log = logging.getLogger(__name__)
    
    
    def get_timestamp(*args, **kwargs):
        """Return the current wall-clock time as a POSIX timestamp (float seconds).

        Any positional or keyword arguments are accepted and ignored, so this
        can be plugged in as a generic data-gathering callback.
        """
        return datetime.now().timestamp()
    
    
    def get_flop_count_tensorflow(graph=None, session=None):
        """Return the total float operations (FLOPs) of a TensorFlow graph.

        Parameters
        ----------
        graph : tf.Graph, optional
            Graph to profile. When both ``graph`` and ``session`` are None,
            the default graph is profiled.
        session : tf.Session, optional
            If given, its graph takes precedence over ``graph``.

        Returns
        -------
        int
            ``total_float_ops`` as reported by the TF profiler.
        """
        # Import within the function so the package does not require
        # tensorflow to be installed.
        import tensorflow as tf

        # (Removed an unused ``graph_util`` import and a dead ``load_pb``
        # inner helper that was never called.)
        if graph is None and session is None:
            graph = tf.get_default_graph()
        if session is not None:
            # An explicit session wins over any explicitly passed graph.
            graph = session.graph

        run_meta = tf.RunMetadata()
        opts = tf.profiler.ProfileOptionBuilder.float_operation()

        # Profile the graph for its floating-point operation count.
        flops = tf.profiler.profile(graph=graph, run_meta=run_meta, cmd="op", options=opts)

        return flops.total_float_ops
    
    
    def processify(func):
        """Decorator that runs ``func`` in a separate process.

        Every argument and the return value must be *picklable*. The wrapped
        function is invoked as ``func(q, *args, **kwargs)`` -- i.e. it receives
        the inter-process ``Queue`` as its first argument.

        NOTE(review): the created process is NOT joined here. The wrapper
        returns ``(process, queue)`` immediately, so the callee runs
        concurrently with the caller; the caller is responsible for joining
        the process and reading ``(result, error)`` from the queue.
        """

        def process_func(q, *args, **kwargs):
            # Runs in the child: ship (result, error) back through the queue.
            try:
                ret = func(q, *args, **kwargs)
            except Exception as e:
                # Format the traceback into a string -- traceback objects are
                # not picklable, so they cannot cross the process boundary.
                ex_type, ex_value, tb = sys.exc_info()
                error = ex_type, ex_value, "".join(traceback.format_tb(tb))
                ret = None
                q.put((ret, error))
                # Re-raise so the child process also exits unsuccessfully.
                raise e
            else:
                error = None
            q.put((ret, error))

        # register original function with different name
        # in sys.modules so it is pickable
        process_func.__name__ = func.__name__ + "processify_func"
        setattr(sys.modules[__name__], process_func.__name__, process_func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            queue = Queue()  # not the same as a Queue.Queue()
            p = Process(target=process_func, args=[queue] + list(args), kwargs=kwargs)
            p.start()
            # Returned without joining: caller must p.join()/queue.get().
            return p, queue

        return wrapper
    
    
    def _get_cpu_hours_from_per_process_data(json_array):
        latest_per_pid = {}
        for datapoint in json_array:
            cpu_point = datapoint.get("cpu_time_seconds", {})
            for pid, value in cpu_point.items():
                latest_per_pid[pid] = value["user"] + value["system"]
        return sum(latest_per_pid.values())
    
    
    def gather_additional_info(info, logdir):
        """Aggregate energy and carbon metrics from a tracker log directory.

        Parameters
        ----------
        info : dict
            Tracker metadata. Must contain ``experiment_start`` and
            ``experiment_end`` (datetime objects) and
            ``region_carbon_intensity_estimate`` (with a ``carbonIntensity``
            key); may contain ``gpu_info``.
        logdir : str
            Directory containing the impact-tracker data files.

        Returns
        -------
        dict
            Keys present only when computable: ``cpu_hours``,
            ``estimated_carbon_impact_kg``, ``total_power``, ``kw_hr_cpu``,
            ``exp_len_hours``, ``gpu_hours``, ``kw_hr_gpu``,
            ``average_realtime_carbon_intensity``.
        """
        df, json_array = load_data_into_frame(logdir)
        cpu_seconds = _get_cpu_hours_from_per_process_data(json_array)
        exp_len = datetime.timestamp(info["experiment_end"]) - datetime.timestamp(
            info["experiment_start"]
        )
        exp_len_hours = exp_len / 3600.0

        # Integrate power over time to get energy (kWh); energy is later
        # multiplied by carbon intensity to get grams of CO2eq. See:
        # https://electronics.stackexchange.com/questions/237025/converting-watt-values-over-time-to-kwh
        time_differences = df["timestamp"].diff()
        # diff() leaves NaN in row 0; measure it from the experiment start.
        time_differences[0] = df["timestamp"][0] - datetime.timestamp(
            info["experiment_start"]
        )
        # Append the interval between the last sample and the experiment end;
        # the last row of each power estimate is extrapolated to cover it.
        time_differences.loc[len(time_differences)] = (
            datetime.timestamp(info["experiment_end"])
            - df["timestamp"][len(df["timestamp"]) - 1]
        )
        time_differences_in_hours = time_differences / 3600.0

        power_draw_rapl_kw = None
        if "rapl_estimated_attributable_power_draw" in df:
            # BUG FIX: the original used ``df.get[...]`` -- subscripting the
            # bound method object -- which raises TypeError at runtime.
            power_draw_rapl_kw = df["rapl_estimated_attributable_power_draw"] / 1000.0
            # Extrapolate the last power reading over the appended interval.
            power_draw_rapl_kw.loc[len(power_draw_rapl_kw)] = power_draw_rapl_kw.loc[
                len(power_draw_rapl_kw) - 1
            ]

        has_gpu = "gpu_info" in info
        kw_hr_nvidia = None
        if has_gpu:
            num_gpus = len(info["gpu_info"])
            nvidia_power_draw_kw = df["nvidia_estimated_attributable_power_draw"] / 1000.0
            nvidia_power_draw_kw.loc[len(nvidia_power_draw_kw)] = nvidia_power_draw_kw.loc[
                len(nvidia_power_draw_kw) - 1
            ]
            gpu_absolute_util = df["average_gpu_estimated_utilization_absolute"]
            gpu_absolute_util.loc[len(gpu_absolute_util)] = gpu_absolute_util.loc[
                len(gpu_absolute_util) - 1
            ]
            # Elementwise power (kW) * interval (h) -> energy (kWh) per step.
            kw_hr_nvidia = np.multiply(time_differences_in_hours, nvidia_power_draw_kw)

        # BUG FIX: ``is not None`` is required -- truth-testing a pandas
        # Series (as the original did) raises ValueError.
        kw_hr_rapl = (
            np.multiply(time_differences_in_hours, power_draw_rapl_kw)
            if power_draw_rapl_kw is not None
            else None
        )

        total_power_per_timestep = None
        if has_gpu and kw_hr_rapl is not None:
            total_power_per_timestep = PUE * (kw_hr_nvidia + kw_hr_rapl)
        elif has_gpu:
            # GPU present but no RAPL data: the original added None to a
            # Series (TypeError); fall back to the GPU energy alone.
            total_power_per_timestep = PUE * kw_hr_nvidia
        elif kw_hr_rapl is not None:
            total_power_per_timestep = PUE * kw_hr_rapl

        realtime_carbon = None
        total_power = None
        estimated_carbon_impact_grams = None
        if "realtime_carbon_intensity" in df:
            realtime_carbon = df["realtime_carbon_intensity"]
            realtime_carbon.loc[len(realtime_carbon)] = realtime_carbon.loc[
                len(realtime_carbon) - 1
            ]
            # If we lost some values due to network errors, forward fill the
            # last available value, backfill any leading gaps, and finally
            # fall back to the region-average intensity.
            realtime_carbon = (
                pd.to_numeric(realtime_carbon, errors="coerce")
                .fillna(method="ffill")
                .fillna(method="bfill")
                .fillna(value=info["region_carbon_intensity_estimate"]["carbonIntensity"])
            )
            # (Removed a bare ``except: pdb.set_trace()`` debugging leftover;
            # Series truthiness bugs fixed with explicit None checks.)
            if total_power_per_timestep is not None:
                estimated_carbon_impact_grams = np.multiply(
                    total_power_per_timestep, realtime_carbon
                ).sum()
        elif total_power_per_timestep is not None:
            total_power = total_power_per_timestep.sum()
            estimated_carbon_impact_grams = (
                total_power
                * info["region_carbon_intensity_estimate"]["carbonIntensity"]
            )

        estimated_carbon_impact_kg = (
            estimated_carbon_impact_grams / 1000.0
            if estimated_carbon_impact_grams is not None
            else None
        )

        cpu_hours = cpu_seconds / 3600.0

        data = {}

        if cpu_hours:
            data["cpu_hours"] = cpu_hours
        if estimated_carbon_impact_kg is not None:
            data["estimated_carbon_impact_kg"] = estimated_carbon_impact_kg
        if total_power is not None:
            data["total_power"] = total_power
        if kw_hr_rapl is not None:
            data["kw_hr_cpu"] = kw_hr_rapl.sum()
        if exp_len_hours:
            data["exp_len_hours"] = exp_len_hours

        if has_gpu:
            # GPU-hours: percent utilization * time utilized, summed over the
            # intervals (assumes absolute utilization), times the GPU count.
            gpu_hours = (
                np.multiply(time_differences_in_hours, gpu_absolute_util).sum() * num_gpus
            )
            data.update({"gpu_hours": gpu_hours, "kw_hr_gpu": kw_hr_nvidia.sum()})

        if realtime_carbon is not None:
            data["average_realtime_carbon_intensity"] = realtime_carbon.mean()

        return data