From bc38d3c8f5cf4c929b83246575b07f7a27be7544 Mon Sep 17 00:00:00 2001 From: mdipierro Date: Mon, 6 Aug 2012 10:15:36 -0500 Subject: [PATCH] new scheduler uses utc time, thanks Niphlod --- VERSION | 2 +- gluon/scheduler.py | 38 ++++++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/VERSION b/VERSION index 2240f5e6..0e6b9919 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-08-06 10:13:35) dev +Version 2.00.0 (2012-08-06 10:15:34) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index d56c6158..1e4f9fff 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -351,7 +351,7 @@ class TYPE(object): class Scheduler(MetaScheduler): def __init__(self,db,tasks=None,migrate=True, worker_name=None,group_names=None,heartbeat=HEARTBEAT, - max_empty_runs=0, discard_results=False): + max_empty_runs=0, discard_results=False, utc_time=False): MetaScheduler.__init__(self) @@ -369,17 +369,21 @@ class Scheduler(MetaScheduler): self.discard_results = discard_results self.is_a_ticker = False self.do_assign_tasks = False + self.utc_time = utc_time from gluon import current current._scheduler = self self.define_tables(db,migrate=migrate) + def now(self): + return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now() + def define_tables(self,db,migrate): from gluon import current from gluon.dal import DEFAULT logging.debug('defining tables (migrate=%s)' % migrate) - now = datetime.datetime.now() + now = self.now() db.define_table( 'scheduler_task', Field('application_name',requires=IS_NOT_EMPTY(), @@ -399,8 +403,8 @@ class Scheduler(MetaScheduler): Field('start_time','datetime',default=now), Field('next_run_time','datetime',default=now), Field('stop_time','datetime'), - Field('repeats','integer',default=1,comment="0=unlimited"), - Field('repeats_failed', 'integer', default=1, comment="0=unlimited"), + Field('repeat','integer',default=1,comment="0=unlimited"), + Field('retry_failed', 'integer', default=0, comment="-1=unlimited"), Field('period','integer',default=60,comment='seconds'), Field('timeout','integer',default=60,comment='seconds'), Field('times_run','integer',default=0,writable=False), @@ -464,7 +468,7 @@ class Scheduler(MetaScheduler): self.die() def pop_task(self): - now = datetime.datetime.now() + now = self.now() db, ts = self.db, self.db.scheduler_task if self.is_a_ticker and self.do_assign_tasks: #I'm a ticker, and 5 loops passed without reassigning tasks, let's do @@ -493,7 +497,7 @@ class Scheduler(MetaScheduler): return None next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period) times_run = task.times_run + 1 - if times_run < task.repeats or task.repeats==0: + if times_run < task.repeat or task.repeat==0: run_again = True else: run_again = False @@ -523,11 +527,12 @@ class Scheduler(MetaScheduler): next_run_time=next_run_time, times_run = times_run, stop_time = task.stop_time, - repeats_failed = task.repeats_failed, + retry_failed = task.retry_failed, times_failed = task.times_failed) def report_task(self,task,task_report): db = self.db + now = self.now() if not self.discard_results: if task_report.result != 'null' or task_report.tb: #result is 'null' as a string if task completed @@ -536,7 +541,7 @@ class Scheduler(MetaScheduler): logging.debug(' recording task report in db (%s)' % task_report.status) db(db.scheduler_run.id==task.run_id).update( status = task_report.status, - stop_time = datetime.datetime.now(), + stop_time = now, result = task_report.result, output = task_report.output, traceback = task_report.tb) @@ -558,8 +563,8 @@ class Scheduler(MetaScheduler): st_mapping = {'FAILED':'FAILED', 'TIMEOUT':'TIMEOUT', 'STOPPED':'QUEUED'}[task_report.status] - status = (task.repeats_failed and task.times_failed + 1 < task.repeats_failed - and QUEUED or task.repeats_failed==0 and QUEUED or st_mapping) + status = (task.retry_failed and task.times_failed < task.retry_failed + and QUEUED or task.retry_failed==-1 and QUEUED or st_mapping) db(db.scheduler_task.id==task.task_id)\ (db.scheduler_task.status==RUNNING).update( times_failed=db.scheduler_task.times_failed+1, @@ -581,7 +586,7 @@ class Scheduler(MetaScheduler): try: db = self.db_thread sw, st = db.scheduler_worker, db.scheduler_task - now = datetime.datetime.now() + now = self.now() expiration = now-datetime.timedelta(seconds=self.heartbeat*3) departure = now-datetime.timedelta(seconds=self.heartbeat*3*MAXHIBERNATION) # record heartbeat @@ -655,7 +660,7 @@ class Scheduler(MetaScheduler): def assign_tasks(self): db = self.db sw, ts = db.scheduler_worker, db.scheduler_task - now = datetime.datetime.now() + now = self.now() all_workers = db(sw.status == ACTIVE).select() #build workers as dict of groups wkgroups = {} @@ -671,7 +676,7 @@ class Scheduler(MetaScheduler): db(ts.status.belongs((QUEUED,ASSIGNED)))(ts.stop_timenow))\ (ts.next_run_time<=now)\ @@ -756,6 +761,10 @@ def main(): "-t", "--tasks",dest="tasks",default=None, help="file containing task files, must define" + \ "tasks = {'task_name':(lambda: 'output')} or similar set of tasks") + parser.add_option( + "-U", "--utc-time", dest="utc_time", default=False, + help="work with UTC timestamps" + ) (options, args) = parser.parse_args() if not options.tasks or not options.db_uri: print USAGE @@ -784,7 +793,8 @@ def main(): migrate = True, group_names = group_names, heartbeat = options.heartbeat, - max_empty_runs = options.max_empty_runs) + max_empty_runs = options.max_empty_runs, + utc_time = options.utc_time) signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1)) print 'starting main worker loop...' scheduler.loop()