Skip to content
Snippets Groups Projects
Commit 58d3a21d authored by Alex Wiens's avatar Alex Wiens
Browse files

Prule.daemon: Add signal_shutdown() to config

parent c205c9d2
Branches
No related tags found
No related merge requests found
...@@ -132,6 +132,7 @@ class Config: ...@@ -132,6 +132,7 @@ class Config:
self.path = path self.path = path
self.config = {} self.config = {}
self.main_tid = main_tid self.main_tid = main_tid
self.shutdown = False
def load(self): def load(self):
data = None data = None
with open(self.path, "r") as f: with open(self.path, "r") as f:
...@@ -142,6 +143,10 @@ class Config: ...@@ -142,6 +143,10 @@ class Config:
if type(data[c]) != config_types[i]: if type(data[c]) != config_types[i]:
raise Exception("Key {} in configuration file has wrong type {}. It should be of type {}.".format(c, type(data[c]), config_types[i])) raise Exception("Key {} in configuration file has wrong type {}. It should be of type {}.".format(c, type(data[c]), config_types[i]))
config.config = data config.config = data
def signal_shutdown(self):
if self.shutdown == False:
self.shutdown = True
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown
class JobQueueItem: class JobQueueItem:
def __init__(self, ccjobid, metadata=None): def __init__(self, ccjobid, metadata=None):
...@@ -352,14 +357,14 @@ class CCCheckThread(threading.Thread): ...@@ -352,14 +357,14 @@ class CCCheckThread(threading.Thread):
else: else:
if response.status == 401: if response.status == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
print(response.status) print(response.status)
return None return None
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
print("Error {} for URL {}".format(e.code, e.url)) print("Error {} for URL {}".format(e.code, e.url))
if e.code == 401: if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
return None return None
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
...@@ -524,7 +529,7 @@ class PruleThread(threading.Thread): ...@@ -524,7 +529,7 @@ class PruleThread(threading.Thread):
return job_meta return job_meta
if response.status == 401: if response.status == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
print("Error {} for URL {}".format(response.status, url)) print("Error {} for URL {}".format(response.status, url))
if response.status >= 500 or response.status == 429 or response.status == 401: if response.status >= 500 or response.status == 429 or response.status == 401:
return "wait" return "wait"
...@@ -536,7 +541,7 @@ class PruleThread(threading.Thread): ...@@ -536,7 +541,7 @@ class PruleThread(threading.Thread):
return "wait" return "wait"
if e.code == 401: if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
return "wait" return "wait"
return None return None
except Exception as e: # something went horribly wrong except Exception as e: # something went horribly wrong
...@@ -559,7 +564,7 @@ class PruleThread(threading.Thread): ...@@ -559,7 +564,7 @@ class PruleThread(threading.Thread):
return True return True
if response.status == 401: if response.status == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
print("Error {} for URL {}".format(response.status, url)) print("Error {} for URL {}".format(response.status, url))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
msg = "" msg = ""
...@@ -570,7 +575,7 @@ class PruleThread(threading.Thread): ...@@ -570,7 +575,7 @@ class PruleThread(threading.Thread):
print("Error {} for URL {} Reason {} Msg {}".format(e.code, e.url, e.reason, msg)) print("Error {} for URL {} Reason {} Msg {}".format(e.code, e.url, e.reason, msg))
if e.code == 401: if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
if e.code == 500 and "Duplicate entry" in msg: # TODO: Tag is inserted twice. Fix once tag removal is possible. if e.code == 500 and "Duplicate entry" in msg: # TODO: Tag is inserted twice. Fix once tag removal is possible.
return True return True
return False return False
...@@ -600,14 +605,14 @@ class PruleThread(threading.Thread): ...@@ -600,14 +605,14 @@ class PruleThread(threading.Thread):
return tdir return tdir
if response.status == 401: if response.status == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
return "wait" return "wait"
print("Error {} for URL {}".format(response.status, url)) print("Error {} for URL {}".format(response.status, url))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
print("Error {} for URL {}".format(e.code, e.url)) print("Error {} for URL {}".format(e.code, e.url))
if e.code == 401: if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
return "wait" return "wait"
return False return False
except Exception as e: # something went horribly wrong except Exception as e: # something went horribly wrong
...@@ -630,13 +635,13 @@ class PruleThread(threading.Thread): ...@@ -630,13 +635,13 @@ class PruleThread(threading.Thread):
return True return True
if response.status == 401: if response.status == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
print("Error {} for URL {}".format(response.status, url)) print("Error {} for URL {}".format(response.status, url))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
print("Error {} for URL {}".format(e.code, e.url)) print("Error {} for URL {}".format(e.code, e.url))
if e.code == 401: if e.code == 401:
print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr) print("HTTP Error 401: Unauthorized, ClusterCockpit API TOKEN invalid?", file=sys.stderr)
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
return False return False
except Exception as e: # something went horribly wrong except Exception as e: # something went horribly wrong
traceback.print_exc() traceback.print_exc()
...@@ -863,7 +868,7 @@ class PruleThread(threading.Thread): ...@@ -863,7 +868,7 @@ class PruleThread(threading.Thread):
while self.stopThread == False: while self.stopThread == False:
if self.stop_on_empty == True and queue.empty() == True: if self.stop_on_empty == True and queue.empty() == True:
signal.pthread_kill(config.main_tid, signal.SIGTERM) self.config.signal_shutdown()
break break
cur_time = int(time.time()) cur_time = int(time.time())
...@@ -939,13 +944,13 @@ class PruleThread(threading.Thread): ...@@ -939,13 +944,13 @@ class PruleThread(threading.Thread):
print("ERROR: db_insert_failure failed for job ccid {}, requeue".format(job.ccjobid)) print("ERROR: db_insert_failure failed for job ccid {}, requeue".format(job.ccjobid))
queue.add(job) queue.add(job)
self.stopThread = True self.stopThread = True
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
if result == "success": if result == "success":
pass pass
if result == "failure-shutdown": if result == "failure-shutdown":
self.stopThread = True self.stopThread = True
signal.pthread_kill(config.main_tid, signal.SIGTERM) # shutdown self.config.signal_shutdown()
if result == "failure-wait": if result == "failure-wait":
# sleep in case CC service is not responsive # sleep in case CC service is not responsive
...@@ -1103,6 +1108,7 @@ if __name__ == "__main__": ...@@ -1103,6 +1108,7 @@ if __name__ == "__main__":
else: else:
signal.sigwait([signal.SIGINT, signal.SIGTERM]) signal.sigwait([signal.SIGINT, signal.SIGTERM])
break break
config.shutdown = True
print("Got stop signal") print("Got stop signal")
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM]) signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT, signal.SIGTERM])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment