From 642ec2b934888daafa79f2391f799e2c3b4bed85 Mon Sep 17 00:00:00 2001 From: niphlod Date: Tue, 23 Feb 2016 23:27:45 +0100 Subject: [PATCH] fixes #1191. The fix is in lines 828-835; total_seconds had to be backported for py2.6 (lines 694-701). Everything else is just PEP 8 cleanup. --- gluon/scheduler.py | 81 ++++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/gluon/scheduler.py b/gluon/scheduler.py index e89b82fa..d97a81b7 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -215,7 +215,7 @@ class JobGraph(object): nested_dict = dict( (item, (dep - ordered)) for item, dep in nested_dict.items() if item not in ordered - ) + ) assert not nested_dict, "A cyclic dependency exists amongst %r" % nested_dict db.commit() return rtn @@ -297,7 +297,7 @@ def executor(queue, task, out): f = task.function functions = current._scheduler.tasks if not functions: - #look into env + # look into env _function = _env.get(f) else: _function = functions.get(f) @@ -314,7 +314,7 @@ def executor(queue, task, out): vars = loads(task.vars, object_hook=_decode_dict) result = dumps(_function(*args, **vars)) else: - ### for testing purpose only + # for testing purpose only result = eval(task.function)( *loads(task.args, object_hook=_decode_dict), **loads(task.vars, object_hook=_decode_dict)) @@ -663,7 +663,7 @@ class Scheduler(MetaScheduler): Field('traceback', 'text'), Field('worker_name', default=self.worker_name), migrate=self.__get_migrate('scheduler_run', migrate) - ) + ) db.define_table( 'scheduler_worker', @@ -675,23 +675,30 @@ class Scheduler(MetaScheduler): Field('group_names', 'list:string', default=self.group_names), Field('worker_stats', 'json'), migrate=self.__get_migrate('scheduler_worker', migrate) - ) + ) db.define_table( 'scheduler_task_deps', Field('job_name', default='job_0'), Field('task_parent', 'integer', - requires=IS_IN_DB(db, 'scheduler_task.id', - '%(task_name)s') - ), + requires=IS_IN_DB(db, 'scheduler_task.id', 
'%(task_name)s') + ), Field('task_child', 'reference scheduler_task'), Field('can_visit', 'boolean', default=False), migrate=self.__get_migrate('scheduler_task_deps', migrate) - ) + ) if migrate is not False: db.commit() + @staticmethod + def total_seconds(td): + # backport for py2.6 + if hasattr(td, 'total_seconds'): + return td.total_seconds() + else: + return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6 + def loop(self, worker_name=None): """Main loop @@ -716,7 +723,7 @@ class Scheduler(MetaScheduler): while True and self.have_heartbeat: if self.w_stats.status == DISABLED: logger.debug('Someone stopped me, sleeping until better' - ' times come (%s)', self.w_stats.sleep) + ' times come (%s)', self.w_stats.sleep) self.sleep() continue logger.debug('looping...') @@ -733,7 +740,8 @@ class Scheduler(MetaScheduler): logger.debug('sleeping...') if self.max_empty_runs != 0: logger.debug('empty runs %s/%s', - self.w_stats.empty_runs, self.max_empty_runs) + self.w_stats.empty_runs, + self.max_empty_runs) if self.w_stats.empty_runs >= self.max_empty_runs: logger.info( 'empty runs limit reached, killing myself') @@ -789,15 +797,15 @@ class Scheduler(MetaScheduler): now = self.now() st = self.db.scheduler_task if self.is_a_ticker and self.do_assign_tasks: - #I'm a ticker, and 5 loops passed without reassigning tasks, - #let's do that and loop again + # I'm a ticker, and 5 loops passed without reassigning tasks, + # let's do that and loop again self.wrapped_assign_tasks(db) return None # ready to process something grabbed = db( (st.assigned_worker_name == self.worker_name) & (st.status == ASSIGNED) - ) + ) task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first() if task: @@ -817,11 +825,15 @@ class Scheduler(MetaScheduler): if not task.prevent_drift: next_run_time = task.last_run_time + datetime.timedelta( seconds=task.period - ) - else: - next_run_time = task.start_time + datetime.timedelta( - seconds=task.period * times_run ) + 
else: + # calc next_run_time based on available slots + # see #1191 + next_run_time = task.start_time + secondspassed = self.total_seconds(now - next_run_time) + steps = secondspassed // task.period + 1 + next_run_time += datetime.timedelta(seconds=task.period * steps) + if times_run < task.repeats or task.repeats == 0: # need to run (repeating task) run_again = True @@ -843,7 +855,7 @@ class Scheduler(MetaScheduler): time.sleep(0.5) db.rollback() logger.info('new task %(id)s "%(task_name)s"' - ' %(application_name)s.%(function_name)s' % task) + ' %(application_name)s.%(function_name)s' % task) return Task( app=task.application_name, function=task.function_name, @@ -926,7 +938,7 @@ class Scheduler(MetaScheduler): and QUEUED or task.retry_failed == -1 and QUEUED or st_mapping) db(st.id == task.task_id).update( - times_failed=db.scheduler_task.times_failed + 1, + times_failed=st.times_failed + 1, next_run_time=task.next_run_time, status=status ) @@ -980,7 +992,7 @@ class Scheduler(MetaScheduler): # keep sleeping self.w_stats.status = DISABLED logger.debug('........recording heartbeat (%s)', - self.w_stats.status) + self.w_stats.status) db(sw.worker_name == self.worker_name).update( last_heartbeat=now, worker_stats=self.w_stats) @@ -997,7 +1009,7 @@ class Scheduler(MetaScheduler): logger.info('Asked to kill the current task') self.terminate_process() logger.debug('........recording heartbeat (%s)', - self.w_stats.status) + self.w_stats.status) db(sw.worker_name == self.worker_name).update( last_heartbeat=now, status=ACTIVE, worker_stats=self.w_stats) @@ -1023,7 +1035,7 @@ class Scheduler(MetaScheduler): db( (st.assigned_worker_name.belongs(dead_workers_name)) & (st.status == RUNNING) - ).update(assigned_worker_name='', status=QUEUED) + ).update(assigned_worker_name='', status=QUEUED) dead_workers.delete() try: self.is_a_ticker = self.being_a_ticker() @@ -1108,20 +1120,20 @@ class Scheduler(MetaScheduler): (sd.can_visit == False) & (~sd.task_child.belongs( 
db(sd.can_visit == False)._select(sd.task_parent) - ) ) - )._select(sd.task_child) + ) + )._select(sd.task_child) no_deps = db( (st.status.belongs((QUEUED, ASSIGNED))) & ( (sd.id == None) | (st.id.belongs(deps_with_no_deps)) ) - )._select(st.id, distinct=True, left=sd.on( - (st.id == sd.task_parent) & - (sd.can_visit == False) - ) - ) + )._select(st.id, distinct=True, left=sd.on( + (st.id == sd.task_parent) & + (sd.can_visit == False) + ) + ) all_available = db( (st.status.belongs((QUEUED, ASSIGNED))) & @@ -1133,7 +1145,6 @@ class Scheduler(MetaScheduler): (st.id.belongs(no_deps)) ) - limit = len(all_workers) * (50 / (len(wkgroups) or 1)) # if there are a moltitude of tasks, let's figure out a maximum of # tasks per worker. This can be further tuned with some added @@ -1177,7 +1188,7 @@ class Scheduler(MetaScheduler): db( (st.id == task.id) & (st.status.belongs((QUEUED, ASSIGNED))) - ).update(**d) + ).update(**d) wkgroups[gname]['workers'][myw]['c'] += 1 db.commit() # I didn't report tasks but I'm working nonetheless!!!! @@ -1215,7 +1226,7 @@ class Scheduler(MetaScheduler): self.db( (ws.group_names.contains(group)) & (~ws.status.belongs(exclusion)) - ).update(status=action) + ).update(status=action) else: for group in group_names: workers = self.db((ws.group_names.contains(group)) & @@ -1300,7 +1311,7 @@ class Scheduler(MetaScheduler): if immediate: self.db( (self.db.scheduler_worker.is_ticker == True) - ).update(status=PICK) + ).update(status=PICK) else: rtn.uuid = None return rtn @@ -1350,7 +1361,7 @@ class Scheduler(MetaScheduler): **dict(orderby=orderby, left=left, limitby=(0, 1)) - ).first() + ).first() if row and output: row.result = row.scheduler_run.run_result and \ loads(row.scheduler_run.run_result,