def _pad_cell(text, width, mode):
    """Pad *text* with spaces to *width*; right-align only if *mode* is "right"."""
    fill = " " * (width - len(text))
    return fill + text if mode == "right" else text + fill


def _cell_align(align, index):
    """Return the alignment for column *index* (*align* is a name or a per-column sequence)."""
    if align is None or isinstance(align, str):
        return align
    return align[index] if index < len(align) else None


def genTable(rows, header=None, subcol=None, align=None, margin=1, header_line="="):
    """Render a list of rows as a list of fixed-width text lines.

    Parameters:
        rows:        list of rows; each row is a list of cells (converted
                     with ``str``). Rows may have different lengths.
        header:      number of leading rows that are header rows. ``None``
                     or ``0`` means the table has no header; then no
                     separator line is emitted.
        subcol:      collection of data column indices that are joined to
                     the column on their right. Joined columns are separated
                     by "/" (centred in the margin) instead of blanks and
                     form one group that shares a single header cell.
                     ``None`` means no columns are joined.
        align:       ``None``/"left" or "right" for all cells, or a sequence
                     with one such value per data column. Any other value is
                     treated as "left".
        margin:      number of characters between neighbouring columns.
        header_line: character used for the separator line below the header.

    Returns:
        List of strings: header rows, one separator line (only if there is
        a header), then the data rows. An empty ``rows`` yields ``[]``.
    """
    if not rows:
        return []

    # None, 0 and negative values all mean "no header rows".
    header_rows = header if header is not None and header > 0 else 0
    joined = frozenset(subcol) if subcol is not None else frozenset()
    head = rows[:header_rows]
    body = rows[header_rows:]

    # Width of every data column, measured on the data rows only. Header
    # cells are accounted for per column group further below.
    n_columns = max(len(row) for row in rows)
    widths = [0] * n_columns
    for row in body:
        for cix, cell in enumerate(row):
            widths[cix] = max(widths[cix], len(str(cell)))

    groups = []        # data column indices covered by each header cell
    group_widths = []  # printed width of each header cell
    if header_rows:
        start_new = True
        for cix, width in enumerate(widths):
            if start_new:
                groups.append([])
                group_widths.append(0)
            groups[-1].append(cix)
            group_widths[-1] += width
            # A column listed in subcol keeps the group open for its
            # right-hand neighbour and contributes the separator width.
            start_new = cix not in joined
            if not start_new and cix < n_columns - 1:
                group_widths[-1] += margin
        for gix, members in enumerate(groups):
            wanted = max(
                (len(str(row[gix])) for row in head if gix < len(row)),
                default=0,
            )
            if wanted > group_widths[gix]:
                # Header text is wider than its group: widen the first
                # data column so header and data stay aligned.
                widths[members[0]] += wanted - group_widths[gix]
                group_widths[gix] = wanted
        # Header rows may carry more cells than there are groups; give the
        # surplus cells a zero width so they can still be printed.
        surplus = max(len(row) for row in head) - len(groups)
        group_widths.extend([0] * max(surplus, 0))

    plain_sep = " " * margin
    if margin > 0:
        mid = margin // 2
        joined_sep = plain_sep[:mid] + "/" + plain_sep[mid + 1:]
    else:
        joined_sep = plain_sep

    out = []
    for row in head:
        cells = []
        for gix, cell in enumerate(row):
            # A header cell follows the alignment of its first data column.
            first = groups[gix][0] if gix < len(groups) else gix
            cells.append(
                _pad_cell(str(cell), group_widths[gix], _cell_align(align, first))
            )
        out.append(plain_sep.join(cells))
    if header_rows:
        out.append(plain_sep.join(header_line * width for width in group_widths))

    for row in body:
        parts = []
        last = len(row) - 1
        for cix, cell in enumerate(row):
            parts.append(_pad_cell(str(cell), widths[cix], _cell_align(align, cix)))
            if cix < last:
                parts.append(joined_sep if cix in joined else plain_sep)
        out.append("".join(parts))
    return out
and account not in accounts_list: - # continue - #if users_list != None and user not in users_list: - # continue - if account not in results: - results[account] = {} - if user not in results[account]: - results[account][user] = [] - results[account][user].append(j) do_overlap = False if 'summary_overlap' in args: @@ -495,73 +523,161 @@ def summary_cluster(db_con, cluster, users_list, accounts_list, args): rule_names[rules_dict[n]-1] = n.replace(' ','_') rule_names_match = ["rule_{:}_match".format(rule_i) for rule_i in range(1, rules_len+1)] - c_total_jobs = 0 - c_total_cpuh = 0.0 - # columns: account/user, total (jobs/cpuh), matched (jobs/cpuh), rule(number/cpuh) - accounts = sorted(results.keys()) - account_rows = [] - for a in accounts: - users = sorted(results[a].keys()) - a_total_jobs = 0 - a_matched_jobs = 0 - a_total_cpuh = 0 - a_matched_cpuh = 0 - user_rows = [] - for u in users: - u_total_jobs = 0 - u_matched_jobs = 0 - u_total_cpuh = 0 - u_matched_cpuh = 0 - rule_total = [(0.0,0.0)]*rules_len - for j in results[a][u]: - sec = j['duration'] - if do_overlap == True: # only consider overlap - j_start = j['start'] - j_stop = j['stop'] - if j_start < stop_ts and j_stop > start_ts: - if j_start < start_ts or j_stop > stop_ts: - o_start = max(j_start, start_ts) - o_stop = min(j_stop, stop_ts) - sec = o_stop - o_start + sort_column = args["sort"] if "sort" in args else None + sort_reverse = False if "sort_reverse" in args else True + + def add_vec(a,b): + if len(a) == 0: + return copy.copy(b) + c = [] + for ix in range(len(a)): + if type(a[ix]) == tuple: + c.append(tuple( [ a[ix][jx] + b[ix][jx] for jx in range(len(a[ix])) ] )) + else: + c.append(a[ix] + b[ix]) + return c + def job_time(job, start, stop, overlap): + sec = job['duration'] + if overlap == True: + j_start = job['start'] + j_stop = job['stop'] + if j_start < stop_ts and j_stop > start_ts: + if j_start < start_ts or j_stop > stop_ts: + o_start = max(j_start, start_ts) + o_stop = min(j_stop, stop_ts) + 
sec = o_stop - o_start + else: + # job duration falls completly into timeframe, use normal duration + pass + else: + sec = 0.0 + return sec + def job_matched(job, c_time, r_time): + rule_vec = [] + matches = 0 + for rix,r in enumerate(rule_names_match): + match = job[r] + if match == 1: + matches += 1 + rule_vec.append((1, c_time, r_time)) + else: + rule_vec.append((0, 0.0, 0.0)) + return matches, rule_vec + def job_vec(job, start, stop, overlap): + # total_jobs, total_cpuh, total_resh, matched_jobs, matched_cpuh, matched_resh + j_time = job_time(job, start, stop, overlap) + j_time = j_time/3600.0 # cpu seconds to cpu hours + c_time = job["num_hwthreads"] * j_time + r_time = job["num_acc"] * j_time + matches, rule_vec = job_matched(job, c_time, r_time) + matched = 1 if matches > 0 else 0 + v = [1, c_time, r_time, matched, matched * c_time, matched * r_time] + rule_vec + return v + class Group: + def __init__(self, name): + self.name = name + self.subgroups = {} + self.jobs = [] + self.vec = [] + def add(self, job, vec): + raise Exception("Group.add not implemented") + def print_vec(self): + r = [self.name] + for c in self.vec: + if type(c) == float: + r.append("{:.2f}".format(c)) + elif type(c) == tuple: + allzero = True + t = [] + for j in c: + if type(j) == float: + allzero = allzero and j == 0.0 + t.append("{:.2f}".format(j)) + elif type(j) == int: + allzero = allzero and j == 0 + t.append(str(j)) else: - # job duration falls completly into timeframe, use normal duration - pass + allzero = False + t.append(str(j)) + if allzero == True: + r += [""]*len(t) else: - sec = 0.0 - hwt = j['num_hwthreads'] - cpuh = hwt * (sec/3600.0) - matches = 0 - for rix,r in enumerate(rule_names_match): - match = j[r] - if match == 1: - matches += 1 - rcount, rcpuh = rule_total[rix] - rule_total[rix] = (rcount+1, rcpuh+cpuh) - if matches > 0: - u_matched_jobs += 1 - u_matched_cpuh += cpuh - u_total_jobs += 1 - u_total_cpuh += cpuh - a_total_jobs += u_total_jobs - a_matched_jobs += 
u_matched_jobs - a_total_cpuh += u_total_cpuh - a_matched_cpuh += u_matched_cpuh - rule_columns = [ "{:}/{:.2f}".format(rjobs,rcoreh) for rjobs,rcoreh in rule_total] - user_rows.append([u, "{:}/{:.2f}".format(u_total_jobs, u_total_cpuh), "{:}/{:.2f}".format(u_matched_jobs, u_matched_cpuh)] + rule_columns) - account_rows.append([a, "{:}/{:.2f}".format(a_total_jobs, a_total_cpuh), "{:}/{:.2f}".format(a_matched_jobs, a_matched_cpuh)]) - account_rows += user_rows - c_total_jobs += a_total_jobs - c_total_cpuh += a_total_cpuh - # print header - header = ["account/user", "total (jobs/cpuh)", "matched (jobs/cpuh)"] + rule_names - # print rows - cluster_times = "Earliest: {:} {:} Latest: {:} {:}".format( - datetime.datetime.fromtimestamp(job_started_earliest['start']), - datetime.datetime.fromtimestamp(job_started_earliest['stop']), - datetime.datetime.fromtimestamp(job_finished_latest['start']), - datetime.datetime.fromtimestamp(job_finished_latest['stop'])) - print("Summary:", cluster, start, stop, "Total jobs: {:}".format(c_total_jobs), "Total cpuh: {:.2f}".format(c_total_cpuh), cluster_times) - out = genTable([header]+account_rows, header=1) + r += t + elif type(c) == int: + r.append(str(c)) + else: + r.append(str(c)) + return r + def print_rows(self, sortcol=None, sortrev=True): + r = [self.print_vec()] + subg = self.subgroups.values() + if sortcol != None: + subg = sorted(subg, reverse=sortrev, key=lambda s: ([s.name]+s.vec)[sortcol]) + for g in subg: + r += g.print_rows(sortcol=sortcol, sortrev=sortrev) + return r + class ClusterGroupAcc(Group): + def add(self, job, vec): + self.vec = add_vec(self.vec, vec) + self.jobs.append(job) + account = job['project'] + if account not in self.subgroups: + self.subgroups[account] = AccountGroup(account) + self.subgroups[account].add(job, vec) + class ClusterGroupUse(Group): + def add(self, job, vec): + self.vec = add_vec(self.vec, vec) + self.jobs.append(job) + user = job['user'] + if user not in self.subgroups: + 
self.subgroups[user] = UserGroup(user) + self.subgroups[user].add(job, vec) + class AccountGroup(Group): + def add(self, job, vec): + self.vec = add_vec(self.vec, vec) + self.jobs.append(job) + user = job['user'] + if user not in self.subgroups: + self.subgroups[user] = UserGroup(user) + self.subgroups[user].add(job, vec) + class UserGroup(Group): + def add(self, job, vec): + self.vec = add_vec(self.vec, vec) + self.jobs.append(job) + + grouping = "group_acc" in args + if grouping == True: + cgroup = ClusterGroupAcc(cluster) + else: + cgroup = ClusterGroupUse(cluster) + + results_it = db_con.db_get_all_results(conditions=cond, iterator=True) + results = {} # account -> user -> job + for j in results_it: + account = j['project'] + user = j['user'] + #filters should work with SQL conditions + #if accounts_list != None and account not in accounts_list: + # continue + #if users_list != None and user not in users_list: + # continue + #if account not in results: + # results[account] = {} + #if user not in results[account]: + # results[account][user] = [] + #results[account][user].append(j) + j_vec = job_vec(j, start, stop, do_overlap) + cgroup.add(j, j_vec) + + r = cgroup.print_rows(sortcol=sort_column, sortrev=sort_reverse) + header = ["account/user", "total (jobs/cpuh/resh)", "matched (jobs/cpuh/resh)"] + rule_names + align = ["left"] + (["right"]*(6 + (3*len(rule_names)))) + subcol_1 = set([1,2,4,5]) + subcol_2 = set(range(7, 7+(3*len(rule_names)))) + subcol_3 = set(range(7+2, 7+(3*len(rule_names))+2, 3)) + subcol = subcol_1.union( subcol_2.difference(subcol_3) ) + out = genTable([header]+r, header=1, subcol=subcol, align="right") + #out = genTable([header]+account_rows, header=1) for l in out: print(l) @@ -618,6 +734,12 @@ if __name__ == "__main__": help="Count affected job numbers as job count or as cpu hours.") summary_group.add_argument('--summary-overlap', action='store_true', help="Only consider cpu hours that overlap with timeframe.") + 
summary_group.add_argument('--sort', type=int, metavar='COLUMN', + help="Sort by column index (starting with 0).") + summary_group.add_argument('--sort-reverse', action='store_true', + help="Sort ascending instead of descending.") + summary_group.add_argument('--group-acc', action='store_true', + help="Group by account.") # svg_group = parser.add_argument_group('SVG parameters', # 'Configure SVG output.') -- GitLab