Commit 7a0bdba9 authored by Alex Wiens

prule.db,summary: Update summary

parent da2a23a8
@@ -4,16 +4,32 @@ Common database functions for PRULE results database.
import datetime
import sqlite3
import os   # used by MultiDb.init_thread below
import prule.debug
import threading
import concurrent.futures
class ResultsDB:
    def __init__(self, path, trace_sql=None, ro=False):
        self.path = path
        # check sqlite3.threadsafety to see if check_same_thread can be safely deactivated
        if ro == True:
            #self.con = sqlite3.connect("file:{}?mode=ro&immutable=1&nolock=1".format(path), uri=True)
            # uri=True is required so sqlite3 interprets the "file:" string as a URI
            self.con = sqlite3.connect("file:{}?mode=ro&immutable=1".format(path), uri=True)
        else:
            self.con = sqlite3.connect(path)
        if trace_sql != None:
            self.con.set_trace_callback(trace_sql)
        #self.config()
        self.rule_dict = None
    def close(self):
        if self.con != None:
            self.con.close()
            self.con = None
    def config(self):
        cur = self.con.cursor()
        cur.execute("PRAGMA cache_size=10000;")
        self.con.commit()
    def db_check(self):
        cur = self.con.cursor()
        res = cur.execute("SELECT name FROM sqlite_master")
@@ -365,3 +381,60 @@ rulename TEXT
        res = cur.execute('SELECT COUNT(*) FROM jobs where evaluated=1 and cluster=?;', [cluster])
        evaluated = res.fetchall()[0][0]
        return (evaluated, rule_match, rule_eval, rule_error)
"""
# Use separate connection
mult = MultiDb(db_con)
future_result = mult.submitobj("all_results", db_con, prule.db.ResultsDB.db_get_all_results, columns=['COUNT(ccid)'], transform=False)
result = future_result.result()
# Use same connection
mult = MultiDb(db_con)
future_result = mult.submit("non_processed", db_con.db_get_all_results, columns=['COUNT(processed)'], conditions=["processed != 1"], transform=False)
result = future_result.result()
# Multiple queries
mult = MultiDb(db_con)
queries = {}
queries["one"] = mult.submit(...)
queries["two"] = mult.submit(...)
concurrent.futures.wait(queries.values())
one = queries["one"].result()
two = queries["two"].result()
# don't forget to close the pool
mult.shutdown()
"""
class MultiDb:
    def __init__(self, db):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4, initializer=MultiDb.init_thread)
        self.db = db
    def init_thread():
        pid = os.getpid()
        ident = threading.get_native_id()
        aff = None
        if hasattr(os, "sched_getaffinity"):
            aff = os.sched_getaffinity(0)
        print(pid, ident, aff)
    def measure(name, fun, args, kwargs):
        with prule.debug.Timing(name, True):
            return fun(*args, **kwargs)
    def measureobj(name, obj, fun, args, kwargs):
        with prule.debug.Timing(name, True):
            return fun(obj, *args, **kwargs)
    def submit(self, name, fun, *args, **kwargs):
        #fut = self.executor.submit(fun, arg)
        fut = self.executor.submit(MultiDb.measure, name, fun, args, kwargs)
        return fut
    def submitobj(self, name, obj, fun, *args, **kwargs):
        #fut = self.executor.submit(fun, arg)
        newdb = prule.db.ResultsDB(self.db.path, ro=True)
        fut = self.executor.submit(MultiDb.measureobj, name, newdb, fun, args, kwargs)
        return fut
    def shutdown(self):
        self.executor.shutdown(wait=False)
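# Design note: submitobj opens a fresh read-only ResultsDB per submitted task because a
# sqlite3 connection is, by default (check_same_thread=True), only usable from the thread
# that created it; each ThreadPoolExecutor worker therefore gets its own connection
# (see the threadsafety comment in ResultsDB.__init__).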
@@ -10,39 +10,47 @@ import re
import sqlite3
#from .svg import svg_gen_daily_summary
import prule.db
import prule.debug
#TODO: sorting
#TODO: make grouping optional
#TODO: resource hours
helptext="""Usage:
The program reads entries from the prule database file and creates summaries.
There are multiple parameters to control how the summary is done.

1. Time
By default, the 7 days before the end time of the last job in the database are considered.
You can specify a different duration with --duration or use --start/--stop.

Combinations:
--duration 14-00                     - Last 14 days of the database.
--start 2024-12-01                   - From December 2024 until the end of the database.
--stop 2025-01-01                    - From the start of the database until the end of 2024.
--start 2024-12-01 --duration 7-00   - First week of December 2024.
--stop 2024-12-01 --duration 0-12    - Last 12 hours of November 2024.
--start 2024-12-01 --stop 2024-12-02 - First day of December 2024.

2. Filters
Use --users, --accounts, --clusters to limit the summary to the specified groups.

Examples:
--users usrtr001
--users usrtr001,johndoe
--clusters noctua1
--clusters noctua1,noctua2
--accounts hpc-lco-usrtr
--accounts hpc-lco-usrtr,pc2-shks

3. Details
Use --summary-overlap to count only the cpu hours that overlap with the timeframe.

4. Grouping
TODO
"""

# non-capturing groups
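# Worked example for --summary-overlap (illustrative numbers, not taken from a database):
# a job with 128 hardware threads that ran for 6 hours, of which 3 hours fall inside the
# summary timeframe, contributes 128 * 3 = 384 cpuh when --summary-overlap is set, and its
# full 128 * 6 = 768 cpuh otherwise; jobs that do not overlap the timeframe at all are not
# selected in the first place.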
@@ -184,13 +192,185 @@ def test_parse_timeframe():
#test_parse_timeframe()
# Acceptable time formats include "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds".
re_num = "(?:[0-9]+)"
re_timeduration_min = re.compile("({minute:})".format(minute=re_num))
re_timeduration_minsec = re.compile("({minutes:}):({seconds:})".format(minutes=re_num, seconds=re_num))
re_timeduration_hourminsec = re.compile("({hours:}):({minutes:}):({seconds:})".format(hours=re_num, minutes=re_num, seconds=re_num))
re_timeduration_dayhour = re.compile("({days:})-({hours:})".format(days=re_num, hours=re_num))
re_timeduration_dayhourmin = re.compile("({days:})-({hours:}):({minutes:})".format(days=re_num, hours=re_num, minutes=re_num))
re_timeduration_dayhourminsec = re.compile("({days:})-({hours:}):({minutes:}):({seconds:})".format(days=re_num, hours=re_num, minutes=re_num, seconds=re_num))
def parse_timeduration(input):
    input = input.lower().strip()
    match = re_timeduration_min.fullmatch(input)
    if match:
        minutes, = match.groups()
        return datetime.timedelta(minutes=int(minutes))
    match = re_timeduration_minsec.fullmatch(input)
    if match:
        minutes, seconds = match.groups()
        return datetime.timedelta(minutes=int(minutes), seconds=int(seconds))
    match = re_timeduration_hourminsec.fullmatch(input)
    if match:
        hours, minutes, seconds = match.groups()
        return datetime.timedelta(hours=int(hours), minutes=int(minutes), seconds=int(seconds))
    match = re_timeduration_dayhour.fullmatch(input)
    if match:
        days, hours = match.groups()
        return datetime.timedelta(days=int(days), hours=int(hours))
    match = re_timeduration_dayhourmin.fullmatch(input)
    if match:
        days, hours, minutes = match.groups()
        return datetime.timedelta(days=int(days), hours=int(hours), minutes=int(minutes))
    match = re_timeduration_dayhourminsec.fullmatch(input)
    if match:
        days, hours, minutes, seconds = match.groups()
        return datetime.timedelta(days=int(days), hours=int(hours), minutes=int(minutes), seconds=int(seconds))
    raise Exception("Time duration '{:}' could not be parsed.".format(input))
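# Hypothetical usage sketch (the helper name below is illustrative, not part of the module):
def _example_parse_timeduration():
    # formats follow the comment above; results are plain datetime.timedelta values
    assert parse_timeduration("90") == datetime.timedelta(minutes=90)
    assert parse_timeduration("1:30") == datetime.timedelta(minutes=1, seconds=30)
    assert parse_timeduration("14-00") == datetime.timedelta(days=14)
    assert parse_timeduration("2-12:30") == datetime.timedelta(days=2, hours=12, minutes=30)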
def genTable(table, header=None, align=None, margin=1, header_line="="):
    #columns = len(table[0])
    columns = 0
    for l in table:
        columns = max(columns, len(l))
    cmax = [0]*columns
    for row in table:
        for cix,col in enumerate(row):
            cmax[cix] = max(cmax[cix], len(str(col)))
    out = []
    for rix,row in enumerate(table):
        l = ""
        for cix,col in enumerate(row):
            if align==None or align=="left":
                l += str(col) + " "*(cmax[cix]-len(str(col)))
            elif align=="right":
                l += " "*(cmax[cix]-len(str(col))) + str(col)
            if cix < len(row)-1:
                l += " "*margin
        out += [l]
        if header != None and rix == header-1:
            l = ""
            for cix, col in enumerate(cmax):
                l += header_line*col
                if cix < len(row)-1:
                    l += " "*margin
            out += [l]
    return out
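# Hypothetical usage sketch for genTable (values illustrative only):
#   genTable([["account/user", "total"], ["acc1", "12/34.00"], ["usrtr001", "3/4.50"]], header=1)
# returns a list of aligned lines, roughly:
#   account/user total
#   ============ ========
#   acc1         12/34.00
#   usrtr001     3/4.50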
#def analyse_user(user_name, jobs):
def info_print(db_con, args):
    print("Database: {}".format(args["db_path"]))
    if args["db_path"] != db_con.path:
        print("DB Copy: {}".format(db_con.path))
    with prule.debug.Timing("info_print db_get_clusters", prule.debug.debug_settings['print_timings']):
        clusters = db_con.db_get_clusters()
    print("Clusters: {} {}".format(len(clusters), ",".join(clusters)))
    with prule.debug.Timing("info_print db_get_users", prule.debug.debug_settings['print_timings']):
        users_all = db_con.db_get_users()
    print("Users: {}".format(len(users_all)))
    with prule.debug.Timing("info_print db_get_projects", prule.debug.debug_settings['print_timings']):
        projects_all = db_con.db_get_projects()
    print("Projects: {}".format(len(projects_all)))
    with prule.debug.Timing("info_print jobs_count", prule.debug.debug_settings['print_timings']):
        jobs_count = db_con.db_get_all_results(columns=['COUNT(ccid)'], transform=False)[0][0]
    print("Jobs: {}".format(jobs_count))
    with prule.debug.Timing("info_print non_processed", prule.debug.debug_settings['print_timings']):
        non_processed = db_con.db_get_all_results(columns=['COUNT(processed)'], conditions=["processed != 1"], transform=False)[0][0]
    print("Non-processed: {}".format(non_processed))
    with prule.debug.Timing("info_print non_evaluated", prule.debug.debug_settings['print_timings']):
        non_evaluated = db_con.db_get_all_results(columns=['COUNT(evaluated)'], conditions=["evaluated != 1"], transform=False)[0][0]
    print("Non-evaluated: {}".format(non_evaluated))
    with prule.debug.Timing("info_print db_first_result", prule.debug.debug_settings['print_timings']):
        firstres = db_con.db_first_result()
    if len(firstres) > 0:
        firstres = firstres[0]
        start = firstres["start"]
        stop = firstres["stop"]
        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
        print("First job: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
    with prule.debug.Timing("info_print db_last_result", prule.debug.debug_settings['print_timings']):
        lastres = db_con.db_last_result()
    if len(lastres) > 0:
        lastres = lastres[0]
        start = lastres["start"]
        stop = lastres["stop"]
        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
        print("Last job: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
    with prule.debug.Timing("info_print started_earliest", prule.debug.debug_settings['print_timings']):
        started_earliest = db_con.db_get_all_results(columns=['start','stop','MIN(start)'], conditions=["start IS NOT NULL"])[0]
    if started_earliest != None:
        start = started_earliest["start"]
        stop = started_earliest["stop"]
        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
        print("Earliest start: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
    with prule.debug.Timing("info_print started_latest", prule.debug.debug_settings['print_timings']):
        started_latest = db_con.db_get_all_results(columns=['start','stop','MAX(start)'], conditions=["start IS NOT NULL"])[0]
    if started_latest != None:
        start = started_latest["start"]
        stop = started_latest["stop"]
        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
        print("Latest start: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
    with prule.debug.Timing("info_print finished_earliest", prule.debug.debug_settings['print_timings']):
        finished_earliest = db_con.db_get_all_results(columns=['start','stop','ccid','MIN(stop)'], conditions=["stop IS NOT NULL"])[0]
    if finished_earliest != None:
        start = finished_earliest["start"]
        stop = finished_earliest["stop"]
        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
        ccid = finished_earliest["ccid"]
        print("Earliest stop: {} - {} (start: {} stop: {} ccid: {})".format(start_s, stop_s, start, stop, ccid))
    with prule.debug.Timing("info_print finished_latest", prule.debug.debug_settings['print_timings']):
        finished_latest = db_con.db_get_all_results(columns=['start','stop','ccid','MAX(stop)'], conditions=["stop IS NOT NULL"])[0]
    if finished_latest != None:
        start = finished_latest["start"]
        stop = finished_latest["stop"]
        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
        ccid = finished_latest["ccid"]
        print("Latest stop: {} - {} (start: {} stop: {} ccid: {})".format(start_s, stop_s, start, stop, ccid))
    for cluster in clusters:
        with prule.debug.Timing("info_print db_get_users", prule.debug.debug_settings['print_timings']):
            users = db_con.db_get_all_results(columns=['COUNT(DISTINCT user)'], conditions=['cluster = "{}"'.format(cluster)], transform=False)[0][0]
        with prule.debug.Timing("info_print db_get_projects", prule.debug.debug_settings['print_timings']):
            projects = db_con.db_get_all_results(columns=['COUNT(DISTINCT project)'], conditions=['cluster = "{}"'.format(cluster)], transform=False)[0][0]
        print("Cluster {}: Users {} Projects {}".format(cluster, users, projects))
def main(db_path, args):
    with prule.debug.Timing("main.ResultsDB", prule.debug.debug_settings['print_timings']):
        trace_sql = None
        if 'print_sql' in prule.debug.debug_settings:
            trace_sql = lambda s : print("SQL: {}".format(s), file=sys.stderr)
        db_con = prule.db.ResultsDB(db_path, trace_sql=trace_sql, ro=True)
    try:
        with prule.debug.Timing("main.db_integrity_check", prule.debug.debug_settings['print_timings']):
            integrity_res = db_con.db_integrity_check()
        if integrity_res == False:
            print("Database failed integrity check: {:}".format(db_path))
@@ -202,86 +382,112 @@ def main(db_path, args):
def main_db(db_con, args):
    #signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM])
    if "info" in args:
        with prule.debug.Timing("info_print", prule.debug.debug_settings['print_timings']):
            info_print(db_con, args)
        return

    # which clusters to consider?
    clusters = []
    if "clusters" in args:
        clusters = [c.strip() for c in args["clusters"].split(",")]
    else:
        clusters = db_con.db_get_clusters()
    # which users to consider?
    users = None # all users
    if "users" in args:
        users = [u.strip() for u in args["users"].split(",")]
    # which accounts to consider?
    accounts = None # all accounts
    if "accounts" in args:
        accounts = [a.strip() for a in args["accounts"].split(",")]

    jobs = {}
    for cluster in clusters:
        with prule.debug.Timing("summary_cluster", prule.debug.debug_settings['print_timings']):
            jobs[cluster] = summary_cluster(db_con, cluster, users, accounts, args)

    #if 'svg_file' in args:
    #    svg_gen_daily_summary(db_con, jobs, args)
    #else:
    #    summary_print(db_con, jobs, args)
def summary_cluster(db_con, cluster, users_list, accounts_list, args):
    # check timeframe for each cluster separately
    # start of first job
    job_started_earliest = db_con.db_get_all_results(columns=['start','stop','MIN(start)'], conditions=["start IS NOT NULL", "cluster = '{}'".format(cluster)])[0]
    first_start = datetime.datetime.fromtimestamp(job_started_earliest['start'])
    # end of last job
    job_finished_latest = db_con.db_get_all_results(columns=['start','stop','ccid','MAX(stop)'], conditions=["stop IS NOT NULL", "cluster = '{}'".format(cluster)])[0]
    last_stop = datetime.datetime.fromtimestamp(job_finished_latest['stop'])

    # which timeframe to consider?
    start = None
    stop = None
    if 'duration' in args:
        duration = parse_timeduration(args['duration'])
        if 'start' in args:
            start = parse_timeframe(args['start'])
            stop = start + duration
        elif 'stop' in args:
            stop = parse_timeframe(args['stop'])
            start = stop - duration
        else:
            start = last_stop - duration
            stop = last_stop
    elif 'start' in args or 'stop' in args:
        start = first_start
        stop = last_stop
        if 'start' in args:
            start = parse_timeframe(args['start'])
        if 'stop' in args:
            stop = parse_timeframe(args['stop'])
    else:
        # default: 7 days before last job ended
        duration = datetime.timedelta(days=7)
        start = last_stop - duration
        stop = last_stop
    if start == None or stop == None:
        raise Exception("Start or stop missing. This shouldn't have happened.")

    start_ts = int(start.timestamp())
    stop_ts = int(stop.timestamp())

    cond = []
    cond.append('stop >= {}'.format(start_ts)) # consider overlap
    cond.append('start <= {}'.format(stop_ts)) # consider overlap
    cond.append("cluster = '{}'".format(cluster))
    if users_list != None:
        cond.append("user IN ({:})".format(",".join(["\"{:}\"".format(u) for u in users_list])))
    if accounts_list != None:
        cond.append("project IN ({:})".format(",".join(["\"{:}\"".format(a) for a in accounts_list])))
    results_it = db_con.db_get_all_results(conditions=cond, iterator=True)
    results = {} # account -> user -> job
    for j in results_it:
        account = j['project']
        user = j['user']
        # filters should work with SQL conditions
        #if accounts_list != None and account not in accounts_list:
        #    continue
        #if users_list != None and user not in users_list:
        #    continue
        if account not in results:
            results[account] = {}
        if user not in results[account]:
            results[account][user] = []
        results[account][user].append(j)

    do_overlap = False
    if 'summary_overlap' in args:
        do_overlap = True

    rules_dict = db_con.db_get_rule_dict()
    rules_len = len(rules_dict)
    rule_names = [""]*rules_len
@@ -289,73 +495,75 @@ def summary_print(db_con, jobs, args):
        rule_names[rules_dict[n]-1] = n.replace(' ','_')
    rule_names_match = ["rule_{:}_match".format(rule_i) for rule_i in range(1, rules_len+1)]

    c_total_jobs = 0
    c_total_cpuh = 0.0
    # columns: account/user, total (jobs/cpuh), matched (jobs/cpuh), rule (number/cpuh)
    accounts = sorted(results.keys())
    account_rows = []
    for a in accounts:
        users = sorted(results[a].keys())
        a_total_jobs = 0
        a_matched_jobs = 0
        a_total_cpuh = 0
        a_matched_cpuh = 0
        user_rows = []
        for u in users:
            u_total_jobs = 0
            u_matched_jobs = 0
            u_total_cpuh = 0
            u_matched_cpuh = 0
            rule_total = [(0.0,0.0)]*rules_len
            for j in results[a][u]:
                sec = j['duration']
                if do_overlap == True: # only consider overlap
                    j_start = j['start']
                    j_stop = j['stop']
                    if j_start < stop_ts and j_stop > start_ts:
                        if j_start < start_ts or j_stop > stop_ts:
                            o_start = max(j_start, start_ts)
                            o_stop = min(j_stop, stop_ts)
                            sec = o_stop - o_start
                        else:
                            # job duration falls completely into timeframe, use normal duration
                            pass
                    else:
                        sec = 0.0
                hwt = j['num_hwthreads']
                cpuh = hwt * (sec/3600.0)
                matches = 0
                for rix,r in enumerate(rule_names_match):
                    match = j[r]
                    if match == 1:
                        matches += 1
                        rcount, rcpuh = rule_total[rix]
                        rule_total[rix] = (rcount+1, rcpuh+cpuh)
                if matches > 0:
                    u_matched_jobs += 1
                    u_matched_cpuh += cpuh
                u_total_jobs += 1
                u_total_cpuh += cpuh
            a_total_jobs += u_total_jobs
            a_matched_jobs += u_matched_jobs
            a_total_cpuh += u_total_cpuh
            a_matched_cpuh += u_matched_cpuh
            rule_columns = ["{:}/{:.2f}".format(rjobs,rcoreh) for rjobs,rcoreh in rule_total]
            user_rows.append([u, "{:}/{:.2f}".format(u_total_jobs, u_total_cpuh), "{:}/{:.2f}".format(u_matched_jobs, u_matched_cpuh)] + rule_columns)
        account_rows.append([a, "{:}/{:.2f}".format(a_total_jobs, a_total_cpuh), "{:}/{:.2f}".format(a_matched_jobs, a_matched_cpuh)])
        account_rows += user_rows
        c_total_jobs += a_total_jobs
        c_total_cpuh += a_total_cpuh

    # print header
    header = ["account/user", "total (jobs/cpuh)", "matched (jobs/cpuh)"] + rule_names
    # print rows
    cluster_times = "Earliest: {:} {:} Latest: {:} {:}".format(
        datetime.datetime.fromtimestamp(job_started_earliest['start']),
        datetime.datetime.fromtimestamp(job_started_earliest['stop']),
        datetime.datetime.fromtimestamp(job_finished_latest['start']),
        datetime.datetime.fromtimestamp(job_finished_latest['stop']))
    print("Summary:", cluster, start, stop, "Total jobs: {:}".format(c_total_jobs), "Total cpuh: {:.2f}".format(c_total_cpuh), cluster_times)
    out = genTable([header]+account_rows, header=1)
    for l in out:
        print(l)
if __name__ == "__main__": if __name__ == "__main__":
@@ -381,28 +589,45 @@ if __name__ == "__main__":
        "You may use this, if you are confident that file locking works with your file system. "+
        "NFS apparently does have problems with that.")
    info_group = parser.add_argument_group('Info parameter',
        'Info about the database file')
    info_group.add_argument('--info', action='store_true',
        help="Show general information about the file and quit.")

    time_group = parser.add_argument_group('Time parameters',
        'Configure the timeframe for summary. Only two of start, stop, duration can be used at once. Timeframe consideration is independent of users or accounts. Default is last 7 days of job data.')
    time_group.add_argument('--duration', type=str, metavar='DURATION',
        help="Duration of summary timeframe. Without start/stop it means DURATION before the end of job data.")
    time_group.add_argument('--start', type=str, metavar='START',
        help="Start datetime of summary timeframe.")
    time_group.add_argument('--stop', type=str, metavar='STOP',
        help="Stop datetime of summary timeframe.")
    summary_group = parser.add_argument_group('Summary parameters',
        'Configure the summary.')
    summary_group.add_argument('--accounts', type=str, metavar='ACCOUNTS',
        help="Accounts in summary. (Comma separated list)")
    summary_group.add_argument('--users', type=str, metavar='USERS',
        help="Users in summary. (Comma separated list)")
    summary_group.add_argument('--clusters', type=str, metavar='CLUSTERS',
        help="Clusters in summary. (Comma separated list)")
    summary_group.add_argument('--summary-value', type=str, choices=['jobs','cpuh'], default='jobs',
        help="Count affected job numbers as job count or as cpu hours.")
    summary_group.add_argument('--summary-overlap', action='store_true',
        help="Only consider cpu hours that overlap with timeframe.")
    # svg_group = parser.add_argument_group('SVG parameters',
    #     'Configure SVG output.')
    # svg_group.add_argument('--svg-file', type=str,
    #     help="Create SVG and store it at the given filepath.")

    debug_group = parser.add_argument_group('Debug arguments',
        '')
    debug_group.add_argument('--print-timings', action='store_true', help="Print timings")
    debug_group.add_argument('--print-sql', action='store_true', help="Print SQL queries")
    #summary_group.add_argument('--csv', action='store_true',
    #    help="Output summary as CSV. First line is the header line.")
@@ -413,6 +638,19 @@ if __name__ == "__main__":
        print(args)
        sys.exit(1)
    # if 'summary_jobs_timeframe' not in args and 'summary_jobs_count' not in args and 'info' not in args:
    #     args['summary_jobs_timeframe'] = '7-00'

    if 'duration' in args and 'start' in args and 'stop' in args:
        print("Parameters `duration`, `start` and `stop` can't be used together.", file=sys.stderr)
        sys.exit(1)

    prule.debug.debug_settings = {'print_timings':False}
    if "print_timings" in args:
        prule.debug.debug_settings['print_timings'] = True
    if "print_sql" in args:
        prule.debug.debug_settings['print_sql'] = True
    real_db_path = args['db_path']
    if 'no_db_copy' in args:
...