From cb6f2ca302369802a7039e73b810e571f7604c50 Mon Sep 17 00:00:00 2001
From: niphlod
Date: Mon, 4 Aug 2014 21:42:11 +0200
Subject: [PATCH] task dependencies, a little bit of refactoring

---
 gluon/scheduler.py | 137 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 119 insertions(+), 18 deletions(-)

diff --git a/gluon/scheduler.py b/gluon/scheduler.py
index 9639d50b..ca48556e 100644
--- a/gluon/scheduler.py
+++ b/gluon/scheduler.py
@@ -91,7 +91,7 @@ try:
 except:
     from simplejson import loads, dumps
 
-IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())
+IDENTIFIER = "%s#%s" % (socket.gethostname(),os.getpid())
 
 logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
 
@@ -117,7 +117,7 @@ STOP_TASK = 'STOP_TASK'
 EXPIRED = 'EXPIRED'
 SECONDS = 1
 HEARTBEAT = 3 * SECONDS
-MAXHIBERNATION = 10 * HEARTBEAT
+MAXHIBERNATION = 10
 CLEAROUT = '!clear!'
 
 CALLABLETYPES = (types.LambdaType, types.FunctionType,
@@ -129,7 +129,6 @@ class Task(object):
     """Defines a "task" object that gets passed from the main
     thread to the executor's one
     """
-
     def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
         logger.debug(' new task allocated: %s.%s', app, function)
         self.app = app
@@ -147,7 +146,6 @@ class TaskReport(object):
     """Defines a "task report" object that gets passed from the
     executor's thread to the main one
     """
-
     def __init__(self, status, result=None, output=None, tb=None):
         logger.debug(' new task report: %s', status)
         if tb:
@@ -162,6 +160,52 @@ class TaskReport(object):
     def __str__(self):
         return '<TaskReport: %s>' % self.status
 
+class JobGraph(object):
+    """Experimental: with JobGraph you can specify
+    dependencies among tasks"""
+
+    def __init__(self, db, job_name):
+        self.job_name = job_name or 'job_0'
+        self.db = db
+
+    def add_deps(self, task_parent, task_child):
+        self.db.scheduler_task_deps.insert(task_parent=task_parent, task_child=task_child, job_name=self.job_name)
+
+    def validate(self, job_name):
+        db = self.db
+        sd = db.scheduler_task_deps
+        if job_name:
+            q = sd.job_name == job_name
+        else:
+            q = sd.id > 0
+
+        edges = db(q).select()
+        nested_dict = {}
+        for row in edges:
+            k = row.task_parent
+            if k in nested_dict:
+                nested_dict[k].add(row.task_child)
+            else:
+                nested_dict[k] = set((row.task_child,))
+        try:
+            rtn = []
+            for k, v in nested_dict.items():
+                v.discard(k)  # Ignore self dependencies
+            extra_items_in_deps = reduce(set.union, nested_dict.values()) - set(nested_dict.keys())
+            nested_dict.update(dict((item, set()) for item in extra_items_in_deps))
+            while True:
+                ordered = set(item for item,dep in nested_dict.items() if not dep)
+                if not ordered:
+                    break
+                rtn.append(ordered)
+                nested_dict = dict((item, (dep - ordered)) for item, dep in nested_dict.items()
+                                   if item not in ordered)
+            assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict
+            db.commit()
+            return rtn
+        except:
+            db.rollback()
+            return None
 
 def demo_function(*argv, **kwargs):
     """ test function """
@@ -170,10 +214,9 @@ def demo_function(*argv, **kwargs):
     time.sleep(1)
     return 'done'
 
-#the two functions below deal with simplejson decoding as unicode,
-#esp for the dict decode and subsequent usage as function Keyword arguments
-#unicode variable names won't work!
-#borrowed from http://stackoverflow.com/questions/956867/
+#the two functions below deal with simplejson decoding as unicode, esp for the dict decode
+#and subsequent usage as function Keyword arguments: unicode variable names won't work!
+#borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python
 
 
 def _decode_list(lst):
@@ -220,7 +263,7 @@ def executor(queue, task, out):
         def write(self, data):
             self.out_queue.put(data)
 
-    W2P_TASK = Storage({'id': task.task_id, 'uuid': task.uuid})
+    W2P_TASK = Storage({'id' : task.task_id, 'uuid' : task.uuid})
     stdout = LogOutput(out)
     try:
         if task.app:
@@ -245,7 +288,7 @@ def executor(queue, task, out):
                 raise NameError(
                     "name '%s' not found in scheduler's environment" % f)
         #Inject W2P_TASK into environment
-        _env.update({'W2P_TASK': W2P_TASK})
+        _env.update({'W2P_TASK' : W2P_TASK})
         #Inject W2P_TASK into current
         from gluon import current
         current.W2P_TASK = W2P_TASK
@@ -620,6 +663,15 @@ class Scheduler(MetaScheduler):
             migrate=self.__get_migrate('scheduler_worker', migrate)
         )
 
+        db.define_table(
+            'scheduler_task_deps',
+            Field('job_name', default='job_0'),
+            Field('task_parent', 'reference scheduler_task'),
+            Field('task_child', 'reference scheduler_task'),
+            Field('can_visit', 'boolean', default=False),
+            migrate=self.__get_migrate('scheduler_task_deps', migrate)
+        )
+
         if migrate is not False:
             db.commit()
 
@@ -846,6 +898,7 @@ class Scheduler(MetaScheduler):
                     times_failed=0
                 )
                 db(st.id == task.task_id).update(**d)
+                self.update_dependencies(db, task.task_id)
             else:
                 st_mapping = {'FAILED': 'FAILED',
                               'TIMEOUT': 'TIMEOUT',
@@ -861,6 +914,9 @@ class Scheduler(MetaScheduler):
             )
         logger.info('task completed (%s)', task_report.status)
 
+    def update_dependencies(self, db, task_id):
+        db(db.scheduler_task_deps.task_child == task_id).update(can_visit=True)
+
     def adj_hibernation(self):
         """Used to increase the "sleep" interval for DISABLED workers"""
         if self.w_stats.status == DISABLED:
@@ -905,12 +961,11 @@ class Scheduler(MetaScheduler):
                 if mybackedstatus == DISABLED:
                     # keep sleeping
                     self.w_stats.status = DISABLED
-                    if self.w_stats.sleep >= MAXHIBERNATION:
-                        logger.debug('........recording heartbeat (%s)',
-                                     self.w_stats.status)
-                        db(sw.worker_name == self.worker_name).update(
-                            last_heartbeat=now,
-                            worker_stats=self.w_stats)
+                    logger.debug('........recording heartbeat (%s)',
+                                 self.w_stats.status)
+                    db(sw.worker_name == self.worker_name).update(
+                        last_heartbeat=now,
+                        worker_stats=self.w_stats)
                 elif mybackedstatus == TERMINATE:
                     self.w_stats.status = TERMINATE
                     logger.debug("Waiting to terminate the current task")
@@ -1007,7 +1062,7 @@ class Scheduler(MetaScheduler):
         Deals with group_name(s) logic, in order to assign linearly tasks
         to available workers for those groups
         """
-        sw, st = db.scheduler_worker, db.scheduler_task
+        sw, st, sd = db.scheduler_worker, db.scheduler_task, db.scheduler_task_deps
         now = self.now()
         all_workers = db(sw.status == ACTIVE).select()
         #build workers as dict of groups
@@ -1028,14 +1083,36 @@ class Scheduler(MetaScheduler):
             (st.stop_time < now)
         ).update(status=EXPIRED)
 
+        deps_with_no_deps = db(
+            (sd.can_visit == False) &
+            (~sd.task_child.belongs(
+                db(sd.can_visit == False)._select(sd.task_parent)
+                )
+            )
+        )._select(sd.task_child)
+        no_deps = db(
+            (st.status.belongs((QUEUED,ASSIGNED))) &
+            (
+                (sd.id == None) | (st.id.belongs(deps_with_no_deps))
+
+            )
+        )._select(st.id, distinct=True, left=sd.on(
+            (st.id == sd.task_parent) &
+            (sd.can_visit == False)
+            )
+        )
+
         all_available = db(
             (st.status.belongs((QUEUED, ASSIGNED))) &
             ((st.times_run < st.repeats) | (st.repeats == 0)) &
             (st.start_time <= now) &
             ((st.stop_time == None) | (st.stop_time > now)) &
             (st.next_run_time <= now) &
-            (st.enabled == True)
+            (st.enabled == True) &
+            (st.id.belongs(no_deps))
         )
+
+
         limit = len(all_workers) * (50 / (len(wkgroups) or 1))
         #if there are a moltitude of tasks, let's figure out a maximum of
         #tasks per worker. This can be further tuned with some added
@@ -1302,6 +1379,30 @@ class Scheduler(MetaScheduler):
                                                 status=STOPPED)
         return rtn
 
+    def get_workers(self, only_ticker=False):
+        """ Returns a dict holding worker_name : {**columns}
+        representing all "registered" workers
+        only_ticker returns only the worker running as a TICKER,
+        if there is any
+        """
+        db = self.db
+        if only_ticker:
+            workers = db(db.scheduler_worker.is_ticker == True).select()
+        else:
+            workers = db(db.scheduler_worker.id > 0).select()
+        all_workers = {}
+        for row in workers:
+            all_workers[row.worker_name] = Storage(dict(
+                status=row.status,
+                first_heartbeat=row.first_heartbeat,
+                last_heartbeat=row.last_heartbeat,
+                group_names=row.group_names,
+                is_ticker=row.is_ticker,
+                worker_stats=row.worker_stats
+                )
+            )
+        return all_workers
+
 
 def main():
     """
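
Usage sketch: the snippet below shows one way the JobGraph API introduced by this patch could be driven from a web2py model. It is an illustrative sketch and not part of the diff: the task functions, the 'etl_job' label, and the variable names are assumptions. Reading the scheduler_task_deps queries above, a dependency row keeps the task passed as task_parent out of the assignable set until the task passed as task_child completes and update_dependencies flips can_visit to True; validate() returns the task ids grouped in dependency layers, or None (after a rollback) when a cycle is detected.

# in a model file of a web2py app (assumes this patch is applied and db is the app's DAL instance)
from gluon.scheduler import Scheduler, JobGraph

scheduler = Scheduler(db)  # define_tables() also creates the new scheduler_task_deps table

# hypothetical task functions, looked up by name in the scheduler environment
def extract():
    return 'extracted'

def transform():
    return 'transformed'

def load():
    return 'loaded'

# queue_task returns a row whose .id is the scheduler_task id
t_extract = scheduler.queue_task(extract)
t_transform = scheduler.queue_task(transform)
t_load = scheduler.queue_task(load)

# add_deps(task_parent, task_child): the parent task is held back until the child task completes
graph = JobGraph(db, 'etl_job')
graph.add_deps(t_transform.id, t_extract.id)   # transform runs after extract
graph.add_deps(t_load.id, t_transform.id)      # load runs after transform

# topological check: returns the runnable layers in order,
# e.g. [set([t_extract.id]), set([t_transform.id]), set([t_load.id])],
# or None if a cyclic dependency is found
layers = graph.validate('etl_job')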