new scheduler uses utc time, thanks Niphlod

This commit is contained in:
mdipierro
2012-08-06 10:15:36 -05:00
parent a244adfb4b
commit bc38d3c8f5
2 changed files with 25 additions and 15 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.00.0 (2012-08-06 10:13:35) dev
Version 2.00.0 (2012-08-06 10:15:34) dev
+24 -14
View File
@@ -351,7 +351,7 @@ class TYPE(object):
class Scheduler(MetaScheduler):
def __init__(self,db,tasks=None,migrate=True,
worker_name=None,group_names=None,heartbeat=HEARTBEAT,
max_empty_runs=0, discard_results=False):
max_empty_runs=0, discard_results=False, utc_time=False):
MetaScheduler.__init__(self)
@@ -369,17 +369,21 @@ class Scheduler(MetaScheduler):
self.discard_results = discard_results
self.is_a_ticker = False
self.do_assign_tasks = False
self.utc_time = utc_time
from gluon import current
current._scheduler = self
self.define_tables(db,migrate=migrate)
def now(self):
return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now()
def define_tables(self,db,migrate):
from gluon import current
from gluon.dal import DEFAULT
logging.debug('defining tables (migrate=%s)' % migrate)
now = datetime.datetime.now()
now = self.now()
db.define_table(
'scheduler_task',
Field('application_name',requires=IS_NOT_EMPTY(),
@@ -399,8 +403,8 @@ class Scheduler(MetaScheduler):
Field('start_time','datetime',default=now),
Field('next_run_time','datetime',default=now),
Field('stop_time','datetime'),
Field('repeats','integer',default=1,comment="0=unlimited"),
Field('repeats_failed', 'integer', default=1, comment="0=unlimited"),
Field('repeat','integer',default=1,comment="0=unlimited"),
Field('retry_failed', 'integer', default=0, comment="-1=unlimited"),
Field('period','integer',default=60,comment='seconds'),
Field('timeout','integer',default=60,comment='seconds'),
Field('times_run','integer',default=0,writable=False),
@@ -464,7 +468,7 @@ class Scheduler(MetaScheduler):
self.die()
def pop_task(self):
now = datetime.datetime.now()
now = self.now()
db, ts = self.db, self.db.scheduler_task
if self.is_a_ticker and self.do_assign_tasks:
#I'm a ticker, and 5 loops passed without reassigning tasks, let's do
@@ -493,7 +497,7 @@ class Scheduler(MetaScheduler):
return None
next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
times_run = task.times_run + 1
if times_run < task.repeats or task.repeats==0:
if times_run < task.repeat or task.repeat==0:
run_again = True
else:
run_again = False
@@ -523,11 +527,12 @@ class Scheduler(MetaScheduler):
next_run_time=next_run_time,
times_run = times_run,
stop_time = task.stop_time,
repeats_failed = task.repeats_failed,
retry_failed = task.retry_failed,
times_failed = task.times_failed)
def report_task(self,task,task_report):
db = self.db
now = self.now()
if not self.discard_results:
if task_report.result != 'null' or task_report.tb:
#result is 'null' as a string if task completed
@@ -536,7 +541,7 @@ class Scheduler(MetaScheduler):
logging.debug(' recording task report in db (%s)' % task_report.status)
db(db.scheduler_run.id==task.run_id).update(
status = task_report.status,
stop_time = datetime.datetime.now(),
stop_time = now,
result = task_report.result,
output = task_report.output,
traceback = task_report.tb)
@@ -558,8 +563,8 @@ class Scheduler(MetaScheduler):
st_mapping = {'FAILED':'FAILED',
'TIMEOUT':'TIMEOUT',
'STOPPED':'QUEUED'}[task_report.status]
status = (task.repeats_failed and task.times_failed + 1 < task.repeats_failed
and QUEUED or task.repeats_failed==0 and QUEUED or st_mapping)
status = (task.retry_failed and task.times_failed < task.retry_failed
and QUEUED or task.retry_failed==-1 and QUEUED or st_mapping)
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(
times_failed=db.scheduler_task.times_failed+1,
@@ -581,7 +586,7 @@ class Scheduler(MetaScheduler):
try:
db = self.db_thread
sw, st = db.scheduler_worker, db.scheduler_task
now = datetime.datetime.now()
now = self.now()
expiration = now-datetime.timedelta(seconds=self.heartbeat*3)
departure = now-datetime.timedelta(seconds=self.heartbeat*3*MAXHIBERNATION)
# record heartbeat
@@ -655,7 +660,7 @@ class Scheduler(MetaScheduler):
def assign_tasks(self):
db = self.db
sw, ts = db.scheduler_worker, db.scheduler_task
now = datetime.datetime.now()
now = self.now()
all_workers = db(sw.status == ACTIVE).select()
#build workers as dict of groups
wkgroups = {}
@@ -671,7 +676,7 @@ class Scheduler(MetaScheduler):
db(ts.status.belongs((QUEUED,ASSIGNED)))(ts.stop_time<now).update(status=EXPIRED)
all_available = db(ts.status.belongs((QUEUED,ASSIGNED)))\
((ts.times_run<ts.repeats)|(ts.repeats==0))\
((ts.times_run<ts.repeat)|(ts.repeat==0))\
(ts.start_time<=now)\
((ts.stop_time==None) | (ts.stop_time>now))\
(ts.next_run_time<=now)\
@@ -756,6 +761,10 @@ def main():
"-t", "--tasks",dest="tasks",default=None,
help="file containing task files, must define" + \
"tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
parser.add_option(
"-U", "--utc-time", dest="utc_time", default=False,
help="work with UTC timestamps"
)
(options, args) = parser.parse_args()
if not options.tasks or not options.db_uri:
print USAGE
@@ -784,7 +793,8 @@ def main():
migrate = True,
group_names = group_names,
heartbeat = options.heartbeat,
max_empty_runs = options.max_empty_runs)
max_empty_runs = options.max_empty_runs,
utc_time = options.utc_time)
signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
print 'starting main worker loop...'
scheduler.loop()