From 20d68a1ff2eaa6f0fa3985e79dbfa02df5b1eb4d Mon Sep 17 00:00:00 2001
From: Alex Wiens <alex.wiens@uni-paderborn.de>
Date: Sat, 15 Feb 2025 23:01:26 +0100
Subject: [PATCH] Prule.daemon: Add JobQueueItem and time backoff for job
 processing

"JOB_PROCESS_DELAY" option was replaced by "JOB_PROCESS_DELAY_MIN" and "JOB_PROCESS_DELAY_MAX".
---
 prule/daemon/__main__.py | 211 ++++++++++++++++++++++++++++-----------
 1 file changed, 150 insertions(+), 61 deletions(-)

diff --git a/prule/daemon/__main__.py b/prule/daemon/__main__.py
index a56a898..4e5ef27 100644
--- a/prule/daemon/__main__.py
+++ b/prule/daemon/__main__.py
@@ -18,7 +18,7 @@ Example configuration file:
     "CC_URL":"https://cc.example.com/",
     "CC_TOKEN":"VGhpcyBzaG91bGQgbG9vayBsaWtlIHNvbWUgYmFzZTY0IHN0cmluZy4gQnV0IHRoYW5rcyBmb3IgY2hlY2tpbmcuIEhhdmUgYSBuaWNlIGRheSEgOikK",
     "CC_CHECK_INTERVAL":30,
-    "JOB_PROCESS_DELAY":60,
+    "JOB_PROCESS_DELAY_MIN":60,
     "STATE_PATH":"/path/to/state_file.json",
     "PRULE_PARAMETERS_FILE_PATH":"/path/to/file.json",
     "PRULE_CLUSTERS_FILE_PATH":"/path/to/file.json",
@@ -39,8 +39,10 @@ CC_URL                          URL to ClusterCockpit endpoint
 CC_TOKEN                        ClusterCockpit authentication token for REST API
 CC_CHECK_INTERVAL               Integer, Seconds to wait between checking for finished jobs
 CC_CHECK_ONCE                   Boolean, If true, job check is run only once and the daemon exits after all queued jobs are processed
-JOB_PROCESS_DELAY               Integer, Seconds to wait after a job finishes before processing data
+JOB_PROCESS_DELAY_MIN           Integer, Seconds to wait after a job finishes before processing data. Default: 0
                                          CC_CHECK_INTERVAL may be added to the total delay.
+JOB_PROCESS_DELAY_MAX           Integer, Maximum backoff in seconds; a failing job is dropped once its backoff reaches this value. Default: 960
+                                         Failing jobs are retried with exponential backoff.
 STATE_PATH                      Path to state JSON file
 PRULE_PARAMETERS_FILE_PATH      Path to parameters JSON file
 PRULE_CLUSTERS_FILE_PATH        Path to clusters JSON file
@@ -82,6 +84,7 @@ import tempfile
 import sqlite3
 import datetime
 import html
+import copy
 
 import signal
 import threading
@@ -94,7 +97,7 @@ import prule.debug
 #TODO: Add idle behavior: In case no jobs need to be processed, old jobs can be processed. Add backwards updated search times.
 #TODO: Add metadata fetch before job processing. If patho results already existent, skip job.
 
-config_keys  = ["CC_URL", "CC_TOKEN", "CC_CHECK_INTERVAL", "STATE_PATH", "PRULE_PARAMETERS_FILE_PATH", "PRULE_CLUSTERS_FILE_PATH", "PRULE_RULES_FILE_PATH", "JOBARCHIVE_PATH", "OUTPUT_PATH", "API_METADATA", "API_TAG", "API_JOBARCHIVE", "CACHE_DB", "DB_PATH", "STORE_OUTPUT", "JOB_PROCESS_DELAY"]
+config_keys  = ["CC_URL", "CC_TOKEN", "CC_CHECK_INTERVAL", "STATE_PATH", "PRULE_PARAMETERS_FILE_PATH", "PRULE_CLUSTERS_FILE_PATH", "PRULE_RULES_FILE_PATH", "JOBARCHIVE_PATH", "OUTPUT_PATH", "API_METADATA", "API_TAG", "API_JOBARCHIVE", "CACHE_DB", "DB_PATH", "STORE_OUTPUT"]
-config_types = [str, str, int, str, str, str, str, str, str, bool, bool, bool, bool, str, bool, int]
+config_types = [str, str, int, str, str, str, str, str, str, bool, bool, bool, bool, str, bool]
 
 """
@@ -139,7 +142,26 @@ class Config:
             if type(data[c]) != config_types[i]:
                 raise Exception("Key {} in configuration file has wrong type {}. It should be of type {}.".format(c, type(data[c]), config_types[i]))
         config.config = data
-        
+
+class JobQueueItem:
+    def __init__(self, ccjobid, metadata=None):
+        self.ccjobid     = ccjobid
+        self.metadata    = metadata
+        self.first_check = int(time.time())
+        self.backoff     = 0
+    def toDict(self):
+        return {"ccjobid":self.ccjobid, "metadata":self.metadata, "first_check":self.first_check, "backoff":self.backoff}
+    def fromDict(d):
+        if type(d) != dict:
+            return None
+        item = JobQueueItem(0)
+        item.ccjobid     = d["ccjobid"]     if "ccjobid"     in d else None
+        item.metadata    = d["metadata"]    if "metadata"    in d else None
+        item.first_check = d["first_check"] if "first_check" in d else 0
+        item.backoff     = d["backoff"]     if "backoff"     in d else 0
+        if item.ccjobid == None:
+            return None
+        return item
 
 """
 Holds the known jobs of the daemon.
@@ -167,14 +189,21 @@ class JobQueue:
             return False
         self.smallest_starttime = data["smallest_starttime"]
         self.last_check = data["last_check"]
-        self.queue = data["queue"]
+        self.queue = []
+        for d in data["queue"]:
+            i = JobQueueItem.fromDict(d)
+            if i == None:
+                raise Exception("Invalid item in JobQueue JSON {}".format(d))
+            self.queue.append(i)
         print("Loaded state: ", data)
         return True
     def saveToJson(self, path):
         data = {}
         data["smallest_starttime"] = self.smallest_starttime
         data["last_check"] = self.last_check
-        data["queue"] = self.queue
+        data["queue"] = []
+        for i in self.queue:
+            data["queue"].append(i.toDict())
         try:
             with open(path, "w") as f:
                 json.dump(data, f)
@@ -188,7 +217,9 @@ class JobQueue:
     def stop(self):
         with self.condition:
             self.stopQueue = True
-            self.condition.notify()
+            self.condition.notify_all()
+    def stopped(self):
+        return self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True)
     def add(self, v, stop_once_empty=False):
         if v == None and stop_once_empty == False: # prevent adding None value
             return
@@ -227,6 +258,41 @@ class JobQueue:
     def size(self):
         with self.condition:
             return len(self.queue)
+    def wait_add(self, timeout=None):
+        # return True  if condition is notified
+        # return False if timeout expires
+        # return None  if queue is stopped
+        if self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True):
+            return None
+        with self.condition:
+            if self.stopQueue == True:
+                return None
+            return self.condition.wait(timeout)
+    def safe_fun(self, fun):
+        with self.condition:
+            queue_copy = copy.copy(self.queue)
+            return fun(queue_copy)
+    def get_min(self, min_select):
+        # select the item with the smallest value; an item with value <= 0 is returned immediately
+        # does not block if the queue is empty!
+        min_value = sys.maxsize
+        index     = -1
+        with self.condition:
+            if len(self.queue) == 0:
+                return "empty"
+            # find item with smallest value
+            for ix,i in enumerate(self.queue):
+                value = min_select(i)
+                if value <= 0:
+                    min_value = value
+                    index = ix
+                    break
+                if value < min_value:
+                    min_value = value
+                    index = ix
+            if index != -1:
+                return self.queue.pop(index)
+            return None
     def get(self):
         # Waits until the queue is not empty or the queue is shut down
         # Returns a value if the queue is not empty or None if the queue is shut down
@@ -338,7 +404,7 @@ class CCCheckThread(threading.Thread):
         # current check
         smallest_starttime = sys.maxsize
         new_running_ids_set = set()
-        cur_time = int(time.time())-10
+        cur_time = int(time.time())
 
         # Get all jobs started later than smallest_starttime.
         jobs = self.get_jobs(queue.smallest_starttime, "running")
@@ -349,7 +415,7 @@ class CCCheckThread(threading.Thread):
 
         # fill running job set and compute smallest_starttime
         for ix,j in enumerate(jobs):
-            new_running_ids_set.add(j["id"])
+            new_running_ids_set.add((j["id"], j["jobId"], j["cluster"], j["startTime"]))
             smallest_starttime = min(smallest_starttime, j["startTime"])
 
         # compare running jobs with last check's running jobs to get finished jobs
@@ -367,15 +433,20 @@ class CCCheckThread(threading.Thread):
             for ix,j in enumerate(jobs):
                 # possible states: running, completed, failed, cancelled, stopped, timeout
                 if j["jobState"] != "running":
-                    finished_ids_set.add(j["id"])
+                    finished_ids_set.add((j["id"], j["jobId"], j["cluster"], j["startTime"]))
 
         print("Running jobs {} Finished jobs {} Previous smallest starttime {} Smallest starttime {} Previous queue size {}".format(len(new_running_ids_set), len(finished_ids_set), queue.smallest_starttime, smallest_starttime, queue.size()))
 
         # add found jobs to queue
         if len(finished_ids_set) > 0:
             q_ids = []
-            for jid in finished_ids_set:
-                q_ids.append((jid, cur_time))
+            for j in finished_ids_set:
+                item = JobQueueItem(j[0]) # ccid, jobid, cluster, startTime
+                # set initial delay
+                item.backoff = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
+                item.first_check = cur_time
+                q_ids.append(item)
+
             queue.add_all(q_ids)
         queue.running_ids_set = new_running_ids_set
         queue.last_check = cur_time
@@ -627,14 +698,14 @@ class PruleThread(threading.Thread):
         return returncode
 
     # job: "id" - CC database id,  not "jobId", which is the SLURM job id
-    def processJob(self, ccjob_id, check_time):
+    def processJob(self, job):
 
         # track process error
         process_result = "success"
 
         # retrieve current job metadata
         with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
-            job_res = self.request_job_meta(ccjob_id)
+            job_res = self.request_job_meta(job.ccjobid)
 
         if job_res == None:
             return "failure-shutdown"
@@ -657,6 +728,7 @@ class PruleThread(threading.Thread):
         job_cluster   = job_meta["cluster"]
         job_id        = str(job_meta["id"])
         job_slurmid   = str(job_meta["jobId"])
+        job_startTime = str(job_meta["startTime"])
 
         job_path        = None
         job_tempdir     = None
@@ -664,20 +736,11 @@ class PruleThread(threading.Thread):
 
         if config.config["API_JOBARCHIVE"] == False:
             # Load job from filesystem
-            job_path = os.path.join(config.config["JOBARCHIVE_PATH"], job_cluster, job_slurmid[:-3], job_slurmid[-3:])
-            # figure out subdirectory of job archive
-            jobarchive_subdir = None
-            try:
-                jobarchive_subdir = os.listdir(job_path)[0]
-            except Exception as e:
-                print(e)
-                print("Failed to access job archive directory {} {}".format(ccjob_id, job_path))
-                return "failure-shutdown"
-            job_path = os.path.join(job_path, jobarchive_subdir)
+            job_path = os.path.join(config.config["JOBARCHIVE_PATH"], job_cluster, job_slurmid[:-3], job_slurmid[-3:], job_startTime)
         else:
             # Load job from jobarchive api and write it to tempdir
             with prule.debug.Timing("prulethread.request_jobarchive", "PRINT_TIMING" in config.config):
-                job_tempdir = self.request_jobarchive(ccjob_id)
+                job_tempdir = self.request_jobarchive(job.ccjobid)
 
             if job_tempdir == False:
                 return "failure-shutdown"
@@ -689,12 +752,12 @@ class PruleThread(threading.Thread):
 
         if config.config["STORE_OUTPUT"] == True:
             # store result in output json
-            job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(ccjob_id))
+            job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(job.ccjobid))
         else:
             # store result in temporary file and later, eventually, pass it to metadata api
             if job_tempdir == None:
-                job_tempdir = tempfile.TemporaryDirectory(prefix="prule_jobarchive_{}_".format(ccjob_id))
-            job_result_path = os.path.join(job_tempdir.name, "{}.json".format(ccjob_id))
+                job_tempdir = tempfile.TemporaryDirectory(prefix="prule_jobarchive_{}_".format(job.ccjobid))
+            job_result_path = os.path.join(job_tempdir.name, "{}.json".format(job.ccjobid))
 
         result_json, process_time = self.prule_job({"job-dir":job_path})
 
@@ -706,20 +769,20 @@ class PruleThread(threading.Thread):
             if result_json == False:
                 return "failure-drop"
 
-        print("Process: job {} jobId {} time {}".format(job_meta["id"], job_meta["jobId"], process_time))
+        print("Process: job {} jobId {} time {:.6f}".format(job_meta["id"], job_meta["jobId"], process_time))
         if self.processTerminated == True:
-            print("Job {} process was terminated.".format(ccjob_id))
+            print("Job {} process was terminated.".format(job.ccjobid))
             if job_tempdir != None:
                 job_tempdir.cleanup()
             return "failure-requeue"
 
         # process result
         if 'JOB_PROCESS_QUIET' in config.config and config.config['JOB_PROCESS_QUIET'] == False:
-            print("Job {} process result {}".format(ccjob_id, result_json))
+            print("Job {} process result {}".format(job.ccjobid, result_json))
         else:
             try:
-                print("Job {} process result, tags: {} err: {} {} fail: {} eval: {} noeval: {} evaltime: {}".format(
-                    ccjob_id,
+                print("Job {} process result, tags: {} err: {} {} fail: {} eval: {} noeval: {} evaltime: {:.6f}".format(
+                    job.ccjobid,
                     len(result_json["tags"]),
                     result_json["error"],
                     len(result_json["errors"]),
@@ -730,7 +793,7 @@ class PruleThread(threading.Thread):
                 ))
             except Exception as e:
                 print(e)
-                print("Job {} process result {}".format(ccjob_id, result_json))
+                print("Job {} process result {}".format(job.ccjobid, result_json))
 
 
         # upload result to metadata api
@@ -738,10 +801,10 @@ class PruleThread(threading.Thread):
             patho_message = prepare_patho_message(config, result_json)
             if patho_message != None:
                 with prule.debug.Timing("prulethread.request_metadata_upload", "PRINT_TIMING" in config.config):
-                    res = self.request_metadata_upload(ccjob_id, {"key":"issues","value":patho_message})
+                    res = self.request_metadata_upload(job.ccjobid, {"key":"issues","value":patho_message})
 
                 if res == False:
-                    print("Job {} process failed to write metadata using API_METADATA".format(ccjob_id))
+                    print("Job {} process failed to write metadata using API_METADATA".format(job.ccjobid))
                     process_result = "failure-shutdown"
 
         if config.config["API_TAG"] == True and result_json != None:
@@ -755,20 +818,20 @@ class PruleThread(threading.Thread):
                         if "scope" in t:
                             t.pop("scope")
                 with prule.debug.Timing("prulethread.request_tag_job", "PRINT_TIMING" in config.config):
-                    res = self.request_tag_job(ccjob_id, result_json["tags"])
+                    res = self.request_tag_job(job.ccjobid, result_json["tags"])
 
                 if res == False:
-                    print("Job {} process failed to write tags using API_TAG".format(ccjob_id))
+                    print("Job {} process failed to write tags using API_TAG".format(job.ccjobid))
                     process_result = "failure-shutdown"
 
         # save result to cache db
         if config.config["CACHE_DB"] == True:
             try:
                 with prule.debug.Timing("prulethread.db_insert_result", "PRINT_TIMING" in config.config):
-                    self.db_con.db_insert_result(ccjob_id, result_json, job_meta, process_time, True)
+                    self.db_con.db_insert_result(job.ccjobid, result_json, job_meta, process_time, True)
             except Exception as e:
                 print(e)
-                print("ERROR: db_insert_result failed for job ccid {}".format(ccjob_id))
+                print("ERROR: db_insert_result failed for job ccid {}".format(job.ccjobid))
                 process_result = "failure-shutdown"
 
         # cleanup temp directory
@@ -783,26 +846,44 @@ class PruleThread(threading.Thread):
         # start prule subprocess
         self.prule_start()
 
+        process_delay_min = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
+        process_delay_max = config.config["JOB_PROCESS_DELAY_MAX"] if "JOB_PROCESS_DELAY_MAX" in config.config else 960
+
         while self.stopThread == False:
 
             if self.stop_on_empty == True and queue.empty() == True:
                 signal.pthread_kill(config.main_tid, signal.SIGTERM)
                 break
 
+            cur_time = int(time.time())
+            def smallest_backoff(item):
+                return item.backoff - (cur_time - item.first_check)
+
             # get new job from queue and block if empty
-            job = queue.get()
-            if job == None: # queue was shut down
-                break
+            job = queue.get_min(smallest_backoff)
 
-            ccjob_id, check_time = job
+            if job == "empty":
+                queue.wait_add() # wait for new jobs
+                continue
+
+            if job == None:
+                if queue.stopped(): # queue was shut down
+                    break
+                continue
 
             # sleep to match job process delay
-            time_offset = int(time.time()) - check_time
-            if config.config["JOB_PROCESS_DELAY"] != 0 and time_offset <= config.config["JOB_PROCESS_DELAY"]:
-                time_sleep = (config.config["JOB_PROCESS_DELAY"] - time_offset) + 1
-                with self.stopCondition:
-                    if self.stopThread == False:
-                        self.stopCondition.wait(time_sleep) # ignore interrupts
+            time_offset = cur_time - job.first_check
+            if time_offset < job.backoff:
+                # sleep now or wait for jobs
+                print("PruleThread: delay {}".format(job.backoff - time_offset))
+                res = queue.wait_add(job.backoff - time_offset) # !! blocks without checking self.stopCondition
+                if res == None: # queue stopped
+                    queue.add(job)
+                    break
+                if res == True:
+                    queue.add(job) # other job was added to queue
+                    continue
+                # else: the timeout expired and the job is ready for a retry
 
             if self.stopThread == True:
                 # put back job and return
@@ -810,11 +891,10 @@ class PruleThread(threading.Thread):
                 break
 
             # process job
-            print("Process ", ccjob_id)
+            print("Process ", job.ccjobid)
             process_time_start = datetime.datetime.now().timestamp()
-            result = self.processJob(ccjob_id, check_time)
+            result = self.processJob(job)
             process_time = datetime.datetime.now().timestamp()-process_time_start
-            #print("process job id {} jobId {} cluster {} duration {}".format(job["id"],job["jobId"],job["cluster"], job["duration"]))
 
             # Possible results:
             # "success"          - All fine
@@ -827,23 +907,28 @@ class PruleThread(threading.Thread):
             # "failure-shutdown" - Failure, that is expected to need developer/administrator intervention
             #                      Requeue job and shutdown daemon
 
-            print("Process job {} {}, time {}".format(job, result, process_time))
+            print("Process job {} {}, time {:.6f}".format(job.ccjobid, result, process_time))
 
             if result == "failure-requeue" or result == "failure-wait" or result == "failure-shutdown":
-                queue.add(job)
-            elif result == "failure-drop":
+                if job.backoff < process_delay_max: # retry until the backoff reaches JOB_PROCESS_DELAY_MAX (default 960s = 16 minutes)
+                    job.backoff = 15 if job.backoff == 0 else job.backoff * 2
+                    queue.add(job)
+                else:
+                    print("Job {} failed too long after first check. Drop job.".format(job.ccjobid))
+                    result = "failure-drop"
+            if result == "failure-drop":
                 # save failure to cache db
                 if config.config["CACHE_DB"] == True:
                     try:
                         with prule.debug.Timing("prulethread.db_insert_failure", "PRINT_TIMING" in config.config):
-                            self.db_con.db_insert_failure(ccjob_id)
+                            self.db_con.db_insert_failure(job.ccjobid)
                     except Exception as e:
                         print(e)
-                        print("ERROR: db_insert_failure failed for job ccid {}, requeue".format(ccjob_id))
+                        print("ERROR: db_insert_failure failed for job ccid {}, requeue".format(job.ccjobid))
                         queue.add(job)
                         self.stopThread = True
                         signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown
-            elif result == "success":
+            if result == "success":
                 pass
 
             if result == "failure-shutdown":
@@ -949,11 +1034,14 @@ if __name__ == "__main__":
     job_ids_preset = []
     if 'job_ids_file' in args:
         with open(args['job_ids_file'], 'r') as f:
+            process_delay = config.config["JOB_PROCESS_DELAY"] if "JOB_PROCESS_DELAY" in config.config else 0
             for line in f.readlines():
                 line = line.strip()
                 if len(line) > 0:
-                    jid = int(line.strip())
-                    job_ids_preset.append((jid,0))
+                    ccjid = int(line.strip())
+                    item = JobQueueItem(ccjid)
+                    item.first_check = item.first_check - process_delay
+                    job_ids_preset.append(item)
         if len(job_ids_preset) == 0:
             print("No job ids found in job-id-file ", args['job_ids_file'], file=sys.stderr)
             sys.exit(1)
@@ -1003,6 +1091,7 @@ if __name__ == "__main__":
         else:
             signal.sigwait([signal.SIGINT, signal.SIGTERM])
             break
+    print("Got stop signal")
     signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM])
 
     # stop queue and threads
-- 
GitLab