diff --git a/prule/db/__init__.py b/prule/db/__init__.py
index a4d341bc3e09262dd556315ef59a374e0c2f2843..2b2e567fb468309a539a572444b9e34a78b18bdc 100644
--- a/prule/db/__init__.py
+++ b/prule/db/__init__.py
@@ -4,16 +4,33 @@ Common database functions for PRULE results database.
 import datetime
 import sqlite3
 
+import prule.debug
+
+import os
+import threading
+import concurrent.futures
 
 class ResultsDB:
-    def __init__(self, path):
+    def __init__(self, path, trace_sql=None, ro=False):
         self.path = path
-        self.con = sqlite3.connect(path)
+        # check sqlite3.threadsafety to see if check_same_thread can be safely deactivated
+        if ro == True:
+            #self.con = sqlite3.connect("file:{}?mode=ro&immutable=1&nolock=1".format(path), uri=True)
+            self.con = sqlite3.connect("file:{}?mode=ro&immutable=1".format(path), uri=True)
+        else:
+            self.con = sqlite3.connect(path)
+        if trace_sql != None:
+            self.con.set_trace_callback(trace_sql)
+        #self.config()
         self.rule_dict = None
     def close(self):
         if self.con != None:
             self.con.close()
             self.con = None
+    def config(self):
+        cur = self.con.cursor()
+        cur.execute("PRAGMA cache_size=10000;")
+        self.con.commit()
     def db_check(self):
         cur = self.con.cursor()
         res = cur.execute("SELECT name FROM sqlite_master")
@@ -365,3 +382,60 @@ rulename TEXT
         res = cur.execute('SELECT COUNT(*) FROM jobs where evaluated=1 and cluster=?;', [cluster])
         evaluated = res.fetchall()[0][0]
         return (evaluated, rule_match, rule_eval, rule_error)
+
+"""
+# Use separate connection
+mult = MultiDb(db_con)
+future_result = mult.submitobj("all_results", db_con, prule.db.ResultsDB.db_get_all_results, columns=['COUNT(ccid)'], transform=False)
+result = future_result.result()
+
+# Use same connection
+mult = MultiDb(db_con)
+future_result = mult.submit("non_processed", db_con.db_get_all_results, columns=['COUNT(processed)'], conditions=["processed != 1"], transform=False)
+result = future_result.result()
+
+# Multiple queries
+mult = MultiDb(db_con)
+queries = {}
+queries["one"] = mult.submit(...)
+queries["two"] = mult.submit(...)
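+# wait for all submitted queries to finish, then fetch the individual results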
+concurrent.futures.wait(queries.values())
+one = queries["one"].result()
+two = queries["two"].result()
+
+# don't forget to close the pool
+mult.shutdown()
+
+"""
+class MultiDb:
+    def __init__(self, db):
+        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4, initializer=MultiDb.init_thread)
+        self.db = db
+    def init_thread():
+        pid = os.getpid()
+        ident = threading.get_native_id()
+        aff = None
+        if hasattr(os, "sched_getaffinity"):
+            aff = os.sched_getaffinity(0)
+        print(pid, ident, aff)
+    def measure(name, fun, args, kwargs):
+        with prule.debug.Timing(name, True):
+            return fun(*args, **kwargs)
+    def measureobj(name, obj, fun, args, kwargs):
+        with prule.debug.Timing(name, True):
+            return fun(obj, *args, **kwargs)
+    def submit(self, name, fun, *args, **kwargs):
+
+        #fut = self.executor.submit(fun, arg)
+
+        fut = self.executor.submit(MultiDb.measure, name, fun, args, kwargs)
+        return fut
+    def submitobj(self, name, obj, fun, *args, **kwargs):
+
+        #fut = self.executor.submit(fun, arg)
+        newdb = prule.db.ResultsDB(self.db.path, ro=True)
+        fut = self.executor.submit(MultiDb.measureobj, name, newdb, fun, args, kwargs)
+        return fut
+    def shutdown(self):
+        self.executor.shutdown(wait=False)
+
diff --git a/prule/summary/__main__.py b/prule/summary/__main__.py
index 35394e429dab6659daebd659d5f2e3cf459e9f90..c9b90542b4433255582235f38780067053011ede 100644
--- a/prule/summary/__main__.py
+++ b/prule/summary/__main__.py
@@ -10,39 +10,47 @@
 import re
 import sqlite3
 
+#from .svg import svg_gen_daily_summary
+
 import prule.db
+import prule.debug
+
+#TODO: sorting
+#TODO: make grouping optional
+#TODO: resource hours
 
 helptext="""Usage:
     The program reads entries from the prule database file and creates summaries.
     There are multiple parameters to control how the summary is done.
 
-    1. Summarize by users or by accounts?
-    --summary-group-type {user|account} (default: user)
-    If group type 'user' is used, the results can be subdivided into results per account.
-    --summary-user-per-account
-
-    2. Only consider one specific user/account/cluster? Else there is no restriction.
-    --summary-user USER-NAME
-    --summary-account ACCOUNT-NAME
-    --summary-cluster CLUSTER-NAME
-
-    3. How to limit the considered jobs from the database?
-    Timebased limit considers all jobs after a certain time.
-    --summary-jobs-timeframe TIME (default: now-1week)
-    Countbased limit considers the last X jobs.
-    --summary-jobs-count COUNT (default: 1000)
-
-    4. Which timestamp to use for the limitation?
-    Jobs have a start and a end timestamp.
-    Also this timestamp is used for sorting for the countbased selection.
-    --summary-jobs-context {start|end} (default: end)
-    Note: still running jobs are not in the database.
-    Therefore, if using disjunct intervals, some jobs might be overlooked.
-
-    5. In case of countbased selection, the set of users/accounts needs to be limited.
-    This is done by first check the set of users/accounts active in a certain time interval
-    and afterwards select the last X jobs of these users/accounts.
-    --summary-jobs-count-user-filter-timeframe TIME
+    1. Time
+    By default, the 7 days before the end time of the last job in the database are considered.
+    You can specify a different duration with --duration or use --start/--stop.
+
+    Combinations:
+    --duration 14-00                     - Last 14 days of database.
+    --start 2024-12-01                   - From December 2024 until end of database.
+    --stop 2025-01-01                    - From start of database until end of 2024.
+    --start 2024-12-01 --duration 7-00   - First week of December 2024.
+    --stop 2024-12-01 --duration 0-12    - Last 12 hours of November 2024.
+    --start 2024-12-01 --stop 2024-12-02 - First day of December 2024.
+
+    2. Filters
+    Use --users, --accounts and --clusters to limit the summary to the specified users, accounts and clusters.
+
+    Examples:
+    --users usrtr001
+    --users usrtr001,johndoe
+    --clusters noctua1
+    --clusters noctua1,noctua2
+    --accounts hpc-lco-usrtr
+    --accounts hpc-lco-usrtr,pc2-shks
+
+    3. Details
+    Use --summary-overlap to count only the cpu hours that overlap with the timeframe.
+
+    4. Grouping
+    TODO
 """
 
 # non-capturing groups
@@ -184,14 +192,186 @@ def test_parse_timeframe():
 #test_parse_timeframe()
 
+# Acceptable time formats include "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds".
+re_num = "(?:[0-9]+)"
+re_timeduration_min = re.compile("({minute:})".format(minute=re_num))
+re_timeduration_minsec = re.compile("({minutes:}):({seconds:})".format(minutes=re_num, seconds=re_num))
+re_timeduration_hourminsec = re.compile("({hours:}):({minutes:}):({seconds:})".format(hours=re_num, minutes=re_num, seconds=re_num))
+re_timeduration_dayhour = re.compile("({days:})-({hours:})".format(days=re_num, hours=re_num))
+re_timeduration_dayhourmin = re.compile("({days:})-({hours:}):({minutes:})".format(days=re_num, hours=re_num, minutes=re_num))
+re_timeduration_dayhourminsec = re.compile("({days:})-({hours:}):({minutes:}):({seconds:})".format(days=re_num, hours=re_num, minutes=re_num, seconds=re_num))
+def parse_timeduration(input):
+    input = input.lower().strip()
+    match = re_timeduration_min.fullmatch(input)
+    if match:
+        minutes, = match.groups()
+        return datetime.timedelta(minutes=int(minutes))
+    match = re_timeduration_minsec.fullmatch(input)
+    if match:
+        minutes, seconds = match.groups()
+        return datetime.timedelta(minutes=int(minutes), seconds=int(seconds))
+    match = re_timeduration_hourminsec.fullmatch(input)
+    if match:
+        hours, minutes, seconds = match.groups()
+        return datetime.timedelta(hours=int(hours), minutes=int(minutes), seconds=int(seconds))
+    match = re_timeduration_dayhour.fullmatch(input)
+    if match:
+        days, hours = match.groups()
+        return datetime.timedelta(days=int(days), hours=int(hours))
+    match = re_timeduration_dayhourmin.fullmatch(input)
+    if match:
+        days, hours, minutes = match.groups()
+        return datetime.timedelta(days=int(days), hours=int(hours), minutes=int(minutes))
+    match = re_timeduration_dayhourminsec.fullmatch(input)
+    if match:
+        days, hours, minutes, seconds = match.groups()
+        return datetime.timedelta(days=int(days), hours=int(hours), minutes=int(minutes), seconds=int(seconds))
+    raise Exception("Timeduration '{:}' could not be parsed.".format(input))
+
+def genTable(table, header=None, align=None, margin=1, header_line="="):
+    #columns = len(table[0])
+    columns = 0
+    for l in table:
+        columns = max(columns, len(l))
+    cmax = [0]*columns
+    for row in table:
+        for cix,col in enumerate(row):
+            cmax[cix] = max(cmax[cix], len(str(col)))
+    out = []
+    for rix,row in enumerate(table):
+        l = ""
+        for cix,col in enumerate(row):
+            if align==None or align=="left":
+                l += str(col) + " "*(cmax[cix]-len(str(col)))
+            elif align=="right":
+                l += " "*(cmax[cix]-len(str(col))) + str(col)
+            if cix < len(row)-1:
+                l += " "*margin
+        out += [l]
+        if header != None and rix == header-1:
+            l = ""
+            for cix, col in enumerate(cmax):
+                l += header_line*col
+                if cix < len(cmax)-1:
+                    l += " "*margin
+            out += [l]
+    return out
+
+    #def analyse_user(user_name, jobs):
+
+def info_print(db_con, args):
+
+    print("Database: {}".format(args["db_path"]))
+    if args["db_path"] != db_con.path:
+        print("DB Copy: {}".format(db_con.path))
+
+    with prule.debug.Timing("info_print db_get_clusters", prule.debug.debug_settings['print_timings']):
+        clusters = db_con.db_get_clusters()
+    print("Clusters: {} {}".format(len(clusters), ",".join(clusters)))
+
+    with prule.debug.Timing("info_print db_get_users", prule.debug.debug_settings['print_timings']):
+        users_all = db_con.db_get_users()
+    print("Users: {}".format(len(users_all)))
+
+    with prule.debug.Timing("info_print db_get_projects", prule.debug.debug_settings['print_timings']):
+        projects_all = db_con.db_get_projects()
+    print("Projects: {}".format(len(projects_all)))
+
+    with prule.debug.Timing("info_print jobs_count", prule.debug.debug_settings['print_timings']):
+        jobs_count = db_con.db_get_all_results(columns=['COUNT(ccid)'], transform=False)[0][0]
+    print("Jobs: {}".format(jobs_count))
+
+    with prule.debug.Timing("info_print non_processed", prule.debug.debug_settings['print_timings']):
+        non_processed = db_con.db_get_all_results(columns=['COUNT(processed)'], conditions=["processed != 1"], transform=False)[0][0]
+    print("Non-processed: {}".format(non_processed))
+
+    with prule.debug.Timing("info_print non_evaluated", prule.debug.debug_settings['print_timings']):
+        non_evaluated = db_con.db_get_all_results(columns=['COUNT(evaluated)'], conditions=["evaluated != 1"], transform=False)[0][0]
+    print("Non-evaluated: {}".format(non_evaluated))
+
+    with prule.debug.Timing("info_print db_first_result", prule.debug.debug_settings['print_timings']):
+        firstres = db_con.db_first_result()
+    if len(firstres)>0:
+        firstres = firstres[0]
+        start = firstres["start"]
+        stop = firstres["stop"]
+        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
+        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
+        print("First job: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
+
+    with prule.debug.Timing("info_print db_last_result", prule.debug.debug_settings['print_timings']):
+        lastres = db_con.db_last_result()
+    if len(lastres)>0:
+        lastres = lastres[0]
+        start = lastres["start"]
+        stop = lastres["stop"]
+        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
+        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
+        print("Last job: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
+
+    with prule.debug.Timing("info_print started_earliest", prule.debug.debug_settings['print_timings']):
+        started_earliest = db_con.db_get_all_results(columns=['start','stop','MIN(start)'], conditions=["start IS NOT NULL"])[0]
+    if started_earliest != None:
+        start = started_earliest["start"]
+        stop = started_earliest["stop"]
+        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
+        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
+        print("Earliest start: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
+
+    with prule.debug.Timing("info_print started_latest", prule.debug.debug_settings['print_timings']):
+        started_latest = db_con.db_get_all_results(columns=['start','stop','MAX(start)'], conditions=["start IS NOT NULL"])[0]
+    if started_latest != None:
+        start = started_latest["start"]
+        stop = started_latest["stop"]
+        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
+        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
+        print("Latest start: {} - {} (start: {} stop: {})".format(start_s, stop_s, start, stop))
+
+    with prule.debug.Timing("info_print finished_earliest", prule.debug.debug_settings['print_timings']):
+        finished_earliest = db_con.db_get_all_results(columns=['start','stop','ccid','MIN(stop)'], conditions=["stop IS NOT NULL"])[0]
+    if finished_earliest != None:
+        start = finished_earliest["start"]
+        stop = finished_earliest["stop"]
+        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
+        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
+        ccid = finished_earliest["ccid"]
+        print("Earliest stop: {} - {} (start: {} stop: {} ccid: {})".format(start_s, stop_s, start, stop, ccid))
+
+    with prule.debug.Timing("info_print finished_latest", prule.debug.debug_settings['print_timings']):
+        finished_latest = db_con.db_get_all_results(columns=['start','stop','ccid','MAX(stop)'],conditions=["stop IS NOT NULL"])[0]
+    if finished_latest != None:
+        start = finished_latest["start"]
+        stop = finished_latest["stop"]
+        start_s = datetime.datetime.fromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") if start != None else "None"
+        stop_s = datetime.datetime.fromtimestamp(stop).strftime("%Y-%m-%d %H:%M:%S") if stop != None else "None"
+        ccid = finished_latest["ccid"]
+        print("Latest stop: {} - {} (start: {} stop: {} ccid: {})".format(start_s, stop_s, start, stop, ccid))
+
+    for cluster in clusters:
+        with prule.debug.Timing("info_print db_get_users", prule.debug.debug_settings['print_timings']):
+            users = db_con.db_get_all_results(columns=['COUNT(DISTINCT user)'], conditions=['cluster = "{}"'.format(cluster)], transform=False)[0][0]
+        with prule.debug.Timing("info_print db_get_projects", prule.debug.debug_settings['print_timings']):
+            projects = db_con.db_get_all_results(columns=['COUNT(DISTINCT project)'], conditions=['cluster = "{}"'.format(cluster)], transform=False)[0][0]
+        print("Cluster {}: Users {} Projects {}".format(cluster, users, projects))
+
+
+
+
+
 
 def main(db_path, args):
-    db_con = prule.db.ResultsDB(db_path)
+    with prule.debug.Timing("main.ResultsDB", prule.debug.debug_settings['print_timings']):
+        trace_sql = None
+        if 'print_sql' in prule.debug.debug_settings:
+            trace_sql = lambda s : print("SQL: {}".format(s), file=sys.stderr)
+        db_con = prule.db.ResultsDB(db_path, trace_sql=trace_sql, ro=True)
     try:
-        integrity_res = db_con.db_integrity_check()
+        with prule.debug.Timing("main.db_integrity_check", prule.debug.debug_settings['print_timings']):
+            integrity_res = db_con.db_integrity_check()
         if integrity_res == False:
             print("Database failed integrity check: {:}".format(db_path))
             return
@@ -202,160 +382,188 @@ def main_db(db_con, args):
     #signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM])
 
+    if "info" in args:
+        with prule.debug.Timing("info_print", prule.debug.debug_settings['print_timings']):
+            info_print(db_con, args)
+        return
+
     # which clusters to consider?
     clusters = []
-    if "summary_cluster" in args:
-        clusters = [args["summary_cluster"]]
+    if "clusters" in args:
+        clusters = [c.strip() for c in args["clusters"].split(",")]
     else:
         clusters = db_con.db_get_clusters()
 
+    # which users to consider?
+    users = None # all users
+    if "users" in args:
+        users = [u.strip() for u in args["users"].split(",")]
+
+    # which accounts to consider?
+    accounts = None # all accounts
+    if "accounts" in args:
+        accounts = [a.strip() for a in args["accounts"].split(",")]
+
+
+
     jobs = {}
     for cluster in clusters:
-        jobs[cluster] = db_con.summary_cluster(cluster, args)
-
-    # print results as CSV
-    results = [header] + results
-    for result in results:
-        str_array = [str(r) for r in result]
-        line = ",".join(str_array)
-        print(line)
-
-def summary_cluster(db_con, cluster, args):
-    jobs = {} # group name -> job list
-    group_type = args["summary_group_type"]
-    group_name = args["summary_group"] if "summary_group" in args else None
-    group_user_account = True if "summary_user_per_account" in args else False
-    time_context = args["summary_jobs_context"]
-
-    # get jobs by group name
-    if "summary_jobs_timeframe" in args:
-        timeframe = int(parse_timeframe(args["summary_jobs_timeframe"]).timestamp())
-        time_jobs = []
-        if group_name == None:
-            time_jobs = db_con.db_last_results_time(cluster, None, None, time_context, timeframe)
+        with prule.debug.Timing("summary_cluster", prule.debug.debug_settings['print_timings']):
+            jobs[cluster] = summary_cluster(db_con, cluster, users, accounts, args)
+
+    #if 'svg_file' in args:
+    #    svg_gen_daily_summary(db_con, jobs, args)
+    #else:
+    #    summary_print(db_con, jobs, args)
+
+
+def summary_cluster(db_con, cluster, users_list, accounts_list, args):
+    # check timeframe for each cluster separately
+    # start of first job
+    job_started_earliest = db_con.db_get_all_results(columns=['start','stop','MIN(start)'], conditions=["start IS NOT NULL", "cluster = '{}'".format(cluster)])[0]
+    first_start = datetime.datetime.fromtimestamp(job_started_earliest['start'])
+    # end of last job
+    job_finished_latest = db_con.db_get_all_results(columns=['start','stop','ccid','MAX(stop)'],conditions=["stop IS NOT NULL", "cluster = '{}'".format(cluster)])[0]
+    last_stop = datetime.datetime.fromtimestamp(job_finished_latest['stop'])
+
+    # which timeframe to consider?
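+    # The timeframe is resolved from --duration/--start/--stop as follows:
+    #   only duration          -> [last_stop - duration, last_stop]
+    #   start and duration     -> [start, start + duration]
+    #   stop and duration      -> [stop - duration, stop]
+    #   only start and/or stop -> database bounds fill the missing side
+    #   nothing given          -> the last 7 days of job data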
+    start = None
+    stop = None
+    if 'duration' in args:
+        duration = parse_timeduration(args['duration'])
+        if 'start' in args:
+            start = parse_timeframe(args['start'])
+            stop = start + duration
+        elif 'stop' in args:
+            stop = parse_timeframe(args['stop'])
+            start = stop - duration
         else:
-            time_jobs = db_con.db_last_results_time(cluster, group_type, group_name, time_context, timeframe)
-        if group_type == "user":
-            for j in time_jobs:
-                if j["user"] not in jobs:
-                    jobs[j["user"]] = [j]
-                else:
-                    jobs[j["user"]].append(j)
-        elif group_type == "account":
-            for j in time_jobs:
-                if j["project"] not in jobs:
-                    jobs[j["project"]] = [j]
-                else:
-                    jobs[j["project"]].append(j)
-    elif "summary_jobs_count" in args:
-        count = args["summary_jobs_count"]
-
-        filter_groups = []
-        # first get jobs to filter users/accounts
-        if group_name != None:
-            filter_groups = [group_name]
-        else:
-            filter_timeframe = int(parse_timeframe(args["summary_jobs_count_user_filter_timeframe"]).timestamp())
-            filter_jobs = db_con.db_last_results_time(cluster, None, None, time_context, filter_timeframe)
-            filter_group_set = set()
-            if group_type == "user":
-                for j in filter_jobs:
-                    filter_group_set.add(j["user"])
-            elif group_type == "account":
-                for j in filter_jobs:
-                    filter_group_set.add(j["project"])
-            filter_groups = sorted(list(filter_group_set))
-        for n in filter_groups:
-            jobs[n] = db_con.db_last_results_num(cluster, group_type, n, time_context, count)
+            start = last_stop - duration
+            stop = last_stop
+    elif 'start' in args or 'stop' in args:
+        start = first_start
+        stop = last_stop
+        if 'start' in args:
+            start = parse_timeframe(args['start'])
+        if 'stop' in args:
+            stop = parse_timeframe(args['stop'])
     else:
-        raise Exception("Invalid summary_jobs_* parameter")
-    return jobs
-
-def summary_print(db_con, jobs, args):
-    group_type = args["summary_group_type"]
-    group_name = None
-    if group_type == "account":
-        group_name = args["summary_account"] if "summary_account" in args else None
-    elif group_type == "user":
-        group_name = args["summary_user"] if "summary_user" in args else None
-    group_user_account = True if "summary_user_per_account" in args else False
-
-    def gen_lines(db_con, jobs):
-        # create summary by rule
-        rules_dict = db_con.db_get_rule_dict()
-        rules_len = len(rules_dict)
-        rule_names = [""]*rules_len
-        for n in rules_dict:
-            rule_names[rules_dict[n]-1] = n.replace(' ','_')
-        rule_names_match = ["rule_{:}_match".format(rule_i) for rule_i in range(1, rules_len+1)]
-
-        # print header line
-        if group_type == "user" and group_user_account == True:
-            header = [group_type, "cluster", "account", "total_jobs", "matched_jobs"]
-        else:
-            header = [group_type, "cluster", "total_jobs", "matched_jobs"]
-        for r in rule_names:
-            header += [r]
-
-        # print group lines
-        group_list = sorted(jobs.keys())
-        results = []
-        for g in group_list:
-            group_jobs = jobs[g]
-            match_num_acc = {} # account -> number of matched jobs
-            group_jobs_acc = {} # account -> total number of jobs
-            rule_matched_acc = {} # account -> matched numbers for each rule
-            for j in group_jobs:
-                matched = False
-                match_group = "all" # catch all account name
-                if group_user_account == True:
-                    match_group = j["project"]
-                if match_group not in rule_matched_acc: # account not yet dicts
-                    rule_matched_acc[match_group] = [0]*rules_len
-                    match_num_acc[match_group] = 0
-                    group_jobs_acc[match_group] = 0
-                # update total job number
-                group_jobs_acc[match_group] += 1
-                # update matched rules numbers
-                for rule_i, rule_match_str in enumerate(rule_names_match):
-                    if j[rule_match_str] == 1:
-                        rule_matched_acc[match_group][rule_i-1] += 1
-                        matched = True
-                # update matched job number
-                if matched == True:
-                    match_num_acc[match_group] += 1
-
-            accounts = sorted(rule_matched_acc.keys())
-            for account in accounts:
-                job_num = group_jobs_acc[account]
-                match_num = match_num_acc[account]
-                rule_matched = rule_matched_acc[account]
-                if group_type == "user":
-                    if group_user_account == True:
-                        res = [g, cluster, account, job_num, match_num]
+        # default: 7 days before last job ended
+        duration = datetime.timedelta(days=7)
+        start = last_stop - duration
+        stop = last_stop
+    if start == None or stop == None:
+        raise Exception("Start or stop missing. This shouldn't have happened.")
+
+
+    start_ts = int(start.timestamp())
+    stop_ts = int(stop.timestamp())
+
+    cond = []
+    cond.append('stop >= {}'.format(start_ts)) # consider overlap
+    cond.append('start <= {}'.format(stop_ts)) # consider overlap
+    cond.append("cluster = '{}'".format(cluster))
+    if users_list != None:
+        cond.append("user IN ({:})".format(",".join(["\"{:}\"".format(u) for u in users_list])))
+    if accounts_list != None:
+        cond.append("project IN ({:})".format(",".join(["\"{:}\"".format(a) for a in accounts_list])))
+
+    results_it = db_con.db_get_all_results(conditions=cond, iterator=True)
+    results = {} # account -> user -> job
+    for j in results_it:
+        account = j['project']
+        user = j['user']
+        #filters should work with SQL conditions
+        #if accounts_list != None and account not in accounts_list:
+        #    continue
+        #if users_list != None and user not in users_list:
+        #    continue
+        if account not in results:
+            results[account] = {}
+        if user not in results[account]:
+            results[account][user] = []
+        results[account][user].append(j)
+
+    do_overlap = False
+    if 'summary_overlap' in args:
+        do_overlap = True
+
+    rules_dict = db_con.db_get_rule_dict()
+    rules_len = len(rules_dict)
+    rule_names = [""]*rules_len
+    for n in rules_dict:
+        rule_names[rules_dict[n]-1] = n.replace(' ','_')
+    rule_names_match = ["rule_{:}_match".format(rule_i) for rule_i in range(1, rules_len+1)]
+
+    c_total_jobs = 0
+    c_total_cpuh = 0.0
+    # columns: account/user, total (jobs/cpuh), matched (jobs/cpuh), rule(number/cpuh)
+    accounts = sorted(results.keys())
+    account_rows = []
+    for a in accounts:
+        users = sorted(results[a].keys())
+        a_total_jobs = 0
+        a_matched_jobs = 0
+        a_total_cpuh = 0
+        a_matched_cpuh = 0
+        user_rows = []
+        for u in users:
+            u_total_jobs = 0
+            u_matched_jobs = 0
+            u_total_cpuh = 0
+            u_matched_cpuh = 0
+            rule_total = [(0.0,0.0)]*rules_len
+            for j in results[a][u]:
+                sec = j['duration']
+                if do_overlap == True: # only consider overlap
+                    j_start = j['start']
+                    j_stop = j['stop']
+                    if j_start < stop_ts and j_stop > start_ts:
+                        if j_start < start_ts or j_stop > stop_ts:
+                            o_start = max(j_start, start_ts)
+                            o_stop = min(j_stop, stop_ts)
+                            sec = o_stop - o_start
+                        else:
+                            # job duration falls completely into timeframe, use normal duration
+                            pass
                     else:
-                        res = [g, cluster, job_num, match_num]
-                else:
-                    res = [g, cluster, job_num, match_num]
-                for m in rule_matched:
-                    res += [m]
-                results.append(res)
-        return header, results
-
-    #return header, results
-    results = [] # list of result rows
-    header = [] # header row
-    for cluster in jobs:
-        header, res = gen_lines(db_con, jobs[cluster])
-        results += res
-
-    # print results as CSV
-    results = [header] + results
-    for result in results:
-        str_array = [str(r) for r in result]
-        line = ",".join(str_array)
-        print(line)
-
+                        sec = 0.0
+                hwt = j['num_hwthreads']
+                cpuh = hwt * (sec/3600.0)
+                matches = 0
+                for rix,r in enumerate(rule_names_match):
+                    match = j[r]
+                    if match == 1:
+                        matches += 1
+                        rcount, rcpuh = rule_total[rix]
+                        rule_total[rix] = (rcount+1, rcpuh+cpuh)
+                if matches > 0:
+                    u_matched_jobs += 1
+                    u_matched_cpuh += cpuh
+                u_total_jobs += 1
+                u_total_cpuh += cpuh
+            a_total_jobs += u_total_jobs
+            a_matched_jobs += u_matched_jobs
+            a_total_cpuh += u_total_cpuh
+            a_matched_cpuh += u_matched_cpuh
+            rule_columns = [ "{:}/{:.2f}".format(rjobs,rcoreh) for rjobs,rcoreh in rule_total]
+            user_rows.append([u, "{:}/{:.2f}".format(u_total_jobs, u_total_cpuh), "{:}/{:.2f}".format(u_matched_jobs, u_matched_cpuh)] + rule_columns)
+        account_rows.append([a, "{:}/{:.2f}".format(a_total_jobs, a_total_cpuh), "{:}/{:.2f}".format(a_matched_jobs, a_matched_cpuh)])
+        account_rows += user_rows
+        c_total_jobs += a_total_jobs
+        c_total_cpuh += a_total_cpuh
+    # print header
+    header = ["account/user", "total (jobs/cpuh)", "matched (jobs/cpuh)"] + rule_names
+    # print rows
+    cluster_times = "Earliest: {:} {:} Latest: {:} {:}".format(
+            datetime.datetime.fromtimestamp(job_started_earliest['start']),
+            datetime.datetime.fromtimestamp(job_started_earliest['stop']),
+            datetime.datetime.fromtimestamp(job_finished_latest['start']),
+            datetime.datetime.fromtimestamp(job_finished_latest['stop']))
+    print("Summary:", cluster, start, stop, "Total jobs: {:}".format(c_total_jobs), "Total cpuh: {:.2f}".format(c_total_cpuh), cluster_times)
+    out = genTable([header]+account_rows, header=1)
+    for l in out:
+        print(l)
 
 
 if __name__ == "__main__":
@@ -381,28 +589,45 @@
                         "You may use this, if you are confident that file locking works with your file system. "+
                         "NFS apparently does have problems with that.")
 
+    info_group = parser.add_argument_group('Info parameter',
+                        'Info about the database file')
+    info_group.add_argument('--info', action='store_true',
+                        help="Show general information about the file and quit.")
+
+    time_group = parser.add_argument_group('Time parameters',
+                        'Configure the timeframe for summary. Only two of start, stop, duration can be used at once. Timeframe consideration is independent of users or accounts. Default is last 7 days of job data.')
+
+    time_group.add_argument('--duration', type=str, metavar='DURATION',
+                        help="Duration of summary timeframe. Without start/stop it means DURATION before the end of job data.")
+    time_group.add_argument('--start', type=str, metavar='START',
+                        help="Start datetime of summary timeframe.")
+    time_group.add_argument('--stop', type=str, metavar='STOP',
+                        help="Stop datetime of summary timeframe.")
+
     summary_group = parser.add_argument_group('Summary parameters',
                         'Configure the summary.')
-    summary_group.add_argument('--summary-group-type', type=str, choices=["user", "account"], default="user",
-                        help="Define if the summary shows user or group statistics. (Default: user)")
-    summary_group.add_argument('--summary-user', type=str,
-                        help="Specify user for summary.")
-    summary_group.add_argument('--summary-account', type=str,
-                        help="Specify account for summary.")
-    summary_group.add_argument('--summary-cluster', type=str,
-                        help="Specify cluster for summary.")
-    summary_group.add_argument('--summary-user-per-account', action='store_true',
-                        help="If group type 'user' is used, the summary is given per account.")
-    summary_mutex_group = summary_group.add_mutually_exclusive_group(required=True)
-    summary_mutex_group.add_argument('--summary-jobs-timeframe', type=str, nargs='?', const="now-1week",
-                        help="Select jobs by timeframe, e.g. select jobs in last 7 days. (default: now-1week)")
-    summary_mutex_group.add_argument('--summary-jobs-count', type=int, nargs='?', const=1000,
-                        help="Select jobs by count, e.g. select last 1000 jobs. (default: 1000)")
-    summary_group.add_argument('--summary-jobs-context', type=str, choices=["start", "end"], default="end",
-                        help="Which time context to use: job start time or job end time. Note: in case of 'start time' context, still running jobs will be missing. (default: end)")
-    summary_group.add_argument('--summary-jobs-count-user-filter-timeframe', type=str, nargs='?', const="now-1week", default="now-1week",
-                        help="Only consider users/accounts that had jobs in the given timeframe, e.g. only consider last 1000 jobs of users that had jobs in the last week. (default: now-1week)")
+    summary_group.add_argument('--accounts', type=str, metavar='ACCOUNTS',
+                        help="Accounts in summary. (Comma separated list)")
+    summary_group.add_argument('--users', type=str, metavar='USERS',
+                        help="Users in summary. (Comma separated list)")
+    summary_group.add_argument('--clusters', type=str, metavar='CLUSTERS',
+                        help="Clusters in summary. (Comma separated list)")
+
+    summary_group.add_argument('--summary-value', type=str, choices=['jobs','cpuh'], default='jobs',
+                        help="Count affected jobs either as number of jobs or as cpu hours.")
+    summary_group.add_argument('--summary-overlap', action='store_true',
+                        help="Only count cpu hours that overlap with the timeframe.")
+
+#    svg_group = parser.add_argument_group('SVG parameters',
+#                        'Configure SVG output.')
+#    svg_group.add_argument('--svg-file', type=str,
+#                        help="Create SVG and store it at the given filepath.")
+
+    debug_group = parser.add_argument_group('Debug arguments',
+                        '')
+    debug_group.add_argument('--print-timings', action='store_true', help="Print timings")
+    debug_group.add_argument('--print-sql', action='store_true', help="Print SQL queries")
 
     #summary_group.add_argument('--csv', action='store_true',
     #                    help="Ouput summary as CSV. First line is the header line.")
@@ -413,6 +638,19 @@
         print(args)
         sys.exit(1)
 
+#    if 'summary_jobs_timeframe' not in args and 'summary_jobs_count' not in args and 'info' not in args:
+#        args['summary_jobs_timeframe'] = '7-00'
+
+    if 'duration' in args and 'start' in args and 'stop' in args:
+        print("Parameters `duration`, `start` and `stop` can't be used together.", file=sys.stderr)
+        sys.exit(1)
+
+    prule.debug.debug_settings = {'print_timings':False}
+    if "print_timings" in args:
+        prule.debug.debug_settings['print_timings'] = True
+    if "print_sql" in args:
+        prule.debug.debug_settings['print_sql'] = True
+
     real_db_path = args['db_path']
     if 'no_db_copy' in args: