#!/usr/bin/env -S python3 -u
longhelp="""
Daemon that polls ClusterCockpit for newly finished jobs and executes the prule program to process the measurement data.
There are two threads that communicate using a queue.
The CCCheckThread asks regularly for jobs and recognizes newly finished jobs.
These jobs are added to the queue.
The PruleThread sequentially gets jobs from the queue and executes the prule program on them.
There are two important files: the configuration file and the state file.
The configuration file is a JSON file that contains a few configuration values.
The state file is a JSON file that contains the timestamps used to recognize finished jobs and the queue of unprocessed jobs.
On start, the state is restored from the state file and the daemon continues processing.
Example configuration file:
{
"CC_URL":"https://cc.example.com/",
"CC_TOKEN":"VGhpcyBzaG91bGQgbG9vayBsaWtlIHNvbWUgYmFzZTY0IHN0cmluZy4gQnV0IHRoYW5rcyBmb3IgY2hlY2tpbmcuIEhhdmUgYSBuaWNlIGRheSEgOikK",
"CC_CHECK_INTERVAL":30,
"JOB_PROCESS_DELAY_MIN":60,
"STATE_PATH":"/path/to/state_file.json",
"PRULE_PARAMETERS_FILE_PATH":"/path/to/file.json",
"PRULE_CLUSTERS_FILE_PATH":"/path/to/file.json",
"PRULE_RULES_FILE_PATH":"/path/to/file.json",
"JOBARCHIVE_PATH":"/path/to/job-archive/",
"OUTPUT_PATH":"/path/to/12345.json",
"DB_PATH":"/path/to/db.sqlite3",
"API_METADATA":false,
"STORE_OUTPUT":false,
"API_TAG":false,
"API_JOBARCHIVE":false,
"CACHE_DB":false
}
Configuration file description:
CC_URL URL to ClusterCockpit endpoint
CC_TOKEN ClusterCockpit authentication token for REST API
CC_CHECK_INTERVAL Integer, Seconds to wait between checking for finished jobs
CC_CHECK_ONCE Boolean, If true, job check is run only once and the daemon exits after all queued jobs are processed
JOB_PROCESS_DELAY_MIN Integer, Seconds to wait after a job finishes before processing data. Default: 0
CC_CHECK_INTERVAL may be added to the total delay.
JOB_PROCESS_DELAY_MAX Integer, Maximum seconds to wait before the last retry; after that the job is dropped. Default: 960
Exponential backoff is used for jobs that fail repeatedly.
STATE_PATH Path to state JSON file
PRULE_PARAMETERS_FILE_PATH Path to parameters JSON file
PRULE_CLUSTERS_FILE_PATH Path to clusters JSON file
PRULE_RULES_FILE_PATH Path to rule JSON file
JOBARCHIVE_PATH Path to cc-backend job-archive directory, used in case API_JOBARCHIVE is false
OUTPUT_PATH Path to directory for storing result files, used in case STORE_OUTPUT is true
DB_PATH Path to sqlite3 file for storing results, used in case CACHE_DB is true
API_METADATA Boolean, If true, the ClusterCockpit REST API is used to store the results as JSON in the job metadata
STORE_OUTPUT Boolean, If true, the results are stored as result files (see OUTPUT_PATH)
API_TAG Boolean, If true, the ClusterCockpit REST API is used to store tags for the jobs
API_JOBARCHIVE Boolean, If true, the ClusterCockpit REST API is used to load job archive data,
if false, the job archive data is read from the filesystem (see JOBARCHIVE_PATH)
CACHE_DB Boolean, If true, the results are stored in a sqlite3 database (see DB_PATH)
API_TAG_SCOPE Boolean, If true, uses scopes and adds default 'global' scope (if missing),
if false, the scopes are removed from the tags (default: false)
METADATA_MESSAGE HTML snippet appended to the generated metadata message
JOB_PROCESS_QUIET Boolean, If true, pass --quiet to prule and do not print full result json,
if false print result json (default: true)
PRINT_TIMING Boolean, If true, print debug timing information
Example state file:
{
"smallest_starttime":123,
"last_check":123,
"queue":[]
}
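Example invocation (the exact module path depends on the installation; shown here only as an illustration):
python3 -m prule.daemon --config-path /path/to/config.json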
"""
import typing
import os.path
import sys
import argparse
import json
import traceback
import urllib.request
import urllib.error
import time
import tempfile
import sqlite3
import datetime
import html
import copy
import signal
import threading
import subprocess
import concurrent.futures
import prule.db
import prule.debug
#TODO: Add idle behavior: In case no jobs need to be processed, old jobs can be processed. Add backwards-updated search times.
#TODO: Add metadata fetch before job processing. If patho results already exist, skip the job.
config_keys = ["CC_URL", "CC_TOKEN", "CC_CHECK_INTERVAL", "STATE_PATH", "PRULE_PARAMETERS_FILE_PATH", "PRULE_CLUSTERS_FILE_PATH", "PRULE_RULES_FILE_PATH", "JOBARCHIVE_PATH", "OUTPUT_PATH", "API_METADATA", "API_TAG", "API_JOBARCHIVE", "CACHE_DB", "DB_PATH", "STORE_OUTPUT"]
config_types = [str, str, int, str, str, str, str, str, str, bool, bool, bool, bool, str, bool]
"""
Loads and holds the JSON configuration and signals daemon shutdown via signal_shutdown().
"""
class Config:
def __init__(self, main_tid: int, path: str):
self.path :str = path
self.config :dict = {}
self.main_tid :int = main_tid
self.shutdown :bool = False
def load(self) -> None:
data = None
with open(self.path, "r") as f:
data = json.load(f)
for i,c in enumerate(config_keys):
if c not in data:
raise Exception("Key {} not found in configuration file loaded from {}.".format(c, self.path))
if type(data[c]) != config_types[i]:
raise Exception("Key {} in configuration file has wrong type {}. It should be of type {}.".format(c, type(data[c]), config_types[i]))
self.config = data
def signal_shutdown(self) -> None:
if self.shutdown == False:
self.shutdown = True
signal.pthread_kill(self.main_tid, signal.SIGTERM) # shutdown
"""
Create the message that is inserted into the ClusterCockpit UI.
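Illustrative output for one matched rule (tag type, tag name and template text are made-up examples):
<ul><li><b>pathological: Low CPU load:</b> The job uses only a small part of the allocated cores.</li></ul>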
"""
def prepare_patho_message(config: Config, result_json: dict) -> typing.Optional[str]:
if len(result_json["tags"]) == 0:
return None
message = "<ul>"
for rule in result_json["rules"]:
if rule["match"] == False:
continue
# Escape HTML: https://wiki.python.org/moin/EscapingHtml
# First escape & < >
# Second transform non-ascii to xml-escape-sequences (creates a bytes object)
# Third transform it back to a str object
type_escaped = html.escape(rule["tag"]["type"]).encode('ascii', 'xmlcharrefreplace').decode()
name_escaped = html.escape(rule["tag"]["name"]).encode('ascii', 'xmlcharrefreplace').decode()
temp_escaped = html.escape(rule["template"]).encode('ascii', 'xmlcharrefreplace').decode()
message += "<li><b>{}: {}:</b> {}</li>".format(type_escaped, name_escaped, temp_escaped)
message += "</ul>"
if "METADATA_MESSAGE" in config.config:
message += config.config["METADATA_MESSAGE"]
return message
class JobQueueItem:
def __init__(self, ccjobid: int, metadata: typing.Optional[dict] =None, first_check: typing.Optional[int] =None):
self.ccjobid :int = ccjobid
self.metadata :typing.Optional[dict] = metadata
self.first_check :int = int(time.time()) if first_check == None else first_check
self.backoff :int = 0
def toDict(self) -> dict:
return {"ccjobid":self.ccjobid, "metadata":self.metadata, "first_check":self.first_check, "backoff":self.backoff}
@staticmethod
def fromDict(d: dict) -> typing.Optional['JobQueueItem']:
if type(d) != dict:
return None
item = JobQueueItem(0)
if "ccjobid" not in d or type(d["ccjobid"]) != int:
return None
item.ccjobid = d["ccjobid"]
item.metadata = d["metadata"] if "metadata" in d else None
item.first_check = d["first_check"] if "first_check" in d else 0
item.backoff = d["backoff"] if "backoff" in d else 0
return item
def __eq__(self, other) -> bool:
if type(other) != JobQueueItem:
return False
return self.ccjobid == other.ccjobid
def __str__(self) -> str:
return "<JobQueueItem ccjobid={} metadata={} first_check={} backoff={}>".format(self.ccjobid, self.metadata != None, self.first_check, self.backoff)
"""
Holds the known jobs of the daemon.
The queue itself is thread-safe, filled by CCCheckThread and emptied by PruleThread.
Additionally, it tracks the set of currently running jobs and the list of items that are actively being processed.
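Minimal usage sketch (the job id is a made-up example):
    queue = JobQueue()
    queue.add(JobQueueItem(1234))   # producer side (CCCheckThread)
    item = queue.get()              # consumer side (PruleThread), blocks until an item is available
    # ... process the item ...
    queue.deactivate_item(item)     # remove it from the active list once processing is done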
"""
class JobQueue:
def __init__(self):
self.lock = threading.Lock()
self.condition = threading.Condition(lock=self.lock)
self.stopQueue = False
self.running_ids_set = set()
self.stop_once_empty = False
# attributes that are stored
self.queue = []
self.queue_active = []
self.smallest_starttime = 0
self.last_check = 0
def loadFromJson(self, path: str) -> bool:
data = None
try:
with open(path, "r") as f:
data = json.load(f)
except Exception as e:
traceback.print_exc()
print(e)
return False
self.smallest_starttime = data["smallest_starttime"]
self.last_check = data["last_check"]
self.queue = []
for d in data["queue"]:
i = JobQueueItem.fromDict(d)
if i == None:
raise Exception("Invalid item in JobQueue JSON {}".format(d))
self.queue.append(i)
print("Loaded state: ", data)
return True
def saveToJson(self, path: str) -> bool:
data = {}
data["smallest_starttime"] = self.smallest_starttime
data["last_check"] = self.last_check
data["queue"] = []
for i in self.queue:
data["queue"].append(i.toDict())
for i in self.queue_active:
data["queue"].append(i.toDict())
try:
with open(path, "w") as f:
json.dump(data, f)
print("Saved state to {}.".format(path))
except Exception as e:
traceback.print_exc()
print(e)
print("Failed to save the state to {}:".format(path))
print(data)
return False
return True
def stop(self) -> None:
with self.condition:
self.stopQueue = True
self.condition.notify_all()
def stopped(self) -> bool:
return self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True)
def add(self, v, stop_once_empty: bool =False) -> None:
if v == None and stop_once_empty == False: # prevent adding None value
return
with self.condition:
if v != None:
self.queue.append(v)
if stop_once_empty == True:
self.stop_once_empty = True
self.condition.notify()
def add_all(self, l: typing.Optional[typing.Sequence], stop_once_empty: bool =False) -> None:
if (l == None or len(l) == 0) and stop_once_empty == False:
return
with self.condition:
if l != None and len(l) > 0:
for v in l:
if v != None:
self.queue.append(v)
if stop_once_empty == True:
self.stop_once_empty = True
self.condition.notify()
def clear(self) -> typing.Sequence:
with self.condition:
values = self.queue
self.queue = []
return values
def peek(self):
with self.condition:
while len(self.queue) == 0 and self.stopQueue == False and self.stop_once_empty == False:
self.condition.wait()
if self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True):
return None
return self.queue[0]
def empty(self) -> bool:
with self.condition:
return len(self.queue) == 0
def size(self) -> int:
with self.condition:
return len(self.queue)
def wait_add(self, timeout: typing.Optional[float] =None) -> typing.Optional[bool]:
# return True if condition is notified
# return False if timeout expires
# return None if queue is stopped
if self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True):
return None
with self.condition:
if self.stopQueue == True:
return None
return self.condition.wait(timeout)
def safe_fun(self, fun: typing.Callable):
with self.condition:
queue_copy = copy.copy(self.queue)
return fun(queue_copy)
def get_min(self, min_select: typing.Callable):
# select the item with the smallest value or next item with value <= 0
# does not block if queue is empty !
# Moves item automatically into queue_active
min_value = 999999999
index = -1
with self.condition:
if len(self.queue) == 0:
return "empty"
# find item with smallest value
for ix,i in enumerate(self.queue):
value = min_select(i)
if value <= 0:
min_value = value
index = ix
break
if value < min_value:
min_value = value
index = ix
if index != -1:
value = self.queue.pop(index)
self.queue_active.append(value)
return value
return None
def get(self):
# Waits until the queue is not empty or the queue is shut down
# Returns a value if the queue is not empty or None if the queue is shut down
# Moves item automatically into queue_active
with self.condition:
while len(self.queue) == 0 and self.stopQueue == False and self.stop_once_empty == False:
self.condition.wait()
if self.stopQueue == True or (len(self.queue) == 0 and self.stop_once_empty == True):
return None
value = self.queue[0]
self.queue = self.queue[1:]
self.queue_active.append(value)
return value
def deactivate_item(self, item) -> typing.Optional[bool]:
with self.condition:
if self.stopQueue == True:
return None
try:
self.queue_active.remove(item)
return True
except ValueError as e:
traceback.print_exc()
print("JobQueueItem {} to deactivate not found in active queue".format(item))
return False
"""
Checks for newly finished jobs and puts them into the queue.
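Finished jobs are detected by comparing the current set of running jobs with the running set from the previous check: jobs that disappeared from the running set are considered finished. In addition, all jobs started since the last check are fetched, to also catch jobs that started and finished between two checks.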
"""
class CCCheckThread(threading.Thread):
def __init__(self, config: Config, queue: JobQueue, check_once: bool =False):
self.config = config
self.queue = queue
threading.Thread.__init__(self)
self.stopThread = False
self.stopCondition = threading.Condition()
self.requestFuture :typing.Optional[concurrent.futures.Future] = None
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
self.check_once = check_once
def request_jobs(self, starttime: typing.Optional[int], state: typing.Optional[str], page: typing.Optional[int], items_per_page: int) -> typing.Optional[dict]:
tempfile = None
jobs = None
try:
params = []
if starttime != None:
params.append("start-time={}-9999999999".format(starttime))
if state != None:
params.append("state={}".format(state))
if page != None:
params.append("page={}".format(page))
params.append("items-per-page={}".format(items_per_page))
params_str = "&".join(params)
items = ""
url = config.config["CC_URL"]+"/api/jobs/?"+params_str
headers = {}
headers["Access-Control-Request-Headers"] = "x-auth-token"
headers["X-Auth-Token"] = config.config["CC_TOKEN"]
#print("CCheckThread: Request {}".format(url))
req = urllib.request.Request(url, headers=headers, method="GET")
def execRequest(req: urllib.request.Request) -> typing.Optional[dict]:
try:
with urllib.request.urlopen(req, timeout=10) as response:
if response.status == 200:
jobs = json.load(response)
#print(jobs)
return jobs
else:
if response.status == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
self.config.signal_shutdown()
print(response.status)
return None
except urllib.error.HTTPError as e:
print("Error {} for URL {}".format(e.code, e.url))
if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
self.config.signal_shutdown()
return None
except Exception as e:
traceback.print_exc()
print(e)
#traceback.print_exc()
return None
with self.stopCondition:
self.requestFuture = self.executor.submit(execRequest, req)
except Exception as e:
traceback.print_exc()
print(e)
return None
#await self.requestTask
try:
jobs = self.requestFuture.result()
except concurrent.futures.CancelledError as e:
jobs = None
with self.stopCondition:
self.requestTask = None
self.requestFuture = None
return jobs
def get_jobs(self, starttime: int, state: typing.Optional[str]) -> typing.Optional[list]:
jobs: typing.Optional[list] = []
items_per_page = 10000
ret = 0
page = 0
startTime = time.time()
# request pages until no jobs are returned anymore
while self.stopThread == False and (page < 1 or ret > 0):
page += 1
with prule.debug.Timing("ccthread.request_jobs", "PRINT_TIMING" in config.config):
newjobs = self.request_jobs(starttime, state, page, items_per_page)
if newjobs == None:
jobs = None
break
ret = len(newjobs["jobs"])
jobs += newjobs["jobs"]
if ret < items_per_page:
break
#print("Page {} Jobs {}".format(page, ret))
stopTime = time.time()
print("Request {} jobs using {} requests in {:.3} seconds".format(len(jobs) if jobs != None else None, page, stopTime-startTime))
return jobs
def check(self) -> None:
# current check
smallest_starttime = sys.maxsize
new_running_ids_set = set()
cur_time = int(time.time())
# Get all jobs started later than smallest_starttime.
jobs = self.get_jobs(queue.smallest_starttime, "running")
if jobs == None:
return
print("Got {} jobs".format(len(jobs)))
# fill running job set and compute smallest_starttime
for ix,j in enumerate(jobs):
new_running_ids_set.add((j["id"], j["jobId"], j["cluster"], j["startTime"]))
smallest_starttime = min(smallest_starttime, j["startTime"])
# compare running jobs with last check's running jobs to get finished jobs
finished_ids_set = queue.running_ids_set - new_running_ids_set
# get complete list of jobs started since last check and gather missing finished jobs
# this has two reasons:
# - find finished jobs that started AND finished since the last check (these won't show up in the running jobs lists)
# - get all jobs that finished since the daemon was shut down
# Do not do this if there was no last_check; otherwise you would get ALL jobs from the database.
if queue.last_check > 0:
jobs = self.get_jobs(queue.last_check, None)
if jobs == None:
return
for ix,j in enumerate(jobs):
# possible states: running, completed, failed, cancelled, stopped, timeout
if j["jobState"] != "running":
finished_ids_set.add((j["id"], j["jobId"], j["cluster"], j["startTime"]))
print("Running jobs {} Finished jobs {} Previous smallest starttime {} Smallest starttime {} Previous queue size {}".format(len(new_running_ids_set), len(finished_ids_set), queue.smallest_starttime, smallest_starttime, queue.size()))
# add found jobs to queue
if len(finished_ids_set) > 0:
q_ids = []
for j in finished_ids_set:
item = JobQueueItem(j[0], first_check=cur_time) # ccid, jobid, cluster, startTime
# set initial delay
item.backoff = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
item.metadata = {}
item.metadata["id"] = j[0]
item.metadata["jobId"] = j[1]
item.metadata["cluster"] = j[2]
item.metadata["startTime"] = j[3]
q_ids.append(item)
queue.add_all(q_ids)
queue.running_ids_set = new_running_ids_set
queue.last_check = cur_time
if smallest_starttime < sys.maxsize:
queue.smallest_starttime = smallest_starttime
def run_main(self) -> None:
while self.stopThread == False:
# check CC for finished jobs and add them to queue
print("CCCheckThread")
self.check()
if self.check_once == True:
queue.add(None, stop_once_empty = True)
break
# sleep
def threadStopped():
return self.stopThread
with self.stopCondition:
if self.stopThread == False:
print("CCheckThread: sleep")
self.stopCondition.wait_for(threadStopped, config.config["CC_CHECK_INTERVAL"])
# thread done
print("CCCheckThread Done")
self.executor.shutdown(wait=False)
def run(self) -> None:
try:
self.run_main()
except Exception as e:
traceback.print_exc()
print("CCCheckThread",e)
if self.config.shutdown == False:
self.config.signal_shutdown()
def stop(self) -> None:
with self.stopCondition:
print("Stop CCCheckThread")
self.stopThread = True
if self.requestFuture != None and self.requestFuture.done() == False:
self.requestFuture.cancel()
#self.requestFuture.set_result(None)
self.stopCondition.notify()
"""
Gets a job from the queue and starts the prule program.
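Depending on the configuration, results are written to JSON files (STORE_OUTPUT), uploaded to ClusterCockpit as job metadata (API_METADATA) and job tags (API_TAG), and cached in a sqlite3 database (CACHE_DB).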
"""
class PruleThread(threading.Thread):
def __init__(self, config: Config, stop_on_empty: bool, queue: JobQueue):
self.config = config
self.queue = queue
threading.Thread.__init__(self)
self.stopThread = False
self.stopCondition = threading.Condition()
self.stop_on_empty = stop_on_empty
self.currentProcess: typing.Optional[subprocess.Popen] = None
self.processTerminated = False
self.db_con: typing.Optional[prule.db.ResultsDB] = None
def request_job_meta(self, id: int) -> typing.Tuple[typing.Union[None, str, dict], int]:
url = config.config["CC_URL"]+"/api/jobs/{}".format(id)
headers = {}
headers["Access-Control-Request-Headers"] = "x-auth-token"
headers["X-Auth-Token"] = config.config["CC_TOKEN"]
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data="[]".encode("UTF-8"), headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=10) as response:
try:
if int(response.headers.get('content-length')) == 0:
print("request_job_meta: empty response")
return ("job-failure", 1)
except:
pass
if response.status == 200:
job_meta = json.load(response)
return (job_meta, 0)
print("Error {} for URL {}".format(response.status, url))
return (None, response.status if response.status > 0 else 999)
except urllib.error.HTTPError as e:
print("Error {} for URL {}".format(e.code, e.url))
if e.code >= 500 or e.code == 429: # internal server error or too many requests
return ("wait", e.code)
if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
self.config.signal_shutdown()
return (None, 401)
return (None, e.code if e.code > 0 else 999)
except Exception as e: # something went horribly wrong
traceback.print_exc()
print("request_job_meta",e)
return (None, 999)
return (None, 990)
def request_tag_job(self, id: int, tags: dict) -> typing.Tuple[bool, int]:
#[{"type":"foo","name":"bar"},{"type":"asdf","name":"fdsa"}]
url = config.config["CC_URL"]+"/api/jobs/tag_job/{}".format(id)
headers = {}
headers["Access-Control-Request-Headers"] = "x-auth-token"
headers["X-Auth-Token"] = config.config["CC_TOKEN"]
headers["Content-Type"] = "application/json"
data = json.dumps(tags)
req = urllib.request.Request(url, data=data.encode('UTF-8'), headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=10) as response:
if response.status == 200:
return (True, 0)
print("Error {} for URL {}".format(response.status, url))
except urllib.error.HTTPError as e: # raised for all non-2XX http codes
msg = ""
try:
msg = e.fp.read().decode('utf-8', 'ignore')
except:
pass
print("Error {} for URL {} Reason {} Msg {}".format(e.code, e.url, e.reason, msg))
if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
self.config.signal_shutdown()
return (False, 401)
if e.code == 500 and "Duplicate entry" in msg: # TODO: Tag is inserted twice. Fix once tag removal is possible.
return (True, 0)
if e.code == 500 and "UNIQUE constraint failed: jobtag.job_id, jobtag.tag_id" in msg: # TODO: Tag is inserted twice. Fix once tag removal is possible.
return (True, 0)
return (False, e.code if e.code > 0 else 999)
except Exception as e: # something went horribly wrong
traceback.print_exc()
print(e)
return (False, 999)
return (False, 990)
def request_jobarchive(self, id:int ) -> typing.Tuple[typing.Union[bool, tempfile.TemporaryDirectory, str], int]:
url = config.config["CC_URL"]+"/api/jobs/{}?all-metrics=true".format(id)
headers = {}
headers["Access-Control-Request-Headers"] = "x-auth-token"
headers["X-Auth-Token"] = config.config["CC_TOKEN"]
req = urllib.request.Request(url, headers=headers, method="GET")
tdir = tempfile.TemporaryDirectory(prefix="prule_jobarchive_{}_".format(id), delete="NO_TMPDIR_CLEAN" not in self.config.config)
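# Note: the delete= keyword of tempfile.TemporaryDirectory requires Python 3.12 or newer.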
try:
with urllib.request.urlopen(req, timeout=10) as response:
if response.status == 200:
data = json.load(response)
#print(data)
data_path = os.path.join(tdir.name, "data.json")
meta_path = os.path.join(tdir.name, "meta.json")
with open(data_path, "w") as f:
json.dump(data["Data"], f)
with open(meta_path, "w") as f:
json.dump(data["Meta"], f)
return (tdir, 0)
print("Error {} for URL {}".format(response.status, url))
except urllib.error.HTTPError as e: # raised for all non-2XX http codes
print("Error {} for URL {}".format(e.code, e.url))
if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
self.config.signal_shutdown()
return ("wait", 401)
return (False, e.code if e.code > 0 else 999)
except Exception as e: # something went horribly wrong
traceback.print_exc()
print(e)
try:
tdir.cleanup()
except:
print("Cleaning up {} failed".format(tdir.name))
return (False, 999)
return (False, 990)
def request_metadata_upload(self, id: int, metadata: dict) -> typing.Tuple[typing.Union[bool, dict], int]:
url = config.config["CC_URL"]+"/api/jobs/edit_meta/{}".format(id)
headers = {}
headers["Access-Control-Request-Headers"] = "x-auth-token"
headers["X-Auth-Token"] = config.config["CC_TOKEN"]
headers["Content-Type"] = "application/json"
data = json.dumps(metadata)
req = urllib.request.Request(url, data=data.encode('UTF-8'), headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=10) as response:
if response.status == 200:
return (json.load(response), 0)
print("Error {} for URL {}".format(response.status, url))
except urllib.error.HTTPError as e: # raised for all non-2XX http codes
print("Error {} for URL {}".format(e.code, e.url))
if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
self.config.signal_shutdown()
return (False, 401)
return (False, e.code if e.code > 0 else 999)
except Exception as e: # something went horribly wrong
traceback.print_exc()
print(e)
return (False, 999)
return (False, 990)
def prule_start(self) -> None:
params = ["python3","-u","-m","prule"]
params += ["--parameters-file", config.config["PRULE_PARAMETERS_FILE_PATH"]]
params += ["--clusters-file", config.config["PRULE_CLUSTERS_FILE_PATH"]]
params += ["--rules-file", config.config["PRULE_RULES_FILE_PATH"]]
params += ["--log-stderr"]
params += ["--job-stdin"]
params += ["--overwrite"]
if 'JOB_PROCESS_QUIET' not in config.config or config.config['JOB_PROCESS_QUIET'] == True:
params += ["--quiet"]
def preexec():
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGPIPE])
with self.stopCondition:
if self.currentProcess == None:
self.currentProcess = subprocess.Popen(params, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=sys.stderr, preexec_fn = preexec)
print("Prule process {} started".format(self.currentProcess.pid))
def prule_job(self, job: dict) -> typing.Tuple[typing.Union[None, dict, bool], float, int]:
if self.currentProcess == None:
return (None, 0.0, 10)
process_time_start = datetime.datetime.now().timestamp()
tries = 0
result = None
while tries <2:
try:
data = json.dumps(job) + "\n"
if self.currentProcess.stdin != None:
self.currentProcess.stdin.write(data.encode("utf-8"))
self.currentProcess.stdin.flush()
if self.currentProcess.stdout != None:
line = self.currentProcess.stdout.readline()
result = json.loads(line)
break
except Exception as e:
traceback.print_exc()
print(e)
if self.stopThread == True:
return (None, 0.0, 20)
if tries == 0:
self.prule_restart()
tries += 1
continue
self.prule_restart()
return (None, 0.0, 30)
process_time_stop = datetime.datetime.now().timestamp()
process_time = process_time_stop - process_time_start
if type(result) != dict or len(result) == 0:
return (False, process_time, 40)
return (result, process_time, 0)
def prule_restart(self) -> None:
self.prule_stop()
self.prule_start()
def prule_stop(self) -> typing.Optional[int]:
proc = None
with self.stopCondition:
if self.currentProcess == None:
return None
proc = self.currentProcess
try:
if self.currentProcess.stdin != None:
self.currentProcess.stdin.close()
except:
pass
try:
self.currentProcess.terminate()
except:
pass
returncode = self.currentProcess.wait()
print("Prule process {} stopped, returncode {}".format(self.currentProcess.pid, returncode))
with self.stopCondition:
self.currentProcess = None
return returncode
# job: "id" - CC database id, not "jobId", which is the SLURM job id
def processJob(self, job: JobQueueItem) -> typing.Tuple[str, int]:
# track process error
process_result = "success"
error_code = 0
# complete metadata (later used for entry in sqlite database)
job_meta = None
# slurmid, cluster, startTime necessary for loading of jobarchive from filesystem
job_cluster = None
job_slurmid = None
job_startTime = None
if job.metadata != None:
if "jobId" in job.metadata and "cluster" in job.metadata and "startTime" in job.metadata:
job_cluster = job.metadata["cluster"]
job_slurmid = job.metadata["jobId"]
job_startTime = str(job.metadata["startTime"]) if type(job.metadata["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job.metadata['startTime']).timestamp()))
if "duration" in job.metadata: # assume the rest is also present in job.metadata
job_meta = job.metadata
# check if already processed
already_processed = False
if config.config["CACHE_DB"] == True:
try:
with prule.debug.Timing("prulethread.db_get_result", "PRINT_TIMING" in config.config):
if self.db_con != None:
old_result = self.db_con.db_get_result(job.ccjobid)
else:
error_code = 101000000
return ("failure-shutdown", error_code)
except:
error_code = 101000000
return ("failure-shutdown", error_code)
if len(old_result) > 0:
already_processed = already_processed or old_result[-1]["evaluated"] == True
if already_processed == True:
print("skip, already_processed",job.ccjobid)
return ("success", error_code)
# retrieve current job metadata if needed
if job_cluster == None or job_slurmid == None or job_startTime == None:
with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
job_res, error_code = self.request_job_meta(job.ccjobid)
if error_code > 0:
error_code += 102000000
if job_res == None:
return ("failure-shutdown", error_code)
elif type(job_res) == str:
if job_res == "job-failure":
return ("failure-drop", error_code)
if job_res == "wait":
return ("failure-wait", error_code)
elif type(job_res) == dict:
if type(job_res['Meta']) == dict:
job_meta = job_res['Meta']
if job_meta == None:
return ("failure-drop", 102000000)
job_cluster = job_meta["cluster"]
job_slurmid = job_meta["jobId"]
job_startTime = str(job_meta["startTime"]) if type(job_meta["startTime"]) == int else str(int(datetime.datetime.fromisoformat(job_meta['startTime']).timestamp()))
# prepare job path for filesystem access or download jobarchive from API
job_path = ""
job_tempdir = None
if config.config["API_JOBARCHIVE"] == False:
# Load job from filesystem
job_path = os.path.join(config.config["JOBARCHIVE_PATH"], job_cluster, str(job_slurmid)[:-3], str(job_slurmid)[-3:], job_startTime)
else:
# Load job from jobarchive api and write it to tempdir
with prule.debug.Timing("prulethread.request_jobarchive", "PRINT_TIMING" in config.config):
job_tempdir_res, error_code = self.request_jobarchive(job.ccjobid)
if error_code > 0:
error_code += 103000000
if type(job_tempdir_res) == bool:
return ("failure-shutdown", error_code)
elif type(job_tempdir_res) == str: # "wait"
return ("failure-wait", error_code)
elif type(job_tempdir_res) == tempfile.TemporaryDirectory:
job_path = job_tempdir_res.name
job_tempdir = job_tempdir_res
print("Job path:",job_path)
# check if input files actually exist
if os.path.exists(os.path.join(job_path, "meta.json")) == False or (os.path.exists(os.path.join(job_path, "data.json")) == False and os.path.exists(os.path.join(job_path, "data.json.gz")) == False):
if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
try:
job_tempdir.cleanup()
except:
print("Cleaning up {} failed".format(job_tempdir.name))
print("Process: job {} Missing files in {}".format(job.ccjobid, job_path))
return ("failure-requeue", 104000000)
result_json, process_time, error_code = self.prule_job({"job-dir":job_path})
if error_code > 0:
error_code += 105000000
if result_json == None or result_json == False:
if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
try:
job_tempdir.cleanup()
except:
print("Cleaning up {} failed".format(job_tempdir.name))
if result_json == None:
return ("failure-shutdown", error_code)
if result_json == False:
return ("failure-drop", error_code)
if type(result_json) != dict:
return ("failure-drop", 105000000)
print("Process: job {} jobId {} time {:.6f}".format(job.ccjobid, job_slurmid, process_time))
if self.processTerminated == True:
print("Job {} process was terminated.".format(job.ccjobid))
if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
try:
job_tempdir.cleanup()
except:
print("Cleaning up {} failed".format(job_tempdir.name))
return ("failure-requeue", 0)
# print process result
if 'JOB_PROCESS_QUIET' in config.config and config.config['JOB_PROCESS_QUIET'] == False:
print("Job {} process result {}".format(job.ccjobid, result_json))
else:
try:
print("Job {} process result, tags: {} err: {} {} fail: {} eval: {} noeval: {} evaltime: {:.6f}".format(
job.ccjobid,
len(result_json["tags"]),
result_json["error"],
len(result_json["errors"]),
len(result_json["rules_failed"]),
len(result_json["rules_evaluated"]),
len(result_json["rules_not_evaluated"]),
result_json["evaluation_time"]
))
except Exception as e:
traceback.print_exc()
print(e)
print("Job {} process result {}".format(job.ccjobid, result_json))
# store result to JSON file
if config.config["STORE_OUTPUT"] == True and result_json != None:
# store result in output json
job_result_path = os.path.join(config.config["OUTPUT_PATH"], "{}.json".format(job.ccjobid))
try:
with open(job_result_path, "w") as outf:
json.dump(result_json, outf)
except Exception as e:
traceback.print_exc()
print(e)
print("Failed to write result to {}".format(job_result_path))
if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
try:
job_tempdir.cleanup()
except:
print("Cleaning up {} failed".format(job_tempdir.name))
return ("failure-requeue", 106000000)
# upload result to metadata api
if config.config["API_METADATA"] == True and result_json != None:
patho_message = prepare_patho_message(config, result_json)
if patho_message != None:
with prule.debug.Timing("prulethread.request_metadata_upload", "PRINT_TIMING" in config.config):
res, error_code = self.request_metadata_upload(job.ccjobid, {"key":"issues","value":patho_message})
if error_code > 0:
error_code += 107000000
if type(res) == bool:
print("Job {} process failed to write metadata using API_METADATA".format(job.ccjobid))
process_result = "failure-shutdown"
else:
job_meta = res
# upload tag
if config.config["API_TAG"] == True and result_json != None:
if len(result_json["tags"]) > 0:
# set default scope
for t in result_json["tags"]:
if "API_TAG_SCOPE" in self.config.config and self.config.config["API_TAG_SCOPE"] == True:
if "scope" not in t:
t["scope"] = "global"
else:
if "scope" in t:
t.pop("scope")
with prule.debug.Timing("prulethread.request_tag_job", "PRINT_TIMING" in config.config):
res, error_code = self.request_tag_job(job.ccjobid, result_json["tags"])
if error_code > 0:
error_code += 108000000
if res == False:
print("Job {} process failed to write tags using API_TAG".format(job.ccjobid))
process_result = "failure-shutdown"
# save result to cache db
if config.config["CACHE_DB"] == True:
# load meta data from file if necessary
if job_meta == None:
try:
with open(os.path.join(job_path, "meta.json"), "r") as meta_infile:
job_meta = json.load(meta_infile)
except Exception as e:
traceback.print_exc()
print(e)
print("Failed to load meta data from file {}".format(os.path.join(job_path, "meta.json")))
error_code = 109000000
# load meta data from api if necessary
if job_meta == None:
with prule.debug.Timing("prulethread.request_job_meta", "PRINT_TIMING" in config.config):
job_res, error_code = self.request_job_meta(job.ccjobid)
if error_code > 0:
error_code += 109000000
if type(job_res) != dict:
if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
try:
job_tempdir.cleanup()
except:
print("Cleaning up {} failed".format(job_tempdir.name))
if job_res == None:
return ("failure-shutdown", error_code)
elif type(job_res) == str:
if job_res == "job-failure":
return ("failure-drop", error_code)
if job_res == "wait":
return ("failure-wait", error_code)
elif type(job_res) == dict:
if type(job_res['Meta']) == dict:
job_meta = job_res["Meta"]
# overwrite metadata in job from prule results
if "metadata" in result_json and job_meta != None:
for key,value in result_json["metadata"].items():
job_meta[key] = value
# save result to cache db
try:
evaluated = "error" in result_json and result_json["error"] == False
with prule.debug.Timing("prulethread.db_insert_result", "PRINT_TIMING" in config.config):
if self.db_con != None:
self.db_con.db_insert_result(job.ccjobid, result_json, job_meta, process_time, 0 if evaluated else 1, int(datetime.datetime.now().timestamp()), error_code)
else:
return ("failure-shutdown", 11000000)
except Exception as e:
traceback.print_exc()
print(e)
print("ERROR: db_insert_result failed for job ccid {}".format(job.ccjobid))
process_result = "failure-shutdown"
error_code = 110000000
# cleanup temp directory
if job_tempdir != None and "NO_TMPDIR_CLEAN" not in config.config:
try:
job_tempdir.cleanup()
except:
print("Cleaning up {} failed".format(job_tempdir.name))
return (process_result, error_code)
def run_main(self) -> None:
if self.config.config["CACHE_DB"] == True:
self.db_con = prule.db.ResultsDB(self.config.config["DB_PATH"])
# start prule subprocess
self.prule_start()
process_delay_min = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
process_delay_max = config.config["JOB_PROCESS_DELAY_MAX"] if "JOB_PROCESS_DELAY_MAX" in config.config else 960
while self.stopThread == False:
if self.stop_on_empty == True and queue.empty() == True:
self.config.signal_shutdown()
break
cur_time = int(time.time())
def smallest_backoff(item):
return item.backoff - (cur_time - item.first_check)
# get new job from queue and block if empty
job = queue.get_min(smallest_backoff)
if job == "empty":
queue.wait_add() # wait for new jobs
continue
if job == None:
if queue.stopped(): # queue was shut down
break
continue
# sleep to match job process delay
time_offset = cur_time - job.first_check
if time_offset < job.backoff:
# sleep now or wait for jobs
print("PruleThread: delay {}".format(job.backoff - time_offset))
res = queue.wait_add(job.backoff - time_offset) # !! blocks without checking self.stopCondition
if res == None: # queue stopped
queue.add(job)
queue.deactivate_item(job)
break
if res == True:
queue.add(job) # other job was added to queue
queue.deactivate_item(job)
continue
# else timeout expired and job is ready for retry
if self.stopThread == True:
# put back job and return
queue.add(job)
queue.deactivate_item(job)
break
# process job
print("Process ", job.ccjobid)
process_time_start = datetime.datetime.now().timestamp()
result, process_error_code = self.processJob(job)
process_time = datetime.datetime.now().timestamp()-process_time_start
# Possible results:
# "success" - All fine
# "failure-requeue" - Temporary failure
# Requeue job and move on
# "failure-wait" - Temporary failure
# Requeue job, wait and move on
# "failure-drop" - Failure, expected to repeat for this job (e.g. rule processing failed)
# Drop job, note process failure in CACHE_DB and move on
# "failure-shutdown" - Failure, that is expected to need developer/administrator intervention
# Requeue job and shutdown daemon
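# Retry handling: a failed job is re-queued with exponential backoff - the delay doubles after every failure
# (starting at 15 seconds if no JOB_PROCESS_DELAY_MIN was applied) and the job is dropped once the delay reaches JOB_PROCESS_DELAY_MAX.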
print("Process job {} {} {}, time {:.6f}".format(job.ccjobid, result, process_error_code, process_time))
if result == "failure-requeue" or result == "failure-wait" or result == "failure-shutdown":
if job.backoff < process_delay_max: # wait at most JOB_PROCESS_DELAY_MAX (default 960 s = 16 minutes)
job.backoff = 15 if job.backoff == 0 else job.backoff * 2
queue.add(job)
queue.deactivate_item(job)
else:
print("Job {} failed too long after first check. Drop job.".format(job.ccjobid))
result = "failure-drop"
if result == "failure-drop":
# save failure to cache db
if config.config["CACHE_DB"] == True:
try:
with prule.debug.Timing("prulethread.db_insert_failure", "PRINT_TIMING" in config.config):
if self.db_con != None:
self.db_con.db_insert_failure(job.ccjobid, process_error_code, int(datetime.datetime.now().timestamp()))
else:
raise Exception("Failed to open sqlite database")
except Exception as e:
traceback.print_exc()
print(e)
print("ERROR: db_insert_failure failed for job ccid {}, requeue".format(job.ccjobid))
queue.add(job)
queue.deactivate_item(job)
self.stopThread = True
self.config.signal_shutdown()
queue.deactivate_item(job)
if result == "success":
queue.deactivate_item(job)
pass
if result == "failure-shutdown":
self.stopThread = True
self.config.signal_shutdown()
if result == "failure-wait":
# sleep in case CC service is not responsive
def threadStopped():
return self.stopThread
with self.stopCondition:
if self.stopThread == False:
print("CCCheckThread: sleep")
self.stopCondition.wait(30)
self.stopCondition.wait_for(threadStopped, 30)
self.prule_stop()
if self.config.config["CACHE_DB"] == True and self.db_con != None:
self.db_con.close()
self.db_con = None
def run(self) -> None:
try:
self.run_main()
except Exception as e:
traceback.print_exc()
print("PruleThread:",e)
if self.config.shutdown == False:
self.config.signal_shutdown()
def stop(self) -> None:
with self.stopCondition:
if self.currentProcess != None:
self.currentProcess.terminate()
self.processTerminated = True
self.stopThread = True
self.stopCondition.notify()
if __name__ == "__main__":
# block signals, so startup will not be interrupted
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGPIPE])
parser = argparse.ArgumentParser(
prog='prule daemon',
description='Daemon service that waits for finished ClusterCockpit jobs and applies pathological job rules to measured metrics',
argument_default=argparse.SUPPRESS
)
parser.add_argument('--args', help=argparse.SUPPRESS, action='store_true')
parser.add_argument('--long-help', help="Print detailed help text", action='store_true')
parser.add_argument('--config-path', type=str, help='Path to configuration file.')
parser.add_argument('--no-check', help=argparse.SUPPRESS, action='store_true') # do not start job check thread
parser.add_argument('--check-once', help=argparse.SUPPRESS, action='store_true') # run job check thread once
parser.add_argument('--job-ids-file', type=str, help='Read job ids from file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
parser.add_argument('--job-ids-json-file', type=str, help='Read job ids from JSON file, process the jobs and quit. State file will be ignored. Check for new jobs will not be performed.')
parser.add_argument('--db-path', type=str, help='Path to database file. Overwrites the DB_PATH value from the config file.')
parser.add_argument('--cache-db', action='store_true', help='Store results in sqlite3 database. Overwrites the CACHE_DB value from the config file.')
parser.add_argument('--job-process-quiet', action='store_true', help='Pass --quiet to prule and do not print full result json')
parser.add_argument('--job-process-no-quiet', dest='job_process_quiet', action='store_false', help='Print full result json')
parser.add_argument('--no-tmpdir-clean', action='store_true', help='Keep temporary directories')
parser.add_argument('--print-timing', action='store_true', help='Print debug timings')
args = vars(parser.parse_args())
if "args" in args:
print(args)
sys.exit(1)
if "long_help" in args:
print(longhelp)
sys.exit(0)
# load config
config = Config(threading.get_ident(), args["config_path"])
config.load()
# overwrite JOB_PROCESS_QUIET config variable
if 'job_process_quiet' in args:
config.config['JOB_PROCESS_QUIET'] = args['job_process_quiet']
# overwrite DB_PATH config variable
if 'db_path' in args:
config.config['DB_PATH'] = args['db_path']
# overwrite CACHE_DB config variable
if 'cache_db' in args:
config.config['CACHE_DB'] = True
if 'print_timing' in args:
config.config['PRINT_TIMING'] = True
if 'no_tmpdir_clean' in args:
config.config['NO_TMPDIR_CLEAN'] = True
# load rules
rules = []
with open(config.config['PRULE_RULES_FILE_PATH'], 'r') as jsonf:
rules_list = json.load(jsonf)
for r in rules_list:
rules.append(r["name"])
# Open cache database and initialize it
if config.config['CACHE_DB'] == True:
db_con = prule.db.ResultsDB(config.config['DB_PATH'])
if db_con.db_check() == False:
db_con.db_initialize()
db_con.db_update_rules(rules)
db_con.close() # reopen sqlite connection again in the prule thread
# read job ids from file
job_ids_preset = []
if 'job_ids_file' in args:
with open(args['job_ids_file'], 'r') as f:
process_delay = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
for line in f.readlines():
line = line.strip()
if len(line) > 0:
ccjid = int(line.strip())
item = JobQueueItem(ccjid)
item.first_check = item.first_check - process_delay
job_ids_preset.append(item)
if len(job_ids_preset) == 0:
print("No job ids found in job-id-file ", args['job_ids_file'], file=sys.stderr)
sys.exit(1)
# read job ids from JSON file
if 'job_ids_json_file' in args:
with open(args['job_ids_json_file'], 'r') as f:
process_delay = config.config["JOB_PROCESS_DELAY_MIN"] if "JOB_PROCESS_DELAY_MIN" in config.config else 0
data = json.load(f)
for j in data:
item = JobQueueItem(j["id"])
item.metadata = j
item.first_check = item.first_check - process_delay
job_ids_preset.append(item)
if len(job_ids_preset) == 0:
print("No job ids found in job-id-json-file ", args['job_ids_json_file'], file=sys.stderr)
sys.exit(1)
# create queue and load state
queue = JobQueue()
if len(job_ids_preset) > 0:
queue.add_all(job_ids_preset)
else:
if os.path.exists(config.config["STATE_PATH"]):
ok = queue.loadFromJson(config.config["STATE_PATH"])
if ok == False:
print("Failed to load state from path {} although file exists.".format(config.config["STATE_PATH"]))
print("Make sure the file is readable (and writable) and the folder to the file exists.")
sys.exit(1)
else:
print("State file not found: {}".format(config.config["STATE_PATH"]))
ok = queue.saveToJson(config.config["STATE_PATH"])
if ok == False:
print("Failed to save state to file {}.".format(config.config["STATE_PATH"]))
print("Make sure the file is writable and the folder to the file exists.")
sys.exit(1)
# run check only once?
cc_thread_check_once = True if "CC_CHECK_ONCE" in config.config and config.config["CC_CHECK_ONCE"] == True else False
cc_thread_check_once = True if "check_once" in args else cc_thread_check_once
# create threads
cc_thread = CCCheckThread(config, queue, cc_thread_check_once)
prule_thread = PruleThread(config, len(job_ids_preset) > 0, queue)
# start threads
prule_thread.start()
if 'no_check' not in args and len(job_ids_preset) == 0:
cc_thread.start()
# wait for shutdown signal
while True:
if hasattr(signal, "sigwaitinfo"):
# send CC jobid to daemon process using the following command
# /bin/kill -s USR1 -q CC_JOBID -- DAEMON_PID
# make sure not to use the bash internal kill command
info = signal.sigwaitinfo([signal.SIGINT, signal.SIGTERM, signal.SIGUSR1])
if info.si_signo != signal.SIGUSR1:
break
queue.add(JobQueueItem(info.si_status, first_check=int(time.time())))
else:
signal.sigwait([signal.SIGINT, signal.SIGTERM])
break
config.shutdown = True
print("Got stop signal")
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM])
# stop queue and threads
queue.stop()
cc_thread.stop()
prule_thread.stop()
# wait for threads
try:
if cc_thread.is_alive():
cc_thread.join(1)
except:
pass
try:
if prule_thread.is_alive():
prule_thread.join(1)
except:
pass
# save state
if len(job_ids_preset) == 0:
ok = queue.saveToJson(config.config["STATE_PATH"])
if ok == False:
sys.exit(1)
sys.exit(0)