Commit 5f4afe43 authored by Alex Wiens
Prule.daemon: PruleThread: Make metadata fetch optional and move it around, Add --job-ids-json-file parameter
parent 2f80f6a3
@@ -149,10 +149,10 @@ class Config:
         signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown

 class JobQueueItem:
-    def __init__(self, ccjobid, metadata=None):
+    def __init__(self, ccjobid, metadata=None, first_check=None):
         self.ccjobid = ccjobid
         self.metadata = metadata
-        self.first_check = int(time.time())
+        self.first_check = int(time.time()) if first_check == None else first_check
         self.backoff = 0
     def toDict(self):
         return {"ccjobid":self.ccjobid, "metadata":self.metadata, "first_check":self.first_check, "backoff":self.backoff}
@@ -478,10 +478,14 @@ class CCCheckThread(threading.Thread):
         if len(finished_ids_set) > 0:
             q_ids = []
             for j in finished_ids_set:
-                item = JobQueueItem(j[0]) # ccid, jobid, cluster, startTime
+                item = JobQueueItem(j[0], first_check=cur_time) # ccid, jobid, cluster, startTime
                 # set initial delay
                 item.backoff = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
-                item.first_check = cur_time
+                item.metadata = {}
+                item.metadata["id"] = j[0]
+                item.metadata["jobId"] = j[1]
+                item.metadata["cluster"] = j[2]
+                item.metadata["startTime"] = j[3]
                 q_ids.append(item)
             queue.add_all(q_ids)
@@ -669,7 +673,7 @@ class PruleThread(threading.Thread):
         try:
             with urllib.request.urlopen(req, timeout=10) as response:
                 if response.status == 200:
-                    return True
+                    return json.load(response)
                 if response.status == 401:
                     print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                     self.config.signal_shutdown()
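With this change the helper returns the parsed JSON body on HTTP 200 instead of a bare True, so callers get the job document directly. A self-contained sketch of that success path; the URL layout and token handling here are assumptions for illustration, not the daemon's actual request construction:

import json
import sys
import urllib.error
import urllib.request

def fetch_job_meta(api_url, token, ccjobid):
    # Illustrative stand-in for PruleThread.request_job_meta:
    # return the parsed JSON document on HTTP 200, None otherwise.
    url = "{}/api/jobs/{}".format(api_url, ccjobid)  # hypothetical endpoint
    req = urllib.request.Request(url, headers={"Authorization": "Bearer {}".format(token)})
    try:
        with urllib.request.urlopen(req, timeout=10) as response:
            if response.status == 200:
                return json.load(response)
    except urllib.error.URLError as e:
        print(e, file=sys.stderr)
    return None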
@@ -759,7 +763,33 @@ class PruleThread(threading.Thread):
         # track process error
         process_result = "success"

-        # retrieve current job metadata
+        # complete metadata (later used for entry in sqlite database)
+        job_meta = None
+        # slurmid, cluster, startTime necessary for loading of jobarchive from filesystem
+        job_cluster = None
+        job_slurmid = None
+        job_startTime = None
+        if job.metadata != None:
+            if "jobId" in job.metadata and "cluster" in job.metadata and "startTime" in job.metadata:
+                job_cluster = job.metadata["cluster"]
+                job_slurmid = job.metadata["jobId"]
+                job_startTime = str(job.metadata["startTime"]) if type(job.metadata["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job.metadata['startTime']).timestamp()))
+            if "duration" in job.metadata: # assume the rest is also present in job.metadata
+                job_meta = job.metadata
+
+        # check if already processed
+        already_processed = False
+        if config.config["CACHE_DB"] == True:
+            with prule.debug.Timing("prulethread.db_get_result", "PRINT_TIMING" in config.config):
+                old_result = self.db_con.db_get_result(job.ccjobid)
+            if len(old_result) > 0:
+                already_processed = already_processed or old_result[-1]["evaluated"] == True
+        if already_processed == True:
+            print("skip, already_processed",job.ccjobid)
+            return "success"
+
+        # retrieve current job metadata if needed
+        if job_cluster == None or job_slurmid == None or job_startTime == None:
             with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
                 job_res = self.request_job_meta(job.ccjobid)
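The startTime conversion above now appears both for queue-supplied metadata and (below) for metadata fetched from the API. Extracted into a small helper for illustration (helper name hypothetical, not part of the patch):

import datetime

def start_time_str(start_time):
    # Accept either epoch seconds (int) or an ISO-8601 string, as the
    # patched code does, and return epoch seconds as a string.
    if type(start_time) == int:
        return str(start_time)
    return str(int(datetime.datetime.fromisoformat(start_time).timestamp()))

assert start_time_str(1700000000) == "1700000000"
assert start_time_str("2023-11-14T22:13:20+00:00") == "1700000000"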
@@ -770,25 +800,14 @@ class PruleThread(threading.Thread):
             if job_res == "wait":
                 return "failure-wait"
             job_meta = job_res['Meta']
-        #print("PRule got job:", job_meta)
-        already_processed = "metaData" in job_meta and type(job_meta["metaData"]) == list and "patho" in job_meta["metaData"]
-        if config.config["CACHE_DB"] == True:
-            with prule.debug.Timing("prulethread.db_get_result", "PRINT_TIMING" in config.config):
-                old_result = self.db_con.db_get_result(job_meta["id"])
-            already_processed = already_processed or len(old_result) > 0
-        if already_processed == True:
-            print("skip, already_processed",job_meta["id"])
-            return "success"
             job_cluster = job_meta["cluster"]
-        job_id = str(job_meta["id"])
             job_slurmid = str(job_meta["jobId"])
             job_startTime = str(job_meta["startTime"]) if type(job_meta["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job_meta['startTime']).timestamp()))

+        # prepare job path for filesystem access or download jobarchive from API
         job_path = None
         job_tempdir = None
-        job_result_path = None
         if config.config["API_JOBARCHIVE"] == False:
             # Load job from filesystem
@@ -806,14 +825,7 @@ class PruleThread(threading.Thread):
             print("Job path:",job_path)

-        if config.config["STORE_OUTPUT"] == True:
-            # store result in output json
-            job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(job.ccjobid))
-        else:
-            # store result in temporary file and later, eventually, pass it to metadata api
-            if job_tempdir == None:
-                job_tempdir = tempfile.TemporaryDirectory(prefix="prule_jobarchive_{}_".format(job.ccjobid))
-            job_result_path = os.path.join(job_tempdir.name, "{}.json".format(job.ccjobid))

         result_json, process_time = self.prule_job({"job-dir":job_path})
@@ -825,7 +837,7 @@ class PruleThread(threading.Thread):
         if result_json == False:
             return "failure-drop"

-        print("Process: job {} jobId {} time {:.6f}".format(job_meta["id"], job_meta["jobId"], process_time))
+        print("Process: job {} jobId {} time {:.6f}".format(job.ccjobid, job_slurmid, process_time))

         if self.processTerminated == True:
             print("Job {} process was terminated.".format(job.ccjobid))
             if job_tempdir != None:
@@ -852,6 +864,20 @@ class PruleThread(threading.Thread):
                 print(e)
         print("Job {} process result {}".format(job.ccjobid, result_json))

+        # store result to JSON file
+        if config.config["STORE_OUTPUT"] == True and result_json != None:
+            # store result in output json
+            job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(job.ccjobid))
+            try:
+                with open(job_result_path, "w") as outf:
+                    json.dump(result_json, outf)
+            except Exception as e:
+                traceback.print_exc()
+                print(e)
+                print("Failed to write result to {}".format(job_result_path))
+                if job_tempdir != None:
+                    job_tempdir.cleanup()
+                return "failure-requeue"

         # upload result to metadata api
         if config.config["API_METADATA"] == True and result_json != None:
@@ -863,7 +889,10 @@ class PruleThread(threading.Thread):
             if res == False:
                 print("Job {} process failed to write metadata using API_METADATA".format(job.ccjobid))
                 process_result = "failure-shutdown"
+            else:
+                job_meta = res

+        # upload tag
         if config.config["API_TAG"] == True and result_json != None:
             if len(result_json["tags"]) > 0:
                 # set default scope
@@ -881,8 +910,35 @@ class PruleThread(threading.Thread):
                     print("Job {} process failed to write tags using API_TAG".format(job.ccjobid))
                     process_result = "failure-shutdown"

         # save result to cache db
         if config.config["CACHE_DB"] == True:
+            # load meta data from file if necessary
+            if job_meta == None:
+                try:
+                    with open(os.path.join(job_path, "meta.json"), "r") as meta_infile:
+                        job_meta = json.load(meta_infile)
+                except Exception as e:
+                    traceback.print_exc()
+                    print(e)
+                    print("Failed to load meta data from file {}".format(os.path.join(job_path, "meta.json")))
+
+            # load meta data from api if necessary
+            if job_meta == None:
+                with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
+                    job_res = self.request_job_meta(job.ccjobid)
+                if type(job_res) != dict:
+                    if job_tempdir != None:
+                        job_tempdir.cleanup()
+                    if job_res == None:
+                        return "failure-shutdown"
+                    if job_res == "job-failure":
+                        return "failure-drop"
+                    if job_res == "wait":
+                        return "failure-wait"
+                job_meta = job_res["Meta"]
+
+            # save result to cache db
             try:
                 evaluated = "error" in result_json and result_json["error"] == False
                 with prule.debug.Timing("prulethread.db_insert_result", "PRINT_TIMING" in config.config):
@@ -1054,6 +1110,7 @@ if __name__ == "__main__":
     parser.add_argument('--no-check', help=argparse.SUPPRESS, action='store_true') # do not start job check thread
     parser.add_argument('--check-once', help=argparse.SUPPRESS, action='store_true') # run job check thread once
     parser.add_argument('--job-ids-file', type=str, help='Read job ids from file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
+    parser.add_argument('--job-ids-json-file', type=str, help='Read job ids from JSON file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
     parser.add_argument('--db-path', type=str, help='Path to database file. Overwrites the DB_PATH value from the config file.')
     parser.add_argument('--cache-db', action='store_true', help='Store results in sqlite3 database. Overwrites the CACHE_DB value from the config file.')
     parser.add_argument('--job-process-quiet', action='store_true', help='Pass --quiet to prule and do not print full result json')
@@ -1121,6 +1178,21 @@ if __name__ == "__main__":
             print("No job ids found in job-id-file ", args['job_ids_file'], file=sys.stderr)
             sys.exit(1)

+    # read job ids from JSON file
+    if 'job_ids_json_file' in args:
+        with open(args['job_ids_json_file'], 'r') as f:
+            process_delay = config.config["JOB_PROCESS_DELAY"] if "JOB_PROCESS_DELAY" in config.config else 0
+            data = json.load(f)
+            for j in data:
+                item = JobQueueItem(j["id"])
+                item.metadata = j
+                item.first_check = item.first_check - process_delay
+                job_ids_preset.append(item)
+        if len(job_ids_preset) == 0:
+            print("No job ids found in job-id-json-file ", args['job_ids_json_file'], file=sys.stderr)
+            sys.exit(1)

     # create queue and load state
     queue = JobQueue()
     if len(job_ids_preset) > 0:
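The loop above takes each object's id as the queue item's ccjobid and stores the whole object as its metadata; the processing path then looks for jobId, cluster and startTime (and treats duration as the marker that the metadata is complete, skipping the API fetch). Under those assumptions, an input file for --job-ids-json-file could be generated like this (all values illustrative):

import json

# Hypothetical job list; field names taken from the patch, values made up.
jobs = [
    {"id": 12345, "jobId": 67890, "cluster": "clustername",
     "startTime": 1700000000, "duration": 3600},
]

with open("job-ids.json", "w") as f:
    json.dump(jobs, f)

The daemon would then be pointed at the file with --job-ids-json-file job-ids.json.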