diff --git a/prule/daemon/__main__.py b/prule/daemon/__main__.py
index c6d38ef67f6cf9802246b61d6d4bf1f35e8d299a..8c1a15344f5865e6a09440b2fc4bcad615af23bb 100644
--- a/prule/daemon/__main__.py
+++ b/prule/daemon/__main__.py
@@ -149,10 +149,10 @@ class Config:
         signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown
 
 class JobQueueItem:
-    def __init__(self, ccjobid, metadata=None):
+    def __init__(self, ccjobid, metadata=None, first_check=None):
         self.ccjobid = ccjobid
         self.metadata = metadata
-        self.first_check = int(time.time())
+        self.first_check = int(time.time()) if first_check == None else first_check
         self.backoff = 0
     def toDict(self):
         return {"ccjobid":self.ccjobid, "metadata":self.metadata, "first_check":self.first_check, "backoff":self.backoff}
@@ -478,10 +478,14 @@ class CCCheckThread(threading.Thread):
         if len(finished_ids_set) > 0:
             q_ids = []
             for j in finished_ids_set:
-                item = JobQueueItem(j[0]) # ccid, jobid, cluster, startTime
+                item = JobQueueItem(j[0], first_check=cur_time) # ccid, jobid, cluster, startTime
                 # set initial delay
                 item.backoff = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
-                item.first_check = cur_time
+                item.metadata = {}
+                item.metadata["id"] = j[0]
+                item.metadata["jobId"] = j[1]
+                item.metadata["cluster"] = j[2]
+                item.metadata["startTime"] = j[3]
                 q_ids.append(item)
             queue.add_all(q_ids)
 
@@ -669,7 +673,8 @@ class PruleThread(threading.Thread):
         try:
             with urllib.request.urlopen(req, timeout=10) as response:
                 if response.status == 200:
-                    return True
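+                    # on success, return the parsed JSON body instead of a bare True,
+                    # so callers can reuse the returned job metadata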
+                    return json.load(response)
                 if response.status == 401:
                     print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                     self.config.signal_shutdown()
@@ -759,36 +763,51 @@ class PruleThread(threading.Thread):
         # track process error
         process_result = "success"
 
-        # retrieve current job metadata
-        with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
-            job_res = self.request_job_meta(job.ccjobid)
-
-        if job_res == None:
-            return "failure-shutdown"
-        if job_res == "job-failure":
-            return "failure-drop"
-        if job_res == "wait":
-            return "failure-wait"
-        job_meta = job_res['Meta']
-        #print("PRule got job:", job_meta)
-        already_processed = "metaData" in job_meta and type(job_meta["metaData"]) == list and "patho" in job_meta["metaData"]
+        # complete metadata (later used for entry in sqlite database)
+        job_meta = None
+        # slurmid, cluster, startTime necessary for loading of jobarchive from filesystem
+        job_cluster = None
+        job_slurmid = None
+        job_startTime = None
+        if job.metadata != None:
+            if "jobId" in job.metadata and "cluster" in job.metadata and "startTime" in job.metadata:
+                job_cluster = job.metadata["cluster"]
+                job_slurmid = str(job.metadata["jobId"])
+                job_startTime = str(job.metadata["startTime"]) if type(job.metadata["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job.metadata['startTime']).timestamp()))
+            if "duration" in job.metadata: # assume the rest is also present in job.metadata
+                job_meta = job.metadata
+
+        # check if already processed
+        already_processed = False
         if config.config["CACHE_DB"] == True:
             with prule.debug.Timing("prulethread.db_get_result", "PRINT_TIMING" in config.config):
-                old_result = self.db_con.db_get_result(job_meta["id"])
-
-            already_processed = already_processed or len(old_result) > 0
+                old_result = self.db_con.db_get_result(job.ccjobid)
+            if len(old_result) > 0:
+                already_processed = already_processed or old_result[-1]["evaluated"] == True
 
         if already_processed == True:
-            print("skip, already_processed",job_meta["id"])
+            print("skip, already_processed",job.ccjobid)
             return "success"
 
-        job_cluster = job_meta["cluster"]
-        job_id = str(job_meta["id"])
-        job_slurmid = str(job_meta["jobId"])
-        job_startTime = str(job_meta["startTime"]) if type(job_meta["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job_meta['startTime']).timestamp()))
+        # retrieve current job metadata if needed
+        if job_cluster == None or job_slurmid == None or job_startTime == None:
+            with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
+                job_res = self.request_job_meta(job.ccjobid)
+
+            if job_res == None:
+                return "failure-shutdown"
+            if job_res == "job-failure":
+                return "failure-drop"
+            if job_res == "wait":
+                return "failure-wait"
+            job_meta = job_res['Meta']
+
+            job_cluster = job_meta["cluster"]
+            job_slurmid = str(job_meta["jobId"])
+            job_startTime = str(job_meta["startTime"]) if type(job_meta["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job_meta['startTime']).timestamp()))
+        # prepare job path for filesystem access or download jobarchive from API
         job_path = None
         job_tempdir = None
-        job_result_path = None
 
         if config.config["API_JOBARCHIVE"] == False:
             # Load job from filesystem
@@ -806,14 +825,7 @@ class PruleThread(threading.Thread):
 
         print("Job path:",job_path)
 
-        if config.config["STORE_OUTPUT"] == True:
-            # store result in output json
-            job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(job.ccjobid))
-        else:
-            # store result in temporary file and later, eventually, pass it to metadata api
-            if job_tempdir == None:
-                job_tempdir = tempfile.TemporaryDirectory(prefix="prule_jobarchive_{}_".format(job.ccjobid))
-            job_result_path = os.path.join(job_tempdir.name, "{}.json".format(job.ccjobid))
 
+        result_json, process_time = self.prule_job({"job-dir":job_path})
 
 
@@ -825,7 +837,7 @@ class PruleThread(threading.Thread):
         if result_json == False:
             return "failure-drop"
 
-        print("Process: job {} jobId {} time {:.6f}".format(job_meta["id"], job_meta["jobId"], process_time))
+        print("Process: job {} jobId {} time {:.6f}".format(job.ccjobid, job_slurmid, process_time))
         if self.processTerminated == True:
             print("Job {} process was terminated.".format(job.ccjobid))
             if job_tempdir != None:
@@ -852,6 +864,20 @@ class PruleThread(threading.Thread):
             print(e)
             print("Job {} process result {}".format(job.ccjobid, result_json))
 
+        # store result to JSON file
+        if config.config["STORE_OUTPUT"] == True and result_json != None:
+            # store result in output json
+            job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(job.ccjobid))
+            try:
+                with open(job_result_path, "w") as outf:
+                    json.dump(result_json, outf)
+            except Exception as e:
+                traceback.print_exc()
+                print(e)
+                print("Failed to write result to {}".format(job_result_path))
+                if job_tempdir != None:
+                    job_tempdir.cleanup()
+                return "failure-requeue"
 
         # upload result to metadata api
         if config.config["API_METADATA"] == True and result_json != None:
@@ -863,7 +889,10 @@ class PruleThread(threading.Thread):
             if res == False:
                 print("Job {} process failed to write metadata using API_METADATA".format(job.ccjobid))
                 process_result = "failure-shutdown"
+            else:
+                job_meta = res
 
+        # upload tag
         if config.config["API_TAG"] == True and result_json != None:
             if len(result_json["tags"]) > 0:
                 # set default scope
@@ -881,8 +910,35 @@ class PruleThread(threading.Thread):
                     print("Job {} process failed to write tags using API_TAG".format(job.ccjobid))
                     process_result = "failure-shutdown"
 
 
+        # save result to cache db
         if config.config["CACHE_DB"] == True:
+            # load meta data from file if necessary
+            if job_meta == None:
+                try:
+                    with open(os.path.join(job_path, "meta.json"), "r") as meta_infile:
+                        job_meta = json.load(meta_infile)
+                except Exception as e:
+                    traceback.print_exc()
+                    print(e)
+                    print("Failed to load meta data from file {}".format(os.path.join(job_path, "meta.json")))
+
+            # load meta data from api if necessary
+            if job_meta == None:
+                with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
+                    job_res = self.request_job_meta(job.ccjobid)
+                if type(job_res) != dict:
+                    if job_tempdir != None:
+                        job_tempdir.cleanup()
+                    if job_res == None:
+                        return "failure-shutdown"
+                    if job_res == "job-failure":
+                        return "failure-drop"
+                    if job_res == "wait":
+                        return "failure-wait"
+                job_meta = job_res["Meta"]
+
+            # save result to cache db
             try:
                 evaluated = "error" in result_json and result_json["error"] == False
                 with prule.debug.Timing("prulethread.db_insert_result", "PRINT_TIMING" in config.config):
@@ -1054,6 +1110,7 @@ if __name__ == "__main__":
     parser.add_argument('--no-check', help=argparse.SUPPRESS, action='store_true') # do not start job check thread
     parser.add_argument('--check-once', help=argparse.SUPPRESS, action='store_true') # run job check thread once
     parser.add_argument('--job-ids-file', type=str, help='Read job ids from file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
+    parser.add_argument('--job-ids-json-file', type=str, help='Read job ids from JSON file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
     parser.add_argument('--db-path', type=str, help='Path to database file. Overwrites the DB_PATH value from the config file.')
     parser.add_argument('--cache-db', action='store_true', help='Store results in sqlite3 database. Overwrites the CACHE_DB value from the config file.')
     parser.add_argument('--job-process-quiet', action='store_true', help='Pass --quiet to prule and do not print full result json')
@@ -1121,6 +1178,22 @@ if __name__ == "__main__":
             print("No job ids found in job-id-file ", args['job_ids_file'], file=sys.stderr)
             sys.exit(1)
 
+    # read job ids from JSON file
+    if 'job_ids_json_file' in args:
+        with open(args['job_ids_json_file'], 'r') as f:
+            process_delay = config.config["JOB_PROCESS_DELAY"] if "JOB_PROCESS_DELAY" in config.config else 0
+            data = json.load(f)
+            for j in data:
+                item = JobQueueItem(j["id"])
+                item.metadata = j
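+                # backdate first_check so the configured JOB_PROCESS_DELAY
+                # does not postpone processing of these preset jobs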
+                item.first_check = item.first_check - process_delay
+                job_ids_preset.append(item)
+        if len(job_ids_preset) == 0:
+            print("No job ids found in job-id-json-file ", args['job_ids_json_file'], file=sys.stderr)
+            sys.exit(1)
+
+
     # create queue and load state
     queue = JobQueue()
     if len(job_ids_preset) > 0: