Better scheduler, thanks Niphlod and dhx

This commit is contained in:
mdipierro
2012-08-16 17:51:08 -05:00
parent a608df5a29
commit f4efdebeb3
2 changed files with 119 additions and 58 deletions

View File

@@ -1 +1 @@
Version 2.00.0 (2012-08-16 11:56:03) dev
Version 2.00.0 (2012-08-16 17:51:02) dev

View File

@@ -29,16 +29,16 @@ or
python scheduler.py -h
## schedule jobs using
http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task
http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task
## monitor scheduled jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0
## view completed jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0
## view workers
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0
## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put the
## following into /etc/init/web2py-scheduler.conf:
@@ -63,7 +63,6 @@ import os
import time
import multiprocessing
import sys
import signal
import cStringIO
import threading
import traceback
@@ -73,6 +72,7 @@ import datetime
import logging
import optparse
import types
import Queue
if 'WEB2PY_PATH' in os.environ:
sys.path.append(os.environ['WEB2PY_PATH'])
@@ -107,8 +107,9 @@ EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3*SECONDS
MAXHIBERNATION = 10
CLEAROUT = '!clear!'
CALLABLETYPES = (types.LambdaType, types.FunctionType,
CALLABLETYPES = (types.LambdaType, types.FunctionType,
types.BuiltinFunctionType,
types.MethodType, types.BuiltinMethodType)
@@ -170,10 +171,27 @@ def _decode_dict(dct):
newdict[k] = v
return newdict
def executor(queue,task):
def executor(queue,task, out):
""" the background process """
logging.debug(' task started')
stdout, sys.stdout = sys.stdout, cStringIO.StringIO()
class LogOutput(object):
    """Stdout replacement that streams task output to a queue as it is written.

    Installed inside the worker process by executor() so the parent process
    can read partial output from *out_queue* while the task is still running
    (used for the `sync_output` feature), while also accumulating the full
    text locally for the final TaskReport.
    """
    def __init__(self, out_queue):
        # out_queue: a multiprocessing.Queue shared with the parent process
        self.out_queue = out_queue
        # keep a reference to the real stdout so it can be restored later
        self.stdout = sys.stdout
        # hijack stdout: all print/write in the task now goes through self.write
        sys.stdout = self
        # local accumulator of everything written, returned by getvalue()
        self.istr = ""
    def __del__(self):
        # restore the original stdout when this object is garbage-collected
        sys.stdout = self.stdout
    def write(self,data):
        # forward each chunk to the parent immediately, and buffer it locally
        self.out_queue.put(data)
        self.istr += data
    def getvalue(self):
        # cStringIO-compatible accessor: full output accumulated so far
        return self.istr
#stdout, sys.stdout = sys.stdout, cStringIO.StringIO()
stdout = LogOutput(out)
try:
if task.app:
os.chdir(os.environ['WEB2PY_PATH'])
@@ -206,17 +224,18 @@ def executor(queue,task):
result = eval(task.function)(
*loads(task.args, object_hook=_decode_dict),
**loads(task.vars, object_hook=_decode_dict))
stdout, sys.stdout = sys.stdout, stdout
#stdout, sys.stdout = sys.stdout, stdout
sys.stdout = stdout.stdout
queue.put(TaskReport(COMPLETED, result,stdout.getvalue()))
except BaseException,e:
sys.stdout = stdout
sys.stdout = stdout.stdout
tb = traceback.format_exc()
queue.put(TaskReport(FAILED,tb=tb))
queue.put(TaskReport(FAILED,tb=tb, output=stdout.getvalue()))
class MetaScheduler(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.process = None # the backround process
self.process = None # the background process
self.have_heartbeat = True # set to False to kill
self.empty_runs = 0
@@ -228,13 +247,33 @@ class MetaScheduler(threading.Thread):
('timeout',None,None)
('terminated',None,None)
"""
db = self.db
sr = db.scheduler_run
out = multiprocessing.Queue()
queue = multiprocessing.Queue(maxsize=1)
p = multiprocessing.Process(target=executor,args=(queue,task))
p = multiprocessing.Process(target=executor,args=(queue,task,out))
self.process = p
logging.debug(' task starting')
p.start()
try:
p.join(task.timeout)
if task.sync_output > 0:
task_output = ""
start = time.time()
while p.is_alive() and (time.time()-start < task.timeout):
p.join(timeout=task.sync_output)
tout = ""
while not out.empty():
tout += out.get()
if tout:
logging.debug(' partial output: "%s"' % str(tout))
if CLEAROUT in tout:
task_output = tout[tout.rfind(CLEAROUT)+len(CLEAROUT):]
else:
task_output += tout
db(sr.id==task.run_id).update(output = task_output)
db.commit()
else:
p.join(task.timeout)
except:
p.terminate()
p.join()
@@ -243,9 +282,13 @@ class MetaScheduler(threading.Thread):
return TaskReport(STOPPED)
if p.is_alive():
p.terminate()
p.join()
logging.debug(' task timeout')
return TaskReport(TIMEOUT)
try:
tr = queue.get(timeout=2)
tr.status = TIMEOUT
except Queue.Empty:
tr = TaskReport(TIMEOUT)
return tr
elif queue.empty():
self.have_heartbeat = False
logging.debug(' task stopped')
@@ -407,6 +450,8 @@ class Scheduler(MetaScheduler):
Field('retry_failed', 'integer', default=0, comment="-1=unlimited"),
Field('period','integer',default=60,comment='seconds'),
Field('timeout','integer',default=60,comment='seconds'),
Field('sync_output', 'integer', default=0,
comment="update output every n sec: 0=never"),
Field('times_run','integer',default=0,writable=False),
Field('times_failed','integer',default=0,writable=False),
Field('last_run_time','datetime',writable=False,readable=False),
@@ -483,6 +528,7 @@ class Scheduler(MetaScheduler):
db.rollback()
logging.error('TICKER: error assigning tasks')
return None
db.commit()
grabbed = db(ts.assigned_worker_name==self.worker_name)\
(ts.status==ASSIGNED)
@@ -528,50 +574,65 @@ class Scheduler(MetaScheduler):
times_run = times_run,
stop_time = task.stop_time,
retry_failed = task.retry_failed,
times_failed = task.times_failed)
times_failed = task.times_failed,
sync_output = task.sync_output)
def report_task(self,task,task_report):
db = self.db
now = self.now()
if not self.discard_results:
if task_report.result != 'null' or task_report.tb:
#result is 'null' as a string if task completed
#if it's stopped it's None as NoneType, so we record
#the STOPPED "run" anyway
logging.debug(' recording task report in db (%s)' % task_report.status)
db(db.scheduler_run.id==task.run_id).update(
status = task_report.status,
stop_time = now,
result = task_report.result,
output = task_report.output,
traceback = task_report.tb)
else:
logging.debug(' deleting task report in db because of no result')
db(db.scheduler_run.id==task.run_id).delete()
is_expired = task.stop_time and task.next_run_time > task.stop_time and True or False
status = (task.run_again and is_expired and EXPIRED
or task.run_again and not is_expired and QUEUED or COMPLETED)
if task_report.status == COMPLETED:
d = dict(status = status,
next_run_time = task.next_run_time,
times_run = task.times_run,
times_failed = 0 #reset times_failed counter for the next run
)
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(**d)
else:
st_mapping = {'FAILED':'FAILED',
'TIMEOUT':'TIMEOUT',
'STOPPED':'QUEUED'}[task_report.status]
status = (task.retry_failed and task.times_failed < task.retry_failed
and QUEUED or task.retry_failed==-1 and QUEUED or st_mapping)
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(
times_failed=db.scheduler_task.times_failed+1,
next_run_time = task.next_run_time,
status=status)
db.commit()
logging.info('task completed (%s)' % task_report.status)
while True:
try:
if not self.discard_results:
if task_report.result != 'null' or task_report.tb:
#result is 'null' as a string if task completed
#if it's stopped it's None as NoneType, so we record
#the STOPPED "run" anyway
logging.debug(' recording task report in db (%s)' % task_report.status)
#CLEAROUT clears the output
tout = task_report.output
if tout and CLEAROUT in tout:
tout = tout[tout.rfind(CLEAROUT)+len(CLEAROUT):]
db(db.scheduler_run.id==task.run_id).update(
status = task_report.status,
stop_time = now,
result = task_report.result,
output = tout,
traceback = task_report.tb)
else:
logging.debug(' deleting task report in db because of no result')
db(db.scheduler_run.id==task.run_id).delete()
is_expired = (task.stop_time
and task.next_run_time > task.stop_time
and True or False)
status = (task.run_again and is_expired and EXPIRED
or task.run_again and not is_expired
and QUEUED or COMPLETED)
if task_report.status == COMPLETED:
d = dict(status = status,
next_run_time = task.next_run_time,
times_run = task.times_run,
times_failed = 0
)
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(**d)
else:
st_mapping = {'FAILED':'FAILED',
'TIMEOUT':'TIMEOUT',
'STOPPED':'QUEUED'}[task_report.status]
status = (task.retry_failed
and task.times_failed < task.retry_failed
and QUEUED or task.retry_failed==-1
and QUEUED or st_mapping)
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(
times_failed=db.scheduler_task.times_failed+1,
next_run_time = task.next_run_time,
status=status)
db.commit()
logging.info('task completed (%s)' % task_report.status)
break
except:
db.rollback()
def adj_hibernation(self):
if self.worker_status[0] == DISABLED:
@@ -652,6 +713,7 @@ class Scheduler(MetaScheduler):
db(sw.worker_name == self.worker_name).update(is_ticker = True)
db(sw.worker_name != self.worker_name).update(is_ticker = False)
logging.info("TICKER: I'm a ticker (%s)" % self.worker_name)
db.commit()
return True
else:
logging.info("%s is a ticker, I'm a poor worker" % ticker.worker_name)
@@ -801,4 +863,3 @@ def main():
if __name__=='__main__':
main()