Merge pull request #538 from niphlod/enhancement/scheduler
update dependencies only if COMPLETED, allow to set status on a specific...
This commit is contained in:
@@ -906,7 +906,8 @@ class Scheduler(MetaScheduler):
|
||||
times_failed=0
|
||||
)
|
||||
db(st.id == task.task_id).update(**d)
|
||||
self.update_dependencies(db, task.task_id)
|
||||
if status == COMPLETED:
|
||||
self.update_dependencies(db, task.task_id)
|
||||
else:
|
||||
st_mapping = {'FAILED': 'FAILED',
|
||||
'TIMEOUT': 'TIMEOUT',
|
||||
@@ -1186,13 +1187,16 @@ class Scheduler(MetaScheduler):
|
||||
# should only sleep until next available task
|
||||
|
||||
def set_worker_status(self, group_names=None, action=ACTIVE,
|
||||
exclude=None, limit=None):
|
||||
exclude=None, limit=None, worker_name=None):
|
||||
"""Internal function to set worker's status"""
|
||||
ws = self.db.scheduler_worker
|
||||
if not group_names:
|
||||
group_names = self.group_names
|
||||
elif isinstance(group_names, str):
|
||||
group_names = [group_names]
|
||||
if worker_name:
|
||||
self.db(ws.worker_name == worker_name).update(status=action)
|
||||
return
|
||||
exclusion = exclude and exclude.append(action) or [action]
|
||||
if not limit:
|
||||
for group in group_names:
|
||||
@@ -1208,7 +1212,7 @@ class Scheduler(MetaScheduler):
|
||||
)._select(ws.id, limitby=(0,limit))
|
||||
self.db(ws.id.belongs(workers)).update(status=action)
|
||||
|
||||
def disable(self, group_names=None, limit=None):
|
||||
def disable(self, group_names=None, limit=None, worker_name=None):
|
||||
"""Sets DISABLED on the workers processing `group_names` tasks.
|
||||
A DISABLED worker will be kept alive but it won't be able to process
|
||||
any waiting tasks, essentially putting it to sleep.
|
||||
@@ -1219,7 +1223,7 @@ class Scheduler(MetaScheduler):
|
||||
exclude=[DISABLED, KILL, TERMINATE],
|
||||
limit=limit)
|
||||
|
||||
def resume(self, group_names=None, limit=None):
|
||||
def resume(self, group_names=None, limit=None, worker_name=None):
|
||||
"""Wakes a worker up (it will be able to process queued tasks)"""
|
||||
self.set_worker_status(
|
||||
group_names=group_names,
|
||||
@@ -1227,7 +1231,7 @@ class Scheduler(MetaScheduler):
|
||||
exclude=[KILL, TERMINATE],
|
||||
limit=limit)
|
||||
|
||||
def terminate(self, group_names=None, limit=None):
|
||||
def terminate(self, group_names=None, limit=None, worker_name=None):
|
||||
"""Sets TERMINATE as worker status. The worker will wait for any
|
||||
currently running tasks to be executed and then it will exit gracefully
|
||||
"""
|
||||
@@ -1237,7 +1241,7 @@ class Scheduler(MetaScheduler):
|
||||
exclude=[KILL],
|
||||
limit=limit)
|
||||
|
||||
def kill(self, group_names=None, limit=None):
|
||||
def kill(self, group_names=None, limit=None, worker_name=None):
|
||||
"""Sets KILL as worker status. The worker will be killed even if it's
|
||||
processing a task."""
|
||||
self.set_worker_status(
|
||||
|
||||
Reference in New Issue
Block a user