From 94896931fb2a9b7ff4467517f677d609d0130cd5 Mon Sep 17 00:00:00 2001
From: Alex Wiens <alex.wiens@uni-paderborn.de>
Date: Tue, 4 Mar 2025 13:58:05 +0100
Subject: [PATCH] prule.daemon: Introduce error codes for job processing

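The request helpers (request_job_meta, request_tag_job, request_jobarchive,
request_metadata_upload) and prule_job now return a tuple of (result, error
code) instead of a bare result. processJob combines the per-request code with
a stage offset and returns it next to the textual result, so run_main can log
which processing stage failed and why.

Error code convention introduced by this patch:

 * 0   - success
 * HTTP status code of the failed request (e.g. 401, 429, 500)
 * 999 - unexpected exception, or a missing/invalid HTTP status
 * 990 - control flow unexpectedly reached the end of the function
 * prule_job uses its own small codes (10, 20, 30, 40)

processJob adds a stage offset to every non-zero code: 101000000 for the
result cache lookup, 102000000 for the initial job metadata request,
103000000 for the job archive download, 104000000 for missing input files,
105000000 for the prule run, 106000000 for writing the result file,
107000000 for the metadata upload, 108000000 for the tag request,
109000000 for the second metadata lookup and 110000000 for the database
insert.

Temporary directory cleanup is additionally wrapped in try/except so that a
failed cleanup no longer masks the actual processing result.

A combined code can be split back into its two parts; a minimal sketch
(the helper name is hypothetical and not part of this patch):

    # Hypothetical helper, not part of this patch: split a combined
    # code such as 102000401 into its stage offset and detail code.
    def split_error_code(code):
        stage  = (code // 1000000) * 1000000  # e.g. 102000000 (job meta request)
        detail = code % 1000000               # e.g. 401 (HTTP status) or 999
        return stage, detail

    assert split_error_code(102000401) == (102000000, 401)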
---
 prule/daemon/__main__.py | 190 ++++++++++++++++++++++-----------------
 1 file changed, 106 insertions(+), 84 deletions(-)

diff --git a/prule/daemon/__main__.py b/prule/daemon/__main__.py
index ed7bdb9..a8008fc 100644
--- a/prule/daemon/__main__.py
+++ b/prule/daemon/__main__.py
@@ -562,34 +562,28 @@ class PruleThread(threading.Thread):
                 try:
                     if int(response.headers.get('content-length')) == 0:
                         print("request_job_meta: empty response")
-                        return "job-failure"
+                        return ("job-failure", 1)
                 except:
                     pass
                 if response.status == 200:
                     job_meta = json.load(response)
-                    return job_meta
-                if response.status == 401:
-                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
-                    self.config.signal_shutdown()
+                    return (job_meta, 0)
                 print("Error {} for URL {}".format(response.status, url))
-                if response.status >= 500 or response.status == 429 or response.status == 401:
-                    return "wait"
-                else:
-                    return None
+                return (None, response.status if response.status > 0 else 999)
         except urllib.error.HTTPError as e:
             print("Error {} for URL {}".format(e.code, e.url))
             if e.code >= 500 or e.code == 429: # internal server error or too many requests
-                return "wait"
+                return ("wait", e.code)
             if e.code == 401:
                 print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                 self.config.signal_shutdown()
-                return "wait"
-            return None
+                return (None, 401)
+            return (None, e.code if e.code > 0 else 999)
         except Exception as e: # something went horribly wrong
             traceback.print_exc()
             print("request_job_meta",e)
-            return None
-        return None
+            return (None, 999)
+        return (None, 990)
     def request_tag_job(self, id, tags):
         #[{"type":"foo","name":"bar"},{"type":"asdf","name":"fdsa"}]
         url = config.config["CC_URL"]+"/api/jobs/tag_job/{}".format(id)
@@ -602,12 +596,9 @@ class PruleThread(threading.Thread):
         try:
             with urllib.request.urlopen(req, timeout=10) as response:
                 if response.status == 200:
-                    return True
-                if response.status == 401:
-                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
-                    self.config.signal_shutdown()
+                    return (True, 0)
                 print("Error {} for URL {}".format(response.status, url))
-        except urllib.error.HTTPError as e:
+        except urllib.error.HTTPError as e: # raised for all non-2XX http codes
             msg = ""
             try:
                 msg = e.fp.read().decode('utf-8', 'ignore')
@@ -617,14 +608,15 @@ class PruleThread(threading.Thread):
             if e.code == 401:
                 print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                 self.config.signal_shutdown()
+                return (False, 401)
             if e.code == 500 and "Duplicate entry" in msg: # TODO: Tag is inserted twice. Fix once tag removal is possible.
-                return True
-            return False
+                return (True, 0)
+            return (False, e.code if e.code > 0 else 999)
         except Exception as e: # something went horribly wrong
             traceback.print_exc()
             print(e)
-            return False
-        return False
+            return (False, 999)
+        return (False, 990)
     def request_jobarchive(self, id):
         url = config.config["CC_URL"]+"/api/jobs/{}?all-metrics=true".format(id)
         headers = {}
@@ -643,25 +635,24 @@ class PruleThread(threading.Thread):
                         json.dump(data["Data"], f)
                     with open(meta_path, "w") as f:
                         json.dump(data["Meta"], f)
-                    return tdir
-                if response.status == 401:
-                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
-                    self.config.signal_shutdown()
-                    return "wait"
+                    return (tdir, 0)
                 print("Error {} for URL {}".format(response.status, url))
-        except urllib.error.HTTPError as e:
+        except urllib.error.HTTPError as e: # raised for all non-2XX http codes
             print("Error {} for URL {}".format(e.code, e.url))
             if e.code == 401:
                 print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                 self.config.signal_shutdown()
-                return "wait"
-            return False
+                return ("wait", 401)
+            return (False, e.code if e.code > 0 else 999)
         except Exception as e: # something went horribly wrong
             traceback.print_exc()
             print(e)
-            tdir.cleanup()
-            return False
-        return False      
+            try:
+                tdir.cleanup()
+            except:
+                print("Cleaning up {} failed".format(tdir.name))
+            return (False, 999)
+        return (False, 990)
     def request_metadata_upload(self, id, metadata):
         url = config.config["CC_URL"]+"/api/jobs/edit_meta/{}".format(id)
         headers = {}
@@ -673,22 +664,20 @@ class PruleThread(threading.Thread):
         try:
             with urllib.request.urlopen(req, timeout=10) as response:
                 if response.status == 200:
-                    return json.load(response)
-                if response.status == 401:
-                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
-                    self.config.signal_shutdown()
+                    return (json.load(response), 0)
                 print("Error {} for URL {}".format(response.status, url))
-        except urllib.error.HTTPError as e:
+        except urllib.error.HTTPError as e: # raised for all non-2XX http codes
             print("Error {} for URL {}".format(e.code, e.url))
             if e.code == 401:
                 print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                 self.config.signal_shutdown()
-            return False
+                return (False, 401)
+            return (False, e.code if e.code > 0 else 999)
         except Exception as e: # something went horribly wrong
             traceback.print_exc()
             print(e)
-            return False
-        return False
+            return (False, 999)
+        return (False, 990)
     def prule_start(self):
         params = ["python3","-u","-m","prule"]
         params += ["--parameters-file", config.config["PRULE_PARAMETERS_FILE_PATH"]]
@@ -707,7 +696,7 @@ class PruleThread(threading.Thread):
                 print("Prule process {} started".format(self.currentProcess.pid))
     def prule_job(self, job):
         if self.currentProcess == None:
-            return (None, 0.0)
+            return (None, 0.0, 10)
         process_time_start = datetime.datetime.now().timestamp()
         tries = 0
         while tries <2:
@@ -722,18 +711,18 @@ class PruleThread(threading.Thread):
                 traceback.print_exc()
                 print(e)
                 if self.stopThread == True:
-                    return (None, 0.0)
+                    return (None, 0.0, 20)
                 if tries == 0:
                     self.prule_restart()
                     tries += 1
                     continue
                 self.prule_restart()
-                return (None, 0.0)
+                return (None, 0.0, 30)
         process_time_stop = datetime.datetime.now().timestamp()
         process_time = process_time_stop - process_time_start
         if type(result) != dict or len(result) == 0:
-            return (False, process_time)
-        return (result, process_time)
+            return (False, process_time, 40)
+        return (result, process_time, 0)
     def prule_restart(self):
         self.prule_stop()
         self.prule_start()
@@ -762,6 +751,7 @@ class PruleThread(threading.Thread):
 
         # track process error
         process_result = "success"
+        error_code     = 0
 
         # complete metadata (later used for entry in sqlite database)
         job_meta      = None
@@ -780,25 +770,30 @@ class PruleThread(threading.Thread):
         # check if already processed
         already_processed = False
         if config.config["CACHE_DB"] == True:
-            with prule.debug.Timing("prulethread.db_get_result", "PRINT_TIMING" in config.config):
-                old_result = self.db_con.db_get_result(job.ccjobid)
+            try:
+                with prule.debug.Timing("prulethread.db_get_result", "PRINT_TIMING" in config.config):
+                    old_result = self.db_con.db_get_result(job.ccjobid)
+            except:
+                error_code = 101000000
+                return ("failure-shutdown", error_code)
             if len(old_result) > 0:
                 already_processed = already_processed or old_result[-1]["evaluated"] == True
         if already_processed == True:
             print("skip, already_processed",job.ccjobid)
-            return "success"
+            return ("success", error_code)
 
         # retrieve current job metadata if needed
         if job_cluster == None or job_slurmid == None or job_startTime == None:
             with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
-                job_res = self.request_job_meta(job.ccjobid)
-
+                job_res, error_code = self.request_job_meta(job.ccjobid)
+            if error_code > 0:
+                error_code += 102000000
             if job_res == None:
-                return "failure-shutdown"
+                return ("failure-shutdown", error_code)
             if job_res == "job-failure":
-                return "failure-drop"
+                return ("failure-drop", error_code)
             if job_res == "wait":
-                return "failure-wait"
+                return ("failure-wait", error_code)
             job_meta = job_res['Meta']
 
             job_cluster   = job_meta["cluster"]
@@ -815,12 +810,13 @@ class PruleThread(threading.Thread):
         else:
             # Load job from jobarchive api and write it to tempdir
             with prule.debug.Timing("prulethread.request_jobarchive", "PRINT_TIMING" in config.config):
-                job_tempdir = self.request_jobarchive(job.ccjobid)
-
+                job_tempdir, error_code = self.request_jobarchive(job.ccjobid)
+            if error_code > 0:
+                error_code += 103000000
             if job_tempdir == False:
-                return "failure-shutdown"
+                return ("failure-shutdown", error_code)
             if job_tempdir == "wait":
-                return "failure-wait"
+                return ("failure-wait", error_code)
             job_path = job_tempdir.name
 
         print("Job path:",job_path)
@@ -828,29 +824,41 @@ class PruleThread(threading.Thread):
         # check if input files actually exist
         if os.path.exists(os.path.join(job_path, "meta.json")) == False or (os.path.exists(os.path.join(job_path, "data.json")) == False and os.path.exists(os.path.join(job_path, "data.json.gz")) == False):
             if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
-                job_tempdir.cleanup()
+                try:
+                    job_tempdir.cleanup()
+                except:
+                    print("Cleaning up {} failed".format(job_tempdir.name))
             print("Process: job {} Missing files in {}".format(job.ccjobid, job_path))
-            return "failure-requeue"
+            return ("failure-requeue", 104000000)
+
 
+        result_json, process_time, error_code = self.prule_job({"job-dir":job_path})
 
-        result_json, process_time = self.prule_job({"job-dir":job_path})
+        if error_code > 0:
+            error_code += 105000000
 
         if result_json == None or result_json == False:
             if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
-                job_tempdir.cleanup()
+                try:
+                    job_tempdir.cleanup()
+                except:
+                    print("Cleaning up {} failed".format(job_tempdir.name))
             if result_json == None:
-                return "failure-shutdown"
+                return ("failure-shutdown", error_code)
             if result_json == False:
-                return "failure-drop"
+                return ("failure-drop", error_code)
 
         print("Process: job {} jobId {} time {:.6f}".format(job.ccjobid, job_slurmid, process_time))
         if self.processTerminated == True:
             print("Job {} process was terminated.".format(job.ccjobid))
             if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
-                job_tempdir.cleanup()
-            return "failure-requeue"
+                try:
+                    job_tempdir.cleanup()
+                except:
+                    print("Cleaning up {} failed".format(job_tempdir.name))
+            return ("failure-requeue", 0)
 
-        # process result
+        # print process result
         if 'JOB_PROCESS_QUIET' in config.config and config.config['JOB_PROCESS_QUIET'] == False:
             print("Job {} process result {}".format(job.ccjobid, result_json))
         else:
@@ -882,16 +890,20 @@ class PruleThread(threading.Thread):
                 print(e)
                 print("Failed to write result to {}".format(job_result_path))
                 if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
-                    job_tempdir.cleanup()
-                return "failure-requeue"
+                    try:
+                        job_tempdir.cleanup()
+                    except:
+                        print("Cleaning up {} failed".format(job_tempdir.name))
+                return ("failure-requeue", 106000000)
 
         # upload result to metadata api
         if config.config["API_METADATA"] == True and result_json != None:
             patho_message = prepare_patho_message(config, result_json)
             if patho_message != None:
                 with prule.debug.Timing("prulethread.request_metadata_upload", "PRINT_TIMING" in config.config):
-                    res = self.request_metadata_upload(job.ccjobid, {"key":"issues","value":patho_message})
-
+                    res, error_code = self.request_metadata_upload(job.ccjobid, {"key":"issues","value":patho_message})
+                if error_code > 0:
+                    error_code += 107000000
                 if res == False:
                     print("Job {} process failed to write metadata using API_METADATA".format(job.ccjobid))
                     process_result = "failure-shutdown"
@@ -910,8 +922,9 @@ class PruleThread(threading.Thread):
                         if "scope" in t:
                             t.pop("scope")
                 with prule.debug.Timing("prulethread.request_tag_job", "PRINT_TIMING" in config.config):
-                    res = self.request_tag_job(job.ccjobid, result_json["tags"])
-
+                    res, error_code = self.request_tag_job(job.ccjobid, result_json["tags"])
+                if error_code > 0:
+                    error_code += 108000000
                 if res == False:
                     print("Job {} process failed to write tags using API_TAG".format(job.ccjobid))
                     process_result = "failure-shutdown"
@@ -928,20 +941,26 @@ class PruleThread(threading.Thread):
                     traceback.print_exc()
                     print(e)
                     print("Failed to load meta data from file {}".format(os.path.join(job_path, "meta.json")))
+                    error_code = 109000000
 
             # load meta data from api if necessary
             if job_meta == None:
                 with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
-                    job_res = self.request_job_meta(job.ccjobid)
+                    job_res, error_code = self.request_job_meta(job.ccjobid)
+                if error_code > 0:
+                    error_code += 109000000
                 if type(job_res) != dict:
                     if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
-                        job_tempdir.cleanup()
+                        try:
+                            job_tempdir.cleanup()
+                        except:
+                            print("Cleaning up {} failed".format(job_tempdir.name))
                     if job_res == None:
-                        return "failure-shutdown"
+                        return ("failure-shutdown", error_code)
                     if job_res == "job-failure":
-                        return "failure-drop"
+                        return ("failure-drop", error_code)
                     if job_res == "wait":
-                        return "failure-wait"
+                        return ("failure-wait", error_code)
                 job_meta = job_res["Meta"]
 
             # overwrite metadata in job from prule results
@@ -959,12 +978,15 @@ class PruleThread(threading.Thread):
                 print(e)
                 print("ERROR: db_insert_result failed for job ccid {}".format(job.ccjobid))
                 process_result = "failure-shutdown"
+                error_code = 110000000
 
         # cleanup temp directory
         if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
-            job_tempdir.cleanup()
-
-        return process_result
+            try:
+                job_tempdir.cleanup()
+            except:
+                print("Cleaning up {} failed".format(job_tempdir.name))
+        return (process_result, error_code)
     def run_main(self):
         if self.config.config["CACHE_DB"] == True:
             self.db_con = prule.db.ResultsDB(self.config.config["DB_PATH"])
@@ -1022,7 +1044,7 @@ class PruleThread(threading.Thread):
             # process job
             print("Process ", job.ccjobid)
             process_time_start = datetime.datetime.now().timestamp()
-            result = self.processJob(job)
+            result, process_error_code = self.processJob(job)
             process_time = datetime.datetime.now().timestamp()-process_time_start
 
             # Possible results:
@@ -1036,7 +1058,7 @@ class PruleThread(threading.Thread):
             # "failure-shutdown" - Failure, that is expected to need developer/administrator intervention
             #                      Requeue job and shutdown daemon
 
-            print("Process job {} {}, time {:.6f}".format(job.ccjobid, result, process_time))
+            print("Process job {} {} {}, time {:.6f}".format(job.ccjobid, result, process_error_code, process_time))
 
             if result == "failure-requeue" or result == "failure-wait" or result == "failure-shutdown":
                 if job.backoff < process_delay_max: # wait a maximum of 16 minutes
-- 
GitLab