diff --git a/gluon/scheduler.py b/gluon/scheduler.py index b5d5fac7..ff885439 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -906,7 +906,8 @@ class Scheduler(MetaScheduler): times_failed=0 ) db(st.id == task.task_id).update(**d) - self.update_dependencies(db, task.task_id) + if status == COMPLETED: + self.update_dependencies(db, task.task_id) else: st_mapping = {'FAILED': 'FAILED', 'TIMEOUT': 'TIMEOUT', @@ -1186,13 +1187,16 @@ class Scheduler(MetaScheduler): # should only sleep until next available task def set_worker_status(self, group_names=None, action=ACTIVE, - exclude=None, limit=None): + exclude=None, limit=None, worker_name=None): """Internal function to set worker's status""" ws = self.db.scheduler_worker if not group_names: group_names = self.group_names elif isinstance(group_names, str): group_names = [group_names] + if worker_name: + self.db(ws.worker_name == worker_name).update(status=action) + return exclusion = exclude and exclude.append(action) or [action] if not limit: for group in group_names: @@ -1208,7 +1212,7 @@ class Scheduler(MetaScheduler): )._select(ws.id, limitby=(0,limit)) self.db(ws.id.belongs(workers)).update(status=action) - def disable(self, group_names=None, limit=None): + def disable(self, group_names=None, limit=None, worker_name=None): """Sets DISABLED on the workers processing `group_names` tasks. A DISABLED worker will be kept alive but it won't be able to process any waiting tasks, essentially putting it to sleep. @@ -1219,7 +1223,7 @@ class Scheduler(MetaScheduler): exclude=[DISABLED, KILL, TERMINATE], limit=limit) - def resume(self, group_names=None, limit=None): + def resume(self, group_names=None, limit=None, worker_name=None): """Wakes a worker up (it will be able to process queued tasks)""" self.set_worker_status( group_names=group_names, @@ -1227,7 +1231,7 @@ class Scheduler(MetaScheduler): exclude=[KILL, TERMINATE], limit=limit) - def terminate(self, group_names=None, limit=None): + def terminate(self, group_names=None, limit=None, worker_name=None): """Sets TERMINATE as worker status. The worker will wait for any currently running tasks to be executed and then it will exit gracefully """ @@ -1237,7 +1241,7 @@ class Scheduler(MetaScheduler): exclude=[KILL], limit=limit) - def kill(self, group_names=None, limit=None): + def kill(self, group_names=None, limit=None, worker_name=None): """Sets KILL as worker status. The worker will be killed even if it's processing a task.""" self.set_worker_status(