diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 606404cc..dae66a5f 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -231,9 +231,9 @@ def demo_function(*argv, **kwargs): time.sleep(1) return 'done' -#the two functions below deal with simplejson decoding as unicode, esp for the dict decode -#and subsequent usage as function Keyword arguments unicode variable names won't work! -#borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python +# the two functions below deal with simplejson decoding as unicode, esp for the dict decode +# and subsequent usage as function Keyword arguments unicode variable names won't work! +# borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python def _decode_list(lst): @@ -304,9 +304,9 @@ def executor(queue, task, out): if not isinstance(_function, CALLABLETYPES): raise NameError( "name '%s' not found in scheduler's environment" % f) - #Inject W2P_TASK into environment + # Inject W2P_TASK into environment _env.update({'W2P_TASK': W2P_TASK}) - #Inject W2P_TASK into current + # Inject W2P_TASK into current from gluon import current current.W2P_TASK = W2P_TASK globals().update(_env) @@ -795,7 +795,7 @@ class Scheduler(MetaScheduler): #let's do that and loop again self.wrapped_assign_tasks(db) return None - #ready to process something + # ready to process something grabbed = db( (st.assigned_worker_name == self.worker_name) & (st.status == ASSIGNED) @@ -804,12 +804,12 @@ class Scheduler(MetaScheduler): task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first() if task: task.update_record(status=RUNNING, last_run_time=now) - #noone will touch my task! + # noone will touch my task! db.commit() logger.debug(' work to do %s', task.id) else: if self.is_a_ticker and self.greedy: - #there are other tasks ready to be assigned + # there are other tasks ready to be assigned logger.info('TICKER: greedy loop') self.wrapped_assign_tasks(db) else: @@ -825,10 +825,10 @@ class Scheduler(MetaScheduler): seconds=task.period * times_run ) if times_run < task.repeats or task.repeats == 0: - #need to run (repeating task) + # need to run (repeating task) run_again = True else: - #no need to run again + # no need to run again run_again = False run_id = 0 while True and not self.discard_results: @@ -889,9 +889,9 @@ class Scheduler(MetaScheduler): sr = db.scheduler_run if not self.discard_results: if task_report.result != 'null' or task_report.tb: - #result is 'null' as a string if task completed - #if it's stopped it's None as NoneType, so we record - #the STOPPED "run" anyway + # result is 'null' as a string if task completed + # if it's stopped it's None as NoneType, so we record + # the STOPPED "run" anyway logger.debug(' recording task report in db (%s)', task_report.status) db(sr.id == task.run_id).update( @@ -903,7 +903,7 @@ class Scheduler(MetaScheduler): else: logger.debug(' deleting task report in db because of no result') db(sr.id == task.run_id).delete() - #if there is a stop_time and the following run would exceed it + # if there is a stop_time and the following run would exceed it is_expired = (task.stop_time and task.next_run_time > task.stop_time and True or False) @@ -1056,16 +1056,16 @@ class Scheduler(MetaScheduler): ticker = all_active.find(lambda row: row.is_ticker is True).first() not_busy = self.w_stats.status == ACTIVE if not ticker: - #if no other tickers are around + # if no other tickers are around if not_busy: - #only if I'm not busy + # only if I'm not busy db(sw.worker_name == my_name).update(is_ticker=True) db(sw.worker_name != my_name).update(is_ticker=False) logger.info("TICKER: I'm a ticker") else: - #I'm busy + # I'm busy if len(all_active) >= 1: - #so I'll "downgrade" myself to a "poor worker" + # so I'll "downgrade" myself to a "poor worker" db(sw.worker_name == my_name).update(is_ticker=False) else: not_busy = True @@ -1085,7 +1085,7 @@ class Scheduler(MetaScheduler): sw, st, sd = db.scheduler_worker, db.scheduler_task, db.scheduler_task_deps now = self.now() all_workers = db(sw.status == ACTIVE).select() - #build workers as dict of groups + # build workers as dict of groups wkgroups = {} for w in all_workers: if w.worker_stats['status'] == 'RUNNING': @@ -1098,14 +1098,14 @@ class Scheduler(MetaScheduler): else: wkgroups[gname]['workers'].append( {'name': w.worker_name, 'c': 0}) - #set queued tasks that expired between "runs" (i.e., you turned off - #the scheduler): then it wasn't expired, but now it is + # set queued tasks that expired between "runs" (i.e., you turned off + # the scheduler): then it wasn't expired, but now it is db( (st.status.belongs((QUEUED, ASSIGNED))) & (st.stop_time < now) ).update(status=EXPIRED) - #calculate dependencies + # calculate dependencies deps_with_no_deps = db( (sd.can_visit == False) & (~sd.task_child.belongs( @@ -1114,7 +1114,7 @@ class Scheduler(MetaScheduler): ) )._select(sd.task_child) no_deps = db( - (st.status.belongs((QUEUED,ASSIGNED))) & + (st.status.belongs((QUEUED, ASSIGNED))) & ( (sd.id == None) | (st.id.belongs(deps_with_no_deps)) @@ -1137,27 +1137,27 @@ class Scheduler(MetaScheduler): limit = len(all_workers) * (50 / (len(wkgroups) or 1)) - #if there are a moltitude of tasks, let's figure out a maximum of - #tasks per worker. This can be further tuned with some added - #intelligence (like esteeming how many tasks will a worker complete - #before the ticker reassign them around, but the gain is quite small - #50 is a sweet spot also for fast tasks, with sane heartbeat values - #NB: ticker reassign tasks every 5 cycles, so if a worker completes its - #50 tasks in less than heartbeat*5 seconds, - #it won't pick new tasks until heartbeat*5 seconds pass. + # if there are a moltitude of tasks, let's figure out a maximum of + # tasks per worker. This can be further tuned with some added + # intelligence (like esteeming how many tasks will a worker complete + # before the ticker reassign them around, but the gain is quite small + # 50 is a sweet spot also for fast tasks, with sane heartbeat values + # NB: ticker reassign tasks every 5 cycles, so if a worker completes its + # 50 tasks in less than heartbeat*5 seconds, + # it won't pick new tasks until heartbeat*5 seconds pass. - #If a worker is currently elaborating a long task, its tasks needs to - #be reassigned to other workers - #this shuffles up things a bit, in order to give a task equal chances - #to be executed + # If a worker is currently elaborating a long task, its tasks needs to + # be reassigned to other workers + # this shuffles up things a bit, in order to give a task equal chances + # to be executed - #let's freeze it up + # let's freeze it up db.commit() x = 0 for group in wkgroups.keys(): tasks = all_available(st.group_name == group).select( limitby=(0, limit), orderby = st.next_run_time) - #let's break up the queue evenly among workers + # let's break up the queue evenly among workers for task in tasks: x += 1 gname = task.group_name @@ -1182,13 +1182,13 @@ class Scheduler(MetaScheduler): ).update(**d) wkgroups[gname]['workers'][myw]['c'] += 1 db.commit() - #I didn't report tasks but I'm working nonetheless!!!! + # I didn't report tasks but I'm working nonetheless!!!! if x > 0: self.w_stats.empty_runs = 0 self.w_stats.queue = x self.w_stats.distribution = wkgroups self.w_stats.workers = len(all_workers) - #I'll be greedy only if tasks assigned are equal to the limit + # I'll be greedy only if tasks assigned are equal to the limit # (meaning there could be others ready to be assigned) self.greedy = x >= limit logger.info('TICKER: workers are %s', len(all_workers)) @@ -1201,7 +1201,7 @@ class Scheduler(MetaScheduler): # should only sleep until next available task def set_worker_status(self, group_names=None, action=ACTIVE, - exclude=None, limit=None, worker_name=None): + exclude=None, limit=None, worker_name=None): """Internal function to set worker's status""" ws = self.db.scheduler_worker if not group_names: @@ -1220,10 +1220,9 @@ class Scheduler(MetaScheduler): ).update(status=action) else: for group in group_names: - workers = self.db( - (ws.group_names.contains(group)) & - (~ws.status.belongs(exclusion)) - )._select(ws.id, limitby=(0,limit)) + workers = self.db((ws.group_names.contains(group)) & + (~ws.status.belongs(exclusion)) + )._select(ws.id, limitby=(0, limit)) self.db(ws.id.belongs(workers)).update(status=action) def disable(self, group_names=None, limit=None, worker_name=None):