From 0e09f84ab697f7dab98ca590540f633caa8d3e36 Mon Sep 17 00:00:00 2001 From: mdipierro Date: Mon, 9 Jul 2012 21:08:57 -0500 Subject: [PATCH] better scheduler, thanks Niphlod --- VERSION | 2 +- gluon/scheduler.py | 120 +++++++++++++++++++++++++++++---------------- 2 files changed, 80 insertions(+), 42 deletions(-) diff --git a/VERSION b/VERSION index 5f0494d7..aa2f34dd 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-07-09 17:41:27) dev +Version 2.00.0 (2012-07-09 21:08:54) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 25b162b7..80c8cda9 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -86,7 +86,7 @@ except: from simplejson import loads, dumps -from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET +from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB from gluon.utils import web2py_uuid QUEUED = 'QUEUED' @@ -97,8 +97,9 @@ FAILED = 'FAILED' TIMEOUT = 'TIMEOUT' STOPPED = 'STOPPED' ACTIVE = 'ACTIVE' -INACTIVE = 'INACTIVE' +TERMINATE = 'TERMINATE' DISABLED = 'DISABLED' +KILL = 'KILL' EXPIRED = 'EXPIRED' SECONDS = 1 HEARTBEAT = 3*SECONDS @@ -183,7 +184,7 @@ def executor(queue,task): f = task.function # First look for the func in tasks, else look in models _function = current._scheduler.tasks.get(f) or _env.get(f) - assert _function, 'Function %s not found in scheduler\'s environment' + assert _function, "Function %s not found in scheduler's environment" globals().update(_env) args = loads(task.args) vars = loads(task.vars, object_hook=_decode_dict) @@ -226,7 +227,7 @@ class MetaScheduler(threading.Thread): p.terminate() p.join() self.have_heartbeat = False - logging.debug(' task stopped') + logging.debug(' task stopped by general exception') return TaskReport(STOPPED) if p.is_alive(): p.terminate() @@ -246,6 +247,10 @@ class MetaScheduler(threading.Thread): self.have_heartbeat = False self.terminate_process() + def give_up(self): + logging.info('Giving up as soon as possible!') + self.have_heartbeat = False + 
def terminate_process(self): try: self.process.terminate() @@ -293,7 +298,7 @@ class MetaScheduler(threading.Thread): else: self.empty_runs += 1 logging.debug('sleeping...') - if self.max_empty_runs <> 0: + if self.max_empty_runs != 0: logging.debug('empty runs %s/%s', self.empty_runs, self.max_empty_runs) if self.empty_runs >= self.max_empty_runs: logging.info('empty runs limit reached, killing myself') @@ -305,7 +310,7 @@ class MetaScheduler(threading.Thread): TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED) RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED) -WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED) +WORKER_STATUS = (ACTIVE, DISABLED, TERMINATE, KILL) class TYPE(object): """ @@ -333,7 +338,8 @@ class TYPE(object): class Scheduler(MetaScheduler): def __init__(self,db,tasks={},migrate=True, - worker_name=None,group_names=None,heartbeat=HEARTBEAT,max_empty_runs=0): + worker_name=None,group_names=None,heartbeat=HEARTBEAT, + max_empty_runs=0, discard_results=False): MetaScheduler.__init__(self) @@ -348,6 +354,7 @@ class Scheduler(MetaScheduler): #hibernation (i.e. 
when someone stop the #worker acting on the scheduler_worker table) self.max_empty_runs = max_empty_runs + self.discard_results = discard_results self.is_a_ticker = False self.do_assign_tasks = False @@ -370,25 +377,30 @@ class Scheduler(MetaScheduler): Field('status',requires=IS_IN_SET(TASK_STATUS), default=QUEUED,writable=False), Field('function_name', - requires=IS_IN_SET(sorted(self.tasks.keys())) \ - if self.tasks else DEFAULT), + requires=IS_IN_SET(sorted(self.tasks.keys())) + if self.tasks else DEFAULT), + Field('uuid', requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'), + unique=True, default=web2py_uuid), Field('args','text',default='[]',requires=TYPE(list)), Field('vars','text',default='{}',requires=TYPE(dict)), Field('enabled','boolean',default=True), Field('start_time','datetime',default=now), Field('next_run_time','datetime',default=now), Field('stop_time','datetime'), - Field('repeats','integer',default=1,comment="0=unlimted"), + Field('repeats','integer',default=1,comment="0=unlimited"), + Field('repeats_failed', 'integer', default=1, comment="0=unlimited"), Field('period','integer',default=60,comment='seconds'), Field('timeout','integer',default=60,comment='seconds'), Field('times_run','integer',default=0,writable=False), + Field('times_failed','integer',default=0,writable=False), Field('last_run_time','datetime',writable=False,readable=False), Field('assigned_worker_name',default='',writable=False), migrate=migrate,format='%(task_name)s') if hasattr(current,'request'): - db.scheduler_task.application_name.default = \ - '%s/%s' % (current.request.application, - current.request.controller) + db.scheduler_task.application_name.default= '%s/%s' % ( + current.request.application, current.request.controller + ) + db.define_table( 'scheduler_run', Field('scheduler_task','reference scheduler_task'), @@ -403,7 +415,7 @@ class Scheduler(MetaScheduler): db.define_table( 'scheduler_worker', - Field('worker_name'), + Field('worker_name', unique=True), 
Field('first_heartbeat','datetime'), Field('last_heartbeat','datetime'), Field('status',requires=IS_IN_SET(WORKER_STATUS)), @@ -428,13 +440,14 @@ class Scheduler(MetaScheduler): else: self.empty_runs += 1 logging.debug('sleeping...') - if self.max_empty_runs <> 0: + if self.max_empty_runs != 0: logging.debug('empty runs %s/%s', self.empty_runs, self.max_empty_runs) if self.empty_runs >= self.max_empty_runs: logging.info('empty runs limit reached, killing myself') self.die() self.sleep() - except KeyboardInterrupt: + except (KeyboardInterrupt, SystemExit): + logging.info('catched') self.die() def pop_task(self): @@ -471,8 +484,9 @@ class Scheduler(MetaScheduler): run_again = True else: run_again = False - logging.debug(' new scheduler_run record') - while True: + run_id = 0 + while True and not self.discard_results: + logging.debug(' new scheduler_run record') try: run_id = db.scheduler_run.insert( scheduler_task = task.id, @@ -495,35 +509,49 @@ class Scheduler(MetaScheduler): run_again = run_again, next_run_time=next_run_time, times_run = times_run, - stop_time = task.stop_time) + stop_time = task.stop_time, + repeats_failed = task.repeats_failed, + times_failed = task.times_failed) def report_task(self,task,task_report): - logging.debug(' recording task report in db (%s)' % task_report.status) db = self.db - db(db.scheduler_run.id==task.run_id).update( - status = task_report.status, - stop_time = datetime.datetime.now(), - result = task_report.result, - output = task_report.output, - traceback = task_report.tb) + if not self.discard_results: + if task_report.result != 'null' or task_report.tb: + #result is 'null' as a string if task completed + #if it's stopped it's None as NoneType, so we record + #the STOPPED "run" anyway + logging.debug(' recording task report in db (%s)' % task_report.status) + db(db.scheduler_run.id==task.run_id).update( + status = task_report.status, + stop_time = datetime.datetime.now(), + result = task_report.result, + output = 
task_report.output, + traceback = task_report.tb) + else: + logging.debug(' deleting task report in db because of no result') + db(db.scheduler_run.id==task.run_id).delete() is_expired = task.stop_time and task.next_run_time > task.stop_time and True or False status = (task.run_again and is_expired and EXPIRED or task.run_again and not is_expired and QUEUED or COMPLETED) if task_report.status == COMPLETED: d = dict(status = status, next_run_time = task.next_run_time, - times_run = task.times_run) - #I'd like to know who worked my task, reviewing some logs... - #,assigned_worker_name = '') + times_run = task.times_run, + times_failed = 0 #reset times_failed counter for the next run + ) + db(db.scheduler_task.id==task.task_id)\ + (db.scheduler_task.status==RUNNING).update(**d) else: - d = dict( - #same as before... - #assigned_worker_name = '', - status = {'FAILED':'FAILED', + st_mapping = {'FAILED':'FAILED', 'TIMEOUT':'TIMEOUT', - 'STOPPED':'QUEUED'}[task_report.status]) - db(db.scheduler_task.id==task.task_id)\ - (db.scheduler_task.status==RUNNING).update(**d) + 'STOPPED':'QUEUED'}[task_report.status] + status = (task.repeats_failed and task.times_failed + 1 < task.repeats_failed + and QUEUED or task.repeats_failed==0 and QUEUED or st_mapping) + db(db.scheduler_task.id==task.task_id)\ + (db.scheduler_task.status==RUNNING).update( + times_failed=db.scheduler_task.times_failed+1, + next_run_time = task.next_run_time, + status=status) db.commit() logging.info('task completed (%s)' % task_report.status) @@ -557,6 +585,16 @@ class Scheduler(MetaScheduler): logging.debug('........recording heartbeat') db(sw.worker_name==self.worker_name).update( last_heartbeat = now) + + elif mybackedstatus.status == TERMINATE: + self.worker_status = TERMINATE, self.worker_status[1] + logging.debug("Waiting to terminate the current task") + self.give_up() + return + elif mybackedstatus.status == KILL: + self.worker_status = KILL, self.worker_status[1] + self.die() + else: 
logging.debug('........recording heartbeat') db(sw.worker_name==self.worker_name).update( @@ -570,7 +608,7 @@ class Scheduler(MetaScheduler): logging.debug(' freeing workers that have not sent heartbeat') inactive_workers = db( ((sw.last_heartbeat DISABLED: + if self.worker_status[0] == ACTIVE: self.do_assign_tasks = True except: pass @@ -591,10 +629,10 @@ class Scheduler(MetaScheduler): def being_a_ticker(self): db = self.db_thread sw = db.scheduler_worker - ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True) & (sw.status == ACTIVE)).select().first() + ticker = db((sw.worker_name != self.worker_name) & (sw.is_ticker == True) & (sw.status == ACTIVE)).select().first() if not ticker: db(sw.worker_name == self.worker_name).update(is_ticker = True) - db(sw.worker_name <> self.worker_name).update(is_ticker = False) + db(sw.worker_name != self.worker_name).update(is_ticker = False) logging.info("TICKER: I'm a ticker (%s)" % self.worker_name) return True else: @@ -605,7 +643,7 @@ class Scheduler(MetaScheduler): db = self.db sw, ts = db.scheduler_worker, db.scheduler_task now = datetime.datetime.now() - all_workers = db(sw.status <> DISABLED).select() + all_workers = db(sw.status == ACTIVE).select() #build workers as dict of groups wkgroups = {} for w in all_workers: