Merge pull request #477 from niphlod/enhancement/scheduler_deps

new feature (task dependencies) and a little bit of refactoring
This commit is contained in:
mdipierro
2014-08-05 01:44:11 -05:00
+119 -18
View File
@@ -91,7 +91,7 @@ try:
except:
from simplejson import loads, dumps
IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())
IDENTIFIER = "%s#%s" % (socket.gethostname(),os.getpid())
logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
@@ -117,7 +117,7 @@ STOP_TASK = 'STOP_TASK'
EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3 * SECONDS
MAXHIBERNATION = 10 * HEARTBEAT
MAXHIBERNATION = 10
CLEAROUT = '!clear!'
CALLABLETYPES = (types.LambdaType, types.FunctionType,
@@ -129,7 +129,6 @@ class Task(object):
"""Defines a "task" object that gets passed from the main thread to the
executor's one
"""
def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
logger.debug(' new task allocated: %s.%s', app, function)
self.app = app
@@ -147,7 +146,6 @@ class TaskReport(object):
"""Defines a "task report" object that gets passed from the executor's
thread to the main one
"""
def __init__(self, status, result=None, output=None, tb=None):
logger.debug(' new task report: %s', status)
if tb:
@@ -162,6 +160,52 @@ class TaskReport(object):
def __str__(self):
return '<TaskReport: %s>' % self.status
class JobGraph(object):
"""Experimental: with JobGraph you can specify
dependencies amongs tasks"""
def __init__(self, db, job_name):
self.job_name = job_name or 'job_0'
self.db = db
def add_deps(self, task_parent, task_child):
self.db.scheduler_task_deps.insert(task_parent=task_parent, task_child=task_child, job_name=self.job_name)
def validate(self, job_name):
db = self.db
sd = db.scheduler_task_deps
if job_name:
q = sd.job_name == job_name
else:
q = sd.id > 0
edges = db(q).select()
nested_dict = {}
for row in edges:
k = row.task_parent
if k in nested_dict:
nested_dict[k].add(row.task_child)
else:
nested_dict[k] = set((row.task_child,))
try:
rtn = []
for k, v in nested_dict.items():
v.discard(k) # Ignore self dependencies
extra_items_in_deps = reduce(set.union, nested_dict.values()) - set(nested_dict.keys())
nested_dict.update(dict((item, set()) for item in extra_items_in_deps))
while True:
ordered = set(item for item,dep in nested_dict.items() if not dep)
if not ordered:
break
rtn.append(ordered)
nested_dict = dict((item, (dep - ordered)) for item, dep in nested_dict.items()
if item not in ordered)
assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict
db.commit()
return rtn
except:
db.rollback()
return None
def demo_function(*argv, **kwargs):
""" test function """
@@ -170,10 +214,9 @@ def demo_function(*argv, **kwargs):
time.sleep(1)
return 'done'
#the two functions below deal with simplejson decoding as unicode,
#esp for the dict decode and subsequent usage as function Keyword arguments
#unicode variable names won't work!
#borrowed from http://stackoverflow.com/questions/956867/
#the two functions below deal with simplejson decoding as unicode, esp for the dict decode
#and subsequent usage as function Keyword arguments unicode variable names won't work!
#borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
def _decode_list(lst):
@@ -220,7 +263,7 @@ def executor(queue, task, out):
def write(self, data):
self.out_queue.put(data)
W2P_TASK = Storage({'id': task.task_id, 'uuid': task.uuid})
W2P_TASK = Storage({'id' : task.task_id, 'uuid' : task.uuid})
stdout = LogOutput(out)
try:
if task.app:
@@ -245,7 +288,7 @@ def executor(queue, task, out):
raise NameError(
"name '%s' not found in scheduler's environment" % f)
#Inject W2P_TASK into environment
_env.update({'W2P_TASK': W2P_TASK})
_env.update({'W2P_TASK' : W2P_TASK})
#Inject W2P_TASK into current
from gluon import current
current.W2P_TASK = W2P_TASK
@@ -620,6 +663,15 @@ class Scheduler(MetaScheduler):
migrate=self.__get_migrate('scheduler_worker', migrate)
)
db.define_table(
'scheduler_task_deps',
Field('job_name', default='job_0'),
Field('task_parent', 'reference scheduler_task'),
Field('task_child', 'reference scheduler_task'),
Field('can_visit', 'boolean', default=False),
migrate=self.__get_migrate('scheduler_task_deps', migrate)
)
if migrate is not False:
db.commit()
@@ -846,6 +898,7 @@ class Scheduler(MetaScheduler):
times_failed=0
)
db(st.id == task.task_id).update(**d)
self.update_dependencies(db, task.task_id)
else:
st_mapping = {'FAILED': 'FAILED',
'TIMEOUT': 'TIMEOUT',
@@ -861,6 +914,9 @@ class Scheduler(MetaScheduler):
)
logger.info('task completed (%s)', task_report.status)
def update_dependencies(self, db, task_id):
db(db.scheduler_task_deps.task_child == task_id).update(can_visit=True)
def adj_hibernation(self):
"""Used to increase the "sleep" interval for DISABLED workers"""
if self.w_stats.status == DISABLED:
@@ -905,12 +961,11 @@ class Scheduler(MetaScheduler):
if mybackedstatus == DISABLED:
# keep sleeping
self.w_stats.status = DISABLED
if self.w_stats.sleep >= MAXHIBERNATION:
logger.debug('........recording heartbeat (%s)',
self.w_stats.status)
db(sw.worker_name == self.worker_name).update(
last_heartbeat=now,
worker_stats=self.w_stats)
logger.debug('........recording heartbeat (%s)',
self.w_stats.status)
db(sw.worker_name == self.worker_name).update(
last_heartbeat=now,
worker_stats=self.w_stats)
elif mybackedstatus == TERMINATE:
self.w_stats.status = TERMINATE
logger.debug("Waiting to terminate the current task")
@@ -1007,7 +1062,7 @@ class Scheduler(MetaScheduler):
Deals with group_name(s) logic, in order to assign linearly tasks
to available workers for those groups
"""
sw, st = db.scheduler_worker, db.scheduler_task
sw, st, sd = db.scheduler_worker, db.scheduler_task, db.scheduler_task_deps
now = self.now()
all_workers = db(sw.status == ACTIVE).select()
#build workers as dict of groups
@@ -1028,14 +1083,36 @@ class Scheduler(MetaScheduler):
(st.stop_time < now)
).update(status=EXPIRED)
deps_with_no_deps = db(
(sd.can_visit == False) &
(~sd.task_child.belongs(
db(sd.can_visit == False)._select(sd.task_parent)
)
)
)._select(sd.task_child)
no_deps = db(
(st.status.belongs((QUEUED,ASSIGNED))) &
(
(sd.id == None) | (st.id.belongs(deps_with_no_deps))
)
)._select(st.id, distinct=True, left=sd.on(
(st.id == sd.task_parent) &
(sd.can_visit == False)
)
)
all_available = db(
(st.status.belongs((QUEUED, ASSIGNED))) &
((st.times_run < st.repeats) | (st.repeats == 0)) &
(st.start_time <= now) &
((st.stop_time == None) | (st.stop_time > now)) &
(st.next_run_time <= now) &
(st.enabled == True)
(st.enabled == True) &
(st.id.belongs(no_deps))
)
limit = len(all_workers) * (50 / (len(wkgroups) or 1))
#if there are a moltitude of tasks, let's figure out a maximum of
#tasks per worker. This can be further tuned with some added
@@ -1302,6 +1379,30 @@ class Scheduler(MetaScheduler):
status=STOPPED)
return rtn
def get_workers(self, only_ticker=False):
""" Returns a dict holding worker_name : {**columns}
representing all "registered" workers
only_ticker returns only the worker running as a TICKER,
if there is any
"""
db = self.db
if only_ticker:
workers = db(db.scheduler_worker.is_ticker == True).select()
else:
workers = db(db.scheduler_worker.id > 0).select()
all_workers = {}
for row in workers:
all_workers[row.worker_name] = Storage(dict(
status=row.status,
first_heartbeat=row.first_heartbeat,
last_heartbeat=row.last_heartbeat,
group_names=row.group_names,
is_ticker=row.is_ticker,
worker_stats=row.worker_stats
)
)
return all_workers
def main():
"""