fixes the same bugs reported in #1191 for the "standard" scheduler
This commit is contained in:
@@ -358,20 +358,20 @@ class RScheduler(Scheduler):
|
||||
(sd.can_visit == False) &
|
||||
(~sd.task_child.belongs(
|
||||
db(sd.can_visit == False)._select(sd.task_parent)
|
||||
)
|
||||
)
|
||||
)._select(sd.task_child)
|
||||
)
|
||||
)._select(sd.task_child)
|
||||
no_deps = db(
|
||||
(st.status.belongs((QUEUED, ASSIGNED))) &
|
||||
(
|
||||
(sd.id == None) | (st.id.belongs(deps_with_no_deps))
|
||||
|
||||
)
|
||||
)._select(st.id, distinct=True, left=sd.on(
|
||||
(st.id == sd.task_parent) &
|
||||
(sd.can_visit == False)
|
||||
)
|
||||
)
|
||||
)._select(st.id, distinct=True, left=sd.on(
|
||||
(st.id == sd.task_parent) &
|
||||
(sd.can_visit == False)
|
||||
)
|
||||
)
|
||||
|
||||
all_available = db(
|
||||
(st.status.belongs((QUEUED, ASSIGNED))) &
|
||||
@@ -449,8 +449,9 @@ class RScheduler(Scheduler):
|
||||
self.w_stats.status = POLLING
|
||||
# polling for 1 minute in total. If more groups are in,
|
||||
# polling is 1 minute in total
|
||||
logger.debug(' polling on %s' , group)
|
||||
task_id = r_server.brpoplpush(queued_list, running_list, timeout=60/len(self.group_names))
|
||||
logger.debug(' polling on %s', group)
|
||||
task_id = r_server.brpoplpush(queued_list, running_list,
|
||||
timeout=60 / len(self.group_names))
|
||||
logger.debug(' finished polling')
|
||||
self.w_stats.status = ACTIVE
|
||||
if task_id:
|
||||
@@ -464,7 +465,8 @@ class RScheduler(Scheduler):
|
||||
r_server.lrem(running_list, 0, task_id)
|
||||
r_server.hdel(running_dict, task_id)
|
||||
r_server.lrem(queued_list, 0, task_id)
|
||||
logger.error("we received a task that isn't there (%s)" % task_id)
|
||||
logger.error("we received a task that isn't there (%s)",
|
||||
task_id)
|
||||
return None
|
||||
break
|
||||
now = self.now()
|
||||
@@ -474,7 +476,7 @@ class RScheduler(Scheduler):
|
||||
db.commit()
|
||||
logger.debug(' work to do %s', task.id)
|
||||
else:
|
||||
logger.info('nothing to do (%s)' % self.w_stats.status)
|
||||
logger.info('nothing to do')
|
||||
return None
|
||||
times_run = task.times_run + 1
|
||||
if not task.prevent_drift:
|
||||
@@ -482,9 +484,13 @@ class RScheduler(Scheduler):
|
||||
seconds=task.period
|
||||
)
|
||||
else:
|
||||
next_run_time = task.start_time + datetime.timedelta(
|
||||
seconds=task.period * times_run
|
||||
)
|
||||
# calc next_run_time based on available slots
|
||||
# see #1191
|
||||
next_run_time = task.start_time
|
||||
secondspassed = self.total_seconds(now - next_run_time)
|
||||
steps = secondspassed // task.period + 1
|
||||
next_run_time += datetime.timedelta(seconds=task.period * steps)
|
||||
|
||||
if times_run < task.repeats or task.repeats == 0:
|
||||
# need to run (repeating task)
|
||||
run_again = True
|
||||
@@ -578,7 +584,7 @@ class RScheduler(Scheduler):
|
||||
and QUEUED or task.retry_failed == -1
|
||||
and QUEUED or st_mapping)
|
||||
db(st.id == task.task_id).update(
|
||||
times_failed=db.scheduler_task.times_failed + 1,
|
||||
times_failed=st.times_failed + 1,
|
||||
next_run_time=task.next_run_time,
|
||||
status=status,
|
||||
assigned_worker_name=self.worker_name
|
||||
|
||||
Reference in New Issue
Block a user