From 038d0d17a410cb368ddb6fc84ac2528a5cb8af93 Mon Sep 17 00:00:00 2001 From: niphlod Date: Thu, 10 Mar 2016 00:01:39 +0100 Subject: [PATCH] fixes the same bugs reported in #1191 for the "standard" scheduler --- gluon/contrib/redis_scheduler.py | 36 +++++++++++++++++++------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/gluon/contrib/redis_scheduler.py b/gluon/contrib/redis_scheduler.py index 109ff071..51c836e0 100644 --- a/gluon/contrib/redis_scheduler.py +++ b/gluon/contrib/redis_scheduler.py @@ -358,20 +358,20 @@ class RScheduler(Scheduler): (sd.can_visit == False) & (~sd.task_child.belongs( db(sd.can_visit == False)._select(sd.task_parent) - ) ) - )._select(sd.task_child) + ) + )._select(sd.task_child) no_deps = db( (st.status.belongs((QUEUED, ASSIGNED))) & ( (sd.id == None) | (st.id.belongs(deps_with_no_deps)) ) - )._select(st.id, distinct=True, left=sd.on( - (st.id == sd.task_parent) & - (sd.can_visit == False) - ) - ) + )._select(st.id, distinct=True, left=sd.on( + (st.id == sd.task_parent) & + (sd.can_visit == False) + ) + ) all_available = db( (st.status.belongs((QUEUED, ASSIGNED))) & @@ -449,8 +449,9 @@ class RScheduler(Scheduler): self.w_stats.status = POLLING # polling for 1 minute in total. If more groups are in, # polling is 1 minute in total - logger.debug(' polling on %s' , group) - task_id = r_server.brpoplpush(queued_list, running_list, timeout=60/len(self.group_names)) + logger.debug(' polling on %s', group) + task_id = r_server.brpoplpush(queued_list, running_list, + timeout=60 / len(self.group_names)) logger.debug(' finished polling') self.w_stats.status = ACTIVE if task_id: @@ -464,7 +465,8 @@ class RScheduler(Scheduler): r_server.lrem(running_list, 0, task_id) r_server.hdel(running_dict, task_id) r_server.lrem(queued_list, 0, task_id) - logger.error("we received a task that isn't there (%s)" % task_id) + logger.error("we received a task that isn't there (%s)", + task_id) return None break now = self.now() @@ -474,7 +476,7 @@ class RScheduler(Scheduler): db.commit() logger.debug(' work to do %s', task.id) else: - logger.info('nothing to do (%s)' % self.w_stats.status) + logger.info('nothing to do') return None times_run = task.times_run + 1 if not task.prevent_drift: @@ -482,9 +484,13 @@ class RScheduler(Scheduler): seconds=task.period ) else: - next_run_time = task.start_time + datetime.timedelta( - seconds=task.period * times_run - ) + # calc next_run_time based on available slots + # see #1191 + next_run_time = task.start_time + secondspassed = self.total_seconds(now - next_run_time) + steps = secondspassed // task.period + 1 + next_run_time += datetime.timedelta(seconds=task.period * steps) + if times_run < task.repeats or task.repeats == 0: # need to run (repeating task) run_again = True @@ -578,7 +584,7 @@ class RScheduler(Scheduler): and QUEUED or task.retry_failed == -1 and QUEUED or st_mapping) db(st.id == task.task_id).update( - times_failed=db.scheduler_task.times_failed + 1, + times_failed=st.times_failed + 1, next_run_time=task.next_run_time, status=status, assigned_worker_name=self.worker_name