From f4efdebeb3f4d8cd6ce8f6024ef602402cfbec66 Mon Sep 17 00:00:00 2001 From: mdipierro Date: Thu, 16 Aug 2012 17:51:08 -0500 Subject: [PATCH] better scheduler, thanks Niphlod and dhx --- VERSION | 2 +- gluon/scheduler.py | 175 ++++++++++++++++++++++++++++++--------------- 2 files changed, 119 insertions(+), 58 deletions(-) diff --git a/VERSION b/VERSION index b370822f..0701f20c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -Version 2.00.0 (2012-08-16 11:56:03) dev +Version 2.00.0 (2012-08-16 17:51:02) dev diff --git a/gluon/scheduler.py b/gluon/scheduler.py index 8d986f8c..291f2cf9 100644 --- a/gluon/scheduler.py +++ b/gluon/scheduler.py @@ -29,16 +29,16 @@ or python scheduler.py -h ## schedule jobs using -http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task +http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task ## monitor scheduled jobs -http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0 +http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0 ## view completed jobs -http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0 +http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0 ## view workers -http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0 +http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0 ## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put the ## following into /etc/init/web2py-scheduler.conf: @@ -63,7 +63,6 @@ import os import time import multiprocessing import sys -import signal import cStringIO import threading import traceback @@ -73,6 +72,7 @@ import datetime import logging import optparse import types +import Queue if 'WEB2PY_PATH' in os.environ: sys.path.append(os.environ['WEB2PY_PATH']) @@ -107,8 +107,9 @@ EXPIRED = 'EXPIRED' SECONDS = 1 HEARTBEAT = 3*SECONDS MAXHIBERNATION = 10 +CLEAROUT = '!clear!' -CALLABLETYPES = (types.LambdaType, types.FunctionType, +CALLABLETYPES = (types.LambdaType, types.FunctionType, types.BuiltinFunctionType, types.MethodType, types.BuiltinMethodType) @@ -170,10 +171,27 @@ def _decode_dict(dct): newdict[k] = v return newdict -def executor(queue,task): +def executor(queue,task, out): """ the background process """ logging.debug(' task started') - stdout, sys.stdout = sys.stdout, cStringIO.StringIO() + + class LogOutput(object): + """Facility to log output at intervals""" + def __init__(self, out_queue): + self.out_queue = out_queue + self.stdout = sys.stdout + sys.stdout = self + self.istr = "" + def __del__(self): + sys.stdout = self.stdout + def write(self,data): + self.out_queue.put(data) + self.istr += data + def getvalue(self): + return self.istr + + #stdout, sys.stdout = sys.stdout, cStringIO.StringIO() + stdout = LogOutput(out) try: if task.app: os.chdir(os.environ['WEB2PY_PATH']) @@ -206,17 +224,18 @@ def executor(queue,task): result = eval(task.function)( *loads(task.args, object_hook=_decode_dict), **loads(task.vars, object_hook=_decode_dict)) - stdout, sys.stdout = sys.stdout, stdout + #stdout, sys.stdout = sys.stdout, stdout + sys.stdout = stdout.stdout queue.put(TaskReport(COMPLETED, result,stdout.getvalue())) except BaseException,e: - sys.stdout = stdout + sys.stdout = stdout.stdout tb = traceback.format_exc() - queue.put(TaskReport(FAILED,tb=tb)) + queue.put(TaskReport(FAILED,tb=tb, output=stdout.getvalue())) class MetaScheduler(threading.Thread): def __init__(self): threading.Thread.__init__(self) - self.process = None # the backround process + self.process = None # the background process self.have_heartbeat = True # set to False to kill self.empty_runs = 0 @@ -228,13 +247,33 @@ class MetaScheduler(threading.Thread): ('timeout',None,None) ('terminated',None,None) """ + db = self.db + sr = db.scheduler_run + out = multiprocessing.Queue() queue = multiprocessing.Queue(maxsize=1) - p = multiprocessing.Process(target=executor,args=(queue,task)) + p = multiprocessing.Process(target=executor,args=(queue,task,out)) self.process = p logging.debug(' task starting') p.start() try: - p.join(task.timeout) + if task.sync_output > 0: + task_output = "" + start = time.time() + while p.is_alive() and (time.time()-start < task.timeout): + p.join(timeout=task.sync_output) + tout = "" + while not out.empty(): + tout += out.get() + if tout: + logging.debug(' partial output: "%s"' % str(tout)) + if CLEAROUT in tout: + task_output = tout[tout.rfind(CLEAROUT)+len(CLEAROUT):] + else: + task_output += tout + db(sr.id==task.run_id).update(output = task_output) + db.commit() + else: + p.join(task.timeout) except: p.terminate() p.join() @@ -243,9 +282,13 @@ class MetaScheduler(threading.Thread): return TaskReport(STOPPED) if p.is_alive(): p.terminate() - p.join() logging.debug(' task timeout') - return TaskReport(TIMEOUT) + try: + tr = queue.get(timeout=2) + tr.status = TIMEOUT + except Queue.Empty: + tr = TaskReport(TIMEOUT) + return tr elif queue.empty(): self.have_heartbeat = False logging.debug(' task stopped') @@ -407,6 +450,8 @@ class Scheduler(MetaScheduler): Field('retry_failed', 'integer', default=0, comment="-1=unlimited"), Field('period','integer',default=60,comment='seconds'), Field('timeout','integer',default=60,comment='seconds'), + Field('sync_output', 'integer', default=0, + comment="update output every n sec: 0=never"), Field('times_run','integer',default=0,writable=False), Field('times_failed','integer',default=0,writable=False), Field('last_run_time','datetime',writable=False,readable=False), @@ -483,6 +528,7 @@ class Scheduler(MetaScheduler): db.rollback() logging.error('TICKER: error assigning tasks') return None + db.commit() grabbed = db(ts.assigned_worker_name==self.worker_name)\ (ts.status==ASSIGNED) @@ -528,50 +574,65 @@ class Scheduler(MetaScheduler): times_run = times_run, stop_time = task.stop_time, retry_failed = task.retry_failed, - times_failed = task.times_failed) + times_failed = task.times_failed, + sync_output = task.sync_output) def report_task(self,task,task_report): db = self.db now = self.now() - if not self.discard_results: - if task_report.result != 'null' or task_report.tb: - #result is 'null' as a string if task completed - #if it's stopped it's None as NoneType, so we record - #the STOPPED "run" anyway - logging.debug(' recording task report in db (%s)' % task_report.status) - db(db.scheduler_run.id==task.run_id).update( - status = task_report.status, - stop_time = now, - result = task_report.result, - output = task_report.output, - traceback = task_report.tb) - else: - logging.debug(' deleting task report in db because of no result') - db(db.scheduler_run.id==task.run_id).delete() - is_expired = task.stop_time and task.next_run_time > task.stop_time and True or False - status = (task.run_again and is_expired and EXPIRED - or task.run_again and not is_expired and QUEUED or COMPLETED) - if task_report.status == COMPLETED: - d = dict(status = status, - next_run_time = task.next_run_time, - times_run = task.times_run, - times_failed = 0 #reset times_failed counter for the next run - ) - db(db.scheduler_task.id==task.task_id)\ - (db.scheduler_task.status==RUNNING).update(**d) - else: - st_mapping = {'FAILED':'FAILED', - 'TIMEOUT':'TIMEOUT', - 'STOPPED':'QUEUED'}[task_report.status] - status = (task.retry_failed and task.times_failed < task.retry_failed - and QUEUED or task.retry_failed==-1 and QUEUED or st_mapping) - db(db.scheduler_task.id==task.task_id)\ - (db.scheduler_task.status==RUNNING).update( - times_failed=db.scheduler_task.times_failed+1, - next_run_time = task.next_run_time, - status=status) - db.commit() - logging.info('task completed (%s)' % task_report.status) + while True: + try: + if not self.discard_results: + if task_report.result != 'null' or task_report.tb: + #result is 'null' as a string if task completed + #if it's stopped it's None as NoneType, so we record + #the STOPPED "run" anyway + logging.debug(' recording task report in db (%s)' % task_report.status) + #CLEAROUT clears the output + tout = task_report.output + if tout and CLEAROUT in tout: + tout = tout[tout.rfind(CLEAROUT)+len(CLEAROUT):] + db(db.scheduler_run.id==task.run_id).update( + status = task_report.status, + stop_time = now, + result = task_report.result, + output = tout, + traceback = task_report.tb) + else: + logging.debug(' deleting task report in db because of no result') + db(db.scheduler_run.id==task.run_id).delete() + is_expired = (task.stop_time + and task.next_run_time > task.stop_time + and True or False) + status = (task.run_again and is_expired and EXPIRED + or task.run_again and not is_expired + and QUEUED or COMPLETED) + if task_report.status == COMPLETED: + d = dict(status = status, + next_run_time = task.next_run_time, + times_run = task.times_run, + times_failed = 0 + ) + db(db.scheduler_task.id==task.task_id)\ + (db.scheduler_task.status==RUNNING).update(**d) + else: + st_mapping = {'FAILED':'FAILED', + 'TIMEOUT':'TIMEOUT', + 'STOPPED':'QUEUED'}[task_report.status] + status = (task.retry_failed + and task.times_failed < task.retry_failed + and QUEUED or task.retry_failed==-1 + and QUEUED or st_mapping) + db(db.scheduler_task.id==task.task_id)\ + (db.scheduler_task.status==RUNNING).update( + times_failed=db.scheduler_task.times_failed+1, + next_run_time = task.next_run_time, + status=status) + db.commit() + logging.info('task completed (%s)' % task_report.status) + break + except: + db.rollback() def adj_hibernation(self): if self.worker_status[0] == DISABLED: @@ -652,6 +713,7 @@ class Scheduler(MetaScheduler): db(sw.worker_name == self.worker_name).update(is_ticker = True) db(sw.worker_name != self.worker_name).update(is_ticker = False) logging.info("TICKER: I'm a ticker (%s)" % self.worker_name) + db.commit() return True else: logging.info("%s is a ticker, I'm a poor worker" % ticker.worker_name) @@ -801,4 +863,3 @@ def main(): if __name__=='__main__': main() -