better scheduler, thanks Niphlod

This commit is contained in:
mdipierro
2012-07-09 21:08:57 -05:00
parent fed9258070
commit 0e09f84ab6
2 changed files with 80 additions and 42 deletions
+1 -1
View File
@@ -1 +1 @@
Version 2.00.0 (2012-07-09 17:41:27) dev
Version 2.00.0 (2012-07-09 21:08:54) dev
+79 -41
View File
@@ -86,7 +86,7 @@ except:
from simplejson import loads, dumps
from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET
from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB
from gluon.utils import web2py_uuid
QUEUED = 'QUEUED'
@@ -97,8 +97,9 @@ FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
TERMINATE = 'TERMINATE'
DISABLED = 'DISABLED'
KILL = 'KILL'
EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3*SECONDS
@@ -183,7 +184,7 @@ def executor(queue,task):
f = task.function
# First look for the func in tasks, else look in models
_function = current._scheduler.tasks.get(f) or _env.get(f)
assert _function, 'Function %s not found in scheduler\'s environmen
assert _function, "Function %s not found in scheduler's environment"
globals().update(_env)
args = loads(task.args)
vars = loads(task.vars, object_hook=_decode_dict)
@@ -226,7 +227,7 @@ class MetaScheduler(threading.Thread):
p.terminate()
p.join()
self.have_heartbeat = False
logging.debug(' task stopped')
logging.debug(' task stopped by general exception')
return TaskReport(STOPPED)
if p.is_alive():
p.terminate()
@@ -246,6 +247,10 @@ class MetaScheduler(threading.Thread):
self.have_heartbeat = False
self.terminate_process()
def give_up(self):
logging.info('Giving up as soon as possible!')
self.have_heartbeat = False
def terminate_process(self):
try:
self.process.terminate()
@@ -293,7 +298,7 @@ class MetaScheduler(threading.Thread):
else:
self.empty_runs += 1
logging.debug('sleeping...')
if self.max_empty_runs <> 0:
if self.max_empty_runs != 0:
logging.debug('empty runs %s/%s', self.empty_runs, self.max_empty_runs)
if self.empty_runs >= self.max_empty_runs:
logging.info('empty runs limit reached, killing myself')
@@ -305,7 +310,7 @@ class MetaScheduler(threading.Thread):
TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED)
WORKER_STATUS = (ACTIVE, DISABLED, TERMINATE, KILL)
class TYPE(object):
"""
@@ -333,7 +338,8 @@ class TYPE(object):
class Scheduler(MetaScheduler):
def __init__(self,db,tasks={},migrate=True,
worker_name=None,group_names=None,heartbeat=HEARTBEAT,max_empty_runs=0):
worker_name=None,group_names=None,heartbeat=HEARTBEAT,
max_empty_runs=0, discard_results=False):
MetaScheduler.__init__(self)
@@ -348,6 +354,7 @@ class Scheduler(MetaScheduler):
#hibernation (i.e. when someone stop the
#worker acting on the scheduler_worker table)
self.max_empty_runs = max_empty_runs
self.discard_results = discard_results
self.is_a_ticker = False
self.do_assign_tasks = False
@@ -370,25 +377,30 @@ class Scheduler(MetaScheduler):
Field('status',requires=IS_IN_SET(TASK_STATUS),
default=QUEUED,writable=False),
Field('function_name',
requires=IS_IN_SET(sorted(self.tasks.keys())) \
if self.tasks else DEFAULT),
requires=IS_IN_SET(sorted(self.tasks.keys()))
if self.tasks else DEFAULT),
Field('uuid', requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
unique=True, default=web2py_uuid),
Field('args','text',default='[]',requires=TYPE(list)),
Field('vars','text',default='{}',requires=TYPE(dict)),
Field('enabled','boolean',default=True),
Field('start_time','datetime',default=now),
Field('next_run_time','datetime',default=now),
Field('stop_time','datetime'),
Field('repeats','integer',default=1,comment="0=unlimted"),
Field('repeats','integer',default=1,comment="0=unlimited"),
Field('repeats_failed', 'integer', default=1, comment="0=unlimited"),
Field('period','integer',default=60,comment='seconds'),
Field('timeout','integer',default=60,comment='seconds'),
Field('times_run','integer',default=0,writable=False),
Field('times_failed','integer',default=0,writable=False),
Field('last_run_time','datetime',writable=False,readable=False),
Field('assigned_worker_name',default='',writable=False),
migrate=migrate,format='%(task_name)s')
if hasattr(current,'request'):
db.scheduler_task.application_name.default = \
'%s/%s' % (current.request.application,
current.request.controller)
db.scheduler_task.application_name.default= '%s/%s' % (
current.request.application, current.request.controller
)
db.define_table(
'scheduler_run',
Field('scheduler_task','reference scheduler_task'),
@@ -403,7 +415,7 @@ class Scheduler(MetaScheduler):
db.define_table(
'scheduler_worker',
Field('worker_name'),
Field('worker_name', unique=True),
Field('first_heartbeat','datetime'),
Field('last_heartbeat','datetime'),
Field('status',requires=IS_IN_SET(WORKER_STATUS)),
@@ -428,13 +440,14 @@ class Scheduler(MetaScheduler):
else:
self.empty_runs += 1
logging.debug('sleeping...')
if self.max_empty_runs <> 0:
if self.max_empty_runs != 0:
logging.debug('empty runs %s/%s', self.empty_runs, self.max_empty_runs)
if self.empty_runs >= self.max_empty_runs:
logging.info('empty runs limit reached, killing myself')
self.die()
self.sleep()
except KeyboardInterrupt:
except (KeyboardInterrupt, SystemExit):
logging.info('catched')
self.die()
def pop_task(self):
@@ -471,8 +484,9 @@ class Scheduler(MetaScheduler):
run_again = True
else:
run_again = False
logging.debug(' new scheduler_run record')
while True:
run_id = 0
while True and not self.discard_results:
logging.debug(' new scheduler_run record')
try:
run_id = db.scheduler_run.insert(
scheduler_task = task.id,
@@ -495,35 +509,49 @@ class Scheduler(MetaScheduler):
run_again = run_again,
next_run_time=next_run_time,
times_run = times_run,
stop_time = task.stop_time)
stop_time = task.stop_time,
repeats_failed = task.repeats_failed,
times_failed = task.times_failed)
def report_task(self,task,task_report):
logging.debug(' recording task report in db (%s)' % task_report.status)
db = self.db
db(db.scheduler_run.id==task.run_id).update(
status = task_report.status,
stop_time = datetime.datetime.now(),
result = task_report.result,
output = task_report.output,
traceback = task_report.tb)
if not self.discard_results:
if task_report.result != 'null' or task_report.tb:
#result is 'null' as a string if task completed
#if it's stopped it's None as NoneType, so we record
#the STOPPED "run" anyway
logging.debug(' recording task report in db (%s)' % task_report.status)
db(db.scheduler_run.id==task.run_id).update(
status = task_report.status,
stop_time = datetime.datetime.now(),
result = task_report.result,
output = task_report.output,
traceback = task_report.tb)
else:
logging.debug(' deleting task report in db because of no result')
db(db.scheduler_run.id==task.run_id).delete()
is_expired = task.stop_time and task.next_run_time > task.stop_time and True or False
status = (task.run_again and is_expired and EXPIRED
or task.run_again and not is_expired and QUEUED or COMPLETED)
if task_report.status == COMPLETED:
d = dict(status = status,
next_run_time = task.next_run_time,
times_run = task.times_run)
#I'd like to know who worked my task, reviewing some logs...
#,assigned_worker_name = '')
times_run = task.times_run,
times_failed = 0 #reset times_failed counter for the next run
)
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(**d)
else:
d = dict(
#same as before...
#assigned_worker_name = '',
status = {'FAILED':'FAILED',
st_mapping = {'FAILED':'FAILED',
'TIMEOUT':'TIMEOUT',
'STOPPED':'QUEUED'}[task_report.status])
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(**d)
'STOPPED':'QUEUED'}[task_report.status]
status = (task.repeats_failed and task.times_failed + 1 < task.repeats_failed
and QUEUED or task.repeats_failed==0 and QUEUED or st_mapping)
db(db.scheduler_task.id==task.task_id)\
(db.scheduler_task.status==RUNNING).update(
times_failed=db.scheduler_task.times_failed+1,
next_run_time = task.next_run_time,
status=status)
db.commit()
logging.info('task completed (%s)' % task_report.status)
@@ -557,6 +585,16 @@ class Scheduler(MetaScheduler):
logging.debug('........recording heartbeat')
db(sw.worker_name==self.worker_name).update(
last_heartbeat = now)
elif mybackedstatus.status == TERMINATE:
self.worker_status = TERMINATE, self.worker_status[1]
logging.debug("Waiting to terminate the current task")
self.give_up()
return
elif mybackedstatus.status == KILL:
self.worker_status = KILL, self.worker_status[1]
self.die()
else:
logging.debug('........recording heartbeat')
db(sw.worker_name==self.worker_name).update(
@@ -570,7 +608,7 @@ class Scheduler(MetaScheduler):
logging.debug(' freeing workers that have not sent heartbeat')
inactive_workers = db(
((sw.last_heartbeat<expiration) & (sw.status == ACTIVE)) |
((sw.last_heartbeat<departure) & (sw.status == DISABLED))
((sw.last_heartbeat<departure) & (sw.status != ACTIVE))
)
db(st.assigned_worker_name.belongs(
inactive_workers._select(sw.worker_name)))\
@@ -578,7 +616,7 @@ class Scheduler(MetaScheduler):
.update(assigned_worker_name='',status=QUEUED)
inactive_workers.delete()
self.is_a_ticker = self.being_a_ticker()
if self.worker_status[0] <> DISABLED:
if self.worker_status[0] == ACTIVE:
self.do_assign_tasks = True
except:
pass
@@ -591,10 +629,10 @@ class Scheduler(MetaScheduler):
def being_a_ticker(self):
db = self.db_thread
sw = db.scheduler_worker
ticker = db((sw.worker_name <> self.worker_name) & (sw.is_ticker == True) & (sw.status == ACTIVE)).select().first()
ticker = db((sw.worker_name != self.worker_name) & (sw.is_ticker == True) & (sw.status == ACTIVE)).select().first()
if not ticker:
db(sw.worker_name == self.worker_name).update(is_ticker = True)
db(sw.worker_name <> self.worker_name).update(is_ticker = False)
db(sw.worker_name != self.worker_name).update(is_ticker = False)
logging.info("TICKER: I'm a ticker (%s)" % self.worker_name)
return True
else:
@@ -605,7 +643,7 @@ class Scheduler(MetaScheduler):
db = self.db
sw, ts = db.scheduler_worker, db.scheduler_task
now = datetime.datetime.now()
all_workers = db(sw.status <> DISABLED).select()
all_workers = db(sw.status == ACTIVE).select()
#build workers as dict of groups
wkgroups = {}
for w in all_workers: