import typing
import sys
import os.path
import re
import ast
import csv
import copy
import datetime
import jinja2
import traceback
import numpy as np
#from .builtin import foo
from .log import log
import prule.builtin
import pint
import prule.debug
spec_names = ["sockets_per_node","cores_per_socket","threads_per_core","flop_rate_scalar","flop_rate_simd","memory_bandwidth","cores_per_node"]
job_names = ["job","hwthreads"]
#TODO: fix access to job metadata and known names
#TODO: What to do if rule requires certain metadata (e.g. hwthreads), but the field is not available
# Disable rule for this cluster. Disable rule for this job. Print reason in output.
#TODO: How to express conditional execution in rule?
# get(field, Fallback value) -> Would not fail in case of missing field. -> Makes static analysis more complicated. Try rule and dismiss on failure?
# Make certain values depending on boolean? (e.g. GPU Shader programming value1*boolean + value2*(!boolean))
#TODO: How to check if requirements are met for the rule? E.g. rule is for 1 node only, but job is on 2 nodes?
# Add a list of "requirements", an array of python expressions that need to evaluate to TRUE and only then the rule will be evaluated?
#TODO: how to make all python builtins functions available to jinja template rendering?
#TODO: Deal with samples not measured in a constant interval.
#TODO: Fix computation of resources in the `numthreads` attribute - partially allocated nodes lead to wrong numbers of e.g. sockets
#TODO: Heterogeneous jobs are split by SLURM into two different jobs and ClusterCockpit creates two different JobArchives; merge the JobArchives?
#TODO: Literal values in requirements are unitless. Comparison with unit-carrying values will fail. Add a "unit(value, unit_name)" method to attach a unit to literal values?
#TODO: Optimisation: The Python json library loads the complete JSON file into memory before parsing. This results in a memory usage of sizeof(JSON file) + sizeof(Python result structure). A different JSON library could parse the JSON file in chunks.
base_path = os.path.dirname(__file__)
unit_path = os.path.join(base_path, "units.txt")
try:
unit_registry = pint.UnitRegistry()
unit_registry.load_definitions(unit_path)
except Exception as e:
error_msg = "Error on loading pint unit file {}".format(unit_path)
log.print(log.error, error_msg)
log.print(log.error, str(e))
sys.exit(1)
# Create core mask string from mask array and topology.
# If an entry in the mask array is >0 then the core is marked as used ("B").
def core_mask_str(mask, cores_per_socket: int, sockets_per_node: int, memorydomains_per_node: int) -> str:
mstr = ""
cores_per_memorydomain = (cores_per_socket * sockets_per_node / memorydomains_per_node)
socket_cores = cores_per_socket
md_cores = 0
mstr += "|"
for cix in range(0, len(mask)):
mstr += "B" if mask[cix] > 0 else "-"
socket_cores -= 1
md_cores -= 1
if cix == len(mask)-1:
pass
elif (cix+1) % cores_per_memorydomain == 0 and (cix+1) % cores_per_socket != 0:
mstr += "||"
elif (cix+1) % cores_per_socket == 0:
mstr += "||||"
mstr += "|"
return mstr
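# Illustrative example (assuming 4 cores per socket, 2 sockets and 4 memory domains per node):
#   core_mask_str([1,1,0,0,0,0,0,0], 4, 2, 4) -> "|BB||--||||--||--|"
#   "B" marks an allocated core, "||" separates memory domains, "||||" separates sockets.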
# Generates the set of defined local names and used global names
def rule_used_names(rule: dict) -> typing.Tuple[list, dict]:
local_names = {}
global_names = {}
# iterate over all terms
for (term_index, term) in enumerate(rule["terms"]):
#for (term_index, (term_var, term_str)) in enumerate(term):
# get term variable name
for term_var in term:
#print("Term: ", term[term_var])
for node in ast.walk(ast.parse(term[term_var], mode='eval')):
if type(node) == ast.Name:
#print("Node Name: ", node.id)
if node.id not in local_names:
#raise Exception("In rule {} the term with index {} uses the unknown literal {}.".format(rule["name"], term_index, node.id))
global_names[node.id] = term_index
elif type(node) == ast.Attribute and False:
#print("Node Attribute: ", node.attr)
stack = []
attr = node
while attr != None:
if type(attr) == ast.Attribute:
stack.append(attr.attr)
attr = attr.value
elif type(attr) == ast.Name:
stack.append(attr.id)
attr = None
else:
attr = None
local_names[term_var] = True
return (list(local_names.keys()), global_names)
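# Illustrative example for a hypothetical rule
#   {"name": "r", "terms": [{"x": "mean(flops_any)"}, {"y": "x > threshold"}], ...}:
# the result is (["x", "y"], {"mean": 0, "flops_any": 0, "threshold": 1}),
# i.e. the defined local names and, for each used global name, the index of the term that uses it first.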
# Consistency check for parameter, rule and cluster specification
def configuration_check(parameters: dict, rules: list, clusters: list) -> None:
# check cluster topology specs
log.print_color(log.color.magenta, log.debug, "-"*25,"Load cluster specifications:","-"*25)
for cluster in clusters:
log.print(log.debug, cluster["name"])
for subCluster in cluster["subClusters"]:
log.print(log.debug, " ", "\""+str(subCluster["name"])+"\"", "topology:", ", ".join(subCluster["topology"].keys()))
# namespace conflicts
names = {} # dictionary of all names
# fill in all builtin names
for name in dir(prule.builtin):
names[name] = True
#print(name)
#TODO: fix import of names from builtin, use list of exported names or import all attributes?
# check parameters definitions
for p in parameters:
if p in names:
raise Exception("Parameter name {} already defined in rule variable namespace.".format(p))
names[p] = True
#print(p)
# check metric definitions from cluster spec
metric_names = {}
for cluster in clusters:
for m in cluster["metricConfig"]:
metric_names[m["name"]] = True
for metric in metric_names:
if metric in names:
raise Exception("Metric name {} already defined in rule variable namespace.".format(m["name"]))
names[metric] = True
#print(metric)
for sn in spec_names:
names[sn] = True
for jn in job_names:
names[jn] = True
# prepare input for rules to check available global variables
# check used names by rules
log.print_color(log.color.magenta, log.debug, "-"*25,"Load rules:","-"*25, log.color.reset)
for rule in rules:
log.print(log.debug, rule["name"])
try:
local_names, global_names = rule_used_names(rule)
except Exception as e:
log.print(log.error, "Error on checking rule \"{}\".".format(rule["name"]))
raise e
for g in global_names:
if g not in names:
#raise Exception("In rule {} the term with index {} uses the unknown literal {}.".format(rule["name"], global_names[g], g))
log.print_color(log.color.yellow, log.debug, "In rule \"{}\" the term with index {} uses the unknown literal \"{}\".".format(rule["name"], global_names[g], g))
job_meta_fields: list = []
# Dummy class to be able to define attributes on objects.
# This is not possible for objects created with object().
class JobMetadata:
pass
# The scope id vector identifies the scope of a specific metric.
# The ids can be used to accumulate samples from a level to values of a level above.
# The node id is identified by the index of the hostname in the resource list.
# For the resource hierarchy, a metric for a specific scope level should make the ids in the levels above identifiable.
# E.g. if one knows the core id, then the memoryDomain and socket ids can be identified.
def get_scopeids(clusters: list, job_meta: dict, hostname: str, thread: typing.Optional[int] =None, core: typing.Optional[int] =None, memoryDomain: typing.Optional[int] =None, socket: typing.Optional[int] =None, node: typing.Optional[int] =None, accelerator: typing.Optional[int] =None) -> dict:
cluster: typing.Optional[dict] = None
for c in clusters:
if c["name"] == job_meta["cluster"]:
cluster = c
break
if cluster == None:
raise Exception("Cluster {} not found in cluster input.".format(job_meta["cluster"]))
subCluster: typing.Optional[dict] = None
for s in cluster["subClusters"]:
if s["name"] == job_meta["subCluster"]:
subCluster = s
break
if subCluster == None:
raise Exception("Subcluster {} not found in cluster {} input.".format(job_meta["subCluster"], job_meta["cluster"]))
topology = subCluster["topology"]
scopeIds = {}
if thread != None:
for ix,coreList in enumerate(topology["core"]):
if thread in coreList:
core = ix
break
for ix,numaList in enumerate(topology["memoryDomain"]):
if thread in numaList:
memoryDomain = ix
break
for ix,socketList in enumerate(topology["socket"]):
if thread in socketList:
socket = ix
break
if thread == None and core != None:
coreThread = topology["core"][core][0]
if memoryDomain == None:
for ix,numaList in enumerate(topology["memoryDomain"]):
if coreThread in numaList:
memoryDomain = ix
break
if socket == None:
for ix,socketList in enumerate(topology["socket"]):
if coreThread in socketList:
socket = ix
break
if thread == None and core == None and memoryDomain != None:
memThread = topology["memoryDomain"][memoryDomain][0]
if socket == None:
for ix,socketList in enumerate(topology["socket"]):
if memThread in socketList:
socket = ix
break
if accelerator != None:
for ix,a in enumerate(topology["accelerators"]):
if a["id"] == accelerator:
accelerator = ix
break
for ix,o in enumerate(job_meta["resources"]):
if o["hostname"] == hostname:
node = ix
break
scopeIds["thread"] = thread
scopeIds["core"] = core
scopeIds["memoryDomain"] = memoryDomain
scopeIds["socket"] = socket
scopeIds["node"] = node
scopeIds["accelerator"] = accelerator
return scopeIds
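# Illustrative example (assuming a topology where hwthread 5 belongs to core 5, memory domain 0 and
# socket 0, and "node01" is the first host in job_meta["resources"]):
#   get_scopeids(clusters, job_meta, "node01", thread=5)
#   -> {"thread": 5, "core": 5, "memoryDomain": 0, "socket": 0, "node": 0, "accelerator": None}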
# example: gpu:a100:3(IDX:0-1,3),fpga:0
slurm_reg_res = re.compile("^([^:]+):((([^,:]*):)?([0-9]+))?(\\(IDX:([^)]+)\\))?,?")
def slurm_parse_resources(s: str) -> dict:
r = s
res = {}
while r != "":
m = slurm_reg_res.search(r)
if m == None:
break
g = m.groups()
name = g[0]
type = g[3]
count = g[4]
ids = g[6]
used = []
if ids != None:
if g[5].startswith("(IDX"):
for n in ids.split(","):
mix = n.find("-")
if mix == -1 and n != "N/A":
try:
used.append(int(n))
except:
pass
elif mix != -1:
used += list(range(int(n[:mix]),int(n[mix+1:])+1))
elif g[5].startswith("(CNT"):
count = ids
try:
count = int(count)
except:
count = 0
t = [name, type, count, used]
res[name] = t
r = r[m.end():]
return res
#print(slurm_parse_resources("gpu:a100:1(IDX:1)"))
#print(slurm_parse_resources(""))
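# Illustrative result for the example string above:
#   slurm_parse_resources("gpu:a100:3(IDX:0-1,3),fpga:0")
#   -> {"gpu": ["gpu", "a100", 3, [0, 1, 3]], "fpga": ["fpga", None, 0, []]}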
def slurm_seq_cpuids(s: str) -> typing.List[int]:
cids = []
for term in s.split(","):
mix = term.find("-")
if mix == -1:
if term != "N/A":
try:
cids.append(int(term))
except:
pass
else:
cids += list( range( int(term[:mix]), int(term[mix+1:])+1 ) )
return cids
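# Illustrative example: slurm_seq_cpuids("0-3,8,10-11") -> [0, 1, 2, 3, 8, 10, 11]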
def slurm_seq_nodelist(pre: str, start: str, end: str) -> typing.List[str]:
#print(start,end)
res = []
slen = len(start)
for i in range(int(start), int(end)+1):
s = "{}".format(i)
s = pre + "0"*(slen-len(s)) + s
res.append(s)
return res
def slurm_expand_nodelist_num(pre: str, numlist: str) -> typing.List[str]:
#print(numlist)
res = []
nlist = numlist
while len(nlist) > 0:
re_numlist = reg_numlist.match(nlist)
if re_numlist != None:
start = re_numlist.groups()[0]
end = re_numlist.groups()[1]
if end == None:
res.append(pre+ start)
else:
res += slurm_seq_nodelist(pre, start, end[1:])
nlist = re_numlist.groups()[2]
else:
res.append(pre + nlist)
break
return res
reg_numlist=re.compile("^([0-9]+)(-[0-9]+)?,?(.*)?$")
reg_nodelist=re.compile("^([^ [,]+)(\\[([^]]+)])?,?(.*)$")
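# reg_numlist matches the leading element of a numeric list, e.g. "04-07,12" -> ("04", "-07", "12").
# reg_nodelist matches the leading element of a SLURM node list, e.g. "n2gpu12[04-07],n2cn0101"
#   -> prefix "n2gpu12", bracketed number list "04-07", rest "n2cn0101".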
def slurm_expand_nodelist(nodes: str) -> list:
#print(nodes)
res = []
nlist = nodes
while len(nlist) > 0:
re_nodelist = reg_nodelist.match(nlist)
#print(nlist, re_nodelist)
if re_nodelist != None:
re_groups = re_nodelist.groups()
pre = re_groups[0]
numlist = re_groups[2]
if numlist == None:
res.append(pre)
break
else:
res += slurm_expand_nodelist_num(pre, numlist)
#print(nlist, re_groups)
if len(re_groups) > 2:
nlist = re_groups[3]
else:
break
#print(res)
return res
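# Illustrative example: slurm_expand_nodelist("n2gpu12[04-05],n2cn0101")
#   -> ["n2gpu1204", "n2gpu1205", "n2cn0101"]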
def parse_slurminfo(info: str) -> dict:
slurm: dict = {}
for l in info.split("\n"):
if l == "":
break # end of slurm info
if l.strip() == "":
break # empty line
if l.startswith("     "): # node info (node detail lines are indented deeper than the normal info lines)
# example: Nodes=n2gpu12[04-07] CPU_IDs=0-127 Mem=2048 GRES=gpu:a100(CNT:2)
node = {}
for t in l.strip().split(" "):
k,v = t.split("=")
node[k] = v
nlist = slurm_expand_nodelist(node["Nodes"])
if "Nodes" not in slurm:
slurm["Nodes"] = []
for n in nlist:
ncopy = copy.deepcopy(node)
ncopy["Nodes"] = n
slurm["Nodes"].append(ncopy)
elif l.startswith("   "): # normal info
if l.startswith("   WorkDir="):
slurm["WorkDir"] = l[11:].strip("\n")
elif l.startswith("   Command="):
slurm["Command"] = l[11:].strip("\n")
elif l.startswith("   TRES="):
slurm["TRES"] = l[8:].strip("\n")
elif l.startswith("   ReqTRES="):
slurm["ReqTRES"] = l[11:].strip("\n")
elif l.startswith("   AllocTRES="):
slurm["AllocTRES"] = l[13:].strip("\n")
elif l.startswith("   StdErr="):
slurm["StdErr"] = l[10:].strip("\n")
elif l.startswith("   StdOut="):
slurm["StdOut"] = l[10:].strip("\n")
elif l.startswith("   StdIn="):
slurm["StdIn"] = l[9:].strip("\n")
elif l.startswith("   Reservation="):
slurm["Reservation"] = l[15:].strip("\n")
elif l.startswith("   Comment="):
slurm["Comment"] = l[11:].strip("\n")
elif l.startswith("   TresPerTask="):
slurm["TresPerTask"] = l[15:].strip("\n")
else:
for t in l.strip().split(" "):
v_ix = t.find("=")
if t.find("=", v_ix+1) != -1:
log.print_color(log.color.yellow, log.warn, "Unknown field during SLURM info parsing: {}".format(l))
k = t[:v_ix]
v = t[v_ix+1:]
slurm[k] = v
elif l.startswith("JobId="): # first line
# first line
slurm["JobId"] = int(l[l.find("=")+1: l.find(" ")])
slurm["JobName"] = l[l.find("=", l.find(" "))+1: ].strip()
return slurm
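# Illustrative sketch: for typical "scontrol show job -d" output, the result contains the first-line
# fields (e.g. slurm["JobId"], slurm["JobName"]), one entry per "Key=Value" pair of the indented info
# lines (e.g. slurm["WorkDir"], slurm["TRES"]) and, if node detail lines are present, slurm["Nodes"]
# as a list with one dict per expanded node name,
# e.g. {"Nodes": "n01", "CPU_IDs": "0-3", "Mem": "4096", "GRES": "gpu:a100(CNT:2)"}.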
def parse_slurm_size(size: str) -> float:
num = 0.0
if size.endswith("K"):
num = float(size[:-1]) * 1024
elif size.endswith("M"):
num = float(size[:-1]) * 1024 * 1024
elif size.endswith("G"):
num = float(size[:-1]) * 1024 * 1024 * 1024
elif size.endswith("T"):
num = float(size[:-1]) * 1024 * 1024 * 1024 * 1024
else: # default is MiB
num = float(size) * 1024 * 1024
return num
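# Illustrative examples: parse_slurm_size("2G") -> 2147483648.0 (bytes),
# parse_slurm_size("2048") -> 2147483648.0 (bare numbers are interpreted as MiB).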
# Prepare variables available for rule evaluation.
# Variables generated from:
# - parameters
# - cluster specification
# - job meta data
# - job measurement data
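# The returned dict is used as the globals namespace for term evaluation; it contains the
# prule.builtin helpers, the parameters as pint quantities, a "quantity" constructor,
# the "job" and "numthreads" metadata objects and one prule.builtin.Data object per loaded
# metric (e.g. a hypothetical metric "flops_any" becomes available under the name flops_any).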
def rule_prepare_input(parameters: dict, rules: list, clusters: list, job_meta: dict, job_data: dict) -> dict:
globals = {}
#globals["mean"] = builtin.mean
# add definitions from prule builtins
for k in prule.builtin.public:
globals[k] = getattr(prule.builtin, k)
for key, value in parameters.items():
if key == "job_requirements":
continue
if type(value) in [int,float]:
globals[key] = unit_registry.parse_expression(str(value))
else:
globals[key] = unit_registry.parse_expression(value)
for sn in spec_names:
globals[sn] = True
#for jn in job_names:
# globals[jn] = True
def quantity_create(value, unit=None) -> pint.Quantity:
if unit == None:
if type(value) == str:
return unit_registry.parse_expression(value)
else:
raise Exception("Unit creation: expected string, got {}".format(value))
else:
if type(value) == str and type(unit) == str:
return unit_registry.parse_expression(value+" "+unit)
elif type(unit) == str:
return unit_registry.Quantity(value, unit_registry.parse_units(unit))
else:
raise Exception("Unit creation: expected string for unit, got {}".format(unit))
globals["quantity"] = quantity_create
# prepare job metadata
job: typing.Any = JobMetadata()
# copy all attributes from json to job object
for attr in job_meta:
if type(job_meta[attr]) not in [dict,list]:
#print(attr)
setattr(job, attr, job_meta[attr])
# parse slurminfo if available
slurminfo = None
if "metaData" in job_meta and "slurmInfo" in job_meta["metaData"]:
slurminfo = parse_slurminfo(job_meta["metaData"]["slurmInfo"])
setattr(job, "slurminfo", slurminfo)
# parse "allocated_memory" per node from slurminfo
# "allocated_memory" (if specified) is an array with allocated memory size per node in Bytes
if slurminfo != None:
alloc = []
if "Nodes" in slurminfo:
for h in job_meta["resources"]:
for n in slurminfo["Nodes"]:
if "Nodes" in n and "Mem" in n and n["Nodes"] == h["hostname"]:
alloc.append(parse_slurm_size(n["Mem"]))
break
elif "MinMemoryNode" in slurminfo: # --mem=
size = parse_slurm_size(slurminfo["MinMemoryNode"])
for h in job_meta["resources"]:
alloc.append(size)
elif "MinMemoryCPU" in slurminfo: # --mem-per-cpu=
size = parse_slurm_size(slurminfo["MinMemoryCPU"])
for h in job_meta["resources"]:
if "hwthreads" in h and type(h["hwthreads"]) == list:
alloc.append(size * len(h["hwthreads"]))
elif "MinMemoryGPU" in slurminfo: # --mem-per-gpu=
size = parse_slurm_size(slurminfo["MinMemoryGPU"])
for h in job_meta["resources"]:
if "accelerators" in h and type(h["accelerators"]) == list:
alloc.append(size * len(h["accelerators"]))
if len(alloc) > 0:
alloc_arr = np.zeros(shape=(1,len(alloc)))
for ix,s in enumerate(alloc):
if s == 0:
# TODO: set max memory
pass
#alloc_arr[0,ix] = unit_registry.Quantity(s, unit_registry.B)
alloc_arr[0,ix] = s
alloc_arr = unit_registry.Quantity(alloc_arr, unit_registry.B) # type: ignore
setattr(job, "allocated_memory", alloc_arr)
# metadata conversion
setattr(job, "walltime", unit_registry.Quantity(job.walltime,unit_registry.s)) # type: ignore
setattr(job, "duration", unit_registry.Quantity(job.duration,unit_registry.s)) # type: ignore
# SMT enabled?
cluster = None
subcluster = None
threadsPerCore = None
for p_cluster in clusters:
if p_cluster["name"] != job_meta["cluster"]:
continue
cluster = p_cluster
for p_subcluster in p_cluster["subClusters"]:
if p_subcluster["name"] != job_meta["subCluster"]:
continue
subcluster = p_subcluster
threadsPerCore = subcluster["threadsPerCore"]
if cluster == None:
raise Exception("Job runs on an unknown cluster: "+str(job_meta["cluster"]))
if subcluster == None:
raise Exception("Job runs on an unknown subcluster: "+str(job_meta["subCluster"]))
# check if resources are set
missing_resources = False
for host_resource in job_meta["resources"]:
hwthreads_missing = "hwthreads" not in host_resource or host_resource["hwthreads"] == None or len(host_resource["hwthreads"]) == 0
accelerators_missing = "accelerators" not in host_resource or host_resource["accelerators"] == None or len(host_resource["accelerators"]) == 0
missing_resources = missing_resources or hwthreads_missing or ("numAcc" in job_meta and job_meta["numAcc"] > 1 and accelerators_missing)
# assumption: either the array is correct or completely missing
if missing_resources == True:
break
# if resources are incomplete, reconstruct them from data
resources = []
if missing_resources == True:
log.print_color(log.color.yellow, log.warn, "Warning: resources, such as used hwthreads and accelerators, not entirely specified in job metadata. Reconstructing from job measurements.")
host_resources: dict = {}
for metric in job_data:
metric_data = job_data[metric]
for scope in metric_data:
scope_data = metric_data[scope]
for series_data in scope_data["series"]:
hostname = series_data["hostname"]
if hostname not in host_resources:
host_resources[hostname] = {}
if "id" not in series_data:
continue
id = series_data["id"]
if scope != "node" and scope != "accelerator":
id = int(id)
if scope not in host_resources[hostname]:
host_resources[hostname][scope] = []
if id not in host_resources[hostname][scope]:
host_resources[hostname][scope].append(id)
metadata_missing_resources = False
for hostname in host_resources:
host_res = {}
host_res["hostname"] = hostname
if "accelerator" in host_resources[hostname]:
host_res["accelerators"] = host_resources[hostname]["accelerator"]
if threadsPerCore == 1 and "core" in host_resources[hostname]:
host_res["hwthreads"] = host_resources[hostname]["core"]
elif "hwthread" in host_resources[hostname]:
host_res["hwthreads"] = host_resources[hostname]["hwthread"]
else:
metadata_missing_resources = True
log.print_color(log.color.yellow, log.warn, "Failed to reconstruct list of hwthread ids from job measurements. Host {}".format(hostname))
resources.append(host_res)
if metadata_missing_resources == True: # try to use slurminfo
if slurminfo != None and "Nodes" in slurminfo:
for node in slurminfo["Nodes"]:
host_res = {}
host_res["hostname"] = node["Nodes"]
if "CPU_IDs" in node:
cpuids = slurm_seq_cpuids(node["CPU_IDs"])
host_res["hwthreads"] = cpuids if len(cpuids)>0 else None
else:
host_res["hwthreads"] = None
if "GRES" in node and len(node["GRES"]) > 0:
gres = slurm_parse_resources(node["GRES"])
# Assume only one type of accelerator exists and just pick the id according to the IDX
idlist = []
idlist_all = []
if "accelerators" in subcluster["topology"] and subcluster["topology"]["accelerators"] != None:
for acc in subcluster["topology"]["accelerators"]:
idlist_all.append(acc["id"])
for restype,res in gres.items():
# res = (type, model, count, array of indices), e.g. ['gpu', 'a100', 1, [1]]
if res[2] == 0:
continue
for idx in res[3]:
if idx < len(idlist_all):
idlist.append(idlist_all[idx])
host_res["accelerators"] = idlist if len(idlist)>0 else None
else:
host_res["accelerators"] = None
# check if node is already present in resources
old_res = None
for noderes in resources:
if noderes["hostname"] == host_res["hostname"]:
old_res = noderes
break
if old_res != None:
# enrich present resource object
if "hwthreads" not in old_res or old_res["hwthreads"] == None or len(old_res["hwthreads"])<len(host_res["hwthreads"]):
old_res["hwthreads"] = host_res["hwthreads"]
if "accelerators" not in old_res or old_res["accelerators"] == None or len(old_res["accelerators"])<len(host_res["accelerators"]):
old_res["accelerators"] = host_res["accelerators"]
else:
# add new resource object
if host_res["hwthreads"] == None:
raise Exception("Error: failed to reconstruct list of hwthread ids from job measurements. Reading slurminfo.Host {}".format(host_res["hostname"]))
resources.append(host_res)
else:
raise Exception("Error: failed to reconstruct list of hwthread ids from job measurements. No slurminfo")
job_meta["resources"] = resources
# check if numHwthreads is available, else compute it (numHwthreads is not required according to job-meta schema)
if "numHwthreads" not in job_meta:
job.numHwthreads = 0
for r in job_meta["resources"]:
job.numHwthreads += len(r["hwthreads"])
# add units
#job.duration = unit_registry.Quantity(job.duration, unit_registry.s)
# add scope specific thread numbers
numthreads: typing.Any = JobMetadata()
# threads per node
memoryDomainsPerNode = len(subcluster["topology"]["memoryDomain"]) if "memoryDomain" in subcluster["topology"] else 1
diesPerNode = len(subcluster["topology"]["die"]) if "die" in subcluster["topology"] else 1
acceleratorsPerNode = len(subcluster["topology"]["accelerators"]) if "accelerators" in subcluster["topology"] and subcluster["topology"]["accelerators"] != None else 0
numthreads.job = np.array([job.numHwthreads])
numthreads.node = np.zeros([job.numNodes])
numthreads.socket = np.zeros([subcluster["socketsPerNode"] * job.numNodes]) # TODO: the number of actually used sockets depends not on the number of nodes, but on which sockets actually have allocated hwthreads
numthreads.core = np.zeros([subcluster["coresPerSocket"] * subcluster["socketsPerNode"] * job.numNodes])
numthreads.die = np.zeros([diesPerNode * job.numNodes])
numthreads.memoryDomain = np.zeros([memoryDomainsPerNode * job.numNodes])
numthreads.accelerator = np.zeros([job.numNodes])
coresPerNode = subcluster["coresPerSocket"] * subcluster["socketsPerNode"] # Assume all used nodes belong to subcluster
exclusive_check = True # Assume nodes are all exclusive, check in following loop
realSockets = 0 # number of sockets that have at least one allocated core (according to resource meta data)
for nix,r in enumerate(job_meta["resources"]):
# node
if "hwthreads" in r:
numthreads.node[nix] = len(r["hwthreads"])
if len(r["hwthreads"]) < coresPerNode:
exclusive_check = False
else:
# TODO: do something smarter in this case, although "hwthreads" attribute is not required
log.print(log.error, "hwthreads attribute missing in meta->resources JSON")
raise Exception("hwthreads attribute missing in meta->resources JSON")
# core
for cix,c in enumerate(subcluster["topology"]["core"]):
for hwt in c:
if hwt in r["hwthreads"]:
numthreads.core[cix + (subcluster["coresPerSocket"] * subcluster["socketsPerNode"]) * nix ] += 1
# socket
for six in range(0,subcluster["socketsPerNode"]):
coresInSocketSum = numthreads.core[(subcluster["coresPerSocket"] * subcluster["socketsPerNode"]) * nix + (subcluster["coresPerSocket"]) * six : (subcluster["coresPerSocket"] * subcluster["socketsPerNode"]) * nix + (subcluster["coresPerSocket"]) * (six+1) ].sum()
numthreads.socket[six + (subcluster["socketsPerNode"]) * nix] = coresInSocketSum
if coresInSocketSum > 0:
realSockets += 1
# memoryDomain
if memoryDomainsPerNode == 1:
numthreads.memoryDomain[nix] = numthreads.node[nix]
else:
for mix,m in enumerate(subcluster["topology"]["memoryDomain"]):
for hwt in m:
if hwt in r["hwthreads"]:
numthreads.memoryDomain[mix + (memoryDomainsPerNode) * nix ] += 1
# die
if diesPerNode == 1:
numthreads.die[nix] = numthreads.node[nix]
else:
for dix,d in enumerate(subcluster["topology"]["die"]):
for hwt in d:
if hwt in r["numHwthreads"]:
numthreads.die[mix + (diesPerNode) * nix ] += 1
# accelerators
if acceleratorsPerNode > 0 and "accelerators" in r:
numthreads.accelerator[nix] = len(r["accelerators"])
if exclusive_check == True and getattr(job, "exclusive") != 1:
setattr(job, "exclusive", 1)
job_meta["exclusive"] = 1
log.print_color(log.color.yellow, log.warn, "Overwrite job's exclusive state.")
# save real socket number to meta data
job_meta["sockets_real"] = realSockets
# print
log.print_color(log.color.magenta, log.debug, "-"*25,"Job allocated hwthreads:","-"*25)
log.print(log.debug, "Job: ",numthreads.job)
log.print(log.debug, "Node: ",numthreads.node)
log.print(log.debug, "Socket: ",numthreads.socket)
log.print(log.debug, "Core: ",numthreads.core)
log.print(log.debug, "Die: ",numthreads.die)
log.print(log.debug, "MemoryDomain:",numthreads.memoryDomain)
log.print(log.debug, "Accelerators:",numthreads.accelerator)
for nix,r in enumerate(job_meta["resources"]):
# cstr=""
# for cix in range(0, subcluster["coresPerSocket"] * subcluster["socketsPerNode"]):
# if numthreads.core[ cix + nix*subcluster["coresPerSocket"] * subcluster["socketsPerNode"] ] > 0:
# cstr += "B"
# else:
# cstr += "-"
coresPerNode = subcluster["coresPerSocket"] * subcluster["socketsPerNode"]
cstr = core_mask_str(numthreads.core[nix*coresPerNode:(nix+1)*coresPerNode], subcluster["coresPerSocket"], subcluster["socketsPerNode"], memoryDomainsPerNode)
log.print(log.debug, "{:10s}: {}".format(r["hostname"] if "hostname" in r else "???", cstr))
globals["job"] = job
globals["numthreads"] = numthreads
# sort metric
# this is important to guarantee consistency with topology
for m,scope_data in job_data.items():
metric_scopes = []
for scope,data in scope_data.items():
if len(data["series"]) == 1:
continue
metric_scopes.append(scope)
if "id" in data["series"][0]:
series_sorted = sorted(data["series"], key=lambda x: x["id"])
data["series"] = series_sorted
elif scope == "node" and "hostname" in data["series"][0]:
series_sorted = []
for h in job_meta["resources"]:
for series in data["series"]:
if series["hostname"] == h["hostname"]:
series_sorted.append(series)
break
data["series"] = series_sorted
else:
log.print_color(log.color.red, log.warn, "Metric {} series {} is not sortable?".format(m, scope))
# check necessary metrics
ignore_metrics = True
required_metrics = []
for r in rules:
if "metrics" not in r:
log.print(log.debug, "Found rule \"{}\" and no required metrics.".format(r["Name"]))
ignore_metrics = False
log.print(log.debug, "No metrics will be ignored, because rule \"{}\" does not specify required metrics.".format(r["name"]))
break
else:
log.print(log.debug, "Found rule \"{}\" and required metrics: {}".format(r["name"], r["metrics"]))
for m in r["metrics"]:
if m not in required_metrics:
required_metrics.append(m)
if ignore_metrics == True:
log.print(log.info, "The following metrics are required and will be loaded: {}".format(required_metrics))
# add metrics
metric_names = {}
metric_units = {}
metric_max_sample_count = {}
metric_min_sample_count = {}
starttime = job_meta["startTime"] if type(job_meta["startTime"]) == int else int(datetime.datetime.fromisoformat(job_meta['startTime']).timestamp())
metric_nbytes = 0
for metric, scope_data in job_data.items():
# early skip of metrics
if ignore_metrics == True and metric not in required_metrics:
log.print(log.debug, "Ignore metric \"{}\", because not required by any rule.".format(metric))
continue
#TODO: expecting only one scope per metric!
scope = list(scope_data.keys())[0]
metric_names[metric] = [scope]
data = list(scope_data.values())[0]
dataseries = []
metadataseries = []
columnlabel = []
min_samples = sys.maxsize
max_samples = 0
unit_name = data["unit"]["base"]
if "prefix" in data["unit"]:
unit_name = data["unit"]["prefix"] + unit_name
if unit_name == "%":
unit_name = "percent"
unit = unit_registry.parse_expression(unit_name)
metric_units[metric] = unit
# turn measurement data to numpy arrays
for series_ix,series in enumerate(data["series"]):
metadata = {}
metadata["metric"] = metric
metadata["hostname"] = series["hostname"]
if scope != "node" and scope != "accelerator":
metadata["id"] = int(series["id"])
else:
metadata["id"] = series_ix
metadata["scope"] = scope
metadata["unit_name"] = unit_name
# compute scope ids
scopeids = {}
scopeids[scope] = metadata["id"]
scopeids = get_scopeids(clusters, job_meta, metadata["hostname"], **scopeids)
metadata["scopeids"] = scopeids
column_data = series["data"]
max_samples = max(max_samples, len(column_data))
min_samples = min(min_samples, len(column_data))
# create array
array = np.array(column_data)
dataseries.append(array)
metadataseries.append(metadata)
columnlabel.append(metric+"_"+str(metadata["id"]))
# generate timestep column values
# TODO: specify how timestamps are given for measurements with an irregular sampling interval
if "timeseries" not in data:
timedata = []
timestep = data["timestep"]
for step in range(0,max_samples):
#TODO: time as timestamps?
timedata.append((starttime + timestep*step)*1000000000)
else:
timedata = data["timeseries"]
timeseries = np.array(timedata)
timeseries = unit_registry.Quantity(timeseries, unit_registry.parse_expression("ns")) # type: ignore
# check sample count
min_samples = sys.maxsize
max_samples = 0
for column in dataseries:
min_samples = min(min_samples, len(column))
max_samples = max(max_samples, len(column))
if max_samples - min_samples > 2:
log.print_color(log.color.yellow, log.warn, "For metric \"{}\" the difference between sample counts is {}.".format(metric, max_samples-min_samples))
if min_samples == 0:
log.print_color(log.color.yellow, log.warn, "For metric \"{}\" some series contain no samples. Ignoring this metric! min samples: {} max samples: {}.".format(metric, min_samples, max_samples))
continue
# extend missing samples at the end with null values
for ix in range(0,len(dataseries)):
if len(dataseries[ix]) < max_samples:
dataseries[ix] = np.append(dataseries[ix], [np.nan] * (max_samples-len(dataseries[ix])), axis=0)
# interpolate missing/nan samples
# if first/last sample is missing, use 0.0 value
for ix in range(0,len(dataseries)):
a = None
b = None
none_count = 0
# iterate over samples and find NaN sequences
for i,v in enumerate(dataseries[ix]):
#print(ix, i, v, type(v), v != None and np.isnan(v)) # debugging
if (v == None or np.isnan(v)) and a == None: # start of NaN sequence
a = i
elif (v != None and np.isnan(v) == False) and a != None: # end of NaN sequence
b = i-1
if (v == None or np.isnan(v)) and a != None and i == len(dataseries[ix])-1: # NaN sequence at end of series
b = i
if a != None and b != None: # found NaN sequence
#print("fix sequence ", a, b, " of", metadataseries[ix]["metric"]) # debugging
none_count += b - a + 1
if a == 0: # sequence at start, set to 0.0
dataseries[ix][a] = 0.0
else:
a = a-1
if b == len(dataseries[ix])-1: # sequence at end, set to 0.0
dataseries[ix][b] = 0.0
else:
b = b+1
if a == b:
dataseries[ix][a] = 0.0
a = None
b = None
continue
base = dataseries[ix][a]
step = (dataseries[ix][b] - dataseries[ix][a]) / (b-a)
for s in range(1,b-a):
dataseries[ix][a+s] = base + step*s
a = None
b = None
if none_count > 0:
log.print_color(log.color.yellow, log.warn, "For metric \"{}\" {} of {} NaN/None values were found and interpolated.".format(metric, none_count, len(dataseries[ix])))
metric_min_sample_count[metric] = min_samples
metric_max_sample_count[metric] = max_samples
data_array = np.column_stack(dataseries)
# if the array contained None values at the start, then the numpy array will have dtype "object" and some methods, e.g. sqrt, will fail
# set type to float
if data_array.dtype == object:
data_array = data_array.astype(float)
data_array = unit_registry.Quantity(data_array, unit) # type: ignore
data = prule.builtin.Data(data_array, timeseries, metadataseries)
#TODO: sort columns by scope id, e.g. scope=="core", 1. column time, rest: core id
data.set_writeable(False)
# Save dataframe as global variable
globals[metric] = data
metric_nbytes += data.nbytes()
job_meta["metric_min_sample_count"] = metric_min_sample_count
job_meta["metric_max_sample_count"] = metric_max_sample_count
log.print_color(log.color.magenta, log.debug, "-"*25,"Load metrics:","-"*25)
max_metric_name = 0
for k in metric_names.keys():
max_metric_name = max(max_metric_name, len(k))
for k,v in metric_names.items():
if k in globals:
log.print(log.debug, k+" "*(max_metric_name-len(k)),": ", ", ".join(v)+" min_count:{} max_count:{} ".format(metric_min_sample_count[k],metric_max_sample_count[k])+str(globals[k])+" "+str(metric_units[k]))
log.print(log.debug, "Globals Size: numpy nbytes for metrics: ", str(metric_nbytes))
return globals
def debug_rule_store_terms(rule_name, term_index, var_name, var_value, debug_settings) -> None:
if debug_settings == None or "debug_log_terms_dir" not in debug_settings:
return
outfile = "{}_{}_{}.csv".format(rule_name, term_index, var_name)
debug_write_file(outfile, var_value)
def debug_write_file(outfile, value) -> None:
if prule.debug.debug_settings == None:
return
outpath = os.path.join(prule.debug.debug_settings["debug_log_terms_dir"], outfile)
with open (outpath, "w", newline='') as csvfile:
cwriter = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
if type(value) == prule.builtin.Data:
for row in value.to_array():
cwriter.writerow(row)
else:
cwriter.writerow([value])
def debug_prompt(rule, rinput, job_meta, locals, local_names, globals, term_index, term, skip_prompt) -> bool:
if prule.debug.debug_settings == None:
return True
if skip_prompt == True:
return True
print("Next term:", term_index, term)
while True:
line = input("> ")
line = line.strip()
if line.startswith("!h") or line.startswith("help"):
print("""Commands:
!h - print this help text
!p EXPR - eval python expression
!pr - print current rule
!pt - print next term
!nt - continue with next term
!nr - continue until next rule
[VAR =] EXPR - eval term expression
!w FILENAME EXPR - write result to file (requires --debug-log-terms-dir)
FILENAME must not contain space or slash
!q - quit
""")
elif line.startswith("!q"):
sys.exit(0)
elif line.startswith("!nt"):
return False
elif line.startswith("!nr"):
return True
elif line.startswith("!pr"):
print(rule)
elif line.startswith("!pt"):
print(term_index, term)
elif line.startswith("!p"):
try:
eval(line[2:])
except Exception as e:
print(e)
elif line.startswith("!w"):
line = line[2:].strip()
ix = line.find(" ")
if ix == -1:
print("FILENAME or EXPRESSION missing")
continue
fname = line[:ix]
if " " in fname:
print("FILENAME", fname, "must not contain space")
continue
if "/" in fname:
print("FILENAME", fname, "must not contain slash")
continue
expr = line[ix:].strip()
try:
return_value = eval(expr, locals, globals)
print(type(return_value))
print(return_value)
if isinstance(return_value, prule.builtin.Data):
print(return_value.array)
debug_write_file(fname, return_value)
except Exception as e:
print(e)
else:
var_name = None
ex_line = line
ef = ex_line.find("=")
if ef != -1:
var_name = ex_line[:ef].strip()
ex_line = ex_line[ef+1:].strip()
try:
return_value = eval(ex_line, locals, globals)
print(type(return_value))
print(return_value)
if isinstance(return_value, prule.builtin.Data):
print(return_value.array)
if var_name != None:
locals[var_name] = return_value
if var_name not in local_names:
local_names[var_name] = True
except Exception as e:
print(e)
return False
# Evaluate rule.
# Iterate over terms and evaluate python expression.
# Track local and global variables.
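# Returns a result dict with the keys "name", "tag", "error", "errors", "match", "evaluated" and
# "template"; for rules that define "output_scalar", a "scalar" value may be added as well.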
def rule_evaluate(rule: dict, rinput: dict, job_meta: dict, check_requirements: bool) -> dict:
log.print_color(log.color.magenta+log.color.bold, log.debug, "#"*25,"Evaluate rule:",rule["name"],"#"*25)
output = {}
output["name"] = rule["name"]
output["tag"] = rule["tag"]
output["error"] = False
output["errors"] = []
output["match"] = False
output["evaluated"] = True
output["template"] = None
# check rule requirements
# parameters
if "parameters" in rule:
for p in rule["parameters"]:
if p not in rinput:
error_msg = "Rule \"{}\" will not be evaluated, because parameter \"{}\" is undefined.".format(rule["name"], p)
log.print(log.warn, error_msg)
output["errors"].append(error_msg)
output["evaluated"] = False
return output
# metrics
required_metrics_min_samples = sys.maxsize
if "metrics" in rule:
for m in rule["metrics"]:
if m not in rinput:
error_msg = "Rule \"{}\" will not be evaluated, because metric \"{}\" is undefined.".format(rule["name"], m)
log.print(log.warn, error_msg)
output["errors"].append(error_msg)
output["evaluated"] = False
return output
else:
required_metrics_min_samples = min(job_meta["metric_min_sample_count"][m], required_metrics_min_samples)
# prepare locals
locals_template: dict = {}
locals_template["required_metrics_min_samples"] = required_metrics_min_samples
# requirements
if "requirements" in rule and check_requirements == True:
for eix,expr in enumerate(rule["requirements"]):
locals = copy.deepcopy(locals_template)
globals = rinput
return_value = eval(expr, locals, globals)
if type(return_value) is not bool:
error_msg = "Requirement expression number {}: \"{}\" does not return a boolean value.".format(eix, expr)
log.print(log.warn, error_msg)
output["errors"].append(error_msg)
elif return_value == False:
error_msg = "Rule \"{}\" will not be evaluated, because requirement number {}: \"{}\" is not fullfilled.".format(rule["name"], eix, expr)
log.print(log.warn, error_msg)
output["errors"].append(error_msg)
output["evaluated"] = False
return output
# evaluate terms
locals = copy.deepcopy(locals_template)
globals = rinput
local_names: dict = {}
skip_prompt = False
for tix,term in enumerate(rule["terms"]):
skip_prompt = debug_prompt(rule, rinput, job_meta, locals, local_names, globals, tix, term, skip_prompt)
term_var = list(term.keys())[0]
term_str = term[term_var]
return_value = None
error = False
try:
if tix>0:
log.print(log.debug, "-"*10)
log.print_color(log.color.green, log.debug, "Term",tix,": ",log.color.reset, term_var,"=",term_str)
return_value = eval(term_str, locals, globals)
log.print_color(log.color.blue, log.debug, "Result for ",term_var,": ",type(return_value))
log.print_color(log.color.blue, log.debug, str(return_value))
if isinstance(return_value, prule.builtin.Data):
log.print_color(log.color.blue, log.debug, return_value.array)
debug_rule_store_terms(rule["tag"], tix, term_var, return_value, prule.debug.debug_settings)
except Exception as e:
error = True
error_msg = "Error while evaluating rule rule: {} TERM INDEX: {} TERM: {}".format(rule["name"], tix, term)
log.print(log.warn, error_msg)
output["errors"].append(error_msg + " ERROR: "+str(e))
log.print(log.debug, log.color.red, end="")
traceback.print_exc(file=log.file.debug)
log.print(log.debug, e)
log.print(log.debug, "Exception when evaluating rule: {}".format(rule["name"]))
log.print(log.debug, "Offending term: {} = {}".format(term_var, term_str))
log.print(log.debug, log.color.reset)
#raise Exception("Rule evaluation error")
if error == False:
locals[term_var] = return_value
if term_var not in local_names:
local_names[term_var] = True
skip_prompt = debug_prompt(rule, rinput, job_meta, locals, local_names, globals, -1, None, skip_prompt)
log.print_color(log.color.magenta, log.debug, "-"*25,"Local names","-"*25)
for name in local_names:
log.print(log.debug, log.color.blue + str(name) + log.color.reset, type(locals[name]))
log.print(log.debug, locals[name])
log.print_color(log.color.magenta, log.debug, "-"*25,"Output variables","-"*25)
if rule["output"] in locals:
log.print(log.debug, log.color.blue + "output: " + log.color.reset, rule["output"], type(locals[rule["output"]]))
log.print(log.debug, locals[rule["output"]])
output_value = None
if isinstance(locals[rule["output"]], prule.builtin.Data):
log.print(log.debug, locals[rule["output"]].array)
output_value = locals[rule["output"]]
try:
output_value = bool(output_value)
except:
output_value = None
error = True
error_msg = "Error on converting rule output to bool for rule {}".format(rule["name"])
output["errors"].append(error_msg)
log.print(log.error, error_msg)
elif isinstance(locals[rule["output"]], bool):
output_value = locals[rule["output"]]
elif isinstance(locals[rule["output"]], np.bool_):
output_value = bool(locals[rule["output"]])
elif isinstance(locals[rule["output"]], list) and len(locals[rule["output"]]) == 1 and isinstance(locals[rule["output"]][0], bool):
output_value = locals[rule["output"]][0]
elif isinstance(locals[rule["output"]], np.ndarray):
if len(locals[rule["output"]]) != 1:
error = True
error_msg = "Error on converting rule output to bool for rule {}: size of output array is not 1".format(rule["name"])
output["errors"].append(error_msg)
log.print(log.error, error_msg)
else:
output_value = locals[rule["output"]][0]
try:
output_value = bool(output_value)
except:
output_value = None
error = True
error_msg = "Error on converting rule output to bool for rule {}".format(rule["name"])
output["errors"].append(error_msg)
log.print(log.error, error_msg)
else:
error = True
error_msg = "Error on converting rule output for rule {}: Unknown variable type {}".format(rule["name"], str(type(locals[rule["output"]])))
output["errors"].append(error_msg)
log.print(log.error, error_msg)
if output_value != None:
if output_value == True:
output["match"] = True
log.print(log.debug, "MATCH")
else:
output["match"] = False
log.print(log.debug, "NO MATCH")
else:
log.print(log.error, log.color.blue + "output: ", rule["output"], log.color.red + "Not found!" + log.color.reset)
error = True
error_msg = "Error on finding output value {} in result variables for rule {}: ".format(rule["output"], rule["name"])
output["errors"].append(error_msg)
log.print(log.error, error_msg)
# optional percentage of rule matching
if "output_scalar" in rule:
if rule["output_scalar"] in locals:
log.print(log.debug, log.color.blue + "output_scalar: " + log.color.reset, rule["output_scalar"], type(locals[rule["output_scalar"]]))
log.print(log.debug, locals[rule["output_scalar"]])
if isinstance(locals[rule["output_scalar"]], prule.builtin.Data):
log.print(log.debug, locals[rule["output_scalar"]].array)
scalar_tmp = locals[rule["output_scalar"]].array
if hasattr(scalar_tmp, 'magnitude') == True:
scalar_tmp = scalar_tmp.magnitude
if len(scalar_tmp) == 1:
scalar_tmp = float(scalar_tmp)
output["scalar"] = scalar_tmp
else:
error = True
error_msg = "Error on converting scalar output value {} for rule {}: multiple values: {} in variable of type {}".format(rule["output_scalar"], rule["name"], len(scalar_tmp), type(scalar_tmp))
output["errors"].append(error_msg)
log.print(log.error, error_msg)
output["scalar"] = None
else:
try:
scalar_value = float(locals[rule["output_scalar"]])
output["scalar"] = scalar_value
except:
log.print(log.debug, log.color.blue + "output_scalar: ", rule["output_scalar"], log.color.red + "Unknown type "+ str(type(locals[rule["output_scalar"]])) + " " + log.color.reset)
else:
log.print(log.debug, log.color.blue + "output_scalar: ", rule["output_scalar"], log.color.red + "Not found!" + log.color.reset)
# render template
log.print_color(log.color.magenta, log.debug, "-"*25,"Render template","-"*25)
template_env = jinja2.Environment()
template = template_env.from_string(rule["template"])
try:
gen_str = template.render(globals|locals|{"rule":rule},float=float) #TODO: how to make all python builtins functions available to template?
if output["match"] == True:
output["template"] = gen_str
log.print(log.debug, gen_str)
except Exception as e:
error = True
error_msg = "Error while rendering template for rule {}".format(rule["name"])
output["errors"].append(error_msg)
log.print(log.warn, error_msg+" ERROR: "+str(e))
log.print(log.debug, "Template string:")
log.print(log.debug, rule["template"])
log.print_color(log.color.red, log.debug, "Template error: "+str(e))
output["error"] = error
return output