diff --git a/prule/daemon/__main__.py b/prule/daemon/__main__.py index cb7c49cc18bf833768b7dad2291e9d036910f8f5..e7e91b49f4ca6d021ff140fa7672830a96734df7 100644 --- a/prule/daemon/__main__.py +++ b/prule/daemon/__main__.py @@ -167,6 +167,12 @@ class JobQueueItem: if item.ccjobid == None: return None return item + def __eq__(self, other): + if type(other) != JobQueueItem: + return False + return self.ccjobid == other.ccjobid + def __str__(self): + return "<JobQueueItem ccjobid={} metadata={} first_check={} backoff={}>".format(self.ccjobid, self.metadata != None, self.first_check, self.backoff) """ Holds the known jobs of the daemon. @@ -182,6 +188,7 @@ class JobQueue: self.stop_once_empty = False # attributes that are stored self.queue = [] + self.queue_active = [] self.smallest_starttime = 0 self.last_check = 0 def loadFromJson(self, path): @@ -210,6 +217,8 @@ class JobQueue: data["queue"] = [] for i in self.queue: data["queue"].append(i.toDict()) + for i in self.queue_active: + data["queue"].append(i.toDict()) try: with open(path, "w") as f: json.dump(data, f) @@ -282,6 +291,7 @@ class JobQueue: def get_min(self, min_select): # select the item with the smallest value or next item with value <= 0 # does not block if queue is empty ! + # Moves item automatically into queue_active min_value = 999999999 index = -1 with self.condition: @@ -298,11 +308,14 @@ class JobQueue: min_value = value index = ix if index != -1: - return self.queue.pop(index) + value = self.queue.pop(index) + self.queue_active.append(value) + return value return None def get(self): # Waits until the queue is not empty or the queue is shut down # Returns a value if the queue is not empty or None if the queue is shut down + # Moves item automatically into queue_active with self.condition: while len(self.queue) == 0 and self.stopQueue == False and self.stop_once_empty == False: self.condition.wait() @@ -310,7 +323,20 @@ class JobQueue: return None value = self.queue[0] self.queue = self.queue[1:] + self.queue_active.append(value) return value + def deactivate_item(self, item): + with self.condition: + if self.stopQueue == True: + return None + try: + self.queue_active.remove(item) + return True + except ValueError as e: + traceback.print_exc() + print("JobQueueItem {} to deactivate not found in active queue".format(item)) + return False + """ Checks for newly finished jobs and puts them into the queue. @@ -898,15 +924,18 @@ class PruleThread(threading.Thread): res = queue.wait_add(job.backoff - time_offset) # !! blocks without checking self.stopCondition if res == None: # queue stopped queue.add(job) + queue.deactivate_item(job) break if res == True: queue.add(job) # other job was added to queue + queue.deactivate_item(job) continue # else timeout expired and job read for try if self.stopThread == True: # put back job and return queue.add(job) + queue.deactivate_item(job) break # process job @@ -932,6 +961,7 @@ class PruleThread(threading.Thread): if job.backoff < process_delay_max: # wait a maximum of 16 minutes job.backoff = 15 if job.backoff == 0 else job.backoff * 2 queue.add(job) + queue.deactivate_item(job) else: print("Job {} failed too long after first check. Drop job.".format(job.ccjobid)) result = "failure-drop" @@ -946,9 +976,12 @@ class PruleThread(threading.Thread): print(e) print("ERROR: db_insert_failure failed for job ccid {}, requeue".format(job.ccjobid)) queue.add(job) + queue.deactivate_item(job) self.stopThread = True self.config.signal_shutdown() + queue.deactivate_item(job) if result == "success": + queue.deactivate_item(job) pass if result == "failure-shutdown":