diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 254c442f..91ae2e92 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -87,11 +87,17 @@ if 'WEB2PY_PATH' not in os.environ: os.environ['WEB2PY_PATH'] = path try: - from gluon.contrib.simplejson import loads, dumps -except: + # try external module from simplejson import loads, dumps +except ImportError: + try: + # try stdlib (Python >= 2.6) + from json import loads, dumps + except: + # fallback to pure-Python module + from gluon.contrib.simplejson import loads, dumps -IDENTIFIER = "%s#%s" % (socket.gethostname(),os.getpid()) +IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid()) logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER) @@ -160,6 +166,7 @@ class TaskReport(object): def __str__(self): return '' % self.status + class JobGraph(object): """Experimental: with JobGraph you can specify dependencies amongs tasks""" @@ -170,7 +177,9 @@ class JobGraph(object): def add_deps(self, task_parent, task_child): """Creates a dependency between task_parent and task_child""" - self.db.scheduler_task_deps.insert(task_parent=task_parent, task_child=task_child, job_name=self.job_name) + self.db.scheduler_task_deps.insert(task_parent=task_parent, + task_child=task_child, + job_name=self.job_name) def validate(self, job_name): """Validates if all tasks job_name can be completed, i.e. there @@ -195,16 +204,18 @@ class JobGraph(object): try: rtn = [] for k, v in nested_dict.items(): - v.discard(k) # Ignore self dependencies + v.discard(k) # Ignore self dependencies extra_items_in_deps = reduce(set.union, nested_dict.values()) - set(nested_dict.keys()) nested_dict.update(dict((item, set()) for item in extra_items_in_deps)) while True: - ordered = set(item for item,dep in nested_dict.items() if not dep) + ordered = set(item for item, dep in nested_dict.items() if not dep) if not ordered: break rtn.append(ordered) - nested_dict = dict((item, (dep - ordered)) for item, dep in nested_dict.items() - if item not in ordered) + nested_dict = dict( + (item, (dep - ordered)) for item, dep in nested_dict.items() + if item not in ordered + ) assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict db.commit() return rtn @@ -212,6 +223,7 @@ class JobGraph(object): db.rollback() return None + def demo_function(*argv, **kwargs): """ test function """ for i in range(argv[0]): @@ -268,7 +280,7 @@ def executor(queue, task, out): def write(self, data): self.out_queue.put(data) - W2P_TASK = Storage({'id' : task.task_id, 'uuid' : task.uuid}) + W2P_TASK = Storage({'id': task.task_id, 'uuid': task.uuid}) stdout = LogOutput(out) try: if task.app: @@ -293,7 +305,7 @@ def executor(queue, task, out): raise NameError( "name '%s' not found in scheduler's environment" % f) #Inject W2P_TASK into environment - _env.update({'W2P_TASK' : W2P_TASK}) + _env.update({'W2P_TASK': W2P_TASK}) #Inject W2P_TASK into current from gluon import current current.W2P_TASK = W2P_TASK @@ -357,8 +369,7 @@ class MetaScheduler(threading.Thread): start = time.time() - while p.is_alive() and ( - not task.timeout or time.time() - start < task.timeout): + while p.is_alive() and (not task.timeout or time.time() - start < task.timeout): if tout: try: logger.debug(' partial output saved') @@ -568,7 +579,7 @@ class Scheduler(MetaScheduler): queue=0, distribution=None, workers=0) - ) #dict holding statistics + ) # dict holding statistics from gluon import current current._scheduler = self @@ -740,7 +751,7 @@ class Scheduler(MetaScheduler): contention and retries `assign_task` after 0.5 seconds """ logger.debug('Assigning tasks...') - db.commit() #db.commit() only for Mysql + db.commit() # db.commit() only for Mysql x = 0 while x < 10: try: @@ -761,7 +772,7 @@ class Scheduler(MetaScheduler): contention and retries `pop_task` after 0.5 seconds """ db = self.db - db.commit() #another nifty db.commit() only for Mysql + db.commit() # another nifty db.commit() only for Mysql x = 0 while x < 10: try: @@ -1077,6 +1088,8 @@ class Scheduler(MetaScheduler): #build workers as dict of groups wkgroups = {} for w in all_workers: + if w.worker_stats['status'] == 'RUNNING': + continue group_names = w.group_names for gname in group_names: if gname not in wkgroups: @@ -1090,8 +1103,9 @@ class Scheduler(MetaScheduler): db( (st.status.belongs((QUEUED, ASSIGNED))) & (st.stop_time < now) - ).update(status=EXPIRED) + ).update(status=EXPIRED) + #calculate dependencies deps_with_no_deps = db( (sd.can_visit == False) & (~sd.task_child.belongs( @@ -1163,7 +1177,7 @@ class Scheduler(MetaScheduler): if not task.task_name: d['task_name'] = task.function_name db( - (st.id==task.id) & + (st.id == task.id) & (st.status.belongs((QUEUED, ASSIGNED))) ).update(**d) wkgroups[gname]['workers'][myw]['c'] += 1 @@ -1207,8 +1221,8 @@ class Scheduler(MetaScheduler): else: for group in group_names: workers = self.db( - (ws.group_names.contains(group)) & - (~ws.status.belongs(exclusion)) + (ws.group_names.contains(group)) & + (~ws.status.belongs(exclusion)) )._select(ws.id, limitby=(0,limit)) self.db(ws.id.belongs(workers)).update(status=action) @@ -1339,7 +1353,7 @@ class Scheduler(MetaScheduler): **dict(orderby=orderby, left=left, limitby=(0, 1)) - ).first() + ).first() if row and output: row.result = row.scheduler_run.run_result and \ loads(row.scheduler_run.run_result,