__main__.py
    #!/bin/env python -u
    
    longhelp="""
    Daemon that polls ClusterCockpit for newly finished jobs and executes the prule program to process the measurement data.
    
    There are two threads that communicate using a queue.
    The CCCheckThread regularly asks ClusterCockpit for jobs and recognizes newly finished jobs.
    These jobs are added to the queue.
    The PruleThread sequentially takes jobs from the queue and executes the prule program on them.
    
    There are two important files: the configuration file and the state file.
    The configuration file is a JSON file that contains a few configuration values.
    The state file is a JSON file that contains the timestamps used to recognize finished jobs and the queue of unprocessed jobs.
    On start, the state is restored from the state file and the daemon continues processing.
    
    Example configuration file:
    {
        "CC_URL":"https://cc.example.com/",
        "CC_TOKEN":"VGhpcyBzaG91bGQgbG9vayBsaWtlIHNvbWUgYmFzZTY0IHN0cmluZy4gQnV0IHRoYW5rcyBmb3IgY2hlY2tpbmcuIEhhdmUgYSBuaWNlIGRheSEgOikK",
        "CC_CHECK_INTERVAL":30,
        "JOB_PROCESS_DELAY_MIN":60,
        "STATE_PATH":"/path/to/state_file.json",
        "PRULE_PARAMETERS_FILE_PATH":"/path/to/file.json",
        "PRULE_CLUSTERS_FILE_PATH":"/path/to/file.json",
        "PRULE_RULES_FILE_PATH":"/path/to/file.json",
        "JOBARCHIVE_PATH":"/path/to/job-archive/",
        "OUTPUT_PATH":"/path/to/12345.json",
        "DB_PATH":"/path/to/db.sqlite3",
        "API_METADATA":false,
        "STORE_OUTPUT":false,
        "API_TAG":false,
        "API_JOBARCHIVE":false,
        "CACHE_DB":false
    }
    
    Configuration file description:
    
    CC_URL                          URL to ClusterCockpit endpoint
    CC_TOKEN                        ClusterCockpit authentication token for REST API
    CC_CHECK_INTERVAL               Integer, Seconds to wait between checking for finished jobs
    CC_CHECK_ONCE                   Boolean, If true, job check is run only once and the daemon exits after all queued jobs are processed
    JOB_PROCESS_DELAY_MIN           Integer, Seconds to wait after a job finishes before processing data. Default: 0
                                             CC_CHECK_INTERVAL may be added to the total delay.
    JOB_PROCESS_DELAY_MAX           Integer, Maximum seconds to wait before the last try; after that the job is dropped. Default: 960
                                             Exponential backoff is used to wait for failing jobs.
    STATE_PATH                      Path to state JSON file
    PRULE_PARAMETERS_FILE_PATH      Path to parameters JSON file
    PRULE_CLUSTERS_FILE_PATH        Path to clusters JSON file
    PRULE_RULES_FILE_PATH           Path to rule JSON file
    JOBARCHIVE_PATH                 Path to cc-backend archive directory, used in case API_JOBARCHIVE is false
    OUTPUT_PATH                     Path to directory for storing result files, used in case STORE_OUTPUT is true
    DB_PATH                         Path to sqlite3 file for storing results, used in case CACHE_DB is true
    API_METADATA                    Boolean, If true, the ClusterCockpit REST API is used to store the results as JSON in the job metadata
    STORE_OUTPUT                    Boolean, If true, the results are stored as result files (see OUTPUT_PATH)
    API_TAG                         Boolean, If true, the ClusterCockpit REST API is used to store tags for the jobs
    API_JOBARCHIVE                  Boolean, If true, the ClusterCockpit REST API is used to load job archive data,
                                             if false, the job archive data is read from the filesystem (see JOBARCHIVE_PATH)
    CACHE_DB                        Boolean, If true, the results are stored in a sqlite3 database (see DB_PATH)
    API_TAG_SCOPE                   Boolean, If true, uses scopes and adds default 'global' scope (if missing),
                                             if false, the scopes are removed from the tags (default: false)
    METADATA_MESSAGE                HTML insert added to the metadata
    JOB_PROCESS_QUIET               Boolean, If true, pass --quiet to prule and do not print full result json,
                                             if false print result json (default: true)
    PRINT_TIMING                    Boolean, If true, print debug timing information
    
    Example state file:
    {
        "smallest_starttime":123,
        "last_check":123,
        "queue":[]
    }
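    
    Example invocation (replace <daemon-module> with the module path of this daemon in your installation):
    python3 -m <daemon-module> --config-path /path/to/config.json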
    
    """
    
    import typing
    import os.path
    import sys
    import argparse
    import json
    import traceback
    import urllib.request
    import urllib.error
    import time
    import tempfile
    import sqlite3
    import datetime
    import html
    import copy
    
    import signal
    import threading
    import subprocess
    import concurrent.futures
    
    import prule.db
    import prule.debug
    
    #TODO: Add idle behavior: In case no jobs need to be processed, old jobs can be processed. Add backwards updated search times.
    #TODO: Add metadata fetch before job processing. If patho results already exist, skip the job.
    
    config_keys  = ["CC_URL", "CC_TOKEN", "CC_CHECK_INTERVAL", "STATE_PATH", "PRULE_PARAMETERS_FILE_PATH", "PRULE_CLUSTERS_FILE_PATH", "PRULE_RULES_FILE_PATH", "JOBARCHIVE_PATH", "OUTPUT_PATH", "API_METADATA", "API_TAG", "API_JOBARCHIVE", "CACHE_DB", "DB_PATH", "STORE_OUTPUT"]
    config_types = [str, str, int, str, str, str, str, str, str, bool, bool, bool, bool, str, bool]
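    # config_keys and config_types are index-aligned; Config.load() validates presence and type of each required key.
    # Optional keys (e.g. CC_CHECK_ONCE, JOB_PROCESS_DELAY_MIN, API_TAG_SCOPE) are read from the loaded dict where needed.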
    
    
    """
    Loads and holds the configuration JSON. Also used to signal a shutdown to the main thread.
    """
    class Config:
        def __init__(self, main_tid: int, path: str):
            self.path     :str  = path
            self.config   :dict = {}
            self.main_tid :int  = main_tid
            self.shutdown :bool = False
        def load(self) -> None:
            data = None
            with open(self.path, "r") as f:
                data = json.load(f)
            for i,c in enumerate(config_keys):
                if c not in data:
                    raise Exception("Key {} not found in configuration file loaded from {}.".format(c, self.path))
                if type(data[c]) != config_types[i]:
                    raise Exception("Key {} in configuration file has wrong type {}. It should be of type {}.".format(c, type(data[c]), config_types[i]))
            self.config = data
        def signal_shutdown(self) -> None:
            if self.shutdown == False:
                self.shutdown = True
                signal.pthread_kill(self.main_tid, signal.SIGTERM) # shutdown
    
    """
    Create the message that is inserted into the ClusterCockpit UI.
    """
    def prepare_patho_message(config: Config, result_json: dict) -> typing.Optional[str]:
        if len(result_json["tags"]) == 0:
            return None
        message = "<ul>"
        for rule in result_json["rules"]:
            if rule["match"] == False:
                continue
            # Escape HTML: https://wiki.python.org/moin/EscapingHtml
            # First escape & < >
            # Second transform non-ascii to xml-escape-sequences (creates a bytes object)
            # Third transform it back to a str object
            type_escaped = html.escape(rule["tag"]["type"]).encode('ascii', 'xmlcharrefreplace').decode()
            name_escaped = html.escape(rule["tag"]["name"]).encode('ascii', 'xmlcharrefreplace').decode()
            temp_escaped = html.escape(rule["template"]).encode('ascii', 'xmlcharrefreplace').decode()
            message += "<li><b>{}: {}:</b> {}</li>".format(type_escaped, name_escaped, temp_escaped)
        message += "</ul>"
        if "METADATA_MESSAGE" in config.config:
            message += config.config["METADATA_MESSAGE"]
        return message
    
    class JobQueueItem:
        def __init__(self, ccjobid: int, metadata: typing.Optional[dict] =None, first_check: typing.Optional[int] =None):
            self.ccjobid     :int  = ccjobid
            self.metadata    :typing.Optional[dict] = metadata
            self.first_check :int                   = int(time.time()) if first_check == None else first_check
            self.backoff     :int                   = 0
        def toDict(self) -> dict:
            return {"ccjobid":self.ccjobid, "metadata":self.metadata, "first_check":self.first_check, "backoff":self.backoff}
        @staticmethod
        def fromDict(d: dict) -> typing.Optional['JobQueueItem']:
            if type(d) != dict:
                return None
            item = JobQueueItem(0)
            if "ccjobid" not in d or type(d["ccjobid"]) != int:
                return None
            item.ccjobid     = d["ccjobid"]
            item.metadata    = d["metadata"]    if "metadata"    in d else None
            item.first_check = d["first_check"] if "first_check" in d else 0
            item.backoff     = d["backoff"]     if "backoff"     in d else 0
            return item
        def __eq__(self, other) -> bool:
            if type(other) != JobQueueItem:
                return False
            return self.ccjobid == other.ccjobid
        def __str__(self) -> str:
            return "<JobQueueItem ccjobid={} metadata={} first_check={} backoff={}>".format(self.ccjobid, self.metadata != None, self.first_check, self.backoff)
    
    """
    Holds the known jobs of the daemon.
    The queue itself is thread-safe, filled by CCCheckThread and emptied by PruleThread.
    Additionally, there are multiple maps containing information about running jobs.
    """
    class JobQueue:
        def __init__(self):
            self.lock = threading.Lock()
            self.condition = threading.Condition(lock=self.lock)
            self.stopQueue = False
            self.running_ids_set = set()
            self.stop_once_empty = False
            # attributes that are stored
            self.queue = []
            self.queue_active = []
            self.smallest_starttime = 0
            self.last_check = 0
        def loadFromJson(self, path: str) -> bool:
            data = None
            try:
                with open(path, "r") as f:
                    data = json.load(f)
            except Exception as e:
                traceback.print_exc()
                print(e)
                return False
            self.smallest_starttime = data["smallest_starttime"]
            self.last_check = data["last_check"]
            self.queue = []
            for d in data["queue"]:
                i = JobQueueItem.fromDict(d)
                if i == None:
                    raise Exception("Invalid item in JobQueue JSON {}".format(d))
                self.queue.append(i)
            print("Loaded state: ", data)
            return True
        def saveToJson(self, path: str) -> bool:
            data = {}
            data["smallest_starttime"] = self.smallest_starttime
            data["last_check"] = self.last_check
            data["queue"] = []
            for i in self.queue:
                data["queue"].append(i.toDict())
            for i in self.queue_active:
                data["queue"].append(i.toDict())
            try:
                with open(path, "w") as f:
                    json.dump(data, f)
                print("Saved state to {}.".format(path))
            except Exception as e:
                traceback.print_exc()
                print(e)
                print("Failed to save the state to {}:".format(path))
                print(data)
                return False
            return True
        def stop(self) -> None:
            with self.condition:
                self.stopQueue = True
                self.condition.notify_all()
        def stopped(self) -> bool:
            return self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True)
        def add(self, v, stop_once_empty: bool =False) -> None:
            if v == None and stop_once_empty == False: # prevent adding None value
                return
            with self.condition:
                if v != None:
                    self.queue.append(v)
                if stop_once_empty == True:
                    self.stop_once_empty = True
                self.condition.notify()
        def add_all(self, l: typing.Optional[typing.Sequence], stop_once_empty: bool =False) -> None:
            if (l == None or len(l) == 0) and stop_once_empty == False:
                return
            with self.condition:
                if l != None and len(l) > 0:
                    for v in l:
                        if v != None:
                            self.queue.append(v)
                if stop_once_empty == True:
                    self.stop_once_empty = True
                self.condition.notify()
        def clear(self) -> typing.Sequence:
            with self.condition:
                values = self.queue
                self.queue = []
                return values
        def peek(self):
            with self.condition:
                while len(self.queue) == 0 and self.stopQueue == False and self.stop_once_empty == False:
                    self.condition.wait()
                if self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True):
                    return None
                return self.queue[0]
        def empty(self) -> bool:
            with self.condition:
                return len(self.queue) == 0
        def size(self) -> int:
            with self.condition:
                return len(self.queue)
        def wait_add(self, timeout: typing.Optional[float] =None) -> typing.Optional[bool]:
            # return True  if condition is notified
            # return False if timeout expires
            # return None  if queue is stopped
            if self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True):
                return None
            with self.condition:
                if self.stopQueue == True:
                    return None
                return self.condition.wait(timeout)
        def safe_fun(self, fun: typing.Callable):
            with self.condition:
                queue_copy = copy.copy(self.queue)
                return fun(queue_copy)
        def get_min(self, min_select: typing.Callable):
            # Select the item with the smallest value, or the first item with value <= 0.
            # Does not block if the queue is empty!
            # Moves the item automatically into queue_active.
            min_value = 999999999
            index     = -1
            with self.condition:
                if len(self.queue) == 0:
                    return "empty"
                # find item with smallest value
                for ix,i in enumerate(self.queue):
                    value = min_select(i)
                    if value <= 0:
                        min_value = value
                        index = ix
                        break
                    if value < min_value:
                        min_value = value
                        index = ix
                if index != -1:
                    value = self.queue.pop(index)
                    self.queue_active.append(value)
                    return value
                return None
        def get(self):
            # Waits until the queue is not empty or the queue is shut down
            # Returns a value if the queue is not empty or None if the queue is shut down
            # Moves item automatically into queue_active
            with self.condition:
                while len(self.queue) == 0 and self.stopQueue == False and self.stop_once_empty == False:
                    self.condition.wait()
                if self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True):
                    return None
                value = self.queue[0]
                self.queue = self.queue[1:]
                self.queue_active.append(value)
                return value
        def deactivate_item(self, item) -> typing.Optional[bool]:
            with self.condition:
                if self.stopQueue == True:
                    return None
                try:
                    self.queue_active.remove(item)
                    return True
                except ValueError as e:
                    traceback.print_exc()
                    print("JobQueueItem {} to deactivate not found in active queue".format(item))
                return False
    
    
    """
    Checks for newly finished jobs and puts them into the queue.
    """
    class CCCheckThread(threading.Thread):
        def __init__(self, config: Config, queue: JobQueue, check_once: bool =False):
            self.config = config
            self.queue = queue
            threading.Thread.__init__(self)
            self.stopThread = False
            self.stopCondition = threading.Condition()
            self.requestFuture :typing.Optional[concurrent.futures.Future]            = None
            self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            self.check_once = check_once
        def request_jobs(self, starttime: typing.Optional[int], state: typing.Optional[str], page: typing.Optional[int], items_per_page: int) -> typing.Optional[dict]:
            tempfile = None
            jobs = None
            try:
                params = []
                if starttime != None:
                    params.append("start-time={}-9999999999".format(starttime))
                if state != None:
                    params.append("state={}".format(state))
                if page != None:
                    params.append("page={}".format(page))
                params.append("items-per-page={}".format(items_per_page))
                params_str = "&".join(params)
                items = ""
                url = config.config["CC_URL"]+"/api/jobs/?"+params_str
                headers = {}
                headers["Access-Control-Request-Headers"] = "x-auth-token"
                headers["X-Auth-Token"] = config.config["CC_TOKEN"]
                
                #print("CCheckThread: Request {}".format(url))
                req = urllib.request.Request(url, headers=headers, method="GET")
    
                def execRequest(req: urllib.request.Request) -> typing.Optional[dict]:
                    try:
                        with urllib.request.urlopen(req, timeout=10) as response:
                            if response.status == 200:
                                jobs = json.load(response)
                                #print(jobs)
                                return jobs
                            else:
                                if response.status == 401:
                                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                                    self.config.signal_shutdown()
                                print(response.status)
                                return None
                    except urllib.error.HTTPError as e:
                        print("Error {} for URL {}".format(e.code, e.url))
                        if e.code == 401:
                            print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                            self.config.signal_shutdown()
                        return None
                    except Exception as e:
                        traceback.print_exc()
                        print(e)
                        #traceback.print_exc()
                        return None
                with self.stopCondition:
                    self.requestFuture = self.executor.submit(execRequest, req)
            except Exception as e:
                traceback.print_exc()
                print(e)
                return None
    
            # wait for the request result (or for cancellation on shutdown)
            try:
                jobs = self.requestFuture.result()
            except concurrent.futures.CancelledError:
                jobs = None
            with self.stopCondition:
                self.requestFuture = None
    
            return jobs
        def get_jobs(self, starttime: int, state: typing.Optional[str]) -> typing.Optional[list]:
            jobs: typing.Optional[list] = []
            items_per_page = 10000
            ret = 0
            page = 0
            startTime = time.time()
            # request pages until no jobs are returned anymore
            while self.stopThread == False and (page < 1 or ret > 0):
                page += 1
                with prule.debug.Timing("ccthread.request_jobs", "PRINT_TIMING" in config.config):
                    newjobs = self.request_jobs(starttime, state, page, items_per_page)
    
                if newjobs == None:
                    jobs = None
                    break
                ret = len(newjobs["jobs"])
                jobs += newjobs["jobs"]
                if ret < items_per_page:
                    break
                #print("Page {} Jobs {}".format(page, ret))
            stopTime = time.time()
            print("Request {} jobs using {} requests in {:.3} seconds".format(len(jobs) if jobs != None else None, page, stopTime-startTime))
            return jobs
        def check(self) -> None:
            # current check
            smallest_starttime = sys.maxsize
            new_running_ids_set = set()
            cur_time = int(time.time())
    
            # Get all jobs started later than smallest_starttime.
            jobs = self.get_jobs(queue.smallest_starttime, "running")
            if jobs == None:
                return
            print("Got {} jobs".format(len(jobs)))
    
            # fill running job set and compute smallest_starttime
            for ix,j in enumerate(jobs):
                new_running_ids_set.add((j["id"], j["jobId"], j["cluster"], j["startTime"]))
                smallest_starttime = min(smallest_starttime, j["startTime"])
    
            # compare running jobs with last check's running jobs to get finished jobs
            finished_ids_set = queue.running_ids_set - new_running_ids_set
    
            # get complete list of jobs started since last check and gather missing finished jobs
            # this has two reasons:
            # - find finished jobs that started AND finished since the last check (these won't show up in the running jobs lists)
            # - get all jobs that finished since the daemon was shut down
            # Do not do this, if there was no last_check, else you would get ALL jobs from the database.
            if queue.last_check > 0:
                jobs = self.get_jobs(queue.last_check, None)
                if jobs == None:
                    return
                for ix,j in enumerate(jobs):
                    # possible states: running, completed, failed, cancelled, stopped, timeout
                    if j["jobState"] != "running":
                        finished_ids_set.add((j["id"], j["jobId"], j["cluster"], j["startTime"]))
    
            print("Running jobs {} Finished jobs {} Previous smallest starttime {} Smallest starttime {} Previous queue size {}".format(len(new_running_ids_set), len(finished_ids_set), queue.smallest_starttime, smallest_starttime, queue.size()))
    
            # add found jobs to queue
            if len(finished_ids_set) > 0:
                q_ids = []
                for j in finished_ids_set:
                    item = JobQueueItem(j[0], first_check=cur_time) # ccid, jobid, cluster, startTime
                    # set initial delay
                    item.backoff = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
                    item.metadata = {}
                    item.metadata["id"] = j[0]
                    item.metadata["jobId"] = j[1]
                    item.metadata["cluster"] = j[2]
                    item.metadata["startTime"] = j[3]
                    q_ids.append(item)
    
                queue.add_all(q_ids)
            queue.running_ids_set = new_running_ids_set
            queue.last_check = cur_time
    
            if smallest_starttime < sys.maxsize:
                queue.smallest_starttime = smallest_starttime
    
        def run_main(self) -> None:
            while self.stopThread == False:
    
                # check CC for finished jobs and add them to queue
                print("CCCheckThread")
                self.check()
    
                if self.check_once == True:
                    queue.add(None, stop_once_empty = True)
                    break
    
                # sleep
                def threadStopped():
                    return self.stopThread
                with self.stopCondition:
                    if self.stopThread == False:
                        print("CCheckThread: sleep")
                        self.stopCondition.wait_for(threadStopped, config.config["CC_CHECK_INTERVAL"])
    
            # thread done
            print("CCCheckThread Done")
            self.executor.shutdown(wait=False)
        def run(self) -> None:
            try:
                self.run_main()
            except Exception as e:
                traceback.print_exc()
                print("CCCheckThread",e)
                if self.config.shutdown == False:
                    self.config.signal_shutdown()
        def stop(self) -> None:
            with self.stopCondition:
                print("Stop CCCheckThread")
                self.stopThread = True
                if self.requestFuture != None and self.requestFuture.done() == False:
                    self.requestFuture.cancel()
                    #self.requestFuture.set_result(None)
                self.stopCondition.notify()
    
    """
    Gets a job from the queue and starts the prule program.
    """
    class PruleThread(threading.Thread):
        def __init__(self, config: Config, stop_on_empty: bool, queue: JobQueue):
            self.config = config
            self.queue = queue
            threading.Thread.__init__(self)
            self.stopThread = False
            self.stopCondition = threading.Condition()
            self.stop_on_empty = stop_on_empty
            self.currentProcess: typing.Optional[subprocess.Popen] = None
            self.processTerminated = False
            self.db_con: typing.Optional[prule.db.ResultsDB] = None
        def request_job_meta(self, id: int) -> typing.Tuple[typing.Union[None, str, dict], int]:
            url = config.config["CC_URL"]+"/api/jobs/{}".format(id)
            headers = {}
            headers["Access-Control-Request-Headers"] = "x-auth-token"
            headers["X-Auth-Token"] = config.config["CC_TOKEN"]
            headers["Content-Type"] = "application/json"
            
            req = urllib.request.Request(url, data="[]".encode("UTF-8"), headers=headers, method="POST")
            try:
                with urllib.request.urlopen(req, timeout=10) as response:
                    try:
                        if int(response.headers.get('content-length')) == 0:
                            print("request_job_meta: empty response")
                            return ("job-failure", 1)
                    except:
                        pass
                    if response.status == 200:
                        job_meta = json.load(response)
                        return (job_meta, 0)
                    print("Error {} for URL {}".format(response.status, url))
                    return (None, response.status if response.status > 0 else 999)
            except urllib.error.HTTPError as e:
                print("Error {} for URL {}".format(e.code, e.url))
                if e.code >= 500 or e.code == 429: # internal server error or too many requests
                    return ("wait", e.code)
                if e.code == 401:
                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                    self.config.signal_shutdown()
                    return (None, 401)
                return (None, e.code if e.code > 0 else 999)
            except Exception as e: # something went horribly wrong
                traceback.print_exc()
                print("request_job_meta",e)
                return (None, 999)
            return (None, 990)
        def request_tag_job(self, id: int, tags: dict) -> typing.Tuple[bool, int]:
            #[{"type":"foo","name":"bar"},{"type":"asdf","name":"fdsa"}]
            url = config.config["CC_URL"]+"/api/jobs/tag_job/{}".format(id)
            headers = {}
            headers["Access-Control-Request-Headers"] = "x-auth-token"
            headers["X-Auth-Token"] = config.config["CC_TOKEN"]
            headers["Content-Type"] = "application/json"
            data = json.dumps(tags)
            req = urllib.request.Request(url, data=data.encode('UTF-8'), headers=headers, method="POST")
            try:
                with urllib.request.urlopen(req, timeout=10) as response:
                    if response.status == 200:
                        return (True, 0)
                    print("Error {} for URL {}".format(response.status, url))
            except urllib.error.HTTPError as e: # raised for all non-2XX http codes
                msg = ""
                try:
                    msg = e.fp.read().decode('utf-8', 'ignore')
                except:
                    pass
                print("Error {} for URL {} Reason {} Msg {}".format(e.code, e.url, e.reason, msg))
                if e.code == 401:
                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                    self.config.signal_shutdown()
                    return (False, 401)
                if e.code == 500 and "Duplicate entry" in msg: # TODO: Tag is inserted twice. Fix once tag removal is possible.
                    return (True, 0)
                if e.code == 500 and "UNIQUE constraint failed: jobtag.job_id, jobtag.tag_id" in msg: # TODO: Tag is inserted twice. Fix once tag removal is possible.
                    return (True, 0)
                return (False, e.code if e.code > 0 else 999)
            except Exception as e: # something went horribly wrong
                traceback.print_exc()
                print(e)
                return (False, 999)
            return (False, 990)
        def request_jobarchive(self, id:int ) -> typing.Tuple[typing.Union[bool, tempfile.TemporaryDirectory, str], int]:
            url = config.config["CC_URL"]+"/api/jobs/{}?all-metrics=true".format(id)
            headers = {}
            headers["Access-Control-Request-Headers"] = "x-auth-token"
            headers["X-Auth-Token"] = config.config["CC_TOKEN"]
            req = urllib.request.Request(url, headers=headers, method="GET")
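            # note: the 'delete' parameter of TemporaryDirectory requires Python 3.12 or newer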
            tdir = tempfile.TemporaryDirectory(prefix="prule_jobarchive_{}_".format(id), delete="NO_TMPDIR_CLEAN" not in self.config.config)
            try:
                with urllib.request.urlopen(req, timeout=10) as response:
                    if response.status == 200:
                        data = json.load(response)
                        #print(data)
                        data_path = os.path.join(tdir.name, "data.json")
                        meta_path = os.path.join(tdir.name, "meta.json")
                        with open(data_path, "w") as f:
                            json.dump(data["Data"], f)
                        with open(meta_path, "w") as f:
                            json.dump(data["Meta"], f)
                        return (tdir, 0)
                    print("Error {} for URL {}".format(response.status, url))
            except urllib.error.HTTPError as e: # raised for all non-2XX http codes
                print("Error {} for URL {}".format(e.code, e.url))
                if e.code == 401:
                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                    self.config.signal_shutdown()
                    return ("wait", 401)
                return (False, e.code if e.code > 0 else 999)
            except Exception as e: # something went horribly wrong
                traceback.print_exc()
                print(e)
                try:
                    tdir.cleanup()
                except:
                    print("Cleaning up {} failed".format(tdir.name))
                return (False, 999)
            return (False, 990)
        def request_metadata_upload(self, id: int, metadata: dict) -> typing.Tuple[typing.Union[bool, dict], int]:
            url = config.config["CC_URL"]+"/api/jobs/edit_meta/{}".format(id)
            headers = {}
            headers["Access-Control-Request-Headers"] = "x-auth-token"
            headers["X-Auth-Token"] = config.config["CC_TOKEN"]
            headers["Content-Type"] = "application/json"
            data = json.dumps(metadata)
            req = urllib.request.Request(url, data=data.encode('UTF-8'), headers=headers, method="POST")
            try:
                with urllib.request.urlopen(req, timeout=10) as response:
                    if response.status == 200:
                        return (json.load(response), 0)
                    print("Error {} for URL {}".format(response.status, url))
            except urllib.error.HTTPError as e: # raised for all non-2XX http codes
                print("Error {} for URL {}".format(e.code, e.url))
                if e.code == 401:
                    print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
                    self.config.signal_shutdown()
                    return (False, 401)
                return (False, e.code if e.code > 0 else 999)
            except Exception as e: # something went horribly wrong
                traceback.print_exc()
                print(e)
                return (False, 999)
            return (False, 990)
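        # prule runs as a single long-lived subprocess (started with --job-stdin):
        # prule_job() writes one job description as a JSON line to its stdin and reads one JSON result line from its stdout.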
        def prule_start(self) -> None:
            params = ["python3","-u","-m","prule"]
            params += ["--parameters-file", config.config["PRULE_PARAMETERS_FILE_PATH"]]
            params += ["--clusters-file", config.config["PRULE_CLUSTERS_FILE_PATH"]]
            params += ["--rules-file", config.config["PRULE_RULES_FILE_PATH"]]
            params += ["--log-stderr"]
            params += ["--job-stdin"]
            params += ["--overwrite"]
            if 'JOB_PROCESS_QUIET' not in config.config or config.config['JOB_PROCESS_QUIET'] == True:
                params += ["--quiet"]
            def preexec():
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGPIPE])
            with self.stopCondition:
                if self.currentProcess == None:
                    self.currentProcess = subprocess.Popen(params, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=sys.stderr, preexec_fn = preexec)
                    print("Prule process {} started".format(self.currentProcess.pid))
        def prule_job(self, job: dict) -> typing.Tuple[typing.Union[None, dict, bool], float, int]:
            if self.currentProcess == None:
                return (None, 0.0, 10)
            process_time_start = datetime.datetime.now().timestamp()
            tries = 0
            result = None
            while tries <2:
                try:
                    data = json.dumps(job) + "\n"
                    if self.currentProcess.stdin != None:
                        self.currentProcess.stdin.write(data.encode("utf-8"))
                        self.currentProcess.stdin.flush()
                    if self.currentProcess.stdout != None:
                        line = self.currentProcess.stdout.readline()
                        result = json.loads(line)
                    break
                except Exception as e:
                    traceback.print_exc()
                    print(e)
                    if self.stopThread == True:
                        return (None, 0.0, 20)
                    if tries == 0:
                        self.prule_restart()
                        tries += 1
                        continue
                    self.prule_restart()
                    return (None, 0.0, 30)
            process_time_stop = datetime.datetime.now().timestamp()
            process_time = process_time_stop - process_time_start
            if type(result) != dict or len(result) == 0:
                return (False, process_time, 40)
            return (result, process_time, 0)
        def prule_restart(self) -> None:
            self.prule_stop()
            self.prule_start()
        def prule_stop(self) -> typing.Optional[int]:
            proc = None
            with self.stopCondition:
                if self.currentProcess == None:
                    return None
                proc = self.currentProcess
            try:
                if self.currentProcess.stdin != None:
                    self.currentProcess.stdin.close()
            except:
                pass
            try:
                self.currentProcess.terminate()
            except:
                pass
            returncode = self.currentProcess.wait()
            print("Prule process {} stopped, returncode {}".format(self.currentProcess.pid, returncode))
            with self.stopCondition:
                self.currentProcess = None
            return returncode
    
        # job: "id" - CC database id,  not "jobId", which is the SLURM job id
        def processJob(self, job: JobQueueItem) -> typing.Tuple[str, int]:
    
            # track process error
            process_result = "success"
            error_code     = 0
    
            # complete metadata (later used for entry in sqlite database)
            job_meta      = None
            # slurmid, cluster, startTime necessary for loading of jobarchive from filesystem
            job_cluster   = None
            job_slurmid   = None
            job_startTime = None
            if job.metadata != None:
                if "jobId" in job.metadata and "cluster" in job.metadata and "startTime" in job.metadata:
                    job_cluster   = job.metadata["cluster"]
                    job_slurmid   = job.metadata["jobId"]
                    job_startTime = str(job.metadata["startTime"]) if type(job.metadata["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job.metadata['startTime']).timestamp()))
                    if "duration" in job.metadata: # assume the rest is also present in job.metadata
                        job_meta = job.metadata
    
            # check if already processed
            already_processed = False
            if config.config["CACHE_DB"] == True:
                try:
                    with prule.debug.Timing("prulethread.db_get_result", "PRINT_TIMING" in config.config):
                        if self.db_con != None:
                            old_result = self.db_con.db_get_result(job.ccjobid)
                        else:
                            error_code = 101000000
                            return ("failure-shutdown", error_code)
                except:
                    error_code = 101000000
                    return ("failure-shutdown", error_code)
                if len(old_result) > 0:
                    already_processed = already_processed or old_result[-1]["evaluated"] == True
            if already_processed == True:
                print("skip, already_processed",job.ccjobid)
                return ("success", error_code)
    
            # retrieve current job metadata if needed
            if job_cluster == None or job_slurmid == None or job_startTime == None:
                with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
                    job_res, error_code = self.request_job_meta(job.ccjobid)
                if error_code > 0:
                    error_code += 102000000
                if job_res == None:
                    return ("failure-shutdown", error_code)
                elif type(job_res) == str:
                    if job_res == "job-failure":
                        return ("failure-drop", error_code)
                    if job_res == "wait":
                        return ("failure-wait", error_code)
                elif type(job_res) == dict:
                    if type(job_res['Meta']) == dict:
                        job_meta = job_res['Meta']
                if job_meta == None:
                    return ("failure-drop", 102000000)
    
                job_cluster   = job_meta["cluster"]
                job_slurmid   = job_meta["jobId"]
                job_startTime = str(job_meta["startTime"]) if type(job_meta["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job_meta['startTime']).timestamp()))
    
            # prepare job path for filesystem access or download jobarchive from API
            job_path        = ""
            job_tempdir     = None
    
            if config.config["API_JOBARCHIVE"] == False:
                # Load job from filesystem
                job_path = os.path.join(config.config["JOBARCHIVE_PATH"], job_cluster, str(job_slurmid)[:-3], str(job_slurmid)[-3:], job_startTime)
            else:
                # Load job from jobarchive api and write it to tempdir
                with prule.debug.Timing("prulethread.request_jobarchive", "PRINT_TIMING" in config.config):
                    job_tempdir_res, error_code = self.request_jobarchive(job.ccjobid)
                if error_code > 0:
                    error_code += 103000000
                if type(job_tempdir_res) == bool:
                    return ("failure-shutdown", error_code)
                elif type(job_tempdir_res) == str: # "wait"
                    return ("failure-wait", error_code)
                elif type(job_tempdir_res) == tempfile.TemporaryDirectory:
                    job_path = job_tempdir_res.name
                    job_tempdir = job_tempdir_res
    
            print("Job path:",job_path)
    
            # check if input files actually exist
            if os.path.exists(os.path.join(job_path, "meta.json")) == False or (os.path.exists(os.path.join(job_path, "data.json")) == False and os.path.exists(os.path.join(job_path, "data.json.gz")) == False):
                if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
                    try:
                        job_tempdir.cleanup()
                    except:
                        print("Cleaning up {} failed".format(job_tempdir.name))
                print("Process: job {} Missing files in {}".format(job.ccjobid, job_path))
                return ("failure-requeue", 104000000)
    
    
            result_json, process_time, error_code = self.prule_job({"job-dir":job_path})
    
            if error_code > 0:
                error_code += 105000000
    
            if result_json == None or result_json == False:
                if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
                    try:
                        job_tempdir.cleanup()
                    except:
                        print("Cleaning up {} failed".format(job_tempdir.name))
                if result_json == None:
                    return ("failure-shutdown", error_code)
                if result_json == False:
                    return ("failure-drop", error_code)
            if type(result_json) != dict:
                return ("failure-drop", 105000000)
    
            print("Process: job {} jobId {} time {:.6f}".format(job.ccjobid, job_slurmid, process_time))
            if self.processTerminated == True:
                print("Job {} process was terminated.".format(job.ccjobid))
                if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
                    try:
                        job_tempdir.cleanup()
                    except:
                        print("Cleaning up {} failed".format(job_tempdir.name))
                return ("failure-requeue", 0)
    
            # print process result
            if 'JOB_PROCESS_QUIET' in config.config and config.config['JOB_PROCESS_QUIET'] == False:
                print("Job {} process result {}".format(job.ccjobid, result_json))
            else:
                try:
                    print("Job {} process result, tags: {} err: {} {} fail: {} eval: {} noeval: {} evaltime: {:.6f}".format(
                        job.ccjobid,
                        len(result_json["tags"]),
                        result_json["error"],
                        len(result_json["errors"]),
                        len(result_json["rules_failed"]),
                        len(result_json["rules_evaluated"]),
                        len(result_json["rules_not_evaluated"]),
                        result_json["evaluation_time"]
                    ))
                except Exception as e:
                    traceback.print_exc()
                    print(e)
                    print("Job {} process result {}".format(job.ccjobid, result_json))
    
            # store result to JSON file
            if config.config["STORE_OUTPUT"] == True and result_json != None:
                # store result in output json
                job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(job.ccjobid))
                try:
                    with open(job_result_path, "w") as outf:
                        json.dump(result_json, outf)
                except Exception as e:
                    traceback.print_exc()
                    print(e)
                    print("Failed to write result to {}".format(job_result_path))
                    if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
                        try:
                            job_tempdir.cleanup()
                        except:
                            print("Cleaning up {} failed".format(job_tempdir.name))
                    return ("failure-requeue", 106000000)
    
            # upload result to metadata api
            if config.config["API_METADATA"] == True and result_json != None:
                patho_message = prepare_patho_message(config, result_json)
                if patho_message != None:
                    with prule.debug.Timing("prulethread.request_metadata_upload", "PRINT_TIMING" in config.config):
                        res, error_code = self.request_metadata_upload(job.ccjobid, {"key":"issues","value":patho_message})
                    if error_code > 0:
                        error_code += 107000000
                    if type(res) == bool:
                        print("Job {} process failed to write metadata using API_METADATA".format(job.ccjobid))
                        process_result = "failure-shutdown"
                    else:
                        job_meta = res
    
            # upload tag
            if config.config["API_TAG"] == True and result_json != None:
                if len(result_json["tags"]) > 0:
                    # set default scope
                    for t in result_json["tags"]:
                        if "API_TAG_SCOPE" in self.config.config and self.config.config["API_TAG_SCOPE"] == True:
                            if "scope" not in t:
                                t["scope"] = "global"
                        else:
                            if "scope" in t:
                                t.pop("scope")
                    with prule.debug.Timing("prulethread.request_tag_job", "PRINT_TIMING" in config.config):
                        res, error_code = self.request_tag_job(job.ccjobid, result_json["tags"])
                    if error_code > 0:
                        error_code += 108000000
                    if res == False:
                        print("Job {} process failed to write tags using API_TAG".format(job.ccjobid))
                        process_result = "failure-shutdown"
    
    
            # save result to cache db
            if config.config["CACHE_DB"] == True:
                # load meta data from file if necessary
                if job_meta == None:
                    try:
                        with open(os.path.join(job_path, "meta.json"), "r") as meta_infile:
                            job_meta = json.load(meta_infile)
                    except Exception as e:
                        traceback.print_exc()
                        print(e)
                        print("Failed to load meta data from file {}".format(os.path.join(job_path, "meta.json")))
                        error_code = 109000000
    
                # load meta data from api if necessary
                if job_meta == None:
                    with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
                        job_res, error_code = self.request_job_meta(job.ccjobid)
                    if error_code > 0:
                        error_code += 109000000
                    if type(job_res) != dict:
                        if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
                            try:
                                job_tempdir.cleanup()
                            except:
                                print("Cleaning up {} failed".format(job_tempdir.name))
                    if job_res == None:
                        return ("failure-shutdown", error_code)
                    elif type(job_res) == str:
                        if job_res == "job-failure":
                            return ("failure-drop", error_code)
                        if job_res == "wait":
                            return ("failure-wait", error_code)
                    elif type(job_res) == dict:
                        if type(job_res['Meta']) == dict:
                            job_meta = job_res["Meta"]
    
                # overwrite metadata in job from prule results
                if "metadata" in result_json and job_meta != None:
                    for key,value in result_json["metadata"].items():
                        job_meta[key] = result_json["metadata"][key]
    
                # save result to cache db
                try:
                    evaluated = "error" in result_json and result_json["error"] == False
                    with prule.debug.Timing("prulethread.db_insert_result", "PRINT_TIMING" in config.config):
                        if self.db_con != None:
                            self.db_con.db_insert_result(job.ccjobid, result_json, job_meta, process_time, 0 if evaluated else 1, int(datetime.datetime.now().timestamp()), error_code)
                        else:
                            return ("failure-shutdown", 11000000)
                except Exception as e:
                    traceback.print_exc()
                    print(e)
                    print("ERROR: db_insert_result failed for job ccid {}".format(job.ccjobid))
                    process_result = "failure-shutdown"
                    error_code = 110000000
    
            # cleanup temp directory
            if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
                try:
                    job_tempdir.cleanup()
                except:
                    print("Cleaning up {} failed".format(job_tempdir.name))
            return (process_result, error_code)
        def run_main(self) -> None:
            if self.config.config["CACHE_DB"] == True:
                self.db_con = prule.db.ResultsDB(self.config.config["DB_PATH"])
    
            # start prule subprocess
            self.prule_start()
    
            process_delay_min = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
            process_delay_max = config.config["JOB_PROCESS_DELAY_MAX"] if "JOB_PROCESS_DELAY_MAX" in config.config else 960
    
            while self.stopThread == False:
    
                if self.stop_on_empty == True and queue.empty() == True:
                    self.config.signal_shutdown()
                    break
    
                cur_time = int(time.time())
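                # remaining delay of a queued item; values <= 0 mean the item is due for processing now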
                def smallest_backoff(item):
                    return item.backoff - (cur_time - item.first_check)
    
                # get new job from queue and block if empty
                job = queue.get_min(smallest_backoff)
    
                if job == "empty":
                    queue.wait_add() # wait for new jobs
                    continue
    
                if job == None:
                    if queue.stopped(): # queue was shut down
                        break
                    continue
    
                # sleep to match job process delay
                time_offset = cur_time - job.first_check
                if time_offset < job.backoff:
                    # sleep now or wait for jobs
                    print("PruleThread: delay {}".format(job.backoff - time_offset))
                    res = queue.wait_add(job.backoff - time_offset) # !! blocks without checking self.stopCondition
                    if res == None: # queue stopped
                        queue.add(job)
                        queue.deactivate_item(job)
                        break
                    if res == True:
                        queue.add(job) # other job was added to queue
                        queue.deactivate_item(job)
                        continue
                    # else: the timeout expired and the job is ready for a try
    
                if self.stopThread == True:
                    # put back job and return
                    queue.add(job)
                    queue.deactivate_item(job)
                    break
    
                # process job
                print("Process ", job.ccjobid)
                process_time_start = datetime.datetime.now().timestamp()
                result, process_error_code = self.processJob(job)
                process_time = datetime.datetime.now().timestamp()-process_time_start
    
                # Possible results:
                # "success"          - All fine
                # "failure-requeue"  - Temporary failure
                #                      Requeue job and move on
                # "failure-wait"     - Temporary failure
                #                      Requeue job, wait and move on
                # "failure-drop"     - Failure, expected to repeat for this job (e.g. rule processing failed)
                #                      Drop job, note process failure in CACHE_DB and move on
                # "failure-shutdown" - Failure, that is expected to need developer/administrator intervention
                #                      Requeue job and shutdown daemon
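                # Failed jobs are requeued with exponential backoff: the delay starts at 15s (or at the configured initial delay)
                # and doubles on every failed attempt; once it exceeds JOB_PROCESS_DELAY_MAX the job is dropped.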
    
                print("Process job {} {} {}, time {:.6f}".format(job.ccjobid, result, process_error_code, process_time))
    
                if result == "failure-requeue" or result == "failure-wait" or result == "failure-shutdown":
                    if job.backoff < process_delay_max: # wait a maximum of 16 minutes
                        job.backoff = 15 if job.backoff == 0 else job.backoff * 2
                        queue.add(job)
                        queue.deactivate_item(job)
                    else:
                        print("Job {} failed too long after first check. Drop job.".format(job.ccjobid))
                        result = "failure-drop"
                if result == "failure-drop":
                    # save failure to cache db
                    if config.config["CACHE_DB"] == True:
                        try:
                            with prule.debug.Timing("prulethread.db_insert_failure", "PRINT_TIMING" in config.config):
                                if self.db_con != None:
                                    self.db_con.db_insert_failure(job.ccjobid, process_error_code, int(datetime.datetime.now().timestamp()))
                                else:
                                    raise Exception("Failed to open sqlite database")
                        except Exception as e:
                            traceback.print_exc()
                            print(e)
                            print("ERROR: db_insert_failure failed for job ccid {}, requeue".format(job.ccjobid))
                            queue.add(job)
                            queue.deactivate_item(job)
                            self.stopThread = True
                            self.config.signal_shutdown()
                    queue.deactivate_item(job)
                if result == "success":
                    queue.deactivate_item(job)
                    pass
    
                if result == "failure-shutdown":
                    self.stopThread = True
                    self.config.signal_shutdown()
    
                if result == "failure-wait":
                    # sleep in case CC service is not responsive
                    def threadStopped():
                        return self.stopThread
                    with self.stopCondition:
                        if self.stopThread == False:
                            print("CCCheckThread: sleep")
                            self.stopCondition.wait_for(threadStopped, 30)
    
            self.prule_stop()
    
            if self.config.config["CACHE_DB"] == True and self.db_con != None:
                self.db_con.close()
                self.db_con = None
        def run(self) -> None:
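            # Thread entry point: run the processing loop and trigger a daemon shutdown on unexpected errors.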
            try:
                self.run_main()
            except Exception as e:
                traceback.print_exc()
                print("PruleThread:",e)
                if self.config.shutdown == False:
                    self.config.signal_shutdown()
        def stop(self) -> None:
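            # Request shutdown: terminate a currently running prule process, set the stop flag and wake the thread.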
            with self.stopCondition:
                if self.currentProcess != None:
                    self.currentProcess.terminate()
                    self.processTerminated = True
                self.stopThread = True
                self.stopCondition.notify()
    
    
    
    
    if __name__ == "__main__":
    
        # block signals, so startup will not be interrupted
        signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGPIPE])
    
        parser = argparse.ArgumentParser(
            prog='prule daemon',
            description='Daemon service that waits for finished ClusterCockpit jobs and applies pathological job rules to measured metrics',
            argument_default=argparse.SUPPRESS
        )
    
        parser.add_argument('--args', help=argparse.SUPPRESS, action='store_true')
    
        parser.add_argument('--long-help', help="Print detailed help text", action='store_true')
        parser.add_argument('--config-path', type=str, help='Path to configuration file.')
        parser.add_argument('--no-check', help=argparse.SUPPRESS, action='store_true') # do not start job check thread
        parser.add_argument('--check-once', help=argparse.SUPPRESS, action='store_true') # run job check thread once
        parser.add_argument('--job-ids-file', type=str, help='Read job ids from file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
        parser.add_argument('--job-ids-json-file', type=str, help='Read job ids from JSON file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
        parser.add_argument('--db-path', type=str, help='Path to database file. Overwrites the DB_PATH value from the config file.')
        parser.add_argument('--cache-db', action='store_true', help='Store results in sqlite3 database. Overwrites the CACHE_DB value from the config file.')
        parser.add_argument('--job-process-quiet', action='store_true', help='Pass --quiet to prule and do not print full result json')
        parser.add_argument('--job-process-no-quiet', dest='job_process_quiet', action='store_false', help='Print full result json')
        parser.add_argument('--no-tmpdir-clean', action='store_true', help='Keep temporary directories')
        parser.add_argument('--print-timing', action='store_true', help='Print debug timings')
    
        args = vars(parser.parse_args())
    
        if "args" in args:
            print(args)
            sys.exit(1)
    
        if "long_help" in args:
            print(longhelp)
            sys.exit(0)
    
        # load config
        if "config_path" not in args:
            print("Missing required option --config-path", file=sys.stderr)
            sys.exit(1)
        config = Config(threading.get_ident(), args["config_path"])
        config.load()
    
        # overwrite JOB_PROCESS_QUIET config variable
        if 'job_process_quiet' in args:
            config.config['JOB_PROCESS_QUIET'] = args['job_process_quiet']
    
        # overwrite DB_PATH config variable
        if 'db_path' in args:
            config.config['DB_PATH'] = args['db_path']
    
        # overwrite CACHE_DB config variable
        if 'cache_db' in args:
            config.config['CACHE_DB'] = True
    
        if 'print_timing' in args:
            config.config['PRINT_TIMING'] = True
    
        if 'no_tmpdir_clean' in args:
            config.config['NO_TMPDIR_CLEAN'] = True
    
        # load rules
        rules = []
        with open(config.config['PRULE_RULES_FILE_PATH'], 'r') as jsonf:
            rules_list = json.load(jsonf)
            for r in rules_list:
                rules.append(r["name"])
    
        # Open cache database and initialize it
        if config.config['CACHE_DB'] == True:
            db_con = prule.db.ResultsDB(config.config['DB_PATH'])
            if db_con.db_check() == False:
                db_con.db_initialize()
            db_con.db_update_rules(rules)
            db_con.close() # reopen sqlite connection again in the prule thread
    
        # read job ids from file
        job_ids_preset = []
        if 'job_ids_file' in args:
            with open(args['job_ids_file'], 'r') as f:
                process_delay = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
                for line in f.readlines():
                    line = line.strip()
                    if len(line) > 0:
                        ccjid = int(line)
                        item = JobQueueItem(ccjid)
                        item.first_check = item.first_check - process_delay
                        job_ids_preset.append(item)
            if len(job_ids_preset) == 0:
                print("No job ids found in job-id-file ", args['job_ids_file'], file=sys.stderr)
                sys.exit(1)
    
        # read job ids from JSON file
        if 'job_ids_json_file' in args:
            with open(args['job_ids_json_file'], 'r') as f:
                process_delay = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
                data = json.load(f)
                for j in data:
                    item = JobQueueItem(j["id"])
                    item.metadata = j
                    item.first_check = item.first_check - process_delay
                    job_ids_preset.append(item)
            if len(job_ids_preset) == 0:
                print("No job ids found in job-id-json-file ", args['job_ids_json_file'], file=sys.stderr)
                sys.exit(1)
    
    
        # create queue and load state
        queue = JobQueue()
        if len(job_ids_preset) > 0:
            queue.add_all(job_ids_preset)
        else:
            if os.path.exists(config.config["STATE_PATH"]):
                ok = queue.loadFromJson(config.config["STATE_PATH"])
                if ok == False:
                    print("Failed to load state from path {} although file exists.".format(config.config["STATE_PATH"]))
                    print("Make sure the file is readable (and writable) and the folder to the file exists.")
                    sys.exit(1)
            else:
                print("State file not found: {}".format(config.config["STATE_PATH"]))
                ok = queue.saveToJson(config.config["STATE_PATH"])
                if ok == False:
                    print("Failed to save state to file {}.".format(config.config["STATE_PATH"]))
                    print("Make sure the file is writable and the folder to the file exists.")
                    sys.exit(1)
    
        # run check only once?
        cc_thread_check_once = ("CC_CHECK_ONCE" in config.config and config.config["CC_CHECK_ONCE"] == True) or "check_once" in args
    
        # create threads
        cc_thread = CCCheckThread(config, queue, cc_thread_check_once)
        prule_thread = PruleThread(config, len(job_ids_preset) > 0, queue)
    
        # start threads
        prule_thread.start()
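        # the check thread is only started when no preset job list is given and --no-check is not set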
        if 'no_check' not in args and len(job_ids_preset) == 0:
            cc_thread.start()
    
        # wait for shutdown signal
        while True:
            if hasattr(signal, "sigwaitinfo"):
                # send CC jobid to daemon process using the following command
                # /bin/kill -s USR1 -q CC_JOBID -- DAEMON_PID
                # make sure not to use the bash internal kill command
                info = signal.sigwaitinfo([signal.SIGINT, signal.SIGTERM, signal.SIGUSR1])
                if info.si_signo != signal.SIGUSR1:
                    break
                queue.add((info.si_status, int(time.time())))
            else:
                signal.sigwait([signal.SIGINT, signal.SIGTERM])
                break
        config.shutdown = True
        print("Got stop signal")
        signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM])
    
        # stop queue and threads
        queue.stop()
        cc_thread.stop()
        prule_thread.stop()
    
        # wait for threads
        try:
            if cc_thread.is_alive():
                cc_thread.join(1)
        except Exception:
            pass
        try:
            if prule_thread.is_alive():
                prule_thread.join(1)
        except Exception:
            pass
    
        # save state
        if len(job_ids_preset) == 0:
            ok = queue.saveToJson(config.config["STATE_PATH"])
            if ok == False:
                sys.exit(1)
    
        sys.exit(0)